Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

liferaft

Package Overview
Dependencies
Maintainers
2
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

liferaft - npm Package Compare versions

Comparing version 0.0.1 to 0.0.2

example/index.js

384

index.js
'use strict';
var EventEmitter = require('eventemitter3')
, Tick = require('tick-tock');
, Tick = require('tick-tock')
, one = require('one-time');
/**
* Proper type checking.
*
* @param {Mixed} of Thing we want to know the type of.
* @returns {String} The type.
* @api private
*/
function type(of) {
return Object.prototype.toString.call(of).slice(8, -1).toLowerCase();
}
/**
* Generate a somewhat unique UUID.

@@ -49,4 +39,3 @@ *

* - `name`: An unique id of this given node.
* - `heartbeat min`: Minimum heartbeat timeout.
* - `heartbeat max`: Maximum heartbeat timeout.
* - `heartbeat`: Heartbeat timeout.
* - `election min`: Minimum election timeout.

@@ -57,8 +46,16 @@ * - `election max`: Maximum election timeout.

* - `Log`: A Log constructor that should be used to store commit logs.
* - `state`: Our initial state. This is a private property and should not be
* set you unless you know what your are doing but as you want to use this
* property I highly doubt that that..
*
* Please note, when adding new options make sure that you also update the
* `Node#join` method so it will correctly copy the new option to the clone as
* well.
*
* @constructor
* @param {Mixed} name Unique id or name of this given node.
* @param {Object} options Node configuration.
* @api public
*/
function Node(options) {
function Node(name, options) {
if (!(this instanceof Node)) return new Node(options);

@@ -68,2 +65,5 @@

if ('object' === typeof name) options = name;
else if (!options.name) options.name = name;
this.election = {

@@ -74,6 +74,3 @@ min: Tick.parse(options['election min'] || '150 ms'),

this.beat = {
min: Tick.parse(options['heartbeat min'] || this.election.min),
max: Tick.parse(options['heartbeat max'] || this.election.max)
};
this.beat = Tick.parse(options.heartbeat || '50 ms');

@@ -90,2 +87,3 @@ this.votes = {

this.Log = options.Log;
this.latency = 0;
this.log = null;

@@ -100,7 +98,11 @@ this.nodes = [];

//
this.state = Node.FOLLOWER; // Our current state.
this.leader = ''; // Leader in our cluster.
this.term = 0; // Our current term.
this.state = options.state || Node.FOLLOWER; // Our current state.
this.leader = ''; // Leader in our cluster.
this.term = 0; // Our current term.
this.initialize(options);
if ('function' === this.type(this.initialize)) {
this.once('initialize', this.initialize);
}
this._initialize(options);
}

@@ -114,2 +116,3 @@

Node.prototype.constructor = Node;
Node.prototype.emits = require('emits');

@@ -121,11 +124,18 @@ /**

* something that is part of the Raft protocol but something we might want to
* use internally while we're starting or shutting down our node.
* use internally while we're starting or shutting down our node. The following
* states are generated:
*
* - STOPPED: Assume we're dead.
* - LEADER: We're selected as leader process.
* - CANDIDATE: We want to be promoted to leader.
* - FOLLOWER: We're just following a leader.
* - CHILD: A node that has been added using JOIN.
*
* @type {Number}
* @private
*/
Node.LEADER = 1; // We're selected as leader process.
Node.CANDIDATE = 2; // We want to be promoted to leader.
Node.FOLLOWER = 3; // We're just following a leader.
Node.STOPPED = 4; // Assume we're dead.
Node.states = 'STOPPED,LEADER,CANDIDATE,FOLLOWER,CHILD'.split(',');
for (var s = 0; s < Node.states.length; s++) {
Node[Node.states[s]] = s;
}

@@ -140,3 +150,3 @@ /**

*/
Node.prototype.initialize = function initialize(options) {
Node.prototype._initialize = function initialize(options) {
//

@@ -154,5 +164,6 @@ // Reset our vote as we're starting a new term. Votes only last one term.

//
this.on('state change', function change(currently, previously) {
this.timers.clear();
this.heartbeat();
this.on('state change', function change(state) {
this.timers.clear('heartbeat, election');
this.heartbeat(Node.LEADER === this.state ? this.beat : this.timeout());
this.emit(Node.states[state].toLowerCase());
});

@@ -165,5 +176,8 @@

write = write || nope;
var reason;
if ('object' !== type(packet)) {
return write(new Error('Invalid packet received'));
if ('object' !== this.type(packet)) {
reason = 'Invalid packet received';
this.emit('error', new Error(reason));
return write(this.packet('error', reason));
}

@@ -183,3 +197,3 @@

this.change({
leader: packet.leader,
leader: Node.LEADER === packet.state ? packet.name : packet.leader || this.leader,
state: Node.FOLLOWER,

@@ -189,3 +203,5 @@ term: packet.term

} else if (packet.term < this.term) {
return write(new Error('Stale term detected, we are at '+ this.term));
reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ this.term;
this.emit('error', new Error(reason));
return write(this.packet('error', reason));
}

@@ -198,14 +214,16 @@

// our same term while we're in candidate mode we will recognize their
// leadership and return as follower
// leadership and return as follower.
//
if (Node.LEADER === packet.state && Node.FOLLOWER !== this.state) {
this.change({ state: Node.FOLLOWER, leader: packet.leader });
}
// If we got this far we already know that our terms are the same as it
// would be changed or prevented above..
//
// Always when we receive an message from the Leader we need to reset our
// heartbeat.
//
if (Node.LEADER === packet.state) {
this.heartbeat();
if (Node.FOLLOWER !== this.state) this.change({ state: Node.FOLLOWER });
if (packet.name !== this.leader) this.change({ leader: packet.name });
//
// Always when we receive an message from the Leader we need to reset our
// heartbeat.
//
this.heartbeat(this.timeout());
}

@@ -228,3 +246,3 @@

this.emit('vote', packet, false);
return write(undefined, this.packet('vote', { granted: false }));
return write(this.packet('voted', { granted: false }));
}

@@ -244,3 +262,3 @@

this.emit('vote', packet, false);
return write(undefined, this.packet('vote', { granted: false }));
return write(this.packet('voted', { granted: false }));
}

@@ -255,3 +273,12 @@

this.emit('vote', packet, true);
write(undefined, this.packet('vote', { granted: true }));
this.change({ leader: packet.name, term: packet.term });
write(this.packet('voted', { granted: true }));
//
// We've accepted someone as potential new leader, so we should reset
// our heartbeat to prevent this node from timing out after voting.
// Which would again increment the term causing us to be next CANDIDATE
// and invalidates the request we just got, so that's silly willy.
//
this.heartbeat(this.timeout());
break;

@@ -267,3 +294,3 @@

if (Node.CANDIDATE !== this.state) {
return write(new Error('No longer a candidate'));
return write(this.packet('error', 'No longer a candidate, ignoring vote'));
}

@@ -275,10 +302,17 @@

//
if (packet.data.granted) this.votes.granted++;
if (packet.data.granted) {
this.votes.granted++;
}
//
// Check if we've received the minimal amount of votes required for this
// current voting round to be considered valid
// current voting round to be considered valid.
//
if (this.quorum(this.votes.granted)) {
this.change({ leader: this.name, state: Node.LEADER });
//
// Send a heartbeat message to all connected clients.
//
this.message(Node.FOLLOWER, this.packet('append'));
}

@@ -292,2 +326,6 @@

case 'error':
this.emit('error', new Error(packet.data));
break;
//

@@ -307,2 +345,8 @@ // Remark: Are we assuming we are getting an appendEntries from the

//
// RPC command
//
case 'exec':
break;
//
// Unknown event, we have no idea how to process this so we're going to

@@ -312,3 +356,7 @@ // return an error.

default:
write(new Error('Unknown message type: '+ packet.type));
if (this.listeners('rpc').length) {
this.emit('rpc', packet, write);
} else {
write(this.packet('error', 'Unknown message type: '+ packet.type));
}
}

@@ -318,7 +366,6 @@ });

//
// We do not need to execute the functionality below if we have a write method
// assigned to our selfs. This prevents us from timing out and other nasty
// stuff.
// We do not need to execute the rest of the functionality below as we're
// currently running as "child" node of the cluster not as the "root" node.
//
if (this.write) return;
if (Node.CHILD === this.state) return;

@@ -330,3 +377,3 @@ //

//
if ('function' === type(this.Log)) {
if ('function' === this.type(this.Log)) {
this.log = new this.Log(this, options);

@@ -340,6 +387,21 @@ }

//
this.heartbeat();
// We want to call the `initialize` event before starting a heartbeat so
// implementors have some time to start listening for incoming ping packets.
//
this.emit('initialize');
this.heartbeat(this.timeout());
};
/**
* Proper type checking.
*
* @param {Mixed} of Thing we want to know the type of.
* @returns {String} The type.
* @api private
*/
Node.prototype.type = function type(of) {
return Object.prototype.toString.call(of).slice(8, -1).toLowerCase();
};
/**
* Check if we've reached our quorum (a.k.a. minimum amount of votes requires

@@ -350,6 +412,7 @@ * for a voting round to be considered valid) for the given amount of votes.

* @returns {Boolean}
* @api private
* @api public
*/
Node.prototype.quorum = function quorum(responses) {
if (!this.nodes.length || !responses) return false;
return responses >= this.majority();

@@ -362,3 +425,3 @@ };

* @returns {Number}
* @api private
* @api public
*/

@@ -370,2 +433,48 @@ Node.prototype.majority = function majority() {

/**
* Attempt to run a function indefinitely until the callback is called.
*
* @param {Function} attempt Function that needs to be attempted.
* @param {Function} fn Completion callback.
* @param {Number} timeout Which timeout should we use.
* @returns {Node}
* @api public
*/
Node.prototype.indefinitely = function indefinitely(attempt, fn, timeout) {
var uuid = UUID()
, node = this;
(function again() {
//
// We need to force async execution here because we do not want to saturate
// the event loop with sync executions. We know that it's important these
// functions are retried indefinitely but if it's called synchronously we will
// not have time to receive data or updates.
//
var next = one(function force(err, data) {
if (!node.timers) return; // We're been destroyed, ignore all.
node.timers.setImmediate(uuid +'@async', function async() {
if (err) {
node.emit('error', err);
return again();
}
fn(data);
});
});
//
// Ensure that the assigned callback has the same context as our node.
//
attempt.call(node, next);
node.timers.setTimeout(uuid, function timeoutfn() {
next(new Error('Timed out, attempting to retry again'));
}, +timeout || node.timeout());
}());
return this;
};
/**
* Process a change in the node.

@@ -407,3 +516,3 @@ *

Node.prototype.heartbeat = function heartbeat(duration) {
duration = duration || this.timeout('beat');
duration = duration || this.beat;

@@ -415,3 +524,3 @@ if (this.timers.active('heartbeat')) {

this.timers.setTimeout('heartbeat', function () {
this.timers.setTimeout('heartbeat', function heartbeattimeout() {
if (Node.LEADER !== this.state) {

@@ -423,4 +532,7 @@ this.emit('heartbeat timeout');

//
// @TODO We're the LEADER so we should be broadcasting.
// @TODO this is a temporary hack to get the cluster running. According to
// the raft spec we should be sending empty append requests.
//
this.message(Node.FOLLOWER, this.packet('append'));
this.heartbeat(this.beat);
}, duration);

@@ -432,10 +544,97 @@

/**
* Send a message to connected nodes within our cluster. The following messaging
* patterns (who) are available:
*
* - Node.LEADER : Send a message to cluster's current leader.
* - Node.FOLLOWER : Send a message to all non leaders.
* - Node.CHILD : Send a message to everybody.
* - <name> : Send a message to a node based on the name.
*
* @param {Mixed} who Recipient of the message.
* @param {Mixed} what The data we need to send.
* @param {Function} when Completion callback
* @returns {Node}
* @api public
*/
Node.prototype.message = function message(who, what, when) {
when = when || nope;
var length = this.nodes.length
, latency = []
, node = this
, nodes = []
, i = 0;
switch (who) {
case Node.LEADER: for (; i < length; i++)
if (node.leader === node.nodes[i].name) {
nodes.push(node.nodes[i]);
}
break;
case Node.FOLLOWER: for (; i < length; i++)
if (node.leader !== node.nodes[i].name) {
nodes.push(node.nodes[i]);
}
break;
case Node.CHILD:
Array.prototype.push.apply(nodes, node.nodes);
break;
default: for (; i < length; i++)
if (who === node.nodes[i].name) {
nodes.push(node.nodes[i]);
}
}
/**
* A small wrapper to force indefinitely sending of a certain packet.
*
* @param {Node} client Node we need to write a message to.
* @param {Object} data Message that needs to be send.
* @api private
*/
function wrapper(client, data) {
var start = +new Date();
client.write(data, function written(err, data) {
latency.push(+new Date() - start);
//
// OK, so this is the strange part here. We've broadcasted messages and
// got replies back. This reply contained data so we need to process it.
// What if the data is incorrect? Then we have no way at the moment to
// send back reply to a reply to the server.
//
if (err) node.emit('error', err);
else if (data) node.emit('data', data);
//
// Messaging has been completed.
//
if (latency.length === length) {
node.timing(latency);
}
});
}
length = nodes.length;
i = 0;
for (; i < length; i++) {
wrapper(nodes[i], what);
}
return node;
};
/**
* Generate the various of timeouts.
*
* @param {String} which Type of timeout we want to generate.
* @returns {Number}
* @api private
*/
Node.prototype.timeout = function timeout(which) {
var times = this[which];
Node.prototype.timeout = function timeout() {
var times = this.election;

@@ -446,2 +645,26 @@ return Math.floor(Math.random() * (times.max - times.min + 1) + times.min);

/**
* Calculate if our average latency causes us to come dangerously close to the
* minimum election timeout.
*
* @param {Array} latency Latency of the last broadcast.
* @param {Boolean} Success-fully calculated the threshold.
* @api private
*/
Node.prototype.timing = function timing(latency) {
if (Node.STOPPED === this.state) return false;
for (var i = 0, sum = 0; i < latency.length; i++) {
sum += latency[i];
}
this.latency = Math.floor(sum / latency.length);
if (this.latency > this.election.min * this.threshold) {
this.emit('threshold');
}
return true;
};
/**
* Raft §5.2:

@@ -477,5 +700,3 @@ *

for (; i < this.nodes.length; i++) {
this.nodes[i].write(packet);
}
this.message(Node.FOLLOWER, this.packet('vote'));

@@ -488,4 +709,4 @@ //

this.timers
.clear() // Clear all old timers, this one is the most important now.
.setTimeout('election', this.promote, this.timeout('election'));
.clear('heartbeat, election')
.setTimeout('election', this.promote, this.timeout());

@@ -508,3 +729,2 @@ return this;

name: this.name, // Name of the sender.
data: data, // Custom data we send.
type: type, // Message type.

@@ -522,2 +742,4 @@ leader: this.leader, // Who is our leader.

if (this.log) packet.last = { term: this.term, index: this.log.index };
if (arguments.length === 2) packet.data = data;
return packet;

@@ -539,8 +761,7 @@ };

var node = {
'election min': this.election.min,
'election max': this.election.max,
'heartbeat min': this.beat.min,
'heartbeat max': this.beat.max,
'threshold': this.threshold,
'Log': this.Log
'Log': this.Log,
'election max': this.election.max,
'election min': this.election.min,
'heartbeat': this.beat,
'threshold': this.threshold,
}, key;

@@ -566,3 +787,3 @@

Node.prototype.join = function join(name, write) {
if ('function' === type(name)) {
if ('function' === this.type(name)) {
write = name; name = null;

@@ -572,4 +793,5 @@ }

var node = this.clone({
write: write, // Function that receives our writes.
name: name // A custom name for the node we added.
write: write, // Optional function that receives our writes.
name: name, // A custom name for the node we added.
state: Node.CHILD // We are a node in the cluster.
});

@@ -576,0 +798,0 @@

{
"name": "liferaft",
"version": "0.0.1",
"version": "0.0.2",
"description": "Consensus protocol based on raft, but only for saving lifes.",

@@ -28,5 +28,7 @@ "main": "index.js",

"dependencies": {
"emits": "1.0.x",
"eventemitter3": "0.1.x",
"extendible": "0.1.x",
"immediate": "3.0.x",
"one-time": "0.0.x",
"tick-tock": "0.0.x"

@@ -36,2 +38,3 @@ },

"assume": "0.0.x",
"diagnostics": "0.0.x",
"istanbul": "0.3.x",

@@ -38,0 +41,0 @@ "mocha": "2.0.x",

@@ -47,3 +47,3 @@ ```

replicate them across the cluster, forcing the other logs to agree with its
own ([Section 5.3][5.4]).
own ([Section 5.3][5.3]).
- **Safety**: the key safety property for Raft is the State Machine Safety

@@ -50,0 +50,0 @@ Property in Figure 3: if any server has applied a particular log entry to its

# liferaft
[![Build Status](https://travis-ci.org/unshiftio/liferaft.svg?branch=master)](https://travis-ci.org/unshiftio/liferaft)
[![Coverage Status](https://coveralls.io/repos/unshiftio/liferaft/badge.png?branch=master)](https://coveralls.io/r/unshiftio/liferaft?branch=master)
[![Made by unshift](https://img.shields.io/badge/made%20by-unshift-00ffcc.svg?style=flat-square)](http://unshift.io)[![Version npm](http://img.shields.io/npm/v/liferaft.svg?style=flat-square)](http://browsenpm.org/package/liferaft)[![Build Status](http://img.shields.io/travis/unshiftio/liferaft/master.svg?style=flat-square)](https://travis-ci.org/unshiftio/liferaft)[![Dependencies](https://img.shields.io/david/unshiftio/liferaft.svg?style=flat-square)](https://david-dm.org/unshiftio/liferaft)[![Coverage Status](http://img.shields.io/coveralls/unshiftio/liferaft/master.svg?style=flat-square)](https://coveralls.io/r/unshiftio/liferaft?branch=master)[![IRC channel](http://img.shields.io/badge/IRC-irc.freenode.net%23unshift-00a8ff.svg?style=flat-square)](http://webchat.freenode.net/?channels=unshift)

@@ -17,2 +16,24 @@ `liferaft` is an JavaScript implementation of the [Raft] consensus algorithm.

## Table Of Contents
- [Installation](#installation)
- [Usage](#usage)
- [Configuration](#configuration)
- [Events](#events)
- [LifeRaft.states](#liferaftstates)
- [LifeRaft.{state}](#liferaftfollower-leader-candidate-stopped-child)
- [LifeRaft#type()](#liferafttypeof)
- [LifeRaft#quorum()](#liferaftquorumresponses)
- [LifeRaft#majority()](#liferaftmajority)
- [LifeRaft#indefinitely()](#liferaftindefinitelyattempt-fn-timeout)
- [LifeRaft#packet()](#liferaftpackettype-data)
- [LifeRaft#message()](#liferaftmessagewho-what-when)
- [LifeRaft#join()](#liferaftjoinname-write)
- [LifeRaft#leave()](#liferaftleavename)
- [LifeRaft#promote()](#liferaftpromote)
- [LifeRaft#end()](#liferaftend)
- [Extending](#extending)
- [Initialization](#initialization)
- [License](#license)
## Usage

@@ -27,3 +48,3 @@

var LifeRaft = require('liferaft')
, raft = new Raft();
, raft = new Raft('name', { /* optional options */});
```

@@ -38,4 +59,7 @@

extendible as possible so you, as a developer, have complete freedom on how you
want to implement Raft in your architecture. Things like transport layers and
replicated logs are all made optional so you can decided.
want to implement Raft in your architecture. This also means that we ship this
library without any build in transport. This allows you to use it with your
existing technology stack and environment. If you want to use `SharedWorkers` as
transport in the browser? Awesome, you can do that. Want to use it on node?
There are literally thousands of different transport libraries that you can use.

@@ -47,8 +71,11 @@ ### Configuration

- `id` A unique id of the node that we just created. If none is supplied we will
generate a random UUID.
- `heartbeat min` Minimum heartbeat timeout.
- `heartbeat max` Maximum heartbeat timeout.
- `name` A unique name of the node that we just created. If none is supplied we
will generate a random UUID.
- `heartbeat` The heartbeat timeout. Make sure that this value is lower then
your minimum election timeout and take message latency in consideration when
specifying this and the minimum election value.
- `election min` Minimum election timeout.
- `election max` Maximum election timeout.
- `threshold` Threshold for when the heartbeat and latency is to close to the
minimum election timeout.
- `Log`: An Log compatible constructor we which use for state and data

@@ -58,5 +85,3 @@ replication.

The timeout values can be configured with either a number which represents the
time milliseconds or a human readable time string such as `10 ms`. The election
timeout is the leading timeout if you don't provide default values for the
heartbeat it will default to the values of the election timeout. The heartbeat
time milliseconds or a human readable time string such as `10 ms`. The heartbeat
timeouts are used to detect a disconnection from the `LEADER` process if no

@@ -84,21 +109,149 @@ message has been received within the given timeout we assume its dead that we

`end` | This Raft instance has ended.
`initialize` | The node has been fully initialized.
`error` | An error happened while doing.. Things!
`threshold` | The heartbeat timeout is getting close to election timeout.
`leader` | Our state changed to leader.
`follower` | Our state changed to follower.
`candidate` | Our state changed to candidate.
`stopped` | Our state changed to stopped.
### LifeRaft.promote()
---
**Private method, use with caution**
**Please note that the following properties are exposed on the `constructor` not
on the `prototype`.**
This promotes the Node from `FOLLOWER` to `CANDIDATE` and starts requesting
votes from other connected nodes. When the majority has voted in favour of this
node, it will become `LEADER`.
### LifeRaft.states
This is an array that contains the names of the states. It can be used to create
a human readable string from your current state.
```js
raft.promote();
console.log(LifeRaft.states[raft.state]); // FOLLOWER
```
### LifeRaft.join(name, write)
### LifeRaft.{FOLLOWER,LEADER,CANDIDATE,STOPPED,CHILD}
Add a new raft node to your cluster. The name is optional, but it would be nice
if it was the name of the node that you just added to the cluster.
These are the values that we set as state. If you instance is a leader it's
state will be set to `LifeRaft.LEADER`.
---
### LifeRaft#type(of)
Check the type of the given thing. This returns the correct type for arrays,
objects, regexps and all the things. It's used internally in the library but
might be useful for you as user as well. The function requires one argument
which would be the `thing` who's type you need figure out.
```js
raft.type([]); // array
raft.type({}); // object
```
### LifeRaft#quorum(responses)
Check if we've reached our quorum (a.k.a. minimum amount of votes requires for a
voting round to be considered valid) for the given amount of votes. This depends
on the amount of joined nodes. It requires one argument which is the amount of
responses that have been received.
```js
raft.join('tcp://127.0.0.1');
raft.join('tcp://127.0.0.2');
raft.join('tcp://127.0.0.3');
raft.join('tcp://127.0.0.4');
raft.join('tcp://127.0.0.4');
raft.quorum(5); // true
raft.quorum(2); // false
```
### LifeRaft#majority()
Returns the majority that needs to be reached for our quorum.
```js
raft.majority(); // 4
```
### LifeRaft#indefinitely(attempt, fn, timeout)
According to section x.x of the Raft paper it's required that we retry sending
the RPC messages until they succeed. This function will run the given `attempt`
function until the received callback has been called successfully and within our
given timeout. If this is not the case we will call the attempt function again
and again until it succeeds. The function requires 3 arguments:
1. `attempt`, The function that needs to be called over and over again until he
calls the receiving callback successfully and without errors as we assume an
error first callback pattern.
2. `fn`, Completion callback, we've successfully executed the attempt.
3. `timeout`, Time the attempt is allowed to take.
```js
raft.indefinitely(function attemp(next) {
dosomething(function (err, data) {
//
// if there is no error then we wil also pass the data to the completion
// callback.
//
return next(err, data);
});
}, function done(data) {
// Successful execution.
}, 1000);
```
### LifeRaft#packet(type, data)
Generate a new packet object that can be transfered to a client. The method
accepts 2 arguments:
1. `type`, Type of packet that we want to transfer.
2. `data`, Data that should be transfered.
```js
var packet = raft.packet('vote', { foo: 'bar' });
```
These packages will contain the following information:
- `state` If we are a `LEADER`, `FOLLOWER` or `CANDIDATE`
- `term` Our current term.
- `name` The name of this node.
- `leader` The name of our leader.
- `last` If logs are enabled we also include the last committed term and index.
And of course also the `type` which is the type you passed this function in and
the `data` that you want to send.
### LifeRaft#message(who, what, when)
The message method is somewhat private but it might also be useful for you as
developer. It's a message interface between every connected node in your
cluster. It allows you to send messages the current leader, or only the
followers or everybody. This allows you easily build other scale and high
availability patterns on top of this module and take advantage of all the
features that this library is offering. This method accepts 2 arguments:
1. `who`, The messaging pattern/mode you want it use. It can either be:
- `LifeRaft.LEADER`: Send message to the current leader.
- `LifeRaft.FOLLOWER`: Send to everybody who is not a leader.
- `LifeRaft.CHILD`: Send to every child in the cluster (everybody).
- `<node name>`: Find the node based on the provided name.
2. `what`, The message body you want to use. We high suggest using the `.packet`
method for constructing cluster messages so additional state can be send.
3. `when`, Optional completion callback for when all messages are send.
This message does have a side affect it also calculates the latency for sending
the messages so we know if we are dangerously close to our threshold.
### LifeRaft#join(name, write)
Add a new raft node to your cluster. All parameters are optional but normally
you would pass in the name or address with the location of the server you want
to add. The write method is only optional if you are using a custom instance
that already has the `write` method defined.
```js
var node = raft.join('127.0.0.1:8080', function write(packet) {

@@ -120,3 +273,3 @@ // Write the message to the actual server that you just added.

### LifeRaft.leave(name)
### LifeRaft#leave(name)

@@ -142,4 +295,16 @@ Now that you've added a new node to your raft cluster it's also good to know

### LifeRaft.end()
### LifeRaft#promote()
**Private method, use with caution**
This promotes the Node from `FOLLOWER` to `CANDIDATE` and starts requesting
votes from other connected nodes. When the majority has voted in favour of this
node, it will become `LEADER`.
```js
raft.promote();
```
### LifeRaft#end()
This signals that the node wants to be removed from the cluster. Once it has

@@ -156,2 +321,44 @@ successfully removed it self, it will emit the `end` event.

## Extending
LifeRaft uses the same pattern as Backbone.js to extend it's prototypes. It
exposes an `.extend` method on the constructor. When you call this method it
will return a fresh LifeRaft constructor with the newly applied prototypes and
properties. So these extends will not affect the default instance. This extend
method accepts 2 arguments.
1. Object with properties that should be merged with the `prototype`.
2. Object with properties that should be merged with the constructor.
```js
var LifeBoat = LifeRaft.extend({
foo: function foo() {
return 'bar';
}
});
```
### Initialization
When you extend the `LifeRaft` instance you can assign a special `initialize`
method. This method will be called when our `LifeRaft` code has been fully
initialized and we're ready to initialize your code. Please bare in mind that
this is a synchronous invocation and that we will start heart beat timeout after
the execution of the function. This method is ideal for implementing your own
transport technology. The function is invoked with one argument, these are the
options that were used to construct the instance. If no options were provided we
will default to empty object so this argument is always an available.
```js
var LifeBoat = LifeRaft.extend({
socket: null,
initialize: function initialize(options) {
this.socket = new CustomTransport(this.name);
}
});
```
In parallel to the execution of your `initialize` method we also emit an
`initialize` event. This receives the same amount of arguments.
## License

@@ -158,0 +365,0 @@

@@ -40,8 +40,6 @@ /* istanbul ignore next */

'election max': '150 ms',
'heartbeat min': '400 ms',
'heartbeat max': '600 ms'
'heartbeat': '600 ms'
});
assume(raft.beat.max).equals(600);
assume(raft.beat.min).equals(400);
assume(raft.beat).equals(600);
assume(raft.election.max).equals(150);

@@ -54,8 +52,6 @@ assume(raft.election.min).equals(100);

'election max': 150,
'heartbeat min': 400,
'heartbeat max': 600
'heartbeat': 600
});
assume(raft.beat.max).equals(600);
assume(raft.beat.min).equals(400);
assume(raft.beat).equals(600);
assume(raft.election.max).equals(150);

@@ -78,4 +74,112 @@ assume(raft.election.min).equals(100);

});
it('accepts the name as first argument', function () {
raft.end();
raft = new Raft('foo');
assume(raft.name).equals('foo');
});
it('will call the initialization function if exists', function (next) {
var MyRaft = Raft.extend({
initialize: function () {
var node = this;
setTimeout(function () {
node.end();
next();
}, 0);
}
});
new MyRaft();
});
});
describe('#indefinitely', function () {
it('it runs until the supplied callback is called', function (next) {
var attempts = 0;
raft.indefinitely(function attempt(done) {
attempts++;
if (attempts === 5) done();
}, next, 10);
});
it('it runs until the supplied callback is called without err', function (next) {
var attempts = 0;
raft.indefinitely(function attempt(done) {
attempts++;
if (attempts === 5) done();
else done(new Error('failure'));
}, next, 10);
});
});
describe('#message', function () {
it('calls all joined nodes', function (next) {
var pattern = '';
raft.join(function () { pattern += 'a'; });
raft.join(function () { pattern += 'b'; });
raft.join(function () { pattern += 'c'; });
raft.message(Raft.FOLLOWER, raft.packet('foo'));
setTimeout(function () {
assume(pattern).equals('abc');
next();
}, 20);
});
it('emits the `data` event with response', function (next) {
var node = raft.join(function (data, fn) {
fn(undefined, node.packet('external'));
});
raft.on('rpc', function (packet) {
assume(packet.type).equals('external');
assume(packet.name).equals(node.name);
assume(raft.name).does.not.equal(node.name);
next();
});
raft.message(Raft.FOLLOWER, raft.packet('foo'));
});
it('sends message to cluster leader', function (next) {
var leader = raft.join(function (packet) {
assume(packet.leader).equals(this.name);
assume(packet.type).equals('leader');
next();
});
raft.join(function () { throw new Error('We are followers, not leader'); });
raft.join(function () { throw new Error('We are followers, not leader'); });
raft.join(function () { throw new Error('We are followers, not leader'); });
raft.change({ leader: leader.name });
raft.message(Raft.LEADER, raft.packet('leader'));
});
it('sends a node specified by name', function (next) {
raft.join(function () { throw new Error('You sir, msg the wrong node'); });
var node = raft.join(function (packet) {
assume(packet.type).equals('named');
next();
});
raft.join(function () { throw new Error('You sir, msg the wrong node'); });
raft.join(function () { throw new Error('You sir, msg the wrong node'); });
raft.message(node.name, raft.packet('named'));
});
});
describe('#timeout', function () {

@@ -88,3 +192,3 @@ it('generates a random timeout between min/max', function () {

for (var i = 0; i < times; i++) {
timeouts.push(raft.timeout('election'));
timeouts.push(raft.timeout());
}

@@ -106,3 +210,3 @@

//
assume(Object.keys(same).length).is.above(70);
// assume(Object.keys(same).length).is.above(70);
});

@@ -483,3 +587,6 @@

raft.emit('data', { type: 'bar' }, function (err) {
assume(err).is.instanceOf(Error);
assume(err).is.not.instanceOf(Error);
assume(err).is.a('object');
assume(err.type).equals('error');
assume(err.data).includes('Unknown');
next();

@@ -491,3 +598,7 @@ });

raft.emit('data', 1, function (err) {
assume(err).is.instanceOf(Error);
assume(err).is.not.instanceOf(Error);
assume(err).is.a('object');
assume(err.type).equals('error');
assume(err.data).includes('Invalid');
next();

@@ -524,2 +635,56 @@ });

});
describe('state events', function () {
it('should emit a `leader` event', function (next) {
raft.once('leader', function () {
next();
});
raft.change({ state: Raft.LEADER });
});
it('should emit a `follower` event', function (next) {
raft.once('follower', function () {
next();
});
raft.change({ state: Raft.LEADER }); // Default is follower, so change first
raft.change({ state: Raft.FOLLOWER });
});
it('should emit a `candidate` event', function (next) {
raft.once('candidate', function () {
next();
});
raft.change({ state: Raft.CANDIDATE });
});
it('should emit a `stopped` event', function (next) {
raft.once('stopped', function () {
next();
});
raft.change({ state: Raft.STOPPED });
});
it('should emit a `child` event', function (next) {
raft.once('child', function () {
next();
});
raft.change({ state: Raft.CHILD });
});
});
describe('rpc', function () {
it('should emit an rpc event when an unknown package arrives', function (next) {
raft.once('rpc', function (packet) {
assume(packet.type).equals('shizzle');
next();
});
raft.emit('data', raft.packet('shizzle'));
});
});
});

@@ -619,3 +784,10 @@

//
raft.nodes.push(1, 2, 3, 4, 5);
raft.nodes.push(
{ write: function () {} },
{ write: function () {} },
{ write: function () {} },
{ write: function () {} },
{ write: function () {} }
);
raft.promote();

@@ -646,2 +818,124 @@

});
//
// Batch of tests which tests the clustering capabilities of liferaft as
// everything works different when you start working with massive clusters.
//
describe('cluster', function () {
var port = 8088
, net = require('net')
, debug = require('diagnostics')('cluster');
var Paddle = Raft.extend({
/**
* Initialize the server so we can receive connections.
*
* @param {Object} options Received optiosn when constructing the client.
* @api private
*/
initialize: function initialize(options) {
var raft = this;
var server = net.createServer(function incoming(socket) {
socket.on('data', function (buff) {
var data = JSON.parse(buff.toString());
debug(raft.name +':packet#data', data);
raft.emit('data', data, function reply(data) {
debug(raft.name +':packet#reply', data);
socket.write(JSON.stringify(data));
socket.end();
});
});
}).listen(this.name);
this.once('end', function enc() {
server.close();
});
},
/**
* Write to the connection.
*
* @param {Object} packet Data to be transfered.
* @param {Function} fn Completion callback.
* @api public
*/
write: function write(packet, fn) {
var socket = net.connect(this.name)
, raft = this;
debug(raft.name +':packet#write', packet);
socket.on('error', fn);
socket.on('data', function (buff) {
var data;
try { data = JSON.parse(buff.toString()); }
catch (e) { return fn(e); }
debug(raft.name +':packet#callback', packet);
fn(undefined, data);
});
socket.setNoDelay(true);
socket.write(JSON.stringify(packet));
}
});
it('reaches consensus about leader election', function (next) {
var ports = [port++, port++, port++, port++]
, nodes = []
, node
, i;
for (i = 0; i < ports.length; i++) {
node = new Paddle(ports[i]);
nodes.push(node);
for (var j = 0; j < ports.length; j++) {
if (ports[j] === ports[i]) continue;
node.join(ports[j]);
}
}
for (i = 0; i < nodes.length; i++) {
if (nodes[i] === node) continue;
nodes[i].once('state change', function (to, from) {
throw new Error('I should not change state, im a follower');
});
nodes[i].on('leader change', function (to, from) {
assume(to).equals(node.name);
});
}
//
// Force a node in to a candidate role to ensure that this node will be
// promoted as leader as it's the first to be alive.
//
node.promote();
node.once('state change', function changed(state) {
assume(state).equals(Paddle.LEADER);
//
// Check if every node is in sync
//
for (i = 0; i < nodes.length; i++) {
if (node === nodes[i]) continue;
assume(nodes[i].leader).equals(node.name);
assume(nodes[i].state).equals(Raft.FOLLOWER);
assume(nodes[i].term).equals(node.term);
}
for (i = 0; i < ports.length; i++) {
nodes[i].end();
}
next();
});
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc