Skip to content

Refactor _run_parallel #145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 137 additions & 144 deletions lib/test/unit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,9 @@ def read
end

def close
begin
@io.close unless @io.closed?
rescue IOError; end
@io.close unless @io.closed?
self
rescue IOError
end

def quit
Expand All @@ -323,6 +322,11 @@ def quit
@io.close
end

def kill
Process.kill(:KILL, @pid)
rescue Errno::ESRCH
end

def died(*additional)
@status = :quit
@io.close
Expand Down Expand Up @@ -452,166 +456,160 @@ def after_worker_quit(worker)
@ios = @workers.map(&:io)
end

def launch_worker
begin
worker = Worker.launch(@options[:ruby],@args)
rescue => e
abort "ERROR: Failed to launch job process - #{e.class}: #{e.message}"
end
worker.hook(:dead) do |w,info|
after_worker_quit w
after_worker_down w, *info if !info.empty? && !worker.quit_called
end
@workers << worker
@ios << worker.io
@workers_hash[worker.io] = worker
worker
end

def delete_worker(worker)
@workers_hash.delete worker.io
@workers.delete worker
@ios.delete worker.io
end

def quit_workers
return if @workers.empty?
@workers.reject! do |worker|
begin
timeout(1) do
worker.quit
end
rescue Errno::EPIPE
rescue Timeout::Error
end
worker.close
end

return if @workers.empty?
begin
timeout(0.2 * @workers.size) do
Process.waitall
end
rescue Timeout::Error
@workers.each do |worker|
worker.kill
end
@worker.clear
end
end

def start_watchdog
Thread.new do
while stat = Process.wait2
break if @interrupt # Break when interrupt
pid, stat = stat
w = (@workers + @dead_workers).find{|x| pid == x.pid }
next unless w
w = w.dup
if w.status != :quit && !w.quit_called?
# Worker down
w.died(nil, !stat.signaled? && stat.exitstatus)
end
end
end
end

def deal(io, type, result, rep, shutting_down = false)
worker = @workers_hash[io]
case worker.read
when /^okay$/
worker.status = :running
jobs_status
when /^ready(!)?$/
bang = $1
worker.status = :ready

return nil unless task = @tasks.shift
if @options[:separate] and not bang
worker.quit
worker = add_worker
end
worker.run(task, type)
@test_count += 1

jobs_status
when /^done (.+?)$/
r = Marshal.load($1.unpack("m")[0])
result << r[0..1] unless r[0..1] == [nil,nil]
rep << {file: worker.real_file, report: r[2], result: r[3], testcase: r[5]}
$:.push(*r[4]).uniq!
return true
when /^p (.+?)$/
del_jobs_status
print $1.unpack("m")[0]
jobs_status if @options[:job_status] == :replace
when /^after (.+?)$/
@warnings << Marshal.load($1.unpack("m")[0])
when /^bye (.+?)$/
after_worker_down worker, Marshal.load($1.unpack("m")[0])
when /^bye$/, nil
if shutting_down || worker.quit_called
after_worker_quit worker
else
after_worker_down worker
end
end
return false
end

def _run_parallel suites, type, result
if @options[:parallel] < 1
warn "Error: parameter of -j option should be greater than 0."
return
end

begin
# Require needed things for parallel running
require 'thread'
require 'timeout'
@tasks = @files.dup # Array of filenames.
@need_quit = false
@dead_workers = [] # Array of dead workers.
@warnings = []
@total_tests = @tasks.size.to_s(10)
shutting_down = false
rep = [] # FIXME: more good naming

# Array of workers.
launch_worker = Proc.new {
begin
worker = Worker.launch(@options[:ruby],@args)
rescue => e
warn "ERROR: Failed to launch job process - #{e.class}: #{e.message}"
exit 1
end
worker.hook(:dead) do |w,info|
after_worker_quit w
after_worker_down w, *info if !info.empty? && !worker.quit_called
end
worker
}
@workers = @options[:parallel].times.map(&launch_worker)
# Require needed things for parallel running
require 'thread'
require 'timeout'
@tasks = @files.dup # Array of filenames.
@need_quit = false
@dead_workers = [] # Array of dead workers.
@warnings = []
@total_tests = @tasks.size.to_s(10)
rep = [] # FIXME: more good naming

@workers = [] # Array of workers.
@workers_hash = {} # out-IO => worker
@ios = [] # Array of worker IOs
begin
# Thread: watchdog
watchdog = Thread.new do
while stat = Process.wait2
break if @interrupt # Break when interrupt
pid, stat = stat
w = (@workers + @dead_workers).find{|x| pid == x.pid }
next unless w
w = w.dup
if w.status != :quit && !w.quit_called?
# Worker down
w.died(nil, !stat.signaled? && stat.exitstatus)
end
end
end
watchdog = start_watchdog

@workers_hash = Hash[@workers.map {|w| [w.io,w] }] # out-IO => worker
@ios = @workers.map{|w| w.io } # Array of worker IOs
@options[:parallel].times {launch_worker}

while _io = IO.select(@ios)[0]
break unless _io.each do |io|
break if @need_quit
worker = @workers_hash[io]
case worker.read
when /^okay$/
worker.status = :running
jobs_status
when /^ready(!?)$/
bang = $1
worker.status = :ready
if @tasks.empty?
unless @workers.find{|x| [:running, :prepare].include? x.status}
break
end
else
if @options[:separate] && bang.empty?
@workers_hash.delete worker.io
@workers.delete worker
@ios.delete worker.io
new_worker = launch_worker.call()
worker.quit
@workers << new_worker
@ios << new_worker.io
@workers_hash[new_worker.io] = new_worker
worker = new_worker
end
worker.run(@tasks.shift, type)
@test_count += 1
end

jobs_status
when /^done (.+?)$/
r = Marshal.load($1.unpack("m")[0])
result << r[0..1] unless r[0..1] == [nil,nil]
rep << {file: worker.real_file,
report: r[2], result: r[3], testcase: r[5]}
$:.push(*r[4]).uniq!
when /^p (.+?)$/
del_jobs_status
print $1.unpack("m")[0]
jobs_status if @options[:job_status] == :replace
when /^after (.+?)$/
@warnings << Marshal.load($1.unpack("m")[0])
when /^bye (.+?)$/
after_worker_down worker, Marshal.load($1.unpack("m")[0])
when /^bye$/, nil
if shutting_down || worker.quit_called
after_worker_quit worker
else
after_worker_down worker
end
end
break if @need_quit
break if _io.any? do |io|
@need_quit or
(deal(io, type, result, rep).nil? and
!@workers.any? {|x| [:running, :prepare].include? x.status})
end
end
rescue Interrupt => e
@interrupt = e
return result
ensure
shutting_down = true

watchdog.kill if watchdog
if @interrupt
@ios.select!{|x| @workers_hash[x].status == :running }
while !@ios.empty? && (__io = IO.select(@ios,[],[],10))
_io = __io[0]
_io.each do |io|
worker = @workers_hash[io]
case worker.read
when /^done (.+?)$/
r = Marshal.load($1.unpack("m")[0])
result << r[0..1] unless r[0..1] == [nil,nil]
rep << {file: worker.real_file,
report: r[2], result: r[3], testcase: r[5]}
$:.push(*r[4]).uniq!
@ios.delete(io)
end
end
__io[0].reject! {|io| deal(io, type, result, rep, true)}
end
end

if @workers
@workers.each do |worker|
begin
timeout(1) do
worker.quit
end
rescue Errno::EPIPE
rescue Timeout::Error
end
worker.close
end
quit_workers

begin
timeout(0.2*@workers.size) do
Process.waitall
end
rescue Timeout::Error
@workers.each do |worker|
begin
Process.kill(:KILL,worker.pid)
rescue Errno::ESRCH; end
end
end
end

if !(@interrupt || !@options[:retry] || @need_quit) && @workers
unless @interrupt || !@options[:retry] || @need_quit
@options[:parallel] = false
suites, rep = rep.partition {|r| r[:testcase] && r[:file] && !r[:report].empty?}
suites.map {|r| r[:file]}.uniq.each {|file| require file}
Expand All @@ -634,12 +632,7 @@ def _run_parallel suites, type, result
end
unless @warnings.empty?
warn ""
ary = []
@warnings.reject! do |w|
r = ary.include?(w[1].message)
ary << w[1].message
r
end
@warnings.uniq! {|w| w[1].message}
@warnings.each do |w|
warn "#{w[0]}: #{w[1].message} (#{w[1].class})"
end
Expand Down