Comparing version 12.1.0 to 12.1.1
@@ -107,4 +107,5 @@ /* | ||
return new DatastoreParameters({ | ||
host: clientOpts?.socket?.host || 'localhost', | ||
port_path_or_id: clientOpts?.socket?.path || clientOpts?.socket?.port || '6379', | ||
host: clientOpts?.host || clientOpts?.socket?.host || 'localhost', | ||
port_path_or_id: | ||
clientOpts?.port || clientOpts?.socket?.path || clientOpts?.socket?.port || '6379', | ||
database_name: clientOpts?.database || 0 | ||
@@ -111,0 +112,0 @@ }) |
@@ -12,6 +12,6 @@ /* | ||
} = require('../../shim/specs') | ||
const url = require('url') | ||
const wrapModel = require('./channel-model') | ||
const { setCallback } = require('./utils') | ||
const { setCallback, parseConnectionArgs } = require('./utils') | ||
const wrapChannel = require('./channel') | ||
const { amqpConnection } = require('../../symbols') | ||
@@ -77,2 +77,5 @@ module.exports.instrumentPromiseAPI = instrumentChannelAPI | ||
* Instruments the connect method | ||
* We have to both wrap and record because | ||
* we need the host/port for all subsequent calls on the model/channel | ||
* but record only completes in an active transaction | ||
* | ||
@@ -84,14 +87,31 @@ * @param {Shim} shim instance of shim | ||
function wrapConnect(shim, amqp, promiseMode) { | ||
shim.record(amqp, 'connect', function recordConnect(shim, connect, name, args) { | ||
let [connArgs] = args | ||
const params = new DatastoreParameters() | ||
shim.wrap(amqp, 'connect', function wrapConnect(shim, connect) { | ||
return function wrappedConnect() { | ||
const args = shim.argsToArray.apply(shim, arguments) | ||
const [connArgs] = args | ||
const params = parseConnectionArgs({ shim, connArgs }) | ||
const cb = args[args.length - 1] | ||
if (!promiseMode) { | ||
args[args.length - 1] = function wrappedCallback() { | ||
const cbArgs = shim.argsToArray.apply(shim, arguments) | ||
const [, c] = cbArgs | ||
c.connection[amqpConnection] = params | ||
return cb.apply(this, cbArgs) | ||
} | ||
} | ||
if (shim.isString(connArgs)) { | ||
connArgs = url.parse(connArgs) | ||
params.host = connArgs.hostname | ||
if (connArgs.port) { | ||
params.port = connArgs.port | ||
const result = connect.apply(this, args) | ||
if (promiseMode) { | ||
return result.then((c) => { | ||
c.connection[amqpConnection] = params | ||
return c | ||
}) | ||
} | ||
return result | ||
} | ||
}) | ||
shim.record(amqp, 'connect', function recordConnect(shim, connect, name, args) { | ||
const [connArgs] = args | ||
const params = parseConnectionArgs({ shim, connArgs }) | ||
return new OperationSpec({ | ||
@@ -101,7 +121,8 @@ name: 'amqplib.connect', | ||
promise: promiseMode, | ||
parameters: params, | ||
stream: null, | ||
recorder: null | ||
parameters: new DatastoreParameters({ | ||
host: params.host, | ||
port_path_or_id: params.port | ||
}) | ||
}) | ||
}) | ||
} |
@@ -8,2 +8,3 @@ /* | ||
const { MessageSpec, MessageSubscribeSpec, RecorderSpec } = require('../../shim/specs') | ||
const { amqpConnection } = require('../../symbols') | ||
const CHANNEL_METHODS = [ | ||
@@ -26,9 +27,3 @@ 'close', | ||
] | ||
const { | ||
describeMessage, | ||
setCallback, | ||
parseConnect, | ||
getParametersFromMessage, | ||
TEMP_RE | ||
} = require('./utils') | ||
const { describeMessage, setCallback, getParametersFromMessage, TEMP_RE } = require('./utils') | ||
@@ -94,3 +89,3 @@ /** | ||
shim.recordConsume(proto, 'get', function wrapGet() { | ||
const { host, port } = parseConnect(this?.connection?.stream) | ||
const { host, port } = this?.connection?.[amqpConnection] || {} | ||
return new MessageSpec({ | ||
@@ -121,3 +116,3 @@ destinationName: shim.FIRST, | ||
shim.recordSubscribedConsume(proto, 'consume', function consume() { | ||
const { host, port } = parseConnect(this?.connection?.stream) | ||
const { host, port } = this?.connection?.[amqpConnection] || {} | ||
return new MessageSubscribeSpec({ | ||
@@ -124,0 +119,0 @@ name: 'amqplib.Channel#consume', |
@@ -8,3 +8,4 @@ /* | ||
const { MessageSpec } = require('../../shim/specs') | ||
const { parseConnect, getParameters, TEMP_RE } = require('./utils') | ||
const { amqpConnection } = require('../../symbols') | ||
const { getParameters, TEMP_RE } = require('./utils') | ||
@@ -51,22 +52,24 @@ /** | ||
shim.recordProduce(proto, 'sendMessage', function recordSendMessage(shim, fn, n, args) { | ||
const fields = args[0] | ||
if (!fields) { | ||
return null | ||
} | ||
const isDefault = fields.exchange === '' | ||
let exchange = 'Default' | ||
if (!isDefault) { | ||
exchange = TEMP_RE.test(fields.exchange) ? null : fields.exchange | ||
} | ||
const { host, port } = parseConnect(this?.connection?.stream) | ||
shim.recordProduce(proto, 'sendMessage', recordSendMessage) | ||
} | ||
return new MessageSpec({ | ||
destinationName: exchange, | ||
destinationType: shim.EXCHANGE, | ||
routingKey: fields.routingKey, | ||
headers: fields.headers, | ||
parameters: getParameters({ parameters: Object.create(null), fields, host, port }) | ||
}) | ||
function recordSendMessage(shim, fn, n, args) { | ||
const fields = args[0] | ||
if (!fields) { | ||
return null | ||
} | ||
const isDefault = fields.exchange === '' | ||
let exchange = 'Default' | ||
if (!isDefault) { | ||
exchange = TEMP_RE.test(fields.exchange) ? null : fields.exchange | ||
} | ||
const { host, port } = this?.connection?.[amqpConnection] || {} | ||
return new MessageSpec({ | ||
destinationName: exchange, | ||
destinationType: shim.EXCHANGE, | ||
routingKey: fields.routingKey, | ||
headers: fields.headers, | ||
parameters: getParameters({ parameters: Object.create(null), fields, host, port }) | ||
}) | ||
} |
@@ -11,3 +11,2 @@ /* | ||
} = require('../../shim/specs') | ||
const { amqpConnection } = require('../../symbols') | ||
const TEMP_RE = /^amq\./ | ||
@@ -105,21 +104,2 @@ | ||
/** | ||
* Extracts the host/port from the amqp socket connection. | ||
* Stores on connection as symbol to only parse once. | ||
* | ||
* @param {Socket} socket amqp connection | ||
* @returns {object} {host, port } of connection | ||
*/ | ||
function parseConnect(socket) { | ||
if (socket[amqpConnection]) { | ||
return socket[amqpConnection] | ||
} | ||
const host = ['127.0.0.1', '::1', '[::1]'].includes(socket?.remoteAddress) | ||
? 'localhost' | ||
: socket?.remoteAddress | ||
const port = socket?.remotePort | ||
socket[amqpConnection] = { host, port } | ||
return { host, port } | ||
} | ||
/** | ||
* Helper to set the appropriate value of the callback property | ||
@@ -136,2 +116,24 @@ * in the spec. If it's a promise set to null otherwise set it to `shim.LAST` | ||
/** | ||
* Parses the connection args to return host/port | ||
* | ||
* @param {string|object} connArgs connection arguments | ||
* @returns {object} {host, port } | ||
*/ | ||
function parseConnectionArgs({ shim, connArgs }) { | ||
const params = {} | ||
if (shim.isString(connArgs)) { | ||
connArgs = new URL(connArgs) | ||
params.host = connArgs.hostname | ||
if (connArgs.port) { | ||
params.port = parseInt(connArgs.port, 10) | ||
} | ||
} else { | ||
params.port = connArgs.port || (connArgs.protocol === 'amqp' ? 5672 : 5671) | ||
params.host = connArgs.hostname | ||
} | ||
return params | ||
} | ||
module.exports = { | ||
@@ -141,5 +143,5 @@ describeMessage, | ||
getParametersFromMessage, | ||
parseConnect, | ||
parseConnectionArgs, | ||
setCallback, | ||
TEMP_RE | ||
} |
@@ -14,5 +14,5 @@ /* | ||
agent.metrics | ||
.getOrCreateMetric(`MessageBroker/Kafka/Nodes/${broker}/${kind}/Named/${topic}`) | ||
.getOrCreateMetric(`MessageBroker/Kafka/Nodes/${broker}/${kind}/${topic}`) | ||
.incrementCallCount() | ||
} | ||
} |
@@ -18,3 +18,2 @@ /* | ||
// eslint-disable-next-line sonarjs/cognitive-complexity | ||
module.exports = function instrument(shim, tools) { | ||
@@ -39,3 +38,16 @@ const pinoVersion = shim.pkgVersion | ||
wrapAsJson({ shim, tools }) | ||
} | ||
/** | ||
* Wraps `asJson` to properly decorate and forward logs | ||
* | ||
* @param {object} params to function | ||
* @param {Shim} params.shim instance of shim | ||
* @param {object} params.tools exported `pino/lib/tools` | ||
*/ | ||
function wrapAsJson({ shim, tools }) { | ||
const symbols = shim.require('./lib/symbols') | ||
const { agent } = shim | ||
const { config, metrics } = agent | ||
@@ -103,12 +115,10 @@ shim.wrap(tools, 'asJson', function wrapJson(shim, asJson) { | ||
* | ||
* @param logLine.logLine | ||
* @param {object} logLine log line | ||
* @param {object} metadata NR context data | ||
* @param {string} chindings serialized string of all common log line data | ||
* @param logLine.args | ||
* @param logLine.agent | ||
* @param logLine.chindings | ||
* @param logLine.msg | ||
* @param logLine.level | ||
* @param logLine.logger | ||
* @param {object} params to function | ||
* @param {object} params.logLine log line | ||
* @param {string} params.msg message of log line | ||
* @param {object} params.agent instance of agent | ||
* @param {string} params.chindings serialized string of all common log line data | ||
* @param {string} params.level log level | ||
* @param {object} params.logger instance of agent logger | ||
* @returns {function} wrapped log formatter function | ||
*/ | ||
@@ -115,0 +125,0 @@ function reformatLogLine({ logLine, msg, agent, chindings = '', level, logger }) { |
{ | ||
"name": "newrelic", | ||
"version": "12.1.0", | ||
"version": "12.1.1", | ||
"author": "New Relic Node.js agent team <nodejs@newrelic.com>", | ||
@@ -5,0 +5,0 @@ "license": "Apache-2.0", |
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
2044095
40229