Comparing version 0.4.0 to 0.4.2
@@ -20,3 +20,2 @@ 'use strict'; | ||
/** | ||
@@ -205,2 +204,12 @@ * Represents a Kafka Consumer -> Server Side Events connection. | ||
// If we won't be responding to any rdkafka events, | ||
// there is no need to set up the rdkafka event emitter. | ||
// This is a workaround for a memory leak bug currently | ||
// in node-rdkafka: https://github.com/Blizzard/node-rdkafka/issues/731 | ||
// If you do set any kafkaEventHandlers, be aware you may encounter a | ||
// memory leak until this bug is fixed. | ||
if (!this.options.kafkaEventHandlers) { | ||
defaultKafkaConfig.event_cb = false; | ||
} | ||
// These configs MUST be set for a KafkaSSE KafkaConsumer; | ||
@@ -222,3 +231,2 @@ // they are not overridable. | ||
'group.id': `KafkaSSE-${this.id}` | ||
}; | ||
@@ -233,10 +241,50 @@ | ||
// Call this.disconnect() when the http client request ends. | ||
this.req.on('abort', () => this.disconnect()); | ||
this.req.on('aborted', () => this.disconnect()); | ||
this.req.on('close', () => this.disconnect()); | ||
this.req.on('error', (e) => this.disconnect(e)); | ||
this.res.on('finish', () => this.disconnect()); | ||
this.res.on('close', () => this.disconnect()); | ||
this.res.on('error', (e) => this.disconnect(e)); | ||
// Set up HTTP request and response end handlers. | ||
// We want to call this.disconnect() in response to any of these | ||
// events. We only need to call this.disconnect() the first time | ||
// one of these happens, so disconnect() will call | ||
// this.removeHttpEndListeners to remove all of these listeners | ||
// as soon as disconnect() is called. | ||
const onceReqAbort = () => this.disconnect('HTTP request abort'); | ||
const onceReqAborted = () => this.disconnect('HTTP request aborted'); | ||
const onceReqClose = () => this.disconnect('HTTP request close'); | ||
const onceReqError = (e) => { | ||
this.log.error( | ||
{err: e}, 'HTTP request encountered an error, calling KafkaSSE disconnect.' | ||
); | ||
this.disconnect('HTTP request error'); | ||
}; | ||
const onceResFinish = () => this.disconnect('HTTP response finish'); | ||
const onceResClose = () => this.disconnect('HTTP response close'); | ||
const onceResError = (e) => { | ||
this.log.error( | ||
{err: e}, 'HTTP response encountered an error, calling KafkaSSE disconnect.' | ||
); | ||
this.disconnect('HTTP response error'); | ||
}; | ||
this.req.once('abort', onceReqAbort); | ||
this.req.once('aborted', onceReqAborted); | ||
this.req.once('close', onceReqClose); | ||
this.req.once('error', onceReqError); | ||
this.res.once('finish', onceResFinish); | ||
this.res.once('close', onceResClose); | ||
this.res.once('error', onceResError); | ||
// Will be called theh first time disconnect() is called to keep it | ||
// from being called multiple times as the connection closes. | ||
this.removeHttpEndListeners = () => { | ||
this.log.debug('Removing all HTTP end listeners.'); | ||
if (this.req) { | ||
this.req.removeListener('abort', onceReqAbort); | ||
this.req.removeListener('aborted', onceReqAborted); | ||
this.req.removeListener('close', onceReqClose); | ||
this.req.removeListener('error', onceReqError); | ||
} | ||
if (this.res) { | ||
this.res.removeListener('finish', onceResFinish); | ||
this.res.removeListener('close', onceResClose); | ||
this.res.removeListener('error', onceResError); | ||
} | ||
}; | ||
} | ||
@@ -251,3 +299,3 @@ | ||
* Once _start is called, a 200 response header will be written (via | ||
* sseClient.initialize()), and any further errors must be reported to the | ||
* sse.start()), and any further errors must be reported to the | ||
* client by emitting an error SSE event. | ||
@@ -300,3 +348,8 @@ * | ||
// error message to the client | ||
.catch(() => {}); | ||
.catch((sseSendError) => { | ||
this.log.error( | ||
{err: sseSendError}, | ||
'Caught error while attempting to send SSE error event' | ||
); | ||
}); | ||
} | ||
@@ -316,8 +369,17 @@ }); | ||
}) | ||
// Close KafkaConsumer, and either this.sseClient or this.res http response. | ||
.finally(() => this.disconnect()); | ||
.finally(() => { | ||
// Just in case we get here for a reason other than the HTTP request closing | ||
// or error handling, make sure we disconnect everything properly, | ||
// especially the KafkaConsumer. | ||
if (!this.is_finished) { | ||
return this.disconnect('Finally finished consume loop and error handling'); | ||
} | ||
}); | ||
// wait for the `done` event to return | ||
// connect() will resolve after the KafkaSSE `done` event is fired by disconnect(). | ||
return new P((resolve, reject) => { | ||
this._eventEmitter.on('done', resolve); | ||
this._eventEmitter.on('done', () => { | ||
this.log.info('KafkaSSE connection done.'); | ||
resolve(); | ||
}); | ||
}); | ||
@@ -400,8 +462,6 @@ } | ||
); | ||
consumer.on(event, this.options.kafkaEventHandlers[event]); | ||
this.kafkaConsumer.on(event, this.options.kafkaEventHandlers[event]); | ||
}); | ||
} | ||
// Save our consumer. | ||
// this.kafkaConsumer = consumer; | ||
return this.kafkaConsumer; | ||
@@ -514,3 +574,3 @@ }) | ||
// Start the consume -> sse send loop. | ||
this.log.info('Initializing sseClient and starting consume loop.'); | ||
this.log.info('Initializing KafkaSSE connection and starting consume loop.'); | ||
@@ -531,3 +591,3 @@ const responseHeaders = {}; | ||
// Initialize the sse response and start sending | ||
// Initialize the SSEResponse and start sending | ||
// the response in chunked transfer encoding. | ||
@@ -556,6 +616,6 @@ this.sse = new SSEResponse(this.res, { | ||
.then((kafkaMessage) => { | ||
// If the request is finished (by calling this.disconnect()), | ||
// If the request is finished (something called this.disconnect()), | ||
// then exit the consume loop now by returning a resolved promise. | ||
if (this.is_finished || !this.sse || this._resFinished()) { | ||
this.log.info('Finished. Returning from consume loop.'); | ||
if (this.is_finished) { | ||
this.log.debug('KafkaSSE connection finished. Returning from consume loop.'); | ||
return P.resolve(); | ||
@@ -582,8 +642,6 @@ } | ||
return new P((resolve, reject) => { | ||
setTimeout(() => { | ||
if (this.is_finished || this._resFinished()) { | ||
return reject('Loop finished'); | ||
} | ||
resolve(Date.now()); | ||
}, 1); | ||
setTimeout( | ||
() => resolve(Date.now()), | ||
1 | ||
); | ||
}); | ||
@@ -640,4 +698,4 @@ } | ||
// don't try to consume anything. | ||
if (this.is_finished || this._resFinished()) { | ||
this.log.debug('Finished. Not attempting consume.'); | ||
if (this.is_finished) { | ||
this.log.debug('KafkaSSE connection finished. Not attempting consume.'); | ||
return P.resolve(); | ||
@@ -801,2 +859,9 @@ } | ||
/** | ||
* Checks if the HTTP response is finished. | ||
* Returns true if if this.res is undefined, or if the this.res.finished, | ||
* or if res.connection.destroyed. | ||
* | ||
* @return {boolean} | ||
*/ | ||
_resFinished() { | ||
@@ -810,11 +875,16 @@ const res = this.res; | ||
/** | ||
* Disconnects the KafkaConsumer and closes the sse client or http response. | ||
* Disconnects the KafkaConsumer and closes the sse client and/or http response. | ||
* If disconnect() has already been called, this does nothing. | ||
* | ||
* @param {string} reason Reason disconnect was called, used for logging. | ||
* @return {Promise} Resolved if disconnect was successful, rejected if errored. | ||
*/ | ||
disconnect(error) { | ||
let pKafka = P.resolve(); | ||
let pReq = P.resolve(); | ||
disconnect(reason) { | ||
reason = reason || 'unknown'; | ||
// If disconnect has already been called once, do nothing. | ||
// This shouldn't really happen, as the HTTP end listeners will be removed. | ||
if (this.is_finished) { | ||
this.log.debug(`KafkaSSE disconnect() has already been called. Doing nothing in response to: ${reason}`); | ||
return P.resolve(); | ||
@@ -824,20 +894,27 @@ } | ||
this.log.info('Disconnecting.'); | ||
if (error) { | ||
this._error(error, 'warn'); | ||
} | ||
this.log.info(`KafkaSSE disconnecting due to: ${reason}`); | ||
return P.resolve().then(() => { | ||
// first, deal with the response | ||
// Remove other HTTP disconnect handlers to prevent disconnect from being fired multiple times. | ||
this.removeHttpEndListeners(); | ||
const disconnectHttpPromise = P.resolve().then(() => { | ||
// 3 cases: | ||
// - Usually we are disconnecting an active SSEResponse, so just | ||
// end the HTTP response via this.sse.end. | ||
// - If no SSEResponse is active, just end this.res. | ||
// - Else this.res is finished but we still have a reference to it | ||
// so just delete this.res. | ||
if (this.sse) { | ||
// If we still have SSE, then end the SSEResponse. | ||
// SSEResponse will handle ending thee HTTP Response itself. | ||
const sse = this.sse; | ||
delete this.sse; | ||
delete this.res; | ||
return sse.end() | ||
.catch((e) => this._error(e, 'warn')) | ||
.then(() => { | ||
this.log.info('Closed SSE response.'); | ||
}); | ||
} | ||
if (!this._resFinished()) { | ||
.then(() => this.log.debug('KafkaSSE disconnect: Ended SSE (HTTP) response.')); | ||
} else if (!this._resFinished()) { | ||
// Else if for disconnect was called and we don't have an SSEResponse, | ||
// (likely because the SSEResponse wasn't ever started), then just | ||
// end the HTTP response here. | ||
return new P((resolve, reject) => { | ||
@@ -849,33 +926,44 @@ const res = this.res; | ||
} | ||
delete this.res; | ||
res.on('error', reject); | ||
res.once('error', reject); | ||
try { | ||
if (!res.end(resolve)) { | ||
resolve(); | ||
} | ||
res.end(); | ||
resolve(); | ||
} catch(e) { | ||
reject(e); | ||
} | ||
}).catch((e) => this._error(e, 'warn')) | ||
.then(() => { | ||
this.log.info('Closed HTTP request.'); | ||
}); | ||
} | ||
if (this.res) { | ||
}) | ||
.then(() => this.log.debug('KafkaSSE disconnect: Ended HTTP response.')); | ||
} else if (this.res) { | ||
// Else the HTTP Response has already been ended, | ||
// so just delete our reference to it. | ||
delete this.res; | ||
this.log.info('Closed HTTP request.'); | ||
this.log.debug('KafkaSSE disconnect: Deleted HTTP response.'); | ||
return P.resolve(); | ||
} | ||
}).then(() => { | ||
// now close the kafka consumer | ||
}); | ||
const disconnectKafkaPromise = P.resolve().then(() => { | ||
if (!this.kafkaConsumer) { | ||
this.log.debug("KafkaSSE disconnect: Kafka consumer already deleted, doing nothing."); | ||
return P.resolve(); | ||
} | ||
const kC = this.kafkaConsumer; | ||
const kafkaConsumer = this.kafkaConsumer; | ||
delete this.kafkaConsumer; | ||
return P.try(() => kC.disconnect()) | ||
.catch((e) => this._error(e, 'warn')) | ||
.then(() => { | ||
this.log.info('Closed the Kafka consumer.'); | ||
}); | ||
}).then(() => this._eventEmitter.emit('done')); | ||
return kafkaConsumer.disconnectAsync() | ||
.then(() => this.log.debug("KafkaSSE disconnect: Disconnected the Kafka consumer.")); | ||
}); | ||
// Emit 'done' when both HTTP response and Kafka are disconnected. | ||
return P.all([disconnectHttpPromise, disconnectKafkaPromise]) | ||
.catch((e) => { | ||
this.log.error({err: e}, 'KafkaSSE disconnect: encountered error'); | ||
}) | ||
.finally(() => { | ||
this.log.debug('KafkaSSE disconnect: finished.') | ||
return this._eventEmitter.emit('done') | ||
}); | ||
} | ||
@@ -882,0 +970,0 @@ } |
@@ -109,25 +109,27 @@ 'use strict'; | ||
// If we don't want to format the response as event-stream SSE, | ||
// Just send the data now and return. | ||
if (this.disableSSEFormatting) { | ||
return p.then(() => this._sendLines(data + '\n')); | ||
} | ||
// If we don't want to format the response as event-stream SSE, | ||
// Just write data as a single line and return. | ||
return p.then(() => this._write(data + '\n')); | ||
} else { | ||
// Else format as an SSE event. | ||
id = this.serialize(id); | ||
// Set the event headers (event name, retry, id) | ||
if (event) { | ||
raw.push(`event: ${event}`); | ||
id = this.serialize(id); | ||
// Set the event headers (event name, retry, id) | ||
if (event) { | ||
raw.push(`event: ${event}`); | ||
} | ||
if (retry) { | ||
raw.push(`retry: ${retry}`); | ||
} | ||
if (id) { | ||
raw.push(`id: ${id}`); | ||
} | ||
// Set each event data line. | ||
raw = raw.concat(data.split(/\n/).map(line => `data: ${line}`)).map(line => `${line}\n`); | ||
// send the event | ||
raw[raw.length - 1] = `${raw[raw.length - 1]}\n`; | ||
return p.then(() => this._sendLines(raw)); | ||
} | ||
if (retry) { | ||
raw.push(`retry: ${retry}`); | ||
} | ||
if (id) { | ||
raw.push(`id: ${id}`); | ||
} | ||
// Set each event data line. | ||
raw = raw.concat(data.split(/\n/).map(line => `data: ${line}`)).map(line => `${line}\n`); | ||
// send the event | ||
raw[raw.length - 1] = `${raw[raw.length - 1]}\n`; | ||
return p.then(() => this._sendLines(raw)); | ||
} | ||
@@ -137,3 +139,3 @@ | ||
/** | ||
* Ends the response. | ||
* Ends the HTTP response. | ||
*/ | ||
@@ -144,2 +146,3 @@ end() { | ||
} | ||
if (this._resFinished()) { | ||
@@ -149,14 +152,11 @@ delete this.res; | ||
} | ||
const res = this.res; | ||
delete this.res; | ||
return new P((resolve, reject) => { | ||
const res = this.res; | ||
if (this._resFinished()) { | ||
delete this.res; | ||
return resolve(); | ||
} | ||
delete this.res; | ||
res.on('error', reject); | ||
res.once('error', reject); | ||
try { | ||
if (res.end(resolve) === false) { | ||
resolve(); | ||
} | ||
res.end(); | ||
resolve(); | ||
} catch(e) { | ||
@@ -197,5 +197,10 @@ reject(e); | ||
return new P((resolve, reject) => { | ||
if (this._resFinished()) { | ||
return reject('Cannot write SSE event: the response is already finished.'); | ||
} | ||
const res = this.res; | ||
let drainAdded = false; | ||
let ret = true; | ||
const removeListeners = () => { | ||
@@ -208,18 +213,20 @@ if (drainAdded) { | ||
}; | ||
const endOk = () => { | ||
removeListeners(); | ||
if (this._resFinished()) { | ||
return reject('Cannot write events after closing the response!'); | ||
return reject('Cannot write SSE event: the response has finished normally.'); | ||
} | ||
resolve(); | ||
}; | ||
const endErr = (e) => { | ||
removeListeners(); | ||
this.log.error({err: e}, 'Got HTTP response error while writing SSE event.'); | ||
reject(e); | ||
}; | ||
if (this._resFinished()) { | ||
return reject('Cannot write events after closing the response!'); | ||
} | ||
res.on('error', endErr); | ||
res.on('prefinish', endOk); | ||
try { | ||
@@ -229,8 +236,6 @@ ret = res.write(data); | ||
removeListeners(); | ||
this.log.error({err: e}, 'Caught error while writing SSE event.') | ||
return reject(e); | ||
} | ||
if (this._resFinished()) { | ||
removeListeners(); | ||
return reject('Cannot write events after closing the response!'); | ||
} | ||
if (ret === false) { | ||
@@ -237,0 +242,0 @@ drainAdded = true; |
@@ -236,3 +236,3 @@ 'use strict'; | ||
/** | ||
* Given a a Kafka assignemnts array, this will look for any occurances | ||
* Given a a Kafka assignments array, this will look for any occurances | ||
* of 'timestamp' instead of 'offset'. For those found, it will issue | ||
@@ -239,0 +239,0 @@ * a offsetsForTimes request on the kafkaConsumer. The returned |
{ | ||
"name": "kafka-sse", | ||
"version": "0.4.0", | ||
"version": "0.4.2", | ||
"description": "KafkaSSE - Kafka Consumer to HTTP SSE/EventSource", | ||
@@ -22,3 +22,3 @@ "main": "index.js", | ||
"type": "git", | ||
"url": "https://phabricator.wikimedia.org/diffusion/WKSE/kafkasse.git" | ||
"url": "https://github.com/wikimedia/KafkaSSE.git" | ||
}, | ||
@@ -35,5 +35,5 @@ "keywords": [ | ||
"bugs": { | ||
"url": "https://phabricator.wikimedia.org/search/query/fpxAPkMeWqjh/" | ||
"url": "https://github.com/wikimedia/KafkaSSE/issues" | ||
}, | ||
"homepage": "https://github.com/wikimedia/kafkasse#readme", | ||
"homepage": "https://github.com/wikimedia/KafkaSSE#readme", | ||
"dependencies": { | ||
@@ -40,0 +40,0 @@ "bluebird": "^3.5.1", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
145135
2645