Skip to content

[ESM] Add configurable poll frequency and log shard info #12415

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 20, 2025

Conversation

gregfurman
Copy link
Contributor

Motivation

In order to address performance shortcomings, we should allow for a user to configure their poller frequency as well as their maximum backoff on error/empty polls.

Changes

  • StreamPollers no longer back-off when a GetRecords call is empty
  • StreamPollers back-off when no shards have been initialised.
  • More debug logs have been added to show shard count.
  • Adds the following config varibales:
    • LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC -> set all poller's per-second frequency.
    • LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC -> set maximum backoff on poller exception.
    • LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC -> set maximum backoff on empty polls.

@gregfurman gregfurman added area: performance Make LocalStack go rocket-fast aws:lambda:event-source-mapping AWS Lambda Event Source Mapping (ESM) labels Mar 19, 2025
@gregfurman gregfurman added this to the 4.3 milestone Mar 19, 2025
@gregfurman gregfurman self-assigned this Mar 19, 2025
Copy link

github-actions bot commented Mar 19, 2025

S3 Image Test Results (AMD64 / ARM64)

  2 files  ±0    2 suites  ±0   9m 4s ⏱️ +12s
486 tests ±0  436 ✅ ±0   50 💤 ±0  0 ❌ ±0 
972 runs  ±0  872 ✅ ±0  100 💤 ±0  0 ❌ ±0 

Results for commit c964218. ± Comparison against base commit 00e64fd.

♻️ This comment has been updated with latest results.

Copy link

github-actions bot commented Mar 20, 2025

LocalStack Community integration with Pro

    2 files  ±0      2 suites  ±0   1h 51m 15s ⏱️ -29s
4 304 tests +2  3 983 ✅ ±0  321 💤 +2  0 ❌ ±0 
4 306 runs  +2  3 983 ✅ ±0  323 💤 +2  0 ❌ ±0 

Results for commit c964218. ± Comparison against base commit 00e64fd.

♻️ This comment has been updated with latest results.

@gregfurman gregfurman added semver: patch Non-breaking changes which can be included in patch releases semver: minor Non-breaking changes which can be included in minor releases, but not in patch releases and removed semver: patch Non-breaking changes which can be included in patch releases labels Mar 20, 2025
@gregfurman gregfurman marked this pull request as ready for review March 20, 2025 09:07
@@ -164,7 +170,7 @@ def poll_events_from_shard(self, shard_id: str, shard_iterator: str):
records = get_records_response.get("Records", [])
if not records:
self.shards[shard_id] = get_records_response["NextShardIterator"]
raise EmptyPollResultsException(service=self.event_source(), source_arn=self.source_arn)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that gonna help a lot with responsiveness; good we caught this accidential delay introducer 😬

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will effectively prevent backoff when querying empty streams, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah unfortunately. A GetRecords may return no records while iterating across a shard.

We should re-think how to best approach this. Like with Kinesis we have that MillisBehindLatest that is 0 when iterator is caught up. So a 0 value and no new records returned could warrant some backoff.


# INTERNAL: 60 (default)
# Maximum duration (in seconds) to wait between retries when an event source poll fails.
LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC = float(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for marking this as internal 👍

I understand that it can be helpful for quick experimentation with some customers, but we should be generally cautious making everything configurable, which introduces extra complexity. These are candidates to remove if there is no strong need in favor of choosing sensible defaults.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah agree. Since these are not internal AWS behaviour though, think we should make these configurable.

Copy link
Member

@dfangl dfangl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, just some minor comments

MAX_BACKOFF_POLL_ERROR_SEC: float = 60
POLL_INTERVAL_SEC: float = LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC
MAX_BACKOFF_POLL_EMPTY_SEC: float = LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC
MAX_BACKOFF_POLL_ERROR_SEC: float = LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC
Copy link
Member

@dfangl dfangl Mar 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Importing them like this, and then assigning it to a static variable, prevents us from monkeypatching easily. We might want to do that for tests, would it be a lot of work to directly access the config from where it is used?

@@ -164,7 +170,7 @@ def poll_events_from_shard(self, shard_id: str, shard_iterator: str):
records = get_records_response.get("Records", [])
if not records:
self.shards[shard_id] = get_records_response["NextShardIterator"]
raise EmptyPollResultsException(service=self.event_source(), source_arn=self.source_arn)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will effectively prevent backoff when querying empty streams, right?

@gregfurman
Copy link
Contributor Author

gregfurman commented Mar 20, 2025

@dfangl Unfortunately we can't reliably backoff when a GetRecords call is empty since this could be expected behabiour. I'll make a note of this as a comment but here's the relevant snippet from GetRecords returns an empty records array even when there is data in the stream:

Consuming, or getting records is a pull model. Developers are expected to call GetRecords in a continuous loop with no back-offs. Every call to GetRecords also returns a ShardIterator value, which must be used in the next iteration of the loop.

Which goes on to say:

[...] An empty Records element is returned under two conditions:

  1. There is no more data currently in the shard.
  2. There is no data near the part of the shard pointed to by the ShardIterator

And we can't reliably tell whether this is (1) or (2) which makes this back-off incorrect if we're doing pull-based retrieval.

However.... we could actually use the MillisBehindLatest 🤔 since:

The number of milliseconds the GetRecords response is from the tip of the stream, indicating how far behind current time the consumer is. A value of zero indicates that record processing is caught up, and there are no new records to process at this moment.

So a zero value of MillisBehindLatest with no new records should indicate we can backoff 💡

@dfangl
Copy link
Member

dfangl commented Mar 20, 2025

@gregfurman Thank you for linking the docs! Yes - the empty get records call is something we also discussed yesterday. I would like to bring up two points for this one:

  1. Against AWS this perfectly makes sense - however, did we check if kinesis-mock also has this behavior? Are we fixing a potential problem in theory, or did we actually experience the issue? To be clear, I am fine with removing this for now, and the ESM should really work against AWS as well, just asking out of interest.
  2. This will again increase the number of requests against the gateway. It is fine, since it is not higher than a couple of weeks ago, but do we have data about what improvements we actually lose this way?

Just using this PR as discussion point for this, please do not let it hold back the merge :)

@gregfurman gregfurman merged commit e70207f into master Mar 20, 2025
38 checks passed
@gregfurman gregfurman deleted the add/esm/configurable-polling branch March 20, 2025 13:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area: performance Make LocalStack go rocket-fast aws:lambda:event-source-mapping AWS Lambda Event Source Mapping (ESM) semver: minor Non-breaking changes which can be included in minor releases, but not in patch releases
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants