Web Streams 无处不在(以及 Node.js 的 Fetch)

Avatar of Ollie Williams
Ollie Williams

DigitalOcean 为您旅程的每个阶段提供云产品。开始使用 200 美元的免费积分!

Chrome 开发者布道师 Jake Archibald 将 2016 年称为“Web 流之年”。显然,他的预测有点过早。流标准在 2014 年宣布。这花了一段时间,但现在在 现代浏览器(仍在等待 Firefox…)和 Node(以及 Deno)中实现了一个一致的流 API。

什么是流?

流式传输涉及将资源拆分为称为块的较小片段,并一次处理每个块。使用流,您无需等待所有数据下载完成,而可以一旦第一个块可用就逐步处理数据。

有三种类型的流:可读流、可写流和转换流。可读流 是数据块的来源。例如,底层数据源可以是文件或 HTTP 连接。然后,数据可以(可选地)由转换流修改。然后,数据块可以被传递到可写流

Web 流无处不在

Node 始终拥有自己的流类型。它们通常被认为难以使用。Web 超文本应用技术工作组 (WHATWG) 的流 Web 标准后来出现,并且在很大程度上被认为是一个改进。Node 文档称它们为“Web 流”,这听起来不那么笨拙。原始的 Node 流不会被弃用或删除,但它们现在将与 Web 标准流 API 共存。这使得编写跨平台代码变得更容易,并且意味着开发人员只需要学习一种做事方式。

Deno,Node 原始创建者对服务器端 JavaScript 的另一种尝试,始终与浏览器 API 密切相关,并完全支持 Web 流。Cloudflare Workers(有点像服务工作者,但在 CDN 边缘位置运行)和 Deno Deploy(来自 Deno 的无服务器产品)也支持流。

fetch() 响应作为可读流

有多种方法可以创建可读流,但调用 fetch() 可能是最常见的方法。fetch() 的响应主体是可读流。

fetch('data.txt')
.then(response => console.log(response.body));

如果您查看控制台日志,您会发现可读流有几个有用的方法。正如规范所说,“可读流可以直接传递到可写流,使用其 pipeTo() 方法,或者它可以先通过一个或多个转换流传递,使用其 pipeThrough() 方法。”

与浏览器不同,Node 核心目前没有实现 fetch。node-fetch 是一个流行的依赖项,它试图匹配浏览器标准的 API,返回 Node 流,而不是 WHATWG 流。Undici 是 Node.js 团队改进的 HTTP/1.1 客户端,是 Node.js 核心 http.request(node-fetch 和 Axios 等构建在其之上)的现代替代方案。Undici 实现了 fetch - 并且 response.body 确实返回 Web 流。🎉

Undici 最终可能会进入 Node.js 核心,并且看起来它将成为在 Node 中处理 HTTP 请求的推荐方法。一旦你 npm install undici 并导入 fetch,它的工作方式与浏览器中相同。在下面的示例中,我们将流通过转换流传递。流的每个块都是一个 Uint8Array。Node 核心提供了一个 TextDecoderStream 来解码二进制数据。

import { fetch } from 'undici';
import { TextDecoderStream } from 'node:stream/web';

async function fetchStream() {
  const response = await fetch('https://example.com')
  const stream = response.body;
  const textStream = stream.pipeThrough(new TextDecoderStream());
}

response.body 是同步的,因此您不需要 await 它。在浏览器中,fetchTextDecoderStream 在全局对象上可用,因此您不会包含任何导入语句。除此之外,Node 和 Web 浏览器的代码完全相同。Deno 也内置支持fetchTextDecoderStream

异步迭代

for-await-of 循环是 for-of 循环的异步版本。常规的 for-of 循环用于循环遍历数组和其他可迭代对象。例如,for-await-of 循环可用于迭代承诺数组。

const promiseArray = [Promise.resolve("thing 1"), Promise.resolve("thing 2")];
for await (const thing of promiseArray) { console.log(thing); }

对我们来说重要的是,这也可以用于迭代流。

async function fetchAndLogStream() {
  const response = await fetch('https://example.com')
  const stream = response.body;
  const textStream = stream.pipeThrough(new TextDecoderStream());

  for await (const chunk of textStream) {
    console.log(chunk);
  }
}

fetchAndLogStream();

流的异步迭代在 Node 和 Deno 中有效。所有现代浏览器都已发布 for-await-of 循环,但它们尚无法在流上使用。

获取可读流的其他一些方法

Fetch 将是获取流的最常见方法之一,但还有其他方法。BlobFile 都有一个 .stream() 方法,它返回一个可读流。以下代码在现代浏览器以及 Node 和 Deno 中都有效——尽管在 Node 中,您需要在使用它之前 import { Blob } from 'buffer';

const blobStream = new Blob(['Lorem ipsum'], { type: 'text/plain' }).stream();

这是一个基于浏览器的前端示例:如果您的标记中包含 <input type="file">,则很容易将用户选择的文件作为流获取。

const fileStream = document.querySelector('input').files[0].stream();

在 Node 17 中发布,由 fs/promises open() 函数返回的 FileHandle 对象具有一个 .readableWebStream() 方法。

import {
  open,
} from 'node:fs/promises';

const file = await open('./some/file/to/read');

for await (const chunk of file.readableWebStream())
  console.log(chunk);

await file.close();

流与 Promise 配合使用效果很好

如果您需要在流完成之后执行某些操作,可以使用 Promise。

someReadableStream
.pipeTo(someWritableStream)
.then(() => console.log("all data successfully written"))
.catch(error => console.error("something went wrong", error))

或者,您可以选择等待结果

await someReadableStream.pipeTo(someWritableStream)

创建自己的转换流

我们已经看到了 TextDecoderStream(还有一个 TextEncoderStream)。您还可以从头开始创建自己的转换流。TransformStream 构造函数可以接受一个对象。您可以在对象中指定三个方法:starttransformflush。它们都是可选的,但 transform 才是真正执行转换的部分。

例如,假设 TextDecoderStream() 不存在,并实现相同的功能(但在生产环境中一定要使用 TextDecoderStream,因为以下是过于简化的示例)

const decoder = new TextDecoder();
const decodeStream = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(decoder.decode(chunk, {stream: true}));
  }
});

每个接收到的块都会被修改,然后由控制器转发。在上面的示例中,每个块都是一些编码的文本,它会被解码,然后转发。让我们快速了解一下其他两种方法

const transformStream = new TransformStream({
  start(controller) {
    // Called immediately when the TransformStream is created
  },

  flush(controller) {
    // Called when chunks are no longer being forwarded to the transformer
  }
});

转换流是可读流和可写流协同工作,通常用于转换某些数据。每个使用 new TransformStream() 创建的对象都具有一个名为 readable 的属性,它是一个 ReadableStream,以及一个名为 writable 的属性,它是一个可写流。调用 someReadableStream.pipeThrough() 将数据从 someReadableStream 写入 transformStream.writable,可能转换数据,然后将数据推送到 transformStream.readable

有些人发现创建实际上不转换数据的转换流很有帮助。这被称为“身份转换流”——通过调用new TransformStream()且不传入任何对象参数,或省略转换方法来创建。它将写入其可写端的所有块转发到其可读端,而无需任何更改。作为该概念的一个简单示例,以下代码将记录“hello”

const {readable, writable} = new TransformStream();
writable.getWriter().write('hello');
readable.getReader().read().then(({value, done}) => console.log(value))

创建自己的可读流

可以创建一个自定义流并使用自己的块填充它。new ReadableStream()构造函数接受一个对象,该对象可以包含一个start函数、一个pull函数和一个cancel函数。此函数在创建ReadableStream时立即被调用。在start函数内部,使用controller.enqueue将块添加到流中。

这是一个基本的“hello world”示例

import { ReadableStream } from "node:stream/web";
const readable = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});

const allChunks = [];
for await (const chunk of readable) {
  allChunks.push(chunk);
}
console.log(allChunks.join(" "));

这是一个来自流规范的更真实的示例,它将WebSocket转换为可读流

function makeReadableWebSocketStream(url, protocols) {
  let websocket = new WebSocket(url, protocols);
  websocket.binaryType = "arraybuffer";

  return new ReadableStream({
    start(controller) {
      websocket.onmessage = event => controller.enqueue(event.data);
      websocket.onclose = () => controller.close();
      websocket.onerror = () => controller.error(new Error("The WebSocket errored"));
    }
  });
}

Node流互操作性

在Node中,旧的特定于Node的流工作方式不会被移除。旧的Node流API和Web流API将共存。因此,可能有时需要使用.fromWeb().toWeb()方法将Node流转换为Web流,反之亦然,这些方法将在Node 17中添加。

import {Readable} from 'node:stream';
import {fetch} from 'undici';

const response = await fetch(url);
const readableNodeStream = Readable.fromWeb(response.body);

结论

ES模块、EventTargetAbortControllerURL解析器Web CryptoBlobTextEncoder/Decoder:越来越多的浏览器API最终出现在Node.js中。知识和技能是可以转移的。Fetch和流是这种融合的重要组成部分。

Domenic Denicola,流规范的合著者之一,曾写道,流API的目标是为I/O提供一个高效的抽象和统一的原语,就像Promise已成为异步操作一样。为了在前端真正变得有用,更多API需要实际支持流。目前,MediaStream虽然名称如此,但并不是一个可读流。如果您正在处理视频或音频(至少目前是这样),则无法将可读流分配给srcObject。或者假设您想获取图像并将其通过转换流传递,然后将其插入页面。在撰写本文时,将流用作图像元素src的代码有些冗长

const response = await fetch('cute-cat.png');
const bodyStream = response.body;
const newResponse = new Response(bodyStream);
const blob = await newResponse.blob();
const url = URL.createObjectURL(blob);
document.querySelector('img').src = url;    

但是,随着时间的推移,浏览器和Node(以及Deno)中的更多API将使用流,因此值得学习。例如,Deno和Chrome中已经有用于处理WebSocket的流API。Chrome已实现Fetch请求流。Node和Chrome已实现可传输流,以将数据传输到工作线程并从中传输数据,以便在单独的线程中处理块。人们已经在使用流为现实世界中的产品做一些有趣的事情:例如,文件共享Web应用程序Wormhole的创建者已开源加密流的代码。

也许2022年将是Web流之年……