New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@5minds/processcube_engine_client

Package Overview
Dependencies
Maintainers
0
Versions
711
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@5minds/processcube_engine_client - npm Package Compare versions

Comparing version 5.1.10-hotfix-249644-m73efwy3 to 5.1.10-hotfix-d73948-m767pifq

103

dist/commonjs/ExternalTaskExecution.js

@@ -12,11 +12,9 @@ "use strict";

topic;
abortSignal;
abortExecutionController = new AbortController();
customErrorHandler;
interval;
abortSignalSubscription;
awaitAbortSignal;
abortSignalResolver;
identityAccessor;
logger;
constructor(externalTask, processingFunction, externalTaskClient, config, topic, abortSignal, customErrorHandler) {
isDisposed = false;
constructor(externalTask, processingFunction, externalTaskClient, config, topic, customErrorHandler) {
this.externalTask = externalTask;

@@ -28,3 +26,2 @@ this.processingFunction = processingFunction;

this.topic = topic;
this.abortSignal = abortSignal;
this.customErrorHandler = customErrorHandler;

@@ -37,13 +34,17 @@ this.logger = new processcube_engine_sdk_1.Logger('external_task_execution', {

}
get disposed() {
return this.isDisposed;
}
async execute() {
try {
this.abortSignal.throwIfAborted();
this.startAbortSignalSubscription();
this.startExtendLockInterval();
const result = await Promise.race([this.processingFunction(this.externalTask.payload, this.externalTask, this.abortSignal), this.awaitAbortSignal]);
this.abortSignal.throwIfAborted();
this.stopLockingInterval();
const result = await this.processingFunction(this.externalTask.payload, this.externalTask, this.abortExecutionController.signal);
this.stopExtendLockInterval();
if (this.abortExecutionController.signal.aborted) {
return;
}
if (result) {
await this.processResult(result);
}
this.dispose();
}

@@ -54,20 +55,34 @@ catch (error) {

await this.handleExternalTaskExecutionError(error);
}
finally {
this.dispose();
}
}
abort() {
this.logger.debug(`Abort Signal has been triggered for External Task Executor ${this.externalTask.id}`);
this.dispose();
}
async dispose() {
this.logger.debug(`Disposong Executor for External Task ${this.externalTask.id}`);
this.stopExtendLockInterval();
if (!this.abortExecutionController.signal.aborted) {
this.abortExecutionController.abort();
}
this.isDisposed = true;
// we explicitly do not dispose the identityAccessor here, since it is shared with the ExternalTaskWorker
}
async processResult(result) {
if (this.abortSignal.aborted) {
if (this.abortExecutionController.signal.aborted) {
return;
}
this.logger.trace(`Processing External Task Result ${this.externalTask.id}`, { externalTask: this.externalTask, result: result });
try {
if (result instanceof processcube_engine_sdk_1.ExternalTaskError || result instanceof Error) {
this.logger.trace(`External Task Result ${this.externalTask.id} is an error. The task will be finished as failed.`, { externalTask: this.externalTask, result: result });
return await this.handleExternalTaskExecutionError(result);
}
return await this.externalTaskClient.finishExternalTask(this.config.workerId, this.externalTask.id, result, await this.identityAccessor.getIdentity(), this.abortSignal);
this.logger.trace(`Finishing ${this.externalTask.id} regularly.`, { externalTask: this.externalTask, result: result });
return await this.externalTaskClient.finishExternalTask(this.config.workerId, this.externalTask.id, result, await this.identityAccessor.getIdentity(), this.abortExecutionController.signal);
}
catch (error) {
const errorIsRetryable = await this.evaluateError('finishExternalTask', error);
if (errorIsRetryable && !this.abortSignal.aborted) {
if (errorIsRetryable && !this.abortExecutionController.signal.aborted) {
await this.sleep(1000);

@@ -79,6 +94,7 @@ return await this.processResult(result);

async handleExternalTaskExecutionError(error) {
if (this.abortSignal.aborted) {
if (this.abortExecutionController.signal.aborted) {
return;
}
try {
this.logger.trace(`Reporting Error for External Task ${this.externalTask.id} to Engine`);
const workerError = new processcube_engine_sdk_1.ExternalTaskError(error?.errorCode || error?.name || 'ExternalTaskExecutionError', error?.errorMessage || error?.message || 'An error occurred while processing the external task.', error?.errorDetails || error?.stack || 'No error details available.');

@@ -89,3 +105,3 @@ return await this.externalTaskClient.handleError(this.config.workerId, this.externalTask.id, workerError, await this.identityAccessor.getIdentity());

const errorIsRetryable = await this.evaluateError('finishExternalTask', error);
if (errorIsRetryable && !this.abortSignal.aborted) {
if (errorIsRetryable && !this.abortExecutionController.signal.aborted) {
await this.sleep(1000);

@@ -96,12 +112,18 @@ return await this.handleExternalTaskExecutionError(error);

}
startExtendLockInterval() {
const lockExtensionBuffer = 5000;
this.logger.trace(`Starting automatic Lock extension for External Task ${this.externalTask.id}`);
this.interval = setInterval(async () => this.extendLocks(this.externalTask), this.config.lockDuration - lockExtensionBuffer);
}
async extendLocks(externalTask) {
if (this.abortSignal.aborted) {
if (this.abortExecutionController.signal.aborted) {
return;
}
try {
await this.externalTaskClient.extendLock(this.config.workerId, externalTask.id, this.config.lockDuration, await this.identityAccessor.getIdentity(), this.abortSignal);
this.logger.debug(`Extending Lock on External Task ${this.externalTask.id}`);
await this.externalTaskClient.extendLock(this.config.workerId, externalTask.id, this.config.lockDuration, await this.identityAccessor.getIdentity(), this.abortExecutionController.signal);
}
catch (error) {
const errorIsRetryable = await this.evaluateError('extendLock', error);
if (errorIsRetryable && !this.abortSignal.aborted) {
if (errorIsRetryable && !this.abortExecutionController.signal.aborted) {
await this.sleep(1000);

@@ -112,8 +134,4 @@ await this.extendLocks(externalTask);

}
async dispose() {
this.removeAbortSignalSubscription();
this.stopLockingInterval();
// we explicitly do not dispose the identityAccessor here, since it is shared with the ExternalTaskWorker
}
stopLockingInterval() {
stopExtendLockInterval() {
this.logger.debug(`Disposing automatic Lock Extension for External Task ${this.externalTask.id}`);
if (this.interval) {

@@ -124,28 +142,15 @@ clearInterval(this.interval);

}
startAbortSignalSubscription() {
this.awaitAbortSignal = new Promise((resolve) => (this.abortSignalResolver = resolve));
this.abortSignalSubscription = () => {
if (this.abortSignalResolver) {
this.abortSignalResolver();
}
this.dispose();
};
this.abortSignal.addEventListener('abort', this.abortSignalSubscription);
}
removeAbortSignalSubscription() {
if (this.abortSignalSubscription) {
this.abortSignal.removeEventListener('abort', this.abortSignalSubscription);
async evaluateError(errorType, error) {
if (this.abortExecutionController.signal.aborted) {
return false;
}
if (this.abortSignalResolver) {
this.abortSignalResolver();
}
}
startExtendLockInterval() {
const lockExtensionBuffer = 5000;
this.interval = setInterval(async () => this.extendLocks(this.externalTask), this.config.lockDuration - lockExtensionBuffer);
}
async evaluateError(errorType, error) {
if (typeof this.customErrorHandler === 'function') {
this.logger.trace(`Posting error for external task ${this.externalTask.id} to subscribed custom error handlers`, { errorType: errorType, err: error });
this.customErrorHandler(errorType, error, this.externalTask);
}
if (error.code === 409) {
this.logger.warn(`Failed to run ${errorType} for External Task ${this.externalTask.id} because the task has already been finished. Aborting now, since there is nothing more to do here.`, { err: error });
this.abort();
return false;
}
this.logger.warn(`An error occured while executing ${errorType} for External Task ${this.externalTask.id}`, { err: error });

@@ -152,0 +157,0 @@ if (error.message?.match(/expired/i) && this.identityAccessor.canIdentityBeRefreshed()) {

@@ -58,2 +58,3 @@ "use strict";

abortSignal;
activelyProcessedExternalTasks = {};
constructor(engineUrl, topic, processingFunction, config) {

@@ -101,2 +102,6 @@ this._workerId = config?.workerId ?? uuid.v4();

this.abortSignal = this.abortController.signal;
this.abortSignal.onabort = () => {
Object.values(this.activelyProcessedExternalTasks).forEach((executor) => executor.abort());
this.activelyProcessedExternalTasks = {};
};
this.processExternalTasks();

@@ -110,3 +115,10 @@ }

}
abortExternalTaskIfPresent(externalTaskId) {
if (this.activelyProcessedExternalTasks[externalTaskId] != null) {
this.activelyProcessedExternalTasks[externalTaskId].abort();
delete this.activelyProcessedExternalTasks[externalTaskId];
}
}
dispose() {
this.stop();
this.externalTaskClient.dispose();

@@ -168,3 +180,4 @@ this.identityAccessor?.dispose();

workerId: this.workerId,
}, this.topic, this.abortSignal, this.customErrorHandler);
}, this.topic, this.customErrorHandler);
this.activelyProcessedExternalTasks[externalTask.id] = externalTaskExecution;
await externalTaskExecution.execute();

@@ -178,2 +191,5 @@ }

}
finally {
delete this.activelyProcessedExternalTasks[externalTask.id];
}
}

@@ -180,0 +196,0 @@ async sleep(milliseconds) {

@@ -10,22 +10,20 @@ import { ExternalTask } from '@5minds/processcube_engine_sdk';

private topic;
private abortSignal;
private abortExecutionController;
private customErrorHandler?;
private interval;
private abortSignalSubscription;
private awaitAbortSignal;
private abortSignalResolver;
private identityAccessor;
private logger;
constructor(externalTask: ExternalTask<TExternalTaskPayload>, processingFunction: HandleExternalTaskAction<TExternalTaskPayload, TResultPayload>, externalTaskClient: ExternalTaskApiClient, config: IExternalTaskWorkerConfig, topic: string, abortSignal: AbortSignal, customErrorHandler?: WorkerErrorHandler);
private isDisposed;
constructor(externalTask: ExternalTask<TExternalTaskPayload>, processingFunction: HandleExternalTaskAction<TExternalTaskPayload, TResultPayload>, externalTaskClient: ExternalTaskApiClient, config: IExternalTaskWorkerConfig, topic: string, customErrorHandler?: WorkerErrorHandler);
get disposed(): boolean;
execute(): Promise<void>;
abort(): void;
private dispose;
private processResult;
private handleExternalTaskExecutionError;
private startExtendLockInterval;
private extendLocks;
private dispose;
private stopLockingInterval;
private startAbortSignalSubscription;
private removeAbortSignalSubscription;
private startExtendLockInterval;
private stopExtendLockInterval;
private evaluateError;
private sleep;
}

@@ -18,2 +18,3 @@ import { Identity } from '@5minds/processcube_engine_sdk';

private abortSignal;
private activelyProcessedExternalTasks;
constructor(engineUrl: string, topic: string, processingFunction: HandleExternalTaskAction<TExternalTaskPayload, TResultPayload>, config?: IExternalTaskWorkerConfig);

@@ -35,2 +36,3 @@ /**

stop(): void;
abortExternalTaskIfPresent(externalTaskId: string): void;
dispose(): void;

@@ -37,0 +39,0 @@ private processExternalTasks;

@@ -7,3 +7,3 @@ {

"name": "@5minds/processcube_engine_client",
"version": "5.1.10-hotfix-249644-m73efwy3",
"version": "5.1.10-hotfix-d73948-m767pifq",
"description": "Contains a typescript based client for accessing the Engine.",

@@ -10,0 +10,0 @@ "main": "dist/commonjs/index.js",

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc