Comparing version 2.2.0 to 2.3.0
324
bin/fash.js
@@ -47,3 +47,3 @@ #!/usr/bin/env node | ||
if (opts.help) { | ||
if (opts.help || args.length !== 0 || !opts.v || !opts.p) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
@@ -54,8 +54,2 @@ return callback(err ? err : true); | ||
if (args.length !== 0 || !opts.v || !opts.p) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
return callback(err ? err : true); | ||
}); | ||
} | ||
var pnodes = opts.p.split(' '); | ||
@@ -86,13 +80,16 @@ switch(opts.b) { | ||
chash.serialize(function(_err, sh) { | ||
if (_err) { | ||
console.error(_err); | ||
return callback(_err); | ||
} | ||
if (opts.o) { | ||
console.log(sh); | ||
} | ||
if (opts.o) { | ||
chash.serialize(function(_err, sh) { | ||
if (_err) { | ||
console.error(_err); | ||
return callback(_err); | ||
} | ||
if (opts.o) { | ||
console.log(sh); | ||
} | ||
return (callback()); | ||
}); | ||
} else { | ||
return (callback()); | ||
}); | ||
return (undefined); | ||
} | ||
}); | ||
@@ -139,9 +136,4 @@ | ||
var self = this; | ||
if (opts.help) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
return callback(err ? err : true); | ||
}); | ||
} | ||
if (args.length !== 0 || !opts.l) { | ||
if (opts.help || args.length !== 0 || !opts.l) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
@@ -210,3 +202,4 @@ return callback(err ? err : true); | ||
var self = this; | ||
if (opts.help) { | ||
if (opts.help || args.length !== 0 || !opts.v || !opts.d || !opts.b) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
@@ -217,6 +210,7 @@ return callback(err ? err : true); | ||
if (args.length !== 0 || !opts.v || !opts.d) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
return callback(err ? err : true); | ||
}); | ||
if (opts.b !== BACKENDS.IN_MEMORY && opts.b !== BACKENDS.LEVEL_DB) { | ||
console.error(sprintf('backend must be one of %s or %s', | ||
BACKENDS.IN_MEMORY, | ||
BACKENDS.LEVEL_DB)); | ||
return callback(new Error()); | ||
} | ||
@@ -232,3 +226,3 @@ | ||
function prepInput(_, cb) { | ||
if (!opts.b || opts.b === BACKENDS.IN_MEMORY) { | ||
if (opts.b === BACKENDS.IN_MEMORY) { | ||
hashOptions.backend = fash.BACKEND.IN_MEMORY; | ||
@@ -256,2 +250,3 @@ constructor = fash.deserialize; | ||
if (!opts.l) { | ||
console.error('leveldb backend requires a location'); | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
@@ -264,6 +259,2 @@ return callback(err ? err : true); | ||
} | ||
} else { | ||
console.error('one of %j must be specified if not passing ' + | ||
'topology via stdin', BACKENDS); | ||
return (undefined); | ||
} | ||
@@ -347,3 +338,4 @@ return (undefined); | ||
var self = this; | ||
if (opts.help) { | ||
if (opts.help || args.length !== 0 || !opts.v || !opts.p || !opts.b) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
@@ -354,6 +346,7 @@ return callback(err ? err : true); | ||
if (args.length !== 0 || !opts.v || !opts.p) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
return callback(err ? err : true); | ||
}); | ||
if (opts.b !== BACKENDS.IN_MEMORY && opts.b !== BACKENDS.LEVEL_DB) { | ||
console.error(sprintf('backend must be one of %s or %s', | ||
BACKENDS.IN_MEMORY, | ||
BACKENDS.LEVEL_DB)); | ||
return callback(new Error()); | ||
} | ||
@@ -369,3 +362,3 @@ | ||
function prepInput(_, cb) { | ||
if (!opts.b || opts.b === BACKENDS.IN_MEMORY) { | ||
if (opts.b === BACKENDS.IN_MEMORY) { | ||
hashOptions.backend = fash.BACKEND.IN_MEMORY; | ||
@@ -393,2 +386,3 @@ constructor = fash.deserialize; | ||
if (!opts.l) { | ||
console.error('leveldb backend requires a location'); | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
@@ -401,6 +395,2 @@ return callback(err ? err : true); | ||
} | ||
} else { | ||
console.error('one of %j must be specified if not passing ' + | ||
'topology via stdin', BACKENDS); | ||
return (undefined); | ||
} | ||
@@ -483,3 +473,4 @@ return (undefined); | ||
var self = this; | ||
if (opts.help) { | ||
if (opts.help || args.length !== 0 || !opts.p || !opts.b) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
@@ -490,6 +481,7 @@ return callback(err ? err : true); | ||
if (args.length !== 0 || !opts.p) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
return callback(err ? err : true); | ||
}); | ||
if (opts.b !== BACKENDS.IN_MEMORY && opts.b !== BACKENDS.LEVEL_DB) { | ||
console.error(sprintf('backend must be one of %s or %s', | ||
BACKENDS.IN_MEMORY, | ||
BACKENDS.LEVEL_DB)); | ||
return callback(new Error()); | ||
} | ||
@@ -505,3 +497,3 @@ | ||
function prepInput(_, cb) { | ||
if (!opts.b || opts.b === BACKENDS.IN_MEMORY) { | ||
if (opts.b === BACKENDS.IN_MEMORY) { | ||
hashOptions.backend = fash.BACKEND.IN_MEMORY; | ||
@@ -529,2 +521,3 @@ constructor = fash.deserialize; | ||
if (!opts.l) { | ||
console.error('leveldb backend requires a location'); | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
@@ -537,6 +530,2 @@ return callback(err ? err : true); | ||
} | ||
} else { | ||
console.error('one of %j must be specified if not passing ' + | ||
'topology via stdin', BACKENDS); | ||
return (undefined); | ||
} | ||
@@ -554,4 +543,3 @@ return (undefined); | ||
if (_err) { | ||
return cb(new verror.VError(_err, | ||
'unable to print hash')); | ||
return cb(new verror.VError(_err, 'unable to print hash')); | ||
} | ||
@@ -601,5 +589,5 @@ if (opts.o) { | ||
Fash.prototype.do_get_node = function(subcmd, opts, args, callback) { | ||
Fash.prototype.do_get_pnodes = function(subcmd, opts, args, callback) { | ||
var self = this; | ||
if (opts.help) { | ||
if (opts.help || !opts.b) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
@@ -610,3 +598,96 @@ return callback(err ? err : true); | ||
if (args.length !== 1) { | ||
if (opts.b !== BACKENDS.IN_MEMORY && opts.b !== BACKENDS.LEVEL_DB) { | ||
console.error(sprintf('backend must be one of %s or %s', | ||
BACKENDS.IN_MEMORY, | ||
BACKENDS.LEVEL_DB)); | ||
return callback(new Error()); | ||
} | ||
var hashOptions = { | ||
log: self.log | ||
}; | ||
var hash; | ||
var constructor; | ||
vasync.pipeline({funcs: [ | ||
function prepInput(_, cb) { | ||
if (opts.b === BACKENDS.IN_MEMORY) { | ||
hashOptions.backend = fash.BACKEND.IN_MEMORY; | ||
constructor = fash.deserialize; | ||
if (opts.l) { | ||
hashOptions.topology = fs.readFileSync(opts.l, 'utf8'); | ||
return cb(); | ||
} else { | ||
hashOptions.topology = ''; | ||
process.stdin.resume(); | ||
process.stdin.setEncoding('utf8'); | ||
process.stdin.on('data', function(chunk) { | ||
hashOptions.topology += chunk; | ||
}); | ||
process.stdin.on('end', function() { | ||
return cb(); | ||
}); | ||
} | ||
} else if (opts.b === BACKENDS.LEVEL_DB) { | ||
hashOptions.backend = fash.BACKEND.LEVEL_DB; | ||
constructor = fash.load; | ||
if (!opts.l) { | ||
console.error('leveldb backend requires a location'); | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
return callback(err ? err : true); | ||
}); | ||
} else { | ||
hashOptions.location = opts.l; | ||
return cb(); | ||
} | ||
} | ||
return (undefined); | ||
}, | ||
function loadRing(_, cb) { | ||
hash = constructor(hashOptions, cb); | ||
}, | ||
function getPnodes(_, cb) { | ||
hash.getPnodes(function (err, pnodes) { | ||
if (err) { | ||
return cb (err); | ||
} | ||
console.log(pnodes); | ||
return cb(); | ||
}); | ||
} | ||
], arg: {}}, function(err) { | ||
if (err) { | ||
console.error(err); | ||
} | ||
return callback(err); | ||
}); | ||
return (undefined); | ||
}; | ||
Fash.prototype.do_get_pnodes.options = [{ | ||
names: [ 'l', 'location' ], | ||
type: 'string', | ||
help: 'the location of the topology, if using the in_memory backend, \n' + | ||
'this is the location of the serialized ring on disk, if using \n ' + | ||
'the leveldb backend, this is the path to the levedb on disk.' | ||
}, { | ||
names: [ 'b', 'backend' ], | ||
type: 'string', | ||
help: 'the backend to use' | ||
}]; | ||
Fash.prototype.do_get_pnodes.help = ( | ||
'get all the pnodes in the ring' | ||
+ '\n' | ||
+ 'usage:\n' | ||
+ ' fash get_pnodes [options]\n' | ||
+ '\n' | ||
+ '{{options}}' | ||
); | ||
Fash.prototype.do_get_vnodes = function(subcmd, opts, args, callback) { | ||
var self = this; | ||
if (opts.help || !opts.b || args.length !== 1) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
@@ -617,2 +698,9 @@ return callback(err ? err : true); | ||
if (opts.b !== BACKENDS.IN_MEMORY && opts.b !== BACKENDS.LEVEL_DB) { | ||
console.error(sprintf('backend must be one of %s or %s', | ||
BACKENDS.IN_MEMORY, | ||
BACKENDS.LEVEL_DB)); | ||
return callback(new Error()); | ||
} | ||
var hashOptions = { | ||
@@ -626,2 +714,101 @@ log: self.log | ||
function prepInput(_, cb) { | ||
if (opts.b === BACKENDS.IN_MEMORY) { | ||
hashOptions.backend = fash.BACKEND.IN_MEMORY; | ||
constructor = fash.deserialize; | ||
if (opts.l) { | ||
hashOptions.topology = fs.readFileSync(opts.l, 'utf8'); | ||
return cb(); | ||
} else { | ||
hashOptions.topology = ''; | ||
process.stdin.resume(); | ||
process.stdin.setEncoding('utf8'); | ||
process.stdin.on('data', function(chunk) { | ||
hashOptions.topology += chunk; | ||
}); | ||
process.stdin.on('end', function() { | ||
return cb(); | ||
}); | ||
} | ||
} else if (opts.b === BACKENDS.LEVEL_DB) { | ||
hashOptions.backend = fash.BACKEND.LEVEL_DB; | ||
constructor = fash.load; | ||
if (!opts.l) { | ||
console.error('leveldb backend requires a location'); | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
return callback(err ? err : true); | ||
}); | ||
} else { | ||
hashOptions.location = opts.l; | ||
return cb(); | ||
} | ||
} | ||
return (undefined); | ||
}, | ||
function loadRing(_, cb) { | ||
hash = constructor(hashOptions, cb); | ||
}, | ||
function getVnodes(_, cb) { | ||
hash.getVnodes(args[0], function (err, vnodes) { | ||
if (err) { | ||
return cb (err); | ||
} | ||
console.log(vnodes); | ||
return cb(); | ||
}); | ||
} | ||
], arg: {}}, function(err) { | ||
if (err) { | ||
console.error(err); | ||
} | ||
return callback(err); | ||
}); | ||
return (undefined); | ||
}; | ||
Fash.prototype.do_get_vnodes.options = [{ | ||
names: [ 'l', 'location' ], | ||
type: 'string', | ||
help: 'the location of the topology, if using the in_memory backend, \n' + | ||
'this is the location of the serialized ring on disk, if using \n ' + | ||
'the leveldb backend, this is the path to the levedb on disk.' | ||
}, { | ||
names: [ 'b', 'backend' ], | ||
type: 'string', | ||
help: 'the backend to use' | ||
}]; | ||
Fash.prototype.do_get_vnodes.help = ( | ||
'get the vnodes owned by a pnode' | ||
+ '\n' | ||
+ 'usage:\n' | ||
+ ' fash get_vnodes [options] pnode\n' | ||
+ '\n' | ||
+ '{{options}}' | ||
); | ||
Fash.prototype.do_get_node = function(subcmd, opts, args, callback) { | ||
var self = this; | ||
if (opts.help || !opts.b || args.length !== 1) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
return callback(err ? err : true); | ||
}); | ||
} | ||
if (opts.b !== BACKENDS.IN_MEMORY && opts.b !== BACKENDS.LEVEL_DB) { | ||
console.error(sprintf('backend must be one of %s or %s', | ||
BACKENDS.IN_MEMORY, | ||
BACKENDS.LEVEL_DB)); | ||
return callback(new Error()); | ||
} | ||
var hashOptions = { | ||
log: self.log | ||
}; | ||
var hash; | ||
var constructor; | ||
vasync.pipeline({funcs: [ | ||
function prepInput(_, cb) { | ||
if (!opts.b || opts.b === BACKENDS.IN_MEMORY) { | ||
@@ -650,2 +837,3 @@ hashOptions.backend = fash.BACKEND.IN_MEMORY; | ||
if (!opts.l) { | ||
console.error('leveldb backend requires a location'); | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
@@ -658,6 +846,2 @@ return callback(err ? err : true); | ||
} | ||
} else { | ||
console.error('one of %j must be specified if not passing ' + | ||
'topology via stdin', BACKENDS); | ||
return (undefined); | ||
} | ||
@@ -709,3 +893,3 @@ return (undefined); | ||
var self = this; | ||
if (opts.help) { | ||
if (opts.help || !opts.b) { | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
@@ -716,2 +900,9 @@ return callback(err ? err : true); | ||
if (opts.b !== BACKENDS.IN_MEMORY && opts.b !== BACKENDS.LEVEL_DB) { | ||
console.error(sprintf('backend must be one of %s or %s', | ||
BACKENDS.IN_MEMORY, | ||
BACKENDS.LEVEL_DB)); | ||
return callback(new Error()); | ||
} | ||
var hashOptions = { | ||
@@ -725,3 +916,3 @@ log: self.log | ||
function prepInput(_, cb) { | ||
if (!opts.b || opts.b === BACKENDS.IN_MEMORY) { | ||
if (opts.b === BACKENDS.IN_MEMORY) { | ||
hashOptions.backend = fash.BACKEND.IN_MEMORY; | ||
@@ -749,2 +940,3 @@ constructor = fash.deserialize; | ||
if (!opts.l) { | ||
console.error('leveldb backend requires a location'); | ||
this.do_help('help', {}, [subcmd], function(err) { | ||
@@ -757,6 +949,2 @@ return callback(err ? err : true); | ||
} | ||
} else { | ||
console.error('one of %j must be specified if not passing ' + | ||
'topology via stdin', BACKENDS); | ||
return (undefined); | ||
} | ||
@@ -763,0 +951,0 @@ return (undefined); |
@@ -10,2 +10,3 @@ /** | ||
var common = require('../common'); | ||
var dtrace = require('../dtrace'); | ||
var fash = require('../index'); | ||
@@ -45,2 +46,3 @@ var util = require('util'); | ||
function ConsistentHash(options, callback) { | ||
dtrace._fash_probes['new-start'].fire(function() { return([]); }); | ||
assert.object(options, 'options'); | ||
@@ -139,6 +141,5 @@ | ||
log.info('ConsistentHash.new: finished deserializing'); | ||
log.debug({ | ||
pnodeToVnodeMap: self.pnodeToVnodeMap_, | ||
vnodeToPnodeMap: self.vnodeToPnodeMap_ | ||
}, 'ConsistentHash.new: topology'); | ||
dtrace._fash_probes['new-done'].fire(function() { | ||
return ([null, 'deserialize']); | ||
}); | ||
} else { | ||
@@ -152,2 +153,5 @@ log.info('instantiating new ring from scratch.'); | ||
'duplicate pnodes in input'); | ||
dtrace._fash_probes['new-done'].fire(function() { | ||
return ([err ? err.message : null, 'createNewRing']); | ||
}); | ||
if (callback) { | ||
@@ -199,2 +203,5 @@ return callback(err); | ||
allocateVnode(); | ||
dtrace._fash_probes['new-done'].fire(function() { | ||
return ([null, 'createNewRing']); | ||
}); | ||
} | ||
@@ -228,2 +235,5 @@ | ||
ConsistentHash.prototype.addData = function addData(vnode, data, cb) { | ||
dtrace._fash_probes['adddata-start'].fire(function() { | ||
return([vnode, data]); | ||
}); | ||
var self = this; | ||
@@ -271,6 +281,9 @@ var log = self.log; | ||
dtrace._fash_probes['adddata-done'].fire(function() { | ||
return([null, vnode, data]); | ||
}); | ||
if (cb) { | ||
return cb(); | ||
} | ||
return (undefined); | ||
@@ -303,2 +316,5 @@ }; | ||
ConsistentHash.prototype.remapVnode = function remapVnode(newPnode, vnodes, cb) { | ||
dtrace._fash_probes['remapvnode-start'].fire(function() { | ||
return ([newPnode, vnodes]); | ||
}); | ||
var self = this; | ||
@@ -315,2 +331,3 @@ var log = self.log; | ||
assert.optionalArrayOfNumber(vnodes, 'vnodes'); | ||
assert.optionalFunc(cb, 'callback'); | ||
@@ -324,17 +341,26 @@ // assert the vnodes, ensuring that: | ||
vnodes.forEach(function(v) { | ||
var err; | ||
if ((v > self.vnodeCount_) || (v < 0)) { | ||
throw new verror.VError('vnode ' + v + | ||
' does not exist in the ring'); | ||
} | ||
if (vnodeMap[v]) { | ||
throw new verror.VError('vnode ' + v + | ||
err = new verror.VError('vnode ' + v + | ||
' specified more than once'); | ||
} else if (vnodeMap[v]) { | ||
err = new verror.VError('vnode ' + v + | ||
' specified more than once'); | ||
} else if (self.vnodeToPnodeMap_[v].pnode === newPnode) { | ||
// check that the vnode doesn't already belong to the newPnode. | ||
err = new verror.VError('vnode ' + v + | ||
' already belongs to pnode'); | ||
} | ||
if (err) { | ||
dtrace._fash_probes['remapvnode-done'].fire(function() { | ||
return ([err ? err.message : null, newPnode, oldPnode, | ||
vnode]); | ||
}); | ||
if (cb) { | ||
return cb(err); | ||
} | ||
throw err; | ||
} | ||
vnodeMap[v] = true; | ||
// check that the vnode doesn't already belong to the newPnode. | ||
if (self.vnodeToPnodeMap_[v].pnode === newPnode) { | ||
throw new verror.VError('vnode ' + v + | ||
' already belongs to pnode'); | ||
} | ||
}); | ||
@@ -399,6 +425,8 @@ } | ||
dtrace._fash_probes['remapvnode-done'].fire(function() { | ||
return ([null, newPnode, oldPnode, vnode]); | ||
}); | ||
if (cb) { | ||
return cb(null); | ||
} | ||
return (undefined); | ||
@@ -417,6 +445,9 @@ }; | ||
* @param {String} cb.newTopology.vnode The number of vnodes in the ring. | ||
* @param {Object} cb.changedNodes The pnode->vnode mapping of the nodes that have | ||
* changed. | ||
* @param {Object} cb.changedNodes The pnode->vnode mapping of the nodes that | ||
* have changed. | ||
*/ | ||
ConsistentHash.prototype.removePnode = function removePnode(pnode, cb) { | ||
dtrace._fash_probes['removepnode-start'].fire(function() { | ||
return ([pnode]); | ||
}); | ||
var self = this; | ||
@@ -430,12 +461,23 @@ var log = self.log; | ||
assert.string(pnode, 'pnode'); | ||
assert.optionalFunc(cb, 'callback'); | ||
// check that the pnode exists | ||
var err; | ||
var vnodes = self.pnodeToVnodeMap_[pnode]; | ||
if (!vnodes) { | ||
var msg = sprintf('pnode %s not in ring, skipping', pnode); | ||
throw new verror.VError(msg); | ||
err = new verror.VError( | ||
sprintf('pnode %s not in ring, skipping', pnode)); | ||
} else if (Object.keys(vnodes).length > 0) { | ||
err = new verror.VError('pnode still maps to vnodes, re-assign ' + | ||
'vnodes first'); | ||
} | ||
if (Object.keys(vnodes).length > 0) { | ||
var errMsg = 'pnode still maps to vnodes, re-assign vnodes first'; | ||
throw new verror.VError(errMsg); | ||
if (err) { | ||
dtrace._fash_probes['removepnode-done'].fire(function() { | ||
return ([err ? err.message : null, pnode]); | ||
}); | ||
if (cb) { | ||
return cb(err); | ||
} | ||
throw err; | ||
} | ||
@@ -452,6 +494,9 @@ | ||
dtrace._fash_probes['removepnode-done'].fire(function() { | ||
return ([err ? err.message : null, pnode]); | ||
}); | ||
if (cb) { | ||
return cb(); | ||
} | ||
return (undefined); | ||
@@ -527,2 +572,5 @@ }; | ||
ConsistentHash.prototype.getNode = function getNode(key, cb) { | ||
dtrace._fash_probes['getnode-start'].fire(function() { | ||
return ([key]); | ||
}); | ||
assert.optionalFunc(cb, 'callback'); | ||
@@ -534,2 +582,5 @@ var value = crypto.createHash(this.algorithm_.NAME).update(key).digest('hex'); | ||
var data = this.pnodeToVnodeMap_[pnode][vnode]; | ||
dtrace._fash_probes['getnode-done'].fire(function() { | ||
return([null, key, value, pnode, vnode, data]); | ||
}); | ||
if (cb) { | ||
@@ -536,0 +587,0 @@ return cb(null, {pnode: pnode, vnode: vnode, data: data}); |
@@ -10,2 +10,3 @@ /** | ||
var dtrace = require('../dtrace'); | ||
var EventEmitter = require('events').EventEmitter; | ||
var fash = require('../index'); | ||
@@ -76,2 +77,3 @@ var crypto = require('crypto'); | ||
function ConsistentHash(options, cb) { | ||
dtrace._fash_probes['new-start'].fire(function() { return([]); }); | ||
assert.object(options, 'options'); | ||
@@ -116,3 +118,2 @@ assert.optionalObject(options.leveldbCfg, 'options.leveldbCfg'); | ||
dtrace['new-start'].fire(function() { return([]); }); | ||
/** | ||
@@ -193,5 +194,53 @@ * 1) create 'VNODE_COUNT' key which keeps track of the # of vnodes. | ||
function allocateVnodes(_, _cb) { | ||
log.trace('allocateVnodes'); | ||
_cb = once(_cb); | ||
_.pnodeToVnodeMap = {}; | ||
for (var vnode = 0; vnode < self.vnodeCount_; vnode++) { | ||
/* | ||
* #21 Batch up the vnode puts here. Becauase we are running in | ||
* a tight for loop, _every_ put get enqueued onto the node | ||
* work queue before they actually get procecessed. This means | ||
* that we are allocating a huge amount of memory and not | ||
* deallocating it until every node has been enqueued. On | ||
* sufficiently large counts of vnodes, say 10 000 000, we | ||
* effectively fragment the heap such that either malloc(1), | ||
* mmap(1) or v8 will fail to grow the heap because of | ||
* fragmentation and cause the process to fail with OOME. Hence | ||
* we want to batch up the puts in 1000 vnode increments and | ||
* let them finish before enqueueing more puts. | ||
*/ | ||
var batch = _.db.batch(); | ||
// use this emitter to control the serial puts of vnodes. | ||
var emitter = new EventEmitter(); | ||
emitter.on('enqueue', function (vnode) { | ||
if (vnode >= self.vnodeCount_) { | ||
//done -- then batch up any remaining operations. | ||
batch.write(function (err) { | ||
if (err) { | ||
return _cb(new verror.VError('unable to ' + | ||
'allocate some vnodes')); | ||
} | ||
return _cb(); | ||
}); | ||
} else if (vnode % 1000 === 1) { | ||
batch.write(function (err) { | ||
if (err) { | ||
return _cb(new verror.VError('unable to ' + | ||
'allocate some vnodes')); | ||
} | ||
batch = _.db.batch(); | ||
// only invoke putVnode when the batch has finished | ||
allocateVnodeImpl(vnode, function () { | ||
emitter.emit('enqueue', ++vnode); | ||
}); | ||
}); | ||
} else { | ||
allocateVnodeImpl(vnode, function () { | ||
emitter.emit('enqueue', ++vnode); | ||
}); | ||
} | ||
}); | ||
var allocateVnodeImpl = function (vnode, _cb1) { | ||
var pnode = self.pnodes_[vnode % self.pnodes_.length]; | ||
@@ -211,16 +260,8 @@ var hashspace = common.findHashspace({ | ||
/** | ||
/* | ||
* assign the pnode->vnode and vnode->pnode maps | ||
* set the data here to null since this is a new ring | ||
*/ | ||
_.db.put(sprintf(LKEY_VNODE_V, vnode), | ||
pnode, | ||
function(err) | ||
{ | ||
if (err) { | ||
err = new verror.VError(err); | ||
} | ||
return _cb(err); | ||
}); | ||
/** | ||
batch.put(sprintf(LKEY_VNODE_V, vnode), pnode); | ||
/* | ||
* we put the vnode in the path, to avoid having to put all | ||
@@ -230,8 +271,3 @@ * vnodes under 1 key | ||
var pnodePath = sprintf(LKEY_PNODE_P_V, pnode, vnode); | ||
_.db.put(pnodePath, LVAL_NULL, function(err) { | ||
if (err) { | ||
err = new verror.VError(err); | ||
} | ||
return _cb(err); | ||
}); | ||
batch.put(pnodePath, LVAL_NULL); | ||
// cache the pnopdeToVnode mapping for step 4 | ||
@@ -247,7 +283,11 @@ if (!_.pnodeToVnodeMap[pnode]) { | ||
}, 'ConsistentHash.new: added vnode to pnode'); | ||
} | ||
return _cb(); | ||
return _cb1(); | ||
}; | ||
emitter.emit('enqueue', 0); | ||
}, | ||
// step 4 | ||
function writePnodeKeys(_, _cb) { | ||
log.trace('writePnodeKeys'); | ||
_cb = once(_cb); | ||
@@ -323,3 +363,3 @@ var pnodeMap = {}; | ||
}, 'finished instantiated new ring'); | ||
dtrace['new-done'].fire(function() { | ||
dtrace._fash_probes['new-done'].fire(function() { | ||
return ([err ? err.message : null, 'createNewRing']); | ||
@@ -481,3 +521,3 @@ }); | ||
}, 'finished deserializing ring'); | ||
dtrace['new-done'].fire(function() { | ||
dtrace._fash_probes['new-done'].fire(function() { | ||
return ([err ? err.message : null, 'deserialize']); | ||
@@ -565,3 +605,3 @@ }); | ||
} | ||
dtrace['new-done'].fire(function() { | ||
dtrace._fash_probes['new-done'].fire(function() { | ||
return ([err ? err.message : null, 'loadFromDb']); | ||
@@ -597,3 +637,3 @@ }); | ||
ConsistentHash.prototype.getNode = function getNode(key, callback) { | ||
dtrace['getnode-start'].fire(function() { | ||
dtrace._fash_probes['getnode-start'].fire(function() { | ||
return ([key]); | ||
@@ -610,3 +650,3 @@ }); | ||
err = new verror.VError(err); | ||
dtrace['getnode-done'].fire(function() { | ||
dtrace._fash_probes['getnode-done'].fire(function() { | ||
return([err.message, key, value, null, vnode]); | ||
@@ -623,3 +663,3 @@ }); | ||
dtrace['getnode-done'].fire(function() { | ||
dtrace._fash_probes['getnode-done'].fire(function() { | ||
return([_err ? _err.message : null, | ||
@@ -646,3 +686,3 @@ key, value, pnode, vnode, data]); | ||
ConsistentHash.prototype.addData = function addData(vnode, data, cb) { | ||
dtrace['adddata-start'].fire(function() { | ||
dtrace._fash_probes['adddata-start'].fire(function() { | ||
return([vnode, data]); | ||
@@ -726,4 +766,4 @@ }); | ||
], arg:{}}, function(err) { | ||
dtrace['adddata-done'].fire(function() { | ||
return([err ? err.message : null]); | ||
dtrace._fash_probes['adddata-done'].fire(function() { | ||
return([err ? err.message : null, vnode, data]); | ||
}); | ||
@@ -766,3 +806,3 @@ return cb(err); | ||
ConsistentHash.prototype.remapVnode = function remapVnode(newPnode, vnode, cb) { | ||
dtrace['remapvnode-start'].fire(function() { | ||
dtrace._fash_probes['remapvnode-start'].fire(function() { | ||
return ([newPnode, vnode]); | ||
@@ -918,3 +958,3 @@ }); | ||
log.info({err: err}, 'ConsistentHash.remapVnode: exiting'); | ||
dtrace['remapvnode-done'].fire(function() { | ||
dtrace._fash_probes['remapvnode-done'].fire(function() { | ||
return ([err ? err.message : null, newPnode, oldPnode, vnode]); | ||
@@ -985,3 +1025,3 @@ }); | ||
ConsistentHash.prototype.removePnode = function removePnode(pnode, cb) { | ||
dtrace['removepnode-start'].fire(function() { | ||
dtrace._fash_probes['removepnode-start'].fire(function() { | ||
return ([pnode]); | ||
@@ -1047,3 +1087,3 @@ }); | ||
], arg: {}}, function(err) { | ||
dtrace['removepnode-done'].fire(function() { | ||
dtrace._fash_probes['removepnode-done'].fire(function() { | ||
return ([err ? err.message : null, pnode]); | ||
@@ -1066,3 +1106,3 @@ }); | ||
ConsistentHash.prototype.serialize = function serialize(callback) { | ||
dtrace['serialize-start'].fire(function() { | ||
dtrace._fash_probes['serialize-start'].fire(function() { | ||
return ([]); | ||
@@ -1149,3 +1189,3 @@ }); | ||
dtrace['serialize-done'].fire(function() { | ||
dtrace._fash_probes['serialize-done'].fire(function() { | ||
return ([err ? err.message : null]); | ||
@@ -1152,0 +1192,0 @@ }); |
@@ -5,8 +5,2 @@ /** | ||
var dtrace = require('dtrace-provider'); | ||
var DTraceProvider = dtrace.DTraceProvider; | ||
var PROBES = { | ||
@@ -26,7 +20,7 @@ 'new-start': [], | ||
'adddata-start': ['int', 'char *'], | ||
// err | ||
'adddata-done': ['char *'], | ||
// err, vnode, data | ||
'adddata-done': ['char *', 'int', 'char *'], | ||
// newPnode, vnode | ||
'remapvnode-start': ['char *', 'int'], | ||
// err | ||
// err, newPnode, oldPnode, vnode | ||
'remapvnode-done': ['char *', 'char *', 'char *', 'int'], | ||
@@ -41,23 +35,38 @@ // pnode | ||
///--- API | ||
module.exports = function exportStaticProvider() { | ||
if (!PROVIDER) { | ||
PROVIDER = dtrace.createDTraceProvider('node-fash'); | ||
if (!PROVIDER) { | ||
try { | ||
var dtrace = require('dtrace-provider'); | ||
PROVIDER = dtrace.createDTraceProvider('node-fash'); | ||
} catch (e) { | ||
PROVIDER = { | ||
fire: function () {}, | ||
enable: function () {}, | ||
addProbe: function () { | ||
var p = { | ||
fire: function () {} | ||
}; | ||
return (p); | ||
}, | ||
removeProbe: function () {}, | ||
disable: function () {} | ||
}; | ||
} | ||
PROVIDER._fash_probes = {}; | ||
PROVIDER._fash_probes = {}; | ||
Object.keys(PROBES).forEach(function (p) { | ||
var args = PROBES[p].splice(0); | ||
args.unshift(p); | ||
Object.keys(PROBES).forEach(function (p) { | ||
var args = PROBES[p].splice(0); | ||
args.unshift(p); | ||
var probe = PROVIDER.addProbe.apply(PROVIDER, args); | ||
PROVIDER._fash_probes[p] = probe; | ||
}); | ||
var probe = PROVIDER.addProbe.apply(PROVIDER, args); | ||
PROVIDER._fash_probes[p] = probe; | ||
}); | ||
PROVIDER.enable(); | ||
} | ||
PROVIDER.enable(); | ||
} | ||
return (PROVIDER); | ||
return (PROVIDER); | ||
}(); |
@@ -17,2 +17,4 @@ { | ||
"consistent hashing", | ||
"consistent hash", | ||
"hash", | ||
"hashing", | ||
@@ -23,3 +25,3 @@ "distributed", | ||
], | ||
"version": "2.2.0", | ||
"version": "2.3.0", | ||
"repository": { | ||
@@ -42,4 +44,3 @@ "type": "git", | ||
"dashdash": "1.2.1", | ||
"dtrace-provider": "0.2.8", | ||
"levelup": "git://github.com/yunong/node-levelup.git#c8d0bcb", | ||
"levelup": "git://github.com/yunong/node-levelup.git#273cbda", | ||
"leveldown": "0.8.0", | ||
@@ -51,2 +52,5 @@ "once": "1.1.1", | ||
}, | ||
"optionalDependencies": { | ||
"dtrace-provider": "0.2.8" | ||
}, | ||
"devDependencies": { | ||
@@ -56,3 +60,4 @@ "node-uuid": "1.4.1", | ||
"bunyan": "*", | ||
"lodash": "1.3.1" | ||
"lodash": "1.3.1", | ||
"restify": "2.6.0" | ||
}, | ||
@@ -59,0 +64,0 @@ "bin": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Git dependency
Supply chain riskContains a dependency which resolves to a remote git URL. Dependencies fetched from git URLs are not immutable can be used to inject untrusted code or reduce the likelihood of a reproducible install.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 2 instances in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Git dependency
Supply chain riskContains a dependency which resolves to a remote git URL. Dependencies fetched from git URLs are not immutable can be used to inject untrusted code or reduce the likelihood of a reproducible install.
Found 1 instance in 1 package
4435761
4889
5
188
23
2