Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

lib-task-scheduler

Package Overview
Dependencies
Maintainers
1
Versions
70
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

lib-task-scheduler - npm Package Compare versions

Comparing version 2.0.85 to 3.0.92

dist/test/unit/sns_sqs_consumer_task_spec.d.ts

5

dist/src/index.d.ts

@@ -12,2 +12,7 @@ export * from './source_sink_task';

import { Logger } from './logger';
export declare class DelayError extends Error {
readonly delaySecs: number;
readonly reason: string;
constructor(delaySecs: number, reason: string);
}
export interface Task {

@@ -14,0 +19,0 @@ /**

@@ -26,2 +26,10 @@ "use strict";

const logger_1 = require("./logger");
class DelayError extends Error {
constructor(delaySecs, reason) {
super(`Delay for ${delaySecs} seconds because ${reason}.`);
this.delaySecs = delaySecs;
this.reason = reason;
}
}
exports.DelayError = DelayError;
const DEFAULT_CONFIG = {

@@ -28,0 +36,0 @@ errorHandler: (err) => {

14

dist/src/sns_sqs_consumer_task.js

@@ -11,2 +11,3 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
const _1 = require("./");
const sns_sqs_subscriber_1 = require("./sns_sqs_subscriber");

@@ -39,5 +40,12 @@ /*

catch (error) {
this.log.error(`Unable to handle message: ${JSON.stringify(sqsMessage)}`, error);
yield this.backoff(sqsMessage);
throw error;
if (error instanceof _1.DelayError) {
yield this.delay(sqsMessage, error.delaySecs);
this.log.info(`Delaying message for ${error.delaySecs}. Reason: ${error.reason}`);
return true; // short circuit to skip message deletion.
}
else {
this.log.error(`Unable to handle message: ${JSON.stringify(sqsMessage)}`, error);
yield this.backoff(sqsMessage);
throw error;
}
}

@@ -44,0 +52,0 @@ }

@@ -15,3 +15,4 @@ import { SNSSQSSubscriber, SNSSQSSubscriberConfig } from './sns_sqs_subscriber';

backoff(message: SNSMessage): Promise<number>;
delay(message: SNSMessage, delaySecs: number): Promise<number>;
isReady(): boolean;
}

@@ -44,2 +44,7 @@ "use strict";

}
delay(message, delaySecs) {
return __awaiter(this, void 0, void 0, function* () {
return this.sqsQueue.delay(message, delaySecs);
});
}
isReady() {

@@ -46,0 +51,0 @@ return this.sqsQueue.isReady();

@@ -32,4 +32,6 @@ import { AwsCredentialsConfig } from './clients/aws';

backoff(message: SQSMessage): Promise<number>;
delay(message: SQSMessage, delaySecs: number): Promise<number>;
subscribeQueueToTopic(): Promise<string[]>;
isReady(): boolean;
private getReceiveCount(message);
}

@@ -50,6 +50,12 @@ "use strict";

return __awaiter(this, void 0, void 0, function* () {
const receiveCount = parseInt(message.Attributes['ApproximateReceiveCount']);
const receiveCount = this.getReceiveCount(message);
const nextIntervalSecs = Math.min(receiveCount * this.baseRetryIntervalSec, this.maxRetryIntervalSec);
return yield this.delay(message, nextIntervalSecs);
});
}
delay(message, delaySecs) {
return __awaiter(this, void 0, void 0, function* () {
const receiveCount = this.getReceiveCount(message);
const receiptHandle = message.ReceiptHandle;
yield this.getSqsClient().setMessageVisibilitySeconds(receiptHandle, nextIntervalSecs);
yield this.getSqsClient().setMessageVisibilitySeconds(receiptHandle, delaySecs);
return receiveCount;

@@ -70,4 +76,7 @@ });

}
getReceiveCount(message) {
return parseInt(message.Attributes['ApproximateReceiveCount']);
}
}
exports.SNSSQSSubscriber = SNSSQSSubscriber;
//# sourceMappingURL=sns_sqs_subscriber.js.map

@@ -13,2 +13,3 @@ "use strict";

const bPromise = require("bluebird");
const _1 = require("./");
const logger_1 = require("./logger");

@@ -45,8 +46,13 @@ const util_1 = require("./util");

catch (error) {
const attemptCount = yield this.config.source.backoff(message);
if (this.config.errorHandler) {
this.config.errorHandler(error, this.log, attemptCount, message);
if (error instanceof _1.DelayError) {
yield this.config.source.delay(message, error.delaySecs);
}
else {
this.log.error(`Error processing message ${message.id}`, error);
const attemptCount = yield this.config.source.backoff(message);
if (this.config.errorHandler) {
this.config.errorHandler(error, this.log, attemptCount, message);
}
else {
this.log.error(`Error processing message ${message.id}`, error);
}
}

@@ -53,0 +59,0 @@ }

@@ -21,2 +21,10 @@ import { Task, Guard } from './';

backoff(message: T): Promise<number>;
/**
* Called when a message should be delayed for processing for some amount of time. The
* message should not be redelivered until this amount of time has elapsed.
*
* This returns the number of times a message has been attempted (including the latest
* attempt).
*/
delay(message: T, delaySecs: number): Promise<number>;
}

@@ -23,0 +31,0 @@ export interface MessageSink<T extends Message> {

@@ -12,2 +12,3 @@ "use strict";

const _ = require("lodash");
const _1 = require("./");
const logger_1 = require("./logger");

@@ -45,4 +46,10 @@ class SourceSinkTask {

catch (error) {
yield this.config.source.backoff(incoming);
throw error;
if (error instanceof _1.DelayError) {
yield this.config.source.delay(incoming, error.delaySecs);
return true;
}
else {
yield this.config.source.backoff(incoming);
throw error;
}
}

@@ -49,0 +56,0 @@ });

@@ -22,2 +22,3 @@ import { SQSClient, SQSClientConfig } from './clients/sqs';

backoff(message: T): Promise<number>;
delay(message: T, delaySecs: number): Promise<number>;
persist(payload: T): Promise<void>;

@@ -31,3 +32,3 @@ deleteQueue(): Promise<void>;

private invisible;
setInvisible(message: T): void;
setInvisible(message: T, seconds?: number): void;
isVisible(message: T): boolean;

@@ -38,2 +39,3 @@ getNext(): Promise<T>;

backoff(_message: T): Promise<number>;
delay(message: T, delaySecs: number): Promise<number>;
}

@@ -74,3 +74,9 @@ "use strict";

const nextIntervalSecs = Math.min(receiveCount * BASE_RETRY_INTERVAL_SECS, MAX_RETRY_INTERVAL_SECS);
yield this.sqsClient.setMessageVisibilitySeconds(receiptHandle, nextIntervalSecs);
return yield this.delay(message, nextIntervalSecs);
});
}
delay(message, delaySecs) {
return __awaiter(this, void 0, void 0, function* () {
const { receiptHandle, receiveCount } = this.messageMeta[message.id];
yield this.sqsClient.setMessageVisibilitySeconds(receiptHandle, delaySecs);
delete this.messageMeta[message.id];

@@ -105,4 +111,4 @@ return receiveCount;

}
setInvisible(message) {
this.invisible[message.id] = Date.now() + 1000;
setInvisible(message, seconds = 1000) {
this.invisible[message.id] = Date.now() + seconds;
}

@@ -141,4 +147,10 @@ isVisible(message) {

}
delay(message, delaySecs) {
return __awaiter(this, void 0, void 0, function* () {
this.setInvisible(message, delaySecs);
return 1;
});
}
}
exports.MemoryQueue = MemoryQueue;
//# sourceMappingURL=sqs_queue.js.map

@@ -12,3 +12,5 @@ "use strict";

const _ = require("lodash");
const sinon = require("sinon");
const src_1 = require("../../src");
const src_2 = require("../../src");
const helpers_1 = require("../helpers");

@@ -34,2 +36,3 @@ function sleep(durationMs) {

backoff: sandbox.stub(),
delay: sandbox.stub(),
complete: sandbox.spy((message) => {

@@ -45,3 +48,3 @@ _.remove(messages, m => m.id === message.id);

};
sourceSinkRunner = new src_1.SourceSinkRunner({ source, sink, transformer, delayMs: 100 });
sourceSinkRunner = new src_2.SourceSinkRunner({ source, sink, transformer, delayMs: 100 });
});

@@ -103,3 +106,3 @@ it(`does not call transformer when there is no source message`, () => __awaiter(this, void 0, void 0, function* () {

const errorHandler = sandbox.stub();
sourceSinkRunner = new src_1.SourceSinkRunner({
sourceSinkRunner = new src_2.SourceSinkRunner({
source,

@@ -122,2 +125,7 @@ transformer,

}));
it('invokes the source delay method when transformer throws a DelayError', () => __awaiter(this, void 0, void 0, function* () {
helpers_1.asStub(transformer.transform).throws(new src_1.DelayError(10, 'this is a test'));
yield sourceSinkRunner.runOnce();
expect(source.delay).to.have.been.calledWith(sinon.match.object, 10);
}));
});

@@ -124,0 +132,0 @@ });

@@ -12,2 +12,3 @@ "use strict";

const sinon = require("sinon");
const src_1 = require("../../src");
const source_sink_task_1 = require("../../src/source_sink_task");

@@ -38,2 +39,3 @@ const helpers_1 = require("../helpers");

backoff: sandbox.stub(),
delay: sandbox.stub(),
},

@@ -144,2 +146,15 @@ sink: {

});
context(`and the transform throws a delay`, () => {
beforeEach(() => {
helpers_1.asStub(config.transformer.transform).rejects(new src_1.DelayError(10, 'This is a test!'));
});
it(`does not complete the message`, () => __awaiter(this, void 0, void 0, function* () {
yield task.execute();
expect(config.source.complete).to.not.have.been.called;
}));
it(`delays the message`, () => __awaiter(this, void 0, void 0, function* () {
yield task.execute();
expect(config.source.delay).to.have.been.calledWith(sinon.match.object, 10);
}));
});
context(`and the guard throws`, () => {

@@ -146,0 +161,0 @@ beforeEach(() => {

{
"name": "lib-task-scheduler",
"version": "2.0.85",
"version": "3.0.92",
"description": "A lightweight, modular task scheduler.",

@@ -5,0 +5,0 @@ "homepage": "https://github.com/convoyinc/lib-task-scheduler",

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc