Comparing version 1.0.2 to 1.0.3
@@ -317,2 +317,3 @@ 'use strict'; | ||
if (res) { | ||
self._next_mature_t = res; | ||
var delta = res - Queue.now ().getTime(); | ||
@@ -357,3 +358,3 @@ self._verbose ('_nextDelta: got from queue.next_t to be %d msecs', delta); | ||
this._nextDelta (function (delta) { | ||
//|| TODO cap out delta to a max of 5 or 10 min | ||
// TODO cap out delta to a max of 5 or 10 min | ||
self._getOrFail_timeout = setTimeout (function () {self._getOrFail ()}, delta); | ||
@@ -440,4 +441,7 @@ self._verbose ('_rearm_getOrFail: _getOrFail rearmed, wait is %d', delta); | ||
self._verbose ('_getOrFail: consumer %s (tid %s) was cancelled, ignoring it...', consumer.cid, consumer.tid); | ||
return self._reinsertAndRearm (result); | ||
// TODO do not reinsert if it's using reserve!!!! | ||
// do not reinsert if it's using reserve | ||
if (!(consumer.reserve)) { | ||
return self._reinsertAndRearm (result); | ||
} | ||
} | ||
@@ -444,0 +448,0 @@ |
{ | ||
"name": "keuss", | ||
"version": "1.0.2", | ||
"version": "1.0.3", | ||
"keywords": ["queue", "job"], | ||
@@ -26,3 +26,4 @@ "homepage":"https://github.com/pepmartinez/keuss", | ||
"ioredis": "3.1.1", | ||
"commander": "2.10.0" | ||
"commander": "2.10.0", | ||
"mitt": "1.1.2" | ||
}, | ||
@@ -29,0 +30,0 @@ "devDependencies": { |
'use strict'; | ||
var mitt = require ('mitt'); | ||
var Signal = require ('../Signal'); | ||
@@ -9,2 +11,11 @@ | ||
this._factory = factory; | ||
this._channel = 'keuss:q:signal:' + queue.type () + ':' + queue.name (); | ||
var self = this; | ||
this._factory._emitter.on (this._channel, function (message) { | ||
var mature = message; | ||
self._verbose ('got mitt event on ch [%s], message is %s, calling master.emitInsertion(%d)', self._channel, message); | ||
self._master.signalInsertion (new Date (mature)); | ||
}); | ||
this._verbose ('created local signaller on queue [%s]', queue.name()); | ||
@@ -16,4 +27,4 @@ } | ||
emitInsertion (mature, cb) { | ||
this._verbose ('calling master.emitInsertion(%d)', mature); | ||
this._master.signalInsertion (mature, cb); | ||
// convey to local through mitt | ||
this._factory._emitter.emit (this._channel, mature.getTime ()); | ||
} | ||
@@ -25,2 +36,3 @@ } | ||
constructor (opts) { | ||
this._emitter = mitt(); | ||
} | ||
@@ -36,4 +48,3 @@ | ||
// TODO: make local, in-process pubsub over the factory. use mitt | ||
module.exports = LocalSignalFactory; |
'use strict'; | ||
var mitt = require ('mitt'); | ||
var RedisConn = require ('../utils/RedisConn'); | ||
@@ -13,13 +15,20 @@ var Signal = require ('../Signal'); | ||
this._opts = opts || {}; | ||
var self = this; | ||
this._rediscl_pub = RedisConn.conn (this._opts); | ||
this._rediscl_sub = RedisConn.conn (this._opts); | ||
this._factory._emitter.on (this._channel, function (message) { | ||
var mature = parseInt (message); | ||
self._verbose ('got redis pubsub event on ch [%s], message is %s, calling master.emitInsertion(%d)', self._channel, message, mature); | ||
self._master.signalInsertion (new Date (mature)); | ||
}); | ||
this._rediscl_pub = this._factory._rediscl_pub; | ||
this._rediscl_sub = this._factory._rediscl_sub; | ||
this._rediscl_sub.subscribe (this._channel); | ||
var self = this; | ||
this._rediscl_sub.on ('message', function (channel, message) { | ||
var mature = parseInt (message); | ||
self._verbose ('got redis pubsub event on ch [%s], mature is %d (%s),calling master.emitInsertion(%d)', channel, mature, message, mature); | ||
self._master.signalInsertion (new Date (mature)); | ||
self._verbose ('got redis pubsub event on ch [%s], message is %s,calling master.emitInsertion(%d)', channel, message, mature); | ||
// convey to local through mitt | ||
self._factory._emitter.emit (channel, message); | ||
}); | ||
@@ -41,2 +50,5 @@ | ||
this._opts = opts || {}; | ||
this._emitter = mitt(); | ||
this._rediscl_pub = RedisConn.conn (this._opts); | ||
this._rediscl_sub = RedisConn.conn (this._opts); | ||
} | ||
@@ -52,4 +64,2 @@ | ||
// TODO: relay to in-process pubsub over the factory. use mitt | ||
module.exports = RPSSignalFactory; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
151100
3044
8
+ Addedmitt@1.1.2
+ Addedmitt@1.1.2(transitive)