@clickhouse/client
Advanced tools
Comparing version 0.2.10 to 0.3.0-beta.1
@@ -13,15 +13,6 @@ /// <reference types="node" /> | ||
enabled?: boolean; | ||
/** How long to keep a particular open socket alive | ||
* on the client side (in milliseconds). | ||
* Should be less than the server setting | ||
* (see `keep_alive_timeout` in server's `config.xml`). | ||
* Currently, has no effect if {@link retry_on_expired_socket} | ||
* is unset or false. Default value: 2500 | ||
* (based on the default ClickHouse server setting, which is 3000) */ | ||
socket_ttl?: number; | ||
/** If the client detects a potentially expired socket based on the | ||
* {@link socket_ttl}, this socket will be immediately destroyed | ||
* before sending the request, and this request will be retried | ||
* with a new socket up to 3 times. Default: false (no retries) */ | ||
retry_on_expired_socket?: boolean; | ||
/** For how long keep a particular idle socket alive on the client side (in milliseconds). | ||
* It is supposed to be a fair bit less that the ClickHouse server KeepAlive timeout, which is by default 3000 ms. | ||
* Default value: 2500 */ | ||
idle_socket_ttl?: number; | ||
}; | ||
@@ -28,0 +19,0 @@ }; |
@@ -26,4 +26,3 @@ "use strict"; | ||
enabled: config?.keep_alive?.enabled ?? true, | ||
socket_ttl: config?.keep_alive?.socket_ttl ?? 2500, | ||
retry_on_expired_socket: config?.keep_alive?.retry_on_expired_socket ?? false, | ||
idle_socket_ttl: config?.keep_alive?.idle_socket_ttl ?? 2500, | ||
}; | ||
@@ -30,0 +29,0 @@ return new client_common_1.ClickHouseClient({ |
@@ -11,4 +11,3 @@ /// <reference types="node" /> | ||
enabled: boolean; | ||
socket_ttl: number; | ||
retry_on_expired_socket: boolean; | ||
idle_socket_ttl: number; | ||
}; | ||
@@ -29,3 +28,3 @@ }; | ||
body?: string | Stream.Readable; | ||
abort_signal?: AbortSignal; | ||
abort_signal: AbortSignal; | ||
decompress_response?: boolean; | ||
@@ -40,4 +39,4 @@ compress_request?: boolean; | ||
private readonly logger; | ||
private readonly retry_expired_sockets; | ||
private readonly known_sockets; | ||
private readonly knownSockets; | ||
private readonly idleSocketTTL; | ||
protected constructor(params: NodeConnectionParams, agent: Http.Agent); | ||
@@ -47,3 +46,2 @@ protected buildDefaultHeaders(username: string, password: string, additional_headers?: Record<string, string>): Http.OutgoingHttpHeaders; | ||
private request; | ||
private _request; | ||
ping(): Promise<ConnPingResult>; | ||
@@ -54,5 +52,8 @@ query(params: ConnBaseQueryParams): Promise<ConnQueryResult<Stream.Readable>>; | ||
close(): Promise<void>; | ||
private getQueryId; | ||
private getAbortController; | ||
private logResponse; | ||
private drainHttpResponse; | ||
private logRequestError; | ||
private httpRequestErrorMessage; | ||
private parseSummary; | ||
} |
@@ -12,3 +12,4 @@ "use strict"; | ||
const utils_1 = require("../utils"); | ||
const expiredSocketMessage = 'expired socket'; | ||
const compression_1 = require("./compression"); | ||
const stream_2 = require("./stream"); | ||
class NodeBaseConnection { | ||
@@ -40,17 +41,16 @@ constructor(params, agent) { | ||
}); | ||
Object.defineProperty(this, "retry_expired_sockets", { | ||
Object.defineProperty(this, "knownSockets", { | ||
enumerable: true, | ||
configurable: true, | ||
writable: true, | ||
value: void 0 | ||
value: new WeakMap() | ||
}); | ||
Object.defineProperty(this, "known_sockets", { | ||
Object.defineProperty(this, "idleSocketTTL", { | ||
enumerable: true, | ||
configurable: true, | ||
writable: true, | ||
value: new WeakMap() | ||
value: void 0 | ||
}); | ||
this.logger = params.logWriter; | ||
this.retry_expired_sockets = | ||
params.keep_alive.enabled && params.keep_alive.retry_on_expired_socket; | ||
this.logger = params.log_writer; | ||
this.idleSocketTTL = params.keep_alive.idle_socket_ttl; | ||
this.headers = this.buildDefaultHeaders(params.username, params.password, params.additional_headers); | ||
@@ -60,2 +60,4 @@ } | ||
return { | ||
// KeepAlive agent for some reason does not set this on its own | ||
Connection: this.params.keep_alive.enabled ? 'keep-alive' : 'close', | ||
Authorization: `Basic ${Buffer.from(`${username}:${password}`).toString('base64')}`, | ||
@@ -66,23 +68,3 @@ 'User-Agent': (0, utils_1.getUserAgent)(this.params.application_id), | ||
} | ||
async request(params, retryCount = 0) { | ||
try { | ||
return await this._request(params); | ||
} | ||
catch (e) { | ||
if (e instanceof Error && e.message === expiredSocketMessage) { | ||
if (this.retry_expired_sockets && retryCount < 3) { | ||
this.logger.trace({ | ||
module: 'Connection', | ||
message: `Keep-Alive socket is expired, retrying with a new one, retries so far: ${retryCount}`, | ||
}); | ||
return await this.request(params, retryCount + 1); | ||
} | ||
else { | ||
throw new Error(`Socket hang up after ${retryCount} retries`); | ||
} | ||
} | ||
throw e; | ||
} | ||
} | ||
async _request(params) { | ||
async request(params, op) { | ||
return new Promise((resolve, reject) => { | ||
@@ -96,5 +78,5 @@ const start = Date.now(); | ||
const onResponse = async (_response) => { | ||
this.logResponse(request, params, _response, start); | ||
const decompressionResult = decompressResponse(_response); | ||
if (isDecompressionError(decompressionResult)) { | ||
this.logResponse(op, request, params, _response, start); | ||
const decompressionResult = (0, compression_1.decompressResponse)(_response); | ||
if ((0, compression_1.isDecompressionError)(decompressionResult)) { | ||
return reject(decompressionResult.error); | ||
@@ -106,3 +88,3 @@ } | ||
summary: params.parse_summary | ||
? this.parseSummary(_response) | ||
? this.parseSummary(op, _response) | ||
: undefined, | ||
@@ -155,58 +137,63 @@ }); | ||
const onSocket = (socket) => { | ||
if (this.retry_expired_sockets) { | ||
// if socket is reused | ||
const socketInfo = this.known_sockets.get(socket); | ||
if (socketInfo !== undefined) { | ||
if (this.params.keep_alive.enabled) { | ||
const socketInfo = this.knownSockets.get(socket); | ||
// It is the first time we encounter this socket, | ||
// so it doesn't have the idle timeout handler attached to it | ||
if (socketInfo === undefined) { | ||
const socketId = crypto_1.default.randomUUID(); | ||
this.logger.trace({ | ||
module: 'Connection', | ||
message: `Reused socket ${socketInfo.id}`, | ||
message: `Using a fresh socket ${socketId}, setting up a new 'free' listener`, | ||
}); | ||
// if a socket was reused at an unfortunate time, | ||
// and is likely about to expire | ||
const isPossiblyExpired = Date.now() - socketInfo.last_used_time > | ||
this.params.keep_alive.socket_ttl; | ||
if (isPossiblyExpired) { | ||
this.knownSockets.set(socket, { | ||
id: socketId, | ||
idle_timeout_handle: undefined, | ||
}); | ||
// When the request is complete and the socket is released, | ||
// make sure that the socket is removed after `idleSocketTTL`. | ||
socket.on('free', () => { | ||
this.logger.trace({ | ||
module: 'Connection', | ||
message: 'Socket should be expired - terminate it', | ||
message: `Socket ${socketId} was released`, | ||
}); | ||
this.known_sockets.delete(socket); | ||
socket.destroy(); // immediately terminate the connection | ||
request.destroy(); | ||
reject(new Error(expiredSocketMessage)); | ||
} | ||
else { | ||
// Avoiding the built-in socket.timeout() method usage here, | ||
// as we don't want to clash with the actual request timeout. | ||
const idleTimeoutHandle = setTimeout(() => { | ||
this.logger.trace({ | ||
message: `Removing socket ${socketId} after ${this.idleSocketTTL} ms of idle`, | ||
}); | ||
this.knownSockets.delete(socket); | ||
socket.destroy(); | ||
}, this.idleSocketTTL).unref(); | ||
this.knownSockets.set(socket, { | ||
id: socketId, | ||
idle_timeout_handle: idleTimeoutHandle, | ||
}); | ||
}); | ||
const cleanup = () => { | ||
const maybeSocketInfo = this.knownSockets.get(socket); | ||
// clean up a possibly dangling idle timeout handle (preventing leaks) | ||
if (maybeSocketInfo?.idle_timeout_handle) { | ||
clearTimeout(maybeSocketInfo.idle_timeout_handle); | ||
} | ||
this.logger.trace({ | ||
module: 'Connection', | ||
message: `Socket ${socketInfo.id} is safe to be reused`, | ||
message: `Socket ${socketId} was closed or ended, 'free' listener removed`, | ||
}); | ||
this.known_sockets.set(socket, { | ||
id: socketInfo.id, | ||
last_used_time: Date.now(), | ||
}); | ||
pipeStream(); | ||
} | ||
}; | ||
socket.once('end', cleanup); | ||
socket.once('close', cleanup); | ||
} | ||
else { | ||
const socketId = crypto_1.default.randomUUID(); | ||
clearTimeout(socketInfo.idle_timeout_handle); | ||
this.logger.trace({ | ||
module: 'Connection', | ||
message: `Using a new socket ${socketId}`, | ||
message: `Reusing socket ${socketInfo.id}`, | ||
}); | ||
this.known_sockets.set(socket, { | ||
id: socketId, | ||
last_used_time: Date.now(), | ||
this.knownSockets.set(socket, { | ||
...socketInfo, | ||
idle_timeout_handle: undefined, | ||
}); | ||
pipeStream(); | ||
} | ||
} | ||
else { | ||
// no need to track the reused sockets; | ||
// keep alive is disabled or retry mechanism is not enabled | ||
pipeStream(); | ||
} | ||
// this is for request timeout only. | ||
// The socket won't be actually destroyed, | ||
// and it will be returned to the pool. | ||
// TODO: investigate if can actually remove the idle sockets properly | ||
// Socket is "prepared" with idle handlers, continue with our request | ||
pipeStream(); | ||
// This is for request timeout only. Surprisingly, it is not always enough to set in the HTTP request. | ||
// The socket won't be actually destroyed, and it will be returned to the pool. | ||
socket.setTimeout(this.params.request_timeout, onTimeout); | ||
@@ -244,2 +231,3 @@ }; | ||
async ping() { | ||
const abortController = new AbortController(); | ||
try { | ||
@@ -249,18 +237,24 @@ const { stream } = await this.request({ | ||
url: (0, client_common_1.transformUrl)({ url: this.params.url, pathname: '/ping' }), | ||
}); | ||
stream.destroy(); | ||
abort_signal: abortController.signal, | ||
}, 'Ping'); | ||
await (0, stream_2.drainStream)(stream); | ||
return { success: true }; | ||
} | ||
catch (error) { | ||
if (error instanceof Error) { | ||
return { | ||
success: false, | ||
error, | ||
}; | ||
} | ||
throw error; // should never happen | ||
// it is used to ensure that the outgoing request is terminated, | ||
// and we don't get an unhandled error propagation later | ||
abortController.abort('Ping failed'); | ||
// not an error, as this might be semi-expected | ||
this.logger.warn({ | ||
message: this.httpRequestErrorMessage('Ping'), | ||
err: error, | ||
}); | ||
return { | ||
success: false, | ||
error: error, // should NOT be propagated to the user | ||
}; | ||
} | ||
} | ||
async query(params) { | ||
const query_id = getQueryId(params.query_id); | ||
const query_id = this.getQueryId(params.query_id); | ||
const clickhouse_settings = (0, client_common_1.withHttpSettings)(params.clickhouse_settings, this.params.compression.decompress_response); | ||
@@ -274,16 +268,38 @@ const searchParams = (0, client_common_1.toSearchParams)({ | ||
}); | ||
const { stream } = await this.request({ | ||
method: 'POST', | ||
url: (0, client_common_1.transformUrl)({ url: this.params.url, searchParams }), | ||
body: params.query, | ||
abort_signal: params.abort_signal, | ||
decompress_response: clickhouse_settings.enable_http_compression === 1, | ||
}); | ||
return { | ||
stream, | ||
query_id, | ||
}; | ||
const decompressResponse = clickhouse_settings.enable_http_compression === 1; | ||
const { controller, controllerCleanup } = this.getAbortController(params); | ||
try { | ||
const { stream } = await this.request({ | ||
method: 'POST', | ||
url: (0, client_common_1.transformUrl)({ url: this.params.url, searchParams }), | ||
body: params.query, | ||
abort_signal: controller.signal, | ||
decompress_response: decompressResponse, | ||
}, 'Query'); | ||
return { | ||
stream, | ||
query_id, | ||
}; | ||
} | ||
catch (err) { | ||
controller.abort('Query HTTP request failed'); | ||
this.logRequestError({ | ||
op: 'Query', | ||
query_id: query_id, | ||
query_params: params, | ||
search_params: searchParams, | ||
err: err, | ||
extra_args: { | ||
decompress_response: decompressResponse, | ||
clickhouse_settings, | ||
}, | ||
}); | ||
throw err; // should be propagated to the user | ||
} | ||
finally { | ||
controllerCleanup(); | ||
} | ||
} | ||
async exec(params) { | ||
const query_id = getQueryId(params.query_id); | ||
const query_id = this.getQueryId(params.query_id); | ||
const searchParams = (0, client_common_1.toSearchParams)({ | ||
@@ -296,17 +312,37 @@ database: this.params.database, | ||
}); | ||
const { stream, summary } = await this.request({ | ||
method: 'POST', | ||
url: (0, client_common_1.transformUrl)({ url: this.params.url, searchParams }), | ||
body: params.query, | ||
abort_signal: params.abort_signal, | ||
parse_summary: true, | ||
}); | ||
return { | ||
stream, | ||
query_id, | ||
summary, | ||
}; | ||
const { controller, controllerCleanup } = this.getAbortController(params); | ||
try { | ||
const { stream, summary } = await this.request({ | ||
method: 'POST', | ||
url: (0, client_common_1.transformUrl)({ url: this.params.url, searchParams }), | ||
body: params.query, | ||
abort_signal: controller.signal, | ||
parse_summary: true, | ||
}, 'Exec'); | ||
return { | ||
stream, | ||
query_id, | ||
summary, | ||
}; | ||
} | ||
catch (err) { | ||
controller.abort('Exec HTTP request failed'); | ||
this.logRequestError({ | ||
op: 'Exec', | ||
query_id: query_id, | ||
query_params: params, | ||
search_params: searchParams, | ||
err: err, | ||
extra_args: { | ||
clickhouse_settings: params.clickhouse_settings ?? {}, | ||
}, | ||
}); | ||
throw err; // should be propagated to the user | ||
} | ||
finally { | ||
controllerCleanup(); | ||
} | ||
} | ||
async insert(params) { | ||
const query_id = getQueryId(params.query_id); | ||
const query_id = this.getQueryId(params.query_id); | ||
const searchParams = (0, client_common_1.toSearchParams)({ | ||
@@ -320,12 +356,32 @@ database: this.params.database, | ||
}); | ||
const { stream, summary } = await this.request({ | ||
method: 'POST', | ||
url: (0, client_common_1.transformUrl)({ url: this.params.url, searchParams }), | ||
body: params.values, | ||
abort_signal: params.abort_signal, | ||
compress_request: this.params.compression.compress_request, | ||
parse_summary: true, | ||
}); | ||
await this.drainHttpResponse(stream); | ||
return { query_id, summary }; | ||
const { controller, controllerCleanup } = this.getAbortController(params); | ||
try { | ||
const { stream, summary } = await this.request({ | ||
method: 'POST', | ||
url: (0, client_common_1.transformUrl)({ url: this.params.url, searchParams }), | ||
body: params.values, | ||
abort_signal: controller.signal, | ||
compress_request: this.params.compression.compress_request, | ||
parse_summary: true, | ||
}, 'Insert'); | ||
await (0, stream_2.drainStream)(stream); | ||
return { query_id, summary }; | ||
} | ||
catch (err) { | ||
controller.abort('Insert HTTP request failed'); | ||
this.logRequestError({ | ||
op: 'Insert', | ||
query_id: query_id, | ||
query_params: params, | ||
search_params: searchParams, | ||
err: err, | ||
extra_args: { | ||
clickhouse_settings: params.clickhouse_settings ?? {}, | ||
}, | ||
}); | ||
throw err; // should be propagated to the user | ||
} | ||
finally { | ||
controllerCleanup(); | ||
} | ||
} | ||
@@ -337,9 +393,26 @@ async close() { | ||
} | ||
logResponse(request, params, response, startTimestamp) { | ||
getQueryId(query_id) { | ||
return query_id || crypto_1.default.randomUUID(); | ||
} | ||
// a wrapper over the user's Signal to terminate the failed requests | ||
getAbortController(params) { | ||
const controller = new AbortController(); | ||
function onAbort() { | ||
controller.abort(); | ||
} | ||
params.abort_signal?.addEventListener('abort', onAbort); | ||
return { | ||
controller, | ||
controllerCleanup: () => { | ||
params.abort_signal?.removeEventListener('abort', onAbort); | ||
}, | ||
}; | ||
} | ||
logResponse(op, request, params, response, startTimestamp) { | ||
// eslint-disable-next-line @typescript-eslint/no-unused-vars | ||
const { authorization, host, ...headers } = request.getHeaders(); | ||
const duration = Date.now() - startTimestamp; | ||
this.params.logWriter.debug({ | ||
this.params.log_writer.debug({ | ||
module: 'HTTP Adapter', | ||
message: 'Got a response from ClickHouse', | ||
message: `${op}: got a response from ClickHouse`, | ||
args: { | ||
@@ -356,31 +429,20 @@ request_method: params.method, | ||
} | ||
async drainHttpResponse(stream) { | ||
return new Promise((resolve, reject) => { | ||
function dropData() { | ||
// We don't care about the data | ||
} | ||
function onEnd() { | ||
removeListeners(); | ||
resolve(); | ||
} | ||
function onError(err) { | ||
removeListeners(); | ||
reject(err); | ||
} | ||
function onClose() { | ||
removeListeners(); | ||
} | ||
function removeListeners() { | ||
stream.removeListener('data', dropData); | ||
stream.removeListener('end', onEnd); | ||
stream.removeListener('error', onError); | ||
stream.removeListener('onClose', onClose); | ||
} | ||
stream.on('data', dropData); | ||
stream.on('end', onEnd); | ||
stream.on('error', onError); | ||
stream.on('close', onClose); | ||
logRequestError({ op, err, query_id, query_params, search_params, extra_args, }) { | ||
this.logger.error({ | ||
message: this.httpRequestErrorMessage(op), | ||
err: err, | ||
args: { | ||
query: query_params.query, | ||
search_params: search_params?.toString() ?? '', | ||
with_abort_signal: query_params.abort_signal !== undefined, | ||
session_id: query_params.session_id, | ||
query_id: query_id, | ||
...extra_args, | ||
}, | ||
}); | ||
} | ||
parseSummary(response) { | ||
httpRequestErrorMessage(op) { | ||
return `${op}: HTTP request error.`; | ||
} | ||
parseSummary(op, response) { | ||
const summaryHeader = response.headers['x-clickhouse-summary']; | ||
@@ -393,4 +455,6 @@ if (typeof summaryHeader === 'string') { | ||
this.logger.error({ | ||
module: 'Connection', | ||
message: `Failed to parse X-ClickHouse-Summary header, got: ${summaryHeader}`, | ||
message: `${op}: failed to parse X-ClickHouse-Summary header.`, | ||
args: { | ||
'X-ClickHouse-Summary': summaryHeader, | ||
}, | ||
err: err, | ||
@@ -403,26 +467,2 @@ }); | ||
exports.NodeBaseConnection = NodeBaseConnection; | ||
function decompressResponse(response) { | ||
const encoding = response.headers['content-encoding']; | ||
if (encoding === 'gzip') { | ||
return { | ||
response: stream_1.default.pipeline(response, zlib_1.default.createGunzip(), function pipelineCb(err) { | ||
if (err) { | ||
console.error(err); | ||
} | ||
}), | ||
}; | ||
} | ||
else if (encoding !== undefined) { | ||
return { | ||
error: new Error(`Unexpected encoding: ${encoding}`), | ||
}; | ||
} | ||
return { response }; | ||
} | ||
function isDecompressionError(result) { | ||
return result.error !== undefined; | ||
} | ||
function getQueryId(query_id) { | ||
return query_id || crypto_1.default.randomUUID(); | ||
} | ||
//# sourceMappingURL=node_base_connection.js.map |
@@ -22,2 +22,3 @@ "use strict"; | ||
agent: this.agent, | ||
timeout: this.params.request_timeout, | ||
headers: (0, client_common_1.withCompressionHeaders)({ | ||
@@ -24,0 +25,0 @@ headers: this.headers, |
@@ -41,2 +41,3 @@ "use strict"; | ||
agent: this.agent, | ||
timeout: this.params.request_timeout, | ||
headers: (0, client_common_1.withCompressionHeaders)({ | ||
@@ -43,0 +44,0 @@ headers: this.headers, |
export { createConnection, createClient } from './client'; | ||
export { ResultSet } from './result_set'; | ||
/** Re-export @clickhouse/client-common types */ | ||
export { type BaseClickHouseClientConfigOptions, type ClickHouseClientConfigOptions, type BaseQueryParams, type QueryParams, type ExecParams, type InsertParams, type InsertValues, type CommandParams, type CommandResult, type ExecResult, type InsertResult, type DataFormat, type ErrorLogParams, type Logger, type LogParams, type ClickHouseSettings, type MergeTreeSettings, type Row, type ResponseJSON, type InputJSON, type InputJSONObjectEachRow, type BaseResultSet, type PingResult, ClickHouseError, ClickHouseLogLevel, ClickHouseClient, SettingsMap, } from '@clickhouse/client-common'; | ||
export { type BaseClickHouseClientConfigOptions, type ClickHouseClientConfigOptions, type BaseQueryParams, type QueryParams, type ExecParams, type InsertParams, type InsertValues, type CommandParams, type CommandResult, type ExecResult, type InsertResult, type DataFormat, type Logger, type LogParams, type ErrorLogParams, type WarnLogParams, type ClickHouseSettings, type MergeTreeSettings, type Row, type ResponseJSON, type InputJSON, type InputJSONObjectEachRow, type BaseResultSet, type PingResult, ClickHouseError, ClickHouseLogLevel, ClickHouseClient, SettingsMap, } from '@clickhouse/client-common'; |
@@ -1,2 +0,2 @@ | ||
declare const _default: "0.2.10"; | ||
declare const _default: "0.3.0-beta.1"; | ||
export default _default; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.default = '0.2.10'; | ||
exports.default = '0.3.0-beta.1'; | ||
//# sourceMappingURL=version.js.map |
@@ -5,3 +5,3 @@ { | ||
"homepage": "https://clickhouse.com", | ||
"version": "0.2.10", | ||
"version": "0.3.0-beta.1", | ||
"license": "Apache-2.0", | ||
@@ -27,4 +27,4 @@ "keywords": [ | ||
"dependencies": { | ||
"@clickhouse/client-common": "0.2.10" | ||
"@clickhouse/client-common": "0.3.0-beta.1" | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
87814
48
1159
+ Added@clickhouse/client-common@0.3.0-beta.1(transitive)
- Removed@clickhouse/client-common@0.2.10(transitive)