Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

protomux-wakeup

Package Overview
Dependencies
Maintainers
1
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

protomux-wakeup - npm Package Compare versions

Comparing version
2.6.1
to
2.7.0
+138
-56
index.js

@@ -6,6 +6,3 @@ const crypto = require('hypercore-crypto')

const [
NS_INITATOR,
NS_RESPONDER
] = crypto.namespace('wakeup', 2)
const [NS_INITATOR, NS_RESPONDER] = crypto.namespace('wakeup', 2)

@@ -18,6 +15,14 @@ const Handshake = schema.getEncoding('@wakeup/handshake')

module.exports = class WakeupSwarm {
constructor (onwakeup = noop) {
constructor(onwakeup = noop) {
this.topics = new Map()
this.topicsGC = new Set()
this.muxers = new Set()
this.stats = {
sessionsOpened: 0,
sessionsClosed: 0,
topicsAdded: 0,
topicsGcd: 0,
peersAdded: 0,
peersRemoved: 0
}

@@ -30,3 +35,3 @@ this.onwakeup = onwakeup

session (capability, handlers = {}) {
session(capability, handlers = {}) {
const id = handlers.discoveryKey || crypto.discoveryKey(capability)

@@ -51,3 +56,3 @@ const active = handlers.active !== false

getSessions (capability, handlers = {}) {
getSessions(capability, handlers = {}) {
const id = handlers.discoveryKey || crypto.discoveryKey(capability)

@@ -59,3 +64,3 @@ const hex = b4a.toString(id, 'hex')

hasStream (stream, capability, handlers = {}) {
hasStream(stream, capability, handlers = {}) {
if (!capability) {

@@ -73,3 +78,3 @@ const noiseStream = stream.noiseStream || stream

addStream (stream) {
addStream(stream) {
const noiseStream = stream.noiseStream || stream

@@ -83,3 +88,3 @@

const muxer = getMuxer(noiseStream)
muxer.pair({ protocol: 'wakeup' }, id => this._onpair(id, muxer))
muxer.pair({ protocol: 'wakeup' }, (id) => this._onpair(id, muxer))

@@ -95,3 +100,3 @@ this.muxers.add(muxer)

_onActive (w) {
_onActive(w) {
for (const m of this.muxers) {

@@ -102,3 +107,3 @@ w._onopen(m, false)

_addGC (topic) {
_addGC(topic) {
if (topic.destroyed) return

@@ -111,3 +116,3 @@ this.topicsGC.add(topic)

_removeGC (topic) {
_removeGC(topic) {
this.topicsGC.delete(topic)

@@ -120,3 +125,3 @@ if (this.topicsGC.size === 0 && this._gcInterval) {

_gc () {
_gc() {
const destroy = []

@@ -130,3 +135,3 @@ for (const w of this.topicsGC) {

destroy () {
destroy() {
if (this._gcInterval) clearInterval(this._gcInterval)

@@ -138,3 +143,3 @@ this._gcInterval = null

async _onpair (id, stream) {
async _onpair(id, stream) {
const hex = b4a.toString(id, 'hex')

@@ -145,6 +150,63 @@ const w = this.topics.get(hex)

}
registerMetrics(promClient) {
const self = this
new promClient.Gauge({
// eslint-disable-line no-new
name: 'protomux_wakeup_sessions_opened',
help: 'The amount of sessions opened by protomux wakeup',
collect() {
this.set(self.stats.sessionsOpened)
}
})
new promClient.Gauge({
// eslint-disable-line no-new
name: 'protomux_wakeup_sessions_closed',
help: 'The amount of sessions closed by protomux wakeup',
collect() {
this.set(self.stats.sessionsClosed)
}
})
new promClient.Gauge({
// eslint-disable-line no-new
name: 'protomux_wakeup_topics_added',
help: 'The amount of topics added to protomux wakeup',
collect() {
this.set(self.stats.topicsAdded)
}
})
new promClient.Gauge({
// eslint-disable-line no-new
name: 'protomux_wakeup_topics_gcd',
help: 'The amount of topics that got garbage collected by protomux wakeup',
collect() {
this.set(self.stats.topicsGcd)
}
})
new promClient.Gauge({
// eslint-disable-line no-new
name: 'protomux_wakeup_peers_added',
help: 'The amount of peers added by protomux wakeup, across all topics',
collect() {
this.set(self.stats.peersAdded)
}
})
new promClient.Gauge({
// eslint-disable-line no-new
name: 'protomux_wakeup_peers_removed',
help: 'The amount of peers removed by protomux wakeup, across all topics',
collect() {
this.set(self.stats.peersRemoved)
}
})
}
}
class WakeupPeer {
constructor (topic) {
constructor(topic) {
this.index = 0

@@ -161,5 +223,9 @@ this.userData = null // for the user

this.wireInfo = null
this.active = false
this.topic.state.stats.peersAdded++
}
unlink (list) {
unlink(list) {
this.topic.state.stats.peersRemoved++
// note that since we pop here we can iterate in reverse safely in case a peer is removed in the same tick

@@ -174,3 +240,3 @@ const head = list.pop()

class WakeupSession {
constructor (topic, handlers) {
constructor(topic, handlers) {
this.index = 0

@@ -183,15 +249,19 @@ this.topic = topic

get peers () {
get peers() {
return this.topic.peers
}
addStream (stream) {
hasStream(stream) {
return !!this.getPeer(stream)
}
addStream(stream) {
this.topic.addStream(stream)
}
getPeer (stream) {
getPeer(stream) {
return this.topic.peersByStream.get(stream) || null
}
broadcastLookup (req) {
broadcastLookup(req) {
for (const peer of this.topic.pendingPeers) {

@@ -205,3 +275,3 @@ this.lookup(peer, req)

lookupByStream (stream, req) {
lookupByStream(stream, req) {
const peer = this.topic.peersByStream.get(stream)

@@ -211,7 +281,7 @@ if (peer) this.lookup(peer, req)

lookup (peer, req) {
lookup(peer, req) {
peer.wireLookup.send(req || { hash: null })
}
announceByStream (stream, wakeup) {
announceByStream(stream, wakeup) {
const peer = this.topic.peersByStream.get(stream)

@@ -221,7 +291,7 @@ if (peer && !peer.pending) this.announce(peer, wakeup)

announce (peer, wakeup) {
announce(peer, wakeup) {
peer.wireAnnounce.send(wakeup)
}
active () {
active() {
this.isActive = true

@@ -231,3 +301,3 @@ this.topic._bumpActivity()

inactive () {
inactive() {
this.isActive = false

@@ -237,3 +307,3 @@ this.topic._bumpActivity()

destroy () {
destroy() {
if (this.destroyed) return

@@ -246,3 +316,3 @@ this.destroyed = true

class WakeupTopic {
constructor (state, id, capability, active) {
constructor(state, id, capability, active) {
this.state = state

@@ -260,5 +330,8 @@ this.sessions = []

this.destroyed = false
this.state.stats.topicsAdded++
}
addSession (handlers) {
addSession(handlers) {
this.state.stats.sessionsOpened++
const session = new WakeupSession(this, handlers)

@@ -271,6 +344,8 @@ session.index = this.sessions.length

removeSession (session) {
removeSession(session) {
if (this.sessions.length <= session.index) return
if (this.sessions[session.index] !== session) return
this.state.stats.sessionsClosed++
// same as with the peer, this allows us to iterate while removing if iterating backwards

@@ -287,3 +362,3 @@ const head = this.sessions.pop()

_bumpActivity () {
_bumpActivity() {
let isActive = false

@@ -302,3 +377,3 @@

active () {
active() {
if (this.isActive) return

@@ -310,3 +385,3 @@ this.idleTicks = 0

inactive () {
inactive() {
if (!this.isActive) return

@@ -317,3 +392,3 @@ this.isActive = false

_updateActive (active) {
_updateActive(active) {
const info = { active }

@@ -329,6 +404,8 @@

teardown () {
teardown() {
if (this.destroyed) return
this.destroyed = true
this.state.stats.topicsGcd++
for (let i = this.peers.length - 1; i >= 0; i--) {

@@ -349,16 +426,21 @@ this.peers[i].channel.close()

addStream (stream) {
addStream(stream) {
this._onopen(getMuxer(stream), false)
}
_proveCapabilityTo (stream) {
_proveCapabilityTo(stream) {
return this._makeCapability(stream.isInitiator, stream.handshakeHash)
}
_makeCapability (isInitiator, handshakeHash) {
_makeCapability(isInitiator, handshakeHash) {
return crypto.hash([isInitiator ? NS_INITATOR : NS_RESPONDER, this.capability, handshakeHash])
}
_addPeer (peer, open) {
if (!b4a.equals(open.capability, this._makeCapability(!peer.stream.isInitiator, peer.stream.handshakeHash))) {
_addPeer(peer, open) {
if (
!b4a.equals(
open.capability,
this._makeCapability(!peer.stream.isInitiator, peer.stream.handshakeHash)
)
) {
peer.channel.close()

@@ -390,3 +472,3 @@ return

_checkGC () {
_checkGC() {
const shouldGC = this.activePeers === 0 && this.sessions.length === 0

@@ -407,3 +489,3 @@

_removePeer (peer) {
_removePeer(peer) {
peer.removed = true

@@ -436,3 +518,3 @@ this.peersByStream.delete(peer.stream)

_onannounce (wakeup, peer) {
_onannounce(wakeup, peer) {
for (let i = this.sessions.length - 1; i >= 0; i--) {

@@ -446,3 +528,3 @@ const session = this.sessions[i]

_onlookup (req, peer) {
_onlookup(req, peer) {
for (let i = this.sessions.length - 1; i >= 0; i--) {

@@ -456,3 +538,3 @@ const session = this.sessions[i]

_oninfo (info, peer) {
_oninfo(info, peer) {
if (info.active) {

@@ -487,3 +569,3 @@ if (!peer.active) {

_onopen (muxer, unique) {
_onopen(muxer, unique) {
if (!unique && this.peersByStream.has(muxer.stream)) return

@@ -526,3 +608,3 @@

function onchannelopen (open, channel) {
function onchannelopen(open, channel) {
const peer = channel.userData

@@ -532,3 +614,3 @@ peer.topic._addPeer(peer, open)

function onchannelclose (close, channel) {
function onchannelclose(close, channel) {
const peer = channel.userData

@@ -538,3 +620,3 @@ peer.topic._removePeer(peer)

function onlookup (req, channel) {
function onlookup(req, channel) {
const peer = channel.userData

@@ -544,3 +626,3 @@ peer.topic._onlookup(req, peer)

function onannounce (wakeup, channel) {
function onannounce(wakeup, channel) {
const peer = channel.userData

@@ -550,3 +632,3 @@ peer.topic._onannounce(wakeup, peer)

function onchannelinfo (info, channel) {
function onchannelinfo(info, channel) {
const peer = channel.userData

@@ -556,3 +638,3 @@ peer.topic._oninfo(info, peer)

function getMuxer (stream) {
function getMuxer(stream) {
if (Protomux.isProtomux(stream)) return stream

@@ -565,2 +647,2 @@ if (stream.noiseStream.userData) return stream.noiseStream.userData

function noop () {}
function noop() {}
{
"name": "protomux-wakeup",
"version": "2.6.1",
"version": "2.7.0",
"description": "Wakeup protocol over protomux",

@@ -12,2 +12,10 @@ "main": "index.js",

},
"devDependencies": {
"@hyperswarm/secret-stream": "^6.8.1",
"brittle": "^3.17.1",
"lunte": "^1.0.0",
"prettier": "^3.6.2",
"prettier-config-holepunch": "^2.0.0",
"prom-client": "^15.1.3"
},
"files": [

@@ -18,3 +26,4 @@ "index.js",

"scripts": {
"test": "standard"
"format": "prettier --write .",
"test": "prettier --check . && lunte && brittle test/*.js"
},

@@ -30,6 +39,3 @@ "repository": {

},
"homepage": "https://github.com/holepunchto/protomux-wakeup",
"devDependencies": {
"standard": "^17.1.2"
}
"homepage": "https://github.com/holepunchto/protomux-wakeup"
}
+100
-3

@@ -11,3 +11,3 @@ # protomux-wakeup

``` js
```js
const Wakeup = require('protomux-wakeup')

@@ -50,9 +50,106 @@

s.active()
```
// release the handlers
s.release()
## API
#### `const w = new Wakeup([onwakeup])`
Create a new wakeup swarm with a `onwakeup` callback. `onwakeup` is called with the `id` and `stream` whenever a new peer connects. `id` is the wakeup topic (see `w.session()` for more info). `stream` is the Protomux instance.
#### `w.topics`
A map of all topics being tracked.
#### `const sessions = w.getSessions(capability, handlers = {})`
Return all sessions for the topic that matches the `capability` & `handlers`.
#### `const s = w.session(capability, handlers = {})`
Create a session to track the capability. Handlers includes hook callbacks for the life cycle of peers in the session.
```
const s = w.session(core.key, {
onpeeradd: (peer, session) => {
// Called when a peer is added to the session
},
onpeerremove: (peer, session) => {
// Called when a peer is removed from the session
},
onpeeractive: (peer, session) => {
// Called when a peer becomes active
},
onpeerinactive: (peer, session) => {
// Called when a peer becomes inactive
},
onannounce: (wakeup, peer, session) => {
// Called when a peer announces its available wake ups
},
onlookup: (req, peer, session) => {
// Called when a peer requests available wake ups
}
})
```
`handlers` can also includes:
```
{
active: true, // Whether the session is currently active / replicating
discoveryKey: crypto.discoveryKey(capability) // The buffer to use as the `id` for the wake up topic. If not provide a discoveryKey hash of the capability will be used.
}
```
#### `s.peers`
Peers on the session's topic.
#### `s.topic`
The session's topic.
#### `s.isActive`
Whether the session is active.
#### `s.addStream(stream)`
Add the `stream` to the current session. If `stream` doesn't have a Protomux instance, one will added.
#### `const peer = s.getPeer(stream)`
Get the peer for the given `stream`.
#### `s.lookup(peer, req = { hash: null })`
Send a lookup request to a `peer`. `req` can be of the form `{ hash }`.
Usually used to prompt the peer for any available wake ups.
#### `s.broadcastLookup(req)`
Call `s.lookup()` on all peers.
#### `s.lookupByStream(stream, req)`
A shortcut for calling `s.lookup()` on the peer for a given `stream`.
#### `s.announce(peer, wakeup)`
Announce to the peer an array of wake up messages. Wake up messages have the form `{ key: Buffer, length: Number }`. The `key` buffer is 32 bytes long.
#### `s.announceByStream(stream, wakeup)`
A shortcut for calling `s.announce()` on the peer for a given `stream`.
#### `s.active()`
Set the session as active. Sessions are active by default.
#### `s.inactive()`
Set the session as inactive. This must be called when the session is done.
## License
Apache-2.0

@@ -14,6 +14,6 @@ // This file is autogenerated by the hyperschema compiler

const encoding0 = {
preencode (state, m) {
preencode(state, m) {
state.end++ // max flag is 1 so always one byte
},
encode (state, m) {
encode(state, m) {
const flags = m.active ? 1 : 0

@@ -23,3 +23,3 @@

},
decode (state) {
decode(state) {
const flags = c.uint.decode(state)

@@ -35,3 +35,3 @@

const encoding1 = {
preencode (state, m) {
preencode(state, m) {
c.uint.preencode(state, m.version)

@@ -41,3 +41,3 @@ c.fixed32.preencode(state, m.capability)

},
encode (state, m) {
encode(state, m) {
const flags = m.active ? 1 : 0

@@ -49,3 +49,3 @@

},
decode (state) {
decode(state) {
const r0 = c.uint.decode(state)

@@ -65,11 +65,11 @@ const r1 = c.fixed32.decode(state)

const encoding2 = {
preencode (state, m) {
preencode(state, m) {
c.fixed32.preencode(state, m.key)
c.uint.preencode(state, m.length)
},
encode (state, m) {
encode(state, m) {
c.fixed32.encode(state, m.key)
c.uint.encode(state, m.length)
},
decode (state) {
decode(state) {
const r0 = c.fixed32.decode(state)

@@ -90,3 +90,3 @@ const r1 = c.uint.decode(state)

const encoding4 = {
preencode (state, m) {
preencode(state, m) {
state.end++ // max flag is 1 so always one byte

@@ -96,3 +96,3 @@

},
encode (state, m) {
encode(state, m) {
const flags = m.hash ? 1 : 0

@@ -104,3 +104,3 @@

},
decode (state) {
decode(state) {
const flags = c.uint.decode(state)

@@ -114,7 +114,7 @@

function setVersion (v) {
function setVersion(v) {
version = v
}
function encode (name, value, v = VERSION) {
function encode(name, value, v = VERSION) {
version = v

@@ -124,3 +124,3 @@ return c.encode(getEncoding(name), value)

function decode (name, buffer, v = VERSION) {
function decode(name, buffer, v = VERSION) {
version = v

@@ -130,31 +130,38 @@ return c.decode(getEncoding(name), buffer)

function getEnum (name) {
function getEnum(name) {
switch (name) {
default: throw new Error('Enum not found ' + name)
default:
throw new Error('Enum not found ' + name)
}
}
function getEncoding (name) {
function getEncoding(name) {
switch (name) {
case '@wakeup/info': return encoding0
case '@wakeup/handshake': return encoding1
case '@wakeup/writer': return encoding2
case '@wakeup/announce': return encoding3
case '@wakeup/lookup': return encoding4
default: throw new Error('Encoder not found ' + name)
case '@wakeup/info':
return encoding0
case '@wakeup/handshake':
return encoding1
case '@wakeup/writer':
return encoding2
case '@wakeup/announce':
return encoding3
case '@wakeup/lookup':
return encoding4
default:
throw new Error('Encoder not found ' + name)
}
}
function getStruct (name, v = VERSION) {
function getStruct(name, v = VERSION) {
const enc = getEncoding(name)
return {
preencode (state, m) {
preencode(state, m) {
version = v
enc.preencode(state, m)
},
encode (state, m) {
encode(state, m) {
version = v
enc.encode(state, m)
},
decode (state) {
decode(state) {
version = v

@@ -168,2 +175,11 @@ return enc.decode(state)

module.exports = { resolveStruct, getStruct, getEnum, getEncoding, encode, decode, setVersion, version }
module.exports = {
resolveStruct,
getStruct,
getEnum,
getEncoding,
encode,
decode,
setVersion,
version
}