elastic-apm-http-client - npm Package Compare versions

Comparing version 9.6.0 to 9.7.0

lib/logging.js

CHANGELOG.md
# elastic-apm-http-client changelog
## v9.7.0
- A number of changes were made to fix issues with the APM agent under heavy
load and with a slow or non-responsive APM server.
([#144](https://github.com/elastic/apm-nodejs-http-client/pull/144))
  1. A new `maxQueueSize` config option is added (default 1024 for now) to
     control how many events (transactions, spans, errors, metricsets)
     will be queued before being dropped if events arrive faster than they
     can be sent to the APM server. This ensures the APM agent's memory
     usage does not grow unbounded.
  2. JSON encoding of events (when uncorking) is done in limited-size
     batches to bound how long a single chunk of encoding work can block
     the CPU/event loop. (See MAX_WRITE_BATCH_SIZE in Client._writev.)
     Internal stats are collected to watch for long(est) batch processing
     times.
  3. The handling of individual requests to the APM Server intake API has
     been rewritten to handle some error cases -- especially from a
     non-responsive APM server -- and to ensure that only one intake
     request is performed at a time. Two new config options --
     `intakeResTimeout` and `intakeResTimeoutOnEnd` -- have been added to
     allow fine control over some parts of this handling. See the comment on
     `makeIntakeRequest` for the best overview.
  4. Support for backoff on intake API requests has been implemented per
     https://github.com/elastic/apm/blob/master/specs/agents/transport.md#transport-errors
     (see the worked sketch after this changelog entry).
- Started testing against node v15 in preparation for supporting the coming
node v16.
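The backoff rule in item 4 can be made concrete with a small sketch (it mirrors `Client.prototype._getBackoffDelay` in this release; this helper is illustrative only and not part of the package's public API):

```js
// Backoff per the transport spec: min(reconnectCount, 6)^2 seconds, +/-10% jitter.
function backoffDelayMs (reconnectCount) {
  const delayS = Math.pow(Math.min(reconnectCount, 6), 2)
  const jitterS = delayS * (0.2 * Math.random() - 0.1)
  return (delayS + jitterS) * 1000
}
// Consecutive errors yield delays of roughly 0s, 1s, 4s, 9s, 16s, 25s,
// then a 36s cap, each with +/-10% jitter; a success resets the count.
```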
## v9.6.0

@@ -4,0 +35,0 @@


index.js
'use strict'
const assert = require('assert')
const crypto = require('crypto')
const fs = require('fs')
const http = require('http')

@@ -12,3 +15,2 @@ const https = require('https')

 const getContainerInfo = require('./lib/container-info')
-const pump = require('pump')
 const eos = require('end-of-stream')

@@ -19,2 +21,3 @@ const streamToBuffer = require('fast-stream-to-buffer')

const ndjson = require('./lib/ndjson')
const { NoopLogger } = require('./lib/logging')
const truncate = require('./lib/truncate')

@@ -36,4 +39,2 @@ const pkg = require('./package')

-const node8 = process.version.indexOf('v8.') === 0
// All sockets on the agent are unreffed when they are created. This means that

@@ -44,6 +45,10 @@ // when those are the only handles left, the `beforeExit` event will be

// timeout happens.
-const clients = []
+const clientsToAutoEnd = []
 process.once('beforeExit', function () {
-  clients.forEach(function (client) {
-    if (!client) return // clients remove them selfs from the array when they end
+  clientsToAutoEnd.forEach(function (client) {
+    if (!client) {
+      // Clients remove themselves from the array when they end.
+      return
+    }
+    client._log.trace('auto-end client beforeExit')
     client.end()

@@ -69,4 +74,2 @@ })

   this._corkTimer = null
-  this._received = 0 // number of events given to the client for reporting
-  this.sent = 0 // number of events written to the socket
   this._agent = null

@@ -78,4 +81,20 @@ this._active = false

   this._encodedMetadata = null
+  this._backoffReconnectCount = 0
+  // Internal runtime stats for developer debugging/tuning.
+  this._numEvents = 0 // number of events given to the client
+  this._numEventsDropped = 0 // number of events dropped because overloaded
+  this._numEventsEnqueued = 0 // number of events written through to chopper
+  this.sent = 0 // number of events sent to APM server (not necessarily accepted)
+  this._slowWriteBatch = { // data on slow or the slowest _writeBatch
+    numOver10Ms: 0,
+    // Data for the slowest _writeBatch:
+    encodeTimeMs: 0,
+    fullTimeMs: 0,
+    numEvents: 0,
+    numBytes: 0
+  }
this.config(opts)
this._log = this._conf.logger || new NoopLogger()

@@ -101,5 +120,2 @@ // start stream in corked mode, uncork when cloud

-  const errorproxy = (err) => {
-    if (this.destroyed === false) this.emit('request-error', err)
-  }
   this._chopper = new StreamChopper({

@@ -112,4 +128,12 @@ size: this._conf.size,

     }
-  }).on('stream', onStream(this, errorproxy))
+  })
+  const onIntakeError = (err) => {
+    if (this.destroyed === false) {
+      this.emit('request-error', err)
+    }
+  }
+  this._chopper.on('stream', getChoppedStreamHandler(this, onIntakeError))
+  // We don't expect the chopper stream to end until the client is ending.
+  // Make sure to clean up if this does happen unexpectedly.
   const fail = () => {

@@ -120,4 +144,4 @@ if (this._writableState.ending === false) this.destroy()

-  this._index = clients.length
-  clients.push(this)
+  this._index = clientsToAutoEnd.length
+  clientsToAutoEnd.push(this)

@@ -129,2 +153,14 @@ if (this._conf.centralConfig) {

// Return current internal stats.
Client.prototype._getStats = function () {
return {
numEvents: this._numEvents,
numEventsDropped: this._numEventsDropped,
numEventsEnqueued: this._numEventsEnqueued,
numEventsSent: this.sent,
slowWriteBatch: this._slowWriteBatch,
backoffReconnectCount: this._backoffReconnectCount
}
}
Client.prototype.config = function (opts) {

@@ -152,2 +188,5 @@ this._conf = Object.assign(this._conf || {}, opts)

if (!this._conf.bufferWindowSize) this._conf.bufferWindowSize = 50
if (!this._conf.maxQueueSize) this._conf.maxQueueSize = 1024
if (!this._conf.intakeResTimeout) this._conf.intakeResTimeout = 10000
if (!this._conf.intakeResTimeoutOnEnd) this._conf.intakeResTimeoutOnEnd = 1000
this._conf.keepAlive = this._conf.keepAlive !== false

@@ -302,4 +341,11 @@ this._conf.centralConfig = this._conf.centralConfig || false

   } else {
-    this._received++
-    this._chopper.write(this._encode(obj, enc), cb)
+    const t = process.hrtime()
+    const chunk = this._encode(obj, enc)
+    this._numEventsEnqueued++
+    this._chopper.write(chunk, cb)
+    this._log.trace({
+      fullTimeMs: deltaMs(t),
+      numEvents: 1,
+      numBytes: chunk.length
+    }, '_write: encode object')
   }

@@ -309,9 +355,21 @@ }

 Client.prototype._writev = function (objs, cb) {
+  // Limit the size of individual writes to manageable batches, primarily to
+  // limit large sync pauses due to `_encode`ing in `_writeBatch`. This value
+  // is not particularly well tuned. It was selected to get sync pauses under
+  // 10ms on a developer machine.
+  const MAX_WRITE_BATCH_SIZE = 32
   let offset = 0
   const processBatch = () => {
-    let index = -1
-    for (let i = offset; i < objs.length; i++) {
+    if (this.destroyed) {
+      cb()
+      return
+    }
+    let flushIdx = -1
+    const limit = Math.min(objs.length, offset + MAX_WRITE_BATCH_SIZE)
+    for (let i = offset; i < limit; i++) {
       if (objs[i].chunk === flush) {
-        index = i
+        flushIdx = i
         break

@@ -321,15 +379,16 @@ }

-    if (offset === 0 && index === -1) {
-      // normally there's no flush object queued, so here's a shortcut that just
-      // skips all the complicated splitting logic
-      this._writevCleaned(objs, cb)
-    } else if (index === -1) {
-      // no more flush elements in the queue, just write the rest
-      this._writevCleaned(objs.slice(offset), cb)
-    } else if (index > offset) {
-      // there's a few items in the queue before we need to flush, let's first write those
-      this._writevCleaned(objs.slice(offset, index), processBatch)
-      offset = index
-    } else if (index === objs.length - 1) {
-      // the last item in the queue is a flush
+    if (offset === 0 && flushIdx === -1 && objs.length <= MAX_WRITE_BATCH_SIZE) {
+      // A shortcut if there is no `flush` and the whole `objs` fits in a batch.
+      this._writeBatch(objs, cb)
+    } else if (flushIdx === -1) {
+      // No `flush` in this batch.
+      this._writeBatch(objs.slice(offset, limit),
+        limit === objs.length ? cb : processBatch)
+      offset = limit
+    } else if (flushIdx > offset) {
+      // There are some events in the queue before a `flush`.
+      this._writeBatch(objs.slice(offset, flushIdx), processBatch)
+      offset = flushIdx
+    } else if (flushIdx === objs.length - 1) {
+      // The next item is a flush, and it is the *last* item in the queue.
       this._writeFlush(cb)

@@ -350,10 +409,32 @@ } else {

-Client.prototype._writevCleaned = function (objs, cb) {
+// Write a batch of events (excluding specially handled "flush" events) to
+// the stream chopper.
+Client.prototype._writeBatch = function (objs, cb) {
+  const t = process.hrtime()
   const chunk = objs.map(encodeObject.bind(this)).join('')
+  const encodeTimeMs = deltaMs(t)
-  this._received += objs.length
+  this._numEventsEnqueued += objs.length
   this._chopper.write(chunk, cb)
+  const fullTimeMs = deltaMs(t)
+  if (fullTimeMs > this._slowWriteBatch.fullTimeMs) {
+    this._slowWriteBatch.encodeTimeMs = encodeTimeMs
+    this._slowWriteBatch.fullTimeMs = fullTimeMs
+    this._slowWriteBatch.numEvents = objs.length
+    this._slowWriteBatch.numBytes = chunk.length
+  }
+  if (fullTimeMs > 10) {
+    this._slowWriteBatch.numOver10Ms++
+  }
+  this._log.trace({
+    encodeTimeMs: encodeTimeMs,
+    fullTimeMs: fullTimeMs,
+    numEvents: objs.length,
+    numBytes: chunk.length
+  }, '_writeBatch')
 }
Client.prototype._writeFlush = function (cb) {
this._log.trace({ active: this._active }, '_writeFlush')
if (this._active) {

@@ -395,3 +476,5 @@ this._onflushed = cb

   process.nextTick(() => {
-    if (this.destroyed === false) this.uncork()
+    if (this.destroyed === false) {
+      this.uncork()
+    }
   })

@@ -436,4 +519,13 @@

+Client.prototype._shouldDropEvent = function () {
+  this._numEvents++
+  const shouldDrop = this._writableState.length >= this._conf.maxQueueSize
+  if (shouldDrop) {
+    this._numEventsDropped++
+  }
+  return shouldDrop
+}
 Client.prototype.sendSpan = function (span, cb) {
-  if (this._isUnsafeToWrite()) {
+  if (this._isUnsafeToWrite() || this._shouldDropEvent()) {
     return

@@ -446,3 +538,3 @@ }

 Client.prototype.sendTransaction = function (transaction, cb) {
-  if (this._isUnsafeToWrite()) {
+  if (this._isUnsafeToWrite() || this._shouldDropEvent()) {
     return

@@ -455,3 +547,3 @@ }

 Client.prototype.sendError = function (error, cb) {
-  if (this._isUnsafeToWrite()) {
+  if (this._isUnsafeToWrite() || this._shouldDropEvent()) {
     return

@@ -464,3 +556,3 @@ }

 Client.prototype.sendMetricSet = function (metricset, cb) {
-  if (this._isUnsafeToWrite()) {
+  if (this._isUnsafeToWrite() || this._shouldDropEvent()) {
     return

@@ -483,2 +575,3 @@ }

Client.prototype._final = function (cb) {
this._log.trace('_final')
if (this._configTimer) {

@@ -488,3 +581,3 @@ clearTimeout(this._configTimer)

}
-  clients[this._index] = null // remove global reference to ease garbage collection
+  clientsToAutoEnd[this._index] = null // remove global reference to ease garbage collection
this._ref()

@@ -496,2 +589,3 @@ this._chopper.end()

Client.prototype._destroy = function (err, cb) {
this._log.trace({ err }, '_destroy')
if (this._configTimer) {

@@ -505,3 +599,3 @@ clearTimeout(this._configTimer)

}
-  clients[this._index] = null // remove global reference to ease garbage collection
+  clientsToAutoEnd[this._index] = null // remove global reference to ease garbage collection
this._chopper.destroy()

@@ -512,72 +606,132 @@ this._agent.destroy()

-function onStream (client, onerror) {
-  return function (stream, next) {
-    const onerrorproxy = (err) => {
-      stream.removeListener('error', onerrorproxy)
-      req.removeListener('error', onerrorproxy)
-      destroyStream(stream)
-      onerror(err)
-    }
+// Return the appropriate backoff delay (in milliseconds) before a next possible
+// request to APM server.
+// Spec: https://github.com/elastic/apm/blob/master/specs/agents/transport.md#transport-errors
+Client.prototype._getBackoffDelay = function (isErr) {
+  let reconnectCount = this._backoffReconnectCount
+  if (isErr) {
+    this._backoffReconnectCount++
+  } else {
+    this._backoffReconnectCount = 0
+    reconnectCount = 0
+  }
+  // min(reconnectCount++, 6) ** 2 ± 10%
+  const delayS = Math.pow(Math.min(reconnectCount, 6), 2)
+  const jitterS = delayS * (0.2 * Math.random() - 0.1)
+  const delayMs = (delayS + jitterS) * 1000
+  return delayMs
+}
+function getChoppedStreamHandler (client, onerror) {
+  // Make a request to the apm-server intake API.
+  // https://www.elastic.co/guide/en/apm/server/current/events-api.html
+  //
+  // In normal operation this works as follows:
+  // - The StreamChopper (`this._chopper`) calls this function with a newly
+  //   created Gzip stream, to which it writes encoded event data.
+  // - It `gzipStream.end()`s the stream when:
+  //   (a) approximately `apiRequestSize` of data have been written,
+  //   (b) `apiRequestTime` seconds have passed, or
+  //   (c) `_chopper.chop()` is explicitly called via `client.flush()`,
+  //       e.g. used by the Node.js APM agent after `client.sendError()`.
+  // - This function makes the HTTP POST to the apm-server, pipes the gzipStream
+  //   to it, and waits for the completion of the request and the apm-server
+  //   response.
+  // - Then it calls the given `next` callback to signal StreamChopper that
+  //   another chopped stream can be created, when there is more to send.
+  //
+  // Of course, things can go wrong. Here are the known ways this pipeline can
+  // conclude.
+  // - intake response success - A successful response from the APM server. This
+  //   is the normal operation case described above.
+  // - gzipStream error - An "error" event on the gzip stream.
+  // - intake request error - An "error" event on the intake HTTP request, e.g.
+  //   ECONNREFUSED or ECONNRESET.
+  // - intakeResTimeout - A timer started *after* we are finished sending data
+  //   to the APM server by which we require a response (including its body). By
+  //   default this is 10s -- a very long time to allow for a slow or far
+  //   apm-server. If we hit this, APM server is problematic anyway, so the
+  //   delay doesn't add to the problems.
+  // - serverTimeout - An idle timeout value (default 30s) set on the socket.
+  //   This is a catch-all fallback for an otherwise wedged connection. If this
+  //   is being hit, there is some major issue in the application (possibly a
+  //   bug in the APM agent).
+  // - process completion - The Client takes pains to always `.unref()` its
+  //   handles to never keep a user's process open if it is ready to exit. When
+  //   the process is ready to exit, the following happens:
+  //   - The "beforeExit" handler above will call `client.end()`,
+  //   - which calls `client._ref()` (to *hold the process open* to complete
+  //     this request), then `_chopper.end()` to end the `gzipStream` so
+  //     this request can complete soon.
+  //   - We then expect this request to complete quickly and the process will
+  //     then finish exiting. A subtlety is if the APM server is not responding
+  //     then we'll wait on `intakeResTimeoutOnEnd` (by default 1s).
+  return function makeIntakeRequest (gzipStream, next) {
+    const reqId = crypto.randomBytes(16).toString('hex')
+    const log = client._log.child({ reqId })
+    const startTime = process.hrtime()
+    const timeline = []
+    let bytesWritten = 0
+    let intakeRes
+    let intakeResTimer = null
+    const intakeResTimeout = client._conf.intakeResTimeout
+    const intakeResTimeoutOnEnd = client._conf.intakeResTimeoutOnEnd
+    // `_active` is used to coordinate the callback to `client.flush(cb)`.
     client._active = true
-    const req = client._transport.request(client._conf.requestIntake, onResult(onerror))
+    // Handle conclusion of this intake request. Each "part" is expected to call
+    // `completePart()` at least once -- multiple calls are okay for cases like
+    // the "error" and "close" events on a stream being called. When a part
+    // errors or all parts are completed, then we can conclude.
+    let concluded = false
+    const completedFromPart = {
+      gzipStream: false,
+      intakeReq: false,
+      intakeRes: false
+    }
+    let numToComplete = Object.keys(completedFromPart).length
+    const completePart = (part, err) => {
+      log.trace({ err, concluded }, 'completePart %s', part)
+      timeline.push([deltaMs(startTime), `completePart ${part}`, err && err.message])
+      assert(part in completedFromPart, `'${part}' is in completedFromPart`)
-    // Abort the current request if the server responds prior to the request
-    // being finished
-    req.on('response', function (res) {
-      if (!req.finished) {
-        // In Node.js 8, the zlib stream will emit a 'zlib binding closed'
-        // error when destroyed. Furthermore, the HTTP response will not emit
-        // any data events after the request have been destroyed, so it becomes
-        // impossible to see the error returned by the server if we abort the
-        // request. So for Node.js 8, we'll work around this by closing the
-        // stream gracefully.
-        //
-        // This results in the gzip buffer being flushed and a little more data
-        // being sent to the APM Server, but it's better than not getting the
-        // error body.
-        if (node8) {
-          stream.end()
-        } else {
-          destroyStream(stream)
+      if (concluded) {
+        return
+      }
+      // If this is the final part to complete, then we are ready to conclude.
+      let allPartsCompleted = false
+      if (!completedFromPart[part]) {
+        completedFromPart[part] = true
+        numToComplete--
+        if (numToComplete === 0) {
+          allPartsCompleted = true
+        }
+      }
-    })
+      if (!err && !allPartsCompleted) {
+        return
+      }
-    // Mointor streams for errors so that we can make sure to destory the
-    // output stream as soon as that occurs
-    stream.on('error', onerrorproxy)
-    req.on('error', onerrorproxy)
+      // Conclude.
+      concluded = true
+      if (err) {
+        // There was an error: clean up resources.
-    req.on('socket', function (socket) {
-      // Sockets will automatically be unreffed by the HTTP agent when they are
-      // not in use by an HTTP request, but as we're keeping the HTTP request
-      // open, we need to unref the socket manually
-      socket.unref()
-    })
+        // Note that in Node v8, destroying the gzip stream results in it
+        // emitting an "error" event as follows. No harm, however.
+        //    Error: gzip stream error: zlib binding closed
+        //        at Gzip._transform (zlib.js:369:15)
+        //        ...
+        destroyStream(gzipStream)
+        intakeReq.destroy()
+        if (intakeResTimer) {
+          log.trace('cancel intakeResTimer')
+          clearTimeout(intakeResTimer)
+          intakeResTimer = null
+        }
+      }
-    if (Number.isFinite(client._conf.serverTimeout)) {
-      req.setTimeout(client._conf.serverTimeout, function () {
-        req.destroy(new Error(`APM Server response timeout (${client._conf.serverTimeout}ms)`))
-      })
-    }
-    pump(stream, req, function () {
-      // This function is technically called with an error, but because we
-      // manually attach error listeners on all the streams in the pipeline
-      // above, we can safely ignore it.
-      //
-      // We do this for two reasons:
-      //
-      // 1) This callback might be called a few ticks too late, in which case a
-      //    race condition could occur where the user would write to the output
-      //    stream before the rest of the system discovered that it was
-      //    unwritable
-      //
-      // 2) The error might occur post the end of the stream. In that case we
-      //    would not get it here as the internal error listener would have
-      //    been removed and the stream would throw the error instead
-      client.sent = client._received
+      client.sent = client._numEventsEnqueued
       client._active = false

@@ -589,32 +743,160 @@ if (client._onflushed) {

-      next()
+      const backoffDelayMs = client._getBackoffDelay(!!err)
+      if (err) {
+        log.trace({ timeline, bytesWritten, backoffDelayMs, err },
+          'conclude intake request: error')
+        onerror(err)
+      } else {
+        log.trace({ timeline, bytesWritten, backoffDelayMs },
+          'conclude intake request: success')
+      }
+      if (backoffDelayMs > 0) {
+        setTimeout(next, backoffDelayMs).unref()
+      } else {
+        setImmediate(next)
+      }
+    }
+    // Start the request and set its timeout.
+    const intakeReq = client._transport.request(client._conf.requestIntake)
+    if (Number.isFinite(client._conf.serverTimeout)) {
+      intakeReq.setTimeout(client._conf.serverTimeout)
+    }
+    // TODO: log intakeReq and intakeRes when
+    // https://github.com/elastic/ecs-logging-nodejs/issues/67 is implemented.
+    log.trace('intake request start')
+    // Handle events on the intake request.
+    // https://nodejs.org/api/http.html#http_http_request_options_callback docs
+    // emitted events on the req and res objects for different scenarios.
+    intakeReq.on('timeout', () => {
+      log.trace('intakeReq "timeout"')
+      // `.destroy(err)` will result in an "error" event.
+      intakeReq.destroy(new Error(`APM Server response timeout (${client._conf.serverTimeout}ms)`))
+    })
-    // Only intended for local debugging
-    if (client._conf.payloadLogFile) {
-      if (!client._payloadLogFile) {
-        client._payloadLogFile = require('fs').createWriteStream(client._conf.payloadLogFile, { flags: 'a' })
+    intakeReq.on('socket', function (socket) {
+      // Unref the socket for this request so that the Client does not keep
+      // the node process running if it otherwise would be done. (This is
+      // tested by the "unref-client" test in test/side-effects.js.)
+      //
+      // The HTTP keep-alive agent will unref sockets when unused, and ref them
+      // during a request. Given that the normal makeIntakeRequest behaviour
+      // is to keep a request open for up to 10s (`apiRequestTimeout`), we must
+      // manually unref the socket.
+      log.trace('intakeReq "socket": unref it')
+      socket.unref()
+    })
+    intakeReq.on('response', (intakeRes_) => {
+      intakeRes = intakeRes_
+      log.trace({ statusCode: intakeRes.statusCode, reqFinished: intakeReq.finished },
+        'intakeReq "response"')
+      let err
+      const chunks = []
+      if (!intakeReq.finished) {
+        // Premature response from APM server. Typically this is for errors
+        // like "queue is full", for which the response body will be parsed
+        // below. However, set an `err` as a fallback for the unexpected case
+        // that is with a 2xx response.
+        if (intakeRes.statusCode >= 200 && intakeRes.statusCode < 300) {
+          err = new Error(`premature apm-server response with statusCode=${intakeRes.statusCode}`)
+        }
+        // There is no point (though no harm) in sending more data to the APM
+        // server. In case reading the error response body takes a while, pause
+        // the gzip stream until it is destroyed in `completePart()`.
+        gzipStream.pause()
+      }
-      // Manually write to the file instead of using pipe/pump so that the file
-      // handle isn't closed when the stream ends
-      stream.pipe(zlib.createGunzip()).on('data', function (chunk) {
-        client._payloadLogFile.write(chunk)
+      // Handle events on the intake response.
+      intakeRes.on('error', (intakeResErr) => {
+        // I am not aware of a way to get an "error" event on the
+        // IncomingMessage (see also https://stackoverflow.com/q/53691119), but
+        // handling it here is preferable to an uncaughtException.
+        intakeResErr = wrapError(intakeResErr, 'intake response error event')
+        completePart('intakeRes', intakeResErr)
+      })
-      }
+      intakeRes.on('data', (chunk) => {
+        chunks.push(chunk)
+      })
+      // intakeRes.on('close', () => { log.trace('intakeRes "close"') })
+      // intakeRes.on('aborted', () => { log.trace('intakeRes "aborted"') })
+      intakeRes.on('end', () => {
+        log.trace('intakeRes "end"')
+        if (intakeResTimer) {
+          clearTimeout(intakeResTimer)
+          intakeResTimer = null
+        }
+        if (intakeRes.statusCode < 200 || intakeRes.statusCode > 299) {
+          err = processIntakeErrorResponse(intakeRes, Buffer.concat(chunks))
+        }
+        completePart('intakeRes', err)
+      })
+    })
-    // The _encodedMetadata property _should_ be set in the Client
-    // constructor function after making a cloud metadata call.
-    //
-    // Since we cork data until the client._encodedMetadata is set the
-    // following conditional should not be necessary. However, we'll
-    // leave it in place out of a healthy sense of caution in case
-    // something unsets _encodedMetadata or _encodedMetadata is somehow
-    // never set.
-    if (!client._encodedMetadata) {
-      client._encodedMetadata = client._encode({ metadata: client._conf.metadata }, Client.encoding.METADATA)
+    // intakeReq.on('abort', () => { log.trace('intakeReq "abort"') })
+    // intakeReq.on('close', () => { log.trace('intakeReq "close"') })
+    intakeReq.on('finish', () => {
+      log.trace('intakeReq "finish"')
+      completePart('intakeReq')
+    })
+    intakeReq.on('error', (err) => {
+      log.trace('intakeReq "error"')
+      completePart('intakeReq', err)
+    })
+    // Handle events on the gzip stream.
+    gzipStream.on('data', (chunk) => {
+      bytesWritten += chunk.length
+    })
+    gzipStream.on('error', (gzipErr) => {
+      log.trace('gzipStream "error"')
+      gzipErr = wrapError(gzipErr, 'gzip stream error')
+      completePart('gzipStream', gzipErr)
+    })
+    gzipStream.on('finish', () => {
+      // If the apm-server is not reading its input and the gzip data is large
+      // enough to fill buffers, then the gzip stream will emit "finish", but
+      // not "end". Therefore, this "finish" event is the best indicator that
+      // the ball is now in the apm-server's court.
+      //
+      // We now start a timer waiting on the response, provided we still expect
+      // one (we don't if the request has already errored out, e.g.
+      // ECONNREFUSED) and it hasn't already completed (e.g. if it replied
+      // quickly with "queue is full").
+      log.trace('gzipStream "finish"')
+      if (!completedFromPart.intakeReq && !completedFromPart.intakeRes) {
+        const timeout = client._writableState.ending ? intakeResTimeoutOnEnd : intakeResTimeout
+        log.trace({ timeout }, 'start intakeResTimer')
+        intakeResTimer = setTimeout(() => {
+          completePart('intakeRes',
+            new Error('intake response timeout: APM server did not respond ' +
+              `within ${timeout / 1000}s of gzip stream finish`))
+        }, timeout).unref()
+      }
+    })
+    // Watch the gzip "end" event for its completion, because the "close" event
+    // that we would prefer to use, *does not get emitted* for the
+    // `client.sendSpan(callback) + client.flush()` test case with
+    // *node v12-only*.
+    gzipStream.on('end', () => {
+      log.trace('gzipStream "end"')
+      completePart('gzipStream')
+    })
+    // gzipStream.on('close', () => { log.trace('gzipStream "close"') })
+    // Hook up writing data to a file (only intended for local debugging).
+    // Append the intake data to `payloadLogFile`, if given. This is only
+    // intended for local debugging because it can have a significant perf
+    // impact.
+    if (client._conf.payloadLogFile) {
+      const payloadLogStream = fs.createWriteStream(client._conf.payloadLogFile, { flags: 'a' })
+      gzipStream.pipe(zlib.createGunzip()).pipe(payloadLogStream)
+    }
-    // All requests to the APM Server must start with a metadata object
-    stream.write(client._encodedMetadata)
+    // Send the metadata object (always first) and hook up the streams.
+    assert(client._encodedMetadata, 'client._encodedMetadata is set')
+    gzipStream.write(client._encodedMetadata)
+    gzipStream.pipe(intakeReq)
}

@@ -654,11 +936,2 @@ }

-function onResult (onerror) {
-  return streamToBuffer.onStream(function (err, buf, res) {
-    if (err) return onerror(err)
-    if (res.statusCode < 200 || res.statusCode > 299) {
-      onerror(processIntakeErrorResponse(res, buf))
-    }
-  })
-}
function getIntakeRequestOptions (opts, agent) {

@@ -841,2 +1114,16 @@ const headers = getHeaders(opts)

// Wrap the given Error object, including the given message.
//
// Dev Note: Various techniques exist to wrap `Error`s in node.js and JavaScript
// to provide a cause chain, e.g. see
// https://www.joyent.com/node-js/production/design/errors
// However, I'm not aware of a de facto "winner". Eventually there may be
// https://github.com/tc39/proposal-error-cause
// For now we will simply prefix the existing error object's `message` property.
// This is simple and preserves the root error `stack`.
function wrapError (err, msg) {
err.message = msg + ': ' + err.message
return err
}
function processIntakeErrorResponse (res, buf) {

@@ -848,2 +1135,3 @@ const err = new Error('Unexpected APM Server response')

if (buf.length > 0) {
// https://www.elastic.co/guide/en/apm/server/current/events-api.html#events-api-errors
const body = buf.toString('utf8')

@@ -910,1 +1198,8 @@ const contentType = res.headers['content-type']

}
// Return the time difference (in milliseconds) between the given time `t`
// (a 2-tuple as returned by `process.hrtime()`) and now.
function deltaMs (t) {
const d = process.hrtime(t)
return d[0] * 1e3 + d[1] / 1e6
}
package.json
{
"name": "elastic-apm-http-client",
"version": "9.6.0",
"version": "9.7.0",
"description": "A low-level HTTP client for communicating with the Elastic APM intake API",

@@ -12,5 +12,6 @@ "main": "index.js",

   ],
+  "// scripts.test": "quoting arg to tape to avoid too long argv, let tape do globbing",
   "scripts": {
-    "coverage": "nyc report --reporter=text-lcov > coverage.lcov && codecov",
-    "test": "standard && nyc tape test/*.js"
+    "coverage": "nyc report --reporter=text-lcov > coverage.lcov && (codecov || echo 'warning: ignoring codecov failure')",
+    "test": "standard && nyc tape \"test/*.js\""
   },

@@ -28,3 +29,2 @@ "engines": {

"fast-stream-to-buffer": "^1.0.0",
"pump": "^3.0.0",
"readable-stream": "^3.4.0",

@@ -31,0 +31,0 @@ "stream-chopper": "^3.0.1",

README.md
@@ -57,3 +57,4 @@ # elastic-apm-http-client

-- `options` - An object containing config options (see below)
+- `options` - An object containing config options (see below). All options
+  are optional, except those marked "(required)".

@@ -98,3 +99,3 @@ Data sent to the APM Server as part of the metadata package:

   than the `time` config option. That might result in healthy requests
-  being aborted prematurely (default: `15000` ms)
+  being aborted prematurely. (default: `15000` ms)
- `keepAlive` - If set to `false` the client will not reuse sockets

@@ -138,2 +139,21 @@ between requests (default: `true`)

objects)
- `maxQueueSize` - The maximum number of buffered events (transactions,
spans, errors, metricsets). Events are buffered when the agent can't keep
up with sending them to the APM Server or if the APM server is down.
If the queue is full, events are rejected which means transactions, spans,
etc. will be lost. This guards the application from consuming unbounded
memory, possibly overusing CPU (spent on serializing events), and possibly
crashing in case the APM server is unavailable for a long period of time. A
lower value will decrease the heap overhead of the agent, while a higher
value makes it less likely to lose events in case of a temporary spike in
throughput. (default: 1024)
- `intakeResTimeout` - The time (in milliseconds) by which a response from the
[APM Server events intake API](https://www.elastic.co/guide/en/apm/server/current/events-api.html)
is expected *after all the event data for that request has been sent*. This
allows a smaller timeout than `serverTimeout` to handle an APM server that
is accepting connections but is slow to respond. (default: `10000` ms)
- `intakeResTimeoutOnEnd` - The same as `intakeResTimeout`, but used when
the client has ended, hence for the possible last request to APM server. This
is typically a lower value to not hang an ending process that is waiting for
that APM server request to complete. (default: `1000` ms)
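As an example, a client opting into these options explicitly might be constructed as follows (a sketch: the first four options are the usual required identification options with placeholder values; the last three show the v9.7.0 defaults):

```js
const Client = require('elastic-apm-http-client')

const client = new Client({
  // Required identification options (placeholder values):
  agentName: 'my-agent',
  agentVersion: '1.0.0',
  serviceName: 'my-service',
  userAgent: 'my-agent/1.0.0',
  // New in v9.7.0 (values shown are the defaults):
  maxQueueSize: 1024, // buffered events beyond this are dropped
  intakeResTimeout: 10000, // ms to await an intake response after data is sent
  intakeResTimeoutOnEnd: 1000 // ms to await that response when the client is ending
})
```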

@@ -157,5 +177,8 @@ Data sanitizing configuration:

- `logger` - A [pino](https://getpino.io) logger to use for trace and
debug-level logging.
 - `payloadLogFile` - Specify a file path to which a copy of all data
   sent to the APM Server should be written. The data will be in ndjson
-  format and will be uncompressed
+  format and will be uncompressed. Note that using this option can
+  impact performance.
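For instance, a local debugging session might enable it like this (a sketch; the path is a placeholder):

```js
const client = new Client({
  // ...required options elided...
  payloadLogFile: '/tmp/apm-intake.ndjson' // appends an uncompressed ndjson copy of intake data
})
```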

@@ -225,5 +248,7 @@ ### Event: `config`

-An integer indicating the number of events (spans, transactions, or errors)
-sent by the client. An event is considered sent when the HTTP request
-used to transmit it have ended.
+An integer indicating the number of events (spans, transactions, errors, or
+metricsets) sent by the client. An event is considered sent when the HTTP
+request used to transmit it has ended. Note that errors in requests to APM
+server may mean this value is not the same as the number of events *accepted*
+by the APM server.
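One hypothetical way to watch for event loss is to compare this counter with the internal stats added in v9.7.0 (note that `_getStats()` is an internal, underscore-prefixed API and may change):

```js
// Illustrative only: log throughput and drops every 10s; unref() the timer
// so it does not keep the process alive on its own.
setInterval(() => {
  const stats = client._getStats() // internal API (v9.7.0+)
  console.log('events: %d received, %d dropped, %d sent',
    stats.numEvents, stats.numEventsDropped, stats.numEventsSent)
}, 10000).unref()
```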

@@ -230,0 +255,0 @@ ### `client.config(options)`
