@aws-lambda-powertools/batch
Advanced tools
Comparing version
@@ -43,5 +43,4 @@ import type { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; | ||
* | ||
* If the entire batch failed, and the utility is not configured otherwise, | ||
* this method will throw a `FullBatchFailureError` with the list of errors | ||
* that occurred during processing. | ||
* If the entire batch failed this method will throw a {@link FullBatchFailureError | `FullBatchFailureError`} with the list of | ||
* errors that occurred during processing, unless the `throwOnFullBatchFailure` option is set to `false`. | ||
* | ||
@@ -48,0 +47,0 @@ * Otherwise, it will build the partial failure response based on the event type. |
@@ -18,2 +18,17 @@ "use strict"; | ||
/** | ||
* Mapping of event types to their respective failure collectors | ||
* | ||
* Each service expects a different format for partial failure reporting, | ||
* this mapping ensures that the correct format is used for each event type. | ||
*/ | ||
COLLECTOR_MAPPING; | ||
/** | ||
* Response to be returned after processing | ||
*/ | ||
batchResponse; | ||
/** | ||
* Type of event that the processor is handling | ||
*/ | ||
eventType; | ||
/** | ||
* Initializes base batch processing class | ||
@@ -36,5 +51,4 @@ * | ||
* | ||
* If the entire batch failed, and the utility is not configured otherwise, | ||
* this method will throw a `FullBatchFailureError` with the list of errors | ||
* that occurred during processing. | ||
* If the entire batch failed this method will throw a {@link FullBatchFailureError | `FullBatchFailureError`} with the list of | ||
* errors that occurred during processing, unless the `throwOnFullBatchFailure` option is set to `false`. | ||
* | ||
@@ -47,3 +61,4 @@ * Otherwise, it will build the partial failure response based on the event type. | ||
} | ||
if (this.entireBatchFailed()) { | ||
if (this.options?.throwOnFullBatchFailure !== false && | ||
this.entireBatchFailed()) { | ||
throw new errors_js_1.FullBatchFailureError(this.errors); | ||
@@ -50,0 +65,0 @@ } |
@@ -20,2 +20,26 @@ "use strict"; | ||
class BasePartialProcessor { | ||
/** | ||
* List of errors that occurred during processing | ||
*/ | ||
errors; | ||
/** | ||
* List of records that failed processing | ||
*/ | ||
failureMessages; | ||
/** | ||
* Record handler provided by customers to process records | ||
*/ | ||
handler; | ||
/** | ||
* Options to be used during processing (optional) | ||
*/ | ||
options; | ||
/** | ||
* List of records to be processed | ||
*/ | ||
records; | ||
/** | ||
* List of records that were processed successfully | ||
*/ | ||
successMessages; | ||
constructor() { | ||
@@ -22,0 +46,0 @@ this.successMessages = []; |
@@ -19,2 +19,3 @@ "use strict"; | ||
class FullBatchFailureError extends BatchProcessingError { | ||
recordErrors; | ||
constructor(childErrors) { | ||
@@ -21,0 +22,0 @@ super('All records failed processing. See individual errors below.'); |
@@ -39,2 +39,27 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; | ||
* | ||
* By default, if the entire batch fails, the function will throw an error. | ||
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false` | ||
* | ||
* @example | ||
* ```typescript | ||
* import { | ||
* BatchProcessor, | ||
* EventType, | ||
* processPartialResponse, | ||
* } from '@aws-lambda-powertools/batch'; | ||
* import type { KinesisStreamHandler, KinesisStreamRecord } from 'aws-lambda'; | ||
* | ||
* const processor = new BatchProcessor(EventType.KinesisDataStreams); | ||
* | ||
* const recordHandler = async (record: KinesisStreamRecord): Promise<void> => { | ||
* const payload = JSON.parse(record.kinesis.data); | ||
* }; | ||
* | ||
* export const handler: KinesisStreamHandler = async (event, context) => | ||
* processPartialResponse(event, recordHandler, processor, { | ||
* context, | ||
* throwOnFullBatchFailure: false | ||
* }); | ||
* ``` | ||
* | ||
* @param event The event object containing the batch of records | ||
@@ -41,0 +66,0 @@ * @param recordHandler Async function to process each record from the batch |
@@ -41,2 +41,27 @@ "use strict"; | ||
* | ||
* By default, if the entire batch fails, the function will throw an error. | ||
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false` | ||
* | ||
* @example | ||
* ```typescript | ||
* import { | ||
* BatchProcessor, | ||
* EventType, | ||
* processPartialResponse, | ||
* } from '@aws-lambda-powertools/batch'; | ||
* import type { KinesisStreamHandler, KinesisStreamRecord } from 'aws-lambda'; | ||
* | ||
* const processor = new BatchProcessor(EventType.KinesisDataStreams); | ||
* | ||
* const recordHandler = async (record: KinesisStreamRecord): Promise<void> => { | ||
* const payload = JSON.parse(record.kinesis.data); | ||
* }; | ||
* | ||
* export const handler: KinesisStreamHandler = async (event, context) => | ||
* processPartialResponse(event, recordHandler, processor, { | ||
* context, | ||
* throwOnFullBatchFailure: false | ||
* }); | ||
* ``` | ||
* | ||
* @param event The event object containing the batch of records | ||
@@ -43,0 +68,0 @@ * @param recordHandler Async function to process each record from the batch |
@@ -65,2 +65,26 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; | ||
* | ||
* By default, if the entire batch fails, the function will throw an error. | ||
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false` | ||
* | ||
* @example | ||
* ```typescript | ||
* import { | ||
* SqsFifoPartialProcessor, | ||
* processPartialResponseSync, | ||
* } from '@aws-lambda-powertools/batch'; | ||
* import type { SQSRecord, SQSHandler } from 'aws-lambda'; | ||
* | ||
* const processor = new SqsFifoPartialProcessor(); | ||
* | ||
* const recordHandler = async (record: SQSRecord): Promise<void> => { | ||
* const payload = JSON.parse(record.body); | ||
* }; | ||
* | ||
* export const handler: SQSHandler = async (event, context) => | ||
* processPartialResponseSync(event, recordHandler, processor, { | ||
* context, | ||
* throwOnFullBatchFailure: false | ||
* }); | ||
* ``` | ||
* | ||
* @param event The event object containing the batch of records | ||
@@ -67,0 +91,0 @@ * @param recordHandler Sync function to process each record from the batch |
@@ -67,2 +67,26 @@ "use strict"; | ||
* | ||
* By default, if the entire batch fails, the function will throw an error. | ||
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false` | ||
* | ||
* @example | ||
* ```typescript | ||
* import { | ||
* SqsFifoPartialProcessor, | ||
* processPartialResponseSync, | ||
* } from '@aws-lambda-powertools/batch'; | ||
* import type { SQSRecord, SQSHandler } from 'aws-lambda'; | ||
* | ||
* const processor = new SqsFifoPartialProcessor(); | ||
* | ||
* const recordHandler = async (record: SQSRecord): Promise<void> => { | ||
* const payload = JSON.parse(record.body); | ||
* }; | ||
* | ||
* export const handler: SQSHandler = async (event, context) => | ||
* processPartialResponseSync(event, recordHandler, processor, { | ||
* context, | ||
* throwOnFullBatchFailure: false | ||
* }); | ||
* ``` | ||
* | ||
* @param event The event object containing the batch of records | ||
@@ -69,0 +93,0 @@ * @param recordHandler Sync function to process each record from the batch |
"use strict"; | ||
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) { | ||
if (kind === "m") throw new TypeError("Private method is not writable"); | ||
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a setter"); | ||
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot write private member to an object whose class did not declare it"); | ||
return (kind === "a" ? f.call(receiver, value) : f ? f.value = value : state.set(receiver, value)), value; | ||
}; | ||
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) { | ||
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter"); | ||
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it"); | ||
return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver); | ||
}; | ||
var _SqsFifoPartialProcessor_instances, _SqsFifoPartialProcessor_currentGroupId, _SqsFifoPartialProcessor_failedGroupIds, _SqsFifoPartialProcessor_addToFailedGroup, _SqsFifoPartialProcessor_processFailRecord, _SqsFifoPartialProcessor_setCurrentGroup; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
@@ -51,14 +39,13 @@ exports.SqsFifoPartialProcessor = void 0; | ||
class SqsFifoPartialProcessor extends BatchProcessorSync_js_1.BatchProcessorSync { | ||
/** | ||
* The ID of the current message group being processed. | ||
*/ | ||
#currentGroupId; | ||
/** | ||
* A set of group IDs that have already encountered failures. | ||
*/ | ||
#failedGroupIds; | ||
constructor() { | ||
super(constants_js_1.EventType.SQS); | ||
_SqsFifoPartialProcessor_instances.add(this); | ||
/** | ||
* The ID of the current message group being processed. | ||
*/ | ||
_SqsFifoPartialProcessor_currentGroupId.set(this, void 0); | ||
/** | ||
* A set of group IDs that have already encountered failures. | ||
*/ | ||
_SqsFifoPartialProcessor_failedGroupIds.set(this, void 0); | ||
__classPrivateFieldSet(this, _SqsFifoPartialProcessor_failedGroupIds, new Set(), "f"); | ||
this.#failedGroupIds = new Set(); | ||
} | ||
@@ -73,4 +60,4 @@ /** | ||
failureHandler(record, exception) { | ||
if (this.options?.skipGroupOnError && __classPrivateFieldGet(this, _SqsFifoPartialProcessor_currentGroupId, "f")) { | ||
__classPrivateFieldGet(this, _SqsFifoPartialProcessor_instances, "m", _SqsFifoPartialProcessor_addToFailedGroup).call(this, __classPrivateFieldGet(this, _SqsFifoPartialProcessor_currentGroupId, "f")); | ||
if (this.options?.skipGroupOnError && this.#currentGroupId) { | ||
this.#addToFailedGroup(this.#currentGroupId); | ||
} | ||
@@ -102,3 +89,3 @@ return super.failureHandler(record, exception); | ||
for (const record of this.records) { | ||
__classPrivateFieldGet(this, _SqsFifoPartialProcessor_instances, "m", _SqsFifoPartialProcessor_setCurrentGroup).call(this, record.attributes?.MessageGroupId); | ||
this.#setCurrentGroup(record.attributes?.MessageGroupId); | ||
// If we have any failed messages, we should then short circuit the process and | ||
@@ -113,6 +100,6 @@ // fail remaining messages unless `skipGroupOnError` is true | ||
const shouldSkipCurrentGroup = this.options?.skipGroupOnError && | ||
__classPrivateFieldGet(this, _SqsFifoPartialProcessor_currentGroupId, "f") && | ||
__classPrivateFieldGet(this, _SqsFifoPartialProcessor_failedGroupIds, "f").has(__classPrivateFieldGet(this, _SqsFifoPartialProcessor_currentGroupId, "f")); | ||
this.#currentGroupId && | ||
this.#failedGroupIds.has(this.#currentGroupId); | ||
const result = shouldSkipCurrentGroup | ||
? __classPrivateFieldGet(this, _SqsFifoPartialProcessor_instances, "m", _SqsFifoPartialProcessor_processFailRecord).call(this, record, new errors_js_1.SqsFifoMessageGroupShortCircuitError()) | ||
? this.#processFailRecord(record, new errors_js_1.SqsFifoMessageGroupShortCircuitError()) | ||
: this.processRecordSync(record); | ||
@@ -140,3 +127,3 @@ processedRecords.push(result); | ||
for (const record of remainingRecords) { | ||
__classPrivateFieldGet(this, _SqsFifoPartialProcessor_instances, "m", _SqsFifoPartialProcessor_processFailRecord).call(this, record, new errors_js_1.SqsFifoShortCircuitError()); | ||
this.#processFailRecord(record, new errors_js_1.SqsFifoShortCircuitError()); | ||
} | ||
@@ -146,11 +133,29 @@ this.clean(); | ||
} | ||
/** | ||
* Adds the specified group ID to the set of failed group IDs. | ||
* | ||
* @param group - The group ID to be added to the set of failed group IDs. | ||
*/ | ||
#addToFailedGroup(group) { | ||
this.#failedGroupIds.add(group); | ||
} | ||
/** | ||
* Processes a fail record. | ||
* | ||
* @param record - The record that failed. | ||
* @param exception - The error that occurred. | ||
*/ | ||
#processFailRecord(record, exception) { | ||
const data = this.toBatchType(record, this.eventType); | ||
return this.failureHandler(data, exception); | ||
} | ||
/** | ||
* Sets the current group ID for the message being processed. | ||
* | ||
* @param group - The group ID of the current message being processed. | ||
*/ | ||
#setCurrentGroup(group) { | ||
this.#currentGroupId = group; | ||
} | ||
} | ||
exports.SqsFifoPartialProcessor = SqsFifoPartialProcessor; | ||
_SqsFifoPartialProcessor_currentGroupId = new WeakMap(), _SqsFifoPartialProcessor_failedGroupIds = new WeakMap(), _SqsFifoPartialProcessor_instances = new WeakSet(), _SqsFifoPartialProcessor_addToFailedGroup = function _SqsFifoPartialProcessor_addToFailedGroup(group) { | ||
__classPrivateFieldGet(this, _SqsFifoPartialProcessor_failedGroupIds, "f").add(group); | ||
}, _SqsFifoPartialProcessor_processFailRecord = function _SqsFifoPartialProcessor_processFailRecord(record, exception) { | ||
const data = this.toBatchType(record, this.eventType); | ||
return this.failureHandler(data, exception); | ||
}, _SqsFifoPartialProcessor_setCurrentGroup = function _SqsFifoPartialProcessor_setCurrentGroup(group) { | ||
__classPrivateFieldSet(this, _SqsFifoPartialProcessor_currentGroupId, group, "f"); | ||
}; |
@@ -10,2 +10,3 @@ import type { Context, DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; | ||
* @property skipGroupOnError The option to group on error during processing | ||
* @property throwOnFullBatchFailure The option to throw an error if the entire batch fails | ||
*/ | ||
@@ -23,2 +24,6 @@ type BatchProcessingOptions<T = BasePartialBatchProcessor> = { | ||
skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never; | ||
/** | ||
* Set this to false to prevent throwing an error if the entire batch fails. | ||
*/ | ||
throwOnFullBatchFailure?: boolean; | ||
}; | ||
@@ -25,0 +30,0 @@ /** |
@@ -43,5 +43,4 @@ import type { DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; | ||
* | ||
* If the entire batch failed, and the utility is not configured otherwise, | ||
* this method will throw a `FullBatchFailureError` with the list of errors | ||
* that occurred during processing. | ||
* If the entire batch failed this method will throw a {@link FullBatchFailureError | `FullBatchFailureError`} with the list of | ||
* errors that occurred during processing, unless the `throwOnFullBatchFailure` option is set to `false`. | ||
* | ||
@@ -48,0 +47,0 @@ * Otherwise, it will build the partial failure response based on the event type. |
@@ -15,2 +15,17 @@ import { BasePartialProcessor } from './BasePartialProcessor.js'; | ||
/** | ||
* Mapping of event types to their respective failure collectors | ||
* | ||
* Each service expects a different format for partial failure reporting, | ||
* this mapping ensures that the correct format is used for each event type. | ||
*/ | ||
COLLECTOR_MAPPING; | ||
/** | ||
* Response to be returned after processing | ||
*/ | ||
batchResponse; | ||
/** | ||
* Type of event that the processor is handling | ||
*/ | ||
eventType; | ||
/** | ||
* Initializes base batch processing class | ||
@@ -33,5 +48,4 @@ * | ||
* | ||
* If the entire batch failed, and the utility is not configured otherwise, | ||
* this method will throw a `FullBatchFailureError` with the list of errors | ||
* that occurred during processing. | ||
* If the entire batch failed this method will throw a {@link FullBatchFailureError | `FullBatchFailureError`} with the list of | ||
* errors that occurred during processing, unless the `throwOnFullBatchFailure` option is set to `false`. | ||
* | ||
@@ -44,3 +58,4 @@ * Otherwise, it will build the partial failure response based on the event type. | ||
} | ||
if (this.entireBatchFailed()) { | ||
if (this.options?.throwOnFullBatchFailure !== false && | ||
this.entireBatchFailed()) { | ||
throw new FullBatchFailureError(this.errors); | ||
@@ -47,0 +62,0 @@ } |
@@ -17,2 +17,26 @@ /** | ||
class BasePartialProcessor { | ||
/** | ||
* List of errors that occurred during processing | ||
*/ | ||
errors; | ||
/** | ||
* List of records that failed processing | ||
*/ | ||
failureMessages; | ||
/** | ||
* Record handler provided by customers to process records | ||
*/ | ||
handler; | ||
/** | ||
* Options to be used during processing (optional) | ||
*/ | ||
options; | ||
/** | ||
* List of records to be processed | ||
*/ | ||
records; | ||
/** | ||
* List of records that were processed successfully | ||
*/ | ||
successMessages; | ||
constructor() { | ||
@@ -19,0 +43,0 @@ this.successMessages = []; |
@@ -15,2 +15,3 @@ import { EventType } from './constants.js'; | ||
class FullBatchFailureError extends BatchProcessingError { | ||
recordErrors; | ||
constructor(childErrors) { | ||
@@ -17,0 +18,0 @@ super('All records failed processing. See individual errors below.'); |
@@ -39,2 +39,27 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; | ||
* | ||
* By default, if the entire batch fails, the function will throw an error. | ||
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false` | ||
* | ||
* @example | ||
* ```typescript | ||
* import { | ||
* BatchProcessor, | ||
* EventType, | ||
* processPartialResponse, | ||
* } from '@aws-lambda-powertools/batch'; | ||
* import type { KinesisStreamHandler, KinesisStreamRecord } from 'aws-lambda'; | ||
* | ||
* const processor = new BatchProcessor(EventType.KinesisDataStreams); | ||
* | ||
* const recordHandler = async (record: KinesisStreamRecord): Promise<void> => { | ||
* const payload = JSON.parse(record.kinesis.data); | ||
* }; | ||
* | ||
* export const handler: KinesisStreamHandler = async (event, context) => | ||
* processPartialResponse(event, recordHandler, processor, { | ||
* context, | ||
* throwOnFullBatchFailure: false | ||
* }); | ||
* ``` | ||
* | ||
* @param event The event object containing the batch of records | ||
@@ -41,0 +66,0 @@ * @param recordHandler Async function to process each record from the batch |
@@ -38,2 +38,27 @@ import { UnexpectedBatchTypeError } from './errors.js'; | ||
* | ||
* By default, if the entire batch fails, the function will throw an error. | ||
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false` | ||
* | ||
* @example | ||
* ```typescript | ||
* import { | ||
* BatchProcessor, | ||
* EventType, | ||
* processPartialResponse, | ||
* } from '@aws-lambda-powertools/batch'; | ||
* import type { KinesisStreamHandler, KinesisStreamRecord } from 'aws-lambda'; | ||
* | ||
* const processor = new BatchProcessor(EventType.KinesisDataStreams); | ||
* | ||
* const recordHandler = async (record: KinesisStreamRecord): Promise<void> => { | ||
* const payload = JSON.parse(record.kinesis.data); | ||
* }; | ||
* | ||
* export const handler: KinesisStreamHandler = async (event, context) => | ||
* processPartialResponse(event, recordHandler, processor, { | ||
* context, | ||
* throwOnFullBatchFailure: false | ||
* }); | ||
* ``` | ||
* | ||
* @param event The event object containing the batch of records | ||
@@ -40,0 +65,0 @@ * @param recordHandler Async function to process each record from the batch |
@@ -65,2 +65,26 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; | ||
* | ||
* By default, if the entire batch fails, the function will throw an error. | ||
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false` | ||
* | ||
* @example | ||
* ```typescript | ||
* import { | ||
* SqsFifoPartialProcessor, | ||
* processPartialResponseSync, | ||
* } from '@aws-lambda-powertools/batch'; | ||
* import type { SQSRecord, SQSHandler } from 'aws-lambda'; | ||
* | ||
* const processor = new SqsFifoPartialProcessor(); | ||
* | ||
* const recordHandler = async (record: SQSRecord): Promise<void> => { | ||
* const payload = JSON.parse(record.body); | ||
* }; | ||
* | ||
* export const handler: SQSHandler = async (event, context) => | ||
* processPartialResponseSync(event, recordHandler, processor, { | ||
* context, | ||
* throwOnFullBatchFailure: false | ||
* }); | ||
* ``` | ||
* | ||
* @param event The event object containing the batch of records | ||
@@ -67,0 +91,0 @@ * @param recordHandler Sync function to process each record from the batch |
@@ -64,2 +64,26 @@ import { UnexpectedBatchTypeError } from './errors.js'; | ||
* | ||
* By default, if the entire batch fails, the function will throw an error. | ||
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false` | ||
* | ||
* @example | ||
* ```typescript | ||
* import { | ||
* SqsFifoPartialProcessor, | ||
* processPartialResponseSync, | ||
* } from '@aws-lambda-powertools/batch'; | ||
* import type { SQSRecord, SQSHandler } from 'aws-lambda'; | ||
* | ||
* const processor = new SqsFifoPartialProcessor(); | ||
* | ||
* const recordHandler = async (record: SQSRecord): Promise<void> => { | ||
* const payload = JSON.parse(record.body); | ||
* }; | ||
* | ||
* export const handler: SQSHandler = async (event, context) => | ||
* processPartialResponseSync(event, recordHandler, processor, { | ||
* context, | ||
* throwOnFullBatchFailure: false | ||
* }); | ||
* ``` | ||
* | ||
* @param event The event object containing the batch of records | ||
@@ -66,0 +90,0 @@ * @param recordHandler Sync function to process each record from the batch |
@@ -1,13 +0,1 @@ | ||
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) { | ||
if (kind === "m") throw new TypeError("Private method is not writable"); | ||
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a setter"); | ||
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot write private member to an object whose class did not declare it"); | ||
return (kind === "a" ? f.call(receiver, value) : f ? f.value = value : state.set(receiver, value)), value; | ||
}; | ||
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) { | ||
if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter"); | ||
if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it"); | ||
return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver); | ||
}; | ||
var _SqsFifoPartialProcessor_instances, _SqsFifoPartialProcessor_currentGroupId, _SqsFifoPartialProcessor_failedGroupIds, _SqsFifoPartialProcessor_addToFailedGroup, _SqsFifoPartialProcessor_processFailRecord, _SqsFifoPartialProcessor_setCurrentGroup; | ||
import { BatchProcessorSync } from './BatchProcessorSync.js'; | ||
@@ -48,14 +36,13 @@ import { EventType } from './constants.js'; | ||
class SqsFifoPartialProcessor extends BatchProcessorSync { | ||
/** | ||
* The ID of the current message group being processed. | ||
*/ | ||
#currentGroupId; | ||
/** | ||
* A set of group IDs that have already encountered failures. | ||
*/ | ||
#failedGroupIds; | ||
constructor() { | ||
super(EventType.SQS); | ||
_SqsFifoPartialProcessor_instances.add(this); | ||
/** | ||
* The ID of the current message group being processed. | ||
*/ | ||
_SqsFifoPartialProcessor_currentGroupId.set(this, void 0); | ||
/** | ||
* A set of group IDs that have already encountered failures. | ||
*/ | ||
_SqsFifoPartialProcessor_failedGroupIds.set(this, void 0); | ||
__classPrivateFieldSet(this, _SqsFifoPartialProcessor_failedGroupIds, new Set(), "f"); | ||
this.#failedGroupIds = new Set(); | ||
} | ||
@@ -70,4 +57,4 @@ /** | ||
failureHandler(record, exception) { | ||
if (this.options?.skipGroupOnError && __classPrivateFieldGet(this, _SqsFifoPartialProcessor_currentGroupId, "f")) { | ||
__classPrivateFieldGet(this, _SqsFifoPartialProcessor_instances, "m", _SqsFifoPartialProcessor_addToFailedGroup).call(this, __classPrivateFieldGet(this, _SqsFifoPartialProcessor_currentGroupId, "f")); | ||
if (this.options?.skipGroupOnError && this.#currentGroupId) { | ||
this.#addToFailedGroup(this.#currentGroupId); | ||
} | ||
@@ -99,3 +86,3 @@ return super.failureHandler(record, exception); | ||
for (const record of this.records) { | ||
__classPrivateFieldGet(this, _SqsFifoPartialProcessor_instances, "m", _SqsFifoPartialProcessor_setCurrentGroup).call(this, record.attributes?.MessageGroupId); | ||
this.#setCurrentGroup(record.attributes?.MessageGroupId); | ||
// If we have any failed messages, we should then short circuit the process and | ||
@@ -110,6 +97,6 @@ // fail remaining messages unless `skipGroupOnError` is true | ||
const shouldSkipCurrentGroup = this.options?.skipGroupOnError && | ||
__classPrivateFieldGet(this, _SqsFifoPartialProcessor_currentGroupId, "f") && | ||
__classPrivateFieldGet(this, _SqsFifoPartialProcessor_failedGroupIds, "f").has(__classPrivateFieldGet(this, _SqsFifoPartialProcessor_currentGroupId, "f")); | ||
this.#currentGroupId && | ||
this.#failedGroupIds.has(this.#currentGroupId); | ||
const result = shouldSkipCurrentGroup | ||
? __classPrivateFieldGet(this, _SqsFifoPartialProcessor_instances, "m", _SqsFifoPartialProcessor_processFailRecord).call(this, record, new SqsFifoMessageGroupShortCircuitError()) | ||
? this.#processFailRecord(record, new SqsFifoMessageGroupShortCircuitError()) | ||
: this.processRecordSync(record); | ||
@@ -137,3 +124,3 @@ processedRecords.push(result); | ||
for (const record of remainingRecords) { | ||
__classPrivateFieldGet(this, _SqsFifoPartialProcessor_instances, "m", _SqsFifoPartialProcessor_processFailRecord).call(this, record, new SqsFifoShortCircuitError()); | ||
this.#processFailRecord(record, new SqsFifoShortCircuitError()); | ||
} | ||
@@ -143,11 +130,29 @@ this.clean(); | ||
} | ||
/** | ||
* Adds the specified group ID to the set of failed group IDs. | ||
* | ||
* @param group - The group ID to be added to the set of failed group IDs. | ||
*/ | ||
#addToFailedGroup(group) { | ||
this.#failedGroupIds.add(group); | ||
} | ||
/** | ||
* Processes a fail record. | ||
* | ||
* @param record - The record that failed. | ||
* @param exception - The error that occurred. | ||
*/ | ||
#processFailRecord(record, exception) { | ||
const data = this.toBatchType(record, this.eventType); | ||
return this.failureHandler(data, exception); | ||
} | ||
/** | ||
* Sets the current group ID for the message being processed. | ||
* | ||
* @param group - The group ID of the current message being processed. | ||
*/ | ||
#setCurrentGroup(group) { | ||
this.#currentGroupId = group; | ||
} | ||
} | ||
_SqsFifoPartialProcessor_currentGroupId = new WeakMap(), _SqsFifoPartialProcessor_failedGroupIds = new WeakMap(), _SqsFifoPartialProcessor_instances = new WeakSet(), _SqsFifoPartialProcessor_addToFailedGroup = function _SqsFifoPartialProcessor_addToFailedGroup(group) { | ||
__classPrivateFieldGet(this, _SqsFifoPartialProcessor_failedGroupIds, "f").add(group); | ||
}, _SqsFifoPartialProcessor_processFailRecord = function _SqsFifoPartialProcessor_processFailRecord(record, exception) { | ||
const data = this.toBatchType(record, this.eventType); | ||
return this.failureHandler(data, exception); | ||
}, _SqsFifoPartialProcessor_setCurrentGroup = function _SqsFifoPartialProcessor_setCurrentGroup(group) { | ||
__classPrivateFieldSet(this, _SqsFifoPartialProcessor_currentGroupId, group, "f"); | ||
}; | ||
export { SqsFifoPartialProcessor }; |
@@ -10,2 +10,3 @@ import type { Context, DynamoDBRecord, KinesisStreamRecord, SQSRecord } from 'aws-lambda'; | ||
* @property skipGroupOnError The option to group on error during processing | ||
* @property throwOnFullBatchFailure The option to throw an error if the entire batch fails | ||
*/ | ||
@@ -23,2 +24,6 @@ type BatchProcessingOptions<T = BasePartialBatchProcessor> = { | ||
skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never; | ||
/** | ||
* Set this to false to prevent throwing an error if the entire batch fails. | ||
*/ | ||
throwOnFullBatchFailure?: boolean; | ||
}; | ||
@@ -25,0 +30,0 @@ /** |
{ | ||
"name": "@aws-lambda-powertools/batch", | ||
"version": "2.3.0", | ||
"version": "2.4.0", | ||
"description": "The batch processing package for the Powertools for AWS Lambda (TypeScript) library.", | ||
@@ -5,0 +5,0 @@ "author": { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
167622
2.98%3718
8.59%