Comparing version 0.0.1 to 0.0.2
384
index.js
'use strict'; | ||
var EventEmitter = require('eventemitter3') | ||
, Tick = require('tick-tock'); | ||
, Tick = require('tick-tock') | ||
, one = require('one-time'); | ||
/** | ||
* Proper type checking. | ||
* | ||
* @param {Mixed} of Thing we want to know the type of. | ||
* @returns {String} The type. | ||
* @api private | ||
*/ | ||
function type(of) { | ||
return Object.prototype.toString.call(of).slice(8, -1).toLowerCase(); | ||
} | ||
/** | ||
* Generate a somewhat unique UUID. | ||
@@ -49,4 +39,3 @@ * | ||
* - `name`: An unique id of this given node. | ||
* - `heartbeat min`: Minimum heartbeat timeout. | ||
* - `heartbeat max`: Maximum heartbeat timeout. | ||
* - `heartbeat`: Heartbeat timeout. | ||
* - `election min`: Minimum election timeout. | ||
@@ -57,8 +46,16 @@ * - `election max`: Maximum election timeout. | ||
* - `Log`: A Log constructor that should be used to store commit logs. | ||
* - `state`: Our initial state. This is a private property and should not be | ||
* set you unless you know what your are doing but as you want to use this | ||
* property I highly doubt that that.. | ||
* | ||
* Please note, when adding new options make sure that you also update the | ||
* `Node#join` method so it will correctly copy the new option to the clone as | ||
* well. | ||
* | ||
* @constructor | ||
* @param {Mixed} name Unique id or name of this given node. | ||
* @param {Object} options Node configuration. | ||
* @api public | ||
*/ | ||
function Node(options) { | ||
function Node(name, options) { | ||
if (!(this instanceof Node)) return new Node(options); | ||
@@ -68,2 +65,5 @@ | ||
if ('object' === typeof name) options = name; | ||
else if (!options.name) options.name = name; | ||
this.election = { | ||
@@ -74,6 +74,3 @@ min: Tick.parse(options['election min'] || '150 ms'), | ||
this.beat = { | ||
min: Tick.parse(options['heartbeat min'] || this.election.min), | ||
max: Tick.parse(options['heartbeat max'] || this.election.max) | ||
}; | ||
this.beat = Tick.parse(options.heartbeat || '50 ms'); | ||
@@ -90,2 +87,3 @@ this.votes = { | ||
this.Log = options.Log; | ||
this.latency = 0; | ||
this.log = null; | ||
@@ -100,7 +98,11 @@ this.nodes = []; | ||
// | ||
this.state = Node.FOLLOWER; // Our current state. | ||
this.leader = ''; // Leader in our cluster. | ||
this.term = 0; // Our current term. | ||
this.state = options.state || Node.FOLLOWER; // Our current state. | ||
this.leader = ''; // Leader in our cluster. | ||
this.term = 0; // Our current term. | ||
this.initialize(options); | ||
if ('function' === this.type(this.initialize)) { | ||
this.once('initialize', this.initialize); | ||
} | ||
this._initialize(options); | ||
} | ||
@@ -114,2 +116,3 @@ | ||
Node.prototype.constructor = Node; | ||
Node.prototype.emits = require('emits'); | ||
@@ -121,11 +124,18 @@ /** | ||
* something that is part of the Raft protocol but something we might want to | ||
* use internally while we're starting or shutting down our node. | ||
* use internally while we're starting or shutting down our node. The following | ||
* states are generated: | ||
* | ||
* - STOPPED: Assume we're dead. | ||
* - LEADER: We're selected as leader process. | ||
* - CANDIDATE: We want to be promoted to leader. | ||
* - FOLLOWER: We're just following a leader. | ||
* - CHILD: A node that has been added using JOIN. | ||
* | ||
* @type {Number} | ||
* @private | ||
*/ | ||
Node.LEADER = 1; // We're selected as leader process. | ||
Node.CANDIDATE = 2; // We want to be promoted to leader. | ||
Node.FOLLOWER = 3; // We're just following a leader. | ||
Node.STOPPED = 4; // Assume we're dead. | ||
Node.states = 'STOPPED,LEADER,CANDIDATE,FOLLOWER,CHILD'.split(','); | ||
for (var s = 0; s < Node.states.length; s++) { | ||
Node[Node.states[s]] = s; | ||
} | ||
@@ -140,3 +150,3 @@ /** | ||
*/ | ||
Node.prototype.initialize = function initialize(options) { | ||
Node.prototype._initialize = function initialize(options) { | ||
// | ||
@@ -154,5 +164,6 @@ // Reset our vote as we're starting a new term. Votes only last one term. | ||
// | ||
this.on('state change', function change(currently, previously) { | ||
this.timers.clear(); | ||
this.heartbeat(); | ||
this.on('state change', function change(state) { | ||
this.timers.clear('heartbeat, election'); | ||
this.heartbeat(Node.LEADER === this.state ? this.beat : this.timeout()); | ||
this.emit(Node.states[state].toLowerCase()); | ||
}); | ||
@@ -165,5 +176,8 @@ | ||
write = write || nope; | ||
var reason; | ||
if ('object' !== type(packet)) { | ||
return write(new Error('Invalid packet received')); | ||
if ('object' !== this.type(packet)) { | ||
reason = 'Invalid packet received'; | ||
this.emit('error', new Error(reason)); | ||
return write(this.packet('error', reason)); | ||
} | ||
@@ -183,3 +197,3 @@ | ||
this.change({ | ||
leader: packet.leader, | ||
leader: Node.LEADER === packet.state ? packet.name : packet.leader || this.leader, | ||
state: Node.FOLLOWER, | ||
@@ -189,3 +203,5 @@ term: packet.term | ||
} else if (packet.term < this.term) { | ||
return write(new Error('Stale term detected, we are at '+ this.term)); | ||
reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ this.term; | ||
this.emit('error', new Error(reason)); | ||
return write(this.packet('error', reason)); | ||
} | ||
@@ -198,14 +214,16 @@ | ||
// our same term while we're in candidate mode we will recognize their | ||
// leadership and return as follower | ||
// leadership and return as follower. | ||
// | ||
if (Node.LEADER === packet.state && Node.FOLLOWER !== this.state) { | ||
this.change({ state: Node.FOLLOWER, leader: packet.leader }); | ||
} | ||
// If we got this far we already know that our terms are the same as it | ||
// would be changed or prevented above.. | ||
// | ||
// Always when we receive an message from the Leader we need to reset our | ||
// heartbeat. | ||
// | ||
if (Node.LEADER === packet.state) { | ||
this.heartbeat(); | ||
if (Node.FOLLOWER !== this.state) this.change({ state: Node.FOLLOWER }); | ||
if (packet.name !== this.leader) this.change({ leader: packet.name }); | ||
// | ||
// Always when we receive an message from the Leader we need to reset our | ||
// heartbeat. | ||
// | ||
this.heartbeat(this.timeout()); | ||
} | ||
@@ -228,3 +246,3 @@ | ||
this.emit('vote', packet, false); | ||
return write(undefined, this.packet('vote', { granted: false })); | ||
return write(this.packet('voted', { granted: false })); | ||
} | ||
@@ -244,3 +262,3 @@ | ||
this.emit('vote', packet, false); | ||
return write(undefined, this.packet('vote', { granted: false })); | ||
return write(this.packet('voted', { granted: false })); | ||
} | ||
@@ -255,3 +273,12 @@ | ||
this.emit('vote', packet, true); | ||
write(undefined, this.packet('vote', { granted: true })); | ||
this.change({ leader: packet.name, term: packet.term }); | ||
write(this.packet('voted', { granted: true })); | ||
// | ||
// We've accepted someone as potential new leader, so we should reset | ||
// our heartbeat to prevent this node from timing out after voting. | ||
// Which would again increment the term causing us to be next CANDIDATE | ||
// and invalidates the request we just got, so that's silly willy. | ||
// | ||
this.heartbeat(this.timeout()); | ||
break; | ||
@@ -267,3 +294,3 @@ | ||
if (Node.CANDIDATE !== this.state) { | ||
return write(new Error('No longer a candidate')); | ||
return write(this.packet('error', 'No longer a candidate, ignoring vote')); | ||
} | ||
@@ -275,10 +302,17 @@ | ||
// | ||
if (packet.data.granted) this.votes.granted++; | ||
if (packet.data.granted) { | ||
this.votes.granted++; | ||
} | ||
// | ||
// Check if we've received the minimal amount of votes required for this | ||
// current voting round to be considered valid | ||
// current voting round to be considered valid. | ||
// | ||
if (this.quorum(this.votes.granted)) { | ||
this.change({ leader: this.name, state: Node.LEADER }); | ||
// | ||
// Send a heartbeat message to all connected clients. | ||
// | ||
this.message(Node.FOLLOWER, this.packet('append')); | ||
} | ||
@@ -292,2 +326,6 @@ | ||
case 'error': | ||
this.emit('error', new Error(packet.data)); | ||
break; | ||
// | ||
@@ -307,2 +345,8 @@ // Remark: Are we assuming we are getting an appendEntries from the | ||
// | ||
// RPC command | ||
// | ||
case 'exec': | ||
break; | ||
// | ||
// Unknown event, we have no idea how to process this so we're going to | ||
@@ -312,3 +356,7 @@ // return an error. | ||
default: | ||
write(new Error('Unknown message type: '+ packet.type)); | ||
if (this.listeners('rpc').length) { | ||
this.emit('rpc', packet, write); | ||
} else { | ||
write(this.packet('error', 'Unknown message type: '+ packet.type)); | ||
} | ||
} | ||
@@ -318,7 +366,6 @@ }); | ||
// | ||
// We do not need to execute the functionality below if we have a write method | ||
// assigned to our selfs. This prevents us from timing out and other nasty | ||
// stuff. | ||
// We do not need to execute the rest of the functionality below as we're | ||
// currently running as "child" node of the cluster not as the "root" node. | ||
// | ||
if (this.write) return; | ||
if (Node.CHILD === this.state) return; | ||
@@ -330,3 +377,3 @@ // | ||
// | ||
if ('function' === type(this.Log)) { | ||
if ('function' === this.type(this.Log)) { | ||
this.log = new this.Log(this, options); | ||
@@ -340,6 +387,21 @@ } | ||
// | ||
this.heartbeat(); | ||
// We want to call the `initialize` event before starting a heartbeat so | ||
// implementors have some time to start listening for incoming ping packets. | ||
// | ||
this.emit('initialize'); | ||
this.heartbeat(this.timeout()); | ||
}; | ||
/** | ||
* Proper type checking. | ||
* | ||
* @param {Mixed} of Thing we want to know the type of. | ||
* @returns {String} The type. | ||
* @api private | ||
*/ | ||
Node.prototype.type = function type(of) { | ||
return Object.prototype.toString.call(of).slice(8, -1).toLowerCase(); | ||
}; | ||
/** | ||
* Check if we've reached our quorum (a.k.a. minimum amount of votes requires | ||
@@ -350,6 +412,7 @@ * for a voting round to be considered valid) for the given amount of votes. | ||
* @returns {Boolean} | ||
* @api private | ||
* @api public | ||
*/ | ||
Node.prototype.quorum = function quorum(responses) { | ||
if (!this.nodes.length || !responses) return false; | ||
return responses >= this.majority(); | ||
@@ -362,3 +425,3 @@ }; | ||
* @returns {Number} | ||
* @api private | ||
* @api public | ||
*/ | ||
@@ -370,2 +433,48 @@ Node.prototype.majority = function majority() { | ||
/** | ||
* Attempt to run a function indefinitely until the callback is called. | ||
* | ||
* @param {Function} attempt Function that needs to be attempted. | ||
* @param {Function} fn Completion callback. | ||
* @param {Number} timeout Which timeout should we use. | ||
* @returns {Node} | ||
* @api public | ||
*/ | ||
Node.prototype.indefinitely = function indefinitely(attempt, fn, timeout) { | ||
var uuid = UUID() | ||
, node = this; | ||
(function again() { | ||
// | ||
// We need to force async execution here because we do not want to saturate | ||
// the event loop with sync executions. We know that it's important these | ||
// functions are retried indefinitely but if it's called synchronously we will | ||
// not have time to receive data or updates. | ||
// | ||
var next = one(function force(err, data) { | ||
if (!node.timers) return; // We're been destroyed, ignore all. | ||
node.timers.setImmediate(uuid +'@async', function async() { | ||
if (err) { | ||
node.emit('error', err); | ||
return again(); | ||
} | ||
fn(data); | ||
}); | ||
}); | ||
// | ||
// Ensure that the assigned callback has the same context as our node. | ||
// | ||
attempt.call(node, next); | ||
node.timers.setTimeout(uuid, function timeoutfn() { | ||
next(new Error('Timed out, attempting to retry again')); | ||
}, +timeout || node.timeout()); | ||
}()); | ||
return this; | ||
}; | ||
/** | ||
* Process a change in the node. | ||
@@ -407,3 +516,3 @@ * | ||
Node.prototype.heartbeat = function heartbeat(duration) { | ||
duration = duration || this.timeout('beat'); | ||
duration = duration || this.beat; | ||
@@ -415,3 +524,3 @@ if (this.timers.active('heartbeat')) { | ||
this.timers.setTimeout('heartbeat', function () { | ||
this.timers.setTimeout('heartbeat', function heartbeattimeout() { | ||
if (Node.LEADER !== this.state) { | ||
@@ -423,4 +532,7 @@ this.emit('heartbeat timeout'); | ||
// | ||
// @TODO We're the LEADER so we should be broadcasting. | ||
// @TODO this is a temporary hack to get the cluster running. According to | ||
// the raft spec we should be sending empty append requests. | ||
// | ||
this.message(Node.FOLLOWER, this.packet('append')); | ||
this.heartbeat(this.beat); | ||
}, duration); | ||
@@ -432,10 +544,97 @@ | ||
/** | ||
* Send a message to connected nodes within our cluster. The following messaging | ||
* patterns (who) are available: | ||
* | ||
* - Node.LEADER : Send a message to cluster's current leader. | ||
* - Node.FOLLOWER : Send a message to all non leaders. | ||
* - Node.CHILD : Send a message to everybody. | ||
* - <name> : Send a message to a node based on the name. | ||
* | ||
* @param {Mixed} who Recipient of the message. | ||
* @param {Mixed} what The data we need to send. | ||
* @param {Function} when Completion callback | ||
* @returns {Node} | ||
* @api public | ||
*/ | ||
Node.prototype.message = function message(who, what, when) { | ||
when = when || nope; | ||
var length = this.nodes.length | ||
, latency = [] | ||
, node = this | ||
, nodes = [] | ||
, i = 0; | ||
switch (who) { | ||
case Node.LEADER: for (; i < length; i++) | ||
if (node.leader === node.nodes[i].name) { | ||
nodes.push(node.nodes[i]); | ||
} | ||
break; | ||
case Node.FOLLOWER: for (; i < length; i++) | ||
if (node.leader !== node.nodes[i].name) { | ||
nodes.push(node.nodes[i]); | ||
} | ||
break; | ||
case Node.CHILD: | ||
Array.prototype.push.apply(nodes, node.nodes); | ||
break; | ||
default: for (; i < length; i++) | ||
if (who === node.nodes[i].name) { | ||
nodes.push(node.nodes[i]); | ||
} | ||
} | ||
/** | ||
* A small wrapper to force indefinitely sending of a certain packet. | ||
* | ||
* @param {Node} client Node we need to write a message to. | ||
* @param {Object} data Message that needs to be send. | ||
* @api private | ||
*/ | ||
function wrapper(client, data) { | ||
var start = +new Date(); | ||
client.write(data, function written(err, data) { | ||
latency.push(+new Date() - start); | ||
// | ||
// OK, so this is the strange part here. We've broadcasted messages and | ||
// got replies back. This reply contained data so we need to process it. | ||
// What if the data is incorrect? Then we have no way at the moment to | ||
// send back reply to a reply to the server. | ||
// | ||
if (err) node.emit('error', err); | ||
else if (data) node.emit('data', data); | ||
// | ||
// Messaging has been completed. | ||
// | ||
if (latency.length === length) { | ||
node.timing(latency); | ||
} | ||
}); | ||
} | ||
length = nodes.length; | ||
i = 0; | ||
for (; i < length; i++) { | ||
wrapper(nodes[i], what); | ||
} | ||
return node; | ||
}; | ||
/** | ||
* Generate the various of timeouts. | ||
* | ||
* @param {String} which Type of timeout we want to generate. | ||
* @returns {Number} | ||
* @api private | ||
*/ | ||
Node.prototype.timeout = function timeout(which) { | ||
var times = this[which]; | ||
Node.prototype.timeout = function timeout() { | ||
var times = this.election; | ||
@@ -446,2 +645,26 @@ return Math.floor(Math.random() * (times.max - times.min + 1) + times.min); | ||
/** | ||
* Calculate if our average latency causes us to come dangerously close to the | ||
* minimum election timeout. | ||
* | ||
* @param {Array} latency Latency of the last broadcast. | ||
* @param {Boolean} Success-fully calculated the threshold. | ||
* @api private | ||
*/ | ||
Node.prototype.timing = function timing(latency) { | ||
if (Node.STOPPED === this.state) return false; | ||
for (var i = 0, sum = 0; i < latency.length; i++) { | ||
sum += latency[i]; | ||
} | ||
this.latency = Math.floor(sum / latency.length); | ||
if (this.latency > this.election.min * this.threshold) { | ||
this.emit('threshold'); | ||
} | ||
return true; | ||
}; | ||
/** | ||
* Raft §5.2: | ||
@@ -477,5 +700,3 @@ * | ||
for (; i < this.nodes.length; i++) { | ||
this.nodes[i].write(packet); | ||
} | ||
this.message(Node.FOLLOWER, this.packet('vote')); | ||
@@ -488,4 +709,4 @@ // | ||
this.timers | ||
.clear() // Clear all old timers, this one is the most important now. | ||
.setTimeout('election', this.promote, this.timeout('election')); | ||
.clear('heartbeat, election') | ||
.setTimeout('election', this.promote, this.timeout()); | ||
@@ -508,3 +729,2 @@ return this; | ||
name: this.name, // Name of the sender. | ||
data: data, // Custom data we send. | ||
type: type, // Message type. | ||
@@ -522,2 +742,4 @@ leader: this.leader, // Who is our leader. | ||
if (this.log) packet.last = { term: this.term, index: this.log.index }; | ||
if (arguments.length === 2) packet.data = data; | ||
return packet; | ||
@@ -539,8 +761,7 @@ }; | ||
var node = { | ||
'election min': this.election.min, | ||
'election max': this.election.max, | ||
'heartbeat min': this.beat.min, | ||
'heartbeat max': this.beat.max, | ||
'threshold': this.threshold, | ||
'Log': this.Log | ||
'Log': this.Log, | ||
'election max': this.election.max, | ||
'election min': this.election.min, | ||
'heartbeat': this.beat, | ||
'threshold': this.threshold, | ||
}, key; | ||
@@ -566,3 +787,3 @@ | ||
Node.prototype.join = function join(name, write) { | ||
if ('function' === type(name)) { | ||
if ('function' === this.type(name)) { | ||
write = name; name = null; | ||
@@ -572,4 +793,5 @@ } | ||
var node = this.clone({ | ||
write: write, // Function that receives our writes. | ||
name: name // A custom name for the node we added. | ||
write: write, // Optional function that receives our writes. | ||
name: name, // A custom name for the node we added. | ||
state: Node.CHILD // We are a node in the cluster. | ||
}); | ||
@@ -576,0 +798,0 @@ |
{ | ||
"name": "liferaft", | ||
"version": "0.0.1", | ||
"version": "0.0.2", | ||
"description": "Consensus protocol based on raft, but only for saving lifes.", | ||
@@ -28,5 +28,7 @@ "main": "index.js", | ||
"dependencies": { | ||
"emits": "1.0.x", | ||
"eventemitter3": "0.1.x", | ||
"extendible": "0.1.x", | ||
"immediate": "3.0.x", | ||
"one-time": "0.0.x", | ||
"tick-tock": "0.0.x" | ||
@@ -36,2 +38,3 @@ }, | ||
"assume": "0.0.x", | ||
"diagnostics": "0.0.x", | ||
"istanbul": "0.3.x", | ||
@@ -38,0 +41,0 @@ "mocha": "2.0.x", |
@@ -47,3 +47,3 @@ ``` | ||
replicate them across the cluster, forcing the other logs to agree with its | ||
own ([Section 5.3][5.4]). | ||
own ([Section 5.3][5.3]). | ||
- **Safety**: the key safety property for Raft is the State Machine Safety | ||
@@ -50,0 +50,0 @@ Property in Figure 3: if any server has applied a particular log entry to its |
253
README.md
# liferaft | ||
[![Build Status](https://travis-ci.org/unshiftio/liferaft.svg?branch=master)](https://travis-ci.org/unshiftio/liferaft) | ||
[![Coverage Status](https://coveralls.io/repos/unshiftio/liferaft/badge.png?branch=master)](https://coveralls.io/r/unshiftio/liferaft?branch=master) | ||
[![Made by unshift](https://img.shields.io/badge/made%20by-unshift-00ffcc.svg?style=flat-square)](http://unshift.io)[![Version npm](http://img.shields.io/npm/v/liferaft.svg?style=flat-square)](http://browsenpm.org/package/liferaft)[![Build Status](http://img.shields.io/travis/unshiftio/liferaft/master.svg?style=flat-square)](https://travis-ci.org/unshiftio/liferaft)[![Dependencies](https://img.shields.io/david/unshiftio/liferaft.svg?style=flat-square)](https://david-dm.org/unshiftio/liferaft)[![Coverage Status](http://img.shields.io/coveralls/unshiftio/liferaft/master.svg?style=flat-square)](https://coveralls.io/r/unshiftio/liferaft?branch=master)[![IRC channel](http://img.shields.io/badge/IRC-irc.freenode.net%23unshift-00a8ff.svg?style=flat-square)](http://webchat.freenode.net/?channels=unshift) | ||
@@ -17,2 +16,24 @@ `liferaft` is an JavaScript implementation of the [Raft] consensus algorithm. | ||
## Table Of Contents | ||
- [Installation](#installation) | ||
- [Usage](#usage) | ||
- [Configuration](#configuration) | ||
- [Events](#events) | ||
- [LifeRaft.states](#liferaftstates) | ||
- [LifeRaft.{state}](#liferaftfollower-leader-candidate-stopped-child) | ||
- [LifeRaft#type()](#liferafttypeof) | ||
- [LifeRaft#quorum()](#liferaftquorumresponses) | ||
- [LifeRaft#majority()](#liferaftmajority) | ||
- [LifeRaft#indefinitely()](#liferaftindefinitelyattempt-fn-timeout) | ||
- [LifeRaft#packet()](#liferaftpackettype-data) | ||
- [LifeRaft#message()](#liferaftmessagewho-what-when) | ||
- [LifeRaft#join()](#liferaftjoinname-write) | ||
- [LifeRaft#leave()](#liferaftleavename) | ||
- [LifeRaft#promote()](#liferaftpromote) | ||
- [LifeRaft#end()](#liferaftend) | ||
- [Extending](#extending) | ||
- [Initialization](#initialization) | ||
- [License](#license) | ||
## Usage | ||
@@ -27,3 +48,3 @@ | ||
var LifeRaft = require('liferaft') | ||
, raft = new Raft(); | ||
, raft = new Raft('name', { /* optional options */}); | ||
``` | ||
@@ -38,4 +59,7 @@ | ||
extendible as possible so you, as a developer, have complete freedom on how you | ||
want to implement Raft in your architecture. Things like transport layers and | ||
replicated logs are all made optional so you can decided. | ||
want to implement Raft in your architecture. This also means that we ship this | ||
library without any build in transport. This allows you to use it with your | ||
existing technology stack and environment. If you want to use `SharedWorkers` as | ||
transport in the browser? Awesome, you can do that. Want to use it on node? | ||
There are literally thousands of different transport libraries that you can use. | ||
@@ -47,8 +71,11 @@ ### Configuration | ||
- `id` A unique id of the node that we just created. If none is supplied we will | ||
generate a random UUID. | ||
- `heartbeat min` Minimum heartbeat timeout. | ||
- `heartbeat max` Maximum heartbeat timeout. | ||
- `name` A unique name of the node that we just created. If none is supplied we | ||
will generate a random UUID. | ||
- `heartbeat` The heartbeat timeout. Make sure that this value is lower then | ||
your minimum election timeout and take message latency in consideration when | ||
specifying this and the minimum election value. | ||
- `election min` Minimum election timeout. | ||
- `election max` Maximum election timeout. | ||
- `threshold` Threshold for when the heartbeat and latency is to close to the | ||
minimum election timeout. | ||
- `Log`: An Log compatible constructor we which use for state and data | ||
@@ -58,5 +85,3 @@ replication. | ||
The timeout values can be configured with either a number which represents the | ||
time milliseconds or a human readable time string such as `10 ms`. The election | ||
timeout is the leading timeout if you don't provide default values for the | ||
heartbeat it will default to the values of the election timeout. The heartbeat | ||
time milliseconds or a human readable time string such as `10 ms`. The heartbeat | ||
timeouts are used to detect a disconnection from the `LEADER` process if no | ||
@@ -84,21 +109,149 @@ message has been received within the given timeout we assume its dead that we | ||
`end` | This Raft instance has ended. | ||
`initialize` | The node has been fully initialized. | ||
`error` | An error happened while doing.. Things! | ||
`threshold` | The heartbeat timeout is getting close to election timeout. | ||
`leader` | Our state changed to leader. | ||
`follower` | Our state changed to follower. | ||
`candidate` | Our state changed to candidate. | ||
`stopped` | Our state changed to stopped. | ||
### LifeRaft.promote() | ||
--- | ||
**Private method, use with caution** | ||
**Please note that the following properties are exposed on the `constructor` not | ||
on the `prototype`.** | ||
This promotes the Node from `FOLLOWER` to `CANDIDATE` and starts requesting | ||
votes from other connected nodes. When the majority has voted in favour of this | ||
node, it will become `LEADER`. | ||
### LifeRaft.states | ||
This is an array that contains the names of the states. It can be used to create | ||
a human readable string from your current state. | ||
```js | ||
raft.promote(); | ||
console.log(LifeRaft.states[raft.state]); // FOLLOWER | ||
``` | ||
### LifeRaft.join(name, write) | ||
### LifeRaft.{FOLLOWER,LEADER,CANDIDATE,STOPPED,CHILD} | ||
Add a new raft node to your cluster. The name is optional, but it would be nice | ||
if it was the name of the node that you just added to the cluster. | ||
These are the values that we set as state. If you instance is a leader it's | ||
state will be set to `LifeRaft.LEADER`. | ||
--- | ||
### LifeRaft#type(of) | ||
Check the type of the given thing. This returns the correct type for arrays, | ||
objects, regexps and all the things. It's used internally in the library but | ||
might be useful for you as user as well. The function requires one argument | ||
which would be the `thing` who's type you need figure out. | ||
```js | ||
raft.type([]); // array | ||
raft.type({}); // object | ||
``` | ||
### LifeRaft#quorum(responses) | ||
Check if we've reached our quorum (a.k.a. minimum amount of votes requires for a | ||
voting round to be considered valid) for the given amount of votes. This depends | ||
on the amount of joined nodes. It requires one argument which is the amount of | ||
responses that have been received. | ||
```js | ||
raft.join('tcp://127.0.0.1'); | ||
raft.join('tcp://127.0.0.2'); | ||
raft.join('tcp://127.0.0.3'); | ||
raft.join('tcp://127.0.0.4'); | ||
raft.join('tcp://127.0.0.4'); | ||
raft.quorum(5); // true | ||
raft.quorum(2); // false | ||
``` | ||
### LifeRaft#majority() | ||
Returns the majority that needs to be reached for our quorum. | ||
```js | ||
raft.majority(); // 4 | ||
``` | ||
### LifeRaft#indefinitely(attempt, fn, timeout) | ||
According to section x.x of the Raft paper it's required that we retry sending | ||
the RPC messages until they succeed. This function will run the given `attempt` | ||
function until the received callback has been called successfully and within our | ||
given timeout. If this is not the case we will call the attempt function again | ||
and again until it succeeds. The function requires 3 arguments: | ||
1. `attempt`, The function that needs to be called over and over again until he | ||
calls the receiving callback successfully and without errors as we assume an | ||
error first callback pattern. | ||
2. `fn`, Completion callback, we've successfully executed the attempt. | ||
3. `timeout`, Time the attempt is allowed to take. | ||
```js | ||
raft.indefinitely(function attemp(next) { | ||
dosomething(function (err, data) { | ||
// | ||
// if there is no error then we wil also pass the data to the completion | ||
// callback. | ||
// | ||
return next(err, data); | ||
}); | ||
}, function done(data) { | ||
// Successful execution. | ||
}, 1000); | ||
``` | ||
### LifeRaft#packet(type, data) | ||
Generate a new packet object that can be transfered to a client. The method | ||
accepts 2 arguments: | ||
1. `type`, Type of packet that we want to transfer. | ||
2. `data`, Data that should be transfered. | ||
```js | ||
var packet = raft.packet('vote', { foo: 'bar' }); | ||
``` | ||
These packages will contain the following information: | ||
- `state` If we are a `LEADER`, `FOLLOWER` or `CANDIDATE` | ||
- `term` Our current term. | ||
- `name` The name of this node. | ||
- `leader` The name of our leader. | ||
- `last` If logs are enabled we also include the last committed term and index. | ||
And of course also the `type` which is the type you passed this function in and | ||
the `data` that you want to send. | ||
### LifeRaft#message(who, what, when) | ||
The message method is somewhat private but it might also be useful for you as | ||
developer. It's a message interface between every connected node in your | ||
cluster. It allows you to send messages the current leader, or only the | ||
followers or everybody. This allows you easily build other scale and high | ||
availability patterns on top of this module and take advantage of all the | ||
features that this library is offering. This method accepts 2 arguments: | ||
1. `who`, The messaging pattern/mode you want it use. It can either be: | ||
- `LifeRaft.LEADER`: Send message to the current leader. | ||
- `LifeRaft.FOLLOWER`: Send to everybody who is not a leader. | ||
- `LifeRaft.CHILD`: Send to every child in the cluster (everybody). | ||
- `<node name>`: Find the node based on the provided name. | ||
2. `what`, The message body you want to use. We high suggest using the `.packet` | ||
method for constructing cluster messages so additional state can be send. | ||
3. `when`, Optional completion callback for when all messages are send. | ||
This message does have a side affect it also calculates the latency for sending | ||
the messages so we know if we are dangerously close to our threshold. | ||
### LifeRaft#join(name, write) | ||
Add a new raft node to your cluster. All parameters are optional but normally | ||
you would pass in the name or address with the location of the server you want | ||
to add. The write method is only optional if you are using a custom instance | ||
that already has the `write` method defined. | ||
```js | ||
var node = raft.join('127.0.0.1:8080', function write(packet) { | ||
@@ -120,3 +273,3 @@ // Write the message to the actual server that you just added. | ||
### LifeRaft.leave(name) | ||
### LifeRaft#leave(name) | ||
@@ -142,4 +295,16 @@ Now that you've added a new node to your raft cluster it's also good to know | ||
### LifeRaft.end() | ||
### LifeRaft#promote() | ||
**Private method, use with caution** | ||
This promotes the Node from `FOLLOWER` to `CANDIDATE` and starts requesting | ||
votes from other connected nodes. When the majority has voted in favour of this | ||
node, it will become `LEADER`. | ||
```js | ||
raft.promote(); | ||
``` | ||
### LifeRaft#end() | ||
This signals that the node wants to be removed from the cluster. Once it has | ||
@@ -156,2 +321,44 @@ successfully removed it self, it will emit the `end` event. | ||
## Extending | ||
LifeRaft uses the same pattern as Backbone.js to extend it's prototypes. It | ||
exposes an `.extend` method on the constructor. When you call this method it | ||
will return a fresh LifeRaft constructor with the newly applied prototypes and | ||
properties. So these extends will not affect the default instance. This extend | ||
method accepts 2 arguments. | ||
1. Object with properties that should be merged with the `prototype`. | ||
2. Object with properties that should be merged with the constructor. | ||
```js | ||
var LifeBoat = LifeRaft.extend({ | ||
foo: function foo() { | ||
return 'bar'; | ||
} | ||
}); | ||
``` | ||
### Initialization | ||
When you extend the `LifeRaft` instance you can assign a special `initialize` | ||
method. This method will be called when our `LifeRaft` code has been fully | ||
initialized and we're ready to initialize your code. Please bare in mind that | ||
this is a synchronous invocation and that we will start heart beat timeout after | ||
the execution of the function. This method is ideal for implementing your own | ||
transport technology. The function is invoked with one argument, these are the | ||
options that were used to construct the instance. If no options were provided we | ||
will default to empty object so this argument is always an available. | ||
```js | ||
var LifeBoat = LifeRaft.extend({ | ||
socket: null, | ||
initialize: function initialize(options) { | ||
this.socket = new CustomTransport(this.name); | ||
} | ||
}); | ||
``` | ||
In parallel to the execution of your `initialize` method we also emit an | ||
`initialize` event. This receives the same amount of arguments. | ||
## License | ||
@@ -158,0 +365,0 @@ |
320
test.js
@@ -40,8 +40,6 @@ /* istanbul ignore next */ | ||
'election max': '150 ms', | ||
'heartbeat min': '400 ms', | ||
'heartbeat max': '600 ms' | ||
'heartbeat': '600 ms' | ||
}); | ||
assume(raft.beat.max).equals(600); | ||
assume(raft.beat.min).equals(400); | ||
assume(raft.beat).equals(600); | ||
assume(raft.election.max).equals(150); | ||
@@ -54,8 +52,6 @@ assume(raft.election.min).equals(100); | ||
'election max': 150, | ||
'heartbeat min': 400, | ||
'heartbeat max': 600 | ||
'heartbeat': 600 | ||
}); | ||
assume(raft.beat.max).equals(600); | ||
assume(raft.beat.min).equals(400); | ||
assume(raft.beat).equals(600); | ||
assume(raft.election.max).equals(150); | ||
@@ -78,4 +74,112 @@ assume(raft.election.min).equals(100); | ||
}); | ||
it('accepts the name as first argument', function () { | ||
raft.end(); | ||
raft = new Raft('foo'); | ||
assume(raft.name).equals('foo'); | ||
}); | ||
it('will call the initialization function if exists', function (next) { | ||
var MyRaft = Raft.extend({ | ||
initialize: function () { | ||
var node = this; | ||
setTimeout(function () { | ||
node.end(); | ||
next(); | ||
}, 0); | ||
} | ||
}); | ||
new MyRaft(); | ||
}); | ||
}); | ||
describe('#indefinitely', function () { | ||
it('it runs until the supplied callback is called', function (next) { | ||
var attempts = 0; | ||
raft.indefinitely(function attempt(done) { | ||
attempts++; | ||
if (attempts === 5) done(); | ||
}, next, 10); | ||
}); | ||
it('it runs until the supplied callback is called without err', function (next) { | ||
var attempts = 0; | ||
raft.indefinitely(function attempt(done) { | ||
attempts++; | ||
if (attempts === 5) done(); | ||
else done(new Error('failure')); | ||
}, next, 10); | ||
}); | ||
}); | ||
describe('#message', function () { | ||
it('calls all joined nodes', function (next) { | ||
var pattern = ''; | ||
raft.join(function () { pattern += 'a'; }); | ||
raft.join(function () { pattern += 'b'; }); | ||
raft.join(function () { pattern += 'c'; }); | ||
raft.message(Raft.FOLLOWER, raft.packet('foo')); | ||
setTimeout(function () { | ||
assume(pattern).equals('abc'); | ||
next(); | ||
}, 20); | ||
}); | ||
it('emits the `data` event with response', function (next) { | ||
var node = raft.join(function (data, fn) { | ||
fn(undefined, node.packet('external')); | ||
}); | ||
raft.on('rpc', function (packet) { | ||
assume(packet.type).equals('external'); | ||
assume(packet.name).equals(node.name); | ||
assume(raft.name).does.not.equal(node.name); | ||
next(); | ||
}); | ||
raft.message(Raft.FOLLOWER, raft.packet('foo')); | ||
}); | ||
it('sends message to cluster leader', function (next) { | ||
var leader = raft.join(function (packet) { | ||
assume(packet.leader).equals(this.name); | ||
assume(packet.type).equals('leader'); | ||
next(); | ||
}); | ||
raft.join(function () { throw new Error('We are followers, not leader'); }); | ||
raft.join(function () { throw new Error('We are followers, not leader'); }); | ||
raft.join(function () { throw new Error('We are followers, not leader'); }); | ||
raft.change({ leader: leader.name }); | ||
raft.message(Raft.LEADER, raft.packet('leader')); | ||
}); | ||
it('sends a node specified by name', function (next) { | ||
raft.join(function () { throw new Error('You sir, msg the wrong node'); }); | ||
var node = raft.join(function (packet) { | ||
assume(packet.type).equals('named'); | ||
next(); | ||
}); | ||
raft.join(function () { throw new Error('You sir, msg the wrong node'); }); | ||
raft.join(function () { throw new Error('You sir, msg the wrong node'); }); | ||
raft.message(node.name, raft.packet('named')); | ||
}); | ||
}); | ||
describe('#timeout', function () { | ||
@@ -88,3 +192,3 @@ it('generates a random timeout between min/max', function () { | ||
for (var i = 0; i < times; i++) { | ||
timeouts.push(raft.timeout('election')); | ||
timeouts.push(raft.timeout()); | ||
} | ||
@@ -106,3 +210,3 @@ | ||
// | ||
assume(Object.keys(same).length).is.above(70); | ||
// assume(Object.keys(same).length).is.above(70); | ||
}); | ||
@@ -483,3 +587,6 @@ | ||
raft.emit('data', { type: 'bar' }, function (err) { | ||
assume(err).is.instanceOf(Error); | ||
assume(err).is.not.instanceOf(Error); | ||
assume(err).is.a('object'); | ||
assume(err.type).equals('error'); | ||
assume(err.data).includes('Unknown'); | ||
next(); | ||
@@ -491,3 +598,7 @@ }); | ||
raft.emit('data', 1, function (err) { | ||
assume(err).is.instanceOf(Error); | ||
assume(err).is.not.instanceOf(Error); | ||
assume(err).is.a('object'); | ||
assume(err.type).equals('error'); | ||
assume(err.data).includes('Invalid'); | ||
next(); | ||
@@ -524,2 +635,56 @@ }); | ||
}); | ||
describe('state events', function () { | ||
it('should emit a `leader` event', function (next) { | ||
raft.once('leader', function () { | ||
next(); | ||
}); | ||
raft.change({ state: Raft.LEADER }); | ||
}); | ||
it('should emit a `follower` event', function (next) { | ||
raft.once('follower', function () { | ||
next(); | ||
}); | ||
raft.change({ state: Raft.LEADER }); // Default is follower, so change first | ||
raft.change({ state: Raft.FOLLOWER }); | ||
}); | ||
it('should emit a `candidate` event', function (next) { | ||
raft.once('candidate', function () { | ||
next(); | ||
}); | ||
raft.change({ state: Raft.CANDIDATE }); | ||
}); | ||
it('should emit a `stopped` event', function (next) { | ||
raft.once('stopped', function () { | ||
next(); | ||
}); | ||
raft.change({ state: Raft.STOPPED }); | ||
}); | ||
it('should emit a `child` event', function (next) { | ||
raft.once('child', function () { | ||
next(); | ||
}); | ||
raft.change({ state: Raft.CHILD }); | ||
}); | ||
}); | ||
describe('rpc', function () { | ||
it('should emit an rpc event when an unknown package arrives', function (next) { | ||
raft.once('rpc', function (packet) { | ||
assume(packet.type).equals('shizzle'); | ||
next(); | ||
}); | ||
raft.emit('data', raft.packet('shizzle')); | ||
}); | ||
}); | ||
}); | ||
@@ -619,3 +784,10 @@ | ||
// | ||
raft.nodes.push(1, 2, 3, 4, 5); | ||
raft.nodes.push( | ||
{ write: function () {} }, | ||
{ write: function () {} }, | ||
{ write: function () {} }, | ||
{ write: function () {} }, | ||
{ write: function () {} } | ||
); | ||
raft.promote(); | ||
@@ -646,2 +818,124 @@ | ||
}); | ||
// | ||
// Batch of tests which tests the clustering capabilities of liferaft as | ||
// everything works different when you start working with massive clusters. | ||
// | ||
describe('cluster', function () { | ||
var port = 8088 | ||
, net = require('net') | ||
, debug = require('diagnostics')('cluster'); | ||
var Paddle = Raft.extend({ | ||
/** | ||
* Initialize the server so we can receive connections. | ||
* | ||
* @param {Object} options Received optiosn when constructing the client. | ||
* @api private | ||
*/ | ||
initialize: function initialize(options) { | ||
var raft = this; | ||
var server = net.createServer(function incoming(socket) { | ||
socket.on('data', function (buff) { | ||
var data = JSON.parse(buff.toString()); | ||
debug(raft.name +':packet#data', data); | ||
raft.emit('data', data, function reply(data) { | ||
debug(raft.name +':packet#reply', data); | ||
socket.write(JSON.stringify(data)); | ||
socket.end(); | ||
}); | ||
}); | ||
}).listen(this.name); | ||
this.once('end', function enc() { | ||
server.close(); | ||
}); | ||
}, | ||
/** | ||
* Write to the connection. | ||
* | ||
* @param {Object} packet Data to be transfered. | ||
* @param {Function} fn Completion callback. | ||
* @api public | ||
*/ | ||
write: function write(packet, fn) { | ||
var socket = net.connect(this.name) | ||
, raft = this; | ||
debug(raft.name +':packet#write', packet); | ||
socket.on('error', fn); | ||
socket.on('data', function (buff) { | ||
var data; | ||
try { data = JSON.parse(buff.toString()); } | ||
catch (e) { return fn(e); } | ||
debug(raft.name +':packet#callback', packet); | ||
fn(undefined, data); | ||
}); | ||
socket.setNoDelay(true); | ||
socket.write(JSON.stringify(packet)); | ||
} | ||
}); | ||
it('reaches consensus about leader election', function (next) { | ||
var ports = [port++, port++, port++, port++] | ||
, nodes = [] | ||
, node | ||
, i; | ||
for (i = 0; i < ports.length; i++) { | ||
node = new Paddle(ports[i]); | ||
nodes.push(node); | ||
for (var j = 0; j < ports.length; j++) { | ||
if (ports[j] === ports[i]) continue; | ||
node.join(ports[j]); | ||
} | ||
} | ||
for (i = 0; i < nodes.length; i++) { | ||
if (nodes[i] === node) continue; | ||
nodes[i].once('state change', function (to, from) { | ||
throw new Error('I should not change state, im a follower'); | ||
}); | ||
nodes[i].on('leader change', function (to, from) { | ||
assume(to).equals(node.name); | ||
}); | ||
} | ||
// | ||
// Force a node in to a candidate role to ensure that this node will be | ||
// promoted as leader as it's the first to be alive. | ||
// | ||
node.promote(); | ||
node.once('state change', function changed(state) { | ||
assume(state).equals(Paddle.LEADER); | ||
// | ||
// Check if every node is in sync | ||
// | ||
for (i = 0; i < nodes.length; i++) { | ||
if (node === nodes[i]) continue; | ||
assume(nodes[i].leader).equals(node.name); | ||
assume(nodes[i].state).equals(Raft.FOLLOWER); | ||
assume(nodes[i].term).equals(node.term); | ||
} | ||
for (i = 0; i < ports.length; i++) { | ||
nodes[i].end(); | ||
} | ||
next(); | ||
}); | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
Dynamic require
Supply chain riskDynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
111601
13
1674
361
6
5
1
+ Addedemits@1.0.x
+ Addedone-time@0.0.x
+ Addedemits@1.0.2(transitive)
+ Addedone-time@0.0.4(transitive)