diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py index c1f8af9556e7f..45f977f0dd138 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py @@ -283,12 +283,27 @@ def get_records(self, shard_iterator: str) -> dict: ) raise CustomerInvocationError from e elif "ResourceNotFoundException" in str(e): - LOG.warning( - "Source stream %s does not exist: %s", + # FIXME: The 'Invalid ShardId in ShardIterator' error is returned by DynamoDB-local. Unsure when/why this is returned. + if "Invalid ShardId in ShardIterator" in str(e): + LOG.warning( + "Invalid ShardId in ShardIterator for %s. Re-initializing shards.", + self.source_arn, + ) + self.initialize_shards() + else: + LOG.warning( + "Source stream %s does not exist: %s", + self.source_arn, + e, + ) + raise CustomerInvocationError from e + elif "TrimmedDataAccessException" in str(e): + LOG.debug( + "Attempted to iterate over trimmed record or expired shard iterator %s for stream %s, re-initializing shards", + shard_iterator, self.source_arn, - e, ) - raise CustomerInvocationError from e + self.initialize_shards() else: LOG.debug("ClientError during get_records for stream %s: %s", self.source_arn, e) raise PipeInternalError from e