Queue Schedule


Kafka is a highly available message queue, but it offers no built-in way to consume messages at a deliberately slow pace. Some tasks do not need to finish at once, and we want to complete them at a small cost. That is why we developed Queue Schedule.

Install

npm install queue-schedule
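
Both Kafka clients are supported through thin wrapper classes; the four classes used in the examples below are all required from the package root:

const {
    RdKafkaProducer,
    RdKafkaConsumer,
    KafkaJsProducer,
    KafkaJsConsumer
} = require('queue-schedule');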

How to use

Using node-rdkafka

const Kafka = require('node-rdkafka');
const {RdKafkaProducer, RdKafkaConsumer} = require('queue-schedule');

// example values; adjust them for your own cluster
const KAFKA_HOST = '127.0.0.1:9092';
const SCHEDULE_NAME1 = 'schedule1';
const TOPIC_NAME1 = 'topic.rdkafka';
const FIST_DATA = {a: 1, b: 2};

const producerRd = new Kafka.HighLevelProducer({
    'metadata.broker.list': KAFKA_HOST,
    'linger.ms': 0.1,
    'queue.buffering.max.ms': 500,
    'queue.buffering.max.messages': 1000,
    // debug: 'all'
});
producerRd.on('event.error', function(err) {
    console.error('producer error', err);
});
producerRd.on('event.log', function(log) {
    console.log('producer log', log);
});

const producer = new RdKafkaProducer({
    name: SCHEDULE_NAME1,
    topic: TOPIC_NAME1,
    producer: producerRd,
    delayInterval: 500
});
producer.addData(FIST_DATA, {}, function(err) {
    if (err) {
        console.error('write to queue error', err);
        return;
    }
    console.info('write to kafka finished');
});

const consumer = new Kafka.KafkaConsumer({
    'metadata.broker.list': KAFKA_HOST,
    'group.id': 'test-rdkafka-0',
    'auto.offset.reset': 'earliest',
    'socket.keepalive.enable': true,
    'socket.nagle.disable': true,
    'enable.auto.commit': true,
    'fetch.wait.max.ms': 5,
    'fetch.error.backoff.ms': 5,
    'queued.max.messages.kbytes': 1024 * 10,
    debug: 'all'
});
new RdKafkaConsumer({
    name: 'kafka',
    consumer,
    topics: [TOPIC_NAME1],
    doTask: function(messages, callback) {
        console.log(messages);
        callback(); // ask for the next batch
    },
    // scheduling options; see the API documentation for details
    readCount: 1,
    pauseTime: 500,
    idleCheckInter: 10 * 1000
}).on(RdKafkaConsumer.EVENT_CONSUMER_ERROR, function(err) {
    console.error('consumer error', err);
}).on(RdKafkaConsumer.EVENT_CLIENT_READY, function() {
    console.log('the consumer client is ready');
}).on(RdKafkaConsumer.EVENT_LOG, function(log) {
    // console.log(JSON.stringify(log));
});
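
In both wrappers, addData reports the result through an error-first callback. If you prefer promises, a minimal sketch (assuming only the addData(data, options, callback) signature shown above) can wrap it with Node's util.promisify:

const { promisify } = require('util');

// wraps the callback-style addData shown above;
// producer and FIST_DATA come from the example
const addDataAsync = promisify(producer.addData).bind(producer);

addDataAsync(FIST_DATA, {})
    .then(() => console.info('write to kafka finished'))
    .catch((err) => console.error('write to queue error', err));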

Using kafkajs

const { Kafka } = require('kafkajs');
const {KafkaJsProducer,KafkaJsConsumer} = require('queue-schedule');

const FIST_DATA = {a:1,b:2};
const SCHEDULE_NAME1 = 'schedule1';
const TOPIC_NAME1 = 'topic.kafkajs';
const client =  new Kafka({
    brokers: ['xxxx', 'yyyy']
});

const producer = new KafkaJsProducer({
    topic: TOPIC_NAME1,
    client,
});
producer.addData(FIST_DATA, {},function(err) {
    if (err) {
        console.error('write to queue error',err);
        return;
    }
    console.info('write to kafka finished');
});
producer.on(KafkaJsProducer.EVENT_PRODUCER_ERROR, function(err) {
    console.error('error in producer', err);
});

new KafkaJsConsumer({
    name: 'kafka',
    client,
    topic: TOPIC_NAME1,
    consumerOption: {
        groupId: 'kafkajs',
        fromBeginning: true
    },
    doTask:function(messages,callback) {
        console.log(messages);
        const value = messages[0].value;//read the first value
        let data = null;
        try {
            data = JSON.parse(value);
            console.log('receive data', data);
        } catch (e) {
            console.error('parse message error',e);
        }

        callback();//the next loop
    },
    readCount : 1,
    pauseTime : 500,
    idleCheckInter: 10 * 1000
}).on(KafkaJsConsumer.EVENT_CONSUMER_ERROR,function(err) {
    console.error('consumer error',err);
}).on(KafkaJsConsumer.EVENT_CONSUMER_READY,function() {
    console.log('the consumer is ready');
});
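
The doTask above only inspects the first message of each batch. If readCount is raised above 1, every message in the batch should be handled before callback is invoked; a minimal sketch of such a handler, assuming only the (messages, callback) signature shown in the examples:

// processes every message in a batch, not just the first one
function handleBatch(messages, callback) {
    for (const message of messages) {
        try {
            const data = JSON.parse(message.value);
            console.log('receive data', data);
            // handle one record here
        } catch (e) {
            console.error('parse message error', e);
        }
    }
    callback(); // let the consumer schedule the next batch
}

Pass it to the consumer as doTask: handleBatch in place of the inline function above.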

API

For detailed usage, see the documentation online here.

License

MIT
