@akiroz/mqemitter
Advanced tools
Comparing version 4.2.2 to 4.5.1
@@ -5,4 +5,4 @@ | ||
module.exports = function abstractTests (opts) { | ||
var builder = opts.builder | ||
var test = opts.test | ||
const builder = opts.builder | ||
const test = opts.test | ||
@@ -97,3 +97,3 @@ test('support on and emit', function (t) { | ||
} | ||
var toRemoveCalled = false | ||
let toRemoveCalled = false | ||
@@ -400,3 +400,3 @@ function toRemove (message, cb) { | ||
const e = builder() | ||
var check = false | ||
let check = false | ||
@@ -431,4 +431,4 @@ t.notOk(e.closed, 'must have a false closed property') | ||
} | ||
var firstCalled = false | ||
var secondCalled = false | ||
let firstCalled = false | ||
let secondCalled = false | ||
@@ -460,4 +460,4 @@ e.on('hello/#', function (message, cb) { | ||
} | ||
var firstCalled = false | ||
var secondCalled = false | ||
let firstCalled = false | ||
let secondCalled = false | ||
@@ -503,2 +503,31 @@ e.on('hello/#', function (message, cb) { | ||
test('packets are emitted in order', function (t) { | ||
const e = builder() | ||
const total = 10000 | ||
const topic = 'test' | ||
let received = 0 | ||
e.on(topic, function (msg, cb) { | ||
let fail = false | ||
if (received !== msg.payload) { | ||
t.fail(`leak detected. Count: ${received} - Payload: ${msg.payload}`) | ||
fail = true | ||
} | ||
received++ | ||
if (fail || received === total) { | ||
e.close(function () { | ||
t.end() | ||
}) | ||
} | ||
cb() | ||
}) | ||
for (let payload = 0; payload < total; payload++) { | ||
e.emit({ topic, payload }) | ||
} | ||
}) | ||
test('calling emit without cb when closed doesn\'t throw error', function (t) { | ||
@@ -505,0 +534,0 @@ const e = builder() |
@@ -6,4 +6,4 @@ 'use strict' | ||
const total = 1000000 | ||
var written = 0 | ||
var received = 0 | ||
let written = 0 | ||
let received = 0 | ||
const timerKey = 'time for sending ' + total + ' messages' | ||
@@ -10,0 +10,0 @@ |
@@ -30,2 +30,3 @@ 'use strict' | ||
this.current = 0 | ||
this._doing = false | ||
this._matcher = new Qlobber({ | ||
@@ -49,2 +50,4 @@ match_empty_levels: opts.matchEmptyLevels, | ||
that._do(message, callback) | ||
} else { | ||
that._doing = false | ||
} | ||
@@ -96,2 +99,6 @@ } | ||
this._messageCallbacks.push(cb) | ||
if (!this._doing) { | ||
process.emitWarning('MqEmitter leak detected', { detail: 'For more info check: https://github.com/mcollina/mqemitter/pull/94' }) | ||
this._released() | ||
} | ||
} else { | ||
@@ -112,2 +119,3 @@ this._do(message, cb) | ||
MQEmitter.prototype._do = function (message, callback) { | ||
this._doing = true | ||
const matches = this._matcher.match(message.topic) | ||
@@ -121,4 +129,4 @@ | ||
function noop () {} | ||
function noop () { } | ||
module.exports = MQEmitter |
{ | ||
"name": "@akiroz/mqemitter", | ||
"version": "4.2.2", | ||
"version": "4.5.1", | ||
"description": "An Opinionated Message Queue with an emitter-style API", | ||
"main": "mqemitter.js", | ||
"types": "mqemitter.d.ts", | ||
"types": "types/mqemitter.d.ts", | ||
"scripts": { | ||
"lint": "npm run lint:standard && npm run lint:typescript && npm run lint:markdown", | ||
"lint:standard": "standard --verbose | snazzy", | ||
"lint:typescript": "standard --parser @typescript-eslint/parser --plugin @typescript-eslint/eslint-plugin test/types/*.ts mqemitter.d.ts", | ||
"lint:typescript": "standard --parser @typescript-eslint/parser --plugin @typescript-eslint/eslint-plugin test/types/*.ts types/mqemitter.d.ts", | ||
"lint:markdown": "markdownlint README.md", | ||
@@ -17,3 +17,4 @@ "unit": "tape test/*.js", | ||
"test:report": "npm run lint && npm run unit:report && npm run typescript", | ||
"test": "npm run lint && npm run unit:cov && npm run typescript" | ||
"test:types": "tsd", | ||
"test": "npm run lint && npm run unit:cov && tsd && npm run typescript" | ||
}, | ||
@@ -47,12 +48,13 @@ "pre-commit": [ | ||
"devDependencies": { | ||
"@types/node": "^12.12.27", | ||
"@types/node": "^16.11.1", | ||
"@typescript-eslint/eslint-plugin": "^2.19.2", | ||
"@typescript-eslint/parser": "^2.19.2", | ||
"markdownlint-cli": "^0.22.0", | ||
"markdownlint-cli": "^0.29.0", | ||
"nyc": "^15.0.0", | ||
"pre-commit": "^1.2.2", | ||
"snazzy": "^8.0.0", | ||
"standard": "^14.3.1", | ||
"tape": "^4.13.0", | ||
"typescript": "^3.7.5" | ||
"snazzy": "^9.0.0", | ||
"standard": "^16.0.0", | ||
"tape": "^5.0.1", | ||
"tsd": "^0.18.0", | ||
"typescript": "^4.0.2" | ||
}, | ||
@@ -59,0 +61,0 @@ "dependencies": { |
@@ -15,3 +15,3 @@ 'use strict' | ||
const e = mq({ concurrency: 1 }) | ||
var completed1 = false | ||
let completed1 = false | ||
@@ -39,2 +39,29 @@ t.equal(e.concurrency, 1) | ||
test('queue released when full', function (t) { | ||
t.plan(21) | ||
const e = mq({ concurrency: 1 }) | ||
e.on('hello 1', function (message, cb) { | ||
t.ok(true, 'message received') | ||
setTimeout(cb, 10) | ||
}) | ||
function onSent () { | ||
t.ok(true, 'message sent') | ||
} | ||
for (let i = 0; i < 9; i++) { | ||
e._messageQueue.push({ topic: 'hello 1' }) | ||
e._messageCallbacks.push(onSent) | ||
e.current++ | ||
} | ||
e.emit({ topic: 'hello 1' }, onSent) | ||
process.once('warning', function (warning) { | ||
t.equal(warning.message, 'MqEmitter leak detected', 'warning message') | ||
}) | ||
}) | ||
test('without any listeners and a callback', function (t) { | ||
@@ -59,3 +86,3 @@ const e = mq() | ||
const e = mq({ concurrency: 1 }) | ||
var completed1 = false | ||
let completed1 = false | ||
@@ -62,0 +89,0 @@ t.equal(e.concurrency, 1) |
/* eslint no-unused-vars: 0 */ | ||
/* eslint no-undef: 0 */ | ||
import { MQEmitter, Message } from '../../mqemitter' | ||
import MQEmitter, { Message } from '../../types/mqemitter' | ||
const noop = function () {} | ||
var mq = MQEmitter() | ||
let mq = MQEmitter() | ||
mq = MQEmitter({ | ||
@@ -38,5 +38,5 @@ concurrency: 100 | ||
mq.emit('hello/world') | ||
mq.emit({ topic: 'hello/world', payload: 'or any other fields', [Symbol.for('me')]: 42 }) | ||
mq.emit('hello/world', function (err) { | ||
mq.emit({ topic: 'hello/world' }, function (err) { | ||
console.log(err) | ||
@@ -43,0 +43,0 @@ }) |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
36395
16
827
11