@trivago/samsa
Advanced tools
Comparing version 0.3.0-beta.4 to 0.3.0
@@ -8,2 +8,11 @@ # Changelog | ||
## v0.3.0 | ||
- Breaking Changes | ||
- move from LevelDB to RocksDB as the underlying store for joins | ||
- this change does not affect sinks however | ||
- Features | ||
- Added a configurable buffer to the joiner | ||
- Added the ability to automatically disconnect a consumer group on process exit | ||
## v0.2.4 | ||
@@ -10,0 +19,0 @@ |
@@ -22,11 +22,13 @@ # Stream Combination Operators | ||
| argument | description | | ||
| ------------ | ------------------------------------------------------------------------------------------------------------- | | ||
| primary | the stream containing primary keys | | ||
| foreign | the stream containing foreign keys | | ||
| project | the projection used to map the primary and foreign values together, defaults to return `{ primary, foreign }` | | ||
| window | the amount of time in seconds to keep cached values alive, defaults to `0` | | ||
| KTableConfig | takes a `batchSize` and `batchAge` to determine how often the given streams are cached | | ||
| argument | description | | ||
| ----------------- | ------------------------------------------------------------------------------------------------------------- | | ||
| primary | the stream containing primary keys | | ||
| foreign | the stream containing foreign keys | | ||
| projection | the projection used to map the primary and foreign values together, defaults to return `{ primary, foreign }` | | ||
| maxKeyBufferSize | maximum number of messages to buffer | | ||
| keyBufferInterval | number of ms to wait before processing messages in the buffer | | ||
| <!-- | window | the amount of time in seconds to keep cached values alive, defaults to `0` | --> | | ||
| <!-- | KTableConfig | takes a `batchSize` and `batchAge` to determine how often the given streams are cached | --> | | ||
Underlying every join is a `KTable`, which in this case is a key-value cache. The `KTable` uses an instance of `LevelDB` to store and retrieve data from the cache in a reasonable manner. In a Kafka context, this could technically be considered a `KTable to KTable` join. | ||
Underlying every join is a `KTable`, which in this case is a key-value cache. The `KTable` uses an instance of `RocksDB` to store and retrieve data from the cache in a reasonable manner. In a Kafka context, this could technically be considered a `KTable to KTable` join. | ||
@@ -33,0 +35,0 @@ **Note:** `join` is an alias for `innerJoin` |
@@ -18,10 +18,11 @@ # Kafka Streams | ||
| option | required | default | description | | ||
| ------------- | -------- | --------- | --------------------------------------------------------------------------------------------------------------------- | | ||
| groupId | yes | undefined | the groupId to use for the underlying kafkaJS consumer | | ||
| topic | yes | undefined | the topic to subscribe to | | ||
| fromBeginning | no | true | whether or not to start consuming from the beginning of the topic | | ||
| highWaterMark | no | 20k | the maximum number of messages that can be consumed at once, more messages than this will cause the consumer to pause | | ||
| autoResume | no | true | whether or not to automatically resume once a set interval has passed | | ||
| resumeAfter | no | 1000 | the time, in ms, to wait before resuming consuming messages | | ||
| option | required | default | description | | ||
| -------------- | -------- | --------- | --------------------------------------------------------------------------------------------------------------------- | | ||
| groupId | yes | undefined | the groupId to use for the underlying kafkaJS consumer | | ||
| topic | yes | undefined | the topic to subscribe to | | ||
| autoDisconnect | no | true | tells the stream whether or not to automatically disconnect the consumer | | ||
| fromBeginning | no | true | whether or not to start consuming from the beginning of the topic | | ||
| highWaterMark | no | 20k | the maximum number of messages that can be consumed at once, more messages than this will cause the consumer to pause | | ||
| autoResume | no | true | whether or not to automatically resume once a set interval has passed | | ||
| resumeAfter | no | 1000 | the time, in ms, to wait before resuming consuming messages | | ||
@@ -28,0 +29,0 @@ Checkout the [KafkaJS documentation](https://kafka.js.org/docs/consuming#a-name-options-a-options) for a full list of consumer options. |
@@ -47,4 +47,5 @@ /// <reference types="node" /> | ||
resumeAfter?: number; | ||
autoDisconnect?: boolean; | ||
} | ||
export declare type StreamErrorCallback = (error?: Error | null) => void; | ||
export {}; |
@@ -11,3 +11,6 @@ /// <reference types="node" /> | ||
private running; | ||
private connected; | ||
private signalTraps; | ||
constructor(consumer: Consumer, topic: string, config?: { | ||
autoDisconnect: boolean; | ||
highWaterMark: number; | ||
@@ -17,2 +20,3 @@ autoResume: boolean; | ||
}); | ||
disconnect(): Promise<any>; | ||
pauseTopics(): void; | ||
@@ -19,0 +23,0 @@ resumeTopics(): void; |
@@ -77,2 +77,3 @@ "use strict"; | ||
var defaultConsumerConfig = { | ||
autoDisconnect: true, | ||
highWaterMark: 100000, | ||
@@ -82,2 +83,3 @@ autoResume: true, | ||
}; | ||
var exitSignals = ['SIGINT', 'SIGTERM', "SIGUSR2"]; | ||
var ConsumerStream = /** @class */ (function (_super) { | ||
@@ -93,2 +95,4 @@ __extends(ConsumerStream, _super); | ||
_this.running = false; | ||
_this.connected = true; | ||
_this.signalTraps = []; | ||
_this.running = false; | ||
@@ -106,4 +110,52 @@ _this.on("pause", function () { | ||
}); | ||
if (_this.config.autoDisconnect) { | ||
_this.signalTraps = exitSignals.map(function (signal) { | ||
return process.on(signal, function (code) { return __awaiter(_this, void 0, void 0, function () { | ||
var attempt; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
console.log("Attempting to gracefully shut down consumer for topic " + topic + " after " + code + " signal"); | ||
if (!this.connected) return [3 /*break*/, 2]; | ||
return [4 /*yield*/, this.disconnect()]; | ||
case 1: | ||
attempt = _a.sent(); | ||
if (attempt == undefined) { | ||
console.log("Consumer for topic " + topic + " successfully disconnected. Exiting"); | ||
} | ||
else { | ||
console.error(attempt); | ||
process.exit(1); | ||
} | ||
_a.label = 2; | ||
case 2: | ||
process.exit(); | ||
return [2 /*return*/]; | ||
} | ||
}); | ||
}); }); | ||
}); | ||
} | ||
return _this; | ||
} | ||
ConsumerStream.prototype.disconnect = function () { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var err_1; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
_a.trys.push([0, 2, , 3]); | ||
return [4 /*yield*/, this.consumer.disconnect()]; | ||
case 1: | ||
_a.sent(); | ||
this.connected = false; | ||
return [2 /*return*/]; | ||
case 2: | ||
err_1 = _a.sent(); | ||
return [2 /*return*/, err_1]; | ||
case 3: return [2 /*return*/]; | ||
} | ||
}); | ||
}); | ||
}; | ||
ConsumerStream.prototype.pauseTopics = function () { | ||
@@ -194,9 +246,9 @@ this.consumer.pause([ | ||
exports.createConsumerStream = function (kafkaClientOrConfig, streamConfig) { return __awaiter(void 0, void 0, void 0, function () { | ||
var topic, _a, fromBeginning, _b, highWaterMark, _c, autoResume, _d, resumeAfter, consumerConfig, client, consumer; | ||
var _e, _f; | ||
return __generator(this, function (_g) { | ||
switch (_g.label) { | ||
var topic, _a, fromBeginning, _b, highWaterMark, _c, autoResume, _d, resumeAfter, _e, autoDisconnect, consumerConfig, client, consumer; | ||
var _f, _g; | ||
return __generator(this, function (_h) { | ||
switch (_h.label) { | ||
case 0: | ||
topic = streamConfig.topic, _a = streamConfig.fromBeginning, fromBeginning = _a === void 0 ? true : _a, _b = streamConfig.highWaterMark, highWaterMark = _b === void 0 ? 100000 : _b, _c = streamConfig.autoResume, autoResume = _c === void 0 ? true : _c, _d = streamConfig.resumeAfter, resumeAfter = _d === void 0 ? 1000 : _d, consumerConfig = __rest(streamConfig, ["topic", "fromBeginning", "highWaterMark", "autoResume", "resumeAfter"]); | ||
client = ((_f = (_e = kafkaClientOrConfig) === null || _e === void 0 ? void 0 : _e.constructor) === null || _f === void 0 ? void 0 : _f.name) === 'Client' | ||
topic = streamConfig.topic, _a = streamConfig.fromBeginning, fromBeginning = _a === void 0 ? true : _a, _b = streamConfig.highWaterMark, highWaterMark = _b === void 0 ? 100000 : _b, _c = streamConfig.autoResume, autoResume = _c === void 0 ? true : _c, _d = streamConfig.resumeAfter, resumeAfter = _d === void 0 ? 1000 : _d, _e = streamConfig.autoDisconnect, autoDisconnect = _e === void 0 ? true : _e, consumerConfig = __rest(streamConfig, ["topic", "fromBeginning", "highWaterMark", "autoResume", "resumeAfter", "autoDisconnect"]); | ||
client = ((_g = (_f = kafkaClientOrConfig) === null || _f === void 0 ? void 0 : _f.constructor) === null || _g === void 0 ? void 0 : _g.name) === 'Client' | ||
? kafkaClientOrConfig | ||
@@ -209,3 +261,3 @@ : new kafkajs_1.Kafka(kafkaClientOrConfig); | ||
// connect our consumer and subscribe | ||
_g.sent(); | ||
_h.sent(); | ||
return [4 /*yield*/, consumer.subscribe({ | ||
@@ -216,7 +268,8 @@ topic: topic, | ||
case 2: | ||
_g.sent(); | ||
_h.sent(); | ||
return [2 /*return*/, new ConsumerStream(consumer, topic, { | ||
highWaterMark: highWaterMark, | ||
autoResume: autoResume, | ||
resumeAfter: resumeAfter | ||
resumeAfter: resumeAfter, | ||
autoDisconnect: autoDisconnect | ||
})]; | ||
@@ -223,0 +276,0 @@ } |
{ | ||
"name": "@trivago/samsa", | ||
"version": "0.3.0-beta.4", | ||
"version": "0.3.0", | ||
"types": "lib/index.d.ts", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
@@ -58,4 +58,5 @@ import { LevelUp } from "levelup"; | ||
resumeAfter?: number; | ||
autoDisconnect?: boolean; | ||
} | ||
export type StreamErrorCallback = (error?: Error | null) => void; |
@@ -6,2 +6,3 @@ import { StreamConfig, Message } from "../_types"; | ||
const defaultConsumerConfig = { | ||
autoDisconnect: true, | ||
highWaterMark: 100000, | ||
@@ -11,6 +12,12 @@ autoResume: true, | ||
}; | ||
const exitSignals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM',"SIGUSR2"]; | ||
class ConsumerStream extends Readable { | ||
private buffer: Message[] = []; | ||
private running: boolean = false; | ||
private connected: boolean = true; | ||
private signalTraps: NodeJS.Process[] = []; | ||
constructor( | ||
@@ -20,2 +27,3 @@ private consumer: Consumer, | ||
private config: { | ||
autoDisconnect: boolean; | ||
highWaterMark: number; | ||
@@ -43,4 +51,33 @@ autoResume: boolean; | ||
}); | ||
if (this.config.autoDisconnect) { | ||
this.signalTraps = exitSignals.map(signal => { | ||
return process.on(signal, async (code) => { | ||
console.log(`Attempting to gracefully shut down consumer for topic ${topic} after ${code} signal`); | ||
if (this.connected) { | ||
const attempt = await this.disconnect(); | ||
if (attempt == undefined) { | ||
console.log(`Consumer for topic ${topic} successfully disconnected. Exiting`); | ||
} else { | ||
console.error(attempt); | ||
process.exit(1); | ||
} | ||
} | ||
process.exit(); | ||
}) | ||
}); | ||
} | ||
} | ||
async disconnect() { | ||
try { | ||
await this.consumer.disconnect(); | ||
this.connected = false; | ||
return; | ||
} catch (err) { | ||
return err; | ||
} | ||
} | ||
pauseTopics() { | ||
@@ -136,3 +173,4 @@ this.consumer.pause([ | ||
autoResume = true, | ||
resumeAfter = 1000, // allow for more fine grained control | ||
resumeAfter = 1000, // allow for more fine grained contro | ||
autoDisconnect = true, | ||
...consumerConfig | ||
@@ -158,4 +196,5 @@ } = streamConfig; | ||
autoResume, | ||
resumeAfter | ||
resumeAfter, | ||
autoDisconnect | ||
}); | ||
}; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
206745
4931