node-cassandra-cql
Advanced tools
Comparing version 0.4.1 to 0.4.2
@@ -15,3 +15,5 @@ var net = require('net'); | ||
//When the simultaneous requests has been reached, it determines the amount of milliseconds before retrying to get an available streamId | ||
maxRequestsRetry: 100 | ||
maxRequestsRetry: 100, | ||
//Connect timeout: time to wait when trying to connect to a host, | ||
connectTimeout: 2000 | ||
}; | ||
@@ -71,2 +73,7 @@ function Connection(options) { | ||
this.netClient.once('error', errorConnecting); | ||
this.netClient.once('timeout', function connectTimeout() { | ||
var err = new types.DriverError('Connection timeout'); | ||
errorConnecting(err, true); | ||
}); | ||
this.netClient.setTimeout(this.options.connectTimeout); | ||
@@ -77,2 +84,3 @@ this.netClient.connect(this.options.port, this.options.host, function connectCallback() { | ||
self.netClient.removeAllListeners('connect'); | ||
self.netClient.removeAllListeners('timeout'); | ||
@@ -236,3 +244,7 @@ self.sendStream(new writers.StartupWriter(self.options.version), null, function (err, response) { | ||
if (err) { | ||
err.isServerUnhealthy = true; | ||
if (!(err instanceof TypeError)) { | ||
//TypeError is raised when there is a serialization issue | ||
//If it is not a serialization issue is a socket issue | ||
err.isServerUnhealthy = true; | ||
} | ||
return callback(err); | ||
@@ -349,2 +361,2 @@ } | ||
exports.Connection = Connection; | ||
exports.Connection = Connection; |
@@ -6,2 +6,3 @@ var util = require('util'); | ||
var encoder = require('./encoder.js'); | ||
var types = require('./types.js'); | ||
@@ -268,3 +269,3 @@ var utils = require('./utils.js'); | ||
{ | ||
row[col.name] = types.typeEncoder.decode(bytes, col.type); | ||
row[col.name] = encoder.decode(bytes, col.type); | ||
bytes = null; | ||
@@ -271,0 +272,0 @@ } |
539
lib/types.js
var util = require('util'); | ||
var uuid = require('node-uuid'); | ||
var stream = require('stream'); | ||
@@ -94,41 +93,38 @@ var async = require('async'); | ||
*/ | ||
var queryParser = (function() { | ||
function parse(query, args) { | ||
if (!args || args.length === 0) { | ||
var queryParser = { | ||
/** | ||
* Replaced the query place holders with the stringified value | ||
* @param {String} query | ||
* @param {Array} params | ||
* @param {Function} stringifier | ||
*/ | ||
parse: function (query, params, stringifier) { | ||
if (!query || !query.length || !params) { | ||
return query; | ||
} | ||
var q = 0; | ||
var a = 0; | ||
var len = args.length; | ||
var str = ''; | ||
try { | ||
while (q >= 0) { | ||
var oldq = q; | ||
q = query.indexOf('?', q); | ||
if (q >= 0) { | ||
str += query.substr(oldq, q-oldq); | ||
if (a >= len) { | ||
throw new QueryParserError('Query parameter number ' + (a+1) + ' is not defined. Placeholder for not provided argument.'); | ||
} | ||
str += typeEncoder.stringifyValue(args[a++]); | ||
q += 1; | ||
} else { | ||
str += query.substr(oldq); | ||
} | ||
if (!stringifier) { | ||
stringifier = function (a) {return a.toString()}; | ||
} | ||
var parts = []; | ||
var isLiteral = false; | ||
var lastIndex = 0; | ||
var paramsCounter = 0; | ||
for (var i = 0; i < query.length; i++) { | ||
var char = query.charAt(i); | ||
if (char === "'" && query.charAt(i-1) !== '\\') { | ||
//opening or closing quotes in a literal value of the query | ||
isLiteral = !isLiteral; | ||
} | ||
return str; | ||
if (!isLiteral && char === '?') { | ||
//is a placeholder | ||
parts.push(query.substring(lastIndex, i)); | ||
parts.push(stringifier(params[paramsCounter++])); | ||
lastIndex = i+1; | ||
} | ||
} | ||
catch (e) { | ||
throw new QueryParserError(e); | ||
} | ||
parts.push(query.substring(lastIndex)); | ||
return parts.join(''); | ||
} | ||
}; | ||
return { | ||
parse: parse | ||
}; | ||
})(); | ||
/** | ||
@@ -166,476 +162,2 @@ * Server error codes returned by Cassandra | ||
/** | ||
* Encodes and decodes from a type to Cassandra bytes | ||
*/ | ||
var typeEncoder = (function(){ | ||
/** | ||
* Decodes Cassandra bytes into Javascript values. | ||
*/ | ||
function decode(bytes, type) { | ||
if (bytes === null) { | ||
return null; | ||
} | ||
switch(type[0]) { | ||
case dataTypes.custom: | ||
case dataTypes.decimal: | ||
case dataTypes.inet: | ||
case dataTypes.varint: | ||
//return buffer and move on :) | ||
return utils.copyBuffer(bytes); | ||
case dataTypes.ascii: | ||
return bytes.toString('ascii'); | ||
case dataTypes.bigint: | ||
case dataTypes.counter: | ||
return decodeBigNumber(utils.copyBuffer(bytes)); | ||
case dataTypes.timestamp: | ||
return decodeTimestamp(utils.copyBuffer(bytes)); | ||
case dataTypes.blob: | ||
return utils.copyBuffer(bytes); | ||
case dataTypes.boolean: | ||
return !!bytes.readUInt8(0); | ||
case dataTypes.double: | ||
return bytes.readDoubleBE(0); | ||
case dataTypes.float: | ||
return bytes.readFloatBE(0); | ||
case dataTypes.int: | ||
return bytes.readInt32BE(0); | ||
case dataTypes.uuid: | ||
case dataTypes.timeuuid: | ||
return uuid.unparse(bytes); | ||
case dataTypes.text: | ||
case dataTypes.varchar: | ||
return bytes.toString('utf8'); | ||
case dataTypes.list: | ||
case dataTypes.set: | ||
var list = decodeList(bytes, type[1][0]); | ||
return list; | ||
case dataTypes.map: | ||
var map = decodeMap(bytes, type[1][0][0], type[1][1][0]); | ||
return map; | ||
} | ||
throw new Error('Unknown data type: ' + type[0]); | ||
} | ||
function decodeBigNumber (bytes) { | ||
return Long.fromBuffer(bytes); | ||
} | ||
function decodeTimestamp (bytes) { | ||
var value = decodeBigNumber(bytes); | ||
if (value.greaterThan(Long.fromNumber(Number.MIN_VALUE)) && value.lessThan(Long.fromNumber(Number.MAX_VALUE))) { | ||
return new Date(value.toNumber()); | ||
} | ||
return value; | ||
} | ||
/* | ||
* Reads a list from bytes | ||
*/ | ||
function decodeList (bytes, type) { | ||
var offset = 0; | ||
//a short containing the total items | ||
var totalItems = bytes.readUInt16BE(offset); | ||
offset += 2; | ||
var list = []; | ||
for(var i = 0; i < totalItems; i++) { | ||
//bytes length of the item | ||
var length = bytes.readUInt16BE(offset); | ||
offset += 2; | ||
//slice it | ||
list.push(decode(bytes.slice(offset, offset+length), [type])); | ||
offset += length; | ||
} | ||
return list; | ||
} | ||
/* | ||
* Reads a map (key / value) from bytes | ||
*/ | ||
function decodeMap (bytes, type1, type2) { | ||
var offset = 0; | ||
//a short containing the total items | ||
var totalItems = bytes.readUInt16BE(offset); | ||
offset += 2; | ||
var map = {}; | ||
for(var i = 0; i < totalItems; i++) { | ||
var keyLength = bytes.readUInt16BE(offset); | ||
offset += 2; | ||
var key = decode(bytes.slice(offset, offset+keyLength), [type1]); | ||
offset += keyLength; | ||
var valueLength = bytes.readUInt16BE(offset); | ||
offset += 2; | ||
map[key] = decode(bytes.slice(offset, offset+valueLength), [type2]); | ||
offset += valueLength; | ||
} | ||
return map; | ||
} | ||
function encode (item) { | ||
if (item === null) { | ||
return null; | ||
} | ||
var value = item; | ||
var type = null; | ||
var subtype = null; | ||
if (item.hint) { | ||
value = item.value; | ||
type = item.hint; | ||
if (typeof type === 'string') { | ||
var typeInfo = dataTypes.getByName(type); | ||
type = typeInfo.type; | ||
subtype = typeInfo.subtype; | ||
} | ||
} | ||
if (value === null) { | ||
return null; | ||
} | ||
if (!type) { | ||
type = guessDataType(value); | ||
if (!type) { | ||
throw new TypeError('Target data type could not be guessed, you must specify a hint.', value); | ||
} | ||
} | ||
switch (type) { | ||
case dataTypes.int: | ||
return encodeInt(value); | ||
case dataTypes.float: | ||
return encodeFloat(value); | ||
case dataTypes.double: | ||
return encodeDouble(value); | ||
case dataTypes.boolean: | ||
return encodeBoolean(value); | ||
case dataTypes.text: | ||
case dataTypes.varchar: | ||
return encodeString(value); | ||
case dataTypes.ascii: | ||
return encodeString(value, 'ascii'); | ||
case dataTypes.uuid: | ||
case dataTypes.timeuuid: | ||
return encodeUuid(value); | ||
case dataTypes.custom: | ||
case dataTypes.decimal: | ||
case dataTypes.inet: | ||
case dataTypes.varint: | ||
case dataTypes.blob: | ||
return encodeBlob(value, type); | ||
case dataTypes.bigint: | ||
case dataTypes.counter: | ||
return encodeBigNumber(value, type); | ||
case dataTypes.timestamp: | ||
return encodeTimestamp(value, type); | ||
case dataTypes.list: | ||
case dataTypes.set: | ||
return encodeList(value, type, subtype); | ||
case dataTypes.map: | ||
return encodeMap(value); | ||
default: | ||
throw new TypeError('Type not supported ' + type, value); | ||
} | ||
} | ||
/** | ||
* Try to guess the Cassandra type to be stored, based on the javascript value type | ||
*/ | ||
function guessDataType (value) { | ||
var dataType = null; | ||
if (typeof value === 'number') { | ||
dataType = dataTypes.int; | ||
if (value % 1 !== 0) { | ||
dataType = dataTypes.double; | ||
} | ||
} | ||
else if(value instanceof Date) { | ||
dataType = dataTypes.timestamp; | ||
} | ||
else if(value instanceof Long) { | ||
dataType = dataTypes.bigint; | ||
} | ||
else if (typeof value === 'string') { | ||
dataType = dataTypes.text; | ||
if (/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i.test(value)){ | ||
dataType = dataTypes.uuid; | ||
} | ||
} | ||
else if (value instanceof Buffer) { | ||
dataType = dataTypes.blob; | ||
} | ||
else if (util.isArray(value)) { | ||
dataType = dataTypes.list; | ||
} | ||
else if (value === true || value === false) { | ||
dataType = dataTypes.boolean; | ||
} | ||
return dataType; | ||
} | ||
function encodeInt (value) { | ||
if (typeof value !== 'number') { | ||
throw new TypeError(null, value, 'number'); | ||
} | ||
var buf = new Buffer(4); | ||
buf.writeInt32BE(value, 0); | ||
return buf; | ||
} | ||
function encodeFloat (value) { | ||
if (typeof value !== 'number') { | ||
throw new TypeError(null, value, 'number'); | ||
} | ||
var buf = new Buffer(4); | ||
buf.writeFloatBE(value, 0); | ||
return buf; | ||
} | ||
function encodeDouble (value) { | ||
if (typeof value !== 'number') { | ||
throw new TypeError(null, value, 'number'); | ||
} | ||
var buf = new Buffer(8); | ||
buf.writeDoubleBE(value, 0); | ||
return buf; | ||
} | ||
function encodeTimestamp (value, type) { | ||
if (value instanceof Date) { | ||
value = value.getTime(); | ||
} | ||
return encodeBigNumber (value, type); | ||
} | ||
function encodeUuid (value) { | ||
if (typeof value === 'string') { | ||
value = uuid.parse(value, new Buffer(16)); | ||
} | ||
if (!(value instanceof Buffer)) { | ||
throw new TypeError('Only Buffer and string objects allowed for UUID values', value, Buffer); | ||
} | ||
return value; | ||
} | ||
function encodeBigNumber (value, type) { | ||
var buf = getBigNumberBuffer(value); | ||
if (buf === null) { | ||
throw new TypeError(null, value, Buffer, null, type); | ||
} | ||
return buf; | ||
} | ||
function getBigNumberBuffer (value) { | ||
var buf = null; | ||
if (value instanceof Buffer) { | ||
buf = value; | ||
} else if (value instanceof Long) { | ||
buf = Long.toBuffer(value); | ||
} else if (typeof value === 'number') { | ||
buf = Long.toBuffer(Long.fromNumber(value)); | ||
} | ||
return buf; | ||
} | ||
function encodeString (value, encoding) { | ||
if (typeof value !== 'string') { | ||
throw new TypeError(null, value, 'string'); | ||
} | ||
return new Buffer(value, encoding); | ||
} | ||
function encodeBlob (value, type) { | ||
if (!(value instanceof Buffer)) { | ||
throw new TypeError(null, value, Buffer, null, type); | ||
} | ||
return value; | ||
} | ||
function encodeBoolean(value) { | ||
return new Buffer([(value ? 1 : 0)]); | ||
} | ||
function encodeList(value, type, subtype) { | ||
if (!util.isArray(value)) { | ||
throw new TypeError(null, value, Array, null, type); | ||
} | ||
if (value.length === 0) { | ||
return null; | ||
} | ||
var parts = []; | ||
parts.push(getLengthBuffer(value)); | ||
for (var i=0;i<value.length;i++) { | ||
var item = value[i]; | ||
if (subtype) { | ||
item = {hint: subtype, value: item}; | ||
} | ||
var bytes = encode(item); | ||
//include item byte length | ||
parts.push(getLengthBuffer(bytes)); | ||
//include item | ||
parts.push(bytes); | ||
} | ||
return Buffer.concat(parts); | ||
} | ||
function encodeMap(value) { | ||
var parts = []; | ||
var propCounter = 0; | ||
for (var key in value) { | ||
if (!value.hasOwnProperty(key)) continue; | ||
//add the key and the value | ||
var keyBuffer = encode(key); | ||
//include item byte length | ||
parts.push(getLengthBuffer(keyBuffer)); | ||
//include item | ||
parts.push(keyBuffer); | ||
//value | ||
var valueBuffer = encode(value[key]); | ||
//include item byte length | ||
parts.push(getLengthBuffer(valueBuffer)); | ||
//include item | ||
if (valueBuffer !== null) { | ||
parts.push(valueBuffer); | ||
} | ||
propCounter++; | ||
} | ||
parts.unshift(getLengthBuffer(propCounter)); | ||
return Buffer.concat(parts); | ||
} | ||
/** | ||
* Converts a value to string for a query | ||
*/ | ||
function stringifyValue (item) { | ||
if (item === null || item === undefined) { | ||
return 'NULL'; | ||
} | ||
var value = item; | ||
var type = null; | ||
var subtype = null; | ||
if (item.hint) { | ||
value = item.value; | ||
type = item.hint; | ||
if (typeof type === 'string') { | ||
var typeInfo = dataTypes.getByName(type); | ||
type = typeInfo.type; | ||
subtype = typeInfo.subtype; | ||
} | ||
} | ||
if (value === null || value === undefined) { | ||
return 'NULL'; | ||
} | ||
if (!type) { | ||
type = guessDataType(value); | ||
if (!type && value instanceof QueryLiteral) { | ||
return value.toString(); | ||
} | ||
if (!type) { | ||
throw new TypeError('Target data type could not be guessed, you must specify a hint.', value); | ||
} | ||
} | ||
switch (type) { | ||
case dataTypes.int: | ||
case dataTypes.float: | ||
case dataTypes.double: | ||
case dataTypes.boolean: | ||
case dataTypes.uuid: | ||
case dataTypes.timeuuid: | ||
return value.toString(); | ||
case dataTypes.text: | ||
case dataTypes.varchar: | ||
case dataTypes.ascii: | ||
return quote(value); | ||
case dataTypes.custom: | ||
case dataTypes.decimal: | ||
case dataTypes.inet: | ||
case dataTypes.varint: | ||
case dataTypes.blob: | ||
return stringifyBuffer(value); | ||
case dataTypes.bigint: | ||
case dataTypes.counter: | ||
return stringifyBigNumber(value); | ||
case dataTypes.timestamp: | ||
return stringifyDate(value); | ||
case dataTypes.list: | ||
return stringifyArray(value, subtype); | ||
case dataTypes.set: | ||
return stringifyArray(value, subtype, '{', '}'); | ||
case dataTypes.map: | ||
return stringifyMap(value); | ||
default: | ||
throw new TypeError('Type not supported ' + type, value); | ||
} | ||
} | ||
function stringifyBuffer(value) { | ||
return '0x' + value.toString('hex'); | ||
} | ||
function stringifyDate (value) { | ||
return value.getTime().toString(); | ||
} | ||
function quote(value) { | ||
if (typeof value !== 'string') { | ||
throw new TypeError(null, value, 'string'); | ||
} | ||
value = value.replace(/'/g, "''"); // escape strings with double single-quotes | ||
return "'" + value + "'"; | ||
} | ||
function stringifyBigNumber (value) { | ||
var buf = getBigNumberBuffer(value); | ||
if (buf === null) { | ||
throw new TypeError(null, value, Long); | ||
} | ||
return 'blobAsBigint(' + stringifyBuffer(buf) + ')'; | ||
} | ||
function stringifyArray (value, subtype, openChar, closeChar) { | ||
if (!openChar) { | ||
openChar = '['; | ||
closeChar = ']'; | ||
} | ||
var stringValues = []; | ||
for (var i = 0; i < value.length; i++) { | ||
var item = value[i]; | ||
if (subtype) { | ||
item = {hint: subtype, value: item}; | ||
} | ||
stringValues.push(stringifyValue(item)); | ||
} | ||
return openChar + stringValues.join() + closeChar; | ||
} | ||
function stringifyMap (value) { | ||
var stringValues = []; | ||
for (var key in value) { | ||
stringValues.push(stringifyValue(key) + ':' + stringifyValue(value[key])); | ||
} | ||
return '{' + stringValues.join() + '}'; | ||
} | ||
/** | ||
* Gets a buffer containing with 2 bytes representing the array length or the value | ||
*/ | ||
function getLengthBuffer(value) { | ||
var lengthBuffer = new Buffer(2); | ||
if (!value) { | ||
lengthBuffer.writeUInt16BE(0, 0); | ||
} | ||
else if (value.length) { | ||
lengthBuffer.writeUInt16BE(value.length, 0); | ||
} | ||
else { | ||
lengthBuffer.writeUInt16BE(value, 0); | ||
} | ||
return lengthBuffer; | ||
} | ||
return { | ||
decode: decode, | ||
encode: encode, | ||
guessDataType: guessDataType, | ||
stringifyValue: stringifyValue}; | ||
})(); | ||
//classes | ||
@@ -863,3 +385,2 @@ | ||
exports.resultKind = resultKind; | ||
exports.typeEncoder = typeEncoder; | ||
exports.FrameHeader = FrameHeader; | ||
@@ -866,0 +387,0 @@ exports.Long = Long; |
@@ -5,2 +5,3 @@ var async = require('async'); | ||
var encoder = require('./encoder.js'); | ||
var types = require('./types.js'); | ||
@@ -123,3 +124,3 @@ var utils = require('./utils.js'); | ||
var frameWriter = new FrameWriter('QUERY', this.streamId); | ||
var query = types.queryParser.parse(this.query, this.params); | ||
var query = types.queryParser.parse(this.query, this.params, encoder.stringifyValue); | ||
frameWriter.writeLString(query); | ||
@@ -194,3 +195,3 @@ frameWriter.writeShort(this.consistency); | ||
for (var i=0; i<this.params.length; i++) { | ||
frameWriter.writeBytes(types.typeEncoder.encode(this.params[i])); | ||
frameWriter.writeBytes(encoder.encode(this.params[i])); | ||
} | ||
@@ -231,3 +232,3 @@ frameWriter.writeShort(this.consistency); | ||
}, | ||
function (callback) { | ||
function (next) { | ||
self.isRunning = true; | ||
@@ -251,3 +252,3 @@ var writeItem = self.queue.shift(); | ||
//to allow IO between writes | ||
setImmediate(callback); | ||
setImmediate(next); | ||
} | ||
@@ -254,0 +255,0 @@ }, |
{ | ||
"name": "node-cassandra-cql", | ||
"version": "0.4.1", | ||
"version": "0.4.2", | ||
"description": "Node.js driver for Apache Cassandra", | ||
@@ -46,4 +46,4 @@ "author": "Jorge Bay <jorgebaygondra@gmail.com>", | ||
"scripts": { | ||
"test": "mocha test -R spec" | ||
"test": "mocha test -R spec -t 5000" | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
98304
13
2707