feat(parser): integrate parser with Batch Processing #4408
Conversation
packages/batch/src/BatchProcessor.ts (outdated)

```ts
  '@aws-lambda-powertools/parser/schemas/sqs'
);
const extendedSchema = SqsRecordSchema.extend({
  // biome-ignore lint/suspicious/noExplicitAny: at least for now, we need to broaden the type because the JSONStringified helper method is not typed with StandardSchemaV1 but with ZodSchema
```
In practice, I don't think we can support this use case at runtime anyway.

If someone passes us a complete Standard Schema from another library, we'll be able to use it as-is (the `try` block). For us to be able to extend the built-in `SqsRecordSchema`, however, we must receive a Zod schema, or both the `extend` and `JSONStringified` calls will fail at runtime - so the type error is correct here.

With this in mind, we might want to use the `vendor` key on the schema (see spec) to check whether it's a Zod schema, and either 1/ throw an error or 2/ log a warning and skip parsing when the schema comes from another library we can't extend.

If instead we find this conditional behavior confusing, we'll have to restrict the type of `schema` to Zod schemas only.
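The vendor check described above can be sketched like this; the `~standard.vendor` key comes from the Standard Schema spec, while the schema objects below are hypothetical stand-ins rather than real Zod or Valibot schemas:

```typescript
// Minimal sketch of detecting a Zod schema via the Standard Schema
// `~standard.vendor` key. The objects are stand-ins for illustration only.
type StandardSchemaLike = { '~standard': { vendor: string } };

const isZodSchema = (schema: StandardSchemaLike): boolean =>
  schema['~standard'].vendor === 'zod';

const zodLike: StandardSchemaLike = { '~standard': { vendor: 'zod' } };
const valibotLike: StandardSchemaLike = { '~standard': { vendor: 'valibot' } };
```

A real implementation would run this check before attempting `extend`, and fall into the error/warning path for any non-Zod vendor.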
I'm in favour of throwing here; if we just log a warning then we run the risk of passing on invalid events.
Both throwing an error and continuing to process an invalid event would result in the item being marked as failed by the Batch Processor and sent back to the source.

Most likely we should also log a warning; otherwise, all operators would see is the item being reprocessed and eventually sent to a DLQ or lost.
Yeah, that's a good point, it will be much easier to diagnose if we emit a log statement.
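The warn-then-throw behaviour agreed on above could look roughly like this; the names are illustrative, not the actual implementation:

```typescript
// Sketch of warn-then-throw: the thrown error lets the Batch Processor mark
// the record as failed, while the warning gives operators a diagnosable log
// line. `UnsupportedSchemaError` and `parseOrFail` are hypothetical names.
class UnsupportedSchemaError extends Error {}

const parseOrFail = (vendor: string): void => {
  if (vendor !== 'zod') {
    console.warn(
      `Unsupported schema vendor "${vendor}"; only Zod schemas can be extended.`
    );
    throw new UnsupportedSchemaError(`Unsupported schema vendor: ${vendor}`);
  }
  // ...continue with extend + parse for Zod schemas
};
```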
In order to cast the schema to a Zod schema, do we need to dynamically import Zod as well?
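If Zod did need to be loaded lazily, it would follow the same dynamic-import pattern the snippet above already uses for the parser helpers. A runnable sketch, using `node:path` as a stand-in module so no extra dependency is required:

```typescript
// Lazy loading via dynamic import, the same pattern used for the parser
// helpers above; Zod could be pulled in the same way. node:path stands in
// for the optional dependency so this example runs on its own.
const lazyLoad = async <T>(specifier: string): Promise<T> =>
  (await import(specifier)) as T;

const joinWithLazyPath = async (): Promise<string> => {
  const path = await lazyLoad<typeof import('node:path')>('node:path');
  return path.posix.join('a', 'b');
};
```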
I'm testing the package on a function and it works, but I think we forgot to emit some kind of warning/log/error when the parsing fails.
Is the Python implementation doing anything in this regard?
I think this is the Python implementation for handling parsing failures. Should we do something similar?
I was trying to test it for Kinesis using https://github.com/dreamorosi/kinesis-batch and it was failing, trying to debug.
Message me, I got it working fine
> Should we do something similar?

Yes, but in order to do that we'll need to add a logger feature, which we currently don't have. Let's put a pin on this and I'll open a separate issue.

Can you also please address all the Sonar findings?
@dreamorosi I'm still a bit confused about extending the schema for Kinesis:

```ts
if (eventType === EventType.KinesisDataStreams) {
  const extendedSchemaParsing = parse(record, undefined, schema, true);
  if (extendedSchemaParsing.success)
    return extendedSchemaParsing.data as KinesisStreamRecord;
  if (schema['~standard'].vendor === SchemaType.Zod) {
    const { JSONStringified } = await import(
      '@aws-lambda-powertools/parser/helpers'
    );
    const { KinesisDataStreamRecord } = await import(
      '@aws-lambda-powertools/parser/schemas/kinesis'
    );
    const extendedSchema = KinesisDataStreamRecord.extend({
      // biome-ignore lint/suspicious/noExplicitAny: The vendor field in the schema is verified that the schema is a Zod schema
      data: // Decompress and decode the data to match against the schema
    });
    return parse(record, undefined, extendedSchema);
  }
  console.warn(
    'The schema provided is not supported. Only Zod schemas are supported for extension.'
  );
  throw new Error('Unsupported schema type');
}
```

To extend it, should I create another helper which does the decompression and decoding?
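The decoding half of such a helper can be sketched independently of the schema machinery: Kinesis delivers the record `data` field base64-encoded, so it must be decoded before the inner payload can be matched against a user-supplied schema. The helper name below is hypothetical:

```typescript
// Hypothetical helper along the lines asked about above. Decompression (e.g.
// gunzip for CloudWatch Logs subscription payloads) would slot in after the
// base64 decode, before handing the string to the schema.
const decodeKinesisData = (data: string): string =>
  Buffer.from(data, 'base64').toString('utf-8');

// Round-trip demo with a stand-in payload:
const encoded = Buffer.from('{"id":1}', 'utf-8').toString('base64');
const decoded = decodeKinesisData(encoded);
```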
I've been thinking about your question for a while and there's no straightforward way to do it with our current feature set. We can't use envelopes because of what you mentioned, and doing the parsing in two parts, while possible, is suboptimal for two reasons:

With this in mind, I think we'll have to extract the
Force-pushed from 31767a1 to f1e8f70.
@sdangol can you look at the SonarCloud issue?

@dreamorosi @svozza This is now ready to be reviewed.

The SonarCloud message still shows an issue.
```ts
  import('@aws-lambda-powertools/parser/schemas/sqs'),
]);
return SqsRecordSchema.extend({
  body: JSONStringified(schema as any),
```
@dreamorosi The body could also just be a plain string, right? If we try to extend using `JSONStringified`, it currently fails when we pass a string.
Should we just not use the `JSONStringified` helper here (or any other helpers for other events), and assume customers will apply the necessary helpers themselves if they need them?
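The "plain string or JSON-stringified payload" fallback being discussed can be sketched with plain functions, leaving Zod out so the example stands alone; the `parse` callback below stands in for a user-supplied schema's parse step:

```typescript
// Sketch of the fallback: try to JSON-parse and validate the body, and keep
// the raw string when that fails. Names and shape are illustrative only.
const parseBody = <T>(
  body: string,
  parse: (value: unknown) => T
): T | string => {
  try {
    return parse(JSON.parse(body));
  } catch {
    return body; // not valid JSON (or failed validation): keep the raw string
  }
};
```

In Zod terms this would correspond to a union of the plain string schema and the JSON-stringified object schema, so a string body no longer fails the extend path.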
Summary

This PR integrates the Parser functionality with the Batch Processor so that customers can parse and validate payloads before they're passed to the record handler. It supports both the extended schemas for the SQSRecord, KinesisRecord, and DynamoDBRecord as well as the inner payload schema.

Changes

- Updated the `BasePartialBatchProcessor` to set the schema property
- Added a `parseRecord` method to the `BatchProcessor` to do the parsing by dynamically importing the parse function and the appropriate schema

Issue number: closes #4394
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
Disclaimer: We value your time and bandwidth. As such, any pull requests created on non-triaged issues might not be successful.