From d3e2c0e26c7627002cd65f0b0e9f0a6efe83d4dc Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Wed, 19 Feb 2025 17:55:44 +0200 Subject: [PATCH 1/3] [ESM] Add backoff between ESM worker poll_events calls --- localstack-core/localstack/config.py | 18 +++++++ .../event_source_mapping/esm_worker.py | 47 +++++++++++++++---- .../event_source_mapping/pollers/poller.py | 8 ++++ .../pollers/sqs_poller.py | 7 +++ .../pollers/stream_poller.py | 4 ++ 5 files changed, 76 insertions(+), 8 deletions(-) diff --git a/localstack-core/localstack/config.py b/localstack-core/localstack/config.py index a063abb1213c9..9327053274a18 100644 --- a/localstack-core/localstack/config.py +++ b/localstack-core/localstack/config.py @@ -1071,6 +1071,24 @@ def populate_edge_configuration( # DEV: sbx_user1051 (default when not provided) Alternative system user or empty string to skip dropping privileges. LAMBDA_INIT_USER = os.environ.get("LAMBDA_INIT_USER") +# INTERNAL: 1 (default) +# The duration (in seconds) to wait between each poll call to an event source. +LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC = float( + os.environ.get("LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC") or 1 +) + +# INTERNAL: 60 (default) +# Maximum duration (in seconds) to wait between retries when an event source poll fails. +LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC = float( + os.environ.get("LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC") or 60 +) + +# INTERNAL: 10 (default) +# Maximum duration (in seconds) to wait between polls when an event source returns empty results. +LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC = float( + os.environ.get("LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC") or 10 +) + # Adding Stepfunctions default port LOCAL_PORT_STEPFUNCTIONS = int(os.environ.get("LOCAL_PORT_STEPFUNCTIONS") or 8083) # Stepfunctions lambda endpoint override diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py b/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py index 4287656b20581..52cb89764bcac 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py @@ -5,12 +5,24 @@ from localstack.aws.api.lambda_ import ( EventSourceMappingConfiguration, ) -from localstack.services.lambda_.event_source_mapping.pollers.poller import Poller +from localstack.config import ( + LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC, + LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC, + LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC, +) +from localstack.services.lambda_.event_source_mapping.pollers.poller import ( + EmptyPollResultsException, + Poller, +) from localstack.services.lambda_.invocation.models import LambdaStore, lambda_stores from localstack.services.lambda_.provider_utils import get_function_version_from_arn +from localstack.utils.backoff import ExponentialBackoff from localstack.utils.threads import FuncThread -POLL_INTERVAL_SEC: float = 1 +POLL_INTERVAL_SEC: float = LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC +MAX_BACKOFF_POLL_EMPTY_SEC: float = LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC +MAX_BACKOFF_POLL_ERROR_SEC: float = LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC + LOG = logging.getLogger(__name__) @@ -133,13 +145,31 @@ def poller_loop(self, *args, **kwargs): self.update_esm_state_in_store(EsmState.ENABLED) self.state_transition_reason = self.user_state_reason + error_boff = ExponentialBackoff(initial_interval=2, max_interval=MAX_BACKOFF_POLL_ERROR_SEC) + empty_boff = ExponentialBackoff(initial_interval=1, max_interval=MAX_BACKOFF_POLL_EMPTY_SEC) + + poll_interval_duration = POLL_INTERVAL_SEC + while not self._shutdown_event.is_set(): try: - self.poller.poll_events() # TODO: update state transition reason? - # Wait for next short-polling interval - # MAYBE: read the poller interval from self.poller if we need the flexibility - self._shutdown_event.wait(POLL_INTERVAL_SEC) + self.poller.poll_events() + + # If no exception encountered, reset the backoff + error_boff.reset() + empty_boff.reset() + + # Set the poll frequency back to the default + poll_interval_duration = POLL_INTERVAL_SEC + except EmptyPollResultsException as miss_ex: + # If the event source is empty, backoff + poll_miss_boff_duration = empty_boff.next_backoff() + LOG.debug( + "The event source %s is empty. Backing off for for %s seconds until next request.", + miss_ex.source_arn, + poll_miss_boff_duration, + ) + poll_interval_duration = empty_boff.next_backoff() except Exception as e: LOG.error( "Error while polling messages for event source %s: %s", @@ -148,9 +178,10 @@ def poller_loop(self, *args, **kwargs): e, exc_info=LOG.isEnabledFor(logging.DEBUG), ) - # TODO: implement some backoff here and stop poller upon continuous errors # Wait some time between retries to avoid running into the problem right again - self._shutdown_event.wait(2) + poll_interval_duration = error_boff.next_backoff() + finally: + self._shutdown_event.wait(poll_interval_duration) # Optionally closes internal components of Poller. This is a no-op for unimplemented pollers. self.poller.close() diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py index 272804f6f5a3d..6ed781716b162 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py @@ -15,6 +15,14 @@ from localstack.utils.event_matcher import matches_event +class EmptyPollResultsException(Exception): + service: str + source_arn: str + + def __init__(self, *args, service: str = "", source_arn: str = ""): + super(EmptyPollResultsException, self).__init__(*args) + + class PipeStateReasonValues(PipeStateReason): USER_INITIATED = "USER_INITIATED" NO_RECORDS_PROCESSED = "No records processed" diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py index be11ddc05c624..0d2339136f830 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py @@ -13,6 +13,7 @@ PartialBatchFailureError, ) from localstack.services.lambda_.event_source_mapping.pollers.poller import ( + EmptyPollResultsException, Poller, parse_batch_item_failures, ) @@ -131,6 +132,12 @@ def poll_events(self) -> None: e, exc_info=LOG.isEnabledFor(logging.DEBUG), ) + else: + raise EmptyPollResultsException( + "No results found in poll call to SQS", + service="sqs", + source_arn=self.source_arn, + ) def handle_messages(self, messages): polled_events = transform_into_events(messages) diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py index b95d699ff6a05..7c40f563ef2f1 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py @@ -24,6 +24,7 @@ get_internal_client, ) from localstack.services.lambda_.event_source_mapping.pollers.poller import ( + EmptyPollResultsException, Poller, get_batch_item_failures, ) @@ -157,6 +158,9 @@ def poll_events_from_shard(self, shard_id: str, shard_iterator: str): get_records_response = self.get_records(shard_iterator) records = get_records_response["Records"] polled_events = self.transform_into_events(records, shard_id) + if not polled_events: + raise EmptyPollResultsException(service=self.event_source, source_arn=self.source_arn) + # Check MaximumRecordAgeInSeconds if maximum_record_age_in_seconds := self.stream_parameters.get("MaximumRecordAgeInSeconds"): arrival_timestamp_of_last_event = polled_events[-1]["approximateArrivalTimestamp"] From 451ce151b33c1ee678121498bb377eecb6b67c49 Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Tue, 25 Feb 2025 14:57:42 +0200 Subject: [PATCH 2/3] [ESM] Remove configurable polling frequency --- localstack-core/localstack/config.py | 18 ------------------ .../lambda_/event_source_mapping/esm_worker.py | 11 +++-------- 2 files changed, 3 insertions(+), 26 deletions(-) diff --git a/localstack-core/localstack/config.py b/localstack-core/localstack/config.py index 9327053274a18..a063abb1213c9 100644 --- a/localstack-core/localstack/config.py +++ b/localstack-core/localstack/config.py @@ -1071,24 +1071,6 @@ def populate_edge_configuration( # DEV: sbx_user1051 (default when not provided) Alternative system user or empty string to skip dropping privileges. LAMBDA_INIT_USER = os.environ.get("LAMBDA_INIT_USER") -# INTERNAL: 1 (default) -# The duration (in seconds) to wait between each poll call to an event source. -LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC = float( - os.environ.get("LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC") or 1 -) - -# INTERNAL: 60 (default) -# Maximum duration (in seconds) to wait between retries when an event source poll fails. -LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC = float( - os.environ.get("LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC") or 60 -) - -# INTERNAL: 10 (default) -# Maximum duration (in seconds) to wait between polls when an event source returns empty results. -LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC = float( - os.environ.get("LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC") or 10 -) - # Adding Stepfunctions default port LOCAL_PORT_STEPFUNCTIONS = int(os.environ.get("LOCAL_PORT_STEPFUNCTIONS") or 8083) # Stepfunctions lambda endpoint override diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py b/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py index 52cb89764bcac..680774738340d 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py @@ -5,11 +5,6 @@ from localstack.aws.api.lambda_ import ( EventSourceMappingConfiguration, ) -from localstack.config import ( - LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC, - LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC, - LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC, -) from localstack.services.lambda_.event_source_mapping.pollers.poller import ( EmptyPollResultsException, Poller, @@ -19,9 +14,9 @@ from localstack.utils.backoff import ExponentialBackoff from localstack.utils.threads import FuncThread -POLL_INTERVAL_SEC: float = LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC -MAX_BACKOFF_POLL_EMPTY_SEC: float = LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC -MAX_BACKOFF_POLL_ERROR_SEC: float = LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC +POLL_INTERVAL_SEC: float = 1 +MAX_BACKOFF_POLL_EMPTY_SEC: float = 10 +MAX_BACKOFF_POLL_ERROR_SEC: float = 60 LOG = logging.getLogger(__name__) From fc602004dab82a4b9fbb187e25f73842020ad6e6 Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Tue, 25 Feb 2025 19:15:41 +0200 Subject: [PATCH 3/3] fix: passing in attributes to exception --- .../services/lambda_/event_source_mapping/esm_worker.py | 7 +++---- .../lambda_/event_source_mapping/pollers/poller.py | 5 +++-- .../lambda_/event_source_mapping/pollers/sqs_poller.py | 6 +----- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py b/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py index 680774738340d..abaf4dbbdbb8c 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py @@ -158,13 +158,12 @@ def poller_loop(self, *args, **kwargs): poll_interval_duration = POLL_INTERVAL_SEC except EmptyPollResultsException as miss_ex: # If the event source is empty, backoff - poll_miss_boff_duration = empty_boff.next_backoff() + poll_interval_duration = empty_boff.next_backoff() LOG.debug( - "The event source %s is empty. Backing off for for %s seconds until next request.", + "The event source %s is empty. Backing off for %s seconds until next request.", miss_ex.source_arn, - poll_miss_boff_duration, + poll_interval_duration, ) - poll_interval_duration = empty_boff.next_backoff() except Exception as e: LOG.error( "Error while polling messages for event source %s: %s", diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py index 6ed781716b162..d968d138eb9b7 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/poller.py @@ -19,8 +19,9 @@ class EmptyPollResultsException(Exception): service: str source_arn: str - def __init__(self, *args, service: str = "", source_arn: str = ""): - super(EmptyPollResultsException, self).__init__(*args) + def __init__(self, service: str = "", source_arn: str = ""): + self.service = service + self.source_arn = source_arn class PipeStateReasonValues(PipeStateReason): diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py index 0d2339136f830..3799fb3bc612e 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py @@ -133,11 +133,7 @@ def poll_events(self) -> None: exc_info=LOG.isEnabledFor(logging.DEBUG), ) else: - raise EmptyPollResultsException( - "No results found in poll call to SQS", - service="sqs", - source_arn=self.source_arn, - ) + raise EmptyPollResultsException(service="sqs", source_arn=self.source_arn) def handle_messages(self, messages): polled_events = transform_into_events(messages)