🚀 DAY 5 OF LAUNCH WEEK: Introducing Socket Firewall Enterprise.Learn more →
Socket
Book a DemoInstallSign in
Socket

@hkube/producer-consumer

Package Overview
Dependencies
Maintainers
6
Versions
47
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@hkube/producer-consumer

producer consumer message queue based on Redis

latest
npmnpm
Version
1.0.55
Version published
Maintainers
6
Created
Source

Producer consumer

Build Status Coverage Status

producer consumer message queue based on Redis built for Node.js

Installation

$ npm install @hkube/producer-consumer

Basic usage

Producer


const { Producer } = require('@hkube/producer-consumer');
const options = {
    job: {
        type: 'test-job',
        data: { action: 'bla' },
    }
}
const producer = new Producer(options);
const job = await producer.createJob(options);

Consumer


const { Consumer } = require('@hkube/producer-consumer');
const options = {
    job: {
        type: 'test-job'
    }
}
const consumer = new Consumer(options);
consumer.on('job', (job) => {
    // do some work...
    job.done(null, {result: true}); // success

    // or
    job.done(new Error('oopps..')); // failed
});
consumer.register(options);

Schema

The createJob method will validate the options against the schema

const schema = {
    "properties": {
        "job": {
            "type": "object",
            "properties": {
                "type": {
                    "type": "string",
                    "description": "the job type"
                },
                "waitingTimeout": {
                    "type": "integer",
                    "description": "time wait before the job is active/failed/completed"
                },
                "resolveOnStart": {
                    "type": "boolean",
                    "description": "should resolve when the job is in active state"
                },
                "resolveOnComplete": {
                    "type": "boolean",
                    "description": "should resolve when the job is in completed state"
                }
            }
        },
        "queue": {
            "type": "object",
            "properties": {
                "priority": {
                    "type": "integer",
                    "description": "ranges from 1 (highest) to MAX_INT"
                },
                "delay": {
                    "type": "integer",
                    "description": "miliseconds to wait until this job can be processed."
                },
                "timeout": {
                    "type": "integer",
                    "description": "milliseconds after which the job should be fail with a timeout error"
                },
                "attempts": {
                    "type": "integer",
                    "description": "total number of attempts to try the job until it completes"
                },
                "removeOnComplete": {
                    "type": "boolean",
                    "description": "If true, removes the job when it successfully completes",
                    "default": false
                },
                "removeOnFail": {
                    "type": "boolean",
                    "description": "If true, removes the job when it fails after all attempts",
                    "default": false
                }
            }
        },
        "setting": {
            "type": "object",
            "properties": {
                "prefix": {
                    "type": "string",
                    "default": "queue",
                    "description": "prefix for all queue keys"
                },
                "redis": {
                    "type": "object",
                    "properties": {
                        "host": {
                            "type": "string",
                            "default": "localhost"
                        },
                        "port": {
                            "type": "integer",
                            "default": 6379
                        }
                    }
                }
            }
        }
    }
}

Events

const { Producer } = require('@hkube/producer-consumer');
producer.on('job-failed', (jobId, err) => {       
}).on('job-completed', (jobId, result) => {           
}).on('job-active', (jobId) => {             
});
producer.createJob(options);

const options = {
    job: {
        type: 'test-job',
        data: { action: 'bla' },
    }
}
const producer = new Producer(options);
const job = await producer.createJob(options);

Full Detailed Example

const { producer } = require('@hkube/producer-consumer');
const options = {
    job: {
        resolveOnStart: false,
        resolveOnComplete: false,
        type: 'test-job',
        data: { action: 'bla' },
        waitingTimeout: 5000
    },
    queue: {
        priority: 1,
        delay: 1000,
        timeout: 5000,
        attempts: 3,
        removeOnComplete: true,
        removeOnFail: false
    },
    setting: {
        prefix: 'sf-queue',
        redis: {
            host: '127.0.0.1',
            port: 6379,
            cluster: true,
            sentinel: false
        }
    }
}

const job = await producer.createJob(options);

License

MIT

FAQs

Package last updated on 31 Jul 2024

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts