Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rsmq-worker

Package Overview
Dependencies
Maintainers
1
Versions
21
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rsmq-worker

RSMQ helper to simply implement a worker around the message queue

  • 0.3.6
  • Source
  • npm
  • Socket score

Version published
Maintainers
1
Created
Source

RSMQ-Worker

Build Status Build Status NPM version

Join the chat at https://gitter.im/mpneuried/rsmq-worker

Helper to simply implement a worker RSMQ ( Redis Simple Message Queue ).

NPM

Install

  npm install rsmq-worker

Initialize

  new RSMQWorker( queuename, options );

Example:

  var RSMQWorker = require( "rsmq-worker" );
  var worker = new RSMQWorker( "myqueue" );

  worker.on( "message", function( msg, next ){
  	// process your message
  	next()
  });

  // optional error listeners
  worker.on('error', function( err, msg ){
      console.log( "ERROR", err, msg.id );
  });
  worker.on('exceeded', function( msg ){
      console.log( "EXCEEDED", msg.id );
  });
  worker.on('timeout', function( msg ){
      console.log( "TIMEOUT", msg.id, msg.rc );
  });

  worker.start();

Config

  • queuename: ( String required ) The queuename to pull the messages
  • options ( Object optional ) The configuration object
    • options.interval: ( Number[] optional; default = [ 0, 1, 5, 10 ] ) An Array of increasing wait times in seconds
    • options.maxReceiveCount: ( Number optional; default = 10 ) Receive count until a message will be exceeded
    • options.invisibletime: ( Number optional; default = 30 ) A time in seconds to hide a message after it has been received.
    • options.defaultDelay: ( Number optional; default = 1 ) The default delay in seconds for for sending new messages to the queue.
    • options.autostart: ( Boolean optional; default = false ) Autostart the worker on init
    • options.timeout: ( Number optional; default = 3000 ) Message processing timeout in ms. So you have to call the next() method of message at least after e.g. 3000ms. If set to 0 it'll wait until infinity.
    • options.customExceedCheck: ( Function optional; ) A custom function, with the raw message (see message format) as argument to build a custom exceed check. If you return a true the message will not exceed. On return false the regular check for maxReceiveCount will be used.
    • options.rsmq: ( RedisSMQ optional; default = null ) A already existing rsmq instance to use instead of creating a new client
    • options.redis: ( RedisClient optional; default = null ) A already existing redis client instance to use if no rsmq instance has been defined
    • options.redisPrefix: ( String optional; default = "" ) The redis prefix/namespace for rsmq if no rsmq instance has been defined. This has to match the option ns of RSMQ.
    • options.host: ( String optional; default = "localhost" ) Host to connect to redis if no rsmq or redis instance has been defined
    • options.port: ( Number optional; default = 6379 ) Port to connect to redis if no rsmq or redis instance has been defined
    • options.options: ( Object optional; default = {} ) Options to connect to redis if no rsmq or redis instance has been defined

Raw message format

A message ( e.g. received by the event data or customExceedCheck ) contains the following keys:

  • msg.message : ( String ) The queue message content. You can use complex content by using a stringified JSON.
  • msg.id : ( String ) The rsmq internal message id
  • msg.sent : ( Number ) Timestamp of when this message was sent / created.
  • msg.fr : ( Number ) Timestamp of when this message was first received.
  • msg.rc : ( Number ) Number of times this message was received.

Methods

.start()

If you haven't defined the config autostart to true you have to call the .start() method.

Return

( Self ): The instance itself for chaining.

.stop()

Stop the receive interval.

Return

( Self ): The instance itself for chaining.

.send( msg [, delay ][, cb ] )

Helper function to simply send a message in the configured queue.

Arguments

  • msg : ( String required ): The rsmq message. In best practice it's a stringified JSON with additional data.
  • delay : ( Number optional; default = 0 ): The message delay to hide this message for the next x seconds.
  • cb : ( Function optional ): An optional callback to get a secure response for a successful send.

Return

( Self ): The instance itself for chaining.

.del( id [, cb ] )

Helper function to simply delete a message after it has been processed.

Arguments

  • id : ( String required ): The rsmq message id.
  • cb : ( Function optional ): A optional callback to get a secure response for a successful delete.

Return

( Self ): The instance itself for chaining.

.changeInterval( interval )

Change the interval timeouts in operation.

Arguments

  • interval : ( Number|Array required ): The new interval.

Return

( Self ): The instance itself for chaining.

Events

message

Main event to catch and process a message. If you do not set a handler for this Event nothing will happen.

Example:

worker.on( "message", function( message, next, msgid ){
	// process message ... 
	next();
});

Arguments

  • message : ( String ) The queue message content to process. You can use complex content by using a stringified JSON.
  • next : ( Function ) A function you have to call when your message has been processed.
    Arguments
    • delete: ( Boolean|Error optional; default = true ) Error: If you return an error it will emitted as an error event; Boolean: It's possible to prevent the worker from auto-delete the message on end. This is useful if you want to pop up a message multiple times. To implement this, please check the config options.customExceedCheck
  • msgid : ( String ) The message id. This is useful if you want to delete a message manually.

ready

Fired until the worker is connected to rsmq/redis and has been initialized with the given queuename.

data

The raw event when a message has been received.

Arguments

  • msg : ( String ) The raw rsmq message. ( See section Raw message format )

deleted

Fired after a message has been deleted.

Arguments

  • id : ( String ) The rsmq message id

exceeded

Fired after a message has been exceeded and immediately will be deleted.

Arguments

  • msg : ( String ) The raw rsmq message. ( See section Raw message format )

timeout

Fired if a message processing exceeds the configured timeout.

Arguments

  • msg : ( String ) The raw rsmq message. ( See section Raw message format )

error

Fired if a message processing throws an error.

Arguments

  • err : ( Error|Any ) The thrown error
  • msg : ( String ) The raw rsmq message. ( See section Raw message format )

Advanced example

This is an advanced example showing some features in action.

	var fs = require( "fs" );
	var RSMQWorker = require( "rsmq-worker" );

	var fnCheck = function( msg ){
		// check function to not exceed the message if the content is `createmessages`
		if( msg.message === "createmessages" ){
			return true
		}
		return false
	}

	
	var worker = new RSMQWorker( "myqueue", {
		interval: [ .1, 1 ],				// wait 100ms between every receive and step up to 1,3 on empty receives
		invisibletime: 2,						// hide received message for 5 sec
		maxReceiveCount: 2,					// only receive a message 2 times until delete
		autostart: true,						// start worker on init
		customExceedCheck: fnCheck	// set the custom exceed check
	});

	// Listen to errors
	worker.on('error', function( err, msg ){
	    console.log( "ERROR", err, msg.id );
	});
	worker.on('exceeded', function( msg ){
	    console.log( "EXCEEDED", msg.id );
	});
	worker.on('timeout', function( msg ){
	    console.log( "TIMEOUT", msg.id, msg.rc );
	});

	//
	worker.on( "message", function( message, next, id ){
		
		console.log( "message", message );
		
		if( message === "createmessages" ){
			next( false )
			worker.send( JSON.stringify( { type: "writefile", filename: "./test.txt", txt: "Foo Bar" } ) );
			worker.send( JSON.stringify( { type: "deletefile", filename: "./test.txt" } ) );
			return	
		}

		var _data = JSON.parse( message )
		switch( _data.type ){
			case "writefile": 
				fs.writeFile( _data.filename, _data.txt, function( err ){
					if( err ){
						next( err );
					}else{
						next()
					}
				});
				break;
			case "deletefile": 
				fs.unlink( _data.filename, function( err ){
					if( err ){
						next( err );
					}else{
						next()
					}
				});
				break;
		}
		
	});

	worker.send( "createmessages" );

Todos/Ideas

  • MORE tests!
  • Optional parallel execution. To do multiple receives in parallel.

Release History

VersionDateDescription
0.3.62015-09-02Updated dependencies; optimized readme (thanks to Tobias Lidskog)
0.3.52015-04-27again ... fixed argument dispatch for .send()
0.3.42015-04-27fixed argument dispatch for .send() and added optional cb for .del()
0.3.32015-03-27added changeInterval to modify the interval in operation
0.3.22015-02-23changed default prefix/namespace;
0.3.02015-02-16It's now possible to return an error as first argument of next. This will lead to an error emit + optimized readme
0.2.22015-01-27added option defaultDelay and optimized arguments of the send method; fixed travis.yml
0.2.02015-01-27Added timeout, better error handling and send callback
0.1.22015-01-20Reorganized code, added code docs and optimized readme
0.1.12015-01-17Added test scripts and optimized repository file list
0.1.02015-01-16First working and documented version
0.0.12015-01-14Initial commit

NPM

Initially Generated with generator-mpnodemodule

Other projects

NameDescription
rsmqA really simple message queue based on Redis
rsmq-clia terminal client for rsmq
rest-rsmqREST interface for.
redis-notificationsA redis based notification engine. It implements the rsmq-worker to safely create notifications and recurring reports.
node-cacheSimple and fast NodeJS internal caching. Node internal in memory cache like memcached.
redis-sessionsAn advanced session store for NodeJS and Redis
obj-schemaSimple module to validate an object by a predefined schema
connect-redis-sessionsA connect or express middleware to simply use the redis sessions. With redis sessions you can handle multiple sessions per user_id.
systemhealthNode module to run simple custom checks for your machine or it's connections. It will use redis-heartbeat to send the current state to redis.
task-queue-workerA powerful tool for background processing of tasks that are run by making standard http requests.
soyerSoyer is small lib for serverside use of Google Closure Templates with node.js.
grunt-soy-compileCompile Goggle Closure Templates ( SOY ) templates inclding the handling of XLIFF language files.
backlunrA solution to bring Backbone Collections together with the browser fulltext search engine Lunr.js

The MIT License (MIT)

Copyright © 2015 Mathias Peter, http://www.tcs.de

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

FAQs

Package last updated on 02 Sep 2015

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc