[ESM] Support Stream Poller batching #12437
Conversation
LocalStack Community integration with Pro: 2 files ±0, 2 suites ±0, 1h 30m 43s ⏱️ -22m 56s. Results for commit c9ae2be. ± Comparison against base commit c6096d8. This pull request removes 1162 tests.
♻️ This comment has been updated with latest results.
localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py
Adding support for MaximumBatchingWindowInSeconds is a great addition that can help reduce unnecessary Lambda invocations with small batch sizes. Nice work 👏👏. I like how clean it looks with the batcher abstraction. I still think we need to re-think polling around shard management to enable more frequent or adaptive get_records calls, but this PR is a good step in the right direction 📈.
The benchmark results are not entirely clear to me. What does improving over the baseline mean? Fewer Lambda invokes due to batching?
My main question is around frequent logging and shard batcher synchronization.
Can we trigger an ext run to ensure all Pipe tests are 🟢?
@@ -140,6 +155,9 @@ def poll_events(self):
    raise EmptyPollResultsException(service=self.event_source(), source_arn=self.source_arn)
else:
    LOG.debug("Event source %s has %d shards.", self.source_arn, len(self.shards))
    # Remove all shard batchers without corresponding shards
    for shard_id in self.shard_batcher.keys() - self.shards.keys():
Do we need to log and synchronize the shard batchers upon every poll?
Wouldn't it be sufficient to do that after shard (re-)initialization?
Not necessarily, but since the re-initialization method is abstract and defined in the child classes, I thought that this would be the cleanest way of syncing up the batcher.
We could add this after every self.initialize_shard() call as well? Else we could change initialize_shard from being abstract to always doing this re-sync (and log).
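A hedged sketch of that second option, i.e. making the base method concrete and delegating shard discovery to an abstract hook; the hook name and types below are illustrative, not the PR's actual API:

```python
from abc import ABC, abstractmethod


class StreamPollerBase(ABC):
    def __init__(self):
        self.shards: dict[str, str] = {}  # shard_id -> shard iterator
        self.shard_batcher: dict[str, object] = {}  # shard_id -> per-shard batcher

    def initialize_shards(self) -> None:
        # Concrete template method: discover shards, then re-sync the batchers
        # once per (re-)initialization instead of on every poll.
        self._discover_shards()
        for shard_id in self.shard_batcher.keys() - self.shards.keys():
            self.shard_batcher.pop(shard_id, None)

    @abstractmethod
    def _discover_shards(self) -> None:
        """Provider-specific shard discovery (e.g. Kinesis vs. DynamoDB Streams)."""
```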
# If there is overflow (i.e. 1k BatchSize and 1.2K returned in flush), further split up the batch.
for batch in batched(collected_records, self.stream_parameters.get("BatchSize")):
    # This could potentially lead to data loss if forward_events_to_target raises an exception after a flush
    # which would otherwise be solved with checkpointing.
Can we add a TODO with an actionable suggestion on how to fix this (i.e., implement checkpointing)?
💡 The KCL concepts page could give some hints/insights on how to design a robust stream poller.
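As a rough illustration of what such a TODO could point to, a checkpoint could record the last sequence number that was successfully forwarded per shard, so a failure between flush and forward can be retried from that point. Everything below is hypothetical and not the PR's API:

```python
# Hypothetical checkpointing sketch: commit a shard's checkpoint only after the
# batch has been forwarded successfully, so flushed records are not silently lost.
class InMemoryCheckpointStore:
    def __init__(self):
        self._checkpoints: dict[str, str] = {}  # shard_id -> last sequence number

    def get(self, shard_id: str) -> str | None:
        return self._checkpoints.get(shard_id)

    def commit(self, shard_id: str, sequence_number: str) -> None:
        self._checkpoints[shard_id] = sequence_number


def forward_with_checkpoint(poller, checkpoints: InMemoryCheckpointStore, shard_id, next_shard_iterator, batch):
    """Forward a batch, then checkpoint the last record's sequence number."""
    poller.forward_events_to_target(shard_id, next_shard_iterator, batch)
    checkpoints.commit(shard_id, batch[-1]["SequenceNumber"])
```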
    # which would otherwise be solved with checkpointing.
    self.forward_events_to_target(shard_id, next_shard_iterator, batch)

def forward_events_to_target(self, shard_id, next_shard_iterator, records):
Good refactoring 👍
@@ -165,18 +183,31 @@ def poll_events(self):
        pass

def poll_events_from_shard(self, shard_id: str, shard_iterator: str):
    abort_condition = None
    get_records_response = self.get_records(shard_iterator)
💡 Follow-up (out of scope): Moving forward, we might need to think about a way to call get_records multiple times and take the IteratorAge into consideration without waiting for the inter-poll interval.
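One possible shape for that, assuming the `MillisBehindLatest` field of the `GetRecords` response is used as a proxy for iterator age (a sketch with illustrative thresholds, not part of this PR):

```python
MAX_CALLS_PER_CYCLE = 5    # illustrative safety bound per poll cycle
LAG_THRESHOLD_MS = 1_000   # keep polling while the shard is more than ~1s behind


def drain_lagging_shard(poller, shard_iterator: str) -> tuple[list, str]:
    """Call get_records repeatedly within one poll cycle while the shard lags."""
    collected: list = []
    for _ in range(MAX_CALLS_PER_CYCLE):
        response = poller.get_records(shard_iterator)
        collected.extend(response["Records"])
        shard_iterator = response["NextShardIterator"]
        if response.get("MillisBehindLatest", 0) <= LAG_THRESHOLD_MS:
            break  # caught up enough; wait for the regular inter-poll interval
    return collected, shard_iterator
```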
Motivation
In order to reduce pressure on the LocalStack internal gateway, events collected by ESM/Pipes pollers should be properly collected and sent on to a target as a batch.
Changes
- A `Batcher` utility that allows us to collect and batch records based on an ESM's `MaximumBatchingWindowInSeconds` and `BatchSize` configuration (see the sketch after this list).
- A `shard_batcher` dictionary added to the `StreamPoller` that implements a batcher for each shard -- removing those batchers that do not correspond to shards each poll_event iteration.
Performance
We ran some performance tests comparing baseline LocalStack against versions with Batching and Parallelised invocations implemented.
- Batching: records are collected until a `BatchSize` is met or a `MaximumBatchingWindowInSeconds` is reached (an example configuration is sketched below)
- Parallelised: `ParallelizationFactor` allowed for batches to be split by partition key and sent on to target lambdas concurrently
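For reference, the parameters driving both behaviours are the standard ESM settings; a hypothetical setup against LocalStack could look like the following (the function and stream names are made up):

```python
import boto3

lambda_client = boto3.client("lambda", endpoint_url="http://localhost:4566")

# Hypothetical ESM configuration: flush up to 1000 records per invoke, wait at
# most 5 seconds to fill a batch, and process up to 4 batches per shard concurrently.
lambda_client.create_event_source_mapping(
    FunctionName="my-consumer-function",  # made-up function name
    EventSourceArn="arn:aws:kinesis:us-east-1:000000000000:stream/my-stream",  # made-up stream
    StartingPosition="LATEST",
    BatchSize=1000,
    MaximumBatchingWindowInSeconds=5,
    ParallelizationFactor=4,
)
```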
Experiments
Expand for graph of average latency per `Lambda::Invoke` and `Kinesis::GetRecords`
Results
Batching showed improved Lambda invocation performance against baseline LocalStack -- with the average latency of `Invoke` improving over the baseline in both: