Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mqemitter

Package Overview
Dependencies
Maintainers
1
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mqemitter - npm Package Compare versions

Comparing version 0.3.1 to 0.4.0

287

abstractTest.js

@@ -0,16 +1,32 @@

/*
* Copyright (c) 2014-2015, Matteo Collina <hello@matteocollina.com>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR
* IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
'use strict';
function buildTests(opts) {
function buildTests (opts) {
var builder = opts.builder
, test = opts.test
var test = opts.test
test('support on and emit', function(t) {
test('support on and emit', function (t) {
t.plan(4)
var e = builder()
, expected = {
topic: 'hello world'
, payload: { my: 'message' }
}
var expected = {
topic: 'hello world',
payload: { my: 'message' }
}
e.on('hello world', function(message, cb) {
e.on('hello world', function (message, cb) {
t.equal(e.current, 1, 'number of current messages')

@@ -20,5 +36,5 @@ t.deepEqual(message, expected)

cb()
}, function() {
e.emit(expected, function() {
e.close(function() {
}, function () {
e.emit(expected, function () {
e.close(function () {
t.pass('closed')

@@ -30,21 +46,21 @@ })

test('support multiple subscribers', function(t) {
test('support multiple subscribers', function (t) {
t.plan(3)
var e = builder()
, expected = {
topic: 'hello world'
, payload: { my: 'message' }
}
var expected = {
topic: 'hello world',
payload: { my: 'message' }
}
e.on('hello world', function(message, cb) {
e.on('hello world', function (message, cb) {
t.ok(message, 'message received')
cb()
}, function() {
e.on('hello world', function(message, cb) {
}, function () {
e.on('hello world', function (message, cb) {
t.ok(message, 'message received')
cb()
}, function() {
e.emit(expected, function() {
e.close(function() {
}, function () {
e.emit(expected, function () {
e.close(function () {
t.pass('closed')

@@ -57,12 +73,12 @@ })

test('support multiple subscribers and unsubscribers', function(t) {
test('support multiple subscribers and unsubscribers', function (t) {
t.plan(2)
var e = builder()
, expected = {
topic: 'hello world'
, payload: { my: 'message' }
}
var expected = {
topic: 'hello world',
payload: { my: 'message' }
}
function first(message, cb) {
function first (message, cb) {
t.fail('first listener should not receive any events')

@@ -72,6 +88,6 @@ cb()

function second(message, cb) {
function second (message, cb) {
t.ok(message, 'second listener must receive the message')
cb()
e.close(function() {
e.close(function () {
t.pass('closed')

@@ -81,5 +97,5 @@ })

e.on('hello world', first, function() {
e.on('hello world', second, function() {
e.removeListener('hello world', first, function() {
e.on('hello world', first, function () {
e.on('hello world', second, function () {
e.removeListener('hello world', first, function () {
e.emit(expected)

@@ -91,14 +107,13 @@ })

test('removeListener', function(t) {
test('removeListener', function (t) {
t.plan(1)
var e = builder()
, expected = {
topic: 'hello world'
, payload: { my: 'message' }
}
, toRemoveCalled = false
var expected = {
topic: 'hello world',
payload: { my: 'message' }
}
var toRemoveCalled = false
function toRemove(message, cb) {
function toRemove (message, cb) {
toRemoveCalled = true

@@ -108,9 +123,9 @@ cb()

e.on('hello world', function(message, cb) {
e.on('hello world', function (message, cb) {
cb()
}, function() {
e.on('hello world', toRemove, function() {
e.removeListener('hello world', toRemove, function() {
e.emit(expected, function() {
e.close(function() {
}, function () {
e.on('hello world', toRemove, function () {
e.removeListener('hello world', toRemove, function () {
e.emit(expected, function () {
e.close(function () {
t.notOk(toRemoveCalled, 'the toRemove function must not be called')

@@ -124,14 +139,14 @@ })

test('without a callback on emit and on', function(t) {
test('without a callback on emit and on', function (t) {
t.plan(1)
var e = builder()
, expected = {
topic: 'hello world'
, payload: { my: 'message' }
}
var expected = {
topic: 'hello world',
payload: { my: 'message' }
}
e.on('hello world', function(message, cb) {
e.on('hello world', function (message, cb) {
cb()
e.close(function() {
e.close(function () {
t.pass('closed')

@@ -144,14 +159,14 @@ })

test('without any listeners', function(t) {
test('without any listeners', function (t) {
t.plan(2)
var e = builder()
, expected = {
topic: 'hello world'
, payload: { my: 'message' }
}
var expected = {
topic: 'hello world',
payload: { my: 'message' }
}
e.emit(expected)
t.equal(e.current, 0, 'reset the current messages trackers')
e.close(function() {
e.close(function () {
t.pass('closed')

@@ -161,15 +176,15 @@ })

test('support one level wildcard', function(t) {
test('support one level wildcard', function (t) {
t.plan(2)
var e = builder()
, expected = {
topic: 'hello/world'
, payload: { my: 'message' }
}
var expected = {
topic: 'hello/world',
payload: { my: 'message' }
}
e.on('hello/+', function(message, cb) {
e.on('hello/+', function (message, cb) {
t.equal(message.topic, 'hello/world')
cb()
}, function() {
}, function () {
// this will not be catched

@@ -179,4 +194,4 @@ e.emit({ topic: 'hello/my/world' })

// this will be catched
e.emit(expected, function() {
e.close(function() {
e.emit(expected, function () {
e.close(function () {
t.pass('closed')

@@ -188,17 +203,17 @@ })

test('support changing one level wildcard', function(t) {
test('support changing one level wildcard', function (t) {
t.plan(2)
var e = builder({ wildcardOne: '~' })
, expected = {
topic: 'hello/world'
, payload: { my: 'message' }
}
var expected = {
topic: 'hello/world',
payload: { my: 'message' }
}
e.on('hello/~', function(message, cb) {
e.on('hello/~', function (message, cb) {
t.equal(message.topic, 'hello/world')
cb()
}, function() {
e.emit(expected, function() {
e.close(function() {
}, function () {
e.emit(expected, function () {
e.close(function () {
t.pass('closed')

@@ -210,17 +225,17 @@ })

test('support deep wildcard', function(t) {
test('support deep wildcard', function (t) {
t.plan(2)
var e = builder()
, expected = {
topic: 'hello/my/world'
, payload: { my: 'message' }
}
var expected = {
topic: 'hello/my/world',
payload: { my: 'message' }
}
e.on('hello/#', function(message, cb) {
e.on('hello/#', function (message, cb) {
t.equal(message.topic, 'hello/my/world')
cb()
}, function() {
e.emit(expected, function() {
e.close(function() {
}, function () {
e.emit(expected, function () {
e.close(function () {
t.pass('closed')

@@ -232,17 +247,17 @@ })

test('support changing deep wildcard', function(t) {
test('support changing deep wildcard', function (t) {
t.plan(2)
var e = builder({ wildcardSome: '*' })
, expected = {
topic: 'hello/my/world'
, payload: { my: 'message' }
}
var expected = {
topic: 'hello/my/world',
payload: { my: 'message' }
}
e.on('hello/*', function(message, cb) {
e.on('hello/*', function (message, cb) {
t.equal(message.topic, 'hello/my/world')
cb()
}, function() {
e.emit(expected, function() {
e.close(function() {
}, function () {
e.emit(expected, function () {
e.close(function () {
t.pass('closed')

@@ -254,17 +269,17 @@ })

test('support changing the level separator', function(t) {
test('support changing the level separator', function (t) {
t.plan(2)
var e = builder({ separator: '~' })
, expected = {
topic: 'hello~world'
, payload: { my: 'message' }
}
var expected = {
topic: 'hello~world',
payload: { my: 'message' }
}
e.on('hello~+', function(message, cb) {
e.on('hello~+', function (message, cb) {
t.equal(message.topic, 'hello~world')
cb()
}, function() {
e.emit(expected, function() {
e.close(function() {
}, function () {
e.emit(expected, function () {
e.close(function () {
t.pass('closed')

@@ -276,9 +291,9 @@ })

test('close support', function(t) {
var e = builder()
, check = false
test('close support', function (t) {
var e = builder()
var check = false
t.notOk(e.closed, 'must have a false closed property')
e.close(function() {
e.close(function () {
t.ok(check, 'must delay the close callback')

@@ -292,7 +307,7 @@ t.ok(e.closed, 'must have a true closed property')

test('emit after close errors', function(t) {
test('emit after close errors', function (t) {
var e = builder()
e.close(function() {
e.emit({ topic: 'hello' }, function(err) {
e.close(function () {
e.emit({ topic: 'hello' }, function (err) {
t.ok(err, 'must return an error')

@@ -304,12 +319,12 @@ t.end()

test('support multiple subscribers with wildcards', function(t) {
test('support multiple subscribers with wildcards', function (t) {
var e = builder()
, expected = {
topic: 'hello/world'
, payload: { my: 'message' }
}
, firstCalled = false
, secondCalled = false
var expected = {
topic: 'hello/world',
payload: { my: 'message' }
}
var firstCalled = false
var secondCalled = false
e.on('hello/#', function(message, cb) {
e.on('hello/#', function (message, cb) {
t.notOk(firstCalled, 'first subscriber must only be called once')

@@ -320,9 +335,9 @@ firstCalled = true

e.on('hello/+', function(message, cb) {
e.on('hello/+', function (message, cb) {
t.notOk(secondCalled, 'second subscriber must only be called once')
secondCalled = true
cb()
}, function() {
e.emit(expected, function() {
e.close(function() {
}, function () {
e.emit(expected, function () {
e.close(function () {
t.end()

@@ -334,12 +349,12 @@ })

test('support multiple subscribers with wildcards (deep)', function(t) {
test('support multiple subscribers with wildcards (deep)', function (t) {
var e = builder()
, expected = {
topic: 'hello/my/world'
, payload: { my: 'message' }
}
, firstCalled = false
, secondCalled = false
var expected = {
topic: 'hello/my/world',
payload: { my: 'message' }
}
var firstCalled = false
var secondCalled = false
e.on('hello/#', function(message, cb) {
e.on('hello/#', function (message, cb) {
t.notOk(firstCalled, 'first subscriber must only be called once')

@@ -350,9 +365,9 @@ firstCalled = true

e.on('hello/+/world', function(message, cb) {
e.on('hello/+/world', function (message, cb) {
t.notOk(secondCalled, 'second subscriber must only be called once')
secondCalled = true
cb()
}, function() {
e.emit(expected, function() {
e.close(function() {
}, function () {
e.emit(expected, function () {
e.close(function () {
t.end()

@@ -359,0 +374,0 @@ })

@@ -0,9 +1,26 @@

/*
* Copyright (c) 2014-2015, Matteo Collina <hello@matteocollina.com>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR
* IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
'use strict';
var mqemitter = require('./')
, emitter = mqemitter({ concurrency: 10 })
, total = 1000000
, written = 0
, received = 0
, timerKey = 'time for sending ' + total + ' messages'
var emitter = mqemitter({ concurrency: 10 })
var total = 1000000
var written = 0
var received = 0
var timerKey = 'time for sending ' + total + ' messages'
function write() {
function write () {
if (written === total) {

@@ -18,3 +35,3 @@ return

emitter.on('hello', function(msg, cb) {
emitter.on('hello', function (msg, cb) {
received++

@@ -21,0 +38,0 @@ if (received === total) {

/*
* Copyright (c) 2014, Matteo Collina <hello@matteocollina.com>
* Copyright (c) 2014-2015, Matteo Collina <hello@matteocollina.com>
*

@@ -16,10 +16,9 @@ * Permission to use, copy, modify, and/or distribute this software for any

*/
'use strict'
'use strict';
var Qlobber = require('qlobber').Qlobber
, assert = require('assert')
, nop = function() {}
var assert = require('assert')
var fastparallel = require('fastparallel')
function MQEmitter(opts) {
function MQEmitter (opts) {
if (!(this instanceof MQEmitter)) {

@@ -29,2 +28,4 @@ return new MQEmitter(opts)

var that = this
opts = opts || {}

@@ -38,3 +39,6 @@

this._messageCallbacks = []
this._latestReceiver = new CallbackReceiver(this)
this._parallel = fastparallel({
results: false,
released: released
})

@@ -45,18 +49,31 @@ this.concurrency = opts.concurrency

this._matcher = new Qlobber({
separator: opts.separator
, wildcard_one: opts.wildcardOne
, wildcard_some: opts.wildcardSome
separator: opts.separator,
wildcard_one: opts.wildcardOne,
wildcard_some: opts.wildcardSome
})
this.closed = false
this._released = released
function released () {
var message = that._messageQueue.shift()
var callback = that._messageCallbacks.shift()
if (!message) {
// we are at the end of the queue
that.current--
} else {
that._do(message, callback)
}
}
}
Object.defineProperty(MQEmitter.prototype, "length", {
get: function() {
return this._messageQueue.length;
Object.defineProperty(MQEmitter.prototype, 'length', {
get: function () {
return this._messageQueue.length
},
enumerable: true
});
})
MQEmitter.prototype.on = function on(topic, notify, done) {
MQEmitter.prototype.on = function on (topic, notify, done) {
assert(topic)

@@ -73,3 +90,3 @@ assert(notify)

MQEmitter.prototype.removeListener = function removeListener(topic, notify, done) {
MQEmitter.prototype.removeListener = function removeListener (topic, notify, done) {
assert(topic)

@@ -86,12 +103,11 @@ assert(notify)

MQEmitter.prototype.emit = function emit(message, cb) {
MQEmitter.prototype.emit = function emit (message, cb) {
assert(message)
if (this.closed)
if (this.closed) {
return cb(new Error('mqemitter is closed'))
}
cb = cb || nop
var receiver = null;
if (this.concurrency > 0 && this.current >= this.concurrency) {

@@ -102,12 +118,3 @@ this._messageQueue.push(message)

this.current++
receiver = this._latestReceiver
if (this._latestReceiver) {
receiver = this._latestReceiver
this._latestReceiver = null
} else {
receiver = new CallbackReceiver(this);
}
this._do(message, cb, receiver)
this._do(message, cb)
}

@@ -118,3 +125,3 @@

MQEmitter.prototype.close = function close(cb) {
MQEmitter.prototype.close = function close (cb) {
this.closed = true

@@ -126,54 +133,17 @@ setImmediate(cb)

MQEmitter.prototype._next = function next(receiver) {
var message = this._messageQueue.shift()
, callback = this._messageCallbacks.shift()
if (!message) {
// we are at the end of the queue
this.current--
this._latestReceiver = receiver
} else {
this._do(message, callback, receiver)
}
return this
}
MQEmitter.prototype._do = function(message, callback, receiver) {
MQEmitter.prototype._do = function (message, callback) {
var matches = this._matcher.match(message.topic)
, i
if (matches.length === 0) {
callback()
return this._next(receiver)
this._released()
} else {
this._parallel(this, matches, message, callback)
}
receiver.num = matches.length
receiver.callback = callback
for (i = 0; i < matches.length; i++) {
matches[i].call(this, message, receiver.counter);
}
return this
}
function CallbackReceiver(mq) {
// these will be initialized by the caller
this.num = -1
this.callback = null
function nop () {}
var that = this
this.counter = function() {
that.num--;
if (that.num === 0) {
that.callback()
that.callback = nop
mq._next(that)
}
}
}
module.exports = MQEmitter
{
"name": "mqemitter",
"version": "0.3.1",
"version": "0.4.0",
"description": "An Opinionated Message Queue with an emitter-style API",
"main": "mqemitter.js",
"scripts": {
"lint": "standard",
"test": "tape test.js | faucet"
},
"pre-commit": [
"lint",
"test"

@@ -35,7 +37,9 @@ ],

"pre-commit": "0.0.9",
"standard": "^3.1.0",
"tape": "^2.14.0"
},
"dependencies": {
"fastparallel": "^1.1.1",
"qlobber": "~0.5.0"
}
}

@@ -14,4 +14,7 @@ mqemitter&nbsp;&nbsp;[![Build Status](https://travis-ci.org/mcollina/mqemitter.png)](https://travis-ci.org/mcollina/mqemitter)

Do you need a multi process MQEmitter? Check out
[mqemitter-redis](http://github.com/mqemitter-redis).
[mqemitter-redis](http://npm.im/mqemitter-redis) or
[mqemitter-mongodb](http://npm.im/mqemitter-mongodb).
[![js-standard-style](https://raw.githubusercontent.com/feross/standard/master/badge.png)](https://github.com/feross/standard)
<a name="install"></a>

@@ -29,6 +32,6 @@ ## Installation

var mq = require('mqemitter')
, emitter = mq({ concurrency: 5 })
, message
var emitter = mq({ concurrency: 5 })
var message
emitter.on('hello world', function(message, cb) {
emitter.on('hello world', function (message, cb) {
// call callback when you are done

@@ -41,3 +44,3 @@ // do not pass any errors, the emitter cannot handle it.

message = { topic: 'hello world', payload: 'or any other fields' }
emitter.emit(message, function() {
emitter.emit(message, function () {
// emitter will never return an error

@@ -165,3 +168,3 @@ })

Copyright (c) 2014, Matteo Collina <hello@matteocollina.com>
Copyright (c) 2014-2015, Matteo Collina <hello@matteocollina.com>

@@ -168,0 +171,0 @@ Permission to use, copy, modify, and/or distribute this software for any

@@ -0,37 +1,48 @@

/*
* Copyright (c) 2014-2015, Matteo Collina <hello@matteocollina.com>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR
* IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
'use strict';
var abstractTest = require('./abstractTest')
, test = require('tape').test
, mq = require('./')
var abstractTest = require('./abstractTest')
var test = require('tape').test
var mq = require('./')
abstractTest({
builder: mq
, test: require('tape').test
builder: mq,
test: require('tape').test
})
test('queue concurrency', function(t) {
test('queue concurrency', function (t) {
t.plan(2)
var e = mq({ concurrency: 1 })
, expected = {
topic: 'hello world'
, payload: { my: 'message' }
}
, completed1 = false
var completed1 = false
t.equal(e.concurrency, 1)
e.on('hello 1', function(message, cb) {
e.on('hello 1', function (message, cb) {
setTimeout(cb, 10)
})
e.on('hello 2', function(message, cb) {
e.on('hello 2', function (message, cb) {
cb()
})
start = Date.now()
e.emit({ topic: 'hello 1' }, function() {
e.emit({ topic: 'hello 1' }, function () {
completed1 = true
})
e.emit({ topic: 'hello 2' }, function() {
e.emit({ topic: 'hello 2' }, function () {
t.ok(completed1, 'the first message must be completed')

@@ -41,12 +52,12 @@ })

test('without any listeners and a callback', function(t) {
test('without any listeners and a callback', function (t) {
var e = mq()
, expected = {
topic: 'hello world'
, payload: { my: 'message' }
}
var expected = {
topic: 'hello world',
payload: { my: 'message' }
}
e.emit(expected, function() {
e.emit(expected, function () {
t.equal(e.current, 1, 'there 1 message that is being processed')
e.close(function() {
e.close(function () {
t.end()

@@ -53,0 +64,0 @@ })

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc