使用进程池提高Node.js并行计算能力 背景 Node是单线程模型,当需要执行多个独立且耗时任务的时候,只能通过child_process来分发任务,提高处理速度;不像Java这种多线程语言,可以通过线程来解决并行问题,Node只能创建进程来进行处理;但是进程相对于线程来说,开销太大。一旦进程数较多时,CPU和内存消耗严重(影响我干其他的事情),所以做了一个简易版的进程池,用来解决并行任务的处理。
适用场景:相同且独立且耗时的任务,例如,拿到某网站1000个用户的账号密码,我现在想要他们的信息,爬他,node-process-pool非常适合。
思路 主控进程+工作进程群
ProcessPool是我们管理进程的地方,我们通过传递配置参数(任务脚本、脚本需要的参数、最大并行进程数)生成一个ProcessPool实例,然后通过这个实例来管控进程池。
ProcessItem是我们进程池里的进程对象,ProcessItem对象除了process的信息,我们还增加了唯一标识和状态(忙碌、任务失败、任务完成、进程不可用)。
一批任务开始时,我们会一次性fork到最大并行进程数,然后开始监控是否有工作进程完成任务,如果有工作进程完成了任务,那我们就可以复用这个工作进程,让其执行新任务;如果任务执行失败,我们会将任务归还给进程池,等待下一次分发。
由于是相同且独立且耗时的任务,所以当某个工作进程完成任务时,我们很有必要去检测所有的工作进程是否已完成任务,而不只是复用这个工作进程,我们要一批一批的复用!!!
因为差不多的时间开始执行相同的任务,当一个工作进程完成时,完全可以相信其他工作进程也完成了任务,所以检测一轮所有的工作进程,若空闲,给他们分配新任务。
既然是批量分配任务,就不会存在只有某个工作进程在辛苦地运行,其他工作进程袖手旁观,哈哈哈哈哈,总得雨露均沾嘛。
由于主控进程既要负责IPC又要不断监听批任务完成的情况,目前我采用的方式是setInterval切割,让IPC和监控能交替进行(ps:应该有更好的方法)。
我们真的需要setInterval来去轮询任务状态吗,什么时候才需要轮询任务状态然后调度? 工作进程状态发生改变的时候,才是我们需要去检测任务状态和调度的时机;所以,我们也可以利用IPC来通知主控进程进行检测任务状态和调度。ps:当然,还有更好的方法,嘿嘿
// Implementation: ProcessPool.js
const fs = require('fs')
const ProcessItem = require('./ProcessItem')
const isCorrectType = require('./util').isCorrectType

// Numeric worker states, matching the values ProcessItem assigns to `state`:
// 1 = running a task, 2 = task finished, 3 = task failed, 4 = terminated.
const STATE_BUSY = 1
const STATE_TERMINATED = 4

/**
 * Pool of forked worker processes that all run the same generated task script.
 *
 * The constructor validates its options, records them on the instance and
 * starts writing the task script; once the script is written the pool sets
 * `ready` and calls `run()` by itself.
 *
 * @param {Object} options
 * @param {number} [options.maxParallelProcess=50] Upper bound on forked workers.
 * @param {number} [options.timeToClose=60000] Stored on the instance; not read
 *   anywhere else in this module.
 * @param {Array<Array>} [options.taskParams=[]] One argument list per task. The
 *   array is copied, so the caller's array is not consumed by the pool.
 * @param {string} [options.dependency=''] Source text written at the top of the
 *   generated task script (e.g. `require` lines).
 * @param {string} [options.workDir=''] Directory the task script is written to.
 * @param {string|number} [options.taskName=Date.now()] Base name of the script.
 * @param {Function} options.script Task function; its source text is written
 *   into the generated script, ahead of the task.js template.
 * @throws {Error} When an option has the wrong type, or when setting up the
 *   task script throws synchronously.
 */
function ProcessPool({
  maxParallelProcess = 50,
  timeToClose = 60 * 1000,
  taskParams = [],
  dependency = '',
  workDir = '',
  taskName = Date.now(),
  script = '',
} = {}) {
  try {
    isCorrectType('script', script, 'function')
    isCorrectType('maxParallelProcess', maxParallelProcess, 'number')
    isCorrectType('timeToClose', timeToClose, 'number')
    isCorrectType('dependency', dependency, 'string')
    isCorrectType('workDir', workDir, 'string')
    isCorrectType('taskParams', taskParams, 'array')
  } catch (e) {
    throw new Error('参数不合法' + e)
  }
  this.timeToClose = timeToClose
  this.processList = new Map()
  this.currentProcessNum = 0
  this.dependency = dependency
  this.workDir = workDir
  this.taskName = taskName
  this.task = `${this.workDir}/${this.taskName}.js`
  // Copy: the queue is consumed with shift()/unshift() and must not eat the
  // caller's array.
  this.taskParamsTodo = [...taskParams]
  // worker id -> params of the task that worker is currently running, so that
  // a failure re-queues the right task even after the worker has been reused.
  this.paramsInFlight = new Map()
  this.maxParallelProcess = maxParallelProcess
  this.script = script
  this.ready = false
  try {
    this.buildTaskScript()
  } catch (e) {
    throw new Error('创建任务脚本失败' + e)
  }
}

/**
 * Hand every idle (neither busy nor terminated) worker of `pool` to
 * reuseProcess, which gives it the next queued task if there is one.
 */
function reuseIdleProcesses(pool) {
  for (const p of pool.processList.values()) {
    if (p.state !== STATE_BUSY && p.state !== STATE_TERMINATED) {
      pool.reuseProcess(p.id)
    }
  }
}

/**
 * Scheduling step. Does nothing until the task script has been written.
 *
 * - no workers yet, tasks queued: fork up to min(maxParallelProcess, queued)
 * - workers exist, tasks queued: give queued tasks to idle workers
 * - some worker busy, nothing queued: wait for it
 * - no worker busy, nothing queued: close the pool
 */
ProcessPool.prototype.run = function () {
  if (!this.ready) return

  const flag = this.hasWorkProcessRunning()
  const taskTodoNum = this.taskParamsTodo.length

  if (flag === 1 && taskTodoNum) {
    // taskTodoNum is a snapshot taken before forking; addProcess shrinks the
    // queue while this loop runs.
    while (this.currentProcessNum < this.maxParallelProcess && this.currentProcessNum < taskTodoNum) {
      this.addProcess()
    }
  } else if ((flag === 2 || flag === -1) && taskTodoNum) {
    reuseIdleProcesses(this)
  } else if (flag < 0 && !taskTodoNum) {
    this.closeProcessPool()
  }
}

/**
 * Write the runnable task script to `this.task`: the dependency text, then the
 * source of the task function, then the contents of the task.js template that
 * sits next to this module. When the write stream finishes, the pool is marked
 * ready and `run()` is called.
 *
 * No 'error' listener is attached to either stream here.
 */
ProcessPool.prototype.buildTaskScript = function () {
  const taskDir = this.task
  const templateDir = `${__dirname}/task.js`
  const dependency = `${this.dependency}\n`
  const taskBody = this.script.toString()
  const templateReadStream = fs.createReadStream(templateDir)
  const taskWriteStream = fs.createWriteStream(taskDir)

  // Both writes are queued before the template starts flowing, so the file
  // order is: dependency, task function, template.
  taskWriteStream.write(dependency)
  taskWriteStream.write(`${taskBody}\n`)
  templateReadStream.pipe(taskWriteStream)

  taskWriteStream.on('finish', () => {
    this.ready = true
    this.run()
  })
}

/**
 * Fork one new worker for the next queued task. Does nothing when the pool is
 * already at maxParallelProcess or when no task is queued.
 */
ProcessPool.prototype.addProcess = function () {
  if (this.currentProcessNum >= this.maxParallelProcess) return
  if (!this.taskParamsTodo.length) return

  const workParam = this.taskParamsTodo.shift()
  const newProcess = new ProcessItem({ task: this.task, workParam })
  this.processList.set(newProcess.id, newProcess)
  this.currentProcessNum++
  this.listenProcessState(newProcess, workParam)
}

/**
 * Subscribe to a worker's IPC messages. 'finish' marks its task done; 'failed'
 * puts the task it was running back at the front of the queue. Any message
 * triggers a scheduling step.
 *
 * @param {ProcessItem} workProcess Worker to listen to.
 * @param {Array} params Params of the task the worker was started with.
 */
ProcessPool.prototype.listenProcessState = function (workProcess, params) {
  this.paramsInFlight.set(workProcess.id, params)
  workProcess.process.on('message', message => {
    if (message === 'finish') {
      workProcess.finishTask()
    } else if (message === 'failed') {
      // Re-queue what the worker is running now, not what it started with.
      this.taskParamsTodo.unshift(this.paramsInFlight.get(workProcess.id))
      workProcess.unFinishTask()
    }
    this.run()
  })
}

/**
 * Summarise worker activity for run().
 *
 * @returns {number} 1 when no worker has been created yet, 2 when at least one
 *   worker is busy, -1 when the pool is closed or no worker is busy.
 */
ProcessPool.prototype.hasWorkProcessRunning = function () {
  if (!this.processList) return -1
  if (!this.processList.size) return 1
  for (const p of this.processList.values()) {
    if (p.state === STATE_BUSY) return 2
  }
  return -1
}

/**
 * Send the next queued task to an existing worker over IPC. Does nothing when
 * the queue is empty, the id is unknown, or the worker is busy or terminated.
 *
 * @param {number} id Worker id (key in processList).
 */
ProcessPool.prototype.reuseProcess = function (id) {
  const workProcess = this.processList.get(id)
  if (!this.taskParamsTodo.length || !workProcess) return
  if (workProcess.state === STATE_BUSY || workProcess.state === STATE_TERMINATED) return

  const taskParam = this.taskParamsTodo.shift()
  this.paramsInFlight.set(id, taskParam)
  workProcess.state = STATE_BUSY
  workProcess.process.send(taskParam)
}

/**
 * Terminate one worker and decrement the worker count. The entry stays in
 * processList in the terminated state. Removing an unknown or already
 * terminated worker is a no-op, so the count is never decremented twice.
 *
 * @param {number} id Worker id.
 */
ProcessPool.prototype.removeProcess = function (id) {
  const workProcess = this.processList.get(id)
  if (!workProcess || workProcess.state === STATE_TERMINATED) return
  workProcess.terminate()
  this.currentProcessNum--
}

/** Terminate every worker in the pool. No-op once the pool has been closed. */
ProcessPool.prototype.removeAllProcess = function () {
  if (!this.processList) return
  for (const processItem of this.processList.values()) {
    processItem.terminate()
  }
}

/** Terminate all workers and mark the pool unusable (`ready` false, no list). */
ProcessPool.prototype.closeProcessPool = function () {
  this.removeAllProcess()
  this.ready = false
  this.processList = null
}

module.exports = ProcessPool
ProcessItem.js 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 const ChildProcess = require ('child_process' )function ProcessItem ({ task = './task.js' , workParam = [] } ) { if (!Array .isArray(workParam)) { throw new Error ('workParam must be a array' ) } if (typeof task !== 'string' ) { throw new Error ('workParam must be a string' ) } this .process = this .createProcess(task, workParam) this .state = 1 this .id = this .process.pid } ProcessItem.prototype.finishTask = function ( ) { if (this .state === 1 ) { this .state = 2 } } ProcessItem.prototype.unFinishTask = function ( ) { this .state = 3 } ProcessItem.prototype.terminate = function ( ) { try { this .process.kill() this .state = 4 } catch (e) { throw new Error (`关闭进程${this .id} 失败` ) } } ProcessItem.prototype.createProcess = function (task, workParam ) { let childProcess = ChildProcess.fork(task, workParam) if (childProcess) { return childProcess } else { throw new Error ('create process failed' ) } } module .exports = ProcessItem
// Task.js
// Worker-side template. `task` is not defined in this file: ProcessPool's
// buildTaskScript writes the user's task function into the generated script
// ahead of this template.

/** Run the task with the arguments the worker was forked with. */
async function firstTask() {
  const workParam = process.argv.slice(2)
  await task(workParam)
}

/** Tell the parent process that the current task succeeded. */
async function finishTask() {
  await process.send('finish')
}

/** Tell the parent process that the current task failed. */
async function unFinishTask() {
  await process.send('failed')
}

// Each message from the parent carries the params of one more task. The task
// call sits inside the try so that a throwing task is reported as 'failed'
// instead of becoming an unhandled rejection.
process.on('message', async workParam => {
  try {
    await task(workParam)
    await finishTask()
  } catch (e) {
    await unFinishTask()
  }
})

/** Run the first task and report its outcome to the parent. */
async function main() {
  try {
    await firstTask()
    await finishTask()
  } catch (e) {
    await unFinishTask()
  }
}

main()
Util.js 1 2 3 4 5 6 7 8 9 10 11 12 13 function isCorrectType (name,value, type ) { if (type === 'array' ) { if (!Array .isArray(value)) { throw new Error (`${name} must be a array` ) } } else { if (typeof value !== type) { throw new Error (`${name} must be a ${type} ` ) } } } exports.isCorrectType = isCorrectType
使用方法 安装 npm install node-process-pool
// Usage
const ProcessPool = require('../src/ProcessPool')

// One argument list per task: 100 tasks, each receiving a single index.
const taskParams = []
for (let i = 0; i < 100; i++) {
  taskParams[i] = [i]
}

const processPool = new ProcessPool({
  maxParallelProcess: 50,
  timeToClose: 60 * 1000,
  dependency: `const path = require('path')`,
  workDir: __dirname,
  taskName: 'test',
  script: async function task(workParam) {
    console.log(workParam)
  },
  taskParams
})

// run() returns immediately while the task script is still being written; the
// pool calls run() itself once the script is ready.
processPool.run()
其他 github地址:https://github.com/GeniusFunny/ProcessPool