New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

redis-ccp-queue

Package Overview
Dependencies
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

redis-ccp-queue - npm Package Compare versions

Comparing version

to
1.1.0

17

examples/consumer.js

@@ -5,15 +5,16 @@ 'use strict';

let c = new ccpq.Consumer('test', function (data, done) {
handleMessage(data, done)
});
function handleMessage(data, done) {
const handleMessage = (data, done) => {
console.log('CONSUMER:');
console.log(data);
setTimeout(function () {
setTimeout(() => {
done()
}, 100 + Math.random() * 500);
}
setTimeout(function () {
const c = new ccpq.Consumer('test', (data, done) => {
handleMessage(data, done)
});
setTimeout(() => {
c.shutdown();
}, 20000);
}, 20000);

@@ -5,13 +5,13 @@ 'use strict';

let p = new ccpq.Producer('test');
for (var i = 1; i <= 50; i++) {
const p = new ccpq.Producer('test');
for (let i = 1; i <= 50; i++) {
p.push({ name: 'a', num: i });
}
p.size(function (size) {
p.size((size) => {
console.log(size)
});
setTimeout(function () {
setTimeout(() => {
p.shutdown();
}, 2000);

@@ -11,3 +11,3 @@ 'use strict';

class Queue {
constructor (queueName, redisOptions) {
constructor(queueName, redisOptions) {
redisOptions = redisOptions || {};

@@ -21,3 +21,3 @@ this.redis = new Redis(redisOptions);

size (callback) {
size(callback) {
this.redis.llen(this.queueKey, (err, size) => {

@@ -28,3 +28,3 @@ callback(size);

push (data) {
push(data) {
if (typeof data === 'object') {

@@ -36,3 +36,3 @@ data = JSON.stringify(data)

pop (callback) {
pop(callback) {
this.redis.brpop(this.queueKey, this.timeout, (err, replies) => {

@@ -51,3 +51,3 @@ if (err) {

purge (callback) {
purge(callback) {
this.redis.del(this.queueKey, (err, reply) => {

@@ -60,3 +60,3 @@ if (callback && typeof callback === "function") {

shutdown () {
shutdown() {
this.shuttingDown = true;

@@ -78,3 +78,3 @@ debug('shutting down');

class Producer {
constructor (queueName, redisOptions) {
constructor(queueName, redisOptions) {
redisOptions = redisOptions || {};

@@ -86,8 +86,8 @@ this.queueName = queueName;

push (data) {
push(data) {
this.queue.push(data);
}
size (callback) {
this.queue.size(function(size) {
size(callback) {
this.queue.size((size) => {
callback(size);

@@ -97,3 +97,3 @@ })

shutdown () {
shutdown() {
this.queue.shutdown();

@@ -108,3 +108,3 @@ }

class Consumer {
constructor (queueName, callback, redisOptions) {
constructor(queueName, callback, redisOptions) {
redisOptions = redisOptions || {};

@@ -118,6 +118,6 @@ this.queueName = queueName;

process
.once('SIGINT', function () {
.once('SIGINT', () => {
self.queue.shutdown()
})
.once('SIGTERM', function () {
.once('SIGTERM', () => {
self.queue.shutdown()

@@ -129,3 +129,3 @@ });

waitForNext (self) {
waitForNext(self) {
self.queue.pop((data) => {

@@ -136,3 +136,3 @@ if (!self.queue.shuttingDown) {

} else {
self.callback(data, function() {
self.callback(data, () => {
self.done();

@@ -148,7 +148,7 @@ })

done () {
process.nextTick(() => {this.waitForNext(this)});
done() {
process.nextTick(() => { this.waitForNext(this) });
}
shutdown () {
shutdown() {
this.queue.shutdown();

@@ -155,0 +155,0 @@ }

{
"name": "redis-ccp-queue",
"version": "1.0.5",
"version": "1.1.0",
"description": "A simple and lightweight Competing Consumers Pattern Queue",

@@ -27,5 +27,5 @@ "license": "MIT",

"dependencies": {
"debug": "^2.2.0",
"ioredis": "^1.15.1"
"debug": "^4.1.0",
"ioredis": "^4.14.0"
}
}
# redis-ccp-queue
A simple and lightweight Competing Consumers Pattern Queue. Built with [node.js][nodejs-url] and [Redis][redis-url] (just about 100 lines of code).
A simple and lightweight Competing Consumers Pattern Queue. Built with [node.js][nodejs-url] and [Redis][redis-url] (just about 100 lines of code).

@@ -21,3 +21,3 @@ [![NPM Version][npm-image]][npm-url]

#### Create a new Producer
#### Create a new Producer

@@ -29,3 +29,3 @@ ```javascript

let p = new ccpq.Producer('QueueKey'); // create new producer instance
let p = new ccpq.Producer('QueueKey'); // create new producer instance

@@ -38,3 +38,3 @@ ...

#### Create a new Consumer
#### Create a new Consumer

@@ -48,8 +48,8 @@ When creating a `consumer` you pass a callback function, that is called every time, when a new message is available from the queue:

let c = new ccpq.Consumer('QueueKey', function (data, done) { // create new consumer instance
let c = new ccpq.Consumer('QueueKey', (data, done) => { // create new consumer instance
handleMessage(data, done)
});
// This is the function (callback) that actually handles the messages from the queue.
// Put your worker-code inside this function.
// This is the function (callback) that actually handles the messages from the queue.
// Put your worker-code inside this function.
function handleMessage(data, done) {

@@ -62,5 +62,5 @@ console.log('CONSUMER:');

Within your callback, you need to call the done() function, to let the `consumer` know, that the you are finished with the
message handling. The name of this function can be changed. just choose your preferred name and pass it as a second parameter
name to the callback function.
Within your callback, you need to call the done() function, to let the `consumer` know, that the you are finished with the
message handling. The name of this function can be changed. just choose your preferred name and pass it as a second parameter
name to the callback function.

@@ -70,3 +70,3 @@ #### Examples

I have attached a producer as well as a consumer in the `examples` folder.
To see the queue in acton, open three command line instances. Be sure to have redis installed and running.
To see the queue in acton, open three command line instances. Be sure to have redis installed and running.

@@ -79,3 +79,3 @@ In the first command line window lauch the producer with

Then in the second command line window launch the first consumer with
Then in the second command line window launch the first consumer with

@@ -86,3 +86,3 @@ ```

and immediately after then in the third command line window launch the second consumer with
and immediately after then in the third command line window launch the second consumer with

@@ -93,5 +93,5 @@ ```

You should now see both consumers pulling from the queue. Each message is only handled by one of the consumers.
I used some `setTimeout` functions to simulate longer message handling. I also added a timeout to disconnect the
consumers from Redis after a certain period.
You should now see both consumers pulling from the queue. Each message is only handled by one of the consumers.
I used some `setTimeout` functions to simulate longer message handling. I also added a timeout to disconnect the
consumers from Redis after a certain period.

@@ -101,9 +101,9 @@

The competing consumers pattern enables multiple consumers to pull and handle messages from the same queue,
with the guarantee that each message is consumed once only. This pattern also allows multiple producers or
senders to push messages to the same single queue. The queue is implemented as a FIFO (first in - first out queue):
The competing consumers pattern enables multiple consumers to pull and handle messages from the same queue,
with the guarantee that each message is consumed once only. This pattern also allows multiple producers or
senders to push messages to the same single queue. The queue is implemented as a FIFO (first in - first out queue):
```
Application instances Consumer service instances pool
generating messages processing messages
Application instances Consumer service instances pool
generating messages processing messages

@@ -119,9 +119,9 @@

-------------- ------------------------------------- --------------
-------------- ------------------------------------- --------------
| Producer 3 | ---> | ------ ------ ------ ------ | ---> | Consumer 2 |
-------------- | | M5 | | M4 | | M3 | ....> | M1 | | --------------
| ------ ------ ------ ------ |
. ------------------------------------- .
. Message Queue (FIFO) .
. .
. ------------------------------------- .
. Message Queue (FIFO) .
. .

@@ -162,3 +162,3 @@ -------------- --------------

#### *) Connect to Redis
#### *) Connect to Redis
When a new `Producer` or `Consumer` instance is created,

@@ -192,8 +192,8 @@ a connection to Redis will be created at the same time.

This is the initial version of this package. At the moment, it really does, what I expected is to do.
But I am sure, there is quite a lot of room for improvement. I am happy to discuss any comments and suggestions.
This is the initial version of this package. At the moment, it really does, what I expected is to do.
But I am sure, there is quite a lot of room for improvement. I am happy to discuss any comments and suggestions.
Please feel free to contact me if you see any possibility of improvement!
For the next major version I plan to implement a more reliable version where an additional list is used to track messages in transit.
If a processes fails to deliver with in a specified amount of time, an item could be moved back to the original queue for delivery.
For the next major version I plan to implement a more reliable version where an additional list is used to track messages in transit.
If a processes fails to deliver with in a specified amount of time, an item could be moved back to the original queue for delivery.

@@ -204,2 +204,3 @@ ## Version history

| -------------- | -------------- | -------- |
| 1.1.0 | 2019-10-14 | documentation update, code cleanup, dependency bump |
| 1.0.5 | 2016-03-03 | tiny bug fix |

@@ -227,5 +228,5 @@ | 1.0.4 | 2016-03-03 | correct termination of consumer on SIGINT, SIGTERM |

## Trademarks
Node.js is a trademark of Joyent Inc., Redis, and the Redis logo are the trademarks of
Salvatore Sanfilippo in the U.S. and other countries. Linux is a registered trademark of
Node.js is a trademark of Joyent Inc., Redis, and the Redis logo are the trademarks of
Salvatore Sanfilippo in the U.S. and other countries. Linux is a registered trademark of
Linus Torvalds. All other trademarks are the property of their respective owners.

@@ -237,3 +238,3 @@

>
>Copyright &copy; 2016 Sebastian Hildebrandt, [+innovations](http://www.plus-innovations.com).
>Copyright &copy; 2016-2019 Sebastian Hildebrandt, [+innovations](http://www.plus-innovations.com).
>

@@ -257,3 +258,3 @@ >Permission is hereby granted, free of charge, to any person obtaining a copy

>THE SOFTWARE.
>
>
>Further details see [LICENSE](LICENSE) file.

@@ -260,0 +261,0 @@

Sorry, the diff of this file is not supported yet