Skip to content

Commit ed6f7ec

Browse files
authored
Step Functions: Improve responsiveness on shutdown (#11596)
1 parent 6679ec9 commit ed6f7ec

File tree

9 files changed

+16
-9
lines changed

9 files changed

+16
-9
lines changed

localstack-core/localstack/services/stepfunctions/asl/component/program/program.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def _get_state(self, state_name: str) -> CommonStateField:
7272
def eval(self, env: Environment) -> None:
7373
timeout = self.timeout_seconds.timeout_seconds if self.timeout_seconds else None
7474
env.next_state_name = self.start_at.start_at_name
75-
worker_thread = threading.Thread(target=super().eval, args=(env,))
75+
worker_thread = threading.Thread(target=super().eval, args=(env,), daemon=True)
7676
TMP_THREADS.append(worker_thread)
7777
worker_thread.start()
7878
worker_thread.join(timeout=timeout)

localstack-core/localstack/services/stepfunctions/asl/component/state/state_execution/execute_state.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from localstack.services.stepfunctions.asl.component.state.state_props import StateProps
3636
from localstack.services.stepfunctions.asl.eval.environment import Environment
3737
from localstack.services.stepfunctions.asl.eval.event.event_detail import EventDetails
38+
from localstack.utils.common import TMP_THREADS
3839

3940
LOG = logging.getLogger(__name__)
4041

@@ -183,8 +184,10 @@ def _exec_and_notify():
183184
execution_exceptions.append(ex)
184185
terminated_event.set()
185186

186-
thread = Thread(target=_exec_and_notify)
187+
thread = Thread(target=_exec_and_notify, daemon=True)
188+
TMP_THREADS.append(thread)
187189
thread.start()
190+
188191
finished_on_time: bool = terminated_event.wait(timeout_seconds)
189192
frame.set_ended()
190193
env.close_frame(frame)
@@ -213,7 +216,7 @@ def _eval_state(self, env: Environment) -> None:
213216
env.context_object_manager.context_object["State"]["RetryCount"] = 0
214217

215218
# Attempt to evaluate the state's logic through until it's successful, caught, or retries have run out.
216-
while True:
219+
while env.is_running():
217220
try:
218221
self._evaluate_with_timeout(env)
219222
break

localstack-core/localstack/services/stepfunctions/asl/component/state/state_execution/state_map/iteration/inline_iteration_component.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def _create_worker(self, env: Environment) -> IterationWorker: ...
7777

7878
def _launch_worker(self, env: Environment) -> IterationWorker:
7979
worker = self._create_worker(env=env)
80-
worker_thread = threading.Thread(target=worker.eval)
80+
worker_thread = threading.Thread(target=worker.eval, daemon=True)
8181
TMP_THREADS.append(worker_thread)
8282
worker_thread.start()
8383
return worker

localstack-core/localstack/services/stepfunctions/asl/component/state/state_execution/state_map/state_map.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ def _eval_state(self, env: Environment) -> None:
240240
env.context_object_manager.context_object["State"]["RetryCount"] = 0
241241

242242
# Attempt to evaluate the state's logic through until it's successful, caught, or retries have run out.
243-
while True:
243+
while env.is_running():
244244
try:
245245
self._evaluate_with_timeout(env)
246246
break

localstack-core/localstack/services/stepfunctions/asl/component/state/state_execution/state_parallel/branch_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def start(self):
3838
raise RuntimeError(f"Attempted to rerun BranchWorker for program ${self._program}.")
3939

4040
self._worker_thread = threading.Thread(
41-
target=self._thread_routine, name=f"BranchWorker_${self._program}"
41+
target=self._thread_routine, name=f"BranchWorker_${self._program}", daemon=True
4242
)
4343
TMP_THREADS.append(self._worker_thread)
4444
self._worker_thread.start()

localstack-core/localstack/services/stepfunctions/asl/component/state/state_execution/state_parallel/state_parallel.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def _eval_state(self, env: Environment) -> None:
6565
input_value = copy.deepcopy(env.stack.pop())
6666

6767
# Attempt to evaluate the state's logic through until it's successful, caught, or retries have run out.
68-
while True:
68+
while env.is_running():
6969
try:
7070
env.stack.append(input_value)
7171
self._evaluate_with_timeout(env)

localstack-core/localstack/services/stepfunctions/asl/component/state/state_execution/state_task/service/state_task_service_callback.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ def _local_update_wait_for_task_token():
120120
thread_wait_for_task_token = threading.Thread(
121121
target=_local_update_wait_for_task_token,
122122
name=f"WaitForTaskToken_SyncTask_{self.resource.resource_arn}",
123+
daemon=True,
123124
)
124125
TMP_THREADS.append(thread_wait_for_task_token)
125126
thread_wait_for_task_token.start()

localstack-core/localstack/services/stepfunctions/asl/component/test_state/program/test_state_program.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def __init__(
3636

3737
def eval(self, env: TestStateEnvironment) -> None:
3838
env.next_state_name = self.test_state.name
39-
worker_thread = threading.Thread(target=super().eval, args=(env,))
39+
worker_thread = threading.Thread(target=super().eval, args=(env,), daemon=True)
4040
TMP_THREADS.append(worker_thread)
4141
worker_thread.start()
4242
worker_thread.join(timeout=TEST_CASE_EXECUTION_TIMEOUT_SECONDS)

localstack-core/localstack/services/stepfunctions/backend/execution_worker.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from localstack.services.stepfunctions.backend.execution_worker_comm import (
3131
ExecutionWorkerCommunication,
3232
)
33+
from localstack.utils.common import TMP_THREADS
3334

3435

3536
class ExecutionWorker:
@@ -104,7 +105,9 @@ def _execution_logic(self):
104105
self._exec_comm.terminated()
105106

106107
def start(self):
107-
Thread(target=self._execution_logic).start()
108+
execution_logic_thread = Thread(target=self._execution_logic, daemon=True)
109+
TMP_THREADS.append(execution_logic_thread)
110+
execution_logic_thread.start()
108111

109112
def stop(self, stop_date: datetime.datetime, error: Optional[str], cause: Optional[str]):
110113
self.env.set_stop(stop_date=stop_date, cause=cause, error=error)

0 commit comments

Comments
 (0)