Comparing version 0.4.1 to 0.5.0
{ | ||
"name": "kafkajs", | ||
"version": "0.4.1", | ||
"version": "0.5.0", | ||
"description": "A modern Apache Kafka client for node.js", | ||
@@ -5,0 +5,0 @@ "author": "Tulio Ornelas <ornelas.tulio@gmail.com>", |
@@ -5,2 +5,3 @@ const flatten = require('../utils/flatten') | ||
const { KafkaJSError } = require('../errors') | ||
const { HEARTBEAT } = require('./instrumentationEvents') | ||
@@ -22,2 +23,3 @@ const { keys } = Object | ||
logger, | ||
instrumentationEmitter, | ||
assigner, | ||
@@ -35,2 +37,3 @@ sessionTimeout, | ||
this.logger = logger.namespace('ConsumerGroup') | ||
this.instrumentationEmitter = instrumentationEmitter | ||
this.assigner = assigner | ||
@@ -109,2 +112,3 @@ this.sessionTimeout = sessionTimeout | ||
topicConfigurations: this.topicConfigurations, | ||
instrumentationEmitter: this.instrumentationEmitter, | ||
coordinator, | ||
@@ -134,8 +138,10 @@ memberAssignment, | ||
if (now > this.lastRequest + interval) { | ||
await this.coordinator.heartbeat({ | ||
const payload = { | ||
groupId, | ||
memberId, | ||
groupGenerationId: generationId, | ||
}) | ||
} | ||
await this.coordinator.heartbeat(payload) | ||
this.instrumentationEmitter.emit(HEARTBEAT, payload) | ||
this.lastRequest = Date.now() | ||
@@ -142,0 +148,0 @@ } |
const createRoundRobinAssigned = require('./assigners/roundRobinAssigner') | ||
const ConsumerGroup = require('./consumerGroup') | ||
const Runner = require('./runner') | ||
const events = require('./instrumentationEvents') | ||
const InstrumentationEventEmitter = require('../instrumentation/emitter') | ||
const { KafkaJSError } = require('../errors') | ||
const eventNames = Object.values(events) | ||
const eventKeys = Object.keys(events) | ||
.map(key => `consumer.events.${key}`) | ||
.join(', ') | ||
module.exports = ({ | ||
@@ -20,2 +28,3 @@ cluster, | ||
}) => { | ||
const instrumentationEmitter = new InstrumentationEventEmitter() | ||
const assigner = createPartitionAssigner({ cluster }) | ||
@@ -39,2 +48,3 @@ const logger = rootLogger.namespace('Consumer') | ||
maxWaitTimeInMs, | ||
instrumentationEmitter, | ||
}) | ||
@@ -107,2 +117,17 @@ | ||
/** | ||
* @param {string} eventName | ||
* @param {Function} listener | ||
* @return {Function} | ||
*/ | ||
const on = (eventName, listener) => { | ||
if (!eventNames.includes(eventName)) { | ||
throw new KafkaJSError(`Event name should be one of ${eventKeys}`, { | ||
retriable: false, | ||
}) | ||
} | ||
return instrumentationEmitter.addListener(eventName, event => listener(event)) | ||
} | ||
return { | ||
@@ -113,3 +138,5 @@ connect, | ||
run, | ||
on, | ||
events, | ||
} | ||
} |
const Long = require('long') | ||
const isInvalidOffset = require('./isInvalidOffset') | ||
const initializeConsumerOffsets = require('./initializeConsumerOffsets') | ||
const { COMMIT_OFFSETS } = require('../instrumentationEvents') | ||
@@ -14,2 +15,3 @@ const { keys, assign } = Object | ||
topicConfigurations, | ||
instrumentationEmitter, | ||
groupId, | ||
@@ -23,2 +25,3 @@ generationId, | ||
this.topicConfigurations = topicConfigurations | ||
this.instrumentationEmitter = instrumentationEmitter | ||
this.groupId = groupId | ||
@@ -138,3 +141,3 @@ this.generationId = generationId | ||
await this.coordinator.offsetCommit({ | ||
const payload = { | ||
groupId, | ||
@@ -144,4 +147,7 @@ memberId, | ||
topics: topicsWithPartitionsToCommit, | ||
}) | ||
} | ||
await this.coordinator.offsetCommit(payload) | ||
this.instrumentationEmitter.emit(COMMIT_OFFSETS, payload) | ||
// Update local reference of committed offsets | ||
@@ -148,0 +154,0 @@ topicsWithPartitionsToCommit.forEach(({ topic, partitions }) => { |
262701
159
4966