-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
[SQS] Stop blocking Queue.get() call on LocalStack shutdown #12214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
LocalStack Community integration with Pro 2 files ± 0 2 suites ±0 1h 12m 5s ⏱️ - 41m 23s Results for commit ca3b342. ± Comparison against base commit ca9275b. This pull request removes 1087 and adds 2 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already looks really nice, just some minor comments.
Could we test the shutdown behavior with a bootstrap test perhaps? Do you think that makes sense?
@@ -300,6 +301,9 @@ def __init__(self, name: str, region: str, account_id: str, attributes=None, tag | |||
self.permissions = set() | |||
self.mutex = threading.RLock() | |||
|
|||
def do_shutdown(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Usually we call do_shutdown
from a method like shutdown
, especially when inheriting from an implementation which has a more generic shutdown
method.
Why did you choose this name here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Later versions of the python Queue
already have a shutdown implementation so thought to distinguish this method from that. Also saw some other classes using it so assumed it was standard naming practice.
Will rename it to just shutdown
-- but we should be cognizant that Queue.shutdown
will be introduced in python 3.13+.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is the sqs queue implementation though, not our inherited one, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is the SQS model yeah (which contains an InterruptibleQueue
attribute)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then I do not understand the implementation of the shutdown
method in python 3.13 for a standard queue influencing this? The argument would fit for the InterruptibleQueue implementation, not this one, or do I misunderstand something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the shutdown signal is made to the SqsQueue
object, I was wanting to distinguish the SqsQueue.shutdown
from the SqsQueue.internal_queue.shutdown
since a shutdown
call would otherwise look like:
# StandardQueue(SqsQueue)
def shutdown(self):
self.queue.shutdown()
So the shutdown
method is just us calling another interal shutdown
method. Was hoping to distinguish these methods with the higher level abstraction calling a do_shutdown
that (in turn) triggers the internal queue's shutdown
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enough, but I think it is fine in this case!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed it to shutdown
🙂
class InterruptiblePriorityQueue(PriorityQueue, InterruptibleQueue): | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
def get(self, block=True, timeout=None): | ||
with self.not_empty: | ||
if not block: | ||
if not self._qsize(): | ||
raise Empty | ||
elif timeout is None: | ||
while not self._qsize() and not self.shutdown_event.is_set(): | ||
self.not_empty.wait() | ||
elif timeout < 0: | ||
raise ValueError("'timeout' must be a non-negative number") | ||
else: | ||
endtime = time.time() + timeout | ||
while not self._qsize() and not self.shutdown_event.is_set(): | ||
remaining = endtime - time.time() | ||
if remaining <= 0.0: | ||
raise Empty | ||
self.not_empty.wait(remaining) | ||
if self.shutdown_event.is_set(): | ||
raise Empty | ||
item = self._get() | ||
self.not_full.notify() | ||
return item |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would love a comment here to highlight the changes we made from the actual queue implementation, so basically highlighting the event changes.
|
||
def __init__(self, maxsize=0): | ||
super().__init__(maxsize) | ||
self.shutdown_event = threading.Event() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Come to think about it, this could be a boolean as well, we never wait for it. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The python 3.13 implementation actually has a boolean flag instead of a thread (see Queue.get). Suppose since we are locking on entry in anycase using a flag makes sense. Will change!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, the changes are relatively minimal, but we can now "abort" connections on shutdown!
Would love an -ext run though (especially of the sqs persistence tests) before merging this, just to make sure!
@dfangl Am currently running an -ext test just targeting the SQS persistence tests. Also, will add a boostrap test in a follow-up |
3b034a3
to
ca3b342
Compare
Motivation
Blocking calls in the SQS provider's internal queue implementation cause LocalStack shutdown to hang, and in some cases time-out.
This PR allows for these blocking
Queue.get
calls to exit early if LocalStack is shutting down and thedo_shutdown
method is called.Changes
InterruptibleQueue
class that overwrites aQueue
implementationsget
method to leverage a shutdownThreading.Event
.InterruptibleQueue
also has ado_shutdown
method that a queue to be shutdown early if it is currently undergoing a blocking call.on_before_stop
lifecycle hook (which happens after persistence) to ensure no hanging threads prevent LS from shutting down.