minionpool
Advanced tools
@@ -29,4 +29,4 @@ /** | ||
| debug: true, | ||
| concurrency: 50, | ||
| taskSourceInit: function() { | ||
| concurrency: 5, | ||
| taskSourceInit: function(callback) { | ||
| var pool = mysql.createPool({ | ||
@@ -38,3 +38,3 @@ host: '127.0.0.1', | ||
| }); | ||
| return {pool: pool, item: 0}; | ||
| callback({pool: pool, item: 0}); | ||
| }, | ||
@@ -41,0 +41,0 @@ taskSourceTerminate: function(state) { |
+1
-1
@@ -5,3 +5,3 @@ { | ||
| "description": "A simple task-consumer pool for nodejs", | ||
| "version": "0.0.3", | ||
| "version": "0.0.4", | ||
| "homepage": "https://github.com/marcelog/minionpool", | ||
@@ -8,0 +8,0 @@ "keywords": [ |
+5
-4
@@ -47,6 +47,7 @@ # About | ||
| // Called to initialize a 'task source'. It should return an initial 'state' | ||
| // (like db connections, file descriptors, etc). See below. | ||
| taskSourceInit: function() { | ||
| return {}; | ||
| // Called to initialize a 'task source'. It should call the callback with | ||
| // an initial 'state' (like db connections, file descriptors, etc). See below. | ||
| // The state will be passed when calling the next property. | ||
| taskSourceInit: function(callback) { | ||
| callback({}); | ||
| }, | ||
@@ -53,0 +54,0 @@ |
+34
-18
@@ -34,13 +34,15 @@ /** | ||
| var self = this; | ||
| this.taskSourceState = self.taskSource(self.taskSourceState, function(task) { | ||
| if(task === undefined) { | ||
| if(self.debug) { | ||
| self.debugMsg('MinionPool %s: No more tasks', self.name); | ||
| if(self.noMoreTasks) { | ||
| callback(undefined); | ||
| } else { | ||
| this.taskSourceState = self.taskSource(self.taskSourceState, function(task) { | ||
| if(task === undefined) { | ||
| if(self.debug) { | ||
| self.debugMsg('MinionPool %s: No more tasks', self.name); | ||
| } | ||
| self.noMoreTasks = true; | ||
| } | ||
| self.emit('noMoreTasks'); | ||
| self.noMoreTasks = true; | ||
| self.taskSourceTerminate(self.taskSourceState); | ||
| } | ||
| callback(task); | ||
| }); | ||
| callback(task); | ||
| }); | ||
| } | ||
| }; | ||
@@ -80,2 +82,12 @@ | ||
| var self = this; | ||
| this.minionsFinished = 0; | ||
| self.on('minionFinished', function(id) { | ||
| self.minionsFinished++; | ||
| if(self.minionsFinished === self.concurrency) { | ||
| if(self.debug) { | ||
| self.debugMsg('MinionPool %s: All minions done, shutting down', self.name); | ||
| } | ||
| self.taskSourceTerminate(self.taskSourceState); | ||
| } | ||
| }); | ||
| if(this.debug) { | ||
@@ -85,11 +97,13 @@ this.debugMsg('MinionPool %s: Starting work', this.name); | ||
| this.initMinions(); | ||
| this.taskSourceState = this.taskSourceInit(); | ||
| this.currentTasks = 0; | ||
| this.on('taskEnded', function(result) { | ||
| var minionId = result.minionId; | ||
| self.assignTask(minionId); | ||
| this.taskSourceInit(function(state) { | ||
| self.taskSourceState = state; | ||
| self.currentTasks = 0; | ||
| self.on('taskEnded', function(result) { | ||
| var minionId = result.minionId; | ||
| self.assignTask(minionId); | ||
| }); | ||
| for(var i = 0; i < self.concurrency; i++) { | ||
| self.assignTask(i); | ||
| } | ||
| }); | ||
| for(var i = 0; i < self.concurrency; i++) { | ||
| self.assignTask(i); | ||
| } | ||
| }; | ||
@@ -112,2 +126,4 @@ | ||
| minion.workOn(task, self.taskEnded); | ||
| } else { | ||
| self.emit('minionFinished', minionId); | ||
| } | ||
@@ -114,0 +130,0 @@ }); |
19265
2.86%206
8.42%85
1.19%