Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mqemitter

Package Overview
Dependencies
Maintainers
1
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mqemitter - npm Package Compare versions

Comparing version 0.1.2 to 0.2.0

35

index.js

@@ -30,11 +30,26 @@ /*

opts.wildcardOne = opts.wildcardOne || '+'
opts.wildcardSome = opts.wildcardSome || '#'
opts.separator = opts.separator || '/'
this._messageQueue = []
this._messageCallbacks = []
this.concurrency = opts.concurrency || opts.maxlength || 0
this.maxlength = opts.maxlength || 0
this.concurrency = opts.concurrency
this.current = 0
this._matcher = new Qlobber()
this._matcher = new Qlobber({
separator: opts.separator
, wildcard_one: opts.wildcardOne
, wildcard_some: opts.wildcardSome
})
}
Object.defineProperty(MQEmitter.prototype, "length", {
get: function() {
return this._messageQueue.length;
},
enumerable: true
});
MQEmitter.prototype.on = function on(topic, notify) {

@@ -62,7 +77,2 @@ assert(topic)

if (this.concurrency > 0 && this.current >= this.concurrency) {
if (this.maxlength > 0 && this._messageQueue.length === this.maxlength) {
return cb(new Error('Max queue length reached'))
}
this._messageQueue.push(message)

@@ -94,3 +104,2 @@ this._messageCallbacks.push(cb)

var matches = this._matcher.match(message.topic)
, match
, i

@@ -107,9 +116,3 @@

for (i = 0; i < matches.length; i++) {
match = matches[i]
if (match.length === 1) {
match(receiver.counter);
} else {
match(message, receiver.counter);
}
matches[i].call(this, message, receiver.counter);
}

@@ -116,0 +119,0 @@

{
"name": "mqemitter",
"version": "0.1.2",
"version": "0.2.0",
"description": "An Opinionated Message Queue with an emitter-style API",

@@ -5,0 +5,0 @@ "main": "index.js",

mqemitter&nbsp;&nbsp;[![Build Status](https://travis-ci.org/mcollina/mqemitter.png)](https://travis-ci.org/mcollina/mqemitter)
=================================================================
An Opinionated Message Queue with an emitter-style API, but with
callbacks.
An Opinionated Message Queue with an emitter-style API
* <a href="#install">Installation</a>

@@ -26,3 +25,3 @@ * <a href="#basic">Basic Example</a>

var mq = require('mqemitter')
, emitter = mq({ concurrency: 5, maxlength: 42 })
, emitter = mq({ concurrency: 5 })
, message

@@ -38,4 +37,4 @@

message = { topic: 'hello world', payload: 'or any other fields' }
emitter.emit(message, function(err) {
// we can have an err if we enqueued too many messages
emitter.emit(message, function() {
// emitter will never return an error
})

@@ -62,14 +61,15 @@ ```

on concurrent delivery.
- `maxlength`: the maximum number of messages that can be enqueued if
there is it has reached maximum concurrency.
- `wildcardOne`: the char that will match one level wildcards.
- `wildcardSome`: that char that will match multiple level wildcards.
- `separator`: the separator for the different levels.
For more information on wildcards, see [this explanation](#wildcards) or
[Qlobber](https://github.com/davedoesdev/qlobber).
-------------------------------------------------------
<a name="emit"></a>
### emitter.emit(message, callback(err))
### emitter.emit(message, callback())
Emit the given message, which must have a `topic` property, which can contain wildcards
as defined by [QLobber](https://github.com/davedoesdev/qlobber).
The `callback`, accept only one parameter, the possible `Error` object.
This situation might happen if the message cannot be enqueued, i.e.
`maxlength` has been reached.
as defined on creation.

@@ -81,3 +81,3 @@ -------------------------------------------------------

Add the given callback to the passed topic. Topic can contain wildcards,
as defined by [QLobber](https://github.com/davedoesdev/qlobber).
as defined on creation.
The `callback`, accept two parameters, the passed message and a `done`

@@ -95,2 +95,58 @@ callback.

<a name="wildcards"></a>
## Wildcards
__MQEmitter__ supports the use of wildcards: every topic is splitted
according to `separator` (default `/`).
The wildcard character `+` matches exactly one word:
```javascript
var mq = require('mqemitter')
, emitter = mq()
emitter.on('hello/+/world', function(message, cb) {
// this will print { topic: 'hello/my/world', 'something': 'more' }
console.log(message)
cb()
})
emitter.on('hello/+', function(message, cb) {
// this will not be called
console.log(message)
cb()
})
emitter.emit({ topic: 'hello/my/world', something: 'more' })
```
The wildcard character `#` matches zero or more words:
```javascript
var mq = require('mqemitter')
, emitter = mq()
emitter.on('hello/#', function(message, cb) {
// this will print { topic: 'hello/my/world', 'something': 'more' }
console.log(message)
cb()
})
emitter.on('#', function(message, cb) {
// this will print { topic: 'hello/my/world', 'something': 'more' }
console.log(message)
cb()
})
emitter.on('hello/my/world/#', function(message, cb) {
// this will print { topic: 'hello/my/world', 'something': 'more' }
console.log(message)
cb()
})
emitter.emit({ topic: 'hello/my/world', something: 'more' })
```
Of course, you can mix `#` and `+` in the same subscription.
## LICENSE

@@ -97,0 +153,0 @@

@@ -6,3 +6,3 @@

test('support on and emit', function(t) {
t.plan(2)
t.plan(3)

@@ -18,2 +18,3 @@ var e = mq()

t.equal(message, expected)
t.equal(this, e)
cb()

@@ -51,42 +52,4 @@ })

test('support wildcards', function(t) {
t.plan(1)
var e = mq()
, expected = {
topic: 'hello.world'
, payload: { my: 'message' }
}
e.on('hello.*', function(message, cb) {
t.equal(message.topic, 'hello.world')
cb()
})
e.emit(expected, function() {
t.end()
})
})
test('support only one on argument', function(t) {
t.plan(1)
var e = mq()
, expected = {
topic: 'hello world'
, payload: { my: 'message' }
}
e.on('hello world', function(cb) {
t.ok(true)
cb()
})
e.emit(expected, function() {
t.end()
})
})
test('queue concurrency', function(t) {
t.plan(3)
t.plan(4)

@@ -116,2 +79,3 @@ var e = mq({ concurrency: 1 })

t.ok(intermediate - start >= 5, 'min 5 ms between start and intermediate')
t.equal(e.length, 1)
})

@@ -126,26 +90,2 @@

test('queue maxlength', function(t) {
t.plan(2)
var e = mq({ maxlength: 1 })
, expected = {
topic: 'hello world'
, payload: { my: 'message' }
}
e.on('hello 1', function(message, cb) {
// fakely do not callback
})
start = Date.now()
e.emit({ topic: 'hello 1' }, function() {})
e.emit({ topic: 'hello 2' }, function() {})
e.emit({ topic: 'hello 3' }, function(err) {
t.ok(err, 'should return an error object')
t.equal(err.message, 'Max queue length reached')
t.end()
})
})
test('removeListener', function(t) {

@@ -215,1 +155,94 @@ var e = mq()

})
test('support one level wildcard', function(t) {
t.plan(1)
var e = mq()
, expected = {
topic: 'hello/world'
, payload: { my: 'message' }
}
e.on('hello/+', function(message, cb) {
t.equal(message.topic, 'hello/world')
cb()
})
// this will not be catched
e.emit({ topic: 'hello/my/world' })
// this will be catched
e.emit(expected)
})
test('support changing one level wildcard', function(t) {
t.plan(1)
var e = mq({ wildcardOne: '~' })
, expected = {
topic: 'hello/world'
, payload: { my: 'message' }
}
e.on('hello/~', function(message, cb) {
t.equal(message.topic, 'hello/world')
cb()
})
e.emit(expected, function() {
t.end()
})
})
test('support deep wildcard', function(t) {
t.plan(1)
var e = mq()
, expected = {
topic: 'hello/my/world'
, payload: { my: 'message' }
}
e.on('hello/#', function(message, cb) {
t.equal(message.topic, 'hello/my/world')
cb()
})
e.emit(expected)
})
test('support changing deep wildcard', function(t) {
t.plan(1)
var e = mq({ wildcardSome: '*' })
, expected = {
topic: 'hello/my/world'
, payload: { my: 'message' }
}
e.on('hello/*', function(message, cb) {
t.equal(message.topic, 'hello/my/world')
cb()
})
e.emit(expected)
})
test('support changing the level separator', function(t) {
t.plan(1)
var e = mq({ separator: '~' })
, expected = {
topic: 'hello~world'
, payload: { my: 'message' }
}
e.on('hello~+', function(message, cb) {
t.equal(message.topic, 'hello~world')
cb()
})
e.emit(expected, function() {
t.end()
})
})

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc