upstash-kafka
An HTTP/REST based Kafka client built on top of the
Upstash REST API.
It is the only connectionless (HTTP based) Kafka client and is designed for:
- Serverless functions (AWS Lambda ...)
- Cloudflare Workers (see the example)
- Fastly Compute@Edge
- Next.js Edge, Remix ...
- Client side web/mobile applications
- WebAssembly and other environments where HTTP is preferred over TCP
connections.
Installation
npm install @upstash/kafka
Quickstart
Auth
- Go to Upstash and select your database.
- Copy the REST API secrets at the bottom of the page.
import { Kafka } from "@upstash/kafka"
const kafka = new Kafka({
  url: "<UPSTASH_KAFKA_REST_URL>",
  username: "<UPSTASH_KAFKA_REST_USERNAME>",
  password: "<UPSTASH_KAFKA_REST_PASSWORD>",
})
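In practice the three secrets are usually kept out of source code. The following is a minimal sketch of loading them from an environment-like object; readKafkaConfig is a hypothetical helper, not part of @upstash/kafka, and using the placeholder names above as environment variable names is an assumption.

```typescript
// Shape of the options object passed to `new Kafka(...)` in the example above.
type KafkaRestConfig = { url: string; username: string; password: string }

// Hypothetical helper: reads the three secrets from an environment-like object
// and fails fast if any of them is missing or empty.
function readKafkaConfig(env: Record<string, string | undefined>): KafkaRestConfig {
  const required = (name: string): string => {
    const value = env[name]
    if (!value) {
      throw new Error(`Missing environment variable: ${name}`)
    }
    return value
  }
  return {
    // Variable names are assumed; adjust them to your own setup.
    url: required("UPSTASH_KAFKA_REST_URL"),
    username: required("UPSTASH_KAFKA_REST_USERNAME"),
    password: required("UPSTASH_KAFKA_REST_PASSWORD"),
  }
}
```

In Node.js the result could be passed straight to the constructor, for example new Kafka(readKafkaConfig(process.env)). Other runtimes expose secrets differently, which is why the helper takes the environment as an argument.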
Produce a single message
const p = kafka.producer()
const message = { hello: "world" }

// Produce with default options
const res = await p.produce("<my.topic>", message)

// Produce with explicit options
const resWithOptions = await p.produce("<my.topic>", message, {
  partition: 1,
  timestamp: 12345,
  key: "<custom key>",
  headers: [{ key: "traceId", value: "85a9f12" }],
})
Produce multiple messages
The same options from the example above can be set for every message.
const p = kafka.producer()
const res = await p.produceMany([
  {
    topic: "my.topic",
    value: { hello: "world" },
  },
  {
    topic: "another.topic",
    value: "another message",
  },
])
Consume
The first time a consumer is created, it needs to find the group
coordinator by asking the Kafka brokers and then join the consumer group. This
process takes some time to complete, so a newly created consumer instance
may return empty messages until consumer group coordination is completed.
const c = kafka.consumer()
const messages = await c.consume({
  consumerGroupId: "group_1",
  instanceId: "instance_1",
  topics: ["test.topic"],
  autoOffsetReset: "earliest",
})
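Because a newly created consumer may return empty results while group coordination completes, one option is to retry the call a bounded number of times. The following is a minimal sketch, not part of @upstash/kafka: consumeWithRetry, maxAttempts and wait are hypothetical names, and consumeOnce stands for any function that wraps the c.consume(...) call shown above.

```typescript
// Hypothetical helper: calls `consumeOnce` until it returns at least one
// message or `maxAttempts` calls have been made. `wait` is supplied by the
// caller so the sketch does not depend on a particular runtime's timer API.
async function consumeWithRetry<T>(
  consumeOnce: () => Promise<T[]>,
  maxAttempts: number,
  wait: () => Promise<void>,
): Promise<T[]> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const messages = await consumeOnce()
    if (messages.length > 0) {
      return messages
    }
    // An empty result can mean coordination is still in progress,
    // or simply that the topic has no new messages.
    if (attempt < maxAttempts) {
      await wait()
    }
  }
  return []
}
```

A caller might pass () => c.consume({ ... }) as consumeOnce and a timer-based delay such as () => new Promise((resolve) => setTimeout(resolve, 1000)) as wait. An empty return value after all attempts does not distinguish between an unfinished coordination and an empty topic.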
More examples can be found in the docstring.
Commit manually
While consume can handle committing automatically, you can also use
Consumer.commit to commit manually.
const consumerGroupId = "mygroup"
const instanceId = "myinstance"
const topic = "my.topic"
const c = kafka.consumer()
const messages = await c.consume({
  consumerGroupId,
  instanceId,
  topics: [topic],
  autoCommit: false,
})
for (const message of messages) {
  await c.commit({
    consumerGroupId,
    instanceId,
    offset: {
      topic: message.topic,
      partition: message.partition,
      offset: message.offset,
    },
  })
}
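The loop above sends one commit request per message. When a batch contains several messages from the same partition, it may be enough to commit only the highest offset per topic and partition, since a committed offset in Kafka marks a position in the partition. The following is a minimal sketch under that assumption; highestOffsets and ConsumedMessage are hypothetical names, and the message shape is inferred from the fields used in the loop above.

```typescript
// Minimal message shape, assumed from the fields used in the commit example.
type ConsumedMessage = { topic: string; partition: number; offset: number }

// Hypothetical helper: keeps only the highest offset seen per topic/partition.
function highestOffsets(messages: ConsumedMessage[]): ConsumedMessage[] {
  const latest = new Map<string, ConsumedMessage>()
  for (const { topic, partition, offset } of messages) {
    const key = `${topic}:${partition}`
    const seen = latest.get(key)
    if (seen === undefined || offset > seen.offset) {
      latest.set(key, { topic, partition, offset })
    }
  }
  // Entries come back in the order each topic/partition was first seen.
  return Array.from(latest.values())
}
```

Each returned entry has the same three fields as the offset object in the loop above, so the loop could iterate over highestOffsets(messages) instead of messages.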
Fetch
You can also manage offsets manually by using Consumer.fetch.
const c = kafka.consumer()
const messages = await c.fetch({
  topic: "greeting",
  partition: 3,
  offset: 42,
  timeout: 1000,
})
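Managing offsets manually means the caller has to remember where to continue on the next fetch. The following is a minimal sketch of that bookkeeping; nextFetchOffset is a hypothetical helper, and it assumes each fetched message carries a numeric offset field, as in the commit example.

```typescript
// Hypothetical helper: returns the offset to request in the next fetch call.
// If no messages were returned, the current offset is kept unchanged.
function nextFetchOffset(current: number, messages: { offset: number }[]): number {
  let next = current
  for (const message of messages) {
    // Continue right after the highest offset that was received.
    if (message.offset + 1 > next) {
      next = message.offset + 1
    }
  }
  return next
}
```

After the fetch above, nextFetchOffset(42, messages) would give the value to pass as offset in the following call. Where that value is stored between invocations is up to the application.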
Examples
See /examples as
well as various examples in the docstrings of each method.
Contributing
Requirements
Setup
- Install dependencies using pnpm install
- Create a Kafka instance on Upstash (see the docs).
- Create the following topics: blue, red, green (see the docs).
  The partitions or retention settings don't matter at this time.
- Create a .env file with your Kafka secrets: cp .env.example .env
Running tests
pnpm test