Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Sign inDemoInstall


Package Overview
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies


@apla/clickhouse - npm Package Compare versions

Comparing version 1.2.3 to 1.4.0



"name": "@apla/clickhouse",
"version": "1.2.3",
"version": "1.4.0",
"description": "Yandex ClickHouse database interface",

@@ -29,6 +29,7 @@ "main": "src/clickhouse.js",

"dependencies": {
"object-assign": "^4.1.0",
"buffer-indexof-polyfill": "^1.0.1"
"buffer-indexof-polyfill": "^1.0.1",
"object-assign": "^4.1.0"
"devDependencies": {
"bluebird": "^3.5.0",
"codecov": "^2.2.0",

@@ -35,0 +36,0 @@ "memwatch-next": "^0.3.0",

@@ -15,5 +15,15 @@ Database interface for

var ch = new ClickHouse ({host:});
var ch = new ClickHouse ({host:, port: 8123, auth: "user:password"});
// or
var ch = new ClickHouse (;
// stream is an object stream. you can pipe it
// do the query, callback interface, not recommended for selects
ch.query ("CREATE DATABASE clickhouse_test", function (err, data) {
// promise interface (requires 'util.promisify' for node < 8, Promise shim for node < 4)
ch.querying ("CREATE DATABASE clickhouse_test").then (…);
// it is better to use stream interface to fetch select results
var stream = ch.query ("SELECT 1");

@@ -73,4 +83,4 @@

`queryOptions` object can contain any option from Settings (docs:

@@ -87,5 +97,7 @@

* **omitFormat**: `FORMAT JSONCompact` will be added by default to every query.
* **omitFormat**: `FORMAT JSONCompact` will be added by default to every query
that returns a dataset. Currently these are `SELECT|SHOW|DESC|DESCRIBE|EXISTS\s+TABLE`.
You can change this behaviour by providing this option. In that case you should
add `FORMAT JSONCompact` yourself.
* **inputFormat**: this is format for data loading with `INSERT` statements.
* **syncParser**: collect data, then parse entire response. Should be faster, but for

@@ -120,4 +132,68 @@ large datasets all your dataset goes into memory (actually, entire response + entire dataset).

### ()
You should have at least one error handler listening, either via a callback or via the stream's `error` event.
If you have both a callback and a stream listener, both of them will be notified of the error.
### clickHouse.querying (statement, [options]).then (…)
Promise interface. Similar to the callback one.
### (function (err, response) {})
Sends an empty query and checks that the response is "Ok.\n"
### clickHouse.pinging ().then (…)
Promise interface for `ping`
## Bulk data loading with INSERT statements
`INSERT` can be used for bulk data loading. There are 2 formats that are easy to implement
with javascript: CSV and TabSeparated/TSV.
CSV is useful for loading from file, thus you can read and pipe into clickhouse
file contents. To activate CSV parsing you should set `inputFormat` option to `CSV`
for driver or query:
var csvStream = fs.createReadStream ('data.csv');
var clickhouseStream = clickHouse.query (statement, {inputFormat: CSV});
csvStream.pipe (clickhouseStream);
TSV is useful for loading from file and bulk loading from external sources, such as other databases.
Only `\\`, `\t` and `\n` need to be escaped in strings; numbers, nulls,
bools and date objects need some minor processing. You can send prepared TSV data strings
(line ending will be appended automatically), buffers (always passed as is) or Arrays with fields (WIP).
## Memory size
You can read all the records into memory in single call like this:
var ch = new ClickHouse ({host: host, port: port});
ch.query ("SELECT number FROM system.numbers LIMIT 10", {syncParser: true}, function (err, result) {
// result will contain all the data you need
In this case the whole JSON response from the server will be read into memory
and then parsed in memory, hogging your CPU. The default parser parses the server response
line by line and emits events. This is slower, but much more memory and CPU efficient
for larger datasets.
## Promise interface
The promise interface has some restrictions. It is not recommended to use this interface
for `INSERT` and `SELECT` queries. With `INSERT` you cannot bulk load data via a stream, and
`SELECT` will collect all the records in memory. For simple usage where the data size
is controlled, it is ok.

@@ -10,6 +10,4 @@ var http = require ('http');

Object.assign = require ('object-assign');
require ('./legacy-support');
require ('buffer-indexof-polyfill');
var JSONStream = require ('./json-stream');

@@ -22,2 +20,12 @@ (this, options);
this.format = options.recordFormat;
this._writeBuffer = [];
this._canWrite = false;
Object.defineProperty (this, 'req', {
get: function () {return this._req},
set: function (req) {this._req = req; this._canWrite = true;}

@@ -31,153 +39,322 @@

RecordStream.prototype._write = function write () {
RecordStream.prototype._write = function _write (chunk, enc, cb) {
if (Array.isArray (chunk)) {
chunk = (function (field) {
return encodeValue (false, field, this.format);
}.bind (this)).join ("\t");
if (typeof chunk === 'string') {
if (chunk.substr (chunk.length - 1) !== "\n") {
chunk = chunk + "\n";
chunk = Buffer.from ? Buffer.from (chunk, enc) : new Buffer (chunk, enc);
// there is no way to determine line ending efficiently for Buffer
if (!(chunk instanceof Buffer)) {
return this.emit ('error', new Error ('Incompatible format'));
this._canWrite = this.req.write (chunk);
if (!this._canWrite) {
this.req.once ('drain', function () {
// wait for drain, then emit drain event calling cb ()
cb ();
}.bind (this));
cb ();
RecordStream.prototype.end = function write (chunk, enc, cb) {
if (chunk)
this.write (chunk, enc);
function httpRequest (reqParams, reqData, cb) {
this.req.once ('drain', function () {
this.req.end ();
cb && cb ();
}.bind (this));
var stream = new RecordStream ();
var onResponse = function(response) {
var str;
var error;
if (response.statusCode === 200) {
str = new Buffer (0);
During parsing, values could be enclosed or not enclosed in quotes.
Supported both single and double quotes. In particular,
Strings could be represented without quotes - in that case,
they are parsed up to comma or newline (CR or LF).
Contrary to RFC, in case of parsing strings without quotes,
leading and trailing spaces and tabs are ignored. As line delimiter,
both Unix (LF), Windows (CR LF) or Mac OS Classic (LF CR) variants are supported.
In TabSeparated format, data is written by row. Each row contains values separated by tabs.
Each value is followed by a tab, except the last value in the row,
which is followed by a line break. Strictly Unix line breaks are assumed everywhere.
The last row also must contain a line break at the end. Values are written in text format,
without enclosing quotation marks, and with special characters escaped.
Minimum set of symbols that you must escape in TabSeparated format is tab, newline (LF) and backslash.
Arrays are formatted as a list of comma-separated values in square brackets.
Number items in the array are formatted as normally, but dates, dates with times,
and strings are formatted in single quotes with the same escaping rules as above.
As an exception, parsing DateTime is also supported in Unix timestamp format,
if it consists of exactly 10 decimal digits. The result is not time zone-dependent.
The formats YYYY-MM-DD hh:mm:ss and NNNNNNNNNN are differentiated automatically.
Prints every row in parentheses. Rows are separated by commas.
There is no comma after the last row. The values inside the parentheses are also comma-separated.
Numbers are output in decimal format without quotes. Arrays are output in square brackets.
Strings, dates, and dates with times are output in quotes.
Escaping rules and parsing are same as in the TabSeparated format.
During formatting, extra spaces aren’t inserted, but during parsing,
they are allowed and skipped (except for spaces inside array values, which are not allowed).
Minimum set of symbols that you must escape in Values format is single quote and backslash.
Nulls: \N?
/**
 * Encode a single JavaScript value as a field for ClickHouse TabSeparated input.
 *
 * - strings: backslash, tab and newline are escaped; when `wrapString` is true
 *   (used for array items) single quotes are escaped as well and the result is
 *   enclosed in single quotes
 * - numbers: NaN becomes 'nan', infinities become '+inf' / '-inf', everything
 *   else uses `toString ()`
 * - Date: first 10 characters of the millisecond timestamp (i.e. seconds for
 *   13-digit timestamps)
 * - Array: items are encoded recursively and joined as `[a,b,c]`
 * - null and any other object: `\N` (other objects also log a warning)
 * - boolean: the number 1 or 0
 *
 * @param {boolean} wrapString enclose string values in single quotes
 * @param {*} v value to encode
 * @param {string} [format] record format; not used by the encoder itself,
 *   only passed through to nested array items
 * @returns {string|number|undefined} encoded field; undefined for types which
 *   are not handled (e.g. undefined, function)
 */
function encodeValue (wrapString, v, format) {
	switch (typeof v) {
		case 'string':
			// backslash must be escaped first, otherwise the backslashes
			// added by the following replacements would be escaped again
			var escaped = v
				.replace (/\\/g, '\\\\')
				.replace (/\t/g, '\\t')
				.replace (/\n/g, '\\n');
			if (!wrapString)
				return escaped;
			// strings inside arrays are written in single quotes
			return "'" + escaped.replace (/'/g, "\\'") + "'";
		case 'number':
			if (isNaN (v))
				return 'nan';
			if (v === +Infinity)
				return '+inf';
			if (v === -Infinity)
				return '-inf';
			return v.toString ();
		case 'object':
			if (v instanceof Date)
				return ("" + v.valueOf ()).substr (0, 10);
			if (v instanceof Array)
				return '[' + v.map (function (item) {
					return encodeValue (true, item, format);
				}).join (',') + ']';
			if (v === null)
				return '\\N';
			// unknown object type: warn before falling back to NULL
			console.warn ('Cannot stringify [Object]:', v);
			return '\\N';
		case 'boolean':
			return v === true ? 1 : 0;
	}
}
function encodeTSRow (row) {
return (encodeValue.bind (this, false));
function parseError (e) {
var fields = new Error (e.toString ('utf8'));
e.toString ('utf8')
.split (/\,\s+(?=e\.)/gm)
.map (function (f) {
f = f.trim ().split (/\n/gm).join ('');
var m;
if (m = f.match (/^(?:Error: )?Code: (\d+)$/)) {
fields.code = parseInt (m[1]);
} else if (m = f.match (/^e\.displayText\(\) = ([A-Za-z0-9\:]+:) ([^]+)/m)) {
// e.displayText() = DB::Exception: Syntax error: failed at position 0: SEL
fields.scope = m[1];
fields.message = m[2];
if (m = fields.message.match (/Syntax error: (?:failed at position (\d+)(?:\s*\(line\s*(\d+)\,\s+col\s*(\d+)\))?)/)) {
// console.log ('!!! syntax error: pos %s line %s col %s', m[1], m[2], m[3]);
fields.lineno = parseInt (m[2] || 1, 10);
fields.colno = parseInt (m[3] || m[1], 10);
} else if (m = f.match (/^e\.what\(\) = (.*)/)) {
fields.type = m[1];
} else {
error = new Buffer (0);
console.warn ('Unknown error field:', f)
function errorHandler (e) {
stream.emit ('error', e);
return cb && cb (e);
// In case of error, we're just throw away data
response.on ('error', errorHandler);
return fields;
// TODO: use streaming interface
// from
// or or
// or
function httpResponseHandler (stream, reqParams, reqData, cb, response) {
var str;
var error;
// or implement it yourself
var jsonParser = new JSONStream (stream);
if (response.statusCode === 200) {
str = Buffer.alloc ? Buffer.alloc (0) : new Buffer (0);
} else {
error = Buffer.alloc ? Buffer.alloc (0) : new Buffer (0);
var symbolsTransferred = 0;
function errorHandler (e) {
var err = parseError (e);
//another chunk of data has been received, so append it to `str`
response.on ('data', function (chunk) {
// user should define callback or add event listener for the error event
if (!cb || (cb && stream.listeners ('error').length))
stream.emit ('error', err);
return cb && cb (err);
symbolsTransferred += chunk.length;
// In case of error, we're just throw away data
response.on ('error', errorHandler);
// JSON response
if (
&& response.headers['content-type'].indexOf ('application/json') === 0
&& !reqData.syncParser
&& chunk.lastIndexOf ("\n") !== -1
) {
// TODO: use streaming interface
// from
// or or
// or
// store in buffer anything after
var newLinePos = chunk.lastIndexOf ("\n");
// or implement it yourself
var jsonParser = new JSONStream (stream);
var remains = chunk.slice (newLinePos + 1);
var symbolsTransferred = 0;
Buffer.concat([str, chunk.slice (0, newLinePos)])
.toString ('utf8')
.split ("\n")
.forEach (jsonParser);
//another chunk of data has been received, so append it to `str`
response.on ('data', function (chunk) {
jsonParser.rows.forEach (function (row) {
// write to readable stream
stream.push (row);
symbolsTransferred += chunk.length;
jsonParser.rows = [];
// JSON response
if (
&& response.headers['content-type'].indexOf ('application/json') === 0
&& !reqData.syncParser
&& chunk.lastIndexOf ("\n") !== -1
&& str
) {
str = remains;
// store in buffer anything after
var newLinePos = chunk.lastIndexOf ("\n");
var remains = chunk.slice (newLinePos + 1);
Buffer.concat([str, chunk.slice (0, newLinePos)])
.toString ('utf8')
.split ("\n")
.forEach (jsonParser);
jsonParser.rows.forEach (function (row) {
// write to readable stream
stream.push (row);
jsonParser.rows = [];
str = remains;
// plaintext response
} else if (str) {
str = Buffer.concat ([str, chunk]);
} else {
error = Buffer.concat ([error, chunk]);
} else if (str) {
str = Buffer.concat ([str, chunk]);
} else {
error = Buffer.concat ([error, chunk]);
//the whole response has been received, so we just print it out here
response.on('end', function () {
//the whole response has been received, so we just print it out here
response.on('end', function () {
// debug (response.headers);
// debug (response.headers);
if (error) {
return errorHandler (new Error (error.toString ('utf8')))
if (error) {
return errorHandler (error);
var data;
var data;
var contentType = response.headers['content-type'];
var contentType = response.headers['content-type'];
if (response.statusCode === 200 && (
|| contentType.indexOf ('text/plain') === 0
|| contentType.indexOf ('text/html') === 0 // WTF: xenial - no content-type, precise - text/html
)) {
// probably this is a ping response or any other successful response with *empty* body
stream.push (null);
cb && cb (null, str.toString ('utf8'));
if (response.statusCode === 200 && (
|| contentType.indexOf ('text/plain') === 0
|| contentType.indexOf ('text/html') === 0 // WTF: xenial - no content-type, precise - text/html
)) {
// probably this is a ping response or any other successful response with *empty* body
stream.push (null);
cb && cb (null, str.toString ('utf8'));
var supplemental = {};
// we already pushed all the data
if (jsonParser.columns.length) {
try {
supplemental = JSON.parse (jsonParser.supplementalString + str.toString ('utf8'));
} catch (e) {
stream.supplemental = supplemental;
var supplemental = {};
// end stream
stream.push (null);
if (jsonParser.columns.length) {
try {
supplemental = JSON.parse (jsonParser.supplementalString + str.toString ('utf8'));
} catch (e) {
stream.supplemental = supplemental;
cb && cb (null, Object.assign ({}, supplemental, {
meta: jsonParser.columns,
transferred: symbolsTransferred
// end stream
stream.push (null);
cb && cb (null, Object.assign ({}, supplemental, {
meta: jsonParser.columns,
transferred: symbolsTransferred
// one shot data parsing, should be much faster for smaller datasets
try {
data = JSON.parse (str.toString ('utf8'));
data.transferred = symbolsTransferred;
if (data.meta) {
stream.emit ('metadata', data.meta);
// one shot data parsing, should be much faster for smaller datasets
try {
data = JSON.parse (str.toString ('utf8'));
if ( {
// no highWatermark support (function (row) {
stream.push (row);
data.transferred = symbolsTransferred;
stream.push (null);
} catch (e) {
return errorHandler (e);
if (data.meta) {
stream.emit ('metadata', data.meta);
cb && cb (null, data);
if ( {
// no highWatermark support (function (row) {
stream.push (row);
stream.push (null);
} catch (e) {
return errorHandler (e);
function httpRequest (reqParams, reqData, cb) {
cb && cb (null, data);
if (reqParams.query) {
reqParams.path = (reqParams.pathname || reqParams.path) + '?' + qs.stringify (reqParams.query);
var stream = new RecordStream ({
recordFormat: 'TabSeparated'
var req = http.request (reqParams, onResponse);
var req = http.request (reqParams, httpResponseHandler.bind (this, stream, reqParams, reqData, cb));
stream.req = req;
if (reqData.query)

@@ -219,2 +396,4 @@ req.write (reqData.query);

urlObject.port = urlObject.port || 8123;
return urlObject;

@@ -263,12 +442,22 @@ }

// we need to handler 2 and 3 and not to close http stream in that cases
// we need to handle 2 and 3, and the http stream must stay open in those cases
if (chQuery.match (/VALUES$/i)) {
reqData.finalized = false;
// simplest format to use, only need to escape \t, \\ and \n
formatSuffix = ' FORMAT TabSeparated';
// TODO: use values format
// formatSuffix = ' FORMAT TabSeparated ';
} else if (!chQuery.match (/VALUES/i)) {
reqData.finalized = false;
if (!chQuery.match (/FORMAT/i)) {
// simplest format to use, only need to escape \t, \\ and \n
formatSuffix = ' FORMAT TabSeparated ';
} else {
// otherwise, we will allow user to send prepared strings/buffers
// use query string to submit ClickHouse query — usefuful to mock CH server
// use query string to submit ClickHouse query — useful to mock CH server
if (this.options.useQueryString) {

@@ -282,3 +471,3 @@ queryObject.query = chQuery + ((options.omitFormat) ? '' : formatSuffix);

reqParams.path += '?' + qs.stringify (queryObject);
reqParams.query = queryObject;

@@ -290,2 +479,13 @@ var stream = httpRequest (reqParams, reqData, cb);

ClickHouse.prototype.querying = function (chQuery, options) {
return new Promise (function (resolve, reject) {
var stream = this.query (chQuery, options, function (err, data) {
if (err)
return reject (err);
resolve (data);
}.bind (this));
} = function (cb) {

@@ -302,2 +502,17 @@

ClickHouse.prototype.pinging = function () {
return new Promise (function (resolve, reject) {
var reqParams = this.getReqParams ();
reqParams.method = 'GET';
httpRequest (reqParams, {finalized: true}, function (err, data) {
if (err)
return reject (err);
resolve (data);
}.bind (this));
module.exports = ClickHouse;

@@ -9,3 +9,3 @@ var ClickHouse = require ("../src/clickhouse");

describe ("real server queries", function () {
describe ("real server", function () {

@@ -20,3 +20,3 @@ var server, (function (err, ok) {
assert (!err);
assert.ifError (err);
assert.equal (ok, "Ok.\n", "ping response should be 'Ok.\\n'");

@@ -27,8 +27,21 @@ done ();

it ("selects using callback", function (done) {
var ch = new ClickHouse ({host: host, port: port, useQueryString: true});
ch.query ("SELECT 1", {syncParser: true}, function (err, result) {
assert (!err);
assert (result.meta, "result should be Object with `data` key to represent rows");
assert (, "result should be Object with `meta` key to represent column info");
it ("pinging using promise interface", function () {
var ch = new ClickHouse ({host: host, port: port});
return ch.pinging ();
it ("pinging using promise interface with bad connection option", function () {
var ch = new ClickHouse ();
return ch.pinging ().then (function () {
return Promise.reject (new Error ("Driver should throw without host name"))
}, function (e) {
return Promise.resolve ();
it ("pings with options as host", function (done) {
var ch = new ClickHouse (host); (function (err, ok) {
assert.ifError (err);
assert.equal (ok, "Ok.\n", "ping response should be 'Ok.\\n'");
done ();

@@ -38,2 +51,7 @@ });

it ("nothing to ping", function () {
var ch = new ClickHouse ();
assert (ch);
it ("returns error", function (done) {

@@ -48,3 +66,3 @@ var ch = new ClickHouse ({host: host, port: port, useQueryString: true});

assert (err);
console.log (err);
// console.log (err);

@@ -54,25 +72,7 @@ });

it ("selects using callback and query submitted in the POST body", function (done) {
it ("selects from system columns", function (done) {
var ch = new ClickHouse ({host: host, port: port});
ch.query ("SELECT 1", {syncParser: true}, function (err, result) {
ch.query ("SELECT * FROM system.columns", function (err, result) {
assert (!err);
assert (result.meta, "result should be Object with `data` key to represent rows");
assert (, "result should be Object with `meta` key to represent column info");
done ();
it ("selects numbers using callback", function (done) {
var ch = new ClickHouse ({host: host, port: port, useQueryString: true});
ch.query ("SELECT number FROM system.numbers LIMIT 10", {syncParser: true}, function (err, result) {
assert (!err);
assert (result.meta, "result should be Object with `data` key to represent rows");
assert (, "result should be Object with `meta` key to represent column info");
assert (result.meta.constructor === Array, "metadata is an array with column descriptions");
assert (result.meta[0].name === "number");
assert ( === Array, "data is a row set");
assert ([0].constructor === Array, "each row contains list of values (using FORMAT JSONCompact)");
assert ([9][0] === "9"); // this should be corrected at database side
assert (result.rows === 10);
assert (result.rows_before_limit_at_least === 10);
done ();

@@ -82,67 +82,6 @@ });

it ("selects numbers using callback and query submitted in the POST body", function (done) {
var ch = new ClickHouse ({host: host, port: port});
ch.query ("SELECT number FROM system.numbers LIMIT 10", {syncParser: true}, function (err, result) {
assert (!err);
assert (result.meta, "result should be Object with `meta` key to represent rows");
assert (, "result should be Object with `data` key to represent column info");
assert (result.meta.constructor === Array, "metadata is an array with column descriptions");
assert (result.meta[0].name === "number");
assert ( === Array, "data is a row set");
assert ([0].constructor === Array, "each row contains list of values (using FORMAT JSONCompact)");
assert ([9][0] === "9"); // this should be corrected at database side
assert (result.rows === 10);
assert (result.rows_before_limit_at_least === 10);
done ();
it ("selects numbers asynchronously using events and query submitted in the POST body", function (done) {
var ch = new ClickHouse ({host: host, port: port});
var rows = [];
var stream = ch.query ("SELECT number FROM system.numbers LIMIT 10", function (err, result) {
assert (!err);
assert (result.meta, "result should be Object with `meta` key to represent rows");
assert (rows, "result should be Object with `data` key to represent column info");
assert (result.meta.constructor === Array, "metadata is an array with column descriptions");
assert (result.meta[0].name === "number");
assert (rows.length === 10, "total 10 rows");
assert (rows[0].constructor === Array, "each row contains list of values (using FORMAT JSONCompact)");
assert (rows[9][0] === "9"); // this should be corrected at database side
assert (result.rows === 10);
assert (result.rows_before_limit_at_least === 10);
done ();
stream.on ('data', function (row) {
rows.push (row);
it ("selects numbers asynchronously using stream and query submitted in the POST body", function (done) {
var ch = new ClickHouse ({host: host, port: port});
var metadata;
var rows = [];
var stream = ch.query ("SELECT number FROM system.numbers LIMIT 10");
stream.on ('metadata', function (_meta) {
metadata = _meta;
stream.on ('data', function (row) {
rows.push (row);
stream.on ('error', function (err) {
it ("selects from system columns no more than 10 rows throws exception", function (done) {
var ch = new ClickHouse ({host: host, port: port, queryOptions: {max_rows_to_read: 10}});
ch.query ("SELECT * FROM system.columns", function (err, result) {
assert (err);
stream.on ('end', function () {
assert (metadata, "result should be Object with `meta` key to represent rows");
assert (rows, "result should be Object with `data` key to represent column info");
assert (metadata.constructor === Array, "metadata is an array with column descriptions");
assert (metadata[0].name === "number");
assert (rows.length === 10, "total 10 rows");
assert (rows[0].constructor === Array, "each row contains list of values (using FORMAT JSONCompact)");
assert (rows[9][0] === "9"); // this should be corrected at database side
assert (stream.supplemental.rows === 10);
assert (stream.supplemental.rows_before_limit_at_least === 10);

@@ -153,41 +92,2 @@ done ();

it ("selects number objects asynchronously using stream and query submitted in the POST body", function (done) {
var ch = new ClickHouse ({host: host, port: port});
var metadata;
var rows = [];
var stream = ch.query ("SELECT number FROM system.numbers LIMIT 10", {dataObjects: true});
stream.on ('metadata', function (_meta) {
metadata = _meta;
stream.on ('data', function (row) {
rows.push (row);
stream.on ('error', function (err) {
assert (err);
stream.on ('end', function () {
assert (metadata, "result should be Object with `meta` key to represent rows");
assert (rows, "result should be Object with `data` key to represent column info");
assert (metadata.constructor === Array, "metadata is an array with column descriptions");
assert (metadata[0].name === "number");
assert (rows.length === 10, "total 10 rows");
assert ('number' in rows[0], "each row contains fields (using FORMAT JSON)");
assert (rows[9].number === "9"); // this should be corrected at database side
assert (stream.supplemental.rows === 10);
assert (stream.supplemental.rows_before_limit_at_least === 10);
done ();
it ("selects from system columns", function (done) {
var ch = new ClickHouse ({host: host, port: port});
ch.query ("SELECT * FROM system.columns", function (err, result) {
assert (!err);
done ();
it ("creates a database", function (done) {

@@ -194,0 +94,0 @@ var ch = new ClickHouse ({host: host, port: port});

SocketSocket SOC 2 Logo


  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog



Stay in touch

Get open source security insights delivered straight into your inbox.

  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc