Skip to content

Commit dfe3575

Browse files
authored
[SQS] Add custom header for overriding WaitTimeSeconds (#12172)
1 parent 35cd175 commit dfe3575

File tree

7 files changed

+330
-10
lines changed

7 files changed

+330
-10
lines changed

localstack-core/localstack/services/sqs/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,4 @@
4747

4848
# HTTP headers used to override internal SQS ReceiveMessage
4949
HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT = "x-localstack-sqs-override-message-count"
50+
HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS = "x-localstack-sqs-override-wait-time-seconds"

localstack-core/localstack/services/sqs/models.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,8 @@ def receive(
523523
num_messages: int = 1,
524524
wait_time_seconds: int = None,
525525
visibility_timeout: int = None,
526+
*,
527+
poll_empty_queue: bool = False,
526528
) -> ReceiveMessageResult:
527529
"""
528530
Receive ``num_messages`` from the queue, and wait at max ``wait_time_seconds``. If a visibility
@@ -531,6 +533,7 @@ def receive(
531533
:param num_messages: the number of messages you want to get from the underlying queue
532534
:param wait_time_seconds: the number of seconds you want to wait
533535
:param visibility_timeout: an optional new visibility timeout
536+
:param poll_empty_queue: whether to keep polling an empty queue until the duration ``wait_time_seconds`` has elapsed
534537
:return: a ReceiveMessageResult object that contains the result of the operation
535538
"""
536539
raise NotImplementedError
@@ -798,6 +801,8 @@ def receive(
798801
num_messages: int = 1,
799802
wait_time_seconds: int = None,
800803
visibility_timeout: int = None,
804+
*,
805+
poll_empty_queue: bool = False,
801806
) -> ReceiveMessageResult:
802807
result = ReceiveMessageResult()
803808

@@ -819,7 +824,8 @@ def receive(
819824
# setting block to false guarantees that, if we've already waited before, we don't wait the
820825
# full time again in the next iteration if max_number_of_messages is set but there are no more
821826
# messages in the queue. see https://github.com/localstack/localstack/issues/5824
822-
block = False
827+
if not poll_empty_queue:
828+
block = False
823829

824830
timeout -= time.time() - start
825831
if timeout < 0:
@@ -1110,6 +1116,8 @@ def receive(
11101116
num_messages: int = 1,
11111117
wait_time_seconds: int = None,
11121118
visibility_timeout: int = None,
1119+
*,
1120+
poll_empty_queue: bool = False,
11131121
) -> ReceiveMessageResult:
11141122
"""
11151123
Receive logic for FIFO queues is different from standard queues. See
@@ -1157,7 +1165,8 @@ def receive(
11571165

11581166
received_groups.add(group)
11591167

1160-
block = False
1168+
if not poll_empty_queue:
1169+
block = False
11611170

11621171
# we lock the queue while accessing the groups to not get into races with re-queueing/deleting
11631172
with self.mutex:

localstack-core/localstack/services/sqs/provider.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
from localstack.services.sqs import constants as sqs_constants
8080
from localstack.services.sqs.constants import (
8181
HEADER_LOCALSTACK_SQS_OVERRIDE_MESSAGE_COUNT,
82+
HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS,
8283
)
8384
from localstack.services.sqs.exceptions import InvalidParameterValueException
8485
from localstack.services.sqs.models import (
@@ -1234,9 +1235,17 @@ def receive_message(
12341235
# TODO add support for message_system_attribute_names
12351236
queue = self._resolve_queue(context, queue_url=queue_url)
12361237

1237-
if wait_time_seconds is None:
1238+
poll_empty_queue = False
1239+
if override := extract_wait_time_seconds_from_headers(context):
1240+
wait_time_seconds = override
1241+
poll_empty_queue = True
1242+
elif wait_time_seconds is None:
12381243
wait_time_seconds = queue.wait_time_seconds
1239-
1244+
elif wait_time_seconds < 0 or wait_time_seconds > 20:
1245+
raise InvalidParameterValueException(
1246+
f"Value {wait_time_seconds} for parameter WaitTimeSeconds is invalid. "
1247+
f"Reason: Must be >= 0 and <= 20, if provided."
1248+
)
12401249
num = max_number_of_messages or 1
12411250

12421251
# override receive count with value from custom header
@@ -1257,7 +1266,9 @@ def receive_message(
12571266
# fewer messages than requested on small queues. at some point we could maybe change this to randomly sample
12581267
# between 1 and max_number_of_messages.
12591268
# see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html
1260-
result = queue.receive(num, wait_time_seconds, visibility_timeout)
1269+
result = queue.receive(
1270+
num, wait_time_seconds, visibility_timeout, poll_empty_queue=poll_empty_queue
1271+
)
12611272

12621273
# process dead letter messages
12631274
if result.dead_letter_messages:
@@ -1905,3 +1916,12 @@ def extract_message_count_from_headers(context: RequestContext) -> int | None:
19051916
return override
19061917

19071918
return None
1919+
1920+
1921+
def extract_wait_time_seconds_from_headers(context: RequestContext) -> int | None:
1922+
if override := context.request.headers.get(
1923+
HEADER_LOCALSTACK_SQS_OVERRIDE_WAIT_TIME_SECONDS, default=None, type=int
1924+
):
1925+
return override
1926+
1927+
return None

tests/aws/services/sqs/test_sqs.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,14 +256,44 @@ def test_receive_empty_queue(self, sqs_queue, snapshot, aws_sqs_client):
256256
empty_short_poll_resp = aws_sqs_client.receive_message(
257257
QueueUrl=queue_url, MaxNumberOfMessages=1
258258
)
259-
260259
snapshot.match("empty_short_poll_resp", empty_short_poll_resp)
261260

262261
empty_long_poll_resp = aws_sqs_client.receive_message(
263262
QueueUrl=queue_url, MaxNumberOfMessages=1, WaitTimeSeconds=1
264263
)
265264
snapshot.match("empty_long_poll_resp", empty_long_poll_resp)
266265

266+
@markers.aws.validated
267+
@markers.snapshot.skip_snapshot_verify(paths=["$..Error.Detail"])
268+
def test_send_receive_wait_time_seconds(self, sqs_queue, snapshot, aws_sqs_client):
269+
queue_url = sqs_queue
270+
send_result_1 = aws_sqs_client.send_message(QueueUrl=queue_url, MessageBody="message")
271+
assert send_result_1["MessageId"]
272+
273+
send_result_2 = aws_sqs_client.send_message(QueueUrl=queue_url, MessageBody="message")
274+
assert send_result_2["MessageId"]
275+
276+
MAX_WAIT_TIME_SECONDS = 20
277+
with pytest.raises(ClientError) as e:
278+
aws_sqs_client.receive_message(
279+
QueueUrl=queue_url, WaitTimeSeconds=MAX_WAIT_TIME_SECONDS + 1
280+
)
281+
snapshot.match("recieve_message_error_too_large", e.value.response)
282+
283+
with pytest.raises(ClientError) as e:
284+
aws_sqs_client.receive_message(QueueUrl=queue_url, WaitTimeSeconds=-1)
285+
snapshot.match("recieve_message_error_too_small", e.value.response)
286+
287+
empty_short_poll_by_default_resp = aws_sqs_client.receive_message(
288+
QueueUrl=queue_url, MaxNumberOfMessages=1
289+
)
290+
snapshot.match("empty_short_poll_by_default_resp", empty_short_poll_by_default_resp)
291+
292+
empty_short_poll_explicit_resp = aws_sqs_client.receive_message(
293+
QueueUrl=queue_url, MaxNumberOfMessages=1, WaitTimeSeconds=0
294+
)
295+
snapshot.match("empty_short_poll_explicit_resp", empty_short_poll_explicit_resp)
296+
267297
@markers.aws.validated
268298
def test_receive_message_attributes_timestamp_types(self, sqs_queue, aws_sqs_client):
269299
aws_sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="message")

tests/aws/services/sqs/test_sqs.snapshot.json

Lines changed: 128 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3647,9 +3647,129 @@
36473647
"recorded-date": "20-08-2024, 14:14:11",
36483648
"recorded-content": {}
36493649
},
3650+
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_send_receive_wait_time_seconds[sqs]": {
3651+
"recorded-date": "10-02-2025, 13:22:29",
3652+
"recorded-content": {
3653+
"recieve_message_error_too_large": {
3654+
"Error": {
3655+
"Code": "InvalidParameterValue",
3656+
"Message": "Value 21 for parameter WaitTimeSeconds is invalid. Reason: Must be >= 0 and <= 20, if provided.",
3657+
"QueryErrorCode": "InvalidParameterValueException",
3658+
"Type": "Sender"
3659+
},
3660+
"ResponseMetadata": {
3661+
"HTTPHeaders": {},
3662+
"HTTPStatusCode": 400
3663+
}
3664+
},
3665+
"recieve_message_error_too_small": {
3666+
"Error": {
3667+
"Code": "InvalidParameterValue",
3668+
"Message": "Value -1 for parameter WaitTimeSeconds is invalid. Reason: Must be >= 0 and <= 20, if provided.",
3669+
"QueryErrorCode": "InvalidParameterValueException",
3670+
"Type": "Sender"
3671+
},
3672+
"ResponseMetadata": {
3673+
"HTTPHeaders": {},
3674+
"HTTPStatusCode": 400
3675+
}
3676+
},
3677+
"empty_short_poll_by_default_resp": {
3678+
"Messages": [
3679+
{
3680+
"Body": "message",
3681+
"MD5OfBody": "78e731027d8fd50ed642340b7c9a63b3",
3682+
"MessageId": "<uuid:1>",
3683+
"ReceiptHandle": "<receipt-handle:1>"
3684+
}
3685+
],
3686+
"ResponseMetadata": {
3687+
"HTTPHeaders": {},
3688+
"HTTPStatusCode": 200
3689+
}
3690+
},
3691+
"empty_short_poll_explicit_resp": {
3692+
"Messages": [
3693+
{
3694+
"Body": "message",
3695+
"MD5OfBody": "78e731027d8fd50ed642340b7c9a63b3",
3696+
"MessageId": "<uuid:2>",
3697+
"ReceiptHandle": "<receipt-handle:2>"
3698+
}
3699+
],
3700+
"ResponseMetadata": {
3701+
"HTTPHeaders": {},
3702+
"HTTPStatusCode": 200
3703+
}
3704+
}
3705+
}
3706+
},
3707+
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_send_receive_wait_time_seconds[sqs_query]": {
3708+
"recorded-date": "10-02-2025, 13:22:32",
3709+
"recorded-content": {
3710+
"recieve_message_error_too_large": {
3711+
"Error": {
3712+
"Code": "InvalidParameterValue",
3713+
"Detail": null,
3714+
"Message": "Value 21 for parameter WaitTimeSeconds is invalid. Reason: Must be >= 0 and <= 20, if provided.",
3715+
"Type": "Sender"
3716+
},
3717+
"ResponseMetadata": {
3718+
"HTTPHeaders": {},
3719+
"HTTPStatusCode": 400
3720+
}
3721+
},
3722+
"recieve_message_error_too_small": {
3723+
"Error": {
3724+
"Code": "InvalidParameterValue",
3725+
"Detail": null,
3726+
"Message": "Value -1 for parameter WaitTimeSeconds is invalid. Reason: Must be >= 0 and <= 20, if provided.",
3727+
"Type": "Sender"
3728+
},
3729+
"ResponseMetadata": {
3730+
"HTTPHeaders": {},
3731+
"HTTPStatusCode": 400
3732+
}
3733+
},
3734+
"empty_short_poll_by_default_resp": {
3735+
"Messages": [
3736+
{
3737+
"Body": "message",
3738+
"MD5OfBody": "78e731027d8fd50ed642340b7c9a63b3",
3739+
"MessageId": "<uuid:1>",
3740+
"ReceiptHandle": "<receipt-handle:1>"
3741+
}
3742+
],
3743+
"ResponseMetadata": {
3744+
"HTTPHeaders": {},
3745+
"HTTPStatusCode": 200
3746+
}
3747+
},
3748+
"empty_short_poll_explicit_resp": {
3749+
"Messages": [
3750+
{
3751+
"Body": "message",
3752+
"MD5OfBody": "78e731027d8fd50ed642340b7c9a63b3",
3753+
"MessageId": "<uuid:2>",
3754+
"ReceiptHandle": "<receipt-handle:2>"
3755+
}
3756+
],
3757+
"ResponseMetadata": {
3758+
"HTTPHeaders": {},
3759+
"HTTPStatusCode": 200
3760+
}
3761+
}
3762+
}
3763+
},
36503764
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_receive_empty_queue[sqs]": {
3651-
"recorded-date": "30-01-2025, 22:32:45",
3765+
"recorded-date": "10-02-2025, 13:18:17",
36523766
"recorded-content": {
3767+
"empty_short_poll_resp_no_param": {
3768+
"ResponseMetadata": {
3769+
"HTTPHeaders": {},
3770+
"HTTPStatusCode": 200
3771+
}
3772+
},
36533773
"empty_short_poll_resp": {
36543774
"ResponseMetadata": {
36553775
"HTTPHeaders": {},
@@ -3665,8 +3785,14 @@
36653785
}
36663786
},
36673787
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_receive_empty_queue[sqs_query]": {
3668-
"recorded-date": "30-01-2025, 22:32:48",
3788+
"recorded-date": "10-02-2025, 13:18:20",
36693789
"recorded-content": {
3790+
"empty_short_poll_resp_no_param": {
3791+
"ResponseMetadata": {
3792+
"HTTPHeaders": {},
3793+
"HTTPStatusCode": 200
3794+
}
3795+
},
36703796
"empty_short_poll_resp": {
36713797
"ResponseMetadata": {
36723798
"HTTPHeaders": {},

tests/aws/services/sqs/test_sqs.validation.json

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,10 +204,10 @@
204204
"last_validated_date": "2024-04-30T13:34:22+00:00"
205205
},
206206
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_receive_empty_queue[sqs]": {
207-
"last_validated_date": "2025-01-30T22:32:45+00:00"
207+
"last_validated_date": "2025-02-10T13:18:17+00:00"
208208
},
209209
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_receive_empty_queue[sqs_query]": {
210-
"last_validated_date": "2025-01-30T22:32:48+00:00"
210+
"last_validated_date": "2025-02-10T13:18:20+00:00"
211211
},
212212
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_receive_message_attribute_names_filters[sqs]": {
213213
"last_validated_date": "2024-06-04T11:54:31+00:00"
@@ -329,6 +329,12 @@
329329
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_send_receive_message_multiple_queues": {
330330
"last_validated_date": "2024-04-30T13:40:05+00:00"
331331
},
332+
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_send_receive_wait_time_seconds[sqs]": {
333+
"last_validated_date": "2025-02-10T13:22:29+00:00"
334+
},
335+
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_send_receive_wait_time_seconds[sqs_query]": {
336+
"last_validated_date": "2025-02-10T13:22:32+00:00"
337+
},
332338
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_set_empty_redrive_policy[sqs]": {
333339
"last_validated_date": "2024-08-20T14:14:08+00:00"
334340
},
@@ -401,6 +407,12 @@
401407
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_too_many_entries_in_batch_request[sqs_query]": {
402408
"last_validated_date": "2024-04-30T13:33:40+00:00"
403409
},
410+
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_wait_time_seconds_waits_correctly[sqs]": {
411+
"last_validated_date": "2025-01-23T13:57:19+00:00"
412+
},
413+
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_wait_time_seconds_waits_correctly[sqs_query]": {
414+
"last_validated_date": "2025-01-23T13:57:30+00:00"
415+
},
404416
"tests/aws/services/sqs/test_sqs.py::TestSqsQueryApi::test_send_message_via_queue_url_with_json_protocol": {
405417
"last_validated_date": "2024-04-30T13:35:11+00:00"
406418
}

0 commit comments

Comments
 (0)