找回密码
 立即注册
首页 资源区 代码 别再说我不懂Node"流"了

别再说我不懂Node"流"了

沃盼盼 前天 20:10
Nodejs中包括4种类型的流:Readable、Writable、Duplex和Transform.
Readable Stream

自定义Readable

自定义 Readable 流必须调用 new stream.Readable([options]) 构造函数并实现 readable._read() 方法。
  1. import { Readable } from "node:stream";
  2. const readable = new Readable();
  3. readable.on("data", (chunk) => {
  4.   console.log(chunk.toString());
  5. });
  6. readable.on('end', () => {
  7.   console.log('end');
  8. })
  9. readable.on('error', (err) => {
  10.   console.log('error-> ', err);
  11. })
复制代码
此时会触发error事件
error->  Error [ERR_METHOD_NOT_IMPLEMENTED]: The _read() method is not implemented
因此要创建一个正常工作的Readable,需要实现_read方法,有三种方式实现自定义Readable流(Node的4种流都可以通过下面三种形式实现)。
方式一、在Readable实例上挂载_read方法
  1. const readable = new Readable();
  2. readable._read = function(){
  3.   this.push("hello world"); //写入readable的缓冲区
  4.   this.push(null)
  5. }
复制代码
方式二、Readable初始化给options参数传递read(这个相当于_read方法)
  1. const readable = new Readable({
  2.   read(){
  3.     this.push("hello world");
  4.     this.push(null)
  5.   }
  6. });
复制代码
方式三、继承时实现_read
  1. class MyReadable extends Readable {
  2.   _read(){
  3.     this.push("hello world");
  4.     this.push(null)
  5.   }
  6. }
  7. const readable = new MyReadable();
复制代码
解释 _read 被调用的时机

在 Node.js 的流(Stream)API 中,_read 方法是 Readable 流的核心内部方法,它的调用时机主要有以下几点:

  • 当消费者调用 stream.read() 方法时:当外部代码通过 read() 方法请求数据时,如果内部缓冲区没有足够的数据,Node.js 会调用 _read 方法来获取更多数据。
  • 当消费者添加 'data' 事件监听器时:当你为 Readable 流添加 'data' 事件监听器时,流会自动切换到流动模式(flowing mode),此时会自动调用 _read 方法开始获取数据。
  • 当流从暂停模式切换到流动模式时:例如通过调用 resume() 方法时,会触发 _read 的调用。
  • 初始化流时:在某些情况下,当流被创建并进入流动模式时,_read 方法会被自动调用一次来填充初始数据。
_read 方法的工作原理是:

  • 它负责从底层资源(如文件、网络等)获取数据
  • 通过调用 this.push(chunk) 将数据放入流的内部缓冲区
  • 当没有更多数据时,调用 this.push(null) 表示流结束
Readable两种模式和三种状态

两种模式

  • 流动模式(flowing mode)。流会自动从内部缓冲区中读取并触发 'data' 事件,当缓存中没有数据时则调用_read把数据放入缓冲区。
  • 暂停模式(paused mode)。流不会自动触发 'data' 事件,数据会留在内部缓冲区,通过显示readable.read()获取数据。
三种状态
具体来说,在任何给定的时间点,每个 Readable 都处于三种可能的状态之一:


  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true
当 readable.readableFlowing 为 null 时,则不提供消费流数据的机制。因此,流不会生成数据。在此状态下,为 'data' 事件绑定监听器、调用 readable.pipe() 方法、或调用 readable.resume() 方法会将 readable.readableFlowing 切换到 true,从而使 Readable 在生成数据时开始主动触发事件。
调用 readable.pause()、readable.unpipe() 或接收背压将导致 readable.readableFlowing 设置为 false,暂时停止事件的流动但不会停止数据的生成。在此状态下,为 'data' 事件绑定监听器不会将 readable.readableFlowing 切换到 true。
总结:
值状态描述null暂停模式(默认),流既没有开始自动流动数据,也没有明确被暂停或恢复。数据会被缓存在内部缓冲区中,直到你明确开始消费。true流动模式(flowing mode) 自动消费false暂停模式(paused mode) 不会自动消费,需要显式调用read()消费流动模式示例:
  1. const readable = Readable.from(['A', 'B', 'C']);
  2. // 监听了'data'事件,此时readableFlowing === true
  3. readable.on('data', (chunk) => {
  4.     console.log('Got chunk:', chunk);
  5.     /*
  6.     Got chunk: A
  7.         Got chunk: B
  8.         Got chunk: C
  9.     */
  10. });
  11. readable.on('end', () => {
  12.     console.log('end'); //end  (流结束,不会再有新数据)
  13. })
