orbit-db-pubsub
Advanced tools
Comparing version 0.4.0 to 0.5.0
{ | ||
"name": "orbit-db-pubsub", | ||
"version": "0.4.0", | ||
"version": "0.5.0", | ||
"description": "Message propagation module for orbit-db", | ||
@@ -12,5 +12,5 @@ "main": "index.js", | ||
"dependencies": { | ||
"ipfs-pubsub-room": "~1.2.0", | ||
"ipfs-pubsub-peer-monitor": "0.0.1", | ||
"logplease": "~1.2.14" | ||
} | ||
} |
'use strict' | ||
const Room = require('ipfs-pubsub-room') | ||
const PeerMonitor = require('ipfs-pubsub-peer-monitor') | ||
const Logger = require('logplease') | ||
const logger = Logger.create("orbit-db.ipfs-pubsub") | ||
const logger = Logger.create("pubsub", { color: Logger.Colors.Yellow }) | ||
Logger.setLogLevel('ERROR') | ||
@@ -30,33 +30,31 @@ | ||
subscribe(topic, onMessageCallback, onNewPeerCallback) { | ||
if(!this._subscriptions[topic]) { | ||
const room = Room(this._ipfs, topic) | ||
if(!this._subscriptions[topic] && this._ipfs.pubsub) { | ||
this._ipfs.pubsub.subscribe(topic, this._handleMessage, (err, res) => { | ||
if (err) throw err | ||
room.on('error', (e) => { | ||
logger.error("Pubsub Error:", e) | ||
}) | ||
const topicMonitor = new PeerMonitor(this._ipfs.pubsub, topic) | ||
room.on('message', (message) => { | ||
this._handleMessage(message) | ||
}) | ||
topicMonitor.on('join', (peer) => { | ||
logger.debug(`Peer joined ${topic}:`) | ||
logger.debug(peer) | ||
if (this._subscriptions[topic]) { | ||
onNewPeerCallback(topic, peer) | ||
} else { | ||
logger.warn('Peer joined a room we don\'t have a subscription for') | ||
logger.warn(topic, peer) | ||
} | ||
}) | ||
room.on('peer joined', (peer) => { | ||
logger.debug("Peer connected:", topic, topic === room._topic) | ||
if (this._subscriptions[topic]) { | ||
this._subscriptions[topic].onNewPeer(topic, peer, room) | ||
} else { | ||
logger.warn('Peer joined a room we don\'t have a subscription for') | ||
logger.warn(peer, room._topic, topic) | ||
} | ||
}) | ||
topicMonitor.on('leave', (peer) => logger.debug(`Peer ${peer} left ${topic}`)) | ||
topicMonitor.on('error', (e) => logger.error(e)) | ||
room.on('subscribed', () => { | ||
this._subscriptions[topic] = { | ||
room: room, | ||
onMessage: onMessageCallback, | ||
this._subscriptions[topic] = { | ||
topicMonitor: topicMonitor, | ||
onMessage: onMessageCallback, | ||
onNewPeer: onNewPeerCallback | ||
} | ||
topicsOpenCount ++ | ||
logger.debug("Topics open:", topicsOpenCount) | ||
}) | ||
} | ||
@@ -67,4 +65,4 @@ } | ||
if(this._subscriptions[hash]) { | ||
this._subscriptions[hash].room.leave() | ||
this._subscriptions[hash].room = null | ||
this._ipfs.pubsub.unsubscribe(hash, this._handleMessage) | ||
this._subscriptions[hash].topicMonitor.stop() | ||
delete this._subscriptions[hash] | ||
@@ -77,5 +75,5 @@ logger.debug(`Unsubscribed from '${hash}'`) | ||
publish(hash, message) { | ||
if(this._subscriptions[hash] && this._subscriptions[hash].room && this._ipfs.pubsub) { | ||
this._subscriptions[hash].room.broadcast(Buffer.from(JSON.stringify(message))) | ||
publish(topic, message) { | ||
if(this._subscriptions[topic] && this._ipfs.pubsub) { | ||
this._ipfs.pubsub.publish(topic, Buffer.from(JSON.stringify(message))) | ||
} | ||
@@ -82,0 +80,0 @@ } |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
4361
89
+ Addedipfs-pubsub-peer-monitor@0.0.1(transitive)
- Removedipfs-pubsub-room@~1.2.0
- Removedipfs-pubsub-room@1.2.1(transitive)
- Removedlodash.clonedeep@4.5.0(transitive)
- Removedpull-pushable@2.2.0(transitive)
- Removedpull-stream@3.7.0(transitive)
- Removedsafe-buffer@5.2.1(transitive)