Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

github.com/pinpoint-apm/pinpoint-go-agent/plugin/sarama

Package Overview
Dependencies
Alerts
File Explorer
Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

github.com/pinpoint-apm/pinpoint-go-agent/plugin/sarama

  • v1.4.5
  • Source
  • Go
  • Socket score

Version published
Created
Source

ppsarama

This package instruments the Shopify/sarama package.

Installation

$ go get github.com/pinpoint-apm/pinpoint-go-agent/plugin/sarama
import "github.com/pinpoint-apm/pinpoint-go-agent/plugin/sarama"

Usage

PkgGoDev

This package instruments Kafka consumers and producers.

Consumer

To instrument a Kafka consumer, use ConsumeMessageContext. To display the Kafka broker on the Pinpoint screen, create a context that carries the broker addresses with NewContext and pass it to ConsumeMessageContext.

ConsumePartition example:

ctx := ppsarama.NewContext(context.Background(), broker)
pc, _ := consumer.ConsumePartition(topic, partition, offset)
for msg := range pc.Messages() {
    ppsarama.ConsumeMessageContext(processMessage, ctx, msg)
}

ConsumerGroupHandler example:

func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    ctx := sess.Context()
    for msg := range claim.Messages() {
        _ = ppsarama.ConsumeMessageContext(process, ctx, msg)
    }
}

func main() {     
    ctx := ppsarama.NewContext(context.Background(), broker)
    handler := exampleConsumerGroupHandler{}
    err := group.Consume(ctx, topics, handler)

ConsumeMessageContext passes a context containing a pinpoint.Tracer to the HandlerContextFunc. Inside the HandlerContextFunc, this tracer can be obtained with the pinpoint.FromContext function. Alternatively, the context can be propagated to wherever a context containing the pinpoint.Tracer is required.

func process(ctx context.Context, msg *sarama.ConsumerMessage) error {
    tracer := pinpoint.FromContext(ctx)
    defer tracer.NewSpanEvent("process").EndSpanEvent()

    fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
package main

import (
    "github.com/Shopify/sarama"
    "github.com/pinpoint-apm/pinpoint-go-agent"
    "github.com/pinpoint-apm/pinpoint-go-agent/plugin/sarama"
)

func processMessage(ctx context.Context, msg *sarama.ConsumerMessage) error {
    tracer := pinpoint.FromContext(ctx)
    defer tracer.NewSpanEvent("processMessage").EndSpanEvent()
    fmt.Println("retrieving message: ", string(msg.Value))
    ...
}

func subscribe() {
    broker := []string{"localhost:9092"}
    config := sarama.NewConfig()
    consumer, err := sarama.NewConsumer(broker, config)
    ...
    
    ctx := ppsarama.NewContext(context.Background(), broker)
    for _, partition := range partitionList {
        pc, _ := consumer.ConsumePartition(topic, partition, initialOffset)

        go func(pc sarama.PartitionConsumer) {
            for msg := range pc.Messages() {
                ppsarama.ConsumeMessageContext(processMessage, ctx, msg)
            }
        }(pc)
	...	
}

func main() {
    ... //setup agent

    subscribe()
}

Full Example Source

Producer

SyncProducer

To instrument a Kafka sync producer, use NewSyncProducer.

config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := ppsarama.NewSyncProducer(brokers, config)

Use SendMessageContext with a context containing the pinpoint.Tracer. You can also use SendMessage with WithContext, but we recommend SendMessageContext because WithContext is not thread-safe.

ctx := pinpoint.NewContext(context.Background(), tracer)
partition, offset, err := producer.SendMessageContext(ctx, msg)
package main

import (
    "github.com/Shopify/sarama"
    "github.com/pinpoint-apm/pinpoint-go-agent"
    "github.com/pinpoint-apm/pinpoint-go-agent/plugin/sarama"
)

func prepareMessage(topic, message string) *sarama.ProducerMessage {
    return &sarama.ProducerMessage{
        Topic:     topic,
        Value:     sarama.StringEncoder(message),
    }
}

func save(w http.ResponseWriter, r *http.Request) {
    msg := prepareMessage("topic", "Hello, Kafka!!")
    partition, offset, err := producer.SendMessageContext(r.Context(), msg)
    ...
}

var producer sarama.SyncProducer

func main() {
    ... //setup agent
	
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    ...

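    // Assign to the package-level producer; ":=" would declare a new local and leave the one used by save() nil.
    var err error
    producer, err = ppsarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)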
    http.HandleFunc("/save", pphttp.WrapHandlerFunc(save))
}

Full Example Source

AsyncProducer

To instrument a Kafka async producer, use NewAsyncProducer.

config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := ppsarama.NewAsyncProducer(brokers, config)

Use InputContext with a context containing the pinpoint.Tracer. You can also use Input with WithContext, but we recommend InputContext because WithContext is not thread-safe.

ctx := pinpoint.NewContext(context.Background(), tracer)
producer.InputContext(ctx, msg)
package main

import (
    "github.com/Shopify/sarama"
    "github.com/pinpoint-apm/pinpoint-go-agent"
    "github.com/pinpoint-apm/pinpoint-go-agent/plugin/sarama"
)

func prepareAsyncMessage(topic, message string) *sarama.ProducerMessage {
    return &sarama.ProducerMessage{
        Topic:     topic,
        Value:     sarama.StringEncoder(message),
    }
}

func saveAsync(w http.ResponseWriter, r *http.Request) {
    msg := prepareAsyncMessage("topic", "Hello, Kafka!!")
    producer.InputContext(r.Context(), msg)
    ...
}

var producer sarama.AsyncProducer

func main() {
    ... //setup agent
	
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    ...

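    // Assign to the package-level producer; ":=" would declare a new local and leave the one used by saveAsync() nil.
    var err error
    producer, err = ppsarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, config)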
    go func() {
        for {
            select {
            case success := <-producer.Successes():
                log.Printf("Partition %d at offset %d\n", success.Partition, success.Offset)
            case err := <-producer.Errors():
                log.Printf("Failed to send message: %v", err)
            }
        }
    }()

    http.HandleFunc("/save", pphttp.WrapHandlerFunc(saveAsync))
}

Full Example Source

FAQs

Package last updated on 18 Dec 2024

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc