@upstash/kafka
An HTTP/REST based Kafka client built on top of Upstash REST API.
It is the only connectionless (HTTP-based) Kafka client and is designed for:
npm install @upstash/kafka
Copy the REST API secrets at the bottom of the page.
import { Kafka } from "@upstash/kafka"
const kafka = new Kafka({
  url: "<UPSTASH_KAFKA_REST_URL>",
  username: "<UPSTASH_KAFKA_REST_USERNAME>",
  password: "<UPSTASH_KAFKA_REST_PASSWORD>",
})
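In practice the three secrets are usually read from the environment rather than hard-coded. The following is a minimal sketch of that idea, not part of the library: `readKafkaCredentials` and `KafkaCredentials` are hypothetical names, and the environment variable names are an assumption taken from the placeholders above.

```typescript
// Hypothetical helper, not part of @upstash/kafka.
// The variable names below mirror the placeholders in the config example
// and are an assumption; adjust them to whatever your deployment uses.
interface KafkaCredentials {
  url: string
  username: string
  password: string
}

function readKafkaCredentials(
  env: Record<string, string | undefined>,
): KafkaCredentials {
  const names = {
    url: "UPSTASH_KAFKA_REST_URL",
    username: "UPSTASH_KAFKA_REST_USERNAME",
    password: "UPSTASH_KAFKA_REST_PASSWORD",
  }
  // Collect every missing or empty variable so the error lists all of them.
  const missing = [names.url, names.username, names.password].filter(
    (name) => !env[name],
  )
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`)
  }
  return {
    url: env[names.url] as string,
    username: env[names.username] as string,
    password: env[names.password] as string,
  }
}
```

Under these assumptions, the result could be passed straight to the constructor, for example `new Kafka(readKafkaCredentials(process.env))` in a Node.js environment.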
const p = kafka.producer()
const message = { hello: "world" } // Objects will get serialized using `JSON.stringify`
const res = await p.produce("<my.topic>", message)
// Optional settings can be passed as a third argument
const resWithOptions = await p.produce("<my.topic>", message, {
  partition: 1,
  timestamp: 12345,
  key: "<custom key>",
  headers: [{ key: "traceId", value: "85a9f12" }],
})
The same options from the example above can be set for every message.
const p = kafka.producer()
const res = await p.produceMany([
  {
    topic: "my.topic",
    value: { hello: "world" },
    // ...options
  },
  {
    topic: "another.topic",
    value: "another message",
    // ...options
  },
])
The first time a consumer is created, it has to discover the group coordinator by asking the Kafka brokers and then join the consumer group. This takes some time, so a newly created consumer instance may return empty results until group coordination has completed.
const c = kafka.consumer()
const messages = await c.consume({
  consumerGroupId: "group_1",
  instanceId: "instance_1",
  topics: ["test.topic"],
  autoOffsetReset: "earliest",
})
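Because a newly created consumer may return nothing until group coordination completes, callers sometimes poll a few times before concluding that there is no data. The sketch below shows one way to do that. `consumeWithRetry` is a hypothetical helper, not part of the library, and the attempt count and delay are arbitrary defaults.

```typescript
// Hypothetical helper, not part of @upstash/kafka.
// Calls `poll` until it returns at least one message or attempts run out.
async function consumeWithRetry<T>(
  poll: () => Promise<T[]>,
  maxAttempts = 5,
  delayMs = 500,
): Promise<T[]> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const messages = await poll()
    if (messages.length > 0) {
      return messages
    }
    // Wait before the next attempt, but not after the last one.
    if (attempt < maxAttempts) {
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs))
    }
  }
  return []
}
```

It could wrap the call above as `consumeWithRetry(() => c.consume({ ... }))`. An empty result can also simply mean the topic has no new messages, which is why the number of attempts is capped.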
More examples can be found in the docstring
While consume can handle committing automatically, you can also use Consumer.commit to commit manually.
const consumerGroupId = "mygroup"
const instanceId = "myinstance"
const topic = "my.topic"
const c = kafka.consumer()
const messages = await c.consume({
  consumerGroupId,
  instanceId,
  topics: [topic],
  autoCommit: false,
})
for (const message of messages) {
  // message handling logic
  await c.commit({
    consumerGroupId,
    instanceId,
    offset: {
      topic: message.topic,
      partition: message.partition,
      offset: message.offset,
    },
  })
}
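The loop above commits once per message. If a whole batch is processed before committing, it may be enough to commit only the highest offset seen for each topic and partition. The following sketch computes those offsets. `latestOffsets` and the `TopicPartitionOffset` shape are hypothetical names for illustration, with fields modelled on the commit example, and whether `commit` accepts several offsets in one call is an assumption to verify against the docstrings.

```typescript
// Hypothetical shape modelled on the fields used in the commit example.
interface TopicPartitionOffset {
  topic: string
  partition: number
  offset: number
}

// Hypothetical helper, not part of @upstash/kafka.
// Returns one entry per topic/partition holding the highest offset seen.
function latestOffsets(
  messages: TopicPartitionOffset[],
): TopicPartitionOffset[] {
  const latest = new Map<string, TopicPartitionOffset>()
  for (const { topic, partition, offset } of messages) {
    const key = `${topic}:${partition}`
    const seen = latest.get(key)
    if (seen === undefined || offset > seen.offset) {
      latest.set(key, { topic, partition, offset })
    }
  }
  return Array.from(latest.values())
}
```

Each returned entry could then be passed as the `offset` of a `c.commit` call after the batch has been handled.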
You can also manage offsets manually by using Consumer.fetch:
const c = kafka.consumer()
const messages = await c.fetch({
  topic: "greeting",
  partition: 3,
  offset: 42,
  timeout: 1000,
})
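When offsets are managed manually, the caller has to keep track of where the next fetch should start. The sketch below assumes that each fetched message carries a numeric `offset` field, as the commit example suggests. `nextFetchOffset` is a hypothetical helper, not part of the library.

```typescript
// Hypothetical helper, not part of @upstash/kafka.
// Returns the offset to request next: one past the highest offset in the
// batch, or the current offset unchanged when the batch is empty.
function nextFetchOffset(
  messages: { offset: number }[],
  currentOffset: number,
): number {
  if (messages.length === 0) {
    return currentOffset
  }
  const highest = messages.reduce(
    (max, message) => (message.offset > max ? message.offset : max),
    messages[0].offset,
  )
  return highest + 1
}
```

The returned value would be stored by the caller and used as the `offset` of the following `c.fetch` call for the same topic and partition.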
See /examples as well as various examples in the docstrings of each method.
Install dependencies using pnpm install
Create a Kafka instance on Upstash (see the docs).
Create the following topics: blue, red, green (see the docs). The partition and retention settings don't matter at this time.
Create a .env file with your Kafka secrets: cp .env.example .env
Run the tests: pnpm test