New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@trivago/samsa

Package Overview
Dependencies
Maintainers
3
Versions
19
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@trivago/samsa - npm Package Compare versions

Comparing version 0.3.0 to 0.3.1

12

CHANGELOG.md

@@ -8,2 +8,14 @@ # Changelog

## v0.3.1
- Features
- Added meta information to messages being pushed from `createConsumerStream`:
- topic
- partition
- offset
- timestamp
- size
- attributes
- headers
## v0.3.0

@@ -10,0 +22,0 @@

12

lib/_types.d.ts
/// <reference types="node" />
import { LevelUp } from "levelup";
import { AbstractLevelDOWN } from "abstract-leveldown";
import { ConsumerConfig } from "kafkajs";
import { ConsumerConfig, IHeaders } from "kafkajs";
/**

@@ -19,2 +19,11 @@ * Describes a key that could possibly come from Kafka or another source.

}
export interface MessageMetaData {
topic: string;
partition: number;
offset: string;
timestamp: string;
size: number;
attributes: number;
headers?: IHeaders;
}
/**

@@ -25,2 +34,3 @@ * Describes a key value pair coming from KafkaJS, optionally contains the ability

export interface Message extends KeyValuePair {
metaData: MessageMetaData;
commit?: () => void;

@@ -27,0 +37,0 @@ }

8

lib/creators/from.js

@@ -51,3 +51,2 @@ "use strict";

var ObjectReadable_1 = require("../utils/ObjectReadable");
var stream_1 = require("stream");
exports.from = function (ish) {

@@ -81,5 +80,6 @@ if (ish instanceof Promise) {

}
if (stream_1.Readable.hasOwnProperty("from")) {
return stream_1.Readable.from(ish);
}
// removing for now, as it causes issues with strings
// if (Readable.hasOwnProperty("from")) {
// return Readable.from(ish);
// }
return new ObjectReadable_1.ObjectReadable({

@@ -86,0 +86,0 @@ read: function () {

@@ -177,9 +177,18 @@ "use strict";

eachBatch: function (_a) {
var messages = _a.batch.messages, resolveOffset = _a.resolveOffset;
var _b = _a.batch, messages = _b.messages, partition = _b.partition, topic = _b.topic, resolveOffset = _a.resolveOffset;
_this.running = true;
_this.buffer = _this.buffer.concat(messages.map(function (_a) {
var key = _a.key, value = _a.value, offset = _a.offset;
var key = _a.key, value = _a.value, offset = _a.offset, timestamp = _a.timestamp, size = _a.size, attributes = _a.attributes, headers = _a.headers;
return ({
key: key,
value: value,
metaData: {
topic: topic,
partition: partition,
offset: offset,
timestamp: timestamp,
size: size,
attributes: attributes,
headers: headers,
},
commit: function () { return resolveOffset(offset); }

@@ -205,6 +214,10 @@ });

var message = _c.value;
var key = message.key, value = message.value, commit = message.commit;
var _d = message, key = _d.key, value = _d.value, timestamp = _d.timestamp, size = _d.size, attributes = _d.attributes, headers = _d.headers, commit = _d.commit;
this.push({
key: key,
value: value
value: value,
timestamp: timestamp,
size: size,
attributes: attributes,
headers: headers,
});

@@ -211,0 +224,0 @@ if (commit && typeof commit === "function") {

{
"name": "@trivago/samsa",
"version": "0.3.0",
"version": "0.3.1",
"types": "lib/index.d.ts",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

import { LevelUp } from "levelup";
import { AbstractLevelDOWN } from "abstract-leveldown";
import { ConsumerConfig, Batch } from "kafkajs";
import { ConsumerConfig, Batch, IHeaders } from "kafkajs";

@@ -22,2 +22,12 @@ /**

export interface MessageMetaData {
topic: string;
partition: number;
offset: string;
timestamp: string;
size: number;
attributes: number;
headers?: IHeaders;
}
/**

@@ -28,2 +38,3 @@ * Describes a key value pair coming from KafkaJS, optionally contains the ability

export interface Message extends KeyValuePair {
metaData: MessageMetaData;
commit?: () => void;

@@ -30,0 +41,0 @@ }

@@ -21,5 +21,7 @@ import { ObjectReadable } from "../utils/ObjectReadable";

if (Readable.hasOwnProperty("from")) {
return Readable.from(ish);
}
// removing for now, as it causes issues with strings
// if (Readable.hasOwnProperty("from")) {
// return Readable.from(ish);
// }
return new ObjectReadable({

@@ -26,0 +28,0 @@ read() {

import { StreamConfig, Message } from "../_types";
import { Kafka, KafkaConfig, Consumer } from "kafkajs";
import { Kafka, KafkaConfig, Consumer, KafkaMessage } from "kafkajs";
import { Readable } from "stream";

@@ -99,9 +99,18 @@

eachBatchAutoResolve: false,
eachBatch: ({ batch: { messages }, resolveOffset }) => {
eachBatch: ({ batch: { messages, partition, topic }, resolveOffset }) => {
this.running = true;
this.buffer = this.buffer.concat(
messages.map(({ key, value, offset }) => ({
messages.map(({ key, value, offset, timestamp, size, attributes, headers }) => ({
key,
value,
metaData: {
topic,
partition,
offset,
timestamp,
size,
attributes,
headers,
},
commit: () => resolveOffset(offset)

@@ -127,6 +136,19 @@ }))

for (const message of this.buffer) {
const { key, value, commit } = message;
const {
key,
value,
timestamp,
size,
attributes,
headers,
commit
} = message as (Message & KafkaMessage);
this.push({
key,
value
value,
timestamp,
size,
attributes,
headers,
});

@@ -133,0 +155,0 @@ if (commit && typeof commit === "function") {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc