sqs-consumer
Advanced tools
Comparing version 5.2.0 to 5.3.0
@@ -17,3 +17,4 @@ import * as SQS from 'aws-sdk/clients/sqs'; | ||
handleMessageTimeout?: number; | ||
handleMessage(message: SQSMessage): Promise<void>; | ||
handleMessage?(message: SQSMessage): Promise<void>; | ||
handleMessageBatch?(messages: SQSMessage[]): Promise<void>; | ||
} | ||
@@ -23,2 +24,3 @@ export declare class Consumer extends EventEmitter { | ||
private handleMessage; | ||
private handleMessageBatch; | ||
private handleMessageTimeout; | ||
@@ -47,3 +49,7 @@ private attributeNames; | ||
private poll; | ||
private processMessageBatch; | ||
private deleteMessageBatch; | ||
private executeBatchHandler; | ||
private terminateVisabilityTimeoutBatch; | ||
} | ||
export {}; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const debug = require('debug')('sqs-consumer'); | ||
const SQS = require("aws-sdk/clients/sqs"); | ||
const Debug = require("debug"); | ||
const events_1 = require("events"); | ||
const bind_1 = require("./bind"); | ||
const errors_1 = require("./errors"); | ||
const debug = Debug('sqs-consumer'); | ||
const requiredOptions = [ | ||
'queueUrl', | ||
'handleMessage' | ||
// only one of handleMessage / handleMessagesBatch is required | ||
'handleMessage|handleMessageBatch' | ||
]; | ||
@@ -23,4 +25,5 @@ function createTimeout(duration) { | ||
requiredOptions.forEach((option) => { | ||
if (!options[option]) { | ||
throw new Error(`Missing SQS consumer option ['${option}'].`); | ||
const possibilities = option.split('|'); | ||
if (!possibilities.find((p) => options[p])) { | ||
throw new Error(`Missing SQS consumer option [ ${possibilities.join(' or ')} ].`); | ||
} | ||
@@ -57,2 +60,3 @@ }); | ||
this.handleMessage = options.handleMessage; | ||
this.handleMessageBatch = options.handleMessageBatch; | ||
this.handleMessageTimeout = options.handleMessageTimeout; | ||
@@ -94,3 +98,9 @@ this.attributeNames = options.attributeNames || []; | ||
if (hasMessages(response)) { | ||
await Promise.all(response.Messages.map(this.processMessage)); | ||
if (this.handleMessageBatch) { | ||
// prefer handling messages in batch when available | ||
await this.processMessageBatch(response.Messages); | ||
} | ||
else { | ||
await Promise.all(response.Messages.map(this.processMessage)); | ||
} | ||
this.emit('response_processed'); | ||
@@ -225,3 +235,66 @@ } | ||
} | ||
async processMessageBatch(messages) { | ||
messages.forEach((message) => { | ||
this.emit('message_received', message); | ||
}); | ||
try { | ||
await this.executeBatchHandler(messages); | ||
await this.deleteMessageBatch(messages); | ||
messages.forEach((message) => { | ||
this.emit('message_processed', message); | ||
}); | ||
} | ||
catch (err) { | ||
this.emit('error', err, messages); | ||
if (this.terminateVisibilityTimeout) { | ||
try { | ||
await this.terminateVisabilityTimeoutBatch(messages); | ||
} | ||
catch (err) { | ||
this.emit('error', err, messages); | ||
} | ||
} | ||
} | ||
} | ||
async deleteMessageBatch(messages) { | ||
debug('Deleting messages %s', messages.map((msg) => msg.MessageId).join(' ,')); | ||
const deleteParams = { | ||
QueueUrl: this.queueUrl, | ||
Entries: messages.map(message => ({ | ||
Id: message.MessageId, | ||
ReceiptHandle: message.ReceiptHandle | ||
})) | ||
}; | ||
try { | ||
await this.sqs | ||
.deleteMessageBatch(deleteParams) | ||
.promise(); | ||
} | ||
catch (err) { | ||
throw toSQSError(err, `SQS delete message failed: ${err.message}`); | ||
} | ||
} | ||
async executeBatchHandler(messages) { | ||
try { | ||
await this.handleMessageBatch(messages); | ||
} | ||
catch (err) { | ||
err.message = `Unexpected message handler failure: ${err.message}`; | ||
throw err; | ||
} | ||
} | ||
async terminateVisabilityTimeoutBatch(messages) { | ||
const params = { | ||
QueueUrl: this.queueUrl, | ||
Entries: messages.map((message) => ({ | ||
Id: message.MessageId, | ||
ReceiptHandle: message.ReceiptHandle, | ||
VisibilityTimeout: 0 | ||
})) | ||
}; | ||
return this.sqs | ||
.changeMessageVisibilityBatch(params) | ||
.promise(); | ||
} | ||
} | ||
exports.Consumer = Consumer; |
{ | ||
"name": "sqs-consumer", | ||
"version": "5.2.0", | ||
"version": "5.3.0", | ||
"description": "Build SQS-based Node applications without the boilerplate", | ||
@@ -34,4 +34,4 @@ "main": "dist/index.js", | ||
"devDependencies": { | ||
"@types/aws-sdk": "^2.7.0", | ||
"@types/chai": "^4.1.4", | ||
"@types/debug": "^4.1.3", | ||
"@types/mocha": "^2.2.43", | ||
@@ -42,2 +42,7 @@ "@types/node": "^10.12.18", | ||
"chai": "^4.2.0", | ||
"codeclimate-test-reporter": "^0.5.1", | ||
"mocha": "^5.2.0", | ||
"nyc": "^13.1.0", | ||
"p-event": "^2.1.0", | ||
"sinon": "^7.2.2", | ||
"ts-node": "^3.3.0", | ||
@@ -47,11 +52,6 @@ "tslint": "^5.12.1", | ||
"tslint-microsoft-contrib": "^5.0.3", | ||
"typescript": "^2.6.1", | ||
"codeclimate-test-reporter": "^0.5.1", | ||
"mocha": "^5.2.0", | ||
"nyc": "^13.1.0", | ||
"p-event": "^2.1.0", | ||
"sinon": "^7.2.2" | ||
"typescript": "^2.6.1" | ||
}, | ||
"dependencies": { | ||
"aws-sdk": "^2.393.0", | ||
"aws-sdk": "^2.443.0", | ||
"debug": "^4.1.1" | ||
@@ -58,0 +58,0 @@ }, |
@@ -100,3 +100,4 @@ # sqs-consumer | ||
* `handleMessage` - _Function_ - An `async` function (or function that returns a `Promise`) to be called whenever a message is received. Receives an SQS message object as it's first argument. | ||
* `handleMessageTimeout` - _Number_ - Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue. | ||
* `handleMessageBatch` - _Function_ - An `async` function (or function that returns a `Promise`) to be called whenever a batch of messages is received. Similar to `handleMessage` but will receive the list of messages, not each message individually. **If both are set, `handleMessageBatch` overrides `handleMessage`**. | ||
* `handleMessageTimeout` - _String_ - Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue. | ||
* `attributeNames` - _Array_ - List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`). | ||
@@ -103,0 +104,0 @@ * `messageAttributeNames` - _Array_ - List of message attributes to retrieve (i.e. `['name', 'address']`). |
@@ -1,6 +0,5 @@ | ||
const debug = require('debug')('sqs-consumer'); | ||
import { AWSError } from 'aws-sdk'; | ||
import * as SQS from 'aws-sdk/clients/sqs'; | ||
import { PromiseResult } from 'aws-sdk/lib/request'; | ||
import * as Debug from 'debug'; | ||
import { EventEmitter } from 'events'; | ||
@@ -10,2 +9,4 @@ import { autoBind } from './bind'; | ||
const debug = Debug('sqs-consumer'); | ||
type ReceieveMessageResponse = PromiseResult<SQS.Types.ReceiveMessageResult, AWSError>; | ||
@@ -17,3 +18,4 @@ type SQSMessage = SQS.Types.Message; | ||
'queueUrl', | ||
'handleMessage' | ||
// only one of handleMessage / handleMessagesBatch is required | ||
'handleMessage|handleMessageBatch' | ||
]; | ||
@@ -38,4 +40,5 @@ | ||
requiredOptions.forEach((option) => { | ||
if (!options[option]) { | ||
throw new Error(`Missing SQS consumer option ['${option}'].`); | ||
const possibilities = option.split('|'); | ||
if (!possibilities.find((p) => options[p])) { | ||
throw new Error(`Missing SQS consumer option [ ${possibilities.join(' or ')} ].`); | ||
} | ||
@@ -85,3 +88,4 @@ }); | ||
handleMessageTimeout?: number; | ||
handleMessage(message: SQSMessage): Promise<void>; | ||
handleMessage?(message: SQSMessage): Promise<void>; | ||
handleMessageBatch?(messages: SQSMessage[]): Promise<void>; | ||
} | ||
@@ -92,2 +96,3 @@ | ||
private handleMessage: (message: SQSMessage) => Promise<void>; | ||
private handleMessageBatch: (message: SQSMessage[]) => Promise<void>; | ||
private handleMessageTimeout: number; | ||
@@ -107,5 +112,5 @@ private attributeNames: string[]; | ||
assertOptions(options); | ||
this.queueUrl = options.queueUrl; | ||
this.handleMessage = options.handleMessage; | ||
this.handleMessageBatch = options.handleMessageBatch; | ||
this.handleMessageTimeout = options.handleMessageTimeout; | ||
@@ -129,3 +134,3 @@ this.attributeNames = options.attributeNames || []; | ||
public get isRunning(): boolean { | ||
return !this.stopped; | ||
return !this.stopped; | ||
} | ||
@@ -156,3 +161,8 @@ | ||
if (hasMessages(response)) { | ||
await Promise.all(response.Messages.map(this.processMessage)); | ||
if (this.handleMessageBatch) { | ||
// prefer handling messages in batch when available | ||
await this.processMessageBatch(response.Messages); | ||
} else { | ||
await Promise.all(response.Messages.map(this.processMessage)); | ||
} | ||
this.emit('response_processed'); | ||
@@ -289,2 +299,70 @@ } else { | ||
} | ||
private async processMessageBatch(messages: SQSMessage[]): Promise<void> { | ||
messages.forEach((message) => { | ||
this.emit('message_received', message); | ||
}); | ||
try { | ||
await this.executeBatchHandler(messages); | ||
await this.deleteMessageBatch(messages); | ||
messages.forEach((message) => { | ||
this.emit('message_processed', message); | ||
}); | ||
} catch (err) { | ||
this.emit('error', err, messages); | ||
if (this.terminateVisibilityTimeout) { | ||
try { | ||
await this.terminateVisabilityTimeoutBatch(messages); | ||
} catch (err) { | ||
this.emit('error', err, messages); | ||
} | ||
} | ||
} | ||
} | ||
private async deleteMessageBatch(messages: SQSMessage[]): Promise<void> { | ||
debug('Deleting messages %s', messages.map((msg) => msg.MessageId).join(' ,')); | ||
const deleteParams = { | ||
QueueUrl: this.queueUrl, | ||
Entries: messages.map(message => ({ | ||
Id: message.MessageId, | ||
ReceiptHandle: message.ReceiptHandle | ||
})) | ||
}; | ||
try { | ||
await this.sqs | ||
.deleteMessageBatch(deleteParams) | ||
.promise(); | ||
} catch (err) { | ||
throw toSQSError(err, `SQS delete message failed: ${err.message}`); | ||
} | ||
} | ||
private async executeBatchHandler(messages: SQSMessage[]): Promise<void> { | ||
try { | ||
await this.handleMessageBatch(messages); | ||
} catch (err) { | ||
err.message = `Unexpected message handler failure: ${err.message}`; | ||
throw err; | ||
} | ||
} | ||
private async terminateVisabilityTimeoutBatch(messages: SQSMessage[]): Promise<PromiseResult<any, AWSError>> { | ||
const params = { | ||
QueueUrl: this.queueUrl, | ||
Entries: messages.map((message) => ({ | ||
Id: message.MessageId, | ||
ReceiptHandle: message.ReceiptHandle, | ||
VisibilityTimeout: 0 | ||
})) | ||
}; | ||
return this.sqs | ||
.changeMessageVisibilityBatch(params) | ||
.promise(); | ||
} | ||
} |
@@ -40,2 +40,3 @@ import { assert } from 'chai'; | ||
let handleMessage; | ||
let handleMessageBatch; | ||
let sqs; | ||
@@ -52,5 +53,7 @@ const response = { | ||
handleMessage = sandbox.stub().resolves(null); | ||
handleMessageBatch = sandbox.stub().resolves(null); | ||
sqs = sandbox.mock(); | ||
sqs.receiveMessage = stubResolve(response); | ||
sqs.deleteMessage = stubResolve(); | ||
sqs.deleteMessageBatch = stubResolve(); | ||
sqs.changeMessageVisibility = stubResolve(); | ||
@@ -80,3 +83,3 @@ | ||
it('requires a handleMessage function to be set', () => { | ||
it('requires a handleMessage or handleMessagesBatch function to be set', () => { | ||
assert.throws(() => { | ||
@@ -563,2 +566,39 @@ new Consumer({ | ||
}); | ||
it('calls the handleMessagesBatch function when a batch of messages is received', async () => { | ||
consumer = new Consumer({ | ||
queueUrl: 'some-queue-url', | ||
messageAttributeNames: ['attribute-1', 'attribute-2'], | ||
region: 'some-region', | ||
handleMessageBatch, | ||
batchSize: 2, | ||
sqs | ||
}); | ||
consumer.start(); | ||
await pEvent(consumer, 'response_processed'); | ||
consumer.stop(); | ||
sandbox.assert.callCount(handleMessageBatch, 1); | ||
}); | ||
it('prefers handleMessagesBatch over handleMessage when both are set', async () => { | ||
consumer = new Consumer({ | ||
queueUrl: 'some-queue-url', | ||
messageAttributeNames: ['attribute-1', 'attribute-2'], | ||
region: 'some-region', | ||
handleMessageBatch, | ||
handleMessage, | ||
batchSize: 2, | ||
sqs | ||
}); | ||
consumer.start(); | ||
await pEvent(consumer, 'response_processed'); | ||
consumer.stop(); | ||
sandbox.assert.callCount(handleMessageBatch, 1); | ||
sandbox.assert.callCount(handleMessage, 0); | ||
}); | ||
}); | ||
@@ -565,0 +605,0 @@ |
84490
24
1390
145
Updatedaws-sdk@^2.443.0