Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

liferaft

Package Overview
Dependencies
Maintainers
1
Versions
6
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

liferaft - npm Package Compare versions

Comparing version 0.0.0 to 0.0.1

.travis.yml

20

append.js
'use strict';
/**
*
* @constructor
* @param {LifeRaft} liferaft Liferaft instance.
* @param {Object} options Optional configuration
*/
function Append(liferaft, options) {
if (!(this instanceof Append)) return new Append(liferaft, options);
options = options || {};
this.raft = liferaft;
this.liferaft = liferaft;
}
Append.prototype.entry = function entry() {
return {
term: this.raft.term;
};
};
//
// Expose the module.
//
module.exports = Append;

@@ -7,2 +7,13 @@ 'use strict';

/**
* Proper type checking.
*
* @param {Mixed} of Thing we want to know the type of.
* @returns {String} The type.
* @api private
*/
function type(of) {
return Object.prototype.toString.call(of).slice(8, -1).toLowerCase();
}
/**
* Generate a somewhat unique UUID.

@@ -26,2 +37,10 @@ *

/**
* A nope function for when people don't want message acknowledgements. Because
* they don't care about CAP.
*
* @api private
*/
function nope() {}
/**
* Representation of a single node in the cluster.

@@ -31,3 +50,3 @@ *

*
* - `id`: An unique id of this given node.
* - `name`: An unique id of this given node.
* - `heartbeat min`: Minimum heartbeat timeout.

@@ -39,11 +58,10 @@ * - `heartbeat max`: Maximum heartbeat timeout.

* timeout.
* - `Log`: A Log constructor that should be used to store commit logs.
*
* @constructor
* @param {Function} read Method will be called to receive a callback.
* @param {Function} write Called when the node needs to communicate.
* @param {Object} options Node configuration.
* @api public
*/
function Node(read, write, options) {
if (!(this instanceof Node)) return new Node(read, write, options);
function Node(options) {
if (!(this instanceof Node)) return new Node(options);

@@ -58,36 +76,46 @@ options = options || {};

this.beat = {
min: Tick.parse(options['heartbeat min'] || '150 ms'),
max: Tick.parse(options['heartbeat max'] || '300 ms')
min: Tick.parse(options['heartbeat min'] || this.election.min),
max: Tick.parse(options['heartbeat max'] || this.election.max)
};
this.votes = {
for: null,
granted: 0
for: null, // Who did we vote for in this current term.
granted: 0 // How many votes we're granted to us.
};
this.write = this.write || options.write || null;
this.threshold = options.threshold || 0.8;
this.name = options.name || UUID();
this.timers = new Tick();
this._write = write;
this.timers = new Tick(this);
this.Log = options.Log;
this.log = null;
this.nodes = [];
//
// 5.2: When a server starts, it's always started as Follower and it will
// remain in this state until receive a message from a Leader or Candidate.
// Raft §5.2:
//
// When a server starts, it's always started as Follower and it will remain in
// this state until receive a message from a Leader or Candidate.
//
this.state = Node.FOLLOWER; // Our current state.
this.leader = null; // Leader in our cluster.
this.leader = ''; // Leader in our cluster.
this.term = 0; // Our current term.
this.initialize();
read(this.emits('RPC'));
this.initialize(options);
}
//
// Add some sugar and spice and everything nice. Oh, and also inheritance.
//
Node.extend = require('extendible');
Node.prototype = new EventEmitter();
Node.prototype.emits = require('emits');
Node.prototype.constructor = Node;
/**
* The different states that a node can have.
* Raft §5.1:
*
* A Node can be in only one of the various states. The stopped state is not
* something that is part of the Raft protocol but something we might want to
* use internally while we're starting or shutting down our node.
*
* @type {Number}

@@ -102,11 +130,14 @@ * @private

/**
* Initialize the node.
* Initialize the node and start listening to the various of events we're
* emitting as we're quite chatty to provide the maximum amount of flexibility
* and reconfigurability.
*
* @param {Object} options The configuration you passed in the constructor.
* @api private
*/
Node.prototype.initialize = function initialize() {
Node.prototype.initialize = function initialize(options) {
//
// Reset our vote as we're starting a new term. Votes only last one term.
//
this.on('term change', function change() {
//
// Reset our vote as we're starting a new term. Votes only last one term.
//
this.votes.for = null;

@@ -116,32 +147,65 @@ this.votes.granted = 0;

this.on('RPC', function incoming(data) {
if ('object' !== typeof data) return; /* Invalid data structure, G.T.F.O. */
//
// Reset our times and start the heartbeat again. If we're promoted to leader
// the heartbeat will automatically be broadcasted to users as well.
//
this.on('state change', function change(currently, previously) {
this.timers.clear();
this.heartbeat();
});
//
// Receive incoming messages and process them.
//
this.on('data', function incoming(packet, write) {
write = write || nope;
if ('object' !== type(packet)) {
return write(new Error('Invalid packet received'));
}
//
// We're waiting for votes to come in as we're promoted to a candidate but
// a new node with a leadership role just send us a message. If his term is
// greater then ours we will step down as candidate and acknowledge their
// leadership.
// Raft §5.1:
//
if (
Node.CANDIDATE === this.state
&& Node.LEADER === this.state
) {
if (data.term >= this.term) {
this.change({
state: Node.FOLLOWER,
term: data.term
});
}
else return; /* We need to ignore the RPC as it's in an incorrect state */
// Applies to all states. If a response contains a higher term then our
// current term need to change our state to FOLLOWER and set the received
// term.
//
// If the node receives a request with a stale term number it should be
// rejected.
//
if (packet.term > this.term) {
this.change({
leader: packet.leader,
state: Node.FOLLOWER,
term: packet.term
});
} else if (packet.term < this.term) {
return write(new Error('Stale term detected, we are at '+ this.term));
}
switch (data.type) {
case 'heartbeat':
if (Node.LEADER === data.state) {
this.heartbeat(data.data);
}
break;
//
// Raft §5.2:
//
// If we receive a message from someone who claims to be leader and shares
// our same term while we're in candidate mode we will recognize their
// leadership and return as follower
//
if (Node.LEADER === packet.state && Node.FOLLOWER !== this.state) {
this.change({ state: Node.FOLLOWER, leader: packet.leader });
}
//
// Always when we receive an message from the Leader we need to reset our
// heartbeat.
//
if (Node.LEADER === packet.state) {
this.heartbeat();
}
switch (packet.type) {
//
// Raft §5.2:
// Raft §5.4:
//
// A node asked us to vote on them. We can only vote to them if they

@@ -151,2 +215,34 @@ // represent a higher term (and last log term, last log index).

case 'vote':
//
// The term of the vote is bigger then ours so we need to update it. If
// it's the same and we already voted, we need to deny the vote.
//
if (this.votes.for && this.votes.for !== packet.name) {
this.emit('vote', packet, false);
return write(undefined, this.packet('vote', { granted: false }));
}
//
// If we maintain a log, check if the candidates log is as up to date as
// ours.
//
// @TODO point to index of last commit entry.
// @TODO point to term of last commit entry.
//
if (this.log && packet.last && (
this.log.index > packet.last.index
|| this.term > packet.last.term
)) {
this.emit('vote', packet, false);
return write(undefined, this.packet('vote', { granted: false }));
}
//
// We've made our decision, we haven't voted for this term yet and this
// candidate came in first so it gets our vote as all requirements are
// met.
//
this.votes.for = packet.name;
this.emit('vote', packet, true);
write(undefined, this.packet('vote', { granted: true }));
break;

@@ -159,20 +255,98 @@

//
// Only accepts votes while we're still in a CANDIDATE state.
//
if (Node.CANDIDATE !== this.state) {
return write(new Error('No longer a candidate'));
}
//
// Increment our received votes when our voting request has been
// granted by the node that received the data.
//
if (packet.data.granted) this.votes.granted++;
//
// Check if we've received the minimal amount of votes required for this
// current voting round to be considered valid
//
if (this.votes.granted === (this.nodes.length / 2) + 1) {
this.change({
leader: this.name,
state: Node.LEADER
});
if (this.quorum(this.votes.granted)) {
this.change({ leader: this.name, state: Node.LEADER });
}
//
// Empty write, nothing to do.
//
write();
break;
case 'rpc':
//
// Remark: Are we assuming we are getting an appendEntries from the
// leader and comparing and appending our log?
//
case 'append':
break;
//
// Remark: So does this get emit when we need to write our OWN log?
//
case 'log':
break;
//
// Unknown event, we have no idea how to process this so we're going to
// return an error.
//
default:
write(new Error('Unknown message type: '+ packet.type));
}
});
//
// We do not need to execute the functionality below if we have a write method
// assigned to our selfs. This prevents us from timing out and other nasty
// stuff.
//
if (this.write) return;
//
// Setup the log & appends. Assume that if we're given a function log that it
// needs to be initialized as it requires access to our node instance so it
// can read our information like our leader, state, term etc.
//
if ('function' === type(this.Log)) {
this.log = new this.Log(this, options);
}
//
// The node is now listening to events so we can start our heartbeat timeout.
// So that if we don't hear anything from a leader we can promote our selfs to
// a candidate state.
//
this.heartbeat();
};
/**
* Check if we've reached our quorum (a.k.a. minimum amount of votes requires
* for a voting round to be considered valid) for the given amount of votes.
*
* @param {Number} responses Amount of responses received.
* @returns {Boolean}
* @api private
*/
Node.prototype.quorum = function quorum(responses) {
if (!this.nodes.length || !responses) return false;
return responses >= this.majority();
};
/**
* The majority required to reach our the quorum.
*
* @returns {Number}
* @api private
*/
Node.prototype.majority = function majority() {
return Math.ceil(this.nodes.length / 2) + 1;
};
/**
* Process a change in the node.

@@ -186,8 +360,14 @@ *

var changes = ['term', 'leader', 'state']
, currently, previously
, i = 0;
if (!changed) return this;
for (; i < changes.length; i++) {
if (changes[i] in changed && changed[changes[i]] !== this[changes[i]]) {
this[changes[i]] = changed[changes[i]];
this.emit(changes[i] +' change');
currently = changed[changes[i]];
previously = this[changes[i]];
this[changes[i]] = currently;
this.emit(changes[i] +' change', currently, previously);
}

@@ -206,6 +386,6 @@ }

* @returns {Node}
* @api public
* @api private
*/
Node.prototype.heartbeat = function heartbeat(duration) {
duration = duration || this.timeout('heartbeat');
duration = duration || this.timeout('beat');

@@ -218,8 +398,10 @@ if (this.timers.active('heartbeat')) {

this.timers.setTimeout('heartbeat', function () {
if (Node.LEADER !== this.state) return this.promote();
if (Node.LEADER !== this.state) {
this.emit('heartbeat timeout');
return this.promote();
}
//
// We're the LEADER so we should be broadcasting.
// @TODO We're the LEADER so we should be broadcasting.
//
this.broadcast('heartbeat');
}, duration);

@@ -235,23 +417,21 @@

* @returns {Number}
* @api public
* @api private
*/
Node.prototype.timeout = function timeout(which) {
var times = this[which] || this.beat;
var times = this[which];
return Math.min(
Math.round((Math.random() * 1) * times.min),
times.max
);
return Math.floor(Math.random() * (times.max - times.min + 1) + times.min);
};
/**
* Node detected a failure in the cluster and wishes to be promoted to new
* master and promotes it self to candidate.
* Raft §5.2:
*
* We've detected a timeout from the leaders heartbeats and need to start a new
* election for leadership. We increment our current term, set the CANDIDATE
* state, vote our selfs and ask all others nodes to vote for us.
*
* @returns {Node}
* @api public
* @api private
*/
Node.prototype.promote = function promote() {
if (Node.CANDIDATE === this.state) return this;
this.change({

@@ -269,4 +449,23 @@ state: Node.CANDIDATE, // We're now a candidate,

this.votes.granted = 1;
this.broadcast('vote');
//
// Broadcast the voting request to all connected nodes in your private
// cluster.
//
var packet = this.packet('vote')
, i = 0;
for (; i < this.nodes.length; i++) {
this.nodes[i].write(packet);
}
//
// Set the election timeout. This gives the nodes some time to reach
// consensuses about who they want to vote for. If no consensus has been
// reached within the set timeout we will attempt it again.
//
this.timers
.clear() // Clear all old timers, this one is the most important now.
.setTimeout('election', this.promote, this.timeout('election'));
return this;

@@ -276,41 +475,113 @@ };

/**
* Write out a message.
* Wrap the outgoing messages in an object with additional required data.
*
* @param {String} type Message type we're trying to send.
* @param {Mixed} data Data to be transfered.
* @returns {Boolean} Successful write.
* @returns {Object} Packet.
* @api private
*/
Node.prototype.packet = function wrap(type, data) {
var packet = {
state: this.state, // So you know if we're a leader, candidate or follower.
term: this.term, // Our current term so we can find mis matches.
name: this.name, // Name of the sender.
data: data, // Custom data we send.
type: type, // Message type.
leader: this.leader, // Who is our leader.
};
//
// If we have logging and state replication enabled we also need to send this
// additional data so we can use it determine the state of this node.
//
// @TODO point to index of last commit entry.
// @TODO point to term of last commit entry.
//
if (this.log) packet.last = { term: this.term, index: this.log.index };
return packet;
};
/**
* Create a clone of the current instance with the same configuration. Ideally
* for creating connected nodes in a cluster.. And let that be something we're
* planning on doing.
*
* @param {Object} options Configuration that should override the default config.
* @returns {Node} The newly created instance.
* @api public
*/
Node.prototype.write = function write(type, data) {
return this._write(this.packet(type, data));
Node.prototype.clone = function clone(options) {
options = options || {};
var node = {
'election min': this.election.min,
'election max': this.election.max,
'heartbeat min': this.beat.min,
'heartbeat max': this.beat.max,
'threshold': this.threshold,
'Log': this.Log
}, key;
for (key in node) {
if (key in options || !node.hasOwnProperty(key)) continue;
options[key] = node[key];
}
return new this.constructor(options);
};
/**
* Broadcast a message.
* A new node is about to join the cluster. So we need to upgrade the
* configuration of every single node.
*
* @param {String} type Message type we're trying to send.
* @param {Mixed} data Data to be transfered.
* @returns {Boolean} Successful write.
* @param {String} name The name of the node that is connected.
* @param {Function} write A method that we use to write data.
* @returns {Node} The node we created and that joined our cluster.
* @api public
*/
Node.prototype.broadcast = function broadcast(type, data) {
var packet = this.packet(type, data);
Node.prototype.join = function join(name, write) {
if ('function' === type(name)) {
write = name; name = null;
}
var node = this.clone({
write: write, // Function that receives our writes.
name: name // A custom name for the node we added.
});
node.once('end', function end() {
this.leave(node);
}, this);
this.nodes.push(node);
this.emit('join', node);
return node;
};
/**
* Wrap the outgoing messages in an object with additional required data.
* Remove a node from the cluster.
*
* @param {String} type Message type we're trying to send.
* @param {Mixed} data Data to be transfered.
* @returns {Object} Packet.
* @api private
* @returns {Node} The node that we removed.
* @api public
*/
Node.prototype.packet = function packet(type, data) {
return {
state: this.state, // So you know if we're a leader, candidate or follower
term: this.term, // Our current term so we can find mis matches
name: this.name, // Name of the sender.
data: data, // Custom data we send.
type: type // Message type.
};
Node.prototype.leave = function leave(name) {
var index = -1
, node;
for (var i = 0; i < this.nodes.length; i++) {
if (this.nodes[i] === name || this.nodes[i].name === name) {
node = this.nodes[i];
index = i;
break;
}
}
if (~index && node) {
if (node.end) node.end();
this.nodes.splice(index, 1);
this.emit('leave', node);
}
return node;
};

@@ -325,8 +596,15 @@

Node.prototype.end = function end() {
if (!this.state) return false;
if (Node.STOPPED === this.state) return false;
this.state = Node.STOPPED;
if (this.nodes.length) for (var i = 0; i < this.nodes.length; i++) {
this.leave(this.nodes[i]);
}
this.emit('end');
this.timers.end();
this.removeAllListeners();
this.timers = this.state = this.write = this.read = null;
if (this.log) this.log.end();
this.timers = this.log = this.Log = this.beat = this.election = null;

@@ -333,0 +611,0 @@ return true;

{
"name": "liferaft",
"version": "0.0.0",
"version": "0.0.1",
"description": "Consensus protocol based on raft, but only for saving lifes.",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
"test": "mocha --reporter spec --ui bdd test.js",
"watch": "mocha --watch --reporter spec --ui bdd test.js",
"coverage": "istanbul cover ./node_modules/.bin/_mocha -- --reporter spec --ui bdd test.js",
"test-travis": "istanbul cover node_modules/.bin/_mocha --report lcovonly -- --reporter spec --ui bdd test.js"
},

@@ -25,4 +28,6 @@ "repository": {

"dependencies": {
"emits": "1.0.x",
"eventemitter3": "0.1.x"
"eventemitter3": "0.1.x",
"extendible": "0.1.x",
"immediate": "3.0.x",
"tick-tock": "0.0.x"
},

@@ -29,0 +34,0 @@ "devDependencies": {

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc