Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

kafka-node

Package Overview
Dependencies
Maintainers
3
Versions
113
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafka-node - npm Package Compare versions

Comparing version 1.0.3 to 1.0.4

4

CHANGELOG.md
# kafka-node CHANGELOG
## 2016-11-01, Version 1.0.4
- Fix issue where an exception is thrown in `client.brokerForLeader` when connection with broker is lost in ConsumerGroup it should retry instead [#498](https://github.com/SOHU-Co/kafka-node/pull/498)
- Fix issue where invalid characters could be used in createTopics call [#495](https://github.com/SOHU-Co/kafka-node/pull/495) [#492](https://github.com/SOHU-Co/kafka-node/pull/492)
## 2016-10-24, Version 1.0.3

@@ -4,0 +8,0 @@ - Fix issue in [Consumer Group](https://github.com/SOHU-Co/kafka-node#consumergroup) where using the migrator with no previous HLC offsets will set initial offsets to 0 instead of the offsets provided in "fromOfset" feature [#493](https://github.com/SOHU-Co/kafka-node/pull/493)

@@ -23,2 +23,3 @@ 'use strict';

var validateConfig = require('./utils').validateConfig;
var validateKafkaTopics = require('./utils').validateTopicNames;

@@ -103,2 +104,3 @@ /**

self.brokerMetadata = brokerMetadata;
debug('brokersChanged', brokerMetadata);
self.setupBrokerProfiles(brokerMetadata);

@@ -360,4 +362,9 @@ self.refreshBrokers();

}
try {
validateKafkaTopics(topics);
} catch (e) {
if (isAsync) return cb(e);
throw e;
}
var self = this;
// first, load metadata to create topics

@@ -618,2 +625,7 @@ this.loadMetadataForTopics(topics, function (err, resp) {

var broker = _.find(this.brokerProfiles, {id: leader});
if (!broker) {
return;
}
addr = broker.host + ':' + broker.port;

@@ -620,0 +632,0 @@

23

lib/consumerGroupRecovery.js
'use strict';
var retry = require('retry');
var debug = require('debug')('kafka-node:ConsumerGroupRecovery');
var assert = require('assert');
const retry = require('retry');
const debug = require('debug')('kafka-node:ConsumerGroupRecovery');
const assert = require('assert');
var GroupCoordinatorNotAvailable = require('./errors/GroupCoordinatorNotAvailableError');
var NotCoordinatorForGroup = require('./errors/NotCoordinatorForGroupError');
var IllegalGeneration = require('./errors/IllegalGenerationError');
var GroupLoadInProgress = require('./errors/GroupLoadInProgressError');
var UnknownMemberId = require('./errors/UnknownMemberIdError');
var RebalanceInProgress = require('./errors/RebalanceInProgressError');
const GroupCoordinatorNotAvailable = require('./errors/GroupCoordinatorNotAvailableError');
const NotCoordinatorForGroup = require('./errors/NotCoordinatorForGroupError');
const IllegalGeneration = require('./errors/IllegalGenerationError');
const GroupLoadInProgress = require('./errors/GroupLoadInProgressError');
const UnknownMemberId = require('./errors/UnknownMemberIdError');
const RebalanceInProgress = require('./errors/RebalanceInProgressError');
const BrokerNotAvailableError = require('./errors').BrokerNotAvailableError;

@@ -19,5 +20,5 @@ var recoverableErrors = [

{
errors: [NotCoordinatorForGroup],
errors: [NotCoordinatorForGroup, BrokerNotAvailableError],
handler: function () {
this.client.coordinatorId = null;
delete this.client.coordinatorId;
}

@@ -24,0 +25,0 @@ },

var assert = require('assert');
var InvalidConfigError = require('./errors/InvalidConfigError');
var legalChars = new RegExp('^[a-zA-Z0-9._-]*$');
const allowedTopicLength = 249;

@@ -11,2 +12,22 @@ function validateConfig (property, value) {

function validateTopicNames (topics) {
// Rewriting same validations done by Apache Kafka team for topics
// iterating over topics
topics.some(function (topic) {
if (topic.length <= 0) {
throw new InvalidConfigError('topic name is illegal, cannot be empty');
}
if (topic === '.' || topic === '..') {
throw new InvalidConfigError('topic name cannot be . or ..');
}
if (topic.length > allowedTopicLength) {
throw new InvalidConfigError(`topic name is illegal, cannot be longer than ${allowedTopicLength} characters`);
}
if (!legalChars.test(topic)) {
throw new InvalidConfigError(`topic name ${topic} is illegal, contains a character other than ASCII alphanumerics .,_ and -`);
}
});
return true;
}
function validateTopics (topics) {

@@ -83,3 +104,4 @@ if (topics.some(function (topic) {

groupPartitionsByTopic: groupPartitionsByTopic,
createTopicPartitionList: createTopicPartitionList
createTopicPartitionList: createTopicPartitionList,
validateTopicNames: validateTopicNames
};

@@ -12,3 +12,3 @@ {

"bugs": "https://github.com/SOHU-co/kafka-node/issues",
"version": "1.0.3",
"version": "1.0.4",
"main": "kafka.js",

@@ -15,0 +15,0 @@ "license": "MIT",

@@ -361,2 +361,12 @@ var host = process.env['KAFKA_TEST_HOST'] || '';

describe('#brokerForLeader', function () {
it('should not throw exception when leader does not exist', function () {
client.brokerProfiles = Object.create(null);
should.doesNotThrow(function () {
should(client.brokerForLeader(1001)).be.undefined;
});
});
});
describe('#reconnectBroker', function () {

@@ -363,0 +373,0 @@ var emptyFn = function () {};

'use strict';
var should = require('should');
var _ = require('lodash');
var sinon = require('sinon');
var ConsumerGroupRecovery = require('../lib/consumerGroupRecovery');
var GroupCoordinatorNotAvailable = require('../lib/errors/GroupCoordinatorNotAvailableError');
var GroupLoadInProgress = require('../lib/errors/GroupLoadInProgressError');
var EventEmitter = require('events').EventEmitter;
const should = require('should');
const _ = require('lodash');
const sinon = require('sinon');
const ConsumerGroupRecovery = require('../lib/consumerGroupRecovery');
const GroupCoordinatorNotAvailable = require('../lib/errors/GroupCoordinatorNotAvailableError');
const GroupLoadInProgress = require('../lib/errors/GroupLoadInProgressError');
const BrokerNotAvailableError = require('../lib/errors').BrokerNotAvailableError;
const EventEmitter = require('events');
describe('ConsumerGroupRecovery', function () {
var consumerGroupRecovery, fakeClient;
var consumerGroupRecovery, fakeConsumerGroup;
beforeEach(function () {
fakeClient = new EventEmitter();
Object.assign(fakeClient, {
fakeConsumerGroup = new EventEmitter();
fakeConsumerGroup.client = new EventEmitter();
fakeConsumerGroup.scheduleReconnect = () => { throw new Error('should be stubbed!'); };
Object.assign(fakeConsumerGroup, {
stopHeartbeats: sinon.stub(),
options: {
retries: 10,
retryFactor: 1.8
retryFactor: 1.8,
retryMinTimeout: 1000
}
});
consumerGroupRecovery = new ConsumerGroupRecovery(fakeClient);
consumerGroupRecovery = new ConsumerGroupRecovery(fakeConsumerGroup);
});

@@ -30,3 +34,3 @@

fakeClient.once('error', function (error) {
fakeConsumerGroup.once('error', function (error) {
error.should.be.eql(testError);

@@ -37,6 +41,28 @@ done();

consumerGroupRecovery.tryToRecoverFrom(testError, 'test');
sinon.assert.calledOnce(fakeClient.stopHeartbeats);
fakeClient.ready.should.be.false;
sinon.assert.calledOnce(fakeConsumerGroup.stopHeartbeats);
fakeConsumerGroup.ready.should.be.false;
consumerGroupRecovery.lastError.should.be.eql(testError);
});
it('should try to recover from a BrokerNotAvailableError', function () {
const brokerNotAvailableError = new BrokerNotAvailableError('test error');
fakeConsumerGroup.client.coordinatorId = 1234;
fakeConsumerGroup.once('error', function (error) {
error.should.not.be.eql(brokerNotAvailableError);
});
sinon.stub(fakeConsumerGroup, 'scheduleReconnect');
consumerGroupRecovery.tryToRecoverFrom(brokerNotAvailableError, 'test');
sinon.assert.calledOnce(fakeConsumerGroup.stopHeartbeats);
fakeConsumerGroup.ready.should.be.false;
consumerGroupRecovery.lastError.should.be.eql(brokerNotAvailableError);
sinon.assert.calledOnce(fakeConsumerGroup.scheduleReconnect);
should(fakeConsumerGroup.client.coordinatorId).be.undefined;
});
});

@@ -43,0 +69,0 @@

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc