Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

no-kafka

Package Overview
Dependencies
Maintainers
1
Versions
98
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

no-kafka - npm Package Compare versions

Comparing version 2.7.1 to 2.8.0

86

lib/client.js

@@ -19,2 +19,3 @@ 'use strict';

asyncCompression: false,
brokerRedirection: self.brokerNullRemap,
reconnectionDelay: {

@@ -80,14 +81,5 @@ min: 1000,

hostStr = hostStr.trim();
parsed = self.parseHostString(hostStr);
config = self.checkBrokerRedirect(parsed.host, parsed.port);
if (!/^([a-z]+:)?\/\//.test(hostStr)) {
hostStr = 'kafka://' + hostStr;
}
parsed = url.parse(hostStr);
config = {
host: parsed.hostname,
port: parseInt(parsed.port)
};
return config.host && config.port ? new Connection(config) : undefined;

@@ -117,2 +109,60 @@ });

Client.prototype.parseHostString = function (hostString) {
var hostStr = hostString.trim(), parsed;
// Prepend the protocol, if required
if (!/^([a-z]+:)?\/\//.test(hostStr)) {
hostStr = 'kafka://' + hostStr;
}
parsed = url.parse(hostStr);
return {
host: parsed.hostname,
port: parseInt(parsed.port)
};
};
Client.prototype.checkBrokerRedirect = function (host, port) {
var self = this, fullName, fullNameProtocol;
var redirect = self.options.brokerRedirection;
// No remapper
if (!redirect) {
return {
host: host,
port: port
};
}
// Use a function
if (typeof redirect === 'function') {
return redirect(host, port);
}
// Name, without protocol
fullName = host + ':' + port.toString();
if (redirect[fullName]) {
return this.parseHostString(redirect[fullName]);
}
// Name, with protocol
fullNameProtocol = 'kafka://' + host + ':' + port.toString();
if (redirect[fullNameProtocol]) {
return this.parseHostString(redirect[fullNameProtocol]);
}
return {
host: host,
port: port
};
};
Client.prototype.brokerNullRemap = function (host, port) {
// Default function.
return {
host: host,
port: port
};
};
Client.prototype.updateMetadata = function () {

@@ -136,8 +186,10 @@ var self = this, attempt = 1;

_.each(response.broker, function (broker) {
var remapped = self.checkBrokerRedirect(broker.host, broker.port);
var connection = _.find(oldConnections, function (c, i) {
return c.equal(broker.host, broker.port) && delete oldConnections[i];
return c.equal(remapped.host, remapped.port) && delete oldConnections[i];
});
self.brokerConnections[broker.nodeId] = connection || new Connection({
host: broker.host,
port: broker.port
host: remapped.host,
port: remapped.port
});

@@ -490,6 +542,4 @@ });

.then(function (host) {
return new Connection({
host: host.coordinatorHost,
port: host.coordinatorPort
});
var remapped = self.checkBrokerRedirect(host.coordinatorHost, host.coordinatorPort);
return new Connection(remapped);
})

@@ -496,0 +546,0 @@ .catch(function (err) {

@@ -9,3 +9,3 @@ {

},
"version": "2.7.1",
"version": "2.8.0",
"main": "./lib/index.js",

@@ -12,0 +12,0 @@ "keywords": [

@@ -450,6 +450,37 @@ [![Build Status][badge-travis]][travis]

__no-kafka__ will connect to the hosts specified in `connectionString` constructor option unless it is omitted. In this case it will use KAFKA_URL environment variable or fallback to default `kafka://127.0.0.1:9092`. For better availability always specify several initial brokers: `10.0.1.1:9092,10.0.1.2:9092,10.0.1.3:9092`. The `kafka://` prefix is optional.
### Initial Brokers
__no-kafka__ will connect to the hosts specified in `connectionString` constructor option unless it is omitted. In this case it will use KAFKA_URL environment variable or fallback to default `kafka://127.0.0.1:9092`. For better availability always specify several initial brokers: `10.0.1.1:9092,10.0.1.2:9092,10.0.1.3:9092`. The `/` prefix is optional.
### Disconnect / Timeout Handling
All network errors are handled by the library: producer will retry sending failed messages for configured amount of times, simple consumer and group consumer will try to reconnect to failed host, update metadata as needed as so on.
### Remapping Broker Addresses
Sometimes the advertised listener addresses for a Kafka cluster may be incorrect from the client,
such as when a Kafka farm is behind NAT or other network infrastructure. In this scenario it is
possible to pass a `brokerRedirection` option to the `Producer`, `SimpleConsumer` or `GroupConsumer`.
The value of the `brokerDirection` can be either:
- A function returning a tuple of host (string) and port (integer), such as:
brokerRedirection: function (host, port) {
return {
host: host + '.somesuffix.com', // Fully qualify
port: port + 100, // Port NAT
}
}
- A simple map of connection strings to new connection strings, such as:
brokerRedirection: {
'some-host:9092': 'actual-host:9092',
'kafka://another-host:9092': 'another-host:9093',
'third-host:9092': 'kafka://third-host:9000'
}
A common scenario for this kind of remapping is when a Kafka cluster exists within a Docker application, and the
internally advertised names needed for container to container communication do not correspond to the actual external
ports or addresses when connecting externally via other tools.
### Reconnection delay

@@ -456,0 +487,0 @@ In case of network error which prevents further operations __no-kafka__ will try to reconnect to Kafka brokers in a endless loop with the optionally progressive delay which can be configured with `reconnectionDelay` option.

@@ -58,1 +58,81 @@ 'use strict';

});
describe('brokerRedirection', function () {
it('Should execute a function passed for direction', function () {
var latched = false;
var producer = new Kafka.Producer({
brokerRedirection: function (host, port) {
latched = true;
return {
host: host,
port: port
};
}
});
return producer.init()
.then(function () {
latched.should.eql(true);
});
});
it('Should apply function remapping', function () {
var latched = false;
var producer = new Kafka.Producer({
connectionString: 'does-not-exist:9092',
brokerRedirection: function () {
latched = true;
return {
host: 'localhost',
port: 9092
};
}
});
// If the init is succesful, then we remapped the bad
// broker name.
return producer.init()
.then(function () {
latched.should.eql(true);
});
});
it('Should apply lookup remap (host:port)', function () {
var producer = new Kafka.Producer({
connectionString: 'does-not-exist:9092',
brokerRedirection: {
'does-not-exist:9092': 'localhost:9092'
}
});
// If the init is succesful, then we remapped the bad
// broker name.
return producer.init();
});
it('Should apply lookup remap (kafka://host:port)', function () {
var producer = new Kafka.Producer({
connectionString: 'does-not-exist:9092',
brokerRedirection: {
'kafka://does-not-exist:9092': 'localhost:9092'
}
});
// If the init is succesful, then we remapped the bad
// broker name.
return producer.init();
});
it('Should apply lookup remap prefixed with Kafka', function () {
var producer = new Kafka.Producer({
connectionString: 'does-not-exist:9092',
brokerRedirection: {
'kafka://does-not-exist:9092': 'kafka://localhost:9092'
}
});
// If the init is succesful, then we remapped the bad
// broker name.
return producer.init();
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc