upstash-kafka

An HTTP/REST-based Kafka client built on top of the Upstash REST API.

It is the only connectionless (HTTP-based) Kafka client and is designed for:

  • Serverless functions (AWS Lambda ...)
  • Cloudflare Workers (see the example)
  • Fastly Compute@Edge
  • Next.js Edge, Remix ...
  • Client side web/mobile applications
  • WebAssembly and other environments where HTTP is preferred over TCP connections.

Installation

npm install @upstash/kafka

Quickstart

Auth

  1. Go to Upstash and select your database.
  2. Copy the REST API secrets at the bottom of the page.

import { Kafka } from "@upstash/kafka"

const kafka = new Kafka({
  url: "<UPSTASH_KAFKA_REST_URL>",
  username: "<UPSTASH_KAFKA_REST_USERNAME>",
  password: "<UPSTASH_KAFKA_REST_PASSWORD>",
})
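Hard-coding secrets is rarely desirable outside a quick test. The following is a minimal sketch, assuming the three values are exposed as environment variables named after the placeholders above. `kafkaConfigFromEnv` is a hypothetical helper, not part of `@upstash/kafka`.

```typescript
// Hypothetical helper, not part of @upstash/kafka.
// Mirrors the options object passed to `new Kafka(...)` above.
type KafkaConfig = { url: string; username: string; password: string }

// Assumption: the secrets are stored under variable names that match the
// placeholders used in this README.
function kafkaConfigFromEnv(env: Record<string, string | undefined>): KafkaConfig {
  const read = (name: string): string => {
    const value = env[name]
    if (!value) {
      throw new Error(`Missing environment variable ${name}`)
    }
    return value
  }
  return {
    url: read("UPSTASH_KAFKA_REST_URL"),
    username: read("UPSTASH_KAFKA_REST_USERNAME"),
    password: read("UPSTASH_KAFKA_REST_PASSWORD"),
  }
}
```

In Node.js this could be used as `new Kafka(kafkaConfigFromEnv(process.env))`; other runtimes expose secrets differently.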

Produce a single message

const p = kafka.producer()
const message = { hello: "world" } // Objects are serialized using `JSON.stringify`

// Produce with default options
const res = await p.produce("<my.topic>", message)

// Produce with explicit options
const resWithOptions = await p.produce("<my.topic>", message, {
  partition: 1,
  timestamp: 12345,
  key: "<custom key>",
  headers: [{ key: "traceId", value: "85a9f12" }],
})
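
The comment in the example notes that objects are serialized with `JSON.stringify`. The sketch below illustrates what that implies for the value that reaches the topic. `serializeValue` is a hypothetical stand-in for the client's behavior, and passing strings through unchanged is an assumption rather than something this README states.

```typescript
// Hypothetical stand-in for the client's value serialization.
// Assumption: strings are sent as-is, everything else goes through JSON.stringify.
function serializeValue(value: unknown): string {
  return typeof value === "string" ? value : JSON.stringify(value)
}

const asObject = serializeValue({ hello: "world" })
const asString = serializeValue("plain text")
console.log(asObject) // {"hello":"world"}
console.log(asString) // plain text
```

Under that assumption, a consumer would likely need to call `JSON.parse` on object values to get the original structure back.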

Produce multiple messages

The same options from the example above can be set for every message.

const p = kafka.producer()
const res = await p.produceMany([
  {
    topic: "my.topic",
    value: { hello: "world" },
    // ...options
  },
  {
    topic: "another.topic",
    value: "another message",
    // ...options
  },
])
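
When every message goes to the same topic, the array can be built from plain values. This is a minimal sketch: `toBatch` and `ProduceRequest` are hypothetical names, and only the `topic` and `value` fields shown above are used.

```typescript
// Hypothetical shape covering only the fields shown in the example above.
type ProduceRequest = { topic: string; value: unknown }

// Wraps each value in a request object targeting the same topic.
function toBatch(topic: string, values: unknown[]): ProduceRequest[] {
  return values.map((value) => ({ topic, value }))
}

const batch = toBatch("my.topic", [{ hello: "world" }, "another message"])
// `batch` has the same shape as the array passed to `p.produceMany(...)` above.
```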

Consume

The first time a consumer is created, it needs to find the group coordinator by asking the Kafka brokers and then join the consumer group. This process takes some time to complete, so a newly created consumer instance may return no messages until consumer group coordination has finished.

const c = kafka.consumer()
const messages = await c.consume({
  consumerGroupId: "group_1",
  instanceId: "instance_1",
  topics: ["test.topic"],
  autoOffsetReset: "earliest",
})
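
Because the first calls may return no messages, a caller may want to poll a few times before concluding that the topic is empty. The following is a minimal sketch, assuming only that the consume call resolves to an array. `consumeWithRetry`, `maxAttempts` and `delayMs` are hypothetical names, not part of the library.

```typescript
// Hypothetical helper: calls `consumeOnce` until it returns at least one
// message or `maxAttempts` is reached, waiting `delayMs` between attempts.
async function consumeWithRetry<T>(
  consumeOnce: () => Promise<T[]>,
  maxAttempts = 5,
  delayMs = 500,
): Promise<T[]> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const messages = await consumeOnce()
    if (messages.length > 0) {
      return messages
    }
    // Do not sleep after the final attempt.
    if (attempt < maxAttempts) {
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs))
    }
  }
  return []
}
```

It could wrap the call above as `consumeWithRetry(() => c.consume({ ... }))`. The attempt count and delay are arbitrary defaults and would need tuning.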

More examples can be found in the docstrings.

Commit manually

While consume can commit offsets automatically, you can also commit manually with Consumer.commit.

const consumerGroupId = "mygroup"
const instanceId = "myinstance"
const topic = "my.topic"

const c = kafka.consumer()
const messages = await c.consume({
  consumerGroupId,
  instanceId,
  topics: [topic],
  autoCommit: false,
})

for (const message of messages) {
  // message handling logic

  await c.commit({
    consumerGroupId,
    instanceId,
    offset: {
      topic: message.topic,
      partition: message.partition,
      offset: message.offset,
    },
  })
}
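
The loop above commits once per message. An alternative is to handle the whole batch first and then commit only the highest offset seen for each topic and partition. This is a minimal sketch: `latestOffsets` is a hypothetical helper, and it assumes messages expose `topic`, `partition` and a numeric `offset`, as in the loop above.

```typescript
type Offset = { topic: string; partition: number; offset: number }

// Keeps only the highest offset per topic/partition pair.
function latestOffsets(messages: Offset[]): Offset[] {
  const latest = new Map<string, Offset>()
  for (const { topic, partition, offset } of messages) {
    const key = `${topic}:${partition}`
    const seen = latest.get(key)
    if (seen === undefined || offset > seen.offset) {
      latest.set(key, { topic, partition, offset })
    }
  }
  return Array.from(latest.values())
}
```

Each returned entry could then be passed as `offset` to `c.commit` once the batch has been handled successfully.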

Fetch

You can also manage offsets manually by using Consumer.fetch.

const c = kafka.consumer()
const messages = await c.fetch({
  topic: "greeting",
  partition: 3,
  offset: 42,
  timeout: 1000,
})
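
With fetch, the caller is responsible for tracking the position. The following is a minimal sketch of advancing it between calls, assuming each returned message carries a numeric `offset`. `nextFetchOffset` is a hypothetical helper, not part of the library.

```typescript
// Returns the offset to request next: one past the highest offset received,
// or the current offset unchanged when nothing was returned.
function nextFetchOffset(messages: { offset: number }[], current: number): number {
  if (messages.length === 0) {
    return current
  }
  const highest = Math.max(...messages.map((m) => m.offset))
  return highest + 1
}
```

The result would be passed as `offset` in the following fetch call for the same topic and partition.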

Examples

See /examples as well as various examples in the docstrings of each method.

Contributing

Requirements

Setup

  1. Install dependencies using pnpm install

  2. Create a Kafka instance on Upstash (see the docs).

  3. Create the following topics: blue, red, green (see the docs).

    The partition and retention settings don't matter at this time.

  4. Create a .env file with your Kafka secrets: cp .env.example .env

Running tests

pnpm test

Last updated on 17 Jan 2024
