Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@azure/event-hubs

Package Overview
Dependencies
Maintainers
6
Versions
527
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@azure/event-hubs

Azure Event Hubs SDK for JS.

  • 5.0.0-preview.2
  • npm
  • Socket score

Version published
Weekly downloads
74K
increased by2.9%
Maintainers
6
Weekly downloads
 
Created
Source

Azure Event Hubs client library for Javascript

Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This lets you process and analyze the massive amounts of data produced by your connected devices and applications. If you would like to know more about Azure Event Hubs, you may wish to review: What is Event Hubs?

The Azure Event Hubs client library allows you to send and receive events in your Node.js application.

Source code | Package (npm) | API Reference Documentation | Product documentation | Samples

NOTE: If you are using version 2.1.0 or lower, then please use the below links instead

Source code for v2.1.0 | Package for v2.1.0 (npm) | Samples for v2.1.0

Getting Started

Install the package

Install the Azure Event Hubs client library using npm

npm install @azure/event-hubs@5.0.0-preview.2

Prerequisites: You must have an Azure subscription and a Event Hubs Namespace to use this package. If you are using this package in a Node.js application, then use Node.js 8.x or higher.

Configure Typescript

TypeScript users need to have Node type definitions installed:

npm install @types/node

You also need to enable compilerOptions.allowSyntheticDefaultImports in your tsconfig.json. Note that if you have enabled compilerOptions.esModuleInterop, allowSyntheticDefaultImports is enabled by default. See TypeScript's compiler options handbook for more information.

Key concepts

  • An Event Hub client is the primary interface for developers interacting with the Event Hubs client library, allowing for inspection of Event Hub metadata and providing a guided experience towards specific Event Hub operations such as the creation of producers and consumers.

  • An Event Hub producer is a source of telemetry data, diagnostics information, usage logs, or other log data, as part of an embedded device solution, a mobile device application, a game title running on a console or other device, some client or server based business solution, or a web site.

  • An Event Hub consumer picks up such information from the Event Hub and processes it. Processing may involve aggregation, complex computation and filtering. Processing may also involve distribution or storage of the information in a raw or transformed fashion. Event Hub consumers are often robust and high-scale platform infrastructure parts with built-in analytics capabilities, like Azure Stream Analytics, Apache Spark, or Apache Storm.

  • A partition is an ordered sequence of events that is held in an Event Hub. Partitions are a means of data organization associated with the parallelism required by event consumers. Azure Event Hubs provides message streaming through a partitioned consumer pattern in which each consumer only reads a specific subset, or partition, of the message stream. As newer events arrive, they are added to the end of this sequence. The number of partitions is specified at the time an Event Hub is created and cannot be changed.

  • A consumer group is a view of an entire Event Hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and from their own position. There can be at most 5 concurrent readers on a partition per consumer group; however it is recommended that there is only one active consumer for a given partition and consumer group pairing. Each active reader receives all of the events from its partition; If there are multiple readers on the same partition, then they will receive duplicate events.

For more concepts and deeper discussion, see: Event Hubs Features

Authenticate the client

Interaction with Event Hubs starts with an instance of the EventHubClient class. You can instantiate this class using one of the below

const client = new EventHubClient("my-connection-string", "my-event-hub");
  • This constructor takes the connection string of the form 'Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;' and entity name to your Event Hub instance. You can get the connection string from the Azure portal.
const client = new EventHubClient("my-connection-string-with-entity-path");
  • The connection string from the Azure Portal is for the entire Event Hubs namespace and will not contain the path to the desired Event Hub instance which is needed for this constructor overload. In this case, the path can be added manually by adding ";EntityPath=[[ EVENT HUB NAME ]]" to the end of the connection string. For example, ";EntityPath=my-event-hub-name".

If you have defined a shared access policy directly on the Event Hub itself, then copying the connection string from that Event Hub will result in a connection string that contains the path.

const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const client = new EventHubClient("my-host-name", "my-event-hub", credential);
  • This constructor takes the host name and entity name of your Event Hub instance and credential that implements the TokenCredential interface. There are implementations of the TokenCredential interface available in the @azure/identity package. The host name is of the format <yournamespace>.servicebus.windows.net.

Examples

The following sections provide code snippets that cover some of the common tasks using Azure Event Hubs

Inspect an Event Hub

Many Event Hub operations take place within the scope of a specific partition. Because partitions are owned by the Event Hub, their names are assigned at the time of creation. To understand what partitions are available, you query the Event Hub using the client.

const client = new EventHubCLient("connectionString", "eventHubName");
const partitionIds = await client.getPartitionIds();

Publish events to an Event Hub

In order to publish events, you'll need to create an EventHubProducer. Producers may be dedicated to a specific partition, or allow the Event Hubs service to decide which partition events should be published to. It is recommended to use automatic routing when the publishing of events needs to be highly available or when event data should be distributed evenly among the partitions. In the below examples, we will take advantage of automatic routing.

Send a single event or an array of events

Use the send method to send a single event or multiple events using a single call.

const client = new EventHubClient("connectionString", "eventHubName");
const producer = client.createProducer();
await producer.send({ body: "my-event-body" });
await producer.send([{ body: "foo" }, { body: "bar" }]);
Send a batch of events

Use the createBatch method to create an EventDataBatch object which can then be sent using the send method. Events may be added to the EventDataBatch using the tryAdd method until the maximum batch size limit in bytes has been reached.

const client = new EventHubClient("connectionString", "eventHubName");
const producer = client.createProducer();
const eventDataBatch = await producer.createBatch();
let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
wasAdded = eventDataBatch.tryAdd({ body: "my-event-body-2" });
await producer.send(eventDataBatch);

The Inspect an Event Hub example shows how to get the list of partition ids should you wish to specify one for a producer.

The createProducer method takes an optional parameter of type EventHubProducerOptions which you can use to specify the retry options and partition id for the send operation.

The send method takes an optional parameter of type SendOptions which you can use to specify abortSignal to cancel current operation. You can also specify partitionKey if you did not specify a partition id when creating the producer. All events that use the same partition key will be sent to the same partition.

Note: When working with Azure Stream Analytics, the body of the event being sent should be a JSON object as well. For example: body: { "message": "Hello World" }

Consume events from an Event Hub partition

To consume events from a single Event Hub partition in a consumer group, create an EventHubConsumer for that partition and consumer group combination. You will need to provide a position in the event stream from where to begin receiving events; in our example, we will read new events as they are published.

const client = new EventHubClient("connectionString", "eventHubName");
const consumer = client.createConsumer(
  EventHubClient.defaultConsumerGroupName,
  partitionIds[0],
  EventPosition.latest()
);

The Inspect an Event Hub example shows how to get the list of partition ids.

The createConsumer method takes an optional parameter of type EventHubConsumerOptions which you can use to specify the ownerLevel, the level that this consumer is currently using for partition ownership. If another consumer is currently active for the same partition with no or lower level, then it will get disconnected. If another consumer is currently active with a higher level, then this consumer will fail to connect. You can also specify retryOptions for the receive operation on the consumer.

You can use this consumer in one of 3 ways to receive events:

Get an array of events

Use the receiveBatch function which returns a promise that resolves to an array of events.

This function takes an optional parameter called abortSignal to cancel current operation.

const maxMessageCount = 10;
const myEvents = await consumer.receiveBatch(maxMessageCount);
Register event handler

Use the receive to set up event handlers and have it running as long as you need.

This function takes an optional parameter called abortSignal to cancel current operation.

const myEventHandler = (event) => {
  // your code here
};
const myErrorHandler = (error) => {
  // your error handler here
};
const receiveHandler = consumer.receive(myEventHandler, myErrorHandler);

// When ready to stop receiving
await receiveHandler.stop();
Use async iterator

Use the getMessageIterator to get an async iterator over events.

This function takes an optional EventIteratorOptions parameter that includes abortSignal to cancel the current operation.

for await (const events of consumer.getEventIterator()){
  // your code here
}

Consume events using an Event Processor

Using an EventHubConsumer to consume events like in the previous examples puts the responsibility of storing the checkpoints (the last processed event) on the user. Checkpoints are important for restarting the task of processing events from the right position in a partition. Ideally, you would also want to run multiple programs targeting different partitions with some load balancing. This is where an EventProcessor can help.

The EventProcessor will delegate the processing of events to a PartitionProcessor that you provide, allowing you to focus on business logic while the processor holds responsibility for managing the underlying consumer operations including checkpointing and load balancing.

While load balancing is a feature we will be adding in the next update, you can see how to use the EventProcessor in the below example, where we use an InMemoryPartitionManager that does checkpointing in memory.

class SimplePartitionProcessor {
  // Gets called once before the processing of events from current partition starts.
  async initialize() {
    /* your code here */
  }

  // Gets called for each batch of events that are received.
  // You may choose to use the checkpoint manager to update checkpoints.
  async processEvents(events) {
    /* your code here */
  }

  // Gets called for any error when receiving events.
  async processError(error) {
    /* your code here */
  }

  // Gets called when Event Processor stops processing events for current partition.
  async close(reason) {
    /* your code here */
  }
}

const client = new EventHubClient("my-connection-string", "my-event-hub");
const processor = new EventProcessor(
  EventHubClient.defaultConsumerGroupName,
  client,
  (partitionContext, checkpointManager) => new SimplePartitionProcessor(),
  new InMemoryPartitionManager()
);
await processor.start();
// At this point, the processor is consuming events from each partition of the Event Hub and
// delegating them to the SimplePartitionProcessor instance created for that partition.  This
// processing takes place in the background and will not block.
//
// In this example, we'll stop processing after five seconds.
await delay(5000);
await processor.stop();

To control the number of events passed to processEvents, use the options argument in the EventProcessor constructor.

Note: In this model, you are responsible for closing the EventHubClient instance to dispose it.

Use EventHubClient to work with IotHub

You can use EventHubClient to work with IotHub as well. This is useful for receiving telemetry data of IotHub from the linked EventHub. Most likely the associated connection string will not have send claims. Hence getting HubRuntimeInfo or PartitionRuntimeInfo and receiving events would be the possible operations.

  • Please notice that we are awaiting on the createFromIotHubConnectionString method to get an instance of the EventHubClient. This is different from other static methods on the client. The method talks to the IotHub endpoint to get a redirect error which contains the EventHub endpoint to talk to. It then constructs the right EventHub connection string based on the information in the redirect error and returns an instance of the EventHubClient that you can play with.
const client = await EventHubClient.createFromIotHubConnectionString("connectionString");
await client.getProperties();
await client.getPartitionProperties("partitionId");

Notes: For scalable and efficient receiving, please take a look at azure-event-processor-host. The Event Processor host, internally uses the streaming receiver to receive events.

Troubleshooting

AMQP Dependencies

The Event Hubs library depends on the rhea-promise library for managing connections, sending and receiving events over the AMQP protocol.

Enable logs

You can set the following environment variable to get the debug logs when using this library.

  • Getting debug logs from the Event Hubs SDK
export DEBUG=azure*
  • Getting debug logs from the Event Hubs SDK and the protocol level library.
export DEBUG=azure*,rhea*
  • If you are not interested in viewing the event transformation (which consumes lot of console/disk space) then you can set the DEBUG environment variable as follows:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message,-azure:amqp-common:datatransformer
  • If you are interested only in errors, then you can set the DEBUG environment variable as follows:
export DEBUG=azure:event-hubs:error,azure-amqp-common:error,rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

Logging to a file

  • Set the DEBUG environment variable as shown above and then run your test script as follows:

    • Logging statements from your test script go to out.log and logging statements from the sdk go to debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • Logging statements from your test script and the sdk go to the same file out.log by redirecting stderr to stdout (&1), and then redirect stdout to a file:

      node your-test-script.js >out.log 2>&1
      
    • Logging statements from your test script and the sdk go to the same file out.log.

        node your-test-script.js &> out.log
      

Next Steps

Please take a look at the samples directory for detailed examples on how to use this library to send and receive events to/from Event Hubs.

Contributing

If you'd like to contribute to this library, please read the contributing guide to learn more about how to build and test the code.

Impressions

Keywords

FAQs

Package last updated on 06 Aug 2019

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc