New Case Study: See how Anthropic automated 95% of dependency reviews with Socket. Learn More
Socket
Sign inDemoInstall
Socket

kafka-observe

Package Overview
Dependencies
Maintainers
3
Versions
130
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafka-observe - npm Package Compare versions

Comparing version 2.1.1-beta-8 to 2.1.1-beta-9

79

controllers/stream.controller.js

@@ -6,28 +6,3 @@ const { filter } = require("rxjs/operators");

module.exports = {
write: ({ producer }) => ({ topic, headers, event, payload, limit = 819200 }) => {
const data = JSON.stringify(payload);
const totalSize = data.length;
const total = Math.ceil(totalSize / limit);
for (let page = 1; page <= total; page++) {
const offset = (page - 1) * limit;
const result = data.slice(offset, offset + limit);
producer.sendMessage({
headers: {
...headers,
page,
total,
limit,
totalSize,
size: result.length,
},
topic,
payload: {
payload: result,
event,
},
});
}
},
write: writeStream,
read: ({ consumer }) => ({ topic, events, guard, onStatus }) => new Promise((resolve, reject) => {

@@ -112,2 +87,54 @@ let result = {};

},
writeCallback: ({ consumer, producer }) => ({ topic, payload, sender, listeners, limit }) => {
const token = generatorHelpers.generateUuid();
function listenerForProducer({ topic, events, token }) {
return new Promise((resolve, reject) => {
const subscription = consumer.getTopicSubject(topic)
.pipe(filter(item => events.includes(item.event) && item.headers && item.headers.token === token))
.subscribe((item => {
subscription.unsubscribe();
return item.error ? reject(item) : resolve(item);
}));
});
}
writeStream({ producer })({ topic, event: sender, payload, limit });
return listenerForProducer({
topic,
events: listeners,
token,
});
},
};
/**
 * Shared chunked stream writer (default limit 524288).
 * JSON-serializes `payload`, splits the string into `limit`-sized slices and
 * produces one message per slice. Each message carries paging headers
 * (page starting at 1, total, limit, totalSize, size) so consumers can
 * reassemble the original payload in order.
 * NOTE(review): sizes are UTF-16 code units (`String.length`), not bytes —
 * confirm against the broker's actual byte limit for multibyte payloads.
 * @param {Object} deps Object exposing `producer.sendMessage`
 * @returns {Function} ({ topic, headers, event, payload, limit }) => void
 */
function writeStream({ producer }) {
  return ({ topic, headers, event, payload, limit = 524288 }) => {
    const serialized = JSON.stringify(payload);
    const totalSize = serialized.length;
    const total = Math.ceil(totalSize / limit);
    let page = 0;
    while (page < total) {
      const chunk = serialized.slice(page * limit, (page + 1) * limit);
      page += 1; // headers are 1-based
      producer.sendMessage({
        headers: {
          ...headers,
          page,
          total,
          limit,
          totalSize,
          size: chunk.length,
        },
        topic,
        payload: {
          payload: chunk,
          event,
        },
      });
    }
  };
}

@@ -125,3 +125,3 @@ const { Consumer, ConsumerGroup, Producer } = require("./IO");

* @description
* Consume stream data
* Produce event and consume stream data
* @param {String} topic The topic on which the stream is sent

@@ -137,2 +137,13 @@ * @param {String} sender The event that will be send

* @description
* Produce stream event and consume event
* @param {String} topic The topic on which the stream is sent
* @param {String} sender The event that will be sent
* @param {Array <String>} listeners List of the events the callback should listen for
* @param {Object} payload Payload data to send when producing
* @param {Number} limit The max data size in bytes each stream chunk can contain
*/
streamWriteCallback: streamController.writeCallback({ producer, consumer }),
/**
* @description
* Timeout wrapper for a callback; throws an error when the timeout elapses before a response is received

@@ -139,0 +150,0 @@ * @param {Function} action Async promise that will return a promise on finish

{
"name": "kafka-observe",
"version": "2.1.1-beta-8",
"version": "2.1.1-beta-9",
"description": "",

@@ -5,0 +5,0 @@ "main": "index.js",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc