diff --git a/async.gemspec b/async.gemspec index ca7f3515..28a7e3f7 100644 --- a/async.gemspec +++ b/async.gemspec @@ -26,6 +26,6 @@ Gem::Specification.new do |spec| spec.add_dependency "console", "~> 1.10" spec.add_dependency "fiber-annotation" - spec.add_dependency "io-event", "~> 1.1" + spec.add_dependency "io-event", "~> 1.5", ">= 1.5.1" spec.add_dependency "timers", "~> 4.1" end diff --git a/examples/count/fibers.rb b/examples/count/fibers.rb new file mode 100755 index 00000000..14865254 --- /dev/null +++ b/examples/count/fibers.rb @@ -0,0 +1,30 @@ +#!/usr/bin/env ruby + +require 'async' +require 'benchmark' + +transitions = [] + +puts "=========== FIBERS ===========" +puts +count = 0 +time = Benchmark.measure do + Sync do + 5.times do + [ + Async do ; transitions << "A1" + puts "Task 1: count is #{count}" ; transitions << "A2" + count += 1 ; transitions << "A3" + sleep(0.1) ; transitions << "A4" + end, + Async do ; transitions << "B1" + puts "Task 2: count is #{count}" ; transitions << "B2" + count += 1 ; transitions << "B3" + sleep(0.1) ; transitions << "B4" + end + ].map(&:wait) + end + end +end +puts "#{time.real.round(2)} seconds to run. Final count is #{count}" +puts transitions.join(" ") diff --git a/examples/count/threads.rb b/examples/count/threads.rb new file mode 100755 index 00000000..4be5f038 --- /dev/null +++ b/examples/count/threads.rb @@ -0,0 +1,27 @@ +#!/usr/bin/env ruby + +require 'benchmark' + +transitions = [] + +puts "=========== THREADS ===========" +puts +count = 0 +time = Benchmark.measure do + 5.times do + [ + Thread.new do ; transitions << "A1" + puts "Task 1: count is #{count}" ; transitions << "A2" + count += 1 ; transitions << "A3" + sleep(0.1) ; transitions << "A4" + end, + Thread.new do ; transitions << "B1" + puts "Task 2: count is #{count}" ; transitions << "B2" + count += 1 ; transitions << "B3" + sleep(0.1) ; transitions << "B4" + end + ].map(&:join) + end +end +puts "#{time.real.round(2)} seconds to run. Final count is #{count}" +puts transitions.join(" ") diff --git a/examples/load/test.rb b/examples/load/test.rb new file mode 100755 index 00000000..ef42c55f --- /dev/null +++ b/examples/load/test.rb @@ -0,0 +1,27 @@ +#!/usr/bin/env ruby + +require_relative '../../lib/async' +require_relative '../../lib/async/idler' + +Async do + idler = Async::Idler.new(0.8) + + Async do + while true + idler.async do + $stdout.write '.' + while true + sleep 0.1 + end + end + end + end + + scheduler = Fiber.scheduler + while true + load = scheduler.load + + $stdout.write "\nLoad: #{load} " + sleep 1.0 + end +end diff --git a/lib/async/idler.rb b/lib/async/idler.rb new file mode 100644 index 00000000..9226d62a --- /dev/null +++ b/lib/async/idler.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +module Async + class Idler + def initialize(maximum_load = 0.8, backoff: 0.01, parent: nil) + @maximum_load = maximum_load + @backoff = backoff + @parent = parent + end + + def async(*arguments, parent: (@parent or Task.current), **options, &block) + wait + + # It is crucial that we optimistically execute the child task, so that we prevent a tight loop invoking this method from consuming all available resources. + parent.async(*arguments, **options, &block) + end + + def wait + scheduler = Fiber.scheduler + backoff = nil + + while true + load = scheduler.load + break if load < @maximum_load + + if backoff + sleep(backoff) + backoff *= 2.0 + else + scheduler.yield + backoff = @backoff + end + end + end + end +end diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 63e1245c..3b2703f1 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -37,9 +37,33 @@ def initialize(parent = nil, selector: nil) @blocked = 0 + @busy_time = 0.0 + @idle_time = 0.0 + @timers = ::Timers::Group.new end + # Compute the scheduler load according to the busy and idle times that are updated by the run loop. + # @returns [Float] The load of the scheduler. 0.0 means no load, 1.0 means fully loaded or over-loaded. + def load + total_time = @busy_time + @idle_time + + # If the total time is zero, then the load is zero: + return 0.0 if total_time.zero? + + # We normalize to a 1 second window: + if total_time > 1.0 + ratio = 1.0 / total_time + @busy_time *= ratio + @idle_time *= ratio + + # We don't need to divide here as we've already normalised it to a 1s window: + return @busy_time + else + return @busy_time / total_time + end + end + def scheduler_close # If the execution context (thread) was handling an exception, we want to exit as quickly as possible: unless $! @@ -267,6 +291,8 @@ def run_once(timeout = nil) # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. # @returns [Boolean] Whether there is more work to do. private def run_once!(timeout = 0) + start_time = Async::Clock.now + interval = @timers.wait_interval # If there is no interval to wait (thus no timers), and no tasks, we could be done: @@ -288,6 +314,15 @@ def run_once(timeout = nil) @timers.fire + # Compute load: + end_time = Async::Clock.now + total_duration = end_time - start_time + idle_duration = @selector.idle_duration + busy_duration = total_duration - idle_duration + + @busy_time += busy_duration + @idle_time += idle_duration + # The reactor still has work to do: return true end diff --git a/lib/async/task.rb b/lib/async/task.rb index ecda7230..2b02f689 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -208,7 +208,7 @@ def wait # @parameter later [Boolean] Whether to stop the task later, or immediately. def stop(later = false) if self.stopped? - # If we already stopped this task... don't try to stop it again: + # If the task is already stopped, a `stop` state transition re-enters the same state which is a no-op. However, we will also attempt to stop any running children too. This can happen if the children did not stop correctly the first time around. Doing this should probably be considered a bug, but it's better to be safe than sorry. return stopped! end diff --git a/lib/async/version.rb b/lib/async/version.rb index 97a49215..ff68cae6 100644 --- a/lib/async/version.rb +++ b/lib/async/version.rb @@ -4,5 +4,5 @@ # Copyright, 2017-2024, by Samuel Williams. module Async - VERSION = "2.8.2" + VERSION = "2.9.0" end diff --git a/test/async/idler.rb b/test/async/idler.rb new file mode 100644 index 00000000..dcc7a336 --- /dev/null +++ b/test/async/idler.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2018-2023, by Samuel Williams. + +require 'async/idler' +require 'sus/fixtures/async' + +require 'chainable_async' + +describe Async::Idler do + include Sus::Fixtures::Async::ReactorContext + let(:idler) {subject.new(0.5)} + + it 'can schedule tasks up to the desired load' do + # Generate the load: + Async do + while true + idler.async do + while true + sleep 0.1 + end + end + end + end + + # This test must be longer than the test window... + sleep 1.1 + + # Verify that the load is within the desired range: + expect(Fiber.scheduler.load).to be_within(0.1).of(0.5) + end + + it_behaves_like ChainableAsync +end