Node.js Stream 模块
Stream(流)是 Node.js 中处理流式数据的抽象接口。你可以把流想象成水流,数据就像水一样从一个地方流向另一个地方。这种处理方式特别适合处理大文件或连续的数据源,因为它不需要一次性加载所有数据到内存中。
为什么需要 Stream?
- 内存效率:处理大文件时,不用一次性加载整个文件到内存
- 时间效率:可以边接收边处理数据,不需要等待所有数据都准备好
- 组合性:多个流可以像管道一样连接起来,形成数据处理流水线
Stream 的四种基本类型
1. Readable Stream(可读流)
可读流是数据的源头,可以从中读取数据。例如从文件读取内容或从 HTTP 请求获取数据。
实例
const fs = require('fs');
const readableStream = fs.createReadStream('example.txt');
readableStream.on('data', (chunk) => {
console.log(`接收到 ${chunk.length} 字节的数据`);
});
readableStream.on('end', () => {
console.log('没有更多数据了');
});
const readableStream = fs.createReadStream('example.txt');
readableStream.on('data', (chunk) => {
console.log(`接收到 ${chunk.length} 字节的数据`);
});
readableStream.on('end', () => {
console.log('没有更多数据了');
});
2. Writable Stream(可写流)
可写流是数据的目的地,可以向其中写入数据。例如写入文件或发送 HTTP 响应。
实例
const fs = require('fs');
const writableStream = fs.createWriteStream('output.txt');
writableStream.write('第一行数据\n');
writableStream.write('第二行数据\n');
writableStream.end('最后一行数据');
const writableStream = fs.createWriteStream('output.txt');
writableStream.write('第一行数据\n');
writableStream.write('第二行数据\n');
writableStream.end('最后一行数据');
3. Duplex Stream(双工流)
双工流既是可读的也是可写的,例如 TCP 套接字。
实例
const { Duplex } = require('stream');
const myDuplex = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push('从双工流读取的数据');
this.push(null); // 表示数据结束
}
});
const myDuplex = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push('从双工流读取的数据');
this.push(null); // 表示数据结束
}
});
4. Transform Stream(转换流)
转换流是一种特殊的双工流,它在读写过程中可以修改或转换数据。例如压缩或加密数据。
实例
const { Transform } = require('stream');
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);
Stream 的常用方法
pipe() 方法
pipe()
方法是 Stream 最强大的特性之一,它可以将多个流连接起来形成管道。
实例
const fs = require('fs');
// 创建一个读取流和一个写入流
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');
// 使用 pipe 将读取流和写入流连接起来
readStream.pipe(writeStream);
writeStream.on('finish', () => {
console.log('数据写入完成');
});
// 创建一个读取流和一个写入流
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');
// 使用 pipe 将读取流和写入流连接起来
readStream.pipe(writeStream);
writeStream.on('finish', () => {
console.log('数据写入完成');
});
事件监听
Stream 是 EventEmitter 的实例,可以通过监听事件来处理各种情况:
data
- 当有数据可读时触发end
- 当没有更多数据可读时触发error
- 当发生错误时触发finish
- 当所有数据已刷新到底层系统时触发
实际应用示例
示例 1:文件复制
实例
const fs = require('fs');
function copyFile(source, target, cb) {
// 创建读写流
const rd = fs.createReadStream(source);
const wr = fs.createWriteStream(target);
// 处理错误
rd.on('error', err => cb(err));
wr.on('error', err => cb(err));
// 完成回调
wr.on('finish', cb);
// 开始复制
rd.pipe(wr);
}
// 使用
copyFile('source.txt', 'target.txt', err => {
if (err) console.error(err);
else console.log('复制完成');
});
function copyFile(source, target, cb) {
// 创建读写流
const rd = fs.createReadStream(source);
const wr = fs.createWriteStream(target);
// 处理错误
rd.on('error', err => cb(err));
wr.on('error', err => cb(err));
// 完成回调
wr.on('finish', cb);
// 开始复制
rd.pipe(wr);
}
// 使用
copyFile('source.txt', 'target.txt', err => {
if (err) console.error(err);
else console.log('复制完成');
});
示例 2:HTTP 服务器压缩文件
实例
const fs = require('fs');
const http = require('http');
const zlib = require('zlib');
http.createServer((req, res) => {
// 创建读取流
const readStream = fs.createReadStream('./largeFile.txt');
// 设置响应头
res.writeHead(200, {
'Content-Type': 'text/plain',
'Content-Encoding': 'gzip'
});
// 管道链:读取 -> 压缩 -> 响应
readStream.pipe(zlib.createGzip()).pipe(res);
}).listen(3000);
const http = require('http');
const zlib = require('zlib');
http.createServer((req, res) => {
// 创建读取流
const readStream = fs.createReadStream('./largeFile.txt');
// 设置响应头
res.writeHead(200, {
'Content-Type': 'text/plain',
'Content-Encoding': 'gzip'
});
// 管道链:读取 -> 压缩 -> 响应
readStream.pipe(zlib.createGzip()).pipe(res);
}).listen(3000);
Stream 的最佳实践
- 错误处理:始终为流添加错误处理监听器
- 内存管理:对于大文件,使用流而不是一次性读取
- 管道链:合理使用 pipe() 方法组合多个流
- 背压处理:当写入速度跟不上读取速度时,需要适当处理背压
错误处理示例
实例
const fs = require('fs');
const readStream = fs.createReadStream('not-exist-file.txt');
readStream.on('error', (err) => {
console.error('发生错误:', err.message);
});
const readStream = fs.createReadStream('not-exist-file.txt');
readStream.on('error', (err) => {
console.error('发生错误:', err.message);
});
通过掌握 Node.js 的 Stream 模块,你可以高效地处理各种 I/O 操作,特别是在处理大文件或实时数据时,流将成为你的强大工具。
点我分享笔记