elastic-apm-http-client
Comparing version 9.6.0 to 9.7.0
# elastic-apm-http-client changelog | ||
## v9.7.0 | ||
- A number of changes were made to fix issues with the APM agent under heavy | ||
load and with a slow or non-responsive APM server. | ||
([#144](https://github.com/elastic/apm-nodejs-http-client/pull/144)) | ||
1. A new `maxQueueSize` config option is added (default 1024 for now) to | ||
control how many events (transactions, spans, errors, metricsets) | ||
will be queued before being dropped when events arrive faster than | ||
they can be sent to the APM server. This ensures the APM agent memory usage | ||
does not grow unbounded. | ||
2. JSON encoding of events (when uncorking) is done in limited-size | ||
batches to bound how long any single chunk of encoding work can block | ||
the event loop. (See MAX_WRITE_BATCH_SIZE in Client._writev.) Internal stats | ||
are collected to watch for long(est) batch processing times. | ||
3. The handling of individual requests to the APM Server intake API has | ||
been rewritten to handle some error cases -- especially from a | ||
non-responsive APM server -- and to ensure that only one intake | ||
request is being performed at a time. Two new config options -- | ||
`intakeResTimeout` and `intakeResTimeoutOnEnd` -- have been added to | ||
allow fine control over some parts of this handling. See the comment on | ||
`makeIntakeRequest` for the best overview. | ||
4. Support for backoff on intake API requests has been implemented per the | ||
agent transport spec: | ||
https://github.com/elastic/apm/blob/master/specs/agents/transport.md#transport-errors | ||
(A standalone sketch of the delay computation follows this list.) | ||
- Started testing against node v15 in preparation for supporting the coming | ||
node v16. | ||
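The transport spec linked in item 4 defines the reconnect delay as `min(reconnectCount++, 6) ** 2` seconds with ±10% jitter, which is what the new `Client.prototype._getBackoffDelay` implements. A minimal standalone sketch of that computation (illustrative only, not part of the package's API):

```js
// Backoff delay per the transport spec: 0s after the first error, growing to
// a 36s cap, with +/-10% jitter to avoid synchronized reconnect storms.
function backoffDelayMs (reconnectCount) {
  const delayS = Math.pow(Math.min(reconnectCount, 6), 2)
  const jitterS = delayS * (0.2 * Math.random() - 0.1)
  return (delayS + jitterS) * 1000
}

// Delay before retry after N consecutive intake errors (reconnectCount = N - 1):
//   N:         1  2  3  4   5   6   7+
//   delay (s): 0  1  4  9  16  25  36   (each +/-10%)
```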
## v9.6.0 | ||
@@ -4,0 +35,0 @@ |
index.js
'use strict' | ||
const assert = require('assert') | ||
const crypto = require('crypto') | ||
const fs = require('fs') | ||
const http = require('http') | ||
@@ -12,3 +15,2 @@ const https = require('https') | ||
const getContainerInfo = require('./lib/container-info') | ||
const pump = require('pump') | ||
const eos = require('end-of-stream') | ||
@@ -19,2 +21,3 @@ const streamToBuffer = require('fast-stream-to-buffer') | ||
const ndjson = require('./lib/ndjson') | ||
const { NoopLogger } = require('./lib/logging') | ||
const truncate = require('./lib/truncate') | ||
@@ -36,4 +39,2 @@ const pkg = require('./package') | ||
const node8 = process.version.indexOf('v8.') === 0 | ||
// All sockets on the agent are unreffed when they are created. This means that | ||
@@ -44,6 +45,10 @@ // when those are the only handles left, the `beforeExit` event will be | ||
// timeout happens. | ||
const clients = [] | ||
const clientsToAutoEnd = [] | ||
process.once('beforeExit', function () { | ||
clients.forEach(function (client) { | ||
if (!client) return // clients remove them selfs from the array when they end | ||
clientsToAutoEnd.forEach(function (client) { | ||
if (!client) { | ||
// Clients remove themselves from the array when they end. | ||
return | ||
} | ||
client._log.trace('auto-end client beforeExit') | ||
client.end() | ||
@@ -69,4 +74,2 @@ }) | ||
this._corkTimer = null | ||
this._received = 0 // number of events given to the client for reporting | ||
this.sent = 0 // number of events written to the socket | ||
this._agent = null | ||
@@ -78,4 +81,20 @@ this._active = false | ||
this._encodedMetadata = null | ||
this._backoffReconnectCount = 0 | ||
// Internal runtime stats for developer debugging/tuning. | ||
this._numEvents = 0 // number of events given to the client | ||
this._numEventsDropped = 0 // number of events dropped because overloaded | ||
this._numEventsEnqueued = 0 // number of events written through to chopper | ||
this.sent = 0 // number of events sent to APM server (not necessarily accepted) | ||
this._slowWriteBatch = { // data on slow or the slowest _writeBatch | ||
numOver10Ms: 0, | ||
// Data for the slowest _writeBatch: | ||
encodeTimeMs: 0, | ||
fullTimeMs: 0, | ||
numEvents: 0, | ||
numBytes: 0 | ||
} | ||
this.config(opts) | ||
this._log = this._conf.logger || new NoopLogger() | ||
@@ -101,5 +120,2 @@ // start stream in corked mode, uncork when cloud | ||
const errorproxy = (err) => { | ||
if (this.destroyed === false) this.emit('request-error', err) | ||
} | ||
this._chopper = new StreamChopper({ | ||
@@ -112,4 +128,12 @@ size: this._conf.size, | ||
} | ||
}).on('stream', onStream(this, errorproxy)) | ||
}) | ||
const onIntakeError = (err) => { | ||
if (this.destroyed === false) { | ||
this.emit('request-error', err) | ||
} | ||
} | ||
this._chopper.on('stream', getChoppedStreamHandler(this, onIntakeError)) | ||
// We don't expect the chopper stream to end until the client is ending. | ||
// Make sure to clean up if this does happen unexpectedly. | ||
const fail = () => { | ||
@@ -120,4 +144,4 @@ if (this._writableState.ending === false) this.destroy() | ||
this._index = clients.length | ||
clients.push(this) | ||
this._index = clientsToAutoEnd.length | ||
clientsToAutoEnd.push(this) | ||
@@ -129,2 +153,14 @@ if (this._conf.centralConfig) { | ||
// Return current internal stats. | ||
Client.prototype._getStats = function () { | ||
return { | ||
numEvents: this._numEvents, | ||
numEventsDropped: this._numEventsDropped, | ||
numEventsEnqueued: this._numEventsEnqueued, | ||
numEventsSent: this.sent, | ||
slowWriteBatch: this._slowWriteBatch, | ||
backoffReconnectCount: this._backoffReconnectCount | ||
} | ||
} | ||
Client.prototype.config = function (opts) { | ||
@@ -152,2 +188,5 @@ this._conf = Object.assign(this._conf || {}, opts) | ||
if (!this._conf.bufferWindowSize) this._conf.bufferWindowSize = 50 | ||
if (!this._conf.maxQueueSize) this._conf.maxQueueSize = 1024 | ||
if (!this._conf.intakeResTimeout) this._conf.intakeResTimeout = 10000 | ||
if (!this._conf.intakeResTimeoutOnEnd) this._conf.intakeResTimeoutOnEnd = 1000 | ||
this._conf.keepAlive = this._conf.keepAlive !== false | ||
@@ -302,4 +341,11 @@ this._conf.centralConfig = this._conf.centralConfig || false | ||
} else { | ||
this._received++ | ||
this._chopper.write(this._encode(obj, enc), cb) | ||
const t = process.hrtime() | ||
const chunk = this._encode(obj, enc) | ||
this._numEventsEnqueued++ | ||
this._chopper.write(chunk, cb) | ||
this._log.trace({ | ||
fullTimeMs: deltaMs(t), | ||
numEvents: 1, | ||
numBytes: chunk.length | ||
}, '_write: encode object') | ||
} | ||
@@ -309,9 +355,21 @@ } | ||
Client.prototype._writev = function (objs, cb) { | ||
// Limit the size of individual writes to manageable batches, primarily to | ||
// limit large sync pauses due to `_encode`ing in `_writeBatch`. This value | ||
// is not particularly well tuned. It was selected to get sync pauses under | ||
// 10ms on a developer machine. | ||
const MAX_WRITE_BATCH_SIZE = 32 | ||
let offset = 0 | ||
const processBatch = () => { | ||
let index = -1 | ||
for (let i = offset; i < objs.length; i++) { | ||
if (this.destroyed) { | ||
cb() | ||
return | ||
} | ||
let flushIdx = -1 | ||
const limit = Math.min(objs.length, offset + MAX_WRITE_BATCH_SIZE) | ||
for (let i = offset; i < limit; i++) { | ||
if (objs[i].chunk === flush) { | ||
index = i | ||
flushIdx = i | ||
break | ||
@@ -321,15 +379,16 @@ } | ||
if (offset === 0 && index === -1) { | ||
// normally there's no flush object queued, so here's a shortcut that just | ||
// skips all the complicated splitting logic | ||
this._writevCleaned(objs, cb) | ||
} else if (index === -1) { | ||
// no more flush elements in the queue, just write the rest | ||
this._writevCleaned(objs.slice(offset), cb) | ||
} else if (index > offset) { | ||
// there's a few items in the queue before we need to flush, let's first write those | ||
this._writevCleaned(objs.slice(offset, index), processBatch) | ||
offset = index | ||
} else if (index === objs.length - 1) { | ||
// the last item in the queue is a flush | ||
if (offset === 0 && flushIdx === -1 && objs.length <= MAX_WRITE_BATCH_SIZE) { | ||
// A shortcut if there is no `flush` and the whole `objs` fits in a batch. | ||
this._writeBatch(objs, cb) | ||
} else if (flushIdx === -1) { | ||
// No `flush` in this batch. | ||
this._writeBatch(objs.slice(offset, limit), | ||
limit === objs.length ? cb : processBatch) | ||
offset = limit | ||
} else if (flushIdx > offset) { | ||
// There are some events in the queue before a `flush`. | ||
this._writeBatch(objs.slice(offset, flushIdx), processBatch) | ||
offset = flushIdx | ||
} else if (flushIdx === objs.length - 1) { | ||
// The next item is a flush, and it is the *last* item in the queue. | ||
this._writeFlush(cb) | ||
@@ -350,10 +409,32 @@ } else { | ||
Client.prototype._writevCleaned = function (objs, cb) { | ||
// Write a batch of events (excluding specially handled "flush" events) to | ||
// the stream chopper. | ||
Client.prototype._writeBatch = function (objs, cb) { | ||
const t = process.hrtime() | ||
const chunk = objs.map(encodeObject.bind(this)).join('') | ||
const encodeTimeMs = deltaMs(t) | ||
this._received += objs.length | ||
this._numEventsEnqueued += objs.length | ||
this._chopper.write(chunk, cb) | ||
const fullTimeMs = deltaMs(t) | ||
if (fullTimeMs > this._slowWriteBatch.fullTimeMs) { | ||
this._slowWriteBatch.encodeTimeMs = encodeTimeMs | ||
this._slowWriteBatch.fullTimeMs = fullTimeMs | ||
this._slowWriteBatch.numEvents = objs.length | ||
this._slowWriteBatch.numBytes = chunk.length | ||
} | ||
if (fullTimeMs > 10) { | ||
this._slowWriteBatch.numOver10Ms++ | ||
} | ||
this._log.trace({ | ||
encodeTimeMs: encodeTimeMs, | ||
fullTimeMs: fullTimeMs, | ||
numEvents: objs.length, | ||
numBytes: chunk.length | ||
}, '_writeBatch') | ||
} | ||
Client.prototype._writeFlush = function (cb) { | ||
this._log.trace({ active: this._active }, '_writeFlush') | ||
if (this._active) { | ||
@@ -395,3 +476,5 @@ this._onflushed = cb | ||
process.nextTick(() => { | ||
if (this.destroyed === false) this.uncork() | ||
if (this.destroyed === false) { | ||
this.uncork() | ||
} | ||
}) | ||
@@ -436,4 +519,13 @@ | ||
Client.prototype._shouldDropEvent = function () { | ||
this._numEvents++ | ||
const shouldDrop = this._writableState.length >= this._conf.maxQueueSize | ||
if (shouldDrop) { | ||
this._numEventsDropped++ | ||
} | ||
return shouldDrop | ||
} | ||
Client.prototype.sendSpan = function (span, cb) { | ||
if (this._isUnsafeToWrite()) { | ||
if (this._isUnsafeToWrite() || this._shouldDropEvent()) { | ||
return | ||
@@ -446,3 +538,3 @@ } | ||
Client.prototype.sendTransaction = function (transaction, cb) { | ||
if (this._isUnsafeToWrite()) { | ||
if (this._isUnsafeToWrite() || this._shouldDropEvent()) { | ||
return | ||
@@ -455,3 +547,3 @@ } | ||
Client.prototype.sendError = function (error, cb) { | ||
if (this._isUnsafeToWrite()) { | ||
if (this._isUnsafeToWrite() || this._shouldDropEvent()) { | ||
return | ||
@@ -464,3 +556,3 @@ } | ||
Client.prototype.sendMetricSet = function (metricset, cb) { | ||
if (this._isUnsafeToWrite()) { | ||
if (this._isUnsafeToWrite() || this._shouldDropEvent()) { | ||
return | ||
@@ -483,2 +575,3 @@ } | ||
Client.prototype._final = function (cb) { | ||
this._log.trace('_final') | ||
if (this._configTimer) { | ||
@@ -488,3 +581,3 @@ clearTimeout(this._configTimer) | ||
} | ||
clients[this._index] = null // remove global reference to ease garbage collection | ||
clientsToAutoEnd[this._index] = null // remove global reference to ease garbage collection | ||
this._ref() | ||
@@ -496,2 +589,3 @@ this._chopper.end() | ||
Client.prototype._destroy = function (err, cb) { | ||
this._log.trace({ err }, '_destroy') | ||
if (this._configTimer) { | ||
@@ -505,3 +599,3 @@ clearTimeout(this._configTimer) | ||
} | ||
clients[this._index] = null // remove global reference to ease garbage collection | ||
clientsToAutoEnd[this._index] = null // remove global reference to ease garbage collection | ||
this._chopper.destroy() | ||
@@ -512,72 +606,132 @@ this._agent.destroy() | ||
function onStream (client, onerror) { | ||
return function (stream, next) { | ||
const onerrorproxy = (err) => { | ||
stream.removeListener('error', onerrorproxy) | ||
req.removeListener('error', onerrorproxy) | ||
destroyStream(stream) | ||
onerror(err) | ||
} | ||
// Return the appropriate backoff delay (in milliseconds) before a next possible | ||
// request to APM server. | ||
// Spec: https://github.com/elastic/apm/blob/master/specs/agents/transport.md#transport-errors | ||
Client.prototype._getBackoffDelay = function (isErr) { | ||
let reconnectCount = this._backoffReconnectCount | ||
if (isErr) { | ||
this._backoffReconnectCount++ | ||
} else { | ||
this._backoffReconnectCount = 0 | ||
reconnectCount = 0 | ||
} | ||
// min(reconnectCount++, 6) ** 2 ± 10% | ||
const delayS = Math.pow(Math.min(reconnectCount, 6), 2) | ||
const jitterS = delayS * (0.2 * Math.random() - 0.1) | ||
const delayMs = (delayS + jitterS) * 1000 | ||
return delayMs | ||
} | ||
function getChoppedStreamHandler (client, onerror) { | ||
// Make a request to the apm-server intake API. | ||
// https://www.elastic.co/guide/en/apm/server/current/events-api.html | ||
// | ||
// In normal operation this works as follows: | ||
// - The StreamChopper (`this._chopper`) calls this function with a newly | ||
// created Gzip stream, to which it writes encoded event data. | ||
// - It `gzipStream.end()`s the stream when: | ||
// (a) approximately `apiRequestSize` of data have been written, | ||
// (b) `apiRequestTime` seconds have passed, or | ||
// (c) `_chopper.chop()` is explicitly called via `client.flush()`, | ||
// e.g. used by the Node.js APM agent after `client.sendError()`. | ||
// - This function makes the HTTP POST to the apm-server, pipes the gzipStream | ||
// to it, and waits for the completion of the request and the apm-server | ||
// response. | ||
// - Then it calls the given `next` callback to signal StreamChopper that | ||
//   another chopped stream can be created, when there is more to send. | ||
// | ||
// Of course, things can go wrong. Here are the known ways this pipeline can | ||
// conclude. | ||
// - intake response success - A successful response from the APM server. This | ||
// is the normal operation case described above. | ||
// - gzipStream error - An "error" event on the gzip stream. | ||
// - intake request error - An "error" event on the intake HTTP request, e.g. | ||
// ECONNREFUSED or ECONNRESET. | ||
// - intakeResTimeout - A timer started *after* we are finished sending data | ||
// to the APM server by which we require a response (including its body). By | ||
// default this is 10s -- a very long time to allow for a slow or far | ||
// apm-server. If we hit this, APM server is problematic anyway, so the | ||
// delay doesn't add to the problems. | ||
// - serverTimeout - An idle timeout value (default 30s) set on the socket. | ||
//   This is a catch-all fallback for an otherwise wedged connection. If this | ||
// is being hit, there is some major issue in the application (possibly a | ||
// bug in the APM agent). | ||
// - process completion - The Client takes pains to always `.unref()` its | ||
// handles to never keep a using process open if it is ready to exit. When | ||
// the process is ready to exit, the following happens: | ||
// - The "beforeExit" handler above will call `client.end()`, | ||
// - which calls `client._ref()` (to *hold the process open* to complete | ||
// this request), then `_chopper.end()` to end the `gzipStream` so | ||
// this request can complete soon. | ||
// - We then expect this request to complete quickly and the process will | ||
// then finish exiting. A subtlety is if the APM server is not responding | ||
// then we'll wait on `intakeResTimeoutOnEnd` (by default 1s). | ||
return function makeIntakeRequest (gzipStream, next) { | ||
const reqId = crypto.randomBytes(16).toString('hex') | ||
const log = client._log.child({ reqId }) | ||
const startTime = process.hrtime() | ||
const timeline = [] | ||
let bytesWritten = 0 | ||
let intakeRes | ||
let intakeResTimer = null | ||
const intakeResTimeout = client._conf.intakeResTimeout | ||
const intakeResTimeoutOnEnd = client._conf.intakeResTimeoutOnEnd | ||
// `_active` is used to coordinate the callback to `client.flush(cb)`. | ||
client._active = true | ||
const req = client._transport.request(client._conf.requestIntake, onResult(onerror)) | ||
// Handle conclusion of this intake request. Each "part" is expected to call | ||
// `completePart()` at least once -- multiple calls are okay for cases like | ||
// the "error" and "close" events on a stream being called. When a part | ||
// errors or all parts are completed, then we can conclude. | ||
let concluded = false | ||
const completedFromPart = { | ||
gzipStream: false, | ||
intakeReq: false, | ||
intakeRes: false | ||
} | ||
let numToComplete = Object.keys(completedFromPart).length | ||
const completePart = (part, err) => { | ||
log.trace({ err, concluded }, 'completePart %s', part) | ||
timeline.push([deltaMs(startTime), `completePart ${part}`, err && err.message]) | ||
assert(part in completedFromPart, `'${part}' is in completedFromPart`) | ||
// Abort the current request if the server responds prior to the request | ||
// being finished | ||
req.on('response', function (res) { | ||
if (!req.finished) { | ||
// In Node.js 8, the zlib stream will emit a 'zlib binding closed' | ||
// error when destroyed. Furthermore, the HTTP response will not emit | ||
// any data events after the request have been destroyed, so it becomes | ||
// impossible to see the error returned by the server if we abort the | ||
// request. So for Node.js 8, we'll work around this by closing the | ||
// stream gracefully. | ||
// | ||
// This results in the gzip buffer being flushed and a little more data | ||
// being sent to the APM Server, but it's better than not getting the | ||
// error body. | ||
if (node8) { | ||
stream.end() | ||
} else { | ||
destroyStream(stream) | ||
if (concluded) { | ||
return | ||
} | ||
// If this is the final part to complete, then we are ready to conclude. | ||
let allPartsCompleted = false | ||
if (!completedFromPart[part]) { | ||
completedFromPart[part] = true | ||
numToComplete-- | ||
if (numToComplete === 0) { | ||
allPartsCompleted = true | ||
} | ||
} | ||
}) | ||
if (!err && !allPartsCompleted) { | ||
return | ||
} | ||
// Mointor streams for errors so that we can make sure to destory the | ||
// output stream as soon as that occurs | ||
stream.on('error', onerrorproxy) | ||
req.on('error', onerrorproxy) | ||
// Conclude. | ||
concluded = true | ||
if (err) { | ||
// There was an error: clean up resources. | ||
req.on('socket', function (socket) { | ||
// Sockets will automatically be unreffed by the HTTP agent when they are | ||
// not in use by an HTTP request, but as we're keeping the HTTP request | ||
// open, we need to unref the socket manually | ||
socket.unref() | ||
}) | ||
// Note that in Node v8, destroying the gzip stream results in it | ||
// emitting an "error" event as follows. No harm, however. | ||
// Error: gzip stream error: zlib binding closed | ||
// at Gzip._transform (zlib.js:369:15) | ||
// ... | ||
destroyStream(gzipStream) | ||
intakeReq.destroy() | ||
if (intakeResTimer) { | ||
log.trace('cancel intakeResTimer') | ||
clearTimeout(intakeResTimer) | ||
intakeResTimer = null | ||
} | ||
} | ||
if (Number.isFinite(client._conf.serverTimeout)) { | ||
req.setTimeout(client._conf.serverTimeout, function () { | ||
req.destroy(new Error(`APM Server response timeout (${client._conf.serverTimeout}ms)`)) | ||
}) | ||
} | ||
pump(stream, req, function () { | ||
// This function is technically called with an error, but because we | ||
// manually attach error listeners on all the streams in the pipeline | ||
// above, we can safely ignore it. | ||
// | ||
// We do this for two reasons: | ||
// | ||
// 1) This callback might be called a few ticks too late, in which case a | ||
// race condition could occur where the user would write to the output | ||
// stream before the rest of the system discovered that it was | ||
// unwritable | ||
// | ||
// 2) The error might occur post the end of the stream. In that case we | ||
// would not get it here as the internal error listener would have | ||
// been removed and the stream would throw the error instead | ||
client.sent = client._received | ||
client.sent = client._numEventsEnqueued | ||
client._active = false | ||
@@ -589,32 +743,160 @@ if (client._onflushed) { | ||
next() | ||
const backoffDelayMs = client._getBackoffDelay(!!err) | ||
if (err) { | ||
log.trace({ timeline, bytesWritten, backoffDelayMs, err }, | ||
'conclude intake request: error') | ||
onerror(err) | ||
} else { | ||
log.trace({ timeline, bytesWritten, backoffDelayMs }, | ||
'conclude intake request: success') | ||
} | ||
if (backoffDelayMs > 0) { | ||
setTimeout(next, backoffDelayMs).unref() | ||
} else { | ||
setImmediate(next) | ||
} | ||
} | ||
// Start the request and set its timeout. | ||
const intakeReq = client._transport.request(client._conf.requestIntake) | ||
if (Number.isFinite(client._conf.serverTimeout)) { | ||
intakeReq.setTimeout(client._conf.serverTimeout) | ||
} | ||
// TODO: log intakeReq and intakeRes when | ||
// https://github.com/elastic/ecs-logging-nodejs/issues/67 is implemented. | ||
log.trace('intake request start') | ||
// Handle events on the intake request. | ||
// https://nodejs.org/api/http.html#http_http_request_options_callback documents | ||
// the events emitted on the req and res objects in different scenarios. | ||
intakeReq.on('timeout', () => { | ||
log.trace('intakeReq "timeout"') | ||
// `.destroy(err)` will result in an "error" event. | ||
intakeReq.destroy(new Error(`APM Server response timeout (${client._conf.serverTimeout}ms)`)) | ||
}) | ||
// Only intended for local debugging | ||
if (client._conf.payloadLogFile) { | ||
if (!client._payloadLogFile) { | ||
client._payloadLogFile = require('fs').createWriteStream(client._conf.payloadLogFile, { flags: 'a' }) | ||
intakeReq.on('socket', function (socket) { | ||
// Unref the socket for this request so that the Client does not keep | ||
// the node process running if it otherwise would be done. (This is | ||
// tested by the "unref-client" test in test/side-effects.js.) | ||
// | ||
// The HTTP keep-alive agent will unref sockets when unused, and ref them | ||
// during a request. Given that the normal makeIntakeRequest behaviour | ||
// is to keep a request open for up to 10s (`apiRequestTimeout`), we must | ||
// manually unref the socket. | ||
log.trace('intakeReq "socket": unref it') | ||
socket.unref() | ||
}) | ||
intakeReq.on('response', (intakeRes_) => { | ||
intakeRes = intakeRes_ | ||
log.trace({ statusCode: intakeRes.statusCode, reqFinished: intakeReq.finished }, | ||
'intakeReq "response"') | ||
let err | ||
const chunks = [] | ||
if (!intakeReq.finished) { | ||
// Premature response from APM server. Typically this is for errors | ||
// like "queue is full", for which the response body will be parsed | ||
// below. However, set an `err` as a fallback for the unexpected case | ||
// that is with a 2xx response. | ||
if (intakeRes.statusCode >= 200 && intakeRes.statusCode < 300) { | ||
err = new Error(`premature apm-server response with statusCode=${intakeRes.statusCode}`) | ||
} | ||
// There is no point (though no harm) in sending more data to the APM | ||
// server. In case reading the error response body takes a while, pause | ||
// the gzip stream until it is destroyed in `completePart()`. | ||
gzipStream.pause() | ||
} | ||
// Manually write to the file instead of using pipe/pump so that the file | ||
// handle isn't closed when the stream ends | ||
stream.pipe(zlib.createGunzip()).on('data', function (chunk) { | ||
client._payloadLogFile.write(chunk) | ||
// Handle events on the intake response. | ||
intakeRes.on('error', (intakeResErr) => { | ||
// I am not aware of a way to get an "error" event on the | ||
// IncomingMessage (see also https://stackoverflow.com/q/53691119), but | ||
// handling it here is preferable to an uncaughtException. | ||
intakeResErr = wrapError(intakeResErr, 'intake response error event') | ||
completePart('intakeRes', intakeResErr) | ||
}) | ||
} | ||
intakeRes.on('data', (chunk) => { | ||
chunks.push(chunk) | ||
}) | ||
// intakeRes.on('close', () => { log.trace('intakeRes "close"') }) | ||
// intakeRes.on('aborted', () => { log.trace('intakeRes "aborted"') }) | ||
intakeRes.on('end', () => { | ||
log.trace('intakeRes "end"') | ||
if (intakeResTimer) { | ||
clearTimeout(intakeResTimer) | ||
intakeResTimer = null | ||
} | ||
if (intakeRes.statusCode < 200 || intakeRes.statusCode > 299) { | ||
err = processIntakeErrorResponse(intakeRes, Buffer.concat(chunks)) | ||
} | ||
completePart('intakeRes', err) | ||
}) | ||
}) | ||
// The _encodedMetadata property _should_ be set in the Client | ||
// constructor function after making a cloud metadata call. | ||
// | ||
// Since we cork data until the client._encodedMetadata is set the | ||
// following conditional should not be necessary. However, we'll | ||
// leave it in place out of a healthy sense of caution in case | ||
// something unsets _encodedMetadata or _encodedMetadata is somehow | ||
// never set. | ||
if (!client._encodedMetadata) { | ||
client._encodedMetadata = client._encode({ metadata: client._conf.metadata }, Client.encoding.METADATA) | ||
// intakeReq.on('abort', () => { log.trace('intakeReq "abort"') }) | ||
// intakeReq.on('close', () => { log.trace('intakeReq "close"') }) | ||
intakeReq.on('finish', () => { | ||
log.trace('intakeReq "finish"') | ||
completePart('intakeReq') | ||
}) | ||
intakeReq.on('error', (err) => { | ||
log.trace('intakeReq "error"') | ||
completePart('intakeReq', err) | ||
}) | ||
// Handle events on the gzip stream. | ||
gzipStream.on('data', (chunk) => { | ||
bytesWritten += chunk.length | ||
}) | ||
gzipStream.on('error', (gzipErr) => { | ||
log.trace('gzipStream "error"') | ||
gzipErr = wrapError(gzipErr, 'gzip stream error') | ||
completePart('gzipStream', gzipErr) | ||
}) | ||
gzipStream.on('finish', () => { | ||
// If the apm-server is not reading its input and the gzip data is large | ||
// enough to fill buffers, then the gzip stream will emit "finish", but | ||
// not "end". Therefore, this "finish" event is the best indicator that | ||
// the ball is now in the apm-server's court. | ||
// | ||
// We now start a timer waiting on the response, provided we still expect | ||
// one (we don't if the request has already errored out, e.g. | ||
// ECONNREFUSED) and it hasn't already completed (e.g. if it replied | ||
// quickly with "queue is full"). | ||
log.trace('gzipStream "finish"') | ||
if (!completedFromPart.intakeReq && !completedFromPart.intakeRes) { | ||
const timeout = client._writableState.ending ? intakeResTimeoutOnEnd : intakeResTimeout | ||
log.trace({ timeout }, 'start intakeResTimer') | ||
intakeResTimer = setTimeout(() => { | ||
completePart('intakeRes', | ||
new Error('intake response timeout: APM server did not respond ' + | ||
`within ${timeout / 1000}s of gzip stream finish`)) | ||
}, timeout).unref() | ||
} | ||
}) | ||
// Watch the gzip "end" event for its completion, because the "close" event | ||
// that we would prefer to use, *does not get emitted* for the | ||
// `client.sendSpan(callback) + client.flush()` test case with | ||
// *node v12-only*. | ||
gzipStream.on('end', () => { | ||
log.trace('gzipStream "end"') | ||
completePart('gzipStream') | ||
}) | ||
// gzipStream.on('close', () => { log.trace('gzipStream "close"') }) | ||
// Hook up writing data to a file (only intended for local debugging). | ||
// Append the intake data to `payloadLogFile`, if given. This is only | ||
// intended for local debugging because it can have a significant perf | ||
// impact. | ||
if (client._conf.payloadLogFile) { | ||
const payloadLogStream = fs.createWriteStream(client._conf.payloadLogFile, { flags: 'a' }) | ||
gzipStream.pipe(zlib.createGunzip()).pipe(payloadLogStream) | ||
} | ||
// All requests to the APM Server must start with a metadata object | ||
stream.write(client._encodedMetadata) | ||
// Send the metadata object (always first) and hook up the streams. | ||
assert(client._encodedMetadata, 'client._encodedMetadata is set') | ||
gzipStream.write(client._encodedMetadata) | ||
gzipStream.pipe(intakeReq) | ||
} | ||
@@ -654,11 +936,2 @@ } | ||
function onResult (onerror) { | ||
return streamToBuffer.onStream(function (err, buf, res) { | ||
if (err) return onerror(err) | ||
if (res.statusCode < 200 || res.statusCode > 299) { | ||
onerror(processIntakeErrorResponse(res, buf)) | ||
} | ||
}) | ||
} | ||
function getIntakeRequestOptions (opts, agent) { | ||
@@ -841,2 +1114,16 @@ const headers = getHeaders(opts) | ||
// Wrap the given Error object, including the given message. | ||
// | ||
// Dev Note: Various techniques exist to wrap `Error`s in node.js and JavaScript | ||
// to provide a cause chain, e.g. see | ||
// https://www.joyent.com/node-js/production/design/errors | ||
// However, I'm not aware of a de facto "winner". Eventually there may be | ||
// https://github.com/tc39/proposal-error-cause | ||
// For now we will simply prefix the existing error object's `message` property. | ||
// This is simple and preserves the root error `stack`. | ||
function wrapError (err, msg) { | ||
err.message = msg + ': ' + err.message | ||
return err | ||
} | ||
function processIntakeErrorResponse (res, buf) { | ||
@@ -848,2 +1135,3 @@ const err = new Error('Unexpected APM Server response') | ||
if (buf.length > 0) { | ||
// https://www.elastic.co/guide/en/apm/server/current/events-api.html#events-api-errors | ||
const body = buf.toString('utf8') | ||
@@ -910,1 +1198,8 @@ const contentType = res.headers['content-type'] | ||
} | ||
// Return the time difference (in milliseconds) between the given time `t` | ||
// (a 2-tuple as returned by `process.hrtime()`) and now. | ||
function deltaMs (t) { | ||
const d = process.hrtime(t) | ||
return d[0] * 1e3 + d[1] / 1e6 | ||
} |
{ | ||
"name": "elastic-apm-http-client", | ||
"version": "9.6.0", | ||
"version": "9.7.0", | ||
"description": "A low-level HTTP client for communicating with the Elastic APM intake API", | ||
@@ -12,5 +12,6 @@ "main": "index.js", | ||
], | ||
"// scripts.test": "quoting arg to tape to avoid too long argv, let tape do globbing", | ||
"scripts": { | ||
"coverage": "nyc report --reporter=text-lcov > coverage.lcov && codecov", | ||
"test": "standard && nyc tape test/*.js" | ||
"coverage": "nyc report --reporter=text-lcov > coverage.lcov && (codecov || echo 'warning: ignoring codecov failure')", | ||
"test": "standard && nyc tape \"test/*.js\"" | ||
}, | ||
@@ -28,3 +29,2 @@ "engines": { | ||
"fast-stream-to-buffer": "^1.0.0", | ||
"pump": "^3.0.0", | ||
"readable-stream": "^3.4.0", | ||
@@ -31,0 +31,0 @@ "stream-chopper": "^3.0.1", |
@@ -57,3 +57,4 @@ # elastic-apm-http-client | ||
- `options` - An object containing config options (see below) | ||
- `options` - An object containing config options (see below). All options | ||
are optional, except those marked "(required)". | ||
@@ -98,3 +99,3 @@ Data sent to the APM Server as part of the metadata package: | ||
than the `time` config option. That might result in healthy requests | ||
being aborted prematurely (default: `15000` ms) | ||
being aborted prematurely. (default: `15000` ms) | ||
- `keepAlive` - If set to `false` the client will not reuse sockets | ||
@@ -138,2 +139,21 @@ between requests (default: `true`) | ||
objects) | ||
- `maxQueueSize` - The maximum number of buffered events (transactions, | ||
spans, errors, metricsets). Events are buffered when the agent can't keep | ||
up with sending them to the APM Server or if the APM server is down. | ||
If the queue is full, events are rejected which means transactions, spans, | ||
etc. will be lost. This guards the application from consuming unbounded | ||
memory, possibly overusing CPU (spent on serializing events), and possibly | ||
crashing in case the APM server is unavailable for a long period of time. A | ||
lower value will decrease the heap overhead of the agent, while a higher | ||
value makes it less likely to lose events in case of a temporary spike in | ||
throughput. (default: 1024) | ||
- `intakeResTimeout` - The time (in milliseconds) by which a response from the | ||
[APM Server events intake API](https://www.elastic.co/guide/en/apm/server/current/events-api.html) | ||
is expected *after all the event data for that request has been sent*. This | ||
allows a smaller timeout than `serverTimeout` to handle an APM server that | ||
is accepting connections but is slow to respond. (default: `10000` ms) | ||
- `intakeResTimeoutOnEnd` - The same as `intakeResTimeout`, but used once | ||
the client has ended, i.e. for the possibly final request to the APM server. | ||
This is typically a lower value so that an ending process is not held up | ||
waiting for that final request to complete. (default: `1000` ms) | ||
(See the usage sketch below.) | ||
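The following construction sketch uses the new queue and intake-timeout options. It is illustrative only: the identification fields are placeholder values for the required options documented earlier in this README, and the numbers shown are simply the defaults written out explicitly.

```js
const Client = require('elastic-apm-http-client')

const client = new Client({
  // Placeholder values for the required identification options (see above).
  agentName: 'my-agent',
  agentVersion: '1.0.0',
  serviceName: 'my-service',
  userAgent: 'my-agent/1.0.0',

  // Bound the number of buffered events so memory use cannot grow unbounded
  // when the APM Server is slow or down. Events beyond this are dropped.
  maxQueueSize: 1024,

  // How long to wait for an intake API response after all event data for a
  // request has been sent, and the shorter variant used for the final
  // request when the client is ending.
  intakeResTimeout: 10000,
  intakeResTimeoutOnEnd: 1000
})

// Intake request failures (including response timeouts) surface here.
client.on('request-error', function (err) {
  console.error('APM intake request error:', err.message)
})
```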
@@ -157,5 +177,8 @@ Data sanitizing configuration: | ||
- `logger` - A [pino](https://getpino.io) logger to use for trace and | ||
debug-level logging. | ||
- `payloadLogFile` - Specify a file path to which a copy of all data | ||
sent to the APM Server should be written. The data will be in ndjson | ||
format and will be uncompressed | ||
format and will be uncompressed. Note that using this option can | ||
impact performance. | ||
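For local debugging, those two logging options might be combined with the configuration above (reusing the `Client` constructor from the previous sketch). The pino usage and file path here are just examples; `payloadLogFile` is best avoided in production because of its performance impact.

```js
const pino = require('pino')

const debugClient = new Client({
  // Same placeholder identification options as in the previous sketch.
  agentName: 'my-agent',
  agentVersion: '1.0.0',
  serviceName: 'my-service',
  userAgent: 'my-agent/1.0.0',

  // Trace-level pino logger: the client logs its intake requests, write
  // batches, and timers at trace level.
  logger: pino({ level: 'trace' }),

  // Append an uncompressed ndjson copy of all intake data to this file.
  payloadLogFile: '/tmp/apm-intake.ndjson'
})
```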
@@ -225,5 +248,7 @@ ### Event: `config` | ||
An integer indicating the number of events (spans, transactions, or errors) | ||
sent by the client. An event is considered sent when the HTTP request | ||
used to transmit it have ended. | ||
An integer indicating the number of events (spans, transactions, errors, or | ||
metricsets) sent by the client. An event is considered sent when the HTTP | ||
request used to transmit it has ended. Note that errors in requests to APM | ||
server may mean this value is not the same as the number of events *accepted* | ||
by the APM server. | ||
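A small sketch of how this counter behaves around a flush. The transaction and span objects are illustrative stand-ins for whatever an agent would normally hand to the client.

```js
client.sendTransaction({ name: 'GET /', type: 'request', duration: 12.3 })
client.sendSpan({ name: 'SELECT FROM users', type: 'db.postgresql.query' })

client.flush(function () {
  // By the time the flush callback fires, the intake request carrying the
  // buffered events should have ended and `client.sent` been updated. Note
  // that a 'request-error' can mean some of those events were not accepted.
  console.log('events sent so far:', client.sent)
})
```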
@@ -230,0 +255,0 @@ ### `client.config(options)` |
- Removed pump@^3.0.0
- Removed pump@3.0.2 (transitive)