@@ -199,7 +199,7 @@ def _assert_single_task(self, p, async=False):
199
199
200
200
# add a simple task
201
201
# it iterates n items
202
- ni = 5000
202
+ ni = 1000
203
203
assert ni % 2 == 0 , "ni needs to be dividable by 2"
204
204
assert ni % 4 == 0 , "ni needs to be dividable by 4"
205
205
@@ -382,18 +382,18 @@ def _assert_async_dependent_tasks(self, pool):
382
382
# includes failure in center task, 'recursive' orphan cleanup
383
383
# This will also verify that the channel-close mechanism works
384
384
# t1 -> t2 -> t3
385
-
385
+
386
386
print >> sys .stderr , "Threadpool: starting async dependency test in %i threads" % pool .size ()
387
387
null_tasks = pool .num_tasks ()
388
- ni = 5000
388
+ ni = 1000
389
389
count = 3
390
390
aic = count + 2
391
391
make_task = lambda * args , ** kwargs : self ._add_task_chain (pool , ni , count , * args , ** kwargs )
392
+
392
393
ts , rcs = make_task ()
393
394
assert len (ts ) == aic
394
395
assert len (rcs ) == aic
395
396
assert pool .num_tasks () == null_tasks + len (ts )
396
- print pool ._tasks .nodes
397
397
398
398
# read(0)
399
399
#########
@@ -407,9 +407,6 @@ def _assert_async_dependent_tasks(self, pool):
407
407
# wait a tiny moment - there could still be something unprocessed on the
408
408
# queue, increasing the refcount
409
409
time .sleep (0.15 )
410
- import gc
411
- print gc .get_referrers (ts [- 1 ])
412
- print len (pool ._queue )
413
410
assert sys .getrefcount (ts [- 1 ]) == 2 # ts + call
414
411
assert sys .getrefcount (ts [0 ]) == 2 # ts + call
415
412
print >> sys .stderr , "Dependent Tasks: evaluated %i items of %i dependent in %f s ( %i items / s )" % (ni , aic , elapsed , ni / elapsed )
@@ -467,15 +464,15 @@ def _assert_async_dependent_tasks(self, pool):
467
464
items = rcs [- 1 ].read ()
468
465
assert len (items ) == fail_after
469
466
470
-
467
+
471
468
# MULTI-POOL
472
469
# If two pools are connected, this shold work as well.
473
470
# The second one has just one more thread
474
471
ts , rcs = make_task ()
475
472
476
473
# connect verifier channel as feeder of the second pool
477
- p2 = ThreadPool (1 )
478
- assert p2 .size () == 1
474
+ p2 = ThreadPool (0 ) # don't spawn new threads, they have the tendency not to wake up on mutexes
475
+ assert p2 .size () == 0
479
476
p2ts , p2rcs = self ._add_task_chain (p2 , ni , count , feeder_channel = rcs [- 1 ], id_offset = count )
480
477
assert p2ts [0 ] is None # we have no feeder task
481
478
assert rcs [- 1 ].pool_ref ()() is pool # it didnt change the pool
@@ -501,14 +498,8 @@ def _assert_async_dependent_tasks(self, pool):
501
498
502
499
503
500
del (ts )
504
- print "del rcs"
505
- print rcs [- 1 ]
506
- print sys .getrefcount (rcs [- 1 ])
507
501
del (rcs )
508
- # TODO: make this work - something with the refcount goes wrong,
509
- # they never get cleaned up properly
510
- ts = pool ._tasks .nodes
511
- print pool .num_tasks ()
502
+
512
503
assert pool .num_tasks () == null_tasks
513
504
514
505
@@ -585,7 +576,6 @@ def test_base(self):
585
576
# step one gear up - just one thread for now.
586
577
p .set_size (1 )
587
578
assert p .size () == 1
588
- print len (threading .enumerate ())
589
579
assert len (threading .enumerate ()) == num_threads + 1
590
580
# deleting the pool stops its threads - just to be sure ;)
591
581
# Its not synchronized, hence we wait a moment
0 commit comments