Socket
Socket
Sign inDemoInstall

orbit-db-pubsub

Package Overview
Dependencies
Maintainers
1
Versions
41
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

orbit-db-pubsub - npm Package Compare versions

Comparing version 0.5.3 to 0.5.4

5

package.json
{
"name": "orbit-db-pubsub",
"version": "0.5.3",
"version": "0.5.4",
"description": "Message propagation module for orbit-db",

@@ -13,4 +13,5 @@ "main": "index.js",

"ipfs-pubsub-peer-monitor": "~0.0.5",
"logplease": "~1.2.14"
"logplease": "~1.2.14",
"p-series": "^1.1.0"
}
}

63

src/ipfs-pubsub.js
'use strict'
const pSeries = require('p-series')
const PeerMonitor = require('ipfs-pubsub-peer-monitor')

@@ -29,38 +30,36 @@

subscribe(topic, onMessageCallback, onNewPeerCallback) {
async subscribe(topic, onMessageCallback, onNewPeerCallback) {
if(!this._subscriptions[topic] && this._ipfs.pubsub) {
this._ipfs.pubsub.subscribe(topic, this._handleMessage, (err, res) => {
if (err) throw err
await this._ipfs.pubsub.subscribe(topic, this._handleMessage)
const topicMonitor = new PeerMonitor(this._ipfs.pubsub, topic)
const topicMonitor = new PeerMonitor(this._ipfs.pubsub, topic)
topicMonitor.on('join', (peer) => {
logger.debug(`Peer joined ${topic}:`)
logger.debug(peer)
if (this._subscriptions[topic]) {
onNewPeerCallback(topic, peer)
} else {
logger.warn('Peer joined a room we don\'t have a subscription for')
logger.warn(topic, peer)
}
})
topicMonitor.on('join', (peer) => {
logger.debug(`Peer joined ${topic}:`)
logger.debug(peer)
if (this._subscriptions[topic]) {
onNewPeerCallback(topic, peer)
} else {
logger.warn('Peer joined a room we don\'t have a subscription for')
logger.warn(topic, peer)
}
})
topicMonitor.on('leave', (peer) => logger.debug(`Peer ${peer} left ${topic}`))
topicMonitor.on('error', (e) => logger.error(e))
topicMonitor.on('leave', (peer) => logger.debug(`Peer ${peer} left ${topic}`))
topicMonitor.on('error', (e) => logger.error(e))
this._subscriptions[topic] = {
topicMonitor: topicMonitor,
onMessage: onMessageCallback,
onNewPeer: onNewPeerCallback
}
this._subscriptions[topic] = {
topicMonitor: topicMonitor,
onMessage: onMessageCallback,
onNewPeer: onNewPeerCallback
}
topicsOpenCount ++
logger.debug("Topics open:", topicsOpenCount)
})
topicsOpenCount ++
logger.debug("Topics open:", topicsOpenCount)
}
}
unsubscribe(hash) {
async unsubscribe(hash) {
if(this._subscriptions[hash]) {
this._ipfs.pubsub.unsubscribe(hash, this._handleMessage)
await this._ipfs.pubsub.unsubscribe(hash, this._handleMessage)
this._subscriptions[hash].topicMonitor.stop()

@@ -80,11 +79,9 @@ delete this._subscriptions[hash]

disconnect() {
Object.keys(this._subscriptions)
.forEach((e) => this.unsubscribe(e))
async disconnect() {
const topics = Object.keys(this._subscriptions)
await pSeries(topics.map((t) => this.unsubscribe.bind(this, t)))
this._subscriptions = {}
}
_handleMessage(message) {
async _handleMessage(message) {
// Don't process our own messages

@@ -107,3 +104,3 @@ if (message.from === this._id)

if(subscription && subscription.onMessage && content) {
subscription.onMessage(topicId, content)
await subscription.onMessage(topicId, content)
}

@@ -110,0 +107,0 @@ }

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc