kafka-node
Advanced tools
Comparing version 1.0.4 to 1.0.5
# kafka-node CHANGELOG | ||
## 2016-11-03, Version 1.0.5 | ||
- Update doc added how to list all topics [#503](https://github.com/SOHU-Co/kafka-node/pull/503) | ||
- Fix uncaught exceptions that can occur when using ConsumerGroup [#505](https://github.com/SOHU-Co/kafka-node/pull/505) | ||
## 2016-11-01, Version 1.0.4 | ||
@@ -4,0 +8,0 @@ - Fix issue where an exception is thrown in `client.brokerForLeader` when connection with broker is lost in ConsumerGroup it should retry instead [#498](https://github.com/SOHU-Co/kafka-node/pull/498) |
@@ -285,3 +285,3 @@ 'use strict'; | ||
if (!broker || !broker.socket || broker.socket.error) { | ||
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) { | ||
return cb(new errors.BrokerNotAvailableError('Broker not available')); | ||
@@ -346,3 +346,3 @@ } | ||
if (!broker || !broker.socket || broker.socket.error) { | ||
if (!broker || !broker.socket || broker.socket.error || broker.socket.destroyed) { | ||
return cb(new errors.BrokerNotAvailableError('Broker not available')); | ||
@@ -539,3 +539,3 @@ } | ||
var broker = this.brokerForLeader(leader, longpolling); | ||
if (!broker || !broker.socket || broker.socket.error || broker.socket.closing) { | ||
if (!broker || !broker.socket || broker.socket.error || broker.socket.closing || broker.socket.destroyed) { | ||
return cb(new errors.BrokerNotAvailableError('Could not find the leader'), payloads[leader]); | ||
@@ -619,3 +619,2 @@ } | ||
} else { | ||
this.emit('error', new errors.BrokerNotAvailableError('Could not find a broker')); | ||
return; | ||
@@ -622,0 +621,0 @@ } |
@@ -5,2 +5,3 @@ 'use strict'; | ||
const util = require('util'); | ||
const EventEmitter = require('events'); | ||
const highLevelConsumer = require('./highLevelConsumer'); | ||
@@ -50,2 +51,3 @@ const Client = require('./client'); | ||
function ConsumerGroup (memberOptions, topics) { | ||
EventEmitter.call(this); | ||
const self = this; | ||
@@ -294,2 +296,4 @@ this.options = _.defaults((memberOptions || {}), DEFAULTS); | ||
this.offset = new Offset(this.client); | ||
// we can ignore this since we are already forwarding error event emitted from client | ||
this.offset.on('error', _.noop); | ||
return this.offset; | ||
@@ -296,0 +300,0 @@ }; |
@@ -12,3 +12,3 @@ { | ||
"bugs": "https://github.com/SOHU-co/kafka-node/issues", | ||
"version": "1.0.4", | ||
"version": "1.0.5", | ||
"main": "kafka.js", | ||
@@ -15,0 +15,0 @@ "license": "MIT", |
@@ -31,2 +31,3 @@ Kafka-node | ||
- [How do I debug an issue?](#how-do-i-debug-an-issue) | ||
- [How do I get a list of all topics?](#how-do-i-get-a-list-of-all-topics) | ||
- [For a new consumer how do I start consuming from the latest message in a partition?](#for-a-new-consumer-how-do-i-start-consuming-from-the-latest-message-in-a-partition) | ||
@@ -843,5 +844,18 @@ - [FailedToRebalanceConsumerError: Exception: NODE_EXISTS[-110]](#failedtorebalanceconsumererror-exception-node_exists-110) | ||
## How do I get a list of all topics? | ||
Call `client.loadMetadataForTopics` with a blank topic array to get the entire list of available topics (and available brokers). | ||
```js | ||
client.loadMetadataForTopics([], function (error, results) { | ||
console.log('%j', _.get(results, '1.metadata')); | ||
}); | ||
``` | ||
## For a new consumer how do I start consuming from the latest message in a partition? | ||
If you are using the new `ConsumerGroup` simply set `'latest'` to `fromOffset` option. | ||
Otherwise: | ||
1. Call `offset.fetchLatestOffsets` to get fetch the latest offset | ||
@@ -848,0 +862,0 @@ 2. Consume from returned offset |
@@ -227,2 +227,6 @@ 'use strict'; | ||
afterEach(function () { | ||
client.removeAllListeners(); | ||
}); | ||
describe('events', function () { | ||
@@ -229,0 +233,0 @@ it('should emit message when get new message', function (done) { |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
350823
8459
940