Comparing version 0.2.3 to 0.2.4
@@ -11,7 +11,3 @@ 'use strict'; | ||
connectionString: '127.0.0.1:9092,127.0.0.1:9093', | ||
timeout: 1000, | ||
clientId: 'group-consumer', | ||
nsl: { | ||
logLevel: 7 | ||
} | ||
timeout: 1000 | ||
}); | ||
@@ -18,0 +14,0 @@ |
@@ -176,2 +176,16 @@ 'use strict'; | ||
function _fakeTopicsErrorResponse(topics, error) { | ||
return _.map(topics, function (t) { | ||
return { | ||
topicName: t.topicName, | ||
partitions: _.map(t.partitions, function (p) { | ||
return { | ||
partition: p.partition, | ||
error: error | ||
}; | ||
}) | ||
}; | ||
}); | ||
} | ||
Client.prototype.produceRequest = function (requests) { | ||
@@ -194,2 +208,5 @@ var self = this; | ||
} | ||
}) | ||
.catch(errors.NoKafkaConnectionError, function (err) { | ||
return _fakeTopicsErrorResponse(topics, err); | ||
}); | ||
@@ -205,16 +222,2 @@ })) | ||
function _fakeErrorResponse(topics, error) { | ||
return _.map(topics, function (t) { | ||
return { | ||
topicName: t.topicName, | ||
partitions: _.map(t.partitions, function (p) { | ||
return { | ||
partition: p.partition, | ||
error: error | ||
}; | ||
}) | ||
}; | ||
}); | ||
} | ||
return self._metadata().then(function () { | ||
@@ -225,3 +228,3 @@ return Promise.all(_.map(requests, function (topics, leader) { | ||
if (leader === -1 || !self.brokerConnections[leader]) { | ||
return _fakeErrorResponse(topics, errors.byName('LeaderNotAvailable')); | ||
return _fakeTopicsErrorResponse(topics, errors.byName('LeaderNotAvailable')); | ||
} | ||
@@ -241,3 +244,3 @@ | ||
.catch(errors.NoKafkaConnectionError, function (err) { | ||
return _fakeErrorResponse(topics, err); | ||
return _fakeTopicsErrorResponse(topics, err); | ||
}); | ||
@@ -244,0 +247,0 @@ })) |
@@ -6,2 +6,3 @@ 'use strict'; | ||
var Client = require('./client'); | ||
var errors = require('./errors'); | ||
@@ -82,3 +83,6 @@ function Producer(options) { | ||
if (p.error) { | ||
if (/UnknownTopicOrPartition|NotLeaderForPartition|LeaderNotAvailable/.test(p.error.code) && attempt < 3) { | ||
if ((/UnknownTopicOrPartition|NotLeaderForPartition|LeaderNotAvailable/.test(p.error.code) | ||
|| p.error instanceof errors.NoKafkaConnectionError) | ||
&& attempt < 3) { | ||
// self.client.debug('Received', p.error, 'for', p.topic + ':' + p.partition); | ||
toRetry = toRetry.concat(_.filter(_data, { topic: p.topic, partition: p.partition })); | ||
@@ -85,0 +89,0 @@ } else { |
@@ -9,3 +9,3 @@ { | ||
}, | ||
"version": "0.2.3", | ||
"version": "0.2.4", | ||
"main": "./lib/index.js", | ||
@@ -28,6 +28,3 @@ "keywords": ["kafka"], | ||
"eslint-config-airbnb": "^3.1.0", | ||
"gulp": "^3.9.0", | ||
"gulp-eslint": "^1.1.1", | ||
"gulp-mocha": "^2.2.0", | ||
"gulp-istanbul": "^0.10.3" | ||
"istanbul": "^0.4.2" | ||
}, | ||
@@ -38,3 +35,3 @@ "bugs": { | ||
"scripts": { | ||
"test": "gulp" | ||
"test": "make" | ||
}, | ||
@@ -41,0 +38,0 @@ "repository": { |
@@ -1,2 +0,2 @@ | ||
"use strict"; | ||
'use strict'; | ||
@@ -15,9 +15,9 @@ // var Promise = require('bluebird'); | ||
return producer.send([ | ||
{ | ||
topic: 'kafka-test-topic', | ||
partition: 1, | ||
message: { | ||
value: new Date().toISOString() | ||
{ | ||
topic: 'kafka-test-topic', | ||
partition: 2, | ||
message: { | ||
value: new Date().toISOString() | ||
} | ||
} | ||
} | ||
]); | ||
@@ -32,2 +32,1 @@ }) | ||
}); | ||
@@ -208,2 +208,52 @@ [![Build Status](https://travis-ci.org/oleksiyk/kafka.png)](https://travis-ci.org/oleksiyk/kafka) | ||
## Logging | ||
You can differentiate messages from several instances of producer/consumer by providing unique `clientId` in options: | ||
```javascript | ||
var consumer1 = new Kafka.GroupConsumer({ | ||
clientId: 'group-consumer-1' | ||
}); | ||
var consumer2 = new Kafka.GroupConsumer({ | ||
clientId: 'group-consumer-2' | ||
}); | ||
``` | ||
=> | ||
``` | ||
2016-01-12T07:41:57.884Z INFO group-consumer-1 .... | ||
2016-01-12T07:41:57.884Z INFO group-consumer-2 .... | ||
``` | ||
You can also tune the logging level which should be bitwise OR for the following: | ||
- 1 - errors | ||
- 2 - warnings | ||
- 4 - log (info) | ||
- 16 - debug | ||
```javascript | ||
var consumer = new Kafka.GroupConsumer({ | ||
clientId: 'group-consumer', | ||
nsl: { | ||
logLevel: 7 // filter out debug messages (1 | 2 | 4) | ||
} | ||
}); | ||
``` | ||
You can overwrite all (error(), log(), warn(), debug()) or just some of the logger functions: | ||
```javascript | ||
var consumer = new Kafka.GroupConsumer({ | ||
clientId: 'group-consumer', | ||
logger: { | ||
log: console.log // overwrite just the .log() method | ||
} | ||
}); | ||
``` | ||
First argument passed to custom log functions will always be `clientId`: | ||
``` | ||
group-consumer Joined group no-kafka-group-v0.9 generationId 1 as group-consumer-c88fa417-b6a2-499c-b03f-fa66093831f6 | ||
``` |
@@ -53,3 +53,3 @@ 'use strict'; | ||
fn: Kafka.GroupConsumer.RoundRobinAssignment | ||
}) | ||
}).delay(200) // let it consume previous messages in a topic (if any) | ||
]); | ||
@@ -256,2 +256,3 @@ }); | ||
it('should not log errors on clean shutdown', function () { | ||
var spy = sinon.spy(function () {}); | ||
var consumer = new Kafka.GroupConsumer({ | ||
@@ -261,8 +262,8 @@ groupId: 'no-kafka-shutdown-test-group', | ||
idleTimeout: 100, | ||
heartbeatTimeout: 100 | ||
heartbeatTimeout: 100, | ||
logger: { | ||
error: spy | ||
} | ||
}); | ||
var origError = Kafka.error; | ||
Kafka.error = sinon.spy(Kafka.error); | ||
return consumer.init({ | ||
@@ -279,8 +280,5 @@ strategy: 'TestStrategy', | ||
/* jshint expr: true */ | ||
Kafka.error.should.not.have.been.called; //eslint-disable-line | ||
}) | ||
.finally(function () { | ||
Kafka.error = origError; | ||
spy.should.not.have.been.called; //eslint-disable-line | ||
}); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
7
259
166257
36
3014