Skip to content

[SQS] Stop blocking Queue.get() call on LocalStack shutdown #12214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 4, 2025

Conversation

gregfurman
Copy link
Contributor

Motivation

Blocking calls in the SQS provider's internal queue implementation cause LocalStack shutdown to hang, and in some cases time-out.

This PR allows for these blocking Queue.get calls to exit early if LocalStack is shutting down and the do_shutdown method is called.

Changes

  • Add an InterruptibleQueue class that overwrites a Queue implementations get method to leverage a shutdown Threading.Event.
  • InterruptibleQueue also has a do_shutdown method that a queue to be shutdown early if it is currently undergoing a blocking call.
  • Add queue closing to the SQS provider's on_before_stop lifecycle hook (which happens after persistence) to ensure no hanging threads prevent LS from shutting down.

@gregfurman gregfurman added aws:sqs Amazon Simple Queue Service semver: patch Non-breaking changes which can be included in patch releases labels Jan 31, 2025
@gregfurman gregfurman requested a review from dfangl January 31, 2025 17:28
@gregfurman gregfurman self-assigned this Jan 31, 2025
Copy link

github-actions bot commented Jan 31, 2025

LocalStack Community integration with Pro

    2 files  ±    0      2 suites  ±0   1h 12m 5s ⏱️ - 41m 23s
2 990 tests  - 1 085  2 860 ✅  - 900  130 💤  - 185  0 ❌ ±0 
2 992 runs   - 1 085  2 860 ✅  - 900  132 💤  - 185  0 ❌ ±0 

Results for commit ca3b342. ± Comparison against base commit ca9275b.

This pull request removes 1087 and adds 2 tests. Note that renamed tests count towards both.
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_lambda_dynamodb
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_opensearch_crud
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_search_books
tests.aws.scenario.bookstore.test_bookstore.TestBookstoreApplication ‑ test_setup
tests.aws.scenario.kinesis_firehose.test_kinesis_firehose.TestKinesisFirehoseScenario ‑ test_kinesis_firehose_s3
tests.aws.scenario.lambda_destination.test_lambda_destination_scenario.TestLambdaDestinationScenario ‑ test_destination_sns
tests.aws.scenario.lambda_destination.test_lambda_destination_scenario.TestLambdaDestinationScenario ‑ test_infra
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_prefill_dynamodb_table
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_stepfunctions_input_recipient_list[step_function_input0-SUCCEEDED]
tests.aws.scenario.loan_broker.test_loan_broker.TestLoanBrokerScenario ‑ test_stepfunctions_input_recipient_list[step_function_input1-SUCCEEDED]
…
tests.aws.services.sqs.test_sqs.TestSqsProvider ‑ test_receive_empty_queue[sqs]
tests.aws.services.sqs.test_sqs.TestSqsProvider ‑ test_receive_empty_queue[sqs_query]

♻️ This comment has been updated with latest results.

@gregfurman gregfurman marked this pull request as ready for review January 31, 2025 20:42
Copy link
Member

@dfangl dfangl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already looks really nice, just some minor comments.
Could we test the shutdown behavior with a bootstrap test perhaps? Do you think that makes sense?

@@ -300,6 +301,9 @@ def __init__(self, name: str, region: str, account_id: str, attributes=None, tag
self.permissions = set()
self.mutex = threading.RLock()

def do_shutdown(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually we call do_shutdown from a method like shutdown, especially when inheriting from an implementation which has a more generic shutdown method.

Why did you choose this name here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Later versions of the python Queue already have a shutdown implementation so thought to distinguish this method from that. Also saw some other classes using it so assumed it was standard naming practice.

Will rename it to just shutdown -- but we should be cognizant that Queue.shutdown will be introduced in python 3.13+.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is the sqs queue implementation though, not our inherited one, right?

Copy link
Contributor Author

@gregfurman gregfurman Feb 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is the SQS model yeah (which contains an InterruptibleQueue attribute)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then I do not understand the implementation of the shutdown method in python 3.13 for a standard queue influencing this? The argument would fit for the InterruptibleQueue implementation, not this one, or do I misunderstand something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the shutdown signal is made to the SqsQueue object, I was wanting to distinguish the SqsQueue.shutdown from the SqsQueue.internal_queue.shutdown since a shutdown call would otherwise look like:

# StandardQueue(SqsQueue) 
def shutdown(self):
  self.queue.shutdown()

So the shutdown method is just us calling another interal shutdown method. Was hoping to distinguish these methods with the higher level abstraction calling a do_shutdown that (in turn) triggers the internal queue's shutdown.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, but I think it is fine in this case!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it to shutdown 🙂

Comment on lines +42 to +48
class InterruptiblePriorityQueue(PriorityQueue, InterruptibleQueue):
pass
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

Comment on lines 13 to 34
def get(self, block=True, timeout=None):
with self.not_empty:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize() and not self.shutdown_event.is_set():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time.time() + timeout
while not self._qsize() and not self.shutdown_event.is_set():
remaining = endtime - time.time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
if self.shutdown_event.is_set():
raise Empty
item = self._get()
self.not_full.notify()
return item
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would love a comment here to highlight the changes we made from the actual queue implementation, so basically highlighting the event changes.


def __init__(self, maxsize=0):
super().__init__(maxsize)
self.shutdown_event = threading.Event()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Come to think about it, this could be a boolean as well, we never wait for it. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The python 3.13 implementation actually has a boolean flag instead of a thread (see Queue.get). Suppose since we are locking on entry in anycase using a flag makes sense. Will change!

@gregfurman gregfurman requested a review from dfangl February 3, 2025 14:50
Copy link
Member

@dfangl dfangl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, the changes are relatively minimal, but we can now "abort" connections on shutdown!
Would love an -ext run though (especially of the sqs persistence tests) before merging this, just to make sure!

@gregfurman
Copy link
Contributor Author

@dfangl Am currently running an -ext test just targeting the SQS persistence tests. Also, will add a boostrap test in a follow-up

@gregfurman gregfurman force-pushed the fix/sqs/long-polling-shutdown branch from 3b034a3 to ca3b342 Compare February 3, 2025 20:44
@gregfurman gregfurman merged commit fd3d900 into master Feb 4, 2025
32 checks passed
@gregfurman gregfurman deleted the fix/sqs/long-polling-shutdown branch February 4, 2025 08:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
aws:sqs Amazon Simple Queue Service semver: patch Non-breaking changes which can be included in patch releases
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants