kafka-observe
Advanced tools
Comparing version 2.0.3 to 2.0.4-beta-1
@@ -12,4 +12,2 @@ function generateRandomToken({ length } = { length: 24 }) { | ||
module.exports = { | ||
generateRandomToken, | ||
}; | ||
module.exports = { generateRandomToken }; |
19
index.js
const { Consumer, ConsumerGroup, Producer } = require("./IO"); | ||
const { callbackController } = require("./controllers"); | ||
const { kafkaConfig } = require("./config"); | ||
@@ -28,3 +28,4 @@ let instance, consumer, consumerGroup, producer; | ||
module.exports = (options) => { | ||
if (instance) { | ||
if(instance) { | ||
return instance; | ||
@@ -47,3 +48,3 @@ } | ||
if (enableConsumer) { | ||
if(enableConsumer) { | ||
consumer = new Consumer({ | ||
@@ -61,3 +62,3 @@ topicsToFollow: topicsToFollow | ||
if (enableConsumerGroup) { | ||
if(enableConsumerGroup) { | ||
consumerGroup = new ConsumerGroup({ | ||
@@ -96,12 +97,2 @@ topicsToFollow: topicsToFollow | ||
consumer.getTopicSubject(topic), | ||
/** | ||
* @description | ||
* Callback sends out a events and waits on for one off the listeners to return a response | ||
* @param {String} topic The topic where the event will be send on | ||
* @param {String} sender The event that will be send | ||
* @param {Array <String>} listeners List of the events the callback should listen for | ||
* @param {Object} payload Payload data to send with the event | ||
*/ | ||
eventCallback: callbackController({ producer, consumer }), | ||
}; | ||
@@ -108,0 +99,0 @@ |
@@ -22,3 +22,3 @@ const { Kafka, logLevel } = require("kafkajs"); | ||
initObservables() { | ||
if (this.options.topicsToFollow.forEach) { | ||
if(this.options.topicsToFollow.forEach) { | ||
this.options.topicsToFollow.forEach(topic => { | ||
@@ -33,3 +33,3 @@ this[`${topic}Subject`] = new Subject(); | ||
sendUpdateToObservers({ topic, payload, headers }) { | ||
this[`${topic}Subject`].next({ ...payload, headers }); | ||
this[`${topic}Subject`].next({...payload, headers}); | ||
} | ||
@@ -46,2 +46,3 @@ | ||
maxBytesPerPartition: this.options.maxBytesPerPartition || kafkaConfig.maxBytesPerPartition, | ||
maxWaitTimeInMs: 25, | ||
}); | ||
@@ -64,6 +65,6 @@ | ||
try { | ||
try{ | ||
await Promise.all(this.options.topicsToFollow.map(topic => this.consumer.subscribe({ topic }))); | ||
} catch (e0) { | ||
try { | ||
try{ | ||
logging.error(`Error during subscribing: | ||
@@ -83,3 +84,3 @@ ${e0}`); | ||
try { | ||
try{ | ||
await this.consumer.run({ | ||
@@ -103,3 +104,3 @@ autoCommitInterval: kafkaConfig.autoCommitInterval, | ||
if (this.options.logAllEvents === "true") { | ||
if(this.options.logAllEvents === "true") { | ||
logging.report({ | ||
@@ -106,0 +107,0 @@ topic: topic, |
@@ -22,3 +22,3 @@ const { Kafka, logLevel } = require("kafkajs"); | ||
initObservables() { | ||
if (this.options.topicsToFollow.forEach) { | ||
if(this.options.topicsToFollow.forEach) { | ||
this.options.topicsToFollow.forEach(topic => { | ||
@@ -33,3 +33,3 @@ this[`${topic}Subject`] = new Subject(); | ||
sendUpdateToObservers({ topic, payload, headers }) { | ||
this[`${topic}Subject`].next({ ...payload, headers }); | ||
this[`${topic}Subject`].next({...payload, headers}); | ||
} | ||
@@ -50,2 +50,3 @@ | ||
maxBytesPerPartition: this.options.maxBytesPerPartition || kafkaConfig.maxBytesPerPartition, | ||
maxWaitTimeInMs: 25, | ||
}); | ||
@@ -68,6 +69,6 @@ | ||
try { | ||
try{ | ||
await Promise.all(this.options.topicsToFollow.map(topic => this.consumer.subscribe({ topic }))); | ||
} catch (e0) { | ||
try { | ||
try{ | ||
logging.error(`Error during subscribing: | ||
@@ -87,3 +88,3 @@ ${e0}`); | ||
try { | ||
try{ | ||
await this.consumer.run({ | ||
@@ -107,3 +108,3 @@ autoCommitInterval: kafkaConfig.autoCommitInterval, | ||
if (this.options.logAllEvents === "true") { | ||
if(this.options.logAllEvents === "true") { | ||
logging.report({ | ||
@@ -110,0 +111,0 @@ topic: topic, |
@@ -8,3 +8,3 @@ | ||
module.exports = class Producer { | ||
module.exports = class Producer{ | ||
constructor(options) { | ||
@@ -16,3 +16,3 @@ this.options = options; | ||
this.client = new Kafka({ | ||
logLevel: logLevel.INFO, | ||
logLevel: logLevel. INFO, | ||
brokers: [options.kafkaHost], | ||
@@ -27,18 +27,19 @@ clientId: options.serviceId, | ||
.then(() => { | ||
this.ready = true; | ||
this.queue.forEach(messageMetadata => this.sendMessage(messageMetadata)); | ||
this.ready = true; | ||
this.queue.forEach(messageMetadata => this.sendMessage(messageMetadata)); | ||
this.livenessInterval = setInterval(() => this.sendMessage({ | ||
payload: { | ||
event: this.options.liveness.event || kafkaConfig.events.liveness.ALIVE_TICK, | ||
service: this.options.serviceId, | ||
}, | ||
topic: this.options.liveness.topic, | ||
}), this.options.liveness.interval); | ||
this.livenessInterval = setInterval(() => this.sendMessage({ | ||
payload: { | ||
event: this.options.liveness.event || kafkaConfig.events.liveness.ALIVE_TICK, | ||
service: this.options.serviceId, | ||
}, | ||
topic: this.options.liveness.topic, | ||
}), this.options.liveness.interval); | ||
}) | ||
.catch(error => logging.error(error)); | ||
.catch(error => logging.error(error)) | ||
} | ||
async sendMessage({ payload, topic, headers }) { | ||
if (!this.ready) { | ||
if(!this.ready) { | ||
this.queue.push({ payload, topic }); | ||
@@ -53,5 +54,6 @@ return; | ||
headers, | ||
acks: 1, // only wait for leader acceptance | ||
}], | ||
}); | ||
} | ||
}; | ||
} |
{ | ||
"name": "kafka-observe", | ||
"version": "2.0.3", | ||
"version": "2.0.4-beta-1", | ||
"description": "", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "NODE_ENV=test ./node_modules/mocha/bin/mocha ./test/test.config.js ./test/unit/**/*.js --exit --timeout 5000", | ||
"coverage": "nyc npm run test" | ||
"test": "echo \"Error: no test specified\" && exit 1" | ||
}, | ||
@@ -13,9 +12,5 @@ "author": "", | ||
"devDependencies": { | ||
"chai": "^4.2.0", | ||
"eslint": "^5.5.0", | ||
"eslint-plugin-jest": "^22.7.2", | ||
"eslint-plugin-prefer-object-spread": "^1.2.1", | ||
"mocha": "^7.2.0", | ||
"nyc": "^15.1.0", | ||
"sinon": "^9.0.2", | ||
"tslint": "^5.11.0" | ||
@@ -22,0 +17,0 @@ }, |
@@ -57,4 +57,3 @@ # kafka-observe | ||
`producer`: instance of a connected producer | ||
`getTopicSubject`: a function that returns a topic to subscribe to with the correct strategy. | ||
`eventCallback`: a function that sends out an event and listens for a response | ||
`getTopicSubject`: a function that returns a topic to subscribe to with the correct strategy. | ||
@@ -68,3 +67,3 @@ ## getTopicSubject | ||
getTopicSubject({ | ||
getTopicSubject.getTopicSubject({ | ||
topic: "topic name", | ||
@@ -77,19 +76,2 @@ loadBalanced: "true|false", // True for Loadbalanced strategy false for All strategy | ||
## eventCallback | ||
Returns a subject you can subscribe to listen to events of a certain topic. | ||
```javascript | ||
const { eventCallback } = require("kafka-observe")(options); | ||
eventCallback({ | ||
topic: "topic name", | ||
sender: "event", | ||
listeners: ["listen_for_event"], | ||
payload: {} // An object you want send with the event (needs to be JSON convertable) | ||
}).then((event) => { | ||
// do something with the event | ||
}); | ||
``` | ||
## producer | ||
@@ -96,0 +78,0 @@ |
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 6 instances in 1 package
No tests
QualityPackage does not have any tests. This is a strong signal of a poorly maintained or low quality package.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
21410
4
0
449
3
113
12