Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

rhea-promise

Package Overview
Dependencies
Maintainers
1
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rhea-promise - npm Package Compare versions

Comparing version 0.1.4 to 0.1.5

4

changelog.md

@@ -0,1 +1,5 @@

### 0.1.5 - 2018-09-27
- Improved log statements for better debugging.
- Any type of `error` event will be emitted with a tick delay. This would give enough time for the `create()` methods to resolve the promise.
### 0.1.4 - 2018-09-25

@@ -2,0 +6,0 @@ - `options` is a required property of `Connection` and `Container`.

109

dist/lib/connection.js

@@ -11,4 +11,4 @@ "use strict";

const constants_1 = require("./util/constants");
const utils_1 = require("./util/utils");
const rhea_1 = require("rhea");
const eventContext_1 = require("./eventContext");
// Determines whether the given object is a CreatedRheConnectionOptions object.

@@ -117,6 +117,4 @@ function isCreatedRheaConnectionOptions(obj) {

removeListeners();
setTimeout(() => {
log.connection("[%s] Resolving the promise with amqp connection.", this.id);
resolve(this);
});
log.connection("[%s] Resolving the promise with amqp connection.", this.id);
resolve(this);
};

@@ -152,4 +150,4 @@ onClose = (context) => {

* - **Resolves** the promise when rhea emits the "connection_close" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_error" event while trying
* to close an amqp connection.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_error" event while
* trying to close an amqp connection.
*/

@@ -170,6 +168,4 @@ close() {

removeListeners();
setTimeout(() => {
log.connection("[%s] Resolving the promise as the connection has been successfully closed.", this.id);
resolve();
});
log.connection("[%s] Resolving the promise as the connection has been successfully closed.", this.id);
resolve();
};

@@ -249,4 +245,4 @@ onError = (context) => {

* - **Resolves** the promise with the Session object when rhea emits the "session_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "session_close" event while trying
* to create an amqp session.
* - **Rejects** the promise with an AmqpError when rhea emits the "session_close" event while
* trying to create an amqp session.
*/

@@ -267,6 +263,4 @@ createSession() {

removeListeners();
setTimeout(() => {
log.connection("[%s] Resolving the promise with amqp session.", this.id);
resolve(session);
});
log.session("[%s] Resolving the promise with amqp session.", this.id);
resolve(session);
};

@@ -287,3 +281,3 @@ onClose = (context) => {

rheaSession.once(rhea_1.SessionEvents.sessionClose, onClose);
log.connection("[%s] Calling amqp session.begin().", this.id);
log.session("[%s] Calling amqp session.begin().", this.id);
waitTimer = setTimeout(actionAfterTimeout, this.options.operationTimeoutInSeconds * 1000);

@@ -360,3 +354,12 @@ rheaSession.begin();

for (const eventName in rhea_1.ConnectionEvents) {
this._connection.on(rhea_1.ConnectionEvents[eventName], (context) => this.emit(rhea_1.ConnectionEvents[eventName], eventContext_1.EventContext.translate(context, this)));
this._connection.on(rhea_1.ConnectionEvents[eventName], (context) => {
const params = {
rheaContext: context,
emitter: this,
eventName: rhea_1.ConnectionEvents[eventName],
emitterType: "connection",
connectionId: this.id
};
utils_1.emitEvent(params);
});
}

@@ -366,10 +369,66 @@ // Add event handlers for *_error and *_close events that can be propogated to the connection

// Sender
this._connection.on(rhea_1.SenderEvents.senderError, (context) => this.emit(rhea_1.SenderEvents.senderError, eventContext_1.EventContext.translate(context, this)));
this._connection.on(rhea_1.SenderEvents.senderClose, (context) => this.emit(rhea_1.SenderEvents.senderClose, eventContext_1.EventContext.translate(context, this)));
this._connection.on(rhea_1.SenderEvents.senderError, (context) => {
const params = {
rheaContext: context,
emitter: this,
eventName: rhea_1.SenderEvents.senderError,
emitterType: "connection",
connectionId: this.id
};
utils_1.emitEvent(params);
});
this._connection.on(rhea_1.SenderEvents.senderClose, (context) => {
const params = {
rheaContext: context,
emitter: this,
eventName: rhea_1.SenderEvents.senderClose,
emitterType: "connection",
connectionId: this.id
};
utils_1.emitEvent(params);
});
// Receiver
this._connection.on(rhea_1.ReceiverEvents.receiverError, (context) => this.emit(rhea_1.ReceiverEvents.receiverError, eventContext_1.EventContext.translate(context, this)));
this._connection.on(rhea_1.ReceiverEvents.receiverClose, (context) => this.emit(rhea_1.ReceiverEvents.receiverClose, eventContext_1.EventContext.translate(context, this)));
this._connection.on(rhea_1.ReceiverEvents.receiverError, (context) => {
const params = {
rheaContext: context,
emitter: this,
eventName: rhea_1.ReceiverEvents.receiverError,
emitterType: "connection",
connectionId: this.id
};
utils_1.emitEvent(params);
});
this._connection.on(rhea_1.ReceiverEvents.receiverClose, (context) => {
const params = {
rheaContext: context,
emitter: this,
eventName: rhea_1.ReceiverEvents.receiverClose,
emitterType: "connection",
connectionId: this.id
};
utils_1.emitEvent(params);
});
// Session
this._connection.on(rhea_1.SessionEvents.sessionError, (context) => this.emit(rhea_1.SessionEvents.sessionError, eventContext_1.EventContext.translate(context, this)));
this._connection.on(rhea_1.SessionEvents.sessionClose, (context) => this.emit(rhea_1.SessionEvents.sessionClose, eventContext_1.EventContext.translate(context, this)));
this._connection.on(rhea_1.SessionEvents.sessionError, (context) => {
const params = {
rheaContext: context,
emitter: this,
eventName: rhea_1.SessionEvents.sessionError,
emitterType: "connection",
connectionId: this.id
};
utils_1.emitEvent(params);
});
this._connection.on(rhea_1.SessionEvents.sessionClose, (context) => {
const params = {
rheaContext: context,
emitter: this,
eventName: rhea_1.SessionEvents.sessionClose,
emitterType: "connection",
connectionId: this.id
};
utils_1.emitEvent(params);
});
log.eventHandler("[%s] rhea-promise 'connection' object is listening for events: %o " +
"emitted by rhea's 'connection' object.", this.id, this._connection.eventNames());
}

@@ -376,0 +435,0 @@ }

@@ -8,2 +8,3 @@ "use strict";

const link_1 = require("./link");
const log = require("./log");
var EventContext;

@@ -14,5 +15,10 @@ (function (EventContext) {

* @param rheaContext The received context from rhea's event emitter
* @param emitter Teh rhea-promise equivalent object that is supposed emit the same event.
* @param emitter The rhea-promise equivalent object that is supposed emit the same event
* @param eventName The name of the event for which the context will be translated
*
* @returns EventContext The translated EventContext.
*/
function translate(rheaContext, emitter) {
function translate(rheaContext, emitter, eventName) {
const connectionId = rheaContext.connection.options.id || "";
log.contextTranslator("[%s] Translating the context for event: '%s'.", connectionId, eventName);
// initialize the result

@@ -19,0 +25,0 @@ const result = Object.assign({ _context: rheaContext }, rheaContext);

@@ -9,3 +9,3 @@ "use strict";

const events_1 = require("events");
const eventContext_1 = require("./eventContext");
const utils_1 = require("./util/utils");
var LinkType;

@@ -202,6 +202,4 @@ (function (LinkType) {

removeListeners();
setTimeout(() => {
log[this.type]("[%s] Resolving the promise as the amqp %s has been closed.", this.connection.id, this.type);
resolve();
});
log[this.type]("[%s] Resolving the promise as the amqp %s has been closed.", this.connection.id, this.type);
resolve();
};

@@ -242,4 +240,16 @@ onError = (context) => {

for (const eventName in events) {
this._link.on(events[eventName], (context) => this.emit(events[eventName], eventContext_1.EventContext.translate(context, this)));
this._link.on(events[eventName], (context) => {
const params = {
rheaContext: context,
emitter: this,
eventName: events[eventName],
emitterType: this.type,
connectionId: this.connection.id
};
utils_1.emitEvent(params);
});
}
log.eventHandler("[%s] rhea-promise '%s' object is listening for events: %o " +
"emitted by rhea's '%s' object.", this.connection.id, this.type, this._link.eventNames(), this.type);
log.eventHandler("[%s] ListenerCount for event '%s_error' on rhea's '%s' object is: %d.", this.connection.id, this.type, this.type, this._link.listenerCount(`${this.type}_error`));
}

@@ -246,0 +256,0 @@ }

@@ -36,2 +36,12 @@ "use strict";

exports.error = debugModule("rhea-promise:error");
/**
* @ignore
* log statements for error
*/
exports.eventHandler = debugModule("rhea-promise:eventhandler");
/**
* @ignore
* log statements for error
*/
exports.contextTranslator = debugModule("rhea-promise:translate");
//# sourceMappingURL=log.js.map

@@ -5,3 +5,2 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
const rhea = require("rhea");
const log = require("./log");

@@ -11,4 +10,4 @@ const receiver_1 = require("./receiver");

const rhea_1 = require("rhea");
const utils_1 = require("./util/utils");
const events_1 = require("events");
const eventContext_1 = require("./eventContext");
/**

@@ -69,3 +68,3 @@ * Describes the session that wraps the rhea session.

if (this._session) {
// Remove our listeners and listeners from rhea's session object.
// Remove our listeners and listeners from rhea's 'session' object.
this.removeAllListeners();

@@ -102,6 +101,4 @@ this._session.removeAllListeners();

removeListeners();
setTimeout(() => {
log.session("[%s] Resolving the promise as the amqp session has been closed.", this.connection.id);
resolve();
});
log.session("[%s] Resolving the promise as the amqp session has been closed.", this.connection.id);
resolve();
};

@@ -153,5 +150,9 @@ onError = (context) => {

this.on(rhea_1.SessionEvents.sessionError, options.onSessionError);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
"while creating the 'receiver'.", this.connection.id, rhea_1.SessionEvents.sessionError);
}
if (options && options.onSessionClose) {
this.on(rhea_1.SessionEvents.sessionClose, options.onSessionClose);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
" while creating the 'receiver'.", this.connection.id, rhea_1.SessionEvents.sessionClose);
}

@@ -166,5 +167,7 @@ const rheaReceiver = this._session.attach_receiver(options);

receiver.on(rhea_1.ReceiverEvents.receiverError, options.onError);
log.receiver("[%s] Added event handler for events: '%s', '%s', on rhea-promise 'receiver'.", this.connection.id, rhea_1.ReceiverEvents.message, rhea_1.ReceiverEvents.receiverError);
}
if (options && options.onClose) {
receiver.on(rhea_1.ReceiverEvents.receiverClose, options.onClose);
log.receiver("[%s] Added event handler for event '%s' on rhea-promise 'receiver'.", this.connection.id, rhea_1.ReceiverEvents.receiverClose);
}

@@ -178,6 +181,4 @@ const removeListeners = () => {

removeListeners();
setTimeout(() => {
log.session("[%s] Resolving the promise with amqp receiver '%s'.", this.connection.id, receiver.name);
resolve(receiver);
});
log.receiver("[%s] Resolving the promise with amqp receiver '%s'.", this.connection.id, receiver.name);
resolve(receiver);
};

@@ -215,5 +216,9 @@ onClose = (context) => {

this.on(rhea_1.SessionEvents.sessionError, options.onSessionError);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
"while creating the sender.", this.connection.id, rhea_1.SessionEvents.sessionError);
}
if (options && options.onSessionClose) {
this.on(rhea_1.SessionEvents.sessionClose, options.onSessionClose);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
"while creating the sender.", this.connection.id, rhea_1.SessionEvents.sessionClose);
}

@@ -254,6 +259,4 @@ const rheaSender = this._session.attach_sender(options);

removeListeners();
setTimeout(() => {
log.session("[%s] Resolving the promise with amqp sender '%s'.", this.connection.id, sender.name);
resolve(sender);
});
log.sender("[%s] Resolving the promise with amqp sender '%s'.", this.connection.id, sender.name);
resolve(sender);
};

@@ -286,3 +289,12 @@ onClose = (context) => {

for (const eventName in rhea_1.SessionEvents) {
this._session.on(rhea_1.SessionEvents[eventName], (context) => this.emit(rhea_1.SessionEvents[eventName], eventContext_1.EventContext.translate(context, this)));
this._session.on(rhea_1.SessionEvents[eventName], (context) => {
const params = {
rheaContext: context,
emitter: this,
eventName: rhea_1.SessionEvents[eventName],
emitterType: "session",
connectionId: this.connection.id
};
utils_1.emitEvent(params);
});
}

@@ -292,7 +304,45 @@ // Add event handlers for *_error and *_close events that can be propogated to the session

// Sender
this._session.on(rhea.SenderEvents.senderError, (context) => this.emit(rhea.SenderEvents.senderError, eventContext_1.EventContext.translate(context, this)));
this._session.on(rhea.SenderEvents.senderClose, (context) => this.emit(rhea.SenderEvents.senderClose, eventContext_1.EventContext.translate(context, this)));
this._session.on(rhea_1.SenderEvents.senderError, (context) => {
const params = {
rheaContext: context,
emitter: this,
eventName: rhea_1.SenderEvents.senderError,
emitterType: "session",
connectionId: this.connection.id
};
utils_1.emitEvent(params);
});
this._session.on(rhea_1.SenderEvents.senderClose, (context) => {
const params = {
rheaContext: context,
emitter: this,
eventName: rhea_1.SenderEvents.senderClose,
emitterType: "session",
connectionId: this.connection.id
};
utils_1.emitEvent(params);
});
// Receiver
this._session.on(rhea.ReceiverEvents.receiverError, (context) => this.emit(rhea.ReceiverEvents.receiverError, eventContext_1.EventContext.translate(context, this)));
this._session.on(rhea.ReceiverEvents.receiverClose, (context) => this.emit(rhea.ReceiverEvents.receiverClose, eventContext_1.EventContext.translate(context, this)));
this._session.on(rhea_1.ReceiverEvents.receiverError, (context) => {
const params = {
rheaContext: context,
emitter: this,
eventName: rhea_1.ReceiverEvents.receiverError,
emitterType: "session",
connectionId: this.connection.id
};
utils_1.emitEvent(params);
});
this._session.on(rhea_1.ReceiverEvents.receiverClose, (context) => {
const params = {
rheaContext: context,
emitter: this,
eventName: rhea_1.ReceiverEvents.receiverClose,
emitterType: "session",
connectionId: this.connection.id
};
utils_1.emitEvent(params);
});
log.eventHandler("[%s] rhea-promise 'session' object is listening for events: %o " +
"emitted by rhea's 'session' object.", this.connection.id, this._session.eventNames());
}

@@ -299,0 +349,0 @@ }

@@ -5,2 +5,4 @@ "use strict";

Object.defineProperty(exports, "__esModule", { value: true });
const log = require("../log");
const eventContext_1 = require("../eventContext");
/**

@@ -119,2 +121,25 @@ * Defines a mapping for Http like response status codes for different status-code values

exports.parseConnectionString = parseConnectionString;
/**
* @ignore
* Emits an event.
* @param params parameters needed to emit an event from one of the rhea-promise objects.
* @returns void
*/
function emitEvent(params) {
const emit = () => {
log[params.emitterType]("[%s] %s got event: '%s'. Re-emitting the translated context.", params.connectionId, params.emitterType, params.eventName);
params.emitter.emit(params.eventName, eventContext_1.EventContext.translate(params.rheaContext, params.emitter, params.eventName));
};
if (params.eventName.indexOf("error") !== -1) {
log[params.emitterType]("[%s] %s got event: '%s'. Will re-emit in the next tick.", params.connectionId, params.emitterType, params.eventName);
// setTimeout() without any time is equivalent to process.nextTick() and works in node.js and
// browsers. We wait for a tick to emit error events in general. This should give enough
// time for promises to resolve on *_open (create) and *_close (close).
setTimeout(emit);
}
else {
emit();
}
}
exports.emitEvent = emitEvent;
//# sourceMappingURL=utils.js.map

@@ -13,3 +13,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

import { defaultOperationTimeoutInSeconds } from "./util/constants";
import { Func } from "./util/utils";
import { Func, EmitParameters, emitEvent } from "./util/utils";
import {

@@ -21,7 +21,7 @@ ConnectionEvents, SessionEvents, SenderEvents, ReceiverEvents, create_connection,

import { EventContext, OnAmqpEvent } from "./eventContext";
import { OnAmqpEvent } from "./eventContext";
/**
* Describes the options that can be provided while creating an AMQP sender. One can also provide a
* session if it was already created.
* Describes the options that can be provided while creating an AMQP sender. One can also provide
* a session if it was already created.
* @interface SenderOptionsWithSession

@@ -34,4 +34,4 @@ */

/**
* Describes the options that can be provided while creating an AMQP receiver. One can also provide a
* session if it was already created.
* Describes the options that can be provided while creating an AMQP receiver. One can also provide
* a session if it was already created.
* @interface ReceiverOptionsWithSession

@@ -124,3 +124,4 @@ */

/**
* @property {Container} container The underlying Container instance on which the connection exists.
* @property {Container} container The underlying Container instance on which the connection
* exists.
*/

@@ -240,6 +241,4 @@ readonly container: Container;

removeListeners();
setTimeout(() => {
log.connection("[%s] Resolving the promise with amqp connection.", this.id);
resolve(this);
});
log.connection("[%s] Resolving the promise with amqp connection.", this.id);
resolve(this);
};

@@ -280,4 +279,4 @@

* - **Resolves** the promise when rhea emits the "connection_close" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_error" event while trying
* to close an amqp connection.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_error" event while
* trying to close an amqp connection.
*/

@@ -299,7 +298,5 @@ close(): Promise<void> {

removeListeners();
setTimeout(() => {
log.connection("[%s] Resolving the promise as the connection has been successfully closed.",
this.id);
resolve();
});
log.connection("[%s] Resolving the promise as the connection has been successfully closed.",
this.id);
resolve();
};

@@ -389,4 +386,4 @@

* - **Resolves** the promise with the Session object when rhea emits the "session_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "session_close" event while trying
* to create an amqp session.
* - **Rejects** the promise with an AmqpError when rhea emits the "session_close" event while
* trying to create an amqp session.
*/

@@ -409,6 +406,4 @@ createSession(): Promise<Session> {

removeListeners();
setTimeout(() => {
log.connection("[%s] Resolving the promise with amqp session.", this.id);
resolve(session);
});
log.session("[%s] Resolving the promise with amqp session.", this.id);
resolve(session);
};

@@ -433,3 +428,3 @@

rheaSession.once(SessionEvents.sessionClose, onClose);
log.connection("[%s] Calling amqp session.begin().", this.id);
log.session("[%s] Calling amqp session.begin().", this.id);
waitTimer = setTimeout(actionAfterTimeout, this.options!.operationTimeoutInSeconds! * 1000);

@@ -476,3 +471,4 @@ rheaSession.begin();

*/
async createRequestResponseLink(senderOptions: SenderOptions, receiverOptions: ReceiverOptions, providedSession?: Session): Promise<ReqResLink> {
async createRequestResponseLink(senderOptions: SenderOptions, receiverOptions: ReceiverOptions,
providedSession?: Session): Promise<ReqResLink> {
if (!senderOptions) {

@@ -489,3 +485,4 @@ throw new Error(`Please provide sender options.`);

]);
log.connection("[%s] Successfully created the sender and receiver links on the same session.", this.id);
log.connection("[%s] Successfully created the sender and receiver links on the same session.",
this.id);
return {

@@ -506,4 +503,12 @@ session: session,

for (const eventName in ConnectionEvents) {
this._connection.on(ConnectionEvents[eventName],
(context) => this.emit(ConnectionEvents[eventName], EventContext.translate(context, this)));
this._connection.on(ConnectionEvents[eventName], (context) => {
const params: EmitParameters = {
rheaContext: context,
emitter: this,
eventName: ConnectionEvents[eventName],
emitterType: "connection",
connectionId: this.id
};
emitEvent(params);
});
}

@@ -513,19 +518,72 @@

// object, if they are not handled at their level. * denotes - Sender, Receiver, Session
// Sender
this._connection.on(SenderEvents.senderError,
(context) => this.emit(SenderEvents.senderError, EventContext.translate(context, this)));
this._connection.on(SenderEvents.senderClose,
(context) => this.emit(SenderEvents.senderClose, EventContext.translate(context, this)));
this._connection.on(SenderEvents.senderError, (context) => {
const params: EmitParameters = {
rheaContext: context,
emitter: this,
eventName: SenderEvents.senderError,
emitterType: "connection",
connectionId: this.id
};
emitEvent(params);
});
this._connection.on(SenderEvents.senderClose, (context) => {
const params: EmitParameters = {
rheaContext: context,
emitter: this,
eventName: SenderEvents.senderClose,
emitterType: "connection",
connectionId: this.id
};
emitEvent(params);
});
// Receiver
this._connection.on(ReceiverEvents.receiverError,
(context) => this.emit(ReceiverEvents.receiverError, EventContext.translate(context, this)));
this._connection.on(ReceiverEvents.receiverClose,
(context) => this.emit(ReceiverEvents.receiverClose, EventContext.translate(context, this)));
this._connection.on(ReceiverEvents.receiverError, (context) => {
const params: EmitParameters = {
rheaContext: context,
emitter: this,
eventName: ReceiverEvents.receiverError,
emitterType: "connection",
connectionId: this.id
};
emitEvent(params);
});
this._connection.on(ReceiverEvents.receiverClose, (context) => {
const params: EmitParameters = {
rheaContext: context,
emitter: this,
eventName: ReceiverEvents.receiverClose,
emitterType: "connection",
connectionId: this.id
};
emitEvent(params);
});
// Session
this._connection.on(SessionEvents.sessionError,
(context) => this.emit(SessionEvents.sessionError, EventContext.translate(context, this)));
this._connection.on(SessionEvents.sessionClose,
(context) => this.emit(SessionEvents.sessionClose, EventContext.translate(context, this)));
this._connection.on(SessionEvents.sessionError, (context) => {
const params: EmitParameters = {
rheaContext: context,
emitter: this,
eventName: SessionEvents.sessionError,
emitterType: "connection",
connectionId: this.id
};
emitEvent(params);
});
this._connection.on(SessionEvents.sessionClose, (context) => {
const params: EmitParameters = {
rheaContext: context,
emitter: this,
eventName: SessionEvents.sessionClose,
emitterType: "connection",
connectionId: this.id
};
emitEvent(params);
});
log.eventHandler("[%s] rhea-promise 'connection' object is listening for events: %o " +
"emitted by rhea's 'connection' object.", this.id, this._connection.eventNames());
}
}

@@ -13,2 +13,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

import { Link, LinkType } from './link';
import * as log from "./log";

@@ -88,5 +89,13 @@ /**

* @param rheaContext The received context from rhea's event emitter
* @param emitter Teh rhea-promise equivalent object that is supposed emit the same event.
* @param emitter The rhea-promise equivalent object that is supposed emit the same event
* @param eventName The name of the event for which the context will be translated
*
* @returns EventContext The translated EventContext.
*/
export function translate(rheaContext: RheaEventContext, emitter: Link | Session | Connection): EventContext {
export function translate(
rheaContext: RheaEventContext,
emitter: Link | Session | Connection,
eventName: string): EventContext {
const connectionId = rheaContext.connection.options.id || "";
log.contextTranslator("[%s] Translating the context for event: '%s'.", connectionId, eventName);
// initialize the result

@@ -93,0 +102,0 @@ const result: EventContext = {

@@ -12,4 +12,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

import { Connection } from "./connection";
import { Func } from './util/utils';
import { EventContext } from "./eventContext";
import { Func, emitEvent, EmitParameters } from './util/utils';

@@ -239,7 +238,5 @@ export enum LinkType {

removeListeners();
setTimeout(() => {
log[this.type]("[%s] Resolving the promise as the amqp %s has been closed.",
this.connection.id, this.type);
resolve();
});
log[this.type]("[%s] Resolving the promise as the amqp %s has been closed.",
this.connection.id, this.type);
resolve();
};

@@ -286,5 +283,19 @@

this._link.on(events[eventName],
(context: RheaEventContext) => this.emit(events[eventName], EventContext.translate(context, this)));
(context: RheaEventContext) => {
const params: EmitParameters = {
rheaContext: context,
emitter: this,
eventName: events[eventName],
emitterType: this.type,
connectionId: this.connection.id
};
emitEvent(params);
});
}
log.eventHandler("[%s] rhea-promise '%s' object is listening for events: %o " +
"emitted by rhea's '%s' object.", this.connection.id, this.type,
this._link.eventNames(), this.type);
log.eventHandler("[%s] ListenerCount for event '%s_error' on rhea's '%s' object is: %d.",
this.connection.id, this.type, this.type, this._link.listenerCount(`${this.type}_error`));
}
}

@@ -35,1 +35,11 @@ // Copyright (c) Microsoft Corporation. All rights reserved.

export const error = debugModule("rhea-promise:error");
/**
* @ignore
* log statements for error
*/
export const eventHandler = debugModule("rhea-promise:eventhandler");
/**
* @ignore
* log statements for error
*/
export const contextTranslator = debugModule("rhea-promise:translate");
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache License. See License in the project root for license information.
import * as rhea from "rhea";
import { Session } from "./session";
import { ReceiverEvents, Receiver as RheaReceiver } from "rhea";
import {
ReceiverEvents, Receiver as RheaReceiver, ReceiverOptions as RheaReceiverOptions
} from "rhea";
import { Link, LinkType } from "./link";

@@ -14,3 +15,3 @@ import { OnAmqpEvent } from "./eventContext";

*/
export interface ReceiverOptions extends rhea.ReceiverOptions {
export interface ReceiverOptions extends RheaReceiverOptions {
/**

@@ -56,3 +57,3 @@ * @property {OnAmqpEvent} [onMessage] The handler that can be provided for receiving the

export class Receiver extends Link {
constructor(session: Session, receiver: rhea.Receiver, options?: ReceiverOptions) {
constructor(session: Session, receiver: RheaReceiver, options?: ReceiverOptions) {
super(LinkType.receiver, session, receiver, options);

@@ -59,0 +60,0 @@ }

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache License. See License in the project root for license information.
import * as rhea from "rhea";
import * as log from "./log";

@@ -9,6 +8,9 @@ import { Connection } from "./connection";

import { Sender, SenderOptions } from "./sender";
import { SenderEvents, ReceiverEvents, SessionEvents, AmqpError } from "rhea";
import { Func } from "./util/utils";
import {
SenderEvents, ReceiverEvents, SessionEvents, AmqpError, Session as RheaSession,
EventContext as RheaEventContext
} from "rhea";
import { Func, EmitParameters, emitEvent } from "./util/utils";
import { EventEmitter } from "events";
import { EventContext, OnAmqpEvent } from './eventContext';
import { OnAmqpEvent } from './eventContext';

@@ -28,6 +30,6 @@ /**

export class Session extends EventEmitter {
private _session: rhea.Session;
private _session: RheaSession;
private _connection: Connection;
constructor(connection: Connection, session: rhea.Session) {
constructor(connection: Connection, session: RheaSession) {
super();

@@ -88,3 +90,3 @@ this._connection = connection;

if (this._session) {
// Remove our listeners and listeners from rhea's session object.
// Remove our listeners and listeners from rhea's 'session' object.
this.removeAllListeners();

@@ -113,4 +115,4 @@ this._session.removeAllListeners();

if (this.isOpen()) {
let onError: Func<rhea.EventContext, void>;
let onClose: Func<rhea.EventContext, void>;
let onError: Func<RheaEventContext, void>;
let onClose: Func<RheaEventContext, void>;
let waitTimer: any;

@@ -124,12 +126,10 @@

onClose = (context: rhea.EventContext) => {
onClose = (context: RheaEventContext) => {
removeListeners();
setTimeout(() => {
log.session("[%s] Resolving the promise as the amqp session has been closed.",
this.connection.id);
resolve();
});
log.session("[%s] Resolving the promise as the amqp session has been closed.",
this.connection.id);
resolve();
};
onError = (context: rhea.EventContext) => {
onError = (context: RheaEventContext) => {
removeListeners();

@@ -183,2 +183,4 @@ log.error("[%s] Error occurred while closing amqp session.",

this.on(SessionEvents.sessionError, options.onSessionError);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
"while creating the 'receiver'.", this.connection.id, SessionEvents.sessionError);
}

@@ -188,7 +190,9 @@

this.on(SessionEvents.sessionClose, options.onSessionClose);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
" while creating the 'receiver'.", this.connection.id, SessionEvents.sessionClose);
}
const rheaReceiver = this._session.attach_receiver(options);
const receiver = new Receiver(this, rheaReceiver, options);
let onOpen: Func<rhea.EventContext, void>;
let onClose: Func<rhea.EventContext, void>;
let onOpen: Func<RheaEventContext, void>;
let onClose: Func<RheaEventContext, void>;
let waitTimer: any;

@@ -199,2 +203,4 @@

receiver.on(ReceiverEvents.receiverError, options!.onError!);
log.receiver("[%s] Added event handler for events: '%s', '%s', on rhea-promise 'receiver'.",
this.connection.id, ReceiverEvents.message, ReceiverEvents.receiverError);
}

@@ -204,2 +210,4 @@

receiver.on(ReceiverEvents.receiverClose, options.onClose);
log.receiver("[%s] Added event handler for event '%s' on rhea-promise 'receiver'.",
this.connection.id, ReceiverEvents.receiverClose);
}

@@ -213,12 +221,10 @@

onOpen = (context: rhea.EventContext) => {
onOpen = (context: RheaEventContext) => {
removeListeners();
setTimeout(() => {
log.session("[%s] Resolving the promise with amqp receiver '%s'.",
this.connection.id, receiver.name);
resolve(receiver);
});
log.receiver("[%s] Resolving the promise with amqp receiver '%s'.",
this.connection.id, receiver.name);
resolve(receiver);
};
onClose = (context: rhea.EventContext) => {
onClose = (context: RheaEventContext) => {
removeListeners();

@@ -258,2 +264,4 @@ log.error("[%s] Error occurred while creating a receiver over amqp connection: %O.",

this.on(SessionEvents.sessionError, options.onSessionError);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
"while creating the sender.", this.connection.id, SessionEvents.sessionError);
}

@@ -263,2 +271,4 @@

this.on(SessionEvents.sessionClose, options.onSessionClose);
log.session("[%s] Added event handler for event '%s' on rhea-promise 'session', " +
"while creating the sender.", this.connection.id, SessionEvents.sessionClose);
}

@@ -268,4 +278,4 @@

const sender = new Sender(this, rheaSender, options);
let onSendable: Func<rhea.EventContext, void>;
let onClose: Func<rhea.EventContext, void>;
let onSendable: Func<RheaEventContext, void>;
let onClose: Func<RheaEventContext, void>;
let waitTimer: any;

@@ -302,12 +312,10 @@

onSendable = (context: rhea.EventContext) => {
onSendable = (context: RheaEventContext) => {
removeListeners();
setTimeout(() => {
log.session("[%s] Resolving the promise with amqp sender '%s'.",
this.connection.id, sender.name);
resolve(sender);
});
log.sender("[%s] Resolving the promise with amqp sender '%s'.",
this.connection.id, sender.name);
resolve(sender);
};
onClose = (context: rhea.EventContext) => {
onClose = (context: RheaEventContext) => {
removeListeners();

@@ -344,3 +352,12 @@ log.error("[%s] Error occurred while creating a sender over amqp connection: %O.",

this._session.on(SessionEvents[eventName],
(context) => this.emit(SessionEvents[eventName], EventContext.translate(context, this)));
(context) => {
const params: EmitParameters = {
rheaContext: context,
emitter: this,
eventName: SessionEvents[eventName],
emitterType: "session",
connectionId: this.connection.id
};
emitEvent(params);
});
}

@@ -350,13 +367,49 @@

// object, if they are not handled at their level. * denotes - Sender and Receiver.
// Sender
this._session.on(rhea.SenderEvents.senderError,
(context) => this.emit(rhea.SenderEvents.senderError, EventContext.translate(context, this)));
this._session.on(rhea.SenderEvents.senderClose,
(context) => this.emit(rhea.SenderEvents.senderClose, EventContext.translate(context, this)));
this._session.on(SenderEvents.senderError, (context) => {
const params: EmitParameters = {
rheaContext: context,
emitter: this,
eventName: SenderEvents.senderError,
emitterType: "session",
connectionId: this.connection.id
};
emitEvent(params);
});
this._session.on(SenderEvents.senderClose, (context) => {
const params: EmitParameters = {
rheaContext: context,
emitter: this,
eventName: SenderEvents.senderClose,
emitterType: "session",
connectionId: this.connection.id
};
emitEvent(params);
});
// Receiver
this._session.on(rhea.ReceiverEvents.receiverError,
(context) => this.emit(rhea.ReceiverEvents.receiverError, EventContext.translate(context, this)));
this._session.on(rhea.ReceiverEvents.receiverClose,
(context) => this.emit(rhea.ReceiverEvents.receiverClose, EventContext.translate(context, this)));
this._session.on(ReceiverEvents.receiverError, (context) => {
const params: EmitParameters = {
rheaContext: context,
emitter: this,
eventName: ReceiverEvents.receiverError,
emitterType: "session",
connectionId: this.connection.id
};
emitEvent(params);
});
this._session.on(ReceiverEvents.receiverClose, (context) => {
const params: EmitParameters = {
rheaContext: context,
emitter: this,
eventName: ReceiverEvents.receiverClose,
emitterType: "session",
connectionId: this.connection.id
};
emitEvent(params);
});
log.eventHandler("[%s] rhea-promise 'session' object is listening for events: %o " +
"emitted by rhea's 'session' object.", this.connection.id, this._session.eventNames());
}
}
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache License. See License in the project root for license information.
import { EventContext as RheaEventContext } from "rhea";
import { Link } from "../link";
import { Session } from "../session";
import { Connection } from "../connection";
import * as log from "../log";
import { EventContext } from '../eventContext';
/**

@@ -152,1 +159,39 @@ * Defines a mapping for Http like response status codes for different status-code values

}
/**
* @ignore
* Describes the parameters to be provided to the function `emitEvent()`.
* @interface EmitParameters
*/
export interface EmitParameters {
rheaContext: RheaEventContext;
emitter: Link | Session | Connection;
eventName: string;
connectionId: string;
emitterType: "sender" | "receiver" | "session" | "connection";
}
/**
* @ignore
* Emits an event.
* @param params parameters needed to emit an event from one of the rhea-promise objects.
* @returns void
*/
export function emitEvent(params: EmitParameters): void {
const emit = () => {
log[params.emitterType]("[%s] %s got event: '%s'. Re-emitting the translated context.",
params.connectionId, params.emitterType, params.eventName);
params.emitter.emit(params.eventName,
EventContext.translate(params.rheaContext, params.emitter, params.eventName));
};
if (params.eventName.indexOf("error") !== -1) {
log[params.emitterType]("[%s] %s got event: '%s'. Will re-emit in the next tick.",
params.connectionId, params.emitterType, params.eventName);
// setTimeout() without any time is equivalent to process.nextTick() and works in node.js and
// browsers. We wait for a tick to emit error events in general. This should give enough
// time for promises to resolve on *_open (create) and *_close (close).
setTimeout(emit);
} else {
emit();
}
}
{
"name": "rhea-promise",
"version": "0.1.4",
"version": "0.1.5",
"description": "A Promisified layer over rhea AMQP client",

@@ -43,2 +43,2 @@ "license": "Apache-2.0",

}
}
}

@@ -19,4 +19,8 @@ # rhea-promise

- Getting debug logs from the library
- Getting debug logs from this library
```bash
export DEBUG=rhea-promise*
```
- Getting debug logs from this and the rhea library
```bash
export DEBUG=rhea*

@@ -26,3 +30,3 @@ ```

```bash
export DEBUG=rhea*,-rhea:raw,-rhea:message
export DEBUG=rhea*,-rhea:raw,-rhea:message,-rhea-promise:eventhandler,-rhea-promise:translate
```

@@ -29,0 +33,0 @@

@@ -12,4 +12,4 @@ /// <reference types="node" />

/**
* Describes the options that can be provided while creating an AMQP sender. One can also provide a
* session if it was already created.
* Describes the options that can be provided while creating an AMQP sender. One can also provide
* a session if it was already created.
* @interface SenderOptionsWithSession

@@ -21,4 +21,4 @@ */

/**
* Describes the options that can be provided while creating an AMQP receiver. One can also provide a
* session if it was already created.
* Describes the options that can be provided while creating an AMQP receiver. One can also provide
* a session if it was already created.
* @interface ReceiverOptionsWithSession

@@ -101,3 +101,4 @@ */

/**
* @property {Container} container The underlying Container instance on which the connection exists.
* @property {Container} container The underlying Container instance on which the connection
* exists.
*/

@@ -163,4 +164,4 @@ readonly container: Container;

* - **Resolves** the promise when rhea emits the "connection_close" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_error" event while trying
* to close an amqp connection.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_error" event while
* trying to close an amqp connection.
*/

@@ -203,4 +204,4 @@ close(): Promise<void>;

* - **Resolves** the promise with the Session object when rhea emits the "session_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "session_close" event while trying
* to create an amqp session.
* - **Rejects** the promise with an AmqpError when rhea emits the "session_close" event while
* trying to create an amqp session.
*/

@@ -207,0 +208,0 @@ createSession(): Promise<Session>;

@@ -79,5 +79,8 @@ import { Connection } from "./connection";

* @param rheaContext The received context from rhea's event emitter
* @param emitter Teh rhea-promise equivalent object that is supposed emit the same event.
* @param emitter The rhea-promise equivalent object that is supposed emit the same event
* @param eventName The name of the event for which the context will be translated
*
* @returns EventContext The translated EventContext.
*/
function translate(rheaContext: RheaEventContext, emitter: Link | Session | Connection): EventContext;
function translate(rheaContext: RheaEventContext, emitter: Link | Session | Connection, eventName: string): EventContext;
}

@@ -32,1 +32,11 @@ import * as debugModule from "debug";

export declare const error: debugModule.IDebugger;
/**
* @ignore
* log statements for error
*/
export declare const eventHandler: debugModule.IDebugger;
/**
* @ignore
* log statements for error
*/
export declare const contextTranslator: debugModule.IDebugger;

@@ -1,4 +0,3 @@

import * as rhea from "rhea";
import { Session } from "./session";
import { ReceiverEvents } from "rhea";
import { ReceiverEvents, Receiver as RheaReceiver, ReceiverOptions as RheaReceiverOptions } from "rhea";
import { Link } from "./link";

@@ -10,3 +9,3 @@ import { OnAmqpEvent } from "./eventContext";

*/
export interface ReceiverOptions extends rhea.ReceiverOptions {
export interface ReceiverOptions extends RheaReceiverOptions {
/**

@@ -50,3 +49,3 @@ * @property {OnAmqpEvent} [onMessage] The handler that can be provided for receiving the

export declare class Receiver extends Link {
constructor(session: Session, receiver: rhea.Receiver, options?: ReceiverOptions);
constructor(session: Session, receiver: RheaReceiver, options?: ReceiverOptions);
readonly drain: boolean;

@@ -53,0 +52,0 @@ addCredit(credit: number): void;

/// <reference types="node" />
import * as rhea from "rhea";
import { Connection } from "./connection";
import { Receiver, ReceiverOptions } from "./receiver";
import { Sender, SenderOptions } from "./sender";
import { SessionEvents, AmqpError } from "rhea";
import { SessionEvents, AmqpError, Session as RheaSession } from "rhea";
import { EventEmitter } from "events";

@@ -23,3 +22,3 @@ import { OnAmqpEvent } from './eventContext';

private _connection;
constructor(connection: Connection, session: rhea.Session);
constructor(connection: Connection, session: RheaSession);
/**

@@ -26,0 +25,0 @@ * @property {Connection} connection The underlying AMQP connection.

@@ -0,1 +1,5 @@

import { EventContext as RheaEventContext } from "rhea";
import { Link } from "../link";
import { Session } from "../session";
import { Connection } from "../connection";
/**

@@ -110,1 +114,20 @@ * Defines a mapping for Http like response status codes for different status-code values

export declare function parseConnectionString<T>(connectionString: string, options?: ConnectionStringParseOptions): ParsedOutput<T>;
/**
* @ignore
* Describes the parameters to be provided to the function `emitEvent()`.
* @interface EmitParameters
*/
export interface EmitParameters {
rheaContext: RheaEventContext;
emitter: Link | Session | Connection;
eventName: string;
connectionId: string;
emitterType: "sender" | "receiver" | "session" | "connection";
}
/**
* @ignore
* Emits an event.
* @param params parameters needed to emit an event from one of the rhea-promise objects.
* @returns void
*/
export declare function emitEvent(params: EmitParameters): void;

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc