redis-ccp-queue
Advanced tools
Comparing version
@@ -5,15 +5,16 @@ 'use strict'; | ||
let c = new ccpq.Consumer('test', function (data, done) { | ||
handleMessage(data, done) | ||
}); | ||
function handleMessage(data, done) { | ||
const handleMessage = (data, done) => { | ||
console.log('CONSUMER:'); | ||
console.log(data); | ||
setTimeout(function () { | ||
setTimeout(() => { | ||
done() | ||
}, 100 + Math.random() * 500); | ||
} | ||
setTimeout(function () { | ||
const c = new ccpq.Consumer('test', (data, done) => { | ||
handleMessage(data, done) | ||
}); | ||
setTimeout(() => { | ||
c.shutdown(); | ||
}, 20000); | ||
}, 20000); |
@@ -5,13 +5,13 @@ 'use strict'; | ||
let p = new ccpq.Producer('test'); | ||
for (var i = 1; i <= 50; i++) { | ||
const p = new ccpq.Producer('test'); | ||
for (let i = 1; i <= 50; i++) { | ||
p.push({ name: 'a', num: i }); | ||
} | ||
p.size(function (size) { | ||
p.size((size) => { | ||
console.log(size) | ||
}); | ||
setTimeout(function () { | ||
setTimeout(() => { | ||
p.shutdown(); | ||
}, 2000); |
@@ -11,3 +11,3 @@ 'use strict'; | ||
class Queue { | ||
constructor (queueName, redisOptions) { | ||
constructor(queueName, redisOptions) { | ||
redisOptions = redisOptions || {}; | ||
@@ -21,3 +21,3 @@ this.redis = new Redis(redisOptions); | ||
size (callback) { | ||
size(callback) { | ||
this.redis.llen(this.queueKey, (err, size) => { | ||
@@ -28,3 +28,3 @@ callback(size); | ||
push (data) { | ||
push(data) { | ||
if (typeof data === 'object') { | ||
@@ -36,3 +36,3 @@ data = JSON.stringify(data) | ||
pop (callback) { | ||
pop(callback) { | ||
this.redis.brpop(this.queueKey, this.timeout, (err, replies) => { | ||
@@ -51,3 +51,3 @@ if (err) { | ||
purge (callback) { | ||
purge(callback) { | ||
this.redis.del(this.queueKey, (err, reply) => { | ||
@@ -60,3 +60,3 @@ if (callback && typeof callback === "function") { | ||
shutdown () { | ||
shutdown() { | ||
this.shuttingDown = true; | ||
@@ -78,3 +78,3 @@ debug('shutting down'); | ||
class Producer { | ||
constructor (queueName, redisOptions) { | ||
constructor(queueName, redisOptions) { | ||
redisOptions = redisOptions || {}; | ||
@@ -86,8 +86,8 @@ this.queueName = queueName; | ||
push (data) { | ||
push(data) { | ||
this.queue.push(data); | ||
} | ||
size (callback) { | ||
this.queue.size(function(size) { | ||
size(callback) { | ||
this.queue.size((size) => { | ||
callback(size); | ||
@@ -97,3 +97,3 @@ }) | ||
shutdown () { | ||
shutdown() { | ||
this.queue.shutdown(); | ||
@@ -108,3 +108,3 @@ } | ||
class Consumer { | ||
constructor (queueName, callback, redisOptions) { | ||
constructor(queueName, callback, redisOptions) { | ||
redisOptions = redisOptions || {}; | ||
@@ -118,6 +118,6 @@ this.queueName = queueName; | ||
process | ||
.once('SIGINT', function () { | ||
.once('SIGINT', () => { | ||
self.queue.shutdown() | ||
}) | ||
.once('SIGTERM', function () { | ||
.once('SIGTERM', () => { | ||
self.queue.shutdown() | ||
@@ -129,3 +129,3 @@ }); | ||
waitForNext (self) { | ||
waitForNext(self) { | ||
self.queue.pop((data) => { | ||
@@ -136,3 +136,3 @@ if (!self.queue.shuttingDown) { | ||
} else { | ||
self.callback(data, function() { | ||
self.callback(data, () => { | ||
self.done(); | ||
@@ -148,7 +148,7 @@ }) | ||
done () { | ||
process.nextTick(() => {this.waitForNext(this)}); | ||
done() { | ||
process.nextTick(() => { this.waitForNext(this) }); | ||
} | ||
shutdown () { | ||
shutdown() { | ||
this.queue.shutdown(); | ||
@@ -155,0 +155,0 @@ } |
{ | ||
"name": "redis-ccp-queue", | ||
"version": "1.0.5", | ||
"version": "1.1.0", | ||
"description": "A simple and lightweight Competing Consumers Pattern Queue", | ||
@@ -27,5 +27,5 @@ "license": "MIT", | ||
"dependencies": { | ||
"debug": "^2.2.0", | ||
"ioredis": "^1.15.1" | ||
"debug": "^4.1.0", | ||
"ioredis": "^4.14.0" | ||
} | ||
} |
# redis-ccp-queue | ||
A simple and lightweight Competing Consumers Pattern Queue. Built with [node.js][nodejs-url] and [Redis][redis-url] (just about 100 lines of code). | ||
A simple and lightweight Competing Consumers Pattern Queue. Built with [node.js][nodejs-url] and [Redis][redis-url] (just about 100 lines of code). | ||
@@ -21,3 +21,3 @@ [![NPM Version][npm-image]][npm-url] | ||
#### Create a new Producer | ||
#### Create a new Producer | ||
@@ -29,3 +29,3 @@ ```javascript | ||
let p = new ccpq.Producer('QueueKey'); // create new producer instance | ||
let p = new ccpq.Producer('QueueKey'); // create new producer instance | ||
@@ -38,3 +38,3 @@ ... | ||
#### Create a new Consumer | ||
#### Create a new Consumer | ||
@@ -48,8 +48,8 @@ When creating a `consumer` you pass a callback function, that is called every time, when a new message is available from the queue: | ||
let c = new ccpq.Consumer('QueueKey', function (data, done) { // create new consumer instance | ||
let c = new ccpq.Consumer('QueueKey', (data, done) => { // create new consumer instance | ||
handleMessage(data, done) | ||
}); | ||
// This is the function (callback) that actually handles the messages from the queue. | ||
// Put your worker-code inside this function. | ||
// This is the function (callback) that actually handles the messages from the queue. | ||
// Put your worker-code inside this function. | ||
function handleMessage(data, done) { | ||
@@ -62,5 +62,5 @@ console.log('CONSUMER:'); | ||
Within your callback, you need to call the done() function, to let the `consumer` know, that the you are finished with the | ||
message handling. The name of this function can be changed. just choose your preferred name and pass it as a second parameter | ||
name to the callback function. | ||
Within your callback, you need to call the done() function, to let the `consumer` know, that the you are finished with the | ||
message handling. The name of this function can be changed. just choose your preferred name and pass it as a second parameter | ||
name to the callback function. | ||
@@ -70,3 +70,3 @@ #### Examples | ||
I have attached a producer as well as a consumer in the `examples` folder. | ||
To see the queue in acton, open three command line instances. Be sure to have redis installed and running. | ||
To see the queue in acton, open three command line instances. Be sure to have redis installed and running. | ||
@@ -79,3 +79,3 @@ In the first command line window lauch the producer with | ||
Then in the second command line window launch the first consumer with | ||
Then in the second command line window launch the first consumer with | ||
@@ -86,3 +86,3 @@ ``` | ||
and immediately after then in the third command line window launch the second consumer with | ||
and immediately after then in the third command line window launch the second consumer with | ||
@@ -93,5 +93,5 @@ ``` | ||
You should now see both consumers pulling from the queue. Each message is only handled by one of the consumers. | ||
I used some `setTimeout` functions to simulate longer message handling. I also added a timeout to disconnect the | ||
consumers from Redis after a certain period. | ||
You should now see both consumers pulling from the queue. Each message is only handled by one of the consumers. | ||
I used some `setTimeout` functions to simulate longer message handling. I also added a timeout to disconnect the | ||
consumers from Redis after a certain period. | ||
@@ -101,9 +101,9 @@ | ||
The competing consumers pattern enables multiple consumers to pull and handle messages from the same queue, | ||
with the guarantee that each message is consumed once only. This pattern also allows multiple producers or | ||
senders to push messages to the same single queue. The queue is implemented as a FIFO (first in - first out queue): | ||
The competing consumers pattern enables multiple consumers to pull and handle messages from the same queue, | ||
with the guarantee that each message is consumed once only. This pattern also allows multiple producers or | ||
senders to push messages to the same single queue. The queue is implemented as a FIFO (first in - first out queue): | ||
``` | ||
Application instances Consumer service instances pool | ||
generating messages processing messages | ||
Application instances Consumer service instances pool | ||
generating messages processing messages | ||
@@ -119,9 +119,9 @@ | ||
-------------- ------------------------------------- -------------- | ||
-------------- ------------------------------------- -------------- | ||
| Producer 3 | ---> | ------ ------ ------ ------ | ---> | Consumer 2 | | ||
-------------- | | M5 | | M4 | | M3 | ....> | M1 | | -------------- | ||
| ------ ------ ------ ------ | | ||
. ------------------------------------- . | ||
. Message Queue (FIFO) . | ||
. . | ||
. ------------------------------------- . | ||
. Message Queue (FIFO) . | ||
. . | ||
@@ -162,3 +162,3 @@ -------------- -------------- | ||
#### *) Connect to Redis | ||
#### *) Connect to Redis | ||
When a new `Producer` or `Consumer` instance is created, | ||
@@ -192,8 +192,8 @@ a connection to Redis will be created at the same time. | ||
This is the initial version of this package. At the moment, it really does, what I expected is to do. | ||
But I am sure, there is quite a lot of room for improvement. I am happy to discuss any comments and suggestions. | ||
This is the initial version of this package. At the moment, it really does, what I expected is to do. | ||
But I am sure, there is quite a lot of room for improvement. I am happy to discuss any comments and suggestions. | ||
Please feel free to contact me if you see any possibility of improvement! | ||
For the next major version I plan to implement a more reliable version where an additional list is used to track messages in transit. | ||
If a processes fails to deliver with in a specified amount of time, an item could be moved back to the original queue for delivery. | ||
For the next major version I plan to implement a more reliable version where an additional list is used to track messages in transit. | ||
If a processes fails to deliver with in a specified amount of time, an item could be moved back to the original queue for delivery. | ||
@@ -204,2 +204,3 @@ ## Version history | ||
| -------------- | -------------- | -------- | | ||
| 1.1.0 | 2019-10-14 | documentation update, code cleanup, dependency bump | | ||
| 1.0.5 | 2016-03-03 | tiny bug fix | | ||
@@ -227,5 +228,5 @@ | 1.0.4 | 2016-03-03 | correct termination of consumer on SIGINT, SIGTERM | | ||
## Trademarks | ||
Node.js is a trademark of Joyent Inc., Redis, and the Redis logo are the trademarks of | ||
Salvatore Sanfilippo in the U.S. and other countries. Linux is a registered trademark of | ||
Node.js is a trademark of Joyent Inc., Redis, and the Redis logo are the trademarks of | ||
Salvatore Sanfilippo in the U.S. and other countries. Linux is a registered trademark of | ||
Linus Torvalds. All other trademarks are the property of their respective owners. | ||
@@ -237,3 +238,3 @@ | ||
> | ||
>Copyright © 2016 Sebastian Hildebrandt, [+innovations](http://www.plus-innovations.com). | ||
>Copyright © 2016-2019 Sebastian Hildebrandt, [+innovations](http://www.plus-innovations.com). | ||
> | ||
@@ -257,3 +258,3 @@ >Permission is hereby granted, free of charge, to any person obtaining a copy | ||
>THE SOFTWARE. | ||
> | ||
> | ||
>Further details see [LICENSE](LICENSE) file. | ||
@@ -260,0 +261,0 @@ |
Sorry, the diff of this file is not supported yet
150
0.67%257
0.39%15374
-3.78%6
-14.29%+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
+ Added
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
- Removed
Updated
Updated