Skip to content

Commit fd3d900

Browse files
authored
[SQS] Stop blocking Queue.get() call on LocalStack shutdown (#12214)
1 parent 379d551 commit fd3d900

File tree

6 files changed

+126
-7
lines changed

6 files changed

+126
-7
lines changed

localstack-core/localstack/services/sqs/models.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import threading
88
import time
99
from datetime import datetime
10-
from queue import Empty, PriorityQueue, Queue
10+
from queue import Empty
1111
from typing import Dict, Optional, Set
1212

1313
from localstack import config
@@ -28,6 +28,7 @@
2828
InvalidParameterValueException,
2929
MissingRequiredParameterException,
3030
)
31+
from localstack.services.sqs.queue import InterruptiblePriorityQueue, InterruptibleQueue
3132
from localstack.services.sqs.utils import (
3233
decode_receipt_handle,
3334
encode_move_task_handle,
@@ -300,6 +301,9 @@ def __init__(self, name: str, region: str, account_id: str, attributes=None, tag
300301
self.permissions = set()
301302
self.mutex = threading.RLock()
302303

304+
def shutdown(self):
305+
pass
306+
303307
def default_attributes(self) -> QueueAttributeMap:
304308
return {
305309
QueueAttributeName.ApproximateNumberOfMessages: lambda: str(
@@ -719,12 +723,12 @@ def remove_expired_messages_from_heap(
719723

720724

721725
class StandardQueue(SqsQueue):
722-
visible: PriorityQueue[SqsMessage]
726+
visible: InterruptiblePriorityQueue[SqsMessage]
723727
inflight: Set[SqsMessage]
724728

725729
def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
726730
super().__init__(name, region, account_id, attributes, tags)
727-
self.visible = PriorityQueue()
731+
self.visible = InterruptiblePriorityQueue()
728732

729733
def clear(self):
730734
with self.mutex:
@@ -735,6 +739,9 @@ def clear(self):
735739
def approx_number_of_messages(self):
736740
return self.visible.qsize()
737741

742+
def shutdown(self):
743+
self.visible.shutdown()
744+
738745
def put(
739746
self,
740747
message: Message,
@@ -937,7 +944,7 @@ class FifoQueue(SqsQueue):
937944
deduplication: Dict[str, SqsMessage]
938945
message_groups: dict[str, MessageGroup]
939946
inflight_groups: set[MessageGroup]
940-
message_group_queue: Queue
947+
message_group_queue: InterruptibleQueue
941948
deduplication_scope: str
942949

943950
def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
@@ -946,7 +953,7 @@ def __init__(self, name: str, region: str, account_id: str, attributes=None, tag
946953

947954
self.message_groups = {}
948955
self.inflight_groups = set()
949-
self.message_group_queue = Queue()
956+
self.message_group_queue = InterruptibleQueue()
950957

951958
# SQS does not seem to change the deduplication behaviour of fifo queues if you
952959
# change to/from 'queue'/'messageGroup' scope after creation -> we need to set this on creation
@@ -959,6 +966,9 @@ def approx_number_of_messages(self):
959966
n += len(message_group.messages)
960967
return n
961968

969+
def shutdown(self):
970+
self.message_group_queue.shutdown()
971+
962972
def get_message_group(self, message_group_id: str) -> MessageGroup:
963973
"""
964974
Thread safe lazy factory for MessageGroup objects.

localstack-core/localstack/services/sqs/provider.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,10 @@ def on_before_stop(self):
818818

819819
self._queue_update_worker.stop()
820820
self._message_move_task_manager.close()
821+
for _, _, store in sqs_stores.iter_stores():
822+
for queue in store.queues.values():
823+
queue.shutdown()
824+
821825
self._stop_cloudwatch_metrics_reporting()
822826

823827
@staticmethod
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import time
2+
from queue import Empty, PriorityQueue, Queue
3+
4+
5+
class InterruptibleQueue(Queue):
6+
# is_shutdown is used to check whether we have triggered a shutdown of the Queue
7+
is_shutdown: bool
8+
9+
def __init__(self, maxsize=0):
10+
super().__init__(maxsize)
11+
self.is_shutdown = False
12+
13+
def get(self, block=True, timeout=None):
14+
with self.not_empty:
15+
if not block:
16+
if not self._qsize():
17+
raise Empty
18+
elif timeout is None:
19+
while not self._qsize() and not self.is_shutdown: # additional shutdown check
20+
self.not_empty.wait()
21+
elif timeout < 0:
22+
raise ValueError("'timeout' must be a non-negative number")
23+
else:
24+
endtime = time.time() + timeout
25+
while not self._qsize() and not self.is_shutdown: # additional shutdown check
26+
remaining = endtime - time.time()
27+
if remaining <= 0.0:
28+
raise Empty
29+
self.not_empty.wait(remaining)
30+
if self.is_shutdown: # additional shutdown check
31+
raise Empty
32+
item = self._get()
33+
self.not_full.notify()
34+
return item
35+
36+
def shutdown(self):
37+
"""
38+
`shutdown` signals to stop all current and future `Queue.get` calls from executing.
39+
40+
This is helpful for exiting otherwise blocking calls early.
41+
"""
42+
with self.not_empty:
43+
self.is_shutdown = True
44+
self.not_empty.notify_all()
45+
46+
47+
class InterruptiblePriorityQueue(PriorityQueue, InterruptibleQueue):
48+
pass

tests/aws/services/sqs/test_sqs.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,21 @@ def test_send_receive_max_number_of_messages(self, sqs_queue, snapshot, aws_sqs_
249249

250250
snapshot.match("send_max_number_of_messages", e.value.response)
251251

252+
@markers.aws.validated
253+
def test_receive_empty_queue(self, sqs_queue, snapshot, aws_sqs_client):
254+
queue_url = sqs_queue
255+
256+
empty_short_poll_resp = aws_sqs_client.receive_message(
257+
QueueUrl=queue_url, MaxNumberOfMessages=1
258+
)
259+
260+
snapshot.match("empty_short_poll_resp", empty_short_poll_resp)
261+
262+
empty_long_poll_resp = aws_sqs_client.receive_message(
263+
QueueUrl=queue_url, MaxNumberOfMessages=1, WaitTimeSeconds=1
264+
)
265+
snapshot.match("empty_long_poll_resp", empty_long_poll_resp)
266+
252267
@markers.aws.validated
253268
def test_receive_message_attributes_timestamp_types(self, sqs_queue, aws_sqs_client):
254269
aws_sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="message")
@@ -1035,7 +1050,9 @@ def test_extend_message_visibility_timeout_set_in_queue(self, sqs_create_queue,
10351050
)
10361051
assert aws_sqs_client.receive_message(QueueUrl=queue_url).get("Messages", []) == []
10371052

1038-
messages = aws_sqs_client.receive_message(QueueUrl=queue_url, WaitTimeSeconds=5)["Messages"]
1053+
messages = aws_sqs_client.receive_message(QueueUrl=queue_url, WaitTimeSeconds=5).get(
1054+
"Messages", []
1055+
)
10391056
assert messages[0]["Body"] == "test"
10401057
assert len(messages) == 1
10411058

@@ -2287,7 +2304,7 @@ def test_publish_get_delete_message_batch(self, sqs_create_queue, aws_sqs_client
22872304
while len(result_recv) < message_count and i < message_count:
22882305
result = aws_sqs_client.receive_message(
22892306
QueueUrl=queue_url, MaxNumberOfMessages=message_count
2290-
)["Messages"]
2307+
).get("Messages", [])
22912308
if result:
22922309
result_recv.extend(result)
22932310
i += 1

tests/aws/services/sqs/test_sqs.snapshot.json

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3646,5 +3646,39 @@
36463646
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_set_empty_redrive_policy[sqs_query]": {
36473647
"recorded-date": "20-08-2024, 14:14:11",
36483648
"recorded-content": {}
3649+
},
3650+
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_receive_empty_queue[sqs]": {
3651+
"recorded-date": "30-01-2025, 22:32:45",
3652+
"recorded-content": {
3653+
"empty_short_poll_resp": {
3654+
"ResponseMetadata": {
3655+
"HTTPHeaders": {},
3656+
"HTTPStatusCode": 200
3657+
}
3658+
},
3659+
"empty_long_poll_resp": {
3660+
"ResponseMetadata": {
3661+
"HTTPHeaders": {},
3662+
"HTTPStatusCode": 200
3663+
}
3664+
}
3665+
}
3666+
},
3667+
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_receive_empty_queue[sqs_query]": {
3668+
"recorded-date": "30-01-2025, 22:32:48",
3669+
"recorded-content": {
3670+
"empty_short_poll_resp": {
3671+
"ResponseMetadata": {
3672+
"HTTPHeaders": {},
3673+
"HTTPStatusCode": 200
3674+
}
3675+
},
3676+
"empty_long_poll_resp": {
3677+
"ResponseMetadata": {
3678+
"HTTPHeaders": {},
3679+
"HTTPStatusCode": 200
3680+
}
3681+
}
3682+
}
36493683
}
36503684
}

tests/aws/services/sqs/test_sqs.validation.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,12 @@
203203
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_posting_to_fifo_requires_deduplicationid_group_id[sqs_query]": {
204204
"last_validated_date": "2024-04-30T13:34:22+00:00"
205205
},
206+
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_receive_empty_queue[sqs]": {
207+
"last_validated_date": "2025-01-30T22:32:45+00:00"
208+
},
209+
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_receive_empty_queue[sqs_query]": {
210+
"last_validated_date": "2025-01-30T22:32:48+00:00"
211+
},
206212
"tests/aws/services/sqs/test_sqs.py::TestSqsProvider::test_receive_message_attribute_names_filters[sqs]": {
207213
"last_validated_date": "2024-06-04T11:54:31+00:00"
208214
},

0 commit comments

Comments
 (0)