
Security News
Deno 2.2 Improves Dependency Management and Expands Node.js Compatibility
Deno 2.2 enhances Node.js compatibility, improves dependency management, adds OpenTelemetry support, and expands linting and task automation for developers.
rsmq-worker
Advanced tools
Helper to simply implement a worker RSMQ ( Redis Simple Message Queue ).
npm install rsmq-worker
new RSMQWorker( queuename, options );
Example:
var RSMQWorker = require( "rsmq-worker" );
var worker = new RSMQWorker( "myqueue" );
worker.on( "message", function( msg, next ){
// process your message
next()
});
// optional error listeners
worker.on('error', function( err, msg ){
console.log( "ERROR", err, msg.id );
});
worker.on('exceeded', function( msg ){
console.log( "EXCEEDED", msg.id );
});
worker.on('timeout', function( msg ){
console.log( "TIMEOUT", msg.id, msg.rc );
});
worker.start();
Config
String
required ) The queuename to pull the messagesObject
optional ) The configuration object
Number[]
optional; default = [ 0, 1, 5, 10 ]
) An Array of increasing wait times in secondsNumber
optional; default = 10
) Receive count until a message will be exceededNumber
optional; default = 30
) A time in seconds to hide a message after it has been received.Number
optional; default = 1
) The default delay in seconds for for sending new messages to the queue.Boolean
optional; default = false
) Autostart the worker on initNumber
optional; default = 3000
) Message processing timeout in ms
. So you have to call the next()
method of message
at least after e.g. 3000ms. If set to 0
it'll wait until infinity.Function
optional; ) A custom function, with the raw message (see message format) as argument to build a custom exceed check. If you return a true
the message will not exceed. On return false
the regular check for maxReceiveCount
will be used.RedisSMQ
optional; default = null
) A already existing rsmq instance to use instead of creating a new clientRedisClient
optional; default = null
) A already existing redis client instance to use if no rsmq
instance has been definedString
optional; default = ""
) The redis prefix/namespace for rsmq if no rsmq
instance has been defined. This has to match the option ns
of RSMQ.String
optional; default = "localhost"
) Host to connect to redis if no rsmq
or redis
instance has been definedNumber
optional; default = 6379
) Port to connect to redis if no rsmq
or redis
instance has been definedObject
optional; default = {}
) Options to connect to redis if no rsmq
or redis
instance has been definedA message ( e.g. received by the event data
or customExceedCheck
) contains the following keys:
String
) The queue message content. You can use complex content by using a stringified JSON.String
) The rsmq internal message idNumber
) Timestamp of when this message was sent / created.Number
) Timestamp of when this message was first received.Number
) Number of times this message was received..start()
If you haven't defined the config autostart
to true
you have to call the .start()
method.
Return
( Self ): The instance itself for chaining.
.stop()
Stop the receive interval.
Return
( Self ): The instance itself for chaining.
.send( msg [, delay ][, cb ] )
Helper function to simply send a message in the configured queue.
Arguments
msg
: ( String
required ): The rsmq message. In best practice it's a stringified JSON with additional data.delay
: ( Number
optional; default = 0
): The message delay to hide this message for the next x
seconds.cb
: ( Function
optional ): An optional callback to get a secure response for a successful send.Return
( Self ): The instance itself for chaining.
.del( id [, cb ] )
Helper function to simply delete a message after it has been processed.
Arguments
id
: ( String
required ): The rsmq message id.cb
: ( Function
optional ): A optional callback to get a secure response for a successful delete.Return
( Self ): The instance itself for chaining.
.changeInterval( interval )
Change the interval timeouts in operation.
Arguments
interval
: ( Number|Array
required ): The new interval.Return
( Self ): The instance itself for chaining.
message
Main event to catch and process a message. If you do not set a handler for this Event nothing will happen.
Example:
worker.on( "message", function( message, next, msgid ){
// process message ...
next();
});
Arguments
String
) The queue message content to process. You can use complex content by using a stringified JSON.Function
) A function you have to call when your message has been processed.delete
: ( Boolean|Error
optional; default = true ) Error
: If you return an error it will emitted as an error event; Boolean
: It's possible to prevent the worker from auto-delete the message on end. This is useful if you want to pop up a message multiple times. To implement this, please check the config options.customExceedCheck
String
) The message id. This is useful if you want to delete a message manually.ready
Fired until the worker is connected to rsmq/redis and has been initialized with the given queuename.
data
The raw event when a message has been received.
Arguments
String
) The raw rsmq message. ( See section Raw message format )deleted
Fired after a message has been deleted.
Arguments
String
) The rsmq message idexceeded
Fired after a message has been exceeded and immediately will be deleted.
Arguments
String
) The raw rsmq message. ( See section Raw message format )timeout
Fired if a message processing exceeds the configured timeout.
Arguments
String
) The raw rsmq message. ( See section Raw message format )error
Fired if a message processing throws an error.
Arguments
Error|Any
) The thrown errorString
) The raw rsmq message. ( See section Raw message format )This is an advanced example showing some features in action.
var fs = require( "fs" );
var RSMQWorker = require( "rsmq-worker" );
var fnCheck = function( msg ){
// check function to not exceed the message if the content is `createmessages`
if( msg.message === "createmessages" ){
return true
}
return false
}
var worker = new RSMQWorker( "myqueue", {
interval: [ .1, 1 ], // wait 100ms between every receive and step up to 1,3 on empty receives
invisibletime: 2, // hide received message for 5 sec
maxReceiveCount: 2, // only receive a message 2 times until delete
autostart: true, // start worker on init
customExceedCheck: fnCheck // set the custom exceed check
});
// Listen to errors
worker.on('error', function( err, msg ){
console.log( "ERROR", err, msg.id );
});
worker.on('exceeded', function( msg ){
console.log( "EXCEEDED", msg.id );
});
worker.on('timeout', function( msg ){
console.log( "TIMEOUT", msg.id, msg.rc );
});
//
worker.on( "message", function( message, next, id ){
console.log( "message", message );
if( message === "createmessages" ){
next( false )
worker.send( JSON.stringify( { type: "writefile", filename: "./test.txt", txt: "Foo Bar" } ) );
worker.send( JSON.stringify( { type: "deletefile", filename: "./test.txt" } ) );
return
}
var _data = JSON.parse( message )
switch( _data.type ){
case "writefile":
fs.writeFile( _data.filename, _data.txt, function( err ){
if( err ){
next( err );
}else{
next()
}
});
break;
case "deletefile":
fs.unlink( _data.filename, function( err ){
if( err ){
next( err );
}else{
next()
}
});
break;
}
});
worker.send( "createmessages" );
Version | Date | Description |
---|---|---|
0.3.6 | 2015-09-02 | Updated dependencies; optimized readme (thanks to Tobias Lidskog) |
0.3.5 | 2015-04-27 | again ... fixed argument dispatch for .send() |
0.3.4 | 2015-04-27 | fixed argument dispatch for .send() and added optional cb for .del() |
0.3.3 | 2015-03-27 | added changeInterval to modify the interval in operation |
0.3.2 | 2015-02-23 | changed default prefix/namespace; |
0.3.0 | 2015-02-16 | It's now possible to return an error as first argument of next . This will lead to an error emit + optimized readme |
0.2.2 | 2015-01-27 | added option defaultDelay and optimized arguments of the send method; fixed travis.yml |
0.2.0 | 2015-01-27 | Added timeout, better error handling and send callback |
0.1.2 | 2015-01-20 | Reorganized code, added code docs and optimized readme |
0.1.1 | 2015-01-17 | Added test scripts and optimized repository file list |
0.1.0 | 2015-01-16 | First working and documented version |
0.0.1 | 2015-01-14 | Initial commit |
Initially Generated with generator-mpnodemodule
Name | Description |
---|---|
rsmq | A really simple message queue based on Redis |
rsmq-cli | a terminal client for rsmq |
rest-rsmq | REST interface for. |
redis-notifications | A redis based notification engine. It implements the rsmq-worker to safely create notifications and recurring reports. |
node-cache | Simple and fast NodeJS internal caching. Node internal in memory cache like memcached. |
redis-sessions | An advanced session store for NodeJS and Redis |
obj-schema | Simple module to validate an object by a predefined schema |
connect-redis-sessions | A connect or express middleware to simply use the redis sessions. With redis sessions you can handle multiple sessions per user_id. |
systemhealth | Node module to run simple custom checks for your machine or it's connections. It will use redis-heartbeat to send the current state to redis. |
task-queue-worker | A powerful tool for background processing of tasks that are run by making standard http requests. |
soyer | Soyer is small lib for serverside use of Google Closure Templates with node.js. |
grunt-soy-compile | Compile Goggle Closure Templates ( SOY ) templates inclding the handling of XLIFF language files. |
backlunr | A solution to bring Backbone Collections together with the browser fulltext search engine Lunr.js |
Copyright © 2015 Mathias Peter, http://www.tcs.de
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
FAQs
RSMQ helper to simply implement a worker around the message queue
The npm package rsmq-worker receives a total of 3,966 weekly downloads. As such, rsmq-worker popularity was classified as popular.
We found that rsmq-worker demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
Deno 2.2 enhances Node.js compatibility, improves dependency management, adds OpenTelemetry support, and expands linting and task automation for developers.
Security News
React's CRA deprecation announcement sparked community criticism over framework recommendations, leading to quick updates acknowledging build tools like Vite as valid alternatives.
Security News
Ransomware payment rates hit an all-time low in 2024 as law enforcement crackdowns, stronger defenses, and shifting policies make attacks riskier and less profitable.