From 81174f6a7ab08b8c206ad381ff9212ea2a3499d0 Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Fri, 13 Dec 2024 19:30:06 +0200 Subject: [PATCH 1/6] [ESM] Handle DynamoDB-local Invalid ShardID exception --- .../pollers/stream_poller.py | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py index c1f8af9556e7f..b7bbeaf4f0c50 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py @@ -283,12 +283,21 @@ def get_records(self, shard_iterator: str) -> dict: ) raise CustomerInvocationError from e elif "ResourceNotFoundException" in str(e): - LOG.warning( - "Source stream %s does not exist: %s", - self.source_arn, - e, - ) - raise CustomerInvocationError from e + # FIXME: The 'Invalid ShardId in ShardIterator' error is returned by DynamoDB-local. Unsure when/why this is returned. + # + if "Invalid ShardId in ShardIterator" in str(e): + LOG.warning( + "Invalid ShardId in ShardIterator for %s. Re-initializing shards.", + self.source_arn, + ) + self.initialize_shards() + else: + LOG.warning( + "Source stream %s does not exist: %s", + self.source_arn, + e, + ) + raise CustomerInvocationError from e else: LOG.debug("ClientError during get_records for stream %s: %s", self.source_arn, e) raise PipeInternalError from e From 125d5d956b17262f54c85d97caebbe120761438b Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Fri, 13 Dec 2024 20:44:51 +0200 Subject: [PATCH 2/6] [ESM] Catch TrimmedDataAccessException --- .../lambda_/event_source_mapping/pollers/stream_poller.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py index b7bbeaf4f0c50..779e86c73b492 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py @@ -264,7 +264,10 @@ def get_records(self, shard_iterator: str) -> dict: ) return get_records_response # TODO: test iterator expired with conditional error scenario (requires failure destinations) - except self.source_client.exceptions.ExpiredIteratorException as e: + except ( + self.source_client.exceptions.ExpiredIteratorException, + self.source_client.exceptions.TrimmedDataAccessException, + ) as e: LOG.debug( "Shard iterator %s expired for stream %s, re-initializing shards", shard_iterator, From d5ef268f80a757d11760ce37e4b2f97b99ca0711 Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Fri, 13 Dec 2024 21:05:26 +0200 Subject: [PATCH 3/6] Handle TrimmedDataAccessException in DynamoDBStreams Poller --- .../event_source_mapping/pollers/dynamodb_poller.py | 12 ++++++++++++ .../event_source_mapping/pollers/stream_poller.py | 5 +---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/dynamodb_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/dynamodb_poller.py index 17e083506eaa4..832c25f8eb343 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/dynamodb_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/dynamodb_poller.py @@ -74,6 +74,18 @@ def extra_metadata(self) -> dict: "eventVersion": "1.1", } + def get_records(self, shard_iterator: str) -> dict: + try: + return super().get_records(shard_iterator) + except self.source_client.exceptions.TrimmedDataAccessException as e: + LOG.debug( + "Attempted to iterate over trimmed record or expired shard iterator %s for stream %s, re-initializing shards", + shard_iterator, + self.source_arn, + ) + self.shards = self.initialize_shards() + raise e + def transform_into_events(self, records: list[dict], shard_id) -> list[dict]: events = [] for record in records: diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py index 779e86c73b492..c3bc5c2eae124 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py @@ -264,10 +264,7 @@ def get_records(self, shard_iterator: str) -> dict: ) return get_records_response # TODO: test iterator expired with conditional error scenario (requires failure destinations) - except ( - self.source_client.exceptions.ExpiredIteratorException, - self.source_client.exceptions.TrimmedDataAccessException, - ) as e: + except self.source_client.exceptions.ExpiredIteratorExceptionas as e: LOG.debug( "Shard iterator %s expired for stream %s, re-initializing shards", shard_iterator, From b379cfd0137b5e47b0dcc1d3ec37336475c7a3d7 Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Fri, 13 Dec 2024 21:16:17 +0200 Subject: [PATCH 4/6] Handle TrimmedDataAccessException in in StreamPoller --- .../event_source_mapping/pollers/dynamodb_poller.py | 12 ------------ .../event_source_mapping/pollers/stream_poller.py | 7 +++++++ 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/dynamodb_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/dynamodb_poller.py index 832c25f8eb343..17e083506eaa4 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/dynamodb_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/dynamodb_poller.py @@ -74,18 +74,6 @@ def extra_metadata(self) -> dict: "eventVersion": "1.1", } - def get_records(self, shard_iterator: str) -> dict: - try: - return super().get_records(shard_iterator) - except self.source_client.exceptions.TrimmedDataAccessException as e: - LOG.debug( - "Attempted to iterate over trimmed record or expired shard iterator %s for stream %s, re-initializing shards", - shard_iterator, - self.source_arn, - ) - self.shards = self.initialize_shards() - raise e - def transform_into_events(self, records: list[dict], shard_id) -> list[dict]: events = [] for record in records: diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py index c3bc5c2eae124..8568dc76bb800 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py @@ -298,6 +298,13 @@ def get_records(self, shard_iterator: str) -> dict: e, ) raise CustomerInvocationError from e + elif "TrimmedDataAccessException" in str(e): + LOG.debug( + "Attempted to iterate over trimmed record or expired shard iterator %s for stream %s, re-initializing shards", + shard_iterator, + self.source_arn, + ) + self.initialize_shards() else: LOG.debug("ClientError during get_records for stream %s: %s", self.source_arn, e) raise PipeInternalError from e From ef48cb4d6224b66bdd8514a216685332f14b3c48 Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Fri, 13 Dec 2024 21:19:17 +0200 Subject: [PATCH 5/6] Fix... --- .../lambda_/event_source_mapping/pollers/stream_poller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py index 8568dc76bb800..0dd11affabf9b 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py @@ -264,7 +264,7 @@ def get_records(self, shard_iterator: str) -> dict: ) return get_records_response # TODO: test iterator expired with conditional error scenario (requires failure destinations) - except self.source_client.exceptions.ExpiredIteratorExceptionas as e: + except self.source_client.exceptions.ExpiredIteratorException as e: LOG.debug( "Shard iterator %s expired for stream %s, re-initializing shards", shard_iterator, From e02a1af0db241d655624c38c4aa118f58d591943 Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Fri, 13 Dec 2024 21:20:19 +0200 Subject: [PATCH 6/6] Remove blank line --- .../lambda_/event_source_mapping/pollers/stream_poller.py | 1 - 1 file changed, 1 deletion(-) diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py index 0dd11affabf9b..45f977f0dd138 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py @@ -284,7 +284,6 @@ def get_records(self, shard_iterator: str) -> dict: raise CustomerInvocationError from e elif "ResourceNotFoundException" in str(e): # FIXME: The 'Invalid ShardId in ShardIterator' error is returned by DynamoDB-local. Unsure when/why this is returned. - # if "Invalid ShardId in ShardIterator" in str(e): LOG.warning( "Invalid ShardId in ShardIterator for %s. Re-initializing shards.",