Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bloomd

Package Overview
Dependencies
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bloomd - npm Package Compare versions

Comparing version 0.1.1 to 0.2.0

655

index.js
var net = require('net'),
events = require('events'),
stream = require('stream'),
ResponseParser = require('./lib/responseParser').ResponseParser,
responseTypes = ResponseParser.responseTypes
util = require('util'),
defaultPort = 8673,
defaultHost = '127.0.0.1'
events = require('events'),
stream = require('stream'),
ResponseParser = require('./lib/responseParser').ResponseParser,
responseTypes = ResponseParser.responseTypes
util = require('util'),
defaultPort = 8673,
defaultHost = '127.0.0.1'
/**
* A client for BloomD (https://github.com/armon/bloomd)
*
*
* Requires Node 0.10's stream transformations.
*
* Opens a single stream and continually writes data to it, offloading the
* Opens a single stream and continually writes data to it, offloading the
* resulting data to a parser and then applying a queued callback to the response.
* This relies on the fact that queries to bloomd are answered in the
* order that they were made.
* This relies on the fact that queries to bloomd are answered in the
* order that they were made.
*

@@ -25,10 +25,10 @@ * The documentation consistently states that checks will return true if the key

* More info here: http://en.wikipedia.org/wiki/Bloom_filter
*
*
* TODO(jamie)
* + Stream Write Buffering
* + Command Buffering for quick calls after client creation.
* + Partial block remembrance, to avoid rework.
* + Retry and reconnect support
* + Handle list with prefix after safe commands.
* + Safe creation after dropping
* + More Error checking
* ? StreamNoDelay configuration
* ? Rename setSafe command to set, and set to setUnchecked
*

@@ -43,152 +43,44 @@ * Options are:

function BloomClient(stream, options) {
this.stream = stream
this.options = options
this.responseParser = new ResponseParser(this)
this.ready = false
this.commandQueue = []
this.offlineQueue = []
this.stream = stream
this.options = options
this.responseParser = new ResponseParser(this)
this.ready = false
this.commandQueue = []
this.offlineQueue = []
this.commandsSent = 0
this.filterQueues = {}
var self = this
var self = this
stream.on('connect', function() {
self.onConnect()
})
stream.on('error', function(message) {
self.onError(message.message)
})
stream.on('close', function() {
self.connectionClosed('close')
})
stream.on('connect', function() {
self._onConnect()
})
stream.on('end', function() {
self.connectionClosed('end')
})
stream.on('error', function(message) {
self._onError(message.message)
})
this.stream.pipe(this.responseParser)
this.responseParser.on('readable', function() {
self.onReadable()
})
events.EventEmitter.call(this)
}
util.inherits(BloomClient, events.EventEmitter)
stream.on('close', function() {
self._connectionClosed('close')
})
/**
* Fires when the parser is able to send back a complete response from the server.
*
* Because operations are performed in the order they are received, we can safely
* unshift a command off the queue and use it to match the response to the callback
* that is waiting for it.
*/
BloomClient.prototype.onReadable = function () {
var response
while (response = this.responseParser.read()) {
var command = this.commandQueue.shift(),
error = null,
data = null
try {
switch (command.responseType) {
case responseTypes.BOOL:
data = ResponseParser.parseBool(response)
break
case responseTypes.BOOL_LIST:
data = ResponseParser.parseBoolList(response, command.arguments.slice(2))
break
case responseTypes.FILTER_LIST:
data = ResponseParser.parseFilterList(response)
break
case responseTypes.CONFIRMATION:
data = ResponseParser.parseConfirmation(response)
break
case responseTypes.INFO:
// command.arguments[1] is the name of the filter, passed back for completeness.
data = ResponseParser.parseInfo(response, command.arguments[1])
break
default:
throw new Error('Unknown response type: ' + command.responseType)
break
}
} catch (err) {
error = err
}
// Callbacks are optional.
if (command.callback) {
command.callback(error, data)
}
}
}
stream.on('end', function() {
self._connectionClosed('end')
})
/**
* Fires when the underlying stream connects.
*
* TODO(jamie) Support queuing of commands before ready, and then flush them here.
*/
BloomClient.prototype.onConnect = function () {
if (this.options.debug) {
console.log('Connected to ' + this.options.host + ':' + this.options.port)
}
stream.on('drain', function () {
self._drain()
})
this.ready = true
this.emit('ready')
}
this.stream.pipe(this.responseParser)
/**
* Fires when there is an error on the underlying stream.
*/
BloomClient.prototype.onError = function (msg) {
var message = 'Connection failed to ' + this.options.host + ':' + this.options.port + ' (' + msg + ')'
if (this.options.debug) {
console.warn(message)
}
this.responseParser.on('readable', function() {
self._onReadable()
})
this.connected = false
this.emit('error', new Error(message))
this.connectionClosed('error')
}
events.EventEmitter.call(this)
/**
* Fires when a connection is closed, either through error, or naturally.
*
* TODO(jamie) Support reconnects, and flushing of queue for natural closure.
*
* @param {string} reason
*/
BloomClient.prototype.connectionClosed = function (reason) {
if (this.options.debug) {
console.warn('Connection closed (' + reason + ')')
}
}
util.inherits(BloomClient, events.EventEmitter)
/**
* Sends a command to the server.
*
* @param {string} command
* @param {Array} args
* @param {string} responseType one of ResponseParser.responseTypes
* @param {Function} callback
*/
BloomClient.prototype.send = function(command, args, responseType, callback) {
args = args || []
args.unshift(command)
var line = args.join(' ') + '\n'
this.commandQueue.push({
arguments: args,
responseType: responseType,
callback: callback
})
this.stream.write(line)
}
// API

@@ -202,5 +94,5 @@

BloomClient.prototype.isReady = function() {
return this.ready
return this.ready
}
/**

@@ -213,3 +105,3 @@ * Closes the connection to BloomD

// Commands
// Standard Bloomd Commands

@@ -219,11 +111,11 @@ /**

*
* A number of options are available. Stated defaults are for those of
* A number of options are available. Stated defaults are for those of
* a bloomd server with default configuration. Check your server for
* specifics.
*
* prob [0.001] - The desired probability of false positives.
* prob [0.0001] - The desired probability of false positives.
* capacity [100000] - The required initial capacity of the filter.
* in_memory [0] - Whether the filter should exist only in memory, with no disk backing.
*
* The data passed back to the callback will be true on success, null otherwise.
* The data passed back to the callback will be true on success, null otherwise.
*

@@ -235,2 +127,3 @@ * @param {string} filterName

BloomClient.prototype.create = function (filterName, options, callback) {
var self = this
var args = [filterName]

@@ -241,3 +134,12 @@ options = options || {}

}
this.send('create', args, responseTypes.CONFIRMATION, callback)
this._process('create', filterName, args, responseTypes.CONFIRMATION, function (error, data) {
// First, run the callback.
if (callback) {
callback.call(callback, error, data)
}
// Then, clear the filter queue if we have one.
self._clearFilterQueue(filterName)
})
}

@@ -249,3 +151,3 @@

* If no prefix is specified, all filters are returned.
*
*
* The data passed back to the callback will be an array of BloomFilter objects.

@@ -257,7 +159,6 @@ *

BloomClient.prototype.list = function (prefix, callback) {
var args = prefix ? [prefix] : []
this.send('list', args, responseTypes.FILTER_LIST, callback)
var args = prefix ? [prefix] : []
this._process('list', null, args, responseTypes.FILTER_LIST, callback)
}
/**

@@ -272,3 +173,3 @@ * Drops the specified filter.

BloomClient.prototype.drop = function (filterName, callback) {
this.send('drop', [filterName], responseTypes.CONFIRMATION, callback)
this._process('drop', filterName, [filterName], responseTypes.DROP_CONFIRMATION, callback)
}

@@ -278,3 +179,3 @@

* Closes a filter.
*
*
* @param {string} filterName

@@ -284,3 +185,3 @@ * @param {Function} callback

BloomClient.prototype.close = function (filterName, callback) {
this.send('close', [filterName], responseTypes.CONFIRMATION, callback)
this._process('close', filterName, [filterName], responseTypes.CONFIRMATION, callback)
}

@@ -290,3 +191,3 @@

* Clears a filter.
*
*
* @param {string} filterName

@@ -296,3 +197,3 @@ * @param {Function} callback

BloomClient.prototype.clear = function (filterName, callback) {
this.send('clear', [filterName], responseTypes.CONFIRMATION, callback)
this._process('clear', filterName, [filterName], responseTypes.CONFIRMATION, callback)
}

@@ -311,3 +212,3 @@

BloomClient.prototype.check = function (filterName, key, callback) {
this.send('check', [filterName, key], responseTypes.BOOL, callback)
this._handle(this._buildCheckCommand(filterName, key, callback))
}

@@ -326,4 +227,3 @@

BloomClient.prototype.multi = function (filterName, keys, callback) {
keys.unshift(filterName)
this.send('multi', keys, responseTypes.BOOL_LIST, callback)
this._handle(this._buildMultiCommand(filterName, keys, callback))
}

@@ -334,3 +234,3 @@

*
* The data passed back to the callback will be true if the key was newly set,
* The data passed back to the callback will be true if the key was newly set,
* or false if it was already in the filter. The latter is not considered an error.

@@ -343,3 +243,3 @@ *

BloomClient.prototype.set = function (filterName, key, callback) {
this.send('set', [filterName, key], responseTypes.BOOL, callback)
this._handle(this._buildSetCommand(filterName, key, callback))
}

@@ -359,4 +259,3 @@

BloomClient.prototype.bulk = function (filterName, keys, callback) {
keys.unshift(filterName)
this.send('bulk', keys, responseTypes.BOOL_LIST, callback)
this._handle(this._buildBulkCommand(filterName, keys, callback))
}

@@ -368,3 +267,3 @@

* The data passed back to the callback will be a single BloomFilter object.
*
*
* @param {string} filterName

@@ -374,3 +273,3 @@ * @param {Function} callback

BloomClient.prototype.info = function (filterName, callback) {
this.send('info', [filterName], responseTypes.INFO, callback)
this._process('info', filterName, [filterName], responseTypes.INFO, callback)
}

@@ -388,6 +287,399 @@

BloomClient.prototype.flush = function (filterName, callback) {
var args = filterName ? [filterName] : []
this.send('flush', args, responseTypes.CONFIRMATION, callback)
var args = filterName ? [filterName] : []
this._process('flush', filterName, args, responseTypes.CONFIRMATION, callback)
}
// 'Safe' Commands
/**
 * Builds, without sending, the command object for a 'check' of a single key.
 * The response is expected to be a single boolean (responseTypes.BOOL).
 *
 * @param {string} filterName
 * @param {string} key
 * @param {Function} callback
 * @return {Object} the command produced by _buildCommand
 */
BloomClient.prototype._buildCheckCommand = function (filterName, key, callback) {
  var commandArgs = [filterName, key]
  return this._buildCommand('check', filterName, commandArgs, responseTypes.BOOL, callback)
}
/**
 * Builds, without sending, the command object for a 'multi' (batch check).
 * The response is expected to be a list of booleans (responseTypes.BOOL_LIST).
 *
 * The keys are copied into a new array headed by the filter name, so the
 * caller's array is never modified.
 *
 * @param {string} filterName
 * @param {Array} keys
 * @param {Function} callback
 * @return {Object} the command produced by _buildCommand
 */
BloomClient.prototype._buildMultiCommand = function (filterName, keys, callback) {
  var commandArgs = [filterName].concat(keys)
  return this._buildCommand('multi', filterName, commandArgs, responseTypes.BOOL_LIST, callback)
}
/**
 * Builds, without sending, the command object for a 'set' of a single key.
 * The response is expected to be a single boolean (responseTypes.BOOL).
 *
 * @param {string} filterName
 * @param {string} key
 * @param {Function} callback
 * @return {Object} the command produced by _buildCommand
 */
BloomClient.prototype._buildSetCommand = function (filterName, key, callback) {
  var commandArgs = [filterName, key]
  return this._buildCommand('set', filterName, commandArgs, responseTypes.BOOL, callback)
}
/**
 * Builds, without sending, the command object for a 'bulk' (batch set).
 * The response is expected to be a list of booleans (responseTypes.BOOL_LIST).
 *
 * The keys are copied into a new array headed by the filter name, so the
 * caller's array is never modified.
 *
 * @param {string} filterName
 * @param {Array} keys
 * @param {Function} callback
 * @return {Object} the command produced by _buildCommand
 */
BloomClient.prototype._buildBulkCommand = function (filterName, keys, callback) {
  var commandArgs = [filterName].concat(keys)
  return this._buildCommand('bulk', filterName, commandArgs, responseTypes.BOOL_LIST, callback)
}
/**
 * Safe versions of standard functions.
 *
 * For every name listed here a '<name>Safe' method (setSafe, checkSafe,
 * bulkSafe, multiSafe) is installed on the prototype, wrapping the matching
 * command builder.
 *
 * @see _makeSafe()
 */
var _safeCommands = ['set', 'check', 'bulk', 'multi']

_safeCommands.forEach(function (safeName) {
  BloomClient.prototype[safeName + 'Safe'] = _makeSafe(safeName)
})
// Extended Commands
/**
 * Aliases for bulk and bulkSafe, for ease of remembering.
 *
 * Bulk sets many items. bulkSet refers to the very same function as bulk, and
 * bulkSetSafe to the very same function as bulkSafe; no wrapper is created.
 *
 * bulkSafe is one of the generated '<name>Safe' methods, so these assignments
 * must run after that method has been installed on the prototype.
 *
 * @see BloomClient.prototype.bulk
 */
BloomClient.prototype.bulkSet = BloomClient.prototype.bulk
BloomClient.prototype.bulkSetSafe = BloomClient.prototype.bulkSafe
/**
 * Aliases for multi and multiSafe, for ease of remembering.
 *
 * Multi checks many items. multiCheck refers to the very same function as
 * multi, and multiCheckSafe to the very same function as multiSafe.
 *
 * multiSafe is one of the generated '<name>Safe' methods, so these assignments
 * must run after that method has been installed on the prototype.
 *
 * @see BloomClient.prototype.multi
 */
BloomClient.prototype.multiCheck = BloomClient.prototype.multi
BloomClient.prototype.multiCheckSafe = BloomClient.prototype.multiSafe
// Private Methods
/**
 * Fires when the parser is able to send back a complete response from the server.
 *
 * Because operations are performed in the order they are received, we can safely
 * shift a command off the front of the queue and use it to match the response to
 * the callback that is waiting for it.
 *
 * Every response read from the parser consumes exactly one queued command. The
 * response is parsed according to the command's responseType; if parsing throws,
 * the command's callback receives that error (or the error stored on the command
 * as `command.error`, when there is one) and null data.
 *
 * A response that arrives while no command is queued is reported through an
 * 'error' event on the client.
 */
BloomClient.prototype._onReadable = function () {
  var response

  // read() returns null once the parser has nothing more buffered. Compare against
  // null explicitly: a falsy response (such as an empty line) has already been
  // removed from the parser, so it must still consume its command, otherwise every
  // later response would be paired with the wrong callback.
  while ((response = this.responseParser.read()) !== null) {
    var command = this.commandQueue.shift(),
        error = null,
        data = null

    if (!command) {
      // Nothing is waiting for this response. Report it rather than failing with a
      // TypeError while dereferencing the missing command.
      this.emit('error', new Error('Unexpected response from server: ' + response))
      continue
    }

    if (this.options.debug) {
      _timer(command.started, 'Response received for: ' + command.filterName + ' ' + command.arguments[0])
    }

    try {
      switch (command.responseType) {
        case responseTypes.BOOL:
          data = ResponseParser.parseBool(response)
          break
        case responseTypes.BOOL_LIST:
          // arguments is [commandName, filterName, key1, key2, ...]; the keys start at index 2.
          data = ResponseParser.parseBoolList(response, command.arguments.slice(2))
          break
        case responseTypes.FILTER_LIST:
          data = ResponseParser.parseFilterList(response)
          break
        case responseTypes.CONFIRMATION:
          data = ResponseParser.parseConfirmation(response)
          break
        case responseTypes.DROP_CONFIRMATION:
          data = ResponseParser.parseDropConfirmation(response)
          break
        case responseTypes.INFO:
          // The filter name is passed through so it can be set on the result.
          data = ResponseParser.parseInfo(response, command.filterName)
          break
        default:
          throw new Error('Unknown response type: ' + command.responseType)
      }
    } catch (err) {
      // A stored error (e.g. from a failed filter creation) takes precedence over
      // the parse error, because it explains why the command could not succeed.
      error = command.error || err
    }

    // Callbacks are optional.
    if (command.callback) {
      command.callback(error, data)
    }
  }
}
/**
 * Fires when the underlying stream connects.
 *
 * Announces the connection with a 'connected' event and then hands over to
 * _drain().
 */
BloomClient.prototype._onConnect = function () {
  var options = this.options

  if (options.debug) {
    console.log('Connected to ' + options.host + ':' + options.port)
  }

  this.emit('connected')
  this._drain()
}
/**
 * Fires when there is an error on the underlying stream.
 *
 * Wraps the stream's message in a description of the connection target, emits
 * it as an 'error' event and then runs the connection-closed handling with the
 * reason 'error'.
 *
 * @param {string} msg the message of the stream error
 */
BloomClient.prototype._onError = function (msg) {
  var target = this.options.host + ':' + this.options.port
  var description = 'Connection failed to ' + target + ' (' + msg + ')'

  if (this.options.debug) {
    console.warn(description)
  }

  this.connected = false
  this.emit('error', new Error(description))
  this._connectionClosed('error')
}
/**
 * Fires when a connection is closed, either through error, or naturally.
 *
 * Currently this only logs the reason, and only when the debug option is set.
 *
 * TODO(jamie) Support reconnects, and flushing of queue for natural closure.
 *
 * @param {string} reason
 */
BloomClient.prototype._connectionClosed = function (reason) {
  if (!this.options.debug) {
    return
  }

  console.warn('Connection closed (' + reason + ')')
}
/**
 * Convenience function to build a command and hand it straight to _handle().
 *
 * @param {string} commandName
 * @param {string} filterName
 * @param {Array} args
 * @param {string} responseType one of ResponseParser.responseTypes
 * @param {Function} callback
 */
BloomClient.prototype._process = function (commandName, filterName, args, responseType, callback) {
  var command = this._buildCommand(commandName, filterName, args, responseType, callback)
  this._handle(command)
}
/**
 * Prepares a command from the supplied arguments.
 *
 * The returned object's `arguments` array is the command name followed by
 * `args`. It is always a new array: `args` itself is never modified, so callers
 * can safely pass an array they still need.
 *
 * @param {string} commandName
 * @param {string} filterName
 * @param {Array} args optional; treated as empty when omitted
 * @param {string} responseType one of ResponseParser.responseTypes
 * @param {Function} callback
 * @return {Object} with filterName, arguments, responseType and callback properties
 */
BloomClient.prototype._buildCommand = function (commandName, filterName, args, responseType, callback) {
  return {
    filterName: filterName,
    // concat builds a fresh array instead of unshifting onto the caller's one.
    arguments: [commandName].concat(args || []),
    responseType: responseType,
    callback: callback
  }
}
/**
 * Routes a prepared command to one of three places:
 *
 * - the filter's sub-queue, when one exists for the command's filter, the
 *   command is not a 'create', and the sub-queue is not currently being cleared;
 * - the stream (via _send), when the client is ready;
 * - the offline queue otherwise, to be sent when the stream is ready.
 *
 * @param {Object} command a command as returned by _buildCommand
 * @param {boolean} clearing true when the command comes out of a filter
 *     sub-queue (or must otherwise bypass it), so it is not held again
 */
BloomClient.prototype._handle = function (command, clearing) {
  var commandName = command.arguments[0]
  var filterName = command.filterName
  var debug = this.options.debug

  // Only an own property counts as a sub-queue. A plain lookup would also find
  // inherited Object.prototype members for filters named e.g. 'toString' or
  // 'constructor', and pushing onto those throws.
  var hasQueue = Boolean(filterName) &&
      Object.prototype.hasOwnProperty.call(this.filterQueues, filterName) &&
      Boolean(this.filterQueues[filterName])

  if (hasQueue && ('create' !== commandName) && !clearing) {
    // There are other commands outstanding for this filter, so hold this one until they are processed.
    if (debug) {
      console.log("Holding command in filter sub-queue:", commandName, filterName)
    }
    this.filterQueues[filterName].push(command)
    return
  }

  if (this.ready) {
    if (debug) {
      console.log("Processing:", commandName)
    }
    this._send(command)
  } else {
    if (debug) {
      console.log("Buffering command:", commandName)
    }
    this.offlineQueue.push(command)
  }
}
/**
 * Writes a command to the stream and records it on the command queue, where
 * it waits to be matched with its response.
 *
 * The return value is whatever the stream's write() returned. When that value
 * is falsy the client is marked as not ready, so later commands are buffered
 * instead of written.
 *
 * @param {Object} command
 * @return {boolean}
 */
BloomClient.prototype._send = function (command) {
  var commandName = command.arguments[0]
  var wire = command.arguments.join(' ') + '\n'
  var flushed = this.stream.write(wire)

  if (this.options.debug) {
    console.log("Sent:", commandName)
    // Start time used to report how long the response took.
    command.started = process.hrtime()
  }

  this.commandsSent++
  this.commandQueue.push(command)

  if (flushed) {
    return flushed
  }

  if (this.options.debug) {
    console.log("Waiting after full buffer:", commandName)
  }
  this.ready = false
  return flushed
}
/**
 * Processes the offline command queue.
 *
 * Commands are sent in the order they were buffered. If sending one of them
 * reports a falsy result, processing stops there and the remaining commands
 * stay queued. Once the queue is empty the client is marked as ready and a
 * 'ready' event is emitted.
 */
BloomClient.prototype._drain = function () {
  var debug = this.options.debug

  while (this.offlineQueue.length > 0) {
    var next = this.offlineQueue.shift()

    if (debug) {
      console.log("Sending buffered command:", next.arguments[0])
    }

    var flushed = this._send(next)
    if (!flushed) {
      // Buffer was filled from this command. Wait some more.
      return
    }
  }

  this.ready = true
  this.emit('ready')
}
/**
 * Releases the commands that were held in a filter's sub-queue because a
 * 'safe' method was in progress for that filter.
 *
 * Each held command is passed back to _handle() in its original order, flagged
 * as clearing so that it is not held again. The sub-queue is then removed.
 * Does nothing when the filter has no sub-queue.
 *
 * @param {string} filterName
 */
BloomClient.prototype._clearFilterQueue = function (filterName) {
  var held = this.filterQueues[filterName]

  if (held) {
    if (this.options.debug) {
      console.log('Clearing filter queue:', filterName)
    }

    while (held.length) {
      this._handle(held.shift(), true)
    }

    delete this.filterQueues[filterName]
  }
}
// Helper Functions
/**
 * Returns a function which is a 'safe' version of the command with the supplied name. That is, if
 * the filter doesn't exist when the command is run, attempts to automatically create
 * the filter and then re-run the command, transparently to the client.
 *
 * If there is an error in the creation step, the callback will receive the filter creation
 * failure, not the original 'filter not found', to help track down why the creation
 * would be failing.
 *
 * The returned function accepts the same two leading arguments as the plain command
 * (filter name, then key or keys), followed by an optional callback and, after that,
 * an optional createOptions object:
 *
 *   fn(filterName, keyOrKeys)
 *   fn(filterName, keyOrKeys, callback)
 *   fn(filterName, keyOrKeys, callback, createOptions)
 *   fn(filterName, keyOrKeys, createOptions)
 *
 * @param {string} commandName one of the names for which a _build<Name>Command method exists
 * @return {Function}
 */
function _makeSafe(commandName) {
  var builderName = '_build' + commandName[0].toUpperCase() + commandName.slice(1) + 'Command'
  var commandBuilder = BloomClient.prototype[builderName]

  return function () {
    // This is a function like setSafe()
    var self = this
    var args = Array.prototype.slice.call(arguments, 0)
    var filterName = args[0]
    var createOptions = {}
    var callback

    // The command itself takes exactly two arguments, so anything beyond those is
    // the optional callback and/or the optional createOptions. Never pop into the
    // first two, otherwise a call without a callback would lose its key and filter name.
    if (typeof args[args.length - 1] === 'function') {
      callback = args.pop()
    } else if (args.length > 2) {
      createOptions = args.pop()
      if (args.length > 2) {
        callback = args.pop()
      }
    }

    // Callbacks are optional; ignore anything that cannot be called.
    if (typeof callback !== 'function') {
      callback = undefined
    }

    // Create a separate copy of these arguments, so they don't get munged by later commands
    // which modify them.
    var originalArgs = args.slice(0)
    originalArgs.push(callback)

    args.push(function (originalError, originalData) {
      // This is the callback which catches the response to the original command
      // (e.g. set, check, bulk, multi etc.)
      if (originalError && ('Filter does not exist' === originalError.message)) {
        // Try to create the filter. The create method will clear the queue when it completes.
        self.create(filterName, createOptions, function (createError, createData) {
          // This is the callback which catches the response to the create command.
          // In it, we tell it to run the command which triggered this creation.
          var command = commandBuilder.apply(self, originalArgs)

          // If the creation fails, the triggering action will also fail.
          // Store the creation error so we can give useful feedback for why the triggering
          // action wasn't successful, despite it being 'safe'.
          if (createError) {
            command.error = createError
          }

          // Flagged as clearing so it is sent ahead of the commands held for this filter.
          self._handle(command, true)
        })
      } else {
        // The filter exists (or failed for another reason), so run the original callback, if any.
        if (callback) {
          callback.call(callback, originalError, originalData)
        }
        self._clearFilterQueue(filterName)
      }
    })

    this._handle(commandBuilder.apply(self, args))

    // Create a queue for this filter, so that subsequent commands to this filter are
    // buffered until it is created.
    if (!this.filterQueues[filterName]) {
      this.filterQueues[filterName] = []
    }
  }
}
/**
 * Helper function to time performance in ms.
 *
 * Logs the time elapsed since `since` with three decimal places, prefixed by
 * the message when one is given, and returns the elapsed time.
 *
 * @param {Array} since A previous call to process.hrtime()
 * @param {string} message an optional message
 * @return {number} elapsed milliseconds
 */
