Node.js 多进程
我们都知道 Node.js 是以单线程的模式运行的,但它使用的是事件驱动来处理并发,这样有助于我们在多核 cpu 的系统上创建多个子进程,从而提高性能。
每个子进程总是带有三个流对象:child.stdin, child.stdout 和child.stderr。他们可能会共享父进程的 stdio 流,或者也可以是独立的被导流的流对象。
Node 提供了 child_process 与 cluster 模块来创建子进程。
child_process
模块
exec - child_process.exec 使用子进程执行命令,缓存子进程的输出,并将子进程的输出以回调函数参数的形式返回。
execFile - 直接执行可执行文件,略安全。
spawn - child_process.spawn 使用指定的命令行参数创建新进程。
fork - child_process.fork 是 spawn()的特殊形式,用于在子进程中运行的模块,如 fork('./son.js') 相当于 spawn('node', ['./son.js']) 。与spawn方法不同的是,fork会在父进程与子进程之间,建立一个通信管道,用于进程之间的通信。
cluster
模块
- 在一个端口上 多工作进程 共享监听,实现 一核一进程 的 HTTP 服务。
- 适合把 Web 服务横向吃满多核。
- 备注:官方更推荐新项目仔细评估(如直接用反向代理 + 多进程、或
worker_threads
做 CPU 密集),但cluster
依旧广泛可用。
exec() 方法
child_process.exec 使用子进程执行命令,缓存子进程的输出,并将子进程的输出以回调函数参数的形式返回。
语法如下所示:
child_process.exec(command[, options], callback)
参数
参数说明如下:
command: 字符串, 将要运行的命令,参数使用空格隔开
options :对象,可以是:
- cwd ,字符串,子进程的当前工作目录
- env,对象 环境变量键值对
- encoding ,字符串,字符编码(默认: 'utf8')
- shell ,字符串,将要执行命令的 Shell(默认: 在 UNIX 中为
/bin/sh
, 在 Windows 中为cmd.exe
, Shell 应当能识别-c
开关在 UNIX 中,或/s /c
在 Windows 中。 在Windows 中,命令行解析应当能兼容cmd.exe
) - timeout,数字,超时时间(默认: 0)
- maxBuffer,数字, 在 stdout 或 stderr 中允许存在的最大缓冲(二进制),如果超出那么子进程将会被杀死 (默认: 200*1024)
- killSignal ,字符串,结束信号(默认:'SIGTERM')
- uid,数字,设置用户进程的 ID
- gid,数字,设置进程组的 ID
callback :回调函数,包含三个参数error, stdout 和 stderr。
exec() 方法返回最大的缓冲区,并等待进程结束,一次性返回缓冲区的内容。
实例
让我们创建两个 js 文件 support.js 和 master.js。
support.js 文件代码:
master.js 文件代码:
执行以上代码,输出结果为:
$ node master.js 子进程已退出,退出码 0 stdout: 进程 1 执行。 stderr: 子进程已退出,退出码 0 stdout: 进程 0 执行。 stderr: 子进程已退出,退出码 0 stdout: 进程 2 执行。 stderr:
解析:
exec
会启动 3 个子进程,分别执行:
node support.js 0
node support.js 1
node support.js 2
每个子进程运行后立刻输出:
进程 0 执行。 进程 1 执行。 进程 2 执行。
这部分是子进程的 标准输出(stdout),会被缓冲,等子进程结束时一起传回 exec
的回调。
子进程退出时会先触发 exit
事件,然后 exec
的回调才会执行(注意:Node.js 文档里说明 exit
事件发生在进程结束时,而 exec
的回调是 stdio
流关闭后触发,通常 exit 会先于回调打印)。
输出顺序是不确定的,因为 3 个子进程并行执行,谁先结束取决于操作系统调度。
spawn() 方法
child_process.spawn 使用指定的命令行参数创建新进程,语法格式如下:
child_process.spawn(command[, args][, options])
参数
参数说明如下:
command: 将要运行的命令
args: Array 字符串参数数组
options Object
- cwd String 子进程的当前工作目录
- env Object 环境变量键值对
- stdio Array|String 子进程的 stdio 配置
- detached Boolean 这个子进程将会变成进程组的领导
- uid Number 设置用户进程的 ID
- gid Number 设置进程组的 ID
spawn() 方法返回流 (stdout & stderr),在进程返回大量数据时使用。进程一旦开始执行时 spawn() 就开始接收响应。
实例
让我们创建两个 js 文件 support.js 和 master.js。
support.js 文件代码:
master.js 文件代码:
执行以上代码,输出结果为:
$ node master.js stdout: 进程 0 执行。 子进程已退出,退出码 0 stdout: 进程 1 执行。 子进程已退出,退出码 0 stdout: 进程 2 执行。 子进程已退出,退出码 0
fork 方法
child_process.fork 是 spawn() 方法的特殊形式,用于创建进程,语法格式如下:
child_process.fork(modulePath[, args][, options])
参数
参数说明如下:
modulePath: String,将要在子进程中运行的模块
args: Array 字符串参数数组
options:Object
- cwd String 子进程的当前工作目录
- env Object 环境变量键值对
- execPath String 创建子进程的可执行文件
- execArgv Array 子进程的可执行文件的字符串参数数组(默认: process.execArgv)
- silent Boolean 如果为
true
,子进程的stdin
,stdout
和stderr
将会被关联至父进程,否则,它们将会从父进程中继承。(默认为:false
) - uid Number 设置用户进程的 ID
- gid Number 设置进程组的 ID
返回的对象除了拥有ChildProcess实例的所有方法,还有一个内建的通信信道。
实例
让我们创建两个 js 文件 support.js 和 master.js。
support.js 文件代码:
master.js 文件代码:
执行以上代码,输出结果为:
$ node master.js 进程 0 执行。 子进程已退出,退出码 0 进程 1 执行。 子进程已退出,退出码 0 进程 2 执行。 子进程已退出,退出码 0
cluster:一端口多进程的 HTTP 服务
最小可用的多核 HTTP 服务
实例
import cluster from 'node:cluster';
import os from 'node:os';
import http from 'node:http';
import { pbkdf2Sync } from 'node:crypto';
if (cluster.isPrimary) {
const cpuCount = Math.max(1, os.cpus().length);
console.log(`主进程 ${process.pid},启动 ${cpuCount} 个工作进程`);
for (let i = 0; i < cpuCount; i++) cluster.fork();
cluster.on('exit', (worker, code) => {
console.warn(`工作进程 ${worker.process.pid} 退出(code=${code}),重启中…`);
cluster.fork();
});
} else {
const server = http.createServer((req, res) => {
// 模拟 CPU 密集计算
pbkdf2Sync('password', 'salt', 100_000, 64, 'sha512');
res.writeHead(200, {'content-type':'text/plain; charset=utf-8'});
res.end(`Handled by worker ${process.pid}\n`);
});
server.listen(3000, () => {
console.log(`工作进程 ${process.pid} 监听 3000`);
});
}
运行:node server-cluster.js,多次请求 http://localhost:3000 会看到不同 PID 响应。
注意:线上多实例一般前置反向代理/负载均衡(Nginx、Envoy、K8s Service),或由 cluster 在一个端口聚合。
WebSocket/粘性会话需要"sticky session"(同一客户端落到同一 worker)。可以用四层负载均衡的源地址哈希,或在应用层自己做粘滞(如按 req.socket.remoteAddress 做分发)。
优雅重启(零停机思路)
思路:主进程收到重载信号 → 先 fork 出新 worker 并等待其 listening → 再 disconnect 旧 worker,等请求跑完退出。
实例
process.on('SIGUSR2', async () => {
console.log('收到 SIGUSR2,开始优雅重启');
const workers = Object.values(cluster.workers ?? {});
// 1) 启新
const fresh = cluster.fork();
await new Promise(r => fresh.once('listening', r));
// 2) 逐个下线旧的
for (const w of workers) {
w?.disconnect();
// 超时还没退就强杀
setTimeout(() => w?.process.kill('SIGKILL'), 5000);
}
});
Windows 没有 SIGUSR2,可用 HTTP / RPC 管理接口触发重载。
自建「进程池」:控制并发、复用子进程
当你有大量独立 CPU 任务(如批量压缩、加密、爬虫解析),频繁 fork/spawn 成本高;这时需要 Process Pool 复用若干子进程,像"线程池"一样限流。
文件:pool/worker.js
实例
process.on('message', async (msg) => {
if (msg.type === 'task') {
const { id, payload } = msg;
// 模拟重任务(斐波那契)
const fib = (n) => (n <= 1 ? n : fib(n-1) + fib(n-2));
const result = fib(payload.n);
process.send({ type: 'done', id, result });
}
});
文件:pool/index.js
实例
import { fork } from 'node:child_process';
import os from 'node:os';
export class ProcessPool {
constructor({ file, size = Math.max(1, os.cpus().length - 1) } = {}) {
this.file = file;
this.size = size;
this.idle = [];
this.busy = new Map(); // worker -> taskId
this.queue = [];
for (let i = 0; i < size; i++) this._spawn();
}
_spawn() {
const w = fork(this.file);
w.on('message', (m) => {
if (m?.type === 'done') {
const cb = this.callbacks.get(m.id);
if (cb) cb.resolve(m.result);
this.callbacks.delete(m.id);
this._markIdle(w);
this._drain();
}
});
w.on('exit', () => {
// 自动补齐池子
this.busy.delete(w);
const idx = this.idle.indexOf(w);
if (idx >= 0) this.idle.splice(idx, 1);
this._spawn();
});
if (!this.callbacks) this.callbacks = new Map();
this.idle.push(w);
}
_markIdle(w) {
this.busy.delete(w);
if (!this.idle.includes(w)) this.idle.push(w);
}
_acquire() {
return this.idle.length ? this.idle.shift() : null;
}
_drain() {
while (this.queue.length && this.idle.length) {
const { id, payload, resolve, reject } = this.queue.shift();
const w = this._acquire();
this.busy.set(w, id);
this.callbacks.set(id, { resolve, reject });
w.send({ type: 'task', id, payload });
}
}
runTask(payload) {
const id = Math.random().toString(36).slice(2);
return new Promise((resolve, reject) => {
this.queue.push({ id, payload, resolve, reject });
this._drain();
});
}
close() {
for (const w of this.idle) w.kill();
for (const w of this.busy.keys()) w.kill();
}
}
文件:pool/demo.js
实例
import { ProcessPool } from './index.js';
const pool = new ProcessPool({ file: './pool/worker.js', size: 4 });
const tasks = Array.from({ length: 10 }, (_, i) => pool.runTask({ n: 35 + (i % 3) }));
const t0 = Date.now();
const results = await Promise.all(tasks);
console.log('结果:', results);
console.log('耗时(ms):', Date.now() - t0);
await new Promise(r => setTimeout(r, 100)); // 等待消息刷完
pool.close();
说明:
- 把"任务"封装成消息,进程池负责分配与复用 worker。
- 真实业务可把 解析/转码/压缩 封装到
worker.js
里。 - 注意 任务超时、重试、幂等 与 背压(队列长度上限)。
实战小例:HTTP 接口把重任务扔进进程池
实例
import http from 'node:http';
import { ProcessPool } from './pool/index.js';
const pool = new ProcessPool({ file: './pool/worker.js', size: 4 });
const server = http.createServer(async (req, res) => {
if (req.url?.startsWith('/fib?')) {
const url = new URL(req.url, 'http://localhost');
const n = Number(url.searchParams.get('n') || 35);
try {
const result = await pool.runTask({ n });
res.writeHead(200, {'content-type': 'application/json'});
res.end(JSON.stringify({ pid: process.pid, n, result }));
} catch (e) {
res.writeHead(500); res.end('error');
}
} else {
res.writeHead(404).end('Not Found');
}
});
server.listen(3000, () => console.log('http://localhost:3000'));
访问:/fib?n=38 等,主进程不被阻塞,任务在子进程里算。
点我分享笔记