Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

no-kafka

Package Overview
Dependencies
Maintainers
1
Versions
98
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

no-kafka - npm Package Compare versions

Comparing version 0.2.1 to 0.2.2

4

group_consumer.js

@@ -36,3 +36,3 @@ "use strict";

return consumer.init(strategies[0])
.delay(3000)
/*.delay(3000)
.then(function () {

@@ -43,3 +43,3 @@ return consumer.end();

console.log('ended');
})
})*/
.catch(function (err) {

@@ -46,0 +46,0 @@ Kafka.error(err);

@@ -9,2 +9,3 @@ "use strict";

var Kafka = require('./index');
var errors = require('./errors');

@@ -80,8 +81,7 @@ function BaseConsumer (options){

} else if (/UnknownTopicOrPartition|NotLeaderForPartition|LeaderNotAvailable/.test(p.error.code)) {
Kafka.warn('Updating metadata because of', p.error.code, 'error for', p.topic + ':' + p.partition);
return self.client.updateMetadata().then(function () {
return self.subscribe(p.topic, p.partition, { offset: s.offset }).catch(function (err) {
Kafka.warn('Failed to re-subscribe to', p.topic + ':' + p.partition, 'because of', err.code, 'error');
});
});
// Kafka.log('Received', p.error.code, 'error for', p.topic, p.partition);
return self._updateSubscription(p.topic, p.partition);
} else if (p.error instanceof errors.NoKafkaConnectionError ) {
// Kafka.log('Received connection error for', p.topic, p.partition);
return self._updateSubscription(p.topic, p.partition);
} else {

@@ -115,2 +115,16 @@ Kafka.error(p.topic + ':' + p.partition, p.error);

BaseConsumer.prototype._updateSubscription = function(topic, partition) {
var self = this;
return self.client.updateMetadata().then(function () {
var s = self.subscriptions[topic + ':' + partition];
return self.subscribe(topic, partition, s.offset ? { offset: s.offset } : s.options)
.catch({code: 'LeaderNotAvailable'}, function () {}) // ignore and try again on next fetch
.catch(function (err) {
Kafka.error('Failed to re-subscribe to', topic + ':' + partition, err);
});
});
};
BaseConsumer.prototype._offset = function(leader, topic, partition, time) {

@@ -170,2 +184,3 @@ var self = this, request = {};

function _subscribe (leader, _offset) {
// Kafka.log('Subscribed to', topic, partition, 'leader:', leader);
self.subscriptions[topic + ':' + partition] = {

@@ -190,2 +205,11 @@ topic: topic,

})
.catch({code: 'LeaderNotAvailable'}, function () {
// these subscriptions will be retried on each _fetch()
self.subscriptions[topic + ':' + partition] = {
topic: topic,
partition: partition,
options: options,
leader: -1
};
})
.tap(function () {

@@ -192,0 +216,0 @@ process.nextTick(function () {

@@ -195,4 +195,23 @@ "use strict";

function _fakeErrorResponse (topics, error) {
return _.map(topics, function (t) {
return {
topicName: t.topicName,
partitions: _.map(t.partitions, function (p) {
return {
partition: p.partition,
error: error
};
})
};
});
}
return self._metadata().then(function () {
return Promise.all(_.map(requests, function (topics, leader) {
// fake LeaderNotAvailable for all topics with no leader
if(leader === -1 || !self.brokerConnections[leader]){
return _fakeErrorResponse(topics, errors.byName('LeaderNotAvailable'));
}
var buffer = self.encoder.reset().FetchRequest({

@@ -208,2 +227,5 @@ correlationId: self.correlationId++,

return protocol.read(responseBuffer).FetchResponse().result.topics;
})
.catch(errors.NoKafkaConnectionError, function (err) {
return _fakeErrorResponse(topics, err);
});

@@ -210,0 +232,0 @@ }))

"use strict";
var net = require('net');
var Promise = require('bluebird');
var net = require('net');
var Promise = require('bluebird');
var NoKafkaConnectionError = require('./errors').NoKafkaConnectionError;

@@ -40,3 +41,3 @@ function Connection(options) {

setTimeout(function() {
reject(new Error('Connection timeout to ' + self.host + ':' + self.port));
reject(new NoKafkaConnectionError(self, 'Connection timeout to ' + self.host + ':' + self.port));
}, timeout || 3000);

@@ -51,3 +52,3 @@ }),

self.socket.on('end', function () {
self._disconnect(new Error('Kafka server [' + self.host + ':' + self.port + '] has closed connection'));
self._disconnect(new NoKafkaConnectionError(self, 'Kafka server [' + self.host + ':' + self.port + '] has closed connection'));
});

@@ -132,3 +133,3 @@ self.socket.on('error', self._disconnect.bind(self));

} else {
return Promise.reject(new Error('Not connected'));
return Promise.reject(new NoKafkaConnectionError(self, 'Not connected'));
}

@@ -135,0 +136,0 @@ }

@@ -89,1 +89,33 @@ "use strict";

};
exports.KafkaError = KafkaError;
var NoKafkaConnectionError = function (connection, message) {
Error.call(this);
// Error.captureStackTrace(this, this.constructor);
this.name = 'NoKafkaConnectionError';
this.host = connection.host;
this.port = connection.port;
this.message = message || 'Error';
};
NoKafkaConnectionError.prototype = Object.create(Error.prototype);
NoKafkaConnectionError.prototype.constructor = NoKafkaConnectionError;
NoKafkaConnectionError.prototype.toJSON = function () {
return {
name: this.name,
host: this.host,
port: this.port,
message: this.message
};
};
NoKafkaConnectionError.prototype.toString = function () {
return this.name + ' (' + this.host + ':' + this.port + '): ' + this.message;
};
exports.NoKafkaConnectionError = NoKafkaConnectionError;

@@ -9,3 +9,3 @@ {

},
"version": "0.2.1",
"version": "0.2.2",
"main": "./lib/index.js",

@@ -12,0 +12,0 @@ "keywords": ["kafka"],

@@ -7,3 +7,6 @@ "use strict";

var kafka = require('./lib/index');
var producer = new kafka.Producer({ requiredAcks: 1});
var producer = new kafka.Producer({
requiredAcks: 1,
connectionString: '127.0.0.1:9092,127.0.0.1:9093',
});

@@ -10,0 +13,0 @@ return producer.init()

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc