awaitable-emit
Emit a message to Kafka and wait until NestJS has processed it.
How it works
Creates helper utilities for end-to-end tests:
- `emitMessage` - function to emit a message to Kafka
- `AwaitableEmitInterceptor` - NestJS global interceptor (required for `emitMessage`)
- `dispose` - function to run at the test teardown stage
Usage
- Create helper objects
const { emitMessage, AwaitableEmitInterceptor, dispose } = createAwaitableEmit(options)
- Add interceptor to nestjs app
app.useGlobalInterceptors(new AwaitableEmitInterceptor());
- Use `emitMessage` in tests
- Run `dispose` in the 'after all' stage
Options
getKafkaClient: () => ClientKafka
Factory function which returns kafka client instance
wait?: number
Maximum wait time in milliseconds (if the controller cannot handle the message within this time, the promise will be resolved)
brokers?: string[]
Kafka brokers. This is optional and uses the Kafka admin client under the hood: it waits until all consumer groups have no lag
(this may or may not be useful for tests running in parallel)
Example
/**
 * Example controller that handles `user-created` events and records
 * each received payload on the injected `AppService`.
 */
@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  /**
   * Handler for the `user-created` event pattern.
   *
   * Logs the payload, waits, and then appends the payload to
   * `appService.shared`.
   *
   * @param payload - The message payload extracted by `@Payload()`.
   */
  @EventPattern('user-created')
  async handleEntityCreated(@Payload() payload: object): Promise<void> {
    console.log('user created', payload);

    // NOTE(review): this call shape presumably relies on the promise-based
    // `setTimeout` from 'node:timers/promises'; the global timer function
    // takes a callback and awaiting it would not delay anything.
    await setTimeout(2000);

    const { shared } = this.appService;
    shared.push(payload);
  }
}
// End-to-end example: emit a Kafka message and assert on the state the
// controller writes once the awaited emit has settled.
describe('AppController (e2e)', () => {
  let app: INestMicroservice;
  let clientKafka: ClientKafka;
  let service: AppService;

  // The factory is evaluated lazily: `clientKafka` is only assigned inside the
  // `before` hook below, after the application has been created.
  const { emitMessage, AwaitableEmitInterceptor, dispose } =
    createAwaitableEmit({
      getKafkaClient: () => clientKafka,
    });

  before(async () => {
    // Kafka client provider used by the test to emit messages.
    const providers: Provider[] = [
      {
        provide: 'KAFKA_CLIENT',
        useFactory: () => {
          return ClientProxyFactory.create({
            transport: Transport.KAFKA,
            options: {
              client: {
                clientId: 'KAFKA_CLIENT',
                brokers: ['127.0.0.1:9092'],
              },
            },
          });
        },
      },
    ];

    const testingModule = await Test.createTestingModule({
      imports: [AppModule],
      providers,
    }).compile();

    app = testingModule.createNestMicroservice({
      transport: Transport.KAFKA,
      options: {
        client: {
          clientId: 'KAFKA_CLIENT',
          brokers: ['127.0.0.1:9092'],
        },
      },
    });

    // The interceptor is required for `emitMessage` to work.
    app.useGlobalInterceptors(new AwaitableEmitInterceptor());
    await app.init();
    await app.listen();

    clientKafka = app.get<ClientKafka>('KAFKA_CLIENT');
    service = app.get(AppService);
  });

  after(async () => {
    // Release the helper's resources first, then the client and the app.
    // Optional chaining guards against a `before` hook that failed early.
    await dispose();
    await clientKafka?.close();
    await app?.close();
  });

  it('smoke', () => {
    expect(clientKafka).toBeTruthy();
  });

  it('test emit message', async () => {
    await emitMessage('user-created', {
      // `Date.now` must be called: `Date.now.toString()` would stringify the
      // function itself and yield the same key for every message.
      key: Date.now().toString(),
      value: { name: 'Bob' },
    });

    // The most recently recorded payload should be the message value.
    expect(service.shared.at(-1)).toEqual({ name: 'Bob' });
  });
});