Socket
Socket
Sign inDemoInstall

@clickhouse/client

Package Overview
Dependencies
Maintainers
4
Versions
44
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@clickhouse/client - npm Package Compare versions

Comparing version 0.2.10 to 0.3.0-beta.1

dist/connection/compression.d.ts

17

dist/client.d.ts

@@ -13,15 +13,6 @@ /// <reference types="node" />

enabled?: boolean;
/** How long to keep a particular open socket alive
* on the client side (in milliseconds).
* Should be less than the server setting
* (see `keep_alive_timeout` in server's `config.xml`).
* Currently, has no effect if {@link retry_on_expired_socket}
* is unset or false. Default value: 2500
* (based on the default ClickHouse server setting, which is 3000) */
socket_ttl?: number;
/** If the client detects a potentially expired socket based on the
* {@link socket_ttl}, this socket will be immediately destroyed
* before sending the request, and this request will be retried
* with a new socket up to 3 times. Default: false (no retries) */
retry_on_expired_socket?: boolean;
/** For how long keep a particular idle socket alive on the client side (in milliseconds).
* It is supposed to be a fair bit less that the ClickHouse server KeepAlive timeout, which is by default 3000 ms.
* Default value: 2500 */
idle_socket_ttl?: number;
};

@@ -28,0 +19,0 @@ };

@@ -26,4 +26,3 @@ "use strict";

enabled: config?.keep_alive?.enabled ?? true,
socket_ttl: config?.keep_alive?.socket_ttl ?? 2500,
retry_on_expired_socket: config?.keep_alive?.retry_on_expired_socket ?? false,
idle_socket_ttl: config?.keep_alive?.idle_socket_ttl ?? 2500,
};

@@ -30,0 +29,0 @@ return new client_common_1.ClickHouseClient({

@@ -11,4 +11,3 @@ /// <reference types="node" />

enabled: boolean;
socket_ttl: number;
retry_on_expired_socket: boolean;
idle_socket_ttl: number;
};

@@ -29,3 +28,3 @@ };

body?: string | Stream.Readable;
abort_signal?: AbortSignal;
abort_signal: AbortSignal;
decompress_response?: boolean;

@@ -40,4 +39,4 @@ compress_request?: boolean;

private readonly logger;
private readonly retry_expired_sockets;
private readonly known_sockets;
private readonly knownSockets;
private readonly idleSocketTTL;
protected constructor(params: NodeConnectionParams, agent: Http.Agent);

@@ -47,3 +46,2 @@ protected buildDefaultHeaders(username: string, password: string, additional_headers?: Record<string, string>): Http.OutgoingHttpHeaders;

private request;
private _request;
ping(): Promise<ConnPingResult>;

@@ -54,5 +52,8 @@ query(params: ConnBaseQueryParams): Promise<ConnQueryResult<Stream.Readable>>;

close(): Promise<void>;
private getQueryId;
private getAbortController;
private logResponse;
private drainHttpResponse;
private logRequestError;
private httpRequestErrorMessage;
private parseSummary;
}

@@ -12,3 +12,4 @@ "use strict";

const utils_1 = require("../utils");
const expiredSocketMessage = 'expired socket';
const compression_1 = require("./compression");
const stream_2 = require("./stream");
class NodeBaseConnection {

@@ -40,17 +41,16 @@ constructor(params, agent) {

});
Object.defineProperty(this, "retry_expired_sockets", {
Object.defineProperty(this, "knownSockets", {
enumerable: true,
configurable: true,
writable: true,
value: void 0
value: new WeakMap()
});
Object.defineProperty(this, "known_sockets", {
Object.defineProperty(this, "idleSocketTTL", {
enumerable: true,
configurable: true,
writable: true,
value: new WeakMap()
value: void 0
});
this.logger = params.logWriter;
this.retry_expired_sockets =
params.keep_alive.enabled && params.keep_alive.retry_on_expired_socket;
this.logger = params.log_writer;
this.idleSocketTTL = params.keep_alive.idle_socket_ttl;
this.headers = this.buildDefaultHeaders(params.username, params.password, params.additional_headers);

@@ -60,2 +60,4 @@ }

return {
// KeepAlive agent for some reason does not set this on its own
Connection: this.params.keep_alive.enabled ? 'keep-alive' : 'close',
Authorization: `Basic ${Buffer.from(`${username}:${password}`).toString('base64')}`,

@@ -66,23 +68,3 @@ 'User-Agent': (0, utils_1.getUserAgent)(this.params.application_id),

}
async request(params, retryCount = 0) {
try {
return await this._request(params);
}
catch (e) {
if (e instanceof Error && e.message === expiredSocketMessage) {
if (this.retry_expired_sockets && retryCount < 3) {
this.logger.trace({
module: 'Connection',
message: `Keep-Alive socket is expired, retrying with a new one, retries so far: ${retryCount}`,
});
return await this.request(params, retryCount + 1);
}
else {
throw new Error(`Socket hang up after ${retryCount} retries`);
}
}
throw e;
}
}
async _request(params) {
async request(params, op) {
return new Promise((resolve, reject) => {

@@ -96,5 +78,5 @@ const start = Date.now();

const onResponse = async (_response) => {
this.logResponse(request, params, _response, start);
const decompressionResult = decompressResponse(_response);
if (isDecompressionError(decompressionResult)) {
this.logResponse(op, request, params, _response, start);
const decompressionResult = (0, compression_1.decompressResponse)(_response);
if ((0, compression_1.isDecompressionError)(decompressionResult)) {
return reject(decompressionResult.error);

@@ -106,3 +88,3 @@ }

summary: params.parse_summary
? this.parseSummary(_response)
? this.parseSummary(op, _response)
: undefined,

@@ -155,58 +137,63 @@ });

const onSocket = (socket) => {
if (this.retry_expired_sockets) {
// if socket is reused
const socketInfo = this.known_sockets.get(socket);
if (socketInfo !== undefined) {
if (this.params.keep_alive.enabled) {
const socketInfo = this.knownSockets.get(socket);
// It is the first time we encounter this socket,
// so it doesn't have the idle timeout handler attached to it
if (socketInfo === undefined) {
const socketId = crypto_1.default.randomUUID();
this.logger.trace({
module: 'Connection',
message: `Reused socket ${socketInfo.id}`,
message: `Using a fresh socket ${socketId}, setting up a new 'free' listener`,
});
// if a socket was reused at an unfortunate time,
// and is likely about to expire
const isPossiblyExpired = Date.now() - socketInfo.last_used_time >
this.params.keep_alive.socket_ttl;
if (isPossiblyExpired) {
this.knownSockets.set(socket, {
id: socketId,
idle_timeout_handle: undefined,
});
// When the request is complete and the socket is released,
// make sure that the socket is removed after `idleSocketTTL`.
socket.on('free', () => {
this.logger.trace({
module: 'Connection',
message: 'Socket should be expired - terminate it',
message: `Socket ${socketId} was released`,
});
this.known_sockets.delete(socket);
socket.destroy(); // immediately terminate the connection
request.destroy();
reject(new Error(expiredSocketMessage));
}
else {
// Avoiding the built-in socket.timeout() method usage here,
// as we don't want to clash with the actual request timeout.
const idleTimeoutHandle = setTimeout(() => {
this.logger.trace({
message: `Removing socket ${socketId} after ${this.idleSocketTTL} ms of idle`,
});
this.knownSockets.delete(socket);
socket.destroy();
}, this.idleSocketTTL).unref();
this.knownSockets.set(socket, {
id: socketId,
idle_timeout_handle: idleTimeoutHandle,
});
});
const cleanup = () => {
const maybeSocketInfo = this.knownSockets.get(socket);
// clean up a possibly dangling idle timeout handle (preventing leaks)
if (maybeSocketInfo?.idle_timeout_handle) {
clearTimeout(maybeSocketInfo.idle_timeout_handle);
}
this.logger.trace({
module: 'Connection',
message: `Socket ${socketInfo.id} is safe to be reused`,
message: `Socket ${socketId} was closed or ended, 'free' listener removed`,
});
this.known_sockets.set(socket, {
id: socketInfo.id,
last_used_time: Date.now(),
});
pipeStream();
}
};
socket.once('end', cleanup);
socket.once('close', cleanup);
}
else {
const socketId = crypto_1.default.randomUUID();
clearTimeout(socketInfo.idle_timeout_handle);
this.logger.trace({
module: 'Connection',
message: `Using a new socket ${socketId}`,
message: `Reusing socket ${socketInfo.id}`,
});
this.known_sockets.set(socket, {
id: socketId,
last_used_time: Date.now(),
this.knownSockets.set(socket, {
...socketInfo,
idle_timeout_handle: undefined,
});
pipeStream();
}
}
else {
// no need to track the reused sockets;
// keep alive is disabled or retry mechanism is not enabled
pipeStream();
}
// this is for request timeout only.
// The socket won't be actually destroyed,
// and it will be returned to the pool.
// TODO: investigate if can actually remove the idle sockets properly
// Socket is "prepared" with idle handlers, continue with our request
pipeStream();
// This is for request timeout only. Surprisingly, it is not always enough to set in the HTTP request.
// The socket won't be actually destroyed, and it will be returned to the pool.
socket.setTimeout(this.params.request_timeout, onTimeout);

@@ -244,2 +231,3 @@ };

async ping() {
const abortController = new AbortController();
try {

@@ -249,18 +237,24 @@ const { stream } = await this.request({

url: (0, client_common_1.transformUrl)({ url: this.params.url, pathname: '/ping' }),
});
stream.destroy();
abort_signal: abortController.signal,
}, 'Ping');
await (0, stream_2.drainStream)(stream);
return { success: true };
}
catch (error) {
if (error instanceof Error) {
return {
success: false,
error,
};
}
throw error; // should never happen
// it is used to ensure that the outgoing request is terminated,
// and we don't get an unhandled error propagation later
abortController.abort('Ping failed');
// not an error, as this might be semi-expected
this.logger.warn({
message: this.httpRequestErrorMessage('Ping'),
err: error,
});
return {
success: false,
error: error, // should NOT be propagated to the user
};
}
}
async query(params) {
const query_id = getQueryId(params.query_id);
const query_id = this.getQueryId(params.query_id);
const clickhouse_settings = (0, client_common_1.withHttpSettings)(params.clickhouse_settings, this.params.compression.decompress_response);

@@ -274,16 +268,38 @@ const searchParams = (0, client_common_1.toSearchParams)({

});
const { stream } = await this.request({
method: 'POST',
url: (0, client_common_1.transformUrl)({ url: this.params.url, searchParams }),
body: params.query,
abort_signal: params.abort_signal,
decompress_response: clickhouse_settings.enable_http_compression === 1,
});
return {
stream,
query_id,
};
const decompressResponse = clickhouse_settings.enable_http_compression === 1;
const { controller, controllerCleanup } = this.getAbortController(params);
try {
const { stream } = await this.request({
method: 'POST',
url: (0, client_common_1.transformUrl)({ url: this.params.url, searchParams }),
body: params.query,
abort_signal: controller.signal,
decompress_response: decompressResponse,
}, 'Query');
return {
stream,
query_id,
};
}
catch (err) {
controller.abort('Query HTTP request failed');
this.logRequestError({
op: 'Query',
query_id: query_id,
query_params: params,
search_params: searchParams,
err: err,
extra_args: {
decompress_response: decompressResponse,
clickhouse_settings,
},
});
throw err; // should be propagated to the user
}
finally {
controllerCleanup();
}
}
async exec(params) {
const query_id = getQueryId(params.query_id);
const query_id = this.getQueryId(params.query_id);
const searchParams = (0, client_common_1.toSearchParams)({

@@ -296,17 +312,37 @@ database: this.params.database,

});
const { stream, summary } = await this.request({
method: 'POST',
url: (0, client_common_1.transformUrl)({ url: this.params.url, searchParams }),
body: params.query,
abort_signal: params.abort_signal,
parse_summary: true,
});
return {
stream,
query_id,
summary,
};
const { controller, controllerCleanup } = this.getAbortController(params);
try {
const { stream, summary } = await this.request({
method: 'POST',
url: (0, client_common_1.transformUrl)({ url: this.params.url, searchParams }),
body: params.query,
abort_signal: controller.signal,
parse_summary: true,
}, 'Exec');
return {
stream,
query_id,
summary,
};
}
catch (err) {
controller.abort('Exec HTTP request failed');
this.logRequestError({
op: 'Exec',
query_id: query_id,
query_params: params,
search_params: searchParams,
err: err,
extra_args: {
clickhouse_settings: params.clickhouse_settings ?? {},
},
});
throw err; // should be propagated to the user
}
finally {
controllerCleanup();
}
}
async insert(params) {
const query_id = getQueryId(params.query_id);
const query_id = this.getQueryId(params.query_id);
const searchParams = (0, client_common_1.toSearchParams)({

@@ -320,12 +356,32 @@ database: this.params.database,

});
const { stream, summary } = await this.request({
method: 'POST',
url: (0, client_common_1.transformUrl)({ url: this.params.url, searchParams }),
body: params.values,
abort_signal: params.abort_signal,
compress_request: this.params.compression.compress_request,
parse_summary: true,
});
await this.drainHttpResponse(stream);
return { query_id, summary };
const { controller, controllerCleanup } = this.getAbortController(params);
try {
const { stream, summary } = await this.request({
method: 'POST',
url: (0, client_common_1.transformUrl)({ url: this.params.url, searchParams }),
body: params.values,
abort_signal: controller.signal,
compress_request: this.params.compression.compress_request,
parse_summary: true,
}, 'Insert');
await (0, stream_2.drainStream)(stream);
return { query_id, summary };
}
catch (err) {
controller.abort('Insert HTTP request failed');
this.logRequestError({
op: 'Insert',
query_id: query_id,
query_params: params,
search_params: searchParams,
err: err,
extra_args: {
clickhouse_settings: params.clickhouse_settings ?? {},
},
});
throw err; // should be propagated to the user
}
finally {
controllerCleanup();
}
}

@@ -337,9 +393,26 @@ async close() {

}
logResponse(request, params, response, startTimestamp) {
getQueryId(query_id) {
return query_id || crypto_1.default.randomUUID();
}
// a wrapper over the user's Signal to terminate the failed requests
getAbortController(params) {
const controller = new AbortController();
function onAbort() {
controller.abort();
}
params.abort_signal?.addEventListener('abort', onAbort);
return {
controller,
controllerCleanup: () => {
params.abort_signal?.removeEventListener('abort', onAbort);
},
};
}
logResponse(op, request, params, response, startTimestamp) {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { authorization, host, ...headers } = request.getHeaders();
const duration = Date.now() - startTimestamp;
this.params.logWriter.debug({
this.params.log_writer.debug({
module: 'HTTP Adapter',
message: 'Got a response from ClickHouse',
message: `${op}: got a response from ClickHouse`,
args: {

@@ -356,31 +429,20 @@ request_method: params.method,

}
async drainHttpResponse(stream) {
return new Promise((resolve, reject) => {
function dropData() {
// We don't care about the data
}
function onEnd() {
removeListeners();
resolve();
}
function onError(err) {
removeListeners();
reject(err);
}
function onClose() {
removeListeners();
}
function removeListeners() {
stream.removeListener('data', dropData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onError);
stream.removeListener('onClose', onClose);
}
stream.on('data', dropData);
stream.on('end', onEnd);
stream.on('error', onError);
stream.on('close', onClose);
logRequestError({ op, err, query_id, query_params, search_params, extra_args, }) {
this.logger.error({
message: this.httpRequestErrorMessage(op),
err: err,
args: {
query: query_params.query,
search_params: search_params?.toString() ?? '',
with_abort_signal: query_params.abort_signal !== undefined,
session_id: query_params.session_id,
query_id: query_id,
...extra_args,
},
});
}
parseSummary(response) {
httpRequestErrorMessage(op) {
return `${op}: HTTP request error.`;
}
parseSummary(op, response) {
const summaryHeader = response.headers['x-clickhouse-summary'];

@@ -393,4 +455,6 @@ if (typeof summaryHeader === 'string') {

this.logger.error({
module: 'Connection',
message: `Failed to parse X-ClickHouse-Summary header, got: ${summaryHeader}`,
message: `${op}: failed to parse X-ClickHouse-Summary header.`,
args: {
'X-ClickHouse-Summary': summaryHeader,
},
err: err,

@@ -403,26 +467,2 @@ });

exports.NodeBaseConnection = NodeBaseConnection;
function decompressResponse(response) {
const encoding = response.headers['content-encoding'];
if (encoding === 'gzip') {
return {
response: stream_1.default.pipeline(response, zlib_1.default.createGunzip(), function pipelineCb(err) {
if (err) {
console.error(err);
}
}),
};
}
else if (encoding !== undefined) {
return {
error: new Error(`Unexpected encoding: ${encoding}`),
};
}
return { response };
}
function isDecompressionError(result) {
return result.error !== undefined;
}
function getQueryId(query_id) {
return query_id || crypto_1.default.randomUUID();
}
//# sourceMappingURL=node_base_connection.js.map

@@ -22,2 +22,3 @@ "use strict";

agent: this.agent,
timeout: this.params.request_timeout,
headers: (0, client_common_1.withCompressionHeaders)({

@@ -24,0 +25,0 @@ headers: this.headers,

@@ -41,2 +41,3 @@ "use strict";

agent: this.agent,
timeout: this.params.request_timeout,
headers: (0, client_common_1.withCompressionHeaders)({

@@ -43,0 +44,0 @@ headers: this.headers,

export { createConnection, createClient } from './client';
export { ResultSet } from './result_set';
/** Re-export @clickhouse/client-common types */
export { type BaseClickHouseClientConfigOptions, type ClickHouseClientConfigOptions, type BaseQueryParams, type QueryParams, type ExecParams, type InsertParams, type InsertValues, type CommandParams, type CommandResult, type ExecResult, type InsertResult, type DataFormat, type ErrorLogParams, type Logger, type LogParams, type ClickHouseSettings, type MergeTreeSettings, type Row, type ResponseJSON, type InputJSON, type InputJSONObjectEachRow, type BaseResultSet, type PingResult, ClickHouseError, ClickHouseLogLevel, ClickHouseClient, SettingsMap, } from '@clickhouse/client-common';
export { type BaseClickHouseClientConfigOptions, type ClickHouseClientConfigOptions, type BaseQueryParams, type QueryParams, type ExecParams, type InsertParams, type InsertValues, type CommandParams, type CommandResult, type ExecResult, type InsertResult, type DataFormat, type Logger, type LogParams, type ErrorLogParams, type WarnLogParams, type ClickHouseSettings, type MergeTreeSettings, type Row, type ResponseJSON, type InputJSON, type InputJSONObjectEachRow, type BaseResultSet, type PingResult, ClickHouseError, ClickHouseLogLevel, ClickHouseClient, SettingsMap, } from '@clickhouse/client-common';

@@ -1,2 +0,2 @@

declare const _default: "0.2.10";
declare const _default: "0.3.0-beta.1";
export default _default;
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.default = '0.2.10';
exports.default = '0.3.0-beta.1';
//# sourceMappingURL=version.js.map

@@ -5,3 +5,3 @@ {

"homepage": "https://clickhouse.com",
"version": "0.2.10",
"version": "0.3.0-beta.1",
"license": "Apache-2.0",

@@ -27,4 +27,4 @@ "keywords": [

"dependencies": {
"@clickhouse/client-common": "0.2.10"
"@clickhouse/client-common": "0.3.0-beta.1"
}
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc