
Research
Security News
Malicious PyPI Package Exploits Deezer API for Coordinated Music Piracy
Socket researchers uncovered a malicious PyPI package exploiting Deezer’s API to enable coordinated music piracy through API abuse and C2 server control.
@mojaloop/platform-shared-lib-nodejs-kafka-client-lib
Advanced tools
mojaloop vnext platform shared libraries
This is the kafka library implementation of the general messaging consumer and publisher interfaces found in here
It includes two a set of producer and consumer classes:
Never use these classes as a direct dependency from pure domain code, instead, use the generic types and interfaces provided by "@mojaloop/platform-shared-lib-messaging-types-lib"
which as implemented by the classes of this lib.
Only application layer implementations should use these classes directly (or tests).
import {ConsoleLogger} from "@mojaloop/logging-bc-public-types-lib";
import {IMessage, MessageTypes} from "@mojaloop/platform-shared-lib-messaging-types-lib";
import {MLKafkaJsonConsumer, MLKafkaJsonConsumerOptions} from "@mojaloop/platform-shared-lib-nodejs-kafka-client-lib";
const logger: ConsoleLogger = new ConsoleLogger();
const consumerOptions: MLKafkaJsonConsumerOptions = {
kafkaBrokerList: "localhost:9092",
kafkaGroupId: "test_consumer_group",
};
const kafkaConsumer: MLKafkaJsonConsumer = new MLKafkaJsonConsumer(consumerOptions, logger);
async function handler(message: IMessage): Promise<void> {
logger.debug(`Got message in handler: ${JSON.stringify(message, null, 2)}`);
return;
}
// OPTIONAL - wait for the Consumer Group this consumer instance belongs to, be rebalanced
// Waiting for this event after the start assures that any message received after this event is sent to the custom handler above
kafkaConsumer.on("rebalance", async (type: "assign" | "revoke", assignments) => {
if (type==="assign") {
// Consumer group rebalance is concluded
// This consumer group instance is now garanteed to start receiving messages
// from the subscribed topics and on the assinged partitions
}else{
// Consumer topic/partition assignments were revoked (some or all)
}
});
kafkaConsumer.setCallbackFn(handler);
kafkaConsumer.setTopics(["myTopic"]);
await kafkaConsumer.connect();
// Start consuming to handler
await kafkaConsumer.start();
// NOTE make sure destroy is always called when a consumer exists, to quickly inform the Kafka cluster,
// so it can rebalance the consumer group as soon as possible (without waiting for a timeout)
setTimeout(async ()=>{
await kafkaConsumer.destroy(false);
}, 1000);
import {ConsoleLogger} from "@mojaloop/logging-bc-public-types-lib";
import {IMessage, MessageTypes} from "@mojaloop/platform-shared-lib-messaging-types-lib";
import {MLKafkaJsonProducer, MLKafkaJsonProducerOptions} from "@mojaloop/platform-shared-lib-nodejs-kafka-client-lib";
const logger: ConsoleLogger = new ConsoleLogger();
const producerOptions: MLKafkaJsonProducerOptions = {
kafkaBrokerList: "localhost:9092",
producerClientId: "test_producer"
};
const kafkaProducer: MLKafkaJsonProducer = new MLKafkaJsonProducer(producerOptions, logger);
// example to get delivery reports
kafkaProducer.setDeliveryReportFn((topic: string, partition: number, offset: number) => {
console.log(`Delivery report event - topic: ${topic}, partition: ${partition}, offset: ${offset}`);
return;
});
// Set handler to null to disable
// kafkaProducer.setDeliveryReportFn(null);
await kafkaProducer.connect();
const msg:IMessage = {
msgId: "msgId",
msgName: "msgName",
msgKey: "msgKey",
msgType: MessageTypes.DOMAIN_EVENT,
msgTimestamp: Date.now(),
msgPartition: 42,
msgOffset: 31415,
msgTopic: "myTopic",
payload: {
testProp: "propValue"
}
};
await kafkaProducer.send(msg); //Note: send() can also receiver an array of messages
// NOTE make sure destroy is always called when finished
setTimeout(async ()=>{
await kafkaProducer.destroy()
}, 1000)
These allow publishing and consuming string or buffer messages directly without heavy JSON conversions.
They don't implement the generic interfaces that domain can use, so, these should only be used as base for higher level implementations.
Both MLKafkaJsonConsumer
and MLKafkaJsonProducer
wrap these low-level implementations.
export interface IRawMessageHeader {
[key: string]: string | Buffer;
}
export interface IRawMessage {
value: Buffer | string | object | null;
topic: string;
key: Buffer | string | null;
timestamp: number | null;
headers: IRawMessageHeader[] | null;
partition: number | null;
offset: number | null;
}
export interface IRawMessageConsumer {
setCallbackFn: (handlerCallback: (message: IRawMessage) => Promise<void>) => void;
setTopics: (topics: string[]) => void;
destroy: (force: boolean) => Promise<void>;
connect: () => Promise<void>;
disconnect: (force: boolean) => Promise<void>;
start: () => Promise<void>;
stop: () => Promise<void>;
}
export declare interface IRawMessageProducer {
destroy: () => Promise<void>;
connect: () => Promise<void>;
disconnect: () => Promise<void>;
send: (message: IRawMessage | IRawMessage[] | any) => Promise<void>;
}
MLKafkaRawConsumer
MLKafkaRawProducer
"ready"
- emitted as result of a successful connect and before the connect() finishes"rebalance"
- emitted as result of a successful consumer group rebalance (either with type assing or revoke)"rebalance.error"
- emitted as result of a unsuccessful consumer group rebalance (will include the error)"disconnected"
- emitted as result of the client being disconnected from the cluster"event"
| "throttle" | "stats" - technical/debug events, emitted by the rdkafka libTo control batching parameters of the MLKafkaRawConsumer
tweak the following options of the MLKafkaRawConsumerOptions
structure:
export class MLKafkaRawConsumerOptions {
// ... other options
batchSize?: number; // default: 1 - maximum number of messasges to wait for in each consume loop (if not timeout)
batchTimeoutMs?: number; // default: 1000 - maximum amount of time to wait for in each consume loop (if not bactch size reached)
}
The consumer loop will wait for any of those parameters to be true, whichever happens first.
FAQs
mojaloop vnext platform shared libraries
The npm package @mojaloop/platform-shared-lib-nodejs-kafka-client-lib receives a total of 81 weekly downloads. As such, @mojaloop/platform-shared-lib-nodejs-kafka-client-lib popularity was classified as not popular.
We found that @mojaloop/platform-shared-lib-nodejs-kafka-client-lib demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 5 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Security News
Socket researchers uncovered a malicious PyPI package exploiting Deezer’s API to enable coordinated music piracy through API abuse and C2 server control.
Research
The Socket Research Team discovered a malicious npm package, '@ton-wallet/create', stealing cryptocurrency wallet keys from developers and users in the TON ecosystem.
Security News
Newly introduced telemetry in devenv 1.4 sparked a backlash over privacy concerns, leading to the removal of its AI-powered feature after strong community pushback.