Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

liferaft

Package Overview
Dependencies
Maintainers
4
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

liferaft - npm Package Compare versions

Comparing version 0.1.1 to 1.0.0

.gitattributes

66

example/index.js

@@ -1,8 +0,7 @@

'use strict';
var debug = require('diagnostics')('raft')
const debug = require('diagnostics')('raft')
, argv = require('argh').argv
, LifeRaft = require('../')
, msg;
, LifeRaft = require('../');
let msg;
if (argv.queue) msg = require(argv.queue);

@@ -15,10 +14,3 @@ else msg = require('axon');

//
var MsgRaft = LifeRaft.extend({
/**
* Reference our socket.
*
* @type {Msg}
* @private
*/
socket: null,
class MsgRaft extends LifeRaft {

@@ -31,19 +23,16 @@ /**

*/
initialize: function initialize(options) {
var raft = this
, socket;
initialize (options) {
debug('initializing reply socket on port %s', this.address);
debug('initializing reply socket on port %s', raft.address);
const socket = this.socket = msg.socket('rep');
socket = raft.socket = msg.socket('rep');
socket.bind(raft.address);
socket.on('message', function (data, fn) {
raft.emit('data', data, fn);
socket.bind(this.address);
socket.on('message', (data, fn) => {
this.emit('data', data, fn);
});
socket.on('error', function err() {
debug('failed to initialize on port: ', raft.address);
socket.on('error', () => {
debug('failed to initialize on port: ', this.address);
});
},
}

@@ -57,21 +46,18 @@ /**

*/
write: function write(packet, fn) {
var raft = this
, socket = raft.socket;
write (packet, fn) {
if (!this.socket) {
this.socket = msg.socket('req');
if (!socket) {
socket = raft.socket = msg.socket('req');
socket.connect(raft.address);
socket.on('error', function err() {
console.error('failed to write to: ', raft.address);
this.socket.connect(this.address);
this.socket.on('error', function err() {
console.error('failed to write to: ', this.address);
});
}
debug('writing packet to socket on port %s', raft.address);
socket.send(packet, function (data) {
debug('writing packet to socket on port %s', this.address);
this.socket.send(packet, (data) => {
fn(undefined, data);
});
}
});
}

@@ -83,3 +69,3 @@ //

//
var ports = [
const ports = [
8081, 8082,

@@ -99,3 +85,3 @@ 8083, 8084,

//
var raft = new MsgRaft('tcp://127.0.0.1:'+ port, {
const raft = new MsgRaft('tcp://127.0.0.1:'+ port, {
'election min': 2000,

@@ -133,3 +119,3 @@ 'election max': 5000,

//
ports.forEach(function join(nr) {
ports.forEach((nr) => {
if (!nr || port === nr) return;

@@ -136,0 +122,0 @@

@@ -26,5 +26,5 @@ {

"axon": "2.0.x",
"diagnostics": "0.0.x",
"nanomsg": "0.2.x"
"diagnostics": "1.1.x",
"nanomsg": "4.0.x"
}
}

@@ -1,4 +0,2 @@

'use strict';
var debug = require('diagnostics')('raft')
const debug = require('diagnostics')('raft')
, argv = require('argh').argv

@@ -12,10 +10,3 @@ , LifeRaft = require('../')

//
var TCPRaft = LifeRaft.extend({
/**
* Reference our socket.
*
* @type {Msg}
* @private
*/
socket: null,
class TCPRaft extends LifeRaft {

@@ -28,12 +19,12 @@ /**

*/
initialize: function initialize(options) {
var raft = this;
initialize (options) {
// var raft = this;
var server = net.createServer(function incoming(socket) {
socket.on('data', function (buff) {
const server = net.createServer((socket) => {
socket.on('data', buff => {
var data = JSON.parse(buff.toString());
debug(raft.address +':packet#data', data);
raft.emit('data', data, function reply(data) {
debug(raft.address +':packet#reply', data);
debug(this.address +':packet#data', data);
this.emit('data', data, data => {
debug(this.address +':packet#reply', data);
socket.write(JSON.stringify(data));

@@ -48,3 +39,3 @@ socket.end();

});
},
}

@@ -59,10 +50,9 @@ /**

*/
write: function write(packet, fn) {
var socket = net.connect(this.address)
, raft = this;
write (packet, fn) {
const socket = net.connect(this.address);
debug(raft.address +':packet#write', packet);
debug(this.address +':packet#write', packet);
socket.on('error', fn);
socket.on('data', function (buff) {
var data;
socket.on('data', buff => {
let data;

@@ -72,3 +62,3 @@ try { data = JSON.parse(buff.toString()); }

debug(raft.address +':packet#callback', packet);
debug(this.address +':packet#callback', packet);
fn(undefined, data);

@@ -80,3 +70,3 @@ });

}
});
}

@@ -88,3 +78,3 @@ //

//
var ports = [
const ports = [
8081, 8082,

@@ -98,3 +88,3 @@ 8083, 8084,

//
var port = +argv.port || ports[0];
const port = +argv.port || ports[0];

@@ -105,3 +95,3 @@ //

//
var raft = new TCPRaft(port, {
const raft = new TCPRaft(port, {
'election min': 2000,

@@ -112,7 +102,7 @@ 'election max': 5000,

raft.on('heartbeat timeout', function () {
raft.on('heartbeat timeout', () => {
debug('heart beat timeout, starting election');
});
raft.on('term change', function (to, from) {
raft.on('term change', (to, from) => {
debug('were now running on term %s -- was %s', to, from);

@@ -125,3 +115,3 @@ }).on('leader change', function (to, from) {

raft.on('leader', function () {
raft.on('leader', () => {
console.log('@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@');

@@ -132,3 +122,3 @@ console.log('I am elected as leader');

raft.on('candidate', function () {
raft.on('candidate', () => {
console.log('----------------------------------');

@@ -142,3 +132,3 @@ console.log('I am starting as candidate');

//
ports.forEach(function join(nr) {
ports.forEach(nr => {
if (!nr || port === nr) return;

@@ -145,0 +135,0 @@

@@ -1,8 +0,8 @@

'use strict';
const EventEmitter = require('eventemitter3');
const modification = require('modification');
const Tick = require('tick-tock');
const ms = require('millisecond');
const one = require('one-time');
const emits = require('emits');
var EventEmitter = require('eventemitter3')
, Tick = require('tick-tock')
, ms = require('millisecond')
, one = require('one-time');
/**

@@ -13,3 +13,3 @@ * Generate a somewhat unique UUID.

* @returns {String} UUID.
* @api private
* @private
*/

@@ -28,6 +28,14 @@ function UUID() {

/**
* Emit when modifications are made.
*
* @type {Fucntion}
* @private
*/
const change = require('modification')(' change');
/**
* A nope function for when people don't want message acknowledgements. Because
* they don't care about CAP.
*
* @api private
* @private
*/

@@ -59,436 +67,457 @@ function nope() {}

* @param {Object} options Raft configuration.
* @api public
* @public
*/
function Raft(address, options) {
var raft = this;
class Raft extends EventEmitter {
constructor(address, options = {}) {
super();
if (!(raft instanceof Raft)) return new Raft(options);
var raft = this;
options = options || {};
if ('object' === typeof address) options = address;
else if (!options.address) options.address = address;
if ('object' === typeof address) options = address;
else if (!options.address) options.address = address;
raft.election = {
min: ms(options['election min'] || '150 ms'),
max: ms(options['election max'] || '300 ms')
};
raft.election = {
min: ms(options['election min'] || '150 ms'),
max: ms(options['election max'] || '300 ms')
};
raft.beat = ms(options.heartbeat || '50 ms');
raft.beat = ms(options.heartbeat || '50 ms');
raft.votes = {
for: null, // Who did we vote for in this current term.
granted: 0 // How many votes we're granted to us.
};
raft.votes = {
for: null, // Who did we vote for in this current term.
granted: 0 // How many votes we're granted to us.
};
raft.write = raft.write || options.write || null;
raft.threshold = options.threshold || 0.8;
raft.address = options.address || UUID();
raft.timers = new Tick(raft);
raft.Log = options.Log;
raft.change = change;
raft.emits = emits;
raft.latency = 0;
raft.log = null;
raft.nodes = [];
raft.write = raft.write || options.write || null;
raft.threshold = options.threshold || 0.8;
raft.address = options.address || UUID();
raft.timers = new Tick(raft);
raft.Log = options.Log;
raft.latency = 0;
raft.log = null;
raft.nodes = [];
//
// Raft §5.2:
//
// When a server starts, it's always started as Follower and it will remain in
// this state until receive a message from a Leader or Candidate.
//
raft.state = options.state || Raft.FOLLOWER; // Our current state.
raft.leader = ''; // Leader in our cluster.
raft.term = 0; // Our current term.
//
// Raft §5.2:
//
// When a server starts, it's always started as Follower and it will remain in
// this state until receive a message from a Leader or Candidate.
//
raft.state = options.state || Raft.FOLLOWER; // Our current state.
raft.leader = ''; // Leader in our cluster.
raft.term = 0; // Our current term.
raft._initialize(options);
}
raft._initialize(options);
}
/**
* Initialize Raft and start listening to the various of events we're
* emitting as we're quite chatty to provide the maximum amount of flexibility
* and reconfigurability.
*
* @param {Object} options The configuration you passed in the constructor.
* @private
*/
_initialize(options) {
var raft = this;
//
// Add some sugar and spice and everything nice. Oh, and also inheritance.
//
Raft.extend = require('extendible');
Raft.prototype = new EventEmitter();
Raft.prototype.constructor = Raft;
//
// Add some methods which are best done using modules.
//
Raft.prototype.emits = require('emits');
Raft.prototype.change = require('modification')(' change');
/**
* Raft §5.1:
*
* A Raft can be in only one of the various states. The stopped state is not
* something that is part of the Raft protocol but something we might want to
* use internally while we're starting or shutting down our node. The following
* states are generated:
*
* - STOPPED: Assume we're dead.
* - LEADER: We're selected as leader process.
* - CANDIDATE: We want to be promoted to leader.
* - FOLLOWER: We're just following a leader.
* - CHILD: A node that has been added using JOIN.
*
* @type {Number}
* @private
*/
Raft.states = 'STOPPED,LEADER,CANDIDATE,FOLLOWER,CHILD'.split(',');
for (var s = 0; s < Raft.states.length; s++) {
Raft[Raft.states[s]] = s;
}
/**
* Initialize Raft and start listening to the various of events we're
* emitting as we're quite chatty to provide the maximum amount of flexibility
* and reconfigurability.
*
* @param {Object} options The configuration you passed in the constructor.
* @api private
*/
Raft.prototype._initialize = function initializing(options) {
var raft = this;
//
// Reset our vote as we're starting a new term. Votes only last one term.
//
raft.on('term change', function change() {
raft.votes.for = null;
raft.votes.granted = 0;
});
//
// Reset our times and start the heartbeat again. If we're promoted to leader
// the heartbeat will automatically be broadcasted to users as well.
//
raft.on('state change', function change(state) {
raft.timers.clear('heartbeat, election');
raft.heartbeat(Raft.LEADER === raft.state ? raft.beat : raft.timeout());
raft.emit(Raft.states[state].toLowerCase());
});
//
// Receive incoming messages and process them.
//
raft.on('data', function incoming(packet, write) {
write = write || nope;
var reason;
if ('object' !== raft.type(packet)) {
reason = 'Invalid packet received';
raft.emit('error', new Error(reason));
return write(raft.packet('error', reason));
}
//
// Raft §5.1:
// Reset our vote as we're starting a new term. Votes only last one term.
//
// Applies to all states. If a response contains a higher term then our
// current term need to change our state to FOLLOWER and set the received
// term.
raft.on('term change', function change() {
raft.votes.for = null;
raft.votes.granted = 0;
});
//
// If the raft receives a request with a stale term number it should be
// rejected.
// Reset our times and start the heartbeat again. If we're promoted to leader
// the heartbeat will automatically be broadcasted to users as well.
//
if (packet.term > raft.term) {
raft.change({
leader: Raft.LEADER === packet.state ? packet.address : packet.leader || raft.leader,
state: Raft.FOLLOWER,
term: packet.term
});
} else if (packet.term < raft.term) {
reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ raft.term;
raft.emit('error', new Error(reason));
raft.on('state change', function change(state) {
raft.timers.clear('heartbeat, election');
raft.heartbeat(Raft.LEADER === raft.state ? raft.beat : raft.timeout());
raft.emit(Raft.states[state].toLowerCase());
});
return write(raft.packet('error', reason));
}
//
// Raft §5.2:
// Receive incoming messages and process them.
//
// If we receive a message from someone who claims to be leader and shares
// our same term while we're in candidate mode we will recognize their
// leadership and return as follower.
//
// If we got this far we already know that our terms are the same as it
// would be changed or prevented above..
//
if (Raft.LEADER === packet.state) {
if (Raft.FOLLOWER !== raft.state) raft.change({ state: Raft.FOLLOWER });
if (packet.address !== raft.leader) raft.change({ leader: packet.address });
raft.on('data', async (packet, write) => {
write = write || nope;
var reason;
if ('object' !== raft.type(packet)) {
reason = 'Invalid packet received';
raft.emit('error', new Error(reason));
return write(await raft.packet('error', reason));
}
//
// Always when we receive an message from the Leader we need to reset our
// heartbeat.
// Raft §5.1:
//
raft.heartbeat(raft.timeout());
}
// Applies to all states. If a response contains a higher term then our
// current term need to change our state to FOLLOWER and set the received
// term.
//
// If the raft receives a request with a stale term number it should be
// rejected.
//
if (packet.term > raft.term) {
raft.change({
leader: Raft.LEADER === packet.state ? packet.address : packet.leader || raft.leader,
state: Raft.FOLLOWER,
term: packet.term
});
} else if (packet.term < raft.term) {
reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ raft.term;
raft.emit('error', new Error(reason));
switch (packet.type) {
return write(raft.packet('error', reason));
}
//
// Raft §5.2:
// Raft §5.4:
//
// A raft asked us to vote on them. We can only vote to them if they
// represent a higher term (and last log term, last log index).
// If we receive a message from someone who claims to be leader and shares
// our same term while we're in candidate mode we will recognize their
// leadership and return as follower.
//
case 'vote':
// If we got this far we already know that our terms are the same as it
// would be changed or prevented above..
//
if (Raft.LEADER === packet.state) {
if (Raft.FOLLOWER !== raft.state) raft.change({ state: Raft.FOLLOWER });
if (packet.address !== raft.leader) raft.change({ leader: packet.address });
//
// The term of the vote is bigger then ours so we need to update it. If
// it's the same and we already voted, we need to deny the vote.
// Always when we receive an message from the Leader we need to reset our
// heartbeat.
//
if (raft.votes.for && raft.votes.for !== packet.address) {
raft.emit('vote', packet, false);
raft.heartbeat(raft.timeout());
}
return write(raft.packet('voted', { granted: false }));
}
switch (packet.type) {
//
// If we maintain a log, check if the candidates log is as up to date as
// ours.
// Raft §5.2:
// Raft §5.4:
//
// @TODO point to index of last commit entry.
// @TODO point to term of last commit entry.
// A raft asked us to vote on them. We can only vote to them if they
// represent a higher term (and last log term, last log index).
//
if (raft.log && packet.last && (
raft.log.index > packet.last.index
|| raft.term > packet.last.term
)) {
raft.emit('vote', packet, false);
case 'vote':
//
// The term of the vote is bigger then ours so we need to update it. If
// it's the same and we already voted, we need to deny the vote.
//
if (raft.votes.for && raft.votes.for !== packet.address) {
raft.emit('vote', packet, false);
return write(raft.packet('voted', { granted: false }));
}
return write(await raft.packet('voted', { granted: false }));
}
//
// We've made our decision, we haven't voted for this term yet and this
// candidate came in first so it gets our vote as all requirements are
// met.
//
raft.votes.for = packet.address;
raft.emit('vote', packet, true);
raft.change({ leader: packet.address, term: packet.term });
write(raft.packet('voted', { granted: true }));
//
// If we maintain a log, check if the candidates log is as up to date as
// ours.
//
if (raft.log) {
const { index, term } = await raft.log.getLastInfo();
//
// We've accepted someone as potential new leader, so we should reset
// our heartbeat to prevent this raft from timing out after voting.
// Which would again increment the term causing us to be next CANDIDATE
// and invalidates the request we just got, so that's silly willy.
//
raft.heartbeat(raft.timeout());
break;
if (index > packet.last.index && term > packet.last.term) {
raft.emit('vote', packet, false);
//
// A new incoming vote.
//
case 'voted':
//
// Only accepts votes while we're still in a CANDIDATE state.
//
if (Raft.CANDIDATE !== raft.state) {
return write(raft.packet('error', 'No longer a candidate, ignoring vote'));
}
return write(await raft.packet('voted', { granted: false }));
}
}
//
// Increment our received votes when our voting request has been
// granted by the raft that received the data.
//
if (packet.data.granted) {
raft.votes.granted++;
}
//
// We've made our decision, we haven't voted for this term yet and this
// candidate came in first so it gets our vote as all requirements are
// met.
//
raft.votes.for = packet.address;
raft.emit('vote', packet, true);
raft.change({ leader: packet.address, term: packet.term });
write(await raft.packet('voted', { granted: true }));
//
// We've accepted someone as potential new leader, so we should reset
// our heartbeat to prevent this raft from timing out after voting.
// Which would again increment the term causing us to be next CANDIDATE
// and invalidates the request we just got, so that's silly willy.
//
raft.heartbeat(raft.timeout());
break;
//
// Check if we've received the minimal amount of votes required for this
// current voting round to be considered valid.
// A new incoming vote.
//
if (raft.quorum(raft.votes.granted)) {
raft.change({ leader: raft.address, state: Raft.LEADER });
case 'voted':
//
// Only accepts votes while we're still in a CANDIDATE state.
//
if (Raft.CANDIDATE !== raft.state) {
return write(await raft.packet('error', 'No longer a candidate, ignoring vote'));
}
//
// Send a heartbeat message to all connected clients.
// Increment our received votes when our voting request has been
// granted by the raft that received the data.
//
raft.message(Raft.FOLLOWER, raft.packet('append'));
}
if (packet.data.granted) {
raft.votes.granted++;
}
//
// Check if we've received the minimal amount of votes required for this
// current voting round to be considered valid.
//
if (raft.quorum(raft.votes.granted)) {
raft.change({ leader: raft.address, state: Raft.LEADER });
//
// Send a heartbeat message to all connected clients.
//
raft.message(Raft.FOLLOWER, await raft.packet('append'));
}
//
// Empty write, nothing to do.
//
write();
break;
case 'error':
raft.emit('error', new Error(packet.data));
break;
case 'append':
const {term, index} = await raft.log.getLastInfo();
// We do not have the last index as our last entry
// Look back in log in case we have it previously
// if we do remove any bad uncommitted entries following it
if (packet.last.index !== index && packet.last.index !== 0) {
const hasIndex = await raft.log.has(packet.last.index);
if (hasIndex) raft.log.removeEntriesAfter(packet.last.index);
else return raft.message(Raft.LEADER, await raft.packet('append fail', {
term: packet.last.term,
index: packet.last.index
}));
}
if (packet.data) {
const entry = packet.data[0];
await raft.log.saveCommand(entry.command, entry.term, entry.index);
raft.message(Raft.LEADER, await raft.packet('append ack', {
term: entry.term,
index: entry.index
}));
}
//if packet commit index not the same. Commit commands
if (raft.log.committedIndex < packet.last.committedIndex) {
const entries = await raft.log.getUncommittedEntriesUpToIndex(packet.last.committedIndex, packet.last.term);
raft.commitEntries(entries);
}
break;
case 'append ack':
const entry = await raft.log.commandAck(packet.data.index, packet.address);
if (raft.quorum(entry.responses.length) && !entry.committed) {
const entries = await raft.log.getUncommittedEntriesUpToIndex(entry.index, entry.term);
raft.commitEntries(entries);
}
break;
case 'append fail':
const previousEntry = await raft.log.get(packet.data.index);
const append = await raft.appendPacket(previousEntry);
write(append);
break;
//
// Empty write, nothing to do.
// RPC command
//
write();
break;
case 'exec':
break;
case 'error':
raft.emit('error', new Error(packet.data));
break;
//
// Unknown event, we have no idea how to process this so we're going to
// return an error.
//
default:
if (raft.listeners('rpc').length) {
raft.emit('rpc', packet, write);
} else {
write(await raft.packet('error', 'Unknown message type: '+ packet.type));
}
}
});
//
// Remark: Are we assuming we are getting an appendEntries from the
// leader and comparing and appending our log?
//
case 'append':
break;
//
// We do not need to execute the rest of the functionality below as we're
// currently running as "child" raft of the cluster not as the "root" raft.
//
if (Raft.CHILD === raft.state) return raft.emit('initialize');
//
// Remark: So does this get emit when we need to write our OWN log?
//
case 'log':
break;
//
// Setup the log & appends. Assume that if we're given a function log that it
// needs to be initialized as it requires access to our raft instance so it
// can read our information like our leader, state, term etc.
//
if ('function' === raft.type(raft.Log)) {
raft.log = new raft.Log(raft, options);
}
//
// RPC command
//
case 'exec':
break;
/**
* The raft is now listening to events so we can start our heartbeat timeout.
* So that if we don't hear anything from a leader we can promote our selfs to
* a candidate state.
*
* Start listening listening for heartbeats when implementors are also ready
* with setting up their code.
*
* @api private
*/
function initialize(err) {
if (err) return raft.emit('error', err);
//
// Unknown event, we have no idea how to process this so we're going to
// return an error.
//
default:
if (raft.listeners('rpc').length) {
raft.emit('rpc', packet, write);
} else {
write(raft.packet('error', 'Unknown message type: '+ packet.type));
}
raft.emit('initialize');
raft.heartbeat(raft.timeout());
}
});
//
// We do not need to execute the rest of the functionality below as we're
// currently running as "child" raft of the cluster not as the "root" raft.
//
if (Raft.CHILD === raft.state) return raft.emit('initialize');
if ('function' === raft.type(raft.initialize)) {
if (raft.initialize.length === 2) return raft.initialize(options, initialize);
raft.initialize(options);
}
//
// Setup the log & appends. Assume that if we're given a function log that it
// needs to be initialized as it requires access to our raft instance so it
// can read our information like our leader, state, term etc.
//
if ('function' === raft.type(raft.Log)) {
raft.log = new raft.Log(raft, options);
initialize();
}
/**
* The raft is now listening to events so we can start our heartbeat timeout.
* So that if we don't hear anything from a leader we can promote our selfs to
* a candidate state.
* Proper type checking.
*
* Start listening listening for heartbeats when implementors are also ready
* with setting up their code.
* @param {Mixed} of Thing we want to know the type of.
* @returns {String} The type.
* @private
*/
type(of) {
return Object.prototype.toString.call(of).slice(8, -1).toLowerCase();
}
/**
* Check if we've reached our quorum (a.k.a. minimum amount of votes requires
* for a voting round to be considered valid) for the given amount of votes.
*
* @api private
* @param {Number} responses Amount of responses received.
* @returns {Boolean}
* @public
*/
function initialize(err) {
if (err) return raft.emit('error', err);
quorum(responses) {
if (!this.nodes.length || !responses) return false;
raft.emit('initialize');
raft.heartbeat(raft.timeout());
return responses >= this.majority();
}
if ('function' === raft.type(raft.initialize)) {
if (raft.initialize.length === 2) return raft.initialize(options, initialize);
raft.initialize(options);
/**
* The majority required to reach our the quorum.
*
* @returns {Number}
* @public
*/
majority() {
return Math.ceil(this.nodes.length / 2) + 1;
}
initialize();
};
/**
* Attempt to run a function indefinitely until the callback is called.
*
* @param {Function} attempt Function that needs to be attempted.
* @param {Function} fn Completion callback.
* @param {Number} timeout Which timeout should we use.
* @returns {Raft}
* @public
*/
indefinitely(attempt, fn, timeout) {
var uuid = UUID()
, raft = this;
/**
* Proper type checking.
*
* @param {Mixed} of Thing we want to know the type of.
* @returns {String} The type.
* @api private
*/
Raft.prototype.type = function type(of) {
return Object.prototype.toString.call(of).slice(8, -1).toLowerCase();
};
(function again() {
//
// We need to force async execution here because we do not want to saturate
// the event loop with sync executions. We know that it's important these
// functions are retried indefinitely but if it's called synchronously we will
// not have time to receive data or updates.
//
var next = one(function force(err, data) {
if (!raft.timers) return; // We're been destroyed, ignore all.
/**
* Check if we've reached our quorum (a.k.a. minimum amount of votes requires
* for a voting round to be considered valid) for the given amount of votes.
*
* @param {Number} responses Amount of responses received.
* @returns {Boolean}
* @api public
*/
Raft.prototype.quorum = function quorum(responses) {
if (!this.nodes.length || !responses) return false;
raft.timers.setImmediate(uuid +'@async', function async() {
if (err) {
raft.emit('error', err);
return responses >= this.majority();
};
return again();
}
/**
* The majority required to reach our the quorum.
*
* @returns {Number}
* @api public
*/
Raft.prototype.majority = function majority() {
return Math.ceil(this.nodes.length / 2) + 1;
};
fn(data);
});
});
/**
* Attempt to run a function indefinitely until the callback is called.
*
* @param {Function} attempt Function that needs to be attempted.
* @param {Function} fn Completion callback.
* @param {Number} timeout Which timeout should we use.
* @returns {Raft}
* @api public
*/
Raft.prototype.indefinitely = function indefinitely(attempt, fn, timeout) {
var uuid = UUID()
, raft = this;
//
// Ensure that the assigned callback has the same context as our raft.
//
attempt.call(raft, next);
(function again() {
//
// We need to force async execution here because we do not want to saturate
// the event loop with sync executions. We know that it's important these
// functions are retried indefinitely but if it's called synchronously we will
// not have time to receive data or updates.
//
var next = one(function force(err, data) {
if (!raft.timers) return; // We're been destroyed, ignore all.
raft.timers.setTimeout(uuid, function timeoutfn() {
next(new Error('Timed out, attempting to retry again'));
}, +timeout || raft.timeout());
}());
raft.timers.setImmediate(uuid +'@async', function async() {
if (err) {
raft.emit('error', err);
return this;
}
return again();
}
/**
* Start or update the heartbeat of the Raft. If we detect that we've received
* a heartbeat timeout we will promote our selfs to a candidate to take over the
* leadership.
*
* @param {String|Number} duration Time it would take for the heartbeat to timeout.
* @returns {Raft}
* @private
*/
heartbeat(duration) {
var raft = this;
fn(data);
});
});
duration = duration || raft.beat;
//
// Ensure that the assigned callback has the same context as our raft.
//
attempt.call(raft, next);
if (raft.timers.active('heartbeat')) {
raft.timers.adjust('heartbeat', duration);
raft.timers.setTimeout(uuid, function timeoutfn() {
next(new Error('Timed out, attempting to retry again'));
}, +timeout || raft.timeout());
}());
return raft;
}
return this;
};
raft.timers.setTimeout('heartbeat', async () => {
if (Raft.LEADER !== raft.state) {
raft.emit('heartbeat timeout');
/**
* Start or update the heartbeat of the Raft. If we detect that we've received
* a heartbeat timeout we will promote our selfs to a candidate to take over the
* leadership.
*
* @param {String|Number} duration Time it would take for the heartbeat to timeout.
* @returns {Raft}
* @api private
*/
Raft.prototype.heartbeat = function heartbeat(duration) {
var raft = this;
return raft.promote();
}
duration = duration || raft.beat;
//
// According to the raft spec we should be sending empty append requests as
// heartbeat. We want to emit an event so people can modify or inspect the
// payload before we send it. It's also a good indication for when the
// idle state of a LEADER as it didn't get any messages to append/commit to
// the FOLLOWER'S.
//
var packet = await raft.packet('append');
if (raft.timers.active('heartbeat')) {
raft.timers.adjust('heartbeat', duration);
raft.emit('heartbeat', packet);
raft.message(Raft.FOLLOWER, packet).heartbeat(raft.beat);
}, duration);

@@ -498,382 +527,437 @@ return raft;

raft.timers.setTimeout('heartbeat', function heartbeattimeout() {
if (Raft.LEADER !== raft.state) {
raft.emit('heartbeat timeout');
/**
* Send a message to connected nodes within our cluster. The following messaging
* patterns (who) are available:
*
* - Raft.LEADER : Send a message to cluster's current leader.
* - Raft.FOLLOWER : Send a message to all non leaders.
* - Raft.CHILD : Send a message to everybody.
* - <address> : Send a message to a raft based on the address.
*
* @param {Mixed} who Recipient of the message.
* @param {Mixed} what The data we need to send.
* @param {Function} when Completion callback
* @returns {Raft}
* @public
*/
message(who, what, when) {
when = when || nope;
return raft.promote();
}
//
// According to the raft spec we should be sending empty append requests as
// heartbeat. We want to emit an event so people can modify or inspect the
// payload before we send it. It's also a good indication for when the
// idle state of a LEADER as it didn't get any messages to append/commit to
// the FOLLOWER'S.
// If the "who" is undefined, the developer made an error somewhere. Tell them!
//
var packet = raft.packet('append');
if (typeof who === 'undefined') {
throw new Error('Cannot send message to `undefined`. Check your spelling!');
}
raft.emit('heartbeat', packet);
raft.message(Raft.FOLLOWER, packet).heartbeat(raft.beat);
}, duration);
var output = { errors: {}, results: {} }
, length = this.nodes.length
, errors = false
, latency = []
, raft = this
, nodes = []
, i = 0;
return raft;
};
switch (who) {
case Raft.LEADER: for (; i < length; i++)
if (raft.leader === raft.nodes[i].address) {
nodes.push(raft.nodes[i]);
}
break;
/**
* Send a message to connected nodes within our cluster. The following messaging
* patterns (who) are available:
*
* - Raft.LEADER : Send a message to cluster's current leader.
* - Raft.FOLLOWER : Send a message to all non leaders.
* - Raft.CHILD : Send a message to everybody.
* - <address> : Send a message to a raft based on the address.
*
* @param {Mixed} who Recipient of the message.
* @param {Mixed} what The data we need to send.
* @param {Function} when Completion callback
* @returns {Raft}
* @api public
*/
Raft.prototype.message = function message(who, what, when) {
when = when || nope;
case Raft.FOLLOWER: for (; i < length; i++)
if (raft.leader !== raft.nodes[i].address) {
nodes.push(raft.nodes[i]);
}
break;
//
// If the "who" is undefined, the developer made an error somewhere. Tell them!
//
if (typeof who === 'undefined') {
throw new Error('Cannot send message to `undefined`. Check your spelling!');
}
case Raft.CHILD:
Array.prototype.push.apply(nodes, raft.nodes);
break;
var output = { errors: {}, results: {} }
, length = this.nodes.length
, errors = false
, latency = []
, raft = this
, nodes = []
, i = 0;
default: for (; i < length; i++)
if (who === raft.nodes[i].address) {
nodes.push(raft.nodes[i]);
}
}
switch (who) {
case Raft.LEADER: for (; i < length; i++)
if (raft.leader === raft.nodes[i].address) {
nodes.push(raft.nodes[i]);
}
break;
/**
* A small wrapper to force indefinitely sending of a certain packet.
*
* @param {Raft} client Raft we need to write a message to.
* @param {Object} data Message that needs to be send.
* @api private
*/
function wrapper(client, data) {
var start = +new Date();
case Raft.FOLLOWER: for (; i < length; i++)
if (raft.leader !== raft.nodes[i].address) {
nodes.push(raft.nodes[i]);
}
break;
client.write(data, function written(err, data) {
latency.push(+new Date() - start);
case Raft.CHILD:
Array.prototype.push.apply(nodes, raft.nodes);
break;
//
// Add the error or output to our `output` object to be
// passed to the callback when all the writing is done.
//
if (err) {
errors = true;
output.errors[client.address] = err;
} else {
output.results[client.address] = data;
}
default: for (; i < length; i++)
if (who === raft.nodes[i].address) {
nodes.push(raft.nodes[i]);
}
//
// OK, so this is the strange part here. We've broadcasted messages and
// got replies back. This reply contained data so we need to process it.
// What if the data is incorrect? Then we have no way at the moment to
// send back reply to a reply to the server.
//
if (err) raft.emit('error', err);
else if (data) raft.emit('data', data);
//
// Messaging has been completed.
//
if (latency.length === length) {
raft.timing(latency);
when(errors ? output.errors : undefined, output.results);
latency.length = nodes.length = 0;
output = null;
}
});
}
length = nodes.length;
i = 0;
for (; i < length; i++) {
wrapper(nodes[i], what);
}
return raft;
};
/**
* Generate the various of timeouts.
*
* @returns {Number}
* @private
*/
timeout() {
var times = this.election;
return Math.floor(Math.random() * (times.max - times.min + 1) + times.min);
}
/**
* A small wrapper to force indefinitely sending of a certain packet.
* Calculate if our average latency causes us to come dangerously close to the
* minimum election timeout.
*
* @param {Raft} client Raft we need to write a message to.
* @param {Object} data Message that needs to be send.
* @api private
* @param {Array} latency Latency of the last broadcast.
* @param {Boolean} Success-fully calculated the threshold.
* @private
*/
function wrapper(client, data) {
var start = +new Date();
timing(latency) {
var raft = this
, sum = 0
, i = 0;
client.write(data, function written(err, data) {
latency.push(+new Date() - start);
if (Raft.STOPPED === raft.state) return false;
//
// Add the error or output to our `output` object to be
// passed to the callback when all the writing is done.
//
if (err) {
errors = true;
output.errors[client.address] = err;
} else {
output.results[client.address] = data;
}
for (; i < latency.length; i++) {
sum += latency[i];
}
//
// OK, so this is the strange part here. We've broadcasted messages and
// got replies back. This reply contained data so we need to process it.
// What if the data is incorrect? Then we have no way at the moment to
// send back reply to a reply to the server.
//
if (err) raft.emit('error', err);
else if (data) raft.emit('data', data);
raft.latency = Math.floor(sum / latency.length);
//
// Messaging has been completed.
//
if (latency.length === length) {
raft.timing(latency);
when(errors ? output.errors : undefined, output.results);
latency.length = nodes.length = 0;
output = null;
}
});
if (raft.latency > raft.election.min * raft.threshold) {
raft.emit('threshold');
}
return true;
}
length = nodes.length;
i = 0;
/**
* Raft §5.2:
*
* We've detected a timeout from the leaders heartbeats and need to start a new
* election for leadership. We increment our current term, set the CANDIDATE
* state, vote our selfs and ask all others rafts to vote for us.
*
* @returns {Raft}
* @private
*/
async promote() {
var raft = this;
for (; i < length; i++) {
wrapper(nodes[i], what);
}
raft.change({
state: Raft.CANDIDATE, // We're now a candidate,
term: raft.term + 1, // but only for this term.
leader: '' // We no longer have a leader.
});
return raft;
};
//
// Candidates are always biased and vote for them selfs first before sending
// out a voting request to all other rafts in the cluster.
//
raft.votes.for = raft.address;
raft.votes.granted = 1;
/**
* Generate the various of timeouts.
*
* @returns {Number}
* @api private
*/
Raft.prototype.timeout = function timeout() {
var times = this.election;
//
// Broadcast the voting request to all connected rafts in your private
// cluster.
//
const packet = await raft.packet('vote')
return Math.floor(Math.random() * (times.max - times.min + 1) + times.min);
};
raft.message(Raft.FOLLOWER, packet);
/**
* Calculate if our average latency causes us to come dangerously close to the
* minimum election timeout.
*
* @param {Array} latency Latency of the last broadcast.
* @param {Boolean} Success-fully calculated the threshold.
* @api private
*/
Raft.prototype.timing = function timing(latency) {
var raft = this
, sum = 0
, i = 0;
//
// Set the election timeout. This gives the rafts some time to reach
// consensuses about who they want to vote for. If no consensus has been
// reached within the set timeout we will attempt it again.
//
raft.timers
.clear('heartbeat, election')
.setTimeout('election', raft.promote, raft.timeout());
if (Raft.STOPPED === raft.state) return false;
for (; i < latency.length; i++) {
sum += latency[i];
return raft;
}
raft.latency = Math.floor(sum / latency.length);
/**
* Wrap the outgoing messages in an object with additional required data.
*
* @async
* @param {String} type Message type we're trying to send.
* @param {Mixed} data Data to be transfered.
* @returns {Promise<Object>} Packet.
* @private
*/
async packet(type, data) {
var raft = this
, wrapped = {
state: raft.state, // Are we're a leader, candidate or follower.
term: raft.term, // Our current term so we can find mis matches.
address: raft.address, // Address of the sender.
type: type, // Message type.
leader: raft.leader, // Who is our leader.
};
if (raft.latency > raft.election.min * raft.threshold) {
raft.emit('threshold');
//
// If we have logging and state replication enabled we also need to send this
// additional data so we can use it determine the state of this raft.
//
if (raft.log) wrapped.last = await raft.log.getLastInfo();
if (arguments.length === 2) wrapped.data = data;
return wrapped;
}
return true;
};
/**
* appendPacket - Send append message with entry and using the previous entry as the last.index and last.term
*
* @param {Entry} entry Entry to send as data
*
* @return {Promise<object>} Description
* @private
*/
async appendPacket (entry) {
const raft = this;
const last = await raft.log.getEntryInfoBefore(entry);
return{
state: raft.state, // Are we're a leader, candidate or follower.
term: raft.term, // Our current term so we can find mis matches.
address: raft.address, // Address of the sender.
type: 'append', // Append message type .
leader: raft.leader, // Who is our leader.
data: [entry], // The command to send to the other nodes
last,
};
}
/**
* Raft §5.2:
*
* We've detected a timeout from the leaders heartbeats and need to start a new
* election for leadership. We increment our current term, set the CANDIDATE
* state, vote our selfs and ask all others rafts to vote for us.
*
* @returns {Raft}
* @api private
*/
Raft.prototype.promote = function promote() {
var raft = this;
/**
* Create a clone of the current instance with the same configuration. Ideally
* for creating connected nodes in a cluster.. And let that be something we're
* planning on doing.
*
* @param {Object} options Configuration that should override the default config.
* @returns {Raft} The newly created instance.
* @public
*/
clone(options) {
options = options || {};
raft.change({
state: Raft.CANDIDATE, // We're now a candidate,
term: raft.term + 1, // but only for this term.
leader: '' // We no longer have a leader.
});
var raft = this
, node = {
'Log': raft.Log,
'election max': raft.election.max,
'election min': raft.election.min,
'heartbeat': raft.beat,
'threshold': raft.threshold,
}, key;
//
// Candidates are always biased and vote for them selfs first before sending
// out a voting request to all other rafts in the cluster.
//
raft.votes.for = raft.address;
raft.votes.granted = 1;
for (key in node) {
if (key in options || !node.hasOwnProperty(key)) continue;
//
// Broadcast the voting request to all connected rafts in your private
// cluster.
//
var packet = raft.packet('vote')
, i = 0;
options[key] = node[key];
}
raft.message(Raft.FOLLOWER, raft.packet('vote'));
return new raft.constructor(options);
}
//
// Set the election timeout. This gives the rafts some time to reach
// consensuses about who they want to vote for. If no consensus has been
// reached within the set timeout we will attempt it again.
//
raft.timers
.clear('heartbeat, election')
.setTimeout('election', raft.promote, raft.timeout());
/**
* A new raft is about to join the cluster. So we need to upgrade the
* configuration of every single raft.
*
* @param {String} address The address of the raft that is connected.
* @param {Function} write A method that we use to write data.
* @returns {Raft} The raft we created and that joined our cluster.
* @public
*/
join(address, write) {
var raft = this;
return raft;
};
// can be function or asyncfunction
if (/function/.test(raft.type(address))) {
write = address; address = null;
}
/**
* Wrap the outgoing messages in an object with additional required data.
*
* @param {String} type Message type we're trying to send.
* @param {Mixed} data Data to be transfered.
* @returns {Object} Packet.
* @api private
*/
Raft.prototype.packet = function wrap(type, data) {
var raft = this
, packet = {
state: raft.state, // Are we're a leader, candidate or follower.
term: raft.term, // Our current term so we can find mis matches.
address: raft.address, // Address of the sender.
type: type, // Message type.
leader: raft.leader, // Who is our leader.
};
//
// You shouldn't be able to join the cluster as your self. So we're going to
// add a really simple address check here. Return nothing so people can actually
// check if a raft has been added.
//
if (raft.address === address) return;
//
// If we have logging and state replication enabled we also need to send this
// additional data so we can use it determine the state of this raft.
//
// @TODO point to index of last commit entry.
// @TODO point to term of last commit entry.
//
if (raft.log) packet.last = { term: raft.term, index: raft.log.index };
if (arguments.length === 2) packet.data = data;
var node = raft.clone({
write: write, // Optional function that receives our writes.
address: address, // A custom address for the raft we added.
state: Raft.CHILD // We are a raft in the cluster.
});
return packet;
};
node.once('end', function end() {
raft.leave(node);
}, raft);
/**
* Create a clone of the current instance with the same configuration. Ideally
* for creating connected nodes in a cluster.. And let that be something we're
* planning on doing.
*
* @param {Object} options Configuration that should override the default config.
* @returns {Raft} The newly created instance.
* @api public
*/
Raft.prototype.clone = function clone(options) {
options = options || {};
raft.nodes.push(node);
raft.emit('join', node);
var raft = this
, node = {
'Log': raft.Log,
'election max': raft.election.max,
'election min': raft.election.min,
'heartbeat': raft.beat,
'threshold': raft.threshold,
}, key;
return node;
}
for (key in node) {
if (key in options || !node.hasOwnProperty(key)) continue;
/**
* Remove a raft from the cluster.
*
* @param {String} address The address of the raft that should be removed.
* @returns {Raft} The raft that we removed.
* @public
*/
leave(address) {
var raft = this
, index = -1
, node;
options[key] = node[key];
}
for (var i = 0; i < raft.nodes.length; i++) {
if (raft.nodes[i] === address || raft.nodes[i].address === address) {
node = raft.nodes[i];
index = i;
break;
}
}
return new raft.constructor(options);
};
if (~index && node) {
raft.nodes.splice(index, 1);
/**
* A new raft is about to join the cluster. So we need to upgrade the
* configuration of every single raft.
*
* @param {String} address The address of the raft that is connected.
* @param {Function} write A method that we use to write data.
* @returns {Raft} The raft we created and that joined our cluster.
* @api public
*/
Raft.prototype.join = function join(address, write) {
var raft = this;
if (node.end) node.end();
raft.emit('leave', node);
}
if ('function' === raft.type(address)) {
write = address; address = null;
return node;
}
//
// You shouldn't be able to join the cluster as your self. So we're going to
// add a really simple address check here. Return nothing so people can actually
// check if a raft has been added.
//
if (raft.address === address) return;
/**
* This Raft needs to be shut down.
*
* @returns {Boolean} Successful destruction.
* @public
*/
end() {
var raft = this;
var node = raft.clone({
write: write, // Optional function that receives our writes.
address: address, // A custom address for the raft we added.
state: Raft.CHILD // We are a raft in the cluster.
});
if (Raft.STOPPED === raft.state) return false;
raft.change({ state: Raft.STOPPED });
node.once('end', function end() {
raft.leave(node);
}, raft);
if (raft.nodes.length) for (var i = 0; i < raft.nodes.length; i++) {
raft.leave(raft.nodes[i]);
}
raft.nodes.push(node);
raft.emit('join', node);
raft.emit('end');
raft.timers.end();
raft.removeAllListeners();
return node;
};
if (raft.log) raft.log.end();
raft.timers = raft.Log = raft.beat = raft.election = null;
/**
* Remove a raft from the cluster.
*
* @param {String} address The address of the raft that should be removed.
* @returns {Raft} The raft that we removed.
* @api public
*/
Raft.prototype.leave = function leave(address) {
var raft = this
, index = -1
, node;
return true;
}
for (var i = 0; i < raft.nodes.length; i++) {
if (raft.nodes[i] === address || raft.nodes[i].address === address) {
node = raft.nodes[i];
index = i;
break;
/**
* Raft §5.3:
* command - Saves command to log and replicates to followers
*
* @param {type} command Json command to be stored in the log
*
* @return {Promise<void>} Description
*/
async command(command) {
let raft = this;
if(raft.state !== Raft.LEADER) {
return fn({
message: 'NOTLEADER',
leaderAddress: raft.leader
});
}
// about to send an append so don't send a heart beat
// raft.heartbeat(raft.beat);
const entry = await raft.log.saveCommand(command, raft.term);
const appendPacket = await raft.appendPacket(entry);
raft.message(Raft.FOLLOWER, appendPacket);
}
if (~index && node) {
raft.nodes.splice(index, 1);
if (node.end) node.end();
raft.emit('leave', node);
/**
* commitEntries - Commites entries in log and emits commited entries
*
* @param {Entry[]} entries Entries to commit
* @return {Promise<void>}
*/
async commitEntries (entries) {
entries.forEach(async (entry) => {
await this.log.commit(entry.index)
this.emit('commit', entry.command);
});
}
}
return node;
};
/**
* This Raft needs to be shut down.
* Raft §5.1:
*
* @returns {Boolean} Successful destruction.
* @api public
* A Raft can be in only one of the various states. The stopped state is not
* something that is part of the Raft protocol but something we might want to
* use internally while we're starting or shutting down our node. The following
* states are generated:
*
* - STOPPED: Assume we're dead.
* - LEADER: We're selected as leader process.
* - CANDIDATE: We want to be promoted to leader.
* - FOLLOWER: We're just following a leader.
* - CHILD: A node that has been added using JOIN.
*
* @type {Number}
* @private
*/
Raft.prototype.end = Raft.prototype.destroy = function end() {
var raft = this;
Raft.states = 'STOPPED,LEADER,CANDIDATE,FOLLOWER,CHILD'.split(',');
for (var s = 0; s < Raft.states.length; s++) {
Raft[Raft.states[s]] = s;
}
if (Raft.STOPPED === raft.state) return false;
raft.state = Raft.STOPPED;
if (raft.nodes.length) for (var i = 0; i < raft.nodes.length; i++) {
raft.leave(raft.nodes[i]);
}
raft.emit('end');
raft.timers.end();
raft.removeAllListeners();
if (raft.log) raft.log.end();
raft.timers = raft.log = raft.Log = raft.beat = raft.election = null;
return true;
};
//
// Expose the module interface.
//
module.exports = Raft;

@@ -1,121 +0,348 @@

'use strict';
const encode = require('encoding-down');
const levelup = require('levelup');
var setImmediate = require('immediate');
/**
* The representation of the log of a single node.
*
* Options:
*
* - `engine` The storage engine that should be used.
*
* @constructor
* @param {Node} node Instance of a node.
* @param {Object} options Optional configuration.
* @api public
* @typedef Entry
* @property {number} index the key for the entry
* @property {number} term the term that the entry was saved in
* @property {boolean} Committed if the entry has been committed
* @property {array} responses number of followers that have saved the log entry
* @property {object} command The command to be used in the raft state machine
*/
function Log(node, options) {
if (!(this instanceof Log)) return new Log(node, options);
class Log {
/**
* @class
* @param {object} node The raft node using this log
* @param {object} Options Options object
* @param {object} Options.[adapter= require('leveldown')] Leveldown adapter, defaults to leveldown
* @param {string} Options.[path='./'] Path to save the log db to
* @return {Log}
*/
constructor (node, {adapter = require('leveldown'), path = ''}) {
this.node = node;
this.committedIndex = 0;
this.db = levelup(encode(adapter(path), { valueEncoding: 'json', keyEncoding: 'binary'}));
}
this.node = node;
this.engine = options.engine || 'memory';
/**
* saveCommand - Saves a command to the log
* Initially the command is uncommitted. Once a majority
* of follower nodes have saved the log entry it will be
* committed.
*
* A follow node will also use this method to save a received command to
* its log
*
* @async
* @param {object} command A json object to save to the log
* @param {number} term Term to save with the log entry
* @param {number} [index] Index to save the entry with. This is used by the followers
* @return {Promise<entry>} Description
*/
async saveCommand (command, term, index) {
//
// Remark: So we want to use something like leveldb here with a particular engine but
// for now lets just use a silly little array
// The following would all be stored in a leveldb database. Entries would be
// its own namespaced key set for easy stream reading and the other values
// would be stored at their particular key for proper persistence and
// fetching. These could be used as a cache like thing as well if we wanted
// faster lookups by default.
//
this.commitIndex = 0;
this.lastApplied = 0;
this.startIndex = 0;
this.startTerm = 0;
this.entries = [];
}
if (!index) {
const {
index: lastIndex,
} = await this.getLastInfo();
//
// Add some sugar and spice and everything nice. Oh, and also inheritance.
//
Log.extend = require('extendible');
index = lastIndex + 1;
}
/**
* Commit a log entry
*
* @param {Object} data Data we receive from ourselves or from LEADER
* @param {function} fn function
* @api public
*/
Log.prototype.commit = function commit(data, fn) {
var entry = this.entry(data);
const entry = {
term: term,
index,
committed: false,
responses: [{
address: this.node.address, // start with vote from leader
ack: true
}],
command,
}
if (entry) this.append(entry);
return setImmediate(fn.bind(null, null, !!entry));
};
await this.put(entry);
return entry;
}
Log.prototype.append = function append(entry) {
this.entries.push(entry);
};
/**
* put - Save entry to database using the index as the key
*
* @async
* @param {Entry} entry entry to save
* @return {Promise<void>} Resolves once entry is saved
* @public
*/
put (entry) {
return this.db.put(entry.index, entry);
}
/**
* Return the last entry (this may be async in the future)
*
* @returns {Object}
* @api public
*/
Log.prototype.last = function lastentry() {
var last = this.entries[this.entries.length - 1];
if (last) return last;
/**
* getEntriesAfter - Get all the entries after a specific index
*
* @param {number} index Index that entries must be greater than
* @return {Promise<Entry[]>} returns all entries
* @public
*/
getEntriesAfter(index) {
const entries = [];
return new Promise((resolve, reject) => {
this.db.createReadStream({gt: index})
.on('data', data => {
entries.push(data.value);
})
.on('error', err => {
reject(err)
})
.on('end', () => {
resolve(entries);
})
});
return {
index: this.startIndex,
term: this.startTerm
};
};
}
/**
* Create a log entry that we will append with correct form and attrs
*
* @param {object} Data to compute to a proper entry
* @api public
*/
Log.prototype.entry = function entry(data) {
//
// type of entry, (data/command, or something related to raft itself)
//
var type = data.type
, command = data.command
//
// Remark: Hmm this may have to be async if we are fetching everything from a db,
// lets just keep it in memory for now because we may just preload into cache
// on startup?
//
, index = this.last().index + 1;
//
// Remark: How do we want to store function executions or particular actions
// to be replayed in case necessary?
//
return {
command: command,
index: index,
term: this.node.term,
type: type
};
};
/**
* removeEntriesAfter - Removes all entries after a given index
*
* @async
* @param {Number} index Index to use to find all entries after
* @return {Promise<void>} Returns once all antries are removed
* @public
*/
async removeEntriesAfter (index) {
const entries = await this.getEntriesAfter(index)
return Promise.all(entries.map(entry => {
return this.db.del(entry.index);
}));
}
/**
* The raft instance we're attached to is closing.
*
* @returns {Boolean} First time shutdown.
* @api private
*/
Log.prototype.end = function end() {
return true;
/**
* has - Checks if entry exists at index
*
* @async
* @param {number} index Index position to check if entry exists
* @return {boolean} Boolean on whether entry exists at index
* @public
*/
async has (index) {
try {
const entry = await this.db.get(index);
return true
} catch (err) {
return false;
}
}
/**
* get - Gets an entry at the specified index position
*
* @param {type} index Index position of entry
* @return {Promise<Entry>} Promise of found entry returns NotFoundError if does not exist
* @public
*/
get (index) {
return this.db.get(index);
}
/**
* getLastInfo - Returns index, term of the last entry in the long along with
* the committedIndex
*
* @async
* @return {Promise<Object>} Last entries index, term and committedIndex
*/
async getLastInfo () {
const { index, term } = await this.getLastEntry();
return {
index,
term,
committedIndex: this.committedIndex
};
}
/**
* getLastEntry - Returns last entry in the log
*
* @return {Promise<Entry>} returns {index: 0, term: node.term} if there are no entries in the log
*/
getLastEntry () {
return new Promise((resolve, reject) => {
let hasResolved = false;
let entry = {
index: 0,
term: this.node.term
};
this.db.createReadStream({reverse: true, limit: 1})
.on('data', data => {
hasResolved = true;
entry = data.value;
})
.on('error', err => {
hasResolved = true;
reject(err)
})
.on('end', () => {
resolve(entry);
})
});
}
/**
* getEntryInfoBefore - Gets the index and term of the previous entry along with the log's committedIndex
* If there is no item before it returns {index: 0}
*
*
* @async
* @param {Entry} entry
* @return {Promise<object>} {index, term, committedIndex}
*/
async getEntryInfoBefore (entry) {
const {index, term} = await this.getEntryBefore(entry);
return {
index,
term,
committedIndex: this.committedIndex
};
}
/**
* getEntryBefore - Get entry before the specified entry
* If there is no item before it returns {index: 0}
*
* @async
* @param {Entry} entry
*
* @return {Promise<Entry>}
*/
getEntryBefore (entry) {
const defaultInfo = {
index: 0,
term: this.node.term
};
// We know it is the first entry, so save the query time
if (entry.index === 1) {
return Promise.resolve(defaultInfo);
}
return new Promise((resolve, reject) => {
let hasResolved = false;
this.db.createReadStream({
reverse: true,
limit: 1,
lt: entry.index
})
.on('data', (data) => {
hasResolved = true;
resolve(data.value);
})
.on('error', (err) => {
hasResolved = true;
reject(err);
})
.on('end', () => {
if (!hasResolved) {
// Returns empty index if there is no items
// before entry or log is empty
resolve(defaultInfo);
}
});
});
}
/**
* commandAck - acknowledges a follow with address has stored entry at index
* This is used to determine if a quorom has been met for a log entry and
* if enough followers have stored it so that it can be committed
*
* @async
* @param {number} index Index of entry that follow has stored
* @param {string} address Address of follower that has stored log
* @return {Promise<Entry>}
*/
async commandAck (index, address) {
let entry;
try {
entry = await this.get(index);
} catch (err) {
return {
responses: []
}
}
const entryIndex = await entry.responses.findIndex(resp => resp.address === address);
// node hasn't voted yet. Add response
if (entryIndex === -1) {
entry.responses.push({
address,
ack: true
});
}
await this.put(entry);
return entry;
}
/**
* commit - Set the entry to committed
*
* @async
* @param {number} Index index
*
* @return {Promise<entry>}
*/
async commit (index) {
const entry = await this.db.get(index);
entry.committed = true;
this.committedIndex = entry.index;
return this.put(entry);
}
/**
* getUncommittedEntriesUpToIndex - Returns all entries before index that have not been committed yet
*
* @param {number} index Index value to find all entries up to
* @return {Promise<Entry[]}
* @private
*/
getUncommittedEntriesUpToIndex (index) {
return new Promise((resolve, reject) => {
let hasResolved = false;
const entries = [];
this.db.createReadStream({
gt: this.committedIndex,
lte: index
})
.on('data', data => {
if (!data.value.committed) {
entries.push(data.value);
}
})
.on('error', err => {
reject(err)
})
.on('end', () => {
resolve(entries);
});
});
}
/**
* end - Log end
* Called when the node is shutting down
*
* @return {boolean} Successful close.
* @private
*/
end () {
return this.db.close();
}
};
//
// Expose the log module.
//
module.exports = Log;
{
"name": "liferaft",
"version": "0.1.1",
"version": "1.0.0",
"description": "Consensus protocol based on raft, it will one day save your live.",
"main": "index.js",
"browser": "./lib/",
"scripts": {
"100%": "istanbul check-coverage --statements 100 --functions 100 --lines 100 --branches 100",
"test": "mocha test.js",
"watch": "mocha --watch test.js",
"coverage": "istanbul cover ./node_modules/.bin/_mocha -- test.js",
"test-travis": "istanbul cover node_modules/.bin/_mocha --report lcovonly -- test.js"
"test": "npm run test-memdown && npm run test-leveldown",
"test-leveldown": "ADAPTER=leveldown mocha test/**.js",
"test-memdown": "ADAPTER=memdown mocha test/**.js",
"test-travis": "nyc --reporter=html --reporter=text npm test",
"prepublish": "npm run build",
"build": "babel ./index.js ./log.js -d ./lib"
},

@@ -30,17 +32,32 @@ "repository": {

"emits": "3.0.x",
"eventemitter3": "1.1.x",
"encoding-down": "^3.0.1",
"eventemitter3": "2.0.x",
"extendible": "0.1.x",
"immediate": "3.2.x",
"millisecond": "0.0.x",
"leveldown": "^3.0.0",
"levelup": "^2.0.1",
"millisecond": "0.1.x",
"modification": "1.0.x",
"one-time": "0.0.x",
"tick-tock": "0.1.x"
"tick-tock": "1.0.x"
},
"devDependencies": {
"assume": "1.2.x",
"diagnostics": "0.0.x",
"istanbul": "0.3.x",
"mocha": "2.2.x",
"pre-commit": "1.0.x"
"@babel/cli": "^7.0.0-beta.38",
"@babel/core": "^7.0.0-beta.38",
"@babel/preset-env": "^7.0.0-beta.38",
"@babel/register": "^7.0.0-beta.38",
"assume": "1.5.x",
"diagnostics": "1.1.x",
"memdown": "^1.4.1",
"mkdirp": "^0.5.1",
"mocha": "5.0.x",
"nyc": "^11.3.0",
"pre-commit": "1.2.x",
"rimraf": "^2.6.2"
},
"babel": {
"presets": [
"@babel/preset-env"
]
}
}

@@ -5,3 +5,3 @@ # liferaft

`liferaft` is an JavaScript implementation of the [Raft] consensus algorithm.
`liferaft` is an JavaScript implementation of the [Raft] consensus algorithm.

@@ -35,2 +35,4 @@ ## Installation

- [LifeRaft#end()](#liferaftend)
- [LifeRaft#command()](#liferaftcommand)
- [Log Replication](#logreplication)
- [Extending](#extending)

@@ -80,3 +82,3 @@ - [Transports](#transports)

- `Log`: An Log compatible constructor we which use for state and data
replication.
replication.

@@ -92,3 +94,3 @@ The timeout values can be configured with either a number which represents the

```js
var raft = new Raft({
var raft = new Raft({
'address': 'tcp://localhost:8089',

@@ -128,2 +130,3 @@ 'election min': '200 millisecond',

`heartbeat` | The leader is about to send a heartbeat message.
`commit` | A command has been saved to the majority of node's logs

@@ -335,2 +338,19 @@ ---

### LifeRaft#command(command)
Save a json command to the log. The command will be added to the log and then
replicated to all the follower nodes. Once the majority of nodes have received
and stored the command. A `commit` event will be triggered so that the
command can be used.
```js
raft.command({name: 'Jimi', surname: 'Hendrix'});
raft.on('commit', function (command) {
console.log(command.name, command.surname);
});
```
## Extending

@@ -355,2 +375,21 @@

## Log Replication
LifeRaft uses [Levelup](https://github.com/Level/levelup) for storing the log
that is replicated to each node. Log replication is optional and so the log
constructor needs to be included in the options when creating a raft instance.
You can use any leveldown compatible database to store the log.
LifeRaft will default to using leveldown. A unique path is required for
each node's log.
```js
const Log = require('liferaft/log');
const raft = new Raft({
adapter: require('leveldown'),
path: './db/log1'
});
```
## Transports

@@ -357,0 +396,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc