@5minds/processcube_engine_client
Advanced tools
Comparing version 5.1.10-hotfix-249644-m73efwy3 to 5.1.10-hotfix-d73948-m767pifq
@@ -12,11 +12,9 @@ "use strict"; | ||
topic; | ||
abortSignal; | ||
abortExecutionController = new AbortController(); | ||
customErrorHandler; | ||
interval; | ||
abortSignalSubscription; | ||
awaitAbortSignal; | ||
abortSignalResolver; | ||
identityAccessor; | ||
logger; | ||
constructor(externalTask, processingFunction, externalTaskClient, config, topic, abortSignal, customErrorHandler) { | ||
isDisposed = false; | ||
constructor(externalTask, processingFunction, externalTaskClient, config, topic, customErrorHandler) { | ||
this.externalTask = externalTask; | ||
@@ -28,3 +26,2 @@ this.processingFunction = processingFunction; | ||
this.topic = topic; | ||
this.abortSignal = abortSignal; | ||
this.customErrorHandler = customErrorHandler; | ||
@@ -37,13 +34,17 @@ this.logger = new processcube_engine_sdk_1.Logger('external_task_execution', { | ||
} | ||
get disposed() { | ||
return this.isDisposed; | ||
} | ||
async execute() { | ||
try { | ||
this.abortSignal.throwIfAborted(); | ||
this.startAbortSignalSubscription(); | ||
this.startExtendLockInterval(); | ||
const result = await Promise.race([this.processingFunction(this.externalTask.payload, this.externalTask, this.abortSignal), this.awaitAbortSignal]); | ||
this.abortSignal.throwIfAborted(); | ||
this.stopLockingInterval(); | ||
const result = await this.processingFunction(this.externalTask.payload, this.externalTask, this.abortExecutionController.signal); | ||
this.stopExtendLockInterval(); | ||
if (this.abortExecutionController.signal.aborted) { | ||
return; | ||
} | ||
if (result) { | ||
await this.processResult(result); | ||
} | ||
this.dispose(); | ||
} | ||
@@ -54,20 +55,34 @@ catch (error) { | ||
await this.handleExternalTaskExecutionError(error); | ||
} | ||
finally { | ||
this.dispose(); | ||
} | ||
} | ||
abort() { | ||
this.logger.debug(`Abort Signal has been triggered for External Task Executor ${this.externalTask.id}`); | ||
this.dispose(); | ||
} | ||
async dispose() { | ||
this.logger.debug(`Disposong Executor for External Task ${this.externalTask.id}`); | ||
this.stopExtendLockInterval(); | ||
if (!this.abortExecutionController.signal.aborted) { | ||
this.abortExecutionController.abort(); | ||
} | ||
this.isDisposed = true; | ||
// we explicitly do not dispose the identityAccessor here, since it is shared with the ExternalTaskWorker | ||
} | ||
async processResult(result) { | ||
if (this.abortSignal.aborted) { | ||
if (this.abortExecutionController.signal.aborted) { | ||
return; | ||
} | ||
this.logger.trace(`Processing External Task Result ${this.externalTask.id}`, { externalTask: this.externalTask, result: result }); | ||
try { | ||
if (result instanceof processcube_engine_sdk_1.ExternalTaskError || result instanceof Error) { | ||
this.logger.trace(`External Task Result ${this.externalTask.id} is an error. The task will be finished as failed.`, { externalTask: this.externalTask, result: result }); | ||
return await this.handleExternalTaskExecutionError(result); | ||
} | ||
return await this.externalTaskClient.finishExternalTask(this.config.workerId, this.externalTask.id, result, await this.identityAccessor.getIdentity(), this.abortSignal); | ||
this.logger.trace(`Finishing ${this.externalTask.id} regularly.`, { externalTask: this.externalTask, result: result }); | ||
return await this.externalTaskClient.finishExternalTask(this.config.workerId, this.externalTask.id, result, await this.identityAccessor.getIdentity(), this.abortExecutionController.signal); | ||
} | ||
catch (error) { | ||
const errorIsRetryable = await this.evaluateError('finishExternalTask', error); | ||
if (errorIsRetryable && !this.abortSignal.aborted) { | ||
if (errorIsRetryable && !this.abortExecutionController.signal.aborted) { | ||
await this.sleep(1000); | ||
@@ -79,6 +94,7 @@ return await this.processResult(result); | ||
async handleExternalTaskExecutionError(error) { | ||
if (this.abortSignal.aborted) { | ||
if (this.abortExecutionController.signal.aborted) { | ||
return; | ||
} | ||
try { | ||
this.logger.trace(`Reporting Error for External Task ${this.externalTask.id} to Engine`); | ||
const workerError = new processcube_engine_sdk_1.ExternalTaskError(error?.errorCode || error?.name || 'ExternalTaskExecutionError', error?.errorMessage || error?.message || 'An error occurred while processing the external task.', error?.errorDetails || error?.stack || 'No error details available.'); | ||
@@ -89,3 +105,3 @@ return await this.externalTaskClient.handleError(this.config.workerId, this.externalTask.id, workerError, await this.identityAccessor.getIdentity()); | ||
const errorIsRetryable = await this.evaluateError('finishExternalTask', error); | ||
if (errorIsRetryable && !this.abortSignal.aborted) { | ||
if (errorIsRetryable && !this.abortExecutionController.signal.aborted) { | ||
await this.sleep(1000); | ||
@@ -96,12 +112,18 @@ return await this.handleExternalTaskExecutionError(error); | ||
} | ||
startExtendLockInterval() { | ||
const lockExtensionBuffer = 5000; | ||
this.logger.trace(`Starting automatic Lock extension for External Task ${this.externalTask.id}`); | ||
this.interval = setInterval(async () => this.extendLocks(this.externalTask), this.config.lockDuration - lockExtensionBuffer); | ||
} | ||
async extendLocks(externalTask) { | ||
if (this.abortSignal.aborted) { | ||
if (this.abortExecutionController.signal.aborted) { | ||
return; | ||
} | ||
try { | ||
await this.externalTaskClient.extendLock(this.config.workerId, externalTask.id, this.config.lockDuration, await this.identityAccessor.getIdentity(), this.abortSignal); | ||
this.logger.debug(`Extending Lock on External Task ${this.externalTask.id}`); | ||
await this.externalTaskClient.extendLock(this.config.workerId, externalTask.id, this.config.lockDuration, await this.identityAccessor.getIdentity(), this.abortExecutionController.signal); | ||
} | ||
catch (error) { | ||
const errorIsRetryable = await this.evaluateError('extendLock', error); | ||
if (errorIsRetryable && !this.abortSignal.aborted) { | ||
if (errorIsRetryable && !this.abortExecutionController.signal.aborted) { | ||
await this.sleep(1000); | ||
@@ -112,8 +134,4 @@ await this.extendLocks(externalTask); | ||
} | ||
async dispose() { | ||
this.removeAbortSignalSubscription(); | ||
this.stopLockingInterval(); | ||
// we explicitly do not dispose the identityAccessor here, since it is shared with the ExternalTaskWorker | ||
} | ||
stopLockingInterval() { | ||
stopExtendLockInterval() { | ||
this.logger.debug(`Disposing automatic Lock Extension for External Task ${this.externalTask.id}`); | ||
if (this.interval) { | ||
@@ -124,28 +142,15 @@ clearInterval(this.interval); | ||
} | ||
startAbortSignalSubscription() { | ||
this.awaitAbortSignal = new Promise((resolve) => (this.abortSignalResolver = resolve)); | ||
this.abortSignalSubscription = () => { | ||
if (this.abortSignalResolver) { | ||
this.abortSignalResolver(); | ||
} | ||
this.dispose(); | ||
}; | ||
this.abortSignal.addEventListener('abort', this.abortSignalSubscription); | ||
} | ||
removeAbortSignalSubscription() { | ||
if (this.abortSignalSubscription) { | ||
this.abortSignal.removeEventListener('abort', this.abortSignalSubscription); | ||
async evaluateError(errorType, error) { | ||
if (this.abortExecutionController.signal.aborted) { | ||
return false; | ||
} | ||
if (this.abortSignalResolver) { | ||
this.abortSignalResolver(); | ||
} | ||
} | ||
startExtendLockInterval() { | ||
const lockExtensionBuffer = 5000; | ||
this.interval = setInterval(async () => this.extendLocks(this.externalTask), this.config.lockDuration - lockExtensionBuffer); | ||
} | ||
async evaluateError(errorType, error) { | ||
if (typeof this.customErrorHandler === 'function') { | ||
this.logger.trace(`Posting error for external task ${this.externalTask.id} to subscribed custom error handlers`, { errorType: errorType, err: error }); | ||
this.customErrorHandler(errorType, error, this.externalTask); | ||
} | ||
if (error.code === 409) { | ||
this.logger.warn(`Failed to run ${errorType} for External Task ${this.externalTask.id} because the task has already been finished. Aborting now, since there is nothing more to do here.`, { err: error }); | ||
this.abort(); | ||
return false; | ||
} | ||
this.logger.warn(`An error occured while executing ${errorType} for External Task ${this.externalTask.id}`, { err: error }); | ||
@@ -152,0 +157,0 @@ if (error.message?.match(/expired/i) && this.identityAccessor.canIdentityBeRefreshed()) { |
@@ -58,2 +58,3 @@ "use strict"; | ||
abortSignal; | ||
activelyProcessedExternalTasks = {}; | ||
constructor(engineUrl, topic, processingFunction, config) { | ||
@@ -101,2 +102,6 @@ this._workerId = config?.workerId ?? uuid.v4(); | ||
this.abortSignal = this.abortController.signal; | ||
this.abortSignal.onabort = () => { | ||
Object.values(this.activelyProcessedExternalTasks).forEach((executor) => executor.abort()); | ||
this.activelyProcessedExternalTasks = {}; | ||
}; | ||
this.processExternalTasks(); | ||
@@ -110,3 +115,10 @@ } | ||
} | ||
abortExternalTaskIfPresent(externalTaskId) { | ||
if (this.activelyProcessedExternalTasks[externalTaskId] != null) { | ||
this.activelyProcessedExternalTasks[externalTaskId].abort(); | ||
delete this.activelyProcessedExternalTasks[externalTaskId]; | ||
} | ||
} | ||
dispose() { | ||
this.stop(); | ||
this.externalTaskClient.dispose(); | ||
@@ -168,3 +180,4 @@ this.identityAccessor?.dispose(); | ||
workerId: this.workerId, | ||
}, this.topic, this.abortSignal, this.customErrorHandler); | ||
}, this.topic, this.customErrorHandler); | ||
this.activelyProcessedExternalTasks[externalTask.id] = externalTaskExecution; | ||
await externalTaskExecution.execute(); | ||
@@ -178,2 +191,5 @@ } | ||
} | ||
finally { | ||
delete this.activelyProcessedExternalTasks[externalTask.id]; | ||
} | ||
} | ||
@@ -180,0 +196,0 @@ async sleep(milliseconds) { |
@@ -10,22 +10,20 @@ import { ExternalTask } from '@5minds/processcube_engine_sdk'; | ||
private topic; | ||
private abortSignal; | ||
private abortExecutionController; | ||
private customErrorHandler?; | ||
private interval; | ||
private abortSignalSubscription; | ||
private awaitAbortSignal; | ||
private abortSignalResolver; | ||
private identityAccessor; | ||
private logger; | ||
constructor(externalTask: ExternalTask<TExternalTaskPayload>, processingFunction: HandleExternalTaskAction<TExternalTaskPayload, TResultPayload>, externalTaskClient: ExternalTaskApiClient, config: IExternalTaskWorkerConfig, topic: string, abortSignal: AbortSignal, customErrorHandler?: WorkerErrorHandler); | ||
private isDisposed; | ||
constructor(externalTask: ExternalTask<TExternalTaskPayload>, processingFunction: HandleExternalTaskAction<TExternalTaskPayload, TResultPayload>, externalTaskClient: ExternalTaskApiClient, config: IExternalTaskWorkerConfig, topic: string, customErrorHandler?: WorkerErrorHandler); | ||
get disposed(): boolean; | ||
execute(): Promise<void>; | ||
abort(): void; | ||
private dispose; | ||
private processResult; | ||
private handleExternalTaskExecutionError; | ||
private startExtendLockInterval; | ||
private extendLocks; | ||
private dispose; | ||
private stopLockingInterval; | ||
private startAbortSignalSubscription; | ||
private removeAbortSignalSubscription; | ||
private startExtendLockInterval; | ||
private stopExtendLockInterval; | ||
private evaluateError; | ||
private sleep; | ||
} |
@@ -18,2 +18,3 @@ import { Identity } from '@5minds/processcube_engine_sdk'; | ||
private abortSignal; | ||
private activelyProcessedExternalTasks; | ||
constructor(engineUrl: string, topic: string, processingFunction: HandleExternalTaskAction<TExternalTaskPayload, TResultPayload>, config?: IExternalTaskWorkerConfig); | ||
@@ -35,2 +36,3 @@ /** | ||
stop(): void; | ||
abortExternalTaskIfPresent(externalTaskId: string): void; | ||
dispose(): void; | ||
@@ -37,0 +39,0 @@ private processExternalTasks; |
@@ -7,3 +7,3 @@ { | ||
"name": "@5minds/processcube_engine_client", | ||
"version": "5.1.10-hotfix-249644-m73efwy3", | ||
"version": "5.1.10-hotfix-d73948-m767pifq", | ||
"description": "Contains a typescript based client for accessing the Engine.", | ||
@@ -10,0 +10,0 @@ "main": "dist/commonjs/index.js", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
362763
5152