
Security News
New React Server Components Vulnerabilities: DoS and Source Code Exposure
New DoS and source code exposure bugs in React Server Components and Next.js: what’s affected and how to update safely.
@awesomeniko/kafka-trail
Advanced tools
A Node.js library for managing message queues with Kafka, designed to simplify creating, using, and managing Kafka topics with producers and consumers.
Install the library using npm or Yarn:
npm install kafka-trail
Or with Yarn:
yarn add kafka-trail
Here’s an example of how to use the kafka-trail library in your project.
// Define your Kafka broker URLs
import {
CreateKTTopic,
KafkaClientId,
KafkaMessageKey,
KafkaTopicName,
KTMessageQueue
} from "kafka-trail";
const kafkaBrokerUrls = ["localhost:19092"];
// Create a MessageQueue instance
const messageQueue = new KTMessageQueue();
// Start producer
await messageQueue.initProducer({
kafkaSettings: {
brokerUrls: kafkaBrokerUrls,
clientId: KafkaClientId.fromString('hostname'),
connectionTimeout: 30_000,
},
})
// Create topic fn
const { BaseTopic: TestExampleTopic } = CreateKTTopic<{
fieldForPayload: number
}>({
topic: KafkaTopicName.fromString('test.example'),
numPartitions: 1,
batchMessageSizeToConsume: 10, // Works is batchConsuming = true
})
// Create or use topic
await messageQueue.initTopics([
TestExampleTopic,
])
// Use publishSingleMessage method to publish message
const payload = TestExampleTopic({
fieldForPayload: 1,
}, {
messageKey: KafkaMessageKey.NULL, //If you don't want to specify message key
})
await messageQueue.publishSingleMessage(payload)
import type { pino } from "pino";
import {
KTHandler,
CreateKTTopic,
KafkaClientId,
KafkaMessageKey,
KafkaTopicName,
KTMessageQueue
} from "kafka-trail";
// Another dependency example
class DatabaseClass {
#client: string
constructor () {
this.#client = 'test-client'
}
getClient() {
return this.#client
}
}
const dbClass = new DatabaseClass()
const kafkaBrokerUrls = ["localhost:19092"];
// Create a MessageQueue instance
const messageQueue = new KTMessageQueue({
// If you want pass context available in handler
ctx: () => {
return {
dbClass,
}
},
});
export const { BaseTopic: TestExampleTopic } = CreateKTTopic<{
fieldForPayload: number
}>({
topic: KafkaTopicName.fromString('test.example'),
numPartitions: 1,
batchMessageSizeToConsume: 10, // Works is batchConsuming = true
})
// Create topic handler
const testExampleTopicHandler = KTHandler({
topic: TestExampleTopic,
run: async (payload, ctx: {logger: pino.Logger, dbClass: typeof dbClass}) => {
// Ts will show you right type for `payload` variable from `TestExampleTopic`
// Ctx passed from KTMessageQueue({ctx: () => {...}})
const [data] = payload
if (!data) {
return Promise.resolve()
}
const logger = ctx.logger.child({
payload: data.fieldForPayload,
})
logger.info(dbClass.getClient())
return Promise.resolve()
},
})
messageQueue.registerHandlers([
testExampleTopicHandler,
])
// Start consumer
await messageQueue.initConsumer({
kafkaSettings: {
brokerUrls: kafkaBrokerUrls,
clientId: KafkaClientId.fromString('hostname'),
connectionTimeout: 30_000,
consumerGroupId: 'consumer-group-id',
batchConsuming: true // default false
},
})
import {
KTHandler,
CreateKTTopic,
KafkaClientId,
KafkaMessageKey,
KafkaTopicName,
KTMessageQueue
} from "kafka-trail";
const kafkaBrokerUrls = ["localhost:19092"];
// Create a MessageQueue instance
const messageQueue = new KTMessageQueue();
// Create topic fn
const { BaseTopic: TestExampleTopic } = CreateKTTopic<{
fieldForPayload: number
}>({
topic: KafkaTopicName.fromString('test.example'),
numPartitions: 1,
batchMessageSizeToConsume: 10, // Works is batchConsuming = true
})
// Required, because inside handler we are going to publish data
await messageQueue.initProducer({
kafkaSettings: {
brokerUrls: kafkaBrokerUrls,
clientId: KafkaClientId.fromString('hostname'),
connectionTimeout: 30_000,
},
})
// Create or use topic
await messageQueue.initTopics([
TestExampleTopic,
])
// Create topic handler
const testExampleTopicHandler = KTHandler({
topic: TestExampleTopic,
run: async (payload, _, publisher, { heartbeat, partition, lastOffset, resolveOffset }) => { // resolveOffset available for batchConsuming = true only
// Ts will show you right type for `payload` variable from `TestExampleTopic`
const [data] = payload
if (!data) {
return Promise.resolve()
}
const newPayload = TestExampleTopic({
fieldForPayload: data.fieldForPayload + 1,
}, {
messageKey: KafkaMessageKey.NULL,
})
await publisher.publishSingleMessage(newPayload)
},
})
messageQueue.registerHandlers([
testExampleTopicHandler,
])
// Start consumer
await messageQueue.initConsumer({
kafkaSettings: {
brokerUrls: kafkaBrokerUrls,
clientId: KafkaClientId.fromString('hostname'),
connectionTimeout: 30_000,
consumerGroupId: 'consumer-group-id',
batchConsuming: true // default false
},
})
const messageQueue = new MessageQueue();
process.on("SIGINT", async () => {
await messageQueue.destroyAll()
});
process.on("SIGTERM", async () => {
await messageQueue.destroyAll()
});
By default, lib using LZ4 codec to compress and decompress data.
You can override it, by passing via KTKafkaSettings type. Be careful - producer and consumer should have same codec.
Ref docs. Example:
import { KTMessageQueue } from "kafka-trail";
import { CompressionTypes } from "kafkajs";
import lz4 from "lz4";
// Instanciate messageQueue
const kafkaBrokerUrls = ["localhost:19092"];
const messageQueue = new KTMessageQueue();
await messageQueue.initProducer({
kafkaSettings: {
brokerUrls: kafkaBrokerUrls,
clientId: KafkaClientId.fromString('hostname'),
connectionTimeout: 30_000,
compressionCodec: {
codecType: CompressionTypes.LZ4,
codecFn: {
compress(encoder: Buffer) {
return lz4.encode(encoder);
},
decompress<T>(buffer: Buffer) {
return lz4.decode(buffer) as T;
},
},
},
},
})
You can provide custom encoders / decoders for sending / receiving data. Example:
type MyModel = {
fieldForPayload: number
}
const { BaseTopic: TestExampleTopic } = CreateKTTopic<MyModel>({
topic: KafkaTopicName.fromString('test.example'),
numPartitions: 1,
batchMessageSizeToConsume: 10, // Works is batchConsuming = true
}, {
encode: (data) => {
return JSON.stringify(data)
},
decode: (data: string | Buffer) => {
if (Buffer.isBuffer(data)) {
data = data.toString()
}
return JSON.parse(data) as MyModel
},
})
You can send batch messages instead of sending one by one, but it required a little different usage. Example:
// Create topic fn
const { BaseTopic: TestExampleTopic } = CreateKTTopicBatch({
topic: KafkaTopicName.fromString('test.example'),
numPartitions: 1,
batchMessageSizeToConsume: 10,
})
// Create or use topic
await messageQueue.initTopics([
TestExampleTopic,
])
// Use publishSingleMessage method to publish message
const payload = TestExampleTopic([{
value: {
test: 1,
test2: 2,
},
key: '1',
}, {
value: {
test: 3,
test2: 4,
},
key: '2',
}, {
value: {
test: 5,
test2: 6,
},
key: '3',
}])
await messageQueue.publishBatchMessages(payload)
Automatically route failed messages to DLQ topics for later analysis and reprocessing.
// DLQ topics are automatically created with 'dlq.' prefix
const { BaseTopic: TestExampleTopic, DLQTopic: DLQTestExampleTopic } = CreateKTTopic<MyPayload>({
topic: KafkaTopicName.fromString('my.topic'),
numPartitions: 1,
batchMessageSizeToConsume: 10,
createDLQ: true, // Enables DLQ
})
// Create or use topic
await messageQueue.initTopics([
TestExampleTopic,
DLQTestExampleTopic
])
// Failed messages automatically sent to: dlq.my.topic with next model:
{
originalOffset: "123",
originalTopic: "user.events",
originalPartition: 0,
key: '["user123","user456"]',
value: [
{ userId: "user123", action: "login" },
{ userId: "user456", action: "logout" }
],
errorMessage: "Database connection failed",
failedAt: 1703123456789
}
Contributions are welcome! If you’d like to improve this library:
This library is open-source and licensed under the MIT License.
FAQs
A Node.js library for managing message queue with Kafka
We found that @awesomeniko/kafka-trail demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
New DoS and source code exposure bugs in React Server Components and Next.js: what’s affected and how to update safely.

Security News
Socket CEO Feross Aboukhadijeh joins Software Engineering Daily to discuss modern software supply chain attacks and rising AI-driven security risks.

Security News
GitHub has revoked npm classic tokens for publishing; maintainers must migrate, but OpenJS warns OIDC trusted publishing still has risky gaps for critical projects.