-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
[ESM] Add configurable poll frequency and log shard info #12415
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -164,7 +170,7 @@ def poll_events_from_shard(self, shard_id: str, shard_iterator: str): | |||
records = get_records_response.get("Records", []) | |||
if not records: | |||
self.shards[shard_id] = get_records_response["NextShardIterator"] | |||
raise EmptyPollResultsException(service=self.event_source(), source_arn=self.source_arn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that gonna help a lot with responsiveness; good we caught this accidential delay introducer 😬
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will effectively prevent backoff when querying empty streams, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah unfortunately. A GetRecords may return no records while iterating across a shard.
We should re-think how to best approach this. Like with Kinesis we have that MillisBehindLatest that is 0 when iterator is caught up. So a 0
value and no new records returned could warrant some backoff.
localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py
Show resolved
Hide resolved
|
||
# INTERNAL: 60 (default) | ||
# Maximum duration (in seconds) to wait between retries when an event source poll fails. | ||
LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC = float( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for marking this as internal 👍
I understand that it can be helpful for quick experimentation with some customers, but we should be generally cautious making everything configurable, which introduces extra complexity. These are candidates to remove if there is no strong need in favor of choosing sensible defaults.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah agree. Since these are not internal AWS behaviour though, think we should make these configurable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, just some minor comments
MAX_BACKOFF_POLL_ERROR_SEC: float = 60 | ||
POLL_INTERVAL_SEC: float = LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC | ||
MAX_BACKOFF_POLL_EMPTY_SEC: float = LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC | ||
MAX_BACKOFF_POLL_ERROR_SEC: float = LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Importing them like this, and then assigning it to a static variable, prevents us from monkeypatching easily. We might want to do that for tests, would it be a lot of work to directly access the config from where it is used?
@@ -164,7 +170,7 @@ def poll_events_from_shard(self, shard_id: str, shard_iterator: str): | |||
records = get_records_response.get("Records", []) | |||
if not records: | |||
self.shards[shard_id] = get_records_response["NextShardIterator"] | |||
raise EmptyPollResultsException(service=self.event_source(), source_arn=self.source_arn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will effectively prevent backoff when querying empty streams, right?
@dfangl Unfortunately we can't reliably backoff when a
Which goes on to say:
And we can't reliably tell whether this is (1) or (2) which makes this back-off incorrect if we're doing pull-based retrieval. However.... we could actually use the MillisBehindLatest 🤔 since:
So a zero value of |
@gregfurman Thank you for linking the docs! Yes - the empty get records call is something we also discussed yesterday. I would like to bring up two points for this one:
Just using this PR as discussion point for this, please do not let it hold back the merge :) |
Motivation
In order to address performance shortcomings, we should allow for a user to configure their poller frequency as well as their maximum backoff on error/empty polls.
Changes
StreamPollers
no longer back-off when aGetRecords
call is emptyStreamPollers
back-off when no shards have been initialised.LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC
-> set all poller's per-second frequency.LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC
-> set maximum backoff on poller exception.LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC
-> set maximum backoff on empty polls.