Socket
Socket
Sign inDemoInstall

r5n

Package Overview
Dependencies
10
Maintainers
2
Versions
10
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.2.6 to 0.2.8

0

.jscs.json

@@ -0,0 +0,0 @@ {

22

gulpfile.js

@@ -8,3 +8,3 @@ 'use strict';

var semver = require('semver');
var execSync = require('exec-sync');
var execSync = require('execSync');

@@ -29,18 +29,18 @@ var packageFilename = './package.json';

}
execSync('git checkout develop', true);
execSync.run('git checkout develop', true);
var pkg = require(packageFilename);
pkg.version = semver.inc(pkg.version, type);
fs.writeFileSync(packageFilename, JSON.stringify(pkg, null, 2));
execSync('git commit -a -m "Bumped version"', true);
execSync('git push origin develop', true);
execSync('git checkout master', true);
execSync('git merge --no-ff develop -m "Merged develop into master"', true);
execSync('git push origin master', true);
execSync('git tag v' + pkg.version, true);
execSync('git push origin --tags', true);
var publishRes = execSync('npm publish', true);
execSync.run('git commit -a -m "Bumped version"', true);
execSync.run('git push origin develop', true);
execSync.run('git checkout master', true);
execSync.run('git merge --no-ff develop -m "Merged develop into master"', true);
execSync.run('git push origin master', true);
execSync.run('git tag v' + pkg.version, true);
execSync.run('git push origin --tags', true);
var publishRes = execSync.run('npm publish', true);
if (publishRes.stderr) {
console.error(publishRes.stderr);
}
execSync('git checkout develop', true);
execSync.run('git checkout develop', true);
}

@@ -47,0 +47,0 @@ gulp.task('bump-patch', function() {

@@ -0,0 +0,0 @@ /**

var OS = require('os');
var deferred = require('deferred');
var crypto = require('crypto');
var AsyncTask = require('./async_task');
var AsyncTask = require('./async_task').AsyncTask;
var Base64Encode = require('./encrypt').Base64Encode;
var Globals = require('./globals');
var BaseKeyValidator = require('./key_validator');
var MemCache = require('./mem_cache');
var PingRequest = require('./ping_request');
var RoutingRequest = require('./routing_request');
var RoutingTable = require('./routing_table');
var RPC = require('./rpc');
var UDPSocket = require('./sockets');
var Globals = require('./globals').Globals;
var BaseKeyValidator = require('./key_validator').BaseKeyValidator;
var MemCache = require('./mem_cache').MemCache;
var PingRequest = require('./ping_request').PingRequest;
var RoutingRequest = require('./routing_request').RoutingRequest;
var RoutingTable = require('./routing_table').RoutingTable;
var RPC = require('./rpc').RPC;
var UDPSocket = require('./sockets').UDPSocket;
var DistanceUtil = require('./utils').DistanceUtil;
var RouteEntry = require(__dirname+'/route_entry');
var logger = require('avlogger').make('R5N/dht');
var RouteEntry = require(__dirname+'/route_entry').RouteEntry;
/**
* The DHT instance. This class is responsible for booting the Node and general
* Node operations.
*
* The DHT instance. This class is responsible for booting the Node and general Node operations
* @param options required: IP, port. optional: key_validator_class
**/
var DHT = function(options) {
if (!options.IP || !options.port) {
if(!options.IP || !options.port) {
throw 'Address and port must be specified for the DHT.';

@@ -30,3 +27,4 @@ }

this.key_validator_class = BaseKeyValidator;
} else {
}
else {
this.key_validator_class = options.key_validator_class;

@@ -45,3 +43,3 @@ }

// List of active RoutingRequest's
// Needs to be a list and not a map because a given node can be
//Needs to be a list and not a map because a given node can be
// visited twice in a FIND_VALUE request so we're not guaranteed a

@@ -61,8 +59,8 @@ // 1 to 1 matchup

DHT.prototype.start = function() {
var d = deferred();
var self = this;
var d = new deferred();
var _self = this;
var setId = function() {
crypto.randomBytes(Globals.KeyProps.DEFAULT_KEY_LENGTH, function(ex, buf) {
self.id = buf.toString('hex');
DHT.prototype._init.call(self, d);
_self.id = buf.toString('hex');
DHT.prototype._init.call(_self, d);
});

@@ -83,7 +81,7 @@ };

DHT.prototype._stopJobs = function() {
if (this.asyncTasks.length === 0) {
if(this.asyncTasks.length == 0) {
return;
}
for (var i = 0; i < this.asyncTasks.length; i++) {
for(var i = 0; i < this.asyncTasks.length; i++) {
var task = this.asyncTasks[i];

@@ -96,13 +94,10 @@ task.stop();

DHT.prototype._init = function(def) {
var self = this;
var _self = this;
this.socket.connect()
.then(function() {
logger.debug('UDP socket for ID %s started on port %d',
self.id, self.socket.port
);
self.socket.onMessage(DHT.prototype.onSocketMessage.bind(self));
.then(function(res) {
console.log('UDP socket for ID %s started on port %d', _self.id, _self.socket.port);
_self.socket.onMessage(DHT.prototype.onSocketMessage.bind(_self));
def.resolve({});
})
.done();
});

@@ -117,7 +112,5 @@ // Add ping request clear job

// Add replication job
var replicationTask = new AsyncTask(
'replication_all',
var replicationTask = new AsyncTask('replication_all',
DHT.prototype._replicateAll.bind(this),
Globals.AsyncTasks.REPLICATION_INTERVAL
);
Globals.AsyncTasks.REPLICATION_INTERVAL);
this.asyncTasks.push(replicationTask);

@@ -127,7 +120,5 @@ replicationTask.start();

// Add data cleanup job
var dataCleanupTask = new AsyncTask(
'data_cleanup',
var dataCleanupTask = new AsyncTask('data_cleanup',
DHT.prototype._clearOldData.bind(this),
Globals.AsyncTasks.DATA_CLEANUP_INTERVAL
);
Globals.AsyncTasks.DATA_CLEANUP_INTERVAL);
this.asyncTasks.push(dataCleanupTask);

@@ -146,8 +137,8 @@ dataCleanupTask.start();

/**
* Clears any stale entries from the PingRequest queue and routing table
**/
* Clears any stale entries from the PingRequest queue and routing table
**/
DHT.prototype._checkPingRequestQueue = function() {
var self = this;
var _self = this;
if (this.pingRequests.length === 0) {
if(this.pingRequests.length == 0) {
return;

@@ -161,3 +152,3 @@ }

this._removePingRequest(currReq.address, currReq.port);
if (!currReq.resolved) {
if(!currReq.resolved) {
unResolved.push(currReq);

@@ -168,6 +159,6 @@ }

if (unResolved.length > 0) {
logger.info('Clearing routing table %d: %j', this.port, unResolved);
if(unResolved.length > 0) {
console.log('Clearing routing table for %d: %j', this.port, unResolved);
unResolved.forEach(function(pingReq) {
self.routingTable.removeRoute(pingReq.address, pingReq.port);
_self.routingTable.removeRoute(pingReq.address, pingReq.port);
});

@@ -185,3 +176,3 @@ }

DHT.prototype._pingRequestExists = function(addr, port) {
if (this.pingRequests.length === 0) {
if(this.pingRequests.length == 0) {
return false;

@@ -192,3 +183,3 @@ }

this.pingRequests.forEach(function(pr) {
if (pr.address === addr && pr.port === port) {
if(pr.address === addr && pr.port === port) {
pingInQueue = true;

@@ -210,3 +201,3 @@ return false;

DHT.prototype._removePingRequest = function(addr, port) {
if (this.pingRequests.length === 0) {
if(this.pingRequests.length == 0) {
return;

@@ -228,5 +219,5 @@ }

DHT.prototype._clearRouteRequest = function(req) {
for (var i = this.routingRequests.length - 1; i >= 0; i--) {
for(var i = this.routingRequests.length - 1; i >= 0; i--) {
var currReq = this.routingRequests[i];
if (currReq.rpcId === req.rpcId) {
if(currReq.rpcId === req.rpcId) {
return this.routingRequests.splice(i, 1);

@@ -244,9 +235,10 @@ }

DHT.prototype._checkForStaleNodes = function() {
var self = this;
var _self = this;
var allRoutes = this.routingTable.getAllRoutes();
allRoutes.forEach(function(route) {
self.sendPing(route.address, route.port);
_self.sendPing(route.address, route.port);
});
};
DHT.prototype._getNetworkInterfaces = function(first_argument) {

@@ -256,14 +248,9 @@ var ifaceList = [];

for (var dev in ifaces) {
if (ifaces.hasOwnProperty(dev)) {
var alias = 0;
ifaces[dev].forEach(function(details){
if (details.family === 'IPv4') {
ifaceList.push({
name: dev + (alias ? ':' + alias : ''),
address: details.address
});
++alias;
}
});
}
var alias=0;
ifaces[dev].forEach(function(details){
if (details.family=='IPv4') {
ifaceList.push( { name: dev + ( alias ? ':' + alias : '' ), address: details.address } );
++alias;
}
});
}

@@ -289,7 +276,9 @@ };

/**
* Resolves a PING request
**/
* Resolves a PING request
**/
DHT.prototype._resolvePing = function(addr, port) {
var _self = this;
this.pingRequests.forEach(function(req) {
if (req.address === addr && req.port === port) {
if (req.address == addr && req.port == port) {
req.resolved = true;

@@ -303,4 +292,4 @@ return false;

/**
* Bootstraps this DHT off of another. Works by sending FIND_NODE to the given DHT
**/
* Bootstraps this DHT off of another. Works by sending FIND_NODE to the given DHT
**/
DHT.prototype.bootstrap = function(address, port) {

@@ -313,10 +302,9 @@ return this.sendFindNode(address, port);

};
/**
* Finds a routing request with the given node ID
**/
* Finds a routing request with the given node ID
**/
DHT.prototype._getRouteRequest = function(rpcId) {
for (var i = this.routingRequests.length - 1; i >= 0; i--) {
for(var i = this.routingRequests.length - 1; i >= 0; i--) {
var currReq = this.routingRequests[i];
if (currReq.rpcId === rpcId) {
if(currReq.rpcId === rpcId) {
return currReq;

@@ -329,9 +317,9 @@ }

/**
* Adds a new deferred object to the RPC map
**/
* Adds a new deferred object to the RPC map
**/
DHT.prototype._createDeferredForRequest = function(rpcId) {
if (this.rpcToDefferedMap[rpcId]) {
if(this.rpcToDefferedMap[rpcId]) {
return this.rpcToDefferedMap[rpcId];
} else {
var def = deferred();
var def = new deferred();
this.rpcToDefferedMap[rpcId] = def;

@@ -343,4 +331,4 @@ return def;

/**
* Retrieves the deferred object for the given rpc ID
**/
* Retrieves the deferred object for the given rpc ID
**/
DHT.prototype._getDeferredForRequest = function(rpcId) {

@@ -357,12 +345,7 @@ return this.rpcToDefferedMap[rpcId];

DHT.prototype._getRandomUniqueRoute = function(rpc) {
var randomRoutes = this.routingTable.getRandomRoutes(
Globals.RecursiveRouting.MAX_RANDOM_NODES
);
var randomRoutes = this.routingTable.getRandomRoutes(Globals.RecursiveRouting.MAX_RANDOM_NODES);
var closest = null;
// We don't want to send back to the initiating node
for (var i = 0; i < randomRoutes.length; i++) {
//if (randomRoutes[i].port !== rpc.fromPort &&
// randomRoutes[i].address !== address)
//{
if (randomRoutes[i].port !== rpc.fromPort) {
for(var i = 0; i < randomRoutes.length; i++) {
if(randomRoutes[i].port != rpc.fromPort /*&& randomRoutes[i].address != address*/) {
closest = randomRoutes[i];

@@ -379,3 +362,3 @@ break;

// Here we send out a number of requests equal to the replication level
for (var i = 0; i < routes.length && i < Globals.Network.getReplicationLevel(); i++) {
for(var i = 0; i < routes.length && i < Globals.Network.getReplicationLevel(); i++) {
var random = routes[i];

@@ -394,3 +377,3 @@ var rpc = new RPC();

// Add to deferred list for replication only
this._createDeferredForRequest(rpc.id);
var def = this._createDeferredForRequest(rpc.id);

@@ -401,2 +384,25 @@ this.sendStore(rpc, random.address, random.port, 1);

DHT.prototype._replicateBulk = function(peerDataMap) {
for(nodeId in peerDataMap) {
var route = this.routingTable.findByNodeId(nodeId);
var rpc = new RPC();
rpc.type = 'STORE';
rpc.address = route.address;
rpc.port = route.port;
// Set replication to true so that peers do not try to
// re-play replication at point of storage
rpc.set('replication', true);
rpc.set('data', peerDataMap[nodeId]);
rpc.set('initiator_id', this.id);
// Add to deferred list for replication only
var def = this._createDeferredForRequest(rpc.id);
this.sendStore(rpc, route.address, route.port, 1);
}
};
/**

@@ -409,9 +415,27 @@ * Replicates all values to closest peers

var allData = this.cache.all();
if (allData.length === 0) {
if(allData.length == 0) {
return;
}
logger.silly('Has data: %j', this.port, allData);
//console.log('Port %d has data: %j', this.port, allData);
var peerDataMap = {};
var addToMap = function(nodeId, data) {
if(peerDataMap[nodeId]) {
peerDataMap[nodeId].push(data);
} else {
peerDataMap[nodeId] = [data];
}
}
allData.forEach(function(keyValue) {
self._replicate(keyValue.key, keyValue.value);
//_self._replicate(keyValue.key, keyValue.value);
var routes = self.routingTable.getClosestPeers(keyValue.key);
for(var i = 0; i < routes.length && i < Globals.Network.getReplicationLevel(); i++) {
addToMap(routes[i].nodeId, keyValue);
}
});
self._replicateBulk(peerDataMap);
};

@@ -425,10 +449,10 @@

var allRows = this.cache.allAsValues();
if (allRows.length === 0) {
if(allRows.length == 0) {
return;
}
for (var i = 0; i < allRows.length; i++) {
for(var i = 0; i < allRows.length; i++) {
var currVal = allRows[i].value;
if (currVal.expired(Globals.ValueProps.TTL)) {
logger.debug('Removing %j', allRows[i]);
if(currVal.expired(Globals.ValueProps.TTL)) {
console.log('Removing %j at %d', allRows[i], this.port);
this.cache.removeValue(allRows[i].key);

@@ -439,5 +463,6 @@ }

/******** Message Senders **********/
DHT.prototype.send = function(rpc) {
if (!rpc.address || !rpc.port) {
if(!rpc.address || !rpc.port) {
throw 'Invalid RPC request. Address and Port must be specified';

@@ -451,4 +476,4 @@ }

/**
* Send ANNOUNCE message with closest peers
**/
* Send ANNOUNCE message with closest peers
**/
DHT.prototype.sendAnnounce = function(rpc) {

@@ -459,3 +484,3 @@ var closest = this.routingTable.getClosestPeers(rpc);

// than the global constant K then add self
if (closest.length === 0 || closest.length < Globals.Const.K) {
if(closest.length === 0 || closest.length < Globals.Const.K) {
closest.push(new RouteEntry(this.address, this.port, this.id));

@@ -472,5 +497,5 @@ }

/**
* In a PING call, this DHT is pinging another and expecting a return message of PONG
* This can be used to check for stale nodes
**/
* In a PING call, this DHT is pinging another and expecting a return message of PONG
* This can be used to check for stale nodes
**/
DHT.prototype.sendPing = function(addr, port) {

@@ -483,3 +508,3 @@ var rpc = new RPC();

if (!this._pingRequestExists(addr, port)) {
if(!this._pingRequestExists(addr, port)) {
this.pingRequests.push(new PingRequest(addr, port));

@@ -498,4 +523,4 @@ }

/**
* Sends a FIND_NODE message to a peer.
**/
* Sends a FIND_NODE message to a peer.
**/
DHT.prototype.sendFindNode = function(addr, port) {

@@ -513,10 +538,9 @@ var rpc = new RPC();

/**
* Initiates a STORE request
**/
* Initiates a STORE request
**/
DHT.prototype.store = function(key, value) {
var def;
var keyValidator = new this.key_validator_class(key);
if (!keyValidator.isValid()) {
if(!keyValidator.isValid()) {
// No need to forward request
def = deferred();
var def = new deferred();
def.reject(new Error("Invalid key: " + key));

@@ -526,6 +550,6 @@ return def.promise();

if (this.cache.containsKey(key)) {
if(this.cache.containsKey(key)) {
// No need to forward request
def = deferred();
def.resolve({key: key, value: this.cache.get(key)});
var def = new deferred();
def.resolve({ key: key, value: this.cache.get(key) });
return def.promise();

@@ -535,6 +559,4 @@ }

// Get a random route and start the hop counter at 1
var randomRoute = this.routingTable.getRandomRoutes(
Globals.RecursiveRouting.MAX_RANDOM_NODES
)[0];
var rpc = new RPC();
var randomRoute = this.routingTable.getRandomRoutes(Globals.RecursiveRouting.MAX_RANDOM_NODES)[0];
rpc = new RPC();
rpc.type = 'STORE';

@@ -550,3 +572,3 @@ rpc.address = randomRoute.address;

// Add to deferred map
def = this._createDeferredForRequest(rpc.id);
var def = this._createDeferredForRequest(rpc.id);
this.sendStore(rpc, randomRoute.address, randomRoute.port, 1);

@@ -557,4 +579,4 @@ return def.promise();

/**
* Forwards a STORE request
**/
* Forwards a STORE request
**/
DHT.prototype.sendStore = function(rpc, addr, port, hopCount) {

@@ -568,12 +590,12 @@ rpc.address = addr;

/**
* Forwards the request back to the prev node
**/
* Forwards the request back to the prev node
**/
DHT.prototype.sendForward = function(rpc) {
var routeReq = this._getRouteRequest(rpc.id);
if (!routeReq) {
logger.warn('No routing information for RPC ID %s', rpc.id);
if(!routeReq) {
console.log('PORT %d WARNING: No routing information for RPC ID %s', this.port, rpc.id);
return;
}
if (rpc.get('original_type') == null) {
if(rpc.get('original_type') == null) {
var origType = rpc.type;

@@ -589,12 +611,12 @@ rpc.set('original_type', origType);

this._clearRouteRequest(routeReq);
this.send(rpc);
};
/**
* Initiates a FIND_VALUE operation
**/
* Initiates a FIND_VALUE operation
**/
DHT.prototype.findValue = function(key) {
var randomRoute = this.routingTable.getRandomRoutes(
Globals.RecursiveRouting.MAX_RANDOM_NODES
)[0];
var randomRoute = this.routingTable.getRandomRoutes(Globals.RecursiveRouting.MAX_RANDOM_NODES)[0];
var rpc = new RPC();

@@ -616,8 +638,6 @@ rpc.type = 'FIND_VALUE';

/**
* Retries a FIND_VALUE operation
**/
* Retries a FIND_VALUE operation
**/
DHT.prototype._retryFindValue = function(rpc) {
var randomRoute = this.routingTable.getRandomRoutes(
Globals.RecursiveRouting.MAX_RANDOM_NODES
)[0];
var randomRoute = this.routingTable.getRandomRoutes(Globals.RecursiveRouting.MAX_RANDOM_NODES)[0];
rpc.set('attempts', (rpc.get('attempts') + 1));

@@ -631,4 +651,4 @@ rpc.type = 'FIND_VALUE';

/**
* Sends a FIND_VALUE message
**/
* Sends a FIND_VALUE message
**/
DHT.prototype.sendFindValue = function(rpc, addr, port, hopCount) {

@@ -641,2 +661,3 @@ rpc.address = addr;

/******** Message Handlers **********/

@@ -648,4 +669,4 @@ DHT.prototype.onSocketMessage = function(rpc) {

DHT.prototype._handleMessage = function(rpc) {
logger.debug('Received %s from %d', rpc.type, rpc.fromPort);
switch (rpc.type) {
//console.log('Received %s at %d from %d', rpc.type, this.port, rpc.fromPort);
switch(rpc.type) {
case 'PING':

@@ -674,4 +695,3 @@ this.onPingMessage(rpc);

break;
default:
this._handleCustomMessage(rpc);
default: this._handleCustomMessage(rpc);
}

@@ -681,21 +701,23 @@ };

/**
* A response from FIND_NODE rpc. Should contain node_id and peers for data.
**/
* A response from FIND_NODE rpc. Should contain node_id and peers for data.
**/
DHT.prototype.onAnnounceMessage = function(rpc) {
var self = this;
var _self = this;
var peers = rpc.get('peers');
peers.forEach(function(peer) {
self.routingTable.addRoute(peer.address, peer.port, peer.nodeId);
_self.routingTable.addRoute(peer.address, peer.port, peer.nodeId);
// Send ping to add this node to peer node routing tables
self.sendPing(peer.address, peer.port);
_self.sendPing(peer.address, peer.port);
});
logger.silly('Has %d routes', this.routingTable.getAllRoutes().length);
//console.log('PORT %d has %d routes', this.port, this.routingTable.getAllRoutes().length);
var def = this._getDeferredForRequest(rpc.id);
if (def) {
if(def) {
def.resolve({number_of_routes: this.routingTable.getAllRoutes().length});
}
};
DHT.prototype.onPingMessage = function(rpc) {

@@ -715,5 +737,5 @@ // Add the node to the routing table

/**
* For a FIND_NODE message, we get the closest peers to the given node ID
* up to a maximum K and return them
**/
* For a FIND_NODE message, we get the closest peers to the given node ID
* up to a maximum K and return them
**/
DHT.prototype.onFindNodeMessage = function(rpc) {

@@ -724,8 +746,8 @@ this.sendAnnounce(rpc);

/**
* Handles a STORE message.
* STEPS:
* 1) Get the nodes closest to the given key in the routing table
* 2) If this node's ID is closer than all peers, store the value locally
* 3) Else, Forward the STORE request to the closest peer
**/
* Handles a STORE message.
* STEPS:
* 1) Get the nodes closest to the given key in the routing table
* 2) If this node's ID is closer than all peers, store the value locally
* 3) Else, Forward the STORE request to the closest peer
**/
DHT.prototype.onStoreMessage = function(rpc) {

@@ -736,6 +758,28 @@ // Add a new routing link

if(rpc.get('data')) {
// This is a bulk replication operation
// Just load all the data and return
var replData = rpc.get('data');
rpc.set('stored_at', this.id);
for(var i = 0; i < replData.length; i++) {
var kv = replData[i];
var keyValidator = new this.key_validator_class(kv.key);
if(!keyValidator.isValid()) {
console.log('INVALID KEY: %s', key);
// Send reply
this.sendForward(rpc);
return;
}
this.cache.put(kv.key, kv.value);
}
this.sendForward(rpc);
return;
}
var token = rpc.get('key');
var keyValidator = new this.key_validator_class(token);
if (!keyValidator.isValid()) {
logger.warn('Invalid key: %s', token);
if(!keyValidator.isValid()) {
console.log('INVALID KEY: %s', token);
rpc.set('stored_at', this.id);

@@ -747,10 +791,11 @@ // Send reply

logger.silly('AT NODE FOR STORE: %s', this.id);
//console.log('AT NODE FOR STORE: %s', this.id);
var key = keyValidator.parse().key;
var hops = rpc.get('hop_count');
var closestRoute = null;
var randomRoute = false;
logger.silly('Random route: %j', randomRoute);
if (hops > Globals.RecursiveRouting.MAX_RANDOM_NODES) {
//console.log('RANDOM ROUTES AT PORT %d: %j', this.port, randomRoutes);
if(hops > Globals.RecursiveRouting.MAX_RANDOM_NODES) {
closestRoute = this.routingTable.getClosestPeers(rpc)[0];

@@ -764,3 +809,3 @@ } else {

// only 1 entry in the routing table. If so just store the value here
if (!closestRoute) {
if(!closestRoute) {
closestRoute = new RouteEntry(this.address, this.port, this.id);

@@ -772,14 +817,15 @@ randomRoute = false;

var address = closestRoute.address;
var port = closestRoute.port;
var isReplication = rpc.get('replication');
var address = closestRoute.address,
port = closestRoute.port,
value = rpc.get('value'),
isReplication = rpc.get('replication');
if (!randomRoute) {
if(!randomRoute) {
var myDistance = DistanceUtil.calcDistance(key, this.id);
var closestDistance = DistanceUtil.calcDistance(key, closestRoute.nodeId);
if (myDistance <= closestDistance) {
if(myDistance <= closestDistance) {
// Store here
logger.info('Stored a KV. %s: %s', rpc.get('key'), rpc.get('value'));
//console.log('VALUE STORED AT NODE: %d', this.port);
this.cache.put(token, rpc.get('value'));

@@ -791,3 +837,3 @@ rpc.set('stored_at', this.id);

// Replication step
if (!isReplication) {
if(!isReplication) {
this._replicate(rpc.get('key'), rpc.get('value'));

@@ -803,10 +849,13 @@ }

}
};
/**
* Handles FIND_VALUE message. Steps are the same as STORE but instead
* we are retrieving.
* TODO: Refactor both the onFindValue and onStore methods into a more abstract
* structure
**/
* Handles FIND_VALUE message. Steps are the same as STORE but instead
* we are retrieving.
* TODO: Refactor both the onFindValue and onStore methods into a more abstract
* structure
**/
DHT.prototype.onFindValueMessage = function(rpc) {

@@ -819,4 +868,4 @@ // Add a new routing link

var keyValidator = new this.key_validator_class(token);
if (!keyValidator.isValid()) {
logger.warn('Invalid key: %s', token);
if(!keyValidator.isValid()) {
console.log('INVALID KEY: %s', token);
rpc.set('found', false);

@@ -838,3 +887,3 @@ rpc.set('retrieved_at', this.id);

var value = this.cache.get(token);
if (value) {
if(value) {
rpc.set('value', value);

@@ -850,4 +899,4 @@ rpc.set('found', true);

var randomRoute = false;
logger.silly('RANDOM ROUTE AT PORT %d: %j', this.port, randomRoute);
if (hops > Globals.RecursiveRouting.MAX_RANDOM_NODES) {
//console.log('RANDOM ROUTES AT PORT %d: %j', this.port, randomRoutes);
if(hops > Globals.RecursiveRouting.MAX_RANDOM_NODES) {
closestRoute = this.routingTable.getClosestPeers(rpc)[0];

@@ -861,3 +910,3 @@ } else {

// only 1 entry in the routing table. If so just try to find the value here
if (!closestRoute) {
if(!closestRoute) {
closestRoute = new RouteEntry(this.address, this.port, this.id);

@@ -869,19 +918,16 @@ randomRoute = false;

var address = closestRoute.address;
var port = closestRoute.port;
var address = closestRoute.address,
port = closestRoute.port;
logger.info('RPC ID: %s\nINIT ID: %s\nLAST NODE ID: %s\n',
rpc.id, rpc.get('initiator_id'), rpc.get('node_id')
);
if (!randomRoute) {
//console.log('AT %d\nRPC ID: %s\nINIT ID: %s\nLAST NODE ID: %s\n', this.port, rpc.id, rpc.get('initiator_id'), rpc.get('node_id'));
if(!randomRoute) {
var myDistance = DistanceUtil.calcDistance(key, this.id);
var closestDistance = DistanceUtil.calcDistance(key, closestRoute.nodeId);
logger.silly('onStoreMessage. myDistance = %d, closestDist = %d',
myDistance, closestDistance
);
//console.log('onStoreMessage\nmyDistance = %d\nclosestDist = %d', myDistance, closestDistance);
if (myDistance <= closestDistance) {
if(myDistance <= closestDistance) {
// We have reached the closest node and we don't have the key
// return failure
logger.warn('At closest node with no value port: %d', this.port);
console.log('AT CLOSEST NODE WITH NO VALUE PORT: %d', this.port);
rpc.set('found', false);

@@ -897,3 +943,3 @@ rpc.set('retrieved_at', this.id);

} else {
logger.silly('Random hop #%d to peer %s:%d', hops - 1, address, port);
//console.log('RANDOM HOP #%d TO PORT %d', hops - 1, port);
this.sendFindValue(rpc, address, port, hops);

@@ -904,16 +950,15 @@ }

/**
* Handles a FORWARD message.
* Check to see if the initiating node ID is this DHT's ID
* If it is, the request has reached the initiator
* If not, remove the entry from the routing request list and
* forward the request to the next node in the chain
**/
// TODO Refactor: too many nested ifs and elses means the function can be split.
* Handles a FORWARD message.
* Check to see if the initiating node ID is this DHT's ID
* If it is, the request has reached the initiator
* If not, remove the entry from the routing request list and
* forward the request to the next node in the chain
**/
DHT.prototype.onForwardMessage = function(rpc) {
logger.silly('Received forward message.');
if (rpc.get('initiator_id') === this.id) {
logger.silly('Reached initiator.');
//console.log('RECEIVED FORWARD AT: %d', this.port);
if(rpc.get('initiator_id') === this.id) {
//console.log('REACHED INITIATOR');
var def = this._getDeferredForRequest(rpc.id);
if (rpc.get('is_custom_type')) {
if(rpc.get('is_custom_type')) {
this._handleCustomForwardMessage(rpc);

@@ -923,5 +968,5 @@ return;

// This callback will handle both FIND_VALUE and STORE RPC's
if (rpc.get('original_type') === 'FIND_VALUE') {
if(rpc.get('original_type') === 'FIND_VALUE') {
// Only resolve the promise if a value was found
if (rpc.get('found') === true) {
if(rpc.get('found') === true) {
def.resolve({

@@ -931,3 +976,3 @@ rpc: rpc

} else {
if (rpc.get('attempts') > Globals.RecursiveRouting.MAX_FIND_VALUE_ATTEMPTS) {
if(rpc.get('attempts') > Globals.RecursiveRouting.MAX_FIND_VALUE_ATTEMPTS) {
def.reject(new Error('No value found for key: ' + rpc.get('key')));

@@ -940,3 +985,5 @@ } else {

} else {
def.resolve({rpc: rpc});
def.resolve({
rpc: rpc
});
}

@@ -955,4 +1002,4 @@ } else {

var cbMap = this._getCallbacksForRPC(rpc.type);
if (!cbMap) {
logger.warn('Unsupported message type: %s', rpc.type);
if(!cbMap) {
console.log('Unsupported message type: %s', rpc.type);
return;

@@ -972,3 +1019,3 @@ }

var cbMap = this.customMessageMap[type];
if (!cbMap) {
if(!cbMap) {
return null;

@@ -988,5 +1035,5 @@ }

if (!cbMap) {
logger.warn('Unsupported message type: %s', rpc.get('original_type'));
def.reject(new Error('Unsupported message type: ' + rpc.get('original_type')));
if(!cbMap) {
console.log('Unsupported message type: %s', rpc.get('original_type'));
def.reject(new Error('Unsupported message type: ' +rpc.get('original_type') ));
return;

@@ -1000,22 +1047,18 @@ }

* Add a custom message type to the DHT
* @param {object} options - Object containing the following fields:
* messageType - A unique message type string
* onMessage - Callback called when a message is received with the specified message type
* onResponse - Callback called when the initiating node receives the response for a custom message
*
* @param {object} options - Object containing the following fields:
* @param options.messageType - A unique message type string
* @param options.onMessage - Callback called when a message is received with
* the specified message type.
* @param options.onResponse - Callback called when the initiating node receives
* the response for a custom message
*/
DHT.prototype.addCustomMessageHandler = function(options) {
var self = this;
if (!options.messageType || !options.onMessage) {
var _self = this;
if(!options.messageType || !options.onMessage) {
throw new Error('The following fields are required for custom message '+
'handlers: messageType, onMessage');
'handlers: messageType, onMessage');
}
this.customMessageMap[options.messageType] = {
onMessage: function(rpc) {
self._addRouteRequest(
rpc.id, rpc.get('node_id'), rpc.fromAddress, rpc.fromPort
);
options.onMessage.call(self, rpc);
_self._addRouteRequest(rpc.id, rpc.get('node_id'), rpc.fromAddress, rpc.fromPort);
options.onMessage.call(_self, rpc);
}

@@ -1031,10 +1074,10 @@ };

DHT.prototype.sendCustomRPC = function(type, rpc) {
if (!type || !this._getCallbacksForRPC(type)) {
if(!type || !this._getCallbacksForRPC(type)) {
throw 'No message handlers defined for message type: "' + type + '"';
}
if (!rpc.address || !rpc.port) {
if(!rpc.address || !rpc.port) {
throw 'Remote server address and port are required for RPC transport.';
}
if (!rpc.get('initiator_id')) {
if(!rpc.get('initiator_id')) {
rpc.set('initiator_id', this.id);

@@ -1052,4 +1095,4 @@ }

/**
* Signifies that the transmission and processing of the RPC is complete.
* At this point, responses are sent back to the initiating node.
* Signifies that the transmission and processing of the RPC is complete. At this point, responses
* are sent back to the initiating node.
* @param {RPC} rpc

@@ -1056,0 +1099,0 @@ */

@@ -0,0 +0,0 @@ /**

@@ -0,0 +0,0 @@ /**

@@ -0,0 +0,0 @@ /**

@@ -0,0 +0,0 @@ /**

@@ -0,0 +0,0 @@ var Globals = require(__dirname + '/globals');

@@ -0,0 +0,0 @@ var Globals = require(__dirname+'/globals');

@@ -0,0 +0,0 @@ var deferred = require('deferred');

@@ -0,0 +0,0 @@ var RouteEntry = require(__dirname + '/route_entry');

@@ -0,0 +0,0 @@ /**

@@ -0,0 +0,0 @@ /**

@@ -0,0 +0,0 @@ /**

@@ -0,0 +0,0 @@ /**

@@ -0,0 +0,0 @@ /**

@@ -0,0 +0,0 @@ var Globals = require(__dirname+'/globals');

@@ -0,0 +0,0 @@ The MIT License (MIT)

{
"name": "r5n",
"version": "0.2.6",
"version": "0.2.8",
"author": "Dave Hagman <dave@avatar.ai>",

@@ -43,5 +43,5 @@ "description": "An implementation of the R5N distributed hash table",

"jshint-stylish": "^0.1.5",
"exec-sync": "^0.1.6",
"execSync": "*",
"semver": "^2.3.0"
}
}

@@ -0,0 +0,0 @@ # Welcome to the R5N DHT Repository

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc