Comparing version 0.2.1 to 0.2.2
@@ -36,3 +36,3 @@ "use strict"; | ||
return consumer.init(strategies[0]) | ||
.delay(3000) | ||
/*.delay(3000) | ||
.then(function () { | ||
@@ -43,3 +43,3 @@ return consumer.end(); | ||
console.log('ended'); | ||
}) | ||
})*/ | ||
.catch(function (err) { | ||
@@ -46,0 +46,0 @@ Kafka.error(err); |
@@ -9,2 +9,3 @@ "use strict"; | ||
var Kafka = require('./index'); | ||
var errors = require('./errors'); | ||
@@ -80,8 +81,7 @@ function BaseConsumer (options){ | ||
} else if (/UnknownTopicOrPartition|NotLeaderForPartition|LeaderNotAvailable/.test(p.error.code)) { | ||
Kafka.warn('Updating metadata because of', p.error.code, 'error for', p.topic + ':' + p.partition); | ||
return self.client.updateMetadata().then(function () { | ||
return self.subscribe(p.topic, p.partition, { offset: s.offset }).catch(function (err) { | ||
Kafka.warn('Failed to re-subscribe to', p.topic + ':' + p.partition, 'because of', err.code, 'error'); | ||
}); | ||
}); | ||
// Kafka.log('Received', p.error.code, 'error for', p.topic, p.partition); | ||
return self._updateSubscription(p.topic, p.partition); | ||
} else if (p.error instanceof errors.NoKafkaConnectionError ) { | ||
// Kafka.log('Received connection error for', p.topic, p.partition); | ||
return self._updateSubscription(p.topic, p.partition); | ||
} else { | ||
@@ -115,2 +115,16 @@ Kafka.error(p.topic + ':' + p.partition, p.error); | ||
BaseConsumer.prototype._updateSubscription = function(topic, partition) { | ||
var self = this; | ||
return self.client.updateMetadata().then(function () { | ||
var s = self.subscriptions[topic + ':' + partition]; | ||
return self.subscribe(topic, partition, s.offset ? { offset: s.offset } : s.options) | ||
.catch({code: 'LeaderNotAvailable'}, function () {}) // ignore and try again on next fetch | ||
.catch(function (err) { | ||
Kafka.error('Failed to re-subscribe to', topic + ':' + partition, err); | ||
}); | ||
}); | ||
}; | ||
BaseConsumer.prototype._offset = function(leader, topic, partition, time) { | ||
@@ -170,2 +184,3 @@ var self = this, request = {}; | ||
function _subscribe (leader, _offset) { | ||
// Kafka.log('Subscribed to', topic, partition, 'leader:', leader); | ||
self.subscriptions[topic + ':' + partition] = { | ||
@@ -190,2 +205,11 @@ topic: topic, | ||
}) | ||
.catch({code: 'LeaderNotAvailable'}, function () { | ||
// these subscriptions will be retried on each _fetch() | ||
self.subscriptions[topic + ':' + partition] = { | ||
topic: topic, | ||
partition: partition, | ||
options: options, | ||
leader: -1 | ||
}; | ||
}) | ||
.tap(function () { | ||
@@ -192,0 +216,0 @@ process.nextTick(function () { |
@@ -195,4 +195,23 @@ "use strict"; | ||
function _fakeErrorResponse (topics, error) { | ||
return _.map(topics, function (t) { | ||
return { | ||
topicName: t.topicName, | ||
partitions: _.map(t.partitions, function (p) { | ||
return { | ||
partition: p.partition, | ||
error: error | ||
}; | ||
}) | ||
}; | ||
}); | ||
} | ||
return self._metadata().then(function () { | ||
return Promise.all(_.map(requests, function (topics, leader) { | ||
// fake LeaderNotAvailable for all topics with no leader | ||
if(leader === -1 || !self.brokerConnections[leader]){ | ||
return _fakeErrorResponse(topics, errors.byName('LeaderNotAvailable')); | ||
} | ||
var buffer = self.encoder.reset().FetchRequest({ | ||
@@ -208,2 +227,5 @@ correlationId: self.correlationId++, | ||
return protocol.read(responseBuffer).FetchResponse().result.topics; | ||
}) | ||
.catch(errors.NoKafkaConnectionError, function (err) { | ||
return _fakeErrorResponse(topics, err); | ||
}); | ||
@@ -210,0 +232,0 @@ })) |
"use strict"; | ||
var net = require('net'); | ||
var Promise = require('bluebird'); | ||
var net = require('net'); | ||
var Promise = require('bluebird'); | ||
var NoKafkaConnectionError = require('./errors').NoKafkaConnectionError; | ||
@@ -40,3 +41,3 @@ function Connection(options) { | ||
setTimeout(function() { | ||
reject(new Error('Connection timeout to ' + self.host + ':' + self.port)); | ||
reject(new NoKafkaConnectionError(self, 'Connection timeout to ' + self.host + ':' + self.port)); | ||
}, timeout || 3000); | ||
@@ -51,3 +52,3 @@ }), | ||
self.socket.on('end', function () { | ||
self._disconnect(new Error('Kafka server [' + self.host + ':' + self.port + '] has closed connection')); | ||
self._disconnect(new NoKafkaConnectionError(self, 'Kafka server [' + self.host + ':' + self.port + '] has closed connection')); | ||
}); | ||
@@ -132,3 +133,3 @@ self.socket.on('error', self._disconnect.bind(self)); | ||
} else { | ||
return Promise.reject(new Error('Not connected')); | ||
return Promise.reject(new NoKafkaConnectionError(self, 'Not connected')); | ||
} | ||
@@ -135,0 +136,0 @@ } |
@@ -89,1 +89,33 @@ "use strict"; | ||
}; | ||
exports.KafkaError = KafkaError; | ||
var NoKafkaConnectionError = function (connection, message) { | ||
Error.call(this); | ||
// Error.captureStackTrace(this, this.constructor); | ||
this.name = 'NoKafkaConnectionError'; | ||
this.host = connection.host; | ||
this.port = connection.port; | ||
this.message = message || 'Error'; | ||
}; | ||
NoKafkaConnectionError.prototype = Object.create(Error.prototype); | ||
NoKafkaConnectionError.prototype.constructor = NoKafkaConnectionError; | ||
NoKafkaConnectionError.prototype.toJSON = function () { | ||
return { | ||
name: this.name, | ||
host: this.host, | ||
port: this.port, | ||
message: this.message | ||
}; | ||
}; | ||
NoKafkaConnectionError.prototype.toString = function () { | ||
return this.name + ' (' + this.host + ':' + this.port + '): ' + this.message; | ||
}; | ||
exports.NoKafkaConnectionError = NoKafkaConnectionError; | ||
@@ -9,3 +9,3 @@ { | ||
}, | ||
"version": "0.2.1", | ||
"version": "0.2.2", | ||
"main": "./lib/index.js", | ||
@@ -12,0 +12,0 @@ "keywords": ["kafka"], |
@@ -7,3 +7,6 @@ "use strict"; | ||
var kafka = require('./lib/index'); | ||
var producer = new kafka.Producer({ requiredAcks: 1}); | ||
var producer = new kafka.Producer({ | ||
requiredAcks: 1, | ||
connectionString: '127.0.0.1:9092,127.0.0.1:9093', | ||
}); | ||
@@ -10,0 +13,0 @@ return producer.init() |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
162394
2980