
@betsys-nestjs/kafka
This library enables messaging using Apache Kafka.
| Package | Version |
|---|---|
| kafkajs | ^2.0.0 |
| @kafkajs/confluent-schema-registry | ^3.0.0 |
| reflect-metadata | ^0.1.12 |
| rxjs | ^7.1.0 |
Add KafkaModule to your module:
@Module({
  imports: [
    KafkaModule.forFeature(kafkaConfig(), 'my-handle'),
  ],
})
export class AppModule {
  // ...
}
forFeature takes the following arguments:
kafkaConfig - module configuration:
const kafkaConfig: KafkaModuleConfig = {
  brokers: ['localhost:29092'], // list of Kafka broker hosts
  clientId: 'test-client-id', // unique client identification
  registryHost: 'http://localhost:8081', // schema registry host
};
dbHandle - unique handle identifier
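The module example calls `kafkaConfig()` as a function, while the snippet above defines a constant. One way to reconcile the two is a small factory. The following is a minimal sketch, assuming the settings come from environment variables. The variable names (`KAFKA_BROKERS`, `KAFKA_CLIENT_ID`, `SCHEMA_REGISTRY_HOST`) and the local `KafkaModuleConfigSketch` interface are assumptions made for illustration; real code would use the `KafkaModuleConfig` type from the library.

```typescript
// Local stand-in mirroring only the three fields shown above (assumption);
// in real code, use the KafkaModuleConfig type provided by the library.
interface KafkaModuleConfigSketch {
  brokers: string[];
  clientId: string;
  registryHost: string;
}

// Hypothetical environment variable names, chosen for this illustration.
type Env = Record<string, string | undefined>;

function kafkaConfig(env: Env = {}): KafkaModuleConfigSketch {
  // Accept a comma-separated broker list and ignore empty entries.
  const brokers = (env['KAFKA_BROKERS'] ?? 'localhost:29092')
    .split(',')
    .map((host) => host.trim())
    .filter((host) => host.length > 0);

  if (brokers.length === 0) {
    throw new Error('KAFKA_BROKERS must list at least one broker');
  }

  return {
    brokers,
    clientId: env['KAFKA_CLIENT_ID'] ?? 'test-client-id',
    registryHost: env['SCHEMA_REGISTRY_HOST'] ?? 'http://localhost:8081',
  };
}
```

In a Node.js application this could be called as `kafkaConfig(process.env)`. Called without arguments, it falls back to the local defaults used in this README.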
NOTE: The library currently requires @betsys-nestjs/logger to work. Decoupling the library from this dependency is planned.
Set up the infrastructure:
await schemaRegistryProvider.createSchemaRegistry();
await kafkaConnectionUtils.createKafkaConnection();
await kafkaConnectionUtils.connectProducer();
Create the schema file dog.schema.avsc:
{
  "name": "Dog",
  "type": "record",
  "namespace": "test_namespace",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int"
    },
    {
      "name": "birthplace",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}
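It can help to mirror the Avro record in a TypeScript type and check values before they are handed to the encoder. The following is a minimal sketch, not part of the library: the `Dog` interface and the `isDog` guard are hypothetical names, and the guard is deliberately strict in that it requires `birthplace` to be present as either `null` or a string.

```typescript
// TypeScript mirror of the Dog record defined in the schema above.
interface Dog {
  name: string; // Avro "string"
  age: number; // Avro "int": 32-bit signed integer
  birthplace: string | null; // Avro union ["null", "string"]
}

const INT32_MIN = -2147483648;
const INT32_MAX = 2147483647;

// Runtime check that a value fits the schema's field types.
function isDog(value: unknown): value is Dog {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  const age = candidate['age'];
  const birthplace = candidate['birthplace'];
  return (
    typeof candidate['name'] === 'string' &&
    typeof age === 'number' &&
    Math.floor(age) === age && // whole number
    age >= INT32_MIN &&
    age <= INT32_MAX && // also rejects NaN and Infinity
    (birthplace === null || typeof birthplace === 'string')
  );
}
```

Running the guard before encoding reports a malformed record at the call site instead of as an encoder error later on.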
Register the schema under a subject with the -value suffix:
const dogSchema = await readAVSCAsync(path.join(__dirname, 'dog.schema.avsc'));
const schemaId = await schemaRegistryProvider.registerSchema(dogSchema, {
  subject: `dog-subject-value`,
  compatibility: COMPATIBILITY.BACKWARD,
});
await kafkaConnectionUtils.createTopic('dog-topic');
const payload = await schemaRegistryProvider
  .getSchemaRegistry()
  .encode(schemaId, {
    name: 'Buddy',
    age: 6,
    birthplace: 'Děčín',
  });
await kafkaConnectionUtils.send('dog-topic', [{ value: payload }]);
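For reference, a payload encoded through a Confluent-compatible schema registry client follows the Confluent wire format: one magic byte (`0`), then the schema ID as a 4-byte big-endian integer, then the Avro-encoded body. The sketch below reads the schema ID back from such a message, which can be handy when debugging. `readSchemaId` is a hypothetical helper, not part of this library.

```typescript
// Confluent wire format: [magic byte 0x00][4-byte big-endian schema ID][Avro body]
const MAGIC_BYTE = 0;
const HEADER_LENGTH = 5;

// Hypothetical debugging helper: extracts the schema ID from an encoded message.
// Accepts any Uint8Array, which includes Node.js Buffer instances.
function readSchemaId(message: Uint8Array): number {
  if (message.length < HEADER_LENGTH) {
    throw new Error('Message is too short to contain the wire-format header');
  }
  if (message[0] !== MAGIC_BYTE) {
    throw new Error('Unexpected magic byte; message is not in Confluent wire format');
  }
  const view = new DataView(message.buffer, message.byteOffset, message.byteLength);
  return view.getInt32(1, false); // offset 1, big-endian
}
```

Applied to the payload above, the helper would be expected to return the same value as `schemaId`.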
await kafkaConnectionUtils.connectConsumer({ groupId: 'unique-consumer-group' });
// Builds the per-message handler; messages are decoded with the given reader schema.
const createCallback = async (
  schemaProvider: SchemaRegistryProvider,
  schema: RawAvroSchema,
): Promise<(payload: EachMessagePayload) => Promise<void>> => async (payload: EachMessagePayload): Promise<void> => {
  // Use the provider passed in as an argument, not a variable from the outer scope.
  const schemaRegistry = schemaProvider.getSchemaRegistry();
  const decodedMessage = await schemaRegistry.decode(
    payload.message.value as Buffer,
    {
      [SchemaType.AVRO]: { readerSchema: schema },
    },
  );
  console.log(decodedMessage);
};
await kafkaConnectionUtils.subscribeAndRunConsumer(
  { topics: ['dog-topic'] },
  await createCallback(
    schemaRegistryProvider,
    dogSchema,
  ),
);
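The callback factory above casts `payload.message.value` to `Buffer`, although in KafkaJS a message value can also be `null` (a tombstone). A slightly more defensive variant of the same pattern might look like the sketch below. `MessagePayloadSketch`, `DecoderSketch`, and `createHandler` are local, hypothetical stand-ins so that the example is self-contained; they are not types exported by this library or by KafkaJS.

```typescript
// Minimal local stand-ins (assumptions) for the shapes used in the example above.
interface MessagePayloadSketch {
  message: { value: Uint8Array | null };
}

interface DecoderSketch {
  decode(buffer: Uint8Array): Promise<unknown>;
}

// Returns a per-message handler that skips tombstones, decodes the value,
// and passes the result to the supplied callback.
function createHandler<T>(
  decoder: DecoderSketch,
  onMessage: (decoded: T) => void,
): (payload: MessagePayloadSketch) => Promise<void> {
  return async (payload: MessagePayloadSketch): Promise<void> => {
    const value = payload.message.value;
    if (value === null) {
      return; // tombstone: nothing to decode
    }
    const decoded = await decoder.decode(value);
    // The cast trusts the schema; add a runtime check if stronger guarantees are needed.
    onMessage(decoded as T);
  };
}
```

Unlike the original factory, this one is synchronous, so the handler can be passed to the consumer without an extra `await`.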
The library can log through a logger of your choice. To enable logging, implement your own logger service based on the abstraction this library provides.
Any custom service that implements KafkaLoggerInterface will do.
Example using @betsys-nestjs/logger:
import { Injectable } from '@nestjs/common';
import { Logger as NestLogger } from '@betsys-nestjs/logger';
// Assumption: KafkaLoggerInterface is exported from this package's entry point.
import { KafkaLoggerInterface } from '@betsys-nestjs/kafka';
@Injectable()
export class KafkaLogger implements KafkaLoggerInterface {
  constructor(private readonly logger: NestLogger) {}
  // Forwards the message to the wrapped logger.
  info(message: string): void {
    this.logger.info(message);
  }
  // Sets the context used for subsequent log entries.
  setContext(context: string): void {
    this.logger.setContext(context);
  }
}
In setContext you can define a context for subsequent log entries.
The info method does the actual logging, so it can call console.log or any logger you prefer, such as winston.
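As an illustration of the console-based option, here is a minimal sketch of a logger that prefixes each message with the current context. `KafkaLoggerInterfaceSketch` is a local stand-in that declares only the two methods shown in the example above; the library's actual interface may declare more. The class name, the default context `'Kafka'`, and the output format are assumptions made for this example.

```typescript
// Local stand-in with the two methods used in this README (assumption).
interface KafkaLoggerInterfaceSketch {
  info(message: string): void;
  setContext(context: string): void;
}

class ConsoleKafkaLogger implements KafkaLoggerInterfaceSketch {
  // Default context label, chosen for this illustration.
  private context = 'Kafka';

  setContext(context: string): void {
    this.context = context;
  }

  info(message: string): void {
    this.write(`[${this.context}] ${message}`);
  }

  // Kept as a separate method so a subclass can redirect the output.
  protected write(line: string): void {
    console.log(line);
  }
}
```

The class has no constructor parameters, so it can be instantiated without further wiring. When registering it with Nest, it would presumably also need the `@Injectable()` decorator, as in the example above.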
To start using the logger service, pass the class reference to the forFeature method of KafkaModule like this:
KafkaModule.forFeature({
  ...kafkaConfig(),
  logger: KafkaTestLogger, // your KafkaLoggerInterface implementation
})