tardis-dev
Advanced tools
Comparing version 12.4.0 to 12.4.1
@@ -33,2 +33,3 @@ import { Mapper } from './mappers'; | ||
}): Promise<void>; | ||
export declare function cleanTempFiles(): void; | ||
export declare class CircularBuffer<T> { | ||
@@ -35,0 +36,0 @@ private readonly _bufferSize; |
@@ -25,3 +25,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.CappedSet = exports.CircularBuffer = exports.download = exports.optimizeFilters = exports.parseμs = exports.batch = exports.getFilters = exports.normalizeMessages = exports.take = exports.HttpError = exports.ONE_SEC_IN_MS = exports.sequence = exports.addDays = exports.addMinutes = exports.sha256 = exports.doubleDigit = exports.formatDateToPath = exports.wait = exports.parseAsUTCDate = void 0; | ||
exports.CappedSet = exports.CircularBuffer = exports.cleanTempFiles = exports.download = exports.optimizeFilters = exports.parseμs = exports.batch = exports.getFilters = exports.normalizeMessages = exports.take = exports.HttpError = exports.ONE_SEC_IN_MS = exports.sequence = exports.addDays = exports.addMinutes = exports.sha256 = exports.doubleDigit = exports.formatDateToPath = exports.wait = exports.parseAsUTCDate = void 0; | ||
const crypto_1 = __importStar(require("crypto")); | ||
@@ -268,2 +268,7 @@ const fs_extra_1 = require("fs-extra"); | ||
exports.download = download; | ||
const tmpFileCleanups = new Map(); | ||
function cleanTempFiles() { | ||
tmpFileCleanups.forEach((cleanup) => cleanup()); | ||
} | ||
exports.cleanTempFiles = cleanTempFiles; | ||
async function _downloadFile(requestOptions, url, downloadPath) { | ||
@@ -275,2 +280,10 @@ // first ensure that directory where we want to download file exists | ||
const fileWriteStream = fs_extra_1.createWriteStream(tmpFilePath); | ||
const cleanup = () => { | ||
try { | ||
fileWriteStream.destroy(); | ||
fs_extra_1.removeSync(tmpFilePath); | ||
} | ||
catch { } | ||
}; | ||
tmpFileCleanups.set(tmpFilePath, cleanup); | ||
try { | ||
@@ -319,7 +332,4 @@ // based on https://github.com/nodejs/node/issues/28172 - only reliable way to consume response stream and avoiding all the 'gotchas' | ||
finally { | ||
fileWriteStream.destroy(); | ||
try { | ||
fs_extra_1.removeSync(tmpFilePath); | ||
} | ||
catch { } | ||
tmpFileCleanups.delete(tmpFilePath); | ||
cleanup(); | ||
} | ||
@@ -326,0 +336,0 @@ } |
@@ -77,8 +77,12 @@ "use strict"; | ||
.pipe(zlib_1.default.createGunzip({ chunkSize: 128 * 1024 })) | ||
.on('error', (err) => { | ||
.on('error', function onGunzipError(err) { | ||
debug_1.debug('gunzip error %o', err); | ||
replayError = err; | ||
linesStream.destroy(err); | ||
}) | ||
// and split by new line | ||
.pipe(new binarysplit_1.BinarySplitStream()); | ||
.pipe(new binarysplit_1.BinarySplitStream()) | ||
.on('error', function onBinarySplitStreamError(err) { | ||
debug_1.debug('binary split stream error %o', err); | ||
linesStream.destroy(err); | ||
}); | ||
let linesCount = 0; | ||
@@ -143,6 +147,20 @@ // date is always formatted to have lendth of 28 so we can skip looking for first space in line and use it | ||
} | ||
await worker.terminate(); | ||
await terminateWorker(worker, 500); | ||
} | ||
} | ||
exports.replay = replay; | ||
// gracefully terminate worker | ||
async function terminateWorker(worker, waitTimeout) { | ||
let cancelWait = () => { }; | ||
const maxWaitGuard = new Promise((resolve) => { | ||
const timeoutId = setTimeout(resolve, waitTimeout); | ||
cancelWait = () => clearTimeout(timeoutId); | ||
}); | ||
const readyToTerminate = new Promise((resolve) => { | ||
worker.once('message', (signal) => signal === "READY_TO_TERMINATE" /* READY_TO_TERMINATE */ && resolve()); | ||
}).then(cancelWait); | ||
worker.postMessage("BEFORE_TERMINATE" /* BEFORE_TERMINATE */); | ||
await Promise.race([readyToTerminate, maxWaitGuard]); | ||
await worker.terminate(); | ||
} | ||
function replayNormalized({ exchange, symbols, from, to, withDisconnectMessages = undefined, apiKey = undefined, autoCleanup = undefined, waitWhenDataNotYetAvailable = undefined }, ...normalizers) { | ||
@@ -149,0 +167,0 @@ // mappers assume that symbols are uppercased by default |
@@ -17,2 +17,6 @@ import { Exchange, Filter } from './types'; | ||
}; | ||
export declare const enum WorkerSignal { | ||
BEFORE_TERMINATE = "BEFORE_TERMINATE", | ||
READY_TO_TERMINATE = "READY_TO_TERMINATE" | ||
} | ||
//# sourceMappingURL=worker.d.ts.map |
@@ -16,2 +16,8 @@ "use strict"; | ||
else { | ||
worker_threads_1.parentPort.on('message', (signal) => { | ||
if (signal === "BEFORE_TERMINATE" /* BEFORE_TERMINATE */) { | ||
handy_1.cleanTempFiles(); | ||
worker_threads_1.parentPort.postMessage("READY_TO_TERMINATE" /* READY_TO_TERMINATE */); | ||
} | ||
}); | ||
getDataFeedSlices(worker_threads_1.workerData); | ||
@@ -18,0 +24,0 @@ } |
{ | ||
"name": "tardis-dev", | ||
"version": "12.4.0", | ||
"version": "12.4.1", | ||
"engines": { | ||
@@ -5,0 +5,0 @@ "node": ">=12" |
@@ -283,2 +283,8 @@ import crypto, { createHash } from 'crypto' | ||
const tmpFileCleanups = new Map<string, () => void>() | ||
export function cleanTempFiles() { | ||
tmpFileCleanups.forEach((cleanup) => cleanup()) | ||
} | ||
async function _downloadFile(requestOptions: RequestOptions, url: string, downloadPath: string) { | ||
@@ -292,2 +298,10 @@ // first ensure that directory where we want to download file exists | ||
const fileWriteStream = createWriteStream(tmpFilePath) | ||
const cleanup = () => { | ||
try { | ||
fileWriteStream.destroy() | ||
removeSync(tmpFilePath) | ||
} catch {} | ||
} | ||
tmpFileCleanups.set(tmpFilePath, cleanup) | ||
try { | ||
@@ -334,6 +348,4 @@ // based on https://github.com/nodejs/node/issues/28172 - only reliable way to consume response stream and avoiding all the 'gotchas' | ||
} finally { | ||
fileWriteStream.destroy() | ||
try { | ||
removeSync(tmpFilePath) | ||
} catch {} | ||
tmpFileCleanups.delete(tmpFilePath) | ||
cleanup() | ||
} | ||
@@ -340,0 +352,0 @@ } |
@@ -12,3 +12,3 @@ import { createReadStream } from 'fs-extra' | ||
import { Disconnect, Exchange, FilterForExchange } from './types' | ||
import { WorkerJobPayload, WorkerMessage } from './worker' | ||
import { WorkerJobPayload, WorkerMessage, WorkerSignal } from './worker' | ||
import { clearCacheSync } from './clearcache' | ||
@@ -109,8 +109,12 @@ | ||
.pipe(zlib.createGunzip({ chunkSize: 128 * 1024 })) | ||
.on('error',(err) => { | ||
.on('error', function onGunzipError(err) { | ||
debug('gunzip error %o', err) | ||
replayError = err | ||
linesStream.destroy(err) | ||
}) | ||
// and split by new line | ||
.pipe(new BinarySplitStream()) | ||
.on('error', function onBinarySplitStreamError(err) { | ||
debug('binary split stream error %o', err) | ||
linesStream.destroy(err) | ||
}) | ||
@@ -199,6 +203,23 @@ let linesCount = 0 | ||
await worker.terminate() | ||
await terminateWorker(worker, 500) | ||
} | ||
} | ||
// gracefully terminate worker | ||
async function terminateWorker(worker: Worker, waitTimeout: number) { | ||
let cancelWait = () => {} | ||
const maxWaitGuard = new Promise((resolve) => { | ||
const timeoutId = setTimeout(resolve, waitTimeout) | ||
cancelWait = () => clearTimeout(timeoutId) | ||
}) | ||
const readyToTerminate = new Promise((resolve) => { | ||
worker.once('message', (signal) => signal === WorkerSignal.READY_TO_TERMINATE && resolve()) | ||
}).then(cancelWait) | ||
worker.postMessage(WorkerSignal.BEFORE_TERMINATE) | ||
await Promise.race([readyToTerminate, maxWaitGuard]) | ||
await worker.terminate() | ||
} | ||
export function replayNormalized<T extends Exchange, U extends MapperFactory<T, any>[], Z extends boolean = false>( | ||
@@ -205,0 +226,0 @@ { |
import dbg from 'debug' | ||
import { existsSync } from 'fs-extra' | ||
import { existsSync, removeSync } from 'fs-extra' | ||
import pMap from 'p-map' | ||
import { isMainThread, parentPort, workerData } from 'worker_threads' | ||
import { addMinutes, download, formatDateToPath, optimizeFilters, sequence, sha256, wait } from './handy' | ||
import { addMinutes, download, formatDateToPath, optimizeFilters, sequence, sha256, wait, cleanTempFiles } from './handy' | ||
import { Exchange, Filter } from './types' | ||
@@ -13,2 +13,8 @@ | ||
} else { | ||
parentPort!.on('message', (signal: WorkerSignal) => { | ||
if (signal === WorkerSignal.BEFORE_TERMINATE) { | ||
cleanTempFiles() | ||
parentPort!.postMessage(WorkerSignal.READY_TO_TERMINATE) | ||
} | ||
}) | ||
getDataFeedSlices(workerData as WorkerJobPayload) | ||
@@ -131,1 +137,6 @@ } | ||
} | ||
export const enum WorkerSignal { | ||
BEFORE_TERMINATE = 'BEFORE_TERMINATE', | ||
READY_TO_TERMINATE = 'READY_TO_TERMINATE' | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1018173
18251
185
4