lib-task-scheduler
Advanced tools
Comparing version 2.0.85 to 3.0.92
@@ -12,2 +12,7 @@ export * from './source_sink_task'; | ||
import { Logger } from './logger'; | ||
export declare class DelayError extends Error { | ||
readonly delaySecs: number; | ||
readonly reason: string; | ||
constructor(delaySecs: number, reason: string); | ||
} | ||
export interface Task { | ||
@@ -14,0 +19,0 @@ /** |
@@ -26,2 +26,10 @@ "use strict"; | ||
const logger_1 = require("./logger"); | ||
class DelayError extends Error { | ||
constructor(delaySecs, reason) { | ||
super(`Delay for ${delaySecs} seconds because ${reason}.`); | ||
this.delaySecs = delaySecs; | ||
this.reason = reason; | ||
} | ||
} | ||
exports.DelayError = DelayError; | ||
const DEFAULT_CONFIG = { | ||
@@ -28,0 +36,0 @@ errorHandler: (err) => { |
@@ -11,2 +11,3 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const _1 = require("./"); | ||
const sns_sqs_subscriber_1 = require("./sns_sqs_subscriber"); | ||
@@ -39,5 +40,12 @@ /* | ||
catch (error) { | ||
this.log.error(`Unable to handle message: ${JSON.stringify(sqsMessage)}`, error); | ||
yield this.backoff(sqsMessage); | ||
throw error; | ||
if (error instanceof _1.DelayError) { | ||
yield this.delay(sqsMessage, error.delaySecs); | ||
this.log.info(`Delaying message for ${error.delaySecs}. Reason: ${error.reason}`); | ||
return true; // short circuit to skip message deletion. | ||
} | ||
else { | ||
this.log.error(`Unable to handle message: ${JSON.stringify(sqsMessage)}`, error); | ||
yield this.backoff(sqsMessage); | ||
throw error; | ||
} | ||
} | ||
@@ -44,0 +52,0 @@ } |
@@ -15,3 +15,4 @@ import { SNSSQSSubscriber, SNSSQSSubscriberConfig } from './sns_sqs_subscriber'; | ||
backoff(message: SNSMessage): Promise<number>; | ||
delay(message: SNSMessage, delaySecs: number): Promise<number>; | ||
isReady(): boolean; | ||
} |
@@ -44,2 +44,7 @@ "use strict"; | ||
} | ||
delay(message, delaySecs) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
return this.sqsQueue.delay(message, delaySecs); | ||
}); | ||
} | ||
isReady() { | ||
@@ -46,0 +51,0 @@ return this.sqsQueue.isReady(); |
@@ -32,4 +32,6 @@ import { AwsCredentialsConfig } from './clients/aws'; | ||
backoff(message: SQSMessage): Promise<number>; | ||
delay(message: SQSMessage, delaySecs: number): Promise<number>; | ||
subscribeQueueToTopic(): Promise<string[]>; | ||
isReady(): boolean; | ||
private getReceiveCount(message); | ||
} |
@@ -50,6 +50,12 @@ "use strict"; | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const receiveCount = parseInt(message.Attributes['ApproximateReceiveCount']); | ||
const receiveCount = this.getReceiveCount(message); | ||
const nextIntervalSecs = Math.min(receiveCount * this.baseRetryIntervalSec, this.maxRetryIntervalSec); | ||
return yield this.delay(message, nextIntervalSecs); | ||
}); | ||
} | ||
delay(message, delaySecs) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const receiveCount = this.getReceiveCount(message); | ||
const receiptHandle = message.ReceiptHandle; | ||
yield this.getSqsClient().setMessageVisibilitySeconds(receiptHandle, nextIntervalSecs); | ||
yield this.getSqsClient().setMessageVisibilitySeconds(receiptHandle, delaySecs); | ||
return receiveCount; | ||
@@ -70,4 +76,7 @@ }); | ||
} | ||
getReceiveCount(message) { | ||
return parseInt(message.Attributes['ApproximateReceiveCount']); | ||
} | ||
} | ||
exports.SNSSQSSubscriber = SNSSQSSubscriber; | ||
//# sourceMappingURL=sns_sqs_subscriber.js.map |
@@ -13,2 +13,3 @@ "use strict"; | ||
const bPromise = require("bluebird"); | ||
const _1 = require("./"); | ||
const logger_1 = require("./logger"); | ||
@@ -45,8 +46,13 @@ const util_1 = require("./util"); | ||
catch (error) { | ||
const attemptCount = yield this.config.source.backoff(message); | ||
if (this.config.errorHandler) { | ||
this.config.errorHandler(error, this.log, attemptCount, message); | ||
if (error instanceof _1.DelayError) { | ||
yield this.config.source.delay(message, error.delaySecs); | ||
} | ||
else { | ||
this.log.error(`Error processing message ${message.id}`, error); | ||
const attemptCount = yield this.config.source.backoff(message); | ||
if (this.config.errorHandler) { | ||
this.config.errorHandler(error, this.log, attemptCount, message); | ||
} | ||
else { | ||
this.log.error(`Error processing message ${message.id}`, error); | ||
} | ||
} | ||
@@ -53,0 +59,0 @@ } |
@@ -21,2 +21,10 @@ import { Task, Guard } from './'; | ||
backoff(message: T): Promise<number>; | ||
/** | ||
* Called when a message should be delayed for processing for some amount of time. The | ||
* message should not be redelivered until this amount of time has elapsed. | ||
* | ||
* This returns the number of times a message has been attempted (including the latest | ||
* attempt). | ||
*/ | ||
delay(message: T, delaySecs: number): Promise<number>; | ||
} | ||
@@ -23,0 +31,0 @@ export interface MessageSink<T extends Message> { |
@@ -12,2 +12,3 @@ "use strict"; | ||
const _ = require("lodash"); | ||
const _1 = require("./"); | ||
const logger_1 = require("./logger"); | ||
@@ -45,4 +46,10 @@ class SourceSinkTask { | ||
catch (error) { | ||
yield this.config.source.backoff(incoming); | ||
throw error; | ||
if (error instanceof _1.DelayError) { | ||
yield this.config.source.delay(incoming, error.delaySecs); | ||
return true; | ||
} | ||
else { | ||
yield this.config.source.backoff(incoming); | ||
throw error; | ||
} | ||
} | ||
@@ -49,0 +56,0 @@ }); |
@@ -22,2 +22,3 @@ import { SQSClient, SQSClientConfig } from './clients/sqs'; | ||
backoff(message: T): Promise<number>; | ||
delay(message: T, delaySecs: number): Promise<number>; | ||
persist(payload: T): Promise<void>; | ||
@@ -31,3 +32,3 @@ deleteQueue(): Promise<void>; | ||
private invisible; | ||
setInvisible(message: T): void; | ||
setInvisible(message: T, seconds?: number): void; | ||
isVisible(message: T): boolean; | ||
@@ -38,2 +39,3 @@ getNext(): Promise<T>; | ||
backoff(_message: T): Promise<number>; | ||
delay(message: T, delaySecs: number): Promise<number>; | ||
} |
@@ -74,3 +74,9 @@ "use strict"; | ||
const nextIntervalSecs = Math.min(receiveCount * BASE_RETRY_INTERVAL_SECS, MAX_RETRY_INTERVAL_SECS); | ||
yield this.sqsClient.setMessageVisibilitySeconds(receiptHandle, nextIntervalSecs); | ||
return yield this.delay(message, nextIntervalSecs); | ||
}); | ||
} | ||
delay(message, delaySecs) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
const { receiptHandle, receiveCount } = this.messageMeta[message.id]; | ||
yield this.sqsClient.setMessageVisibilitySeconds(receiptHandle, delaySecs); | ||
delete this.messageMeta[message.id]; | ||
@@ -105,4 +111,4 @@ return receiveCount; | ||
} | ||
setInvisible(message) { | ||
this.invisible[message.id] = Date.now() + 1000; | ||
setInvisible(message, seconds = 1000) { | ||
this.invisible[message.id] = Date.now() + seconds; | ||
} | ||
@@ -141,4 +147,10 @@ isVisible(message) { | ||
} | ||
delay(message, delaySecs) { | ||
return __awaiter(this, void 0, void 0, function* () { | ||
this.setInvisible(message, delaySecs); | ||
return 1; | ||
}); | ||
} | ||
} | ||
exports.MemoryQueue = MemoryQueue; | ||
//# sourceMappingURL=sqs_queue.js.map |
@@ -12,3 +12,5 @@ "use strict"; | ||
const _ = require("lodash"); | ||
const sinon = require("sinon"); | ||
const src_1 = require("../../src"); | ||
const src_2 = require("../../src"); | ||
const helpers_1 = require("../helpers"); | ||
@@ -34,2 +36,3 @@ function sleep(durationMs) { | ||
backoff: sandbox.stub(), | ||
delay: sandbox.stub(), | ||
complete: sandbox.spy((message) => { | ||
@@ -45,3 +48,3 @@ _.remove(messages, m => m.id === message.id); | ||
}; | ||
sourceSinkRunner = new src_1.SourceSinkRunner({ source, sink, transformer, delayMs: 100 }); | ||
sourceSinkRunner = new src_2.SourceSinkRunner({ source, sink, transformer, delayMs: 100 }); | ||
}); | ||
@@ -103,3 +106,3 @@ it(`does not call transformer when there is no source message`, () => __awaiter(this, void 0, void 0, function* () { | ||
const errorHandler = sandbox.stub(); | ||
sourceSinkRunner = new src_1.SourceSinkRunner({ | ||
sourceSinkRunner = new src_2.SourceSinkRunner({ | ||
source, | ||
@@ -122,2 +125,7 @@ transformer, | ||
})); | ||
it('invokes the source delay method when transformer throws a DelayError', () => __awaiter(this, void 0, void 0, function* () { | ||
helpers_1.asStub(transformer.transform).throws(new src_1.DelayError(10, 'this is a test')); | ||
yield sourceSinkRunner.runOnce(); | ||
expect(source.delay).to.have.been.calledWith(sinon.match.object, 10); | ||
})); | ||
}); | ||
@@ -124,0 +132,0 @@ }); |
@@ -12,2 +12,3 @@ "use strict"; | ||
const sinon = require("sinon"); | ||
const src_1 = require("../../src"); | ||
const source_sink_task_1 = require("../../src/source_sink_task"); | ||
@@ -38,2 +39,3 @@ const helpers_1 = require("../helpers"); | ||
backoff: sandbox.stub(), | ||
delay: sandbox.stub(), | ||
}, | ||
@@ -144,2 +146,15 @@ sink: { | ||
}); | ||
context(`and the transform throws a delay`, () => { | ||
beforeEach(() => { | ||
helpers_1.asStub(config.transformer.transform).rejects(new src_1.DelayError(10, 'This is a test!')); | ||
}); | ||
it(`does not complete the message`, () => __awaiter(this, void 0, void 0, function* () { | ||
yield task.execute(); | ||
expect(config.source.complete).to.not.have.been.called; | ||
})); | ||
it(`delays the message`, () => __awaiter(this, void 0, void 0, function* () { | ||
yield task.execute(); | ||
expect(config.source.delay).to.have.been.calledWith(sinon.match.object, 10); | ||
})); | ||
}); | ||
context(`and the guard throws`, () => { | ||
@@ -146,0 +161,0 @@ beforeEach(() => { |
{ | ||
"name": "lib-task-scheduler", | ||
"version": "2.0.85", | ||
"version": "3.0.92", | ||
"description": "A lightweight, modular task scheduler.", | ||
@@ -5,0 +5,0 @@ "homepage": "https://github.com/convoyinc/lib-task-scheduler", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
156178
66
2302