Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

liferaft

Package Overview
Dependencies
Maintainers
3
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

liferaft - npm Package Compare versions

Comparing version 0.1.0 to 0.1.1

example/tcp.js

16

example/index.js

@@ -34,11 +34,13 @@ 'use strict';

debug('initializing reply socket on port %s', raft.name);
debug('initializing reply socket on port %s', raft.address);
socket = raft.socket = msg.socket('rep');
socket.bind(raft.name);
socket.on('message', raft.emits('data'));
socket.bind(raft.address);
socket.on('message', function (data, fn) {
raft.emit('data', data, fn);
});
socket.on('error', function err() {
debug('failed to initialize on port: ', raft.name);
debug('failed to initialize on port: ', raft.address);
});

@@ -61,9 +63,9 @@ },

socket.connect(raft.name);
socket.connect(raft.address);
socket.on('error', function err() {
console.error('failed to write to: ', raft.name);
console.error('failed to write to: ', raft.address);
});
}
debug('writing packet to socket on port %s', raft.name);
debug('writing packet to socket on port %s', raft.address);
socket.send(packet, function (data) {

@@ -70,0 +72,0 @@ fn(undefined, data);

@@ -5,2 +5,3 @@ 'use strict';

, Tick = require('tick-tock')
, ms = require('millisecond')
, one = require('one-time');

@@ -35,3 +36,3 @@

/**
* Representation of a single node in the cluster.
* Representation of a single raft node in the cluster.
*

@@ -52,13 +53,15 @@ * Options:

* Please note, when adding new options make sure that you also update the
* `Node#join` method so it will correctly copy the new option to the clone as
* `Raft#join` method so it will correctly copy the new option to the clone as
* well.
*
* @constructor
* @param {Mixed} address Unique address, id or name of this given node.
* @param {Object} options Node configuration.
* @param {Mixed} address Unique address, id or name of this given raft node.
* @param {Object} options Raft configuration.
* @api public
*/
function Node(address, options) {
if (!(this instanceof Node)) return new Node(options);
function Raft(address, options) {
var raft = this;
if (!(raft instanceof Raft)) return new Raft(options);
options = options || {};

@@ -69,10 +72,10 @@

this.election = {
min: Tick.parse(options['election min'] || '150 ms'),
max: Tick.parse(options['election max'] || '300 ms')
raft.election = {
min: ms(options['election min'] || '150 ms'),
max: ms(options['election max'] || '300 ms')
};
this.beat = Tick.parse(options.heartbeat || '50 ms');
raft.beat = ms(options.heartbeat || '50 ms');
this.votes = {
raft.votes = {
for: null, // Who did we vote for in this current term.

@@ -82,10 +85,10 @@ granted: 0 // How many votes we're granted to us.

this.write = this.write || options.write || null;
this.threshold = options.threshold || 0.8;
this.address = options.address || UUID();
this.timers = new Tick(this);
this.Log = options.Log;
this.latency = 0;
this.log = null;
this.nodes = [];
raft.write = raft.write || options.write || null;
raft.threshold = options.threshold || 0.8;
raft.address = options.address || UUID();
raft.timers = new Tick(raft);
raft.Log = options.Log;
raft.latency = 0;
raft.log = null;
raft.nodes = [];

@@ -98,11 +101,7 @@ //

//
this.state = options.state || Node.FOLLOWER; // Our current state.
this.leader = ''; // Leader in our cluster.
this.term = 0; // Our current term.
raft.state = options.state || Raft.FOLLOWER; // Our current state.
raft.leader = ''; // Leader in our cluster.
raft.term = 0; // Our current term.
if ('function' === this.type(this.initialize)) {
this.once('initialize', this.initialize);
}
this._initialize(options);
raft._initialize(options);
}

@@ -113,11 +112,16 @@

//
Node.extend = require('extendible');
Node.prototype = new EventEmitter();
Node.prototype.constructor = Node;
Node.prototype.emits = require('emits');
Raft.extend = require('extendible');
Raft.prototype = new EventEmitter();
Raft.prototype.constructor = Raft;
//
// Add some methods which are best done using modules.
//
Raft.prototype.emits = require('emits');
Raft.prototype.change = require('modification')(' change');
/**
* Raft §5.1:
*
* A Node can be in only one of the various states. The stopped state is not
* A Raft can be in only one of the various states. The stopped state is not
* something that is part of the Raft protocol but something we might want to

@@ -136,9 +140,9 @@ * use internally while we're starting or shutting down our node. The following

*/
Node.states = 'STOPPED,LEADER,CANDIDATE,FOLLOWER,CHILD'.split(',');
for (var s = 0; s < Node.states.length; s++) {
Node[Node.states[s]] = s;
Raft.states = 'STOPPED,LEADER,CANDIDATE,FOLLOWER,CHILD'.split(',');
for (var s = 0; s < Raft.states.length; s++) {
Raft[Raft.states[s]] = s;
}
/**
* Initialize the node and start listening to the various of events we're
* Initialize Raft and start listening to the various of events we're
* emitting as we're quite chatty to provide the maximum amount of flexibility

@@ -150,9 +154,11 @@ * and reconfigurability.

*/
Node.prototype._initialize = function initialize(options) {
Raft.prototype._initialize = function initializing(options) {
var raft = this;
//
// Reset our vote as we're starting a new term. Votes only last one term.
//
this.on('term change', function change() {
this.votes.for = null;
this.votes.granted = 0;
raft.on('term change', function change() {
raft.votes.for = null;
raft.votes.granted = 0;
});

@@ -164,6 +170,6 @@

//
this.on('state change', function change(state) {
this.timers.clear('heartbeat, election');
this.heartbeat(Node.LEADER === this.state ? this.beat : this.timeout());
this.emit(Node.states[state].toLowerCase());
raft.on('state change', function change(state) {
raft.timers.clear('heartbeat, election');
raft.heartbeat(Raft.LEADER === raft.state ? raft.beat : raft.timeout());
raft.emit(Raft.states[state].toLowerCase());
});

@@ -174,10 +180,11 @@

//
this.on('data', function incoming(packet, write) {
raft.on('data', function incoming(packet, write) {
write = write || nope;
var reason;
if ('object' !== this.type(packet)) {
if ('object' !== raft.type(packet)) {
reason = 'Invalid packet received';
this.emit('error', new Error(reason));
return write(this.packet('error', reason));
raft.emit('error', new Error(reason));
return write(raft.packet('error', reason));
}

@@ -192,15 +199,16 @@

//
// If the node receives a request with a stale term number it should be
// If the raft receives a request with a stale term number it should be
// rejected.
//
if (packet.term > this.term) {
this.change({
leader: Node.LEADER === packet.state ? packet.address : packet.leader || this.leader,
state: Node.FOLLOWER,
if (packet.term > raft.term) {
raft.change({
leader: Raft.LEADER === packet.state ? packet.address : packet.leader || raft.leader,
state: Raft.FOLLOWER,
term: packet.term
});
} else if (packet.term < this.term) {
reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ this.term;
this.emit('error', new Error(reason));
return write(this.packet('error', reason));
} else if (packet.term < raft.term) {
reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ raft.term;
raft.emit('error', new Error(reason));
return write(raft.packet('error', reason));
}

@@ -218,5 +226,5 @@

//
if (Node.LEADER === packet.state) {
if (Node.FOLLOWER !== this.state) this.change({ state: Node.FOLLOWER });
if (packet.address !== this.leader) this.change({ leader: packet.address });
if (Raft.LEADER === packet.state) {
if (Raft.FOLLOWER !== raft.state) raft.change({ state: Raft.FOLLOWER });
if (packet.address !== raft.leader) raft.change({ leader: packet.address });

@@ -227,3 +235,3 @@ //

//
this.heartbeat(this.timeout());
raft.heartbeat(raft.timeout());
}

@@ -236,3 +244,3 @@

//
// A node asked us to vote on them. We can only vote to them if they
// A raft asked us to vote on them. We can only vote to them if they
// represent a higher term (and last log term, last log index).

@@ -245,5 +253,6 @@ //

//
if (this.votes.for && this.votes.for !== packet.address) {
this.emit('vote', packet, false);
return write(this.packet('voted', { granted: false }));
if (raft.votes.for && raft.votes.for !== packet.address) {
raft.emit('vote', packet, false);
return write(raft.packet('voted', { granted: false }));
}

@@ -258,8 +267,9 @@

//
if (this.log && packet.last && (
this.log.index > packet.last.index
|| this.term > packet.last.term
if (raft.log && packet.last && (
raft.log.index > packet.last.index
|| raft.term > packet.last.term
)) {
this.emit('vote', packet, false);
return write(this.packet('voted', { granted: false }));
raft.emit('vote', packet, false);
return write(raft.packet('voted', { granted: false }));
}

@@ -272,14 +282,14 @@

//
this.votes.for = packet.address;
this.emit('vote', packet, true);
this.change({ leader: packet.address, term: packet.term });
write(this.packet('voted', { granted: true }));
raft.votes.for = packet.address;
raft.emit('vote', packet, true);
raft.change({ leader: packet.address, term: packet.term });
write(raft.packet('voted', { granted: true }));
//
// We've accepted someone as potential new leader, so we should reset
// our heartbeat to prevent this node from timing out after voting.
// our heartbeat to prevent this raft from timing out after voting.
// Which would again increment the term causing us to be next CANDIDATE
// and invalidates the request we just got, so that's silly willy.
//
this.heartbeat(this.timeout());
raft.heartbeat(raft.timeout());
break;

@@ -294,4 +304,4 @@

//
if (Node.CANDIDATE !== this.state) {
return write(this.packet('error', 'No longer a candidate, ignoring vote'));
if (Raft.CANDIDATE !== raft.state) {
return write(raft.packet('error', 'No longer a candidate, ignoring vote'));
}

@@ -301,6 +311,6 @@

// Increment our received votes when our voting request has been
// granted by the node that received the data.
// granted by the raft that received the data.
//
if (packet.data.granted) {
this.votes.granted++;
raft.votes.granted++;
}

@@ -312,4 +322,4 @@

//
if (this.quorum(this.votes.granted)) {
this.change({ leader: this.address, state: Node.LEADER });
if (raft.quorum(raft.votes.granted)) {
raft.change({ leader: raft.address, state: Raft.LEADER });

@@ -319,3 +329,3 @@ //

//
this.message(Node.FOLLOWER, this.packet('append'));
raft.message(Raft.FOLLOWER, raft.packet('append'));
}

@@ -330,3 +340,3 @@

case 'error':
this.emit('error', new Error(packet.data));
raft.emit('error', new Error(packet.data));
break;

@@ -358,6 +368,6 @@

default:
if (this.listeners('rpc').length) {
this.emit('rpc', packet, write);
if (raft.listeners('rpc').length) {
raft.emit('rpc', packet, write);
} else {
write(this.packet('error', 'Unknown message type: '+ packet.type));
write(raft.packet('error', 'Unknown message type: '+ packet.type));
}

@@ -369,25 +379,38 @@ }

// We do not need to execute the rest of the functionality below as we're
// currently running as "child" node of the cluster not as the "root" node.
// currently running as "child" raft of the cluster not as the "root" raft.
//
if (Node.CHILD === this.state) return;
if (Raft.CHILD === raft.state) return raft.emit('initialize');
//
// Setup the log & appends. Assume that if we're given a function log that it
// needs to be initialized as it requires access to our node instance so it
// needs to be initialized as it requires access to our raft instance so it
// can read our information like our leader, state, term etc.
//
if ('function' === this.type(this.Log)) {
this.log = new this.Log(this, options);
if ('function' === raft.type(raft.Log)) {
raft.log = new raft.Log(raft, options);
}
//
// The node is now listening to events so we can start our heartbeat timeout.
// So that if we don't hear anything from a leader we can promote our selfs to
// a candidate state.
//
// We want to call the `initialize` event before starting a heartbeat so
// implementors have some time to start listening for incoming ping packets.
//
this.emit('initialize');
this.heartbeat(this.timeout());
/**
* The raft is now listening to events so we can start our heartbeat timeout.
* So that if we don't hear anything from a leader we can promote our selfs to
* a candidate state.
*
* Start listening listening for heartbeats when implementors are also ready
* with setting up their code.
*
* @api private
*/
function initialize(err) {
if (err) return raft.emit('error', err);
raft.emit('initialize');
raft.heartbeat(raft.timeout());
}
if ('function' === raft.type(raft.initialize)) {
if (raft.initialize.length === 2) return raft.initialize(options, initialize);
raft.initialize(options);
}
initialize();
};

@@ -402,3 +425,3 @@

*/
Node.prototype.type = function type(of) {
Raft.prototype.type = function type(of) {
return Object.prototype.toString.call(of).slice(8, -1).toLowerCase();

@@ -415,3 +438,3 @@ };

*/
Node.prototype.quorum = function quorum(responses) {
Raft.prototype.quorum = function quorum(responses) {
if (!this.nodes.length || !responses) return false;

@@ -428,3 +451,3 @@

*/
Node.prototype.majority = function majority() {
Raft.prototype.majority = function majority() {
return Math.ceil(this.nodes.length / 2) + 1;

@@ -439,8 +462,8 @@ };

* @param {Number} timeout Which timeout should we use.
* @returns {Node}
* @returns {Raft}
* @api public
*/
Node.prototype.indefinitely = function indefinitely(attempt, fn, timeout) {
Raft.prototype.indefinitely = function indefinitely(attempt, fn, timeout) {
var uuid = UUID()
, node = this;
, raft = this;

@@ -455,7 +478,8 @@ (function again() {

var next = one(function force(err, data) {
if (!node.timers) return; // We're been destroyed, ignore all.
if (!raft.timers) return; // We're been destroyed, ignore all.
node.timers.setImmediate(uuid +'@async', function async() {
raft.timers.setImmediate(uuid +'@async', function async() {
if (err) {
node.emit('error', err);
raft.emit('error', err);
return again();

@@ -469,9 +493,9 @@ }

//
// Ensure that the assigned callback has the same context as our node.
// Ensure that the assigned callback has the same context as our raft.
//
attempt.call(node, next);
attempt.call(raft, next);
node.timers.setTimeout(uuid, function timeoutfn() {
raft.timers.setTimeout(uuid, function timeoutfn() {
next(new Error('Timed out, attempting to retry again'));
}, +timeout || node.timeout());
}, +timeout || raft.timeout());
}());

@@ -483,49 +507,26 @@

/**
* Process a change in the node.
* Start or update the heartbeat of the Raft. If we detect that we've received
* a heartbeat timeout we will promote our selfs to a candidate to take over the
* leadership.
*
* @param {Object} changed Data that is changed.
* @returns {Node}
* @param {String|Number} duration Time it would take for the heartbeat to timeout.
* @returns {Raft}
* @api private
*/
Node.prototype.change = function change(changed) {
var changes = ['term', 'leader', 'state']
, currently, previously
, i = 0;
Raft.prototype.heartbeat = function heartbeat(duration) {
var raft = this;
if (!changed) return this;
duration = duration || raft.beat;
for (; i < changes.length; i++) {
if (changes[i] in changed && changed[changes[i]] !== this[changes[i]]) {
currently = changed[changes[i]];
previously = this[changes[i]];
if (raft.timers.active('heartbeat')) {
raft.timers.adjust('heartbeat', duration);
this[changes[i]] = currently;
this.emit(changes[i] +' change', currently, previously);
}
return raft;
}
return this;
};
raft.timers.setTimeout('heartbeat', function heartbeattimeout() {
if (Raft.LEADER !== raft.state) {
raft.emit('heartbeat timeout');
/**
* Start or update the heartbeat of the Node. If we detect that we've received
* a heartbeat timeout we will promote our selfs to a candidate to take over the
* leadership.
*
* @param {String|Number} duration Time it would take for the heartbeat to timeout.
* @returns {Node}
* @api private
*/
Node.prototype.heartbeat = function heartbeat(duration) {
duration = duration || this.beat;
if (this.timers.active('heartbeat')) {
this.timers.adjust('heartbeat', duration);
return this;
}
this.timers.setTimeout('heartbeat', function heartbeattimeout() {
if (Node.LEADER !== this.state) {
this.emit('heartbeat timeout');
return this.promote();
return raft.promote();
}

@@ -540,9 +541,9 @@

//
var packet = this.packet('append');
var packet = raft.packet('append');
this.emit('heartbeat', packet);
this.message(Node.FOLLOWER, packet).heartbeat(this.beat);
raft.emit('heartbeat', packet);
raft.message(Raft.FOLLOWER, packet).heartbeat(raft.beat);
}, duration);
return this;
return raft;
};

@@ -554,6 +555,6 @@

*
* - Node.LEADER : Send a message to cluster's current leader.
* - Node.FOLLOWER : Send a message to all non leaders.
* - Node.CHILD : Send a message to everybody.
* - <address> : Send a message to a node based on the address.
* - Raft.LEADER : Send a message to cluster's current leader.
* - Raft.FOLLOWER : Send a message to all non leaders.
* - Raft.CHILD : Send a message to everybody.
* - <address> : Send a message to a raft based on the address.
*

@@ -563,11 +564,20 @@ * @param {Mixed} who Recipient of the message.

* @param {Function} when Completion callback
* @returns {Node}
* @returns {Raft}
* @api public
*/
Node.prototype.message = function message(who, what, when) {
Raft.prototype.message = function message(who, what, when) {
when = when || nope;
var length = this.nodes.length
//
// If the "who" is undefined, the developer made an error somewhere. Tell them!
//
if (typeof who === 'undefined') {
throw new Error('Cannot send message to `undefined`. Check your spelling!');
}
var output = { errors: {}, results: {} }
, length = this.nodes.length
, errors = false
, latency = []
, node = this
, raft = this
, nodes = []

@@ -577,21 +587,21 @@ , i = 0;

switch (who) {
case Node.LEADER: for (; i < length; i++)
if (node.leader === node.nodes[i].address) {
nodes.push(node.nodes[i]);
case Raft.LEADER: for (; i < length; i++)
if (raft.leader === raft.nodes[i].address) {
nodes.push(raft.nodes[i]);
}
break;
case Node.FOLLOWER: for (; i < length; i++)
if (node.leader !== node.nodes[i].address) {
nodes.push(node.nodes[i]);
case Raft.FOLLOWER: for (; i < length; i++)
if (raft.leader !== raft.nodes[i].address) {
nodes.push(raft.nodes[i]);
}
break;
case Node.CHILD:
Array.prototype.push.apply(nodes, node.nodes);
case Raft.CHILD:
Array.prototype.push.apply(nodes, raft.nodes);
break;
default: for (; i < length; i++)
if (who === node.nodes[i].address) {
nodes.push(node.nodes[i]);
if (who === raft.nodes[i].address) {
nodes.push(raft.nodes[i]);
}

@@ -603,3 +613,3 @@ }

*
* @param {Node} client Node we need to write a message to.
* @param {Raft} client Raft we need to write a message to.
* @param {Object} data Message that needs to be send.

@@ -615,2 +625,13 @@ * @api private

//
// Add the error or output to our `output` object to be
// passed to the callback when all the writing is done.
//
if (err) {
errors = true;
output.errors[client.address] = err;
} else {
output.results[client.address] = data;
}
//
// OK, so this is the strange part here. We've broadcasted messages and

@@ -621,4 +642,4 @@ // got replies back. This reply contained data so we need to process it.

//
if (err) node.emit('error', err);
else if (data) node.emit('data', data);
if (err) raft.emit('error', err);
else if (data) raft.emit('data', data);

@@ -629,3 +650,6 @@ //

if (latency.length === length) {
node.timing(latency);
raft.timing(latency);
when(errors ? output.errors : undefined, output.results);
latency.length = nodes.length = 0;
output = null;
}

@@ -642,3 +666,3 @@ });

return node;
return raft;
};

@@ -652,3 +676,3 @@

*/
Node.prototype.timeout = function timeout() {
Raft.prototype.timeout = function timeout() {
var times = this.election;

@@ -667,13 +691,17 @@

*/
Node.prototype.timing = function timing(latency) {
if (Node.STOPPED === this.state) return false;
Raft.prototype.timing = function timing(latency) {
var raft = this
, sum = 0
, i = 0;
for (var i = 0, sum = 0; i < latency.length; i++) {
if (Raft.STOPPED === raft.state) return false;
for (; i < latency.length; i++) {
sum += latency[i];
}
this.latency = Math.floor(sum / latency.length);
raft.latency = Math.floor(sum / latency.length);
if (this.latency > this.election.min * this.threshold) {
this.emit('threshold');
if (raft.latency > raft.election.min * raft.threshold) {
raft.emit('threshold');
}

@@ -689,11 +717,13 @@

* election for leadership. We increment our current term, set the CANDIDATE
* state, vote our selfs and ask all others nodes to vote for us.
* state, vote our selfs and ask all others rafts to vote for us.
*
* @returns {Node}
* @returns {Raft}
* @api private
*/
Node.prototype.promote = function promote() {
this.change({
state: Node.CANDIDATE, // We're now a candidate,
term: this.term + 1, // but only for this term.
Raft.prototype.promote = function promote() {
var raft = this;
raft.change({
state: Raft.CANDIDATE, // We're now a candidate,
term: raft.term + 1, // but only for this term.
leader: '' // We no longer have a leader.

@@ -704,26 +734,26 @@ });

// Candidates are always biased and vote for them selfs first before sending
// out a voting request to all other nodes in the cluster.
// out a voting request to all other rafts in the cluster.
//
this.votes.for = this.address;
this.votes.granted = 1;
raft.votes.for = raft.address;
raft.votes.granted = 1;
//
// Broadcast the voting request to all connected nodes in your private
// Broadcast the voting request to all connected rafts in your private
// cluster.
//
var packet = this.packet('vote')
var packet = raft.packet('vote')
, i = 0;
this.message(Node.FOLLOWER, this.packet('vote'));
raft.message(Raft.FOLLOWER, raft.packet('vote'));
//
// Set the election timeout. This gives the nodes some time to reach
// Set the election timeout. This gives the rafts some time to reach
// consensuses about who they want to vote for. If no consensus has been
// reached within the set timeout we will attempt it again.
//
this.timers
raft.timers
.clear('heartbeat, election')
.setTimeout('election', this.promote, this.timeout());
.setTimeout('election', raft.promote, raft.timeout());
return this;
return raft;
};

@@ -739,14 +769,15 @@

*/
Node.prototype.packet = function wrap(type, data) {
var packet = {
state: this.state, // So you know if we're a leader, candidate or follower.
term: this.term, // Our current term so we can find mis matches.
address: this.address, // Adress of the sender.
type: type, // Message type.
leader: this.leader, // Who is our leader.
};
Raft.prototype.packet = function wrap(type, data) {
var raft = this
, packet = {
state: raft.state, // Are we're a leader, candidate or follower.
term: raft.term, // Our current term so we can find mis matches.
address: raft.address, // Address of the sender.
type: type, // Message type.
leader: raft.leader, // Who is our leader.
};
//
// If we have logging and state replication enabled we also need to send this
// additional data so we can use it determine the state of this node.
// additional data so we can use it determine the state of this raft.
//

@@ -756,3 +787,3 @@ // @TODO point to index of last commit entry.

//
if (this.log) packet.last = { term: this.term, index: this.log.index };
if (raft.log) packet.last = { term: raft.term, index: raft.log.index };
if (arguments.length === 2) packet.data = data;

@@ -769,15 +800,16 @@

* @param {Object} options Configuration that should override the default config.
* @returns {Node} The newly created instance.
* @returns {Raft} The newly created instance.
* @api public
*/
Node.prototype.clone = function clone(options) {
Raft.prototype.clone = function clone(options) {
options = options || {};
var node = {
'Log': this.Log,
'election max': this.election.max,
'election min': this.election.min,
'heartbeat': this.beat,
'threshold': this.threshold,
}, key;
var raft = this
, node = {
'Log': raft.Log,
'election max': raft.election.max,
'election min': raft.election.min,
'heartbeat': raft.beat,
'threshold': raft.threshold,
}, key;

@@ -790,16 +822,18 @@ for (key in node) {

return new this.constructor(options);
return new raft.constructor(options);
};
/**
* A new node is about to join the cluster. So we need to upgrade the
* configuration of every single node.
* A new raft is about to join the cluster. So we need to upgrade the
* configuration of every single raft.
*
* @param {String} address The address of the node that is connected.
* @param {String} address The address of the raft that is connected.
* @param {Function} write A method that we use to write data.
* @returns {Node} The node we created and that joined our cluster.
* @returns {Raft} The raft we created and that joined our cluster.
* @api public
*/
Node.prototype.join = function join(address, write) {
if ('function' === this.type(address)) {
Raft.prototype.join = function join(address, write) {
var raft = this;
if ('function' === raft.type(address)) {
write = address; address = null;

@@ -811,18 +845,18 @@ }

// add a really simple address check here. Return nothing so people can actually
// check if a node has been added.
// check if a raft has been added.
//
if (this.address === address) return;
if (raft.address === address) return;
var node = this.clone({
var node = raft.clone({
write: write, // Optional function that receives our writes.
address: address, // A custom address for the node we added.
state: Node.CHILD // We are a node in the cluster.
address: address, // A custom address for the raft we added.
state: Raft.CHILD // We are a raft in the cluster.
});
node.once('end', function end() {
this.leave(node);
}, this);
raft.leave(node);
}, raft);
this.nodes.push(node);
this.emit('join', node);
raft.nodes.push(node);
raft.emit('join', node);

@@ -833,15 +867,16 @@ return node;

/**
* Remove a node from the cluster.
* Remove a raft from the cluster.
*
* @param {String} address The address of the node that should be removed.
* @returns {Node} The node that we removed.
* @param {String} address The address of the raft that should be removed.
* @returns {Raft} The raft that we removed.
* @api public
*/
Node.prototype.leave = function leave(address) {
var index = -1
Raft.prototype.leave = function leave(address) {
var raft = this
, index = -1
, node;
for (var i = 0; i < this.nodes.length; i++) {
if (this.nodes[i] === address || this.nodes[i].address === address) {
node = this.nodes[i];
for (var i = 0; i < raft.nodes.length; i++) {
if (raft.nodes[i] === address || raft.nodes[i].address === address) {
node = raft.nodes[i];
index = i;

@@ -853,6 +888,6 @@ break;

if (~index && node) {
raft.nodes.splice(index, 1);
if (node.end) node.end();
this.nodes.splice(index, 1);
this.emit('leave', node);
raft.emit('leave', node);
}

@@ -864,3 +899,3 @@

/**
* This Node needs to be shut down.
* This Raft needs to be shut down.
*

@@ -870,16 +905,18 @@ * @returns {Boolean} Successful destruction.

*/
Node.prototype.end = Node.prototype.destroy = function end() {
if (Node.STOPPED === this.state) return false;
this.state = Node.STOPPED;
Raft.prototype.end = Raft.prototype.destroy = function end() {
var raft = this;
if (this.nodes.length) for (var i = 0; i < this.nodes.length; i++) {
this.leave(this.nodes[i]);
if (Raft.STOPPED === raft.state) return false;
raft.state = Raft.STOPPED;
if (raft.nodes.length) for (var i = 0; i < raft.nodes.length; i++) {
raft.leave(raft.nodes[i]);
}
this.emit('end');
this.timers.end();
this.removeAllListeners();
raft.emit('end');
raft.timers.end();
raft.removeAllListeners();
if (this.log) this.log.end();
this.timers = this.log = this.Log = this.beat = this.election = null;
if (raft.log) raft.log.end();
raft.timers = raft.log = raft.Log = raft.beat = raft.election = null;

@@ -892,2 +929,2 @@ return true;

//
module.exports = Node;
module.exports = Raft;
{
"name": "liferaft",
"version": "0.1.0",
"version": "0.1.1",
"description": "Consensus protocol based on raft, it will one day save your live.",
"main": "index.js",
"scripts": {
"test": "mocha --reporter spec --ui bdd test.js",
"watch": "mocha --watch --reporter spec --ui bdd test.js",
"coverage": "istanbul cover ./node_modules/.bin/_mocha -- --reporter spec --ui bdd test.js",
"test-travis": "istanbul cover node_modules/.bin/_mocha --report lcovonly -- --reporter spec --ui bdd test.js"
"100%": "istanbul check-coverage --statements 100 --functions 100 --lines 100 --branches 100",
"test": "mocha test.js",
"watch": "mocha --watch test.js",
"coverage": "istanbul cover ./node_modules/.bin/_mocha -- test.js",
"test-travis": "istanbul cover node_modules/.bin/_mocha --report lcovonly -- test.js"
},

@@ -28,16 +29,18 @@ "repository": {

"dependencies": {
"emits": "1.0.x",
"eventemitter3": "0.1.x",
"emits": "3.0.x",
"eventemitter3": "1.1.x",
"extendible": "0.1.x",
"immediate": "3.0.x",
"immediate": "3.2.x",
"millisecond": "0.0.x",
"modification": "1.0.x",
"one-time": "0.0.x",
"tick-tock": "0.0.x"
"tick-tock": "0.1.x"
},
"devDependencies": {
"assume": "0.0.x",
"assume": "1.2.x",
"diagnostics": "0.0.x",
"istanbul": "0.3.x",
"mocha": "2.0.x",
"pre-commit": "0.0.x"
"mocha": "2.2.x",
"pre-commit": "1.0.x"
}
}

@@ -35,3 +35,3 @@ # liferaft

- [Extending](#extending)
- [Initialization](#initialization)
- [Transports](#transports)
- [License](#license)

@@ -89,2 +89,14 @@

```js
var raft = new Raft({
'address': 'tcp://localhost:8089',
'election min': '200 millisecond',
'election max': '1 second'
});
```
As you might have noticed we're using two different styles of passing in the
address to the raft instance, as address property in the options and as first
argument in the constructor.
### Events

@@ -178,3 +190,3 @@

According to section x.x of the Raft paper it's required that we retry sending
According to section 5.3 of the Raft paper it's required that we retry sending
the RPC messages until they succeed. This function will run the given `attempt`

@@ -340,13 +352,55 @@ function until the received callback has been called successfully and within our

### Initialization
## Transports
The library ships without transports by default. If we we're to implement this
it would have made this library way to opinionated. You might want to leverage
and existing infrastructure or library for messaging instead of going with our
solution. There are only two methods you need to implement an `initialize`
method and an `write` method. Both methods serve different use cases so we're
going to take a closer look at both of them.
### write
```js
var LifeBoat = LifeRaft.extend({
socket: null,
write: function write(packet, callback) {
if (!this.socket) this.socket = require('net').connect(this.address);
this.socket.write(JSON.stringify(packet));
// More code here ;-)
}
});
```
There are a couple of things that we assume you implement in the write
method:
- **Message encoding** The packet that you receive is an JSON object but you
have to decide how you're going transfer that over the write in the most
efficient way for you.
- **message resending** The Raft protocol states the messages that you write
should be retried until indefinitely ([Raft 5.3][5.3]). There are already
transports which do this automatically for you but if your's is missing this,
the [LifeRaft#indefinitely()](#liferaftindefinitelyattempt-fn-timeout) is
specifically written for this.
### initialize
When you extend the `LifeRaft` instance you can assign a special `initialize`
method. This method will be called when our `LifeRaft` code has been fully
initialized and we're ready to initialize your code. Please bare in mind that
this is a synchronous invocation and that we will start heart beat timeout after
the execution of the function. This method is ideal for implementing your own
transport technology. The function is invoked with one argument, these are the
options that were used to construct the instance. If no options were provided we
will default to empty object so this argument is always an available.
initialized and we're ready to initialize your code. The invocation type depends
on the amount of arguments you specify in the function.
- **synchronous**: Your function specifies less then 2 arguments, it will
receive one argument which is the options object that was provided in the
constructor. If no options were provided it will be an empty object.
- **asynchronous**: Your function specifies 2 arguments, just like the
synchronous execution it will receive the passed options as first argument but
it will also receive a callback function as second argument. This callback
should be executed once you're done with setting up your transport and you are
ready to receive messages. The function follows an error first pattern so it
receives an error as first argument it will emit the `error` event on the
constructed instance.
```js

@@ -359,6 +413,27 @@ var LifeBoat = LifeRaft.extend({

});
//
// Or in async mode:
//
var LifeBoat = LifeRaft.extend({
server: null,
initialize: function initialize(options, fn) {
this.server = require('net').createServer(function () {
// Do stuff here to handle incoming connections etc.
}.bind(this));
var next = require('one-time')(fn);
this.server.once('listening', next);
this.server.once('error', next);
this.server.listen(this.address);
}
})
```
In parallel to the execution of your `initialize` method we also emit an
`initialize` event. This receives the same amount of arguments.
After your `initialize` method is called we will emit the `initialize` event. If
your `initialize` method is asynchronous we will emit the event **after** the
callback has been executed. Once the event is emitted we will start our timeout
timers and hope that we will receive message in time.

@@ -371,1 +446,13 @@ ## License

[Browserify]: http://browserify.org/
[5]: https://github.com/unshiftio/liferaft/blob/master/raft.md#5-the-raft-consensus-algorithm
[5.1]: https://github.com/unshiftio/liferaft/blob/master/raft.md#51-raft-basics
[5.2]: https://github.com/unshiftio/liferaft/blob/master/raft.md#52-leader-election
[5.3]: https://github.com/unshiftio/liferaft/blob/master/raft.md#53-log-replication
[5.4]: https://github.com/unshiftio/liferaft/blob/master/raft.md#54-safety
[5.4.1]: https://github.com/unshiftio/liferaft/blob/master/raft.md#541-election-restriction
[5.4.2]: https://github.com/unshiftio/liferaft/blob/master/raft.md#542-committing-entries-from-previous-terms
[5.4.3]: https://github.com/unshiftio/liferaft/blob/master/raft.md#543-safety-argument
[5.5]: https://github.com/unshiftio/liferaft/blob/master/raft.md#55-follower-and-candidate-crashes
[5.6]: https://github.com/unshiftio/liferaft/blob/master/raft.md#56-timing-and-availability
[6]: https://github.com/unshiftio/liferaft/blob/master/raft.md#6-cluster-membership-changes
[7]: https://github.com/unshiftio/liferaft/blob/master/raft.md#7-log-compaction

@@ -94,2 +94,44 @@ /* istanbul ignore next */

});
it('async emits the initialize event once the initialize method is done', function (next) {
var ready = false;
var MyRaft = Raft.extend({
initialize: function initialize(options, init) {
assume(options.custom).equals('options');
assume(ready).is.false();
setTimeout(function () {
ready = true;
init();
}, 100);
}
});
var raft = new MyRaft('foobar', { custom: 'options' });
raft.on('initialize', function () {
assume(ready).is.true();
next();
});
});
it('emits error when the initialize fails', function (next) {
var MyRaft = Raft.extend({
initialize: function initialize(options, init) {
setTimeout(function () {
init(new Error('Failure'));
}, 100);
}
});
var raft = new MyRaft();
raft.on('error', function (err) {
assume(err.message).equals('Failure');
next();
});
});
});

@@ -181,2 +223,34 @@

});
it('throws an error on undefined message', function () {
assume(function () {
raft.message(undefined, raft.packet('foo'));
}).throws('Cannot send message to `undefined`');
});
it('runs the `when` callback with no errors', function (next) {
var node = raft.join(function (data, callback) {
callback(undefined, 'foo');
});
node.address = 'addr';
raft.message(Raft.FOLLOWER, raft.packet('foo'), function (err, data) {
assume(err).equals(undefined);
assume(data).deep.equals({ addr: 'foo' });
next();
});
});
it('runs the `when` callback with no errors', function (next) {
var node = raft.join(function (data, callback) {
callback('bar');
});
node.address = 'addr';
raft.message(Raft.FOLLOWER, raft.packet('foo'), function (err, data) {
assume(err).deep.equals({ addr: 'bar' });
assume(data).deep.equals({});
next();
});
});
});

@@ -958,2 +1032,15 @@

});
describe('bugs', function () {
it('correctly deletes nodes from the list on leave', function () {
raft.join('1');
raft.join('2');
raft.join('3');
assume(raft.nodes.length).equals(3);
raft.leave('2');
assume(raft.nodes.length).equals(2);
});
});
});

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc