aedes-otel-instrumentation
Advanced tools
Comparing version 0.1.1 to 0.2.0
@@ -28,3 +28,3 @@ import { Span } from '@opentelemetry/api'; | ||
private getAedesListenerPatch; | ||
private getAedesSubscribePatch; | ||
private getAedesWrapDeliveryFuncPatch; | ||
private callConsumeHook; | ||
@@ -31,0 +31,0 @@ private callConsumeEndHook; |
@@ -160,5 +160,2 @@ "use strict"; | ||
} | ||
if (!this.isWrapped(moduleExports.prototype, 'subscribe')) { | ||
this._wrap(moduleExports.prototype, 'subscribe', this.getAedesSubscribePatch.bind(this)); | ||
} | ||
if (!this.isWrapped(moduleExports.prototype, 'preConnect')) { | ||
@@ -176,2 +173,5 @@ this._wrap(moduleExports.prototype, 'preConnect', this.getAedesPreConnectPatch.bind(this)); | ||
} | ||
if (!this.isWrapped(moduleExports.prototype, 'wrapDeliveryFunc')) { | ||
this._wrap(moduleExports.prototype, 'wrapDeliveryFunc', this.getAedesWrapDeliveryFuncPatch.bind(this)); | ||
} | ||
return moduleExports; | ||
@@ -186,5 +186,2 @@ } | ||
} | ||
if ((0, instrumentation_1.isWrapped)(moduleExports.prototype.subscribe)) { | ||
this._unwrap(moduleExports.prototype, 'subscribe'); | ||
} | ||
if ((0, instrumentation_1.isWrapped)(moduleExports.prototype.preConnect)) { | ||
@@ -202,2 +199,5 @@ this._unwrap(moduleExports.prototype, 'preConnect'); | ||
} | ||
if ((0, instrumentation_1.isWrapped)(moduleExports.prototype.wrapDeliveryFunc)) { | ||
this._unwrap(moduleExports.prototype, 'wrapDeliveryFunc'); | ||
} | ||
return moduleExports; | ||
@@ -265,7 +265,7 @@ } | ||
} | ||
getAedesSubscribePatch(original) { | ||
getAedesWrapDeliveryFuncPatch(original) { | ||
// eslint-disable-next-line @typescript-eslint/no-this-alias | ||
const instrumentation = this; | ||
return function patchedSubscribe(...args) { | ||
const [topic, deliver] = args; | ||
return function patchedWrapDeliveryFunc(...args) { | ||
const [, func] = args; | ||
// still unclear how to set the kind https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/messaging/#span-kind | ||
@@ -275,26 +275,23 @@ const kind = api_1.SpanKind.SERVER; | ||
const client = handleSubscribeCtx.getValue(constants_1.CLIENT_CONTEXT_KEY); | ||
const attributes = { | ||
...(client && client[constants_1.CONNECTION_ATTRIBUTES]), | ||
...(client && { [constants_1.AedesAttributes.CLIENT_ID]: client.id }), | ||
[semantic_conventions_1.SemanticAttributes.MESSAGING_OPERATION]: semantic_conventions_1.MessagingOperationValues.RECEIVE, | ||
// source attribute is present in semantic conventions but missing in implementation | ||
// @see https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/messaging/ | ||
'messaging.source': topic, | ||
'messaging.source.kind': semantic_conventions_1.MessagingDestinationKindValues.TOPIC, | ||
}; | ||
function patchedDeliverFunc(// default to MQEmitter | ||
packet, callback) { | ||
const currentContext = api_1.context.active(); | ||
if (packet.messageId) { | ||
attributes[semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_ID] = | ||
packet.messageId.toString(); | ||
} | ||
attributes[semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION] = packet.topic; | ||
attributes[semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION_KIND] = | ||
semantic_conventions_1.MessagingDestinationKindValues.TOPIC; | ||
attributes[semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES] = | ||
packet.payload.length.toString(); | ||
const parentContext = (0, utils_1.getContextFromPacket)(packet, currentContext); | ||
const parentContext = (0, utils_1.getContextFromPacket)(packet, handleSubscribeCtx); | ||
const startTime = (0, core_1.hrTime)(); | ||
const span = instrumentation.startSpan(`${packet.topic} receive`, { | ||
const topic = packet.topic; | ||
const attributes = { | ||
...(client && client[constants_1.CONNECTION_ATTRIBUTES]), | ||
...(client && { [constants_1.AedesAttributes.CLIENT_ID]: client.id }), | ||
...(packet.messageId && { | ||
[semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_ID]: packet.messageId.toString(), | ||
}), | ||
[semantic_conventions_1.SemanticAttributes.MESSAGING_OPERATION]: semantic_conventions_1.MessagingOperationValues.RECEIVE, | ||
[semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION]: topic, | ||
[semantic_conventions_1.SemanticAttributes.MESSAGING_DESTINATION_KIND]: semantic_conventions_1.MessagingDestinationKindValues.TOPIC, | ||
// source attribute is present in semantic conventions but missing in implementation | ||
// @see https://opentelemetry.io/docs/specs/otel/trace/semantic_conventions/messaging/ | ||
'messaging.source': topic, | ||
'messaging.source.kind': semantic_conventions_1.MessagingDestinationKindValues.TOPIC, | ||
[semantic_conventions_1.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES]: packet.payload.length.toString(), | ||
}; | ||
const span = instrumentation.startSpan(`${topic} receive`, { | ||
kind, | ||
@@ -305,7 +302,7 @@ attributes, | ||
instrumentation.callConsumeHook(span, packet); | ||
function wrappedCallback() { | ||
function patchedCallback() { | ||
instrumentation.callConsumeEndHook(span, packet, null); | ||
instrumentation.endSpan(span, kind, startTime, client, (0, utils_1.getMetricAttributes)(attributes)); | ||
const cb = api_1.context.bind(messageContext, callback); | ||
cb.apply(this); | ||
return cb.apply(this); | ||
} | ||
@@ -315,6 +312,6 @@ // TODO depending on QoS : | ||
// - span should be ended in packet ack for QoS 2 | ||
return api_1.context.with(messageContext, deliver, this, packet, wrappedCallback); | ||
return func.call(this, packet, patchedCallback); | ||
} | ||
args[1] = patchedDeliverFunc; | ||
return api_1.context.with(api_1.context.active(), original, this, ...args); | ||
return original.apply(this, args); | ||
}; | ||
@@ -321,0 +318,0 @@ } |
@@ -126,7 +126,12 @@ "use strict"; | ||
if (!request?.connDetails) { | ||
const address = !request | ||
? stream?.address() | ||
: request.socket.address(); | ||
let address = {}; | ||
if (isNetSocket(stream) && isNetSocketAddress(stream.address())) { | ||
address = stream.address(); | ||
} | ||
else if (typeof request?.socket?.address === 'function' && | ||
isNetSocketAddress(request.socket.address())) { | ||
address = request.socket.address(); | ||
} | ||
return isNetSocketAddress(address) | ||
? `${protocol}://${address?.address}:${address.port}` | ||
? `${protocol}://${address.address}:${address.port}` | ||
: `${protocol}://localhost:1883`; | ||
@@ -133,0 +138,0 @@ } |
{ | ||
"name": "aedes-otel-instrumentation", | ||
"version": "0.1.1", | ||
"version": "0.2.0", | ||
"description": "OpenTelemetry instrumentation library for Aedes MQTT broker", | ||
@@ -34,3 +34,5 @@ "main": "dist/index.js", | ||
"scripts": { | ||
"test": "node --require @esbuild-kit/cjs-loader --test-reporter=spec --test test/*.spec.ts", | ||
"otel-test": "node --require @esbuild-kit/cjs-loader --test-reporter=spec --test test/*.spec.ts", | ||
"aedes-test": "npm run build && node --require ./test/tracing --test test-aedes/*.js", | ||
"test": "npm run otel-test && npm run aedes-test", | ||
"lint": "eslint . --ext .ts", | ||
@@ -109,2 +111,3 @@ "prettier": "prettier --write .", | ||
"@release-it/conventional-changelog": "^7.0.2", | ||
"@sinonjs/fake-timers": "^11.1.0", | ||
"@types/node": "^20.5.8", | ||
@@ -120,9 +123,13 @@ "@typescript-eslint/eslint-plugin": "^6.5.0", | ||
"mqtt": "^5.0.4", | ||
"mqtt-connection": "^4.1.0", | ||
"patch-package": "^8.0.0", | ||
"prettier": "^3.0.3", | ||
"proxyquire": "^2.1.3", | ||
"release-it": "^16.1.5", | ||
"semver": "^7.5.4", | ||
"tap": "^16.3.7", | ||
"tsx": "^3.12.10", | ||
"typescript": "^5.2.2" | ||
"typescript": "^5.2.2", | ||
"websocket-stream": "^5.5.2" | ||
} | ||
} |
57985
1042
28