Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

kafka-node

Package Overview
Dependencies
Maintainers
1
Versions
113
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafka-node - npm Package Compare versions

Comparing version 0.0.4 to 0.0.5

10

example/consumer.js

@@ -11,10 +11,7 @@ 'use strict';

var topics = [
{topic: 'topic2'},
{topic: 'topic1'},
{topic: 't2'},
{topic: 'topic3'}
],
options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 10000 };
options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 1000 };
function createConsumer() {
function createConsumer(topics) {
var consumer = new Consumer(client, topics, options);

@@ -29,3 +26,2 @@ var offset = new Offset(client);

consumer.on('offsetOutOfRange', function (topic) {
console.log(topic);
topic.maxNum = 2;

@@ -39,2 +35,2 @@ offset.fetch([topic], function (err, offsets) {

createConsumer();
createConsumer(topics);

@@ -6,3 +6,4 @@ 'use strict';

var client = new Client();
var total = 10000;
var total = 50000;
var assert = require('assert');
var count = 0;

@@ -9,0 +10,0 @@

@@ -6,2 +6,5 @@ var kafka = require('../kafka'),

var argv = require('optimist').argv;
var topic = argv.topic || 'topic1';
var producer = new Producer(client);

@@ -30,3 +33,4 @@

producer.on('ready', function () {
setInterval(send, 1000);
//setInterval(send, 1000);
send();
});

@@ -37,10 +41,8 @@

producer.send([
{topic: 'topic1', messages: ['777777777777777' + 1 + 'coolmessage'] },
{topic: 'topic2', messages: ['777777777777777' + 2 + 'coolmessage'] }
{topic: topic, messages: ['777777777777777' + 2 + 'coolmessage'] }
], function (err, data) {
if (err) console.log(arguments);
else console.log(data);
//if (++rets === count) process.exit();
if (++rets === count) process.exit();
});
}
}

@@ -16,3 +16,2 @@ 'use strict';

this.brokers = {}
this.longPollingBrokers = {};
this.topicMetadata = {};

@@ -42,5 +41,3 @@ this.topicPartitions = {};

self.refreshBrokers(brokerMetadata);
//setTimeout(function () {
self.emit('brokersChanged');
//}, 5000);
self.emit('brokersChanged');
});

@@ -77,3 +74,3 @@ }

}
}, consumer.longpolling);
});
}

@@ -179,3 +176,2 @@

delete this.brokers[deadKey];
delete this.longPollingBrokers[deadKey];
}.bind(this));

@@ -196,3 +192,3 @@ }

Client.prototype.send = function (payloads, encoder, decoder, cb, longPolling) {
Client.prototype.send = function (payloads, encoder, decoder, cb) {
var self = this;

@@ -202,3 +198,3 @@ // payloads: [ [metadata exists], [metadta not exists] ]

if (payloads[0].length && !payloads[1].length) {
this.sendToBroker(_.flatten(payloads), encoder, decoder, cb, longPolling);
this.sendToBroker(_.flatten(payloads), encoder, decoder, cb);
return;

@@ -213,3 +209,3 @@ }

self.updateMetadatas(resp);
self.sendToBroker(payloads[1].concat(payloads[0]), encoder, decoder, cb, longPolling);
self.sendToBroker(payloads[1].concat(payloads[0]), encoder, decoder, cb);
});

