kafka-test-helper
Kafka test helper is a Node.js library that helps you writing integration tests that interacts with Apache Kafka.
Features
This library can:
- read events published on a topic
- publish events on a channel or a specific partition
- create/delete topic on the fly
Kafka test helper is able to work both with independent messages and messages managed in the context of a transaction (commited or aborted).
You can use Kafka test helper in conjunction with your preferred testing library, because it is compatible with jest, mocha, tap and so on..., by the way it's agnostic by test runners.
Documentation
Install
npm install kafka-test-helper
Examples
Testing a producer
Full source code available here: ProducerExample.test.js and ProducerExample.js
import { createKafkaTestHelper } from 'kafka-test-helper'
test('ProducerExample', async () => {
const kafka = getKafka()
const topicPrefix = Date.now()
const topicHelper = await createKafkaTestHelper(kafka, topicPrefix + '_something_happened')
await topicHelper.ensureTopicExists()
const controller = new ProducerExample()
await controller.setup(kafka, topicPrefix)
const record = {
name: 'Tony',
surname: 'Stark'
}
await controller.doSomething(record)
const messages = await topicHelper.messages()
expect(messages).toHaveLength(1)
expect(messages[0].json).toEqual({
operation: 'doSomething',
record
})
expect(messages[0].string).toEqual(JSON.stringify({
operation: 'doSomething',
record
}))
expect(messages[0].buffer).toEqual(Buffer.from(JSON.stringify({
operation: 'doSomething',
record
})))
expect(messages).toEqual([
expect.objectContaining({
json: {
operation: 'doSomething',
record
}
})
])
await controller.destroy()
await topicHelper.ensureTopicDeleted()
})
Testing a consumer
Full source code available here: ConsumerExample.test.js and ConsumerExample.js
import { createKafkaTestHelper } from 'kafka-test-helper'
test('ConsumerExample', async () => {
const kafka = getKafka()
const topicPrefix = Date.now()
const topicHelper = await createKafkaTestHelper(kafka, topicPrefix + 'test-topic')
await topicHelper.ensureTopicExists()
const controller = new ConsumerExample()
await controller.setup(kafka, topicPrefix)
const waitMessage = () => new Promise(resolve => {
controller.handleMessage = jest.fn()
.mockImplementation(message => {
resolve(message)
})
})
await topicHelper.publishMessages([
{
json: {
hello: 'world'
}
}
])
const message = await waitMessage()
expect(message).toBe('{"hello":"world"}')
await controller.destroy()
await topicHelper.ensureTopicDeleted()
})
Changelog
- 0.x - Beta version
- 1.0 - First official version
- 1.1 - Adds typescript definition [#1]; Upgrades deps
- 1.2 - Migrates to Kafka.JS 2; Upgrades deps; Upgrades referenced images on docker-compose
- 1.3 - Fixes
js/insecure-randomness
security issue; Deprecates Node.js 14 and 16; Upgrades deps, Kafka docker images and workflows
Contributors
References