|
1 | 1 | import atexit
|
| 2 | +import logging |
2 | 3 | import os
|
| 4 | +import time |
3 | 5 | from pathlib import Path
|
4 | 6 | from typing import Callable
|
5 | 7 |
|
6 | 8 | from filelock import FileLock
|
7 | 9 |
|
8 |
| -LOCKFILE = Path("/tmp/my_task.lock") |
9 |
| -DONEFILE = Path("/tmp/my_task.done") |
10 |
| -PIDFILE = Path("/tmp/my_task.pids") |
11 |
| -LOCK_TIMEOUT = 60 # seconds |
| 10 | +from localstack.logging.setup import setup_logging_from_config |
| 11 | +from localstack.utils.functions import run_safe |
| 12 | + |
| 13 | +LOG = logging.getLogger(__name__) |
| 14 | +logging.getLogger("filelock").setLevel(logging.WARNING) |
12 | 15 |
|
13 | 16 |
|
14 | 17 | class CrossProcessCriticalSection:
|
15 |
| - def __init__(self, work: Callable[[], None] | None = None): |
| 18 | + def __init__(self, name: str, work: Callable[[], None]): |
| 19 | + self.name = name |
16 | 20 | self.work = work
|
17 |
| - self.pid = os.getpid() |
| 21 | + self.lockfile = Path(f"/tmp/my_task.{name}.lock") |
| 22 | + self.donefile = Path(f"/tmp/my_task.{name}.done") |
| 23 | + self.lock_timeout = 60 # seconds |
18 | 24 |
|
19 |
| - def _log(self, message: str): |
20 |
| - print(f"[{self.pid}]: {message}") |
| 25 | + self.pid = os.getpid() |
21 | 26 |
|
22 | 27 | def run_once(self):
|
23 |
| - if not self.work: |
24 |
| - raise RuntimeError("Work not defined") |
25 |
| - |
26 |
| - self._log("waiting for file lock") |
27 |
| - with FileLock(str(LOCKFILE), timeout=LOCK_TIMEOUT): |
28 |
| - self._log("got file lock") |
29 |
| - if DONEFILE.is_file(): |
30 |
| - self._log("donefile exists, exiting critical section") |
| 28 | + self._log("waiting for lock") |
| 29 | + with FileLock(str(self.lockfile), timeout=self.lock_timeout): |
| 30 | + self._log("lock achieved") |
| 31 | + if self.donefile.is_file(): |
| 32 | + self._log("work already complete") |
31 | 33 | # we are not the first process so end this critical section
|
32 | 34 | return
|
33 | 35 |
|
34 | 36 | # we are the first to reach this critical section, so make sure the done file does not exist
|
35 | 37 | # and run the work
|
36 |
| - self._log("donefile does not exist, registering cleanup") |
| 38 | + self._log("registering atexit") |
37 | 39 | atexit.register(self.cleanup)
|
38 | 40 |
|
39 |
| - self._log("adding donefile") |
40 |
| - with DONEFILE.open("w") as outfile: |
| 41 | + with self.donefile.open("w") as outfile: |
41 | 42 | outfile.write("1")
|
42 | 43 |
|
43 |
| - self._log("performing work") |
| 44 | + self._log("starting work") |
44 | 45 | self.work()
|
45 | 46 | self._log("work finished")
|
46 | 47 |
|
47 |
| - self._log("ended critical section") |
| 48 | + run_safe(lambda: os.remove(self.lockfile)) |
48 | 49 |
|
49 | 50 | def cleanup(self):
|
50 | 51 | self._log("cleaning up")
|
51 |
| - os.remove(DONEFILE) |
| 52 | + for path in [self.lockfile, self.donefile]: |
| 53 | + run_safe(lambda: os.remove(path)) |
| 54 | + |
| 55 | + def _log(self, message: str): |
| 56 | + LOG.debug("[%s:%d] %s", self.name, self.pid, message) |
| 57 | + |
| 58 | + |
| 59 | +def __main__(): |
| 60 | + setup_logging_from_config() |
| 61 | + |
| 62 | + def work(): |
| 63 | + print("Should only run once") |
| 64 | + time.sleep(10) |
| 65 | + |
| 66 | + cpcs = CrossProcessCriticalSection(work) |
| 67 | + cpcs.run_once() |
| 68 | + |
| 69 | + time.sleep(5) |
| 70 | + |
| 71 | + |
| 72 | +if __name__ == "__main__": |
| 73 | + __main__() |
0 commit comments