clickhouse-cargo
Comparing version 1.1.1 to 2.0.0
lib/cargo.js
// Generated by CoffeeScript 2.5.1
(function() {
var Bulk, Cargo, FOLDER_PREFIX, MAX_COMMIT_PER_EXAM_ROUTINE, NOOP, assert, cluster, crypto, debuglog, electSelfToALeader, fs, os, path;
var CLUSTER_WORKER_ID, Cargo, DEFAULT_COMMIT_INTERVAL, EXTNAME_UNCOMMITTED, FILENAME_PREFIX, MAX_COMMIT_PER_EXAM_ROUTINE, MIN_ROWS, MIN_TIME, MultiStream, NOOP, StaticCountWithinProcess, assert, cluster, crypto, debuglog, detectLeaderWorker, fs, os, path, pipeline, toSQLDateString;
@@ -15,12 +15,21 @@ fs = require("fs");
({pipeline} = require('stream'));
assert = require("assert");
debuglog = require("debug")("chcargo:cargo");
//CombinedStream = require('combined-stream')
MultiStream = require('multistream');
Bulk = require("./bulk");
({detectLeaderWorker} = require("./leader_election"));
({electSelfToALeader} = require("./leader_election"));
({toSQLDateString} = require("./utils"));
FOLDER_PREFIX = "cargo-";
CLUSTER_WORKER_ID = cluster.isMaster ? "nocluster" : cluster.worker.id;
debuglog = require("debug")(`chcargo:cargo@${CLUSTER_WORKER_ID}`);
FILENAME_PREFIX = "cargo_";
EXTNAME_UNCOMMITTED = ".uncommitted";
NOOP = function() {};
@@ -30,4 +39,11 @@
MIN_TIME = 1000;
MIN_ROWS = 100;
DEFAULT_COMMIT_INTERVAL = 5000;
StaticCountWithinProcess = 0;
Cargo = class Cargo {
//toString : -> "[Cargo #{@id}@#{@workingPath}]"
toString() {
@@ -37,147 +53,244 @@ return `[Cargo ${this.id}]`;
constructor(clichouseClient, statement, bulkTTL, pathToCargoFile, skipRestoration) {
// @param ClickHouse clichouseClient
// @param SQLInsertString statement
// @param Object options:
// .pathToCargoFolder
// .maxTime
// .maxRows
// .commitInterval
constructor(clichouseClient, statement, options = {}) {
this.clichouseClient = clichouseClient;
this.statement = statement;
this.bulkTTL = bulkTTL;
debuglog(`[new Cargo] @statement:${this.statement}, @bulkTTL:${this.bulkTTL}`);
//@id = Date.now().toString(36)
this.maxTime = parseInt(options.maxTime) || MIN_TIME;
if (this.maxTime < MIN_TIME) {
this.maxTime = MIN_TIME;
}
this.maxRows = parseInt(options.maxRows) || MIN_ROWS;
if (this.maxRows < 1) {
this.maxRows = MIN_ROWS;
}
this.commitInterval = parseInt(options.commitInterval) || DEFAULT_COMMIT_INTERVAL;
if (this.commitInterval < this.maxTime) {
this.commitInterval = DEFAULT_COMMIT_INTERVAL;
}
this.id = crypto.createHash('md5').update(this.statement).digest("hex");
this.count = 0;
this.bulks = [];
//@workingPath = fs.mkdtempSync(path.join(os.tmpdir(), FOLDER_PREFIX))
this.workingPath = path.join(pathToCargoFile, FOLDER_PREFIX + this.id);
if (fs.existsSync(this.workingPath)) {
// directory already exists
assert(fs.statSync(this.workingPath).isDirectory(), `${this.workingPath} is not a directory`);
if (!skipRestoration) {
electSelfToALeader(this.id, this.restoreExistingFiles.bind(this));
assert(options.pathToCargoFolder, "missing options.pathToCargoFolder");
this.pathToCargoFolder = options.pathToCargoFolder;
this.pathToCargoFile = path.join(this.pathToCargoFolder, FILENAME_PREFIX + this.id);
debuglog(`[new Cargo] @statement:${this.statement}, @maxTime:${this.maxTime}, @maxRows:${this.maxRows}, @commitInterval:${this.commitInterval}, @pathToCargoFile:${this.pathToCargoFile}`);
// verify cargo can write to the destination folder
fs.access(this.pathToCargoFolder, fs.constants.W_OK, function(err) {
if (err != null) {
throw new Error(`Cargo not able to write to folder ${this.pathToCargoFolder}. Due to ${err}`);
}
} else {
// create directory
//if cluster.isMaster and Object.keys(cluster.workers).length is 0
//debuglog "[new Cargo] single process, try restoreExistingFiles"
//@restoreExistingFiles()
//else
//debuglog "[new Cargo] cluster worker, to elect lead"
//elector = new BonjourElector({name:@id, host:'127.0.0.1', port:9888})
//elector.on 'leader', =>
//debuglog "worker:#{cluster.worker.id} is leader, try restoreExistingFiles"
//return
//elector.on 'error', (err)=>
//debuglog "worker:#{cluster.worker.id} ELECTION error:", err
//return
//elector.on 'reelection', (err)=>
//debuglog "worker:#{cluster.worker.id} require reelection"
//return
//elector.on 'follower', (err)=>
//debuglog "worker:#{cluster.worker.id} is follower"
//return
//elector.start()
fs.mkdirSync(this.workingPath);
}
this.curBulk = null;
this.moveToNextBulk();
});
this.cachedRows = [];
this.lastFlushAt = Date.now();
this.lastCommitAt = Date.now();
return;
}
setBulkTTL(val) {
return this.bulkTTL = val;
// push row insert into memory cache
push() {
var arr, i, item, j, len;
arr = Array.from(arguments);
assert(arr.length > 0, "blank row can not be accepted.");
for (i = j = 0, len = arr.length; j < len; i = ++j) {
item = arr[i];
if (item instanceof Date) {
arr[i] = toSQLDateString(item);
}
}
this.cachedRows.push(JSON.stringify(arr));
if ((this.cachedRows.length > this.maxRows) || (Date.now() > this.lastFlushAt + this.maxRows)) {
//debuglog "[push] row? #{(@cachedRows.length > @maxRows)}, time? #{(Date.now() > @lastFlushAt + @maxRows)} "
this.flushToFile();
}
}
restoreExistingFiles() {
debuglog("[restoreExistingFiles] @workingPath:", this.workingPath);
fs.readdir(this.workingPath, (err, filenamList) => {
var existingBulkId, filename, i, len, pathToFile, stats;
// flush memory cache to the disk file
// @callbak (err, isFlushing:Boolean)
flushToFile(callbak = NOOP) {
var rowsToFlush;
//debuglog("#{@} [flushToFile] @_isFlushing:", @_isFlushing)
if (this._isFlushing) {
callbak(null, true);
return;
}
if (!(this.cachedRows.length > 0)) {
//debuglog("#{@} [flushToFile] nothing to flush")
this.lastFlushAt = Date.now();
callbak();
return;
}
rowsToFlush = this.cachedRows;
this.cachedRows = [];
debuglog(`${this} [flushToFile] -> ${rowsToFlush.length} rows`);
this._isFlushing = true;
fs.appendFile(this.pathToCargoFile, rowsToFlush.join("\n") + "\n", (err) => {
if (err != null) {
throw err;
debuglog(`${this} [flushToFile] FAILED error:`, err);
this.cachedRows = rowsToFlush.concat(this.cachedRows); // unshift data back
}
debuglog(`${this} [flushToFile] SUCCESS ${rowsToFlush.length} rows`);
this.lastFlushAt = Date.now();
this._isFlushing = false;
callbak(err);
});
}
flushSync() {
var rowsToFlush;
if (!(this.cachedRows.length > 0)) {
debuglog(`${this} [flushSync] nothing to flush`);
return;
}
rowsToFlush = this.cachedRows;
this.cachedRows = [];
debuglog(`${this} [flushSync] ${rowsToFlush.length} rows`);
fs.appendFileSync(this.pathToCargoFile, rowsToFlush.join("\n") + "\n");
}
// check if to commit disk file to clickhouse DB
exam() {
//debuglog "[exam] go commit"
this.flushToFile((err, isFlushing) => {
if (err != null) {
debuglog("[exam] ABORT fail to flush. error:", err);
return;
}
if (!Array.isArray(filenamList)) {
if (isFlushing) {
debuglog("[exam] ABORT isFlushing");
return;
}
filenamList = filenamList.filter(function(item) {
return item.startsWith(Bulk.FILENAME_PREFIX);
});
if (!(filenamList.length > 0)) {
if (!(Date.now() > this.lastCommitAt + this.commitInterval)) {
return;
}
debuglog(`[restoreExistingFiles] filenamList(${filenamList.length})`);
for (i = 0, len = filenamList.length; i < len; i++) {
filename = filenamList[i];
pathToFile = path.join(this.workingPath, filename);
stats = fs.statSync(pathToFile);
if (stats.size <= 0) {
debuglog(`[restoreExistingFiles] remove empty:${filename}`);
fs.unlink(pathToFile, NOOP);
// detect leader before every commit because worker might die
//debuglog "[exam] SKIP tick not reach"
detectLeaderWorker(this.id, (err, leadWorkerId) => {
if (err != null) {
debuglog("[exam > detectLeaderWorker] FAILED error:", err);
return;
}
// only one process can commit
if (leadWorkerId !== CLUSTER_WORKER_ID) {
debuglog(`[exam] CANCLE NOT leadWorkerId:${leadWorkerId}`);
// non-leader skip 10 commit rounds
this.lastCommitAt = Date.now() + this.commitInterval * 10;
return;
}
debuglog("[exam] LEAD COMMIT");
this.rotateFile((err) => {
if (err != null) {
debuglog("[exam > rotateFile] FAILED error:", err);
return;
}
this.commitToClickhouseDB();
this.lastCommitAt = Date.now();
});
});
});
}
rotateFile(callbak = NOOP) {
fs.stat(this.pathToCargoFile, (err, stats) => {
var pathToRenameFile;
//debuglog "[exam > stat] err:", err,", stats:", stats
if (err != null) {
if (err.code === 'ENOENT') {
debuglog("[rotateFile] SKIP nothing to rotate");
callbak();
} else {
existingBulkId = filename.replace(Bulk.FILENAME_PREFIX, "");
debuglog("[restoreExistingFiles] restore existing bulk:", existingBulkId);
this.bulks.push(new Bulk(this.workingPath, existingBulkId));
debuglog("[rotateFile] ABORT fail to stats file. error:", err);
callbak(err);
}
return;
}
if (!(stats && (stats.size > 0))) {
debuglog("[rotateFile] SKIP empty file.");
callbak();
return;
}
// rotate disk file
pathToRenameFile = path.join(this.pathToCargoFolder, `${FILENAME_PREFIX}${this.id}.${Date.now().toString(36) + `_${++StaticCountWithinProcess}`}.${CLUSTER_WORKER_ID}${EXTNAME_UNCOMMITTED}`);
debuglog(`[rotateFile] rotate to ${pathToRenameFile}`);
fs.rename(this.pathToCargoFile, pathToRenameFile, (err) => {
if (err != null) {
debuglog(`[exam] ABORT fail to rename file to ${pathToRenameFile}. error:`, err);
callbak(err);
return;
}
callbak();
});
});
}
moveToNextBulk() {
debuglog(`${this} [moveToNextBulk]`);
if (this.curBulk) {
this.bulks.push(this.curBulk);
// commit local rotated files to remote ClickHouse DB
commitToClickhouseDB() {
if (this._isCommiting) {
debuglog("[commitToClickhouseDB] SKIP is committing");
return;
}
this.curBulk = new Bulk(this.workingPath);
this.curBulk.expire(this.bulkTTL);
}
// detect leader before every commit because worker might die
//detectLeaderWorker @id, (err, leadWorkerId)=>
//#debuglog "[commitToClickhouseDB > detectLeaderWorker] err:", err, ", leadWorkerId:", leadWorkerId
//if err?
//debuglog "[commitToClickhouseDB > detectLeaderWorker] FAILED error:", err
//return
// routine to exame each bulk belongs to this cargo
exam() {
var bulk, bulksToRemove, countIssueCommit, i, j, len, len1, pos, ref;
//debuglog "#{@} [exam]"
if (this.curBulk) {
if (this.curBulk.isEmpty()) {
// lazy: keep ttl when bulk is empty
this.curBulk.expire(this.bulkTTL);
} else if (this.curBulk.isExpired()) {
this.moveToNextBulk();
//# only one process can commit
//unless leadWorkerId is CLUSTER_WORKER_ID
//debuglog "[commitToClickhouseDB] CANCLE NOT leadWorkerId:#{leadWorkerId}"
//return
//debuglog "[commitToClickhouseDB] LEAD COMMIT"
fs.readdir(this.pathToCargoFolder, (err, filenamList) => {
var combinedStream, dbStream, rotationPrefix;
//debuglog "[commitToClickhouseDB > readdir] err:", err, ", filenamList:", filenamList
if (err != null) {
debuglog("[commitToClickhouseDB > ls] FAILED error:", err);
return;
}
}
bulksToRemove = [];
this.bulks.sort(function(a, b) {
return (parseInt(a.id, 36) || 0) - (parseInt(b.id, 36) || 0);
});
countIssueCommit = 0;
ref = this.bulks;
for (i = 0, len = ref.length; i < len; i++) {
bulk = ref[i];
if (bulk.isCommitted()) {
bulksToRemove.push(bulk);
} else {
if (countIssueCommit < MAX_COMMIT_PER_EXAM_ROUTINE) {
bulk.commit(this.clichouseClient, this.statement);
++countIssueCommit;
if (!Array.isArray(filenamList)) {
return;
}
rotationPrefix = FILENAME_PREFIX + this.id + '.';
filenamList = filenamList.filter(function(item) {
return item.startsWith(rotationPrefix) && item.endsWith(EXTNAME_UNCOMMITTED);
});
if (!(filenamList.length > 0)) {
return;
}
debuglog(`[commitToClickhouseDB] filenamList(${filenamList.length})`);
filenamList = filenamList.map((item) => {
return path.join(this.pathToCargoFolder, item);
});
this._isCommiting = true; //lock
dbStream = this.clichouseClient.query(this.statement, {
format: 'JSONCompactEachRow'
});
dbStream.on('error', (err) => {
debuglog(`${this} [commitToClickhouseDB > DB write] FAILED error:`, err);
this._isCommiting = false;
});
dbStream.on('finish', () => {
var filepath, j, len;
debuglog(`${this} [commitToClickhouseDB] success dbStream:finish`);
this._isCommiting = false;
for (j = 0, len = filenamList.length; j < len; j++) {
filepath = filenamList[j];
fs.unlink(filepath, NOOP); // remove the physical file
}
}
}
if (bulksToRemove.length > 0) {
debuglog(`${this} [exam], bulks:`, this.bulks.length, ", bulksToRemove:", bulksToRemove.map(function(item) {
return item.id;
});
combinedStream = new MultiStream(filenamList.map(function(filepath) {
return fs.createReadStream(filepath);
}));
}
for (j = 0, len1 = bulksToRemove.length; j < len1; j++) {
bulk = bulksToRemove[j];
pos = this.bulks.indexOf(bulk);
debuglog(`${this} [exam] remove bulk: ${bulk.toString()}@${pos}`);
if (pos >= 0) {
this.bulks.splice(pos, 1);
}
}
//console.dir combinedStream , depth:10
combinedStream.pipe(dbStream);
});
}
push() {
this.curBulk.push(Array.from(arguments));
return ++this.count;
}
getRetiredBulks() {
return this.bulks.concat();
}
};
@@ -184,0 +297,0 @@
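To make the rotation naming in `rotateFile` above concrete, here is a small standalone sketch that reproduces the rotated-file name format (the constants mirror the ones defined at the top of `lib/cargo.js`; the folder, statement, and worker id are illustrative):

```js
// Sketch: the name a rotated cargo file receives, per rotateFile() above.
// Format: cargo_<md5-of-statement>.<base36-timestamp>_<counter>.<workerId>.uncommitted
const crypto = require("crypto");
const path = require("path");

const FILENAME_PREFIX = "cargo_";
const EXTNAME_UNCOMMITTED = ".uncommitted";
let StaticCountWithinProcess = 0; // de-dupes rotations within one tick

function rotatedName(folder, statement, workerId) {
  const id = crypto.createHash("md5").update(statement).digest("hex");
  const stamp = Date.now().toString(36) + `_${++StaticCountWithinProcess}`;
  return path.join(folder, `${FILENAME_PREFIX}${id}.${stamp}.${workerId}${EXTNAME_UNCOMMITTED}`);
}

// e.g. cargo_files/cargo_0b1f...{md5}.kx2x1a_1.nocluster.uncommitted
console.log(rotatedName("cargo_files", "INSERT INTO t", "nocluster"));
```

`commitToClickhouseDB` then picks up every file matching this prefix and extension, which is why a leader worker can also commit rotations left behind by crashed siblings.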
// Generated by CoffeeScript 2.5.1
(function() {
var FILENAME_PREFIX, NUM_OF_LINE, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, STATEMENT_INSERT2, TABLE_NAME, assert, clickHouseClient, cluster, createCargo, crypto, debuglog, fs, getClickHouseClient, isInited, path, prepareBulkCachFile, statment, toSQLDateString;
var NUM_OF_LINE, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, STATEMENT_INSERT2, StartCountWithinProcess, TABLE_NAME, assert, clickHouseClient, cluster, columnValueString, crashProc, createCargo, crypto, debuglog, fs, getClickHouseClient, isInited, path, prepareBulkCachFile, proc, statment, theCargo, toSQLDateString;
@@ -19,3 +19,3 @@ ({createCargo, isInited, getClickHouseClient} = require("../"));
({FILENAME_PREFIX, toSQLDateString} = require("../bulk"));
({toSQLDateString} = require("../utils"));
@@ -34,28 +34,19 @@ TABLE_NAME = "cargo_test.unittest05";
STATEMENT_INSERT = `INSERT INTO ${TABLE_NAME}`;
columnValueString = Date.now().toString(36);
STATEMENT_DROP_TABLE = `DROP TABLE ${TABLE_NAME}`;
STATEMENT_INSERT = `INSERT INTO ${TABLE_NAME}`;
NUM_OF_LINE = 36;
STATEMENT_DROP_TABLE = `DROP TABLE IF EXISTS ${TABLE_NAME}`;
STATEMENT_INSERT2 = `insert into ${TABLE_NAME}`;
NUM_OF_LINE = 12;
STATEMENT_INSERT2 = `insert into ${TABLE_NAME}`;
prepareBulkCachFile = function(insertStatement, label) {
var PathToCargoFile, arr, bulkId, content, i, j, ref, theFilepath;
// prepar bulk cache file
PathToCargoFile = path.join(process.cwd(), "cargo_files", "cargo-" + crypto.createHash('md5').update(insertStatement).digest("hex"));
if (fs.existsSync(PathToCargoFile)) {
assert(fs.statSync(PathToCargoFile).isDirectory(), `${PathToCargoFile} is not a directory`);
} else {
fs.mkdirSync(PathToCargoFile, {
recursive: true,
mode: 0o755
});
}
bulkId = FILENAME_PREFIX + Date.now().toString(36) + "_1";
theFilepath = path.join(PathToCargoFile, bulkId);
var arr, content, i, j, ref, theFilepath;
theFilepath = path.join(process.cwd(), "cargo_files", `cargo_${crypto.createHash('md5').update(insertStatement).digest("hex")}.${Date.now().toString(36)}_unittest.nocluster.uncommitted`);
debuglog("[prepare] theFilepath:", theFilepath);
content = "";
for (i = j = 0, ref = NUM_OF_LINE; (0 <= ref ? j < ref : j > ref); i = 0 <= ref ? ++j : --j) {
arr = [toSQLDateString(new Date()), i, label || "cluster-bulk"];
arr = [toSQLDateString(new Date()), i, label || "cluster-cargo"];
content += JSON.stringify(arr) + "\n";
@@ -73,4 +64,4 @@ }
var i, j;
prepareBulkCachFile(STATEMENT_INSERT, "batchA");
prepareBulkCachFile(STATEMENT_INSERT2, "batchB");
prepareBulkCachFile(STATEMENT_INSERT, "~batchA");
prepareBulkCachFile(STATEMENT_INSERT2, "~batchB");
// spawn worker
@@ -85,5 +76,19 @@ for (i = j = 0; j < 8; i = ++j) {
debuglog(`[isWorker ${cluster.worker.id}] statment:`, statment);
createCargo(statment);
theCargo = createCargo(statment);
StartCountWithinProcess = 0;
proc = function() {
var i, j, mark, ref;
mark = `worker@${cluster.worker.id}:${StartCountWithinProcess++}`;
debuglog("proc insert: ", mark);
for (i = j = 0, ref = NUM_OF_LINE; (0 <= ref ? j < ref : j > ref); i = 0 <= ref ? ++j : --j) {
theCargo.push(new Date(), i, mark);
}
};
setInterval(proc, 2000);
crashProc = function() {
nonExistingFunc();
};
setTimeout(crashProc, 10000 + Math.random() * 10000 >>> 0);
}
}).call(this);
// Generated by CoffeeScript 2.5.1
(function() {
var Cargo, ClickHouse, ClickHouseClient, DEFAULT_BULK_TTL, MIN_BULK_TTL, PathToCargoFile, STATEMENT_TO_CARGO, assert, createCargo, debuglog, err, examCargos, fs, init, os, path, pathToConfig, profileConfig, profileName;
var CLUSTER_WORKER_ID, Cargo, CargoOptions, ClickHouse, ClickHouseClient, STATEMENT_TO_CARGO, assert, cluster, createCargo, debuglog, err, examCargos, fs, init, os, path, pathToConfig, profileConfig, profileName;
debuglog = require("debug")("chcargo:index");
ClickHouse = require('@apla/clickhouse');
@@ -17,2 +15,8 @@
cluster = require('cluster');
CLUSTER_WORKER_ID = cluster.isMaster ? "nocluster" : cluster.worker.id;
debuglog = require("debug")(`chcargo:index@${CLUSTER_WORKER_ID}`);
Cargo = require("./cargo");
@@ -22,26 +26,44 @@
// for how long (in ms) one bulk should keep accumlate inserts
DEFAULT_BULK_TTL = 5 * 1000;
MIN_BULK_TTL = 1000;
STATEMENT_TO_CARGO = {};
PathToCargoFile = null;
CargoOptions = {};
// @param Object config:
// .pathToCargoFolder
// .maxTime
// .maxRows
// .commitInterval
init = function(config) {
var commitInterval, isToFlushBeforeCrash, maxRows, maxTime;
assert(ClickHouseClient === null, "ClickHouseClient has already inited");
assert(config && config.host, "missing host in config");
config = Object.assign({}, config); // leave input obj unmodified
// prepare disk path
PathToCargoFile = path.resolve(process.cwd(), config.cargoPath || "cargo_files");
debuglog("[init] PathToCargoFile:", PathToCargoFile);
CargoOptions.pathToCargoFolder = path.resolve(process.cwd(), config.cargoPath || "cargo_files");
delete config.cargoPath;
if (fs.existsSync(PathToCargoFile)) {
assert(fs.statSync(PathToCargoFile).isDirectory(), `${PathToCargoFile} is not a directory`);
} else {
fs.mkdirSync(PathToCargoFile, {
recursive: true,
mode: 0o755
});
// verify cargo can write to the destination folder
fs.accessSync(CargoOptions.pathToCargoFolder, fs.constants.W_OK); //, "Cargo not able to write to folder #{CargoOptions.pathToCargoFolder}"
fs.stat(CargoOptions.pathToCargoFolder, function(err, stats) {
assert(err == null, `Fail to read directory stats. Due to ${err}`);
assert(stats.isDirectory(), `Not a directory: ${CargoOptions.pathToCargoFolder}`);
});
maxTime = parseInt(config.maxTime);
if (maxTime > 0) {
CargoOptions.maxTime = maxTime;
}
delete config.maxTime;
maxRows = parseInt(config.maxRows);
if (maxRows > 0) {
CargoOptions.maxRows = maxRows;
}
delete config.maxRows;
commitInterval = parseInt(config.commitInterval);
if (commitInterval > 0) {
CargoOptions.commitInterval = commitInterval;
}
delete config.commitInterval;
//debuglog "[init] CargoOptions:", CargoOptions
isToFlushBeforeCrash = config.saveWhenCrash !== false;
delete config.saveWhenCrash;
ClickHouseClient = new ClickHouse(config);
@@ -53,2 +75,14 @@ ClickHouseClient.ping(function(err) {
});
if (isToFlushBeforeCrash) {
// flush in-memroy data when process crash
process.on('uncaughtException', function(err) {
var cargo, statement;
debuglog("⚠️⚠️⚠️ [flushSyncInMemoryCargo] ⚠️⚠️⚠️ ");
for (statement in STATEMENT_TO_CARGO) {
cargo = STATEMENT_TO_CARGO[statement];
cargo.flushSync();
}
throw err;
});
}
};
@@ -59,6 +93,5 @@
// @param statement String, sql insert statement
// @param bulkTTL Int, ttl(in ms) for flush accumlated inserts. default: 5000, min: 1000
createCargo = function(statement, bulkTTL) {
createCargo = function(statement) {
var cargo;
debuglog(`[createCargo] statement:${statement}, bulkTTL:${bulkTTL}`);
debuglog(`[createCargo] statement:${statement}`);
assert(ClickHouseClient, "ClickHouseClient needs to be inited first");
@@ -68,13 +101,8 @@ statement = String(statement || "").trim();
assert(statement.toUpperCase().startsWith("INSERT"), "statement must be an insert sql");
bulkTTL = parseInt(bulkTTL) || DEFAULT_BULK_TTL;
if (bulkTTL < MIN_BULK_TTL) {
bulkTTL = MIN_BULK_TTL;
}
cargo = STATEMENT_TO_CARGO[statement];
if (cargo) {
cargo.setBulkTTL(bulkTTL);
debuglog("[createCargo] reuse cargo:", cargo.toString());
return cargo;
}
cargo = new Cargo(ClickHouseClient, statement, bulkTTL, PathToCargoFile);
cargo = new Cargo(ClickHouseClient, statement, CargoOptions);
STATEMENT_TO_CARGO[statement] = cargo;
@@ -113,3 +141,3 @@ debuglog("[createCargo] cargo:", cargo.toString());
// self examination routine
setInterval(examCargos, MIN_BULK_TTL);
setInterval(examCargos, 1000);
@@ -116,0 +144,0 @@ module.exports = {
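One consequence of the `STATEMENT_TO_CARGO` map above is that `createCargo` is idempotent per statement. A small sketch (the table name is hypothetical, and `init` must have run first because `createCargo` asserts an inited client):

```js
// Sketch: cargos are cached per insert statement.
const { init, createCargo } = require("clickhouse-cargo");

init({ host: "localhost" }); // createCargo asserts the client is inited

// Both calls return the same Cargo instance; the cargo id is the
// MD5 hex digest of the statement, so equal statements share a cargo.
const a = createCargo("INSERT INTO metrics.events");
const b = createCargo("INSERT INTO metrics.events");
console.assert(a === b && a.id === b.id);
```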
// Generated by CoffeeScript 2.5.1
(function() {
// when in cluster worker mode, elect a leader for each cargo to avoid race condition when restoring existing bulks
var MAX_UDP_CONFIRM, NOOP, REG_MSG_VALIDATOR, RemoteClientPortByCargoId, SERVICE_HOST, SERVICE_PORT, SERVICE_TYPE, UDPEchoSrv, assert, cluster, debuglog, dgram, electSelfToALeader, err;
var CLUSTER_WORKER_ID, MAX_UDP_CONFIRM, REG_MSG_VALIDATOR, RemoteClientPortByCargoId, SERVICE_HOST, SERVICE_PORT, SERVICE_TYPE, UDPEchoSrv, assert, cluster, debuglog, detectLeaderWorker, dgram, err;
@@ -12,5 +12,5 @@ assert = require("assert");
debuglog = require("debug")(`chcargo:leader_election@${cluster.isMaster ? "main" : cluster.worker.id}`);
CLUSTER_WORKER_ID = cluster.isMaster ? "nocluster" : cluster.worker.id;
NOOP = function() {};
debuglog = require("debug")(`chcargo:leader_election@${CLUSTER_WORKER_ID}`);
@@ -69,7 +69,7 @@ SERVICE_TYPE = "clickhouse-cargo";
// communicate with other cluster worker and to elect a leader worker for the given cargoId
electSelfToALeader = function(cargoId, callbak = NOOP) {
detectLeaderWorker = function(cargoId, callbak) {
var cargoLeaderId, countSend, msg, procSend, udpClient, workerId;
if (cluster.isMaster && Object.keys(cluster.workers).length === 0) {
debuglog("[electSelfToALeader] single process leader");
callbak();
callbak(null, CLUSTER_WORKER_ID);
return;
@@ -105,13 +105,12 @@ }
if (countSend > MAX_UDP_CONFIRM) {
if (cargoLeaderId === workerId) {
debuglog(`[electSelfToALeader@${workerId}] is leader for ${cargoId}`);
callbak();
} else {
debuglog(`[electSelfToALeader@${workerId}] is follower for ${cargoId}`);
}
udpClient.close();
callbak(null, cargoLeaderId);
} else {
//if cargoLeaderId is workerId
//debuglog "[electSelfToALeader@#{workerId}] is leader for #{cargoId}"
//else
//debuglog "[electSelfToALeader@#{workerId}] is follower for #{cargoId}"
udpClient.send(msg, 0, msg.length, SERVICE_PORT, SERVICE_HOST);
//setTimeout(procSend, Math.random() * 1000 >>> 0)
setTimeout(procSend, 1000);
setTimeout(procSend, 200);
}
@@ -123,5 +122,5 @@ };
module.exports = {
electSelfToALeader: electSelfToALeader
detectLeaderWorker: detectLeaderWorker
};
}).call(this);
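Callers now treat `detectLeaderWorker` as a read-only query rather than a one-shot election, which is what `Cargo#exam` does before each commit. A sketch of the calling pattern (the cargo id and the deep require path are illustrative):

```js
// Sketch: ask who the leader is, and only commit if it is this worker
// (cf. the detectLeaderWorker callback inside Cargo#exam above).
const cluster = require("cluster");
const { detectLeaderWorker } = require("clickhouse-cargo/lib/leader_election");

const CLUSTER_WORKER_ID = cluster.isMaster ? "nocluster" : cluster.worker.id;

detectLeaderWorker("some-cargo-id", (err, leadWorkerId) => {
  if (err) return console.error("election failed:", err);
  if (leadWorkerId === CLUSTER_WORKER_ID) {
    // only the elected leader rotates files and commits to ClickHouse
  }
});
```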
// Generated by CoffeeScript 2.5.1
(function() {
var QUERY, QUERY1, assert, createCargo, isInited;
var INIT_OPTION, QUERY, QUERY1, assert, createCargo, init, isInited;
({createCargo, isInited} = require("../"));
({createCargo, isInited, init} = require("../"));
assert = require("assert");
INIT_OPTION = {
host: "localhost",
maxTime: 2000,
maxRows: 100,
commitInterval: 8000
};
QUERY = "INSERT INTO test.cargo0 FORMAT JSONCompactEachRow";
@@ -16,2 +23,8 @@
cargo0 = null;
//before (done)->
//init(INIT_OPTION)
//done()
//return
//after -> process.exit(0)
it("auto init when env set", function(done) {
@@ -29,5 +42,8 @@ assert(isInited(), "should auto init when env set");
cargo0 = createCargo(QUERY);
//console.log cargo0
assert(cargo0);
assert(cargo0.id);
assert(cargo0.curBulk);
assert(cargo0.id, "bad cargo0.id");
assert(cargo0.maxTime === INIT_OPTION.maxTime, `bad cargo0.maxTime:${cargo0.maxTime} => ${INIT_OPTION.maxTime}`);
assert(cargo0.maxRows === INIT_OPTION.maxRows, "bad cargo0.maxRows");
assert(cargo0.commitInterval === INIT_OPTION.commitInterval, "bad cargo0.commitInterval");
done();
@@ -34,0 +50,0 @@ });
// Generated by CoffeeScript 2.5.1
(function() {
var NUM_OF_LINE, QUERY, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, TABLE_NAME, assert, createCargo, debuglog, fs, getClickHouseClient, isInited;
var INIT_OPTION, NUM_OF_LINE, QUERY, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, TABLE_NAME, _, assert, createCargo, debuglog, fs, getClickHouseClient, isInited;
@@ -15,2 +15,4 @@ ({createCargo, isInited} = require("../"));
_ = require("lodash");
TABLE_NAME = "cargo_test.unittest02";
@@ -22,3 +24,3 @@
STATEMENT_DROP_TABLE = `DROP TABLE ${TABLE_NAME}`;
STATEMENT_DROP_TABLE = `DROP TABLE IF EXISTS ${TABLE_NAME}`;
@@ -35,38 +37,54 @@ STATEMENT_CREATE_TABLE = `CREATE TABLE IF NOT EXISTS ${TABLE_NAME}
// refer
INIT_OPTION = {
host: "localhost",
maxTime: 2000,
maxRows: 100,
commitInterval: 8000
};
//NUM_OF_LINE = 80
NUM_OF_LINE = 100; // NOTE: bulk flushs every 100 lines
NUM_OF_LINE = 429; // NOTE: bulk flushs every 100 lines
describe("push log to cargo", function() {
var theBulk, theCargo, theFilepath;
this.timeout(5000);
var columnValueString, theCargo;
this.timeout(20000);
theCargo = null;
theBulk = null;
theFilepath = null;
columnValueString = Date.now().toString(36);
before(function(done) {
debuglog("[before]");
theCargo = createCargo(QUERY, 999000);
theBulk = theCargo.curBulk;
theFilepath = theBulk.pathToFile;
getClickHouseClient().query(STATEMENT_CREATE_TABLE, done);
});
after(function(done) {
debuglog("[after] query:", STATEMENT_DROP_TABLE);
getClickHouseClient().query(STATEMENT_DROP_TABLE, function(err) {
done(err);
if (err != null) {
throw err;
}
getClickHouseClient().query(STATEMENT_CREATE_TABLE, function(err) {
if (err != null) {
throw err;
}
theCargo = createCargo(QUERY);
if (fs.existsSync(theCargo.pathToCargoFile)) { // clean up existing log
fs.unlinkSync(theCargo.pathToCargoFile);
}
done();
});
});
});
//after -> process.exit(0)
it("push to cargo", function(done) {
var i, j, ref;
for (i = j = 0, ref = NUM_OF_LINE; (0 <= ref ? j < ref : j > ref); i = 0 <= ref ? ++j : --j) {
theCargo.push(new Date(), i, "string");
theCargo.push(new Date(), i, columnValueString);
}
assert(fs.existsSync(theFilepath), `log file not exist on ${theFilepath}`);
setTimeout(done, 2000); // wait file stream flush
setTimeout(done, 5000); // wait file stream flush
});
it("cargo should flush to file", function() {
assert(fs.existsSync(theCargo.pathToCargoFile), `log file not exist on ${theCargo.pathToCargoFile}`);
});
return it("exam content written on hd file", function(done) {
var contentInHD, contentInHDArr, i, j, len, line;
contentInHD = fs.readFileSync(theFilepath, 'utf8');
contentInHDArr = contentInHD.split(/\r|\n|\r\n/);
debuglog("[exam hd content] contentInHDArr:", contentInHDArr.length);
assert(contentInHDArr.length === NUM_OF_LINE, "unmatching output length");
contentInHD = fs.readFileSync(theCargo.pathToCargoFile, 'utf8');
//debuglog "[exam hd content] contentInHD:", contentInHD
contentInHDArr = _.compact(contentInHD.split(/\r|\n|\r\n/));
//debuglog "[exam hd content] contentInHDArr:", contentInHDArr
assert(contentInHDArr.length === NUM_OF_LINE, `unmatching output length. NUM_OF_LINE:${NUM_OF_LINE}, contentInHDArr.length:${contentInHDArr.length}`);
for (i = j = 0, len = contentInHDArr.length; j < len; i = ++j) {
@@ -77,3 +95,3 @@ line = contentInHDArr[i];
assert(line[1] === i, "unmatching field 1 ");
assert(line[2] === "string", "unmatching field 2 ");
assert(line[2] === columnValueString, "unmatching field 2 ");
}
@@ -80,0 +98,0 @@ done();
// Generated by CoffeeScript 2.5.1
(function() {
var NUM_OF_LINE, QUERY, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, STATEMENT_SELECT, TABLE_NAME, assert, createCargo, debuglog, fs, getClickHouseClient, isInited;
var INIT_OPTION, NUM_OF_LINE, QUERY, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, STATEMENT_SELECT, TABLE_NAME, assert, columnValueString, createCargo, debuglog, fs, getClickHouseClient, isInited;
@@ -27,7 +27,16 @@ ({createCargo, isInited, getClickHouseClient} = require("../"));
STATEMENT_DROP_TABLE = `DROP TABLE ${TABLE_NAME}`;
STATEMENT_DROP_TABLE = `DROP TABLE IF EXISTS ${TABLE_NAME}`;
//STATEMENT_SELECT = "SELECT * FROM #{TABLE_NAME} LIMIT 10000000"
STATEMENT_SELECT = `SELECT * FROM ${TABLE_NAME} LIMIT 10000000 FORMAT JSONCompactEachRow `;
columnValueString = Date.now().toString(36);
STATEMENT_SELECT = `SELECT * FROM ${TABLE_NAME} WHERE pos_id='${columnValueString}' LIMIT 100000 FORMAT JSONCompactEachRow `;
// refer
INIT_OPTION = {
host: "localhost",
maxTime: 2000,
maxRows: 100,
commitInterval: 8000
};
NUM_OF_LINE = 27891; // NOTE: bulk flushs every 100 lines
@@ -37,34 +46,33 @@
describe("commit bulk", function() {
var theBulk, theCargo, theFilepath;
this.timeout(60000);
var theCargo;
this.timeout(30000);
theCargo = null;
theBulk = null;
theFilepath = null;
before(function(done) {
theCargo = createCargo(STATEMENT_INSERT);
theBulk = theCargo.curBulk;
theFilepath = theBulk.pathToFile;
getClickHouseClient().query(STATEMENT_CREATE_TABLE, done);
});
after(function(done) {
debuglog("[after] query:", STATEMENT_DROP_TABLE);
debuglog("[before]");
getClickHouseClient().query(STATEMENT_DROP_TABLE, function(err) {
done(err);
if (err != null) {
throw err;
}
getClickHouseClient().query(STATEMENT_CREATE_TABLE, function(err) {
if (err != null) {
throw err;
}
theCargo = createCargo(STATEMENT_INSERT);
if (fs.existsSync(theCargo.pathToCargoFile)) { // clean up existing log
fs.unlinkSync(theCargo.pathToCargoFile);
}
done();
});
});
});
//process.exit() if not err?
//after -> process.exit(0)
it("push to cargo", function(done) {
var i, j, ref;
for (i = j = 0, ref = NUM_OF_LINE; (0 <= ref ? j < ref : j > ref); i = 0 <= ref ? ++j : --j) {
theCargo.push(new Date(), i, "string");
theCargo.push(new Date(), i, columnValueString);
}
setTimeout(done, 10000); // wait file stream flush
setTimeout(done, 15000); // wait file stream flush
});
it("bulk should committed", function(done) {
var curBulk;
assert(theBulk.isCommitted(), "the bulk should committed");
curBulk = theCargo.curBulk;
assert(curBulk !== theBulk, "previouse bulk should not be the current bulk");
assert(theCargo.getRetiredBulks().length === 0, "committed bulks should be cleared");
assert(!fs.existsSync(theFilepath), "local file must be cleared");
assert(!fs.existsSync(theCargo.pathToCargoFile), "local file must be cleared");
done();
@@ -97,3 +105,3 @@ });
assert(row[1] === i, "unmatching field 1 ");
assert(row[2] === "string", "unmatching field 2 ");
assert(row[2] === columnValueString, "unmatching field 2 ");
}
@@ -100,0 +108,0 @@ done();
// Generated by CoffeeScript 2.5.1
(function() {
var FILENAME_PREFIX, NUM_OF_LINE, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, STATEMENT_SELECT, TABLE_NAME, assert, createCargo, crypto, debuglog, fs, getClickHouseClient, isInited, path, toSQLDateString;
var NUM_OF_LINE, STATEMENT_CREATE_TABLE, STATEMENT_DROP_TABLE, STATEMENT_INSERT, STATEMENT_SELECT, TABLE_NAME, assert, columnValueString, createCargo, crypto, debuglog, fs, getClickHouseClient, isInited, path, toSQLDateString;
@@ -17,3 +17,3 @@ ({createCargo, isInited, getClickHouseClient} = require("../"));
({FILENAME_PREFIX, toSQLDateString} = require("../bulk"));
({toSQLDateString} = require("../utils"));
@@ -34,9 +34,11 @@ TABLE_NAME = "cargo_test.unittest04";
STATEMENT_DROP_TABLE = `DROP TABLE ${TABLE_NAME}`;
columnValueString = Date.now().toString(36);
STATEMENT_SELECT = `SELECT * FROM ${TABLE_NAME} LIMIT 10000000 FORMAT JSONCompactEachRow `;
STATEMENT_DROP_TABLE = `DROP TABLE IF EXISTS ${TABLE_NAME}`;
STATEMENT_SELECT = `SELECT * FROM ${TABLE_NAME} WHERE pos_id='${columnValueString}' LIMIT 100000 FORMAT JSONCompactEachRow `;
NUM_OF_LINE = 3396;
describe("restore-local-bulks", function() {
describe("restore-local-rotations", function() {
var theCargo, theFilepath;
@@ -48,4 +50,4 @@ this.timeout(60000);
getClickHouseClient().query(STATEMENT_DROP_TABLE, function(err) {
if (err == null) {
process.exit();
if (err != null) {
throw err;
}
@@ -55,27 +57,13 @@ getClickHouseClient().query(STATEMENT_CREATE_TABLE, done);
});
//after (done)->
//debuglog "[after] query:", STATEMENT_DROP_TABLE
//getClickHouseClient().query STATEMENT_DROP_TABLE, (err)->
//done(err)
//process.exit() if not err?
//return
//return
it("prepare local bulks", function(done) {
var PathToCargoFile, arr, bulkId, content, err, i, j, ref;
after(function() {
return process.exit(0);
});
it("prepare local rotations", function(done) {
var arr, content, err, i, j, ref;
try {
PathToCargoFile = path.join(process.cwd(), "cargo_files", "cargo-" + crypto.createHash('md5').update(STATEMENT_INSERT).digest("hex"));
if (fs.existsSync(PathToCargoFile)) {
assert(fs.statSync(PathToCargoFile).isDirectory(), `${PathToCargoFile} is not a directory`);
} else {
fs.mkdirSync(PathToCargoFile, {
recursive: true,
mode: 0o755
});
}
bulkId = FILENAME_PREFIX + Date.now().toString(36) + "_1";
theFilepath = path.join(PathToCargoFile, bulkId);
theFilepath = path.join(process.cwd(), "cargo_files", `cargo_${crypto.createHash('md5').update(STATEMENT_INSERT).digest("hex")}.${Date.now().toString(36)}_unittest.nocluster.uncommitted`);
debuglog("[prepare] theFilepath:", theFilepath);
content = "";
for (i = j = 0, ref = NUM_OF_LINE; (0 <= ref ? j < ref : j > ref); i = 0 <= ref ? ++j : --j) {
arr = [toSQLDateString(new Date()), i, "test04"];
arr = [toSQLDateString(new Date()), i, columnValueString];
content += JSON.stringify(arr) + "\n";
@@ -90,3 +78,3 @@ }
}
setTimeout(done, 10000); // wait for cargo.exam
setTimeout(done, 20000); // wait for cargo.exam
});
@@ -118,3 +106,3 @@ it("local bulks should be commit to ClickHouse", function(done) {
assert(row[1] === i, "unmatching field 1 ");
assert(row[2] === "local-bulk", "unmatching field 2 ");
assert(row[2] === columnValueString, "unmatching field 2 ");
}
@@ -121,0 +109,0 @@ done();
{
"name": "clickhouse-cargo",
"version": "1.1.1",
"version": "2.0.0",
"description": "Accumulates insert queries and commit them to clickhouse in batch jobs with retries on failure. 这个模块将向 Clickhouse 数据库的高频写入改为低频的批量插入。",
@@ -36,2 +36,3 @@ "main": "lib/index.js",
"coffeescript": "^2.5.1",
"lodash": "^4.17.20",
"mocha": "^8.1.3",
@@ -42,4 +43,5 @@ "nyc": "^15.1.0"
"@apla/clickhouse": "^1.6.3",
"debug": "^4.1.1"
"debug": "^4.1.1",
"multistream": "^4.0.0"
}
}
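The new `multistream` dependency backs the rewritten commit path: rotated files are concatenated into one readable stream and piped into the ClickHouse insert stream, as `commitToClickhouseDB` does above. In isolation the pattern looks like this (file names are placeholders, and `process.stdout` stands in for the `dbStream` returned by `clichouseClient.query(...)`):

```js
// Sketch: concatenate several rotated files into a single upload stream.
const fs = require("fs");
const MultiStream = require("multistream");

const files = ["a.uncommitted", "b.uncommitted"]; // placeholder names
const combined = new MultiStream(files.map((f) => fs.createReadStream(f)));

// In the real code this pipes into the ClickHouse insert stream,
// and the files are unlinked only after the stream finishes.
combined.pipe(process.stdout);
```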
@@ -14,23 +14,21 @@ # clickhouse-cargo
1. The `cargo` instance accepts insert requests submitted by the `push` method and routes these requests to a `bulk`.
2. The `bulk` writes the `push` rows accumulated in memory to a local file cache, according to the `stream.cork` setting.
3. `cargo` checks all online `bulks` regularly. When a `bulk` exceeds its `bulkTTL`, it commits its local file cache to the ClickHouse server.
4. In case of a ClickHouse commit failure, the `bulk` retries the submission in the next inspection cycle until it succeeds.
5. In case of a NodeJS process crash, local `bulk` file caches remain on disk, so the next time the `clickhouse-cargo` module starts, `cargo` finds the leftover `bulk` cache files and submits them to ClickHouse again.
1. A `cargo` instance accepts insert requests submitted by the `push` method and keeps the inserts in memory.
2. The `cargo` instance periodically flushes in-memory inserts to a file cache, then rotates this file and commits the rotated files to the ClickHouse server, as sketched below.
3. In case of a ClickHouse commit failure, the cargo retries the submission in the next round of its routine until it succeeds.
4. In case of a NodeJS process crash, in-memory inserts are flushed immediately to the file cache.
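A minimal sketch of that lifecycle from the caller's side (the `clickhouseCargo` require alias and the table name are illustrative; `init`, `createCargo`, and `push` are the module exports shown in `lib/index.js` above):

```js
// Sketch: push rows, let the cargo flush/rotate/commit in the background.
const clickhouseCargo = require("clickhouse-cargo");

clickhouseCargo.init({
  host: "localhost",
  maxTime: 2000,        // flush the in-memory buffer to file after 2s...
  maxRows: 100,         // ...or once 100 rows are buffered
  commitInterval: 8000  // commit rotated files to ClickHouse every 8s
});

const cargo = clickhouseCargo.createCargo(
  "INSERT INTO cargo_test.example" // hypothetical table
);

// Each push buffers one row; Date values are converted to SQL date strings.
cargo.push(new Date(), 42, "some-label");
```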
### Cluster mode support
When running in cluster mode (such as a [PM2 cluster deployment](https://pm2.keymetrics.io/docs/usage/cluster-mode/)), all cargo workers run an election over UDP at 127.0.0.1:17888 to elect a leader worker. The leader worker then takes care of restoring existing bulks.
When running in cluster mode (such as a [PM2 cluster deployment](https://pm2.keymetrics.io/docs/usage/cluster-mode/)), all cargo workers run an election over UDP at 127.0.0.1:17888 to elect a leader worker. Only the leader worker then performs file rotations and commits.
## How it works
1. The `cargo` instance accepts insert requests submitted via `push` and routes them to a `bulk`.
2. Following the `stream.cork` setting, the `bulk` writes the `push` rows accumulated in memory to a local file cache in batches.
3. `cargo` periodically checks all live `bulks`; when a `bulk` outlives its `bulkTTL`, the local file cache for that `bulk` is committed to the ClickHouse server.
4. When a write to ClickHouse fails, the `bulk` retries the commit in the next inspection cycle until it succeeds.
5. When the local NodeJS process crashes, local `bulk` file caches are left behind on disk; the next time the `clickhouse-cargo` module starts, `cargo` detects the leftover `bulk` cache files and submits them to ClickHouse again.
1. The `cargo` instance accepts insert requests submitted via `push` and holds them temporarily in memory.
2. `cargo` periodically writes the inserts accumulated in memory to its file cache, then rotates that file and commits the rotated output to the ClickHouse database.
3. When a write to ClickHouse fails, `cargo` retries the commit in the next inspection cycle until it succeeds.
4. When the local NodeJS process crashes, the inserts accumulated in memory are synchronously written to the file cache.
### Cluster mode support
In cluster mode, all cargo workers elect a leading worker via UDP; the leading worker is then responsible for restoring leftover file caches.
In cluster mode, all cargo workers elect a leading worker via UDP; the leading worker is then responsible for rotating the file caches and committing them to the ClickHouse database.
@@ -92,3 +90,7 @@
| `host` | ✓ | | Host to connect. |
| `cargoPath` | | `${cwd()}/cargo_files` | Path to the local cargo cache |
| `cargoPath` | | `${cwd()}/cargo_files` | Path to the local cargo cache. |
| `maxTime` | | 1000 | How long (in ms) a cargo keeps its in-memory insert buffer before flushing it to file. |
| `maxRows` | | 100 | How many rows a cargo buffers in memory before flushing. |
| `commitInterval` | | 5000 | Interval (in ms) at which a cargo commits rotated files to ClickHouse. |
| `saveWhenCrash` | | true | When `false`, cargos will not synchronously flush in-memory data when the node process crashes. |
| `user` | | | Authentication user. |
@@ -115,3 +117,3 @@ | `password` | | | Authentication password. |
*/
const cargo = clickhouse-cargo.createCargo(statement, bulkTTL);
const cargo = clickhouse-cargo.createCargo(statement);
```
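For reference, a simplified sketch of the guard that `saveWhenCrash` controls, mirroring the `uncaughtException` handler in the compiled `lib/index.js` above (`STATEMENT_TO_CARGO` is the module's internal statement-to-cargo map, shown here only for illustration):

```js
// Sketch: flush every cargo's in-memory rows to disk before crashing.
const STATEMENT_TO_CARGO = {}; // internally: insert statement -> Cargo

process.on("uncaughtException", function(err) {
  for (const statement in STATEMENT_TO_CARGO) {
    // synchronous append, so buffered rows survive the dying process
    STATEMENT_TO_CARGO[statement].flushSync();
  }
  throw err; // rethrow: the process still exits as it would have
});
```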
@@ -118,0 +120,0 @@
+ Added multistream@^4.0.0
+ Added inherits@2.0.4 (transitive)
+ Added multistream@4.1.0 (transitive)
+ Added once@1.4.0 (transitive)
+ Added readable-stream@3.6.2 (transitive)
+ Added safe-buffer@5.2.1 (transitive)
+ Added string_decoder@1.3.0 (transitive)
+ Added util-deprecate@1.0.2 (transitive)
+ Added wrappy@1.0.2 (transitive)