elastic-apm-http-client
Comparing version 10.4.0 to 11.0.0
# elastic-apm-http-client changelog
## v11.0.0
- Add support for coordinating data flushing in an AWS Lambda environment. The
  following two API additions are used to ensure that (a) the Elastic Lambda
  extension is signaled at invocation end [per spec](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing)
  and (b) a new intake request is not started when a Lambda function invocation
  is not active.
  - `Client#lambdaStart()` should be used to indicate when a Lambda function
    invocation begins.
  - `Client#flush([opts,] cb)` now supports an optional `opts.lambdaEnd`
    boolean. Set it to true to indicate this is a flush at the end of a Lambda
    function invocation. (See the sketch after this list.)
  This is a **BREAKING CHANGE**, because current versions of elastic-apm-node
  depend on `^10.4.0`. If this were released as another 10.x, then usage of
  current elastic-apm-node with this version of the client would break
  behavior in a Lambda environment.
- Add the `freeSocketTimeout` option, with a default of 4000 (ms), and switch
  from Node.js's core `http.Agent` to the [agentkeepalive package](https://github.com/node-modules/agentkeepalive)
  to fix ECONNRESET issues with HTTP Keep-Alive usage talking to APM Server
  (https://github.com/elastic/apm-agent-nodejs/issues/2594).
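
For context, a minimal sketch of the intended per-invocation call sequence with the new API. This is illustrative only, not the agent's actual integration code; required constructor options are elided and event sending is shown as comments.

```js
const Client = require('elastic-apm-http-client')
const client = new Client({ /* required construction options elided; see the README */ })

client.lambdaStart() // at Lambda invocation start: intake requests may begin

// ... client.sendTransaction(...), client.sendSpan(...), etc. during the invocation ...

client.flush({ lambdaEnd: true }, () => {
  // Queued events have been sent (or the attempt has concluded) and the
  // Elastic Lambda extension has been signaled; the handler may now finish.
})
```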
## v10.4.0
@@ -4,0 +28,0 @@
index.js
@@ -10,5 +10,8 @@ 'use strict'
const os = require('os')
const { performance } = require('perf_hooks')
const { URL } = require('url')
const zlib = require('zlib')
const HttpAgentKeepAlive = require('agentkeepalive')
const HttpsAgentKeepAlive = HttpAgentKeepAlive.HttpsAgent
const Filters = require('object-filter-sequence')
@@ -29,3 +32,10 @@ const querystring = require('querystring')
const flush = Symbol('flush')
// These symbols are used as markers in the client stream to indicate special
// flush handling.
const kFlush = Symbol('flush')
const kLambdaEndFlush = Symbol('lambdaEndFlush')
function isFlushMarker (obj) {
return obj === kFlush || obj === kLambdaEndFlush
}
const hostname = os.hostname()
@@ -92,4 +102,4 @@ const requiredOpts = [
this._agent = null
this._active = false
this._onflushed = null
this._activeIntakeReq = false
this._onIntakeReqConcluded = null
this._transport = null
@@ -103,2 +113,5 @@ this._configTimer = null
this._metadataFilters = new Filters()
// _lambdaActive indicates if a Lambda function invocation is active. It is
// only meaningful if `isLambdaExecutionEnvironment`.
this._lambdaActive = false
@@ -188,2 +201,10 @@ // Internal runtime stats for developer debugging/tuning.
// The 'beforeExit' event is significant in Lambda invocation completion
// handling, so we log it for debugging.
if (isLambdaExecutionEnvironment && this._log.isLevelEnabled('trace')) {
process.prependListener('beforeExit', () => {
this._log.trace('process "beforeExit"')
})
}
if (this._conf.centralConfig) {
@@ -233,2 +254,6 @@ this._pollConfig()
this._conf.centralConfig = this._conf.centralConfig || false
if (!('keepAliveMsecs' in this._conf)) this._conf.keepAliveMsecs = 1000
if (!('maxSockets' in this._conf)) this._conf.maxSockets = Infinity
if (!('maxFreeSockets' in this._conf)) this._conf.maxFreeSockets = 256
if (!('freeSocketTimeout' in this._conf)) this._conf.freeSocketTimeout = 4000
@@ -250,2 +275,3 @@ // processed values
let AgentKeepAlive
switch (this._conf.serverUrl.protocol) {
@@ -256,2 +282,3 @@ case 'http:':
this._transportGet = httpGet
AgentKeepAlive = HttpAgentKeepAlive
break
@@ -262,2 +289,3 @@ case 'https:':
this._transportGet = httpsGet
AgentKeepAlive = HttpsAgentKeepAlive
break
@@ -274,9 +302,10 @@ default:
}
var agentOpts = {
this._agent = new AgentKeepAlive({
keepAlive: this._conf.keepAlive,
keepAliveMsecs: this._conf.keepAliveMsecs,
freeSocketTimeout: this._conf.freeSocketTimeout,
timeout: this._conf.serverTimeout,
maxSockets: this._conf.maxSockets,
maxFreeSockets: this._conf.maxFreeSockets
}
this._agent = new this._transport.Agent(agentOpts)
})
}
@@ -287,2 +316,3 @@
this._conf.requestConfig = getConfigRequestOptions(this._conf, this._agent)
this._conf.requestSignalLambdaEnd = getSignalLambdaEndRequestOptions(this._conf, this._agent)
@@ -315,4 +345,4 @@ this._conf.metadata = getMetadata(this._conf)
if (this._conf.expectExtraMetadata) {
this._log.trace('maybe uncork (expectExtraMetadata)')
this._maybeUncork()
this._log.trace('uncorked (expectExtraMetadata)')
}
@@ -437,4 +467,4 @@ }
Client.prototype._write = function (obj, enc, cb) {
if (obj === flush) {
this._writeFlush(cb)
if (isFlushMarker(obj)) {
this._writeFlush(obj, cb)
} else {
@@ -471,3 +501,3 @@ const t = process.hrtime()
for (let i = offset; i < limit; i++) {
if (objs[i].chunk === flush) {
if (isFlushMarker(objs[i].chunk)) {
flushIdx = i
@@ -479,6 +509,6 @@ break
if (offset === 0 && flushIdx === -1 && objs.length <= MAX_WRITE_BATCH_SIZE) {
// A shortcut if there is no `flush` and the whole `objs` fits in a batch.
// A shortcut if there is no flush marker and the whole `objs` fits in a batch.
this._writeBatch(objs, cb)
} else if (flushIdx === -1) {
// No `flush` in this batch.
// No flush marker in this batch.
this._writeBatch(objs.slice(offset, limit),
@@ -488,11 +518,11 @@ limit === objs.length ? cb : processBatch)
} else if (flushIdx > offset) {
// There are some events in the queue before a `flush`.
// There are some events in the queue before a flush marker.
this._writeBatch(objs.slice(offset, flushIdx), processBatch)
offset = flushIdx
} else if (flushIdx === objs.length - 1) {
// The next item is a flush, and it is the *last* item in the queue.
this._writeFlush(cb)
// The next item is a flush marker, and it is the *last* item in the queue.
this._writeFlush(objs[flushIdx].chunk, cb)
} else {
// the next item in the queue is a flush
this._writeFlush(processBatch)
// The next item in the queue is a flush.
this._writeFlush(objs[flushIdx].chunk, processBatch)
offset++
@@ -538,15 +568,20 @@ }
Client.prototype._writeFlush = function (cb) {
this._log.trace({ active: this._active }, '_writeFlush')
if (this._active) {
// In a Lambda environment a flush is almost certainly a signal that the
// runtime environment is about to be frozen: tell the intake request
// to finish up quickly.
if (this._intakeRequestGracefulExitFn && isLambdaExecutionEnvironment) {
this._intakeRequestGracefulExitFn()
Client.prototype._writeFlush = function (flushMarker, cb) {
this._log.trace({ activeIntakeReq: this._activeIntakeReq, lambdaEnd: flushMarker === kLambdaEndFlush }, '_writeFlush')
let onFlushed = cb
if (isLambdaExecutionEnvironment && flushMarker === kLambdaEndFlush) {
onFlushed = () => {
// Signal the Elastic AWS Lambda extension that it is done passing data
// for this invocation, then call `cb()` so the wrapped Lambda handler
// can finish.
this._signalLambdaEnd(cb)
}
this._onflushed = cb
}
if (this._activeIntakeReq) {
this._onIntakeReqConcluded = onFlushed
this._chopper.chop()
} else {
this._chopper.chop(cb)
this._chopper.chop(onFlushed)
}
@@ -556,11 +591,15 @@ }
Client.prototype._maybeCork = function () {
if (!this._writableState.corked && this._conf.bufferWindowTime !== -1) {
this.cork()
if (this._corkTimer && this._corkTimer.refresh) {
// the refresh function was added in Node 10.2.0
this._corkTimer.refresh()
} else {
this._corkTimer = setTimeout(() => {
this.uncork()
}, this._conf.bufferWindowTime)
if (!this._writableState.corked) {
if (isLambdaExecutionEnvironment && !this._lambdaActive) {
this.cork()
} else if (this._conf.bufferWindowTime !== -1) {
this.cork()
if (this._corkTimer && this._corkTimer.refresh) {
// the refresh function was added in Node 10.2.0
this._corkTimer.refresh()
} else {
this._corkTimer = setTimeout(() => {
this.uncork()
}, this._conf.bufferWindowTime)
}
}
@@ -573,6 +612,10 @@ } else if (this._writableState.length >= this._conf.bufferWindowSize) {
Client.prototype._maybeUncork = function () {
// client must remain corked until cloud metadata has been
// fetched-or-skipped.
if (!this._encodedMetadata) {
// The client must remain corked until cloud metadata has been
// fetched-or-skipped.
return
} else if (isLambdaExecutionEnvironment && !this._lambdaActive) {
// In a Lambda env, we must only uncork when an invocation is active,
// otherwise we could start an intake request just before the VM is frozen.
return
}
@@ -585,3 +628,3 @@
process.nextTick(() => {
if (this.destroyed === false) {
if (this.destroyed === false && !(isLambdaExecutionEnvironment && !this._lambdaActive)) {
this.uncork()
@@ -620,2 +663,6 @@ }
Client.prototype.lambdaStart = function () {
this._lambdaActive = true
}
// With the cork/uncork handling on this stream, `this.write`ing on this
@@ -670,4 +717,21 @@ // stream when already destroyed will lead to:
Client.prototype.flush = function (cb) {
this._maybeUncork()
/**
* If possible, start a flush of currently queued APM events to APM server.
*
* "If possible," because there are some guards on uncorking. See `_maybeUncork`.
*
* @param {Object} opts - Optional.
* - {Boolean} opts.lambdaEnd - Optional. Default false. Setting this true
* tells the client to also handle the end of a Lambda function invocation.
* @param {Function} cb - Optional. `cb()` will be called when the data has
* been sent to APM Server (or failed in the attempt).
*/
Client.prototype.flush = function (opts, cb) {
if (typeof opts === 'function') {
cb = opts
opts = {}
} else if (!opts) {
opts = {}
}
const lambdaEnd = !!opts.lambdaEnd
@@ -678,3 +742,16 @@ // Write the special "flush" signal. We do this so that the order of writes
// given to the _write function.
return this.write(flush, cb)
if (lambdaEnd && isLambdaExecutionEnvironment && this._lambdaActive) {
// To flush the current data and ensure that subsequently sent events *in
// the same tick* do not start a new intake request, we must uncork
// synchronously -- rather than the nextTick uncork done in `_maybeUncork()`.
assert(this._encodedMetadata, 'client.flush({lambdaEnd:true}) must not be called before metadata has been set')
const rv = this.write(kLambdaEndFlush, cb)
this.uncork()
this._lambdaActive = false
return rv
} else {
this._maybeUncork()
return this.write(kFlush, cb)
}
}
@@ -803,4 +880,4 @@
// `_active` is used to coordinate the callback to `client.flush(cb)`.
client._active = true
// `_activeIntakeReq` is used to coordinate the callback to `client.flush(cb)`.
client._activeIntakeReq = true
@@ -861,3 +938,3 @@ // Handle conclusion of this intake request. Each "part" is expected to call
client.sent = client._numEventsEnqueued
client._active = false
client._activeIntakeReq = false
const backoffDelayMs = client._getBackoffDelay(!!err)
@@ -872,5 +949,5 @@ if (err) {
}
if (client._onflushed) {
client._onflushed()
client._onflushed = null
if (client._onIntakeReqConcluded) {
client._onIntakeReqConcluded()
client._onIntakeReqConcluded = null
}
@@ -932,3 +1009,8 @@
// manually unref the socket.
if (!intakeRequestGracefulExitCalled) {
//
// The exception is when in a Lambda environment, where we *do* want to
// keep the node process running to complete this intake request.
// Otherwise a 'beforeExit' event can be sent, which the Lambda runtime
// interprets as "the Lambda handler callback was never called".
if (!isLambdaExecutionEnvironment && !intakeRequestGracefulExitCalled) {
log.trace('intakeReq "socket": unref it')
@@ -1073,2 +1155,51 @@ intakeReqSocket.unref()
/**
* Signal to the Elastic AWS Lambda extension that a lambda function execution
* is done.
* https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing
*
* @param {Function} cb() is called when finished. There are no arguments.
*/
Client.prototype._signalLambdaEnd = function (cb) {
this._log.trace('_signalLambdaEnd start')
const startTime = performance.now()
const finish = errOrErrMsg => {
const durationMs = performance.now() - startTime
if (errOrErrMsg) {
this._log.error({ err: errOrErrMsg, durationMs }, 'error signaling lambda invocation done')
} else {
this._log.trace({ durationMs }, 'signaled lambda invocation done')
}
cb()
}
// We expect to be talking to the localhost Elastic Lambda extension, so we
// want a shorter timeout than `_conf.serverTimeout`.
const TIMEOUT_MS = 5000
const req = this._transportRequest(this._conf.requestSignalLambdaEnd, res => {
res.on('error', err => {
// Not sure this event can ever be emitted, but just in case.
res.destroy(err)
})
res.resume()
if (res.statusCode !== 202) {
finish(`unexpected response status code: ${res.statusCode}`)
return
}
res.on('end', function () {
finish()
})
})
req.setTimeout(TIMEOUT_MS)
req.on('timeout', () => {
this._log.trace('_signalLambdaEnd timeout')
req.destroy(new Error(`timeout (${TIMEOUT_MS}ms) signaling Lambda invocation done`))
})
req.on('error', err => {
finish(err)
})
req.end()
}
/**
* Fetch the APM Server version and set `this._apmServerVersion`.
@@ -1189,2 +1320,9 @@ * https://www.elastic.co/guide/en/apm/server/current/server-info.html
function getSignalLambdaEndRequestOptions (opts, agent) {
const headers = getHeaders(opts)
headers['Content-Length'] = 0
return getBasicRequestOptions('POST', '/intake/v2/events?flushed=true', headers, opts, agent)
}
function getConfigRequestOptions (opts, agent) {
@@ -1191,0 +1329,0 @@ const path = '/config/v1/agents?' + querystring.stringify({
@@ -16,2 +16,3 @@ 'use strict'
child () { return this }
isLevelEnabled (_level) { return false }
}
@@ -18,0 +19,0 @@
package.json
{
"name": "elastic-apm-http-client",
"version": "10.4.0",
"version": "11.0.0",
"description": "A low-level HTTP client for communicating with the Elastic APM intake API",
@@ -12,5 +12,4 @@ "main": "index.js",
],
"// scripts.test": "quoting arg to tape to avoid too long argv, let tape do globbing",
"scripts": {
"test": "standard && nyc tape \"test/*.test.js\""
"test": "standard && nyc ./test/run_tests.sh"
},
@@ -23,2 +22,3 @@ "engines": {
"dependencies": {
"agentkeepalive": "^4.2.1",
"breadth-filter": "^2.0.0",
@@ -25,0 +25,0 @@ "container-info": "^1.0.1",
README.md
@@ -107,2 +107,11 @@ # elastic-apm-http-client
  state. Only relevant if `keepAlive` is set to `true` (default: `256`)
- `freeSocketTimeout` - A number of milliseconds of inactivity on a free
  (kept-alive) socket after which to timeout and recycle the socket. Set this to
  a value less than the HTTP Keep-Alive timeout of the APM server to avoid
  [ECONNRESET exceptions](https://medium.com/ssense-tech/reduce-networking-errors-in-nodejs-23b4eb9f2d83).
  This defaults to 4000ms to be less than the [node.js HTTP server default of
  5s](https://nodejs.org/api/http.html#serverkeepalivetimeout) (useful when
  using a Node.js-based mock APM server) and the [Go lang Dialer `KeepAlive`
  default of 15s](https://pkg.go.dev/net#Dialer) (when talking to the Elastic
  APM Lambda extension). (default: `4000`; a constructor sketch follows below)
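
For illustration, a hedged sketch of constructing a client with this option. The option values are placeholders, and `agentName`, `agentVersion`, `serviceName`, and `userAgent` are assumed here to be the required identification options; consult the full option list above.

```js
const Client = require('elastic-apm-http-client')

// Hedged sketch: values are placeholders, not recommendations.
const client = new Client({
  agentName: 'my-agent',                // assumed required option
  agentVersion: '1.0.0',                // assumed required option
  serviceName: 'my-service',            // assumed required option
  userAgent: 'my-agent/1.0.0',          // assumed required option
  serverUrl: 'http://localhost:8200',
  keepAlive: true,
  // Recycle idle kept-alive sockets before the server's own Keep-Alive
  // timeout expires, to avoid ECONNRESET on socket reuse.
  freeSocketTimeout: 4000
})
```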
@@ -345,2 +354,29 @@ Cloud & Extra Metadata Configuration: | ||
### `client.lambdaStart()` | ||
Tells the client that a Lambda function invocation has started. | ||
#### Notes on Lambda usage | ||
To properly handle [data flushing for instrumented Lambda functions](https://github.com/elastic/apm/blob/main/specs/agents/tracing-instrumentation-aws-lambda.md#data-flushing) | ||
this Client should be used as follows in a Lambda environment. | ||
- When a Lambda invocation starts, `client.lambdaStart()` must be called. | ||
The Client prevents intake requests to APM Server when in a Lambda environment | ||
when a function invocation is *not* active. This is to ensure that an intake | ||
request does not accidentally span a period when a Lambda VM is frozen, | ||
which can lead to timeouts and lost APM data. | ||
- When a Lambda invocation finishes, `client.flush({lambdaEnd: true}, cb)` must | ||
be called. | ||
The `lambdaEnd: true` tells the Client to (a) mark the lambda as inactive so | ||
a subsequent intake request is not started until the next invocation, and | ||
(b) signal the Elastic AWS Lambda Extension that this invocation is done. | ||
The user's Lambda handler should not finish until `cb` is called. This | ||
ensures that the extension receives tracing data and the end signal before | ||
the Lambda Runtime freezes the VM. | ||
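
A minimal sketch of that flow in a handler wrapper. Assumptions: `client` is an already-constructed Client and `originalHandler` stands in for the user's handler; the actual wrapping is done by the Elastic APM Node.js agent and is not shown here.

```js
// Hedged sketch of a Lambda handler wrapper, not the agent's actual code.
exports.handler = async function wrappedHandler (event, context) {
  client.lambdaStart() // the invocation is active; intake requests may start

  try {
    return await originalHandler(event, context)
  } finally {
    // Flush queued events and signal the Elastic AWS Lambda extension.
    // Do not let the handler finish before the callback fires, so the data
    // is out before the Lambda Runtime may freeze the VM.
    await new Promise(resolve => client.flush({ lambdaEnd: true }, resolve))
  }
}
```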
### `client.sendSpan(span[, callback])`
@@ -386,3 +422,3 @@
### `client.flush([callback])`
### `client.flush([opts,] [callback])`
@@ -396,2 +432,7 @@ Flush the internal buffer and end the current HTTP request to the APM
- `opts`:
  - `opts.lambdaEnd` - An optional boolean to indicate if this is the final
    flush at the end of the Lambda function invocation. The client will do
    some extra handling if this is the case. See notes in `client.lambdaStart()`
    above.
- `callback` - Callback is called when the internal buffer has been
@@ -398,0 +439,0 @@ flushed and the HTTP request ended. If no HTTP request is in progress
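
For example, a hedged sketch of the two call forms (`client` is an already-constructed Client; outside a Lambda environment `opts` is typically omitted):

```js
// Flush whatever is buffered and get a callback when the attempt concludes.
client.flush(() => {
  console.log('queued APM events flushed (or the attempt has concluded)')
})

// At the end of a Lambda invocation, pass opts.lambdaEnd and wait for the
// callback before letting the handler finish.
client.flush({ lambdaEnd: true }, () => { /* handler may now finish */ })
```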
License Policy Violation (license): this package is not allowed per your license policy. Review the package's license to ensure compliance. Found 1 instance in 1 package.
Debug access (supply chain risk): uses debug, reflection and dynamic code execution features. Found 1 instance in 1 package.
+ Added agentkeepalive@^4.2.1
+ Added agentkeepalive@4.6.0 (transitive)
+ Added humanize-ms@1.2.1 (transitive)
+ Added ms@2.1.3 (transitive)