Skip to content

Commit e70207f

Browse files
authored
[ESM] Add configurable poll frequency and log shard info (#12415)
1 parent 310acf4 commit e70207f

File tree

3 files changed

+42
-10
lines changed

3 files changed

+42
-10
lines changed

localstack-core/localstack/config.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,6 +1071,24 @@ def populate_edge_configuration(
10711071
# DEV: sbx_user1051 (default when not provided) Alternative system user or empty string to skip dropping privileges.
10721072
LAMBDA_INIT_USER = os.environ.get("LAMBDA_INIT_USER")
10731073

1074+
# INTERNAL: 1 (default)
1075+
# The duration (in seconds) to wait between each poll call to an event source.
1076+
LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC = float(
1077+
os.environ.get("LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC") or 1
1078+
)
1079+
1080+
# INTERNAL: 60 (default)
1081+
# Maximum duration (in seconds) to wait between retries when an event source poll fails.
1082+
LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC = float(
1083+
os.environ.get("LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC") or 60
1084+
)
1085+
1086+
# INTERNAL: 10 (default)
1087+
# Maximum duration (in seconds) to wait between polls when an event source returns empty results.
1088+
LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC = float(
1089+
os.environ.get("LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC") or 10
1090+
)
1091+
10741092
# Adding Stepfunctions default port
10751093
LOCAL_PORT_STEPFUNCTIONS = int(os.environ.get("LOCAL_PORT_STEPFUNCTIONS") or 8083)
10761094
# Stepfunctions lambda endpoint override

localstack-core/localstack/services/lambda_/event_source_mapping/esm_worker.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
from localstack.aws.api.lambda_ import (
66
EventSourceMappingConfiguration,
77
)
8+
from localstack.config import (
9+
LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC,
10+
LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC,
11+
LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC,
12+
)
813
from localstack.services.lambda_.event_source_mapping.pollers.poller import (
914
EmptyPollResultsException,
1015
Poller,
@@ -14,11 +19,6 @@
1419
from localstack.utils.backoff import ExponentialBackoff
1520
from localstack.utils.threads import FuncThread
1621

17-
POLL_INTERVAL_SEC: float = 1
18-
MAX_BACKOFF_POLL_EMPTY_SEC: float = 10
19-
MAX_BACKOFF_POLL_ERROR_SEC: float = 60
20-
21-
2222
LOG = logging.getLogger(__name__)
2323

2424

@@ -144,10 +144,15 @@ def poller_loop(self, *args, **kwargs):
144144
self.update_esm_state_in_store(EsmState.ENABLED)
145145
self.state_transition_reason = self.user_state_reason
146146

147-
error_boff = ExponentialBackoff(initial_interval=2, max_interval=MAX_BACKOFF_POLL_ERROR_SEC)
148-
empty_boff = ExponentialBackoff(initial_interval=1, max_interval=MAX_BACKOFF_POLL_EMPTY_SEC)
147+
error_boff = ExponentialBackoff(
148+
initial_interval=2, max_interval=LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC
149+
)
150+
empty_boff = ExponentialBackoff(
151+
initial_interval=1,
152+
max_interval=LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_EMPTY_POLL_SEC,
153+
)
149154

150-
poll_interval_duration = POLL_INTERVAL_SEC
155+
poll_interval_duration = LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC
151156

152157
while not self._shutdown_event.is_set():
153158
try:
@@ -159,7 +164,7 @@ def poller_loop(self, *args, **kwargs):
159164
empty_boff.reset()
160165

161166
# Set the poll frequency back to the default
162-
poll_interval_duration = POLL_INTERVAL_SEC
167+
poll_interval_duration = LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC
163168
except EmptyPollResultsException as miss_ex:
164169
# If the event source is empty, backoff
165170
poll_interval_duration = empty_boff.next_backoff()

localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ def poll_events(self):
135135
if not self.shards:
136136
self.shards = self.initialize_shards()
137137

138+
if not self.shards:
139+
LOG.debug("No shards found for %s.", self.source_arn)
140+
raise EmptyPollResultsException(service=self.event_source(), source_arn=self.source_arn)
141+
else:
142+
LOG.debug("Event source %s has %d shards.", self.source_arn, len(self.shards))
143+
138144
# TODO: improve efficiency because this currently limits the throughput to at most batch size per poll interval
139145
# Handle shards round-robin. Re-initialize current shard iterator once all shards are handled.
140146
if self.iterator_over_shards is None:
@@ -163,8 +169,11 @@ def poll_events_from_shard(self, shard_id: str, shard_iterator: str):
163169
get_records_response = self.get_records(shard_iterator)
164170
records = get_records_response.get("Records", [])
165171
if not records:
172+
# We cannot reliably back-off when no records found since an iterator
173+
# may have to move multiple times until records are returned.
174+
# See https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#getrecords-returns-empty
166175
self.shards[shard_id] = get_records_response["NextShardIterator"]
167-
raise EmptyPollResultsException(service=self.event_source(), source_arn=self.source_arn)
176+
return
168177

169178
polled_events = self.transform_into_events(records, shard_id)
170179

0 commit comments

Comments
 (0)