Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

elastic-apm-http-client

Package Overview
Dependencies
Maintainers
3
Versions
62
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

elastic-apm-http-client - npm Package Compare versions

Comparing version 10.4.0 to 11.0.0

24

CHANGELOG.md
# elastic-apm-http-client changelog
## v11.0.0
- Add support for coordinating data flushing in an AWS Lambda environment. The
following two API additions are used to ensure that (a) the Elastic Lambda
extension is signaled at invocation end [per spec](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing)
and (b) a new intake request is not started when a Lambda function invocation
is not active.
- `Client#lambdaStart()` should be used to indicate when a Lambda function
invocation begins.
- `Client#flush([opts,] cb)` now supports an optional `opts.lambdaEnd`
boolean. Set it to true to indicate this is a flush at the end of a Lambda
function invocation.
This is a **BREAKING CHANGE**, because current versions of elastic-apm-node
depend on `^10.4.0`. If this were released as another 10.x, then usage of
current elastic-apm-node with this version of the client would break
behavior in a Lambda environment.
- Add the `freeSocketTimeout` option, with a default of 4000 (ms), and switch
from Node.js's core `http.Agent` to the [agentkeepalive package](https://github.com/node-modules/agentkeepalive)
to fix ECONNRESET issues with HTTP Keep-Alive usage talking to APM Server
(https://github.com/elastic/apm-agent-nodejs/issues/2594).
## v10.4.0

@@ -4,0 +28,0 @@

236

index.js

@@ -10,5 +10,8 @@ 'use strict'

const os = require('os')
const { performance } = require('perf_hooks')
const { URL } = require('url')
const zlib = require('zlib')
const HttpAgentKeepAlive = require('agentkeepalive')
const HttpsAgentKeepAlive = HttpAgentKeepAlive.HttpsAgent
const Filters = require('object-filter-sequence')

@@ -29,3 +32,10 @@ const querystring = require('querystring')

const flush = Symbol('flush')
// These symbols are used as markers in the client stream to indicate special
// flush handling.
const kFlush = Symbol('flush')
const kLambdaEndFlush = Symbol('lambdaEndFlush')
function isFlushMarker (obj) {
return obj === kFlush || obj === kLambdaEndFlush
}
const hostname = os.hostname()

@@ -92,4 +102,4 @@ const requiredOpts = [

this._agent = null
this._active = false
this._onflushed = null
this._activeIntakeReq = false
this._onIntakeReqConcluded = null
this._transport = null

@@ -103,2 +113,5 @@ this._configTimer = null

this._metadataFilters = new Filters()
// _lambdaActive indicates if a Lambda function invocation is active. It is
// only meaningful if `isLambdaExecutionEnvironment`.
this._lambdaActive = false

@@ -188,2 +201,10 @@ // Internal runtime stats for developer debugging/tuning.

// The 'beforeExit' event is significant in Lambda invocation completion
// handling, so we log it for debugging.
if (isLambdaExecutionEnvironment && this._log.isLevelEnabled('trace')) {
process.prependListener('beforeExit', () => {
this._log.trace('process "beforeExit"')
})
}
if (this._conf.centralConfig) {

@@ -233,2 +254,6 @@ this._pollConfig()

this._conf.centralConfig = this._conf.centralConfig || false
if (!('keepAliveMsecs' in this._conf)) this._conf.keepAliveMsecs = 1000
if (!('maxSockets' in this._conf)) this._conf.maxSockets = Infinity
if (!('maxFreeSockets' in this._conf)) this._conf.maxFreeSockets = 256
if (!('freeSocketTimeout' in this._conf)) this._conf.freeSocketTimeout = 4000

@@ -250,2 +275,3 @@ // processed values

let AgentKeepAlive
switch (this._conf.serverUrl.protocol) {

@@ -256,2 +282,3 @@ case 'http:':

this._transportGet = httpGet
AgentKeepAlive = HttpAgentKeepAlive
break

@@ -262,2 +289,3 @@ case 'https:':

this._transportGet = httpsGet
AgentKeepAlive = HttpsAgentKeepAlive
break

@@ -274,9 +302,10 @@ default:

}
var agentOpts = {
this._agent = new AgentKeepAlive({
keepAlive: this._conf.keepAlive,
keepAliveMsecs: this._conf.keepAliveMsecs,
freeSocketTimeout: this._conf.freeSocketTimeout,
timeout: this._conf.serverTimeout,
maxSockets: this._conf.maxSockets,
maxFreeSockets: this._conf.maxFreeSockets
}
this._agent = new this._transport.Agent(agentOpts)
})
}

@@ -287,2 +316,3 @@

this._conf.requestConfig = getConfigRequestOptions(this._conf, this._agent)
this._conf.requestSignalLambdaEnd = getSignalLambdaEndRequestOptions(this._conf, this._agent)

@@ -315,4 +345,4 @@ this._conf.metadata = getMetadata(this._conf)

if (this._conf.expectExtraMetadata) {
this._log.trace('maybe uncork (expectExtraMetadata)')
this._maybeUncork()
this._log.trace('uncorked (expectExtraMetadata)')
}

@@ -437,4 +467,4 @@ }

Client.prototype._write = function (obj, enc, cb) {
if (obj === flush) {
this._writeFlush(cb)
if (isFlushMarker(obj)) {
this._writeFlush(obj, cb)
} else {

@@ -471,3 +501,3 @@ const t = process.hrtime()

for (let i = offset; i < limit; i++) {
if (objs[i].chunk === flush) {
if (isFlushMarker(objs[i].chunk)) {
flushIdx = i

@@ -479,6 +509,6 @@ break

if (offset === 0 && flushIdx === -1 && objs.length <= MAX_WRITE_BATCH_SIZE) {
// A shortcut if there is no `flush` and the whole `objs` fits in a batch.
// A shortcut if there is no flush marker and the whole `objs` fits in a batch.
this._writeBatch(objs, cb)
} else if (flushIdx === -1) {
// No `flush` in this batch.
// No flush marker in this batch.
this._writeBatch(objs.slice(offset, limit),

@@ -488,11 +518,11 @@ limit === objs.length ? cb : processBatch)

} else if (flushIdx > offset) {
// There are some events in the queue before a `flush`.
// There are some events in the queue before a flush marker.
this._writeBatch(objs.slice(offset, flushIdx), processBatch)
offset = flushIdx
} else if (flushIdx === objs.length - 1) {
// The next item is a flush, and it is the *last* item in the queue.
this._writeFlush(cb)
// The next item is a flush marker, and it is the *last* item in the queue.
this._writeFlush(objs[flushIdx].chunk, cb)
} else {
// the next item in the queue is a flush
this._writeFlush(processBatch)
// The next item in the queue is a flush.
this._writeFlush(objs[flushIdx].chunk, processBatch)
offset++

@@ -538,15 +568,20 @@ }

Client.prototype._writeFlush = function (cb) {
this._log.trace({ active: this._active }, '_writeFlush')
if (this._active) {
// In a Lambda environment a flush is almost certainly a signal that the
// runtime environment is about to be frozen: tell the intake request
// to finish up quickly.
if (this._intakeRequestGracefulExitFn && isLambdaExecutionEnvironment) {
this._intakeRequestGracefulExitFn()
Client.prototype._writeFlush = function (flushMarker, cb) {
this._log.trace({ activeIntakeReq: this._activeIntakeReq, lambdaEnd: flushMarker === kLambdaEndFlush }, '_writeFlush')
let onFlushed = cb
if (isLambdaExecutionEnvironment && flushMarker === kLambdaEndFlush) {
onFlushed = () => {
// Signal the Elastic AWS Lambda extension that it is done passing data
// for this invocation, then call `cb()` so the wrapped Lambda handler
// can finish.
this._signalLambdaEnd(cb)
}
this._onflushed = cb
}
if (this._activeIntakeReq) {
this._onIntakeReqConcluded = onFlushed
this._chopper.chop()
} else {
this._chopper.chop(cb)
this._chopper.chop(onFlushed)
}

@@ -556,11 +591,15 @@ }

Client.prototype._maybeCork = function () {
if (!this._writableState.corked && this._conf.bufferWindowTime !== -1) {
this.cork()
if (this._corkTimer && this._corkTimer.refresh) {
// the refresh function was added in Node 10.2.0
this._corkTimer.refresh()
} else {
this._corkTimer = setTimeout(() => {
this.uncork()
}, this._conf.bufferWindowTime)
if (!this._writableState.corked) {
if (isLambdaExecutionEnvironment && !this._lambdaActive) {
this.cork()
} else if (this._conf.bufferWindowTime !== -1) {
this.cork()
if (this._corkTimer && this._corkTimer.refresh) {
// the refresh function was added in Node 10.2.0
this._corkTimer.refresh()
} else {
this._corkTimer = setTimeout(() => {
this.uncork()
}, this._conf.bufferWindowTime)
}
}

@@ -573,6 +612,10 @@ } else if (this._writableState.length >= this._conf.bufferWindowSize) {

Client.prototype._maybeUncork = function () {
// client must remain corked until cloud metadata has been
// fetched-or-skipped.
if (!this._encodedMetadata) {
// The client must remain corked until cloud metadata has been
// fetched-or-skipped.
return
} else if (isLambdaExecutionEnvironment && !this._lambdaActive) {
// In a Lambda env, we must only uncork when an invocation is active,
// otherwise we could start an intake request just before the VM is frozen.
return
}

@@ -585,3 +628,3 @@

process.nextTick(() => {
if (this.destroyed === false) {
if (this.destroyed === false && !(isLambdaExecutionEnvironment && !this._lambdaActive)) {
this.uncork()

@@ -620,2 +663,6 @@ }

Client.prototype.lambdaStart = function () {
this._lambdaActive = true
}
// With the cork/uncork handling on this stream, `this.write`ing on this

@@ -670,4 +717,21 @@ // stream when already destroyed will lead to:

Client.prototype.flush = function (cb) {
this._maybeUncork()
/**
* If possible, start a flush of currently queued APM events to APM server.
*
* "If possible," because there are some guards on uncorking. See `_maybeUncork`.
*
* @param {Object} opts - Optional.
* - {Boolean} opts.lambdaEnd - Optional. Default false. Setting this true
* tells the client to also handle the end of a Lambda function invocation.
* @param {Function} cb - Optional. `cb()` will be called when the data has
* be sent to APM Server (or failed in the attempt).
*/
Client.prototype.flush = function (opts, cb) {
if (typeof opts === 'function') {
cb = opts
opts = {}
} else if (!opts) {
opts = {}
}
const lambdaEnd = !!opts.lambdaEnd

@@ -678,3 +742,16 @@ // Write the special "flush" signal. We do this so that the order of writes

// given to the _write function.
return this.write(flush, cb)
if (lambdaEnd && isLambdaExecutionEnvironment && this._lambdaActive) {
// To flush the current data and ensure that subsequently sent events *in
// the same tick* do not start a new intake request, we must uncork
// synchronously -- rather than the nextTick uncork done in `_maybeUncork()`.
assert(this._encodedMetadata, 'client.flush({lambdaEnd:true}) must not be called before metadata has been set')
const rv = this.write(kLambdaEndFlush, cb)
this.uncork()
this._lambdaActive = false
return rv
} else {
this._maybeUncork()
return this.write(kFlush, cb)
}
}

@@ -803,4 +880,4 @@

// `_active` is used to coordinate the callback to `client.flush(db)`.
client._active = true
// `_activeIntakeReq` is used to coordinate the callback to `client.flush(db)`.
client._activeIntakeReq = true

@@ -861,3 +938,3 @@ // Handle conclusion of this intake request. Each "part" is expected to call

client.sent = client._numEventsEnqueued
client._active = false
client._activeIntakeReq = false
const backoffDelayMs = client._getBackoffDelay(!!err)

@@ -872,5 +949,5 @@ if (err) {

}
if (client._onflushed) {
client._onflushed()
client._onflushed = null
if (client._onIntakeReqConcluded) {
client._onIntakeReqConcluded()
client._onIntakeReqConcluded = null
}

@@ -932,3 +1009,8 @@

// manually unref the socket.
if (!intakeRequestGracefulExitCalled) {
//
// The exception is when in a Lambda environment, where we *do* want to
// keep the node process running to complete this intake request.
// Otherwise a 'beforeExit' event can be sent, which the Lambda runtime
// interprets as "the Lambda handler callback was never called".
if (!isLambdaExecutionEnvironment && !intakeRequestGracefulExitCalled) {
log.trace('intakeReq "socket": unref it')

@@ -1073,2 +1155,51 @@ intakeReqSocket.unref()

/**
* Signal to the Elastic AWS Lambda extension that a lambda function execution
* is done.
* https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing
*
* @param {Function} cb() is called when finished. There are no arguments.
*/
Client.prototype._signalLambdaEnd = function (cb) {
this._log.trace('_signalLambdaEnd start')
const startTime = performance.now()
const finish = errOrErrMsg => {
const durationMs = performance.now() - startTime
if (errOrErrMsg) {
this._log.error({ err: errOrErrMsg, durationMs }, 'error signaling lambda invocation done')
} else {
this._log.trace({ durationMs }, 'signaled lambda invocation done')
}
cb()
}
// We expect to be talking to the localhost Elastic Lambda extension, so we
// want a shorter timeout than `_conf.serverTimeout`.
const TIMEOUT_MS = 5000
const req = this._transportRequest(this._conf.requestSignalLambdaEnd, res => {
res.on('error', err => {
// Not sure this event can ever be emitted, but just in case.
res.destroy(err)
})
res.resume()
if (res.statusCode !== 202) {
finish(`unexpected response status code: ${res.statusCode}`)
return
}
res.on('end', function () {
finish()
})
})
req.setTimeout(TIMEOUT_MS)
req.on('timeout', () => {
this._log.trace('_signalLambdaEnd timeout')
req.destroy(new Error(`timeout (${TIMEOUT_MS}ms) signaling Lambda invocation done`))
})
req.on('error', err => {
finish(err)
})
req.end()
}
/**
* Fetch the APM Server version and set `this._apmServerVersion`.

@@ -1189,2 +1320,9 @@ * https://www.elastic.co/guide/en/apm/server/current/server-info.html

function getSignalLambdaEndRequestOptions (opts, agent) {
const headers = getHeaders(opts)
headers['Content-Length'] = 0
return getBasicRequestOptions('POST', '/intake/v2/events?flushed=true', headers, opts, agent)
}
function getConfigRequestOptions (opts, agent) {

@@ -1191,0 +1329,0 @@ const path = '/config/v1/agents?' + querystring.stringify({

@@ -16,2 +16,3 @@ 'use strict'

child () { return this }
isLevelEnabled (_level) { return false }
}

@@ -18,0 +19,0 @@

{
"name": "elastic-apm-http-client",
"version": "10.4.0",
"version": "11.0.0",
"description": "A low-level HTTP client for communicating with the Elastic APM intake API",

@@ -12,5 +12,4 @@ "main": "index.js",

],
"// scripts.test": "quoting arg to tape to avoid too long argv, let tape do globbing",
"scripts": {
"test": "standard && nyc tape \"test/*.test.js\""
"test": "standard && nyc ./test/run_tests.sh"
},

@@ -23,2 +22,3 @@ "engines": {

"dependencies": {
"agentkeepalive": "^4.2.1",
"breadth-filter": "^2.0.0",

@@ -25,0 +25,0 @@ "container-info": "^1.0.1",

@@ -107,2 +107,11 @@ # elastic-apm-http-client

state. Only relevant if `keepAlive` is set to `true` (default: `256`)
- `freeSocketTimeout` - A number of milliseconds of inactivity on a free
(kept-alive) socket after which to timeout and recycle the socket. Set this to
a value less than the HTTP Keep-Alive timeout of the APM server to avoid
[ECONNRESET exceptions](https://medium.com/ssense-tech/reduce-networking-errors-in-nodejs-23b4eb9f2d83).
This defaults to 4000ms to be less than the [node.js HTTP server default of
5s](https://nodejs.org/api/http.html#serverkeepalivetimeout) (useful when
using a Node.js-based mock APM server) and the [Go lang Dialer `KeepAlive`
default of 15s](https://pkg.go.dev/net#Dialer) (when talking to the Elastic
APM Lambda extension). (default: `4000`)

@@ -345,2 +354,29 @@ Cloud & Extra Metadata Configuration:

### `client.lambdaStart()`
Tells the client that a Lambda function invocation has started.
#### Notes on Lambda usage
To properly handle [data flushing for instrumented Lambda functions](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing)
this Client should be used as follows in a Lambda environment.
- When a Lambda invocation starts, `client.lambdaStart()` must be called.
The Client prevents intake requests to APM Server when in a Lambda environment
when a function invocation is *not* active. This is to ensure that an intake
request does not accidentally span a period when a Lambda VM is frozen,
which can lead to timeouts and lost APM data.
- When a Lambda invocation finishes, `client.flush({lambdaEnd: true}, cb)` must
be called.
The `lambdaEnd: true` tells the Client to (a) mark the lambda as inactive so
a subsequent intake request is not started until the next invocation, and
(b) signal the Elastic AWS Lambda Extension that this invocation is done.
The user's Lambda handler should not finish until `cb` is called. This
ensures that the extension receives tracing data and the end signal before
the Lambda Runtime freezes the VM.
### `client.sendSpan(span[, callback])`

@@ -386,3 +422,3 @@

### `client.flush([callback])`
### `client.flush([opts,] [callback])`

@@ -396,2 +432,7 @@ Flush the internal buffer and end the current HTTP request to the APM

- `opts`:
- `opts.lambdaEnd` - An optional boolean to indicate if this is the final
flush at the end of the Lambda function invocation. The client will do
some extra handling if this is the case. See notes in `client.lambdaStart()`
above.
- `callback` - Callback is called when the internal buffer has been

@@ -398,0 +439,0 @@ flushed and the HTTP request ended. If no HTTP request is in progress

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc