Comparing version 0.1.0 to 0.1.1
@@ -34,11 +34,13 @@ 'use strict'; | ||
debug('initializing reply socket on port %s', raft.name); | ||
debug('initializing reply socket on port %s', raft.address); | ||
socket = raft.socket = msg.socket('rep'); | ||
socket.bind(raft.name); | ||
socket.on('message', raft.emits('data')); | ||
socket.bind(raft.address); | ||
socket.on('message', function (data, fn) { | ||
raft.emit('data', data, fn); | ||
}); | ||
socket.on('error', function err() { | ||
debug('failed to initialize on port: ', raft.name); | ||
debug('failed to initialize on port: ', raft.address); | ||
}); | ||
@@ -61,9 +63,9 @@ }, | ||
socket.connect(raft.name); | ||
socket.connect(raft.address); | ||
socket.on('error', function err() { | ||
console.error('failed to write to: ', raft.name); | ||
console.error('failed to write to: ', raft.address); | ||
}); | ||
} | ||
debug('writing packet to socket on port %s', raft.name); | ||
debug('writing packet to socket on port %s', raft.address); | ||
socket.send(packet, function (data) { | ||
@@ -70,0 +72,0 @@ fn(undefined, data); |
559
index.js
@@ -5,2 +5,3 @@ 'use strict'; | ||
, Tick = require('tick-tock') | ||
, ms = require('millisecond') | ||
, one = require('one-time'); | ||
@@ -35,3 +36,3 @@ | ||
/** | ||
* Representation of a single node in the cluster. | ||
* Representation of a single raft node in the cluster. | ||
* | ||
@@ -52,13 +53,15 @@ * Options: | ||
* Please note, when adding new options make sure that you also update the | ||
* `Node#join` method so it will correctly copy the new option to the clone as | ||
* `Raft#join` method so it will correctly copy the new option to the clone as | ||
* well. | ||
* | ||
* @constructor | ||
* @param {Mixed} address Unique address, id or name of this given node. | ||
* @param {Object} options Node configuration. | ||
* @param {Mixed} address Unique address, id or name of this given raft node. | ||
* @param {Object} options Raft configuration. | ||
* @api public | ||
*/ | ||
function Node(address, options) { | ||
if (!(this instanceof Node)) return new Node(options); | ||
function Raft(address, options) { | ||
var raft = this; | ||
if (!(raft instanceof Raft)) return new Raft(options); | ||
options = options || {}; | ||
@@ -69,10 +72,10 @@ | ||
this.election = { | ||
min: Tick.parse(options['election min'] || '150 ms'), | ||
max: Tick.parse(options['election max'] || '300 ms') | ||
raft.election = { | ||
min: ms(options['election min'] || '150 ms'), | ||
max: ms(options['election max'] || '300 ms') | ||
}; | ||
this.beat = Tick.parse(options.heartbeat || '50 ms'); | ||
raft.beat = ms(options.heartbeat || '50 ms'); | ||
this.votes = { | ||
raft.votes = { | ||
for: null, // Who did we vote for in this current term. | ||
@@ -82,10 +85,10 @@ granted: 0 // How many votes we're granted to us. | ||
this.write = this.write || options.write || null; | ||
this.threshold = options.threshold || 0.8; | ||
this.address = options.address || UUID(); | ||
this.timers = new Tick(this); | ||
this.Log = options.Log; | ||
this.latency = 0; | ||
this.log = null; | ||
this.nodes = []; | ||
raft.write = raft.write || options.write || null; | ||
raft.threshold = options.threshold || 0.8; | ||
raft.address = options.address || UUID(); | ||
raft.timers = new Tick(raft); | ||
raft.Log = options.Log; | ||
raft.latency = 0; | ||
raft.log = null; | ||
raft.nodes = []; | ||
@@ -98,11 +101,7 @@ // | ||
// | ||
this.state = options.state || Node.FOLLOWER; // Our current state. | ||
this.leader = ''; // Leader in our cluster. | ||
this.term = 0; // Our current term. | ||
raft.state = options.state || Raft.FOLLOWER; // Our current state. | ||
raft.leader = ''; // Leader in our cluster. | ||
raft.term = 0; // Our current term. | ||
if ('function' === this.type(this.initialize)) { | ||
this.once('initialize', this.initialize); | ||
} | ||
this._initialize(options); | ||
raft._initialize(options); | ||
} | ||
@@ -113,11 +112,16 @@ | ||
// | ||
Node.extend = require('extendible'); | ||
Node.prototype = new EventEmitter(); | ||
Node.prototype.constructor = Node; | ||
Node.prototype.emits = require('emits'); | ||
Raft.extend = require('extendible'); | ||
Raft.prototype = new EventEmitter(); | ||
Raft.prototype.constructor = Raft; | ||
// | ||
// Add some methods which are best done using modules. | ||
// | ||
Raft.prototype.emits = require('emits'); | ||
Raft.prototype.change = require('modification')(' change'); | ||
/** | ||
* Raft §5.1: | ||
* | ||
* A Node can be in only one of the various states. The stopped state is not | ||
* A Raft can be in only one of the various states. The stopped state is not | ||
* something that is part of the Raft protocol but something we might want to | ||
@@ -136,9 +140,9 @@ * use internally while we're starting or shutting down our node. The following | ||
*/ | ||
Node.states = 'STOPPED,LEADER,CANDIDATE,FOLLOWER,CHILD'.split(','); | ||
for (var s = 0; s < Node.states.length; s++) { | ||
Node[Node.states[s]] = s; | ||
Raft.states = 'STOPPED,LEADER,CANDIDATE,FOLLOWER,CHILD'.split(','); | ||
for (var s = 0; s < Raft.states.length; s++) { | ||
Raft[Raft.states[s]] = s; | ||
} | ||
/** | ||
* Initialize the node and start listening to the various of events we're | ||
* Initialize Raft and start listening to the various of events we're | ||
* emitting as we're quite chatty to provide the maximum amount of flexibility | ||
@@ -150,9 +154,11 @@ * and reconfigurability. | ||
*/ | ||
Node.prototype._initialize = function initialize(options) { | ||
Raft.prototype._initialize = function initializing(options) { | ||
var raft = this; | ||
// | ||
// Reset our vote as we're starting a new term. Votes only last one term. | ||
// | ||
this.on('term change', function change() { | ||
this.votes.for = null; | ||
this.votes.granted = 0; | ||
raft.on('term change', function change() { | ||
raft.votes.for = null; | ||
raft.votes.granted = 0; | ||
}); | ||
@@ -164,6 +170,6 @@ | ||
// | ||
this.on('state change', function change(state) { | ||
this.timers.clear('heartbeat, election'); | ||
this.heartbeat(Node.LEADER === this.state ? this.beat : this.timeout()); | ||
this.emit(Node.states[state].toLowerCase()); | ||
raft.on('state change', function change(state) { | ||
raft.timers.clear('heartbeat, election'); | ||
raft.heartbeat(Raft.LEADER === raft.state ? raft.beat : raft.timeout()); | ||
raft.emit(Raft.states[state].toLowerCase()); | ||
}); | ||
@@ -174,10 +180,11 @@ | ||
// | ||
this.on('data', function incoming(packet, write) { | ||
raft.on('data', function incoming(packet, write) { | ||
write = write || nope; | ||
var reason; | ||
if ('object' !== this.type(packet)) { | ||
if ('object' !== raft.type(packet)) { | ||
reason = 'Invalid packet received'; | ||
this.emit('error', new Error(reason)); | ||
return write(this.packet('error', reason)); | ||
raft.emit('error', new Error(reason)); | ||
return write(raft.packet('error', reason)); | ||
} | ||
@@ -192,15 +199,16 @@ | ||
// | ||
// If the node receives a request with a stale term number it should be | ||
// If the raft receives a request with a stale term number it should be | ||
// rejected. | ||
// | ||
if (packet.term > this.term) { | ||
this.change({ | ||
leader: Node.LEADER === packet.state ? packet.address : packet.leader || this.leader, | ||
state: Node.FOLLOWER, | ||
if (packet.term > raft.term) { | ||
raft.change({ | ||
leader: Raft.LEADER === packet.state ? packet.address : packet.leader || raft.leader, | ||
state: Raft.FOLLOWER, | ||
term: packet.term | ||
}); | ||
} else if (packet.term < this.term) { | ||
reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ this.term; | ||
this.emit('error', new Error(reason)); | ||
return write(this.packet('error', reason)); | ||
} else if (packet.term < raft.term) { | ||
reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ raft.term; | ||
raft.emit('error', new Error(reason)); | ||
return write(raft.packet('error', reason)); | ||
} | ||
@@ -218,5 +226,5 @@ | ||
// | ||
if (Node.LEADER === packet.state) { | ||
if (Node.FOLLOWER !== this.state) this.change({ state: Node.FOLLOWER }); | ||
if (packet.address !== this.leader) this.change({ leader: packet.address }); | ||
if (Raft.LEADER === packet.state) { | ||
if (Raft.FOLLOWER !== raft.state) raft.change({ state: Raft.FOLLOWER }); | ||
if (packet.address !== raft.leader) raft.change({ leader: packet.address }); | ||
@@ -227,3 +235,3 @@ // | ||
// | ||
this.heartbeat(this.timeout()); | ||
raft.heartbeat(raft.timeout()); | ||
} | ||
@@ -236,3 +244,3 @@ | ||
// | ||
// A node asked us to vote on them. We can only vote to them if they | ||
// A raft asked us to vote on them. We can only vote to them if they | ||
// represent a higher term (and last log term, last log index). | ||
@@ -245,5 +253,6 @@ // | ||
// | ||
if (this.votes.for && this.votes.for !== packet.address) { | ||
this.emit('vote', packet, false); | ||
return write(this.packet('voted', { granted: false })); | ||
if (raft.votes.for && raft.votes.for !== packet.address) { | ||
raft.emit('vote', packet, false); | ||
return write(raft.packet('voted', { granted: false })); | ||
} | ||
@@ -258,8 +267,9 @@ | ||
// | ||
if (this.log && packet.last && ( | ||
this.log.index > packet.last.index | ||
|| this.term > packet.last.term | ||
if (raft.log && packet.last && ( | ||
raft.log.index > packet.last.index | ||
|| raft.term > packet.last.term | ||
)) { | ||
this.emit('vote', packet, false); | ||
return write(this.packet('voted', { granted: false })); | ||
raft.emit('vote', packet, false); | ||
return write(raft.packet('voted', { granted: false })); | ||
} | ||
@@ -272,14 +282,14 @@ | ||
// | ||
this.votes.for = packet.address; | ||
this.emit('vote', packet, true); | ||
this.change({ leader: packet.address, term: packet.term }); | ||
write(this.packet('voted', { granted: true })); | ||
raft.votes.for = packet.address; | ||
raft.emit('vote', packet, true); | ||
raft.change({ leader: packet.address, term: packet.term }); | ||
write(raft.packet('voted', { granted: true })); | ||
// | ||
// We've accepted someone as potential new leader, so we should reset | ||
// our heartbeat to prevent this node from timing out after voting. | ||
// our heartbeat to prevent this raft from timing out after voting. | ||
// Which would again increment the term causing us to be next CANDIDATE | ||
// and invalidates the request we just got, so that's silly willy. | ||
// | ||
this.heartbeat(this.timeout()); | ||
raft.heartbeat(raft.timeout()); | ||
break; | ||
@@ -294,4 +304,4 @@ | ||
// | ||
if (Node.CANDIDATE !== this.state) { | ||
return write(this.packet('error', 'No longer a candidate, ignoring vote')); | ||
if (Raft.CANDIDATE !== raft.state) { | ||
return write(raft.packet('error', 'No longer a candidate, ignoring vote')); | ||
} | ||
@@ -301,6 +311,6 @@ | ||
// Increment our received votes when our voting request has been | ||
// granted by the node that received the data. | ||
// granted by the raft that received the data. | ||
// | ||
if (packet.data.granted) { | ||
this.votes.granted++; | ||
raft.votes.granted++; | ||
} | ||
@@ -312,4 +322,4 @@ | ||
// | ||
if (this.quorum(this.votes.granted)) { | ||
this.change({ leader: this.address, state: Node.LEADER }); | ||
if (raft.quorum(raft.votes.granted)) { | ||
raft.change({ leader: raft.address, state: Raft.LEADER }); | ||
@@ -319,3 +329,3 @@ // | ||
// | ||
this.message(Node.FOLLOWER, this.packet('append')); | ||
raft.message(Raft.FOLLOWER, raft.packet('append')); | ||
} | ||
@@ -330,3 +340,3 @@ | ||
case 'error': | ||
this.emit('error', new Error(packet.data)); | ||
raft.emit('error', new Error(packet.data)); | ||
break; | ||
@@ -358,6 +368,6 @@ | ||
default: | ||
if (this.listeners('rpc').length) { | ||
this.emit('rpc', packet, write); | ||
if (raft.listeners('rpc').length) { | ||
raft.emit('rpc', packet, write); | ||
} else { | ||
write(this.packet('error', 'Unknown message type: '+ packet.type)); | ||
write(raft.packet('error', 'Unknown message type: '+ packet.type)); | ||
} | ||
@@ -369,25 +379,38 @@ } | ||
// We do not need to execute the rest of the functionality below as we're | ||
// currently running as "child" node of the cluster not as the "root" node. | ||
// currently running as "child" raft of the cluster not as the "root" raft. | ||
// | ||
if (Node.CHILD === this.state) return; | ||
if (Raft.CHILD === raft.state) return raft.emit('initialize'); | ||
// | ||
// Setup the log & appends. Assume that if we're given a function log that it | ||
// needs to be initialized as it requires access to our node instance so it | ||
// needs to be initialized as it requires access to our raft instance so it | ||
// can read our information like our leader, state, term etc. | ||
// | ||
if ('function' === this.type(this.Log)) { | ||
this.log = new this.Log(this, options); | ||
if ('function' === raft.type(raft.Log)) { | ||
raft.log = new raft.Log(raft, options); | ||
} | ||
// | ||
// The node is now listening to events so we can start our heartbeat timeout. | ||
// So that if we don't hear anything from a leader we can promote our selfs to | ||
// a candidate state. | ||
// | ||
// We want to call the `initialize` event before starting a heartbeat so | ||
// implementors have some time to start listening for incoming ping packets. | ||
// | ||
this.emit('initialize'); | ||
this.heartbeat(this.timeout()); | ||
/** | ||
* The raft is now listening to events so we can start our heartbeat timeout. | ||
* So that if we don't hear anything from a leader we can promote our selfs to | ||
* a candidate state. | ||
* | ||
* Start listening listening for heartbeats when implementors are also ready | ||
* with setting up their code. | ||
* | ||
* @api private | ||
*/ | ||
function initialize(err) { | ||
if (err) return raft.emit('error', err); | ||
raft.emit('initialize'); | ||
raft.heartbeat(raft.timeout()); | ||
} | ||
if ('function' === raft.type(raft.initialize)) { | ||
if (raft.initialize.length === 2) return raft.initialize(options, initialize); | ||
raft.initialize(options); | ||
} | ||
initialize(); | ||
}; | ||
@@ -402,3 +425,3 @@ | ||
*/ | ||
Node.prototype.type = function type(of) { | ||
Raft.prototype.type = function type(of) { | ||
return Object.prototype.toString.call(of).slice(8, -1).toLowerCase(); | ||
@@ -415,3 +438,3 @@ }; | ||
*/ | ||
Node.prototype.quorum = function quorum(responses) { | ||
Raft.prototype.quorum = function quorum(responses) { | ||
if (!this.nodes.length || !responses) return false; | ||
@@ -428,3 +451,3 @@ | ||
*/ | ||
Node.prototype.majority = function majority() { | ||
Raft.prototype.majority = function majority() { | ||
return Math.ceil(this.nodes.length / 2) + 1; | ||
@@ -439,8 +462,8 @@ }; | ||
* @param {Number} timeout Which timeout should we use. | ||
* @returns {Node} | ||
* @returns {Raft} | ||
* @api public | ||
*/ | ||
Node.prototype.indefinitely = function indefinitely(attempt, fn, timeout) { | ||
Raft.prototype.indefinitely = function indefinitely(attempt, fn, timeout) { | ||
var uuid = UUID() | ||
, node = this; | ||
, raft = this; | ||
@@ -455,7 +478,8 @@ (function again() { | ||
var next = one(function force(err, data) { | ||
if (!node.timers) return; // We're been destroyed, ignore all. | ||
if (!raft.timers) return; // We're been destroyed, ignore all. | ||
node.timers.setImmediate(uuid +'@async', function async() { | ||
raft.timers.setImmediate(uuid +'@async', function async() { | ||
if (err) { | ||
node.emit('error', err); | ||
raft.emit('error', err); | ||
return again(); | ||
@@ -469,9 +493,9 @@ } | ||
// | ||
// Ensure that the assigned callback has the same context as our node. | ||
// Ensure that the assigned callback has the same context as our raft. | ||
// | ||
attempt.call(node, next); | ||
attempt.call(raft, next); | ||
node.timers.setTimeout(uuid, function timeoutfn() { | ||
raft.timers.setTimeout(uuid, function timeoutfn() { | ||
next(new Error('Timed out, attempting to retry again')); | ||
}, +timeout || node.timeout()); | ||
}, +timeout || raft.timeout()); | ||
}()); | ||
@@ -483,49 +507,26 @@ | ||
/** | ||
* Process a change in the node. | ||
* Start or update the heartbeat of the Raft. If we detect that we've received | ||
* a heartbeat timeout we will promote our selfs to a candidate to take over the | ||
* leadership. | ||
* | ||
* @param {Object} changed Data that is changed. | ||
* @returns {Node} | ||
* @param {String|Number} duration Time it would take for the heartbeat to timeout. | ||
* @returns {Raft} | ||
* @api private | ||
*/ | ||
Node.prototype.change = function change(changed) { | ||
var changes = ['term', 'leader', 'state'] | ||
, currently, previously | ||
, i = 0; | ||
Raft.prototype.heartbeat = function heartbeat(duration) { | ||
var raft = this; | ||
if (!changed) return this; | ||
duration = duration || raft.beat; | ||
for (; i < changes.length; i++) { | ||
if (changes[i] in changed && changed[changes[i]] !== this[changes[i]]) { | ||
currently = changed[changes[i]]; | ||
previously = this[changes[i]]; | ||
if (raft.timers.active('heartbeat')) { | ||
raft.timers.adjust('heartbeat', duration); | ||
this[changes[i]] = currently; | ||
this.emit(changes[i] +' change', currently, previously); | ||
} | ||
return raft; | ||
} | ||
return this; | ||
}; | ||
raft.timers.setTimeout('heartbeat', function heartbeattimeout() { | ||
if (Raft.LEADER !== raft.state) { | ||
raft.emit('heartbeat timeout'); | ||
/** | ||
* Start or update the heartbeat of the Node. If we detect that we've received | ||
* a heartbeat timeout we will promote our selfs to a candidate to take over the | ||
* leadership. | ||
* | ||
* @param {String|Number} duration Time it would take for the heartbeat to timeout. | ||
* @returns {Node} | ||
* @api private | ||
*/ | ||
Node.prototype.heartbeat = function heartbeat(duration) { | ||
duration = duration || this.beat; | ||
if (this.timers.active('heartbeat')) { | ||
this.timers.adjust('heartbeat', duration); | ||
return this; | ||
} | ||
this.timers.setTimeout('heartbeat', function heartbeattimeout() { | ||
if (Node.LEADER !== this.state) { | ||
this.emit('heartbeat timeout'); | ||
return this.promote(); | ||
return raft.promote(); | ||
} | ||
@@ -540,9 +541,9 @@ | ||
// | ||
var packet = this.packet('append'); | ||
var packet = raft.packet('append'); | ||
this.emit('heartbeat', packet); | ||
this.message(Node.FOLLOWER, packet).heartbeat(this.beat); | ||
raft.emit('heartbeat', packet); | ||
raft.message(Raft.FOLLOWER, packet).heartbeat(raft.beat); | ||
}, duration); | ||
return this; | ||
return raft; | ||
}; | ||
@@ -554,6 +555,6 @@ | ||
* | ||
* - Node.LEADER : Send a message to cluster's current leader. | ||
* - Node.FOLLOWER : Send a message to all non leaders. | ||
* - Node.CHILD : Send a message to everybody. | ||
* - <address> : Send a message to a node based on the address. | ||
* - Raft.LEADER : Send a message to cluster's current leader. | ||
* - Raft.FOLLOWER : Send a message to all non leaders. | ||
* - Raft.CHILD : Send a message to everybody. | ||
* - <address> : Send a message to a raft based on the address. | ||
* | ||
@@ -563,11 +564,20 @@ * @param {Mixed} who Recipient of the message. | ||
* @param {Function} when Completion callback | ||
* @returns {Node} | ||
* @returns {Raft} | ||
* @api public | ||
*/ | ||
Node.prototype.message = function message(who, what, when) { | ||
Raft.prototype.message = function message(who, what, when) { | ||
when = when || nope; | ||
var length = this.nodes.length | ||
// | ||
// If the "who" is undefined, the developer made an error somewhere. Tell them! | ||
// | ||
if (typeof who === 'undefined') { | ||
throw new Error('Cannot send message to `undefined`. Check your spelling!'); | ||
} | ||
var output = { errors: {}, results: {} } | ||
, length = this.nodes.length | ||
, errors = false | ||
, latency = [] | ||
, node = this | ||
, raft = this | ||
, nodes = [] | ||
@@ -577,21 +587,21 @@ , i = 0; | ||
switch (who) { | ||
case Node.LEADER: for (; i < length; i++) | ||
if (node.leader === node.nodes[i].address) { | ||
nodes.push(node.nodes[i]); | ||
case Raft.LEADER: for (; i < length; i++) | ||
if (raft.leader === raft.nodes[i].address) { | ||
nodes.push(raft.nodes[i]); | ||
} | ||
break; | ||
case Node.FOLLOWER: for (; i < length; i++) | ||
if (node.leader !== node.nodes[i].address) { | ||
nodes.push(node.nodes[i]); | ||
case Raft.FOLLOWER: for (; i < length; i++) | ||
if (raft.leader !== raft.nodes[i].address) { | ||
nodes.push(raft.nodes[i]); | ||
} | ||
break; | ||
case Node.CHILD: | ||
Array.prototype.push.apply(nodes, node.nodes); | ||
case Raft.CHILD: | ||
Array.prototype.push.apply(nodes, raft.nodes); | ||
break; | ||
default: for (; i < length; i++) | ||
if (who === node.nodes[i].address) { | ||
nodes.push(node.nodes[i]); | ||
if (who === raft.nodes[i].address) { | ||
nodes.push(raft.nodes[i]); | ||
} | ||
@@ -603,3 +613,3 @@ } | ||
* | ||
* @param {Node} client Node we need to write a message to. | ||
* @param {Raft} client Raft we need to write a message to. | ||
* @param {Object} data Message that needs to be send. | ||
@@ -615,2 +625,13 @@ * @api private | ||
// | ||
// Add the error or output to our `output` object to be | ||
// passed to the callback when all the writing is done. | ||
// | ||
if (err) { | ||
errors = true; | ||
output.errors[client.address] = err; | ||
} else { | ||
output.results[client.address] = data; | ||
} | ||
// | ||
// OK, so this is the strange part here. We've broadcasted messages and | ||
@@ -621,4 +642,4 @@ // got replies back. This reply contained data so we need to process it. | ||
// | ||
if (err) node.emit('error', err); | ||
else if (data) node.emit('data', data); | ||
if (err) raft.emit('error', err); | ||
else if (data) raft.emit('data', data); | ||
@@ -629,3 +650,6 @@ // | ||
if (latency.length === length) { | ||
node.timing(latency); | ||
raft.timing(latency); | ||
when(errors ? output.errors : undefined, output.results); | ||
latency.length = nodes.length = 0; | ||
output = null; | ||
} | ||
@@ -642,3 +666,3 @@ }); | ||
return node; | ||
return raft; | ||
}; | ||
@@ -652,3 +676,3 @@ | ||
*/ | ||
Node.prototype.timeout = function timeout() { | ||
Raft.prototype.timeout = function timeout() { | ||
var times = this.election; | ||
@@ -667,13 +691,17 @@ | ||
*/ | ||
Node.prototype.timing = function timing(latency) { | ||
if (Node.STOPPED === this.state) return false; | ||
Raft.prototype.timing = function timing(latency) { | ||
var raft = this | ||
, sum = 0 | ||
, i = 0; | ||
for (var i = 0, sum = 0; i < latency.length; i++) { | ||
if (Raft.STOPPED === raft.state) return false; | ||
for (; i < latency.length; i++) { | ||
sum += latency[i]; | ||
} | ||
this.latency = Math.floor(sum / latency.length); | ||
raft.latency = Math.floor(sum / latency.length); | ||
if (this.latency > this.election.min * this.threshold) { | ||
this.emit('threshold'); | ||
if (raft.latency > raft.election.min * raft.threshold) { | ||
raft.emit('threshold'); | ||
} | ||
@@ -689,11 +717,13 @@ | ||
* election for leadership. We increment our current term, set the CANDIDATE | ||
* state, vote our selfs and ask all others nodes to vote for us. | ||
* state, vote our selfs and ask all others rafts to vote for us. | ||
* | ||
* @returns {Node} | ||
* @returns {Raft} | ||
* @api private | ||
*/ | ||
Node.prototype.promote = function promote() { | ||
this.change({ | ||
state: Node.CANDIDATE, // We're now a candidate, | ||
term: this.term + 1, // but only for this term. | ||
Raft.prototype.promote = function promote() { | ||
var raft = this; | ||
raft.change({ | ||
state: Raft.CANDIDATE, // We're now a candidate, | ||
term: raft.term + 1, // but only for this term. | ||
leader: '' // We no longer have a leader. | ||
@@ -704,26 +734,26 @@ }); | ||
// Candidates are always biased and vote for them selfs first before sending | ||
// out a voting request to all other nodes in the cluster. | ||
// out a voting request to all other rafts in the cluster. | ||
// | ||
this.votes.for = this.address; | ||
this.votes.granted = 1; | ||
raft.votes.for = raft.address; | ||
raft.votes.granted = 1; | ||
// | ||
// Broadcast the voting request to all connected nodes in your private | ||
// Broadcast the voting request to all connected rafts in your private | ||
// cluster. | ||
// | ||
var packet = this.packet('vote') | ||
var packet = raft.packet('vote') | ||
, i = 0; | ||
this.message(Node.FOLLOWER, this.packet('vote')); | ||
raft.message(Raft.FOLLOWER, raft.packet('vote')); | ||
// | ||
// Set the election timeout. This gives the nodes some time to reach | ||
// Set the election timeout. This gives the rafts some time to reach | ||
// consensuses about who they want to vote for. If no consensus has been | ||
// reached within the set timeout we will attempt it again. | ||
// | ||
this.timers | ||
raft.timers | ||
.clear('heartbeat, election') | ||
.setTimeout('election', this.promote, this.timeout()); | ||
.setTimeout('election', raft.promote, raft.timeout()); | ||
return this; | ||
return raft; | ||
}; | ||
@@ -739,14 +769,15 @@ | ||
*/ | ||
Node.prototype.packet = function wrap(type, data) { | ||
var packet = { | ||
state: this.state, // So you know if we're a leader, candidate or follower. | ||
term: this.term, // Our current term so we can find mis matches. | ||
address: this.address, // Adress of the sender. | ||
type: type, // Message type. | ||
leader: this.leader, // Who is our leader. | ||
}; | ||
Raft.prototype.packet = function wrap(type, data) { | ||
var raft = this | ||
, packet = { | ||
state: raft.state, // Are we're a leader, candidate or follower. | ||
term: raft.term, // Our current term so we can find mis matches. | ||
address: raft.address, // Address of the sender. | ||
type: type, // Message type. | ||
leader: raft.leader, // Who is our leader. | ||
}; | ||
// | ||
// If we have logging and state replication enabled we also need to send this | ||
// additional data so we can use it determine the state of this node. | ||
// additional data so we can use it determine the state of this raft. | ||
// | ||
@@ -756,3 +787,3 @@ // @TODO point to index of last commit entry. | ||
// | ||
if (this.log) packet.last = { term: this.term, index: this.log.index }; | ||
if (raft.log) packet.last = { term: raft.term, index: raft.log.index }; | ||
if (arguments.length === 2) packet.data = data; | ||
@@ -769,15 +800,16 @@ | ||
* @param {Object} options Configuration that should override the default config. | ||
* @returns {Node} The newly created instance. | ||
* @returns {Raft} The newly created instance. | ||
* @api public | ||
*/ | ||
Node.prototype.clone = function clone(options) { | ||
Raft.prototype.clone = function clone(options) { | ||
options = options || {}; | ||
var node = { | ||
'Log': this.Log, | ||
'election max': this.election.max, | ||
'election min': this.election.min, | ||
'heartbeat': this.beat, | ||
'threshold': this.threshold, | ||
}, key; | ||
var raft = this | ||
, node = { | ||
'Log': raft.Log, | ||
'election max': raft.election.max, | ||
'election min': raft.election.min, | ||
'heartbeat': raft.beat, | ||
'threshold': raft.threshold, | ||
}, key; | ||
@@ -790,16 +822,18 @@ for (key in node) { | ||
return new this.constructor(options); | ||
return new raft.constructor(options); | ||
}; | ||
/** | ||
* A new node is about to join the cluster. So we need to upgrade the | ||
* configuration of every single node. | ||
* A new raft is about to join the cluster. So we need to upgrade the | ||
* configuration of every single raft. | ||
* | ||
* @param {String} address The address of the node that is connected. | ||
* @param {String} address The address of the raft that is connected. | ||
* @param {Function} write A method that we use to write data. | ||
* @returns {Node} The node we created and that joined our cluster. | ||
* @returns {Raft} The raft we created and that joined our cluster. | ||
* @api public | ||
*/ | ||
Node.prototype.join = function join(address, write) { | ||
if ('function' === this.type(address)) { | ||
Raft.prototype.join = function join(address, write) { | ||
var raft = this; | ||
if ('function' === raft.type(address)) { | ||
write = address; address = null; | ||
@@ -811,18 +845,18 @@ } | ||
// add a really simple address check here. Return nothing so people can actually | ||
// check if a node has been added. | ||
// check if a raft has been added. | ||
// | ||
if (this.address === address) return; | ||
if (raft.address === address) return; | ||
var node = this.clone({ | ||
var node = raft.clone({ | ||
write: write, // Optional function that receives our writes. | ||
address: address, // A custom address for the node we added. | ||
state: Node.CHILD // We are a node in the cluster. | ||
address: address, // A custom address for the raft we added. | ||
state: Raft.CHILD // We are a raft in the cluster. | ||
}); | ||
node.once('end', function end() { | ||
this.leave(node); | ||
}, this); | ||
raft.leave(node); | ||
}, raft); | ||
this.nodes.push(node); | ||
this.emit('join', node); | ||
raft.nodes.push(node); | ||
raft.emit('join', node); | ||
@@ -833,15 +867,16 @@ return node; | ||
/** | ||
* Remove a node from the cluster. | ||
* Remove a raft from the cluster. | ||
* | ||
* @param {String} address The address of the node that should be removed. | ||
* @returns {Node} The node that we removed. | ||
* @param {String} address The address of the raft that should be removed. | ||
* @returns {Raft} The raft that we removed. | ||
* @api public | ||
*/ | ||
Node.prototype.leave = function leave(address) { | ||
var index = -1 | ||
Raft.prototype.leave = function leave(address) { | ||
var raft = this | ||
, index = -1 | ||
, node; | ||
for (var i = 0; i < this.nodes.length; i++) { | ||
if (this.nodes[i] === address || this.nodes[i].address === address) { | ||
node = this.nodes[i]; | ||
for (var i = 0; i < raft.nodes.length; i++) { | ||
if (raft.nodes[i] === address || raft.nodes[i].address === address) { | ||
node = raft.nodes[i]; | ||
index = i; | ||
@@ -853,6 +888,6 @@ break; | ||
if (~index && node) { | ||
raft.nodes.splice(index, 1); | ||
if (node.end) node.end(); | ||
this.nodes.splice(index, 1); | ||
this.emit('leave', node); | ||
raft.emit('leave', node); | ||
} | ||
@@ -864,3 +899,3 @@ | ||
/** | ||
* This Node needs to be shut down. | ||
* This Raft needs to be shut down. | ||
* | ||
@@ -870,16 +905,18 @@ * @returns {Boolean} Successful destruction. | ||
*/ | ||
Node.prototype.end = Node.prototype.destroy = function end() { | ||
if (Node.STOPPED === this.state) return false; | ||
this.state = Node.STOPPED; | ||
Raft.prototype.end = Raft.prototype.destroy = function end() { | ||
var raft = this; | ||
if (this.nodes.length) for (var i = 0; i < this.nodes.length; i++) { | ||
this.leave(this.nodes[i]); | ||
if (Raft.STOPPED === raft.state) return false; | ||
raft.state = Raft.STOPPED; | ||
if (raft.nodes.length) for (var i = 0; i < raft.nodes.length; i++) { | ||
raft.leave(raft.nodes[i]); | ||
} | ||
this.emit('end'); | ||
this.timers.end(); | ||
this.removeAllListeners(); | ||
raft.emit('end'); | ||
raft.timers.end(); | ||
raft.removeAllListeners(); | ||
if (this.log) this.log.end(); | ||
this.timers = this.log = this.Log = this.beat = this.election = null; | ||
if (raft.log) raft.log.end(); | ||
raft.timers = raft.log = raft.Log = raft.beat = raft.election = null; | ||
@@ -892,2 +929,2 @@ return true; | ||
// | ||
module.exports = Node; | ||
module.exports = Raft; |
{ | ||
"name": "liferaft", | ||
"version": "0.1.0", | ||
"version": "0.1.1", | ||
"description": "Consensus protocol based on raft, it will one day save your live.", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "mocha --reporter spec --ui bdd test.js", | ||
"watch": "mocha --watch --reporter spec --ui bdd test.js", | ||
"coverage": "istanbul cover ./node_modules/.bin/_mocha -- --reporter spec --ui bdd test.js", | ||
"test-travis": "istanbul cover node_modules/.bin/_mocha --report lcovonly -- --reporter spec --ui bdd test.js" | ||
"100%": "istanbul check-coverage --statements 100 --functions 100 --lines 100 --branches 100", | ||
"test": "mocha test.js", | ||
"watch": "mocha --watch test.js", | ||
"coverage": "istanbul cover ./node_modules/.bin/_mocha -- test.js", | ||
"test-travis": "istanbul cover node_modules/.bin/_mocha --report lcovonly -- test.js" | ||
}, | ||
@@ -28,16 +29,18 @@ "repository": { | ||
"dependencies": { | ||
"emits": "1.0.x", | ||
"eventemitter3": "0.1.x", | ||
"emits": "3.0.x", | ||
"eventemitter3": "1.1.x", | ||
"extendible": "0.1.x", | ||
"immediate": "3.0.x", | ||
"immediate": "3.2.x", | ||
"millisecond": "0.0.x", | ||
"modification": "1.0.x", | ||
"one-time": "0.0.x", | ||
"tick-tock": "0.0.x" | ||
"tick-tock": "0.1.x" | ||
}, | ||
"devDependencies": { | ||
"assume": "0.0.x", | ||
"assume": "1.2.x", | ||
"diagnostics": "0.0.x", | ||
"istanbul": "0.3.x", | ||
"mocha": "2.0.x", | ||
"pre-commit": "0.0.x" | ||
"mocha": "2.2.x", | ||
"pre-commit": "1.0.x" | ||
} | ||
} |
109
README.md
@@ -35,3 +35,3 @@ # liferaft | ||
- [Extending](#extending) | ||
- [Initialization](#initialization) | ||
- [Transports](#transports) | ||
- [License](#license) | ||
@@ -89,2 +89,14 @@ | ||
```js | ||
var raft = new Raft({ | ||
'address': 'tcp://localhost:8089', | ||
'election min': '200 millisecond', | ||
'election max': '1 second' | ||
}); | ||
``` | ||
As you might have noticed we're using two different styles of passing in the | ||
address to the raft instance, as address property in the options and as first | ||
argument in the constructor. | ||
### Events | ||
@@ -178,3 +190,3 @@ | ||
According to section x.x of the Raft paper it's required that we retry sending | ||
According to section 5.3 of the Raft paper it's required that we retry sending | ||
the RPC messages until they succeed. This function will run the given `attempt` | ||
@@ -340,13 +352,55 @@ function until the received callback has been called successfully and within our | ||
### Initialization | ||
## Transports | ||
The library ships without transports by default. If we we're to implement this | ||
it would have made this library way to opinionated. You might want to leverage | ||
and existing infrastructure or library for messaging instead of going with our | ||
solution. There are only two methods you need to implement an `initialize` | ||
method and an `write` method. Both methods serve different use cases so we're | ||
going to take a closer look at both of them. | ||
### write | ||
```js | ||
var LifeBoat = LifeRaft.extend({ | ||
socket: null, | ||
write: function write(packet, callback) { | ||
if (!this.socket) this.socket = require('net').connect(this.address); | ||
this.socket.write(JSON.stringify(packet)); | ||
// More code here ;-) | ||
} | ||
}); | ||
``` | ||
There are a couple of things that we assume you implement in the write | ||
method: | ||
- **Message encoding** The packet that you receive is an JSON object but you | ||
have to decide how you're going transfer that over the write in the most | ||
efficient way for you. | ||
- **message resending** The Raft protocol states the messages that you write | ||
should be retried until indefinitely ([Raft 5.3][5.3]). There are already | ||
transports which do this automatically for you but if your's is missing this, | ||
the [LifeRaft#indefinitely()](#liferaftindefinitelyattempt-fn-timeout) is | ||
specifically written for this. | ||
### initialize | ||
When you extend the `LifeRaft` instance you can assign a special `initialize` | ||
method. This method will be called when our `LifeRaft` code has been fully | ||
initialized and we're ready to initialize your code. Please bare in mind that | ||
this is a synchronous invocation and that we will start heart beat timeout after | ||
the execution of the function. This method is ideal for implementing your own | ||
transport technology. The function is invoked with one argument, these are the | ||
options that were used to construct the instance. If no options were provided we | ||
will default to empty object so this argument is always an available. | ||
initialized and we're ready to initialize your code. The invocation type depends | ||
on the amount of arguments you specify in the function. | ||
- **synchronous**: Your function specifies less then 2 arguments, it will | ||
receive one argument which is the options object that was provided in the | ||
constructor. If no options were provided it will be an empty object. | ||
- **asynchronous**: Your function specifies 2 arguments, just like the | ||
synchronous execution it will receive the passed options as first argument but | ||
it will also receive a callback function as second argument. This callback | ||
should be executed once you're done with setting up your transport and you are | ||
ready to receive messages. The function follows an error first pattern so it | ||
receives an error as first argument it will emit the `error` event on the | ||
constructed instance. | ||
```js | ||
@@ -359,6 +413,27 @@ var LifeBoat = LifeRaft.extend({ | ||
}); | ||
// | ||
// Or in async mode: | ||
// | ||
var LifeBoat = LifeRaft.extend({ | ||
server: null, | ||
initialize: function initialize(options, fn) { | ||
this.server = require('net').createServer(function () { | ||
// Do stuff here to handle incoming connections etc. | ||
}.bind(this)); | ||
var next = require('one-time')(fn); | ||
this.server.once('listening', next); | ||
this.server.once('error', next); | ||
this.server.listen(this.address); | ||
} | ||
}) | ||
``` | ||
In parallel to the execution of your `initialize` method we also emit an | ||
`initialize` event. This receives the same amount of arguments. | ||
After your `initialize` method is called we will emit the `initialize` event. If | ||
your `initialize` method is asynchronous we will emit the event **after** the | ||
callback has been executed. Once the event is emitted we will start our timeout | ||
timers and hope that we will receive message in time. | ||
@@ -371,1 +446,13 @@ ## License | ||
[Browserify]: http://browserify.org/ | ||
[5]: https://github.com/unshiftio/liferaft/blob/master/raft.md#5-the-raft-consensus-algorithm | ||
[5.1]: https://github.com/unshiftio/liferaft/blob/master/raft.md#51-raft-basics | ||
[5.2]: https://github.com/unshiftio/liferaft/blob/master/raft.md#52-leader-election | ||
[5.3]: https://github.com/unshiftio/liferaft/blob/master/raft.md#53-log-replication | ||
[5.4]: https://github.com/unshiftio/liferaft/blob/master/raft.md#54-safety | ||
[5.4.1]: https://github.com/unshiftio/liferaft/blob/master/raft.md#541-election-restriction | ||
[5.4.2]: https://github.com/unshiftio/liferaft/blob/master/raft.md#542-committing-entries-from-previous-terms | ||
[5.4.3]: https://github.com/unshiftio/liferaft/blob/master/raft.md#543-safety-argument | ||
[5.5]: https://github.com/unshiftio/liferaft/blob/master/raft.md#55-follower-and-candidate-crashes | ||
[5.6]: https://github.com/unshiftio/liferaft/blob/master/raft.md#56-timing-and-availability | ||
[6]: https://github.com/unshiftio/liferaft/blob/master/raft.md#6-cluster-membership-changes | ||
[7]: https://github.com/unshiftio/liferaft/blob/master/raft.md#7-log-compaction |
87
test.js
@@ -94,2 +94,44 @@ /* istanbul ignore next */ | ||
}); | ||
it('async emits the initialize event once the initialize method is done', function (next) { | ||
var ready = false; | ||
var MyRaft = Raft.extend({ | ||
initialize: function initialize(options, init) { | ||
assume(options.custom).equals('options'); | ||
assume(ready).is.false(); | ||
setTimeout(function () { | ||
ready = true; | ||
init(); | ||
}, 100); | ||
} | ||
}); | ||
var raft = new MyRaft('foobar', { custom: 'options' }); | ||
raft.on('initialize', function () { | ||
assume(ready).is.true(); | ||
next(); | ||
}); | ||
}); | ||
it('emits error when the initialize fails', function (next) { | ||
var MyRaft = Raft.extend({ | ||
initialize: function initialize(options, init) { | ||
setTimeout(function () { | ||
init(new Error('Failure')); | ||
}, 100); | ||
} | ||
}); | ||
var raft = new MyRaft(); | ||
raft.on('error', function (err) { | ||
assume(err.message).equals('Failure'); | ||
next(); | ||
}); | ||
}); | ||
}); | ||
@@ -181,2 +223,34 @@ | ||
}); | ||
it('throws an error on undefined message', function () { | ||
assume(function () { | ||
raft.message(undefined, raft.packet('foo')); | ||
}).throws('Cannot send message to `undefined`'); | ||
}); | ||
it('runs the `when` callback with no errors', function (next) { | ||
var node = raft.join(function (data, callback) { | ||
callback(undefined, 'foo'); | ||
}); | ||
node.address = 'addr'; | ||
raft.message(Raft.FOLLOWER, raft.packet('foo'), function (err, data) { | ||
assume(err).equals(undefined); | ||
assume(data).deep.equals({ addr: 'foo' }); | ||
next(); | ||
}); | ||
}); | ||
it('runs the `when` callback with no errors', function (next) { | ||
var node = raft.join(function (data, callback) { | ||
callback('bar'); | ||
}); | ||
node.address = 'addr'; | ||
raft.message(Raft.FOLLOWER, raft.packet('foo'), function (err, data) { | ||
assume(err).deep.equals({ addr: 'bar' }); | ||
assume(data).deep.equals({}); | ||
next(); | ||
}); | ||
}); | ||
}); | ||
@@ -958,2 +1032,15 @@ | ||
}); | ||
describe('bugs', function () { | ||
it('correctly deletes nodes from the list on leave', function () { | ||
raft.join('1'); | ||
raft.join('2'); | ||
raft.join('3'); | ||
assume(raft.nodes.length).equals(3); | ||
raft.leave('2'); | ||
assume(raft.nodes.length).equals(2); | ||
}); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
124474
13
1901
453
8
3
+ Addedmillisecond@0.0.x
+ Addedmodification@1.0.x
+ Addedemits@3.0.0(transitive)
+ Addedeventemitter3@1.1.1(transitive)
+ Addedimmediate@3.2.3(transitive)
+ Addedmillisecond@0.1.2(transitive)
+ Addedmodification@1.0.0(transitive)
+ Addedtick-tock@0.1.6(transitive)
- Removedemits@1.0.2(transitive)
- Removedeventemitter3@0.1.6(transitive)
- Removedimmediate@3.0.6(transitive)
- Removedtick-tock@0.0.9(transitive)
Updatedemits@3.0.x
Updatedeventemitter3@1.1.x
Updatedimmediate@3.2.x
Updatedtick-tock@0.1.x