
Feature request: return result of batch processing #6863

Open
@manzkel

Description

Use case

Hello there,
we have the following use case: we need to validate records on a per-record basis, and as part of the validation we also enrich the data with extra information. Afterwards, we want to run a single function once on all of the validated records (e.g. a bulk insert into the database).
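The flow described above validates and enriches each record individually, and the successful results feed one bulk operation. It can be sketched without Powertools as follows. This is a minimal illustration under stated assumptions: SQS-style records with messageId and body keys, plus the hypothetical helpers validate_and_enrich and run_batch. It is not the library's implementation. Only the {"batchItemFailures": [{"itemIdentifier": ...}]} response shape is the standard Lambda partial batch response format.

```python
import json
from typing import Any, Callable


def validate_and_enrich(record: dict) -> dict:
    """Hypothetical record handler: parse the body, validate it, and enrich it."""
    payload = json.loads(record["body"])
    if "id" not in payload:
        raise ValueError("missing 'id'")
    payload["enriched"] = True  # stand-in for the extra information
    return payload


def run_batch(records: list[dict], handler: Callable[[dict], Any]) -> tuple[dict, list[Any]]:
    """Apply handler per record; return a partial batch response plus the successful results."""
    results: list[Any] = []
    failures: list[dict] = []
    for record in records:
        try:
            results.append(handler(record))
        except Exception:
            # Report only this message as failed instead of failing the whole batch.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}, results


if __name__ == "__main__":
    records = [
        {"messageId": "m1", "body": '{"id": 1}'},
        {"messageId": "m2", "body": '{"name": "no id"}'},
    ]
    response, rows = run_batch(records, validate_and_enrich)
    print(response)  # {'batchItemFailures': [{'itemIdentifier': 'm2'}]}
    print(rows)      # [{'id': 1, 'enriched': True}]
    # rows would then be passed to a single (hypothetical) bulk_insert(rows) call.
```

The point of returning results alongside the response is that the enriched payloads do not have to be recomputed before the bulk step.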

Currently, the implementation of process_partial_response in the batch utility ignores the value returned by the processor.process() call. As a result, the record handler's return values cannot be reused and the information has to be recomputed.

We are currently working around this with a slightly modified version of the function (see below).

Solution/User Experience

Below is the modified version we use. Whether it should return only the "success" records, or the failed records as well (e.g. for monitoring), is probably also up for debate.

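from __future__ import annotations

from typing import Any, Callable

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.batch.exceptions import UnexpectedBatchTypeError
from aws_lambda_powertools.utilities.batch.types import PartialItemFailureResponse
from aws_lambda_powertools.utilities.typing import LambdaContext


def process_partial_response_with_return(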
    event: dict,
    record_handler: Callable,
    processor: BatchProcessor,
    context: LambdaContext | None = None,
) -> tuple[PartialItemFailureResponse, list[Any]]:
    """
    Higher level function to handle batch event processing and return the results of successfully processed records.

    Parameters
    ----------
    event: dict
        Lambda's original event
    record_handler: Callable
        Callable to process each record from the batch
    processor: BatchProcessor
        Batch Processor to handle partial failure cases
    context: LambdaContext
        Lambda's context, used to optionally inject in record handler

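    Returns
    -------
    result: tuple[PartialItemFailureResponse, list[Any]]
        Lambda Partial Batch Response, and the values returned by record_handler
        for the records that were processed successfully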

    Example
    --------
    **Processes Lambda's SQS event**

    ```python
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

    processor = BatchProcessor(EventType.SQS)

    def record_handler(record: SQSRecord):
        return record.body

    def handler(event, context):
        response, results = process_partial_response_with_return(
            event=event, record_handler=record_handler, processor=processor, context=context
        )
        # results holds record.body for each successful record, e.g. for one bulk insert
        return response
    ```

    Limitations
    -----------
    * Async batch processors. Use `async_process_partial_response` instead.
    """
    try:
        records: list[dict] = event.get("Records", [])
        if not records or not isinstance(records, list):
            raise UnexpectedBatchTypeError(
                "Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams",
            )

    except AttributeError:
        event_types = ", ".join(list(EventType.__members__))
        docs = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs"  # noqa: E501 # long-line
        raise ValueError(  # noqa: B904
            f"Invalid event format. Please ensure batch event is a valid {processor.event_type.value} event. \n"
            f"See sample events in our documentation for either {event_types}: \n {docs}",
        )

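    with processor(records, record_handler, context):
        # process() returns one (status, result, record) tuple per record
        returned = processor.process()

    # Keep only the record handler's return values for successfully processed records
    successful_returns = [data for status, data, _ in returned if status.lower() == "success"]

    return processor.response(), successful_returns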
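On the open question of also returning failed records: one option is to partition the tuples instead of filtering them. This is a minimal sketch that assumes the same (status, data, record) tuple shape the filter in process_partial_response_with_return relies on. split_results and BatchResults are hypothetical names, not part of Powertools.

```python
from typing import Any, NamedTuple


class BatchResults(NamedTuple):
    """Hypothetical container: handler results for successes, full entries for failures."""
    successes: list[Any]
    failures: list[tuple[str, Any, Any]]


def split_results(returned: list[tuple[str, Any, Any]]) -> BatchResults:
    """Partition (status, data, record) tuples by their status field."""
    successes: list[Any] = []
    failures: list[tuple[str, Any, Any]] = []
    for entry in returned:
        status, data, _record = entry
        if status.lower() == "success":
            successes.append(data)
        else:
            # Keep the whole entry so the record and error detail remain available for monitoring.
            failures.append(entry)
    return BatchResults(successes, failures)
```

A variant of the function could then return processor.response() together with split_results(returned). The successes would go to the bulk insert and the failures to monitoring.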

Alternative solutions

Acknowledgment

Metadata

Assignees: No one assigned
Type: No type
Project status: Pending customer
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
