kafka-node
Advanced tools
Comparing version 1.0.3 to 1.0.4
# kafka-node CHANGELOG | ||
## 2016-11-01, Version 1.0.4 | ||
- Fix issue where an exception is thrown in `client.brokerForLeader` when connection with broker is lost in ConsumerGroup it should retry instead [#498](https://github.com/SOHU-Co/kafka-node/pull/498) | ||
- Fix issue where invalid characters could be used in createTopics call [#495](https://github.com/SOHU-Co/kafka-node/pull/495) [#492](https://github.com/SOHU-Co/kafka-node/pull/492) | ||
## 2016-10-24, Version 1.0.3 | ||
@@ -4,0 +8,0 @@ - Fix issue in [Consumer Group](https://github.com/SOHU-Co/kafka-node#consumergroup) where using the migrator with no previous HLC offsets will set initial offsets to 0 instead of the offsets provided in "fromOfset" feature [#493](https://github.com/SOHU-Co/kafka-node/pull/493) |
@@ -23,2 +23,3 @@ 'use strict'; | ||
var validateConfig = require('./utils').validateConfig; | ||
var validateKafkaTopics = require('./utils').validateTopicNames; | ||
@@ -103,2 +104,3 @@ /** | ||
self.brokerMetadata = brokerMetadata; | ||
debug('brokersChanged', brokerMetadata); | ||
self.setupBrokerProfiles(brokerMetadata); | ||
@@ -360,4 +362,9 @@ self.refreshBrokers(); | ||
} | ||
try { | ||
validateKafkaTopics(topics); | ||
} catch (e) { | ||
if (isAsync) return cb(e); | ||
throw e; | ||
} | ||
var self = this; | ||
// first, load metadata to create topics | ||
@@ -618,2 +625,7 @@ this.loadMetadataForTopics(topics, function (err, resp) { | ||
var broker = _.find(this.brokerProfiles, {id: leader}); | ||
if (!broker) { | ||
return; | ||
} | ||
addr = broker.host + ':' + broker.port; | ||
@@ -620,0 +632,0 @@ |
'use strict'; | ||
var retry = require('retry'); | ||
var debug = require('debug')('kafka-node:ConsumerGroupRecovery'); | ||
var assert = require('assert'); | ||
const retry = require('retry'); | ||
const debug = require('debug')('kafka-node:ConsumerGroupRecovery'); | ||
const assert = require('assert'); | ||
var GroupCoordinatorNotAvailable = require('./errors/GroupCoordinatorNotAvailableError'); | ||
var NotCoordinatorForGroup = require('./errors/NotCoordinatorForGroupError'); | ||
var IllegalGeneration = require('./errors/IllegalGenerationError'); | ||
var GroupLoadInProgress = require('./errors/GroupLoadInProgressError'); | ||
var UnknownMemberId = require('./errors/UnknownMemberIdError'); | ||
var RebalanceInProgress = require('./errors/RebalanceInProgressError'); | ||
const GroupCoordinatorNotAvailable = require('./errors/GroupCoordinatorNotAvailableError'); | ||
const NotCoordinatorForGroup = require('./errors/NotCoordinatorForGroupError'); | ||
const IllegalGeneration = require('./errors/IllegalGenerationError'); | ||
const GroupLoadInProgress = require('./errors/GroupLoadInProgressError'); | ||
const UnknownMemberId = require('./errors/UnknownMemberIdError'); | ||
const RebalanceInProgress = require('./errors/RebalanceInProgressError'); | ||
const BrokerNotAvailableError = require('./errors').BrokerNotAvailableError; | ||
@@ -19,5 +20,5 @@ var recoverableErrors = [ | ||
{ | ||
errors: [NotCoordinatorForGroup], | ||
errors: [NotCoordinatorForGroup, BrokerNotAvailableError], | ||
handler: function () { | ||
this.client.coordinatorId = null; | ||
delete this.client.coordinatorId; | ||
} | ||
@@ -24,0 +25,0 @@ }, |
var assert = require('assert'); | ||
var InvalidConfigError = require('./errors/InvalidConfigError'); | ||
var legalChars = new RegExp('^[a-zA-Z0-9._-]*$'); | ||
const allowedTopicLength = 249; | ||
@@ -11,2 +12,22 @@ function validateConfig (property, value) { | ||
function validateTopicNames (topics) { | ||
// Rewriting same validations done by Apache Kafka team for topics | ||
// iterating over topics | ||
topics.some(function (topic) { | ||
if (topic.length <= 0) { | ||
throw new InvalidConfigError('topic name is illegal, cannot be empty'); | ||
} | ||
if (topic === '.' || topic === '..') { | ||
throw new InvalidConfigError('topic name cannot be . or ..'); | ||
} | ||
if (topic.length > allowedTopicLength) { | ||
throw new InvalidConfigError(`topic name is illegal, cannot be longer than ${allowedTopicLength} characters`); | ||
} | ||
if (!legalChars.test(topic)) { | ||
throw new InvalidConfigError(`topic name ${topic} is illegal, contains a character other than ASCII alphanumerics .,_ and -`); | ||
} | ||
}); | ||
return true; | ||
} | ||
function validateTopics (topics) { | ||
@@ -83,3 +104,4 @@ if (topics.some(function (topic) { | ||
groupPartitionsByTopic: groupPartitionsByTopic, | ||
createTopicPartitionList: createTopicPartitionList | ||
createTopicPartitionList: createTopicPartitionList, | ||
validateTopicNames: validateTopicNames | ||
}; |
@@ -12,3 +12,3 @@ { | ||
"bugs": "https://github.com/SOHU-co/kafka-node/issues", | ||
"version": "1.0.3", | ||
"version": "1.0.4", | ||
"main": "kafka.js", | ||
@@ -15,0 +15,0 @@ "license": "MIT", |
@@ -361,2 +361,12 @@ var host = process.env['KAFKA_TEST_HOST'] || ''; | ||
describe('#brokerForLeader', function () { | ||
it('should not throw exception when leader does not exist', function () { | ||
client.brokerProfiles = Object.create(null); | ||
should.doesNotThrow(function () { | ||
should(client.brokerForLeader(1001)).be.undefined; | ||
}); | ||
}); | ||
}); | ||
describe('#reconnectBroker', function () { | ||
@@ -363,0 +373,0 @@ var emptyFn = function () {}; |
'use strict'; | ||
var should = require('should'); | ||
var _ = require('lodash'); | ||
var sinon = require('sinon'); | ||
var ConsumerGroupRecovery = require('../lib/consumerGroupRecovery'); | ||
var GroupCoordinatorNotAvailable = require('../lib/errors/GroupCoordinatorNotAvailableError'); | ||
var GroupLoadInProgress = require('../lib/errors/GroupLoadInProgressError'); | ||
var EventEmitter = require('events').EventEmitter; | ||
const should = require('should'); | ||
const _ = require('lodash'); | ||
const sinon = require('sinon'); | ||
const ConsumerGroupRecovery = require('../lib/consumerGroupRecovery'); | ||
const GroupCoordinatorNotAvailable = require('../lib/errors/GroupCoordinatorNotAvailableError'); | ||
const GroupLoadInProgress = require('../lib/errors/GroupLoadInProgressError'); | ||
const BrokerNotAvailableError = require('../lib/errors').BrokerNotAvailableError; | ||
const EventEmitter = require('events'); | ||
describe('ConsumerGroupRecovery', function () { | ||
var consumerGroupRecovery, fakeClient; | ||
var consumerGroupRecovery, fakeConsumerGroup; | ||
beforeEach(function () { | ||
fakeClient = new EventEmitter(); | ||
Object.assign(fakeClient, { | ||
fakeConsumerGroup = new EventEmitter(); | ||
fakeConsumerGroup.client = new EventEmitter(); | ||
fakeConsumerGroup.scheduleReconnect = () => { throw new Error('should be stubbed!'); }; | ||
Object.assign(fakeConsumerGroup, { | ||
stopHeartbeats: sinon.stub(), | ||
options: { | ||
retries: 10, | ||
retryFactor: 1.8 | ||
retryFactor: 1.8, | ||
retryMinTimeout: 1000 | ||
} | ||
}); | ||
consumerGroupRecovery = new ConsumerGroupRecovery(fakeClient); | ||
consumerGroupRecovery = new ConsumerGroupRecovery(fakeConsumerGroup); | ||
}); | ||
@@ -30,3 +34,3 @@ | ||
fakeClient.once('error', function (error) { | ||
fakeConsumerGroup.once('error', function (error) { | ||
error.should.be.eql(testError); | ||
@@ -37,6 +41,28 @@ done(); | ||
consumerGroupRecovery.tryToRecoverFrom(testError, 'test'); | ||
sinon.assert.calledOnce(fakeClient.stopHeartbeats); | ||
fakeClient.ready.should.be.false; | ||
sinon.assert.calledOnce(fakeConsumerGroup.stopHeartbeats); | ||
fakeConsumerGroup.ready.should.be.false; | ||
consumerGroupRecovery.lastError.should.be.eql(testError); | ||
}); | ||
it('should try to recover from a BrokerNotAvailableError', function () { | ||
const brokerNotAvailableError = new BrokerNotAvailableError('test error'); | ||
fakeConsumerGroup.client.coordinatorId = 1234; | ||
fakeConsumerGroup.once('error', function (error) { | ||
error.should.not.be.eql(brokerNotAvailableError); | ||
}); | ||
sinon.stub(fakeConsumerGroup, 'scheduleReconnect'); | ||
consumerGroupRecovery.tryToRecoverFrom(brokerNotAvailableError, 'test'); | ||
sinon.assert.calledOnce(fakeConsumerGroup.stopHeartbeats); | ||
fakeConsumerGroup.ready.should.be.false; | ||
consumerGroupRecovery.lastError.should.be.eql(brokerNotAvailableError); | ||
sinon.assert.calledOnce(fakeConsumerGroup.scheduleReconnect); | ||
should(fakeConsumerGroup.client.coordinatorId).be.undefined; | ||
}); | ||
}); | ||
@@ -43,0 +69,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
349843
8453