broadcast-channel
Advanced tools
Comparing version 1.2.5 to 1.2.6
@@ -39,4 +39,5 @@ import { isPromise } from './util.js'; | ||
var time = this.method.microSeconds(); | ||
var msgObj = { | ||
time: new Date().getTime(), | ||
time: time, | ||
type: type, | ||
@@ -62,3 +63,3 @@ data: msg | ||
set onmessage(fn) { | ||
var time = new Date().getTime(); | ||
var time = this.method.microSeconds(); | ||
var listenObj = { | ||
@@ -78,3 +79,3 @@ time: time, | ||
addEventListener: function addEventListener(type, fn) { | ||
var time = new Date().getTime(); | ||
var time = this.method.microSeconds(); | ||
var listenObj = { | ||
@@ -155,2 +156,7 @@ time: time, | ||
channel._addEventListeners[msgObj.type].forEach(function (obj) { | ||
/* | ||
console.log('got message for ' + channel._state.uuid); | ||
console.log('... message time:' + channel._state.uuid + ' - ' + msgObj.time); | ||
console.log('listener time:' + channel._state.uuid + ' - ' + obj.time); | ||
*/ | ||
if (msgObj.time >= obj.time) { | ||
@@ -162,3 +168,3 @@ obj.fn(msgObj.data); | ||
var time = new Date().getTime() - 5; | ||
var time = channel.method.microSeconds(); | ||
if (channel._preparePromise) { | ||
@@ -180,3 +186,3 @@ channel._preparePromise.then(function () { | ||
channel._isListening = false; | ||
var time = new Date().getTime() - 5; | ||
var time = channel.method.microSeconds(); | ||
channel.method.onMessage(channel._state, null, time); | ||
@@ -183,0 +189,0 @@ } |
@@ -9,4 +9,7 @@ /** | ||
import { sleep, randomInt, randomToken } from '../util.js'; | ||
import { sleep, randomInt, randomToken, microSeconds as micro } from '../util.js'; | ||
export var microSeconds = micro; | ||
import ObliviousSet from '../oblivious-set'; | ||
import { fillOptionsWithDefaults } from '../options'; | ||
@@ -171,3 +174,5 @@ | ||
// contains all messages that have been emitted before | ||
emittedMessagesIds: new Set(), | ||
emittedMessagesIds: new ObliviousSet(options.idb.ttl * 2), | ||
// ensures we do not read messages in parrallel | ||
writeBlockPromise: Promise.resolve(), | ||
messagesCallback: null, | ||
@@ -199,2 +204,9 @@ readQueuePromises: [], | ||
function _filterMessage(msgObj, state) { | ||
if (msgObj.uuid === state.uuid) return false; // send by own | ||
if (state.emittedMessagesIds.has(msgObj.id)) return false; // already emitted | ||
if (msgObj.data.time < state.messagesCallbackTime) return false; // older then onMessageCallback | ||
return true; | ||
} | ||
/** | ||
@@ -204,2 +216,6 @@ * reads all new messages from the database and emits them | ||
function readNewMessages(state) { | ||
// if no one is listening, we do not need to scan for new messages | ||
if (!state.messagesCallback) return Promise.resolve(); | ||
return getMessagesHigherThen(state.db, state.lastCursorId).then(function (newerMessages) { | ||
@@ -212,22 +228,9 @@ var useMessages = newerMessages.map(function (msgObj) { | ||
}).filter(function (msgObj) { | ||
return msgObj.uuid !== state.uuid; | ||
}) // not send by own | ||
.filter(function (msgObj) { | ||
return !state.emittedMessagesIds.has(msgObj.id); | ||
}) // not already emitted | ||
.filter(function (msgObj) { | ||
return msgObj.time >= state.messagesCallbackTime; | ||
}) // not older then onMessageCallback | ||
.sort(function (msgObjA, msgObjB) { | ||
return _filterMessage(msgObj, state); | ||
}).sort(function (msgObjA, msgObjB) { | ||
return msgObjA.time - msgObjB.time; | ||
}); // sort by time | ||
useMessages.forEach(function (msgObj) { | ||
if (state.messagesCallback) { | ||
state.emittedMessagesIds.add(msgObj.id); | ||
setTimeout(function () { | ||
return state.emittedMessagesIds['delete'](msgObj.id); | ||
}, state.options.idb.ttl * 2); | ||
state.messagesCallback(msgObj.data); | ||
@@ -247,3 +250,6 @@ } | ||
export function postMessage(channelState, messageJson) { | ||
return writeMessage(channelState.db, channelState.uuid, messageJson).then(function () { | ||
channelState.writeBlockPromise = channelState.writeBlockPromise.then(function () { | ||
return writeMessage(channelState.db, channelState.uuid, messageJson); | ||
}).then(function () { | ||
if (randomInt(0, 10) === 0) { | ||
@@ -253,2 +259,4 @@ /* await (do not await) */cleanOldMessages(channelState.db, channelState.options.idb.ttl); | ||
}); | ||
return channelState.writeBlockPromise; | ||
} | ||
@@ -255,0 +263,0 @@ |
@@ -10,7 +10,10 @@ /** | ||
var isNode = require('detect-node'); | ||
import ObliviousSet from '../oblivious-set'; | ||
import { fillOptionsWithDefaults } from '../options'; | ||
import { sleep, randomToken } from '../util'; | ||
import { sleep, randomToken, microSeconds as micro } from '../util'; | ||
export var microSeconds = micro; | ||
var KEY_PREFIX = 'pubkey.broadcastChannel-'; | ||
@@ -98,3 +101,3 @@ export var type = 'localstorage'; | ||
// contains all messages that have been emitted before | ||
var emittedMessagesIds = new Set(); | ||
var emittedMessagesIds = new ObliviousSet(options.localstorage.removeTimeout); | ||
@@ -113,8 +116,5 @@ var state = { | ||
if (!msgObj.token || emittedMessagesIds.has(msgObj.token)) return; // already emitted | ||
if (msgObj.time && msgObj.time < state.messagesCallbackTime) return; // too old | ||
if (msgObj.data.time && msgObj.data.time < state.messagesCallbackTime) return; // too old | ||
emittedMessagesIds.add(msgObj.token); | ||
setTimeout(function () { | ||
return emittedMessagesIds['delete'](msgObj.token); | ||
}, options.localstorage.removeTimeout); | ||
state.messagesCallback(msgObj.data); | ||
@@ -121,0 +121,0 @@ }); |
var isNode = require('detect-node'); | ||
import { randomToken, microSeconds as micro } from '../util'; | ||
export var microSeconds = micro; | ||
export var type = 'native'; | ||
@@ -8,2 +12,3 @@ | ||
var state = { | ||
uuid: randomToken(10), | ||
channelName: channelName, | ||
@@ -10,0 +15,0 @@ options: options, |
@@ -14,6 +14,7 @@ import _regeneratorRuntime from 'babel-runtime/regenerator'; | ||
import * as path from 'path'; | ||
import micro from 'nano-time'; | ||
import { sha3_224 } from 'js-sha3'; | ||
import isNode from 'detect-node'; | ||
import IdleQueue from 'custom-idle-queue'; | ||
import unload from 'unload'; | ||
@@ -25,2 +26,4 @@ | ||
import ObliviousSet from '../oblivious-set'; | ||
/** | ||
@@ -47,2 +50,3 @@ * windows sucks, so we have handle windows-type of socket-paths | ||
var TMP_FOLDER_NAME = 'pubkey.broadcast-channel'; | ||
var OTHER_INSTANCES = {}; | ||
@@ -143,3 +147,3 @@ var getPathsCache = new Map(); | ||
return writeFile(pathToFile, JSON.stringify({ | ||
time: new Date().getTime() | ||
time: microSeconds() | ||
})).then(function () { | ||
@@ -169,4 +173,2 @@ return pathToFile; | ||
stream.on('data', function (msg) { | ||
// console.log('server: got data:'); | ||
// console.dir(msg.toString()); | ||
emitter.emit('data', msg.toString()); | ||
@@ -247,3 +249,3 @@ }); | ||
case 0: | ||
time = new Date().getTime(); | ||
time = microSeconds(); | ||
writeObject = { | ||
@@ -406,6 +408,6 @@ uuid: readerUuid, | ||
case 0: | ||
olderThen = new Date().getTime() - ttl; | ||
olderThen = Date.now() - ttl; | ||
_context8.next = 3; | ||
return Promise.all(messageObjects.filter(function (obj) { | ||
return obj.time < olderThen; | ||
return obj.time / 1000 < olderThen; | ||
}).map(function (obj) { | ||
@@ -436,3 +438,3 @@ return unlink(obj.path)['catch'](function () { | ||
var uuid, writeQueue, state, _ref10, socketEE, infoFilePath; | ||
var time, uuid, state, _ref10, socketEE, infoFilePath; | ||
@@ -444,13 +446,10 @@ return _regeneratorRuntime.wrap(function _callee9$(_context9) { | ||
options = fillOptionsWithDefaults(options); | ||
_context9.next = 3; | ||
time = microSeconds(); | ||
_context9.next = 4; | ||
return ensureFoldersExist(channelName); | ||
case 3: | ||
case 4: | ||
uuid = randomToken(10); | ||
// ensures we do not read messages in parrallel | ||
writeQueue = new IdleQueue(1); | ||
state = { | ||
time: time, | ||
channelName: channelName, | ||
@@ -460,6 +459,7 @@ options: options, | ||
// contains all messages that have been emitted before | ||
emittedMessagesIds: new Set(), | ||
emittedMessagesIds: new ObliviousSet(options.node.ttl * 2), | ||
messagesCallbackTime: null, | ||
messagesCallback: null, | ||
writeQueue: writeQueue, | ||
// ensures we do not read messages in parrallel | ||
writeBlockPromise: Promise.resolve(), | ||
otherReaderClients: {}, | ||
@@ -472,6 +472,11 @@ // ensure if process crashes, everything is cleaned up | ||
}; | ||
_context9.next = 8; | ||
if (!OTHER_INSTANCES[channelName]) OTHER_INSTANCES[channelName] = []; | ||
OTHER_INSTANCES[channelName].push(state); | ||
_context9.next = 10; | ||
return Promise.all([createSocketEventEmitter(channelName, uuid), createSocketInfoFile(channelName, uuid), refreshReaderClients(state)]); | ||
case 8: | ||
case 10: | ||
_ref10 = _context9.sent; | ||
@@ -486,4 +491,8 @@ socketEE = _ref10[0]; | ||
socketEE.emitter.on('data', function (data) { | ||
var obj = JSON.parse(data); | ||
handleMessagePing(state, obj); | ||
try { | ||
var obj = JSON.parse(data); | ||
handleMessagePing(state, obj); | ||
} catch (err) { | ||
throw new Error('could not parse data: ' + data); | ||
} | ||
}); | ||
@@ -493,3 +502,3 @@ | ||
case 15: | ||
case 17: | ||
case 'end': | ||
@@ -508,5 +517,18 @@ return _context9.stop(); | ||
export function _filterMessage(msgObj, state) { | ||
/* console.log('_filterMessage()'); | ||
console.dir(msgObj); | ||
console.log(msgObj.senderUuid === state.uuid); | ||
console.log(state.emittedMessagesIds.has(msgObj.token)); | ||
console.log(!state.messagesCallback); | ||
console.log(msgObj.time < state.messagesCallbackTime); | ||
console.log(msgObj.time < state.time);*/ | ||
if (msgObj.senderUuid === state.uuid) return false; // not send by own | ||
if (state.emittedMessagesIds.has(msgObj.token)) return false; // not already emitted | ||
if (!state.messagesCallback) return false; // no listener | ||
if (msgObj.time < state.messagesCallbackTime) return false; // not older then onMessageCallback | ||
if (msgObj.time < state.time) return false; // msgObj is older then channel | ||
state.emittedMessagesIds.add(msgObj.token); | ||
return true; | ||
@@ -582,7 +604,5 @@ } | ||
state.emittedMessagesIds.add(msgObj.token); | ||
setTimeout(function () { | ||
return state.emittedMessagesIds['delete'](msgObj.token); | ||
}, state.options.node.ttl * 2); | ||
if (state.messagesCallback) { | ||
// emit to subscribers | ||
state.messagesCallback(msgObj.content.data); | ||
@@ -728,56 +748,97 @@ } | ||
// ensure we do this not in parallel | ||
return channelState.writeQueue.requestIdlePromise().then(function () { | ||
return channelState.writeQueue.wrapCall(_asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee14() { | ||
var _ref16, msgObj, pingStr; | ||
var writePromise = writeMessage(channelState.channelName, channelState.uuid, messageJson); | ||
return _regeneratorRuntime.wrap(function _callee14$(_context14) { | ||
while (1) { | ||
switch (_context14.prev = _context14.next) { | ||
case 0: | ||
_context14.next = 2; | ||
return Promise.all([writeMessage(channelState.channelName, channelState.uuid, messageJson), refreshReaderClients(channelState)]); | ||
channelState.writeBlockPromise = channelState.writeBlockPromise.then(_asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee14() { | ||
var _ref16, msgObj, pingStr; | ||
case 2: | ||
_ref16 = _context14.sent; | ||
msgObj = _ref16[0]; | ||
pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}'; | ||
_context14.next = 7; | ||
return Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) { | ||
return client.writable; | ||
}) // client might have closed in between | ||
.map(function (client) { | ||
return new Promise(function (res) { | ||
client.write(pingStr, res); | ||
}); | ||
})); | ||
return _regeneratorRuntime.wrap(function _callee14$(_context14) { | ||
while (1) { | ||
switch (_context14.prev = _context14.next) { | ||
case 0: | ||
_context14.next = 2; | ||
return new Promise(function (res) { | ||
return setTimeout(res, 0); | ||
}); | ||
case 7: | ||
case 2: | ||
_context14.next = 4; | ||
return new Promise(function (res) { | ||
return setTimeout(res, 0); | ||
}); | ||
/** | ||
* clean up old messages | ||
* to not waste resources on cleaning up, | ||
* only if random-int matches, we clean up old messages | ||
*/ | ||
if (randomInt(0, 50) === 0) { | ||
/* await */getAllMessages(channelState.channelName).then(function (allMessages) { | ||
return cleanOldMessages(allMessages, channelState.options.node.ttl); | ||
}); | ||
} | ||
case 4: | ||
_context14.next = 6; | ||
return Promise.all([writePromise, refreshReaderClients(channelState)]); | ||
// emit to own eventEmitter | ||
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson))); | ||
case 6: | ||
_ref16 = _context14.sent; | ||
msgObj = _ref16[0]; | ||
case 8: | ||
case 'end': | ||
return _context14.stop(); | ||
} | ||
emitOverFastPath(channelState, msgObj, messageJson); | ||
pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}'; | ||
_context14.next = 12; | ||
return Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) { | ||
return client.writable; | ||
}) // client might have closed in between | ||
.map(function (client) { | ||
return new Promise(function (res) { | ||
client.write(pingStr, res); | ||
}); | ||
})); | ||
case 12: | ||
/** | ||
* clean up old messages | ||
* to not waste resources on cleaning up, | ||
* only if random-int matches, we clean up old messages | ||
*/ | ||
if (randomInt(0, 20) === 0) { | ||
/* await */getAllMessages(channelState.channelName).then(function (allMessages) { | ||
return cleanOldMessages(allMessages, channelState.options.node.ttl); | ||
}); | ||
} | ||
// emit to own eventEmitter | ||
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson))); | ||
case 13: | ||
case 'end': | ||
return _context14.stop(); | ||
} | ||
}, _callee14, _this2); | ||
}))); | ||
} | ||
}, _callee14, _this2); | ||
}))); | ||
return channelState.writeBlockPromise; | ||
} | ||
/** | ||
* When multiple BroadcastChannels with the same name | ||
* are created in a single node-process, we can access them directly and emit messages. | ||
* This might not happen often in production | ||
* but will speed up things when this module is used in unit-tests. | ||
*/ | ||
export function emitOverFastPath(state, msgObj, messageJson) { | ||
if (!state.options.node.useFastPath) return; // disabled | ||
var others = OTHER_INSTANCES[state.channelName].filter(function (s) { | ||
return s !== state; | ||
}); | ||
var checkObj = { | ||
time: msgObj.time, | ||
senderUuid: msgObj.uuid, | ||
token: msgObj.token | ||
}; | ||
others.filter(function (otherState) { | ||
return _filterMessage(checkObj, otherState); | ||
}).forEach(function (otherState) { | ||
// console.log('EMIT OVER FAST PATH'); | ||
otherState.messagesCallback(messageJson); | ||
}); | ||
} | ||
export function onMessage(channelState, fn) { | ||
var time = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : new Date().getTime(); | ||
var time = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : microSeconds(); | ||
@@ -792,2 +853,6 @@ channelState.messagesCallbackTime = time; | ||
channelState.closed = true; | ||
channelState.emittedMessagesIds.clear(); | ||
OTHER_INSTANCES[channelState.channelName] = OTHER_INSTANCES[channelState.channelName].filter(function (o) { | ||
return o !== channelState; | ||
}); | ||
@@ -805,3 +870,2 @@ if (typeof channelState.removeUnload === 'function') channelState.removeUnload(); | ||
channelState.socketEE.emitter.removeAllListeners(); | ||
channelState.writeQueue.clear(); | ||
@@ -823,2 +887,6 @@ Object.values(channelState.otherReaderClients).forEach(function (client) { | ||
return 50; | ||
} | ||
export function microSeconds() { | ||
return parseInt(micro.microseconds()); | ||
} |
@@ -21,4 +21,5 @@ export function fillOptionsWithDefaults(options) { | ||
if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; | ||
if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; | ||
return options; | ||
} |
@@ -34,2 +34,24 @@ /** | ||
}return text; | ||
} | ||
var lastMs = 0; | ||
var additional = 0; | ||
/** | ||
* returns the current time in micro-seconds, | ||
* WARNING: This is a pseudo-function | ||
* Performance.now is not reliable in webworkers, so we just make sure to never return the same time. | ||
* This is enough in browsers, and this function will not be used in nodejs. | ||
* The main reason for this hack is to ensure that BroadcastChannel behaves equal to production when it is used in fast-running unit tests. | ||
*/ | ||
export function microSeconds() { | ||
var ms = new Date().getTime(); | ||
if (ms === lastMs) { | ||
additional++; | ||
return ms * 1000 + additional; | ||
} else { | ||
lastMs = ms; | ||
additional = 0; | ||
return ms * 1000; | ||
} | ||
} |
@@ -45,4 +45,5 @@ 'use strict'; | ||
var time = this.method.microSeconds(); | ||
var msgObj = { | ||
time: new Date().getTime(), | ||
time: time, | ||
type: type, | ||
@@ -68,3 +69,3 @@ data: msg | ||
set onmessage(fn) { | ||
var time = new Date().getTime(); | ||
var time = this.method.microSeconds(); | ||
var listenObj = { | ||
@@ -84,3 +85,3 @@ time: time, | ||
addEventListener: function addEventListener(type, fn) { | ||
var time = new Date().getTime(); | ||
var time = this.method.microSeconds(); | ||
var listenObj = { | ||
@@ -161,2 +162,7 @@ time: time, | ||
channel._addEventListeners[msgObj.type].forEach(function (obj) { | ||
/* | ||
console.log('got message for ' + channel._state.uuid); | ||
console.log('... message time:' + channel._state.uuid + ' - ' + msgObj.time); | ||
console.log('listener time:' + channel._state.uuid + ' - ' + obj.time); | ||
*/ | ||
if (msgObj.time >= obj.time) { | ||
@@ -168,3 +174,3 @@ obj.fn(msgObj.data); | ||
var time = new Date().getTime() - 5; | ||
var time = channel.method.microSeconds(); | ||
if (channel._preparePromise) { | ||
@@ -186,3 +192,3 @@ channel._preparePromise.then(function () { | ||
channel._isListening = false; | ||
var time = new Date().getTime() - 5; | ||
var time = channel.method.microSeconds(); | ||
channel.method.onMessage(channel._state, null, time); | ||
@@ -189,0 +195,0 @@ } |
@@ -6,3 +6,3 @@ 'use strict'; | ||
}); | ||
exports.type = undefined; | ||
exports.type = exports.microSeconds = undefined; | ||
exports.getIdb = getIdb; | ||
@@ -25,4 +25,10 @@ exports.createDatabase = createDatabase; | ||
var _obliviousSet = require('../oblivious-set'); | ||
var _obliviousSet2 = _interopRequireDefault(_obliviousSet); | ||
var _options = require('../options'); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { 'default': obj }; } | ||
/** | ||
@@ -36,2 +42,5 @@ * this method uses indexeddb to store the messages | ||
var microSeconds = exports.microSeconds = _util.microSeconds; | ||
var DB_PREFIX = 'pubkey.broadcast-channel-0-'; | ||
@@ -194,3 +203,5 @@ var OBJECT_STORE_ID = 'messages'; | ||
// contains all messages that have been emitted before | ||
emittedMessagesIds: new Set(), | ||
emittedMessagesIds: new _obliviousSet2['default'](options.idb.ttl * 2), | ||
// ensures we do not read messages in parrallel | ||
writeBlockPromise: Promise.resolve(), | ||
messagesCallback: null, | ||
@@ -222,2 +233,9 @@ readQueuePromises: [], | ||
function _filterMessage(msgObj, state) { | ||
if (msgObj.uuid === state.uuid) return false; // send by own | ||
if (state.emittedMessagesIds.has(msgObj.id)) return false; // already emitted | ||
if (msgObj.data.time < state.messagesCallbackTime) return false; // older then onMessageCallback | ||
return true; | ||
} | ||
/** | ||
@@ -227,2 +245,6 @@ * reads all new messages from the database and emits them | ||
function readNewMessages(state) { | ||
// if no one is listening, we do not need to scan for new messages | ||
if (!state.messagesCallback) return Promise.resolve(); | ||
return getMessagesHigherThen(state.db, state.lastCursorId).then(function (newerMessages) { | ||
@@ -235,22 +257,9 @@ var useMessages = newerMessages.map(function (msgObj) { | ||
}).filter(function (msgObj) { | ||
return msgObj.uuid !== state.uuid; | ||
}) // not send by own | ||
.filter(function (msgObj) { | ||
return !state.emittedMessagesIds.has(msgObj.id); | ||
}) // not already emitted | ||
.filter(function (msgObj) { | ||
return msgObj.time >= state.messagesCallbackTime; | ||
}) // not older then onMessageCallback | ||
.sort(function (msgObjA, msgObjB) { | ||
return _filterMessage(msgObj, state); | ||
}).sort(function (msgObjA, msgObjB) { | ||
return msgObjA.time - msgObjB.time; | ||
}); // sort by time | ||
useMessages.forEach(function (msgObj) { | ||
if (state.messagesCallback) { | ||
state.emittedMessagesIds.add(msgObj.id); | ||
setTimeout(function () { | ||
return state.emittedMessagesIds['delete'](msgObj.id); | ||
}, state.options.idb.ttl * 2); | ||
state.messagesCallback(msgObj.data); | ||
@@ -270,3 +279,6 @@ } | ||
function postMessage(channelState, messageJson) { | ||
return writeMessage(channelState.db, channelState.uuid, messageJson).then(function () { | ||
channelState.writeBlockPromise = channelState.writeBlockPromise.then(function () { | ||
return writeMessage(channelState.db, channelState.uuid, messageJson); | ||
}).then(function () { | ||
if ((0, _util.randomInt)(0, 10) === 0) { | ||
@@ -276,2 +288,4 @@ /* await (do not await) */cleanOldMessages(channelState.db, channelState.options.idb.ttl); | ||
}); | ||
return channelState.writeBlockPromise; | ||
} | ||
@@ -278,0 +292,0 @@ |
@@ -6,3 +6,3 @@ 'use strict'; | ||
}); | ||
exports.type = undefined; | ||
exports.type = exports.microSeconds = undefined; | ||
exports.getLocalStorage = getLocalStorage; | ||
@@ -19,2 +19,6 @@ exports.storageKey = storageKey; | ||
var _obliviousSet = require('../oblivious-set'); | ||
var _obliviousSet2 = _interopRequireDefault(_obliviousSet); | ||
var _options = require('../options'); | ||
@@ -24,2 +28,4 @@ | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { 'default': obj }; } | ||
/** | ||
@@ -34,2 +40,3 @@ * A localStorage-only method which uses localstorage and its 'storage'-event | ||
var isNode = require('detect-node'); | ||
var microSeconds = exports.microSeconds = _util.microSeconds; | ||
@@ -118,3 +125,3 @@ var KEY_PREFIX = 'pubkey.broadcastChannel-'; | ||
// contains all messages that have been emitted before | ||
var emittedMessagesIds = new Set(); | ||
var emittedMessagesIds = new _obliviousSet2['default'](options.localstorage.removeTimeout); | ||
@@ -133,8 +140,5 @@ var state = { | ||
if (!msgObj.token || emittedMessagesIds.has(msgObj.token)) return; // already emitted | ||
if (msgObj.time && msgObj.time < state.messagesCallbackTime) return; // too old | ||
if (msgObj.data.time && msgObj.data.time < state.messagesCallbackTime) return; // too old | ||
emittedMessagesIds.add(msgObj.token); | ||
setTimeout(function () { | ||
return emittedMessagesIds['delete'](msgObj.token); | ||
}, options.localstorage.removeTimeout); | ||
state.messagesCallback(msgObj.data); | ||
@@ -141,0 +145,0 @@ }); |
@@ -6,2 +6,3 @@ 'use strict'; | ||
}); | ||
exports.type = exports.microSeconds = undefined; | ||
exports.create = create; | ||
@@ -13,4 +14,9 @@ exports.close = close; | ||
exports.averageResponseTime = averageResponseTime; | ||
var _util = require('../util'); | ||
var isNode = require('detect-node'); | ||
var microSeconds = exports.microSeconds = _util.microSeconds; | ||
var type = exports.type = 'native'; | ||
@@ -21,2 +27,3 @@ | ||
var state = { | ||
uuid: (0, _util.randomToken)(10), | ||
channelName: channelName, | ||
@@ -23,0 +30,0 @@ options: options, |
@@ -91,4 +91,2 @@ 'use strict'; | ||
stream.on('data', function (msg) { | ||
// console.log('server: got data:'); | ||
// console.dir(msg.toString()); | ||
emitter.emit('data', msg.toString()); | ||
@@ -171,3 +169,3 @@ }); | ||
case 0: | ||
time = new Date().getTime(); | ||
time = microSeconds(); | ||
writeObject = { | ||
@@ -315,6 +313,6 @@ uuid: readerUuid, | ||
case 0: | ||
olderThen = new Date().getTime() - ttl; | ||
olderThen = Date.now() - ttl; | ||
_context8.next = 3; | ||
return Promise.all(messageObjects.filter(function (obj) { | ||
return obj.time < olderThen; | ||
return obj.time / 1000 < olderThen; | ||
}).map(function (obj) { | ||
@@ -343,3 +341,3 @@ return unlink(obj.path)['catch'](function () { | ||
var uuid, writeQueue, state, _ref10, _ref11, socketEE, infoFilePath; | ||
var time, uuid, state, _ref10, _ref11, socketEE, infoFilePath; | ||
@@ -351,13 +349,10 @@ return _regenerator2['default'].wrap(function _callee9$(_context9) { | ||
options = (0, _options.fillOptionsWithDefaults)(options); | ||
_context9.next = 3; | ||
time = microSeconds(); | ||
_context9.next = 4; | ||
return ensureFoldersExist(channelName); | ||
case 3: | ||
case 4: | ||
uuid = (0, _util2.randomToken)(10); | ||
// ensures we do not read messages in parrallel | ||
writeQueue = new _customIdleQueue2['default'](1); | ||
state = { | ||
time: time, | ||
channelName: channelName, | ||
@@ -367,6 +362,7 @@ options: options, | ||
// contains all messages that have been emitted before | ||
emittedMessagesIds: new Set(), | ||
emittedMessagesIds: new _obliviousSet2['default'](options.node.ttl * 2), | ||
messagesCallbackTime: null, | ||
messagesCallback: null, | ||
writeQueue: writeQueue, | ||
// ensures we do not read messages in parrallel | ||
writeBlockPromise: Promise.resolve(), | ||
otherReaderClients: {}, | ||
@@ -379,6 +375,11 @@ // ensure if process crashes, everything is cleaned up | ||
}; | ||
_context9.next = 8; | ||
if (!OTHER_INSTANCES[channelName]) OTHER_INSTANCES[channelName] = []; | ||
OTHER_INSTANCES[channelName].push(state); | ||
_context9.next = 10; | ||
return Promise.all([createSocketEventEmitter(channelName, uuid), createSocketInfoFile(channelName, uuid), refreshReaderClients(state)]); | ||
case 8: | ||
case 10: | ||
_ref10 = _context9.sent; | ||
@@ -394,4 +395,8 @@ _ref11 = (0, _slicedToArray3['default'])(_ref10, 2); | ||
socketEE.emitter.on('data', function (data) { | ||
var obj = JSON.parse(data); | ||
handleMessagePing(state, obj); | ||
try { | ||
var obj = JSON.parse(data); | ||
handleMessagePing(state, obj); | ||
} catch (err) { | ||
throw new Error('could not parse data: ' + data); | ||
} | ||
}); | ||
@@ -401,3 +406,3 @@ | ||
case 16: | ||
case 18: | ||
case 'end': | ||
@@ -482,7 +487,5 @@ return _context9.stop(); | ||
state.emittedMessagesIds.add(msgObj.token); | ||
setTimeout(function () { | ||
return state.emittedMessagesIds['delete'](msgObj.token); | ||
}, state.options.node.ttl * 2); | ||
if (state.messagesCallback) { | ||
// emit to subscribers | ||
state.messagesCallback(msgObj.content.data); | ||
@@ -634,2 +637,3 @@ } | ||
exports.postMessage = postMessage; | ||
exports.emitOverFastPath = emitOverFastPath; | ||
exports.onMessage = onMessage; | ||
@@ -639,2 +643,3 @@ exports.close = close; | ||
exports.averageResponseTime = averageResponseTime; | ||
exports.microSeconds = microSeconds; | ||
@@ -665,2 +670,6 @@ var _util = require('util'); | ||
var _nanoTime = require('nano-time'); | ||
var _nanoTime2 = _interopRequireDefault(_nanoTime); | ||
var _jsSha = require('js-sha3'); | ||
@@ -672,6 +681,2 @@ | ||
var _customIdleQueue = require('custom-idle-queue'); | ||
var _customIdleQueue2 = _interopRequireDefault(_customIdleQueue); | ||
var _unload = require('unload'); | ||
@@ -685,2 +690,6 @@ | ||
var _obliviousSet = require('../oblivious-set'); | ||
var _obliviousSet2 = _interopRequireDefault(_obliviousSet); | ||
function _interopRequireWildcard(obj) { if (obj && obj.__esModule) { return obj; } else { var newObj = {}; if (obj != null) { for (var key in obj) { if (Object.prototype.hasOwnProperty.call(obj, key)) newObj[key] = obj[key]; } } newObj['default'] = obj; return newObj; } } | ||
@@ -694,7 +703,2 @@ | ||
*/ | ||
/** | ||
* this method is used in nodejs-environments. | ||
* The ipc is handled via sockets and file-writes to the tmp-folder | ||
*/ | ||
function cleanPipeName(str) { | ||
@@ -708,3 +712,6 @@ if (process.platform === 'win32' && !str.startsWith('\\\\.\\pipe\\')) { | ||
} | ||
} | ||
} /** | ||
* this method is used in nodejs-environments. | ||
* The ipc is handled via sockets and file-writes to the tmp-folder | ||
*/ | ||
@@ -718,2 +725,3 @@ var mkdir = util.promisify(fs.mkdir); | ||
var TMP_FOLDER_NAME = 'pubkey.broadcast-channel'; | ||
var OTHER_INSTANCES = {}; | ||
@@ -762,3 +770,3 @@ var getPathsCache = new Map(); | ||
return writeFile(pathToFile, JSON.stringify({ | ||
time: new Date().getTime() | ||
time: microSeconds() | ||
})).then(function () { | ||
@@ -787,5 +795,18 @@ return pathToFile; | ||
function _filterMessage(msgObj, state) { | ||
/* console.log('_filterMessage()'); | ||
console.dir(msgObj); | ||
console.log(msgObj.senderUuid === state.uuid); | ||
console.log(state.emittedMessagesIds.has(msgObj.token)); | ||
console.log(!state.messagesCallback); | ||
console.log(msgObj.time < state.messagesCallbackTime); | ||
console.log(msgObj.time < state.time);*/ | ||
if (msgObj.senderUuid === state.uuid) return false; // not send by own | ||
if (state.emittedMessagesIds.has(msgObj.token)) return false; // not already emitted | ||
if (!state.messagesCallback) return false; // no listener | ||
if (msgObj.time < state.messagesCallbackTime) return false; // not older then onMessageCallback | ||
if (msgObj.time < state.time) return false; // msgObj is older then channel | ||
state.emittedMessagesIds.add(msgObj.token); | ||
return true; | ||
@@ -795,57 +816,98 @@ }function postMessage(channelState, messageJson) { | ||
// ensure we do this not in parallel | ||
return channelState.writeQueue.requestIdlePromise().then(function () { | ||
return channelState.writeQueue.wrapCall((0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee14() { | ||
var _ref17, _ref18, msgObj, pingStr; | ||
var writePromise = writeMessage(channelState.channelName, channelState.uuid, messageJson); | ||
return _regenerator2['default'].wrap(function _callee14$(_context14) { | ||
while (1) { | ||
switch (_context14.prev = _context14.next) { | ||
case 0: | ||
_context14.next = 2; | ||
return Promise.all([writeMessage(channelState.channelName, channelState.uuid, messageJson), refreshReaderClients(channelState)]); | ||
channelState.writeBlockPromise = channelState.writeBlockPromise.then((0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee14() { | ||
var _ref17, _ref18, msgObj, pingStr; | ||
case 2: | ||
_ref17 = _context14.sent; | ||
_ref18 = (0, _slicedToArray3['default'])(_ref17, 1); | ||
msgObj = _ref18[0]; | ||
pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}'; | ||
_context14.next = 8; | ||
return Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) { | ||
return client.writable; | ||
}) // client might have closed in between | ||
.map(function (client) { | ||
return new Promise(function (res) { | ||
client.write(pingStr, res); | ||
}); | ||
})); | ||
return _regenerator2['default'].wrap(function _callee14$(_context14) { | ||
while (1) { | ||
switch (_context14.prev = _context14.next) { | ||
case 0: | ||
_context14.next = 2; | ||
return new Promise(function (res) { | ||
return setTimeout(res, 0); | ||
}); | ||
case 8: | ||
case 2: | ||
_context14.next = 4; | ||
return new Promise(function (res) { | ||
return setTimeout(res, 0); | ||
}); | ||
/** | ||
* clean up old messages | ||
* to not waste resources on cleaning up, | ||
* only if random-int matches, we clean up old messages | ||
*/ | ||
if ((0, _util2.randomInt)(0, 50) === 0) { | ||
/* await */getAllMessages(channelState.channelName).then(function (allMessages) { | ||
return cleanOldMessages(allMessages, channelState.options.node.ttl); | ||
}); | ||
} | ||
case 4: | ||
_context14.next = 6; | ||
return Promise.all([writePromise, refreshReaderClients(channelState)]); | ||
// emit to own eventEmitter | ||
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson))); | ||
case 6: | ||
_ref17 = _context14.sent; | ||
_ref18 = (0, _slicedToArray3['default'])(_ref17, 1); | ||
msgObj = _ref18[0]; | ||
case 9: | ||
case 'end': | ||
return _context14.stop(); | ||
} | ||
emitOverFastPath(channelState, msgObj, messageJson); | ||
pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}'; | ||
_context14.next = 13; | ||
return Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) { | ||
return client.writable; | ||
}) // client might have closed in between | ||
.map(function (client) { | ||
return new Promise(function (res) { | ||
client.write(pingStr, res); | ||
}); | ||
})); | ||
case 13: | ||
/** | ||
* clean up old messages | ||
* to not waste resources on cleaning up, | ||
* only if random-int matches, we clean up old messages | ||
*/ | ||
if ((0, _util2.randomInt)(0, 20) === 0) { | ||
/* await */getAllMessages(channelState.channelName).then(function (allMessages) { | ||
return cleanOldMessages(allMessages, channelState.options.node.ttl); | ||
}); | ||
} | ||
// emit to own eventEmitter | ||
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson))); | ||
case 14: | ||
case 'end': | ||
return _context14.stop(); | ||
} | ||
}, _callee14, _this2); | ||
}))); | ||
} | ||
}, _callee14, _this2); | ||
}))); | ||
return channelState.writeBlockPromise; | ||
} | ||
/** | ||
* When multiple BroadcastChannels with the same name | ||
* are created in a single node-process, we can access them directly and emit messages. | ||
* This might not happen often in production | ||
* but will speed up things when this module is used in unit-tests. | ||
*/ | ||
function emitOverFastPath(state, msgObj, messageJson) { | ||
if (!state.options.node.useFastPath) return; // disabled | ||
var others = OTHER_INSTANCES[state.channelName].filter(function (s) { | ||
return s !== state; | ||
}); | ||
var checkObj = { | ||
time: msgObj.time, | ||
senderUuid: msgObj.uuid, | ||
token: msgObj.token | ||
}; | ||
others.filter(function (otherState) { | ||
return _filterMessage(checkObj, otherState); | ||
}).forEach(function (otherState) { | ||
// console.log('EMIT OVER FAST PATH'); | ||
otherState.messagesCallback(messageJson); | ||
}); | ||
} | ||
function onMessage(channelState, fn) { | ||
var time = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : new Date().getTime(); | ||
var time = arguments.length > 2 && arguments[2] !== undefined ? arguments[2] : microSeconds(); | ||
@@ -860,2 +922,6 @@ channelState.messagesCallbackTime = time; | ||
channelState.closed = true; | ||
channelState.emittedMessagesIds.clear(); | ||
OTHER_INSTANCES[channelState.channelName] = OTHER_INSTANCES[channelState.channelName].filter(function (o) { | ||
return o !== channelState; | ||
}); | ||
@@ -873,3 +939,2 @@ if (typeof channelState.removeUnload === 'function') channelState.removeUnload(); | ||
channelState.socketEE.emitter.removeAllListeners(); | ||
channelState.writeQueue.clear(); | ||
@@ -891,2 +956,6 @@ Object.values(channelState.otherReaderClients).forEach(function (client) { | ||
return 50; | ||
} | ||
function microSeconds() { | ||
return parseInt(_nanoTime2['default'].microseconds()); | ||
} |
@@ -27,4 +27,5 @@ 'use strict'; | ||
if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; | ||
if (typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; | ||
return options; | ||
} |
@@ -10,2 +10,3 @@ 'use strict'; | ||
exports.randomToken = randomToken; | ||
exports.microSeconds = microSeconds; | ||
/** | ||
@@ -44,2 +45,24 @@ * returns true if the given object is a promise | ||
}return text; | ||
} | ||
var lastMs = 0; | ||
var additional = 0; | ||
/** | ||
* returns the current time in micro-seconds, | ||
* WARNING: This is a pseudo-function | ||
* Performance.now is not reliable in webworkers, so we just make sure to never return the same time. | ||
* This is enough in browsers, and this function will not be used in nodejs. | ||
* The main reason for this hack is to ensure that BroadcastChannel behaves equal to production when it is used in fast-running unit tests. | ||
*/ | ||
function microSeconds() { | ||
var ms = new Date().getTime(); | ||
if (ms === lastMs) { | ||
additional++; | ||
return ms * 1000 + additional; | ||
} else { | ||
lastMs = ms; | ||
additional = 0; | ||
return ms * 1000; | ||
} | ||
} |
@@ -16,2 +16,3 @@ declare type MethodType = 'node' | 'idb' | 'native' | 'localstorage'; | ||
ttl?: number; | ||
useFastPath?: boolean; | ||
}; | ||
@@ -18,0 +19,0 @@ idb?: { |
{ | ||
"name": "broadcast-channel", | ||
"version": "1.2.5", | ||
"version": "1.2.6", | ||
"description": "A BroadcastChannel implementation that works with new browsers, older browsers and Node.js", | ||
@@ -63,2 +63,4 @@ "homepage": "https://github.com/pubkey/broadcast-channel#readme", | ||
"js-sha3": "0.7.0", | ||
"microseconds": "0.1.0", | ||
"nano-time": "1.0.0", | ||
"unload": "1.3.9" | ||
@@ -65,0 +67,0 @@ }, |
61
perf.txt
@@ -96,3 +96,3 @@ BEFORE: | ||
----------------------------------------- | ||
16. July.2018: test:performance | ||
test:performance | ||
@@ -150,1 +150,60 @@ BEFORE: { | ||
----------------------------------------- | ||
test:performance | ||
BEFORE: { | ||
"openClose": 714.9132689982653, | ||
"sendRecieve": { | ||
"parallel": 6018.035248000175, | ||
"series": 4019.5094799995422 | ||
} | ||
} | ||
AFTER: { // write message up front | ||
"openClose": 703.9341719998047, | ||
"sendRecieve": { | ||
"parallel": 233.59367400035262, | ||
"series": 4531.717969999649 | ||
} | ||
} | ||
----------------------------------------- | ||
----------------------------------------- | ||
test:performance - forgetting set | ||
BEFORE: { | ||
"openClose": 703.9341719998047, | ||
"sendRecieve": { | ||
"parallel": 233.59367400035262, | ||
"series": 4531.717969999649 | ||
} | ||
} | ||
AFTER: { // add fast path | ||
"openClose": 698.5278329998255, | ||
"sendRecieve": { | ||
"parallel": 254.588275000453, | ||
"series": 3679.5491359978914 | ||
} | ||
} | ||
----------------------------------------- | ||
no idle-queue | ||
BEFORE: { | ||
"openClose": 720.8237979999976, | ||
"sendRecieve": { | ||
"parallel": 250.95046299998648, | ||
"series": 3671.9275919999927 | ||
} | ||
} | ||
AFTER: { | ||
"openClose": 684.5638470000122, | ||
"sendRecieve": { | ||
"parallel": 246.08427699981257, | ||
"series": 2251.4478739998303 | ||
} | ||
} | ||
----------------------------------------- |
@@ -45,4 +45,5 @@ import { | ||
_post(type, msg) { | ||
const time = this.method.microSeconds(); | ||
const msgObj = { | ||
time: new Date().getTime(), | ||
time, | ||
type, | ||
@@ -73,3 +74,3 @@ data: msg | ||
set onmessage(fn) { | ||
const time = new Date().getTime(); | ||
const time = this.method.microSeconds(); | ||
const listenObj = { | ||
@@ -89,3 +90,3 @@ time, | ||
addEventListener(type, fn) { | ||
const time = new Date().getTime(); | ||
const time = this.method.microSeconds(); | ||
const listenObj = { | ||
@@ -161,2 +162,7 @@ time, | ||
channel._addEventListeners[msgObj.type].forEach(obj => { | ||
/* | ||
console.log('got message for ' + channel._state.uuid); | ||
console.log('... message time:' + channel._state.uuid + ' - ' + msgObj.time); | ||
console.log('listener time:' + channel._state.uuid + ' - ' + obj.time); | ||
*/ | ||
if (msgObj.time >= obj.time) { | ||
@@ -168,3 +174,3 @@ obj.fn(msgObj.data); | ||
const time = new Date().getTime() - 5; | ||
const time = channel.method.microSeconds(); | ||
if (channel._preparePromise) { | ||
@@ -194,3 +200,3 @@ channel._preparePromise.then(() => { | ||
channel._isListening = false; | ||
const time = new Date().getTime() - 5; | ||
const time = channel.method.microSeconds(); | ||
channel.method.onMessage( | ||
@@ -197,0 +203,0 @@ channel._state, |
@@ -12,5 +12,9 @@ /** | ||
randomInt, | ||
randomToken | ||
randomToken, | ||
microSeconds as micro | ||
} from '../util.js'; | ||
export const microSeconds = micro; | ||
import ObliviousSet from '../oblivious-set'; | ||
import { | ||
@@ -172,3 +176,5 @@ fillOptionsWithDefaults | ||
// contains all messages that have been emitted before | ||
emittedMessagesIds: new Set(), | ||
emittedMessagesIds: new ObliviousSet(options.idb.ttl * 2), | ||
// ensures we do not read messages in parrallel | ||
writeBlockPromise: Promise.resolve(), | ||
messagesCallback: null, | ||
@@ -198,2 +204,10 @@ readQueuePromises: [], | ||
function _filterMessage(msgObj, state) { | ||
if (msgObj.uuid === state.uuid) return false; // send by own | ||
if (state.emittedMessagesIds.has(msgObj.id)) return false; // already emitted | ||
if (msgObj.data.time < state.messagesCallbackTime) return false; // older then onMessageCallback | ||
return true; | ||
} | ||
/** | ||
@@ -203,2 +217,6 @@ * reads all new messages from the database and emits them | ||
function readNewMessages(state) { | ||
// if no one is listening, we do not need to scan for new messages | ||
if (!state.messagesCallback) return Promise.resolve(); | ||
return getMessagesHigherThen(state.db, state.lastCursorId) | ||
@@ -213,16 +231,7 @@ .then(newerMessages => { | ||
}) | ||
.filter(msgObj => msgObj.uuid !== state.uuid) // not send by own | ||
.filter(msgObj => !state.emittedMessagesIds.has(msgObj.id)) // not already emitted | ||
.filter(msgObj => msgObj.time >= state.messagesCallbackTime) // not older then onMessageCallback | ||
.filter(msgObj => _filterMessage(msgObj, state)) | ||
.sort((msgObjA, msgObjB) => msgObjA.time - msgObjB.time); // sort by time | ||
useMessages.forEach(msgObj => { | ||
if (state.messagesCallback) { | ||
state.emittedMessagesIds.add(msgObj.id); | ||
setTimeout( | ||
() => state.emittedMessagesIds.delete(msgObj.id), | ||
state.options.idb.ttl * 2 | ||
); | ||
state.messagesCallback(msgObj.data); | ||
@@ -242,14 +251,19 @@ } | ||
export function postMessage(channelState, messageJson) { | ||
return writeMessage( | ||
channelState.db, | ||
channelState.uuid, | ||
messageJson | ||
).then(() => { | ||
if (randomInt(0, 10) === 0) { | ||
/* await (do not await) */ cleanOldMessages( | ||
channelState.db, | ||
channelState.options.idb.ttl | ||
); | ||
} | ||
}); | ||
channelState.writeBlockPromise = channelState.writeBlockPromise | ||
.then(() => writeMessage( | ||
channelState.db, | ||
channelState.uuid, | ||
messageJson | ||
)) | ||
.then(() => { | ||
if (randomInt(0, 10) === 0) { | ||
/* await (do not await) */ cleanOldMessages( | ||
channelState.db, | ||
channelState.options.idb.ttl | ||
); | ||
} | ||
}); | ||
return channelState.writeBlockPromise; | ||
} | ||
@@ -256,0 +270,0 @@ |
@@ -10,2 +10,3 @@ /** | ||
const isNode = require('detect-node'); | ||
import ObliviousSet from '../oblivious-set'; | ||
@@ -18,5 +19,8 @@ import { | ||
sleep, | ||
randomToken | ||
randomToken, | ||
microSeconds as micro | ||
} from '../util'; | ||
export const microSeconds = micro; | ||
const KEY_PREFIX = 'pubkey.broadcastChannel-'; | ||
@@ -105,3 +109,3 @@ export const type = 'localstorage'; | ||
// contains all messages that have been emitted before | ||
const emittedMessagesIds = new Set(); | ||
const emittedMessagesIds = new ObliviousSet(options.localstorage.removeTimeout); | ||
@@ -123,9 +127,5 @@ const state = { | ||
if (!msgObj.token || emittedMessagesIds.has(msgObj.token)) return; // already emitted | ||
if (msgObj.time && msgObj.time < state.messagesCallbackTime) return; // too old | ||
if (msgObj.data.time && msgObj.data.time < state.messagesCallbackTime) return; // too old | ||
emittedMessagesIds.add(msgObj.token); | ||
setTimeout( | ||
() => emittedMessagesIds.delete(msgObj.token), | ||
options.localstorage.removeTimeout | ||
); | ||
state.messagesCallback(msgObj.data); | ||
@@ -132,0 +132,0 @@ } |
const isNode = require('detect-node'); | ||
import { | ||
randomToken, | ||
microSeconds as micro | ||
} from '../util'; | ||
export const microSeconds = micro; | ||
export const type = 'native'; | ||
export function create(channelName, options) { | ||
if(!options) options = {}; | ||
if (!options) options = {}; | ||
const state = { | ||
uuid: randomToken(10), | ||
channelName, | ||
@@ -9,0 +17,0 @@ options, |
@@ -12,2 +12,4 @@ /** | ||
import * as path from 'path'; | ||
import micro from 'nano-time'; | ||
import { | ||
@@ -18,3 +20,2 @@ sha3_224 | ||
import isNode from 'detect-node'; | ||
import IdleQueue from 'custom-idle-queue'; | ||
import unload from 'unload'; | ||
@@ -31,2 +32,4 @@ | ||
import ObliviousSet from '../oblivious-set'; | ||
/** | ||
@@ -56,2 +59,3 @@ * windows sucks, so we have handle windows-type of socket-paths | ||
const TMP_FOLDER_NAME = 'pubkey.broadcast-channel'; | ||
const OTHER_INSTANCES = {}; | ||
@@ -131,3 +135,3 @@ const getPathsCache = new Map(); | ||
JSON.stringify({ | ||
time: new Date().getTime() | ||
time: microSeconds() | ||
}) | ||
@@ -152,4 +156,2 @@ ).then(() => pathToFile); | ||
stream.on('data', function (msg) { | ||
// console.log('server: got data:'); | ||
// console.dir(msg.toString()); | ||
emitter.emit('data', msg.toString()); | ||
@@ -193,3 +195,3 @@ }); | ||
export async function writeMessage(channelName, readerUuid, messageJson) { | ||
const time = new Date().getTime(); | ||
const time = microSeconds(); | ||
const writeObject = { | ||
@@ -286,7 +288,6 @@ uuid: readerUuid, | ||
export async function cleanOldMessages(messageObjects, ttl) { | ||
const olderThen = new Date().getTime() - ttl; | ||
const olderThen = Date.now() - ttl; | ||
await Promise.all( | ||
messageObjects | ||
.filter(obj => obj.time < olderThen) | ||
.filter(obj => (obj.time / 1000) < olderThen) | ||
.map(obj => unlink(obj.path).catch(() => null)) | ||
@@ -302,11 +303,8 @@ ); | ||
options = fillOptionsWithDefaults(options); | ||
const time = microSeconds(); | ||
await ensureFoldersExist(channelName); | ||
const uuid = randomToken(10); | ||
// ensures we do not read messages in parrallel | ||
const writeQueue = new IdleQueue(1); | ||
const state = { | ||
time, | ||
channelName, | ||
@@ -316,6 +314,7 @@ options, | ||
// contains all messages that have been emitted before | ||
emittedMessagesIds: new Set(), | ||
emittedMessagesIds: new ObliviousSet(options.node.ttl * 2), | ||
messagesCallbackTime: null, | ||
messagesCallback: null, | ||
writeQueue, | ||
// ensures we do not read messages in parrallel | ||
writeBlockPromise: Promise.resolve(), | ||
otherReaderClients: {}, | ||
@@ -327,2 +326,5 @@ // ensure if process crashes, everything is cleaned up | ||
if (!OTHER_INSTANCES[channelName]) OTHER_INSTANCES[channelName] = []; | ||
OTHER_INSTANCES[channelName].push(state); | ||
const [ | ||
@@ -341,4 +343,8 @@ socketEE, | ||
socketEE.emitter.on('data', data => { | ||
const obj = JSON.parse(data); | ||
handleMessagePing(state, obj); | ||
try { | ||
const obj = JSON.parse(data); | ||
handleMessagePing(state, obj); | ||
} catch (err) { | ||
throw new Error('could not parse data: ' + data); | ||
} | ||
}); | ||
@@ -349,13 +355,22 @@ | ||
export function _filterMessage(msgObj, state) { | ||
/* console.log('_filterMessage()'); | ||
console.dir(msgObj); | ||
console.log(msgObj.senderUuid === state.uuid); | ||
console.log(state.emittedMessagesIds.has(msgObj.token)); | ||
console.log(!state.messagesCallback); | ||
console.log(msgObj.time < state.messagesCallbackTime); | ||
console.log(msgObj.time < state.time);*/ | ||
export function _filterMessage(msgObj, state) { | ||
if (msgObj.senderUuid === state.uuid) return false; // not send by own | ||
if (state.emittedMessagesIds.has(msgObj.token)) return false; // not already emitted | ||
if (!state.messagesCallback) return false; // no listener | ||
if (msgObj.time < state.messagesCallbackTime) return false; // not older then onMessageCallback | ||
if (msgObj.time < state.time) return false; // msgObj is older then channel | ||
state.emittedMessagesIds.add(msgObj.token); | ||
return true; | ||
} | ||
/** | ||
@@ -366,2 +381,3 @@ * when the socket pings, so that we now new messages came, | ||
export async function handleMessagePing(state, msgObj) { | ||
/** | ||
@@ -401,8 +417,5 @@ * when there are no listener, we do nothing | ||
state.emittedMessagesIds.add(msgObj.token); | ||
setTimeout( | ||
() => state.emittedMessagesIds.delete(msgObj.token), | ||
state.options.node.ttl * 2 | ||
); | ||
if (state.messagesCallback) { | ||
// emit to subscribers | ||
state.messagesCallback(msgObj.content.data); | ||
@@ -445,46 +458,75 @@ } | ||
export function postMessage(channelState, messageJson) { | ||
// ensure we do this not in parallel | ||
return channelState.writeQueue.requestIdlePromise() | ||
.then( | ||
() => channelState.writeQueue.wrapCall( | ||
async () => { | ||
const [msgObj] = await Promise.all([ | ||
writeMessage( | ||
channelState.channelName, | ||
channelState.uuid, | ||
messageJson | ||
), | ||
refreshReaderClients(channelState) | ||
]); | ||
const pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}'; | ||
await Promise.all( | ||
Object.values(channelState.otherReaderClients) | ||
.filter(client => client.writable) // client might have closed in between | ||
.map(client => { | ||
return new Promise(res => { | ||
client.write(pingStr, res); | ||
}); | ||
}) | ||
); | ||
const writePromise = writeMessage( | ||
channelState.channelName, | ||
channelState.uuid, | ||
messageJson | ||
); | ||
/** | ||
* clean up old messages | ||
* to not waste resources on cleaning up, | ||
* only if random-int matches, we clean up old messages | ||
*/ | ||
if (randomInt(0, 50) === 0) { | ||
/* await */ getAllMessages(channelState.channelName) | ||
.then(allMessages => cleanOldMessages(allMessages, channelState.options.node.ttl)); | ||
} | ||
channelState.writeBlockPromise = channelState.writeBlockPromise.then(async () => { | ||
// emit to own eventEmitter | ||
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson))); | ||
} | ||
) | ||
// w8 to ticks to let the buffer flush | ||
await new Promise(res => setTimeout(res, 0)); | ||
await new Promise(res => setTimeout(res, 0)); | ||
const [msgObj] = await Promise.all([ | ||
writePromise, | ||
refreshReaderClients(channelState) | ||
]); | ||
emitOverFastPath(channelState, msgObj, messageJson); | ||
const pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}'; | ||
await Promise.all( | ||
Object.values(channelState.otherReaderClients) | ||
.filter(client => client.writable) // client might have closed in between | ||
.map(client => { | ||
return new Promise(res => { | ||
client.write(pingStr, res); | ||
}); | ||
}) | ||
); | ||
/** | ||
* clean up old messages | ||
* to not waste resources on cleaning up, | ||
* only if random-int matches, we clean up old messages | ||
*/ | ||
if (randomInt(0, 20) === 0) { | ||
/* await */ getAllMessages(channelState.channelName) | ||
.then(allMessages => cleanOldMessages(allMessages, channelState.options.node.ttl)); | ||
} | ||
// emit to own eventEmitter | ||
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson))); | ||
}); | ||
return channelState.writeBlockPromise; | ||
} | ||
/** | ||
* When multiple BroadcastChannels with the same name | ||
* are created in a single node-process, we can access them directly and emit messages. | ||
* This might not happen often in production | ||
* but will speed up things when this module is used in unit-tests. | ||
*/ | ||
export function emitOverFastPath(state, msgObj, messageJson) { | ||
if (!state.options.node.useFastPath) return; // disabled | ||
const others = OTHER_INSTANCES[state.channelName].filter(s => s !== state); | ||
export function onMessage(channelState, fn, time = new Date().getTime()) { | ||
const checkObj = { | ||
time: msgObj.time, | ||
senderUuid: msgObj.uuid, | ||
token: msgObj.token | ||
}; | ||
others | ||
.filter(otherState => _filterMessage(checkObj, otherState)) | ||
.forEach(otherState => { | ||
// console.log('EMIT OVER FAST PATH'); | ||
otherState.messagesCallback(messageJson); | ||
}); | ||
} | ||
export function onMessage(channelState, fn, time = microSeconds()) { | ||
channelState.messagesCallbackTime = time; | ||
@@ -498,2 +540,4 @@ channelState.messagesCallback = fn; | ||
channelState.closed = true; | ||
channelState.emittedMessagesIds.clear(); | ||
OTHER_INSTANCES[channelState.channelName] = OTHER_INSTANCES[channelState.channelName].filter(o => o !== channelState); | ||
@@ -510,3 +554,2 @@ if (typeof channelState.removeUnload === 'function') | ||
channelState.socketEE.emitter.removeAllListeners(); | ||
channelState.writeQueue.clear(); | ||
@@ -524,5 +567,8 @@ Object.values(channelState.otherReaderClients) | ||
export function averageResponseTime() { | ||
return 50; | ||
} | ||
export function microSeconds() { | ||
return parseInt(micro.microseconds()); | ||
} |
@@ -22,4 +22,5 @@ export function fillOptionsWithDefaults(options) { | ||
if (!options.node.ttl) options.node.ttl = 1000 * 60 * 2; // 2 minutes; | ||
if(typeof options.node.useFastPath === 'undefined') options.node.useFastPath = true; | ||
return options; | ||
} |
@@ -34,2 +34,25 @@ /** | ||
return text; | ||
} | ||
let lastMs = 0; | ||
let additional = 0; | ||
/** | ||
* returns the current time in micro-seconds, | ||
* WARNING: This is a pseudo-function | ||
* Performance.now is not reliable in webworkers, so we just make sure to never return the same time. | ||
* This is enough in browsers, and this function will not be used in nodejs. | ||
* The main reason for this hack is to ensure that BroadcastChannel behaves equal to production when it is used in fast-running unit tests. | ||
*/ | ||
export function microSeconds() { | ||
const ms = new Date().getTime(); | ||
if (ms === lastMs) { | ||
additional++; | ||
return ms * 1000 + additional; | ||
} else { | ||
lastMs = ms; | ||
additional = 0; | ||
return ms * 1000; | ||
} | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
203496
50
4835
7
+ Addedmicroseconds@0.1.0
+ Addednano-time@1.0.0
+ Addedbig-integer@1.6.52(transitive)
+ Addedmicroseconds@0.1.0(transitive)
+ Addednano-time@1.0.0(transitive)