Azure Event Hubs client library for Javascript
Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This lets you process and analyze the massive amounts of data produced by your connected devices and applications. If you would like to know more about Azure Event Hubs, you may wish to review: What is Event Hubs?
The Azure Event Hubs client library allows you to send and receive events in your Node.js application.
Source code | Package (npm) | API Reference Documentation | Product documentation | Samples
NOTE: If you are using version 2.1.0 or lower, then please use the below links instead
Source code for v2.1.0 | Package for v2.1.0 (npm) | Samples for v2.1.0
Getting Started
Install the package
Install the Azure Event Hubs client library using npm
npm install @azure/event-hubs@next
Prerequisites: You must have an Azure subscription and a
Event Hubs Namespace to use this package.
If you are using this package in a Node.js application, then use Node.js 8.x or higher.
Configure Typescript
TypeScript users need to have Node type definitions installed:
npm install @types/node
You also need to enable compilerOptions.allowSyntheticDefaultImports
in your tsconfig.json. Note that if you have enabled compilerOptions.esModuleInterop
, allowSyntheticDefaultImports
is enabled by default. See TypeScript's compiler options handbook for more information.
Key concepts
-
An Event Hub client is the primary interface for developers interacting with the Event Hubs client library, allowing for inspection of Event Hub metadata and providing a guided experience towards specific Event Hub operations such as the creation of producers and consumers.
-
An Event Hub producer is a source of telemetry data, diagnostics information, usage logs, or other log data, as part of an embedded device solution, a mobile device application, a game title running on a console or other device, some client or server based business solution, or a web site.
-
An Event Hub consumer picks up such information from the Event Hub and processes it. Processing may involve aggregation, complex computation and filtering. Processing may also involve distribution or storage of the information in a raw or transformed fashion. Event Hub consumers are often robust and high-scale platform infrastructure parts with built-in analytics capabilities, like Azure Stream Analytics, Apache Spark, or Apache Storm.
-
A partition is an ordered sequence of events that is held in an Event Hub. Partitions are a means of data organization associated with the parallelism required by event consumers. Azure Event Hubs provides message streaming through a partitioned consumer pattern in which each consumer only reads a specific subset, or partition, of the message stream. As newer events arrive, they are added to the end of this sequence. The number of partitions is specified at the time an Event Hub is created and cannot be changed.
-
A consumer group is a view of an entire Event Hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and from their own position. There can be at most 5 concurrent readers on a partition per consumer group; however it is recommended that there is only one active consumer for a given partition and consumer group pairing. Each active reader receives all of the events from its partition; If there are multiple readers on the same partition, then they will receive duplicate events.
For more concepts and deeper discussion, see: Event Hubs Features
Authenticate the client
Interaction with Event Hubs starts with an instance of the EventHubClient class. You can instantiate
this class using one of the below
const client = new EventHubClient("my-connection-string", "my-event-hub");
- This constructor takes the connection string of the form 'Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;' and entity name to your Event Hub instance. You can get the connection string from the Azure portal.
const client = new EventHubClient("my-connection-string-with-entity-path");
- The connection string from the Azure Portal is for the entire Event Hubs namespace and will not contain the path to the desired Event Hub instance which is needed for this constructor overload. In this case, the path can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]]" to the end of the connection string. For example, ";EntityPath=my-event-hub-name".
If you have defined a shared access policy directly on the Event Hub itself, then copying the connection string from that Event Hub will result in a connection string that contains the path.
const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const client = new EventHubClient("my-host-name", "my-event-hub", credential);
- This constructor takes the host name and entity name of your Event Hub instance and credential that implements the TokenCredential interface. There are implementations of the
TokenCredential
interface available in the @azure/identity package. The host name is of the format <yournamespace>.servicebus.windows.net
.
Examples
The following sections provide code snippets that cover some of the common tasks using Azure Event Hubs
Inspect an Event Hub
Many Event Hub operations take place within the scope of a specific partition.
Because partitions are owned by the Event Hub, their names are assigned at the time of creation.
To understand what partitions are available, you query the Event Hub using the client.
const client = new EventHubCLient("connectionString", "eventHubName");
const partitionIds = await client.getPartitionIds();
Publish events to an Event Hub
In order to publish events, you'll need to create an EventHubProducer
. Producers may be dedicated to a specific partition, or allow the Event Hubs service to decide which partition events should be published to. It is recommended to use automatic routing when the publishing of events needs to be highly available or when event data should be distributed evenly among the partitions. In the below examples, we will take advantage of automatic routing.
Send a single event or an array of events
Use the send method to send a single event or multiple events using a single call.
const client = new EventHubClient("connectionString", "eventHubName");
const producer = client.createProducer();
await producer.send({ body: "my-event-body" });
await producer.send([{ body: "foo" }, { body: "bar" }]);
Send a batch of events
Use the createBatch method to create
an EventDataBatch
object which can then be sent using the send method.
Events may be added to the EventDataBatch
using the tryAdd
method until the maximum batch size limit in bytes has been reached.
const client = new EventHubClient("connectionString", "eventHubName");
const producer = client.createProducer();
const eventDataBatch = await producer.createBatch();
let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
wasAdded = eventDataBatch.tryAdd({ body: "my-event-body-2" });
await producer.send(eventDataBatch);
The Inspect an Event Hub example shows how to get the list of partition ids should you wish to specify one for a producer.
The createProducer
method takes an optional parameter of type EventHubProducerOptions which you can use to specify the retry options and partition id for the send operation.
The send
method takes an optional parameter of type SendOptions which you can use to specify abortSignal
to cancel current operation.
You can also specify partitionKey
if you did not specify a partition id when creating the producer.
All events that use the same partition key will be sent to the same partition.
Note: When working with Azure Stream Analytics, the body of the event being sent should be a JSON object as well.
For example: body: { "message": "Hello World" }
Consume events from an Event Hub partition
To consume events from a single Event Hub partition in a consumer group, create an EventHubConsumer
for that partition and consumer group combination. You will need to provide a position in the event stream from where to begin receiving events; in our example, we will read new events as they are published.
const client = new EventHubClient("connectionString", "eventHubName");
const consumer = client.createConsumer(
EventHubClient.defaultConsumerGroupName,
partitionIds[0],
EventPosition.latest()
);
The Inspect an Event Hub example shows how to get the list of partition ids.
The createConsumer
method takes an optional parameter of type EventHubConsumerOptions which you can use to specify the ownerLevel, the level that this consumer is currently using for partition ownership. If another consumer is currently active for the same partition with no or lower level, then it will get disconnected. If another consumer is currently active with a higher level, then this consumer will fail to connect. You can also specify retryOptions for the receive operation on the consumer.
You can use this consumer in one of 3 ways to receive events:
Get an array of events
Use the receiveBatch function which returns a promise that resolves to an array of events.
This function takes an optional parameter called abortSignal
to cancel current operation.
const maxMessageCount = 10;
const myEvents = await consumer.receiveBatch(maxMessageCount);
Register event handler
Use the receive to set up event handlers and have it running as long as you need.
This function takes an optional parameter called abortSignal
to cancel current operation.
const myEventHandler = (event) => {
};
const myErrorHandler = (error) => {
};
const receiveHandler = consumer.receive(myEventHandler, myErrorHandler);
await receiveHandler.stop();
Use async iterator
Use the getMessageIterator to get an async iterator over events.
This function takes an optional EventIteratorOptions parameter that includes abortSignal
to cancel the current operation.
for await (const events of consumer.getEventIterator()){
}
Consume events using an Event Processor
EventProcessor is a high level construct which internally uses the EventHubConsumer
which is mentioned in previous examples
to receive events from multiple partitions at once.
Typically, Event Processor based applications consist of one or more instances of EventProcessor instances which have been
configured to consume events from the same Event Hub and consumer group. They balance the
workload across different instances by distributing the partitions to be processed among themselves.
They also allow the user to track progress when events are processed using checkpoints.
The EventProcessor
will delegate the processing of events to a PartitionProcessor
that you provide, allowing you to focus on business logic while the EventProcessor
holds responsibility for managing the underlying consumer
operations including checkpointing and load balancing.
A checkpoint is meant to represent the last successfully processed event by the user from a particular
partition of a consumer group in an Event Hub instance. The EventProcessor
uses an instance of PartitionManager
to update checkpoints and to store the relevant information required by the load balancing algorithm.
While for the purposes of getting started you can use the InMemoryPartitionManager that is shipped out of the box from this library,
it is recommended to use a peristent store when running in production.
Search npm with the prefix @azure/eventhubs-checkpointstore-
to find packages that support this and use the PartitionManager
implementation from one such package.
In the below example, we create two instances of EventProcessor against the same Event Hub and consumer group,
using an InMemoryPartitionManager
.
class SamplePartitionProcessor extends PartitionProcessor {
async processEvents(events, partitionContext) {
}
}
const client = new EventHubClient("my-connection-string", "my-event-hub");
const partitionManager = new InMemoryPartitionManager();
const processor1 = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
SamplePartitionProcessor,
partitionManager
);
const processor2 = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
SamplePartitionProcessor,
partitionManager
);
await processor1.start();
await processor2.start();
await delay(30000);
await processor1.stop();
await processor2.stop();
To control the number of events passed to processEvents, use the options argument in the EventProcessor constructor.
Note: In this model, you are responsible for closing the EventHubClient
instance to dispose it.
Use EventHubClient to work with IotHub
You can use EventHubClient
to work with IotHub as well. This is useful for receiving telemetry data of IotHub from the linked EventHub.
The associated connection string will not have send claims,
hence sending events is not possible.
- Please notice that the connection string needs to be for an
Event Hub-compatible endpoint
e.g. "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
const client = new EventHubClient(
"Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
);
await client.getProperties();
const partitionId = "0";
await client.getPartitionProperties(partitionId);
Notes: For scalable and efficient receiving, please take a look at EventProcessor. The EventProcessor, internally uses the batched receiver to receive events.
Troubleshooting
AMQP Dependencies
The Event Hubs library depends on the rhea-promise library for managing connections, sending and receiving events over the AMQP protocol.
Enable logs
You can set the following environment variable to get the debug logs when using this library.
- Getting debug logs from the Event Hubs SDK
export DEBUG=azure*
- Getting debug logs from the Event Hubs SDK and the protocol level library.
export DEBUG=azure*,rhea*
- If you are not interested in viewing the event transformation (which consumes lot of console/disk space) then you can set the
DEBUG
environment variable as follows:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message,-azure:amqp-common:datatransformer
- If you are interested only in errors, then you can set the
DEBUG
environment variable as follows:
export DEBUG=azure:event-hubs:error,azure-amqp-common:error,rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow
Logging to a file
Next Steps
Please take a look at the samples
directory for detailed examples on how to use this library to send and receive events to/from
Event Hubs.
Contributing
If you'd like to contribute to this library, please read the contributing guide to learn more about how to build and test the code.