Huge News! Announcing our $40M Series B led by Abstract Ventures. Learn More
Socket
Sign inDemoInstall
Socket

clusterluck

Package Overview
Dependencies
Maintainers
5
Versions
11
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

clusterluck - npm Package Compare versions

Comparing version 1.3.0 to 2.0.0

test/integration/dlm.js

77

bin/cli.js

@@ -40,2 +40,8 @@ #!/usr/local/bin/node

var singular = argv._.length === 0;
/**
 * Logs to stdout only when the CLI was started without a subcommand
 * (interactive mode); otherwise stays silent so scripted/piped output
 * remains clean.
 *
 * @param {...*} args - Values forwarded verbatim to `console.log`.
 */
function log(...args) {
  // Spread is the idiomatic ES6 replacement for Function#apply here.
  if (singular) console.log(...args);
}
const from = {id: argv.I + "_" + process.pid};

@@ -87,3 +93,3 @@ let client;

data = _.omit(NetKernel._decodeMsg(this._cookie, data), "tag");
console.log(util.inspect(data, {depth: null}));
console.log(JSON.stringify(data, null, 2));
cb();

@@ -99,3 +105,5 @@ }

_handleConnect() {
console.log("Connected to %s", this._id);
if (singular) {
log("Connected to %s", this._id);
}
this._connected = true;

@@ -110,3 +118,3 @@ this._disLog = false;

this._disLog = true;
console.log("Disconnected from %s", this._id);
log("Disconnected from %s", this._id);
}

@@ -146,2 +154,9 @@ this._connected = false;

// CLI command "nodes": prints the membership of the connected node's ring.
vorpal
.command("nodes")
.description("Prints the nodes of the ring of this node to the console.")
.action(function (args, cb) {
// Delegates to the command server's "nodes" handler; cb resumes the REPL.
client.send("nodes", null, cb);
});
vorpal
.command("get <id>")

@@ -189,2 +204,19 @@ .description("Returns information about a node's hostname and port in this node's cluster.")

// CLI command "weight <id>": fetches the virtual-node weight of a single
// node from the connected node's ring.
vorpal
.command("weight <id>")
.types({
// Keep the id as a string even if it looks numeric.
string: ["id"]
})
.action(function (args, cb) {
client.send("weight", {
id: args.id
}, cb);
});
// CLI command "weights": fetches the full node-id -> weight map of the ring.
vorpal
.command("weights")
.action(function (args, cb) {
client.send("weights", null, cb);
});
vorpal
.command("leave")

@@ -210,4 +242,5 @@ .description("Leaves the cluster this node is a part of. " +

.option("-f, --force", forceStr)
.option("-w, --weight <weight>", "Number of virtual nodes to assign to the node being inserted. Defaults to the `rfactor` of the session node.")
.types({
string: ["id", "host", "port"]
string: ["id", "host", "port", "weight"]
})

@@ -217,2 +250,3 @@ .action(function (args, cb) {

force: args.options.force,
weight: parseInt(args.options.weight),
node: {id: args.id, host: args.host, port: parseInt(args.port)}

@@ -229,2 +263,3 @@ }, cb);

.option("-f, --force", forceStr)
.option("-w, --weight <w>", "Number of virtual nodes to assign to the nodes being inserted. Defaults to the `rfactor` of the session node.")
.action(function (args, cb) {

@@ -234,2 +269,3 @@ const nodes = parseNodeList(args.nodes);

force: args.options.force,
weight: parseInt(args.options.weight),
nodes: nodes

@@ -240,2 +276,20 @@ }, cb);

// CLI command "update <id> <host> <port> [weight]": changes the weight of an
// existing node in the cluster's hash ring.
vorpal
  // NOTE(review): description previously said "Inserts a node" — copy-paste
  // from the insert command; corrected to describe the update semantics.
  .command("update <id> <host> <port> [weight]")
  .description("Updates a node in this node's cluster. " +
    "If force is passed, the gossip processor on this node " +
    "won't wait for current message streams to be processed " +
    "before executing this command.")
  .option("-f, --force", forceStr)
  .types({
    string: ["id", "host", "port", "weight"]
  })
  .action(function (args, cb) {
    client.send("update", {
      force: args.options.force,
      // NaN when [weight] is omitted; the server side falls back to rfactor.
      weight: parseInt(args.weight, 10),
      node: {id: args.id, host: args.host, port: parseInt(args.port, 10)}
    }, cb);
  });
vorpal
.command("remove <id> <host> <port>")

@@ -272,3 +326,12 @@ .description("Removes a node from this node's cluster. " +

console.log("Connecting to IPC server on node: %s, host: %s, port: %s", argv.I, argv.H, argv.p);
// CLI command "ping": liveness probe against the connected node.
vorpal
.command("ping")
.description("Ping a node in the cluster.")
.action(function (args, cb) {
// Server replies with "pong"; cb resumes the REPL.
client.send("ping", null, cb);
});
if (singular) {
log("Connecting to IPC server on node: %s, host: %s, port: %s", argv.I, argv.H, argv.p);
}
ipc.config.silent = true;

@@ -280,3 +343,3 @@ ipc.config.sync = true;

client.once("connect", () => {
if (argv._.length === 0) {
if (singular) {
vorpal

@@ -286,3 +349,3 @@ .delimiter("> ")

} else {
vorpal.exec(argv._, function (err, res) {
vorpal.exec(argv._.join(" "), function (err, res) {
client.stop();

@@ -289,0 +352,0 @@ if (err) {

# Change Log
## [v2.0.0](https://github.com/azuqua/clusterluck/tree/v2.0.0) (2017-10-08)
[Full Changelog](https://github.com/azuqua/clusterluck/compare/v1.3.0...v2.0.0)
**Implemented enhancements:**
- Add ability to explicitly set rfactor on node insertion/update, grab explicit range length of nodes [\#45](https://github.com/azuqua/clusterluck/issues/45)
- Documentation or interface goodies for setting configurations on external IPC connections [\#42](https://github.com/azuqua/clusterluck/issues/42)
- Add convenience commands to CLI tool for reading nodes from a ring, pinging external nodes [\#41](https://github.com/azuqua/clusterluck/issues/41)
**Merged pull requests:**
- Develop [\#51](https://github.com/azuqua/clusterluck/pull/51) ([kevinwilson541](https://github.com/kevinwilson541))
- update missing documentation for parameters on insert/minsert/range f… [\#50](https://github.com/azuqua/clusterluck/pull/50) ([kevinwilson541](https://github.com/kevinwilson541))
- 2.0.0 alpha [\#49](https://github.com/azuqua/clusterluck/pull/49) ([kevinwilson541](https://github.com/kevinwilson541))
- update CLI output format to be parseable by jq and other JSON parsing… [\#48](https://github.com/azuqua/clusterluck/pull/48) ([kevinwilson541](https://github.com/kevinwilson541))
- integration tests for dlm/dsm/gen\_server [\#47](https://github.com/azuqua/clusterluck/pull/47) ([kevinwilson541](https://github.com/kevinwilson541))
- Kevin/experiment [\#46](https://github.com/azuqua/clusterluck/pull/46) ([kevinwilson541](https://github.com/kevinwilson541))
- update README with new CLI commands \(nodes, ping\) [\#44](https://github.com/azuqua/clusterluck/pull/44) ([kevinwilson541](https://github.com/kevinwilson541))
- Kevin/experiment [\#43](https://github.com/azuqua/clusterluck/pull/43) ([kevinwilson541](https://github.com/kevinwilson541))
- Kevin/experiment [\#40](https://github.com/azuqua/clusterluck/pull/40) ([kevinwilson541](https://github.com/kevinwilson541))
- Develop [\#39](https://github.com/azuqua/clusterluck/pull/39) ([kevinwilson541](https://github.com/kevinwilson541))
- updated .travis.yml file [\#38](https://github.com/azuqua/clusterluck/pull/38) ([kevinwilson541](https://github.com/kevinwilson541))
## [v1.3.0](https://github.com/azuqua/clusterluck/tree/v1.3.0) (2017-07-12)

@@ -4,0 +27,0 @@ [Full Changelog](https://github.com/azuqua/clusterluck/compare/v1.2.3...v1.3.0)

@@ -72,2 +72,3 @@ var _ = require("lodash"),

* @param {Object} [opts.vclockOpts] - Vector clock options for trimming; occurs at the same interval as `interval`. Defaults to `clusterluck.consts.vclockOpts`.
* @param {Object} [opts.connOpts] - Connection options for when connecting to new nodes.
*

@@ -74,0 +75,0 @@ * @return {Clusterluck.GossipRing} A new gossip ring instance.

326

lib/chash.js

@@ -1,8 +0,12 @@

var _ = require("lodash"),
debug = require("debug")("clusterluck:lib:chash"),
rbt = require("functional-red-black-tree"),
crypto = require("crypto"),
Node = require("./node"),
utils = require("./utils");
const _ = require("lodash"),
debug = require("debug")("clusterluck:lib:chash"),
rbt = require("functional-red-black-tree"),
LRU = require("lru-cache"),
crypto = require("crypto"),
Node = require("./node"),
utils = require("./utils"),
consts = require("./consts");
const chashOpts = consts.chashOpts;
class CHash {

@@ -19,8 +23,13 @@ /**

* @param {RBTree} [tree] - Existing red-black tree to instantiate local tree from.
* @param {Object} [opts] - Options object for hash ring.
* @param {Number} [opts.maxCacheSize] - Maximum cache size for caching hash values in hash ring. Defaults to 5000.
*
*/
constructor(rfactor, pfactor, tree) {
constructor(rfactor, pfactor, tree, opts=chashOpts) {
// _.defaults fills missing keys in-place and returns opts; callers relying
// on that mutation keep working.
opts = _.defaults(opts, chashOpts);
this._rfactor = rfactor;
this._pfactor = pfactor;
// Start from the supplied red-black tree, or an empty one.
this._tree = tree || rbt();
// LRU cache memoizes sha256 digests of keys and virtual-node names.
this._cache = new LRU({max: opts.maxCacheSize});
// Must run after _tree is set: derives the node-id -> weight map from it.
this._calculateWeights();
}

@@ -36,2 +45,3 @@

* @param {Node} node - Node to insert into this instance.
* @param {Number} [weight] - Number of times to insert this node. Defaults to `rfactor`.
*

@@ -41,7 +51,9 @@ * @return {Clusterluck.CHash} This instance.

*/
insert(node) {
if (this._tree.get(CHash._nodeName(node, 1))) return this;
_.times(this._rfactor, (n) => {
this._tree = this._tree.insert(CHash._nodeName(node, n+1), node);
insert(node, weight) {
if (this._weights.has(node.id())) return this;
weight = weight !== undefined ? weight : this._rfactor;
_.times(weight, (n) => {
this._tree = this._tree.insert(this._nodeName(node, n+1), node);
});
this._weights.set(node.id(), weight);
return this;

@@ -63,6 +75,8 @@ }

remove(node) {
if (!this._tree.get(CHash._nodeName(node, 1))) return this;
_.times(this._rfactor, (n) => {
this._tree = this._tree.remove(CHash._nodeName(node, n+1));
if (this._weights.has(node.id()) === false) return this;
const weight = this._weights.get(node.id());
_.times(weight, (n) => {
this._tree = this._tree.remove(this._nodeName(node, n+1));
});
this._weights.delete(node.id());
return this;

@@ -85,3 +99,3 @@ }

get(node, def) {
var res = this._tree.get(CHash._nodeName(node, 1));
const res = this._tree.get(this._nodeName(node, 1));
return res === undefined ? def : res;

@@ -91,3 +105,3 @@ }

/**
* Update the state of `node` in the hash ring. Useful if the host and port of a node need to be updated.
* Update the weight of `node` in the hash ring.
*

@@ -99,3 +113,3 @@ * @method update

* @param {Node} node - Node to add state to.
* @param {Node} state - State to add to this hash ring.
* @param {Number} weight - Weight to change `node` to in the ring.
*

@@ -105,6 +119,7 @@ * @return {Clusterluck.CHash} This instance.

*/
update(node, state) {
update(node, weight) {
this.remove(node);
_.times(this._rfactor, (n) => {
this._tree = this._tree.insert(CHash._nodeName(node, n+1), state);
this._weights.set(node.id(), weight);
_.times(weight, (n) => {
this._tree = this._tree.insert(this._nodeName(node, n+1), node);
});

@@ -127,4 +142,4 @@ return this;

find(data) {
var hash = crypto.createHash("sha256").update(data).digest("base64");
var iter = this._tree.gt(hash);
const hash = this._findHash(data);
const iter = this._tree.gt(hash);
if (iter.valid === false) return this._tree.begin.value;

@@ -146,11 +161,40 @@ return iter.value;

*/
next(node) {
next(node, range) {
if (this._weights.size <= 1 || this._weights.has(node.id()) === false) return [];
range = Math.min(range !== undefined ? range : this._pfactor, this._weights.size-1);
const result = new Map();
const name = this._nodeName(node, 1);
const iter = this._tree.gt(name);
this._successors(node, iter, result, range);
return utils.mapValues(result);
}
/**
* Returns a bucket of nodes including and following the hash slot corresponding to `data`. This is bounded by `range`, which dictates the size of the bucket returned.
*
* @method rangeNext
* @memberof Clusterluck.CHash
* @instance
*
* @param {String} data - Data to calculate hash of and find following nodes.
* @param {Number} [range] - Number of nodes to return. Defaults to `pfactor + 1`.
*
* @return {Array} Bucket of nodes with size `range` associated with key `data`.
*
*/
rangeNext(data, range) {
if (this.size() === 0) return [];
if (this.size() === this._rfactor) return [];
var result = new Map();
_.times(this._rfactor, (n) => {
var name = CHash._nodeName(node, n+1);
var iter = this._tree.gt(name);
this._successors(node, iter, result);
});
range = Math.min(range !== undefined ? range : (this._pfactor+1), this._weights.size);
const hash = this._findHash(data);
const result = new Map();
let iter = this._tree.gt(hash);
if (iter.valid === false) iter = this._tree.begin;
const node = iter.value;
result.set(node.id(), node);
iter.next();
this._successors(node, iter, result, range);
return utils.mapValues(result);

@@ -162,3 +206,3 @@ }

*
* @method next
* @method prev
* @memberof Clusterluck.CHash

@@ -172,11 +216,40 @@ * @instance

*/
prev(node) {
prev(node, range) {
if (this.size() <= 1 || this._weights.has(node.id()) === false) return [];
range = Math.min(range !== undefined ? range : this._pfactor, this._weights.size-1);
const result = new Map();
const name = this._nodeName(node, 1);
const iter = this._tree.lt(name);
this._precursors(node, iter, result, range);
return utils.mapValues(result);
}
/**
* Returns a bucket of nodes preceding the hash slot corresponding to `data`. This is bounded by `range`, which dictates the size of the bucket returned.
*
* @method rangePrev
* @memberof Clusterluck.CHash
* @instance
*
* @param {String} data - Data to calculate hash of and find following nodes.
* @param {Number} [range] - Number of nodes to return. Defaults to `pfactor + 1`.
*
* @return {Array} Bucket of nodes with size `range` preceding key `data`.
*
*/
rangePrev(data, range) {
if (this.size() === 0) return [];
if (this.size() === this._rfactor) return [];
var result = new Map();
_.times(this._rfactor, (n) => {
var name = CHash._nodeName(node, n+1);
var iter = this._tree.lt(name);
this._precursors(node, iter, result);
});
range = Math.min(range !== undefined ? range : (this._pfactor+1), this._weights.size);
const hash = this._findHash(data);
const result = new Map();
let iter = this._tree.lt(hash);
if (iter.valid === false) iter = this._tree.end;
const node = iter.value;
result.set(node.id(), node);
iter.prev();
this._precursors(node, iter, result, range);
return utils.mapValues(result);

@@ -198,14 +271,14 @@ }

merge(chash) {
var str;
if (chash.rfactor() !== this._rfactor) {
str = "rfactors " + chash.rfactor() +
" and " + this._rfactor + " do not match.";
throw new Error(str);
}
if (chash.pfactor() !== this._pfactor) {
str = "pfactors " + chash.pfactor() +
" and " + this._pfactor + " do not match.";
throw new Error(str);
}
return this._merge(chash);
chash.tree().forEach((key, value) => {
if (this._tree.get(key) === undefined) {
this._tree = this._tree.insert(key, value);
if (this._weights.has(value.id())) {
const weight = this._weights.get(value.id());
this._weights.set(value.id(), weight+1);
} else {
this._weights.set(value.id(), 1);
}
}
});
return this;
}

@@ -226,14 +299,17 @@

intersect(chash) {
var str;
if (chash.rfactor() !== this._rfactor) {
str = "rfactors " + chash.rfactor() +
" and " + this._rfactor + " do not match.";
throw new Error(str);
}
if (chash.pfactor() !== this._pfactor) {
str = "pfactors " + chash.pfactor() +
" and " + this._pfactor + " do not match.";
throw new Error(str);
}
return this._intersect(chash);
let tree = rbt();
this._weights.clear();
chash.tree().forEach((key, value) => {
if (this._tree.get(key) !== undefined) {
tree = tree.insert(key, value);
if (this._weights.has(value.id())) {
const weight = this._weights.get(value.id());
this._weights.set(value.id(), weight+1);
} else {
this._weights.set(value.id(), 1);
}
}
});
this._tree = tree;
return this;
}

@@ -256,2 +332,30 @@

/**
* Returns the number of nodes in the ring.
*
* @method numberNodes
* @memberof Clusterluck.CHash
* @instance
*
* @return {Number} Number of nodes in this hash ring.
*
*/
numberNodes() {
return this._weights.size;
}
/**
* Returns the weight map from node IDs to weights in the hash ring.
*
* @method weights
* @memberof Clusterluck.CHash
* @instance
*
* @return {Map} Weight map from node IDs to weights
*
*/
weights() {
return this._weights;
}
/**
* Acts as a getter for the nodes in this hash ring.

@@ -267,3 +371,3 @@ *

nodes() {
var memo = {};
const memo = {};
this._tree.forEach((key, node) => {

@@ -286,4 +390,3 @@ memo[node.id()] = node;

isDefined(node) {
var name = CHash._nodeName(node, 1);
return !!this._tree.get(name);
return this._weights.has(node.id());
}

@@ -364,9 +467,6 @@

equals(chash) {
var dims = this._rfactor === chash.rfactor() &&
this._pfactor === chash.pfactor() &&
this._tree.length === chash.tree().length;
if (dims === false) return false;
var tree = chash.tree();
var it = this.begin();
var out;
if (this._tree.length !== chash.tree().length) return false;
const tree = chash.tree();
const it = this.begin();
let out;
while (it.valid) {

@@ -394,7 +494,7 @@ out = tree.get(it.key);

toJSON(fast) {
var tree = [];
const tree = [];
this._tree.forEach((key, val) => {
tree.push({key: key, value: val.toJSON(fast)});
});
var out = {
const out = {
rfactor: this._rfactor,

@@ -424,5 +524,6 @@ pfactor: this._pfactor,

ent.tree.forEach((val) => {
var node = new Node(val.value.id, val.value.host, val.value.port);
const node = new Node(val.value.id, val.value.host, val.value.port);
this._tree = this._tree.insert(val.key, node);
});
this._calculateWeights();
return this;

@@ -483,20 +584,20 @@ }

*/
_successors(node, iter, memo) {
_successors(node, iter, memo, range) {
// if we have more nodes than pfactor, pfactor will be number of elements returned
// otherwise, return number of nodes other than node
if (memo.size === Math.min(this.size()/this._rfactor - 1, this._pfactor)) {
if (memo.size === range) {
return memo;
}
if (iter.valid === false) {
return this._successors(node, this._tree.begin, memo);
return this._successors(node, this._tree.begin, memo, range);
}
if (iter.value.id() === node.id()) {
iter.next();
return this._successors(node, iter, memo);
return this._successors(node, iter, memo, range);
}
var val = iter.value;
const val = iter.value;
memo.set(val.id(), val);
iter.next();
return this._successors(node, iter, memo);
return this._successors(node, iter, memo, range);
}

@@ -512,18 +613,18 @@

*/
_precursors(node, iter, memo) {
if (memo.size === Math.min(this.size()/this._rfactor - 1, this._pfactor)) {
_precursors(node, iter, memo, range) {
if (memo.size === range) {
return memo;
}
if (iter.valid === false) {
return this._precursors(node, this._tree.end, memo);
return this._precursors(node, this._tree.end, memo, range);
}
if (iter.value.id() === node.id()) {
iter.prev();
return this._precursors(node, iter, memo);
return this._precursors(node, iter, memo, range);
}
var val = iter.value;
const val = iter.value;
memo.set(val.id(), iter.value);
iter.prev();
return this._precursors(node, iter, memo);
return this._precursors(node, iter, memo, range);
}

@@ -533,15 +634,10 @@

*
* @method _merge
* @method _nodeName
* @memberof Clusterluck.CHash
* @instance
* @private
* @instance
*
*/
_merge(chash) {
chash.tree().forEach((key, value) => {
if (this._tree.get(key) === undefined) {
this._tree = this._tree.insert(key, value);
}
});
return this;
_nodeName(node, index) {
return this._findHash(node.id() + "_" + index);
}

@@ -551,17 +647,13 @@

*
* @method _intersect
* @method _findHash
* @memberof Clusterluck.CHash
* @instance
* @private
* @instance
*
*/
_intersect(chash) {
var tree = rbt();
chash.tree().forEach((key, value) => {
if (this._tree.get(key) !== undefined) {
tree = tree.insert(key, value);
}
});
this._tree = tree;
return this;
_findHash(data) {
if (this._cache.has(data)) return this._cache.get(data);
const value = crypto.createHash("sha256").update(data).digest("base64");
this._cache.set(data, value);
return value;
}

@@ -571,10 +663,20 @@

*
* @method _nodeName
* @method _calculateWeights
* @memberof Clusterluck.CHash
* @instance
* @private
* @static
*
*/
static _nodeName(node, index) {
return crypto.createHash("sha256").update(node.id() + "_" + index, "utf8").digest("base64");
_calculateWeights() {
const memo = new Map();
this._tree.forEach((key, val) => {
if (memo.has(val.id())) {
const weight = memo.get(val.id());
memo.set(val.id(), weight+1);
} else {
memo.set(val.id(), 1);
}
});
this._weights = memo;
return memo;
}

@@ -581,0 +683,0 @@ }

@@ -121,2 +121,4 @@ var _ = require("lodash"),

* @param {Function} [cb] - Optional callback; called when network kernel has been fully started and listening for IPC messages.
* @param {Object} [opts] - Network kernel options.
* @param {Number} [opts.tokenGenInterval] - Interval in which to generate a new reply token for synchronous cluster calls. Defaults to 1 hour.
*

@@ -126,3 +128,3 @@ * @return {Clusterluck.ClusterNode} This instance.

*/
start(cookie, ringID, cb) {
start(cookie, ringID, cb, kernelOpts = {}) {
// if ring was successfully read from disk and ring ID different than input, error out

@@ -137,3 +139,3 @@ if (typeof this._gossip._ringID === "string" &&

this._gossip.start(ringID);
this._kernel.start({cookie: cookie});
this._kernel.start(_.extend(_.clone(kernelOpts), {cookie: cookie}));

@@ -140,0 +142,0 @@ /**

@@ -1,13 +0,10 @@

var _ = require("lodash"),
async = require("async"),
uuid = require("uuid"),
microtime = require("microtime"),
debug = require("debug")("clusterluck:lib:command_server");
const _ = require("lodash"),
debug = require("debug")("clusterluck:lib:command_server");
var GenServer = require("./gen_server"),
NetKernel = require("./kernel"),
Node = require("./node"),
utils = require("./utils");
const GenServer = require("./gen_server"),
NetKernel = require("./kernel"),
Node = require("./node"),
utils = require("./utils");
var commands = {
const commands = {
"join": utils.hasID,

@@ -18,7 +15,12 @@ "leave": _.identity,

"minsert": utils.parseNodeList,
"update": utils.parseNode,
"remove": utils.parseNode,
"mremove": utils.parseNodeList,
"inspect": _.identity,
"nodes": _.identity,
"has": utils.hasID,
"get": utils.hasID
"get": utils.hasID,
"weight": utils.hasID,
"weights": _.identity,
"ping": _.identity
};

@@ -56,3 +58,3 @@

Object.keys(commands).forEach((key) => {
var handle = this[key].bind(this);
const handle = this[key].bind(this);
this.on(key, handle);

@@ -96,9 +98,9 @@ this.once("stop", _.partial(this.removeListener, key, handle).bind(this));

decodeJob(buf) {
var out = super.decodeJob(buf);
const out = super.decodeJob(buf);
if (out instanceof Error) return out;
var data = out.data;
let data = out.data;
if (commands[out.event] === undefined) {
return new Error("Cannot run against unknown command '%s'", out.event);
return new Error("Cannot run against unknown command '" + out.event + "'");
}
var parser = commands[out.event];
const parser = commands[out.event];
data = parser(data);

@@ -114,2 +116,25 @@ if (data instanceof Error) {

*
* @method decodeSingleton
* @memberof Clusterluck.CommandServer
* @instance
*
* @param {Object} data
*
* @return {Object|Error}
*
*/
decodeSingleton(data) {
const out = super.decodeSingleton(data);
if (commands[out.event] === undefined) {
return new Error("Cannot run against unknown command '" + data.event + "'");
}
const parser = commands[out.event];
data = parser(out.data);
if (data instanceof Error) return data;
out.data = data;
return out;
}
/**
*
* @method join

@@ -127,3 +152,3 @@ * @memberof Clusterluck.CommandServer

join(data, from) {
var out = this._gossip.join(data.id);
const out = this._gossip.join(data.id);
if (out instanceof Error) {

@@ -190,4 +215,5 @@ return this._encodedReply(from, {

insert(data, from) {
var force = data.force === true ? true : false;
this._gossip.insert(data.node, force);
const force = data.force === true ? true : false;
const weight = typeof data.weight === "number" && data.weight > 0 ? data.weight : this._gossip.ring().rfactor();
this._gossip.insert(data.node, weight, force);
return this._encodedReply(from, {ok: true});

@@ -211,6 +237,7 @@ }

minsert(data, from) {
var force = data.force === true ? true : false;
var nodes = data.nodes;
const force = data.force === true ? true : false;
const weight = typeof data.weight === "number" && data.weight > 0 ? data.weight : this._gossip.ring().rfactor();
let nodes = data.nodes;
nodes = nodes.filter((node) => {return node.id() !== this._kernel.self().id();});
this._gossip.minsert(nodes, force);
this._gossip.minsert(nodes, weight, force);
return this._encodedReply(from, {ok: true});

@@ -221,2 +248,24 @@ }

*
* @method update
* @memberof Clusterluck.CommandServer
* @instance
*
* @param {Object} data
* @param {Clusterluck.Node} data.node
* @param {Boolean} data.force
* @param {Number} data.weight
* @param {Object} from
*
* @return {Clusterluck.CommandServer}
*
*/
update(data, from) {
const force = data.force === true ? true : false;
const weight = typeof data.weight === "number" && data.weight > 0 ? data.weight : this._gossip.ring().rfactor();
this._gossip.update(data.node, weight, force);
return this._encodedReply(from, {ok: true});
}
/**
*
* @method remove

@@ -235,3 +284,3 @@ * @memberof Clusterluck.CommandServer

remove(data, from) {
var force = data.force === true ? true : false;
const force = data.force === true ? true : false;
this._gossip.remove(data.node, force);

@@ -256,4 +305,4 @@ return this._encodedReply(from, {ok: true});

mremove(data, from) {
var force = data.force === true ? true : false;
var nodes = data.nodes;
const force = data.force === true ? true : false;
let nodes = data.nodes;
nodes = nodes.filter((node) => {return node.id() !== this._kernel.self().id();});

@@ -277,3 +326,3 @@ this._gossip.mremove(nodes, force);

inspect(data, from) {
var ring = this._gossip.ring();
const ring = this._gossip.ring();
return this._encodedReply(from, {

@@ -287,2 +336,22 @@ ok: true,

*
* @method nodes
* @memberof Clusterluck.CommandServer
* @instance
*
* @param {Any} data
* @param {Object} from
*
* @return {Clusterluck.CommandServer}
*
*/
nodes(data, from) {
const nodes = this._gossip.ring().nodes();
return this._encodedReply(from, {
ok: true,
data: nodes
});
}
/**
*
* @method has

@@ -300,4 +369,4 @@ * @memberof Clusterluck.CommandServer

has(data, from) {
var ring = this._gossip.ring();
var node = new Node(data.id);
const ring = this._gossip.ring();
const node = new Node(data.id);
return this._encodedReply(from, {

@@ -323,6 +392,6 @@ ok: true,

get(data, from) {
var ring = this._gossip.ring();
var node = ring.get(new Node(data.id));
const ring = this._gossip.ring();
const node = ring.get(new Node(data.id));
if (node === undefined) {
var msg = "'" + data.id + "' is not defined in this ring.";
const msg = "'" + data.id + "' is not defined in this ring.";
return this._encodedReply(from, {

@@ -341,2 +410,71 @@ ok: false,

*
* @method weight
* @memberof Clusterluck.CommandServer
* @instance
*
* @param {Object} data
* @param {String} data.id
* @param {Object} from
*
* @return {Clusterluck.CommandServer}
*
*/
weight(data, from) {
const ring = this._gossip.ring();
const weight = ring.weights().get(data.id);
if (weight === undefined) {
const msg = "'" + data.id + "' is not defined in this ring.";
return this._encodedReply(from, {
ok: false,
error: utils.errorToObject(new Error(msg))
});
}
return this._encodedReply(from, {
ok: true,
data: weight
});
}
/**
*
* @method weights
* @memberof Clusterluck.CommandServer
* @instance
*
* @param {Any} data
* @param {Object} from
*
* @return {Clusterluck.CommandServer}
*
*/
weights(data, from) {
const ring = this._gossip.ring();
const weights = ring.weights();
return this._encodedReply(from, {
ok: true,
data: utils.mapToObject(weights)
});
}
/**
*
* @method ping
* @memberof Clusterluck.CommandServer
* @instance
*
* @param {Any} data
* @param {Object} from
*
* @return {Clusterluck.CommandServer}
*
*/
ping(data, from) {
return this._encodedReply(from, {
ok: true,
data: "pong"
});
}
/**
*
* @method _encodedReply

@@ -354,3 +492,3 @@ * @memberof Clusterluck.CommandServer

_encodedReply(from, msg) {
var sendMsg = _.extend(NetKernel._encodeMsg(this._kernel.cookie(), _.extend(msg, {
const sendMsg = _.extend(NetKernel._encodeMsg(this._kernel.cookie(), _.extend(msg, {
tag: from.tag

@@ -357,0 +495,0 @@ })));

@@ -1,8 +0,9 @@

var _ = require("lodash"),
async = require("async"),
uuid = require("uuid"),
debug = require("debug")("clusterluck:lib:conn"),
EventEmitter = require("events").EventEmitter,
Queue = require("./queue");
const _ = require("lodash"),
consts = require("./consts"),
debug = require("debug")("clusterluck:lib:conn"),
EventEmitter = require("events").EventEmitter,
Queue = require("./queue");
const connDefaults = consts.connDefaults;
class Connection extends EventEmitter {

@@ -18,6 +19,9 @@ /**

* @param {Clusterluck.Node} node - Node this connection communicates with.
* @param {Object} [opts] - Options object for connection.
* @param {Number} [opts.maxLen] - Maximum length of messages that can buffered while IPC socket is down. Defaults to 1024. Once breached, the oldest messages will be dropped until the queue is of this size. For unbounded buffering, set this to `Infinity`.
*
*/
constructor(ipc, node) {
constructor(ipc, node, opts=connDefaults) {
super();
opts = _.defaults(opts, connDefaults);
this._ipc = ipc;

@@ -29,2 +33,3 @@ this._node = node;

this._streams = new Map();
this._maxLen = opts.maxLen;
}

@@ -160,2 +165,26 @@

*
* Acts as a getter/setter for the max length of the internal message queue
* for this IPC socket connection.
*
* @method maxLen
* @memberof Clusterluck.Connection
* @instance
*
* @param {Number} [len] - Number to set maximum message queue length to.
*
* @return {Number} The maximum message queue length of this IPC socket.
*
*/
maxLen(len) {
if (typeof len === "number" && len >= 0) {
this._maxLen = len;
while (this._queue.size() > this._maxLen) {
this._queue.dequeue();
}
}
return this._maxLen;
}
/**
*
* Sends message `data` under event `event` through this IPC socket.

@@ -178,2 +207,5 @@ *

if (this._connecting === true) {
if (this._queue.size() >= this._maxLen) {
this._queue.dequeue();
}
this._queue.enqueue({

@@ -191,2 +223,17 @@ event: event,

/**
*
* Marks message stream `stream` in order to indicate to this connection beforehand that it is not
* in an idle state.
*
* @method initiateStream
* @memberof Clusterluck.Connection
* @instance
*
* @param {Object} stream - Message stream to mark.
* @param {Object} stream.stream - Unique ID of message stream.
*
* @return {Clusterluck.Connection} This instance.
*
*/
initiateStream(stream) {

@@ -232,4 +279,3 @@ this._streams.set(stream.stream, true);

this._connecting = true;
}
else {
} else {
this._connecting = false;

@@ -253,3 +299,3 @@ }

_updateStream(stream) {
if (stream.done) {
if (stream.done && stream.stream) {
this._streams.delete(stream.stream);

@@ -259,4 +305,3 @@ if (this._streams.size === 0) {

}
}
else {
} else if (stream.stream) {
this._streams.set(stream.stream, true);

@@ -263,0 +308,0 @@ }

@@ -25,2 +25,6 @@ var _ = require("lodash"),

var connOpts = {
maxLen: 1024
};
module.exports = {

@@ -44,2 +48,4 @@ networkHost: networkHost,

}),
// defaults for IPC connection
connDefaults: Object.freeze(connOpts),
// defaults for gossip processor

@@ -59,3 +65,4 @@ gossipOpts: Object.freeze({

// to the hash ring
vclockOpts: _.cloneDeep(vclockOpts)
vclockOpts: _.cloneDeep(vclockOpts),
connOpts: _.clone(connOpts)
}),

@@ -65,2 +72,5 @@ // vector clock defaults, in the event we want direct creation/manipulation of vector

vclockOpts: Object.freeze(_.cloneDeep(vclockOpts)),
chashOpts: Object.freeze({
maxCacheSize: 5000
}),
dtableOpts: Object.freeze(dtableOpts),

@@ -70,2 +80,3 @@ dlmOpts: Object.freeze(_.extend({

wquorum: 0.51,
rfactor: 3,
minWaitTimeout: 10,

@@ -72,0 +83,0 @@ maxWaitTimeout: 100,

@@ -48,2 +48,3 @@ const _ = require("lodash"),

* @param {Number} [opts.wquorum] - Quorum for write lock requests.
* @param {Number} [opts.rfactor] - Replication factor for number of nodes to involve in a quorum.
* @param {Number} [opts.minWaitTimeout] - Minimum amount of time in milliseconds to wait for a retry on a locking request.

@@ -66,2 +67,3 @@ * @param {Number} [opts.maxWaitTimeout] - Maximum amount of time in milliseconds to wait for a retry on a locking request.

this._wquorum = opts.wquorum;
this._rfactor = opts.rfactor;
this._minWaitTimeout = opts.minWaitTimeout;

@@ -74,3 +76,4 @@ this._maxWaitTimeout = opts.maxWaitTimeout;

"writeThreshold",
"autoSave"
"autoSave",
"compress"
]));

@@ -118,3 +121,3 @@ } else {

};
this._table.on("idle", handler.bind(this));
this._table.on("idle", handler);
this.once("stop", _.partial(this._table.removeListener, "idle", handler).bind(this._table));

@@ -231,3 +234,3 @@ return this;

rlock(id, holder, timeout, cb, reqTimeout=Infinity, retries=Infinity) {
const nodes = this._gossip.find(id);
const nodes = this._gossip.range(id, this._rfactor);
const time = microtime.now();

@@ -286,3 +289,3 @@ debug("Making rlock request on %s against %s with holder %s...", this._id, id, holder);

wlock(id, holder, timeout, cb, reqTimeout=Infinity, retries=Infinity) {
const nodes = this._gossip.find(id);
const nodes = this._gossip.range(id, this._rfactor);
const time = microtime.now();

@@ -336,3 +339,3 @@ debug("Making wlock request on %s against %s with holder %s...", this._id, id, holder);

runlock(id, holder, cb, reqTimeout=Infinity) {
const nodes = this._gossip.find(id);
const nodes = this._gossip.range(id, this._rfactor);
debug("Unlocking rlock %s on %s with holder %s...", id, this._id, holder);

@@ -363,3 +366,3 @@ this.multicall(nodes, this._id, "runlock", {

runlockAsync(id, holder) {
const nodes = this._gossip.find(id);
const nodes = this._gossip.range(id, this._rfactor);
debug("Asynchronously unlocking rlock %s on %s with holder %s...", id, this._id, holder);

@@ -391,3 +394,3 @@ this.abcast(nodes, this._id, "runlock", {

wunlock(id, holder, cb, reqTimeout=Infinity) {
const nodes = this._gossip.find(id);
const nodes = this._gossip.range(id, this._rfactor);
debug("Unlocking wlock %s on %s with holder %s...", id, this._id, holder);

@@ -418,3 +421,3 @@ this.multicall(nodes, this._id, "wunlock", {

wunlockAsync(id, holder) {
const nodes = this._gossip.find(id);
const nodes = this._gossip.range(id, this._rfactor);
debug("Asynchronously unlocking wlock %s on %s with holder %s...", id, this._id, holder);

@@ -443,3 +446,3 @@ this.abcast(nodes, this._id, "wunlock", {

if (out instanceof Error) return out;
let data = DLMServer.parseJob(out.data, out.event);
const data = DLMServer.parseJob(out.data, out.event);
if (data instanceof Error) {

@@ -454,2 +457,23 @@ return data;

*
* Parses a singleton message stream into an object containing a key/value pair. If we fail to parse the job object (invalid format for given event value, etc.), we just return an error and this GenServer will skip emitting an event. Otherwise, triggers user-defined logic for the parsed event.
*
* @method decodeSingleton
* @memberof Clusterluck.DLMServer
* @instance
*
* @param {Object} data - Message to be processed with `event` and `data` parameters.
*
* @return {Object|Error} Object containing an event and data key/value pair, which are used to emit an event for user-defined logic.
*
*/
decodeSingleton(data) {
data = super.decodeSingleton(data);
const nData = DLMServer.parseJob(data.data, data.event);
if (nData instanceof Error) return nData;
data.data = nData;
return data;
}
/**
*
* @method _doRLock

@@ -470,6 +494,6 @@ * @memberof Clusterluck.DLMServer

debug("rlock request on %s against %s failed, lock already exists as a write lock.", this._id, data.id);
return this.reply(from, DLMServer.encodeResp({ok: false}));
return this.reply(from, {ok: false});
} else if (lock && lock.timeout().has(data.holder)) {
debug("Holder %s already exists on rlock %s on %s", data.holder, data.id, this._id);
return this.reply(from, DLMServer.encodeResp({ok: true}));
return this.reply(from, {ok: true});
}

@@ -486,3 +510,3 @@

debug("Added holder %s to rlock %s on %s.", data.holder, data.id, this._id);
return this.reply(from, DLMServer.encodeResp({ok: true}));
return this.reply(from, {ok: true});
}

@@ -506,3 +530,3 @@

debug("wlock request on %s against %s failed, lock already exists.", this._id, data.id);
return this.reply(from, DLMServer.encodeResp({ok: false}));
return this.reply(from, {ok: false});
}

@@ -515,3 +539,3 @@

debug("added holder %s to wlock %s on %s.", data.holder, data.id, this._id);
return this.reply(from, DLMServer.encodeResp({ok: true}));
return this.reply(from, {ok: true});
}

@@ -536,3 +560,3 @@

debug("runlock request on %s against %s failed, lock isn't an rlock.", this._id, data.id);
return this._safeReply(from, DLMServer.encodeResp({ok: false}));
return this._safeReply(from, {ok: false});
}

@@ -550,6 +574,6 @@

debug("removed holder %s from rlock %s on %s.", data.holder, data.id, this._id);
return this._safeReply(from, DLMServer.encodeResp({ok: true}));
return this._safeReply(from, {ok: true});
} else {
debug("runlock request on %s against %s failed, holder %s is no longer associated.", this._id, data.id, data.holder);
return this._safeReply(from, DLMServer.encodeResp({ok: false}));
return this._safeReply(from, {ok: false});
}

@@ -575,3 +599,3 @@ }

debug("wunlock request on %s against %s failed, lock isn't a wlock.", this._id, data.id);
return this._safeReply(from, DLMServer.encodeResp({ok: false}));
return this._safeReply(from, {ok: false});
}

@@ -585,6 +609,6 @@

debug("removed holder %s from wlock %s on %s.", data.holder, data.id, this._id);
return this._safeReply(from, DLMServer.encodeResp({ok: true}));
return this._safeReply(from, {ok: true});
} else {
debug("wunlock request on %s against %s failed, holder %s is no longer associated.", this._id, data.id, data.holder);
return this._safeReply(from, DLMServer.encodeResp({ok: false}));
return this._safeReply(from, {ok: false});
}

@@ -710,19 +734,2 @@ }

*
* Encoding function for replies to gen_server requests.
*
* @method encodeResp
* @memberof Clusterluck.DLMServer
* @static
*
* @param {Any} res - Response to encode for reply.
*
* @return {String} Encoded response.
*
*/
static encodeResp(res) {
return JSON.stringify(res);
}
/**
*
* Returns the set of nodes with successful responses according to `data`, where each node in `nodes` has a corresponding response in `data` according to index.

@@ -742,4 +749,3 @@ *

return data.reduce((memo, val, idx) => {
val = utils.safeParse(val);
if (util.isObject(val) && val.ok === true) {
if (val.ok === true) {
memo.push(nodes[idx]);

@@ -746,0 +752,0 @@ }

@@ -85,3 +85,4 @@ const _ = require("lodash"),

"writeThreshold",
"autoSave"
"autoSave",
"compress"
]));

@@ -235,5 +236,5 @@ } else {

create(id, n, cb, reqTimeout=Infinity) {
const nodes = this._gossip.find(id);
const node = this._gossip.ring().find(id);
debug("Creating semaphore %s on %s with concurrency of %i...", id, this._id, n);
this.call({node: nodes[0], id: this._id}, "create", {
this.call({node: node, id: this._id}, "create", {
id: id,

@@ -267,10 +268,8 @@ n: n

read(id, cb, reqTimeout=Infinity) {
const nodes = this._gossip.find(id);
const node = this._gossip.ring().find(id);
debug("Reading semaphore %s on %s...", id, this._id);
this.call({node: nodes[0], id: this._id}, "read", {
this.call({node: node, id: this._id}, "read", {
id: id
}, (err, data) => {
if (err) return cb(err);
data = utils.safeParse(data);
if (data instanceof Error) return cb(data);
return cb(null, data.data);

@@ -299,5 +298,5 @@ }, reqTimeout);

destroy(id, cb, reqTimeout=Infinity) {
const nodes = this._gossip.find(id);
const node = this._gossip.ring().find(id);
debug("Destroying semaphore %s on %s", id, this._id);
this.call({node: nodes[0], id: this._id}, "destroy", {
this.call({node: node, id: this._id}, "destroy", {
id: id

@@ -332,6 +331,6 @@ }, (err, data) => {

post(id, holder, timeout, cb, reqTimeout=Infinity, retries=Infinity) {
const nodes = this._gossip.find(id);
const node = this._gossip.ring().find(id);
const time = microtime.now();
debug("Grabbing semaphore %s on %s with holder %s...", id, this._id, holder);
this.call({node: nodes[0], id: this._id}, "post", {
this.call({node: node, id: this._id}, "post", {
id: id,

@@ -345,3 +344,2 @@ holder: holder,

}
data = utils.safeParse(data);
const delta = (microtime.now()-time)/mcsToMs;

@@ -384,5 +382,5 @@ if (delta < timeout && data.ok === true) {

close(id, holder, cb, reqTimeout=Infinity) {
const nodes = this._gossip.find(id);
const node = this._gossip.ring().find(id);
debug("Dropping semaphore %s on %s with holder %s...", id, this._id, holder);
this.call({node: nodes[0], id: this._id}, "close", {
this.call({node: node, id: this._id}, "close", {
id: id,

@@ -411,5 +409,5 @@ holder: holder

closeAsync(id, holder) {
const nodes = this._gossip.find(id);
const node = this._gossip.ring().find(id);
debug("Asynchronously dropping semaphore %s on %s with holder %s...", id, this._id, holder);
this.cast({node: nodes[0], id: this._id}, "close", {
this.cast({node: node, id: this._id}, "close", {
id: id,

@@ -446,2 +444,23 @@ holder: holder

*
* Parses a singleton message stream into an object containing a key/value pair. If we fail to parse the job object (invalid format for given event value, etc.), we just return an error and this GenServer will skip emitting an event. Otherwise, triggers user-defined logic for the parsed event.
*
* @method decodeSingleton
* @memberof Clusterluck.DSMServer
* @instance
*
* @param {Object} data - Message to be processed with `event` and `data` parameters.
*
* @return {Object|Error} Object containing an event and data key/value pair, which are used to emit an event for user-defined logic.
*
*/
decodeSingleton(data) {
data = super.decodeSingleton(data);
const nData = DSMServer.parseJob(data.data, data.event);
if (nData instanceof Error) return nData;
data.data = nData;
return data;
}
/**
*
* @method _doCreate

@@ -466,3 +485,3 @@ * @memberof Clusterluck.DSMServer

} else {
return this.reply(from, DSMServer.encodeResp({ok: true}));
return this.reply(from, {ok: true});
}

@@ -473,3 +492,3 @@ }

this._writeToLog("set", semPrefix + data.id, data.n);
return this.reply(from, DSMServer.encodeResp({ok: true}));
return this.reply(from, {ok: true});
}

@@ -497,6 +516,6 @@

}
return this.reply(from, DSMServer.encodeResp({
return this.reply(from, {
ok: true,
data: {n: sem.size(), active: sem.timeouts().size}
}));
});
}

@@ -520,3 +539,3 @@

if (!sem) {
return this.reply(from, DSMServer.encodeResp({ok: true}));
return this.reply(from, {ok: true});
}

@@ -529,3 +548,3 @@ const timeouts = sem.timeouts();

this._writeToLog("del", semPrefix + data.id);
return this.reply(from, DSMServer.encodeResp({ok: true}));
return this.reply(from, {ok: true});
}

@@ -553,5 +572,5 @@

} else if (sem.timeouts().has(data.holder)) {
return this.reply(from, DSMServer.encodeResp({ok: true}));
return this.reply(from, {ok: true});
} else if (sem.timeouts().size === sem.size()) {
return this.reply(from, DSMServer.encodeResp({ok: false}));
return this.reply(from, {ok: false});
}

@@ -563,3 +582,3 @@ const timeout = setTimeout(_.partial(this._clearSem, data.id, data.holder).bind(this), data.timeout);

this._writeToLog("hset", dataPrefix + data.id, data.holder, {start: created, timeout: data.timeout});
return this.reply(from, DSMServer.encodeResp({ok: true}));
return this.reply(from, {ok: true});
}

@@ -594,5 +613,5 @@

this._writeToLog("hdel", dataPrefix + data.id, data.holder);
return this._safeReply(from, DSMServer.encodeResp({ok: true}));
return this._safeReply(from, {ok: true});
} else {
return this._safeReply(from, DSMServer.encodeResp({ok: false}));
return this._safeReply(from, {ok: false});
}

@@ -718,19 +737,2 @@ }

*
* Encoding function for replies to gen_server requests.
*
* @method encodeResp
* @memberof Clusterluck.DSMServer
* @static
*
* @param {Any} res - Response to encode for reply.
*
* @return {String} Encoded response.
*
*/
static encodeResp(res) {
return JSON.stringify(res);
}
/**
*
* Calculates wait time for retry functionality in post requests.

@@ -737,0 +739,0 @@ *

@@ -9,2 +9,3 @@ const _ = require("lodash"),

readline = require("readline"),
zlib = require("zlib"),
EventEmitter = require("events").EventEmitter;

@@ -18,2 +19,49 @@

const ops = {
set: (key, val, table) => {
table.set(key, val);
return val;
},
sset: (key, val, table) => {
const out = table.get(key) || new Set();
if (!(out instanceof Set)) {
throw DTable.invalidTypeError("sset", key, typeof out);
}
out.add(val);
table.set(key, out);
return out;
},
hset: (key, hkey, val, table) => {
const out = table.get(key) || new Map();
if (!(out instanceof Map)) {
throw DTable.invalidTypeError("hset", key, typeof out);
}
out.set(hkey, val);
table.set(key, out);
return out;
},
del: (key, table) => {
table.delete(key);
},
sdel: (key, val, table) => {
const out = table.get(key) || new Set();
if (!(out instanceof Set)) {
throw DTable.invalidTypeError("sdel", key, typeof out);
}
out.delete(val);
if (out.size === 0) table.delete(key);
},
hdel: (key, hkey, table) => {
const out = table.get(key) || new Map();
if (!(out instanceof Map)) {
throw DTable.invalidTypeError("hdel", key, typeof out);
}
out.delete(hkey);
if (out.size === 0) table.delete(key);
},
clear: (table) => {
table.clear();
}
};
class DTable extends EventEmitter {

@@ -26,3 +74,4 @@ /**

* - Delete LATEST.LOG and reopen with a new file descriptor
* - Dump state of table to DATA.SNAP (storage snapshot)
* - Dump state of table to DATA_PREV.SNAP (storage snapshot)
* - If succesful, move this to DATA.SNAP, else delete DATA_PREV.SNAP
* - Delete PREV.LOG

@@ -48,5 +97,4 @@ *

}
this._path = path.join(opts.path, "DATA.SNAP");
this._aofPath = path.join(opts.path, "LATEST.LOG");
this._tmpAOFPath = path.join(opts.path, "PREV.LOG");
this._prefix = opts.path;
this._autoSave = opts.autoSave;

@@ -62,2 +110,5 @@ this._writeCount = 0;

this._flushing = false;
this._compress = opts.compress !== undefined ? opts.compress : false;
this._encodeFn = typeof opts.encodeFn === "function" ? opts.encodeFn : DTable.encodeValue;
this._decodeFn = typeof opts.decodeFn === "function" ? opts.decodeFn : DTable.decodeValue;
}

@@ -80,2 +131,6 @@

this._name = name;
this._path = path.join(this._prefix, this._name + "_DATA.SNAP");
this._tmpDumpPath = path.join(this._prefix, this._name + "_DATA_PREV.SNAP");
this._aofPath = path.join(this._prefix, this._name + "_LATEST.LOG");
this._tmpAOFPath = path.join(this._prefix, this._name + "_PREV.LOG");
this._id = shortid.generate();

@@ -124,3 +179,4 @@ this._setupDiskOps();

async.series([
_.partial(this._loadState).bind(this),
_.partial(this._loadState, this._path).bind(this),
_.partial(this._loadState, this._tmpDumpPath).bind(this),
_.partial(this._loadAOF, this._tmpAOFPath).bind(this),

@@ -148,2 +204,22 @@ _.partial(this._loadAOF, this._aofPath).bind(this)

*
* Acts as a getter/setter for whether this table instance compresses the state snapshot.
*
* @method compress
* @memberof Clusterluck.Dtable
* @instance
*
* @param {Boolean} bool - Boolean to change the compression flag of this instance to.
*
* @return {Boolean} Whether this instance compresses it's state snapshots.
*
*/
compress(bool) {
if (bool !== undefined) {
this._compress = bool;
}
return this._compress;
}
/**
*
* Retrieves value stored at `key`, returning `undefined` if no such data exists.

@@ -201,3 +277,3 @@ *

hget(key, hkey) {
var out = this._table.get(key) || new Map();
const out = this._table.get(key) || new Map();
if (!(out instanceof Map)) {

@@ -224,3 +300,3 @@ throw new DTable.invalidTypeError("hget", key, typeof out);

set(key, val) {
this._table.set(key, val);
ops.set(key, val, this._table);
this._writeToLog("set", key, val);

@@ -246,8 +322,3 @@ this._updateWriteCount();

sset(key, val) {
const out = this._table.get(key) || new Set();
if (!(out instanceof Set)) {
throw DTable.invalidTypeError("sset", key, typeof out);
}
out.add(val);
this._table.set(key, out);
const out = ops.sset(key, val, this._table);
this._writeToLog("sset", key, val);

@@ -274,8 +345,3 @@ this._updateWriteCount();

hset(key, hkey, val) {
const out = this._table.get(key) || new Map();
if (!(out instanceof Map)) {
throw DTable.invalidTypeError("hset", key, typeof out);
}
out.set(hkey, val);
this._table.set(key, out);
const out = ops.hset(key, hkey, val, this._table);
this._writeToLog("hset", key, hkey, val);

@@ -300,3 +366,3 @@ this._updateWriteCount();

del(key) {
this._table.delete(key);
ops.del(key, this._table);
this._writeToLog("del", key);

@@ -322,8 +388,3 @@ this._updateWriteCount();

sdel(key, val) {
const out = this._table.get(key) || new Set();
if (!(out instanceof Set)) {
throw DTable.invalidTypeError("sdel", key, typeof out);
}
out.delete(val);
if (out.size === 0) this._table.delete(key);
ops.sdel(key, val, this._table);
this._writeToLog("sdel", key, val);

@@ -349,8 +410,3 @@ this._updateWriteCount();

hdel(key, hkey) {
const out = this._table.get(key) || new Map();
if (!(out instanceof Map)) {
throw DTable.invalidTypeError("hdel", key, typeof out);
}
out.delete(hkey);
if (out.size === 0) this._table.delete(key);
ops.hdel(key, hkey, this._table);
this._writeToLog("hdel", key, hkey);

@@ -373,3 +429,3 @@ this._updateWriteCount();

clear() {
this._table.clear();
ops.clear(this._table);
this._writeToLog("clear");

@@ -538,3 +594,3 @@ this._updateWriteCount();

var obj = utils.mapToObject(this._table);
const obj = utils.mapToObject(this._table);
this._fstream.once("close", () => {

@@ -602,13 +658,13 @@ async.series([

let called = false;
const wstream = fs.createWriteStream(this._path);
wstream.on("error", (err) => {
const {fstream, wstream} = this._createFlushStreams();
fstream.on("error", (err) => {
if (called) return;
called = true;
wstream.end();
return cb(err);
fs.unlink(this._tmpDumpPath, _.partial(cb, err));
});
wstream.on("close", () => {
fstream.on("close", () => {
if (called) return;
called = true;
return cb();
fs.rename(this._tmpDumpPath, this._path, cb);
});

@@ -620,4 +676,5 @@ async.eachLimit(Object.keys(obj), 4, (key, next) => {

key: key,
value: DTable.encodeValue(val)
value: this._encodeFn(val)
}) + "\n");
delete obj[key];
async.nextTick(next);

@@ -648,3 +705,3 @@ }, (err) => {

args = args.map((val) => {
return DTable.encodeValue(val);
return this._encodeFn(val);
});

@@ -661,2 +718,22 @@ const out = JSON.stringify({

*
* @method _createFlushStreams
* @memberof Clusterluck.DTable
* @instance
* @private
*
*/
_createFlushStreams() {
const fstream = fs.createWriteStream(this._tmpDumpPath);
let wstream;
if (this._compress) {
wstream = zlib.createGzip();
wstream.pipe(fstream);
} else {
wstream = fstream;
}
return {fstream, wstream};
}
/**
*
* @method _loadState

@@ -670,7 +747,8 @@ * @memberof Clusterluck.DTable

*/
_loadState(cb) {
fs.stat(this._path, (err) => {
_loadState(location, cb) {
fs.stat(location, (err) => {
if (err && err.code !== "ENOENT") return cb(err);
else if (err && err.code === "ENOENT") return cb();
const rstream = fs.createReadStream(this._path);
const {fstream, rstream} = this._createLoadStreams(location);
const rline = readline.createInterface({

@@ -685,6 +763,6 @@ input: rstream

});
rstream.on("error", (err) => {
fstream.on("error", (err) => {
if (called) return;
called = true;
rstream.resume();
fstream.resume();
rline.close();

@@ -695,3 +773,3 @@ return cb(err);

const lineObj = JSON.parse(line);
const val = DTable.decodeValue(lineObj.value);
const val = this._decodeFn(lineObj.value);
this._table.set(lineObj.key, val);

@@ -725,3 +803,2 @@ });

called = true;
this._queue.flush();
return cb();

@@ -734,3 +811,2 @@ });

rline.close();
this._queue.flush();
return cb(err);

@@ -741,7 +817,8 @@ });

lineObj.args = lineObj.args.map((val) => {
return DTable.decodeValue(val);
return this._decodeFn(val);
});
try {
this[lineObj.op].apply(this, lineObj.args);
lineObj.args.push(this._table);
ops[lineObj.op].apply(null, lineObj.args);
} catch (e) {

@@ -756,2 +833,24 @@ debug("failed to complete operation %s with args " + JSON.stringify(lineObj.args) + " from %s", lineObj.op, path);

*
* @method _CreateLoadStreams
* @memberof Clusterluck.DTable
* @instance
* @private
*
* @param {String} location
*
*/
_createLoadStreams(location) {
const fstream = fs.createReadStream(location);
let rstream;
if (this._compress) {
rstream = zlib.createGunzip();
fstream.pipe(rstream);
} else {
rstream = fstream;
}
return {fstream, rstream};
}
/**
*
* @method invalidTypeError

@@ -758,0 +857,0 @@ * @memberof Clusterluck.DTable

@@ -1,9 +0,8 @@

var _ = require("lodash"),
async = require("async"),
shortid = require("shortid"),
EventEmitter = require("events").EventEmitter,
stream = require("stream"),
util = require("util"),
utils = require("./utils"),
debug = require("debug")("clusterluck:lib:gen_server");
const _ = require("lodash"),
shortid = require("shortid"),
EventEmitter = require("events").EventEmitter,
stream = require("stream"),
util = require("util"),
utils = require("./utils"),
debug = require("debug")("clusterluck:lib:gen_server");

@@ -13,3 +12,3 @@ class GenServer extends EventEmitter {

*
* Generic server implementation. Basic unit of logic handling in clusterluck. Provides the ability to send/receive messages across a cluster using `call`, `cast`, `multicall`, and `abcast`. Largely derived from Erlang's gen_server model, but incorporated into node.js' EventEmitter model. Internally, message streams are memoized until finished, which then triggers the firing of an event for given event (say we're listening for event "hello", if this server receives a message stream with event "hello", we'll fire it under the hood).
* Generic server implementation. Basic unit of logic handling in clusterluck. Provides the ability to send/receive messages across a cluster using `call`, `cast`, `multicall`, and `abcast`. Largely derived from Erlang's gen_server model, but incorporated into node.js' EventEmitter model. Internally, message streams are memoized until finished, which then triggers the firing of an event for given event (say we're listening for event "hello", if this server receives a message stream with event "hello", we'll fire it under the hood). In addition to message streams, message singletons are supported, which are message streams with a singleton message and bypass any stream memoization.
*

@@ -30,2 +29,3 @@ * Think of it like event emitters that are targetable across a cluster, as well as internal to a node.

this._streamTimeout = (opts && opts.streamTimeout) || 30000;
this._paused = false;
}

@@ -53,3 +53,3 @@

}
var handler = this._parse.bind(this);
const handler = this._parse.bind(this);
this._kernel.on(this._id, handler);

@@ -130,2 +130,3 @@ this.once("pause", _.partial(this._kernel.removeListener, this._id, handler).bind(this._kernel));

*/
this._paused = true;
this.emit("pause");

@@ -151,3 +152,3 @@ return this;

resume() {
var handler = this._parse.bind(this);
const handler = this._parse.bind(this);
this._kernel.on(this._id, handler);

@@ -172,2 +173,3 @@ this.once("pause", _.partial(this._kernel.removeListener, this._id, handler).bind(this._kernel));

*/
this._paused = false;
this.emit("resume");

@@ -183,3 +185,2 @@ return this;

* @memberof Clusterluck.GenServer
* @private
* @abstract

@@ -194,3 +195,3 @@ * @instance

decodeJob(job) {
var out = utils.safeParse(job, (k, v) => {
const out = utils.safeParse(job, (k, v) => {
if (util.isObject(v) &&

@@ -212,2 +213,26 @@ v.type === "Buffer" &&

*
* Parses a message singleton into an object containing a key/value pair. If we fail to parse the job, we just return an error and this GenServer will skip emitting an event. Otherwise, triggers user-defined logic for the parsed event.
*
* @method decodeJob
* @memberof Clusterluck.GenServer
* @abstract
* @instance
*
* @param {Object} job - Memoized buffer that represents complete message stream.
*
* @return {Object} Object containing an event and data key/value pair, which are used to emit an event for user-defined logic.
*
*/
decodeSingleton(data) {
if (util.isObject(data) &&
util.isObject(data.data) &&
data.data.type === "Buffer" &&
Array.isArray(data.data.data)) {
data.data = Buffer.from(data.data);
}
return data;
}
/**
*
* Acts as a getter/setter for the ID of this instance.

@@ -298,3 +323,3 @@ *

* @param {Clusterluck.Node} from.node - Node representing the sender.
* @param {Stream|Buffer} data - Data to reply to synchronous message with.
* @param {Buffer|String|Number|Boolean|Object|Array} data - Data to reply to synchronous message with.
*

@@ -327,7 +352,7 @@ * @return {Clusterluck.GenServer} This instance.

call(id, event, data, cb, timeout=Infinity) {
var out = this._parseRecipient(id);
return this._kernel.call(out.node, out.id, JSON.stringify({
const out = this._parseRecipient(id);
return this._kernel.callSingleton(out.node, out.id, {
event: event,
data: data
}), cb, timeout);
}, cb, timeout);
}

@@ -354,6 +379,6 @@

multicall(nodes, id, event, data, cb, timeout=Infinity) {
return this._kernel.multicall(nodes, id, JSON.stringify({
return this._kernel.multicallSingleton(nodes, id, {
event: event,
data
}), cb, timeout);
}, cb, timeout);
}

@@ -377,7 +402,7 @@

cast(id, event, data) {
var out = this._parseRecipient(id);
this._kernel.cast(out.node, out.id, JSON.stringify({
const out = this._parseRecipient(id);
this._kernel.cast(out.node, out.id, {
event: event,
data: data
}));
});
return this;

@@ -403,6 +428,6 @@ }

abcast(nodes, id, event, data) {
this._kernel.abcast(nodes, id, JSON.stringify({
this._kernel.abcast(nodes, id, {
event: event,
data: data
}));
});
return this;

@@ -423,4 +448,13 @@ }

_parse(data, stream, from) {
if (this._paused === true) return this;
if (stream.done === true && data !== null && !stream.stream) {
data = this.decodeSingleton(data);
if (!(data instanceof Error)) this.emit(data.event, data.data, from);
return this;
} else if (stream.done === true && stream.error !== undefined && !stream.stream) {
return this;
}
if (!this._streams.has(stream.stream)) {
var t = this._registerTimeout(stream, from);
const t = this._registerTimeout(stream, from);
this._streams.set(stream.stream, {

@@ -431,3 +465,3 @@ data: Buffer.from(""),

}
var inner = this._streams.get(stream.stream);
const inner = this._streams.get(stream.stream);
if (data) {

@@ -440,3 +474,3 @@ inner.data = Buffer.concat([inner.data, data], inner.data.length + data.length);

if (!stream.error) {
var job = this.decodeJob(inner.data);
const job = this.decodeJob(inner.data);
// maybe reply to msg stream with error, but stream error occurs sender-side,

@@ -476,3 +510,3 @@ // so they probably know if an error occurred (or at least should)

_parseRecipient(id) {
var node;
let node;
if (typeof id === "string") {

@@ -500,6 +534,6 @@ node = this._kernel.self();

// just remove stream for now, it'll invalidate any gathered JSON and fail `decodeJob`
var t = setTimeout(() => {
const t = setTimeout(() => {
if (!this._streams.has(istream.stream)) return;
var rstream = new stream.PassThrough();
var out = this._safeReply(from, rstream);
const rstream = new stream.PassThrough();
const out = this._safeReply(from, rstream);
if (out) rstream.emit("error", new Error("Timeout."));

@@ -533,3 +567,3 @@ this._streams.delete(istream.stream);

*
* Suppose we have a GenServer named `server` started with name `name_goes_here`. Any message routed to our node, both internal and external, with id set to `name_goes_here` will be gathered by `serve` into a map of streams. Each stream has a stream ID `stream` and a boolean flag `done` which indicates whether the stream has finished sending data. This stream ID is used as an index for stream memoization between concurrent requests.
* Suppose we have a GenServer named `server` started with name `name_goes_here`. Any message stream routed to our node, both internal and external, with id set to `name_goes_here` will be gathered by `serve` into a map of streams. Each stream has a stream ID `stream` and a boolean flag `done` which indicates whether the stream has finished sending data. This stream ID is used as an index for stream memoization between concurrent requests.
*

@@ -557,3 +591,3 @@ * ```javascript

*
* Diagram of data flow:
* Diagram of data flow for message streams:
* ```

@@ -564,2 +598,9 @@ * Messages comes into node with id set to `name_goes_here` ---------------> this._parse

*
* As for singleton message streams (a message stream with only one message, and `done` set to true), no memoization of streams into a stream map occurs. The data is parsed by `decodeSingleton` into an event and message, under `event` and `data` respectively. From here, the server emits an event just like an EventEmitter with `event` and `data`.
*
* Diagram of data flow for message singletons:
* ```
* Messages comes into node with id set to `name_goes_here` ---------------> this._parse ---> this.decodeSingleton ---> this.emit(event, data,from) ---> this.on(event, handlerLogic...)
* ```
*
* @event Clusterluck.GenServer#GenServer:user_defined

@@ -566,0 +607,0 @@ * @memberof Clusterluck.GenServer

@@ -1,14 +0,13 @@

var _ = require("lodash"),
async = require("async"),
uuid = require("uuid"),
EventEmitter = require("events").EventEmitter,
microtime = require("microtime"),
fs = require("fs"),
util = require("util"),
debug = require("debug")("clusterluck:lib:gossip");
const _ = require("lodash"),
async = require("async"),
uuid = require("uuid"),
microtime = require("microtime"),
fs = require("fs"),
util = require("util"),
debug = require("debug")("clusterluck:lib:gossip");
var GenServer = require("./gen_server"),
CHash = require("./chash"),
VectorClock = require("./vclock"),
utils = require("./utils");
const GenServer = require("./gen_server"),
CHash = require("./chash"),
VectorClock = require("./vclock"),
utils = require("./utils");

@@ -30,2 +29,3 @@ class GossipRing extends GenServer {

* @param {Object} opts.vclockOpts - Options for periodic trimming of internal vector clock.
* @param {Object} opts.connOpts - Options for whe connecting to new nodes.
*

@@ -41,2 +41,3 @@ */

this._vclockOpts = opts.vclockOpts;
this._connOpts = opts.connOpts;
this._interval = null;

@@ -83,7 +84,7 @@ this._flush = null;

*/
var events = [
const events = [
{event: "ring", method: "_updateRing"},
];
events.forEach((event) => {
var handle = this[event.method].bind(this);
const handle = this[event.method].bind(this);
this.on(event.event, handle);

@@ -176,5 +177,23 @@ this.once("stop", _.partial(this.removeListener, event.event, handle).bind(this));

decodeJob(buf) {
var out = super.decodeJob(buf);
const out = super.decodeJob(buf);
if (out instanceof Error) return out;
var val = out.data;
return this.decodeSingleton(out);
}
/**
*
* Parses a singleton message into an object containing a key/value pair. If we fail to parse the job (invalid job format for event, etc), we just return an error and this GenServer will skip emitting an event. Otherwise, triggers user-defined logic for the parsed event.
*
* @method decodeSingleton
* @memberof Clusterluck.GossipRing
* @private
* @instance
*
* @param {Object} out - Job that represents a singleton message.
*
* @return {Object} Object containing an event and data key/value pair, which are used to emit an event for user-defined logic.
*
*/
decodeSingleton(out) {
const val = out.data;
out.data = {

@@ -322,3 +341,3 @@ type: val.type,

this._ringID = ringID;
var handler = this._parse.bind(this);
const handler = this._parse.bind(this);
this._kernel.on(this._ringID, handler);

@@ -347,4 +366,4 @@ this.once("pause", _.partial(this._kernel.removeListener, this._ringID, handler).bind(this._kernel));

debug("Meeting node in cluster:", node.id());
this._kernel.connect(node);
var msg = this._ring.toJSON(true);
this._kernel.connect(node, null, this._connOpts);
const msg = this._ring.toJSON(true);
// we don't reflect new message in this vector clock; if we did, it'd possibly negate

@@ -388,2 +407,3 @@ // any future rounds of gossip from the node we're meeting (since joins don't introduce

* @param {Clusterluck.Node} node - Node to insert into this gossip ring's cluster.
* @param {Number} weight - Number of virtual nodes to assign to `node`.
* @param {Boolean} [force] - Whether to forcibly add `node` into the current state of this ring, or wait for an idle state. Defaults to false.

@@ -394,11 +414,11 @@ *

*/
insert(node, force = false) {
insert(node, weight, force = false) {
if (this._ring.isDefined(node)) return this;
debug("Inserting node into cluster:", node.id());
if (this.idle() || force === true) {
var oldRing = (new CHash(this._ring.rfactor(), this._ring.pfactor(), this._ring.tree()));
this._ring.insert(node);
const oldRing = (new CHash(this._ring.rfactor(), this._ring.pfactor(), this._ring.tree()));
this._ring.insert(node, weight);
this._actor = uuid.v4();
this._vclock.increment(this._actor);
this._kernel.connect(node);
this._kernel.connect(node, null, this._connOpts);
this.sendRing(GossipRing.maxMsgRound(this._ring));

@@ -420,3 +440,3 @@

}
this.once("idle", _.partial(this.insert, node).bind(this));
this.once("idle", _.partial(this.insert, node, weight).bind(this));
return this;

@@ -437,2 +457,3 @@ }

* @param {Array} nodes - Nodes to insert into this gossip ring's cluster.
* @param {Number} weight - Number of virtual nodes to assign to each node in `nodes`.
* @param {Boolean} [force] - Whether to forcibly add `nodes` into the current state of this ring, or wait for an idle state. Defaults to false.

@@ -443,8 +464,8 @@ *

*/
minsert(nodes, force = false) {
minsert(nodes, weight, force = false) {
if (_.every(nodes, this._ring.isDefined.bind(this._ring))) return this;
debug("Inserting multiple nodes into cluster:", _.map(nodes, "_id"));
if (this.idle() || force === true) {
var oldRing = (new CHash(this._ring.rfactor(), this._ring.pfactor(), this._ring.tree()));
nodes.forEach(_.ary(this._ring.insert.bind(this._ring), 1));
const oldRing = (new CHash(this._ring.rfactor(), this._ring.pfactor(), this._ring.tree()));
nodes.forEach(_.ary(_.partial(this._ring.insert, _, weight).bind(this._ring), 1));
this._actor = uuid.v4();

@@ -457,3 +478,3 @@ this._vclock.increment(this._actor);

}
this.once("idle", _.partial(this.minsert, nodes, force).bind(this));
this.once("idle", _.partial(this.minsert, nodes, weight, force).bind(this));
return this;

@@ -464,2 +485,40 @@ }

*
* Updates `node` in this node's cluster with weight `weight`, asynchronously propagating this new state through the cluster. Provides the option to forcibly update `node`, meaning it will precede any existing internal message streams pending completion.
*
* @method update
* @memberof Clusterluck.GossipRing
* @instance
*
* @fires Clusterluck.GossipRing#GossipRing:process
* @listens Clusterluck.GenServer#GenServer:idle
*
* @param {Clusterluck.Node} node - Node to update in this gossip ring's cluster.
* @param {Number} weight - Number of virtual nodes to assign `node` in the hash ring.
* @param {Boolean} [force] - Whether to forcibly update `node` into the current state of this ring, or wait for an idle state. Defaults to false.
*
* @return {Clusterluck.GossipRing} This instance.
*
*/
update(node, weight, force = false) {
debug("Inserting node into cluster:", node.id());
if (this.idle() || force === true) {
const oldRing = (new CHash(this._ring.rfactor(), this._ring.pfactor(), this._ring.tree()));
if (this._ring.isDefined(node)) {
this._ring.update(node, weight);
} else {
this._ring.insert(node, weight);
}
this._actor = uuid.v4();
this._vclock.increment(this._actor);
this._kernel.connect(node, null, this._connOpts);
this.sendRing(GossipRing.maxMsgRound(this._ring));
this.emit("process", oldRing, this._ring);
return this;
}
this.once("idle", _.partial(this.update, node, weight).bind(this));
return this;
}
/**
*
* Leaves the current cluster, removing all nodes from it's hash ring and creating a new vector clock for this new ring. Triggers the removal of this ring's message handler on the network kernel. Provides the option to forcibly leave the cluster, meaning this function won't wait for an idle state to execute and execute immediately. Otherwise, will wait for message streams to complete.

@@ -483,3 +542,3 @@ *

this.pause();
if (this._ring.size() <= this._ring.rfactor()) {
if (this._ring.numberNodes() <= 1) {
this.emit("leave", this._ring);

@@ -521,7 +580,7 @@ this._streams.clear();

if (this.idle() || force === true) {
var oldRing = (new CHash(this._ring.rfactor(), this._ring.pfactor(), this._ring.tree()));
const oldRing = (new CHash(this._ring.rfactor(), this._ring.pfactor(), this._ring.tree()));
this._ring.remove(node);
this._actor = uuid.v4();
this._vclock.increment(this._actor);
this._kernel.disconnect(node);
this._kernel.disconnect(node, null, this._connOpts);
this.sendRing(GossipRing.maxMsgRound(this._ring));

@@ -556,3 +615,3 @@ this.emit("process", oldRing, this._ring);

if (this.idle() || force === true) {
var oldRing = (new CHash(this._ring.rfactor(), this._ring.pfactor(), this._ring.tree()));
const oldRing = (new CHash(this._ring.rfactor(), this._ring.pfactor(), this._ring.tree()));
nodes.forEach(_.ary(this._ring.remove.bind(this._ring), 1));

@@ -572,3 +631,3 @@ this._actor = uuid.v4();

*
* Finds the bucket of this instance's hash ring that `data` routes to.
* Finds the bucket of this instance's hash ring that `data` routes to, based on the nodes the corresponding `node` replicates to. This differs from `range` in that `range` provides replication on a per-key basis, whereas with `find` any data routed to `node` has the same replicants.
*

@@ -579,10 +638,12 @@ * @method find

*
* @param {Buffer} data - Data to find hash bucket of.
* @param {Buffer|String} data - Data to find hash bucket of.
* @param {Number} [size] - Number of elements to return in bucket.
*
*
* @return {Array} Array of nodes responsible for the bucket `data` hashes to.
*
*/
find(data) {
var node = this._ring.find(data);
return [node].concat(this._ring.next(node));
find(data, size) {
const node = this._ring.find(data);
return [node].concat(this._ring.next(node, size-1));
}

@@ -592,2 +653,20 @@

*
* Finds the bucket of this instance's hash ring that `data` routes to, based on a per-key replication strategy. This differs from `find` in that `find` provides per-node replication, where as with `range` every key has it's own set of nodes for replication.
*
* @method range
* @memberof Clusterluck.GossipRing
* @instance
*
* @param {Buffer|String} data - Data to find hash bucket of.
* @param {Number} [size] - Number of elements to return in bucket.
*
* @return {Array} Array of nodes responsible for the bucket `data` hashes to.
*
*/
range(data, size) {
return this._ring.rangeNext(data, size);
}
/**
*
* Function that executes on this cluster's internal interval. Trims this instance's vector clock and sends the ring to random recipients in the cluster.

@@ -604,3 +683,3 @@ *

if (this._actor === null) return this;
var time = microtime.now();
const time = microtime.now();
this._vclock.trim(time, this._vclockOpts);

@@ -636,3 +715,3 @@ this.sendRing(1);

debug("Flushing gossip ring to disk");
fs.writeFile(this._flushPath, JSON.stringify({
fs.writeFile(this._flushPath + ".tmp", JSON.stringify({
ring: this._ringID,

@@ -644,2 +723,5 @@ actor: this._actor,

if (err) debug("Error writing ring to disk:", err);
else {
fs.rename(this._flushPath + ".tmp", this._flushPath, _.noop);
}
});

@@ -663,3 +745,3 @@ return this;

sendRing(n = 1) {
if (this._ring.size() <= this._ring.rfactor() || !this._ringID) return this;
if (this._ring.numberNodes() <= 1 || !this._ringID) return this;
return this.send("ring", this._ring.toJSON(true), this._actor, this._vclock, n);

@@ -771,5 +853,5 @@ }

_updateRing(data) {
var oldRing = (new CHash(this._ring.rfactor(), this._ring.pfactor(), this._ring.tree()));
var nodes = this._mergeRings(data);
var nRound = this._updateRound(data);
const oldRing = (new CHash(this._ring.rfactor(), this._ring.pfactor(), this._ring.tree()));
const nodes = this._mergeRings(data);
const nRound = this._updateRound(data);
this._makeConnects(nodes[0]);

@@ -795,3 +877,3 @@ // make this true so we don't wait for a ring update that involves node departures on every node

_mergeRings(data) {
var nodes = [[], []];
let nodes = [[], []];
// if type is "join", just merge the two rings and increment actor, then return

@@ -837,3 +919,3 @@ if (data.type === "join") {

_imposeRing(data) {
var nodes = GossipRing._ringDiff(data.data, this._ring);
const nodes = GossipRing._ringDiff(data.data, this._ring);
this._vclock = data.vclock;

@@ -866,3 +948,3 @@ this._ring = data.data;

this.emit("conflict", data.data, data.vclock);
var oldRing = this._ring;
const oldRing = this._ring;
// use LWW to handle conflict automatically, but this can be abstracted

@@ -873,4 +955,3 @@ // for different conflict handlers

this._vclock.merge(data.vclock);
var nodes = GossipRing._ringDiff(this._ring, oldRing);
return nodes;
return GossipRing._ringDiff(this._ring, oldRing);
}

@@ -932,7 +1013,7 @@

_closeRing() {
var nodes = this.selectRandom(2);
var actor = uuid.v4();
var sendClock = this._leaveClock(actor);
var sendRing = this._leaveRing();
var msg = sendRing.toJSON(true);
const nodes = this.selectRandom(2);
const actor = uuid.v4();
const sendClock = this._leaveClock(actor);
const sendRing = this._leaveRing();
const msg = sendRing.toJSON(true);
this.abcast(nodes, this._ringID, "ring", {

@@ -980,3 +1061,3 @@ type: "leave",

_leaveClock(actor) {
var sendClock = this._vclock.increment(actor);
const sendClock = this._vclock.increment(actor);
this._actor = uuid.v4();

@@ -996,3 +1077,3 @@ this._vclock = new VectorClock(this._actor, 1);

_leaveRing() {
var sendRing = this._ring.remove(this._kernel.self());
const sendRing = this._ring.remove(this._kernel.self());
this._ring = new CHash(this._ring.rfactor(), this._ring.pfactor());

@@ -1036,8 +1117,8 @@ this._ring.insert(this._kernel.self());

static _ringDiff(ringa, ringb) {
var aNodes = ringa.nodes();
var bNodes = ringb.nodes();
var add = _.differenceWith(aNodes, bNodes, (a, b) => {
const aNodes = ringa.nodes();
const bNodes = ringb.nodes();
const add = _.differenceWith(aNodes, bNodes, (a, b) => {
return a.equals(b);
});
var rem = _.differenceWith(bNodes, aNodes, (a, b) => {
const rem = _.differenceWith(bNodes, aNodes, (a, b) => {
return a.equals(b);

@@ -1058,4 +1139,4 @@ });

if (ring.size() === 0) return 0;
if (ring.size() === ring.rfactor()) return 1;
return Math.ceil(Math.log2(ring.size()/ring.rfactor()));
if (ring.numberNodes() === 1) return 1;
return Math.ceil(Math.log2(ring.numberNodes()));
}

@@ -1074,6 +1155,6 @@

static LWW(state1, clock1, state2, clock2) {
var maxIn = clock1.nodes().reduce((memo, val) => {
const maxIn = clock1.nodes().reduce((memo, val) => {
return Math.max(memo, clock1.getInsert(val));
}, 0);
var maxData = clock2.nodes().reduce((memo, val) => {
const maxData = clock2.nodes().reduce((memo, val) => {
return Math.max(memo, clock2.getInsert(val));

@@ -1080,0 +1161,0 @@ }, maxIn);

@@ -1,14 +0,21 @@

var _ = require("lodash"),
async = require("async"),
uuid = require("uuid"),
EventEmitter = require("events").EventEmitter,
stream = require("stream"),
crypto = require("crypto"),
util = require("util"),
debug = require("debug")("clusterluck:lib:kernel");
const _ = require("lodash"),
async = require("async"),
uuid = require("uuid"),
EventEmitter = require("events").EventEmitter,
stream = require("stream"),
crypto = require("crypto"),
util = require("util"),
debug = require("debug")("clusterluck:lib:kernel");
var Node = require("./node"),
Connection = require("./conn"),
utils = require("./utils");
const Node = require("./node"),
consts = require("./consts"),
Connection = require("./conn"),
utils = require("./utils");
const RAND_BYTE_LEN = 24;
const BYTE_ENCODING = "hex";
const HEX_RADIX = 16;
const connDefaults = consts.connDefaults;
class NetKernel extends EventEmitter {

@@ -67,2 +74,5 @@

this._ipc.server.start();
this._baseToken = crypto.randomBytes(RAND_BYTE_LEN).toString(BYTE_ENCODING);
this._interval = setInterval(this._resetToken.bind(this), opts.tokenGenInterval || 60*60*1000);
this._index = 0;
return this;

@@ -88,2 +98,4 @@ }

this._srcs = new Map();
clearInterval(this._interval);
this._index = 0;

@@ -280,3 +292,4 @@ /**

*/
connect(node, cb) {
connect(node, cb, opts=connDefaults) {
opts = _.defaults(opts, connDefaults);
if (node.id() === this._id || this._sinks.has(node.id())) {

@@ -287,3 +300,3 @@ if (typeof cb === "function") return cb();

debug("Connecting to IPC server on node " + node.id());
var conn = new Connection(this._ipc, node);
const conn = new Connection(this._ipc, node, opts);
conn.start();

@@ -356,2 +369,38 @@ this._sinks.set(node.id(), conn);

*
* Makes a synchronous ping to external node `node`. To accomplish this, a tag is passed that uniquely identifies the returnee. This netkernel then listens for a message with the event ID'd as this tag and passes data into the callback.
*
* @method ping
* @memberof Clusterluck.NetKernel
* @instance
*
* @param {Clusterluck.Node} node - Node to send `data` to.
* @param {Function} cb - Callback to receive the ping response to `node` from.
*
*/
ping(node, cb, timeout=Infinity) {
if (node.id() === this._id) {
return cb(null, "pong");
} else if (!this._sinks.has(node.id())) {
return cb(new Error("Unknown node " + node.id()));
}
const id = this._generateToken();
const t = this._setupSingletonReply(node, id, timeout, cb);
const conn = this.connection(node);
let data = {
tag: id,
from: this._self.toJSON(true),
stream: {done: true}
};
data = NetKernel._encodeMsg(this._cookie, data);
const out = conn.send("ping", data);
if (out instanceof Error) {
this._teardownSingletonReply(id, t);
return cb(out);
}
}
/**
*
* Makes a synchronous call to external node `node`, streaming `data` over and then waits for a complete response. To accomplish this, a tag is passed that uniquely identifies the returnee. This netkernel then listens for messages with the event ID'd as this tag and passes data into the return stream, closing the stream when a `done` message is passed for this tag. If `cb` is passed, the return stream is collected into a single Buffer and then returned. Otherwise, the return stream is given to the caller to do manual data collection and error handling.

@@ -365,3 +414,3 @@ *

* @param {String} event - Event this message is sent under.
* @param {Stream|Buffer|String} data - Data to send with this message. `data` is coerced into a stream format, throwing on failed coersion.
* @param {Stream|Buffer|String|Number|Boolean|Object|Array} data - Data to send with this message. `data` is coerced into a stream format, throwing on failed coersion.
* @param {Function} [cb] - Optional callback to collect stream data and handle error reporting. Useful for smaller payloads with minimal memory footprints. Has two parameters: the first is error for when an error occurs at any point in the request, and the second is a returned Buffer on successful completion.

@@ -391,14 +440,16 @@ *

call(node, event, data, cb, timeout=Infinity) {
var id = uuid.v4();
data = NetKernel._coerceStream(data);
const id = this._generateToken();
const rstream = this._rstream(node, id, timeout);
if (node.id() === this._id) {
this._streamLocal(event, id, data);
async.nextTick(_.partial(this._streamLocal, event, id, data).bind(this));
} else if (!this._sinks.has(node.id())) {
async.nextTick(_.partial(rstream.emit, "error", new Error("Unknown node " + node.id())).bind(rstream));
} else {
const err = this._streamData(node, event, id, data);
if (err !== null) async.nextTick(_.partial(rstream.emit, "error", err).bind(rstream));
}
else if (!this._sinks.has(node.id())) return null;
else {
this._streamData(node, event, id, data);
}
var rstream = this._rstream(node, id, timeout);
if (typeof cb === "function") {
return NetKernel._collectStream(rstream, cb);
NetKernel._collectStream(rstream, cb);
}

@@ -410,2 +461,41 @@ return rstream;

*
* Makes a synchronous call to external node `node`, sending `data` over and then waits for a complete response. To accomplish this, a tag is passed that uniquely identifies the returnee. This netkernel then listens for a single message with the event ID'd as this tag and passes data into the return callback.
*
* @method callSingleton
* @memberof Clusterluck.NetKernel
* @instance
*
* @param {Clusterluck.Node} node - Node to send `data` to.
* @param {String} event - Event this message is sent under.
* @param {Buffer|Stream|String|Number|Object|Array|Boolean} data - Data to send with this message.
* @param {Function} cb - Callback to receive the response from `node` to our message.
*
* @example
* // with callback
* kernel.callSingleton(node, "job", "hello", (err, data) => {
* // data can be any JSON or Buffer
* // ...
* }, 5000);
*
*/
callSingleton(node, event, data, cb, timeout=Infinity) {
const id = this._generateToken();
const t = this._setupSingletonReply(node, id, timeout, cb);
if (node.id() === this._id) {
this._streamLocal(event, id, data);
} else if (!this._sinks.has(node.id())) {
this._teardownSingletonReply(id, t);
return cb(new Error("Unknown node " + node.id()));
} else {
const err = this._streamData(node, event, id, data);
if (err !== null) {
this._teardownSingletonReply(id, t);
return cb(err);
}
}
}
/**
*
* Makes a synchronous call to a list of external nodes `nodes`, streaming `data` over and then waits for a complete response from all nodes (using `this.call(...)` on each node in `nodes`). If `cb` is passed, each return stream is collected into a single Buffer and then returned. Otherwise, the list of return streams is given to the caller to do manual data collection and error handling. If `cb` is passed and any node in the list incurs an error at any point, it's called with this error and any further processing on the other nodes is ignored.

@@ -419,3 +509,3 @@ *

* @param {String} event - Event this message is sent under.
* @param {Stream|Buffer|String} data - Data to send with each message. `data` is coerced into a stream format, throwing on failed coersion.
* @param {Stream|Buffer|String|Number|Boolean|Object|Array} data - Data to send with each message. `data` is coerced into a stream format, throwing on failed coersion.
* @param {Function} [cb] - Optional callback to wait for collect stream data and handle error reporting on each node in `nodes`. Useful for smaller payloads with minimal memory footprints. Has two parameters: the first is an error for when an error occurs at any point in the request for any node, and the second is an array of returned Buffers on successful completion.

@@ -447,5 +537,6 @@ *

multicall(nodes, event, data, cb, timeout=Infinity) {
data = NetKernel._coerceStream(data);
if (typeof cb !== "function") {
return nodes.map(_.partialRight(this.call, event, data, _, timeout).bind(this));
return nodes.map((node) => {
return this.call(node, event, data, null, timeout);
});
}

@@ -457,4 +548,28 @@ return async.map(nodes, _.partialRight(this.call, event, data, _, timeout).bind(this), cb);

*
* Replies to a synchronous message received on this netkernel.
* Makes a synchronous call to a list of external nodes `nodes`, sending `data` over and then waits for a complete response from all nodes (using `this.call(...)` on each node in `nodes`).
*
* @method multicallSingleton
* @memberof Clusterluck.NetKernel
* @instance
*
* @param {Array} nodes - Nodes to send `data` to.
* @param {String} event - Event this message is sent under.
* @param {Buffer|String|Number|Object|Array|Boolean} data - Data to send with each message.
* @param {Function} cb - Callback to wait for message data and handle error reporting on each node in `nodes`. Has two parameters: the first is an error for when an error occurs at any point in the request for any node, and the second is an array of returned messages on successful completion.
*
* @example
* kernel.multicallSingleton([node1, node2], "job", "hello", (err, data) => {
* // data is an array of JSON and/or Buffers on success
* // ...
* });
*
*/
multicallSingleton(nodes, event, data, cb, timeout=Infinity) {
return async.map(nodes, _.partialRight(this.callSingleton, event, data, _, timeout).bind(this), cb);
}
/**
*
 * Replies to a synchronous message received on this netkernel. If responding to a request made with `callSingleton`, a stream should not be passed in, as the requester is only listening for a single message (whereas a stream has at least two messages).
*
* @method reply

@@ -467,3 +582,3 @@ * @memberof Clusterluck.NetKernel

* @param {Clusterluck.Node} from.node - Node representing the sender.
* @param {Stream|Buffer|String} data - Data to send with this message. `data` is coerced into a stream format, throwing on failed coersion.
* @param {Stream|Buffer|String|Number|Boolean|Object|Array} data - Data to send with this message. `data` is coerced into a stream format, throwing on failed coersion.
*

@@ -474,9 +589,7 @@ * @return {Clusterluck.NetKernel} This instance.

reply(from, data) {
var err;
if (typeof from.tag !== "string") {
err = new Error("Cannot reply to message with no callback tag.");
const err = new Error("Cannot reply to message with no callback tag.");
throw err;
}
this.cast(from.node, from.tag, data);
return this;
return this.cast(from.node, from.tag, data);
}

@@ -494,3 +607,3 @@

* @param {String} event - Event this message is sent under.
* @param {Stream|Buffer|String} data - Data to send with this message. `data` is coerced into a stream format, throwing on failed coersion.
* @param {Stream|Buffer|String|Number|Boolean|Object|Array} data - Data to send with this message. `data` is coerced into a stream format, throwing on failed coersion.
*

@@ -501,3 +614,2 @@ * @return {Clusterluck.NetKernel} This instance.

cast(node, event, data) {
data = NetKernel._coerceStream(data);
if (node.id() === this._id) {

@@ -521,3 +633,3 @@ return this._streamLocal(event, null, data);

* @param {String} event - Event this message is sent under.
* @param {Stream|Buffer|String} data - Data to send with this message. `data` is coerced into a stream format, throwing on failed coersion.
* @param {Stream|Buffer|String|Number|Boolean|Object|Array} data - Data to send with this message. `data` is coerced into a stream format, throwing on failed coersion.
*

@@ -528,4 +640,2 @@ * @return {Clusterluck.NetKernel} This instance.

abcast(nodes, event, data) {
// to avoid creating N duplicates of data
data = NetKernel._coerceStream(data);
nodes.forEach(_.partial(this.cast, _, event, data).bind(this));

@@ -553,3 +663,3 @@ return this;

this._addSocket(socket);
var nData = NetKernel._decodeBuffer(data.data);
const nData = NetKernel._decodeBuffer(data.data);
debug("Received message on net kernel with stream:", data.stream.stream + ",", "event:", data.id + ",", "from:", data.from.id);

@@ -580,3 +690,13 @@

this._ipc.server.on("socket.disconnected", this._disconnectSocket.bind(this));
this._ipc.server.on("ping", (data, socket) => {
data = NetKernel._decodeMsg(this._cookie, data);
if (data instanceof Error) {
return this._skipMsg(data);
}
const conn = this.connection(Node.from(data.from));
if (conn === null) return;
this._sendData(null, conn, data.tag, null, {done: true}, "pong");
});
/**

@@ -608,6 +728,12 @@ *

_streamLocal(event, tag, data) {
var streamID = uuid.v4();
var rstream = {stream: streamID, done: false};
const rstream = {done: false};
if (!(data instanceof stream.Stream)) {
rstream.done = true;
this._sendLocal(event, tag, rstream, data);
return this;
}
const streamID = crypto.randomBytes(RAND_BYTE_LEN).toString(BYTE_ENCODING);
rstream.stream = streamID;
debug("Streaming data locally,", "stream: " + streamID + ",", "event: " + event + ",", "tag: " + tag);
var dataListener = _.partial(this._sendLocal, event, tag, rstream).bind(this);
const dataListener = _.partial(this._sendLocal, event, tag, _.clone(rstream)).bind(this);
data.on("data", dataListener);

@@ -643,3 +769,3 @@ data.once("end", _.partial(this._finishLocal, event, tag, rstream).bind(this));

_sendLocal(event, tag, stream, data) {
this.emit(event, data, _.clone(stream), {
this.emit(event, data, stream, {
tag: tag,

@@ -714,7 +840,14 @@ node: this._self

_streamData(node, event, tag, data) {
var streamID = uuid.v4();
var rstream = {stream: streamID, done: false};
var conn = this.connection(node);
const rstream = {done: false};
const conn = this.connection(node);
if (!(data instanceof stream.Stream)) {
rstream.done = true;
return this._sendData(null, conn, event, tag, rstream, data);
}
const streamID = crypto.randomBytes(RAND_BYTE_LEN).toString(BYTE_ENCODING);
debug("Streaming data to " + node.id() + ",", "stream: " + streamID + ",", "event: " + event + ",", "tag: " + tag);
var dataListener = _.partial(this._sendData, data, conn, event, tag, rstream).bind(this);
rstream.stream = streamID;
// mark conn to not be idle
conn.initiateStream(rstream);
const dataListener = _.partial(this._sendData, data, conn, event, tag, _.clone(rstream)).bind(this);
data.on("data", dataListener);

@@ -727,5 +860,3 @@ data.once("end", _.partial(this._finishData, data, conn, event, tag, rstream).bind(this));

});
// mark conn to not be idle
conn.initiateStream(rstream);
return this;
return null;
}

@@ -757,8 +888,11 @@

from: this._self.toJSON(true),
stream: _.clone(stream),
stream: stream,
data: data
});
var out = conn.send("message", data);
if (out instanceof Error) source.emit("error", out);
return this;
const out = conn.send("message", data);
if (out instanceof Error) {
if (source) source.emit("error", out);
else return out;
}
return null;
}

@@ -866,6 +1000,6 @@

_rstream(node, id, timeout=Infinity) {
var pstream = new stream.PassThrough();
const pstream = new stream.PassThrough({objectMode: true});
this.on(id, _.partial(this._rcvData, node, id, pstream).bind(this));
if (timeout === Infinity || typeof timeout !== "number") return pstream;
var t = setTimeout(() => {
const t = setTimeout(() => {
this.removeAllListeners(id);

@@ -908,2 +1042,3 @@ pstream.emit("error", new Error("Timeout."));

}
if (data) pstream.write(data);
if (stream.done) {

@@ -913,3 +1048,2 @@ this.removeAllListeners(id);

}
else pstream.write(data);
}

@@ -919,2 +1053,90 @@

*
* @method _setupSingletonReply
* @memberof Clusterluck.NetKernel
* @instance
* @private
*
* @param {Clusterluck.Node} node
* @param {String} id
* @param {Number} timeout
* @param {Function} cb
*
* @return {Timeout}
*
*/
_setupSingletonReply(node, id, timeout, cb) {
let t = null;
if (timeout !== Infinity) {
t = setTimeout(() => {
this.removeAllListeners(id);
return cb(new Error("Timeout."));
}, timeout);
}
this.once(id, (data, stream, from) => {
this._teardownSingletonReply(id, t);
if (from.node.id() !== node.id()) {
const err = _.extend(new Error("Synchronous response node mismatch."), {
type: "INVALID_REPLY",
actual: from.node.id(),
expected: node.id()
});
return cb(err);
}
if (stream.error) {
return cb(stream.error);
}
return cb(null, data);
});
return t;
}
/**
*
* @method _teardownSingletonReply
* @memberof Clusterluck.NetKernel
* @instance
* @private
*
* @param {String} id
* @param {Timeout} t
*
*/
_teardownSingletonReply(id, t) {
clearTimeout(t);
this.removeAllListeners(id);
return this;
}
/**
*
* @method _generateToken
* @memberof Clusterluck.NetKernel
* @instance
* @private
*
*/
_generateToken() {
const token = this._baseToken + this._index.toString(HEX_RADIX);
this._index += 1;
return token;
}
/**
*
* @method _resetToken
* @memberof Clusterluck.NetKernel
* @instance
* @private
*
*/
_resetToken() {
this._baseToken = crypto.randomBytes(RAND_BYTE_LEN).toString(BYTE_ENCODING);
this._index = 0;
}
/**
*
* @method _skipMsg

@@ -963,5 +1185,4 @@ * @memberof Clusterluck.NetKernel

return data;
}
else if (data instanceof Buffer || typeof data === "string") {
var nstream = new stream.PassThrough();
} else if (data instanceof Buffer || typeof data === "string") {
const nstream = new stream.PassThrough();
async.nextTick(() => {

@@ -972,5 +1193,4 @@ nstream.write(typeof data === "string" ? Buffer.from(data) : data);

return nstream;
}
else {
var err = new Error("Input data could not be coerced into a stream.");
} else {
const err = new Error("Input data could not be coerced into a stream.");
throw err;

@@ -992,7 +1212,7 @@ }

static _collectStream(rstream, cb, limit = 10000000) {
var acc = Buffer.from("");
var called = false;
var dataHandler = (data) => {
acc = Buffer.concat([acc, data], acc.length + data.length);
if (acc.length > limit) rstream.emit("error", new Error("Buffer limit exceeded."));
let acc;
let called = false;
const dataHandler = (data) => {
acc = acc ? Buffer.concat([acc, data], acc.length + data.length) : data;
if (util.isBuffer(acc) && acc.length > limit) rstream.emit("error", new Error("Buffer limit exceeded."));
};

@@ -1028,2 +1248,3 @@ rstream.on("data", dataHandler);

if (buf === null) return null;
else if (!util.isBuffer(buf)) return buf;
return buf.toJSON();

@@ -1045,5 +1266,7 @@ }

static _decodeBuffer(data) {
if (data === null) return null;
// buffer.toJSON returns {type: "Buffer", data: [array of bytes...]}
return Buffer.from(data.data);
if (util.isObject(data) && data.type === "Buffer" && Array.isArray(data.data)) {
return Buffer.from(data);
} else {
return data;
}
}

@@ -1083,3 +1306,3 @@

if (key === null) return data;
var checkSum = NetKernel._hmacData(key, JSON.stringify(data));
const checkSum = NetKernel._hmacData(key, JSON.stringify(data));
return _.defaults({checkSum: checkSum}, data);

@@ -1103,5 +1326,5 @@ }

if (key === null) return data;
var checkSum = data.checkSum;
const checkSum = data.checkSum;
data = _.omit(data, ["checkSum"]);
var calculated = NetKernel._hmacData(key, JSON.stringify(data));
const calculated = NetKernel._hmacData(key, JSON.stringify(data));
if (checkSum !== calculated) {

@@ -1108,0 +1331,0 @@ return _.extend(new Error("Checksum failure."), {

{
"name": "clusterluck",
"version": "1.3.0",
"version": "2.0.0",
"description": "Distributed systems library for gossip protocols, consistent hash rings, and vector clocks.",

@@ -39,3 +39,2 @@ "main": "index.js",

"dependencies": {
"JSONStream": "^1.3.1",
"async": "^1.5.2",

@@ -45,2 +44,3 @@ "debug": "^2.2.0",

"lodash": "^4.17.2",
"lru-cache": "^4.1.1",
"microtime": "^2.1.2",

@@ -47,0 +47,0 @@ "node-ipc": "^8.9.3",

@@ -54,4 +54,8 @@ Clusterluck

- [`inspect`](#inspect)
- [`nodes`](#nodes)
- [`ping`](#ping)
- [`get`](#get)
- [`has`](#has)
- [`weight`](#weight)
- [`weights`](#weights)
- [`join`](#join)

@@ -62,2 +66,3 @@ - [`meet`](#meet)

- [`minsert`](#minsert)
- [`update`](#update)
- [`remove`](#remove)

@@ -219,3 +224,4 @@ - [`mremove`](#mremove)

As an implementation note, `GenServer`s should not be used as a replacement for EventEmitters when orchestrating state local to a node.
Generally speaking, serialization costs place an undue cost when we can just pass native JS objects around.
Generally speaking, there is little overhead in using this over a raw EventEmitter, but there are conditional branches and extra V8 constructions
that may be unneeded for your implementation.
Instead, making other `GenServer`s part of the constructor of other `GenServer`s is preferred (using OOP principles to enforce actor relations), similar to how the `CommandServer` class works.

@@ -241,15 +247,67 @@ In fact, both the `GossipRing` and `CommandServer` classes, built into every node in the cluster, are `GenServer`s themselves!

> inspect
{ ok: true,
data:
{ rfactor: 3,
pfactor: 2,
tree:
[ { key: 'avmox6bKHfmLdzmObwjwIrh2WC6XM471ods56FWbDo0=',
value: { id: 'foo', host: 'localhost', port: 7022 } },
{ key: 'kL2YfHLEuxHGaEz4nOxWYyPSiFlGBsFMzoYDXXxuXK0=',
value: { id: 'foo', host: 'localhost', port: 7022 } },
{ key: 'kzMt7C+SJZbxNQmrL3vhpfJ+a0RgPiGlRhrxwS57RWI=',
value: { id: 'foo', host: 'localhost', port: 7022 } } ] } }
{
"ok": true,
"data": {
"rfactor": 3,
"pfactor": 2,
"tree": [
{
"key": "avmox6bKHfmLdzmObwjwIrh2WC6XM471ods56FWbDo0=",
"value": {
"id": "foo",
"host": "localhost",
"port": 7022
}
},
{
"key": "kL2YfHLEuxHGaEz4nOxWYyPSiFlGBsFMzoYDXXxuXK0=",
"value": {
"id": "foo",
"host": "localhost",
"port": 7022
}
},
{
"key": "kzMt7C+SJZbxNQmrL3vhpfJ+a0RgPiGlRhrxwS57RWI=",
"value": {
"id": "foo",
"host": "localhost",
"port": 7022
}
}
]
}
}
```
##### nodes
In the CLI session, type `nodes`. This command will print the nodes of the cluster on the console. For example, if we've just started a new node with id `foo` at hostname `localhost` with port `7022`, we'd see the following output:
```
> nodes
{
"ok": true,
"data": [
{
"id": "foo",
"host": "localhost",
      "port": 7022
}
]
}
```
##### ping
In the CLI session, type `ping`. This command will ping the node this session is targeting. For example, we'd see the following output:
```
> ping
{
"ok": true,
"data": "pong"
}
```
##### get

@@ -261,10 +319,19 @@

> get foo
{ ok: true,
data: { id: 'foo', host: 'localhost', port: 7022 } }
{
"ok": true,
"data": {
"id": "foo",
"host": "localhost",
"port": 7022
}
}
> get bar
{ ok: false,
error:
{ message: '\'bar\' is not defined in this ring.',
_error: true } }
{
"ok": false,
"error": {
    "message": "'bar' is not defined in this ring.",
"_error": true
}
}
```

@@ -278,10 +345,49 @@

> has foo
{ ok: true,
data: true }
{
"ok": true,
"data": true
}
> has bar
{ ok: true,
data: false }
{
"ok": true,
"data": false
}
```
##### weight
This command will print metadata about an input node's weight (number of virtual nodes) in the cluster, or will return an error if the node doesn't exist in the cluster (according to the node our session targets). For example, given the previous setup:
```
> weight foo
{
"ok": true,
"data": 3
}
> weight bar
{
"ok": false,
"error": {
    "message": "'bar' is not defined in this ring.",
"_error": true
}
}
```
##### weights
In the CLI session, type `weights`. This command will print the weights (number of virtual nodes) for every node in the cluster on the console. For example, if we've just started a new node with id `foo` at hostname `localhost` with port `7022` and default weight 3, we'd see the following output:
```
> weights
{
"ok": true,
"data": {
"foo": 3
}
}
```
##### join

@@ -294,10 +400,15 @@

> join ring
{ ok: true }
{
"ok": true
}
// if it's in a ring
> join ring
{ ok: false,
error:
{ message: 'Node already belongs to ring \'ring\'',
_error: true } }
{
"ok": false,
"error": {
"message": "Node already belongs to ring 'ring'",
"_error": true
}
}
```

@@ -314,3 +425,5 @@

> meet bar localhost 7023
{ ok: true }
{
"ok": true
}

@@ -328,11 +441,17 @@ // wait some time...

> leave
{ ok: true }
{
"ok": true
}
// immediately following this command...
> has bar
{ ok: true,
data: false }
{
"ok": true,
"data": false
}
// leave is done forcefully
> leave --force
{ ok: true }
{
"ok": true
}
```

@@ -351,13 +470,29 @@

> insert bar localhost 7023
{ ok: true }
{
"ok": true
}
> get bar
{ ok: true,
data: { id: 'bar', host: 'localhost', port: 7023 } }
{
"ok": true,
"data": {
"id": "bar",
"host": "localhost",
"port": 7023
}
}
// insert is done forcefully
> insert --force bar localhost 7023
{ ok: true }
{
"ok": true
}
> get bar
{ ok: true,
data: { id: 'bar', host: 'localhost', port: 7023 } }
{
"ok": true,
"data": {
"id": "bar",
"host": "localhost",
"port": 7023
}
}
```

@@ -375,19 +510,47 @@

> minsert bar localhost 7023 baz localhost 7024
{ ok: true }
{
"ok": true
}
> get bar
{ ok: true,
data: { id: 'bar', host: 'localhost', port: 7023 } }
{
"ok": true,
"data": {
"id": "bar",
"host": "localhost",
"port": 7023
}
}
> get baz
{ ok: true,
data: { id: 'baz', host: 'localhost', port: 7024 } }
{
"ok": true,
"data": {
"id": "baz",
"host": "localhost",
"port": 7024
}
}
// minsert is done forcefully
> minsert --force bar localhost 7023 baz localhost 7024
{ ok: true }
{
"ok": true
}
> get bar
{ ok: true,
data: { id: 'bar', host: 'localhost', port: 7023 } }
{
"ok": true,
"data": {
"id": "bar",
"host": "localhost",
"port": 7023
}
}
> get baz
{ ok: true,
data: { id: 'baz', host: 'localhost', port: 7024 } }
{
"ok": true,
"data": {
"id": "baz",
"host": "localhost",
"port": 7024
}
}
```

@@ -397,2 +560,41 @@

##### update
This command will tell the targeted node by this session to update a node in its cluster (as it currently views it) with a new insertion weight.
Subsequently, this information will be gossiped around the cluster, eventually resulting in every node thinking the input node belongs in the cluster.
For example:
```
> update foo localhost 7022 4
{
"ok": true
}
> get foo
{
  "ok": true,
  "data": {
    "id": "foo",
    "host": "localhost",
    "port": 7022
  }
}
// update is done forcefully
> update --force foo localhost 7022 4
{
  "ok": true
}
> get foo
{
  "ok": true,
  "data": {
    "id": "foo",
    "host": "localhost",
    "port": 7022
  }
}
```
For documentation on how the `--force` option works for this command, or any other option, just run `help update`.
##### remove

@@ -406,13 +608,21 @@

> remove bar localhost 7023
{ ok: true }
{
"ok": true
}
> has bar
{ ok: true,
data: false }
{
"ok": true,
"data": false
}
// remove is done forcefully
> remove --force bar localhost 7023
{ ok: true }
{
"ok": true
}
> has bar
{ ok: true,
data: false }
{
"ok": true,
"data": false
}
```

@@ -430,19 +640,31 @@

> mremove bar localhost 7023 baz localhost 7024
{ ok: true }
{
"ok": true
}
> has bar
{ ok: true,
data: false }
{
"ok": true,
"data": false
}
> has baz
{ ok: true,
data: false }
{
"ok": true,
"data": false
}
// mremove is done forcefully
> mremove --force bar localhost 7023 baz localhost 7024
{ ok: true }
{
"ok": true
}
> has bar
{ ok: true,
data: false }
{
"ok": true,
"data": false
}
> has baz
{ ok: true,
data: false }
{
"ok": true,
"data": false
}
```

@@ -449,0 +671,0 @@

@@ -8,3 +8,3 @@ var _ = require("lodash"),

require(path.join(__dirname, "unit"))(mocks, lib);
// require(path.join(__dirname, "integration"))(mocks, lib);
require(path.join(__dirname, "integration"))(mocks, lib);
});

@@ -1,9 +0,7 @@

var _ = require("lodash"),
async = require("async"),
assert = require("chai").assert;
module.exports = function (app, deps, mocks) {
module.exports = function (mocks, lib) {
describe("Integration tests", function () {
require("./gen_server")(mocks, lib);
require("./dlm")(mocks, lib);
require("./dsm")(mocks, lib);
});
};

@@ -19,5 +19,6 @@ var _ = require("lodash"),

assert.equal(chash.tree().length, 0);
assert.equal(chash._weights.size, 0);
var tree = rbt();
tree = tree.insert("key", "value");
tree = tree.insert("key", new Node("foo", "bar", 7022));
chash = new CHash(rfactor, pfactor, tree);

@@ -27,2 +28,3 @@ assert.equal(chash.rfactor(), 1);

assert.equal(chash.tree().length, 1);
assert.equal(chash._weights.get("foo"), 1);
});

@@ -35,2 +37,3 @@

assert.equal(chash.size(), rfactor);
assert.equal(chash._weights.get(node.id()), 1);

@@ -40,2 +43,8 @@ // should do nothing if 'id' already exists

assert.equal(chash.size(), rfactor);
var node2 = new Node("id2", "localhost", 8001);
chash.insert(node2, rfactor+1);
assert.equal(chash.size(), 2*rfactor+1);
assert.equal(chash._weights.get(node2.id()), rfactor+1);
assert.equal(chash._weights.size, 2);
});

@@ -47,3 +56,9 @@

chash.insert(node);
var node2 = new Node("id2", "localhost", 8001);
chash.insert(node2, rfactor+1);
// should remove node with custom weight
chash.remove(node2);
assert.equal(chash.size(), rfactor);
// should remove node

@@ -56,2 +71,3 @@ chash.remove(node);

assert.equal(chash.size(), 0);
assert.equal(chash._weights.size, 0);
});

@@ -73,8 +89,12 @@

it("Should update state at node", function () {
it("Should update weight at node", function () {
var node = new Node("id", "localhost", 8000);
chash.insert(node);
assert.equal(chash.size(), rfactor);
chash.update(node, "here");
assert.equal(chash.get(node), "here");
chash.update(node, rfactor+1);
assert.deepEqual(chash.get(node), node);
assert.equal(chash.size(), rfactor+1);
assert.equal(chash._weights.get(node.id()), rfactor+1);
assert.equal(chash._weights.size, 1);
});

@@ -164,2 +184,89 @@

it("Should find neighbors of a node with specified range", function () {
chash.pfactor(3);
chash.rfactor(3);
var node = new Node("id", "localhost", 8000);
chash.insert(node);
_.times(3, (n) => {
var tmp = new Node("id" + (n+1), "localhost", 8000 + n + 1);
chash.insert(tmp);
});
var out = chash.next(node, 1);
assert.lengthOf(out, 1);
assert.notOk(out[0].id() === node.id());
});
it("Should grab range of nodes based on a key, uniqueness/length check", function () {
var out = chash.rangeNext("foo");
assert.lengthOf(out, 0);
chash.pfactor(2);
chash.rfactor(3);
var node = new Node("id", "localhost", 8000);
chash.insert(node);
// should return neighbors
_.times(3, (n) => {
var tmp = new Node("id" + (n+1), "localhost", 8000 + n + 1);
chash.insert(tmp);
});
out = chash.rangeNext("key");
assert.lengthOf(out, chash.pfactor()+1);
var outMap = out.map((n) => n.id());
assert.lengthOf(_.uniq(outMap), out.length);
var out2 = chash.rangeNext("key", 2);
assert.lengthOf(out2, 2);
assert.deepEqual(out.slice(0, 2), out2);
var end = chash.tree().end;
var key = end.key;
var found = _.find([1,2,3].map((n) => {
return {node: end.value, n: n};
}), (val) => {
return chash._nodeName(val.node, val.n) === key;
});
out2 = chash.rangeNext(found.node.id() + "_" + found.n, 2);
assert.lengthOf(out2, 2);
var out2Map = out2.map((n) => n.id());
assert.lengthOf(_.uniq(out2Map), out2.length);
});
it("Should grab range of nodes based on a key, explicit list check", function () {
var node1 = new Node("id", "localhost", 8000);
var node2 = new Node("id2", "localhost", 8001);
var node3 = new Node("id3", "localhost", 8002);
var initKeys = [
"key1",
"key2",
"key3",
"key4",
"key5",
"key6"
];
var keys = initKeys.map((key) => {
return {key: key, hash: chash._findHash(key)};
}).sort((a, b) => {
return a.hash < b.hash ? -1 : (a.hash === b.hash ? 0 : 1);
});
initKeys = _.map(keys, "key");
keys = _.map(keys, "hash");
var tree = [
node1,
node2,
node1,
node3,
node2,
node3,
].map((val, i) => {
return {key: keys[i], value: val.toJSON(true)};
});
chash.fromJSON({rfactor: 2, pfactor: 2, tree: tree});
var out = chash.rangeNext(initKeys[0], 3);
assert.deepEqual(out, [node2, node1, node3]);
out = chash.rangeNext(initKeys[3], 3);
assert.deepEqual(out, [node2, node3, node1]);
});
it("Should find preceding neighbors of a node (pfactor=1,rfactor=1)", function () {

@@ -230,21 +337,86 @@ var node = new Node("id", "localhost", 8000);

it("Should fail to merge two rings if rfactors/pfactors don't match", function (){
var chash2 = new CHash(rfactor+1, pfactor);
var out;
try {
out = chash.merge(chash2);
} catch (e) {
out = e;
}
assert(out instanceof Error);
it("Should find preceding neighbors of a node with specified range", function () {
chash.pfactor(3);
chash.rfactor(3);
var node = new Node("id", "localhost", 8000);
chash.insert(node);
chash2 = new CHash(rfactor, pfactor+1);
try {
out = chash.merge(chash2);
} catch (e) {
out = e;
}
assert(out instanceof Error);
// should return neighbors
_.times(3, (n) => {
var tmp = new Node("id" + (n+1), "localhost", 8000 + n + 1);
chash.insert(tmp);
});
var out = chash.prev(node, 1);
assert.lengthOf(out, 1);
assert.notOk(out[0].id() === node.id());
});
it("Should grab reverse range of nodes based on key, unique/length check", function () {
var out = chash.rangePrev("foo");
assert.lengthOf(out, 0);
chash.pfactor(2);
chash.rfactor(3);
var node = new Node("id", "localhost", 8000);
chash.insert(node);
// should return neighbors
_.times(3, (n) => {
var tmp = new Node("id" + (n+1), "localhost", 8000 + n + 1);
chash.insert(tmp);
});
out = chash.rangePrev("key");
assert.lengthOf(out, chash.pfactor()+1);
var out2 = chash.rangePrev("key", 2);
assert.lengthOf(out2, 2);
assert.deepEqual(out.slice(0, 2), out2);
var begin = chash.tree().begin;
var key = begin.key;
var found = _.find([1,2,3].map((n) => {
return {node: begin.value, n: n};
}), (val) => {
return chash._nodeName(val.node, val.n) === key;
});
out2 = chash.rangePrev(found.node.id() + "_" + found.n, 2);
assert.lengthOf(out2, 2);
});
it("Should grab reverse range of nodes based on a key, explicit list check", function () {
var node1 = new Node("id", "localhost", 8000);
var node2 = new Node("id2", "localhost", 8001);
var node3 = new Node("id3", "localhost", 8002);
var initKeys = [
"key1",
"key2",
"key3",
"key4",
"key5",
"key6"
];
var keys = initKeys.map((key) => {
return {key: key, hash: chash._findHash(key)};
}).sort((a, b) => {
return a.hash < b.hash ? -1 : (a.hash === b.hash ? 0 : 1);
});
initKeys = _.map(keys, "key");
keys = _.map(keys, "hash");
var tree = [
node1,
node2,
node1,
node3,
node2,
node3,
].map((val, i) => {
return {key: keys[i], value: val.toJSON(true)};
});
chash.fromJSON({rfactor: 2, pfactor: 2, tree: tree});
var out = chash.rangePrev(initKeys[0], 3);
assert.deepEqual(out, [node3, node2, node1]);
out = chash.rangePrev(initKeys[3], 3);
assert.deepEqual(out, [node1, node2, node3]);
});
it("Should merge ring, not add any existing entries", function () {

@@ -258,2 +430,3 @@ var chash2 = new CHash(rfactor, pfactor);

assert.ok(chash.get(node));
assert.equal(chash._weights.get(node.id()), rfactor);
});

@@ -266,5 +439,6 @@

chash.insert(node);
chash2.insert(node, rfactor+1);
chash2.insert(node2);
chash.merge(chash2);
assert.equal(chash.size(), chash.rfactor()*2);
assert.equal(chash.size(), chash.rfactor()*2+1);
var nodes = _.map(chash.nodes(), (el) => {

@@ -274,23 +448,6 @@ return el.id();

assert.lengthOf(_.xor(nodes, ["id", "id2"]), 0);
assert.equal(chash._weights.get(node.id()), rfactor+1);
assert.equal(chash._weights.get(node2.id()), rfactor);
});
it("Should fail to intersect two rings if rfactors/pfactors don't match", function () {
var chash2 = new CHash(rfactor+1, pfactor);
var out;
try {
out = chash.intersect(chash2);
} catch (e) {
out = e;
}
assert(out instanceof Error);
chash2 = new CHash(rfactor, pfactor+1);
try {
out = chash.intersect(chash2);
} catch (e) {
out = e;
}
assert(out instanceof Error);
});
it("Should intersect rings, subset", function () {

@@ -300,8 +457,10 @@ var chash2 = new CHash(rfactor, pfactor);

var node2 = new Node("id2", "localhost", 8001);
chash.insert(node);
chash2.insert(node);
chash.insert(node, 2);
chash2.insert(node, 2);
chash2.insert(node2);
chash.intersect(chash2);
assert.equal(chash.size(), chash.rfactor());
assert.equal(chash.size(), chash.rfactor()*2);
assert.equal(chash.nodes()[0].id(), node.id());
assert.equal(chash._weights.get(node.id()), 2);
assert.equal(chash._weights.has(node2.id()), false);
});

@@ -319,2 +478,4 @@

assert.equal(chash.nodes()[0].id(), node.id());
assert.equal(chash._weights.get(node.id()), rfactor);
assert.equal(chash._weights.has(node2.id()), false);
});

@@ -330,2 +491,3 @@

assert.equal(chash.size(), 0);
assert.equal(chash._weights.size, 0);
});

@@ -340,2 +502,21 @@

it("Should return number of nodes correctly", function () {
assert.equal(chash.numberNodes(), 0);
var node = new Node("id", "localhost", 8000);
chash.insert(node);
assert.equal(chash.numberNodes(), 1);
});
it("Should return weights of nodes", function () {
var node = new Node("id", "localhost", 8000);
chash.insert(node);
var node2 = new Node("id2", "localhost", 8001);
chash.insert(node2, 2);
var weights = chash.weights();
assert.equal(weights.get(node.id()), 1);
assert.equal(weights.get(node2.id()), 2);
});
it("Should return nodes correctly", function () {

@@ -357,12 +538,2 @@ assert.lengthOf(chash.nodes(), 0);

it("Should fail equality check if rfactors mismatch", function () {
var chash2 = new CHash(rfactor+1, pfactor);
assert.notOk(chash.equals(chash2));
});
it("Should fail equality check if pfactors mismatch", function () {
var chash2 = new CHash(rfactor, pfactor+1);
assert.notOk(chash.equals(chash2));
});
it("Should fail equality check if tree sizes mismatch", function () {

@@ -403,3 +574,3 @@ var chash2 = new CHash(rfactor, pfactor);

out = chash.toJSON(true);
var name = CHash._nodeName(node, 1);
var name = chash._nodeName(node, 1);
assert.deepEqual(out, {

@@ -419,3 +590,3 @@ rfactor: rfactor,

var node = new Node("id", "localhost", 8000);
var name = CHash._nodeName(node, 1);
var name = chash._nodeName(node, 1);
var ent = {

@@ -439,2 +610,3 @@ rfactor: 1,

assert.equal(chash.size(), 1);
assert.equal(chash._weights.get("id"), 1);
assert.ok(chash.get(node));

@@ -441,0 +613,0 @@ });

@@ -15,2 +15,3 @@ var _ = require("lodash"),

VectorClock = lib.vclock,
utils = lib.utils,
Node = lib.node,

@@ -159,2 +160,14 @@ MockIPC = mocks.ipc;

it("Should decode singleton", function () {
var job = {event: "inspect"};
var out = server.decodeSingleton(job);
assert.deepEqual(out, {event: "inspect", data: undefined});
out = server.decodeSingleton({event: "get"});
assert.ok(out instanceof Error);
out = server.decodeSingleton({event: "not defined"});
assert.ok(out instanceof Error);
});
it("Should skip parsing stream if command comes from cluster node", function () {

@@ -289,4 +302,5 @@ var data = Buffer.from(JSON.stringify({ok: true}));

var data = {node: new Node("foo", "localhost", 8000)};
sinon.stub(gossip, "insert", (node, force) => {
sinon.stub(gossip, "insert", (node, weight, force) => {
assert.ok(node.equals(data.node));
assert.equal(weight, gossip.ring().rfactor());
assert.equal(force, false);

@@ -297,9 +311,30 @@ });

gossip.insert.restore();
data = {node: new Node("foo", "localhost", 8000), weight: -1};
sinon.stub(gossip, "insert", (node, weight, force) => {
assert.ok(node.equals(data.node));
assert.equal(weight, gossip.ring().rfactor());
assert.equal(force, false);
});
out = server.insert(data, from);
assert.equal(out.ok, true);
gossip.insert.restore();
data = {node: new Node("foo", "localhost", 8000), weight: 1};
sinon.stub(gossip, "insert", (node, weight, force) => {
assert.ok(node.equals(data.node));
assert.equal(weight, 1);
assert.equal(force, false);
});
out = server.insert(data, from);
assert.equal(out.ok, true);
gossip.insert.restore();
done();
});
it("Should insert a node into a ring, default force", function (done) {
it("Should insert a node into a ring, set force", function (done) {
var data = {node: new Node("foo", "localhost", 8000), force: true};
sinon.stub(gossip, "insert", (node, force) => {
sinon.stub(gossip, "insert", (node, weight, force) => {
assert.ok(node.equals(data.node));
assert.equal(weight, gossip.ring().rfactor());
assert.equal(force, true);

@@ -315,4 +350,5 @@ });

var data = {nodes: [new Node("foo", "localhost", 8000)]};
sinon.stub(gossip, "minsert", (nodes, force) => {
sinon.stub(gossip, "minsert", (nodes, weight, force) => {
assert.lengthOf(nodes, 1);
assert.equal(weight, gossip.ring().rfactor());
assert.equal(force, false);

@@ -323,2 +359,22 @@ });

gossip.minsert.restore();
data = {nodes: [new Node("foo", "localhost", 8000)], weight: -1};
sinon.stub(gossip, "minsert", (nodes, weight, force) => {
assert.lengthOf(nodes, 1);
assert.equal(weight, gossip.ring().rfactor());
assert.equal(force, false);
});
out = server.minsert(data, from);
assert.equal(out.ok, true);
gossip.minsert.restore();
data = {nodes: [new Node("foo", "localhost", 8000)], weight: 1};
sinon.stub(gossip, "minsert", (nodes, weight, force) => {
assert.lengthOf(nodes, 1);
assert.equal(weight, 1);
assert.equal(force, false);
});
out = server.minsert(data, from);
assert.equal(out.ok, true);
gossip.minsert.restore();
done();

@@ -329,4 +385,5 @@ });

var data = {nodes: [new Node("foo", "localhost", 8000)], force: true};
sinon.stub(gossip, "minsert", (nodes, force) => {
sinon.stub(gossip, "minsert", (nodes, weight, force) => {
assert.lengthOf(nodes, 1);
assert.equal(weight, gossip.ring().rfactor());
assert.equal(force, true);

@@ -342,4 +399,5 @@ });

var data = {nodes: [kernel.self()]};
sinon.stub(gossip, "minsert", (nodes, force) => {
sinon.stub(gossip, "minsert", (nodes, weight, force) => {
assert.lengthOf(nodes, 0);
assert.equal(weight, gossip.ring().rfactor());
assert.equal(force, false);

@@ -353,2 +411,48 @@ });

it("Should insert a node into a ring, default force", function (done) {
var data = {node: new Node("foo", "localhost", 8000)};
sinon.stub(gossip, "update", (node, weight, force) => {
assert.ok(node.equals(data.node));
assert.equal(weight, gossip.ring().rfactor());
assert.equal(force, false);
});
var out = server.update(data, from);
assert.equal(out.ok, true);
gossip.update.restore();
data = {node: new Node("foo", "localhost", 8000), weight: -1};
sinon.stub(gossip, "update", (node, weight, force) => {
assert.ok(node.equals(data.node));
assert.equal(weight, gossip.ring().rfactor());
assert.equal(force, false);
});
out = server.update(data, from);
assert.equal(out.ok, true);
gossip.update.restore();
data = {node: new Node("foo", "localhost", 8000), weight: 1};
sinon.stub(gossip, "update", (node, weight, force) => {
assert.ok(node.equals(data.node));
assert.equal(weight, 1);
assert.equal(force, false);
});
out = server.update(data, from);
assert.equal(out.ok, true);
gossip.update.restore();
done();
});
it("Should insert a node into a ring, set force", function (done) {
var data = {node: new Node("foo", "localhost", 8000), force: true};
sinon.stub(gossip, "update", (node, weight, force) => {
assert.ok(node.equals(data.node));
assert.equal(weight, gossip.ring().rfactor());
assert.equal(force, true);
});
var out = server.update(data, from);
assert.equal(out.ok, true);
gossip.update.restore();
done();
});
it("Should remove a node from a ring, default force", function (done) {

@@ -450,4 +554,44 @@ var data = {node: new Node("foo", "localhost", 8000)};

});
it("Should return weight of node in ring", function (done) {
var data = {id: id};
var out = server.weight(data, from);
assert.equal(out.ok, true);
assert.equal(out.data, gossip.ring().weights().get(kernel.self().id()));
done();
});
it("Should return error if checking weight of node that doesn't exist", function (done) {
var data = {id: id + "1"};
var out = server.weight(data, from);
assert.equal(out.ok, false);
assert.isObject(out.error);
done();
});
it("Should return weights of all nodes in ring", function (done) {
var data = null;
var out = server.weights(data, from);
assert.equal(out.ok, true);
assert.deepEqual(out.data, utils.mapToObject(gossip.ring().weights()));
done();
});
it("Should return list of nodes in ring", function (done) {
var data = {};
var out = server.nodes(data, from);
assert.equal(out.ok, true);
assert.deepEqual(out.data, gossip.ring().nodes());
done();
});
it("Should return status of residing node on ping", function (done) {
var data = {};
var out = server.ping(data, from);
assert.equal(out.ok, true);
assert.equal(out.data, "pong");
done();
});
});
});
};

@@ -142,4 +142,20 @@ var _ = require("lodash"),

it("Should return max queue length", function () {
assert.equal(conn.maxLen(), 1024);
});
it("Should set new max queue length", function () {
conn.maxLen(1023);
assert.equal(conn.maxLen(), 1023);
conn.maxLen(-1);
assert.equal(conn.maxLen(), 1023);
conn._queue.enqueue("foo");
conn._queue.enqueue("bar");
conn.maxLen(1);
assert.equal(conn.maxLen(), 1);
assert.equal(conn._queue.size(), 1);
});
it("Should throw if connection inactive and data is sent", function () {
var out;
var data = {

@@ -149,7 +165,3 @@ data: "bar",

};
try {
out = conn.send("foo", data);
} catch (e) {
out = e;
}
var out = conn.send("foo", data);
assert(out instanceof Error);

@@ -173,2 +185,20 @@ });

it("Should queue data and drop data if queue has reached max size", function () {
conn._active = true;
conn._connecting = true;
conn._maxLen = 1;
conn._queue.enqueue("data");
var data = {
data: "bar",
stream: {stream: uuid.v4(), done: false}
};
conn.send("foo", data);
assert.equal(conn.queue().size(), 1);
assert.notEqual(conn._queue.peek(), "data");
assert.deepEqual(conn.queue().dequeue(), {
event: "foo",
data: data
});
});
it("Should write data if connection active and not reconnecting", function (done) {

@@ -175,0 +205,0 @@ conn.start();

@@ -226,2 +226,21 @@ var _ = require("lodash"),

});
it("Should decode singleton", function () {
var job = {
event: "rlock",
data: {
id: "id",
holder: "holder",
timeout: 1000
}
};
var out = server.decodeSingleton(job);
assert.deepEqual(out, job);
out = server.decodeSingleton({event: "rlock"});
assert.ok(out instanceof Error);
out = server.decodeSingleton({event: "not defined"});
assert.ok(out instanceof Error);
});
});

@@ -362,5 +381,5 @@

});
return cb(null, [JSON.stringify({ok: true})]);
return cb(null, [{ok: true}]);
});
return cb(null, [JSON.stringify({ok: false})]);
return cb(null, [{ok: false}]);
});

@@ -389,3 +408,3 @@ sinon.stub(server, "abcast");

});
return cb(null, [JSON.stringify({ok: true})]);
return cb(null, [{ok: true}]);
});

@@ -492,5 +511,5 @@ server.rlock("id", "holder", 1000, (err, res) => {

});
return cb(null, [JSON.stringify({ok: true})]);
return cb(null, [{ok: true}]);
});
return cb(null, [JSON.stringify({ok: false})]);
return cb(null, [{ok: false}]);
});

@@ -509,3 +528,3 @@ sinon.stub(server, "abcast");

it("Should perform rlock command", function (done) {
it("Should perform wlock command", function (done) {
sinon.stub(server, "multicall", (nodes, id, event, msg, cb, timeout) => {

@@ -520,3 +539,3 @@ assert.deepEqual(nodes, gossip.ring().nodes());

});
return cb(null, [JSON.stringify({ok: true})]);
return cb(null, [{ok: true}]);
});

@@ -646,3 +665,3 @@ server.wlock("id", "holder", 1000, (err, res) => {

sinon.stub(server, "reply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: false});
assert.deepEqual(out, {ok: false});
});

@@ -664,3 +683,3 @@ server._doRLock({

sinon.stub(server, "reply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: true});
assert.deepEqual(out, {ok: true});
});

@@ -683,3 +702,3 @@ server._doRLock({

sinon.stub(server, "reply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: true});
assert.deepEqual(out, {ok: true});
});

@@ -715,3 +734,3 @@ server._doRLock({

sinon.stub(server, "reply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: false});
assert.deepEqual(out, {ok: false});
});

@@ -732,3 +751,3 @@ server._doWLock({

sinon.stub(server, "reply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: true});
assert.deepEqual(out, {ok: true});
});

@@ -752,3 +771,3 @@ server._doWLock({

sinon.stub(server, "_safeReply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: false});
assert.deepEqual(out, {ok: false});
});

@@ -769,3 +788,3 @@ server._doRUnlock({

sinon.stub(server, "_safeReply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: false});
assert.deepEqual(out, {ok: false});
});

@@ -786,3 +805,3 @@ server._doRUnlock({

sinon.stub(server, "_safeReply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: false});
assert.deepEqual(out, {ok: false});
});

@@ -803,3 +822,3 @@ server._doRUnlock({

sinon.stub(server, "_safeReply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: true});
assert.deepEqual(out, {ok: true});
});

@@ -826,3 +845,3 @@ server._doRUnlock({

sinon.stub(server, "_safeReply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: false});
assert.deepEqual(out, {ok: false});
});

@@ -843,3 +862,3 @@ server._doWUnlock({

sinon.stub(server, "_safeReply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: false});
assert.deepEqual(out, {ok: false});
});

@@ -861,3 +880,3 @@ server._doWUnlock({

sinon.stub(server, "_safeReply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: false});
assert.deepEqual(out, {ok: false});
});

@@ -879,3 +898,3 @@ server._doWUnlock({

sinon.stub(server, "_safeReply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: true});
assert.deepEqual(out, {ok: true});
});

@@ -981,9 +1000,4 @@ server._doWUnlock({

it("Should encode a response over the network kernel", function () {
var out = DLMServer.encodeResp({foo: "bar"});
assert.equal(out, JSON.stringify({foo: "bar"}));
});
it("Should find, from a multicall response, which responses indicate success", function () {
var out = DLMServer.findLockPasses(["foo", "bar"], [JSON.stringify({ok: false}), JSON.stringify({ok: true})]);
var out = DLMServer.findLockPasses(["foo", "bar"], [{ok: false}, {ok: true}]);
assert.deepEqual(out, ["bar"]);

@@ -990,0 +1004,0 @@ });

@@ -345,23 +345,2 @@ var _ = require("lodash"),

it("Should fail to perform read command, response invalid JSON", function (done) {
sinon.stub(server, "call", (node, event, msg, cb, timeout) => {
assert.deepEqual(node, {
node: gossip.ring().find("id"),
id: server._id
});
assert.equal(event, "read");
assert.deepEqual(msg, {
id: "id"
});
async.nextTick(() => {
return cb(null, "{");
});
});
server.read("id", (err, res) => {
assert.ok(err);
server.call.restore();
done();
});
});
it("Should perform read command", function (done) {

@@ -377,3 +356,3 @@ sinon.stub(server, "call", (node, event, msg, cb, timeout) => {

});
return cb(null, JSON.stringify({ok: true, data: {n: 3, active: 0}}));
return cb(null, {ok: true, data: {n: 3, active: 0}});
});

@@ -504,5 +483,5 @@ server.read("id", (err, res) => {

});
return cb(null, JSON.stringify({ok: true}));
return cb(null, {ok: true});
});
return cb(null, JSON.stringify({ok: false}));
return cb(null, {ok: false});
});

@@ -532,3 +511,3 @@ sinon.stub(server, "cast");

});
return cb(null, JSON.stringify({ok: true}));
return cb(null, {ok: true});
});

@@ -625,3 +604,3 @@ server.post("id", "holder", 1000, (err) => {

sinon.stub(server, "reply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: true});
assert.deepEqual(out, {ok: true});
});

@@ -641,3 +620,3 @@ server._doCreate({

sinon.stub(server, "reply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: true});
assert.deepEqual(out, {ok: true});
});

@@ -673,3 +652,3 @@ server._doCreate({

sinon.stub(server, "reply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: true, data: {n: 3, active: 0}});
assert.deepEqual(out, {ok: true, data: {n: 3, active: 0}});
});

@@ -688,3 +667,3 @@ server._doRead({

sinon.stub(server, "reply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: true});
assert.deepEqual(out, {ok: true});
});

@@ -704,3 +683,3 @@ server._doDestroy({

sinon.stub(server, "reply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: true});
assert.deepEqual(out, {ok: true});
});

@@ -736,3 +715,3 @@ server._doDestroy({

sinon.stub(server, "reply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: false});
assert.deepEqual(out, {ok: false});
});

@@ -753,3 +732,3 @@ server._doPost({

sinon.stub(server, "reply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: true});
assert.deepEqual(out, {ok: true});
});

@@ -770,3 +749,3 @@ server._doPost({

sinon.stub(server, "reply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: true});
assert.deepEqual(out, {ok: true});
});

@@ -804,3 +783,3 @@ server._doPost({

sinon.stub(server, "_safeReply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: false});
assert.deepEqual(out, {ok: false});
});

@@ -821,3 +800,3 @@ server._doClose({

sinon.stub(server, "_safeReply", (from, out) => {
assert.deepEqual(JSON.parse(out), {ok: true});
assert.deepEqual(out, {ok: true});
});

@@ -897,7 +876,2 @@ server._doClose({

it("Should encode a response over the network kernel", function () {
var out = DSMServer.encodeResp({foo: "bar"});
assert.equal(out, JSON.stringify({foo: "bar"}));
});
it("Should calculate wait time for a retry", function () {

@@ -904,0 +878,0 @@ var out = DSMServer.calculateWaitTime(0, 100);

@@ -6,2 +6,3 @@ var _ = require("lodash"),

fs = require("fs"),
zlib = require("zlib"),
assert = require("chai").assert;

@@ -20,3 +21,7 @@

it("Should construct a DTabel", function () {
after(function (done) {
fs.unlink("./data/foo_LATEST.LOG", done);
});
it("Should construct a DTable", function () {
assert.equal(dtable._autoSave, consts.dtableOpts.autoSave);

@@ -31,2 +36,14 @@ assert.equal(dtable._writeCount, 0);

assert.deepEqual(dtable._encodeFn, DTable.encodeValue);
assert.deepEqual(dtable._decodeFn, DTable.decodeValue);
dtable = new DTable({
path: "./data",
encodeFn: _.identity,
decodeFn: _.identity
});
assert.deepEqual(dtable._encodeFn, _.identity);
assert.deepEqual(dtable._decodeFn, _.identity);
var out;

@@ -94,5 +111,8 @@ try {

it("Should load dtable instance", function (done) {
dtable.load((err) => {
assert.notOk(err);
done();
dtable.start("foo");
dtable.once("open", () => {
dtable.load((err) => {
assert.notOk(err);
dtable.stop(done);
});
});

@@ -108,2 +128,8 @@ });

it("Should return if dtable has compression enabled or not", function () {
assert.equal(dtable.compress(), false);
dtable.compress(true);
assert.equal(dtable.compress(), true);
});
it("Should get value in table", function () {

@@ -286,3 +312,3 @@ dtable._table.set("key", "val");

it("Should flush state to disk", function (done) {
dtable._setupAOFSyncInterval();
dtable.start("foo");
sinon.stub(dtable, "_setupAOFSyncInterval");

@@ -312,2 +338,4 @@

it("Should flush AOF files to disk", function (done) {
dtable._aofPath = "./data/LATEST.LOG";
dtable._tmpAOFPath = "./data/PREV.LOG";
fs.writeFile(dtable._aofPath, "", (err) => {

@@ -323,2 +351,4 @@ assert.notOk(err);

it("Should fail flushing AOF files to disk", function (done) {
dtable._aofPath = "./data/LATEST.LOG";
dtable._tmpAOFPath = "./data/PREV.LOG";
fs.writeFile(dtable._aofPath, "", (err) => {

@@ -344,38 +374,54 @@ assert.notOk(err);

dtable.set("foo", "bar");
sinon.stub(fs, "createWriteStream", () => {
var pstream = new stream.PassThrough();
pstream.on("data", (data) => {
data = JSON.parse(data);
out.set(data.key, DTable.decodeValue(data.value));
dtable.start("foo");
dtable.once("open", () => {
sinon.stub(fs, "createWriteStream", () => {
var pstream = new stream.PassThrough();
pstream.on("data", (data) => {
data = JSON.parse(data);
out.set(data.key, DTable.decodeValue(data.value));
});
pstream.once("end", () => {
pstream.emit("close");
});
return pstream;
});
pstream.once("end", () => {
pstream.emit("close");
sinon.stub(fs, "rename", (oldPath, newPath, cb) => {
cb();
});
return pstream;
dtable._flushTable({
foo: "bar"
}, (err) => {
assert.notOk(err);
assert.ok(_.isEqual(out, dtable._table));
fs.createWriteStream.restore();
assert.ok(fs.rename.called);
fs.rename.restore();
dtable.stop(done);
});
});
dtable._flushTable({
foo: "bar"
}, (err) => {
assert.notOk(err);
assert.ok(_.isEqual(out, dtable._table));
fs.createWriteStream.restore();
async.nextTick(done);
});
});
it("Should fail flushing snapshot of table to disk", function (done) {
sinon.stub(fs, "createWriteStream", () => {
var pstream = new stream.PassThrough();
async.nextTick(() => {
pstream.emit("error", new Error("foo"));
dtable.start("foo");
dtable.once("open", () => {
sinon.stub(fs, "createWriteStream", () => {
var pstream = new stream.PassThrough();
async.nextTick(() => {
pstream.emit("error", new Error("foo"));
});
return pstream;
});
return pstream;
sinon.stub(fs, "unlink", (rmPath, cb) => {
cb();
});
dtable._flushTable({
foo: "bar"
}, (err) => {
assert.ok(err);
fs.createWriteStream.restore();
assert.ok(fs.unlink.called);
fs.unlink.restore();
dtable.stop(done);
});
});
dtable._flushTable({
foo: "bar"
}, (err) => {
assert.ok(err);
fs.createWriteStream.restore();
async.nextTick(done);
});
});

@@ -410,3 +456,3 @@

});
dtable._loadState((err) => {
dtable._loadState("path", (err) => {
assert.ok(err);

@@ -422,3 +468,3 @@ fs.stat.restore();

});
dtable._loadState((err) => {
dtable._loadState("path", (err) => {
assert.notOk(err);

@@ -446,3 +492,3 @@ assert.equal(dtable._table.size, 0);

});
dtable._loadState((err) => {
dtable._loadState("path", (err) => {
assert.notOk(err);

@@ -467,3 +513,3 @@ assert.ok(_.isEqual(dtable._table, new Map([["key", "val"]])));

});
dtable._loadState((err) => {
dtable._loadState("path", (err) => {
assert.ok(err);

@@ -544,2 +590,46 @@ fs.stat.restore();

});
it("Should create flush streams with compression", function (done) {
let acc = Buffer.from("");
dtable._path = "./data/DATA.SNAP";
dtable.compress(true);
sinon.stub(fs, "createWriteStream", () => {
var pstream = new stream.PassThrough();
pstream.on("data", (data) => {
acc = Buffer.concat([acc, data], acc.length+data.length);
});
return pstream;
});
const {fstream, wstream} = dtable._createFlushStreams();
fstream.on("end", () => {
const comp = zlib.gzipSync(Buffer.from("foobar"));
assert.equal(Buffer.compare(acc, comp), 0);
done();
});
wstream.write(Buffer.from("foobar"));
wstream.end();
});
it("Should create load streams with compression", function (done) {
let acc = Buffer.from("");
dtable._path = "./data/DATA.SNAP";
dtable.compress(true);
sinon.stub(fs, "createReadStream", () => {
var pstream = new stream.PassThrough();
async.nextTick(() => {
pstream.write(zlib.gzipSync("foobar"));
pstream.end();
});
return pstream;
});
const {rstream} = dtable._createLoadStreams();
rstream.on("data", (data) => {
acc = Buffer.concat([acc, data], acc.length+data.length);
});
rstream.on("end", () => {
const comp = Buffer.from("foobar");
assert.equal(Buffer.compare(acc, comp), 0);
done();
});
});
});

@@ -546,0 +636,0 @@

@@ -143,5 +143,27 @@ var _ = require("lodash"),

job = Buffer.from(JSON.stringify({event: "foo", data: Buffer.from("bar"), hello: "world"}));
out = server.decodeJob(job);
assert.deepEqual(out, {event: "foo", data: Buffer.from("bar")});
out = server.decodeJob(Buffer.from("foo"));
assert.ok(out instanceof Error);
});
it("Should decode singleton", function () {
var job = "foo";
var out = server.decodeSingleton(job);
assert.equal(job, out);
job = {event: "foo", data: "bar"};
out = server.decodeSingleton(job);
assert.deepEqual(job, out);
job = {event: "foo", data: {}};
out = server.decodeSingleton(job);
assert.deepEqual(job, out);
job = {event: "foo", data: Buffer.from("bar").toJSON()};
out = server.decodeSingleton(job);
assert.deepEqual(out, {event: "foo", data: Buffer.from("bar")});
});
});

@@ -188,2 +210,36 @@

it("Should skip parsing job stream if paused", function () {
server._paused = true;
sinon.spy(server, "decodeSingleton");
server._parse("", {done: true}, {});
assert.notOk(server.decodeSingleton.called);
server.decodeSingleton.restore();
server._paused = false;
});
it("Should parse incoming singleton", function (done) {
server.once("foo", () => {
async.nextTick(done);
});
server._parse({event: "foo", data: "bar"}, {done: true}, {});
});
it("Should skip emitting message on failed singleton", function () {
sinon.spy(server, "emit");
sinon.stub(server, "decodeSingleton", () => {
return new Error("foo");
});
server._parse({event: "foo", data: "bar"}, {done: true}, {});
assert.notOk(server.emit.called);
server.emit.restore();
server.decodeSingleton.restore();
});
it("Should skip emitting singleton when stream has error", function () {
sinon.spy(server, "emit");
server._parse(null, {error: {}, done: true}, {});
assert.notOk(server.emit.called);
server.emit.restore();
});
it("Should parse incoming job streams", function () {

@@ -211,7 +267,6 @@ var data = Buffer.from(JSON.stringify({ok: true}));

sinon.spy(server, "decodeJob");
var data = Buffer.from(JSON.stringify({ok: true}));
var stream = {stream: uuid.v4(), error: {foo: "bar"}, done: true};
var init = Buffer.from("foo");
server.streams().set(stream.stream, {data: init});
server._parse(data, stream, {});
server._parse(null, stream, {});
assert.notOk(server.streams().has(stream.stream));

@@ -234,3 +289,3 @@ assert.notOk(server.decodeJob.called);

server._parse(data, stream, {});
server._parse(data, {stream: stream.stream, done: true}, {});
server._parse(null, {stream: stream.stream, done: true}, {});
assert.notOk(server._streams.has(stream.stream));

@@ -250,3 +305,3 @@ assert.ok(server.emit.called);

server._parse(data, stream, {});
server._parse(data, {stream: stream.stream, done: true}, {});
server._parse(null, {stream: stream.stream, done: true}, {});
assert.notOk(server._streams.has(stream.stream));

@@ -257,12 +312,2 @@ assert.notOk(server.emit.calledWith(["idle"]));

it("Should decode incoming job", function () {
var buf = Buffer.from(JSON.stringify({
event: "msg",
data: "foo"
}));
var out = server.decodeJob(buf);
assert.equal(out.event, "msg");
assert.equal(out.data, "foo");
});
it("Should parse recipient as local node", function () {

@@ -269,0 +314,0 @@ var id = "id";

@@ -393,4 +393,3 @@ var _ = require("lodash"),

gossip.kernel().connection(node2).once("send", (msg, data) => {
var inner = JSON.parse(Buffer.from(data.data.data));
var job = inner.data;
var job = data.data.data;
assert.equal(job.type, "join");

@@ -410,3 +409,3 @@ assert.notEqual(job.actor, gossip._actor);

var before = gossip.ring().size();
gossip.insert(node2);
gossip.insert(node2, 1);
assert.deepEqual(gossip.ring().size(), before);

@@ -427,3 +426,3 @@ gossip.ring().remove(node2);

});
gossip.insert(node2);
gossip.insert(node2, 3);
assert.equal(gossip.ring().size(), 6);

@@ -434,4 +433,3 @@ assert.ok(gossip._actor);

gossip.kernel().connection(node2).once("send", (msg, data) => {
var inner = JSON.parse(Buffer.from(data.data.data));
var job = inner.data;
var job = data.data.data;
assert.equal(job.type, "update");

@@ -449,3 +447,3 @@ assert.equal(job.actor, gossip._actor);

gossip.streams(new Map([["key", "value"]]));
gossip.insert(node2);
gossip.insert(node2, 3);
gossip.once("process", (ring) => {

@@ -458,4 +456,3 @@ assert.ok(gossip.ring().isDefined(node2));

gossip.kernel().connection(node2).once("send", (msg, data) => {
var inner = JSON.parse(Buffer.from(data.data.data));
var job = inner.data;
var job = data.data.data;
assert.equal(job.type, "update");

@@ -490,3 +487,3 @@ assert.equal(job.actor, gossip._actor);

});
gossip.insert(node2, true);
gossip.insert(node2, 3, true);
assert.equal(gossip.ring().size(), 6);

@@ -497,4 +494,3 @@ assert.ok(gossip._actor);

gossip.kernel().connection(node2).once("send", (msg, data) => {
var inner = JSON.parse(Buffer.from(data.data.data));
var job = inner.data;
var job = data.data.data;
assert.equal(job.type, "update");

@@ -509,2 +505,115 @@ assert.equal(job.actor, gossip._actor);

it("Should update a node in gossip ring when already idle, node doesn't exist", function (done) {
var node2 = new Node("id2", host, port+1);
gossip.once("process", (ring) => {
assert.equal(gossip.ring().size(), 7);
assert.ok(gossip.ring().isDefined(node2));
});
gossip.once("send", (clock, event, msg) => {
assert.equal(event, "ring");
assert.deepEqual(msg, gossip.ring().toJSON(true));
assert.deepEqual(clock, gossip.vclock());
});
gossip.update(node2, 4);
assert.equal(gossip.ring().size(), 7);
assert.ok(gossip._actor);
assert.ok(gossip.vclock().has(gossip._actor));
assert.ok(gossip.kernel().isConnected(node2));
gossip.kernel().connection(node2).once("send", (msg, data) => {
var job = data.data.data;
assert.equal(job.type, "update");
assert.equal(job.actor, gossip._actor);
assert.deepEqual(job.data, gossip.ring().toJSON(true));
assert.deepEqual(job.vclock, gossip.vclock().toJSON(true));
assert.equal(job.round, GossipRing.maxMsgRound(gossip.ring())-1);
done();
});
});
it("Should update a node in gossip ring when already idle, node already exists", function (done) {
var node2 = new Node("id2", host, port+1);
gossip.ring().insert(node2);
assert.equal(gossip.ring().size(), 6);
gossip.once("process", (ring) => {
assert.equal(gossip.ring().size(), 7);
assert.ok(gossip.ring().isDefined(node2));
});
gossip.once("send", (clock, event, msg) => {
assert.equal(event, "ring");
assert.deepEqual(msg, gossip.ring().toJSON(true));
assert.deepEqual(clock, gossip.vclock());
});
gossip.update(node2, 4);
assert.equal(gossip.ring().size(), 7);
assert.ok(gossip._actor);
assert.ok(gossip.vclock().has(gossip._actor));
assert.ok(gossip.kernel().isConnected(node2));
gossip.kernel().connection(node2).once("send", (msg, data) => {
var job = data.data.data;
assert.equal(job.type, "update");
assert.equal(job.actor, gossip._actor);
assert.deepEqual(job.data, gossip.ring().toJSON(true));
assert.deepEqual(job.vclock, gossip.vclock().toJSON(true));
assert.equal(job.round, GossipRing.maxMsgRound(gossip.ring())-1);
done();
});
});
it("Should update a node in gossip ring, waiting for idle state", function (done) {
var node2 = new Node("id2", host, port+1);
gossip.streams(new Map([["key", "value"]]));
gossip.update(node2, 4);
gossip.once("process", (ring) => {
assert.ok(gossip.ring().isDefined(node2));
assert.equal(gossip.ring().size(), 7);
assert.ok(gossip._actor);
assert.ok(gossip.vclock().has(gossip._actor));
assert.ok(gossip.kernel().isConnected(node2));
gossip.kernel().connection(node2).once("send", (msg, data) => {
var job = data.data.data;
assert.equal(job.type, "update");
assert.equal(job.actor, gossip._actor);
assert.deepEqual(job.data, gossip.ring().toJSON(true));
assert.deepEqual(job.vclock, gossip.vclock().toJSON(true));
assert.equal(job.round, GossipRing.maxMsgRound(gossip.ring())-1);
done();
});
});
gossip.once("send", (clock, event, msg) => {
assert.equal(event, "ring");
assert.deepEqual(msg, gossip.ring().toJSON(true));
assert.deepEqual(clock, gossip.vclock());
});
gossip.streams(new Map());
// trigger test
gossip.emit("idle");
});
it("Should forcefully update a node in gossip ring", function (done) {
var node2 = new Node("id2", host, port+1);
gossip.once("process", (ring) => {
assert.equal(gossip.ring().size(), 7);
assert.ok(gossip.ring().isDefined(node2));
});
gossip.once("send", (clock, event, msg) => {
assert.equal(event, "ring");
assert.deepEqual(msg, gossip.ring().toJSON(true));
assert.deepEqual(clock, gossip.vclock());
});
gossip.insert(node2, 4, true);
assert.equal(gossip.ring().size(), 7);
assert.ok(gossip._actor);
assert.ok(gossip.vclock().has(gossip._actor));
assert.ok(gossip.kernel().isConnected(node2));
gossip.kernel().connection(node2).once("send", (msg, data) => {
var job = data.data.data;
assert.equal(job.type, "update");
assert.equal(job.actor, gossip._actor);
assert.deepEqual(job.data, gossip.ring().toJSON(true));
assert.deepEqual(job.vclock, gossip.vclock().toJSON(true));
assert.equal(job.round, GossipRing.maxMsgRound(gossip.ring())-1);
done();
});
});
it("Should ignore an minsert request into the gossip ring", function () {

@@ -514,3 +623,3 @@ var node2 = new Node("id2", host, port+1);

var before = gossip.ring().size();
gossip.minsert([node2]);
gossip.minsert([node2], 3);
assert.equal(gossip.ring().size(), before);

@@ -531,3 +640,3 @@ gossip.ring().remove(node2);

});
gossip.minsert([node2]);
gossip.minsert([node2], 3);
assert.equal(gossip.ring().size(), 6);

@@ -538,4 +647,3 @@ assert.ok(gossip._actor);

gossip.kernel().connection(node2).once("send", (msg, data) => {
var inner = JSON.parse(Buffer.from(data.data.data));
var job = inner.data;
var job = data.data.data;
assert.equal(job.type, "update");

@@ -553,3 +661,3 @@ assert.equal(job.actor, gossip._actor);

gossip.streams(new Map([["key", "value"]]));
gossip.minsert([node2]);
gossip.minsert([node2], 3);
gossip.once("process", (ring) => {

@@ -562,3 +670,3 @@ assert.ok(gossip.ring().isDefined(node2));

gossip.kernel().connection(node2).once("send", (msg, data) => {
var inner = JSON.parse(Buffer.from(data.data.data)).data;
var inner = data.data.data;
assert.equal(inner.type, "update");

@@ -593,3 +701,3 @@ assert.equal(inner.actor, gossip._actor);

});
gossip.minsert([node2], true);
gossip.minsert([node2], 3, true);
assert.equal(gossip.ring().size(), 6);

@@ -600,3 +708,3 @@ assert.ok(gossip._actor);

gossip.kernel().connection(node2).once("send", (msg, data) => {
var inner = JSON.parse(Buffer.from(data.data.data)).data;
var inner = data.data.data;
assert.equal(inner.type, "update");

@@ -624,3 +732,3 @@ assert.equal(inner.actor, gossip._actor);

var oldClock = gossip.vclock();
gossip.insert(node2);
gossip.insert(node2, 3);
gossip.once("send", (clock, event, msg) => {

@@ -652,3 +760,3 @@ Object.keys(clock._vector).forEach((key) => {

var oldClock = gossip.vclock();
gossip.insert(node2);
gossip.insert(node2, 3);
gossip.once("send", (clock, event, msg) => {

@@ -683,3 +791,3 @@ Object.keys(clock._vector).forEach((key) => {

var oldClock = gossip.vclock();
gossip.insert(node2);
gossip.insert(node2, 3);
gossip.once("send", (clock, event, msg) => {

@@ -716,3 +824,3 @@ Object.keys(clock._vector).forEach((key) => {

var node2 = new Node("id2", host, port+1);
gossip.insert(node2);
gossip.insert(node2, 3);
gossip.once("process", (ring) => {

@@ -726,4 +834,2 @@ assert.equal(gossip.ring().size(), 3);

});
var conn = gossip.kernel().connection(node2);
conn.once("idle", done);
gossip.remove(node2);

@@ -734,2 +840,3 @@ assert.equal(gossip.ring().size(), 3);

assert.notOk(gossip.kernel().isConnected(node2));
done();
});

@@ -739,3 +846,3 @@

var node2 = new Node("id2", host, port+1);
gossip.insert(node2);
gossip.insert(node2, 3);
gossip.once("process", (ring) => {

@@ -761,3 +868,3 @@ assert.equal(gossip.ring().size(), 3);

var node2 = new Node("id2", host, port+1);
gossip.insert(node2);
gossip.insert(node2, 3);
gossip.once("process", (ring) => {

@@ -788,3 +895,3 @@ assert.equal(gossip.ring().size(), 3);

var node2 = new Node("id2", host, port+1);
gossip.insert(node2);
gossip.insert(node2, 3);
gossip.once("process", (ring) => {

@@ -808,3 +915,3 @@ assert.equal(gossip.ring().size(), 3);

var node2 = new Node("id2", host, port+1);
gossip.insert(node2);
gossip.insert(node2, 3);
gossip.once("process", (ring) => {

@@ -830,3 +937,3 @@ assert.equal(gossip.ring().size(), 3);

var node2 = new Node("id2", host, port+1);
gossip.insert(node2);
gossip.insert(node2, 3);
gossip.once("process", (ring) => {

@@ -854,2 +961,13 @@ assert.equal(gossip.ring().size(), 3);

it("Should grab range of nodes in ring", function () {
var nodes = gossip.range(Buffer.from("foo"));
assert.lengthOf(nodes, 1);
assert.deepEqual(nodes[0], gossip.ring().nodes()[0]);
var node = new Node("foo", "bar", 8001);
gossip.ring().insert(node);
nodes = gossip.range(Buffer.from("foo"), 1);
assert.lengthOf(nodes, 1);
});
it("Should skip polling ring if no actor exists", function () {

@@ -887,4 +1005,4 @@ sinon.spy(gossip, "sendRing");

it("Should flush state to disk", function () {
sinon.stub(fs, "writeFile", (path, data) => {
it("Should do nothing if failed to flush state to disk", function (done) {
sinon.stub(fs, "writeFile", (path, data, cb) => {
data = JSON.parse(data);

@@ -894,9 +1012,34 @@ assert.equal(data.actor, "foo");

assert.deepEqual(data.vclock, gossip.vclock().toJSON(true));
fs.writeFile.restore();
cb(new Error("foo"));
assert.notOk(fs.rename.called);
fs.rename.restore();
done();
});
sinon.spy(fs, "rename");
gossip._actor = "foo";
gossip.flush();
assert.ok(fs.writeFile.called);
fs.writeFile.restore();
});
it("Should flush state to disk", function (done) {
sinon.stub(fs, "writeFile", (path, data, cb) => {
data = JSON.parse(data);
assert.equal(data.actor, "foo");
assert.deepEqual(data.chash, gossip.ring().toJSON(true));
assert.deepEqual(data.vclock, gossip.vclock().toJSON(true));
cb();
});
sinon.stub(fs, "rename", (oldName, newName, cb) => {
assert.equal(oldName, gossip._flushPath + ".tmp");
assert.equal(newName, gossip._flushPath);
assert.ok(fs.writeFile.called);
fs.writeFile.restore();
fs.rename.restore();
cb();
done();
});
gossip._actor = "foo";
gossip.flush();
});
it("Should skip sending ring externally if n=0", function () {

@@ -919,3 +1062,3 @@ gossip.ring().insert(new Node("id2", host, port+1));

var node2 = new Node("id2", host, port+1);
gossip.insert(node2, true);
gossip.insert(node2, 3, true);
gossip.once("send", (clock, event, msg) => {

@@ -928,3 +1071,3 @@ var ring = (new CHash()).fromJSON(msg);

gossip.kernel().connection(node2).once("send", function (msg, data) {
var parsed = JSON.parse(Buffer.from(data.data.data)).data;
var parsed = data.data.data;
assert.equal(parsed.type, "update");

@@ -1021,7 +1164,6 @@ assert.equal(parsed.actor, gossip._actor);

sinon.spy(gossip, "decodeJob");
var data = Buffer.from(JSON.stringify(chash.toJSON(true)));
var stream = {stream: uuid.v4(), error: {foo: "bar"}, done: true};
var init = Buffer.from("foo");
gossip.streams().set(stream.stream, {data: init});
gossip._parse(data, stream, {});
gossip._parse(null, stream, {});
assert.notOk(gossip.streams().has(stream.stream));

@@ -1053,3 +1195,3 @@ assert.notOk(gossip.decodeJob.called);

gossip._parse(data, stream, {});
gossip._parse(data, {stream: stream.stream, done: true}, {});
gossip._parse(null, {stream: stream.stream, done: true}, {});
assert.notOk(gossip._streams.has(stream.stream));

@@ -1075,3 +1217,3 @@ assert.ok(gossip.emit.called);

gossip._parse(data, stream, {});
gossip._parse(data, {stream: stream.stream, done: true}, {});
gossip._parse(null, {stream: stream.stream, done: true}, {});
assert.notOk(gossip._streams.has(stream.stream));

@@ -1268,3 +1410,3 @@ assert.notOk(gossip.emit.calledWith(["idle"]));

var node2 = new Node("id2", host, port+1);
chash2.insert(node2);
chash2.insert(node2, 3);
var vclock2 = new VectorClock(uuid.v4(), 1);

@@ -1357,3 +1499,3 @@ var data = gossip.decodeJob(Buffer.from(JSON.stringify({

assert.equal(id, oldID);
state = JSON.parse(state).data;
state = state.data;
assert.equal(state.type, "leave");

@@ -1360,0 +1502,0 @@ assert.notEqual(state.actor, gossip._actor);

@@ -69,2 +69,12 @@ var _ = require("lodash"),

it("Should grab IPC object of netkernel", function () {
const ipcRef = kernel._ipc;
assert.deepEqual(ipcRef, kernel.ipc());
});
it("Should set IPC object of netkernel", function () {
kernel.ipc("foo");
assert.equal(kernel.ipc(), "foo");
});
it("Should grab self-referential node of netkernel", function () {

@@ -164,2 +174,11 @@ var node = kernel.self();

it("Should initialize ipc server without ops", function (done) {
kernel.start();
kernel.on("_ready", () => {
assert.equal(kernel._cookie, null);
kernel._ipc.network().delete(kernel.self().id());
done();
});
});
it("Should stop ipc server", function (done) {

@@ -188,2 +207,11 @@ var opts = {retry: 500, maxRetries: false};

});
it("Should reset token", function () {
kernel._index = 0;
kernel._baseToken = "foo";
kernel._resetToken();
assert.equal(kernel._index, 0);
assert.isString(kernel._baseToken);
assert.notEqual(kernel._baseToken, "foo");
});
});

@@ -311,2 +339,7 @@

it("Should skip connecting to node if node is self, no callback", function () {
kernel.connect(kernel.self());
assert.equal(kernel.sinks().size, 0);
});
it("Should skip connecting to node if already connected", function (done) {

@@ -415,14 +448,70 @@ var node = nKernel.self();

it("Should send ping command to another node", function (done) {
nKernel._ipc.server.on("ping", (data, socket) => {
assert.deepEqual(data.from, kernel.self().toJSON(true));
});
kernel.ping(nKernel.self(), (err, out) => {
assert.notOk(err);
assert.equal(out, "pong");
done();
});
});
it("Should immeidately return if pinging local node", function (done) {
sinon.spy(kernel, "_generateToken");
kernel.ping(kernel.self(), (err, out) => {
assert.notOk(err);
assert.equal(out, "pong");
assert.equal(kernel._generateToken.called, false);
kernel._generateToken.restore();
done();
});
});
it("Should fail to ping if no node connection found", function (done) {
kernel.ping(new Node("not_known"), (err) => {
assert.ok(err);
done();
});
});
it("Should fail to make ping to external node if connection shutdown", function (done) {
var conn = kernel.connection(nKernel.self());
conn._active = false;
kernel.ping(nKernel.self(), (err, out) => {
assert.ok(err);
conn._active = true;
done();
});
});
it("Should fail to make ping if cookies mismatch", function () {
sinon.spy(kernel, "_skipMsg");
kernel.cookie("foo");
kernel._ipc.server.emit("ping", {
tag: "bar",
from: {},
checkSum: "foo"
});
assert.equal(kernel._skipMsg.called, true);
kernel._skipMsg.restore();
kernel._cookie = null;
});
it("Should fail to make ping if unknown node", function () {
sinon.spy(kernel, "_sendData");
kernel._ipc.server.emit("ping", {
tag: "bar",
from: {id: "unknown"}
});
assert.equal(kernel._sendData.called, false);
kernel._sendData.restore();
});
it("Should make async call to internal node with buffer", function (done) {
var data = Buffer.from("hello");
var acc = Buffer.from("");
var job = uuid.v4();
kernel.on(job, (data, stream, from) => {
if (!stream.done) {
acc = Buffer.concat([acc, data], acc.length + data.length);
}
else {
assert.equal(Buffer.compare(acc, Buffer.from("hello")), 0);
done();
}
kernel.on(job, (out, stream, from) => {
assert.equal(Buffer.compare(out, Buffer.from("hello")), 0);
done();
});

@@ -454,12 +543,6 @@ kernel.cast(kernel.self(), job, data);

var data = Buffer.from("hello");
var acc = Buffer.from("");
var job = uuid.v4();
nKernel.on(job, (data, stream, from) => {
if (!stream.done) {
acc = Buffer.concat([acc, data], acc.length + data.length);
}
else {
assert.equal(Buffer.compare(acc, Buffer.from("hello")), 0);
done();
}
nKernel.on(job, (out, stream, from) => {
assert.equal(Buffer.compare(out, Buffer.from("hello")), 0);
done();
});

@@ -516,15 +599,55 @@ kernel.cast(nKernel.self(), job, data);

it("Should make sync call to internal node with input buffer, return callback", function (done) {
var data = Buffer.from("hello");
var exp = Buffer.from("world");
var job = uuid.v4();
kernel.on(job, (out, strm, from) => {
assert.equal(Buffer.compare(out, Buffer.from("hello")), 0);
const pstream = new stream.PassThrough();
async.nextTick(() => {
pstream.write(exp);
pstream.end();
});
kernel.cast(from.node, from.tag, pstream);
});
kernel.call(kernel.self(), job, data, (err, out) => {
assert.notOk(err);
assert.equal(Buffer.compare(out, exp), 0);
done();
});
});
it("Should fail to make sync call to unknown node", function (done) {
var data = Buffer.from("hello");
var job = uuid.v4();
kernel.call(new Node("id_unknown", "localhost", 0), job, data, (err, out) => {
assert.ok(err);
done();
});
});
it("Should fail to make sync call to external node if connection shutdown", function (done) {
var data = Buffer.from("hello");
var job = uuid.v4();
var conn = kernel.connection(nKernel.self());
conn._active = false;
kernel.call(nKernel.self(), job, data, (err, out) => {
assert.ok(err);
conn._active = true;
done();
});
});
it("Should make sync call to external node with input buffer, return callback", function (done) {
var data = Buffer.from("hello");
var exp = Buffer.from("world");
var acc = Buffer.from("");
var job = uuid.v4();
nKernel.on(job, (data, stream, from) => {
if (!stream.done) {
acc = Buffer.concat([acc, data], acc.length + data.length);
}
else {
assert.equal(Buffer.compare(acc, Buffer.from("hello")), 0);
nKernel.cast(from.node, from.tag, exp);
}
nKernel.on(job, (out, strm, from) => {
assert.equal(Buffer.compare(out, Buffer.from("hello")), 0);
const pstream = new stream.PassThrough();
async.nextTick(() => {
pstream.write(exp);
pstream.end();
});
nKernel.cast(from.node, from.tag, pstream);
});

@@ -541,12 +664,11 @@ kernel.call(nKernel.self(), job, data, (err, out) => {

var exp = Buffer.from("world");
var acc = Buffer.from("");
var job = uuid.v4();
nKernel.on(job, (data, stream, from) => {
if (!stream.done) {
acc = Buffer.concat([acc, data], acc.length + data.length);
}
else {
assert.equal(Buffer.compare(acc, Buffer.from("hello")), 0);
nKernel.cast(from.node, from.tag, exp);
}
nKernel.on(job, (out, strm, from) => {
assert.equal(Buffer.compare(out, Buffer.from("hello")), 0);
const pstream = new stream.PassThrough();
async.nextTick(() => {
pstream.write(exp);
pstream.end();
});
nKernel.cast(from.node, from.tag, pstream);
});

@@ -619,19 +741,64 @@ var rstream = kernel.call(nKernel.self(), job, data);

it("Should make sycn callSingleton to internal node with input buffer", function (done) {
var data = Buffer.from("hello");
var exp = Buffer.from("world");
var job = uuid.v4();
kernel.on(job, (out, strm, from) => {
assert.equal(Buffer.compare(out, Buffer.from("hello")), 0);
kernel.cast(from.node, from.tag, exp);
});
kernel.callSingleton(kernel.self(), job, data, (err, out) => {
assert.notOk(err);
assert.equal(Buffer.compare(out, exp), 0);
done();
});
});
it("Should fail to make sync callSingleton to unknown node", function (done) {
var data = Buffer.from("hello");
var job = uuid.v4();
kernel.callSingleton(new Node("id_unknown", "localhost", 0), job, data, (err, out) => {
assert.ok(err);
done();
});
});
it("Should fail to make sync callSingleton to external node if connection shutdown", function (done) {
var data = Buffer.from("hello");
var job = uuid.v4();
var conn = kernel.connection(nKernel.self());
conn._active = false;
kernel.callSingleton(nKernel.self(), job, data, (err, out) => {
assert.ok(err);
conn._active = true;
done();
});
});
it("Should make sync callSingleton to external node with input buffer", function (done) {
var data = Buffer.from("hello");
var exp = Buffer.from("world");
var job = uuid.v4();
nKernel.on(job, (out, strm, from) => {
assert.equal(Buffer.compare(out, Buffer.from("hello")), 0);
nKernel.cast(from.node, from.tag, exp);
});
kernel.callSingleton(nKernel.self(), job, data, (err, out) => {
assert.notOk(err);
assert.equal(Buffer.compare(out, exp), 0);
done();
});
});
it("Should return an error if return stream errors", function (done) {
var data = Buffer.from("hello");
var acc = Buffer.from("");
var job = uuid.v4();
nKernel.on(job, (data, rcvStream, from) => {
if (!rcvStream.done) {
acc = Buffer.concat([acc, data], acc.length + data.length);
}
else {
assert.equal(Buffer.compare(acc, Buffer.from("hello")), 0);
var rstream = new stream.PassThrough();
async.nextTick(() => {
rstream.emit("error", new Error("foo"));
rstream.end();
});
nKernel.cast(from.node, from.tag, rstream);
}
nKernel.on(job, (out, rcvStream, from) => {
assert.equal(Buffer.compare(out, Buffer.from("hello")), 0);
var rstream = new stream.PassThrough();
async.nextTick(() => {
rstream.emit("error", new Error("foo"));
rstream.end();
});
nKernel.cast(from.node, from.tag, rstream);
});

@@ -647,19 +814,14 @@ kernel.call(nKernel.self(), job, data, (err, out) => {

var exp = Buffer.from("world");
var acc = Buffer.from("");
var job = uuid.v4();
nKernel.on(job, (data, stream, from) => {
if (!stream.done) {
acc = Buffer.concat([acc, data], acc.length + data.length);
nKernel.on(job, (out, stream, from) => {
assert.equal(Buffer.compare(out, Buffer.from("hello")), 0);
try {
out = nKernel.reply({node: from.node, tag: null}, exp);
} catch (e) {
out = e;
}
else {
assert.equal(Buffer.compare(acc, Buffer.from("hello")), 0);
var out;
try {
out = nKernel.reply({node: from.node, tag: null}, exp);
} catch (e) {
out = e;
}
assert(out instanceof Error);
assert(out instanceof Error);
async.nextTick(() => {
nKernel.reply(from, exp);
}
});
});

@@ -676,12 +838,8 @@ kernel.call(nKernel.self(), job, data, (err, out) => {

var exp = Buffer.from("world");
var acc = Buffer.from("");
var job = uuid.v4();
nKernel.on(job, (data, stream, from) => {
if (!stream.done) {
acc = Buffer.concat([acc, data], acc.length + data.length);
}
else {
assert.equal(Buffer.compare(acc, Buffer.from("hello")), 0);
nKernel.on(job, (out, stream, from) => {
assert.equal(Buffer.compare(out, Buffer.from("hello")), 0);
async.nextTick(() => {
nKernel.reply(from, exp);
}
});
});

@@ -695,21 +853,11 @@ kernel.call(nKernel.self(), job, data, (err, out) => {

it("Should make sync call to multiple external nodes with buffer", function (done) {
it("Should make sync call to multiple external nodes with buffer, with callbacks", function (done) {
var data = Buffer.from("hello");
var exp = Buffer.from("world");
var acc = Buffer.from("");
var job = uuid.v4();
var streams = new Map();
nKernel.on(job, (data, stream, from) => {
if (!streams.has(stream.stream)) {
streams.set(stream.stream, Buffer.from(""));
}
var inner = streams.get(stream.stream);
if (!stream.done) {
inner = Buffer.concat([inner, data], inner.length + data.length);
streams.set(stream.stream, inner);
}
else {
assert.equal(Buffer.compare(inner, Buffer.from("hello")), 0);
nKernel.on(job, (out, stream, from) => {
assert.equal(Buffer.compare(out, Buffer.from("hello")), 0);
async.nextTick(() => {
nKernel.reply(from, exp);
}
});
});

@@ -726,2 +874,28 @@ kernel.multicall([nKernel.self(), nKernel.self()], job, data, (err, out) => {

it("Should make sync call to multiple external nodes with buffer, with array of streams", function (done) {
var data = Buffer.from("hello");
var exp = Buffer.from("world");
var job = uuid.v4();
nKernel.on(job, (out, stream, from) => {
assert.equal(Buffer.compare(out, Buffer.from("hello")), 0);
async.nextTick(() => {
nKernel.reply(from, exp);
});
});
var rStreams = kernel.multicall([nKernel.self(), nKernel.self()], job, data);
assert.lengthOf(rStreams, 2);
let count = rStreams.length;
rStreams.forEach((rstream) => {
let acc = Buffer.from("");
rstream.on("data", (data) => {
acc = Buffer.concat([acc, data]);
});
rstream.on("end", () => {
count--;
assert.equal(Buffer.compare(acc, exp), 0);
if (count === 0) done();
});
});
});
it("Should stream data locally", function (done) {

@@ -965,2 +1139,17 @@ var pstream = new stream.PassThrough();

it("Should send data externally w/o stream, but fail with 'error'", function () {
var node = nKernel.self();
var job = uuid.v4();
var buf = Buffer.from("foo");
var tag = "tag";
var istream = {stream: uuid.v4(), done: true};
var conn = kernel.connection(node);
sinon.stub(conn, "send", () => {
conn.send.restore();
return new Error("error");
});
var out = kernel._sendData(null, conn, job, tag, istream, buf);
assert.ok(out instanceof Error);
});
it("Should send end externally", function (done) {

@@ -1169,2 +1358,60 @@ var node = nKernel.self();

});
it("Should handle setting up singleton reply", function (done) {
var node = kernel.self();
var tag = uuid.v4();
var from = {
tag: null,
node: node
};
kernel._setupSingletonReply(node, tag, Infinity, (err, res) => {
assert.notOk(err);
assert.equal(res, "bar");
assert.equal(kernel.listeners(tag), 0);
done();
});
async.nextTick(() => {
kernel.emit(tag, "bar", {}, from);
});
});
it("Should handle triggering timeout on singleton reply", function (done) {
var node = kernel.self();
var tag = uuid.v4();
kernel._setupSingletonReply(node, tag, 0, (err, res) => {
assert.ok(err);
assert.equal(kernel.listeners(tag), 0);
async.nextTick(done);
});
});
it("Should handle returning mismatched node error on singleton reply", function (done) {
var node = kernel.self();
var tag = uuid.v4();
var from = {
tag: null,
node: new Node("not_same", "localhost", 0)
};
kernel._setupSingletonReply(node, tag, Infinity, (err, res) => {
assert.ok(err);
assert.equal(kernel.listeners(tag), 0);
async.nextTick(done);
});
kernel.emit(tag, "bar", {}, from);
});
it("Should handle stream error return on singleton reply", function (done) {
var node = kernel.self();
var tag = uuid.v4();
var from = {
tag: null,
node: node
};
kernel._setupSingletonReply(node, tag, Infinity, (err, res) => {
assert.ok(err);
assert.equal(kernel.listeners(tag), 0);
async.nextTick(done);
});
kernel.emit(tag, null, {error: {}}, from);
});
});

@@ -1171,0 +1418,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc