Comparing version 0.1.2 to 0.2.0
35
index.js
@@ -30,11 +30,26 @@ /* | ||
opts.wildcardOne = opts.wildcardOne || '+' | ||
opts.wildcardSome = opts.wildcardSome || '#' | ||
opts.separator = opts.separator || '/' | ||
this._messageQueue = [] | ||
this._messageCallbacks = [] | ||
this.concurrency = opts.concurrency || opts.maxlength || 0 | ||
this.maxlength = opts.maxlength || 0 | ||
this.concurrency = opts.concurrency | ||
this.current = 0 | ||
this._matcher = new Qlobber() | ||
this._matcher = new Qlobber({ | ||
separator: opts.separator | ||
, wildcard_one: opts.wildcardOne | ||
, wildcard_some: opts.wildcardSome | ||
}) | ||
} | ||
Object.defineProperty(MQEmitter.prototype, "length", { | ||
get: function() { | ||
return this._messageQueue.length; | ||
}, | ||
enumerable: true | ||
}); | ||
MQEmitter.prototype.on = function on(topic, notify) { | ||
@@ -62,7 +77,2 @@ assert(topic) | ||
if (this.concurrency > 0 && this.current >= this.concurrency) { | ||
if (this.maxlength > 0 && this._messageQueue.length === this.maxlength) { | ||
return cb(new Error('Max queue length reached')) | ||
} | ||
this._messageQueue.push(message) | ||
@@ -94,3 +104,2 @@ this._messageCallbacks.push(cb) | ||
var matches = this._matcher.match(message.topic) | ||
, match | ||
, i | ||
@@ -107,9 +116,3 @@ | ||
for (i = 0; i < matches.length; i++) { | ||
match = matches[i] | ||
if (match.length === 1) { | ||
match(receiver.counter); | ||
} else { | ||
match(message, receiver.counter); | ||
} | ||
matches[i].call(this, message, receiver.counter); | ||
} | ||
@@ -116,0 +119,0 @@ |
{ | ||
"name": "mqemitter", | ||
"version": "0.1.2", | ||
"version": "0.2.0", | ||
"description": "An Opinionated Message Queue with an emitter-style API", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
mqemitter [![Build Status](https://travis-ci.org/mcollina/mqemitter.png)](https://travis-ci.org/mcollina/mqemitter) | ||
================================================================= | ||
An Opinionated Message Queue with an emitter-style API, but with | ||
callbacks. | ||
An Opinionated Message Queue with an emitter-style API | ||
* <a href="#install">Installation</a> | ||
@@ -26,3 +25,3 @@ * <a href="#basic">Basic Example</a> | ||
var mq = require('mqemitter') | ||
, emitter = mq({ concurrency: 5, maxlength: 42 }) | ||
, emitter = mq({ concurrency: 5 }) | ||
, message | ||
@@ -38,4 +37,4 @@ | ||
message = { topic: 'hello world', payload: 'or any other fields' } | ||
emitter.emit(message, function(err) { | ||
// we can have an err if we enqueued too many messages | ||
emitter.emit(message, function() { | ||
// emitter will never return an error | ||
}) | ||
@@ -62,14 +61,15 @@ ``` | ||
on concurrent delivery. | ||
- `maxlength`: the maximum number of messages that can be enqueued if | ||
there is it has reached maximum concurrency. | ||
- `wildcardOne`: the char that will match one level wildcards. | ||
- `wildcardSome`: that char that will match multiple level wildcards. | ||
- `separator`: the separator for the different levels. | ||
For more information on wildcards, see [this explanation](#wildcards) or | ||
[Qlobber](https://github.com/davedoesdev/qlobber). | ||
------------------------------------------------------- | ||
<a name="emit"></a> | ||
### emitter.emit(message, callback(err)) | ||
### emitter.emit(message, callback()) | ||
Emit the given message, which must have a `topic` property, which can contain wildcards | ||
as defined by [QLobber](https://github.com/davedoesdev/qlobber). | ||
The `callback`, accept only one parameter, the possible `Error` object. | ||
This situation might happen if the message cannot be enqueued, i.e. | ||
`maxlength` has been reached. | ||
as defined on creation. | ||
@@ -81,3 +81,3 @@ ------------------------------------------------------- | ||
Add the given callback to the passed topic. Topic can contain wildcards, | ||
as defined by [QLobber](https://github.com/davedoesdev/qlobber). | ||
as defined on creation. | ||
The `callback`, accept two parameters, the passed message and a `done` | ||
@@ -95,2 +95,58 @@ callback. | ||
<a name="wildcards"></a> | ||
## Wildcards | ||
__MQEmitter__ supports the use of wildcards: every topic is splitted | ||
according to `separator` (default `/`). | ||
The wildcard character `+` matches exactly one word: | ||
```javascript | ||
var mq = require('mqemitter') | ||
, emitter = mq() | ||
emitter.on('hello/+/world', function(message, cb) { | ||
// this will print { topic: 'hello/my/world', 'something': 'more' } | ||
console.log(message) | ||
cb() | ||
}) | ||
emitter.on('hello/+', function(message, cb) { | ||
// this will not be called | ||
console.log(message) | ||
cb() | ||
}) | ||
emitter.emit({ topic: 'hello/my/world', something: 'more' }) | ||
``` | ||
The wildcard character `#` matches zero or more words: | ||
```javascript | ||
var mq = require('mqemitter') | ||
, emitter = mq() | ||
emitter.on('hello/#', function(message, cb) { | ||
// this will print { topic: 'hello/my/world', 'something': 'more' } | ||
console.log(message) | ||
cb() | ||
}) | ||
emitter.on('#', function(message, cb) { | ||
// this will print { topic: 'hello/my/world', 'something': 'more' } | ||
console.log(message) | ||
cb() | ||
}) | ||
emitter.on('hello/my/world/#', function(message, cb) { | ||
// this will print { topic: 'hello/my/world', 'something': 'more' } | ||
console.log(message) | ||
cb() | ||
}) | ||
emitter.emit({ topic: 'hello/my/world', something: 'more' }) | ||
``` | ||
Of course, you can mix `#` and `+` in the same subscription. | ||
## LICENSE | ||
@@ -97,0 +153,0 @@ |
161
test.js
@@ -6,3 +6,3 @@ | ||
test('support on and emit', function(t) { | ||
t.plan(2) | ||
t.plan(3) | ||
@@ -18,2 +18,3 @@ var e = mq() | ||
t.equal(message, expected) | ||
t.equal(this, e) | ||
cb() | ||
@@ -51,42 +52,4 @@ }) | ||
test('support wildcards', function(t) { | ||
t.plan(1) | ||
var e = mq() | ||
, expected = { | ||
topic: 'hello.world' | ||
, payload: { my: 'message' } | ||
} | ||
e.on('hello.*', function(message, cb) { | ||
t.equal(message.topic, 'hello.world') | ||
cb() | ||
}) | ||
e.emit(expected, function() { | ||
t.end() | ||
}) | ||
}) | ||
test('support only one on argument', function(t) { | ||
t.plan(1) | ||
var e = mq() | ||
, expected = { | ||
topic: 'hello world' | ||
, payload: { my: 'message' } | ||
} | ||
e.on('hello world', function(cb) { | ||
t.ok(true) | ||
cb() | ||
}) | ||
e.emit(expected, function() { | ||
t.end() | ||
}) | ||
}) | ||
test('queue concurrency', function(t) { | ||
t.plan(3) | ||
t.plan(4) | ||
@@ -116,2 +79,3 @@ var e = mq({ concurrency: 1 }) | ||
t.ok(intermediate - start >= 5, 'min 5 ms between start and intermediate') | ||
t.equal(e.length, 1) | ||
}) | ||
@@ -126,26 +90,2 @@ | ||
test('queue maxlength', function(t) { | ||
t.plan(2) | ||
var e = mq({ maxlength: 1 }) | ||
, expected = { | ||
topic: 'hello world' | ||
, payload: { my: 'message' } | ||
} | ||
e.on('hello 1', function(message, cb) { | ||
// fakely do not callback | ||
}) | ||
start = Date.now() | ||
e.emit({ topic: 'hello 1' }, function() {}) | ||
e.emit({ topic: 'hello 2' }, function() {}) | ||
e.emit({ topic: 'hello 3' }, function(err) { | ||
t.ok(err, 'should return an error object') | ||
t.equal(err.message, 'Max queue length reached') | ||
t.end() | ||
}) | ||
}) | ||
test('removeListener', function(t) { | ||
@@ -215,1 +155,94 @@ var e = mq() | ||
}) | ||
test('support one level wildcard', function(t) { | ||
t.plan(1) | ||
var e = mq() | ||
, expected = { | ||
topic: 'hello/world' | ||
, payload: { my: 'message' } | ||
} | ||
e.on('hello/+', function(message, cb) { | ||
t.equal(message.topic, 'hello/world') | ||
cb() | ||
}) | ||
// this will not be catched | ||
e.emit({ topic: 'hello/my/world' }) | ||
// this will be catched | ||
e.emit(expected) | ||
}) | ||
test('support changing one level wildcard', function(t) { | ||
t.plan(1) | ||
var e = mq({ wildcardOne: '~' }) | ||
, expected = { | ||
topic: 'hello/world' | ||
, payload: { my: 'message' } | ||
} | ||
e.on('hello/~', function(message, cb) { | ||
t.equal(message.topic, 'hello/world') | ||
cb() | ||
}) | ||
e.emit(expected, function() { | ||
t.end() | ||
}) | ||
}) | ||
test('support deep wildcard', function(t) { | ||
t.plan(1) | ||
var e = mq() | ||
, expected = { | ||
topic: 'hello/my/world' | ||
, payload: { my: 'message' } | ||
} | ||
e.on('hello/#', function(message, cb) { | ||
t.equal(message.topic, 'hello/my/world') | ||
cb() | ||
}) | ||
e.emit(expected) | ||
}) | ||
test('support changing deep wildcard', function(t) { | ||
t.plan(1) | ||
var e = mq({ wildcardSome: '*' }) | ||
, expected = { | ||
topic: 'hello/my/world' | ||
, payload: { my: 'message' } | ||
} | ||
e.on('hello/*', function(message, cb) { | ||
t.equal(message.topic, 'hello/my/world') | ||
cb() | ||
}) | ||
e.emit(expected) | ||
}) | ||
test('support changing the level separator', function(t) { | ||
t.plan(1) | ||
var e = mq({ separator: '~' }) | ||
, expected = { | ||
topic: 'hello~world' | ||
, payload: { my: 'message' } | ||
} | ||
e.on('hello~+', function(message, cb) { | ||
t.equal(message.topic, 'hello~world') | ||
cb() | ||
}) | ||
e.emit(expected, function() { | ||
t.end() | ||
}) | ||
}) |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
297
162
155980