@opentelemetry/instrumentation-kafkajs

OpenTelemetry instrumentation for `kafkajs` messaging client for Apache Kafka

Source: npm
Version: 0.11.0
Weekly downloads: 7.8M (4.8%)
Maintainers: 3

OpenTelemetry kafkajs Instrumentation for Node.js

This module provides automatic instrumentation for the kafkajs package, which may be loaded using the @opentelemetry/sdk-trace-node package and is included in the @opentelemetry/auto-instrumentations-node bundle.

If total installation size is not constrained, it is recommended to use the @opentelemetry/auto-instrumentations-node bundle with @opentelemetry/sdk-node for the most seamless instrumentation experience.

Compatible with OpenTelemetry JS API and SDK 1.0+.

Installation

npm install --save @opentelemetry/instrumentation-kafkajs

Supported versions

  • kafkajs versions >=0.3.0 <3

Usage

const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
const {
  KafkaJsInstrumentation,
} = require('@opentelemetry/instrumentation-kafkajs');
const { registerInstrumentations } = require('@opentelemetry/instrumentation');

const provider = new NodeTracerProvider();
provider.register();

registerInstrumentations({
  instrumentations: [
    new KafkaJsInstrumentation({
      // see below for available configuration
    }),
  ],
});

Instrumentation Options

You can set the following:

| Options | Type | Description |
| --- | --- | --- |
| producerHook | KafkaProducerCustomAttributeFunction | Function called before a producer message is sent. Allows for adding custom attributes to the span. |
| consumerHook | KafkaConsumerCustomAttributeFunction | Function called before a consumer message is processed. Allows for adding custom attributes to the span. |
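
As a rough illustration of the two hooks, here is a minimal sketch. It assumes each hook is called as `(span, { topic, message })`; check that signature against the package's type definitions for your installed version. The attribute names `app.kafka.payload_bytes` and `app.tenant_id` and the `tenant-id` header are hypothetical, chosen only for the example.

```javascript
// Hypothetical hooks. The `(span, { topic, message })` call shape is an
// assumption, not something stated in this README.
function producerHook(span, { message }) {
  // Record the payload size in bytes; a null value (tombstone) counts as 0.
  const size = message.value == null ? 0 : Buffer.byteLength(message.value);
  span.setAttribute('app.kafka.payload_bytes', size); // made-up attribute name
}

function consumerHook(span, { message }) {
  // Copy a hypothetical `tenant-id` header onto the span, if present.
  const tenant = message.headers && message.headers['tenant-id'];
  if (tenant != null) {
    span.setAttribute('app.tenant_id', tenant.toString()); // made-up attribute name
  }
}

// Wiring (not executed here): pass both functions as instrumentation options.
// new KafkaJsInstrumentation({ producerHook, consumerHook });
```

Keeping the hooks as plain functions that only touch `span.setAttribute` makes them easy to unit test with a stub span, without a running broker.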

Semantic Conventions

This package uses @opentelemetry/semantic-conventions version 1.30+, which implements Semantic Convention Version 1.30.0.

Spans Emitted

| KafkaJS Object | Action | Span Kind | Span Name | Operation Type / Name |
| --- | --- | --- | --- | --- |
| Consumer | eachBatch | Client | poll <topic-name> | receive / poll |
| Consumer | eachBatch, eachMessage | Consumer | process <topic-name> [1] | process / process |
| Producer | send | Producer | send <topic-name> | send / send |

[1] process <topic-name>: In the context of eachBatch, this span will be emitted for each message in the batch but the timing (start, end, duration) will reflect the timing of the batch.

Metrics Emitted

| KafkaJS Object | Metric Name | Short Description |
| --- | --- | --- |
| Consumer | messaging.process.duration | Duration of processing operation. [1] |
| Consumer | messaging.client.consumed.messages | Number of messages that were delivered to the application. |
| Consumer and Producer | messaging.client.operation.duration | Duration of messaging operation initiated by a producer or consumer client. (Only emitted for kafkajs@1.5.0 and later.) |
| Producer | messaging.client.sent.messages | Number of messages producer attempted to send to the broker. |

[1] messaging.process.duration: In the context of eachBatch, this metric will be emitted once for each message but the value reflects the duration of the entire batch.
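
To make the footnote concrete, the following is a small simulation, not the instrumentation's actual code. It assumes a hypothetical `recordBatch` helper that emits one data point per message, each carrying the duration of the whole batch, and that durations are in seconds. Under those assumptions, summing the data points overstates the elapsed time by a factor of the batch size, while the mean still equals the batch duration.

```javascript
// Simulation only: mimics the documented behaviour, not the library's internals.
function recordBatch(messageCount, batchDurationSeconds) {
  // One data point per message, each carrying the duration of the entire batch.
  return Array.from({ length: messageCount }, () => batchDurationSeconds);
}

const points = recordBatch(4, 2); // 4 messages processed in one 2-second batch
const sum = points.reduce((a, b) => a + b, 0);
const mean = sum / points.length;

console.log(points.length); // 4 data points
console.log(sum);           // 8, i.e. 4 x 2, not the 2 seconds actually elapsed
console.log(mean);          // 2, the batch duration
```

When building dashboards on this metric for eachBatch consumers, averages and percentiles are therefore likely to be more meaningful than sums.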

Attributes Collected

These attributes are added to both spans and metrics, where possible.

| Attribute | Short Description |
| --- | --- |
| messaging.system | An identifier for the messaging system being used (i.e. "kafka"). |
| messaging.destination.name | The message destination name. |
| messaging.operation.type | A string identifying the type of messaging operation. |
| messaging.operation.name | The system-specific name of the messaging operation. |
| messaging.kafka.message.key | A stringified value representing the key of the Kafka message (if present). |
| messaging.kafka.message.tombstone | A boolean that is true if the message is a tombstone. |
| messaging.kafka.offset | The offset of a record in the corresponding Kafka partition. |
| messaging.destination.partition.id | The identifier of the partition messages are sent to or received from, unique within the messaging.destination.name. Note: only available on producer spans. |

License

Apache 2.0 - See LICENSE for more information.

Keywords

kafkajs

FAQs

Package last updated on 02 Jun 2025
