kafka-observe
Advanced tools
Comparing version 2.1.1-beta-8 to 2.1.1-beta-9
@@ -6,28 +6,3 @@ const { filter } = require("rxjs/operators"); | ||
module.exports = { | ||
write: ({ producer }) => ({ topic, headers, event, payload, limit = 819200 }) => { | ||
const data = JSON.stringify(payload); | ||
const totalSize = data.length; | ||
const total = Math.ceil(totalSize / limit); | ||
for (let page = 1; page <= total; page++) { | ||
const offset = (page - 1) * limit; | ||
const result = data.slice(offset, offset + limit); | ||
producer.sendMessage({ | ||
headers: { | ||
...headers, | ||
page, | ||
total, | ||
limit, | ||
totalSize, | ||
size: result.length, | ||
}, | ||
topic, | ||
payload: { | ||
payload: result, | ||
event, | ||
}, | ||
}); | ||
} | ||
}, | ||
write: writeStream, | ||
read: ({ consumer }) => ({ topic, events, guard, onStatus }) => new Promise((resolve, reject) => { | ||
@@ -112,2 +87,54 @@ let result = {}; | ||
}, | ||
writeCallback: ({ consumer, producer }) => ({ topic, payload, sender, listeners, limit }) => { | ||
const token = generatorHelpers.generateUuid(); | ||
function listenerForProducer({ topic, events, token }) { | ||
return new Promise((resolve, reject) => { | ||
const subscription = consumer.getTopicSubject(topic) | ||
.pipe(filter(item => events.includes(item.event) && item.headers && item.headers.token === token)) | ||
.subscribe((item => { | ||
subscription.unsubscribe(); | ||
return item.error ? reject(item) : resolve(item); | ||
})); | ||
}); | ||
} | ||
writeStream({ producer })({ topic, event: sender, payload, limit }); | ||
return listenerForProducer({ | ||
topic, | ||
events: listeners, | ||
token, | ||
}); | ||
}, | ||
}; | ||
function writeStream({ producer }) { | ||
return ({ topic, headers, event, payload, limit = 524288 }) => { | ||
const data = JSON.stringify(payload); | ||
const totalSize = data.length; | ||
const total = Math.ceil(totalSize / limit); | ||
for (let page = 1; page <= total; page++) { | ||
const offset = (page - 1) * limit; | ||
const result = data.slice(offset, offset + limit); | ||
producer.sendMessage({ | ||
headers: { | ||
...headers, | ||
page, | ||
total, | ||
limit, | ||
totalSize, | ||
size: result.length, | ||
}, | ||
topic, | ||
payload: { | ||
payload: result, | ||
event, | ||
}, | ||
}); | ||
} | ||
}; | ||
} |
13
index.js
@@ -125,3 +125,3 @@ const { Consumer, ConsumerGroup, Producer } = require("./IO"); | ||
* @description | ||
* Consume stream data | ||
* Produce event and consume stream data | ||
* @param {String} topic The topic on which the stream is send on | ||
@@ -137,2 +137,13 @@ * @param {String} sender The event that will be send | ||
* @description | ||
* Produce stream event and consume event | ||
* @param {String} topic The topic on which the stream is send on | ||
* @param {String} sender The event that will be send | ||
* @param {Array <String>} listeners List of the events the callback should listen for | ||
* @param {Object} payload Payload data to send on produce | ||
* @param {Number} limit The max data size in byte each stream can contain | ||
*/ | ||
streamWriteCallback: streamController.writeCallback({ producer, consumer }), | ||
/** | ||
* @description | ||
* Timeout wrapper for callback will throw error when timeout is passed and no response is received | ||
@@ -139,0 +150,0 @@ * @param {Function} action Async promise that will return a promise on finish |
{ | ||
"name": "kafka-observe", | ||
"version": "2.1.1-beta-8", | ||
"version": "2.1.1-beta-9", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
33307
736