Comparing version 0.3.1 to 0.4.0
@@ -0,16 +1,32 @@ | ||
/* | ||
* Copyright (c) 2014-2015, Matteo Collina <hello@matteocollina.com> | ||
* | ||
* Permission to use, copy, modify, and/or distribute this software for any | ||
* purpose with or without fee is hereby granted, provided that the above | ||
* copyright notice and this permission notice appear in all copies. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | ||
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | ||
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | ||
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | ||
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | ||
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR | ||
* IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | ||
*/ | ||
'use strict'; | ||
function buildTests(opts) { | ||
function buildTests (opts) { | ||
var builder = opts.builder | ||
, test = opts.test | ||
var test = opts.test | ||
test('support on and emit', function(t) { | ||
test('support on and emit', function (t) { | ||
t.plan(4) | ||
var e = builder() | ||
, expected = { | ||
topic: 'hello world' | ||
, payload: { my: 'message' } | ||
} | ||
var expected = { | ||
topic: 'hello world', | ||
payload: { my: 'message' } | ||
} | ||
e.on('hello world', function(message, cb) { | ||
e.on('hello world', function (message, cb) { | ||
t.equal(e.current, 1, 'number of current messages') | ||
@@ -20,5 +36,5 @@ t.deepEqual(message, expected) | ||
cb() | ||
}, function() { | ||
e.emit(expected, function() { | ||
e.close(function() { | ||
}, function () { | ||
e.emit(expected, function () { | ||
e.close(function () { | ||
t.pass('closed') | ||
@@ -30,21 +46,21 @@ }) | ||
test('support multiple subscribers', function(t) { | ||
test('support multiple subscribers', function (t) { | ||
t.plan(3) | ||
var e = builder() | ||
, expected = { | ||
topic: 'hello world' | ||
, payload: { my: 'message' } | ||
} | ||
var expected = { | ||
topic: 'hello world', | ||
payload: { my: 'message' } | ||
} | ||
e.on('hello world', function(message, cb) { | ||
e.on('hello world', function (message, cb) { | ||
t.ok(message, 'message received') | ||
cb() | ||
}, function() { | ||
e.on('hello world', function(message, cb) { | ||
}, function () { | ||
e.on('hello world', function (message, cb) { | ||
t.ok(message, 'message received') | ||
cb() | ||
}, function() { | ||
e.emit(expected, function() { | ||
e.close(function() { | ||
}, function () { | ||
e.emit(expected, function () { | ||
e.close(function () { | ||
t.pass('closed') | ||
@@ -57,12 +73,12 @@ }) | ||
test('support multiple subscribers and unsubscribers', function(t) { | ||
test('support multiple subscribers and unsubscribers', function (t) { | ||
t.plan(2) | ||
var e = builder() | ||
, expected = { | ||
topic: 'hello world' | ||
, payload: { my: 'message' } | ||
} | ||
var expected = { | ||
topic: 'hello world', | ||
payload: { my: 'message' } | ||
} | ||
function first(message, cb) { | ||
function first (message, cb) { | ||
t.fail('first listener should not receive any events') | ||
@@ -72,6 +88,6 @@ cb() | ||
function second(message, cb) { | ||
function second (message, cb) { | ||
t.ok(message, 'second listener must receive the message') | ||
cb() | ||
e.close(function() { | ||
e.close(function () { | ||
t.pass('closed') | ||
@@ -81,5 +97,5 @@ }) | ||
e.on('hello world', first, function() { | ||
e.on('hello world', second, function() { | ||
e.removeListener('hello world', first, function() { | ||
e.on('hello world', first, function () { | ||
e.on('hello world', second, function () { | ||
e.removeListener('hello world', first, function () { | ||
e.emit(expected) | ||
@@ -91,14 +107,13 @@ }) | ||
test('removeListener', function(t) { | ||
test('removeListener', function (t) { | ||
t.plan(1) | ||
var e = builder() | ||
, expected = { | ||
topic: 'hello world' | ||
, payload: { my: 'message' } | ||
} | ||
, toRemoveCalled = false | ||
var expected = { | ||
topic: 'hello world', | ||
payload: { my: 'message' } | ||
} | ||
var toRemoveCalled = false | ||
function toRemove(message, cb) { | ||
function toRemove (message, cb) { | ||
toRemoveCalled = true | ||
@@ -108,9 +123,9 @@ cb() | ||
e.on('hello world', function(message, cb) { | ||
e.on('hello world', function (message, cb) { | ||
cb() | ||
}, function() { | ||
e.on('hello world', toRemove, function() { | ||
e.removeListener('hello world', toRemove, function() { | ||
e.emit(expected, function() { | ||
e.close(function() { | ||
}, function () { | ||
e.on('hello world', toRemove, function () { | ||
e.removeListener('hello world', toRemove, function () { | ||
e.emit(expected, function () { | ||
e.close(function () { | ||
t.notOk(toRemoveCalled, 'the toRemove function must not be called') | ||
@@ -124,14 +139,14 @@ }) | ||
test('without a callback on emit and on', function(t) { | ||
test('without a callback on emit and on', function (t) { | ||
t.plan(1) | ||
var e = builder() | ||
, expected = { | ||
topic: 'hello world' | ||
, payload: { my: 'message' } | ||
} | ||
var expected = { | ||
topic: 'hello world', | ||
payload: { my: 'message' } | ||
} | ||
e.on('hello world', function(message, cb) { | ||
e.on('hello world', function (message, cb) { | ||
cb() | ||
e.close(function() { | ||
e.close(function () { | ||
t.pass('closed') | ||
@@ -144,14 +159,14 @@ }) | ||
test('without any listeners', function(t) { | ||
test('without any listeners', function (t) { | ||
t.plan(2) | ||
var e = builder() | ||
, expected = { | ||
topic: 'hello world' | ||
, payload: { my: 'message' } | ||
} | ||
var expected = { | ||
topic: 'hello world', | ||
payload: { my: 'message' } | ||
} | ||
e.emit(expected) | ||
t.equal(e.current, 0, 'reset the current messages trackers') | ||
e.close(function() { | ||
e.close(function () { | ||
t.pass('closed') | ||
@@ -161,15 +176,15 @@ }) | ||
test('support one level wildcard', function(t) { | ||
test('support one level wildcard', function (t) { | ||
t.plan(2) | ||
var e = builder() | ||
, expected = { | ||
topic: 'hello/world' | ||
, payload: { my: 'message' } | ||
} | ||
var expected = { | ||
topic: 'hello/world', | ||
payload: { my: 'message' } | ||
} | ||
e.on('hello/+', function(message, cb) { | ||
e.on('hello/+', function (message, cb) { | ||
t.equal(message.topic, 'hello/world') | ||
cb() | ||
}, function() { | ||
}, function () { | ||
// this will not be catched | ||
@@ -179,4 +194,4 @@ e.emit({ topic: 'hello/my/world' }) | ||
// this will be catched | ||
e.emit(expected, function() { | ||
e.close(function() { | ||
e.emit(expected, function () { | ||
e.close(function () { | ||
t.pass('closed') | ||
@@ -188,17 +203,17 @@ }) | ||
test('support changing one level wildcard', function(t) { | ||
test('support changing one level wildcard', function (t) { | ||
t.plan(2) | ||
var e = builder({ wildcardOne: '~' }) | ||
, expected = { | ||
topic: 'hello/world' | ||
, payload: { my: 'message' } | ||
} | ||
var expected = { | ||
topic: 'hello/world', | ||
payload: { my: 'message' } | ||
} | ||
e.on('hello/~', function(message, cb) { | ||
e.on('hello/~', function (message, cb) { | ||
t.equal(message.topic, 'hello/world') | ||
cb() | ||
}, function() { | ||
e.emit(expected, function() { | ||
e.close(function() { | ||
}, function () { | ||
e.emit(expected, function () { | ||
e.close(function () { | ||
t.pass('closed') | ||
@@ -210,17 +225,17 @@ }) | ||
test('support deep wildcard', function(t) { | ||
test('support deep wildcard', function (t) { | ||
t.plan(2) | ||
var e = builder() | ||
, expected = { | ||
topic: 'hello/my/world' | ||
, payload: { my: 'message' } | ||
} | ||
var expected = { | ||
topic: 'hello/my/world', | ||
payload: { my: 'message' } | ||
} | ||
e.on('hello/#', function(message, cb) { | ||
e.on('hello/#', function (message, cb) { | ||
t.equal(message.topic, 'hello/my/world') | ||
cb() | ||
}, function() { | ||
e.emit(expected, function() { | ||
e.close(function() { | ||
}, function () { | ||
e.emit(expected, function () { | ||
e.close(function () { | ||
t.pass('closed') | ||
@@ -232,17 +247,17 @@ }) | ||
test('support changing deep wildcard', function(t) { | ||
test('support changing deep wildcard', function (t) { | ||
t.plan(2) | ||
var e = builder({ wildcardSome: '*' }) | ||
, expected = { | ||
topic: 'hello/my/world' | ||
, payload: { my: 'message' } | ||
} | ||
var expected = { | ||
topic: 'hello/my/world', | ||
payload: { my: 'message' } | ||
} | ||
e.on('hello/*', function(message, cb) { | ||
e.on('hello/*', function (message, cb) { | ||
t.equal(message.topic, 'hello/my/world') | ||
cb() | ||
}, function() { | ||
e.emit(expected, function() { | ||
e.close(function() { | ||
}, function () { | ||
e.emit(expected, function () { | ||
e.close(function () { | ||
t.pass('closed') | ||
@@ -254,17 +269,17 @@ }) | ||
test('support changing the level separator', function(t) { | ||
test('support changing the level separator', function (t) { | ||
t.plan(2) | ||
var e = builder({ separator: '~' }) | ||
, expected = { | ||
topic: 'hello~world' | ||
, payload: { my: 'message' } | ||
} | ||
var expected = { | ||
topic: 'hello~world', | ||
payload: { my: 'message' } | ||
} | ||
e.on('hello~+', function(message, cb) { | ||
e.on('hello~+', function (message, cb) { | ||
t.equal(message.topic, 'hello~world') | ||
cb() | ||
}, function() { | ||
e.emit(expected, function() { | ||
e.close(function() { | ||
}, function () { | ||
e.emit(expected, function () { | ||
e.close(function () { | ||
t.pass('closed') | ||
@@ -276,9 +291,9 @@ }) | ||
test('close support', function(t) { | ||
var e = builder() | ||
, check = false | ||
test('close support', function (t) { | ||
var e = builder() | ||
var check = false | ||
t.notOk(e.closed, 'must have a false closed property') | ||
e.close(function() { | ||
e.close(function () { | ||
t.ok(check, 'must delay the close callback') | ||
@@ -292,7 +307,7 @@ t.ok(e.closed, 'must have a true closed property') | ||
test('emit after close errors', function(t) { | ||
test('emit after close errors', function (t) { | ||
var e = builder() | ||
e.close(function() { | ||
e.emit({ topic: 'hello' }, function(err) { | ||
e.close(function () { | ||
e.emit({ topic: 'hello' }, function (err) { | ||
t.ok(err, 'must return an error') | ||
@@ -304,12 +319,12 @@ t.end() | ||
test('support multiple subscribers with wildcards', function(t) { | ||
test('support multiple subscribers with wildcards', function (t) { | ||
var e = builder() | ||
, expected = { | ||
topic: 'hello/world' | ||
, payload: { my: 'message' } | ||
} | ||
, firstCalled = false | ||
, secondCalled = false | ||
var expected = { | ||
topic: 'hello/world', | ||
payload: { my: 'message' } | ||
} | ||
var firstCalled = false | ||
var secondCalled = false | ||
e.on('hello/#', function(message, cb) { | ||
e.on('hello/#', function (message, cb) { | ||
t.notOk(firstCalled, 'first subscriber must only be called once') | ||
@@ -320,9 +335,9 @@ firstCalled = true | ||
e.on('hello/+', function(message, cb) { | ||
e.on('hello/+', function (message, cb) { | ||
t.notOk(secondCalled, 'second subscriber must only be called once') | ||
secondCalled = true | ||
cb() | ||
}, function() { | ||
e.emit(expected, function() { | ||
e.close(function() { | ||
}, function () { | ||
e.emit(expected, function () { | ||
e.close(function () { | ||
t.end() | ||
@@ -334,12 +349,12 @@ }) | ||
test('support multiple subscribers with wildcards (deep)', function(t) { | ||
test('support multiple subscribers with wildcards (deep)', function (t) { | ||
var e = builder() | ||
, expected = { | ||
topic: 'hello/my/world' | ||
, payload: { my: 'message' } | ||
} | ||
, firstCalled = false | ||
, secondCalled = false | ||
var expected = { | ||
topic: 'hello/my/world', | ||
payload: { my: 'message' } | ||
} | ||
var firstCalled = false | ||
var secondCalled = false | ||
e.on('hello/#', function(message, cb) { | ||
e.on('hello/#', function (message, cb) { | ||
t.notOk(firstCalled, 'first subscriber must only be called once') | ||
@@ -350,9 +365,9 @@ firstCalled = true | ||
e.on('hello/+/world', function(message, cb) { | ||
e.on('hello/+/world', function (message, cb) { | ||
t.notOk(secondCalled, 'second subscriber must only be called once') | ||
secondCalled = true | ||
cb() | ||
}, function() { | ||
e.emit(expected, function() { | ||
e.close(function() { | ||
}, function () { | ||
e.emit(expected, function () { | ||
e.close(function () { | ||
t.end() | ||
@@ -359,0 +374,0 @@ }) |
31
bench.js
@@ -0,9 +1,26 @@ | ||
/* | ||
* Copyright (c) 2014-2015, Matteo Collina <hello@matteocollina.com> | ||
* | ||
* Permission to use, copy, modify, and/or distribute this software for any | ||
* purpose with or without fee is hereby granted, provided that the above | ||
* copyright notice and this permission notice appear in all copies. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | ||
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | ||
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | ||
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | ||
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | ||
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR | ||
* IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | ||
*/ | ||
'use strict'; | ||
var mqemitter = require('./') | ||
, emitter = mqemitter({ concurrency: 10 }) | ||
, total = 1000000 | ||
, written = 0 | ||
, received = 0 | ||
, timerKey = 'time for sending ' + total + ' messages' | ||
var emitter = mqemitter({ concurrency: 10 }) | ||
var total = 1000000 | ||
var written = 0 | ||
var received = 0 | ||
var timerKey = 'time for sending ' + total + ' messages' | ||
function write() { | ||
function write () { | ||
if (written === total) { | ||
@@ -18,3 +35,3 @@ return | ||
emitter.on('hello', function(msg, cb) { | ||
emitter.on('hello', function (msg, cb) { | ||
received++ | ||
@@ -21,0 +38,0 @@ if (received === total) { |
116
mqemitter.js
/* | ||
* Copyright (c) 2014, Matteo Collina <hello@matteocollina.com> | ||
* Copyright (c) 2014-2015, Matteo Collina <hello@matteocollina.com> | ||
* | ||
@@ -16,10 +16,9 @@ * Permission to use, copy, modify, and/or distribute this software for any | ||
*/ | ||
'use strict' | ||
'use strict'; | ||
var Qlobber = require('qlobber').Qlobber | ||
, assert = require('assert') | ||
, nop = function() {} | ||
var assert = require('assert') | ||
var fastparallel = require('fastparallel') | ||
function MQEmitter(opts) { | ||
function MQEmitter (opts) { | ||
if (!(this instanceof MQEmitter)) { | ||
@@ -29,2 +28,4 @@ return new MQEmitter(opts) | ||
var that = this | ||
opts = opts || {} | ||
@@ -38,3 +39,6 @@ | ||
this._messageCallbacks = [] | ||
this._latestReceiver = new CallbackReceiver(this) | ||
this._parallel = fastparallel({ | ||
results: false, | ||
released: released | ||
}) | ||
@@ -45,18 +49,31 @@ this.concurrency = opts.concurrency | ||
this._matcher = new Qlobber({ | ||
separator: opts.separator | ||
, wildcard_one: opts.wildcardOne | ||
, wildcard_some: opts.wildcardSome | ||
separator: opts.separator, | ||
wildcard_one: opts.wildcardOne, | ||
wildcard_some: opts.wildcardSome | ||
}) | ||
this.closed = false | ||
this._released = released | ||
function released () { | ||
var message = that._messageQueue.shift() | ||
var callback = that._messageCallbacks.shift() | ||
if (!message) { | ||
// we are at the end of the queue | ||
that.current-- | ||
} else { | ||
that._do(message, callback) | ||
} | ||
} | ||
} | ||
Object.defineProperty(MQEmitter.prototype, "length", { | ||
get: function() { | ||
return this._messageQueue.length; | ||
Object.defineProperty(MQEmitter.prototype, 'length', { | ||
get: function () { | ||
return this._messageQueue.length | ||
}, | ||
enumerable: true | ||
}); | ||
}) | ||
MQEmitter.prototype.on = function on(topic, notify, done) { | ||
MQEmitter.prototype.on = function on (topic, notify, done) { | ||
assert(topic) | ||
@@ -73,3 +90,3 @@ assert(notify) | ||
MQEmitter.prototype.removeListener = function removeListener(topic, notify, done) { | ||
MQEmitter.prototype.removeListener = function removeListener (topic, notify, done) { | ||
assert(topic) | ||
@@ -86,12 +103,11 @@ assert(notify) | ||
MQEmitter.prototype.emit = function emit(message, cb) { | ||
MQEmitter.prototype.emit = function emit (message, cb) { | ||
assert(message) | ||
if (this.closed) | ||
if (this.closed) { | ||
return cb(new Error('mqemitter is closed')) | ||
} | ||
cb = cb || nop | ||
var receiver = null; | ||
if (this.concurrency > 0 && this.current >= this.concurrency) { | ||
@@ -102,12 +118,3 @@ this._messageQueue.push(message) | ||
this.current++ | ||
receiver = this._latestReceiver | ||
if (this._latestReceiver) { | ||
receiver = this._latestReceiver | ||
this._latestReceiver = null | ||
} else { | ||
receiver = new CallbackReceiver(this); | ||
} | ||
this._do(message, cb, receiver) | ||
this._do(message, cb) | ||
} | ||
@@ -118,3 +125,3 @@ | ||
MQEmitter.prototype.close = function close(cb) { | ||
MQEmitter.prototype.close = function close (cb) { | ||
this.closed = true | ||
@@ -126,54 +133,17 @@ setImmediate(cb) | ||
MQEmitter.prototype._next = function next(receiver) { | ||
var message = this._messageQueue.shift() | ||
, callback = this._messageCallbacks.shift() | ||
if (!message) { | ||
// we are at the end of the queue | ||
this.current-- | ||
this._latestReceiver = receiver | ||
} else { | ||
this._do(message, callback, receiver) | ||
} | ||
return this | ||
} | ||
MQEmitter.prototype._do = function(message, callback, receiver) { | ||
MQEmitter.prototype._do = function (message, callback) { | ||
var matches = this._matcher.match(message.topic) | ||
, i | ||
if (matches.length === 0) { | ||
callback() | ||
return this._next(receiver) | ||
this._released() | ||
} else { | ||
this._parallel(this, matches, message, callback) | ||
} | ||
receiver.num = matches.length | ||
receiver.callback = callback | ||
for (i = 0; i < matches.length; i++) { | ||
matches[i].call(this, message, receiver.counter); | ||
} | ||
return this | ||
} | ||
function CallbackReceiver(mq) { | ||
// these will be initialized by the caller | ||
this.num = -1 | ||
this.callback = null | ||
function nop () {} | ||
var that = this | ||
this.counter = function() { | ||
that.num--; | ||
if (that.num === 0) { | ||
that.callback() | ||
that.callback = nop | ||
mq._next(that) | ||
} | ||
} | ||
} | ||
module.exports = MQEmitter |
{ | ||
"name": "mqemitter", | ||
"version": "0.3.1", | ||
"version": "0.4.0", | ||
"description": "An Opinionated Message Queue with an emitter-style API", | ||
"main": "mqemitter.js", | ||
"scripts": { | ||
"lint": "standard", | ||
"test": "tape test.js | faucet" | ||
}, | ||
"pre-commit": [ | ||
"lint", | ||
"test" | ||
@@ -35,7 +37,9 @@ ], | ||
"pre-commit": "0.0.9", | ||
"standard": "^3.1.0", | ||
"tape": "^2.14.0" | ||
}, | ||
"dependencies": { | ||
"fastparallel": "^1.1.1", | ||
"qlobber": "~0.5.0" | ||
} | ||
} |
@@ -14,4 +14,7 @@ mqemitter [![Build Status](https://travis-ci.org/mcollina/mqemitter.png)](https://travis-ci.org/mcollina/mqemitter) | ||
Do you need a multi process MQEmitter? Check out | ||
[mqemitter-redis](http://github.com/mqemitter-redis). | ||
[mqemitter-redis](http://npm.im/mqemitter-redis) or | ||
[mqemitter-mongodb](http://npm.im/mqemitter-mongodb). | ||
[![js-standard-style](https://raw.githubusercontent.com/feross/standard/master/badge.png)](https://github.com/feross/standard) | ||
<a name="install"></a> | ||
@@ -29,6 +32,6 @@ ## Installation | ||
var mq = require('mqemitter') | ||
, emitter = mq({ concurrency: 5 }) | ||
, message | ||
var emitter = mq({ concurrency: 5 }) | ||
var message | ||
emitter.on('hello world', function(message, cb) { | ||
emitter.on('hello world', function (message, cb) { | ||
// call callback when you are done | ||
@@ -41,3 +44,3 @@ // do not pass any errors, the emitter cannot handle it. | ||
message = { topic: 'hello world', payload: 'or any other fields' } | ||
emitter.emit(message, function() { | ||
emitter.emit(message, function () { | ||
// emitter will never return an error | ||
@@ -165,3 +168,3 @@ }) | ||
Copyright (c) 2014, Matteo Collina <hello@matteocollina.com> | ||
Copyright (c) 2014-2015, Matteo Collina <hello@matteocollina.com> | ||
@@ -168,0 +171,0 @@ Permission to use, copy, modify, and/or distribute this software for any |
57
test.js
@@ -0,37 +1,48 @@ | ||
/* | ||
* Copyright (c) 2014-2015, Matteo Collina <hello@matteocollina.com> | ||
* | ||
* Permission to use, copy, modify, and/or distribute this software for any | ||
* purpose with or without fee is hereby granted, provided that the above | ||
* copyright notice and this permission notice appear in all copies. | ||
* | ||
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | ||
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | ||
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | ||
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | ||
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | ||
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR | ||
* IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | ||
*/ | ||
'use strict'; | ||
var abstractTest = require('./abstractTest') | ||
, test = require('tape').test | ||
, mq = require('./') | ||
var abstractTest = require('./abstractTest') | ||
var test = require('tape').test | ||
var mq = require('./') | ||
abstractTest({ | ||
builder: mq | ||
, test: require('tape').test | ||
builder: mq, | ||
test: require('tape').test | ||
}) | ||
test('queue concurrency', function(t) { | ||
test('queue concurrency', function (t) { | ||
t.plan(2) | ||
var e = mq({ concurrency: 1 }) | ||
, expected = { | ||
topic: 'hello world' | ||
, payload: { my: 'message' } | ||
} | ||
, completed1 = false | ||
var completed1 = false | ||
t.equal(e.concurrency, 1) | ||
e.on('hello 1', function(message, cb) { | ||
e.on('hello 1', function (message, cb) { | ||
setTimeout(cb, 10) | ||
}) | ||
e.on('hello 2', function(message, cb) { | ||
e.on('hello 2', function (message, cb) { | ||
cb() | ||
}) | ||
start = Date.now() | ||
e.emit({ topic: 'hello 1' }, function() { | ||
e.emit({ topic: 'hello 1' }, function () { | ||
completed1 = true | ||
}) | ||
e.emit({ topic: 'hello 2' }, function() { | ||
e.emit({ topic: 'hello 2' }, function () { | ||
t.ok(completed1, 'the first message must be completed') | ||
@@ -41,12 +52,12 @@ }) | ||
test('without any listeners and a callback', function(t) { | ||
test('without any listeners and a callback', function (t) { | ||
var e = mq() | ||
, expected = { | ||
topic: 'hello world' | ||
, payload: { my: 'message' } | ||
} | ||
var expected = { | ||
topic: 'hello world', | ||
payload: { my: 'message' } | ||
} | ||
e.emit(expected, function() { | ||
e.emit(expected, function () { | ||
t.equal(e.current, 1, 'there 1 message that is being processed') | ||
e.close(function() { | ||
e.close(function () { | ||
t.end() | ||
@@ -53,0 +64,0 @@ }) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
22009
520
178
2
4
+ Addedfastparallel@^1.1.1
+ Addedfastparallel@1.7.2(transitive)
+ Addedreusify@1.0.4(transitive)
+ Addedxtend@4.0.2(transitive)