diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py b/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py index 4287656b20581..abaf4dbbdbb8c 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py @@ -5,12 +5,19 @@ from localstack.aws.api.lambda_ import ( EventSourceMappingConfiguration, ) -from localstack.services.lambda_.event_source_mapping.pollers.poller import Poller +from localstack.services.lambda_.event_source_mapping.pollers.poller import ( + EmptyPollResultsException, + Poller, +) from localstack.services.lambda_.invocation.models import LambdaStore, lambda_stores from localstack.services.lambda_.provider_utils import get_function_version_from_arn +from localstack.utils.backoff import ExponentialBackoff from localstack.utils.threads import FuncThread POLL_INTERVAL_SEC: float = 1 +MAX_BACKOFF_POLL_EMPTY_SEC: float = 10 +MAX_BACKOFF_POLL_ERROR_SEC: float = 60 + LOG = logging.getLogger(__name__) @@ -133,13 +140,30 @@ def poller_loop(self, *args, **kwargs): self.update_esm_state_in_store(EsmState.ENABLED) self.state_transition_reason = self.user_state_reason + error_boff = ExponentialBackoff(initial_interval=2, max_interval=MAX_BACKOFF_POLL_ERROR_SEC) + empty_boff = ExponentialBackoff(initial_interval=1, max_interval=MAX_BACKOFF_POLL_EMPTY_SEC) + + poll_interval_duration = POLL_INTERVAL_SEC + while not self._shutdown_event.is_set(): try: - self.poller.poll_events() # TODO: update state transition reason? - # Wait for next short-polling interval - # MAYBE: read the poller interval from self.poller if we need the flexibility - self._shutdown_event.wait(POLL_INTERVAL_SEC) + self.poller.poll_events() + + # If no exception encountered, reset the backoff + error_boff.reset() + empty_boff.reset() + + # Set the poll frequency back to the default + poll_interval_duration = POLL_INTERVAL_SEC + except EmptyPollResultsException as miss_ex: + # If the event source is empty, backoff + poll_interval_duration = empty_boff.next_backoff() + LOG.debug( + "The event source %s is empty. Backing off for %s seconds until next request.", + miss_ex.source_arn, + poll_interval_duration, + ) except Exception as e: LOG.error( "Error while polling messages for event source %s: %s", @@ -148,9 +172,10 @@ def poller_loop(self, *args, **kwargs): e, exc_info=LOG.isEnabledFor(logging.DEBUG), ) - # TODO: implement some backoff here and stop poller upon continuous errors # Wait some time between retries to avoid running into the problem right again - self._shutdown_event.wait(2) + poll_interval_duration = error_boff.next_backoff() + finally: + self._shutdown_event.wait(poll_interval_duration) # Optionally closes internal components of Poller. This is a no-op for unimplemented pollers. self.poller.close() diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py index 272804f6f5a3d..d968d138eb9b7 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py @@ -15,6 +15,15 @@ from localstack.utils.event_matcher import matches_event +class EmptyPollResultsException(Exception): + service: str + source_arn: str + + def __init__(self, service: str = "", source_arn: str = ""): + self.service = service + self.source_arn = source_arn + + class PipeStateReasonValues(PipeStateReason): USER_INITIATED = "USER_INITIATED" NO_RECORDS_PROCESSED = "No records processed" diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py index be11ddc05c624..3799fb3bc612e 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py @@ -13,6 +13,7 @@ PartialBatchFailureError, ) from localstack.services.lambda_.event_source_mapping.pollers.poller import ( + EmptyPollResultsException, Poller, parse_batch_item_failures, ) @@ -131,6 +132,8 @@ def poll_events(self) -> None: e, exc_info=LOG.isEnabledFor(logging.DEBUG), ) + else: + raise EmptyPollResultsException(service="sqs", source_arn=self.source_arn) def handle_messages(self, messages): polled_events = transform_into_events(messages) diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py index b95d699ff6a05..7c40f563ef2f1 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py @@ -24,6 +24,7 @@ get_internal_client, ) from localstack.services.lambda_.event_source_mapping.pollers.poller import ( + EmptyPollResultsException, Poller, get_batch_item_failures, ) @@ -157,6 +158,9 @@ def poll_events_from_shard(self, shard_id: str, shard_iterator: str): get_records_response = self.get_records(shard_iterator) records = get_records_response["Records"] polled_events = self.transform_into_events(records, shard_id) + if not polled_events: + raise EmptyPollResultsException(service=self.event_source, source_arn=self.source_arn) + # Check MaximumRecordAgeInSeconds if maximum_record_age_in_seconds := self.stream_parameters.get("MaximumRecordAgeInSeconds"): arrival_timestamp_of_last_event = polled_events[-1]["approximateArrivalTimestamp"]