@trivago/samsa

@trivago/samsa - npm Package Compare versions

Comparing version 0.3.0-beta.4 to 0.3.0


CHANGELOG.md

@@ -8,2 +8,11 @@ # Changelog

## v0.3.0
- Breaking Changes
  - move from LevelDB to RocksDB as the underlying store for joins
  - this change does not, however, affect sinks
- Features
  - Added a configurable buffer to the joiner
  - Added the ability to automatically disconnect a consumer group on process exit
## v0.2.4

@@ -10,0 +19,0 @@


docs/Combinators.md

@@ -22,11 +22,13 @@ # Stream Combination Operators

| argument | description |
| ------------ | ------------------------------------------------------------------------------------------------------------- |
| primary | the stream containing primary keys |
| foreign | the stream containing foreign keys |
| project | the projection used to map the primary and foreign values together, defaults to return `{ primary, foreign }` |
| window | the amount of time in seconds to keep cached values alive, defaults to `0` |
| KTableConfig | takes a `batchSize` and `batchAge` to determine how often the given streams are cached |

| argument | description |
| ----------------- | ------------------------------------------------------------------------------------------------------------- |
| primary | the stream containing primary keys |
| foreign | the stream containing foreign keys |
| projection | the projection used to map the primary and foreign values together, defaults to return `{ primary, foreign }` |
| maxKeyBufferSize | maximum number of messages to buffer |
| keyBufferInterval | number of ms to wait before processing messages in the buffer |
| <!-- | window | the amount of time in seconds to keep cached values alive, defaults to `0` | --> |
| <!-- | KTableConfig | takes a `batchSize` and `batchAge` to determine how often the given streams are cached | --> |
Underlying every join is a `KTable`, which in this case is a key-value cache. The `KTable` uses an instance of `LevelDB` to store and retrieve data from the cache in a reasonable manner. In a Kafka context, this could technically be considered a `KTable to KTable` join.
Underlying every join is a `KTable`, which in this case is a key-value cache. The `KTable` uses an instance of `RocksDB` to store and retrieve data from the cache in a reasonable manner. In a Kafka context, this could technically be considered a `KTable to KTable` join.
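The new `maxKeyBufferSize` / `keyBufferInterval` arguments can be illustrated with a small, self-contained sketch. This is not samsa's implementation; the `KeyBuffer` class and its flush callback are hypothetical stand-ins for the joiner's internal buffer, which flushes either once `maxKeyBufferSize` messages have accumulated or after `keyBufferInterval` ms:

```typescript
// Hypothetical model of the joiner's buffer semantics (not samsa's code).
type Message = { key: string; value: unknown };

class KeyBuffer {
    private buffer: Message[] = [];
    private timer?: ReturnType<typeof setTimeout>;

    constructor(
        private maxKeyBufferSize: number,
        private keyBufferInterval: number,
        private onFlush: (batch: Message[]) => void
    ) {}

    push(msg: Message): void {
        this.buffer.push(msg);
        // Flush immediately once the buffer is full...
        if (this.buffer.length >= this.maxKeyBufferSize) {
            this.flush();
            return;
        }
        // ...otherwise flush after keyBufferInterval ms.
        if (!this.timer) {
            this.timer = setTimeout(() => this.flush(), this.keyBufferInterval);
        }
    }

    flush(): void {
        if (this.timer) clearTimeout(this.timer);
        this.timer = undefined;
        if (this.buffer.length > 0) {
            this.onFlush(this.buffer);
            this.buffer = [];
        }
    }
}
```

Under this reading, a larger `maxKeyBufferSize` trades per-message latency for fewer, larger batches against the RocksDB-backed `KTable`.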

@@ -33,0 +35,0 @@ **Note:** `join` is an alias for `innerJoin`

@@ -18,10 +18,11 @@ # Kafka Streams

| option | required | default | description |
| ------------- | -------- | --------- | --------------------------------------------------------------------------------------------------------------------- |
| groupId | yes | undefined | the groupId to use for the underlying kafkaJS consumer |
| topic | yes | undefined | the topic to subscribe to |
| fromBeginning | no | true | whether or not to start consuming from the beginning of the topic |
| highWaterMark | no | 20k | the maximum number of messages that can be consumed at once, more messages than this will cause the consumer to pause |
| autoResume | no | true | whether or not to automatically resume once a set interval has passed |
| resumeAfter | no | 1000 | the time, in ms, to wait before resuming consuming messages |

| option | required | default | description |
| -------------- | -------- | --------- | --------------------------------------------------------------------------------------------------------------------- |
| groupId | yes | undefined | the groupId to use for the underlying kafkaJS consumer |
| topic | yes | undefined | the topic to subscribe to |
| autoDisconnect | no | true | tells the stream whether or not to automatically disconnect the consumer |
| fromBeginning | no | true | whether or not to start consuming from the beginning of the topic |
| highWaterMark | no | 20k | the maximum number of messages that can be consumed at once, more messages than this will cause the consumer to pause |
| autoResume | no | true | whether or not to automatically resume once a set interval has passed |
| resumeAfter | no | 1000 | the time, in ms, to wait before resuming consuming messages |
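A minimal sketch of how these defaults might be applied when building the stream config. Names follow the options table; note the table documents a `highWaterMark` default of 20k while the compiled source in this diff initializes it to 100000, so the value below follows the table:

```typescript
// Hypothetical shape mirroring the documented consumer stream options.
interface StreamConfig {
    groupId: string;           // required
    topic: string;             // required
    autoDisconnect?: boolean;  // default: true
    fromBeginning?: boolean;   // default: true
    highWaterMark?: number;    // default: 20000 (per the table)
    autoResume?: boolean;      // default: true
    resumeAfter?: number;      // default: 1000 ms
}

// Apply documented defaults; caller-supplied values win via the spread.
function withDefaults(config: StreamConfig): Required<StreamConfig> {
    return {
        autoDisconnect: true,
        fromBeginning: true,
        highWaterMark: 20000,
        autoResume: true,
        resumeAfter: 1000,
        ...config,
    } as Required<StreamConfig>;
}
```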

@@ -28,0 +29,0 @@ Check out the [KafkaJS documentation](https://kafka.js.org/docs/consuming#a-name-options-a-options) for a full list of consumer options.

@@ -47,4 +47,5 @@ /// <reference types="node" />

resumeAfter?: number;
autoDisconnect?: boolean;
}
export declare type StreamErrorCallback = (error?: Error | null) => void;
export {};

@@ -11,3 +11,6 @@ /// <reference types="node" />

private running;
private connected;
private signalTraps;
constructor(consumer: Consumer, topic: string, config?: {
autoDisconnect: boolean;
highWaterMark: number;

@@ -17,2 +20,3 @@ autoResume: boolean;

});
disconnect(): Promise<any>;
pauseTopics(): void;

@@ -19,0 +23,0 @@ resumeTopics(): void;

@@ -77,2 +77,3 @@ "use strict";

var defaultConsumerConfig = {
autoDisconnect: true,
highWaterMark: 100000,

@@ -82,2 +83,3 @@ autoResume: true,

};
var exitSignals = ['SIGINT', 'SIGTERM', "SIGUSR2"];
var ConsumerStream = /** @class */ (function (_super) {

@@ -93,2 +95,4 @@ __extends(ConsumerStream, _super);

_this.running = false;
_this.connected = true;
_this.signalTraps = [];
_this.running = false;

@@ -106,4 +110,52 @@ _this.on("pause", function () {

});
if (_this.config.autoDisconnect) {
_this.signalTraps = exitSignals.map(function (signal) {
return process.on(signal, function (code) { return __awaiter(_this, void 0, void 0, function () {
var attempt;
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
console.log("Attempting to gracefully shut down consumer for topic " + topic + " after " + code + " signal");
if (!this.connected) return [3 /*break*/, 2];
return [4 /*yield*/, this.disconnect()];
case 1:
attempt = _a.sent();
if (attempt == undefined) {
console.log("Consumer for topic " + topic + " successfully disconnected. Exiting");
}
else {
console.error(attempt);
process.exit(1);
}
_a.label = 2;
case 2:
process.exit();
return [2 /*return*/];
}
});
}); });
});
}
return _this;
}
ConsumerStream.prototype.disconnect = function () {
return __awaiter(this, void 0, void 0, function () {
var err_1;
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
_a.trys.push([0, 2, , 3]);
return [4 /*yield*/, this.consumer.disconnect()];
case 1:
_a.sent();
this.connected = false;
return [2 /*return*/];
case 2:
err_1 = _a.sent();
return [2 /*return*/, err_1];
case 3: return [2 /*return*/];
}
});
});
};
ConsumerStream.prototype.pauseTopics = function () {

@@ -194,9 +246,9 @@ this.consumer.pause([

exports.createConsumerStream = function (kafkaClientOrConfig, streamConfig) { return __awaiter(void 0, void 0, void 0, function () {
var topic, _a, fromBeginning, _b, highWaterMark, _c, autoResume, _d, resumeAfter, consumerConfig, client, consumer;
var _e, _f;
return __generator(this, function (_g) {
switch (_g.label) {
var topic, _a, fromBeginning, _b, highWaterMark, _c, autoResume, _d, resumeAfter, _e, autoDisconnect, consumerConfig, client, consumer;
var _f, _g;
return __generator(this, function (_h) {
switch (_h.label) {
case 0:
topic = streamConfig.topic, _a = streamConfig.fromBeginning, fromBeginning = _a === void 0 ? true : _a, _b = streamConfig.highWaterMark, highWaterMark = _b === void 0 ? 100000 : _b, _c = streamConfig.autoResume, autoResume = _c === void 0 ? true : _c, _d = streamConfig.resumeAfter, resumeAfter = _d === void 0 ? 1000 : _d, consumerConfig = __rest(streamConfig, ["topic", "fromBeginning", "highWaterMark", "autoResume", "resumeAfter"]);
client = ((_f = (_e = kafkaClientOrConfig) === null || _e === void 0 ? void 0 : _e.constructor) === null || _f === void 0 ? void 0 : _f.name) === 'Client'
topic = streamConfig.topic, _a = streamConfig.fromBeginning, fromBeginning = _a === void 0 ? true : _a, _b = streamConfig.highWaterMark, highWaterMark = _b === void 0 ? 100000 : _b, _c = streamConfig.autoResume, autoResume = _c === void 0 ? true : _c, _d = streamConfig.resumeAfter, resumeAfter = _d === void 0 ? 1000 : _d, _e = streamConfig.autoDisconnect, autoDisconnect = _e === void 0 ? true : _e, consumerConfig = __rest(streamConfig, ["topic", "fromBeginning", "highWaterMark", "autoResume", "resumeAfter", "autoDisconnect"]);
client = ((_g = (_f = kafkaClientOrConfig) === null || _f === void 0 ? void 0 : _f.constructor) === null || _g === void 0 ? void 0 : _g.name) === 'Client'
? kafkaClientOrConfig

@@ -209,3 +261,3 @@ : new kafkajs_1.Kafka(kafkaClientOrConfig);

// connect our consumer and subscribe
_g.sent();
_h.sent();
return [4 /*yield*/, consumer.subscribe({

@@ -216,7 +268,8 @@ topic: topic,

case 2:
_g.sent();
_h.sent();
return [2 /*return*/, new ConsumerStream(consumer, topic, {
highWaterMark: highWaterMark,
autoResume: autoResume,
resumeAfter: resumeAfter
resumeAfter: resumeAfter,
autoDisconnect: autoDisconnect
})];

@@ -223,0 +276,0 @@ }

{
"name": "@trivago/samsa",
"version": "0.3.0-beta.4",
"version": "0.3.0",
"types": "lib/index.d.ts",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

@@ -58,4 +58,5 @@ import { LevelUp } from "levelup";

resumeAfter?: number;
autoDisconnect?: boolean;
}
export type StreamErrorCallback = (error?: Error | null) => void;

@@ -6,2 +6,3 @@ import { StreamConfig, Message } from "../_types";

const defaultConsumerConfig = {
autoDisconnect: true,
highWaterMark: 100000,

@@ -11,6 +12,12 @@ autoResume: true,

};
const exitSignals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM',"SIGUSR2"];
class ConsumerStream extends Readable {
private buffer: Message[] = [];
private running: boolean = false;
private connected: boolean = true;
private signalTraps: NodeJS.Process[] = [];
constructor(

@@ -20,2 +27,3 @@ private consumer: Consumer,

private config: {
autoDisconnect: boolean;
highWaterMark: number;

@@ -43,4 +51,33 @@ autoResume: boolean;

});
if (this.config.autoDisconnect) {
this.signalTraps = exitSignals.map(signal => {
return process.on(signal, async (code) => {
console.log(`Attempting to gracefully shut down consumer for topic ${topic} after ${code} signal`);
if (this.connected) {
const attempt = await this.disconnect();
if (attempt == undefined) {
console.log(`Consumer for topic ${topic} successfully disconnected. Exiting`);
} else {
console.error(attempt);
process.exit(1);
}
}
process.exit();
})
});
}
}
async disconnect() {
try {
await this.consumer.disconnect();
this.connected = false;
return;
} catch (err) {
return err;
}
}
pauseTopics() {

@@ -136,3 +173,4 @@ this.consumer.pause([

autoResume = true,
resumeAfter = 1000, // allow for more fine grained control
resumeAfter = 1000, // allow for more fine grained contro
autoDisconnect = true,
...consumerConfig

@@ -158,4 +196,5 @@ } = streamConfig;

autoResume,
resumeAfter
resumeAfter,
autoDisconnect
});
};
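The `disconnect` implementation in this diff reports failure by returning the caught error rather than throwing, and the signal traps branch on that return value (`undefined` means success). A stand-alone model of that contract, with a hypothetical `MockConsumer` in place of the kafkajs `Consumer` that samsa actually wraps:

```typescript
// Hypothetical stand-in for the kafkajs Consumer used by samsa.
class MockConsumer {
    constructor(private shouldFail: boolean) {}
    async disconnect(): Promise<void> {
        if (this.shouldFail) throw new Error("broker unreachable");
    }
}

// Models the diff's error-as-return-value disconnect contract.
class ConsumerStreamModel {
    connected = true;
    constructor(private consumer: MockConsumer) {}

    // Returns undefined on success, the Error on failure; callers check the
    // return value instead of wrapping the call in try/catch.
    async disconnect(): Promise<Error | undefined> {
        try {
            await this.consumer.disconnect();
            this.connected = false;
            return;
        } catch (err) {
            return err as Error;
        }
    }
}
```

This is why the signal trap exits with code 1 only when `attempt` is defined: a defined return is the error itself.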