lib-task-scheduler
Advanced tools
Comparing version 1.0.9 to 1.0.10
@@ -43,2 +43,3 @@ "use strict"; | ||
catch (error) { | ||
yield this.config.source.backoff(message); | ||
if (this.config.errorHandler) { | ||
@@ -45,0 +46,0 @@ this.config.errorHandler(error, this.log); |
@@ -12,2 +12,3 @@ import { Task, Guard } from './'; | ||
complete(message: T): Promise<void>; | ||
backoff(message: T): Promise<void>; | ||
} | ||
@@ -14,0 +15,0 @@ export interface MessageSink<T extends Message> { |
@@ -23,19 +23,25 @@ "use strict"; | ||
const incoming = yield this.config.source.getNext(); | ||
const lockStringParts = [this.config.guardScope]; | ||
if (!incoming) { | ||
this.log.debug('No incoming message available.'); | ||
return false; | ||
try { | ||
const lockStringParts = [this.config.guardScope]; | ||
if (!incoming) { | ||
this.log.debug('No incoming message available.'); | ||
return false; | ||
} | ||
this.log.info(`Processing incoming queue message ${incoming.id}`); | ||
lockStringParts.push(incoming.id); | ||
const lockString = lockStringParts.join('-'); | ||
if (!(yield this.checkGuard(lockString))) { | ||
this.log.debug(`Already locked, not running on '${lockString}'`); | ||
return false; | ||
} | ||
this.log.info(`Got lock for '${lockString}'`); | ||
const outgoing = yield this.config.transformer.transform(incoming); | ||
yield this.post(outgoing); | ||
yield this.config.source.complete(incoming); | ||
return true; | ||
} | ||
this.log.info(`Processing incoming queue message ${incoming.id}`); | ||
lockStringParts.push(incoming.id); | ||
const lockString = lockStringParts.join('-'); | ||
if (!(yield this.checkGuard(lockString))) { | ||
this.log.debug(`Already locked, not running on '${lockString}'`); | ||
return false; | ||
catch (error) { | ||
yield this.config.source.backoff(incoming); | ||
throw error; | ||
} | ||
this.log.info(`Got lock for '${lockString}'`); | ||
const outgoing = yield this.config.transformer.transform(incoming); | ||
yield this.post(outgoing); | ||
yield this.config.source.complete(incoming); | ||
return true; | ||
}); | ||
@@ -42,0 +48,0 @@ } |
@@ -31,2 +31,3 @@ "use strict"; | ||
})), | ||
backoff: sandbox.stub(), | ||
complete: sandbox.spy((message) => { | ||
@@ -112,2 +113,7 @@ _.remove(messages, m => m.id === message.id); | ||
})); | ||
it('invokes the source backoff method when transformer throws', () => __awaiter(this, void 0, void 0, function* () { | ||
helpers_1.asStub(transformer.transform).throws(new Error('bewm')); | ||
yield sourceSinkRunner.runOnce(); | ||
expect(source.backoff).to.have.been.called; | ||
})); | ||
}); | ||
@@ -114,0 +120,0 @@ }); |
@@ -35,2 +35,3 @@ "use strict"; | ||
complete: sandbox.stub(), | ||
backoff: sandbox.stub(), | ||
}, | ||
@@ -37,0 +38,0 @@ sink: { |
{ | ||
"name": "lib-task-scheduler", | ||
"version": "1.0.9", | ||
"version": "1.0.10", | ||
"description": "A lightweight, modular task scheduler.", | ||
@@ -5,0 +5,0 @@ "homepage": "https://github.com/convoyinc/lib-task-scheduler", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
99703
1439