Node学习5-进程机制

服务端模型的变迁

Web服务器的架构已经历了几次变迁。服务器处理客户端请求的并发量,就是每个里程碑的见证。

石器时代:同步

最早的服务器,其执行模型是同步的,它的服务模式是一次只为一次请求服务,所有的请求都得按次序等待服务。这意味着除了当前的请求被处理外,其余请求都处于等待被处理的状态。其处理性能相当低下,假设每次响应服务耗时为N秒,则这类服务的QPS为1/N。

这种架构如今已基本被淘汰,只在一些无并发需要的应用中存在。

青铜时代:复制进程

为了解决同步架构的并发问题,衍生出了复制多个进程以提高并发量的模式。每个进程为一个连接服务。但这种模式的消耗非常高,因为新建进程的代价是很高的,包括内存上下文的分配。因为在复制进程的同时也会复制进程内部的状态,也就是说同样的状态会在内存中存储多份,造成来个浪费。

假设通过进程复制和预复制的方式搭建的服务器有资源的限制,且进程的上限为M,那么这一类服务的QPS为M/N。

白银时代:多线程

为了解决创建进程代价过大的问题,多线程的概念被引入,线程相当于一个简化版的进程,其没有自己的系统资源,所以在创建,废除时,其代价都比进程小的多。但是值得注意的是,不同进程的线程切换会造成上下文的切换,当过多的不同进程的线程进行切换时,消耗仍然很大。虽然其有缺点,但线程仍然是当代操作系统的重要组成部分。

如果忽略多线程上下文切换的开销,假设线程所占用的资源为进程的1/L,受资源上限的影响,它的QPS则为N*L/N。

黄金时代:事件驱动

多线程的服务模型服役了很长一段时间,Apache就是采用多线程/多进程模式来实现的,当并发量增长到上万时,内存耗用的问题就会暴露出来,即为著名的C10k问题。

为了解决高并发的问题,基于事件驱动的服务模型出现了,像Node和Nginx均是基于事件驱动的方式实现的:即一个线程为所有请求服务,请求到来时触发事件,这与每个请求由一个线程服务的模式完全不同。采用单线程避免了不必要的内存开销和上下文切换问题。

但是这种基于单线程的模式虽然解决了上面的问题,但是其无法高效的利用多核CPU的问题,则为这个模式的问题之所在。所以当我们解决掉这个问题时,那么性能的提升是相当可观的。

多进程架构

面对单线程对多核利用不足的问题,前人的经验是启动多进程即可。理想状态下每个进程的各自利用一个CPU,以此实现多核CPU的利用。基于Node提供的child_process模块可以实现多进程的调用。我们创建一个简单的服务端代码:

1
2
3
4
5
6
7
8
9
let http = require('http')

const port = 5000 + Math.round(1 + Math.random() * 1000)
http.createServer(function(req, res){
res.writeHead(200, {'Content-type': 'text/plain'})
res.end("Hello World\n")
}).listen(port, '127.0.0.1')

console.log('server on 127.0.0.1:'+port)

通过node work.js启动它,它会监听听5000到6000之间的一个随机端口。将下面的代码保存为master.js,并通过node master.js启动它:

1
2
3
4
5
6
let fork = require('child_process').fork
let cpus = require('os').cpus()

for(let i = 0; i < cpus.length; i++){
fork('./work.js')
}

这段代码会根据当前机器上的CPU的数量复制出当前Node进程数。

会得到如下的结果:

1
2
3
4
server on 127.0.0.1:5123
server on 127.0.0.1:5481
server on 127.0.0.1:5065
server on 127.0.0.1:5516

如下图即为典型的Master-Work模式,又称为主从模式,其广泛用于并行处理业务的模式,具备较好的可伸缩性和稳定性。图中的进程分为两类:主进程和工作进程。主进程不负责具体的业务处理,而是负责调度或管理工作进程,它趋向于稳定。工作进程负责具体的业务处理。

注意通过fork()复制的进程都是一个独立的进程,每个进程都是一个独立而全新的V8实例。它需要至少30毫秒的启动时间和至少10MB的内存。

child_process模块

创建子进程

异步创建子进程

child_process.exec(command[, options][, callback])

该方法可以直接异步执行命令(不是JavaScript代码,而是shell命令),options参数可以控制各种条件,比如工作目录,编码等;第三个参数用于设置失败的回调。

该命令衍生shell,然后在shell中执行command,并缓冲任何产生的输出。传给command字符串会被shell直接处理,特殊字符串(因shell而异)需要被相应地处理:

1
2
3
4
5
exec('"/目录/空 格/文件.sh" 参数1 参数2');
// 使用双引号,使路径中的空格不会被解释为多个参数的分隔符。

exec('echo "\\$HOME 变量为 $HOME"');
// $HOME 变量在第一个实例中会被转义,但是第二个则不会。

所以对于用户的输入,一定要经过无害化处理才能使用这个函数,因为其可以触发任何命令,很危险!

示例
1
2
3
4
5
6
7
8
9
10
11
12
const process = require('child_process')

let cmd = 'console.log(123456)'

process.exec('echo 132',{encoding: 'GBK'}, (error, stdout, stderr) => {
if (error) {
console.error(`执行的错误: ${error}`);
return;
}
console.log(`stdout: ${stdout}`);
console.error(`stderr: ${stderr}`);
})

值得注意的是:标准输出是在回调的stdout参数中,而不会直接打印在当前控制台中。

源码

我们简单看一下其源码:

1
2
3
4
5
6
function exec(command, options, callback) {
const opts = normalizeExecArgs(command, options, callback);
return module.exports.execFile(opts.file,
opts.options,
opts.callback);
}

可以看到其首先是调用了normalizeExecArgs()这个函数,我们再看一下内部:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
function normalizeExecArgs(command, options, callback) {
if (typeof options === 'function') {
callback = options;
options = undefined;
}

// Make a shallow copy so we don't clobber the user's options object.
options = { ...options };
options.shell = typeof options.shell === 'string' ? options.shell : true;

return {
file: command,
options: options,
callback: callback
};
}

内部首先是做了一个判断,即忽略options功能的实现。

然后是对options做了浅拷贝防止修改用户的options对象。因为后面需要对options.shell进行处理,即是否使用默认的shell.exe。详见下面:

shell 的要求

Shell 需要能理解 -c 开关。 如果 shell 是 'cmd.exe',则它需要能理解 /d /s /c 开关,且命令行解析需要能兼容。

默认的 Windows shell

尽管微软指定在根环境中 %COMSPEC% 必须包含 'cmd.exe' 的路径,但子进程并不总是遵循相同的要求。 因此,在可以衍生 shell 的 child_process 函数中,如果 process.env.ComSpec 不可以,则使用 'cmd.exe' 作为后备。

然后直接返回了一个对象,并且将command重命名为file(这里我没有看到哪里将command转换为file,但是exeFile确实只接受File,疑惑)

跳出这个函数,看exe()函数,可以发现其仍然是调用了exeFile()函数,我们下面再来看这个函数的源码。

child_process.execFile(file[, args][, options][, callback])

该函数与child_process.exe()相似,都是执行命令。但仍然有些许不同。

  • 该方法第一个参数接受一个命令文件(名字或路径),而不是命令的字符串。
  • child_process.execFile() 函数类似于 child_process.exec(),但默认情况下不会衍生 shell。 指定的可执行文件 file 会被直接衍生作为新的进程,使其比 child_process.exec() 稍微更高效。
  • 由于没有衍生 shell,因此不支持 I/O 重定向和文件通配等行为。
源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
function execFile(file /* , args, options, callback */) {
let args = [];
let callback;
let options;

// Parse the optional positional parameters.解析参数
let pos = 1;
if (pos < arguments.length && ArrayIsArray(arguments[pos])) {
args = arguments[pos++];
} else if (pos < arguments.length && arguments[pos] == null) {
pos++;
}

if (pos < arguments.length && typeof arguments[pos] === 'object') {
options = arguments[pos++];
} else if (pos < arguments.length && arguments[pos] == null) {
pos++;
}

if (pos < arguments.length && typeof arguments[pos] === 'function') {
callback = arguments[pos++];
}

if (!callback && pos < arguments.length && arguments[pos] != null) {
throw new ERR_INVALID_ARG_VALUE('args', arguments[pos]);
}

options = {
encoding: 'utf8',
timeout: 0,
maxBuffer: MAX_BUFFER,
killSignal: 'SIGTERM',
cwd: null,
env: null,
shell: false,
...options
};

// Validate the timeout, if present.验证参数
validateTimeout(options.timeout);

// Validate maxBuffer, if present.
validateMaxBuffer(options.maxBuffer);

options.killSignal = sanitizeKillSignal(options.killSignal);

//关键-产生子进程
const child = spawn(file, args, {
cwd: options.cwd,
env: options.env,
gid: options.gid,
shell: options.shell,
signal: options.signal,
uid: options.uid,
windowsHide: !!options.windowsHide,
windowsVerbatimArguments: !!options.windowsVerbatimArguments
});

let encoding;
const _stdout = [];
const _stderr = [];
if (options.encoding !== 'buffer' && Buffer.isEncoding(options.encoding)) {
encoding = options.encoding;
} else {
encoding = null;
}
let stdoutLen = 0;
let stderrLen = 0;
let killed = false;
let exited = false;
let timeoutId;

let ex = null;

let cmd = file;

function exithandler(code, signal) {
if (exited) return;
exited = true;

if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}

if (!callback) return;

// merge chunks
let stdout;
let stderr;
if (encoding ||
(
child.stdout &&
child.stdout.readableEncoding
)) {
stdout = ArrayPrototypeJoin(_stdout, '');
} else {
stdout = Buffer.concat(_stdout);
}
if (encoding ||
(
child.stderr &&
child.stderr.readableEncoding
)) {
stderr = ArrayPrototypeJoin(_stderr, '');
} else {
stderr = Buffer.concat(_stderr);
}

if (!ex && code === 0 && signal === null) {
callback(null, stdout, stderr);
return;
}

if (args.length !== 0)
cmd += ` ${ArrayPrototypeJoin(args, ' ')}`;

if (!ex) {
// eslint-disable-next-line no-restricted-syntax
ex = new Error('Command failed: ' + cmd + '\n' + stderr);
ex.killed = child.killed || killed;
ex.code = code < 0 ? getSystemErrorName(code) : code;
ex.signal = signal;
}

ex.cmd = cmd;
callback(ex, stdout, stderr);
}

function errorhandler(e) {
ex = e;

if (child.stdout)
child.stdout.destroy();

if (child.stderr)
child.stderr.destroy();

exithandler();
}

function kill() {
if (child.stdout)
child.stdout.destroy();

if (child.stderr)
child.stderr.destroy();

killed = true;
try {
child.kill(options.killSignal);
} catch (e) {
ex = e;
exithandler();
}
}

if (options.timeout > 0) {
timeoutId = setTimeout(function delayedKill() {
kill();
timeoutId = null;
}, options.timeout);
}

if (child.stdout) {
if (encoding)
child.stdout.setEncoding(encoding);

child.stdout.on('data', function onChildStdout(chunk) {
const encoding = child.stdout.readableEncoding;
const length = encoding ?
Buffer.byteLength(chunk, encoding) :
chunk.length;
const slice = encoding ? StringPrototypeSlice :
(buf, ...args) => buf.slice(...args);
stdoutLen += length;

if (stdoutLen > options.maxBuffer) {
const truncatedLen = options.maxBuffer - (stdoutLen - length);
ArrayPrototypePush(_stdout, slice(chunk, 0, truncatedLen));

ex = new ERR_CHILD_PROCESS_STDIO_MAXBUFFER('stdout');
kill();
} else {
ArrayPrototypePush(_stdout, chunk);
}
});
}

if (child.stderr) {
if (encoding)
child.stderr.setEncoding(encoding);

child.stderr.on('data', function onChildStderr(chunk) {
const encoding = child.stderr.readableEncoding;
const length = encoding ?
Buffer.byteLength(chunk, encoding) :
chunk.length;
stderrLen += length;

if (stderrLen > options.maxBuffer) {
const truncatedLen = options.maxBuffer - (stderrLen - length);
ArrayPrototypePush(_stderr,
chunk.slice(0, truncatedLen));

ex = new ERR_CHILD_PROCESS_STDIO_MAXBUFFER('stderr');
kill();
} else {
_stderr.push(chunk);
}
});
}

child.addListener('close', exithandler);
child.addListener('error', errorhandler);

return child;
}

这个函数相对还是比较长,有217行。但是结构相当清晰:

  • L 2-4:定义参数变量
  • L 7-26:解析参数(由于定义了可选参数,所以需要这样解析)
  • L 28-46:定义options并验证参数合法性
  • L 48-57(关键):调用spawn()函数产生子进程
  • Rest:剩下的代码都是为child添加closeerror事件和其准备工作
  • Last:返回child。

在 Windows 上衍生 .bat.cmd 文件

child_process.exec()child_process.execFile()之间区别的重要性可能因平台而异。 在 Unix 类型的操作系统(Unix、Linux、macOS)上,child_process.execFile()可以更高效,因为默认情况下不会衍生 shell。 但是在 Windows 上, .bat.cmd 文件在没有终端的情况下不能自行执行,因此无法使用 child_process.execFile()启动。 当在 Windows 上运行时,要调用 .bat.cmd 文件,可以使用设置了 shell 选项的 child_process.spawn()、或 child_process.exec()、或衍生 cmd.exe 并将 .bat.cmd 文件作为参数传入(也就是 shell 选项和 child_process.exec()所做的)。 在任何情况下,如果脚本的文件名包含空格,则需要加上引号。

child_process.spawn(command[, args][, options])

可以看到,最后创建进程的工作,都是由这个方法实现的,但是其与exec()有较大的不同,否则也不会设计两个API,一直存在这么久。

spawn()exec()的不同:
  • exec()command是包含参数的,而spawn命令的参数是作为函数的第二个参数传入的。
  • exec()是将处理函数作为回调传入内部,而spawn实现了emitter,可以直接在外部监听事件。还可以准确地监听close等事件,exec()则只能统一监听error事件。
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const { spawn } = require('child_process');
const ls = spawn('ls', ['-lh', '/usr']);

//需要显式的监听
ls.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
});

ls.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
});

ls.on('close', (code) => {
console.log(`子进程退出,退出码 ${code}`);
});
源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
function spawn(file, args, options) {
//解析参数
options = normalizeSpawnArguments(file, args, options);
//验证timeout
validateTimeout(options.timeout);
//验证signal
validateAbortSignal(options.signal, 'options.signal');
//验证并转换killSignal信号,这里嵌套太深而且不重要,就不再深究
const killSignal = sanitizeKillSignal(options.killSignal);
//新建子进程,重要
const child = new ChildProcess();
//这里实际是记录log,而不是debug
debug('spawn', options);
//初始化子进程参数
child.spawn(options);

//子进程超时的操作
if (options.timeout > 0) {
let timeoutId = setTimeout(() => {
if (timeoutId) {
try {
child.kill(killSignal);
} catch (err) {
child.emit('error', err);
}
timeoutId = null;
}
}, options.timeout);

child.once('exit', () => {
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
});
}

//是否有killSignal参数,即终止进程的信号值,有的话添加abort事件
if (options.signal) {
const signal = options.signal;
if (signal.aborted) {
process.nextTick(onAbortListener);
} else {
signal.addEventListener('abort', onAbortListener, { once: true });
child.once('exit',
() => signal.removeEventListener('abort', onAbortListener));
}

function onAbortListener() {
abortChildProcess(child, killSignal);
}
}
//返回子进程
return child;
}

这个函数中最重要的就是 const child = new ChildProcess();这一部分,我们再看一下这个类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
function ChildProcess() {
FunctionPrototypeCall(EventEmitter, this);

this._closesNeeded = 1;
this._closesGot = 0;
this.connected = false;

this.signalCode = null;
this.exitCode = null;
this.killed = false;
this.spawnfile = null;

//重要,新建进程
this._handle = new Process();
this._handle[owner_symbol] = this;

this._handle.onexit = (exitCode, signalCode) => {
if (signalCode) {
this.signalCode = signalCode;
} else {
this.exitCode = exitCode;
}

if (this.stdin) {
this.stdin.destroy();
}

this._handle.close();
this._handle = null;

if (exitCode < 0) {
const syscall = this.spawnfile ? 'spawn ' + this.spawnfile : 'spawn';
const err = errnoException(exitCode, syscall);

if (this.spawnfile)
err.path = this.spawnfile;

err.spawnargs = ArrayPrototypeSlice(this.spawnargs, 1);
this.emit('error', err);
} else {
this.emit('exit', this.exitCode, this.signalCode);
}

// If any of the stdio streams have not been touched,
// then pull all the data through so that it can get the
// eof and emit a 'close' event.
// Do it on nextTick so that the user has one last chance
// to consume the output, if for example they only want to
// start reading the data once the process exits.
process.nextTick(flushStdio, this);

maybeClose(this);
};
}
ObjectSetPrototypeOf(ChildProcess.prototype, EventEmitter.prototype);
ObjectSetPrototypeOf(ChildProcess, EventEmitter);
//...
ChildProcess.prototype.spawn = function(options) {
//...
}

//...
ChildProcess.prototype.kill = function(sig) {
//...
}
//...
ChildProcess.prototype.ref = function() {
if (this._handle) this._handle.ref();
};


ChildProcess.prototype.unref = function() {
if (this._handle) this._handle.unref();
};
//

可以看到这实际上就是一个原生的JS类,其在构造函数中定义了很多内部变量,其中最重要的是this._handle = new Process();来真实的新建一个进程。而这个类则是从C++底层导出的,所以我们不再向下深入。

在定义完类之后,Node立即为其添加了event模块,用于用户对其进行操作。

其中spawn()也很重要,注意每层的对象都有spawn()函数,其用来得到最后的输出结果。

child_process.fork(modulePath[, args][, options])

child_process.fork() 方法是 child_process.spawn()的特例,专门用于衍生新的 Node.js 进程。 与 child_process.spawn() 一样返回 ChildProcess对象。 返回的 ChildProcess会内置额外的通信通道,允许消息在父进程和子进程之间来回传递。 详见 subprocess.send()

记住,衍生的 Node.js 子进程独立于父进程,但两者之间建立的 IPC 通信通道除外。 每个进程都有自己的内存,带有自己的 V8 实例。 由于需要额外的资源分配,因此不建议衍生大量的 Node.js 子进程。

默认情况下, child_process.fork() 会使用父进程的 process.execPath来衍生新的 Node.js 实例。

该方法应该比较常用的,因为我们在做web的负载均衡或者多核CPU利用时,一般其他的server也是Node,所以通过该API可以直接生成新的进程。

示例
1
2
3
4
5
6
7
//前面的主从的例子
let fork = require('child_process').fork
let cpus = require('os').cpus()

for(let i = 0; i < cpus.length; i++){
fork('./work.js')
}

同步创建子进程

下面的函数会同步创建,执行进程,并且将会阻塞 Node.js 事件循环、暂停任何其他代码的执行,直到衍生的进程退出。阻塞这些调用对于简化通用的脚本任务和简化应用程序配置在启动时的加载或处理都非常有用。

child_process.execFileSync(file[, args][, options])

child_process.execFileSync() 方法通常与 child_process.execFile() 相同,但该方法在子进程完全关闭之前不会返回。 当遇到超时并且已发送 killSignal 时,该方法也需等到进程完全退出后才返回。

如果子进程拦截并处理了 SIGTERM 信号但未退出,则父进程仍将等待子进程退出。

这个函数也是和execFile()的调用栈差不多。

exeFilr()->spawnSync(lib)->spawnSync()(internal)-> spawn_sync.spawn(options)(C++内建模块)

child_process.execSync(command[, options])

child_process.execSync() 方法通常与 child_process.exec() 相同,但该方法在子进程完全关闭之前不会返回。 当遇到超时并且已发送 killSignal 时,该方法也需等到进程完全退出后才返回。 如果子进程拦截并处理了 SIGTERM 信号但未退出,则父进程会等待直到子进程退出。

源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
function execSync(command, options) {
const opts = normalizeExecArgs(command, options, null);
const inheritStderr = !opts.options.stdio;

const ret = spawnSync(opts.file, opts.options);

if (inheritStderr && ret.stderr)
process.stderr.write(ret.stderr);

const err = checkExecSyncError(ret, opts.args, command);

if (err)
throw err;

return ret.stdout;
}

可以看到这个函数就没有调用exeFileSync(),而是直接调用的spawnSync()函数。下面我们再看spawnSync()

child_process.spawnSync(command[, args][, options])

child_process.spawnSync() 方法通常与 child_process.spawn() 相同,但在子进程完全关闭之前该函数不会返回。 当遇到超时并且已发送 killSignal 时,该方法也需等到进程完全退出后才返回。 如果进程拦截并处理了 SIGTERM 信号但未退出,则父进程会等待直到子进程退出。

