Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

kafka-node

Package Overview
Dependencies
Maintainers
1
Versions
113
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafka-node - npm Package Compare versions

Comparing version 0.2.24 to 0.2.25

14

CHANGELOG.md
# kafka-node CHANGELOG
## 2015-04-01, Version 0.2.25
- Producer support `requireAcks` option [#187](https://github.com/SOHU-Co/kafka-node/pull/187)
- Update examples [#185](https://github.com/SOHU-Co/kafka-node/pull/185)
## 2015-03-20, Version 0.2.24
- Bump deps
- Refresh metadata after auto rebalance among brokers #180
- Initialize partition owner with consumerId #178
- Refresh metadata after auto rebalance among brokers [#180](https://github.com/SOHU-Co/kafka-node/pull/180)
- Initialize partition owner with consumerId [#178](https://github.com/SOHU-Co/kafka-node/pull/178)
## 2015-03-17, Version 0.2.23
- Fix #175: Refresh topic metadata in Producer when broker change
- Fix [#175](https://github.com/SOHU-Co/kafka-node/issues/175): Refresh topic metadata in Producer when broker change
- Refactor Client#refreshMetadata method
- Add the missing semicolons, no offense, just keep style.
- Fix #170: In case of `offsetOutOfRange`, the consumer should be paused.
- Fix #169: When paused why try to fetch every 1000 ms?
- Fix [#170](https://github.com/SOHU-Co/kafka-node/issues/170): In case of `offsetOutOfRange`, the consumer should be paused.
- Fix [#169](https://github.com/SOHU-Co/kafka-node/issues/169): When paused why try to fetch every 1000 ms?
- Ref: remove unused variables.
'use strict';
var kafka = require('../kafka');
var kafka = require('..');
var Consumer = kafka.Consumer;
var Producer = kafka.Producer;
var Offset = kafka.Offset;

@@ -16,22 +15,24 @@ var Client = kafka.Client;

],
options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 };
options = { autoCommit: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 };
function createConsumer(topics) {
var consumer = new Consumer(client, topics, options);
var offset = new Offset(client);
consumer.on('message', function (message) {
console.log(this.id, message);
var consumer = new Consumer(client, topics, options);
var offset = new Offset(client);
consumer.on('message', function (message) {
console.log(message);
});
consumer.on('error', function (err) {
console.log('error', err);
});
/*
* If consumer get `offsetOutOfRange` event, fetch data from the smallest(oldest) offset
*/
consumer.on('offsetOutOfRange', function (topic) {
topic.maxNum = 2;
offset.fetch([topic], function (err, offsets) {
var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
consumer.setOffset(topic.topic, topic.partition, min);
});
consumer.on('error', function (err) {
console.log('error', err);
});
consumer.on('offsetOutOfRange', function (topic) {
topic.maxNum = 2;
offset.fetch([topic], function (err, offsets) {
var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
consumer.setOffset(topic.topic, topic.partition, min);
});
})
}
createConsumer(topics);
});
'use strict';
var kafka = require('../kafka');
var kafka = require('..');
var HighLevelConsumer = kafka.HighLevelConsumer;
var Offset = kafka.Offset;
var Client = kafka.Client;

@@ -11,18 +10,11 @@ var argv = require('optimist').argv;

var topics = [ { topic: topic }];
var options = { autoCommit: true, fromBeginning: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 };
var options = { autoCommit: true, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024*1024 };
var consumer = new HighLevelConsumer(client, topics, options);
var offset = new Offset(client);
consumer.on('message', function (message) {
console.log(this.id, message);
console.log(message);
});
consumer.on('error', function (err) {
console.log('error', err);
});
consumer.on('offsetOutOfRange', function (topic) {
topic.maxNum = 2;
offset.fetch([topic], function (err, offsets) {
var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
consumer.setOffset(topic.topic, topic.partition, min);
});
});

@@ -1,2 +0,2 @@

var kafka = require('../kafka');
var kafka = require('..');
var HighLevelProducer = kafka.HighLevelProducer;

@@ -7,20 +7,22 @@ var Client = kafka.Client;

var topic = argv.topic || 'topic1';
var count = 3, rets = 0;
var count = 10, rets = 0;
var producer = new HighLevelProducer(client);
producer.on('ready', function () {
send('hello');
setTimeout(function () {
send('world');
send('world');
}, 2000);
setInterval(send, 1000);
});
function send(message) {
producer.on('error', function (err) {
console.log('error', err)
})
function send() {
var message = new Date().toString();
producer.send([
{topic: topic, messages: [message] }
{topic: topic, messages: [message] }
], function (err, data) {
if (err) console.log(arguments);
if (++rets === count) process.exit();
if (err) console.log(err);
else console.log('send %d messages', ++rets);
if (rets === count) process.exit();
});
}
}
'use strict';
var kafka = require('../kafka');
var kafka = require('..');
var Client = kafka.Client;
var Offset = kafka.Offset;
var offset = new Offset(new Client());
var total = 30000;
var count = 0;
var topic = 'topic1';
function fetch (cb) {
offset.fetch([
{topic: 't2', partition: 0, maxNum: 2},
{ topic: 'topic2', offset: 100, partition: 0 }],
cb );
}
// Fetch available offsets
offset.fetch([
{ topic: topic, partition: 0, maxNum: 2 },
{ topic: topic, partition: 1 },
], function (err, offsets) {
console.log(err || offsets);
});
function commit (cb) {
offset.commit('group-offset',
[
{ topic: 't2', offset: 10, partition: 0 },
{ topic: 'topic2', offset: 100, partition: 0 },
], cb);
}
function fetchCommits (cb) {
offset.fetchCommits(
'kafka-node-group',
[
{ topic: 't2', partition: 0 },
{ topic: 'topic2', partition: 0 },
], cb);
}
fetchCommits(function () {
console.log(arguments);
// Fetch commited offset
offset.commit('kafka-node-group', [
{ topic: topic, partition: 0 }
], function (err, result) {
console.log(err || result);
});

@@ -1,7 +0,6 @@

var kafka = require('../kafka'),
Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage,
Client = kafka.Client,
client = new Client('localhost:2181');
var kafka = require('..');
var Producer = kafka.Producer;
var KeyedMessage = kafka.KeyedMessage;
var Client = kafka.Client;
var client = new Client('localhost:2181');
var argv = require('optimist').argv;

@@ -11,10 +10,14 @@ var topic = argv.topic || 'topic1';

var a = argv.a || 0;
var count = argv.count || 1, rets = 0;
var producer = new Producer(client);
var producer = new Producer(client, { requireAcks: 1 });
producer.on('ready', function () {
send([
'hello',
new KeyedMessage('keyed', 'keyed message')
]);
var message = 'a message';
var keyedMessage = new KeyedMessage('keyed', 'a keyed message');
producer.send([
{ topic: topic, partition: p, messages: [message, keyedMessage], attributes: a }
], function (err, result) {
console.log(err || result);
process.exit();
});
});

@@ -24,13 +27,2 @@

console.log('error', err)
})
function send(messages) {
for (var i = 0; i < count; i++) {
producer.send([
{topic: topic, messages: messages , partition: p, attributes: a}
], function (err, data) {
if (err) console.log(arguments);
if (++rets === count) process.exit();
});
}
}
});

@@ -101,7 +101,10 @@ 'use strict';

Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs, fetchMinBytes, maxTickMessages) {
var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes),
decoder = protocol.decodeFetchResponse(function (err, type, message) {
var self = this;
var encoder = protocol.encodeFetchRequest(fetchMaxWaitMs, fetchMinBytes);
var decoder = protocol.decodeFetchResponse(function (err, type, message) {
if (err) {
if (err.message === 'OffsetOutOfRange') {
return consumer.emit('offsetOutOfRange', err);
} else if (err.message === 'NotLeaderForPartition') {
return self.emit('brokersChanged');
}

@@ -134,9 +137,20 @@

Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, cb) {
var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs),
decoder = protocol.decodeProduceResponse,
self = this;
var encoder = protocol.encodeProduceRequest(requireAcks, ackTimeoutMs);
var decoder = protocol.decodeProduceResponse;
var self = this;
decoder.requireAcks = requireAcks;
async.each(payloads, buildRequest, function (err) {
if (err) return cb(err);
self.send(payloads, encoder, decoder, cb);
self.send(payloads, encoder, decoder, function (err, result) {
if (err) {
if (err.message === 'NotLeaderForPartition') {
self.emit('brokersChanged');
}
cb(err);
} else {
cb(null, result);
}
});
});

@@ -214,3 +228,3 @@

this.queueCallback(broker, correlationId, [protocol.decodeMetadataResponse, cb]);
broker && broker.write(request);
broker.write(request);
};

@@ -349,3 +363,3 @@

if (err) {
debug('refresh metadta error', err.message)
debug('refresh metadata error', err.message)
return cb(err);

@@ -410,4 +424,10 @@ }

}
this.queueCallback(broker, correlationId, [decoder, cb]);
broker && broker.write(request);
if (decoder.requireAcks == 0) {
broker.write(request);
cb(null, { result: 'no ack' });
} else {
this.queueCallback(broker, correlationId, [decoder, cb]);
broker.write(request);
}
}

@@ -543,5 +563,8 @@ };

if (!handlers) return;
var decoder = handlers[0],
cb = handlers[1];
cb.call(this, null, decoder(resp));
var decoder = handlers[0];
var cb = handlers[1];
var result = decoder(resp);
(result instanceof Error)
? cb.call(this, result)
: cb.call(this, null, result);
socket.buffer = socket.buffer.slice(size);

@@ -548,0 +571,0 @@ if (socket.longpolling) socket.waitting = false;

@@ -108,3 +108,3 @@ 'use strict';

// Wait for the consumer to be ready
this.on('ready', function () {
this.on('rebalanced', function () {
self.fetchOffset(self.topicPayloads, function (err, topics) {

@@ -118,3 +118,2 @@ if (err) {

self.fetch();
self.emit('rebalanced');
});

@@ -154,5 +153,3 @@ });

// Nasty hack to retry 3 times to re-balance - TBD fix this
var oldTopicPayloads = JSON.parse(JSON.stringify(self.topicPayloads));
self.topicPayloads = [];
var oldTopicPayloads = self.topicPayloads;
var operation = retry.operation({

@@ -185,3 +182,3 @@ retries: 10,

} else {
self.emit('ready');
self.emit('rebalanced');
}

@@ -204,3 +201,3 @@ });

self.client.zk.on('consumersChanged', rebalance);
self.client.zk.on('brokersChanged', rebalance);
self.client.on('brokersChanged', rebalance);
}

@@ -212,3 +209,3 @@

self.client.zk.removeListener('consumersChanged', rebalance);
self.client.zk.removeListener('brokersChanged', rebalance);
self.client.removeListener('brokersChanged', rebalance);
}

@@ -576,3 +573,2 @@

HighLevelConsumer.prototype.stop = function (cb) {
this.ready = false;
if (!this.options.autoCommit) return cb && cb();

@@ -579,0 +575,0 @@ this.commit(true, function (err) {

@@ -32,3 +32,3 @@ 'use strict';

var HighLevelProducer = function (client, options) {
var useOptions = options || {};
options = options || {};

@@ -38,4 +38,8 @@ this.ready = false;

this.requireAcks = useOptions.requireAcks || DEFAULTS.requireAcks
this.ackTimeoutMs = useOptions.ackTimeoutMs || DEFAULTS.ackTimeoutMs
this.requireAcks = options.requireAcks === undefined
? DEFAULTS.requireAcks
: options.requireAcks;
this.ackTimeoutMs = options.ackTimeoutMs === undefined
? DEFAULTS.ackTimeoutMs
: options.ackTimeoutMs;

@@ -42,0 +46,0 @@ this.connect();

@@ -33,3 +33,3 @@ 'use strict';

var Producer = function (client, options) {
var useOptions = options || {};
options = options || {};

@@ -39,4 +39,8 @@ this.ready = false;

this.requireAcks = useOptions.requireAcks || DEFAULTS.requireAcks
this.ackTimeoutMs = useOptions.ackTimeoutMs || DEFAULTS.ackTimeoutMs
this.requireAcks = options.requireAcks === undefined
? DEFAULTS.requireAcks
: options.requireAcks;
this.ackTimeoutMs = options.ackTimeoutMs === undefined
? DEFAULTS.ackTimeoutMs
: options.ackTimeoutMs;

@@ -43,0 +47,0 @@ this.connect();

@@ -115,3 +115,3 @@ 'use strict';

if (vars.errorCode !== 0)
cb({ topic: vars.topic, partition: vars.partition, message: ERROR_CODE[vars.errorCode] });
return cb({ topic: vars.topic, partition: vars.partition, message: ERROR_CODE[vars.errorCode] });
var messageSet = decodeMessageSet(vars.topic, vars.partition, vars.messageSet, cb, maxTickMessages);

@@ -396,2 +396,3 @@ if (messageSet.length) {

var topics = {};
var error;
Binary.parse(resp)

@@ -410,6 +411,10 @@ .word32bs('size')

.tap(function (vars) {
topics[vars.topic][vars.partition] = vars.offset;
if (vars.errorCode) {
error = new Error(ERROR_CODE[vars.errorCode]);
} else {
topics[vars.topic][vars.partition] = vars.offset;
}
});
}
return topics;
return error || topics;
}

@@ -416,0 +421,0 @@

@@ -329,4 +329,4 @@ 'use strict';

path,
new Buffer(consumerId),
null,
null,
zookeeper.CreateMode.EPHEMERAL,

@@ -340,12 +340,2 @@ function (error, path) {

function (callback) {
self.client.setData(path, new Buffer(consumerId), function (error, stat) {
if (error) {
callback(error);
}
else {
callback();
}
});
},
function (callback) {
self.client.exists(path, null, function (error, stat) {

@@ -352,0 +342,0 @@ if (error) {

{
"name": "kafka-node",
"description": "node client for Apache kafka, only support kafka 0.8 and above",
"version": "0.2.24",
"version": "0.2.25",
"main": "kafka.js",

@@ -6,0 +6,0 @@ "dependencies": {

@@ -30,4 +30,5 @@ Kafka-node

## Producer
### Producer(client)
### Producer(client, [options])
* `client`: client which keeps a connection with the Kafka server.
* `options`: set `requireAcks` and `ackTimeoutMs` for producer, the default value is `{requireAcks: 1, ackTimeoutMs: 100}`

@@ -108,4 +109,5 @@ ``` js

## HighLevelProducer
### HighLevelProducer(client)
### HighLevelProducer(client, [options])
* `client`: client which keeps a connection with the Kafka server. Round-robins produce requests to the available topic partitions
* `options`: set `requireAcks` and `ackTimeoutMs` for producer, the default value is `{requireAcks: 1, ackTimeoutMs: 100}`

@@ -112,0 +114,0 @@ ``` js

@@ -8,3 +8,3 @@ 'use strict';

var client, producer;
var client, producer, noAckProducer;

@@ -30,2 +30,3 @@ var TOPIC_POSTFIX = '_test_' + Date.now();

});
noAckProducer = new Producer(client, { requireAcks: 0 });
});

@@ -121,2 +122,12 @@

});
it('should send message without ack', function (done) {
noAckProducer.send([{
topic: EXISTS_TOPIC_3, messages: 'hello kafka'
}], function (err, message) {
if (err) return done(err);
message.result.should.equal('no ack');
done();
});
})
});

@@ -123,0 +134,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc