| * text=auto eol=lf |
| name: Integrate | ||
| on: | ||
| push: | ||
| branches: [main] | ||
| pull_request: | ||
| branches: [main] | ||
| jobs: | ||
| lint: | ||
| runs-on: ubuntu-latest | ||
| name: Lint | ||
| steps: | ||
| - uses: holepunchto/actions/node-base@v1 | ||
| - run: npm run lint | ||
| test: | ||
| strategy: | ||
| matrix: | ||
| include: | ||
| - os: ubuntu-latest | ||
| platform: linux | ||
| - os: macos-latest | ||
| platform: darwin | ||
| - os: windows-latest | ||
| platform: win32 | ||
| runs-on: ${{ matrix.os }} | ||
| name: Test / ${{ matrix.platform }} | ||
| steps: | ||
| - uses: holepunchto/actions/bare-base@v1 | ||
| - run: npm test |
| "prettier-config-holepunch" |
+6
-2
@@ -8,3 +8,3 @@ import Iambus from './index.js' | ||
| async function msglogger () { | ||
| async function msglogger() { | ||
| for await (const message of bus.sub({})) console.log('BUS MSG', new Date(), '-', message) | ||
@@ -21,3 +21,7 @@ } | ||
| setImmediate(() => { | ||
| bus.pub({ match: 'this', and: { also: 'this' }, content: 'even more content' }) | ||
| bus.pub({ | ||
| match: 'this', | ||
| and: { also: 'this' }, | ||
| content: 'even more content' | ||
| }) | ||
| }) | ||
@@ -24,0 +28,0 @@ }) |
+16
-29
| 'use strict' | ||
| const assert = require('nanoassert') | ||
| const streamx = require('streamx') | ||
| const match = require('ptnm') | ||
| const FALLBACK_CUTOVER_DELAY = 180_000 // 3 minutes | ||
@@ -9,3 +10,3 @@ | ||
| constructor (bus, pattern, opts = {}) { | ||
| constructor(bus, pattern, opts = {}) { | ||
| super(opts) | ||
@@ -31,3 +32,3 @@ this.bus = bus | ||
| cutover (after = 0) { | ||
| cutover(after = 0) { | ||
| if (this._timeout !== null) { | ||
@@ -42,3 +43,3 @@ clearTimeout(this._timeout) | ||
| feed (subscriber) { | ||
| feed(subscriber) { | ||
| if (this.bus.subscribers.has(subscriber) === false) return subscriber | ||
@@ -51,3 +52,3 @@ for (const message of this.queue) { | ||
| push (message) { | ||
| push(message) { | ||
| if (this.retain) { | ||
@@ -61,3 +62,3 @@ this.queue.push(message) | ||
| pushOnMatch (message) { | ||
| pushOnMatch(message) { | ||
| if (match(message, this.pattern)) { | ||
@@ -68,7 +69,7 @@ this.push(message) | ||
| end () { | ||
| end() { | ||
| this.push(null) | ||
| } | ||
| _destroy (cb) { | ||
| _destroy(cb) { | ||
| this.bus.subscribers.delete(this) | ||
@@ -85,3 +86,3 @@ this.once('cutover', () => { | ||
| static Subscriber = Subscriber | ||
| constructor ({ onsub = () => {} } = {}) { | ||
| constructor({ onsub = () => {} } = {}) { | ||
| this.subscribers = new Set() | ||
@@ -91,3 +92,3 @@ this._onsub = onsub | ||
| pub (message) { | ||
| pub(message) { | ||
| for (const subscriber of this.subscribers) { | ||
@@ -98,4 +99,7 @@ subscriber.pushOnMatch(message) | ||
| sub (pattern = {}, opts) { | ||
| assert(typeof pattern === 'object' && pattern !== null, 'Pass a pattern object: bus.sub(pattern)') | ||
| sub(pattern = {}, opts) { | ||
| assert( | ||
| typeof pattern === 'object' && pattern !== null, | ||
| 'Pass a pattern object: bus.sub(pattern)' | ||
| ) | ||
| const subscriber = new Subscriber(this, pattern, opts) | ||
@@ -106,3 +110,3 @@ this._onsub(subscriber) | ||
| destroy () { | ||
| destroy() { | ||
| for (const subscriber of this.subscribers) { | ||
@@ -113,18 +117,1 @@ subscriber.destroy() | ||
| } | ||
| function match (message, pattern) { | ||
| if (typeof pattern !== 'object' || pattern === null) return false | ||
| for (const key in pattern) { | ||
| if (Object.hasOwn(pattern, key) === false) continue | ||
| if (Object.hasOwn(message, key) === false) return false | ||
| const messageValue = message[key] | ||
| const patternValue = pattern[key] | ||
| const nested = typeof patternValue === 'object' && patternValue !== null && typeof messageValue === 'object' && messageValue !== null | ||
| if (nested) { | ||
| if (match(messageValue, patternValue) === false) return false | ||
| } else if (messageValue !== patternValue) { | ||
| return false | ||
| } | ||
| } | ||
| return true | ||
| } |
+10
-4
| { | ||
| "name": "iambus", | ||
| "version": "2.0.5", | ||
| "version": "2.0.6", | ||
| "description": "minimalist pattern-matching pub-sub message-bus", | ||
| "main": "index.js", | ||
| "scripts": { | ||
| "test": "brittle test.js && npm run lint", | ||
| "lint": "standard" | ||
| "format": "prettier --write .", | ||
| "lint": "prettier --check . && lunte", | ||
| "test": "npm run test:node && npm run test:bare", | ||
| "test:node": "brittle-node test.js", | ||
| "test:bare": "brittle-bare test.js" | ||
| }, | ||
@@ -22,2 +25,3 @@ "keywords": [ | ||
| "nanoassert": "^2.0.0", | ||
| "ptnm": "^1.0.0", | ||
| "streamx": "^2.13.2" | ||
@@ -27,3 +31,5 @@ }, | ||
| "brittle": "^3.3.2", | ||
| "standard": "^17.1.0" | ||
| "lunte": "^1.0.0", | ||
| "prettier": "^3.6.2", | ||
| "prettier-config-holepunch": "^2.0.0" | ||
| }, | ||
@@ -30,0 +36,0 @@ "repository": { |
+36
-26
@@ -36,3 +36,3 @@ # iambus | ||
| Subscribers are Readable [streamx](https://npmjs.com/package/streamx) streams. | ||
| Subscribers are Readable [streamx](https://npmjs.com/package/streamx) streams. | ||
@@ -44,3 +44,3 @@ Subscribe to a pattern (an object which partially matches against target messages). | ||
| ```js | ||
| const pattern= { topic: 'news' } | ||
| const pattern = { topic: 'news' } | ||
| const subscriber = bus.sub(pattern) | ||
@@ -77,3 +77,3 @@ ``` | ||
| ```js | ||
| async function msglogger () { | ||
| async function msglogger() { | ||
| for await (const message of bus.sub({})) console.log('BUS MSG', Date.now(), ' - ', message) | ||
@@ -84,3 +84,2 @@ } | ||
| ## Retain - `bus.sub(pattern, { [ retain = false ] })` | ||
@@ -102,3 +101,7 @@ | ||
| setImmediate(() => { | ||
| bus.pub({ match: 'this', and: { also: 'this' }, content: 'even more content' }) | ||
| bus.pub({ | ||
| match: 'this', | ||
| and: { also: 'this' }, | ||
| content: 'even more content' | ||
| }) | ||
| }) | ||
@@ -109,8 +112,8 @@ }) | ||
| const subscriber = bus.sub({ match: 'this', and: { also: 'this' } }, { retain: true }) | ||
| const consumerA = subscriber.feed(bus.sub({ match: 'this'})) | ||
| const consumerA = subscriber.feed(bus.sub({ match: 'this' })) | ||
| const consumerB = subscriber.feed(bus.sub({ and: { also: 'this' } })) | ||
| subscriber.on('data', (data) => console.log('Subscriber got', data) ) | ||
| consumerA.on('data', (data) => console.log('ConsumerA got', data) ) | ||
| consumerB.on('data', (data) => console.log('ConsumerB got', data) ) | ||
| subscriber.on('data', (data) => console.log('Subscriber got', data)) | ||
| consumerA.on('data', (data) => console.log('ConsumerA got', data)) | ||
| consumerB.on('data', (data) => console.log('ConsumerB got', data)) | ||
| ``` | ||
@@ -150,3 +153,7 @@ | ||
| setImmediate(() => { | ||
| bus.pub({ match: 'this', and: { also: 'this' }, content: 'even more content' }) | ||
| bus.pub({ | ||
| match: 'this', | ||
| and: { also: 'this' }, | ||
| content: 'even more content' | ||
| }) | ||
| }) | ||
@@ -157,13 +164,11 @@ }) | ||
| const subscriber = bus.sub({ match: 'this', and: { also: 'this' } }, { retain: true, max: 2 }) | ||
| const consumerA = subscriber.feed(bus.sub({ match: 'this'})) | ||
| const consumerA = subscriber.feed(bus.sub({ match: 'this' })) | ||
| setTimeout(() => { | ||
| const consumerB = subscriber.feed(bus.sub({ and: { also: 'this' } })) | ||
| consumerB.on('data', (data) => console.log('ConsumerB got', data) ) | ||
| consumerB.on('data', (data) => console.log('ConsumerB got', data)) | ||
| }, 1000) | ||
| subscriber.on('data', (data) => console.log('Subscriber got', data) ) | ||
| consumerA.on('data', (data) => console.log('ConsumerA got', data) ) | ||
| consumerB.on('data', (data) => console.log('ConsumerB got', data) ) | ||
| subscriber.on('data', (data) => console.log('Subscriber got', data)) | ||
| consumerA.on('data', (data) => console.log('ConsumerA got', data)) | ||
| consumerB.on('data', (data) => console.log('ConsumerB got', data)) | ||
| ``` | ||
@@ -188,3 +193,3 @@ | ||
| Calling `subscriber.cutover()` clears the queue and stops retaining. | ||
| Calling `subscriber.cutover()` clears the queue and stops retaining. | ||
@@ -204,3 +209,7 @@ This should be called if `opts.retain` is set to `true`. | ||
| setTimeout(() => { | ||
| bus.pub({ match: 'this', and: { also: 'this' }, content: 'even more content' }) | ||
| bus.pub({ | ||
| match: 'this', | ||
| and: { also: 'this' }, | ||
| content: 'even more content' | ||
| }) | ||
| }, 1500) | ||
@@ -217,3 +226,2 @@ }) | ||
| subscriber.on('data', (data) => console.log('Subscriber got', data)) | ||
@@ -235,3 +243,3 @@ consumerA.on('data', (data) => console.log('ConsumerA got', data)) | ||
| Note that ConsumerB only recieves the last message, because cutover occurs before it subscribes so by then the queue is empty. | ||
| Note that ConsumerB only recieves the last message, because cutover occurs before it subscribes so by then the queue is empty. | ||
@@ -251,3 +259,7 @@ Cutover can occur after a delay by passing an argument representing milliseonds until cutover: | ||
| setTimeout(() => { | ||
| bus.pub({ match: 'this', and: { also: 'this' }, content: 'even more content' }) | ||
| bus.pub({ | ||
| match: 'this', | ||
| and: { also: 'this' }, | ||
| content: 'even more content' | ||
| }) | ||
| }, 1500) | ||
@@ -264,3 +276,2 @@ }) | ||
| subscriber.on('data', (data) => console.log('Subscriber got', data)) | ||
@@ -290,11 +301,10 @@ consumerA.on('data', (data) => console.log('ConsumerA got', data)) | ||
| ## `Iambus.match(message, pattern) -> boolean` | ||
| ## `Iambus.match(message, pattern) -> boolean` | ||
| Returns `true` if pattern matches message, `false` if not. | ||
| ## Example | ||
| The [example.mjs](./example.mjs) file contains three subscribers and the message logger | ||
| that uses an empty pattern to subscribe to all messages. | ||
| that uses an empty pattern to subscribe to all messages. | ||
@@ -301,0 +311,0 @@ ``` |
+65
-26
@@ -9,3 +9,5 @@ 'use strict' | ||
| const bus = new Iambus() | ||
| teardown(() => { bus.destroy() }) | ||
| teardown(() => { | ||
| bus.destroy() | ||
| }) | ||
| const expected = { topic: 'news', content: 'Hello, world!' } | ||
@@ -20,15 +22,25 @@ | ||
| test('bus.sub(\'invalid pattern\')', async ({ plan, exception, teardown }) => { | ||
| test("bus.sub('invalid pattern')", async ({ plan, exception, teardown }) => { | ||
| plan(1) | ||
| const bus = new Iambus() | ||
| teardown(() => { bus.destroy() }) | ||
| teardown(() => { | ||
| bus.destroy() | ||
| }) | ||
| exception(() => { bus.sub('invalid pattern') }) | ||
| exception(() => { | ||
| bus.sub('invalid pattern') | ||
| }) | ||
| }) | ||
| test('bus.sub(pattern) - for await consume, matching and non-matching messages published', async ({ plan, alike, teardown }) => { | ||
| test('bus.sub(pattern) - for await consume, matching and non-matching messages published', async ({ | ||
| plan, | ||
| alike, | ||
| teardown | ||
| }) => { | ||
| plan(1) | ||
| const bus = new Iambus() | ||
| teardown(() => { bus.destroy() }) | ||
| teardown(() => { | ||
| bus.destroy() | ||
| }) | ||
@@ -62,3 +74,5 @@ const matchingMessage = { topic: 'news', content: 'Hello, world!' } | ||
| const bus = new Iambus() | ||
| teardown(() => { bus.destroy() }) | ||
| teardown(() => { | ||
| bus.destroy() | ||
| }) | ||
@@ -78,9 +92,13 @@ setImmediate(() => { | ||
| } | ||
| alike(received, [ | ||
| { topic: 'news', content: 'Hello, world!' }, | ||
| { topic: 'art', content: '<^_^>' } | ||
| ], 'Received expected matching message') | ||
| alike( | ||
| received, | ||
| [ | ||
| { topic: 'news', content: 'Hello, world!' }, | ||
| { topic: 'art', content: '<^_^>' } | ||
| ], | ||
| 'Received expected matching message' | ||
| ) | ||
| }) | ||
| test('subscribe().on(\'data\',...) with sub.destroy()', async ({ plan, is, pass, teardown }) => { | ||
| test("subscribe().on('data',...) with sub.destroy()", async ({ plan, is, pass, teardown }) => { | ||
| plan(2) | ||
@@ -97,3 +115,5 @@ const bus = new Iambus() | ||
| await new Promise(setImmediate) // skip tick for event propagation | ||
| subscriber.on('close', () => { pass('close') }) | ||
| subscriber.on('close', () => { | ||
| pass('close') | ||
| }) | ||
| subscriber.destroy() | ||
@@ -108,3 +128,5 @@ bus.pub({ topic: 'news', content: 'will be ignored' }) | ||
| const bus = new Iambus() | ||
| teardown(() => { bus.destroy() }) | ||
| teardown(() => { | ||
| bus.destroy() | ||
| }) | ||
@@ -134,3 +156,5 @@ setImmediate(() => { | ||
| const bus = new Iambus() | ||
| teardown(() => { bus.destroy() }) | ||
| teardown(() => { | ||
| bus.destroy() | ||
| }) | ||
@@ -158,7 +182,14 @@ setImmediate(() => { | ||
| test('subscriber.feed(toSubscriber) consumes data from subscriber queue', async ({ plan, alike, is, teardown }) => { | ||
| test('subscriber.feed(toSubscriber) consumes data from subscriber queue', async ({ | ||
| plan, | ||
| alike, | ||
| is, | ||
| teardown | ||
| }) => { | ||
| plan(3) | ||
| const bus = new Iambus() | ||
| teardown(() => { bus.destroy() }) | ||
| teardown(() => { | ||
| bus.destroy() | ||
| }) | ||
@@ -189,3 +220,5 @@ const subscriber = bus.sub({ topic: 'live' }, { retain: true }) | ||
| const bus = new Iambus() | ||
| teardown(() => { bus.destroy() }) | ||
| teardown(() => { | ||
| bus.destroy() | ||
| }) | ||
| const subscriber = bus.sub({ topic: 'max' }, { retain: true, max: 2 }) | ||
@@ -200,4 +233,4 @@ | ||
| console.log('huwhut') | ||
| consumer.on('data', msg => received.push(msg)) | ||
| await new Promise(resolve => setTimeout(resolve, 10)) | ||
| consumer.on('data', (msg) => received.push(msg)) | ||
| await new Promise((resolve) => setTimeout(resolve, 10)) | ||
@@ -214,3 +247,5 @@ alike(received, [ | ||
| const bus = new Iambus() | ||
| teardown(() => { bus.destroy() }) | ||
| teardown(() => { | ||
| bus.destroy() | ||
| }) | ||
| const subscriber = bus.sub({ topic: 'cutover' }, { retain: true }) | ||
@@ -228,3 +263,3 @@ | ||
| consumer.on('data', msg => received.push(msg)) | ||
| consumer.on('data', (msg) => received.push(msg)) | ||
| bus.pub({ topic: 'cutover', content: '3rd' }) | ||
@@ -240,3 +275,5 @@ | ||
| const bus = new Iambus() | ||
| teardown(() => { bus.destroy() }) | ||
| teardown(() => { | ||
| bus.destroy() | ||
| }) | ||
| const subscriber = bus.sub({ topic: 'cutover' }, { retain: true }) | ||
@@ -256,3 +293,3 @@ | ||
| consumer.on('data', msg => received.push(msg)) | ||
| consumer.on('data', (msg) => received.push(msg)) | ||
| bus.pub({ topic: 'cutover', content: '3rd' }) | ||
@@ -268,3 +305,5 @@ | ||
| const bus = new Iambus() | ||
| teardown(() => { bus.destroy() }) | ||
| teardown(() => { | ||
| bus.destroy() | ||
| }) | ||
| const subscriber = bus.sub({ topic: 'cutover' }, { retain: true }) | ||
@@ -281,3 +320,3 @@ bus.pub({ topic: 'cutover', content: '1st' }) | ||
| consumer.on('data', msg => received.push(msg)) | ||
| consumer.on('data', (msg) => received.push(msg)) | ||
| bus.pub({ topic: 'cutover', content: '3rd' }) | ||
@@ -284,0 +323,0 @@ |
36029
1.48%9
50%385
8.76%339
3.04%3
50%4
100%+ Added
+ Added