使用进程池提高Node.js并行计算能力

Author Avatar
GeniusFunny 3月 22, 2019
  • 在其它设备中阅读本文章

使用进程池提高Node.js并行计算能力

背景

Node是单线程模型,当需要执行多个独立且耗时任务的时候,只能通过child_process来分发任务,提高处理速度;不像Java这种多线程语言,可以通过线程来解决并行问题,Node只能创建进程来进行处理;但是进程相对于线程来说,开销太大。一旦进程数较多时,CPU和内存消耗严重(影响我干其他的事情),所以做了一个简易版的进程池,用来解决并行任务的处理。

适用场景:相同且独立且耗时的任务,例如,拿到某网站1000个用户的账号密码,我现在想要他们的信息,爬他,node-process-pool非常适合。

思路

主控进程+工作进程群

ProcessPool是我们管理进程的地方,我们通过传递配置参数(任务脚本、脚本需要的参数、最大并行进程数)生成一个ProcessPool实例,然后通过这个实例来管控进程池。

ProcessItem是我们进程池里的进程对象,ProcessItem对象除了process的信息,我们还增加了唯一标识和状态(忙碌、任务失败、任务完成、进程不可用)。

一批任务开始时,我们会一次性fork到最大并行进程数,然后开始监控是否有工作进程完成任务,如果有工作进程完成了任务,那我们就可以复用这个工作进程,让其执行新任务;如果任务执行失败,我们会将任务归还给进程池,等待下一次分发。

由于是相同且独立且耗时的任务,所以当某个工作进程完成任务时,我们很有必要去检测所有的工作进程是否已完成任务,而不只是复用这个工作进程,我们要一批一批的复用!!!

因为差不多的时间开始执行相同的任务,当一个工作进程完成时,完全可以相信其他工作进程也完成了任务,所以检测一轮所有的工作进程,若空闲,给他们分配新任务。

既然是批量分配任务,就不会存在只有某个工作进程在辛苦的运行,其他工作进程袖手旁,哈哈哈哈哈,总得雨露均沾嘛。

由于主控进程即要负责IPC又要不断监听批任务完成的情况,目前我采用的方式是setInterval切割,让IPC和监控能交替进行(ps:应该有更好的方法

我们真的需要setInterval来去轮询任务状态吗,什么时候才需要轮询任务状态然后调度?
工作进程状态发生改变的时候,才是我们需要去检测任务状态和调度的时机;所以,我们也可以利用IPC来通知主控进程进行检测任务状态和调度。
ps:当然,还有更好的方法,嘿嘿

实现

ProcessPool.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
const fs = require('fs')
const ProcessItem = require('./ProcessItem')
const isCorrectType = require('./util').isCorrectType
/**
* 进程池类
* @param maxParallelProcess,最大并行工作进程数
* @param timeToClose,任务最长耗时时间
* @param taskParams,所有任务脚本需要的参数
* @param dependency,任务脚本所需依赖
* @param taskName, 工作脚本名称
* @param script 脚本内容
* @param workDir 工作目录
*/
function ProcessPool({
maxParallelProcess = 50,
timeToClose = 60 * 1000,
taskParams = [],
dependency = '',
workDir ='',
taskName = Date.now(),
script = '',}) {
try {
isCorrectType('task', script, 'function')
isCorrectType('maxParallelProcess', maxParallelProcess, 'number')
isCorrectType('timeToClose', timeToClose, 'number')
isCorrectType('dependency', dependency, 'string')
isCorrectType('workDir', workDir, 'string')
} catch (e) {
throw new Error('参数不合法' + e)
}
this.timeToClose = timeToClose
this.processList = new Map() // 使用Map存储进程对象
this.currentProcessNum = 0 // 当前活动进程数
this.dependency = dependency // 任务脚本依赖
this.workDir = workDir // 主控函数工作目录
this.taskName = taskName // 任务脚本名称
this.task = `${this.workDir}/${this.taskName}.js`// 任务脚本路径
this.taskParamsTodo = taskParams // 待完成的任务参数数组,包含了n个小任务所需参数,所以是一个二维数组
this.maxParallelProcess = maxParallelProcess // 最大进程并行数
this.script = script // 任务脚本内容
this.ready = false // 任务脚本是否构建完成
try {
this.buildTaskScript() // 根据模版创建任务脚本
} catch (e) {
throw new Error('创建任务脚本失败' + e)
}
}
/**
* 启动进程池
*/
ProcessPool.prototype.run = function() {
if (this.ready) {
let flag = this.hasWorkProcessRunning() // 判断是否有工作进程正在执行或是否是第一次处理任务
const taskTodoNum = this.taskParamsTodo.length
if (flag === 1 && taskTodoNum) {
// 初始阶段,fork min{任务数,最大进程数} 的进程
while (this.currentProcessNum < this.maxParallelProcess && this.currentProcessNum < taskTodoNum) {
this.addProcess()
}
} else if (flag === 2 && !taskTodoNum) {
// 有忙碌的工作进程,且任务已下发完
} else if (flag === 2 && taskTodoNum) {
// 有忙碌的工作进程,但还有任务需下发
const processList = this.processList.values()
for (const p of processList) {
if (p.state !== 1 || p.state !== 4) {
this.reuseProcess(p.id)
}
}
} else if (flag === -1 && taskTodoNum) {
// 所有工作进程空闲,但还有任务需下发
const processList = this.processList.values()
for (const p of processList) {
if (p.state !== 1 || p.state !== 4) {
this.reuseProcess(p.id)
}
}
} else if (flag < 0 && !taskTodoNum) {
// 所有进程空闲,且任务已下发完
this.closeProcessPool()
}
}
}
/**
* 生成任务脚本
*/
ProcessPool.prototype.buildTaskScript = function() {
const taskDir = this.task
const templateDir = `${__dirname}/task.js`
const dependency = `${this.dependency}\n`
const taskBody = this.script.toString()
const templateReadStream = fs.createReadStream(templateDir)
const taskWriteStream = fs.createWriteStream(taskDir)
taskWriteStream.write(dependency)
templateReadStream.pipe(taskWriteStream).write(taskBody)
taskWriteStream.on('finish', () => {
this.ready = true
this.run()
})
}
/**
* 添加一个工作进程、指派任务且监听IPC
*/
ProcessPool.prototype.addProcess = function() {
if (this.currentProcessNum <= this.maxParallelProcess) {
let workParam = this.taskParamsTodo.shift()
const newProcess = new ProcessItem({task: this.task, workParam})
this.processList.set(newProcess.id, newProcess)
this.currentProcessNum++
this.listenProcessState(newProcess, workParam)
}
}
/**
* 工作进程与主控进程IPC
* @param workProcess
* @param params
*/
ProcessPool.prototype.listenProcessState = function(workProcess, params) {
workProcess.process.on('message', message => {
if (message === 'finish') {
workProcess.finishTask()
} else if (message === 'failed') {
this.taskParamsTodo.unshift(params)
workProcess.unFinishTask()
}
this.run()
})
}
/**
* 监测当前是否有正在处理任务的工作进程
* @returns {number}
*/
ProcessPool.prototype.hasWorkProcessRunning = function() {
if (!this.processList) return -1
if (this.processList && !this.processList.size) return 1 // 进程池刚启动,尚无工作进程
const processList = this.processList.values()
for (const p of processList) {
if (p.state === 1) return 2 // 有忙碌的进程
}
return -1
}
/**
* 复用空闲进程
* @param id,工作进程的pid
*/
ProcessPool.prototype.reuseProcess = function(id) {
const workProcess = this.processList.get(id)
if (this.taskParamsTodo.length && workProcess && workProcess.state !== 1) {
const taskParam = this.taskParamsTodo.shift()
workProcess.state = 1 // 设置为忙碌
workProcess.process.send(taskParam)
}
}
/**
* 关闭工作进程
* @param id
*/
ProcessPool.prototype.removeProcess = function(id) {
let workProcess = this.processList.get(id)
if (workProcess) {
workProcess.terminate()
this.currentProcessNum--
}
}
/**
* 关闭所有工作进程
*/
ProcessPool.prototype.removeAllProcess = function() {
const processItems = this.processList.values()
for (const processItem of processItems) {
processItem.terminate()
}
}
/**
* 关闭进程池
*/
ProcessPool.prototype.closeProcessPool = function() {
this.removeAllProcess()
this.ready = false
this.processList = null
}
module.exports = ProcessPool
ProcessItem.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
const ChildProcess = require('child_process')
/**
* 工作进程类
*/
function ProcessItem({ task = './task.js', workParam = [] }) {
/**
* state 状态码
* 1: 忙碌
* 2: 完成任务
* 3: 未完成任务
* 4: 不可用
*/
if (!Array.isArray(workParam)) {
throw new Error('workParam must be a array')
}
if (typeof task !== 'string') {
throw new Error('workParam must be a string')
}
this.process = this.createProcess(task, workParam)
this.state = 1
this.id = this.process.pid
}
ProcessItem.prototype.finishTask = function() {
if (this.state === 1) {
this.state = 2
}
}
ProcessItem.prototype.unFinishTask = function() {
this.state = 3
}
ProcessItem.prototype.terminate = function() {
try {
this.process.kill()
this.state = 4
} catch (e) {
throw new Error(`关闭进程${this.id}失败`)
}
}
ProcessItem.prototype.createProcess = function (task, workParam) {
let childProcess = ChildProcess.fork(task, workParam)
if (childProcess) {
return childProcess
} else {
throw new Error('create process failed')
}
}

module.exports = ProcessItem
Task.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 当进程被子进程创建后,立刻执行工作任务
*/
async function firstTask() {
const workParam = process.argv.slice(2)
await task(workParam)
}
/**
* 完成任务,提示进程池已完成,工作进程空闲
*/
async function finishTask() {
await process.send('finish')
}
/**
* 任务失败,提示进程池未完成,归还任务
*/
async function unFinishTask() {
await process.send('failed')
}
/**
* 监听进程池后续指派的任务
*/
process.on('message', async workParam => {
await task(workParam)
try {
await finishTask()
} catch (e) {
await unFinishTask()
}
})
/**
* 进程被创建时立即执行进程池指派的任务
* @returns {Promise<void>}
*/
async function main() {
try {
await firstTask()
await finishTask()
} catch (e) {
await unFinishTask()
}
}
main()
/**
* @name 工作进程负责的任务
* @param workParam // 执行任务所需的参数数组
* 动态添加任务脚本到此文件尾部
*/
Util.js
1
2
3
4
5
6
7
8
9
10
11
12
13
function isCorrectType(name,value, type) {
if (type === 'array') {
if (!Array.isArray(value)) {
throw new Error(`${name} must be a array`)
}
} else {
if (typeof value !== type) {
throw new Error(`${name} must be a ${type}`)
}
}
}

exports.isCorrectType = isCorrectType

使用方法

安装
1
npm install node-process-pool
使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 进程池使用示例
const ProcessPool = require('../src/ProcessPool')
const taskParams = []
for (let i = 0; i < 100; i++) {
taskParams[i] = [i]
}
// 创建进程池实例
const processPool = new ProcessPool({
maxParallelProcess: 50, // 支持最大进程并行数
timeToClose: 60 * 1000, // 单个任务被执行最大时长
dependency: `const path = require('path')`, // 任务脚本依赖
workDir: __dirname, // 当前目录
taskName: 'test', // 任务脚本名称
script: async function task(workParam) {
console.log(workParam)
}, // 任务脚本内容
taskParams // 需要执行的任务参数列表,二维数组
})
// 利用进程池进行处理大规模任务
processPool.run()

其他

github地址:https://github.com/GeniusFunny/ProcessPool