cluster-file-writer
Advanced tools
Comparing version 0.0.4 to 0.0.5
@@ -10,11 +10,11 @@ var fs = require('fs'); | ||
this._log.on('drain', function () { | ||
while(self._pausedWorkers.length > 0) { | ||
var worker = self._pausedWorkers.pop(); | ||
try { | ||
worker.send('cluster-file-writer-resume'); | ||
while(self._pausedWorkers.length > 0) { | ||
var worker = self._pausedWorkers.pop(); | ||
worker.send('cluster-file-writer-resume'); | ||
} | ||
} catch (e) { | ||
console.log(e); | ||
console.log(e); | ||
} | ||
} | ||
}); | ||
@@ -71,6 +71,9 @@ | ||
function Worker() { | ||
this.buffer = []; | ||
var self = this; | ||
this._paused = true; | ||
this._buffer = []; | ||
this._masterPaused = true; | ||
this._socketPaused = false; | ||
this.messageCount = 0; | ||
process.on('message', function(msg) { | ||
@@ -86,26 +89,51 @@ if (msg === 'cluster-file-writer-resume') { | ||
Worker.prototype.write = function(message) { | ||
if (this._paused) { | ||
this._schedule(message); | ||
if (this._socketPaused || this._masterPaused) { | ||
this._schedule(message); | ||
} else { | ||
this.messageCount++; | ||
process.send(message); | ||
this._writeImpl(message); | ||
} | ||
}; | ||
Worker.prototype._writeImpl = function(message) { | ||
this.messageCount++; | ||
var socketResume = process.send(message); | ||
if (!socketResume) { | ||
this._socketPaused = true; | ||
var self = this; | ||
setTimeout(function () { | ||
self._socketPaused = false; | ||
self._flushBuffer(); | ||
}, 100); | ||
} | ||
}; | ||
Worker.prototype._schedule = function(message) { | ||
var self = this; | ||
setImmediate(function() { | ||
if (self._paused) | ||
self._schedule(message) | ||
else | ||
self.write(message); | ||
}); | ||
this._buffer.push(message); | ||
}; | ||
Worker.prototype._resume = function () { | ||
this._paused = false; | ||
Worker.prototype._flushBuffer = function() { | ||
var self = this; | ||
if (this._buffer.length === 0) return; | ||
if (this._masterPaused || this._socketPaused) return; | ||
while (this._buffer.length > 0) { | ||
this._writeImpl(this._buffer.pop()); | ||
if (this._masterPaused || this._socketPaused) | ||
break; | ||
} | ||
}; | ||
Worker.prototype._resume = function () { | ||
this._masterPaused = false; | ||
this._flushBuffer(); | ||
}; | ||
Worker.prototype._pause = function () { | ||
this._paused = true; | ||
this._masterPaused = true; | ||
}; | ||
@@ -112,0 +140,0 @@ |
{ | ||
"name": "cluster-file-writer", | ||
"version": "0.0.4", | ||
"version": "0.0.5", | ||
"description": "write to a single file from a cluster", | ||
@@ -5,0 +5,0 @@ "keywords": ["cluster", "logger", "filesystem", "file"], |
Cluster file writer | ||
=================== | ||
This writter attemps to manage two backpressures in a master / worker scenario. The first is a backpressure on the file written by the master and the second is backpressure from | ||
the underlying stream between each worker and master. Unfortunately there is no 'drain' event for worker to master communication, so this module tries its best to mimic it using | ||
timers. | ||
###Install | ||
@@ -5,0 +9,0 @@ |
@@ -20,9 +20,17 @@ var assert = require('assert'); | ||
var settings = { batchSize: 100, numberOfBatches: 100, workers: 4 }; | ||
var settings = { batchSize: 2000, numberOfBatches: 100, workers: 4 }; | ||
var expectedMessages = settings.workers * settings.batchSize * settings.numberOfBatches | ||
var lastMW = 0; | ||
var ref = setInterval(function() { | ||
console.log(writer.messagesWritten); | ||
if (lastMW > 0) { | ||
console.log('%s per second, total of %s out of %s', writer.messagesWritten - lastMW, writer.messagesWritten, expectedMessages); | ||
} | ||
lastMW = writer.messagesWritten; | ||
//console.log(writer.messagesWritten); | ||
if (writer.messagesWritten === expectedMessages) { | ||
@@ -29,0 +37,0 @@ clearInterval(ref); |
var ClusterFileWriter = require('../lib/ClusterFileWriter'); | ||
process.on('uncaughtException', function(err) { | ||
console.error(err); | ||
}) | ||
var worker = new ClusterFileWriter.Worker(); | ||
@@ -10,2 +14,8 @@ | ||
var ref2 = setInterval(function() { | ||
//console.log(process.memoryUsage()); | ||
// console.log('buffer:', worker._buffer.length) | ||
}, 500) | ||
var total = 0; | ||
@@ -21,2 +31,3 @@ var ref = setInterval(function() { | ||
clearInterval(ref); | ||
clearInterval(ref2); | ||
setTimeout(function () { | ||
@@ -27,2 +38,5 @@ console.log(total, 'messages'); | ||
} | ||
}, 97); | ||
}, 1000); | ||
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
4865136
229
34