kafka-observe
Advanced tools
Comparing version 2.0.2 to 2.0.3
@@ -12,2 +12,4 @@ function generateRandomToken({ length } = { length: 24 }) { | ||
module.exports = { generateRandomToken }; | ||
module.exports = { | ||
generateRandomToken, | ||
}; |
19
index.js
const { Consumer, ConsumerGroup, Producer } = require("./IO"); | ||
const { kafkaConfig } = require("./config"); | ||
const { callbackController } = require("./controllers"); | ||
@@ -28,4 +28,3 @@ let instance, consumer, consumerGroup, producer; | ||
module.exports = (options) => { | ||
if(instance) { | ||
if (instance) { | ||
return instance; | ||
@@ -48,3 +47,3 @@ } | ||
if(enableConsumer) { | ||
if (enableConsumer) { | ||
consumer = new Consumer({ | ||
@@ -62,3 +61,3 @@ topicsToFollow: topicsToFollow | ||
if(enableConsumerGroup) { | ||
if (enableConsumerGroup) { | ||
consumerGroup = new ConsumerGroup({ | ||
@@ -97,2 +96,12 @@ topicsToFollow: topicsToFollow | ||
consumer.getTopicSubject(topic), | ||
/** | ||
* @description | ||
* Callback sends out a events and waits on for one off the listeners to return a response | ||
* @param {String} topic The topic where the event will be send on | ||
* @param {String} sender The event that will be send | ||
* @param {Array <String>} listeners List of the events the callback should listen for | ||
* @param {Object} payload Payload data to send with the event | ||
*/ | ||
eventCallback: callbackController({ producer, consumer }), | ||
}; | ||
@@ -99,0 +108,0 @@ |
@@ -11,3 +11,3 @@ const { Kafka, logLevel } = require("kafkajs"); | ||
this.client = new Kafka({ | ||
logLevel: logLevel. INFO, | ||
logLevel: logLevel.INFO, | ||
brokers: [options.kafkaHost], | ||
@@ -23,3 +23,3 @@ clientId: options.serviceId, | ||
initObservables() { | ||
if(this.options.topicsToFollow.forEach) { | ||
if (this.options.topicsToFollow.forEach) { | ||
this.options.topicsToFollow.forEach(topic => { | ||
@@ -34,3 +34,3 @@ this[`${topic}Subject`] = new Subject(); | ||
sendUpdateToObservers({ topic, payload, headers }) { | ||
this[`${topic}Subject`].next({...payload, headers}); | ||
this[`${topic}Subject`].next({ ...payload, headers }); | ||
} | ||
@@ -64,6 +64,6 @@ | ||
try{ | ||
try { | ||
await Promise.all(this.options.topicsToFollow.map(topic => this.consumer.subscribe({ topic }))); | ||
} catch (e0) { | ||
try{ | ||
try { | ||
logging.error(`Error during subscribing: | ||
@@ -83,3 +83,3 @@ ${e0}`); | ||
try{ | ||
try { | ||
await this.consumer.run({ | ||
@@ -103,3 +103,3 @@ autoCommitInterval: kafkaConfig.autoCommitInterval, | ||
if(this.options.logAllEvents === "true") { | ||
if (this.options.logAllEvents === "true") { | ||
logging.report({ | ||
@@ -106,0 +106,0 @@ topic: topic, |
@@ -11,3 +11,3 @@ const { Kafka, logLevel } = require("kafkajs"); | ||
this.client = new Kafka({ | ||
logLevel: logLevel. INFO, | ||
logLevel: logLevel.INFO, | ||
brokers: [options.kafkaHost], | ||
@@ -23,3 +23,3 @@ clientId: options.serviceId, | ||
initObservables() { | ||
if(this.options.topicsToFollow.forEach) { | ||
if (this.options.topicsToFollow.forEach) { | ||
this.options.topicsToFollow.forEach(topic => { | ||
@@ -34,3 +34,3 @@ this[`${topic}Subject`] = new Subject(); | ||
sendUpdateToObservers({ topic, payload, headers }) { | ||
this[`${topic}Subject`].next({...payload, headers}); | ||
this[`${topic}Subject`].next({ ...payload, headers }); | ||
} | ||
@@ -68,6 +68,6 @@ | ||
try{ | ||
try { | ||
await Promise.all(this.options.topicsToFollow.map(topic => this.consumer.subscribe({ topic }))); | ||
} catch (e0) { | ||
try{ | ||
try { | ||
logging.error(`Error during subscribing: | ||
@@ -87,3 +87,3 @@ ${e0}`); | ||
try{ | ||
try { | ||
await this.consumer.run({ | ||
@@ -107,3 +107,3 @@ autoCommitInterval: kafkaConfig.autoCommitInterval, | ||
if(this.options.logAllEvents === "true") { | ||
if (this.options.logAllEvents === "true") { | ||
logging.report({ | ||
@@ -110,0 +110,0 @@ topic: topic, |
@@ -8,3 +8,3 @@ | ||
module.exports = class Producer{ | ||
module.exports = class Producer { | ||
constructor(options) { | ||
@@ -16,3 +16,3 @@ this.options = options; | ||
this.client = new Kafka({ | ||
logLevel: logLevel. INFO, | ||
logLevel: logLevel.INFO, | ||
brokers: [options.kafkaHost], | ||
@@ -27,19 +27,18 @@ clientId: options.serviceId, | ||
.then(() => { | ||
this.ready = true; | ||
this.queue.forEach(messageMetadata => this.sendMessage(messageMetadata)); | ||
this.ready = true; | ||
this.queue.forEach(messageMetadata => this.sendMessage(messageMetadata)); | ||
this.livenessInterval = setInterval(() => this.sendMessage({ | ||
payload: { | ||
event: this.options.liveness.event || kafkaConfig.events.liveness.ALIVE_TICK, | ||
service: this.options.serviceId, | ||
}, | ||
topic: this.options.liveness.topic, | ||
}), this.options.liveness.interval); | ||
this.livenessInterval = setInterval(() => this.sendMessage({ | ||
payload: { | ||
event: this.options.liveness.event || kafkaConfig.events.liveness.ALIVE_TICK, | ||
service: this.options.serviceId, | ||
}, | ||
topic: this.options.liveness.topic, | ||
}), this.options.liveness.interval); | ||
}) | ||
.catch(error => logging.error(error)) | ||
.catch(error => logging.error(error)); | ||
} | ||
async sendMessage({ payload, topic, headers }) { | ||
if(!this.ready) { | ||
if (!this.ready) { | ||
this.queue.push({ payload, topic }); | ||
@@ -57,2 +56,2 @@ return; | ||
} | ||
} | ||
}; |
{ | ||
"name": "kafka-observe", | ||
"version": "2.0.2", | ||
"version": "2.0.3", | ||
"description": "", | ||
"main": "index.js", | ||
"scripts": { | ||
"test": "echo \"Error: no test specified\" && exit 1" | ||
"test": "NODE_ENV=test ./node_modules/mocha/bin/mocha ./test/test.config.js ./test/unit/**/*.js --exit --timeout 5000", | ||
"coverage": "nyc npm run test" | ||
}, | ||
@@ -12,5 +13,9 @@ "author": "", | ||
"devDependencies": { | ||
"chai": "^4.2.0", | ||
"eslint": "^5.5.0", | ||
"eslint-plugin-jest": "^22.7.2", | ||
"eslint-plugin-prefer-object-spread": "^1.2.1", | ||
"mocha": "^7.2.0", | ||
"nyc": "^15.1.0", | ||
"sinon": "^9.0.2", | ||
"tslint": "^5.11.0" | ||
@@ -17,0 +22,0 @@ }, |
@@ -57,3 +57,4 @@ # kafka-observe | ||
`producer`: instance of a connected producer | ||
`getTopicSubject`: a function that returns a topic to subscribe to with the correct strategy. | ||
`getTopicSubject`: a function that returns a topic to subscribe to with the correct strategy. | ||
`eventCallback`: a function that sends out an event and listens for a response | ||
@@ -67,3 +68,3 @@ ## getTopicSubject | ||
getTopicSubject.getTopicSubject({ | ||
getTopicSubject({ | ||
topic: "topic name", | ||
@@ -76,2 +77,19 @@ loadBalanced: "true|false", // True for Loadbalanced strategy false for All strategy | ||
## eventCallback | ||
Returns a subject you can subscribe to listen to events of a certain topic. | ||
```javascript | ||
const { eventCallback } = require("kafka-observe")(options); | ||
eventCallback({ | ||
topic: "topic name", | ||
sender: "event", | ||
listeners: ["listen_for_event"], | ||
payload: {} // An object you want send with the event (needs to be JSON convertable) | ||
}).then((event) => { | ||
// do something with the event | ||
}); | ||
``` | ||
## producer | ||
@@ -78,0 +96,0 @@ |
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
No tests
QualityPackage does not have any tests. This is a strong signal of a poorly maintained or low quality package.
Found 1 instance in 1 package
21402
20
472
1
131
8
1