Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

kafka-sse

Package Overview
Dependencies
Maintainers
1
Versions
11
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

kafka-sse - npm Package Compare versions

Comparing version 0.4.0 to 0.4.2

224

lib/KafkaSSE.js

@@ -20,3 +20,2 @@ 'use strict';

/**

@@ -205,2 +204,12 @@ * Represents a Kafka Consumer -> Server Side Events connection.

// If we won't be responding to any rdkafka events,
// there is no need to set up the rdkafka event emitter.
// This is a workaround for a memory leak bug currently
// in node-rdkafka: https://github.com/Blizzard/node-rdkafka/issues/731
// If you do set any kafkaEventHandlers, be aware you may encounter a
// memory leak until this bug is fixed.
if (!this.options.kafkaEventHandlers) {
defaultKafkaConfig.event_cb = false;
}
// These configs MUST be set for a KafkaSSE KafkaConsumer;

@@ -222,3 +231,2 @@ // they are not overridable.

'group.id': `KafkaSSE-${this.id}`
};

@@ -233,10 +241,50 @@

// Call this.disconnect() when the http client request ends.
this.req.on('abort', () => this.disconnect());
this.req.on('aborted', () => this.disconnect());
this.req.on('close', () => this.disconnect());
this.req.on('error', (e) => this.disconnect(e));
this.res.on('finish', () => this.disconnect());
this.res.on('close', () => this.disconnect());
this.res.on('error', (e) => this.disconnect(e));
// Set up HTTP request and response end handlers.
// We want to call this.disconnect() in response to any of these
// events. We only need to call this.disconnect() the first time
// one of these happens, so disconnect() will call
// this.removeHttpEndListeners to remove all of these listeners
// as soon as disconnect() is called.
const onceReqAbort = () => this.disconnect('HTTP request abort');
const onceReqAborted = () => this.disconnect('HTTP request aborted');
const onceReqClose = () => this.disconnect('HTTP request close');
const onceReqError = (e) => {
this.log.error(
{err: e}, 'HTTP request encountered an error, calling KafkaSSE disconnect.'
);
this.disconnect('HTTP request error');
};
const onceResFinish = () => this.disconnect('HTTP response finish');
const onceResClose = () => this.disconnect('HTTP response close');
const onceResError = (e) => {
this.log.error(
{err: e}, 'HTTP response encountered an error, calling KafkaSSE disconnect.'
);
this.disconnect('HTTP response error');
};
this.req.once('abort', onceReqAbort);
this.req.once('aborted', onceReqAborted);
this.req.once('close', onceReqClose);
this.req.once('error', onceReqError);
this.res.once('finish', onceResFinish);
this.res.once('close', onceResClose);
this.res.once('error', onceResError);
// Will be called theh first time disconnect() is called to keep it
// from being called multiple times as the connection closes.
this.removeHttpEndListeners = () => {
this.log.debug('Removing all HTTP end listeners.');
if (this.req) {
this.req.removeListener('abort', onceReqAbort);
this.req.removeListener('aborted', onceReqAborted);
this.req.removeListener('close', onceReqClose);
this.req.removeListener('error', onceReqError);
}
if (this.res) {
this.res.removeListener('finish', onceResFinish);
this.res.removeListener('close', onceResClose);
this.res.removeListener('error', onceResError);
}
};
}

@@ -251,3 +299,3 @@

* Once _start is called, a 200 response header will be written (via
* sseClient.initialize()), and any further errors must be reported to the
* sse.start()), and any further errors must be reported to the
* client by emitting an error SSE event.

@@ -300,3 +348,8 @@ *

// error message to the client
.catch(() => {});
.catch((sseSendError) => {
this.log.error(
{err: sseSendError},
'Caught error while attempting to send SSE error event'
);
});
}

@@ -316,8 +369,17 @@ });

})
// Close KafkaConsumer, and either this.sseClient or this.res http response.
.finally(() => this.disconnect());
.finally(() => {
// Just in case we get here for a reason other than the HTTP request closing
// or error handling, make sure we disconnect everything properly,
// especially the KafkaConsumer.
if (!this.is_finished) {
return this.disconnect('Finally finished consume loop and error handling');
}
});
// wait for the `done` event to return
// connect() will resolve after the KafkaSSE `done` event is fired by disconnect().
return new P((resolve, reject) => {
this._eventEmitter.on('done', resolve);
this._eventEmitter.on('done', () => {
this.log.info('KafkaSSE connection done.');
resolve();
});
});

@@ -400,8 +462,6 @@ }

);
consumer.on(event, this.options.kafkaEventHandlers[event]);
this.kafkaConsumer.on(event, this.options.kafkaEventHandlers[event]);
});
}
// Save our consumer.
// this.kafkaConsumer = consumer;
return this.kafkaConsumer;

@@ -514,3 +574,3 @@ })

// Start the consume -> sse send loop.
this.log.info('Initializing sseClient and starting consume loop.');
this.log.info('Initializing KafkaSSE connection and starting consume loop.');

@@ -531,3 +591,3 @@ const responseHeaders = {};

// Initialize the sse response and start sending
// Initialize the SSEResponse and start sending
// the response in chunked transfer encoding.

@@ -556,6 +616,6 @@ this.sse = new SSEResponse(this.res, {

.then((kafkaMessage) => {
// If the request is finished (by calling this.disconnect()),
// If the request is finished (something called this.disconnect()),
// then exit the consume loop now by returning a resolved promise.
if (this.is_finished || !this.sse || this._resFinished()) {
this.log.info('Finished. Returning from consume loop.');
if (this.is_finished) {
this.log.debug('KafkaSSE connection finished. Returning from consume loop.');
return P.resolve();

@@ -582,8 +642,6 @@ }

return new P((resolve, reject) => {
setTimeout(() => {
if (this.is_finished || this._resFinished()) {
return reject('Loop finished');
}
resolve(Date.now());
}, 1);
setTimeout(
() => resolve(Date.now()),
1
);
});

@@ -640,4 +698,4 @@ }

// don't try to consume anything.
if (this.is_finished || this._resFinished()) {
this.log.debug('Finished. Not attempting consume.');
if (this.is_finished) {
this.log.debug('KafkaSSE connection finished. Not attempting consume.');
return P.resolve();

@@ -801,2 +859,9 @@ }

/**
* Checks if the HTTP response is finished.
* Returns true if if this.res is undefined, or if the this.res.finished,
* or if res.connection.destroyed.
*
* @return {boolean}
*/
_resFinished() {

@@ -810,11 +875,16 @@ const res = this.res;

/**
* Disconnects the KafkaConsumer and closes the sse client or http response.
* Disconnects the KafkaConsumer and closes the sse client and/or http response.
* If disconnect() has already been called, this does nothing.
*
* @param {string} reason Reason disconnect was called, used for logging.
* @return {Promise} Resolved if disconnect was successful, rejected if errored.
*/
disconnect(error) {
let pKafka = P.resolve();
let pReq = P.resolve();
disconnect(reason) {
reason = reason || 'unknown';
// If disconnect has already been called once, do nothing.
// This shouldn't really happen, as the HTTP end listeners will be removed.
if (this.is_finished) {
this.log.debug(`KafkaSSE disconnect() has already been called. Doing nothing in response to: ${reason}`);
return P.resolve();

@@ -824,20 +894,27 @@ }

this.log.info('Disconnecting.');
if (error) {
this._error(error, 'warn');
}
this.log.info(`KafkaSSE disconnecting due to: ${reason}`);
return P.resolve().then(() => {
// first, deal with the response
// Remove other HTTP disconnect handlers to prevent disconnect from being fired multiple times.
this.removeHttpEndListeners();
const disconnectHttpPromise = P.resolve().then(() => {
// 3 cases:
// - Usually we are disconnecting an active SSEResponse, so just
// end the HTTP response via this.sse.end.
// - If no SSEResponse is active, just end this.res.
// - Else this.res is finished but we still have a reference to it
// so just delete this.res.
if (this.sse) {
// If we still have SSE, then end the SSEResponse.
// SSEResponse will handle ending thee HTTP Response itself.
const sse = this.sse;
delete this.sse;
delete this.res;
return sse.end()
.catch((e) => this._error(e, 'warn'))
.then(() => {
this.log.info('Closed SSE response.');
});
}
if (!this._resFinished()) {
.then(() => this.log.debug('KafkaSSE disconnect: Ended SSE (HTTP) response.'));
} else if (!this._resFinished()) {
// Else if for disconnect was called and we don't have an SSEResponse,
// (likely because the SSEResponse wasn't ever started), then just
// end the HTTP response here.
return new P((resolve, reject) => {

@@ -849,33 +926,44 @@ const res = this.res;

}
delete this.res;
res.on('error', reject);
res.once('error', reject);
try {
if (!res.end(resolve)) {
resolve();
}
res.end();
resolve();
} catch(e) {
reject(e);
}
}).catch((e) => this._error(e, 'warn'))
.then(() => {
this.log.info('Closed HTTP request.');
});
}
if (this.res) {
})
.then(() => this.log.debug('KafkaSSE disconnect: Ended HTTP response.'));
} else if (this.res) {
// Else the HTTP Response has already been ended,
// so just delete our reference to it.
delete this.res;
this.log.info('Closed HTTP request.');
this.log.debug('KafkaSSE disconnect: Deleted HTTP response.');
return P.resolve();
}
}).then(() => {
// now close the kafka consumer
});
const disconnectKafkaPromise = P.resolve().then(() => {
if (!this.kafkaConsumer) {
this.log.debug("KafkaSSE disconnect: Kafka consumer already deleted, doing nothing.");
return P.resolve();
}
const kC = this.kafkaConsumer;
const kafkaConsumer = this.kafkaConsumer;
delete this.kafkaConsumer;
return P.try(() => kC.disconnect())
.catch((e) => this._error(e, 'warn'))
.then(() => {
this.log.info('Closed the Kafka consumer.');
});
}).then(() => this._eventEmitter.emit('done'));
return kafkaConsumer.disconnectAsync()
.then(() => this.log.debug("KafkaSSE disconnect: Disconnected the Kafka consumer."));
});
// Emit 'done' when both HTTP response and Kafka are disconnected.
return P.all([disconnectHttpPromise, disconnectKafkaPromise])
.catch((e) => {
this.log.error({err: e}, 'KafkaSSE disconnect: encountered error');
})
.finally(() => {
this.log.debug('KafkaSSE disconnect: finished.')
return this._eventEmitter.emit('done')
});
}

@@ -882,0 +970,0 @@ }

@@ -109,25 +109,27 @@ 'use strict';

// If we don't want to format the response as event-stream SSE,
// Just send the data now and return.
if (this.disableSSEFormatting) {
return p.then(() => this._sendLines(data + '\n'));
}
// If we don't want to format the response as event-stream SSE,
// Just write data as a single line and return.
return p.then(() => this._write(data + '\n'));
} else {
// Else format as an SSE event.
id = this.serialize(id);
// Set the event headers (event name, retry, id)
if (event) {
raw.push(`event: ${event}`);
id = this.serialize(id);
// Set the event headers (event name, retry, id)
if (event) {
raw.push(`event: ${event}`);
}
if (retry) {
raw.push(`retry: ${retry}`);
}
if (id) {
raw.push(`id: ${id}`);
}
// Set each event data line.
raw = raw.concat(data.split(/\n/).map(line => `data: ${line}`)).map(line => `${line}\n`);
// send the event
raw[raw.length - 1] = `${raw[raw.length - 1]}\n`;
return p.then(() => this._sendLines(raw));
}
if (retry) {
raw.push(`retry: ${retry}`);
}
if (id) {
raw.push(`id: ${id}`);
}
// Set each event data line.
raw = raw.concat(data.split(/\n/).map(line => `data: ${line}`)).map(line => `${line}\n`);
// send the event
raw[raw.length - 1] = `${raw[raw.length - 1]}\n`;
return p.then(() => this._sendLines(raw));
}

@@ -137,3 +139,3 @@

/**
* Ends the response.
* Ends the HTTP response.
*/

@@ -144,2 +146,3 @@ end() {

}
if (this._resFinished()) {

@@ -149,14 +152,11 @@ delete this.res;

}
const res = this.res;
delete this.res;
return new P((resolve, reject) => {
const res = this.res;
if (this._resFinished()) {
delete this.res;
return resolve();
}
delete this.res;
res.on('error', reject);
res.once('error', reject);
try {
if (res.end(resolve) === false) {
resolve();
}
res.end();
resolve();
} catch(e) {

@@ -197,5 +197,10 @@ reject(e);

return new P((resolve, reject) => {
if (this._resFinished()) {
return reject('Cannot write SSE event: the response is already finished.');
}
const res = this.res;
let drainAdded = false;
let ret = true;
const removeListeners = () => {

@@ -208,18 +213,20 @@ if (drainAdded) {

};
const endOk = () => {
removeListeners();
if (this._resFinished()) {
return reject('Cannot write events after closing the response!');
return reject('Cannot write SSE event: the response has finished normally.');
}
resolve();
};
const endErr = (e) => {
removeListeners();
this.log.error({err: e}, 'Got HTTP response error while writing SSE event.');
reject(e);
};
if (this._resFinished()) {
return reject('Cannot write events after closing the response!');
}
res.on('error', endErr);
res.on('prefinish', endOk);
try {

@@ -229,8 +236,6 @@ ret = res.write(data);

removeListeners();
this.log.error({err: e}, 'Caught error while writing SSE event.')
return reject(e);
}
if (this._resFinished()) {
removeListeners();
return reject('Cannot write events after closing the response!');
}
if (ret === false) {

@@ -237,0 +242,0 @@ drainAdded = true;

@@ -236,3 +236,3 @@ 'use strict';

/**
* Given a a Kafka assignemnts array, this will look for any occurances
* Given a a Kafka assignments array, this will look for any occurances
* of 'timestamp' instead of 'offset'. For those found, it will issue

@@ -239,0 +239,0 @@ * a offsetsForTimes request on the kafkaConsumer. The returned

{
"name": "kafka-sse",
"version": "0.4.0",
"version": "0.4.2",
"description": "KafkaSSE - Kafka Consumer to HTTP SSE/EventSource",

@@ -22,3 +22,3 @@ "main": "index.js",

"type": "git",
"url": "https://phabricator.wikimedia.org/diffusion/WKSE/kafkasse.git"
"url": "https://github.com/wikimedia/KafkaSSE.git"
},

@@ -35,5 +35,5 @@ "keywords": [

"bugs": {
"url": "https://phabricator.wikimedia.org/search/query/fpxAPkMeWqjh/"
"url": "https://github.com/wikimedia/KafkaSSE/issues"
},
"homepage": "https://github.com/wikimedia/kafkasse#readme",
"homepage": "https://github.com/wikimedia/KafkaSSE#readme",
"dependencies": {

@@ -40,0 +40,0 @@ "bluebird": "^3.5.1",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc