clickhouse-buffer
Advanced tools
Comparing version 1.6.0 to 1.7.0
@@ -6,5 +6,21 @@ "use strict"; | ||
const fs_1 = require("fs"); | ||
const zlib_1 = require("zlib"); | ||
function getStreamDecoder(fileExtension) { | ||
switch (fileExtension) { | ||
case 'gz': | ||
return (0, zlib_1.createGunzip)(); | ||
case "br": | ||
return (0, zlib_1.createBrotliDecompress)(); | ||
case "deflate": | ||
return (0, zlib_1.createInflate)(); | ||
} | ||
} | ||
function addFilesToStream(passThrough, files, options) { | ||
const file = files.shift(); | ||
const stream = (0, fs_1.createReadStream)(file, options); | ||
const fileExtension = file.split('.').pop(); | ||
let stream = (0, fs_1.createReadStream)(file, options); | ||
const decoder = getStreamDecoder(fileExtension); | ||
if (decoder) { | ||
stream = stream.pipe(decoder); | ||
} | ||
if (files.length > 0) { | ||
@@ -11,0 +27,0 @@ stream.pipe(passThrough, { end: false }); |
@@ -23,2 +23,3 @@ import { ClickhouseClient } from "@watchdg/clickhouse-client"; | ||
compressed?: 'gzip' | 'br' | 'deflate'; | ||
compressedFiles?: 'gzip' | 'br' | 'deflate'; | ||
} | ||
@@ -33,2 +34,3 @@ declare type columnType = string | number | Date | boolean; | ||
private readonly compressed?; | ||
private readonly compressedFiles?; | ||
private readonly maxRowsPerFile; | ||
@@ -51,2 +53,3 @@ private readonly maxRowsInMemory; | ||
private static flushToFiles; | ||
private static getStreamEncoder; | ||
private static loadToDatabase; | ||
@@ -53,0 +56,0 @@ static getRowsInFiles(files: string[]): number; |
@@ -51,2 +51,5 @@ "use strict"; | ||
} | ||
if (options.compressedFiles) { | ||
this.compressedFiles = options.compressedFiles; | ||
} | ||
this.database = options.database ?? clickhouse_client_1.DEFAULT_DATABASE; | ||
@@ -97,26 +100,24 @@ this.table = options.table; | ||
const numBytesToFile = ClickhouseBuffer.calcBytes(rowsToFile); | ||
const dataToFile = rowsToFile.join('\n') + '\n'; | ||
const filename = `${sortKeys}_${part}_r${numRowsToFile}_b${numBytesToFile}`; | ||
if (self.compressed === 'gzip') { | ||
let dataToFile = rowsToFile.join('\n') + '\n'; | ||
let filename = `${sortKeys}_${part}_r${numRowsToFile}_b${numBytesToFile}`; | ||
if (self.compressedFiles === 'gzip') { | ||
const gzip = (0, zlib_1.createGzip)(); | ||
gzip.write(dataToFile); | ||
await (0, promises_1.writeFile)(path_1.default.join(self.directoryPath, filename + '.gz'), dataToFile, { mode: self.fsMode }); | ||
files.push(filename + '.gz'); | ||
gzip.end(dataToFile); | ||
dataToFile = gzip; | ||
filename += '.gz'; | ||
} | ||
else if (self.compressed === 'br') { | ||
else if (self.compressedFiles === 'br') { | ||
const br = (0, zlib_1.createBrotliCompress)(); | ||
br.write(dataToFile); | ||
await (0, promises_1.writeFile)(path_1.default.join(self.directoryPath, filename + '.br'), dataToFile, { mode: self.fsMode }); | ||
files.push(filename + '.br'); | ||
br.end(dataToFile); | ||
dataToFile = br; | ||
filename += '.br'; | ||
} | ||
else if (self.compressed === 'deflate') { | ||
else if (self.compressedFiles === 'deflate') { | ||
const deflate = (0, zlib_1.createDeflate)(); | ||
deflate.write(dataToFile); | ||
await (0, promises_1.writeFile)(path_1.default.join(self.directoryPath, filename + '.df'), dataToFile, { mode: self.fsMode }); | ||
files.push(filename + '.df'); | ||
deflate.end(dataToFile); | ||
dataToFile = deflate; | ||
filename += '.deflate'; | ||
} | ||
else { | ||
await (0, promises_1.writeFile)(path_1.default.join(self.directoryPath, filename), dataToFile, { mode: self.fsMode }); | ||
files.push(filename); | ||
} | ||
await (0, promises_1.writeFile)(path_1.default.join(self.directoryPath, filename), dataToFile, { mode: self.fsMode }); | ||
files.push(filename); | ||
} | ||
@@ -135,2 +136,12 @@ self.files.push(...files); | ||
} | ||
static getStreamEncoder(contentEncoding) { | ||
switch (contentEncoding) { | ||
case 'gzip': | ||
return (0, zlib_1.createGzip)(); | ||
case 'br': | ||
return (0, zlib_1.createBrotliCompress)(); | ||
case 'deflate': | ||
return (0, zlib_1.createDeflate)(); | ||
} | ||
} | ||
static async loadToDatabase(self, files) { | ||
@@ -141,6 +152,11 @@ const rowsInFiles = ClickhouseBuffer.getRowsInFiles(files); | ||
}); | ||
const stream = (0, files_to_stream_1.filesToStream)(Array.from(paths)); | ||
let stream = (0, files_to_stream_1.filesToStream)(Array.from(paths)); | ||
const encoder = ClickhouseBuffer.getStreamEncoder(self.compressed); | ||
if (encoder) { | ||
stream = stream.pipe(encoder); | ||
} | ||
await self.clickhouseClient.query({ | ||
query: self.insertStatement, | ||
data: stream | ||
data: stream, | ||
compressed: self.compressed | ||
}); | ||
@@ -147,0 +163,0 @@ self.lastLoadDate = Date.now(); |
{ | ||
"name": "clickhouse-buffer", | ||
"version": "1.6.0", | ||
"version": "1.7.0", | ||
"license": "MIT", | ||
@@ -23,3 +23,3 @@ "author": { | ||
"dependencies": { | ||
"@watchdg/clickhouse-client": "^1.7.0" | ||
"@watchdg/clickhouse-client": "^1.8.0" | ||
}, | ||
@@ -26,0 +26,0 @@ "devDependencies": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
16705
354