kafka-observe
Advanced tools
Comparing version 2.1.1-beta-7 to 2.1.1-beta-8
@@ -35,26 +35,26 @@ const { filter } = require("rxjs/operators"); | ||
const subscription = consumer.getTopicSubject(topic) | ||
.pipe(filter(item => events.includes(item.event) && (guard ? guard(item) : true)) | ||
.subscribe((item => { | ||
if (item.error) { | ||
reject(result); | ||
return; | ||
} | ||
.pipe(filter(item => events.includes(item.event) && (guard ? guard(item) : true))) | ||
.subscribe((item => { | ||
if (item.error) { | ||
reject(result); | ||
return; | ||
} | ||
result[item.headers.page] = item.payload; | ||
onStatus && onStatus(item); | ||
result[item.headers.page] = item.payload; | ||
onStatus && onStatus(item); | ||
if (!item.headers.isFinished) { | ||
return; | ||
} | ||
if (!item.headers.isFinished) { | ||
return; | ||
} | ||
const json = Object.keys(result). | ||
sort((a, b) => a - b).reduce((prev, curr) => prev + result[curr], ""); | ||
const json = Object.keys(result). | ||
sort((a, b) => a - b).reduce((prev, curr) => prev + result[curr], ""); | ||
subscription.unsubscribe(); | ||
resolve({ | ||
payload: JSON.parse(json), | ||
headers: item.headers, | ||
event: item.event, | ||
}); | ||
}))); | ||
subscription.unsubscribe(); | ||
resolve({ | ||
payload: JSON.parse(json), | ||
headers: item.headers, | ||
event: item.event, | ||
}); | ||
})); | ||
}), | ||
@@ -61,0 +61,0 @@ callback: ({ consumer, producer }) => ({ topic, payload, sender, listeners, onStatus }) => { |
{ | ||
"name": "kafka-observe", | ||
"version": "2.1.1-beta-7", | ||
"version": "2.1.1-beta-8", | ||
"description": "", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
32036