@azure/amqp-common
Common library for AMQP-based Azure SDKs like @azure/event-hubs.
Library that provides common functionality for different Azure JavaScript libraries that use the AMQP protocol. Some of the common functionalities include:
- ConnectionConfig by parsing the connection-string
npm install @azure/amqp-common
rhea-promise is a peer dependency. You need to explicitly install this library as a dependency in your application.
This SDK has been developed in TypeScript and has good source code documentation. It is highly recommended to use VS Code or any other IDE that provides better IntelliSense and exposes the full power of source code documentation.
This SDK houses the core AMQP functionality shared by the Azure SDKs that use the protocol.
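To make "parsing the connection-string" concrete, here is a minimal sketch of what such parsing involves. It assumes the usual `Key=Value;Key=Value` layout of Azure connection strings. `parseConnectionString` is a hypothetical helper written for illustration and is not the library's implementation; the values are placeholders.

```javascript
// Hypothetical helper, NOT part of @azure/amqp-common: splits a connection
// string of the form "Key1=Value1;Key2=Value2" into a plain object.
function parseConnectionString(connectionString) {
  const result = {};
  for (const part of connectionString.split(";")) {
    if (!part.trim()) continue; // skip empty segments, e.g. after a trailing ";"
    const separator = part.indexOf("=");
    if (separator === -1) {
      throw new Error(`Malformed segment (no "="): ${part}`);
    }
    // Split on the first "=" only, because base64 keys often end in "=".
    const key = part.slice(0, separator).trim();
    const value = part.slice(separator + 1).trim();
    result[key] = value;
  }
  return result;
}

// Placeholder values only.
const parsed = parseConnectionString(
  "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=RootPolicy;SharedAccessKey=abc123==;EntityPath=my-hub"
);
console.log(parsed.Endpoint);
console.log(parsed.EntityPath);
```

In an application you would normally let ConnectionConfig.create do this work, as the samples in this document do; the sketch only shows the shape of the input.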
Please take a look at the samples directory for detailed samples. You can run the samples by cloning the repo or by copying the sample below into your sample.js file.
- git clone https://github.com/Azure/azure-sdk-for-js.git
- cd azure-sdk-for-js/sdk/core/amqp-common
- npm i
- npm i -g typescript #This is optional. However it is useful to have typescript installed globally on your box
- npm i -g ts-node #This is optional. However it is useful to have ts-node installed globally on your box
# Make sure to set the environment variables and then run
- ts-node ./samples/cbsAuth.ts
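The cbsAuth sample in this document reads CONNECTION_STRING and ENTITY_PATH from the environment and silently falls back to an empty string when the former is missing. A small check before running can make that failure easier to spot. This is a sketch only; `missingEnvVars` is a hypothetical helper, not something the SDK provides.

```javascript
// Hypothetical helper: returns the names of required variables that are
// unset or empty in the given environment (defaults to process.env).
function missingEnvVars(names, env = process.env) {
  return names.filter((name) => !env[name]);
}

// Variable names taken from the cbsAuth sample in this document.
const missing = missingEnvVars(["CONNECTION_STRING", "ENTITY_PATH"]);
if (missing.length > 0) {
  console.warn(`Missing environment variables: ${missing.join(", ")}`);
}
```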
The samples below are generic for EventHubs and ServiceBus. You can find EventHub-specific samples in the samples directory.
You can find more information about cbs authorization over here.
NOTE: The code block below is referred to later as "./cbsAuth".
const { ConnectionContextBase, ConnectionConfig, CbsResponse } = require("@azure/amqp-common");
const dotenv = require("dotenv");
dotenv.config(); // Optional for loading environment configuration from a .env (config) file
export const str = process.env.CONNECTION_STRING || "";
export const path = process.env.ENTITY_PATH;
export const connectionConfig = ConnectionConfig.create(str, path);
const parameters = {
config: connectionConfig,
connectionProperties: {
product: "MSJSClient",
userAgent: "/js-amqp-common",
version: "0.1.0"
}
};
export const connectionContext = ConnectionContextBase.create(parameters);
/**
* audience The entity token audience in one of the following forms:
*
* - **ServiceBus**
* - **Sender**
* - `"sb://<yournamespace>.servicebus.windows.net/<queue-name>"`
* - `"sb://<yournamespace>.servicebus.windows.net/<topic-name>"`
*
* - **Receiver**
* - `"sb://<yournamespace>.servicebus.windows.net/<queue-name>"`
* - `"sb://<yournamespace>.servicebus.windows.net/<topic-name>"`
*
* - **ManagementClient**
* - `"sb://<your-namespace>.servicebus.windows.net/<queue-name>/$management"`.
* - `"sb://<your-namespace>.servicebus.windows.net/<topic-name>/$management"`.
*
* - **EventHubs**
* - **Sender**
* - `"sb://<yournamespace>.servicebus.windows.net/<hubName>"`
* - `"sb://<yournamespace>.servicebus.windows.net/<hubName>/Partitions/<partitionId>"`.
*
* - **Receiver**
* - `"sb://<your-namespace>.servicebus.windows.net/<event-hub-name>/ConsumerGroups/<consumer-group-name>/Partitions/<partition-id>"`.
*
* - **ManagementClient**
* - `"sb://<your-namespace>.servicebus.windows.net/<event-hub-name>/$management"`.
*/
export async function authenticate(audience, closeConnection = false) {
await connectionContext.cbsSession.init();
const tokenObject = await connectionContext.tokenProvider.getToken(audience);
const result = await connectionContext.cbsSession.negotiateClaim(audience, tokenObject);
console.log("Result is: %O", result); // %O prints the object's fields instead of "[object Object]"
if (closeConnection) {
await connectionContext.connection.close();
console.log("Successfully closed the connection.");
}
return result;
}
// Audience is for an EventHub or ServiceBus sender.
// You can uncomment the following line and just run this sample, if required.
// authenticate(`${connectionConfig.endpoint}${path}`).catch((err) => console.log(err));
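The audience forms listed in the comment above can be assembled with small string helpers. The sketch below covers two of them: the EventHubs receiver form and the management form. Both function names are hypothetical, and the endpoint is assumed to be an `sb://...servicebus.windows.net` URL with or without a trailing slash.

```javascript
// Hypothetical helpers; the URL shapes follow the audience forms documented
// in the cbsAuth comment. Neither function is part of @azure/amqp-common.
function withTrailingSlash(endpoint) {
  return endpoint.endsWith("/") ? endpoint : `${endpoint}/`;
}

// "sb://<namespace>/<event-hub-name>/ConsumerGroups/<group>/Partitions/<id>"
function eventHubReceiverAudience(endpoint, hubName, consumerGroup, partitionId) {
  return `${withTrailingSlash(endpoint)}${hubName}/ConsumerGroups/${consumerGroup}/Partitions/${partitionId}`;
}

// "sb://<namespace>/<entity>/$management"
function managementAudience(endpoint, entityPath) {
  return `${withTrailingSlash(endpoint)}${entityPath}/$management`;
}

// Placeholder namespace and entity names.
console.log(
  eventHubReceiverAudience("sb://example.servicebus.windows.net/", "my-hub", "$default", "0")
);
console.log(managementAudience("sb://example.servicebus.windows.net", "my-queue"));
```

The resulting string is what you would pass as the audience argument of authenticate.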
Building on the cbs auth sample above, after authentication we can send a message to an EventHub or ServiceBus.
const dotenv = require("dotenv");
dotenv.config(); // Optional for loading environment configuration from a .env (config) file
const { Sender, SenderOptions, EventContext, Message, Delivery } = require("rhea-promise");
const { authenticate, connectionContext, connectionConfig, path } = require("./cbsAuth");
async function main() {
await authenticate(`${connectionConfig.endpoint}${path}`);
const senderName = "sender-1";
const senderOptions = {
name: senderName,
target: {
// Address for EventHub Sender, it can be "<EventHubName>" or "<EventHubName>/Partitions/<PartitionId>"
// For ServiceBus Queue, it will be "<QueueName>"
address: `${path}`
},
onError: (context) => {
const senderError = context.sender && context.sender.error;
if (senderError) {
console.log(
">>>>> [%s] An error occurred for sender '%s': %O.",
connectionContext.connection.id,
senderName,
senderError
);
}
},
onSessionError: (context) => {
const sessionError = context.session && context.session.error;
if (sessionError) {
console.log(
">>>>> [%s] An error occurred for session of sender '%s': %O.",
connectionContext.connection.id,
senderName,
sessionError
);
}
}
};
const sender = await connectionContext.connection.createSender(senderOptions);
const message = {
body: "Hello World!!",
message_id: "12343434343434"
};
const delivery = await sender.send(message);
console.log(">>>>>[%s] Delivery id: ", connectionContext.connection.id, delivery.id);
await sender.close();
await connectionContext.connection.close();
}
main().catch((err) => console.log(err));
Building on the auth sample, post authentication we can receive messages from an EventHub or ServiceBus.
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
const dotenv = require("dotenv");
dotenv.config(); // Optional for loading environment configuration from a .env (config) file
const {
Receiver,
ReceiverOptions,
EventContext,
ReceiverEvents,
delay,
types
} = require("rhea-promise");
const { authenticate, connectionContext, connectionConfig, path } = require("./cbsAuth");
async function main() {
await authenticate(`${connectionConfig.endpoint}${path}`);
const receiverName = "receiver-1";
const filterClause = `amqp.annotation.x-opt-enqueued-time > '${Date.now() - 3600 * 1000}'`; // Get messages from the past hour
const receiverAddress = `${path}/ConsumerGroups/$default/Partitions/0`; // For ServiceBus "<QueueName>"
const receiverOptions = {
name: receiverName,
source: {
address: receiverAddress,
filter: {
// May not be required for ServiceBus. The current example is for EventHubs.
"apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468c00000004)
}
},
onSessionError: (context) => {
const sessionError = context.session && context.session.error;
if (sessionError) {
console.log(
">>>>> [%s] An error occurred for session of receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
sessionError
);
}
}
};
const receiver = await connectionContext.connection.createReceiver(receiverOptions);
receiver.on(ReceiverEvents.message, (context) => {
console.log("Received message: %O", context.message);
});
receiver.on(ReceiverEvents.receiverError, (context) => {
const receiverError = context.receiver && context.receiver.error;
if (receiverError) {
console.log(
">>>>> [%s] An error occurred for receiver '%s': %O.",
connectionContext.connection.id,
receiverName,
receiverError
);
}
});
// sleeping for 2 mins to let the receiver receive messages and then closing it.
await delay(120000);
await receiver.close();
await connectionContext.connection.close();
}
main().catch((err) => console.log(err));
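The filterClause in the receiver sample selects messages by enqueued time, expressed as milliseconds since the Unix epoch. The sketch below wraps that expression in a helper so the time window is explicit. `enqueuedSinceFilter` is a hypothetical name, and the `now` parameter is added here only so the result is reproducible.

```javascript
// Hypothetical helper mirroring the filterClause expression in the receiver
// sample: selects messages enqueued within the last `windowMs` milliseconds.
function enqueuedSinceFilter(windowMs, now = Date.now()) {
  if (!Number.isFinite(windowMs) || windowMs < 0) {
    throw new RangeError("windowMs must be a non-negative number");
  }
  return `amqp.annotation.x-opt-enqueued-time > '${now - windowMs}'`;
}

const ONE_HOUR_MS = 3600 * 1000;
console.log(enqueuedSinceFilter(ONE_HOUR_MS)); // same window as the sample above

// With a fixed "now", the cut-off is simply now - windowMs.
console.log(enqueuedSinceFilter(5 * 60 * 1000, 1700000000000));
```

The returned string would take the place of filterClause in the call to types.wrap_described shown in the sample.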
You can set the DEBUG environment variable to get the debug logs.
- Debug logs from this library:
export DEBUG=azure:amqp-common*
- Debug logs from this library and from rhea:
export DEBUG=azure:amqp-common*,rhea*
- To exclude the rhea:raw, rhea:message and azure:amqp-common:datatransformer namespaces, set the DEBUG environment variable as follows:
export DEBUG=azure:amqp-common*,rhea*,-rhea:raw,-rhea:message,-azure:amqp-common:datatransformer
- To log errors together with rhea events, frames, io and flow, set the DEBUG environment variable as follows:
export DEBUG=azure-amqp-common:error,rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow
To log to a file, set the DEBUG environment variable as shown above and then run your test script as follows:
- Standard output goes to out.log and logging statements from the sdk go to debug.log.
node your-test-script.js > out.log 2>debug.log
- Both go to the same file out.log by redirecting stdout to a file and stderr to stdout (&1):
node your-test-script.js >out.log 2>&1
- Both go to the same file out.log (bash shorthand).
node your-test-script.js &> out.log
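The DEBUG values above are comma-separated namespace patterns, where `*` acts as a wildcard and a leading `-` excludes a namespace. The sketch below composes such a value from two lists. It assumes only the syntax visible in the examples above, and `buildDebugValue` is a hypothetical helper.

```javascript
// Hypothetical helper: joins namespaces to include and namespaces to exclude
// into a single DEBUG value, prefixing each excluded namespace with "-".
function buildDebugValue(include, exclude = []) {
  return [...include, ...exclude.map((ns) => `-${ns}`)].join(",");
}

const debugValue = buildDebugValue(
  ["azure:amqp-common*", "rhea*"],
  ["rhea:raw", "rhea:message", "azure:amqp-common:datatransformer"]
);

// Prints the export line for the shell.
console.log(`export DEBUG=${debugValue}`);
```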
Please take a look at the examples above to use the package.
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.