orbit-db-pubsub
Advanced tools
Comparing version 0.6.1-3ed04f8.0 to 0.6.1-5d7f99e.0
{ | ||
"name": "orbit-db-pubsub", | ||
"version": "0.6.1-3ed04f8.0", | ||
"version": "0.6.1-5d7f99e.0", | ||
"description": "Message propagation module for orbit-db", | ||
@@ -14,3 +14,4 @@ "main": "index.js", | ||
"scripts": { | ||
"test": "echo \"Error: no test specified\" && exit 1" | ||
"test": "echo \"Error: no test specified\" && exit 1", | ||
"lint": "standard" | ||
}, | ||
@@ -35,3 +36,6 @@ "author": "Haad", | ||
"hajamark <mark@haja.io>" | ||
] | ||
], | ||
"devDependencies": { | ||
"standard": "^17.0.0" | ||
} | ||
} |
@@ -7,3 +7,3 @@ 'use strict' | ||
const Logger = require('logplease') | ||
const logger = Logger.create("pubsub", { color: Logger.Colors.Yellow }) | ||
const logger = Logger.create('pubsub', { color: Logger.Colors.Yellow }) | ||
Logger.setLogLevel('ERROR') | ||
@@ -15,3 +15,3 @@ | ||
class IPFSPubsub { | ||
constructor(ipfs, id) { | ||
constructor (ipfs, id) { | ||
this._ipfs = ipfs | ||
@@ -21,4 +21,3 @@ this._id = id | ||
if (this._ipfs.pubsub === null) | ||
logger.error("The provided version of ipfs doesn't have pubsub support. Messages will not be exchanged.") | ||
if (this._ipfs.pubsub === null) { logger.error("The provided version of ipfs doesn't have pubsub support. Messages will not be exchanged.") } | ||
@@ -29,8 +28,7 @@ this._handleMessage = this._handleMessage.bind(this) | ||
// ie. number of databases replicating | ||
if (this._ipfs.setMaxListeners) | ||
this._ipfs.setMaxListeners(maxTopicsOpen) | ||
if (this._ipfs.setMaxListeners) { this._ipfs.setMaxListeners(maxTopicsOpen) } | ||
} | ||
async subscribe(topic, onMessageCallback, onNewPeerCallback, options = {}) { | ||
if(!this._subscriptions[topic] && this._ipfs.pubsub) { | ||
async subscribe (topic, onMessageCallback, onNewPeerCallback, options = {}) { | ||
if (!this._subscriptions[topic] && this._ipfs.pubsub) { | ||
await this._ipfs.pubsub.subscribe(topic, this._handleMessage, options) | ||
@@ -55,3 +53,3 @@ | ||
this._subscriptions[topic] = { | ||
topicMonitor: topicMonitor, | ||
topicMonitor, | ||
onMessage: onMessageCallback, | ||
@@ -61,9 +59,9 @@ onNewPeer: onNewPeerCallback | ||
topicsOpenCount ++ | ||
logger.debug("Topics open:", topicsOpenCount) | ||
topicsOpenCount++ | ||
logger.debug('Topics open:', topicsOpenCount) | ||
} | ||
} | ||
async unsubscribe(hash) { | ||
if(this._subscriptions[hash]) { | ||
async unsubscribe (hash) { | ||
if (this._subscriptions[hash]) { | ||
await this._ipfs.pubsub.unsubscribe(hash, this._handleMessage) | ||
@@ -73,15 +71,15 @@ this._subscriptions[hash].topicMonitor.stop() | ||
logger.debug(`Unsubscribed from '${hash}'`) | ||
topicsOpenCount -- | ||
logger.debug("Topics open:", topicsOpenCount) | ||
topicsOpenCount-- | ||
logger.debug('Topics open:', topicsOpenCount) | ||
} | ||
} | ||
publish(topic, message, options = {}) { | ||
publish (topic, message, options = {}) { | ||
if (this._subscriptions[topic] && this._ipfs.pubsub) { | ||
let payload; | ||
//Buffer should be already serialized. Everything else will get serialized as json if not buffer, string. | ||
if(Buffer.isBuffer(message) | typeof message === "string") { | ||
payload = message; | ||
let payload | ||
// Buffer should be already serialized. Everything else will get serialized as json if not buffer, string. | ||
if (Buffer.isBuffer(message) | typeof message === 'string') { | ||
payload = message | ||
} else { | ||
payload = JSON.stringify(message); | ||
payload = JSON.stringify(message) | ||
} | ||
@@ -92,3 +90,3 @@ this._ipfs.pubsub.publish(topic, Buffer.from(payload), options) | ||
async disconnect() { | ||
async disconnect () { | ||
const topics = Object.keys(this._subscriptions) | ||
@@ -99,20 +97,19 @@ await pSeries(topics.map((t) => this.unsubscribe.bind(this, t))) | ||
async _handleMessage(message) { | ||
async _handleMessage (message) { | ||
// Don't process our own messages | ||
if (message.from === this._id) | ||
return | ||
if (message.from === this._id) { return } | ||
// Get the message content and a subscription | ||
let content, subscription, topicId | ||
let content | ||
// Get the topic | ||
topicId = message.topic | ||
const topicId = message.topic | ||
try { | ||
content = JSON.parse(Buffer.from(message.data).toString()) | ||
} catch { | ||
content = message.data; //Leave content alone. Meant for higher level code using custom serialization. | ||
content = message.data // Leave content alone. Meant for higher level code using custom serialization. | ||
} | ||
subscription = this._subscriptions[topicId] | ||
const subscription = this._subscriptions[topicId] | ||
if(subscription && subscription.onMessage && content) { | ||
if (subscription && subscription.onMessage && content) { | ||
await subscription.onMessage(topicId, content, message.from) | ||
@@ -119,0 +116,0 @@ } |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
14089
1
91