Skip to content

Commit 54c63fb

Browse files
committed
Adds funtests/stress: stresstests for multisock
1 parent b9489fc commit 54c63fb

File tree

2 files changed

+152
-0
lines changed

2 files changed

+152
-0
lines changed

celery/worker/loops.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ def on_task_received(body, message):
9292
while connection.more_to_read:
9393
try:
9494
events = poll(poll_timeout)
95+
#print('EVENTS: %r' % (hub.repr_events(events), ))
9596
except ValueError: # Issue 882
9697
return
9798
if not events:

funtests/stress/stress.py

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
from __future__ import absolute_import
2+
3+
import random
4+
import os
5+
import signal
6+
import sys
7+
8+
from time import time, sleep
9+
10+
from celery import Celery, group
11+
from celery.exceptions import TimeoutError
12+
from celery.five import range
13+
from celery.utils.debug import blockdetection
14+
15+
# Should be run with workers running using these options:
16+
#
17+
# 1) celery -A stress worker -c 1 --maxtasksperchild=1
18+
# 2) celery -A stress worker -c 8 --maxtasksperchild=1
19+
#
20+
# 3) celery -A stress worker -c 1
21+
# 4) celery -A stress worker -c 8
22+
#
23+
# 5) celery -A stress worker --autoscale=8,0
24+
#
25+
# 6) celery -A stress worker --time-limit=1
26+
#
27+
# 7) celery -A stress worker -c1 --maxtasksperchild=1 -- celery.acks_late=1
28+
29+
BIG = 'x' * 2 ** 20 * 8
30+
SMALL = 'e' * 1024
31+
32+
celery = Celery(
33+
'stress', broker='amqp://', backend='redis://',
34+
set_as_current=False,
35+
)
36+
37+
38+
@celery.task
39+
def add(x, y):
40+
return x + y
41+
42+
43+
@celery.task
44+
def any_(*args, **kwargs):
45+
wait = kwargs.get('sleep')
46+
if wait:
47+
sleep(wait)
48+
49+
50+
@celery.task
51+
def exiting(status=0):
52+
sys.exit(status)
53+
54+
55+
@celery.task
56+
def kill(sig=signal.SIGKILL):
57+
os.kill(os.getpid(), sig)
58+
59+
60+
@celery.task
61+
def segfault():
62+
import ctypes
63+
ctypes.memset(0, 0, 1)
64+
assert False, 'should not get here'
65+
66+
67+
class Stresstests(object):
68+
69+
def __init__(self, app, block_timeout=30 * 60):
70+
self.app = app
71+
self.connerrors = self.app.connection().recoverable_connection_errors
72+
self.block_timeout = block_timeout
73+
74+
def run(self, n=50):
75+
tests = [self.manyshort,
76+
self.termbysig,
77+
self.bigtasks,
78+
self.smalltasks,
79+
self.revoketermfast,
80+
self.revoketermslow]
81+
for test in tests:
82+
self.runtest(test, n)
83+
84+
def manyshort(self):
85+
self.join(group(add.s(i, i) for i in xrange(1000))())
86+
87+
def runtest(self, fun, n=50):
88+
with blockdetection(self.block_timeout):
89+
t = time()
90+
i = 0
91+
failed = False
92+
print('-%s(%s)' % (fun.__name__, n))
93+
try:
94+
for i in range(n):
95+
print(i)
96+
fun()
97+
except Exception:
98+
failed = True
99+
raise
100+
finally:
101+
print('{0} {1} iterations in {2}s'.format(
102+
'failed after' if failed else 'completed', i, time() - t
103+
))
104+
105+
def termbysig(self):
106+
self._evil_groupmember(kill)
107+
108+
def termbysegfault(self):
109+
self._evil_groupmember(segfault)
110+
111+
def _evil_groupmember(self, evil_t):
112+
g1 = group(add.s(2, 2), evil_t.s(), add.s(4, 4), add.s(8, 8))
113+
g2 = group(add.s(3, 3), add.s(5, 5), evil_t.s(), add.s(7, 7))
114+
self.join(g1(), timeout=10)
115+
self.join(g2(), timeout=10)
116+
117+
def bigtasks(self, wait=None):
118+
self._revoketerm(wait, False, False, BIG)
119+
120+
def smalltasks(self, wait=None):
121+
self._revoketerm(wait, False, False, SMALL)
122+
123+
def revoketermfast(self, wait=None):
124+
self._revoketerm(wait, True, False, SMALL)
125+
126+
def revoketermslow(self, wait=5):
127+
self._revoketerm(wait, True, True, BIG)
128+
129+
def _revoketerm(self, wait=None, terminate=True,
130+
joindelay=True, data=BIG):
131+
g = group(any_.s(data, sleep=wait) for i in range(8))
132+
r = g()
133+
if terminate:
134+
if joindelay:
135+
sleep(random.choice(range(4)))
136+
r.revoke(terminate=True)
137+
self.join(r, timeout=100)
138+
139+
def join(self, r, **kwargs):
140+
while 1:
141+
try:
142+
return r.get(propagate=False, **kwargs)
143+
except TimeoutError as exc:
144+
print('join timed out: %s' % (exc, ))
145+
except self.connerrors as exc:
146+
print('join: connection lost: %r' % (exc, ))
147+
148+
149+
if __name__ == '__main__':
150+
s = Stresstests(celery)
151+
s.run()

0 commit comments

Comments
 (0)