New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

kafka-observe

Package Overview
Dependencies
Maintainers
1
Versions
130
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafka-observe - npm Package Compare versions

Comparing version 2.0.3 to 2.0.4-beta-1

config/tests/commands.txt

4

helpers/generator.helpers.js

@@ -12,4 +12,2 @@ function generateRandomToken({ length } = { length: 24 }) {

module.exports = {
generateRandomToken,
};
module.exports = { generateRandomToken };
const { Consumer, ConsumerGroup, Producer } = require("./IO");
const { callbackController } = require("./controllers");
const { kafkaConfig } = require("./config");

@@ -28,3 +28,4 @@ let instance, consumer, consumerGroup, producer;

module.exports = (options) => {
if (instance) {
if(instance) {
return instance;

@@ -47,3 +48,3 @@ }

if (enableConsumer) {
if(enableConsumer) {
consumer = new Consumer({

@@ -61,3 +62,3 @@ topicsToFollow: topicsToFollow

if (enableConsumerGroup) {
if(enableConsumerGroup) {
consumerGroup = new ConsumerGroup({

@@ -96,12 +97,2 @@ topicsToFollow: topicsToFollow

consumer.getTopicSubject(topic),
/**
* @description
* Callback sends out a events and waits on for one off the listeners to return a response
* @param {String} topic The topic where the event will be send on
* @param {String} sender The event that will be send
* @param {Array <String>} listeners List of the events the callback should listen for
* @param {Object} payload Payload data to send with the event
*/
eventCallback: callbackController({ producer, consumer }),
};

@@ -108,0 +99,0 @@

@@ -22,3 +22,3 @@ const { Kafka, logLevel } = require("kafkajs");

initObservables() {
if (this.options.topicsToFollow.forEach) {
if(this.options.topicsToFollow.forEach) {
this.options.topicsToFollow.forEach(topic => {

@@ -33,3 +33,3 @@ this[`${topic}Subject`] = new Subject();

sendUpdateToObservers({ topic, payload, headers }) {
this[`${topic}Subject`].next({ ...payload, headers });
this[`${topic}Subject`].next({...payload, headers});
}

@@ -46,2 +46,3 @@

maxBytesPerPartition: this.options.maxBytesPerPartition || kafkaConfig.maxBytesPerPartition,
maxWaitTimeInMs: 25,
});

@@ -64,6 +65,6 @@

try {
try{
await Promise.all(this.options.topicsToFollow.map(topic => this.consumer.subscribe({ topic })));
} catch (e0) {
try {
try{
logging.error(`Error during subscribing:

@@ -83,3 +84,3 @@ ${e0}`);

try {
try{
await this.consumer.run({

@@ -103,3 +104,3 @@ autoCommitInterval: kafkaConfig.autoCommitInterval,

if (this.options.logAllEvents === "true") {
if(this.options.logAllEvents === "true") {
logging.report({

@@ -106,0 +107,0 @@ topic: topic,

@@ -22,3 +22,3 @@ const { Kafka, logLevel } = require("kafkajs");

initObservables() {
if (this.options.topicsToFollow.forEach) {
if(this.options.topicsToFollow.forEach) {
this.options.topicsToFollow.forEach(topic => {

@@ -33,3 +33,3 @@ this[`${topic}Subject`] = new Subject();

sendUpdateToObservers({ topic, payload, headers }) {
this[`${topic}Subject`].next({ ...payload, headers });
this[`${topic}Subject`].next({...payload, headers});
}

@@ -50,2 +50,3 @@

maxBytesPerPartition: this.options.maxBytesPerPartition || kafkaConfig.maxBytesPerPartition,
maxWaitTimeInMs: 25,
});

@@ -68,6 +69,6 @@

try {
try{
await Promise.all(this.options.topicsToFollow.map(topic => this.consumer.subscribe({ topic })));
} catch (e0) {
try {
try{
logging.error(`Error during subscribing:

@@ -87,3 +88,3 @@ ${e0}`);

try {
try{
await this.consumer.run({

@@ -107,3 +108,3 @@ autoCommitInterval: kafkaConfig.autoCommitInterval,

if (this.options.logAllEvents === "true") {
if(this.options.logAllEvents === "true") {
logging.report({

@@ -110,0 +111,0 @@ topic: topic,

@@ -8,3 +8,3 @@

module.exports = class Producer {
module.exports = class Producer{
constructor(options) {

@@ -16,3 +16,3 @@ this.options = options;

this.client = new Kafka({
logLevel: logLevel.INFO,
logLevel: logLevel. INFO,
brokers: [options.kafkaHost],

@@ -27,18 +27,19 @@ clientId: options.serviceId,

.then(() => {
this.ready = true;
this.queue.forEach(messageMetadata => this.sendMessage(messageMetadata));
this.ready = true;
this.queue.forEach(messageMetadata => this.sendMessage(messageMetadata));
this.livenessInterval = setInterval(() => this.sendMessage({
payload: {
event: this.options.liveness.event || kafkaConfig.events.liveness.ALIVE_TICK,
service: this.options.serviceId,
},
topic: this.options.liveness.topic,
}), this.options.liveness.interval);
this.livenessInterval = setInterval(() => this.sendMessage({
payload: {
event: this.options.liveness.event || kafkaConfig.events.liveness.ALIVE_TICK,
service: this.options.serviceId,
},
topic: this.options.liveness.topic,
}), this.options.liveness.interval);
})
.catch(error => logging.error(error));
.catch(error => logging.error(error))
}
async sendMessage({ payload, topic, headers }) {
if (!this.ready) {
if(!this.ready) {
this.queue.push({ payload, topic });

@@ -53,5 +54,6 @@ return;

headers,
acks: 1, // only wait for leader acceptance
}],
});
}
};
}
{
"name": "kafka-observe",
"version": "2.0.3",
"version": "2.0.4-beta-1",
"description": "",
"main": "index.js",
"scripts": {
"test": "NODE_ENV=test ./node_modules/mocha/bin/mocha ./test/test.config.js ./test/unit/**/*.js --exit --timeout 5000",
"coverage": "nyc npm run test"
"test": "echo \"Error: no test specified\" && exit 1"
},

@@ -13,9 +12,5 @@ "author": "",

"devDependencies": {
"chai": "^4.2.0",
"eslint": "^5.5.0",
"eslint-plugin-jest": "^22.7.2",
"eslint-plugin-prefer-object-spread": "^1.2.1",
"mocha": "^7.2.0",
"nyc": "^15.1.0",
"sinon": "^9.0.2",
"tslint": "^5.11.0"

@@ -22,0 +17,0 @@ },

@@ -57,4 +57,3 @@ # kafka-observe

`producer`: instance of a connected producer
`getTopicSubject`: a function that returns a topic to subscribe to with the correct strategy.
`eventCallback`: a function that sends out an event and listens for a response
`getTopicSubject`: a function that returns a topic to subscribe to with the correct strategy.

@@ -68,3 +67,3 @@ ## getTopicSubject

getTopicSubject({
getTopicSubject.getTopicSubject({
topic: "topic name",

@@ -77,19 +76,2 @@ loadBalanced: "true|false", // True for Loadbalanced strategy false for All strategy

## eventCallback
Returns a subject you can subscribe to listen to events of a certain topic.
```javascript
const { eventCallback } = require("kafka-observe")(options);
eventCallback({
topic: "topic name",
sender: "event",
listeners: ["listen_for_event"],
payload: {} // An object you want send with the event (needs to be JSON convertable)
}).then((event) => {
// do something with the event
});
```
## producer

@@ -96,0 +78,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc