Kafka-trail - Message Queue Library

A Node.js library for managing message queues with Kafka, designed to simplify creating, using, and managing Kafka topics with producers and consumers.

Built on top of KafkaJS.

Features

  • Written fully in TypeScript
  • Branded types (see the sketch after this list)
  • Connect to Kafka brokers easily
  • Create or use existing Kafka topics with specified partitions
  • Initialize the message queue with minimal setup
  • Set up consumer handlers
  • Message compression (LZ4 by default; see Compression codec under Configurations)
  • Supports custom encoders/decoders (see Data encoding / decoding under Configurations)
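
A minimal sketch of how the branded types are used (the brand implementation itself is internal to the library; this only shows the constructors that appear throughout the examples below):

import { KafkaClientId, KafkaTopicName } from "kafka-trail";

// Branded values must be constructed explicitly rather than
// passed around as raw strings
const topicName = KafkaTopicName.fromString("test.example");
const clientId = KafkaClientId.fromString("my-service");

// Wherever a branded type is expected, a raw string is rejected
// at compile time, which catches mixed-up arguments early.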

Installation

Install the library using npm or Yarn:

npm install kafka-trail

Or with Yarn:

yarn add kafka-trail

Usage

Here’s an example of how to use the kafka-trail library in your project.

If you only want a producer:

// Define your Kafka broker URLs
import { 
  CreateKTTopic, 
  KafkaClientId, 
  KafkaMessageKey, 
  KafkaTopicName, 
  KTMessageQueue 
} from "kafka-trail";

const kafkaBrokerUrls = ["localhost:19092"];

// Create a MessageQueue instance
const messageQueue = new KTMessageQueue();

// Start producer
await messageQueue.initProducer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
  },
})

// Create a topic factory
const { BaseTopic: TestExampleTopic } = CreateKTTopic<{
  fieldForPayload: number
}>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Only applies when batchConsuming = true
})

// Create or use topic
await messageQueue.initTopics([
  TestExampleTopic,
])

// Use the publishSingleMessage method to publish a message
const payload = TestExampleTopic({
  fieldForPayload: 1,
}, {
  messageKey: KafkaMessageKey.NULL, // Use NULL if you don't want to specify a message key
})

await messageQueue.publishSingleMessage(payload)

If you only want a consumer:

import type { pino } from "pino";

import { 
  KTHandler, 
  CreateKTTopic, 
  KafkaClientId, 
  KafkaMessageKey, 
  KafkaTopicName, 
  KTMessageQueue 
} from "kafka-trail";

// An example of another dependency to expose through ctx
class DatabaseClass {
  #client: string
  constructor () {
    this.#client = 'test-client'
  }

  getClient() {
    return this.#client
  }
}

const dbClass = new DatabaseClass()

const kafkaBrokerUrls = ["localhost:19092"];

// Create a MessageQueue instance
const messageQueue = new KTMessageQueue({
  // Pass context that will be available inside handlers
  ctx: () => {
    return {
      dbClass,
    }
  },
});

export const { BaseTopic: TestExampleTopic } = CreateKTTopic<{
  fieldForPayload: number
}>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Only applies when batchConsuming = true
})

// Create topic handler
const testExampleTopicHandler = KTHandler({
  topic: TestExampleTopic,
  run: async (payload, ctx: {logger: pino.Logger, dbClass: typeof dbClass}) => {
    // TypeScript infers the correct type for `payload` from `TestExampleTopic`
    // `ctx` combines the object returned by KTMessageQueue({ ctx: () => { ... } }) with the library's logger

    const [data] = payload

    if (!data) {
      return Promise.resolve()
    }

    const logger = ctx.logger.child({
      payload: data.fieldForPayload,
    })

    logger.info(ctx.dbClass.getClient())

    return Promise.resolve()
  },
})

messageQueue.registerHandlers([
  testExampleTopicHandler,
])

// Start consumer
await messageQueue.initConsumer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
    consumerGroupId: 'consumer-group-id',
    batchConsuming: true, // defaults to false
  },
})
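
Note that consumerGroupId follows standard Kafka consumer-group semantics: instances sharing the same group ID split the topic's partitions between them, so running several processes with the same group scales consumption horizontally, while a different group ID receives its own full copy of the stream.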

For both consumer and producer:

import { 
  KTHandler, 
  CreateKTTopic, 
  KafkaClientId, 
  KafkaMessageKey, 
  KafkaTopicName, 
  KTMessageQueue 
} from "kafka-trail";

const kafkaBrokerUrls = ["localhost:19092"];

// Create a MessageQueue instance
const messageQueue = new KTMessageQueue();

// Create topic fn
const { BaseTopic: TestExampleTopic } = CreateKTTopic<{
  fieldForPayload: number
}>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Only applies when batchConsuming = true
})

// Required because we publish data from inside the handler
await messageQueue.initProducer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
  },
})

// Create or use topic
await messageQueue.initTopics([
  TestExampleTopic,
])

// Create topic handler
const testExampleTopicHandler = KTHandler({
  topic: TestExampleTopic,
  run: async (payload, _, publisher, { heartbeat, partition, lastOffset, resolveOffset }) => { // resolveOffset is available only when batchConsuming = true
    // TypeScript infers the correct type for `payload` from `TestExampleTopic`

    const [data] = payload

    if (!data) {
      return Promise.resolve()
    }

    const newPayload = TestExampleTopic({
      fieldForPayload: data.fieldForPayload + 1,
    }, {
      messageKey: KafkaMessageKey.NULL,
    })

    await publisher.publishSingleMessage(newPayload)
  },
})

messageQueue.registerHandlers([
  testExampleTopicHandler,
])

// Start consumer
await messageQueue.initConsumer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
    consumerGroupId: 'consumer-group-id',
    batchConsuming: true, // defaults to false
  },
})

Destroying everything lets you perform a graceful shutdown:

const messageQueue = new KTMessageQueue();

process.on("SIGINT", async () => {
  await messageQueue.destroyAll()
});

process.on("SIGTERM", async () => {
  await messageQueue.destroyAll()
});

Configurations

Compression codec

By default, the library uses the LZ4 codec to compress and decompress data. You can override this by passing a codec via the KTKafkaSettings type. Be careful: the producer and consumer must use the same codec. See the KafkaJS compression docs for reference. Example:

import { KafkaClientId, KTMessageQueue } from "kafka-trail";
import { CompressionTypes } from "kafkajs";
import lz4 from "lz4";

// Instantiate the message queue
const kafkaBrokerUrls = ["localhost:19092"];

const messageQueue = new KTMessageQueue();

await messageQueue.initProducer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
    compressionCodec: {
      codecType: CompressionTypes.LZ4,
      codecFn: {
        compress(encoder: Buffer) {
          return lz4.encode(encoder);
        },

        decompress<T>(buffer: Buffer) {
          return lz4.decode(buffer) as T;
        },
      },
    },
  },
})

Data encoding / decoding

You can provide custom encoders and decoders for sending and receiving data. Example:

import { CreateKTTopic, KafkaTopicName } from "kafka-trail";

type MyModel = {
  fieldForPayload: number
}

const { BaseTopic: TestExampleTopic } = CreateKTTopic<MyModel>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Only applies when batchConsuming = true
}, {
  encode: (data) => {
    return JSON.stringify(data)
  },
  decode: (data: string | Buffer) => {
    if (Buffer.isBuffer(data)) {
      data = data.toString()
    }

    return JSON.parse(data) as MyModel
  },
})

Sending batch messages

You can send messages in batches instead of one at a time, but this requires slightly different usage. Example:

// Create a batch topic factory
const { BaseTopic: TestExampleTopic } = CreateKTTopicBatch({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10,
})

// Create or use topic
await messageQueue.initTopics([
  TestExampleTopic,
])

// Use the publishBatchMessages method to publish several messages at once
const payload = TestExampleTopic([{
  value: {
    test: 1,
    test2: 2,
  },
  key: '1',
}, {
  value: {
    test: 3,
    test2: 4,
  },
  key: '2',
}, {
  value: {
    test: 5,
    test2: 6,
  },
  key: '3',
}])

await messageQueue.publishBatchMessages(payload)

Dead Letter Queue (DLQ)

Automatically route failed messages to DLQ topics for later analysis and reprocessing.

// DLQ topics are automatically created with 'dlq.' prefix
const { BaseTopic: TestExampleTopic, DLQTopic: DLQTestExampleTopic } = CreateKTTopic<MyPayload>({
  topic: KafkaTopicName.fromString('my.topic'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10,
  createDLQ: true, // Enables DLQ
})

// Create or use topic
await messageQueue.initTopics([
  TestExampleTopic,
  DLQTestExampleTopic
])

// Failed messages are automatically sent to dlq.my.topic with the following shape:
{
  originalOffset: "123",
  originalTopic: "my.topic",
  originalPartition: 0,
  key: '["user123","user456"]',
  value: [
    { userId: "user123", action: "login" },
    { userId: "user456", action: "logout" }
  ],
  errorMessage: "Database connection failed",
  failedAt: 1703123456789
}
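
Once messages land in the DLQ topic, they can be consumed like any other topic. Below is a hypothetical reprocessing sketch; it assumes the DLQTopic returned by CreateKTTopic can be registered with KTHandler like a regular topic and that its payload matches the model shown above:

// Hypothetical DLQ reprocessing handler (sketch)
const dlqHandler = KTHandler({
  topic: DLQTestExampleTopic,
  run: async (payload) => {
    const [deadLetter] = payload

    if (!deadLetter) {
      return Promise.resolve()
    }

    // Inspect the failure, then decide whether to republish the
    // original value or alert an operator
    console.error(
      `${deadLetter.originalTopic}@${deadLetter.originalOffset} failed: ${deadLetter.errorMessage}`
    )

    return Promise.resolve()
  },
})

messageQueue.registerHandlers([dlqHandler])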

Contributing

Contributions are welcome! If you’d like to improve this library:

  • Fork the repository.
  • Create a new branch.
  • Make your changes and submit a pull request.

License

This library is open-source and licensed under the MIT License.
