Comparing version 1.9.0 to 1.10.0
Changelog | ||
========= | ||
# 1.10.0 | ||
## Additions | ||
- Support message expiration and queue `messageTtl` | ||
## Fixes | ||
- Acked messages where sent to dead letter exchange, they shouldn't, and are not anymore | ||
# 1.9.0 | ||
@@ -5,0 +13,0 @@ |
@@ -14,2 +14,12 @@ "use strict"; | ||
const messageId = properties.messageId || `smq.mid-${(0, _shared.generateId)()}`; | ||
const messageProperties = { ...properties, | ||
messageId | ||
}; | ||
const timestamp = messageProperties.timestamp = properties.timestamp || Date.now(); | ||
let ttl; | ||
if (properties.expiration) { | ||
ttl = messageProperties.ttl = timestamp + parseInt(properties.expiration); | ||
} | ||
const message = { | ||
@@ -20,5 +30,3 @@ fields: { ...fields, | ||
content, | ||
properties: { ...properties, | ||
messageId | ||
}, | ||
properties: messageProperties, | ||
consume, | ||
@@ -30,4 +38,10 @@ ack, | ||
Object.defineProperty(message, 'messageId', { | ||
get: () => messageId | ||
get() { | ||
return messageId; | ||
} | ||
}); | ||
Object.defineProperty(message, 'ttl', { | ||
value: ttl | ||
}); | ||
Object.defineProperty(message, 'consumerTag', { | ||
@@ -34,0 +48,0 @@ get: () => message.fields.consumerTag, |
@@ -25,2 +25,3 @@ "use strict"; | ||
let maxLength = 'maxLength' in options ? options.maxLength : Infinity; | ||
const messageTtl = options.messageTtl; | ||
const { | ||
@@ -87,3 +88,6 @@ deadLetterExchange, | ||
if (stopped) return; | ||
const message = (0, _Message.Message)(fields, content, properties, onMessageConsumed); | ||
const messageProperties = { ...properties | ||
}; | ||
if (messageTtl) messageProperties.expiration = messageProperties.expiration || messageTtl; | ||
const message = (0, _Message.Message)(fields, content, messageProperties, onMessageConsumed); | ||
const capacity = getCapacity(); | ||
@@ -197,6 +201,14 @@ messages.push(message); | ||
if (stopped || !pendingMessageCount || !n) return []; | ||
const now = Date.now(); | ||
const msgs = []; | ||
const evict = []; | ||
for (const message of messages) { | ||
if (message.pending) continue; | ||
if (message.ttl && message.ttl < now) { | ||
evict.push(message); | ||
continue; | ||
} | ||
message.consume(consumeOptions); | ||
@@ -208,2 +220,4 @@ pendingMessageCount--; | ||
for (const expired of evict) nack(expired, false, false); | ||
return msgs; | ||
@@ -227,2 +241,3 @@ } | ||
const pending = allUpTo && getPendingMessages(message); | ||
let deadLetter = false; | ||
@@ -243,2 +258,3 @@ switch (operation) { | ||
if (!dequeue(message)) return; | ||
deadLetter = !!deadLetterExchange; | ||
break; | ||
@@ -251,4 +267,6 @@ } | ||
if (deadLetterExchange) { | ||
const deadMessage = (0, _Message.Message)(message.fields, message.content, message.properties); | ||
if (deadLetter) { | ||
const deadMessage = (0, _Message.Message)(message.fields, message.content, { ...message.properties, | ||
expiration: undefined | ||
}); | ||
if (deadLetterRoutingKey) deadMessage.fields.routingKey = deadLetterRoutingKey; | ||
@@ -255,0 +273,0 @@ emit('dead-letter', { |
{ | ||
"name": "smqp", | ||
"version": "1.9.0", | ||
"version": "1.10.0", | ||
"description": "Synchronous message queuing package", | ||
@@ -62,3 +62,4 @@ "author": { | ||
"chai": "^4.2.0", | ||
"eslint": "^6.3.0", | ||
"chronokinesis": "^2.0.1", | ||
"eslint": "^6.4.0", | ||
"markdown-toc": "^1.2.0", | ||
@@ -65,0 +66,0 @@ "mocha": "^6.2.0", |
@@ -10,6 +10,13 @@ import {generateId} from './shared'; | ||
const messageId = properties.messageId || `smq.mid-${generateId()}`; | ||
const messageProperties = {...properties, messageId}; | ||
const timestamp = messageProperties.timestamp = properties.timestamp || Date.now(); | ||
let ttl; | ||
if (properties.expiration) { | ||
ttl = messageProperties.ttl = timestamp + parseInt(properties.expiration); | ||
} | ||
const message = { | ||
fields: {...fields, consumerTag: undefined}, | ||
content, | ||
properties: {...properties, messageId}, | ||
properties: messageProperties, | ||
consume, | ||
@@ -22,5 +29,11 @@ ack, | ||
Object.defineProperty(message, 'messageId', { | ||
get: () => messageId | ||
get() { | ||
return messageId; | ||
} | ||
}); | ||
Object.defineProperty(message, 'ttl', { | ||
value: ttl | ||
}); | ||
Object.defineProperty(message, 'consumerTag', { | ||
@@ -27,0 +40,0 @@ get: () => message.fields.consumerTag, |
@@ -14,2 +14,3 @@ import {generateId, sortByPriority} from './shared'; | ||
let maxLength = 'maxLength' in options ? options.maxLength : Infinity; | ||
const messageTtl = options.messageTtl; | ||
@@ -83,3 +84,5 @@ const {deadLetterExchange, deadLetterRoutingKey} = options; | ||
const message = Message(fields, content, properties, onMessageConsumed); | ||
const messageProperties = {...properties}; | ||
if (messageTtl) messageProperties.expiration = messageProperties.expiration || messageTtl; | ||
const message = Message(fields, content, messageProperties, onMessageConsumed); | ||
@@ -194,5 +197,11 @@ const capacity = getCapacity(); | ||
const now = Date.now(); | ||
const msgs = []; | ||
const evict = []; | ||
for (const message of messages) { | ||
if (message.pending) continue; | ||
if (message.ttl && message.ttl < now) { | ||
evict.push(message); | ||
continue; | ||
} | ||
message.consume(consumeOptions); | ||
@@ -204,2 +213,4 @@ pendingMessageCount--; | ||
for (const expired of evict) nack(expired, false, false); | ||
return msgs; | ||
@@ -224,2 +235,3 @@ } | ||
let deadLetter = false; | ||
switch (operation) { | ||
@@ -237,2 +249,3 @@ case 'ack': { | ||
if (!dequeue(message)) return; | ||
deadLetter = !!deadLetterExchange; | ||
break; | ||
@@ -247,4 +260,4 @@ } | ||
if (deadLetterExchange) { | ||
const deadMessage = Message(message.fields, message.content, message.properties); | ||
if (deadLetter) { | ||
const deadMessage = Message(message.fields, message.content, {...message.properties, expiration: undefined}); | ||
if (deadLetterRoutingKey) deadMessage.fields.routingKey = deadLetterRoutingKey; | ||
@@ -251,0 +264,0 @@ |
76503
2351
10