New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

elytron

Package Overview
Dependencies
Maintainers
1
Versions
29
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

elytron - npm Package Compare versions

Comparing version 2.2.3 to 2.2.4

bar

55

dist/src/consume/index.js

@@ -68,21 +68,36 @@ 'use strict';

var handle_consumer_data = function handle_consumer_data(data, topic, id, work, exit) {
var parsed = data.split(delimiter);
(0, _logger.log)('Consumed data from ' + topic + ': ' + parsed);
parsed.pop(); // Empty string after delimiter
parsed.forEach(function (item) {
var results = work(item);
var deserialized = {};
var parsed = data.split(delimiter).reverse();
// Remove any empty string after the delimiter
if (parsed[0] === '') parsed.shift();
var i = parsed.length;
while (i--) {
try {
deserialized = JSON.parse(item);
// Attempt to parse our most recent chunk
var deserialized = JSON.parse(parsed[i]);
var payload = void 0;
(0, _logger.log)('Consumed data from ' + topic + ': ' + deserialized.payload);
var results = work(deserialized.payload);
try {
payload = JSON.parse(deserialized.payload);
} catch (e) {
payload = deserialized.payload;
}
if (payload.response_topic) (0, _produce.produce)(payload.response_topic, results);
} catch (err) {
deserialized.response_topic = false;
// Incomplete chunk from string, save for next event
(0, _logger.error)('Unable to parse data chunk: ' + parsed[i], err);
break;
}
if (deserialized.response_topic) (0, _produce.produce)(deserialized.response_topic, results);
});
parsed.splice(i, 1);
}
if (exit) teardown_consumer(topic, id);
return parsed;
return parsed.join('');
};

@@ -135,6 +150,5 @@

var id = _uuid2.default.v4();
var consume_options = ['-b', _run.brokers, '-D', delimiter, '-o', offset, '-u'].concat(consumer_type);
var consume_options = ['-b', _run.brokers, '-D', delimiter, '-o', offset, '-u', '-J'].concat(consumer_type);
var stdout = '';
var stderr = '';

@@ -145,18 +159,9 @@ (0, _logger.log)('Consuming ' + topic + ' at offset ' + offset);

consumer.stdout.on('data', function (data) {
return stdout += data.toString();
stdout += data.toString();
stdout = handle_consumer_data(stdout, topic, id, work, exit);
});
consumer.stderr.on('data', function (data) {
return stderr += data.toString();
handle_consumer_error(data.toString());
});
consumer.stdout.on('end', function () {
handle_consumer_data(stdout, topic, id, work, exit);
stdout = '';
return stdout;
});
consumer.stderr.on('end', function () {
handle_consumer_error(stderr);
stderr = '';
return stderr;
});
consumer.on('close', handle_consumer_close);

@@ -163,0 +168,0 @@

{
"name": "elytron",
"version": "2.2.3",
"version": "2.2.4",
"description": "An interface for Kafka in Node",

@@ -5,0 +5,0 @@ "main": "./dist/src/index.js",

@@ -54,20 +54,32 @@ import uuid from 'uuid';

const handle_consumer_data = (data, topic, id, work, exit) => {
let parsed = data.split(delimiter);
log(`Consumed data from ${topic}: ${parsed}`);
parsed.pop(); // Empty string after delimiter
parsed.forEach((item) => {
let results = work(item);
let deserialized = {};
let parsed = data.split(delimiter).reverse();
// Remove any empty string after the delimiter
if (parsed[0] === '') parsed.shift();
try { deserialized = JSON.parse(item); }
catch (err) { deserialized.response_topic = false; }
let i = parsed.length;
if (deserialized.response_topic) produce(
deserialized.response_topic, results
);
});
while (i--) {
try { // Attempt to parse our most recent chunk
let deserialized = JSON.parse(parsed[i]);
let payload;
log(`Consumed data from ${topic}: ${deserialized.payload}`);
let results = work(deserialized.payload);
try { payload = JSON.parse(deserialized.payload); }
catch (e) { payload = deserialized.payload; }
if (payload.response_topic) produce(payload.response_topic, results);
}
catch (err) { // Incomplete chunk from string, save for next event
error(`Unable to parse data chunk: ${parsed[i]}`, err);
break;
}
parsed.splice(i, 1);
}
if (exit) teardown_consumer(topic, id);
return parsed;
return parsed.join('');
};

@@ -123,7 +135,6 @@

const consume_options = [
'-b', brokers, '-D', delimiter, '-o', offset, '-u'
'-b', brokers, '-D', delimiter, '-o', offset, '-u', '-J',
].concat(consumer_type);
let stdout = '';
let stderr = '';

@@ -133,15 +144,10 @@ log(`Consuming ${topic} at offset ${offset}`);

consumer.stdout.on('data', data => stdout += data.toString());
consumer.stderr.on('data', data => stderr += data.toString());
consumer.stdout.on('end', () => {
handle_consumer_data(stdout, topic, id, work, exit);
stdout = '';
return stdout;
consumer.stdout.on('data', (data) => {
stdout += data.toString();
stdout = handle_consumer_data(stdout, topic, id, work, exit);
});
consumer.stderr.on('end', () => {
handle_consumer_error(stderr);
stderr = '';
return stderr;
consumer.stderr.on('data', (data) => {
handle_consumer_error(data.toString());
});
consumer.on('close', handle_consumer_close);

@@ -148,0 +154,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc