Comparing version 0.4.3 to 0.5.0
@@ -10,2 +10,18 @@ ## Changelog | ||
### v0.5.0 (2021-02-08) | ||
- [`#233`](https://github.com/lifion/sqslite/pull/233): Implement real fifo queue | ||
- [`1d63739`](https://github.com/lifion/sqslite/commit/1d63739b23076405a6bf6c45693dafcae836806e): Implement true FIFO | ||
- [`697eef1`](https://github.com/lifion/sqslite/commit/697eef1a8daa7f609843afc6d98034f831fe91d3): Add validations for creating fifo-queue | ||
- [`dde8037`](https://github.com/lifion/sqslite/commit/dde803796bcf46def41211506004b5f29e1eec76): Upgrade dependencies | ||
- [`40bab64`](https://github.com/lifion/sqslite/commit/40bab64e0fb621f560a06d15fa3b938fef9918c0): Update package-lock | ||
- [`2911576`](https://github.com/lifion/sqslite/commit/2911576f03071bd9285a21d60bd39970ef38ed7e): Add edge case for fifo deduplication | ||
- [`f071e93`](https://github.com/lifion/sqslite/commit/f071e9340154aa1eba04377484eb1f574e3e0113): Add unit tests | ||
- [`5ff6a19`](https://github.com/lifion/sqslite/commit/5ff6a195cf1958dc3c359286843722e812e9635c): Update renovate | ||
- [`dc288ea`](https://github.com/lifion/sqslite/commit/dc288eadab5628f5efa57aba24524e6828272033): Upgrade dependencies | ||
- [`8b5453f`](https://github.com/lifion/sqslite/commit/8b5453f6682bdbff6571cbb78636ab4d95fc0f01): Upgrade major dependencies | ||
- [`72e9e61`](https://github.com/lifion/sqslite/commit/72e9e611344627f6c95d6d152edd80b829af84e5): Upgrade dependencies | ||
- [`e5eed58`](https://github.com/lifion/sqslite/commit/e5eed5803ae715d057cd4504022e2dcd973acb90): Upgrade dependencies | ||
- [`f0bbb21`](https://github.com/lifion/sqslite/commit/f0bbb210ccc79eb75e522cb9d4605ad02d7dc1a0): Add TODOs | ||
### v0.4.3 (2021-02-01) | ||
@@ -12,0 +28,0 @@ |
164
lib/sqs.js
@@ -12,6 +12,8 @@ 'use strict'; | ||
const CONTENT_DEDUPLICATION_INTERVAL_5_MIN = 5 * 60 * 1000; | ||
const queues = new Map(); | ||
function validateQueueName(queueName, isFifo) { | ||
if (isFifo && !queueName.endsWith('.fifo')) { | ||
if ((isFifo && !queueName.endsWith('.fifo')) || (!isFifo && queueName.endsWith('.fifo'))) { | ||
throw new ErrorWithCode( | ||
@@ -166,3 +168,3 @@ 'The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, must end with .fifo suffix and be 1 to 80 in length.', | ||
MessageBody, | ||
MessageDeduplicationId, | ||
MessageDeduplicationId = false, | ||
MessageGroupId, | ||
@@ -183,3 +185,2 @@ MessageSystemAttributes, | ||
} | ||
// TODO: make actual delay work for DelaySeconds | ||
@@ -196,12 +197,20 @@ const { Attributes } = queue; | ||
if (FifoQueue && !ContentBasedDeduplication && !MessageDeduplicationId) { | ||
throw new ErrorWithCode( | ||
'The queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided explicitly', | ||
'InvalidParameterValue' | ||
const MessageId = uuid(); | ||
const { DelaySeconds = null } = FifoQueue ? Attributes : params; | ||
const SentTimestamp = Date.now(); | ||
let AvailableSince = SentTimestamp; | ||
if (ContentBasedDeduplication) { | ||
const duplicate = queue.messages.find( | ||
(message) => | ||
!message['@State'].isRead && | ||
(message.MessageDeduplicationId === MessageDeduplicationId || | ||
(!message.MessageDeduplicationId && message.MessageBody === MessageBody)) | ||
); | ||
if (duplicate) { | ||
AvailableSince = SentTimestamp + CONTENT_DEDUPLICATION_INTERVAL_5_MIN; | ||
} | ||
} | ||
const MessageId = uuid(); | ||
const { DelaySeconds = null } = FifoQueue ? Attributes : params; | ||
const message = { | ||
@@ -212,4 +221,5 @@ '@State': { isRead: false }, | ||
ApproximateReceiveCount: 0, | ||
AvailableSince, | ||
SenderId: 'AAAAAAAAAAAAAAAAAAAAA:i-00a0aaa0aaa000000', | ||
SentTimestamp: Date.now() | ||
SentTimestamp | ||
}, | ||
@@ -219,5 +229,9 @@ MessageBody, // TODO: assert for all valid message variations | ||
...(MessageAttributes && { MessageAttributes }), | ||
...(MessageDeduplicationId && { SequenceNumber: '00000000000000000000' }), // TODO: The length of SequenceNumber is 128 bits, continues to increase for a particular MessageGroupId. Implement seq number, | ||
...(MessageDeduplicationId && { | ||
MessageDeduplicationId, | ||
SequenceNumber: '00000000000000000000' | ||
}), // TODO: The length of SequenceNumber is 128 bits, continues to increase for a particular MessageGroupId. Implement seq number, | ||
...(DelaySeconds && { DelaySeconds }), | ||
...(MessageSystemAttributes && { MessageSystemAttributes }) | ||
...(MessageSystemAttributes && { MessageSystemAttributes }), | ||
MessageGroupId | ||
}; | ||
@@ -229,3 +243,6 @@ | ||
...(MessageAttributes && { MD5OfMessageAttributes: md5(MessageAttributes) }), | ||
...(MessageDeduplicationId && { SequenceNumber: '00000000000000000000' }), | ||
...(MessageDeduplicationId && { | ||
MessageDeduplicationId, | ||
SequenceNumber: '00000000000000000000' | ||
}), | ||
...(DelaySeconds && { DelaySeconds }), | ||
@@ -238,3 +255,3 @@ ...(MessageSystemAttributes && { MD5OfMessageSystemAttributes: md5(MessageSystemAttributes) }) | ||
} | ||
// TODO: duplicates arriving together in the batch | ||
function sendMessageBatch(params) { | ||
@@ -253,8 +270,31 @@ const { QueueUrl, messages } = params; | ||
const { Attributes } = queue; | ||
const { ContentBasedDeduplication } = Attributes; | ||
const result = []; | ||
const SentTimestamp = Date.now(); | ||
let AvailableSince = SentTimestamp; | ||
messages.forEach((message) => { | ||
const { DelaySeconds, Id, MessageAttributes, MessageBody } = message; | ||
const { | ||
DelaySeconds, | ||
Id, | ||
MessageAttributes, | ||
MessageBody, | ||
MessageDeduplicationId = false, | ||
MessageGroupId | ||
} = message; | ||
// TODO: check if indeed auto generated | ||
const MessageId = uuid(); | ||
if (ContentBasedDeduplication) { | ||
const duplicate = queue.messages.find( | ||
(msg) => | ||
!msg['@State'].isRead && | ||
(msg.MessageDeduplicationId === MessageDeduplicationId || | ||
(!msg.MessageDeduplicationId && msg.MessageBody === MessageBody)) | ||
); | ||
if (duplicate) { | ||
AvailableSince = SentTimestamp + CONTENT_DEDUPLICATION_INTERVAL_5_MIN; | ||
} | ||
} | ||
@@ -266,4 +306,5 @@ queue.messages.push({ | ||
ApproximateReceiveCount: 0, | ||
AvailableSince, | ||
SenderId: 'AAAAAAAAAAAAAAAAAAAAA:i-00a0aaa0aaa000000', | ||
SentTimestamp: Date.now() | ||
SentTimestamp | ||
}, | ||
@@ -273,2 +314,4 @@ Id, | ||
MessageBody, | ||
...(MessageDeduplicationId && { MessageDeduplicationId }), | ||
MessageGroupId, | ||
MessageId, | ||
@@ -314,28 +357,73 @@ ...(DelaySeconds && { DelaySeconds }) | ||
// if (WaitTimeSeconds) { | ||
// // TODO: amount of time can wait for messages | ||
// } | ||
const stateParams = VisibilityTimeout ? { VisibilityTimeout, isRead: true } : { isRead: true }; | ||
const ReceiptHandle = uuid(); | ||
const ReceiptHandle = uuid(); // TODO: make unique ReceiptHandle for every message | ||
const unreadMessages = queue.messages.filter((msg) => !msg['@State'].isRead); | ||
const readMessages = queue.messages.filter((msg) => msg['@State'].isRead); | ||
let messages = []; | ||
let remainingMessages = []; | ||
const messages = unreadMessages.slice(0, MaxNumberOfMessages).map((msg) => ({ | ||
...msg, | ||
'@State': { | ||
...msg['@State'], | ||
...stateParams, | ||
ReceiptHandle | ||
}, | ||
Attributes: { | ||
...msg.Attributes, | ||
ApproximateFirstReceiveTimestamp: Date.now(), // TODO: might need to update only on 1st read | ||
ApproximateReceiveCount: msg.Attributes.ApproximateReceiveCount + 1 | ||
if (queue.Attributes.FifoQueue) { | ||
let firstMessageGroupId; | ||
for (let i = 0; i < queue.messages.length; i += 1) { | ||
if (queue.messages[i]['@State'].isRead === false) { | ||
firstMessageGroupId = queue.messages[i].MessageGroupId; | ||
break; | ||
} | ||
} | ||
})); | ||
const remainingMessages = unreadMessages.slice(MaxNumberOfMessages); | ||
queue.messages = [...readMessages, ...messages, ...remainingMessages]; | ||
const messageTest = {}; | ||
let count = 0; | ||
for (let i = 0; i < queue.messages.length; i += 1) { | ||
if (count >= MaxNumberOfMessages) break; | ||
const msg = queue.messages[i]; | ||
if ( | ||
msg['@State'].isRead === false && | ||
msg.MessageGroupId === firstMessageGroupId && | ||
msg.Attributes.AvailableSince - Date.now() <= 0 // TODO: this is in case of deduplication, turn into function | ||
) { | ||
const updatedMessage = { | ||
...msg, | ||
'@State': { | ||
...msg['@State'], | ||
...stateParams, | ||
ReceiptHandle | ||
}, | ||
Attributes: { | ||
...msg.Attributes, | ||
ApproximateFirstReceiveTimestamp: Date.now(), // TODO: might need to update only on 1st read | ||
ApproximateReceiveCount: msg.Attributes.ApproximateReceiveCount + 1 | ||
} | ||
}; | ||
messageTest[i] = updatedMessage; | ||
messages.push(updatedMessage); | ||
count += 1; | ||
} | ||
} | ||
const keys = Object.keys(messageTest); | ||
for (let j = 0; j < keys.length; j += 1) { | ||
const index = keys[j]; | ||
queue.messages[index] = messageTest[index]; | ||
} | ||
} else { | ||
const unreadMessages = queue.messages.filter((msg) => !msg['@State'].isRead); | ||
const readMessages = queue.messages.filter((msg) => msg['@State'].isRead); | ||
messages = unreadMessages.slice(0, MaxNumberOfMessages).map((msg) => ({ | ||
...msg, | ||
'@State': { | ||
...msg['@State'], | ||
...stateParams, | ||
ReceiptHandle | ||
}, | ||
Attributes: { | ||
...msg.Attributes, | ||
ApproximateFirstReceiveTimestamp: Date.now(), // TODO: might need to update only on 1st read | ||
ApproximateReceiveCount: msg.Attributes.ApproximateReceiveCount + 1 | ||
} | ||
})); | ||
remainingMessages = unreadMessages.slice(MaxNumberOfMessages); | ||
queue.messages = [...readMessages, ...messages, ...remainingMessages]; | ||
} | ||
return messages.map(({ Attributes, MessageAttributes, MessageBody, MessageId }) => ({ | ||
@@ -342,0 +430,0 @@ Body: MessageBody, |
@@ -120,3 +120,3 @@ 'use strict'; | ||
const attributes = object({ | ||
ContentBasedDeduplication: optional(boolean()), | ||
ContentBasedDeduplication: defaulted(optional(boolean()), false), | ||
DelaySeconds: defaulted(structs.delaySeconds, 0), | ||
@@ -123,0 +123,0 @@ FifoQueue: defaulted(optional(boolean()), false), |
{ | ||
"name": "sqslite", | ||
"version": "0.4.3", | ||
"version": "0.5.0", | ||
"description": "Lightweight module for integration testing AWS SQS.", | ||
@@ -36,3 +36,2 @@ "keywords": [], | ||
"test": "jest -c ./.jest.json", | ||
"type-check": "tsc --noEmit --project jsconfig.json", | ||
"version": "auto-changelog -p && git add CHANGELOG.md" | ||
@@ -51,5 +50,5 @@ }, | ||
"devDependencies": { | ||
"@types/node": "^14.14.22", | ||
"@types/node": "^14.14.25", | ||
"auto-changelog": "^2.2.1", | ||
"aws-sdk": "^2.834.0", | ||
"aws-sdk": "^2.839.0", | ||
"chalk": "^4.1.0", | ||
@@ -64,3 +63,3 @@ "check-engines": "^1.5.0", | ||
"jsdoc-to-markdown": "^6.0.1", | ||
"lint-staged": "^10.5.3", | ||
"lint-staged": "^10.5.4", | ||
"npm-watch": "^0.7.0", | ||
@@ -67,0 +66,0 @@ "prettier": "^2.2.1", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
92761
1401