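使用进程池提高 Node.js 并行计算能力

背景

Node.js 的 JavaScript 代码运行在单线程上。当需要执行多个相互独立且耗时的任务时，可以通过 child_process 把任务分发到多个子进程中并行执行，以提高处理速度。Java 这类多线程语言可以直接用线程解决并行问题；而在这里，Node 只能创建进程来处理（注：Node.js 10.5 起引入了 worker_threads 模块，较新版本中可直接使用，CPU 密集型任务也可以考虑用线程）。但进程的开销比线程大得多：一旦进程数量较多，CPU 和内存消耗会很严重（影响机器上的其他工作）。因此我实现了一个简易版的进程池，用来控制并行任务的进程数量。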
思路

主控进程 + 工作进程群：主控进程维护任务队列并负责调度，工作进程负责执行具体任务。
我们真的需要用 setInterval 来轮询任务状态吗？什么时候才需要检查任务状态并进行调度？答案是：只有工作进程的状态发生改变时，才是需要检查任务状态和调度的时机。所以，我们可以利用 IPC 让工作进程通知主控进程，再由主控进程检查任务状态并调度。PS：当然，还有更好的方法，嘿嘿。
// Implementation: ProcessPool.js
const fs = require('fs')
const path = require('path')
const ProcessItem = require('./ProcessItem')
const isCorrectType = require('./util').isCorrectType

// ProcessItem states (see ProcessItem.js): 1 = running, 2 = finished,
// 3 = last task failed, 4 = terminated. States 2 and 3 mean "idle, reusable".
const STATE_RUNNING = 1
const STATE_TERMINATED = 4

/**
 * A simple pool of forked Node.js worker processes that run one user-supplied
 * `script` function over a queue of parameter sets.
 *
 * The constructor writes a worker script (`<workDir>/<taskName>.js`) built from
 * `dependency`, `script` and the `task.js` template located next to this file.
 * Dispatching starts automatically once that file has been written. Workers report
 * back over IPC ('finish' / 'failed'), and every report triggers `run()` again, so no
 * polling timer is needed.
 *
 * @param {object} options
 * @param {number} [options.maxParallelProcess=50] - upper bound on live worker processes
 * @param {number} [options.timeToClose=60000] - stored on the instance; not read by this module
 * @param {Array} [options.taskParams=[]] - one entry per task; the first task of each new
 *   worker is passed as command-line arguments, so those entries should be arrays
 * @param {string} [options.dependency=''] - source code placed at the top of the worker script
 * @param {string} [options.workDir=''] - directory for the worker script ('' = current directory)
 * @param {string|number} [options.taskName=Date.now()] - base file name of the worker script
 * @param {Function} options.script - the task function each worker runs with one parameter set
 * @throws {Error} '参数不合法…' on invalid options, '创建任务脚本失败…' if the script
 *   cannot be created synchronously
 */
function ProcessPool ({
  maxParallelProcess = 50,
  timeToClose = 60 * 1000,
  taskParams = [],
  dependency = '',
  workDir = '',
  taskName = Date.now(),
  script = '',
} = {}) {
  try {
    isCorrectType('task', script, 'function')
    isCorrectType('maxParallelProcess', maxParallelProcess, 'number')
    isCorrectType('timeToClose', timeToClose, 'number')
    isCorrectType('dependency', dependency, 'string')
    isCorrectType('workDir', workDir, 'string')
    isCorrectType('taskParams', taskParams, 'array')
  } catch (e) {
    throw new Error('参数不合法' + e)
  }
  this.timeToClose = timeToClose
  this.processList = new Map()
  this.currentProcessNum = 0
  this.dependency = dependency
  this.workDir = workDir
  this.taskName = taskName
  // path.resolve keeps the default workDir '' relative to the current working
  // directory instead of producing a path at the filesystem root.
  this.task = path.resolve(this.workDir, `${this.taskName}.js`)
  // Copy, so dispatching (shift/unshift) never mutates the caller's array.
  this.taskParamsTodo = [...taskParams]
  this.maxParallelProcess = maxParallelProcess
  this.script = script
  this.ready = false
  try {
    this.buildTaskScript()
  } catch (e) {
    throw new Error('创建任务脚本失败' + e)
  }
}

/**
 * Schedule work according to the current worker states. This runs on every state change.
 *
 * - Queue empty: close the pool once no worker is running.
 * - Otherwise: hand queued tasks to idle workers (finished or failed), then fork new
 *   workers while below maxParallelProcess. New workers are only forked when there is
 *   no idle worker to reuse.
 * Does nothing until the worker script is ready, or after the pool has been closed.
 */
ProcessPool.prototype.run = function () {
  if (!this.ready || !this.processList) return
  const flag = this.hasWorkProcessRunning()
  if (!this.taskParamsTodo.length) {
    if (flag < 0) this.closeProcessPool()
    return
  }
  if (flag !== 1) {
    for (const p of this.processList.values()) {
      if (!this.taskParamsTodo.length) break
      if (p.state !== STATE_RUNNING && p.state !== STATE_TERMINATED) {
        this.reuseProcess(p.id)
      }
    }
  }
  // addProcess always increments currentProcessNum or shrinks the queue while both
  // conditions hold, so this loop terminates.
  while (this.currentProcessNum < this.maxParallelProcess && this.taskParamsTodo.length) {
    this.addProcess()
  }
}

/**
 * Write the worker script in this order: dependency code, then the task function
 * bound to the name `task`, then the task.js template. Afterwards, mark the pool
 * ready and start dispatching.
 * Writable streams flush writes in call order, so both prefix writes land before any
 * piped template data. pipe() ends the file when the template has been read.
 */
ProcessPool.prototype.buildTaskScript = function () {
  const templateReadStream = fs.createReadStream(path.join(__dirname, 'task.js'))
  const taskWriteStream = fs.createWriteStream(this.task)
  taskWriteStream.write(`${this.dependency}\n`)
  // Binding through `const task = (...)` works for named, anonymous and arrow
  // functions alike. It is defined before the template, which calls it right away.
  taskWriteStream.write(`const task = (${this.script.toString()});\n`)
  templateReadStream.pipe(taskWriteStream)
  taskWriteStream.on('finish', () => {
    this.ready = true
    this.run()
  })
}

/**
 * Fork one new worker for the next queued task, if below capacity and the queue is non-empty.
 */
ProcessPool.prototype.addProcess = function () {
  if (this.currentProcessNum >= this.maxParallelProcess || !this.taskParamsTodo.length) return
  const workParam = this.taskParamsTodo.shift()
  const newProcess = new ProcessItem({ task: this.task, workParam })
  // Remember what this worker is running so a failure re-queues the right parameters.
  newProcess.workParam = workParam
  this.processList.set(newProcess.id, newProcess)
  this.currentProcessNum++
  this.listenProcessState(newProcess, workParam)
}

/**
 * Attach the IPC and exit listeners that drive scheduling for one worker.
 *
 * @param {ProcessItem} workProcess - the worker to observe
 * @param {Array} params - its first task's parameters (used if workParam is not set yet)
 */
ProcessPool.prototype.listenProcessState = function (workProcess, params) {
  if (workProcess.workParam === undefined) workProcess.workParam = params
  workProcess.process.on('message', message => {
    if (message === 'finish') {
      workProcess.finishTask()
    } else if (message === 'failed') {
      // Re-queue the task this worker was actually running. After reuse, that
      // differs from its first task's `params`.
      this.taskParamsTodo.unshift(workProcess.workParam)
      workProcess.unFinishTask()
    }
    this.run()
  })
  workProcess.process.on('exit', (code, signal) => {
    // Workers killed by removeProcess/removeAllProcess/closeProcessPool are already
    // gone from processList. Anything else is an unexpected exit.
    if (!this.processList || !this.processList.has(workProcess.id)) return
    console.error(`worker ${workProcess.id} exited unexpectedly (code ${code}, signal ${signal})`)
    if (workProcess.state === STATE_RUNNING) this.taskParamsTodo.unshift(workProcess.workParam)
    workProcess.state = STATE_TERMINATED
    this.processList.delete(workProcess.id)
    this.currentProcessNum--
    this.run()
  })
}

/**
 * Summarize worker states.
 * @returns {number} 1 if no workers exist yet, 2 if at least one is running,
 *   -1 if none is running (or the pool has been closed)
 */
ProcessPool.prototype.hasWorkProcessRunning = function () {
  if (!this.processList) return -1
  if (!this.processList.size) return 1
  for (const p of this.processList.values()) {
    if (p.state === STATE_RUNNING) return 2
  }
  return -1
}

/**
 * Send the next queued task to an idle (finished or failed) worker over IPC.
 * No-op if the queue is empty, the id is unknown, or the worker is running or terminated.
 *
 * @param {number} id - worker pid as used for processList keys
 */
ProcessPool.prototype.reuseProcess = function (id) {
  const workProcess = this.processList && this.processList.get(id)
  if (!this.taskParamsTodo.length || !workProcess) return
  if (workProcess.state === STATE_RUNNING || workProcess.state === STATE_TERMINATED) return
  const taskParam = this.taskParamsTodo.shift()
  workProcess.state = STATE_RUNNING
  workProcess.workParam = taskParam
  workProcess.process.send(taskParam)
}

/**
 * Kill one worker and drop it from the pool's bookkeeping.
 * @param {number} id - worker pid
 */
ProcessPool.prototype.removeProcess = function (id) {
  const workProcess = this.processList && this.processList.get(id)
  if (!workProcess) return
  workProcess.terminate()
  // Deleted synchronously, before the asynchronous 'exit' event, so the exit
  // listener treats this as an intentional removal.
  this.processList.delete(id)
  this.currentProcessNum--
}

/**
 * Kill every worker and reset the pool's bookkeeping.
 */
ProcessPool.prototype.removeAllProcess = function () {
  if (!this.processList) return
  for (const processItem of this.processList.values()) {
    processItem.terminate()
  }
  this.processList.clear()
  this.currentProcessNum = 0
}

/**
 * Kill all workers and shut the pool down. Later run() calls become no-ops.
 */
ProcessPool.prototype.closeProcessPool = function () {
  this.removeAllProcess()
  this.ready = false
  this.processList = null
}

module.exports = ProcessPool
// ProcessItem.js
const ChildProcess = require('child_process')

/**
 * Wraps one forked worker process and tracks the state of its current task.
 *
 * state: 1 = running a task, 2 = finished its task, 3 = last task failed, 4 = terminated.
 * id: the child's pid.
 *
 * @param {object} [options]
 * @param {string} [options.task='./task.js'] - path of the script to fork
 * @param {Array} [options.workParam=[]] - command-line arguments for the first task
 * @throws {Error} if `workParam` is not an array or `task` is not a string
 */
function ProcessItem ({ task = './task.js', workParam = [] } = {}) {
  if (!Array.isArray(workParam)) {
    throw new Error('workParam must be a array')
  }
  if (typeof task !== 'string') {
    throw new Error('task must be a string')
  }
  this.process = this.createProcess(task, workParam)
  this.state = 1
  this.id = this.process.pid
}

/** Mark the running task as finished (only from the running state). */
ProcessItem.prototype.finishTask = function () {
  if (this.state === 1) {
    this.state = 2
  }
}

/** Mark the current task as failed. */
ProcessItem.prototype.unFinishTask = function () {
  this.state = 3
}

/**
 * Send the default kill signal to the child and mark it terminated.
 * @throws {Error} if kill() throws; the original error is kept as `cause`
 */
ProcessItem.prototype.terminate = function () {
  try {
    this.process.kill()
    this.state = 4
  } catch (e) {
    throw new Error(`关闭进程${this.id}失败`, { cause: e })
  }
}

/**
 * Fork `task` as a Node.js child process with an IPC channel.
 * @param {string} task - script path
 * @param {Array} workParam - command-line arguments
 * @returns {ChildProcess}
 */
ProcessItem.prototype.createProcess = function (task, workParam) {
  const childProcess = ChildProcess.fork(task, workParam)
  // fork() returns a ChildProcess (spawn failures surface as its 'error' event);
  // this guard is purely defensive.
  if (!childProcess) {
    throw new Error('create process failed')
  }
  return childProcess
}

module.exports = ProcessItem
// Task.js (worker-script template)
// ProcessPool.buildTaskScript writes the pool's `dependency` code and the user's
// `task` function into the same generated file as this template, so `task` is not
// defined here.

/**
 * Tell the parent process that the current task succeeded.
 * @returns {Promise<void>} resolves once the IPC message has been sent
 */
function finishTask () {
  return new Promise((resolve, reject) => {
    process.send('finish', err => (err ? reject(err) : resolve()))
  })
}

/**
 * Tell the parent process that the current task failed, so it can re-queue it.
 * @returns {Promise<void>} resolves once the IPC message has been sent
 */
function unFinishTask () {
  return new Promise((resolve, reject) => {
    process.send('failed', err => (err ? reject(err) : resolve()))
  })
}

/**
 * Run the user's task with one set of parameters and report the outcome.
 * An error thrown by the task is logged to stderr and reported as 'failed'.
 * @param {*} workParam - parameters for this run
 */
async function runTask (workParam) {
  try {
    await task(workParam)
  } catch (e) {
    console.error(e)
    await unFinishTask()
    return
  }
  await finishTask()
}

// Only reached if the IPC report itself could not be sent.
function logIpcError (err) {
  console.error(err)
  process.exitCode = 1
}

// The first task's parameters arrive as command-line arguments, so they are strings.
// Later tasks arrive over IPC with the values the parent sent.
async function firstTask () {
  await runTask(process.argv.slice(2))
}

process.on('message', workParam => {
  runTask(workParam).catch(logIpcError)
})

async function main () {
  await firstTask()
}

main().catch(logIpcError)
// Util.js

/**
 * Assert that `value` has the expected type, throwing a descriptive Error otherwise.
 *
 * @param {string} name - parameter name used in the error message
 * @param {*} value - value to check
 * @param {string} type - 'array' (checked with Array.isArray) or any `typeof` result,
 *   e.g. 'string', 'number', 'function', 'object'
 * @throws {Error} when the check fails
 */
function isCorrectType (name, value, type) {
  if (type === 'array') {
    if (!Array.isArray(value)) {
      throw new Error(`${name} must be a array`)
    }
    return
  }
  // typeof null === 'object', so null must be rejected explicitly for 'object'.
  if (typeof value !== type || (type === 'object' && value === null)) {
    throw new Error(`${name} must be a ${type}`)
  }
}

exports.isCorrectType = isCorrectType
使用方法

安装：

npm install node-process-pool
// Usage
const ProcessPool = require('../src/ProcessPool')

// 100 tasks; task i receives the parameter array [i].
const taskParams = Array.from({ length: 100 }, (_, i) => [i])

const processPool = new ProcessPool({
  maxParallelProcess: 50,
  timeToClose: 60 * 1000,
  dependency: `const path = require('path')`,
  workDir: __dirname,
  taskName: 'test',
  script: async function task (workParam) {
    console.log(workParam)
  },
  taskParams,
})

// The pool also starts itself once its worker script has been written;
// calling run() before that point does nothing.
processPool.run()
其他

GitHub 地址：https://github.com/GeniusFunny/ProcessPool