Comparing version 2.2.3 to 2.2.4
@@ -68,21 +68,36 @@ 'use strict'; | ||
var handle_consumer_data = function handle_consumer_data(data, topic, id, work, exit) { | ||
var parsed = data.split(delimiter); | ||
(0, _logger.log)('Consumed data from ' + topic + ': ' + parsed); | ||
parsed.pop(); // Empty string after delimiter | ||
parsed.forEach(function (item) { | ||
var results = work(item); | ||
var deserialized = {}; | ||
var parsed = data.split(delimiter).reverse(); | ||
// Remove any empty string after the delimiter | ||
if (parsed[0] === '') parsed.shift(); | ||
var i = parsed.length; | ||
while (i--) { | ||
try { | ||
deserialized = JSON.parse(item); | ||
// Attempt to parse our most recent chunk | ||
var deserialized = JSON.parse(parsed[i]); | ||
var payload = void 0; | ||
(0, _logger.log)('Consumed data from ' + topic + ': ' + deserialized.payload); | ||
var results = work(deserialized.payload); | ||
try { | ||
payload = JSON.parse(deserialized.payload); | ||
} catch (e) { | ||
payload = deserialized.payload; | ||
} | ||
if (payload.response_topic) (0, _produce.produce)(payload.response_topic, results); | ||
} catch (err) { | ||
deserialized.response_topic = false; | ||
// Incomplete chunk from string, save for next event | ||
(0, _logger.error)('Unable to parse data chunk: ' + parsed[i], err); | ||
break; | ||
} | ||
if (deserialized.response_topic) (0, _produce.produce)(deserialized.response_topic, results); | ||
}); | ||
parsed.splice(i, 1); | ||
} | ||
if (exit) teardown_consumer(topic, id); | ||
return parsed; | ||
return parsed.join(''); | ||
}; | ||
@@ -135,6 +150,5 @@ | ||
var id = _uuid2.default.v4(); | ||
var consume_options = ['-b', _run.brokers, '-D', delimiter, '-o', offset, '-u'].concat(consumer_type); | ||
var consume_options = ['-b', _run.brokers, '-D', delimiter, '-o', offset, '-u', '-J'].concat(consumer_type); | ||
var stdout = ''; | ||
var stderr = ''; | ||
@@ -145,18 +159,9 @@ (0, _logger.log)('Consuming ' + topic + ' at offset ' + offset); | ||
consumer.stdout.on('data', function (data) { | ||
return stdout += data.toString(); | ||
stdout += data.toString(); | ||
stdout = handle_consumer_data(stdout, topic, id, work, exit); | ||
}); | ||
consumer.stderr.on('data', function (data) { | ||
return stderr += data.toString(); | ||
handle_consumer_error(data.toString()); | ||
}); | ||
consumer.stdout.on('end', function () { | ||
handle_consumer_data(stdout, topic, id, work, exit); | ||
stdout = ''; | ||
return stdout; | ||
}); | ||
consumer.stderr.on('end', function () { | ||
handle_consumer_error(stderr); | ||
stderr = ''; | ||
return stderr; | ||
}); | ||
consumer.on('close', handle_consumer_close); | ||
@@ -163,0 +168,0 @@ |
{ | ||
"name": "elytron", | ||
"version": "2.2.3", | ||
"version": "2.2.4", | ||
"description": "An interface for Kafka in Node", | ||
@@ -5,0 +5,0 @@ "main": "./dist/src/index.js", |
@@ -54,20 +54,32 @@ import uuid from 'uuid'; | ||
const handle_consumer_data = (data, topic, id, work, exit) => { | ||
let parsed = data.split(delimiter); | ||
log(`Consumed data from ${topic}: ${parsed}`); | ||
parsed.pop(); // Empty string after delimiter | ||
parsed.forEach((item) => { | ||
let results = work(item); | ||
let deserialized = {}; | ||
let parsed = data.split(delimiter).reverse(); | ||
// Remove any empty string after the delimiter | ||
if (parsed[0] === '') parsed.shift(); | ||
try { deserialized = JSON.parse(item); } | ||
catch (err) { deserialized.response_topic = false; } | ||
let i = parsed.length; | ||
if (deserialized.response_topic) produce( | ||
deserialized.response_topic, results | ||
); | ||
}); | ||
while (i--) { | ||
try { // Attempt to parse our most recent chunk | ||
let deserialized = JSON.parse(parsed[i]); | ||
let payload; | ||
log(`Consumed data from ${topic}: ${deserialized.payload}`); | ||
let results = work(deserialized.payload); | ||
try { payload = JSON.parse(deserialized.payload); } | ||
catch (e) { payload = deserialized.payload; } | ||
if (payload.response_topic) produce(payload.response_topic, results); | ||
} | ||
catch (err) { // Incomplete chunk from string, save for next event | ||
error(`Unable to parse data chunk: ${parsed[i]}`, err); | ||
break; | ||
} | ||
parsed.splice(i, 1); | ||
} | ||
if (exit) teardown_consumer(topic, id); | ||
return parsed; | ||
return parsed.join(''); | ||
}; | ||
@@ -123,7 +135,6 @@ | ||
const consume_options = [ | ||
'-b', brokers, '-D', delimiter, '-o', offset, '-u' | ||
'-b', brokers, '-D', delimiter, '-o', offset, '-u', '-J', | ||
].concat(consumer_type); | ||
let stdout = ''; | ||
let stderr = ''; | ||
@@ -133,15 +144,10 @@ log(`Consuming ${topic} at offset ${offset}`); | ||
consumer.stdout.on('data', data => stdout += data.toString()); | ||
consumer.stderr.on('data', data => stderr += data.toString()); | ||
consumer.stdout.on('end', () => { | ||
handle_consumer_data(stdout, topic, id, work, exit); | ||
stdout = ''; | ||
return stdout; | ||
consumer.stdout.on('data', (data) => { | ||
stdout += data.toString(); | ||
stdout = handle_consumer_data(stdout, topic, id, work, exit); | ||
}); | ||
consumer.stderr.on('end', () => { | ||
handle_consumer_error(stderr); | ||
stderr = ''; | ||
return stderr; | ||
consumer.stderr.on('data', (data) => { | ||
handle_consumer_error(data.toString()); | ||
}); | ||
consumer.on('close', handle_consumer_close); | ||
@@ -148,0 +154,0 @@ |
61061
26
613