orbit-db-pubsub
Advanced tools
Comparing version 0.5.3 to 0.5.4
{ | ||
"name": "orbit-db-pubsub", | ||
"version": "0.5.3", | ||
"version": "0.5.4", | ||
"description": "Message propagation module for orbit-db", | ||
@@ -13,4 +13,5 @@ "main": "index.js", | ||
"ipfs-pubsub-peer-monitor": "~0.0.5", | ||
"logplease": "~1.2.14" | ||
"logplease": "~1.2.14", | ||
"p-series": "^1.1.0" | ||
} | ||
} |
'use strict' | ||
const pSeries = require('p-series') | ||
const PeerMonitor = require('ipfs-pubsub-peer-monitor') | ||
@@ -29,38 +30,36 @@ | ||
subscribe(topic, onMessageCallback, onNewPeerCallback) { | ||
async subscribe(topic, onMessageCallback, onNewPeerCallback) { | ||
if(!this._subscriptions[topic] && this._ipfs.pubsub) { | ||
this._ipfs.pubsub.subscribe(topic, this._handleMessage, (err, res) => { | ||
if (err) throw err | ||
await this._ipfs.pubsub.subscribe(topic, this._handleMessage) | ||
const topicMonitor = new PeerMonitor(this._ipfs.pubsub, topic) | ||
const topicMonitor = new PeerMonitor(this._ipfs.pubsub, topic) | ||
topicMonitor.on('join', (peer) => { | ||
logger.debug(`Peer joined ${topic}:`) | ||
logger.debug(peer) | ||
if (this._subscriptions[topic]) { | ||
onNewPeerCallback(topic, peer) | ||
} else { | ||
logger.warn('Peer joined a room we don\'t have a subscription for') | ||
logger.warn(topic, peer) | ||
} | ||
}) | ||
topicMonitor.on('join', (peer) => { | ||
logger.debug(`Peer joined ${topic}:`) | ||
logger.debug(peer) | ||
if (this._subscriptions[topic]) { | ||
onNewPeerCallback(topic, peer) | ||
} else { | ||
logger.warn('Peer joined a room we don\'t have a subscription for') | ||
logger.warn(topic, peer) | ||
} | ||
}) | ||
topicMonitor.on('leave', (peer) => logger.debug(`Peer ${peer} left ${topic}`)) | ||
topicMonitor.on('error', (e) => logger.error(e)) | ||
topicMonitor.on('leave', (peer) => logger.debug(`Peer ${peer} left ${topic}`)) | ||
topicMonitor.on('error', (e) => logger.error(e)) | ||
this._subscriptions[topic] = { | ||
topicMonitor: topicMonitor, | ||
onMessage: onMessageCallback, | ||
onNewPeer: onNewPeerCallback | ||
} | ||
this._subscriptions[topic] = { | ||
topicMonitor: topicMonitor, | ||
onMessage: onMessageCallback, | ||
onNewPeer: onNewPeerCallback | ||
} | ||
topicsOpenCount ++ | ||
logger.debug("Topics open:", topicsOpenCount) | ||
}) | ||
topicsOpenCount ++ | ||
logger.debug("Topics open:", topicsOpenCount) | ||
} | ||
} | ||
unsubscribe(hash) { | ||
async unsubscribe(hash) { | ||
if(this._subscriptions[hash]) { | ||
this._ipfs.pubsub.unsubscribe(hash, this._handleMessage) | ||
await this._ipfs.pubsub.unsubscribe(hash, this._handleMessage) | ||
this._subscriptions[hash].topicMonitor.stop() | ||
@@ -80,11 +79,9 @@ delete this._subscriptions[hash] | ||
disconnect() { | ||
Object.keys(this._subscriptions) | ||
.forEach((e) => this.unsubscribe(e)) | ||
async disconnect() { | ||
const topics = Object.keys(this._subscriptions) | ||
await pSeries(topics.map((t) => this.unsubscribe.bind(this, t))) | ||
this._subscriptions = {} | ||
} | ||
_handleMessage(message) { | ||
async _handleMessage(message) { | ||
// Don't process our own messages | ||
@@ -107,3 +104,3 @@ if (message.from === this._id) | ||
if(subscription && subscription.onMessage && content) { | ||
subscription.onMessage(topicId, content) | ||
await subscription.onMessage(topicId, content) | ||
} | ||
@@ -110,0 +107,0 @@ } |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
4412
3
88
+ Addedp-series@^1.1.0
+ Added@sindresorhus/is@0.7.0(transitive)
+ Addedp-reduce@1.0.0(transitive)
+ Addedp-series@1.1.0(transitive)