Node.js 多进程

我们都知道 Node.js 是以单线程的模式运行的,但它使用的是事件驱动来处理并发,这样有助于我们在多核 cpu 的系统上创建多个子进程,从而提高性能。

每个子进程总是带有三个流对象:child.stdin, child.stdout 和child.stderr。他们可能会共享父进程的 stdio 流,或者也可以是独立的被导流的流对象。

Node 提供了 child_process 与 cluster 模块来创建子进程。

child_process 模块

  • exec - child_process.exec 使用子进程执行命令,缓存子进程的输出,并将子进程的输出以回调函数参数的形式返回。

  • execFile - 直接执行可执行文件,略安全。

  • spawn - child_process.spawn 使用指定的命令行参数创建新进程。

  • fork - child_process.fork 是 spawn()的特殊形式,用于在子进程中运行的模块,如 fork('./son.js') 相当于 spawn('node', ['./son.js']) 。与spawn方法不同的是,fork会在父进程与子进程之间,建立一个通信管道,用于进程之间的通信。

cluster 模块

  • 在一个端口上 多工作进程 共享监听,实现 一核一进程 的 HTTP 服务。
  • 适合把 Web 服务横向吃满多核。
  • 备注:官方更推荐新项目仔细评估(如直接用反向代理 + 多进程、或 worker_threads 做 CPU 密集),但 cluster 依旧广泛可用。

exec() 方法

child_process.exec 使用子进程执行命令,缓存子进程的输出,并将子进程的输出以回调函数参数的形式返回。

语法如下所示:

child_process.exec(command[, options], callback)

参数

参数说明如下:

command: 字符串, 将要运行的命令,参数使用空格隔开

options :对象,可以是:

  • cwd ,字符串,子进程的当前工作目录
  • env,对象 环境变量键值对
  • encoding ,字符串,字符编码(默认: 'utf8')
  • shell ,字符串,将要执行命令的 Shell(默认: 在 UNIX 中为/bin/sh, 在 Windows 中为cmd.exe, Shell 应当能识别 -c开关在 UNIX 中,或 /s /c 在 Windows 中。 在Windows 中,命令行解析应当能兼容cmd.exe
  • timeout,数字,超时时间(默认: 0)
  • maxBuffer,数字, 在 stdout 或 stderr 中允许存在的最大缓冲(二进制),如果超出那么子进程将会被杀死 (默认: 200*1024)
  • killSignal ,字符串,结束信号(默认:'SIGTERM')
  • uid,数字,设置用户进程的 ID
  • gid,数字,设置进程组的 ID

callback :回调函数,包含三个参数error, stdout 和 stderr。

exec() 方法返回最大的缓冲区,并等待进程结束,一次性返回缓冲区的内容。

实例

让我们创建两个 js 文件 support.js 和 master.js。

support.js 文件代码:

console.log("进程 " + process.argv[2] + " 执行。" );

master.js 文件代码:

const fs = require('fs'); const child_process = require('child_process'); for(var i=0; i<3; i++) { var workerProcess = child_process.exec('node support.js '+i, function (error, stdout, stderr) { if (error) { console.log(error.stack); console.log('Error code: '+error.code); console.log('Signal received: '+error.signal); } console.log('stdout: ' + stdout); console.log('stderr: ' + stderr); }); workerProcess.on('exit', function (code) { console.log('子进程已退出,退出码 '+code); }); }

执行以上代码,输出结果为:

$ node master.js 
子进程已退出,退出码 0
stdout: 进程 1 执行。

stderr: 
子进程已退出,退出码 0
stdout: 进程 0 执行。

stderr: 
子进程已退出,退出码 0
stdout: 进程 2 执行。

stderr: 

解析:

exec 会启动 3 个子进程,分别执行:

  • node support.js 0
  • node support.js 1
  • node support.js 2

每个子进程运行后立刻输出:

进程 0 执行。
进程 1 执行。
进程 2 执行。

这部分是子进程的 标准输出(stdout),会被缓冲,等子进程结束时一起传回 exec 的回调。

子进程退出时会先触发 exit 事件,然后 exec 的回调才会执行(注意:Node.js 文档里说明 exit 事件发生在进程结束时,而 exec 的回调是 stdio 流关闭后触发,通常 exit 会先于回调打印)。

输出顺序是不确定的,因为 3 个子进程并行执行,谁先结束取决于操作系统调度。


spawn() 方法

child_process.spawn 使用指定的命令行参数创建新进程,语法格式如下:

child_process.spawn(command[, args][, options])

参数

参数说明如下:

command: 将要运行的命令

args: Array 字符串参数数组

options Object

  • cwd String 子进程的当前工作目录
  • env Object 环境变量键值对
  • stdio Array|String 子进程的 stdio 配置
  • detached Boolean 这个子进程将会变成进程组的领导
  • uid Number 设置用户进程的 ID
  • gid Number 设置进程组的 ID

spawn() 方法返回流 (stdout & stderr),在进程返回大量数据时使用。进程一旦开始执行时 spawn() 就开始接收响应。

实例

让我们创建两个 js 文件 support.js 和 master.js。

support.js 文件代码:

console.log("进程 " + process.argv[2] + " 执行。" );

master.js 文件代码:

const fs = require('fs'); const child_process = require('child_process'); for(var i=0; i<3; i++) { var workerProcess = child_process.spawn('node', ['support.js', i]); workerProcess.stdout.on('data', function (data) { console.log('stdout: ' + data); }); workerProcess.stderr.on('data', function (data) { console.log('stderr: ' + data); }); workerProcess.on('close', function (code) { console.log('子进程已退出,退出码 '+code); }); }

执行以上代码,输出结果为:

$ node master.js stdout: 进程 0 执行。

子进程已退出,退出码 0
stdout: 进程 1 执行。

子进程已退出,退出码 0
stdout: 进程 2 执行。

子进程已退出,退出码 0

fork 方法

child_process.fork 是 spawn() 方法的特殊形式,用于创建进程,语法格式如下:

child_process.fork(modulePath[, args][, options])

参数

参数说明如下:

modulePath: String,将要在子进程中运行的模块

args: Array 字符串参数数组

options:Object

  • cwd String 子进程的当前工作目录
  • env Object 环境变量键值对
  • execPath String 创建子进程的可执行文件
  • execArgv Array 子进程的可执行文件的字符串参数数组(默认: process.execArgv)
  • silent Boolean 如果为true,子进程的stdinstdoutstderr将会被关联至父进程,否则,它们将会从父进程中继承。(默认为:false
  • uid Number 设置用户进程的 ID
  • gid Number 设置进程组的 ID

返回的对象除了拥有ChildProcess实例的所有方法,还有一个内建的通信信道。

实例

让我们创建两个 js 文件 support.js 和 master.js。

support.js 文件代码:

console.log("进程 " + process.argv[2] + " 执行。" );

master.js 文件代码:

const fs = require('fs'); const child_process = require('child_process'); for(var i=0; i<3; i++) { var worker_process = child_process.fork("support.js", [i]); worker_process.on('close', function (code) { console.log('子进程已退出,退出码 ' + code); }); }

执行以上代码,输出结果为:

$ node master.js 
进程 0 执行。
子进程已退出,退出码 0
进程 1 执行。
子进程已退出,退出码 0
进程 2 执行。
子进程已退出,退出码 0

cluster:一端口多进程的 HTTP 服务

最小可用的多核 HTTP 服务

实例

// server-cluster.js
import cluster from 'node:cluster';
import os from 'node:os';
import http from 'node:http';
import { pbkdf2Sync } from 'node:crypto';

if (cluster.isPrimary) {
  const cpuCount = Math.max(1, os.cpus().length);
  console.log(`主进程 ${process.pid},启动 ${cpuCount} 个工作进程`);
  for (let i = 0; i < cpuCount; i++) cluster.fork();

  cluster.on('exit', (worker, code) => {
    console.warn(`工作进程 ${worker.process.pid} 退出(code=${code}),重启中…`);
    cluster.fork();
  });
} else {
  const server = http.createServer((req, res) => {
    // 模拟 CPU 密集计算
    pbkdf2Sync('password', 'salt', 100_000, 64, 'sha512');
    res.writeHead(200, {'content-type':'text/plain; charset=utf-8'});
    res.end(`Handled by worker ${process.pid}\n`);
  });

  server.listen(3000, () => {
    console.log(`工作进程 ${process.pid} 监听 3000`);
  });
}

运行:node server-cluster.js,多次请求 http://localhost:3000 会看到不同 PID 响应。

注意:线上多实例一般前置反向代理/负载均衡(Nginx、Envoy、K8s Service),或由 cluster 在一个端口聚合。

WebSocket/粘性会话需要"sticky session"(同一客户端落到同一 worker)。可以用四层负载均衡的源地址哈希,或在应用层自己做粘滞(如按 req.socket.remoteAddress 做分发)。

优雅重启(零停机思路)

思路:主进程收到重载信号 → 先 fork 出新 worker 并等待其 listening → 再 disconnect 旧 worker,等请求跑完退出。

实例

// 摘要示例(在 cluster.isPrimary 分支里)
process.on('SIGUSR2', async () => {
  console.log('收到 SIGUSR2,开始优雅重启');
  const workers = Object.values(cluster.workers ?? {});
  // 1) 启新
  const fresh = cluster.fork();
  await new Promise(r => fresh.once('listening', r));
  // 2) 逐个下线旧的
  for (const w of workers) {
    w?.disconnect();
    // 超时还没退就强杀
    setTimeout(() => w?.process.kill('SIGKILL'), 5000);
  }
});

Windows 没有 SIGUSR2,可用 HTTP / RPC 管理接口触发重载。

自建「进程池」:控制并发、复用子进程

当你有大量独立 CPU 任务(如批量压缩、加密、爬虫解析),频繁 fork/spawn 成本高;这时需要 Process Pool 复用若干子进程,像"线程池"一样限流。

文件:pool/worker.js

实例

// pool/worker.js
process.on('message', async (msg) => {
  if (msg.type === 'task') {
    const { id, payload } = msg;
    // 模拟重任务(斐波那契)
    const fib = (n) => (n <= 1 ? n : fib(n-1) + fib(n-2));
    const result = fib(payload.n);
    process.send({ type: 'done', id, result });
  }
});

文件:pool/index.js

实例

// pool/index.js
import { fork } from 'node:child_process';
import os from 'node:os';

export class ProcessPool {
  constructor({ file, size = Math.max(1, os.cpus().length - 1) } = {}) {
    this.file = file;
    this.size = size;
    this.idle = [];
    this.busy = new Map();  // worker -> taskId
    this.queue = [];
    for (let i = 0; i < size; i++) this._spawn();
  }

  _spawn() {
    const w = fork(this.file);
    w.on('message', (m) => {
      if (m?.type === 'done') {
        const cb = this.callbacks.get(m.id);
        if (cb) cb.resolve(m.result);
        this.callbacks.delete(m.id);
        this._markIdle(w);
        this._drain();
      }
    });
    w.on('exit', () => {
      // 自动补齐池子
      this.busy.delete(w);
      const idx = this.idle.indexOf(w);
      if (idx >= 0) this.idle.splice(idx, 1);
      this._spawn();
    });
    if (!this.callbacks) this.callbacks = new Map();
    this.idle.push(w);
  }

  _markIdle(w) {
    this.busy.delete(w);
    if (!this.idle.includes(w)) this.idle.push(w);
  }

  _acquire() {
    return this.idle.length ? this.idle.shift() : null;
  }

  _drain() {
    while (this.queue.length && this.idle.length) {
      const { id, payload, resolve, reject } = this.queue.shift();
      const w = this._acquire();
      this.busy.set(w, id);
      this.callbacks.set(id, { resolve, reject });
      w.send({ type: 'task', id, payload });
    }
  }

  runTask(payload) {
    const id = Math.random().toString(36).slice(2);
    return new Promise((resolve, reject) => {
      this.queue.push({ id, payload, resolve, reject });
      this._drain();
    });
  }

  close() {
    for (const w of this.idle) w.kill();
    for (const w of this.busy.keys()) w.kill();
  }
}

文件:pool/demo.js

实例

// pool/demo.js
import { ProcessPool } from './index.js';

const pool = new ProcessPool({ file: './pool/worker.js', size: 4 });

const tasks = Array.from({ length: 10 }, (_, i) => pool.runTask({ n: 35 + (i % 3) }));
const t0 = Date.now();
const results = await Promise.all(tasks);
console.log('结果:', results);
console.log('耗时(ms):', Date.now() - t0);

await new Promise(r => setTimeout(r, 100)); // 等待消息刷完
pool.close();

说明:

  • 把"任务"封装成消息,进程池负责分配与复用 worker。
  • 真实业务可把 解析/转码/压缩 封装到 worker.js 里。
  • 注意 任务超时重试幂等背压(队列长度上限)。

实战小例:HTTP 接口把重任务扔进进程池

实例

// app.js
import http from 'node:http';
import { ProcessPool } from './pool/index.js';

const pool = new ProcessPool({ file: './pool/worker.js', size: 4 });
const server = http.createServer(async (req, res) => {
  if (req.url?.startsWith('/fib?')) {
    const url = new URL(req.url, 'http://localhost');
    const n = Number(url.searchParams.get('n') || 35);
    try {
      const result = await pool.runTask({ n });
      res.writeHead(200, {'content-type': 'application/json'});
      res.end(JSON.stringify({ pid: process.pid, n, result }));
    } catch (e) {
      res.writeHead(500); res.end('error');
    }
  } else {
    res.writeHead(404).end('Not Found');
  }
});

server.listen(3000, () => console.log('http://localhost:3000'));

访问:/fib?n=38 等,主进程不被阻塞,任务在子进程里算。