@mojaloop/platform-shared-lib-nodejs-kafka-client-lib
Advanced tools
Comparing version 0.3.59 to 0.3.60
@@ -45,3 +45,5 @@ /***** | ||
setCallbackFn: (handlerCallback: (message: IRawMessage) => Promise<void>) => void; | ||
setBatchCallbackFn: (batchHandlerCallback: (messages: IRawMessage[]) => Promise<void>) => void; | ||
setTopics: (topics: string[]) => void; | ||
setBatchSize(size: number): void; | ||
destroy: (force: boolean) => Promise<void>; | ||
@@ -48,0 +50,0 @@ connect: () => Promise<void>; |
@@ -74,2 +74,4 @@ /***** | ||
private _handlerCallback; | ||
private _batchHandlerCallback; | ||
private _batchSize; | ||
constructor(options: MLKafkaRawConsumerOptions, logger?: ILogger | null); | ||
@@ -88,6 +90,9 @@ on(event: MLKafkaRawConsumerEvents, listener: MLKafkaRawConsumerEventListener<MLKafkaRawConsumerEvents>): this; | ||
private _onDisconnect; | ||
private _consuming; | ||
private _consumeLoop; | ||
private _toIMessage; | ||
private _commitMsg; | ||
setBatchSize(size: number): void; | ||
setCallbackFn(handlerCallback: (message: IRawMessage) => Promise<void>): void; | ||
setBatchCallbackFn(batchHandlerCallback: (messages: IRawMessage[]) => Promise<void>): void; | ||
setTopics(topics: string[]): void; | ||
@@ -94,0 +99,0 @@ destroy(force: boolean): Promise<void>; |
@@ -55,2 +55,6 @@ /***** | ||
super(); | ||
this._handlerCallback = null; | ||
this._batchHandlerCallback = null; | ||
this._batchSize = defaultOptions.batchSize; | ||
this._consuming = false; | ||
this._options = options; | ||
@@ -118,2 +122,4 @@ this._logger = logger; | ||
this._globalConfig["metadata.broker.list"] = this._options.kafkaBrokerList; | ||
// local helper vars | ||
this._batchSize = this._options.batchSize; | ||
} | ||
@@ -186,5 +192,5 @@ _onReady(info, metadata) { | ||
_consumeLoop() { | ||
if (!this._client.isConnected()) | ||
if (!this._client.isConnected() || this._consuming) | ||
return; | ||
this._client.consume(this._options.batchSize || defaultOptions.batchSize, async (err, kafkaMessages) => { | ||
this._client.consume(this._batchSize, async (err, kafkaMessages) => { | ||
if (err) { | ||
@@ -199,8 +205,24 @@ if (!this._client.isConnected() || err.code == -172 /* not connected or wrong state */ || err.code == 3 /* Broker: Unknown topic or partition */) | ||
} | ||
for (const kafkaMessage of kafkaMessages) { | ||
const msg = this._toIMessage(kafkaMessage); | ||
// call the provided handler and then commit | ||
await this._handlerCallback(msg); | ||
this._commitMsg(kafkaMessage); | ||
if (kafkaMessages.length <= 0) { | ||
setImmediate(() => { | ||
this._consumeLoop(); | ||
}); | ||
return; | ||
} | ||
this._consuming = true; | ||
// use the batch handler if batchSize > 1 and we have a batchHandlerCallback | ||
if (this._batchSize > 1 && this._batchHandlerCallback) { | ||
const msgs = kafkaMessages.map(this._toIMessage.bind(this)); | ||
await this._batchHandlerCallback(msgs); | ||
this._commitMsg(kafkaMessages); | ||
} | ||
else if (this._handlerCallback) { | ||
for (const kafkaMessage of kafkaMessages) { | ||
const msg = this._toIMessage(kafkaMessage); | ||
// call the provided handler and then commit | ||
await this._handlerCallback(msg); | ||
this._commitMsg(kafkaMessage); | ||
} | ||
} | ||
this._consuming = false; | ||
setImmediate(() => { | ||
@@ -268,15 +290,32 @@ this._consumeLoop(); | ||
} | ||
_commitMsg(kafkaMessage) { | ||
if (this._globalConfig["enable.auto.commit"] !== true) { | ||
/* // commit all read offsets | ||
private _commitMessages(kafkaMessages: RDKafka.Message[]): void { | ||
if (this._globalConfig["enable.auto.commit"]===true) return; | ||
if (this._options.useSyncCommit) { | ||
this._client.commitMessageSync(kafkaMessage); | ||
this._client.commitSync(kafkaMessages); | ||
} else { | ||
this._client.commit(kafkaMessages); | ||
} | ||
else { | ||
this._client.commitMessage(kafkaMessage); | ||
} | ||
}*/ | ||
_commitMsg(kafkaMessage) { | ||
if (this._globalConfig["enable.auto.commit"] === true) | ||
return; | ||
if (this._options.useSyncCommit) { | ||
this._client.commitSync(kafkaMessage); | ||
} | ||
else { | ||
this._client.commit(kafkaMessage); | ||
} | ||
} | ||
setBatchSize(size) { | ||
this._batchSize = size; | ||
} | ||
setCallbackFn(handlerCallback) { | ||
this._batchHandlerCallback = null; | ||
this._handlerCallback = handlerCallback; | ||
} | ||
setBatchCallbackFn(batchHandlerCallback) { | ||
this._batchHandlerCallback = batchHandlerCallback; | ||
} | ||
setTopics(topics) { | ||
@@ -283,0 +322,0 @@ this._topics = topics; |
@@ -55,2 +55,3 @@ /***** | ||
private _handlerCallback; | ||
private _batchHandlerCallback; | ||
private _filterFn; | ||
@@ -63,5 +64,8 @@ private _options; | ||
private _internalHandler; | ||
private _internalBatchHandler; | ||
setCallbackFn(handlerCallback: (message: IMessage) => Promise<void>): void; | ||
setBatchCallbackFn(batchHandlerCallback: (messages: IMessage[]) => Promise<void>): void; | ||
setFilteringFn(filterFn: (message: IMessage) => boolean): void; | ||
setTopics(topics: string[]): void; | ||
setBatchSize(size: number): void; | ||
destroy(force: boolean): Promise<void>; | ||
@@ -68,0 +72,0 @@ connect(): Promise<void>; |
@@ -41,2 +41,4 @@ /***** | ||
super(); | ||
this._handlerCallback = null; | ||
this._batchHandlerCallback = null; | ||
this._options = options; | ||
@@ -49,3 +51,8 @@ this._logger = logger; | ||
this._kafkaRawConsumer = new rdkafka_raw_consumer_1.MLKafkaRawConsumer(rawOptions, logger); | ||
this._kafkaRawConsumer.setCallbackFn(this._internalHandler.bind(this)); | ||
if (this._options.batchSize && this._options.batchSize > 1) { | ||
this._kafkaRawConsumer.setBatchCallbackFn(this._internalBatchHandler.bind(this)); | ||
} | ||
else { | ||
this._kafkaRawConsumer.setCallbackFn(this._internalHandler.bind(this)); | ||
} | ||
this._kafkaRawConsumer.eventNames(); | ||
@@ -73,2 +80,4 @@ // hook MLKafkaRawConsumer events we care about | ||
async _internalHandler(rawMessage) { | ||
if (!this._handlerCallback) | ||
return; | ||
// convert raw message to IMessage | ||
@@ -82,5 +91,26 @@ const msg = this._convertMsg(rawMessage); | ||
} | ||
async _internalBatchHandler(rawMessages) { | ||
if (!this._batchHandlerCallback) | ||
return; | ||
// convert raw message to IMessage and filter | ||
const msgs = []; | ||
rawMessages.forEach(rawMsg => { | ||
const msg = this._convertMsg(rawMsg); | ||
if (this._filterFn && !this._filterFn(msg)) { | ||
this._logger?.isDebugEnabled() && this._logger.debug("MLKafkaConsumer - ignoring message filtered out by filterFunction"); | ||
} | ||
else { | ||
msgs.push(msg); | ||
} | ||
}); | ||
await this._batchHandlerCallback(msgs); | ||
} | ||
setCallbackFn(handlerCallback) { | ||
this._batchHandlerCallback = null; | ||
this._handlerCallback = handlerCallback; | ||
} | ||
setBatchCallbackFn(batchHandlerCallback) { | ||
this._handlerCallback = null; | ||
this._batchHandlerCallback = batchHandlerCallback; | ||
} | ||
setFilteringFn(filterFn) { | ||
@@ -92,2 +122,5 @@ this._filterFn = filterFn; | ||
} | ||
setBatchSize(size) { | ||
this.setBatchSize(size); | ||
} | ||
async destroy(force) { | ||
@@ -94,0 +127,0 @@ return this._kafkaRawConsumer.destroy(force); |
{ | ||
"name": "@mojaloop/platform-shared-lib-nodejs-kafka-client-lib", | ||
"version": "0.3.59", | ||
"version": "0.3.60", | ||
"description": "mojaloop vnext platform shared libraries", | ||
@@ -5,0 +5,0 @@ "license": "Apache-2.0", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
101049
1306