feat(parser): integrate parser with Batch Processing #4408
Conversation
packages/batch/src/BatchProcessor.ts
```ts
  '@aws-lambda-powertools/parser/schemas/sqs'
);
const extendedSchema = SqsRecordSchema.extend({
  // biome-ignore lint/suspicious/noExplicitAny: at least for now, we need to broaden the type because the JSONStringified helper method is not typed with StandardSchemaV1 but with ZodSchema
```
In practice, I don't think we can support this use case at runtime anyway.
If someone passes us a complete Standard Schema from another library, we'll be able to use it as-is (the `try` block). For us to be able to extend the built-in `SqsRecordSchema`, however, we must receive a Zod schema, or both the `extend` and `JSONStringified` calls will fail at runtime - so the type error is correct here.
With this in mind, we might want to use the `vendor` key on the schema (see the spec) to check whether it's a Zod schema, and either 1/ throw an error or 2/ log a warning and skip parsing if the schema is from another library we can't extend.
If we instead find this conditional behavior confusing, we'll have to restrict the type of `schema` to Zod schemas only.
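For illustration, a minimal sketch of that vendor check (the `SchemaVendor` constant, function name, and error message are assumptions, not code from this PR):

```ts
import type { StandardSchemaV1 } from '@standard-schema/spec';

// Hypothetical constant; the spec defines `vendor` as the name of the
// library that produced the schema, e.g. 'zod'
const SchemaVendor = {
  Zod: 'zod',
} as const;

const assertZodSchema = (schema: StandardSchemaV1): void => {
  // Every Standard Schema exposes its origin library under `~standard.vendor`
  if (schema['~standard'].vendor !== SchemaVendor.Zod) {
    throw new Error(
      'Cannot extend the built-in record schema: only Zod schemas are supported'
    );
  }
};
```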
I'm in favour of throwing here; if we just log a warning then we run the risk of passing on invalid events.
Both throwing an error and continuing to process an invalid event would result in the item being marked as failed by the Batch Processor and sent back to the source.
Most likely we should also log a warning, otherwise all operators would see is the item being reprocessed and eventually sent to a DLQ or lost.
Yeah, that's a good point, it will be much easier to diagnose if we emit a log statement.
In order to cast the schema to a Zod schema, do we need to dynamically import Zod as well?
I'm testing the package on a function and it works, but I think we forgot to emit some kind of warning/log/error when the parsing fails.
Is the Python implementation doing anything in this regard?
I think this is the Python implementation for handling parsing failures. Should we do something similar?
I was trying to test it for Kinesis using https://github.com/dreamorosi/kinesis-batch and it was failing; I'm trying to debug it.
Message me, I got it working fine
> Should we do something similar?

Yes, but in order to be able to do that we'll need to add a logger feature, which we currently don't have.
Let's put a pin on this and I'll open a separate issue.

Can you also please address all the Sonar findings?
@dreamorosi I'm still a bit confused about extending the schema for Kinesis Data Streams:

```ts
if (eventType === EventType.KinesisDataStreams) {
  const extendedSchemaParsing = parse(record, undefined, schema, true);
  if (extendedSchemaParsing.success)
    return extendedSchemaParsing.data as KinesisStreamRecord;
  if (schema['~standard'].vendor === SchemaType.Zod) {
    const { JSONStringified } = await import(
      '@aws-lambda-powertools/parser/helpers'
    );
    const { KinesisDataStreamRecord } = await import(
      '@aws-lambda-powertools/parser/schemas/kinesis'
    );
    const extendedSchema = KinesisDataStreamRecord.extend({
      // biome-ignore lint/suspicious/noExplicitAny: The vendor field in the schema verifies that the schema is a Zod schema
      data: // Decompress and decode the data to match against the schema
    });
    return parse(record, undefined, extendedSchema);
  }
  console.warn(
    'The schema provided is not supported. Only Zod schemas are supported for extension.'
  );
  throw new Error('Unsupported schema type');
}
```

To extend it, should I create another helper which does the decompression and decoding?
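For reference, a hypothetical sketch of what such a helper could do - base64-decode the Kinesis payload and pipe it into the inner schema (`innerSchema` and the transform are assumptions, not code from this PR; gzip decompression via `node:zlib` could be added if the producer compresses records):

```ts
import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
import { z } from 'zod';

// Hypothetical inner payload schema provided by the customer
const innerSchema = z.object({ id: z.string() });

const dataSchema = z
  .string()
  // Kinesis delivers the record payload as a base64-encoded string
  .transform((value) => Buffer.from(value, 'base64').toString('utf8'))
  // then validate the decoded string as stringified JSON
  .pipe(JSONStringified(innerSchema));
```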
I've been thinking about your question for a while and there's no straightforward way to do it with our current feature set. We can't use envelopes because of what you mentioned, and doing the parsing in two parts, while possible, is suboptimal for two reasons:
With this in mind, I think we'll have to extract the …
The SonarCloud message still shows an issue.
```ts
/**
 * Enum of supported schema types for the utility
 */
const SchemaType = {
```
```diff
- const SchemaType = {
+ const SchemaVendor = {
```
For new methods, I'd prefer creating signatures with objects rather than positional args, since it makes it easier to read/understand at the call site.
For example, with the current signatures, calling `parseRecord` looks like `await this.parseRecord(record, this.eventType, this.schema);`, which is not too bad because the args being passed are named correctly.
Calling `createExtendedSchema` instead looks much more opaque: `await this.createExtendedSchema(eventType, schema, false)`, making it unclear what `false` stands for.
If the signature was:

```ts
private async createExtendedSchema(options: {
  eventType: keyof typeof EventType;
  schema: StandardSchemaV1;
  useTransformers: boolean;
})
```

calling it would look like `await this.createExtendedSchema({ eventType, schema, useTransformers: false })`, which is much clearer.
Also, unless the methods are `public` or `protected`, let's use `#methodName` instead of `private methodName` to make them truly private at runtime.
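A quick illustration of that runtime difference (class name and bodies are placeholders):

```ts
class Example {
  // TypeScript `private`: enforced only at compile time, erased in the output JS
  private compileTimeOnly(): void {}

  // ECMAScript private method: enforced at runtime as well
  #runtimePrivate(): void {}

  public demo(): void {
    this.compileTimeOnly();
    this.#runtimePrivate();
  }
}

const instance = new Example();
// biome-ignore lint/suspicious/noExplicitAny: demonstrating the difference
(instance as any).compileTimeOnly(); // still callable at runtime
// (instance as any).#runtimePrivate(); // SyntaxError outside the class body
```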
```ts
  schema,
  true
);
return parse(
```
Using `safeParse: true` in this call will make it easier to add debug logs later, since right now a failed parsing exits this scope.
Similarly, for the `parse` call above, we could achieve the same without generating a stack trace, which can be more performant.
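A rough sketch of the shape that suggestion implies (variable names and the error message are assumed; the comment marks where a future logger could hook in):

```ts
// Using safeParse on this call too keeps control in this scope,
// leaving room for a debug log before failing the record
const extendedParsing = parse(record, undefined, extendedSchema, true);
if (!extendedParsing.success) {
  // placeholder: emit a debug/warning log once a logger feature exists
  throw new Error('Failed to parse record against the extended schema');
}
return extendedParsing.data as EventSourceDataClassTypes;
```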
```ts
 *
 * Only Zod Schemas are supported for automatic schema extension
 *
 * @param record The record to be parsed
```
```diff
- * @param record The record to be parsed
+ * @param record - The record to be parsed
```
Same for the other `@param` tags added in this diff.
```ts
 * If the passed schema is already an extended schema,
 * it directly uses the schema to parse the record
 *
 * Only Zod Schemas are supported for automatic schema extension
```
Let's use punctuation correctly throughout the docstring.
```diff
@@ -89,6 +91,10 @@ type PartialItemFailures = { itemIdentifier: string };
  */
 type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] };
 
+type BasePartialBatchProcessorConfig = {
+  schema: StandardSchemaV1;
```
We need to document `schema` so that customers know what it does and all its nuances.
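For example, something along these lines (wording is only a suggestion):

```ts
type BasePartialBatchProcessorConfig = {
  /**
   * Standard Schema used to parse each record before it's passed to the record handler.
   *
   * When a Zod schema is provided, the processor extends the built-in schema for the
   * configured event source (SQS, Kinesis Data Streams, or DynamoDB Streams) with it;
   * schemas from other Standard Schema libraries are used as-is and must describe the
   * full record. Records that fail parsing are marked as failed.
   */
  schema: StandardSchemaV1;
};
```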
Only a couple of final styling and documentation details and then we're good to merge!
```ts
  return extendedSchemaParsing.data as EventSourceDataClassTypes;
}
// Only proceed with schema extension if it's a Zod schema
if (schema['~standard'].vendor !== SchemaType.Zod) {
```
Should we not do this check as the very first thing in this function? We're wasting work doing the parsing if all we're going to do is throw based on information we had before we began parsing.
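In sketch form, the reordering would look something like this (rearranging the snippet above; note it assumes non-Zod schemas should be rejected up front, even ones that could have parsed the full record as-is):

```ts
// Reject unsupported vendors before spending any time on parsing
if (schema['~standard'].vendor !== SchemaType.Zod) {
  console.warn(
    'The schema provided is not supported. Only Zod schemas are supported for extension.'
  );
  throw new Error('Unsupported schema type');
}
const extendedSchemaParsing = parse(record, undefined, schema, true);
if (extendedSchemaParsing.success) {
  return extendedSchemaParsing.data as EventSourceDataClassTypes;
}
```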
Summary
This PR integrates the Parser functionality with the Batch Processor so that customers can parse and validate payloads before they're passed to the record handler. It supports both the extended schemas for the SQSRecord, KinesisRecord, and DynamoDBRecord, and the inner payload schema.
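As a rough usage sketch based on this description (the `schema` option mirrors the config type added in this diff; the exact public API may differ):

```ts
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';
import { z } from 'zod';

// Inner payload schema; as a Zod schema it can be merged with the
// built-in SqsRecordSchema so the stringified body is parsed automatically
const orderSchema = z.object({ id: z.string(), quantity: z.number() });

const processor = new BatchProcessor(EventType.SQS, { schema: orderSchema });

const recordHandler = async (record: SQSRecord): Promise<void> => {
  // by this point the record has been parsed and validated
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context });
```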
Changes
- Updated `BasePartialBatchProcessor` to set the schema property
- Added a `parseRecord` method to the `BatchProcessor` to do the parsing by dynamically importing the parse function and the appropriate schema

Issue number: closes #4394
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
Disclaimer: We value your time and bandwidth. As such, any pull requests created on non-triaged issues might not be successful.