function _timer(since, message) {
  var delta = process.hrtime(since)
  var seconds = delta[0]
  var nanoseconds = delta[1]
  var elapsed = (seconds * 1000) + (nanoseconds / 1000000)
  var prefix = message ? message + ': ' : ''

  console.log(prefix + elapsed.toFixed(3) + 'ms')
  return elapsed
}
// Exports

@@ -406,5 +698,6 @@

exports.timer = _timer
exports.print = function (error, data) {
console.log(data)
}
var stream = require('stream'),
util = require('util')
util = require('util')

@@ -7,16 +7,16 @@ /**

*/
function BloomFilter() {
this.capacity = null
this.checks = null
this.checkHits = null
this.checkMisses = null
this.name = null
this.pageIns = null
this.pageOuts = null
this.probability = null
this.sets = null
this.setHits = null
this.setMisses = null
this.size = null
this.storage = null
function BloomFilter() {
this.capacity = null
this.checks = null
this.checkHits = null
this.checkMisses = null
this.name = null
this.pageIns = null
this.pageOuts = null
this.probability = null
this.sets = null
this.setHits = null
this.setMisses = null
this.size = null
this.storage = null
}

@@ -32,6 +32,7 @@

function ResponseParser(client) {
this.client = client
this.lines = []
this.lineData = ''
stream.Transform.call(this, {objectMode: true})
this.client = client
this.lines = []
this.lineData = ''
this.blockLines = 1
stream.Transform.call(this, {objectMode: true})
}

@@ -44,11 +45,12 @@ util.inherits(ResponseParser, stream.Transform);

ResponseParser.responseTypes = {
BOOL: 'bool',
BOOL_LIST: 'boolList',
CONFIRMATION: 'confirmation',
FILTER_LIST: 'filterList',
INFO: 'info'
BOOL: 'bool',
BOOL_LIST: 'boolList',
CONFIRMATION: 'confirmation',
DROP_CONFIRMATION: 'dropConfirmation',
FILTER_LIST: 'filterList',
INFO: 'info'
}
/**
* Given a chunk of data, appends onto previously received data,
* Given a chunk of data, appends onto previously received data,
* decomposes it into lines and parses those lines into either single

@@ -63,37 +65,42 @@ * or multi-line responses that can be used to populate JS types.

// Add the chunk to the line buffer
this.lineData += chunk.toString()
var lines = this.lineData.split(/\r\n|\r|\n/g);
// Add the chunk to the line buffer
this.lineData += chunk.toString()
var lines = this.lineData.split(/\r\n|\r|\n/g);
// If the chunk finishes on a newline, the final line
// will be empty, otherwise it will be a partially completed
// line. Either way, we don't want to process it.
this.lineData = lines.pop()
for (var i = 0, l = lines.length; i < l; i++) {
this.lines.push(lines[i])
}
// If the chunk finishes on a newline, the final line
// will be empty, otherwise it will be a partially completed
// line. Either way, we don't want to process it.
this.lineData = lines.pop()
parseLines:
while (this.lines.length) {
if ('START' === this.lines[0]) {
for (var i = 1, l = this.lines.length; i < l; i++) {
if ('END' === this.lines[i]) {
// Got a full list. Push it and continue parsing.
this.push(this.lines.splice(0, i + 1).slice(1, -1))
// Goto might be considered harmful, but this is a continue ;)
continue parseLines
}
}
// We had an incomplete list, so we have to wait until we get more data.
break
} else {
this.push(this.lines.shift())
}
}
done()
for (var i = 0, l = lines.length; i < l; i++) {
this.lines.push(lines[i])
}
parseLines:
while (this.lines.length) {
if ('START' === this.lines[0]) {
for (var i = this.blockLines, l = this.lines.length; i < l; i++) {
if ('END' === this.lines[i]) {
// Got a full list. Push it and continue parsing.
this.push(this.lines.splice(0, i + 1).slice(1, -1))
// Reset the block count for the next block.
this.blockLines = 1
// Goto might be considered harmful, but this is a continue ;)
continue parseLines
}
}
// We had an incomplete list, so we have to wait until we get more data.
// Remember which line we got to, so we don't have to start from 1 again.
this.blockLines = i
break
} else {
this.push(this.lines.shift())
}
}
done()
}

@@ -108,11 +115,11 @@

* @return {bool}
*/
*/
ResponseParser.parseBool = function (data) {
if ('Yes' === data) {
return true
} else if ('No' === data) {
return false
} else {
throw new Error(data)
}
if ('Yes' === data) {
return true
} else if ('No' === data) {
return false
} else {
throw new Error(data)
}
}

@@ -127,17 +134,17 @@

* @return {bool}
*/
*/
ResponseParser.parseBoolList = function (data, keys) {
var values = data.split(' '),
results = {}
var values = data.split(' '),
results = {}
try {
for (var i = 0, l = values.length; i < l; i++) {
results[keys[i]] = ResponseParser.parseBool(values[i])
}
} catch (err) {
// If there was an error parsing a bool, make the entire line available for debugging.
throw new Error(data)
}
return results
try {
for (var i = 0, l = values.length; i < l; i++) {
results[keys[i]] = ResponseParser.parseBool(values[i])
}
} catch (err) {
// If there was an error parsing a bool, make the entire line available for debugging.
throw new Error(data)
}
return results
}

@@ -150,12 +157,28 @@

* @return {bool}
*/
*/
ResponseParser.parseConfirmation = function (data) {
if ('Done' === data) {
return true
} else {
throw new Error(data)
}
if ('Done' === data) {
return true
} else {
throw new Error(data)
}
}
/**
 * Parses the response to a drop command into a boolean.
 *
 * For drop commands, we don't care if the filter existed or not, so both
 * 'Done' and 'Filter does not exist' count as success. Any other response is
 * thrown as an Error carrying the response as its message.
 *
 * @param {string} data
 * @return {bool}
 */
ResponseParser.parseDropConfirmation = function (data) {
  var accepted = ['Done', 'Filter does not exist']

  if (accepted.indexOf(data) === -1) {
    throw new Error(data)
  }

  return true
}
/**
* Parses a list of filter definitions into an array of BloomFilter objects.

@@ -165,17 +188,17 @@ *

* @return {Array}
*/
*/
ResponseParser.parseFilterList = function (data) {
if (!Array.isArray(data)) {
throw new Error(data)
}
return data.map(function(item) {
var definition = item.split(' ')
var filter = new BloomFilter()
filter.name = definition[0]
filter.probability = definition[1]
filter.storage = definition[2]
filter.capacity = definition[3]
filter.size = definition[4]
return filter
})
if (!Array.isArray(data)) {
throw new Error(data)
}
return data.map(function(item) {
var definition = item.split(' ')
var filter = new BloomFilter()
filter.name = definition[0]
filter.probability = definition[1]
filter.storage = definition[2]
filter.capacity = definition[3]
filter.size = definition[4]
return filter
})
}

@@ -188,14 +211,14 @@

* @return {BloomFilter}
*/
*/
ResponseParser.parseInfo = function (data, name) {
if (!Array.isArray(data)) {
throw new Error(data)
}
var filter = new BloomFilter()
for (var i = 0, l = data.length; i < l; i++) {
var definition = data[i].split(' ')
filter[definition[0].replace(/_([a-z])/g, function (g) { return g[1].toUpperCase() })] = definition[1]
}
filter.name = name
return filter
if (!Array.isArray(data)) {
throw new Error(data)
}
var filter = new BloomFilter()
for (var i = 0, l = data.length; i < l; i++) {
var definition = data[i].split(' ')
filter[definition[0].replace(/_([a-z])/g, function (g) { return g[1].toUpperCase() })] = definition[1]
}
filter.name = name
return filter
}

@@ -202,0 +225,0 @@

{
"name": "bloomd"
, "description": "NodeJS Driver for BloomD"
, "version": "0.1.1"
, "version": "0.2.0"
, "homepage": "https://github.com/obvious/node-bloomd"

@@ -6,0 +6,0 @@ , "authors": [

@@ -10,4 +10,7 @@ node-bloomd

* Complete support for all Bloomd's commands.
* Fast performance: insertion of 250k items in around 500ms on a 2010 MBP.
* Fast performance: insertion of 235k items in ~600ms on a 2010 MBP, over localhost.
* Familiar interface, similar to node-redis
* A number of useful extensions over and above bloomd's default behaviour:
- [set|bulk|check|multi]Safe() commands to automatically create a filter if it doesn't exist when running a filter-specific command.
- Squashing non-existent filter errors on drop.

@@ -31,27 +34,86 @@ Install

```js
var bloomd = require('./index')
client = bloomd.createClient()
client.on('error', function (err) {
console.log('Error:' + err)
})
client.list(null, bloomd.print)
client.create('newFilter', bloomd.print)
client.info('newFilter', bloomd.print)
client.check('newFilter', 'monkey', bloomd.print)
client.set('newFilter', 'monkey', bloomd.print)
client.check('newFilter', 'monkey', bloomd.print)
client.bulk('newFilter', ['monkey', 'magic', 'muppet'], bloomd.print)
client.multi('newFilter', ['monkey', 'magic', 'muppet'], bloomd.print)
client.info('newFilter', bloomd.print)
client.drop('newFilter', bloomd.print)
client.dispose()
var bloomd = require('./index'),
client = bloomd.createClient()
client.on('error', function (err) {
console.log('Error:' + err)
})
client.list(null, bloomd.print)
client.create('newFilter', bloomd.print)
client.info('newFilter', bloomd.print)
client.check('newFilter', 'monkey', bloomd.print)
client.set('newFilter', 'monkey', bloomd.print)
client.check('newFilter', 'monkey', bloomd.print)
client.bulk('newFilter', ['monkey', 'magic', 'muppet'], bloomd.print)
client.multi('newFilter', ['monkey', 'magic', 'muppet'], bloomd.print)
client.info('newFilter', bloomd.print)
client.drop('newFilter', bloomd.print)
client.dispose()
```
Memorable Commands
------------------
Pop quiz: Bulk and Multi - which is used for batch checking, and which is used for batch setting? I
can never remember either. node-bloomd helps out by providing two methods to make it explicit:
```multiCheck()``` and ```bulkSet()```. Use them. The maintainers of your code will thank you.
'Safe' Commands
---------------
Typically, when issuing a ```set```, ```check```, ```bulk```, or ```multi``` command,
bloomd will respond with "Filter does not exist" if the filter has not been created. node-bloomd
provides 'safe' versions of these commands which auto-create the filter in this situation. These
are ```setSafe()```, ```checkSafe()```, ```bulkSafe()```, and ```multiSafe()```.
The method signatures of these are the same as the non-safe equivalent, with the addition of an optional
createOptions parameter, which can be used to control the configuration of the filter that might be created.
There is overhead to co-ordinating all this (see below), so if you are sure that a filter exists,
you should use the non-safe version of the command.
Subsequent commands issued to the same filter are guaranteed to happen after both the creation command
and the safe command that triggered the creation, even if the filter didn't previously exist. For example:
```js
var bloomd = require('./index'),
client = bloomd.createClient()
client.bulkSafe('nonExistent', ['a', 'b', 'c', 'd'], function(error, data) {
console.log('First, we created and bulk set some values')
}, {
prob: 0.01,
capacity: 50000
})
client.check('nonExistent', 'a', function (error, data) {
console.log('This will run second, and will be true')
})
```
In order to do this, when a safe command is issued, subsequent commands on the same filter are held
until we have attempted to create the filter and process the original safe command.
This requires the use of a per-filter sub-queue, which is then processed when both the create command
and the originating command have completed. While not a huge overhead, it is certainly slower than just
the non-safe version of the command.
In order of speed, from fastest to slowest:
* set().
* setSafe(), where the filter already exists.
* setSafe() on a non-existent filter.
Note that a safe command can still fail if the create method fails. Typically, this happens due to bad
creation parameters, such as too low a capacity being chosen. To aid with debugging, in this instance,
the error passed to the safe command's callback will be the reason that the filter creation failed, not
the reason that the safe command failed (which would be, in all cases "Filter does not exist"). Any
subsequent commands that were also queued will still fail with "Filter does not exist".
Finally, 'safe' is a terrible designation, and I welcome suggestions for a better name.
Still To Do
-----------
* Offline command Buffering for dropped connections and early requests.
* Partial list caching, to avoid re-checking.
* Retry and reconnect support.

@@ -62,2 +124,3 @@ * More Error checking.

* Better documentation.
* Auto-retry of filter creation when failing due to the filter having recently been dropped.

@@ -64,0 +127,0 @@ Contributions

var bloom = require('../index'),
fs = require('fs'),
assert = require('assert')
fs = require('fs'),
assert = require('assert')
/**
 * Reports how long ago a process.hrtime() reading was taken: the duration is
 * printed to the console (with an optional label) and handed back in ms.
 *
 * @param {Array} since A previous call to process.hrtime()
 * @param {string} message an optional message
 * @return {number}
 */
function elapsedTime(since, message) {
  // process.hrtime(previous) yields a [seconds, nanoseconds] difference.
  var delta = process.hrtime(since)
  var wholeSeconds = delta[0]
  var nanoseconds = delta[1]
  var elapsed = (wholeSeconds * 1000) + (nanoseconds / 1000000)

  // Only prefix the output when a label was supplied.
  var label = ''
  if (message) {
    label = message + ': '
  }

  console.log(label + elapsed.toFixed(3) + "ms")
  return elapsed
}
// Delay, in milliseconds, that the tests wait (via setTimeout) after issuing a
// drop command before sending commands that may create a filter of the same
// name, due to bloomd's limitations around creating straight after a drop.
// If tests are failing, try increasing this.
var DROP_THEN_CREATE_DELAY_MS = 200

@@ -28,39 +18,248 @@ /**

/**
* Test insertion of 235k items, into a filter initially sized for 20k, forcing multiple resizes.
/**
* Tests that calling setSafe on a filter actually calls the original
* callback if the filter already exists.
*/
exports.setAndCreateTestFilterExists = function (test) {
var filterName = 'set_and_create_already_exists'
var bloomClient = bloom.createClient()
var called = false
// Create a filter.
bloomClient.create(filterName, {}, function (error, data) {
test.equals(data, true, 'Failed to create filter')
})
bloomClient.setSafe(filterName, 'monkey', function(error, data) {
test.equals(data, true)
called = true
})
bloomClient.check(filterName, 'monkey', function(error, data) {
test.equals(data, true)
})
bloomClient.drop(filterName, function() {
bloomClient.dispose()
test.equals(called, true, 'The original callback was not called')
test.done()
})
}
/**
* Tests the setting of a key on a filter that doesn't exist, that the
* filter is automatically created, that the key is set, and that the
* original callback is still called.
*/
exports.setAndCreateTestFilterDoesNotExist = function (test) {
var filterName = 'set_and_create_non_existent'
var bloomClient = bloom.createClient()
bloomClient.drop(filterName, function (error, data) {
// This is a bit janky, as we have to drop the bloomClient to
// ensure that the filter doesn't exist beforehand,
// but bloomd has a period of time where you can't create immediately
// after a drop where creation will fail, so we wait for a bit.
// Non-deterministic, but probably ok.
setTimeout(function() {
bloomClient.setSafe(filterName, 'monkey', function(error, data) {
test.equals(data, true)
// The cleanup drop command also has to come in this callback,
// otherwise it will be in the queue before the create and retry
// commands that are generated by the non-existence of the filter.
// This is why promises are good.
bloomClient.drop(filterName, function() {
// Drop, Set, Create, Set, Drop
test.equals(5, bloomClient.commandsSent)
bloomClient.dispose()
test.done()
})
})
}, DROP_THEN_CREATE_DELAY_MS)
})
}
/**
* Tests the checking of a key after we call setSafe. When a filter doesn't exist, setSafe
* automatically creates it, then sets the value. If a client issues a check command after
* a setSafe command, it should return true, even if the filter didn't exist.
*/
exports.checkAfterSetSafe = function (test) {
var filterName = 'check_after_set_safe'
var bloomClient = bloom.createClient()
bloomClient.drop(filterName, function (error, data) {
// Also janky.
setTimeout(function() {
bloomClient.setSafe(filterName, 'monkey', function(error, data) {
test.equals(data, true)
})
bloomClient.check(filterName, 'monkey', function(error, data) {
test.equals(data, true, 'Check after safe set was not true')
})
bloomClient.drop(filterName, function() {
// Drop, Set, Create, Set, Check, Drop
test.equals(6, bloomClient.commandsSent)
bloomClient.dispose()
test.done()
})
}, DROP_THEN_CREATE_DELAY_MS)
})
}
/**
* Tests interleaved consecutive safe and non-safe commands, to ensure they run in the specified order.
*/
exports.interleavedSafeNonSafe = function (test) {
var filterName = 'interleaved_safe_non_safe'
var bloomClient = bloom.createClient()
bloomClient.drop(filterName, function (error, data) {
// Also janky.
setTimeout(function() {
bloomClient.multiSafe(filterName, ['monkey'], function(error, data) {
test.deepEqual(data, {
monkey: false
})
})
bloomClient.bulk(filterName, ['monkey', 'magic', 'muppet'], function(error, data) {
test.deepEqual(data, {
monkey: true,
magic: true,
muppet: true
})
})
bloomClient.multiSafe(filterName, ['magic', 'muppet', 'moonbeam'], function(error, data) {
test.deepEqual(data, {
magic: true,
muppet: true,
moonbeam: false
})
})
bloomClient.bulkSafe(filterName, ['monkey', 'moonbeam'], function(error, data) {
test.deepEqual(data, {
monkey: false,
moonbeam: true
})
})
bloomClient.multi(filterName, ['monkey', 'magic', 'muppet', 'moonbeam'], function(error, data) {
test.deepEqual(data, {
monkey: true,
magic: true,
muppet: true,
moonbeam: true
})
})
bloomClient.drop(filterName, function() {
// Drop, Multi, Create, Multi, Bulk, Multi, Bulk, Multi, Drop
test.equals(9, bloomClient.commandsSent)
bloomClient.dispose()
test.done()
})
}, DROP_THEN_CREATE_DELAY_MS)
})
}
/**
* Tests the setting of a key on a filter that doesn't exist, in the situation
* where the creation of the filter fails for some reason.
*
* We can simulate this by using a sufficiently low desired capacity.
*/
exports.setAndCreateTestFilterCannotBeCreated = function (test) {
var filterName = 'set_and_create_error_creating'
var bloomClient = bloom.createClient()
bloomClient.drop(filterName, function (error, data) {
// Same as prior test, also janky.
setTimeout(function() {
bloomClient.setSafe(filterName, 'monkey', function(error, data) {
test.equals(error.message, 'Client Error: Bad arguments')
bloomClient.drop(filterName, function() {
bloomClient.dispose()
test.done()
})
}, {
// A low capacity will cause a creation failure due to bad arguments.
capacity: 100
})
}, DROP_THEN_CREATE_DELAY_MS)
})
}
/**
* Test insertion and subsequent retrieval of 235k items, into a filter initially
* sized for 20k, forcing multiple resizes. We chain the multi on the callback of
* the bulk in order to get accurate timings.
*/
exports.bulkPerformance = function (test) {
var filterName = 'bulk_performance'
var bloomClient = bloom.createClient()
var filterName = 'bulk_performance'
var bloomClient = bloom.createClient()
bloomClient.on('ready', function() {
// Read in a dictionary.
fs.readFile('./test/words.txt', 'utf8', function (error, data) {
// Read in a dictionary.
fs.readFile('./test/words.txt', 'utf8', function (error, data) {
// Create a filter.
bloomClient.create(filterName, {
prob: 0.01,
capacity: 20000
})
// Create a filter.
bloomClient.create(filterName, {
prob: 0.0001,
capacity: 20000
})
// Insert lots of data.
var lines = data.split('\n')
var start = process.hrtime()
bloomClient.bulk(filterName, lines, function (error, data) {
var elapsed = elapsedTime(start, "Inserted " + lines.length + " items")
// Totally arbitrary, but should be plenty of room on even
// a moderate laptop with a single worker.
test.ok(elapsed < 1000, "Bulk insert considered too slow")
})
bloomClient.drop(filterName, function() {
bloomClient.dispose()
test.done()
})
});
})
// The last line will be blank.
var lines = data.split('\n')
lines.pop()
var bulkExpected = {}
var multiExpected = {}
for (var i = 0, l = lines.length; i < l; i++) {
var line = lines[i]
bulkExpected[line] = true
multiExpected[line] = true
}
// There are a couple of collisions at this probability.
bulkExpected['choledochotomy'] = false
bulkExpected['ensnarer'] = false
bulkExpected['renunciatory'] = false
bulkExpected['unboundless'] = false
// Insert lots of data.
var bulkStart = process.hrtime()
bloomClient.bulk(filterName, lines, function (error, data) {
var elapsed = bloom.timer(bulkStart, 'Inserted ' + lines.length + ' items')
// Totally arbitrary, but should be plenty of room on even a moderate laptop.
test.ok(elapsed < 1000, 'Bulk set considered too slow')
test.deepEqual(bulkExpected, data)
var multiStart = process.hrtime()
bloomClient.multi(filterName, lines, function (error, data) {
var elapsed = bloom.timer(multiStart, 'Retrieved ' + lines.length + ' items')
// Totally arbitrary, but should be plenty of room on even a moderate laptop.
test.ok(elapsed < 1000, 'Multi check considered too slow')
test.deepEqual(multiExpected, data)
})
bloomClient.drop(filterName, function() {
bloomClient.dispose()
test.done()
})
})
})
}
/**
/**
* Test repeated calls to info, to force data buffering that will

@@ -70,29 +269,27 @@ * result in incomplete lists, and give the stream transformation code a workout.

exports.consecutiveInfo = function (test) {
var filterName = 'consecutive_info'
var bloomClient = bloom.createClient()
var filterName = 'consecutive_info'
var bloomClient = bloom.createClient()
bloomClient.on('ready', function() {
// Create a filter.
bloomClient.create(filterName, {
prob: 0.01,
capacity: 20000
})
// Create a filter.
bloomClient.create(filterName, {
prob: 0.01,
capacity: 20000
})
var iterations = 1000
var responseCount = 0
function callback(error, data) {
test.equals(data.name, filterName, "Didn't get back the same thing we put it")
responseCount++
}
var iterations = 1000
var responseCount = 0
function callback(error, data) {
test.equals(data.name, filterName, 'Did not get back the same thing we put it')
responseCount++
}
for (var i = 0; i < iterations; i++) {
bloomClient.info(filterName, callback)
}
bloomClient.drop(filterName, function() {
bloomClient.dispose()
test.equals(iterations, responseCount, "Didn't iterate the correct number of times")
test.done()
})
})
for (var i = 0; i < iterations; i++) {
bloomClient.info(filterName, callback)
}
bloomClient.drop(filterName, function() {
bloomClient.dispose()
test.equals(iterations, responseCount, 'Did not iterate the correct number of times')
test.done()
})
}

@@ -106,67 +303,65 @@

exports.canonicalTest = function (test) {
var filterName = 'canonical_test'
var bloomClient = bloom.createClient()
bloomClient.on('ready', function() {
// Create a filter.
bloomClient.list(null, function(error, data) {
test.equals(data.length, 0, "We had a list, somehow.")
})
bloomClient.create(filterName, {}, function (error, data) {
test.equals(data, true, "Failed to create filter")
})
var filterName = 'canonical_test'
var bloomClient = bloom.createClient()
bloomClient.check(filterName, 'zipzab', function (error, data) {
test.deepEqual(data, false, "zipzab should not exist")
})
// Create a filter.
bloomClient.list(null, function(error, data) {
test.equals(data.length, 0, 'We had a list, somehow.')
})
bloomClient.set(filterName, 'zipzab', function (error, data) {
test.equals(data, true, "zipzab should have been created")
})
bloomClient.check(filterName, 'zipzab', function (error, data) {
test.equals(data, true, "zipzab should now exist")
})
bloomClient.create(filterName, {}, function (error, data) {
test.equals(data, true, 'Failed to create filter')
})
bloomClient.multi(filterName, ['zipzab', 'blah', 'boo'], function (error, data) {
test.deepEqual(data, {
zipzab: true,
blah: false,
boo: false
})
})
bloomClient.check(filterName, 'zipzab', function (error, data) {
test.deepEqual(data, false, 'zipzab should not exist')
})
bloomClient.bulk(filterName, ['zipzab', 'blah', 'boo'], function (error, data) {
test.deepEqual(data, {
zipzab: false,
blah: true,
boo: true
})
})
bloomClient.set(filterName, 'zipzab', function (error, data) {
test.equals(data, true, 'zipzab should have been created')
})
bloomClient.multi(filterName, ['zipzab', 'blah', 'boo'], function (error, data) {
test.deepEqual(data, {
zipzab: true,
blah: true,
boo: true
})
})
bloomClient.check(filterName, 'zipzab', function (error, data) {
test.equals(data, true, 'zipzab should now exist')
})
bloomClient.list(null, function(error, data) {
test.equals(data.length, 1, "We had a list, somehow.")
test.equals(data[0].name, filterName)
})
bloomClient.drop(filterName, function (error, data) {
test.equals(data, true, "Failed to drop filter")
})
bloomClient.list(null, function(error, data) {
bloomClient.dispose()
test.equals(data.length, 0, "We had a list, somehow.")
test.done()
})
})
bloomClient.multi(filterName, ['zipzab', 'blah', 'boo'], function (error, data) {
test.deepEqual(data, {
zipzab: true,
blah: false,
boo: false
})
})
}
bloomClient.bulk(filterName, ['zipzab', 'blah', 'boo'], function (error, data) {
test.deepEqual(data, {
zipzab: false,
blah: true,
boo: true
})
})
bloomClient.multi(filterName, ['zipzab', 'blah', 'boo'], function (error, data) {
test.deepEqual(data, {
zipzab: true,
blah: true,
boo: true
})
})
bloomClient.list(null, function(error, data) {
test.equals(data.length, 1, 'We had a list, somehow.')
test.equals(data[0].name, filterName)
})
bloomClient.drop(filterName, function (error, data) {
test.equals(data, true, 'Failed to drop filter')
})
bloomClient.list(null, function(error, data) {
bloomClient.dispose()
test.equals(data.length, 0, 'We had a list, somehow.')
test.done()
})
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc