-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
[ESM] Add backoff between poll events calls #12304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,12 +5,19 @@ | |
from localstack.aws.api.lambda_ import ( | ||
EventSourceMappingConfiguration, | ||
) | ||
from localstack.services.lambda_.event_source_mapping.pollers.poller import Poller | ||
from localstack.services.lambda_.event_source_mapping.pollers.poller import ( | ||
EmptyPollResultsException, | ||
Poller, | ||
) | ||
from localstack.services.lambda_.invocation.models import LambdaStore, lambda_stores | ||
from localstack.services.lambda_.provider_utils import get_function_version_from_arn | ||
from localstack.utils.backoff import ExponentialBackoff | ||
from localstack.utils.threads import FuncThread | ||
|
||
POLL_INTERVAL_SEC: float = 1 | ||
MAX_BACKOFF_POLL_EMPTY_SEC: float = 10 | ||
MAX_BACKOFF_POLL_ERROR_SEC: float = 60 | ||
|
||
|
||
LOG = logging.getLogger(__name__) | ||
|
||
|
@@ -133,13 +140,30 @@ def poller_loop(self, *args, **kwargs): | |
self.update_esm_state_in_store(EsmState.ENABLED) | ||
self.state_transition_reason = self.user_state_reason | ||
|
||
error_boff = ExponentialBackoff(initial_interval=2, max_interval=MAX_BACKOFF_POLL_ERROR_SEC) | ||
empty_boff = ExponentialBackoff(initial_interval=1, max_interval=MAX_BACKOFF_POLL_EMPTY_SEC) | ||
|
||
poll_interval_duration = POLL_INTERVAL_SEC | ||
|
||
while not self._shutdown_event.is_set(): | ||
try: | ||
self.poller.poll_events() | ||
# TODO: update state transition reason? | ||
# Wait for next short-polling interval | ||
# MAYBE: read the poller interval from self.poller if we need the flexibility | ||
self._shutdown_event.wait(POLL_INTERVAL_SEC) | ||
self.poller.poll_events() | ||
|
||
# If no exception encountered, reset the backoff | ||
error_boff.reset() | ||
empty_boff.reset() | ||
|
||
# Set the poll frequency back to the default | ||
poll_interval_duration = POLL_INTERVAL_SEC | ||
except EmptyPollResultsException as miss_ex: | ||
# If the event source is empty, backoff | ||
poll_interval_duration = empty_boff.next_backoff() | ||
LOG.debug( | ||
"The event source %s is empty. Backing off for %s seconds until next request.", | ||
miss_ex.source_arn, | ||
poll_interval_duration, | ||
) | ||
except Exception as e: | ||
LOG.error( | ||
"Error while polling messages for event source %s: %s", | ||
|
@@ -148,9 +172,10 @@ def poller_loop(self, *args, **kwargs): | |
e, | ||
exc_info=LOG.isEnabledFor(logging.DEBUG), | ||
) | ||
# TODO: implement some backoff here and stop poller upon continuous errors | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. strictly speaking, we could parity test when pollers are shut down at AWS after continous errors. However, I think the current solution with the up to 60s max error backoff is good enough (or maybe even preferrable in a dev environment to make the error visible). |
||
# Wait some time between retries to avoid running into the problem right again | ||
self._shutdown_event.wait(2) | ||
poll_interval_duration = error_boff.next_backoff() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Much nicer than static waits 👏👏👏 |
||
finally: | ||
self._shutdown_event.wait(poll_interval_duration) | ||
|
||
# Optionally closes internal components of Poller. This is a no-op for unimplemented pollers. | ||
self.poller.close() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit: any particular reason why one of the two parameters is a constant and the other not :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually wanted to make these values configurable via environment variables (see this commit 451ce15)
Didn't think to make the initial interval configurable though 🫠