lib-task-scheduler
Advanced tools
Comparing version 2.0.80 to 2.0.82
@@ -21,11 +21,24 @@ "use strict"; | ||
if (sqsMessage) { | ||
const message = JSON.parse(sqsMessage.Body); | ||
let message; | ||
try { | ||
yield this.handleMessage(message); | ||
message = JSON.parse(sqsMessage.Body); | ||
} | ||
catch (error) { | ||
this.log.error(`Unable to handle message: ${JSON.stringify(sqsMessage)}`, error); | ||
yield this.backoff(sqsMessage); | ||
throw error; | ||
if (error instanceof SyntaxError) { | ||
this.log.error(`Message Body cannot be parsed as JSON (dropping): ${sqsMessage.Body}`); | ||
} | ||
else { | ||
throw error; | ||
} | ||
} | ||
if (message) { | ||
try { | ||
yield this.handleMessage(message); | ||
} | ||
catch (error) { | ||
this.log.error(`Unable to handle message: ${JSON.stringify(sqsMessage)}`, error); | ||
yield this.backoff(sqsMessage); | ||
throw error; | ||
} | ||
} | ||
yield this.getSqsClient().deleteMessage(sqsMessage.ReceiptHandle); | ||
@@ -32,0 +45,0 @@ } |
@@ -29,11 +29,24 @@ "use strict"; | ||
if (sqsMessage) { | ||
const message = JSON.parse(sqsMessage.Body); | ||
let message; | ||
try { | ||
yield this.handleMessage(message); | ||
message = JSON.parse(sqsMessage.Body); | ||
} | ||
catch (error) { | ||
this.log.error(`Unable to handle message: ${message}`, error); | ||
yield this.backoff(sqsMessage); | ||
throw error; | ||
if (error instanceof SyntaxError) { | ||
this.log.error(`Message Body cannot be parsed as JSON (dropping): ${sqsMessage.Body}`); | ||
} | ||
else { | ||
throw error; | ||
} | ||
} | ||
if (message) { | ||
try { | ||
yield this.handleMessage(message); | ||
} | ||
catch (error) { | ||
this.log.error(`Unable to handle message: ${message}`, error); | ||
yield this.backoff(sqsMessage); | ||
throw error; | ||
} | ||
} | ||
yield this.sqsClient.deleteMessage(sqsMessage.ReceiptHandle); | ||
@@ -40,0 +53,0 @@ } |
import { SQSClient, SQSClientConfig } from './clients/sqs'; | ||
import { MessageSource, MessageSink, Message } from './'; | ||
import { Logger } from './logger'; | ||
export interface SNSMessage extends Message { | ||
@@ -16,3 +17,4 @@ MessageId: string; | ||
private messageMeta; | ||
constructor(clientOrConfig: SQSClient | SQSClientConfig); | ||
private readonly log; | ||
constructor(clientOrConfig: SQSClient | SQSClientConfig, log?: Logger); | ||
getNext(): Promise<T>; | ||
@@ -19,0 +21,0 @@ complete(message: T): Promise<void>; |
@@ -14,6 +14,7 @@ "use strict"; | ||
const sqs_1 = require("./clients/sqs"); | ||
const logger_1 = require("./logger"); | ||
const MAX_RETRY_INTERVAL_SECS = moment.duration(10, 'minutes').asSeconds(); | ||
const BASE_RETRY_INTERVAL_SECS = moment.duration(30, 'seconds').asSeconds(); | ||
class SQSQueue { | ||
constructor(clientOrConfig) { | ||
constructor(clientOrConfig, log) { | ||
this.messageMeta = {}; | ||
@@ -26,2 +27,3 @@ if (clientOrConfig instanceof sqs_1.SQSClient) { | ||
} | ||
this.log = log || new logger_1.NoopLogger(); | ||
} | ||
@@ -32,3 +34,14 @@ getNext() { | ||
if (sqsMessage) { | ||
const message = JSON.parse(sqsMessage.Body); | ||
let message; | ||
try { | ||
message = JSON.parse(sqsMessage.Body); | ||
} | ||
catch (error) { | ||
if (error instanceof SyntaxError) { | ||
this.log.error(`Message Body cannot be parsed as JSON (dropping): ${sqsMessage.Body}`); | ||
} | ||
else { | ||
throw error; | ||
} | ||
} | ||
if (!message.id) { | ||
@@ -35,0 +48,0 @@ message.id = sqsMessage.MessageId; |
{ | ||
"name": "lib-task-scheduler", | ||
"version": "2.0.80", | ||
"version": "2.0.82", | ||
"description": "A lightweight, modular task scheduler.", | ||
@@ -5,0 +5,0 @@ "homepage": "https://github.com/convoyinc/lib-task-scheduler", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
140472
2089