复制代码
暂停模式示例:
  1. const readable = Readable.from(['A', 'B', 'C']);
  2. // ⚠️ 此时 readableFlowing === null
  3. readable.on('readable', () => {
  4.   let chunk;
  5.   while (null !== (chunk = readable.read())) {
  6.     console.log('Got chunk:', chunk);
  7.   }
  8.     /*
  9.     Got chunk: A
  10.         Got chunk: B
  11.         Got chunk: C
  12.     */
  13. });
  14. readable.on('end', () => {
  15.     console.log('end'); //end  (流结束,不会再有新数据)
  16. })
复制代码
P.S. readable.read()需要在'readable'事件中读取数据,因为在外面调用可能返回 null :如果在 'readable' 事件触发之前或者当内部缓冲区为空时调用 read() ,它会返回 null ,表示当前没有数据可读。
Readable的事件和方法

事件

  • 'data': 接受数据chunk(非对象模式下是Buffer或String), 数据在可用时会立即触发该事件。
  • 'end': 当流中没有更多数据了(比如this.push(null)),可由readable.end()触发。
  • close: 当流及其任何底层资源(例如文件描述符)已关闭时,则会触发 'close' 事件。该事件表明将不再触发更多事件,并且不会发生进一步的计算。默认情况下readable.destroy()会触发close事件。
  • 'error': 低层流由于低层内部故障导致无法生成或者推送无效数据时,触发。
  • 'readable': 当有数据可从流中读取时,将触发 'readable' 事件。
  • 'pause': 当调用 readable.pause() 并且 readableFlowing 不是 false 时,则会触发 'pause' 事件。
方法
方法说明read([size])从可读缓冲区中取出数据pipe(dest) / unpipe()管道传输pause() / resume()控制流动模式unshift(chunk)将数据重新放回可读缓冲push(chunk)向可读端推送数据(用于自定义实现)from(iterable[, options])从迭代对象中创建流下面着重介绍下pipe和from
1.Readable.pipe(destination[, options])    第一个参数destination是一个写入流(或者Duplex/Transform,对应到其写入流部分),这个方法将使Readable自动切换到流动模式并将其所有数据推送到绑定的 Writable`。
示例:
  1. const fs = require('node:fs');
  2. const readable = getReadableStreamSomehow();
  3. const writable = fs.createWriteStream('file.txt');
  4. // readable的数据自动写入writable
  5. readable.pipe(writable);
复制代码
2.Readable.from(iterable[, options])
第一个参数是实现 Symbol.asyncIterator 或 Symbol.iterator 可迭代协议的对象。如果传递空值,则触发 'error' 事件。默认情况下,Readable.from() 会将 options.objectMode 设置为 true,这意味着每次读取数据都是一个Javascript值。
  1. import { Readable } from 'node:stream';
  2. async function * generate() {
  3.   yield 'hello';
  4.   yield {name: 'streams'};
  5. }
  6. const readable = Readable.from(generate());
  7. readable.on('data', (chunk) => {
  8.   console.log(chunk);
  9.   /*
  10.   hello
  11.   { name: 'streams' }
  12.   */
  13. });
复制代码
继承了Readable的Node API

Readable 流的示例包括:

  • 客户端上的 HTTP 响应
  • 服务器上的 HTTP 请求
  • 文件系统读取流
  • TCP 套接字
  • 子进程标准输出和标准错误
  • process.stdin
文件系统读取流示例:
首先使用readFile一次性读取数据,这个时候如果是大文件,那么会占用非常大的内存。
  1. // 方式一
  2. fs.readFile(path.resolve(__dirname, './bigdata.txt'), 'utf8', (err, data) => {
  3.   if (err) {
  4.     console.error('读取文件时出错:', err);
  5.     return;
  6.   }
  7.   console.log('文件内容:');
  8.   console.log(data)
  9. });
复制代码
接下来我们创建一个流来读数据,分批次读取数据。
  1. const readStream = fs.createReadStream(path.resolve(__dirname, './bigdata.txt'), {
  2.   encoding: 'utf8',
  3.   highWaterMark: 4,   // 每次读取4个字节 (故意设置很小,方便观察)
  4. });
  5. readStream.on('data', (chunk) => {
  6.   console.log('读取到的数据:', chunk);
  7. });
  8. readStream.on('end', () => {
  9.   console.log('文件读取完成');
  10. });
  11. readStream.on('error', (err) => {
  12.   console.error('读取文件时出错:', err);
  13. });
复制代码
如下图所示,流的方式读取数据是分批次的。
1.png

但上述做法并不能真正解决大文件占用大内存,因为面临流的背压问题(大意就是读的快,写的慢导致读入的数据积压在输入缓冲区,后面「缓冲区、高压线和背压问题」一小节会探究这个问题)。
可以用pipe来处理,代码如下:
  1. const readStream = fs.createReadStream(path.resolve(__dirname, './bigdata.txt'), {
  2.   encoding: 'utf8',
  3.   highWaterMark: 4,   // 每次读取4个字节
  4. });
  5. // 使用pipe连接可读流和可写流
  6. readStream.pipe(process.stdout);
复制代码
依旧是每次读4个字节写入可写流,但pipe会自动处理背压问题。
Writable Stream

自定义Writable

和实现Readable类似,自定义实现Writable,需要实现_write方法。
  1. import {Writable} from 'node:stream';
  2. class MyWritable extends Writable {
  3.   _write(chunk:any, encoding:BufferEncoding, next:()=>void) {
  4.     console.log('Got chunk:', chunk.toString());
  5.     setTimeout(()=>{
  6.       next();
  7.     }, 1000)
  8.   }
  9. }
  10. const writable = new MyWritable();
  11. writable.write('hello writable'); //写入writable的缓冲区
复制代码
解释 _write 被调用的时机

Writable通过调用内部方法_write 实际处理写入数据。它接受三个参数:

  • chunk 是 any,encoding的编码模式决定了chunk具体是什么(Buffer还是字符串等)
  • encoding 是 BufferEncoding类型,包括"ascii" | "utf8" | "utf-8" | "utf16le" | "utf-16le" | "ucs2" | "ucs-2" | "base64" | "base64url" | "latin1" | "binary" | "hex"。还有可能是'buffer'(下面会介绍到)。
  • next 函数是一个回调函数,它在 _write 方法中扮演着非常重要的角色

    • 信号作用 :调用 next() 表示当前数据块已经处理完成,流可以继续处理下一个数据块
    • 流控制 :如果不调用 next() ,流会认为数据还在处理中,不会继续处理缓冲区中的其他数据
    • 错误处理 :如果处理过程中出现错误,可以调用 next(error) 来通知流发生了错误

writable.write(data) 仅是将数据写入内部缓冲区(此时不一定调用_write方法),当数据从内部缓冲区被消费时才会调用_write方法。
writable.write可以快速写入多个,但是当_write需要next被调用后才能处理缓冲区的下一个数据,所以有部分是会存入内部缓冲区中,只有当上一个数据处理完成才会对下一个数据调用_write方法。
区分:

  • write是将数据写入内部缓冲区。
  • _write是将数据从内部缓冲区写入目的地(比如磁盘、网络等)。
关于write API和编码问题
write方法的其中的一种重载形式:writable.write(data, encoding,callback),在默认情况下encoding参数是不会起作用的。
  1. class MyWritable extends Writable {
  2.   _write(chunk:any, encoding:string, next:()=>void) {
  3.     console.log('encoding:', encoding); // encoding: buffer
  4.     console.log('Got chunk:', chunk); // Got chunk: <Buffer 68 65 6c 6c 6f 20 77 72 69 74 61 62 6c 65>
  5.     setTimeout(()=>{
  6.       next();
  7.     }, 1000)
  8.   }
  9. }
  10. const writable = new MyWritable({
  11.   decodeStrings: true, //默认,这个参数会被设置为true
  12. });
  13. writable.write('hello writable', 'utf-8'); //此时encoding参数不生效, data还是是转换成Buffer处理的(默认)。
复制代码
  1. class MyWritable extends Writable {
  2.   _write(chunk:any, encoding:string, next:()=>void) {
  3.     console.log('encoding:', encoding); // encoding: utf-8
  4.     console.log('Got chunk:', chunk); // Got chunk: hello writable
  5.     setTimeout(()=>{
  6.       next();
  7.     }, 1000)
  8.   }
  9. }
  10. const writable = new MyWritable({
  11.   decodeStrings: false, //设为false
  12. });
  13. writable.write('hello writable', 'utf-8');
  14. //decodeStrings: false时,data才是按encoding='utf-8'处理。此时在内部_write可以发现第二参数encoding会是'utf-8', 第一个参数chunk则是一个字符串。
复制代码
Writable的事件和方法

事件

  • 'close': 当流及其任何底层资源(例如文件描述符)已关闭时,触发。
  • 'error': 如果在写入或管道数据时发生错误,触发。
  • 'drain': 当写入流内部的写入缓冲区被清空(目的地已接收这部分数据,缓冲区长度降为0),典型地,这发生在之前调用​writable.write()​​返回了​​false​​(表示缓冲达到或超过​​highWaterMark​​)之后,一旦缓冲被完全“排空”,就会发出​'drain'​,表示可以安全继续写入。
  1. // 一次性批量写入大量数据,大小达到highWaterMark,令write方法返回false
  2. function writeOneMillionTimes(writer, data, encoding, callback) {
  3.   let i = 1000000;
  4.   write();
  5.   function write() {
  6.     let ok = true;
  7.     do {
  8.       i--;
  9.       if (i === 0) {
  10.         // Last time!
  11.         writer.write(data, encoding, callback);
  12.       } else {
  13.         // 当缓冲区满了,ok=false
  14.         ok = writer.write(data, encoding);
  15.       }
  16.     } while (i > 0 && ok);
  17.     if (i > 0) {
  18.       // 当drain了(即缓冲区被清空了),可以继续写入
  19.       writer.once('drain', write);
  20.     }
  21.   }
  22. }
复制代码

  • 'finish': 在调用 stream.end() 方法之后,并且所有数据都已刷新到底层系统,触发。
  1. const writer = getWritableStreamSomehow();
  2. for (let i = 0; i < 100; i++) {
  3.   writer.write(`hello, #${i}!\n`);
  4. }
  5. writer.on('finish', () => {
  6.   console.log('All writes are now complete.');
  7. });
  8. writer.end('This is the end\n');
复制代码

  • 'pipe': Readable Stream上调用readable.pipe(writable)将数据传输到Writable Stream上时,触发。
  1. const writer = getWritableStreamSomehow();
  2. const reader = getReadableStreamSomehow();
  3. writer.on('pipe', (src) => {
  4.   console.log('Something is piping into the writer.');
  5.   assert.equal(src, reader);
  6. });
  7. reader.pipe(writer); //当流开始传输时触发writer的'pipe'事件
复制代码
方法
方法说明write(chunk[, encoding][, callback])写入数据end([chunk][, encoding][, callback])结束写入cork() / uncork()批量写入优化setDefaultEncoding(encoding)设置默认编码destroy([error])销毁流继承了Writable的Node API


  • 客户端的HTTP请求
  • 服务端的HTTP响应
  • 文件系统的写入流
  • 子进程标准输入
  • process.stdout
服务端HTTP响应示例
  1. import http from 'node:http';
  2. import { Readable } from 'node:stream';
  3. // 创建一个自定义的可读流,用于分批生成数据
  4. class BatchDataStream extends Readable {
  5.   constructor(options = {}) {
  6.     super(options);
  7.     this.dataSize = options.dataSize || 5; // 数据批次数量
  8.     this.currentBatch = 0;
  9.     this.interval = options.interval || 1000; // 每批数据的间隔时间(毫秒)
  10.   }
  11.   _read() {
  12.     // 如果已经发送完所有批次,结束流
  13.     if (this.currentBatch >= this.dataSize) {
  14.       this.push(null); // 表示流结束
  15.       return;
  16.     }
  17.     // 使用setTimeout模拟异步数据生成
  18.     setTimeout(() => {
  19.       const batchNumber = this.currentBatch + 1;
  20.       const data = `这是第 ${batchNumber} 批数据,时间戳: ${new Date().toISOString()}\n`;
  21.       
  22.       console.log(`正在发送第 ${batchNumber} 批数据`);
  23.       
  24.       // 将数据推送到流中
  25.       this.push(data);
  26.       
  27.       this.currentBatch++;
  28.     }, this.interval);
  29.   }
  30. }
  31. // 创建HTTP服务器
  32. const server = http.createServer((req, res) => {
  33.   // 设置响应头
  34.   res.setHeader('Content-Type', 'text/plain; charset=utf-8');
  35.   // res.setHeader('Transfer-Encoding', 'chunked');  //但使用pipe传输数据,会自动设置Transfer-Encoding为chunked,所以这里不需要设置
  36.   
  37.   console.log('收到新的请求,开始流式传输数据...');
  38.   
  39.   // 创建数据流实例
  40.   const dataStream = new BatchDataStream({
  41.     dataSize: 10,    // 总共发送10批数据
  42.     interval: 1000   // 每批数据间隔1秒
  43.   });
  44.   
  45.   // 使用pipe将数据流直接连接到响应对象
  46.   dataStream.pipe(res);
  47.   
  48.   // 当流结束时记录日志
  49.   dataStream.on('end', () => {
  50.     console.log('数据传输完成');
  51.   });
  52. });
  53. // 启动服务器
  54. const PORT = 3000;
  55. server.listen(PORT, () => {
  56.   console.log(`服务器运行在 http://localhost:${PORT}`);
  57.   console.log('请在浏览器中访问此地址,或使用 curl http://localhost:3000 查看流式数据传输');
  58. });
复制代码
运行curl http://localhost:3000可以看到每个1s钟接受一批数据。如下图:
2.png

在网页上也可以查看这个请求对应的响应头的Transfer-Encoding被设置为chunked.(使用pipe会自动设置chunked)
3.png

拓展知识: Transfer-Encoding
Chunked传输编码是HTTP中的一种传输编码方式,它允许服务器将响应数据分成一系列小块(chunks)来传输。每个chunk都有一个头部,用于指示其大小,然后是一个回车换行(CRLF)分隔符,接着是chunk的实际数据,最后再加上一个CRLF分隔符。这个过程一直持续到最后一个chunk,它的大小为0,表示响应数据的结束。
以下是一个示例HTTP响应使用chunked传输编码的样本:
  1. HTTP/1.1 200 OK
  2. Content-Type: text/plain
  3. Transfer-Encoding: chunked
  4. 4\r\n
  5. This\r\n
  6. 7\r\n
  7. is a \r\n
  8. 9\r\n
  9. chunked \r\n
  10. 6\r\n
  11. message\r\n
  12. 0\r\n
  13. \r\n
复制代码
大多数情况下,响应头会带上Content-Length字段(表示响应正文的长度),头Transfer-Encoding: chunked和Content-Length是互斥的,不会同时出现在响应头(如果同时出现Transfer-Encoding优先级是大于Content-Length的)
Chunked传输的使用场景:大文件下载、API响应流(逐渐加载数据)、AI生成内容(文本图像)
Duplex 双工流

自定义Duplex双工流

自定义Duplex需要同时实现_read和_write方法。因为 Duplex 流包含了 Readable 和 Writable两个流,所以要维护两个独立的内部缓冲区,用于读取和写入,允许每一方独立于另一方操作,同时保持适当和高效的数据流。
自定义一个XxxDuplex,可以互相写入数据。
  1. import { Duplex } from 'node:stream';
  2. class XxxDuplex extends Duplex {
  3.   constructor(peer = null, options = {}) {
  4.     super(options);
  5.     this.peer = peer; // 另一端的 Duplex
  6.   }
  7.   // 当可写端接收到数据时
  8.   _write(chunk, encoding, callback) {
  9.     const data = chunk.toString();
  10.     console.log(`[${this.label}] 写入数据:`, data);
  11.     // 把数据发给对端
  12.     if (this.peer) {
  13.       this.peer.push(data);
  14.     }
  15.     callback(); // 通知写操作完成
  16.   }
  17.   // 当可读端被调用时(通常由 .read() 或流消费触发)
  18.   _read(size) {
  19.     // 不做额外操作,等待对端 push()
  20.   }
  21.   // 结束时
  22.   _final(callback) {
  23.     if (this.peer) this.peer.push(null); // 通知对端结束
  24.     callback();
  25.   }
  26. }
  27. // 创建两个互为对端的 Duplex 流
  28. const duplexA = new XxxDuplex();
  29. const duplexB = new XxxDuplex(duplexA);
  30. duplexA.peer = duplexB;
  31. // 加上标识
  32. duplexA.label = 'A';
  33. duplexB.label = 'B';
  34. // 监听 B 的读取
  35. duplexB.on('data', chunk => {
  36.   console.log(`[${duplexB.label}] 收到数据:`, chunk.toString());
  37. });
  38. duplexB.on('end', () => {
  39.   console.log(`[${duplexB.label}] 流结束`);
  40. });
  41. // A 向 B 发送数据
  42. duplexA.write('你好,B!');
  43. duplexA.write('这是一条测试消息');
  44. duplexA.end();
  45. /*
  46. [A] 写入数据: 你好,B!
  47. [A] 写入数据: 这是一条测试消息
  48. [B] 收到数据: 你好,B!
  49. [B] 收到数据: 这是一条测试消息
  50. [B] 流结束
  51. */
复制代码
Duplex和readable&writable相互转换

Duplex和readable&writable互相转换
使用stream.Duplex.fromWeb(pair[, options])将readable和writable转为duplex。
  1. import { Duplex } from 'node:stream';
  2. import {
  3.   ReadableStream,
  4.   WritableStream,
  5. } from 'node:stream/web';
  6. const readable = new ReadableStream({
  7.   start(controller) {
  8.     controller.enqueue('world'); //设置readable buffer的初始数据
  9.   },
  10. });
  11. const writable = new WritableStream({
  12.   write(chunk) {
  13.     console.log('writable', chunk); //writable hello
  14.   },
  15. });
  16. const pair = {
  17.   readable,
  18.   writable,
  19. };
  20. //encoding: 'utf8'表示以utf8编码工作,objectMode:true表示以对象模式工作
  21. const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
  22. duplex.write('hello');
  23. duplex.on('data', (chunk) => {
  24.   console.log('readable', chunk);  //readable world
  25. });
复制代码
使用stream.Duplex.toWeb(streamDuplex)将duplex拆分成两个流
  1. import { Duplex } from 'node:stream';
  2. const duplex = Duplex({
  3.   objectMode: true,
  4.   read() {
  5.     this.push('world');
  6.     this.push(null);
  7.   },
  8.   write(chunk, encoding, callback) {
  9.     console.log('writable', chunk);
  10.     callback();
  11.   },
  12. });
  13. const { readable, writable } = Duplex.toWeb(duplex);
  14. writable.getWriter().write('hello');
  15. const { value } = await readable.getReader().read();
  16. console.log('readable', value);
复制代码
属于Duplex流的Node API


  • TCP套接字
  • zlib 流
  • 加密流
TCP套接字示例:
  1. import net from 'node:net';
  2. /** 服务端 */
  3. const server = net.createServer(function(clientSocket){
  4.     // clientSocket 就是一个 duplex 流
  5.     console.log('新的客户端 socket 连接');
  6.     clientSocket.on('data', function(data){
  7.         console.log(`服务端收到数据: ${data.toString()}`);
  8.         clientSocket.write('world!');
  9.     });
  10.     clientSocket.on('end', function(){
  11.         console.log('连接中断');
  12.     });
  13. });
  14. server.listen(6666, 'localhost', function(){
  15.     const address = server.address();
  16.         console.log('服务端启动,地址为:%j', address);
  17. });
  18. /** 客户端 */
  19. // socket 就是一个 duplex 流
  20. const socket = net.createConnection({
  21.     host: 'localhost',
  22.     port: 6666
  23. }, () => {
  24.   console.log('连接到了服务端!');
  25.   socket.write('hello');
  26.   setTimeout(()=> {
  27.     socket.end();
  28.   }, 2000);
  29. });
  30. socket.on('data', (data) => {
  31.   console.log(`客户端收到数据: ${data.toString()}`);
  32. });
  33. socket.on('end', () => {
  34.   console.log('断开连接');
  35. });
复制代码
Transform 转换流

自定义Transform流

Transform 流是一种双工流的特殊子类(和Duplex 双工流一样同时实现 Readable 和 Writable 接口)。那么Transform流和Duplex流的关联和区别?
关联:stream.Transform继承了stream.Duplex,并实现了自己的_read和_write方法。
区别:
类型特点(区别)用途关键方法Duplex 流读写互相独立,输入和输出没有直接关系双向通信数据处理Transform 流输入和输出相关:写入的数据经过处理后再输出read() / write()transform(chunk, encoding, callback)也就是说,Duplex是输入输出流两部分独立(不干扰,同时进行);而Transform同样有输入和输出流两部分,但是Node会自动将输出流缓冲区的内容写入输入流缓冲区。
  1. Writable Buffer
  2.     ↓ (消费)
  3. transform(chunk)
  4.     ↓ (push)
  5. Readable Buffer
复制代码
实现自定义的Transfrom流则需要实现_transfrom方法,举个例子:
  1. import  {Transform} from 'node:stream'
  2. class UpTransform extends Transform {
  3.   constructor() {
  4.     super();
  5.   }
  6.   _transform(chunk, enc, next) {
  7.     console.log('enc', enc); // enc buffer
  8.     this.push(chunk.toString().toUpperCase());
  9.     next();
  10.   }
  11. }
  12. const t = new UpTransform();
  13. t.write('abc');   // 写入 writable buffer
  14. t.end();
  15. // 从 readable buffer 读取数据
  16. t.on('data', (chunk) => console.log('Read:', chunk.toString()));
复制代码
也用Transform初始化传参的方式创建一个自定的Transfrom实例:
  1. const t = new Transform({
  2.   transform(chunk, enc, next) {
  3.     console.log('enc', enc); // enc buffer
  4.     this.push(chunk.toString().toUpperCase());
  5.     next();
  6.   }
  7. });
复制代码
_transform调用的时机

当使用Transform流往输入缓存区写入数据时,会调用_transform方法进行转换。
比如上面UpTransform那个例子,当t.write('abc');时就会触发_transform。
在pipe方法中使用Transform流,会调用_transform方法进行转换。
属于Transfrom流的Node API


  • zlib 流
  • 加密流
zlib流示例:
  1. const fs = require('node:fs');
  2. const zlib = require('node:zlib');
  3. const r = fs.createReadStream('file.txt');
  4. const z = zlib.createGzip(); //z是一个Transform流
  5. const w = fs.createWriteStream('file.txt.gz');
  6. r.pipe(z).pipe(w);
复制代码
缓冲区、高压线和背压问题

缓冲区、高压线

首先介绍下缓冲区,Readable/Writable内部维护了一个队列数据叫缓冲区。
高压线(highWaterMark)是Readable/Writable内部的一个阈值(可在初始化时修改)。用来告诉流缓冲区的数据大小不应该超过这个值。
背压问题解释

背压问题:当内部缓冲区的大小超过highWaterMark阈值,然后持续扩大,占用原来越多内存,甚至最后出现内存溢出。大白话来说就是缓存积压问题。
一旦内部读取缓冲区的总大小达到 highWaterMark 指定的阈值,则流将暂时停止从底层资源读取数据,直到可以消费当前缓冲的数据(也就是,流将停止调用内部的用于填充读取缓冲区 readable._read() 方法)。
这里我们会有一个疑问就是:readable流不是会停止读数据到缓冲区吗,怎么还有背压问题?
以下是GPT的解释,我消化总结下:

  • Readable 流有两种操作模式:flowing 和 paused,flowing模式下无法有效处理背压问题,因为不能暂停流的读(不能停止调用_read)。所以on('data')这种方式是无法处理背压问题的,它会持续不断的把数据积压到缓冲区。
  • 在paused模式下,可以调用pause()方法暂停流的读(pipe就是这个原理),从而可以做到处理背压问题,但是也只能是缓解。像文件流这种是可控制的,能立即停止从文件读取数据;但像Socket流,则不能立即停止数据的接受,但会:暂停从内核socket缓冲区中读取 & 在TCP层通过窗口机制通知发送端"别发那么快"。
pipe() 如何处理 Readable 流的背压?
readable.pipe(writable):

  • 如果 Writable 流的缓冲区满了(返回 false),pipe() 会自动调用 Readable.pause()
  • 当 Writable 流排空缓冲区并发出 'drain' 事件时,pipe() 会调用 Readable.resume()
  • 这样就在两个流之间建立了一个自动的背压处理机制
总结来说,虽然 Readable 流确实会在缓冲区达到 highWaterMark 时尝试暂停底层读取,但这只是背压处理的一部分。完整的背压处理需要整个流管道中的所有组件协同工作,而 pipe() 方法正是为了简化这种协同而设计的。
当重复调用 writable.write(chunk) 方法时,数据会缓存在 Writable 流中。虽然内部的写入缓冲区的总大小低于 highWaterMark 设置的阈值,但对 writable.write() 的调用将返回 true。一旦内部缓冲区的大小达到或超过 highWaterMark,则将返回 false。
下面这个例子就是Writable的背压问题解决(「Writable Stream」这节出现过的例子,来自官方文档)
  1. // 一次性批量写入大量数据,大小达到highWaterMark,令write方法返回false
  2. function writeOneMillionTimes(writer, data, encoding, callback) {
  3.   let i = 1000000;
  4.   write();
  5.   function write() {
  6.     let ok = true;
  7.     do {
  8.       i--;
  9.       if (i === 0) {
  10.         // Last time!
  11.         writer.write(data, encoding, callback);
  12.       } else {
  13.         // 当缓冲区满了,ok=false
  14.         ok = writer.write(data, encoding);
  15.       }
  16.     } while (i > 0 && ok);
  17.     if (i > 0) {
  18.       // 当drain了(即缓冲区被清空了),可以继续写入
  19.       writer.once('drain', write);
  20.     }
  21.   }
  22. }
复制代码
如何解决背压问题

方式一:手动拉数据来控制
  1. readable.on('readable', () => {
  2.   let chunk;
  3.   while (null !== (chunk = readable.read())) {
  4.     processChunk(chunk);
  5.   }
  6. });
复制代码
方式二:使用 pipe()(自动处理)
  1. readable.pipe(writable);
复制代码
方式三:使用 await 迭代(自动处理)
  1. for await (const chunk of readable) {
  2.   await processChunk(chunk); // 每次 await 都自然暂停上游读取
  3. }
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册