node-queue-bus
Node-Queue-Bus is a plugin for Resque (we use node-resque for this project) which transforms Resque into a distributed message bus. This allows more than one application to share workloads on the bus, and for events/jobs to fan out to more than one application. Perhaps you want the user_created event to be consumed by the analytics and email applications... then QueueBus is for you.
This application is a port of the main Ruby project to Node.js. It is likely to lag behind the Ruby project. However, this package aims to be 100% compatible with the Ruby version, and can fill a driver, rider, or scheduler role within that same ecosystem (more information below).
The publishing application sends an event (bus.publish()). This event is entered into an incoming resque queue. The Rider/Driver then inspects subscriptions and sends the event over to the queues of the applications which have registered interest in the event. Finally, workers for those events consume the jobs in the normal resque way. You can also delay a publication with bus.publishAt(), and in that case a Scheduler is required to coordinate.
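The flow above can be pictured with a small in-memory sketch. This is not the library's implementation: the queues object, the subscriptions list, and the publish and drive functions are hypothetical stand-ins for Redis-backed resque queues and the Rider/Driver.

```javascript
// In-memory sketch of the fan-out step; every name here is hypothetical.
var queues = { incoming: [] };

// Each subscription names the queue that should receive matching events.
var subscriptions = [
  { queue: 'analytics_default', eventType: 'user_created' },
  { queue: 'email_default', eventType: 'user_created' },
  { queue: 'email_default', eventType: 'order_paid' }
];

// Publisher: place the event on the shared incoming queue.
function publish(eventType, args) {
  queues.incoming.push({ eventType: eventType, args: args });
}

// Driver: drain the incoming queue and copy each event to every
// queue whose subscription matches the event type.
function drive() {
  var event;
  while ((event = queues.incoming.shift())) {
    subscriptions.forEach(function (sub) {
      if (sub.eventType === event.eventType) {
        queues[sub.queue] = queues[sub.queue] || [];
        queues[sub.queue].push(event);
      }
    });
  }
}

publish('user_created', { id: 1 });
drive();
console.log(queues);
```

After drive() runs, the single published event has been copied to both the analytics and the email queue, and ordinary workers could consume each copy independently.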
We use the 'bus' object to both subscribe to and publish events. It should be passed your connectionDetails and, optionally, jobs (see node-resque for more information about connectionDetails and jobs).
See the Examples to learn more
bus.subscriptions(callback(error, subscriptions, count))
bus.subscribe(appKey, priority, job, matcher, callback(error, queue_name))
bus.unsubscribe(appKey, priority, job, callback)
bus.unsubscribeAll(appKey, callback)
bus.publish(bus_event_type, args, callback(error, toRun))
bus.publishAt(timestamp, bus_event_type, args, callback(error, toRun))
bus.publishIn(delay, bus_event_type, args, callback(error, toRun))
bus.publishHeartbeat(callback(error))
You need to be certain that only one process within your ecosystem is running bus.publishHeartbeat at once. To do this, you can key into the scheduler's master status like so:
var scheduler = new SchedulerPrototype({connection: connectionDetails});
scheduler.connect(function(){
  scheduler.start();
  // check once a minute; only the master scheduler publishes the heartbeat
  setInterval(function(){
    if(scheduler.master){
      console.log('enqueue heartbeat');
      bus.publishHeartbeat();
    }
  }, 1000 * 60);
});
Keep in mind that if you use publishAt or publishIn, you will need to have a Scheduler running in your ecosystem.
You can use a single string { bus_event_type: 'add' } or regex { bus_event_type : /^.*add.*/ } to match events in your subscription. There are also special keys you can use:
bus_special_value_key (key is not null)
bus_special_value_blank (key exists but is blank (''))
bus_special_value_empty (key is null)
bus_special_value_nil (key is undefined)
bus_special_value_value (key is not null)
bus_special_value_present (key is not null and has a length > 0)
For example, a matcher like { first_name: 'bus_special_value_present' } would match all events where the field 'first_name' is present.
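To make the matching rules concrete, here is a minimal sketch of how a matcher could be evaluated against an event. The function names valueMatches and eventMatches are hypothetical and the code is not taken from the library source. It also assumes that "not null" excludes undefined, which the descriptions above do not spell out.

```javascript
// Hypothetical matcher evaluation, written from the descriptions above.
function valueMatches(expected, actual) {
  if (expected instanceof RegExp) {
    return typeof actual === 'string' && expected.test(actual);
  }
  switch (expected) {
    case 'bus_special_value_key':
    case 'bus_special_value_value':
      // assumption: a missing (undefined) key also fails "not null"
      return actual !== null && actual !== undefined;
    case 'bus_special_value_blank':
      return actual === '';
    case 'bus_special_value_empty':
      return actual === null;
    case 'bus_special_value_nil':
      return actual === undefined;
    case 'bus_special_value_present':
      return actual !== null && actual !== undefined && actual.length > 0;
    default:
      // plain strings must match exactly
      return expected === actual;
  }
}

// An event matches when every key in the matcher is satisfied.
function eventMatches(matcher, event) {
  return Object.keys(matcher).every(function (key) {
    return valueMatches(matcher[key], event[key]);
  });
}

console.log(eventMatches({ bus_event_type: 'add' }, { bus_event_type: 'add' }));
console.log(eventMatches({ first_name: 'bus_special_value_present' }, { first_name: '' }));
```

The first call compares a plain string and the second applies the "present" rule to a blank value, so together they exercise the exact-match and special-value branches.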
bus.subscribe passes error and worker_queue to the callback. You can then use worker_queue to tell your worker which queues to work.
appKey will always be lowercased, and all spaces will be replaced with "_".
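The appKey rule can be sketched as a small helper. Both function names are hypothetical. The appKey + "_" + priority queue-name format is an assumption inferred from the "exampleapp_default" queue in this project's sample payload, not a documented contract.

```javascript
// Hypothetical helper mirroring the appKey rule: lowercase, spaces become "_".
function normalizeAppKey(appKey) {
  return String(appKey).toLowerCase().replace(/ /g, '_');
}

// Assumption: queue names look like "<appKey>_<priority>".
function queueNameFor(appKey, priority) {
  return normalizeAppKey(appKey) + '_' + priority;
}

console.log(normalizeAppKey('Example App'));
console.log(queueNameFor('ExampleApp', 'default'));
```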
Running a rider is just like running a normal resque worker, with the added bonus that it will also work the incoming queue to fan out jobs when its primary queues are empty. The rider takes the same inputs as a worker, and simply appends some special jobs to handle the fan-out and bus queues.
var rider = new RiderPrototype({connection: connectionDetails, toDrive: true}, jobs);
rider.connect(function(){
  rider.workerCleanup(); // optional: cleanup any previous improperly shutdown workers
  rider.start();
});
You can configure some additional options as well (see the resque-bus project for more information):
var os = require('os'); // needed for os.hostname() below
var options = {
  looping: true,
  timeout: 5000,
  name: os.hostname() + ":" + process.pid
};
When an event is passed through the entire bus, metadata is added by both the publisher and the rider. You can overwrite any of these fields in your attributes hash. A simple event like bus.publish('add', {a: 5, b: 10}) will yield:
{
  "class":"remoteEventSubtract",
  "queue":"exampleapp_default",
  "args":{
    "bus_driven_at":1392251619,
    "bus_rider_queue":"exampleapp_default",
    "bus_rider_app_key":"exampleapp",
    "bus_rider_sub_key":"exampleapp_default_remoteEventSubtract",
    "bus_rider_class_name":"remoteEventSubtract",
    "bus_event_type":"subtract",
    "bus_published_at":1392251614,
    "bus_id":"1392251614-6a3d3a00-9cde-4877-9e53-9c580a0caa07",
    "bus_app_hostname":"OpsimusPrime.local",
    "a":5,
    "b":10
  }
}
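As an illustration of how publisher-side metadata and user attributes could be combined, here is a rough sketch. The function buildPublishArgs is hypothetical and is not the library's code. The field names come from the payload above, while the timestamp-plus-UUID shape of bus_id is inferred from that one sample. It relies on crypto.randomUUID, which requires a reasonably recent Node.js release.

```javascript
var os = require('os');
var crypto = require('crypto');

// Hypothetical: build the publisher's portion of the event arguments.
// `now` is an optional millisecond timestamp, used here to make runs repeatable.
function buildPublishArgs(eventType, attributes, now) {
  var publishedAt = Math.floor((now || Date.now()) / 1000); // seconds, as in the sample
  var payload = {
    bus_event_type: eventType,
    bus_published_at: publishedAt,
    bus_id: publishedAt + '-' + crypto.randomUUID(), // assumed format
    bus_app_hostname: os.hostname()
  };
  // User attributes are copied last, so they overwrite any metadata field.
  Object.keys(attributes || {}).forEach(function (key) {
    payload[key] = attributes[key];
  });
  return payload;
}

console.log(buildPublishArgs('add', { a: 5, b: 10 }));
```

Copying the attributes last is what lets a caller overwrite a field such as bus_app_hostname, matching the note that any metadata field can be overridden.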
You can see a full example with workers, riders, and schedulers in /examples/example.js