@@ -219,3 +215,3 @@ }

Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb, longPolling) {
Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) {
payloads = this.payloadsByLeader(payloads);

@@ -225,3 +221,3 @@ for (var leader in payloads) {

var request = encoder.call(null, this.clientId, correlationId, payloads[leader]);
var broker = this.brokerForLeader(leader, longPolling);
var broker = this.brokerForLeader(leader);
if (broker.error) return cb('Leader not available', payloads[leader]);

@@ -280,4 +276,4 @@ this.cbqueue[correlationId] = [decoder, cb];

Client.prototype.brokerForLeader = function (leader, longPolling) {
var brokers = longPolling ? this.longPollingBrokers : this.brokers;
Client.prototype.brokerForLeader = function (leader) {
var brokers = this.brokers;
// If leader is not give, choose the first broker as leader

@@ -331,3 +327,2 @@ if (typeof leader === 'undefined') {

if(s.retrying) return;
console.log('retry', s.addr)
s.retrying = true;

@@ -334,0 +329,0 @@ s.error = true;

@@ -38,3 +38,2 @@ 'use strict';

this.id = nextId();
this.longpolling = false;
this.payloads = this.buildPayloads(topics);

@@ -125,12 +124,7 @@ this.connect();

Consumer.prototype.updateOffsets = function (topics) {
var offline = !this.longpolling;
this.payloads.forEach(function (p) {
if (!_.isEmpty(topics[p.topic])) {
var offset = topics[p.topic][p.partition];
p.offset = offset + 1;
offline = (offset < p.offlineOffset);
}
if (!_.isEmpty(topics[p.topic]))
p.offset = topics[p.topic][p.partition] + 1;
});
this.longpolling = !offline;
if (this.options.autoCommit) this.autoCommit();

@@ -160,13 +154,3 @@ }

if (!this.ready) return;
var maxBytes = null,
fetchMaxWaitMs = this.options.fetchMaxWaitMs,
payloads = this.payloads;
if (!this.longpolling) {
maxBytes = 1024*1024;
fetchMaxWaitMs = 100;
payloads = this.payloads.map(function (p) { return _.defaults({ maxBytes: maxBytes }, p) });
}
this.client.sendFetchRequest(this, payloads, fetchMaxWaitMs, this.options.fetchMinBytes);
this.client.sendFetchRequest(this, this.payloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes);
}

@@ -173,0 +157,0 @@

@@ -67,3 +67,3 @@ 'use strict';

topics = typeof topic === 'string' ? [topics] : topics;
if (typeof async === 'function' || typeof async === 'undefined') {
if (typeof async === 'function' && typeof cb === 'undefined') {
cb = async;

@@ -70,0 +70,0 @@ async = true;

{
"name": "kafka-node",
"description": "node client for Apache kafka, only support kafka 0.8 and above",
"version": "0.0.4",
"version": "0.0.5",
"main": "kafka.js",

@@ -16,3 +16,4 @@ "dependencies": {

"should": "~1.2.2",
"line-by-line": "~0.1.1"
"line-by-line": "~0.1.1",
"optimist": "~0.6.0"
},

@@ -19,0 +20,0 @@ "repository": {

'use strict';
var Consumer = require('../lib/consumer'),
Producer = require('../lib/producer'),
Offset = require('../lib/offset'),
Client = require('../lib/client');
var libPath = process.env['KAFKA_COV'] ? '../lib-cov/' : '../lib/',
Consumer = require(libPath + 'consumer'),
Producer = require(libPath + 'producer'),
Offset = require(libPath + 'offset'),
Client = require(libPath + 'client');

@@ -12,2 +13,10 @@ var client, consumer, producer, offset;

function offsetOutOfRange (topic, consumer) {
topic.maxNum = 2;
offset.fetch([topic], function (err, offsets) {
var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
consumer.setOffset(topic.topic, topic.partition, min);
});
}
before(function (done) {

@@ -19,3 +28,5 @@ client = new Client();

producer.createTopics(['_exist_topic_1_test', '_exist_topic_2_test'], false, function (err, created) {
producer.send([{ topic: '_exist_topic_2_test', messages: 'hello kafka' }], function (err) {
producer.send([
{ topic: '_exist_topic_2_test', messages: 'hello kafka' }
], function (err) {
done(err);

@@ -28,2 +39,3 @@ });

describe('Consumer', function () {
describe('events', function () {

@@ -36,2 +48,5 @@ it ('should emit message when get new message', function (done) {

consumer.on('error', noop);
consumer.on('offsetOutOfRange', function (topic) {
offsetOutOfRange.call(null, topic, this);
});
consumer.on('message', function (message) {

@@ -134,6 +149,11 @@ message.topic.should.equal('_exist_topic_2_test');

it('should commit offset of current topics', function (done) {
var options = { autoCommit: true, groupId: '_groupId_commit_test' },
topics = [{ topic: '_exist_topic_2_test' }];
var topics = [ { topic: '_exist_topic_2_test' } ],
options = { autoCommit: false, groupId: '_groupId_commit_test' };
var consumer = new Consumer(client, topics, options);
var count = 0;
consumer.on('error', noop);
consumer.on('offsetOutOfRange', function (topic) {
offsetOutOfRange.call(null, topic, this);
});
consumer.on('message', function (message) {

@@ -144,2 +164,3 @@ consumer.commit(true, function (err) {

});
});

@@ -146,0 +167,0 @@ });

'use strict';
var Offset = require('../lib/offset'),
Producer = require('../lib/producer'),
Client = require('../lib/client');
var libPath = process.env['kafka-cov'] ? '../lib-cov/' : '../lib/',
Producer = require(libPath + 'producer'),
Offset = require(libPath + 'offset'),
Client = require(libPath + 'client');

@@ -7,0 +8,0 @@ var client, producer, offset;

@@ -8,2 +8,7 @@ 'use strict';

// Helper method
function randomId () {
return Math.floor(Math.random() * 10000)
}
before(function (done) {

@@ -38,3 +43,3 @@ client = new Client();

it('should return All requests sent when async is true', function (done) {
producer.createTopics(['_exist_topic_4_test'], function (err, data) {
producer.createTopics(['_exist_topic_'+ randomId() +'_test'], true, function (err, data) {
data.should.equal('All requests sent');

@@ -45,4 +50,11 @@ done(err);

it('async should be true if not present', function (done) {
producer.createTopics(['_exist_topic_'+ randomId() +'_test'], function (err, data) {
data.should.equal('All requests sent');
done(err);
});
});
it('should return All created when async is false', function (done) {
producer.createTopics(['_exist_topic_4_test'], false, function (err, data) {
producer.createTopics(['_exist_topic_'+ randomId() +'_test'], false, function (err, data) {
data.should.equal('All created');

@@ -49,0 +61,0 @@ done(err);

'use strict';
var Zookeeper = require('../lib/zookeeper');
var libPath = process.env['kafka-cov'] ? '../lib-cov/' : '../lib/',
Zookeeper = require(libPath + 'zookeeper');
var zk;

@@ -5,0 +7,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc