
Research
/Security News
Shai Hulud Strikes Again (v2)
Another wave of Shai-Hulud campaign has hit npm with more than 500 packages and 700+ versions affected.
@hkube/producer-consumer
Advanced tools
producer consumer message queue based on Redis built for Node.js
$ npm install @hkube/producer-consumer
const { Producer } = require('@hkube/producer-consumer');
const options = {
job: {
type: 'test-job',
data: { action: 'bla' },
}
}
const producer = new Producer(options);
const job = await producer.createJob(options);
const { Consumer } = require('@hkube/producer-consumer');
const options = {
job: {
type: 'test-job'
}
}
const consumer = new Consumer(options);
consumer.on('job', (job) => {
// do some work...
job.done(null, {result: true}); // success
// or
job.done(new Error('oopps..')); // failed
});
consumer.register(options);
The createJob method will validate the options against the schema
const schema = {
"properties": {
"job": {
"type": "object",
"properties": {
"type": {
"type": "string",
"description": "the job type"
},
"waitingTimeout": {
"type": "integer",
"description": "time wait before the job is active/failed/completed"
},
"resolveOnStart": {
"type": "boolean",
"description": "should resolve when the job is in active state"
},
"resolveOnComplete": {
"type": "boolean",
"description": "should resolve when the job is in completed state"
}
}
},
"queue": {
"type": "object",
"properties": {
"priority": {
"type": "integer",
"description": "ranges from 1 (highest) to MAX_INT"
},
"delay": {
"type": "integer",
"description": "miliseconds to wait until this job can be processed."
},
"timeout": {
"type": "integer",
"description": "milliseconds after which the job should be fail with a timeout error"
},
"attempts": {
"type": "integer",
"description": "total number of attempts to try the job until it completes"
},
"removeOnComplete": {
"type": "boolean",
"description": "If true, removes the job when it successfully completes",
"default": false
},
"removeOnFail": {
"type": "boolean",
"description": "If true, removes the job when it fails after all attempts",
"default": false
}
}
},
"setting": {
"type": "object",
"properties": {
"prefix": {
"type": "string",
"default": "queue",
"description": "prefix for all queue keys"
},
"redis": {
"type": "object",
"properties": {
"host": {
"type": "string",
"default": "localhost"
},
"port": {
"type": "integer",
"default": 6379
}
}
}
}
}
}
}
const { Producer } = require('@hkube/producer-consumer');
producer.on('job-failed', (jobId, err) => {
}).on('job-completed', (jobId, result) => {
}).on('job-active', (jobId) => {
});
producer.createJob(options);
const options = {
job: {
type: 'test-job',
data: { action: 'bla' },
}
}
const producer = new Producer(options);
const job = await producer.createJob(options);
const { producer } = require('@hkube/producer-consumer');
const options = {
job: {
resolveOnStart: false,
resolveOnComplete: false,
type: 'test-job',
data: { action: 'bla' },
waitingTimeout: 5000
},
queue: {
priority: 1,
delay: 1000,
timeout: 5000,
attempts: 3,
removeOnComplete: true,
removeOnFail: false
},
setting: {
prefix: 'sf-queue',
redis: {
host: '127.0.0.1',
port: 6379,
cluster: true,
sentinel: false
}
}
}
const job = await producer.createJob(options);
FAQs
producer consumer message queue based on Redis
We found that @hkube/producer-consumer demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 6 open source maintainers collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Research
/Security News
Another wave of Shai-Hulud campaign has hit npm with more than 500 packages and 700+ versions affected.

Product
Add real-time Socket webhook events to your workflows to automatically receive software supply chain alert changes in real time.

Security News
ENISA has become a CVE Program Root, giving the EU a central authority for coordinating vulnerability reporting, disclosure, and cross-border response.