ipfs-pubsub-1on1
Advanced tools
Comparing version 0.0.7 to 0.0.8-360b964.0
{ | ||
"name": "ipfs-pubsub-1on1", | ||
"version": "0.0.7", | ||
"version": "0.0.8-360b964.0", | ||
"description": "1-to-1 communication channel over IPFS Pubsub between two peers", | ||
"license": "MIT", | ||
"author": "Haad", | ||
"type": "module", | ||
"main": "src/direct-channel.js", | ||
@@ -13,10 +14,13 @@ "dependencies": { | ||
"mocha": "^8.1.3", | ||
"orbit-db-test-utils": "^0.11.1", | ||
"orbit-db-test-utils": "^2.2.0", | ||
"p-map-series": "^2.1.0", | ||
"rimraf": "^3.0.2" | ||
"rimraf": "^3.0.2", | ||
"standard": "^17.0.0" | ||
}, | ||
"scripts": { | ||
"test": "TEST=all mocha", | ||
"test:coverage": "istanbul cover _mocha" | ||
"test:coverage": "istanbul cover _mocha", | ||
"lint": "standard", | ||
"lint:fix": "standard --fix" | ||
} | ||
} |
@@ -13,3 +13,3 @@ # ipfs-pubsub-1on1 | ||
// Include as lib | ||
const Channel = require('ipfs-pubsub-1on1') | ||
import Channel from 'ipfs-pubsub-1on1' | ||
// Create IPFS instance somehow | ||
@@ -16,0 +16,0 @@ const ipfs = new IPFS() |
@@ -1,14 +0,12 @@ | ||
'use strict' | ||
import path from 'path' | ||
import EventEmitter from 'events' | ||
import PROTOCOL from './protocol.js' | ||
import encode from './encoding.js' | ||
import waitForPeers from './wait-for-peers.js' | ||
import getPeerID from './get-peer-id.js' | ||
const path = require('path') | ||
const EventEmitter = require('events') | ||
const PROTOCOL = require('./protocol') | ||
const encode = require('./encoding') | ||
const waitForPeers = require('./wait-for-peers') | ||
const getPeerID = require('./get-peer-id') | ||
/** | ||
* Communication channel over Pubsub between two IPFS nodes | ||
*/ | ||
class DirectChannel extends EventEmitter { | ||
export default class DirectChannel extends EventEmitter { | ||
constructor (ipfs, receiverID) { | ||
@@ -24,2 +22,4 @@ super() | ||
this._closed = false | ||
this._isClosed = () => this._closed | ||
this._receiverID = receiverID | ||
@@ -50,3 +50,3 @@ | ||
async connect () { | ||
await waitForPeers(this._ipfs, [this._receiverID], this._id) | ||
await waitForPeers(this._ipfs, [this._receiverID], this._id, this._isClosed) | ||
} | ||
@@ -59,2 +59,3 @@ | ||
async send (message) { | ||
if (this._closed) return | ||
let m = encode(message) | ||
@@ -68,2 +69,3 @@ await this._ipfs.pubsub.publish(this._id, m) | ||
close () { | ||
this._closed = true | ||
this.removeAllListeners('message') | ||
@@ -85,3 +87,3 @@ this._ipfs.pubsub.unsubscribe(this._id, this._messageHandler) | ||
// Make sure the message is coming from the correct peer | ||
const isValid = message && message.from === this._receiverID | ||
const isValid = message && String(message.from) === String(this._receiverID) | ||
// Filter out all messages that didn't come from the second peer | ||
@@ -95,2 +97,3 @@ if (isValid) { | ||
async _openChannel () { | ||
this._closed = false | ||
await this._setup() | ||
@@ -106,3 +109,1 @@ await this._ipfs.pubsub.subscribe(this._id, this._messageHandler) | ||
} | ||
module.exports = DirectChannel |
@@ -1,6 +0,4 @@ | ||
'use strict' | ||
import { Buffer } from 'safe-buffer' | ||
const Buffer = require('safe-buffer').Buffer | ||
module.exports = (_message) => { | ||
export default (_message) => { | ||
let message = _message | ||
@@ -7,0 +5,0 @@ if (!Buffer.isBuffer(message)) { |
@@ -1,8 +0,4 @@ | ||
'use strict' | ||
const getPeerID = async (ipfs) => { | ||
export default async (ipfs) => { | ||
const peerInfo = await ipfs.id() | ||
return peerInfo.id | ||
} | ||
module.exports = getPeerID |
@@ -1,3 +0,1 @@ | ||
'use strict' | ||
module.exports = 'ipfs-pubsub-direct-channel/v1' | ||
export default 'ipfs-pubsub-direct-channel/v1' |
@@ -1,7 +0,7 @@ | ||
'use strict' | ||
const waitForPeers = async (ipfs, peersToWait, topic) => { | ||
export default async (ipfs, peersToWait, topic, isClosed) => { | ||
const checkPeers = async () => { | ||
const peers = await ipfs.pubsub.peers(topic) | ||
const hasAllPeers = peersToWait.map((e) => peers.includes(e)).filter((e) => e === false).length === 0 | ||
const idPeersToWait = peersToWait.map(e => String(e)) | ||
const idPeers = peers.map(e => String(e)) | ||
const hasAllPeers = idPeersToWait.map((e) => idPeers.includes(e)).filter((e) => e === false).length === 0 | ||
return hasAllPeers | ||
@@ -17,4 +17,6 @@ } | ||
try { | ||
if (await checkPeers()) { | ||
if (isClosed()) { | ||
clearInterval(interval) | ||
} else if (await checkPeers()) { | ||
clearInterval(interval) | ||
resolve() | ||
@@ -28,3 +30,1 @@ } | ||
} | ||
module.exports = waitForPeers |
@@ -1,8 +0,6 @@ | ||
'use strict' | ||
const path = require('path') | ||
const rmrf = require('rimraf') | ||
const assert = require('assert') | ||
const pMapSeries = require('p-map-series') | ||
const { | ||
import path from 'path' | ||
import rmrf from 'rimraf' | ||
import assert from 'assert' | ||
import pMapSeries from 'p-map-series' | ||
import { | ||
connectPeers, | ||
@@ -13,8 +11,8 @@ startIpfs, | ||
testAPIs, | ||
waitForPeers, | ||
} = require('orbit-db-test-utils') | ||
waitForPeers | ||
} from 'orbit-db-test-utils' | ||
const Channel = require('../src/direct-channel') | ||
const getPeerID = require('../src/get-peer-id') | ||
const PROTOCOL = require('../src/protocol') | ||
import Channel from '../src/direct-channel.js' | ||
import getPeerID from '../src/get-peer-id.js' | ||
import PROTOCOL from '../src/protocol.js' | ||
@@ -25,10 +23,10 @@ // IPFS instances used in these tests | ||
'./tmp/peer2/ipfs', | ||
'./tmp/peer3/ipfs', | ||
'./tmp/peer3/ipfs' | ||
] | ||
Object.keys(testAPIs).forEach(API => { | ||
describe(`DirectChannel ${API}`, function() { | ||
describe(`DirectChannel ${API}`, function () { | ||
this.timeout(5000) | ||
let instances = [] | ||
const instances = [] | ||
let ipfsd1, ipfsd2, ipfsd3, ipfs1, ipfs2, ipfs3 | ||
@@ -65,3 +63,3 @@ | ||
describe('create a channel', function() { | ||
describe('create a channel', function () { | ||
it('has two participants', async () => { | ||
@@ -100,3 +98,3 @@ const c = await Channel.open(ipfs1, id2) | ||
describe('properties', function() { | ||
describe('properties', function () { | ||
let c | ||
@@ -134,3 +132,3 @@ | ||
describe('messaging', function() { | ||
describe('messaging', function () { | ||
it('sends and receives messages', async () => { | ||
@@ -149,7 +147,6 @@ const c1 = await Channel.open(ipfs1, id2) | ||
assert.notEqual(m, null) | ||
assert.equal(m.from, id1) | ||
assert.equal(m.from, id1.toString()) | ||
assert.equal(Buffer.from(m.data).toString(), Buffer.from('hello1')) | ||
assert.equal(m.topicIDs.length, 1) | ||
assert.equal(m.topicIDs[0], c1.id) | ||
assert.equal(m.topicIDs[0], c2.id) | ||
assert.equal(m.topic, c1.id) | ||
assert.equal(m.topic, c2.id) | ||
await c2.send(Buffer.from('hello2')) | ||
@@ -159,7 +156,6 @@ }) | ||
c1.on('message', (m) => { | ||
assert.equal(m.from, id2) | ||
assert.equal(m.from, id2.toString()) | ||
assert.equal(Buffer.from(m.data).toString(), Buffer.from('hello2')) | ||
assert.equal(m.topicIDs.length, 1) | ||
assert.equal(m.topicIDs[0], c1.id) | ||
assert.equal(m.topicIDs[0], c2.id) | ||
assert.equal(m.topic, c1.id) | ||
assert.equal(m.topic, c2.id) | ||
c1.close() | ||
@@ -175,3 +171,3 @@ c2.close() | ||
describe('connect', function() { | ||
describe('connect', function () { | ||
it('connects the peers', async () => { | ||
@@ -189,3 +185,3 @@ let c1, c2 | ||
peers = await ipfs1.pubsub.peers(c1.id) | ||
assert.deepEqual(peers, [id2]) | ||
assert.deepEqual(peers.map(e => String(e)), [id2.toString()]) | ||
@@ -197,3 +193,3 @@ c1.close() | ||
describe('disconnecting', function() { | ||
describe('disconnecting', function () { | ||
it('closes a channel', async () => { | ||
@@ -207,9 +203,17 @@ const c1 = await Channel.open(ipfs1, id2) | ||
return new Promise(async (resolve, reject) => { | ||
assert.equal(c1._closed, false) | ||
assert.equal(c1._isClosed(), false) | ||
c1.close() | ||
const topics1 = await ipfs1.pubsub.ls() | ||
assert.deepEqual(topics1, []) | ||
assert.equal(c1._closed, true) | ||
assert.equal(c1._isClosed(), true) | ||
assert.equal(c2._closed, false) | ||
assert.equal(c2._isClosed(), false) | ||
c2.close() | ||
const topics2 = await ipfs2.pubsub.ls() | ||
assert.deepEqual(topics1, []) | ||
assert.equal(c2._closed, true) | ||
assert.equal(c2._isClosed(), true) | ||
@@ -242,3 +246,3 @@ setTimeout(async () => { | ||
describe('errors', function() { | ||
describe('errors', function () { | ||
it('throws an error if pubsub is not supported by given IPFS instance', async () => { | ||
@@ -267,4 +271,4 @@ let c, err | ||
describe('non-participant peers can\'t send messages', function() { | ||
it('doesn\'t receive unwated messages', async () => { | ||
describe('non-participant peers can\'t send messages', function () { | ||
it('doesn\'t receive unwanted messages', async () => { | ||
const c1 = await Channel.open(ipfs1, id2) | ||
@@ -277,11 +281,10 @@ const c2 = await Channel.open(ipfs2, id1) | ||
c1.on('message', (m) => { | ||
assert.equal(m.from, id2) | ||
assert.equal(m.from, id2.toString()) | ||
assert.equal(m.data.toString(), 'hello1') | ||
assert.equal(m.topicIDs.length, 1) | ||
assert.equal(m.topicIDs[0], c1.id) | ||
assert.equal(m.topicIDs[0], c2.id) | ||
assert.equal(m.topic, c1.id) | ||
assert.equal(m.topic, c2.id) | ||
}) | ||
await ipfs3.pubsub.subscribe(c1.id, () => {}) | ||
await waitForPeers(ipfs1, [id3], c1.id) | ||
await waitForPeers(ipfs1, [id3], c1.id, c1._isClosed.bind(c1)) | ||
await ipfs3.pubsub.publish(c1.id, Buffer.from('OMG!')) | ||
@@ -288,0 +291,0 @@ |
14714
11
371
Yes
5