Skip to content

Conversation

sdangol
Copy link
Contributor

@sdangol sdangol commented Sep 3, 2025

Summary

This PR integrates the Parser functionality with the Batch Processor so that customers can parse and validate payloads before they're passed to the record handler. It supports both the extended schemas for the SQSRecord, KinesisRecord, and DynamoDBRecord as well as the inner payload schema.

Changes

Please provide a summary of what's being changed

  • Added parser as dev dependency
  • Added a config to the constructor of BasePartialBatchProcessor to set the schema property
  • Added a parseRecord method to the BatchProcessor to do the parsing by dynamically importing the parse function and the appropriate schema

Please add the issue number below, if no issue is present the PR might get blocked and not be reviewed

Issue number: closes #4394


By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Disclaimer: We value your time and bandwidth. As such, any pull requests created on non-triaged issues might not be successful.

@boring-cyborg boring-cyborg bot added batch This item relates to the Batch Processing Utility dependencies Changes that touch dependencies, e.g. Dependabot, etc. tests PRs that add or change tests labels Sep 3, 2025
@pull-request-size pull-request-size bot added the size/L PRs between 100-499 LOC label Sep 3, 2025
@sdangol sdangol requested review from dreamorosi and svozza September 3, 2025 08:37
@sdangol sdangol self-assigned this Sep 3, 2025
'@aws-lambda-powertools/parser/schemas/sqs'
);
const extendedSchema = SqsRecordSchema.extend({
// biome-ignore lint/suspicious/noExplicitAny: at least for now, we need to broaden the type because the JSONstringified helper method is not typed with StandardSchemaV1 but with ZodSchema
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice, I don't think we can support this use case at runtime anyway.

If someone passes us a complete StandardSchema schema from another library, we'll be able to use it as-is (try block).

For us to be able to extend the built-in SqsRecordSchema however we must receive a Zod schema or both the extend and JSONStringified will fail at runtime - so the type error is correct here.

With this in mind, we might want to use the vendor key on the schema (see spec) to check if it's a Zod schema and throw either 1/ throw an error or 2/ log a warning and ignore parsing if the schema is from another library we can't extend with.

If instead we find this conditional behavior confusing, we'll have to restrict the type of schema to Zod schemas only.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm in favour of throwing here, if we just log a warning them we run the risk of passing on invalid events.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both throwing an error and continue processing an invalid event would result in the item being marked as failed by the Batch Processor and sent back to the source.

Most likely we should also log a warning, otherwise all operators would see is the item being reprocessed and eventually be sent to a DLQ or lost.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's a good point, it will be much easier to diagnose if we emit a log statement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to cast the schema to Zod schema, do we need to dynamically import Zod as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm testing the package on a function and it works, but I think we forgot to emit some kind of warning/log/error when the parsing fails.

Is the Python implementation doing anything in this regard?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the python implementation for handling parsing failure. Should we do something similar?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to test it for Kinesis using https://github.com/dreamorosi/kinesis-batch and it was failing, trying to debug.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Message me, I got it working fine

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do something similar?

Yes, but in order to be able to do that we'll need to add a logger feature which we currently don't have.

Let's put a pin on this and I'll open a separate issue.

@dreamorosi
Copy link
Contributor

Can you also please address all the Sonar findings?

@sdangol sdangol marked this pull request as draft September 3, 2025 12:47
@sdangol
Copy link
Contributor Author

sdangol commented Sep 3, 2025

@dreamorosi I'm still a bit confused about extending the KinesisDataStreamRecord.

if (eventType === EventType.KinesisDataStreams) {
      const extendedSchemaParsing = parse(record, undefined, schema, true);
      if (extendedSchemaParsing.success)
        return extendedSchemaParsing.data as KinesisStreamRecord;
      if (schema['~standard'].vendor === SchemaType.Zod) {
        const { JSONStringified } = await import(
          '@aws-lambda-powertools/parser/helpers'
        );
        const { KinesisDataStreamRecord } = await import(
          '@aws-lambda-powertools/parser/schemas/kinesis'
        );
        const extendedSchema = KinesisDataStreamRecord.extend({
          // biome-ignore lint/suspicious/noExplicitAny: The vendor field in the schema is verified that the schema is a Zod schema
          data: // Decompress and decode the data to match against schema
        });
        return parse(record, undefined, extendedSchema);
      }
      console.warn(
        'The schema provided is not supported. Only Zod schemas are supported for extension.'
      );
      throw new Error('Unsupported schema type');
    }

To extend it, should I create another helper which does the decompressions and decoding?
Or should I just use Envelopes for the whole thing. But, using Envelopes would just return the internal payload and our record handler expects the whole Record. We could maybe update the Record with the parsed payload and return it.
What would you suggest?

@dreamorosi
Copy link
Contributor

I've been thinking about your question for a while and there's no straightforward way to do it with our current feature set.

We can't use envelopes because of what you mentioned, and doing the parsing in two parts while possible, is suboptimal for two reasons:

  • we'd be iterating through the entire batch in order to parse each record and then we'd have to recompose the object
  • in case of parsing errors the path of the failed field(s) would be messed up since we'd no longer parse a schema (i.e. there's an error at the 2nd item in the batch, the error path should be something like Records.1.kinesis.data.foo)

With this in mind, I think we'll have to extract the transform logic that we have here into its own helper (with a dedicated PR) and then use the helper in the Batch Processing utility when extending.

@pull-request-size pull-request-size bot added size/XL PRs between 500-999 LOC, often PRs that grown with feedback and removed size/L PRs between 100-499 LOC labels Sep 3, 2025
@sdangol sdangol changed the title feat(parser): integrate parser with Batch Processing for SQSRecord feat(parser): integrate parser with Batch Processing Sep 3, 2025
@dreamorosi
Copy link
Contributor

The SonarCloud message still shows an issue

…:aws-powertools/powertools-lambda-typescript into feat/parser-integration-batch-processing
@sdangol sdangol requested a review from svozza September 4, 2025 14:46
svozza
svozza previously approved these changes Sep 4, 2025
@sdangol sdangol added the do-not-merge This item should not be merged label Sep 4, 2025
@pull-request-size pull-request-size bot added size/XXL PRs with 1K+ LOC, largely documentation related and removed size/XL PRs between 500-999 LOC, often PRs that grown with feedback labels Sep 4, 2025
@sdangol sdangol requested a review from svozza September 5, 2025 07:57
@pull-request-size pull-request-size bot added size/XL PRs between 500-999 LOC, often PRs that grown with feedback and removed size/XXL PRs with 1K+ LOC, largely documentation related labels Sep 5, 2025
Copy link

sonarqubecloud bot commented Sep 5, 2025

/**
* Enum of supported schema types for the utility
*/
const SchemaType = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const SchemaType = {
const SchemaVendor = {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For new methods, I'd prefer creating signatures with objects rather than positional args since it makes it easier to read/understand at the call site.

For example, with the current signatures calling parseRecord looks like: await this.parseRecord(record, this.eventType, this.schema); which is not too bad because the args being passed are named correctly.

Calling createExtendedSchema instead looks much more opaque: await this.createExtendedSchema(eventType, schema, false) making it unclear what false stands for.

If the signature was:

private async createExtendedSchema(options: {
  eventType: keyof typeof EventType,
  schema: StandardSchemaV1,
  useTransformers: boolean
})

calling it would look like: await this.createExtendedSchema({ eventType, schema, useTransformers : false })` which is much clearer.

Also, unless the methods are public or protected, let's use #methodName instead of private methodName to make them truly private at runtime.

schema,
true
);
return parse(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using safeParse: true in this call will make it easier to add debug logs later since right now a failed parsing exits this scope.

Similarly, for the parse call above we could achieve the same without generating a stack trace, which can be more performant.

*
* Only Zod Schemas are supported for automatic schema extension
*
* @param record The record to be parsed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @param record The record to be parsed
* @param record - The record to be parsed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for the other @param added in this diff

* If the passed schema is already an extended schema,
* it directly uses the schema to parse the record
*
* Only Zod Schemas are supported for automatic schema extension
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use punctuation correctly throughout the docstring.

@@ -89,6 +91,10 @@ type PartialItemFailures = { itemIdentifier: string };
*/
type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] };

type BasePartialBatchProcessorConfig = {
schema: StandardSchemaV1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to document schema so that customer know what it does and all its nuances.

Copy link
Contributor

@dreamorosi dreamorosi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only a couple of last styling and documentation details and then we're good to merge!

return extendedSchemaParsing.data as EventSourceDataClassTypes;
}
// Only proceed with schema extension if it's a Zod schema
if (schema['~standard'].vendor !== SchemaType.Zod) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not do this check as the very first thing in this function? We're wasting work doing the parsing if all we're going to do is throw based on information we had before we began parsing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
batch This item relates to the Batch Processing Utility dependencies Changes that touch dependencies, e.g. Dependabot, etc. do-not-merge This item should not be merged size/XL PRs between 500-999 LOC, often PRs that grown with feedback tests PRs that add or change tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Feature request: Parser integration for Batch Processing
3 participants