kafka-node
Comparing version 0.2.24 to 0.2.25
 # kafka-node CHANGELOG
+## 2015-04-01, Version 0.2.25
+- Producer support `requireAcks` option [#187](https://github.com/SOHU-Co/kafka-node/pull/187)
+- Update examples [#185](https://github.com/SOHU-Co/kafka-node/pull/185)
 ## 2015-03-20, Version 0.2.24
 - Bump deps
-- Refresh metadata after auto rebalance among brokers #180
-- Initialize partition owner with consumerId #178
+- Refresh metadata after auto rebalance among brokers [#180](https://github.com/SOHU-Co/kafka-node/pull/180)
+- Initialize partition owner with consumerId [#178](https://github.com/SOHU-Co/kafka-node/pull/178)
 ## 2015-03-17, Version 0.2.23
-- Fix #175: Refresh topic metadata in Producer when broker change
+- Fix [#175](https://github.com/SOHU-Co/kafka-node/issues/175): Refresh topic metadata in Producer when broker change
 - Refactor Client#refreshMetadata method
 - Add the missing semicolons, no offense, just keep style.
-- Fix #170: In case of `offsetOutOfRange`, the consumer should be paused.
-- Fix #169: When paused why try to fetch every 1000 ms?
+- Fix [#170](https://github.com/SOHU-Co/kafka-node/issues/170): In case of `offsetOutOfRange`, the consumer should be paused.
+- Fix [#169](https://github.com/SOHU-Co/kafka-node/issues/169): When paused why try to fetch every 1000 ms?
 - Ref: remove unused variables.
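The headline change in 0.2.25 is the new `requireAcks` producer option (#187). A minimal usage sketch, assuming the `Producer(client, [options])` signature documented in the README diff further down and a ZooKeeper instance at `localhost:2181`:

```js
var kafka = require('kafka-node');
var client = new kafka.Client('localhost:2181');

// requireAcks: 1 (the default) waits for the partition leader to acknowledge
// each write; ackTimeoutMs bounds how long the broker waits for that ack.
var producer = new kafka.Producer(client, { requireAcks: 1, ackTimeoutMs: 100 });

producer.on('ready', function () {
  producer.send([{ topic: 'topic1', messages: ['hello'] }], function (err, result) {
    console.log(err || result);
  });
});

producer.on('error', function (err) {
  console.log('error', err);
});
```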
 'use strict';
-var kafka = require('../kafka');
+var kafka = require('..');
 var Consumer = kafka.Consumer;
 var Producer = kafka.Producer;
 var Offset = kafka.Offset;
@@ -16,22 +15,24 @@ var Client = kafka.Client;
     ],
-    options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 };
+    options = { autoCommit: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 };
 function createConsumer(topics) {
   var consumer = new Consumer(client, topics, options);
   var offset = new Offset(client);
   consumer.on('message', function (message) {
-    console.log(this.id, message);
+    console.log(message);
   });
   consumer.on('error', function (err) {
     console.log('error', err);
   });
-  /*
-  * If consumer get `offsetOutOfRange` event, fetch data from the smallest(oldest) offset
-  */
   consumer.on('offsetOutOfRange', function (topic) {
     topic.maxNum = 2;
     offset.fetch([topic], function (err, offsets) {
       var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
       consumer.setOffset(topic.topic, topic.partition, min);
     });
   })
 }
 createConsumer(topics);
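Changelog fix #170 above notes that a consumer hitting `offsetOutOfRange` should be paused, and the library now does this internally; a more defensive variant of the handler above pauses and resumes explicitly around the offset reset. A sketch, assuming the `Consumer#pause()`/`Consumer#resume()` methods kafka-node exposes:

```js
consumer.on('offsetOutOfRange', function (topic) {
  // Stop fetching while a valid offset for this partition is looked up.
  consumer.pause();
  topic.maxNum = 2;
  offset.fetch([topic], function (err, offsets) {
    if (err) return console.log('error', err);
    // Rewind to the smallest (oldest) available offset, then resume fetching.
    var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
    consumer.setOffset(topic.topic, topic.partition, min);
    consumer.resume();
  });
});
```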
 'use strict';
-var kafka = require('../kafka');
+var kafka = require('..');
 var HighLevelConsumer = kafka.HighLevelConsumer;
 var Offset = kafka.Offset;
 var Client = kafka.Client;
@@ -11,18 +10,11 @@ var argv = require('optimist').argv;
 var topics = [ { topic: topic }];
-var options = { autoCommit: true, fromBeginning: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 };
+var options = { autoCommit: true, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 };
 var consumer = new HighLevelConsumer(client, topics, options);
 var offset = new Offset(client);
 consumer.on('message', function (message) {
-  console.log(this.id, message);
+  console.log(message);
 });
 consumer.on('error', function (err) {
   console.log('error', err);
 });
-consumer.on('offsetOutOfRange', function (topic) {
-  topic.maxNum = 2;
-  offset.fetch([topic], function (err, offsets) {
-    var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
-    consumer.setOffset(topic.topic, topic.partition, min);
-  });
-});
@@ -1,2 +0,2 @@
-var kafka = require('../kafka');
+var kafka = require('..');
 var HighLevelProducer = kafka.HighLevelProducer;
@@ -7,20 +7,22 @@ var Client = kafka.Client;
 var topic = argv.topic || 'topic1';
-var count = 3, rets = 0;
+var count = 10, rets = 0;
 var producer = new HighLevelProducer(client);
 producer.on('ready', function () {
-  send('hello');
-  setTimeout(function () {
-    send('world');
-  }, 2000);
+  setInterval(send, 1000);
 });
-function send(message) {
+producer.on('error', function (err) {
+  console.log('error', err)
+})
+function send() {
+  var message = new Date().toString();
   producer.send([
     {topic: topic, messages: [message] }
   ], function (err, data) {
-    if (err) console.log(arguments);
-    if (++rets === count) process.exit();
+    if (err) console.log(err);
+    else console.log('send %d messages', ++rets);
+    if (rets === count) process.exit();
   });
 }
 'use strict';
-var kafka = require('../kafka');
+var kafka = require('..');
 var Client = kafka.Client;
 var Offset = kafka.Offset;
 var offset = new Offset(new Client());
-var total = 30000;
-var count = 0;
 var topic = 'topic1';
-function fetch (cb) {
-  offset.fetch([
-    {topic: 't2', partition: 0, maxNum: 2},
-    { topic: 'topic2', offset: 100, partition: 0 }],
-  cb);
-}
+// Fetch available offsets
+offset.fetch([
+  { topic: topic, partition: 0, maxNum: 2 },
+  { topic: topic, partition: 1 },
+], function (err, offsets) {
+  console.log(err || offsets);
+});
-function commit (cb) {
-  offset.commit('group-offset',
-    [
-      { topic: 't2', offset: 10, partition: 0 },
-      { topic: 'topic2', offset: 100, partition: 0 },
-    ], cb);
-}
-function fetchCommits (cb) {
-  offset.fetchCommits(
-    'kafka-node-group',
-    [
-      { topic: 't2', partition: 0 },
-      { topic: 'topic2', partition: 0 },
-    ], cb);
-}
-fetchCommits(function () {
-  console.log(arguments);
+// Commit offset
+offset.commit('kafka-node-group', [
+  { topic: topic, partition: 0 }
+], function (err, result) {
+  console.log(err || result);
+});
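The old version of this example also exercised `Offset#fetchCommits`, which reads back the offsets last committed by a consumer group; the method itself remains in the library. A sketch based on the removed helper above, reusing `offset` and `topic` from this example:

```js
// Read back the offsets last committed by the 'kafka-node-group' consumer group.
offset.fetchCommits('kafka-node-group', [
  { topic: topic, partition: 0 }
], function (err, result) {
  console.log(err || result);
});
```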
@@ -1,7 +0,6 @@
-var kafka = require('../kafka'),
-    Producer = kafka.Producer,
-    KeyedMessage = kafka.KeyedMessage,
-    Client = kafka.Client,
-    client = new Client('localhost:2181');
+var kafka = require('..');
+var Producer = kafka.Producer;
+var KeyedMessage = kafka.KeyedMessage;
+var Client = kafka.Client;
+var client = new Client('localhost:2181');
 var argv = require('optimist').argv;
@@ -11,10 +10,14 @@ var topic = argv.topic || 'topic1';
 var a = argv.a || 0;
-var count = argv.count || 1, rets = 0;
-var producer = new Producer(client);
+var producer = new Producer(client, { requireAcks: 1 });
 producer.on('ready', function () {
-  send([
-    'hello',
-    new KeyedMessage('keyed', 'keyed message')
-  ]);
+  var message = 'a message';
+  var keyedMessage = new KeyedMessage('keyed', 'a keyed message');
+  producer.send([
+    { topic: topic, partition: p, messages: [message, keyedMessage], attributes: a }
+  ], function (err, result) {
+    console.log(err || result);
+    process.exit();
+  });
 });
@@ -24,13 +27,2 @@
   console.log('error', err)
 })
-function send(messages) {
-  for (var i = 0; i < count; i++) {
-    producer.send([
-      {topic: topic, messages: messages , partition: p, attributes: a}
-    ], function (err, data) {
-      if (err) console.log(arguments);
-      if (++rets === count) process.exit();
-    });
-  }
-}
@@ -101,7 +101,10 @@ 'use strict';
 Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
-  var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes),
-      decoder = protocol.decodeFetchResponse(function (err, type, message) {
+  var self = this;
+  var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
+  var decoder = protocol.decodeFetchResponse(function (err, type, message) {
     if (err) {
       if (err.message === 'OffsetOutOfRange') {
         return consumer.emit('offsetOutOfRange', err);
+      } else if (err.message === 'NotLeaderForPartition') {
+        return self.emit('brokersChanged');
       }
@@ -134,9 +137,20 @@
 Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, cb) {
-  var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs),
-      decoder = protocol.decodeProduceResponse,
-      self = this;
+  var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs);
+  var decoder = protocol.decodeProduceResponse;
+  var self = this;
+  decoder.requireAcks = requireAcks;
   async.each(payloads, buildRequest, function (err) {
     if (err) return cb(err);
-    self.send(payloads, encoder, decoder, cb);
+    self.send(payloads, encoder, decoder, function (err, result) {
+      if (err) {
+        if (err.message === 'NotLeaderForPartition') {
+          self.emit('brokersChanged');
+        }
+        cb(err);
+      } else {
+        cb(null, result);
+      }
+    });
   });
@@ -214,3 +228,3 @@
   this.queueCallback(broker, correlationId, [protocol.decodeMetadataResponse, cb]);
-  broker && broker.write(request);
+  broker.write(request);
 };
@@ -349,3 +363,3 @@
   if (err) {
-    debug('refresh metadta error', err.message)
+    debug('refresh metadata error', err.message)
     return cb(err);
@@ -410,4 +424,10 @@ }
   }
-  this.queueCallback(broker, correlationId, [decoder, cb]);
-  broker && broker.write(request);
+  if (decoder.requireAcks == 0) {
+    broker.write(request);
+    cb(null, { result: 'no ack' });
+  } else {
+    this.queueCallback(broker, correlationId, [decoder, cb]);
+    broker.write(request);
+  }
 }
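With `requireAcks` set to 0 the client never queues a response callback for the produce request: it writes to the broker socket and invokes the callback immediately with `{ result: 'no ack' }`. A minimal fire-and-forget sketch, mirroring the new test at the bottom of this diff (the topic name is illustrative):

```js
var kafka = require('kafka-node');
var client = new kafka.Client('localhost:2181');

// requireAcks: 0 — the broker sends no produce response, so delivery is not
// confirmed; in exchange the send callback fires as soon as the request is written.
var noAckProducer = new kafka.Producer(client, { requireAcks: 0 });

noAckProducer.on('ready', function () {
  noAckProducer.send([{ topic: 'topic1', messages: 'hello kafka' }], function (err, result) {
    console.log(err || result); // { result: 'no ack' }
  });
});
```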
@@ -543,5 +563,8 @@ };
   if (!handlers) return;
-  var decoder = handlers[0],
-      cb = handlers[1];
-  cb.call(this, null, decoder(resp));
+  var decoder = handlers[0];
+  var cb = handlers[1];
+  var result = decoder(resp);
+  (result instanceof Error)
+    ? cb.call(this, result)
+    : cb.call(this, null, result);
   socket.buffer = socket.buffer.slice(size);
@@ -548,0 +571,0 @@ if (socket.longpolling) socket.waitting = false;
@@ -108,3 +108,3 @@ 'use strict';
   // Wait for the consumer to be ready
-  this.on('ready', function () {
+  this.on('rebalanced', function () {
     self.fetchOffset(self.topicPayloads, function (err, topics) {
@@ -118,3 +118,2 @@ if (err) {
       self.fetch();
-      self.emit('rebalanced');
     });
@@ -154,5 +153,3 @@ });
   // Nasty hack to retry 3 times to re-balance - TBD fix this
-  var oldTopicPayloads = JSON.parse(JSON.stringify(self.topicPayloads));
-  self.topicPayloads = [];
+  var oldTopicPayloads = self.topicPayloads;
   var operation = retry.operation({
@@ -185,3 +182,3 @@ retries: 10,
   } else {
-    self.emit('ready');
+    self.emit('rebalanced');
   }
@@ -204,3 +201,3 @@ });
   self.client.zk.on('consumersChanged', rebalance);
-  self.client.zk.on('brokersChanged', rebalance);
+  self.client.on('brokersChanged', rebalance);
 }
@@ -212,3 +209,3 @@
   self.client.zk.removeListener('consumersChanged', rebalance);
-  self.client.zk.removeListener('brokersChanged', rebalance);
+  self.client.removeListener('brokersChanged', rebalance);
 }
@@ -576,3 +573,2 @@
 HighLevelConsumer.prototype.stop = function (cb) {
-  this.ready = false;
   if (!this.options.autoCommit) return cb && cb();
@@ -579,0 +575,0 @@ this.commit(true, function (err) {
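The high-level consumer now sequences startup on a `rebalanced` event instead of `ready`: offsets are fetched only after partition assignment settles, and rebalances are re-triggered by the client-level `brokersChanged` event rather than a raw ZooKeeper watch. For application code, `rebalanced` is a reasonable hook for observing current partition ownership; a hedged sketch (assuming `topicPayloads` stays a public property, as the code above suggests):

```js
var consumer = new kafka.HighLevelConsumer(client, [{ topic: 'topic1' }], { autoCommit: true });

// Fired after each successful rebalance, once partition ownership is registered.
consumer.on('rebalanced', function () {
  // topicPayloads reflects the partitions this consumer instance currently owns.
  console.log('owned partitions:', consumer.topicPayloads);
});
```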
@@ -32,3 +32,3 @@ 'use strict';
 var HighLevelProducer = function (client, options) {
-  var useOptions = options || {};
+  options = options || {};
@@ -38,4 +38,8 @@ this.ready = false;
-  this.requireAcks = useOptions.requireAcks || DEFAULTS.requireAcks
-  this.ackTimeoutMs = useOptions.ackTimeoutMs || DEFAULTS.ackTimeoutMs
+  this.requireAcks = options.requireAcks === undefined
+    ? DEFAULTS.requireAcks
+    : options.requireAcks;
+  this.ackTimeoutMs = options.ackTimeoutMs === undefined
+    ? DEFAULTS.ackTimeoutMs
+    : options.ackTimeoutMs;
@@ -42,0 +46,0 @@ this.connect();
@@ -33,3 +33,3 @@ 'use strict';
 var Producer = function (client, options) {
-  var useOptions = options || {};
+  options = options || {};
@@ -39,4 +39,8 @@ this.ready = false;
-  this.requireAcks = useOptions.requireAcks || DEFAULTS.requireAcks
-  this.ackTimeoutMs = useOptions.ackTimeoutMs || DEFAULTS.ackTimeoutMs
+  this.requireAcks = options.requireAcks === undefined
+    ? DEFAULTS.requireAcks
+    : options.requireAcks;
+  this.ackTimeoutMs = options.ackTimeoutMs === undefined
+    ? DEFAULTS.ackTimeoutMs
+    : options.ackTimeoutMs;
@@ -43,0 +47,0 @@ this.connect();
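The move from `||` to an explicit `=== undefined` check matters because 0 is a meaningful `requireAcks` value: `0 || DEFAULTS.requireAcks` evaluates to the default, silently re-enabling acks for a producer that asked for none. A standalone illustration of the difference:

```js
var DEFAULTS = { requireAcks: 1, ackTimeoutMs: 100 };
var options = { requireAcks: 0 }; // caller explicitly wants fire-and-forget

// Buggy: 0 is falsy, so the default wins and acks are silently re-enabled.
var buggy = options.requireAcks || DEFAULTS.requireAcks;
console.log(buggy); // 1

// Fixed: fall back to the default only when the option was not supplied at all.
var fixed = options.requireAcks === undefined ? DEFAULTS.requireAcks : options.requireAcks;
console.log(fixed); // 0
```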
@@ -115,3 +115,3 @@ 'use strict';
   if (vars.errorCode !== 0)
-    cb({ topic: vars.topic, partition: vars.partition, message: ERROR_CODE[vars.errorCode] });
+    return cb({ topic: vars.topic, partition: vars.partition, message: ERROR_CODE[vars.errorCode] });
   var messageSet = decodeMessageSet(vars.topic, vars.partition, vars.messageSet, cb, maxTickMessages);
@@ -396,2 +396,3 @@ if (messageSet.length) {
   var topics = {};
+  var error;
   Binary.parse(resp)
@@ -410,6 +411,10 @@ .word32bs('size')
     .tap(function (vars) {
-      topics[vars.topic][vars.partition] = vars.offset;
+      if (vars.errorCode) {
+        error = new Error(ERROR_CODE[vars.errorCode]);
+      } else {
+        topics[vars.topic][vars.partition] = vars.offset;
+      }
     });
   }
-  return topics;
+  return error || topics;
 }
@@ -416,0 +421,0 @@
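Combined with the client change above — a decoder result that is an `Error` is now routed to the callback's error slot — an offset response carrying a Kafka error code surfaces as a real `err` instead of a half-populated topics map. For callers this just means checking the first callback argument; an illustrative sketch (the specific error name depends on the broker's response code):

```js
offset.fetch([{ topic: 'topic1', partition: 0 }], function (err, offsets) {
  if (err) {
    // err.message is the decoded ERROR_CODE entry from the response.
    return console.error('offset fetch failed:', err.message);
  }
  console.log(offsets);
});
```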
@@ -329,4 +329,4 @@ 'use strict';
   path,
-  null,
+  new Buffer(consumerId),
   null,
   zookeeper.CreateMode.EPHEMERAL,
@@ -340,12 +340,2 @@ function (error, path) {
-  function (callback) {
-    self.client.setData(path, new Buffer(consumerId), function (error, stat) {
-      if (error) {
-        callback(error);
-      }
-      else {
-        callback();
-      }
-    });
-  },
   function (callback) {
     self.client.exists(path, null, function (error, stat) {
@@ -352,0 +342,0 @@ if (error) {
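The ZooKeeper change collapses two round-trips into one: the ephemeral ownership node is now created with the `consumerId` as its data, so the follow-up `setData` step in the async waterfall is gone. In `node-zookeeper-client` terms, roughly:

```js
// Before: create the node empty, then write the owner id in a second call.
// self.client.create(path, null, null, zookeeper.CreateMode.EPHEMERAL, cb);
// self.client.setData(path, new Buffer(consumerId), cb);

// After: the node is born carrying its owner id.
self.client.create(path, new Buffer(consumerId), null, zookeeper.CreateMode.EPHEMERAL, cb);
```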
 {
   "name": "kafka-node",
   "description": "node client for Apache kafka, only support kafka 0.8 and above",
-  "version": "0.2.24",
+  "version": "0.2.25",
   "main": "kafka.js",
@@ -6,0 +6,0 @@ "dependencies": {
@@ -30,4 +30,5 @@ Kafka-node
 ## Producer
-### Producer(client)
+### Producer(client, [options])
 * `client`: client which keeps a connection with the Kafka server.
+* `options`: set `requireAcks` and `ackTimeoutMs` for the producer; the default value is `{ requireAcks: 1, ackTimeoutMs: 100 }`
@@ -108,4 +109,5 @@ ``` js
 ## HighLevelProducer
-### HighLevelProducer(client)
+### HighLevelProducer(client, [options])
 * `client`: client which keeps a connection with the Kafka server. Round-robins produce requests to the available topic partitions
+* `options`: set `requireAcks` and `ackTimeoutMs` for the producer; the default value is `{ requireAcks: 1, ackTimeoutMs: 100 }`
@@ -112,0 +114,0 @@ ``` js
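A quick sketch of the corresponding constructor call; since `HighLevelProducer` round-robins produce requests across partitions, payloads carry no `partition` field, and the options behave exactly as for `Producer`:

```js
var hlProducer = new kafka.HighLevelProducer(client, { requireAcks: 1, ackTimeoutMs: 100 });

hlProducer.on('ready', function () {
  // No partition given — the producer picks one per request (round-robin).
  hlProducer.send([{ topic: 'topic1', messages: ['no partition needed'] }], function (err, result) {
    console.log(err || result);
  });
});
```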
@@ -8,3 +8,3 @@ 'use strict';
-var client, producer;
+var client, producer, noAckProducer;
@@ -30,2 +30,3 @@ var TOPIC_POSTFIX = '_test_' + Date.now();
   });
+  noAckProducer = new Producer(client, { requireAcks: 0 });
 });
@@ -121,2 +122,12 @@
   });
+  it('should send message without ack', function (done) {
+    noAckProducer.send([{
+      topic: EXISTS_TOPIC_3, messages: 'hello kafka'
+    }], function (err, message) {
+      if (err) return done(err);
+      message.result.should.equal('no ack');
+      done();
+    });
+  })
 });
@@ -123,0 +134,0 @@