Comparing version 10.0.0 to 10.1.0
@@ -26,6 +26,6 @@ /*
* @class
* @param pipePath
* @classdesc
* A helper class for wrapping modules with segments
* @param {Agent} agent - The agent this collector will use
* @param {string} pipePath path of the named pipe to the Lambda extension, if it's enabled
*/
@@ -49,3 +49,3 @@ constructor(agent, pipePath) {
*
* @param arn
* @param {string} arn Amazon Resource Name of the function
*/
@@ -59,3 +59,3 @@ setLambdaArn(arn) {
*
* @param function_version
* @param {string} function_version version indicator for Lambda function
*/
@@ -90,3 +90,3 @@ setLambdaFunctionVersion(function_version) {
*
* @param cb
* @param {Function} cb callback function, if any
*/
@@ -223,2 +223,3 @@ restart(cb) {
* @param {Function} cb The callback to invoke when finished.
* @returns {boolean} indicating if callback was defined and successfully executed
*/
@@ -278,3 +279,4 @@ flushPayload(cb) {
*
* @param payload
* @param {string} payload serialized stringified-JSON payload to flush
* @returns {boolean} whether or not flush was successful
*/
@@ -281,0 +283,0 @@ flushToPipeSync(payload) {
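The new JSDoc above documents the pipe-based flush path: flushToPipeSync writes a stringified-JSON payload to the named pipe exposed by the Lambda extension and reports success as a boolean. A minimal sketch of that pattern, with a hypothetical pipe path (the agent receives the real one through the constructor's pipePath argument):

'use strict'
const fs = require('fs')

// Hypothetical pipe path; the real path is injected via `pipePath` above.
const PIPE_PATH = '/tmp/newrelic-telemetry'

function flushToPipeSync(payload) {
  try {
    // A named pipe behaves like a file, so a synchronous write keeps the
    // flush inside the current Lambda invocation.
    fs.writeFileSync(PIPE_PATH, payload)
    return true
  } catch (err) {
    return false // matches the boolean contract documented above
  }
}

flushToPipeSync(JSON.stringify({ metadata: {}, data: [] }))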
@@ -1125,2 +1125,10 @@ /*
}
},
batching: {
formatter: boolean,
default: true
},
compression: {
formatter: boolean,
default: true
}
@@ -1127,0 +1135,0 @@ },
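Both new flags are formatted as booleans and default to true, so existing Infinite Tracing users pick up batching and compression without a config change. A sketch of opting out in a user config file, assuming the keys surface under infinite_tracing alongside the existing trace_observer settings (which is how the aggregator code further down reads them):

// newrelic.js (sketch; only the relevant keys shown)
exports.config = {
  infinite_tracing: {
    trace_observer: {
      host: 'example.infinite-tracing.edge.nr-data.net' // hypothetical endpoint
    },
    batching: false, // fall back to the per-span recordSpan RPC
    compression: false // leave the gRPC channel uncompressed
  }
}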
@@ -44,3 +44,3 @@ /*
*
* @param {object} traceObserverConfig config item config.infinite_tracing.trace_observer
* @param {object} infiniteTracingConfig config item config.infinite_tracing
* @param {MetricAggregator} metrics metric aggregator, for supportability metrics
@@ -50,6 +50,8 @@ * @param {number} [reconnectDelayMs=15000] number of milliseconds to wait before reconnecting
*/
constructor(traceObserverConfig, metrics, reconnectDelayMs) {
constructor(infiniteTracingConfig, metrics, reconnectDelayMs = DEFAULT_RECONNECT_DELAY_MS) {
super()
this._compression = infiniteTracingConfig.compression
this._method = infiniteTracingConfig.batching ? 'recordSpanBatch' : 'recordSpan'
this._reconnectDelayMs = reconnectDelayMs || DEFAULT_RECONNECT_DELAY_MS
this._reconnectDelayMs = reconnectDelayMs
@@ -63,2 +65,3 @@ this._metrics = metrics
const traceObserverConfig = infiniteTracingConfig.trace_observer
this._endpoint = this.getTraceObserverEndpoint(traceObserverConfig)
@@ -76,3 +79,2 @@
*
* @param {string} endpoint the GRPC server's endpoint
* @param {string} licenseKey the agent license key
@@ -82,2 +84,3 @@ * @param {string} runId the current agent run id (also called agent run token)
* @param {string} [rootCerts] string of root (ca) certificates to attach to the connection.
* @returns {GrpcConnection} the instance of grpc connection
*/
@@ -89,3 +92,2 @@ setConnectionDetails(licenseKey, runId, requestHeadersMap, rootCerts) {
this._rootCerts = rootCerts
return this
@@ -104,5 +106,4 @@ }
*
* @param {int} state The connection state (See connectionStates above)
* @param {ClientDuplexStreamImpl} state The GRPC stream, when defined
* @param stream
* @param {number} state The connection state (See connectionStates above)
* @param {object} stream duplex stream
*/
@@ -155,6 +156,7 @@ _setState(state, stream = null) {
*
* @param {string} licenseKey
* @param {string} runId
* @param requestHeadersMap
* @param env
* @param {string} licenseKey agent key
* @param {string} runId agent run id
* @param {object} requestHeadersMap map of request headers to include
* @param {object} env process.env
* @returns {object} grpc metadata
*/
@@ -183,4 +185,4 @@ _getMetadata(licenseKey, runId, requestHeadersMap, env) {
*
* @param {Metadata} metadata
* @param env
* @param {object} metadata metadata to set
* @param {object} env process.env
*/
@@ -248,3 +250,3 @@ _setTestMetadata(metadata, env) {
*
* @param {ClientDuplexStreamImpl} stream
* @param {object} stream duplex stream
*/
@@ -276,3 +278,3 @@ _setupSpanStreamObservers(stream) {
logger.info(
'[UNIMPLEMENTED]: Trace Obserserver is no longer available. Shutting down connection.'
'[UNIMPLEMENTED]: Trace Observer is no longer available. Shutting down connection.'
)
@@ -311,3 +313,4 @@ this._disconnect()
*
* @param grpcApi
* @param {object} grpcApi grpc lib
* @returns {object} ssl credentials
*/
@@ -340,5 +343,7 @@ _generateCredentials(grpcApi) {
* "Connection" can be a somewhat misleading term here. This method
* invokes the "recordSpan" remote proceduce call. Behind the scenes
* invokes either the `recordSpan` or `recordSpanBatch` remote procedure call. Behind the scenes
* this makes an http2 request with the metadata, and then returns
* a stream for further writing.
*
* @returns {object} stream duplex stream
*/
@@ -349,3 +354,3 @@ _connectSpans() {
// We create here (currently) for consistent error handling.
this._client = this._createClient(this._endpoint)
this._client = this._createClient()
}
@@ -360,3 +365,3 @@
const stream = this._client.recordSpan(metadata)
const stream = this._client[this._method](metadata)
this._setupSpanStreamObservers(stream)
@@ -375,5 +380,6 @@
*
* @param endpoint
* @returns {object} protobuf API for IngestService
*/
_createClient(endpoint) {
_createClient() {
const endpoint = this._endpoint
logger.trace('Creating gRPC client for: ', endpoint)
@@ -388,3 +394,21 @@
const credentials = this._generateCredentials(grpc)
return new traceApi.IngestService(endpoint, credentials)
// If you want to use a mock server, use insecure creds
// const credentials = grpc.credentials.createInsecure()
const opts = {}
if (this._compression) {
// 2 = gzip compression
// see: https://github.com/grpc/grpc-node/blob/master/packages/grpc-js/src/compression-algorithms.ts#L21
opts['grpc.default_compression_algorithm'] = 2
this._metrics
.getOrCreateMetric(`${NAMES.INFINITE_TRACING.COMPRESSION}/enabled`)
.incrementCallCount()
} else {
this._metrics
.getOrCreateMetric(`${NAMES.INFINITE_TRACING.COMPRESSION}/disabled`)
.incrementCallCount()
}
return new traceApi.IngestService(endpoint, credentials, opts)
}
@@ -391,0 +415,0 @@ }
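The compression branch above opts the whole channel into gzip by passing `grpc.default_compression_algorithm: 2` as a client option. A standalone sketch of the same channel option with @grpc/grpc-js, using a bare Client and a hypothetical endpoint rather than the agent's generated IngestService:

'use strict'
const grpc = require('@grpc/grpc-js')

// 0 = identity, 1 = deflate, 2 = gzip, per the mapping linked in the hunk above.
const opts = { 'grpc.default_compression_algorithm': 2 }

// Channel options ride in as the third constructor argument, exactly as the
// agent passes `opts` to `new traceApi.IngestService(endpoint, credentials, opts)`.
const client = new grpc.Client(
  'trace-api.example.com:443', // hypothetical endpoint
  grpc.credentials.createSsl(),
  opts
)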
@@ -30,2 +30,3 @@ /*
'mysql': { module: './instrumentation/mysql' },
'@nestjs/core': { type: MODULE_TYPE.WEB_FRAMEWORK },
'pino': { module: './instrumentation/pino' },
@@ -56,5 +57,4 @@ 'pg': { type: MODULE_TYPE.DATASTORE },
'@prisma/client': { type: MODULE_TYPE.DATASTORE },
'@nestjs/core': { type: MODULE_TYPE.TRACKING },
'knex': { type: MODULE_TYPE.TRACKING }
}
}
@@ -267,3 +267,5 @@ /*
QUEUE_SIZE: SUPPORTABILITY.INFINITE_TRACING + '/Span/QueueSize',
DRAIN_DURATION: SUPPORTABILITY.INFINITE_TRACING + '/Drain/Duration'
DRAIN_DURATION: SUPPORTABILITY.INFINITE_TRACING + '/Drain/Duration',
COMPRESSION: `${SUPPORTABILITY.INFINITE_TRACING}/gRPC/Compression`,
BATCHING: `${SUPPORTABILITY.INFINITE_TRACING}/gRPC/Batching`
}
@@ -270,0 +272,0 @@
@@ -8,4 +8,2 @@ /*
/* eslint sonarjs/cognitive-complexity: ["error", 29] -- TODO: https://issues.newrelic.com/browse/NEWRELIC-5252 */
const apiGateway = require('./api-gateway')
@@ -71,2 +69,41 @@ const headerAttributes = require('../header-attributes')
wrapEnders() {
const shim = this.shim
// There is no prependListener in node 4, so we wrap emit to look for 'beforeExit'
// NOTE: This may be converted to holding onto a single ender function if only
// one invocation is executing at a time.
shim.wrap(process, 'emit', function wrapEmit(shim, emit) {
return function wrappedEmit(ev, error) {
// need to add error as uncaughtException to be used
// later to add to transaction errors
if (ev === 'unhandledRejection') {
uncaughtException = error
}
if (['beforeExit', 'unhandledRejection'].includes(ev)) {
transactionEnders.forEach((ender) => {
ender()
})
transactionEnders = []
}
return emit.apply(process, arguments)
}
})
}
wrapFatal() {
const shim = this.shim
shim.wrap(process, '_fatalException', function wrapper(shim, original) {
return function wrappedFatalException(error) {
// logic placed before the _fatalException call, since it ends the invocation
uncaughtException = error
transactionEnders.forEach((ender) => {
ender()
})
transactionEnders = []
return original.apply(this, arguments)
}
})
}
patchLambdaHandler(handler) {
@@ -85,34 +122,4 @@ const awsLambda = this
// There is no prependListener in node 4, so we wrap emit to look for 'beforeExit'
// NOTE: This may be converted to holding onto a single ender function if only
// one invocation is executing at a time.
shim.wrap(process, 'emit', function wrapEmit(shim, emit) {
return function wrappedEmit(ev, error) {
// need to add error as uncaughtException to be used
// later to add to transaction errors
if (ev === 'unhandledRejection') {
uncaughtException = error
}
if (['beforeExit', 'unhandledRejection'].includes(ev)) {
transactionEnders.forEach((ender) => {
ender()
})
transactionEnders = []
}
return emit.apply(process, arguments)
}
})
shim.wrap(process, '_fatalException', function wrapper(shim, original) {
return function wrappedFatalException(error) {
// logic placed before the _fatalException call, since it ends the invocation
uncaughtException = error
transactionEnders.forEach((ender) => {
ender()
})
transactionEnders = []
return original.apply(this, arguments)
}
})
this.wrapEnders()
this.wrapFatal()
}
@@ -161,3 +168,3 @@
args[cbIndex] = wrapCallbackAndCaptureError(
args[cbIndex] = awsLambda.wrapCallbackAndCaptureError(
transaction,
@@ -171,4 +178,4 @@ txnEnder,
// AWS, but are considered functional.
context.done = wrapCallbackAndCaptureError(transaction, txnEnder, context.done)
context.fail = wrapCallbackAndCaptureError(transaction, txnEnder, context.fail)
context.done = awsLambda.wrapCallbackAndCaptureError(transaction, txnEnder, context.done)
context.fail = awsLambda.wrapCallbackAndCaptureError(transaction, txnEnder, context.fail)
shim.wrap(context, 'succeed', function wrapSucceed(shim, original) {
@@ -223,21 +230,22 @@ return function wrappedSucceed() {
}
}
function wrapCallbackAndCaptureError(transaction, txnEnder, cb, processResult) {
return function wrappedCallback() {
let err = arguments[0]
if (typeof err === 'string') {
err = new Error(err)
}
wrapCallbackAndCaptureError(transaction, txnEnder, cb, processResult) {
const shim = this.shim
return function wrappedCallback() {
let err = arguments[0]
if (typeof err === 'string') {
err = new Error(err)
}
shim.agent.errors.add(transaction, err)
shim.agent.errors.add(transaction, err)
if (processResult) {
const result = arguments[1]
processResult(result)
}
if (processResult) {
const result = arguments[1]
processResult(result)
}
txnEnder()
txnEnder()
return cb.apply(this, arguments)
}
return cb.apply(this, arguments)
}
@@ -244,0 +252,0 @@ }
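The refactor above lifts two process-level monkey patches out of patchLambdaHandler into named methods; both share the same shape: run any queued transaction enders first, then delegate to the original function. A stripped-down sketch of the wrapped-emit half without the shim, using hypothetical ender bookkeeping:

'use strict'
// Hypothetical stand-in for the module-level ender queue used above.
let transactionEnders = []

const originalEmit = process.emit
process.emit = function wrappedEmit(ev, ...args) {
  if (ev === 'beforeExit' || ev === 'unhandledRejection') {
    // Flush pending transactions before the event's own listeners run.
    transactionEnders.forEach((ender) => ender())
    transactionEnders = []
  }
  return originalEmit.apply(process, [ev, ...args])
}

transactionEnders.push(() => console.log('transaction ended'))
process.emit('beforeExit', 0) // logs 'transaction ended', then emits as usual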
@@ -39,2 +39,3 @@ /*
NEXT: 'Nextjs',
NEST: 'Nestjs',
RESTIFY: 'Restify'
@@ -41,0 +42,0 @@ }
@@ -11,2 +11,3 @@ /*
const StreamingSpanEventAggregator = require('./streaming-span-event-aggregator')
const NAMES = require('../metrics/names').INFINITE_TRACING
@@ -46,12 +47,28 @@ function createSpanEventAggregator(config, collector, metrics) {
const GrpcConnection = require('../grpc/connection')
const SpanStreamer = require('./span-streamer')
const connection = new GrpcConnection(config.infinite_tracing.trace_observer, metrics)
const spanStreamer = new SpanStreamer(
config.license_key,
connection,
metrics,
config.infinite_tracing.span_events.queue_size
)
const connection = new GrpcConnection(config.infinite_tracing, metrics)
let spanStreamer
if (config.infinite_tracing.batching) {
const BatchSpanStreamer = require('./batch-span-streamer')
spanStreamer = new BatchSpanStreamer(
config.license_key,
connection,
metrics,
config.infinite_tracing.span_events.queue_size
)
metrics.getOrCreateMetric(`${NAMES.BATCHING}/enabled`).incrementCallCount()
} else {
const SpanStreamer = require('./span-streamer')
spanStreamer = new SpanStreamer(
config.license_key,
connection,
metrics,
config.infinite_tracing.span_events.queue_size
)
metrics.getOrCreateMetric(`${NAMES.BATCHING}/disabled`).incrementCallCount()
}
// this periodMs has no effect on gRPC calls
// the send method on StreamingSpanEventAggregator is a no-op
const opts = {
@@ -58,0 +75,0 @@ periodMs: 1000,
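Together with the NAMES additions earlier in this diff, the enabled/disabled counters expand to full supportability metric paths. A sketch of the names this selection logic can produce, assuming the usual Supportability/InfiniteTracing prefix behind SUPPORTABILITY.INFINITE_TRACING:

// `${NAMES.BATCHING}/enabled` and friends should resolve to paths like:
const batchingMetrics = [
  'Supportability/InfiniteTracing/gRPC/Batching/enabled',
  'Supportability/InfiniteTracing/gRPC/Batching/disabled'
]
// and the connection code earlier emits the compression counterparts:
const compressionMetrics = [
  'Supportability/InfiniteTracing/gRPC/Compression/enabled',
  'Supportability/InfiniteTracing/gRPC/Compression/disabled'
]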
/*
* Copyright 2020 New Relic Corporation. All rights reserved.
* Copyright 2023 New Relic Corporation. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
@@ -9,33 +9,11 @@ */
const logger = require('../logger').child({ component: 'span-streamer' })
const NAMES = require('../metrics/names').INFINITE_TRACING
const BaseSpanStreamer = require('./base-span-streamer')
const SPAN_DROP_MSG_INTERVAL_MS = 30000
const SPAN_DROP_MSG =
'Queue full, dropping spans. ' +
`Will not warn again for ${SPAN_DROP_MSG_INTERVAL_MS / 1000} seconds.`
class SpanStreamer {
class SpanStreamer extends BaseSpanStreamer {
constructor(licenseKey, connection, metrics, queueSize) {
this.stream = null
this.license_key = licenseKey
this.connection = connection
this.queue_size = queueSize
this.spans = []
this._metrics = metrics
this._writable = false
super(licenseKey, connection, metrics, queueSize)
}
// 'connected' indicates a safely writeable stream.
// May still be mid-connect to gRPC server.
this.connection.on('connected', (stream) => {
logger.info('Span streamer connected')
this.stream = stream
this._writable = true
this.sendQueue()
})
this.connection.on('disconnected', () => {
logger.info('Span streamer disconnected')
this.stream = null
this._writable = false
})
addToQueue(span) {
this.spans.push(span)
}
@@ -47,22 +25,5 @@
write(span) {
this._metrics.getOrCreateMetric(NAMES.SEEN).incrementCallCount()
super.write(span)
// If not writeable (because of backpressure) queue the span
if (!this._writable) {
if (this.spans.length < this.queue_size) {
this.spans.push(span)
return
}
// While this can be directionally calculated between seen/sent the
// queue makes that a bit more disconnected. This will be a bit more specific.
this._metrics.getOrCreateMetric(NAMES.DROPPED).incrementCallCount()
// If the queue is full drop the span
logger.infoOncePer(
'SPAN_DROP_MSG', // key for the OncePer
SPAN_DROP_MSG_INTERVAL_MS,
SPAN_DROP_MSG
)
return
@@ -84,48 +45,10 @@ }
/**
* Sends the span over the stream. Spans are only sent here if the stream is
* in a writable state. If the stream becomes unwritable after sending the
* span, a drain event handler is setup to continue writing when possible.
*
* @param span
*/
send(span) {
// false indicates the stream has reached the highWaterMark
// and future writes should be avoided until drained. written items,
// including the one that returned false, will still be buffered.
this._writable = this.stream.write(span)
this._metrics.getOrCreateMetric(NAMES.SENT).incrementCallCount()
if (!this._writable) {
const waitDrainStart = Date.now()
const onDrain = this.drain.bind(this, waitDrainStart)
this.stream.once('drain', onDrain)
sendQueue() {
if (!this.spans.length) {
logger.trace('Queue is empty, not sending spans.')
return
}
}
/**
* Drains the span queue that built up when the connection was
* back-pressured or disconnected. `waitDrainStart` is when the stream
* initially blocked, used to time how long the stream was blocked. If this
* is not defined, it is assumed this is being called after a reconnect,
* and the metric is not used.
*
* @param waitDrainStart
*/
drain(waitDrainStart) {
// Metric can be used to see how frequently completing drains as well as
// average time to drain from when we first notice.
const drainCompleted = Date.now()
const drainDurationMs = drainCompleted - waitDrainStart
this._metrics.getOrCreateMetric(NAMES.DRAIN_DURATION).recordValue(drainDurationMs / 1000)
logger.trace('Sending spans from queue: %s.', this.spans.length)
// Once the 'drain' event fires we can begin writing to the stream again
this._writable = true
this.sendQueue()
}
sendQueue() {
logger.trace('Sending spans from queue.')
// Continue sending the spans that were in the queue. _writable is checked
@@ -142,19 +65,4 @@ // so that if a send fails while clearing the queue, this drain handler can
}
connect(agentRunId, requestHeadersMap) {
this.connection.setConnectionDetails(this.license_key, agentRunId, requestHeadersMap)
this.connection.connectSpans()
}
disconnect() {
this.connection.disconnect()
}
createMetrics() {
this._metrics.getOrCreateMetric(NAMES.QUEUE_CAPACITY).recordValue(this.queue_size)
this._metrics.getOrCreateMetric(NAMES.QUEUE_SIZE).recordValue(this.spans.length)
}
}
module.exports = SpanStreamer
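Most of what this file loses moves into the shared BaseSpanStreamer, but the deleted send/drain logic is the standard Node stream backpressure contract: write() returning false means stop writing until 'drain' fires. A self-contained sketch of that contract against a plain Writable (not the real gRPC stream), with a hypothetical span queue standing in for `this.spans`:

'use strict'
const { Writable } = require('stream')

// A deliberately slow consumer; objectMode makes each span count as one item.
const stream = new Writable({
  objectMode: true,
  highWaterMark: 4,
  write(span, _enc, cb) {
    setImmediate(cb)
  }
})

const queue = [] // hypothetical stand-in for the streamer's span queue
let writable = true

function write(span) {
  if (!writable) {
    queue.push(span) // backpressured: queue instead of writing
    return
  }
  // false means the internal buffer passed highWaterMark; the item is
  // still buffered, but further writes should wait for 'drain'.
  writable = stream.write(span)
  if (!writable) {
    stream.once('drain', onDrain)
  }
}

function onDrain() {
  writable = true
  while (queue.length && writable) {
    writable = stream.write(queue.shift())
  }
  if (!writable) {
    stream.once('drain', onDrain) // backpressured again mid-flush
  }
}

for (let i = 0; i < 10; i++) {
  write({ id: i })
}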
{
"name": "newrelic",
"version": "10.0.0",
"version": "10.1.0",
"author": "New Relic Node.js agent team <nodejs@newrelic.com>",
@@ -234,3 +234,2 @@ "license": "Apache-2.0",
"q": "*",
"request": "^2.88.2",
"rimraf": "^2.6.3",
@@ -237,0 +236,0 @@ "should": "*",
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is too big to display
License alerts flagged for this comparison:

License Policy Violation: this package is not allowed per your license policy. Review the package's license to ensure compliance. Found 1 instance in 1 package. (Reported twice.)
Mixed license (experimental): package contains multiple licenses. Found 1 instance in 1 package. (Reported twice.)
Copyleft License (experimental): copyleft license information was found. Found 1 instance in 1 package.
Non-permissive License (experimental): a license not known to be considered permissive was found. Found 1 instance in 1 package.