Comparing version 2.5.5 to 2.5.6
@@ -98,2 +98,7 @@ 'use strict'; | ||
}); | ||
} else if (err.code === 'MessageSizeTooLarge') { | ||
self.client.warn('Received MessageSizeTooLarge error for', topic + ':' + partition, | ||
'which means maxBytes option value (' + s.maxBytes + ') is too small to fit the message at offset', s.offset); | ||
s.offset += 1; | ||
return null; | ||
/* istanbul ignore next */ | ||
@@ -100,0 +105,0 @@ } else if (/UnknownTopicOrPartition|NotLeaderForPartition|LeaderNotAvailable/.test(err.code)) { |
@@ -197,3 +197,3 @@ 'use strict'; | ||
return _.dropRightWhile(this.context.items, { _partial: true }); // drop partially read messages | ||
return this.context.items; | ||
}, | ||
@@ -200,0 +200,0 @@ write: function (items) { |
@@ -5,2 +5,4 @@ 'use strict'; | ||
var globals = require('./globals'); | ||
var errors = require('../errors'); | ||
var _ = require('lodash'); | ||
@@ -62,2 +64,10 @@ // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol | ||
.MessageSet('messageSet', this.context.messageSetSize); | ||
if (this.context.messageSet.length === 1 && this.context.messageSet[0]._partial === true) { | ||
this.context.messageSetSize = 0; | ||
this.context.messageSet = []; | ||
this.context.error = errors.byName('MessageSizeTooLarge'); | ||
} else { | ||
this.context.messageSet = _.dropRightWhile(this.context.messageSet, { _partial: true }); // drop partially read messages | ||
} | ||
} | ||
@@ -64,0 +74,0 @@ }); |
@@ -9,3 +9,3 @@ { | ||
}, | ||
"version": "2.5.5", | ||
"version": "2.5.6", | ||
"main": "./lib/index.js", | ||
@@ -12,0 +12,0 @@ "keywords": [ |
@@ -16,4 +16,2 @@ 'use strict'; | ||
var maxBytesTestMessagesSize; | ||
describe('SimpleConsumer', function () { | ||
@@ -166,4 +164,2 @@ before(function () { | ||
dataHandlerSpy.lastCall.args[0][1].message.value.toString('utf8').should.be.eql('p001'); | ||
// save for next test | ||
maxBytesTestMessagesSize = dataHandlerSpy.lastCall.args[0][0].messageSize + dataHandlerSpy.lastCall.args[0][1].messageSize; | ||
}); | ||
@@ -175,2 +171,3 @@ }); | ||
it('should receive messages in maxBytes batches', function () { | ||
var maxBytesTestMessagesSize = dataHandlerSpy.lastCall.args[0][0].messageSize + dataHandlerSpy.lastCall.args[0][1].messageSize; | ||
return consumer.unsubscribe('kafka-test-topic', 0).then(function () { | ||
@@ -195,2 +192,37 @@ dataHandlerSpy.reset(); | ||
it('should skip single message larger then configured maxBytes', function () { | ||
var mSize; | ||
dataHandlerSpy.reset(); | ||
return producer.send([{ | ||
topic: 'kafka-test-topic', | ||
partition: 0, | ||
message: { value: 'p0000000000000001' } | ||
}, { | ||
topic: 'kafka-test-topic', | ||
partition: 0, | ||
message: { value: 'p001' } | ||
}]) | ||
.delay(300) | ||
.then(function () { | ||
dataHandlerSpy.should.have.been.calledTwice; // eslint-disable-line | ||
mSize = dataHandlerSpy.getCall(0).args[0][0].messageSize; | ||
}) | ||
.then(function () { | ||
dataHandlerSpy.reset(); | ||
return consumer.unsubscribe('kafka-test-topic', 0).then(function () { | ||
return consumer.offset('kafka-test-topic', 0).then(function (offset) { | ||
// ask for maxBytes that is smaller then size of the first message but enough to receive second message | ||
var maxBytes = 8 + 4 + mSize - 1; | ||
return consumer.subscribe('kafka-test-topic', 0, { offset: offset - 2, maxBytes: maxBytes }, dataHandlerSpy) | ||
.delay(300) | ||
.then(function () { | ||
dataHandlerSpy.should.have.been.calledOnce; // eslint-disable-line | ||
dataHandlerSpy.getCall(0).args[0].should.be.an('array').and.have.length(1); | ||
dataHandlerSpy.getCall(0).args[0][0].message.value.toString('utf8').should.be.eql('p001'); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('should be able to commit single offset', function () { | ||
@@ -197,0 +229,0 @@ return consumer.commitOffset({ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
242076
4340