Azure Event Hubs client library for JavaScript
Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This lets you process and analyze the massive amounts of data produced by your connected devices and applications. If you would like to know more about Azure Event Hubs, you may wish to review: What is Event Hubs?
The Azure Event Hubs client library allows you to send and receive events in your Node.js application.
Source code | Package (npm) | API Reference Documentation | Product documentation | Samples
NOTE: If you are using version 2.1.0 or lower, then please use the links below instead
Source code for v2.1.0 | Package for v2.1.0 (npm) | Samples for v2.1.0
Getting Started
Install the package
Install the Azure Event Hubs client library using npm
npm install @azure/event-hubs@5.0.0-preview.2
Prerequisites: You must have an Azure subscription and an Event Hubs namespace to use this package.
If you are using this package in a Node.js application, then use Node.js 8.x or higher.
Configure TypeScript
TypeScript users need to have Node type definitions installed:
npm install @types/node
You also need to enable compilerOptions.allowSyntheticDefaultImports in your tsconfig.json. Note that if you have enabled compilerOptions.esModuleInterop, then allowSyntheticDefaultImports is enabled by default. See TypeScript's compiler options handbook for more information.
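For reference, a minimal tsconfig.json sketch with this option enabled could look like the following (the target and module values are illustrative examples, not requirements of this library):
{
  "compilerOptions": {
    // "target" and "module" are example values; only allowSyntheticDefaultImports matters for this note
    "target": "es2017",
    "module": "commonjs",
    "allowSyntheticDefaultImports": true
  }
}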
Key concepts
- An Event Hub client is the primary interface for developers interacting with the Event Hubs client library, allowing for inspection of Event Hub metadata and providing a guided experience towards specific Event Hub operations such as the creation of producers and consumers.
- An Event Hub producer is a source of telemetry data, diagnostics information, usage logs, or other log data, as part of an embedded device solution, a mobile device application, a game title running on a console or other device, some client or server based business solution, or a web site.
- An Event Hub consumer picks up such information from the Event Hub and processes it. Processing may involve aggregation, complex computation, and filtering. Processing may also involve distribution or storage of the information in a raw or transformed fashion. Event Hub consumers are often robust and high-scale platform infrastructure parts with built-in analytics capabilities, like Azure Stream Analytics, Apache Spark, or Apache Storm.
- A partition is an ordered sequence of events that is held in an Event Hub. Partitions are a means of data organization associated with the parallelism required by event consumers. Azure Event Hubs provides message streaming through a partitioned consumer pattern in which each consumer only reads a specific subset, or partition, of the message stream. As newer events arrive, they are added to the end of this sequence. The number of partitions is specified at the time an Event Hub is created and cannot be changed.
- A consumer group is a view of an entire Event Hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and from their own position. There can be at most 5 concurrent readers on a partition per consumer group; however, it is recommended that there is only one active consumer for a given partition and consumer group pairing. Each active reader receives all of the events from its partition; if there are multiple readers on the same partition, then they will receive duplicate events.
For more concepts and deeper discussion, see: Event Hubs Features
Authenticate the client
Interaction with Event Hubs starts with an instance of the EventHubClient class. You can instantiate this class using one of the constructor overloads below.
const client = new EventHubClient("my-connection-string", "my-event-hub");
- This constructor takes the connection string of the form 'Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;' and entity name to your Event Hub instance. You can get the connection string from the Azure portal.
const client = new EventHubClient("my-connection-string-with-entity-path");
- The connection string from the Azure Portal is for the entire Event Hubs namespace and will not contain the path to the desired Event Hub instance which is needed for this constructor overload. In this case, the path can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]]" to the end of the connection string. For example, ";EntityPath=my-event-hub-name".
If you have defined a shared access policy directly on the Event Hub itself, then copying the connection string from that Event Hub will result in a connection string that contains the path.
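To illustrate appending the entity path described above, a namespace-level connection string with EntityPath added manually can be passed to this overload (the key name, key, and Event Hub name below are placeholders):
// the values in this connection string are placeholders; EntityPath has been appended manually
const connectionString =
  "Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-name";
const client = new EventHubClient(connectionString);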
const { EventHubClient } = require("@azure/event-hubs");
const { DefaultAzureCredential } = require("@azure/identity");

const credential = new DefaultAzureCredential();
const client = new EventHubClient("my-host-name", "my-event-hub", credential);
- This constructor takes the host name and entity name of your Event Hub instance and a credential that implements the TokenCredential interface. There are implementations of the TokenCredential interface available in the @azure/identity package. The host name is of the format <yournamespace>.servicebus.windows.net.
Examples
The following sections provide code snippets that cover some of the common tasks using Azure Event Hubs.
Inspect an Event Hub
Many Event Hub operations take place within the scope of a specific partition.
Because partitions are owned by the Event Hub, their names are assigned at the time of creation.
To understand what partitions are available, you query the Event Hub using the client.
const client = new EventHubClient("connectionString", "eventHubName");
const partitionIds = await client.getPartitionIds();
Publish events to an Event Hub
In order to publish events, you'll need to create an EventHubProducer. Producers may be dedicated to a specific partition, or allow the Event Hubs service to decide which partition events should be published to. It is recommended to use automatic routing when the publishing of events needs to be highly available or when event data should be distributed evenly among the partitions. In the examples below, we will take advantage of automatic routing.
Send a single event or an array of events
Use the send method to send a single event or multiple events using a single call.
const client = new EventHubClient("connectionString", "eventHubName");
const producer = client.createProducer();
await producer.send({ body: "my-event-body" });
await producer.send([{ body: "foo" }, { body: "bar" }]);
Send a batch of events
Use the createBatch method to create an EventDataBatch object which can then be sent using the send method. Events may be added to the EventDataBatch using the tryAdd method until the maximum batch size limit in bytes has been reached.
const client = new EventHubClient("connectionString", "eventHubName");
const producer = client.createProducer();
const eventDataBatch = await producer.createBatch();
// tryAdd returns false if the event cannot fit within the batch size limit
let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
wasAdded = eventDataBatch.tryAdd({ body: "my-event-body-2" });
await producer.send(eventDataBatch);
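If you have more events than fit in a single batch, a common pattern is to send the batch whenever tryAdd reports it is full and then start a new one. The sketch below assumes the client from the snippet above and a hypothetical allEvents array of events you want to send:
const producer = client.createProducer();
let batch = await producer.createBatch();
// allEvents is a hypothetical array of events, e.g. [{ body: "foo" }, { body: "bar" }]
for (const event of allEvents) {
  if (!batch.tryAdd(event)) {
    // the current batch is full; send it and start a new one for the remaining events
    await producer.send(batch);
    batch = await producer.createBatch();
    if (!batch.tryAdd(event)) {
      throw new Error("Event is too large to fit in an empty batch.");
    }
  }
}
// send whatever is left in the final batch
await producer.send(batch);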
The Inspect an Event Hub example shows how to get the list of partition ids should you wish to specify one for a producer.
The createProducer method takes an optional parameter of type EventHubProducerOptions which you can use to specify the retry options and partition id for the send operation.
The send method takes an optional parameter of type SendOptions which you can use to specify an abortSignal to cancel the current operation. You can also specify a partitionKey if you did not specify a partition id when creating the producer. All events that use the same partition key will be sent to the same partition.
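As a sketch of these options (the partition id "0", the partition key value, and the event bodies are arbitrary placeholders, and client is assumed from the snippets above):
// producer pinned to a specific partition via EventHubProducerOptions; "0" is a placeholder partition id
const partitionProducer = client.createProducer({ partitionId: "0" });
await partitionProducer.send({ body: "my-event-body" });

// producer using automatic routing; events sharing the placeholder partition key land on the same partition
const routingProducer = client.createProducer();
await routingProducer.send({ body: "my-event-body-2" }, { partitionKey: "device-42" });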
Note: When working with Azure Stream Analytics, the body of the event being sent should be a JSON object as well.
For example: body: { "message": "Hello World" }
Consume events from an Event Hub partition
To consume events from a single Event Hub partition in a consumer group, create an EventHubConsumer for that partition and consumer group combination. You will need to provide a position in the event stream from where to begin receiving events; in our example, we will read new events as they are published.
const client = new EventHubClient("connectionString", "eventHubName");
const partitionIds = await client.getPartitionIds();
const consumer = client.createConsumer(
  EventHubClient.defaultConsumerGroupName,
  partitionIds[0],
  EventPosition.latest()
);
The Inspect an Event Hub example shows how to get the list of partition ids.
The createConsumer method takes an optional parameter of type EventHubConsumerOptions which you can use to specify the ownerLevel, the level that this consumer is currently using for partition ownership. If another consumer is currently active for the same partition with no level or a lower level, then it will get disconnected. If another consumer is currently active with a higher level, then this consumer will fail to connect. You can also specify retryOptions for the receive operation on the consumer.
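For illustration, a consumer that asserts exclusive ownership of a partition could be created as follows (a sketch; the ownerLevel value is arbitrary and partitionIds comes from the snippet above):
const exclusiveConsumer = client.createConsumer(
  EventHubClient.defaultConsumerGroupName,
  partitionIds[0],
  EventPosition.latest(),
  { ownerLevel: 100 } // arbitrary value; a higher level wins ownership of the partition
);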
You can use this consumer in one of 3 ways to receive events:
Get an array of events
Use the receiveBatch function which returns a promise that resolves to an array of events. This function takes an optional parameter called abortSignal to cancel the current operation.
const maxMessageCount = 10;
const myEvents = await consumer.receiveBatch(maxMessageCount);
Register event handler
Use the receive method to set up event handlers and have it running for as long as you need. This function takes an optional parameter called abortSignal to cancel the current operation.
const myEventHandler = (event) => {
  // your code here
};
const myErrorHandler = (error) => {
  // your error handling code here
};
const receiveHandler = consumer.receive(myEventHandler, myErrorHandler);
await receiveHandler.stop();
Use async iterator
Use the getEventIterator method to get an async iterator over events. This method takes an optional EventIteratorOptions parameter that includes abortSignal to cancel the current operation.
for await (const events of consumer.getEventIterator()) {
  // your code here
}
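As a sketch of cancelling the iteration (the 30 second timeout is arbitrary, and the AbortController here comes from the @azure/abort-controller package):
const { AbortController } = require("@azure/abort-controller");

// stop iterating after 30 seconds; the timeout value is arbitrary
const abortController = new AbortController();
setTimeout(() => abortController.abort(), 30 * 1000);

try {
  for await (const events of consumer.getEventIterator({ abortSignal: abortController.signal })) {
    // your code here
  }
} catch (err) {
  // an abort typically surfaces as an error; check err.name === "AbortError" to tell it apart from failures
}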
Consume events using an Event Processor
Using an EventHubConsumer to consume events as in the previous examples puts the responsibility of storing the checkpoints (the last processed event) on the user. Checkpoints are important for restarting the task of processing events from the right position in a partition. Ideally, you would also want to run multiple programs targeting different partitions with some load balancing.
This is where an EventProcessor can help.
The EventProcessor delegates the processing of events to a PartitionProcessor that you provide, allowing you to focus on business logic while the processor holds responsibility for managing the underlying consumer operations, including checkpointing and load balancing.
While load balancing is a feature we will be adding in the next update, you can see how to use the EventProcessor in the example below, where we use an InMemoryPartitionManager that does checkpointing in memory.
class SimplePartitionProcessor {
  async initialize() {
    // called once before events from the partition are processed
  }
  async processEvents(events) {
    // process the batch of events received from the partition
  }
  async processError(error) {
    // handle any error that occurs while receiving events
  }
  async close(reason) {
    // called when processing for the partition stops; `reason` indicates why
  }
}
const client = new EventHubClient("my-connection-string", "my-event-hub");
const processor = new EventProcessor(
  EventHubClient.defaultConsumerGroupName,
  client,
  (partitionContext, checkpointManager) => new SimplePartitionProcessor(),
  new InMemoryPartitionManager()
);
await processor.start();
// allow the processor to run for a while; delay stands in for however long you want to keep processing
await delay(5000);
await processor.stop();
To control the number of events passed to processEvents, use the options argument in the EventProcessor constructor.
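For example, assuming this preview's EventProcessorOptions expose maxBatchSize and maxWaitTimeInSeconds (treat the option names and values below as illustrative assumptions):
const processor = new EventProcessor(
  EventHubClient.defaultConsumerGroupName,
  client,
  (partitionContext, checkpointManager) => new SimplePartitionProcessor(),
  new InMemoryPartitionManager(),
  { maxBatchSize: 10, maxWaitTimeInSeconds: 20 } // assumed option names; adjust to the EventProcessorOptions in your version
);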
Note: In this model, you are responsible for closing the EventHubClient instance to dispose of it.
Use EventHubClient to work with IoT Hub
You can use EventHubClient to work with IoT Hub as well. This is useful for receiving telemetry data of your IoT Hub from the linked Event Hub.
The associated connection string will most likely not have send claims, so getting HubRuntimeInfo or PartitionRuntimeInfo and receiving events are the possible operations.
- Please note that we are awaiting the createFromIotHubConnectionString method to get an instance of the EventHubClient. This is different from other static methods on the client. The method talks to the IoT Hub endpoint to get a redirect error which contains the Event Hub endpoint to talk to. It then constructs the right Event Hub connection string based on the information in the redirect error and returns an instance of the EventHubClient that you can use.
const client = await EventHubClient.createFromIotHubConnectionString("connectionString");
await client.getProperties();
await client.getPartitionProperties("partitionId");
Note: For scalable and efficient receiving, please take a look at azure-event-processor-host. The Event Processor Host internally uses the streaming receiver to receive events.
Troubleshooting
AMQP Dependencies
The Event Hubs library depends on the rhea-promise library for managing connections, and for sending and receiving events over the AMQP protocol.
Enable logs
You can set the DEBUG environment variable as follows to get debug logs when using this library.
- Getting debug logs from the Event Hubs SDK
export DEBUG=azure*
- Getting debug logs from the Event Hubs SDK and the protocol level library.
export DEBUG=azure*,rhea*
- If you are not interested in viewing the event transformation (which consumes a lot of console/disk space), then you can set the DEBUG environment variable as follows:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message,-azure:amqp-common:datatransformer
- If you are interested only in errors, then you can set the DEBUG environment variable as follows:
export DEBUG=azure:event-hubs:error,azure-amqp-common:error,rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow
Logging to a file
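As a sketch (the script and log file names are placeholders), the debug-based logs shown above are typically written to stderr, so you can redirect them to a file when running your application:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message,-azure:amqp-common:datatransformer
# your-script.js, out.log and debug.log are placeholders; stdout goes to out.log, debug output to debug.log
node your-script.js > out.log 2> debug.log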
Next Steps
Please take a look at the samples directory for detailed examples on how to use this library to send and receive events to/from Event Hubs.
Contributing
If you'd like to contribute to this library, please read the contributing guide to learn more about how to build and test the code.