Socket
Socket
Sign inDemoInstall

@opentelemetry/instrumentation-amqplib

Package Overview
Dependencies
Maintainers
2
Versions
25
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@opentelemetry/instrumentation-amqplib - npm Package Compare versions

Comparing version 0.32.0 to 0.32.1

16

build/src/amqplib.d.ts

@@ -1,11 +0,9 @@

/// <reference types="node" />
import { InstrumentationBase, InstrumentationModuleDefinition } from '@opentelemetry/instrumentation';
import type * as amqp from 'amqplib';
import { AmqplibInstrumentationConfig } from './types';
import { InstrumentationBase, InstrumentationNodeModuleDefinition } from '@opentelemetry/instrumentation';
import { AmqplibInstrumentationConfig, ConsumeMessage, Options, Replies } from './types';
import { InstrumentationConsumeChannel, InstrumentationPublishChannel } from './utils';
export declare class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
export declare class AmqplibInstrumentation extends InstrumentationBase {
protected _config: AmqplibInstrumentationConfig;
constructor(config?: AmqplibInstrumentationConfig);
setConfig(config?: AmqplibInstrumentationConfig): void;
protected init(): InstrumentationModuleDefinition<typeof amqp>;
protected init(): InstrumentationNodeModuleDefinition<unknown>;
private patchConnect;

@@ -19,5 +17,5 @@ private unpatchConnect;

private getAckPatch;
protected getConsumePatch(moduleVersion: string | undefined, original: Function): (this: InstrumentationConsumeChannel, queue: string, onMessage: (msg: amqp.ConsumeMessage | null) => void, options?: amqp.Options.Consume | undefined) => Promise<amqp.Replies.Consume>;
protected getConfirmedPublishPatch(moduleVersion: string | undefined, original: Function): (this: InstrumentationConsumeChannel, exchange: string, routingKey: string, content: Buffer, options?: amqp.Options.Publish | undefined, callback?: ((err: any, ok: amqp.Replies.Empty) => void) | undefined) => boolean;
protected getPublishPatch(moduleVersion: string | undefined, original: Function): (this: InstrumentationPublishChannel, exchange: string, routingKey: string, content: Buffer, options?: amqp.Options.Publish | undefined) => boolean;
protected getConsumePatch(moduleVersion: string | undefined, original: Function): (this: InstrumentationConsumeChannel, queue: string, onMessage: (msg: ConsumeMessage | null) => void, options?: Options.Consume | undefined) => Promise<Replies.Consume>;
protected getConfirmedPublishPatch(moduleVersion: string | undefined, original: Function): (this: InstrumentationConsumeChannel, exchange: string, routingKey: string, content: Buffer, options?: Options.Publish | undefined, callback?: ((err: any, ok: Replies.Empty) => void) | undefined) => boolean;
protected getPublishPatch(moduleVersion: string | undefined, original: Function): (this: InstrumentationPublishChannel, exchange: string, routingKey: string, content: Buffer, options?: Options.Publish | undefined) => boolean;
private createPublishSpan;

@@ -24,0 +22,0 @@ private endConsumerSpan;

@@ -42,3 +42,3 @@ "use strict";

moduleExports = this.unpatchConnect(moduleExports);
if (!instrumentation_1.isWrapped(moduleExports.connect)) {
if (!(0, instrumentation_1.isWrapped)(moduleExports.connect)) {
this._wrap(moduleExports, 'connect', this.getConnectPatch.bind(this));

@@ -49,3 +49,3 @@ }

unpatchConnect(moduleExports) {
if (instrumentation_1.isWrapped(moduleExports.connect)) {
if ((0, instrumentation_1.isWrapped)(moduleExports.connect)) {
this._unwrap(moduleExports, 'connect');

@@ -56,27 +56,27 @@ }

patchChannelModel(moduleExports, moduleVersion) {
if (!instrumentation_1.isWrapped(moduleExports.Channel.prototype.publish)) {
if (!(0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.publish)) {
this._wrap(moduleExports.Channel.prototype, 'publish', this.getPublishPatch.bind(this, moduleVersion));
}
if (!instrumentation_1.isWrapped(moduleExports.Channel.prototype.consume)) {
if (!(0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.consume)) {
this._wrap(moduleExports.Channel.prototype, 'consume', this.getConsumePatch.bind(this, moduleVersion));
}
if (!instrumentation_1.isWrapped(moduleExports.Channel.prototype.ack)) {
if (!(0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.ack)) {
this._wrap(moduleExports.Channel.prototype, 'ack', this.getAckPatch.bind(this, false, types_1.EndOperation.Ack));
}
if (!instrumentation_1.isWrapped(moduleExports.Channel.prototype.nack)) {
if (!(0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.nack)) {
this._wrap(moduleExports.Channel.prototype, 'nack', this.getAckPatch.bind(this, true, types_1.EndOperation.Nack));
}
if (!instrumentation_1.isWrapped(moduleExports.Channel.prototype.reject)) {
if (!(0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.reject)) {
this._wrap(moduleExports.Channel.prototype, 'reject', this.getAckPatch.bind(this, true, types_1.EndOperation.Reject));
}
if (!instrumentation_1.isWrapped(moduleExports.Channel.prototype.ackAll)) {
if (!(0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.ackAll)) {
this._wrap(moduleExports.Channel.prototype, 'ackAll', this.getAckAllPatch.bind(this, false, types_1.EndOperation.AckAll));
}
if (!instrumentation_1.isWrapped(moduleExports.Channel.prototype.nackAll)) {
if (!(0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.nackAll)) {
this._wrap(moduleExports.Channel.prototype, 'nackAll', this.getAckAllPatch.bind(this, true, types_1.EndOperation.NackAll));
}
if (!instrumentation_1.isWrapped(moduleExports.Channel.prototype.emit)) {
if (!(0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.emit)) {
this._wrap(moduleExports.Channel.prototype, 'emit', this.getChannelEmitPatch.bind(this));
}
if (!instrumentation_1.isWrapped(moduleExports.ConfirmChannel.prototype.publish)) {
if (!(0, instrumentation_1.isWrapped)(moduleExports.ConfirmChannel.prototype.publish)) {
this._wrap(moduleExports.ConfirmChannel.prototype, 'publish', this.getConfirmedPublishPatch.bind(this, moduleVersion));

@@ -87,27 +87,27 @@ }

unpatchChannelModel(moduleExports) {
if (instrumentation_1.isWrapped(moduleExports.Channel.prototype.publish)) {
if ((0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.publish)) {
this._unwrap(moduleExports.Channel.prototype, 'publish');
}
if (instrumentation_1.isWrapped(moduleExports.Channel.prototype.consume)) {
if ((0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.consume)) {
this._unwrap(moduleExports.Channel.prototype, 'consume');
}
if (instrumentation_1.isWrapped(moduleExports.Channel.prototype.ack)) {
if ((0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.ack)) {
this._unwrap(moduleExports.Channel.prototype, 'ack');
}
if (instrumentation_1.isWrapped(moduleExports.Channel.prototype.nack)) {
if ((0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.nack)) {
this._unwrap(moduleExports.Channel.prototype, 'nack');
}
if (instrumentation_1.isWrapped(moduleExports.Channel.prototype.reject)) {
if ((0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.reject)) {
this._unwrap(moduleExports.Channel.prototype, 'reject');
}
if (instrumentation_1.isWrapped(moduleExports.Channel.prototype.ackAll)) {
if ((0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.ackAll)) {
this._unwrap(moduleExports.Channel.prototype, 'ackAll');
}
if (instrumentation_1.isWrapped(moduleExports.Channel.prototype.nackAll)) {
if ((0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.nackAll)) {
this._unwrap(moduleExports.Channel.prototype, 'nackAll');
}
if (instrumentation_1.isWrapped(moduleExports.Channel.prototype.emit)) {
if ((0, instrumentation_1.isWrapped)(moduleExports.Channel.prototype.emit)) {
this._unwrap(moduleExports.Channel.prototype, 'emit');
}
if (instrumentation_1.isWrapped(moduleExports.ConfirmChannel.prototype.publish)) {
if ((0, instrumentation_1.isWrapped)(moduleExports.ConfirmChannel.prototype.publish)) {
this._unwrap(moduleExports.ConfirmChannel.prototype, 'publish');

@@ -121,3 +121,3 @@ }

if (err == null) {
const urlAttributes = utils_1.getConnectionAttributesFromUrl(url);
const urlAttributes = (0, utils_1.getConnectionAttributesFromUrl)(url);
// the type of conn in @types/amqplib is amqp.Connection, but in practice the library send the

@@ -128,3 +128,3 @@ // `serverProperties` on the `conn` and not in a property `connection`.

// currently setting as any to calm typescript
const serverAttributes = utils_1.getConnectionAttributesFromServer(conn);
const serverAttributes = (0, utils_1.getConnectionAttributesFromServer)(conn);
conn[utils_1.CONNECTION_ATTRIBUTES] = Object.assign(Object.assign({}, urlAttributes), serverAttributes);

@@ -217,3 +217,3 @@ }

if (self._config.consumeHook) {
instrumentation_1.safeExecuteInTheMiddle(() => self._config.consumeHook(span, { moduleVersion, msg }), e => {
(0, instrumentation_1.safeExecuteInTheMiddle)(() => self._config.consumeHook(span, { moduleVersion, msg }), e => {
if (e) {

@@ -228,3 +228,3 @@ api_1.diag.error('amqplib instrumentation: consumerHook error', e);

msg,
timeOfConsume: core_1.hrTime(),
timeOfConsume: (0, core_1.hrTime)(),
});

@@ -234,3 +234,3 @@ // store the span on the message, so we can end it when user call 'ack' on it

}
api_1.context.with(api_1.trace.setSpan(api_1.context.active(), span), () => {
api_1.context.with(api_1.trace.setSpan(parentContext, span), () => {
onMessage.call(this, msg);

@@ -253,3 +253,3 @@ });

if (self._config.publishHook) {
instrumentation_1.safeExecuteInTheMiddle(() => self._config.publishHook(span, {
(0, instrumentation_1.safeExecuteInTheMiddle)(() => self._config.publishHook(span, {
moduleVersion,

@@ -273,3 +273,3 @@ exchange,

if (self._config.publishConfirmHook) {
instrumentation_1.safeExecuteInTheMiddle(() => self._config.publishConfirmHook(span, {
(0, instrumentation_1.safeExecuteInTheMiddle)(() => self._config.publishConfirmHook(span, {
moduleVersion,

@@ -299,6 +299,6 @@ exchange,

// span ends in the patched callback.
const markedContext = utils_1.markConfirmChannelTracing(api_1.context.active());
const markedContext = (0, utils_1.markConfirmChannelTracing)(api_1.context.active());
const argumentsCopy = [...arguments];
argumentsCopy[3] = modifiedOptions;
argumentsCopy[4] = api_1.context.bind(utils_1.unmarkConfirmChannelTracing(api_1.trace.setSpan(markedContext, span)), patchedOnConfirm);
argumentsCopy[4] = api_1.context.bind((0, utils_1.unmarkConfirmChannelTracing)(api_1.trace.setSpan(markedContext, span)), patchedOnConfirm);
return api_1.context.with(markedContext, original.bind(this, ...argumentsCopy));

@@ -310,3 +310,3 @@ };

return function publish(exchange, routingKey, content, options) {
if (utils_1.isConfirmChannelTracing(api_1.context.active())) {
if ((0, utils_1.isConfirmChannelTracing)(api_1.context.active())) {
// work already done

@@ -319,3 +319,3 @@ return original.apply(this, arguments);

if (self._config.publishHook) {
instrumentation_1.safeExecuteInTheMiddle(() => self._config.publishHook(span, {
(0, instrumentation_1.safeExecuteInTheMiddle)(() => self._config.publishHook(span, {
moduleVersion,

@@ -345,3 +345,3 @@ exchange,

var _a;
const normalizedExchange = utils_1.normalizeExchange(exchange);
const normalizedExchange = (0, utils_1.normalizeExchange)(exchange);
const span = self.tracer.startSpan(`${normalizedExchange} -> ${routingKey} send`, {

@@ -388,3 +388,3 @@ kind: api_1.SpanKind.PRODUCER,

return;
instrumentation_1.safeExecuteInTheMiddle(() => this._config.consumeEndHook(span, { msg, rejected, endOperation }), e => {
(0, instrumentation_1.safeExecuteInTheMiddle)(() => this._config.consumeEndHook(span, { msg, rejected, endOperation }), e => {
if (e) {

@@ -397,3 +397,3 @@ api_1.diag.error('amqplib instrumentation: consumerEndHook error', e);

var _a;
const currentTime = core_1.hrTime();
const currentTime = (0, core_1.hrTime)();
const spansNotEnded = (_a = channel[utils_1.CHANNEL_SPANS_NOT_ENDED]) !== null && _a !== void 0 ? _a : [];

@@ -403,4 +403,4 @@ let i;

const currMessage = spansNotEnded[i];
const timeFromConsume = core_1.hrTimeDuration(currMessage.timeOfConsume, currentTime);
if (core_1.hrTimeToMilliseconds(timeFromConsume) < this._config.consumeTimeoutMs) {
const timeFromConsume = (0, core_1.hrTimeDuration)(currMessage.timeOfConsume, currentTime);
if ((0, core_1.hrTimeToMilliseconds)(timeFromConsume) < this._config.consumeTimeoutMs) {
break;

@@ -407,0 +407,0 @@ }

/// <reference types="node" />
import { Span } from '@opentelemetry/api';
import { InstrumentationConfig } from '@opentelemetry/instrumentation';
import type * as amqp from 'amqplib';
export interface PublishInfo {

@@ -10,3 +9,3 @@ moduleVersion: string | undefined;

content: Buffer;
options?: amqp.Options.Publish;
options?: Options.Publish;
isConfirmChannel?: boolean;

@@ -19,6 +18,6 @@ }

moduleVersion: string | undefined;
msg: amqp.ConsumeMessage;
msg: ConsumeMessage;
}
export interface ConsumeEndInfo {
msg: amqp.ConsumeMessage;
msg: ConsumeMessage;
rejected: boolean | null;

@@ -76,2 +75,204 @@ endOperation: EndOperation;

export declare const DEFAULT_CONFIG: AmqplibInstrumentationConfig;
export declare namespace Options {
interface Connect {
/**
* The to be used protocol
*
* Default value: 'amqp'
*/
protocol?: string;
/**
* Hostname used for connecting to the server.
*
* Default value: 'localhost'
*/
hostname?: string;
/**
* Port used for connecting to the server.
*
* Default value: 5672
*/
port?: number;
/**
* Username used for authenticating against the server.
*
* Default value: 'guest'
*/
username?: string;
/**
* Password used for authenticating against the server.
*
* Default value: 'guest'
*/
password?: string;
/**
* The desired locale for error messages. RabbitMQ only ever uses en_US
*
* Default value: 'en_US'
*/
locale?: string;
/**
* The size in bytes of the maximum frame allowed over the connection. 0 means
* no limit (but since frames have a size field which is an unsigned 32 bit integer, it’s perforce 2^32 - 1).
*
* Default value: 0x1000 (4kb) - That's the allowed minimum, it will fit many purposes
*/
frameMax?: number;
/**
* The period of the connection heartbeat in seconds.
*
* Default value: 0
*/
heartbeat?: number;
/**
* What VHost shall be used.
*
* Default value: '/'
*/
vhost?: string;
}
interface AssertQueue {
exclusive?: boolean;
durable?: boolean;
autoDelete?: boolean;
arguments?: any;
messageTtl?: number;
expires?: number;
deadLetterExchange?: string;
deadLetterRoutingKey?: string;
maxLength?: number;
maxPriority?: number;
}
interface DeleteQueue {
ifUnused?: boolean;
ifEmpty?: boolean;
}
interface AssertExchange {
durable?: boolean;
internal?: boolean;
autoDelete?: boolean;
alternateExchange?: string;
arguments?: any;
}
interface DeleteExchange {
ifUnused?: boolean;
}
interface Publish {
expiration?: string | number;
userId?: string;
CC?: string | string[];
mandatory?: boolean;
persistent?: boolean;
deliveryMode?: boolean | number;
BCC?: string | string[];
contentType?: string;
contentEncoding?: string;
headers?: any;
priority?: number;
correlationId?: string;
replyTo?: string;
messageId?: string;
timestamp?: number;
type?: string;
appId?: string;
}
interface Consume {
consumerTag?: string;
noLocal?: boolean;
noAck?: boolean;
exclusive?: boolean;
priority?: number;
arguments?: any;
}
interface Get {
noAck?: boolean;
}
}
interface ServerProperties {
host: string;
product: string;
version: string;
platform: string;
copyright?: string;
information: string;
[key: string]: string | undefined;
}
export declare namespace Replies {
interface Empty {
}
interface AssertQueue {
queue: string;
messageCount: number;
consumerCount: number;
}
interface PurgeQueue {
messageCount: number;
}
interface DeleteQueue {
messageCount: number;
}
interface AssertExchange {
exchange: string;
}
interface Consume {
consumerTag: string;
}
}
export interface ConfirmChannel {
publish(exchange: string, routingKey: string, content: Buffer, options?: Options.Publish, callback?: (err: any, ok: Replies.Empty) => void): boolean;
sendToQueue(queue: string, content: Buffer, options?: Options.Publish, callback?: (err: any, ok: Replies.Empty) => void): boolean;
waitForConfirms(): Promise<void>;
}
export interface Connection {
close(): Promise<void>;
createChannel(): Promise<any>;
createConfirmChannel(): Promise<ConfirmChannel>;
connection: {
serverProperties: ServerProperties;
};
}
export interface Message {
content: Buffer;
fields: MessageFields;
properties: MessageProperties;
}
export interface GetMessage extends Message {
fields: GetMessageFields;
}
export interface ConsumeMessage extends Message {
fields: ConsumeMessageFields;
}
export interface CommonMessageFields {
deliveryTag: number;
redelivered: boolean;
exchange: string;
routingKey: string;
}
export interface MessageFields extends CommonMessageFields {
messageCount?: number;
consumerTag?: string;
}
export interface GetMessageFields extends CommonMessageFields {
messageCount: number;
}
export interface ConsumeMessageFields extends CommonMessageFields {
deliveryTag: number;
}
export interface MessageProperties {
contentType: any | undefined;
contentEncoding: any | undefined;
headers: any;
deliveryMode: any | undefined;
priority: any | undefined;
correlationId: any | undefined;
replyTo: any | undefined;
expiration: any | undefined;
messageId: any | undefined;
timestamp: any | undefined;
type: any | undefined;
userId: any | undefined;
appId: any | undefined;
clusterId: any | undefined;
}
export {};
//# sourceMappingURL=types.d.ts.map

@@ -25,3 +25,3 @@ "use strict";

exports.CONNECTION_ATTRIBUTES = Symbol('opentelemetry.amqplib.connection.attributes');
const IS_CONFIRM_CHANNEL_CONTEXT_KEY = api_1.createContextKey('opentelemetry.amqplib.channel.is-confirm-channel');
const IS_CONFIRM_CHANNEL_CONTEXT_KEY = (0, api_1.createContextKey)('opentelemetry.amqplib.channel.is-confirm-channel');
const normalizeExchange = (exchangeName) => exchangeName !== '' ? exchangeName : '<default>';

@@ -28,0 +28,0 @@ exports.normalizeExchange = normalizeExchange;

@@ -1,2 +0,2 @@

export declare const VERSION = "0.32.0";
export declare const VERSION = "0.32.1";
//# sourceMappingURL=version.d.ts.map

@@ -20,3 +20,3 @@ "use strict";

// this is autogenerated file, see scripts/version-update.js
exports.VERSION = '0.32.0';
exports.VERSION = '0.32.1';
//# sourceMappingURL=version.js.map
{
"name": "@opentelemetry/instrumentation-amqplib",
"version": "0.32.0",
"version": "0.32.1",
"description": "OpenTelemetry automatic instrumentation for the `amqplib` package",

@@ -50,9 +50,9 @@ "keywords": [

"@opentelemetry/core": "^1.8.0",
"@opentelemetry/instrumentation": "^0.34.0",
"@opentelemetry/semantic-conventions": "^1.0.0",
"@types/amqplib": "^0.5.17"
"@opentelemetry/instrumentation": "^0.35.1",
"@opentelemetry/semantic-conventions": "^1.0.0"
},
"devDependencies": {
"@opentelemetry/api": "^1.3.0",
"@opentelemetry/contrib-test-utils": "^0.33.0",
"@opentelemetry/contrib-test-utils": "^0.33.1",
"@types/amqplib": "^0.5.17",
"@types/lodash": "4.14.178",

@@ -63,3 +63,3 @@ "@types/mocha": "8.2.3",

"amqplib": "0.8.0",
"expect": "27.4.2",
"expect": "29.2.0",
"gts": "3.1.0",

@@ -69,6 +69,6 @@ "lodash": "4.17.21",

"nyc": "15.1.0",
"sinon": "14.0.0",
"sinon": "15.0.1",
"test-all-versions": "5.0.1",
"ts-mocha": "10.0.0",
"typescript": "4.3.5"
"typescript": "4.4.4"
},

@@ -78,3 +78,3 @@ "engines": {

},
"gitHead": "59fa57cfd0dff4ae0e6f3833dff73c55dfd79ee5"
"gitHead": "63e0fc9b6b862f74304abf0343c506a5bd415191"
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc