Comparing version 0.1.1 to 0.2.0
655
index.js
var net = require('net'), | ||
events = require('events'), | ||
stream = require('stream'), | ||
ResponseParser = require('./lib/responseParser').ResponseParser, | ||
responseTypes = ResponseParser.responseTypes | ||
util = require('util'), | ||
defaultPort = 8673, | ||
defaultHost = '127.0.0.1' | ||
events = require('events'), | ||
stream = require('stream'), | ||
ResponseParser = require('./lib/responseParser').ResponseParser, | ||
responseTypes = ResponseParser.responseTypes | ||
util = require('util'), | ||
defaultPort = 8673, | ||
defaultHost = '127.0.0.1' | ||
/** | ||
* A client for BloomD (https://github.com/armon/bloomd) | ||
* | ||
* | ||
* Requires Node 0.10's stream transformations. | ||
* | ||
* Opens a single stream and continually writes data to it, offloading the | ||
* Opens a single stream and continually writes data to it, offloading the | ||
* resulting data to a parser and then applying a queued callback to the response. | ||
* This relies on the fact that queries to bloomd are answered in the | ||
* order that they were made. | ||
* This relies on the fact that queries to bloomd are answered in the | ||
* order that they were made. | ||
* | ||
@@ -25,10 +25,10 @@ * The documentation consistently states that checks will return true if the key | ||
* More info here: http://en.wikipedia.org/wiki/Bloom_filter | ||
* | ||
* | ||
* TODO(jamie) | ||
* + Stream Write Buffering | ||
* + Command Buffering for quick calls after client creation. | ||
* + Partial block rememberance, to avoid rework. | ||
* + Retry and reconnect support | ||
* + Handle list with prefix after safe commands. | ||
* + Safe creation after dropping | ||
* + More Error checking | ||
* ? StreamNoDelay configuration | ||
* ? Rename setSafe command to set, and set to setUnchecked | ||
* | ||
@@ -43,152 +43,44 @@ * Options are: | ||
function BloomClient(stream, options) { | ||
this.stream = stream | ||
this.options = options | ||
this.responseParser = new ResponseParser(this) | ||
this.ready = false | ||
this.commandQueue = [] | ||
this.offlineQueue = [] | ||
this.stream = stream | ||
this.options = options | ||
this.responseParser = new ResponseParser(this) | ||
this.ready = false | ||
this.commandQueue = [] | ||
this.offlineQueue = [] | ||
this.commandsSent = 0 | ||
this.filterQueues = {} | ||
var self = this | ||
var self = this | ||
stream.on('connect', function() { | ||
self.onConnect() | ||
}) | ||
stream.on('error', function(message) { | ||
self.onError(message.message) | ||
}) | ||
stream.on('close', function() { | ||
self.connectionClosed('close') | ||
}) | ||
stream.on('connect', function() { | ||
self._onConnect() | ||
}) | ||
stream.on('end', function() { | ||
self.connectionClosed('end') | ||
}) | ||
stream.on('error', function(message) { | ||
self._onError(message.message) | ||
}) | ||
this.stream.pipe(this.responseParser) | ||
this.responseParser.on('readable', function() { | ||
self.onReadable() | ||
}) | ||
events.EventEmitter.call(this) | ||
} | ||
util.inherits(BloomClient, events.EventEmitter) | ||
stream.on('close', function() { | ||
self._connectionClosed('close') | ||
}) | ||
/** | ||
* Fires when the parser is able to send back a complete response from the server. | ||
* | ||
* Because operations are performed in the order they are received, we can safely | ||
* unshift a command off the queue and use it to match the response to the callback | ||
* that is waiting for it. | ||
*/ | ||
BloomClient.prototype.onReadable = function () { | ||
var response | ||
while (response = this.responseParser.read()) { | ||
var command = this.commandQueue.shift(), | ||
error = null, | ||
data = null | ||
try { | ||
switch (command.responseType) { | ||
case responseTypes.BOOL: | ||
data = ResponseParser.parseBool(response) | ||
break | ||
case responseTypes.BOOL_LIST: | ||
data = ResponseParser.parseBoolList(response, command.arguments.slice(2)) | ||
break | ||
case responseTypes.FILTER_LIST: | ||
data = ResponseParser.parseFilterList(response) | ||
break | ||
case responseTypes.CONFIRMATION: | ||
data = ResponseParser.parseConfirmation(response) | ||
break | ||
case responseTypes.INFO: | ||
// command.arguments[1] is the name of the filter, passed back for completeness. | ||
data = ResponseParser.parseInfo(response, command.arguments[1]) | ||
break | ||
default: | ||
throw new Error('Unknown response type: ' + command.responseType) | ||
break | ||
} | ||
} catch (err) { | ||
error = err | ||
} | ||
// Callbacks are optional. | ||
if (command.callback) { | ||
command.callback(error, data) | ||
} | ||
} | ||
} | ||
stream.on('end', function() { | ||
self._connectionClosed('end') | ||
}) | ||
/** | ||
* Fires when the underlying stream connects. | ||
* | ||
* TODO(jamie) Support queuing of commands before ready, and then flush them here. | ||
*/ | ||
BloomClient.prototype.onConnect = function () { | ||
if (this.options.debug) { | ||
console.log('Connected to ' + this.options.host + ':' + this.options.port) | ||
} | ||
stream.on('drain', function () { | ||
self._drain() | ||
}) | ||
this.ready = true | ||
this.emit('ready') | ||
} | ||
this.stream.pipe(this.responseParser) | ||
/** | ||
* Fires when there is an error on the underlying stream. | ||
*/ | ||
BloomClient.prototype.onError = function (msg) { | ||
var message = 'Connection failed to ' + this.options.host + ':' + this.options.port + ' (' + msg + ')' | ||
if (this.options.debug) { | ||
console.warn(message) | ||
} | ||
this.responseParser.on('readable', function() { | ||
self._onReadable() | ||
}) | ||
this.connected = false | ||
this.emit('error', new Error(message)) | ||
this.connectionClosed('error') | ||
} | ||
events.EventEmitter.call(this) | ||
/** | ||
* Fires when a connection is closed, either through error, or naturally. | ||
* | ||
* TODO(jamie) Support reconnects, and flushing of queue for natural closure. | ||
* | ||
* @param {string} reason | ||
*/ | ||
BloomClient.prototype.connectionClosed = function (reason) { | ||
if (this.options.debug) { | ||
console.warn('Connection closed (' + reason + ')') | ||
} | ||
} | ||
util.inherits(BloomClient, events.EventEmitter) | ||
/** | ||
* Sends a command to the server. | ||
* | ||
* @param {string} command | ||
* @param {Array} args | ||
* @param {string} responseType one of ResponseParser.responseTypes | ||
* @param {Function} callback | ||
*/ | ||
BloomClient.prototype.send = function(command, args, responseType, callback) { | ||
args = args || [] | ||
args.unshift(command) | ||
var line = args.join(' ') + '\n' | ||
this.commandQueue.push({ | ||
arguments: args, | ||
responseType: responseType, | ||
callback: callback | ||
}) | ||
this.stream.write(line) | ||
} | ||
// API | ||
@@ -202,5 +94,5 @@ | ||
BloomClient.prototype.isReady = function() { | ||
return this.ready | ||
return this.ready | ||
} | ||
/** | ||
@@ -213,3 +105,3 @@ * Closes the connection to BloomD | ||
// Commands | ||
// Standard Bloomd Commands | ||
@@ -219,11 +111,11 @@ /** | ||
* | ||
* A number of options are available. Stated defaults are for those of | ||
* A number of options are available. Stated defaults are for those of | ||
* a bloomd server with default configuration. Check your server for | ||
* specifics. | ||
* | ||
* prob [0.001] - The desired probability of false positives. | ||
* prob [0.0001] - The desired probability of false positives. | ||
* capacity [100000] - The required initial capacity of the filter. | ||
* in_memory [0] - Whether the filter should exist only in memory, with no disk backing. | ||
* | ||
* The data passed back to the callback will be true on success, null otherwise. | ||
* The data passed back to the callback will be true on success, null otherwise. | ||
* | ||
@@ -235,2 +127,3 @@ * @param {string} filterName | ||
BloomClient.prototype.create = function (filterName, options, callback) { | ||
var self = this | ||
var args = [filterName] | ||
@@ -241,3 +134,12 @@ options = options || {} | ||
} | ||
this.send('create', args, responseTypes.CONFIRMATION, callback) | ||
this._process('create', filterName, args, responseTypes.CONFIRMATION, function (error, data) { | ||
// First, run the callback. | ||
if (callback) { | ||
callback.call(callback, error, data) | ||
} | ||
// Then, clear the filter queue if we have one. | ||
self._clearFilterQueue(filterName) | ||
}) | ||
} | ||
@@ -249,3 +151,3 @@ | ||
* If no prefix is specified, all filters are returned. | ||
* | ||
* | ||
* The data passed back to the callback will be an array of BloomFilter objects. | ||
@@ -257,7 +159,6 @@ * | ||
BloomClient.prototype.list = function (prefix, callback) { | ||
var args = prefix ? [prefix] : [] | ||
this.send('list', args, responseTypes.FILTER_LIST, callback) | ||
var args = prefix ? [prefix] : [] | ||
this._process('list', null, args, responseTypes.FILTER_LIST, callback) | ||
} | ||
/** | ||
@@ -272,3 +173,3 @@ * Drops the specified filter. | ||
BloomClient.prototype.drop = function (filterName, callback) { | ||
this.send('drop', [filterName], responseTypes.CONFIRMATION, callback) | ||
this._process('drop', filterName, [filterName], responseTypes.DROP_CONFIRMATION, callback) | ||
} | ||
@@ -278,3 +179,3 @@ | ||
* Closes a filter. | ||
* | ||
* | ||
* @param {string} filterName | ||
@@ -284,3 +185,3 @@ * @param {Function} callback | ||
BloomClient.prototype.close = function (filterName, callback) { | ||
this.send('close', [filterName], responseTypes.CONFIRMATION, callback) | ||
this._process('close', filterName, [filterName], responseTypes.CONFIRMATION, callback) | ||
} | ||
@@ -290,3 +191,3 @@ | ||
* Clears a filter. | ||
* | ||
* | ||
* @param {string} filterName | ||
@@ -296,3 +197,3 @@ * @param {Function} callback | ||
BloomClient.prototype.clear = function (filterName, callback) { | ||
this.send('clear', [filterName], responseTypes.CONFIRMATION, callback) | ||
this._process('clear', filterName, [filterName], responseTypes.CONFIRMATION, callback) | ||
} | ||
@@ -311,3 +212,3 @@ | ||
BloomClient.prototype.check = function (filterName, key, callback) { | ||
this.send('check', [filterName, key], responseTypes.BOOL, callback) | ||
this._handle(this._buildCheckCommand(filterName, key, callback)) | ||
} | ||
@@ -326,4 +227,3 @@ | ||
BloomClient.prototype.multi = function (filterName, keys, callback) { | ||
keys.unshift(filterName) | ||
this.send('multi', keys, responseTypes.BOOL_LIST, callback) | ||
this._handle(this._buildMultiCommand(filterName, keys, callback)) | ||
} | ||
@@ -334,3 +234,3 @@ | ||
* | ||
* The data passed back to the callback will be true if the key was newly set, | ||
* The data passed back to the callback will be true if the key was newly set, | ||
* or false if it was already in the filter. The latter is not considered an error. | ||
@@ -343,3 +243,3 @@ * | ||
BloomClient.prototype.set = function (filterName, key, callback) { | ||
this.send('set', [filterName, key], responseTypes.BOOL, callback) | ||
this._handle(this._buildSetCommand(filterName, key, callback)) | ||
} | ||
@@ -359,4 +259,3 @@ | ||
BloomClient.prototype.bulk = function (filterName, keys, callback) { | ||
keys.unshift(filterName) | ||
this.send('bulk', keys, responseTypes.BOOL_LIST, callback) | ||
this._handle(this._buildBulkCommand(filterName, keys, callback)) | ||
} | ||
@@ -368,3 +267,3 @@ | ||
* The data passed back to the callback will be a single BloomFilter object. | ||
* | ||
* | ||
* @param {string} filterName | ||
@@ -374,3 +273,3 @@ * @param {Function} callback | ||
BloomClient.prototype.info = function (filterName, callback) { | ||
this.send('info', [filterName], responseTypes.INFO, callback) | ||
this._process('info', filterName, [filterName], responseTypes.INFO, callback) | ||
} | ||
@@ -388,6 +287,399 @@ | ||
BloomClient.prototype.flush = function (filterName, callback) { | ||
var args = filterName ? [filterName] : [] | ||
this.send('flush', args, responseTypes.CONFIRMATION, callback) | ||
var args = filterName ? [filterName] : [] | ||
this._process('flush', filterName, args, responseTypes.CONFIRMATION, callback) | ||
} | ||
// 'Safe' Commands | ||
BloomClient.prototype._buildCheckCommand = function (filterName, key, callback) { | ||
return this._buildCommand('check', filterName, [filterName, key], responseTypes.BOOL, callback) | ||
} | ||
BloomClient.prototype._buildMultiCommand = function (filterName, keys, callback) { | ||
var args = keys.slice(0) | ||
args.unshift(filterName) | ||
return this._buildCommand('multi', filterName, args, responseTypes.BOOL_LIST, callback) | ||
} | ||
BloomClient.prototype._buildSetCommand = function (filterName, key, callback) { | ||
return this._buildCommand('set', filterName, [filterName, key], responseTypes.BOOL, callback) | ||
} | ||
BloomClient.prototype._buildBulkCommand = function (filterName, keys, callback) { | ||
var args = keys.slice(0) | ||
args.unshift(filterName) | ||
return this._buildCommand('bulk', filterName, args, responseTypes.BOOL_LIST, callback) | ||
} | ||
/** | ||
* Safe versions of standard functions. | ||
* They appear on the prototype as setSafe, checkSafe, bulkSafe etc. | ||
* | ||
* @see _makeSafe() | ||
*/ | ||
var _safeCommands = ['set', 'check', 'bulk', 'multi'] | ||
for (var i = 0, l = _safeCommands.length; i < l; i++) { | ||
var commandName = _safeCommands[i] | ||
BloomClient.prototype[commandName + 'Safe'] = _makeSafe(commandName) | ||
} | ||
// Extended Commands | ||
/** | ||
* Alias for bulk, for ease of remembering. | ||
* | ||
* Bulk sets many items. | ||
* | ||
* @see BloomClient.prototype.bulk | ||
*/ | ||
BloomClient.prototype.bulkSet = BloomClient.prototype.bulk | ||
BloomClient.prototype.bulkSetSafe = BloomClient.prototype.bulkSafe | ||
/** | ||
* Alias for multi, for ease of remembering. | ||
* | ||
* Multi checks many items. | ||
* | ||
* @see BloomClient.prototype.multi | ||
*/ | ||
BloomClient.prototype.multiCheck = BloomClient.prototype.multi | ||
BloomClient.prototype.multiCheckSafe = BloomClient.prototype.multiSafe | ||
// Private Methods | ||
/** | ||
* Fires when the parser is able to send back a complete response from the server. | ||
* | ||
* Because operations are performed in the order they are received, we can safely | ||
* unshift a command off the queue and use it to match the response to the callback | ||
* that is waiting for it. | ||
*/ | ||
BloomClient.prototype._onReadable = function () { | ||
var response | ||
while (response = this.responseParser.read()) { | ||
var command = this.commandQueue.shift(), | ||
error = null, | ||
data = null | ||
if (this.options.debug) { | ||
_timer(command.started, 'Response received for: ' + command.filterName + ' ' + command.arguments[0]) | ||
} | ||
try { | ||
switch (command.responseType) { | ||
case responseTypes.BOOL: | ||
data = ResponseParser.parseBool(response) | ||
break | ||
case responseTypes.BOOL_LIST: | ||
data = ResponseParser.parseBoolList(response, command.arguments.slice(2)) | ||
break | ||
case responseTypes.FILTER_LIST: | ||
data = ResponseParser.parseFilterList(response) | ||
break | ||
case responseTypes.CONFIRMATION: | ||
data = ResponseParser.parseConfirmation(response) | ||
break | ||
case responseTypes.DROP_CONFIRMATION: | ||
data = ResponseParser.parseDropConfirmation(response) | ||
break | ||
case responseTypes.INFO: | ||
data = ResponseParser.parseInfo(response, command.filterName) | ||
break | ||
default: | ||
throw new Error('Unknown response type: ' + command.responseType) | ||
break | ||
} | ||
} catch (err) { | ||
error = command.error || err | ||
} | ||
// Callbacks are optional. | ||
if (command.callback) { | ||
command.callback(error, data) | ||
} | ||
} | ||
} | ||
/** | ||
* Fires when the underlying stream connects. | ||
* | ||
* TODO(jamie) Support queuing of commands before ready, and then flush them here. | ||
*/ | ||
BloomClient.prototype._onConnect = function () { | ||
if (this.options.debug) { | ||
console.log('Connected to ' + this.options.host + ':' + this.options.port) | ||
} | ||
this.emit('connected') | ||
this._drain() | ||
} | ||
/** | ||
* Fires when there is an error on the underlying stream. | ||
*/ | ||
BloomClient.prototype._onError = function (msg) { | ||
var message = 'Connection failed to ' + this.options.host + ':' + this.options.port + ' (' + msg + ')' | ||
if (this.options.debug) { | ||
console.warn(message) | ||
} | ||
this.connected = false | ||
this.emit('error', new Error(message)) | ||
this._connectionClosed('error') | ||
} | ||
/** | ||
* Fires when a connection is closed, either through error, or naturally. | ||
* | ||
* TODO(jamie) Support reconnects, and flushing of queue for natural closure. | ||
* | ||
* @param {string} reason | ||
*/ | ||
BloomClient.prototype._connectionClosed = function (reason) { | ||
if (this.options.debug) { | ||
console.warn('Connection closed (' + reason + ')') | ||
} | ||
} | ||
/** | ||
* Convenience function to build and handle a command. | ||
* | ||
* @param {string} commandName | ||
* @param {string} filterName | ||
* @param {Array} args | ||
* @param {string} responseType one of ResponseParser.responseTypes | ||
* @param {Function} callback | ||
*/ | ||
BloomClient.prototype._process = function (commandName, filterName, args, responseType, callback) { | ||
this._handle(this._buildCommand(commandName, filterName, args, responseType, callback)) | ||
} | ||
/** | ||
* Prepares a command from the supplied arguments. | ||
* | ||
* @param {string} commandName | ||
* @param {string} filterName | ||
* @param {Array} args | ||
* @param {string} responseType one of ResponseParser.responseTypes | ||
* @param {Function} callback | ||
*/ | ||
BloomClient.prototype._buildCommand = function (commandName, filterName, args, responseType, callback) { | ||
args = args || [] | ||
args.unshift(commandName) | ||
return { | ||
filterName: filterName, | ||
arguments: args, | ||
responseType: responseType, | ||
callback: callback | ||
} | ||
} | ||
/** | ||
* Prepares a command to be sent. If the stream is ready to receive a command, | ||
* sends it immediately, otherwise queues it up to be sent when the stream is ready. | ||
* | ||
* @param {string} commandName | ||
* @param {Array} args | ||
* @param {string} responseType one of ResponseParser.responseTypes | ||
* @param {Function} callback | ||
*/ | ||
BloomClient.prototype._handle = function (command, clearing) { | ||
var commandName = command.arguments[0] | ||
var filterName = command.filterName | ||
if (filterName && this.filterQueues[filterName] && ('create' !== commandName) && !clearing) { | ||
// There are other commands outstanding for this filter, so hold this one until they are processed. | ||
if (this.options.debug) { | ||
console.log("Holding command in filter sub-queue:", commandName, filterName) | ||
} | ||
this.filterQueues[filterName].push(command) | ||
return | ||
} | ||
if (this.ready) { | ||
if (this.options.debug) { | ||
console.log("Processing:", commandName) | ||
} | ||
this._send(command) | ||
} else { | ||
if (this.options.debug) { | ||
console.log("Buffering command:", commandName) | ||
} | ||
this.offlineQueue.push(command) | ||
} | ||
} | ||
/** | ||
* Attempts to send a command to bloomd. If the command was sent, pushes it | ||
* onto the command queue for processing when the response arrives. | ||
* | ||
* Returns a boolean indicating sent status. | ||
* | ||
* @param {Object} command | ||
* @return {boolean} | ||
*/ | ||
BloomClient.prototype._send = function (command) { | ||
var line = command.arguments.join(' ') + '\n' | ||
var processedEntirely = this.stream.write(line) | ||
if (this.options.debug) { | ||
console.log("Sent:", command.arguments[0]) | ||
command.started = process.hrtime() | ||
} | ||
this.commandsSent++ | ||
this.commandQueue.push(command) | ||
if (!processedEntirely) { | ||
if (this.options.debug) { | ||
console.log("Waiting after full buffer:", command.arguments[0]) | ||
} | ||
this.ready = false | ||
} | ||
return processedEntirely | ||
} | ||
/** | ||
* Processes the offline command queue. | ||
* | ||
* Marks the client as ready when there is nothing left in the queue. | ||
*/ | ||
BloomClient.prototype._drain = function () { | ||
while (this.offlineQueue.length) { | ||
var command = this.offlineQueue.shift() | ||
if (this.options.debug) { | ||
console.log("Sending buffered command:", command.arguments[0]) | ||
} | ||
if (!this._send(command)) { | ||
// Buffer was filled from this command. Wait some more. | ||
return | ||
} | ||
} | ||
this.ready = true | ||
this.emit('ready') | ||
} | ||
/** | ||
* Queues for processing all those commands which were held due to | ||
* a 'safe' method being invoked. | ||
* | ||
* @param {string} filterName | ||
*/ | ||
BloomClient.prototype._clearFilterQueue = function (filterName) { | ||
var filterQueue = this.filterQueues[filterName] | ||
if (!filterQueue) { | ||
return | ||
} | ||
if (this.options.debug) { | ||
console.log('Clearing filter queue:', filterName) | ||
} | ||
while (filterQueue.length) { | ||
this._handle(filterQueue.shift(), true) | ||
} | ||
delete this.filterQueues[filterName] | ||
} | ||
// Helper Functions | ||
/** | ||
* Returns a function which is a 'safe' version of the command with the supplied name. That is, if | ||
* the filter doesn't exist when the command is run, attempts to automatically create | ||
* the filter and then re-run the command, transparently to the client. | ||
* | ||
* If there is an error in the creation step, the callback will receive the filter creation | ||
* failure, not the original 'filter not found', to help track down why the creation | ||
* would be failing. | ||
* | ||
* @param {string} command | ||
* @return {Function} | ||
*/ | ||
function _makeSafe(commandName) { | ||
var commandBuilder = BloomClient.prototype['_build' + commandName[0].toUpperCase() + commandName.slice(1) + 'Command'] | ||
return function() { | ||
// This is a function like setSafe() | ||
var self = this | ||
var args = Array.prototype.slice.call(arguments, 0) | ||
var filterName = args[0] | ||
var createOptions = {} | ||
// Suppports optional createOptions as a final parameter. | ||
var callback | ||
if (args[args.length - 1] instanceof Function) { | ||
callback = args.pop() | ||
} else { | ||
createOptions = args.pop() | ||
callback = args.pop() | ||
} | ||
// Create a separate copy of these arguments, so they don't get munged by later commands | ||
// which modify them. | ||
var originalArgs = args.slice(0) | ||
originalArgs.push(callback) | ||
args.push(function (originalError, originalData) { | ||
// This is the callback which catches the response to the original command | ||
// (e.g. safe, check, bulk, multi etc.) | ||
if (originalError && ('Filter does not exist' === originalError.message)) { | ||
// Try to create the filter. The create method will clear the queue when it completes. | ||
self.create(filterName, createOptions, function (createError, createData) { | ||
// This is the callback which catches the response to the create command. | ||
// In it, we tell it to run the command which triggered this creation. | ||
var command = commandBuilder.apply(self, originalArgs) | ||
// If the creation fails, the triggering action will also fail. | ||
// Store the creation error so we can give useful feedback for why the triggering | ||
// action wasn't successful, despite it being 'safe'. | ||
if (createError) { | ||
command.error = createError | ||
} | ||
self._handle(command, true) | ||
}) | ||
} else { | ||
// The filter exists, so run the original callback. | ||
callback.call(callback, originalError, originalData) | ||
self._clearFilterQueue(filterName) | ||
} | ||
}) | ||
this._handle(commandBuilder.apply(self, args)) | ||
// Create a queue for this filter, so that subsequent commands to this filter are | ||
// buffered until it is created. | ||
if (!this.filterQueues[filterName]) { | ||
this.filterQueues[filterName] = [] | ||
} | ||
} | ||
} | ||
/** | ||
* Helper function to time performance in ms. | ||
* | ||
* @param {Array} since A previous call to process.hrtime() | ||
* @param {string} message an optional message | ||
* @return {number} | ||
*/ | ||
function _timer(since, message) { | ||
var interval = process.hrtime(since) | ||
var elapsed = (interval[0] * 1000) + (interval[1] / 1000000) | ||
message = message ? message + ': ' : '' | ||
console.log(message + elapsed.toFixed(3) + 'ms') | ||
return elapsed | ||
} | ||
// Exports | ||
@@ -406,5 +698,6 @@ | ||
exports.timer = _timer | ||
exports.print = function (error, data) { | ||
console.log(data) | ||
} | ||
var stream = require('stream'), | ||
util = require('util') | ||
util = require('util') | ||
@@ -7,16 +7,16 @@ /** | ||
*/ | ||
function BloomFilter() { | ||
this.capacity = null | ||
this.checks = null | ||
this.checkHits = null | ||
this.checkMisses = null | ||
this.name = null | ||
this.pageIns = null | ||
this.pageOuts = null | ||
this.probability = null | ||
this.sets = null | ||
this.setHits = null | ||
this.setMisses = null | ||
this.size = null | ||
this.storage = null | ||
function BloomFilter() { | ||
this.capacity = null | ||
this.checks = null | ||
this.checkHits = null | ||
this.checkMisses = null | ||
this.name = null | ||
this.pageIns = null | ||
this.pageOuts = null | ||
this.probability = null | ||
this.sets = null | ||
this.setHits = null | ||
this.setMisses = null | ||
this.size = null | ||
this.storage = null | ||
} | ||
@@ -32,6 +32,7 @@ | ||
function ResponseParser(client) { | ||
this.client = client | ||
this.lines = [] | ||
this.lineData = '' | ||
stream.Transform.call(this, {objectMode: true}) | ||
this.client = client | ||
this.lines = [] | ||
this.lineData = '' | ||
this.blockLines = 1 | ||
stream.Transform.call(this, {objectMode: true}) | ||
} | ||
@@ -44,11 +45,12 @@ util.inherits(ResponseParser, stream.Transform); | ||
ResponseParser.responseTypes = { | ||
BOOL: 'bool', | ||
BOOL_LIST: 'boolList', | ||
CONFIRMATION: 'confirmation', | ||
FILTER_LIST: 'filterList', | ||
INFO: 'info' | ||
BOOL: 'bool', | ||
BOOL_LIST: 'boolList', | ||
CONFIRMATION: 'confirmation', | ||
DROP_CONFIRMATION: 'dropConfirmation', | ||
FILTER_LIST: 'filterList', | ||
INFO: 'info' | ||
} | ||
/** | ||
* Given a chunk of data, appends onto previously received data, | ||
* Given a chunk of data, appends onto previously received data, | ||
* decomposes it into lines and parses those lines into either single | ||
@@ -63,37 +65,42 @@ * or multi-line responses that can be used to populate JS types. | ||
// Add the chunk to the line buffer | ||
this.lineData += chunk.toString() | ||
var lines = this.lineData.split(/\r\n|\r|\n/g); | ||
// Add the chunk to the line buffer | ||
this.lineData += chunk.toString() | ||
var lines = this.lineData.split(/\r\n|\r|\n/g); | ||
// If the chunk finishes on a newline, the final line | ||
// will be empty, otherwise it will be a partially completed | ||
// line. Either way, we don't want to process it. | ||
this.lineData = lines.pop() | ||
for (var i = 0, l = lines.length; i < l; i++) { | ||
this.lines.push(lines[i]) | ||
} | ||
// If the chunk finishes on a newline, the final line | ||
// will be empty, otherwise it will be a partially completed | ||
// line. Either way, we don't want to process it. | ||
this.lineData = lines.pop() | ||
parseLines: | ||
while (this.lines.length) { | ||
if ('START' === this.lines[0]) { | ||
for (var i = 1, l = this.lines.length; i < l; i++) { | ||
if ('END' === this.lines[i]) { | ||
// Got a full list. Push it and continue parsing. | ||
this.push(this.lines.splice(0, i + 1).slice(1, -1)) | ||
// Goto might be considered harmful, but this is a continue ;) | ||
continue parseLines | ||
} | ||
} | ||
// We had an incomplete list, so we have to wait until we get more data. | ||
break | ||
} else { | ||
this.push(this.lines.shift()) | ||
} | ||
} | ||
done() | ||
for (var i = 0, l = lines.length; i < l; i++) { | ||
this.lines.push(lines[i]) | ||
} | ||
parseLines: | ||
while (this.lines.length) { | ||
if ('START' === this.lines[0]) { | ||
for (var i = this.blockLines, l = this.lines.length; i < l; i++) { | ||
if ('END' === this.lines[i]) { | ||
// Got a full list. Push it and continue parsing. | ||
this.push(this.lines.splice(0, i + 1).slice(1, -1)) | ||
// Reset the block count for the next block. | ||
this.blockLines = 1 | ||
// Goto might be considered harmful, but this is a continue ;) | ||
continue parseLines | ||
} | ||
} | ||
// We had an incomplete list, so we have to wait until we get more data. | ||
// Remember which line we got to, so we don't have to start from 1 again. | ||
this.blockLines = i | ||
break | ||
} else { | ||
this.push(this.lines.shift()) | ||
} | ||
} | ||
done() | ||
} | ||
@@ -108,11 +115,11 @@ | ||
* @return {bool} | ||
*/ | ||
*/ | ||
ResponseParser.parseBool = function (data) { | ||
if ('Yes' === data) { | ||
return true | ||
} else if ('No' === data) { | ||
return false | ||
} else { | ||
throw new Error(data) | ||
} | ||
if ('Yes' === data) { | ||
return true | ||
} else if ('No' === data) { | ||
return false | ||
} else { | ||
throw new Error(data) | ||
} | ||
} | ||
@@ -127,17 +134,17 @@ | ||
* @return {bool} | ||
*/ | ||
*/ | ||
ResponseParser.parseBoolList = function (data, keys) { | ||
var values = data.split(' '), | ||
results = {} | ||
var values = data.split(' '), | ||
results = {} | ||
try { | ||
for (var i = 0, l = values.length; i < l; i++) { | ||
results[keys[i]] = ResponseParser.parseBool(values[i]) | ||
} | ||
} catch (err) { | ||
// If there was an error parsing a bool, make the entire line available for debugging. | ||
throw new Error(data) | ||
} | ||
return results | ||
try { | ||
for (var i = 0, l = values.length; i < l; i++) { | ||
results[keys[i]] = ResponseParser.parseBool(values[i]) | ||
} | ||
} catch (err) { | ||
// If there was an error parsing a bool, make the entire line available for debugging. | ||
throw new Error(data) | ||
} | ||
return results | ||
} | ||
@@ -150,12 +157,28 @@ | ||
* @return {bool} | ||
*/ | ||
*/ | ||
ResponseParser.parseConfirmation = function (data) { | ||
if ('Done' === data) { | ||
return true | ||
} else { | ||
throw new Error(data) | ||
} | ||
if ('Done' === data) { | ||
return true | ||
} else { | ||
throw new Error(data) | ||
} | ||
} | ||
/** | ||
* Parses a Done response from bloomd into a boolean, following a drop command. | ||
* | ||
* For drop commands, we don't care if the filter existed or not. | ||
* | ||
* @param {string} data | ||
* @return {bool} | ||
*/ | ||
ResponseParser.parseDropConfirmation = function (data) { | ||
if ('Done' === data || 'Filter does not exist' === data) { | ||
return true | ||
} else { | ||
throw new Error(data) | ||
} | ||
} | ||
/** | ||
* Parses a list of filter definitions into an array of BloomFilter objects. | ||
@@ -165,17 +188,17 @@ * | ||
* @return {Array} | ||
*/ | ||
*/ | ||
ResponseParser.parseFilterList = function (data) { | ||
if (!Array.isArray(data)) { | ||
throw new Error(data) | ||
} | ||
return data.map(function(item) { | ||
var definition = item.split(' ') | ||
var filter = new BloomFilter() | ||
filter.name = definition[0] | ||
filter.probability = definition[1] | ||
filter.storage = definition[2] | ||
filter.capacity = definition[3] | ||
filter.size = definition[4] | ||
return filter | ||
}) | ||
if (!Array.isArray(data)) { | ||
throw new Error(data) | ||
} | ||
return data.map(function(item) { | ||
var definition = item.split(' ') | ||
var filter = new BloomFilter() | ||
filter.name = definition[0] | ||
filter.probability = definition[1] | ||
filter.storage = definition[2] | ||
filter.capacity = definition[3] | ||
filter.size = definition[4] | ||
return filter | ||
}) | ||
} | ||
@@ -188,14 +211,14 @@ | ||
* @return {BloomFilter} | ||
*/ | ||
*/ | ||
ResponseParser.parseInfo = function (data, name) { | ||
if (!Array.isArray(data)) { | ||
throw new Error(data) | ||
} | ||
var filter = new BloomFilter() | ||
for (var i = 0, l = data.length; i < l; i++) { | ||
var definition = data[i].split(' ') | ||
filter[definition[0].replace(/_([a-z])/g, function (g) { return g[1].toUpperCase() })] = definition[1] | ||
} | ||
filter.name = name | ||
return filter | ||
if (!Array.isArray(data)) { | ||
throw new Error(data) | ||
} | ||
var filter = new BloomFilter() | ||
for (var i = 0, l = data.length; i < l; i++) { | ||
var definition = data[i].split(' ') | ||
filter[definition[0].replace(/_([a-z])/g, function (g) { return g[1].toUpperCase() })] = definition[1] | ||
} | ||
filter.name = name | ||
return filter | ||
} | ||
@@ -202,0 +225,0 @@ |
{ | ||
"name": "bloomd" | ||
, "description": "NodeJS Driver for BloomD" | ||
, "version": "0.1.1" | ||
, "version": "0.2.0" | ||
, "homepage": "https://github.com/obvious/node-bloomd" | ||
@@ -6,0 +6,0 @@ , "authors": [ |
105
README.md
@@ -10,4 +10,7 @@ node-bloomd | ||
* Complete support for all Bloomd's commands. | ||
* Fast performance: insertion of 250k items in around 500ms on a 2010 MBP. | ||
* Fast performance: insertion of 235k items in ~600ms on a 2010 MBP, over localhost. | ||
* Familiar interface, similar to node-redis | ||
* A number of useful extensions over and above bloomd's default behaviour: | ||
- [set|bulk|check|multi|info]Safe() commands to automatically create a filter if it doesn't exist when running a filter-specific command. | ||
- Squashing non-existent filter errors on drop. | ||
@@ -31,27 +34,86 @@ Install | ||
```js | ||
var bloomd = require('./index') | ||
client = bloomd.createClient() | ||
client.on('error', function (err) { | ||
console.log('Error:' + err) | ||
}) | ||
client.list(null, bloomd.print) | ||
client.create('newFilter', bloomd.print) | ||
client.info('newFilter', bloomd.print) | ||
client.check('newFilter', 'monkey', bloomd.print) | ||
client.set('newFilter', 'monkey', bloomd.print) | ||
client.check('newFilter', 'monkey', bloomd.print) | ||
client.bulk('newFilter', ['monkey', 'magic', 'muppet'], bloomd.print) | ||
client.multi('newFilter', ['monkey', 'magic', 'muppet'], bloomd.print) | ||
client.info('newFilter', bloomd.print) | ||
client.drop('newFilter', bloomd.print) | ||
client.dispose() | ||
var bloomd = require('./index'), | ||
client = bloomd.createClient() | ||
client.on('error', function (err) { | ||
console.log('Error:' + err) | ||
}) | ||
client.list(null, bloomd.print) | ||
client.create('newFilter', bloomd.print) | ||
client.info('newFilter', bloomd.print) | ||
client.check('newFilter', 'monkey', bloomd.print) | ||
client.set('newFilter', 'monkey', bloomd.print) | ||
client.check('newFilter', 'monkey', bloomd.print) | ||
client.bulk('newFilter', ['monkey', 'magic', 'muppet'], bloomd.print) | ||
client.multi('newFilter', ['monkey', 'magic', 'muppet'], bloomd.print) | ||
client.info('newFilter', bloomd.print) | ||
client.drop('newFilter', bloomd.print) | ||
client.dispose() | ||
``` | ||
Memorable Commands | ||
------------------ | ||
Pop quiz: Bulk and Multi - which is used for batch checking, and which is used for batch setting? I | ||
can never remember either. node-bloomd helps out by providing two methods to make it explicit: | ||
```multiCheck()``` and ```bulkSet()```. Use them. The maintainers of your code will thank you. | ||
'Safe' Commands | ||
--------------- | ||
Typically, when issuing a ```set```, ```check```, ```bulk```, or ```multi``` command, | ||
bloomd will respond with "Filter does not exist" if the filter has not been created. node-bloomd | ||
provides 'safe' versions of these commands which auto-create the filter in this situation. These | ||
are ```setSafe()```, ```checkSafe()```, ```bulkSafe()```, and ```multiSafe()```. | ||
The method signatures of these are the same as the non-safe equivalent, with the addition of an optional | ||
createOptions parameter, which can be used to control the configuration of the filter that might be created. | ||
There is overhead to co-ordinating all this (see below), so if you are sure that a filter exists, | ||
you should use the non-safe version of the command. | ||
Subsequent commands issued to the same filter are guaranteed to happen after both the creation command | ||
and the safe command that triggered the creation, even if the filter didn't previously exist. For example: | ||
```js | ||
var bloomd = require('./index'), | ||
client = bloomd.createClient() | ||
client.bulkSafe('nonExistent', ['a', 'b', 'c', 'd'], function(error, data) { | ||
console.log('First, we created and bulk set some values') | ||
}, { | ||
prob: 0.01, | ||
capacity: 50000 | ||
}) | ||
client.check('nonExistent', 'a', function (error, data) { | ||
console.log('This will run second, and will be true') | ||
}) | ||
``` | ||
In order to do this, when a safe command is issued, subsequent commands on the same filter are held | ||
until we have attempted to create the filter and process the original safe command. | ||
This requires the use of a per-filter sub-queue, which is then processed when both the create command | ||
and the originating command has completed. While not a huge overhead, it is certainly slower than just | ||
the non-safe version of the command. | ||
In order of speed, from fastest to slowest: | ||
* set(). | ||
* setSafe(), where the filter already exists. | ||
* setSafe() on a non-existent filter. | ||
Note that a safe command can still fail if the create method fails. Typically, this happens due to bad | ||
creation parameters, such as too low a capacity being chosen. To aid with debugging, in this instance, | ||
the error passed to the safe command's callback will be the reason that the filter creation failed, not | ||
the reason that the safe command failed (which would be, in all cases "Filter does not exist"). Any | ||
subsequent commands that were also queued will still fail with "Filter does not exist". | ||
Finally, 'safe' is a terrible designation, and I welcome suggestions for a better name. | ||
Still To Do | ||
----------- | ||
* Offline command Buffering for dropped connections and early requests. | ||
* Partial list caching, to avoid re-checking. | ||
* Retry and reconnect support. | ||
@@ -62,2 +124,3 @@ * More Error checking. | ||
* Better documentation. | ||
* Auto-retry of filter creation when failing due to the filter having recently been dropped. | ||
@@ -64,0 +127,0 @@ Contributions |
var bloom = require('../index'), | ||
fs = require('fs'), | ||
assert = require('assert') | ||
fs = require('fs'), | ||
assert = require('assert') | ||
/** | ||
* Helper function to time performance in ms. | ||
* | ||
* @param {Array} since A previous call to process.hrtime() | ||
* @param {string} message an optional message | ||
* @return {number} | ||
*/ | ||
function elapsedTime(since, message) { | ||
var interval = process.hrtime(since) | ||
var elapsed = (interval[0] * 1000) + (interval[1] / 1000000) | ||
message = message ? message + ': ' : '' | ||
console.log(message + elapsed.toFixed(3) + "ms") | ||
return elapsed | ||
} | ||
// Delay in ms that we should wait after doing a drop command before issuing | ||
// a create command to the same filter name, due to bloomd's limitations. | ||
// If tests are failing, try increasing this. | ||
var DROP_THEN_CREATE_DELAY_MS = 200 | ||
@@ -28,39 +18,248 @@ /** | ||
/** | ||
* Test insertion of 235k items, into a filter initially sized for 20k, forcing multiple resizes. | ||
/** | ||
* Tests that calling setSafe on a filter actually calls the original | ||
* callback if the filter already exists. | ||
*/ | ||
exports.setAndCreateTestFilterExists = function (test) { | ||
var filterName = 'set_and_create_already_exists' | ||
var bloomClient = bloom.createClient() | ||
var called = false | ||
// Create a filter. | ||
bloomClient.create(filterName, {}, function (error, data) { | ||
test.equals(data, true, 'Failed to create filter') | ||
}) | ||
bloomClient.setSafe(filterName, 'monkey', function(error, data) { | ||
test.equals(data, true) | ||
called = true | ||
}) | ||
bloomClient.check(filterName, 'monkey', function(error, data) { | ||
test.equals(data, true) | ||
}) | ||
bloomClient.drop(filterName, function() { | ||
bloomClient.dispose() | ||
test.equals(called, true, 'The original callback was not called') | ||
test.done() | ||
}) | ||
} | ||
/** | ||
* Tests the setting of a key on a filter that doesn't exist, that the | ||
* filter is automatically created, that the key is set, and that the | ||
* original callback is still called. | ||
*/ | ||
exports.setAndCreateTestFilterDoesNotExist = function (test) { | ||
var filterName = 'set_and_create_non_existent' | ||
var bloomClient = bloom.createClient() | ||
bloomClient.drop(filterName, function (error, data) { | ||
// This is a bit janky, as we have to drop the bloomClient to | ||
// ensure that the filter doesn't exist beforehand, | ||
// but bloomd has a period of time where you can't create immediately | ||
// after a drop where creation will fail, so we wait for a bit. | ||
// Non-deterministic, but probably ok. | ||
setTimeout(function() { | ||
bloomClient.setSafe(filterName, 'monkey', function(error, data) { | ||
test.equals(data, true) | ||
// The cleanup drop command also has to come in this callback, | ||
// otherwise it will be in the queue before the create and retry | ||
// commands that are generated by the non-existence of the filter. | ||
// This is why promises are good. | ||
bloomClient.drop(filterName, function() { | ||
// Drop, Set, Create, Set, Drop | ||
test.equals(5, bloomClient.commandsSent) | ||
bloomClient.dispose() | ||
test.done() | ||
}) | ||
}) | ||
}, DROP_THEN_CREATE_DELAY_MS) | ||
}) | ||
} | ||
/** | ||
* Tests the checking of a key after we call setSafe. When a filter doesn't exist, setSafe | ||
* automatically creates it, then sets the value. If a client issues a check command after | ||
* a setSafe command, it should return true, even if the filter didn't exist. | ||
*/ | ||
exports.checkAfterSetSafe = function (test) { | ||
var filterName = 'check_after_set_safe' | ||
var bloomClient = bloom.createClient() | ||
bloomClient.drop(filterName, function (error, data) { | ||
// Also janky. | ||
setTimeout(function() { | ||
bloomClient.setSafe(filterName, 'monkey', function(error, data) { | ||
test.equals(data, true) | ||
}) | ||
bloomClient.check(filterName, 'monkey', function(error, data) { | ||
test.equals(data, true, 'Check after safe set was not true') | ||
}) | ||
bloomClient.drop(filterName, function() { | ||
// Drop, Set, Create, Set, Check, Drop | ||
test.equals(6, bloomClient.commandsSent) | ||
bloomClient.dispose() | ||
test.done() | ||
}) | ||
}, DROP_THEN_CREATE_DELAY_MS) | ||
}) | ||
} | ||
/** | ||
* Tests interleaved consecutive safe and non-safe commands, to ensure they run in the specified order. | ||
*/ | ||
exports.interleavedSafeNonSafe = function (test) { | ||
var filterName = 'interleaved_safe_non_safe' | ||
var bloomClient = bloom.createClient() | ||
bloomClient.drop(filterName, function (error, data) { | ||
// Also janky. | ||
setTimeout(function() { | ||
bloomClient.multiSafe(filterName, ['monkey'], function(error, data) { | ||
test.deepEqual(data, { | ||
monkey: false | ||
}) | ||
}) | ||
bloomClient.bulk(filterName, ['monkey', 'magic', 'muppet'], function(error, data) { | ||
test.deepEqual(data, { | ||
monkey: true, | ||
magic: true, | ||
muppet: true | ||
}) | ||
}) | ||
bloomClient.multiSafe(filterName, ['magic', 'muppet', 'moonbeam'], function(error, data) { | ||
test.deepEqual(data, { | ||
magic: true, | ||
muppet: true, | ||
moonbeam: false | ||
}) | ||
}) | ||
bloomClient.bulkSafe(filterName, ['monkey', 'moonbeam'], function(error, data) { | ||
test.deepEqual(data, { | ||
monkey: false, | ||
moonbeam: true | ||
}) | ||
}) | ||
bloomClient.multi(filterName, ['monkey', 'magic', 'muppet', 'moonbeam'], function(error, data) { | ||
test.deepEqual(data, { | ||
monkey: true, | ||
magic: true, | ||
muppet: true, | ||
moonbeam: true | ||
}) | ||
}) | ||
bloomClient.drop(filterName, function() { | ||
// Drop, Multi, Create, Multi, Bulk, Multi, Bulk, Multi, Drop | ||
test.equals(9, bloomClient.commandsSent) | ||
bloomClient.dispose() | ||
test.done() | ||
}) | ||
}, DROP_THEN_CREATE_DELAY_MS) | ||
}) | ||
} | ||
/** | ||
* Tests the setting of a key on a filter that doesn't exist, in the situation | ||
* where the creation of the filter fails for some reason. | ||
* | ||
* We can simulate this by using a sufficiently low desired capacity. | ||
*/ | ||
exports.setAndCreateTestFilterCannotBeCreated = function (test) { | ||
var filterName = 'set_and_create_error_creating' | ||
var bloomClient = bloom.createClient() | ||
bloomClient.drop(filterName, function (error, data) { | ||
// Same as prior test, also janky. | ||
setTimeout(function() { | ||
bloomClient.setSafe(filterName, 'monkey', function(error, data) { | ||
test.equals(error.message, 'Client Error: Bad arguments') | ||
bloomClient.drop(filterName, function() { | ||
bloomClient.dispose() | ||
test.done() | ||
}) | ||
}, { | ||
// A low capacity will cause a creation failure due to bad arguments. | ||
capacity: 100 | ||
}) | ||
}, DROP_THEN_CREATE_DELAY_MS) | ||
}) | ||
} | ||
/** | ||
* Test insertion and subsequent retrieval of 235k items, into a filter initially | ||
* sized for 20k, forcing multiple resizes. We chain the multi on the callback of | ||
* the bulk in order to get accurate timings. | ||
*/ | ||
exports.bulkPerformance = function (test) { | ||
var filterName = 'bulk_performance' | ||
var bloomClient = bloom.createClient() | ||
var filterName = 'bulk_performance' | ||
var bloomClient = bloom.createClient() | ||
bloomClient.on('ready', function() { | ||
// Read in a dictionary. | ||
fs.readFile('./test/words.txt', 'utf8', function (error, data) { | ||
// Read in a dictionary. | ||
fs.readFile('./test/words.txt', 'utf8', function (error, data) { | ||
// Create a filter. | ||
bloomClient.create(filterName, { | ||
prob: 0.01, | ||
capacity: 20000 | ||
}) | ||
// Create a filter. | ||
bloomClient.create(filterName, { | ||
prob: 0.0001, | ||
capacity: 20000 | ||
}) | ||
// Insert lots of data. | ||
var lines = data.split('\n') | ||
var start = process.hrtime() | ||
bloomClient.bulk(filterName, lines, function (error, data) { | ||
var elapsed = elapsedTime(start, "Inserted " + lines.length + " items") | ||
// Totally arbitrary, but should be plenty of room on even | ||
// a moderate laptop with a single worker. | ||
test.ok(elapsed < 1000, "Bulk insert considered too slow") | ||
}) | ||
bloomClient.drop(filterName, function() { | ||
bloomClient.dispose() | ||
test.done() | ||
}) | ||
}); | ||
}) | ||
// The last line will be blank. | ||
var lines = data.split('\n') | ||
lines.pop() | ||
var bulkExpected = {} | ||
var multiExpected = {} | ||
for (var i = 0, l = lines.length; i < l; i++) { | ||
var line = lines[i] | ||
bulkExpected[line] = true | ||
multiExpected[line] = true | ||
} | ||
// There are a couple of collisions at this probability. | ||
bulkExpected['choledochotomy'] = false | ||
bulkExpected['ensnarer'] = false | ||
bulkExpected['renunciatory'] = false | ||
bulkExpected['unboundless'] = false | ||
// Insert lots of data. | ||
var bulkStart = process.hrtime() | ||
bloomClient.bulk(filterName, lines, function (error, data) { | ||
var elapsed = bloom.timer(bulkStart, 'Inserted ' + lines.length + ' items') | ||
// Totally arbitrary, but should be plenty of room on even a moderate laptop. | ||
test.ok(elapsed < 1000, 'Bulk set considered too slow') | ||
test.deepEqual(bulkExpected, data) | ||
var multiStart = process.hrtime() | ||
bloomClient.multi(filterName, lines, function (error, data) { | ||
var elapsed = bloom.timer(multiStart, 'Retrieved ' + lines.length + ' items') | ||
// Totally arbitrary, but should be plenty of room on even a moderate laptop. | ||
test.ok(elapsed < 1000, 'Multi check considered too slow') | ||
test.deepEqual(multiExpected, data) | ||
}) | ||
bloomClient.drop(filterName, function() { | ||
bloomClient.dispose() | ||
test.done() | ||
}) | ||
}) | ||
}) | ||
} | ||
/** | ||
/** | ||
* Test repeated calls to info, to force data buffering that will | ||
@@ -70,29 +269,27 @@ * result in incomplete lists, and give the stream transformation code a workout. | ||
exports.consecutiveInfo = function (test) { | ||
var filterName = 'consecutive_info' | ||
var bloomClient = bloom.createClient() | ||
var filterName = 'consecutive_info' | ||
var bloomClient = bloom.createClient() | ||
bloomClient.on('ready', function() { | ||
// Create a filter. | ||
bloomClient.create(filterName, { | ||
prob: 0.01, | ||
capacity: 20000 | ||
}) | ||
// Create a filter. | ||
bloomClient.create(filterName, { | ||
prob: 0.01, | ||
capacity: 20000 | ||
}) | ||
var iterations = 1000 | ||
var responseCount = 0 | ||
function callback(error, data) { | ||
test.equals(data.name, filterName, "Didn't get back the same thing we put it") | ||
responseCount++ | ||
} | ||
var iterations = 1000 | ||
var responseCount = 0 | ||
function callback(error, data) { | ||
test.equals(data.name, filterName, 'Did not get back the same thing we put it') | ||
responseCount++ | ||
} | ||
for (var i = 0; i < iterations; i++) { | ||
bloomClient.info(filterName, callback) | ||
} | ||
bloomClient.drop(filterName, function() { | ||
bloomClient.dispose() | ||
test.equals(iterations, responseCount, "Didn't iterate the correct number of times") | ||
test.done() | ||
}) | ||
}) | ||
for (var i = 0; i < iterations; i++) { | ||
bloomClient.info(filterName, callback) | ||
} | ||
bloomClient.drop(filterName, function() { | ||
bloomClient.dispose() | ||
test.equals(iterations, responseCount, 'Did not iterate the correct number of times') | ||
test.done() | ||
}) | ||
} | ||
@@ -106,67 +303,65 @@ | ||
exports.canonicalTest = function (test) { | ||
var filterName = 'canonical_test' | ||
var bloomClient = bloom.createClient() | ||
bloomClient.on('ready', function() { | ||
// Create a filter. | ||
bloomClient.list(null, function(error, data) { | ||
test.equals(data.length, 0, "We had a list, somehow.") | ||
}) | ||
bloomClient.create(filterName, {}, function (error, data) { | ||
test.equals(data, true, "Failed to create filter") | ||
}) | ||
var filterName = 'canonical_test' | ||
var bloomClient = bloom.createClient() | ||
bloomClient.check(filterName, 'zipzab', function (error, data) { | ||
test.deepEqual(data, false, "zipzab should not exist") | ||
}) | ||
// Create a filter. | ||
bloomClient.list(null, function(error, data) { | ||
test.equals(data.length, 0, 'We had a list, somehow.') | ||
}) | ||
bloomClient.set(filterName, 'zipzab', function (error, data) { | ||
test.equals(data, true, "zipzab should have been created") | ||
}) | ||
bloomClient.check(filterName, 'zipzab', function (error, data) { | ||
test.equals(data, true, "zipzab should now exist") | ||
}) | ||
bloomClient.create(filterName, {}, function (error, data) { | ||
test.equals(data, true, 'Failed to create filter') | ||
}) | ||
bloomClient.multi(filterName, ['zipzab', 'blah', 'boo'], function (error, data) { | ||
test.deepEqual(data, { | ||
zipzab: true, | ||
blah: false, | ||
boo: false | ||
}) | ||
}) | ||
bloomClient.check(filterName, 'zipzab', function (error, data) { | ||
test.deepEqual(data, false, 'zipzab should not exist') | ||
}) | ||
bloomClient.bulk(filterName, ['zipzab', 'blah', 'boo'], function (error, data) { | ||
test.deepEqual(data, { | ||
zipzab: false, | ||
blah: true, | ||
boo: true | ||
}) | ||
}) | ||
bloomClient.set(filterName, 'zipzab', function (error, data) { | ||
test.equals(data, true, 'zipzab should have been created') | ||
}) | ||
bloomClient.multi(filterName, ['zipzab', 'blah', 'boo'], function (error, data) { | ||
test.deepEqual(data, { | ||
zipzab: true, | ||
blah: true, | ||
boo: true | ||
}) | ||
}) | ||
bloomClient.check(filterName, 'zipzab', function (error, data) { | ||
test.equals(data, true, 'zipzab should now exist') | ||
}) | ||
bloomClient.list(null, function(error, data) { | ||
test.equals(data.length, 1, "We had a list, somehow.") | ||
test.equals(data[0].name, filterName) | ||
}) | ||
bloomClient.drop(filterName, function (error, data) { | ||
test.equals(data, true, "Failed to drop filter") | ||
}) | ||
bloomClient.list(null, function(error, data) { | ||
bloomClient.dispose() | ||
test.equals(data.length, 0, "We had a list, somehow.") | ||
test.done() | ||
}) | ||
}) | ||
bloomClient.multi(filterName, ['zipzab', 'blah', 'boo'], function (error, data) { | ||
test.deepEqual(data, { | ||
zipzab: true, | ||
blah: false, | ||
boo: false | ||
}) | ||
}) | ||
} | ||
bloomClient.bulk(filterName, ['zipzab', 'blah', 'boo'], function (error, data) { | ||
test.deepEqual(data, { | ||
zipzab: false, | ||
blah: true, | ||
boo: true | ||
}) | ||
}) | ||
bloomClient.multi(filterName, ['zipzab', 'blah', 'boo'], function (error, data) { | ||
test.deepEqual(data, { | ||
zipzab: true, | ||
blah: true, | ||
boo: true | ||
}) | ||
}) | ||
bloomClient.list(null, function(error, data) { | ||
test.equals(data.length, 1, 'We had a list, somehow.') | ||
test.equals(data[0].name, filterName) | ||
}) | ||
bloomClient.drop(filterName, function (error, data) { | ||
test.equals(data, true, 'Failed to drop filter') | ||
}) | ||
bloomClient.list(null, function(error, data) { | ||
bloomClient.dispose() | ||
test.equals(data.length, 0, 'We had a list, somehow.') | ||
test.done() | ||
}) | ||
} |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
2545142
1094
146