kafka-node
Advanced tools
Comparing version 0.2.3 to 0.2.4
@@ -95,3 +95,3 @@ 'use strict'; | ||
} | ||
if (type === 'message') { | ||
@@ -146,4 +146,4 @@ consumer.emit('message', value); | ||
// data: { topicName1: {}, topicName2: {} } | ||
if (err) return cb(err); | ||
_.merge(out, data); | ||
if (err) return cb && cb(err); | ||
_.merge(out, data); | ||
count -= 1; | ||
@@ -174,3 +174,3 @@ // Waiting for all request return | ||
this.cbqueue[correlationId] = [protocol.decodeMetadataResponse, cb]; | ||
broker && broker.write(request); | ||
broker && broker.write(request); | ||
} | ||
@@ -182,3 +182,3 @@ | ||
this.zk.topicExists(topic, function (existed, reply) { | ||
if (existed && ++created === topics.length) cb(null, topics.length); | ||
if (existed && ++created === topics.length) cb(null, topics.length); | ||
}, true); | ||
@@ -199,3 +199,3 @@ }.bind(this)); | ||
this.zk.topicExists(topic, function (existed, reply) { | ||
if (!existed) errors.push(reply); | ||
if (!existed) errors.push(reply); | ||
if (++count === topics.length) cb(errors.length, errors); | ||
@@ -374,3 +374,3 @@ }); | ||
} else if (!_.isEmpty(this.brokerMetadata)) { | ||
leader = Object.keys(this.brokerMetadata)[0]; | ||
leader = Object.keys(this.brokerMetadata)[0]; | ||
} else { | ||
@@ -380,3 +380,3 @@ this.emit('error', new errors.BrokerNotAvailableError('Could not find a broker')); | ||
} | ||
} | ||
} | ||
var metadata = this.brokerMetadata[leader], | ||
@@ -445,3 +445,3 @@ addr = metadata.host + ':' + metadata.port; | ||
if (socket.buffer.length) | ||
if (socket.buffer.length) | ||
setImmediate(function () { this.handleReceivedData(socket);}.bind(this)); | ||
@@ -448,0 +448,0 @@ } |
@@ -486,7 +486,7 @@ 'use strict'; | ||
function decodeOffsets (end, vars) { | ||
if (--vars.offsetNum === 0) end(); | ||
if (--vars.offsetNum <= 0) end(); | ||
topics[vars.topic][vars.partition] = topics[vars.topic][vars.partition] || []; | ||
this.word64bs('offset') | ||
.tap(function (vars) { | ||
topics[vars.topic][vars.partition].push(vars.offset); | ||
if (vars.offset) topics[vars.topic][vars.partition].push(vars.offset); | ||
}); | ||
@@ -493,0 +493,0 @@ } |
{ | ||
"name": "kafka-node", | ||
"description": "node client for Apache kafka, only support kafka 0.8 and above", | ||
"version": "0.2.3", | ||
"version": "0.2.4", | ||
"main": "kafka.js", | ||
@@ -6,0 +6,0 @@ "dependencies": { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package