深入Node源码分析 exec、execFile、spawn、fork 子进程的区别
本文最后更新于:2022年4月22日 上午
深入 Node 源码分析 exec、execFile、spawn、fork 子进程的区别
exec 源码调用栈流程图
首先奉上几张源码调用栈流程图
完整源码调用栈图
图片放大后看的更清晰
异步回调流程
当所有的同步代码运行完成时候,会开始执行异步回调
异步回调的流程如下


认识 child_process
node 内置了一个 child_process 模块,通过该模块可以衍生出子进程,让子进程来帮助我们做一些其他的操作,有关 child_process 的更多信息,可以查看 这里
本篇主要分析以下几个 child_process 中的方法
exec
execFile
spawn
fork
准备
通过以下简单示例,来深入 Node 源码,分析整个运行机制
const cp = require('child_process');
cp.exec('ls', (err, stdout, stderr) => {
  console.log('callback start ------------');
  console.log(err, stdout, stderr);
  console.log('callback end ------------');
})exec 源码
首先进入到 exec 源码中,可以看到 exec 的源码内容很少,它主要会做以下两件事
标准化参数
调用
execFile
也就是说,其实 exec 其实是对 execFile 进行了封装
function exec(command, options, callback) {
  const opts = normalizeExecArgs(command, options, callback);
  return module.exports.execFile(opts.file,
                                 opts.options,
                                 opts.callback);
}normalizeExecArgs (对 exec 进行参数标准化)
normalizeExecArgs 会做以下几件事
对可选参数
options做兼容,如果第二个参数传的是function,则两个参数交换位置如果没有手动指定
shell则会赋值为true,后面运行的时候,会根据平台默认选择,Unix 上是'/bin/sh',Windows 上是process.env.ComSpec返回一个处理后的对象
function normalizeExecArgs(command, options, callback) {
  // 如果第二个参数是函数,则第二个参数和第三个参数交换位置,兼容参数
  if (typeof options === 'function') {
    callback = options;
    options = undefined;
  }
  // 做一个浅拷贝,这样我们就不会破坏用户的选项对象。
  // Make a shallow copy so we don't clobber the user's options object.
  options = { ...options };
  // shell 如果没传,默认会是true,如果传了则是用户传的值
  options.shell = typeof options.shell === 'string' ? options.shell : true;
  // 将 command设置为 赋值为 file,为调用 execFile做准备
  return {
    file: command,
    options: options,
    callback: callback
  };
}execFile (源码)
exec 对参数进行简单处理后就交给了 execFile
简单看下一 execFile ,可以看到 execFile 其实只接收了第一个参数,其它参数是通过 arguments 来获取的, execFile 主要做了以下几件事
对参数进行校验
生成
options调用
spawn注册
close和error事件
所以,可以看到 execFile 底层是调用了 spawn ,然后注册了 close 和 error 回调,看到这里的时候,如果我们了解过 spawn 的话,会知道spawn 是没有回调的,这就意味着,execFeile 其实是对 sapwn 进行了一次封装。
// 只接收了一个参数,剩下的参数通过 arguments 来获取
function execFile(file /* , args, options, callback */) {
  let args = [];
  let callback;
  let options;
  // Parse the optional positional parameters.
  let pos = 1;
  // 判断是否传入了命令参数
  if (pos < arguments.length && ArrayIsArray(arguments[pos])) {
    args = arguments[pos++];
  } else if (pos < arguments.length && arguments[pos] == null) {
    pos++;
  }
  // 判断是否传入了 options
  if (pos < arguments.length && typeof arguments[pos] === 'object') {
    options = arguments[pos++];
  } else if (pos < arguments.length && arguments[pos] == null) {
    pos++;
  }
  // 判断是否传入回调
  if (pos < arguments.length && typeof arguments[pos] === 'function') {
    callback = arguments[pos++];
  }
  // 如果参数传错,则抛错
  if (!callback && pos < arguments.length && arguments[pos] != null) {
    throw new ERR_INVALID_ARG_VALUE('args', arguments[pos]);
  }
  // 生成 options,默认 会覆盖 shell
  options = {
    encoding: 'utf8',
    timeout: 0,
    maxBuffer: MAX_BUFFER,
    killSignal: 'SIGTERM',
    cwd: null,
    env: null,
    shell: false,
    ...options
  };
  // 一系列验证,是否 数字
  // Validate the timeout, if present.
  validateTimeout(options.timeout);
  validateMaxBuffer(options.maxBuffer);
  validateAbortSignal(options.signal, 'options.signal');
  // 操作系统相关
  options.killSignal = sanitizeKillSignal(options.killSignal);
  // 开始执行 spawn
  const child = spawn(file, args, {
    cwd: options.cwd,
    env: options.env,
    gid: options.gid,
    uid: options.uid,
    shell: options.shell,
    windowsHide: !!options.windowsHide,
    windowsVerbatimArguments: !!options.windowsVerbatimArguments
  });
  let encoding;  // 字符编码 默认 utf-8
  // 用来记录所有的输出和错误流,最后一次性输出
  const _stdout = [];
  const _stderr = [];
  if (options.encoding !== 'buffer' && Buffer.isEncoding(options.encoding)) {
    encoding = options.encoding;
  } else {
    encoding = null;
  }
  // 记录输出流和错误流的长度,用来判断是否超过maxBuffer
  let stdoutLen = 0;
  let stderrLen = 0;
  let killed = false;
  let exited = false;
  let timeoutId;
  let ex = null;
  let cmd = file;
  // 退出事件回调
  function exithandler(code, signal) {
    ...
  }
  // 错误事件回调
  function errorhandler(e) {
    ...
  }
  function kill() {
    ...
  }
  function abortHandler() {
    if (!ex)
      ex = new AbortError();
    process.nextTick(() => kill());
  }
  // 校验是否有 timeout
  if (options.timeout > 0) {
    ...
  }
  if (options.signal) {
    ...
  }
  // 是否有输出流
  if (child.stdout) {
    ...
  }
  // 对错误流的处理同上
  if (child.stderr) {
    ...
  }
  // 另外添加了 close 和 error 的事件监听
  child.addListener('close', exithandler);
  child.addListener('error', errorhandler);
  return child;
}spawn 源码
spawn 是没有回调的,它返回的是流,那这个流是怎么来的呢,来看 spawn 的源码,spawn 的源码其实很少
创建子进程且注册
exit回调normalizeSpawnArguments()对参数进行了,且对不同操作系统进行了命令兼容调用
child.spawn(options),内部执行了子进程
function spawn(file, args, options) {
  // 通过 new ChldProcess 内部创建子进程,
  const child = new ChildProcess();
  // 对参数进行校验
  options = normalizeSpawnArguments(file, args, options);
  // 将参数传给子进程
  child.spawn(options);
  // 返回流对象
  return child;
}ChildProcess (创建子进程)
在 ChildProcess 中创建了子进程且注册了关闭的回调
这里指的注意的是,关闭回调中会做哪些事
关闭输出流
关闭进程
判断是否发生错误
    1. 发生错误则发出 error 事件(this.emit('error', err))
    2. 没有发生错误则发出 exit 事件(this.emit('exit'))
- 调用 
maybeClose(),子进程的每一个流(子进程会创建3 个流,在下面会讲到) 在关闭时都会调用maybeClose(), 当调用过3 次后,会发出close事件,表示子进程关闭 
...省略
// 创建子进程
this._handle = new Process();
this._handle[owner_symbol] = this;
// 注册进程的关闭回调
this._handle.onexit = (exitCode, signalCode) => {
  if (signalCode) {
    this.signalCode = signalCode;
  } else {
    this.exitCode = exitCode;
  }
  if (this.stdin) {
    // 关闭输入流
    this.stdin.destroy();
  }
  // 执行进程的 close
  this._handle.close();
  this._handle = null;  //将对象置空,以更好的进行垃圾回收
  // 校验是否发生错误
  if (exitCode < 0) {
    const syscall = this.spawnfile ? 'spawn ' + this.spawnfile : 'spawn';
    const err = errnoException(exitCode, syscall);
    if (this.spawnfile)
      err.path = this.spawnfile;
    err.spawnargs = this.spawnargs.slice(1);
    this.emit('error', err);
  } else {
    // 发出进程的 exit 事件
    this.emit('exit', this.exitCode, this.signalCode);
  }
  // 在下一个事件循环将还在缓存区的内容输出掉
  process.nextTick(flushStdio, this);
  // 调用 maybeClose
  maybeClose(this);
};normalizeSpawnArguments
这里我们看 normalizeSpawnArguments 在处理系统兼容的部分源码
判断操作系统,这里也对应了在 Node 文档中所说
在 Unix 上使用
'/bin/sh',在 Windows 上使用process.env.ComSpec
if (options.shell) {
  // 将命令 和 命令参数(没传就是一个空数组)通过空串进行拼接
  const command = [file].concat(args).join(' ');
  // 判断操作系统
  if (process.platform === 'win32') {
    if (typeof options.shell === 'string')
      file = options.shell;
    else
      file = process.env.comspec || 'cmd.exe';   // 获取到用户 cmd 的位置 'C:\\Windows\\system32\\cmd.exe'
    // '/d /s /c' 仅用于 cmd.exe。
    if (/^(?:.*\\)?cmd(?:\.exe)?$/i.test(file)) {
      args = ['/d', '/s', '/c', `"${command}"`];
      windowsVerbatimArguments = true;
    } else {
      args = ['-c', command];
    }
  } else {
    if (typeof options.shell === 'string')
      file = options.shell;
    else if (process.platform === 'android')    // 安卓
      file = '/system/bin/sh';
    else
      file = '/bin/sh';                         // unix
    args = ['-c', command];
  }
}
child.spawn(options) (执行子进程)
child.spawn(options) 实际上调用的是 ChildProcess 原型上的 spawn方法(ChildProcess.prototype.spawn)
主要会做以下几件事
拿到
stdio,如果不传默认会取到'pipe'调用
getValidStdio创建管道,拿到创建一个管道数组[输入流,输出流,错误流]拿到
command,拿到args调用
this._handle.spawn(options)执行子进程遍历
stdio管道数组,为 可读流 和 错误流 注册close回调如果
ipc不为空,则调用setupChannel()创建ipc通道,让两个进程进行通信,(fork)会需要
这里指的注意的是 this._handle.spawn(options) 内部是通过调用 C++ 的代码来执行进程的,如果需要了解的话,需要查看到对应的 C++ 文件中
ChildProcess.prototype.spawn = function(options) {
  let i = 0;
  if (options === null || typeof options !== 'object') {
    throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
  }
  // stdio 默认为 pipe
  let stdio = options.stdio || 'pipe';
  // 创建管道
  stdio = getValidStdio(stdio, false);
  const ipc = stdio.ipc;
  const ipcFd = stdio.ipcFd;
  stdio = options.stdio = stdio.stdio;
  ...
  this.spawnfile = options.file;      // 拿到命令执行器 'C:\\Windows\\system32\\cmd.exe'
  if (ArrayIsArray(options.args))
    this.spawnargs = options.args;    // 拿到命令 [ "C:\\Windows\\system32\\cmd.exe","/d", "/s", "/c","\"ls\"",]
  else if (options.args === undefined)
    this.spawnargs = [];
  else
    throw new ERR_INVALID_ARG_TYPE('options.args', 'Array', options.args);
  // 开始创建子进程 -> C++; 会返回一个错误码
  const err = this._handle.spawn(options);
  ...
  // 取到 pid
  this.pid = this._handle.pid;
  // 遍历 stdio 数组,创建 socket, 给输出流和错误流添加 close 监听
  for (i = 0; i < stdio.length; i++) {
    const stream = stdio[i];
    if (stream.type === 'ignore') continue;
    if (stream.ipc) {
      this._closesNeeded++;
      continue;
    }
    if (stream.type === 'wrap') {
      ...
    }
    if (stream.handle) {
      // 创建 cocket
      // 内部监听了
      // i === 0 表示这是一个可写流
      stream.socket = createSocket(this.pid !== 0 ?
        stream.handle : null, i > 0);
      // 并且给 输出流和错误流添加 close 监听
      // 并且调用 maybeClose
      if (i > 0 && this.pid !== 0) {
        this._closesNeeded++;
        stream.socket.on('close', () => {
          maybeClose(this);
        });
      }
    }
  }
  // 拿到三个流 输入流,输出流,错误流
  this.stdin = stdio.length >= 1 && stdio[0].socket !== undefined ?
    stdio[0].socket : null;
  this.stdout = stdio.length >= 2 && stdio[1].socket !== undefined ?
    stdio[1].socket : null;
  this.stderr = stdio.length >= 3 && stdio[2].socket !== undefined ?
    stdio[2].socket : null;
  this.stdio = [];
  // 将三个流都存储到 this.stdio 数组中
  for (i = 0; i < stdio.length; i++)
    this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket);
  // 添加 .send() 方法并开始监听 IPC 数据 使用 fork 的时候会用到
  if (ipc !== undefined) setupChannel(this, ipc, serialization);
  return err;
};getValidStdio (给流创建管道)
这一步主要会做以下几件事
将
stdio转换为数组'pipe'→['pipe', 'pipe', 'pipe']([输入流,输出流,错误流])遍历
stdio,给每个流都创建一个管道
function getValidStdio(stdio, sync) {
  let ipc;
  let ipcFd;
  if (typeof stdio === 'string') {
    // 将其转换为数组,如 ['pipe', 'pipe', 'pipe']
    stdio = stdioStringToArray(stdio);
  } else if (!ArrayIsArray(stdio)) {
    throw new ERR_INVALID_OPT_VALUE('stdio', stdio);
  }
  while (stdio.length < 3) stdio.push(undefined);
  // 对 stdio 进行遍历,将stdio转换为c++可读的形式(例如PipeWraps或fds)
  stdio = stdio.reduce((acc, stdio, i) => {
    function cleanup() {
      for (let i = 0; i < acc.length; i++) {
        if ((acc[i].type === 'pipe' || acc[i].type === 'ipc') && acc[i].handle)
          acc[i].handle.close();
      }
    }
    // Defaults
    if (stdio == null) {
      stdio = i < 3 ? 'pipe' : 'ignore';
    }
    // 会根据不同的 stdio 类型来建立管道
    if (stdio === 'ignore') {         // ignore 不会有输出流,一般用于静默执行
      acc.push({ type: 'ignore' });
    } else if (stdio === 'pipe' || (typeof stdio === 'number' && stdio < 0)) {
      // 每一个流对象,输入流可读,输出流和错误流可写
      const a = {
        type: 'pipe',
        readable: i === 0,
        writable: i !== 0
      };
      // 给每一个流对象建立管道
      if (!sync)
        a.handle = new Pipe(PipeConstants.SOCKET);
      acc.push(a);
    } else if (stdio === 'ipc') {
      if (sync || ipc !== undefined) {
        // Cleanup previously created pipes
        cleanup();
        if (!sync)
          throw new ERR_IPC_ONE_PIPE();
        else
          throw new ERR_IPC_SYNC_FORK();
      }
      ipc = new Pipe(PipeConstants.IPC);
      ipcFd = i;
      acc.push({
        type: 'pipe',
        handle: ipc,
        ipc: true
      });
    } else if (stdio === 'inherit') {
      acc.push({
        type: 'inherit',
        fd: i
      });
    } else if (typeof stdio === 'number' || typeof stdio.fd === 'number') {
      acc.push({
        type: 'fd',
        fd: typeof stdio === 'number' ? stdio : stdio.fd
      });
    } else if (getHandleWrapType(stdio) || getHandleWrapType(stdio.handle) ||
               getHandleWrapType(stdio._handle)) {
      const handle = getHandleWrapType(stdio) ?
        stdio :
        getHandleWrapType(stdio.handle) ? stdio.handle : stdio._handle;
      acc.push({
        type: 'wrap',
        wrapType: getHandleWrapType(handle),
        handle: handle,
        _stdio: stdio
      });
    } else if (isArrayBufferView(stdio) || typeof stdio === 'string') {
      if (!sync) {
        cleanup();
        throw new ERR_INVALID_SYNC_FORK_INPUT(inspect(stdio));
      }
    } else {
      // Cleanup
      cleanup();
      throw new ERR_INVALID_OPT_VALUE('stdio', stdio);
    }
    return acc;
  }, []);
  // stdio 变为了三个socket 对象的数组
  // ipc 和 ipcFd 为 fork 需要用到
  return { stdio, ipc, ipcFd };
}createSocket
当创建完 3个管道后会走到 createSocket
stream.socket = createSocket(this.pid !== 0 ? stream.handle : null, i > 0);进入到 createSocket 源码
可以看到,createSocket 的源码非常的简单,只是调用了 net.Socket()
function createSocket(pipe, readable) {
  return net.Socket({ handle: pipe, readable, writable: !readable });
}net.Socket (创建 socket)
net 是 Node 内置的一个异步网络模块,而 socket 是该模块中的一个方法
function Socket(options) {
  if (!(this instanceof Socket)) return new Socket(options);
  ...省略
  if (options.handle) {
    this._handle = options.handle; // private
    this[async_id_symbol] = getNewAsyncId(this._handle);
  } else {
    ...
  }
  // 监听 socket的关闭  onReadableStreamEnd 发出
  this.on('end', onReadableStreamEnd);
  // 监听 socket 的输出
  initSocketHandle(this);
  this._pendingData = null;
  this._pendingEncoding = '';
  ...省略
}onReadableStreamEnd
通过 this.destroy() 发出 结束事件
function onReadableStreamEnd() {
  if (!this.allowHalfOpen) {
    this.write = writeAfterFIN;
    if (this.writable)
      this.end();
    else if (!this.writableLength)
      this.destroy();
  } else if (!this.destroyed && !this.writable && !this.writableLength)
    this.destroy();
}initSocketHandle
self._handle.onread = onStreamRead 为写入时会触发的事件
function initSocketHandle(self) {
  self._undestroy();
  self._sockname = null;
  if (self._handle) {
    self._handle[owner_symbol] = self;
    self._handle.onread = onStreamRead;
    self[async_id_symbol] = getNewAsyncId(self._handle);
    ...
  }
}在这个事件中,如果监听到输出流会发出 'data' 事件,这也是为什么我们监听输出流和错误流的时候要通过 on('data') 来监听了,这里具体的源码流程可以看下图

注册 流的 close 回调
当 socket 创建完成后,会给 输出流和错误流添加 close 回调,回调中会调用 maybeClose()
stream.socket = createSocket(this.pid !== 0 ? stream.handle : null, i > 0);
if (i > 0 && this.pid !== 0) {
  this._closesNeeded++;
  stream.socket.on('close', () => {
    maybeClose(this);
  });
}同步代码准备出栈
这些 close 的回调都注册完毕之后,同步的流程基本上就已经完了
从 child.spawn 开始出栈
this.stdin = stdio.length >= 1 && stdio[0].socket !== undefined ?
  stdio[0].socket : null;
this.stdout = stdio.length >= 2 && stdio[1].socket !== undefined ?
  stdio[1].socket : null;
this.stderr = stdio.length >= 3 && stdio[2].socket !== undefined ?
  stdio[2].socket : null;
this.stdio = [];
for (i = 0; i < stdio.length; i++)
  this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket);
// Add .send() method and start listening for IPC data
if (ipc !== undefined) setupChannel(this, ipc, serialization);
return err;然后到 spawn
function spawn(file, args, options) {
  const child = new ChildProcess();
  options = normalizeSpawnArguments(file, args, options);
  child.spawn(options);
  return child;
}execFile 注册回调
接着到 execFile,在 execFile 调用完 spawn 后,主要做了四件事情
注册输出流的
data事件注册错误流的
data事件注册子进程的
close事件注册子进程的
error事件
on(‘data’) 回调
const child = spawn(file, args, {
  cwd: options.cwd,
  env: options.env,
  gid: options.gid,
  uid: options.uid,
  shell: options.shell,
  windowsHide: !!options.windowsHide,
  windowsVerbatimArguments: !!options.windowsVerbatimArguments
});
... 省略
// 注册输出流的 data 事件
if (child.stdout) {
  if (encoding)
    child.stdout.setEncoding(encoding); // 设置 encoding,默认为 utf-8
  // 添加 data 监听
  child.stdout.on('data', function onChildStdout(chunk) {
    const encoding = child.stdout.readableEncoding;
    const length = encoding ?
      Buffer.byteLength(chunk, encoding) :
      chunk.length;
    stdoutLen += length;
    // 校验是否超过 maxBuffer
    if (stdoutLen > options.maxBuffer) {
      const truncatedLen = options.maxBuffer - (stdoutLen - length);
      _stdout.push(chunk.slice(0, truncatedLen));   // 每次监听到输出都累加
      ex = new ERR_CHILD_PROCESS_STDIO_MAXBUFFER('stdout');
      kill();   //如果超出则直接 杀死这个进程
    } else {
      // 存入 _stdout 数组中
      _stdout.push(chunk);
    }
  });
}
// 注册错误流的 data 事件
if (child.stderr) {
  if (encoding)
    child.stderr.setEncoding(encoding);
  child.stderr.on('data', function onChildStderr(chunk) {
    const encoding = child.stderr.readableEncoding;
    const length = encoding ?
      Buffer.byteLength(chunk, encoding) :
      chunk.length;
    stderrLen += length;
    if (stderrLen > options.maxBuffer) {
      const truncatedLen = options.maxBuffer - (stderrLen - length);
      _stderr.push(chunk.slice(0, truncatedLen));
      ex = new ERR_CHILD_PROCESS_STDIO_MAXBUFFER('stderr');
      kill();
    } else {
      _stderr.push(chunk);
    }
  });
}
child.addListener('close', exithandler);
child.addListener('error', errorhandler);
return child;这里我们需要关注的是 stdout 和stderr 的on('data') 事件处理是类似的
这里面每次接收到流的输出时会先判断加上这次的输出是否有超过 maxBuffer(默认值 1024 * 1024),如果超过则会 kill 掉子进程
另外一个是 _stdout.push(chunk),这个_stdout是之前定义的一个数组,每次监听到 输出流的信息,他都会push进去,最后一次性进行返回,这也是为什么说 exec和execFile 不太适合做一些耗时较长的任务,因为在任务没有结束前,它不会有任何输出
close 回调
这里还需要注意 close 回调,因为这里才是真正执行我们 callback 的地方
主要做以下几件事
判断是否有传
callback取到所有输出流
取到所有错误流
判断是否有错误
    1. 无:调用 callback(null, stdout, stderr), err 为 null
    2. 有:调用 callback(ex, stdout, stderr), err 为 cmd + 错误流
function exithandler(code, signal) {
   if (exited) return;
   exited = true;
   if (timeoutId) {
     clearTimeout(timeoutId);
     timeoutId = null;
   }
   if (!callback) return; //如果没有callback则 return
   // merge chunks
   let stdout;
   let stderr;
   if (encoding ||
     (
       child.stdout &&
       child.stdout.readableEncoding
     )) {
     stdout = _stdout.join('');    // 取到所有输出流
   } else {
     stdout = Buffer.concat(_stdout);
   }
   if (encoding ||
     (
       child.stderr &&
       child.stderr.readableEncoding
     )) {
     stderr = _stderr.join('');    // 取到所有错误流
   } else {
     stderr = Buffer.concat(_stderr);
   }
   if (!ex && code === 0 && signal === null) {
     // 调用 callback,这个callback 就是我们在外面调用 exec 和 execFile 写的 callback
     callback(null, stdout, stderr);
     return;
   }
   if (args.length !== 0)
     cmd += ` ${args.join(' ')}`;
   if (!ex) {
     // eslint-disable-next-line no-restricted-syntax
     ex = new Error('Command failed: ' + cmd + '\n' + stderr);
     ex.killed = child.killed || killed;
     ex.code = code < 0 ? getSystemErrorName(code) : code;
     ex.signal = signal;
   }
   ex.cmd = cmd;
   callback(ex, stdout, stderr);
 }本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议,转载请注明出处。
