
[ESM] Support Stream Poller batching #12437


Merged · 4 commits into master · Mar 26, 2025

Conversation

@gregfurman (Contributor) commented Mar 25, 2025

Motivation

To reduce pressure on the LocalStack internal gateway, events collected by ESM/Pipes pollers should be batched before being sent on to a target.

Changes

  • Uses the new Batcher utility, which allows us to collect and batch records based on an ESM's MaximumBatchingWindowInSeconds and BatchSize configuration.
  • Adds a shard_batcher dictionary to the StreamPoller that keeps a batcher per shard, removing batchers whose shards no longer exist on each poll_events iteration (see the sketch below).
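
For illustration, here is a minimal sketch of the per-shard batching idea. The `Batcher` class below is a simplified stand-in for the utility introduced in the base PR (its actual constructor and method names may differ), and the `collect` helper is hypothetical:

```python
import time
from collections import defaultdict


class Batcher:
    """Simplified stand-in for the Batcher utility from the base PR (API is assumed)."""

    def __init__(self, max_count: int, max_window_seconds: float):
        self.max_count = max_count
        self.max_window_seconds = max_window_seconds
        self._items: list = []
        self._window_start: float | None = None

    def add(self, item) -> None:
        # Start the batching window when the first record of a new batch arrives.
        if not self._items:
            self._window_start = time.monotonic()
        self._items.append(item)

    def is_triggered(self) -> bool:
        # Flush when either BatchSize is reached or the batching window has elapsed.
        if len(self._items) >= self.max_count:
            return True
        return bool(self._items) and (time.monotonic() - self._window_start) >= self.max_window_seconds

    def flush(self) -> list:
        items, self._items, self._window_start = self._items, [], None
        return items


# One batcher per shard, mirroring the shard_batcher dict described above
# (BatchSize=100, MaximumBatchingWindowInSeconds=20 as in the experiments below).
shard_batcher: dict[str, Batcher] = defaultdict(lambda: Batcher(max_count=100, max_window_seconds=20))


def collect(shard_id: str, records: list) -> list | None:
    """Buffer records for a shard; return a full batch only when a flush condition is met."""
    batcher = shard_batcher[shard_id]
    for record in records:
        batcher.add(record)
    return batcher.flush() if batcher.is_triggered() else None
```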

Performance

We ran performance tests comparing baseline LocalStack against two variants: Batching and Parallelised invocations.

Batching

  • Proper batching was implemented such that a function is only invoked when the BatchSize is met or the MaximumBatchingWindowInSeconds window elapses

Parallelised

  • Polling of each shard was done in parallel
  • ParallelizationFactor allowed batches to be split by partition key and sent to target Lambdas concurrently (see the sketch below)
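
As a rough sketch of the ParallelizationFactor behaviour (not the PR's code; `invoke_target` is a hypothetical stand-in for the poller's forwarding logic):

```python
# Group a batch by PartitionKey and invoke the target concurrently, keeping records
# with the same partition key together so their relative order is preserved.
from concurrent.futures import ThreadPoolExecutor
from itertools import groupby


def invoke_in_parallel(records: list[dict], invoke_target, parallelization_factor: int = 2):
    keyed = sorted(records, key=lambda r: r["PartitionKey"])
    groups = [list(group) for _, group in groupby(keyed, key=lambda r: r["PartitionKey"])]
    with ThreadPoolExecutor(max_workers=parallelization_factor) as executor:
        futures = [executor.submit(invoke_target, group) for group in groups]
        for future in futures:
            future.result()  # surface any invocation errors
```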

Experiments

  • Batch Size = 100
  • Batch Window = 20s
  • No. Streams/Functions/ESMs = 20/20/20
  • No. Partition Keys = 2
  • No. Shards per Stream = 2
(Figure: average latency per `Lambda::Invoke` and `Kinesis::GetRecords`; image omitted.)

Results

Batching improved Lambda invocation performance over baseline LocalStack, with the average latency of Invoke:

  • improving over both the baseline and the parallelised variant, and
  • not noticeably increasing as more Locust clients were added, indicating that batching helps LocalStack scale with multiple producer clients.

@gregfurman gregfurman added area: performance Make LocalStack go rocket-fast semver: minor Non-breaking changes which can be included in minor releases, but not in patch releases aws:lambda:event-source-mapping AWS Lambda Event Source Mapping (ESM) labels Mar 25, 2025
@gregfurman gregfurman added this to the 4.3 milestone Mar 25, 2025
@gregfurman gregfurman self-assigned this Mar 25, 2025
@gregfurman gregfurman marked this pull request as ready for review March 25, 2025 09:45
@gregfurman gregfurman requested a review from tiurin March 25, 2025 09:51

github-actions bot commented Mar 25, 2025

LocalStack Community integration with Pro

2 files ±0 · 2 suites ±0 · 1h 30m 43s ⏱️ (-22m 56s)
3 143 tests (-1 162): 2 924 ✅ (-1 060) · 219 💤 (-102) · 0 ❌ (±0)
3 145 runs (-1 162): 2 924 ✅ (-1 060) · 221 💤 (-102) · 0 ❌ (±0)

Results for commit c9ae2be. ± Comparison against base commit c6096d8.

This pull request removes 1162 tests.
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_lambda_dynamodb
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_opensearch_crud
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_search_books
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_setup
tests.aws.scenario.kinesis_firehose.test_kinesis_firehose.TestKinesisFirehoseScenario ‑ test_kinesis_firehose_s3
tests.aws.scenario.lambda_destination.test_lambda_destination_scenario.TestLambdaDestinationScenario ‑ test_destination_sns
tests.aws.scenario.lambda_destination.test_lambda_destination_scenario.TestLambdaDestinationScenario ‑ test_infra
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_prefill_dynamodb_table
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_stepfunctions_input_recipient_list[step_function_input0-SUCCEEDED]
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_stepfunctions_input_recipient_list[step_function_input1-SUCCEEDED]
…

♻️ This comment has been updated with latest results.

@joe4dev (Member) left a comment:


Adding support for MaximumBatchingWindowInSeconds is a great addition that can help to reduce unnecessary Lambda invocations with small batch sizes.

Nice work 👏👏 . I like how clean it looks with the batcher abstraction. I still think we need to re-think polling around shard management to enable more frequent or adaptive get_records calls. This PR is a good step in the right direction 📈.

The benchmark results are not very clear to me. What does improving over the baseline mean? fewer Lambda invokes due to batching?

My main question is around frequent logging and shard batch synchronization.

Can we trigger an ext run to ensure all Pipe tests are 🟢 ?

@@ -140,6 +155,9 @@ def poll_events(self):
raise EmptyPollResultsException(service=self.event_source(), source_arn=self.source_arn)
else:
LOG.debug("Event source %s has %d shards.", self.source_arn, len(self.shards))
# Remove all shard batchers without corresponding shards
for shard_id in self.shard_batcher.keys() - self.shards.keys():
Member


Do we need to log and synchronize the shard batchers upon every poll?
Wouldn't it be sufficient to do that after shard (re-)initialization?

Contributor Author


Not necessarily, but since the re-initialization method is abstract and defined in the child classes, I thought this would be the cleanest way of syncing up the batchers.

We could also add this after every self.initialize_shard() call. Alternatively, we could change initialize_shard from being abstract so that it always performs this re-sync (and logs), as sketched below.
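
For illustration, a sketch of that alternative using hypothetical method names on the base poller (not the actual StreamPoller API):

```python
from abc import ABC, abstractmethod


class StreamPollerBase(ABC):
    """Illustrative base poller; not the real StreamPoller class."""

    def __init__(self):
        self.shards: dict[str, str] = {}  # shard_id -> shard iterator
        self.shard_batcher: dict[str, object] = {}  # shard_id -> Batcher

    @abstractmethod
    def _initialize_shards(self) -> dict[str, str]:
        """Service-specific shard discovery (Kinesis / DynamoDB Streams)."""

    def initialize_shards(self) -> None:
        # Template method: child classes implement only _initialize_shards, while the
        # batcher re-sync (and any logging) happens once per (re-)initialization
        # instead of on every poll_events call.
        self.shards = self._initialize_shards()
        for stale_shard_id in self.shard_batcher.keys() - self.shards.keys():
            self.shard_batcher.pop(stale_shard_id, None)
```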

# If there is overflow (i.e 1k BatchSize and 1.2K returned in flush), further split up the batch.
for batch in batched(collected_records, self.stream_parameters.get("BatchSize")):
# This could potentially lead to data loss if forward_events_to_target raises an exception after a flush
# which would otherwise be solved with checkpointing.
Member


Can we add a TODO with an actionable suggestion on how to fix this (i.e., implement checkpointing)?

💡 The KCL concepts page could give some hints/insights on how to design a robust stream poller.
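
For reference, a small sketch of the overflow split being discussed, using itertools.batched (Python 3.12+) as a stand-in for the `batched` helper in the diff:

```python
from itertools import batched  # Python 3.12+; the PR may use an equivalent helper

collected_records = list(range(1200))  # stand-in for 1.2k flushed records
batch_size = 1000  # stand-in for self.stream_parameters.get("BatchSize")

for batch in batched(collected_records, batch_size):
    # TODO (per the review suggestion): checkpoint the last forwarded sequence number per
    # shard, so a failure while forwarding does not silently drop already-flushed records.
    print(f"forwarding batch of {len(batch)} records")  # stand-in for forward_events_to_target(...)
```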

# which would otherwise be solved with checkpointing.
self.forward_events_to_target(shard_id, next_shard_iterator, batch)

def forward_events_to_target(self, shard_id, next_shard_iterator, records):
Member


Good refactoring 👍

@@ -165,18 +183,31 @@ def poll_events(self):
pass

def poll_events_from_shard(self, shard_id: str, shard_iterator: str):
abort_condition = None
get_records_response = self.get_records(shard_iterator)
Member


💡 Follow-up (out of scope): Moving forward, we might need to think about a way to call get_records multiple times and take the IteratorAge into consideration without waiting for the inter-poll interval.
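
Purely as an illustration of that follow-up idea (hypothetical helper, not part of this PR), one could use MillisBehindLatest from the GetRecords response to decide whether to poll the shard again within the same cycle:

```python
def drain_shard(kinesis_client, shard_iterator: str, max_calls: int = 5, lag_threshold_ms: int = 1000):
    """Hypothetical: keep calling GetRecords while the shard is still lagging behind the tip."""
    records = []
    for _ in range(max_calls):
        response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=1000)
        records.extend(response["Records"])
        shard_iterator = response.get("NextShardIterator")
        if not shard_iterator:  # shard was closed
            break
        # MillisBehindLatest indicates how far this iterator is behind the tip of the shard.
        if response.get("MillisBehindLatest", 0) <= lag_threshold_ms:
            break
    return records, shard_iterator
```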

@gregfurman (Contributor Author) commented:

> The benchmark results are not very clear to me. What does improving over the baseline mean? fewer Lambda invokes due to batching?

If you look at the operation="Invoke" part of the figure, the processing time of each Lambda::Invoke request is consistently lower than on HEAD of master. There are both fewer invokes, and each invoke completes faster. Does that explain it a bit better?

Base automatically changed from add/util/batch-policy to master March 25, 2025 18:43
@gregfurman gregfurman force-pushed the add/esm/batch-stream-poller branch from b9efaef to 86a7905 Compare March 25, 2025 18:49
@gregfurman gregfurman merged commit e19a602 into master Mar 26, 2025
31 checks passed
@gregfurman gregfurman deleted the add/esm/batch-stream-poller branch March 26, 2025 08:33