
@ovotech/kafka-pg-sink
Store kafka-node events into a postgres database.
yarn add @ovotech/kafka-pg-sink
Each topic from the consumer stream gets its own "resolver" function that converts the message into an array of values, which are inserted directly as columns in the database:
INSERT INTO tableName VALUES (column1Value, column2Value ...) ON CONFLICT DO NOTHING
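Concretely, the values returned by a resolver map positionally onto the target table's columns. As a hypothetical sketch (the column names and types are assumptions; yours will differ), the table for the example below could be created like this. Note that ON CONFLICT DO NOTHING only skips rows when the table has a unique constraint for the insert to conflict with:

import { Client } from 'pg';

const createTable = async (pg: Client) => {
  // Hypothetical schema matching the three values the example resolver
  // below returns. The primary key gives ON CONFLICT DO NOTHING a
  // constraint to act on, making repeated deliveries idempotent.
  await pg.query(`
    CREATE TABLE IF NOT EXISTS migration_completed (
      column1 TEXT PRIMARY KEY,
      column2 TEXT,
      payload JSONB
    )
  `);
};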
import { PGSinkStream, Message } from '@ovotech/kafka-pg-sink';
import { ConsumerGroupStream } from 'kafka-node';
import { Client } from 'pg';

const consumerStream = new ConsumerGroupStream(
  { kafkaHost: 'localhost:29092', groupId: 'my-group', encoding: 'buffer' },
  ['migration-completed'],
);

const pg = new Client('postgresql://postgres:dev-pass@0.0.0.0:5432/postgres');

const pgSink = new PGSinkStream({
  pg,
  topics: {
    'migration-completed': {
      table: 'migration_completed',
      // Convert each message into an array of column values for the insert;
      // getDataSomehow is a placeholder for your own deserialization logic
      resolver: (message: Message) => {
        const data = getDataSomehow(message.value);
        return [data.column1, data.column2, data];
      },
    },
  },
});

consumerStream.pipe(pgSink);
Message is the same as the type from kafka-node, but is more lenient on the value field. This allows you to pre-process the message before sending it off to pg-sink, for example by deserializing it with Avro.
export interface Message {
  topic: string;
  value: any;
  offset?: number;
  partition?: number;
  highWaterOffset?: number;
  key?: string;
}
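Because value is typed loosely, you can deserialize it however you need. As a minimal hand-rolled sketch (hypothetical, not part of this package), a Transform stream could parse JSON-encoded buffer values before they reach the sink:

import { Transform, TransformCallback } from 'stream';
import { Message } from '@ovotech/kafka-pg-sink';

// Hypothetical transform: parse each Buffer value as JSON so the resolver
// receives a plain object instead of raw bytes.
const jsonDeserializer = new Transform({
  objectMode: true,
  transform(message: Message, _encoding: string, callback: TransformCallback) {
    try {
      callback(null, { ...message, value: JSON.parse(String(message.value)) });
    } catch (error) {
      callback(error as Error);
    }
  },
});

// Usage: consumerStream.pipe(jsonDeserializer).pipe(pgSink);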
You can transform kafka events with a transform stream before they arrive at the sink, for example with @ovotech/avro-stream:
import { AvroDeserializer, AvroMessage } from '@ovotech/avro-stream';
import { PGSinkStream } from '@ovotech/kafka-pg-sink';
import { ConsumerGroupStream } from 'kafka-node';
import { Client } from 'pg';

const consumerStream = new ConsumerGroupStream(
  { kafkaHost: 'localhost:29092', groupId: 'my-group', encoding: 'buffer' },
  ['migration-completed'],
);

const deserializer = new AvroDeserializer('http://localhost:8080');
const pg = new Client('postgresql://postgres:dev-pass@0.0.0.0:5432/postgres');

const pgSink = new PGSinkStream({
  pg,
  topics: {
    'migration-completed': {
      table: 'migration_completed',
      // The avro-decoded value is available on message.value
      resolver: (message: AvroMessage) => [message.value.accountId],
    },
  },
});

consumerStream.pipe(deserializer).pipe(pgSink);
PGSinkStream can emit a PGSinkError or a PGSinkMultipleError, depending on whether it was processing a single event or a batch.
PGSinkError

Property | Description
---|---
message | Original error message
chunk | The event sent from the previous stream to be saved to the database (Message)
encoding | The buffer encoding
originalError | The original error object that was triggered
PGSinkMultipleError

Property | Description
---|---
message | Original error message
chunks | An array of { chunk: Message, encoding: string } objects
originalError | The original error object that was triggered
Example error handling:

import { PGSinkStream, PGSinkError, PGSinkMultipleError } from '@ovotech/kafka-pg-sink';

const pgSink = new PGSinkStream(/* pg and topics config as above */);

pgSink.on('error', (error: PGSinkError | PGSinkMultipleError) => {
  if (error instanceof PGSinkError) {
    // A single event failed to be written - inspect the offending chunk
    console.log(error.chunk);
  } else {
    // A batch failed - each item holds a chunk and its encoding
    console.log(error.chunks);
  }
});
A thing to be aware of is that node streams unpipe in the event of an error, which means that you'll need to provide your own error handling and re-pipe the streams if you want the pipeline to be resilient to errors.
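A minimal re-piping sketch (this assumes resuming from the consumer's current position is acceptable for your delivery semantics; retry limits and backoff are omitted):

// Streams unpipe on error, so without re-piping the consumer would stop
// feeding the sink after the first failure.
const connectPipeline = () => consumerStream.pipe(pgSink);

pgSink.on('error', (error) => {
  console.error('pg sink error:', error.message);
  connectPipeline();
});

connectPipeline();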
The tests require a running postgres database. This is easily set up with docker-compose from the root project folder:
docker-compose up
Then you can run the tests with:
yarn test
Style is maintained with prettier and tslint:
yarn lint
Deployment is performed by lerna automatically on merge / push to master, but you'll need to bump the package version numbers yourself. Only updated packages with newer versions will be pushed to the npm registry.
Have a bug? File an issue with a simple example that reproduces it so we can take a look & confirm.
Want to make a change? Submit a PR, explain why it's useful, and make sure you've updated the docs (this file) and the tests (see the test folder).
This project is licensed under Apache 2 - see the LICENSE file for details.