Skip to content

Commit cf525eb

Browse files
authored
Merge pull request rails#38893 from leequarella/parallelization
Improve ActiveSupport Parallelization error logging
2 parents 3053e54 + 5a35738 commit cf525eb

File tree

4 files changed

+191
-107
lines changed

4 files changed

+191
-107
lines changed

activesupport/lib/active_support/testing/parallelization.rb

Lines changed: 12 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,12 @@
33
require "drb"
44
require "drb/unix" unless Gem.win_platform?
55
require "active_support/core_ext/module/attribute_accessors"
6+
require "active_support/testing/parallelization/server"
7+
require "active_support/testing/parallelization/worker"
68

79
module ActiveSupport
810
module Testing
911
class Parallelization # :nodoc:
10-
class Server
11-
include DRb::DRbUndumped
12-
13-
def initialize
14-
@queue = Queue.new
15-
end
16-
17-
def record(reporter, result)
18-
raise DRb::DRbConnError if result.is_a?(DRb::DRbUnknown)
19-
20-
reporter.synchronize do
21-
reporter.record(result)
22-
end
23-
end
24-
25-
def <<(o)
26-
o[2] = DRbObject.new(o[2]) if o
27-
@queue << o
28-
end
29-
30-
def length
31-
@queue.length
32-
end
33-
34-
def pop; @queue.pop; end
35-
end
36-
3712
@@after_fork_hooks = []
3813

3914
def self.after_fork_hook(&blk)
@@ -50,96 +25,27 @@ def self.run_cleanup_hook(&blk)
5025

5126
cattr_reader :run_cleanup_hooks
5227

53-
def initialize(queue_size)
54-
@queue_size = queue_size
55-
@queue = Server.new
56-
@pool = []
57-
58-
@url = DRb.start_service("drbunix:", @queue).uri
59-
end
60-
61-
def after_fork(worker)
62-
self.class.after_fork_hooks.each do |cb|
63-
cb.call(worker)
64-
end
65-
end
66-
67-
def run_cleanup(worker)
68-
self.class.run_cleanup_hooks.each do |cb|
69-
cb.call(worker)
70-
end
28+
def initialize(worker_count)
29+
@worker_count = worker_count
30+
@queue_server = Server.new
31+
@worker_pool = []
32+
@url = DRb.start_service("drbunix:", @queue_server).uri
7133
end
7234

7335
def start
74-
@pool = @queue_size.times.map do |worker|
75-
title = "Rails test worker #{worker}"
76-
77-
fork do
78-
Process.setproctitle("#{title} - (starting)")
79-
80-
DRb.stop_service
81-
82-
begin
83-
after_fork(worker)
84-
rescue => setup_exception; end
85-
86-
queue = DRbObject.new_with_uri(@url)
87-
88-
while job = queue.pop
89-
klass = job[0]
90-
method = job[1]
91-
reporter = job[2]
92-
93-
Process.setproctitle("#{title} - #{klass}##{method}")
94-
95-
result = klass.with_info_handler reporter do
96-
Minitest.run_one_method(klass, method)
97-
end
98-
99-
add_setup_exception(result, setup_exception) if setup_exception
100-
101-
begin
102-
queue.record(reporter, result)
103-
rescue DRb::DRbConnError
104-
result.failures.map! do |failure|
105-
if failure.respond_to?(:error)
106-
# minitest >5.14.0
107-
error = DRb::DRbRemoteError.new(failure.error)
108-
else
109-
error = DRb::DRbRemoteError.new(failure.exception)
110-
end
111-
Minitest::UnexpectedError.new(error)
112-
end
113-
queue.record(reporter, result)
114-
end
115-
116-
Process.setproctitle("#{title} - (idle)")
117-
end
118-
ensure
119-
Process.setproctitle("#{title} - (stopping)")
120-
121-
run_cleanup(worker)
122-
end
36+
@worker_pool = @worker_count.times.map do |worker|
37+
Worker.new(worker, @url).start
12338
end
12439
end
12540

12641
def <<(work)
127-
@queue << work
42+
@queue_server << work
12843
end
12944

13045
def shutdown
131-
@queue_size.times { @queue << nil }
132-
@pool.each { |pid| Process.waitpid pid }
133-
134-
if @queue.length > 0
135-
raise "Queue not empty, but all workers have finished. This probably means that a worker crashed and #{@queue.length} tests were missed."
136-
end
46+
@queue_server.shutdown
47+
@worker_pool.each { |pid| Process.waitpid pid }
13748
end
138-
139-
private
140-
def add_setup_exception(result, setup_exception)
141-
result.failures.prepend Minitest::UnexpectedError.new(setup_exception)
142-
end
14349
end
14450
end
14551
end
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# frozen_string_literal: true
2+
3+
require "drb"
4+
require "drb/unix" unless Gem.win_platform?
5+
6+
module ActiveSupport
7+
module Testing
8+
class Parallelization # :nodoc:
9+
class Server
10+
include DRb::DRbUndumped
11+
12+
def initialize
13+
@queue = Queue.new
14+
@active_workers = Concurrent::Map.new
15+
@in_flight = Concurrent::Map.new
16+
end
17+
18+
def record(reporter, result)
19+
raise DRb::DRbConnError if result.is_a?(DRb::DRbUnknown)
20+
21+
@in_flight.delete([result.klass, result.name])
22+
23+
reporter.synchronize do
24+
reporter.record(result)
25+
end
26+
end
27+
28+
def <<(o)
29+
o[2] = DRbObject.new(o[2]) if o
30+
@queue << o
31+
end
32+
33+
def pop
34+
if test = @queue.pop
35+
@in_flight[[test[0].to_s, test[1]]] = test
36+
test
37+
end
38+
end
39+
40+
def start_worker(worker_id)
41+
@active_workers[worker_id] = true
42+
end
43+
44+
def stop_worker(worker_id)
45+
@active_workers.delete(worker_id)
46+
end
47+
48+
def active_workers?
49+
@active_workers.size > 0
50+
end
51+
52+
def shutdown
53+
# Wait for initial queue to drain
54+
while @queue.length != 0
55+
sleep 0.1
56+
end
57+
58+
@queue.close
59+
60+
# Wait until all workers have finished
61+
while active_workers?
62+
sleep 0.1
63+
end
64+
65+
@in_flight.values.each do |(klass, name, reporter)|
66+
result = Minitest::Result.from(klass.new(name))
67+
error = RuntimeError.new("result not reported")
68+
error.set_backtrace([""])
69+
result.failures << Minitest::UnexpectedError.new(error)
70+
reporter.synchronize do
71+
reporter.record(result)
72+
end
73+
end
74+
end
75+
end
76+
end
77+
end
78+
end
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
# frozen_string_literal: true
2+
3+
module ActiveSupport
4+
module Testing
5+
class Parallelization # :nodoc:
6+
class Worker
7+
def initialize(number, url)
8+
@id = SecureRandom.uuid
9+
@number = number
10+
@url = url
11+
@setup_exception = nil
12+
end
13+
14+
def start
15+
fork do
16+
set_process_title("(starting)")
17+
18+
DRb.stop_service
19+
20+
@queue = DRbObject.new_with_uri(@url)
21+
@queue.start_worker(@id)
22+
23+
begin
24+
after_fork
25+
rescue => @setup_exception; end
26+
27+
work_from_queue
28+
ensure
29+
set_process_title("(stopping)")
30+
31+
run_cleanup
32+
@queue.stop_worker(@id)
33+
end
34+
end
35+
36+
def work_from_queue
37+
while job = @queue.pop
38+
perform_job(job)
39+
end
40+
end
41+
42+
def perform_job(job)
43+
klass = job[0]
44+
method = job[1]
45+
reporter = job[2]
46+
47+
set_process_title("#{klass}##{method}")
48+
49+
result = klass.with_info_handler reporter do
50+
Minitest.run_one_method(klass, method)
51+
end
52+
53+
safe_record(reporter, result)
54+
end
55+
56+
def safe_record(reporter, result)
57+
add_setup_exception(result) if @setup_exception
58+
59+
begin
60+
@queue.record(reporter, result)
61+
rescue DRb::DRbConnError
62+
result.failures.map! do |failure|
63+
if failure.respond_to?(:error)
64+
# minitest >5.14.0
65+
error = DRb::DRbRemoteError.new(failure.error)
66+
else
67+
error = DRb::DRbRemoteError.new(failure.exception)
68+
end
69+
Minitest::UnexpectedError.new(error)
70+
end
71+
@queue.record(reporter, result)
72+
end
73+
74+
set_process_title("(idle)")
75+
end
76+
77+
def after_fork
78+
Parallelization.after_fork_hooks.each do |cb|
79+
cb.call(@id)
80+
end
81+
end
82+
83+
def run_cleanup
84+
Parallelization.run_cleanup_hooks.each do |cb|
85+
cb.call(@id)
86+
end
87+
end
88+
89+
private
90+
def add_setup_exception(result)
91+
result.failures.prepend Minitest::UnexpectedError.new(@setup_exception)
92+
end
93+
94+
def set_process_title(status)
95+
Process.setproctitle("Rails test worker #{@number} - #{status}")
96+
end
97+
end
98+
end
99+
end
100+
end

railties/test/application/test_runner_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ def test_crash
589589

590590
output = run_test_command(file_name)
591591

592-
assert_match %r{Queue not empty, but all workers have finished. This probably means that a worker crashed and 1 tests were missed.}, output
592+
assert_match %r{RuntimeError: result not reported}, output
593593
end
594594

595595
def test_run_in_parallel_with_threads

0 commit comments

Comments
 (0)