Comparing version 2.7.1 to 2.8.0
@@ -19,2 +19,3 @@ 'use strict'; | ||
asyncCompression: false, | ||
brokerRedirection: self.brokerNullRemap, | ||
reconnectionDelay: { | ||
@@ -80,14 +81,5 @@ min: 1000, | ||
hostStr = hostStr.trim(); | ||
parsed = self.parseHostString(hostStr); | ||
config = self.checkBrokerRedirect(parsed.host, parsed.port); | ||
if (!/^([a-z]+:)?\/\//.test(hostStr)) { | ||
hostStr = 'kafka://' + hostStr; | ||
} | ||
parsed = url.parse(hostStr); | ||
config = { | ||
host: parsed.hostname, | ||
port: parseInt(parsed.port) | ||
}; | ||
return config.host && config.port ? new Connection(config) : undefined; | ||
@@ -117,2 +109,60 @@ }); | ||
Client.prototype.parseHostString = function (hostString) { | ||
var hostStr = hostString.trim(), parsed; | ||
// Prepend the protocol, if required | ||
if (!/^([a-z]+:)?\/\//.test(hostStr)) { | ||
hostStr = 'kafka://' + hostStr; | ||
} | ||
parsed = url.parse(hostStr); | ||
return { | ||
host: parsed.hostname, | ||
port: parseInt(parsed.port) | ||
}; | ||
}; | ||
Client.prototype.checkBrokerRedirect = function (host, port) { | ||
var self = this, fullName, fullNameProtocol; | ||
var redirect = self.options.brokerRedirection; | ||
// No remapper | ||
if (!redirect) { | ||
return { | ||
host: host, | ||
port: port | ||
}; | ||
} | ||
// Use a function | ||
if (typeof redirect === 'function') { | ||
return redirect(host, port); | ||
} | ||
// Name, without protocol | ||
fullName = host + ':' + port.toString(); | ||
if (redirect[fullName]) { | ||
return this.parseHostString(redirect[fullName]); | ||
} | ||
// Name, with protocol | ||
fullNameProtocol = 'kafka://' + host + ':' + port.toString(); | ||
if (redirect[fullNameProtocol]) { | ||
return this.parseHostString(redirect[fullNameProtocol]); | ||
} | ||
return { | ||
host: host, | ||
port: port | ||
}; | ||
}; | ||
Client.prototype.brokerNullRemap = function (host, port) { | ||
// Default function. | ||
return { | ||
host: host, | ||
port: port | ||
}; | ||
}; | ||
Client.prototype.updateMetadata = function () { | ||
@@ -136,8 +186,10 @@ var self = this, attempt = 1; | ||
_.each(response.broker, function (broker) { | ||
var remapped = self.checkBrokerRedirect(broker.host, broker.port); | ||
var connection = _.find(oldConnections, function (c, i) { | ||
return c.equal(broker.host, broker.port) && delete oldConnections[i]; | ||
return c.equal(remapped.host, remapped.port) && delete oldConnections[i]; | ||
}); | ||
self.brokerConnections[broker.nodeId] = connection || new Connection({ | ||
host: broker.host, | ||
port: broker.port | ||
host: remapped.host, | ||
port: remapped.port | ||
}); | ||
@@ -490,6 +542,4 @@ }); | ||
.then(function (host) { | ||
return new Connection({ | ||
host: host.coordinatorHost, | ||
port: host.coordinatorPort | ||
}); | ||
var remapped = self.checkBrokerRedirect(host.coordinatorHost, host.coordinatorPort); | ||
return new Connection(remapped); | ||
}) | ||
@@ -496,0 +546,0 @@ .catch(function (err) { |
@@ -9,3 +9,3 @@ { | ||
}, | ||
"version": "2.7.1", | ||
"version": "2.8.0", | ||
"main": "./lib/index.js", | ||
@@ -12,0 +12,0 @@ "keywords": [ |
@@ -450,6 +450,37 @@ [![Build Status][badge-travis]][travis] | ||
__no-kafka__ will connect to the hosts specified in `connectionString` constructor option unless it is omitted. In this case it will use KAFKA_URL environment variable or fallback to default `kafka://127.0.0.1:9092`. For better availability always specify several initial brokers: `10.0.1.1:9092,10.0.1.2:9092,10.0.1.3:9092`. The `kafka://` prefix is optional. | ||
### Initial Brokers | ||
__no-kafka__ will connect to the hosts specified in `connectionString` constructor option unless it is omitted. In this case it will use KAFKA_URL environment variable or fallback to default `kafka://127.0.0.1:9092`. For better availability always specify several initial brokers: `10.0.1.1:9092,10.0.1.2:9092,10.0.1.3:9092`. The `/` prefix is optional. | ||
### Disconnect / Timeout Handling | ||
All network errors are handled by the library: producer will retry sending failed messages for configured amount of times, simple consumer and group consumer will try to reconnect to failed host, update metadata as needed as so on. | ||
### Remapping Broker Addresses | ||
Sometimes the advertised listener addresses for a Kafka cluster may be incorrect from the client, | ||
such as when a Kafka farm is behind NAT or other network infrastructure. In this scenario it is | ||
possible to pass a `brokerRedirection` option to the `Producer`, `SimpleConsumer` or `GroupConsumer`. | ||
The value of the `brokerDirection` can be either: | ||
- A function returning a tuple of host (string) and port (integer), such as: | ||
brokerRedirection: function (host, port) { | ||
return { | ||
host: host + '.somesuffix.com', // Fully qualify | ||
port: port + 100, // Port NAT | ||
} | ||
} | ||
- A simple map of connection strings to new connection strings, such as: | ||
brokerRedirection: { | ||
'some-host:9092': 'actual-host:9092', | ||
'kafka://another-host:9092': 'another-host:9093', | ||
'third-host:9092': 'kafka://third-host:9000' | ||
} | ||
A common scenario for this kind of remapping is when a Kafka cluster exists within a Docker application, and the | ||
internally advertised names needed for container to container communication do not correspond to the actual external | ||
ports or addresses when connecting externally via other tools. | ||
### Reconnection delay | ||
@@ -456,0 +487,0 @@ In case of network error which prevents further operations __no-kafka__ will try to reconnect to Kafka brokers in a endless loop with the optionally progressive delay which can be configured with `reconnectionDelay` option. |
@@ -58,1 +58,81 @@ 'use strict'; | ||
}); | ||
describe('brokerRedirection', function () { | ||
it('Should execute a function passed for direction', function () { | ||
var latched = false; | ||
var producer = new Kafka.Producer({ | ||
brokerRedirection: function (host, port) { | ||
latched = true; | ||
return { | ||
host: host, | ||
port: port | ||
}; | ||
} | ||
}); | ||
return producer.init() | ||
.then(function () { | ||
latched.should.eql(true); | ||
}); | ||
}); | ||
it('Should apply function remapping', function () { | ||
var latched = false; | ||
var producer = new Kafka.Producer({ | ||
connectionString: 'does-not-exist:9092', | ||
brokerRedirection: function () { | ||
latched = true; | ||
return { | ||
host: 'localhost', | ||
port: 9092 | ||
}; | ||
} | ||
}); | ||
// If the init is succesful, then we remapped the bad | ||
// broker name. | ||
return producer.init() | ||
.then(function () { | ||
latched.should.eql(true); | ||
}); | ||
}); | ||
it('Should apply lookup remap (host:port)', function () { | ||
var producer = new Kafka.Producer({ | ||
connectionString: 'does-not-exist:9092', | ||
brokerRedirection: { | ||
'does-not-exist:9092': 'localhost:9092' | ||
} | ||
}); | ||
// If the init is succesful, then we remapped the bad | ||
// broker name. | ||
return producer.init(); | ||
}); | ||
it('Should apply lookup remap (kafka://host:port)', function () { | ||
var producer = new Kafka.Producer({ | ||
connectionString: 'does-not-exist:9092', | ||
brokerRedirection: { | ||
'kafka://does-not-exist:9092': 'localhost:9092' | ||
} | ||
}); | ||
// If the init is succesful, then we remapped the bad | ||
// broker name. | ||
return producer.init(); | ||
}); | ||
it('Should apply lookup remap prefixed with Kafka', function () { | ||
var producer = new Kafka.Producer({ | ||
connectionString: 'does-not-exist:9092', | ||
brokerRedirection: { | ||
'kafka://does-not-exist:9092': 'kafka://localhost:9092' | ||
} | ||
}); | ||
// If the init is succesful, then we remapped the bad | ||
// broker name. | ||
return producer.init(); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
248896
4485
568