
Conversation

@sdangol (Contributor) commented Sep 3, 2025

Summary

This PR integrates the Parser functionality with the Batch Processor so that customers can parse and validate payloads before they're passed to the record handler. It supports both the extended schemas for the SQSRecord, KinesisRecord, and DynamoDBRecord and the inner payload schema on its own.

Changes

  • Added parser as dev dependency
  • Added a config to the constructor of BasePartialBatchProcessor to set the schema property (see the usage sketch below)
  • Added a parseRecord method to the BatchProcessor to do the parsing by dynamically importing the parse function and the appropriate schema
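
A minimal usage sketch of the changes above; the exact shape of the constructor config is an assumption based on the bullets and may differ from the final implementation:

import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
import { z } from 'zod';

// Hypothetical inner payload schema validated against each SQS record body.
const orderSchema = z.object({ orderId: z.string(), amount: z.number() });

// Assumed config shape: the schema is set via the constructor and applied to
// every record by parseRecord before the record handler runs.
const processor = new BatchProcessor(EventType.SQS, { schema: orderSchema });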

Issue number: closes #4394


By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Disclaimer: We value your time and bandwidth. As such, any pull requests created on non-triaged issues might not be successful.

@boring-cyborg boring-cyborg bot added batch This item relates to the Batch Processing Utility dependencies Changes that touch dependencies, e.g. Dependabot, etc. tests PRs that add or change tests labels Sep 3, 2025
@pull-request-size pull-request-size bot added the size/L PRs between 100-499 LOC label Sep 3, 2025
@sdangol sdangol requested review from dreamorosi and svozza September 3, 2025 08:37
@sdangol sdangol self-assigned this Sep 3, 2025
const { SqsRecordSchema } = await import(
  '@aws-lambda-powertools/parser/schemas/sqs'
);
const extendedSchema = SqsRecordSchema.extend({
  // biome-ignore lint/suspicious/noExplicitAny: at least for now, we need to broaden the type because the JSONStringified helper method is not typed with StandardSchemaV1 but with ZodSchema
Contributor:

In practice, I don't think we can support this use case at runtime anyway.

If someone passes us a complete StandardSchema schema from another library, we'll be able to use it as-is (try block).

For us to be able to extend the built-in SqsRecordSchema however we must receive a Zod schema or both the extend and JSONStringified will fail at runtime - so the type error is correct here.

With this in mind, we might want to use the vendor key on the schema (see spec) to check whether it's a Zod schema and either 1/ throw an error or 2/ log a warning and skip parsing if the schema comes from another library we can't extend with.

If instead we find this conditional behavior confusing, we'll have to restrict the type of schema to Zod schemas only.
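
A minimal sketch of that vendor check, assuming the schema follows the Standard Schema spec (which exposes a vendor string under the ~standard key); the helper names are hypothetical:

import type { StandardSchemaV1 } from '@standard-schema/spec';

// Every spec-compliant schema carries a `~standard` object whose `vendor`
// string identifies the originating library (e.g. 'zod', 'valibot', 'arktype').
const isZodSchema = (schema: StandardSchemaV1): boolean =>
  schema['~standard'].vendor === 'zod';

// Option 1 from the comment above: fail fast when the schema can't be extended.
const assertExtendable = (schema: StandardSchemaV1): void => {
  if (!isZodSchema(schema)) {
    throw new Error('Only Zod schemas can be extended with the built-in event schemas');
  }
};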

Contributor:

I'm in favour of throwing here; if we just log a warning then we run the risk of passing on invalid events.

Contributor:

Both throwing an error and continuing to process an invalid event would result in the item being marked as failed by the Batch Processor and sent back to the source.

Most likely we should also log a warning, otherwise all operators would see is the item being reprocessed and eventually sent to a DLQ or lost.

Contributor:

Yeah, that's a good point, it will be much easier to diagnose if we emit a log statement.

sdangol (Contributor, Author):

In order to cast the schema to a Zod schema, do we need to dynamically import Zod as well?

Contributor:

I'm testing the package on a function and it works, but I think we forgot to emit some kind of warning/log/error when the parsing fails.

Is the Python implementation doing anything in this regard?

sdangol (Contributor, Author):

I think this is the Python implementation for handling parsing failures. Should we do something similar?

sdangol (Contributor, Author):

I was trying to test it for Kinesis using https://github.com/dreamorosi/kinesis-batch and it was failing, trying to debug.

Contributor:

Message me, I got it working fine

Contributor:

> Should we do something similar?

Yes, but in order to be able to do that we'll need to add a logger feature which we currently don't have.

Let's put a pin on this and I'll open a separate issue.

@dreamorosi:

Can you also please address all the Sonar findings?

@sdangol sdangol marked this pull request as draft September 3, 2025 12:47
@sdangol (Contributor, Author) commented Sep 3, 2025

@dreamorosi I'm still a bit confused about extending the KinesisDataStreamRecord.

if (eventType === EventType.KinesisDataStreams) {
  const extendedSchemaParsing = parse(record, undefined, schema, true);
  if (extendedSchemaParsing.success)
    return extendedSchemaParsing.data as KinesisStreamRecord;
  if (schema['~standard'].vendor === SchemaType.Zod) {
    const { JSONStringified } = await import(
      '@aws-lambda-powertools/parser/helpers'
    );
    const { KinesisDataStreamRecord } = await import(
      '@aws-lambda-powertools/parser/schemas/kinesis'
    );
    const extendedSchema = KinesisDataStreamRecord.extend({
      // biome-ignore lint/suspicious/noExplicitAny: the vendor field on the schema confirms it's a Zod schema
      data: // Decompress and decode the data to match against the schema
    });
    return parse(record, undefined, extendedSchema);
  }
  console.warn(
    'The schema provided is not supported. Only Zod schemas are supported for extension.'
  );
  throw new Error('Unsupported schema type');
}

To extend it, should I create another helper which does the decompression and decoding?
Or should I just use Envelopes for the whole thing? But using Envelopes would only return the inner payload, while our record handler expects the whole record. We could maybe update the record with the parsed payload and return it.
What would you suggest?

@dreamorosi:

I've been thinking about your question for a while and there's no straightforward way to do it with our current feature set.

We can't use envelopes because of what you mentioned, and doing the parsing in two parts, while possible, is suboptimal for two reasons:

  • we'd be iterating through the entire batch in order to parse each record and then we'd have to recompose the object
  • in case of parsing errors the path of the failed field(s) would be messed up, since we'd no longer be parsing the whole event against a single schema (i.e. if there's an error at the 2nd item in the batch, the error path should be something like Records.1.kinesis.data.foo)

With this in mind, I think we'll have to extract the transform logic that we have here into its own helper (with a dedicated PR) and then use the helper in the Batch Processing utility when extending.
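
A minimal sketch of what such a transform helper might look like, assuming Zod and base64-encoded (optionally gzipped) record data; the helper name and exact behavior are hypothetical and not part of this PR:

import { gunzipSync } from 'node:zlib';
import { z } from 'zod';

// Hypothetical helper: decodes the base64-encoded Kinesis `data` field,
// gunzips it when the gzip magic bytes are present (e.g. CloudWatch Logs
// subscription filters), parses the result as JSON, and validates it
// against the user-provided schema.
const Base64JSONStringified = (schema: z.ZodTypeAny) =>
  z
    .string()
    .transform((value, ctx) => {
      try {
        const buffer = Buffer.from(value, 'base64');
        const decoded =
          buffer[0] === 0x1f && buffer[1] === 0x8b
            ? gunzipSync(buffer).toString('utf8')
            : buffer.toString('utf8');
        return JSON.parse(decoded);
      } catch {
        ctx.addIssue({
          code: z.ZodIssueCode.custom,
          message: 'Could not decode base64-encoded data',
        });
        return z.NEVER;
      }
    })
    .pipe(schema);

This could then slot into the extend call above as data: Base64JSONStringified(schema), keeping the error path anchored at the record's data field.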

@pull-request-size pull-request-size bot added size/XL PRs between 500-999 LOC, often PRs that grown with feedback and removed size/L PRs between 100-499 LOC labels Sep 3, 2025
@sdangol sdangol changed the title feat(parser): integrate parser with Batch Processing for SQSRecord feat(parser): integrate parser with Batch Processing Sep 3, 2025
@sdangol sdangol force-pushed the feat/parser-integration-batch-processing branch from 31767a1 to f1e8f70 Compare September 4, 2025 13:08
@sdangol sdangol marked this pull request as ready for review September 4, 2025 13:14
@dreamorosi:

@sdangol can you look at the SonarCloud issue?

@sdangol sdangol requested review from dreamorosi and svozza September 4, 2025 13:20
@sdangol (Contributor, Author) commented Sep 4, 2025

@dreamorosi @svozza This is now ready to be reviewed.

@dreamorosi:

The SonarCloud message still shows an issue

sonarqubecloud bot commented Sep 4, 2025

@sdangol sdangol requested a review from svozza September 4, 2025 14:46
@sdangol sdangol added the do-not-merge This item should not be merged label Sep 4, 2025
// (surrounding lines reconstructed to complete the diff excerpt)
const [{ JSONStringified }, { SqsRecordSchema }] = await Promise.all([
  import('@aws-lambda-powertools/parser/helpers'),
  import('@aws-lambda-powertools/parser/schemas/sqs'),
]);
return SqsRecordSchema.extend({
  body: JSONStringified(schema as any),
});
sdangol (Contributor, Author):

@dreamorosi The body could also just be a plain string, right? If we try to extend using JSONStringified, it currently fails when we pass a string.

sdangol (Contributor, Author):

Should we avoid using the JSONStringified helper here (and the other helpers for other event types) and assume customers will apply the necessary helpers themselves when they need them?
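
For context, a minimal sketch of what that would look like on the customer side, assuming the utility accepted the fully extended schema as-is; the inner orderId schema is a hypothetical example:

import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas/sqs';
import { z } from 'zod';

// Customers wrap their inner schema with JSONStringified themselves when the
// body carries JSON...
const jsonBodySchema = SqsRecordSchema.extend({
  body: JSONStringified(z.object({ orderId: z.string() })),
});

// ...or keep a plain string schema when it doesn't.
const plainBodySchema = SqsRecordSchema.extend({
  body: z.string().min(1),
});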
