@trivago/samsa
Advanced tools
Comparing version 0.3.0 to 0.3.1
@@ -8,2 +8,14 @@ # Changelog | ||
## v0.3.1 | ||
- Features | ||
- Added meta information to messages being pushed from `createConsumerStream`: | ||
- topic | ||
- partition | ||
- offset | ||
- timestamp | ||
- size | ||
- attributes | ||
- headers | ||
## v0.3.0 | ||
@@ -10,0 +22,0 @@ |
/// <reference types="node" /> | ||
import { LevelUp } from "levelup"; | ||
import { AbstractLevelDOWN } from "abstract-leveldown"; | ||
import { ConsumerConfig } from "kafkajs"; | ||
import { ConsumerConfig, IHeaders } from "kafkajs"; | ||
/** | ||
@@ -19,2 +19,11 @@ * Describes a key that could possibly come from Kafka or another source. | ||
} | ||
export interface MessageMetaData { | ||
topic: string; | ||
partition: number; | ||
offset: string; | ||
timestamp: string; | ||
size: number; | ||
attributes: number; | ||
headers?: IHeaders; | ||
} | ||
/** | ||
@@ -25,2 +34,3 @@ * Describes a key value pair coming from KafkaJS, optionally contains the ability | ||
export interface Message extends KeyValuePair { | ||
metaData: MessageMetaData; | ||
commit?: () => void; | ||
@@ -27,0 +37,0 @@ } |
@@ -51,3 +51,2 @@ "use strict"; | ||
var ObjectReadable_1 = require("../utils/ObjectReadable"); | ||
var stream_1 = require("stream"); | ||
exports.from = function (ish) { | ||
@@ -81,5 +80,6 @@ if (ish instanceof Promise) { | ||
} | ||
if (stream_1.Readable.hasOwnProperty("from")) { | ||
return stream_1.Readable.from(ish); | ||
} | ||
// removing for now, as it causes issues with strings | ||
// if (Readable.hasOwnProperty("from")) { | ||
// return Readable.from(ish); | ||
// } | ||
return new ObjectReadable_1.ObjectReadable({ | ||
@@ -86,0 +86,0 @@ read: function () { |
@@ -177,9 +177,18 @@ "use strict"; | ||
eachBatch: function (_a) { | ||
var messages = _a.batch.messages, resolveOffset = _a.resolveOffset; | ||
var _b = _a.batch, messages = _b.messages, partition = _b.partition, topic = _b.topic, resolveOffset = _a.resolveOffset; | ||
_this.running = true; | ||
_this.buffer = _this.buffer.concat(messages.map(function (_a) { | ||
var key = _a.key, value = _a.value, offset = _a.offset; | ||
var key = _a.key, value = _a.value, offset = _a.offset, timestamp = _a.timestamp, size = _a.size, attributes = _a.attributes, headers = _a.headers; | ||
return ({ | ||
key: key, | ||
value: value, | ||
metaData: { | ||
topic: topic, | ||
partition: partition, | ||
offset: offset, | ||
timestamp: timestamp, | ||
size: size, | ||
attributes: attributes, | ||
headers: headers, | ||
}, | ||
commit: function () { return resolveOffset(offset); } | ||
@@ -205,6 +214,10 @@ }); | ||
var message = _c.value; | ||
var key = message.key, value = message.value, commit = message.commit; | ||
var _d = message, key = _d.key, value = _d.value, timestamp = _d.timestamp, size = _d.size, attributes = _d.attributes, headers = _d.headers, commit = _d.commit; | ||
this.push({ | ||
key: key, | ||
value: value | ||
value: value, | ||
timestamp: timestamp, | ||
size: size, | ||
attributes: attributes, | ||
headers: headers, | ||
}); | ||
@@ -211,0 +224,0 @@ if (commit && typeof commit === "function") { |
{ | ||
"name": "@trivago/samsa", | ||
"version": "0.3.0", | ||
"version": "0.3.1", | ||
"types": "lib/index.d.ts", | ||
@@ -5,0 +5,0 @@ "main": "lib/index.js", |
import { LevelUp } from "levelup"; | ||
import { AbstractLevelDOWN } from "abstract-leveldown"; | ||
import { ConsumerConfig, Batch } from "kafkajs"; | ||
import { ConsumerConfig, Batch, IHeaders } from "kafkajs"; | ||
@@ -22,2 +22,12 @@ /** | ||
export interface MessageMetaData { | ||
topic: string; | ||
partition: number; | ||
offset: string; | ||
timestamp: string; | ||
size: number; | ||
attributes: number; | ||
headers?: IHeaders; | ||
} | ||
/** | ||
@@ -28,2 +38,3 @@ * Describes a key value pair coming from KafkaJS, optionally contains the ability | ||
export interface Message extends KeyValuePair { | ||
metaData: MessageMetaData; | ||
commit?: () => void; | ||
@@ -30,0 +41,0 @@ } |
@@ -21,5 +21,7 @@ import { ObjectReadable } from "../utils/ObjectReadable"; | ||
if (Readable.hasOwnProperty("from")) { | ||
return Readable.from(ish); | ||
} | ||
// removing for now, as it causes issues with strings | ||
// if (Readable.hasOwnProperty("from")) { | ||
// return Readable.from(ish); | ||
// } | ||
return new ObjectReadable({ | ||
@@ -26,0 +28,0 @@ read() { |
import { StreamConfig, Message } from "../_types"; | ||
import { Kafka, KafkaConfig, Consumer } from "kafkajs"; | ||
import { Kafka, KafkaConfig, Consumer, KafkaMessage } from "kafkajs"; | ||
import { Readable } from "stream"; | ||
@@ -99,9 +99,18 @@ | ||
eachBatchAutoResolve: false, | ||
eachBatch: ({ batch: { messages }, resolveOffset }) => { | ||
eachBatch: ({ batch: { messages, partition, topic }, resolveOffset }) => { | ||
this.running = true; | ||
this.buffer = this.buffer.concat( | ||
messages.map(({ key, value, offset }) => ({ | ||
messages.map(({ key, value, offset, timestamp, size, attributes, headers }) => ({ | ||
key, | ||
value, | ||
metaData: { | ||
topic, | ||
partition, | ||
offset, | ||
timestamp, | ||
size, | ||
attributes, | ||
headers, | ||
}, | ||
commit: () => resolveOffset(offset) | ||
@@ -127,6 +136,19 @@ })) | ||
for (const message of this.buffer) { | ||
const { key, value, commit } = message; | ||
const { | ||
key, | ||
value, | ||
timestamp, | ||
size, | ||
attributes, | ||
headers, | ||
commit | ||
} = message as (Message & KafkaMessage); | ||
this.push({ | ||
key, | ||
value | ||
value, | ||
timestamp, | ||
size, | ||
attributes, | ||
headers, | ||
}); | ||
@@ -133,0 +155,0 @@ if (commit && typeof commit === "function") { |
209067
4986