
Security News
OWASP 2025 Top 10 Adds Software Supply Chain Failures, Ranked Top Community Concern
OWASP’s 2025 Top 10 introduces Software Supply Chain Failures as a new category, reflecting rising concern over dependency and build system risks.
@betsys-nestjs/kafka
Advanced tools
This library enables messaging using Apache Kafka.
| Package | Version |
|---|---|
| kafkajs | ^2.0.0 |
| @kafkajs/confluent-schema-registry | ^3.0.0 |
| reflect-metadata | ^0.1.12 |
| rxjs | ^7.1.0 |
KafkaModule to your module.@Module({
imports: [
KafkaModule.forFeature(kafkaConfig(), 'my-handle'),
]
})
export class AppModule {
// ...
}
forFeature:kafkaConfig:const kafkaConfig: KafkaModuleConfig = {
brokers: ['localhost:29092'], // list of kafka broker hosts
clientId: 'test-client-id', // unique client identification
registryHost: 'http://localhost:8081', // schema registry host
};
dbHandle - unique handle identifier
NOTE: Library requires @betsys-nestjs/logger to work. It's planned to decouple this library of this dependency.
setup infrastructure
await schemaRegistryProvider.createSchemaRegistry();
await kafkaConnectionUtils.createKafkaConnection();
await kafkaConnectionUtils.connectProducer();
dog.schema.avro{
"name": "Dog",
"type": "record",
"namespace": "test_namespace",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "birthplace",
"type": [
"null",
"string"
],
"default": null
}
]
}
-value suffix const dogSchema = await readAVSCAsync(path.join(__dirname, 'dog.schema.avsc'));
const schemaId = await schemaRegistryProvider.registerSchema(dogSchema, {
subject: `dog-subject-value`,
compatibility: COMPATIBILITY.BACKWARD,
});
await kafkaConnectionUtils.createTopic('dog-topic');
const payload = await schemaRegistryProvider
.getSchemaRegistry()
.encode(schemaId, {
name: 'Buddy',
age: 6,
birthplace: 'Děčín',
});
await kafkaConnectionUtils.send('dog-topic', [{ value: payload }])
await kafkaConnectionUtils.connectConsumer({ groupId: 'unique-consumer-group' })
const createCallback = async (
schemaProvider: SchemaRegistryProvider,
schema: RawAvroSchema,
): Promise<(payload: EachMessagePayload) => Promise<void>> => async (payload: EachMessagePayload): Promise<void> => {
const schemaRegistry = schemaRegistryProvider.getSchemaRegistry();
const decodedMessage = await schemaRegistry.decode(
payload.message.value as Buffer,
{
[SchemaType.AVRO]: { readerSchema: schema },
},
);
console.log(decodedMessage);
};
await kafkaConnectionUtils.subscribeAndRunConsumer(
{ topics: ['dog-topic'] },
await createCallback(
schemaRegistryProvider,
dogSchema,
),
);
The library is ready to work with logger. To enable it you need to implement your own logger service based on abstraction provided by this library.
You can simply implement custom service following KafkaLoggerInterface.
Example using @betsys-nestjs/logger:
import { Injectable } from '@nestjs/common';
import { Logger as NestLogger } from '@betsys-nestjs/logger';
import { Logger } from '@betsys-nestjs/postgres';
@Injectable()
export class KafkaLogger implements KafkaLoggerInterface {
constructor(private readonly logger: NestLogger) {}
info(message: string): void {
// eslint-disable-next-line no-console
this.logger.info(message);
}
setContext(context: string): void {
this.logger.setContext(context);
}
}
In setContext you can define some context for further logging.
info method is responsible for logging itself so you can either use some console.log or any logger based on your
preference like winston etc.
To start using Logger service, you simply insert class references to forFeature method of KafkaModule like this:
KafkaModule.forFeature({
...kafkaConfig(),
logger: KafkaTestLogger,
})
FAQs
Enables messaging using Apache Kafka.
The npm package @betsys-nestjs/kafka receives a total of 0 weekly downloads. As such, @betsys-nestjs/kafka popularity was classified as not popular.
We found that @betsys-nestjs/kafka demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 9 open source maintainers collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
OWASP’s 2025 Top 10 introduces Software Supply Chain Failures as a new category, reflecting rising concern over dependency and build system risks.

Research
/Security News
Socket researchers discovered nine malicious NuGet packages that use time-delayed payloads to crash applications and corrupt industrial control systems.

Security News
Socket CTO Ahmad Nassri discusses why supply chain attacks now target developer machines and what AI means for the future of enterprise security.