Skip to content

Commit 7490ef5

Browse files
committed
Simplify to only fix CRUD issue
1 parent c2f1498 commit 7490ef5

File tree

4 files changed

+19
-88
lines changed

4 files changed

+19
-88
lines changed

localstack-core/localstack/services/lambda_/event_source_mapping/pollers/sqs_poller.py

Lines changed: 10 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
import json
22
import logging
3-
import random
4-
import threading
5-
import time
63
from collections import defaultdict
74
from functools import cached_property
85

@@ -19,7 +16,6 @@
1916
Poller,
2017
parse_batch_item_failures,
2118
)
22-
from localstack.services.lambda_.event_source_mapping.senders.sender_utils import batched
2319
from localstack.utils.aws.arns import parse_arn
2420
from localstack.utils.strings import first_char_to_lower
2521

@@ -40,7 +36,6 @@ def __init__(
4036
):
4137
super().__init__(source_arn, source_parameters, source_client, processor)
4238
self.queue_url = get_queue_url(self.source_arn)
43-
self._shutdown_event = threading.Event()
4439

4540
@property
4641
def sqs_queue_parameters(self) -> PipeSourceSqsQueueParameters:
@@ -62,88 +57,22 @@ def get_queue_attributes(self) -> dict:
6257
def event_source(self) -> str:
6358
return "aws:sqs"
6459

65-
def close(self) -> None:
66-
self._shutdown_event.set()
67-
68-
def collect_messages(self, max_batch_size=10, max_batch_window=0, **kwargs) -> list[dict]:
69-
# The number of ReceiveMessage requests we expect to be made in order to fill up the max_batch_size.
70-
_total_expected_requests = (
71-
max_batch_size + DEFAULT_MAX_RECEIVE_COUNT - 1
72-
) // DEFAULT_MAX_RECEIVE_COUNT
73-
74-
# The maximum duration a ReceiveMessage call should take, given how many requests
75-
# we are going to make to fill the batch and the maximum batching window.
76-
_maximum_duration_per_request = max_batch_window / _total_expected_requests
77-
78-
# Number of messages we want to receive per ReceiveMessage operation.
79-
messages_per_receive = min(DEFAULT_MAX_RECEIVE_COUNT, max_batch_size)
80-
81-
def receive_message(num_messages: int = messages_per_receive):
82-
start_request_t = time.monotonic()
83-
response = self.source_client.receive_message(
84-
QueueUrl=self.queue_url,
85-
MaxNumberOfMessages=num_messages,
86-
MessageAttributeNames=["All"],
87-
MessageSystemAttributeNames=[MessageSystemAttributeName.All],
88-
)
89-
return response.get("Messages", []), time.monotonic() - start_request_t
90-
91-
batch = []
92-
start_collection_t = time.monotonic()
93-
while not self._shutdown_event.is_set():
94-
# Adjust request size if we're close to max_batch_size
95-
if (remaining := max_batch_size - len(batch)) < messages_per_receive:
96-
messages_per_receive = remaining
97-
98-
# Return the messages received and the request duration in seconds.
99-
try:
100-
messages, request_duration = receive_message(messages_per_receive)
101-
except Exception as e:
102-
# If an exception is raised here, break the loop and return whatever
103-
# has been collected early.
104-
# TODO: Handle exceptions differently i.e QueueNotExist or ConnectionFailed should retry with backoff
105-
LOG.warning(
106-
"Polling SQS queue failed: %s",
107-
e,
108-
exc_info=LOG.isEnabledFor(logging.DEBUG),
109-
)
110-
break
111-
112-
if messages:
113-
batch.extend(messages)
114-
115-
time_elapsed = time.monotonic() - start_collection_t
116-
if time_elapsed >= max_batch_window or len(batch) >= max_batch_size:
117-
return batch
118-
119-
# Simple adaptive interval technique to randomly backoff between last request duration
120-
# and max allowed time per request.
121-
# Note: This approach assumes that a larger batching window means a user is content
122-
# with waiting longer for a batch response.
123-
adaptive_wait_time = random.uniform(request_duration, _maximum_duration_per_request)
124-
self._shutdown_event.wait(adaptive_wait_time)
125-
126-
return batch
127-
12860
def poll_events(self) -> None:
12961
# SQS pipe source: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html
13062
# "The 9 Ways an SQS Message can be Deleted": https://lucvandonkersgoed.com/2022/01/20/the-9-ways-an-sqs-message-can-be-deleted/
63+
# TODO: implement batch window expires based on MaximumBatchingWindowInSeconds
13164
# TODO: implement invocation payload size quota
13265
# TODO: consider long-polling vs. short-polling trade-off. AWS uses long-polling:
13366
# https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html#pipes-sqs-scaling
134-
if self._shutdown_event.is_set():
135-
self._shutdown_event.clear()
136-
137-
messages = self.collect_messages(
138-
max_batch_size=self.sqs_queue_parameters["BatchSize"],
139-
max_batch_window=self.sqs_queue_parameters["MaximumBatchingWindowInSeconds"],
67+
response = self.source_client.receive_message(
68+
QueueUrl=self.queue_url,
69+
MaxNumberOfMessages=min(
70+
self.sqs_queue_parameters["BatchSize"], DEFAULT_MAX_RECEIVE_COUNT
71+
), # BatchSize cannot exceed 10
72+
MessageAttributeNames=["All"],
73+
MessageSystemAttributeNames=[MessageSystemAttributeName.All],
14074
)
141-
142-
# NOTE: If a batch is collected, this will send a single collected batch for each poll call.
143-
# Increasing the poller frequency _should_ influence the rate of collection but this has not
144-
# yet been investigated.
145-
# messages = next(self.collector)
146-
if messages:
75+
if messages := response.get("Messages"):
14776
LOG.debug("Polled %d events from %s", len(messages), self.source_arn)
14877
try:
14978
if self.is_fifo_queue:
@@ -242,10 +171,7 @@ def delete_messages(self, messages: list[dict], message_ids_to_delete: set):
242171
for count, message in enumerate(messages)
243172
if message["MessageId"] in message_ids_to_delete
244173
]
245-
for batched_entries in batched(entries, DEFAULT_MAX_RECEIVE_COUNT):
246-
self.source_client.delete_message_batch(
247-
QueueUrl=self.queue_url, Entries=batched_entries
248-
)
174+
self.source_client.delete_message_batch(QueueUrl=self.queue_url, Entries=entries)
249175

250176

251177
def split_by_message_group_id(messages) -> defaultdict[str, list[dict]]:

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,9 @@ def get_msg_from_q():
11261126
events = retry(get_msg_from_q, retries=15, sleep=5)
11271127
snapshot.match("Records", events)
11281128

1129+
# FIXME: this fails due to ESM not correctly collecting and sending batches
1130+
# where size exceeds 10 messages.
1131+
@markers.snapshot.skip_snapshot_verify(paths=["$..total_batches_received"])
11291132
@markers.aws.validated
11301133
def test_sqs_event_source_mapping_batching_reserved_concurrency(
11311134
self,
@@ -1213,7 +1216,7 @@ def get_msg_from_q():
12131216
# We expect to receive 2 batches where each batch contains some proportion of the
12141217
# 30 messages we sent through, divided by the 20 ESM batch size. How this is split is
12151218
# not determinable a priori so rather just snapshots the events and the no. of batches.
1216-
snapshot.match("total_batches_received", len(batches))
1219+
snapshot.match("batch_info", {"total_batches_received": len(batches)})
12171220
snapshot.match("Records", events)
12181221

12191222
@markers.aws.validated

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.snapshot.json

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2033,7 +2033,7 @@
20332033
}
20342034
},
20352035
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::TestSQSEventSourceMapping::test_sqs_event_source_mapping_batching_reserved_concurrency": {
2036-
"recorded-date": "26-11-2024, 08:29:04",
2036+
"recorded-date": "29-11-2024, 13:29:56",
20372037
"recorded-content": {
20382038
"put_concurrency_resp": {
20392039
"ReservedConcurrentExecutions": 2,
@@ -2061,7 +2061,9 @@
20612061
"HTTPStatusCode": 202
20622062
}
20632063
},
2064-
"total_batches_received": 2,
2064+
"batch_info": {
2065+
"total_batches_received": 2
2066+
},
20652067
"Records": [
20662068
{
20672069
"messageId": "<uuid:2>",

tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.validation.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
"last_validated_date": "2024-11-26T13:43:39+00:00"
4646
},
4747
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::TestSQSEventSourceMapping::test_sqs_event_source_mapping_batching_reserved_concurrency": {
48-
"last_validated_date": "2024-11-26T08:29:01+00:00"
48+
"last_validated_date": "2024-11-29T13:29:53+00:00"
4949
},
5050
"tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::TestSQSEventSourceMapping::test_sqs_event_source_mapping_update": {
5151
"last_validated_date": "2024-10-12T13:45:43+00:00"

0 commit comments

Comments
 (0)