platform-shared-lib-nodejs-kafka-client-lib

Mojaloop Platform Shared Libraries - Node.js Kafka Client Lib
Usage
### Consumer example
const logger: ConsoleLogger = new ConsoleLogger()
let consumerOptions: MLKafkaConsumerOptions = {
kafkaBrokerList: 'localhost:9092',
kafkaGroupId: 'test_consumer_group_'+Date.now(),
outputType: MLKafkaConsumerOutputType.Json
}
let kafkaConsumer: MLKafkaConsumer = new MLKafkaConsumer(consumerOptions, logger)
async function handler(message: IMessage): Promise<void> {
logger.debug(`Got message in handler: ${JSON.stringify(message, null, 2)}`)
return
}
kafkaConsumer.setCallbackFn(handler)
kafkaConsumer.setTopics(['myTopic'])
await kafkaConsumer.connect()
await kafkaConsumer.start()
setTimeout(async ()=>{
await kafkaConsumer.destroy(false)
}, 10000)
### Producer example
const logger: ConsoleLogger = new ConsoleLogger()
let producerOptions: MLKafkaProducerOptions = {
kafkaBrokerList: 'localhost:9092',
producerClientId: 'test_producer'
}
let kafkaProducer: MLKafkaProducer = new MLKafkaProducer(producerOptions, logger)
kafkaProducer.on('deliveryReport', (topic: string, partition: number|null, offset: number|null) => {
console.log(`delivery report event - topic: ${topic}, partition: ${partition}, offset: ${offset}`)
return;
})
await kafkaProducer.connect()
const msgs = []
for (let i = 0; i < messageCount; i++) {
msgs.push({
topic: 'myTopic',
value: { testProp: i },
key: null,
headers: [
{ key1: 'testStr' }
]
})
}
await kafkaProducer.send(msgs)
setTimeout(async ()=>{
await kafkaProducer.destroy()
}, 10000)
Install Node version
More information on how to install NVM: https://github.com/nvm-sh/nvm
nvm install
nvm use
Install Yarn
npm install -g yarn
Install Dependencies
yarn
Build
yarn build
Run
yarn start
Unit Tests
yarn test:unit
Known Issues
- Added `typescript` to `.ncurc.json` because the `dep:update` script would otherwise install an unsupported version of TypeScript.