Comparing version 0.5.0 to 0.6.0
@@ -0,1 +1,5 @@ | ||
# 0.6.0 | ||
* Pass `attemptsExhausted` to `error` emitter indicating when the `redeliveryCount` equals `max_deliver`. | ||
# 0.5.0 | ||
@@ -2,0 +6,0 @@ |
@@ -13,2 +13,17 @@ import { ConnectionOptions } from 'nats'; | ||
start: (def: JobDef) => { | ||
/** | ||
* To be used in conjunction with SIGTERM and SIGINT. | ||
* | ||
* ```ts | ||
* const processor = await jobProcessor() | ||
* const stop = processor.start({}) | ||
* const shutDown = async () => { | ||
* await stop() | ||
* process.exit(0) | ||
* } | ||
* | ||
* process.on('SIGTERM', shutDown) | ||
* process.on('SIGINT', shutDown) | ||
* ``` | ||
*/ | ||
stop: () => Promise<void>; | ||
@@ -15,0 +30,0 @@ }; |
@@ -84,2 +84,5 @@ "use strict"; | ||
const emitter = new eventemitter3_1.default(); | ||
const emit = (event, data) => { | ||
emitter.emit(event, { type: event, ...data }); | ||
}; | ||
const start = (def) => { | ||
@@ -100,3 +103,4 @@ const abortController = new AbortController(); | ||
// TODO: Maybe handle errors better | ||
await createStream(conn, def).catch(); | ||
// eslint-disable-next-line | ||
await createStream(conn, def).catch(() => { }); | ||
// Create pull consumer | ||
@@ -123,7 +127,7 @@ const ps = await createConsumer(conn, def); | ||
try { | ||
emitter.emit('start', { ...metadata, type: 'start' }); | ||
emit('start', metadata); | ||
// Process the message | ||
await def.perform(msg, { signal: abortController.signal, def, js }); | ||
debug('completed'); | ||
emitter.emit('complete', { ...metadata, type: 'complete' }); | ||
emit('complete', metadata); | ||
// Ack message | ||
@@ -135,8 +139,4 @@ await msg.ackAck(); | ||
const backoffMs = (0, util_1.getNextBackoff)(backoff, msg); | ||
emitter.emit('error', { | ||
...metadata, | ||
type: 'error', | ||
backoffMs, | ||
error: e, | ||
}); | ||
const attemptsExhausted = msg.info.redeliveryCount === consumerConfig.max_deliver; | ||
emit('error', { ...metadata, attemptsExhausted, backoffMs, error: e }); | ||
debug('next backoff ms %d', backoffMs); | ||
@@ -159,17 +159,2 @@ // Negative ack message with backoff | ||
}; | ||
/** | ||
* To be used in conjunction with SIGTERM and SIGINT. | ||
* | ||
* ```ts | ||
* const processor = await jobProcessor() | ||
* const stop = processor.start({}) | ||
* const shutDown = async () => { | ||
* await stop() | ||
* process.exit(0) | ||
* } | ||
* | ||
* process.on('SIGTERM', shutDown) | ||
* process.on('SIGINT', shutDown) | ||
* ``` | ||
*/ | ||
const stop = () => { | ||
@@ -187,3 +172,20 @@ // Send abort signal to perform | ||
run(); | ||
return { stop }; | ||
return { | ||
/** | ||
* To be used in conjunction with SIGTERM and SIGINT. | ||
* | ||
* ```ts | ||
* const processor = await jobProcessor() | ||
* const stop = processor.start({}) | ||
* const shutDown = async () => { | ||
* await stop() | ||
* process.exit(0) | ||
* } | ||
* | ||
* process.on('SIGTERM', shutDown) | ||
* process.on('SIGINT', shutDown) | ||
* ``` | ||
*/ | ||
stop, | ||
}; | ||
}; | ||
@@ -190,0 +192,0 @@ const stop = async () => { |
{ | ||
"name": "nats-jobs", | ||
"version": "0.5.0", | ||
"version": "0.6.0", | ||
"description": "Background job processor using NATS", | ||
@@ -5,0 +5,0 @@ "author": "GovSpend", |
@@ -107,2 +107,5 @@ import ms from 'ms' | ||
const emitter = new EventEmitter<Events>() | ||
const emit = (event: Events, data: object) => { | ||
emitter.emit(event, { type: event, ...data }) | ||
} | ||
@@ -125,3 +128,4 @@ const start = (def: JobDef) => { | ||
// TODO: Maybe handle errors better | ||
await createStream(conn, def).catch() | ||
// eslint-disable-next-line | ||
await createStream(conn, def).catch(() => {}) | ||
// Create pull consumer | ||
@@ -149,7 +153,7 @@ const ps = await createConsumer(conn, def) | ||
try { | ||
emitter.emit('start', { ...metadata, type: 'start' }) | ||
emit('start', metadata) | ||
// Process the message | ||
await def.perform(msg, { signal: abortController.signal, def, js }) | ||
debug('completed') | ||
emitter.emit('complete', { ...metadata, type: 'complete' }) | ||
emit('complete', metadata) | ||
// Ack message | ||
@@ -160,8 +164,5 @@ await msg.ackAck() | ||
const backoffMs = getNextBackoff(backoff, msg) | ||
emitter.emit('error', { | ||
...metadata, | ||
type: 'error', | ||
backoffMs, | ||
error: e, | ||
}) | ||
const attemptsExhausted = | ||
msg.info.redeliveryCount === consumerConfig.max_deliver | ||
emit('error', { ...metadata, attemptsExhausted, backoffMs, error: e }) | ||
debug('next backoff ms %d', backoffMs) | ||
@@ -184,17 +185,2 @@ // Negative ack message with backoff | ||
/** | ||
* To be used in conjunction with SIGTERM and SIGINT. | ||
* | ||
* ```ts | ||
* const processor = await jobProcessor() | ||
* const stop = processor.start({}) | ||
* const shutDown = async () => { | ||
* await stop() | ||
* process.exit(0) | ||
* } | ||
* | ||
* process.on('SIGTERM', shutDown) | ||
* process.on('SIGINT', shutDown) | ||
* ``` | ||
*/ | ||
const stop = () => { | ||
@@ -213,3 +199,20 @@ // Send abort signal to perform | ||
return { stop } | ||
return { | ||
/** | ||
* To be used in conjunction with SIGTERM and SIGINT. | ||
* | ||
* ```ts | ||
* const processor = await jobProcessor() | ||
* const stop = processor.start({}) | ||
* const shutDown = async () => { | ||
* await stop() | ||
* process.exit(0) | ||
* } | ||
* | ||
* process.on('SIGTERM', shutDown) | ||
* process.on('SIGINT', shutDown) | ||
* ``` | ||
*/ | ||
stop, | ||
} | ||
} | ||
@@ -216,0 +219,0 @@ |
39416
1074