Comparing version 0.0.0 to 0.0.1
'use strict'; | ||
/** | ||
* | ||
* @constructor | ||
* @param {LifeRaft} liferaft Liferaft instance. | ||
* @param {Object} options Optional configuration | ||
*/ | ||
function Append(liferaft, options) { | ||
if (!(this instanceof Append)) return new Append(liferaft, options); | ||
options = options || {}; | ||
this.raft = liferaft; | ||
this.liferaft = liferaft; | ||
} | ||
Append.prototype.entry = function entry() { | ||
return { | ||
term: this.raft.term; | ||
}; | ||
}; | ||
// | ||
// Expose the module. | ||
// | ||
module.exports = Append; |
472
index.js
@@ -7,2 +7,13 @@ 'use strict'; | ||
/** | ||
* Proper type checking. | ||
* | ||
* @param {Mixed} of Thing we want to know the type of. | ||
* @returns {String} The type. | ||
* @api private | ||
*/ | ||
function type(of) { | ||
return Object.prototype.toString.call(of).slice(8, -1).toLowerCase(); | ||
} | ||
/** | ||
* Generate a somewhat unique UUID. | ||
@@ -26,2 +37,10 @@ * | ||
/** | ||
* A nope function for when people don't want message acknowledgements. Because | ||
* they don't care about CAP. | ||
* | ||
* @api private | ||
*/ | ||
function nope() {} | ||
/** | ||
* Representation of a single node in the cluster. | ||
@@ -31,3 +50,3 @@ * | ||
* | ||
* - `id`: An unique id of this given node. | ||
* - `name`: An unique id of this given node. | ||
* - `heartbeat min`: Minimum heartbeat timeout. | ||
@@ -39,11 +58,10 @@ * - `heartbeat max`: Maximum heartbeat timeout. | ||
* timeout. | ||
* - `Log`: A Log constructor that should be used to store commit logs. | ||
* | ||
* @constructor | ||
* @param {Function} read Method will be called to receive a callback. | ||
* @param {Function} write Called when the node needs to communicate. | ||
* @param {Object} options Node configuration. | ||
* @api public | ||
*/ | ||
function Node(read, write, options) { | ||
if (!(this instanceof Node)) return new Node(read, write, options); | ||
function Node(options) { | ||
if (!(this instanceof Node)) return new Node(options); | ||
@@ -58,36 +76,46 @@ options = options || {}; | ||
this.beat = { | ||
min: Tick.parse(options['heartbeat min'] || '150 ms'), | ||
max: Tick.parse(options['heartbeat max'] || '300 ms') | ||
min: Tick.parse(options['heartbeat min'] || this.election.min), | ||
max: Tick.parse(options['heartbeat max'] || this.election.max) | ||
}; | ||
this.votes = { | ||
for: null, | ||
granted: 0 | ||
for: null, // Who did we vote for in this current term. | ||
granted: 0 // How many votes we're granted to us. | ||
}; | ||
this.write = this.write || options.write || null; | ||
this.threshold = options.threshold || 0.8; | ||
this.name = options.name || UUID(); | ||
this.timers = new Tick(); | ||
this._write = write; | ||
this.timers = new Tick(this); | ||
this.Log = options.Log; | ||
this.log = null; | ||
this.nodes = []; | ||
// | ||
// 5.2: When a server starts, it's always started as Follower and it will | ||
// remain in this state until receive a message from a Leader or Candidate. | ||
// Raft §5.2: | ||
// | ||
// When a server starts, it's always started as Follower and it will remain in | ||
// this state until receive a message from a Leader or Candidate. | ||
// | ||
this.state = Node.FOLLOWER; // Our current state. | ||
this.leader = null; // Leader in our cluster. | ||
this.leader = ''; // Leader in our cluster. | ||
this.term = 0; // Our current term. | ||
this.initialize(); | ||
read(this.emits('RPC')); | ||
this.initialize(options); | ||
} | ||
// | ||
// Add some sugar and spice and everything nice. Oh, and also inheritance. | ||
// | ||
Node.extend = require('extendible'); | ||
Node.prototype = new EventEmitter(); | ||
Node.prototype.emits = require('emits'); | ||
Node.prototype.constructor = Node; | ||
/** | ||
* The different states that a node can have. | ||
* Raft §5.1: | ||
* | ||
* A Node can be in only one of the various states. The stopped state is not | ||
* something that is part of the Raft protocol but something we might want to | ||
* use internally while we're starting or shutting down our node. | ||
* | ||
* @type {Number} | ||
@@ -102,11 +130,14 @@ * @private | ||
/** | ||
* Initialize the node. | ||
* Initialize the node and start listening to the various of events we're | ||
* emitting as we're quite chatty to provide the maximum amount of flexibility | ||
* and reconfigurability. | ||
* | ||
* @param {Object} options The configuration you passed in the constructor. | ||
* @api private | ||
*/ | ||
Node.prototype.initialize = function initialize() { | ||
Node.prototype.initialize = function initialize(options) { | ||
// | ||
// Reset our vote as we're starting a new term. Votes only last one term. | ||
// | ||
this.on('term change', function change() { | ||
// | ||
// Reset our vote as we're starting a new term. Votes only last one term. | ||
// | ||
this.votes.for = null; | ||
@@ -116,32 +147,65 @@ this.votes.granted = 0; | ||
this.on('RPC', function incoming(data) { | ||
if ('object' !== typeof data) return; /* Invalid data structure, G.T.F.O. */ | ||
// | ||
// Reset our times and start the heartbeat again. If we're promoted to leader | ||
// the heartbeat will automatically be broadcasted to users as well. | ||
// | ||
this.on('state change', function change(currently, previously) { | ||
this.timers.clear(); | ||
this.heartbeat(); | ||
}); | ||
// | ||
// Receive incoming messages and process them. | ||
// | ||
this.on('data', function incoming(packet, write) { | ||
write = write || nope; | ||
if ('object' !== type(packet)) { | ||
return write(new Error('Invalid packet received')); | ||
} | ||
// | ||
// We're waiting for votes to come in as we're promoted to a candidate but | ||
// a new node with a leadership role just send us a message. If his term is | ||
// greater then ours we will step down as candidate and acknowledge their | ||
// leadership. | ||
// Raft §5.1: | ||
// | ||
if ( | ||
Node.CANDIDATE === this.state | ||
&& Node.LEADER === this.state | ||
) { | ||
if (data.term >= this.term) { | ||
this.change({ | ||
state: Node.FOLLOWER, | ||
term: data.term | ||
}); | ||
} | ||
else return; /* We need to ignore the RPC as it's in an incorrect state */ | ||
// Applies to all states. If a response contains a higher term then our | ||
// current term need to change our state to FOLLOWER and set the received | ||
// term. | ||
// | ||
// If the node receives a request with a stale term number it should be | ||
// rejected. | ||
// | ||
if (packet.term > this.term) { | ||
this.change({ | ||
leader: packet.leader, | ||
state: Node.FOLLOWER, | ||
term: packet.term | ||
}); | ||
} else if (packet.term < this.term) { | ||
return write(new Error('Stale term detected, we are at '+ this.term)); | ||
} | ||
switch (data.type) { | ||
case 'heartbeat': | ||
if (Node.LEADER === data.state) { | ||
this.heartbeat(data.data); | ||
} | ||
break; | ||
// | ||
// Raft §5.2: | ||
// | ||
// If we receive a message from someone who claims to be leader and shares | ||
// our same term while we're in candidate mode we will recognize their | ||
// leadership and return as follower | ||
// | ||
if (Node.LEADER === packet.state && Node.FOLLOWER !== this.state) { | ||
this.change({ state: Node.FOLLOWER, leader: packet.leader }); | ||
} | ||
// | ||
// Always when we receive an message from the Leader we need to reset our | ||
// heartbeat. | ||
// | ||
if (Node.LEADER === packet.state) { | ||
this.heartbeat(); | ||
} | ||
switch (packet.type) { | ||
// | ||
// Raft §5.2: | ||
// Raft §5.4: | ||
// | ||
// A node asked us to vote on them. We can only vote to them if they | ||
@@ -151,2 +215,34 @@ // represent a higher term (and last log term, last log index). | ||
case 'vote': | ||
// | ||
// The term of the vote is bigger then ours so we need to update it. If | ||
// it's the same and we already voted, we need to deny the vote. | ||
// | ||
if (this.votes.for && this.votes.for !== packet.name) { | ||
this.emit('vote', packet, false); | ||
return write(undefined, this.packet('vote', { granted: false })); | ||
} | ||
// | ||
// If we maintain a log, check if the candidates log is as up to date as | ||
// ours. | ||
// | ||
// @TODO point to index of last commit entry. | ||
// @TODO point to term of last commit entry. | ||
// | ||
if (this.log && packet.last && ( | ||
this.log.index > packet.last.index | ||
|| this.term > packet.last.term | ||
)) { | ||
this.emit('vote', packet, false); | ||
return write(undefined, this.packet('vote', { granted: false })); | ||
} | ||
// | ||
// We've made our decision, we haven't voted for this term yet and this | ||
// candidate came in first so it gets our vote as all requirements are | ||
// met. | ||
// | ||
this.votes.for = packet.name; | ||
this.emit('vote', packet, true); | ||
write(undefined, this.packet('vote', { granted: true })); | ||
break; | ||
@@ -159,20 +255,98 @@ | ||
// | ||
// Only accepts votes while we're still in a CANDIDATE state. | ||
// | ||
if (Node.CANDIDATE !== this.state) { | ||
return write(new Error('No longer a candidate')); | ||
} | ||
// | ||
// Increment our received votes when our voting request has been | ||
// granted by the node that received the data. | ||
// | ||
if (packet.data.granted) this.votes.granted++; | ||
// | ||
// Check if we've received the minimal amount of votes required for this | ||
// current voting round to be considered valid | ||
// | ||
if (this.votes.granted === (this.nodes.length / 2) + 1) { | ||
this.change({ | ||
leader: this.name, | ||
state: Node.LEADER | ||
}); | ||
if (this.quorum(this.votes.granted)) { | ||
this.change({ leader: this.name, state: Node.LEADER }); | ||
} | ||
// | ||
// Empty write, nothing to do. | ||
// | ||
write(); | ||
break; | ||
case 'rpc': | ||
// | ||
// Remark: Are we assuming we are getting an appendEntries from the | ||
// leader and comparing and appending our log? | ||
// | ||
case 'append': | ||
break; | ||
// | ||
// Remark: So does this get emit when we need to write our OWN log? | ||
// | ||
case 'log': | ||
break; | ||
// | ||
// Unknown event, we have no idea how to process this so we're going to | ||
// return an error. | ||
// | ||
default: | ||
write(new Error('Unknown message type: '+ packet.type)); | ||
} | ||
}); | ||
// | ||
// We do not need to execute the functionality below if we have a write method | ||
// assigned to our selfs. This prevents us from timing out and other nasty | ||
// stuff. | ||
// | ||
if (this.write) return; | ||
// | ||
// Setup the log & appends. Assume that if we're given a function log that it | ||
// needs to be initialized as it requires access to our node instance so it | ||
// can read our information like our leader, state, term etc. | ||
// | ||
if ('function' === type(this.Log)) { | ||
this.log = new this.Log(this, options); | ||
} | ||
// | ||
// The node is now listening to events so we can start our heartbeat timeout. | ||
// So that if we don't hear anything from a leader we can promote our selfs to | ||
// a candidate state. | ||
// | ||
this.heartbeat(); | ||
}; | ||
/** | ||
* Check if we've reached our quorum (a.k.a. minimum amount of votes requires | ||
* for a voting round to be considered valid) for the given amount of votes. | ||
* | ||
* @param {Number} responses Amount of responses received. | ||
* @returns {Boolean} | ||
* @api private | ||
*/ | ||
Node.prototype.quorum = function quorum(responses) { | ||
if (!this.nodes.length || !responses) return false; | ||
return responses >= this.majority(); | ||
}; | ||
/** | ||
* The majority required to reach our the quorum. | ||
* | ||
* @returns {Number} | ||
* @api private | ||
*/ | ||
Node.prototype.majority = function majority() { | ||
return Math.ceil(this.nodes.length / 2) + 1; | ||
}; | ||
/** | ||
* Process a change in the node. | ||
@@ -186,8 +360,14 @@ * | ||
var changes = ['term', 'leader', 'state'] | ||
, currently, previously | ||
, i = 0; | ||
if (!changed) return this; | ||
for (; i < changes.length; i++) { | ||
if (changes[i] in changed && changed[changes[i]] !== this[changes[i]]) { | ||
this[changes[i]] = changed[changes[i]]; | ||
this.emit(changes[i] +' change'); | ||
currently = changed[changes[i]]; | ||
previously = this[changes[i]]; | ||
this[changes[i]] = currently; | ||
this.emit(changes[i] +' change', currently, previously); | ||
} | ||
@@ -206,6 +386,6 @@ } | ||
* @returns {Node} | ||
* @api public | ||
* @api private | ||
*/ | ||
Node.prototype.heartbeat = function heartbeat(duration) { | ||
duration = duration || this.timeout('heartbeat'); | ||
duration = duration || this.timeout('beat'); | ||
@@ -218,8 +398,10 @@ if (this.timers.active('heartbeat')) { | ||
this.timers.setTimeout('heartbeat', function () { | ||
if (Node.LEADER !== this.state) return this.promote(); | ||
if (Node.LEADER !== this.state) { | ||
this.emit('heartbeat timeout'); | ||
return this.promote(); | ||
} | ||
// | ||
// We're the LEADER so we should be broadcasting. | ||
// @TODO We're the LEADER so we should be broadcasting. | ||
// | ||
this.broadcast('heartbeat'); | ||
}, duration); | ||
@@ -235,23 +417,21 @@ | ||
* @returns {Number} | ||
* @api public | ||
* @api private | ||
*/ | ||
Node.prototype.timeout = function timeout(which) { | ||
var times = this[which] || this.beat; | ||
var times = this[which]; | ||
return Math.min( | ||
Math.round((Math.random() * 1) * times.min), | ||
times.max | ||
); | ||
return Math.floor(Math.random() * (times.max - times.min + 1) + times.min); | ||
}; | ||
/** | ||
* Node detected a failure in the cluster and wishes to be promoted to new | ||
* master and promotes it self to candidate. | ||
* Raft §5.2: | ||
* | ||
* We've detected a timeout from the leaders heartbeats and need to start a new | ||
* election for leadership. We increment our current term, set the CANDIDATE | ||
* state, vote our selfs and ask all others nodes to vote for us. | ||
* | ||
* @returns {Node} | ||
* @api public | ||
* @api private | ||
*/ | ||
Node.prototype.promote = function promote() { | ||
if (Node.CANDIDATE === this.state) return this; | ||
this.change({ | ||
@@ -269,4 +449,23 @@ state: Node.CANDIDATE, // We're now a candidate, | ||
this.votes.granted = 1; | ||
this.broadcast('vote'); | ||
// | ||
// Broadcast the voting request to all connected nodes in your private | ||
// cluster. | ||
// | ||
var packet = this.packet('vote') | ||
, i = 0; | ||
for (; i < this.nodes.length; i++) { | ||
this.nodes[i].write(packet); | ||
} | ||
// | ||
// Set the election timeout. This gives the nodes some time to reach | ||
// consensuses about who they want to vote for. If no consensus has been | ||
// reached within the set timeout we will attempt it again. | ||
// | ||
this.timers | ||
.clear() // Clear all old timers, this one is the most important now. | ||
.setTimeout('election', this.promote, this.timeout('election')); | ||
return this; | ||
@@ -276,41 +475,113 @@ }; | ||
/** | ||
* Write out a message. | ||
* Wrap the outgoing messages in an object with additional required data. | ||
* | ||
* @param {String} type Message type we're trying to send. | ||
* @param {Mixed} data Data to be transfered. | ||
* @returns {Boolean} Successful write. | ||
* @returns {Object} Packet. | ||
* @api private | ||
*/ | ||
Node.prototype.packet = function wrap(type, data) { | ||
var packet = { | ||
state: this.state, // So you know if we're a leader, candidate or follower. | ||
term: this.term, // Our current term so we can find mis matches. | ||
name: this.name, // Name of the sender. | ||
data: data, // Custom data we send. | ||
type: type, // Message type. | ||
leader: this.leader, // Who is our leader. | ||
}; | ||
// | ||
// If we have logging and state replication enabled we also need to send this | ||
// additional data so we can use it determine the state of this node. | ||
// | ||
// @TODO point to index of last commit entry. | ||
// @TODO point to term of last commit entry. | ||
// | ||
if (this.log) packet.last = { term: this.term, index: this.log.index }; | ||
return packet; | ||
}; | ||
/** | ||
* Create a clone of the current instance with the same configuration. Ideally | ||
* for creating connected nodes in a cluster.. And let that be something we're | ||
* planning on doing. | ||
* | ||
* @param {Object} options Configuration that should override the default config. | ||
* @returns {Node} The newly created instance. | ||
* @api public | ||
*/ | ||
Node.prototype.write = function write(type, data) { | ||
return this._write(this.packet(type, data)); | ||
Node.prototype.clone = function clone(options) { | ||
options = options || {}; | ||
var node = { | ||
'election min': this.election.min, | ||
'election max': this.election.max, | ||
'heartbeat min': this.beat.min, | ||
'heartbeat max': this.beat.max, | ||
'threshold': this.threshold, | ||
'Log': this.Log | ||
}, key; | ||
for (key in node) { | ||
if (key in options || !node.hasOwnProperty(key)) continue; | ||
options[key] = node[key]; | ||
} | ||
return new this.constructor(options); | ||
}; | ||
/** | ||
* Broadcast a message. | ||
* A new node is about to join the cluster. So we need to upgrade the | ||
* configuration of every single node. | ||
* | ||
* @param {String} type Message type we're trying to send. | ||
* @param {Mixed} data Data to be transfered. | ||
* @returns {Boolean} Successful write. | ||
* @param {String} name The name of the node that is connected. | ||
* @param {Function} write A method that we use to write data. | ||
* @returns {Node} The node we created and that joined our cluster. | ||
* @api public | ||
*/ | ||
Node.prototype.broadcast = function broadcast(type, data) { | ||
var packet = this.packet(type, data); | ||
Node.prototype.join = function join(name, write) { | ||
if ('function' === type(name)) { | ||
write = name; name = null; | ||
} | ||
var node = this.clone({ | ||
write: write, // Function that receives our writes. | ||
name: name // A custom name for the node we added. | ||
}); | ||
node.once('end', function end() { | ||
this.leave(node); | ||
}, this); | ||
this.nodes.push(node); | ||
this.emit('join', node); | ||
return node; | ||
}; | ||
/** | ||
* Wrap the outgoing messages in an object with additional required data. | ||
* Remove a node from the cluster. | ||
* | ||
* @param {String} type Message type we're trying to send. | ||
* @param {Mixed} data Data to be transfered. | ||
* @returns {Object} Packet. | ||
* @api private | ||
* @returns {Node} The node that we removed. | ||
* @api public | ||
*/ | ||
Node.prototype.packet = function packet(type, data) { | ||
return { | ||
state: this.state, // So you know if we're a leader, candidate or follower | ||
term: this.term, // Our current term so we can find mis matches | ||
name: this.name, // Name of the sender. | ||
data: data, // Custom data we send. | ||
type: type // Message type. | ||
}; | ||
Node.prototype.leave = function leave(name) { | ||
var index = -1 | ||
, node; | ||
for (var i = 0; i < this.nodes.length; i++) { | ||
if (this.nodes[i] === name || this.nodes[i].name === name) { | ||
node = this.nodes[i]; | ||
index = i; | ||
break; | ||
} | ||
} | ||
if (~index && node) { | ||
if (node.end) node.end(); | ||
this.nodes.splice(index, 1); | ||
this.emit('leave', node); | ||
} | ||
return node; | ||
}; | ||
@@ -325,8 +596,15 @@ | ||
Node.prototype.end = function end() { | ||
if (!this.state) return false; | ||
if (Node.STOPPED === this.state) return false; | ||
this.state = Node.STOPPED; | ||
if (this.nodes.length) for (var i = 0; i < this.nodes.length; i++) { | ||
this.leave(this.nodes[i]); | ||
} | ||
this.emit('end'); | ||
this.timers.end(); | ||
this.removeAllListeners(); | ||
this.timers = this.state = this.write = this.read = null; | ||
if (this.log) this.log.end(); | ||
this.timers = this.log = this.Log = this.beat = this.election = null; | ||
@@ -333,0 +611,0 @@ return true; |
{ | ||
"name": "liferaft", | ||
"version": "0.0.0", | ||
"version": "0.0.1", | ||
"description": "Consensus protocol based on raft, but only for saving lifes.", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "echo \"Error: no test specified\" && exit 1" | ||
"test": "mocha --reporter spec --ui bdd test.js", | ||
"watch": "mocha --watch --reporter spec --ui bdd test.js", | ||
"coverage": "istanbul cover ./node_modules/.bin/_mocha -- --reporter spec --ui bdd test.js", | ||
"test-travis": "istanbul cover node_modules/.bin/_mocha --report lcovonly -- --reporter spec --ui bdd test.js" | ||
}, | ||
@@ -25,4 +28,6 @@ "repository": { | ||
"dependencies": { | ||
"emits": "1.0.x", | ||
"eventemitter3": "0.1.x" | ||
"eventemitter3": "0.1.x", | ||
"extendible": "0.1.x", | ||
"immediate": "3.0.x", | ||
"tick-tock": "0.0.x" | ||
}, | ||
@@ -29,0 +34,0 @@ "devDependencies": { |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No README
QualityPackage does not have a README. This may indicate a failed publish or a low quality package.
Found 1 instance in 1 package
No tests
QualityPackage does not have any tests. This is a strong signal of a poorly maintained or low quality package.
Found 1 instance in 1 package
83442
10
1145
1
1
154
4
5
1
+ Addedextendible@0.1.x
+ Addedimmediate@3.0.x
+ Addedtick-tock@0.0.x
+ Addedextendible@0.1.1(transitive)
+ Addedimmediate@3.0.6(transitive)
+ Addedmillisecond@0.0.1(transitive)
+ Addedtick-tock@0.0.9(transitive)
- Removedemits@1.0.x
- Removedemits@1.0.2(transitive)