atomic-queue
Advanced tools
Comparing version 2.0.0 to 3.0.0
26
index.js
@@ -22,7 +22,6 @@ var events = require('events') | ||
this.worker = worker | ||
this.concurrency = opts.concurrency || 1 | ||
this.db = opts.db || memdb() | ||
this.opts = opts | ||
this.pool = createPool(this.worker, opts) | ||
this.pool = createPool(worker, opts) | ||
this.changes = createChangeDB({ | ||
@@ -36,2 +35,5 @@ db: this.db, | ||
this.stream = this.createDuplexStream() | ||
this.stream._queue = this | ||
this.pool.on('start', function start (data, worker, change) { | ||
@@ -49,3 +51,3 @@ var changeNum = change.change | ||
this.on('update-start', function updateStart (data) { | ||
this.stream.on('update-start', function updateStart (data) { | ||
debug('update-start', data) | ||
@@ -55,3 +57,3 @@ self.updatingInflight = true | ||
this.on('update-end', function updateEnd (data) { | ||
this.stream.on('update-end', function updateEnd (data) { | ||
debug('update-end', data) | ||
@@ -61,5 +63,2 @@ self.updatingInflight = false | ||
this.stream = this.createDuplexStream() | ||
this.stream._queue = this | ||
events.EventEmitter.call(this) | ||
@@ -88,4 +87,4 @@ | ||
this.initialize(function ready (err) { | ||
if (err) return self.emit('error', err) | ||
self.emit('ready') | ||
if (err) return self.stream.destroy(err) | ||
self.stream.emit('ready', self.inflight) | ||
var readStream = self.createWorkStream({since: self.inflight.since, live: true}) | ||
@@ -114,3 +113,2 @@ duplexStream.setReadable(readStream) | ||
cb() | ||
// also kick off the worker | ||
@@ -123,4 +121,2 @@ proc.work(data.value.value, doneWorking, data) | ||
self.emit('finish', data) | ||
// TODO implement purging. should remove processed entries from the changes feed | ||
@@ -133,6 +129,6 @@ | ||
function update () { | ||
if (self.updatingInflight) return self.once('update-end', update) | ||
self.emit('update-start', inflight) | ||
if (self.updatingInflight) return self.stream.once('update-end', update) | ||
self.stream.emit('update-start', inflight) | ||
self.db.put('inflight', inflight, function updated (err) { | ||
self.emit('update-end', inflight) | ||
self.stream.emit('update-end', inflight) | ||
if (err) splitStream.destroy(err) | ||
@@ -139,0 +135,0 @@ if (output) splitStream.push(output) |
{ | ||
"name": "atomic-queue", | ||
"version": "2.0.0", | ||
"version": "3.0.0", | ||
"description": "a crash friendly queue that persists queue state and can restart. uses a worker pool and has configurable concurrency", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
12
pool.js
@@ -7,6 +7,6 @@ var events = require('events') | ||
function Pool (workFn, opts) { | ||
if (!(this instanceof Pool)) return new Pool(workFn, opts) | ||
function Pool (workerTemplate, opts) { | ||
if (!(this instanceof Pool)) return new Pool(workerTemplate, opts) | ||
if (!opts) opts = {} | ||
this.workFn = workFn | ||
this.workerTemplate = workerTemplate | ||
this.working = 0 | ||
@@ -23,4 +23,8 @@ this.limit = opts.concurrency || 1 | ||
var workers = [] | ||
var useExistingWorkers = false | ||
if (Array.isArray(this.workerTemplate)) useExistingWorkers = true | ||
for (var i = 0; i < this.limit; i++) { | ||
var worker = createWorker(this.workFn) | ||
var workFn = useExistingWorkers ? this.workerTemplate[i] : this.workerTemplate | ||
var worker = createWorker(workFn) | ||
@@ -27,0 +31,0 @@ // consolidate events |
@@ -35,24 +35,28 @@ # atomic-queue | ||
### queue.on | ||
### events | ||
you can listen to the following events: | ||
in addition to standard stream events you can also listen to the following: | ||
#### error | ||
#### queue.on('ready') | ||
emitted after startup when the queue state has been read from disk and the queue is now ready to start working | ||
#### queue.on('error') | ||
when a catastrophic error has occurred | ||
#### start | ||
#### queue.on('update-start') | ||
when a job starts working | ||
when the queue starts flushing its state to disk | ||
#### end | ||
#### queue.on('update-end') | ||
when a job finishes working | ||
when the queue finishes flushing its state to disk | ||
#### update-start | ||
#### queue.pool.on('start') | ||
when the queue starts flushing its state to disk | ||
when a job starts working | ||
#### update-end | ||
#### queue.pool.on('finish') | ||
when the queue finishes flushing its state to disk | ||
when a job finishes working |
var events = require('events') | ||
var inherits = require('inherits') | ||
var debug = require('debug')('atomic-queue-worker') | ||
@@ -18,5 +19,7 @@ module.exports = Worker | ||
this.emit('start', data, change) | ||
debug('start', change.change) | ||
this.working = true | ||
this.workFn(data, function done (err, output) { | ||
self.working = false | ||
debug('finish', change.change) | ||
if (err) self.emit('error', err) | ||
@@ -23,0 +26,0 @@ self.emit('finish', output, data, change) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
9995
232
62