源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
function spawnSync(file, args, options) {
options = {
maxBuffer: MAX_BUFFER,
...normalizeSpawnArguments(file, args, options)
};

debug('spawnSync', options);

// Validate the timeout, if present.
validateTimeout(options.timeout);

// Validate maxBuffer, if present.
validateMaxBuffer(options.maxBuffer);

// Validate and translate the kill signal, if present.
options.killSignal = sanitizeKillSignal(options.killSignal);

options.stdio = getValidStdio(options.stdio || 'pipe', true).stdio;

if (options.input) {
const stdin = options.stdio[0] = { ...options.stdio[0] };
stdin.input = options.input;
}

// We may want to pass data in on any given fd, ensure it is a valid buffer
for (let i = 0; i < options.stdio.length; i++) {
const input = options.stdio[i] && options.stdio[i].input;
if (input != null) {
const pipe = options.stdio[i] = { ...options.stdio[i] };
if (isArrayBufferView(input)) {
pipe.input = input;
} else if (typeof input === 'string') {
pipe.input = Buffer.from(input, options.encoding);
} else {
throw new ERR_INVALID_ARG_TYPE(`options.stdio[${i}]`,
['Buffer',
'TypedArray',
'DataView',
'string'],
input);
}
}
}

//主要,调用internal的spawnSync的函数
return child_process.spawnSync(options);
}

上面都是对参数进行验证和处理。最后只是调用了child_process.spawnSync(options)函数,我们再看一下这个函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
function spawnSync(options) {
const result = spawn_sync.spawn(options);

if (result.output && options.encoding && options.encoding !== 'buffer') {
for (let i = 0; i < result.output.length; i++) {
if (!result.output[i])
continue;
result.output[i] = result.output[i].toString(options.encoding);
}
}

result.stdout = result.output && result.output[1];
result.stderr = result.output && result.output[2];

if (result.error) {
result.error = errnoException(result.error, 'spawnSync ' + options.file);
result.error.path = options.file;
result.error.spawnargs = ArrayPrototypeSlice(options.args, 1);
}

return result;
}

这里最重要的就是第一条语句,即调用了spawn_sync.spawn(options)函数,这个函数也是从C++导入的内部模块。

所以对于Process模块,JavaScript层面只做了参数的验证,处理,以及返回数据。(因为JavaScript的能力就止于此了),其内部具体的逻辑,都是在C++平台来处理的。而这一部分,是由libuv进行处理的,因为涉及到跨平台处理不同。

总结

名称 执行类型 同步/异步 事件处理方式
exec 命令 异步 callback回调/event监听
execFile 命令(文件) 异步 callback回调/event监听
Fork JavaScript模块 异步 event监听
execSync 命令 同步 /
execFileSync 命令(文件) 同步 /

进程间通信

在Master-Work中模式中,要实现主进程管理和调度工作进程的功能,需要主进程和工作进程之间的通信。对于child_process模块,创建好了子进程,然后与父子进程间通信是十分容易的。

由于Node中的进程模块实现了EventEmitter,所以可以直接使用事件监听的方式来进行通信。

用法

对应的子进程有5个事件,其中包括通信的message事件。

事件 触发时间 返回
close 进程关闭 1. code 子进程自行退出时的退出码。
2. signal子进程被终止的信号。
disconnect 调用父进程中的 subprocess.disconnect()或子进程中的 process.disconnect() 后会触发 'disconnect' 事件。 /
error 1. 无法衍生进程;
2. 无法杀死进程;
3. 向子进程发送消息失败。
err 错误
exit 当子进程结束后时会触发 'exit' 事件。 1. code子进程自行退出时的退出码。
2. signal 子进程被终止的信号。
message 当子进程使用 process.send()发送消息时会触发 'message' 事件。 1. message 一个已解析的 JSON 对象或原始值。
2. sendHandle一个 net.Socketnet.Server 对象,或 undefined

通过Process.end()方法,可以向子进程发送消息,而主进程可以通过message事件监听子进程的消息。

而子进程可以通过内置对象processsend()方法,和同样的事件监听来与父进程通信。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//master.js
const cp = require('child_process')
const n = cp.fork(__dirname + '/sub.js')

n.on('message', (m) => {
console.log(`进程消息:${m}`)
})

setTimeout(() => {
work.send('父进程发送消息来了')
}, 3000)

//work.js
process.on('message', (m) => {
console.log(`进程消息:${m}`)
})

setTimeout(() => {
process.send('子进程发送消息来了')
}, 2000)

原理

为了实现父子进程之间的通信,父进程与子进程之间将会创建IPC通道。通过IPC通道,父子进程之间才能通过messagesend()传递消息。

IPC的全程是Inter-Process-Communication,即进程间通信。进程间通信的目的是为了让不同的进程能够互相访问资源并进行协调工作。实现进程间的通信技术有很多,如管道通信、共享存储、消息传递、socket等。Node中实现IPC通信的是管道(pipe)技术。但这个管道与传统的管道有一些不同,在Node中管道是一个抽象层的称呼,具体实现细节由libuv提供,在Windows下由命名管道(named pipe)实现,*nix下则采用Unix Domain Socket。表现在应用层上的进程通信只有简单的message事件和send()方法,接口十分简洁。

IPC

句柄传递

一般来说,NodeJS用于后端服务器的开发,所以多进程一般用于调动多核CPU来提高并发,那么不同的进程能否监听同一个端口来实现提高并发的目的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//work.js
const http = require('http')

http.createServer(function(req, res){
res.writeHead(200, {'Content-Type': 'text/pain'})
res.end('Hello World')
}).listen(8888, '127.0.0.1')

//master.js
const fork = requrie('child_process').fork

for(let i = 0; i < 2; i++){
for('./work.js')
}

得到的错误如下:

```bash
events.js:187
throw er; // Unhandled 'error' event
^

Error: listen EADDRINUSE: address already in use 127.0.0.1:8888

很明显,不同的进程启动同一个端口的server,肯定会引起端口占用的问题。

最开始为了解决这个问题,我们的想法是代理。即主进程对外接受所有的网络请求,再将这些请求分别代理到不同端口的进程上。

通过代理,可以避免端口不能重复监听的问题,甚至可以在代理进程上做适当的负载均衡,使得每个子进程可以较为均衡地执行任务。但是由于进程每接受到一个连接,将会用掉一个文件描述符,因此代理方案中客户端连接到代理进程,代理进程连接到工作进程地过程就会用掉两个文件描述符。操作系统的文件描述符是有限的,代理方案浪费掉一倍的文件描述符地做法影响了系统地扩展能力。

为了解决上述问题,Node在V0.5.9加入了进程间发送句柄的功能。sned()方法除了能通过IPC发送数据外,还能够发送句柄,第二个可选参数即为句柄。

句柄即一种用来标识资源的引用,它的内部包含了指向对象的文件描述符。比如句柄可以来把标识一个socket对象,一个UDP套接字,一个管道等。因此,我们可以通过发送句柄来达到更高的多核利用效率。下面是一个例子(传递一个HTTP句柄):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//master.js
const { fork } = require('child_process');
const net = require('net');
const os = require('os');

const workers = [];
for (let i = 0, len = os.cpus().length; i < len; i++) {
const worker = fork('./work.js');
workers.push(worker);
console.log('创建新进程:'+worker.pid)
}

const server = net.createServer();
server.listen(9527, () => {
workers.forEach(worker => {
worker.send('SERVER', server);
});
//关闭主服务器
server.close();
});

//work.js
const http = require('http');

// 创建每个协助进程的 http 服务器,不监听任何端口号
const httpServer = http.createServer((req, res) => {
res.end(`Hello world by ${process.pid}\n`);
});

process.on('message', (msg, tcpServer) => {
// 如果是 master 传递来的 tcp server
if (msg === 'SERVER') {
// 新连接建立的时候触发
tcpServer.on('connection', socket => {
// 把 tcp server 的连接转给 http server 处理
httpServer.emit('connection', socket);
});
}
});

可以看到,我们在主进程发送完句柄之后,即关闭监听,也就是说,将所有的请求处理都交给子进程,这样可以做到权责分明。结构图如下:

分布式

同端口监听的原理

句柄的发送与还原

目前子进程对象的send()方法可以发送的句柄类型包括如下几种:

  • net.Socket:TCP套接字。
  • net.Server:TCP服务器,任意建立在TCP服务上的应用层服务都可以进行传递。
  • net.Native:C++层面的TCP套接字或IPC管道。
  • dgram.Socket:UDP套接字。
  • dgram.Native:C++层面的UDP套接字。

send()方法在将消息发送到IPC管道之前,会将消息组装成为两个对象,一个参数是handle,另一个是message

我们简单看一下其源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
//暴露用于调用的API,主要用来解析参数,后面调用_send()来进行发送
target.send = function(message, handle, options, callback) {
if (typeof handle === 'function') {
callback = handle;
handle = undefined;
options = undefined;
} else if (typeof options === 'function') {
callback = options;
options = undefined;
} else if (options !== undefined) {
validateObject(options, 'options');
}

options = { swallowErrors: false, ...options };

if (this.connected) {
return this._send(message, handle, options, callback);
}
const ex = new ERR_IPC_CHANNEL_CLOSED();
if (typeof callback === 'function') {
process.nextTick(callback, ex);
} else {
process.nextTick(() => this.emit('error', ex));
}
return false;
};

//实际调用的API
target._send = function(message, handle, options, callback) {
assert(this.connected || this.channel);

//消息为空
if (message === undefined)
throw new ERR_MISSING_ARGS('message');

// Non-serializable messages should not reach the remote
// end point; as any failure in the stringification there
// will result in error message that is weakly consumable.
// So perform a final check on message prior to sending.
if (typeof message !== 'string' &&
typeof message !== 'object' &&
typeof message !== 'number' &&
typeof message !== 'boolean') {
throw new ERR_INVALID_ARG_TYPE(
'message', ['string', 'object', 'number', 'boolean'], message);
}

// Support legacy function signature
if (typeof options === 'boolean') {
options = { swallowErrors: options };
}

let obj;

// Package messages with a handle object
//注意:这里处理含有handle参数的情况
if (handle) {
// This message will be handled by an internalMessage event handler
//新的message将会含有一个type属性用来标识handle的类型
message = {
cmd: 'NODE_HANDLE',
type: null,
msg: message
};

//判断handle种类
if (handle instanceof net.Socket) {
message.type = 'net.Socket';
} else if (handle instanceof net.Server) {
message.type = 'net.Server';
} else if (handle instanceof TCP || handle instanceof Pipe) {
message.type = 'net.Native';
} else if (handle instanceof dgram.Socket) {
message.type = 'dgram.Socket';
} else if (handle instanceof UDP) {
message.type = 'dgram.Native';
} else {
throw new ERR_INVALID_HANDLE_TYPE();
}

// Queue-up message and handle if we haven't received ACK yet.
if (this._handleQueue) {
ArrayPrototypePush(this._handleQueue, {
callback: callback,
handle: handle,
options: options,
message: message.msg,
});
return this._handleQueue.length === 1;
}

obj = handleConversion[message.type];

// convert TCP object to native handle object
handle = ReflectApply(handleConversion[message.type].send,
target, [message, handle, options]);

// If handle was sent twice, or it is impossible to get native handle
// out of it - just send a text without the handle.
if (!handle)
message = message.msg;

// Update simultaneous accepts on Windows
if (obj.simultaneousAccepts && process.platform === 'win32') {
handle.setSimultaneousAccepts(true);
}
} else if (this._handleQueue &&
!(message && (message.cmd === 'NODE_HANDLE_ACK' ||
message.cmd === 'NODE_HANDLE_NACK'))) {
// Queue request anyway to avoid out-of-order messages.
ArrayPrototypePush(this._handleQueue, {
callback: callback,
handle: null,
options: options,
message: message,
});
return this._handleQueue.length === 1;
}

可以看到这里在接收到参数之后,会构建一个新的message对象,其中含有handle的类型。然后将其推入_handleQueue中,后面由Node进行发送。

实际上发送到IPC管道中的是该句柄的文件描述符,文件描述符实际上是一个整数值。这个message对象在写入到IPC管道时会通过JSON.stringify()进行序列化。所以最终发送到IPC管道中的信息都是字符串。send()能发送消息和句柄不代表它能够发送任意对象。

连接了IPC管道的子进程可以读取到父进程发来的消息,将字符串通过JSON.parse()解析还原为对象后,才出发message事件将消息作为消息体传递给应用层。在这个过程中,消息对象还要被过滤处理,message.cmd的值如果以NODE_为前缀,它将响应一个内部事件interalMessage。如果message.cmd值为NODE_HANDLE,它将取出message.type值和得到的为文件描述符一起还原出一个对应的对象。这个过程如图。

句柄发送还原

端口的共同监听

在了解句柄传递背后的原理之后,我们继续探索为何发送句柄后,多个进程可以监听到同一个端口。其原因就在于:我们启动独立的进程中,TCP服务器端socket套接字的文件描述符并不相同,导致监听到相同的端口时会抛出异常。

Node底层对每个端口监听都设置了SO_REUSEADDR选项,这个选项的含义是不同的进程可以就相同的网卡和端口进行监听,这个服务器套接字可以被不同的进程复用。

但是由于独立启动的进程互相之间不知道文件描述符,所以监听相同的端口时就会失败。但对于send()发送的句柄还原出来的服务而言,他们的文件描述符时相同的,所以可以监听到相同的端口不会引发异常。

多个应用监听同一个端口时,文件描述符同一时间只能被某一个进程所用。也就是说一个连接只能由一个进程处理,这些进程服务是抢占式的。

集群稳定之路

进程事件

在进程除了message事件外,Node还定义了以下事件:

  • error:当子进程无法被创建或无法被杀死或无法发送消息时会触发该事件。
  • exit:子进程退出时触发该事件,如果是正常退出,整个事件的第一个参数为退出码,否则为null。如果是通过kill()方法杀死的,会得到第二个参数,它标识杀死进程时的信号。
  • close:在子进程的标准输入输出流终止时触发该事件,参数与exit相同。
  • disconnect:在父进程或子进程中调用disconnect()方法时触发该事件,在调用该方法时将关闭监听IPC通道。

自动重启

有了父子进程之间的相关事件之后,就可以在这些关系之间创建出需要的机制。我们通过子进程的exit事件来获知其退出的消息。同时利用这个事件,当一个子进程退出时,可以重新启动一个进程来继续服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
let fork = require('child_process').fork
let cpus = require('os').cpus()

let server = require('http').createServer()
server.listen(1337)

let works = {}
let createWork = function(){
let worker = fork(__dirname + '/worker.js')
worker.on('exit', function(){
console.log('work' + worker.pid + 'exited.')
delete workers[worker.pid]
createWork()
})
//句柄转发
worker.send('server', server)
workers[worker.pid] = worker
console.log('Create worker. pid:'+worker.pid)

for(let i = 0; i < cpus.length; i++){
createServer()
}

process.on('exit', function(){
for(let pid in workers){
workers[pid].kill()
}
})
}

结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
PS D:\Test\Node> node reStart.js
Create worker. pid:11436
Create worker. pid:14868
Create worker. pid:16240
Create worker. pid:11044

//杀死11436
C:\Users\Administrator>taskkill -F /pid 11436
成功: 已终止 PID 为 11436 的进程。

//自动新增进程
work11436exited.
Create worker. pid:15608

负载均衡

多个进程的同时服务一个工作时,必须要保证每个进程都得到适量的工作,这种保证多个单元工作量公平的策略叫做负载均衡。

Node默认提供的机制是采用操作系统的抢占式策略。所谓的抢占式就是在一顿工作进程中,闲着的进程对到来的请求进行争抢,谁抢到谁服务。

这种机制一般来说是公平的,但是需要明白的是,一个进程的繁忙分为CPU繁忙和I/O繁忙两个部分构成。对不同的业务,可能存在I/O繁忙,而CPU较为空闲的情况,这可能造成某个进程能够能够抢到较多的进程,形成负载不均衡的情况。

在此Node在v0.11中提供了一种的新的策略使得负载均衡更加合理,即轮转调度算法。轮转调度的工作方式是由主进程接受连接,将其依次分发给工作进程。分发的策略是在N个工作进程中,每次选择第i = (i + 1) % n 个进程来发送连接。在cluster模块中启用它。

Cluster模块

前面介绍的child_process模块,如果完全使用该模块来完成一个集群,那么还是有一定的难度,所以Node提供了cluster模块用于更方便的实现集群。

工作原理

工作进程由 child_process.fork() 方法创建,因此它们可以使用 IPC 和父进程通信,从而使各进程交替处理连接服务。

cluster 模块支持两种分发连接的方法。

第一种方法(也是除 Windows 外所有平台的默认方法)是循环法,由主进程负责监听端口,接收新连接后再将连接循环分发给工作进程,在分发中使用了一些内置技巧防止工作进程任务过载。

第二种方法是,主进程创建监听 socket 后发送给感兴趣的工作进程,由工作进程负责直接接收连接。

理论上第二种方法应该是效率最佳的。 但在实际情况下,由于操作系统调度机制的难以捉摸,会使分发变得不稳定。 可能会出现八个进程中有两个分担了 70% 的负载。

因为 server.listen() 将大部分工作交给主进程完成,因此导致普通 Node.js 进程与 cluster 工作进程差异的情况有三种:

  1. server.listen({fd: 7}) 因为消息会被传给主进程,所以父进程中的文件描述符 7 将会被监听并将句柄传给工作进程,而不是监听文件描述符 7 指向的工作进程。
  2. server.listen(handle) 显式地监听句柄,会导致工作进程直接使用该句柄,而不是和主进程通信。
  3. server.listen(0) 正常情况下,这种调用会导致 server 在随机端口上监听。

但在 cluster 模式中,所有工作进程每次调用 listen(0) 时会收到相同的“随机”端口。 实质上,这种端口只在第一次分配时随机,之后就变得可预料。 如果要使用独立端口的话,应该根据工作进程的 ID 来生成端口号。

Node.js 不支持路由逻辑。 因此在设计应用时,不应该过分依赖内存数据对象,例如 session 和登陆等。

由于各工作进程是独立的进程,它们可以根据需要随时关闭或重新生成,而不影响其他进程的正常运行。 只要有存活的工作进程,服务器就可以继续处理连接。 如果没有存活的工作进程,现有连接会丢失,新的连接也会被拒绝。 Node.js 不会自动管理工作进程的数量,而应该由具体的应用根据实际需要来管理进程池。

虽然 cluster 模块主要用于网络相关的情况,但同样可以用于其他需要工作进程的情况。

使用

将上面的代码改写为cluster的方式:

这里的setupMaster方法:

  1. setupMaster 用于修改默认的 ‘fork’ 行为。 一旦调用,将会按照 cluster.settings 进行设置。

  2. 所有的设置只对后来的 .fork() 调用有效,对之前的工作进程无影响。

  3. 唯一无法通过 .setupMaster() 设置的属性是传给 .fork()env 属性。

其接受的参数为cluster.settings对象:

  1. execArgv <string[]> 传给 Node.js 可执行文件的字符串参数列表。默认值: process.execArgv。
  2. exec 工作进程的文件路径。默认值: process.argv[1]。
  3. args <string[]> 传给工作进程的字符串参数。默认值: process.argv.slice(2)。
  4. cwd 工作进程的当前工作目录。默认值: undefined(从父进程继承)。
  5. serialization 指定用于在进程之间发送消息的序列化类型。可能的值为 ‘json’ 和 ‘advanced’。有关更多详细信息,请参见child_process 的高级序列化。默认值: false。
  6. silent 是否需要发送输出到父进程的 stdio。默认值: false。
  7. stdio 配置衍生的进程的 stdio。 由于 cluster 模块运行依赖于 IPC,这个配置必须包含 ‘ipc’。如果提供了这个选项,则覆盖 silent。
  8. uid 设置进程的用户标识符。参见 setuid(2)。
  9. gid 设置进程的群组标识符。参见 setgid(2)。
    inspectPort | 设置工作进程的检查端口。这可以是一个数字、或不带参数并返回数字的函数。默认情况下,每个工作进程都有自己的端口,从主进程的 process.debugPort 开始递增。
    windowsHide 隐藏衍生的进程的控制台窗口(通常在 Windows 系统上会创建)。默认值: false。
1
2
3
4
5
6
7
8
9
10
let  cluster = require('cluster')

cluster.setupMaster({
exec: 'worker.js'
})

let cpus = require('os').cpus()
for(let i = 0; i < cpus.length; i++){
cluster.fork()
}

事件

同样,cluster模块提供了一些事件监听:

  1. disconnect事件
  2. exit事件
  3. fork事件
  4. listening事件
  5. message事件
  6. online事件:当衍生一个新的工作进程后,工作进程应当响应一个上线消息。
  7. setup事件:每当 .setupMaster() 被调用时触发。

引用

本文大部分参考《深入浅出nodejs》以及Node官网

Powered by Hexo and Hexo-theme-hiker

Copyright © 2019 - 2024 My Wonderland All Rights Reserved.

UV : | PV :