kafka-node
Comparing version 0.0.4 to 0.0.5
@@ -11,10 +11,7 @@ 'use strict';
 var topics = [
     {topic: 'topic2'},
     {topic: 'topic1'},
     {topic: 't2'},
     {topic: 'topic3'}
   ],
-  options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 10000 };
+  options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 1000 };
-function createConsumer() {
+function createConsumer(topics) {
 var consumer = new Consumer(client, topics, options);
@@ -29,3 +26,2 @@ var offset = new Offset(client);
 consumer.on('offsetOutOfRange', function (topic) {
-  console.log(topic);
   topic.maxNum = 2;
@@ -39,2 +35,2 @@ offset.fetch([topic], function (err, offsets) {
-createConsumer();
+createConsumer(topics);
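
Taken together, these hunks shorten the consumer example's fetchMaxWaitMs from 10 s to 1 s and make createConsumer take its topic list explicitly. A minimal sketch of how the 0.0.5 example plausibly reads end to end; the require path mirrors the producer example's require('../kafka'), and the message handler is an assumption, not quoted from the diff:

    'use strict';
    var kafka = require('../kafka'),       // assumed entry point, as in the producer example
        Consumer = kafka.Consumer,
        Offset = kafka.Offset,
        Client = kafka.Client;

    var client = new Client();
    var offset = new Offset(client);
    var topics = [ {topic: 'topic1'}, {topic: 'topic2'} ],
        options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 1000 };

    function createConsumer(topics) {
      var consumer = new Consumer(client, topics, options);
      consumer.on('message', console.log);  // assumed handler
      consumer.on('offsetOutOfRange', function (topic) {
        // Ask for up to two offsets, then rewind to the smallest valid one.
        topic.maxNum = 2;
        offset.fetch([topic], function (err, offsets) {
          var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
          consumer.setOffset(topic.topic, topic.partition, min);
        });
      });
    }

    createConsumer(topics);
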
@@ -6,3 +6,4 @@ 'use strict';
 var client = new Client();
-var total = 10000;
+var total = 50000;
+var assert = require('assert');
 var count = 0;
@@ -9,0 +10,0 @@

@@ -6,2 +6,5 @@ var kafka = require('../kafka'),
+var argv = require('optimist').argv;
+var topic = argv.topic || 'topic1';
 var producer = new Producer(client);
@@ -30,3 +33,4 @@
 producer.on('ready', function () {
-  setInterval(send, 1000);
+  //setInterval(send, 1000);
+  send();
 });
@@ -37,10 +41,8 @@
 producer.send([
-  {topic: 'topic1', messages: ['777777777777777' + 1 + 'coolmessage'] },
-  {topic: 'topic2', messages: ['777777777777777' + 2 + 'coolmessage'] }
+  {topic: topic, messages: ['777777777777777' + 2 + 'coolmessage'] }
 ], function (err, data) {
   if (err) console.log(arguments);
   else console.log(data);
-  //if (++rets === count) process.exit();
+  if (++rets === count) process.exit();
 });
 }
 }
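
With the optimist dependency added in this release, the producer example becomes a small CLI: the target topic comes from a --topic flag and the message is sent once instead of on a one-second interval. A sketch of the resulting example assembled from the fragments above; the count/rets bookkeeping is assumed to be a simple sent-versus-acknowledged counter:

    'use strict';
    var kafka = require('../kafka'),
        Producer = kafka.Producer,
        Client = kafka.Client;
    var argv = require('optimist').argv;
    var topic = argv.topic || 'topic1';

    var client = new Client();
    var producer = new Producer(client);
    var count = 1, rets = 0;               // assumed: one send, exit after one ack

    function send() {
      producer.send([
        {topic: topic, messages: ['777777777777777' + 2 + 'coolmessage'] }
      ], function (err, data) {
        if (err) console.log(arguments);
        else console.log(data);
        if (++rets === count) process.exit();
      });
    }

    producer.on('ready', function () {
      send();                              // previously setInterval(send, 1000)
    });

Run as, for example, node producer.js --topic topic2; optimist parses the flag into argv.topic.
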
@@ -16,3 +16,2 @@ 'use strict';
 this.brokers = {}
-this.longPollingBrokers = {};
 this.topicMetadata = {};
@@ -42,5 +41,3 @@ this.topicPartitions = {};
 self.refreshBrokers(brokerMetadata);
-//setTimeout(function () {
-  self.emit('brokersChanged');
-//}, 5000);
+self.emit('brokersChanged');
 });
@@ -77,3 +74,3 @@ }
 }
-}, consumer.longpolling);
+});
 }
@@ -179,3 +176,2 @@
 delete this.brokers[deadKey];
-delete this.longPollingBrokers[deadKey];
 }.bind(this));
@@ -196,3 +192,3 @@ }
-Client.prototype.send = function (payloads, encoder, decoder, cb, longPolling) {
+Client.prototype.send = function (payloads, encoder, decoder, cb) {
 var self = this;
@@ -202,3 +198,3 @@ // payloads: [ [metadata exists], [metadta not exists] ]
 if (payloads[0].length && !payloads[1].length) {
-  this.sendToBroker(_.flatten(payloads), encoder, decoder, cb, longPolling);
+  this.sendToBroker(_.flatten(payloads), encoder, decoder, cb);
 return;
@@ -213,3 +209,3 @@ }
 self.updateMetadatas(resp);
-self.sendToBroker(payloads[1].concat(payloads[0]), encoder, decoder, cb, longPolling);
+self.sendToBroker(payloads[1].concat(payloads[0]), encoder, decoder, cb);
 });
@@ -219,3 +215,3 @@ }
-Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb, longPolling) {
+Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) {
 payloads = this.payloadsByLeader(payloads);
@@ -225,3 +221,3 @@ for (var leader in payloads) {
 var request = encoder.call(null, this.clientId, correlationId, payloads[leader]);
-var broker = this.brokerForLeader(leader, longPolling);
+var broker = this.brokerForLeader(leader);
 if (broker.error) return cb('Leader not available', payloads[leader]);
@@ -280,4 +276,4 @@ this.cbqueue[correlationId] = [decoder, cb];
-Client.prototype.brokerForLeader = function (leader, longPolling) {
-var brokers = longPolling ? this.longPollingBrokers : this.brokers;
+Client.prototype.brokerForLeader = function (leader) {
+var brokers = this.brokers;
 // If leader is not give, choose the first broker as leader
@@ -331,3 +327,2 @@ if (typeof leader === 'undefined') {
 if(s.retrying) return;
-console.log('retry', s.addr)
 s.retrying = true;
@@ -334,0 +329,0 @@ s.error = true;
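
Every client.js hunk above is one change seen from different angles: 0.0.4 maintained a second socket pool (longPollingBrokers) and threaded a longPolling flag from send() through sendToBroker() down to brokerForLeader(); 0.0.5 deletes the flag and keeps the single brokers pool. A standalone toy, not kafka-node's actual code, showing the 0.0.5 shape of the leader lookup:

    'use strict';
    // Toy client: one broker pool keyed by "host:port", as in 0.0.5.
    function ToyClient() {
      this.brokers = {};   // 0.0.4 kept a parallel this.longPollingBrokers
    }

    ToyClient.prototype.brokerForLeader = function (leader) {
      var brokers = this.brokers;   // 0.0.4 picked a pool based on the longPolling flag
      // If leader is not given, choose the first known broker as leader.
      if (typeof leader === 'undefined') {
        leader = Object.keys(brokers)[0];
      }
      return brokers[leader] || { error: true };   // callers map this to 'Leader not available'
    };

    var c = new ToyClient();
    c.brokers['localhost:9092'] = { addr: 'localhost:9092' };
    console.log(c.brokerForLeader());                   // default: first broker
    console.log(c.brokerForLeader('localhost:9092'));   // explicit leader
    console.log(c.brokerForLeader('other:9093'));       // { error: true }
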
@@ -38,3 +38,2 @@ 'use strict';
 this.id = nextId();
-this.longpolling = false;
 this.payloads = this.buildPayloads(topics);
@@ -125,12 +124,7 @@ this.connect();
 Consumer.prototype.updateOffsets = function (topics) {
-  var offline = !this.longpolling;
 this.payloads.forEach(function (p) {
-  if (!_.isEmpty(topics[p.topic])) {
-    var offset = topics[p.topic][p.partition];
-    p.offset = offset + 1;
-    offline = (offset < p.offlineOffset);
-  }
+  if (!_.isEmpty(topics[p.topic]))
+    p.offset = topics[p.topic][p.partition] + 1;
 });
-this.longpolling = !offline;
 if (this.options.autoCommit) this.autoCommit();
@@ -160,13 +154,3 @@ }
 if (!this.ready) return;
-var maxBytes = null,
-  fetchMaxWaitMs = this.options.fetchMaxWaitMs,
-  payloads = this.payloads;
-if (!this.longpolling) {
-  maxBytes = 1024*1024;
-  fetchMaxWaitMs = 100;
-  payloads = this.payloads.map(function (p) { return _.defaults({ maxBytes: maxBytes }, p) });
-}
-this.client.sendFetchRequest(this, payloads, fetchMaxWaitMs, this.options.fetchMinBytes);
+this.client.sendFetchRequest(this, this.payloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes);
 }
@@ -173,0 +157,0 @@
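
The consumer-side counterpart of the long-polling removal: 0.0.4 tracked whether the consumer had caught up (longpolling) and, while it was behind, fetched with a 100 ms wait and a 1 MB maxBytes cap, only honoring the configured fetchMaxWaitMs at the head of the log. 0.0.5 always fetches with the configured options, which is also why the example above drops fetchMaxWaitMs from 10000 to 1000. A runnable toy contrasting the two parameter choices; illustrative only, not the library's code:

    'use strict';
    // Contrast of fetch parameters; not kafka-node's actual implementation.
    var options = { fetchMaxWaitMs: 1000, fetchMinBytes: 1 };

    // 0.0.4-style: override wait and size until the consumer caught up.
    function fetchParamsV004(longpolling) {
      if (!longpolling) return { maxWaitMs: 100, maxBytes: 1024 * 1024 };
      return { maxWaitMs: options.fetchMaxWaitMs, maxBytes: null };
    }

    // 0.0.5-style: one unconditional set of parameters from the options.
    function fetchParamsV005() {
      return { maxWaitMs: options.fetchMaxWaitMs, maxBytes: null };
    }

    console.log(fetchParamsV004(false));  // { maxWaitMs: 100, maxBytes: 1048576 }
    console.log(fetchParamsV005());       // { maxWaitMs: 1000, maxBytes: null }
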
@@ -67,3 +67,3 @@ 'use strict';
 topics = typeof topic === 'string' ? [topics] : topics;
-if (typeof async === 'function' || typeof async === 'undefined') {
+if (typeof async === 'function' && typeof cb === 'undefined') {
 cb = async;
@@ -70,0 +70,0 @@ async = true;
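
This condition tightens how createTopics(topics, async, cb) detects an omitted async flag: the 0.0.4 test (typeof async === 'function' || typeof async === 'undefined') also fired when async was explicitly undefined, whereas 0.0.5 only shifts arguments when the callback was actually passed in async's slot. A standalone sketch of the 0.0.5 rule; only the condition is quoted from the diff, the body is illustrative:

    'use strict';
    // Standalone sketch of the argument shuffle in createTopics(topics, async, cb).
    function createTopics(topics, async, cb) {
      topics = typeof topics === 'string' ? [topics] : topics;
      // If the caller omitted `async`, the callback landed in its slot.
      if (typeof async === 'function' && typeof cb === 'undefined') {
        cb = async;
        async = true;
      }
      cb(null, async ? 'All requests sent' : 'All created');
    }

    createTopics(['t1'], true, console.log);   // null 'All requests sent'
    createTopics(['t1'], false, console.log);  // null 'All created'
    createTopics(['t1'], console.log);         // async defaults to true

The producer tests below exercise exactly these three call forms.
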
 {
   "name": "kafka-node",
   "description": "node client for Apache kafka, only support kafka 0.8 and above",
-  "version": "0.0.4",
+  "version": "0.0.5",
   "main": "kafka.js",
@@ -16,3 +16,4 @@ "dependencies": {
   "should": "~1.2.2",
-  "line-by-line": "~0.1.1"
+  "line-by-line": "~0.1.1",
+  "optimist": "~0.6.0"
 },
@@ -19,0 +20,0 @@ "repository": {

 'use strict';
-var Consumer = require('../lib/consumer'),
-  Producer = require('../lib/producer'),
-  Offset = require('../lib/offset'),
-  Client = require('../lib/client');
+var libPath = process.env['KAFKA_COV'] ? '../lib-cov/' : '../lib/',
+  Consumer = require(libPath + 'consumer'),
+  Producer = require(libPath + 'producer'),
+  Offset = require(libPath + 'offset'),
+  Client = require(libPath + 'client');
@@ -12,2 +13,10 @@ var client, consumer, producer, offset;
+function offsetOutOfRange (topic, consumer) {
+  topic.maxNum = 2;
+  offset.fetch([topic], function (err, offsets) {
+    var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
+    consumer.setOffset(topic.topic, topic.partition, min);
+  });
+}
 before(function (done) {
@@ -19,3 +28,5 @@ client = new Client();
 producer.createTopics(['_exist_topic_1_test', '_exist_topic_2_test'], false, function (err, created) {
-producer.send([{ topic: '_exist_topic_2_test', messages: 'hello kafka' }], function (err) {
+producer.send([
+  { topic: '_exist_topic_2_test', messages: 'hello kafka' }
+], function (err) {
 done(err);
@@ -28,2 +39,3 @@ });
 describe('Consumer', function () {
+describe('events', function () {
@@ -36,2 +48,5 @@ it ('should emit message when get new message', function (done) {
 consumer.on('error', noop);
+consumer.on('offsetOutOfRange', function (topic) {
+  offsetOutOfRange.call(null, topic, this);
+});
 consumer.on('message', function (message) {
@@ -134,6 +149,11 @@ message.topic.should.equal('_exist_topic_2_test');
 it('should commit offset of current topics', function (done) {
-var options = { autoCommit: true, groupId: '_groupId_commit_test' },
-  topics = [{ topic: '_exist_topic_2_test' }];
+var topics = [ { topic: '_exist_topic_2_test' } ],
+  options = { autoCommit: false, groupId: '_groupId_commit_test' };
 var consumer = new Consumer(client, topics, options);
+var count = 0;
 consumer.on('error', noop);
+consumer.on('offsetOutOfRange', function (topic) {
+  offsetOutOfRange.call(null, topic, this);
+});
 consumer.on('message', function (message) {
@@ -144,2 +164,3 @@ consumer.commit(true, function (err) {
 });
 });
@@ -146,0 +167,0 @@ });
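
The new offsetOutOfRange helper shared by both tests rewinds a consumer to the earliest valid offset: topic.maxNum = 2 asks the broker for up to two offsets for the partition, and Math.min picks the smaller, i.e. the earliest. The handlers pass this because inside a consumer event listener this is the consumer instance, which lets one helper serve the several consumers these tests create.
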
 'use strict';
-var Offset = require('../lib/offset'),
-  Producer = require('../lib/producer'),
-  Client = require('../lib/client');
+var libPath = process.env['kafka-cov'] ? '../lib-cov/' : '../lib/',
+  Producer = require(libPath + 'producer'),
+  Offset = require(libPath + 'offset'),
+  Client = require(libPath + 'client');
@@ -7,0 +8,0 @@ var client, producer, offset;

@@ -8,2 +8,7 @@ 'use strict';
+// Helper method
+function randomId () {
+  return Math.floor(Math.random() * 10000)
+}
 before(function (done) {
@@ -38,3 +43,3 @@ client = new Client();
 it('should return All requests sent when async is true', function (done) {
-producer.createTopics(['_exist_topic_4_test'], function (err, data) {
+producer.createTopics(['_exist_topic_'+ randomId() +'_test'], true, function (err, data) {
 data.should.equal('All requests sent');
@@ -45,4 +50,11 @@ done(err);
+it('async should be true if not present', function (done) {
+  producer.createTopics(['_exist_topic_'+ randomId() +'_test'], function (err, data) {
+    data.should.equal('All requests sent');
+    done(err);
+  });
+});
 it('should return All created when async is false', function (done) {
-producer.createTopics(['_exist_topic_4_test'], false, function (err, data) {
+producer.createTopics(['_exist_topic_'+ randomId() +'_test'], false, function (err, data) {
 data.should.equal('All created');
@@ -49,0 +61,0 @@ done(err);

 'use strict';
-var Zookeeper = require('../lib/zookeeper');
+var libPath = process.env['kafka-cov'] ? '../lib-cov/' : '../lib/',
+  Zookeeper = require(libPath + 'zookeeper');
 var zk;
@@ -5,0 +7,0 @@

Sorry, the diff of this file is not supported yet
Dynamic require
Supply chain risk: Dynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package

Environment variable access
Supply chain risk: Package accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 2 instances in 1 package
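
Both alerts correspond to the coverage switch added to the test files in this release, visible in the hunks above: the tests read an environment variable to decide whether to load instrumented sources, then build the require path dynamically.

    // The flagged pattern, as added to the 0.0.5 tests:
    var libPath = process.env['KAFKA_COV'] ? '../lib-cov/' : '../lib/',  // environment variable access
        Consumer = require(libPath + 'consumer');                        // dynamic require
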