You're Invited:Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26.RSVP
Socket
Book a DemoSign in
Socket

iambus

Package Overview
Dependencies
Maintainers
2
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

iambus - npm Package Compare versions

Comparing version
2.0.5
to
2.0.6
+1
.gitattributes
* text=auto eol=lf
name: Integrate
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
lint:
runs-on: ubuntu-latest
name: Lint
steps:
- uses: holepunchto/actions/node-base@v1
- run: npm run lint
test:
strategy:
matrix:
include:
- os: ubuntu-latest
platform: linux
- os: macos-latest
platform: darwin
- os: windows-latest
platform: win32
runs-on: ${{ matrix.os }}
name: Test / ${{ matrix.platform }}
steps:
- uses: holepunchto/actions/bare-base@v1
- run: npm test
"prettier-config-holepunch"
+6
-2

@@ -8,3 +8,3 @@ import Iambus from './index.js'

async function msglogger () {
async function msglogger() {
for await (const message of bus.sub({})) console.log('BUS MSG', new Date(), '-', message)

@@ -21,3 +21,7 @@ }

setImmediate(() => {
bus.pub({ match: 'this', and: { also: 'this' }, content: 'even more content' })
bus.pub({
match: 'this',
and: { also: 'this' },
content: 'even more content'
})
})

@@ -24,0 +28,0 @@ })

+16
-29
'use strict'
const assert = require('nanoassert')
const streamx = require('streamx')
const match = require('ptnm')
const FALLBACK_CUTOVER_DELAY = 180_000 // 3 minutes

@@ -9,3 +10,3 @@

constructor (bus, pattern, opts = {}) {
constructor(bus, pattern, opts = {}) {
super(opts)

@@ -31,3 +32,3 @@ this.bus = bus

cutover (after = 0) {
cutover(after = 0) {
if (this._timeout !== null) {

@@ -42,3 +43,3 @@ clearTimeout(this._timeout)

feed (subscriber) {
feed(subscriber) {
if (this.bus.subscribers.has(subscriber) === false) return subscriber

@@ -51,3 +52,3 @@ for (const message of this.queue) {

push (message) {
push(message) {
if (this.retain) {

@@ -61,3 +62,3 @@ this.queue.push(message)

pushOnMatch (message) {
pushOnMatch(message) {
if (match(message, this.pattern)) {

@@ -68,7 +69,7 @@ this.push(message)

end () {
end() {
this.push(null)
}
_destroy (cb) {
_destroy(cb) {
this.bus.subscribers.delete(this)

@@ -85,3 +86,3 @@ this.once('cutover', () => {

static Subscriber = Subscriber
constructor ({ onsub = () => {} } = {}) {
constructor({ onsub = () => {} } = {}) {
this.subscribers = new Set()

@@ -91,3 +92,3 @@ this._onsub = onsub

pub (message) {
pub(message) {
for (const subscriber of this.subscribers) {

@@ -98,4 +99,7 @@ subscriber.pushOnMatch(message)

sub (pattern = {}, opts) {
assert(typeof pattern === 'object' && pattern !== null, 'Pass a pattern object: bus.sub(pattern)')
sub(pattern = {}, opts) {
assert(
typeof pattern === 'object' && pattern !== null,
'Pass a pattern object: bus.sub(pattern)'
)
const subscriber = new Subscriber(this, pattern, opts)

@@ -106,3 +110,3 @@ this._onsub(subscriber)

destroy () {
destroy() {
for (const subscriber of this.subscribers) {

@@ -113,18 +117,1 @@ subscriber.destroy()

}
function match (message, pattern) {
if (typeof pattern !== 'object' || pattern === null) return false
for (const key in pattern) {
if (Object.hasOwn(pattern, key) === false) continue
if (Object.hasOwn(message, key) === false) return false
const messageValue = message[key]
const patternValue = pattern[key]
const nested = typeof patternValue === 'object' && patternValue !== null && typeof messageValue === 'object' && messageValue !== null
if (nested) {
if (match(messageValue, patternValue) === false) return false
} else if (messageValue !== patternValue) {
return false
}
}
return true
}
{
"name": "iambus",
"version": "2.0.5",
"version": "2.0.6",
"description": "minimalist pattern-matching pub-sub message-bus",
"main": "index.js",
"scripts": {
"test": "brittle test.js && npm run lint",
"lint": "standard"
"format": "prettier --write .",
"lint": "prettier --check . && lunte",
"test": "npm run test:node && npm run test:bare",
"test:node": "brittle-node test.js",
"test:bare": "brittle-bare test.js"
},

@@ -22,2 +25,3 @@ "keywords": [

"nanoassert": "^2.0.0",
"ptnm": "^1.0.0",
"streamx": "^2.13.2"

@@ -27,3 +31,5 @@ },

"brittle": "^3.3.2",
"standard": "^17.1.0"
"lunte": "^1.0.0",
"prettier": "^3.6.2",
"prettier-config-holepunch": "^2.0.0"
},

@@ -30,0 +36,0 @@ "repository": {

+36
-26

@@ -36,3 +36,3 @@ # iambus

Subscribers are Readable [streamx](https://npmjs.com/package/streamx) streams.
Subscribers are Readable [streamx](https://npmjs.com/package/streamx) streams.

@@ -44,3 +44,3 @@ Subscribe to a pattern (an object which partially matches against target messages).

```js
const pattern= { topic: 'news' }
const pattern = { topic: 'news' }
const subscriber = bus.sub(pattern)

@@ -77,3 +77,3 @@ ```

```js
async function msglogger () {
async function msglogger() {
for await (const message of bus.sub({})) console.log('BUS MSG', Date.now(), ' - ', message)

@@ -84,3 +84,2 @@ }

## Retain - `bus.sub(pattern, { [ retain = false ] })`

@@ -102,3 +101,7 @@

setImmediate(() => {
bus.pub({ match: 'this', and: { also: 'this' }, content: 'even more content' })
bus.pub({
match: 'this',
and: { also: 'this' },
content: 'even more content'
})
})

@@ -109,8 +112,8 @@ })

const subscriber = bus.sub({ match: 'this', and: { also: 'this' } }, { retain: true })
const consumerA = subscriber.feed(bus.sub({ match: 'this'}))
const consumerA = subscriber.feed(bus.sub({ match: 'this' }))
const consumerB = subscriber.feed(bus.sub({ and: { also: 'this' } }))
subscriber.on('data', (data) => console.log('Subscriber got', data) )
consumerA.on('data', (data) => console.log('ConsumerA got', data) )
consumerB.on('data', (data) => console.log('ConsumerB got', data) )
subscriber.on('data', (data) => console.log('Subscriber got', data))
consumerA.on('data', (data) => console.log('ConsumerA got', data))
consumerB.on('data', (data) => console.log('ConsumerB got', data))
```

@@ -150,3 +153,7 @@

setImmediate(() => {
bus.pub({ match: 'this', and: { also: 'this' }, content: 'even more content' })
bus.pub({
match: 'this',
and: { also: 'this' },
content: 'even more content'
})
})

@@ -157,13 +164,11 @@ })

const subscriber = bus.sub({ match: 'this', and: { also: 'this' } }, { retain: true, max: 2 })
const consumerA = subscriber.feed(bus.sub({ match: 'this'}))
const consumerA = subscriber.feed(bus.sub({ match: 'this' }))
setTimeout(() => {
const consumerB = subscriber.feed(bus.sub({ and: { also: 'this' } }))
consumerB.on('data', (data) => console.log('ConsumerB got', data) )
consumerB.on('data', (data) => console.log('ConsumerB got', data))
}, 1000)
subscriber.on('data', (data) => console.log('Subscriber got', data) )
consumerA.on('data', (data) => console.log('ConsumerA got', data) )
consumerB.on('data', (data) => console.log('ConsumerB got', data) )
subscriber.on('data', (data) => console.log('Subscriber got', data))
consumerA.on('data', (data) => console.log('ConsumerA got', data))
consumerB.on('data', (data) => console.log('ConsumerB got', data))
```

@@ -188,3 +193,3 @@

Calling `subscriber.cutover()` clears the queue and stops retaining.
Calling `subscriber.cutover()` clears the queue and stops retaining.

@@ -204,3 +209,7 @@ This should be called if `opts.retain` is set to `true`.

setTimeout(() => {
bus.pub({ match: 'this', and: { also: 'this' }, content: 'even more content' })
bus.pub({
match: 'this',
and: { also: 'this' },
content: 'even more content'
})
}, 1500)

@@ -217,3 +226,2 @@ })

subscriber.on('data', (data) => console.log('Subscriber got', data))

@@ -235,3 +243,3 @@ consumerA.on('data', (data) => console.log('ConsumerA got', data))

Note that ConsumerB only recieves the last message, because cutover occurs before it subscribes so by then the queue is empty.
Note that ConsumerB only recieves the last message, because cutover occurs before it subscribes so by then the queue is empty.

@@ -251,3 +259,7 @@ Cutover can occur after a delay by passing an argument representing milliseonds until cutover:

setTimeout(() => {
bus.pub({ match: 'this', and: { also: 'this' }, content: 'even more content' })
bus.pub({
match: 'this',
and: { also: 'this' },
content: 'even more content'
})
}, 1500)

@@ -264,3 +276,2 @@ })

subscriber.on('data', (data) => console.log('Subscriber got', data))

@@ -290,11 +301,10 @@ consumerA.on('data', (data) => console.log('ConsumerA got', data))

## `Iambus.match(message, pattern) -> boolean`
## `Iambus.match(message, pattern) -> boolean`
Returns `true` if pattern matches message, `false` if not.
## Example
The [example.mjs](./example.mjs) file contains three subscribers and the message logger
that uses an empty pattern to subscribe to all messages.
that uses an empty pattern to subscribe to all messages.

@@ -301,0 +311,0 @@ ```

+65
-26

@@ -9,3 +9,5 @@ 'use strict'

const bus = new Iambus()
teardown(() => { bus.destroy() })
teardown(() => {
bus.destroy()
})
const expected = { topic: 'news', content: 'Hello, world!' }

@@ -20,15 +22,25 @@

test('bus.sub(\'invalid pattern\')', async ({ plan, exception, teardown }) => {
test("bus.sub('invalid pattern')", async ({ plan, exception, teardown }) => {
plan(1)
const bus = new Iambus()
teardown(() => { bus.destroy() })
teardown(() => {
bus.destroy()
})
exception(() => { bus.sub('invalid pattern') })
exception(() => {
bus.sub('invalid pattern')
})
})
test('bus.sub(pattern) - for await consume, matching and non-matching messages published', async ({ plan, alike, teardown }) => {
test('bus.sub(pattern) - for await consume, matching and non-matching messages published', async ({
plan,
alike,
teardown
}) => {
plan(1)
const bus = new Iambus()
teardown(() => { bus.destroy() })
teardown(() => {
bus.destroy()
})

@@ -62,3 +74,5 @@ const matchingMessage = { topic: 'news', content: 'Hello, world!' }

const bus = new Iambus()
teardown(() => { bus.destroy() })
teardown(() => {
bus.destroy()
})

@@ -78,9 +92,13 @@ setImmediate(() => {

}
alike(received, [
{ topic: 'news', content: 'Hello, world!' },
{ topic: 'art', content: '<^_^>' }
], 'Received expected matching message')
alike(
received,
[
{ topic: 'news', content: 'Hello, world!' },
{ topic: 'art', content: '<^_^>' }
],
'Received expected matching message'
)
})
test('subscribe().on(\'data\',...) with sub.destroy()', async ({ plan, is, pass, teardown }) => {
test("subscribe().on('data',...) with sub.destroy()", async ({ plan, is, pass, teardown }) => {
plan(2)

@@ -97,3 +115,5 @@ const bus = new Iambus()

await new Promise(setImmediate) // skip tick for event propagation
subscriber.on('close', () => { pass('close') })
subscriber.on('close', () => {
pass('close')
})
subscriber.destroy()

@@ -108,3 +128,5 @@ bus.pub({ topic: 'news', content: 'will be ignored' })

const bus = new Iambus()
teardown(() => { bus.destroy() })
teardown(() => {
bus.destroy()
})

@@ -134,3 +156,5 @@ setImmediate(() => {

const bus = new Iambus()
teardown(() => { bus.destroy() })
teardown(() => {
bus.destroy()
})

@@ -158,7 +182,14 @@ setImmediate(() => {

test('subscriber.feed(toSubscriber) consumes data from subscriber queue', async ({ plan, alike, is, teardown }) => {
test('subscriber.feed(toSubscriber) consumes data from subscriber queue', async ({
plan,
alike,
is,
teardown
}) => {
plan(3)
const bus = new Iambus()
teardown(() => { bus.destroy() })
teardown(() => {
bus.destroy()
})

@@ -189,3 +220,5 @@ const subscriber = bus.sub({ topic: 'live' }, { retain: true })

const bus = new Iambus()
teardown(() => { bus.destroy() })
teardown(() => {
bus.destroy()
})
const subscriber = bus.sub({ topic: 'max' }, { retain: true, max: 2 })

@@ -200,4 +233,4 @@

console.log('huwhut')
consumer.on('data', msg => received.push(msg))
await new Promise(resolve => setTimeout(resolve, 10))
consumer.on('data', (msg) => received.push(msg))
await new Promise((resolve) => setTimeout(resolve, 10))

@@ -214,3 +247,5 @@ alike(received, [

const bus = new Iambus()
teardown(() => { bus.destroy() })
teardown(() => {
bus.destroy()
})
const subscriber = bus.sub({ topic: 'cutover' }, { retain: true })

@@ -228,3 +263,3 @@

consumer.on('data', msg => received.push(msg))
consumer.on('data', (msg) => received.push(msg))
bus.pub({ topic: 'cutover', content: '3rd' })

@@ -240,3 +275,5 @@

const bus = new Iambus()
teardown(() => { bus.destroy() })
teardown(() => {
bus.destroy()
})
const subscriber = bus.sub({ topic: 'cutover' }, { retain: true })

@@ -256,3 +293,3 @@

consumer.on('data', msg => received.push(msg))
consumer.on('data', (msg) => received.push(msg))
bus.pub({ topic: 'cutover', content: '3rd' })

@@ -268,3 +305,5 @@

const bus = new Iambus()
teardown(() => { bus.destroy() })
teardown(() => {
bus.destroy()
})
const subscriber = bus.sub({ topic: 'cutover' }, { retain: true })

@@ -281,3 +320,3 @@ bus.pub({ topic: 'cutover', content: '1st' })

consumer.on('data', msg => received.push(msg))
consumer.on('data', (msg) => received.push(msg))
bus.pub({ topic: 'cutover', content: '3rd' })

@@ -284,0 +323,0 @@