New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@mojaloop/platform-shared-lib-nodejs-kafka-client-lib

Package Overview
Dependencies
Maintainers
5
Versions
144
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@mojaloop/platform-shared-lib-nodejs-kafka-client-lib - npm Package Compare versions

Comparing version 0.3.59 to 0.3.60

2

dist/raw/raw_types.d.ts

@@ -45,3 +45,5 @@ /*****

setCallbackFn: (handlerCallback: (message: IRawMessage) => Promise<void>) => void;
setBatchCallbackFn: (batchHandlerCallback: (messages: IRawMessage[]) => Promise<void>) => void;
setTopics: (topics: string[]) => void;
setBatchSize(size: number): void;
destroy: (force: boolean) => Promise<void>;

@@ -48,0 +50,0 @@ connect: () => Promise<void>;

@@ -74,2 +74,4 @@ /*****

private _handlerCallback;
private _batchHandlerCallback;
private _batchSize;
constructor(options: MLKafkaRawConsumerOptions, logger?: ILogger | null);

@@ -88,6 +90,9 @@ on(event: MLKafkaRawConsumerEvents, listener: MLKafkaRawConsumerEventListener<MLKafkaRawConsumerEvents>): this;

private _onDisconnect;
private _consuming;
private _consumeLoop;
private _toIMessage;
private _commitMsg;
setBatchSize(size: number): void;
setCallbackFn(handlerCallback: (message: IRawMessage) => Promise<void>): void;
setBatchCallbackFn(batchHandlerCallback: (messages: IRawMessage[]) => Promise<void>): void;
setTopics(topics: string[]): void;

@@ -94,0 +99,0 @@ destroy(force: boolean): Promise<void>;

65

dist/raw/rdkafka_raw_consumer.js

@@ -55,2 +55,6 @@ /*****

super();
this._handlerCallback = null;
this._batchHandlerCallback = null;
this._batchSize = defaultOptions.batchSize;
this._consuming = false;
this._options = options;

@@ -118,2 +122,4 @@ this._logger = logger;

this._globalConfig["metadata.broker.list"] = this._options.kafkaBrokerList;
// local helper vars
this._batchSize = this._options.batchSize;
}

@@ -186,5 +192,5 @@ _onReady(info, metadata) {

_consumeLoop() {
if (!this._client.isConnected())
if (!this._client.isConnected() || this._consuming)
return;
this._client.consume(this._options.batchSize || defaultOptions.batchSize, async (err, kafkaMessages) => {
this._client.consume(this._batchSize, async (err, kafkaMessages) => {
if (err) {

@@ -199,8 +205,24 @@ if (!this._client.isConnected() || err.code == -172 /* not connected or wrong state */ || err.code == 3 /* Broker: Unknown topic or partition */)

}
for (const kafkaMessage of kafkaMessages) {
const msg = this._toIMessage(kafkaMessage);
// call the provided handler and then commit
await this._handlerCallback(msg);
this._commitMsg(kafkaMessage);
if (kafkaMessages.length <= 0) {
setImmediate(() => {
this._consumeLoop();
});
return;
}
this._consuming = true;
// use the batch handler if batchSize > 1 and we have a batchHandlerCallback
if (this._batchSize > 1 && this._batchHandlerCallback) {
const msgs = kafkaMessages.map(this._toIMessage.bind(this));
await this._batchHandlerCallback(msgs);
this._commitMsg(kafkaMessages);
}
else if (this._handlerCallback) {
for (const kafkaMessage of kafkaMessages) {
const msg = this._toIMessage(kafkaMessage);
// call the provided handler and then commit
await this._handlerCallback(msg);
this._commitMsg(kafkaMessage);
}
}
this._consuming = false;
setImmediate(() => {

@@ -268,15 +290,32 @@ this._consumeLoop();

}
_commitMsg(kafkaMessage) {
if (this._globalConfig["enable.auto.commit"] !== true) {
/* // commit all read offsets
private _commitMessages(kafkaMessages: RDKafka.Message[]): void {
if (this._globalConfig["enable.auto.commit"]===true) return;
if (this._options.useSyncCommit) {
this._client.commitMessageSync(kafkaMessage);
this._client.commitSync(kafkaMessages);
} else {
this._client.commit(kafkaMessages);
}
else {
this._client.commitMessage(kafkaMessage);
}
}*/
_commitMsg(kafkaMessage) {
if (this._globalConfig["enable.auto.commit"] === true)
return;
if (this._options.useSyncCommit) {
this._client.commitSync(kafkaMessage);
}
else {
this._client.commit(kafkaMessage);
}
}
setBatchSize(size) {
this._batchSize = size;
}
setCallbackFn(handlerCallback) {
this._batchHandlerCallback = null;
this._handlerCallback = handlerCallback;
}
setBatchCallbackFn(batchHandlerCallback) {
this._batchHandlerCallback = batchHandlerCallback;
}
setTopics(topics) {

@@ -283,0 +322,0 @@ this._topics = topics;

@@ -55,2 +55,3 @@ /*****

private _handlerCallback;
private _batchHandlerCallback;
private _filterFn;

@@ -63,5 +64,8 @@ private _options;

private _internalHandler;
private _internalBatchHandler;
setCallbackFn(handlerCallback: (message: IMessage) => Promise<void>): void;
setBatchCallbackFn(batchHandlerCallback: (messages: IMessage[]) => Promise<void>): void;
setFilteringFn(filterFn: (message: IMessage) => boolean): void;
setTopics(topics: string[]): void;
setBatchSize(size: number): void;
destroy(force: boolean): Promise<void>;

@@ -68,0 +72,0 @@ connect(): Promise<void>;

@@ -41,2 +41,4 @@ /*****

super();
this._handlerCallback = null;
this._batchHandlerCallback = null;
this._options = options;

@@ -49,3 +51,8 @@ this._logger = logger;

this._kafkaRawConsumer = new rdkafka_raw_consumer_1.MLKafkaRawConsumer(rawOptions, logger);
this._kafkaRawConsumer.setCallbackFn(this._internalHandler.bind(this));
if (this._options.batchSize && this._options.batchSize > 1) {
this._kafkaRawConsumer.setBatchCallbackFn(this._internalBatchHandler.bind(this));
}
else {
this._kafkaRawConsumer.setCallbackFn(this._internalHandler.bind(this));
}
this._kafkaRawConsumer.eventNames();

@@ -73,2 +80,4 @@ // hook MLKafkaRawConsumer events we care about

async _internalHandler(rawMessage) {
if (!this._handlerCallback)
return;
// convert raw message to IMessage

@@ -82,5 +91,26 @@ const msg = this._convertMsg(rawMessage);

}
async _internalBatchHandler(rawMessages) {
if (!this._batchHandlerCallback)
return;
// convert raw message to IMessage and filter
const msgs = [];
rawMessages.forEach(rawMsg => {
const msg = this._convertMsg(rawMsg);
if (this._filterFn && !this._filterFn(msg)) {
this._logger?.isDebugEnabled() && this._logger.debug("MLKafkaConsumer - ignoring message filtered out by filterFunction");
}
else {
msgs.push(msg);
}
});
await this._batchHandlerCallback(msgs);
}
setCallbackFn(handlerCallback) {
this._batchHandlerCallback = null;
this._handlerCallback = handlerCallback;
}
setBatchCallbackFn(batchHandlerCallback) {
this._handlerCallback = null;
this._batchHandlerCallback = batchHandlerCallback;
}
setFilteringFn(filterFn) {

@@ -92,2 +122,5 @@ this._filterFn = filterFn;

}
setBatchSize(size) {
this.setBatchSize(size);
}
async destroy(force) {

@@ -94,0 +127,0 @@ return this._kafkaRawConsumer.destroy(force);

{
"name": "@mojaloop/platform-shared-lib-nodejs-kafka-client-lib",
"version": "0.3.59",
"version": "0.3.60",
"description": "mojaloop vnext platform shared libraries",

@@ -5,0 +5,0 @@ "license": "Apache-2.0",

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc