kafka-node
Advanced tools
Comparing version 2.4.1 to 2.5.0
# kafka-node CHANGELOG | ||
## 2018-04-09, Version 2.5.0 | ||
* Explicitly cast key to string in hashCode function for `KeyedPartitioner` [#870](https://github.com/SOHU-Co/kafka-node/pull/870) | ||
* For consumer fetch loop we now clear `socket.waiting` before invoking callbacks [#819](https://github.com/SOHU-Co/kafka-node/pull/819) | ||
* Add Support for IPv6 [#818](https://github.com/SOHU-Co/kafka-node/pull/818) | ||
* Clear internal topicPayload array if no topic partitions are assigned to the `ConsumerGroup` [#888](https://github.com/SOHU-Co/kafka-node/pull/888) | ||
* Fix Stale Commit Queue for `ConsumerGroupStream` [#891](https://github.com/SOHU-Co/kafka-node/pull/891) | ||
* For rebalance case `ConsumerGroup` will try to commit before joining a group. Added `onRebalance` callback for manual commit users [#889](https://github.com/SOHU-Co/kafka-node/pull/889) | ||
## 2018-02-06, Version 2.4.1 | ||
@@ -4,0 +12,0 @@ |
@@ -5,5 +5,6 @@ 'use strict'; | ||
var util = require('util'); | ||
var events = require('events'); | ||
var EventEmitter = require('events'); | ||
var Admin = function (kafkaClient) { | ||
function Admin (kafkaClient) { | ||
EventEmitter.call(this); | ||
if (!(kafkaClient instanceof KafkaClient)) { | ||
@@ -26,4 +27,4 @@ throw new Error("'Admin' only accepts 'KafkaClient' for its kafka client."); | ||
}); | ||
}; | ||
util.inherits(Admin, events.EventEmitter); | ||
} | ||
util.inherits(Admin, EventEmitter); | ||
@@ -30,0 +31,0 @@ Admin.prototype.listGroups = function (cb) { |
@@ -5,3 +5,3 @@ 'use strict'; | ||
var util = require('util'); | ||
var events = require('events'); | ||
var EventEmitter = require('events'); | ||
var _ = require('lodash'); | ||
@@ -57,2 +57,3 @@ var protocol = require('./protocol'); | ||
function BaseProducer (client, options, defaultPartitionerType, customPartitioner) { | ||
EventEmitter.call(this); | ||
options = options || {}; | ||
@@ -80,3 +81,3 @@ | ||
util.inherits(BaseProducer, events.EventEmitter); | ||
util.inherits(BaseProducer, EventEmitter); | ||
@@ -83,0 +84,0 @@ BaseProducer.prototype.connect = function () { |
@@ -10,3 +10,3 @@ 'use strict'; | ||
var retry = require('retry'); | ||
var events = require('events'); | ||
var EventEmitter = require('events'); | ||
var errors = require('./errors'); | ||
@@ -51,6 +51,8 @@ var getCodec = require('./codec'); | ||
*/ | ||
var Client = function (connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions) { | ||
function Client (connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions) { | ||
if (this instanceof Client === false) { | ||
return new Client(connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions); | ||
} | ||
EventEmitter.call(this); | ||
this.setMaxListeners(20); // waitUntilReady can trigger lots of listeners under high activity, so increase threshold | ||
@@ -78,5 +80,5 @@ this.sslOptions = sslOptions; | ||
this.connect(); | ||
}; | ||
} | ||
util.inherits(Client, events.EventEmitter); | ||
util.inherits(Client, EventEmitter); | ||
@@ -765,2 +767,6 @@ Client.prototype.connect = function () { | ||
if (buffer.length >= size) { | ||
if (socket.longpolling) { | ||
socket.waiting = false; | ||
} | ||
var resp = buffer.shallowSlice(0, size); | ||
@@ -779,5 +785,2 @@ var correlationId = resp.readUInt32BE(4); | ||
buffer.consume(size); | ||
if (socket.longpolling) { | ||
socket.waiting = false; | ||
} | ||
} else { | ||
@@ -784,0 +787,0 @@ return; |
@@ -5,3 +5,3 @@ 'use strict'; | ||
var _ = require('lodash'); | ||
var events = require('events'); | ||
var EventEmitter = require('events'); | ||
var logger = require('./logging')('kafka-node:Consumer'); | ||
@@ -30,3 +30,4 @@ var utils = require('./utils'); | ||
var Consumer = function (client, topics, options) { | ||
function Consumer (client, topics, options) { | ||
EventEmitter.call(this); | ||
if (!topics) { | ||
@@ -51,4 +52,4 @@ throw new Error('Must have payloads'); | ||
} | ||
}; | ||
util.inherits(Consumer, events.EventEmitter); | ||
} | ||
util.inherits(Consumer, EventEmitter); | ||
@@ -55,0 +56,0 @@ Consumer.prototype.buildPayloads = function (payloads) { |
@@ -6,3 +6,3 @@ 'use strict'; | ||
const EventEmitter = require('events'); | ||
const highLevelConsumer = require('./highLevelConsumer'); | ||
const HighLevelConsumer = require('./highLevelConsumer'); | ||
const Client = require('./client'); | ||
@@ -51,2 +51,3 @@ const KafkaClient = require('./kafkaClient'); | ||
migrateRolling: true, | ||
onRebalance: null, | ||
protocol: ['roundrobin'] | ||
@@ -56,3 +57,3 @@ }; | ||
function ConsumerGroup (memberOptions, topics) { | ||
EventEmitter.call(this); | ||
EventEmitter.call(this); // Intentionally not calling HighLevelConsumer to avoid constructor logic | ||
const self = this; | ||
@@ -205,3 +206,3 @@ this.options = _.defaults(memberOptions || {}, DEFAULTS); | ||
util.inherits(ConsumerGroup, highLevelConsumer); | ||
util.inherits(ConsumerGroup, HighLevelConsumer); | ||
@@ -403,2 +404,3 @@ ConsumerGroup.prototype.reconnectIfNeeded = function () { | ||
} else { | ||
self.topicPayloads = []; | ||
// no partitions assigned | ||
@@ -447,2 +449,21 @@ callback(null, false); | ||
function (callback) { | ||
if (typeof self.options.onRebalance === 'function') { | ||
self.options.onRebalance(self.generationId != null && self.memberId != null, callback); | ||
return; | ||
} | ||
callback(null); | ||
}, | ||
function (callback) { | ||
if (self.options.autoCommit && self.generationId != null && self.memberId) { | ||
self.commit(true, function (error) { | ||
if (error) { | ||
return callback(error); | ||
} | ||
callback(null); | ||
}); | ||
return; | ||
} | ||
callback(null); | ||
}, | ||
function (callback) { | ||
if (self.client.coordinatorId) { | ||
@@ -449,0 +470,0 @@ return callback(null, null); |
@@ -9,3 +9,3 @@ 'use strict'; | ||
const _ = require('lodash'); | ||
const EventEmitter = require('events').EventEmitter; | ||
const EventEmitter = require('events'); | ||
const NUMER_OF_TIMES_TO_VERIFY = 4; | ||
@@ -12,0 +12,0 @@ const VERIFY_WAIT_TIME_MS = 1500; |
@@ -22,3 +22,3 @@ 'use strict'; | ||
partition: partition, | ||
offset: offset + 1, | ||
offset: offset, | ||
metadata: 'm' | ||
@@ -37,2 +37,3 @@ }); | ||
_.defaultsDeep(options || {}, DEFAULTS); | ||
const self = this; | ||
@@ -43,2 +44,21 @@ this.autoCommit = options.autoCommit; | ||
options.autoCommit = false; | ||
const originalOnRebalance = options.onRebalance; | ||
options.onRebalance = function (isAlreadyMember, callback) { | ||
const autoCommit = _.once(function (err) { | ||
if (err) { | ||
callback(err); | ||
} else { | ||
self.commit(null, true, callback); | ||
} | ||
}); | ||
if (typeof originalOnRebalance === 'function') { | ||
try { | ||
originalOnRebalance(isAlreadyMember, autoCommit); | ||
} catch (e) { | ||
autoCommit(e); | ||
} | ||
} else { | ||
autoCommit(); | ||
} | ||
}; | ||
@@ -79,3 +99,3 @@ this.consumerGroup = new ConsumerGroup(options, topics); | ||
if (message != null && message.offset !== -1) { | ||
_.set(this.commitQueue, [message.topic, message.partition], message.offset); | ||
_.set(this.commitQueue, [message.topic, message.partition], message.offset + 1); | ||
} | ||
@@ -82,0 +102,0 @@ |
@@ -5,3 +5,3 @@ 'use strict'; | ||
var _ = require('lodash'); | ||
var events = require('events'); | ||
var EventEmitter = require('events'); | ||
var uuid = require('uuid'); | ||
@@ -38,2 +38,3 @@ var async = require('async'); | ||
var HighLevelConsumer = function (client, topics, options) { | ||
EventEmitter.call(this); | ||
if (!topics) { | ||
@@ -67,3 +68,3 @@ throw new Error('Must have payloads'); | ||
}; | ||
util.inherits(HighLevelConsumer, events.EventEmitter); | ||
util.inherits(HighLevelConsumer, EventEmitter); | ||
@@ -70,0 +71,0 @@ HighLevelConsumer.prototype.buildPayloads = function (payloads) { |
@@ -5,2 +5,3 @@ 'use strict'; | ||
const logger = require('./logging')('kafka-node:KafkaClient'); | ||
const EventEmitter = require('events'); | ||
const async = require('async'); | ||
@@ -42,6 +43,8 @@ const retry = require('retry'); | ||
}, | ||
maxAsyncRequests: 10 | ||
maxAsyncRequests: 10, | ||
noAckBatchOptions: null | ||
}; | ||
const KafkaClient = function (options) { | ||
EventEmitter.call(this); // Intentionally not calling Client to avoid constructor logic | ||
this.options = _.defaultsDeep(options || {}, DEFAULTS); | ||
@@ -61,3 +64,3 @@ | ||
this.clientId = this.options.clientId || 'kafka-node-client'; | ||
this.noAckBatchOptions = this.noAckBatchOptions; | ||
this.noAckBatchOptions = this.options.noAckBatchOptions; | ||
this.brokers = {}; | ||
@@ -98,6 +101,9 @@ this.longpollingBrokers = {}; | ||
function parseHost (hostString) { | ||
const piece = hostString.split(':'); | ||
const ip = hostString.substring(0, hostString.lastIndexOf(':')); | ||
const port = hostString.substring(hostString.lastIndexOf(':') + 1); | ||
const isIpv6 = ip.match(/\[(.*)\]/); | ||
const host = isIpv6 ? isIpv6[1] : ip; | ||
return { | ||
host: piece[0], | ||
port: piece[1] | ||
host, | ||
port | ||
}; | ||
@@ -104,0 +110,0 @@ } |
@@ -5,5 +5,6 @@ 'use strict'; | ||
var async = require('async'); | ||
var events = require('events'); | ||
var EventEmitter = require('events'); | ||
var Offset = function (client) { | ||
function Offset (client) { | ||
EventEmitter.call(this); | ||
var self = this; | ||
@@ -22,4 +23,4 @@ this.client = client; | ||
}); | ||
}; | ||
util.inherits(Offset, events.EventEmitter); | ||
} | ||
util.inherits(Offset, EventEmitter); | ||
@@ -26,0 +27,0 @@ Offset.prototype.fetch = function (payloads, cb) { |
'use strict'; | ||
var util = require('util'); | ||
var _ = require('lodash'); | ||
const util = require('util'); | ||
const _ = require('lodash'); | ||
var Partitioner = function () {}; | ||
function Partitioner () { } | ||
var DefaultPartitioner = function () {}; | ||
function DefaultPartitioner () { | ||
Partitioner.call(this); | ||
} | ||
util.inherits(DefaultPartitioner, Partitioner); | ||
@@ -19,5 +21,6 @@ | ||
var CyclicPartitioner = function () { | ||
function CyclicPartitioner () { | ||
Partitioner.call(this); | ||
this.c = 0; | ||
}; | ||
} | ||
util.inherits(CyclicPartitioner, Partitioner); | ||
@@ -27,6 +30,8 @@ | ||
if (_.isEmpty(partitions)) return 0; | ||
return partitions[ this.c++ % partitions.length ]; | ||
return partitions[this.c++ % partitions.length]; | ||
}; | ||
var RandomPartitioner = function () {}; | ||
function RandomPartitioner () { | ||
Partitioner.call(this); | ||
} | ||
util.inherits(RandomPartitioner, Partitioner); | ||
@@ -38,3 +43,5 @@ | ||
var KeyedPartitioner = function () {}; | ||
function KeyedPartitioner () { | ||
Partitioner.call(this); | ||
} | ||
util.inherits(KeyedPartitioner, Partitioner); | ||
@@ -45,7 +52,9 @@ | ||
KeyedPartitioner.prototype.hashCode = function (string) { | ||
var hash = 0; | ||
var length = string.length; | ||
let hash = 0; | ||
if (string) { | ||
const length = string.toString().length; | ||
for (var i = 0; i < length; i++) { | ||
hash = ((hash * 31) + string.charCodeAt(i)) & 0x7fffffff; | ||
for (let i = 0; i < length; i++) { | ||
hash = ((hash * 31) + string.charCodeAt(i)) & 0x7fffffff; | ||
} | ||
} | ||
@@ -59,9 +68,10 @@ | ||
var index = this.hashCode(key) % partitions.length; | ||
const index = this.hashCode(key) % partitions.length; | ||
return partitions[index]; | ||
}; | ||
var CustomPartitioner = function (partitioner) { | ||
function CustomPartitioner (partitioner) { | ||
Partitioner.call(this); | ||
this.getPartition = partitioner; | ||
}; | ||
} | ||
util.inherits(CustomPartitioner, Partitioner); | ||
@@ -68,0 +78,0 @@ |
@@ -6,3 +6,3 @@ 'use strict'; | ||
var async = require('async'); | ||
var EventEmiter = require('events').EventEmitter; | ||
var EventEmitter = require('events'); | ||
var logger = require('./logging')('kafka-node:zookeeper'); | ||
@@ -19,2 +19,3 @@ | ||
var Zookeeper = function (connectionString, options) { | ||
EventEmitter.call(this); | ||
this.client = zookeeper.createClient(connectionString, options); | ||
@@ -32,3 +33,3 @@ | ||
util.inherits(Zookeeper, EventEmiter); | ||
util.inherits(Zookeeper, EventEmitter); | ||
@@ -35,0 +36,0 @@ Zookeeper.prototype.unregisterConsumer = function (groupId, consumerId, cb) { |
@@ -17,3 +17,3 @@ { | ||
"bugs": "https://github.com/SOHU-co/kafka-node/issues", | ||
"version": "2.4.1", | ||
"version": "2.5.0", | ||
"main": "kafka.js", | ||
@@ -20,0 +20,0 @@ "license": "MIT", |
@@ -544,3 +544,3 @@ Kafka-node | ||
## HighLevelConsumer | ||
⚠️ ***This consumer has been deprecated in the latest version of Kafka (0.10.1) and is likely to be removed in the future. Please use the ConsumerGroup instead.*** | ||
⚠️ ***This consumer has been deprecated and is likely to be removed in the future. Please use the ConsumerGroup instead.*** | ||
@@ -721,3 +721,6 @@ ### HighLevelConsumer(client, payloads, options) | ||
migrateHLC: false, // for details please see Migration section below | ||
migrateRolling: true | ||
migrateRolling: true, | ||
// Callback to allow consumers with autoCommit false a chance to commit before a rebalance finishes | ||
// isAlreadyMember will be false on the first connection, and true on rebalances triggered after that | ||
onRebalance: (isAlreadyMember, callback) => { callback(); } // or null | ||
}; | ||
@@ -873,2 +876,9 @@ | ||
### commit(message, force, callback) | ||
This method can be used to commit manually when `autoCommit` is set to `false`. | ||
* `message` the original message or an object with `{topic, partition, offset}` | ||
* `force` a commit even if there's a pending commit | ||
* `callback` (*optional*) | ||
### close(callback) | ||
@@ -919,2 +929,5 @@ Closes the `ConsumerGroup`. Calls `callback` when complete. If `autoCommit` is enabled calling close will also commit offsets consumed from the buffer. | ||
### commit(groupId, payloads, cb) | ||
> ⚠️**WARNING**: commits are made to zookeeper and is only compatible with `HighLevelConsumer` and will **NOT** with the new `ConsumerGroup` | ||
* `groupId`: consumer group | ||
@@ -945,2 +958,5 @@ * `payloads`: **Array**,array of `OffsetCommitRequest`, `OffsetCommitRequest` is a JSON object like: | ||
### fetchCommits(groupid, payloads, cb) | ||
> ⚠️**WARNING**: commits are from zookeeper and is only compatible with `HighLevelConsumer` and will **NOT** with the new `ConsumerGroup` | ||
Fetch the last committed offset in a topic of a specific consumer group | ||
@@ -947,0 +963,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
290626
6600
1296