@aws-lambda-powertools/batch
Advanced tools
Comparing version
@@ -5,2 +5,5 @@ import type { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; | ||
import type { EventSourceDataClassTypes, PartialItemFailureResponse, PartialItemFailures } from './types'; | ||
/** | ||
* Process batch and partially report failed items | ||
*/ | ||
declare abstract class BasePartialBatchProcessor extends BasePartialProcessor { | ||
@@ -14,11 +17,48 @@ COLLECTOR_MAPPING: { | ||
eventType: keyof typeof EventType; | ||
/** | ||
* Initializes base batch processing class | ||
* @param eventType Whether this is SQS, DynamoDB stream, or Kinesis data stream event | ||
*/ | ||
constructor(eventType: keyof typeof EventType); | ||
/** | ||
* Report messages to be deleted in case of partial failures | ||
*/ | ||
clean(): void; | ||
/** | ||
* Collects identifiers of failed items for a DynamoDB stream | ||
* @returns list of identifiers for failed items | ||
*/ | ||
collectDynamoDBFailures(): PartialItemFailures[]; | ||
/** | ||
* Collects identifiers of failed items for a Kinesis stream | ||
* @returns list of identifiers for failed items | ||
*/ | ||
collectKinesisFailures(): PartialItemFailures[]; | ||
/** | ||
* Collects identifiers of failed items for an SQS batch | ||
* @returns list of identifiers for failed items | ||
*/ | ||
collectSqsFailures(): PartialItemFailures[]; | ||
/** | ||
* Determines whether all records in a batch failed to process | ||
* @returns true if all records resulted in exception results | ||
*/ | ||
entireBatchFailed(): boolean; | ||
/** | ||
* Collects identifiers for failed batch items | ||
* @returns formatted messages to use in batch deletion | ||
*/ | ||
getMessagesToReport(): PartialItemFailures[]; | ||
/** | ||
* Determines if any records failed to process | ||
* @returns true if any records resulted in exception | ||
*/ | ||
hasMessagesToReport(): boolean; | ||
/** | ||
* Remove results from previous execution | ||
*/ | ||
prepare(): void; | ||
/** | ||
* @returns Batch items that failed processing, if any | ||
*/ | ||
response(): PartialItemFailureResponse; | ||
@@ -25,0 +65,0 @@ toBatchType(record: EventSourceDataClassTypes, eventType: keyof typeof EventType): SQSRecord | KinesisStreamRecord | DynamoDBRecord; |
@@ -7,3 +7,10 @@ "use strict"; | ||
const errors_1 = require("./errors"); | ||
/** | ||
* Process batch and partially report failed items | ||
*/ | ||
class BasePartialBatchProcessor extends BasePartialProcessor_1.BasePartialProcessor { | ||
/** | ||
* Initializes base batch processing class | ||
* @param eventType Whether this is SQS, DynamoDB stream, or Kinesis data stream event | ||
*/ | ||
constructor(eventType) { | ||
@@ -19,2 +26,5 @@ super(); | ||
} | ||
/** | ||
* Report messages to be deleted in case of partial failures | ||
*/ | ||
clean() { | ||
@@ -30,2 +40,6 @@ if (!this.hasMessagesToReport()) { | ||
} | ||
/** | ||
* Collects identifiers of failed items for a DynamoDB stream | ||
* @returns list of identifiers for failed items | ||
*/ | ||
collectDynamoDBFailures() { | ||
@@ -41,2 +55,6 @@ const failures = []; | ||
} | ||
/** | ||
* Collects identifiers of failed items for a Kinesis stream | ||
* @returns list of identifiers for failed items | ||
*/ | ||
collectKinesisFailures() { | ||
@@ -50,2 +68,6 @@ const failures = []; | ||
} | ||
/** | ||
* Collects identifiers of failed items for an SQS batch | ||
* @returns list of identifiers for failed items | ||
*/ | ||
collectSqsFailures() { | ||
@@ -59,11 +81,26 @@ const failures = []; | ||
} | ||
/** | ||
* Determines whether all records in a batch failed to process | ||
* @returns true if all records resulted in exception results | ||
*/ | ||
entireBatchFailed() { | ||
return this.errors.length == this.records.length; | ||
} | ||
/** | ||
* Collects identifiers for failed batch items | ||
* @returns formatted messages to use in batch deletion | ||
*/ | ||
getMessagesToReport() { | ||
return this.COLLECTOR_MAPPING[this.eventType](); | ||
} | ||
/** | ||
* Determines if any records failed to process | ||
* @returns true if any records resulted in exception | ||
*/ | ||
hasMessagesToReport() { | ||
return this.failureMessages.length != 0; | ||
} | ||
/** | ||
* Remove results from previous execution | ||
*/ | ||
prepare() { | ||
@@ -75,2 +112,5 @@ this.successMessages.length = 0; | ||
} | ||
/** | ||
* @returns Batch items that failed processing, if any | ||
*/ | ||
response() { | ||
@@ -77,0 +117,0 @@ return this.batchResponse; |
import type { BaseRecord, BatchProcessingOptions, EventSourceDataClassTypes, FailureResponse, ResultType, SuccessResponse } from './types'; | ||
/** | ||
* Abstract class for batch processors. | ||
*/ | ||
declare abstract class BasePartialProcessor { | ||
@@ -9,11 +12,56 @@ errors: Error[]; | ||
successMessages: EventSourceDataClassTypes[]; | ||
/** | ||
* Initializes base processor class | ||
*/ | ||
constructor(); | ||
/** | ||
* Clean class instance after processing | ||
*/ | ||
abstract clean(): void; | ||
/** | ||
* Keeps track of batch records that failed processing | ||
* @param record record that failed processing | ||
* @param exception exception that was thrown | ||
* @returns FailureResponse object with ["fail", exception, original record] | ||
*/ | ||
failureHandler(record: EventSourceDataClassTypes, exception: Error): FailureResponse; | ||
/** | ||
* Prepare class instance before processing | ||
*/ | ||
abstract prepare(): void; | ||
/** | ||
* Call instance's handler for each record | ||
* @returns List of processed records | ||
*/ | ||
process(): Promise<(SuccessResponse | FailureResponse)[]>; | ||
/** | ||
* Process a record with an asyncronous handler | ||
* | ||
* @param record Record to be processed | ||
*/ | ||
abstract processRecord(record: BaseRecord): Promise<SuccessResponse | FailureResponse>; | ||
/** | ||
* Process a record with the handler | ||
* @param record Record to be processed | ||
*/ | ||
abstract processRecordSync(record: BaseRecord): SuccessResponse | FailureResponse; | ||
/** | ||
* Call instance's handler for each record | ||
* @returns List of processed records | ||
*/ | ||
processSync(): (SuccessResponse | FailureResponse)[]; | ||
/** | ||
* Set class instance attributes before execution | ||
* @param records List of records to be processed | ||
* @param handler CallableFunction to process entries of "records" | ||
* @param options Options to be used during processing | ||
* @returns this object | ||
*/ | ||
register(records: BaseRecord[], handler: CallableFunction, options?: BatchProcessingOptions): BasePartialProcessor; | ||
/** | ||
* Keeps track of batch records that were processed successfully | ||
* @param record record that succeeded processing | ||
* @param result result from record handler | ||
* @returns SuccessResponse object with ["success", result, original record] | ||
*/ | ||
successHandler(record: EventSourceDataClassTypes, result: ResultType): SuccessResponse; | ||
@@ -20,0 +68,0 @@ } |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.BasePartialProcessor = void 0; | ||
/** | ||
* Abstract class for batch processors. | ||
*/ | ||
class BasePartialProcessor { | ||
/** | ||
* Initializes base processor class | ||
*/ | ||
constructor() { | ||
@@ -12,2 +18,8 @@ this.successMessages = []; | ||
} | ||
/** | ||
* Keeps track of batch records that failed processing | ||
* @param record record that failed processing | ||
* @param exception exception that was thrown | ||
* @returns FailureResponse object with ["fail", exception, original record] | ||
*/ | ||
failureHandler(record, exception) { | ||
@@ -19,3 +31,11 @@ const entry = ['fail', exception.message, record]; | ||
} | ||
/** | ||
* Call instance's handler for each record | ||
* @returns List of processed records | ||
*/ | ||
async process() { | ||
/** | ||
* If this is a sync processor, user should have called processSync instead, | ||
* so we call the method early to throw the error early thus failing fast. | ||
*/ | ||
if (this.constructor.name === 'BatchProcessorSync') { | ||
@@ -30,3 +50,11 @@ await this.processRecord(this.records[0]); | ||
} | ||
/** | ||
* Call instance's handler for each record | ||
* @returns List of processed records | ||
*/ | ||
processSync() { | ||
/** | ||
* If this is an async processor, user should have called process instead, | ||
* so we call the method early to throw the error early thus failing fast. | ||
*/ | ||
if (this.constructor.name === 'BatchProcessor') { | ||
@@ -43,2 +71,9 @@ this.processRecordSync(this.records[0]); | ||
} | ||
/** | ||
* Set class instance attributes before execution | ||
* @param records List of records to be processed | ||
* @param handler CallableFunction to process entries of "records" | ||
* @param options Options to be used during processing | ||
* @returns this object | ||
*/ | ||
register(records, handler, options) { | ||
@@ -52,2 +87,8 @@ this.records = records; | ||
} | ||
/** | ||
* Keeps track of batch records that were processed successfully | ||
* @param record record that succeeded processing | ||
* @param result result from record handler | ||
* @returns SuccessResponse object with ["success", result, original record] | ||
*/ | ||
successHandler(record, result) { | ||
@@ -54,0 +95,0 @@ const entry = ['success', result, record]; |
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; | ||
import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; | ||
/** | ||
* Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB | ||
*/ | ||
declare class BatchProcessor extends BasePartialBatchProcessor { | ||
processRecord(record: BaseRecord): Promise<SuccessResponse | FailureResponse>; | ||
/** | ||
* Process a record with instance's handler | ||
* @param _record Batch record to be processed | ||
* @returns response of success or failure | ||
*/ | ||
processRecordSync(_record: BaseRecord): SuccessResponse | FailureResponse; | ||
@@ -6,0 +14,0 @@ } |
@@ -6,2 +6,5 @@ "use strict"; | ||
const errors_1 = require("./errors"); | ||
/** | ||
* Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB | ||
*/ | ||
class BatchProcessor extends BasePartialBatchProcessor_1.BasePartialBatchProcessor { | ||
@@ -18,2 +21,7 @@ async processRecord(record) { | ||
} | ||
/** | ||
* Process a record with instance's handler | ||
* @param _record Batch record to be processed | ||
* @returns response of success or failure | ||
*/ | ||
processRecordSync(_record) { | ||
@@ -20,0 +28,0 @@ throw new errors_1.BatchProcessingError('Not implemented. Use asyncProcess() instead.'); |
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; | ||
import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; | ||
/** | ||
* Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB | ||
*/ | ||
declare class BatchProcessorSync extends BasePartialBatchProcessor { | ||
processRecord(_record: BaseRecord): Promise<SuccessResponse | FailureResponse>; | ||
/** | ||
* Process a record with instance's handler | ||
* @param record Batch record to be processed | ||
* @returns response of success or failure | ||
*/ | ||
processRecordSync(record: BaseRecord): SuccessResponse | FailureResponse; | ||
@@ -6,0 +14,0 @@ } |
@@ -6,2 +6,5 @@ "use strict"; | ||
const errors_1 = require("./errors"); | ||
/** | ||
* Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB | ||
*/ | ||
class BatchProcessorSync extends BasePartialBatchProcessor_1.BasePartialBatchProcessor { | ||
@@ -11,2 +14,7 @@ async processRecord(_record) { | ||
} | ||
/** | ||
* Process a record with instance's handler | ||
* @param record Batch record to be processed | ||
* @returns response of success or failure | ||
*/ | ||
processRecordSync(record) { | ||
@@ -13,0 +21,0 @@ try { |
@@ -0,4 +1,10 @@ | ||
/** | ||
* Base error thrown by the Batch Processing utility | ||
*/ | ||
declare class BatchProcessingError extends Error { | ||
constructor(message: string); | ||
} | ||
/** | ||
* Error thrown by the Batch Processing utility when all batch records failed to be processed | ||
*/ | ||
declare class FullBatchFailureError extends BatchProcessingError { | ||
@@ -8,5 +14,14 @@ recordErrors: Error[]; | ||
} | ||
/** | ||
* Error thrown by the Batch Processing utility when a SQS FIFO queue is short-circuited. | ||
* This happens when a record fails processing and the remaining records are not processed | ||
* to avoid out-of-order delivery. | ||
*/ | ||
declare class SqsFifoShortCircuitError extends BatchProcessingError { | ||
constructor(); | ||
} | ||
/** | ||
* Error thrown by the Batch Processing utility when a partial processor receives an unexpected | ||
* batch type. | ||
*/ | ||
declare class UnexpectedBatchTypeError extends BatchProcessingError { | ||
@@ -13,0 +28,0 @@ constructor(); |
@@ -5,2 +5,5 @@ "use strict"; | ||
const constants_1 = require("./constants"); | ||
/** | ||
* Base error thrown by the Batch Processing utility | ||
*/ | ||
class BatchProcessingError extends Error { | ||
@@ -13,2 +16,5 @@ constructor(message) { | ||
exports.BatchProcessingError = BatchProcessingError; | ||
/** | ||
* Error thrown by the Batch Processing utility when all batch records failed to be processed | ||
*/ | ||
class FullBatchFailureError extends BatchProcessingError { | ||
@@ -22,2 +28,7 @@ constructor(childErrors) { | ||
exports.FullBatchFailureError = FullBatchFailureError; | ||
/** | ||
* Error thrown by the Batch Processing utility when a SQS FIFO queue is short-circuited. | ||
* This happens when a record fails processing and the remaining records are not processed | ||
* to avoid out-of-order delivery. | ||
*/ | ||
class SqsFifoShortCircuitError extends BatchProcessingError { | ||
@@ -30,2 +41,6 @@ constructor() { | ||
exports.SqsFifoShortCircuitError = SqsFifoShortCircuitError; | ||
/** | ||
* Error thrown by the Batch Processing utility when a partial processor receives an unexpected | ||
* batch type. | ||
*/ | ||
class UnexpectedBatchTypeError extends BatchProcessingError { | ||
@@ -32,0 +47,0 @@ constructor() { |
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; | ||
import type { BaseRecord, BatchProcessingOptions, PartialItemFailureResponse } from './types'; | ||
/** | ||
* Higher level function to handle batch event processing | ||
* @param event Lambda's original event | ||
* @param recordHandler Callable function to process each record from the batch | ||
* @param processor Batch processor to handle partial failure cases | ||
* @param options Batch processing options | ||
* @returns Lambda Partial Batch Response | ||
*/ | ||
declare const processPartialResponse: (event: { | ||
@@ -4,0 +12,0 @@ Records: BaseRecord[]; |
@@ -5,2 +5,10 @@ "use strict"; | ||
const errors_1 = require("./errors"); | ||
/** | ||
* Higher level function to handle batch event processing | ||
* @param event Lambda's original event | ||
* @param recordHandler Callable function to process each record from the batch | ||
* @param processor Batch processor to handle partial failure cases | ||
* @param options Batch processing options | ||
* @returns Lambda Partial Batch Response | ||
*/ | ||
const processPartialResponse = async (event, recordHandler, processor, options) => { | ||
@@ -7,0 +15,0 @@ if (!event.Records || !Array.isArray(event.Records)) { |
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; | ||
import type { BaseRecord, BatchProcessingOptions, PartialItemFailureResponse } from './types'; | ||
/** | ||
* Higher level function to handle batch event processing | ||
* @param event Lambda's original event | ||
* @param recordHandler Callable function to process each record from the batch | ||
* @param processor Batch processor to handle partial failure cases | ||
* @returns Lambda Partial Batch Response | ||
*/ | ||
declare const processPartialResponseSync: (event: { | ||
@@ -4,0 +11,0 @@ Records: BaseRecord[]; |
@@ -5,2 +5,9 @@ "use strict"; | ||
const errors_1 = require("./errors"); | ||
/** | ||
* Higher level function to handle batch event processing | ||
* @param event Lambda's original event | ||
* @param recordHandler Callable function to process each record from the batch | ||
* @param processor Batch processor to handle partial failure cases | ||
* @returns Lambda Partial Batch Response | ||
*/ | ||
const processPartialResponseSync = (event, recordHandler, processor, options) => { | ||
@@ -7,0 +14,0 @@ if (!event.Records || !Array.isArray(event.Records)) { |
import { BatchProcessorSync } from './BatchProcessorSync'; | ||
import type { FailureResponse, SuccessResponse } from './types'; | ||
/** | ||
* Process native partial responses from SQS FIFO queues | ||
* Stops processing records when the first record fails | ||
* The remaining records are reported as failed items | ||
*/ | ||
declare class SqsFifoPartialProcessor extends BatchProcessorSync { | ||
constructor(); | ||
/** | ||
* Call instance's handler for each record. | ||
* When the first failed message is detected, the process is short-circuited | ||
* And the remaining messages are reported as failed items | ||
*/ | ||
processSync(): (SuccessResponse | FailureResponse)[]; | ||
/** | ||
* Starting from the first failure index, fail all remaining messages and append them to the result list | ||
* @param firstFailureIndex Index of first message that failed | ||
* @param result List of success and failure responses with remaining messages failed | ||
*/ | ||
shortCircuitProcessing(firstFailureIndex: number, processedRecords: (SuccessResponse | FailureResponse)[]): (SuccessResponse | FailureResponse)[]; | ||
@@ -7,0 +22,0 @@ } |
@@ -7,2 +7,7 @@ "use strict"; | ||
const errors_1 = require("./errors"); | ||
/** | ||
* Process native partial responses from SQS FIFO queues | ||
* Stops processing records when the first record fails | ||
* The remaining records are reported as failed items | ||
*/ | ||
class SqsFifoPartialProcessor extends BatchProcessorSync_1.BatchProcessorSync { | ||
@@ -12,2 +17,7 @@ constructor() { | ||
} | ||
/** | ||
* Call instance's handler for each record. | ||
* When the first failed message is detected, the process is short-circuited | ||
* And the remaining messages are reported as failed items | ||
*/ | ||
processSync() { | ||
@@ -18,2 +28,4 @@ this.prepare(); | ||
for (const record of this.records) { | ||
// If we have any failed messages, it means the last message failed | ||
// We should then short circuit the process and fail remaining messages | ||
if (this.failureMessages.length != 0) { | ||
@@ -28,2 +40,7 @@ return this.shortCircuitProcessing(currentIndex, processedRecords); | ||
} | ||
/** | ||
* Starting from the first failure index, fail all remaining messages and append them to the result list | ||
* @param firstFailureIndex Index of first message that failed | ||
* @param result List of success and failure responses with remaining messages failed | ||
*/ | ||
shortCircuitProcessing(firstFailureIndex, processedRecords) { | ||
@@ -30,0 +47,0 @@ const remainingRecords = this.records.slice(firstFailureIndex); |
{ | ||
"name": "@aws-lambda-powertools/batch", | ||
"version": "1.14.0", | ||
"version": "1.14.1", | ||
"description": "The batch processing package for the Powertools for AWS Lambda (TypeScript) library.", | ||
@@ -5,0 +5,0 @@ "author": { |
# Powertools for AWS Lambda (TypeScript) - Batch Processing Utility <!-- omit in toc --> | ||
Powertools for AWS Lambda (TypeScript) is a developer toolkit to implement Serverless [best practices and increase developer velocity](https://docs.powertools.aws.dev/lambda-typescript/latest/#features). | ||
Powertools for AWS Lambda (TypeScript) is a developer toolkit to implement Serverless [best practices and increase developer velocity](https://docs.powertools.aws.dev/lambda/typescript/latest/#features). | ||
@@ -246,3 +246,3 @@ You can use the package in both TypeScript and JavaScript code bases. | ||
Share what you did with Powertools for AWS Lambda (TypeScript) ππ. Blog post, workshops, presentation, sample apps and others. Check out what the community has already shared about Powertools for AWS Lambda (TypeScript) [here](https://docs.powertools.aws.dev/lambda-typescript/latest/we_made_this). | ||
Share what you did with Powertools for AWS Lambda (TypeScript) ππ. Blog post, workshops, presentation, sample apps and others. Check out what the community has already shared about Powertools for AWS Lambda (TypeScript) [here](https://docs.powertools.aws.dev/lambda/typescript/latest/we_made_this). | ||
@@ -249,0 +249,0 @@ ### Using Lambda Layer |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
48631
28%768
61.68%