forked from RedisJSON/RedisJSON
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommon.py
44 lines (36 loc) · 1.35 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from contextlib import contextmanager
from includes import *
def _env_flag(name):
    """Return True only when environment variable *name* is exactly '1'."""
    return os.getenv(name, '0') == '1'


# Test-run configuration, read once from the environment at import time.
# SANITIZER is kept as a raw string (empty when unset); the rest are booleans
# that are on only when the corresponding variable equals '1'.
SANITIZER = os.getenv('SANITIZER', '')  # typically 'address' or 'memory'
VALGRIND = _env_flag('VALGRIND')
CODE_COVERAGE = _env_flag('CODE_COVERAGE')
MEMINFO = _env_flag('MEMINFO')
SERDE_JSON = _env_flag('SERDE_JSON')
@contextmanager
def TimeLimit(timeout):
    """Abort the enclosed ``with`` block if it runs longer than *timeout*.

    A one-shot ``ITIMER_REAL`` timer is armed for *timeout* seconds; when it
    fires, the SIGALRM handler raises ``Exception('TimeLimit timeout')`` inside
    the guarded block. On exit (normal or exceptional) the timer is cancelled
    and the SIGALRM handler that was active before entry is put back, so an
    outer handler (or an enclosing ``TimeLimit``) keeps working.

    Args:
        timeout: Time limit in seconds, passed straight to
            ``signal.setitimer``. A value of 0 arms no timer.

    Raises:
        Exception: with message ``'TimeLimit timeout'`` when the limit expires.

    Note:
        Relies on SIGALRM/``setitimer``, so it is Unix-only and, per the
        ``signal`` module rules, must be used from the main thread.
    """
    def handler(signum, frame):
        raise Exception('TimeLimit timeout')

    previous = signal.signal(signal.SIGALRM, handler)
    if previous is None:
        # The earlier handler was not installed from Python and cannot be
        # re-installed through signal.signal(); fall back to the default.
        previous = signal.SIG_DFL
    try:
        # Armed inside the try so a failure here still restores the handler.
        signal.setitimer(signal.ITIMER_REAL, timeout, 0)
        yield
    finally:
        # Cancel any pending alarm before swapping the handler back.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)
def envMem(env):
    """Report the memory footprint of the environment's master process.

    Looks up the pid of ``env.envRunner.masterProcess`` and queries psutil for
    its memory info.

    Returns:
        dict with keys ``'vsz'`` (psutil ``vms``) and ``'rss'`` (psutil
        ``rss``), each divided by 1024 -- i.e. KiB, given that psutil reports
        these fields in bytes.
    """
    master_pid = env.envRunner.masterProcess.pid
    usage = psutil.Process(master_pid).memory_info()
    return {'vsz': usage.vms / 1024, 'rss': usage.rss / 1024}
def checkEnvMem(env, expected_vsz=None, vsz0=0, threshold=0.1, title=None):
    """Measure the master process' virtual size and optionally assert on it.

    When the module-level MEMINFO flag is set, the ``Vm*`` lines of
    ``/proc/<pid>/status`` for the master process are printed first, preceded
    by a ``--- <title>`` line if *title* is given.

    Args:
        env: Test environment; must expose ``envRunner.masterProcess.pid`` and
            the ``assertGreater`` / ``assertLess`` methods.
        expected_vsz: Expected value of the measurement (same unit as
            ``envMem``'s ``'vsz'``). When None, no assertion is made.
        vsz0: Baseline subtracted from the measured ``'vsz'``.
        threshold: Allowed relative deviation from *expected_vsz*; the
            measurement must lie strictly within
            ``expected_vsz * (1 +/- threshold)``.
        title: Optional heading for the MEMINFO dump.

    Returns:
        The measured ``'vsz'`` minus *vsz0*.
    """
    if MEMINFO:
        if title is not None:
            print(f"--- {title}")
        pid = env.envRunner.masterProcess.pid
        print(paella.sh(f'cat /proc/{pid}/status | grep ^Vm', join=False))

    vsz = envMem(env)['vsz'] - vsz0
    if expected_vsz is None:
        return vsz

    # Both bounds are checked through the env so failures are reported by it.
    env.assertGreater(vsz, expected_vsz * (1 - threshold))
    env.assertLess(vsz, expected_vsz * (1 + threshold))
    return vsz