@azure/event-hubs
Advanced tools
Comparing version 5.0.0-preview.2 to 5.0.0-preview.3
@@ -0,1 +1,23 @@ | ||
### 2019-09-09 5.0.0-preview.3 | ||
#### Features | ||
- Adds load-balancing capabilities to `EventProcessor`. `EventProcesor` will use the data from `PartitionManager` to regulate how many partitions it should read from. | ||
([PR #4839](https://github.com/Azure/azure-sdk-for-js/pull/4839)). | ||
- Adds `lastEnqueuedEventInfo` property on `EventHubConsumer`. When the consumer is created with `trackLastEnqueuedEventInfo` set to `true`, the `lastEnqueuedEventInfo` | ||
field is updated everytime a message is received and provides details about the last enqueued event in the partition the `EventHubConsumer` is reading from. | ||
([PR #5036](https://github.com/Azure/azure-sdk-for-js/pull/5036)) | ||
- Received event data will now expose `systemProperties` for message annotations set by the service. ([PR #5008](https://github.com/Azure/azure-sdk-for-js/pull/5008)). | ||
- Improved error messages when constructing an `EventHubClient` with an invalid connection string and Event Hub name combo. ([PR #4899](https://github.com/Azure/azure-sdk-for-js/pull/4899)). | ||
- Adds client-side type-checking for `EventPosition` static helper methods. ([PR #5052](https://github.com/Azure/azure-sdk-for-js/pull/5052)). | ||
#### Breaking changes | ||
- The `PartitionProcessor` interface is now a class with default implementations of `initialize`, `close`, `processEvents`, and `processError`. | ||
([PR #4994](https://github.com/Azure/azure-sdk-for-js/pull/4994)). | ||
- Users should extend the `PartitionProcessor` class and override any of the methods that need custom logic. | ||
- All 4 methods now accept `PartitionContext` as the last parameter. | ||
`PartitionContext` contains information about the partition being processed, as well as the `updateCheckpoint` method that can be used to persist a checkpoint. | ||
- The `EventProcessor` constructor was changed to no longer accept a `PartitionProcessorFactory` function that returns a `PartitionProcessor`. | ||
Instead, users should extend the `PartitionProcessor` class and pass the class (not an instance of the class) to the `EventProcessor` constructor. | ||
([PR #4994](https://github.com/Azure/azure-sdk-for-js/pull/4994)). | ||
### 2019-08-06 5.0.0-preview.2 | ||
@@ -2,0 +24,0 @@ |
@@ -14,14 +14,24 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
if (msg.message_annotations) { | ||
if (msg.message_annotations[Constants.partitionKey] != undefined) { | ||
data.partitionKey = msg.message_annotations[Constants.partitionKey]; | ||
for (const annotationKey of Object.keys(msg.message_annotations)) { | ||
switch (annotationKey) { | ||
case Constants.partitionKey: | ||
data.partitionKey = msg.message_annotations[annotationKey]; | ||
break; | ||
case Constants.sequenceNumber: | ||
data.sequenceNumber = msg.message_annotations[annotationKey]; | ||
break; | ||
case Constants.enqueuedTime: | ||
data.enqueuedTimeUtc = new Date(msg.message_annotations[annotationKey]); | ||
break; | ||
case Constants.offset: | ||
data.offset = msg.message_annotations[annotationKey]; | ||
break; | ||
default: | ||
if (!data.systemProperties) { | ||
data.systemProperties = {}; | ||
} | ||
data.systemProperties[annotationKey] = msg.message_annotations[annotationKey]; | ||
break; | ||
} | ||
} | ||
if (msg.message_annotations[Constants.sequenceNumber] != undefined) { | ||
data.sequenceNumber = msg.message_annotations[Constants.sequenceNumber]; | ||
} | ||
if (msg.message_annotations[Constants.enqueuedTime] != undefined) { | ||
data.enqueuedTimeUtc = new Date(msg.message_annotations[Constants.enqueuedTime]); | ||
} | ||
if (msg.message_annotations[Constants.offset] != undefined) { | ||
data.offset = msg.message_annotations[Constants.offset]; | ||
} | ||
} | ||
@@ -28,0 +38,0 @@ if (msg.application_properties) { |
@@ -7,6 +7,11 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
/** | ||
* A class representing a batch of events which can be passed to the `send` method of a `EventConsumer` instance. | ||
* A class representing a batch of events which can be passed to the `send` method of a `EventProducer` instance. | ||
* This batch is ensured to be under the maximum message size supported by Azure Event Hubs service. | ||
* | ||
* Use the `tryAdd` function on the EventDataBatch to add events in a batch. | ||
* Use `createBatch()` method on the `EventHubProducer` to create an instance of `EventDataBatch` | ||
* instead of using `new EventDataBatch()`. You can specify an upper limit for the size of the batch | ||
* via options when calling `createBatch()`. | ||
* | ||
* Use the `tryAdd` function on the EventDataBatch to add events to the batch. This method will return | ||
* `false` after the upper limit is reached, therefore check the result before calling `tryAdd()` again. | ||
* @class | ||
@@ -16,2 +21,4 @@ */ | ||
/** | ||
* EventDataBatch should not be constructed using `new EventDataBatch()` | ||
* Use the `createBatch()` method on your `EventHubProducer` instead. | ||
* @constructor | ||
@@ -33,3 +40,4 @@ * @internal | ||
/** | ||
* @property The partitionKey set during `EventDataBatch` creation. This value is hashed to produce a partition assignment when the consumer is created without a `partitionId` | ||
* @property The partitionKey set during `EventDataBatch` creation. This value is hashed to | ||
* produce a partition assignment when the producer is created without a `partitionId` | ||
* @readonly | ||
@@ -41,3 +49,4 @@ */ | ||
/** | ||
* @property Size of a batch of events. | ||
* @property Size of the `EventDataBatch` instance after the events added to it have been | ||
* encoded into a single AMQP message. | ||
* @readonly | ||
@@ -49,3 +58,3 @@ */ | ||
/** | ||
* @property Number of events in the batch. | ||
* @property Number of events in the `EventDataBatch` instance. | ||
* @readonly | ||
@@ -57,3 +66,9 @@ */ | ||
/** | ||
* @property Encoded batch message. | ||
* @property Represents the single AMQP message which is the result of encoding all the events | ||
* added into the `EventDataBatch` instance. | ||
* | ||
* This is not meant for the user to use directly. | ||
* | ||
* When the `EventDataBatch` instance is passed to the `send()` method on the `EventHubProducer`, | ||
* this single batched AMQP message is what gets sent over the wire to the service. | ||
* @readonly | ||
@@ -66,2 +81,5 @@ */ | ||
* Tries to add an event data to the batch if permitted by the batch's size limit. | ||
* **NOTE**: Always remember to check the return value of this method, before calling it again | ||
* for the next event. | ||
* | ||
* @param eventData An individual event data object. | ||
@@ -68,0 +86,0 @@ * @returns A boolean value indicating if the event data has been added to the batch or not. |
@@ -5,3 +5,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import * as log from "./log"; | ||
import { EventHubConnectionConfig, SharedKeyCredential, ConnectionConfig, isTokenCredential, Constants } from "@azure/core-amqp"; | ||
import { EventHubConnectionConfig, SharedKeyCredential, ConnectionConfig, isTokenCredential, Constants, parseConnectionString } from "@azure/core-amqp"; | ||
import { ConnectionContext } from "./connectionContext"; | ||
@@ -26,5 +26,13 @@ import { IotHubClient } from "./iothub/iothubClient"; | ||
* operations for sending event data, receiving events, and inspecting the connected Event Hub. | ||
* | ||
* There are multiple ways to create an `EventHubClient` | ||
* - Use the connection string from the SAS policy created for your Event Hub instance. | ||
* - Use the connection string from the SAS policy created for your Event Hub namespace, | ||
* and the name of the Event Hub instance | ||
* - Use the fully qualified domain name of your Event Hub namespace like `<yournamespace>.servicebus.windows.net`, | ||
* and a credentials object. | ||
* | ||
*/ | ||
export class EventHubClient { | ||
constructor(hostOrConnectionString, eventHubPathOrOptions, credentialOrOptions, options) { | ||
constructor(hostOrConnectionString, eventHubNameOrOptions, credentialOrOptions, options) { | ||
let connectionString; | ||
@@ -35,12 +43,25 @@ let config; | ||
if (!isTokenCredential(credentialOrOptions)) { | ||
const parsedCS = parseConnectionString(hostOrConnectionString); | ||
if (!(parsedCS.EntityPath || | ||
(typeof eventHubNameOrOptions === "string" && eventHubNameOrOptions))) { | ||
throw new TypeError(`Either provide "eventHubName" or the "connectionString": "${hostOrConnectionString}", ` + | ||
`must contain "EntityPath=<your-event-hub-name>".`); | ||
} | ||
if (parsedCS.EntityPath && | ||
typeof eventHubNameOrOptions === "string" && | ||
eventHubNameOrOptions && | ||
parsedCS.EntityPath !== eventHubNameOrOptions) { | ||
throw new TypeError(`The entity path "${parsedCS.EntityPath}" in connectionString: "${hostOrConnectionString}" ` + | ||
`doesn't match with eventHubName: "${eventHubNameOrOptions}".`); | ||
} | ||
connectionString = hostOrConnectionString; | ||
if (typeof eventHubPathOrOptions !== "string") { | ||
if (typeof eventHubNameOrOptions !== "string") { | ||
// connectionstring and/or options were passed to constructor | ||
config = EventHubConnectionConfig.create(connectionString); | ||
options = eventHubPathOrOptions; | ||
options = eventHubNameOrOptions; | ||
} | ||
else { | ||
// connectionstring, eventHubPath and/or options were passed to constructor | ||
const eventHubPath = eventHubPathOrOptions; | ||
config = EventHubConnectionConfig.create(connectionString, eventHubPath); | ||
// connectionstring, eventHubName and/or options were passed to constructor | ||
const eventHubName = eventHubNameOrOptions; | ||
config = EventHubConnectionConfig.create(connectionString, eventHubName); | ||
options = credentialOrOptions; | ||
@@ -52,9 +73,12 @@ } | ||
else { | ||
// host, eventHubPath, a TokenCredential and/or options were passed to constructor | ||
const eventHubPath = eventHubPathOrOptions; | ||
// host, eventHubName, a TokenCredential and/or options were passed to constructor | ||
const eventHubName = eventHubNameOrOptions; | ||
let host = hostOrConnectionString; | ||
credential = credentialOrOptions; | ||
if (!eventHubName) { | ||
throw new TypeError(`"eventHubName" is missing`); | ||
} | ||
if (!host.endsWith("/")) | ||
host += "/"; | ||
connectionString = `Endpoint=sb://${host};SharedAccessKeyName=defaultKeyName;SharedAccessKey=defaultKeyValue;EntityPath=${eventHubPath}`; | ||
connectionString = `Endpoint=sb://${host};SharedAccessKeyName=defaultKeyName;SharedAccessKey=defaultKeyValue;EntityPath=${eventHubName}`; | ||
config = EventHubConnectionConfig.create(connectionString); | ||
@@ -109,3 +133,3 @@ } | ||
/** | ||
* Creates an Event Hub producer responsible for sending `EventData` to the Event Hub. | ||
* Creates an Event Hub producer that can send events to the Event Hub. | ||
* If `partitionId` is specified in the `options`, all event data sent using the producer | ||
@@ -115,11 +139,13 @@ * will be sent to the specified partition. | ||
* | ||
* Allowing automatic routing of partitions is recommended when: | ||
* - The sending of events needs to be highly available. | ||
* - The event data should be evenly distributed among all available partitions. | ||
* Automatic routing of partitions is recommended because: | ||
* - The sending of events will be highly available. | ||
* - The event data will be evenly distributed among all available partitions. | ||
* | ||
* @param options The set of options to apply when creating the producer where you can specify the id of the partition | ||
* to which events need to be sent to, and retry options. | ||
* @param options The set of options to apply when creating the producer. | ||
* - `partitionId` : The identifier of the partition that the producer can be bound to. | ||
* - `retryOptions` : The retry options used to govern retry attempts when an issue is encountered while sending events. | ||
* A simple usage can be `{ "maxRetries": 4 }`. | ||
* | ||
* @throws {Error} Thrown if the underlying connection has been closed, create a new EventHubClient. | ||
* @returns Promise<void> | ||
* @returns EventHubProducer | ||
*/ | ||
@@ -137,3 +163,3 @@ createProducer(options) { | ||
/** | ||
* Creates an Event Hub consumer responsible for reading `EventData` from a specific Event Hub partition, | ||
* Creates an Event Hub consumer that can receive events from a specific Event Hub partition, | ||
* in the context of a specific consumer group. | ||
@@ -146,8 +172,19 @@ * | ||
* | ||
* Designating a consumer as exclusive may be specified in the `options` via `ownerLevel`. | ||
* | ||
* @param consumerGroup The name of the consumer group this consumer is associated with. Events are read in the context of this group. | ||
* @param consumerGroup The name of the consumer group this consumer is associated with. | ||
* Events are read in the context of this group. You can get this information from Azure portal. | ||
* @param partitionId The identifier of the Event Hub partition from which events will be received. | ||
* You can get identifiers for all partitions by using the `getPartitionProperties` method on the `EventHubClient`. | ||
* @param eventPosition The position within the partition where the consumer should begin reading events. | ||
* @param options The set of options to apply when creating the consumer where you can specify retry options and ownerLevel. | ||
* The easiest way to create an instance of EventPosition is to use the static helpers on it like | ||
* - `EventPosition.fromOffset()` | ||
* - `EventPosition.fromSequenceNumber()` | ||
* - `EventPosition.fromEnqueuedTime()` | ||
* - `EventPosition.earliest()` | ||
* - `EventPosition.latest()` | ||
* @param options The set of options to apply when creating the consumer. | ||
* - `ownerLevel` : A number indicating that the consumer intends to be an exclusive consumer of events resulting in other | ||
* consumers to fail if their `ownerLevel` is lower or doesn't exist. | ||
* - `retryOptions`: The retry options used to govern retry attempts when an issue is encountered while receiving events. | ||
* A simple usage can be `{ "maxRetries": 4 }`. | ||
* | ||
* @throws {Error} Thrown if the underlying connection has been closed, create a new EventHubClient. | ||
@@ -154,0 +191,0 @@ * @throws {TypeError} Thrown if a required parameter is missing. |
@@ -36,7 +36,2 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
/** | ||
* @property receiverRuntimeMetricEnabled Indicates whether receiver runtime metric | ||
* is enabled. Default: false. | ||
*/ | ||
this.receiverRuntimeMetricEnabled = false; | ||
/** | ||
* @property _checkpoint The sequence number of the most recently received AMQP message. | ||
@@ -100,9 +95,10 @@ * @private | ||
enqueuedTimeUtc: data.enqueuedTimeUtc, | ||
partitionKey: data.partitionKey | ||
partitionKey: data.partitionKey, | ||
systemProperties: data.systemProperties | ||
}; | ||
this._checkpoint = receivedEventData.sequenceNumber; | ||
if (this.receiverRuntimeMetricEnabled && data) { | ||
this.runtimeInfo.lastEnqueuedSequenceNumber = data.lastSequenceNumber; | ||
this.runtimeInfo.lastEnqueuedTimeUtc = data.lastEnqueuedTime; | ||
this.runtimeInfo.lastEnqueuedOffset = data.lastEnqueuedOffset; | ||
if (this.options.trackLastEnqueuedEventInfo && data) { | ||
this.runtimeInfo.sequenceNumber = data.lastSequenceNumber; | ||
this.runtimeInfo.enqueuedTime = data.lastEnqueuedTime; | ||
this.runtimeInfo.offset = data.lastEnqueuedOffset; | ||
this.runtimeInfo.retrievalTime = data.retrievalTime; | ||
@@ -527,3 +523,3 @@ log.receiver("[%s] RuntimeInfo of Receiver '%s' is %O", this._context.connectionId, this.name, this.runtimeInfo); | ||
} | ||
if (this.receiverRuntimeMetricEnabled) { | ||
if (this.options.trackLastEnqueuedEventInfo) { | ||
rcvrOptions.desired_capabilities = Constants.enableReceiverRuntimeMetricName; | ||
@@ -530,0 +526,0 @@ } |
@@ -6,3 +6,10 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
* Represents the position of an event in an Event Hub partition, typically used in the creation of | ||
* an `EventHubProducer`. | ||
* an `EventHubConsumer` to specify the position in the partition to begin receiving events from. | ||
* | ||
* Make use of the below static helpers to create an instance of `EventPosition` | ||
* - `fromOffset()` | ||
* - `fromSequenceNumber()` | ||
* - `fromEnqueuedTime()` | ||
* - `earliest()` | ||
* - `latest()` | ||
* @class | ||
@@ -12,2 +19,9 @@ */ | ||
/** | ||
* Instead of constructing an event position using `new Event Position()`, make use of the below static helpers | ||
* - `fromOffset()` | ||
* - `fromSequenceNumber()` | ||
* - `fromEnqueuedTime()` | ||
* - `earliest()` | ||
* - `latest()` | ||
* | ||
* @constructor | ||
@@ -42,4 +56,4 @@ * @internal | ||
static fromOffset(offset, isInclusive) { | ||
if (offset == undefined) { | ||
throw new Error('Missing parameter "offset"'); | ||
if (typeof offset !== "number" && typeof offset !== "string") { | ||
throw new Error(`Invalid offset "${offset}" provided to "fromOffset" method.`); | ||
} | ||
@@ -74,4 +88,4 @@ return new EventPosition({ offset: offset, isInclusive: isInclusive }); | ||
static fromEnqueuedTime(enqueuedTime) { | ||
if (enqueuedTime == undefined) { | ||
throw new Error('Missing parameter "enqueuedTime"'); | ||
if (typeof enqueuedTime !== "number" && !(enqueuedTime instanceof Date)) { | ||
throw new Error(`Invalid enqueuedTime "${enqueuedTime}" provided to "fromEnqueuedTime" method.`); | ||
} | ||
@@ -78,0 +92,0 @@ return new EventPosition({ enqueuedTime: enqueuedTime }); |
@@ -6,9 +6,11 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import { EventPosition } from "./eventPosition"; | ||
import { CheckpointManager } from "./checkpointManager"; | ||
import { PartitionContext } from "./partitionContext"; | ||
import { PumpManager } from "./pumpManager"; | ||
import { AbortController } from "@azure/abort-controller"; | ||
import * as log from "./log"; | ||
import { PartitionLoadBalancer } from "./partitionLoadBalancer"; | ||
import { delay } from "@azure/core-amqp"; | ||
/** | ||
* Reason for closing a PartitionProcessor. | ||
* An enum representing the different reasons for an `EventProcessor` to stop processing | ||
* events from a partition in a consumer group of an Event Hub instance. | ||
*/ | ||
@@ -31,15 +33,45 @@ export var CloseReason; | ||
/** | ||
* Describes the Event Processor Host to process events from an EventHub. | ||
* @class EventProcessorHost | ||
* Event Processor based applications consist of one or more instances of EventProcessor which have been | ||
* configured to consume events from the same Event Hub and consumer group. They balance the | ||
* workload across different instances by distributing the partitions to be processed among themselves. | ||
* They also allow the user to track progress when events are processed using checkpoints. | ||
* | ||
* A checkpoint is meant to represent the last successfully processed event by the user from a particular | ||
* partition of a consumer group in an Event Hub instance. | ||
* | ||
* You need the below to create an instance of `EventProcessor` | ||
* - The name of the consumer group from which you want to process events | ||
* - An instance of `EventHubClient` class that was created for the Event Hub instance. | ||
* - A user implemented class that extends the `PartitionProcessor` class. To get started, you can use the | ||
* base class `PartitionProcessor` which simply logs the incoming events. To provide your code to process incoming | ||
* events, extend this class and override the `processEvents()` method. For example: | ||
* ```js | ||
* class SamplePartitionProcessor extends PartitionProcessor { | ||
* async processEvents(events, partitionContext) { | ||
* // user code to process events here | ||
* // use `partitionContext` property to get information on the partition | ||
* // use `partitionContext.updateCheckpoint()` method to update checkpoints as needed | ||
* } | ||
* } | ||
* ``` | ||
* - An instance of `PartitionManager`. To get started, you can pass an instance of `InMemoryPartitionManager`. | ||
* For production, choose an implementation that will store checkpoints and partition ownership details to a durable store. | ||
* Implementations of `PartitionManager` can be found on npm by searching for packages with the prefix @azure/eventhub-checkpointstore-. | ||
* | ||
* @class EventProcessor | ||
*/ | ||
export class EventProcessor { | ||
/** | ||
* @param consumerGroupName The consumer group name used in this event processor to consumer events. | ||
* @param eventHubAsyncClient The Event Hub client. | ||
* @param partitionProcessorFactory The factory to create new partition processor(s). | ||
* @param initialEventPosition Initial event position to start consuming events. | ||
* @param partitionManager The partition manager. | ||
* @param eventHubName The Event Hub name. | ||
* @param consumerGroupName The name of the consumer group from which you want to process events. | ||
* @param eventHubClient An instance of `EventHubClient` that was created for the Event Hub instance. | ||
* @param PartitionProcessorClass A user-provided class that extends the `PartitionProcessor` class. | ||
* This class will be responsible for processing and checkpointing events. | ||
* @param partitionManager An instance of `PartitionManager`. To get started, you can pass an instance of `InMemoryPartitionManager`. | ||
* For production, choose an implementation that will store checkpoints and partition ownership details to a durable store. | ||
* @param options A set of options to configure the Event Processor | ||
* - `maxBatchSize` : The max size of the batch of events passed each time to user code for processing. | ||
* - `maxWaitTimeInSeconds` : The maximum amount of time to wait to build up the requested message count before | ||
* passing the data to user code for processing. If not provided, it defaults to 60 seconds. | ||
*/ | ||
constructor(consumerGroupName, eventHubClient, partitionProcessorFactory, partitionManager, options) { | ||
constructor(consumerGroupName, eventHubClient, PartitionProcessorClass, partitionManager, options) { | ||
this._id = uuid(); | ||
@@ -51,30 +83,66 @@ this._isRunning = false; | ||
this._eventHubClient = eventHubClient; | ||
this._partitionProcessorFactory = partitionProcessorFactory; | ||
this._partitionProcessorClass = PartitionProcessorClass; | ||
this._partitionManager = partitionManager; | ||
this._processorOptions = options; | ||
this._pumpManager = new PumpManager(this._id, options); | ||
this._pumpManager = new PumpManager(this._id, this._processorOptions); | ||
const inactiveTimeLimitInMS = 60000; // ownership expiration time (1 mintue) | ||
this._partitionLoadBalancer = new PartitionLoadBalancer(this._id, inactiveTimeLimitInMS); | ||
} | ||
_getInactivePartitions() { | ||
/** | ||
* The unique identifier for the EventProcessor. | ||
* | ||
* @return {string} | ||
*/ | ||
get id() { | ||
return this._id; | ||
} | ||
_createPartitionOwnershipRequest(partitionOwnershipMap, partitionIdToClaim) { | ||
const previousPartitionOwnership = partitionOwnershipMap.get(partitionIdToClaim); | ||
const partitionOwnership = { | ||
ownerId: this._id, | ||
partitionId: partitionIdToClaim, | ||
consumerGroupName: this._consumerGroupName, | ||
eventHubName: this._eventHubClient.eventHubName, | ||
sequenceNumber: previousPartitionOwnership | ||
? previousPartitionOwnership.sequenceNumber | ||
: undefined, | ||
offset: previousPartitionOwnership ? previousPartitionOwnership.offset : undefined, | ||
eTag: previousPartitionOwnership ? previousPartitionOwnership.eTag : undefined, | ||
ownerLevel: 0 | ||
}; | ||
return partitionOwnership; | ||
} | ||
/* | ||
* Claim ownership of the given partition if it's available | ||
*/ | ||
_claimOwnership(partitionOwnershipMap, partitionIdToClaim) { | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
log.partitionLoadBalancer(`[${this._id}] Attempting to claim ownership of partition ${partitionIdToClaim}.`); | ||
const ownershipRequest = this._createPartitionOwnershipRequest(partitionOwnershipMap, partitionIdToClaim); | ||
try { | ||
// get all partition ids on the event hub | ||
const partitionIds = yield this._eventHubClient.getPartitionIds(); | ||
// get partitions this EventProcessor is actively processing | ||
const activePartitionIds = this._pumpManager.receivingFromPartitions(); | ||
// get a list of partition ids that are not being processed by this EventProcessor | ||
const inactivePartitionIds = partitionIds.filter((id) => activePartitionIds.indexOf(id) === -1); | ||
return inactivePartitionIds; | ||
yield this._partitionManager.claimOwnership([ownershipRequest]); | ||
log.partitionLoadBalancer(`[${this._id}] Successfully claimed ownership of partition ${partitionIdToClaim}.`); | ||
const partitionContext = new PartitionContext(this._eventHubClient.eventHubName, this._consumerGroupName, ownershipRequest.partitionId, this._partitionManager, this._id); | ||
log.partitionLoadBalancer(`[${this._id}] [${partitionIdToClaim}] Calling user-provided PartitionProcessorFactory.`); | ||
const partitionProcessor = new this._partitionProcessorClass(); | ||
const eventPosition = ownershipRequest.sequenceNumber | ||
? EventPosition.fromSequenceNumber(ownershipRequest.sequenceNumber) | ||
: EventPosition.earliest(); | ||
yield this._pumpManager.createPump(this._eventHubClient, partitionContext, eventPosition, partitionProcessor); | ||
log.partitionLoadBalancer(`[${this._id}] PartitionPump created successfully.`); | ||
} | ||
catch (err) { | ||
log.error(`[${this._id}] An error occured when retrieving partition ids: ${err}`); | ||
throw err; | ||
log.error(`[${this.id}] Failed to claim ownership of partition ${ownershipRequest.partitionId}`); | ||
} | ||
}); | ||
} | ||
/* | ||
* A simple implementation of an event processor that: | ||
* - Fetches all partition ids from Event Hub | ||
* - Gets the current ownership information of all the partitions from PartitionManager | ||
* - Claims ownership of any partition that doesn't have an owner yet. | ||
* - Starts a new PartitionProcessor and receives events from each of the partitions this instance owns | ||
/** | ||
* Every loop to this method will result in this EventProcessor owning at most one new partition. | ||
* | ||
* The load is considered balanced when no active EventProcessor owns 2 partitions more than any other active | ||
* EventProcessor. Given that each invocation to this method results in ownership claim of at most one partition, | ||
* this algorithm converges gradually towards a steady state. | ||
* | ||
* When a new partition is claimed, this method is also responsible for starting a partition pump that creates an | ||
* EventHubConsumer for processing events from that partition. | ||
*/ | ||
@@ -84,44 +152,21 @@ _runLoop(abortSignal) { | ||
// periodically check if there is any partition not being processed and process it | ||
const waitIntervalInMs = 30000; | ||
const waitIntervalInMs = 10000; | ||
while (!abortSignal.aborted) { | ||
try { | ||
// get a list of partition ids that are not being processed by this EventProcessor | ||
const partitionsToAdd = yield this._getInactivePartitions(); | ||
// check if the loop has been cancelled | ||
const partitionOwnershipMap = new Map(); | ||
// Retrieve current partition ownership details from the datastore. | ||
const partitionOwnership = yield this._partitionManager.listOwnership(this._eventHubClient.eventHubName, this._consumerGroupName); | ||
for (const ownership of partitionOwnership) { | ||
partitionOwnershipMap.set(ownership.partitionId, ownership); | ||
} | ||
const partitionIds = yield this._eventHubClient.getPartitionIds(); | ||
if (abortSignal.aborted) { | ||
return; | ||
} | ||
const tasks = []; | ||
// create partition pumps to process any partitions we should be processing | ||
for (const partitionId of partitionsToAdd) { | ||
const partitionContext = { | ||
consumerGroupName: this._consumerGroupName, | ||
eventHubName: this._eventHubClient.eventHubName, | ||
partitionId: partitionId | ||
}; | ||
const partitionOwnership = { | ||
eventHubName: this._eventHubClient.eventHubName, | ||
consumerGroupName: this._consumerGroupName, | ||
ownerId: this._id, | ||
partitionId: partitionId, | ||
ownerLevel: 0 | ||
}; | ||
yield this._partitionManager.claimOwnership([partitionOwnership]); | ||
const checkpointManager = new CheckpointManager(partitionContext, this._partitionManager, this._id); | ||
log.eventProcessor(`[${this._id}] [${partitionId}] Calling user-provided PartitionProcessorFactory.`); | ||
const partitionProcessor = this._partitionProcessorFactory(partitionContext, checkpointManager); | ||
// eventually this will 1st check if the existing PartitionOwnership has a position | ||
let eventPosition = this._processorOptions.initialEventPosition || EventPosition.earliest(); | ||
const partitionOwnerships = yield this._partitionManager.listOwnership(this._eventHubClient.eventHubName, this._consumerGroupName); | ||
for (const ownership of partitionOwnerships) { | ||
if (ownership.partitionId === partitionId && ownership.sequenceNumber) { | ||
eventPosition = EventPosition.fromSequenceNumber(ownership.sequenceNumber); | ||
break; | ||
} | ||
if (partitionIds.length > 0) { | ||
const partitionToClaim = this._partitionLoadBalancer.loadBalance(partitionOwnershipMap, partitionIds); | ||
if (partitionToClaim) { | ||
yield this._claimOwnership(partitionOwnershipMap, partitionToClaim); | ||
} | ||
tasks.push(this._pumpManager.createPump(this._eventHubClient, partitionContext, eventPosition, partitionProcessor)); | ||
} | ||
// wait for all the new pumps to be created | ||
yield Promise.all(tasks); | ||
log.eventProcessor(`[${this._id}] PartitionPumps created within EventProcessor.`); | ||
// sleep | ||
@@ -135,21 +180,12 @@ log.eventProcessor(`[${this._id}] Pausing the EventProcessor loop for ${waitIntervalInMs} ms.`); | ||
} | ||
// loop has completed, remove all existing pumps | ||
return this._pumpManager.removeAllPumps(CloseReason.Shutdown); | ||
}); | ||
} | ||
/** | ||
* The unique identifier for the EventProcessor. | ||
* Starts the `EventProcessor`. Based on the number of instances of `EventProcessor` that are running for the | ||
* same consumer group, the partitions are distributed among these instances to process events. | ||
* | ||
* @return {string} | ||
*/ | ||
get id() { | ||
return this._id; | ||
} | ||
/** | ||
* Starts processing of events for all partitions of the Event Hub that this event processor can own, assigning a | ||
* dedicated `PartitionProcessor` to each partition. If there are other Event Processors active for the same | ||
* consumer group on the Event Hub, responsibility for partitions will be shared between them. | ||
* For each partition, the user provided `PartitionProcessor` is instantiated. | ||
* | ||
* Subsequent calls to start will be ignored if this event processor is already running. Calling `start()` after `stop()` | ||
* is called will restart this event processor. | ||
* Subsequent calls to start will be ignored if this event processor is already running. | ||
* Calling `start()` after `stop()` is called will restart this event processor. | ||
* | ||
@@ -169,4 +205,4 @@ * @return {void} | ||
/** | ||
* Stops processing events for all partitions owned by this event processor. All `PartitionProcessor` will be | ||
* shutdown and any open resources will be closed. | ||
* Stops processing events for all partitions owned by this event processor. | ||
* All `PartitionProcessor` will be shutdown and any open resources will be closed. | ||
* | ||
@@ -185,2 +221,4 @@ * Subsequent calls to stop will be ignored if the event processor is not running. | ||
try { | ||
// remove all existing pumps | ||
yield this._pumpManager.removeAllPumps(CloseReason.Shutdown); | ||
// waits for the event processor loop to complete | ||
@@ -187,0 +225,0 @@ // will complete immediately if _loopTask is undefined |
@@ -9,6 +9,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
export { EventDataBatch } from "./eventDataBatch"; | ||
export { CheckpointManager } from "./checkpointManager"; | ||
export { EventProcessor, CloseReason } from "./eventProcessor"; | ||
export { PartitionContext } from "./partitionContext"; | ||
export { InMemoryPartitionManager } from "./inMemoryPartitionManager"; | ||
export { PartitionProcessor } from "./partitionProcessor"; | ||
export { MessagingError, DefaultDataTransformer, TokenType, delay } from "@azure/core-amqp"; | ||
//# sourceMappingURL=index.js.map |
@@ -6,3 +6,10 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
/** | ||
* A simple in-memory implementation of a `PartitionManager` | ||
* The `EventProcessor` relies on a `PartitionManager` to store checkpoints and handle partition | ||
* ownerships. `InMemoryPartitionManager` is simple partition manager that stores checkpoints and | ||
* partition ownerships in memory of your program. | ||
* | ||
* You can use the `InMemoryPartitionManager` to get started with using the `EventProcessor`. | ||
* But in production, you should choose an implementation of the `PartitionManager` interface that will | ||
* store the checkpoints and partition ownerships to a durable store instead. | ||
* | ||
* @class | ||
@@ -37,4 +44,7 @@ */ | ||
for (const ownership of partitionOwnership) { | ||
if (!this._partitionOwnershipMap.has(ownership.partitionId)) { | ||
if (!this._partitionOwnershipMap.has(ownership.partitionId) || | ||
this._partitionOwnershipMap.get(ownership.partitionId).eTag === ownership.eTag) { | ||
ownership.eTag = generate_uuid(); | ||
var date = new Date(); | ||
ownership.lastModifiedTimeInMS = date.getTime(); | ||
this._partitionOwnershipMap.set(ownership.partitionId, ownership); | ||
@@ -41,0 +51,0 @@ } |
@@ -69,2 +69,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
export const eventProcessor = debugModule("azure:event-hubs:eventProcessor"); | ||
/** | ||
* @ignore | ||
* log statements for partitionLoadBalancer | ||
*/ | ||
export const partitionLoadBalancer = debugModule("azure:event-hubs:partitionLoadBalancer"); | ||
//# sourceMappingURL=log.js.map |
@@ -108,3 +108,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
beginningSequenceNumber: info.begin_sequence_number, | ||
eventHubPath: info.name, | ||
eventHubName: info.name, | ||
lastEnqueuedOffset: info.last_enqueued_offset, | ||
@@ -111,0 +111,0 @@ lastEnqueuedTimeUtc: new Date(info.last_enqueued_time_utc), |
// Copyright (c) Microsoft Corporation. All rights reserved. | ||
// Licensed under the MIT License. | ||
import * as tslib_1 from "tslib"; | ||
/** | ||
* `PartitionContext` holds information on the partition, consumer group and event hub | ||
* being processed by the `EventProcessor`. | ||
* It also allows users to update checkpoints via the `updateCheckpoint` method. | ||
* | ||
* User is never meant to create `PartitionContext` directly. It is only passed to user code | ||
* by the `EventProcessor`. | ||
*/ | ||
export class PartitionContext { | ||
constructor(eventHubName, consumerGroupName, partitionId, partitionManager, eventProcessorId) { | ||
this._eTag = ""; | ||
this._eventHubName = eventHubName; | ||
this._consumerGroupName = consumerGroupName; | ||
this._partitionId = partitionId; | ||
this._partitionManager = partitionManager; | ||
this._eventProcessorId = eventProcessorId; | ||
} | ||
/** | ||
* @property The consumer group name | ||
* @readonly | ||
*/ | ||
get consumerGroupName() { | ||
return this._consumerGroupName; | ||
} | ||
/** | ||
* @property The event hub name | ||
* @readonly | ||
*/ | ||
get eventHubName() { | ||
return this._eventHubName; | ||
} | ||
/** | ||
* @property The identifier of the Event Hub partition | ||
* @readonly | ||
*/ | ||
get partitionId() { | ||
return this._partitionId; | ||
} | ||
updateCheckpoint(eventDataOrSequenceNumber, offset) { | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
const checkpoint = { | ||
eventHubName: this._eventHubName, | ||
consumerGroupName: this._consumerGroupName, | ||
ownerId: this._eventProcessorId, | ||
partitionId: this._partitionId, | ||
sequenceNumber: typeof eventDataOrSequenceNumber === "number" | ||
? eventDataOrSequenceNumber | ||
: eventDataOrSequenceNumber.sequenceNumber, | ||
offset: typeof offset === "number" ? offset : eventDataOrSequenceNumber.offset, | ||
eTag: this._eTag | ||
}; | ||
this._eTag = yield this._partitionManager.updateCheckpoint(checkpoint); | ||
}); | ||
} | ||
} | ||
//# sourceMappingURL=partitionContext.js.map |
@@ -6,6 +6,5 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import { CloseReason } from "./eventProcessor"; | ||
import { EventPosition } from "./eventPosition"; | ||
import { AbortController } from "@azure/abort-controller"; | ||
export class PartitionPump { | ||
constructor(eventHubClient, partitionContext, partitionProcessor, options) { | ||
constructor(eventHubClient, partitionContext, partitionProcessor, initialEventPosition, options) { | ||
this._isReceiving = false; | ||
@@ -17,2 +16,3 @@ if (!options) | ||
this._partitionProcessor = partitionProcessor; | ||
this._initialEventPosition = initialEventPosition; | ||
this._processorOptions = options; | ||
@@ -27,10 +27,8 @@ this._abortController = new AbortController(); | ||
this._isReceiving = true; | ||
if (typeof this._partitionProcessor.initialize === "function") { | ||
try { | ||
yield this._partitionProcessor.initialize(); | ||
} | ||
catch (_a) { | ||
// swallow the error from the user-defined code | ||
} | ||
try { | ||
yield this._partitionProcessor.initialize(this._partitionContext); | ||
} | ||
catch (_a) { | ||
// swallow the error from the user-defined code | ||
} | ||
this._receiveEvents(this._partitionContext.partitionId); | ||
@@ -42,3 +40,3 @@ log.partitionPump("Successfully started the receiver."); | ||
return tslib_1.__awaiter(this, void 0, void 0, function* () { | ||
this._receiver = this._eventHubClient.createConsumer(this._partitionContext.consumerGroupName, partitionId, this._processorOptions.initialEventPosition || EventPosition.earliest()); | ||
this._receiver = this._eventHubClient.createConsumer(this._partitionContext.consumerGroupName, partitionId, this._initialEventPosition, { ownerLevel: 0 }); | ||
while (this._isReceiving) { | ||
@@ -51,3 +49,3 @@ try { | ||
} | ||
yield this._partitionProcessor.processEvents(receivedEvents); | ||
yield this._partitionProcessor.processEvents(receivedEvents, this._partitionContext); | ||
} | ||
@@ -63,3 +61,3 @@ catch (err) { | ||
try { | ||
yield this._partitionProcessor.processError(err); | ||
yield this._partitionProcessor.processError(err, this._partitionContext); | ||
} | ||
@@ -72,2 +70,7 @@ catch (err) { | ||
try { | ||
// If the exception indicates that the partition was stolen (i.e some other consumer with same ownerlevel | ||
// started consuming the partition), update the closeReason | ||
if (err.name === "ReceiverDisconnectedError") { | ||
return yield this.stop(CloseReason.OwnershipLost); | ||
} | ||
// this will close the pump and will break us out of the while loop | ||
@@ -92,5 +95,3 @@ return yield this.stop(CloseReason.EventHubException); | ||
this._abortController.abort(); | ||
if (typeof this._partitionProcessor.close === "function") { | ||
yield this._partitionProcessor.close(reason); | ||
} | ||
yield this._partitionProcessor.close(reason, this._partitionContext); | ||
} | ||
@@ -97,0 +98,0 @@ catch (err) { |
@@ -55,3 +55,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
log.pumpManager(`[${this._eventProcessorName}] [${partitionId}] Creating a new pump.`); | ||
const pump = new PartitionPump(eventHubClient, partitionContext, partitionProcessor, Object.assign({}, this._options, { initialEventPosition })); | ||
const pump = new PartitionPump(eventHubClient, partitionContext, partitionProcessor, initialEventPosition, this._options); | ||
try { | ||
@@ -58,0 +58,0 @@ yield pump.start(); |
@@ -14,3 +14,9 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
* in the context of a specific consumer group. | ||
* To create a consumer use the `createConsumer()` method on your `EventHubClient`. | ||
* | ||
* You can pass the below in the `options` when creating a consumer. | ||
* - `ownerLevel` : A number indicating that the consumer intends to be an exclusive consumer of events resulting in other | ||
* consumers to fail if their `ownerLevel` is lower or doesn't exist. | ||
* - `retryOptions`: The retry options used to govern retry attempts when an issue is encountered while receiving events. | ||
* | ||
* Multiple consumers are allowed on the same partition in a consumer group. | ||
@@ -21,4 +27,4 @@ * If there is a need to have an exclusive consumer for a partition in a consumer group, | ||
* | ||
* The consumer can be used to receive messages in a batch or by registering handlers. | ||
* Use the `createConsumer` function on the EventHubClient to instantiate an EventHubConsumer. | ||
* The consumer can be used to receive messages in a batch using `receiveBatch()` or by registering handlers | ||
* by using `receive()` or via an async iterable got by using `getEventIterator()` | ||
* @class | ||
@@ -28,2 +34,5 @@ */ | ||
/** | ||
* EventHubConsumer should not be constructed using `new EventHubConsumer()` | ||
* Use the `createConsumer()` method on your `EventHubClient` instead. | ||
* @private | ||
* @constructor | ||
@@ -41,2 +50,3 @@ * @internal | ||
this._partitionId = partitionId; | ||
this._lastEnqueuedEventInfo = {}; | ||
this._receiverOptions = options || {}; | ||
@@ -47,2 +57,11 @@ this._retryOptions = this._receiverOptions.retryOptions || {}; | ||
/** | ||
* @property The last enqueued event information. This property will only | ||
* be enabled when `trackLastEnqueuedEventInfo` option is set to true in the | ||
* `client.createConsumer()` method. | ||
* @readonly | ||
*/ | ||
get lastEnqueuedEventInfo() { | ||
return this._lastEnqueuedEventInfo; | ||
} | ||
/** | ||
* @property Returns `true` if the consumer is closed. This can happen either because the consumer | ||
@@ -91,8 +110,7 @@ * itself has been closed or the client that created it has been closed. | ||
/** | ||
* Starts the consumer by establishing an AMQP session and an AMQP receiver link on the session. Messages will be passed to | ||
* the provided onMessage handler and error will be passed to the provided onError handler. | ||
* Starts receiving events from the service and calls the user provided message handler for each event. | ||
* Returns an object that can be used to query the state of the receiver and to stop receiving events as well. | ||
* | ||
* @param onMessage The message handler to receive event data objects. | ||
* @param onError The error handler to receive an error that occurs | ||
* while receiving messages. | ||
* @param onError The error handler for errora that can occur when receiving events. | ||
* @param abortSignal An implementation of the `AbortSignalLike` interface to signal the request to cancel the operation. | ||
@@ -142,2 +160,7 @@ * For example, use the @azure/abort-controller to create an `AbortSignal`. | ||
baseConsumer.registerHandlers(onMessage, wrappedOnError, Constants.defaultPrefetchCount, true, abortSignal, onAbort); | ||
if (this._receiverOptions.trackLastEnqueuedEventInfo && | ||
this._baseConsumer && | ||
this._baseConsumer.runtimeInfo) { | ||
this._lastEnqueuedEventInfo = this._baseConsumer.runtimeInfo; | ||
} | ||
return new ReceiveHandler(baseConsumer); | ||
@@ -149,3 +172,3 @@ } | ||
* The async iterable cannot indicate that it is done. | ||
* When using `for..await..of` to iterate over the events returned | ||
* When using `for await (let event of consumer.getEventIterator()) {}` to iterate over the events returned | ||
* by the async iterable, take care to exit the for loop after receiving the | ||
@@ -170,7 +193,6 @@ * desired number of messages, or provide an `AbortSignal` to control when to exit the loop. | ||
/** | ||
* Receives a batch of EventData objects from an EventHub partition for a given count and a given max wait time in seconds, whichever | ||
* happens first. This method can be used directly after creating the consumer object and **MUST NOT** be used along with the `start()` method. | ||
* Returns a promise that resolves to an array of events received from the service. | ||
* | ||
* @param maxMessageCount The maximum number of messages to receive in this batch. Must be a value greater than 0. | ||
* @param [maxWaitTimeInSeconds] The maximum amount of time to wait to build up the requested message count for the batch; | ||
* @param maxMessageCount The maximum number of messages to receive. | ||
* @param maxWaitTimeInSeconds The maximum amount of time to wait to build up the requested message count; | ||
* If not provided, it defaults to 60 seconds. | ||
@@ -242,2 +264,7 @@ * @param abortSignal An implementation of the `AbortSignalLike` interface to signal the request to cancel the operation. | ||
receivedEvents.push(eventData); | ||
if (this._receiverOptions.trackLastEnqueuedEventInfo && | ||
this._baseConsumer && | ||
this._baseConsumer.runtimeInfo) { | ||
this._lastEnqueuedEventInfo = this._baseConsumer.runtimeInfo; | ||
} | ||
// resolve the operation's promise after the requested | ||
@@ -244,0 +271,0 @@ // number of events are received. |
@@ -9,12 +9,17 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
/** | ||
* A producer responsible for sending `EventData` to a specific Event Hub. | ||
* If `partitionId` is specified in the `options`, all event data sent using the producer | ||
* A producer responsible for sending events to an Event Hub. | ||
* To create a producer use the `createProducer()` method on your `EventHubClient`. | ||
* You can pass the below in the `options` when creating a producer. | ||
* - `partitionId` : The identifier of the partition that the producer can be bound to. | ||
* - `retryOptions` : The retry options used to govern retry attempts when an issue is encountered while sending events. | ||
* A simple usage can be `{ "maxRetries": 4 }`. | ||
* | ||
* If `partitionId` is specified when creating a producer, all event data sent using the producer | ||
* will be sent to the specified partition. | ||
* Otherwise, they are automatically routed to an available partition by the Event Hubs service. | ||
* | ||
* Allowing automatic routing of partitions is recommended when: | ||
* - The sending of events needs to be highly available. | ||
* - The event data should be evenly distributed among all available partitions. | ||
* Automatic routing of partitions is recommended because: | ||
* - The sending of events will be highly available. | ||
* - The event data will be evenly distributed among all available partitions. | ||
* | ||
* Use the `createProducer` function on the EventHubClient to instantiate an EventHubProducer. | ||
* @class | ||
@@ -24,2 +29,5 @@ */ | ||
/** | ||
* EventHubProducer should not be constructed using `new EventHubProduer()` | ||
* Use the `createProducer()` method on your `EventHubClient` instead. | ||
* @private | ||
* @constructor | ||
@@ -49,5 +57,9 @@ * @internal | ||
/** | ||
* Creates an instance of EventDataBatch to which one can add events until the maximum supported size is reached. | ||
* The batch can be passed to the send method of the EventHubProducer to be sent to Azure Event Hubs | ||
* @param options Options to define partition key and max message size. | ||
* Creates an instance of `EventDataBatch` to which one can add events until the maximum supported size is reached. | ||
* The batch can be passed to the `send()` method of the `EventHubProducer` to be sent to Azure Event Hubs. | ||
* @param options A set of options to configure the behavior of the batch. | ||
* - `partitionKey` : A value that is hashed to produce a partition assignment. | ||
* Not applicable if the `EventHubProducer` was created using a `partitionId`. | ||
* - `maxSizeInBytes`: The upper limit for the size of batch. The `tryAdd` function will return `false` after this limit is reached. | ||
* - `abortSignal` : A signal the request to cancel the send operation. | ||
* @returns Promise<EventDataBatch> | ||
@@ -84,7 +96,11 @@ */ | ||
/** | ||
* Send a single or an array of events to the associated Event Hub. | ||
* Send one or more of events to the associated Event Hub. | ||
* | ||
* @param eventData An individual event data or array of event data objects to send. | ||
* @param eventData An individual `EventData` object, or an array of `EventData` objects or an | ||
* instance of `EventDataBatch`. | ||
* @param options The set of options that can be specified to influence the way in which | ||
* events are sent to the associated Event Hub, including an abort signal to cancel the operation. | ||
* events are sent to the associated Event Hub. | ||
* - `partitionKey` : A value that is hashed to produce a partition assignment. | ||
* Not applicable if the `EventHubProducer` was created using a `partitionId`. | ||
* - `abortSignal` : A signal the request to cancel the send operation. | ||
* | ||
@@ -91,0 +107,0 @@ * @returns Promise<void> |
@@ -8,4 +8,4 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
name: "@azure/event-hubs", | ||
version: "5.0.0-preview.2" | ||
version: "5.0.0-preview.3" | ||
}; | ||
//# sourceMappingURL=constants.js.map |
{ | ||
"name": "@azure/event-hubs", | ||
"sdk-type": "client", | ||
"version": "5.0.0-preview.2", | ||
"version": "5.0.0-preview.3", | ||
"description": "Azure Event Hubs SDK for JS.", | ||
@@ -54,3 +54,3 @@ "author": "Microsoft Corporation", | ||
"lint:fix": "eslint -c ../../.eslintrc.json src test samples --ext .ts --fix", | ||
"lint": "eslint -c ../../.eslintrc.json src test samples --ext .ts -f node_modules/eslint-detailed-reporter/lib/detailed.js -o event-hubs-lintReport.html || exit 0", | ||
"lint": "eslint -c ../../.eslintrc.json src test samples --ext .ts -f html -o event-hubs-lintReport.html || exit 0", | ||
"pack": "npm pack 2>&1", | ||
@@ -66,9 +66,11 @@ "prebuild": "npm run clean", | ||
"dependencies": { | ||
"@azure/abort-controller": "1.0.0-preview.1", | ||
"@azure/core-amqp": "1.0.0-preview.2", | ||
"@azure/abort-controller": "1.0.0-preview.2", | ||
"@azure/core-amqp": "1.0.0-preview.3", | ||
"@azure/core-asynciterator-polyfill": "1.0.0-preview.1", | ||
"async-lock": "^1.1.3", | ||
"debug": "^3.1.0", | ||
"buffer": "^5.2.1", | ||
"debug": "^4.1.1", | ||
"is-buffer": "^2.0.3", | ||
"jssha": "^2.3.1", | ||
"process": "^0.11.10", | ||
"rhea-promise": "^1.0.0", | ||
@@ -79,3 +81,3 @@ "tslib": "^1.9.3", | ||
"devDependencies": { | ||
"@azure/identity": "1.0.0-preview.2", | ||
"@azure/identity": "1.0.0-preview.3", | ||
"@microsoft/api-extractor": "^7.1.5", | ||
@@ -86,3 +88,3 @@ "@types/async-lock": "^1.1.0", | ||
"@types/chai-string": "^1.4.1", | ||
"@types/debug": "^0.0.31", | ||
"@types/debug": "^4.1.4", | ||
"@types/dotenv": "^6.1.0", | ||
@@ -94,4 +96,4 @@ "@types/long": "^4.0.0", | ||
"@types/ws": "^6.0.1", | ||
"@typescript-eslint/eslint-plugin": "^1.11.0", | ||
"@typescript-eslint/parser": "^1.11.0", | ||
"@typescript-eslint/eslint-plugin": "^2.0.0", | ||
"@typescript-eslint/parser": "^2.0.0", | ||
"assert": "^1.4.1", | ||
@@ -103,5 +105,4 @@ "chai": "^4.2.0", | ||
"dotenv": "^8.0.0", | ||
"eslint": "^5.16.0", | ||
"eslint": "^6.1.0", | ||
"eslint-config-prettier": "^6.0.0", | ||
"eslint-detailed-reporter": "^0.8.0", | ||
"eslint-plugin-no-null": "^1.0.2", | ||
@@ -113,3 +114,3 @@ "eslint-plugin-no-only-tests": "^2.3.0", | ||
"karma-chrome-launcher": "^3.0.0", | ||
"karma-coverage": "^1.1.2", | ||
"karma-coverage": "^2.0.0", | ||
"karma-edge-launcher": "^0.4.2", | ||
@@ -123,9 +124,9 @@ "karma-env-preprocessor": "^0.1.1", | ||
"karma-remap-coverage": "^0.1.5", | ||
"mocha": "^5.2.0", | ||
"mocha": "^6.2.0", | ||
"mocha-junit-reporter": "^1.18.0", | ||
"mocha-multi": "^1.0.1", | ||
"mocha-multi": "^1.1.3", | ||
"nyc": "^14.0.0", | ||
"prettier": "^1.16.4", | ||
"puppeteer": "^1.11.0", | ||
"rimraf": "^2.6.2", | ||
"rimraf": "^3.0.0", | ||
"rollup": "^1.16.3", | ||
@@ -132,0 +133,0 @@ "rollup-plugin-commonjs": "^10.0.0", |
@@ -19,3 +19,3 @@ # Azure Event Hubs client library for Javascript | ||
`npm install @azure/event-hubs@5.0.0-preview.2` | ||
`npm install @azure/event-hubs@next` | ||
@@ -207,51 +207,64 @@ **Prerequisites**: You must have an [Azure subscription](https://azure.microsoft.com/free/) and a | ||
Using an `EventHubConsumer` to consume events like in the previous examples puts the responsibility of storing the checkpoints (the last processed event) on the user. Checkpoints are important for restarting the task of processing events from the right position in a partition. Ideally, you would also want to run multiple programs targeting different partitions with some load balancing. | ||
This is where an [EventProcessor](https://azure.github.io/azure-sdk-for-js/event-hubs/classes/eventprocessor.html) can help. | ||
[EventProcessor](https://azure.github.io/azure-sdk-for-js/event-hubs/classes/eventprocessor.html) is a high level construct which internally uses the `EventHubConsumer` which is mentioned in previous examples | ||
to receive events from multiple partitions at once. | ||
Typically, Event Processor based applications consist of one or more instances of EventProcessor instances which have been | ||
configured to consume events from the same Event Hub and consumer group. They balance the | ||
workload across different instances by distributing the partitions to be processed among themselves. | ||
They also allow the user to track progress when events are processed using checkpoints. | ||
The `EventProcessor` will delegate the processing of events to a [PartitionProcessor](https://azure.github.io/azure-sdk-for-js/event-hubs/interfaces/partitionprocessor.html) | ||
that you provide, allowing you to focus on business logic while the processor holds responsibility for managing the underlying consumer | ||
that you provide, allowing you to focus on business logic while the `EventProcessor` holds responsibility for managing the underlying consumer | ||
operations including checkpointing and load balancing. | ||
While load balancing is a feature we will be adding in the next update, you can see how to use the `EventProcessor` in the below | ||
example, where we use an [InMemoryPartitionManager](https://azure.github.io/azure-sdk-for-js/event-hubs/classes/inmemorypartitionmanager.html) that does checkpointing in memory. | ||
A checkpoint is meant to represent the last successfully processed event by the user from a particular | ||
partition of a consumer group in an Event Hub instance. The `EventProcessor` uses an instance of `PartitionManager` | ||
to update checkpoints and to store the relevant information required by the load balancing algorithm. | ||
While for the purposes of getting started you can use the [InMemoryPartitionManager](https://azure.github.io/azure-sdk-for-js/event-hubs/classes/inmemorypartitionmanager.html) that is shipped out of the box from this library, | ||
it is recommended to use a peristent store when running in production. | ||
Search npm with the prefix `@azure/eventhubs-checkpointstore-` to find packages that support this and use the `PartitionManager` implementation from one such package. | ||
In the below example, we create two instances of EventProcessor against the same Event Hub and consumer group, | ||
using an `InMemoryPartitionManager`. | ||
```javascript | ||
class SimplePartitionProcessor { | ||
// Gets called once before the processing of events from current partition starts. | ||
async initialize() { | ||
// Your Partition Processor where you override the `processEvents` method to include your | ||
// business logic for processing events. | ||
// You may choose to also override other methods like `initialize`, `close`, `processError` as you see fit | ||
class SamplePartitionProcessor extends PartitionProcessor { | ||
/** | ||
* Gets called for each batch of events that are received. | ||
* @param events Array of events recieved | ||
* @param partitionContext Instance of class `PartitionContext` that holds information about the current | ||
* partition, event hub, consumer group and the method `updateCheckpoint` for you to checkpoint an event. | ||
*/ | ||
async processEvents(events, partitionContext) { | ||
/* your code here */ | ||
} | ||
// Gets called for each batch of events that are received. | ||
// You may choose to use the checkpoint manager to update checkpoints. | ||
async processEvents(events) { | ||
/* your code here */ | ||
} | ||
// Gets called for any error when receiving events. | ||
async processError(error) { | ||
/* your code here */ | ||
} | ||
// Gets called when Event Processor stops processing events for current partition. | ||
async close(reason) { | ||
/* your code here */ | ||
} | ||
} | ||
const client = new EventHubClient("my-connection-string", "my-event-hub"); | ||
const processor = new EventProcessor( | ||
const partitionManager = new InMemoryPartitionManager(); | ||
const processor1 = new EventProcessor( | ||
EventHubClient.defaultConsumerGroupName, | ||
client, | ||
(partitionContext, checkpointManager) => new SimplePartitionProcessor(), | ||
new InMemoryPartitionManager() | ||
SamplePartitionProcessor, | ||
partitionManager | ||
); | ||
await processor.start(); | ||
// At this point, the processor is consuming events from each partition of the Event Hub and | ||
// delegating them to the SimplePartitionProcessor instance created for that partition. This | ||
// processing takes place in the background and will not block. | ||
const processor2 = new EventProcessor( | ||
EventHubClient.defaultConsumerGroupName, | ||
client, | ||
SamplePartitionProcessor, | ||
partitionManager | ||
); | ||
await processor1.start(); | ||
await processor2.start(); | ||
// At this point, both processors are consuming events from different partitions of the Event Hub and | ||
// delegating them to the SamplePartitionProcessor instance created for that partition. | ||
// This processing takes place in the background and will not block. | ||
// | ||
// In this example, we'll stop processing after five seconds. | ||
await delay(5000); | ||
await processor.stop(); | ||
// In this example, we'll stop processing after thirty seconds. | ||
await delay(30000); | ||
await processor1.stop(); | ||
await processor2.stop(); | ||
``` | ||
@@ -258,0 +271,0 @@ |
@@ -110,2 +110,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
retrievalTime?: Date; | ||
/** | ||
* @property [systemProperties] The properties set by the service. | ||
*/ | ||
systemProperties?: Dictionary<any>; | ||
} | ||
@@ -122,15 +126,26 @@ | ||
}; | ||
if (msg.message_annotations) { | ||
if (msg.message_annotations[Constants.partitionKey] != undefined) { | ||
data.partitionKey = msg.message_annotations[Constants.partitionKey]; | ||
for (const annotationKey of Object.keys(msg.message_annotations)) { | ||
switch (annotationKey) { | ||
case Constants.partitionKey: | ||
data.partitionKey = msg.message_annotations[annotationKey]; | ||
break; | ||
case Constants.sequenceNumber: | ||
data.sequenceNumber = msg.message_annotations[annotationKey]; | ||
break; | ||
case Constants.enqueuedTime: | ||
data.enqueuedTimeUtc = new Date(msg.message_annotations[annotationKey]); | ||
break; | ||
case Constants.offset: | ||
data.offset = msg.message_annotations[annotationKey]; | ||
break; | ||
default: | ||
if (!data.systemProperties) { | ||
data.systemProperties = {}; | ||
} | ||
data.systemProperties[annotationKey] = msg.message_annotations[annotationKey]; | ||
break; | ||
} | ||
} | ||
if (msg.message_annotations[Constants.sequenceNumber] != undefined) { | ||
data.sequenceNumber = msg.message_annotations[Constants.sequenceNumber]; | ||
} | ||
if (msg.message_annotations[Constants.enqueuedTime] != undefined) { | ||
data.enqueuedTimeUtc = new Date(msg.message_annotations[Constants.enqueuedTime] as number); | ||
} | ||
if (msg.message_annotations[Constants.offset] != undefined) { | ||
data.offset = msg.message_annotations[Constants.offset]; | ||
} | ||
} | ||
@@ -179,3 +194,4 @@ if (msg.application_properties) { | ||
/** | ||
* Describes the structure of an event to be sent to Event Hub. | ||
* `EventData` is the interface that describes the event data to be sent to Event Hub. | ||
* A simple instance can be `{ body: "your-data" }`. | ||
* @interface | ||
@@ -185,7 +201,7 @@ */ | ||
/** | ||
* @property The message body that needs to be sent or is received. | ||
* @property The message body that needs to be sent. | ||
*/ | ||
body: any; | ||
/** | ||
* @property The application specific properties. | ||
* @property Set of key value pairs that can be used to set properties specific to user application. | ||
*/ | ||
@@ -228,2 +244,8 @@ properties?: { | ||
sequenceNumber: number; | ||
/** | ||
* @property The properties set by the service. | ||
*/ | ||
systemProperties?: { | ||
[key: string]: any; | ||
}; | ||
} |
@@ -11,6 +11,11 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
/** | ||
* A class representing a batch of events which can be passed to the `send` method of a `EventConsumer` instance. | ||
* A class representing a batch of events which can be passed to the `send` method of a `EventProducer` instance. | ||
* This batch is ensured to be under the maximum message size supported by Azure Event Hubs service. | ||
* | ||
* Use the `tryAdd` function on the EventDataBatch to add events in a batch. | ||
* Use `createBatch()` method on the `EventHubProducer` to create an instance of `EventDataBatch` | ||
* instead of using `new EventDataBatch()`. You can specify an upper limit for the size of the batch | ||
* via options when calling `createBatch()`. | ||
* | ||
* Use the `tryAdd` function on the EventDataBatch to add events to the batch. This method will return | ||
* `false` after the upper limit is reached, therefore check the result before calling `tryAdd()` again. | ||
* @class | ||
@@ -51,2 +56,4 @@ */ | ||
/** | ||
* EventDataBatch should not be constructed using `new EventDataBatch()` | ||
* Use the `createBatch()` method on your `EventHubProducer` instead. | ||
* @constructor | ||
@@ -65,3 +72,4 @@ * @internal | ||
/** | ||
* @property The partitionKey set during `EventDataBatch` creation. This value is hashed to produce a partition assignment when the consumer is created without a `partitionId` | ||
* @property The partitionKey set during `EventDataBatch` creation. This value is hashed to | ||
* produce a partition assignment when the producer is created without a `partitionId` | ||
* @readonly | ||
@@ -74,3 +82,4 @@ */ | ||
/** | ||
* @property Size of a batch of events. | ||
* @property Size of the `EventDataBatch` instance after the events added to it have been | ||
* encoded into a single AMQP message. | ||
* @readonly | ||
@@ -83,3 +92,3 @@ */ | ||
/** | ||
* @property Number of events in the batch. | ||
* @property Number of events in the `EventDataBatch` instance. | ||
* @readonly | ||
@@ -92,3 +101,9 @@ */ | ||
/** | ||
* @property Encoded batch message. | ||
* @property Represents the single AMQP message which is the result of encoding all the events | ||
* added into the `EventDataBatch` instance. | ||
* | ||
* This is not meant for the user to use directly. | ||
* | ||
* When the `EventDataBatch` instance is passed to the `send()` method on the `EventHubProducer`, | ||
* this single batched AMQP message is what gets sent over the wire to the service. | ||
* @readonly | ||
@@ -102,2 +117,5 @@ */ | ||
* Tries to add an event data to the batch if permitted by the batch's size limit. | ||
* **NOTE**: Always remember to check the return value of this method, before calling it again | ||
* for the next event. | ||
* | ||
* @param eventData An individual event data object. | ||
@@ -104,0 +122,0 @@ * @returns A boolean value indicating if the event data has been added to the batch or not. |
@@ -14,3 +14,5 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
RetryOptions, | ||
Constants | ||
Constants, | ||
parseConnectionString, | ||
EventHubConnectionStringModel | ||
} from "@azure/core-amqp"; | ||
@@ -31,5 +33,5 @@ | ||
retryOptions == undefined || | ||
typeof retryOptions.timeoutInMs !== "number" || | ||
!isFinite(retryOptions.timeoutInMs) || | ||
retryOptions.timeoutInMs < Constants.defaultOperationTimeoutInMs | ||
typeof retryOptions.timeoutInMs !== "number" || | ||
!isFinite(retryOptions.timeoutInMs) || | ||
retryOptions.timeoutInMs < Constants.defaultOperationTimeoutInMs | ||
? Constants.defaultOperationTimeoutInMs | ||
@@ -43,2 +45,5 @@ : retryOptions.timeoutInMs; | ||
* These can be specified when creating the producer via the `createProducer` method. | ||
* - `partitionId` : The string identifier of the partition that the producer can be bound to. | ||
* - `retryOptions` : The retry options used to govern retry attempts when an issue is encountered while sending events. | ||
* A simple usage can be `{ "maxRetries": 4 }`. | ||
*/ | ||
@@ -62,3 +67,12 @@ export interface EventHubProducerOptions { | ||
/** | ||
* The set of options to configure the send operation on the `EventHubProducer`. | ||
* The set of options to configure the `send` operation on the `EventHubProducer`. | ||
* - `partitionKey` : A value that is hashed to produce a partition assignment. | ||
* - `abortSignal` : A signal the request to cancel the send operation. | ||
* | ||
* Example usage: | ||
* ```js | ||
* { | ||
* partitionKey: 'foo' | ||
* } | ||
* ``` | ||
*/ | ||
@@ -82,3 +96,15 @@ export interface SendOptions { | ||
/** | ||
* The set of options to configure the createBatch operation on the `EventProducer`. | ||
* The set of options to configure the `createBatch` operation on the `EventProducer`. | ||
* - `partitionKey` : A value that is hashed to produce a partition assignment. | ||
* Not applicable if the `EventHubProducer` was created using a `partitionId`. | ||
* - `maxSizeInBytes`: The upper limit for the size of batch. The `tryAdd` function will return `false` after this limit is reached. | ||
* - `abortSignal` : A signal the request to cancel the send operation. | ||
* | ||
* Example usage: | ||
* ```js | ||
* { | ||
* partitionKey: 'foo', | ||
* maxSizeInBytes: 1024 * 1024 // 1 MB | ||
* } | ||
* ``` | ||
*/ | ||
@@ -95,3 +121,3 @@ export interface BatchOptions { | ||
* @property | ||
* The maximum size allowed for the batch. | ||
* The upper limit for the size of batch. The `tryAdd` function will return `false` after this limit is reached. | ||
*/ | ||
@@ -110,2 +136,16 @@ maxSizeInBytes?: number; | ||
* These can be specified when creating the consumer using the `createConsumer` method. | ||
* - `ownerLevel` : A number indicating that the consumer intends to be an exclusive consumer of events resulting in other | ||
* consumers to fail if their `ownerLevel` is lower or doesn't exist. | ||
* - `retryOptions`: The retry options used to govern retry attempts when an issue is encountered while receiving events. | ||
* A simple usage can be `{ "maxRetries": 4 }`. | ||
* | ||
* Example usage: | ||
* ```js | ||
* { | ||
* retryOptions: { | ||
* maxRetries: 4 | ||
* }, | ||
* trackLastEnqueuedEventInfo: false | ||
* } | ||
* ``` | ||
*/ | ||
@@ -129,2 +169,13 @@ export interface EventHubConsumerOptions { | ||
retryOptions?: RetryOptions; | ||
/** | ||
* @property | ||
* Indicates whether or not the consumer should request information on the last enqueued event on its | ||
* associated partition, and track that information as events are received. | ||
* When information about the partition's last enqueued event is being tracked, each event received | ||
* from the Event Hubs service will carry metadata about the partition that it otherwise would not. This results in a small amount of | ||
* additional network bandwidth consumption that is generally a favorable trade-off when considered | ||
* against periodically making requests for partition properties using the Event Hub client. | ||
*/ | ||
trackLastEnqueuedEventInfo?: boolean; | ||
} | ||
@@ -134,2 +185,21 @@ | ||
* Describes the options that can be provided while creating the EventHubClient. | ||
* - `dataTransformer`: A set of `encode`/`decode` methods to be used to encode an event before sending to service | ||
* and to decode the event received from the service | ||
* - `userAgent` : A string to append to the built in user agent string that is passed as a connection property | ||
* to the service. | ||
* - `websocket` : The WebSocket constructor used to create an AMQP connection if you choose to make the connection | ||
* over a WebSocket. | ||
* - `webSocketConstructorOptions` : Options to pass to the Websocket constructor when you choose to make the connection | ||
* over a WebSocket. | ||
* - `retryOptions` : The retry options for all the operations on the client/producer/consumer. | ||
* A simple usage can be `{ "maxRetries": 4 }`. | ||
* | ||
* Example usage: | ||
* ```js | ||
* { | ||
* retryOptions: { | ||
* maxRetries: 4 | ||
* } | ||
* } | ||
* ``` | ||
* @interface ClientOptions | ||
@@ -187,2 +257,10 @@ */ | ||
* operations for sending event data, receiving events, and inspecting the connected Event Hub. | ||
* | ||
* There are multiple ways to create an `EventHubClient` | ||
* - Use the connection string from the SAS policy created for your Event Hub instance. | ||
* - Use the connection string from the SAS policy created for your Event Hub namespace, | ||
* and the name of the Event Hub instance | ||
* - Use the fully qualified domain name of your Event Hub namespace like `<yournamespace>.servicebus.windows.net`, | ||
* and a credentials object. | ||
* | ||
*/ | ||
@@ -215,2 +293,12 @@ export class EventHubClient { | ||
* @param options - A set of options to apply when configuring the client. | ||
* - `dataTransformer`: A set of `encode`/`decode` methods to be used to encode an event before sending to service | ||
* and to decode the event received from the service | ||
* - `userAgent` : A string to append to the built in user agent string that is passed as a connection property | ||
* to the service. | ||
* - `websocket` : The WebSocket constructor used to create an AMQP connection if you choose to make the connection | ||
* over a WebSocket. | ||
* - `webSocketConstructorOptions` : Options to pass to the Websocket constructor when you choose to make the connection | ||
* over a WebSocket. | ||
* - `retryOptions` : The retry options for all the operations on the client/producer/consumer. | ||
* A simple usage can be `{ "maxRetries": 4 }`. | ||
*/ | ||
@@ -223,6 +311,16 @@ constructor(connectionString: string, options?: EventHubClientOptions); | ||
* e.g. 'Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;'. | ||
* @param eventHubPath - The path of the specific Event Hub to connect the client to. | ||
* @param eventHubName - The path of the specific Event Hub to connect the client to. | ||
* @param options - A set of options to apply when configuring the client. | ||
* - `dataTransformer`: A set of `encode`/`decode` methods to be used to encode an event before sending to service | ||
* and to decode the event received from the service | ||
* - `userAgent` : A string to append to the built in user agent string that is passed as a connection property | ||
* to the service. | ||
* - `websocket` : The WebSocket constructor used to create an AMQP connection if you choose to make the connection | ||
* over a WebSocket. | ||
* - `webSocketConstructorOptions` : Options to pass to the Websocket constructor when you choose to make the connection | ||
* over a WebSocket. | ||
* - `retryOptions` : The retry options for all the operations on the client/producer/consumer. | ||
* A simple usage can be `{ "maxRetries": 4 }`. | ||
*/ | ||
constructor(connectionString: string, eventHubPath: string, options?: EventHubClientOptions); | ||
constructor(connectionString: string, eventHubName: string, options?: EventHubClientOptions); | ||
/** | ||
@@ -232,9 +330,19 @@ * @constructor | ||
* <yournamespace>.servicebus.windows.net | ||
* @param eventHubPath - The path of the specific Event Hub to connect the client to. | ||
* @param eventHubName - The path of the specific Event Hub to connect the client to. | ||
* @param credential - SharedKeyCredential object or your credential that implements the TokenCredential interface. | ||
* @param options - A set of options to apply when configuring the client. | ||
* @param options - A set of options to apply when configuring the client. | ||
* - `dataTransformer`: A set of `encode`/`decode` methods to be used to encode an event before sending to service | ||
* and to decode the event received from the service | ||
* - `userAgent` : A string to append to the built in user agent string that is passed as a connection property | ||
* to the service. | ||
* - `websocket` : The WebSocket constructor used to create an AMQP connection if you choose to make the connection | ||
* over a WebSocket. | ||
* - `webSocketConstructorOptions` : Options to pass to the Websocket constructor when you choose to make the connection | ||
* over a WebSocket. | ||
* - `retryOptions` : The retry options for all the operations on the client/producer/consumer. | ||
* A simple usage can be `{ "maxRetries": 4 }`. | ||
*/ | ||
constructor( | ||
host: string, | ||
eventHubPath: string, | ||
eventHubName: string, | ||
credential: TokenCredential, | ||
@@ -245,3 +353,3 @@ options?: EventHubClientOptions | ||
hostOrConnectionString: string, | ||
eventHubPathOrOptions?: string | EventHubClientOptions, | ||
eventHubNameOrOptions?: string | EventHubClientOptions, | ||
credentialOrOptions?: TokenCredential | EventHubClientOptions, | ||
@@ -256,11 +364,34 @@ options?: EventHubClientOptions | ||
if (!isTokenCredential(credentialOrOptions)) { | ||
const parsedCS = parseConnectionString<EventHubConnectionStringModel>(hostOrConnectionString); | ||
if ( | ||
!( | ||
parsedCS.EntityPath || | ||
(typeof eventHubNameOrOptions === "string" && eventHubNameOrOptions) | ||
) | ||
) { | ||
throw new TypeError( | ||
`Either provide "eventHubName" or the "connectionString": "${hostOrConnectionString}", ` + | ||
`must contain "EntityPath=<your-event-hub-name>".` | ||
); | ||
} | ||
if ( | ||
parsedCS.EntityPath && | ||
typeof eventHubNameOrOptions === "string" && | ||
eventHubNameOrOptions && | ||
parsedCS.EntityPath !== eventHubNameOrOptions | ||
) { | ||
throw new TypeError( | ||
`The entity path "${parsedCS.EntityPath}" in connectionString: "${hostOrConnectionString}" ` + | ||
`doesn't match with eventHubName: "${eventHubNameOrOptions}".` | ||
); | ||
} | ||
connectionString = hostOrConnectionString; | ||
if (typeof eventHubPathOrOptions !== "string") { | ||
if (typeof eventHubNameOrOptions !== "string") { | ||
// connectionstring and/or options were passed to constructor | ||
config = EventHubConnectionConfig.create(connectionString); | ||
options = eventHubPathOrOptions; | ||
options = eventHubNameOrOptions; | ||
} else { | ||
// connectionstring, eventHubPath and/or options were passed to constructor | ||
const eventHubPath = eventHubPathOrOptions; | ||
config = EventHubConnectionConfig.create(connectionString, eventHubPath); | ||
// connectionstring, eventHubName and/or options were passed to constructor | ||
const eventHubName = eventHubNameOrOptions; | ||
config = EventHubConnectionConfig.create(connectionString, eventHubName); | ||
options = credentialOrOptions; | ||
@@ -271,9 +402,12 @@ } | ||
} else { | ||
// host, eventHubPath, a TokenCredential and/or options were passed to constructor | ||
const eventHubPath = eventHubPathOrOptions; | ||
// host, eventHubName, a TokenCredential and/or options were passed to constructor | ||
const eventHubName = eventHubNameOrOptions; | ||
let host = hostOrConnectionString; | ||
credential = credentialOrOptions; | ||
if (!eventHubName) { | ||
throw new TypeError(`"eventHubName" is missing`); | ||
} | ||
if (!host.endsWith("/")) host += "/"; | ||
connectionString = `Endpoint=sb://${host};SharedAccessKeyName=defaultKeyName;SharedAccessKey=defaultKeyValue;EntityPath=${eventHubPath}`; | ||
connectionString = `Endpoint=sb://${host};SharedAccessKeyName=defaultKeyName;SharedAccessKey=defaultKeyValue;EntityPath=${eventHubName}`; | ||
config = EventHubConnectionConfig.create(connectionString); | ||
@@ -323,3 +457,3 @@ } | ||
/** | ||
* Creates an Event Hub producer responsible for sending `EventData` to the Event Hub. | ||
* Creates an Event Hub producer that can send events to the Event Hub. | ||
* If `partitionId` is specified in the `options`, all event data sent using the producer | ||
@@ -329,11 +463,13 @@ * will be sent to the specified partition. | ||
* | ||
* Allowing automatic routing of partitions is recommended when: | ||
* - The sending of events needs to be highly available. | ||
* - The event data should be evenly distributed among all available partitions. | ||
* Automatic routing of partitions is recommended because: | ||
* - The sending of events will be highly available. | ||
* - The event data will be evenly distributed among all available partitions. | ||
* | ||
* @param options The set of options to apply when creating the producer where you can specify the id of the partition | ||
* to which events need to be sent to, and retry options. | ||
* @param options The set of options to apply when creating the producer. | ||
* - `partitionId` : The identifier of the partition that the producer can be bound to. | ||
* - `retryOptions` : The retry options used to govern retry attempts when an issue is encountered while sending events. | ||
* A simple usage can be `{ "maxRetries": 4 }`. | ||
* | ||
* @throws {Error} Thrown if the underlying connection has been closed, create a new EventHubClient. | ||
* @returns Promise<void> | ||
* @returns EventHubProducer | ||
*/ | ||
@@ -352,3 +488,3 @@ createProducer(options?: EventHubProducerOptions): EventHubProducer { | ||
/** | ||
* Creates an Event Hub consumer responsible for reading `EventData` from a specific Event Hub partition, | ||
* Creates an Event Hub consumer that can receive events from a specific Event Hub partition, | ||
* in the context of a specific consumer group. | ||
@@ -361,8 +497,19 @@ * | ||
* | ||
* Designating a consumer as exclusive may be specified in the `options` via `ownerLevel`. | ||
* | ||
* @param consumerGroup The name of the consumer group this consumer is associated with. Events are read in the context of this group. | ||
* @param consumerGroup The name of the consumer group this consumer is associated with. | ||
* Events are read in the context of this group. You can get this information from Azure portal. | ||
* @param partitionId The identifier of the Event Hub partition from which events will be received. | ||
* You can get identifiers for all partitions by using the `getPartitionProperties` method on the `EventHubClient`. | ||
* @param eventPosition The position within the partition where the consumer should begin reading events. | ||
* @param options The set of options to apply when creating the consumer where you can specify retry options and ownerLevel. | ||
* The easiest way to create an instance of EventPosition is to use the static helpers on it like | ||
* - `EventPosition.fromOffset()` | ||
* - `EventPosition.fromSequenceNumber()` | ||
* - `EventPosition.fromEnqueuedTime()` | ||
* - `EventPosition.earliest()` | ||
* - `EventPosition.latest()` | ||
* @param options The set of options to apply when creating the consumer. | ||
* - `ownerLevel` : A number indicating that the consumer intends to be an exclusive consumer of events resulting in other | ||
* consumers to fail if their `ownerLevel` is lower or doesn't exist. | ||
* - `retryOptions`: The retry options used to govern retry attempts when an issue is encountered while receiving events. | ||
* A simple usage can be `{ "maxRetries": 4 }`. | ||
* | ||
* @throws {Error} Thrown if the underlying connection has been closed, create a new EventHubClient. | ||
@@ -369,0 +516,0 @@ * @throws {TypeError} Thrown if a required parameter is missing. |
@@ -44,22 +44,24 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
/** | ||
* @internal | ||
* @ignore | ||
* Represents the approximate receiver runtime information for a logical partition of an Event Hub. | ||
* @interface ReceiverRuntimeInfo | ||
* A set of information about the last enqueued event of a partition, as observed by the consumer as | ||
* events are received from the Event Hubs service | ||
* @interface LastEnqueuedEventInfo | ||
*/ | ||
export interface ReceiverRuntimeInfo { | ||
export interface LastEnqueuedEventInfo { | ||
/** | ||
* @property lastSequenceNumber The logical sequence number of the event. | ||
* @property The sequence number of the event that was last enqueued into the Event Hub partition from which | ||
* this event was received. | ||
*/ | ||
lastEnqueuedSequenceNumber?: number; | ||
sequenceNumber?: number; | ||
/** | ||
* @property lastEnqueuedTimeUtc The enqueued time of the last event. | ||
* @property The date and time, in UTC, that the last event was enqueued into the Event Hub partition from | ||
* which this event was received. | ||
*/ | ||
lastEnqueuedTimeUtc?: Date; | ||
enqueuedTime?: Date; | ||
/** | ||
* @property lastEnqueuedOffset The offset of the last enqueued event. | ||
* @property The offset of the event that was last enqueued into the Event Hub partition from which | ||
* this event was received. | ||
*/ | ||
lastEnqueuedOffset?: string; | ||
offset?: string; | ||
/** | ||
* @property retrievalTime The enqueued time of the last event. | ||
* @property The date and time, in UTC, that the last event was retrieved from the Event Hub partition. | ||
*/ | ||
@@ -99,3 +101,3 @@ retrievalTime?: Date; | ||
*/ | ||
runtimeInfo: ReceiverRuntimeInfo; | ||
runtimeInfo: LastEnqueuedEventInfo; | ||
/** | ||
@@ -115,7 +117,2 @@ * @property [ownerLevel] The Receiver ownerLevel. | ||
/** | ||
* @property receiverRuntimeMetricEnabled Indicates whether receiver runtime metric | ||
* is enabled. Default: false. | ||
*/ | ||
receiverRuntimeMetricEnabled: boolean = false; | ||
/** | ||
* @property [_receiver] The RHEA AMQP-based receiver link. | ||
@@ -231,3 +228,4 @@ * @private | ||
enqueuedTimeUtc: data.enqueuedTimeUtc!, | ||
partitionKey: data.partitionKey! | ||
partitionKey: data.partitionKey!, | ||
systemProperties: data.systemProperties | ||
}; | ||
@@ -237,6 +235,6 @@ | ||
if (this.receiverRuntimeMetricEnabled && data) { | ||
this.runtimeInfo.lastEnqueuedSequenceNumber = data.lastSequenceNumber; | ||
this.runtimeInfo.lastEnqueuedTimeUtc = data.lastEnqueuedTime; | ||
this.runtimeInfo.lastEnqueuedOffset = data.lastEnqueuedOffset; | ||
if (this.options.trackLastEnqueuedEventInfo && data) { | ||
this.runtimeInfo.sequenceNumber = data.lastSequenceNumber; | ||
this.runtimeInfo.enqueuedTime = data.lastEnqueuedTime; | ||
this.runtimeInfo.offset = data.lastEnqueuedOffset; | ||
this.runtimeInfo.retrievalTime = data.retrievalTime; | ||
@@ -278,3 +276,3 @@ log.receiver( | ||
"[%s] The receiver was closed by the user." + | ||
"Hence not notifying the user's error handler.", | ||
"Hence not notifying the user's error handler.", | ||
this._context.connectionId | ||
@@ -295,3 +293,3 @@ ); | ||
"[%s] Since the user did not close the receiver " + | ||
"we let the user know about it by calling the user's error handler.", | ||
"we let the user know about it by calling the user's error handler.", | ||
this._context.connectionId | ||
@@ -317,3 +315,3 @@ ); | ||
"[%s] The receiver was closed by the user." + | ||
"Hence not notifying the user's error handler.", | ||
"Hence not notifying the user's error handler.", | ||
this._context.connectionId | ||
@@ -335,3 +333,3 @@ ); | ||
"[%s] Since the user did not close the receiver, " + | ||
"we let the user know about it by calling the user's error handler.", | ||
"we let the user know about it by calling the user's error handler.", | ||
this._context.connectionId | ||
@@ -348,4 +346,4 @@ ); | ||
"[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " + | ||
"because the sdk initiated it. Hence not calling detached from the _onAmqpClose" + | ||
"() handler.", | ||
"because the sdk initiated it. Hence not calling detached from the _onAmqpClose" + | ||
"() handler.", | ||
this._context.connectionId, | ||
@@ -362,3 +360,3 @@ this.name, | ||
"[%s] 'receiver_close' event occurred for receiver '%s' with address '%s'. " + | ||
"The associated error is: %O", | ||
"The associated error is: %O", | ||
this._context.connectionId, | ||
@@ -374,4 +372,4 @@ this.name, | ||
"[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " + | ||
"and the sdk did not initiate this. The receiver is not reconnecting. Hence, calling " + | ||
"detached from the _onAmqpClose() handler.", | ||
"and the sdk did not initiate this. The receiver is not reconnecting. Hence, calling " + | ||
"detached from the _onAmqpClose() handler.", | ||
this._context.connectionId, | ||
@@ -385,4 +383,4 @@ this.name, | ||
"[%s] 'receiver_close' event occurred on the receiver '%s' with address '%s' " + | ||
"and the sdk did not initate this. Moreover the receiver is already re-connecting. " + | ||
"Hence not calling detached from the _onAmqpClose() handler.", | ||
"and the sdk did not initate this. Moreover the receiver is already re-connecting. " + | ||
"Hence not calling detached from the _onAmqpClose() handler.", | ||
this._context.connectionId, | ||
@@ -400,4 +398,4 @@ this.name, | ||
"[%s] 'session_close' event occurred on the session of receiver '%s' with " + | ||
"address '%s' and the sdk did not initiate this. Moreover the receiver is already " + | ||
"re-connecting. Hence not calling detached from the _onAmqpSessionClose() handler.", | ||
"address '%s' and the sdk did not initiate this. Moreover the receiver is already " + | ||
"re-connecting. Hence not calling detached from the _onAmqpSessionClose() handler.", | ||
this._context.connectionId, | ||
@@ -414,3 +412,3 @@ this.name, | ||
"[%s] 'session_close' event occurred for receiver '%s' with address '%s'. " + | ||
"The associated error is: %O", | ||
"The associated error is: %O", | ||
this._context.connectionId, | ||
@@ -426,4 +424,4 @@ this.name, | ||
"[%s] 'session_close' event occurred on the session of receiver '%s' with " + | ||
"address '%s' and the sdk did not initiate this. Hence calling detached from the " + | ||
"_onAmqpSessionClose() handler.", | ||
"address '%s' and the sdk did not initiate this. Hence calling detached from the " + | ||
"_onAmqpSessionClose() handler.", | ||
this._context.connectionId, | ||
@@ -437,4 +435,4 @@ this.name, | ||
"[%s] 'session_close' event occurred on the session of receiver '%s' with " + | ||
"address '%s' and the sdk did not initiate this. Moreover the receiver is already " + | ||
"re-connecting. Hence not calling detached from the _onAmqpSessionClose() handler.", | ||
"address '%s' and the sdk did not initiate this. Moreover the receiver is already " + | ||
"re-connecting. Hence not calling detached from the _onAmqpSessionClose() handler.", | ||
this._context.connectionId, | ||
@@ -482,4 +480,4 @@ this.name, | ||
"[%s] close() method of Receiver '%s' with address '%s' was not called. There " + | ||
"was an accompanying error and it is retryable. This is a candidate for re-establishing " + | ||
"the receiver link.", | ||
"was an accompanying error and it is retryable. This is a candidate for re-establishing " + | ||
"the receiver link.", | ||
this._context.connectionId, | ||
@@ -492,4 +490,4 @@ this.name, | ||
"[%s] close() method of Receiver '%s' with address '%s' was not called. There " + | ||
"was an accompanying error and it is NOT retryable. Hence NOT re-establishing " + | ||
"the receiver link.", | ||
"was an accompanying error and it is NOT retryable. Hence NOT re-establishing " + | ||
"the receiver link.", | ||
this._context.connectionId, | ||
@@ -505,4 +503,4 @@ this.name, | ||
"[%s] close() method of Receiver '%s' with address '%s' was not called. " + | ||
"There was no accompanying error as well. This is a candidate for re-establishing " + | ||
"the receiver link.", | ||
"There was no accompanying error as well. This is a candidate for re-establishing " + | ||
"the receiver link.", | ||
this._context.connectionId, | ||
@@ -568,3 +566,3 @@ this.name, | ||
"[%s] An error occurred while processing onDetached() of Receiver '%s' with address " + | ||
"'%s': %O", | ||
"'%s': %O", | ||
this._context.connectionId, | ||
@@ -755,3 +753,3 @@ this.name, | ||
"[%s] The receiver '%s' with address '%s' is not open and is not currently " + | ||
"establishing itself. Hence let's try to connect.", | ||
"establishing itself. Hence let's try to connect.", | ||
this._context.connectionId, | ||
@@ -809,3 +807,3 @@ this.name, | ||
"[%s] The receiver '%s' with address '%s' is open -> %s and is connecting " + | ||
"-> %s. Hence not reconnecting.", | ||
"-> %s. Hence not reconnecting.", | ||
this._context.connectionId, | ||
@@ -859,3 +857,3 @@ this.name, | ||
if (this.receiverRuntimeMetricEnabled) { | ||
if (this.options.trackLastEnqueuedEventInfo) { | ||
rcvrOptions.desired_capabilities = Constants.enableReceiverRuntimeMetricName; | ||
@@ -862,0 +860,0 @@ } |
@@ -38,3 +38,10 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
* Represents the position of an event in an Event Hub partition, typically used in the creation of | ||
* an `EventHubProducer`. | ||
* an `EventHubConsumer` to specify the position in the partition to begin receiving events from. | ||
* | ||
* Make use of the below static helpers to create an instance of `EventPosition` | ||
* - `fromOffset()` | ||
* - `fromSequenceNumber()` | ||
* - `fromEnqueuedTime()` | ||
* - `earliest()` | ||
* - `latest()` | ||
* @class | ||
@@ -87,2 +94,9 @@ */ | ||
/** | ||
* Instead of constructing an event position using `new Event Position()`, make use of the below static helpers | ||
* - `fromOffset()` | ||
* - `fromSequenceNumber()` | ||
* - `fromEnqueuedTime()` | ||
* - `earliest()` | ||
* - `latest()` | ||
* | ||
* @constructor | ||
@@ -112,4 +126,4 @@ * @internal | ||
static fromOffset(offset: number, isInclusive?: boolean): EventPosition { | ||
if (offset == undefined) { | ||
throw new Error('Missing parameter "offset"'); | ||
if (typeof offset !== "number" && typeof offset !== "string") { | ||
throw new Error(`Invalid offset "${offset}" provided to "fromOffset" method.`); | ||
} | ||
@@ -146,4 +160,6 @@ return new EventPosition({ offset: offset, isInclusive: isInclusive }); | ||
static fromEnqueuedTime(enqueuedTime: Date | number): EventPosition { | ||
if (enqueuedTime == undefined) { | ||
throw new Error('Missing parameter "enqueuedTime"'); | ||
if (typeof enqueuedTime !== "number" && !(enqueuedTime instanceof Date)) { | ||
throw new Error( | ||
`Invalid enqueuedTime "${enqueuedTime}" provided to "fromEnqueuedTime" method.` | ||
); | ||
} | ||
@@ -150,0 +166,0 @@ return new EventPosition({ enqueuedTime: enqueuedTime }); |
@@ -7,12 +7,13 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import { EventPosition } from "./eventPosition"; | ||
import { PartitionContext } from "./partitionContext"; | ||
import { CheckpointManager, Checkpoint } from "./checkpointManager"; | ||
import { ReceivedEventData } from "./eventData"; | ||
import { PartitionContext, Checkpoint } from "./partitionContext"; | ||
import { PumpManager } from "./pumpManager"; | ||
import { AbortSignalLike, AbortController } from "@azure/abort-controller"; | ||
import { AbortController, AbortSignalLike } from "@azure/abort-controller"; | ||
import * as log from "./log"; | ||
import { PartitionLoadBalancer } from "./partitionLoadBalancer"; | ||
import { delay } from "@azure/core-amqp"; | ||
import { PartitionProcessor } from "./partitionProcessor"; | ||
/** | ||
* Reason for closing a PartitionProcessor. | ||
* An enum representing the different reasons for an `EventProcessor` to stop processing | ||
* events from a partition in a consumer group of an Event Hub instance. | ||
*/ | ||
@@ -35,41 +36,7 @@ export enum CloseReason { | ||
/** | ||
* Implementations of this interface are responsible to process events, handle errors and update checkpoints | ||
* An interface representing the details on which instance of a `EventProcessor` owns processing | ||
* of a given partition from a consumer group of an Event Hub instance. | ||
* | ||
* **Note**: This is used internally by the `EventProcessor` and user never has to create it directly. | ||
*/ | ||
export interface PartitionProcessor { | ||
/** | ||
* This method is called when the `EventProcessor` takes ownership of a new partition and before any | ||
* events are received. | ||
* | ||
* @return {void} | ||
*/ | ||
initialize?(): Promise<void>; | ||
/** | ||
* This method is called before the partition processor is closed by the EventProcessor. | ||
* | ||
* @param closeReason The reason for closing this partition processor. | ||
* @return {void} | ||
*/ | ||
close?(reason: CloseReason): Promise<void>; | ||
/** | ||
* This method is called when new events are received. | ||
* | ||
* This is also a good place to update checkpoints as appropriate. | ||
* | ||
* @param eventData The received events to be processed. | ||
* @return {void} | ||
*/ | ||
processEvents(events: ReceivedEventData[]): Promise<void>; | ||
/** | ||
* This method is called when an error occurs while receiving events from Event Hub. | ||
* | ||
* @param error The error to be processed. | ||
* @return {void} | ||
*/ | ||
processError(error: Error): Promise<void>; | ||
} | ||
/** | ||
* Partition ownership information. Used by `PartitionManager` to claim ownership. | ||
*/ | ||
export interface PartitionOwnership { | ||
@@ -116,21 +83,15 @@ /** | ||
/** | ||
* A functional interface to create new instance(s) of `PartitionProcessor` when provided with a `PartitionContext` and `CheckpointManager`. | ||
* A Partition manager stores and retrieves partition ownership information and checkpoint details | ||
* for each partition in a given consumer group of an event hub instance. | ||
* | ||
* Users are not meant to implement an `PartitionManager`. | ||
* Users are expected to choose existing implementations of this interface, instantiate it, and pass | ||
* it to the constructor of `EventProcessor`. | ||
* | ||
* To get started, you can use the `InMemoryPartitionManager` which will store the relevant information in memory. | ||
* But in production, you should choose an implementation of the `PartitionManager` interface that will | ||
* store the checkpoints and partition ownerships to a durable store instead. | ||
* | ||
* Implementations of `PartitionManager` can be found on npm by searching for packages with the prefix @azure/eventhub-checkpointstore-. | ||
*/ | ||
export interface PartitionProcessorFactory { | ||
/** | ||
* Factory method to create a new instance of `PartitionProcessor` for a partition. | ||
* | ||
* @param partitionContext The partition context containing partition and Event Hub information. The new instance of | ||
* `PartitionProcessor` created by this method will be responsible for processing events only for this | ||
* partition. | ||
* @param checkpointManager The checkpoint manager for updating checkpoints when events are processed by `PartitionProcessor`. | ||
* | ||
* @return A new instance of `PartitionProcessor` responsible for processing events. | ||
*/ | ||
(context: PartitionContext, checkpointManager: CheckpointManager): PartitionProcessor; | ||
} | ||
/** | ||
* Partition manager stores and retrieves partition ownership information and checkpoint details for each partition in a given consumer group of an event hub instance. | ||
*/ | ||
export interface PartitionManager { | ||
@@ -163,6 +124,26 @@ /** | ||
// Options passed when creating EventProcessor, everything is optional | ||
/** | ||
* A set of options to pass to the constructor of `EventProcessor`. | ||
* You can specify | ||
* - `maxBatchSize`: The max size of the batch of events passed each time to user code for processing. | ||
* - `maxWaitTimeInSeconds`: The maximum amount of time to wait to build up the requested message count before | ||
* passing the data to user code for processing. If not provided, it defaults to 60 seconds. | ||
* | ||
* Example usage with default values: | ||
* ```ts | ||
* { | ||
* maxBatchSize: 1, | ||
* maxWaitTimeInSeconds: 60 | ||
* } | ||
* ``` | ||
*/ | ||
export interface EventProcessorOptions { | ||
initialEventPosition?: EventPosition; | ||
/** | ||
* The max size of the batch of events passed each time to user code for processing. | ||
*/ | ||
maxBatchSize?: number; | ||
/** | ||
* The maximum amount of time to wait to build up the requested message count before | ||
* passing the data to user code for processing. If not provided, it defaults to 60 seconds. | ||
*/ | ||
maxWaitTimeInSeconds?: number; | ||
@@ -172,4 +153,30 @@ } | ||
/** | ||
* Describes the Event Processor Host to process events from an EventHub. | ||
* @class EventProcessorHost | ||
* Event Processor based applications consist of one or more instances of EventProcessor which have been | ||
* configured to consume events from the same Event Hub and consumer group. They balance the | ||
* workload across different instances by distributing the partitions to be processed among themselves. | ||
* They also allow the user to track progress when events are processed using checkpoints. | ||
* | ||
* A checkpoint is meant to represent the last successfully processed event by the user from a particular | ||
* partition of a consumer group in an Event Hub instance. | ||
* | ||
* You need the below to create an instance of `EventProcessor` | ||
* - The name of the consumer group from which you want to process events | ||
* - An instance of `EventHubClient` class that was created for the Event Hub instance. | ||
* - A user implemented class that extends the `PartitionProcessor` class. To get started, you can use the | ||
* base class `PartitionProcessor` which simply logs the incoming events. To provide your code to process incoming | ||
* events, extend this class and override the `processEvents()` method. For example: | ||
* ```js | ||
* class SamplePartitionProcessor extends PartitionProcessor { | ||
* async processEvents(events, partitionContext) { | ||
* // user code to process events here | ||
* // use `partitionContext` property to get information on the partition | ||
* // use `partitionContext.updateCheckpoint()` method to update checkpoints as needed | ||
* } | ||
* } | ||
* ``` | ||
* - An instance of `PartitionManager`. To get started, you can pass an instance of `InMemoryPartitionManager`. | ||
* For production, choose an implementation that will store checkpoints and partition ownership details to a durable store. | ||
* Implementations of `PartitionManager` can be found on npm by searching for packages with the prefix @azure/eventhub-checkpointstore-. | ||
* | ||
* @class EventProcessor | ||
*/ | ||
@@ -179,3 +186,3 @@ export class EventProcessor { | ||
private _eventHubClient: EventHubClient; | ||
private _partitionProcessorFactory: PartitionProcessorFactory; | ||
private _partitionProcessorClass: typeof PartitionProcessor; | ||
private _processorOptions: EventProcessorOptions; | ||
@@ -188,10 +195,15 @@ private _pumpManager: PumpManager; | ||
private _partitionManager: PartitionManager; | ||
private _partitionLoadBalancer: PartitionLoadBalancer; | ||
/** | ||
* @param consumerGroupName The consumer group name used in this event processor to consumer events. | ||
* @param eventHubAsyncClient The Event Hub client. | ||
* @param partitionProcessorFactory The factory to create new partition processor(s). | ||
* @param initialEventPosition Initial event position to start consuming events. | ||
* @param partitionManager The partition manager. | ||
* @param eventHubName The Event Hub name. | ||
* @param consumerGroupName The name of the consumer group from which you want to process events. | ||
* @param eventHubClient An instance of `EventHubClient` that was created for the Event Hub instance. | ||
* @param PartitionProcessorClass A user-provided class that extends the `PartitionProcessor` class. | ||
* This class will be responsible for processing and checkpointing events. | ||
* @param partitionManager An instance of `PartitionManager`. To get started, you can pass an instance of `InMemoryPartitionManager`. | ||
* For production, choose an implementation that will store checkpoints and partition ownership details to a durable store. | ||
* @param options A set of options to configure the Event Processor | ||
* - `maxBatchSize` : The max size of the batch of events passed each time to user code for processing. | ||
* - `maxWaitTimeInSeconds` : The maximum amount of time to wait to build up the requested message count before | ||
* passing the data to user code for processing. If not provided, it defaults to 60 seconds. | ||
*/ | ||
@@ -201,3 +213,3 @@ constructor( | ||
eventHubClient: EventHubClient, | ||
partitionProcessorFactory: PartitionProcessorFactory, | ||
PartitionProcessorClass: typeof PartitionProcessor, | ||
partitionManager: PartitionManager, | ||
@@ -210,41 +222,117 @@ options?: EventProcessorOptions | ||
this._eventHubClient = eventHubClient; | ||
this._partitionProcessorFactory = partitionProcessorFactory; | ||
this._partitionProcessorClass = PartitionProcessorClass; | ||
this._partitionManager = partitionManager; | ||
this._processorOptions = options; | ||
this._pumpManager = new PumpManager(this._id, options); | ||
this._pumpManager = new PumpManager(this._id, this._processorOptions); | ||
const inactiveTimeLimitInMS = 60000; // ownership expiration time (1 mintue) | ||
this._partitionLoadBalancer = new PartitionLoadBalancer(this._id, inactiveTimeLimitInMS); | ||
} | ||
private async _getInactivePartitions(): Promise<string[]> { | ||
/** | ||
* The unique identifier for the EventProcessor. | ||
* | ||
* @return {string} | ||
*/ | ||
get id(): string { | ||
return this._id; | ||
} | ||
private _createPartitionOwnershipRequest( | ||
partitionOwnershipMap: Map<string, PartitionOwnership>, | ||
partitionIdToClaim: string | ||
): PartitionOwnership { | ||
const previousPartitionOwnership = partitionOwnershipMap.get(partitionIdToClaim); | ||
const partitionOwnership: PartitionOwnership = { | ||
ownerId: this._id, | ||
partitionId: partitionIdToClaim, | ||
consumerGroupName: this._consumerGroupName, | ||
eventHubName: this._eventHubClient.eventHubName, | ||
sequenceNumber: previousPartitionOwnership | ||
? previousPartitionOwnership.sequenceNumber | ||
: undefined, | ||
offset: previousPartitionOwnership ? previousPartitionOwnership.offset : undefined, | ||
eTag: previousPartitionOwnership ? previousPartitionOwnership.eTag : undefined, | ||
ownerLevel: 0 | ||
}; | ||
return partitionOwnership; | ||
} | ||
/* | ||
* Claim ownership of the given partition if it's available | ||
*/ | ||
private async _claimOwnership( | ||
partitionOwnershipMap: Map<string, PartitionOwnership>, | ||
partitionIdToClaim: string | ||
): Promise<void> { | ||
log.partitionLoadBalancer( | ||
`[${this._id}] Attempting to claim ownership of partition ${partitionIdToClaim}.` | ||
); | ||
const ownershipRequest = this._createPartitionOwnershipRequest( | ||
partitionOwnershipMap, | ||
partitionIdToClaim | ||
); | ||
try { | ||
// get all partition ids on the event hub | ||
const partitionIds = await this._eventHubClient.getPartitionIds(); | ||
// get partitions this EventProcessor is actively processing | ||
const activePartitionIds = this._pumpManager.receivingFromPartitions(); | ||
await this._partitionManager.claimOwnership([ownershipRequest]); | ||
log.partitionLoadBalancer( | ||
`[${this._id}] Successfully claimed ownership of partition ${partitionIdToClaim}.` | ||
); | ||
// get a list of partition ids that are not being processed by this EventProcessor | ||
const inactivePartitionIds: string[] = partitionIds.filter( | ||
(id) => activePartitionIds.indexOf(id) === -1 | ||
const partitionContext = new PartitionContext( | ||
this._eventHubClient.eventHubName, | ||
this._consumerGroupName, | ||
ownershipRequest.partitionId, | ||
this._partitionManager, | ||
this._id | ||
); | ||
return inactivePartitionIds; | ||
log.partitionLoadBalancer( | ||
`[${this._id}] [${partitionIdToClaim}] Calling user-provided PartitionProcessorFactory.` | ||
); | ||
const partitionProcessor = new this._partitionProcessorClass(); | ||
const eventPosition = ownershipRequest.sequenceNumber | ||
? EventPosition.fromSequenceNumber(ownershipRequest.sequenceNumber) | ||
: EventPosition.earliest(); | ||
await this._pumpManager.createPump( | ||
this._eventHubClient, | ||
partitionContext, | ||
eventPosition, | ||
partitionProcessor | ||
); | ||
log.partitionLoadBalancer(`[${this._id}] PartitionPump created successfully.`); | ||
} catch (err) { | ||
log.error(`[${this._id}] An error occured when retrieving partition ids: ${err}`); | ||
throw err; | ||
log.error( | ||
`[${this.id}] Failed to claim ownership of partition ${ownershipRequest.partitionId}` | ||
); | ||
} | ||
} | ||
/* | ||
* A simple implementation of an event processor that: | ||
* - Fetches all partition ids from Event Hub | ||
* - Gets the current ownership information of all the partitions from PartitionManager | ||
* - Claims ownership of any partition that doesn't have an owner yet. | ||
* - Starts a new PartitionProcessor and receives events from each of the partitions this instance owns | ||
/** | ||
* Every loop to this method will result in this EventProcessor owning at most one new partition. | ||
* | ||
* The load is considered balanced when no active EventProcessor owns 2 partitions more than any other active | ||
* EventProcessor. Given that each invocation to this method results in ownership claim of at most one partition, | ||
* this algorithm converges gradually towards a steady state. | ||
* | ||
* When a new partition is claimed, this method is also responsible for starting a partition pump that creates an | ||
* EventHubConsumer for processing events from that partition. | ||
*/ | ||
private async _runLoop(abortSignal: AbortSignalLike): Promise<void> { | ||
// periodically check if there is any partition not being processed and process it | ||
const waitIntervalInMs = 30000; | ||
const waitIntervalInMs = 10000; | ||
while (!abortSignal.aborted) { | ||
try { | ||
// get a list of partition ids that are not being processed by this EventProcessor | ||
const partitionsToAdd = await this._getInactivePartitions(); | ||
// check if the loop has been cancelled | ||
const partitionOwnershipMap: Map<string, PartitionOwnership> = new Map(); | ||
// Retrieve current partition ownership details from the datastore. | ||
const partitionOwnership = await this._partitionManager.listOwnership( | ||
this._eventHubClient.eventHubName, | ||
this._consumerGroupName | ||
); | ||
for (const ownership of partitionOwnership) { | ||
partitionOwnershipMap.set(ownership.partitionId, ownership); | ||
} | ||
const partitionIds = await this._eventHubClient.getPartitionIds(); | ||
if (abortSignal.aborted) { | ||
@@ -254,63 +342,12 @@ return; | ||
const tasks: PromiseLike<void>[] = []; | ||
// create partition pumps to process any partitions we should be processing | ||
for (const partitionId of partitionsToAdd) { | ||
const partitionContext: PartitionContext = { | ||
consumerGroupName: this._consumerGroupName, | ||
eventHubName: this._eventHubClient.eventHubName, | ||
partitionId: partitionId | ||
}; | ||
const partitionOwnership: PartitionOwnership = { | ||
eventHubName: this._eventHubClient.eventHubName, | ||
consumerGroupName: this._consumerGroupName, | ||
ownerId: this._id, | ||
partitionId: partitionId, | ||
ownerLevel: 0 | ||
}; | ||
await this._partitionManager.claimOwnership([partitionOwnership]); | ||
const checkpointManager = new CheckpointManager( | ||
partitionContext, | ||
this._partitionManager, | ||
this._id | ||
if (partitionIds.length > 0) { | ||
const partitionToClaim = this._partitionLoadBalancer.loadBalance( | ||
partitionOwnershipMap, | ||
partitionIds | ||
); | ||
log.eventProcessor( | ||
`[${this._id}] [${partitionId}] Calling user-provided PartitionProcessorFactory.` | ||
); | ||
const partitionProcessor = this._partitionProcessorFactory( | ||
partitionContext, | ||
checkpointManager | ||
); | ||
// eventually this will 1st check if the existing PartitionOwnership has a position | ||
let eventPosition = | ||
this._processorOptions.initialEventPosition || EventPosition.earliest(); | ||
const partitionOwnerships = await this._partitionManager.listOwnership( | ||
this._eventHubClient.eventHubName, | ||
this._consumerGroupName | ||
); | ||
for (const ownership of partitionOwnerships) { | ||
if (ownership.partitionId === partitionId && ownership.sequenceNumber) { | ||
eventPosition = EventPosition.fromSequenceNumber(ownership.sequenceNumber); | ||
break; | ||
} | ||
if (partitionToClaim) { | ||
await this._claimOwnership(partitionOwnershipMap, partitionToClaim); | ||
} | ||
tasks.push( | ||
this._pumpManager.createPump( | ||
this._eventHubClient, | ||
partitionContext, | ||
eventPosition, | ||
partitionProcessor | ||
) | ||
); | ||
} | ||
// wait for all the new pumps to be created | ||
await Promise.all(tasks); | ||
log.eventProcessor(`[${this._id}] PartitionPumps created within EventProcessor.`); | ||
// sleep | ||
@@ -325,23 +362,12 @@ log.eventProcessor( | ||
} | ||
// loop has completed, remove all existing pumps | ||
return this._pumpManager.removeAllPumps(CloseReason.Shutdown); | ||
} | ||
/** | ||
* The unique identifier for the EventProcessor. | ||
* Starts the `EventProcessor`. Based on the number of instances of `EventProcessor` that are running for the | ||
* same consumer group, the partitions are distributed among these instances to process events. | ||
* | ||
* @return {string} | ||
*/ | ||
get id(): string { | ||
return this._id; | ||
} | ||
/** | ||
* Starts processing of events for all partitions of the Event Hub that this event processor can own, assigning a | ||
* dedicated `PartitionProcessor` to each partition. If there are other Event Processors active for the same | ||
* consumer group on the Event Hub, responsibility for partitions will be shared between them. | ||
* For each partition, the user provided `PartitionProcessor` is instantiated. | ||
* | ||
* Subsequent calls to start will be ignored if this event processor is already running. Calling `start()` after `stop()` | ||
* is called will restart this event processor. | ||
* Subsequent calls to start will be ignored if this event processor is already running. | ||
* Calling `start()` after `stop()` is called will restart this event processor. | ||
* | ||
@@ -363,4 +389,4 @@ * @return {void} | ||
/** | ||
* Stops processing events for all partitions owned by this event processor. All `PartitionProcessor` will be | ||
* shutdown and any open resources will be closed. | ||
* Stops processing events for all partitions owned by this event processor. | ||
* All `PartitionProcessor` will be shutdown and any open resources will be closed. | ||
* | ||
@@ -379,2 +405,5 @@ * Subsequent calls to stop will be ignored if the event processor is not running. | ||
try { | ||
// remove all existing pumps | ||
await this._pumpManager.removeAllPumps(CloseReason.Shutdown); | ||
// waits for the event processor loop to complete | ||
@@ -381,0 +410,0 @@ // will complete immediately if _loopTask is undefined |
@@ -8,3 +8,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
export { WebSocketImpl } from "rhea-promise"; | ||
export { OnMessage, OnError } from "./eventHubReceiver"; | ||
export { OnMessage, OnError, LastEnqueuedEventInfo } from "./eventHubReceiver"; | ||
export { ReceiveHandler } from "./receiveHandler"; | ||
@@ -24,3 +24,2 @@ export { | ||
export { EventDataBatch } from "./eventDataBatch"; | ||
export { CheckpointManager } from "./checkpointManager"; | ||
export { | ||
@@ -30,10 +29,8 @@ EventProcessor, | ||
EventProcessorOptions, | ||
PartitionProcessor, | ||
PartitionManager, | ||
PartitionProcessorFactory, | ||
PartitionOwnership | ||
} from "./eventProcessor"; | ||
export { PartitionContext } from "./partitionContext"; | ||
export { PartitionContext, Checkpoint } from "./partitionContext"; | ||
export { InMemoryPartitionManager } from "./inMemoryPartitionManager"; | ||
export { Checkpoint } from "./checkpointManager"; | ||
export { PartitionProcessor } from "./partitionProcessor"; | ||
export { | ||
@@ -40,0 +37,0 @@ MessagingError, |
@@ -5,7 +5,14 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import { PartitionManager, PartitionOwnership } from "./eventProcessor"; | ||
import { Checkpoint } from "./checkpointManager"; | ||
import { Checkpoint } from "./partitionContext"; | ||
import { generate_uuid } from "rhea-promise"; | ||
/** | ||
* A simple in-memory implementation of a `PartitionManager` | ||
* The `EventProcessor` relies on a `PartitionManager` to store checkpoints and handle partition | ||
* ownerships. `InMemoryPartitionManager` is simple partition manager that stores checkpoints and | ||
* partition ownerships in memory of your program. | ||
* | ||
* You can use the `InMemoryPartitionManager` to get started with using the `EventProcessor`. | ||
* But in production, you should choose an implementation of the `PartitionManager` interface that will | ||
* store the checkpoints and partition ownerships to a durable store instead. | ||
* | ||
* @class | ||
@@ -40,4 +47,9 @@ */ | ||
for (const ownership of partitionOwnership) { | ||
if (!this._partitionOwnershipMap.has(ownership.partitionId)) { | ||
if ( | ||
!this._partitionOwnershipMap.has(ownership.partitionId) || | ||
this._partitionOwnershipMap.get(ownership.partitionId)!.eTag === ownership.eTag | ||
) { | ||
ownership.eTag = generate_uuid(); | ||
var date = new Date(); | ||
ownership.lastModifiedTimeInMS = date.getTime(); | ||
this._partitionOwnershipMap.set(ownership.partitionId, ownership); | ||
@@ -44,0 +56,0 @@ } |
@@ -71,1 +71,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
export const eventProcessor = debugModule("azure:event-hubs:eventProcessor"); | ||
/** | ||
* @ignore | ||
* log statements for partitionLoadBalancer | ||
*/ | ||
export const partitionLoadBalancer = debugModule("azure:event-hubs:partitionLoadBalancer"); |
@@ -57,3 +57,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
*/ | ||
eventHubPath: string; | ||
eventHubName: string; | ||
/** | ||
@@ -211,3 +211,3 @@ * @property Identifier of the partition within the Event Hub. | ||
beginningSequenceNumber: info.begin_sequence_number, | ||
eventHubPath: info.name, | ||
eventHubName: info.name, | ||
lastEnqueuedOffset: info.last_enqueued_offset, | ||
@@ -214,0 +214,0 @@ lastEnqueuedTimeUtc: new Date(info.last_enqueued_time_utc), |
// Copyright (c) Microsoft Corporation. All rights reserved. | ||
// Licensed under the MIT License. | ||
import { ReceivedEventData } from "./eventData"; | ||
import { PartitionManager } from "./eventProcessor"; | ||
/** | ||
* PartitionContext is passed into an EventProrcessor's initialization handler and contains information | ||
* about the partition, the EventProcessor will be processing events from. | ||
*/ | ||
export interface PartitionContext { | ||
* A checkpoint is meant to represent the last successfully processed event by the user from a particular | ||
* partition of a consumer group in an Event Hub instance. | ||
* | ||
* When the `updateCheckpoint()` method on the `PartitionContext` class is called by the user, a | ||
* `Checkpoint` is created internally. It is then stored in the storage solution implemented by the | ||
* `PartitionManager` chosen by the user when creating an `EventProcessor`. | ||
* | ||
* Users are never expected to interact with `Checkpoint` directly. This interface exists to support the | ||
* internal workings of `EventProcessor` and `PartitionManager`. | ||
**/ | ||
export interface Checkpoint { | ||
/** | ||
* @property The event hub name | ||
*/ | ||
eventHubName: string; | ||
/** | ||
* @property The consumer group name | ||
*/ | ||
consumerGroupName: string; | ||
/** | ||
* @property The unique identifier of the event processor. | ||
*/ | ||
ownerId: string; | ||
/** | ||
* @property The identifier of the Event Hub partition | ||
* @readonly | ||
*/ | ||
readonly partitionId: string; | ||
/** | ||
* @property The event hub name | ||
* @readonly | ||
partitionId: string; | ||
/** | ||
* @property The sequence number of the event. | ||
*/ | ||
readonly eventHubName: string; | ||
/** | ||
sequenceNumber: number; | ||
/** | ||
* @property The offset of the event. | ||
*/ | ||
offset: number; | ||
/** | ||
* @property The unique identifier for the operation. | ||
*/ | ||
eTag: string; | ||
} | ||
/** | ||
* `PartitionContext` holds information on the partition, consumer group and event hub | ||
* being processed by the `EventProcessor`. | ||
* It also allows users to update checkpoints via the `updateCheckpoint` method. | ||
* | ||
* User is never meant to create `PartitionContext` directly. It is only passed to user code | ||
* by the `EventProcessor`. | ||
*/ | ||
export class PartitionContext { | ||
private _partitionManager: PartitionManager; | ||
private _consumerGroupName: string; | ||
private _eventHubName: string; | ||
private _eventProcessorId: string; | ||
private _partitionId: string; | ||
private _eTag: string = ""; | ||
constructor( | ||
eventHubName: string, | ||
consumerGroupName: string, | ||
partitionId: string, | ||
partitionManager: PartitionManager, | ||
eventProcessorId: string | ||
) { | ||
this._eventHubName = eventHubName; | ||
this._consumerGroupName = consumerGroupName; | ||
this._partitionId = partitionId; | ||
this._partitionManager = partitionManager; | ||
this._eventProcessorId = eventProcessorId; | ||
} | ||
/** | ||
* @property The consumer group name | ||
* @readonly | ||
* @readonly | ||
*/ | ||
readonly consumerGroupName: string; | ||
get consumerGroupName() { | ||
return this._consumerGroupName; | ||
} | ||
/** | ||
* @property The event hub name | ||
* @readonly | ||
*/ | ||
get eventHubName() { | ||
return this._eventHubName; | ||
} | ||
/** | ||
* @property The identifier of the Event Hub partition | ||
* @readonly | ||
*/ | ||
get partitionId() { | ||
return this._partitionId; | ||
} | ||
/** | ||
* Updates the checkpoint for the partition associated with the `PartitionContext`. | ||
* | ||
* A checkpoint is meant to represent the last successfully processed event by the user from a particular | ||
* partition of a consumer group in an Event Hub instance. | ||
* | ||
* @param eventData The event that you want to update the checkpoint with. | ||
* @return Promise<void> | ||
*/ | ||
public async updateCheckpoint(eventData: ReceivedEventData): Promise<void>; | ||
/** | ||
* Updates the checkpoint for the partition associated with the `PartitionContext`. | ||
* | ||
* A checkpoint is meant to represent the last successfully processed event by the user from a particular | ||
* partition of a consumer group in an Event Hub instance. | ||
* | ||
* @param sequenceNumber The sequence number of the event that you want to update the checkpoint with. | ||
* @param offset The offset of the event that you want to update the checkpoint with. | ||
* @return Promise<void>. | ||
*/ | ||
public async updateCheckpoint(sequenceNumber: number, offset: number): Promise<void>; | ||
public async updateCheckpoint( | ||
eventDataOrSequenceNumber: ReceivedEventData | number, | ||
offset?: number | ||
): Promise<void> { | ||
const checkpoint: Checkpoint = { | ||
eventHubName: this._eventHubName, | ||
consumerGroupName: this._consumerGroupName, | ||
ownerId: this._eventProcessorId, | ||
partitionId: this._partitionId, | ||
sequenceNumber: | ||
typeof eventDataOrSequenceNumber === "number" | ||
? eventDataOrSequenceNumber | ||
: eventDataOrSequenceNumber.sequenceNumber, | ||
offset: | ||
typeof offset === "number" ? offset : (eventDataOrSequenceNumber as ReceivedEventData).offset, | ||
eTag: this._eTag | ||
}; | ||
this._eTag = await this._partitionManager.updateCheckpoint(checkpoint); | ||
} | ||
} |
@@ -5,6 +5,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import * as log from "./log"; | ||
import { EventProcessorOptions, PartitionProcessor, CloseReason } from "./eventProcessor"; | ||
import { EventProcessorOptions, CloseReason } from "./eventProcessor"; | ||
import { PartitionContext } from "./partitionContext"; | ||
import { EventHubClient } from "./eventHubClient"; | ||
import { EventPosition } from "./eventPosition"; | ||
import { PartitionProcessor } from "./partitionProcessor"; | ||
import { EventHubConsumer } from "./receiver"; | ||
@@ -20,2 +21,3 @@ import { AbortController } from "@azure/abort-controller"; | ||
private _receiver: EventHubConsumer | undefined; | ||
private _initialEventPosition: EventPosition; | ||
private _isReceiving: boolean = false; | ||
@@ -28,2 +30,3 @@ private _abortController: AbortController; | ||
partitionProcessor: PartitionProcessor, | ||
initialEventPosition: EventPosition, | ||
options?: EventProcessorOptions | ||
@@ -35,2 +38,3 @@ ) { | ||
this._partitionProcessor = partitionProcessor; | ||
this._initialEventPosition = initialEventPosition; | ||
this._processorOptions = options; | ||
@@ -46,8 +50,6 @@ this._abortController = new AbortController(); | ||
this._isReceiving = true; | ||
if (typeof this._partitionProcessor.initialize === "function") { | ||
try { | ||
await this._partitionProcessor.initialize(); | ||
} catch { | ||
// swallow the error from the user-defined code | ||
} | ||
try { | ||
await this._partitionProcessor.initialize(this._partitionContext); | ||
} catch { | ||
// swallow the error from the user-defined code | ||
} | ||
@@ -62,3 +64,4 @@ this._receiveEvents(this._partitionContext.partitionId); | ||
partitionId, | ||
this._processorOptions.initialEventPosition || EventPosition.earliest() | ||
this._initialEventPosition, | ||
{ ownerLevel: 0 } | ||
); | ||
@@ -77,3 +80,3 @@ | ||
} | ||
await this._partitionProcessor.processEvents(receivedEvents); | ||
await this._partitionProcessor.processEvents(receivedEvents, this._partitionContext); | ||
} catch (err) { | ||
@@ -89,3 +92,3 @@ // check if this pump is still receiving | ||
try { | ||
await this._partitionProcessor.processError(err); | ||
await this._partitionProcessor.processError(err, this._partitionContext); | ||
} catch (err) { | ||
@@ -98,2 +101,7 @@ log.error("An error was thrown by user's processError method: ", err); | ||
try { | ||
// If the exception indicates that the partition was stolen (i.e some other consumer with same ownerlevel | ||
// started consuming the partition), update the closeReason | ||
if (err.name === "ReceiverDisconnectedError") { | ||
return await this.stop(CloseReason.OwnershipLost); | ||
} | ||
// this will close the pump and will break us out of the while loop | ||
@@ -119,5 +127,3 @@ return await this.stop(CloseReason.EventHubException); | ||
this._abortController.abort(); | ||
if (typeof this._partitionProcessor.close === "function") { | ||
await this._partitionProcessor.close(reason); | ||
} | ||
await this._partitionProcessor.close(reason, this._partitionContext); | ||
} catch (err) { | ||
@@ -124,0 +130,0 @@ log.error("An error occurred while closing the receiver.", err); |
@@ -7,3 +7,4 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import { EventPosition } from "./eventPosition"; | ||
import { PartitionProcessor, EventProcessorOptions, CloseReason } from "./eventProcessor"; | ||
import { EventProcessorOptions, CloseReason } from "./eventProcessor"; | ||
import { PartitionProcessor } from "./partitionProcessor"; | ||
import { PartitionPump } from "./partitionPump"; | ||
@@ -77,6 +78,9 @@ import * as log from "./log"; | ||
const pump = new PartitionPump(eventHubClient, partitionContext, partitionProcessor, { | ||
...this._options, | ||
initialEventPosition | ||
}); | ||
const pump = new PartitionPump( | ||
eventHubClient, | ||
partitionContext, | ||
partitionProcessor, | ||
initialEventPosition, | ||
this._options | ||
); | ||
@@ -83,0 +87,0 @@ try { |
@@ -7,3 +7,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
import { EventHubConsumerOptions } from "./eventHubClient"; | ||
import { OnMessage, OnError, EventHubReceiver } from "./eventHubReceiver"; | ||
import { OnMessage, OnError, EventHubReceiver, LastEnqueuedEventInfo } from "./eventHubReceiver"; | ||
import { ReceivedEventData } from "./eventData"; | ||
@@ -25,3 +25,4 @@ import { | ||
/** | ||
* Options to pass when creating an iterator to iterate over events | ||
* Options to pass when creating an async iteratable using the `getEventIterator()` method on the | ||
* `EventHubConsumer`. | ||
*/ | ||
@@ -43,3 +44,9 @@ export interface EventIteratorOptions { | ||
* in the context of a specific consumer group. | ||
* To create a consumer use the `createConsumer()` method on your `EventHubClient`. | ||
* | ||
* You can pass the below in the `options` when creating a consumer. | ||
* - `ownerLevel` : A number indicating that the consumer intends to be an exclusive consumer of events resulting in other | ||
* consumers to fail if their `ownerLevel` is lower or doesn't exist. | ||
* - `retryOptions`: The retry options used to govern retry attempts when an issue is encountered while receiving events. | ||
* | ||
* Multiple consumers are allowed on the same partition in a consumer group. | ||
@@ -50,4 +57,4 @@ * If there is a need to have an exclusive consumer for a partition in a consumer group, | ||
* | ||
* The consumer can be used to receive messages in a batch or by registering handlers. | ||
* Use the `createConsumer` function on the EventHubClient to instantiate an EventHubConsumer. | ||
* The consumer can be used to receive messages in a batch using `receiveBatch()` or by registering handlers | ||
* by using `receive()` or via an async iterable got by using `getEventIterator()` | ||
* @class | ||
@@ -82,4 +89,18 @@ */ | ||
private _retryOptions: RetryOptions; | ||
/** | ||
* @property A set of information about the last enqueued event of a partition. | ||
*/ | ||
private _lastEnqueuedEventInfo: LastEnqueuedEventInfo; | ||
/** | ||
* @property The last enqueued event information. This property will only | ||
* be enabled when `trackLastEnqueuedEventInfo` option is set to true in the | ||
* `client.createConsumer()` method. | ||
* @readonly | ||
*/ | ||
public get lastEnqueuedEventInfo(): LastEnqueuedEventInfo { | ||
return this._lastEnqueuedEventInfo; | ||
} | ||
/** | ||
* @property Returns `true` if the consumer is closed. This can happen either because the consumer | ||
@@ -133,2 +154,5 @@ * itself has been closed or the client that created it has been closed. | ||
/** | ||
* EventHubConsumer should not be constructed using `new EventHubConsumer()` | ||
* Use the `createConsumer()` method on your `EventHubClient` instead. | ||
* @private | ||
* @constructor | ||
@@ -148,2 +172,3 @@ * @internal | ||
this._partitionId = partitionId; | ||
this._lastEnqueuedEventInfo = {}; | ||
this._receiverOptions = options || {}; | ||
@@ -160,8 +185,7 @@ this._retryOptions = this._receiverOptions.retryOptions || {}; | ||
/** | ||
* Starts the consumer by establishing an AMQP session and an AMQP receiver link on the session. Messages will be passed to | ||
* the provided onMessage handler and error will be passed to the provided onError handler. | ||
* Starts receiving events from the service and calls the user provided message handler for each event. | ||
* Returns an object that can be used to query the state of the receiver and to stop receiving events as well. | ||
* | ||
* @param onMessage The message handler to receive event data objects. | ||
* @param onError The error handler to receive an error that occurs | ||
* while receiving messages. | ||
* @param onError The error handler for errora that can occur when receiving events. | ||
* @param abortSignal An implementation of the `AbortSignalLike` interface to signal the request to cancel the operation. | ||
@@ -229,2 +253,10 @@ * For example, use the @azure/abort-controller to create an `AbortSignal`. | ||
if ( | ||
this._receiverOptions.trackLastEnqueuedEventInfo && | ||
this._baseConsumer && | ||
this._baseConsumer.runtimeInfo | ||
) { | ||
this._lastEnqueuedEventInfo = this._baseConsumer.runtimeInfo; | ||
} | ||
return new ReceiveHandler(baseConsumer); | ||
@@ -237,3 +269,3 @@ } | ||
* The async iterable cannot indicate that it is done. | ||
* When using `for..await..of` to iterate over the events returned | ||
* When using `for await (let event of consumer.getEventIterator()) {}` to iterate over the events returned | ||
* by the async iterable, take care to exit the for loop after receiving the | ||
@@ -264,7 +296,6 @@ * desired number of messages, or provide an `AbortSignal` to control when to exit the loop. | ||
/** | ||
* Receives a batch of EventData objects from an EventHub partition for a given count and a given max wait time in seconds, whichever | ||
* happens first. This method can be used directly after creating the consumer object and **MUST NOT** be used along with the `start()` method. | ||
* Returns a promise that resolves to an array of events received from the service. | ||
* | ||
* @param maxMessageCount The maximum number of messages to receive in this batch. Must be a value greater than 0. | ||
* @param [maxWaitTimeInSeconds] The maximum amount of time to wait to build up the requested message count for the batch; | ||
* @param maxMessageCount The maximum number of messages to receive. | ||
* @param maxWaitTimeInSeconds The maximum amount of time to wait to build up the requested message count; | ||
* If not provided, it defaults to 60 seconds. | ||
@@ -355,3 +386,9 @@ * @param abortSignal An implementation of the `AbortSignalLike` interface to signal the request to cancel the operation. | ||
receivedEvents.push(eventData); | ||
if ( | ||
this._receiverOptions.trackLastEnqueuedEventInfo && | ||
this._baseConsumer && | ||
this._baseConsumer.runtimeInfo | ||
) { | ||
this._lastEnqueuedEventInfo = this._baseConsumer.runtimeInfo; | ||
} | ||
// resolve the operation's promise after the requested | ||
@@ -358,0 +395,0 @@ // number of events are received. |
@@ -13,12 +13,17 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
/** | ||
* A producer responsible for sending `EventData` to a specific Event Hub. | ||
* If `partitionId` is specified in the `options`, all event data sent using the producer | ||
* A producer responsible for sending events to an Event Hub. | ||
* To create a producer use the `createProducer()` method on your `EventHubClient`. | ||
* You can pass the below in the `options` when creating a producer. | ||
* - `partitionId` : The identifier of the partition that the producer can be bound to. | ||
* - `retryOptions` : The retry options used to govern retry attempts when an issue is encountered while sending events. | ||
* A simple usage can be `{ "maxRetries": 4 }`. | ||
* | ||
* If `partitionId` is specified when creating a producer, all event data sent using the producer | ||
* will be sent to the specified partition. | ||
* Otherwise, they are automatically routed to an available partition by the Event Hubs service. | ||
* | ||
* Allowing automatic routing of partitions is recommended when: | ||
* - The sending of events needs to be highly available. | ||
* - The event data should be evenly distributed among all available partitions. | ||
* Automatic routing of partitions is recommended because: | ||
* - The sending of events will be highly available. | ||
* - The event data will be evenly distributed among all available partitions. | ||
* | ||
* Use the `createProducer` function on the EventHubClient to instantiate an EventHubProducer. | ||
* @class | ||
@@ -49,2 +54,5 @@ */ | ||
/** | ||
* EventHubProducer should not be constructed using `new EventHubProduer()` | ||
* Use the `createProducer()` method on your `EventHubClient` instead. | ||
* @private | ||
* @constructor | ||
@@ -65,5 +73,9 @@ * @internal | ||
/** | ||
* Creates an instance of EventDataBatch to which one can add events until the maximum supported size is reached. | ||
* The batch can be passed to the send method of the EventHubProducer to be sent to Azure Event Hubs | ||
* @param options Options to define partition key and max message size. | ||
* Creates an instance of `EventDataBatch` to which one can add events until the maximum supported size is reached. | ||
* The batch can be passed to the `send()` method of the `EventHubProducer` to be sent to Azure Event Hubs. | ||
* @param options A set of options to configure the behavior of the batch. | ||
* - `partitionKey` : A value that is hashed to produce a partition assignment. | ||
* Not applicable if the `EventHubProducer` was created using a `partitionId`. | ||
* - `maxSizeInBytes`: The upper limit for the size of batch. The `tryAdd` function will return `false` after this limit is reached. | ||
* - `abortSignal` : A signal the request to cancel the send operation. | ||
* @returns Promise<EventDataBatch> | ||
@@ -112,7 +124,11 @@ */ | ||
/** | ||
* Send a single or an array of events to the associated Event Hub. | ||
* Send one or more of events to the associated Event Hub. | ||
* | ||
* @param eventData An individual event data or array of event data objects to send. | ||
* @param eventData An individual `EventData` object, or an array of `EventData` objects or an | ||
* instance of `EventDataBatch`. | ||
* @param options The set of options that can be specified to influence the way in which | ||
* events are sent to the associated Event Hub, including an abort signal to cancel the operation. | ||
* events are sent to the associated Event Hub. | ||
* - `partitionKey` : A value that is hashed to produce a partition assignment. | ||
* Not applicable if the `EventHubProducer` was created using a `partitionId`. | ||
* - `abortSignal` : A signal the request to cancel the send operation. | ||
* | ||
@@ -119,0 +135,0 @@ * @returns Promise<void> |
@@ -9,3 +9,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved. | ||
name: "@azure/event-hubs", | ||
version: "5.0.0-preview.2" | ||
version: "5.0.0-preview.3" | ||
}; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
1131133
59
80
14886
352
12
5
2
135
+ Addedbuffer@^5.2.1
+ Addedprocess@^0.11.10
+ Added@azure/abort-controller@1.0.0-preview.2(transitive)
+ Added@azure/core-amqp@1.0.0-preview.3(transitive)
+ Added@azure/core-auth@1.0.0-preview.3(transitive)
+ Addeddebug@4.4.0(transitive)
- Removed@azure/abort-controller@1.0.0-preview.1(transitive)
- Removed@azure/core-amqp@1.0.0-preview.2(transitive)
- Removed@azure/core-auth@1.0.0-preview.2(transitive)
Updateddebug@^4.1.1