Product
Socket Now Supports uv.lock Files
Socket now supports uv.lock files to ensure consistent, secure dependency resolution for Python projects and enhance supply chain security.
@azure/core-amqp
Common library for amqp based azure sdks like @azure/event-hubs.
@azure/core-amqp is a library that provides core functionality for working with the Advanced Message Queuing Protocol (AMQP) in Azure services. It is primarily used as a building block for other Azure SDKs that need to communicate over AMQP, such as Azure Service Bus and Azure Event Hubs.
AMQP Connection Management
This feature allows you to establish and manage AMQP connections. The code sample demonstrates how to create a connection context and open a connection using a connection string.
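const { ConnectionConfig, ConnectionContextBase } = require('@azure/core-amqp');

async function createConnection() {
  // Parse the connection string; the second argument is the entity path
  // (a queue, topic, or Event Hub name).
  const config = ConnectionConfig.create('your-connection-string', 'entity-path');
  const connectionContext = ConnectionContextBase.create({
    config,
    connectionProperties: { product: 'product', userAgent: '/user-agent', version: '0.0.0' }
  });
  await connectionContext.connection.open();
  console.log('Connection established');
  return connectionContext;
}

createConnection().catch(console.error);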
AMQP Sender and Receiver
This feature allows you to send and receive messages over AMQP. The code sample demonstrates how to create a sender to send a message and a receiver to receive messages from a specified queue.
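// Reuses createConnection() from the connection management sample above.
// Azure services also require Claims Based Authorization on the $cbs link
// before a sender or receiver link can be used.

async function sendMessage() {
  const connectionContext = await createConnection();
  const sender = await connectionContext.connection.createSender({
    name: 'your-sender-name',
    target: { address: 'queue-name' }
  });
  await sender.send({ body: 'Hello, World!' });
  console.log('Message sent');
  await sender.close();
  await connectionContext.connection.close();
}

async function receiveMessage() {
  const connectionContext = await createConnection();
  // Messages are delivered to the onMessage handler for as long as the link stays open.
  return connectionContext.connection.createReceiver({
    name: 'your-receiver-name',
    source: { address: 'queue-name' },
    onMessage: (context) => {
      console.log('Received message:', context.message.body);
    }
  });
}

sendMessage().catch(console.error);
receiveMessage().catch(console.error);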
AMQP Error Handling
This feature provides mechanisms for handling errors that occur during AMQP operations. The code sample demonstrates how to handle errors when establishing a connection.
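const { ConnectionConfig, ConnectionContextBase } = require('@azure/core-amqp');

async function createConnectionWithErrorHandling() {
  try {
    const config = ConnectionConfig.create('your-connection-string', 'entity-path');
    const connectionContext = ConnectionContextBase.create({
      config,
      connectionProperties: { product: 'product', userAgent: '/user-agent', version: '0.0.0' }
    });
    await connectionContext.connection.open();
    console.log('Connection established');
    return connectionContext;
  } catch (error) {
    console.error('Error establishing connection:', error);
    // Rethrow so that callers do not continue with an undefined connection context.
    throw error;
  }
}

// The error has already been logged above; only record the failure here.
createConnectionWithErrorHandling().catch(() => {
  process.exitCode = 1;
});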
The 'amqp10' package is a pure JavaScript implementation of the AMQP 1.0 protocol. It provides similar functionalities for managing AMQP connections, sending, and receiving messages. However, it is not specifically tailored for Azure services and lacks some of the Azure-specific optimizations and integrations found in @azure/core-amqp.
The 'rhea' package is another AMQP 1.0 client library for Node.js. It offers a flexible and extensible API for working with AMQP connections and messaging. Like 'amqp10', it is a general-purpose library and does not include Azure-specific features or optimizations.
The 'amqplib' package is a client for the AMQP 0.9.1 protocol, commonly used with RabbitMQ. While it provides robust support for AMQP messaging, it is not compatible with AMQP 1.0 and therefore not suitable for use with Azure services that require AMQP 1.0.
Azure Core AMQP is a library that provides common functionality for Azure JavaScript libraries that use the AMQP protocol, such as the ones for Azure Service Bus and Azure Event Hubs.
Install this library using npm as follows:
npm install @azure/core-amqp
Note: rhea-promise is a peer dependency. You need to explicitly install this library as a dependency in your application.
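Some of the key features of the Azure Core AMQP library are Claims Based Authorization, request-response links for sending a request and receiving a response over AMQP, translation of AMQP error codes (including errors specific to Azure Service Bus and Azure Event Hubs), and a retry policy for retrying an operation when a retryable error is encountered.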
Claims Based Authorization (CBS) needs to be done for every AMQP link that your application creates. The claims also have to be renewed periodically. For more details on CBS, please see the CBS Specification.
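As a rough illustration of the periodic renewal mentioned above, the sketch below computes how long to wait before renegotiating a claim. The helper name msUntilRenewal and the two-minute safety margin are assumptions made for this example; they are not part of the library.

```javascript
// Hypothetical helper (not part of @azure/core-amqp): how long to wait
// before renewing a claim, leaving a safety margin before the token expires.
function msUntilRenewal(expiresOnMs, nowMs, safetyMarginMs = 2 * 60 * 1000) {
  // Never return a negative delay; an expired token should be renewed immediately.
  return Math.max(0, expiresOnMs - nowMs - safetyMarginMs);
}

const now = Date.now();
const expiresOn = now + 60 * 60 * 1000; // assume the token is valid for one hour
console.log(`Renew the claim in ${msUntilRenewal(expiresOn, now) / 1000} seconds`);
```

An application could pass the returned delay to setTimeout and call negotiateClaim again when the timer fires.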
In the examples below, we use the Shared Key details present in the connection string to create a SAS token. This token is then used to make a request on the $cbs link to carry out Claims Based Authorization for a link to the given entity in Azure Service Bus or Azure Event Hubs.
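For orientation, here is a minimal sketch of how a SAS token can be derived from a shared key, following the publicly documented Service Bus SAS format (an HMAC-SHA256 signature over the URL-encoded resource URI and the expiry time). The function name createSasToken and all placeholder values are assumptions for illustration; the library derives its tokens internally and its implementation may differ.

```javascript
const crypto = require("node:crypto");

// Hypothetical helper (not part of @azure/core-amqp): builds a SAS token string.
function createSasToken(keyName, key, audience, expiryEpochSeconds) {
  const encodedAudience = encodeURIComponent(audience);
  // The string to sign is the encoded resource URI, a newline, and the expiry time.
  const stringToSign = `${encodedAudience}\n${expiryEpochSeconds}`;
  const signature = crypto
    .createHmac("sha256", key)
    .update(stringToSign, "utf8")
    .digest("base64");
  return (
    `SharedAccessSignature sr=${encodedAudience}` +
    `&sig=${encodeURIComponent(signature)}` +
    `&se=${expiryEpochSeconds}` +
    `&skn=${encodeURIComponent(keyName)}`
  );
}

// Placeholder values; substitute the details from your own connection string.
const expiry = Math.floor(Date.now() / 1000) + 3600; // valid for one hour
const sasToken = createSasToken(
  "your-key-name",
  "your-shared-key",
  "sb://your-namespace.servicebus.windows.net/your-entity",
  expiry
);
console.log(sasToken);
```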
The examples below expect a connection string to an Azure Service Bus or Azure Event Hubs instance. The entity path refers to an Event Hub name in the case of Azure Event Hubs, and to a queue or topic name in the case of Azure Service Bus.
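To make the shape of such a connection string concrete, the sketch below splits one into its key/value parts. The helper parseConnectionString and the sample values are assumptions for illustration; in the examples that follow, ConnectionConfig.create takes the connection string directly.

```javascript
// Hypothetical helper (not part of @azure/core-amqp): splits a connection
// string of the form "Key1=Value1;Key2=Value2" into an object.
function parseConnectionString(connectionString) {
  const parts = {};
  for (const segment of connectionString.split(";")) {
    if (!segment.trim()) continue; // tolerate a trailing semicolon
    // Split on the first "=" only: base64 shared keys may end with "=".
    const index = segment.indexOf("=");
    if (index === -1) {
      throw new Error(`Malformed connection string segment: ${segment}`);
    }
    parts[segment.slice(0, index).trim()] = segment.slice(index + 1).trim();
  }
  return parts;
}

const parsed = parseConnectionString(
  "Endpoint=sb://your-namespace.servicebus.windows.net/;" +
    "SharedAccessKeyName=your-key-name;SharedAccessKey=abc123==;EntityPath=your-entity"
);
console.log(parsed.Endpoint, parsed.EntityPath);
```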
In the example below, we first create a ConnectionContext, which is used to carry out the Claims Based Authorization. Then, we create a sender link using ConnectionContext.connection to send a message.
const { ConnectionConfig, ConnectionContextBase, TokenType } = require("@azure/core-amqp");

async function main() {
  const connectionConfig = ConnectionConfig.create(
    "your-connection-string-with-shared-key",
    "entity-path"
  );
  const connectionContext = ConnectionContextBase.create({
    config: connectionConfig,
    connectionProperties: {
      product: "product",
      userAgent: "/user-agent",
      version: "0.0.0"
    }
  });

  // Assumption: the audience is the endpoint followed by the entity path.
  // The original sample leaves this variable undefined.
  const audience = `${connectionConfig.endpoint}${connectionConfig.entityPath}`;

  // Carry out the Claims Based Authorization
  await connectionContext.cbsSession.init();
  const token = await connectionContext.tokenCredential.getToken(audience);
  await connectionContext.cbsSession.negotiateClaim(audience, token, TokenType.CbsTokenTypeSas);

  // Create a sender
  const senderName = "your-sender-name";
  const senderOptions = {
    name: senderName,
    target: {
      // For an EventHub Sender bound to a partition, the address will be "<EventHubName>/Partitions/<PartitionId>"
      address: `${connectionConfig.entityPath}`
    },
    onError: (context) => {
      const senderError = context.sender && context.sender.error;
      if (senderError) {
        console.log("An error occurred for sender '%s': %O.", senderName, senderError);
      }
    },
    onSessionError: (context) => {
      const sessionError = context.session && context.session.error;
      if (sessionError) {
        console.log("An error occurred for session of sender '%s': %O.", senderName, sessionError);
      }
    }
  };
  const sender = await connectionContext.connection.createSender(senderOptions);

  // Send a message
  const delivery = await sender.send({ body: "your-message-body" });

  await sender.close();
  await connectionContext.connection.close();
}

main().catch((err) => console.log(err));
In the example below, we first create a ConnectionContext, which is used to carry out the Claims Based Authorization. Then, we create a receiver link using ConnectionContext.connection to receive messages for 30 seconds.
const { ConnectionConfig, ConnectionContextBase, TokenType } = require("@azure/core-amqp");
const { types } = require("rhea-promise");

async function main() {
  const connectionConfig = ConnectionConfig.create(
    "your-connection-string-with-shared-key",
    "entity-path"
  );
  const connectionContext = ConnectionContextBase.create({
    config: connectionConfig,
    connectionProperties: {
      product: "product",
      userAgent: "/user-agent",
      version: "0.0.0"
    }
  });

  // Assumption: the audience is the endpoint followed by the entity path.
  // The original sample leaves this variable undefined.
  const audience = `${connectionConfig.endpoint}${connectionConfig.entityPath}`;

  // Carry out the Claims Based Authorization
  await connectionContext.cbsSession.init();
  const token = await connectionContext.tokenCredential.getToken(audience);
  await connectionContext.cbsSession.negotiateClaim(audience, token, TokenType.CbsTokenTypeSas);

  // Create a receiver
  const receiverName = "your-receiver-name";
  const filterClause = `amqp.annotation.x-opt-enqueued-time > '${Date.now() - 3600 * 1000}'`; // Get messages from the past hour
  const receiverAddress = `${connectionConfig.entityPath}/ConsumerGroups/$default/Partitions/0`; // For ServiceBus "<QueueName>"
  const receiverOptions = {
    name: receiverName,
    source: {
      address: receiverAddress,
      filter: {
        // May not be required for ServiceBus. The current example is for EventHubs.
        "apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468c00000004)
      }
    },
    onError: (context) => {
      const receiverError = context.receiver && context.receiver.error;
      if (receiverError) {
        console.log("An error occurred for receiver '%s': %O.", receiverName, receiverError);
      }
    },
    onMessage: (context) => {
      console.log("Received message: %O", context.message);
    },
    onSessionError: (context) => {
      const sessionError = context.session && context.session.error;
      if (sessionError) {
        console.log(
          "An error occurred for session of receiver '%s': %O.",
          receiverName,
          sessionError
        );
      }
    }
  };
  const receiver = await connectionContext.connection.createReceiver(receiverOptions);

  // Sleep for 30 seconds to let the receiver receive messages
  await new Promise((r) => setTimeout(r, 30000));

  // Close the receiver to stop receiving messages
  await receiver.close();
  await connectionContext.connection.close();
}

main().catch((err) => console.log(err));
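The enqueued-time selector used in the receiver example can be factored into a small helper, sketched below. The function name enqueuedAfterFilter is an assumption for illustration; the expression format is the one shown in the example above.

```javascript
// Hypothetical helper (not part of @azure/core-amqp): builds the selector
// expression for messages enqueued after the given time (epoch milliseconds).
function enqueuedAfterFilter(sinceEpochMs) {
  if (!Number.isFinite(sinceEpochMs)) {
    throw new TypeError("sinceEpochMs must be a finite number");
  }
  return `amqp.annotation.x-opt-enqueued-time > '${Math.floor(sinceEpochMs)}'`;
}

// Same window as the example above: messages from the past hour.
console.log(enqueuedAfterFilter(Date.now() - 3600 * 1000));
```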
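You can set the following environment variable to get the debug logs.
Getting debug logs from the core-amqp library:
export DEBUG=azure:core-amqp*
Getting debug logs from the core-amqp library and the protocol-level library (rhea):
export DEBUG=azure:core-amqp*,rhea*
If you are not interested in the raw, message, and data transformer logs, set the DEBUG environment variable as follows:
export DEBUG=azure:core-amqp*,rhea*,-rhea:raw,-rhea:message,-azure:core-amqp:datatransformer
If you are interested only in errors, set the DEBUG environment variable as follows:
export DEBUG=azure-core-amqp:error,rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow
To log to a file, set the DEBUG environment variable as shown above and then run your test script as follows:
Logging statements from your test script go to out.log and logging statements from the SDK go to debug.log.
node your-test-script.js > out.log 2>debug.log
Logging statements from your test script and the SDK go to the same file, out.log, by redirecting stderr to stdout (&1) and then redirecting stdout to a file:
node your-test-script.js >out.log 2>&1
Logging statements from your test script and the SDK go to the same file, out.log.
node your-test-script.js &> out.log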
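To see which logger namespaces a given DEBUG value selects, the sketch below implements a simplified matcher: comma-separated patterns, * as a wildcard, and a leading - to exclude a namespace. The function createMatcher is written for this illustration and only approximates the behavior of the underlying logging package.

```javascript
// Simplified, illustrative matcher for DEBUG patterns (an approximation,
// not the logging package's own implementation).
function createMatcher(debugValue) {
  const include = [];
  const exclude = [];
  for (const raw of debugValue.split(/[\s,]+/)) {
    if (!raw) continue;
    const negated = raw.startsWith("-");
    const pattern = negated ? raw.slice(1) : raw;
    // Escape regex metacharacters, then turn "*" into a wildcard.
    const escaped = pattern
      .replace(/[.+?^${}()|[\]\\]/g, "\\$&")
      .replace(/\*/g, ".*");
    (negated ? exclude : include).push(new RegExp(`^${escaped}$`));
  }
  // A namespace is enabled if it matches an include pattern and no exclude pattern.
  return (namespace) =>
    !exclude.some((r) => r.test(namespace)) && include.some((r) => r.test(namespace));
}

const isEnabled = createMatcher(
  "azure:core-amqp*,rhea*,-rhea:raw,-rhea:message,-azure:core-amqp:datatransformer"
);
for (const ns of ["rhea:frames", "rhea:raw", "azure:core-amqp:error"]) {
  console.log(ns, isEnabled(ns));
}
```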
Please take a look at the samples directory for detailed samples.
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
If you'd like to contribute to this library, please read the contributing guide to learn more about how to build and test the code.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.
FAQs
The npm package @azure/core-amqp receives a total of 121,348 weekly downloads. As such, @azure/core-amqp popularity was classified as popular.
We found that @azure/core-amqp demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 2 open source maintainers collaborating on the project.