
Product
Introducing Repository Access Permissions and Custom Roles
Socket now supports Custom Roles and Repository Access Permissions so organizations can control who can access specific repositories and actions.
kafka-outbox-node
Advanced tools
A Node.js library implementing the Kafka Outbox pattern for reliable event publishing in distributed systems. This pattern ensures messages are consistently delivered to Kafka, even when the transaction and message publishing need to happen atomically.
A Node.js library implementing the Kafka Outbox pattern for reliable event publishing in distributed systems. This pattern ensures messages are consistently delivered to Kafka, even when the transaction and message publishing need to happen atomically.
# Latest version
pnpm add kafka-outbox-node
# Or specify version
pnpm add kafka-outbox-node@0.1.1
import { KafkaOutbox, InMemoryOutboxStorage } from 'kafka-outbox-node';
// Create a storage instance (in-memory, PostgreSQL, MySQL, or DynamoDB)
const storage = new InMemoryOutboxStorage();
// Create the outbox with the storage
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
defaultTopic: 'outbox-events',
clientId: 'my-service',
storage,
});
// Connect to Kafka
await outbox.connect();
// Add an event to the outbox
await outbox.addEvent({ message: 'Hello, world!', timestamp: new Date().toISOString() });
// Publish events manually
const count = await outbox.publishEvents();
// The outbox uses the configured logger
// No need for console.log statements
// Or start polling to publish events automatically
outbox.startPolling();
// When shutting down the application
await outbox.disconnect();
// With default Pino logger
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
storage: new PostgresOutboxStorage({ /* config */ }),
});
// With custom logger
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
storage: new PostgresOutboxStorage({ /* config */ }),
logger: myCustomLogger // Any logger with trace, debug, info, warn, error methods
});
// With detailed debug logging
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
storage: new PostgresOutboxStorage({ /* config */ }),
logger: { level: 'debug', prettyPrint: true }
});
import { KafkaOutbox, PostgresOutboxStorage } from 'kafka-outbox-node';
const storage = new PostgresOutboxStorage({
host: 'localhost',
port: 5432,
database: 'outbox',
user: 'postgres',
password: 'postgres',
});
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
storage,
});
import { KafkaOutbox, MySQLOutboxStorage } from 'kafka-outbox-node';
const storage = new MySQLOutboxStorage({
host: 'localhost',
port: 3306,
database: 'outbox',
user: 'root',
password: 'password',
});
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
storage,
});
import { KafkaOutbox, MongoDBOutboxStorage } from 'kafka-outbox-node';
const storage = new MongoDBOutboxStorage({
connectionString: 'mongodb://localhost:27017/outbox',
});
// Initialize the collection with proper indexes if needed
await storage.initialize();
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
storage,
});
import { KafkaOutbox, RedisOutboxStorage } from 'kafka-outbox-node';
const storage = new RedisOutboxStorage({
host: 'localhost',
port: 6379,
keyPrefix: 'outbox:', // Optional
});
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
storage,
});
import { KafkaOutbox, DynamoDBOutboxStorage } from 'kafka-outbox-node';
const storage = new DynamoDBOutboxStorage({
region: 'us-east-1',
tableName: 'outbox-events',
credentials: {
accessKeyId: 'your-access-key',
secretAccessKey: 'your-secret-key',
},
});
// Initialize the table if it doesn't exist
await storage.initialize();
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
storage,
});
new KafkaOutbox(config: KafkaOutboxConfig)
interface KafkaOutboxConfig {
kafkaBrokers: string[]; // Array of Kafka broker addresses
defaultTopic?: string; // Default topic if not specified when adding events
clientId?: string; // Client ID for Kafka
storage: OutboxStorage; // Storage adapter implementation
pollInterval?: number; // Interval in ms for polling (default: 5000)
kafkaOptions?: { // Additional Kafka options
ssl?: boolean; // Enable SSL
sasl?: { // Authentication options
mechanism: 'plain' | 'scram-sha-256' | 'scram-sha-512';
username: string;
password: string;
};
connectionTimeout?: number; // Connection timeout in ms
authenticationTimeout?: number; // Authentication timeout in ms
reauthenticationThreshold?: number; // Reauthentication threshold in ms
};
logger?: Logger | LoggerOptions | false; // Customizable logging options
}
connect(): Connect to Kafka.disconnect(): Disconnect from Kafka and stop polling.addEvent(payload: any, topic?: string): Add an event to the outbox.publishEvents(): Publish unpublished events to Kafka.startPolling(): Start polling for unpublished events.stopPolling(): Stop polling for unpublished events.Storage adapters implement this interface:
interface OutboxStorage {
saveEvent(event: OutboxEvent): Promise<void>;
markPublished(id: string): Promise<void>;
getUnpublishedEvents(): Promise<OutboxEvent[]>;
close?(): Promise<void>;
}
The library comes with built-in logging using Pino, but can be configured to work with your existing logging infrastructure:
// 1. Use the default Pino logger (info level)
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
storage,
// No logger config = default Pino logger with info level
});
// 2. Customize the built-in Pino logger
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
storage,
logger: {
level: 'debug', // One of: trace, debug, info, warn, error, fatal
name: 'my-service', // Logger name for identification
prettyPrint: true // Pretty print for development
}
});
// 3. Use your own logger (Winston, Bunyan, etc.)
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
storage,
logger: myCustomLogger // Must implement trace, debug, info, warn, error methods
});
// 4. Disable logging completely
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
storage,
logger: false
});
// 5. Access the logger from the outbox instance
const { logger } = outbox;
logger.info('Outbox initialized successfully');
logger.debug({ event: myEvent }, 'Adding event to outbox');
For high-throughput applications, you can use Debezium's Change Data Capture (CDC) to stream outbox events directly to Kafka without polling:
// Configure PostgreSQL for Debezium CDC
import { setupPostgresForDebezium } from 'kafka-outbox-node/dist/scripts/init/debezium/setup-postgres';
await setupPostgresForDebezium({
host: 'localhost',
port: 5432,
database: 'mydb',
user: 'postgres',
password: 'password',
tableName: 'outbox_events' // The table containing outbox events
});
// Create and register a Debezium connector
import { DebeziumConnector } from 'kafka-outbox-node';
const connector = new DebeziumConnector({
databaseType: 'postgres',
host: 'localhost',
port: 5432,
database: 'mydb',
username: 'postgres',
password: 'password',
outboxTable: 'outbox_events',
connectorName: 'my-outbox-connector',
kafkaBootstrapServers: 'localhost:9092',
topicPrefix: 'app.',
// You can use the same logger instance across your application
logger: myLogger // Must implement trace, debug, info, warn, error methods
}, {
eventTypeField: 'event_type',
payloadField: 'payload',
topicField: 'topic',
eventIdField: 'id'
});
// Register with Kafka Connect
await connector.register();
// With Debezium configured, you can use the standard KafkaOutbox for adding events,
// but don't need to call startPolling() or publishEvents() as Debezium handles that
const outbox = new KafkaOutbox({
kafkaBrokers: ['localhost:9092'],
storage: new PostgresOutboxStorage({
host: 'localhost',
port: 5432,
database: 'mydb',
user: 'postgres',
password: 'password',
})
});
// Add an event (but don't need to publish it explicitly)
await outbox.addEvent({ orderId: '123', status: 'created' });
The library includes a Docker Compose configuration with Debezium and all required components:
# Start the Docker environment (Kafka, PostgreSQL, Kafka Connect with Debezium, etc.)
pnpm docker:up
# The environment includes:
# - PostgreSQL (configured for CDC): localhost:5432
# - Kafka: localhost:29092
# - Kafka Connect: localhost:8083
# - Debezium UI: http://localhost:8084
# - Kafka UI: http://localhost:8080
# Register a Debezium connector for the outbox pattern
pnpm tsx scripts/register-debezium-connector.ts
# Now you can run the example
pnpm tsx examples/debezium-cdc-example.ts
# When finished
pnpm docker:down
The Docker setup handles all the configuration needed for Debezium, including:
outbox.startPolling();
// The outbox instance has a logger property you can use
const { logger } = outbox;
process.on('uncaughtException', async (error) => {
logger.error({ err: error }, 'Uncaught exception');
await outbox.disconnect();
process.exit(1);
});
process.on('SIGINT', async () => {
logger.info('Shutting down...');
await outbox.disconnect();
process.exit(0);
});
// Add an event to a specific topic
await outbox.addEvent(
{ orderId: '12345', status: 'completed' },
'order-events'
);
// Add an event to the default topic
await outbox.addEvent({ message: 'Hello, world!' });
pnpm buildpnpm testpnpm lintpnpm formatSee DOCKER.md for instructions on setting up a local development environment with Docker.
MIT
FAQs
A Node.js library implementing the Kafka Outbox pattern for reliable event publishing in distributed systems. This pattern ensures messages are consistently delivered to Kafka, even when the transaction and message publishing need to happen atomically.
We found that kafka-outbox-node demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Product
Socket now supports Custom Roles and Repository Access Permissions so organizations can control who can access specific repositories and actions.

Product
Socket MCP now lets AI assistants review org alerts, investigate threats using the Socket threat feed, and inspect package files in addition to dependency scoring.

Product
Socket Firewall blocks malicious VS Code and Open VSX extensions before install, protecting developers from compromised editor marketplaces.