@metrichor/epi2me-web
Advanced tools
Comparing version 5.1.6028772 to 5.1.6058802
@@ -8,2 +8,3 @@ "use strict"; | ||
const ts_runtime_typecheck_1 = require("ts-runtime-typecheck"); | ||
const ts_runtime_typecheck_2 = require("ts-runtime-typecheck"); | ||
const aws_sdk_1 = __importDefault(require("aws-sdk")); | ||
@@ -13,2 +14,3 @@ const fs_extra_1 = __importDefault(require("fs-extra")); /* MC-565 handle EMFILE & EXDIR gracefully; use Promises */ | ||
const path_1 = __importDefault(require("path")); | ||
const db_1 = require("./db"); | ||
const epi2me_1 = require("./epi2me"); | ||
@@ -44,14 +46,9 @@ const factory_1 = require("./factory"); | ||
}; | ||
const asChain = (0, ts_runtime_typecheck_1.asOptStruct)({ | ||
components: (0, ts_runtime_typecheck_1.isDictionaryOf)(ts_runtime_typecheck_1.isDictionary), | ||
targetComponentId: ts_runtime_typecheck_1.isDefined, | ||
}); | ||
class EPI2ME_FS extends epi2me_1.EPI2ME { | ||
constructor(opts) { | ||
super(opts); // sets up this.config & this.log | ||
this.SampleReader = new sample_reader_1.SampleReader(); | ||
this.fetchToken = async () => { | ||
var _a; | ||
let token; | ||
(0, ts_runtime_typecheck_1.assertDefined)(this.config.instance.id_workflow_instance); | ||
(0, ts_runtime_typecheck_2.assertDefined)(this.config.instance.id_workflow_instance); | ||
if (this.config.options.useGraphQL) { | ||
@@ -62,3 +59,3 @@ const instanceTokenOptions = { | ||
const result = await this.graphQL.instanceToken(instanceTokenOptions); | ||
token = (0, ts_runtime_typecheck_1.asDefined)((_a = result.data) === null || _a === void 0 ? void 0 : _a.token); | ||
token = (0, ts_runtime_typecheck_2.asDefined)((_a = result.data) === null || _a === void 0 ? void 0 : _a.token); | ||
} | ||
@@ -74,2 +71,3 @@ else { | ||
this.graphQL = new graphql_fs_1.GraphQLFS(this.config.options); | ||
this.SampleReader = new sample_reader_1.SampleReader(); | ||
} | ||
@@ -99,3 +97,3 @@ sessionedS3(options = {}) { | ||
try { | ||
const queueURL = await this.discoverQueue((0, ts_runtime_typecheck_1.asOptString)(this.config.instance.outputQueueName)); | ||
const queueURL = await this.discoverQueue((0, ts_runtime_typecheck_2.asOptString)(this.config.instance.outputQueueName)); | ||
const sqs = this.getSQSSessionedService(); | ||
@@ -105,3 +103,3 @@ return sqs | ||
QueueUrl: queueURL, | ||
ReceiptHandle: (0, ts_runtime_typecheck_1.asString)(message.ReceiptHandle), | ||
ReceiptHandle: (0, ts_runtime_typecheck_2.asString)(message.ReceiptHandle), | ||
}) | ||
@@ -116,3 +114,3 @@ .promise(); | ||
if (this.config.instance.discoverQueueCache[queueName]) { | ||
return (0, ts_runtime_typecheck_1.asString)(this.config.instance.discoverQueueCache[queueName]); | ||
return (0, ts_runtime_typecheck_2.asString)(this.config.instance.discoverQueueCache[queueName]); | ||
} | ||
@@ -132,3 +130,3 @@ this.log.debug(`discovering queue for ${queueName}`); | ||
} | ||
const queueURL = (0, ts_runtime_typecheck_1.asString)(getQueue.QueueUrl); | ||
const queueURL = (0, ts_runtime_typecheck_2.asString)(getQueue.QueueUrl); | ||
this.log.debug(`found queue ${queueURL}`); | ||
@@ -191,3 +189,3 @@ this.config.instance.discoverQueueCache[queueName] = queueURL; | ||
const response = await this.graphQL.startWorkflow({ variables }); | ||
instance = (0, ts_runtime_typecheck_1.asDefined)((_a = response.data) === null || _a === void 0 ? void 0 : _a.startData); | ||
instance = (0, ts_runtime_typecheck_2.asDefined)((_a = response.data) === null || _a === void 0 ? void 0 : _a.startData); | ||
} | ||
@@ -229,28 +227,26 @@ catch (err) { | ||
} | ||
validateChain(chain) { | ||
return asChain((0, ts_runtime_typecheck_1.isString)(chain) ? JSON.parse(chain) : chain); | ||
} | ||
setClassConfigGQL(startData) { | ||
(0, ts_runtime_typecheck_1.assertDefined)(startData, 'Workflow Start Data'); | ||
(0, ts_runtime_typecheck_2.assertDefined)(startData, 'Workflow Start Data'); | ||
const { instance, bucket, idUser, remoteAddr } = startData; | ||
(0, ts_runtime_typecheck_1.assertDefined)(instance, 'Workflow Instance'); | ||
(0, ts_runtime_typecheck_2.assertDefined)(instance, 'Workflow Instance'); | ||
const { workflowImage, outputqueue, keyId, startDate, idWorkflowInstance, mappedTelemetry, telemetryNames } = instance; | ||
const chain = (0, ts_runtime_typecheck_2.isString)(instance.chain) ? (0, ts_runtime_typecheck_2.asString)(instance.chain) : (0, ts_runtime_typecheck_2.asDictionary)(instance.chain); | ||
const regionName = workflowImage.region.name; | ||
const idWorkflow = (0, ts_runtime_typecheck_1.asOptIndex)(workflowImage.workflow.idWorkflow); | ||
const idWorkflow = (0, ts_runtime_typecheck_2.asOptIndex)(workflowImage.workflow.idWorkflow); | ||
const inputqueue = workflowImage.inputqueue; | ||
const map = { | ||
bucket: (0, ts_runtime_typecheck_1.asOptString)(bucket), | ||
id_user: (0, ts_runtime_typecheck_1.asOptIndex)(idUser), | ||
remote_addr: (0, ts_runtime_typecheck_1.asOptString)(remoteAddr), | ||
id_workflow_instance: (0, ts_runtime_typecheck_1.asOptIndex)(idWorkflowInstance), | ||
key_id: (0, ts_runtime_typecheck_1.asOptString)(keyId), | ||
start_date: (0, ts_runtime_typecheck_1.asOptString)(startDate), | ||
outputQueueName: (0, ts_runtime_typecheck_1.asOptString)(outputqueue), | ||
summaryTelemetry: (0, ts_runtime_typecheck_1.asOptDictionary)(mappedTelemetry), | ||
telemetryNames: (0, ts_runtime_typecheck_1.asOptDictionaryOf)((0, ts_runtime_typecheck_1.isDictionaryOf)(ts_runtime_typecheck_1.isString))(telemetryNames), | ||
inputQueueName: (0, ts_runtime_typecheck_1.asOptString)(inputqueue), | ||
bucket: (0, ts_runtime_typecheck_2.asOptString)(bucket), | ||
id_user: (0, ts_runtime_typecheck_2.asOptIndex)(idUser), | ||
remote_addr: (0, ts_runtime_typecheck_2.asOptString)(remoteAddr), | ||
id_workflow_instance: (0, ts_runtime_typecheck_2.asOptIndex)(idWorkflowInstance), | ||
key_id: (0, ts_runtime_typecheck_2.asOptString)(keyId), | ||
start_date: (0, ts_runtime_typecheck_2.asOptString)(startDate), | ||
outputQueueName: (0, ts_runtime_typecheck_2.asOptString)(outputqueue), | ||
summaryTelemetry: (0, ts_runtime_typecheck_2.asOptDictionary)(mappedTelemetry), | ||
telemetryNames: (0, ts_runtime_typecheck_2.asOptDictionaryOf)((0, ts_runtime_typecheck_2.isDictionaryOf)(ts_runtime_typecheck_2.isString))(telemetryNames), | ||
inputQueueName: (0, ts_runtime_typecheck_2.asOptString)(inputqueue), | ||
id_workflow: idWorkflow, | ||
region: regionName, | ||
bucketFolder: `${outputqueue}/${idUser}/${idWorkflowInstance}`, | ||
chain: this.validateChain(instance.chain), | ||
chain: utils_fs_1.utilsFS.convertResponseToObject(chain), | ||
}; | ||
@@ -261,16 +257,19 @@ this.config.instance = Object.assign(Object.assign({}, this.config.instance), map); | ||
const conf = this.config.instance; | ||
conf.id_workflow_instance = (0, ts_runtime_typecheck_1.asOptIndex)(instance.id_workflow_instance); | ||
conf.id_workflow = (0, ts_runtime_typecheck_1.asOptIndex)(instance.id_workflow); | ||
conf.remote_addr = (0, ts_runtime_typecheck_1.asOptString)(instance.remote_addr); | ||
conf.key_id = (0, ts_runtime_typecheck_1.asOptString)(instance.key_id); | ||
conf.bucket = (0, ts_runtime_typecheck_1.asOptString)(instance.bucket); | ||
conf.start_date = (0, ts_runtime_typecheck_1.asOptString)(instance.start_date); | ||
conf.id_user = (0, ts_runtime_typecheck_1.asOptIndex)(instance.id_user); | ||
conf.id_workflow_instance = (0, ts_runtime_typecheck_2.asOptIndex)(instance.id_workflow_instance); | ||
conf.id_workflow = (0, ts_runtime_typecheck_2.asOptIndex)(instance.id_workflow); | ||
conf.remote_addr = (0, ts_runtime_typecheck_2.asOptString)(instance.remote_addr); | ||
conf.key_id = (0, ts_runtime_typecheck_2.asOptString)(instance.key_id); | ||
conf.bucket = (0, ts_runtime_typecheck_2.asOptString)(instance.bucket); | ||
conf.start_date = (0, ts_runtime_typecheck_2.asOptString)(instance.start_date); | ||
conf.id_user = (0, ts_runtime_typecheck_2.asOptIndex)(instance.id_user); | ||
// copy tuples with different names / structures | ||
conf.inputQueueName = (0, ts_runtime_typecheck_1.asOptString)(instance.inputqueue); | ||
conf.outputQueueName = (0, ts_runtime_typecheck_1.asOptString)(instance.outputqueue); | ||
conf.region = (0, ts_runtime_typecheck_1.asString)(instance.region, this.config.options.region); | ||
conf.inputQueueName = (0, ts_runtime_typecheck_2.asOptString)(instance.inputqueue); | ||
conf.outputQueueName = (0, ts_runtime_typecheck_2.asOptString)(instance.outputqueue); | ||
conf.region = (0, ts_runtime_typecheck_2.asString)(instance.region, this.config.options.region); | ||
conf.bucketFolder = `${instance.outputqueue}/${instance.id_user}/${instance.id_workflow_instance}`; | ||
conf.summaryTelemetry = (0, ts_runtime_typecheck_1.asOptDictionary)(instance.telemetry); // MC-7056 for fetchTelemetry (summary) telemetry periodically | ||
conf.chain = this.validateChain(instance.chain); | ||
conf.summaryTelemetry = (0, ts_runtime_typecheck_2.asOptDictionary)(instance.telemetry); // MC-7056 for fetchTelemetry (summary) telemetry periodically | ||
// WARN this assumes chain is an object, but could it be a string? | ||
if (instance.chain) { | ||
conf.chain = utils_fs_1.utilsFS.convertResponseToObject((0, ts_runtime_typecheck_2.asDictionary)(instance.chain)); | ||
} | ||
} | ||
@@ -335,3 +334,3 @@ // WARN | ||
const instancesDir = path_1.default.join(rootDir(), 'instances'); | ||
const thisInstanceDir = path_1.default.join(instancesDir, (0, ts_runtime_typecheck_1.makeString)(this.config.instance.id_workflow_instance)); | ||
const thisInstanceDir = path_1.default.join(instancesDir, (0, ts_runtime_typecheck_2.makeString)(this.config.instance.id_workflow_instance)); | ||
try { | ||
@@ -345,2 +344,10 @@ await fs_extra_1.default.mkdirp(thisInstanceDir); | ||
} | ||
// NOTE don't need the database if we're running from a dataset | ||
if (inputFolders) { | ||
// set up new tracking database | ||
this.db = new db_1.DB(thisInstanceDir, { | ||
idWorkflowInstance: (0, ts_runtime_typecheck_2.makeString)(this.config.instance.id_workflow_instance), | ||
inputFolders, | ||
}, this.log); | ||
} | ||
// MC-1828 - include instance id in telemetry file name | ||
@@ -419,6 +426,6 @@ const fileName = this.config.instance.id_workflow_instance | ||
}); | ||
instanceObj = (0, ts_runtime_typecheck_1.asDefined)(response.data).workflowInstance; | ||
instanceObj = (0, ts_runtime_typecheck_2.asDefined)(response.data).workflowInstance; | ||
} | ||
else { | ||
instanceObj = await this.REST.workflowInstance((0, ts_runtime_typecheck_1.asDefined)(this.config.instance.id_workflow_instance)); | ||
instanceObj = await this.REST.workflowInstance((0, ts_runtime_typecheck_2.asDefined)(this.config.instance.id_workflow_instance)); | ||
} | ||
@@ -429,3 +436,3 @@ if (instanceObj.state === 'stopped') { | ||
await this.stopEverything(); | ||
const remoteShutdownCb = (0, ts_runtime_typecheck_1.asOptFunction)(this.config.options.remoteShutdownCb); | ||
const remoteShutdownCb = (0, ts_runtime_typecheck_2.asOptFunction)(this.config.options.remoteShutdownCb); | ||
if (remoteShutdownCb) { | ||
@@ -454,2 +461,18 @@ remoteShutdownCb(`instance was stopped remotely at ${instanceObj.stop_date}`); | ||
} | ||
async stopUpload() { | ||
super.stopUpload(); | ||
this.log.debug('clearing split files'); | ||
if (this.db) { | ||
await this.db.splitClean(); // remove any split files whose transfers were disrupted and which didn't self-clean | ||
} | ||
} | ||
async stopAnalysis() { | ||
var _a, _b; | ||
await super.stopAnalysis(); | ||
(_a = this.telemetryDestroySignal$) === null || _a === void 0 ? void 0 : _a.next(); | ||
(_b = this.telemetryDestroySignal$) === null || _b === void 0 ? void 0 : _b.complete(); | ||
} | ||
async stopEverything() { | ||
await super.stopEverything(); | ||
} | ||
async checkForDownloads() { | ||
@@ -462,3 +485,3 @@ if (this.checkForDownloadsRunning) { | ||
try { | ||
const queueURL = await this.discoverQueue((0, ts_runtime_typecheck_1.asOptString)(this.config.instance.outputQueueName)); | ||
const queueURL = await this.discoverQueue((0, ts_runtime_typecheck_2.asOptString)(this.config.instance.outputQueueName)); | ||
const len = await this.queueLength(queueURL); | ||
@@ -485,3 +508,3 @@ if (len) { | ||
try { | ||
const queueURL = await this.discoverQueue((0, ts_runtime_typecheck_1.asOptString)(this.config.instance.outputQueueName)); | ||
const queueURL = await this.discoverQueue((0, ts_runtime_typecheck_2.asOptString)(this.config.instance.outputQueueName)); | ||
this.log.debug('fetching messages'); | ||
@@ -529,3 +552,3 @@ const sqs = this.getSQSSessionedService(); | ||
for (const message of receiveMessages.Messages) { | ||
const id = (0, ts_runtime_typecheck_1.makeString)(message.MessageId); | ||
const id = (0, ts_runtime_typecheck_2.makeString)(message.MessageId); | ||
workerPool[id] = 1; | ||
@@ -565,3 +588,3 @@ // WARN this behaviour does not appear to be correct, where does the error go??? | ||
try { | ||
messageBody = JSON.parse((0, ts_runtime_typecheck_1.asString)(message.Body)); | ||
messageBody = JSON.parse((0, ts_runtime_typecheck_2.asString)(message.Body)); | ||
} | ||
@@ -579,3 +602,3 @@ catch (err) { | ||
const { downloadMode } = this.config.options; | ||
const telemetry = (0, ts_runtime_typecheck_1.asOptDictionary)(messageBody.telemetry); | ||
const telemetry = (0, ts_runtime_typecheck_2.asOptDictionary)(messageBody.telemetry); | ||
/* MC-405 telemetry log to file */ | ||
@@ -589,4 +612,4 @@ if (telemetry && downloadMode.includes('telemetry')) { | ||
.getObject({ | ||
Bucket: (0, ts_runtime_typecheck_1.asString)(messageBody.bucket), | ||
Key: (0, ts_runtime_typecheck_1.asString)(telemetry.tm_path), | ||
Bucket: (0, ts_runtime_typecheck_2.asString)(messageBody.bucket), | ||
Key: (0, ts_runtime_typecheck_2.asString)(telemetry.tm_path), | ||
}) | ||
@@ -596,3 +619,3 @@ .promise(); | ||
const body = data.Body; | ||
if ((0, ts_runtime_typecheck_1.isUndefined)(body)) { | ||
if ((0, ts_runtime_typecheck_2.isUndefined)(body)) { | ||
throw new Error('Telemetry body is undefined'); | ||
@@ -637,4 +660,4 @@ } | ||
const idWorkflowInstance = this.config.instance.id_workflow_instance; | ||
let folder = path_1.default.join((0, ts_runtime_typecheck_1.asString)(this.config.options.outputFolder), (0, ts_runtime_typecheck_1.isUndefined)(idWorkflowInstance) ? '' : (0, ts_runtime_typecheck_1.makeString)(idWorkflowInstance)); | ||
const telemetryHintsFolder = (0, ts_runtime_typecheck_1.asOptString)((_a = (0, ts_runtime_typecheck_1.asOptDictionary)(telemetry === null || telemetry === void 0 ? void 0 : telemetry.hints)) === null || _a === void 0 ? void 0 : _a.folder); | ||
let folder = path_1.default.join((0, ts_runtime_typecheck_2.asString)(this.config.options.outputFolder), (0, ts_runtime_typecheck_2.isUndefined)(idWorkflowInstance) ? '' : (0, ts_runtime_typecheck_2.makeString)(idWorkflowInstance)); | ||
const telemetryHintsFolder = (0, ts_runtime_typecheck_2.asOptString)((_a = (0, ts_runtime_typecheck_2.asOptDictionary)(telemetry === null || telemetry === void 0 ? void 0 : telemetry.hints)) === null || _a === void 0 ? void 0 : _a.folder); | ||
/* MC-940: use folder hinting if present */ | ||
@@ -659,5 +682,5 @@ if (telemetryHintsFolder) { | ||
/* download file[s] from S3 */ | ||
const downloads = (0, ts_runtime_typecheck_1.isArrayOf)(ts_runtime_typecheck_1.isString)(messageBody.downloads) | ||
const downloads = (0, ts_runtime_typecheck_2.isArrayOf)(ts_runtime_typecheck_2.isString)(messageBody.downloads) | ||
? messageBody.downloads | ||
: [(0, ts_runtime_typecheck_1.asString)(messageBody.path)]; | ||
: [(0, ts_runtime_typecheck_2.asString)(messageBody.path)]; | ||
await Promise.all(downloads.map(async (filepath) => { | ||
@@ -668,3 +691,3 @@ const destination = path_1.default.join(folder, path_1.default.basename(filepath)); | ||
await this.initiateDownloadStream({ | ||
bucket: (0, ts_runtime_typecheck_1.asString)(messageBody.bucket), | ||
bucket: (0, ts_runtime_typecheck_2.asString)(messageBody.bucket), | ||
path: filepath, | ||
@@ -679,5 +702,5 @@ }, message, destination); | ||
else { | ||
const batchSummary = (0, ts_runtime_typecheck_1.asOptDictionary)(telemetry === null || telemetry === void 0 ? void 0 : telemetry.batch_summary); | ||
const batchSummary = (0, ts_runtime_typecheck_2.asOptDictionary)(telemetry === null || telemetry === void 0 ? void 0 : telemetry.batch_summary); | ||
// telemetry-only mode uses readcount from message | ||
const readCount = (_b = (0, ts_runtime_typecheck_1.asOptNumber)(batchSummary === null || batchSummary === void 0 ? void 0 : batchSummary.reads_num)) !== null && _b !== void 0 ? _b : 1; | ||
const readCount = (_b = (0, ts_runtime_typecheck_2.asOptNumber)(batchSummary === null || batchSummary === void 0 ? void 0 : batchSummary.reads_num)) !== null && _b !== void 0 ? _b : 1; | ||
this.downloadState('success', 'incr', { | ||
@@ -844,4 +867,4 @@ files: 1, | ||
.changeMessageVisibility({ | ||
QueueUrl: (0, ts_runtime_typecheck_1.asString)(queueUrl), | ||
ReceiptHandle: (0, ts_runtime_typecheck_1.asString)(receiptHandle), | ||
QueueUrl: (0, ts_runtime_typecheck_2.asString)(queueUrl), | ||
ReceiptHandle: (0, ts_runtime_typecheck_2.asString)(receiptHandle), | ||
VisibilityTimeout: this.config.options.inFlightDelay.seconds, | ||
@@ -880,8 +903,9 @@ }) | ||
} | ||
const instanceDir = path_1.default.join(rootDir(), 'instances', (0, ts_runtime_typecheck_1.makeString)(this.config.instance.id_workflow_instance)); | ||
const instanceDir = path_1.default.join(rootDir(), 'instances', (0, ts_runtime_typecheck_2.makeString)(this.config.instance.id_workflow_instance)); | ||
const telemetryNames = (_b = (_a = this.config) === null || _a === void 0 ? void 0 : _a.instance) === null || _b === void 0 ? void 0 : _b.telemetryNames; | ||
const idWorkflowInstance = (0, ts_runtime_typecheck_1.makeString)(this.config.instance.id_workflow_instance); | ||
this.telemetry = telemetry_1.Telemetry.connect(idWorkflowInstance, this.graphQL, (0, ts_runtime_typecheck_1.asDefined)(telemetryNames)); | ||
const idWorkflowInstance = (0, ts_runtime_typecheck_2.makeString)(this.config.instance.id_workflow_instance); | ||
this.telemetry = telemetry_1.Telemetry.connect(idWorkflowInstance, this.graphQL, (0, ts_runtime_typecheck_2.asDefined)(telemetryNames)); | ||
// const reports$ = this.telemetry.telemetryReports$().pipe(filter(isDefined)); | ||
const { add: writeQueue } = (0, queue_1.createQueue)({ signal$: this.analysisStopped$ }, async ([filePath, content]) => { | ||
const destroySignal$ = new rxjs_1.Subject(); | ||
const { add: writeQueue } = (0, queue_1.createQueue)({ signal$: destroySignal$ }, async ([filePath, content]) => { | ||
try { | ||
@@ -896,3 +920,3 @@ await fs_extra_1.default.writeJSON(filePath, content); | ||
this.telemetry.anyReportsReady$ | ||
.pipe((0, operators_1.filter)((isReady) => isReady), (0, operators_1.first)(), (0, operators_1.takeUntil)(this.analysisStopped$)) | ||
.pipe((0, operators_1.filter)((isReady) => isReady), (0, operators_1.first)(), (0, operators_1.takeUntil)(destroySignal$)) | ||
.subscribe(() => this.reportState$.next(true)); | ||
@@ -903,3 +927,3 @@ // pass all telemetry fils to public instanceTelemetry$ subject | ||
return reports.map(({ report }) => report); | ||
}), (0, operators_1.takeUntil)(this.analysisStopped$)) | ||
}), (0, operators_1.takeUntil)(destroySignal$)) | ||
.subscribe((reports) => this.instanceTelemetry$.next(reports)); | ||
@@ -932,6 +956,7 @@ const previousReports$ = new rxjs_1.BehaviorSubject([]); | ||
}); | ||
this.telemetryDestroySignal$ = destroySignal$; | ||
} | ||
async fetchTelemetry() { | ||
var _a, _b; | ||
const instanceDir = path_1.default.join(rootDir(), 'instances', (0, ts_runtime_typecheck_1.makeString)(this.config.instance.id_workflow_instance)); | ||
const instanceDir = path_1.default.join(rootDir(), 'instances', (0, ts_runtime_typecheck_2.makeString)(this.config.instance.id_workflow_instance)); | ||
if (this.config.options.useGraphQL) { | ||
@@ -945,8 +970,8 @@ // uses observeTelemetry instead | ||
const toFetch = []; | ||
const summaryTelemetry = (0, ts_runtime_typecheck_1.asDictionary)(this.config.instance.summaryTelemetry); | ||
const summaryTelemetry = (0, ts_runtime_typecheck_2.asDictionary)(this.config.instance.summaryTelemetry); | ||
Object.keys(summaryTelemetry).forEach((componentId) => { | ||
var _a; | ||
const component = (_a = (0, ts_runtime_typecheck_1.asDictionary)(summaryTelemetry[componentId])) !== null && _a !== void 0 ? _a : {}; | ||
const component = (_a = (0, ts_runtime_typecheck_2.asDictionary)(summaryTelemetry[componentId])) !== null && _a !== void 0 ? _a : {}; | ||
const firstReport = Object.keys(component)[0]; // poor show | ||
let url = (0, ts_runtime_typecheck_1.asOptString)(component[firstReport]); | ||
let url = (0, ts_runtime_typecheck_2.asOptString)(component[firstReport]); | ||
if (!url) { | ||
@@ -953,0 +978,0 @@ return; |
@@ -44,35 +44,2 @@ "use strict"; | ||
this.liveStates$ = new rxjs_1.BehaviorSubject(this.states); | ||
this.updateWorkerStatus = (newWorkerStatus) => { | ||
var _a, _b; | ||
const { instance: instanceConfig } = this.config; | ||
const components = (_a = instanceConfig.chain) === null || _a === void 0 ? void 0 : _a.components; | ||
if (!components) { | ||
return; | ||
} | ||
const summaryTelemetry = (0, ts_runtime_typecheck_1.asDictionary)(instanceConfig.summaryTelemetry); | ||
const workerStatus = Object.entries(components).sort((a, b) => parseInt(a[0], 10) - parseInt(b[0], 10)); | ||
const indexableNewWorkerStatus = (0, ts_runtime_typecheck_1.asIndexable)(newWorkerStatus); | ||
const results = []; | ||
for (const [key, value] of workerStatus) { | ||
if (key in indexableNewWorkerStatus) { | ||
const step = +key; | ||
let name = 'ROOT'; | ||
if (step !== 0) { | ||
const wid = (0, ts_runtime_typecheck_1.asIndex)((0, ts_runtime_typecheck_1.asDictionary)(value).wid); | ||
name = (_b = Object.keys((0, ts_runtime_typecheck_1.asDictionary)(summaryTelemetry[wid]))[0]) !== null && _b !== void 0 ? _b : 'ROOT'; | ||
} | ||
const [running, complete, error] = (0, ts_runtime_typecheck_1.asString)(indexableNewWorkerStatus[key]) | ||
.split(',') | ||
.map((componentID) => Math.max(0, +componentID)); // It's dodgy but assuming the componentID is a number happens all over the place | ||
results.push({ | ||
running, | ||
complete, | ||
error, | ||
step, | ||
name, | ||
}); | ||
} | ||
} | ||
this.experimentalWorkerStatus$.next(results); | ||
}; | ||
const options = (0, parseOptions_1.parseOptions)(optstring); | ||
@@ -97,26 +64,46 @@ const { idWorkflowInstance, log, region } = options; | ||
} | ||
async getSocket() { | ||
if (this.socket) { | ||
return this.socket; | ||
async socket() { | ||
if (this.mySocket) { | ||
return this.mySocket; | ||
} | ||
let jwt; | ||
try { | ||
jwt = await this.REST.jwt(); | ||
this.mySocket = new socket_1.default(this.REST, this.config.options); | ||
const { id_workflow_instance: idWorkflowInstance } = this.config.instance; | ||
if (idWorkflowInstance) { | ||
this.mySocket.watch(`workflow_instance:state:${idWorkflowInstance}`, (newWorkerStatus) => { | ||
var _a, _b; | ||
const { instance: instanceConfig } = this.config; | ||
const components = (0, ts_runtime_typecheck_1.asOptDictionary)((_a = instanceConfig.chain) === null || _a === void 0 ? void 0 : _a.components); | ||
if (components) { | ||
const summaryTelemetry = (0, ts_runtime_typecheck_1.asDictionary)(instanceConfig.summaryTelemetry); | ||
const workerStatus = Object.entries(components).sort((a, b) => parseInt(a[0], 10) - parseInt(b[0], 10)); | ||
const indexableNewWorkerStatus = (0, ts_runtime_typecheck_1.asIndexable)(newWorkerStatus); | ||
const results = []; | ||
for (const [key, value] of workerStatus) { | ||
if (key in indexableNewWorkerStatus) { | ||
const step = +key; | ||
let name = 'ROOT'; | ||
if (step !== 0) { | ||
const wid = (0, ts_runtime_typecheck_1.asIndex)((0, ts_runtime_typecheck_1.asDictionary)(value).wid); | ||
name = (_b = Object.keys((0, ts_runtime_typecheck_1.asDictionary)(summaryTelemetry[wid]))[0]) !== null && _b !== void 0 ? _b : 'ROOT'; | ||
} | ||
const [running, complete, error] = (0, ts_runtime_typecheck_1.asString)(indexableNewWorkerStatus[key]) | ||
.split(',') | ||
.map((componentID) => Math.max(0, +componentID)); // It's dodgy but assuming the componentID is a number happens all over the place | ||
results.push({ | ||
running, | ||
complete, | ||
error, | ||
step, | ||
name, | ||
}); | ||
} | ||
} | ||
this.experimentalWorkerStatus$.next(results); | ||
} | ||
}); | ||
} | ||
catch (err) { | ||
if (err instanceof Error) { | ||
this.log.error(`Failed to request JWT for websocket`, err.message); | ||
} | ||
throw err; | ||
} | ||
const socket = new socket_1.default(jwt, this.config.options); | ||
const { id_workflow_instance: id } = this.config.instance; | ||
if (id) { | ||
socket.watch(`workflow_instance:state:${id}`, this.updateWorkerStatus); | ||
} | ||
this.socket = socket; | ||
return socket; | ||
return this.mySocket; | ||
} | ||
async realtimeFeedback(channel, object) { | ||
const socket = await this.getSocket(); | ||
const socket = await this.socket(); | ||
socket.emit(channel, object); | ||
@@ -145,13 +132,2 @@ } | ||
} | ||
async stopEverything() { | ||
await this.stopAnalysis(); | ||
this.stopTransferTimers(); | ||
this.stopVisibilityTimers(); | ||
this.stopDownloadWorkers(); | ||
if (this.socket) { | ||
this.socket.destroy(); | ||
} | ||
this.stopTimer('summaryTelemetryInterval'); | ||
this.stopTimer('downloadCheckInterval'); | ||
} | ||
async stopAnalysis() { | ||
@@ -163,19 +139,18 @@ // If we stop the cloud, there's no point uploading anymore | ||
const { id_workflow_instance: idWorkflowInstance } = this.config.instance; | ||
if (!idWorkflowInstance) { | ||
return; | ||
} | ||
try { | ||
if (this.config.options.useGraphQL) { | ||
await this.graphQL.stopWorkflow({ variables: { idWorkflowInstance } }); | ||
if (idWorkflowInstance) { | ||
try { | ||
// TODO: Convert to GQL and switch on class-wide flag | ||
if (this.config.options.useGraphQL) { | ||
await this.graphQL.stopWorkflow({ variables: { idWorkflowInstance } }); | ||
} | ||
else { | ||
await this.REST.stopWorkflow(idWorkflowInstance); | ||
} | ||
this.analyseState$.next(false); | ||
} | ||
else { | ||
await this.REST.stopWorkflow(idWorkflowInstance); | ||
catch (err) { | ||
throw (0, NodeError_1.wrapAndLogError)('error stopping instance', err, this.log); | ||
} | ||
this.analyseState$.next(false); | ||
this.analyseState$.complete(); | ||
this.log.info(`workflow instance ${idWorkflowInstance} stopped`); | ||
} | ||
catch (err) { | ||
throw (0, NodeError_1.wrapAndLogError)('error stopping instance', err, this.log); | ||
} | ||
this.log.info(`workflow instance ${idWorkflowInstance} stopped`); | ||
} | ||
@@ -187,5 +162,7 @@ stopUpload() { | ||
this.uploadState$.next(false); | ||
this.uploadState$.complete(); | ||
} | ||
stopTransferTimers() { | ||
async stopEverything() { | ||
this.stopAnalysis(); | ||
// Moved this out of the main stopUpload because we don't want to stop it when we stop uploading | ||
// This is really 'stop fetching reports' | ||
for (const key in this.timers.transferTimeouts) { | ||
@@ -200,4 +177,2 @@ this.log.debug(`clearing transferTimeout for ${key}`); | ||
} | ||
} | ||
stopVisibilityTimers() { | ||
for (const key in this.timers.visibilityIntervals) { | ||
@@ -211,4 +186,2 @@ this.log.debug(`clearing visibilityInterval for ${key}`); | ||
} | ||
} | ||
async stopDownloadWorkers() { | ||
if (this.downloadWorkerPool) { | ||
@@ -219,2 +192,4 @@ this.log.debug('clearing downloadWorkerPool'); | ||
} | ||
this.stopTimer('summaryTelemetryInterval'); | ||
this.stopTimer('downloadCheckInterval'); | ||
} | ||
@@ -221,0 +196,0 @@ reportProgress() { |
@@ -21,7 +21,5 @@ "use strict"; | ||
const NodeError_1 = require("./NodeError"); | ||
const FileIndex_1 = require("./FileIndex"); | ||
function instantiateFileUpload(instance) { | ||
(0, ts_runtime_typecheck_1.assertDefined)(instance.config.workflow, 'Workflow'); | ||
const fileIndex = new FileIndex_1.FileIndex(); | ||
const splitFiles = new Set(); | ||
(0, ts_runtime_typecheck_1.assertDefined)(instance.db, 'Database'); | ||
const workflow = instance.config.workflow; | ||
@@ -33,2 +31,3 @@ const settings = readSettings(instance.config); | ||
} | ||
const { log: logger, db: database } = instance; | ||
const { inputFolders, outputFolder, filetype: filetypes } = instance.config.options; | ||
@@ -40,7 +39,6 @@ const { warnings, upload: state } = instance.states; | ||
warnings, | ||
fileIndex, | ||
splitFiles, | ||
database, | ||
instance, | ||
state, | ||
logger: instance.log, | ||
logger, | ||
hasStopped: false, | ||
@@ -51,3 +49,3 @@ stopped$, | ||
const scanner = createFileScanner({ | ||
fileIndex, | ||
database, | ||
inputFolders, | ||
@@ -88,5 +86,5 @@ filetypes, | ||
exports.instantiateFileUpload = instantiateFileUpload; | ||
function createFileScanner({ fileIndex, inputFolders, outputFolder, filetypes, context, }) { | ||
const filter = (location) => { | ||
const exists = context.splitFiles.has(location) || fileIndex.has(location); | ||
function createFileScanner({ database, inputFolders, outputFolder, filetypes, context, }) { | ||
const filter = async (location) => { | ||
const exists = await database.seenUpload(location); | ||
return !exists; | ||
@@ -168,9 +166,9 @@ }; | ||
async function processFile(ctx, file) { | ||
const { settings, state, splitFiles, fileIndex } = ctx; | ||
const { settings, state, database } = ctx; | ||
if (settings.maxFiles <= state.filesCount) { | ||
skipFile(file, ctx, fileUploader_type_1.FileUploadWarnings.TOO_MANY); | ||
await skipFile(file, ctx, fileUploader_type_1.FileUploadWarnings.TOO_MANY); | ||
return; | ||
} | ||
if (file.size === 0) { | ||
skipFile(file, ctx, fileUploader_type_1.FileUploadWarnings.EMPTY); | ||
await skipFile(file, ctx, fileUploader_type_1.FileUploadWarnings.EMPTY); | ||
return; | ||
@@ -187,3 +185,3 @@ } | ||
let chunkId = 0; | ||
await (0, fastq_1.splitter)(file.path, split, splitFiles, async (chunkFile) => { | ||
await (0, fastq_1.splitter)(file.path, split, async (chunkFile) => { | ||
if (ctx.hasStopped) { | ||
@@ -193,2 +191,3 @@ // NOTE should be caught by the queue and discarded | ||
} | ||
await database.splitFile(chunkFile, file.path); | ||
chunkId += 1; | ||
@@ -206,6 +205,7 @@ const name = path_1.default.basename(chunkFile); | ||
await uploadJob(ctx, chunkFilestat); | ||
await database.splitDone(chunkFile); | ||
}, isCompressed); | ||
state.filesCount += 1; | ||
// mark the original file as done | ||
fileIndex.add(file.path); | ||
await database.uploadFile(file.path); | ||
return; | ||
@@ -215,13 +215,12 @@ } | ||
if (file.size > settings.maxFileSize) { | ||
skipFile(file, ctx, fileUploader_type_1.FileUploadWarnings.TOO_BIG); | ||
await skipFile(file, ctx, fileUploader_type_1.FileUploadWarnings.TOO_BIG); | ||
return; | ||
} | ||
await uploadJob(ctx, file); | ||
fileIndex.add(file.path); | ||
state.filesCount += 1; | ||
} | ||
exports.processFile = processFile; | ||
function skipFile(file, ctx, warn) { | ||
async function skipFile(file, ctx, warn) { | ||
addFileWarning(file, ctx, warn); | ||
ctx.fileIndex.add(file.path); | ||
await ctx.database.skipFile(file.path); | ||
} | ||
@@ -428,2 +427,5 @@ exports.skipFile = skipFile; | ||
let components; | ||
if (!(0, ts_runtime_typecheck_1.isDictionaryOf)(ts_runtime_typecheck_1.isDictionary)(workflowInstance.chain.components)) { | ||
throw new Error('Unexpected chain definition'); | ||
} | ||
try { | ||
@@ -488,3 +490,3 @@ components = JSON.parse(JSON.stringify(workflowInstance.chain.components)); // low-frills object clone | ||
async function uploadComplete(ctx, objectId, file) { | ||
const { instance, logger } = ctx; | ||
const { instance, database, logger } = ctx; | ||
logger.info(`${file.id} uploaded to S3: ${objectId}`); | ||
@@ -506,2 +508,3 @@ const messageId = await messageInputQueue(ctx, objectId, file); | ||
logger.info(`${file.id} SQS message sent. Mark as uploaded`); | ||
await database.uploadFile(file.path); | ||
} |
@@ -9,22 +9,30 @@ "use strict"; | ||
const timers_1 = require("./timers"); | ||
const NodeError_1 = require("./NodeError"); | ||
class Socket { | ||
constructor(jwt, opts) { | ||
constructor(rest, opts) { | ||
this.debounces = new Set(); | ||
const { url, log, debounceWindow } = opts; | ||
this.socket = (0, socket_io_client_1.default)(url, { | ||
transportOptions: { | ||
polling: { | ||
extraHeaders: { | ||
Cookie: `x-epi2me-jwt=${jwt}`, | ||
this.debounceWindow = opts.debounceWindow; | ||
this.log = opts.log; | ||
this.initialise(rest, opts.url); | ||
} | ||
async initialise(rest, url) { | ||
try { | ||
const jwt = await rest.jwt(); | ||
this.socket = (0, socket_io_client_1.default)(url, { | ||
transportOptions: { | ||
polling: { | ||
extraHeaders: { | ||
Cookie: `x-epi2me-jwt=${jwt}`, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}); | ||
this.socket.on('connect', () => log.debug('socket ready')); | ||
this.debounceWindow = debounceWindow; | ||
this.log = log; | ||
}); | ||
this.socket.on('connect', () => { | ||
this.log.debug('socket ready'); | ||
}); | ||
} | ||
catch (err) { | ||
(0, NodeError_1.wrapAndLogError)('socket connection failed - JWT authentication error', err, this.log); | ||
} | ||
} | ||
destroy() { | ||
this.socket.disconnect(); | ||
} | ||
debounce(data, func) { | ||
@@ -39,9 +47,27 @@ if ((0, ts_runtime_typecheck_1.isDictionary)(data) && '_uuid' in data) { | ||
} | ||
func(data); | ||
if (func) { | ||
func(data); | ||
} | ||
} | ||
watch(chan, func) { | ||
this.socket.on(chan, (data) => this.debounce(data, func)); | ||
if (!this.socket) { | ||
this.log.debug(`socket not ready. requeueing watch on ${chan}`); | ||
setTimeout(() => { | ||
this.watch(chan, func); | ||
}, 1000); | ||
return; | ||
} | ||
this.socket.on(chan, (data) => { | ||
return this.debounce(data, func); | ||
}); | ||
} | ||
emit(chan, data) { | ||
this.log.debug(`socket emit ${chan}`, data); | ||
if (!this.socket) { | ||
this.log.debug(`socket not ready. requeueing emit on ${chan}`); | ||
setTimeout(() => { | ||
this.emit(chan, data); | ||
}, 1000); | ||
return; | ||
} | ||
this.log.debug(`socket emit ${chan} ${JSON.stringify(data)}`); | ||
this.socket.emit(chan, data); | ||
@@ -48,0 +74,0 @@ } |
@@ -56,3 +56,3 @@ "use strict"; | ||
*/ | ||
async function completeChunk(chunk, index, handler) { | ||
async function completeChunk(chunk, handler) { | ||
chunk.writer.end(); | ||
@@ -67,3 +67,2 @@ await chunk.closed; | ||
await fs_1.default.promises.unlink(chunk.location); | ||
index.delete(chunk.location); | ||
} | ||
@@ -103,5 +102,4 @@ catch (err) { | ||
exports.constructChunkLocation = constructChunkLocation; | ||
function createChunk(prefix, suffix, index, id, isCompressed) { | ||
function createChunk(prefix, suffix, id, isCompressed) { | ||
const location = constructChunkLocation(prefix, suffix, id); | ||
index.add(location); | ||
const { writer, closed } = createWriteStream(location, isCompressed); | ||
@@ -121,3 +119,3 @@ const chunk = { | ||
exports.createChunk = createChunk; | ||
async function splitter(filePath, opts, index, handler, isCompressed) { | ||
async function splitter(filePath, opts, handler, isCompressed) { | ||
var e_1, _a; | ||
@@ -158,3 +156,3 @@ // defaulting these values to infinity simplifies our checks | ||
chunkCounter += 1; | ||
chunk = createChunk(prefix, suffix, index, chunkCounter, isCompressed); | ||
chunk = createChunk(prefix, suffix, chunkCounter, isCompressed); | ||
} | ||
@@ -167,3 +165,3 @@ writeToChunk(chunk, lineBuffer); | ||
if (exceedsBytes || exceedsReads) { | ||
await completeChunk(chunk, index, handler); | ||
await completeChunk(chunk, handler); | ||
chunk = null; | ||
@@ -186,5 +184,5 @@ } | ||
if (chunk) { | ||
await completeChunk(chunk, index, handler); | ||
await completeChunk(chunk, handler); | ||
} | ||
} | ||
exports.splitter = splitter; |
@@ -13,2 +13,3 @@ "use strict"; | ||
const Logger_1 = require("./Logger"); | ||
const NodeError_1 = require("./NodeError"); | ||
axios_1.default.defaults.validateStatus = (status) => status <= 504; // Reject only if the status code is greater than or equal to 500 | ||
@@ -220,3 +221,17 @@ exports.utils = (function magic() { | ||
}, | ||
convertResponseToObject(data) { | ||
if (typeof data === 'object') { | ||
// already parsed | ||
return data; | ||
} | ||
else { | ||
try { | ||
return JSON.parse(data); | ||
} | ||
catch (err) { | ||
throw new NodeError_1.NestedError('exception parsing chain JSON', err); | ||
} | ||
} | ||
}, | ||
}; | ||
})(); |
@@ -1,2 +0,3 @@ | ||
import { asOptString, asString, asDictionary, asOptIndex, asOptDictionary, asDefined, asOptFunction, asOptNumber, isDefined, isDictionary, asOptStruct, makeString, isString, isUndefined, asOptDictionaryOf, isDictionaryOf, isArrayOf, assertDefined, } from 'ts-runtime-typecheck'; | ||
import { isDefined } from 'ts-runtime-typecheck'; | ||
import { asOptString, asString, asDictionary, asOptIndex, asOptDictionary, asDefined, asOptFunction, asOptNumber, makeString, isString, isUndefined, asOptDictionaryOf, isDictionaryOf, isArrayOf, assertDefined, } from 'ts-runtime-typecheck'; | ||
import AWS from 'aws-sdk'; | ||
@@ -6,2 +7,3 @@ import fs from 'fs-extra'; /* MC-565 handle EMFILE & EXDIR gracefully; use Promises */ | ||
import path from 'path'; | ||
import { DB } from './db'; | ||
import { EPI2ME } from './epi2me'; | ||
@@ -19,3 +21,3 @@ import { Factory } from './factory'; | ||
import { GraphQLFS } from './graphql-fs'; | ||
import { BehaviorSubject } from 'rxjs'; | ||
import { BehaviorSubject, Subject } from 'rxjs'; | ||
import { createQueue } from './queue'; | ||
@@ -38,10 +40,5 @@ import { filestats } from './filestats'; | ||
}; | ||
const asChain = asOptStruct({ | ||
components: isDictionaryOf(isDictionary), | ||
targetComponentId: isDefined, | ||
}); | ||
export class EPI2ME_FS extends EPI2ME { | ||
constructor(opts) { | ||
super(opts); // sets up this.config & this.log | ||
this.SampleReader = new SampleReader(); | ||
this.fetchToken = async () => { | ||
@@ -67,2 +64,3 @@ var _a; | ||
this.graphQL = new GraphQLFS(this.config.options); | ||
this.SampleReader = new SampleReader(); | ||
} | ||
@@ -217,5 +215,2 @@ sessionedS3(options = {}) { | ||
} | ||
validateChain(chain) { | ||
return asChain(isString(chain) ? JSON.parse(chain) : chain); | ||
} | ||
setClassConfigGQL(startData) { | ||
@@ -226,2 +221,3 @@ assertDefined(startData, 'Workflow Start Data'); | ||
const { workflowImage, outputqueue, keyId, startDate, idWorkflowInstance, mappedTelemetry, telemetryNames } = instance; | ||
const chain = isString(instance.chain) ? asString(instance.chain) : asDictionary(instance.chain); | ||
const regionName = workflowImage.region.name; | ||
@@ -244,3 +240,3 @@ const idWorkflow = asOptIndex(workflowImage.workflow.idWorkflow); | ||
bucketFolder: `${outputqueue}/${idUser}/${idWorkflowInstance}`, | ||
chain: this.validateChain(instance.chain), | ||
chain: utils.convertResponseToObject(chain), | ||
}; | ||
@@ -264,3 +260,6 @@ this.config.instance = Object.assign(Object.assign({}, this.config.instance), map); | ||
conf.summaryTelemetry = asOptDictionary(instance.telemetry); // MC-7056 for fetchTelemetry (summary) telemetry periodically | ||
conf.chain = this.validateChain(instance.chain); | ||
// WARN this assumes chain is an object, but could it be a string? | ||
if (instance.chain) { | ||
conf.chain = utils.convertResponseToObject(asDictionary(instance.chain)); | ||
} | ||
} | ||
@@ -334,2 +333,10 @@ // WARN | ||
} | ||
// NOTE don't need the database if we're running from a dataset | ||
if (inputFolders) { | ||
// set up new tracking database | ||
this.db = new DB(thisInstanceDir, { | ||
idWorkflowInstance: makeString(this.config.instance.id_workflow_instance), | ||
inputFolders, | ||
}, this.log); | ||
} | ||
// MC-1828 - include instance id in telemetry file name | ||
@@ -441,2 +448,18 @@ const fileName = this.config.instance.id_workflow_instance | ||
} | ||
async stopUpload() { | ||
super.stopUpload(); | ||
this.log.debug('clearing split files'); | ||
if (this.db) { | ||
await this.db.splitClean(); // remove any split files whose transfers were disrupted and which didn't self-clean | ||
} | ||
} | ||
async stopAnalysis() { | ||
var _a, _b; | ||
await super.stopAnalysis(); | ||
(_a = this.telemetryDestroySignal$) === null || _a === void 0 ? void 0 : _a.next(); | ||
(_b = this.telemetryDestroySignal$) === null || _b === void 0 ? void 0 : _b.complete(); | ||
} | ||
async stopEverything() { | ||
await super.stopEverything(); | ||
} | ||
async checkForDownloads() { | ||
@@ -860,3 +883,4 @@ if (this.checkForDownloadsRunning) { | ||
// const reports$ = this.telemetry.telemetryReports$().pipe(filter(isDefined)); | ||
const { add: writeQueue } = createQueue({ signal$: this.analysisStopped$ }, async ([filePath, content]) => { | ||
const destroySignal$ = new Subject(); | ||
const { add: writeQueue } = createQueue({ signal$: destroySignal$ }, async ([filePath, content]) => { | ||
try { | ||
@@ -871,3 +895,3 @@ await fs.writeJSON(filePath, content); | ||
this.telemetry.anyReportsReady$ | ||
.pipe(filter((isReady) => isReady), first(), takeUntil(this.analysisStopped$)) | ||
.pipe(filter((isReady) => isReady), first(), takeUntil(destroySignal$)) | ||
.subscribe(() => this.reportState$.next(true)); | ||
@@ -878,3 +902,3 @@ // pass all telemetry fils to public instanceTelemetry$ subject | ||
return reports.map(({ report }) => report); | ||
}), takeUntil(this.analysisStopped$)) | ||
}), takeUntil(destroySignal$)) | ||
.subscribe((reports) => this.instanceTelemetry$.next(reports)); | ||
@@ -907,2 +931,3 @@ const previousReports$ = new BehaviorSubject([]); | ||
}); | ||
this.telemetryDestroySignal$ = destroySignal$; | ||
} | ||
@@ -909,0 +934,0 @@ async fetchTelemetry() { |
@@ -7,3 +7,3 @@ import { BehaviorSubject, combineLatest } from 'rxjs'; | ||
import { utils } from './utils'; | ||
import { asDictionary, asString, asNumber, asIndexable, asIndex, asDefined } from 'ts-runtime-typecheck'; | ||
import { asDictionary, asString, asNumber, asIndexable, asIndex, asOptDictionary, asDefined, } from 'ts-runtime-typecheck'; | ||
import { createUploadState, createDownloadState } from './epi2me-state'; | ||
@@ -39,35 +39,2 @@ import { createInterval } from './timers'; | ||
this.liveStates$ = new BehaviorSubject(this.states); | ||
this.updateWorkerStatus = (newWorkerStatus) => { | ||
var _a, _b; | ||
const { instance: instanceConfig } = this.config; | ||
const components = (_a = instanceConfig.chain) === null || _a === void 0 ? void 0 : _a.components; | ||
if (!components) { | ||
return; | ||
} | ||
const summaryTelemetry = asDictionary(instanceConfig.summaryTelemetry); | ||
const workerStatus = Object.entries(components).sort((a, b) => parseInt(a[0], 10) - parseInt(b[0], 10)); | ||
const indexableNewWorkerStatus = asIndexable(newWorkerStatus); | ||
const results = []; | ||
for (const [key, value] of workerStatus) { | ||
if (key in indexableNewWorkerStatus) { | ||
const step = +key; | ||
let name = 'ROOT'; | ||
if (step !== 0) { | ||
const wid = asIndex(asDictionary(value).wid); | ||
name = (_b = Object.keys(asDictionary(summaryTelemetry[wid]))[0]) !== null && _b !== void 0 ? _b : 'ROOT'; | ||
} | ||
const [running, complete, error] = asString(indexableNewWorkerStatus[key]) | ||
.split(',') | ||
.map((componentID) => Math.max(0, +componentID)); // It's dodgy but assuming the componentID is a number happens all over the place | ||
results.push({ | ||
running, | ||
complete, | ||
error, | ||
step, | ||
name, | ||
}); | ||
} | ||
} | ||
this.experimentalWorkerStatus$.next(results); | ||
}; | ||
const options = parseOptions(optstring); | ||
@@ -92,26 +59,46 @@ const { idWorkflowInstance, log, region } = options; | ||
} | ||
async getSocket() { | ||
if (this.socket) { | ||
return this.socket; | ||
async socket() { | ||
if (this.mySocket) { | ||
return this.mySocket; | ||
} | ||
let jwt; | ||
try { | ||
jwt = await this.REST.jwt(); | ||
this.mySocket = new Socket(this.REST, this.config.options); | ||
const { id_workflow_instance: idWorkflowInstance } = this.config.instance; | ||
if (idWorkflowInstance) { | ||
this.mySocket.watch(`workflow_instance:state:${idWorkflowInstance}`, (newWorkerStatus) => { | ||
var _a, _b; | ||
const { instance: instanceConfig } = this.config; | ||
const components = asOptDictionary((_a = instanceConfig.chain) === null || _a === void 0 ? void 0 : _a.components); | ||
if (components) { | ||
const summaryTelemetry = asDictionary(instanceConfig.summaryTelemetry); | ||
const workerStatus = Object.entries(components).sort((a, b) => parseInt(a[0], 10) - parseInt(b[0], 10)); | ||
const indexableNewWorkerStatus = asIndexable(newWorkerStatus); | ||
const results = []; | ||
for (const [key, value] of workerStatus) { | ||
if (key in indexableNewWorkerStatus) { | ||
const step = +key; | ||
let name = 'ROOT'; | ||
if (step !== 0) { | ||
const wid = asIndex(asDictionary(value).wid); | ||
name = (_b = Object.keys(asDictionary(summaryTelemetry[wid]))[0]) !== null && _b !== void 0 ? _b : 'ROOT'; | ||
} | ||
const [running, complete, error] = asString(indexableNewWorkerStatus[key]) | ||
.split(',') | ||
.map((componentID) => Math.max(0, +componentID)); // It's dodgy but assuming the componentID is a number happens all over the place | ||
results.push({ | ||
running, | ||
complete, | ||
error, | ||
step, | ||
name, | ||
}); | ||
} | ||
} | ||
this.experimentalWorkerStatus$.next(results); | ||
} | ||
}); | ||
} | ||
catch (err) { | ||
if (err instanceof Error) { | ||
this.log.error(`Failed to request JWT for websocket`, err.message); | ||
} | ||
throw err; | ||
} | ||
const socket = new Socket(jwt, this.config.options); | ||
const { id_workflow_instance: id } = this.config.instance; | ||
if (id) { | ||
socket.watch(`workflow_instance:state:${id}`, this.updateWorkerStatus); | ||
} | ||
this.socket = socket; | ||
return socket; | ||
return this.mySocket; | ||
} | ||
async realtimeFeedback(channel, object) { | ||
const socket = await this.getSocket(); | ||
const socket = await this.socket(); | ||
socket.emit(channel, object); | ||
@@ -140,13 +127,2 @@ } | ||
} | ||
async stopEverything() { | ||
await this.stopAnalysis(); | ||
this.stopTransferTimers(); | ||
this.stopVisibilityTimers(); | ||
this.stopDownloadWorkers(); | ||
if (this.socket) { | ||
this.socket.destroy(); | ||
} | ||
this.stopTimer('summaryTelemetryInterval'); | ||
this.stopTimer('downloadCheckInterval'); | ||
} | ||
async stopAnalysis() { | ||
@@ -158,19 +134,18 @@ // If we stop the cloud, there's no point uploading anymore | ||
const { id_workflow_instance: idWorkflowInstance } = this.config.instance; | ||
if (!idWorkflowInstance) { | ||
return; | ||
} | ||
try { | ||
if (this.config.options.useGraphQL) { | ||
await this.graphQL.stopWorkflow({ variables: { idWorkflowInstance } }); | ||
if (idWorkflowInstance) { | ||
try { | ||
// TODO: Convert to GQL and switch on class-wide flag | ||
if (this.config.options.useGraphQL) { | ||
await this.graphQL.stopWorkflow({ variables: { idWorkflowInstance } }); | ||
} | ||
else { | ||
await this.REST.stopWorkflow(idWorkflowInstance); | ||
} | ||
this.analyseState$.next(false); | ||
} | ||
else { | ||
await this.REST.stopWorkflow(idWorkflowInstance); | ||
catch (err) { | ||
throw wrapAndLogError('error stopping instance', err, this.log); | ||
} | ||
this.analyseState$.next(false); | ||
this.analyseState$.complete(); | ||
this.log.info(`workflow instance ${idWorkflowInstance} stopped`); | ||
} | ||
catch (err) { | ||
throw wrapAndLogError('error stopping instance', err, this.log); | ||
} | ||
this.log.info(`workflow instance ${idWorkflowInstance} stopped`); | ||
} | ||
@@ -182,5 +157,7 @@ stopUpload() { | ||
this.uploadState$.next(false); | ||
this.uploadState$.complete(); | ||
} | ||
stopTransferTimers() { | ||
async stopEverything() { | ||
this.stopAnalysis(); | ||
// Moved this out of the main stopUpload because we don't want to stop it when we stop uploading | ||
// This is really 'stop fetching reports' | ||
for (const key in this.timers.transferTimeouts) { | ||
@@ -195,4 +172,2 @@ this.log.debug(`clearing transferTimeout for ${key}`); | ||
} | ||
} | ||
stopVisibilityTimers() { | ||
for (const key in this.timers.visibilityIntervals) { | ||
@@ -206,4 +181,2 @@ this.log.debug(`clearing visibilityInterval for ${key}`); | ||
} | ||
} | ||
async stopDownloadWorkers() { | ||
if (this.downloadWorkerPool) { | ||
@@ -214,2 +187,4 @@ this.log.debug('clearing downloadWorkerPool'); | ||
} | ||
this.stopTimer('summaryTelemetryInterval'); | ||
this.stopTimer('downloadCheckInterval'); | ||
} | ||
@@ -216,0 +191,0 @@ reportProgress() { |
@@ -9,3 +9,3 @@ import { loadInputFiles } from './inputScanner'; | ||
import { niceSize } from './niceSize'; | ||
import { asNumber, asOptDictionary, asOptDictionaryOf, asOptString, assertDefined, isArray, isDefined, makeNumber, } from 'ts-runtime-typecheck'; | ||
import { asNumber, asOptDictionary, asOptDictionaryOf, asOptString, assertDefined, isArray, isDefined, isDictionary, isDictionaryOf, makeNumber, } from 'ts-runtime-typecheck'; | ||
import { first } from 'rxjs/operators'; | ||
@@ -16,7 +16,5 @@ import path from 'path'; | ||
import { getErrorMessage, NestedError, wrapAndLogError } from './NodeError'; | ||
import { FileIndex } from './FileIndex'; | ||
export function instantiateFileUpload(instance) { | ||
assertDefined(instance.config.workflow, 'Workflow'); | ||
const fileIndex = new FileIndex(); | ||
const splitFiles = new Set(); | ||
assertDefined(instance.db, 'Database'); | ||
const workflow = instance.config.workflow; | ||
@@ -28,2 +26,3 @@ const settings = readSettings(instance.config); | ||
} | ||
const { log: logger, db: database } = instance; | ||
const { inputFolders, outputFolder, filetype: filetypes } = instance.config.options; | ||
@@ -35,7 +34,6 @@ const { warnings, upload: state } = instance.states; | ||
warnings, | ||
fileIndex, | ||
splitFiles, | ||
database, | ||
instance, | ||
state, | ||
logger: instance.log, | ||
logger, | ||
hasStopped: false, | ||
@@ -46,3 +44,3 @@ stopped$, | ||
const scanner = createFileScanner({ | ||
fileIndex, | ||
database, | ||
inputFolders, | ||
@@ -82,5 +80,5 @@ filetypes, | ||
} | ||
export function createFileScanner({ fileIndex, inputFolders, outputFolder, filetypes, context, }) { | ||
const filter = (location) => { | ||
const exists = context.splitFiles.has(location) || fileIndex.has(location); | ||
export function createFileScanner({ database, inputFolders, outputFolder, filetypes, context, }) { | ||
const filter = async (location) => { | ||
const exists = await database.seenUpload(location); | ||
return !exists; | ||
@@ -160,9 +158,9 @@ }; | ||
export async function processFile(ctx, file) { | ||
const { settings, state, splitFiles, fileIndex } = ctx; | ||
const { settings, state, database } = ctx; | ||
if (settings.maxFiles <= state.filesCount) { | ||
skipFile(file, ctx, FileUploadWarnings.TOO_MANY); | ||
await skipFile(file, ctx, FileUploadWarnings.TOO_MANY); | ||
return; | ||
} | ||
if (file.size === 0) { | ||
skipFile(file, ctx, FileUploadWarnings.EMPTY); | ||
await skipFile(file, ctx, FileUploadWarnings.EMPTY); | ||
return; | ||
@@ -179,3 +177,3 @@ } | ||
let chunkId = 0; | ||
await splitter(file.path, split, splitFiles, async (chunkFile) => { | ||
await splitter(file.path, split, async (chunkFile) => { | ||
if (ctx.hasStopped) { | ||
@@ -185,2 +183,3 @@ // NOTE should be caught by the queue and discarded | ||
} | ||
await database.splitFile(chunkFile, file.path); | ||
chunkId += 1; | ||
@@ -198,6 +197,7 @@ const name = path.basename(chunkFile); | ||
await uploadJob(ctx, chunkFilestat); | ||
await database.splitDone(chunkFile); | ||
}, isCompressed); | ||
state.filesCount += 1; | ||
// mark the original file as done | ||
fileIndex.add(file.path); | ||
await database.uploadFile(file.path); | ||
return; | ||
@@ -207,12 +207,11 @@ } | ||
if (file.size > settings.maxFileSize) { | ||
skipFile(file, ctx, FileUploadWarnings.TOO_BIG); | ||
await skipFile(file, ctx, FileUploadWarnings.TOO_BIG); | ||
return; | ||
} | ||
await uploadJob(ctx, file); | ||
fileIndex.add(file.path); | ||
state.filesCount += 1; | ||
} | ||
export function skipFile(file, ctx, warn) { | ||
export async function skipFile(file, ctx, warn) { | ||
addFileWarning(file, ctx, warn); | ||
ctx.fileIndex.add(file.path); | ||
await ctx.database.skipFile(file.path); | ||
} | ||
@@ -411,2 +410,5 @@ export function addWarning(ctx, warn, msg) { | ||
let components; | ||
if (!isDictionaryOf(isDictionary)(workflowInstance.chain.components)) { | ||
throw new Error('Unexpected chain definition'); | ||
} | ||
try { | ||
@@ -471,3 +473,3 @@ components = JSON.parse(JSON.stringify(workflowInstance.chain.components)); // low-frills object clone | ||
async function uploadComplete(ctx, objectId, file) { | ||
const { instance, logger } = ctx; | ||
const { instance, database, logger } = ctx; | ||
logger.info(`${file.id} uploaded to S3: ${objectId}`); | ||
@@ -489,2 +491,3 @@ const messageId = await messageInputQueue(ctx, objectId, file); | ||
logger.info(`${file.id} SQS message sent. Mark as uploaded`); | ||
await database.uploadFile(file.path); | ||
} |
import io from 'socket.io-client'; | ||
import { isDictionary } from 'ts-runtime-typecheck'; | ||
import { createTimeout } from './timers'; | ||
import { wrapAndLogError } from './NodeError'; | ||
export default class Socket { | ||
constructor(jwt, opts) { | ||
constructor(rest, opts) { | ||
this.debounces = new Set(); | ||
const { url, log, debounceWindow } = opts; | ||
this.socket = io(url, { | ||
transportOptions: { | ||
polling: { | ||
extraHeaders: { | ||
Cookie: `x-epi2me-jwt=${jwt}`, | ||
this.debounceWindow = opts.debounceWindow; | ||
this.log = opts.log; | ||
this.initialise(rest, opts.url); | ||
} | ||
async initialise(rest, url) { | ||
try { | ||
const jwt = await rest.jwt(); | ||
this.socket = io(url, { | ||
transportOptions: { | ||
polling: { | ||
extraHeaders: { | ||
Cookie: `x-epi2me-jwt=${jwt}`, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}); | ||
this.socket.on('connect', () => log.debug('socket ready')); | ||
this.debounceWindow = debounceWindow; | ||
this.log = log; | ||
}); | ||
this.socket.on('connect', () => { | ||
this.log.debug('socket ready'); | ||
}); | ||
} | ||
catch (err) { | ||
wrapAndLogError('socket connection failed - JWT authentication error', err, this.log); | ||
} | ||
} | ||
destroy() { | ||
this.socket.disconnect(); | ||
} | ||
debounce(data, func) { | ||
@@ -33,11 +41,29 @@ if (isDictionary(data) && '_uuid' in data) { | ||
} | ||
func(data); | ||
if (func) { | ||
func(data); | ||
} | ||
} | ||
watch(chan, func) { | ||
this.socket.on(chan, (data) => this.debounce(data, func)); | ||
if (!this.socket) { | ||
this.log.debug(`socket not ready. requeueing watch on ${chan}`); | ||
setTimeout(() => { | ||
this.watch(chan, func); | ||
}, 1000); | ||
return; | ||
} | ||
this.socket.on(chan, (data) => { | ||
return this.debounce(data, func); | ||
}); | ||
} | ||
emit(chan, data) { | ||
this.log.debug(`socket emit ${chan}`, data); | ||
if (!this.socket) { | ||
this.log.debug(`socket not ready. requeueing emit on ${chan}`); | ||
setTimeout(() => { | ||
this.emit(chan, data); | ||
}, 1000); | ||
return; | ||
} | ||
this.log.debug(`socket emit ${chan} ${JSON.stringify(data)}`); | ||
this.socket.emit(chan, data); | ||
} | ||
} |
@@ -47,3 +47,3 @@ var __asyncValues = (this && this.__asyncValues) || function (o) { | ||
*/ | ||
export async function completeChunk(chunk, index, handler) { | ||
export async function completeChunk(chunk, handler) { | ||
chunk.writer.end(); | ||
@@ -58,3 +58,2 @@ await chunk.closed; | ||
await fs.promises.unlink(chunk.location); | ||
index.delete(chunk.location); | ||
} | ||
@@ -91,5 +90,4 @@ catch (err) { | ||
} | ||
export function createChunk(prefix, suffix, index, id, isCompressed) { | ||
export function createChunk(prefix, suffix, id, isCompressed) { | ||
const location = constructChunkLocation(prefix, suffix, id); | ||
index.add(location); | ||
const { writer, closed } = createWriteStream(location, isCompressed); | ||
@@ -108,3 +106,3 @@ const chunk = { | ||
} | ||
export async function splitter(filePath, opts, index, handler, isCompressed) { | ||
export async function splitter(filePath, opts, handler, isCompressed) { | ||
var e_1, _a; | ||
@@ -145,3 +143,3 @@ // defaulting these values to infinity simplifies our checks | ||
chunkCounter += 1; | ||
chunk = createChunk(prefix, suffix, index, chunkCounter, isCompressed); | ||
chunk = createChunk(prefix, suffix, chunkCounter, isCompressed); | ||
} | ||
@@ -154,3 +152,3 @@ writeToChunk(chunk, lineBuffer); | ||
if (exceedsBytes || exceedsReads) { | ||
await completeChunk(chunk, index, handler); | ||
await completeChunk(chunk, handler); | ||
chunk = null; | ||
@@ -173,4 +171,4 @@ } | ||
if (chunk) { | ||
await completeChunk(chunk, index, handler); | ||
await completeChunk(chunk, handler); | ||
} | ||
} |
@@ -7,2 +7,3 @@ import axios from 'axios'; | ||
import { NoopLogger } from './Logger'; | ||
import { NestedError } from './NodeError'; | ||
axios.defaults.validateStatus = (status) => status <= 504; // Reject only if the status code is greater than or equal to 500 | ||
@@ -214,3 +215,17 @@ export const utils = (function magic() { | ||
}, | ||
convertResponseToObject(data) { | ||
if (typeof data === 'object') { | ||
// already parsed | ||
return data; | ||
} | ||
else { | ||
try { | ||
return JSON.parse(data); | ||
} | ||
catch (err) { | ||
throw new NestedError('exception parsing chain JSON', err); | ||
} | ||
} | ||
}, | ||
}; | ||
})(); |
@@ -12,3 +12,3 @@ { | ||
"private": false, | ||
"version": "5.1.6028772", | ||
"version": "5.1.6058802", | ||
"main": "cjs/index-web.js", | ||
@@ -15,0 +15,0 @@ "module": "esm/index-web.js", |
@@ -49,6 +49,3 @@ import type { Index, Dictionary, UnknownFunction } from 'ts-runtime-typecheck'; | ||
telemetryNames?: Dictionary<Dictionary<string>>; | ||
chain?: { | ||
components: Dictionary<Dictionary>; | ||
targetComponentId: unknown; | ||
}; | ||
chain?: Dictionary; | ||
key_id?: string; | ||
@@ -55,0 +52,0 @@ awssettings: { |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
3058178
214
28325
155