Skip to content

[ESM] Add backoff between poll events calls #12304

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@
from localstack.aws.api.lambda_ import (
EventSourceMappingConfiguration,
)
from localstack.services.lambda_.event_source_mapping.pollers.poller import Poller
from localstack.services.lambda_.event_source_mapping.pollers.poller import (
EmptyPollResultsException,
Poller,
)
from localstack.services.lambda_.invocation.models import LambdaStore, lambda_stores
from localstack.services.lambda_.provider_utils import get_function_version_from_arn
from localstack.utils.backoff import ExponentialBackoff
from localstack.utils.threads import FuncThread

POLL_INTERVAL_SEC: float = 1
MAX_BACKOFF_POLL_EMPTY_SEC: float = 10
MAX_BACKOFF_POLL_ERROR_SEC: float = 60


LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -133,13 +140,30 @@ def poller_loop(self, *args, **kwargs):
self.update_esm_state_in_store(EsmState.ENABLED)
self.state_transition_reason = self.user_state_reason

error_boff = ExponentialBackoff(initial_interval=2, max_interval=MAX_BACKOFF_POLL_ERROR_SEC)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: any particular reason why one of the two parameters is a constant and the other not :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually wanted to make these values configurable via environment variables (see this commit 451ce15)

Didn't think to make the initial interval configurable though 🫠

empty_boff = ExponentialBackoff(initial_interval=1, max_interval=MAX_BACKOFF_POLL_EMPTY_SEC)

poll_interval_duration = POLL_INTERVAL_SEC

while not self._shutdown_event.is_set():
try:
self.poller.poll_events()
# TODO: update state transition reason?
# Wait for next short-polling interval
# MAYBE: read the poller interval from self.poller if we need the flexibility
self._shutdown_event.wait(POLL_INTERVAL_SEC)
self.poller.poll_events()

# If no exception encountered, reset the backoff
error_boff.reset()
empty_boff.reset()

# Set the poll frequency back to the default
poll_interval_duration = POLL_INTERVAL_SEC
except EmptyPollResultsException as miss_ex:
# If the event source is empty, backoff
poll_interval_duration = empty_boff.next_backoff()
LOG.debug(
"The event source %s is empty. Backing off for %s seconds until next request.",
miss_ex.source_arn,
poll_interval_duration,
)
except Exception as e:
LOG.error(
"Error while polling messages for event source %s: %s",
Expand All @@ -148,9 +172,10 @@ def poller_loop(self, *args, **kwargs):
e,
exc_info=LOG.isEnabledFor(logging.DEBUG),
)
# TODO: implement some backoff here and stop poller upon continuous errors
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

strictly speaking, we could parity test when pollers are shut down at AWS after continous errors. However, I think the current solution with the up to 60s max error backoff is good enough (or maybe even preferrable in a dev environment to make the error visible).

# Wait some time between retries to avoid running into the problem right again
self._shutdown_event.wait(2)
poll_interval_duration = error_boff.next_backoff()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much nicer than static waits 👏👏👏

finally:
self._shutdown_event.wait(poll_interval_duration)

# Optionally closes internal components of Poller. This is a no-op for unimplemented pollers.
self.poller.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@
from localstack.utils.event_matcher import matches_event


class EmptyPollResultsException(Exception):
service: str
source_arn: str

def __init__(self, service: str = "", source_arn: str = ""):
self.service = service
self.source_arn = source_arn


class PipeStateReasonValues(PipeStateReason):
USER_INITIATED = "USER_INITIATED"
NO_RECORDS_PROCESSED = "No records processed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
PartialBatchFailureError,
)
from localstack.services.lambda_.event_source_mapping.pollers.poller import (
EmptyPollResultsException,
Poller,
parse_batch_item_failures,
)
Expand Down Expand Up @@ -131,6 +132,8 @@ def poll_events(self) -> None:
e,
exc_info=LOG.isEnabledFor(logging.DEBUG),
)
else:
raise EmptyPollResultsException(service="sqs", source_arn=self.source_arn)

def handle_messages(self, messages):
polled_events = transform_into_events(messages)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
get_internal_client,
)
from localstack.services.lambda_.event_source_mapping.pollers.poller import (
EmptyPollResultsException,
Poller,
get_batch_item_failures,
)
Expand Down Expand Up @@ -157,6 +158,9 @@ def poll_events_from_shard(self, shard_id: str, shard_iterator: str):
get_records_response = self.get_records(shard_iterator)
records = get_records_response["Records"]
polled_events = self.transform_into_events(records, shard_id)
if not polled_events:
raise EmptyPollResultsException(service=self.event_source, source_arn=self.source_arn)

# Check MaximumRecordAgeInSeconds
if maximum_record_age_in_seconds := self.stream_parameters.get("MaximumRecordAgeInSeconds"):
arrival_timestamp_of_last_event = polled_events[-1]["approximateArrivalTimestamp"]
Expand Down
Loading