New Case Study: See how Anthropic automated 95% of dependency reviews with Socket. Learn More
Socket
Sign inDemoInstall
Socket

clickhouse-cargo

Package Overview
Dependencies
Maintainers
1
Versions
20
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

clickhouse-cargo - npm Package Compare versions

Comparing version 1.1.1 to 2.0.0

lib/utils.js

353

lib/cargo.js
// Generated by CoffeeScript 2.5.1
(function() {
var Bulk, Cargo, FOLDER_PREFIX, MAX_COMMIT_PER_EXAM_ROUTINE, NOOP, assert, cluster, crypto, debuglog, electSelfToALeader, fs, os, path;
var CLUSTER_WORKER_ID, Cargo, DEFAULT_COMMIT_INTERVAL, EXTNAME_UNCOMMITTED, FILENAME_PREFIX, MAX_COMMIT_PER_EXAM_ROUTINE, MIN_ROWS, MIN_TIME, MultiStream, NOOP, StaticCountWithinProcess, assert, cluster, crypto, debuglog, detectLeaderWorker, fs, os, path, pipeline, toSQLDateString;

@@ -15,12 +15,21 @@ fs = require("fs");

({pipeline} = require('stream'));
assert = require("assert");
debuglog = require("debug")("chcargo:cargo");
//CombinedStream = require('combined-stream')
MultiStream = require('multistream');
Bulk = require("./bulk");
({detectLeaderWorker} = require("./leader_election"));
({electSelfToALeader} = require("./leader_election"));
({toSQLDateString} = require("./utils"));
FOLDER_PREFIX = "cargo-";
CLUSTER_WORKER_ID = cluster.isMaster ? "nocluster" : cluster.worker.id;
debuglog = require("debug")(`chcargo:cargo@${CLUSTER_WORKER_ID}`);
FILENAME_PREFIX = "cargo_";
EXTNAME_UNCOMMITTED = ".uncommitted";
NOOP = function() {};

@@ -30,4 +39,11 @@

MIN_TIME = 1000;
MIN_ROWS = 100;
DEFAULT_COMMIT_INTERVAL = 5000;
StaticCountWithinProcess = 0;
Cargo = class Cargo {
//toString : -> "[Cargo #{@id}@#{@workingPath}]"
toString() {

@@ -37,147 +53,244 @@ return `[Cargo ${this.id}]`;

constructor(clichouseClient, statement, bulkTTL, pathToCargoFile, skipRestoration) {
// @param ClickHouse clichouseClient
// @param SQLInsertString statement
// @param Object options:
// .pathToCargoFolder
// .maxTime
// .maxRows
// .commitInterval
constructor(clichouseClient, statement, options = {}) {
this.clichouseClient = clichouseClient;
this.statement = statement;
this.bulkTTL = bulkTTL;
debuglog(`[new Cargo] @statement:${this.statement}, @bulkTTL:${this.bulkTTL}`);
//@id = Date.now().toString(36)
this.maxTime = parseInt(options.maxTime) || MIN_TIME;
if (this.maxTime < MIN_TIME) {
this.maxTime = MIN_TIME;
}
this.maxRows = parseInt(options.maxRows) || MIN_ROWS;
if (this.maxRows < 1) {
this.maxRows = MIN_ROWS;
}
this.commitInterval = parseInt(options.commitInterval) || DEFAULT_COMMIT_INTERVAL;
if (this.commitInterval < this.maxTime) {
this.commitInterval = DEFAULT_COMMIT_INTERVAL;
}
this.id = crypto.createHash('md5').update(this.statement).digest("hex");
this.count = 0;
this.bulks = [];
//@workingPath = fs.mkdtempSync(path.join(os.tmpdir(), FOLDER_PREFIX))
this.workingPath = path.join(pathToCargoFile, FOLDER_PREFIX + this.id);
if (fs.existsSync(this.workingPath)) {
// directory already exists
assert(fs.statSync(this.workingPath).isDirectory(), `${this.workingPath} is not a directory`);
if (!skipRestoration) {
electSelfToALeader(this.id, this.restoreExistingFiles.bind(this));
assert(options.pathToCargoFolder, "missing options.pathToCargoFolder");
this.pathToCargoFolder = options.pathToCargoFolder;
this.pathToCargoFile = path.join(this.pathToCargoFolder, FILENAME_PREFIX + this.id);
debuglog(`[new Cargo] @statement:${this.statement}, @maxTime:${this.maxTime}, @maxRows:${this.maxRows}, @commitInterval:${this.commitInterval}, @pathToCargoFile:${this.pathToCargoFile}`);
// verify cargo can write to the destination folder
fs.access(this.pathToCargoFolder, fs.constants.W_OK, function(err) {
if (err != null) {
throw new Error(`Cargo not able to write to folder ${this.pathToCargoFolder}. Due to ${err}`);
}
} else {
// create directory
//if cluster.isMaster and Object.keys(cluster.workers).length is 0
//debuglog "[new Cargo] single process, try restoreExistingFiles"
//@restoreExistingFiles()
//else
//debuglog "[new Cargo] cluster worker, to elect lead"
//elector = new BonjourElector({name:@id, host:'127.0.0.1', port:9888})
//elector.on 'leader', =>
//debuglog "worker:#{cluster.worker.id} is leader, try restoreExistingFiles"
//return
//elector.on 'error', (err)=>
//debuglog "worker:#{cluster.worker.id} ELECTION error:", err
//return
//elector.on 'reelection', (err)=>
//debuglog "worker:#{cluster.worker.id} require reelection"
//return
//elector.on 'follower', (err)=>
//debuglog "worker:#{cluster.worker.id} is follower"
//return
//elector.start()
fs.mkdirSync(this.workingPath);
}
this.curBulk = null;
this.moveToNextBulk();
});
this.cachedRows = [];
this.lastFlushAt = Date.now();
this.lastCommitAt = Date.now();
return;
}
setBulkTTL(val) {
return this.bulkTTL = val;
// push row insert into memory cache
push() {
var arr, i, item, j, len;
arr = Array.from(arguments);
assert(arr.length > 0, "blank row can not be accepted.");
for (i = j = 0, len = arr.length; j < len; i = ++j) {
item = arr[i];
if (item instanceof Date) {
arr[i] = toSQLDateString(item);
}
}
this.cachedRows.push(JSON.stringify(arr));
if ((this.cachedRows.length > this.maxRows) || (Date.now() > this.lastFlushAt + this.maxRows)) {
//debuglog "[push] row? #{(@cachedRows.length > @maxRows)}, time? #{(Date.now() > @lastFlushAt + @maxRows)} "
this.flushToFile();
}
}
restoreExistingFiles() {
debuglog("[restoreExistingFiles] @workingPath:", this.workingPath);
fs.readdir(this.workingPath, (err, filenamList) => {
var existingBulkId, filename, i, len, pathToFile, stats;
// flush memory cache to the disk file
// @callbak (err, isFlushing:Boolean)
flushToFile(callbak = NOOP) {
var rowsToFlush;
//debuglog("#{@} [flushToFile] @_isFlushing:", @_isFlushing)
if (this._isFlushing) {
callbak(null, true);
return;
}
if (!(this.cachedRows.length > 0)) {
//debuglog("#{@} [flushToFile] nothing to flush")
this.lastFlushAt = Date.now();
callbak();
return;
}
rowsToFlush = this.cachedRows;
this.cachedRows = [];
debuglog(`${this} [flushToFile] -> ${rowsToFlush.length} rows`);
this._isFlushing = true;
fs.appendFile(this.pathToCargoFile, rowsToFlush.join("\n") + "\n", (err) => {
if (err != null) {
throw err;
debuglog(`${this} [flushToFile] FAILED error:`, err);
this.cachedRows = rowsToFlush.concat(this.cachedRows); // unshift data back
}
debuglog(`${this} [flushToFile] SUCCESS ${rowsToFlush.length} rows`);
this.lastFlushAt = Date.now();
this._isFlushing = false;
callbak(err);
});
}
flushSync() {
var rowsToFlush;
if (!(this.cachedRows.length > 0)) {
debuglog(`${this} [flushSync] nothing to flush`);
return;
}
rowsToFlush = this.cachedRows;
this.cachedRows = [];
debuglog(`${this} [flushSync] ${rowsToFlush.length} rows`);
fs.appendFileSync(this.pathToCargoFile, rowsToFlush.join("\n") + "\n");
}
// check if to commit disk file to clickhouse DB
exam() {
//debuglog "[exam] go commit"
this.flushToFile((err, isFlushing) => {
if (err != null) {
debuglog("[exam] ABORT fail to flush. error:", err);
return;
}
if (!Array.isArray(filenamList)) {
if (isFlushing) {
debuglog("[exam] ABORT isFlushing");
return;
}
filenamList = filenamList.filter(function(item) {
return item.startsWith(Bulk.FILENAME_PREFIX);
});
if (!(filenamList.length > 0)) {
if (!(Date.now() > this.lastCommitAt + this.commitInterval)) {
return;
}
debuglog(`[restoreExistingFiles] filenamList(${filenamList.length})`);
for (i = 0, len = filenamList.length; i < len; i++) {
filename = filenamList[i];
pathToFile = path.join(this.workingPath, filename);
stats = fs.statSync(pathToFile);
if (stats.size <= 0) {
debuglog(`[restoreExistingFiles] remove empty:${filename}`);
fs.unlink(pathToFile, NOOP);
// detect leader before every commit because worker might die
//debuglog "[exam] SKIP tick not reach"
detectLeaderWorker(this.id, (err, leadWorkerId) => {
if (err != null) {
debuglog("[exam > detectLeaderWorker] FAILED error:", err);
return;
}
// only one process can commit
if (leadWorkerId !== CLUSTER_WORKER_ID) {
debuglog(`[exam] CANCLE NOT leadWorkerId:${leadWorkerId}`);
// non-leader skip 10 commit rounds
this.lastCommitAt = Date.now() + this.commitInterval * 10;
return;
}
debuglog("[exam] LEAD COMMIT");
this.rotateFile((err) => {
if (err != null) {
debuglog("[exam > rotateFile] FAILED error:", err);
return;
}
this.commitToClickhouseDB();
this.lastCommitAt = Date.now();
});
});
});
}
rotateFile(callbak = NOOP) {
fs.stat(this.pathToCargoFile, (err, stats) => {
var pathToRenameFile;
//debuglog "[exam > stat] err:", err,", stats:", stats
if (err != null) {
if (err.code === 'ENOENT') {
debuglog("[rotateFile] SKIP nothing to rotate");
callbak();
} else {
existingBulkId = filename.replace(Bulk.FILENAME_PREFIX, "");
debuglog("[restoreExistingFiles] restore existing bulk:", existingBulkId);
this.bulks.push(new Bulk(this.workingPath, existingBulkId));
debuglog("[rotateFile] ABORT fail to stats file. error:", err);
callbak(err);
}
return;
}
if (!(stats && (stats.size > 0))) {
debuglog("[rotateFile] SKIP empty file.");
callbak();
return;
}
// rotate disk file
pathToRenameFile = path.join(this.pathToCargoFolder, `${FILENAME_PREFIX}${this.id}.${Date.now().toString(36) + `_${++StaticCountWithinProcess}`}.${CLUSTER_WORKER_ID}${EXTNAME_UNCOMMITTED}`);
debuglog(`[rotateFile] rotate to ${pathToRenameFile}`);
fs.rename(this.pathToCargoFile, pathToRenameFile, (err) => {
if (err != null) {
debuglog(`[exam] ABORT fail to rename file to ${pathToRenameFile}. error:`, err);
callbak(err);
return;
}
callbak();
});
});
}
moveToNextBulk() {
debuglog(`${this} [moveToNextBulk]`);
if (this.curBulk) {
this.bulks.push(this.curBulk);
// commit local rotated files to remote ClickHouse DB
commitToClickhouseDB() {
if (this._isCommiting) {
debuglog("[commitToClickhouseDB] SKIP is committing");
return;
}
this.curBulk = new Bulk(this.workingPath);
this.curBulk.expire(this.bulkTTL);
}
// detect leader before every commit because worker might die
//detectLeaderWorker @id, (err, leadWorkerId)=>
//#debuglog "[commitToClickhouseDB > detectLeaderWorker] err:", err, ", leadWorkerId:", leadWorkerId
//if err?
//debuglog "[commitToClickhouseDB > detectLeaderWorker] FAILED error:", err
//return
// routine to exame each bulk belongs to this cargo
exam() {
var bulk, bulksToRemove, countIssueCommit, i, j, len, len1, pos, ref;
//debuglog "#{@} [exam]"
if (this.curBulk) {
if (this.curBulk.isEmpty()) {
// lazy: keep ttl when bulk is empty
this.curBulk.expire(this.bulkTTL);
} else if (this.curBulk.isExpired()) {
this.moveToNextBulk();
//# only one process can commit
//unless leadWorkerId is CLUSTER_WORKER_ID
//debuglog "[commitToClickhouseDB] CANCLE NOT leadWorkerId:#{leadWorkerId}"
//return
//debuglog "[commitToClickhouseDB] LEAD COMMIT"
fs.readdir(this.pathToCargoFolder, (err, filenamList) => {
var combinedStream, dbStream, rotationPrefix;
//debuglog "[commitToClickhouseDB > readdir] err:", err, ", filenamList:", filenamList
if (err != null) {
debuglog("[commitToClickhouseDB > ls] FAILED error:", err);
return;
}
}
bulksToRemove = [];
this.bulks.sort(function(a, b) {
return (parseInt(a.id, 36) || 0) - (parseInt(b.id, 36) || 0);
});
countIssueCommit = 0;
ref = this.bulks;
for (i = 0, len = ref.length; i < len; i++) {
bulk = ref[i];
if (bulk.isCommitted()) {
bulksToRemove.push(bulk);
} else {
if (countIssueCommit < MAX_COMMIT_PER_EXAM_ROUTINE) {
bulk.commit(this.clichouseClient, this.statement);
++countIssueCommit;
if (!Array.isArray(filenamList)) {
return;
}
rotationPrefix = FILENAME_PREFIX + this.id + '.';
filenamList = filenamList.filter(function(item) {
return item.startsWith(rotationPrefix) && item.endsWith(EXTNAME_UNCOMMITTED);
});
if (!(filenamList.length > 0)) {
return;
}
debuglog(`[commitToClickhouseDB] filenamList(${filenamList.length})`);
filenamList = filenamList.map((item) => {
return path.join(this.pathToCargoFolder, item);
});
this._isCommiting = true; //lock
dbStream = this.clichouseClient.query(this.statement, {
format: 'JSONCompactEachRow'
});
dbStream.on('error', (err) => {
debuglog(`${this} [commitToClickhouseDB > DB write] FAILED error:`, err);
this._isCommiting = false;
});
dbStream.on('finish', () => {
var filepath, j, len;
debuglog(`${this} [commitToClickhouseDB] success dbStream:finish`);
this._isCommiting = false;
for (j = 0, len = filenamList.length; j < len; j++) {
filepath = filenamList[j];
fs.unlink(filepath, NOOP); // remove the physical file
}
}
}
if (bulksToRemove.length > 0) {
debuglog(`${this} [exam], bulks:`, this.bulks.length, ", bulksToRemove:", bulksToRemove.map(function(item) {
return item.id;
});
combinedStream = new MultiStream(filenamList.map(function(filepath) {
return fs.createReadStream(filepath);
}));
}
for (j = 0, len1 = bulksToRemove.length; j < len1; j++) {
bulk = bulksToRemove[j];
pos = this.bulks.indexOf(bulk);
debuglog(`${this} [exam] remove bulk: ${bulk.toString()}@${pos}`);
if (pos >= 0) {
this.bulks.splice(pos, 1);
}
}
//console.dir combinedStream , depth:10
combinedStream.pipe(dbStream);
});
}
push() {
this.curBulk.push(Array.from(arguments));
return ++this.count;
}
getRetiredBulks() {
return this.bulks.concat();
}
};

@@ -184,0 +297,0 @@

// Generated by CoffeeScript 2.5.1
(function() {
var FILENAME_PREFIX, NUM_OF_LINE, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, STATEMENT_INSERT2, TABLE_NAME, assert, clickHouseClient, cluster, createCargo, crypto, debuglog, fs, getClickHouseClient, isInited, path, prepareBulkCachFile, statment, toSQLDateString;
var NUM_OF_LINE, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, STATEMENT_INSERT2, StartCountWithinProcess, TABLE_NAME, assert, clickHouseClient, cluster, columnValueString, crashProc, createCargo, crypto, debuglog, fs, getClickHouseClient, isInited, path, prepareBulkCachFile, proc, statment, theCargo, toSQLDateString;

@@ -19,3 +19,3 @@ ({createCargo, isInited, getClickHouseClient} = require("../"));

({FILENAME_PREFIX, toSQLDateString} = require("../bulk"));
({toSQLDateString} = require("../utils"));

@@ -34,28 +34,19 @@ TABLE_NAME = "cargo_test.unittest05";

STATEMENT_INSERT = `INSERT INTO ${TABLE_NAME}`;
columnValueString = Date.now().toString(36);
STATEMENT_DROP_TABLE = `DROP TABLE ${TABLE_NAME}`;
STATEMENT_INSERT = `INSERT INTO ${TABLE_NAME}`;
NUM_OF_LINE = 36;
STATEMENT_DROP_TABLE = `DROP TABLE IF EXISTS ${TABLE_NAME}`;
STATEMENT_INSERT2 = `insert into ${TABLE_NAME}`;
NUM_OF_LINE = 12;
STATEMENT_INSERT2 = `insert into ${TABLE_NAME}`;
prepareBulkCachFile = function(insertStatement, label) {
var PathToCargoFile, arr, bulkId, content, i, j, ref, theFilepath;
// prepar bulk cache file
PathToCargoFile = path.join(process.cwd(), "cargo_files", "cargo-" + crypto.createHash('md5').update(insertStatement).digest("hex"));
if (fs.existsSync(PathToCargoFile)) {
assert(fs.statSync(PathToCargoFile).isDirectory(), `${PathToCargoFile} is not a directory`);
} else {
fs.mkdirSync(PathToCargoFile, {
recursive: true,
mode: 0o755
});
}
bulkId = FILENAME_PREFIX + Date.now().toString(36) + "_1";
theFilepath = path.join(PathToCargoFile, bulkId);
var arr, content, i, j, ref, theFilepath;
theFilepath = path.join(process.cwd(), "cargo_files", `cargo_${crypto.createHash('md5').update(insertStatement).digest("hex")}.${Date.now().toString(36)}_unittest.nocluster.uncommitted`);
debuglog("[prepare] theFilepath:", theFilepath);
content = "";
for (i = j = 0, ref = NUM_OF_LINE; (0 <= ref ? j < ref : j > ref); i = 0 <= ref ? ++j : --j) {
arr = [toSQLDateString(new Date()), i, label || "cluster-bulk"];
arr = [toSQLDateString(new Date()), i, label || "cluster-cargo"];
content += JSON.stringify(arr) + "\n";

@@ -73,4 +64,4 @@ }

var i, j;
prepareBulkCachFile(STATEMENT_INSERT, "batchA");
prepareBulkCachFile(STATEMENT_INSERT2, "batchB");
prepareBulkCachFile(STATEMENT_INSERT, "~batchA");
prepareBulkCachFile(STATEMENT_INSERT2, "~batchB");
// spawn worker

@@ -85,5 +76,19 @@ for (i = j = 0; j < 8; i = ++j) {

debuglog(`[isWorker ${cluster.worker.id}] statment:`, statment);
createCargo(statment);
theCargo = createCargo(statment);
StartCountWithinProcess = 0;
// Insert NUM_OF_LINE rows into theCargo, all tagged with a mark built from
// this worker's id and a per-process round counter.
proc = function() {
  const roundNo = StartCountWithinProcess++;
  const mark = `worker@${cluster.worker.id}:${roundNo}`;
  debuglog("proc insert: ", mark);
  const total = NUM_OF_LINE;
  let lineNo = 0;
  while (lineNo < total) {
    theCargo.push(new Date(), lineNo, mark);
    lineNo += 1;
  }
};
setInterval(proc, 2000);
crashProc = function() {
nonExistingFunc();
};
setTimeout(crashProc, 10000 + Math.random() * 10000 >>> 0);
}
}).call(this);
// Generated by CoffeeScript 2.5.1
(function() {
var Cargo, ClickHouse, ClickHouseClient, DEFAULT_BULK_TTL, MIN_BULK_TTL, PathToCargoFile, STATEMENT_TO_CARGO, assert, createCargo, debuglog, err, examCargos, fs, init, os, path, pathToConfig, profileConfig, profileName;
var CLUSTER_WORKER_ID, Cargo, CargoOptions, ClickHouse, ClickHouseClient, STATEMENT_TO_CARGO, assert, cluster, createCargo, debuglog, err, examCargos, fs, init, os, path, pathToConfig, profileConfig, profileName;
debuglog = require("debug")("chcargo:index");
ClickHouse = require('@apla/clickhouse');

@@ -17,2 +15,8 @@

cluster = require('cluster');
CLUSTER_WORKER_ID = cluster.isMaster ? "nocluster" : cluster.worker.id;
debuglog = require("debug")(`chcargo:index@${CLUSTER_WORKER_ID}`);
Cargo = require("./cargo");

@@ -22,26 +26,44 @@

// for how long (in ms) one bulk should keep accumlate inserts
DEFAULT_BULK_TTL = 5 * 1000;
MIN_BULK_TTL = 1000;
STATEMENT_TO_CARGO = {};
PathToCargoFile = null;
CargoOptions = {};
// @param Object config:
// .pathToCargoFolder
// .maxTime
// .maxRows
// .commitInterval
init = function(config) {
var commitInterval, isToFlushBeforeCrash, maxRows, maxTime;
assert(ClickHouseClient === null, "ClickHouseClient has already inited");
assert(config && config.host, "missing host in config");
config = Object.assign({}, config); // leave input obj unmodified
// prepare disk path
PathToCargoFile = path.resolve(process.cwd(), config.cargoPath || "cargo_files");
debuglog("[init] PathToCargoFile:", PathToCargoFile);
CargoOptions.pathToCargoFolder = path.resolve(process.cwd(), config.cargoPath || "cargo_files");
delete config.cargoPath;
if (fs.existsSync(PathToCargoFile)) {
assert(fs.statSync(PathToCargoFile).isDirectory(), `${PathToCargoFile} is not a directory`);
} else {
fs.mkdirSync(PathToCargoFile, {
recursive: true,
mode: 0o755
});
// verify cargo can write to the destination folder
fs.accessSync(CargoOptions.pathToCargoFolder, fs.constants.W_OK); //, "Cargo not able to write to folder #{CargoOptions.pathToCargoFolder}"
fs.stat(CargoOptions.pathToCargoFolder, function(err, stats) {
assert(err == null, `Fail to read directory stats. Due to ${err}`);
assert(stats.isDirectory(), `Not a directory: ${CargoOptions.pathToCargoFolder}`);
});
maxTime = parseInt(config.maxTime);
if (maxTime > 0) {
CargoOptions.maxTime = maxTime;
}
delete config.maxTime;
maxRows = parseInt(config.maxRows);
if (maxRows > 0) {
CargoOptions.maxRows = maxRows;
}
delete config.maxRows;
commitInterval = parseInt(config.commitInterval);
if (commitInterval > 0) {
CargoOptions.commitInterval = commitInterval;
}
delete config.commitInterval;
//debuglog "[init] CargoOptions:", CargoOptions
isToFlushBeforeCrash = config.saveWhenCrash !== false;
delete config.saveWhenCrash;
ClickHouseClient = new ClickHouse(config);

@@ -53,2 +75,14 @@ ClickHouseClient.ping(function(err) {

});
if (isToFlushBeforeCrash) {
// flush in-memroy data when process crash
process.on('uncaughtException', function(err) {
var cargo, statement;
debuglog("⚠️⚠️⚠️ [flushSyncInMemoryCargo] ⚠️⚠️⚠️ ");
for (statement in STATEMENT_TO_CARGO) {
cargo = STATEMENT_TO_CARGO[statement];
cargo.flushSync();
}
throw err;
});
}
};

@@ -59,6 +93,5 @@

// @param statement String, sql insert statement
// @param bulkTTL Int, ttl(in ms) for flush accumlated inserts. default: 5000, min: 1000
createCargo = function(statement, bulkTTL) {
createCargo = function(statement) {
var cargo;
debuglog(`[createCargo] statement:${statement}, bulkTTL:${bulkTTL}`);
debuglog(`[createCargo] statement:${statement}`);
assert(ClickHouseClient, "ClickHouseClient needs to be inited first");

@@ -68,13 +101,8 @@ statement = String(statement || "").trim();

assert(statement.toUpperCase().startsWith("INSERT"), "statement must be an insert sql");
bulkTTL = parseInt(bulkTTL) || DEFAULT_BULK_TTL;
if (bulkTTL < MIN_BULK_TTL) {
bulkTTL = MIN_BULK_TTL;
}
cargo = STATEMENT_TO_CARGO[statement];
if (cargo) {
cargo.setBulkTTL(bulkTTL);
debuglog("[createCargo] reuse cargo:", cargo.toString());
return cargo;
}
cargo = new Cargo(ClickHouseClient, statement, bulkTTL, PathToCargoFile);
cargo = new Cargo(ClickHouseClient, statement, CargoOptions);
STATEMENT_TO_CARGO[statement] = cargo;

@@ -113,3 +141,3 @@ debuglog("[createCargo] cargo:", cargo.toString());

// self examination routine
setInterval(examCargos, MIN_BULK_TTL);
setInterval(examCargos, 1000);

@@ -116,0 +144,0 @@ module.exports = {

// Generated by CoffeeScript 2.5.1
(function() {
// when in cluster worker mode, elect a leader for each cargo to avoid race condition when restoring existing bulks
var MAX_UDP_CONFIRM, NOOP, REG_MSG_VALIDATOR, RemoteClientPortByCargoId, SERVICE_HOST, SERVICE_PORT, SERVICE_TYPE, UDPEchoSrv, assert, cluster, debuglog, dgram, electSelfToALeader, err;
var CLUSTER_WORKER_ID, MAX_UDP_CONFIRM, REG_MSG_VALIDATOR, RemoteClientPortByCargoId, SERVICE_HOST, SERVICE_PORT, SERVICE_TYPE, UDPEchoSrv, assert, cluster, debuglog, detectLeaderWorker, dgram, err;

@@ -12,5 +12,5 @@ assert = require("assert");

debuglog = require("debug")(`chcargo:leader_election@${cluster.isMaster ? "main" : cluster.worker.id}`);
CLUSTER_WORKER_ID = cluster.isMaster ? "nocluster" : cluster.worker.id;
NOOP = function() {};
debuglog = require("debug")(`chcargo:leader_election@${CLUSTER_WORKER_ID}`);

@@ -69,7 +69,7 @@ SERVICE_TYPE = "clickhouse-cargo";

// communicate with other cluster worker and to elect a leader worker for the given cargoId
electSelfToALeader = function(cargoId, callbak = NOOP) {
detectLeaderWorker = function(cargoId, callbak) {
var cargoLeaderId, countSend, msg, procSend, udpClient, workerId;
if (cluster.isMaster && Object.keys(cluster.workers).length === 0) {
debuglog("[electSelfToALeader] single process leader");
callbak();
callbak(null, CLUSTER_WORKER_ID);
return;

@@ -105,13 +105,12 @@ }

if (countSend > MAX_UDP_CONFIRM) {
if (cargoLeaderId === workerId) {
debuglog(`[electSelfToALeader@${workerId}] is leader for ${cargoId}`);
callbak();
} else {
debuglog(`[electSelfToALeader@${workerId}] is follower for ${cargoId}`);
}
udpClient.close();
callbak(null, cargoLeaderId);
} else {
//if cargoLeaderId is workerId
//debuglog "[electSelfToALeader@#{workerId}] is leader for #{cargoId}"
//else
//debuglog "[electSelfToALeader@#{workerId}] is follower for #{cargoId}"
udpClient.send(msg, 0, msg.length, SERVICE_PORT, SERVICE_HOST);
//setTimeout(procSend, Math.random() * 1000 >>> 0)
setTimeout(procSend, 1000);
setTimeout(procSend, 200);
}

@@ -123,5 +122,5 @@ };

module.exports = {
electSelfToALeader: electSelfToALeader
detectLeaderWorker: detectLeaderWorker
};
}).call(this);
// Generated by CoffeeScript 2.5.1
(function() {
var QUERY, QUERY1, assert, createCargo, isInited;
var INIT_OPTION, QUERY, QUERY1, assert, createCargo, init, isInited;
({createCargo, isInited} = require("../"));
({createCargo, isInited, init} = require("../"));
assert = require("assert");
INIT_OPTION = {
host: "localhost",
maxTime: 2000,
maxRows: 100,
commitInterval: 8000
};
QUERY = "INSERT INTO test.cargo0 FORMAT JSONCompactEachRow";

@@ -16,2 +23,8 @@

cargo0 = null;
//before (done)->
//init(INIT_OPTION)
//done()
//return
//after -> process.exit(0)
it("auto init when env set", function(done) {

@@ -29,5 +42,8 @@ assert(isInited(), "should auto init when env set");

cargo0 = createCargo(QUERY);
//console.log cargo0
assert(cargo0);
assert(cargo0.id);
assert(cargo0.curBulk);
assert(cargo0.id, "bad cargo0.id");
assert(cargo0.maxTime === INIT_OPTION.maxTime, `bad cargo0.maxTime:${cargo0.maxTime} => ${INIT_OPTION.maxTime}`);
assert(cargo0.maxRows === INIT_OPTION.maxRows, "bad cargo0.maxRows");
assert(cargo0.commitInterval === INIT_OPTION.commitInterval, "bad cargo0.commitInterval");
done();

@@ -34,0 +50,0 @@ });

// Generated by CoffeeScript 2.5.1
(function() {
var NUM_OF_LINE, QUERY, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, TABLE_NAME, assert, createCargo, debuglog, fs, getClickHouseClient, isInited;
var INIT_OPTION, NUM_OF_LINE, QUERY, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, TABLE_NAME, _, assert, createCargo, debuglog, fs, getClickHouseClient, isInited;

@@ -15,2 +15,4 @@ ({createCargo, isInited} = require("../"));

_ = require("lodash");
TABLE_NAME = "cargo_test.unittest02";

@@ -22,3 +24,3 @@

STATEMENT_DROP_TABLE = `DROP TABLE ${TABLE_NAME}`;
STATEMENT_DROP_TABLE = `DROP TABLE IF EXISTS ${TABLE_NAME}`;

@@ -35,38 +37,54 @@ STATEMENT_CREATE_TABLE = `CREATE TABLE IF NOT EXISTS ${TABLE_NAME}

// refer
INIT_OPTION = {
host: "localhost",
maxTime: 2000,
maxRows: 100,
commitInterval: 8000
};
//NUM_OF_LINE = 80
NUM_OF_LINE = 100; // NOTE: bulk flushs every 100 lines
NUM_OF_LINE = 429; // NOTE: bulk flushs every 100 lines
describe("push log to cargo", function() {
var theBulk, theCargo, theFilepath;
this.timeout(5000);
var columnValueString, theCargo;
this.timeout(20000);
theCargo = null;
theBulk = null;
theFilepath = null;
columnValueString = Date.now().toString(36);
before(function(done) {
debuglog("[before]");
theCargo = createCargo(QUERY, 999000);
theBulk = theCargo.curBulk;
theFilepath = theBulk.pathToFile;
getClickHouseClient().query(STATEMENT_CREATE_TABLE, done);
});
after(function(done) {
debuglog("[after] query:", STATEMENT_DROP_TABLE);
getClickHouseClient().query(STATEMENT_DROP_TABLE, function(err) {
done(err);
if (err != null) {
throw err;
}
getClickHouseClient().query(STATEMENT_CREATE_TABLE, function(err) {
if (err != null) {
throw err;
}
theCargo = createCargo(QUERY);
if (fs.existsSync(theCargo.pathToCargoFile)) { // clean up existing log
fs.unlinkSync(theCargo.pathToCargoFile);
}
done();
});
});
});
//after -> process.exit(0)
it("push to cargo", function(done) {
var i, j, ref;
for (i = j = 0, ref = NUM_OF_LINE; (0 <= ref ? j < ref : j > ref); i = 0 <= ref ? ++j : --j) {
theCargo.push(new Date(), i, "string");
theCargo.push(new Date(), i, columnValueString);
}
assert(fs.existsSync(theFilepath), `log file not exist on ${theFilepath}`);
setTimeout(done, 2000); // wait file stream flush
setTimeout(done, 5000); // wait file stream flush
});
it("cargo should flush to file", function() {
assert(fs.existsSync(theCargo.pathToCargoFile), `log file not exist on ${theCargo.pathToCargoFile}`);
});
return it("exam content written on hd file", function(done) {
var contentInHD, contentInHDArr, i, j, len, line;
contentInHD = fs.readFileSync(theFilepath, 'utf8');
contentInHDArr = contentInHD.split(/\r|\n|\r\n/);
debuglog("[exam hd content] contentInHDArr:", contentInHDArr.length);
assert(contentInHDArr.length === NUM_OF_LINE, "unmatching output length");
contentInHD = fs.readFileSync(theCargo.pathToCargoFile, 'utf8');
//debuglog "[exam hd content] contentInHD:", contentInHD
contentInHDArr = _.compact(contentInHD.split(/\r|\n|\r\n/));
//debuglog "[exam hd content] contentInHDArr:", contentInHDArr
assert(contentInHDArr.length === NUM_OF_LINE, `unmatching output length. NUM_OF_LINE:${NUM_OF_LINE}, contentInHDArr.length:${contentInHDArr.length}`);
for (i = j = 0, len = contentInHDArr.length; j < len; i = ++j) {

@@ -77,3 +95,3 @@ line = contentInHDArr[i];

assert(line[1] === i, "unmatching field 1 ");
assert(line[2] === "string", "unmatching field 2 ");
assert(line[2] === columnValueString, "unmatching field 2 ");
}

@@ -80,0 +98,0 @@ done();

// Generated by CoffeeScript 2.5.1
(function() {
var NUM_OF_LINE, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, STATEMENT_SELECT, TABLE_NAME, assert, createCargo, debuglog, fs, getClickHouseClient, isInited;
var INIT_OPTION, NUM_OF_LINE, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, STATEMENT_SELECT, TABLE_NAME, assert, columnValueString, createCargo, debuglog, fs, getClickHouseClient, isInited;

@@ -27,7 +27,16 @@ ({createCargo, isInited, getClickHouseClient} = require("../"));

STATEMENT_DROP_TABLE = `DROP TABLE ${TABLE_NAME}`;
STATEMENT_DROP_TABLE = `DROP TABLE IF EXISTS ${TABLE_NAME}`;
//STATEMENT_SELECT = "SELECT * FROM #{TABLE_NAME} LIMIT 10000000"
STATEMENT_SELECT = `SELECT * FROM ${TABLE_NAME} LIMIT 10000000 FORMAT JSONCompactEachRow `;
columnValueString = Date.now().toString(36);
STATEMENT_SELECT = `SELECT * FROM ${TABLE_NAME} WHERE pos_id='${columnValueString}' LIMIT 100000 FORMAT JSONCompactEachRow `;
// refer
INIT_OPTION = {
host: "localhost",
maxTime: 2000,
maxRows: 100,
commitInterval: 8000
};
NUM_OF_LINE = 27891; // NOTE: bulk flushs every 100 lines

@@ -37,34 +46,33 @@

describe("commit bulk", function() {
var theBulk, theCargo, theFilepath;
this.timeout(60000);
var theCargo;
this.timeout(30000);
theCargo = null;
theBulk = null;
theFilepath = null;
before(function(done) {
theCargo = createCargo(STATEMENT_INSERT);
theBulk = theCargo.curBulk;
theFilepath = theBulk.pathToFile;
getClickHouseClient().query(STATEMENT_CREATE_TABLE, done);
});
after(function(done) {
debuglog("[after] query:", STATEMENT_DROP_TABLE);
debuglog("[before]");
getClickHouseClient().query(STATEMENT_DROP_TABLE, function(err) {
done(err);
if (err != null) {
throw err;
}
getClickHouseClient().query(STATEMENT_CREATE_TABLE, function(err) {
if (err != null) {
throw err;
}
theCargo = createCargo(STATEMENT_INSERT);
if (fs.existsSync(theCargo.pathToCargoFile)) { // clean up existing log
fs.unlinkSync(theCargo.pathToCargoFile);
}
done();
});
});
});
//process.exit() if not err?
//after -> process.exit(0)
it("push to cargo", function(done) {
var i, j, ref;
for (i = j = 0, ref = NUM_OF_LINE; (0 <= ref ? j < ref : j > ref); i = 0 <= ref ? ++j : --j) {
theCargo.push(new Date(), i, "string");
theCargo.push(new Date(), i, columnValueString);
}
setTimeout(done, 10000); // wait file stream flush
setTimeout(done, 15000); // wait file stream flush
});
it("bulk should committed", function(done) {
var curBulk;
assert(theBulk.isCommitted(), "the bulk should committed");
curBulk = theCargo.curBulk;
assert(curBulk !== theBulk, "previouse bulk should not be the current bulk");
assert(theCargo.getRetiredBulks().length === 0, "committed bulks should be cleared");
assert(!fs.existsSync(theFilepath), "local file must be cleared");
assert(!fs.existsSync(theCargo.pathToCargoFile), "local file must be cleared");
done();

@@ -97,3 +105,3 @@ });

assert(row[1] === i, "unmatching field 1 ");
assert(row[2] === "string", "unmatching field 2 ");
assert(row[2] === columnValueString, "unmatching field 2 ");
}

@@ -100,0 +108,0 @@ done();

// Generated by CoffeeScript 2.5.1
(function() {
var FILENAME_PREFIX, NUM_OF_LINE, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, STATEMENT_SELECT, TABLE_NAME, assert, createCargo, crypto, debuglog, fs, getClickHouseClient, isInited, path, toSQLDateString;
var NUM_OF_LINE, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, STATEMENT_SELECT, TABLE_NAME, assert, columnValueString, createCargo, crypto, debuglog, fs, getClickHouseClient, isInited, path, toSQLDateString;

@@ -17,3 +17,3 @@ ({createCargo, isInited, getClickHouseClient} = require("../"));

({FILENAME_PREFIX, toSQLDateString} = require("../bulk"));
({toSQLDateString} = require("../utils"));

@@ -34,9 +34,11 @@ TABLE_NAME = "cargo_test.unittest04";

STATEMENT_DROP_TABLE = `DROP TABLE ${TABLE_NAME}`;
columnValueString = Date.now().toString(36);
STATEMENT_SELECT = `SELECT * FROM ${TABLE_NAME} LIMIT 10000000 FORMAT JSONCompactEachRow `;
STATEMENT_DROP_TABLE = `DROP TABLE IF EXISTS ${TABLE_NAME}`;
STATEMENT_SELECT = `SELECT * FROM ${TABLE_NAME} WHERE pos_id='${columnValueString}' LIMIT 100000 FORMAT JSONCompactEachRow `;
NUM_OF_LINE = 3396;
describe("restore-local-bulks", function() {
describe("restore-local-rotations", function() {
var theCargo, theFilepath;

@@ -48,4 +50,4 @@ this.timeout(60000);

getClickHouseClient().query(STATEMENT_DROP_TABLE, function(err) {
if (err == null) {
process.exit();
if (err != null) {
throw err;
}

@@ -55,27 +57,13 @@ getClickHouseClient().query(STATEMENT_CREATE_TABLE, done);

});
//after (done)->
//debuglog "[after] query:", STATEMENT_DROP_TABLE
//getClickHouseClient().query STATEMENT_DROP_TABLE, (err)->
//done(err)
//process.exit() if not err?
//return
//return
it("prepare local bulks", function(done) {
var PathToCargoFile, arr, bulkId, content, err, i, j, ref;
after(function() {
return process.exit(0);
});
it("prepare local rotations", function(done) {
var arr, content, err, i, j, ref;
try {
PathToCargoFile = path.join(process.cwd(), "cargo_files", "cargo-" + crypto.createHash('md5').update(STATEMENT_INSERT).digest("hex"));
if (fs.existsSync(PathToCargoFile)) {
assert(fs.statSync(PathToCargoFile).isDirectory(), `${PathToCargoFile} is not a directory`);
} else {
fs.mkdirSync(PathToCargoFile, {
recursive: true,
mode: 0o755
});
}
bulkId = FILENAME_PREFIX + Date.now().toString(36) + "_1";
theFilepath = path.join(PathToCargoFile, bulkId);
theFilepath = path.join(process.cwd(), "cargo_files", `cargo_${crypto.createHash('md5').update(STATEMENT_INSERT).digest("hex")}.${Date.now().toString(36)}_unittest.nocluster.uncommitted`);
debuglog("[prepare] theFilepath:", theFilepath);
content = "";
for (i = j = 0, ref = NUM_OF_LINE; (0 <= ref ? j < ref : j > ref); i = 0 <= ref ? ++j : --j) {
arr = [toSQLDateString(new Date()), i, "test04"];
arr = [toSQLDateString(new Date()), i, columnValueString];
content += JSON.stringify(arr) + "\n";

@@ -90,3 +78,3 @@ }

}
setTimeout(done, 10000); // wait for cargo.exam
setTimeout(done, 20000); // wait for cargo.exam
});

@@ -118,3 +106,3 @@ it("local bulks should be commit to ClickHouse", function(done) {

assert(row[1] === i, "unmatching field 1 ");
assert(row[2] === "local-bulk", "unmatching field 2 ");
assert(row[2] === columnValueString, "unmatching field 2 ");
}

@@ -121,0 +109,0 @@ done();

{
"name": "clickhouse-cargo",
"version": "1.1.1",
"version": "2.0.0",
"description": "Accumulates insert queries and commit them to clickhouse in batch jobs with retries on failure. 这个模块将向 Clickhouse 数据库的高频写入改为低频的批量插入。",

@@ -36,2 +36,3 @@ "main": "lib/index.js",

"coffeescript": "^2.5.1",
"lodash": "^4.17.20",
"mocha": "^8.1.3",

@@ -42,4 +43,5 @@ "nyc": "^15.1.0"

"@apla/clickhouse": "^1.6.3",
"debug": "^4.1.1"
"debug": "^4.1.1",
"multistream": "^4.0.0"
}
}

@@ -14,23 +14,21 @@ # clickhouse-cargo

1. The `cargo` instance accepts insert requests submitted by the `push` method and routes these requests to a `bulk`.
2. The `bulk` writes accumulated `push` in the memory to a local file cache according to the setting of `stream.cork`.
3. `cargo` checks all online `bulks` regularly. When a `bulk` exceeds its `bulkTTL`, it will then commit its local file cache to the Clickhouse server.
4. In case of a Clickhouse commit failure, `bulk` will retry the submission in the next round of inspection cycle until the submission is successful.
5. In case of the NodeJS process crash. local `bulk` file caches will remain on disk. Thus next time when `clickHouse-cargo` module starts, `cargo` checks the remaining `bulk` cache files, and submit them to Clickhouse again.
1. A `cargo` instance accepts insert requests submitted by the `push` method and keeps the inserts in memory.
2. The `cargo` instance periodically flushes in-memory inserts to a file cache, then rotates this file and commits the rotations to the Clickhouse server.
3. In case of a Clickhouse commit failure, the cargo will retry the submission in the next round of its routine until the submission is successful.
4. In case of a NodeJS process crash, in-memory inserts will be flushed immediately into the file cache.
### Cluster mode support
When running in cluster mode (such as [PM2 cluster deployment](https://pm2.keymetrics.io/docs/usage/cluster-mode/) ), all cargo workers will run through an election via udp communication @ 127.0.0.1:17888 to elect a leader worker. Then the leader worker will take care of restoring existing bulks.
When running in cluster mode (such as [PM2 cluster deployment](https://pm2.keymetrics.io/docs/usage/cluster-mode/) ), all cargo workers will run through an election via UDP communication @ 127.0.0.1:17888 to elect a leader worker. Then only the leader worker will carry on with file rotations and commitments.
## 工作原理
1. `cargo` 实例接受 `push`方法所提交的插入请求,并将请求路由给 `bulk`。
2. `bulk` 根据 `stream.cork` 的设定,按量将内存中累计的 `push` 写入本地文件缓存。
3. `cargo` 定时检查所有在线的 `bulk`, 当 `bulk` 的存活超过 `bulkTTL` 的设定时,将 `bulk` 所对应的本地文件缓存提交到 Clickhouse 服务器。
4. 当 Clickhouse 写入失败时,`bulk` 将会在下一轮检查周期中重试提交直到提交成功。
5. 当本地的 NodeJS 进程奔溃时,都会导致本地的 `bulk` 文件缓存残留。于是下一次启动 `clickHouse-cargo` 模块时, `cargo` 检查到残留的 `bulk` 缓存文件时将再次提交给 Clickhouse。
1. `cargo` 实例接受 `push`方法所提交的插入请求,并将请求临时存放于内存中。
2. `cargo` 周期性地将内存中累积的插入记录写入对应的文件缓存。随后将文件缓存进行滚动,并将滚出结果提交到 Clickhouse 数据库。
3. 当向 Clickhouse 写入失败时,`cargo` 将会在下一轮检查周期中重试提交直到提交成功。
4. 当本地的 NodeJS 进程崩溃时,内存中累积的插入请求会被同步写入对应的文件缓存。
### 支持集群模式
在集群模式下,所有的 cargo woker 将通过UDP通讯选举出一个领头的worker。 接着由这个领头的worker来负责恢复文件残留缓存的工作。
在集群模式下,所有的 cargo worker 将通过UDP通讯选举出一个领头的worker。 接着由这个领头的worker来负责文件缓存的滚动和提交到 Clickhouse 数据库。

@@ -92,3 +90,7 @@

| `host` | ✓ | | Host to connect.
| `cargoPath` | | `${cwd()}/cargo_files` | Path to local cargo cache
| `cargoPath` | | `${cwd()}/cargo_files` | Path to local cargo cache.
| `maxTime` | | 1000 | How long, in milliseconds, a cargo keeps its in-memory insert buffer before flushing it to file.
| `maxRows` | | 100 | How many rows a cargo will keep in memory.
| `commitInterval` | | 5000 | Interval(ms) for cargo to commit to ClickHouse.
| `saveWhenCrash` | | true | When `false`, cargos will not synchronously flush in-memory data when the node process crashes.
| `user` | | | Authentication user.

@@ -115,3 +117,3 @@ | `password` | | | Authentication password.

*/
const cargo = clickhouse-cargo.createCargo(statement, bulkTTL);
const cargo = clickhouse-cargo.createCargo(statement);
```

@@ -118,0 +120,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc