
A robust npm package for handling dynamic topic creation, subscription, and management in multi-instance microservice architectures. Built on top of KafkaJS, this package provides a simplified interface for Kafka operations while maintaining full flexibility and control.
npm install js-kafka
Topic Updates Topic (One-Time Setup)
⚠️ IMPORTANT: Before using js-kafka in your environment, you must create a special topic named topic-updates once. This topic facilitates dynamic topic subscription for all consumers.
# Create the topic-updates topic (one-time setup per environment)
kafka-topics.sh --create --topic topic-updates --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
This topic is used internally by js-kafka to manage dynamic topic registration and subscription across all microservice instances.
js-kafka uses a specific topic naming convention for dynamic subscription:
Pattern: {env}-{serviceName}-{entityName}.{topicName}
Examples:
dev-user-service-user123.events
prod-order-service-order456.notifications
staging-payment-service-payment789.transactions
How it works:
When a consumer is registered with a base topicName (e.g., "events"), it will automatically subscribe to ALL topics that end with .events
⚠️ WARNING: Do NOT create topics with dots (.) in the base topic name when using standard topic creation.
Incorrect: ❌
// Don't do this - dots should only be used in the naming convention
await kafka.producer.sendMessage("user.events.topic", message);
Correct: ✅
// Use the naming convention or simple names without dots
await kafka.producer.sendMessage("user-events", message);
// or let js-kafka handle the naming convention internally
The dot (.) is reserved for the dynamic topic subscription pattern and should only appear before the final topic name segment.
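As a sketch of the convention above, the helper below shows how a full topic name could be assembled. Note that buildTopicName is a hypothetical function for illustration, not part of the js-kafka API; js-kafka applies this convention internally.

```javascript
// Hypothetical helper illustrating the {env}-{serviceName}-{entityId}.{topicName}
// convention; the dot separates the prefix from the base topic name.
function buildTopicName(env, serviceName, entityId, topicName) {
  if (topicName.includes(".")) {
    // Base topic names must not contain dots - the dot is reserved
    throw new Error("Base topic names must not contain dots");
  }
  return `${env}-${serviceName}-${entityId}.${topicName}`;
}

// Matches the "dev-user-service-user123.events" example above
const full = buildTopicName("dev", "user-service", "user123", "events");
console.log(full);
```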
import { getKafkaClient } from "js-kafka";
const kafka = getKafkaClient({
env: "dev",
brokers: ["localhost:9092"],
clientId: "kafka-default-service",
serviceName: "user-service",
partitions: 1, // optional
replicationFactor: 1, // optional
acks: 1, // optional
});
const kafka = getKafkaClient(
{
env: "dev",
brokers: ["localhost:9092"],
clientId: "kafka-default-service",
serviceName: "user-service",
},
{
isSingletonEnabled: true, // when true (the default), repeated calls return the same shared instance
}
);
Important: Register all consumers before calling the init() method.
// Register batch consumer - will subscribe to all topics ending with .user-events
kafka.registry.registerBatch("user-events", handleBatchMessages, {
consumerGroup: "my-service-group",
fromBeginning: false,
});
// Register single message consumer - will subscribe to all topics ending with .user-events
kafka.registry.registerSingle("user-events", handleSingleMessage, {
consumerGroup: "my-service-group",
fromBeginning: false,
});
// The consumer options argument is optional:
{
consumerGroup: "my-service-group",
fromBeginning: false,
}
await kafka.init();
const message = {
key: "user-123",
value: {
userId: "123",
action: "login",
timestamp: new Date().toISOString(),
},
};
await kafka.producer.sendMessage("user-events", message, "user-123", {
acks: 1,
});
Note: How acks priority works. The acks value passed in the sendMessage options takes precedence over the acks default configured on the client.
When you register a consumer with a topic name like 'user-events', js-kafka automatically:
- subscribes to all existing topics matching *.user-events
- watches the topic-updates topic for new topic registrations
// Consumer registration
kafka.registry.registerBatch("notifications", handleNotifications, {
consumerGroup: "notification-consumer-group",
});
// This consumer will automatically receive messages from ALL of these topics:
// - dev-user-service-user123.notifications
// - dev-order-service-order456.notifications
// - dev-payment-service-payment789.notifications
// - Any future topics ending with .notifications
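The routing above boils down to a suffix match on the full topic name. The sketch below illustrates that behavior; matchesBaseTopic is a hypothetical function for illustration only, not an export of js-kafka.

```javascript
// Hypothetical sketch of the suffix match js-kafka applies: a consumer
// registered for "notifications" receives every topic ending in ".notifications".
function matchesBaseTopic(fullTopic, baseTopic) {
  return fullTopic.endsWith(`.${baseTopic}`);
}

const topics = [
  "dev-user-service-user123.notifications",
  "dev-order-service-order456.notifications",
  "dev-user-service-user123.user-events",
];

// Only the first two topics match the "notifications" base name
const matched = topics.filter((t) => matchesBaseTopic(t, "notifications"));
console.log(matched.length);
```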
// These will create topics following the naming convention:
await kafka.producer.sendMessage("user-events", message, "user123");
// Creates: {env}-{serviceName}-user123.user-events
await kafka.producer.sendMessage("order-updates", message, "order456");
// Creates: {env}-{serviceName}-order456.order-updates
await kafka.producer.sendMessage("payments", message, "payment789");
// Creates: {env}-{serviceName}-payment789.payments
Ensure proper cleanup when shutting down your application. This will disconnect all the consumers, producers and admin connections.
await kafka.shutdown();
getKafkaClient(config)
Creates a new Kafka client instance.
Parameters:
Parameter | Type | Required | Description |
---|---|---|---|
env | string | Yes | Environment (e.g., 'development', 'production') |
brokers | string[] | Yes | Array of Kafka broker addresses |
clientId | string | Yes | Unique client identifier |
serviceName | string | Yes | Name of your service used for topic creation |
partitions | number | No | Default number of partitions for topics |
replicationFactor | number | No | Default replication factor for topics |
acks | number | No | Default acknowledgment setting |
producer.sendMessage(topic, message, entityId, options)
Sends a message to a specified topic.
Parameters:
- topic (string): The base topic name (without dots)
- message (MessagePayload): The message payload
- entityId (string): Entity identifier for partitioning and topic naming
- options (IKafkaProducerOptions): Producer options

MessagePayload Interface:
interface MessagePayload {
key?: string;
value: any;
timestamp?: string;
partition?: number;
headers?: IHeaders;
}
IKafkaProducerOptions Interface:
interface IKafkaProducerOptions {
acks?: number;
}
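A minimal payload and options object shaped like the interfaces above (the field values are illustrative, not required):

```javascript
// A message object matching the MessagePayload interface above.
const payload = {
  key: "user-123", // optional partitioning key
  value: { userId: "123", action: "login" }, // required; any serializable value
  headers: { source: "user-service" }, // optional Kafka headers
};

// Producer options matching IKafkaProducerOptions; acks: 1 means
// the leader acknowledges the write without waiting for replicas.
const options = { acks: 1 };

console.log(payload.value.action);
```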
registry.registerBatch(topic, handler, options)
Registers a batch message handler for a topic pattern.
Parameters:
- topic (string): Base topic name to subscribe to (will match all topics ending with .{topic})
- handler (function): Callback function to handle messages
- options (ITopicRegistryOptions): Consumer options

Handler Signature:
async function handleBatchMessages(params: { topic: string; messages: any[] }) {
// Handle batch of messages
// topic will be the full topic name (e.g., 'dev-user-service-user123.events')
}
registry.registerSingle(topic, handler, options)
Registers a single message handler for a topic pattern.
Parameters:
- topic (string): Base topic name to subscribe to (will match all topics ending with .{topic})
- handler (function): Callback function to handle individual messages
- options (ITopicRegistryOptions): Consumer options

Handler Signature:
async function handleSingleMessage(params: {
topic: string;
message: any;
partition: number;
offset: string;
}) {
// Handle single message
// topic will be the full topic name (e.g., 'dev-user-service-user123.events')
}
import { getKafkaClient } from "js-kafka";
class KafkaService {
constructor() {
this._kafka = getKafkaClient({
env: "dev", // e.g., 'dev'
brokers: ["localhost:9092"],
clientId: "kafka-default-service",
serviceName: "user-service", // e.g., 'user-service'
partitions: 1,
replicationFactor: 1,
});
}
async initialize() {
// Register consumers before init
// This will subscribe to all topics ending with .user-events
this._kafka.registry.registerBatch(
"user-events",
this.handleUserEvents.bind(this),
{
consumerGroup: "user-service-group",
fromBeginning: false,
}
);
// This will subscribe to all topics ending with .notifications
this._kafka.registry.registerSingle(
"notifications",
this.handleNotification.bind(this),
{
consumerGroup: "notification-service-group",
fromBeginning: true,
}
);
// Initialize the client
await this._kafka.init();
console.log("Kafka client initialized");
}
// Batch message function
async handleUserEvents(params) {
const { topic, messages } = params;
console.log(
`[Kafka][Batch][${topic}] Received ${messages.length} messages`
);
for (const message of messages) {
console.log(`Processing message for topic ${topic}:`, message);
}
}
// Single message function
async handleNotification(params) {
const { topic, message, partition, offset } = params;
console.log(
`[Kafka][Single][${topic}] Message from partition ${partition}, offset ${offset}`
);
console.log("Message:", message);
}
async sendUserEvent(userId, eventData) {
const message = {
key: userId,
value: {
userId,
...eventData,
timestamp: new Date().toISOString(),
},
headers: {
source: "user-service",
},
};
// Creates the topic dev-user-service-user123.user-events if it does not exist, then sends the message to it
await this._kafka.producer.sendMessage("user-events", message, userId);
}
async sendNotification(userId, notificationData) {
const message = {
key: userId,
value: notificationData,
};
// Creates the topic dev-user-service-user123.notifications if it does not exist, then sends the message to it
await this._kafka.producer.sendMessage("notifications", message, userId);
}
}
// Usage
const kafkaService = new KafkaService();
await kafkaService.initialize();
// Send messages that will be automatically routed to appropriate consumers
await kafkaService.sendUserEvent("user-123", {
action: "profile_updated",
data: { email: "user@example.com" },
});
await kafkaService.sendNotification("user-123", {
type: "email",
subject: "Profile Updated",
body: "Your profile has been successfully updated.",
});
Best practices:
- Ensure the topic-updates topic is created before deploying any services
- Register all consumers before calling init()
- Avoid dots (.) in your base topic names - let js-kafka handle the naming convention
await kafka.producer.sendMessage("user-events", message, entityId);
await kafka.producer.sendMessage("order-updates", message, entityId);
await kafka.producer.sendMessage("payment-transactions", message, entityId);
await kafka.producer.sendMessage("notifications", message, entityId);
// Don't use dots in base topic names
await kafka.producer.sendMessage("user.events", message, entityId); // ❌
await kafka.producer.sendMessage("order.status.updates", message, entityId); // ❌
// These are reserved patterns
await kafka.producer.sendMessage("topic-updates", message, entityId); // ❌ Reserved
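A simple pre-send check mirroring the rules above can catch these mistakes early. The function below is a hypothetical sketch, not something js-kafka exports; it only illustrates what a valid base topic name looks like.

```javascript
// Internal topics that must never be used as base topic names.
const RESERVED_TOPICS = ["topic-updates"];

// Hypothetical validator: rejects dots (reserved for the naming
// convention) and the reserved internal topics listed above.
function isValidBaseTopic(name) {
  if (name.includes(".")) return false;
  if (RESERVED_TOPICS.includes(name)) return false;
  return true;
}

console.log(isValidBaseTopic("user-events")); // valid
console.log(isValidBaseTopic("user.events")); // invalid: contains a dot
console.log(isValidBaseTopic("topic-updates")); // invalid: reserved
```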
js-kafka comes with a comprehensive test suite covering all functionality.
npm test
npm run test:coverage
npm run test:watch
npm run test:verbose
# Run specific test file
npm test src/config/__tests__/kafka-config.test.ts
# Run all tests in a directory
npm test src/kafka/__tests__/
# Run tests matching a pattern
npm test -- --testNamePattern="KafkaClient"
# Connection manager tests
npm test src/kafka/__tests__/connection-manager.test.ts
# Producer manager tests
npm test src/kafka/__tests__/producer-manager.test.ts
# Main entry point tests
npm test src/__tests__/index.test.ts
After running npm run test:coverage, you can view detailed coverage reports:
The coverage summary is displayed directly in the terminal.
Open the detailed HTML report in your browser:
open coverage/lcov-report/index.html
The HTML report provides a file-by-file coverage breakdown.
The test suite is organized as follows:
src/
├── __tests__/ # Main entry point tests
├── config/__tests__/ # Configuration tests
├── enums/__tests__/ # Enum tests
├── kafka/__tests__/ # Kafka component tests
│ ├── admin-manager.test.ts
│ ├── connection-manager.test.ts
│ ├── consumer-manager.test.ts
│ ├── handler-registry.test.ts
│ └── producer-manager.test.ts
├── logger/__tests__/ # Logger tests
└── utils/__tests__/ # Utility tests
When contributing new features, please ensure they are covered by tests.
Example test structure:
describe('ComponentName', () => {
beforeEach(() => {
jest.clearAllMocks();
// Reset any singletons or state
});
describe('methodName', () => {
it('should handle normal case', () => {
// Test implementation
});
it('should handle error case', () => {
// Test error scenarios
});
it('should handle edge cases', () => {
// Test boundary conditions
});
});
});
# Clone the repository
git clone <repository-url>
cd js-kafka
# Install dependencies
npm install
# Build the project
npm run build
# Install as local dependencies
npm link
# Install in your own repo using
npm i <repository-path>
# Run tests
npm test
# Run tests in watch mode during development
npm run test:watch
Script | Description |
---|---|
npm test | Run all tests |
npm run test:coverage | Run tests with coverage report |
npm run test:watch | Run tests in watch mode |
npm run test:verbose | Run tests with detailed output |
npm run build | Build TypeScript to JavaScript |
npm run dev | Build in watch mode |
Contributions are welcome! Please follow these guidelines:
- Add tests for new features and run them with npm test
- Ensure all tests pass (npm test)
- Ensure the project builds (npm run build)

This project is licensed under the MIT License - see the LICENSE file for details.
For issues and questions, please open an issue on the GitHub repository.
The npm package js-kafka receives a total of 55 weekly downloads. As such, js-kafka popularity was classified as not popular.
We found that js-kafka demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.