sqs-consumer
Advanced tools
Comparing version 5.4.0 to 5.5.0
@@ -1,3 +0,4 @@ | ||
'use strict'; | ||
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.autoBind = void 0; | ||
function isMethod(propertyName, value) { | ||
@@ -4,0 +5,0 @@ return propertyName !== 'constructor' && typeof value === 'function'; |
@@ -0,4 +1,5 @@ | ||
/// <reference types="node" /> | ||
import * as SQS from 'aws-sdk/clients/sqs'; | ||
import { EventEmitter } from 'events'; | ||
declare type SQSMessage = SQS.Types.Message; | ||
export declare type SQSMessage = SQS.Types.Message; | ||
export interface ConsumerOptions { | ||
@@ -15,2 +16,3 @@ queueUrl?: string; | ||
terminateVisibilityTimeout?: boolean; | ||
heartbeatInterval?: number; | ||
sqs?: SQS; | ||
@@ -22,2 +24,12 @@ region?: string; | ||
} | ||
interface Events { | ||
'response_processed': []; | ||
'empty': []; | ||
'message_received': [SQSMessage]; | ||
'message_processed': [SQSMessage]; | ||
'error': [Error, void | SQSMessage | SQSMessage[]]; | ||
'timeout_error': [Error, SQSMessage]; | ||
'processing_error': [Error, SQSMessage]; | ||
'stopped': []; | ||
} | ||
export declare class Consumer extends EventEmitter { | ||
@@ -37,5 +49,9 @@ private queueUrl; | ||
private terminateVisibilityTimeout; | ||
private heartbeatInterval; | ||
private sqs; | ||
constructor(options: ConsumerOptions); | ||
readonly isRunning: boolean; | ||
emit<T extends keyof Events>(event: T, ...args: Events[T]): boolean; | ||
on<T extends keyof Events>(event: T, listener: (...args: Events[T]) => void): this; | ||
once<T extends keyof Events>(event: T, listener: (...args: Events[T]) => void): this; | ||
get isRunning(): boolean; | ||
static create(options: ConsumerOptions): Consumer; | ||
@@ -49,3 +65,3 @@ start(): void; | ||
private executeHandler; | ||
private terminateVisabilityTimeout; | ||
private changeVisabilityTimeout; | ||
private emitError; | ||
@@ -56,4 +72,5 @@ private poll; | ||
private executeBatchHandler; | ||
private terminateVisabilityTimeoutBatch; | ||
private changeVisabilityTimeoutBatch; | ||
private startHeartbeat; | ||
} | ||
export {}; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.Consumer = void 0; | ||
const SQS = require("aws-sdk/clients/sqs"); | ||
@@ -33,2 +34,5 @@ const Debug = require("debug"); | ||
} | ||
if (options.heartbeatInterval && !(options.heartbeatInterval < options.visibilityTimeout)) { | ||
throw new Error('heartbeatInterval must be less than visibilityTimeout.'); | ||
} | ||
} | ||
@@ -68,2 +72,3 @@ function isConnectionError(err) { | ||
this.terminateVisibilityTimeout = options.terminateVisibilityTimeout || false; | ||
this.heartbeatInterval = options.heartbeatInterval; | ||
this.waitTimeSeconds = options.waitTimeSeconds || 20; | ||
@@ -77,2 +82,11 @@ this.authenticationErrorTimeout = options.authenticationErrorTimeout || 10000; | ||
} | ||
emit(event, ...args) { | ||
return super.emit(event, ...args); | ||
} | ||
on(event, listener) { | ||
return super.on(event, listener); | ||
} | ||
once(event, listener) { | ||
return super.once(event, listener); | ||
} | ||
get isRunning() { | ||
@@ -116,3 +130,9 @@ return !this.stopped; | ||
this.emit('message_received', message); | ||
let heartbeat; | ||
try { | ||
if (this.heartbeatInterval) { | ||
heartbeat = this.startHeartbeat(async (elapsedSeconds) => { | ||
return this.changeVisabilityTimeout(message, elapsedSeconds + this.visibilityTimeout); | ||
}); | ||
} | ||
await this.executeHandler(message); | ||
@@ -125,10 +145,8 @@ await this.deleteMessage(message); | ||
if (this.terminateVisibilityTimeout) { | ||
try { | ||
await this.terminateVisabilityTimeout(message); | ||
} | ||
catch (err) { | ||
this.emit('error', err, message); | ||
} | ||
await this.changeVisabilityTimeout(message, 0); | ||
} | ||
} | ||
finally { | ||
clearInterval(heartbeat); | ||
} | ||
} | ||
@@ -188,10 +206,15 @@ async receiveMessage(params) { | ||
} | ||
async terminateVisabilityTimeout(message) { | ||
return this.sqs | ||
.changeMessageVisibility({ | ||
QueueUrl: this.queueUrl, | ||
ReceiptHandle: message.ReceiptHandle, | ||
VisibilityTimeout: 0 | ||
}) | ||
.promise(); | ||
async changeVisabilityTimeout(message, timeout) { | ||
try { | ||
return this.sqs | ||
.changeMessageVisibility({ | ||
QueueUrl: this.queueUrl, | ||
ReceiptHandle: message.ReceiptHandle, | ||
VisibilityTimeout: timeout | ||
}) | ||
.promise(); | ||
} | ||
catch (err) { | ||
this.emit('error', err, message); | ||
} | ||
} | ||
@@ -243,3 +266,9 @@ emitError(err, message) { | ||
}); | ||
let heartbeat; | ||
try { | ||
if (this.heartbeatInterval) { | ||
heartbeat = this.startHeartbeat(async (elapsedSeconds) => { | ||
return this.changeVisabilityTimeoutBatch(messages, elapsedSeconds + this.visibilityTimeout); | ||
}); | ||
} | ||
await this.executeBatchHandler(messages); | ||
@@ -254,10 +283,8 @@ await this.deleteMessageBatch(messages); | ||
if (this.terminateVisibilityTimeout) { | ||
try { | ||
await this.terminateVisabilityTimeoutBatch(messages); | ||
} | ||
catch (err) { | ||
this.emit('error', err, messages); | ||
} | ||
await this.changeVisabilityTimeoutBatch(messages, 0); | ||
} | ||
} | ||
finally { | ||
clearInterval(heartbeat); | ||
} | ||
} | ||
@@ -268,3 +295,3 @@ async deleteMessageBatch(messages) { | ||
QueueUrl: this.queueUrl, | ||
Entries: messages.map(message => ({ | ||
Entries: messages.map((message) => ({ | ||
Id: message.MessageId, | ||
@@ -292,3 +319,3 @@ ReceiptHandle: message.ReceiptHandle | ||
} | ||
async terminateVisabilityTimeoutBatch(messages) { | ||
async changeVisabilityTimeoutBatch(messages, timeout) { | ||
const params = { | ||
@@ -299,10 +326,22 @@ QueueUrl: this.queueUrl, | ||
ReceiptHandle: message.ReceiptHandle, | ||
VisibilityTimeout: 0 | ||
VisibilityTimeout: timeout | ||
})) | ||
}; | ||
return this.sqs | ||
.changeMessageVisibilityBatch(params) | ||
.promise(); | ||
try { | ||
return this.sqs | ||
.changeMessageVisibilityBatch(params) | ||
.promise(); | ||
} | ||
catch (err) { | ||
this.emit('error', err, messages); | ||
} | ||
} | ||
startHeartbeat(heartbeatFn) { | ||
const startTime = Date.now(); | ||
return setInterval(() => { | ||
const elapsedSeconds = Math.ceil((Date.now() - startTime) / 1000); | ||
heartbeatFn(elapsedSeconds); | ||
}, this.heartbeatInterval * 1000); | ||
} | ||
} | ||
exports.Consumer = Consumer; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.TimeoutError = exports.SQSError = void 0; | ||
class SQSError extends Error { | ||
@@ -4,0 +5,0 @@ constructor(message) { |
@@ -1,1 +0,1 @@ | ||
export { Consumer, ConsumerOptions } from './consumer'; | ||
export { SQSMessage, Consumer, ConsumerOptions } from './consumer'; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
var consumer_1 = require("./consumer"); | ||
exports.Consumer = consumer_1.Consumer; | ||
Object.defineProperty(exports, "Consumer", { enumerable: true, get: function () { return consumer_1.Consumer; } }); |
{ | ||
"name": "sqs-consumer", | ||
"version": "5.4.0", | ||
"version": "5.5.0", | ||
"description": "Build SQS-based Node applications without the boilerplate", | ||
@@ -13,4 +13,5 @@ "main": "dist/index.js", | ||
"pretest": "npm run build", | ||
"test": "mocha", | ||
"lint": "tslint --project tsconfig.json", | ||
"test": "mocha --recursive --full-trace --exit", | ||
"lint": "eslint . --ext .ts", | ||
"lint:fix": "eslint . --fix", | ||
"coverage": "nyc mocha && nyc report --reporter=html && nyc report --reporter=json-summary", | ||
@@ -35,24 +36,28 @@ "lcov": "nyc mocha && nyc report --reporter=lcov", | ||
"devDependencies": { | ||
"@types/chai": "^4.1.4", | ||
"@types/debug": "^4.1.3", | ||
"@types/mocha": "^2.2.43", | ||
"@types/node": "^10.12.18", | ||
"@types/sinon": "^4.0.0", | ||
"@types/chai": "^4.2.11", | ||
"@types/debug": "^4.1.5", | ||
"@types/mocha": "^7.0.2", | ||
"@types/node": "^14.0.13", | ||
"@types/sinon": "^9.0.4", | ||
"@types/typescript": "^2.0.0", | ||
"chai": "^4.2.0", | ||
"codeclimate-test-reporter": "^0.5.1", | ||
"mocha": "^5.2.0", | ||
"nyc": "^14.1.1", | ||
"p-event": "^2.1.0", | ||
"sinon": "^7.2.2", | ||
"ts-node": "^3.3.0", | ||
"tslint": "^5.17.0", | ||
"tslint-config-airbnb": "^5.3.1", | ||
"tslint-microsoft-contrib": "^5.0.3", | ||
"typescript": "^2.6.1" | ||
"eslint": "^7.2.0", | ||
"eslint-config-iplayer-ts": "^2.0.0", | ||
"mocha": "^8.0.1", | ||
"nyc": "^15.1.0", | ||
"p-event": "^4.2.0", | ||
"sinon": "^9.0.2", | ||
"ts-node": "^8.10.2", | ||
"tslint-config-airbnb": "^5.11.2", | ||
"tslint-microsoft-contrib": "^6.2.0", | ||
"typescript": "^3.9.5", | ||
"aws-sdk": "^2.699.0" | ||
}, | ||
"dependencies": { | ||
"aws-sdk": "^2.443.0", | ||
"debug": "^4.1.1" | ||
}, | ||
"peerDependencies": { | ||
"aws-sdk": "^2.699.0" | ||
}, | ||
"nyc": { | ||
@@ -70,3 +75,14 @@ "include": [ | ||
"instrument": true | ||
}, | ||
"eslintConfig": { | ||
"extends": "iplayer-ts", | ||
"parserOptions": { | ||
"ecmaVersion": 2017, | ||
"sourceType": "module" | ||
} | ||
}, | ||
"mocha": { | ||
"spec": "test/**/**/*.ts", | ||
"require": "ts-node/register" | ||
} | ||
} |
@@ -43,3 +43,32 @@ # sqs-consumer | ||
* By default messages are processed one at a time – a new message won't be received until the first one has been processed. To process messages in parallel, use the `batchSize` option [detailed below](#options). | ||
* By default, the default Node.js HTTP/HTTPS SQS agent creates a new TCP connection for every new request ([AWS SQS documentation](https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/node-reusing-connections.html)). To avoid the cost of establishing a new connection, you can reuse an existing connection by passing a new SQS instance with `keepAlive: true`. | ||
```js | ||
const { Consumer } = require('sqs-consumer'); | ||
const AWS = require('aws-sdk'); | ||
const app = Consumer.create({ | ||
queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name', | ||
handleMessage: async (message) => { | ||
// do some work with `message` | ||
}, | ||
sqs: new AWS.SQS({ | ||
httpOptions: { | ||
agent: new https.Agent({ | ||
keepAlive: true | ||
}) | ||
} | ||
}) | ||
}); | ||
app.on('error', (err) => { | ||
console.error(err.message); | ||
}); | ||
app.on('processing_error', (err) => { | ||
console.error(err.message); | ||
}); | ||
app.start(); | ||
``` | ||
### Credentials | ||
@@ -102,3 +131,3 @@ | ||
* `handleMessageBatch` - _Function_ - An `async` function (or function that returns a `Promise`) to be called whenever a batch of messages is received. Similar to `handleMessage` but will receive the list of messages, not each message individually. **If both are set, `handleMessageBatch` overrides `handleMessage`**. | ||
* `handleMessageTimeout` - _String_ - Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue. | ||
* `handleMessageTimeout` - _Number_ - Time in ms to wait for `handleMessage` to process a message before timing out. Emits `timeout_error` on timeout. By default, if `handleMessage` times out, the unprocessed message returns to the end of the queue. | ||
* `attributeNames` - _Array_ - List of queue attributes to retrieve (i.e. `['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']`). | ||
@@ -108,2 +137,3 @@ * `messageAttributeNames` - _Array_ - List of message attributes to retrieve (i.e. `['name', 'address']`). | ||
* `visibilityTimeout` - _Number_ - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. | ||
* `heartbeatInterval` - _Number_ - The interval (in seconds) between requests to extend the message visibility timeout. On each heartbeat the visibility is extended by adding `visibilityTimeout` to the number of seconds since the start of the handler function. This value must less than `visibilityTimeout`. | ||
* `terminateVisibilityTimeout` - _Boolean_ - If true, sets the message visibility timeout to 0 after a `processing_error` (defaults to `false`). | ||
@@ -148,2 +178,2 @@ * `waitTimeSeconds` - _Number_ - The duration (in seconds) for which the call will wait for a message to arrive in the queue before returning. | ||
### Contributing | ||
See contributing [guildlines](https://github.com/bbc/sqs-consumer/blob/master/CONTRIBUTING.md) | ||
See contributing [guidelines](https://github.com/bbc/sqs-consumer/blob/master/CONTRIBUTING.md) |
@@ -1,3 +0,1 @@ | ||
'use strict'; | ||
function isMethod(propertyName: string, value: any): boolean { | ||
@@ -4,0 +2,0 @@ return propertyName !== 'constructor' && typeof value === 'function'; |
@@ -12,4 +12,4 @@ import { AWSError } from 'aws-sdk'; | ||
type ReceieveMessageResponse = PromiseResult<SQS.Types.ReceiveMessageResult, AWSError>; | ||
type SQSMessage = SQS.Types.Message; | ||
type ReceiveMessageRequest = SQS.Types.ReceiveMessageRequest; | ||
export type SQSMessage = SQS.Types.Message; | ||
@@ -48,5 +48,9 @@ const requiredOptions = [ | ||
} | ||
if (options.heartbeatInterval && !(options.heartbeatInterval < options.visibilityTimeout)) { | ||
throw new Error('heartbeatInterval must be less than visibilityTimeout.'); | ||
} | ||
} | ||
function isConnectionError(err: Error): Boolean { | ||
function isConnectionError(err: Error): boolean { | ||
if (err instanceof SQSError) { | ||
@@ -85,2 +89,3 @@ return (err.statusCode === 403 || err.code === 'CredentialsError' || err.code === 'UnknownEndpoint'); | ||
terminateVisibilityTimeout?: boolean; | ||
heartbeatInterval?: number; | ||
sqs?: SQS; | ||
@@ -93,2 +98,13 @@ region?: string; | ||
interface Events { | ||
'response_processed': []; | ||
'empty': []; | ||
'message_received': [SQSMessage]; | ||
'message_processed': [SQSMessage]; | ||
'error': [Error, void | SQSMessage | SQSMessage[]]; | ||
'timeout_error': [Error, SQSMessage]; | ||
'processing_error': [Error, SQSMessage]; | ||
'stopped': []; | ||
} | ||
export class Consumer extends EventEmitter { | ||
@@ -108,2 +124,3 @@ private queueUrl: string; | ||
private terminateVisibilityTimeout: boolean; | ||
private heartbeatInterval: number; | ||
private sqs: SQS; | ||
@@ -124,2 +141,3 @@ | ||
this.terminateVisibilityTimeout = options.terminateVisibilityTimeout || false; | ||
this.heartbeatInterval = options.heartbeatInterval; | ||
this.waitTimeSeconds = options.waitTimeSeconds || 20; | ||
@@ -136,2 +154,14 @@ this.authenticationErrorTimeout = options.authenticationErrorTimeout || 10000; | ||
emit<T extends keyof Events>(event: T, ...args: Events[T]) { | ||
return super.emit(event, ...args); | ||
} | ||
on<T extends keyof Events>(event: T, listener: (...args: Events[T]) => void): this { | ||
return super.on(event, listener); | ||
} | ||
once<T extends keyof Events>(event: T, listener: (...args: Events[T]) => void): this { | ||
return super.once(event, listener); | ||
} | ||
public get isRunning(): boolean { | ||
@@ -180,3 +210,9 @@ return !this.stopped; | ||
let heartbeat; | ||
try { | ||
if (this.heartbeatInterval) { | ||
heartbeat = this.startHeartbeat(async (elapsedSeconds) => { | ||
return this.changeVisabilityTimeout(message, elapsedSeconds + this.visibilityTimeout); | ||
}); | ||
} | ||
await this.executeHandler(message); | ||
@@ -189,8 +225,6 @@ await this.deleteMessage(message); | ||
if (this.terminateVisibilityTimeout) { | ||
try { | ||
await this.terminateVisabilityTimeout(message); | ||
} catch (err) { | ||
this.emit('error', err, message); | ||
} | ||
await this.changeVisabilityTimeout(message, 0); | ||
} | ||
} finally { | ||
clearInterval(heartbeat); | ||
} | ||
@@ -251,10 +285,14 @@ } | ||
private async terminateVisabilityTimeout(message: SQSMessage): Promise<PromiseResult<any, AWSError>> { | ||
return this.sqs | ||
.changeMessageVisibility({ | ||
QueueUrl: this.queueUrl, | ||
ReceiptHandle: message.ReceiptHandle, | ||
VisibilityTimeout: 0 | ||
}) | ||
.promise(); | ||
private async changeVisabilityTimeout(message: SQSMessage, timeout: number): Promise<PromiseResult<any, AWSError>> { | ||
try { | ||
return this.sqs | ||
.changeMessageVisibility({ | ||
QueueUrl: this.queueUrl, | ||
ReceiptHandle: message.ReceiptHandle, | ||
VisibilityTimeout: timeout | ||
}) | ||
.promise(); | ||
} catch (err) { | ||
this.emit('error', err, message); | ||
} | ||
} | ||
@@ -299,3 +337,3 @@ | ||
}).then(() => { | ||
setTimeout(this.poll, currentPollingTimeout); | ||
setTimeout(this.poll, currentPollingTimeout); | ||
}).catch((err) => { | ||
@@ -311,3 +349,9 @@ this.emit('error', err); | ||
let heartbeat; | ||
try { | ||
if (this.heartbeatInterval) { | ||
heartbeat = this.startHeartbeat(async (elapsedSeconds) => { | ||
return this.changeVisabilityTimeoutBatch(messages, elapsedSeconds + this.visibilityTimeout); | ||
}); | ||
} | ||
await this.executeBatchHandler(messages); | ||
@@ -322,8 +366,6 @@ await this.deleteMessageBatch(messages); | ||
if (this.terminateVisibilityTimeout) { | ||
try { | ||
await this.terminateVisabilityTimeoutBatch(messages); | ||
} catch (err) { | ||
this.emit('error', err, messages); | ||
} | ||
await this.changeVisabilityTimeoutBatch(messages, 0); | ||
} | ||
} finally { | ||
clearInterval(heartbeat); | ||
} | ||
@@ -337,3 +379,3 @@ } | ||
QueueUrl: this.queueUrl, | ||
Entries: messages.map(message => ({ | ||
Entries: messages.map((message) => ({ | ||
Id: message.MessageId, | ||
@@ -362,3 +404,3 @@ ReceiptHandle: message.ReceiptHandle | ||
private async terminateVisabilityTimeoutBatch(messages: SQSMessage[]): Promise<PromiseResult<any, AWSError>> { | ||
private async changeVisabilityTimeoutBatch(messages: SQSMessage[], timeout: number): Promise<PromiseResult<any, AWSError>> { | ||
const params = { | ||
@@ -369,10 +411,21 @@ QueueUrl: this.queueUrl, | ||
ReceiptHandle: message.ReceiptHandle, | ||
VisibilityTimeout: 0 | ||
VisibilityTimeout: timeout | ||
})) | ||
}; | ||
return this.sqs | ||
.changeMessageVisibilityBatch(params) | ||
.promise(); | ||
try { | ||
return this.sqs | ||
.changeMessageVisibilityBatch(params) | ||
.promise(); | ||
} catch (err) { | ||
this.emit('error', err, messages); | ||
} | ||
} | ||
private startHeartbeat(heartbeatFn: (elapsedSeconds: number) => void): NodeJS.Timeout { | ||
const startTime = Date.now(); | ||
return setInterval(() => { | ||
const elapsedSeconds = Math.ceil((Date.now() - startTime) / 1000); | ||
heartbeatFn(elapsedSeconds); | ||
}, this.heartbeatInterval * 1000); | ||
} | ||
} |
@@ -16,3 +16,3 @@ class SQSError extends Error { | ||
class TimeoutError extends Error { | ||
constructor(message: string = 'Operation timed out.') { | ||
constructor(message = 'Operation timed out.') { | ||
super(message); | ||
@@ -19,0 +19,0 @@ this.message = message; |
@@ -1,1 +0,1 @@ | ||
export { Consumer, ConsumerOptions } from './consumer'; | ||
export { SQSMessage, Consumer, ConsumerOptions } from './consumer'; |
@@ -34,2 +34,3 @@ import { assert } from 'chai'; | ||
super(message); | ||
this.message = message; | ||
} | ||
@@ -41,2 +42,3 @@ } | ||
let consumer; | ||
let clock; | ||
let handleMessage; | ||
@@ -54,2 +56,3 @@ let handleMessageBatch; | ||
beforeEach(() => { | ||
clock = sinon.useFakeTimers(); | ||
handleMessage = sandbox.stub().resolves(null); | ||
@@ -62,2 +65,3 @@ handleMessageBatch = sandbox.stub().resolves(null); | ||
sqs.changeMessageVisibility = stubResolve(); | ||
sqs.changeMessageVisibilityBatch = stubResolve(); | ||
@@ -118,2 +122,25 @@ consumer = new Consumer({ | ||
it('requires visibilityTimeout to be set with heartbeatInterval', () => { | ||
assert.throws(() => { | ||
new Consumer({ | ||
region: 'some-region', | ||
queueUrl: 'some-queue-url', | ||
handleMessage, | ||
heartbeatInterval: 30 | ||
}); | ||
}); | ||
}); | ||
it('requires heartbeatInterval to be less than visibilityTimeout', () => { | ||
assert.throws(() => { | ||
new Consumer({ | ||
region: 'some-region', | ||
queueUrl: 'some-queue-url', | ||
handleMessage, | ||
heartbeatInterval: 30, | ||
visibilityTimeout: 30 | ||
}); | ||
}); | ||
}); | ||
describe('.create', () => { | ||
@@ -142,3 +169,3 @@ it('creates a new instance of a Consumer object', () => { | ||
const err = await pEvent(consumer, 'error'); | ||
const err: any = await pEvent(consumer, 'error'); | ||
@@ -162,3 +189,3 @@ consumer.stop(); | ||
consumer.start(); | ||
const err = await pEvent(consumer, 'error'); | ||
const err: any = await pEvent(consumer, 'error'); | ||
consumer.stop(); | ||
@@ -188,3 +215,3 @@ | ||
consumer.start(); | ||
const err = await pEvent(consumer, 'timeout_error'); | ||
const [err]: any = await Promise.all([pEvent(consumer, 'timeout_error'), clock.tickAsync(handleMessageTimeout)]); | ||
consumer.stop(); | ||
@@ -208,3 +235,3 @@ | ||
consumer.start(); | ||
const err = await pEvent(consumer, 'processing_error'); | ||
const err: any = await pEvent(consumer, 'processing_error'); | ||
consumer.stop(); | ||
@@ -223,3 +250,3 @@ | ||
consumer.start(); | ||
const err = await pEvent(consumer, 'error'); | ||
const err: any = await pEvent(consumer, 'error'); | ||
consumer.stop(); | ||
@@ -265,17 +292,11 @@ | ||
sqs.receiveMessage = stubReject(credentialsErr); | ||
const errorListener = sandbox.stub(); | ||
consumer.on('error', errorListener); | ||
return new Promise((resolve) => { | ||
const timings = []; | ||
const errorListener = sandbox.stub().callsFake(() => timings.push(new Date())); | ||
consumer.start(); | ||
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT); | ||
consumer.stop(); | ||
errorListener.onThirdCall().callsFake(() => { | ||
consumer.stop(); | ||
sandbox.assert.calledThrice(sqs.receiveMessage); | ||
assert.isAtLeast(timings[1] - timings[0], AUTHENTICATION_ERROR_TIMEOUT); | ||
resolve(); | ||
}); | ||
consumer.on('error', errorListener); | ||
consumer.start(); | ||
}); | ||
sandbox.assert.calledTwice(errorListener); | ||
sandbox.assert.calledTwice(sqs.receiveMessage); | ||
}); | ||
@@ -289,17 +310,11 @@ | ||
sqs.receiveMessage = stubReject(invalidSignatureErr); | ||
const errorListener = sandbox.stub(); | ||
consumer.on('error', errorListener); | ||
return new Promise((resolve) => { | ||
const timings = []; | ||
const errorListener = sandbox.stub().callsFake(() => timings.push(new Date())); | ||
consumer.start(); | ||
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT); | ||
consumer.stop(); | ||
errorListener.onThirdCall().callsFake(() => { | ||
consumer.stop(); | ||
sandbox.assert.calledThrice(sqs.receiveMessage); | ||
assert.isAtLeast(timings[1] - timings[0], AUTHENTICATION_ERROR_TIMEOUT); | ||
resolve(); | ||
}); | ||
consumer.on('error', errorListener); | ||
consumer.start(); | ||
}); | ||
sandbox.assert.calledTwice(errorListener); | ||
sandbox.assert.calledTwice(sqs.receiveMessage); | ||
}); | ||
@@ -313,17 +328,11 @@ | ||
sqs.receiveMessage = stubReject(unknownEndpointErr); | ||
const errorListener = sandbox.stub(); | ||
consumer.on('error', errorListener); | ||
return new Promise((resolve) => { | ||
const timings = []; | ||
const errorListener = sandbox.stub().callsFake(() => timings.push(new Date())); | ||
consumer.start(); | ||
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT); | ||
consumer.stop(); | ||
errorListener.onThirdCall().callsFake(() => { | ||
consumer.stop(); | ||
sandbox.assert.calledThrice(sqs.receiveMessage); | ||
assert.isAtLeast(timings[1] - timings[0], AUTHENTICATION_ERROR_TIMEOUT); | ||
resolve(); | ||
}); | ||
consumer.on('error', errorListener); | ||
consumer.start(); | ||
}); | ||
sandbox.assert.calledTwice(errorListener); | ||
sandbox.assert.calledTwice(sqs.receiveMessage); | ||
}); | ||
@@ -340,16 +349,8 @@ | ||
}); | ||
return new Promise((resolve) => { | ||
const timings = []; | ||
const timeListener = sandbox.stub().callsFake(() => timings.push(new Date())); | ||
timeListener.onThirdCall().callsFake(() => { | ||
consumer.stop(); | ||
sandbox.assert.calledThrice(sqs.receiveMessage); | ||
assert.isAtLeast(timings[1] - timings[0], POLLING_TIMEOUT); | ||
resolve(); | ||
}); | ||
consumer.start(); | ||
await clock.tickAsync(POLLING_TIMEOUT); | ||
consumer.stop(); | ||
consumer.on('message_received', timeListener); | ||
consumer.start(); | ||
}); | ||
sandbox.assert.calledTwice(sqs.receiveMessage); | ||
}); | ||
@@ -409,9 +410,7 @@ | ||
return new Promise((resolve) => { | ||
handleMessage.onSecondCall().callsFake(() => { | ||
consumer.stop(); | ||
resolve(); | ||
}); | ||
consumer.start(); | ||
}); | ||
consumer.start(); | ||
await clock.runToLastAsync(); | ||
consumer.stop(); | ||
sandbox.assert.calledTwice(handleMessage); | ||
}); | ||
@@ -641,2 +640,72 @@ | ||
}); | ||
it('extends visibility timeout for long running handler functions', async () => { | ||
consumer = new Consumer({ | ||
queueUrl: 'some-queue-url', | ||
region: 'some-region', | ||
handleMessage: () => new Promise((resolve) => setTimeout(resolve, 75000)), | ||
sqs, | ||
visibilityTimeout: 40, | ||
heartbeatInterval: 30 | ||
}); | ||
const clearIntervalSpy = sinon.spy(global, 'clearInterval'); | ||
consumer.start(); | ||
await Promise.all([pEvent(consumer, 'response_processed'), clock.tickAsync(75000)]); | ||
consumer.stop(); | ||
sandbox.assert.calledWith(sqs.changeMessageVisibility, { | ||
QueueUrl: 'some-queue-url', | ||
ReceiptHandle: 'receipt-handle', | ||
VisibilityTimeout: 70 | ||
}); | ||
sandbox.assert.calledWith(sqs.changeMessageVisibility, { | ||
QueueUrl: 'some-queue-url', | ||
ReceiptHandle: 'receipt-handle', | ||
VisibilityTimeout: 100 | ||
}); | ||
sandbox.assert.calledOnce(clearIntervalSpy); | ||
}); | ||
it('extends visibility timeout for long running batch handler functions', async () => { | ||
sqs.receiveMessage = stubResolve({ | ||
Messages: [ | ||
{ MessageId: '1', ReceiptHandle: 'receipt-handle-1', Body: 'body-1' }, | ||
{ MessageId: '2', ReceiptHandle: 'receipt-handle-2', Body: 'body-2' }, | ||
{ MessageId: '3', ReceiptHandle: 'receipt-handle-3', Body: 'body-3' } | ||
] | ||
}); | ||
consumer = new Consumer({ | ||
queueUrl: 'some-queue-url', | ||
region: 'some-region', | ||
handleMessageBatch: () => new Promise((resolve) => setTimeout(resolve, 75000)), | ||
batchSize: 3, | ||
sqs, | ||
visibilityTimeout: 40, | ||
heartbeatInterval: 30 | ||
}); | ||
const clearIntervalSpy = sinon.spy(global, 'clearInterval'); | ||
consumer.start(); | ||
await Promise.all([pEvent(consumer, 'response_processed'), clock.tickAsync(75000)]); | ||
consumer.stop(); | ||
sandbox.assert.calledWith(sqs.changeMessageVisibilityBatch, { | ||
QueueUrl: 'some-queue-url', | ||
Entries: [ | ||
{ Id: '1', ReceiptHandle: 'receipt-handle-1', VisibilityTimeout: 70 }, | ||
{ Id: '2', ReceiptHandle: 'receipt-handle-2', VisibilityTimeout: 70 }, | ||
{ Id: '3', ReceiptHandle: 'receipt-handle-3', VisibilityTimeout: 70 } | ||
] | ||
}); | ||
sandbox.assert.calledWith(sqs.changeMessageVisibilityBatch, { | ||
QueueUrl: 'some-queue-url', | ||
Entries: [ | ||
{ Id: '1', ReceiptHandle: 'receipt-handle-1', VisibilityTimeout: 100 }, | ||
{ Id: '2', ReceiptHandle: 'receipt-handle-2', VisibilityTimeout: 100 }, | ||
{ Id: '3', ReceiptHandle: 'receipt-handle-3', VisibilityTimeout: 100 } | ||
] | ||
}); | ||
sandbox.assert.calledOnce(clearIntervalSpy); | ||
}); | ||
}); | ||
@@ -649,3 +718,3 @@ | ||
await pEvent(consumer, 'stopped'); | ||
await Promise.all([pEvent(consumer, 'stopped'), clock.runAllAsync()]); | ||
@@ -658,3 +727,3 @@ sandbox.assert.calledOnce(handleMessage); | ||
consumer.stop(); | ||
await pEvent(consumer, 'stopped'); | ||
await Promise.all([pEvent(consumer, 'stopped'), clock.runAllAsync()]); | ||
}); | ||
@@ -667,29 +736,23 @@ | ||
return new Promise((resolve) => { | ||
consumer.start(); | ||
consumer.stop(); | ||
consumer.stop(); | ||
consumer.stop(); | ||
consumer.start(); | ||
consumer.stop(); | ||
consumer.stop(); | ||
consumer.stop(); | ||
await clock.runAllAsync(); | ||
setTimeout(() => { | ||
sandbox.assert.calledOnce(handleStop); | ||
resolve(); | ||
}, 10); | ||
}); | ||
sandbox.assert.calledOnce(handleStop); | ||
}); | ||
it('fires a stopped event a second time if started and stopped twice', async () => { | ||
return new Promise((resolve) => { | ||
const handleStop = sandbox.stub().returns(null).onSecondCall().callsFake(() => { | ||
sandbox.assert.calledTwice(handleStop); | ||
resolve(); | ||
}); | ||
const handleStop = sandbox.stub().returns(null); | ||
consumer.on('stopped', handleStop); | ||
consumer.on('stopped', handleStop); | ||
consumer.start(); | ||
consumer.stop(); | ||
consumer.start(); | ||
consumer.stop(); | ||
}); | ||
consumer.start(); | ||
consumer.stop(); | ||
consumer.start(); | ||
consumer.stop(); | ||
await clock.runAllAsync(); | ||
sandbox.assert.calledTwice(handleStop); | ||
}); | ||
@@ -696,0 +759,0 @@ }); |
@@ -19,2 +19,2 @@ { | ||
] | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
120203
28
1505
176
19
- Removedaws-sdk@^2.443.0