Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

broadcast-channel

Package Overview
Dependencies
Maintainers
1
Versions
98
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

broadcast-channel - npm Package Compare versions

Comparing version 2.0.1 to 2.0.2

566

dist/es/methods/node.js

@@ -5,4 +5,3 @@ import _regeneratorRuntime from 'babel-runtime/regenerator';

var ensureFoldersExist = function () {
var _ref = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee(channelName) {
var paths;
var _ref = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee(channelName, paths) {
return _regeneratorRuntime.wrap(function _callee$(_context) {

@@ -12,10 +11,14 @@ while (1) {

case 0:
paths = getPaths(channelName);
_context.next = 3;
return mkdir(paths.base)['catch'](function () {
return null;
});
paths = paths || getPaths(channelName);
case 3:
_context.next = 5;
if (!ENSURE_BASE_FOLDER_EXISTS_PROMISE) {
ENSURE_BASE_FOLDER_EXISTS_PROMISE = mkdir(paths.base)['catch'](function () {
return null;
});
}
_context.next = 4;
return ENSURE_BASE_FOLDER_EXISTS_PROMISE;
case 4:
_context.next = 6;
return mkdir(paths.channelBase)['catch'](function () {

@@ -25,4 +28,4 @@ return null;

case 5:
_context.next = 7;
case 6:
_context.next = 8;
return Promise.all([mkdir(paths.readers)['catch'](function () {

@@ -34,3 +37,3 @@ return null;

case 7:
case 8:
case 'end':

@@ -43,3 +46,3 @@ return _context.stop();

return function ensureFoldersExist(_x) {
return function ensureFoldersExist(_x, _x2) {
return _ref.apply(this, arguments);

@@ -73,9 +76,11 @@ };

case 4:
_context2.next = 6;
ENSURE_BASE_FOLDER_EXISTS_PROMISE = null;
_context2.next = 7;
return removeDir(paths.base);
case 6:
case 7:
ENSURE_BASE_FOLDER_EXISTS_PROMISE = null;
return _context2.abrupt('return', true);
case 7:
case 9:
case 'end':

@@ -98,3 +103,3 @@ return _context2.stop();

var createSocketEventEmitter = function () {
var _ref3 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee3(channelName, readerUuid) {
var _ref3 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee3(channelName, readerUuid, paths) {
var pathToSocket, emitter, server;

@@ -105,3 +110,3 @@ return _regeneratorRuntime.wrap(function _callee3$(_context3) {

case 0:
pathToSocket = socketPath(channelName, readerUuid);
pathToSocket = socketPath(channelName, readerUuid, paths);
emitter = new events.EventEmitter();

@@ -116,3 +121,2 @@ server = net.createServer(function (stream) {

return new Promise(function (resolve, reject) {
server.listen(pathToSocket, function (err, res) {

@@ -124,4 +128,2 @@ if (err) reject(err);else resolve(res);

case 5:
server.on('connection', function () {});
return _context3.abrupt('return', {

@@ -133,3 +135,3 @@ path: pathToSocket,

case 7:
case 6:
case 'end':

@@ -142,3 +144,3 @@ return _context3.stop();

return function createSocketEventEmitter(_x2, _x3) {
return function createSocketEventEmitter(_x3, _x4, _x5) {
return _ref3.apply(this, arguments);

@@ -173,3 +175,3 @@ };

return function openClientConnection(_x4, _x5) {
return function openClientConnection(_x6, _x7) {
return _ref4.apply(this, arguments);

@@ -182,45 +184,6 @@ };

* so other readers can find it
* @return {Promise}
*/
var writeMessage = function () {
var _ref5 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee5(channelName, readerUuid, messageJson) {
var time, writeObject, token, fileName, msgPath;
return _regeneratorRuntime.wrap(function _callee5$(_context5) {
while (1) {
switch (_context5.prev = _context5.next) {
case 0:
time = microSeconds();
writeObject = {
uuid: readerUuid,
time: time,
data: messageJson
};
token = randomToken(12);
fileName = time + '_' + readerUuid + '_' + token + '.json';
msgPath = path.join(getPaths(channelName).messages, fileName);
_context5.next = 7;
return writeFile(msgPath, JSON.stringify(writeObject));
case 7:
return _context5.abrupt('return', {
time: time,
uuid: readerUuid,
token: token,
path: msgPath
});
case 8:
case 'end':
return _context5.stop();
}
}
}, _callee5, this);
}));
return function writeMessage(_x6, _x7, _x8) {
return _ref5.apply(this, arguments);
};
}();
/**

@@ -230,18 +193,17 @@ * returns the uuids of all readers

*/
var getReadersUuids = function () {
var _ref6 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee6(channelName) {
var _ref5 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee5(channelName, paths) {
var readersPath, files;
return _regeneratorRuntime.wrap(function _callee6$(_context6) {
return _regeneratorRuntime.wrap(function _callee5$(_context5) {
while (1) {
switch (_context6.prev = _context6.next) {
switch (_context5.prev = _context5.next) {
case 0:
readersPath = getPaths(channelName).readers;
_context6.next = 3;
paths = paths || getPaths(channelName);
readersPath = paths.readers;
_context5.next = 4;
return readdir(readersPath);
case 3:
files = _context6.sent;
return _context6.abrupt('return', files.map(function (file) {
case 4:
files = _context5.sent;
return _context5.abrupt('return', files.map(function (file) {
return file.split('.');

@@ -255,12 +217,12 @@ }).filter(function (split) {

case 5:
case 6:
case 'end':
return _context6.stop();
return _context5.stop();
}
}
}, _callee6, this);
}, _callee5, this);
}));
return function getReadersUuids(_x9) {
return _ref6.apply(this, arguments);
return function getReadersUuids(_x8, _x9) {
return _ref5.apply(this, arguments);
};

@@ -270,22 +232,22 @@ }();

var messagePath = function () {
var _ref7 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee7(channelName, time, token, writerUuid) {
var _ref6 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee6(channelName, time, token, writerUuid) {
var fileName, msgPath;
return _regeneratorRuntime.wrap(function _callee7$(_context7) {
return _regeneratorRuntime.wrap(function _callee6$(_context6) {
while (1) {
switch (_context7.prev = _context7.next) {
switch (_context6.prev = _context6.next) {
case 0:
fileName = time + '_' + writerUuid + '_' + token + '.json';
msgPath = path.join(getPaths(channelName).messages, fileName);
return _context7.abrupt('return', msgPath);
return _context6.abrupt('return', msgPath);
case 3:
case 'end':
return _context7.stop();
return _context6.stop();
}
}
}, _callee7, this);
}, _callee6, this);
}));
return function messagePath(_x10, _x11, _x12, _x13) {
return _ref7.apply(this, arguments);
return _ref6.apply(this, arguments);
};

@@ -295,15 +257,16 @@ }();

var getAllMessages = function () {
var _ref8 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee8(channelName) {
var _ref7 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee7(channelName, paths) {
var messagesPath, files;
return _regeneratorRuntime.wrap(function _callee8$(_context8) {
return _regeneratorRuntime.wrap(function _callee7$(_context7) {
while (1) {
switch (_context8.prev = _context8.next) {
switch (_context7.prev = _context7.next) {
case 0:
messagesPath = getPaths(channelName).messages;
_context8.next = 3;
paths = paths || getPaths(channelName);
messagesPath = paths.messages;
_context7.next = 4;
return readdir(messagesPath);
case 3:
files = _context8.sent;
return _context8.abrupt('return', files.map(function (file) {
case 4:
files = _context7.sent;
return _context7.abrupt('return', files.map(function (file) {
var fileName = file.split('.')[0];

@@ -320,12 +283,12 @@ var split = fileName.split('_');

case 5:
case 6:
case 'end':
return _context8.stop();
return _context7.stop();
}
}
}, _callee8, this);
}, _callee7, this);
}));
return function getAllMessages(_x14) {
return _ref8.apply(this, arguments);
return function getAllMessages(_x14, _x15) {
return _ref7.apply(this, arguments);
};

@@ -335,10 +298,10 @@ }();

var cleanOldMessages = function () {
var _ref9 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee9(messageObjects, ttl) {
var _ref8 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee8(messageObjects, ttl) {
var olderThen;
return _regeneratorRuntime.wrap(function _callee9$(_context9) {
return _regeneratorRuntime.wrap(function _callee8$(_context8) {
while (1) {
switch (_context9.prev = _context9.next) {
switch (_context8.prev = _context8.next) {
case 0:
olderThen = Date.now() - ttl;
_context9.next = 3;
_context8.next = 3;
return Promise.all(messageObjects.filter(function (obj) {

@@ -354,29 +317,31 @@ return obj.time / 1000 < olderThen;

case 'end':
return _context9.stop();
return _context8.stop();
}
}
}, _callee9, this);
}, _callee8, this);
}));
return function cleanOldMessages(_x15, _x16) {
return _ref9.apply(this, arguments);
return function cleanOldMessages(_x16, _x17) {
return _ref8.apply(this, arguments);
};
}();
/**
* creates a new channelState
* @return {Promise<any>}
*/
var create = function () {
var _ref10 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee10(channelName) {
var _ref9 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee9(channelName) {
var options = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {};
var time, uuid, state, _ref11, socketEE, infoFilePath;
var time, paths, ensureFolderExistsPromise, uuid, state, _ref10, socketEE, infoFilePath;
return _regeneratorRuntime.wrap(function _callee10$(_context10) {
return _regeneratorRuntime.wrap(function _callee9$(_context9) {
while (1) {
switch (_context10.prev = _context10.next) {
switch (_context9.prev = _context9.next) {
case 0:
options = fillOptionsWithDefaults(options);
time = microSeconds();
_context10.next = 4;
return ensureFoldersExist(channelName);
case 4:
paths = getPaths(channelName);
ensureFolderExistsPromise = ensureFoldersExist(channelName, paths);
uuid = randomToken(10);

@@ -388,2 +353,3 @@ state = {

uuid: uuid,
paths: paths,
// contains all messages that have been emitted before

@@ -407,10 +373,14 @@ emittedMessagesIds: new ObliviousSet(options.node.ttl * 2),

_context10.next = 10;
return Promise.all([createSocketEventEmitter(channelName, uuid), createSocketInfoFile(channelName, uuid), refreshReaderClients(state)]);
_context9.next = 10;
return ensureFolderExistsPromise;
case 10:
_ref11 = _context10.sent;
socketEE = _ref11[0];
infoFilePath = _ref11[1];
_context9.next = 12;
return Promise.all([createSocketEventEmitter(channelName, uuid, paths), createSocketInfoFile(channelName, uuid, paths), refreshReaderClients(state)]);
case 12:
_ref10 = _context9.sent;
socketEE = _ref10[0];
infoFilePath = _ref10[1];
state.socketEE = socketEE;

@@ -437,14 +407,14 @@ state.infoFilePath = infoFilePath;

return _context10.abrupt('return', state);
return _context9.abrupt('return', state);
case 17:
case 19:
case 'end':
return _context10.stop();
return _context9.stop();
}
}
}, _callee10, this);
}, _callee9, this);
}));
return function create(_x17) {
return _ref10.apply(this, arguments);
return function create(_x18) {
return _ref9.apply(this, arguments);
};

@@ -458,14 +428,14 @@ }();

var handleMessagePing = function () {
var _ref12 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee11(state, msgObj) {
var _ref11 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee10(state, msgObj) {
var messages, useMessages;
return _regeneratorRuntime.wrap(function _callee11$(_context11) {
return _regeneratorRuntime.wrap(function _callee10$(_context10) {
while (1) {
switch (_context11.prev = _context11.next) {
switch (_context10.prev = _context10.next) {
case 0:
if (state.messagesCallback) {
_context11.next = 2;
_context10.next = 2;
break;
}
return _context11.abrupt('return');
return _context10.abrupt('return');

@@ -476,12 +446,12 @@ case 2:

if (msgObj) {
_context11.next = 9;
_context10.next = 9;
break;
}
_context11.next = 6;
return getAllMessages(state.channelName);
_context10.next = 6;
return getAllMessages(state.channelName, state.paths);
case 6:
messages = _context11.sent;
_context11.next = 10;
messages = _context10.sent;
_context10.next = 10;
break;

@@ -491,3 +461,3 @@

// get single message
messages = [getSingleMessage(state.channelName, msgObj)];
messages = [getSingleMessage(state.channelName, msgObj, state.paths)];

@@ -505,10 +475,10 @@ case 10:

if (!(!useMessages.length || !state.messagesCallback)) {
_context11.next = 13;
_context10.next = 13;
break;
}
return _context11.abrupt('return');
return _context10.abrupt('return');
case 13:
_context11.next = 15;
_context10.next = 15;
return Promise.all(useMessages.map(function (msgObj) {

@@ -533,133 +503,19 @@ return readMessage(msgObj).then(function (content) {

case 'end':
return _context11.stop();
return _context10.stop();
}
}
}, _callee11, this);
}, _callee10, this);
}));
return function handleMessagePing(_x19, _x20) {
return _ref12.apply(this, arguments);
return function handleMessagePing(_x20, _x21) {
return _ref11.apply(this, arguments);
};
}();
var refreshReaderClients = function () {
var _ref13 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee14(channelState) {
var _this = this;
/**
* ensures that the channelState is connected with all other readers
* @return {Promise<void>}
*/
var otherReaders;
return _regeneratorRuntime.wrap(function _callee14$(_context14) {
while (1) {
switch (_context14.prev = _context14.next) {
case 0:
_context14.next = 2;
return getReadersUuids(channelState.channelName);
case 2:
otherReaders = _context14.sent;
// remove subscriptions to closed readers
Object.keys(channelState.otherReaderClients).filter(function (readerUuid) {
return !otherReaders.includes(readerUuid);
}).forEach(function () {
var _ref14 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee12(readerUuid) {
return _regeneratorRuntime.wrap(function _callee12$(_context12) {
while (1) {
switch (_context12.prev = _context12.next) {
case 0:
_context12.prev = 0;
_context12.next = 3;
return channelState.otherReaderClients[readerUuid].destroy();
case 3:
_context12.next = 7;
break;
case 5:
_context12.prev = 5;
_context12.t0 = _context12['catch'](0);
case 7:
delete channelState.otherReaderClients[readerUuid];
case 8:
case 'end':
return _context12.stop();
}
}
}, _callee12, _this, [[0, 5]]);
}));
return function (_x22) {
return _ref14.apply(this, arguments);
};
}());
_context14.next = 6;
return Promise.all(otherReaders.filter(function (readerUuid) {
return readerUuid !== channelState.uuid;
}) // not own
.filter(function (readerUuid) {
return !channelState.otherReaderClients[readerUuid];
}) // not already has client
.map(function () {
var _ref15 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee13(readerUuid) {
var client;
return _regeneratorRuntime.wrap(function _callee13$(_context13) {
while (1) {
switch (_context13.prev = _context13.next) {
case 0:
_context13.prev = 0;
if (!channelState.closed) {
_context13.next = 3;
break;
}
return _context13.abrupt('return');
case 3:
_context13.next = 5;
return openClientConnection(channelState.channelName, readerUuid);
case 5:
client = _context13.sent;
channelState.otherReaderClients[readerUuid] = client;
_context13.next = 11;
break;
case 9:
_context13.prev = 9;
_context13.t0 = _context13['catch'](0);
case 11:
case 'end':
return _context13.stop();
}
}
}, _callee13, _this, [[0, 9]]);
}));
return function (_x23) {
return _ref15.apply(this, arguments);
};
}()
// this might throw if the other channel is closed at the same time when this one is running refresh
// so we do not throw an error
));
case 6:
case 'end':
return _context14.stop();
}
}
}, _callee14, this);
}));
return function refreshReaderClients(_x21) {
return _ref13.apply(this, arguments);
};
}();
/**

@@ -716,3 +572,2 @@ * this method is used in nodejs-environments.

if (!getPathsCache.has(channelName)) {
var channelHash = sha3_224(channelName); // use hash incase of strange characters

@@ -742,5 +597,7 @@ /**

function socketPath(channelName, readerUuid) {
var ENSURE_BASE_FOLDER_EXISTS_PROMISE = null;
var paths = getPaths(channelName);
function socketPath(channelName, readerUuid, paths) {
paths = paths || getPaths(channelName);
var socketPath = path.join(paths.readers, readerUuid + '.s');

@@ -750,4 +607,4 @@ return cleanPipeName(socketPath);

function socketInfoPath(channelName, readerUuid) {
var paths = getPaths(channelName);
function socketInfoPath(channelName, readerUuid, paths) {
paths = paths || getPaths(channelName);
var socketPath = path.join(paths.readers, readerUuid + '.json');

@@ -762,4 +619,4 @@ return socketPath;

*/
function createSocketInfoFile(channelName, readerUuid) {
var pathToFile = socketInfoPath(channelName, readerUuid);
function createSocketInfoFile(channelName, readerUuid, paths) {
var pathToFile = socketInfoPath(channelName, readerUuid, paths);
return writeFile(pathToFile, JSON.stringify({

@@ -770,9 +627,31 @@ time: microSeconds()

});
}function writeMessage(channelName, readerUuid, messageJson, paths) {
paths = paths || getPaths(channelName);
var time = microSeconds();
var writeObject = {
uuid: readerUuid,
time: time,
data: messageJson
};
var token = randomToken(12);
var fileName = time + '_' + readerUuid + '_' + token + '.json';
var msgPath = path.join(paths.messages, fileName);
return writeFile(msgPath, JSON.stringify(writeObject)).then(function () {
return {
time: time,
uuid: readerUuid,
token: token,
path: msgPath
};
});
}
function getSingleMessage(channelName, msgObj) {
var messagesPath = getPaths(channelName).messages;
function getSingleMessage(channelName, msgObj, paths) {
paths = paths || getPaths(channelName);
return {
path: path.join(messagesPath, msgObj.t + '_' + msgObj.u + '_' + msgObj.to + '.json'),
path: path.join(paths.messages, msgObj.t + '_' + msgObj.u + '_' + msgObj.to + '.json'),
time: msgObj.t,

@@ -801,17 +680,115 @@ senderUuid: msgObj.u,

return true;
}function refreshReaderClients(channelState) {
var _this = this;
return getReadersUuids(channelState.channelName, channelState.paths).then(function (otherReaders) {
// remove subscriptions to closed readers
Object.keys(channelState.otherReaderClients).filter(function (readerUuid) {
return !otherReaders.includes(readerUuid);
}).forEach(function () {
var _ref12 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee11(readerUuid) {
return _regeneratorRuntime.wrap(function _callee11$(_context11) {
while (1) {
switch (_context11.prev = _context11.next) {
case 0:
_context11.prev = 0;
_context11.next = 3;
return channelState.otherReaderClients[readerUuid].destroy();
case 3:
_context11.next = 7;
break;
case 5:
_context11.prev = 5;
_context11.t0 = _context11['catch'](0);
case 7:
delete channelState.otherReaderClients[readerUuid];
case 8:
case 'end':
return _context11.stop();
}
}
}, _callee11, _this, [[0, 5]]);
}));
return function (_x22) {
return _ref12.apply(this, arguments);
};
}());
// add new readers
return Promise.all(otherReaders.filter(function (readerUuid) {
return readerUuid !== channelState.uuid;
}) // not own
.filter(function (readerUuid) {
return !channelState.otherReaderClients[readerUuid];
}) // not already has client
.map(function () {
var _ref13 = _asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee12(readerUuid) {
var client;
return _regeneratorRuntime.wrap(function _callee12$(_context12) {
while (1) {
switch (_context12.prev = _context12.next) {
case 0:
_context12.prev = 0;
if (!channelState.closed) {
_context12.next = 3;
break;
}
return _context12.abrupt('return');
case 3:
_context12.next = 5;
return openClientConnection(channelState.channelName, readerUuid);
case 5:
client = _context12.sent;
channelState.otherReaderClients[readerUuid] = client;
_context12.next = 11;
break;
case 9:
_context12.prev = 9;
_context12.t0 = _context12['catch'](0);
case 11:
case 'end':
return _context12.stop();
}
}
}, _callee12, _this, [[0, 9]]);
}));
return function (_x23) {
return _ref13.apply(this, arguments);
};
}()
// this might throw if the other channel is closed at the same time when this one is running refresh
// so we do not throw an error
));
});
}
/**
* post a message to the other readers
* @return {Promise<void>}
*/
function postMessage(channelState, messageJson) {
var _this2 = this;
var writePromise = writeMessage(channelState.channelName, channelState.uuid, messageJson);
var writePromise = writeMessage(channelState.channelName, channelState.uuid, messageJson, channelState.paths);
channelState.writeBlockPromise = channelState.writeBlockPromise.then(_asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee13() {
var _ref15, msgObj, pingStr, writeToReadersPromise;
channelState.writeBlockPromise = channelState.writeBlockPromise.then(_asyncToGenerator( /*#__PURE__*/_regeneratorRuntime.mark(function _callee15() {
var _ref17, msgObj, pingStr;
return _regeneratorRuntime.wrap(function _callee15$(_context15) {
return _regeneratorRuntime.wrap(function _callee13$(_context13) {
while (1) {
switch (_context15.prev = _context15.next) {
switch (_context13.prev = _context13.next) {
case 0:
_context15.next = 2;
_context13.next = 2;
return new Promise(function (res) {

@@ -822,19 +799,12 @@ return setTimeout(res, 0);

case 2:
_context15.next = 4;
return new Promise(function (res) {
return setTimeout(res, 0);
});
_context13.next = 4;
return Promise.all([writePromise, refreshReaderClients(channelState)]);
case 4:
_context15.next = 6;
return Promise.all([writePromise, refreshReaderClients(channelState)]);
_ref15 = _context13.sent;
msgObj = _ref15[0];
case 6:
_ref17 = _context15.sent;
msgObj = _ref17[0];
emitOverFastPath(channelState, msgObj, messageJson);
pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}|';
_context15.next = 12;
return Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) {
writeToReadersPromise = Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) {
return client.writable;

@@ -848,4 +818,2 @@ }) // client might have closed in between

case 12:
/**

@@ -856,5 +824,6 @@ * clean up old messages

*/
if (randomInt(0, 20) === 0) {
/* await */
getAllMessages(channelState.channelName).then(function (allMessages) {
getAllMessages(channelState.channelName, channelState.paths).then(function (allMessages) {
return cleanOldMessages(allMessages, channelState.options.node.ttl);

@@ -864,11 +833,10 @@ });

// emit to own eventEmitter
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson)));
return _context13.abrupt('return', writeToReadersPromise);
case 13:
case 11:
case 'end':
return _context15.stop();
return _context13.stop();
}
}
}, _callee15, _this2);
}, _callee13, _this2);
})));

@@ -875,0 +843,0 @@

@@ -16,4 +16,3 @@ 'use strict';

var ensureFoldersExist = function () {
var _ref = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee(channelName) {
var paths;
var _ref = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee(channelName, paths) {
return _regenerator2['default'].wrap(function _callee$(_context) {

@@ -23,10 +22,14 @@ while (1) {

case 0:
paths = getPaths(channelName);
_context.next = 3;
return mkdir(paths.base)['catch'](function () {
return null;
});
paths = paths || getPaths(channelName);
case 3:
_context.next = 5;
if (!ENSURE_BASE_FOLDER_EXISTS_PROMISE) {
ENSURE_BASE_FOLDER_EXISTS_PROMISE = mkdir(paths.base)['catch'](function () {
return null;
});
}
_context.next = 4;
return ENSURE_BASE_FOLDER_EXISTS_PROMISE;
case 4:
_context.next = 6;
return mkdir(paths.channelBase)['catch'](function () {

@@ -36,4 +39,4 @@ return null;

case 5:
_context.next = 7;
case 6:
_context.next = 8;
return Promise.all([mkdir(paths.readers)['catch'](function () {

@@ -45,3 +48,3 @@ return null;

case 7:
case 8:
case 'end':

@@ -54,3 +57,3 @@ return _context.stop();

return function ensureFoldersExist(_x) {
return function ensureFoldersExist(_x, _x2) {
return _ref.apply(this, arguments);

@@ -84,9 +87,11 @@ };

case 4:
_context2.next = 6;
ENSURE_BASE_FOLDER_EXISTS_PROMISE = null;
_context2.next = 7;
return removeDir(paths.base);
case 6:
case 7:
ENSURE_BASE_FOLDER_EXISTS_PROMISE = null;
return _context2.abrupt('return', true);
case 7:
case 9:
case 'end':

@@ -109,3 +114,3 @@ return _context2.stop();

var createSocketEventEmitter = function () {
var _ref3 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee3(channelName, readerUuid) {
var _ref3 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee3(channelName, readerUuid, paths) {
var pathToSocket, emitter, server;

@@ -116,3 +121,3 @@ return _regenerator2['default'].wrap(function _callee3$(_context3) {

case 0:
pathToSocket = socketPath(channelName, readerUuid);
pathToSocket = socketPath(channelName, readerUuid, paths);
emitter = new events.EventEmitter();

@@ -127,3 +132,2 @@ server = net.createServer(function (stream) {

return new Promise(function (resolve, reject) {
server.listen(pathToSocket, function (err, res) {

@@ -135,4 +139,2 @@ if (err) reject(err);else resolve(res);

case 5:
server.on('connection', function () {});
return _context3.abrupt('return', {

@@ -144,3 +146,3 @@ path: pathToSocket,

case 7:
case 6:
case 'end':

@@ -153,3 +155,3 @@ return _context3.stop();

return function createSocketEventEmitter(_x2, _x3) {
return function createSocketEventEmitter(_x3, _x4, _x5) {
return _ref3.apply(this, arguments);

@@ -184,3 +186,3 @@ };

return function openClientConnection(_x4, _x5) {
return function openClientConnection(_x6, _x7) {
return _ref4.apply(this, arguments);

@@ -193,45 +195,6 @@ };

* so other readers can find it
* @return {Promise}
*/
var writeMessage = function () {
var _ref5 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee5(channelName, readerUuid, messageJson) {
var time, writeObject, token, fileName, msgPath;
return _regenerator2['default'].wrap(function _callee5$(_context5) {
while (1) {
switch (_context5.prev = _context5.next) {
case 0:
time = microSeconds();
writeObject = {
uuid: readerUuid,
time: time,
data: messageJson
};
token = randomToken(12);
fileName = time + '_' + readerUuid + '_' + token + '.json';
msgPath = path.join(getPaths(channelName).messages, fileName);
_context5.next = 7;
return writeFile(msgPath, JSON.stringify(writeObject));
case 7:
return _context5.abrupt('return', {
time: time,
uuid: readerUuid,
token: token,
path: msgPath
});
case 8:
case 'end':
return _context5.stop();
}
}
}, _callee5, this);
}));
return function writeMessage(_x6, _x7, _x8) {
return _ref5.apply(this, arguments);
};
}();
/**

@@ -241,18 +204,17 @@ * returns the uuids of all readers

*/
var getReadersUuids = function () {
var _ref6 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee6(channelName) {
var _ref5 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee5(channelName, paths) {
var readersPath, files;
return _regenerator2['default'].wrap(function _callee6$(_context6) {
return _regenerator2['default'].wrap(function _callee5$(_context5) {
while (1) {
switch (_context6.prev = _context6.next) {
switch (_context5.prev = _context5.next) {
case 0:
readersPath = getPaths(channelName).readers;
_context6.next = 3;
paths = paths || getPaths(channelName);
readersPath = paths.readers;
_context5.next = 4;
return readdir(readersPath);
case 3:
files = _context6.sent;
return _context6.abrupt('return', files.map(function (file) {
case 4:
files = _context5.sent;
return _context5.abrupt('return', files.map(function (file) {
return file.split('.');

@@ -266,12 +228,12 @@ }).filter(function (split) {

case 5:
case 6:
case 'end':
return _context6.stop();
return _context5.stop();
}
}
}, _callee6, this);
}, _callee5, this);
}));
return function getReadersUuids(_x9) {
return _ref6.apply(this, arguments);
return function getReadersUuids(_x8, _x9) {
return _ref5.apply(this, arguments);
};

@@ -281,22 +243,22 @@ }();

var messagePath = function () {
var _ref7 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee7(channelName, time, token, writerUuid) {
var _ref6 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee6(channelName, time, token, writerUuid) {
var fileName, msgPath;
return _regenerator2['default'].wrap(function _callee7$(_context7) {
return _regenerator2['default'].wrap(function _callee6$(_context6) {
while (1) {
switch (_context7.prev = _context7.next) {
switch (_context6.prev = _context6.next) {
case 0:
fileName = time + '_' + writerUuid + '_' + token + '.json';
msgPath = path.join(getPaths(channelName).messages, fileName);
return _context7.abrupt('return', msgPath);
return _context6.abrupt('return', msgPath);
case 3:
case 'end':
return _context7.stop();
return _context6.stop();
}
}
}, _callee7, this);
}, _callee6, this);
}));
return function messagePath(_x10, _x11, _x12, _x13) {
return _ref7.apply(this, arguments);
return _ref6.apply(this, arguments);
};

@@ -306,15 +268,16 @@ }();

var getAllMessages = function () {
var _ref8 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee8(channelName) {
var _ref7 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee7(channelName, paths) {
var messagesPath, files;
return _regenerator2['default'].wrap(function _callee8$(_context8) {
return _regenerator2['default'].wrap(function _callee7$(_context7) {
while (1) {
switch (_context8.prev = _context8.next) {
switch (_context7.prev = _context7.next) {
case 0:
messagesPath = getPaths(channelName).messages;
_context8.next = 3;
paths = paths || getPaths(channelName);
messagesPath = paths.messages;
_context7.next = 4;
return readdir(messagesPath);
case 3:
files = _context8.sent;
return _context8.abrupt('return', files.map(function (file) {
case 4:
files = _context7.sent;
return _context7.abrupt('return', files.map(function (file) {
var fileName = file.split('.')[0];

@@ -331,12 +294,12 @@ var split = fileName.split('_');

case 5:
case 6:
case 'end':
return _context8.stop();
return _context7.stop();
}
}
}, _callee8, this);
}, _callee7, this);
}));
return function getAllMessages(_x14) {
return _ref8.apply(this, arguments);
return function getAllMessages(_x14, _x15) {
return _ref7.apply(this, arguments);
};

@@ -346,10 +309,10 @@ }();

var cleanOldMessages = function () {
var _ref9 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee9(messageObjects, ttl) {
var _ref8 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee8(messageObjects, ttl) {
var olderThen;
return _regenerator2['default'].wrap(function _callee9$(_context9) {
return _regenerator2['default'].wrap(function _callee8$(_context8) {
while (1) {
switch (_context9.prev = _context9.next) {
switch (_context8.prev = _context8.next) {
case 0:
olderThen = Date.now() - ttl;
_context9.next = 3;
_context8.next = 3;
return Promise.all(messageObjects.filter(function (obj) {

@@ -365,29 +328,31 @@ return obj.time / 1000 < olderThen;

case 'end':
return _context9.stop();
return _context8.stop();
}
}
}, _callee9, this);
}, _callee8, this);
}));
return function cleanOldMessages(_x15, _x16) {
return _ref9.apply(this, arguments);
return function cleanOldMessages(_x16, _x17) {
return _ref8.apply(this, arguments);
};
}();
/**
* creates a new channelState
* @return {Promise<any>}
*/
var create = function () {
var _ref10 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee10(channelName) {
var _ref9 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee9(channelName) {
var options = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {};
var time, uuid, state, _ref11, _ref12, socketEE, infoFilePath;
var time, paths, ensureFolderExistsPromise, uuid, state, _ref10, _ref11, socketEE, infoFilePath;
return _regenerator2['default'].wrap(function _callee10$(_context10) {
return _regenerator2['default'].wrap(function _callee9$(_context9) {
while (1) {
switch (_context10.prev = _context10.next) {
switch (_context9.prev = _context9.next) {
case 0:
options = fillOptionsWithDefaults(options);
time = microSeconds();
_context10.next = 4;
return ensureFoldersExist(channelName);
case 4:
paths = getPaths(channelName);
ensureFolderExistsPromise = ensureFoldersExist(channelName, paths);
uuid = randomToken(10);

@@ -399,2 +364,3 @@ state = {

uuid: uuid,
paths: paths,
// contains all messages that have been emitted before

@@ -418,11 +384,15 @@ emittedMessagesIds: new ObliviousSet(options.node.ttl * 2),

_context10.next = 10;
return Promise.all([createSocketEventEmitter(channelName, uuid), createSocketInfoFile(channelName, uuid), refreshReaderClients(state)]);
_context9.next = 10;
return ensureFolderExistsPromise;
case 10:
_ref11 = _context10.sent;
_ref12 = (0, _slicedToArray3['default'])(_ref11, 2);
socketEE = _ref12[0];
infoFilePath = _ref12[1];
_context9.next = 12;
return Promise.all([createSocketEventEmitter(channelName, uuid, paths), createSocketInfoFile(channelName, uuid, paths), refreshReaderClients(state)]);
case 12:
_ref10 = _context9.sent;
_ref11 = (0, _slicedToArray3['default'])(_ref10, 2);
socketEE = _ref11[0];
infoFilePath = _ref11[1];
state.socketEE = socketEE;

@@ -449,14 +419,14 @@ state.infoFilePath = infoFilePath;

return _context10.abrupt('return', state);
return _context9.abrupt('return', state);
case 18:
case 20:
case 'end':
return _context10.stop();
return _context9.stop();
}
}
}, _callee10, this);
}, _callee9, this);
}));
return function create(_x18) {
return _ref10.apply(this, arguments);
return function create(_x19) {
return _ref9.apply(this, arguments);
};

@@ -470,14 +440,14 @@ }();

var handleMessagePing = function () {
var _ref13 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee11(state, msgObj) {
var _ref12 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee10(state, msgObj) {
var messages, useMessages;
return _regenerator2['default'].wrap(function _callee11$(_context11) {
return _regenerator2['default'].wrap(function _callee10$(_context10) {
while (1) {
switch (_context11.prev = _context11.next) {
switch (_context10.prev = _context10.next) {
case 0:
if (state.messagesCallback) {
_context11.next = 2;
_context10.next = 2;
break;
}
return _context11.abrupt('return');
return _context10.abrupt('return');

@@ -488,12 +458,12 @@ case 2:

if (msgObj) {
_context11.next = 9;
_context10.next = 9;
break;
}
_context11.next = 6;
return getAllMessages(state.channelName);
_context10.next = 6;
return getAllMessages(state.channelName, state.paths);
case 6:
messages = _context11.sent;
_context11.next = 10;
messages = _context10.sent;
_context10.next = 10;
break;

@@ -503,3 +473,3 @@

// get single message
messages = [getSingleMessage(state.channelName, msgObj)];
messages = [getSingleMessage(state.channelName, msgObj, state.paths)];

@@ -517,10 +487,10 @@ case 10:

if (!(!useMessages.length || !state.messagesCallback)) {
_context11.next = 13;
_context10.next = 13;
break;
}
return _context11.abrupt('return');
return _context10.abrupt('return');
case 13:
_context11.next = 15;
_context10.next = 15;
return Promise.all(useMessages.map(function (msgObj) {

@@ -545,133 +515,19 @@ return readMessage(msgObj).then(function (content) {

case 'end':
return _context11.stop();
return _context10.stop();
}
}
}, _callee11, this);
}, _callee10, this);
}));
return function handleMessagePing(_x19, _x20) {
return _ref13.apply(this, arguments);
return function handleMessagePing(_x20, _x21) {
return _ref12.apply(this, arguments);
};
}();
var refreshReaderClients = function () {
var _ref14 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee14(channelState) {
var _this = this;
/**
* ensures that the channelState is connected with all other readers
* @return {Promise<void>}
*/
var otherReaders;
return _regenerator2['default'].wrap(function _callee14$(_context14) {
while (1) {
switch (_context14.prev = _context14.next) {
case 0:
_context14.next = 2;
return getReadersUuids(channelState.channelName);
case 2:
otherReaders = _context14.sent;
// remove subscriptions to closed readers
Object.keys(channelState.otherReaderClients).filter(function (readerUuid) {
return !otherReaders.includes(readerUuid);
}).forEach(function () {
var _ref15 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee12(readerUuid) {
return _regenerator2['default'].wrap(function _callee12$(_context12) {
while (1) {
switch (_context12.prev = _context12.next) {
case 0:
_context12.prev = 0;
_context12.next = 3;
return channelState.otherReaderClients[readerUuid].destroy();
case 3:
_context12.next = 7;
break;
case 5:
_context12.prev = 5;
_context12.t0 = _context12['catch'](0);
case 7:
delete channelState.otherReaderClients[readerUuid];
case 8:
case 'end':
return _context12.stop();
}
}
}, _callee12, _this, [[0, 5]]);
}));
return function (_x22) {
return _ref15.apply(this, arguments);
};
}());
_context14.next = 6;
return Promise.all(otherReaders.filter(function (readerUuid) {
return readerUuid !== channelState.uuid;
}) // not own
.filter(function (readerUuid) {
return !channelState.otherReaderClients[readerUuid];
}) // not already has client
.map(function () {
var _ref16 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee13(readerUuid) {
var client;
return _regenerator2['default'].wrap(function _callee13$(_context13) {
while (1) {
switch (_context13.prev = _context13.next) {
case 0:
_context13.prev = 0;
if (!channelState.closed) {
_context13.next = 3;
break;
}
return _context13.abrupt('return');
case 3:
_context13.next = 5;
return openClientConnection(channelState.channelName, readerUuid);
case 5:
client = _context13.sent;
channelState.otherReaderClients[readerUuid] = client;
_context13.next = 11;
break;
case 9:
_context13.prev = 9;
_context13.t0 = _context13['catch'](0);
case 11:
case 'end':
return _context13.stop();
}
}
}, _callee13, _this, [[0, 9]]);
}));
return function (_x23) {
return _ref16.apply(this, arguments);
};
}()
// this might throw if the other channel is closed at the same time when this one is running refresh
// so we do not throw an error
));
case 6:
case 'end':
return _context14.stop();
}
}
}, _callee14, this);
}));
return function refreshReaderClients(_x21) {
return _ref14.apply(this, arguments);
};
}();
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { 'default': obj }; }

@@ -730,3 +586,2 @@

if (!getPathsCache.has(channelName)) {
var channelHash = sha3_224(channelName); // use hash incase of strange characters

@@ -756,5 +611,7 @@ /**

function socketPath(channelName, readerUuid) {
var ENSURE_BASE_FOLDER_EXISTS_PROMISE = null;
var paths = getPaths(channelName);
function socketPath(channelName, readerUuid, paths) {
paths = paths || getPaths(channelName);
var socketPath = path.join(paths.readers, readerUuid + '.s');

@@ -764,4 +621,4 @@ return cleanPipeName(socketPath);

function socketInfoPath(channelName, readerUuid) {
var paths = getPaths(channelName);
function socketInfoPath(channelName, readerUuid, paths) {
paths = paths || getPaths(channelName);
var socketPath = path.join(paths.readers, readerUuid + '.json');

@@ -776,4 +633,4 @@ return socketPath;

*/
function createSocketInfoFile(channelName, readerUuid) {
var pathToFile = socketInfoPath(channelName, readerUuid);
function createSocketInfoFile(channelName, readerUuid, paths) {
var pathToFile = socketInfoPath(channelName, readerUuid, paths);
return writeFile(pathToFile, JSON.stringify({

@@ -784,9 +641,31 @@ time: microSeconds()

});
}function writeMessage(channelName, readerUuid, messageJson, paths) {
paths = paths || getPaths(channelName);
var time = microSeconds();
var writeObject = {
uuid: readerUuid,
time: time,
data: messageJson
};
var token = randomToken(12);
var fileName = time + '_' + readerUuid + '_' + token + '.json';
var msgPath = path.join(paths.messages, fileName);
return writeFile(msgPath, JSON.stringify(writeObject)).then(function () {
return {
time: time,
uuid: readerUuid,
token: token,
path: msgPath
};
});
}
function getSingleMessage(channelName, msgObj) {
var messagesPath = getPaths(channelName).messages;
function getSingleMessage(channelName, msgObj, paths) {
paths = paths || getPaths(channelName);
return {
path: path.join(messagesPath, msgObj.t + '_' + msgObj.u + '_' + msgObj.to + '.json'),
path: path.join(paths.messages, msgObj.t + '_' + msgObj.u + '_' + msgObj.to + '.json'),
time: msgObj.t,

@@ -815,17 +694,115 @@ senderUuid: msgObj.u,

return true;
}function refreshReaderClients(channelState) {
var _this = this;
return getReadersUuids(channelState.channelName, channelState.paths).then(function (otherReaders) {
// remove subscriptions to closed readers
Object.keys(channelState.otherReaderClients).filter(function (readerUuid) {
return !otherReaders.includes(readerUuid);
}).forEach(function () {
var _ref13 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee11(readerUuid) {
return _regenerator2['default'].wrap(function _callee11$(_context11) {
while (1) {
switch (_context11.prev = _context11.next) {
case 0:
_context11.prev = 0;
_context11.next = 3;
return channelState.otherReaderClients[readerUuid].destroy();
case 3:
_context11.next = 7;
break;
case 5:
_context11.prev = 5;
_context11.t0 = _context11['catch'](0);
case 7:
delete channelState.otherReaderClients[readerUuid];
case 8:
case 'end':
return _context11.stop();
}
}
}, _callee11, _this, [[0, 5]]);
}));
return function (_x22) {
return _ref13.apply(this, arguments);
};
}());
// add new readers
return Promise.all(otherReaders.filter(function (readerUuid) {
return readerUuid !== channelState.uuid;
}) // not own
.filter(function (readerUuid) {
return !channelState.otherReaderClients[readerUuid];
}) // not already has client
.map(function () {
var _ref14 = (0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee12(readerUuid) {
var client;
return _regenerator2['default'].wrap(function _callee12$(_context12) {
while (1) {
switch (_context12.prev = _context12.next) {
case 0:
_context12.prev = 0;
if (!channelState.closed) {
_context12.next = 3;
break;
}
return _context12.abrupt('return');
case 3:
_context12.next = 5;
return openClientConnection(channelState.channelName, readerUuid);
case 5:
client = _context12.sent;
channelState.otherReaderClients[readerUuid] = client;
_context12.next = 11;
break;
case 9:
_context12.prev = 9;
_context12.t0 = _context12['catch'](0);
case 11:
case 'end':
return _context12.stop();
}
}
}, _callee12, _this, [[0, 9]]);
}));
return function (_x23) {
return _ref14.apply(this, arguments);
};
}()
// this might throw if the other channel is closed at the same time when this one is running refresh
// so we do not throw an error
));
});
}
/**
* post a message to the other readers
* @return {Promise<void>}
*/
function postMessage(channelState, messageJson) {
var _this2 = this;
var writePromise = writeMessage(channelState.channelName, channelState.uuid, messageJson);
var writePromise = writeMessage(channelState.channelName, channelState.uuid, messageJson, channelState.paths);
channelState.writeBlockPromise = channelState.writeBlockPromise.then((0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee13() {
var _ref16, _ref17, msgObj, pingStr, writeToReadersPromise;
channelState.writeBlockPromise = channelState.writeBlockPromise.then((0, _asyncToGenerator3['default'])( /*#__PURE__*/_regenerator2['default'].mark(function _callee15() {
var _ref18, _ref19, msgObj, pingStr;
return _regenerator2['default'].wrap(function _callee15$(_context15) {
return _regenerator2['default'].wrap(function _callee13$(_context13) {
while (1) {
switch (_context15.prev = _context15.next) {
switch (_context13.prev = _context13.next) {
case 0:
_context15.next = 2;
_context13.next = 2;
return new Promise(function (res) {

@@ -836,20 +813,13 @@ return setTimeout(res, 0);

case 2:
_context15.next = 4;
return new Promise(function (res) {
return setTimeout(res, 0);
});
_context13.next = 4;
return Promise.all([writePromise, refreshReaderClients(channelState)]);
case 4:
_context15.next = 6;
return Promise.all([writePromise, refreshReaderClients(channelState)]);
_ref16 = _context13.sent;
_ref17 = (0, _slicedToArray3['default'])(_ref16, 1);
msgObj = _ref17[0];
case 6:
_ref18 = _context15.sent;
_ref19 = (0, _slicedToArray3['default'])(_ref18, 1);
msgObj = _ref19[0];
emitOverFastPath(channelState, msgObj, messageJson);
pingStr = '{"t":' + msgObj.time + ',"u":"' + msgObj.uuid + '","to":"' + msgObj.token + '"}|';
_context15.next = 13;
return Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) {
writeToReadersPromise = Promise.all(Object.values(channelState.otherReaderClients).filter(function (client) {
return client.writable;

@@ -863,4 +833,2 @@ }) // client might have closed in between

case 13:
/**

@@ -871,5 +839,6 @@ * clean up old messages

*/
if (randomInt(0, 20) === 0) {
/* await */
getAllMessages(channelState.channelName).then(function (allMessages) {
getAllMessages(channelState.channelName, channelState.paths).then(function (allMessages) {
return cleanOldMessages(allMessages, channelState.options.node.ttl);

@@ -879,11 +848,10 @@ });

// emit to own eventEmitter
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson)));
return _context13.abrupt('return', writeToReadersPromise);
case 14:
case 12:
case 'end':
return _context15.stop();
return _context13.stop();
}
}
}, _callee15, _this2);
}, _callee13, _this2);
})));

@@ -890,0 +858,0 @@

{
"name": "broadcast-channel",
"version": "2.0.1",
"version": "2.0.2",
"description": "A BroadcastChannel implementation that works with new browsers, older browsers and Node.js",

@@ -34,2 +34,3 @@ "homepage": "https://github.com/pubkey/broadcast-channel#readme",

"test:node": "npm run build && mocha ./test/index.test.js -b --timeout 6000 --exit",
"test:node:loop": "npm run test:node && npm run test:node:loop",
"test:browser": "npm run build && karma start ./config/karma.conf.js --single-run",

@@ -36,0 +37,0 @@ "test:e2e": "concurrently \"npm run docs:serve\" \"sleep 5 && testcafe -b && testcafe all test/e2e.test.js\" --kill-others --success first",

@@ -59,3 +59,2 @@ /**

if (!getPathsCache.has(channelName)) {
const channelHash = sha3_224(channelName); // use hash incase of strange characters

@@ -94,5 +93,11 @@ /**

async function ensureFoldersExist(channelName) {
const paths = getPaths(channelName);
await mkdir(paths.base).catch(() => null);
let ENSURE_BASE_FOLDER_EXISTS_PROMISE = null;
async function ensureFoldersExist(channelName, paths) {
paths = paths || getPaths(channelName);
if (!ENSURE_BASE_FOLDER_EXISTS_PROMISE) {
ENSURE_BASE_FOLDER_EXISTS_PROMISE = mkdir(paths.base).catch(() => null);
}
await ENSURE_BASE_FOLDER_EXISTS_PROMISE;
await mkdir(paths.channelBase).catch(() => null);

@@ -115,3 +120,5 @@ await Promise.all([

}
ENSURE_BASE_FOLDER_EXISTS_PROMISE = null;
await removeDir(paths.base);
ENSURE_BASE_FOLDER_EXISTS_PROMISE = null;
return true;

@@ -121,5 +128,4 @@ }

function socketPath(channelName, readerUuid) {
const paths = getPaths(channelName);
function socketPath(channelName, readerUuid, paths) {
paths = paths || getPaths(channelName);
const socketPath = path.join(

@@ -132,4 +138,4 @@ paths.readers,

function socketInfoPath(channelName, readerUuid) {
const paths = getPaths(channelName);
function socketInfoPath(channelName, readerUuid, paths) {
paths = paths || getPaths(channelName);
const socketPath = path.join(

@@ -148,4 +154,4 @@ paths.readers,

*/
function createSocketInfoFile(channelName, readerUuid) {
const pathToFile = socketInfoPath(channelName, readerUuid);
function createSocketInfoFile(channelName, readerUuid, paths) {
const pathToFile = socketInfoPath(channelName, readerUuid, paths);
return writeFile(

@@ -163,4 +169,4 @@ pathToFile,

*/
async function createSocketEventEmitter(channelName, readerUuid) {
const pathToSocket = socketPath(channelName, readerUuid);
async function createSocketEventEmitter(channelName, readerUuid, paths) {
const pathToSocket = socketPath(channelName, readerUuid, paths);

@@ -177,3 +183,2 @@ const emitter = new events.EventEmitter();

await new Promise((resolve, reject) => {
server.listen(pathToSocket, (err, res) => {

@@ -184,3 +189,2 @@ if (err) reject(err);

});
server.on('connection', () => {});

@@ -210,4 +214,6 @@ return {

* so other readers can find it
* @return {Promise}
*/
async function writeMessage(channelName, readerUuid, messageJson) {
function writeMessage(channelName, readerUuid, messageJson, paths) {
paths = paths || getPaths(channelName);
const time = microSeconds();

@@ -224,17 +230,17 @@ const writeObject = {

const msgPath = path.join(
getPaths(channelName).messages,
paths.messages,
fileName
);
await writeFile(
return writeFile(
msgPath,
JSON.stringify(writeObject)
);
return {
time,
uuid: readerUuid,
token,
path: msgPath
};
).then(() => {
return {
time,
uuid: readerUuid,
token,
path: msgPath
};
});
}

@@ -246,4 +252,5 @@

*/
async function getReadersUuids(channelName) {
const readersPath = getPaths(channelName).readers;
async function getReadersUuids(channelName, paths) {
paths = paths || getPaths(channelName);
const readersPath = paths.readers;
const files = await readdir(readersPath);

@@ -267,4 +274,5 @@

async function getAllMessages(channelName) {
const messagesPath = getPaths(channelName).messages;
async function getAllMessages(channelName, paths) {
paths = paths || getPaths(channelName);
const messagesPath = paths.messages;
const files = await readdir(messagesPath);

@@ -287,8 +295,8 @@ return files.map(file => {

function getSingleMessage(channelName, msgObj) {
const messagesPath = getPaths(channelName).messages;
function getSingleMessage(channelName, msgObj, paths) {
paths = paths || getPaths(channelName);
return {
path: path.join(
messagesPath,
paths.messages,
msgObj.t + '_' + msgObj.u + '_' + msgObj.to + '.json'

@@ -321,6 +329,11 @@ ),

/**
* creates a new channelState
* @return {Promise<any>}
*/
async function create(channelName, options = {}) {
options = fillOptionsWithDefaults(options);
const time = microSeconds();
await ensureFoldersExist(channelName);
const paths = getPaths(channelName);
const ensureFolderExistsPromise = ensureFoldersExist(channelName, paths);
const uuid = randomToken(10);

@@ -333,2 +346,3 @@

uuid,
paths,
// contains all messages that have been emitted before

@@ -349,2 +363,3 @@ emittedMessagesIds: new ObliviousSet(options.node.ttl * 2),

await ensureFolderExistsPromise;
const [

@@ -354,4 +369,4 @@ socketEE,

] = await Promise.all([
createSocketEventEmitter(channelName, uuid),
createSocketInfoFile(channelName, uuid),
createSocketEventEmitter(channelName, uuid, paths),
createSocketInfoFile(channelName, uuid, paths),
refreshReaderClients(state)

@@ -408,7 +423,7 @@ ]);

// get all
messages = await getAllMessages(state.channelName);
messages = await getAllMessages(state.channelName, state.paths);
} else {
// get single message
messages = [
getSingleMessage(state.channelName, msgObj)
getSingleMessage(state.channelName, msgObj, state.paths)
];

@@ -443,46 +458,53 @@ }

async function refreshReaderClients(channelState) {
// ensure we have subscribed to all readers
const otherReaders = await getReadersUuids(channelState.channelName);
/**
* ensures that the channelState is connected with all other readers
* @return {Promise<void>}
*/
function refreshReaderClients(channelState) {
return getReadersUuids(channelState.channelName, channelState.paths)
.then(otherReaders => {
// remove subscriptions to closed readers
Object.keys(channelState.otherReaderClients)
.filter(readerUuid => !otherReaders.includes(readerUuid))
.forEach(async (readerUuid) => {
try {
await channelState.otherReaderClients[readerUuid].destroy();
} catch (err) {}
delete channelState.otherReaderClients[readerUuid];
});
// remove subscriptions to closed readers
Object.keys(channelState.otherReaderClients)
.filter(readerUuid => !otherReaders.includes(readerUuid))
.forEach(async (readerUuid) => {
try {
await channelState.otherReaderClients[readerUuid].destroy();
} catch (err) {}
delete channelState.otherReaderClients[readerUuid];
// add new readers
return Promise.all(
otherReaders
.filter(readerUuid => readerUuid !== channelState.uuid) // not own
.filter(readerUuid => !channelState.otherReaderClients[readerUuid]) // not already has client
.map(async (readerUuid) => {
try {
if (channelState.closed) return;
const client = await openClientConnection(channelState.channelName, readerUuid);
channelState.otherReaderClients[readerUuid] = client;
} catch (err) {
// this might throw if the other channel is closed at the same time when this one is running refresh
// so we do not throw an error
}
})
);
});
await Promise.all(
otherReaders
.filter(readerUuid => readerUuid !== channelState.uuid) // not own
.filter(readerUuid => !channelState.otherReaderClients[readerUuid]) // not already has client
.map(async (readerUuid) => {
try {
if (channelState.closed) return;
const client = await openClientConnection(channelState.channelName, readerUuid);
channelState.otherReaderClients[readerUuid] = client;
} catch (err) {
// this might throw if the other channel is closed at the same time when this one is running refresh
// so we do not throw an error
}
})
);
}
/**
* post a message to the other readers
* @return {Promise<void>}
*/
function postMessage(channelState, messageJson) {
const writePromise = writeMessage(
channelState.channelName,
channelState.uuid,
messageJson
messageJson,
channelState.paths
);
channelState.writeBlockPromise = channelState.writeBlockPromise.then(async () => {
// w8 to ticks to let the buffer flush
// w8 one tick to let the buffer flush
await new Promise(res => setTimeout(res, 0));
await new Promise(res => setTimeout(res, 0));

@@ -496,3 +518,3 @@ const [msgObj] = await Promise.all([

await Promise.all(
const writeToReadersPromise = Promise.all(
Object.values(channelState.otherReaderClients)

@@ -514,8 +536,7 @@ .filter(client => client.writable) // client might have closed in between

/* await */
getAllMessages(channelState.channelName)
getAllMessages(channelState.channelName, channelState.paths)
.then(allMessages => cleanOldMessages(allMessages, channelState.options.node.ttl));
}
// emit to own eventEmitter
// channelState.socketEE.emitter.emit('data', JSON.parse(JSON.stringify(messageJson)));
return writeToReadersPromise;
});

@@ -522,0 +543,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc