Skip to content

Commit 7ee127e

Browse files
committed
Attmept to use cross process critical section
1 parent 10c3f7d commit 7ee127e

File tree

2 files changed

+74
-37
lines changed

2 files changed

+74
-37
lines changed

localstack-core/localstack/testing/pytest/in_memory_localstack.py

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ def pytest_configure(config):
3535
_started = threading.Event()
3636

3737

38-
critical_section = CrossProcessCriticalSection()
39-
40-
4138
def pytest_addoption(parser: Parser, pluginmanager: PytestPluginManager):
4239
parser.addoption(
4340
"--start-localstack",
@@ -79,14 +76,25 @@ def pytest_runtestloop(session: Session):
7976

8077
from localstack.runtime import current
8178

82-
_started.set()
83-
runtime = current.initialize_runtime()
84-
# start runtime asynchronously
85-
threading.Thread(target=runtime.run).start()
79+
def work():
80+
_started.set()
81+
runtime = current.initialize_runtime()
82+
83+
# start runtime asynchronously
84+
def target():
85+
try:
86+
runtime.run()
87+
except Exception:
88+
LOG.warning("error starting runtime", exc_info=LOG.isEnabledFor(logging.DEBUG))
89+
90+
threading.Thread(target=target).start()
8691

87-
# wait for runtime to be ready
88-
if not runtime.ready.wait(timeout=120):
89-
raise TimeoutError("gave up waiting for runtime to be ready")
92+
# wait for runtime to be ready
93+
if not runtime.ready.wait(timeout=120):
94+
raise TimeoutError("gave up waiting for runtime to be ready")
95+
96+
critical_section = CrossProcessCriticalSection("runtime.start", work)
97+
critical_section.run_once()
9098

9199

92100
@pytest.hookimpl(trylast=True)
@@ -103,9 +111,16 @@ def pytest_sessionfinish(session: Session):
103111
LOG.warning("Could not access the current runtime in a pytest sessionfinish hook.")
104112
return
105113

106-
get_current_runtime().shutdown()
107-
LOG.info("waiting for runtime to stop")
114+
def work():
115+
get_current_runtime().shutdown()
116+
LOG.info("waiting for runtime to stop")
117+
118+
# wait for runtime to shut down
119+
if not get_current_runtime().stopped.wait(timeout=20):
120+
LOG.warning("gave up waiting for runtime to stop, returning anyway")
108121

109-
# wait for runtime to shut down
110-
if not get_current_runtime().stopped.wait(timeout=20):
111-
LOG.warning("gave up waiting for runtime to stop, returning anyway")
122+
critical_section = CrossProcessCriticalSection("runtime.shutdown", work)
123+
try:
124+
critical_section.run_once()
125+
except Exception:
126+
LOG.warning("error shutting runtime down", exc_info=LOG.isEnabledFor(logging.DEBUG))
Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,73 @@
11
import atexit
2+
import logging
23
import os
4+
import time
35
from pathlib import Path
46
from typing import Callable
57

68
from filelock import FileLock
79

8-
LOCKFILE = Path("/tmp/my_task.lock")
9-
DONEFILE = Path("/tmp/my_task.done")
10-
PIDFILE = Path("/tmp/my_task.pids")
11-
LOCK_TIMEOUT = 60 # seconds
10+
from localstack.logging.setup import setup_logging_from_config
11+
from localstack.utils.functions import run_safe
12+
13+
LOG = logging.getLogger(__name__)
14+
logging.getLogger("filelock").setLevel(logging.WARNING)
1215

1316

1417
class CrossProcessCriticalSection:
15-
def __init__(self, work: Callable[[], None] | None = None):
18+
def __init__(self, name: str, work: Callable[[], None]):
19+
self.name = name
1620
self.work = work
17-
self.pid = os.getpid()
21+
self.lockfile = Path(f"/tmp/my_task.{name}.lock")
22+
self.donefile = Path(f"/tmp/my_task.{name}.done")
23+
self.lock_timeout = 60 # seconds
1824

19-
def _log(self, message: str):
20-
print(f"[{self.pid}]: {message}")
25+
self.pid = os.getpid()
2126

2227
def run_once(self):
23-
if not self.work:
24-
raise RuntimeError("Work not defined")
25-
26-
self._log("waiting for file lock")
27-
with FileLock(str(LOCKFILE), timeout=LOCK_TIMEOUT):
28-
self._log("got file lock")
29-
if DONEFILE.is_file():
30-
self._log("donefile exists, exiting critical section")
28+
self._log("waiting for lock")
29+
with FileLock(str(self.lockfile), timeout=self.lock_timeout):
30+
self._log("lock achieved")
31+
if self.donefile.is_file():
32+
self._log("work already complete")
3133
# we are not the first process so end this critical section
3234
return
3335

3436
# we are the first to reach this critical section, so make sure the done file does not exist
3537
# and run the work
36-
self._log("donefile does not exist, registering cleanup")
38+
self._log("registering atexit")
3739
atexit.register(self.cleanup)
3840

39-
self._log("adding donefile")
40-
with DONEFILE.open("w") as outfile:
41+
with self.donefile.open("w") as outfile:
4142
outfile.write("1")
4243

43-
self._log("performing work")
44+
self._log("starting work")
4445
self.work()
4546
self._log("work finished")
4647

47-
self._log("ended critical section")
48+
run_safe(lambda: os.remove(self.lockfile))
4849

4950
def cleanup(self):
5051
self._log("cleaning up")
51-
os.remove(DONEFILE)
52+
for path in [self.lockfile, self.donefile]:
53+
run_safe(lambda: os.remove(path))
54+
55+
def _log(self, message: str):
56+
LOG.debug("[%s:%d] %s", self.name, self.pid, message)
57+
58+
59+
def __main__():
60+
setup_logging_from_config()
61+
62+
def work():
63+
print("Should only run once")
64+
time.sleep(10)
65+
66+
cpcs = CrossProcessCriticalSection(work)
67+
cpcs.run_once()
68+
69+
time.sleep(5)
70+
71+
72+
if __name__ == "__main__":
73+
__main__()

0 commit comments

Comments
 (0)