Kafka
Kafka is Go client library for Apache Kafka
server, released under MIT license. Originally based on the great
client from: https://github.com/optiopay/kafka
Kafka provides minimal abstraction over wire protocol, support for transparent
failover and easy to use blocking API.
Example
Write all messages from stdin to kafka and print all messages from kafka topic
to stdout.
package main
import (
"bufio"
"log"
"os"
"strings"
"github.com/discord/zorkian-kafka"
"github.com/discord/zorkian-kafka/proto"
)
const (
topic = "my-messages"
partition = 0
)
var kafkaAddrs = []string{"localhost:9092", "localhost:9093"}
func printConsumed(broker kafka.Client) {
conf := kafka.NewConsumerConf(topic, partition)
conf.StartOffset = kafka.StartOffsetNewest
consumer, err := broker.Consumer(conf)
if err != nil {
log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err)
}
for {
msg, err := consumer.Consume()
if err != nil {
if err != kafka.ErrNoData {
log.Printf("cannot consume %q topic message: %s", topic, err)
}
break
}
log.Printf("message %d: %s", msg.Offset, msg.Value)
}
log.Print("consumer quit")
}
func produceStdin(broker kafka.Client) {
producer := broker.Producer(kafka.NewProducerConf())
input := bufio.NewReader(os.Stdin)
for {
line, err := input.ReadString('\n')
if err != nil {
log.Fatalf("input error: %s", err)
}
line = strings.TrimSpace(line)
if line == "" {
continue
}
msg := &proto.Message{Value: []byte(line)}
if _, err := producer.Produce(topic, partition, msg); err != nil {
log.Fatalf("cannot produce message to %s:%d: %s", topic, partition, err)
}
}
}
func main() {
broker, err := kafka.Dial(kafkaAddrs, kafka.NewBrokerConf("test-client"))
if err != nil {
log.Fatalf("cannot connect to kafka cluster: %s", err)
}
defer broker.Close()
go printConsumed(broker)
produceStdin(broker)
}