@azure/event-hubs
Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This lets you process and analyze the massive amounts of data produced by your connected devices and applications. If you would like to know more about Azure Event Hubs, you may wish to review: What is Event Hubs?
The Azure Event Hubs client library allows you to send and receive events in your Node.js application.
Source code | Package (npm) | API Reference Documentation | Product documentation | Samples
NOTE: If you are using version 2.1.0 or lower, then please use the below links instead
Source code for v2.1.0 | Package for v2.1.0 (npm) | Samples for v2.1.0
Install the Azure Event Hubs client library using npm
npm install @azure/event-hubs
Prerequisites: You must have an Azure subscription and an Event Hubs namespace to use this package. If you are using this package in a Node.js application, then use Node.js 8.x or higher.
TypeScript users need to have Node type definitions installed:
npm install @types/node
You also need to enable compilerOptions.allowSyntheticDefaultImports in your tsconfig.json. Note that if you have enabled compilerOptions.esModuleInterop, allowSyntheticDefaultImports is enabled by default. See TypeScript's compiler options handbook for more information.
Interaction with Event Hubs starts with either an instance of the EventHubConsumerClient class or an instance of the EventHubProducerClient class. There are constructor overloads to support different ways of instantiating these classes as shown below:
const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");
const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string",
  "my-event-hub"
);
This constructor takes a connection string of the form
Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;
and the entity name of your Event Hub instance. You can create a consumer group and get the connection string and the entity name from the Azure portal.
const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");
const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string-with-entity-path"
);
If the connection string does not already contain the path to the Event Hub, append ;EntityPath=[[ EVENT HUB NAME ]] to the end of the connection string. For example, ;EntityPath=my-event-hub-name.
If you have defined a shared access policy directly on the Event Hub itself, then copying the connection string from that Event Hub will result in a connection string that contains the path.
const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");
const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-host-name",
  "my-event-hub",
  credential
);
The credential must implement the TokenCredential interface; implementations are available in the @azure/identity package. The host name is of the format <yournamespace>.servicebus.windows.net. When using Azure Active Directory, your principal must be assigned a role which allows access to Event Hubs, such as the Azure Event Hubs Data Owner role. For more information about using Azure Active Directory authorization with Event Hubs, please refer to the associated documentation.
An Event Hub client is the primary interface for developers interacting with the Event Hubs client library, allowing for inspection of Event Hub metadata and providing a guided experience towards specific Event Hub operations such as the creation of producers and consumers.
An Event Hub producer is a source of telemetry data, diagnostics information, usage logs, or other log data, as part of an embedded device solution, a mobile device application, a game title running on a console or other device, some client or server based business solution, or a web site.
An Event Hub consumer picks up such information from the Event Hub and processes it. Processing may involve aggregation, complex computation and filtering. Processing may also involve distribution or storage of the information in a raw or transformed fashion. Event Hub consumers are often robust and high-scale platform infrastructure parts with built-in analytics capabilities, like Azure Stream Analytics, Apache Spark, or Apache Storm.
A partition is an ordered sequence of events that is held in an Event Hub. Partitions are a means of data organization associated with the parallelism required by event consumers. Azure Event Hubs provides message streaming through a partitioned consumer pattern in which each consumer only reads a specific subset, or partition, of the message stream. As newer events arrive, they are added to the end of this sequence. The number of partitions is specified at the time an Event Hub is created and cannot be changed.
A consumer group is a view of an entire Event Hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and from their own position. There can be at most 5 concurrent readers on a partition per consumer group; however, it is recommended that there is only one active consumer for a given partition and consumer group pairing. Each active reader receives all of the events from its partition; if there are multiple readers on the same partition, then they will receive duplicate events.
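The duplicate-delivery behaviour described above can be pictured with a small in-memory model. This is only an illustrative sketch, not how the Event Hubs service is implemented: the Partition and Reader classes are hypothetical names, and each reader simply tracks its own position in an append-only list.

```javascript
// Toy in-memory model of one partition; not the Event Hubs implementation.
class Partition {
  constructor() {
    this.events = []; // ordered sequence; new events are appended at the end
  }
  append(body) {
    this.events.push(body);
  }
}

// Hypothetical reader: each one tracks its own position in the partition.
class Reader {
  constructor(partition) {
    this.partition = partition;
    this.position = 0;
  }
  // Returns every event this reader has not yet seen and advances its position.
  readAll() {
    const unseen = this.partition.events.slice(this.position);
    this.position = this.partition.events.length;
    return unseen;
  }
}

const partition = new Partition();
["a", "b", "c"].forEach((body) => partition.append(body));

// Two readers on the same partition: both receive every event.
const readerOne = new Reader(partition);
const readerTwo = new Reader(partition);
const seenByOne = readerOne.readAll();
const seenByTwo = readerTwo.readAll();
console.log(seenByOne, seenByTwo);
```

Because the positions are independent, neither reader's progress affects the other, which is why running more than one active reader per partition and consumer group leads to duplicated processing.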
For more concepts and deeper discussion, see: Event Hubs Features
The EventHubConsumerClient and EventHubProducerClient accept options where you can set the retryOptions that allow you to tune how the SDK handles transient errors. Examples of transient errors include temporary network or service issues.
If a transient error (e.g. a temporary network issue) is encountered while the SDK is receiving events, it will retry receiving events based on the retry options passed into the EventHubConsumerClient. If the maximum retry attempts are exhausted, the processError function will be invoked.
You can use the retry settings to control how quickly you are informed about temporary issues such as a network connection issue. For example, if you need to know right away when there is a network issue, you can lower the values for maxRetries and retryDelayInMs.
After executing the processError function, the client invokes the user-provided processClose function. This function is also invoked when either you stop the subscription or when the client stops reading events from the current partition due to it being picked up by another instance of your application as part of load balancing. The processClose function provides an opportunity to update checkpoints if needed.
After executing processClose, the client (or in the case of load balancing, a client from another instance of your application) will invoke the user-provided processInitialize function to resume reading events from the last updated checkpoint for the same partition.
If you wish to stop attempting to read events, you must call close() on the subscription returned by the subscribe method.
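The retry settings and the handler lifecycle described above can be sketched together as follows. The retry values are illustrative only, and the connection string, Event Hub name and consumer group are placeholders. The package is required inside main() so that the option and handler objects can be inspected without it, and main() is deliberately left uncalled.

```javascript
// Retry settings chosen to surface network problems quickly (values are illustrative).
const clientOptions = {
  retryOptions: {
    maxRetries: 1, // give up after a single retry
    retryDelayInMs: 500 // wait half a second before retrying
  }
};

// The four user-provided callbacks mentioned in the text.
const handlers = {
  processInitialize: async (context) => {
    console.log(`Starting to read partition ${context.partitionId}`);
  },
  processEvents: async (events, context) => {
    // event processing code goes here
  },
  processError: async (err, context) => {
    // Invoked once the retry attempts are exhausted.
    console.error(`Error on partition ${context.partitionId}: ${err.message}`);
  },
  processClose: async (reason, context) => {
    // Last opportunity to update checkpoints for this partition.
    console.log(`Stopped reading partition ${context.partitionId}: ${reason}`);
  }
};

async function main() {
  // Required lazily so the objects above can be used without the package installed.
  const { EventHubConsumerClient } = require("@azure/event-hubs");
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName",
    clientOptions
  );
  const subscription = client.subscribe(handlers);
  // When ready to stop attempting to read events
  await subscription.close();
  await client.close();
}
```

Lower values for maxRetries and retryDelayInMs mean processError fires sooner, at the cost of giving short-lived network blips less time to recover on their own.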
The following sections provide code snippets that cover some of the common tasks using Azure Event Hubs.
Many Event Hub operations take place within the scope of a specific partition. Because partitions are owned by the Event Hub, their names are assigned at the time of creation. To understand what partitions are available, you query the Event Hub using either of the two clients available: EventHubProducerClient or EventHubConsumerClient.
In the below example, we are using an EventHubProducerClient.
const { EventHubProducerClient } = require("@azure/event-hubs");
async function main() {
  const client = new EventHubProducerClient("connectionString", "eventHubName");
  const partitionIds = await client.getPartitionIds();
  await client.close();
}
main();
In order to publish events, you'll need to create an EventHubProducerClient. While the below example shows one way to create the client, see the Authenticate the client section to learn other ways to instantiate the client.
You may publish events to a specific partition, or allow the Event Hubs service to decide which partition events should be published to. It is recommended to use automatic routing when the publishing of events needs to be highly available or when event data should be distributed evenly among the partitions. In the example below, we will take advantage of automatic routing.
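Create an EventDataBatch object using the createBatch method, then add events to the batch using the tryAdd method. The tryAdd method returns false to indicate that no more events can be added to the batch due to the max batch size being reached. Send the batch using the sendBatch method.
In the below example, we attempt to send 10 events to Azure Event Hubs.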
const { EventHubProducerClient } = require("@azure/event-hubs");
async function main() {
  const producerClient = new EventHubProducerClient("connectionString", "eventHubName");
  const eventDataBatch = await producerClient.createBatch();
  let numberOfEventsToSend = 10;
  while (numberOfEventsToSend > 0) {
    let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
    if (!wasAdded) {
      break;
    }
    numberOfEventsToSend--;
  }
  await producerClient.sendBatch(eventDataBatch);
  await producerClient.close();
}
main();
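The example above stops adding events as soon as tryAdd returns false, so any remaining events are never sent. One possible way to handle that, sketched under the assumption that sending several batches is acceptable, is to send the full batch and continue in a new one. sendAllEvents is a hypothetical helper, not part of the library; it only relies on createBatch, tryAdd, count and sendBatch.

```javascript
// Sends every event, starting a new batch whenever the current one is full.
// `producer` is expected to expose createBatch() and sendBatch() as
// EventHubProducerClient does; sendAllEvents itself is a hypothetical helper.
async function sendAllEvents(producer, events) {
  let batch = await producer.createBatch();
  let batchesSent = 0;
  for (const event of events) {
    if (batch.tryAdd(event)) {
      continue;
    }
    if (batch.count === 0) {
      // Even an empty batch could not hold this event.
      throw new Error("Event is too large to fit in an empty batch");
    }
    // The batch is full: send it, then retry the event in a fresh batch.
    await producer.sendBatch(batch);
    batchesSent++;
    batch = await producer.createBatch();
    if (!batch.tryAdd(event)) {
      throw new Error("Event is too large to fit in an empty batch");
    }
  }
  // Send whatever is left in the final, partially filled batch.
  if (batch.count > 0) {
    await producer.sendBatch(batch);
    batchesSent++;
  }
  return batchesSent;
}
```

A caller would pass an EventHubProducerClient and an array such as [{ body: "my-event-body" }], then close the client once the returned promise resolves.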
There are options you can pass at different stages to control the process of sending events to Azure Event Hubs.
The EventHubProducerClient constructor takes an optional parameter of type EventHubClientOptions which you can use to specify options like number of retries.
The createBatch method takes an optional parameter of type CreateBatchOptions which you can use to specify the max batch size supported by the batch being created.
The sendBatch method takes an optional parameter of type SendBatchOptions which you can use to specify abortSignal to cancel the current operation.
In case you want to send to a specific partition, the sendBatch method allows you to pass the id of the partition to send events to. The Inspect an Event Hub example above shows how to fetch the available partition ids.
Note: When working with Azure Stream Analytics, the body of the event being sent should be a JSON object as well.
For example: body: { "message": "Hello World" }
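The per-stage options can be combined roughly as sketched below. This assumes a library version in which CreateBatchOptions accepts partitionId alongside maxSizeInBytes, which may differ from the overload described above; the size limit is illustrative and sendToPartition is a hypothetical helper.

```javascript
// Hypothetical helper: publishes JSON bodies to one partition.
// `producer` is expected to behave like an EventHubProducerClient.
async function sendToPartition(producer, partitionId, bodies, abortSignal) {
  // CreateBatchOptions: pin the batch to a partition and cap its size in bytes.
  // Assumption: this library version accepts partitionId here.
  const batch = await producer.createBatch({
    partitionId,
    maxSizeInBytes: 64 * 1024 // illustrative limit
  });
  let added = 0;
  for (const body of bodies) {
    // Each body is a JSON object, as Azure Stream Analytics expects.
    if (!batch.tryAdd({ body })) {
      break; // batch is full; remaining bodies are not sent by this sketch
    }
    added++;
  }
  // SendBatchOptions: abortSignal lets the caller cancel the send.
  await producer.sendBatch(batch, { abortSignal });
  return added;
}
```

A call might look like sendToPartition(producerClient, partitionIds[0], [{ message: "Hello World" }], controller.signal), where controller is an AbortController supplied by the caller.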
To consume events from an Event Hub instance, you also need to know which consumer group you want to target. Once you know this, you are ready to create an EventHubConsumerClient. While the below example shows one way to create the client, see the Authenticate the client section to learn other ways to instantiate the client.
The subscribe method on the client has overloads which, combined with the constructor, can cater to several ways to consume events: in a single process, with the load balanced across multiple processes, or from a single partition.
The subscribe method takes an optional parameter of type SubscribeOptions which you can use to specify options like the maxBatchSize (number of events to wait for) and maxWaitTimeInSeconds (amount of time to wait for maxBatchSize events to arrive).
Begin by creating an instance of the EventHubConsumerClient, and then call the subscribe() method on it to start consuming events.
The subscribe method takes callbacks to process events as they are received from Azure Event Hubs. To stop receiving events, you can call close() on the object returned by the subscribe() method.
const { EventHubConsumerClient } = require("@azure/event-hubs");
async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );
  const subscription = client.subscribe({
    processEvents: (events, context) => {
      // event processing code goes here
    },
    processError: (err, context) => {
      // error reporting/handling code here
    }
  });
  // When ready to stop receiving
  await subscription.close();
  await client.close();
}
main();
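In the example above the subscription is closed right after it is created, so in practice there would be a gap between the two calls. The sketch below shows one way to keep reading for a fixed period while also passing the maxBatchSize and maxWaitTimeInSeconds options mentioned earlier. consumeFor is a hypothetical helper and the option values are illustrative.

```javascript
// Hypothetical helper: subscribes, keeps reading for `durationMs`, then stops.
// `client` is expected to behave like an EventHubConsumerClient.
async function consumeFor(client, durationMs, onEvents) {
  const subscription = client.subscribe(
    {
      processEvents: async (events, context) => onEvents(events, context),
      processError: async (err, context) => {
        console.error(`Error on partition ${context.partitionId}:`, err);
      }
    },
    {
      maxBatchSize: 50, // number of events to wait for (illustrative)
      maxWaitTimeInSeconds: 5 // how long to wait for them (illustrative)
    }
  );
  // Leave the subscription open for a while before closing it.
  await new Promise((resolve) => setTimeout(resolve, durationMs));
  await subscription.close();
}
```

For instance, consumeFor(client, 30000, (events) => console.log(events.length)) would read for about thirty seconds, after which the caller can close the client.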
Azure Event Hubs is capable of dealing with millions of events per second. To scale your processing application, you can run multiple instances of your application and have them balance the load among themselves.
Begin by creating an instance of the EventHubConsumerClient using one of the constructor overloads that take a CheckpointStore, and then call the subscribe() method to start consuming events. The checkpoint store will enable the subscribers within a consumer group to coordinate the processing between multiple instances of your application.
In this example, we will use the BlobCheckpointStore from the @azure/eventhubs-checkpointstore-blob package, which implements the required reads and writes to a durable store by using Azure Blob Storage.
The subscribe method takes callbacks to process events as they are received from Azure Event Hubs. To stop receiving events, you can call close() on the object returned by the subscribe() method.
const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");
async function main() {
  const blobContainerClient = new ContainerClient("storage-connection-string", "container-name");
  await blobContainerClient.create(); // This can be skipped if the container already exists
  const checkpointStore = new BlobCheckpointStore(blobContainerClient);
  const consumerClient = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName",
    checkpointStore
  );
  const subscription = consumerClient.subscribe({
    processEvents: (events, context) => {
      // event processing code goes here
    },
    processError: (err, context) => {
      // error reporting/handling code here
    }
  });
  // When ready to stop receiving
  await subscription.close();
  await consumerClient.close();
}
main();
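A checkpoint store only helps if the handlers record their progress. As a minimal sketch, a processEvents handler might checkpoint the last event of each batch through the context's updateCheckpoint method. How often to checkpoint is an application choice; doing it after every batch is just one assumption made here.

```javascript
// Handler that records progress after each non-empty batch.
async function processEvents(events, context) {
  if (events.length === 0) {
    return; // nothing arrived within the wait time
  }
  for (const event of events) {
    // event processing code goes here
  }
  // Persist the position of the last event through the configured checkpoint store.
  await context.updateCheckpoint(events[events.length - 1]);
}
```

This function can be passed as the processEvents callback in the subscribe call, so that another instance picking up the partition resumes after the last checkpointed event rather than from the beginning.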
Begin by creating an instance of the EventHubConsumerClient, and then call the subscribe() method on it to start consuming events. Pass the id of the partition you want to target to the subscribe() method to consume only from that partition.
In the below example, we are using the first partition.
The subscribe method takes callbacks to process events as they are received from Azure Event Hubs. To stop receiving events, you can call close() on the object returned by the subscribe() method.
const { EventHubConsumerClient } = require("@azure/event-hubs");
async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );
  const partitionIds = await client.getPartitionIds();
  const subscription = client.subscribe(partitionIds[0], {
    processEvents: (events, context) => {
      // event processing code goes here
    },
    processError: (err, context) => {
      // error reporting/handling code here
    }
  });
  // When ready to stop receiving
  await subscription.close();
  await client.close();
}
main();
You can use EventHubConsumerClient to work with IoT Hub as well. This is useful for receiving IoT Hub telemetry data from the linked Event Hub. The associated connection string will not have send claims, hence sending events is not possible.
const { EventHubConsumerClient } = require("@azure/event-hubs");
async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
  );
  await client.getEventHubProperties();
  // retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
  const partitionId = "0";
  await client.getPartitionProperties(partitionId);
  await client.close();
}
main();
The Event Hubs library depends on the rhea-promise library for managing connections, sending and receiving events over the AMQP protocol.
You can set the AZURE_LOG_LEVEL environment variable to one of the following values to enable logging to stderr: verbose, info, warning, error.
You can also set the log level programmatically by importing the @azure/logger package and calling the setLogLevel function with one of the log level values.
When setting a log level either programmatically or via the AZURE_LOG_LEVEL environment variable, any logs that are written using a log level equal to or less than the one you choose will be emitted. For example, when you set the log level to info, the logs that are written for levels warning and error are also emitted.
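The "equal to or less than" rule can be pictured with a small model. This is a toy sketch of the filtering rule only, not the @azure/logger implementation; the ordering of the four levels from least to most detailed is the assumption being illustrated, and isEmitted is a hypothetical function.

```javascript
// Toy model of level filtering; not the @azure/logger implementation.
const LEVELS = ["error", "warning", "info", "verbose"]; // least to most detailed

// Returns true when a message written at `messageLevel` is emitted
// under the chosen log level.
function isEmitted(chosenLevel, messageLevel) {
  const chosen = LEVELS.indexOf(chosenLevel);
  const message = LEVELS.indexOf(messageLevel);
  if (chosen === -1 || message === -1) {
    throw new Error("Unknown log level");
  }
  return message <= chosen;
}

// Levels that are emitted when the chosen level is info.
console.log(LEVELS.filter((level) => isEmitted("info", level)));
```

Under this model, choosing verbose emits everything, while choosing error emits only error-level messages.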
This SDK follows the Azure SDK for TypeScript guidelines when determining which level to log to.
You can alternatively set the DEBUG environment variable to get logs when using this library. This can be useful if you also want to emit logs from the dependencies rhea-promise and rhea.
Note: AZURE_LOG_LEVEL, if set, takes precedence over DEBUG. Do not specify any azure libraries via DEBUG when also specifying AZURE_LOG_LEVEL or calling setLogLevel.
To get only info level logs from the azure libraries:
export DEBUG=azure:*:info
To get logs from the azure libraries and from rhea and rhea-promise:
export DEBUG=azure*,rhea*
To leave out the rhea:raw and rhea:message output, set the DEBUG environment variable as follows:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
To get only errors and warnings from the azure libraries, along with selected rhea output, set the DEBUG environment variable as follows:
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow
Enable logging as shown above and then run your test script as follows:
Logging statements from your test script go to out.log and logging statements from the sdk go to debug.log.
node your-test-script.js > out.log 2>debug.log
Logging statements from your test script and the sdk go to the same file out.log by redirecting stderr to stdout (&1), and then redirecting stdout to a file:
node your-test-script.js >out.log 2>&1
Logging statements from your test script and the sdk go to the same file out.log.
node your-test-script.js &> out.log
Please take a look at the samples directory for detailed examples of how to use this library to send and receive events to/from Event Hubs.
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.
If you'd like to contribute to this library, please read the contributing guide to learn more about how to build and test the code.