Nodejs中包括4种类型的流:Readable、Writable、Duplex和Transform.
Readable Stream
自定义Readable
自定义 Readable 流必须调用 new stream.Readable([options]) 构造函数并实现 readable._read() 方法。- import { Readable } from "node:stream";
- const readable = new Readable();
- readable.on("data", (chunk) => {
- console.log(chunk.toString());
- });
- readable.on('end', () => {
- console.log('end');
- })
- readable.on('error', (err) => {
- console.log('error-> ', err);
- })
复制代码 此时会触发error事件
error-> Error [ERR_METHOD_NOT_IMPLEMENTED]: The _read() method is not implemented
因此要创建一个正常工作的Readable,需要实现_read方法,有三种方式实现自定义Readable流(Node的4种流都可以通过下面三种形式实现)。
方式一、在Readable实例上挂载_read方法- const readable = new Readable();
- readable._read = function(){
- this.push("hello world"); //写入readable的缓冲区
- this.push(null)
- }
复制代码 方式二、Readable初始化给options参数传递read(这个相当于_read方法)- const readable = new Readable({
- read(){
- this.push("hello world");
- this.push(null)
- }
- });
复制代码 方式三、继承时实现_read- class MyReadable extends Readable {
- _read(){
- this.push("hello world");
- this.push(null)
- }
- }
- const readable = new MyReadable();
复制代码 解释 _read 被调用的时机
在 Node.js 的流(Stream)API 中,_read 方法是 Readable 流的核心内部方法,它的调用时机主要有以下几点:
- 当消费者调用 stream.read() 方法时:当外部代码通过 read() 方法请求数据时,如果内部缓冲区没有足够的数据,Node.js 会调用 _read 方法来获取更多数据。
- 当消费者添加 'data' 事件监听器时:当你为 Readable 流添加 'data' 事件监听器时,流会自动切换到流动模式(flowing mode),此时会自动调用 _read 方法开始获取数据。
- 当流从暂停模式切换到流动模式时:例如通过调用 resume() 方法时,会触发 _read 的调用。
- 初始化流时:在某些情况下,当流被创建并进入流动模式时,_read 方法会被自动调用一次来填充初始数据。
_read 方法的工作原理是:
- 它负责从底层资源(如文件、网络等)获取数据
- 通过调用 this.push(chunk) 将数据放入流的内部缓冲区
- 当没有更多数据时,调用 this.push(null) 表示流结束
Readable两种模式和三种状态
两种模式
- 流动模式(flowing mode)。流会自动从内部缓冲区中读取并触发 'data' 事件,当缓存中没有数据时则调用_read把数据放入缓冲区。
- 暂停模式(paused mode)。流不会自动触发 'data' 事件,数据会留在内部缓冲区,通过显示readable.read()获取数据。
三种状态
具体来说,在任何给定的时间点,每个 Readable 都处于三种可能的状态之一:
- readable.readableFlowing === null
- readable.readableFlowing === false
- readable.readableFlowing === true
当 readable.readableFlowing 为 null 时,则不提供消费流数据的机制。因此,流不会生成数据。在此状态下,为 'data' 事件绑定监听器、调用 readable.pipe() 方法、或调用 readable.resume() 方法会将 readable.readableFlowing 切换到 true,从而使 Readable 在生成数据时开始主动触发事件。
调用 readable.pause()、readable.unpipe() 或接收背压将导致 readable.readableFlowing 设置为 false,暂时停止事件的流动但不会停止数据的生成。在此状态下,为 'data' 事件绑定监听器不会将 readable.readableFlowing 切换到 true。
总结:
值状态描述null暂停模式(默认),流既没有开始自动流动数据,也没有明确被暂停或恢复。数据会被缓存在内部缓冲区中,直到你明确开始消费。true流动模式(flowing mode) 自动消费false暂停模式(paused mode) 不会自动消费,需要显式调用read()消费流动模式示例:- const readable = Readable.from(['A', 'B', 'C']);
- // 监听了'data'事件,此时readableFlowing === true
- readable.on('data', (chunk) => {
- console.log('Got chunk:', chunk);
- /*
- Got chunk: A
- Got chunk: B
- Got chunk: C
- */
- });
- readable.on('end', () => {
- console.log('end'); //end (流结束,不会再有新数据)
- })
复制代码 暂停模式示例:- const readable = Readable.from(['A', 'B', 'C']);
- // ⚠️ 此时 readableFlowing === null
- readable.on('readable', () => {
- let chunk;
- while (null !== (chunk = readable.read())) {
- console.log('Got chunk:', chunk);
- }
- /*
- Got chunk: A
- Got chunk: B
- Got chunk: C
- */
- });
- readable.on('end', () => {
- console.log('end'); //end (流结束,不会再有新数据)
- })
复制代码 P.S. readable.read()需要在'readable'事件中读取数据,因为在外面调用可能返回 null :如果在 'readable' 事件触发之前或者当内部缓冲区为空时调用 read() ,它会返回 null ,表示当前没有数据可读。
Readable的事件和方法
事件
- 'data': 接受数据chunk(非对象模式下是Buffer或String), 数据在可用时会立即触发该事件。
- 'end': 当流中没有更多数据了(比如this.push(null)),可由readable.end()触发。
- close: 当流及其任何底层资源(例如文件描述符)已关闭时,则会触发 'close' 事件。该事件表明将不再触发更多事件,并且不会发生进一步的计算。默认情况下readable.destroy()会触发close事件。
- 'error': 低层流由于低层内部故障导致无法生成或者推送无效数据时,触发。
- 'readable': 当有数据可从流中读取时,将触发 'readable' 事件。
- 'pause': 当调用 readable.pause() 并且 readableFlowing 不是 false 时,则会触发 'pause' 事件。
方法
方法说明read([size])从可读缓冲区中取出数据pipe(dest) / unpipe()管道传输pause() / resume()控制流动模式unshift(chunk)将数据重新放回可读缓冲push(chunk)向可读端推送数据(用于自定义实现)from(iterable[, options])从迭代对象中创建流下面着重介绍下pipe和from
1.Readable.pipe(destination[, options]) 第一个参数destination是一个写入流(或者Duplex/Transform,对应到其写入流部分),这个方法将使Readable自动切换到流动模式并将其所有数据推送到绑定的 Writable`。
示例:- const fs = require('node:fs');
- const readable = getReadableStreamSomehow();
- const writable = fs.createWriteStream('file.txt');
- // readable的数据自动写入writable
- readable.pipe(writable);
复制代码 2.Readable.from(iterable[, options])
第一个参数是实现 Symbol.asyncIterator 或 Symbol.iterator 可迭代协议的对象。如果传递空值,则触发 'error' 事件。默认情况下,Readable.from() 会将 options.objectMode 设置为 true,这意味着每次读取数据都是一个Javascript值。- import { Readable } from 'node:stream';
- async function * generate() {
- yield 'hello';
- yield {name: 'streams'};
- }
- const readable = Readable.from(generate());
- readable.on('data', (chunk) => {
- console.log(chunk);
- /*
- hello
- { name: 'streams' }
- */
- });
复制代码 继承了Readable的Node API
Readable 流的示例包括:
- 客户端上的 HTTP 响应
- 服务器上的 HTTP 请求
- 文件系统读取流
- TCP 套接字
- 子进程标准输出和标准错误
- process.stdin
文件系统读取流示例:
首先使用readFile一次性读取数据,这个时候如果是大文件,那么会占用非常大的内存。- // 方式一
- fs.readFile(path.resolve(__dirname, './bigdata.txt'), 'utf8', (err, data) => {
- if (err) {
- console.error('读取文件时出错:', err);
- return;
- }
- console.log('文件内容:');
- console.log(data)
- });
复制代码 接下来我们创建一个流来读数据,分批次读取数据。- const readStream = fs.createReadStream(path.resolve(__dirname, './bigdata.txt'), {
- encoding: 'utf8',
- highWaterMark: 4, // 每次读取4个字节 (故意设置很小,方便观察)
- });
- readStream.on('data', (chunk) => {
- console.log('读取到的数据:', chunk);
- });
- readStream.on('end', () => {
- console.log('文件读取完成');
- });
- readStream.on('error', (err) => {
- console.error('读取文件时出错:', err);
- });
复制代码 如下图所示,流的方式读取数据是分批次的。
但上述做法并不能真正解决大文件占用大内存,因为面临流的背压问题(大意就是读的快,写的慢导致读入的数据积压在输入缓冲区,后面「缓冲区、高压线和背压问题」一小节会探究这个问题)。
可以用pipe来处理,代码如下:- const readStream = fs.createReadStream(path.resolve(__dirname, './bigdata.txt'), {
- encoding: 'utf8',
- highWaterMark: 4, // 每次读取4个字节
- });
- // 使用pipe连接可读流和可写流
- readStream.pipe(process.stdout);
复制代码 依旧是每次读4个字节写入可写流,但pipe会自动处理背压问题。
Writable Stream
自定义Writable
和实现Readable类似,自定义实现Writable,需要实现_write方法。- import {Writable} from 'node:stream';
- class MyWritable extends Writable {
- _write(chunk:any, encoding:BufferEncoding, next:()=>void) {
- console.log('Got chunk:', chunk.toString());
- setTimeout(()=>{
- next();
- }, 1000)
- }
- }
- const writable = new MyWritable();
- writable.write('hello writable'); //写入writable的缓冲区
复制代码 解释 _write 被调用的时机
Writable通过调用内部方法_write 实际处理写入数据。它接受三个参数:
- chunk 是 any,encoding的编码模式决定了chunk具体是什么(Buffer还是字符串等)
- encoding 是 BufferEncoding类型,包括"ascii" | "utf8" | "utf-8" | "utf16le" | "utf-16le" | "ucs2" | "ucs-2" | "base64" | "base64url" | "latin1" | "binary" | "hex"。还有可能是'buffer'(下面会介绍到)。
- next 函数是一个回调函数,它在 _write 方法中扮演着非常重要的角色
- 信号作用 :调用 next() 表示当前数据块已经处理完成,流可以继续处理下一个数据块
- 流控制 :如果不调用 next() ,流会认为数据还在处理中,不会继续处理缓冲区中的其他数据
- 错误处理 :如果处理过程中出现错误,可以调用 next(error) 来通知流发生了错误
writable.write(data) 仅是将数据写入内部缓冲区(此时不一定调用_write方法),当数据从内部缓冲区被消费时才会调用_write方法。
writable.write可以快速写入多个,但是当_write需要next被调用后才能处理缓冲区的下一个数据,所以有部分是会存入内部缓冲区中,只有当上一个数据处理完成才会对下一个数据调用_write方法。
区分:
- write是将数据写入内部缓冲区。
- _write是将数据从内部缓冲区写入目的地(比如磁盘、网络等)。
关于write API和编码问题
write方法的其中的一种重载形式:writable.write(data, encoding,callback),在默认情况下encoding参数是不会起作用的。- class MyWritable extends Writable {
- _write(chunk:any, encoding:string, next:()=>void) {
- console.log('encoding:', encoding); // encoding: buffer
- console.log('Got chunk:', chunk); // Got chunk: <Buffer 68 65 6c 6c 6f 20 77 72 69 74 61 62 6c 65>
- setTimeout(()=>{
- next();
- }, 1000)
- }
- }
- const writable = new MyWritable({
- decodeStrings: true, //默认,这个参数会被设置为true
- });
- writable.write('hello writable', 'utf-8'); //此时encoding参数不生效, data还是是转换成Buffer处理的(默认)。
复制代码- class MyWritable extends Writable {
- _write(chunk:any, encoding:string, next:()=>void) {
- console.log('encoding:', encoding); // encoding: utf-8
- console.log('Got chunk:', chunk); // Got chunk: hello writable
- setTimeout(()=>{
- next();
- }, 1000)
- }
- }
- const writable = new MyWritable({
- decodeStrings: false, //设为false
- });
- writable.write('hello writable', 'utf-8');
- //decodeStrings: false时,data才是按encoding='utf-8'处理。此时在内部_write可以发现第二参数encoding会是'utf-8', 第一个参数chunk则是一个字符串。
复制代码 Writable的事件和方法
事件
- 'close': 当流及其任何底层资源(例如文件描述符)已关闭时,触发。
- 'error': 如果在写入或管道数据时发生错误,触发。
- 'drain': 当写入流内部的写入缓冲区被清空(目的地已接收这部分数据,缓冲区长度降为0),典型地,这发生在之前调用writable.write()返回了false(表示缓冲达到或超过highWaterMark)之后,一旦缓冲被完全“排空”,就会发出'drain',表示可以安全继续写入。
- // 一次性批量写入大量数据,大小达到highWaterMark,令write方法返回false
- function writeOneMillionTimes(writer, data, encoding, callback) {
- let i = 1000000;
- write();
- function write() {
- let ok = true;
- do {
- i--;
- if (i === 0) {
- // Last time!
- writer.write(data, encoding, callback);
- } else {
- // 当缓冲区满了,ok=false
- ok = writer.write(data, encoding);
- }
- } while (i > 0 && ok);
- if (i > 0) {
- // 当drain了(即缓冲区被清空了),可以继续写入
- writer.once('drain', write);
- }
- }
- }
复制代码
- 'finish': 在调用 stream.end() 方法之后,并且所有数据都已刷新到底层系统,触发。
- const writer = getWritableStreamSomehow();
- for (let i = 0; i < 100; i++) {
- writer.write(`hello, #${i}!\n`);
- }
- writer.on('finish', () => {
- console.log('All writes are now complete.');
- });
- writer.end('This is the end\n');
复制代码
- 'pipe': Readable Stream上调用readable.pipe(writable)将数据传输到Writable Stream上时,触发。
- const writer = getWritableStreamSomehow();
- const reader = getReadableStreamSomehow();
- writer.on('pipe', (src) => {
- console.log('Something is piping into the writer.');
- assert.equal(src, reader);
- });
- reader.pipe(writer); //当流开始传输时触发writer的'pipe'事件
复制代码 方法
方法说明write(chunk[, encoding][, callback])写入数据end([chunk][, encoding][, callback])结束写入cork() / uncork()批量写入优化setDefaultEncoding(encoding)设置默认编码destroy([error])销毁流继承了Writable的Node API
- 客户端的HTTP请求
- 服务端的HTTP响应
- 文件系统的写入流
- 子进程标准输入
- process.stdout
服务端HTTP响应示例- import http from 'node:http';
- import { Readable } from 'node:stream';
- // 创建一个自定义的可读流,用于分批生成数据
- class BatchDataStream extends Readable {
- constructor(options = {}) {
- super(options);
- this.dataSize = options.dataSize || 5; // 数据批次数量
- this.currentBatch = 0;
- this.interval = options.interval || 1000; // 每批数据的间隔时间(毫秒)
- }
- _read() {
- // 如果已经发送完所有批次,结束流
- if (this.currentBatch >= this.dataSize) {
- this.push(null); // 表示流结束
- return;
- }
- // 使用setTimeout模拟异步数据生成
- setTimeout(() => {
- const batchNumber = this.currentBatch + 1;
- const data = `这是第 ${batchNumber} 批数据,时间戳: ${new Date().toISOString()}\n`;
-
- console.log(`正在发送第 ${batchNumber} 批数据`);
-
- // 将数据推送到流中
- this.push(data);
-
- this.currentBatch++;
- }, this.interval);
- }
- }
- // 创建HTTP服务器
- const server = http.createServer((req, res) => {
- // 设置响应头
- res.setHeader('Content-Type', 'text/plain; charset=utf-8');
- // res.setHeader('Transfer-Encoding', 'chunked'); //但使用pipe传输数据,会自动设置Transfer-Encoding为chunked,所以这里不需要设置
-
- console.log('收到新的请求,开始流式传输数据...');
-
- // 创建数据流实例
- const dataStream = new BatchDataStream({
- dataSize: 10, // 总共发送10批数据
- interval: 1000 // 每批数据间隔1秒
- });
-
- // 使用pipe将数据流直接连接到响应对象
- dataStream.pipe(res);
-
- // 当流结束时记录日志
- dataStream.on('end', () => {
- console.log('数据传输完成');
- });
- });
- // 启动服务器
- const PORT = 3000;
- server.listen(PORT, () => {
- console.log(`服务器运行在 http://localhost:${PORT}`);
- console.log('请在浏览器中访问此地址,或使用 curl http://localhost:3000 查看流式数据传输');
- });
复制代码 运行curl http://localhost:3000可以看到每个1s钟接受一批数据。如下图:
在网页上也可以查看这个请求对应的响应头的Transfer-Encoding被设置为chunked.(使用pipe会自动设置chunked)
拓展知识: Transfer-Encoding
Chunked传输编码是HTTP中的一种传输编码方式,它允许服务器将响应数据分成一系列小块(chunks)来传输。每个chunk都有一个头部,用于指示其大小,然后是一个回车换行(CRLF)分隔符,接着是chunk的实际数据,最后再加上一个CRLF分隔符。这个过程一直持续到最后一个chunk,它的大小为0,表示响应数据的结束。
以下是一个示例HTTP响应使用chunked传输编码的样本:- HTTP/1.1 200 OK
- Content-Type: text/plain
- Transfer-Encoding: chunked
- 4\r\n
- This\r\n
- 7\r\n
- is a \r\n
- 9\r\n
- chunked \r\n
- 6\r\n
- message\r\n
- 0\r\n
- \r\n
复制代码 大多数情况下,响应头会带上Content-Length字段(表示响应正文的长度),头Transfer-Encoding: chunked和Content-Length是互斥的,不会同时出现在响应头(如果同时出现Transfer-Encoding优先级是大于Content-Length的)
Chunked传输的使用场景:大文件下载、API响应流(逐渐加载数据)、AI生成内容(文本图像)
Duplex 双工流
自定义Duplex双工流
自定义Duplex需要同时实现_read和_write方法。因为 Duplex 流包含了 Readable 和 Writable两个流,所以要维护两个独立的内部缓冲区,用于读取和写入,允许每一方独立于另一方操作,同时保持适当和高效的数据流。
自定义一个XxxDuplex,可以互相写入数据。- import { Duplex } from 'node:stream';
- class XxxDuplex extends Duplex {
- constructor(peer = null, options = {}) {
- super(options);
- this.peer = peer; // 另一端的 Duplex
- }
- // 当可写端接收到数据时
- _write(chunk, encoding, callback) {
- const data = chunk.toString();
- console.log(`[${this.label}] 写入数据:`, data);
- // 把数据发给对端
- if (this.peer) {
- this.peer.push(data);
- }
- callback(); // 通知写操作完成
- }
- // 当可读端被调用时(通常由 .read() 或流消费触发)
- _read(size) {
- // 不做额外操作,等待对端 push()
- }
- // 结束时
- _final(callback) {
- if (this.peer) this.peer.push(null); // 通知对端结束
- callback();
- }
- }
- // 创建两个互为对端的 Duplex 流
- const duplexA = new XxxDuplex();
- const duplexB = new XxxDuplex(duplexA);
- duplexA.peer = duplexB;
- // 加上标识
- duplexA.label = 'A';
- duplexB.label = 'B';
- // 监听 B 的读取
- duplexB.on('data', chunk => {
- console.log(`[${duplexB.label}] 收到数据:`, chunk.toString());
- });
- duplexB.on('end', () => {
- console.log(`[${duplexB.label}] 流结束`);
- });
- // A 向 B 发送数据
- duplexA.write('你好,B!');
- duplexA.write('这是一条测试消息');
- duplexA.end();
- /*
- [A] 写入数据: 你好,B!
- [A] 写入数据: 这是一条测试消息
- [B] 收到数据: 你好,B!
- [B] 收到数据: 这是一条测试消息
- [B] 流结束
- */
复制代码 Duplex和readable&writable相互转换
Duplex和readable&writable互相转换
使用stream.Duplex.fromWeb(pair[, options])将readable和writable转为duplex。- import { Duplex } from 'node:stream';
- import {
- ReadableStream,
- WritableStream,
- } from 'node:stream/web';
- const readable = new ReadableStream({
- start(controller) {
- controller.enqueue('world'); //设置readable buffer的初始数据
- },
- });
- const writable = new WritableStream({
- write(chunk) {
- console.log('writable', chunk); //writable hello
- },
- });
- const pair = {
- readable,
- writable,
- };
- //encoding: 'utf8'表示以utf8编码工作,objectMode:true表示以对象模式工作
- const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
- duplex.write('hello');
- duplex.on('data', (chunk) => {
- console.log('readable', chunk); //readable world
- });
复制代码 使用stream.Duplex.toWeb(streamDuplex)将duplex拆分成两个流- import { Duplex } from 'node:stream';
- const duplex = Duplex({
- objectMode: true,
- read() {
- this.push('world');
- this.push(null);
- },
- write(chunk, encoding, callback) {
- console.log('writable', chunk);
- callback();
- },
- });
- const { readable, writable } = Duplex.toWeb(duplex);
- writable.getWriter().write('hello');
- const { value } = await readable.getReader().read();
- console.log('readable', value);
复制代码 属于Duplex流的Node API
TCP套接字示例:- import net from 'node:net';
- /** 服务端 */
- const server = net.createServer(function(clientSocket){
- // clientSocket 就是一个 duplex 流
- console.log('新的客户端 socket 连接');
- clientSocket.on('data', function(data){
- console.log(`服务端收到数据: ${data.toString()}`);
- clientSocket.write('world!');
- });
- clientSocket.on('end', function(){
- console.log('连接中断');
- });
- });
- server.listen(6666, 'localhost', function(){
- const address = server.address();
- console.log('服务端启动,地址为:%j', address);
- });
- /** 客户端 */
- // socket 就是一个 duplex 流
- const socket = net.createConnection({
- host: 'localhost',
- port: 6666
- }, () => {
- console.log('连接到了服务端!');
- socket.write('hello');
- setTimeout(()=> {
- socket.end();
- }, 2000);
- });
- socket.on('data', (data) => {
- console.log(`客户端收到数据: ${data.toString()}`);
- });
- socket.on('end', () => {
- console.log('断开连接');
- });
复制代码 Transform 转换流
自定义Transform流
Transform 流是一种双工流的特殊子类(和Duplex 双工流一样同时实现 Readable 和 Writable 接口)。那么Transform流和Duplex流的关联和区别?
关联:stream.Transform继承了stream.Duplex,并实现了自己的_read和_write方法。
区别:
类型特点(区别)用途关键方法Duplex 流读写互相独立,输入和输出没有直接关系双向通信数据处理Transform 流输入和输出相关:写入的数据经过处理后再输出read() / write()transform(chunk, encoding, callback)也就是说,Duplex是输入输出流两部分独立(不干扰,同时进行);而Transform同样有输入和输出流两部分,但是Node会自动将输出流缓冲区的内容写入输入流缓冲区。- Writable Buffer
- ↓ (消费)
- transform(chunk)
- ↓ (push)
- Readable Buffer
复制代码 实现自定义的Transfrom流则需要实现_transfrom方法,举个例子:- import {Transform} from 'node:stream'
- class UpTransform extends Transform {
- constructor() {
- super();
- }
- _transform(chunk, enc, next) {
- console.log('enc', enc); // enc buffer
- this.push(chunk.toString().toUpperCase());
- next();
- }
- }
- const t = new UpTransform();
- t.write('abc'); // 写入 writable buffer
- t.end();
- // 从 readable buffer 读取数据
- t.on('data', (chunk) => console.log('Read:', chunk.toString()));
复制代码 也用Transform初始化传参的方式创建一个自定的Transfrom实例:- const t = new Transform({
- transform(chunk, enc, next) {
- console.log('enc', enc); // enc buffer
- this.push(chunk.toString().toUpperCase());
- next();
- }
- });
复制代码 _transform调用的时机
当使用Transform流往输入缓存区写入数据时,会调用_transform方法进行转换。
比如上面UpTransform那个例子,当t.write('abc');时就会触发_transform。
在pipe方法中使用Transform流,会调用_transform方法进行转换。
属于Transfrom流的Node API
zlib流示例:- const fs = require('node:fs');
- const zlib = require('node:zlib');
- const r = fs.createReadStream('file.txt');
- const z = zlib.createGzip(); //z是一个Transform流
- const w = fs.createWriteStream('file.txt.gz');
- r.pipe(z).pipe(w);
复制代码 缓冲区、高压线和背压问题
缓冲区、高压线
首先介绍下缓冲区,Readable/Writable内部维护了一个队列数据叫缓冲区。
高压线(highWaterMark)是Readable/Writable内部的一个阈值(可在初始化时修改)。用来告诉流缓冲区的数据大小不应该超过这个值。
背压问题解释
背压问题:当内部缓冲区的大小超过highWaterMark阈值,然后持续扩大,占用原来越多内存,甚至最后出现内存溢出。大白话来说就是缓存积压问题。
一旦内部读取缓冲区的总大小达到 highWaterMark 指定的阈值,则流将暂时停止从底层资源读取数据,直到可以消费当前缓冲的数据(也就是,流将停止调用内部的用于填充读取缓冲区 readable._read() 方法)。
这里我们会有一个疑问就是:readable流不是会停止读数据到缓冲区吗,怎么还有背压问题?
以下是GPT的解释,我消化总结下:
- Readable 流有两种操作模式:flowing 和 paused,flowing模式下无法有效处理背压问题,因为不能暂停流的读(不能停止调用_read)。所以on('data')这种方式是无法处理背压问题的,它会持续不断的把数据积压到缓冲区。
- 在paused模式下,可以调用pause()方法暂停流的读(pipe就是这个原理),从而可以做到处理背压问题,但是也只能是缓解。像文件流这种是可控制的,能立即停止从文件读取数据;但像Socket流,则不能立即停止数据的接受,但会:暂停从内核socket缓冲区中读取 & 在TCP层通过窗口机制通知发送端"别发那么快"。
pipe() 如何处理 Readable 流的背压?
readable.pipe(writable):
- 如果 Writable 流的缓冲区满了(返回 false),pipe() 会自动调用 Readable.pause()
- 当 Writable 流排空缓冲区并发出 'drain' 事件时,pipe() 会调用 Readable.resume()
- 这样就在两个流之间建立了一个自动的背压处理机制
总结来说,虽然 Readable 流确实会在缓冲区达到 highWaterMark 时尝试暂停底层读取,但这只是背压处理的一部分。完整的背压处理需要整个流管道中的所有组件协同工作,而 pipe() 方法正是为了简化这种协同而设计的。
当重复调用 writable.write(chunk) 方法时,数据会缓存在 Writable 流中。虽然内部的写入缓冲区的总大小低于 highWaterMark 设置的阈值,但对 writable.write() 的调用将返回 true。一旦内部缓冲区的大小达到或超过 highWaterMark,则将返回 false。
下面这个例子就是Writable的背压问题解决(「Writable Stream」这节出现过的例子,来自官方文档)- // 一次性批量写入大量数据,大小达到highWaterMark,令write方法返回false
- function writeOneMillionTimes(writer, data, encoding, callback) {
- let i = 1000000;
- write();
- function write() {
- let ok = true;
- do {
- i--;
- if (i === 0) {
- // Last time!
- writer.write(data, encoding, callback);
- } else {
- // 当缓冲区满了,ok=false
- ok = writer.write(data, encoding);
- }
- } while (i > 0 && ok);
- if (i > 0) {
- // 当drain了(即缓冲区被清空了),可以继续写入
- writer.once('drain', write);
- }
- }
- }
复制代码 如何解决背压问题
方式一:手动拉数据来控制- readable.on('readable', () => {
- let chunk;
- while (null !== (chunk = readable.read())) {
- processChunk(chunk);
- }
- });
复制代码 方式二:使用 pipe()(自动处理)方式三:使用 await 迭代(自动处理)- for await (const chunk of readable) {
- await processChunk(chunk); // 每次 await 都自然暂停上游读取
- }
复制代码 来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |