New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

smqp

Package Overview
Dependencies
Maintainers
1
Versions
79
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

smqp - npm Package Compare versions

Comparing version 1.9.0 to 1.10.0

8

CHANGELOG.md
Changelog
=========
# 1.10.0
## Additions
- Support message expiration and queue `messageTtl`
## Fixes
- Acked messages where sent to dead letter exchange, they shouldn't, and are not anymore
# 1.9.0

@@ -5,0 +13,0 @@

22

dist/src/Message.js

@@ -14,2 +14,12 @@ "use strict";

const messageId = properties.messageId || `smq.mid-${(0, _shared.generateId)()}`;
const messageProperties = { ...properties,
messageId
};
const timestamp = messageProperties.timestamp = properties.timestamp || Date.now();
let ttl;
if (properties.expiration) {
ttl = messageProperties.ttl = timestamp + parseInt(properties.expiration);
}
const message = {

@@ -20,5 +30,3 @@ fields: { ...fields,

content,
properties: { ...properties,
messageId
},
properties: messageProperties,
consume,

@@ -30,4 +38,10 @@ ack,

Object.defineProperty(message, 'messageId', {
get: () => messageId
get() {
return messageId;
}
});
Object.defineProperty(message, 'ttl', {
value: ttl
});
Object.defineProperty(message, 'consumerTag', {

@@ -34,0 +48,0 @@ get: () => message.fields.consumerTag,

@@ -25,2 +25,3 @@ "use strict";

let maxLength = 'maxLength' in options ? options.maxLength : Infinity;
const messageTtl = options.messageTtl;
const {

@@ -87,3 +88,6 @@ deadLetterExchange,

if (stopped) return;
const message = (0, _Message.Message)(fields, content, properties, onMessageConsumed);
const messageProperties = { ...properties
};
if (messageTtl) messageProperties.expiration = messageProperties.expiration || messageTtl;
const message = (0, _Message.Message)(fields, content, messageProperties, onMessageConsumed);
const capacity = getCapacity();

@@ -197,6 +201,14 @@ messages.push(message);

if (stopped || !pendingMessageCount || !n) return [];
const now = Date.now();
const msgs = [];
const evict = [];
for (const message of messages) {
if (message.pending) continue;
if (message.ttl && message.ttl < now) {
evict.push(message);
continue;
}
message.consume(consumeOptions);

@@ -208,2 +220,4 @@ pendingMessageCount--;

for (const expired of evict) nack(expired, false, false);
return msgs;

@@ -227,2 +241,3 @@ }

const pending = allUpTo && getPendingMessages(message);
let deadLetter = false;

@@ -243,2 +258,3 @@ switch (operation) {

if (!dequeue(message)) return;
deadLetter = !!deadLetterExchange;
break;

@@ -251,4 +267,6 @@ }

if (deadLetterExchange) {
const deadMessage = (0, _Message.Message)(message.fields, message.content, message.properties);
if (deadLetter) {
const deadMessage = (0, _Message.Message)(message.fields, message.content, { ...message.properties,
expiration: undefined
});
if (deadLetterRoutingKey) deadMessage.fields.routingKey = deadLetterRoutingKey;

@@ -255,0 +273,0 @@ emit('dead-letter', {

5

package.json
{
"name": "smqp",
"version": "1.9.0",
"version": "1.10.0",
"description": "Synchronous message queuing package",

@@ -62,3 +62,4 @@ "author": {

"chai": "^4.2.0",
"eslint": "^6.3.0",
"chronokinesis": "^2.0.1",
"eslint": "^6.4.0",
"markdown-toc": "^1.2.0",

@@ -65,0 +66,0 @@ "mocha": "^6.2.0",

@@ -10,6 +10,13 @@ import {generateId} from './shared';

const messageId = properties.messageId || `smq.mid-${generateId()}`;
const messageProperties = {...properties, messageId};
const timestamp = messageProperties.timestamp = properties.timestamp || Date.now();
let ttl;
if (properties.expiration) {
ttl = messageProperties.ttl = timestamp + parseInt(properties.expiration);
}
const message = {
fields: {...fields, consumerTag: undefined},
content,
properties: {...properties, messageId},
properties: messageProperties,
consume,

@@ -22,5 +29,11 @@ ack,

Object.defineProperty(message, 'messageId', {
get: () => messageId
get() {
return messageId;
}
});
Object.defineProperty(message, 'ttl', {
value: ttl
});
Object.defineProperty(message, 'consumerTag', {

@@ -27,0 +40,0 @@ get: () => message.fields.consumerTag,

@@ -14,2 +14,3 @@ import {generateId, sortByPriority} from './shared';

let maxLength = 'maxLength' in options ? options.maxLength : Infinity;
const messageTtl = options.messageTtl;

@@ -83,3 +84,5 @@ const {deadLetterExchange, deadLetterRoutingKey} = options;

const message = Message(fields, content, properties, onMessageConsumed);
const messageProperties = {...properties};
if (messageTtl) messageProperties.expiration = messageProperties.expiration || messageTtl;
const message = Message(fields, content, messageProperties, onMessageConsumed);

@@ -194,5 +197,11 @@ const capacity = getCapacity();

const now = Date.now();
const msgs = [];
const evict = [];
for (const message of messages) {
if (message.pending) continue;
if (message.ttl && message.ttl < now) {
evict.push(message);
continue;
}
message.consume(consumeOptions);

@@ -204,2 +213,4 @@ pendingMessageCount--;

for (const expired of evict) nack(expired, false, false);
return msgs;

@@ -224,2 +235,3 @@ }

let deadLetter = false;
switch (operation) {

@@ -237,2 +249,3 @@ case 'ack': {

if (!dequeue(message)) return;
deadLetter = !!deadLetterExchange;
break;

@@ -247,4 +260,4 @@ }

if (deadLetterExchange) {
const deadMessage = Message(message.fields, message.content, message.properties);
if (deadLetter) {
const deadMessage = Message(message.fields, message.content, {...message.properties, expiration: undefined});
if (deadLetterRoutingKey) deadMessage.fields.routingKey = deadLetterRoutingKey;

@@ -251,0 +264,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc