@metrichor/epi2me-web
Comparing version 5.0.5200121 to 5.0.5353744
@@ -8,2 +8,3 @@ { | ||
"uploadTimeout": 1200, | ||
"uploadRetries": 10, | ||
"downloadTimeout": 1200, | ||
@@ -10,0 +11,0 @@ "fileCheckInterval": 5, |
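The new `uploadRetries` default (10) is picked up later in the diff by `parseOptObject`, which falls back to it via `asNumber(opt.uploadRetries, default_options_json_1.default.uploadRetries)` when the caller supplies nothing. A minimal sketch of that fallback, using a simplified stand-in for ts-runtime-typecheck's `asNumber`:

    // Sketch only: simplified fallback logic; the real code uses asNumber() from ts-runtime-typecheck.
    const DEFAULTS = { uploadTimeout: 1200, uploadRetries: 10, downloadTimeout: 1200 };

    function numberOr(value, fallback) {
      return typeof value === 'number' ? value : fallback;
    }

    function parseUploadOptions(opt = {}) { // hypothetical helper, not part of the package API
      return {
        uploadTimeout: numberOr(opt.uploadTimeout, DEFAULTS.uploadTimeout),
        uploadRetries: numberOr(opt.uploadRetries, DEFAULTS.uploadRetries),
      };
    }

    console.log(parseUploadOptions({}));                   // { uploadTimeout: 1200, uploadRetries: 10 }
    console.log(parseUploadOptions({ uploadRetries: 3 }));  // { uploadTimeout: 1200, uploadRetries: 3 }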
@@ -22,3 +22,2 @@ "use strict"; | ||
const factory_1 = require("./factory"); | ||
const filestats_1 = __importDefault(require("./filestats")); | ||
const rest_fs_1 = require("./rest-fs"); | ||
@@ -36,2 +35,3 @@ const sample_reader_1 = require("./sample-reader"); | ||
const queue_1 = require("./queue"); | ||
const filestats_1 = require("./filestats"); | ||
const fileUploader_1 = require("./fileUploader"); | ||
@@ -57,5 +57,4 @@ const networkStreamErrors = new WeakSet(); | ||
this.SampleReader = new sample_reader_1.SampleReader(); | ||
this.uploadsInProgress = []; | ||
} | ||
async sessionedS3() { | ||
async sessionedS3(options = {}) { | ||
if (!this.sessionManager) { | ||
@@ -65,7 +64,5 @@ this.sessionManager = this.initSessionManager(); | ||
await this.sessionManager.session(); | ||
return new aws_sdk_1.default.S3({ | ||
useAccelerateEndpoint: this.config.options.awsAcceleration === 'on', | ||
}); | ||
return new aws_sdk_1.default.S3(Object.assign({ useAccelerateEndpoint: this.config.options.awsAcceleration === 'on' }, options)); | ||
} | ||
async sessionedSQS() { | ||
async sessionedSQS(options = {}) { | ||
if (!this.sessionManager) { | ||
@@ -75,3 +72,3 @@ this.sessionManager = this.initSessionManager(); | ||
await this.sessionManager.session(); | ||
return new aws_sdk_1.default.SQS(); | ||
return new aws_sdk_1.default.SQS(options); | ||
} | ||
@@ -420,6 +417,2 @@ async deleteMessage(message) { | ||
async stopUpload() { | ||
for (const inProgressUpload of this.uploadsInProgress) { | ||
inProgressUpload.abort(); | ||
} | ||
this.uploadsInProgress = []; | ||
super.stopUpload(); | ||
@@ -504,45 +497,2 @@ this.log.debug('clearing split files'); | ||
} | ||
async uploadJob(file) { | ||
// Initiate file upload to S3 | ||
var _a; | ||
let file2 = null; | ||
let errorMsg; | ||
try { | ||
this.log.info(`upload: ${file.id} starting`); | ||
file2 = await this.uploadHandler(file); | ||
this.log.info(`upload: ${file2.id} uploaded and notified`); | ||
} | ||
catch (err) { | ||
errorMsg = err; | ||
this.log.error(`upload: ${file.id} done, but failed: ${String(errorMsg)}`); | ||
} | ||
if (errorMsg) { | ||
this.log.error(`uploadJob ${errorMsg}`); | ||
if (!this.states.upload.failure) { | ||
this.states.upload.failure = {}; | ||
} | ||
this.states.upload.failure[errorMsg] = this.states.upload.failure[errorMsg] | ||
? ts_runtime_typecheck_1.asNumber(this.states.upload.failure[errorMsg]) + 1 | ||
: 1; | ||
if (String(errorMsg).match(/AWS.SimpleQueueService.NonExistentQueue/)) { | ||
// FATALITY! thrown during sqs.sendMessage | ||
this.log.error(`instance stopped because of a fatal error`); | ||
return this.stopEverything(); | ||
} | ||
} | ||
else { | ||
const { bytes = 0, reads = 0, sequences = 0 } = (_a = file2 === null || file2 === void 0 ? void 0 : file2.stats) !== null && _a !== void 0 ? _a : {}; | ||
// this.uploadState('queueLength', 'decr', file2.stats); // this.states.upload.queueLength = this.states.upload.queueLength ? this.states.upload.queueLength - readCount : 0; | ||
this.uploadState('success', 'incr', { files: 1, bytes, reads, sequences }); | ||
// this.states.upload.success = this.states.upload.success ? this.states.upload.success + readCount : readCount; | ||
if (file2 === null || file2 === void 0 ? void 0 : file2.name) { | ||
// nb. we only count types for successful uploads | ||
const ext = path_1.default.extname(file2.name); | ||
this.uploadState('types', 'incr', { | ||
[ext]: 1, | ||
}); | ||
} | ||
} | ||
// file-by-file? | ||
} | ||
async receiveMessages(receiveMessages) { | ||
@@ -843,3 +793,3 @@ if (!receiveMessages || !receiveMessages.Messages || !receiveMessages.Messages.length) { | ||
const ext = path_1.default.extname(outputFile); | ||
const { bytes, reads, sequences } = await filestats_1.default(outputFile); | ||
const { bytes, reads, sequences } = await filestats_1.filestats(outputFile); | ||
this.downloadState('success', 'incr', { | ||
@@ -890,3 +840,2 @@ files: 1, | ||
}); | ||
/* download stream timeout in ms */ | ||
this.timers.transferTimeouts[outputFile] = transferTimeout; | ||
@@ -940,207 +889,2 @@ const updateVisibilityFunc = async () => { | ||
} | ||
async uploadHandler(file) { | ||
/** open readStream and pipe to S3.upload */ | ||
const s3 = await this.sessionedS3(); | ||
let rs; | ||
let closed = false; | ||
const mangledRelative = file.relative | ||
.replace(/^[\\/]+/, '') | ||
.replace(/\\/g, '/') | ||
.replace(/\//g, '_'); // MC-7204, MC-7206 - this needs to be unpicked in future | ||
const objectId = [ | ||
this.config.instance.bucketFolder, | ||
'component-0', | ||
mangledRelative, | ||
mangledRelative, | ||
] | ||
.join('/') | ||
.replace(/\/+/g, '/'); | ||
let timeoutHandle; | ||
const p = new Promise((resolve, reject) => { | ||
const timeoutFunc = () => { | ||
if (rs && !closed) { | ||
rs.close(); | ||
} | ||
reject(new Error(`${file.name} timed out`)); | ||
}; | ||
// timeout to ensure this completeCb *always* gets called | ||
timeoutHandle = timers_1.createTimeout((this.config.options.uploadTimeout + 5) * 1000, timeoutFunc); | ||
try { | ||
rs = fs_extra_1.default.createReadStream(file.path); | ||
rs.on('close', () => { | ||
closed = true; | ||
}); | ||
} | ||
catch (createReadStreamException) { | ||
timeoutHandle.cancel(); | ||
reject(createReadStreamException); | ||
return; | ||
} | ||
rs.on('error', (readStreamError) => { | ||
rs.close(); | ||
let errstr = 'error in upload readstream'; | ||
if (readStreamError === null || readStreamError === void 0 ? void 0 : readStreamError.message) { | ||
errstr += `: ${readStreamError.message}`; | ||
} | ||
timeoutHandle.cancel(); | ||
reject(new Error(errstr)); | ||
}); | ||
rs.on('open', async () => { | ||
var _a; | ||
const params = { | ||
Bucket: ts_runtime_typecheck_1.asString(this.config.instance.bucket), | ||
Key: objectId, | ||
Body: rs, | ||
}; | ||
const service = new aws_sdk_1.default.S3(); | ||
const options = { | ||
partSize: 10 * 1024 * 1024, | ||
queueSize: 1, | ||
service, | ||
}; | ||
if (this.config.instance.key_id) { | ||
// MC-4996 support (optional, for now) encryption | ||
params.SSEKMSKeyId = this.config.instance.key_id; | ||
params.ServerSideEncryption = 'aws:kms'; | ||
} | ||
if (file.size) { | ||
params['Content-Length'] = file.size; | ||
} | ||
this.uploadState('progress', 'incr', { | ||
total: file.size, | ||
}); | ||
let myProgress = 0; | ||
const managedUpload = s3.upload(params, options); | ||
this.uploadsInProgress.push(managedUpload); | ||
const sessionManager = this.initSessionManager([service]); | ||
sessionManager.sts_expiration = (_a = this.sessionManager) === null || _a === void 0 ? void 0 : _a.sts_expiration; // No special options here, so use the main session and don't refetch until it's expired | ||
managedUpload.on('httpUploadProgress', async (progress) => { | ||
// Breaking out here causes this.states.progress.bytes to get out of sync. | ||
// if (this.stopped) { | ||
// reject(new Error('stopped')); | ||
// return; | ||
// } | ||
// this.log.debug(`upload progress ${progress.key} ${progress.loaded} / ${progress.total}`); | ||
this.uploadState('progress', 'incr', { | ||
bytes: progress.loaded - myProgress, | ||
}); // delta since last time | ||
myProgress = progress.loaded; // store for calculating delta next iteration | ||
timeoutHandle.reset(); // MC-6789 - reset upload timeout | ||
try { | ||
await sessionManager.session(); // MC-7129 force refresh token on the MANAGED UPLOAD instance of the s3 service | ||
} | ||
catch (e) { | ||
this.log.warn(`Error refreshing token: ${String(e)}`); | ||
} | ||
}); | ||
try { | ||
await managedUpload.promise(); | ||
this.log.info(`${file.id} S3 upload complete`); | ||
rs.close(); | ||
timeoutHandle.cancel(); | ||
await this.uploadComplete(objectId, file); // send message | ||
resolve(file); | ||
} | ||
catch (uploadStreamErr) { | ||
this.log.warn(`${file.id} uploadStreamError ${uploadStreamErr}`); | ||
reject(uploadStreamErr); | ||
} | ||
finally { | ||
this.uploadState('progress', 'decr', { | ||
total: file.size, | ||
bytes: file.size, | ||
}); // zero in-flight upload counters | ||
this.uploadsInProgress = this.uploadsInProgress.filter((upload) => upload !== managedUpload); | ||
} | ||
}); | ||
// rs.on('end', rs.close); | ||
// rs.on('close', () => this.log.debug('closing readstream')); | ||
}); | ||
return p; | ||
} | ||
async uploadComplete(objectId, file) { | ||
this.log.info(`${file.id} uploaded to S3: ${objectId}`); | ||
const message = { | ||
bucket: this.config.instance.bucket, | ||
outputQueue: this.config.instance.outputQueueName, | ||
remote_addr: this.config.instance.remote_addr, | ||
apikey: this.config.options.apikey, | ||
id_workflow_instance: this.config.instance.id_workflow_instance, | ||
id_master: this.config.instance.id_workflow, | ||
utc: new Date().toISOString(), | ||
path: objectId, | ||
prefix: objectId.substring(0, objectId.lastIndexOf('/')), | ||
}; | ||
if (this.config.instance.chain) { | ||
try { | ||
message.components = JSON.parse(JSON.stringify(this.config.instance.chain.components)); // low-frills object clone | ||
message.targetComponentId = this.config.instance.chain.targetComponentId; // first component to run | ||
} | ||
catch (jsonException) { | ||
this.log.error(`${file.id} exception parsing components JSON ${String(jsonException)}`); | ||
return Promise.reject(jsonException); // close the queue job | ||
} | ||
} | ||
// MC-5943 support (optional, for now) #SSE #crypto! | ||
if (this.config.instance.key_id) { | ||
message.key_id = this.config.instance.key_id; | ||
} | ||
// MC-1304 - attach geo location and ip | ||
if (this.config.options.agent_address) { | ||
try { | ||
message.agent_address = JSON.parse(this.config.options.agent_address); | ||
} | ||
catch (exception) { | ||
this.log.error(`${file.id} Could not parse agent_address ${String(exception)}`); | ||
} | ||
} | ||
if (message.components) { | ||
const components = ts_runtime_typecheck_1.asDictionaryOf(ts_runtime_typecheck_1.isDictionary)(message.components); | ||
// optionally populate input + output queues | ||
for (const component of Object.values(components)) { | ||
switch (component === null || component === void 0 ? void 0 : component.inputQueueName) { | ||
case 'uploadMessageQueue': | ||
component.inputQueueName = this.uploadMessageQueue; | ||
break; | ||
case 'downloadMessageQueue': | ||
component.inputQueueName = this.downloadMessageQueue; | ||
break; | ||
default: | ||
// NOTE should this be a NOOP or an error | ||
break; | ||
} | ||
} | ||
} | ||
let sentMessage; | ||
try { | ||
const inputQueueURL = await this.discoverQueue(ts_runtime_typecheck_1.asOptString(this.config.instance.inputQueueName)); | ||
const sqs = await this.sessionedSQS(); | ||
this.log.info(`${file.id} sending SQS message to input queue`); | ||
sentMessage = await sqs | ||
.sendMessage({ | ||
QueueUrl: inputQueueURL, | ||
MessageBody: JSON.stringify(message), | ||
}) | ||
.promise(); | ||
} | ||
catch (sendMessageException) { | ||
this.log.error(`${file.id} exception sending SQS message: ${String(sendMessageException)}`); | ||
throw sendMessageException; | ||
} | ||
this.realtimeFeedback(`workflow_instance:state`, { | ||
type: 'start', | ||
id_workflow_instance: this.config.instance.id_workflow_instance, | ||
id_workflow: this.config.instance.id_workflow, | ||
component_id: '0', | ||
message_id: sentMessage.MessageId, | ||
id_user: this.config.instance.id_user, | ||
}).catch((e) => { | ||
this.log.warn(`realtimeFeedback failed: ${String(e)}`); | ||
}); | ||
this.log.info(`${file.id} SQS message sent. Mark as uploaded`); | ||
if (!this.db) { | ||
throw new Error('Database has not been instantiated'); | ||
} | ||
await this.db.uploadFile(file.path); | ||
} | ||
observeTelemetry() { | ||
@@ -1147,0 +891,0 @@ var _a, _b; |
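`sessionedS3` and `sessionedSQS` now accept per-call AWS client options, merged over the single default (`useAccelerateEndpoint`) with `Object.assign`. A minimal sketch of that merge against the aws-sdk v2 constructors used above; the option values are illustrative, not defaults baked into this package:

    // Sketch only: caller options layer on top of the accelerate-endpoint default.
    const AWS = require('aws-sdk');

    function buildS3(awsAcceleration, options = {}) {
      // Defaults come first, so a caller can add retry/backoff/httpOptions
      // (as fileUploader now does) without losing useAccelerateEndpoint.
      return new AWS.S3(Object.assign({ useAccelerateEndpoint: awsAcceleration === 'on' }, options));
    }

    const s3 = buildS3('on', {
      maxRetries: 10,                              // mirrors the uploadRetries default
      httpOptions: { timeout: (1200 + 5) * 1000 }, // mirrors (uploadTimeout + 5) * 1000 in uploadFile
    });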
@@ -17,3 +17,3 @@ "use strict"; | ||
const graphql_1 = require("./graphql"); | ||
const niceSize_1 = __importDefault(require("./niceSize")); | ||
const niceSize_1 = require("./niceSize"); | ||
const rest_1 = require("./rest"); | ||
@@ -84,3 +84,3 @@ const socket_1 = __importDefault(require("./socket")); | ||
static parseOptObject(opt) { | ||
const options = Object.assign(Object.assign({}, parseCoreOpts_1.parseCoreOpts(opt)), { region: ts_runtime_typecheck_1.asString(opt.region, default_options_json_1.default.region), sessionGrace: ts_runtime_typecheck_1.asNumber(opt.sessionGrace, default_options_json_1.default.sessionGrace), uploadTimeout: ts_runtime_typecheck_1.asNumber(opt.uploadTimeout, default_options_json_1.default.uploadTimeout), downloadTimeout: ts_runtime_typecheck_1.asNumber(opt.downloadTimeout, default_options_json_1.default.downloadTimeout), fileCheckInterval: ts_runtime_typecheck_1.asNumber(opt.fileCheckInterval, default_options_json_1.default.fileCheckInterval), downloadCheckInterval: ts_runtime_typecheck_1.asNumber(opt.downloadCheckInterval, default_options_json_1.default.downloadCheckInterval), stateCheckInterval: ts_runtime_typecheck_1.asNumber(opt.stateCheckInterval, default_options_json_1.default.stateCheckInterval), inFlightDelay: ts_runtime_typecheck_1.asNumber(opt.inFlightDelay, default_options_json_1.default.inFlightDelay), waitTimeSeconds: ts_runtime_typecheck_1.asNumber(opt.waitTimeSeconds, default_options_json_1.default.waitTimeSeconds), waitTokenError: ts_runtime_typecheck_1.asNumber(opt.waitTokenError, default_options_json_1.default.waitTokenError), transferPoolSize: ts_runtime_typecheck_1.asNumber(opt.transferPoolSize, default_options_json_1.default.transferPoolSize), downloadMode: ts_runtime_typecheck_1.asString(opt.downloadMode, default_options_json_1.default.downloadMode), filetype: ts_runtime_typecheck_1.asArrayOf(ts_runtime_typecheck_1.isString)(opt.filetype, default_options_json_1.default.filetype), sampleDirectory: ts_runtime_typecheck_1.asString(opt.sampleDirectory, default_options_json_1.default.sampleDirectory), | ||
const options = Object.assign(Object.assign({}, parseCoreOpts_1.parseCoreOpts(opt)), { region: ts_runtime_typecheck_1.asString(opt.region, default_options_json_1.default.region), sessionGrace: ts_runtime_typecheck_1.asNumber(opt.sessionGrace, default_options_json_1.default.sessionGrace), uploadTimeout: ts_runtime_typecheck_1.asNumber(opt.uploadTimeout, default_options_json_1.default.uploadTimeout), uploadRetries: ts_runtime_typecheck_1.asNumber(opt.uploadRetries, default_options_json_1.default.uploadRetries), downloadTimeout: ts_runtime_typecheck_1.asNumber(opt.downloadTimeout, default_options_json_1.default.downloadTimeout), fileCheckInterval: ts_runtime_typecheck_1.asNumber(opt.fileCheckInterval, default_options_json_1.default.fileCheckInterval), downloadCheckInterval: ts_runtime_typecheck_1.asNumber(opt.downloadCheckInterval, default_options_json_1.default.downloadCheckInterval), stateCheckInterval: ts_runtime_typecheck_1.asNumber(opt.stateCheckInterval, default_options_json_1.default.stateCheckInterval), inFlightDelay: ts_runtime_typecheck_1.asNumber(opt.inFlightDelay, default_options_json_1.default.inFlightDelay), waitTimeSeconds: ts_runtime_typecheck_1.asNumber(opt.waitTimeSeconds, default_options_json_1.default.waitTimeSeconds), waitTokenError: ts_runtime_typecheck_1.asNumber(opt.waitTokenError, default_options_json_1.default.waitTokenError), transferPoolSize: ts_runtime_typecheck_1.asNumber(opt.transferPoolSize, default_options_json_1.default.transferPoolSize), downloadMode: ts_runtime_typecheck_1.asString(opt.downloadMode, default_options_json_1.default.downloadMode), filetype: ts_runtime_typecheck_1.asArrayOf(ts_runtime_typecheck_1.isString)(opt.filetype, default_options_json_1.default.filetype), sampleDirectory: ts_runtime_typecheck_1.asString(opt.sampleDirectory, default_options_json_1.default.sampleDirectory), | ||
// optional values | ||
@@ -245,3 +245,3 @@ useGraphQL: ts_runtime_typecheck_1.asOptBoolean(opt.useGraphQL), id_workflow_instance: ts_runtime_typecheck_1.asOptIndex(opt.id_workflow_instance), debounceWindow: ts_runtime_typecheck_1.asOptNumber(opt.debounceWindow), proxy: ts_runtime_typecheck_1.asOptString(opt.proxy), | ||
try { | ||
state.success.niceReads = niceSize_1.default(this.states[direction].success.reads); | ||
state.success.niceReads = niceSize_1.niceSize(this.states[direction].success.reads); | ||
} | ||
@@ -253,3 +253,3 @@ catch (ignore) { | ||
// complete plus in-transit | ||
state.progress.niceSize = niceSize_1.default((_b = state.success.bytes + state.progress.bytes) !== null && _b !== void 0 ? _b : 0); | ||
state.progress.niceSize = niceSize_1.niceSize((_b = state.success.bytes + state.progress.bytes) !== null && _b !== void 0 ? _b : 0); | ||
} | ||
@@ -261,3 +261,3 @@ catch (ignore) { | ||
// complete | ||
state.success.niceSize = niceSize_1.default(this.states[direction].success.bytes); | ||
state.success.niceSize = niceSize_1.niceSize(this.states[direction].success.bytes); | ||
} | ||
@@ -296,3 +296,3 @@ catch (ignore) { | ||
try { | ||
state.success.niceReads = niceSize_1.default(this.states[direction].success.reads); | ||
state.success.niceReads = niceSize_1.niceSize(this.states[direction].success.reads); | ||
} | ||
@@ -304,3 +304,3 @@ catch (ignore) { | ||
// complete plus in-transit | ||
state.progress.niceSize = niceSize_1.default((_b = state.success.bytes + state.progress.bytes) !== null && _b !== void 0 ? _b : 0); | ||
state.progress.niceSize = niceSize_1.niceSize((_b = state.success.bytes + state.progress.bytes) !== null && _b !== void 0 ? _b : 0); | ||
} | ||
@@ -312,3 +312,3 @@ catch (ignore) { | ||
// complete | ||
state.success.niceSize = niceSize_1.default(this.states[direction].success.bytes); | ||
state.success.niceSize = niceSize_1.niceSize(this.states[direction].success.bytes); | ||
} | ||
@@ -315,0 +315,0 @@ catch (ignore) { |
@@ -6,8 +6,9 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const fs_extra_1 = __importDefault(require("fs-extra")); | ||
async function default_1(filePath) { | ||
return fs_extra_1.default.stat(filePath).then((d) => { | ||
exports.genericFileStatistics = void 0; | ||
const fs_1 = __importDefault(require("fs")); | ||
async function genericFileStatistics(filePath) { | ||
return fs_1.default.promises.stat(filePath).then((d) => { | ||
return { type: 'bytes', bytes: d.size }; | ||
}); | ||
} | ||
exports.default = default_1; | ||
exports.genericFileStatistics = genericFileStatistics; |
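The generic statistics helper becomes a named export backed by `fs.promises.stat`. A minimal usage sketch; the require path is assumed, since the diff does not show file names:

    // Sketch only: resolves with the byte size of any file.
    const { genericFileStatistics } = require('./filestats/generic'); // assumed path

    genericFileStatistics('/data/run1/reads.bam').then((stats) => {
      console.log(stats); // { type: 'bytes', bytes: <size reported by fs.promises.stat> }
    });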
@@ -6,4 +6,5 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const fs_extra_1 = __importDefault(require("fs-extra")); | ||
function default_1(filePath) { | ||
exports.fastaFileStatistics = void 0; | ||
const fs_1 = __importDefault(require("fs")); | ||
function fastaFileStatistics(filePath) { | ||
return new Promise((resolve, reject) => { | ||
@@ -17,3 +18,4 @@ const linesPerRead = 2; | ||
try { | ||
stat = fs_extra_1.default.statSync(filePath); | ||
// TODO make this async | ||
stat = fs_1.default.statSync(filePath); | ||
} | ||
@@ -23,3 +25,3 @@ catch (e) { | ||
} | ||
fs_extra_1.default.createReadStream(filePath) | ||
fs_1.default.createReadStream(filePath) | ||
.on('data', (buffer) => { | ||
@@ -41,2 +43,2 @@ idx = -1; | ||
} | ||
exports.default = default_1; | ||
exports.fastaFileStatistics = fastaFileStatistics; |
@@ -6,4 +6,5 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const fs_extra_1 = __importDefault(require("fs-extra")); | ||
function default_1(filePath) { | ||
exports.fastqFileStatistics = void 0; | ||
const fs_1 = __importDefault(require("fs")); | ||
function fastqFileStatistics(filePath) { | ||
return new Promise((resolve, reject) => { | ||
@@ -17,3 +18,4 @@ const linesPerRead = 4; | ||
try { | ||
stat = fs_extra_1.default.statSync(filePath); | ||
// TODO make this async | ||
stat = fs_1.default.statSync(filePath); | ||
} | ||
@@ -24,3 +26,3 @@ catch (e) { | ||
} | ||
fs_extra_1.default.createReadStream(filePath) | ||
fs_1.default.createReadStream(filePath) | ||
.on('data', (buffer) => { | ||
@@ -42,2 +44,2 @@ idx = -1; | ||
} | ||
exports.default = default_1; | ||
exports.fastqFileStatistics = fastqFileStatistics; |
@@ -6,5 +6,6 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const fs_extra_1 = __importDefault(require("fs-extra")); | ||
exports.fastqgzFileStatistics = void 0; | ||
const fs_1 = __importDefault(require("fs")); | ||
const zlib_1 = require("zlib"); | ||
function default_1(filePath) { | ||
function fastqgzFileStatistics(filePath) { | ||
return new Promise((resolve, reject) => { | ||
@@ -18,3 +19,4 @@ const linesPerRead = 4; | ||
try { | ||
stat = fs_extra_1.default.statSync(filePath); | ||
// TODO make this async | ||
stat = fs_1.default.statSync(filePath); | ||
} | ||
@@ -26,3 +28,3 @@ catch (e) { | ||
const gunzip = zlib_1.createGunzip(); | ||
fs_extra_1.default.createReadStream(filePath) | ||
fs_1.default.createReadStream(filePath) | ||
.pipe(gunzip) | ||
@@ -45,2 +47,2 @@ .on('data', (buffer) => { | ||
} | ||
exports.default = default_1; | ||
exports.fastqgzFileStatistics = fastqgzFileStatistics; |
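Elsewhere in the diff these per-format helpers are consumed through the named `filestats` dispatcher (`filestats_1.filestats(outputFile)`), whose result is destructured as `{ bytes, reads, sequences }`. A minimal sketch of that call; the import path and the extension-based dispatch are assumptions, as the dispatcher itself is not shown here:

    // Sketch only: the named filestats export presumably selects fasta/fastq/fastq.gz/generic by extension.
    const { filestats } = require('./filestats'); // assumed path, matching filestats_1 above

    async function logStats(outputFile) {
      const { bytes = 0, reads = 0, sequences = 0 } = (await filestats(outputFile)) || {};
      console.log(`${outputFile}: ${bytes} bytes, ${reads} reads, ${sequences} sequences`);
    }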
@@ -6,3 +6,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.addWarning = exports.skipFile = exports.processFile = exports.readSettings = exports.createFileScanner = exports.instantiateFileUpload = void 0; | ||
exports.uploadFile = exports.constructUploadParameters = exports.openReadStream = exports.addFailure = exports.uploadJob = exports.addFileWarning = exports.addWarning = exports.skipFile = exports.processFile = exports.readSettings = exports.createFileScanner = exports.instantiateFileUpload = void 0; | ||
const inputScanner_1 = require("./inputScanner"); | ||
@@ -14,7 +14,8 @@ const queue_1 = require("./queue"); | ||
const fastq_1 = require("./splitters/fastq"); | ||
const filestats_1 = __importDefault(require("./filestats")); | ||
const niceSize_1 = __importDefault(require("./niceSize")); | ||
const filestats_1 = require("./filestats"); | ||
const niceSize_1 = require("./niceSize"); | ||
const ts_runtime_typecheck_1 = require("ts-runtime-typecheck"); | ||
const operators_1 = require("rxjs/operators"); | ||
const path_1 = __importDefault(require("path")); | ||
const fs_1 = __importDefault(require("fs")); | ||
function instantiateFileUpload(instance) { | ||
@@ -24,3 +25,3 @@ ts_runtime_typecheck_1.assertDefined(instance.config.workflow, 'Workflow'); | ||
const workflow = instance.config.workflow; | ||
const settings = readSettings(workflow); | ||
const settings = readSettings(instance.config); | ||
const hasStorageAccount = 'storage_account' in workflow; | ||
@@ -32,11 +33,5 @@ if (settings.requiresStorage && !hasStorageAccount) { | ||
const { inputFolders, outputFolder, filetype: filetypes } = instance.config.options; | ||
const scanner = createFileScanner({ | ||
database, | ||
inputFolders, | ||
filetypes, | ||
outputFolder, | ||
}); | ||
const warnings = instance.states.warnings; | ||
const state = instance.states.upload; | ||
const ctx = { | ||
const { warnings, upload: state } = instance.states; | ||
const stopped$ = instance.uploadStopped$; | ||
const context = { | ||
settings, | ||
@@ -48,29 +43,35 @@ warnings, | ||
logger, | ||
stopped$, | ||
}; | ||
const { add: queueFile, empty$ } = queue_1.createQueue({}, async (file) => processFile(ctx, file)); | ||
const scanner = createFileScanner({ | ||
database, | ||
inputFolders, | ||
filetypes, | ||
outputFolder, | ||
context, | ||
}); | ||
let running = true; | ||
stopped$.subscribe(() => { | ||
running = false; | ||
}); | ||
const { add: queueFile, empty$ } = queue_1.createQueue({}, async (file) => { | ||
if (running) { | ||
await processFile(context, file); | ||
} | ||
}); | ||
const uploadInterval = instance.config.options.fileCheckInterval * 1000; | ||
const queueEmpty = () => empty$.pipe(operators_1.first()).toPromise(); | ||
const stopped$ = instance.uploadStopped$; | ||
return async () => { | ||
let running = true; | ||
stopped$.subscribe(() => { | ||
running = false; | ||
}); | ||
while (running) { | ||
const startTime = Date.now(); | ||
try { | ||
const files = await scanner(); | ||
// NOTE queueEmpty will never resolve if nothing is queued | ||
if (files.length > 0) { | ||
for (const file of files) { | ||
queueFile(file); | ||
} | ||
// NOTE errors that occur in the queue are swallowed | ||
await queueEmpty(); | ||
// NOTE scanner errors are handled inside createFileScanner | ||
const files = await scanner(); | ||
// NOTE queueEmpty will never resolve if nothing is queued | ||
if (files.length > 0) { | ||
for (const file of files) { | ||
queueFile(file); | ||
} | ||
// NOTE errors that occur in the queue are swallowed | ||
await queueEmpty(); | ||
} | ||
catch (err) { | ||
// NOTE err is only ever from the scanner | ||
// TODO add warning about FS error | ||
} | ||
const deltaTime = Date.now() - startTime; | ||
@@ -83,3 +84,3 @@ const delay = Math.max(0, uploadInterval - deltaTime); | ||
exports.instantiateFileUpload = instantiateFileUpload; | ||
function createFileScanner({ database, inputFolders, outputFolder, filetypes, }) { | ||
function createFileScanner({ database, inputFolders, outputFolder, filetypes, context, }) { | ||
const filter = async (location) => { | ||
@@ -89,2 +90,5 @@ const exists = await database.seenUpload(location); | ||
}; | ||
const errorHandler = (err) => { | ||
addWarning(context, fileUploader_type_1.UploadWarnings.SCAN_FAIL, err + ''); | ||
}; | ||
const options = { | ||
@@ -95,2 +99,3 @@ inputFolders, | ||
filter, | ||
errorHandler, | ||
}; | ||
@@ -100,4 +105,7 @@ return () => inputScanner_1.loadInputFiles(options); | ||
exports.createFileScanner = createFileScanner; | ||
function readSettings(workflow) { | ||
function readSettings({ options, instance: workflowInstance, workflow, }) { | ||
var _a, _b; | ||
ts_runtime_typecheck_1.assertDefined(workflow); | ||
ts_runtime_typecheck_1.assertDefined(workflowInstance.bucket, 'workflowInstance.bucket'); | ||
ts_runtime_typecheck_1.assertDefined(workflowInstance.bucketFolder, 'workflowInstance.bucketFolder'); | ||
const settings = { | ||
@@ -107,2 +115,6 @@ maxFiles: Infinity, | ||
requiresStorage: false, | ||
bucket: workflowInstance.bucket, | ||
bucketFolder: workflowInstance.bucketFolder, | ||
sseKeyId: workflowInstance.key_id, | ||
retries: options.uploadRetries, | ||
}; | ||
@@ -157,7 +169,7 @@ const workflowAttributes = ts_runtime_typecheck_1.asOptDictionary(workflow.workflowAttributes || workflow.workflow_attributes); | ||
if (settings.maxFiles <= state.filesCount) { | ||
await skipFile(file, ctx, fileUploader_type_1.UploadWarnings.TOO_MANY); | ||
await skipFile(file, ctx, fileUploader_type_1.FileUploadWarnings.TOO_MANY); | ||
return; | ||
} | ||
if (file.size === 0) { | ||
await skipFile(file, ctx, fileUploader_type_1.UploadWarnings.EMPTY); | ||
await skipFile(file, ctx, fileUploader_type_1.FileUploadWarnings.EMPTY); | ||
return; | ||
@@ -170,3 +182,3 @@ } | ||
if (canSplit && shouldSplit && ts_runtime_typecheck_1.isDefined(split)) { | ||
addWarning(file, ctx, fileUploader_type_1.UploadWarnings.SPLIT); | ||
addFileWarning(file, ctx, fileUploader_type_1.FileUploadWarnings.SPLIT); | ||
const isCompressed = /\.gz$/i.test(file.path); | ||
@@ -181,4 +193,2 @@ const directory = path_1.default.dirname(file.relative); | ||
const id = `${file.id}_${chunkId}`; | ||
const stats = await filestats_1.default(chunkFile); | ||
const { bytes: size } = stats; | ||
const chunkFilestat = { | ||
@@ -189,7 +199,5 @@ name, | ||
id, | ||
size, | ||
size: 0, | ||
}; | ||
// TODO this should retry if it fails... but only for some reasons | ||
// await uploadFile(chunkFilestat, stats, ctx); | ||
await ctx.instance.uploadJob(Object.assign(Object.assign({}, chunkFilestat), { stats })); | ||
await uploadJob(ctx, chunkFilestat); | ||
await database.splitDone(chunkFile); | ||
@@ -204,9 +212,6 @@ }, isCompressed); | ||
if (file.size > settings.maxFileSize) { | ||
await skipFile(file, ctx, fileUploader_type_1.UploadWarnings.TOO_BIG); | ||
await skipFile(file, ctx, fileUploader_type_1.FileUploadWarnings.TOO_BIG); | ||
return; | ||
} | ||
const stats = await filestats_1.default(file.path); | ||
// TODO this should retry if it fails... but only for some reasons | ||
// await uploadFile(file, stats, ctx); | ||
await ctx.instance.uploadJob(Object.assign(Object.assign({}, file), { stats })); | ||
await uploadJob(ctx, file); | ||
state.filesCount += 1; | ||
@@ -216,7 +221,19 @@ } | ||
async function skipFile(file, ctx, warn) { | ||
addWarning(file, ctx, warn); | ||
addFileWarning(file, ctx, warn); | ||
await ctx.database.skipFile(file.path); | ||
} | ||
exports.skipFile = skipFile; | ||
function addWarning(file, ctx, warn) { | ||
function addWarning(ctx, warn, msg) { | ||
const { logger, warnings } = ctx; | ||
let type; | ||
switch (warn) { | ||
case fileUploader_type_1.UploadWarnings.SCAN_FAIL: | ||
type = 'WARNING_SCAN_FAIL'; | ||
break; | ||
} | ||
logger.error(msg); | ||
warnings.push({ msg, type }); | ||
} | ||
exports.addWarning = addWarning; | ||
function addFileWarning(file, ctx, warn) { | ||
var _a, _b; | ||
@@ -227,22 +244,264 @@ const { settings, logger, warnings } = ctx; | ||
switch (warn) { | ||
case fileUploader_type_1.UploadWarnings.EMPTY: | ||
case fileUploader_type_1.FileUploadWarnings.EMPTY: | ||
type = 'WARNING_FILE_EMPTY'; | ||
msg = `The file ${file.relative} is empty. It will be skipped.`; | ||
break; | ||
case fileUploader_type_1.UploadWarnings.SPLIT: | ||
case fileUploader_type_1.FileUploadWarnings.SPLIT: | ||
type = 'WARNING_FILE_SPLIT'; | ||
msg = `${file.relative}${file.size > splitSize ? ' is too big and' : ''} is going to be split`; | ||
break; | ||
case fileUploader_type_1.UploadWarnings.TOO_BIG: | ||
case fileUploader_type_1.FileUploadWarnings.TOO_BIG: | ||
type = 'WARNING_FILE_TOO_BIG'; | ||
msg = `The file ${file.relative} is bigger than the maximum size limit (${niceSize_1.default(settings.maxFileSize)}B). It will be skipped.`; | ||
msg = `The file ${file.relative} is bigger than the maximum size limit (${niceSize_1.niceSize(settings.maxFileSize)}B). It will be skipped.`; | ||
break; | ||
case fileUploader_type_1.UploadWarnings.TOO_MANY: | ||
case fileUploader_type_1.FileUploadWarnings.TOO_MANY: | ||
type = 'WARNING_FILE_TOO_MANY'; | ||
msg = `Maximum ${settings.maxFiles} file(s) already uploaded. Marking ${file.relative} as skipped.`; | ||
break; | ||
case fileUploader_type_1.FileUploadWarnings.UPLOAD_FAILED: | ||
type = 'WARNING_FILE_UPLOAD_FAILED'; | ||
msg = `Uploading ${file.relative} failed.`; | ||
break; | ||
case fileUploader_type_1.FileUploadWarnings.UPLOAD_RETRIES_EXCEEDED: | ||
type = 'WARNING_FILE_UPLOAD_RETRIES_EXCEEDED'; | ||
msg = `Exceeded maximum retries uploading ${file.relative}. This file will not be uploaded.`; | ||
break; | ||
case fileUploader_type_1.FileUploadWarnings.MESSAGE_RETRIES_EXCEEDED: | ||
type = 'WARNING_FILE_UPLOAD_MESSAGE_RETRIES_EXCEEDED'; | ||
msg = `Exceeded maximum retries adding ${file.relative} to the queue. This file will not be processed.`; | ||
break; | ||
} | ||
logger.error(msg); | ||
logger.warn(msg); | ||
warnings.push({ msg, type }); | ||
} | ||
exports.addWarning = addWarning; | ||
exports.addFileWarning = addFileWarning; | ||
async function uploadJob(ctx, file) { | ||
await uploadFile(file, await filestats_1.filestats(file.path), ctx); | ||
} | ||
exports.uploadJob = uploadJob; | ||
function addFailure(state, msg) { | ||
var _a; | ||
if (!state.failure) { | ||
state.failure = {}; | ||
} | ||
state.failure[msg] = ((_a = state.failure[msg]) !== null && _a !== void 0 ? _a : 0) + 1; | ||
} | ||
exports.addFailure = addFailure; | ||
function openReadStream(location, handler) { | ||
const rs = fs_1.default.createReadStream(location); | ||
return new Promise((resolve, reject) => { | ||
rs.addListener('open', async () => { | ||
try { | ||
await handler(rs); | ||
resolve(); | ||
} | ||
catch (err) { | ||
rs.close(); // ensure the stream is closed if we have an error in the handler | ||
reject(err); | ||
} | ||
}); | ||
rs.addListener('error', (err) => reject(`Upload filesystem error ${err}`)); | ||
}); | ||
} | ||
exports.openReadStream = openReadStream; | ||
function constructUploadParameters(ctx, file, rs) { | ||
const { settings: { bucket, sseKeyId, bucketFolder }, } = ctx; | ||
const mangledRelative = file.relative | ||
.replace(/^[\\/]+/, '') | ||
.replace(/\\/g, '/') | ||
.replace(/\//g, '_'); // MC-7204, MC-7206 - this needs to be unpicked in future | ||
const key = [ | ||
bucketFolder, | ||
'component-0', | ||
mangledRelative, | ||
mangledRelative, | ||
] | ||
.join('/') | ||
.replace(/\/+/g, '/'); | ||
const params = { | ||
Bucket: bucket, | ||
Key: key, | ||
Body: rs, | ||
}; | ||
if (sseKeyId) { | ||
// MC-4996 support (optional, for now) encryption | ||
params.SSEKMSKeyId = sseKeyId; | ||
params.ServerSideEncryption = 'aws:kms'; | ||
} | ||
if (file.size) { | ||
params.ContentLength = file.size; | ||
} | ||
return params; | ||
} | ||
exports.constructUploadParameters = constructUploadParameters; | ||
async function uploadFile(file, stats, ctx) { | ||
const { state, instance, logger, stopped$ } = ctx; | ||
try { | ||
const timeout = (instance.config.options.uploadTimeout + 5) * 1000; | ||
const s3 = await instance.sessionedS3({ | ||
retryDelayOptions: { | ||
customBackoff(count, err) { | ||
addFileWarning(file, ctx, fileUploader_type_1.FileUploadWarnings.UPLOAD_FAILED); | ||
ctx.logger.error('Upload error', err); | ||
if (count > ctx.settings.retries) { | ||
addFileWarning(file, ctx, fileUploader_type_1.FileUploadWarnings.UPLOAD_RETRIES_EXCEEDED); | ||
return -1; | ||
} | ||
return 2 ** count * 1000; // 2s, 4s, 8s, 16s, 32s | ||
}, | ||
}, | ||
maxRetries: ctx.settings.retries, | ||
httpOptions: { | ||
timeout, | ||
}, | ||
}); | ||
await openReadStream(file.path, async (rs) => { | ||
var _a; | ||
const params = constructUploadParameters(ctx, file, rs); | ||
const options = { | ||
partSize: 10 * 1024 * 1024, | ||
queueSize: 1, | ||
}; | ||
instance.uploadState('progress', 'incr', { | ||
total: file.size, | ||
}); | ||
const managedUpload = s3.upload(params, options); | ||
const subscription = stopped$.subscribe(() => { | ||
managedUpload.abort(); | ||
}); | ||
const sessionManager = instance.initSessionManager([s3]); | ||
sessionManager.sts_expiration = (_a = instance.sessionManager) === null || _a === void 0 ? void 0 : _a.sts_expiration; // No special options here, so use the main session and don't refetch until it's expired | ||
let currentProgress = 0; | ||
managedUpload.on('httpUploadProgress', async (progress) => { | ||
const progressDelta = progress.loaded - currentProgress; | ||
instance.uploadState('progress', 'incr', { | ||
bytes: progressDelta, | ||
}); // delta since last time | ||
currentProgress = progress.loaded; // store for calculating delta next iteration | ||
try { | ||
await sessionManager.session(); // MC-7129 force refresh token on the MANAGED UPLOAD instance of the s3 service | ||
} | ||
catch (e) { | ||
logger.warn(`Error refreshing token: ${String(e)}`); | ||
} | ||
}); | ||
try { | ||
await managedUpload.promise(); | ||
await uploadComplete(ctx, params.Key, file); // send message | ||
} | ||
finally { | ||
instance.uploadState('progress', 'decr', { | ||
total: file.size, | ||
bytes: currentProgress, | ||
}); // zero in-flight upload counters | ||
subscription.unsubscribe(); | ||
} | ||
}); | ||
const { bytes = 0, reads = 0, sequences = 0 } = stats !== null && stats !== void 0 ? stats : {}; | ||
instance.uploadState('success', 'incr', { files: 1, bytes, reads, sequences }); | ||
const ext = path_1.default.extname(file.name); | ||
instance.uploadState('types', 'incr', { | ||
[ext]: 1, | ||
}); | ||
} | ||
catch (err) { | ||
addFailure(state, err + ''); | ||
throw err; | ||
} | ||
} | ||
exports.uploadFile = uploadFile; | ||
function createMessage(instance, objectId) { | ||
const workflowInstance = instance.config.instance; | ||
const message = { | ||
bucket: workflowInstance.bucket, | ||
outputQueue: workflowInstance.outputQueueName, | ||
remote_addr: workflowInstance.remote_addr, | ||
apikey: instance.config.options.apikey, | ||
id_workflow_instance: workflowInstance.id_workflow_instance, | ||
id_master: workflowInstance.id_workflow, | ||
utc: new Date().toISOString(), | ||
path: objectId, | ||
prefix: objectId.substring(0, objectId.lastIndexOf('/')), | ||
key_id: workflowInstance.key_id, | ||
}; | ||
if (workflowInstance.chain) { | ||
let components; | ||
if (!ts_runtime_typecheck_1.isDictionaryOf(ts_runtime_typecheck_1.isDictionary)(workflowInstance.chain.components)) { | ||
throw new Error('Unexpected chain definition'); | ||
} | ||
try { | ||
components = JSON.parse(JSON.stringify(workflowInstance.chain.components)); // low-frills object clone | ||
} | ||
catch (_a) { | ||
throw new Error(`Failed to clone workflow chain`); | ||
} | ||
for (const component of Object.values(components)) { | ||
switch (component === null || component === void 0 ? void 0 : component.inputQueueName) { | ||
case 'uploadMessageQueue': | ||
component.inputQueueName = instance.uploadMessageQueue; | ||
break; | ||
case 'downloadMessageQueue': | ||
component.inputQueueName = instance.downloadMessageQueue; | ||
break; | ||
default: | ||
// NOTE should this be a NOOP or an error | ||
break; | ||
} | ||
} | ||
message.components = components; | ||
message.targetComponentId = workflowInstance.chain.targetComponentId; | ||
} | ||
return message; | ||
} | ||
async function messageInputQueue(ctx, objectId, file) { | ||
const { instance, logger } = ctx; | ||
const message = createMessage(instance, objectId); | ||
try { | ||
const inputQueueURL = await instance.discoverQueue(ts_runtime_typecheck_1.asOptString(instance.config.instance.inputQueueName)); | ||
const sqs = await instance.sessionedSQS({ | ||
retryDelayOptions: { | ||
customBackoff(count, err) { | ||
ctx.logger.error('Upload message error', err); | ||
if (count > ctx.settings.retries) { | ||
addFileWarning(file, ctx, fileUploader_type_1.FileUploadWarnings.MESSAGE_RETRIES_EXCEEDED); | ||
return -1; | ||
} | ||
return 2 ** count * 1000; // 2s, 4s, 8s, 16s, 32s | ||
}, | ||
}, | ||
maxRetries: ctx.settings.retries, | ||
}); | ||
logger.info(`${file.id} sending SQS message to input queue`); | ||
const { MessageId } = await sqs | ||
.sendMessage({ | ||
QueueUrl: inputQueueURL, | ||
MessageBody: JSON.stringify(message), | ||
}) | ||
.promise(); | ||
return MessageId; | ||
} | ||
catch (sendMessageException) { | ||
logger.error(`${file.id} exception sending SQS message: ${String(sendMessageException)}`); | ||
throw sendMessageException; | ||
} | ||
} | ||
async function uploadComplete(ctx, objectId, file) { | ||
const { instance, database, logger } = ctx; | ||
logger.info(`${file.id} uploaded to S3: ${objectId}`); | ||
const messageId = await messageInputQueue(ctx, objectId, file); | ||
const workflowInstance = instance.config.instance; | ||
instance | ||
.realtimeFeedback(`workflow_instance:state`, { | ||
type: 'start', | ||
id_workflow_instance: workflowInstance.id_workflow_instance, | ||
id_workflow: workflowInstance.id_workflow, | ||
component_id: '0', | ||
message_id: messageId, | ||
id_user: workflowInstance.id_user, | ||
}) | ||
.catch((e) => { | ||
logger.warn(`realtimeFeedback failed: ${String(e)}`); | ||
}); | ||
logger.info(`${file.id} SQS message sent. Mark as uploaded`); | ||
await database.uploadFile(file.path); | ||
} |
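Both the S3 upload and the SQS `sendMessage` above share the same retry policy: `customBackoff` returns `2 ** count * 1000` ms, or `-1` once `count` exceeds the configured `retries` (aws-sdk v2 treats a negative delay as "stop retrying"). A standalone sketch of that schedule:

    // Sketch only: reproduces the delay schedule used by the customBackoff callbacks above.
    function backoffDelay(count, retries) {
      if (count > retries) {
        return -1; // negative value => aws-sdk v2 makes no further attempts
      }
      return 2 ** count * 1000; // 2s, 4s, 8s, 16s, 32s, ...
    }

    for (let count = 1; count <= 5; count += 1) {
      console.log(count, backoffDelay(count, 10)); // 2000, 4000, 8000, 16000, 32000
    }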
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.UploadWarnings = void 0; | ||
exports.UploadWarnings = exports.FileUploadWarnings = void 0; | ||
var FileUploadWarnings; | ||
(function (FileUploadWarnings) { | ||
FileUploadWarnings[FileUploadWarnings["TOO_MANY"] = 0] = "TOO_MANY"; | ||
FileUploadWarnings[FileUploadWarnings["EMPTY"] = 1] = "EMPTY"; | ||
FileUploadWarnings[FileUploadWarnings["TOO_BIG"] = 2] = "TOO_BIG"; | ||
FileUploadWarnings[FileUploadWarnings["SPLIT"] = 3] = "SPLIT"; | ||
FileUploadWarnings[FileUploadWarnings["UPLOAD_FAILED"] = 4] = "UPLOAD_FAILED"; | ||
FileUploadWarnings[FileUploadWarnings["UPLOAD_RETRIES_EXCEEDED"] = 5] = "UPLOAD_RETRIES_EXCEEDED"; | ||
FileUploadWarnings[FileUploadWarnings["MESSAGE_RETRIES_EXCEEDED"] = 6] = "MESSAGE_RETRIES_EXCEEDED"; | ||
})(FileUploadWarnings = exports.FileUploadWarnings || (exports.FileUploadWarnings = {})); | ||
var UploadWarnings; | ||
(function (UploadWarnings) { | ||
UploadWarnings[UploadWarnings["TOO_MANY"] = 0] = "TOO_MANY"; | ||
UploadWarnings[UploadWarnings["EMPTY"] = 1] = "EMPTY"; | ||
UploadWarnings[UploadWarnings["TOO_BIG"] = 2] = "TOO_BIG"; | ||
UploadWarnings[UploadWarnings["SPLIT"] = 3] = "SPLIT"; | ||
UploadWarnings[UploadWarnings["SCAN_FAIL"] = 0] = "SCAN_FAIL"; | ||
})(UploadWarnings = exports.UploadWarnings || (exports.UploadWarnings = {})); |
@@ -35,3 +35,3 @@ "use strict"; | ||
*/ | ||
async function loadInputFiles({ inputFolders, outputFolder, filetypes, filter, }) { | ||
async function loadInputFiles({ inputFolders, outputFolder, filetypes, filter, errorHandler, }) { | ||
const extensionFilter = createExtensionFilter(filetypes !== null && filetypes !== void 0 ? filetypes : ''); | ||
@@ -59,2 +59,3 @@ const outputBasename = outputFolder ? path_1.basename(outputFolder) : null; | ||
}, | ||
catch: errorHandler, | ||
}); | ||
@@ -61,0 +62,0 @@ const results = []; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const niceSize = (sizeIn, unitIndexIn) => { | ||
exports.niceSize = void 0; | ||
function niceSize(sizeIn, unitIndexIn) { | ||
const UNITS = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']; | ||
@@ -17,3 +18,3 @@ const DIV = 1000; | ||
return `${size.toFixed(1)}${UNITS[unitIndex]}`; | ||
}; | ||
exports.default = niceSize; | ||
} | ||
exports.niceSize = niceSize; |
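`niceSize` changes from a default-exported arrow function to a named export. A usage sketch, assuming the divide-by-`DIV` (1000) loop elided from this hunk; exact outputs depend on that loop:

    // Sketch only: formats a count with one decimal place and a unit letter from UNITS.
    const { niceSize } = require('./niceSize'); // assumed path

    console.log(niceSize(1234567)); // roughly "1.2M"
    console.log(niceSize(5e12));    // roughly "5.0T"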
@@ -32,3 +32,3 @@ "use strict"; | ||
const result = await this.graphQL.instanceToken(instanceTokenOptions); | ||
token = ts_runtime_typecheck_1.asDictionary((_a = result.data) === null || _a === void 0 ? void 0 : _a.token); | ||
token = ts_runtime_typecheck_1.asDefined((_a = result.data) === null || _a === void 0 ? void 0 : _a.token); | ||
} | ||
@@ -43,4 +43,17 @@ else { | ||
this.sts_expiration = new Date(token.expiration).getTime() - 60 * ts_runtime_typecheck_1.makeNumber((_b = this.options.sessionGrace) !== null && _b !== void 0 ? _b : '0'); // refresh token x mins before it expires | ||
const configUpdate = token; | ||
let credentials = null; | ||
if (ts_runtime_typecheck_1.isDefined(token.accessKeyId) && ts_runtime_typecheck_1.isDefined(token.secretAccessKey)) { | ||
credentials = { | ||
accessKeyId: token.accessKeyId, | ||
secretAccessKey: token.secretAccessKey, | ||
sessionToken: ts_runtime_typecheck_1.asOptString(token.sessionToken), | ||
}; | ||
} | ||
const configUpdate = { | ||
credentials, | ||
region: ts_runtime_typecheck_1.asOptString(token.region), | ||
}; | ||
if (this.options.proxy) { | ||
// NOTE AWS SDK explicitly does a deep merge on httpOptions | ||
// so this won't squash any options that have already been set | ||
configUpdate.httpOptions = { | ||
@@ -47,0 +60,0 @@ agent: proxy_agent_1.default(this.options.proxy), |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.sleep = exports.createTimeout$ = exports.createTimeout = exports.createInterval = void 0; | ||
const rxjs_1 = require("rxjs"); | ||
exports.sleep = exports.createTimeout = exports.createInterval = void 0; | ||
/* | ||
NOTE this exists mostly because TS is having a really hard time to decide | ||
NOTE this partially exists because TS is having a really hard time to decide | ||
which version the timer functions we are using (the return type of Node.js | ||
@@ -37,18 +36,2 @@ timers is an object not a number) and in some cases appears to be using | ||
exports.createTimeout = createTimeout; | ||
function createTimeout$(duration) { | ||
const timeout$ = new rxjs_1.Subject(); | ||
const cb = () => timeout$.complete(); | ||
let id = setTimeout(cb, duration); | ||
return { | ||
cancel() { | ||
clearTimeout(id); | ||
}, | ||
timeout$, | ||
reset(newDuration = duration) { | ||
clearTimeout(id); | ||
id = setTimeout(cb, newDuration); | ||
}, | ||
}; | ||
} | ||
exports.createTimeout$ = createTimeout$; | ||
function sleep(duration) { | ||
@@ -55,0 +38,0 @@ return new Promise((resolve) => { |
@@ -8,2 +8,3 @@ { | ||
"uploadTimeout": 1200, | ||
"uploadRetries": 10, | ||
"downloadTimeout": 1200, | ||
@@ -10,0 +11,0 @@ "fileCheckInterval": 5, |
@@ -8,3 +8,3 @@ /* eslint no-console: ["error", { allow: ["log", "info", "debug", "warn", "error"] }] */ | ||
*/ | ||
import { asOptString, asString, asNumber, asDictionary, asOptIndex, asOptDictionary, asIndex, asDefined, asOptFunction, asOptNumber, asDictionaryOf, makeString, isString, isUndefined, asOptDictionaryOf, isDictionaryOf, isDictionary, isArrayOf, } from 'ts-runtime-typecheck'; | ||
import { asOptString, asString, asNumber, asDictionary, asOptIndex, asOptDictionary, asIndex, asDefined, asOptFunction, asOptNumber, makeString, isString, isUndefined, asOptDictionaryOf, isDictionaryOf, isArrayOf, } from 'ts-runtime-typecheck'; | ||
import AWS from 'aws-sdk'; | ||
@@ -17,3 +17,2 @@ import fs from 'fs-extra'; /* MC-565 handle EMFILE & EXDIR gracefully; use Promises */ | ||
import { Factory } from './factory'; | ||
import filestats from './filestats'; | ||
import { REST_FS } from './rest-fs'; | ||
@@ -31,2 +30,3 @@ import { SampleReader } from './sample-reader'; | ||
import { createQueue } from './queue'; | ||
import { filestats } from './filestats'; | ||
import { instantiateFileUpload } from './fileUploader'; | ||
@@ -52,5 +52,4 @@ const networkStreamErrors = new WeakSet(); | ||
this.SampleReader = new SampleReader(); | ||
this.uploadsInProgress = []; | ||
} | ||
async sessionedS3() { | ||
async sessionedS3(options = {}) { | ||
if (!this.sessionManager) { | ||
@@ -60,7 +59,5 @@ this.sessionManager = this.initSessionManager(); | ||
await this.sessionManager.session(); | ||
return new AWS.S3({ | ||
useAccelerateEndpoint: this.config.options.awsAcceleration === 'on', | ||
}); | ||
return new AWS.S3(Object.assign({ useAccelerateEndpoint: this.config.options.awsAcceleration === 'on' }, options)); | ||
} | ||
async sessionedSQS() { | ||
async sessionedSQS(options = {}) { | ||
if (!this.sessionManager) { | ||
@@ -70,3 +67,3 @@ this.sessionManager = this.initSessionManager(); | ||
await this.sessionManager.session(); | ||
return new AWS.SQS(); | ||
return new AWS.SQS(options); | ||
} | ||
@@ -415,6 +412,2 @@ async deleteMessage(message) { | ||
async stopUpload() { | ||
for (const inProgressUpload of this.uploadsInProgress) { | ||
inProgressUpload.abort(); | ||
} | ||
this.uploadsInProgress = []; | ||
super.stopUpload(); | ||
@@ -499,45 +492,2 @@ this.log.debug('clearing split files'); | ||
} | ||
async uploadJob(file) { | ||
// Initiate file upload to S3 | ||
var _a; | ||
let file2 = null; | ||
let errorMsg; | ||
try { | ||
this.log.info(`upload: ${file.id} starting`); | ||
file2 = await this.uploadHandler(file); | ||
this.log.info(`upload: ${file2.id} uploaded and notified`); | ||
} | ||
catch (err) { | ||
errorMsg = err; | ||
this.log.error(`upload: ${file.id} done, but failed: ${String(errorMsg)}`); | ||
} | ||
if (errorMsg) { | ||
this.log.error(`uploadJob ${errorMsg}`); | ||
if (!this.states.upload.failure) { | ||
this.states.upload.failure = {}; | ||
} | ||
this.states.upload.failure[errorMsg] = this.states.upload.failure[errorMsg] | ||
? asNumber(this.states.upload.failure[errorMsg]) + 1 | ||
: 1; | ||
if (String(errorMsg).match(/AWS.SimpleQueueService.NonExistentQueue/)) { | ||
// FATALITY! thrown during sqs.sendMessage | ||
this.log.error(`instance stopped because of a fatal error`); | ||
return this.stopEverything(); | ||
} | ||
} | ||
else { | ||
const { bytes = 0, reads = 0, sequences = 0 } = (_a = file2 === null || file2 === void 0 ? void 0 : file2.stats) !== null && _a !== void 0 ? _a : {}; | ||
// this.uploadState('queueLength', 'decr', file2.stats); // this.states.upload.queueLength = this.states.upload.queueLength ? this.states.upload.queueLength - readCount : 0; | ||
this.uploadState('success', 'incr', { files: 1, bytes, reads, sequences }); | ||
// this.states.upload.success = this.states.upload.success ? this.states.upload.success + readCount : readCount; | ||
if (file2 === null || file2 === void 0 ? void 0 : file2.name) { | ||
// nb. we only count types for successful uploads | ||
const ext = path.extname(file2.name); | ||
this.uploadState('types', 'incr', { | ||
[ext]: 1, | ||
}); | ||
} | ||
} | ||
// file-by-file? | ||
} | ||
async receiveMessages(receiveMessages) { | ||
@@ -884,3 +834,2 @@ if (!receiveMessages || !receiveMessages.Messages || !receiveMessages.Messages.length) { | ||
}); | ||
/* download stream timeout in ms */ | ||
this.timers.transferTimeouts[outputFile] = transferTimeout; | ||
@@ -934,207 +883,2 @@ const updateVisibilityFunc = async () => { | ||
} | ||
async uploadHandler(file) { | ||
/** open readStream and pipe to S3.upload */ | ||
const s3 = await this.sessionedS3(); | ||
let rs; | ||
let closed = false; | ||
const mangledRelative = file.relative | ||
.replace(/^[\\/]+/, '') | ||
.replace(/\\/g, '/') | ||
.replace(/\//g, '_'); // MC-7204, MC-7206 - this needs to be unpicked in future | ||
const objectId = [ | ||
this.config.instance.bucketFolder, | ||
'component-0', | ||
mangledRelative, | ||
mangledRelative, | ||
] | ||
.join('/') | ||
.replace(/\/+/g, '/'); | ||
let timeoutHandle; | ||
const p = new Promise((resolve, reject) => { | ||
const timeoutFunc = () => { | ||
if (rs && !closed) { | ||
rs.close(); | ||
} | ||
reject(new Error(`${file.name} timed out`)); | ||
}; | ||
// timeout to ensure this completeCb *always* gets called | ||
timeoutHandle = createTimeout((this.config.options.uploadTimeout + 5) * 1000, timeoutFunc); | ||
try { | ||
rs = fs.createReadStream(file.path); | ||
rs.on('close', () => { | ||
closed = true; | ||
}); | ||
} | ||
catch (createReadStreamException) { | ||
timeoutHandle.cancel(); | ||
reject(createReadStreamException); | ||
return; | ||
} | ||
rs.on('error', (readStreamError) => { | ||
rs.close(); | ||
let errstr = 'error in upload readstream'; | ||
if (readStreamError === null || readStreamError === void 0 ? void 0 : readStreamError.message) { | ||
errstr += `: ${readStreamError.message}`; | ||
} | ||
timeoutHandle.cancel(); | ||
reject(new Error(errstr)); | ||
}); | ||
rs.on('open', async () => { | ||
var _a; | ||
const params = { | ||
Bucket: asString(this.config.instance.bucket), | ||
Key: objectId, | ||
Body: rs, | ||
}; | ||
const service = new AWS.S3(); | ||
const options = { | ||
partSize: 10 * 1024 * 1024, | ||
queueSize: 1, | ||
service, | ||
}; | ||
if (this.config.instance.key_id) { | ||
// MC-4996 support (optional, for now) encryption | ||
params.SSEKMSKeyId = this.config.instance.key_id; | ||
params.ServerSideEncryption = 'aws:kms'; | ||
} | ||
if (file.size) { | ||
params['Content-Length'] = file.size; | ||
} | ||
this.uploadState('progress', 'incr', { | ||
total: file.size, | ||
}); | ||
let myProgress = 0; | ||
const managedUpload = s3.upload(params, options); | ||
this.uploadsInProgress.push(managedUpload); | ||
const sessionManager = this.initSessionManager([service]); | ||
sessionManager.sts_expiration = (_a = this.sessionManager) === null || _a === void 0 ? void 0 : _a.sts_expiration; // No special options here, so use the main session and don't refetch until it's expired | ||
managedUpload.on('httpUploadProgress', async (progress) => { | ||
// Breaking out here causes this.states.progress.bytes to get out of sync. | ||
// if (this.stopped) { | ||
// reject(new Error('stopped')); | ||
// return; | ||
// } | ||
// this.log.debug(`upload progress ${progress.key} ${progress.loaded} / ${progress.total}`); | ||
this.uploadState('progress', 'incr', { | ||
bytes: progress.loaded - myProgress, | ||
}); // delta since last time | ||
myProgress = progress.loaded; // store for calculating delta next iteration | ||
timeoutHandle.reset(); // MC-6789 - reset upload timeout | ||
try { | ||
await sessionManager.session(); // MC-7129 force refresh token on the MANAGED UPLOAD instance of the s3 service | ||
} | ||
catch (e) { | ||
this.log.warn(`Error refreshing token: ${String(e)}`); | ||
} | ||
}); | ||
try { | ||
await managedUpload.promise(); | ||
this.log.info(`${file.id} S3 upload complete`); | ||
rs.close(); | ||
timeoutHandle.cancel(); | ||
await this.uploadComplete(objectId, file); // send message | ||
resolve(file); | ||
} | ||
catch (uploadStreamErr) { | ||
this.log.warn(`${file.id} uploadStreamError ${uploadStreamErr}`); | ||
reject(uploadStreamErr); | ||
} | ||
finally { | ||
this.uploadState('progress', 'decr', { | ||
total: file.size, | ||
bytes: file.size, | ||
}); // zero in-flight upload counters | ||
this.uploadsInProgress = this.uploadsInProgress.filter((upload) => upload !== managedUpload); | ||
} | ||
}); | ||
// rs.on('end', rs.close); | ||
// rs.on('close', () => this.log.debug('closing readstream')); | ||
}); | ||
return p; | ||
} | ||
async uploadComplete(objectId, file) { | ||
this.log.info(`${file.id} uploaded to S3: ${objectId}`); | ||
const message = { | ||
bucket: this.config.instance.bucket, | ||
outputQueue: this.config.instance.outputQueueName, | ||
remote_addr: this.config.instance.remote_addr, | ||
apikey: this.config.options.apikey, | ||
id_workflow_instance: this.config.instance.id_workflow_instance, | ||
id_master: this.config.instance.id_workflow, | ||
utc: new Date().toISOString(), | ||
path: objectId, | ||
prefix: objectId.substring(0, objectId.lastIndexOf('/')), | ||
}; | ||
if (this.config.instance.chain) { | ||
try { | ||
message.components = JSON.parse(JSON.stringify(this.config.instance.chain.components)); // low-frills object clone | ||
message.targetComponentId = this.config.instance.chain.targetComponentId; // first component to run | ||
} | ||
catch (jsonException) { | ||
this.log.error(`${file.id} exception parsing components JSON ${String(jsonException)}`); | ||
return Promise.reject(jsonException); // close the queue job | ||
} | ||
} | ||
// MC-5943 support (optional, for now) #SSE #crypto! | ||
if (this.config.instance.key_id) { | ||
message.key_id = this.config.instance.key_id; | ||
} | ||
// MC-1304 - attach geo location and ip | ||
if (this.config.options.agent_address) { | ||
try { | ||
message.agent_address = JSON.parse(this.config.options.agent_address); | ||
} | ||
catch (exception) { | ||
this.log.error(`${file.id} Could not parse agent_address ${String(exception)}`); | ||
} | ||
} | ||
if (message.components) { | ||
const components = asDictionaryOf(isDictionary)(message.components); | ||
// optionally populate input + output queues | ||
for (const component of Object.values(components)) { | ||
switch (component === null || component === void 0 ? void 0 : component.inputQueueName) { | ||
case 'uploadMessageQueue': | ||
component.inputQueueName = this.uploadMessageQueue; | ||
break; | ||
case 'downloadMessageQueue': | ||
component.inputQueueName = this.downloadMessageQueue; | ||
break; | ||
default: | ||
// NOTE should this be a NOOP or an error | ||
break; | ||
} | ||
} | ||
} | ||
let sentMessage; | ||
try { | ||
const inputQueueURL = await this.discoverQueue(asOptString(this.config.instance.inputQueueName)); | ||
const sqs = await this.sessionedSQS(); | ||
this.log.info(`${file.id} sending SQS message to input queue`); | ||
sentMessage = await sqs | ||
.sendMessage({ | ||
QueueUrl: inputQueueURL, | ||
MessageBody: JSON.stringify(message), | ||
}) | ||
.promise(); | ||
} | ||
catch (sendMessageException) { | ||
this.log.error(`${file.id} exception sending SQS message: ${String(sendMessageException)}`); | ||
throw sendMessageException; | ||
} | ||
this.realtimeFeedback(`workflow_instance:state`, { | ||
type: 'start', | ||
id_workflow_instance: this.config.instance.id_workflow_instance, | ||
id_workflow: this.config.instance.id_workflow, | ||
component_id: '0', | ||
message_id: sentMessage.MessageId, | ||
id_user: this.config.instance.id_user, | ||
}).catch((e) => { | ||
this.log.warn(`realtimeFeedback failed: ${String(e)}`); | ||
}); | ||
this.log.info(`${file.id} SQS message sent. Mark as uploaded`); | ||
if (!this.db) { | ||
throw new Error('Database has not been instantiated'); | ||
} | ||
await this.db.uploadFile(file.path); | ||
} | ||
observeTelemetry() { | ||
@@ -1141,0 +885,0 @@ var _a, _b; |
@@ -11,3 +11,3 @@ /* eslint no-console: ["error", { allow: ["log", "info", "debug", "warn", "error"] }] */ | ||
import { GraphQL } from './graphql'; | ||
import niceSize from './niceSize'; | ||
import { niceSize } from './niceSize'; | ||
import { REST } from './rest'; | ||
@@ -78,3 +78,3 @@ import Socket from './socket'; | ||
static parseOptObject(opt) { | ||
const options = Object.assign(Object.assign({}, parseCoreOpts(opt)), { region: asString(opt.region, DEFAULTS.region), sessionGrace: asNumber(opt.sessionGrace, DEFAULTS.sessionGrace), uploadTimeout: asNumber(opt.uploadTimeout, DEFAULTS.uploadTimeout), downloadTimeout: asNumber(opt.downloadTimeout, DEFAULTS.downloadTimeout), fileCheckInterval: asNumber(opt.fileCheckInterval, DEFAULTS.fileCheckInterval), downloadCheckInterval: asNumber(opt.downloadCheckInterval, DEFAULTS.downloadCheckInterval), stateCheckInterval: asNumber(opt.stateCheckInterval, DEFAULTS.stateCheckInterval), inFlightDelay: asNumber(opt.inFlightDelay, DEFAULTS.inFlightDelay), waitTimeSeconds: asNumber(opt.waitTimeSeconds, DEFAULTS.waitTimeSeconds), waitTokenError: asNumber(opt.waitTokenError, DEFAULTS.waitTokenError), transferPoolSize: asNumber(opt.transferPoolSize, DEFAULTS.transferPoolSize), downloadMode: asString(opt.downloadMode, DEFAULTS.downloadMode), filetype: asArrayOf(isString)(opt.filetype, DEFAULTS.filetype), sampleDirectory: asString(opt.sampleDirectory, DEFAULTS.sampleDirectory), | ||
const options = Object.assign(Object.assign({}, parseCoreOpts(opt)), { region: asString(opt.region, DEFAULTS.region), sessionGrace: asNumber(opt.sessionGrace, DEFAULTS.sessionGrace), uploadTimeout: asNumber(opt.uploadTimeout, DEFAULTS.uploadTimeout), uploadRetries: asNumber(opt.uploadRetries, DEFAULTS.uploadRetries), downloadTimeout: asNumber(opt.downloadTimeout, DEFAULTS.downloadTimeout), fileCheckInterval: asNumber(opt.fileCheckInterval, DEFAULTS.fileCheckInterval), downloadCheckInterval: asNumber(opt.downloadCheckInterval, DEFAULTS.downloadCheckInterval), stateCheckInterval: asNumber(opt.stateCheckInterval, DEFAULTS.stateCheckInterval), inFlightDelay: asNumber(opt.inFlightDelay, DEFAULTS.inFlightDelay), waitTimeSeconds: asNumber(opt.waitTimeSeconds, DEFAULTS.waitTimeSeconds), waitTokenError: asNumber(opt.waitTokenError, DEFAULTS.waitTokenError), transferPoolSize: asNumber(opt.transferPoolSize, DEFAULTS.transferPoolSize), downloadMode: asString(opt.downloadMode, DEFAULTS.downloadMode), filetype: asArrayOf(isString)(opt.filetype, DEFAULTS.filetype), sampleDirectory: asString(opt.sampleDirectory, DEFAULTS.sampleDirectory), | ||
// optional values | ||
@@ -81,0 +81,0 @@ useGraphQL: asOptBoolean(opt.useGraphQL), id_workflow_instance: asOptIndex(opt.id_workflow_instance), debounceWindow: asOptNumber(opt.debounceWindow), proxy: asOptString(opt.proxy), |
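The only functional change in this hunk is the new uploadRetries entry, parsed with the same fallback pattern as the neighbouring options. A minimal sketch of that pattern (the DEFAULTS value mirrors the shipped default of 10; the empty opt object is illustrative):

// Sketch: asNumber(value, fallback) keeps a caller-supplied number, otherwise the default.
import { asNumber } from 'ts-runtime-typecheck';

const DEFAULTS = { uploadRetries: 10 }; // assumption: mirrors the shipped defaults
const opt = {}; // caller did not set uploadRetries

const uploadRetries = asNumber(opt.uploadRetries, DEFAULTS.uploadRetries); // -> 10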
@@ -1,6 +0,6 @@ | ||
import fs from 'fs-extra'; | ||
export default async function (filePath) { | ||
return fs.stat(filePath).then((d) => { | ||
import fs from 'fs'; | ||
export async function genericFileStatistics(filePath) { | ||
return fs.promises.stat(filePath).then((d) => { | ||
return { type: 'bytes', bytes: d.size }; | ||
}); | ||
} |
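The filestats modules drop fs-extra and their default exports in favour of named functions over Node's built-in fs. A usage sketch for the generic fallback (module path and file path are hypothetical):

// Sketch: the generic statistics only report a byte count, no read/sequence totals.
import { genericFileStatistics } from './filestats/generic'; // assumption: module path

const stats = await genericFileStatistics('/data/run1/report.txt'); // hypothetical path
// -> { type: 'bytes', bytes: <size of the file in bytes> }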
@@ -1,3 +0,3 @@ | ||
import fs from 'fs-extra'; | ||
export default function (filePath) { | ||
import fs from 'fs'; | ||
export function fastaFileStatistics(filePath) { | ||
return new Promise((resolve, reject) => { | ||
@@ -11,2 +11,3 @@ const linesPerRead = 2; | ||
try { | ||
// TODO make this async | ||
stat = fs.statSync(filePath); | ||
@@ -13,0 +14,0 @@ } |
@@ -1,3 +0,3 @@ | ||
import fs from 'fs-extra'; | ||
export default function (filePath) { | ||
import fs from 'fs'; | ||
export function fastqFileStatistics(filePath) { | ||
return new Promise((resolve, reject) => { | ||
@@ -11,2 +11,3 @@ const linesPerRead = 4; | ||
try { | ||
// TODO make this async | ||
stat = fs.statSync(filePath); | ||
@@ -13,0 +14,0 @@ } |
@@ -1,4 +0,4 @@ | ||
import fs from 'fs-extra'; | ||
import fs from 'fs'; | ||
import { createGunzip } from 'zlib'; | ||
export default function (filePath) { | ||
export function fastqgzFileStatistics(filePath) { | ||
return new Promise((resolve, reject) => { | ||
@@ -12,2 +12,3 @@ const linesPerRead = 4; | ||
try { | ||
// TODO make this async | ||
stat = fs.statSync(filePath); | ||
@@ -14,0 +15,0 @@ } |
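The three format-specific estimators keep their synchronous statSync call (now flagged with a TODO) and differ mainly in linesPerRead (2 for FASTA, 4 for FASTQ). For the gzipped variant the read count has to come from the decompressed stream; a hedged sketch of that approach, with an illustrative helper name rather than the package's own:

import fs from 'fs';
import { createGunzip } from 'zlib';

// Sketch: estimate reads in a gzipped FASTQ by counting newlines in the
// decompressed stream and dividing by the 4 lines that make up one record.
function countGzippedFastqReads(filePath) {
  return new Promise((resolve, reject) => {
    const linesPerRead = 4;
    let lines = 0;
    fs.createReadStream(filePath)
      .on('error', reject)
      .pipe(createGunzip())
      .on('error', reject)
      .on('data', (chunk) => {
        for (const byte of chunk) {
          if (byte === 0x0a) lines += 1; // count '\n'
        }
      })
      .on('end', () => resolve(Math.floor(lines / linesPerRead)));
  });
}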
@@ -5,9 +5,10 @@ import { loadInputFiles } from './inputScanner'; | ||
import { isFastq } from './file_extensions'; | ||
import { UploadWarnings } from './fileUploader.type'; | ||
import { FileUploadWarnings, UploadWarnings } from './fileUploader.type'; | ||
import { splitter } from './splitters/fastq'; | ||
import filestats from './filestats'; | ||
import niceSize from './niceSize'; | ||
import { asNumber, asOptDictionary, asOptDictionaryOf, isArray, isDefined, makeNumber, assertDefined, } from 'ts-runtime-typecheck'; | ||
import { filestats } from './filestats'; | ||
import { niceSize } from './niceSize'; | ||
import { asNumber, asOptDictionary, asOptDictionaryOf, asOptString, assertDefined, isArray, isDefined, isDictionary, isDictionaryOf, makeNumber, } from 'ts-runtime-typecheck'; | ||
import { first } from 'rxjs/operators'; | ||
import path from 'path'; | ||
import fs from 'fs'; | ||
export function instantiateFileUpload(instance) { | ||
@@ -17,3 +18,3 @@ assertDefined(instance.config.workflow, 'Workflow'); | ||
const workflow = instance.config.workflow; | ||
const settings = readSettings(workflow); | ||
const settings = readSettings(instance.config); | ||
const hasStorageAccount = 'storage_account' in workflow; | ||
@@ -25,11 +26,5 @@ if (settings.requiresStorage && !hasStorageAccount) { | ||
const { inputFolders, outputFolder, filetype: filetypes } = instance.config.options; | ||
const scanner = createFileScanner({ | ||
database, | ||
inputFolders, | ||
filetypes, | ||
outputFolder, | ||
}); | ||
const warnings = instance.states.warnings; | ||
const state = instance.states.upload; | ||
const ctx = { | ||
const { warnings, upload: state } = instance.states; | ||
const stopped$ = instance.uploadStopped$; | ||
const context = { | ||
settings, | ||
@@ -41,29 +36,35 @@ warnings, | ||
logger, | ||
stopped$, | ||
}; | ||
const { add: queueFile, empty$ } = createQueue({}, async (file) => processFile(ctx, file)); | ||
const scanner = createFileScanner({ | ||
database, | ||
inputFolders, | ||
filetypes, | ||
outputFolder, | ||
context, | ||
}); | ||
let running = true; | ||
stopped$.subscribe(() => { | ||
running = false; | ||
}); | ||
const { add: queueFile, empty$ } = createQueue({}, async (file) => { | ||
if (running) { | ||
await processFile(context, file); | ||
} | ||
}); | ||
const uploadInterval = instance.config.options.fileCheckInterval * 1000; | ||
const queueEmpty = () => empty$.pipe(first()).toPromise(); | ||
const stopped$ = instance.uploadStopped$; | ||
return async () => { | ||
let running = true; | ||
stopped$.subscribe(() => { | ||
running = false; | ||
}); | ||
while (running) { | ||
const startTime = Date.now(); | ||
try { | ||
const files = await scanner(); | ||
// NOTE queueEmpty will never resolve if nothing is queued | ||
if (files.length > 0) { | ||
for (const file of files) { | ||
queueFile(file); | ||
} | ||
// NOTE errors that occur in the queue are swallowed | ||
await queueEmpty(); | ||
// NOTE scanner errors are handled inside createFileScanner | ||
const files = await scanner(); | ||
// NOTE queueEmpty will never resolve if nothing is queued | ||
if (files.length > 0) { | ||
for (const file of files) { | ||
queueFile(file); | ||
} | ||
// NOTE errors that occur in the queue are swallowed | ||
await queueEmpty(); | ||
} | ||
catch (err) { | ||
// NOTE err is only ever from the scanner | ||
// TODO add warning about FS error | ||
} | ||
const deltaTime = Date.now() - startTime; | ||
@@ -75,3 +76,3 @@ const delay = Math.max(0, uploadInterval - deltaTime); | ||
} | ||
export function createFileScanner({ database, inputFolders, outputFolder, filetypes, }) { | ||
export function createFileScanner({ database, inputFolders, outputFolder, filetypes, context, }) { | ||
const filter = async (location) => { | ||
@@ -81,2 +82,5 @@ const exists = await database.seenUpload(location); | ||
}; | ||
const errorHandler = (err) => { | ||
addWarning(context, UploadWarnings.SCAN_FAIL, err + ''); | ||
}; | ||
const options = { | ||
@@ -87,7 +91,11 @@ inputFolders, | ||
filter, | ||
errorHandler, | ||
}; | ||
return () => loadInputFiles(options); | ||
} | ||
export function readSettings(workflow) { | ||
export function readSettings({ options, instance: workflowInstance, workflow, }) { | ||
var _a, _b; | ||
assertDefined(workflow); | ||
assertDefined(workflowInstance.bucket, 'workflowInstance.bucket'); | ||
assertDefined(workflowInstance.bucketFolder, 'workflowInstance.bucketFolder'); | ||
const settings = { | ||
@@ -97,2 +105,6 @@ maxFiles: Infinity, | ||
requiresStorage: false, | ||
bucket: workflowInstance.bucket, | ||
bucketFolder: workflowInstance.bucketFolder, | ||
sseKeyId: workflowInstance.key_id, | ||
retries: options.uploadRetries, | ||
}; | ||
@@ -146,7 +158,7 @@ const workflowAttributes = asOptDictionary(workflow.workflowAttributes || workflow.workflow_attributes); | ||
if (settings.maxFiles <= state.filesCount) { | ||
await skipFile(file, ctx, UploadWarnings.TOO_MANY); | ||
await skipFile(file, ctx, FileUploadWarnings.TOO_MANY); | ||
return; | ||
} | ||
if (file.size === 0) { | ||
await skipFile(file, ctx, UploadWarnings.EMPTY); | ||
await skipFile(file, ctx, FileUploadWarnings.EMPTY); | ||
return; | ||
@@ -159,3 +171,3 @@ } | ||
if (canSplit && shouldSplit && isDefined(split)) { | ||
addWarning(file, ctx, UploadWarnings.SPLIT); | ||
addFileWarning(file, ctx, FileUploadWarnings.SPLIT); | ||
const isCompressed = /\.gz$/i.test(file.path); | ||
@@ -170,4 +182,2 @@ const directory = path.dirname(file.relative); | ||
const id = `${file.id}_${chunkId}`; | ||
const stats = await filestats(chunkFile); | ||
const { bytes: size } = stats; | ||
const chunkFilestat = { | ||
@@ -178,7 +188,5 @@ name, | ||
id, | ||
size, | ||
size: 0, | ||
}; | ||
// TODO this should retry if it fails... but only for some reasons | ||
// await uploadFile(chunkFilestat, stats, ctx); | ||
await ctx.instance.uploadJob(Object.assign(Object.assign({}, chunkFilestat), { stats })); | ||
await uploadJob(ctx, chunkFilestat); | ||
await database.splitDone(chunkFile); | ||
@@ -193,16 +201,24 @@ }, isCompressed); | ||
if (file.size > settings.maxFileSize) { | ||
await skipFile(file, ctx, UploadWarnings.TOO_BIG); | ||
await skipFile(file, ctx, FileUploadWarnings.TOO_BIG); | ||
return; | ||
} | ||
const stats = await filestats(file.path); | ||
// TODO this should retry if it fails... but only for some reasons | ||
// await uploadFile(file, stats, ctx); | ||
await ctx.instance.uploadJob(Object.assign(Object.assign({}, file), { stats })); | ||
await uploadJob(ctx, file); | ||
state.filesCount += 1; | ||
} | ||
export async function skipFile(file, ctx, warn) { | ||
addWarning(file, ctx, warn); | ||
addFileWarning(file, ctx, warn); | ||
await ctx.database.skipFile(file.path); | ||
} | ||
export function addWarning(file, ctx, warn) { | ||
export function addWarning(ctx, warn, msg) { | ||
const { logger, warnings } = ctx; | ||
let type; | ||
switch (warn) { | ||
case UploadWarnings.SCAN_FAIL: | ||
type = 'WARNING_SCAN_FAIL'; | ||
break; | ||
} | ||
logger.error(msg); | ||
warnings.push({ msg, type }); | ||
} | ||
export function addFileWarning(file, ctx, warn) { | ||
var _a, _b; | ||
@@ -213,21 +229,258 @@ const { settings, logger, warnings } = ctx; | ||
switch (warn) { | ||
case UploadWarnings.EMPTY: | ||
case FileUploadWarnings.EMPTY: | ||
type = 'WARNING_FILE_EMPTY'; | ||
msg = `The file ${file.relative} is empty. It will be skipped.`; | ||
break; | ||
case UploadWarnings.SPLIT: | ||
case FileUploadWarnings.SPLIT: | ||
type = 'WARNING_FILE_SPLIT'; | ||
msg = `${file.relative}${file.size > splitSize ? ' is too big and' : ''} is going to be split`; | ||
break; | ||
case UploadWarnings.TOO_BIG: | ||
case FileUploadWarnings.TOO_BIG: | ||
type = 'WARNING_FILE_TOO_BIG'; | ||
msg = `The file ${file.relative} is bigger than the maximum size limit (${niceSize(settings.maxFileSize)}B). It will be skipped.`; | ||
break; | ||
case UploadWarnings.TOO_MANY: | ||
case FileUploadWarnings.TOO_MANY: | ||
type = 'WARNING_FILE_TOO_MANY'; | ||
msg = `Maximum ${settings.maxFiles} file(s) already uploaded. Marking ${file.relative} as skipped.`; | ||
break; | ||
case FileUploadWarnings.UPLOAD_FAILED: | ||
type = 'WARNING_FILE_UPLOAD_FAILED'; | ||
msg = `Uploading ${file.relative} failed.`; | ||
break; | ||
case FileUploadWarnings.UPLOAD_RETRIES_EXCEEDED: | ||
type = 'WARNING_FILE_UPLOAD_RETRIES_EXCEEDED'; | ||
msg = `Exceeded maximum retries uploading ${file.relative}. This file will not be uploaded.`; | ||
break; | ||
case FileUploadWarnings.MESSAGE_RETRIES_EXCEEDED: | ||
type = 'WARNING_FILE_UPLOAD_MESSAGE_RETRIES_EXCEEDED'; | ||
msg = `Exceeded maximum retries adding ${file.relative} to the queue. This file will not be processed.`; | ||
break; | ||
} | ||
logger.error(msg); | ||
logger.warn(msg); | ||
warnings.push({ msg, type }); | ||
} | ||
export async function uploadJob(ctx, file) { | ||
await uploadFile(file, await filestats(file.path), ctx); | ||
} | ||
export function addFailure(state, msg) { | ||
var _a; | ||
if (!state.failure) { | ||
state.failure = {}; | ||
} | ||
state.failure[msg] = ((_a = state.failure[msg]) !== null && _a !== void 0 ? _a : 0) + 1; | ||
} | ||
export function openReadStream(location, handler) { | ||
const rs = fs.createReadStream(location); | ||
return new Promise((resolve, reject) => { | ||
rs.addListener('open', async () => { | ||
try { | ||
await handler(rs); | ||
resolve(); | ||
} | ||
catch (err) { | ||
rs.close(); // ensure the stream is closed if we have an error in the handler | ||
reject(err); | ||
} | ||
}); | ||
rs.addListener('error', (err) => reject(`Upload filesystem error ${err}`)); | ||
}); | ||
} | ||
export function constructUploadParameters(ctx, file, rs) { | ||
const { settings: { bucket, sseKeyId, bucketFolder }, } = ctx; | ||
const mangledRelative = file.relative | ||
.replace(/^[\\/]+/, '') | ||
.replace(/\\/g, '/') | ||
.replace(/\//g, '_'); // MC-7204, MC-7206 - this needs to be unpicked in future | ||
const key = [ | ||
bucketFolder, | ||
'component-0', | ||
mangledRelative, | ||
mangledRelative, | ||
] | ||
.join('/') | ||
.replace(/\/+/g, '/'); | ||
const params = { | ||
Bucket: bucket, | ||
Key: key, | ||
Body: rs, | ||
}; | ||
if (sseKeyId) { | ||
// MC-4996 support (optional, for now) encryption | ||
params.SSEKMSKeyId = sseKeyId; | ||
params.ServerSideEncryption = 'aws:kms'; | ||
} | ||
if (file.size) { | ||
params.ContentLength = file.size; | ||
} | ||
return params; | ||
} | ||
export async function uploadFile(file, stats, ctx) { | ||
const { state, instance, logger, stopped$ } = ctx; | ||
try { | ||
const timeout = (instance.config.options.uploadTimeout + 5) * 1000; | ||
const s3 = await instance.sessionedS3({ | ||
retryDelayOptions: { | ||
customBackoff(count, err) { | ||
addFileWarning(file, ctx, FileUploadWarnings.UPLOAD_FAILED); | ||
ctx.logger.error('Upload error', err); | ||
if (count > ctx.settings.retries) { | ||
addFileWarning(file, ctx, FileUploadWarnings.UPLOAD_RETRIES_EXCEEDED); | ||
return -1; | ||
} | ||
return 2 ** count * 1000; // 2s, 4s, 8s, 16s, 32s | ||
}, | ||
}, | ||
maxRetries: ctx.settings.retries, | ||
httpOptions: { | ||
timeout, | ||
}, | ||
}); | ||
await openReadStream(file.path, async (rs) => { | ||
var _a; | ||
const params = constructUploadParameters(ctx, file, rs); | ||
const options = { | ||
partSize: 10 * 1024 * 1024, | ||
queueSize: 1, | ||
}; | ||
instance.uploadState('progress', 'incr', { | ||
total: file.size, | ||
}); | ||
const managedUpload = s3.upload(params, options); | ||
const subscription = stopped$.subscribe(() => { | ||
managedUpload.abort(); | ||
}); | ||
const sessionManager = instance.initSessionManager([s3]); | ||
sessionManager.sts_expiration = (_a = instance.sessionManager) === null || _a === void 0 ? void 0 : _a.sts_expiration; // No special options here, so use the main session and don't refetch until it's expired | ||
let currentProgress = 0; | ||
managedUpload.on('httpUploadProgress', async (progress) => { | ||
const progressDelta = progress.loaded - currentProgress; | ||
instance.uploadState('progress', 'incr', { | ||
bytes: progressDelta, | ||
}); // delta since last time | ||
currentProgress = progress.loaded; // store for calculating delta next iteration | ||
try { | ||
await sessionManager.session(); // MC-7129 force refresh token on the MANAGED UPLOAD instance of the s3 service | ||
} | ||
catch (e) { | ||
logger.warn(`Error refreshing token: ${String(e)}`); | ||
} | ||
}); | ||
try { | ||
await managedUpload.promise(); | ||
await uploadComplete(ctx, params.Key, file); // send message | ||
} | ||
finally { | ||
instance.uploadState('progress', 'decr', { | ||
total: file.size, | ||
bytes: currentProgress, | ||
}); // zero in-flight upload counters | ||
subscription.unsubscribe(); | ||
} | ||
}); | ||
const { bytes = 0, reads = 0, sequences = 0 } = stats !== null && stats !== void 0 ? stats : {}; | ||
instance.uploadState('success', 'incr', { files: 1, bytes, reads, sequences }); | ||
const ext = path.extname(file.name); | ||
instance.uploadState('types', 'incr', { | ||
[ext]: 1, | ||
}); | ||
} | ||
catch (err) { | ||
addFailure(state, err + ''); | ||
throw err; | ||
} | ||
} | ||
function createMessage(instance, objectId) { | ||
const workflowInstance = instance.config.instance; | ||
const message = { | ||
bucket: workflowInstance.bucket, | ||
outputQueue: workflowInstance.outputQueueName, | ||
remote_addr: workflowInstance.remote_addr, | ||
apikey: instance.config.options.apikey, | ||
id_workflow_instance: workflowInstance.id_workflow_instance, | ||
id_master: workflowInstance.id_workflow, | ||
utc: new Date().toISOString(), | ||
path: objectId, | ||
prefix: objectId.substring(0, objectId.lastIndexOf('/')), | ||
key_id: workflowInstance.key_id, | ||
}; | ||
if (workflowInstance.chain) { | ||
let components; | ||
if (!isDictionaryOf(isDictionary)(workflowInstance.chain.components)) { | ||
throw new Error('Unexpected chain definition'); | ||
} | ||
try { | ||
components = JSON.parse(JSON.stringify(workflowInstance.chain.components)); // low-frills object clone | ||
} | ||
catch (_a) { | ||
throw new Error(`Failed to clone workflow chain`); | ||
} | ||
for (const component of Object.values(components)) { | ||
switch (component === null || component === void 0 ? void 0 : component.inputQueueName) { | ||
case 'uploadMessageQueue': | ||
component.inputQueueName = instance.uploadMessageQueue; | ||
break; | ||
case 'downloadMessageQueue': | ||
component.inputQueueName = instance.downloadMessageQueue; | ||
break; | ||
default: | ||
// NOTE should this be a NOOP or an error | ||
break; | ||
} | ||
} | ||
message.components = components; | ||
message.targetComponentId = workflowInstance.chain.targetComponentId; | ||
} | ||
return message; | ||
} | ||
async function messageInputQueue(ctx, objectId, file) { | ||
const { instance, logger } = ctx; | ||
const message = createMessage(instance, objectId); | ||
try { | ||
const inputQueueURL = await instance.discoverQueue(asOptString(instance.config.instance.inputQueueName)); | ||
const sqs = await instance.sessionedSQS({ | ||
retryDelayOptions: { | ||
customBackoff(count, err) { | ||
ctx.logger.error('Upload message error', err); | ||
if (count > ctx.settings.retries) { | ||
addFileWarning(file, ctx, FileUploadWarnings.MESSAGE_RETRIES_EXCEEDED); | ||
return -1; | ||
} | ||
return 2 ** count * 1000; // 2s, 4s, 8s, 16s, 32s | ||
}, | ||
}, | ||
maxRetries: ctx.settings.retries, | ||
}); | ||
logger.info(`${file.id} sending SQS message to input queue`); | ||
const { MessageId } = await sqs | ||
.sendMessage({ | ||
QueueUrl: inputQueueURL, | ||
MessageBody: JSON.stringify(message), | ||
}) | ||
.promise(); | ||
return MessageId; | ||
} | ||
catch (sendMessageException) { | ||
logger.error(`${file.id} exception sending SQS message: ${String(sendMessageException)}`); | ||
throw sendMessageException; | ||
} | ||
} | ||
async function uploadComplete(ctx, objectId, file) { | ||
const { instance, database, logger } = ctx; | ||
logger.info(`${file.id} uploaded to S3: ${objectId}`); | ||
const messageId = await messageInputQueue(ctx, objectId, file); | ||
const workflowInstance = instance.config.instance; | ||
instance | ||
.realtimeFeedback(`workflow_instance:state`, { | ||
type: 'start', | ||
id_workflow_instance: workflowInstance.id_workflow_instance, | ||
id_workflow: workflowInstance.id_workflow, | ||
component_id: '0', | ||
message_id: messageId, | ||
id_user: workflowInstance.id_user, | ||
}) | ||
.catch((e) => { | ||
logger.warn(`realtimeFeedback failed: ${String(e)}`); | ||
}); | ||
logger.info(`${file.id} SQS message sent. Mark as uploaded`); | ||
await database.uploadFile(file.path); | ||
} |
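The retry handling above uses the AWS SDK v2 retryDelayOptions.customBackoff hook for both the S3 managed upload and the SQS sendMessage: the callback receives the retry count and the error, returns the next delay in milliseconds, and a negative return value stops further retries, which is what raises the retries-exceeded warnings. A standalone sketch of that schedule (names are illustrative):

// Sketch: exponential backoff bounded by a retry budget.
function makeBackoff(maxRetries, onGiveUp) {
  return function customBackoff(count, err) {
    if (count > maxRetries) {
      onGiveUp(err);
      return -1; // a negative delay tells the SDK not to retry again
    }
    return 2 ** count * 1000; // 2s, 4s, 8s, 16s, 32s, ...
  };
}

// Hypothetical wiring into an SDK v2 client:
// new AWS.S3({ maxRetries: 10, retryDelayOptions: { customBackoff: makeBackoff(10, warn) } });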
@@ -0,7 +1,14 @@ | ||
export var FileUploadWarnings; | ||
(function (FileUploadWarnings) { | ||
FileUploadWarnings[FileUploadWarnings["TOO_MANY"] = 0] = "TOO_MANY"; | ||
FileUploadWarnings[FileUploadWarnings["EMPTY"] = 1] = "EMPTY"; | ||
FileUploadWarnings[FileUploadWarnings["TOO_BIG"] = 2] = "TOO_BIG"; | ||
FileUploadWarnings[FileUploadWarnings["SPLIT"] = 3] = "SPLIT"; | ||
FileUploadWarnings[FileUploadWarnings["UPLOAD_FAILED"] = 4] = "UPLOAD_FAILED"; | ||
FileUploadWarnings[FileUploadWarnings["UPLOAD_RETRIES_EXCEEDED"] = 5] = "UPLOAD_RETRIES_EXCEEDED"; | ||
FileUploadWarnings[FileUploadWarnings["MESSAGE_RETRIES_EXCEEDED"] = 6] = "MESSAGE_RETRIES_EXCEEDED"; | ||
})(FileUploadWarnings || (FileUploadWarnings = {})); | ||
export var UploadWarnings; | ||
(function (UploadWarnings) { | ||
UploadWarnings[UploadWarnings["TOO_MANY"] = 0] = "TOO_MANY"; | ||
UploadWarnings[UploadWarnings["EMPTY"] = 1] = "EMPTY"; | ||
UploadWarnings[UploadWarnings["TOO_BIG"] = 2] = "TOO_BIG"; | ||
UploadWarnings[UploadWarnings["SPLIT"] = 3] = "SPLIT"; | ||
UploadWarnings[UploadWarnings["SCAN_FAIL"] = 0] = "SCAN_FAIL"; | ||
})(UploadWarnings || (UploadWarnings = {})); |
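These are the usual compiled forms of TypeScript numeric enums, so each member gets both a forward and a reverse mapping; note that SCAN_FAIL occupies index 0 in the slimmed-down UploadWarnings. For illustration:

// Sketch: forward and reverse lookups on the compiled enum objects above.
FileUploadWarnings.UPLOAD_FAILED; // 4
FileUploadWarnings[4];            // 'UPLOAD_FAILED'
UploadWarnings.SCAN_FAIL;         // 0
UploadWarnings[0];                // 'SCAN_FAIL'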
@@ -30,3 +30,3 @@ import { createInspector } from 'fs-inspect'; | ||
*/ | ||
export async function loadInputFiles({ inputFolders, outputFolder, filetypes, filter, }) { | ||
export async function loadInputFiles({ inputFolders, outputFolder, filetypes, filter, errorHandler, }) { | ||
const extensionFilter = createExtensionFilter(filetypes !== null && filetypes !== void 0 ? filetypes : ''); | ||
@@ -54,2 +54,3 @@ const outputBasename = outputFolder ? basename(outputFolder) : null; | ||
}, | ||
catch: errorHandler, | ||
}); | ||
@@ -56,0 +57,0 @@ const results = []; |
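loadInputFiles now threads an optional errorHandler through to the inspector's catch hook, which is how scan failures become SCAN_FAIL warnings instead of rejecting the whole scan. A hedged usage sketch (paths and filetypes are illustrative):

import { loadInputFiles } from './inputScanner'; // assumption: module path

// Sketch: surface per-entry filesystem errors as warnings rather than failing the scan.
const files = await loadInputFiles({
  inputFolders: ['/data/run1'],        // hypothetical
  outputFolder: '/data/run1/output',   // hypothetical
  filetypes: ['.fastq', '.fastq.gz'],  // hypothetical
  filter: async () => true,            // keep everything for the example
  errorHandler: (err) => console.warn(`scan failed: ${String(err)}`),
});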
@@ -1,2 +0,2 @@ | ||
const niceSize = (sizeIn, unitIndexIn) => { | ||
export function niceSize(sizeIn, unitIndexIn) { | ||
const UNITS = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']; | ||
@@ -15,3 +15,2 @@ const DIV = 1000; | ||
return `${size.toFixed(1)}${UNITS[unitIndex]}`; | ||
}; | ||
export default niceSize; | ||
} |
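niceSize becomes a named export with unchanged behaviour: repeated division by 1000 (the DIV constant) and one decimal place plus a unit letter. Assuming the elided middle of the function keeps that loop, the output looks like:

import { niceSize } from './niceSize';

niceSize(1200000); // '1.2M'
niceSize(2406824); // '2.4M' (the formatting used in the TOO_BIG warning above)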
import ProxyAgent from 'proxy-agent'; | ||
import { asDictionary, isIndex, makeNumber } from 'ts-runtime-typecheck'; | ||
import { isIndex, makeNumber, asDefined, asOptString, isDefined } from 'ts-runtime-typecheck'; | ||
export default class SessionManager { | ||
@@ -27,3 +27,3 @@ constructor(idWorkflowInstance, REST, children, opts, graphQL) { | ||
const result = await this.graphQL.instanceToken(instanceTokenOptions); | ||
token = asDictionary((_a = result.data) === null || _a === void 0 ? void 0 : _a.token); | ||
token = asDefined((_a = result.data) === null || _a === void 0 ? void 0 : _a.token); | ||
} | ||
@@ -38,4 +38,17 @@ else { | ||
this.sts_expiration = new Date(token.expiration).getTime() - 60 * makeNumber((_b = this.options.sessionGrace) !== null && _b !== void 0 ? _b : '0'); // refresh token x mins before it expires | ||
const configUpdate = token; | ||
let credentials = null; | ||
if (isDefined(token.accessKeyId) && isDefined(token.secretAccessKey)) { | ||
credentials = { | ||
accessKeyId: token.accessKeyId, | ||
secretAccessKey: token.secretAccessKey, | ||
sessionToken: asOptString(token.sessionToken), | ||
}; | ||
} | ||
const configUpdate = { | ||
credentials, | ||
region: asOptString(token.region), | ||
}; | ||
if (this.options.proxy) { | ||
// NOTE AWS SDK explicitly does a deep merge on httpOptions | ||
// so this won't squash any options that have already been set | ||
configUpdate.httpOptions = { | ||
@@ -42,0 +55,0 @@ agent: ProxyAgent(this.options.proxy), |
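Instead of pushing the raw token object into the AWS configuration, the session manager now builds an explicit configUpdate containing credentials, region, and (when a proxy is configured) an httpOptions.agent. This diff does not show where configUpdate is applied; a hedged sketch of the kind of SDK v2 call it would typically feed, with the children array and token fields assumed from the surrounding code:

// Sketch only: push the sanitised update onto each child service client.
for (const service of children) {
  service.config.update({
    credentials: {
      accessKeyId: token.accessKeyId,
      secretAccessKey: token.secretAccessKey,
      sessionToken: token.sessionToken,
    },
    region: token.region,
  });
}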
@@ -1,4 +0,3 @@ | ||
import { Subject } from 'rxjs'; | ||
/* | ||
NOTE this exists mostly because TS is having a really hard time to decide | ||
NOTE this partially exists because TS is having a really hard time to decide | ||
which version the timer functions we are using (the return type of Node.js | ||
@@ -32,17 +31,2 @@ timers is an object not a number) and in some cases appears to be using | ||
} | ||
export function createTimeout$(duration) { | ||
const timeout$ = new Subject(); | ||
const cb = () => timeout$.complete(); | ||
let id = setTimeout(cb, duration); | ||
return { | ||
cancel() { | ||
clearTimeout(id); | ||
}, | ||
timeout$, | ||
reset(newDuration = duration) { | ||
clearTimeout(id); | ||
id = setTimeout(cb, newDuration); | ||
}, | ||
}; | ||
} | ||
export function sleep(duration) { | ||
@@ -49,0 +33,0 @@ return new Promise((resolve) => { |
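The cancellable createTimeout$ helper is removed from the timers module and nothing in this diff shows a replacement. Where a cancellable timeout observable is still wanted, RxJS's own timer would be the obvious stand-in; a hedged sketch only, not what the package actually does:

import { timer } from 'rxjs';

// Sketch: emit once after 5 seconds; unsubscribe to cancel the pending timeout.
const subscription = timer(5000).subscribe(() => console.log('timed out'));
subscription.unsubscribe();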
@@ -12,3 +12,3 @@ { | ||
"private": false, | ||
"version": "5.0.5200121", | ||
"version": "5.0.5353744", | ||
"main": "cjs/index-web.js", | ||
@@ -15,0 +15,0 @@ "module": "esm/index-web.js", |
@@ -13,2 +13,3 @@ import type { Logger } from './Logger'; | ||
uploadTimeout: number; | ||
uploadRetries: number; | ||
downloadTimeout: number; | ||
@@ -15,0 +16,0 @@ fileCheckInterval: number; |
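The public options type gains uploadRetries next to uploadTimeout, matching the new default of 10. A sketch of an options object including the new field (all values illustrative):

// Sketch: the timing/retry portion of an options object for this release.
const options = {
  uploadTimeout: 1200,
  uploadRetries: 10,    // new: retry budget for the S3 upload and SQS message
  downloadTimeout: 1200,
  fileCheckInterval: 5,
};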
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display