@metrichor/epi2me-web - npm package version comparison

Comparing version 5.0.5200121 to 5.0.5353744

cjs/filestats/index.js


cjs/default_options.json

@@ -8,2 +8,3 @@ {

"uploadTimeout": 1200,
"uploadRetries": 10,
"downloadTimeout": 1200,

@@ -10,0 +11,0 @@ "fileCheckInterval": 5,


cjs/epi2me-fs.js

@@ -22,3 +22,2 @@ "use strict";

const factory_1 = require("./factory");
const filestats_1 = __importDefault(require("./filestats"));
const rest_fs_1 = require("./rest-fs");

@@ -36,2 +35,3 @@ const sample_reader_1 = require("./sample-reader");

const queue_1 = require("./queue");
const filestats_1 = require("./filestats");
const fileUploader_1 = require("./fileUploader");

@@ -57,5 +57,4 @@ const networkStreamErrors = new WeakSet();

this.SampleReader = new sample_reader_1.SampleReader();
this.uploadsInProgress = [];
}
async sessionedS3() {
async sessionedS3(options = {}) {
if (!this.sessionManager) {

@@ -65,7 +64,5 @@ this.sessionManager = this.initSessionManager();

await this.sessionManager.session();
return new aws_sdk_1.default.S3({
useAccelerateEndpoint: this.config.options.awsAcceleration === 'on',
});
return new aws_sdk_1.default.S3(Object.assign({ useAccelerateEndpoint: this.config.options.awsAcceleration === 'on' }, options));
}
async sessionedSQS() {
async sessionedSQS(options = {}) {
if (!this.sessionManager) {

@@ -75,3 +72,3 @@ this.sessionManager = this.initSessionManager();

await this.sessionManager.session();
return new aws_sdk_1.default.SQS();
return new aws_sdk_1.default.SQS(options);
}
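
The new optional `options` argument on sessionedS3/sessionedSQS is merged into the AWS SDK constructor, which is what lets the rewritten uploader configure retries and timeouts per call. A minimal caller sketch (illustrative only, not part of the diff; `instance` stands for an instance of the class defined in epi2me-fs.js, and the maxRetries value is an arbitrary example; the option names mirror the configuration used in fileUploader.js further down):

const s3 = await instance.sessionedS3({
  maxRetries: 3,
  retryDelayOptions: {
    customBackoff(count) {
      return 2 ** count * 1000; // 2s, 4s, 8s, ... between attempts
    },
  },
  httpOptions: { timeout: (instance.config.options.uploadTimeout + 5) * 1000 },
});
const sqs = await instance.sessionedSQS(); // still valid: options default to {}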

@@ -420,6 +417,2 @@ async deleteMessage(message) {

async stopUpload() {
for (const inProgressUpload of this.uploadsInProgress) {
inProgressUpload.abort();
}
this.uploadsInProgress = [];
super.stopUpload();

@@ -504,45 +497,2 @@ this.log.debug('clearing split files');

}
async uploadJob(file) {
// Initiate file upload to S3
var _a;
let file2 = null;
let errorMsg;
try {
this.log.info(`upload: ${file.id} starting`);
file2 = await this.uploadHandler(file);
this.log.info(`upload: ${file2.id} uploaded and notified`);
}
catch (err) {
errorMsg = err;
this.log.error(`upload: ${file.id} done, but failed: ${String(errorMsg)}`);
}
if (errorMsg) {
this.log.error(`uploadJob ${errorMsg}`);
if (!this.states.upload.failure) {
this.states.upload.failure = {};
}
this.states.upload.failure[errorMsg] = this.states.upload.failure[errorMsg]
? ts_runtime_typecheck_1.asNumber(this.states.upload.failure[errorMsg]) + 1
: 1;
if (String(errorMsg).match(/AWS.SimpleQueueService.NonExistentQueue/)) {
// FATALITY! thrown during sqs.sendMessage
this.log.error(`instance stopped because of a fatal error`);
return this.stopEverything();
}
}
else {
const { bytes = 0, reads = 0, sequences = 0 } = (_a = file2 === null || file2 === void 0 ? void 0 : file2.stats) !== null && _a !== void 0 ? _a : {};
// this.uploadState('queueLength', 'decr', file2.stats); // this.states.upload.queueLength = this.states.upload.queueLength ? this.states.upload.queueLength - readCount : 0;
this.uploadState('success', 'incr', { files: 1, bytes, reads, sequences });
// this.states.upload.success = this.states.upload.success ? this.states.upload.success + readCount : readCount;
if (file2 === null || file2 === void 0 ? void 0 : file2.name) {
// nb. we only count types for successful uploads
const ext = path_1.default.extname(file2.name);
this.uploadState('types', 'incr', {
[ext]: 1,
});
}
}
// file-by-file?
}
async receiveMessages(receiveMessages) {

@@ -843,3 +793,3 @@ if (!receiveMessages || !receiveMessages.Messages || !receiveMessages.Messages.length) {

const ext = path_1.default.extname(outputFile);
const { bytes, reads, sequences } = await filestats_1.default(outputFile);
const { bytes, reads, sequences } = await filestats_1.filestats(outputFile);
this.downloadState('success', 'incr', {

@@ -890,3 +840,2 @@ files: 1,

});
/* download stream timeout in ms */
this.timers.transferTimeouts[outputFile] = transferTimeout;

@@ -940,207 +889,2 @@ const updateVisibilityFunc = async () => {

}
async uploadHandler(file) {
/** open readStream and pipe to S3.upload */
const s3 = await this.sessionedS3();
let rs;
let closed = false;
const mangledRelative = file.relative
.replace(/^[\\/]+/, '')
.replace(/\\/g, '/')
.replace(/\//g, '_'); // MC-7204, MC-7206 - this needs to be unpicked in future
const objectId = [
this.config.instance.bucketFolder,
'component-0',
mangledRelative,
mangledRelative,
]
.join('/')
.replace(/\/+/g, '/');
let timeoutHandle;
const p = new Promise((resolve, reject) => {
const timeoutFunc = () => {
if (rs && !closed) {
rs.close();
}
reject(new Error(`${file.name} timed out`));
};
// timeout to ensure this completeCb *always* gets called
timeoutHandle = timers_1.createTimeout((this.config.options.uploadTimeout + 5) * 1000, timeoutFunc);
try {
rs = fs_extra_1.default.createReadStream(file.path);
rs.on('close', () => {
closed = true;
});
}
catch (createReadStreamException) {
timeoutHandle.cancel();
reject(createReadStreamException);
return;
}
rs.on('error', (readStreamError) => {
rs.close();
let errstr = 'error in upload readstream';
if (readStreamError === null || readStreamError === void 0 ? void 0 : readStreamError.message) {
errstr += `: ${readStreamError.message}`;
}
timeoutHandle.cancel();
reject(new Error(errstr));
});
rs.on('open', async () => {
var _a;
const params = {
Bucket: ts_runtime_typecheck_1.asString(this.config.instance.bucket),
Key: objectId,
Body: rs,
};
const service = new aws_sdk_1.default.S3();
const options = {
partSize: 10 * 1024 * 1024,
queueSize: 1,
service,
};
if (this.config.instance.key_id) {
// MC-4996 support (optional, for now) encryption
params.SSEKMSKeyId = this.config.instance.key_id;
params.ServerSideEncryption = 'aws:kms';
}
if (file.size) {
params['Content-Length'] = file.size;
}
this.uploadState('progress', 'incr', {
total: file.size,
});
let myProgress = 0;
const managedUpload = s3.upload(params, options);
this.uploadsInProgress.push(managedUpload);
const sessionManager = this.initSessionManager([service]);
sessionManager.sts_expiration = (_a = this.sessionManager) === null || _a === void 0 ? void 0 : _a.sts_expiration; // No special options here, so use the main session and don't refetch until it's expired
managedUpload.on('httpUploadProgress', async (progress) => {
// Breaking out here causes this.states.progress.bytes to get out of sync.
// if (this.stopped) {
// reject(new Error('stopped'));
// return;
// }
// this.log.debug(`upload progress ${progress.key} ${progress.loaded} / ${progress.total}`);
this.uploadState('progress', 'incr', {
bytes: progress.loaded - myProgress,
}); // delta since last time
myProgress = progress.loaded; // store for calculating delta next iteration
timeoutHandle.reset(); // MC-6789 - reset upload timeout
try {
await sessionManager.session(); // MC-7129 force refresh token on the MANAGED UPLOAD instance of the s3 service
}
catch (e) {
this.log.warn(`Error refreshing token: ${String(e)}`);
}
});
try {
await managedUpload.promise();
this.log.info(`${file.id} S3 upload complete`);
rs.close();
timeoutHandle.cancel();
await this.uploadComplete(objectId, file); // send message
resolve(file);
}
catch (uploadStreamErr) {
this.log.warn(`${file.id} uploadStreamError ${uploadStreamErr}`);
reject(uploadStreamErr);
}
finally {
this.uploadState('progress', 'decr', {
total: file.size,
bytes: file.size,
}); // zero in-flight upload counters
this.uploadsInProgress = this.uploadsInProgress.filter((upload) => upload !== managedUpload);
}
});
// rs.on('end', rs.close);
// rs.on('close', () => this.log.debug('closing readstream'));
});
return p;
}
async uploadComplete(objectId, file) {
this.log.info(`${file.id} uploaded to S3: ${objectId}`);
const message = {
bucket: this.config.instance.bucket,
outputQueue: this.config.instance.outputQueueName,
remote_addr: this.config.instance.remote_addr,
apikey: this.config.options.apikey,
id_workflow_instance: this.config.instance.id_workflow_instance,
id_master: this.config.instance.id_workflow,
utc: new Date().toISOString(),
path: objectId,
prefix: objectId.substring(0, objectId.lastIndexOf('/')),
};
if (this.config.instance.chain) {
try {
message.components = JSON.parse(JSON.stringify(this.config.instance.chain.components)); // low-frills object clone
message.targetComponentId = this.config.instance.chain.targetComponentId; // first component to run
}
catch (jsonException) {
this.log.error(`${file.id} exception parsing components JSON ${String(jsonException)}`);
return Promise.reject(jsonException); // close the queue job
}
}
// MC-5943 support (optional, for now) #SSE #crypto!
if (this.config.instance.key_id) {
message.key_id = this.config.instance.key_id;
}
// MC-1304 - attach geo location and ip
if (this.config.options.agent_address) {
try {
message.agent_address = JSON.parse(this.config.options.agent_address);
}
catch (exception) {
this.log.error(`${file.id} Could not parse agent_address ${String(exception)}`);
}
}
if (message.components) {
const components = ts_runtime_typecheck_1.asDictionaryOf(ts_runtime_typecheck_1.isDictionary)(message.components);
// optionally populate input + output queues
for (const component of Object.values(components)) {
switch (component === null || component === void 0 ? void 0 : component.inputQueueName) {
case 'uploadMessageQueue':
component.inputQueueName = this.uploadMessageQueue;
break;
case 'downloadMessageQueue':
component.inputQueueName = this.downloadMessageQueue;
break;
default:
// NOTE should this be a NOOP or an error
break;
}
}
}
let sentMessage;
try {
const inputQueueURL = await this.discoverQueue(ts_runtime_typecheck_1.asOptString(this.config.instance.inputQueueName));
const sqs = await this.sessionedSQS();
this.log.info(`${file.id} sending SQS message to input queue`);
sentMessage = await sqs
.sendMessage({
QueueUrl: inputQueueURL,
MessageBody: JSON.stringify(message),
})
.promise();
}
catch (sendMessageException) {
this.log.error(`${file.id} exception sending SQS message: ${String(sendMessageException)}`);
throw sendMessageException;
}
this.realtimeFeedback(`workflow_instance:state`, {
type: 'start',
id_workflow_instance: this.config.instance.id_workflow_instance,
id_workflow: this.config.instance.id_workflow,
component_id: '0',
message_id: sentMessage.MessageId,
id_user: this.config.instance.id_user,
}).catch((e) => {
this.log.warn(`realtimeFeedback failed: ${String(e)}`);
});
this.log.info(`${file.id} SQS message sent. Mark as uploaded`);
if (!this.db) {
throw new Error('Database has not been instantiated');
}
await this.db.uploadFile(file.path);
}
observeTelemetry() {

@@ -1147,0 +891,0 @@ var _a, _b;

@@ -17,3 +17,3 @@ "use strict";

const graphql_1 = require("./graphql");
const niceSize_1 = __importDefault(require("./niceSize"));
const niceSize_1 = require("./niceSize");
const rest_1 = require("./rest");

@@ -84,3 +84,3 @@ const socket_1 = __importDefault(require("./socket"));

static parseOptObject(opt) {
const options = Object.assign(Object.assign({}, parseCoreOpts_1.parseCoreOpts(opt)), { region: ts_runtime_typecheck_1.asString(opt.region, default_options_json_1.default.region), sessionGrace: ts_runtime_typecheck_1.asNumber(opt.sessionGrace, default_options_json_1.default.sessionGrace), uploadTimeout: ts_runtime_typecheck_1.asNumber(opt.uploadTimeout, default_options_json_1.default.uploadTimeout), downloadTimeout: ts_runtime_typecheck_1.asNumber(opt.downloadTimeout, default_options_json_1.default.downloadTimeout), fileCheckInterval: ts_runtime_typecheck_1.asNumber(opt.fileCheckInterval, default_options_json_1.default.fileCheckInterval), downloadCheckInterval: ts_runtime_typecheck_1.asNumber(opt.downloadCheckInterval, default_options_json_1.default.downloadCheckInterval), stateCheckInterval: ts_runtime_typecheck_1.asNumber(opt.stateCheckInterval, default_options_json_1.default.stateCheckInterval), inFlightDelay: ts_runtime_typecheck_1.asNumber(opt.inFlightDelay, default_options_json_1.default.inFlightDelay), waitTimeSeconds: ts_runtime_typecheck_1.asNumber(opt.waitTimeSeconds, default_options_json_1.default.waitTimeSeconds), waitTokenError: ts_runtime_typecheck_1.asNumber(opt.waitTokenError, default_options_json_1.default.waitTokenError), transferPoolSize: ts_runtime_typecheck_1.asNumber(opt.transferPoolSize, default_options_json_1.default.transferPoolSize), downloadMode: ts_runtime_typecheck_1.asString(opt.downloadMode, default_options_json_1.default.downloadMode), filetype: ts_runtime_typecheck_1.asArrayOf(ts_runtime_typecheck_1.isString)(opt.filetype, default_options_json_1.default.filetype), sampleDirectory: ts_runtime_typecheck_1.asString(opt.sampleDirectory, default_options_json_1.default.sampleDirectory),
const options = Object.assign(Object.assign({}, parseCoreOpts_1.parseCoreOpts(opt)), { region: ts_runtime_typecheck_1.asString(opt.region, default_options_json_1.default.region), sessionGrace: ts_runtime_typecheck_1.asNumber(opt.sessionGrace, default_options_json_1.default.sessionGrace), uploadTimeout: ts_runtime_typecheck_1.asNumber(opt.uploadTimeout, default_options_json_1.default.uploadTimeout), uploadRetries: ts_runtime_typecheck_1.asNumber(opt.uploadRetries, default_options_json_1.default.uploadRetries), downloadTimeout: ts_runtime_typecheck_1.asNumber(opt.downloadTimeout, default_options_json_1.default.downloadTimeout), fileCheckInterval: ts_runtime_typecheck_1.asNumber(opt.fileCheckInterval, default_options_json_1.default.fileCheckInterval), downloadCheckInterval: ts_runtime_typecheck_1.asNumber(opt.downloadCheckInterval, default_options_json_1.default.downloadCheckInterval), stateCheckInterval: ts_runtime_typecheck_1.asNumber(opt.stateCheckInterval, default_options_json_1.default.stateCheckInterval), inFlightDelay: ts_runtime_typecheck_1.asNumber(opt.inFlightDelay, default_options_json_1.default.inFlightDelay), waitTimeSeconds: ts_runtime_typecheck_1.asNumber(opt.waitTimeSeconds, default_options_json_1.default.waitTimeSeconds), waitTokenError: ts_runtime_typecheck_1.asNumber(opt.waitTokenError, default_options_json_1.default.waitTokenError), transferPoolSize: ts_runtime_typecheck_1.asNumber(opt.transferPoolSize, default_options_json_1.default.transferPoolSize), downloadMode: ts_runtime_typecheck_1.asString(opt.downloadMode, default_options_json_1.default.downloadMode), filetype: ts_runtime_typecheck_1.asArrayOf(ts_runtime_typecheck_1.isString)(opt.filetype, default_options_json_1.default.filetype), sampleDirectory: ts_runtime_typecheck_1.asString(opt.sampleDirectory, default_options_json_1.default.sampleDirectory),
// optional values

@@ -245,3 +245,3 @@ useGraphQL: ts_runtime_typecheck_1.asOptBoolean(opt.useGraphQL), id_workflow_instance: ts_runtime_typecheck_1.asOptIndex(opt.id_workflow_instance), debounceWindow: ts_runtime_typecheck_1.asOptNumber(opt.debounceWindow), proxy: ts_runtime_typecheck_1.asOptString(opt.proxy),

try {
state.success.niceReads = niceSize_1.default(this.states[direction].success.reads);
state.success.niceReads = niceSize_1.niceSize(this.states[direction].success.reads);
}

@@ -253,3 +253,3 @@ catch (ignore) {

// complete plus in-transit
state.progress.niceSize = niceSize_1.default((_b = state.success.bytes + state.progress.bytes) !== null && _b !== void 0 ? _b : 0);
state.progress.niceSize = niceSize_1.niceSize((_b = state.success.bytes + state.progress.bytes) !== null && _b !== void 0 ? _b : 0);
}

@@ -261,3 +261,3 @@ catch (ignore) {

// complete
state.success.niceSize = niceSize_1.default(this.states[direction].success.bytes);
state.success.niceSize = niceSize_1.niceSize(this.states[direction].success.bytes);
}

@@ -296,3 +296,3 @@ catch (ignore) {

try {
state.success.niceReads = niceSize_1.default(this.states[direction].success.reads);
state.success.niceReads = niceSize_1.niceSize(this.states[direction].success.reads);
}

@@ -304,3 +304,3 @@ catch (ignore) {

// complete plus in-transit
state.progress.niceSize = niceSize_1.default((_b = state.success.bytes + state.progress.bytes) !== null && _b !== void 0 ? _b : 0);
state.progress.niceSize = niceSize_1.niceSize((_b = state.success.bytes + state.progress.bytes) !== null && _b !== void 0 ? _b : 0);
}

@@ -312,3 +312,3 @@ catch (ignore) {

// complete
state.success.niceSize = niceSize_1.default(this.states[direction].success.bytes);
state.success.niceSize = niceSize_1.niceSize(this.states[direction].success.bytes);
}

@@ -315,0 +315,0 @@ catch (ignore) {

@@ -6,8 +6,9 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
const fs_extra_1 = __importDefault(require("fs-extra"));
async function default_1(filePath) {
return fs_extra_1.default.stat(filePath).then((d) => {
exports.genericFileStatistics = void 0;
const fs_1 = __importDefault(require("fs"));
async function genericFileStatistics(filePath) {
return fs_1.default.promises.stat(filePath).then((d) => {
return { type: 'bytes', bytes: d.size };
});
}
exports.default = default_1;
exports.genericFileStatistics = genericFileStatistics;
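
The filestats helpers switch from default exports to named exports throughout this release. A brief before/after consumer sketch (illustrative; the file path is hypothetical and the destructured result matches the usage shown elsewhere in the diff):

// Before (5.0.5200121) the module exposed a default export:
//   const filestats = require('./filestats').default;
// After (5.0.5353744) it is a named export:
const { filestats } = require('./filestats');

(async () => {
  // '/data/run1/reads.fastq' is a made-up path for illustration
  const { bytes, reads, sequences } = await filestats('/data/run1/reads.fastq');
  console.log(bytes, reads, sequences);
})();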

@@ -6,4 +6,5 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
const fs_extra_1 = __importDefault(require("fs-extra"));
function default_1(filePath) {
exports.fastaFileStatistics = void 0;
const fs_1 = __importDefault(require("fs"));
function fastaFileStatistics(filePath) {
return new Promise((resolve, reject) => {

@@ -17,3 +18,4 @@ const linesPerRead = 2;

try {
stat = fs_extra_1.default.statSync(filePath);
// TODO make this async
stat = fs_1.default.statSync(filePath);
}

@@ -23,3 +25,3 @@ catch (e) {

}
fs_extra_1.default.createReadStream(filePath)
fs_1.default.createReadStream(filePath)
.on('data', (buffer) => {

@@ -41,2 +43,2 @@ idx = -1;

}
exports.default = default_1;
exports.fastaFileStatistics = fastaFileStatistics;

@@ -6,4 +6,5 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
const fs_extra_1 = __importDefault(require("fs-extra"));
function default_1(filePath) {
exports.fastqFileStatistics = void 0;
const fs_1 = __importDefault(require("fs"));
function fastqFileStatistics(filePath) {
return new Promise((resolve, reject) => {

@@ -17,3 +18,4 @@ const linesPerRead = 4;

try {
stat = fs_extra_1.default.statSync(filePath);
// TODO make this async
stat = fs_1.default.statSync(filePath);
}

@@ -24,3 +26,3 @@ catch (e) {

}
fs_extra_1.default.createReadStream(filePath)
fs_1.default.createReadStream(filePath)
.on('data', (buffer) => {

@@ -42,2 +44,2 @@ idx = -1;

}
exports.default = default_1;
exports.fastqFileStatistics = fastqFileStatistics;

@@ -6,5 +6,6 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
const fs_extra_1 = __importDefault(require("fs-extra"));
exports.fastqgzFileStatistics = void 0;
const fs_1 = __importDefault(require("fs"));
const zlib_1 = require("zlib");
function default_1(filePath) {
function fastqgzFileStatistics(filePath) {
return new Promise((resolve, reject) => {

@@ -18,3 +19,4 @@ const linesPerRead = 4;

try {
stat = fs_extra_1.default.statSync(filePath);
// TODO make this async
stat = fs_1.default.statSync(filePath);
}

@@ -26,3 +28,3 @@ catch (e) {

const gunzip = zlib_1.createGunzip();
fs_extra_1.default.createReadStream(filePath)
fs_1.default.createReadStream(filePath)
.pipe(gunzip)

@@ -45,2 +47,2 @@ .on('data', (buffer) => {

}
exports.default = default_1;
exports.fastqgzFileStatistics = fastqgzFileStatistics;

@@ -6,3 +6,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
exports.addWarning = exports.skipFile = exports.processFile = exports.readSettings = exports.createFileScanner = exports.instantiateFileUpload = void 0;
exports.uploadFile = exports.constructUploadParameters = exports.openReadStream = exports.addFailure = exports.uploadJob = exports.addFileWarning = exports.addWarning = exports.skipFile = exports.processFile = exports.readSettings = exports.createFileScanner = exports.instantiateFileUpload = void 0;
const inputScanner_1 = require("./inputScanner");

@@ -14,7 +14,8 @@ const queue_1 = require("./queue");

const fastq_1 = require("./splitters/fastq");
const filestats_1 = __importDefault(require("./filestats"));
const niceSize_1 = __importDefault(require("./niceSize"));
const filestats_1 = require("./filestats");
const niceSize_1 = require("./niceSize");
const ts_runtime_typecheck_1 = require("ts-runtime-typecheck");
const operators_1 = require("rxjs/operators");
const path_1 = __importDefault(require("path"));
const fs_1 = __importDefault(require("fs"));
function instantiateFileUpload(instance) {

@@ -24,3 +25,3 @@ ts_runtime_typecheck_1.assertDefined(instance.config.workflow, 'Workflow');

const workflow = instance.config.workflow;
const settings = readSettings(workflow);
const settings = readSettings(instance.config);
const hasStorageAccount = 'storage_account' in workflow;

@@ -32,11 +33,5 @@ if (settings.requiresStorage && !hasStorageAccount) {

const { inputFolders, outputFolder, filetype: filetypes } = instance.config.options;
const scanner = createFileScanner({
database,
inputFolders,
filetypes,
outputFolder,
});
const warnings = instance.states.warnings;
const state = instance.states.upload;
const ctx = {
const { warnings, upload: state } = instance.states;
const stopped$ = instance.uploadStopped$;
const context = {
settings,

@@ -48,29 +43,35 @@ warnings,

logger,
stopped$,
};
const { add: queueFile, empty$ } = queue_1.createQueue({}, async (file) => processFile(ctx, file));
const scanner = createFileScanner({
database,
inputFolders,
filetypes,
outputFolder,
context,
});
let running = true;
stopped$.subscribe(() => {
running = false;
});
const { add: queueFile, empty$ } = queue_1.createQueue({}, async (file) => {
if (running) {
await processFile(context, file);
}
});
const uploadInterval = instance.config.options.fileCheckInterval * 1000;
const queueEmpty = () => empty$.pipe(operators_1.first()).toPromise();
const stopped$ = instance.uploadStopped$;
return async () => {
let running = true;
stopped$.subscribe(() => {
running = false;
});
while (running) {
const startTime = Date.now();
try {
const files = await scanner();
// NOTE queueEmpty will never resolve if nothing is queued
if (files.length > 0) {
for (const file of files) {
queueFile(file);
}
// NOTE errors that occur in the queue are swallowed
await queueEmpty();
// NOTE scanner errors are handled inside createFileScanner
const files = await scanner();
// NOTE queueEmpty will never resolve if nothing is queued
if (files.length > 0) {
for (const file of files) {
queueFile(file);
}
// NOTE errors that occur in the queue are swallowed
await queueEmpty();
}
catch (err) {
// NOTE err is only ever from the scanner
// TODO add warning about FS error
}
const deltaTime = Date.now() - startTime;

@@ -83,3 +84,3 @@ const delay = Math.max(0, uploadInterval - deltaTime);

exports.instantiateFileUpload = instantiateFileUpload;
function createFileScanner({ database, inputFolders, outputFolder, filetypes, }) {
function createFileScanner({ database, inputFolders, outputFolder, filetypes, context, }) {
const filter = async (location) => {

@@ -89,2 +90,5 @@ const exists = await database.seenUpload(location);

};
const errorHandler = (err) => {
addWarning(context, fileUploader_type_1.UploadWarnings.SCAN_FAIL, err + '');
};
const options = {

@@ -95,2 +99,3 @@ inputFolders,

filter,
errorHandler,
};

@@ -100,4 +105,7 @@ return () => inputScanner_1.loadInputFiles(options);

exports.createFileScanner = createFileScanner;
function readSettings(workflow) {
function readSettings({ options, instance: workflowInstance, workflow, }) {
var _a, _b;
ts_runtime_typecheck_1.assertDefined(workflow);
ts_runtime_typecheck_1.assertDefined(workflowInstance.bucket, 'workflowInstance.bucket');
ts_runtime_typecheck_1.assertDefined(workflowInstance.bucketFolder, 'workflowInstance.bucketFolder');
const settings = {

@@ -107,2 +115,6 @@ maxFiles: Infinity,

requiresStorage: false,
bucket: workflowInstance.bucket,
bucketFolder: workflowInstance.bucketFolder,
sseKeyId: workflowInstance.key_id,
retries: options.uploadRetries,
};

@@ -157,7 +169,7 @@ const workflowAttributes = ts_runtime_typecheck_1.asOptDictionary(workflow.workflowAttributes || workflow.workflow_attributes);

if (settings.maxFiles <= state.filesCount) {
await skipFile(file, ctx, fileUploader_type_1.UploadWarnings.TOO_MANY);
await skipFile(file, ctx, fileUploader_type_1.FileUploadWarnings.TOO_MANY);
return;
}
if (file.size === 0) {
await skipFile(file, ctx, fileUploader_type_1.UploadWarnings.EMPTY);
await skipFile(file, ctx, fileUploader_type_1.FileUploadWarnings.EMPTY);
return;

@@ -170,3 +182,3 @@ }

if (canSplit && shouldSplit && ts_runtime_typecheck_1.isDefined(split)) {
addWarning(file, ctx, fileUploader_type_1.UploadWarnings.SPLIT);
addFileWarning(file, ctx, fileUploader_type_1.FileUploadWarnings.SPLIT);
const isCompressed = /\.gz$/i.test(file.path);

@@ -181,4 +193,2 @@ const directory = path_1.default.dirname(file.relative);

const id = `${file.id}_${chunkId}`;
const stats = await filestats_1.default(chunkFile);
const { bytes: size } = stats;
const chunkFilestat = {

@@ -189,7 +199,5 @@ name,

id,
size,
size: 0,
};
// TODO this should retry if it fails... but only for some reasons
// await uploadFile(chunkFilestat, stats, ctx);
await ctx.instance.uploadJob(Object.assign(Object.assign({}, chunkFilestat), { stats }));
await uploadJob(ctx, chunkFilestat);
await database.splitDone(chunkFile);

@@ -204,9 +212,6 @@ }, isCompressed);

if (file.size > settings.maxFileSize) {
await skipFile(file, ctx, fileUploader_type_1.UploadWarnings.TOO_BIG);
await skipFile(file, ctx, fileUploader_type_1.FileUploadWarnings.TOO_BIG);
return;
}
const stats = await filestats_1.default(file.path);
// TODO this should retry if it fails... but only for some reasons
// await uploadFile(file, stats, ctx);
await ctx.instance.uploadJob(Object.assign(Object.assign({}, file), { stats }));
await uploadJob(ctx, file);
state.filesCount += 1;

@@ -216,7 +221,19 @@ }

async function skipFile(file, ctx, warn) {
addWarning(file, ctx, warn);
addFileWarning(file, ctx, warn);
await ctx.database.skipFile(file.path);
}
exports.skipFile = skipFile;
function addWarning(file, ctx, warn) {
function addWarning(ctx, warn, msg) {
const { logger, warnings } = ctx;
let type;
switch (warn) {
case fileUploader_type_1.UploadWarnings.SCAN_FAIL:
type = 'WARNING_SCAN_FAIL';
break;
}
logger.error(msg);
warnings.push({ msg, type });
}
exports.addWarning = addWarning;
function addFileWarning(file, ctx, warn) {
var _a, _b;

@@ -227,22 +244,264 @@ const { settings, logger, warnings } = ctx;

switch (warn) {
case fileUploader_type_1.UploadWarnings.EMPTY:
case fileUploader_type_1.FileUploadWarnings.EMPTY:
type = 'WARNING_FILE_EMPTY';
msg = `The file ${file.relative} is empty. It will be skipped.`;
break;
case fileUploader_type_1.UploadWarnings.SPLIT:
case fileUploader_type_1.FileUploadWarnings.SPLIT:
type = 'WARNING_FILE_SPLIT';
msg = `${file.relative}${file.size > splitSize ? ' is too big and' : ''} is going to be split`;
break;
case fileUploader_type_1.UploadWarnings.TOO_BIG:
case fileUploader_type_1.FileUploadWarnings.TOO_BIG:
type = 'WARNING_FILE_TOO_BIG';
msg = `The file ${file.relative} is bigger than the maximum size limit (${niceSize_1.default(settings.maxFileSize)}B). It will be skipped.`;
msg = `The file ${file.relative} is bigger than the maximum size limit (${niceSize_1.niceSize(settings.maxFileSize)}B). It will be skipped.`;
break;
case fileUploader_type_1.UploadWarnings.TOO_MANY:
case fileUploader_type_1.FileUploadWarnings.TOO_MANY:
type = 'WARNING_FILE_TOO_MANY';
msg = `Maximum ${settings.maxFiles} file(s) already uploaded. Marking ${file.relative} as skipped.`;
break;
case fileUploader_type_1.FileUploadWarnings.UPLOAD_FAILED:
type = 'WARNING_FILE_UPLOAD_FAILED';
msg = `Uploading ${file.relative} failed.`;
break;
case fileUploader_type_1.FileUploadWarnings.UPLOAD_RETRIES_EXCEEDED:
type = 'WARNING_FILE_UPLOAD_RETRIES_EXCEEDED';
msg = `Exceeded maximum retries uploading ${file.relative}. This file will not be uploaded.`;
break;
case fileUploader_type_1.FileUploadWarnings.MESSAGE_RETRIES_EXCEEDED:
type = 'WARNING_FILE_UPLOAD_MESSAGE_RETRIES_EXCEEDED';
msg = `Exceeded maximum retries adding ${file.relative} to the queue. This file will not be processed.`;
break;
}
logger.error(msg);
logger.warn(msg);
warnings.push({ msg, type });
}
exports.addWarning = addWarning;
exports.addFileWarning = addFileWarning;
async function uploadJob(ctx, file) {
await uploadFile(file, await filestats_1.filestats(file.path), ctx);
}
exports.uploadJob = uploadJob;
function addFailure(state, msg) {
var _a;
if (!state.failure) {
state.failure = {};
}
state.failure[msg] = ((_a = state.failure[msg]) !== null && _a !== void 0 ? _a : 0) + 1;
}
exports.addFailure = addFailure;
function openReadStream(location, handler) {
const rs = fs_1.default.createReadStream(location);
return new Promise((resolve, reject) => {
rs.addListener('open', async () => {
try {
await handler(rs);
resolve();
}
catch (err) {
rs.close(); // ensure the stream is closed if we have an error in the handler
reject(err);
}
});
rs.addListener('error', (err) => reject(`Upload filesystem error ${err}`));
});
}
exports.openReadStream = openReadStream;
function constructUploadParameters(ctx, file, rs) {
const { settings: { bucket, sseKeyId, bucketFolder }, } = ctx;
const mangledRelative = file.relative
.replace(/^[\\/]+/, '')
.replace(/\\/g, '/')
.replace(/\//g, '_'); // MC-7204, MC-7206 - this needs to be unpicked in future
const key = [
bucketFolder,
'component-0',
mangledRelative,
mangledRelative,
]
.join('/')
.replace(/\/+/g, '/');
const params = {
Bucket: bucket,
Key: key,
Body: rs,
};
if (sseKeyId) {
// MC-4996 support (optional, for now) encryption
params.SSEKMSKeyId = sseKeyId;
params.ServerSideEncryption = 'aws:kms';
}
if (file.size) {
params.ContentLength = file.size;
}
return params;
}
exports.constructUploadParameters = constructUploadParameters;
async function uploadFile(file, stats, ctx) {
const { state, instance, logger, stopped$ } = ctx;
try {
const timeout = (instance.config.options.uploadTimeout + 5) * 1000;
const s3 = await instance.sessionedS3({
retryDelayOptions: {
customBackoff(count, err) {
addFileWarning(file, ctx, fileUploader_type_1.FileUploadWarnings.UPLOAD_FAILED);
ctx.logger.error('Upload error', err);
if (count > ctx.settings.retries) {
addFileWarning(file, ctx, fileUploader_type_1.FileUploadWarnings.UPLOAD_RETRIES_EXCEEDED);
return -1;
}
return 2 ** count * 1000; // 2s, 4s, 8s, 16s, 32s
},
},
maxRetries: ctx.settings.retries,
httpOptions: {
timeout,
},
});
await openReadStream(file.path, async (rs) => {
var _a;
const params = constructUploadParameters(ctx, file, rs);
const options = {
partSize: 10 * 1024 * 1024,
queueSize: 1,
};
instance.uploadState('progress', 'incr', {
total: file.size,
});
const managedUpload = s3.upload(params, options);
const subscription = stopped$.subscribe(() => {
managedUpload.abort();
});
const sessionManager = instance.initSessionManager([s3]);
sessionManager.sts_expiration = (_a = instance.sessionManager) === null || _a === void 0 ? void 0 : _a.sts_expiration; // No special options here, so use the main session and don't refetch until it's expired
let currentProgress = 0;
managedUpload.on('httpUploadProgress', async (progress) => {
const progressDelta = progress.loaded - currentProgress;
instance.uploadState('progress', 'incr', {
bytes: progressDelta,
}); // delta since last time
currentProgress = progress.loaded; // store for calculating delta next iteration
try {
await sessionManager.session(); // MC-7129 force refresh token on the MANAGED UPLOAD instance of the s3 service
}
catch (e) {
logger.warn(`Error refreshing token: ${String(e)}`);
}
});
try {
await managedUpload.promise();
await uploadComplete(ctx, params.Key, file); // send message
}
finally {
instance.uploadState('progress', 'decr', {
total: file.size,
bytes: currentProgress,
}); // zero in-flight upload counters
subscription.unsubscribe();
}
});
const { bytes = 0, reads = 0, sequences = 0 } = stats !== null && stats !== void 0 ? stats : {};
instance.uploadState('success', 'incr', { files: 1, bytes, reads, sequences });
const ext = path_1.default.extname(file.name);
instance.uploadState('types', 'incr', {
[ext]: 1,
});
}
catch (err) {
addFailure(state, err + '');
throw err;
}
}
exports.uploadFile = uploadFile;
function createMessage(instance, objectId) {
const workflowInstance = instance.config.instance;
const message = {
bucket: workflowInstance.bucket,
outputQueue: workflowInstance.outputQueueName,
remote_addr: workflowInstance.remote_addr,
apikey: instance.config.options.apikey,
id_workflow_instance: workflowInstance.id_workflow_instance,
id_master: workflowInstance.id_workflow,
utc: new Date().toISOString(),
path: objectId,
prefix: objectId.substring(0, objectId.lastIndexOf('/')),
key_id: workflowInstance.key_id,
};
if (workflowInstance.chain) {
let components;
if (!ts_runtime_typecheck_1.isDictionaryOf(ts_runtime_typecheck_1.isDictionary)(workflowInstance.chain.components)) {
throw new Error('Unexpected chain definition');
}
try {
components = JSON.parse(JSON.stringify(workflowInstance.chain.components)); // low-frills object clone
}
catch (_a) {
throw new Error(`Failed to clone workflow chain`);
}
for (const component of Object.values(components)) {
switch (component === null || component === void 0 ? void 0 : component.inputQueueName) {
case 'uploadMessageQueue':
component.inputQueueName = instance.uploadMessageQueue;
break;
case 'downloadMessageQueue':
component.inputQueueName = instance.downloadMessageQueue;
break;
default:
// NOTE should this be a NOOP or an error
break;
}
}
message.components = components;
message.targetComponentId = workflowInstance.chain.targetComponentId;
}
return message;
}
async function messageInputQueue(ctx, objectId, file) {
const { instance, logger } = ctx;
const message = createMessage(instance, objectId);
try {
const inputQueueURL = await instance.discoverQueue(ts_runtime_typecheck_1.asOptString(instance.config.instance.inputQueueName));
const sqs = await instance.sessionedSQS({
retryDelayOptions: {
customBackoff(count, err) {
ctx.logger.error('Upload message error', err);
if (count > ctx.settings.retries) {
addFileWarning(file, ctx, fileUploader_type_1.FileUploadWarnings.MESSAGE_RETRIES_EXCEEDED);
return -1;
}
return 2 ** count * 1000; // 2s, 4s, 8s, 16s, 32s
},
},
maxRetries: ctx.settings.retries,
});
logger.info(`${file.id} sending SQS message to input queue`);
const { MessageId } = await sqs
.sendMessage({
QueueUrl: inputQueueURL,
MessageBody: JSON.stringify(message),
})
.promise();
return MessageId;
}
catch (sendMessageException) {
logger.error(`${file.id} exception sending SQS message: ${String(sendMessageException)}`);
throw sendMessageException;
}
}
async function uploadComplete(ctx, objectId, file) {
const { instance, database, logger } = ctx;
logger.info(`${file.id} uploaded to S3: ${objectId}`);
const messageId = await messageInputQueue(ctx, objectId, file);
const workflowInstance = instance.config.instance;
instance
.realtimeFeedback(`workflow_instance:state`, {
type: 'start',
id_workflow_instance: workflowInstance.id_workflow_instance,
id_workflow: workflowInstance.id_workflow,
component_id: '0',
message_id: messageId,
id_user: workflowInstance.id_user,
})
.catch((e) => {
logger.warn(`realtimeFeedback failed: ${String(e)}`);
});
logger.info(`${file.id} SQS message sent. Mark as uploaded`);
await database.uploadFile(file.path);
}
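
Both uploadFile and messageInputQueue use the same exponential delay inside customBackoff. A small worked sketch of the schedule it yields, assuming the default "uploadRetries" of 10 from default_options.json (standalone illustration, not part of the diff):

const retries = 10; // default_options.json: "uploadRetries": 10
function backoff(count) {
  if (count > retries) {
    return -1; // a negative value tells the AWS SDK to stop retrying
  }
  return 2 ** count * 1000;
}
// backoff(1) -> 2000 ms, backoff(2) -> 4000 ms, backoff(3) -> 8000 ms,
// backoff(4) -> 16000 ms, backoff(5) -> 32000 ms, ... backoff(11) -> -1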
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.UploadWarnings = void 0;
exports.UploadWarnings = exports.FileUploadWarnings = void 0;
var FileUploadWarnings;
(function (FileUploadWarnings) {
FileUploadWarnings[FileUploadWarnings["TOO_MANY"] = 0] = "TOO_MANY";
FileUploadWarnings[FileUploadWarnings["EMPTY"] = 1] = "EMPTY";
FileUploadWarnings[FileUploadWarnings["TOO_BIG"] = 2] = "TOO_BIG";
FileUploadWarnings[FileUploadWarnings["SPLIT"] = 3] = "SPLIT";
FileUploadWarnings[FileUploadWarnings["UPLOAD_FAILED"] = 4] = "UPLOAD_FAILED";
FileUploadWarnings[FileUploadWarnings["UPLOAD_RETRIES_EXCEEDED"] = 5] = "UPLOAD_RETRIES_EXCEEDED";
FileUploadWarnings[FileUploadWarnings["MESSAGE_RETRIES_EXCEEDED"] = 6] = "MESSAGE_RETRIES_EXCEEDED";
})(FileUploadWarnings = exports.FileUploadWarnings || (exports.FileUploadWarnings = {}));
var UploadWarnings;
(function (UploadWarnings) {
UploadWarnings[UploadWarnings["TOO_MANY"] = 0] = "TOO_MANY";
UploadWarnings[UploadWarnings["EMPTY"] = 1] = "EMPTY";
UploadWarnings[UploadWarnings["TOO_BIG"] = 2] = "TOO_BIG";
UploadWarnings[UploadWarnings["SPLIT"] = 3] = "SPLIT";
UploadWarnings[UploadWarnings["SCAN_FAIL"] = 0] = "SCAN_FAIL";
})(UploadWarnings = exports.UploadWarnings || (exports.UploadWarnings = {}));

@@ -35,3 +35,3 @@ "use strict";

*/
async function loadInputFiles({ inputFolders, outputFolder, filetypes, filter, }) {
async function loadInputFiles({ inputFolders, outputFolder, filetypes, filter, errorHandler, }) {
const extensionFilter = createExtensionFilter(filetypes !== null && filetypes !== void 0 ? filetypes : '');

@@ -59,2 +59,3 @@ const outputBasename = outputFolder ? path_1.basename(outputFolder) : null;

},
catch: errorHandler,
});

@@ -61,0 +62,0 @@ const results = [];

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const niceSize = (sizeIn, unitIndexIn) => {
exports.niceSize = void 0;
function niceSize(sizeIn, unitIndexIn) {
const UNITS = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z'];

@@ -17,3 +18,3 @@ const DIV = 1000;

return `${size.toFixed(1)}${UNITS[unitIndex]}`;
};
exports.default = niceSize;
}
exports.niceSize = niceSize;
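
niceSize also moves to a named export. An illustrative usage sketch (the printed value is an assumption based on the UNITS table and DIV of 1000 shown above; only part of the function body appears in this diff):

const { niceSize } = require('./niceSize');

// used in user-facing warnings, e.g.
// `bigger than the maximum size limit (${niceSize(settings.maxFileSize)}B)`
console.log(`${niceSize(5 * 1000 * 1000 * 1000)}B`); // roughly "5.0GB" (assumed)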

@@ -32,3 +32,3 @@ "use strict";

const result = await this.graphQL.instanceToken(instanceTokenOptions);
token = ts_runtime_typecheck_1.asDictionary((_a = result.data) === null || _a === void 0 ? void 0 : _a.token);
token = ts_runtime_typecheck_1.asDefined((_a = result.data) === null || _a === void 0 ? void 0 : _a.token);
}

@@ -43,4 +43,17 @@ else {

this.sts_expiration = new Date(token.expiration).getTime() - 60 * ts_runtime_typecheck_1.makeNumber((_b = this.options.sessionGrace) !== null && _b !== void 0 ? _b : '0'); // refresh token x mins before it expires
const configUpdate = token;
let credentials = null;
if (ts_runtime_typecheck_1.isDefined(token.accessKeyId) && ts_runtime_typecheck_1.isDefined(token.secretAccessKey)) {
credentials = {
accessKeyId: token.accessKeyId,
secretAccessKey: token.secretAccessKey,
sessionToken: ts_runtime_typecheck_1.asOptString(token.sessionToken),
};
}
const configUpdate = {
credentials,
region: ts_runtime_typecheck_1.asOptString(token.region),
};
if (this.options.proxy) {
// NOTE AWS SDK explicitly does a deep merge on httpOptions
// so this won't squash any options that have already been set
configUpdate.httpOptions = {

@@ -47,0 +60,0 @@ agent: proxy_agent_1.default(this.options.proxy),

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.sleep = exports.createTimeout$ = exports.createTimeout = exports.createInterval = void 0;
const rxjs_1 = require("rxjs");
exports.sleep = exports.createTimeout = exports.createInterval = void 0;
/*
NOTE this exists mostly because TS is having a really hard time to decide
NOTE this partially exists because TS is having a really hard time to decide
which version the timer functions we are using (the return type of Node.js

@@ -37,18 +36,2 @@ timers is an object not a number) and in some cases appears to be using

exports.createTimeout = createTimeout;
function createTimeout$(duration) {
const timeout$ = new rxjs_1.Subject();
const cb = () => timeout$.complete();
let id = setTimeout(cb, duration);
return {
cancel() {
clearTimeout(id);
},
timeout$,
reset(newDuration = duration) {
clearTimeout(id);
id = setTimeout(cb, newDuration);
},
};
}
exports.createTimeout$ = createTimeout$;
function sleep(duration) {

@@ -55,0 +38,0 @@ return new Promise((resolve) => {

@@ -8,2 +8,3 @@ {

"uploadTimeout": 1200,
"uploadRetries": 10,
"downloadTimeout": 1200,

@@ -10,0 +11,0 @@ "fileCheckInterval": 5,

@@ -8,3 +8,3 @@ /* eslint no-console: ["error", { allow: ["log", "info", "debug", "warn", "error"] }] */

*/
import { asOptString, asString, asNumber, asDictionary, asOptIndex, asOptDictionary, asIndex, asDefined, asOptFunction, asOptNumber, asDictionaryOf, makeString, isString, isUndefined, asOptDictionaryOf, isDictionaryOf, isDictionary, isArrayOf, } from 'ts-runtime-typecheck';
import { asOptString, asString, asNumber, asDictionary, asOptIndex, asOptDictionary, asIndex, asDefined, asOptFunction, asOptNumber, makeString, isString, isUndefined, asOptDictionaryOf, isDictionaryOf, isArrayOf, } from 'ts-runtime-typecheck';
import AWS from 'aws-sdk';

@@ -17,3 +17,2 @@ import fs from 'fs-extra'; /* MC-565 handle EMFILE & EXDIR gracefully; use Promises */

import { Factory } from './factory';
import filestats from './filestats';
import { REST_FS } from './rest-fs';

@@ -31,2 +30,3 @@ import { SampleReader } from './sample-reader';

import { createQueue } from './queue';
import { filestats } from './filestats';
import { instantiateFileUpload } from './fileUploader';

@@ -52,5 +52,4 @@ const networkStreamErrors = new WeakSet();

this.SampleReader = new SampleReader();
this.uploadsInProgress = [];
}
async sessionedS3() {
async sessionedS3(options = {}) {
if (!this.sessionManager) {

@@ -60,7 +59,5 @@ this.sessionManager = this.initSessionManager();

await this.sessionManager.session();
return new AWS.S3({
useAccelerateEndpoint: this.config.options.awsAcceleration === 'on',
});
return new AWS.S3(Object.assign({ useAccelerateEndpoint: this.config.options.awsAcceleration === 'on' }, options));
}
async sessionedSQS() {
async sessionedSQS(options = {}) {
if (!this.sessionManager) {

@@ -70,3 +67,3 @@ this.sessionManager = this.initSessionManager();

await this.sessionManager.session();
return new AWS.SQS();
return new AWS.SQS(options);
}

@@ -415,6 +412,2 @@ async deleteMessage(message) {

async stopUpload() {
for (const inProgressUpload of this.uploadsInProgress) {
inProgressUpload.abort();
}
this.uploadsInProgress = [];
super.stopUpload();

@@ -499,45 +492,2 @@ this.log.debug('clearing split files');

}
async uploadJob(file) {
// Initiate file upload to S3
var _a;
let file2 = null;
let errorMsg;
try {
this.log.info(`upload: ${file.id} starting`);
file2 = await this.uploadHandler(file);
this.log.info(`upload: ${file2.id} uploaded and notified`);
}
catch (err) {
errorMsg = err;
this.log.error(`upload: ${file.id} done, but failed: ${String(errorMsg)}`);
}
if (errorMsg) {
this.log.error(`uploadJob ${errorMsg}`);
if (!this.states.upload.failure) {
this.states.upload.failure = {};
}
this.states.upload.failure[errorMsg] = this.states.upload.failure[errorMsg]
? asNumber(this.states.upload.failure[errorMsg]) + 1
: 1;
if (String(errorMsg).match(/AWS.SimpleQueueService.NonExistentQueue/)) {
// FATALITY! thrown during sqs.sendMessage
this.log.error(`instance stopped because of a fatal error`);
return this.stopEverything();
}
}
else {
const { bytes = 0, reads = 0, sequences = 0 } = (_a = file2 === null || file2 === void 0 ? void 0 : file2.stats) !== null && _a !== void 0 ? _a : {};
// this.uploadState('queueLength', 'decr', file2.stats); // this.states.upload.queueLength = this.states.upload.queueLength ? this.states.upload.queueLength - readCount : 0;
this.uploadState('success', 'incr', { files: 1, bytes, reads, sequences });
// this.states.upload.success = this.states.upload.success ? this.states.upload.success + readCount : readCount;
if (file2 === null || file2 === void 0 ? void 0 : file2.name) {
// nb. we only count types for successful uploads
const ext = path.extname(file2.name);
this.uploadState('types', 'incr', {
[ext]: 1,
});
}
}
// file-by-file?
}
async receiveMessages(receiveMessages) {

@@ -884,3 +834,2 @@ if (!receiveMessages || !receiveMessages.Messages || !receiveMessages.Messages.length) {

});
/* download stream timeout in ms */
this.timers.transferTimeouts[outputFile] = transferTimeout;

@@ -934,207 +883,2 @@ const updateVisibilityFunc = async () => {

}
async uploadHandler(file) {
/** open readStream and pipe to S3.upload */
const s3 = await this.sessionedS3();
let rs;
let closed = false;
const mangledRelative = file.relative
.replace(/^[\\/]+/, '')
.replace(/\\/g, '/')
.replace(/\//g, '_'); // MC-7204, MC-7206 - this needs to be unpicked in future
const objectId = [
this.config.instance.bucketFolder,
'component-0',
mangledRelative,
mangledRelative,
]
.join('/')
.replace(/\/+/g, '/');
let timeoutHandle;
const p = new Promise((resolve, reject) => {
const timeoutFunc = () => {
if (rs && !closed) {
rs.close();
}
reject(new Error(`${file.name} timed out`));
};
// timeout to ensure this completeCb *always* gets called
timeoutHandle = createTimeout((this.config.options.uploadTimeout + 5) * 1000, timeoutFunc);
try {
rs = fs.createReadStream(file.path);
rs.on('close', () => {
closed = true;
});
}
catch (createReadStreamException) {
timeoutHandle.cancel();
reject(createReadStreamException);
return;
}
rs.on('error', (readStreamError) => {
rs.close();
let errstr = 'error in upload readstream';
if (readStreamError === null || readStreamError === void 0 ? void 0 : readStreamError.message) {
errstr += `: ${readStreamError.message}`;
}
timeoutHandle.cancel();
reject(new Error(errstr));
});
rs.on('open', async () => {
var _a;
const params = {
Bucket: asString(this.config.instance.bucket),
Key: objectId,
Body: rs,
};
const service = new AWS.S3();
const options = {
partSize: 10 * 1024 * 1024,
queueSize: 1,
service,
};
if (this.config.instance.key_id) {
// MC-4996 support (optional, for now) encryption
params.SSEKMSKeyId = this.config.instance.key_id;
params.ServerSideEncryption = 'aws:kms';
}
if (file.size) {
params['Content-Length'] = file.size;
}
this.uploadState('progress', 'incr', {
total: file.size,
});
let myProgress = 0;
const managedUpload = s3.upload(params, options);
this.uploadsInProgress.push(managedUpload);
const sessionManager = this.initSessionManager([service]);
sessionManager.sts_expiration = (_a = this.sessionManager) === null || _a === void 0 ? void 0 : _a.sts_expiration; // No special options here, so use the main session and don't refetch until it's expired
managedUpload.on('httpUploadProgress', async (progress) => {
// Breaking out here causes this.states.progress.bytes to get out of sync.
// if (this.stopped) {
// reject(new Error('stopped'));
// return;
// }
// this.log.debug(`upload progress ${progress.key} ${progress.loaded} / ${progress.total}`);
this.uploadState('progress', 'incr', {
bytes: progress.loaded - myProgress,
}); // delta since last time
myProgress = progress.loaded; // store for calculating delta next iteration
timeoutHandle.reset(); // MC-6789 - reset upload timeout
try {
await sessionManager.session(); // MC-7129 force refresh token on the MANAGED UPLOAD instance of the s3 service
}
catch (e) {
this.log.warn(`Error refreshing token: ${String(e)}`);
}
});
try {
await managedUpload.promise();
this.log.info(`${file.id} S3 upload complete`);
rs.close();
timeoutHandle.cancel();
await this.uploadComplete(objectId, file); // send message
resolve(file);
}
catch (uploadStreamErr) {
this.log.warn(`${file.id} uploadStreamError ${uploadStreamErr}`);
reject(uploadStreamErr);
}
finally {
this.uploadState('progress', 'decr', {
total: file.size,
bytes: file.size,
}); // zero in-flight upload counters
this.uploadsInProgress = this.uploadsInProgress.filter((upload) => upload !== managedUpload);
}
});
// rs.on('end', rs.close);
// rs.on('close', () => this.log.debug('closing readstream'));
});
return p;
}
async uploadComplete(objectId, file) {
this.log.info(`${file.id} uploaded to S3: ${objectId}`);
const message = {
bucket: this.config.instance.bucket,
outputQueue: this.config.instance.outputQueueName,
remote_addr: this.config.instance.remote_addr,
apikey: this.config.options.apikey,
id_workflow_instance: this.config.instance.id_workflow_instance,
id_master: this.config.instance.id_workflow,
utc: new Date().toISOString(),
path: objectId,
prefix: objectId.substring(0, objectId.lastIndexOf('/')),
};
if (this.config.instance.chain) {
try {
message.components = JSON.parse(JSON.stringify(this.config.instance.chain.components)); // low-frills object clone
message.targetComponentId = this.config.instance.chain.targetComponentId; // first component to run
}
catch (jsonException) {
this.log.error(`${file.id} exception parsing components JSON ${String(jsonException)}`);
return Promise.reject(jsonException); // close the queue job
}
}
// MC-5943 support (optional, for now) #SSE #crypto!
if (this.config.instance.key_id) {
message.key_id = this.config.instance.key_id;
}
// MC-1304 - attach geo location and ip
if (this.config.options.agent_address) {
try {
message.agent_address = JSON.parse(this.config.options.agent_address);
}
catch (exception) {
this.log.error(`${file.id} Could not parse agent_address ${String(exception)}`);
}
}
if (message.components) {
const components = asDictionaryOf(isDictionary)(message.components);
// optionally populate input + output queues
for (const component of Object.values(components)) {
switch (component === null || component === void 0 ? void 0 : component.inputQueueName) {
case 'uploadMessageQueue':
component.inputQueueName = this.uploadMessageQueue;
break;
case 'downloadMessageQueue':
component.inputQueueName = this.downloadMessageQueue;
break;
default:
// NOTE should this be a NOOP or an error
break;
}
}
}
let sentMessage;
try {
const inputQueueURL = await this.discoverQueue(asOptString(this.config.instance.inputQueueName));
const sqs = await this.sessionedSQS();
this.log.info(`${file.id} sending SQS message to input queue`);
sentMessage = await sqs
.sendMessage({
QueueUrl: inputQueueURL,
MessageBody: JSON.stringify(message),
})
.promise();
}
catch (sendMessageException) {
this.log.error(`${file.id} exception sending SQS message: ${String(sendMessageException)}`);
throw sendMessageException;
}
this.realtimeFeedback(`workflow_instance:state`, {
type: 'start',
id_workflow_instance: this.config.instance.id_workflow_instance,
id_workflow: this.config.instance.id_workflow,
component_id: '0',
message_id: sentMessage.MessageId,
id_user: this.config.instance.id_user,
}).catch((e) => {
this.log.warn(`realtimeFeedback failed: ${String(e)}`);
});
this.log.info(`${file.id} SQS message sent. Mark as uploaded`);
if (!this.db) {
throw new Error('Database has not been instantiated');
}
await this.db.uploadFile(file.path);
}
observeTelemetry() {

@@ -1141,0 +885,0 @@ var _a, _b;

@@ -11,3 +11,3 @@ /* eslint no-console: ["error", { allow: ["log", "info", "debug", "warn", "error"] }] */

import { GraphQL } from './graphql';
import niceSize from './niceSize';
import { niceSize } from './niceSize';
import { REST } from './rest';

@@ -78,3 +78,3 @@ import Socket from './socket';

static parseOptObject(opt) {
const options = Object.assign(Object.assign({}, parseCoreOpts(opt)), { region: asString(opt.region, DEFAULTS.region), sessionGrace: asNumber(opt.sessionGrace, DEFAULTS.sessionGrace), uploadTimeout: asNumber(opt.uploadTimeout, DEFAULTS.uploadTimeout), downloadTimeout: asNumber(opt.downloadTimeout, DEFAULTS.downloadTimeout), fileCheckInterval: asNumber(opt.fileCheckInterval, DEFAULTS.fileCheckInterval), downloadCheckInterval: asNumber(opt.downloadCheckInterval, DEFAULTS.downloadCheckInterval), stateCheckInterval: asNumber(opt.stateCheckInterval, DEFAULTS.stateCheckInterval), inFlightDelay: asNumber(opt.inFlightDelay, DEFAULTS.inFlightDelay), waitTimeSeconds: asNumber(opt.waitTimeSeconds, DEFAULTS.waitTimeSeconds), waitTokenError: asNumber(opt.waitTokenError, DEFAULTS.waitTokenError), transferPoolSize: asNumber(opt.transferPoolSize, DEFAULTS.transferPoolSize), downloadMode: asString(opt.downloadMode, DEFAULTS.downloadMode), filetype: asArrayOf(isString)(opt.filetype, DEFAULTS.filetype), sampleDirectory: asString(opt.sampleDirectory, DEFAULTS.sampleDirectory),
const options = Object.assign(Object.assign({}, parseCoreOpts(opt)), { region: asString(opt.region, DEFAULTS.region), sessionGrace: asNumber(opt.sessionGrace, DEFAULTS.sessionGrace), uploadTimeout: asNumber(opt.uploadTimeout, DEFAULTS.uploadTimeout), uploadRetries: asNumber(opt.uploadRetries, DEFAULTS.uploadRetries), downloadTimeout: asNumber(opt.downloadTimeout, DEFAULTS.downloadTimeout), fileCheckInterval: asNumber(opt.fileCheckInterval, DEFAULTS.fileCheckInterval), downloadCheckInterval: asNumber(opt.downloadCheckInterval, DEFAULTS.downloadCheckInterval), stateCheckInterval: asNumber(opt.stateCheckInterval, DEFAULTS.stateCheckInterval), inFlightDelay: asNumber(opt.inFlightDelay, DEFAULTS.inFlightDelay), waitTimeSeconds: asNumber(opt.waitTimeSeconds, DEFAULTS.waitTimeSeconds), waitTokenError: asNumber(opt.waitTokenError, DEFAULTS.waitTokenError), transferPoolSize: asNumber(opt.transferPoolSize, DEFAULTS.transferPoolSize), downloadMode: asString(opt.downloadMode, DEFAULTS.downloadMode), filetype: asArrayOf(isString)(opt.filetype, DEFAULTS.filetype), sampleDirectory: asString(opt.sampleDirectory, DEFAULTS.sampleDirectory),
// optional values

@@ -81,0 +81,0 @@ useGraphQL: asOptBoolean(opt.useGraphQL), id_workflow_instance: asOptIndex(opt.id_workflow_instance), debounceWindow: asOptNumber(opt.debounceWindow), proxy: asOptString(opt.proxy),

@@ -1,6 +0,6 @@

import fs from 'fs-extra';
export default async function (filePath) {
return fs.stat(filePath).then((d) => {
import fs from 'fs';
export async function genericFileStatistics(filePath) {
return fs.promises.stat(filePath).then((d) => {
return { type: 'bytes', bytes: d.size };
});
}

@@ -1,3 +0,3 @@

import fs from 'fs-extra';
export default function (filePath) {
import fs from 'fs';
export function fastaFileStatistics(filePath) {
return new Promise((resolve, reject) => {

@@ -11,2 +11,3 @@ const linesPerRead = 2;

try {
// TODO make this async
stat = fs.statSync(filePath);

@@ -13,0 +14,0 @@ }

@@ -1,3 +0,3 @@

import fs from 'fs-extra';
export default function (filePath) {
import fs from 'fs';
export function fastqFileStatistics(filePath) {
return new Promise((resolve, reject) => {

@@ -11,2 +11,3 @@ const linesPerRead = 4;

try {
// TODO make this async
stat = fs.statSync(filePath);

@@ -13,0 +14,0 @@ }

@@ -1,4 +0,4 @@

import fs from 'fs-extra';
import fs from 'fs';
import { createGunzip } from 'zlib';
export default function (filePath) {
export function fastqgzFileStatistics(filePath) {
return new Promise((resolve, reject) => {

@@ -12,2 +12,3 @@ const linesPerRead = 4;

try {
// TODO make this async
stat = fs.statSync(filePath);

@@ -14,0 +15,0 @@ }

@@ -5,9 +5,10 @@ import { loadInputFiles } from './inputScanner';

import { isFastq } from './file_extensions';
import { UploadWarnings } from './fileUploader.type';
import { FileUploadWarnings, UploadWarnings } from './fileUploader.type';
import { splitter } from './splitters/fastq';
import filestats from './filestats';
import niceSize from './niceSize';
import { asNumber, asOptDictionary, asOptDictionaryOf, isArray, isDefined, makeNumber, assertDefined, } from 'ts-runtime-typecheck';
import { filestats } from './filestats';
import { niceSize } from './niceSize';
import { asNumber, asOptDictionary, asOptDictionaryOf, asOptString, assertDefined, isArray, isDefined, isDictionary, isDictionaryOf, makeNumber, } from 'ts-runtime-typecheck';
import { first } from 'rxjs/operators';
import path from 'path';
import fs from 'fs';
export function instantiateFileUpload(instance) {

@@ -17,3 +18,3 @@ assertDefined(instance.config.workflow, 'Workflow');

const workflow = instance.config.workflow;
const settings = readSettings(workflow);
const settings = readSettings(instance.config);
const hasStorageAccount = 'storage_account' in workflow;

@@ -25,11 +26,5 @@ if (settings.requiresStorage && !hasStorageAccount) {

const { inputFolders, outputFolder, filetype: filetypes } = instance.config.options;
const scanner = createFileScanner({
database,
inputFolders,
filetypes,
outputFolder,
});
const warnings = instance.states.warnings;
const state = instance.states.upload;
const ctx = {
const { warnings, upload: state } = instance.states;
const stopped$ = instance.uploadStopped$;
const context = {
settings,

@@ -41,29 +36,35 @@ warnings,

logger,
stopped$,
};
const { add: queueFile, empty$ } = createQueue({}, async (file) => processFile(ctx, file));
const scanner = createFileScanner({
database,
inputFolders,
filetypes,
outputFolder,
context,
});
let running = true;
stopped$.subscribe(() => {
running = false;
});
const { add: queueFile, empty$ } = createQueue({}, async (file) => {
if (running) {
await processFile(context, file);
}
});
const uploadInterval = instance.config.options.fileCheckInterval * 1000;
const queueEmpty = () => empty$.pipe(first()).toPromise();
const stopped$ = instance.uploadStopped$;
return async () => {
let running = true;
stopped$.subscribe(() => {
running = false;
});
while (running) {
const startTime = Date.now();
try {
const files = await scanner();
// NOTE queueEmpty will never resolve if nothing is queued
if (files.length > 0) {
for (const file of files) {
queueFile(file);
}
// NOTE errors that occur in the queue are swallowed
await queueEmpty();
// NOTE scanner errors are handled inside createFileScanner
const files = await scanner();
// NOTE queueEmpty will never resolve if nothing is queued
if (files.length > 0) {
for (const file of files) {
queueFile(file);
}
// NOTE errors that occur in the queue are swallowed
await queueEmpty();
}
catch (err) {
// NOTE err is only ever from the scanner
// TODO add warning about FS error
}
const deltaTime = Date.now() - startTime;

@@ -75,3 +76,3 @@ const delay = Math.max(0, uploadInterval - deltaTime);

}
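The rewritten upload loop above latches a local running flag from the instance's uploadStopped$ observable and only processes queued files while it is set. A standalone sketch of that pattern, using an RxJS Subject to stand in for uploadStopped$ and placeholder callbacks for the scanner and queue helpers shown above:

import { Subject } from 'rxjs';

const stopped$ = new Subject(); // stand-in for instance.uploadStopped$
let running = true;
stopped$.subscribe(() => {
  running = false; // once stopped, the loop exits and queued files are skipped
});

async function uploadLoop(scanner, queueFile, queueEmpty, intervalMs) {
  while (running) {
    const startTime = Date.now();
    const files = await scanner(); // scanner errors are handled inside createFileScanner
    if (files.length > 0) {
      for (const file of files) {
        queueFile(file);
      }
      await queueEmpty(); // never resolves if nothing was queued
    }
    // keep roughly fileCheckInterval between scans, accounting for time already spent
    const delay = Math.max(0, intervalMs - (Date.now() - startTime));
    await new Promise((resolve) => setTimeout(resolve, delay));
  }
}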
export function createFileScanner({ database, inputFolders, outputFolder, filetypes, }) {
export function createFileScanner({ database, inputFolders, outputFolder, filetypes, context, }) {
const filter = async (location) => {

@@ -81,2 +82,5 @@ const exists = await database.seenUpload(location);

};
const errorHandler = (err) => {
addWarning(context, UploadWarnings.SCAN_FAIL, err + '');
};
const options = {

@@ -87,7 +91,11 @@ inputFolders,

filter,
errorHandler,
};
return () => loadInputFiles(options);
}
export function readSettings(workflow) {
export function readSettings({ options, instance: workflowInstance, workflow, }) {
var _a, _b;
assertDefined(workflow);
assertDefined(workflowInstance.bucket, 'workflowInstance.bucket');
assertDefined(workflowInstance.bucketFolder, 'workflowInstance.bucketFolder');
const settings = {

@@ -97,2 +105,6 @@ maxFiles: Infinity,

requiresStorage: false,
bucket: workflowInstance.bucket,
bucketFolder: workflowInstance.bucketFolder,
sseKeyId: workflowInstance.key_id,
retries: options.uploadRetries,
};
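readSettings now takes the whole config and copies the S3 destination and retry budget into the upload settings instead of deriving them later. A rough sketch of the result; the config shape mirrors what the hunk above destructures, and all values are illustrative:

const settings = readSettings({
  options: { uploadRetries: 10 },
  instance: { bucket: 'example-bucket', bucketFolder: 'data/1234', key_id: 'example-kms-key' },
  workflow: { /* workflow / workflow_attributes, not shown here */ },
});
// settings.bucket === 'example-bucket'
// settings.bucketFolder === 'data/1234'
// settings.sseKeyId === 'example-kms-key'
// settings.retries === 10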

@@ -146,7 +158,7 @@ const workflowAttributes = asOptDictionary(workflow.workflowAttributes || workflow.workflow_attributes);

if (settings.maxFiles <= state.filesCount) {
await skipFile(file, ctx, UploadWarnings.TOO_MANY);
await skipFile(file, ctx, FileUploadWarnings.TOO_MANY);
return;
}
if (file.size === 0) {
await skipFile(file, ctx, UploadWarnings.EMPTY);
await skipFile(file, ctx, FileUploadWarnings.EMPTY);
return;

@@ -159,3 +171,3 @@ }

if (canSplit && shouldSplit && isDefined(split)) {
addWarning(file, ctx, UploadWarnings.SPLIT);
addFileWarning(file, ctx, FileUploadWarnings.SPLIT);
const isCompressed = /\.gz$/i.test(file.path);

@@ -170,4 +182,2 @@ const directory = path.dirname(file.relative);

const id = `${file.id}_${chunkId}`;
const stats = await filestats(chunkFile);
const { bytes: size } = stats;
const chunkFilestat = {

@@ -178,7 +188,5 @@ name,

id,
size,
size: 0,
};
// TODO this should retry if it fails... but only for some reasons
// await uploadFile(chunkFilestat, stats, ctx);
await ctx.instance.uploadJob(Object.assign(Object.assign({}, chunkFilestat), { stats }));
await uploadJob(ctx, chunkFilestat);
await database.splitDone(chunkFile);

@@ -193,16 +201,24 @@ }, isCompressed);

if (file.size > settings.maxFileSize) {
await skipFile(file, ctx, UploadWarnings.TOO_BIG);
await skipFile(file, ctx, FileUploadWarnings.TOO_BIG);
return;
}
const stats = await filestats(file.path);
// TODO this should retry if it fails... but only for some reasons
// await uploadFile(file, stats, ctx);
await ctx.instance.uploadJob(Object.assign(Object.assign({}, file), { stats }));
await uploadJob(ctx, file);
state.filesCount += 1;
}
export async function skipFile(file, ctx, warn) {
addWarning(file, ctx, warn);
addFileWarning(file, ctx, warn);
await ctx.database.skipFile(file.path);
}
export function addWarning(file, ctx, warn) {
export function addWarning(ctx, warn, msg) {
const { logger, warnings } = ctx;
let type;
switch (warn) {
case UploadWarnings.SCAN_FAIL:
type = 'WARNING_SCAN_FAIL';
break;
}
logger.error(msg);
warnings.push({ msg, type });
}
export function addFileWarning(file, ctx, warn) {
var _a, _b;

@@ -213,21 +229,258 @@ const { settings, logger, warnings } = ctx;

switch (warn) {
case UploadWarnings.EMPTY:
case FileUploadWarnings.EMPTY:
type = 'WARNING_FILE_EMPTY';
msg = `The file ${file.relative} is empty. It will be skipped.`;
break;
case UploadWarnings.SPLIT:
case FileUploadWarnings.SPLIT:
type = 'WARNING_FILE_SPLIT';
msg = `${file.relative}${file.size > splitSize ? ' is too big and' : ''} is going to be split`;
break;
case UploadWarnings.TOO_BIG:
case FileUploadWarnings.TOO_BIG:
type = 'WARNING_FILE_TOO_BIG';
msg = `The file ${file.relative} is bigger than the maximum size limit (${niceSize(settings.maxFileSize)}B). It will be skipped.`;
break;
case UploadWarnings.TOO_MANY:
case FileUploadWarnings.TOO_MANY:
type = 'WARNING_FILE_TOO_MANY';
msg = `Maximum ${settings.maxFiles} file(s) already uploaded. Marking ${file.relative} as skipped.`;
break;
case FileUploadWarnings.UPLOAD_FAILED:
type = 'WARNING_FILE_UPLOAD_FAILED';
msg = `Uploading ${file.relative} failed.`;
break;
case FileUploadWarnings.UPLOAD_RETRIES_EXCEEDED:
type = 'WARNING_FILE_UPLOAD_RETRIES_EXCEEDED';
msg = `Exceeded maximum retries uploading ${file.relative}. This file will not be uploaded.`;
break;
case FileUploadWarnings.MESSAGE_RETRIES_EXCEEDED:
type = 'WARNING_FILE_UPLOAD_MESSAGE_RETRIES_EXCEEDED';
msg = `Exceeded maximum retries adding ${file.relative} to the queue. This file will not be processed.`;
break;
}
logger.error(msg);
logger.warn(msg);
warnings.push({ msg, type });
}
export async function uploadJob(ctx, file) {
await uploadFile(file, await filestats(file.path), ctx);
}
export function addFailure(state, msg) {
var _a;
if (!state.failure) {
state.failure = {};
}
state.failure[msg] = ((_a = state.failure[msg]) !== null && _a !== void 0 ? _a : 0) + 1;
}
export function openReadStream(location, handler) {
const rs = fs.createReadStream(location);
return new Promise((resolve, reject) => {
rs.addListener('open', async () => {
try {
await handler(rs);
resolve();
}
catch (err) {
rs.close(); // ensure the stream is closed if we have an error in the handler
reject(err);
}
});
rs.addListener('error', (err) => reject(`Upload filesystem error ${err}`));
});
}
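openReadStream only hands the stream to the caller once the 'open' event has fired, and closes it if the handler throws so the descriptor is not leaked. A small usage sketch; the path and the downstream consumer are placeholders:

await openReadStream('/data/reads/sample.fastq', async (rs) => {
  // rs is an open fs.ReadStream; a throw here closes it and rejects the outer promise
  await consumeStream(rs); // hypothetical consumer, e.g. an S3 upload body
});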
export function constructUploadParameters(ctx, file, rs) {
const { settings: { bucket, sseKeyId, bucketFolder }, } = ctx;
const mangledRelative = file.relative
.replace(/^[\\/]+/, '')
.replace(/\\/g, '/')
.replace(/\//g, '_'); // MC-7204, MC-7206 - this needs to be unpicked in future
const key = [
bucketFolder,
'component-0',
mangledRelative,
mangledRelative,
]
.join('/')
.replace(/\/+/g, '/');
const params = {
Bucket: bucket,
Key: key,
Body: rs,
};
if (sseKeyId) {
// MC-4996 support (optional, for now) encryption
params.SSEKMSKeyId = sseKeyId;
params.ServerSideEncryption = 'aws:kms';
}
if (file.size) {
params.ContentLength = file.size;
}
return params;
}
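A worked example of the key mangling above (MC-7204/MC-7206): leading slashes are stripped, backslashes are normalised to forward slashes, and the remaining separators are flattened to underscores before the name is repeated under component-0. The input values are illustrative:

// file.relative = '\\run1\\pass\\sample.fastq', settings.bucketFolder = 'data/1234'
// mangledRelative -> 'run1_pass_sample.fastq'
// params.Key      -> 'data/1234/component-0/run1_pass_sample.fastq/run1_pass_sample.fastq'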
export async function uploadFile(file, stats, ctx) {
const { state, instance, logger, stopped$ } = ctx;
try {
const timeout = (instance.config.options.uploadTimeout + 5) * 1000;
const s3 = await instance.sessionedS3({
retryDelayOptions: {
customBackoff(count, err) {
addFileWarning(file, ctx, FileUploadWarnings.UPLOAD_FAILED);
ctx.logger.error('Upload error', err);
if (count > ctx.settings.retries) {
addFileWarning(file, ctx, FileUploadWarnings.UPLOAD_RETRIES_EXCEEDED);
return -1;
}
return 2 ** count * 1000; // 2s, 4s, 8s, 16s, 32s
},
},
maxRetries: ctx.settings.retries,
httpOptions: {
timeout,
},
});
await openReadStream(file.path, async (rs) => {
var _a;
const params = constructUploadParameters(ctx, file, rs);
const options = {
partSize: 10 * 1024 * 1024,
queueSize: 1,
};
instance.uploadState('progress', 'incr', {
total: file.size,
});
const managedUpload = s3.upload(params, options);
const subscription = stopped$.subscribe(() => {
managedUpload.abort();
});
const sessionManager = instance.initSessionManager([s3]);
sessionManager.sts_expiration = (_a = instance.sessionManager) === null || _a === void 0 ? void 0 : _a.sts_expiration; // No special options here, so use the main session and don't refetch until it's expired
let currentProgress = 0;
managedUpload.on('httpUploadProgress', async (progress) => {
const progressDelta = progress.loaded - currentProgress;
instance.uploadState('progress', 'incr', {
bytes: progressDelta,
}); // delta since last time
currentProgress = progress.loaded; // store for calculating delta next iteration
try {
await sessionManager.session(); // MC-7129 force refresh token on the MANAGED UPLOAD instance of the s3 service
}
catch (e) {
logger.warn(`Error refreshing token: ${String(e)}`);
}
});
try {
await managedUpload.promise();
await uploadComplete(ctx, params.Key, file); // send message
}
finally {
instance.uploadState('progress', 'decr', {
total: file.size,
bytes: currentProgress,
}); // zero in-flight upload counters
subscription.unsubscribe();
}
});
const { bytes = 0, reads = 0, sequences = 0 } = stats !== null && stats !== void 0 ? stats : {};
instance.uploadState('success', 'incr', { files: 1, bytes, reads, sequences });
const ext = path.extname(file.name);
instance.uploadState('types', 'incr', {
[ext]: 1,
});
}
catch (err) {
addFailure(state, err + '');
throw err;
}
}
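The retry behaviour wired into sessionedS3 above uses the AWS SDK v2 retryDelayOptions.customBackoff hook: every failed attempt raises an UPLOAD_FAILED warning, and returning a negative delay stops retrying once the count exceeds settings.retries. A standalone sketch of the delay schedule, assuming the same formula:

function backoffDelay(count, maxRetries) {
  if (count > maxRetries) {
    return -1; // the AWS SDK treats a negative delay as "give up"
  }
  return 2 ** count * 1000; // 2s, 4s, 8s, 16s, 32s, ...
}

// With the default uploadRetries of 10, the first five retries wait:
console.log([1, 2, 3, 4, 5].map((n) => backoffDelay(n, 10))); // [2000, 4000, 8000, 16000, 32000]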
function createMessage(instance, objectId) {
const workflowInstance = instance.config.instance;
const message = {
bucket: workflowInstance.bucket,
outputQueue: workflowInstance.outputQueueName,
remote_addr: workflowInstance.remote_addr,
apikey: instance.config.options.apikey,
id_workflow_instance: workflowInstance.id_workflow_instance,
id_master: workflowInstance.id_workflow,
utc: new Date().toISOString(),
path: objectId,
prefix: objectId.substring(0, objectId.lastIndexOf('/')),
key_id: workflowInstance.key_id,
};
if (workflowInstance.chain) {
let components;
if (!isDictionaryOf(isDictionary)(workflowInstance.chain.components)) {
throw new Error('Unexpected chain definition');
}
try {
components = JSON.parse(JSON.stringify(workflowInstance.chain.components)); // low-frills object clone
}
catch (_a) {
throw new Error(`Failed to clone workflow chain`);
}
for (const component of Object.values(components)) {
switch (component === null || component === void 0 ? void 0 : component.inputQueueName) {
case 'uploadMessageQueue':
component.inputQueueName = instance.uploadMessageQueue;
break;
case 'downloadMessageQueue':
component.inputQueueName = instance.downloadMessageQueue;
break;
default:
// NOTE should this be a NOOP or an error
break;
}
}
message.components = components;
message.targetComponentId = workflowInstance.chain.targetComponentId;
}
return message;
}
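createMessage clones the workflow chain and rewrites the symbolic queue names onto this instance's real queues before the message is sent. A hedged sketch of the remapping, with placeholder component ids:

// Before (workflowInstance.chain.components, illustrative):
//   { '1': { inputQueueName: 'uploadMessageQueue' }, '2': { inputQueueName: 'downloadMessageQueue' } }
// After createMessage, message.components becomes:
//   { '1': { inputQueueName: instance.uploadMessageQueue },
//     '2': { inputQueueName: instance.downloadMessageQueue } }
// Any other inputQueueName is left untouched (see the NOTE in the default branch above).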
async function messageInputQueue(ctx, objectId, file) {
const { instance, logger } = ctx;
const message = createMessage(instance, objectId);
try {
const inputQueueURL = await instance.discoverQueue(asOptString(instance.config.instance.inputQueueName));
const sqs = await instance.sessionedSQS({
retryDelayOptions: {
customBackoff(count, err) {
ctx.logger.error('Upload message error', err);
if (count > ctx.settings.retries) {
addFileWarning(file, ctx, FileUploadWarnings.MESSAGE_RETRIES_EXCEEDED);
return -1;
}
return 2 ** count * 1000; // 2s, 4s, 8s, 16s, 32s
},
},
maxRetries: ctx.settings.retries,
});
logger.info(`${file.id} sending SQS message to input queue`);
const { MessageId } = await sqs
.sendMessage({
QueueUrl: inputQueueURL,
MessageBody: JSON.stringify(message),
})
.promise();
return MessageId;
}
catch (sendMessageException) {
logger.error(`${file.id} exception sending SQS message: ${String(sendMessageException)}`);
throw sendMessageException;
}
}
async function uploadComplete(ctx, objectId, file) {
const { instance, database, logger } = ctx;
logger.info(`${file.id} uploaded to S3: ${objectId}`);
const messageId = await messageInputQueue(ctx, objectId, file);
const workflowInstance = instance.config.instance;
instance
.realtimeFeedback(`workflow_instance:state`, {
type: 'start',
id_workflow_instance: workflowInstance.id_workflow_instance,
id_workflow: workflowInstance.id_workflow,
component_id: '0',
message_id: messageId,
id_user: workflowInstance.id_user,
})
.catch((e) => {
logger.warn(`realtimeFeedback failed: ${String(e)}`);
});
logger.info(`${file.id} SQS message sent. Mark as uploaded`);
await database.uploadFile(file.path);
}

@@ -0,7 +1,14 @@

export var FileUploadWarnings;
(function (FileUploadWarnings) {
FileUploadWarnings[FileUploadWarnings["TOO_MANY"] = 0] = "TOO_MANY";
FileUploadWarnings[FileUploadWarnings["EMPTY"] = 1] = "EMPTY";
FileUploadWarnings[FileUploadWarnings["TOO_BIG"] = 2] = "TOO_BIG";
FileUploadWarnings[FileUploadWarnings["SPLIT"] = 3] = "SPLIT";
FileUploadWarnings[FileUploadWarnings["UPLOAD_FAILED"] = 4] = "UPLOAD_FAILED";
FileUploadWarnings[FileUploadWarnings["UPLOAD_RETRIES_EXCEEDED"] = 5] = "UPLOAD_RETRIES_EXCEEDED";
FileUploadWarnings[FileUploadWarnings["MESSAGE_RETRIES_EXCEEDED"] = 6] = "MESSAGE_RETRIES_EXCEEDED";
})(FileUploadWarnings || (FileUploadWarnings = {}));
export var UploadWarnings;
(function (UploadWarnings) {
UploadWarnings[UploadWarnings["TOO_MANY"] = 0] = "TOO_MANY";
UploadWarnings[UploadWarnings["EMPTY"] = 1] = "EMPTY";
UploadWarnings[UploadWarnings["TOO_BIG"] = 2] = "TOO_BIG";
UploadWarnings[UploadWarnings["SPLIT"] = 3] = "SPLIT";
UploadWarnings[UploadWarnings["SCAN_FAIL"] = 0] = "SCAN_FAIL";
})(UploadWarnings || (UploadWarnings = {}));
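File-level and scanner-level warnings are now separate enums, matching the two entry points earlier in this diff. A short usage sketch; the error string passed to addWarning is a placeholder:

// Scanner-level warning (no file attached):
addWarning(context, UploadWarnings.SCAN_FAIL, 'Input scan failed: example error');

// File-level warning (the message is derived from the enum member and the file):
addFileWarning(file, context, FileUploadWarnings.TOO_BIG);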

@@ -30,3 +30,3 @@ import { createInspector } from 'fs-inspect';

*/
export async function loadInputFiles({ inputFolders, outputFolder, filetypes, filter, }) {
export async function loadInputFiles({ inputFolders, outputFolder, filetypes, filter, errorHandler, }) {
const extensionFilter = createExtensionFilter(filetypes !== null && filetypes !== void 0 ? filetypes : '');

@@ -54,2 +54,3 @@ const outputBasename = outputFolder ? basename(outputFolder) : null;

},
catch: errorHandler,
});

@@ -56,0 +57,0 @@ const results = [];

@@ -1,2 +0,2 @@

const niceSize = (sizeIn, unitIndexIn) => {
export function niceSize(sizeIn, unitIndexIn) {
const UNITS = ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z'];

@@ -15,3 +15,2 @@ const DIV = 1000;

return `${size.toFixed(1)}${UNITS[unitIndex]}`;
};
export default niceSize;
}
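niceSize is now a named export; it steps through the unit list dividing by 1000 and prints one decimal place. A couple of illustrative calls (outputs assumed from that logic):

import { niceSize } from './niceSize';

niceSize(2500000);    // '2.5M' (2 500 000 / 1000 / 1000)
niceSize(1200000000); // '1.2G'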
import ProxyAgent from 'proxy-agent';
import { asDictionary, isIndex, makeNumber } from 'ts-runtime-typecheck';
import { isIndex, makeNumber, asDefined, asOptString, isDefined } from 'ts-runtime-typecheck';
export default class SessionManager {

@@ -27,3 +27,3 @@ constructor(idWorkflowInstance, REST, children, opts, graphQL) {

const result = await this.graphQL.instanceToken(instanceTokenOptions);
token = asDictionary((_a = result.data) === null || _a === void 0 ? void 0 : _a.token);
token = asDefined((_a = result.data) === null || _a === void 0 ? void 0 : _a.token);
}

@@ -38,4 +38,17 @@ else {

this.sts_expiration = new Date(token.expiration).getTime() - 60 * makeNumber((_b = this.options.sessionGrace) !== null && _b !== void 0 ? _b : '0'); // refresh token x mins before it expires
const configUpdate = token;
let credentials = null;
if (isDefined(token.accessKeyId) && isDefined(token.secretAccessKey)) {
credentials = {
accessKeyId: token.accessKeyId,
secretAccessKey: token.secretAccessKey,
sessionToken: asOptString(token.sessionToken),
};
}
const configUpdate = {
credentials,
region: asOptString(token.region),
};
if (this.options.proxy) {
// NOTE AWS SDK explicitly does a deep merge on httpOptions
// so this won't squash any options that have already been set
configUpdate.httpOptions = {

@@ -42,0 +55,0 @@ agent: ProxyAgent(this.options.proxy),
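The session manager now builds an explicit AWS credentials object from the instance token instead of passing the raw token through as the config update. A hedged sketch of the shape that gets applied, with placeholder token fields:

// token as returned by instanceToken (values illustrative)
const token = {
  accessKeyId: 'EXAMPLEKEYID',
  secretAccessKey: 'example-secret',
  sessionToken: 'example-session-token',
  region: 'eu-west-1',
  expiration: '2021-01-01T00:00:00Z',
};

const configUpdate = {
  credentials: {
    accessKeyId: token.accessKeyId,
    secretAccessKey: token.secretAccessKey,
    sessionToken: token.sessionToken,
  },
  region: token.region,
};
// configUpdate (plus httpOptions.agent when a proxy is configured) is then applied
// to the AWS SDK config used by this instance's S3/SQS services.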

@@ -1,4 +0,3 @@

import { Subject } from 'rxjs';
/*
NOTE this exists mostly because TS is having a really hard time to decide
NOTE this partially exists because TS is having a really hard time to decide
which version the timer functions we are using (the return type of Node.js

@@ -32,17 +31,2 @@ timers is an object not a number) and in some cases appears to be using

}
export function createTimeout$(duration) {
const timeout$ = new Subject();
const cb = () => timeout$.complete();
let id = setTimeout(cb, duration);
return {
cancel() {
clearTimeout(id);
},
timeout$,
reset(newDuration = duration) {
clearTimeout(id);
id = setTimeout(cb, newDuration);
},
};
}
export function sleep(duration) {

@@ -49,0 +33,0 @@ return new Promise((resolve) => {

@@ -12,3 +12,3 @@ {

"private": false,
"version": "5.0.5200121",
"version": "5.0.5353744",
"main": "cjs/index-web.js",

@@ -15,0 +15,0 @@ "module": "esm/index-web.js",

@@ -13,2 +13,3 @@ import type { Logger } from './Logger';

uploadTimeout: number;
uploadRetries: number;
downloadTimeout: number;

@@ -15,0 +16,0 @@ fileCheckInterval: number;

