
Conversation

@sdangol sdangol commented Sep 3, 2025

Summary

This PR integrates the Parser functionality with the Batch Processor so that customers can parse and validate payloads before they're passed to the record handler. It supports both the extended schemas for SQSRecord, KinesisRecord, and DynamoDBRecord and the inner payload schema.

Changes


  • Added parser as a dev dependency
  • Added a config option to the BasePartialBatchProcessor constructor to set the schema property
  • Added a parseRecord method to the BatchProcessor that performs the parsing by dynamically importing the parse function and the appropriate schema
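The constructor config and parseRecord flow described above can be sketched roughly as follows. This is a minimal standalone sketch, not the actual Powertools API; the `SketchBatchProcessor` name and the `Schema` shape are illustrative assumptions:

```typescript
// Minimal standalone sketch of the idea above; names and shapes are
// illustrative assumptions, not the actual Powertools API.
type Schema<T> = { parse: (input: unknown) => T };

class SketchBatchProcessor<T> {
  private readonly schema?: Schema<T>;

  // The schema is passed through an optional constructor config,
  // mirroring the new config on BasePartialBatchProcessor.
  constructor(config?: { schema?: Schema<T> }) {
    this.schema = config?.schema;
  }

  // Validate a single record before it reaches the record handler;
  // a thrown error marks the record as a partial batch failure.
  parseRecord(record: unknown): T {
    if (!this.schema) return record as T;
    return this.schema.parse(record);
  }
}
```

In the real implementation the parse function and the schemas are dynamically imported, so the parser stays an optional dependency for users who don't pass a schema.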


Issue number: closes #4394


By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Disclaimer: We value your time and bandwidth. As such, any pull requests created on non-triaged issues might not be successful.

@boring-cyborg boring-cyborg bot added batch This item relates to the Batch Processing Utility dependencies Changes that touch dependencies, e.g. Dependabot, etc. tests PRs that add or change tests labels Sep 3, 2025
@pull-request-size pull-request-size bot added the size/L PRs between 100-499 LOC label Sep 3, 2025
@sdangol sdangol requested review from dreamorosi and svozza September 3, 2025 08:37
@sdangol sdangol self-assigned this Sep 3, 2025
@dreamorosi

Can you also please address all the Sonar findings?

@sdangol sdangol marked this pull request as draft September 3, 2025 12:47

sdangol commented Sep 3, 2025

@dreamorosi I'm still a bit confused about extending the KinesisDataStreamRecord.

```ts
if (eventType === EventType.KinesisDataStreams) {
  const extendedSchemaParsing = parse(record, undefined, schema, true);
  if (extendedSchemaParsing.success)
    return extendedSchemaParsing.data as KinesisStreamRecord;
  if (schema['~standard'].vendor === SchemaType.Zod) {
    const { JSONStringified } = await import(
      '@aws-lambda-powertools/parser/helpers'
    );
    const { KinesisDataStreamRecord } = await import(
      '@aws-lambda-powertools/parser/schemas/kinesis'
    );
    const extendedSchema = KinesisDataStreamRecord.extend({
      // biome-ignore lint/suspicious/noExplicitAny: The vendor field in the schema is verified that the schema is a Zod schema
      data: // Decompress and decode the data to match against the schema
    });
    return parse(record, undefined, extendedSchema);
  }
  console.warn(
    'The schema provided is not supported. Only Zod schemas are supported for extension.'
  );
  throw new Error('Unsupported schema type');
}
```

To extend it, should I create another helper that does the decompression and decoding?
Or should I just use Envelopes for the whole thing? But using Envelopes would only return the internal payload, while our record handler expects the whole record. We could maybe update the record with the parsed payload and return it.
What would you suggest?

@dreamorosi
Copy link
Contributor

I've been thinking about your question for a while and there's no straightforward way to do it with our current feature set.

We can't use envelopes because of what you mentioned, and doing the parsing in two parts, while possible, is suboptimal for two reasons:

  • we'd have to iterate through the entire batch to parse each record and then recompose the object
  • in case of parsing errors, the path of the failed field(s) would be wrong since we'd no longer be parsing against a single schema (i.e. if there's an error in the 2nd item of the batch, the error path should be something like Records.1.kinesis.data.foo)
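The recomposition and error-path concerns in the two bullets above can be illustrated with a small sketch. The validation logic here is a toy stand-in for a real schema, and the `Issue` shape is an assumption for illustration:

```typescript
// Illustrative sketch: when each record is parsed individually, any
// validation error path must be manually re-prefixed with `Records.<index>`
// to match what parsing the whole event against one schema would report.
type Issue = { path: (string | number)[]; message: string };

// Toy per-record check standing in for a real schema's parse step.
function validateData(record: { kinesis: { data: unknown } }): Issue[] {
  return typeof record.kinesis.data === 'string'
    ? []
    : [{ path: ['kinesis', 'data'], message: 'Expected string' }];
}

// Parse records one by one, then recompose the full error paths.
function validateBatch(event: {
  Records: { kinesis: { data: unknown } }[];
}): Issue[] {
  return event.Records.flatMap((record, i) =>
    validateData(record).map((issue) => ({
      ...issue,
      path: ['Records', i, ...issue.path], // re-prefix with the batch index
    }))
  );
}
```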

With this in mind, I think we'll have to extract the transform logic we have here into its own helper (in a dedicated PR) and then use that helper in the Batch Processing utility when extending.
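A rough sketch of what such an extracted helper might look like. The `decodeKinesisData` name and the gunzip-with-fallback behaviour are assumptions for illustration; the real helper shipped in its own PR may differ:

```typescript
import { gunzipSync } from 'node:zlib';

// Hypothetical sketch of the extracted transform: Kinesis `data` is
// base64-encoded, and some payloads (e.g. CloudWatch Logs subscriptions)
// are additionally gzipped. Name and fallback are assumptions, not the
// shipped helper.
function decodeKinesisData(data: string): unknown {
  const buffer = Buffer.from(data, 'base64');
  try {
    // gzipped JSON payload
    return JSON.parse(gunzipSync(buffer).toString('utf8'));
  } catch {
    // plain base64-encoded JSON
    return JSON.parse(buffer.toString('utf8'));
  }
}
```

With the transform isolated like this, the Batch Processing utility could apply it when extending the Kinesis schema without duplicating the parser's internal logic.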

@pull-request-size pull-request-size bot added size/XL PRs between 500-999 LOC, often PRs that grown with feedback and removed size/L PRs between 100-499 LOC labels Sep 3, 2025
@sdangol sdangol changed the title feat(parser): integrate parser with Batch Processing for SQSRecord feat(parser): integrate parser with Batch Processing Sep 3, 2025
svozza
svozza previously approved these changes Sep 4, 2025
@sdangol sdangol added the do-not-merge This item should not be merged label Sep 4, 2025
@pull-request-size pull-request-size bot added size/XXL PRs with 1K+ LOC, largely documentation related and removed size/XL PRs between 500-999 LOC, often PRs that grown with feedback labels Sep 4, 2025
@sdangol sdangol requested a review from svozza September 5, 2025 07:57
@pull-request-size pull-request-size bot added size/XL PRs between 500-999 LOC, often PRs that grown with feedback and removed size/XXL PRs with 1K+ LOC, largely documentation related labels Sep 5, 2025

@dreamorosi dreamorosi left a comment


Only a couple of last styling and documentation details and then we're good to merge!


sonarqubecloud bot commented Sep 8, 2025

@sdangol sdangol requested a review from dreamorosi September 8, 2025 09:43
@sdangol sdangol removed the do-not-merge This item should not be merged label Sep 8, 2025
@sdangol sdangol merged commit 0b6bbbb into main Sep 8, 2025
39 checks passed
@sdangol sdangol deleted the feat/parser-integration-batch-processing branch September 8, 2025 09:50