Node.js Stream 模块

Java FileNode.js 内置模块


Stream(流)是 Node.js 中处理流式数据的抽象接口。你可以把流想象成水流,数据就像水一样从一个地方流向另一个地方。这种处理方式特别适合处理大文件或连续的数据源,因为它不需要一次性加载所有数据到内存中。

为什么需要 Stream?

  1. 内存效率:处理大文件时,不用一次性加载整个文件到内存
  2. 时间效率:可以边接收边处理数据,不需要等待所有数据都准备好
  3. 组合性:多个流可以像管道一样连接起来,形成数据处理流水线

Stream 的四种基本类型

1. Readable Stream(可读流)

可读流是数据的源头,可以从中读取数据。例如从文件读取内容或从 HTTP 请求获取数据。

实例

const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');

readableStream.on('data', (chunk) => {
  console.log(`接收到 ${chunk.length} 字节的数据`);
});

readableStream.on('end', () => {
  console.log('没有更多数据了');
});

2. Writable Stream(可写流)

可写流是数据的目的地,可以向其中写入数据。例如写入文件或发送 HTTP 响应。

实例

const fs = require('fs');
const writableStream = fs.createWriteStream('output.txt');

writableStream.write('第一行数据\n');
writableStream.write('第二行数据\n');
writableStream.end('最后一行数据');

3. Duplex Stream(双工流)

双工流既是可读的也是可写的,例如 TCP 套接字。

实例

const { Duplex } = require('stream');

const myDuplex = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },
 
  read(size) {
    this.push('从双工流读取的数据');
    this.push(null); // 表示数据结束
  }
});

4. Transform Stream(转换流)

转换流是一种特殊的双工流,它在读写过程中可以修改或转换数据。例如压缩或加密数据。

实例

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

Stream 的常用方法

pipe() 方法

pipe() 方法是 Stream 最强大的特性之一,它可以将多个流连接起来形成管道。

实例

const fs = require('fs');

// 创建一个读取流和一个写入流
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');

// 使用 pipe 将读取流和写入流连接起来
readStream.pipe(writeStream);

writeStream.on('finish', () => {
  console.log('数据写入完成');
});

事件监听

Stream 是 EventEmitter 的实例,可以通过监听事件来处理各种情况:

  • data - 当有数据可读时触发
  • end - 当没有更多数据可读时触发
  • error - 当发生错误时触发
  • finish - 当所有数据已刷新到底层系统时触发

实际应用示例

示例 1:文件复制

实例

const fs = require('fs');

function copyFile(source, target, cb) {
  // 创建读写流
  const rd = fs.createReadStream(source);
  const wr = fs.createWriteStream(target);

  // 处理错误
  rd.on('error', err => cb(err));
  wr.on('error', err => cb(err));
 
  // 完成回调
  wr.on('finish', cb);
 
  // 开始复制
  rd.pipe(wr);
}

// 使用
copyFile('source.txt', 'target.txt', err => {
  if (err) console.error(err);
  else console.log('复制完成');
});

示例 2:HTTP 服务器压缩文件

实例

const fs = require('fs');
const http = require('http');
const zlib = require('zlib');

http.createServer((req, res) => {
  // 创建读取流
  const readStream = fs.createReadStream('./largeFile.txt');
 
  // 设置响应头
  res.writeHead(200, {
    'Content-Type': 'text/plain',
    'Content-Encoding': 'gzip'
  });
 
  // 管道链:读取 -> 压缩 -> 响应
  readStream.pipe(zlib.createGzip()).pipe(res);
}).listen(3000);

Stream 的最佳实践

  1. 错误处理:始终为流添加错误处理监听器
  2. 内存管理:对于大文件,使用流而不是一次性读取
  3. 管道链:合理使用 pipe() 方法组合多个流
  4. 背压处理:当写入速度跟不上读取速度时,需要适当处理背压

错误处理示例

实例

const fs = require('fs');

const readStream = fs.createReadStream('not-exist-file.txt');

readStream.on('error', (err) => {
  console.error('发生错误:', err.message);
});

通过掌握 Node.js 的 Stream 模块,你可以高效地处理各种 I/O 操作,特别是在处理大文件或实时数据时,流将成为你的强大工具。

Java FileNode.js 内置模块