Socket
Socket
Sign inDemoInstall

broadcast-channel

Package Overview
Dependencies
Maintainers
1
Versions
98
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

broadcast-channel - npm Package Compare versions

Comparing version 1.2.5 to 1.2.6

dist/es/oblivious-set.js

16

dist/es/index.js

@@ -39,4 +39,5 @@ import { isPromise } from './util.js';

var time = this.method.microSeconds();
var msgObj = {
time: new Date().getTime(),
time: time,
type: type,

@@ -62,3 +63,3 @@ data: msg

set onmessage(fn) {
var time = new Date().getTime();
var time = this.method.microSeconds();
var listenObj = {

@@ -78,3 +79,3 @@ time: time,

addEventListener: function addEventListener(type, fn) {
var time = new Date().getTime();
var time = this.method.microSeconds();
var listenObj = {

@@ -155,2 +156,7 @@ time: time,

channel._addEventListeners[msgObj.type].forEach(function (obj) {
/*
console.log('got message for ' + channel._state.uuid);
console.log('... message time:' + channel._state.uuid + ' - ' + msgObj.time);
console.log('listener time:' + channel._state.uuid + ' - ' + obj.time);
*/
if (msgObj.time >= obj.time) {

@@ -162,3 +168,3 @@ obj.fn(msgObj.data);

var time = new Date().getTime() - 5;
var time = channel.method.microSeconds();
if (channel._preparePromise) {

@@ -180,3 +186,3 @@ channel._preparePromise.then(function () {

channel._isListening = false;
var time = new Date().getTime() - 5;
var time = channel.method.microSeconds();
channel.method.onMessage(channel._state, null, time);

@@ -183,0 +189,0 @@ }

@@ -9,4 +9,7 @@ /**

import { sleep, randomInt, randomToken } from '../util.js';
import { sleep, randomInt, randomToken, microSeconds as micro } from '../util.js';
export var microSeconds = micro;
import ObliviousSet from '../oblivious-set';
import { fillOptionsWithDefaults } from '../options';

@@ -171,3 +174,5 @@

// contains all messages that have been emitted before
emittedMessagesIds: new Set(),
emittedMessagesIds: new ObliviousSet(options.idb.ttl * 2),
// ensures we do not read messages in parrallel
writeBlockPromise: Promise.resolve(),
messagesCallback: null,

@@ -199,2 +204,9 @@ readQueuePromises: [],

function _filterMessage(msgObj, state) {
if (msgObj.uuid === state.uuid) return false; // send by own
if (state.emittedMessagesIds.has(msgObj.id)) return false; // already emitted
if (msgObj.data.time < state.messagesCallbackTime) return false; // older then onMessageCallback
return true;
}
/**

@@ -204,2 +216,6 @@ * reads all new messages from the database and emits them

function readNewMessages(state) {
// if no one is listening, we do not need to scan for new messages
if (!state.messagesCallback) return Promise.resolve();
return getMessagesHigherThen(state.db, state.lastCursorId).then(function (newerMessages) {

@@ -212,22 +228,9 @@ var useMessages = newerMessages.map(function (msgObj) {

}).filter(function (msgObj) {
return msgObj.uuid !== state.uuid;
}) // not send by own
.filter(function (msgObj) {
return !state.emittedMessagesIds.has(msgObj.id);
}) // not already emitted
.filter(function (msgObj) {
return msgObj.time >= state.messagesCallbackTime;
}) // not older then onMessageCallback
.sort(function (msgObjA, msgObjB) {
return _filterMessage(msgObj, state);
}).sort(function (msgObjA, msgObjB) {
return msgObjA.time - msgObjB.time;
}); // sort by time
useMessages.forEach(function (msgObj) {
if (state.messagesCallback) {
state.emittedMessagesIds.add(msgObj.id);
setTimeout(function () {
return state.emittedMessagesIds['delete'](msgObj.id);
}, state.options.idb.ttl * 2);
state.messagesCallback(msgObj.data);

@@ -247,3 +250,6 @@ }

export function postMessage(channelState, messageJson) {
return writeMessage(channelState.db, channelState.uuid, messageJson).then(function () {
channelState.writeBlockPromise = channelState.writeBlockPromise.then(function () {
return writeMessage(channelState.db, channelState.uuid, messageJson);
}).then(function () {
if (randomInt(0, 10) === 0) {

@@ -253,2 +259,4 @@ /* await (do not await) */cleanOldMessages(channelState.db, channelState.options.idb.ttl);

});
return channelState.writeBlockPromise;
}

@@ -255,0 +263,0 @@

@@ -10,7 +10,10 @@ /**

var isNode = require('detect-node');
import ObliviousSet from '../oblivious-set';
import { fillOptionsWithDefaults } from '../options';
import { sleep, randomToken } from '../util';
import { sleep, randomToken, microSeconds as micro } from '../util';
export var microSeconds = micro;
var KEY_PREFIX = 'pubkey.broadcastChannel-';

@@ -98,3 +101,3 @@ export var type = 'localstorage';

// contains all messages that have been emitted before
var emittedMessagesIds = new Set();
var emittedMessagesIds = new ObliviousSet(options.localstorage.removeTimeout);

@@ -113,8 +116,5 @@ var state = {

if (!msgObj.token || emittedMessagesIds.has(msgObj.token)) return; // already emitted
if (msgObj.time && msgObj.time < state.messagesCallbackTime) return; // too old
if (msgObj.data.time && msgObj.data.time < state.messagesCallbackTime) return; // too old
emittedMessagesIds.add(msgObj.token);
setTimeout(function () {
return emittedMessagesIds['delete'](msgObj.token);
}, options.localstorage.removeTimeout);
state.messagesCallback(msgObj.data);

@@ -121,0 +121,0 @@ });

var isNode = require('detect-node');
import { randomToken, microSeconds as micro } from '../util';
export var microSeconds = micro;
export var type = 'native';

@@ -8,2 +12,3 @@

var state = {
uuid: randomToken(10),
channelName: channelName,

@@ -10,0 +15,0 @@ options: options,

@@ -14,6 +14,7 @@ import _regeneratorRuntime from 'babel-runtime/regenerator';

import * as path from 'path';
import micro from 'nano-time';
import { sha3_224 } from 'js-sha3';
import isNode from 'detect-node';
import IdleQueue from 'custom-idle-queue';
import unload from 'unload';

@@ -25,2 +26,4 @@

import ObliviousSet from '../oblivious-set';
/**

@@ -47,2 +50,3 @@ * windows sucks, so we have handle windows-type of socket-paths

var TMP_FOLDER_NAME = 'pubkey.broadcast-channel';
var OTHER_INSTANCES = {};

@@ -143,3 +147,3 @@ var getPathsCache = new Map();

return writeFile(pathToFile, JSON.stringify({
time: new Date().getTime()
time: microSeconds()
})).then(function () {

@@ -169,4 +173,2 @@ return pathToFile;

stream.on('data', function (msg) {
// console.log('server: got data:');
// console.dir(msg.toString());
emitter.emit('data', msg.toString());

@@ -247,3 +249,3 @@ });

case 0:
time = new Date().getTime();
time = microSeconds();
writeObject = {

@@ -406,6 +408,6 @@ uuid: readerUuid,

case 0:
olderThen = new Date().getTime() - ttl;
olderThen = Date.now() - ttl;
_context8.next = 3;
return Promise.all(messageObjects.filter(function (obj) {
return obj.time < olderThen;
return obj.time / 1000 < olderThen;
}).map(function (obj) {

@@ -436,3 +438,3 @@ return unlink(obj.path)['catch'](function () {

var uuid, writeQueue, state, _ref10, socketEE, infoFilePath;
var time, uuid, state, _ref10, socketEE, infoFilePath;

@@ -444,13 +446,10 @@ return _regeneratorRuntime.wrap(function _callee9$(_context9) {

options = fillOptionsWithDefaults(options);
_context9.next = 3;
time = microSeconds();
_context9.next = 4;
return ensureFoldersExist(channelName);
case 3:
case 4:
uuid = randomToken(10);
// ensures we do not read messages in parrallel
writeQueue = new IdleQueue(1);
state = {
time: time,
channelName: channelName,

@@ -460,6 +459,7 @@ options: options,

// contains all messages that have been emitted before
emittedMessagesIds: new Set(),
emittedMessagesIds: new ObliviousSet(options.node.ttl * 2),
messagesCallbackTime: null,
messagesCallback: null,
writeQueue: writeQueue,
// ensures we do not read messages in parrallel
writeBlockPromise: Promise.resolve(),
otherReaderClients: {},

@@ -472,6 +472,11 @@ // ensure if process crashes, everything is cleaned up

};
_context9.next = 8;
if (!OTHER_INSTANCES[channelName]) OTHER_INSTANCES[channelName] = [];
OTHER_INSTANCES[channelName].push(state);
_context9.next = 10;
return Promise.all([createSocketEventEmitter(channelName, uuid), createSocketInfoFile(channelName, uuid), refreshReaderClients(state)]);
case 8:
case 10:
_ref10 = _context9.sent;

@@ -486,4 +491,8 @@ socketEE = _ref10[0];

socketEE.emitter.on('data', function (data) {
var obj = JSON.parse(data);
handleMessagePing(state, obj);
try {
var obj = JSON.parse(data);
handleMessagePing(state, obj);
} catch (err) {
throw new Error('could not parse data: ' + data);
}
});

@@ -493,3 +502,3 @@

case 15:
case 17:
case 'end':

@@ -508,5 +517,18 @@ return _context9.stop();

export function _filterMessage(msgObj, state) {
/* console.log('_filterMessage()');
console.dir(msgObj);
console.log(msgObj.senderUuid === state.uuid);
console.log(state.emittedMessagesIds.has(msgObj.token));
console.log(!state.messagesCallback);
console.log(msgObj.time < state.messagesCallbackTime);
console.log(msgObj.time < state.time);*/
if (msgObj.senderUuid === state.uuid) return false; // not send by own
if (state.emittedMessagesIds.has(msgObj.token)) return false; // not already emitted
if (!state.messagesCallback) return false; // no listener
if (msgObj.time < state.messagesCallbackTime) return false; // not older then onMessageCallback
if (msgObj.time < state.time) return false; // msgObj is older then channel
state.emittedMessagesIds.add(msgObj.token);
return true;

@@ -582,7 +604,5 @@ }

state.emittedMessagesIds.add(msgObj.token);
setTimeout(function () {
return state.emittedMessagesIds['delete'](msgObj.token);
}, state.options.node.ttl * 2);
if (state.messagesCallback) {
// emit to subscribers
state.messagesCallback(msgObj.content.data);

@@ -728,56 +748,97 @@ }

// ensure we do this not in parallel
return channelState.writeQueue.requestIdlePromise().then(function () {
return channelState.writeQueue.wrapCall(_asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee14() {
var _ref16, msgObj, pingStr;
var writePromise = writeMessage(channelState.channelName, channelState.uuid, messageJson);
return _regeneratorRuntime.wrap(function _callee14$(_context14) {
while (1) {
switch (_context14.prev = _context14.next) {
case 0:
_context14.next = 2;
return Promise.all([writeMessage(channelState.channelName, channelState.uuid, messageJson), refreshReaderClients(channelState)]);
channelState.writeBlockPromise = channelState.writeBlockPromise.then(_asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee14() {
var _ref16, msgObj, pingStr;
case 2:
_ref16 = _context14.sent;
msgObj = _ref16[0];
pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}';
_context14.next = 7;
return Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) {
return client.writable;
}) // client might have closed in between
.map(function (client) {
return new Promise(function (res) {
client.write(pingStr, res);
});
}));
return _regeneratorRuntime.wrap(function _callee14$(_context14) {
while (1) {
switch (_context14.prev = _context14.next) {
case 0:
_context14.next = 2;
return new Promise(function (res) {
return setTimeout(res, 0);
});
case 7:
case 2:
_context14.next = 4;
return new Promise(function (res) {
return setTimeout(res, 0);
});
/**
* clean up old messages
* to not waste resources on cleaning up,
* only if random-int matches, we clean up old messages
*/
if (randomInt(0, 50) === 0) {
/* await */getAllMessages(channelState.channelName).then(function (allMessages) {
return cleanOldMessages(allMessages, channelState.options.node.ttl);
});
}
case 4:
_context14.next = 6;
return Promise.all([writePromise, refreshReaderClients(channelState)]);
// emit to own eventEmitter
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson)));
case 6:
_ref16 = _context14.sent;
msgObj = _ref16[0];
case 8:
case 'end':
return _context14.stop();
}
emitOverFastPath(channelState, msgObj, messageJson);
pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}';
_context14.next = 12;
return Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) {
return client.writable;
}) // client might have closed in between
.map(function (client) {
return new Promise(function (res) {
client.write(pingStr, res);
});
}));
case 12:
/**
* clean up old messages
* to not waste resources on cleaning up,
* only if random-int matches, we clean up old messages
*/
if (randomInt(0, 20) === 0) {
/* await */getAllMessages(channelState.channelName).then(function (allMessages) {
return cleanOldMessages(allMessages, channelState.options.node.ttl);
});
}
// emit to own eventEmitter
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson)));
case 13:
case 'end':
return _context14.stop();
}
}, _callee14, _this2);
})));
}
}, _callee14, _this2);
})));
return channelState.writeBlockPromise;
}
/**
* When multiple BroadcastChannels with the same name
* are created in a single node-process, we can access them directly and emit messages.
* This might not happen often in production
* but will speed up things when this module is used in unit-tests.
*/
export function emitOverFastPath(state, msgObj, messageJson) {
if (!state.options.node.useFastPath) return; // disabled
var others = OTHER_INSTANCES[state.channelName].filter(function (s) {
return s !== state;
});
var checkObj = {
time: msgObj.time,
senderUuid: msgObj.uuid,
token: msgObj.token
};
others.filter(function (otherState) {
return _filterMessage(checkObj, otherState);
}).forEach(function (otherState) {
// console.log('EMIT OVER FAST PATH');
otherState.messagesCallback(messageJson);
});
}
export function onMessage(channelState, fn) {
var time = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : new Date().getTime();
var time = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : microSeconds();

@@ -792,2 +853,6 @@ channelState.messagesCallbackTime = time;

channelState.closed = true;
channelState.emittedMessagesIds.clear();
OTHER_INSTANCES[channelState.channelName] = OTHER_INSTANCES[channelState.channelName].filter(function (o) {
return o !== channelState;
});

@@ -805,3 +870,2 @@ if (typeof channelState.removeUnload === 'function') channelState.removeUnload();

channelState.socketEE.emitter.removeAllListeners();
channelState.writeQueue.clear();

@@ -823,2 +887,6 @@ Object.values(channelState.otherReaderClients).forEach(function (client) {

return 50;
}
export function microSeconds() {
return parseInt(micro.microseconds());
}

@@ -21,4 +21,5 @@ export function fillOptionsWithDefaults(options) {

if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes;
if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true;
return options;
}

@@ -34,2 +34,24 @@ /**

}return text;
}
var lastMs = 0;
var additional = 0;
/**
* returns the current time in micro-seconds,
* WARNING: This is a pseudo-function
* Performance.now is not reliable in webworkers, so we just make sure to never return the same time.
* This is enough in browsers, and this function will not be used in nodejs.
* The main reason for this hack is to ensure that BroadcastChannel behaves equal to production when it is used in fast-running unit tests.
*/
export function microSeconds() {
var ms = new Date().getTime();
if (ms === lastMs) {
additional++;
return ms * 1000 + additional;
} else {
lastMs = ms;
additional = 0;
return ms * 1000;
}
}

@@ -45,4 +45,5 @@ 'use strict';

var time = this.method.microSeconds();
var msgObj = {
time: new Date().getTime(),
time: time,
type: type,

@@ -68,3 +69,3 @@ data: msg

set onmessage(fn) {
var time = new Date().getTime();
var time = this.method.microSeconds();
var listenObj = {

@@ -84,3 +85,3 @@ time: time,

addEventListener: function addEventListener(type, fn) {
var time = new Date().getTime();
var time = this.method.microSeconds();
var listenObj = {

@@ -161,2 +162,7 @@ time: time,

channel._addEventListeners[msgObj.type].forEach(function (obj) {
/*
console.log('got message for ' + channel._state.uuid);
console.log('... message time:' + channel._state.uuid + ' - ' + msgObj.time);
console.log('listener time:' + channel._state.uuid + ' - ' + obj.time);
*/
if (msgObj.time >= obj.time) {

@@ -168,3 +174,3 @@ obj.fn(msgObj.data);

var time = new Date().getTime() - 5;
var time = channel.method.microSeconds();
if (channel._preparePromise) {

@@ -186,3 +192,3 @@ channel._preparePromise.then(function () {

channel._isListening = false;
var time = new Date().getTime() - 5;
var time = channel.method.microSeconds();
channel.method.onMessage(channel._state, null, time);

@@ -189,0 +195,0 @@ }

@@ -6,3 +6,3 @@ 'use strict';

});
exports.type = undefined;
exports.type = exports.microSeconds = undefined;
exports.getIdb = getIdb;

@@ -25,4 +25,10 @@ exports.createDatabase = createDatabase;

var _obliviousSet = require('../oblivious-set');
var _obliviousSet2 = _interopRequireDefault(_obliviousSet);
var _options = require('../options');
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { 'default': obj }; }
/**

@@ -36,2 +42,5 @@ * this method uses indexeddb to store the messages

var microSeconds = exports.microSeconds = _util.microSeconds;
var DB_PREFIX = 'pubkey.broadcast-channel-0-';

@@ -194,3 +203,5 @@ var OBJECT_STORE_ID = 'messages';

// contains all messages that have been emitted before
emittedMessagesIds: new Set(),
emittedMessagesIds: new _obliviousSet2['default'](options.idb.ttl * 2),
// ensures we do not read messages in parrallel
writeBlockPromise: Promise.resolve(),
messagesCallback: null,

@@ -222,2 +233,9 @@ readQueuePromises: [],

function _filterMessage(msgObj, state) {
if (msgObj.uuid === state.uuid) return false; // send by own
if (state.emittedMessagesIds.has(msgObj.id)) return false; // already emitted
if (msgObj.data.time < state.messagesCallbackTime) return false; // older then onMessageCallback
return true;
}
/**

@@ -227,2 +245,6 @@ * reads all new messages from the database and emits them

function readNewMessages(state) {
// if no one is listening, we do not need to scan for new messages
if (!state.messagesCallback) return Promise.resolve();
return getMessagesHigherThen(state.db, state.lastCursorId).then(function (newerMessages) {

@@ -235,22 +257,9 @@ var useMessages = newerMessages.map(function (msgObj) {

}).filter(function (msgObj) {
return msgObj.uuid !== state.uuid;
}) // not send by own
.filter(function (msgObj) {
return !state.emittedMessagesIds.has(msgObj.id);
}) // not already emitted
.filter(function (msgObj) {
return msgObj.time >= state.messagesCallbackTime;
}) // not older then onMessageCallback
.sort(function (msgObjA, msgObjB) {
return _filterMessage(msgObj, state);
}).sort(function (msgObjA, msgObjB) {
return msgObjA.time - msgObjB.time;
}); // sort by time
useMessages.forEach(function (msgObj) {
if (state.messagesCallback) {
state.emittedMessagesIds.add(msgObj.id);
setTimeout(function () {
return state.emittedMessagesIds['delete'](msgObj.id);
}, state.options.idb.ttl * 2);
state.messagesCallback(msgObj.data);

@@ -270,3 +279,6 @@ }

function postMessage(channelState, messageJson) {
return writeMessage(channelState.db, channelState.uuid, messageJson).then(function () {
channelState.writeBlockPromise = channelState.writeBlockPromise.then(function () {
return writeMessage(channelState.db, channelState.uuid, messageJson);
}).then(function () {
if ((0, _util.randomInt)(0, 10) === 0) {

@@ -276,2 +288,4 @@ /* await (do not await) */cleanOldMessages(channelState.db, channelState.options.idb.ttl);

});
return channelState.writeBlockPromise;
}

@@ -278,0 +292,0 @@

@@ -6,3 +6,3 @@ 'use strict';

});
exports.type = undefined;
exports.type = exports.microSeconds = undefined;
exports.getLocalStorage = getLocalStorage;

@@ -19,2 +19,6 @@ exports.storageKey = storageKey;

var _obliviousSet = require('../oblivious-set');
var _obliviousSet2 = _interopRequireDefault(_obliviousSet);
var _options = require('../options');

@@ -24,2 +28,4 @@

function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { 'default': obj }; }
/**

@@ -34,2 +40,3 @@ * A localStorage-only method which uses localstorage and its 'storage'-event

var isNode = require('detect-node');
var microSeconds = exports.microSeconds = _util.microSeconds;

@@ -118,3 +125,3 @@ var KEY_PREFIX = 'pubkey.broadcastChannel-';

// contains all messages that have been emitted before
var emittedMessagesIds = new Set();
var emittedMessagesIds = new _obliviousSet2['default'](options.localstorage.removeTimeout);

@@ -133,8 +140,5 @@ var state = {

if (!msgObj.token || emittedMessagesIds.has(msgObj.token)) return; // already emitted
if (msgObj.time && msgObj.time < state.messagesCallbackTime) return; // too old
if (msgObj.data.time && msgObj.data.time < state.messagesCallbackTime) return; // too old
emittedMessagesIds.add(msgObj.token);
setTimeout(function () {
return emittedMessagesIds['delete'](msgObj.token);
}, options.localstorage.removeTimeout);
state.messagesCallback(msgObj.data);

@@ -141,0 +145,0 @@ });

@@ -6,2 +6,3 @@ 'use strict';

});
exports.type = exports.microSeconds = undefined;
exports.create = create;

@@ -13,4 +14,9 @@ exports.close = close;

exports.averageResponseTime = averageResponseTime;
var _util = require('../util');
var isNode = require('detect-node');
var microSeconds = exports.microSeconds = _util.microSeconds;
var type = exports.type = 'native';

@@ -21,2 +27,3 @@

var state = {
uuid: (0, _util.randomToken)(10),
channelName: channelName,

@@ -23,0 +30,0 @@ options: options,

@@ -91,4 +91,2 @@ 'use strict';

stream.on('data', function (msg) {
// console.log('server: got data:');
// console.dir(msg.toString());
emitter.emit('data', msg.toString());

@@ -171,3 +169,3 @@ });

case 0:
time = new Date().getTime();
time = microSeconds();
writeObject = {

@@ -315,6 +313,6 @@ uuid: readerUuid,

case 0:
olderThen = new Date().getTime() - ttl;
olderThen = Date.now() - ttl;
_context8.next = 3;
return Promise.all(messageObjects.filter(function (obj) {
return obj.time < olderThen;
return obj.time / 1000 < olderThen;
}).map(function (obj) {

@@ -343,3 +341,3 @@ return unlink(obj.path)['catch'](function () {

var uuid, writeQueue, state, _ref10, _ref11, socketEE, infoFilePath;
var time, uuid, state, _ref10, _ref11, socketEE, infoFilePath;

@@ -351,13 +349,10 @@ return _regenerator2['default'].wrap(function _callee9$(_context9) {

options = (0, _options.fillOptionsWithDefaults)(options);
_context9.next = 3;
time = microSeconds();
_context9.next = 4;
return ensureFoldersExist(channelName);
case 3:
case 4:
uuid = (0, _util2.randomToken)(10);
// ensures we do not read messages in parrallel
writeQueue = new _customIdleQueue2['default'](1);
state = {
time: time,
channelName: channelName,

@@ -367,6 +362,7 @@ options: options,

// contains all messages that have been emitted before
emittedMessagesIds: new Set(),
emittedMessagesIds: new _obliviousSet2['default'](options.node.ttl * 2),
messagesCallbackTime: null,
messagesCallback: null,
writeQueue: writeQueue,
// ensures we do not read messages in parrallel
writeBlockPromise: Promise.resolve(),
otherReaderClients: {},

@@ -379,6 +375,11 @@ // ensure if process crashes, everything is cleaned up

};
_context9.next = 8;
if (!OTHER_INSTANCES[channelName]) OTHER_INSTANCES[channelName] = [];
OTHER_INSTANCES[channelName].push(state);
_context9.next = 10;
return Promise.all([createSocketEventEmitter(channelName, uuid), createSocketInfoFile(channelName, uuid), refreshReaderClients(state)]);
case 8:
case 10:
_ref10 = _context9.sent;

@@ -394,4 +395,8 @@ _ref11 = (0, _slicedToArray3['default'])(_ref10, 2);

socketEE.emitter.on('data', function (data) {
var obj = JSON.parse(data);
handleMessagePing(state, obj);
try {
var obj = JSON.parse(data);
handleMessagePing(state, obj);
} catch (err) {
throw new Error('could not parse data: ' + data);
}
});

@@ -401,3 +406,3 @@

case 16:
case 18:
case 'end':

@@ -482,7 +487,5 @@ return _context9.stop();

state.emittedMessagesIds.add(msgObj.token);
setTimeout(function () {
return state.emittedMessagesIds['delete'](msgObj.token);
}, state.options.node.ttl * 2);
if (state.messagesCallback) {
// emit to subscribers
state.messagesCallback(msgObj.content.data);

@@ -634,2 +637,3 @@ }

exports.postMessage = postMessage;
exports.emitOverFastPath = emitOverFastPath;
exports.onMessage = onMessage;

@@ -639,2 +643,3 @@ exports.close = close;

exports.averageResponseTime = averageResponseTime;
exports.microSeconds = microSeconds;

@@ -665,2 +670,6 @@ var _util = require('util');

var _nanoTime = require('nano-time');
var _nanoTime2 = _interopRequireDefault(_nanoTime);
var _jsSha = require('js-sha3');

@@ -672,6 +681,2 @@

var _customIdleQueue = require('custom-idle-queue');
var _customIdleQueue2 = _interopRequireDefault(_customIdleQueue);
var _unload = require('unload');

@@ -685,2 +690,6 @@

var _obliviousSet = require('../oblivious-set');
var _obliviousSet2 = _interopRequireDefault(_obliviousSet);
function _interopRequireWildcard(obj) { if (obj && obj.__esModule) { return obj; } else { var newObj = {}; if (obj != null) { for (var key in obj) { if (Object.prototype.hasOwnProperty.call(obj, key)) newObj[key] = obj[key]; } } newObj['default'] = obj; return newObj; } }

@@ -694,7 +703,2 @@

*/
/**
* this method is used in nodejs-environments.
* The ipc is handled via sockets and file-writes to the tmp-folder
*/
function cleanPipeName(str) {

@@ -708,3 +712,6 @@ if (process.platform === 'win32' && !str.startsWith('\\\\.\\pipe\\')) {

}
}
} /**
* this method is used in nodejs-environments.
* The ipc is handled via sockets and file-writes to the tmp-folder
*/

@@ -718,2 +725,3 @@ var mkdir = util.promisify(fs.mkdir);

var TMP_FOLDER_NAME = 'pubkey.broadcast-channel';
var OTHER_INSTANCES = {};

@@ -762,3 +770,3 @@ var getPathsCache = new Map();

return writeFile(pathToFile, JSON.stringify({
time: new Date().getTime()
time: microSeconds()
})).then(function () {

@@ -787,5 +795,18 @@ return pathToFile;

function _filterMessage(msgObj, state) {
/* console.log('_filterMessage()');
console.dir(msgObj);
console.log(msgObj.senderUuid === state.uuid);
console.log(state.emittedMessagesIds.has(msgObj.token));
console.log(!state.messagesCallback);
console.log(msgObj.time < state.messagesCallbackTime);
console.log(msgObj.time < state.time);*/
if (msgObj.senderUuid === state.uuid) return false; // not send by own
if (state.emittedMessagesIds.has(msgObj.token)) return false; // not already emitted
if (!state.messagesCallback) return false; // no listener
if (msgObj.time < state.messagesCallbackTime) return false; // not older then onMessageCallback
if (msgObj.time < state.time) return false; // msgObj is older then channel
state.emittedMessagesIds.add(msgObj.token);
return true;

@@ -795,57 +816,98 @@ }function postMessage(channelState, messageJson) {

// ensure we do this not in parallel
return channelState.writeQueue.requestIdlePromise().then(function () {
return channelState.writeQueue.wrapCall((0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee14() {
var _ref17, _ref18, msgObj, pingStr;
var writePromise = writeMessage(channelState.channelName, channelState.uuid, messageJson);
return _regenerator2['default'].wrap(function _callee14$(_context14) {
while (1) {
switch (_context14.prev = _context14.next) {
case 0:
_context14.next = 2;
return Promise.all([writeMessage(channelState.channelName, channelState.uuid, messageJson), refreshReaderClients(channelState)]);
channelState.writeBlockPromise = channelState.writeBlockPromise.then((0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee14() {
var _ref17, _ref18, msgObj, pingStr;
case 2:
_ref17 = _context14.sent;
_ref18 = (0, _slicedToArray3['default'])(_ref17, 1);
msgObj = _ref18[0];
pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}';
_context14.next = 8;
return Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) {
return client.writable;
}) // client might have closed in between
.map(function (client) {
return new Promise(function (res) {
client.write(pingStr, res);
});
}));
return _regenerator2['default'].wrap(function _callee14$(_context14) {
while (1) {
switch (_context14.prev = _context14.next) {
case 0:
_context14.next = 2;
return new Promise(function (res) {
return setTimeout(res, 0);
});
case 8:
case 2:
_context14.next = 4;
return new Promise(function (res) {
return setTimeout(res, 0);
});
/**
* clean up old messages
* to not waste resources on cleaning up,
* only if random-int matches, we clean up old messages
*/
if ((0, _util2.randomInt)(0, 50) === 0) {
/* await */getAllMessages(channelState.channelName).then(function (allMessages) {
return cleanOldMessages(allMessages, channelState.options.node.ttl);
});
}
case 4:
_context14.next = 6;
return Promise.all([writePromise, refreshReaderClients(channelState)]);
// emit to own eventEmitter
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson)));
case 6:
_ref17 = _context14.sent;
_ref18 = (0, _slicedToArray3['default'])(_ref17, 1);
msgObj = _ref18[0];
case 9:
case 'end':
return _context14.stop();
}
emitOverFastPath(channelState, msgObj, messageJson);
pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}';
_context14.next = 13;
return Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) {
return client.writable;
}) // client might have closed in between
.map(function (client) {
return new Promise(function (res) {
client.write(pingStr, res);
});
}));
case 13:
/**
* clean up old messages
* to not waste resources on cleaning up,
* only if random-int matches, we clean up old messages
*/
if ((0, _util2.randomInt)(0, 20) === 0) {
/* await */getAllMessages(channelState.channelName).then(function (allMessages) {
return cleanOldMessages(allMessages, channelState.options.node.ttl);
});
}
// emit to own eventEmitter
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson)));
case 14:
case 'end':
return _context14.stop();
}
}, _callee14, _this2);
})));
}
}, _callee14, _this2);
})));
return channelState.writeBlockPromise;
}
/**
* When multiple BroadcastChannels with the same name
* are created in a single node-process, we can access them directly and emit messages.
* This might not happen often in production
* but will speed up things when this module is used in unit-tests.
*/
function emitOverFastPath(state, msgObj, messageJson) {
if (!state.options.node.useFastPath) return; // disabled
var others = OTHER_INSTANCES[state.channelName].filter(function (s) {
return s !== state;
});
var checkObj = {
time: msgObj.time,
senderUuid: msgObj.uuid,
token: msgObj.token
};
others.filter(function (otherState) {
return _filterMessage(checkObj, otherState);
}).forEach(function (otherState) {
// console.log('EMIT OVER FAST PATH');
otherState.messagesCallback(messageJson);
});
}
function onMessage(channelState, fn) {
var time = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : new Date().getTime();
var time = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : microSeconds();

@@ -860,2 +922,6 @@ channelState.messagesCallbackTime = time;

channelState.closed = true;
channelState.emittedMessagesIds.clear();
OTHER_INSTANCES[channelState.channelName] = OTHER_INSTANCES[channelState.channelName].filter(function (o) {
return o !== channelState;
});

@@ -873,3 +939,2 @@ if (typeof channelState.removeUnload === 'function') channelState.removeUnload();

channelState.socketEE.emitter.removeAllListeners();
channelState.writeQueue.clear();

@@ -891,2 +956,6 @@ Object.values(channelState.otherReaderClients).forEach(function (client) {

return 50;
}
function microSeconds() {
return parseInt(_nanoTime2['default'].microseconds());
}

@@ -27,4 +27,5 @@ 'use strict';

if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes;
if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true;
return options;
}

@@ -10,2 +10,3 @@ 'use strict';

exports.randomToken = randomToken;
exports.microSeconds = microSeconds;
/**

@@ -44,2 +45,24 @@ * returns true if the given object is a promise

}return text;
}
var lastMs = 0;
var additional = 0;
/**
* returns the current time in micro-seconds,
* WARNING: This is a pseudo-function
* Performance.now is not reliable in webworkers, so we just make sure to never return the same time.
* This is enough in browsers, and this function will not be used in nodejs.
* The main reason for this hack is to ensure that BroadcastChannel behaves equal to production when it is used in fast-running unit tests.
*/
function microSeconds() {
var ms = new Date().getTime();
if (ms === lastMs) {
additional++;
return ms * 1000 + additional;
} else {
lastMs = ms;
additional = 0;
return ms * 1000;
}
}

@@ -16,2 +16,3 @@ declare type MethodType = 'node' | 'idb' | 'native' | 'localstorage';

ttl?: number;
useFastPath?: boolean;
};

@@ -18,0 +19,0 @@ idb?: {

{
"name": "broadcast-channel",
"version": "1.2.5",
"version": "1.2.6",
"description": "A BroadcastChannel implementation that works with new browsers, older browsers and Node.js",

@@ -63,2 +63,4 @@ "homepage": "https://github.com/pubkey/broadcast-channel#readme",

"js-sha3": "0.7.0",
"microseconds": "0.1.0",
"nano-time": "1.0.0",
"unload": "1.3.9"

@@ -65,0 +67,0 @@ },

@@ -96,3 +96,3 @@ BEFORE:

-----------------------------------------
16. July.2018: test:performance
test:performance

@@ -150,1 +150,60 @@ BEFORE: {

-----------------------------------------
test:performance
BEFORE: {
"openClose": 714.9132689982653,
"sendRecieve": {
"parallel": 6018.035248000175,
"series": 4019.5094799995422
}
}
AFTER: { // write message up front
"openClose": 703.9341719998047,
"sendRecieve": {
"parallel": 233.59367400035262,
"series": 4531.717969999649
}
}
-----------------------------------------
-----------------------------------------
test:performance - forgetting set
BEFORE: {
"openClose": 703.9341719998047,
"sendRecieve": {
"parallel": 233.59367400035262,
"series": 4531.717969999649
}
}
AFTER: { // add fast path
"openClose": 698.5278329998255,
"sendRecieve": {
"parallel": 254.588275000453,
"series": 3679.5491359978914
}
}
-----------------------------------------
no idle-queue
BEFORE: {
"openClose": 720.8237979999976,
"sendRecieve": {
"parallel": 250.95046299998648,
"series": 3671.9275919999927
}
}
AFTER: {
"openClose": 684.5638470000122,
"sendRecieve": {
"parallel": 246.08427699981257,
"series": 2251.4478739998303
}
}
-----------------------------------------

@@ -45,4 +45,5 @@ import {

_post(type, msg) {
const time = this.method.microSeconds();
const msgObj = {
time: new Date().getTime(),
time,
type,

@@ -73,3 +74,3 @@ data: msg

set onmessage(fn) {
const time = new Date().getTime();
const time = this.method.microSeconds();
const listenObj = {

@@ -89,3 +90,3 @@ time,

addEventListener(type, fn) {
const time = new Date().getTime();
const time = this.method.microSeconds();
const listenObj = {

@@ -161,2 +162,7 @@ time,

channel._addEventListeners[msgObj.type].forEach(obj => {
/*
console.log('got message for ' + channel._state.uuid);
console.log('... message time:' + channel._state.uuid + ' - ' + msgObj.time);
console.log('listener time:' + channel._state.uuid + ' - ' + obj.time);
*/
if (msgObj.time >= obj.time) {

@@ -168,3 +174,3 @@ obj.fn(msgObj.data);

const time = new Date().getTime() - 5;
const time = channel.method.microSeconds();
if (channel._preparePromise) {

@@ -194,3 +200,3 @@ channel._preparePromise.then(() => {

channel._isListening = false;
const time = new Date().getTime() - 5;
const time = channel.method.microSeconds();
channel.method.onMessage(

@@ -197,0 +203,0 @@ channel._state,

@@ -12,5 +12,9 @@ /**

randomInt,
randomToken
randomToken,
microSeconds as micro
} from '../util.js';
export const microSeconds = micro;
import ObliviousSet from '../oblivious-set';
import {

@@ -172,3 +176,5 @@ fillOptionsWithDefaults

// contains all messages that have been emitted before
emittedMessagesIds: new Set(),
emittedMessagesIds: new ObliviousSet(options.idb.ttl * 2),
// ensures we do not read messages in parrallel
writeBlockPromise: Promise.resolve(),
messagesCallback: null,

@@ -198,2 +204,10 @@ readQueuePromises: [],

function _filterMessage(msgObj, state) {
if (msgObj.uuid === state.uuid) return false; // send by own
if (state.emittedMessagesIds.has(msgObj.id)) return false; // already emitted
if (msgObj.data.time < state.messagesCallbackTime) return false; // older then onMessageCallback
return true;
}
/**

@@ -203,2 +217,6 @@ * reads all new messages from the database and emits them

function readNewMessages(state) {
// if no one is listening, we do not need to scan for new messages
if (!state.messagesCallback) return Promise.resolve();
return getMessagesHigherThen(state.db, state.lastCursorId)

@@ -213,16 +231,7 @@ .then(newerMessages => {

})
.filter(msgObj => msgObj.uuid !== state.uuid) // not send by own
.filter(msgObj => !state.emittedMessagesIds.has(msgObj.id)) // not already emitted
.filter(msgObj => msgObj.time >= state.messagesCallbackTime) // not older then onMessageCallback
.filter(msgObj => _filterMessage(msgObj, state))
.sort((msgObjA, msgObjB) => msgObjA.time - msgObjB.time); // sort by time
useMessages.forEach(msgObj => {
if (state.messagesCallback) {
state.emittedMessagesIds.add(msgObj.id);
setTimeout(
() => state.emittedMessagesIds.delete(msgObj.id),
state.options.idb.ttl * 2
);
state.messagesCallback(msgObj.data);

@@ -242,14 +251,19 @@ }

export function postMessage(channelState, messageJson) {
return writeMessage(
channelState.db,
channelState.uuid,
messageJson
).then(() => {
if (randomInt(0, 10) === 0) {
/* await (do not await) */ cleanOldMessages(
channelState.db,
channelState.options.idb.ttl
);
}
});
channelState.writeBlockPromise = channelState.writeBlockPromise
.then(() => writeMessage(
channelState.db,
channelState.uuid,
messageJson
))
.then(() => {
if (randomInt(0, 10) === 0) {
/* await (do not await) */ cleanOldMessages(
channelState.db,
channelState.options.idb.ttl
);
}
});
return channelState.writeBlockPromise;
}

@@ -256,0 +270,0 @@

@@ -10,2 +10,3 @@ /**

const isNode = require('detect-node');
import ObliviousSet from '../oblivious-set';

@@ -18,5 +19,8 @@ import {

sleep,
randomToken
randomToken,
microSeconds as micro
} from '../util';
export const microSeconds = micro;
const KEY_PREFIX = 'pubkey.broadcastChannel-';

@@ -105,3 +109,3 @@ export const type = 'localstorage';

// contains all messages that have been emitted before
const emittedMessagesIds = new Set();
const emittedMessagesIds = new ObliviousSet(options.localstorage.removeTimeout);

@@ -123,9 +127,5 @@ const state = {

if (!msgObj.token || emittedMessagesIds.has(msgObj.token)) return; // already emitted
if (msgObj.time && msgObj.time < state.messagesCallbackTime) return; // too old
if (msgObj.data.time && msgObj.data.time < state.messagesCallbackTime) return; // too old
emittedMessagesIds.add(msgObj.token);
setTimeout(
() => emittedMessagesIds.delete(msgObj.token),
options.localstorage.removeTimeout
);
state.messagesCallback(msgObj.data);

@@ -132,0 +132,0 @@ }

const isNode = require('detect-node');
import {
randomToken,
microSeconds as micro
} from '../util';
export const microSeconds = micro;
export const type = 'native';
export function create(channelName, options) {
if(!options) options = {};
if (!options) options = {};
const state = {
uuid: randomToken(10),
channelName,

@@ -9,0 +17,0 @@ options,

@@ -12,2 +12,4 @@ /**

import * as path from 'path';
import micro from 'nano-time';
import {

@@ -18,3 +20,2 @@ sha3_224

import isNode from 'detect-node';
import IdleQueue from 'custom-idle-queue';
import unload from 'unload';

@@ -31,2 +32,4 @@

import ObliviousSet from '../oblivious-set';
/**

@@ -56,2 +59,3 @@ * windows sucks, so we have handle windows-type of socket-paths

const TMP_FOLDER_NAME = 'pubkey.broadcast-channel';
const OTHER_INSTANCES = {};

@@ -131,3 +135,3 @@ const getPathsCache = new Map();

JSON.stringify({
time: new Date().getTime()
time: microSeconds()
})

@@ -152,4 +156,2 @@ ).then(() => pathToFile);

stream.on('data', function (msg) {
// console.log('server: got data:');
// console.dir(msg.toString());
emitter.emit('data', msg.toString());

@@ -193,3 +195,3 @@ });

export async function writeMessage(channelName, readerUuid, messageJson) {
const time = new Date().getTime();
const time = microSeconds();
const writeObject = {

@@ -286,7 +288,6 @@ uuid: readerUuid,

export async function cleanOldMessages(messageObjects, ttl) {
const olderThen = new Date().getTime() - ttl;
const olderThen = Date.now() - ttl;
await Promise.all(
messageObjects
.filter(obj => obj.time < olderThen)
.filter(obj => (obj.time / 1000) < olderThen)
.map(obj => unlink(obj.path).catch(() => null))

@@ -302,11 +303,8 @@ );

options = fillOptionsWithDefaults(options);
const time = microSeconds();
await ensureFoldersExist(channelName);
const uuid = randomToken(10);
// ensures we do not read messages in parrallel
const writeQueue = new IdleQueue(1);
const state = {
time,
channelName,

@@ -316,6 +314,7 @@ options,

// contains all messages that have been emitted before
emittedMessagesIds: new Set(),
emittedMessagesIds: new ObliviousSet(options.node.ttl * 2),
messagesCallbackTime: null,
messagesCallback: null,
writeQueue,
// ensures we do not read messages in parrallel
writeBlockPromise: Promise.resolve(),
otherReaderClients: {},

@@ -327,2 +326,5 @@ // ensure if process crashes, everything is cleaned up

if (!OTHER_INSTANCES[channelName]) OTHER_INSTANCES[channelName] = [];
OTHER_INSTANCES[channelName].push(state);
const [

@@ -341,4 +343,8 @@ socketEE,

socketEE.emitter.on('data', data => {
const obj = JSON.parse(data);
handleMessagePing(state, obj);
try {
const obj = JSON.parse(data);
handleMessagePing(state, obj);
} catch (err) {
throw new Error('could not parse data: ' + data);
}
});

@@ -349,13 +355,22 @@

export function _filterMessage(msgObj, state) {
/* console.log('_filterMessage()');
console.dir(msgObj);
console.log(msgObj.senderUuid === state.uuid);
console.log(state.emittedMessagesIds.has(msgObj.token));
console.log(!state.messagesCallback);
console.log(msgObj.time < state.messagesCallbackTime);
console.log(msgObj.time < state.time);*/
export function _filterMessage(msgObj, state) {
if (msgObj.senderUuid === state.uuid) return false; // not send by own
if (state.emittedMessagesIds.has(msgObj.token)) return false; // not already emitted
if (!state.messagesCallback) return false; // no listener
if (msgObj.time < state.messagesCallbackTime) return false; // not older then onMessageCallback
if (msgObj.time < state.time) return false; // msgObj is older then channel
state.emittedMessagesIds.add(msgObj.token);
return true;
}
/**

@@ -366,2 +381,3 @@ * when the socket pings, so that we now new messages came,

export async function handleMessagePing(state, msgObj) {
/**

@@ -401,8 +417,5 @@ * when there are no listener, we do nothing

state.emittedMessagesIds.add(msgObj.token);
setTimeout(
() => state.emittedMessagesIds.delete(msgObj.token),
state.options.node.ttl * 2
);
if (state.messagesCallback) {
// emit to subscribers
state.messagesCallback(msgObj.content.data);

@@ -445,46 +458,75 @@ }

export function postMessage(channelState, messageJson) {
// ensure we do this not in parallel
return channelState.writeQueue.requestIdlePromise()
.then(
() => channelState.writeQueue.wrapCall(
async () => {
const [msgObj] = await Promise.all([
writeMessage(
channelState.channelName,
channelState.uuid,
messageJson
),
refreshReaderClients(channelState)
]);
const pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}';
await Promise.all(
Object.values(channelState.otherReaderClients)
.filter(client => client.writable) // client might have closed in between
.map(client => {
return new Promise(res => {
client.write(pingStr, res);
});
})
);
const writePromise = writeMessage(
channelState.channelName,
channelState.uuid,
messageJson
);
/**
* clean up old messages
* to not waste resources on cleaning up,
* only if random-int matches, we clean up old messages
*/
if (randomInt(0, 50) === 0) {
/* await */ getAllMessages(channelState.channelName)
.then(allMessages => cleanOldMessages(allMessages, channelState.options.node.ttl));
}
channelState.writeBlockPromise = channelState.writeBlockPromise.then(async () => {
// emit to own eventEmitter
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson)));
}
)
// w8 to ticks to let the buffer flush
await new Promise(res => setTimeout(res, 0));
await new Promise(res => setTimeout(res, 0));
const [msgObj] = await Promise.all([
writePromise,
refreshReaderClients(channelState)
]);
emitOverFastPath(channelState, msgObj, messageJson);
const pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}';
await Promise.all(
Object.values(channelState.otherReaderClients)
.filter(client => client.writable) // client might have closed in between
.map(client => {
return new Promise(res => {
client.write(pingStr, res);
});
})
);
/**
* clean up old messages
* to not waste resources on cleaning up,
* only if random-int matches, we clean up old messages
*/
if (randomInt(0, 20) === 0) {
/* await */ getAllMessages(channelState.channelName)
.then(allMessages => cleanOldMessages(allMessages, channelState.options.node.ttl));
}
// emit to own eventEmitter
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson)));
});
return channelState.writeBlockPromise;
}
/**
* When multiple BroadcastChannels with the same name
* are created in a single node-process, we can access them directly and emit messages.
* This might not happen often in production
* but will speed up things when this module is used in unit-tests.
*/
export function emitOverFastPath(state, msgObj, messageJson) {
if (!state.options.node.useFastPath) return; // disabled
const others = OTHER_INSTANCES[state.channelName].filter(s => s !== state);
export function onMessage(channelState, fn, time = new Date().getTime()) {
const checkObj = {
time: msgObj.time,
senderUuid: msgObj.uuid,
token: msgObj.token
};
others
.filter(otherState => _filterMessage(checkObj, otherState))
.forEach(otherState => {
// console.log('EMIT OVER FAST PATH');
otherState.messagesCallback(messageJson);
});
}
export function onMessage(channelState, fn, time = microSeconds()) {
channelState.messagesCallbackTime = time;

@@ -498,2 +540,4 @@ channelState.messagesCallback = fn;

channelState.closed = true;
channelState.emittedMessagesIds.clear();
OTHER_INSTANCES[channelState.channelName] = OTHER_INSTANCES[channelState.channelName].filter(o => o !== channelState);

@@ -510,3 +554,2 @@ if (typeof channelState.removeUnload === 'function')

channelState.socketEE.emitter.removeAllListeners();
channelState.writeQueue.clear();

@@ -524,5 +567,8 @@ Object.values(channelState.otherReaderClients)

export function averageResponseTime() {
return 50;
}
export function microSeconds() {
return parseInt(micro.microseconds());
}

@@ -22,4 +22,5 @@ export function fillOptionsWithDefaults(options) {

if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes;
if(typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true;
return options;
}

@@ -34,2 +34,25 @@ /**

return text;
}
let lastMs = 0;
let additional = 0;
/**
* returns the current time in micro-seconds,
* WARNING: This is a pseudo-function
* Performance.now is not reliable in webworkers, so we just make sure to never return the same time.
* This is enough in browsers, and this function will not be used in nodejs.
* The main reason for this hack is to ensure that BroadcastChannel behaves equal to production when it is used in fast-running unit tests.
*/
export function microSeconds() {
const ms = new Date().getTime();
if (ms === lastMs) {
additional++;
return ms * 1000 + additional;
} else {
lastMs = ms;
additional = 0;
return ms * 1000;
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc