🚀 Big News: Socket Acquires Coana to Bring Reachability Analysis to Every Appsec Team.Learn more
Socket
Book a DemoInstallSign in
Socket

mqemitter

Package Overview
Dependencies
Maintainers
1
Versions
33
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mqemitter - npm Package Compare versions

Comparing version

to
0.2.0

35

index.js

@@ -30,11 +30,26 @@ /*

opts.wildcardOne = opts.wildcardOne || '+'
opts.wildcardSome = opts.wildcardSome || '#'
opts.separator = opts.separator || '/'
this._messageQueue = []
this._messageCallbacks = []
this.concurrency = opts.concurrency || opts.maxlength || 0
this.maxlength = opts.maxlength || 0
this.concurrency = opts.concurrency
this.current = 0
this._matcher = new Qlobber()
this._matcher = new Qlobber({
separator: opts.separator
, wildcard_one: opts.wildcardOne
, wildcard_some: opts.wildcardSome
})
}
Object.defineProperty(MQEmitter.prototype, "length", {
get: function() {
return this._messageQueue.length;
},
enumerable: true
});
MQEmitter.prototype.on = function on(topic, notify) {

@@ -62,7 +77,2 @@ assert(topic)

if (this.concurrency > 0 && this.current >= this.concurrency) {
if (this.maxlength > 0 && this._messageQueue.length === this.maxlength) {
return cb(new Error('Max queue length reached'))
}
this._messageQueue.push(message)

@@ -94,3 +104,2 @@ this._messageCallbacks.push(cb)

var matches = this._matcher.match(message.topic)
, match
, i

@@ -107,9 +116,3 @@

for (i = 0; i < matches.length; i++) {
match = matches[i]
if (match.length === 1) {
match(receiver.counter);
} else {
match(message, receiver.counter);
}
matches[i].call(this, message, receiver.counter);
}

@@ -116,0 +119,0 @@

{
"name": "mqemitter",
"version": "0.1.2",
"version": "0.2.0",
"description": "An Opinionated Message Queue with an emitter-style API",

@@ -5,0 +5,0 @@ "main": "index.js",

mqemitter&nbsp;&nbsp;[![Build Status](https://travis-ci.org/mcollina/mqemitter.png)](https://travis-ci.org/mcollina/mqemitter)
=================================================================
An Opinionated Message Queue with an emitter-style API, but with
callbacks.
An Opinionated Message Queue with an emitter-style API
* <a href="#install">Installation</a>

@@ -26,3 +25,3 @@ * <a href="#basic">Basic Example</a>

var mq = require('mqemitter')
, emitter = mq({ concurrency: 5, maxlength: 42 })
, emitter = mq({ concurrency: 5 })
, message

@@ -38,4 +37,4 @@

message = { topic: 'hello world', payload: 'or any other fields' }
emitter.emit(message, function(err) {
// we can have an err if we enqueued too many messages
emitter.emit(message, function() {
// emitter will never return an error
})

@@ -62,14 +61,15 @@ ```

on concurrent delivery.
- `maxlength`: the maximum number of messages that can be enqueued if
there is it has reached maximum concurrency.
- `wildcardOne`: the char that will match one level wildcards.
- `wildcardSome`: that char that will match multiple level wildcards.
- `separator`: the separator for the different levels.
For more information on wildcards, see [this explanation](#wildcards) or
[Qlobber](https://github.com/davedoesdev/qlobber).
-------------------------------------------------------
<a name="emit"></a>
### emitter.emit(message, callback(err))
### emitter.emit(message, callback())
Emit the given message, which must have a `topic` property, which can contain wildcards
as defined by [QLobber](https://github.com/davedoesdev/qlobber).
The `callback`, accept only one parameter, the possible `Error` object.
This situation might happen if the message cannot be enqueued, i.e.
`maxlength` has been reached.
as defined on creation.

@@ -81,3 +81,3 @@ -------------------------------------------------------

Add the given callback to the passed topic. Topic can contain wildcards,
as defined by [QLobber](https://github.com/davedoesdev/qlobber).
as defined on creation.
The `callback`, accept two parameters, the passed message and a `done`

@@ -95,2 +95,58 @@ callback.

<a name="wildcards"></a>
## Wildcards
__MQEmitter__ supports the use of wildcards: every topic is splitted
according to `separator` (default `/`).
The wildcard character `+` matches exactly one word:
```javascript
var mq = require('mqemitter')
, emitter = mq()
emitter.on('hello/+/world', function(message, cb) {
// this will print { topic: 'hello/my/world', 'something': 'more' }
console.log(message)
cb()
})
emitter.on('hello/+', function(message, cb) {
// this will not be called
console.log(message)
cb()
})
emitter.emit({ topic: 'hello/my/world', something: 'more' })
```
The wildcard character `#` matches zero or more words:
```javascript
var mq = require('mqemitter')
, emitter = mq()
emitter.on('hello/#', function(message, cb) {
// this will print { topic: 'hello/my/world', 'something': 'more' }
console.log(message)
cb()
})
emitter.on('#', function(message, cb) {
// this will print { topic: 'hello/my/world', 'something': 'more' }
console.log(message)
cb()
})
emitter.on('hello/my/world/#', function(message, cb) {
// this will print { topic: 'hello/my/world', 'something': 'more' }
console.log(message)
cb()
})
emitter.emit({ topic: 'hello/my/world', something: 'more' })
```
Of course, you can mix `#` and `+` in the same subscription.
## LICENSE

@@ -97,0 +153,0 @@

@@ -6,3 +6,3 @@

test('support on and emit', function(t) {
t.plan(2)
t.plan(3)

@@ -18,2 +18,3 @@ var e = mq()

t.equal(message, expected)
t.equal(this, e)
cb()

@@ -51,42 +52,4 @@ })

test('support wildcards', function(t) {
t.plan(1)
var e = mq()
, expected = {
topic: 'hello.world'
, payload: { my: 'message' }
}
e.on('hello.*', function(message, cb) {
t.equal(message.topic, 'hello.world')
cb()
})
e.emit(expected, function() {
t.end()
})
})
test('support only one on argument', function(t) {
t.plan(1)
var e = mq()
, expected = {
topic: 'hello world'
, payload: { my: 'message' }
}
e.on('hello world', function(cb) {
t.ok(true)
cb()
})
e.emit(expected, function() {
t.end()
})
})
test('queue concurrency', function(t) {
t.plan(3)
t.plan(4)

@@ -116,2 +79,3 @@ var e = mq({ concurrency: 1 })

t.ok(intermediate - start >= 5, 'min 5 ms between start and intermediate')
t.equal(e.length, 1)
})

@@ -126,26 +90,2 @@

test('queue maxlength', function(t) {
t.plan(2)
var e = mq({ maxlength: 1 })
, expected = {
topic: 'hello world'
, payload: { my: 'message' }
}
e.on('hello 1', function(message, cb) {
// fakely do not callback
})
start = Date.now()
e.emit({ topic: 'hello 1' }, function() {})
e.emit({ topic: 'hello 2' }, function() {})
e.emit({ topic: 'hello 3' }, function(err) {
t.ok(err, 'should return an error object')
t.equal(err.message, 'Max queue length reached')
t.end()
})
})
test('removeListener', function(t) {

@@ -215,1 +155,94 @@ var e = mq()

})
test('support one level wildcard', function(t) {
t.plan(1)
var e = mq()
, expected = {
topic: 'hello/world'
, payload: { my: 'message' }
}
e.on('hello/+', function(message, cb) {
t.equal(message.topic, 'hello/world')
cb()
})
// this will not be catched
e.emit({ topic: 'hello/my/world' })
// this will be catched
e.emit(expected)
})
test('support changing one level wildcard', function(t) {
t.plan(1)
var e = mq({ wildcardOne: '~' })
, expected = {
topic: 'hello/world'
, payload: { my: 'message' }
}
e.on('hello/~', function(message, cb) {
t.equal(message.topic, 'hello/world')
cb()
})
e.emit(expected, function() {
t.end()
})
})
test('support deep wildcard', function(t) {
t.plan(1)
var e = mq()
, expected = {
topic: 'hello/my/world'
, payload: { my: 'message' }
}
e.on('hello/#', function(message, cb) {
t.equal(message.topic, 'hello/my/world')
cb()
})
e.emit(expected)
})
test('support changing deep wildcard', function(t) {
t.plan(1)
var e = mq({ wildcardSome: '*' })
, expected = {
topic: 'hello/my/world'
, payload: { my: 'message' }
}
e.on('hello/*', function(message, cb) {
t.equal(message.topic, 'hello/my/world')
cb()
})
e.emit(expected)
})
test('support changing the level separator', function(t) {
t.plan(1)
var e = mq({ separator: '~' })
, expected = {
topic: 'hello~world'
, payload: { my: 'message' }
}
e.on('hello~+', function(message, cb) {
t.equal(message.topic, 'hello~world')
cb()
})
e.emit(expected, function() {
t.end()
})
})

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet