Skip to content

Commit 3b034a3

Browse files
committed
Rename shutdown method and highlight custom implementation
1 parent 7b34a30 commit 3b034a3

File tree

3 files changed

+16
-11
lines changed

3 files changed

+16
-11
lines changed

localstack-core/localstack/services/sqs/models.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ def __init__(self, name: str, region: str, account_id: str, attributes=None, tag
301301
self.permissions = set()
302302
self.mutex = threading.RLock()
303303

304-
def do_shutdown(self):
304+
def shutdown(self):
305305
pass
306306

307307
def default_attributes(self) -> QueueAttributeMap:
@@ -739,7 +739,7 @@ def clear(self):
739739
def approx_number_of_messages(self):
740740
return self.visible.qsize()
741741

742-
def do_shutdown(self):
742+
def shutdown(self):
743743
self.visible.shutdown()
744744

745745
def put(
@@ -966,7 +966,7 @@ def approx_number_of_messages(self):
966966
n += len(message_group.messages)
967967
return n
968968

969-
def do_shutdown(self):
969+
def shutdown(self):
970970
self.message_group_queue.shutdown()
971971

972972
def get_message_group(self, message_group_id: str) -> MessageGroup:

localstack-core/localstack/services/sqs/provider.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -820,7 +820,7 @@ def on_before_stop(self):
820820
self._message_move_task_manager.close()
821821
for _, _, store in sqs_stores.iter_stores():
822822
for queue in store.queues.values():
823-
queue.do_shutdown()
823+
queue.shutdown()
824824

825825
self._stop_cloudwatch_metrics_reporting()
826826

localstack-core/localstack/services/sqs/queue.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,46 @@
1-
import threading
21
import time
32
from queue import Empty, PriorityQueue, Queue
43

54

65
class InterruptibleQueue(Queue):
7-
shutdown_event: threading.Event
6+
# is_shutdown is used to check whether we have triggered a shutdown of the Queue
7+
is_shutdown: bool
88

99
def __init__(self, maxsize=0):
1010
super().__init__(maxsize)
11-
self.shutdown_event = threading.Event()
11+
self.is_shutdown = False
1212

1313
def get(self, block=True, timeout=None):
1414
with self.not_empty:
1515
if not block:
1616
if not self._qsize():
1717
raise Empty
1818
elif timeout is None:
19-
while not self._qsize() and not self.shutdown_event.is_set():
19+
while not self._qsize() and not self.is_shutdown: # additional shutdown check
2020
self.not_empty.wait()
2121
elif timeout < 0:
2222
raise ValueError("'timeout' must be a non-negative number")
2323
else:
2424
endtime = time.time() + timeout
25-
while not self._qsize() and not self.shutdown_event.is_set():
25+
while not self._qsize() and not self.is_shutdown: # additional shutdown check
2626
remaining = endtime - time.time()
2727
if remaining <= 0.0:
2828
raise Empty
2929
self.not_empty.wait(remaining)
30-
if self.shutdown_event.is_set():
30+
if self.is_shutdown: # additional shutdown check
3131
raise Empty
3232
item = self._get()
3333
self.not_full.notify()
3434
return item
3535

3636
def shutdown(self):
37-
self.shutdown_event.set()
37+
"""
38+
`shutdown` signals to stop all current and future `Queue.get` calls from executing.
39+
40+
This is helpful for exiting otherwise blocking calls early.
41+
"""
3842
with self.not_empty:
43+
self.is_shutdown = True
3944
self.not_empty.notify_all()
4045

4146

0 commit comments

Comments
 (0)