Comparing version 0.1.1 to 1.0.0
@@ -1,8 +0,7 @@ | ||
'use strict'; | ||
var debug = require('diagnostics')('raft') | ||
const debug = require('diagnostics')('raft') | ||
, argv = require('argh').argv | ||
, LifeRaft = require('../') | ||
, msg; | ||
, LifeRaft = require('../'); | ||
let msg; | ||
if (argv.queue) msg = require(argv.queue); | ||
@@ -15,10 +14,3 @@ else msg = require('axon'); | ||
// | ||
var MsgRaft = LifeRaft.extend({ | ||
/** | ||
* Reference our socket. | ||
* | ||
* @type {Msg} | ||
* @private | ||
*/ | ||
socket: null, | ||
class MsgRaft extends LifeRaft { | ||
@@ -31,19 +23,16 @@ /** | ||
*/ | ||
initialize: function initialize(options) { | ||
var raft = this | ||
, socket; | ||
initialize (options) { | ||
debug('initializing reply socket on port %s', this.address); | ||
debug('initializing reply socket on port %s', raft.address); | ||
const socket = this.socket = msg.socket('rep'); | ||
socket = raft.socket = msg.socket('rep'); | ||
socket.bind(raft.address); | ||
socket.on('message', function (data, fn) { | ||
raft.emit('data', data, fn); | ||
socket.bind(this.address); | ||
socket.on('message', (data, fn) => { | ||
this.emit('data', data, fn); | ||
}); | ||
socket.on('error', function err() { | ||
debug('failed to initialize on port: ', raft.address); | ||
socket.on('error', () => { | ||
debug('failed to initialize on port: ', this.address); | ||
}); | ||
}, | ||
} | ||
@@ -57,21 +46,18 @@ /** | ||
*/ | ||
write: function write(packet, fn) { | ||
var raft = this | ||
, socket = raft.socket; | ||
write (packet, fn) { | ||
if (!this.socket) { | ||
this.socket = msg.socket('req'); | ||
if (!socket) { | ||
socket = raft.socket = msg.socket('req'); | ||
socket.connect(raft.address); | ||
socket.on('error', function err() { | ||
console.error('failed to write to: ', raft.address); | ||
this.socket.connect(this.address); | ||
this.socket.on('error', function err() { | ||
console.error('failed to write to: ', this.address); | ||
}); | ||
} | ||
debug('writing packet to socket on port %s', raft.address); | ||
socket.send(packet, function (data) { | ||
debug('writing packet to socket on port %s', this.address); | ||
this.socket.send(packet, (data) => { | ||
fn(undefined, data); | ||
}); | ||
} | ||
}); | ||
} | ||
@@ -83,3 +69,3 @@ // | ||
// | ||
var ports = [ | ||
const ports = [ | ||
8081, 8082, | ||
@@ -99,3 +85,3 @@ 8083, 8084, | ||
// | ||
var raft = new MsgRaft('tcp://127.0.0.1:'+ port, { | ||
const raft = new MsgRaft('tcp://127.0.0.1:'+ port, { | ||
'election min': 2000, | ||
@@ -133,3 +119,3 @@ 'election max': 5000, | ||
// | ||
ports.forEach(function join(nr) { | ||
ports.forEach((nr) => { | ||
if (!nr || port === nr) return; | ||
@@ -136,0 +122,0 @@ |
@@ -26,5 +26,5 @@ { | ||
"axon": "2.0.x", | ||
"diagnostics": "0.0.x", | ||
"nanomsg": "0.2.x" | ||
"diagnostics": "1.1.x", | ||
"nanomsg": "4.0.x" | ||
} | ||
} |
@@ -1,4 +0,2 @@ | ||
'use strict'; | ||
var debug = require('diagnostics')('raft') | ||
const debug = require('diagnostics')('raft') | ||
, argv = require('argh').argv | ||
@@ -12,10 +10,3 @@ , LifeRaft = require('../') | ||
// | ||
var TCPRaft = LifeRaft.extend({ | ||
/** | ||
* Reference our socket. | ||
* | ||
* @type {Msg} | ||
* @private | ||
*/ | ||
socket: null, | ||
class TCPRaft extends LifeRaft { | ||
@@ -28,12 +19,12 @@ /** | ||
*/ | ||
initialize: function initialize(options) { | ||
var raft = this; | ||
initialize (options) { | ||
// var raft = this; | ||
var server = net.createServer(function incoming(socket) { | ||
socket.on('data', function (buff) { | ||
const server = net.createServer((socket) => { | ||
socket.on('data', buff => { | ||
var data = JSON.parse(buff.toString()); | ||
debug(raft.address +':packet#data', data); | ||
raft.emit('data', data, function reply(data) { | ||
debug(raft.address +':packet#reply', data); | ||
debug(this.address +':packet#data', data); | ||
this.emit('data', data, data => { | ||
debug(this.address +':packet#reply', data); | ||
socket.write(JSON.stringify(data)); | ||
@@ -48,3 +39,3 @@ socket.end(); | ||
}); | ||
}, | ||
} | ||
@@ -59,10 +50,9 @@ /** | ||
*/ | ||
write: function write(packet, fn) { | ||
var socket = net.connect(this.address) | ||
, raft = this; | ||
write (packet, fn) { | ||
const socket = net.connect(this.address); | ||
debug(raft.address +':packet#write', packet); | ||
debug(this.address +':packet#write', packet); | ||
socket.on('error', fn); | ||
socket.on('data', function (buff) { | ||
var data; | ||
socket.on('data', buff => { | ||
let data; | ||
@@ -72,3 +62,3 @@ try { data = JSON.parse(buff.toString()); } | ||
debug(raft.address +':packet#callback', packet); | ||
debug(this.address +':packet#callback', packet); | ||
fn(undefined, data); | ||
@@ -80,3 +70,3 @@ }); | ||
} | ||
}); | ||
} | ||
@@ -88,3 +78,3 @@ // | ||
// | ||
var ports = [ | ||
const ports = [ | ||
8081, 8082, | ||
@@ -98,3 +88,3 @@ 8083, 8084, | ||
// | ||
var port = +argv.port || ports[0]; | ||
const port = +argv.port || ports[0]; | ||
@@ -105,3 +95,3 @@ // | ||
// | ||
var raft = new TCPRaft(port, { | ||
const raft = new TCPRaft(port, { | ||
'election min': 2000, | ||
@@ -112,7 +102,7 @@ 'election max': 5000, | ||
raft.on('heartbeat timeout', function () { | ||
raft.on('heartbeat timeout', () => { | ||
debug('heart beat timeout, starting election'); | ||
}); | ||
raft.on('term change', function (to, from) { | ||
raft.on('term change', (to, from) => { | ||
debug('were now running on term %s -- was %s', to, from); | ||
@@ -125,3 +115,3 @@ }).on('leader change', function (to, from) { | ||
raft.on('leader', function () { | ||
raft.on('leader', () => { | ||
console.log('@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@'); | ||
@@ -132,3 +122,3 @@ console.log('I am elected as leader'); | ||
raft.on('candidate', function () { | ||
raft.on('candidate', () => { | ||
console.log('----------------------------------'); | ||
@@ -142,3 +132,3 @@ console.log('I am starting as candidate'); | ||
// | ||
ports.forEach(function join(nr) { | ||
ports.forEach(nr => { | ||
if (!nr || port === nr) return; | ||
@@ -145,0 +135,0 @@ |
1432
index.js
@@ -1,8 +0,8 @@ | ||
'use strict'; | ||
const EventEmitter = require('eventemitter3'); | ||
const modification = require('modification'); | ||
const Tick = require('tick-tock'); | ||
const ms = require('millisecond'); | ||
const one = require('one-time'); | ||
const emits = require('emits'); | ||
var EventEmitter = require('eventemitter3') | ||
, Tick = require('tick-tock') | ||
, ms = require('millisecond') | ||
, one = require('one-time'); | ||
/** | ||
@@ -13,3 +13,3 @@ * Generate a somewhat unique UUID. | ||
* @returns {String} UUID. | ||
* @api private | ||
* @private | ||
*/ | ||
@@ -28,6 +28,14 @@ function UUID() { | ||
/** | ||
* Emit when modifications are made. | ||
* | ||
* @type {Fucntion} | ||
* @private | ||
*/ | ||
const change = require('modification')(' change'); | ||
/** | ||
* A nope function for when people don't want message acknowledgements. Because | ||
* they don't care about CAP. | ||
* | ||
* @api private | ||
* @private | ||
*/ | ||
@@ -59,436 +67,457 @@ function nope() {} | ||
* @param {Object} options Raft configuration. | ||
* @api public | ||
* @public | ||
*/ | ||
function Raft(address, options) { | ||
var raft = this; | ||
class Raft extends EventEmitter { | ||
constructor(address, options = {}) { | ||
super(); | ||
if (!(raft instanceof Raft)) return new Raft(options); | ||
var raft = this; | ||
options = options || {}; | ||
if ('object' === typeof address) options = address; | ||
else if (!options.address) options.address = address; | ||
if ('object' === typeof address) options = address; | ||
else if (!options.address) options.address = address; | ||
raft.election = { | ||
min: ms(options['election min'] || '150 ms'), | ||
max: ms(options['election max'] || '300 ms') | ||
}; | ||
raft.election = { | ||
min: ms(options['election min'] || '150 ms'), | ||
max: ms(options['election max'] || '300 ms') | ||
}; | ||
raft.beat = ms(options.heartbeat || '50 ms'); | ||
raft.beat = ms(options.heartbeat || '50 ms'); | ||
raft.votes = { | ||
for: null, // Who did we vote for in this current term. | ||
granted: 0 // How many votes we're granted to us. | ||
}; | ||
raft.votes = { | ||
for: null, // Who did we vote for in this current term. | ||
granted: 0 // How many votes we're granted to us. | ||
}; | ||
raft.write = raft.write || options.write || null; | ||
raft.threshold = options.threshold || 0.8; | ||
raft.address = options.address || UUID(); | ||
raft.timers = new Tick(raft); | ||
raft.Log = options.Log; | ||
raft.change = change; | ||
raft.emits = emits; | ||
raft.latency = 0; | ||
raft.log = null; | ||
raft.nodes = []; | ||
raft.write = raft.write || options.write || null; | ||
raft.threshold = options.threshold || 0.8; | ||
raft.address = options.address || UUID(); | ||
raft.timers = new Tick(raft); | ||
raft.Log = options.Log; | ||
raft.latency = 0; | ||
raft.log = null; | ||
raft.nodes = []; | ||
// | ||
// Raft §5.2: | ||
// | ||
// When a server starts, it's always started as Follower and it will remain in | ||
// this state until receive a message from a Leader or Candidate. | ||
// | ||
raft.state = options.state || Raft.FOLLOWER; // Our current state. | ||
raft.leader = ''; // Leader in our cluster. | ||
raft.term = 0; // Our current term. | ||
// | ||
// Raft §5.2: | ||
// | ||
// When a server starts, it's always started as Follower and it will remain in | ||
// this state until receive a message from a Leader or Candidate. | ||
// | ||
raft.state = options.state || Raft.FOLLOWER; // Our current state. | ||
raft.leader = ''; // Leader in our cluster. | ||
raft.term = 0; // Our current term. | ||
raft._initialize(options); | ||
} | ||
raft._initialize(options); | ||
} | ||
/** | ||
* Initialize Raft and start listening to the various of events we're | ||
* emitting as we're quite chatty to provide the maximum amount of flexibility | ||
* and reconfigurability. | ||
* | ||
* @param {Object} options The configuration you passed in the constructor. | ||
* @private | ||
*/ | ||
_initialize(options) { | ||
var raft = this; | ||
// | ||
// Add some sugar and spice and everything nice. Oh, and also inheritance. | ||
// | ||
Raft.extend = require('extendible'); | ||
Raft.prototype = new EventEmitter(); | ||
Raft.prototype.constructor = Raft; | ||
// | ||
// Add some methods which are best done using modules. | ||
// | ||
Raft.prototype.emits = require('emits'); | ||
Raft.prototype.change = require('modification')(' change'); | ||
/** | ||
* Raft §5.1: | ||
* | ||
* A Raft can be in only one of the various states. The stopped state is not | ||
* something that is part of the Raft protocol but something we might want to | ||
* use internally while we're starting or shutting down our node. The following | ||
* states are generated: | ||
* | ||
* - STOPPED: Assume we're dead. | ||
* - LEADER: We're selected as leader process. | ||
* - CANDIDATE: We want to be promoted to leader. | ||
* - FOLLOWER: We're just following a leader. | ||
* - CHILD: A node that has been added using JOIN. | ||
* | ||
* @type {Number} | ||
* @private | ||
*/ | ||
Raft.states = 'STOPPED,LEADER,CANDIDATE,FOLLOWER,CHILD'.split(','); | ||
for (var s = 0; s < Raft.states.length; s++) { | ||
Raft[Raft.states[s]] = s; | ||
} | ||
/** | ||
* Initialize Raft and start listening to the various of events we're | ||
* emitting as we're quite chatty to provide the maximum amount of flexibility | ||
* and reconfigurability. | ||
* | ||
* @param {Object} options The configuration you passed in the constructor. | ||
* @api private | ||
*/ | ||
Raft.prototype._initialize = function initializing(options) { | ||
var raft = this; | ||
// | ||
// Reset our vote as we're starting a new term. Votes only last one term. | ||
// | ||
raft.on('term change', function change() { | ||
raft.votes.for = null; | ||
raft.votes.granted = 0; | ||
}); | ||
// | ||
// Reset our times and start the heartbeat again. If we're promoted to leader | ||
// the heartbeat will automatically be broadcasted to users as well. | ||
// | ||
raft.on('state change', function change(state) { | ||
raft.timers.clear('heartbeat, election'); | ||
raft.heartbeat(Raft.LEADER === raft.state ? raft.beat : raft.timeout()); | ||
raft.emit(Raft.states[state].toLowerCase()); | ||
}); | ||
// | ||
// Receive incoming messages and process them. | ||
// | ||
raft.on('data', function incoming(packet, write) { | ||
write = write || nope; | ||
var reason; | ||
if ('object' !== raft.type(packet)) { | ||
reason = 'Invalid packet received'; | ||
raft.emit('error', new Error(reason)); | ||
return write(raft.packet('error', reason)); | ||
} | ||
// | ||
// Raft §5.1: | ||
// Reset our vote as we're starting a new term. Votes only last one term. | ||
// | ||
// Applies to all states. If a response contains a higher term then our | ||
// current term need to change our state to FOLLOWER and set the received | ||
// term. | ||
raft.on('term change', function change() { | ||
raft.votes.for = null; | ||
raft.votes.granted = 0; | ||
}); | ||
// | ||
// If the raft receives a request with a stale term number it should be | ||
// rejected. | ||
// Reset our times and start the heartbeat again. If we're promoted to leader | ||
// the heartbeat will automatically be broadcasted to users as well. | ||
// | ||
if (packet.term > raft.term) { | ||
raft.change({ | ||
leader: Raft.LEADER === packet.state ? packet.address : packet.leader || raft.leader, | ||
state: Raft.FOLLOWER, | ||
term: packet.term | ||
}); | ||
} else if (packet.term < raft.term) { | ||
reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ raft.term; | ||
raft.emit('error', new Error(reason)); | ||
raft.on('state change', function change(state) { | ||
raft.timers.clear('heartbeat, election'); | ||
raft.heartbeat(Raft.LEADER === raft.state ? raft.beat : raft.timeout()); | ||
raft.emit(Raft.states[state].toLowerCase()); | ||
}); | ||
return write(raft.packet('error', reason)); | ||
} | ||
// | ||
// Raft §5.2: | ||
// Receive incoming messages and process them. | ||
// | ||
// If we receive a message from someone who claims to be leader and shares | ||
// our same term while we're in candidate mode we will recognize their | ||
// leadership and return as follower. | ||
// | ||
// If we got this far we already know that our terms are the same as it | ||
// would be changed or prevented above.. | ||
// | ||
if (Raft.LEADER === packet.state) { | ||
if (Raft.FOLLOWER !== raft.state) raft.change({ state: Raft.FOLLOWER }); | ||
if (packet.address !== raft.leader) raft.change({ leader: packet.address }); | ||
raft.on('data', async (packet, write) => { | ||
write = write || nope; | ||
var reason; | ||
if ('object' !== raft.type(packet)) { | ||
reason = 'Invalid packet received'; | ||
raft.emit('error', new Error(reason)); | ||
return write(await raft.packet('error', reason)); | ||
} | ||
// | ||
// Always when we receive an message from the Leader we need to reset our | ||
// heartbeat. | ||
// Raft §5.1: | ||
// | ||
raft.heartbeat(raft.timeout()); | ||
} | ||
// Applies to all states. If a response contains a higher term then our | ||
// current term need to change our state to FOLLOWER and set the received | ||
// term. | ||
// | ||
// If the raft receives a request with a stale term number it should be | ||
// rejected. | ||
// | ||
if (packet.term > raft.term) { | ||
raft.change({ | ||
leader: Raft.LEADER === packet.state ? packet.address : packet.leader || raft.leader, | ||
state: Raft.FOLLOWER, | ||
term: packet.term | ||
}); | ||
} else if (packet.term < raft.term) { | ||
reason = 'Stale term detected, received `'+ packet.term +'` we are at '+ raft.term; | ||
raft.emit('error', new Error(reason)); | ||
switch (packet.type) { | ||
return write(raft.packet('error', reason)); | ||
} | ||
// | ||
// Raft §5.2: | ||
// Raft §5.4: | ||
// | ||
// A raft asked us to vote on them. We can only vote to them if they | ||
// represent a higher term (and last log term, last log index). | ||
// If we receive a message from someone who claims to be leader and shares | ||
// our same term while we're in candidate mode we will recognize their | ||
// leadership and return as follower. | ||
// | ||
case 'vote': | ||
// If we got this far we already know that our terms are the same as it | ||
// would be changed or prevented above.. | ||
// | ||
if (Raft.LEADER === packet.state) { | ||
if (Raft.FOLLOWER !== raft.state) raft.change({ state: Raft.FOLLOWER }); | ||
if (packet.address !== raft.leader) raft.change({ leader: packet.address }); | ||
// | ||
// The term of the vote is bigger then ours so we need to update it. If | ||
// it's the same and we already voted, we need to deny the vote. | ||
// Always when we receive an message from the Leader we need to reset our | ||
// heartbeat. | ||
// | ||
if (raft.votes.for && raft.votes.for !== packet.address) { | ||
raft.emit('vote', packet, false); | ||
raft.heartbeat(raft.timeout()); | ||
} | ||
return write(raft.packet('voted', { granted: false })); | ||
} | ||
switch (packet.type) { | ||
// | ||
// If we maintain a log, check if the candidates log is as up to date as | ||
// ours. | ||
// Raft §5.2: | ||
// Raft §5.4: | ||
// | ||
// @TODO point to index of last commit entry. | ||
// @TODO point to term of last commit entry. | ||
// A raft asked us to vote on them. We can only vote to them if they | ||
// represent a higher term (and last log term, last log index). | ||
// | ||
if (raft.log && packet.last && ( | ||
raft.log.index > packet.last.index | ||
|| raft.term > packet.last.term | ||
)) { | ||
raft.emit('vote', packet, false); | ||
case 'vote': | ||
// | ||
// The term of the vote is bigger then ours so we need to update it. If | ||
// it's the same and we already voted, we need to deny the vote. | ||
// | ||
if (raft.votes.for && raft.votes.for !== packet.address) { | ||
raft.emit('vote', packet, false); | ||
return write(raft.packet('voted', { granted: false })); | ||
} | ||
return write(await raft.packet('voted', { granted: false })); | ||
} | ||
// | ||
// We've made our decision, we haven't voted for this term yet and this | ||
// candidate came in first so it gets our vote as all requirements are | ||
// met. | ||
// | ||
raft.votes.for = packet.address; | ||
raft.emit('vote', packet, true); | ||
raft.change({ leader: packet.address, term: packet.term }); | ||
write(raft.packet('voted', { granted: true })); | ||
// | ||
// If we maintain a log, check if the candidates log is as up to date as | ||
// ours. | ||
// | ||
if (raft.log) { | ||
const { index, term } = await raft.log.getLastInfo(); | ||
// | ||
// We've accepted someone as potential new leader, so we should reset | ||
// our heartbeat to prevent this raft from timing out after voting. | ||
// Which would again increment the term causing us to be next CANDIDATE | ||
// and invalidates the request we just got, so that's silly willy. | ||
// | ||
raft.heartbeat(raft.timeout()); | ||
break; | ||
if (index > packet.last.index && term > packet.last.term) { | ||
raft.emit('vote', packet, false); | ||
// | ||
// A new incoming vote. | ||
// | ||
case 'voted': | ||
// | ||
// Only accepts votes while we're still in a CANDIDATE state. | ||
// | ||
if (Raft.CANDIDATE !== raft.state) { | ||
return write(raft.packet('error', 'No longer a candidate, ignoring vote')); | ||
} | ||
return write(await raft.packet('voted', { granted: false })); | ||
} | ||
} | ||
// | ||
// Increment our received votes when our voting request has been | ||
// granted by the raft that received the data. | ||
// | ||
if (packet.data.granted) { | ||
raft.votes.granted++; | ||
} | ||
// | ||
// We've made our decision, we haven't voted for this term yet and this | ||
// candidate came in first so it gets our vote as all requirements are | ||
// met. | ||
// | ||
raft.votes.for = packet.address; | ||
raft.emit('vote', packet, true); | ||
raft.change({ leader: packet.address, term: packet.term }); | ||
write(await raft.packet('voted', { granted: true })); | ||
// | ||
// We've accepted someone as potential new leader, so we should reset | ||
// our heartbeat to prevent this raft from timing out after voting. | ||
// Which would again increment the term causing us to be next CANDIDATE | ||
// and invalidates the request we just got, so that's silly willy. | ||
// | ||
raft.heartbeat(raft.timeout()); | ||
break; | ||
// | ||
// Check if we've received the minimal amount of votes required for this | ||
// current voting round to be considered valid. | ||
// A new incoming vote. | ||
// | ||
if (raft.quorum(raft.votes.granted)) { | ||
raft.change({ leader: raft.address, state: Raft.LEADER }); | ||
case 'voted': | ||
// | ||
// Only accepts votes while we're still in a CANDIDATE state. | ||
// | ||
if (Raft.CANDIDATE !== raft.state) { | ||
return write(await raft.packet('error', 'No longer a candidate, ignoring vote')); | ||
} | ||
// | ||
// Send a heartbeat message to all connected clients. | ||
// Increment our received votes when our voting request has been | ||
// granted by the raft that received the data. | ||
// | ||
raft.message(Raft.FOLLOWER, raft.packet('append')); | ||
} | ||
if (packet.data.granted) { | ||
raft.votes.granted++; | ||
} | ||
// | ||
// Check if we've received the minimal amount of votes required for this | ||
// current voting round to be considered valid. | ||
// | ||
if (raft.quorum(raft.votes.granted)) { | ||
raft.change({ leader: raft.address, state: Raft.LEADER }); | ||
// | ||
// Send a heartbeat message to all connected clients. | ||
// | ||
raft.message(Raft.FOLLOWER, await raft.packet('append')); | ||
} | ||
// | ||
// Empty write, nothing to do. | ||
// | ||
write(); | ||
break; | ||
case 'error': | ||
raft.emit('error', new Error(packet.data)); | ||
break; | ||
case 'append': | ||
const {term, index} = await raft.log.getLastInfo(); | ||
// We do not have the last index as our last entry | ||
// Look back in log in case we have it previously | ||
// if we do remove any bad uncommitted entries following it | ||
if (packet.last.index !== index && packet.last.index !== 0) { | ||
const hasIndex = await raft.log.has(packet.last.index); | ||
if (hasIndex) raft.log.removeEntriesAfter(packet.last.index); | ||
else return raft.message(Raft.LEADER, await raft.packet('append fail', { | ||
term: packet.last.term, | ||
index: packet.last.index | ||
})); | ||
} | ||
if (packet.data) { | ||
const entry = packet.data[0]; | ||
await raft.log.saveCommand(entry.command, entry.term, entry.index); | ||
raft.message(Raft.LEADER, await raft.packet('append ack', { | ||
term: entry.term, | ||
index: entry.index | ||
})); | ||
} | ||
//if packet commit index not the same. Commit commands | ||
if (raft.log.committedIndex < packet.last.committedIndex) { | ||
const entries = await raft.log.getUncommittedEntriesUpToIndex(packet.last.committedIndex, packet.last.term); | ||
raft.commitEntries(entries); | ||
} | ||
break; | ||
case 'append ack': | ||
const entry = await raft.log.commandAck(packet.data.index, packet.address); | ||
if (raft.quorum(entry.responses.length) && !entry.committed) { | ||
const entries = await raft.log.getUncommittedEntriesUpToIndex(entry.index, entry.term); | ||
raft.commitEntries(entries); | ||
} | ||
break; | ||
case 'append fail': | ||
const previousEntry = await raft.log.get(packet.data.index); | ||
const append = await raft.appendPacket(previousEntry); | ||
write(append); | ||
break; | ||
// | ||
// Empty write, nothing to do. | ||
// RPC command | ||
// | ||
write(); | ||
break; | ||
case 'exec': | ||
break; | ||
case 'error': | ||
raft.emit('error', new Error(packet.data)); | ||
break; | ||
// | ||
// Unknown event, we have no idea how to process this so we're going to | ||
// return an error. | ||
// | ||
default: | ||
if (raft.listeners('rpc').length) { | ||
raft.emit('rpc', packet, write); | ||
} else { | ||
write(await raft.packet('error', 'Unknown message type: '+ packet.type)); | ||
} | ||
} | ||
}); | ||
// | ||
// Remark: Are we assuming we are getting an appendEntries from the | ||
// leader and comparing and appending our log? | ||
// | ||
case 'append': | ||
break; | ||
// | ||
// We do not need to execute the rest of the functionality below as we're | ||
// currently running as "child" raft of the cluster not as the "root" raft. | ||
// | ||
if (Raft.CHILD === raft.state) return raft.emit('initialize'); | ||
// | ||
// Remark: So does this get emit when we need to write our OWN log? | ||
// | ||
case 'log': | ||
break; | ||
// | ||
// Setup the log & appends. Assume that if we're given a function log that it | ||
// needs to be initialized as it requires access to our raft instance so it | ||
// can read our information like our leader, state, term etc. | ||
// | ||
if ('function' === raft.type(raft.Log)) { | ||
raft.log = new raft.Log(raft, options); | ||
} | ||
// | ||
// RPC command | ||
// | ||
case 'exec': | ||
break; | ||
/** | ||
* The raft is now listening to events so we can start our heartbeat timeout. | ||
* So that if we don't hear anything from a leader we can promote our selfs to | ||
* a candidate state. | ||
* | ||
* Start listening listening for heartbeats when implementors are also ready | ||
* with setting up their code. | ||
* | ||
* @api private | ||
*/ | ||
function initialize(err) { | ||
if (err) return raft.emit('error', err); | ||
// | ||
// Unknown event, we have no idea how to process this so we're going to | ||
// return an error. | ||
// | ||
default: | ||
if (raft.listeners('rpc').length) { | ||
raft.emit('rpc', packet, write); | ||
} else { | ||
write(raft.packet('error', 'Unknown message type: '+ packet.type)); | ||
} | ||
raft.emit('initialize'); | ||
raft.heartbeat(raft.timeout()); | ||
} | ||
}); | ||
// | ||
// We do not need to execute the rest of the functionality below as we're | ||
// currently running as "child" raft of the cluster not as the "root" raft. | ||
// | ||
if (Raft.CHILD === raft.state) return raft.emit('initialize'); | ||
if ('function' === raft.type(raft.initialize)) { | ||
if (raft.initialize.length === 2) return raft.initialize(options, initialize); | ||
raft.initialize(options); | ||
} | ||
// | ||
// Setup the log & appends. Assume that if we're given a function log that it | ||
// needs to be initialized as it requires access to our raft instance so it | ||
// can read our information like our leader, state, term etc. | ||
// | ||
if ('function' === raft.type(raft.Log)) { | ||
raft.log = new raft.Log(raft, options); | ||
initialize(); | ||
} | ||
/** | ||
* The raft is now listening to events so we can start our heartbeat timeout. | ||
* So that if we don't hear anything from a leader we can promote our selfs to | ||
* a candidate state. | ||
* Proper type checking. | ||
* | ||
* Start listening listening for heartbeats when implementors are also ready | ||
* with setting up their code. | ||
* @param {Mixed} of Thing we want to know the type of. | ||
* @returns {String} The type. | ||
* @private | ||
*/ | ||
type(of) { | ||
return Object.prototype.toString.call(of).slice(8, -1).toLowerCase(); | ||
} | ||
/** | ||
* Check if we've reached our quorum (a.k.a. minimum amount of votes requires | ||
* for a voting round to be considered valid) for the given amount of votes. | ||
* | ||
* @api private | ||
* @param {Number} responses Amount of responses received. | ||
* @returns {Boolean} | ||
* @public | ||
*/ | ||
function initialize(err) { | ||
if (err) return raft.emit('error', err); | ||
quorum(responses) { | ||
if (!this.nodes.length || !responses) return false; | ||
raft.emit('initialize'); | ||
raft.heartbeat(raft.timeout()); | ||
return responses >= this.majority(); | ||
} | ||
if ('function' === raft.type(raft.initialize)) { | ||
if (raft.initialize.length === 2) return raft.initialize(options, initialize); | ||
raft.initialize(options); | ||
/** | ||
* The majority required to reach our the quorum. | ||
* | ||
* @returns {Number} | ||
* @public | ||
*/ | ||
majority() { | ||
return Math.ceil(this.nodes.length / 2) + 1; | ||
} | ||
initialize(); | ||
}; | ||
/** | ||
* Attempt to run a function indefinitely until the callback is called. | ||
* | ||
* @param {Function} attempt Function that needs to be attempted. | ||
* @param {Function} fn Completion callback. | ||
* @param {Number} timeout Which timeout should we use. | ||
* @returns {Raft} | ||
* @public | ||
*/ | ||
indefinitely(attempt, fn, timeout) { | ||
var uuid = UUID() | ||
, raft = this; | ||
/** | ||
* Proper type checking. | ||
* | ||
* @param {Mixed} of Thing we want to know the type of. | ||
* @returns {String} The type. | ||
* @api private | ||
*/ | ||
Raft.prototype.type = function type(of) { | ||
return Object.prototype.toString.call(of).slice(8, -1).toLowerCase(); | ||
}; | ||
(function again() { | ||
// | ||
// We need to force async execution here because we do not want to saturate | ||
// the event loop with sync executions. We know that it's important these | ||
// functions are retried indefinitely but if it's called synchronously we will | ||
// not have time to receive data or updates. | ||
// | ||
var next = one(function force(err, data) { | ||
if (!raft.timers) return; // We're been destroyed, ignore all. | ||
/** | ||
* Check if we've reached our quorum (a.k.a. minimum amount of votes requires | ||
* for a voting round to be considered valid) for the given amount of votes. | ||
* | ||
* @param {Number} responses Amount of responses received. | ||
* @returns {Boolean} | ||
* @api public | ||
*/ | ||
Raft.prototype.quorum = function quorum(responses) { | ||
if (!this.nodes.length || !responses) return false; | ||
raft.timers.setImmediate(uuid +'@async', function async() { | ||
if (err) { | ||
raft.emit('error', err); | ||
return responses >= this.majority(); | ||
}; | ||
return again(); | ||
} | ||
/** | ||
* The majority required to reach our the quorum. | ||
* | ||
* @returns {Number} | ||
* @api public | ||
*/ | ||
Raft.prototype.majority = function majority() { | ||
return Math.ceil(this.nodes.length / 2) + 1; | ||
}; | ||
fn(data); | ||
}); | ||
}); | ||
/** | ||
* Attempt to run a function indefinitely until the callback is called. | ||
* | ||
* @param {Function} attempt Function that needs to be attempted. | ||
* @param {Function} fn Completion callback. | ||
* @param {Number} timeout Which timeout should we use. | ||
* @returns {Raft} | ||
* @api public | ||
*/ | ||
Raft.prototype.indefinitely = function indefinitely(attempt, fn, timeout) { | ||
var uuid = UUID() | ||
, raft = this; | ||
// | ||
// Ensure that the assigned callback has the same context as our raft. | ||
// | ||
attempt.call(raft, next); | ||
(function again() { | ||
// | ||
// We need to force async execution here because we do not want to saturate | ||
// the event loop with sync executions. We know that it's important these | ||
// functions are retried indefinitely but if it's called synchronously we will | ||
// not have time to receive data or updates. | ||
// | ||
var next = one(function force(err, data) { | ||
if (!raft.timers) return; // We're been destroyed, ignore all. | ||
raft.timers.setTimeout(uuid, function timeoutfn() { | ||
next(new Error('Timed out, attempting to retry again')); | ||
}, +timeout || raft.timeout()); | ||
}()); | ||
raft.timers.setImmediate(uuid +'@async', function async() { | ||
if (err) { | ||
raft.emit('error', err); | ||
return this; | ||
} | ||
return again(); | ||
} | ||
/** | ||
* Start or update the heartbeat of the Raft. If we detect that we've received | ||
* a heartbeat timeout we will promote our selfs to a candidate to take over the | ||
* leadership. | ||
* | ||
* @param {String|Number} duration Time it would take for the heartbeat to timeout. | ||
* @returns {Raft} | ||
* @private | ||
*/ | ||
heartbeat(duration) { | ||
var raft = this; | ||
fn(data); | ||
}); | ||
}); | ||
duration = duration || raft.beat; | ||
// | ||
// Ensure that the assigned callback has the same context as our raft. | ||
// | ||
attempt.call(raft, next); | ||
if (raft.timers.active('heartbeat')) { | ||
raft.timers.adjust('heartbeat', duration); | ||
raft.timers.setTimeout(uuid, function timeoutfn() { | ||
next(new Error('Timed out, attempting to retry again')); | ||
}, +timeout || raft.timeout()); | ||
}()); | ||
return raft; | ||
} | ||
return this; | ||
}; | ||
raft.timers.setTimeout('heartbeat', async () => { | ||
if (Raft.LEADER !== raft.state) { | ||
raft.emit('heartbeat timeout'); | ||
/** | ||
* Start or update the heartbeat of the Raft. If we detect that we've received | ||
* a heartbeat timeout we will promote our selfs to a candidate to take over the | ||
* leadership. | ||
* | ||
* @param {String|Number} duration Time it would take for the heartbeat to timeout. | ||
* @returns {Raft} | ||
* @api private | ||
*/ | ||
Raft.prototype.heartbeat = function heartbeat(duration) { | ||
var raft = this; | ||
return raft.promote(); | ||
} | ||
duration = duration || raft.beat; | ||
// | ||
// According to the raft spec we should be sending empty append requests as | ||
// heartbeat. We want to emit an event so people can modify or inspect the | ||
// payload before we send it. It's also a good indication for when the | ||
// idle state of a LEADER as it didn't get any messages to append/commit to | ||
// the FOLLOWER'S. | ||
// | ||
var packet = await raft.packet('append'); | ||
if (raft.timers.active('heartbeat')) { | ||
raft.timers.adjust('heartbeat', duration); | ||
raft.emit('heartbeat', packet); | ||
raft.message(Raft.FOLLOWER, packet).heartbeat(raft.beat); | ||
}, duration); | ||
@@ -498,382 +527,437 @@ return raft; | ||
raft.timers.setTimeout('heartbeat', function heartbeattimeout() { | ||
if (Raft.LEADER !== raft.state) { | ||
raft.emit('heartbeat timeout'); | ||
/** | ||
* Send a message to connected nodes within our cluster. The following messaging | ||
* patterns (who) are available: | ||
* | ||
* - Raft.LEADER : Send a message to cluster's current leader. | ||
* - Raft.FOLLOWER : Send a message to all non leaders. | ||
* - Raft.CHILD : Send a message to everybody. | ||
* - <address> : Send a message to a raft based on the address. | ||
* | ||
* @param {Mixed} who Recipient of the message. | ||
* @param {Mixed} what The data we need to send. | ||
* @param {Function} when Completion callback | ||
* @returns {Raft} | ||
* @public | ||
*/ | ||
message(who, what, when) { | ||
when = when || nope; | ||
return raft.promote(); | ||
} | ||
// | ||
// According to the raft spec we should be sending empty append requests as | ||
// heartbeat. We want to emit an event so people can modify or inspect the | ||
// payload before we send it. It's also a good indication for when the | ||
// idle state of a LEADER as it didn't get any messages to append/commit to | ||
// the FOLLOWER'S. | ||
// If the "who" is undefined, the developer made an error somewhere. Tell them! | ||
// | ||
var packet = raft.packet('append'); | ||
if (typeof who === 'undefined') { | ||
throw new Error('Cannot send message to `undefined`. Check your spelling!'); | ||
} | ||
raft.emit('heartbeat', packet); | ||
raft.message(Raft.FOLLOWER, packet).heartbeat(raft.beat); | ||
}, duration); | ||
var output = { errors: {}, results: {} } | ||
, length = this.nodes.length | ||
, errors = false | ||
, latency = [] | ||
, raft = this | ||
, nodes = [] | ||
, i = 0; | ||
return raft; | ||
}; | ||
switch (who) { | ||
case Raft.LEADER: for (; i < length; i++) | ||
if (raft.leader === raft.nodes[i].address) { | ||
nodes.push(raft.nodes[i]); | ||
} | ||
break; | ||
/** | ||
* Send a message to connected nodes within our cluster. The following messaging | ||
* patterns (who) are available: | ||
* | ||
* - Raft.LEADER : Send a message to cluster's current leader. | ||
* - Raft.FOLLOWER : Send a message to all non leaders. | ||
* - Raft.CHILD : Send a message to everybody. | ||
* - <address> : Send a message to a raft based on the address. | ||
* | ||
* @param {Mixed} who Recipient of the message. | ||
* @param {Mixed} what The data we need to send. | ||
* @param {Function} when Completion callback | ||
* @returns {Raft} | ||
* @api public | ||
*/ | ||
Raft.prototype.message = function message(who, what, when) { | ||
when = when || nope; | ||
case Raft.FOLLOWER: for (; i < length; i++) | ||
if (raft.leader !== raft.nodes[i].address) { | ||
nodes.push(raft.nodes[i]); | ||
} | ||
break; | ||
// | ||
// If the "who" is undefined, the developer made an error somewhere. Tell them! | ||
// | ||
if (typeof who === 'undefined') { | ||
throw new Error('Cannot send message to `undefined`. Check your spelling!'); | ||
} | ||
case Raft.CHILD: | ||
Array.prototype.push.apply(nodes, raft.nodes); | ||
break; | ||
var output = { errors: {}, results: {} } | ||
, length = this.nodes.length | ||
, errors = false | ||
, latency = [] | ||
, raft = this | ||
, nodes = [] | ||
, i = 0; | ||
default: for (; i < length; i++) | ||
if (who === raft.nodes[i].address) { | ||
nodes.push(raft.nodes[i]); | ||
} | ||
} | ||
switch (who) { | ||
case Raft.LEADER: for (; i < length; i++) | ||
if (raft.leader === raft.nodes[i].address) { | ||
nodes.push(raft.nodes[i]); | ||
} | ||
break; | ||
/** | ||
* A small wrapper to force indefinitely sending of a certain packet. | ||
* | ||
* @param {Raft} client Raft we need to write a message to. | ||
* @param {Object} data Message that needs to be send. | ||
* @api private | ||
*/ | ||
function wrapper(client, data) { | ||
var start = +new Date(); | ||
case Raft.FOLLOWER: for (; i < length; i++) | ||
if (raft.leader !== raft.nodes[i].address) { | ||
nodes.push(raft.nodes[i]); | ||
} | ||
break; | ||
client.write(data, function written(err, data) { | ||
latency.push(+new Date() - start); | ||
case Raft.CHILD: | ||
Array.prototype.push.apply(nodes, raft.nodes); | ||
break; | ||
// | ||
// Add the error or output to our `output` object to be | ||
// passed to the callback when all the writing is done. | ||
// | ||
if (err) { | ||
errors = true; | ||
output.errors[client.address] = err; | ||
} else { | ||
output.results[client.address] = data; | ||
} | ||
default: for (; i < length; i++) | ||
if (who === raft.nodes[i].address) { | ||
nodes.push(raft.nodes[i]); | ||
} | ||
// | ||
// OK, so this is the strange part here. We've broadcasted messages and | ||
// got replies back. This reply contained data so we need to process it. | ||
// What if the data is incorrect? Then we have no way at the moment to | ||
// send back reply to a reply to the server. | ||
// | ||
if (err) raft.emit('error', err); | ||
else if (data) raft.emit('data', data); | ||
// | ||
// Messaging has been completed. | ||
// | ||
if (latency.length === length) { | ||
raft.timing(latency); | ||
when(errors ? output.errors : undefined, output.results); | ||
latency.length = nodes.length = 0; | ||
output = null; | ||
} | ||
}); | ||
} | ||
length = nodes.length; | ||
i = 0; | ||
for (; i < length; i++) { | ||
wrapper(nodes[i], what); | ||
} | ||
return raft; | ||
}; | ||
/** | ||
* Generate the various of timeouts. | ||
* | ||
* @returns {Number} | ||
* @private | ||
*/ | ||
timeout() { | ||
var times = this.election; | ||
return Math.floor(Math.random() * (times.max - times.min + 1) + times.min); | ||
} | ||
/** | ||
* A small wrapper to force indefinitely sending of a certain packet. | ||
* Calculate if our average latency causes us to come dangerously close to the | ||
* minimum election timeout. | ||
* | ||
* @param {Raft} client Raft we need to write a message to. | ||
* @param {Object} data Message that needs to be send. | ||
* @api private | ||
* @param {Array} latency Latency of the last broadcast. | ||
* @param {Boolean} Success-fully calculated the threshold. | ||
* @private | ||
*/ | ||
function wrapper(client, data) { | ||
var start = +new Date(); | ||
timing(latency) { | ||
var raft = this | ||
, sum = 0 | ||
, i = 0; | ||
client.write(data, function written(err, data) { | ||
latency.push(+new Date() - start); | ||
if (Raft.STOPPED === raft.state) return false; | ||
// | ||
// Add the error or output to our `output` object to be | ||
// passed to the callback when all the writing is done. | ||
// | ||
if (err) { | ||
errors = true; | ||
output.errors[client.address] = err; | ||
} else { | ||
output.results[client.address] = data; | ||
} | ||
for (; i < latency.length; i++) { | ||
sum += latency[i]; | ||
} | ||
// | ||
// OK, so this is the strange part here. We've broadcasted messages and | ||
// got replies back. This reply contained data so we need to process it. | ||
// What if the data is incorrect? Then we have no way at the moment to | ||
// send back reply to a reply to the server. | ||
// | ||
if (err) raft.emit('error', err); | ||
else if (data) raft.emit('data', data); | ||
raft.latency = Math.floor(sum / latency.length); | ||
// | ||
// Messaging has been completed. | ||
// | ||
if (latency.length === length) { | ||
raft.timing(latency); | ||
when(errors ? output.errors : undefined, output.results); | ||
latency.length = nodes.length = 0; | ||
output = null; | ||
} | ||
}); | ||
if (raft.latency > raft.election.min * raft.threshold) { | ||
raft.emit('threshold'); | ||
} | ||
return true; | ||
} | ||
length = nodes.length; | ||
i = 0; | ||
/** | ||
* Raft §5.2: | ||
* | ||
* We've detected a timeout from the leaders heartbeats and need to start a new | ||
* election for leadership. We increment our current term, set the CANDIDATE | ||
* state, vote our selfs and ask all others rafts to vote for us. | ||
* | ||
* @returns {Raft} | ||
* @private | ||
*/ | ||
async promote() { | ||
var raft = this; | ||
for (; i < length; i++) { | ||
wrapper(nodes[i], what); | ||
} | ||
raft.change({ | ||
state: Raft.CANDIDATE, // We're now a candidate, | ||
term: raft.term + 1, // but only for this term. | ||
leader: '' // We no longer have a leader. | ||
}); | ||
return raft; | ||
}; | ||
// | ||
// Candidates are always biased and vote for them selfs first before sending | ||
// out a voting request to all other rafts in the cluster. | ||
// | ||
raft.votes.for = raft.address; | ||
raft.votes.granted = 1; | ||
/** | ||
* Generate the various of timeouts. | ||
* | ||
* @returns {Number} | ||
* @api private | ||
*/ | ||
Raft.prototype.timeout = function timeout() { | ||
var times = this.election; | ||
// | ||
// Broadcast the voting request to all connected rafts in your private | ||
// cluster. | ||
// | ||
const packet = await raft.packet('vote') | ||
return Math.floor(Math.random() * (times.max - times.min + 1) + times.min); | ||
}; | ||
raft.message(Raft.FOLLOWER, packet); | ||
/** | ||
* Calculate if our average latency causes us to come dangerously close to the | ||
* minimum election timeout. | ||
* | ||
* @param {Array} latency Latency of the last broadcast. | ||
* @param {Boolean} Success-fully calculated the threshold. | ||
* @api private | ||
*/ | ||
Raft.prototype.timing = function timing(latency) { | ||
var raft = this | ||
, sum = 0 | ||
, i = 0; | ||
// | ||
// Set the election timeout. This gives the rafts some time to reach | ||
// consensuses about who they want to vote for. If no consensus has been | ||
// reached within the set timeout we will attempt it again. | ||
// | ||
raft.timers | ||
.clear('heartbeat, election') | ||
.setTimeout('election', raft.promote, raft.timeout()); | ||
if (Raft.STOPPED === raft.state) return false; | ||
for (; i < latency.length; i++) { | ||
sum += latency[i]; | ||
return raft; | ||
} | ||
raft.latency = Math.floor(sum / latency.length); | ||
/** | ||
* Wrap the outgoing messages in an object with additional required data. | ||
* | ||
* @async | ||
* @param {String} type Message type we're trying to send. | ||
* @param {Mixed} data Data to be transfered. | ||
* @returns {Promise<Object>} Packet. | ||
* @private | ||
*/ | ||
async packet(type, data) { | ||
var raft = this | ||
, wrapped = { | ||
state: raft.state, // Are we're a leader, candidate or follower. | ||
term: raft.term, // Our current term so we can find mis matches. | ||
address: raft.address, // Address of the sender. | ||
type: type, // Message type. | ||
leader: raft.leader, // Who is our leader. | ||
}; | ||
if (raft.latency > raft.election.min * raft.threshold) { | ||
raft.emit('threshold'); | ||
// | ||
// If we have logging and state replication enabled we also need to send this | ||
// additional data so we can use it determine the state of this raft. | ||
// | ||
if (raft.log) wrapped.last = await raft.log.getLastInfo(); | ||
if (arguments.length === 2) wrapped.data = data; | ||
return wrapped; | ||
} | ||
return true; | ||
}; | ||
/** | ||
* appendPacket - Send append message with entry and using the previous entry as the last.index and last.term | ||
* | ||
* @param {Entry} entry Entry to send as data | ||
* | ||
* @return {Promise<object>} Description | ||
* @private | ||
*/ | ||
async appendPacket (entry) { | ||
const raft = this; | ||
const last = await raft.log.getEntryInfoBefore(entry); | ||
return{ | ||
state: raft.state, // Are we're a leader, candidate or follower. | ||
term: raft.term, // Our current term so we can find mis matches. | ||
address: raft.address, // Address of the sender. | ||
type: 'append', // Append message type . | ||
leader: raft.leader, // Who is our leader. | ||
data: [entry], // The command to send to the other nodes | ||
last, | ||
}; | ||
} | ||
/** | ||
* Raft §5.2: | ||
* | ||
* We've detected a timeout from the leaders heartbeats and need to start a new | ||
* election for leadership. We increment our current term, set the CANDIDATE | ||
* state, vote our selfs and ask all others rafts to vote for us. | ||
* | ||
* @returns {Raft} | ||
* @api private | ||
*/ | ||
Raft.prototype.promote = function promote() { | ||
var raft = this; | ||
/** | ||
* Create a clone of the current instance with the same configuration. Ideally | ||
* for creating connected nodes in a cluster.. And let that be something we're | ||
* planning on doing. | ||
* | ||
* @param {Object} options Configuration that should override the default config. | ||
* @returns {Raft} The newly created instance. | ||
* @public | ||
*/ | ||
clone(options) { | ||
options = options || {}; | ||
raft.change({ | ||
state: Raft.CANDIDATE, // We're now a candidate, | ||
term: raft.term + 1, // but only for this term. | ||
leader: '' // We no longer have a leader. | ||
}); | ||
var raft = this | ||
, node = { | ||
'Log': raft.Log, | ||
'election max': raft.election.max, | ||
'election min': raft.election.min, | ||
'heartbeat': raft.beat, | ||
'threshold': raft.threshold, | ||
}, key; | ||
// | ||
// Candidates are always biased and vote for them selfs first before sending | ||
// out a voting request to all other rafts in the cluster. | ||
// | ||
raft.votes.for = raft.address; | ||
raft.votes.granted = 1; | ||
for (key in node) { | ||
if (key in options || !node.hasOwnProperty(key)) continue; | ||
// | ||
// Broadcast the voting request to all connected rafts in your private | ||
// cluster. | ||
// | ||
var packet = raft.packet('vote') | ||
, i = 0; | ||
options[key] = node[key]; | ||
} | ||
raft.message(Raft.FOLLOWER, raft.packet('vote')); | ||
return new raft.constructor(options); | ||
} | ||
// | ||
// Set the election timeout. This gives the rafts some time to reach | ||
// consensuses about who they want to vote for. If no consensus has been | ||
// reached within the set timeout we will attempt it again. | ||
// | ||
raft.timers | ||
.clear('heartbeat, election') | ||
.setTimeout('election', raft.promote, raft.timeout()); | ||
/** | ||
* A new raft is about to join the cluster. So we need to upgrade the | ||
* configuration of every single raft. | ||
* | ||
* @param {String} address The address of the raft that is connected. | ||
* @param {Function} write A method that we use to write data. | ||
* @returns {Raft} The raft we created and that joined our cluster. | ||
* @public | ||
*/ | ||
join(address, write) { | ||
var raft = this; | ||
return raft; | ||
}; | ||
// can be function or asyncfunction | ||
if (/function/.test(raft.type(address))) { | ||
write = address; address = null; | ||
} | ||
/** | ||
* Wrap the outgoing messages in an object with additional required data. | ||
* | ||
* @param {String} type Message type we're trying to send. | ||
* @param {Mixed} data Data to be transfered. | ||
* @returns {Object} Packet. | ||
* @api private | ||
*/ | ||
Raft.prototype.packet = function wrap(type, data) { | ||
var raft = this | ||
, packet = { | ||
state: raft.state, // Are we're a leader, candidate or follower. | ||
term: raft.term, // Our current term so we can find mis matches. | ||
address: raft.address, // Address of the sender. | ||
type: type, // Message type. | ||
leader: raft.leader, // Who is our leader. | ||
}; | ||
// | ||
// You shouldn't be able to join the cluster as your self. So we're going to | ||
// add a really simple address check here. Return nothing so people can actually | ||
// check if a raft has been added. | ||
// | ||
if (raft.address === address) return; | ||
// | ||
// If we have logging and state replication enabled we also need to send this | ||
// additional data so we can use it determine the state of this raft. | ||
// | ||
// @TODO point to index of last commit entry. | ||
// @TODO point to term of last commit entry. | ||
// | ||
if (raft.log) packet.last = { term: raft.term, index: raft.log.index }; | ||
if (arguments.length === 2) packet.data = data; | ||
var node = raft.clone({ | ||
write: write, // Optional function that receives our writes. | ||
address: address, // A custom address for the raft we added. | ||
state: Raft.CHILD // We are a raft in the cluster. | ||
}); | ||
return packet; | ||
}; | ||
node.once('end', function end() { | ||
raft.leave(node); | ||
}, raft); | ||
/** | ||
* Create a clone of the current instance with the same configuration. Ideally | ||
* for creating connected nodes in a cluster.. And let that be something we're | ||
* planning on doing. | ||
* | ||
* @param {Object} options Configuration that should override the default config. | ||
* @returns {Raft} The newly created instance. | ||
* @api public | ||
*/ | ||
Raft.prototype.clone = function clone(options) { | ||
options = options || {}; | ||
raft.nodes.push(node); | ||
raft.emit('join', node); | ||
var raft = this | ||
, node = { | ||
'Log': raft.Log, | ||
'election max': raft.election.max, | ||
'election min': raft.election.min, | ||
'heartbeat': raft.beat, | ||
'threshold': raft.threshold, | ||
}, key; | ||
return node; | ||
} | ||
for (key in node) { | ||
if (key in options || !node.hasOwnProperty(key)) continue; | ||
/** | ||
* Remove a raft from the cluster. | ||
* | ||
* @param {String} address The address of the raft that should be removed. | ||
* @returns {Raft} The raft that we removed. | ||
* @public | ||
*/ | ||
leave(address) { | ||
var raft = this | ||
, index = -1 | ||
, node; | ||
options[key] = node[key]; | ||
} | ||
for (var i = 0; i < raft.nodes.length; i++) { | ||
if (raft.nodes[i] === address || raft.nodes[i].address === address) { | ||
node = raft.nodes[i]; | ||
index = i; | ||
break; | ||
} | ||
} | ||
return new raft.constructor(options); | ||
}; | ||
if (~index && node) { | ||
raft.nodes.splice(index, 1); | ||
/** | ||
* A new raft is about to join the cluster. So we need to upgrade the | ||
* configuration of every single raft. | ||
* | ||
* @param {String} address The address of the raft that is connected. | ||
* @param {Function} write A method that we use to write data. | ||
* @returns {Raft} The raft we created and that joined our cluster. | ||
* @api public | ||
*/ | ||
Raft.prototype.join = function join(address, write) { | ||
var raft = this; | ||
if (node.end) node.end(); | ||
raft.emit('leave', node); | ||
} | ||
if ('function' === raft.type(address)) { | ||
write = address; address = null; | ||
return node; | ||
} | ||
// | ||
// You shouldn't be able to join the cluster as your self. So we're going to | ||
// add a really simple address check here. Return nothing so people can actually | ||
// check if a raft has been added. | ||
// | ||
if (raft.address === address) return; | ||
/** | ||
* This Raft needs to be shut down. | ||
* | ||
* @returns {Boolean} Successful destruction. | ||
* @public | ||
*/ | ||
end() { | ||
var raft = this; | ||
var node = raft.clone({ | ||
write: write, // Optional function that receives our writes. | ||
address: address, // A custom address for the raft we added. | ||
state: Raft.CHILD // We are a raft in the cluster. | ||
}); | ||
if (Raft.STOPPED === raft.state) return false; | ||
raft.change({ state: Raft.STOPPED }); | ||
node.once('end', function end() { | ||
raft.leave(node); | ||
}, raft); | ||
if (raft.nodes.length) for (var i = 0; i < raft.nodes.length; i++) { | ||
raft.leave(raft.nodes[i]); | ||
} | ||
raft.nodes.push(node); | ||
raft.emit('join', node); | ||
raft.emit('end'); | ||
raft.timers.end(); | ||
raft.removeAllListeners(); | ||
return node; | ||
}; | ||
if (raft.log) raft.log.end(); | ||
raft.timers = raft.Log = raft.beat = raft.election = null; | ||
/** | ||
* Remove a raft from the cluster. | ||
* | ||
* @param {String} address The address of the raft that should be removed. | ||
* @returns {Raft} The raft that we removed. | ||
* @api public | ||
*/ | ||
Raft.prototype.leave = function leave(address) { | ||
var raft = this | ||
, index = -1 | ||
, node; | ||
return true; | ||
} | ||
for (var i = 0; i < raft.nodes.length; i++) { | ||
if (raft.nodes[i] === address || raft.nodes[i].address === address) { | ||
node = raft.nodes[i]; | ||
index = i; | ||
break; | ||
/** | ||
* Raft §5.3: | ||
* command - Saves command to log and replicates to followers | ||
* | ||
* @param {type} command Json command to be stored in the log | ||
* | ||
* @return {Promise<void>} Description | ||
*/ | ||
async command(command) { | ||
let raft = this; | ||
if(raft.state !== Raft.LEADER) { | ||
return fn({ | ||
message: 'NOTLEADER', | ||
leaderAddress: raft.leader | ||
}); | ||
} | ||
// about to send an append so don't send a heart beat | ||
// raft.heartbeat(raft.beat); | ||
const entry = await raft.log.saveCommand(command, raft.term); | ||
const appendPacket = await raft.appendPacket(entry); | ||
raft.message(Raft.FOLLOWER, appendPacket); | ||
} | ||
if (~index && node) { | ||
raft.nodes.splice(index, 1); | ||
if (node.end) node.end(); | ||
raft.emit('leave', node); | ||
/** | ||
* commitEntries - Commites entries in log and emits commited entries | ||
* | ||
* @param {Entry[]} entries Entries to commit | ||
* @return {Promise<void>} | ||
*/ | ||
async commitEntries (entries) { | ||
entries.forEach(async (entry) => { | ||
await this.log.commit(entry.index) | ||
this.emit('commit', entry.command); | ||
}); | ||
} | ||
} | ||
return node; | ||
}; | ||
/** | ||
* This Raft needs to be shut down. | ||
* Raft §5.1: | ||
* | ||
* @returns {Boolean} Successful destruction. | ||
* @api public | ||
* A Raft can be in only one of the various states. The stopped state is not | ||
* something that is part of the Raft protocol but something we might want to | ||
* use internally while we're starting or shutting down our node. The following | ||
* states are generated: | ||
* | ||
* - STOPPED: Assume we're dead. | ||
* - LEADER: We're selected as leader process. | ||
* - CANDIDATE: We want to be promoted to leader. | ||
* - FOLLOWER: We're just following a leader. | ||
* - CHILD: A node that has been added using JOIN. | ||
* | ||
* @type {Number} | ||
* @private | ||
*/ | ||
Raft.prototype.end = Raft.prototype.destroy = function end() { | ||
var raft = this; | ||
Raft.states = 'STOPPED,LEADER,CANDIDATE,FOLLOWER,CHILD'.split(','); | ||
for (var s = 0; s < Raft.states.length; s++) { | ||
Raft[Raft.states[s]] = s; | ||
} | ||
if (Raft.STOPPED === raft.state) return false; | ||
raft.state = Raft.STOPPED; | ||
if (raft.nodes.length) for (var i = 0; i < raft.nodes.length; i++) { | ||
raft.leave(raft.nodes[i]); | ||
} | ||
raft.emit('end'); | ||
raft.timers.end(); | ||
raft.removeAllListeners(); | ||
if (raft.log) raft.log.end(); | ||
raft.timers = raft.log = raft.Log = raft.beat = raft.election = null; | ||
return true; | ||
}; | ||
// | ||
// Expose the module interface. | ||
// | ||
module.exports = Raft; |
437
log.js
@@ -1,121 +0,348 @@ | ||
'use strict'; | ||
const encode = require('encoding-down'); | ||
const levelup = require('levelup'); | ||
var setImmediate = require('immediate'); | ||
/** | ||
* The representation of the log of a single node. | ||
* | ||
* Options: | ||
* | ||
* - `engine` The storage engine that should be used. | ||
* | ||
* @constructor | ||
* @param {Node} node Instance of a node. | ||
* @param {Object} options Optional configuration. | ||
* @api public | ||
* @typedef Entry | ||
* @property {number} index the key for the entry | ||
* @property {number} term the term that the entry was saved in | ||
* @property {boolean} Committed if the entry has been committed | ||
* @property {array} responses number of followers that have saved the log entry | ||
* @property {object} command The command to be used in the raft state machine | ||
*/ | ||
function Log(node, options) { | ||
if (!(this instanceof Log)) return new Log(node, options); | ||
class Log { | ||
/** | ||
* @class | ||
* @param {object} node The raft node using this log | ||
* @param {object} Options Options object | ||
* @param {object} Options.[adapter= require('leveldown')] Leveldown adapter, defaults to leveldown | ||
* @param {string} Options.[path='./'] Path to save the log db to | ||
* @return {Log} | ||
*/ | ||
constructor (node, {adapter = require('leveldown'), path = ''}) { | ||
this.node = node; | ||
this.committedIndex = 0; | ||
this.db = levelup(encode(adapter(path), { valueEncoding: 'json', keyEncoding: 'binary'})); | ||
} | ||
this.node = node; | ||
this.engine = options.engine || 'memory'; | ||
/** | ||
* saveCommand - Saves a command to the log | ||
* Initially the command is uncommitted. Once a majority | ||
* of follower nodes have saved the log entry it will be | ||
* committed. | ||
* | ||
* A follow node will also use this method to save a received command to | ||
* its log | ||
* | ||
* @async | ||
* @param {object} command A json object to save to the log | ||
* @param {number} term Term to save with the log entry | ||
* @param {number} [index] Index to save the entry with. This is used by the followers | ||
* @return {Promise<entry>} Description | ||
*/ | ||
async saveCommand (command, term, index) { | ||
// | ||
// Remark: So we want to use something like leveldb here with a particular engine but | ||
// for now lets just use a silly little array | ||
// The following would all be stored in a leveldb database. Entries would be | ||
// its own namespaced key set for easy stream reading and the other values | ||
// would be stored at their particular key for proper persistence and | ||
// fetching. These could be used as a cache like thing as well if we wanted | ||
// faster lookups by default. | ||
// | ||
this.commitIndex = 0; | ||
this.lastApplied = 0; | ||
this.startIndex = 0; | ||
this.startTerm = 0; | ||
this.entries = []; | ||
} | ||
if (!index) { | ||
const { | ||
index: lastIndex, | ||
} = await this.getLastInfo(); | ||
// | ||
// Add some sugar and spice and everything nice. Oh, and also inheritance. | ||
// | ||
Log.extend = require('extendible'); | ||
index = lastIndex + 1; | ||
} | ||
/** | ||
* Commit a log entry | ||
* | ||
* @param {Object} data Data we receive from ourselves or from LEADER | ||
* @param {function} fn function | ||
* @api public | ||
*/ | ||
Log.prototype.commit = function commit(data, fn) { | ||
var entry = this.entry(data); | ||
const entry = { | ||
term: term, | ||
index, | ||
committed: false, | ||
responses: [{ | ||
address: this.node.address, // start with vote from leader | ||
ack: true | ||
}], | ||
command, | ||
} | ||
if (entry) this.append(entry); | ||
return setImmediate(fn.bind(null, null, !!entry)); | ||
}; | ||
await this.put(entry); | ||
return entry; | ||
} | ||
Log.prototype.append = function append(entry) { | ||
this.entries.push(entry); | ||
}; | ||
/** | ||
* put - Save entry to database using the index as the key | ||
* | ||
* @async | ||
* @param {Entry} entry entry to save | ||
* @return {Promise<void>} Resolves once entry is saved | ||
* @public | ||
*/ | ||
put (entry) { | ||
return this.db.put(entry.index, entry); | ||
} | ||
/** | ||
* Return the last entry (this may be async in the future) | ||
* | ||
* @returns {Object} | ||
* @api public | ||
*/ | ||
Log.prototype.last = function lastentry() { | ||
var last = this.entries[this.entries.length - 1]; | ||
if (last) return last; | ||
/** | ||
* getEntriesAfter - Get all the entries after a specific index | ||
* | ||
* @param {number} index Index that entries must be greater than | ||
* @return {Promise<Entry[]>} returns all entries | ||
* @public | ||
*/ | ||
getEntriesAfter(index) { | ||
const entries = []; | ||
return new Promise((resolve, reject) => { | ||
this.db.createReadStream({gt: index}) | ||
.on('data', data => { | ||
entries.push(data.value); | ||
}) | ||
.on('error', err => { | ||
reject(err) | ||
}) | ||
.on('end', () => { | ||
resolve(entries); | ||
}) | ||
}); | ||
return { | ||
index: this.startIndex, | ||
term: this.startTerm | ||
}; | ||
}; | ||
} | ||
/** | ||
* Create a log entry that we will append with correct form and attrs | ||
* | ||
* @param {object} Data to compute to a proper entry | ||
* @api public | ||
*/ | ||
Log.prototype.entry = function entry(data) { | ||
// | ||
// type of entry, (data/command, or something related to raft itself) | ||
// | ||
var type = data.type | ||
, command = data.command | ||
// | ||
// Remark: Hmm this may have to be async if we are fetching everything from a db, | ||
// lets just keep it in memory for now because we may just preload into cache | ||
// on startup? | ||
// | ||
, index = this.last().index + 1; | ||
// | ||
// Remark: How do we want to store function executions or particular actions | ||
// to be replayed in case necessary? | ||
// | ||
return { | ||
command: command, | ||
index: index, | ||
term: this.node.term, | ||
type: type | ||
}; | ||
}; | ||
/** | ||
* removeEntriesAfter - Removes all entries after a given index | ||
* | ||
* @async | ||
* @param {Number} index Index to use to find all entries after | ||
* @return {Promise<void>} Returns once all antries are removed | ||
* @public | ||
*/ | ||
async removeEntriesAfter (index) { | ||
const entries = await this.getEntriesAfter(index) | ||
return Promise.all(entries.map(entry => { | ||
return this.db.del(entry.index); | ||
})); | ||
} | ||
/** | ||
* The raft instance we're attached to is closing. | ||
* | ||
* @returns {Boolean} First time shutdown. | ||
* @api private | ||
*/ | ||
Log.prototype.end = function end() { | ||
return true; | ||
/** | ||
* has - Checks if entry exists at index | ||
* | ||
* @async | ||
* @param {number} index Index position to check if entry exists | ||
* @return {boolean} Boolean on whether entry exists at index | ||
* @public | ||
*/ | ||
async has (index) { | ||
try { | ||
const entry = await this.db.get(index); | ||
return true | ||
} catch (err) { | ||
return false; | ||
} | ||
} | ||
/** | ||
* get - Gets an entry at the specified index position | ||
* | ||
* @param {type} index Index position of entry | ||
* @return {Promise<Entry>} Promise of found entry returns NotFoundError if does not exist | ||
* @public | ||
*/ | ||
get (index) { | ||
return this.db.get(index); | ||
} | ||
/** | ||
* getLastInfo - Returns index, term of the last entry in the long along with | ||
* the committedIndex | ||
* | ||
* @async | ||
* @return {Promise<Object>} Last entries index, term and committedIndex | ||
*/ | ||
async getLastInfo () { | ||
const { index, term } = await this.getLastEntry(); | ||
return { | ||
index, | ||
term, | ||
committedIndex: this.committedIndex | ||
}; | ||
} | ||
/** | ||
* getLastEntry - Returns last entry in the log | ||
* | ||
* @return {Promise<Entry>} returns {index: 0, term: node.term} if there are no entries in the log | ||
*/ | ||
getLastEntry () { | ||
return new Promise((resolve, reject) => { | ||
let hasResolved = false; | ||
let entry = { | ||
index: 0, | ||
term: this.node.term | ||
}; | ||
this.db.createReadStream({reverse: true, limit: 1}) | ||
.on('data', data => { | ||
hasResolved = true; | ||
entry = data.value; | ||
}) | ||
.on('error', err => { | ||
hasResolved = true; | ||
reject(err) | ||
}) | ||
.on('end', () => { | ||
resolve(entry); | ||
}) | ||
}); | ||
} | ||
/** | ||
* getEntryInfoBefore - Gets the index and term of the previous entry along with the log's committedIndex | ||
* If there is no item before it returns {index: 0} | ||
* | ||
* | ||
* @async | ||
* @param {Entry} entry | ||
* @return {Promise<object>} {index, term, committedIndex} | ||
*/ | ||
async getEntryInfoBefore (entry) { | ||
const {index, term} = await this.getEntryBefore(entry); | ||
return { | ||
index, | ||
term, | ||
committedIndex: this.committedIndex | ||
}; | ||
} | ||
/** | ||
* getEntryBefore - Get entry before the specified entry | ||
* If there is no item before it returns {index: 0} | ||
* | ||
* @async | ||
* @param {Entry} entry | ||
* | ||
* @return {Promise<Entry>} | ||
*/ | ||
getEntryBefore (entry) { | ||
const defaultInfo = { | ||
index: 0, | ||
term: this.node.term | ||
}; | ||
// We know it is the first entry, so save the query time | ||
if (entry.index === 1) { | ||
return Promise.resolve(defaultInfo); | ||
} | ||
return new Promise((resolve, reject) => { | ||
let hasResolved = false; | ||
this.db.createReadStream({ | ||
reverse: true, | ||
limit: 1, | ||
lt: entry.index | ||
}) | ||
.on('data', (data) => { | ||
hasResolved = true; | ||
resolve(data.value); | ||
}) | ||
.on('error', (err) => { | ||
hasResolved = true; | ||
reject(err); | ||
}) | ||
.on('end', () => { | ||
if (!hasResolved) { | ||
// Returns empty index if there is no items | ||
// before entry or log is empty | ||
resolve(defaultInfo); | ||
} | ||
}); | ||
}); | ||
} | ||
/** | ||
* commandAck - acknowledges a follow with address has stored entry at index | ||
* This is used to determine if a quorom has been met for a log entry and | ||
* if enough followers have stored it so that it can be committed | ||
* | ||
* @async | ||
* @param {number} index Index of entry that follow has stored | ||
* @param {string} address Address of follower that has stored log | ||
* @return {Promise<Entry>} | ||
*/ | ||
async commandAck (index, address) { | ||
let entry; | ||
try { | ||
entry = await this.get(index); | ||
} catch (err) { | ||
return { | ||
responses: [] | ||
} | ||
} | ||
const entryIndex = await entry.responses.findIndex(resp => resp.address === address); | ||
// node hasn't voted yet. Add response | ||
if (entryIndex === -1) { | ||
entry.responses.push({ | ||
address, | ||
ack: true | ||
}); | ||
} | ||
await this.put(entry); | ||
return entry; | ||
} | ||
/** | ||
* commit - Set the entry to committed | ||
* | ||
* @async | ||
* @param {number} Index index | ||
* | ||
* @return {Promise<entry>} | ||
*/ | ||
async commit (index) { | ||
const entry = await this.db.get(index); | ||
entry.committed = true; | ||
this.committedIndex = entry.index; | ||
return this.put(entry); | ||
} | ||
/** | ||
* getUncommittedEntriesUpToIndex - Returns all entries before index that have not been committed yet | ||
* | ||
* @param {number} index Index value to find all entries up to | ||
* @return {Promise<Entry[]} | ||
* @private | ||
*/ | ||
getUncommittedEntriesUpToIndex (index) { | ||
return new Promise((resolve, reject) => { | ||
let hasResolved = false; | ||
const entries = []; | ||
this.db.createReadStream({ | ||
gt: this.committedIndex, | ||
lte: index | ||
}) | ||
.on('data', data => { | ||
if (!data.value.committed) { | ||
entries.push(data.value); | ||
} | ||
}) | ||
.on('error', err => { | ||
reject(err) | ||
}) | ||
.on('end', () => { | ||
resolve(entries); | ||
}); | ||
}); | ||
} | ||
/** | ||
* end - Log end | ||
* Called when the node is shutting down | ||
* | ||
* @return {boolean} Successful close. | ||
* @private | ||
*/ | ||
end () { | ||
return this.db.close(); | ||
} | ||
}; | ||
// | ||
// Expose the log module. | ||
// | ||
module.exports = Log; |
{ | ||
"name": "liferaft", | ||
"version": "0.1.1", | ||
"version": "1.0.0", | ||
"description": "Consensus protocol based on raft, it will one day save your live.", | ||
"main": "index.js", | ||
"browser": "./lib/", | ||
"scripts": { | ||
"100%": "istanbul check-coverage --statements 100 --functions 100 --lines 100 --branches 100", | ||
"test": "mocha test.js", | ||
"watch": "mocha --watch test.js", | ||
"coverage": "istanbul cover ./node_modules/.bin/_mocha -- test.js", | ||
"test-travis": "istanbul cover node_modules/.bin/_mocha --report lcovonly -- test.js" | ||
"test": "npm run test-memdown && npm run test-leveldown", | ||
"test-leveldown": "ADAPTER=leveldown mocha test/**.js", | ||
"test-memdown": "ADAPTER=memdown mocha test/**.js", | ||
"test-travis": "nyc --reporter=html --reporter=text npm test", | ||
"prepublish": "npm run build", | ||
"build": "babel ./index.js ./log.js -d ./lib" | ||
}, | ||
@@ -30,17 +32,32 @@ "repository": { | ||
"emits": "3.0.x", | ||
"eventemitter3": "1.1.x", | ||
"encoding-down": "^3.0.1", | ||
"eventemitter3": "2.0.x", | ||
"extendible": "0.1.x", | ||
"immediate": "3.2.x", | ||
"millisecond": "0.0.x", | ||
"leveldown": "^3.0.0", | ||
"levelup": "^2.0.1", | ||
"millisecond": "0.1.x", | ||
"modification": "1.0.x", | ||
"one-time": "0.0.x", | ||
"tick-tock": "0.1.x" | ||
"tick-tock": "1.0.x" | ||
}, | ||
"devDependencies": { | ||
"assume": "1.2.x", | ||
"diagnostics": "0.0.x", | ||
"istanbul": "0.3.x", | ||
"mocha": "2.2.x", | ||
"pre-commit": "1.0.x" | ||
"@babel/cli": "^7.0.0-beta.38", | ||
"@babel/core": "^7.0.0-beta.38", | ||
"@babel/preset-env": "^7.0.0-beta.38", | ||
"@babel/register": "^7.0.0-beta.38", | ||
"assume": "1.5.x", | ||
"diagnostics": "1.1.x", | ||
"memdown": "^1.4.1", | ||
"mkdirp": "^0.5.1", | ||
"mocha": "5.0.x", | ||
"nyc": "^11.3.0", | ||
"pre-commit": "1.2.x", | ||
"rimraf": "^2.6.2" | ||
}, | ||
"babel": { | ||
"presets": [ | ||
"@babel/preset-env" | ||
] | ||
} | ||
} |
@@ -5,3 +5,3 @@ # liferaft | ||
`liferaft` is an JavaScript implementation of the [Raft] consensus algorithm. | ||
`liferaft` is an JavaScript implementation of the [Raft] consensus algorithm. | ||
@@ -35,2 +35,4 @@ ## Installation | ||
- [LifeRaft#end()](#liferaftend) | ||
- [LifeRaft#command()](#liferaftcommand) | ||
- [Log Replication](#logreplication) | ||
- [Extending](#extending) | ||
@@ -80,3 +82,3 @@ - [Transports](#transports) | ||
- `Log`: An Log compatible constructor we which use for state and data | ||
replication. | ||
replication. | ||
@@ -92,3 +94,3 @@ The timeout values can be configured with either a number which represents the | ||
```js | ||
var raft = new Raft({ | ||
var raft = new Raft({ | ||
'address': 'tcp://localhost:8089', | ||
@@ -128,2 +130,3 @@ 'election min': '200 millisecond', | ||
`heartbeat` | The leader is about to send a heartbeat message. | ||
`commit` | A command has been saved to the majority of node's logs | ||
@@ -335,2 +338,19 @@ --- | ||
### LifeRaft#command(command) | ||
Save a json command to the log. The command will be added to the log and then | ||
replicated to all the follower nodes. Once the majority of nodes have received | ||
and stored the command. A `commit` event will be triggered so that the | ||
command can be used. | ||
```js | ||
raft.command({name: 'Jimi', surname: 'Hendrix'}); | ||
raft.on('commit', function (command) { | ||
console.log(command.name, command.surname); | ||
}); | ||
``` | ||
## Extending | ||
@@ -355,2 +375,21 @@ | ||
## Log Replication | ||
LifeRaft uses [Levelup](https://github.com/Level/levelup) for storing the log | ||
that is replicated to each node. Log replication is optional and so the log | ||
constructor needs to be included in the options when creating a raft instance. | ||
You can use any leveldown compatible database to store the log. | ||
LifeRaft will default to using leveldown. A unique path is required for | ||
each node's log. | ||
```js | ||
const Log = require('liferaft/log'); | ||
const raft = new Raft({ | ||
adapter: require('leveldown'), | ||
path: './db/log1' | ||
}); | ||
``` | ||
## Transports | ||
@@ -357,0 +396,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
215093
19
4502
0
492
11
12
3
4
+ Addedencoding-down@^3.0.1
+ Addedleveldown@^3.0.0
+ Addedlevelup@^2.0.1
+ Addedabstract-leveldown@3.0.04.0.3(transitive)
+ Addedansi-regex@2.1.1(transitive)
+ Addedaproba@1.2.0(transitive)
+ Addedare-we-there-yet@1.1.7(transitive)
+ Addedbindings@1.3.1(transitive)
+ Addedbl@1.2.3(transitive)
+ Addedbuffer-alloc@1.2.0(transitive)
+ Addedbuffer-alloc-unsafe@1.1.0(transitive)
+ Addedbuffer-fill@1.0.0(transitive)
+ Addedchownr@1.1.4(transitive)
+ Addedcode-point-at@1.1.0(transitive)
+ Addedconsole-control-strings@1.1.0(transitive)
+ Addedcore-util-is@1.0.3(transitive)
+ Addeddecompress-response@3.3.0(transitive)
+ Addeddeep-extend@0.6.0(transitive)
+ Addeddeferred-leveldown@3.0.0(transitive)
+ Addeddelegates@1.0.0(transitive)
+ Addeddetect-libc@1.0.3(transitive)
+ Addedencoding-down@3.0.1(transitive)
+ Addedend-of-stream@1.4.4(transitive)
+ Addederrno@0.1.8(transitive)
+ Addedeventemitter3@2.0.3(transitive)
+ Addedexpand-template@1.1.1(transitive)
+ Addedfast-future@1.0.2(transitive)
+ Addedfs-constants@1.0.0(transitive)
+ Addedgauge@2.7.4(transitive)
+ Addedgithub-from-package@0.0.0(transitive)
+ Addedhas-unicode@2.0.1(transitive)
+ Addedinherits@2.0.4(transitive)
+ Addedini@1.3.8(transitive)
+ Addedis-fullwidth-code-point@1.0.0(transitive)
+ Addedisarray@1.0.0(transitive)
+ Addedlevel-codec@8.0.0(transitive)
+ Addedlevel-errors@1.1.2(transitive)
+ Addedlevel-iterator-stream@2.0.3(transitive)
+ Addedleveldown@3.0.2(transitive)
+ Addedlevelup@2.0.2(transitive)
+ Addedmimic-response@1.0.1(transitive)
+ Addedminimist@1.2.8(transitive)
+ Addedmkdirp@0.5.6(transitive)
+ Addednan@2.10.0(transitive)
+ Addednode-abi@2.30.1(transitive)
+ Addednoop-logger@0.1.1(transitive)
+ Addednpmlog@4.1.2(transitive)
+ Addednumber-is-nan@1.0.1(transitive)
+ Addedobject-assign@4.1.1(transitive)
+ Addedonce@1.4.0(transitive)
+ Addedos-homedir@1.0.2(transitive)
+ Addedprebuild-install@4.0.0(transitive)
+ Addedprocess-nextick-args@2.0.1(transitive)
+ Addedprr@1.0.1(transitive)
+ Addedpump@1.0.32.0.1(transitive)
+ Addedrc@1.2.8(transitive)
+ Addedreadable-stream@2.3.8(transitive)
+ Addedsafe-buffer@5.1.2(transitive)
+ Addedsemver@5.7.2(transitive)
+ Addedset-blocking@2.0.0(transitive)
+ Addedsignal-exit@3.0.7(transitive)
+ Addedsimple-concat@1.0.1(transitive)
+ Addedsimple-get@2.8.2(transitive)
+ Addedstring-width@1.0.2(transitive)
+ Addedstring_decoder@1.1.1(transitive)
+ Addedstrip-ansi@3.0.1(transitive)
+ Addedstrip-json-comments@2.0.1(transitive)
+ Addedtar-fs@1.16.3(transitive)
+ Addedtar-stream@1.6.2(transitive)
+ Addedtick-tock@1.0.0(transitive)
+ Addedto-buffer@1.1.1(transitive)
+ Addedtunnel-agent@0.6.0(transitive)
+ Addedutil-deprecate@1.0.2(transitive)
+ Addedwhich-pm-runs@1.1.0(transitive)
+ Addedwide-align@1.1.5(transitive)
+ Addedwrappy@1.0.2(transitive)
+ Addedxtend@4.0.2(transitive)
- Removedeventemitter3@1.1.1(transitive)
- Removedmillisecond@0.0.1(transitive)
- Removedtick-tock@0.1.6(transitive)
Updatedeventemitter3@2.0.x
Updatedmillisecond@0.1.x
Updatedtick-tock@1.0.x