Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@dxos/broadcast

Package Overview
Dependencies
Maintainers
13
Versions
735
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@dxos/broadcast - npm Package Compare versions

Comparing version 1.0.0-beta.7 to 1.0.0-beta.8

152

dist/broadcast.js

@@ -10,4 +10,2 @@ "use strict";

var _events = require("events");
var _crypto = _interopRequireDefault(require("crypto"));

@@ -19,4 +17,6 @@

var _timeLruSet = require("./time-lru-set");
var _emitter = require("nanoresource-promise/emitter");
var _lru = _interopRequireDefault(require("lru"));
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }

@@ -43,5 +43,4 @@

* @typedef {Object} Middleware
* @property {Function} lookup Async peer lookup.
* @property {Function} send Defines how to send the packet builded by the broadcast.
* @property {Function} subscribe Defines how to subscribe to incoming packets.
* @property {Function} subscribe Defines how to subscribe to incoming packets and update the internal list of peers.
*/

@@ -64,3 +63,3 @@

class Broadcast extends _events.EventEmitter {
class Broadcast extends _emitter.NanoresourcePromise {
/**

@@ -72,7 +71,6 @@ * @constructor

* @param {number} [options.maxAge=10000] Defines the max live time for the cache messages.
* @param {number} [options.maxSize=100] Defines the max size for the cache messages.
* @param {number} [options.maxSize=1000] Defines the max size for the cache messages.
*/
constructor(middleware, options = {}) {
(0, _assert.default)(middleware);
(0, _assert.default)(middleware.lookup);
(0, _assert.default)(middleware.send);

@@ -84,15 +82,13 @@ (0, _assert.default)(middleware.subscribe);

maxAge = 10 * 1000,
maxSize = 100
maxSize = 1000
} = options;
this._id = id;
this._lookup = this._buildLookup(() => middleware.lookup());
this._send = (...args) => middleware.send(...args);
this._subscribe = onPacket => middleware.subscribe(onPacket);
this._subscribe = next => middleware.subscribe(next);
this._running = false;
this._seenSeqs = new _timeLruSet.TimeLRUSet({
this._seenSeqs = new _lru.default({
maxAge,
maxSize
max: maxSize
});

@@ -103,2 +99,8 @@ this._peers = [];

this._codec.addJson(JSON.parse(schema)).build();
/** @deprecated */
if (middleware.lookup) {
this._lookup = () => middleware.lookup();
}
}

@@ -121,7 +123,2 @@ /**

(0, _assert.default)(Buffer.isBuffer(seqno));
if (!this._running) {
throw new Error('Broadcast not running.');
}
const packet = {

@@ -135,52 +132,80 @@ seqno,

/**
* Initialize the cache and runs the defined subscription.
* Update internal list of peers.
*
* @returns {undefined}
* @param {Array<Peer>} peers
*/
run() {
if (this._running) return;
this._running = true;
this._subscription = this._subscribe(packetEncoded => this._onPacket(packetEncoded)) || (() => {});
log('running %h', this._id);
updatePeers(peers) {
this._peers = peers;
}
/**
* Clear the cache and unsubscribe from incoming messages.
* Update internal cache options
*
* @returns {undefined}
* @param {{ maxAge: number, maxSize: number }} opts
*/
stop() {
if (!this._running) return;
this._running = false;
updateCache(opts = {}) {
if (opts.maxAge) {
this._seenSeqs.maxAge = opts.maxAge;
}
this._subscription();
if (opts.maxSize) {
this._seenSeqs.max = opts.maxSize;
}
}
/**
* Prune the internal cache items in timeout
*/
this._seenSeqs.clear();
log('stop %h', this._id);
pruneCache() {
for (const key of this._seenSeqs.keys) {
this._seenSeqs.peek(key);
}
}
/**
* Build a deferrer lookup.
* If we call the lookup several times it would runs once a wait for it.
*
* @param {Function} lookup
* @returns {Function}
* @deprecated
*/
_buildLookup(lookup) {
return async () => {
try {
this._peers = await lookup();
log('lookup of %h', this._id, this._peers);
} catch (err) {
this.emit('lookup-error', err);
}
run() {
this.open().catch(err => {
process.nextTick(() => this.emit('error', err));
});
}
/**
* @deprecated
*/
stop() {
this.close().catch(err => {
process.nextTick(() => this.emit('error', err));
});
}
_open() {
const onData = this._onPacket.bind(this);
const onPeers = this.updatePeers.bind(this); // deprecated the use of lookup
const next = this._lookup ? onData : {
onData,
onPeers
};
this._unsubscribe = this._subscribe(next) || (() => {});
log('running %h', this._id);
}
_close() {
this._unsubscribe();
this._seenSeqs.clear();
log('stop %h', this._id);
}
/**

@@ -196,3 +221,3 @@ * Publish and/or Forward a packet message to each peer neighboor.

async _publish(packet, options = {}) {
if (!this._running) return;
await this.open();

@@ -202,3 +227,3 @@ try {

if (this._seenSeqs.has(ownerId)) {
if (this._seenSeqs.get(ownerId)) {
return;

@@ -208,6 +233,8 @@ } // Seen it by me.

this._seenSeqs.add(ownerId);
this._seenSeqs.set(ownerId, true);
/** @deprecated */
await this._lookup(); // Update the package to set the current sender..
this._lookup && this.updatePeers(await this._lookup()); // Update the package to set the current sender..
packet = Object.assign({}, packet, {

@@ -220,7 +247,4 @@ from: this._id

const waitFor = this._peers.map(peer => {
if (!this._running) {
return;
} // Don't send the message to the "origin" peer.
if (!this.opened) return; // Don't send the message to the "origin" peer.
if (packet.origin.equals(peer.id)) {

@@ -231,3 +255,3 @@ return Promise.resolve();

if (this._seenSeqs.has(msgId(packet.seqno, peer.id))) {
if (this._seenSeqs.get(msgId(packet.seqno, peer.id))) {
return Promise.resolve();

@@ -238,5 +262,7 @@ }

this._seenSeqs.add(msgId(packet.seqno, peer.id));
this._seenSeqs.set(msgId(packet.seqno, peer.id), true);
return this._send(packetEncoded, peer, options).catch(err => {
return this._send(packetEncoded, peer, options).then(() => {
this.emit('send', packetEncoded, peer, options);
}).catch(err => {
this.emit('send-error', err);

@@ -262,3 +288,3 @@ });

_onPacket(packetEncoded) {
if (!this._running) return;
if (!this.opened) return;

@@ -274,6 +300,6 @@ try {

this._seenSeqs.add(msgId(packet.seqno, packet.from)); // Check if I already see this packet.
this._seenSeqs.set(msgId(packet.seqno, packet.from), true); // Check if I already see this packet.
if (this._seenSeqs.has(msgId(packet.seqno, this._id))) {
if (this._seenSeqs.get(msgId(packet.seqno, this._id))) {
return;

@@ -280,0 +306,0 @@ }

@@ -18,14 +18,2 @@ "use strict";

});
var _timeLruSet = require("./time-lru-set.js");
Object.keys(_timeLruSet).forEach(function (key) {
if (key === "default" || key === "__esModule") return;
Object.defineProperty(exports, key, {
enumerable: true,
get: function () {
return _timeLruSet[key];
}
});
});
//# sourceMappingURL=index.js.map
{
"name": "@dxos/broadcast",
"version": "1.0.0-beta.7",
"version": "1.0.0-beta.8",
"description": "Abstract module to send broadcast messages.",

@@ -41,2 +41,4 @@ "homepage": "https://github.com/dxos/broadcast#readme",

"debug": "^4.1.1",
"lru": "^3.1.0",
"nanoresource-promise": "^2.0.0",
"source-map-support": "^0.5.12"

@@ -43,0 +45,0 @@ },

@@ -57,5 +57,10 @@ # Broadcast

const middleware = {
lookup: async () => {
// Return the list of neighbors peers with the format:
// [{ id: Buffer, ...extraArgs }, { id: Buffer, ...extraArgs }]
subscribe: ({ onData, onPeers }) => {
// Defines how to process incoming data and peers update.
// on('peers', onPeers)
// on('data', onData)
return () => {
// Return a dispose function.
}
},

@@ -66,21 +71,2 @@ send: async (packet, node) => {

// "node" is the peer object generate from the lookup.
// e.g. If node is a stream
node.write(packet);
// e.g. If node is a websocket
node.send(packet);
},
subscribe: (onPacket) => {
// Defines how to process incoming packets.
// e.g. Using websockets
const onMessage = data => onPacket(data);
socket.on('message', onMessage);
// Return a dispose function.
return () => {
socket.off('message', onMessage);
}
}

@@ -91,12 +77,12 @@ };

id: crypto.randomBytes(32),
maxAge: 10 * 1000, // Timeout for each message in the LRU cache.
maxSize: 200 // Limit of messages in the LRU cache.
maxAge: 15 * 1000, // Timeout for each message in the LRU cache.
maxSize: 1000 // Limit of messages in the LRU cache.
})
// We initialize the middleware and subscription inside the broadcast.
broadcast.run()
await broadcast.open()
broadcast.publish(Buffer.from('Hello everyone'))
broadcast.stop()
await broadcast.close()
```

@@ -111,8 +97,7 @@

- `middleware`: The middleware defines an interface to connect the broadcast to any request/response solution.
- `lookup: () => Promise<Array<Peer>>`: Runs a lookup to get the ids of your peers neighboors.
- `Peer: { id: Buffer, ...props }`
- `subscribe: ({ onData, onPeers }) => unsubscribeFunction`: Defines how to subscribe to incoming packets and peers update.
- `onData: (data: Buffer) => (Packet|undefined)`: Callback to process incoming data. It returns true if the broadcast could decode the message or false if not.
- `onPeers: (peers: [Peer])`: Callback to update the internal list of peers. A `Peer` object must follow the spec: `{ id: Buffer, ...props }`
- `unsubscribeFunction: Function`: Defines a way to unsubscribe from listening messages if the broadcast stop working. Helpful if you are working with streams and event emitters.
- `send: (packet: Buffer, peer: Object) => Promise`: Defines how to send the packet builded by the broadcast.
- `subscribe: (onPacket) => unsubscribeFunction`: Defines how to subscribe to incoming packets.
- `onPacket: (data: Buffer) => (Packet|undefined)`: Callback to process incoming data. It returns true if the broadcast could decode the message or false if not.
- `unsubscribeFunction: Function`: Defines a way to unsubscribe from listening messages if the broadcast stop working. Helpful if you are working with streams and event emitters.

@@ -122,9 +107,9 @@ - `options`

- `maxAge: number`: Defines the max live time for the cache messages. Default: `10 * 1000`.
- `maxSize: number`: Defines the max size for the cache messages. Default: `100`.
- `maxSize: number`: Defines the max size for the cache messages. Default: `1000`.
#### `broadcast.run()`
#### `broadcast.open() => Promise`
Initialize the cache and runs the defined subscription.
#### `broadcast.stop()`
#### `broadcast.close() => Promise`

@@ -131,0 +116,0 @@ Clear the cache and unsubscribe from incoming messages.

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc