@dxos/broadcast
Advanced tools
Comparing version 1.0.0-beta.7 to 1.0.0-beta.8
@@ -10,4 +10,2 @@ "use strict"; | ||
var _events = require("events"); | ||
var _crypto = _interopRequireDefault(require("crypto")); | ||
@@ -19,4 +17,6 @@ | ||
var _timeLruSet = require("./time-lru-set"); | ||
var _emitter = require("nanoresource-promise/emitter"); | ||
var _lru = _interopRequireDefault(require("lru")); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
@@ -43,5 +43,4 @@ | ||
* @typedef {Object} Middleware | ||
* @property {Function} lookup Async peer lookup. | ||
* @property {Function} send Defines how to send the packet builded by the broadcast. | ||
* @property {Function} subscribe Defines how to subscribe to incoming packets. | ||
* @property {Function} subscribe Defines how to subscribe to incoming packets and update the internal list of peers. | ||
*/ | ||
@@ -64,3 +63,3 @@ | ||
class Broadcast extends _events.EventEmitter { | ||
class Broadcast extends _emitter.NanoresourcePromise { | ||
/** | ||
@@ -72,7 +71,6 @@ * @constructor | ||
* @param {number} [options.maxAge=10000] Defines the max live time for the cache messages. | ||
* @param {number} [options.maxSize=100] Defines the max size for the cache messages. | ||
* @param {number} [options.maxSize=1000] Defines the max size for the cache messages. | ||
*/ | ||
constructor(middleware, options = {}) { | ||
(0, _assert.default)(middleware); | ||
(0, _assert.default)(middleware.lookup); | ||
(0, _assert.default)(middleware.send); | ||
@@ -84,15 +82,13 @@ (0, _assert.default)(middleware.subscribe); | ||
maxAge = 10 * 1000, | ||
maxSize = 100 | ||
maxSize = 1000 | ||
} = options; | ||
this._id = id; | ||
this._lookup = this._buildLookup(() => middleware.lookup()); | ||
this._send = (...args) => middleware.send(...args); | ||
this._subscribe = onPacket => middleware.subscribe(onPacket); | ||
this._subscribe = next => middleware.subscribe(next); | ||
this._running = false; | ||
this._seenSeqs = new _timeLruSet.TimeLRUSet({ | ||
this._seenSeqs = new _lru.default({ | ||
maxAge, | ||
maxSize | ||
max: maxSize | ||
}); | ||
@@ -103,2 +99,8 @@ this._peers = []; | ||
this._codec.addJson(JSON.parse(schema)).build(); | ||
/** @deprecated */ | ||
if (middleware.lookup) { | ||
this._lookup = () => middleware.lookup(); | ||
} | ||
} | ||
@@ -121,7 +123,2 @@ /** | ||
(0, _assert.default)(Buffer.isBuffer(seqno)); | ||
if (!this._running) { | ||
throw new Error('Broadcast not running.'); | ||
} | ||
const packet = { | ||
@@ -135,52 +132,80 @@ seqno, | ||
/** | ||
* Initialize the cache and runs the defined subscription. | ||
* Update internal list of peers. | ||
* | ||
* @returns {undefined} | ||
* @param {Array<Peer>} peers | ||
*/ | ||
run() { | ||
if (this._running) return; | ||
this._running = true; | ||
this._subscription = this._subscribe(packetEncoded => this._onPacket(packetEncoded)) || (() => {}); | ||
log('running %h', this._id); | ||
updatePeers(peers) { | ||
this._peers = peers; | ||
} | ||
/** | ||
* Clear the cache and unsubscribe from incoming messages. | ||
* Update internal cache options | ||
* | ||
* @returns {undefined} | ||
* @param {{ maxAge: number, maxSize: number }} opts | ||
*/ | ||
stop() { | ||
if (!this._running) return; | ||
this._running = false; | ||
updateCache(opts = {}) { | ||
if (opts.maxAge) { | ||
this._seenSeqs.maxAge = opts.maxAge; | ||
} | ||
this._subscription(); | ||
if (opts.maxSize) { | ||
this._seenSeqs.max = opts.maxSize; | ||
} | ||
} | ||
/** | ||
* Prune the internal cache items in timeout | ||
*/ | ||
this._seenSeqs.clear(); | ||
log('stop %h', this._id); | ||
pruneCache() { | ||
for (const key of this._seenSeqs.keys) { | ||
this._seenSeqs.peek(key); | ||
} | ||
} | ||
/** | ||
* Build a deferrer lookup. | ||
* If we call the lookup several times it would runs once a wait for it. | ||
* | ||
* @param {Function} lookup | ||
* @returns {Function} | ||
* @deprecated | ||
*/ | ||
_buildLookup(lookup) { | ||
return async () => { | ||
try { | ||
this._peers = await lookup(); | ||
log('lookup of %h', this._id, this._peers); | ||
} catch (err) { | ||
this.emit('lookup-error', err); | ||
} | ||
run() { | ||
this.open().catch(err => { | ||
process.nextTick(() => this.emit('error', err)); | ||
}); | ||
} | ||
/** | ||
* @deprecated | ||
*/ | ||
stop() { | ||
this.close().catch(err => { | ||
process.nextTick(() => this.emit('error', err)); | ||
}); | ||
} | ||
_open() { | ||
const onData = this._onPacket.bind(this); | ||
const onPeers = this.updatePeers.bind(this); // deprecated the use of lookup | ||
const next = this._lookup ? onData : { | ||
onData, | ||
onPeers | ||
}; | ||
this._unsubscribe = this._subscribe(next) || (() => {}); | ||
log('running %h', this._id); | ||
} | ||
_close() { | ||
this._unsubscribe(); | ||
this._seenSeqs.clear(); | ||
log('stop %h', this._id); | ||
} | ||
/** | ||
@@ -196,3 +221,3 @@ * Publish and/or Forward a packet message to each peer neighboor. | ||
async _publish(packet, options = {}) { | ||
if (!this._running) return; | ||
await this.open(); | ||
@@ -202,3 +227,3 @@ try { | ||
if (this._seenSeqs.has(ownerId)) { | ||
if (this._seenSeqs.get(ownerId)) { | ||
return; | ||
@@ -208,6 +233,8 @@ } // Seen it by me. | ||
this._seenSeqs.add(ownerId); | ||
this._seenSeqs.set(ownerId, true); | ||
/** @deprecated */ | ||
await this._lookup(); // Update the package to set the current sender.. | ||
this._lookup && this.updatePeers(await this._lookup()); // Update the package to set the current sender.. | ||
packet = Object.assign({}, packet, { | ||
@@ -220,7 +247,4 @@ from: this._id | ||
const waitFor = this._peers.map(peer => { | ||
if (!this._running) { | ||
return; | ||
} // Don't send the message to the "origin" peer. | ||
if (!this.opened) return; // Don't send the message to the "origin" peer. | ||
if (packet.origin.equals(peer.id)) { | ||
@@ -231,3 +255,3 @@ return Promise.resolve(); | ||
if (this._seenSeqs.has(msgId(packet.seqno, peer.id))) { | ||
if (this._seenSeqs.get(msgId(packet.seqno, peer.id))) { | ||
return Promise.resolve(); | ||
@@ -238,5 +262,7 @@ } | ||
this._seenSeqs.add(msgId(packet.seqno, peer.id)); | ||
this._seenSeqs.set(msgId(packet.seqno, peer.id), true); | ||
return this._send(packetEncoded, peer, options).catch(err => { | ||
return this._send(packetEncoded, peer, options).then(() => { | ||
this.emit('send', packetEncoded, peer, options); | ||
}).catch(err => { | ||
this.emit('send-error', err); | ||
@@ -262,3 +288,3 @@ }); | ||
_onPacket(packetEncoded) { | ||
if (!this._running) return; | ||
if (!this.opened) return; | ||
@@ -274,6 +300,6 @@ try { | ||
this._seenSeqs.add(msgId(packet.seqno, packet.from)); // Check if I already see this packet. | ||
this._seenSeqs.set(msgId(packet.seqno, packet.from), true); // Check if I already see this packet. | ||
if (this._seenSeqs.has(msgId(packet.seqno, this._id))) { | ||
if (this._seenSeqs.get(msgId(packet.seqno, this._id))) { | ||
return; | ||
@@ -280,0 +306,0 @@ } |
@@ -18,14 +18,2 @@ "use strict"; | ||
}); | ||
var _timeLruSet = require("./time-lru-set.js"); | ||
Object.keys(_timeLruSet).forEach(function (key) { | ||
if (key === "default" || key === "__esModule") return; | ||
Object.defineProperty(exports, key, { | ||
enumerable: true, | ||
get: function () { | ||
return _timeLruSet[key]; | ||
} | ||
}); | ||
}); | ||
//# sourceMappingURL=index.js.map |
{ | ||
"name": "@dxos/broadcast", | ||
"version": "1.0.0-beta.7", | ||
"version": "1.0.0-beta.8", | ||
"description": "Abstract module to send broadcast messages.", | ||
@@ -41,2 +41,4 @@ "homepage": "https://github.com/dxos/broadcast#readme", | ||
"debug": "^4.1.1", | ||
"lru": "^3.1.0", | ||
"nanoresource-promise": "^2.0.0", | ||
"source-map-support": "^0.5.12" | ||
@@ -43,0 +45,0 @@ }, |
@@ -57,5 +57,10 @@ # Broadcast | ||
const middleware = { | ||
lookup: async () => { | ||
// Return the list of neighbors peers with the format: | ||
// [{ id: Buffer, ...extraArgs }, { id: Buffer, ...extraArgs }] | ||
subscribe: ({ onData, onPeers }) => { | ||
// Defines how to process incoming data and peers update. | ||
// on('peers', onPeers) | ||
// on('data', onData) | ||
return () => { | ||
// Return a dispose function. | ||
} | ||
}, | ||
@@ -66,21 +71,2 @@ send: async (packet, node) => { | ||
// "node" is the peer object generate from the lookup. | ||
// e.g. If node is a stream | ||
node.write(packet); | ||
// e.g. If node is a websocket | ||
node.send(packet); | ||
}, | ||
subscribe: (onPacket) => { | ||
// Defines how to process incoming packets. | ||
// e.g. Using websockets | ||
const onMessage = data => onPacket(data); | ||
socket.on('message', onMessage); | ||
// Return a dispose function. | ||
return () => { | ||
socket.off('message', onMessage); | ||
} | ||
} | ||
@@ -91,12 +77,12 @@ }; | ||
id: crypto.randomBytes(32), | ||
maxAge: 10 * 1000, // Timeout for each message in the LRU cache. | ||
maxSize: 200 // Limit of messages in the LRU cache. | ||
maxAge: 15 * 1000, // Timeout for each message in the LRU cache. | ||
maxSize: 1000 // Limit of messages in the LRU cache. | ||
}) | ||
// We initialize the middleware and subscription inside the broadcast. | ||
broadcast.run() | ||
await broadcast.open() | ||
broadcast.publish(Buffer.from('Hello everyone')) | ||
broadcast.stop() | ||
await broadcast.close() | ||
``` | ||
@@ -111,8 +97,7 @@ | ||
- `middleware`: The middleware defines an interface to connect the broadcast to any request/response solution. | ||
- `lookup: () => Promise<Array<Peer>>`: Runs a lookup to get the ids of your peers neighboors. | ||
- `Peer: { id: Buffer, ...props }` | ||
- `subscribe: ({ onData, onPeers }) => unsubscribeFunction`: Defines how to subscribe to incoming packets and peers update. | ||
- `onData: (data: Buffer) => (Packet|undefined)`: Callback to process incoming data. It returns true if the broadcast could decode the message or false if not. | ||
- `onPeers: (peers: [Peer])`: Callback to update the internal list of peers. A `Peer` object must follow the spec: `{ id: Buffer, ...props }` | ||
- `unsubscribeFunction: Function`: Defines a way to unsubscribe from listening messages if the broadcast stop working. Helpful if you are working with streams and event emitters. | ||
- `send: (packet: Buffer, peer: Object) => Promise`: Defines how to send the packet builded by the broadcast. | ||
- `subscribe: (onPacket) => unsubscribeFunction`: Defines how to subscribe to incoming packets. | ||
- `onPacket: (data: Buffer) => (Packet|undefined)`: Callback to process incoming data. It returns true if the broadcast could decode the message or false if not. | ||
- `unsubscribeFunction: Function`: Defines a way to unsubscribe from listening messages if the broadcast stop working. Helpful if you are working with streams and event emitters. | ||
@@ -122,9 +107,9 @@ - `options` | ||
- `maxAge: number`: Defines the max live time for the cache messages. Default: `10 * 1000`. | ||
- `maxSize: number`: Defines the max size for the cache messages. Default: `100`. | ||
- `maxSize: number`: Defines the max size for the cache messages. Default: `1000`. | ||
#### `broadcast.run()` | ||
#### `broadcast.open() => Promise` | ||
Initialize the cache and runs the defined subscription. | ||
#### `broadcast.stop()` | ||
#### `broadcast.close() => Promise` | ||
@@ -131,0 +116,0 @@ Clear the cache and unsubscribe from incoming messages. |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
65701
5
9
280
134
+ Addedlru@^3.1.0
+ Addednanoresource-promise@^2.0.0
+ Addedinherits@2.0.4(transitive)
+ Addedlru@3.1.0(transitive)
+ Addednanoresource-promise@2.0.0(transitive)