diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md index a865b48b3..0f6450e5c 100644 --- a/.github/CONTRIBUTING.md +++ b/.github/CONTRIBUTING.md @@ -66,7 +66,7 @@ There are a few guidelines which we follow when adding features. Consider that s #### Write Documentation -Document any external behavior in the [README](README.md). +Document any external behavior in the [README](../README.md). #### Commit Changes @@ -106,7 +106,7 @@ git push origin my-feature-branch -f #### Update CHANGELOG -Update the [CHANGELOG](CHANGELOG.md) with a description of what you have changed. +Update the [CHANGELOG](../CHANGELOG.md) with a description of what you have changed. #### Check on Your Pull Request diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 000000000..5ace4600a --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,6 @@ +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 01be2df2b..093571fa6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -6,22 +6,27 @@ concurrency: jobs: build: - runs-on: ubuntu-latest + name: "Tests: Ruby ${{ matrix.ruby }} - ${{ matrix.os }}" + runs-on: ${{ matrix.os }}-latest + timeout-minutes: 10 strategy: fail-fast: false matrix: - ruby: [2.3, 2.4, 2.5, 2.6, 2.7, '3.0', 3.1, 3.2, jruby, truffleruby] + ruby: [2.3, 2.4, 2.5, 2.6, 2.7, '3.0', 3.1, 3.2, 3.3, 3.4, jruby, truffleruby] + os: [ubuntu] + include: + - ruby: ruby + os: windows env: JAVA_OPTS: '-Xmx1024m' RUBYOPT: '-w' JRUBY_OPTS: '--dev' - name: "Tests: Ruby ${{ matrix.ruby }}" steps: - name: Clone Repo - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup Ruby ${{ matrix.ruby }} uses: ruby/setup-ruby@v1 with: @@ -30,17 +35,33 @@ jobs: - name: Run tests run: bundle exec rake ci + no-extensions: + name: "Test without C extension" + runs-on: ubuntu-latest + timeout-minutes: 10 + 
env: + RUBYOPT: '-w' + steps: + - uses: actions/checkout@v4 + - uses: ruby/setup-ruby@v1 + with: + ruby-version: ruby + bundler-cache: true + - name: Run tests + run: bundle exec rake spec:ci + isolated: name: "Test isolated" runs-on: ubuntu-latest + timeout-minutes: 10 strategy: fail-fast: false matrix: - ruby: [ 2.3, 3.2 ] # oldest and latest CRuby + ruby: [ 2.3, ruby ] # oldest and latest CRuby env: RUBYOPT: '-w' steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - uses: ruby/setup-ruby@v1 with: ruby-version: ${{ matrix.ruby }} diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 34dab01fd..780f0741c 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -18,7 +18,7 @@ jobs: env: BUNDLE_WITH: "documentation" steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 - uses: ruby/setup-ruby@v1 @@ -29,7 +29,7 @@ jobs: - run: ruby support/generate_docs.rb - name: Upload artifact - uses: actions/upload-pages-artifact@v1 + uses: actions/upload-pages-artifact@v3 with: path: docs @@ -45,4 +45,4 @@ jobs: steps: - name: Deploy to GitHub Pages id: deployment - uses: actions/deploy-pages@v1 + uses: actions/deploy-pages@v4 diff --git a/.github/workflows/experimental.yml b/.github/workflows/experimental.yml index 6e469a4a1..28048747c 100644 --- a/.github/workflows/experimental.yml +++ b/.github/workflows/experimental.yml @@ -3,12 +3,11 @@ on: schedule: - cron: '0 0 * * *' # Runs every day at midnight workflow_dispatch: - branches: [ master ] jobs: build: runs-on: ubuntu-latest - continue-on-error: true + timeout-minutes: 10 strategy: matrix: @@ -22,7 +21,7 @@ jobs: name: "Tests: Experimental Ruby ${{ matrix.ruby }}" steps: - name: Clone Repo - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup Ruby ${{ matrix.ruby }} uses: ruby/setup-ruby@v1 with: diff --git a/CHANGELOG.md b/CHANGELOG.md index 17a2a6415..30810b724 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,47 
@@ ## Current +## Release v1.3.5, edge v0.7.2 (15 January 2025) + +concurrent-ruby: + +* (#1062) Remove dependency on logger. + +concurrent-ruby-edge: + +* (#1062) Remove dependency on logger. + +## Release v1.3.4 (10 August 2024) + +* (#1060) Fix bug with return value of `Concurrent.available_processor_count` when `cpu.cfs_quota_us` is -1. +* (#1058) Add `Concurrent.cpu_shares` that is cgroups aware. + +## Release v1.3.3 (9 June 2024) + +* (#1053) Improve the speed of `Concurrent.physical_processor_count` on Windows. + +## Release v1.3.2, edge v0.7.1 (7 June 2024) + +concurrent-ruby: + +* (#1051) Remove dependency on `win32ole`. + +concurrent-ruby-edge: + +* (#1052) Fix dependency on `concurrent-ruby` to allow the latest release. + +## Release v1.3.1 (29 May 2024) + +* Release 1.3.0 was broken when pushed to RubyGems. 1.3.1 is a packaging fix. + +## Release v1.3.0 (28 May 2024) + +* (#1042) Align Java Executor Service behavior for `shuttingdown?`, `shutdown?` +* (#1038) Add `Concurrent.available_processor_count` that is cgroups aware. + +## Release v1.2.3 (16 Jan 2024) + +* See [the GitHub release](https://github.com/ruby-concurrency/concurrent-ruby/releases/tag/v1.2.3) for details. + ## Release v1.2.2 (24 Feb 2023) * (#993) Fix arguments passed to `Concurrent::Map`'s `default_proc`. @@ -264,7 +306,7 @@ concurrent-ruby-edge: * Simplification of `RubySingleThreadExecutor` * `Async` improvements - Each object uses its own `SingleThreadExecutor` instead of the global thread pool. 
- - No longers supports executor injection + - No longer supports executor injection - Much better documentation * `Atom` updates - No longer `Dereferenceable` @@ -439,7 +481,7 @@ Please see the [roadmap](https://github.com/ruby-concurrency/concurrent-ruby/iss * Fixed bug with return value of `Concurrent::Actor::Utils::Pool#ask` * Fixed timing bug in `TimerTask` * Fixed bug when creating a `JavaThreadPoolExecutor` with minimum pool size of zero -* Removed confusing warning when not using native extenstions +* Removed confusing warning when not using native extensions * Improved documentation ## Release v0.7.0 (13 August 2014) diff --git a/Gemfile b/Gemfile index b336031b7..1786c8b21 100644 --- a/Gemfile +++ b/Gemfile @@ -1,18 +1,18 @@ source 'https://rubygems.org' -require File.join(File.dirname(__FILE__), 'lib/concurrent-ruby/concurrent/version') -require File.join(File.dirname(__FILE__ ), 'lib/concurrent-ruby-edge/concurrent/edge/version') +version = File.read("#{__dir__}/lib/concurrent-ruby/concurrent/version.rb")[/'(.+)'/, 1] or raise +edge_version = File.read("#{__dir__}/lib/concurrent-ruby-edge/concurrent/edge/version.rb")[/'(.+)'/, 1] or raise no_path = ENV['NO_PATH'] options = no_path ? {} : { path: '.' 
} -gem 'concurrent-ruby', Concurrent::VERSION, options -gem 'concurrent-ruby-edge', Concurrent::EDGE_VERSION, options -gem 'concurrent-ruby-ext', Concurrent::VERSION, options.merge(platform: :mri) +gem 'concurrent-ruby', version, options +gem 'concurrent-ruby-edge', edge_version, options +gem 'concurrent-ruby-ext', version, options.merge(platform: :mri) group :development do gem 'rake', '~> 13.0' - gem 'rake-compiler', '~> 1.0', '>= 1.0.7' + gem 'rake-compiler', '~> 1.0', '>= 1.0.7', '!= 1.2.4' gem 'rake-compiler-dock', '~> 1.0' gem 'pry', '~> 0.11', platforms: :mri end diff --git a/README.md b/README.md index 15f011b98..6635e815a 100644 --- a/README.md +++ b/README.md @@ -207,7 +207,7 @@ Deprecated features are still available and bugs are being fixed, but new featur These are available in the `concurrent-ruby-edge` companion gem. These features are under active development and may change frequently. They are expected not to -keep backward compatibility (there may also lack tests and documentation). Semantic versions will +keep backward compatibility (they may also lack tests and documentation). Semantic versions will be obeyed though. Features developed in `concurrent-ruby-edge` are expected to move to `concurrent-ruby` when final. @@ -284,7 +284,7 @@ To use the tools in the Edge gem it must be required separately: require 'concurrent-edge' ``` -If the library does not behave as expected, `Concurrent.use_stdlib_logger(Logger::DEBUG)` could +If the library does not behave as expected, `Concurrent.use_simple_logger(:DEBUG)` could help to reveal the problem. ## Installation @@ -358,7 +358,8 @@ best practice is to depend on `concurrent-ruby` and let users to decide if they * Recent CRuby * JRuby, `rbenv install jruby-9.2.17.0` * Set env variable `CONCURRENT_JRUBY_HOME` to point to it, e.g. 
`/usr/local/opt/rbenv/versions/jruby-9.2.17.0` -* Install Docker, required for Windows builds +* Install Docker or Podman, required for Windows builds +* If `bundle config get path` is set, use `bundle config set --local path.system true` otherwise the `gem name, path: '.'` gems won't be found (Bundler limitation). ### Publishing the Gem @@ -375,6 +376,8 @@ best practice is to depend on `concurrent-ruby` and let users to decide if they * [Benoit Daloze](https://github.com/eregon) * [Matthew Draper](https://github.com/matthewd) * [Rafael França](https://github.com/rafaelfranca) +* [Charles Oliver Nutter](https://github.com/headius) +* [Ben Sheldon](https://github.com/bensheldon) * [Samuel Williams](https://github.com/ioquatix) ### Special Thanks to diff --git a/Rakefile b/Rakefile index f167f4659..3d157d475 100644 --- a/Rakefile +++ b/Rakefile @@ -1,6 +1,5 @@ -require_relative 'lib/concurrent-ruby/concurrent/version' -require_relative 'lib/concurrent-ruby-edge/concurrent/edge/version' -require_relative 'lib/concurrent-ruby/concurrent/utility/engine' +version = File.read("#{__dir__}/lib/concurrent-ruby/concurrent/version.rb")[/'(.+)'/, 1] or raise +edge_version = File.read("#{__dir__}/lib/concurrent-ruby-edge/concurrent/edge/version.rb")[/'(.+)'/, 1] or raise core_gemspec = Gem::Specification.load File.join(__dir__, 'concurrent-ruby.gemspec') ext_gemspec = Gem::Specification.load File.join(__dir__, 'concurrent-ruby-ext.gemspec') @@ -8,14 +7,16 @@ edge_gemspec = Gem::Specification.load File.join(__dir__, 'concurrent-ruby-edge. require 'rake/javaextensiontask' -ENV['JRUBY_HOME'] = ENV['CONCURRENT_JRUBY_HOME'] if ENV['CONCURRENT_JRUBY_HOME'] && !Concurrent.on_jruby? 
+ENV['JRUBY_HOME'] = ENV['CONCURRENT_JRUBY_HOME'] if ENV['CONCURRENT_JRUBY_HOME'] && RUBY_ENGINE != 'jruby' Rake::JavaExtensionTask.new('concurrent_ruby', core_gemspec) do |ext| ext.ext_dir = 'ext/concurrent-ruby' ext.lib_dir = 'lib/concurrent-ruby/concurrent' + ext.source_version = '8' + ext.target_version = '8' end -unless Concurrent.on_jruby? || Concurrent.on_truffleruby? +if RUBY_ENGINE == 'ruby' require 'rake/extensiontask' Rake::ExtensionTask.new('concurrent_ruby_ext', ext_gemspec) do |ext| @@ -28,6 +29,10 @@ unless Concurrent.on_jruby? || Concurrent.on_truffleruby? end end +def which?(executable) + !`which #{executable} 2>/dev/null`.empty? +end + require 'rake_compiler_dock' namespace :repackage do desc '* with Windows fat distributions' @@ -42,12 +47,19 @@ namespace :repackage do Rake::Task['lib/concurrent-ruby/concurrent/concurrent_ruby.jar'].invoke # build all gem files + rake_compiler_dock_kwargs = {} + if which?('podman') and (!which?('docker') || `docker --version`.include?('podman')) + # podman and only podman available, so RakeCompilerDock will use podman, otherwise it uses docker + rake_compiler_dock_kwargs = { + options: ['--privileged'], # otherwise the directory in the image is empty + runas: false + } + end %w[x86-mingw32 x64-mingw32].each do |plat| RakeCompilerDock.sh( "bundle install --local && bundle exec rake native:#{plat} gem --trace", platform: plat, - options: ['--privileged'], # otherwise the directory in the image is empty - runas: false) + **rake_compiler_dock_kwargs) end end end @@ -57,7 +69,7 @@ require 'rubygems' require 'rubygems/package_task' Gem::PackageTask.new(core_gemspec) {} if core_gemspec -Gem::PackageTask.new(ext_gemspec) {} if ext_gemspec && !Concurrent.on_jruby?
+Gem::PackageTask.new(ext_gemspec) {} if ext_gemspec && RUBY_ENGINE != 'jruby' Gem::PackageTask.new(edge_gemspec) {} if edge_gemspec CLEAN.include( @@ -85,9 +97,9 @@ begin task :installed do Bundler.with_original_env do Dir.chdir(__dir__) do - sh "gem install pkg/concurrent-ruby-#{Concurrent::VERSION}.gem" - sh "gem install pkg/concurrent-ruby-ext-#{Concurrent::VERSION}.gem" if Concurrent.on_cruby? - sh "gem install pkg/concurrent-ruby-edge-#{Concurrent::EDGE_VERSION}.gem" + sh "gem install pkg/concurrent-ruby-#{version}.gem" + sh "gem install pkg/concurrent-ruby-ext-#{version}.gem" if RUBY_ENGINE == 'ruby' + sh "gem install pkg/concurrent-ruby-edge-#{edge_version}.gem" ENV['NO_PATH'] = 'true' sh 'bundle update' sh 'bundle exec rake spec:ci' @@ -117,7 +129,7 @@ rescue LoadError => e puts 'RSpec is not installed, skipping test task definitions: ' + e.message end -current_yard_version_name = Concurrent::VERSION +current_yard_version_name = version begin require 'yard' @@ -221,6 +233,8 @@ namespace :release do # Depends on environment of @pitr-ch task :checks do + raise '$CONCURRENT_JRUBY_HOME must be set' unless ENV['CONCURRENT_JRUBY_HOME'] + Dir.chdir(__dir__) do sh 'test -z "$(git status --porcelain)"' do |ok, res| unless ok @@ -251,15 +265,19 @@ namespace :release do desc '* test actual installed gems instead of cloned repository on MRI and JRuby' task :test do + raise '$CONCURRENT_JRUBY_HOME must be set' unless ENV['CONCURRENT_JRUBY_HOME'] + Dir.chdir(__dir__) do puts "Testing with the installed gem" Bundler.with_original_env do sh 'ruby -v' + sh 'bundle install' sh 'bundle exec rake spec:installed' - env = { "PATH" => "#{ENV['CONCURRENT_JRUBY_HOME']}/bin:#{ENV['PATH']}" } + env = { "PATH" => "#{ENV.fetch('CONCURRENT_JRUBY_HOME')}/bin:#{ENV['PATH']}" } sh env, 'ruby -v' + sh env, 'bundle install' sh env, 'bundle exec rake spec:installed' end @@ -271,8 +289,8 @@ namespace :release do task :publish => ['publish:ask', 'publish:tag', 'publish:rubygems', 
'publish:post_steps'] namespace :publish do - publish_base = true - publish_edge = false + publish_base = nil + publish_edge = nil task :ask do begin @@ -280,8 +298,15 @@ namespace :release do input = STDIN.gets.strip.downcase end until %w(y n).include?(input) exit 1 if input == 'n' + + begin + STDOUT.puts 'Do you want to publish `concurrent-ruby`? (y/n)' + input = STDIN.gets.strip.downcase + end until %w(y n).include?(input) + publish_base = input == 'y' + begin - STDOUT.puts 'It will publish `concurrent-ruby`. Do you want to publish `concurrent-ruby-edge`? (y/n)' + STDOUT.puts 'Do you want to publish `concurrent-ruby-edge`? (y/n)' input = STDIN.gets.strip.downcase end until %w(y n).include?(input) publish_edge = input == 'y' @@ -290,21 +315,21 @@ namespace :release do desc '** tag HEAD with current version and push to github' task :tag => :ask do Dir.chdir(__dir__) do - sh "git tag v#{Concurrent::VERSION}" if publish_base - sh "git push origin v#{Concurrent::VERSION}" if publish_base - sh "git tag edge-v#{Concurrent::EDGE_VERSION}" if publish_edge - sh "git push origin edge-v#{Concurrent::EDGE_VERSION}" if publish_edge + sh "git tag v#{version}" if publish_base + sh "git push origin v#{version}" if publish_base + sh "git tag edge-v#{edge_version}" if publish_edge + sh "git push origin edge-v#{edge_version}" if publish_edge end end desc '** push all *.gem files to rubygems' task :rubygems => :ask do Dir.chdir(__dir__) do - sh "gem push pkg/concurrent-ruby-#{Concurrent::VERSION}.gem" if publish_base - sh "gem push pkg/concurrent-ruby-edge-#{Concurrent::EDGE_VERSION}.gem" if publish_edge - sh "gem push pkg/concurrent-ruby-ext-#{Concurrent::VERSION}.gem" if publish_base - sh "gem push pkg/concurrent-ruby-ext-#{Concurrent::VERSION}-x64-mingw32.gem" if publish_base - sh "gem push pkg/concurrent-ruby-ext-#{Concurrent::VERSION}-x86-mingw32.gem" if publish_base + sh "gem push pkg/concurrent-ruby-#{version}.gem" if publish_base + sh "gem push 
pkg/concurrent-ruby-edge-#{edge_version}.gem" if publish_edge + sh "gem push pkg/concurrent-ruby-ext-#{version}.gem" if publish_base + sh "gem push pkg/concurrent-ruby-ext-#{version}-x64-mingw32.gem" if publish_base + sh "gem push pkg/concurrent-ruby-ext-#{version}-x86-mingw32.gem" if publish_base end end diff --git a/concurrent-ruby-edge.gemspec b/concurrent-ruby-edge.gemspec index 02383d346..719548b95 100644 --- a/concurrent-ruby-edge.gemspec +++ b/concurrent-ruby-edge.gemspec @@ -1,11 +1,11 @@ -require File.join(File.dirname(__FILE__ ), 'lib/concurrent-ruby/concurrent/version') -require File.join(File.dirname(__FILE__ ), 'lib/concurrent-ruby-edge/concurrent/edge/version') +version = File.read("#{__dir__}/lib/concurrent-ruby/concurrent/version.rb")[/'(.+)'/, 1] or raise +edge_version = File.read("#{__dir__}/lib/concurrent-ruby-edge/concurrent/edge/version.rb")[/'(.+)'/, 1] or raise Gem::Specification.new do |s| git_files = `git ls-files`.split("\n") s.name = 'concurrent-ruby-edge' - s.version = Concurrent::EDGE_VERSION + s.version = edge_version s.platform = Gem::Platform::RUBY s.authors = ["Jerry D'Antonio", 'Petr Chalupa', 'The Ruby Concurrency Team'] s.email = 'concurrent-ruby@googlegroups.com' @@ -25,5 +25,5 @@ Please see http://concurrent-ruby.com for more information. 
s.required_ruby_version = '>= 2.3' - s.add_runtime_dependency 'concurrent-ruby', "~> #{Concurrent::VERSION}" + s.add_runtime_dependency 'concurrent-ruby', "~> #{version.split('.')[0..1].join('.')}" end diff --git a/concurrent-ruby-ext.gemspec b/concurrent-ruby-ext.gemspec index 534822de0..4bfafdd5b 100644 --- a/concurrent-ruby-ext.gemspec +++ b/concurrent-ruby-ext.gemspec @@ -1,8 +1,8 @@ -require File.join(File.dirname(__FILE__ ), 'lib/concurrent-ruby/concurrent/version') +version = File.read("#{__dir__}/lib/concurrent-ruby/concurrent/version.rb")[/'(.+)'/, 1] or raise Gem::Specification.new do |s| s.name = 'concurrent-ruby-ext' - s.version = Concurrent::VERSION + s.version = version s.platform = Gem::Platform::RUBY s.authors = ["Jerry D'Antonio", 'The Ruby Concurrency Team'] s.email = 'concurrent-ruby@googlegroups.com' @@ -23,5 +23,5 @@ Gem::Specification.new do |s| s.required_ruby_version = '>= 2.3' - s.add_runtime_dependency 'concurrent-ruby', "= #{Concurrent::VERSION}" + s.add_runtime_dependency 'concurrent-ruby', "= #{version}" end diff --git a/concurrent-ruby.gemspec b/concurrent-ruby.gemspec index 863201b54..1537d377d 100644 --- a/concurrent-ruby.gemspec +++ b/concurrent-ruby.gemspec @@ -1,10 +1,10 @@ -require File.join(File.dirname(__FILE__ ), 'lib/concurrent-ruby/concurrent/version') +version = File.read("#{__dir__}/lib/concurrent-ruby/concurrent/version.rb")[/'(.+)'/, 1] or raise Gem::Specification.new do |s| git_files = `git ls-files`.split("\n") s.name = 'concurrent-ruby' - s.version = Concurrent::VERSION + s.version = version s.platform = Gem::Platform::RUBY s.authors = ["Jerry D'Antonio", 'Petr Chalupa', 'The Ruby Concurrency Team'] s.email = 'concurrent-ruby@googlegroups.com' diff --git a/docs-source/actor/celluloid_benchmark.rb b/docs-source/actor/celluloid_benchmark.rb index dea20e553..30ca1e0d6 100644 --- a/docs-source/actor/celluloid_benchmark.rb +++ b/docs-source/actor/celluloid_benchmark.rb @@ -7,11 +7,7 @@ # require 'stackprof' # require 
'profiler' -logger = Logger.new($stderr) -logger.level = Logger::INFO -Concurrent.configuration.logger = lambda do |level, progname, message = nil, &block| - logger.add level, message, progname, &block -end +Concurrent.use_simple_logger(:INFO) scale = 1 ADD_TO = (100 * scale).to_i diff --git a/docs-source/actor/io.in.rb b/docs-source/actor/io.in.rb index 6bbc1a07c..3c6a1bdb5 100644 --- a/docs-source/actor/io.in.rb +++ b/docs-source/actor/io.in.rb @@ -1,7 +1,6 @@ require 'concurrent' -# logger = Logger.new(STDOUT) -# Concurrent.configuration.logger = logger.method(:add) +# Concurrent.use_simple_logger(:WARN, STDOUT) # First option is to use operation pool diff --git a/docs-source/actor/io.out.rb b/docs-source/actor/io.out.rb index b96cfc72a..b879c4fb8 100644 --- a/docs-source/actor/io.out.rb +++ b/docs-source/actor/io.out.rb @@ -1,7 +1,6 @@ require 'concurrent' # => false -# logger = Logger.new(STDOUT) -# Concurrent.configuration.logger = logger.method(:add) +# Concurrent.use_simple_logger(:WARN, STDOUT) # First option is to use operation pool diff --git a/docs-source/actor/main.md b/docs-source/actor/main.md index 43cf72798..e3e67f62c 100644 --- a/docs-source/actor/main.md +++ b/docs-source/actor/main.md @@ -124,12 +124,12 @@ Spawned actor cannot be garbage-collected until it's terminated. There is a refe Actors are running on shared thread poll which allows user to create many actors cheaply. Downside is that these actors cannot be directly used to do IO or other blocking operations. -Blocking operations could starve the `default_task_pool`. However there are two options: +Blocking operations could starve the `global_fast_executor`. However there are two options: -- Create an regular actor which will schedule blocking operations in `global_operation_pool` +- Create an regular actor which will schedule blocking operations in `global_io_executor` (which is intended for blocking operations) sending results back to self in messages. 
-- Create an actor using `global_operation_pool` instead of `global_task_pool`, e.g. - `AnIOActor.spawn name: :blocking, executor: Concurrent.configuration.global_operation_pool`. +- Create an actor using `global_io_executor` instead of `global_fast_executor`, e.g. + `AnIOActor.spawn name: :blocking, executor: Concurrent.global_io_executor`. ### Example diff --git a/docs-source/erlang_actor.in.md b/docs-source/erlang_actor.in.md index d8f7a6da0..ed0b94e3c 100644 --- a/docs-source/erlang_actor.in.md +++ b/docs-source/erlang_actor.in.md @@ -254,7 +254,7 @@ The one exception from the original Erlang naming is exit. To avoid clashing with `Kernel#exit` it's called `terminate`. Until there is more information available here, the chapters listed below from -a book [lern you some Erlang](https://learnyousomeerlang.com) +a book [learn you some Erlang](https://learnyousomeerlang.com) are excellent source of information. The Ruby ErlangActor implementation has same behaviour. diff --git a/docs-source/erlang_actor.out.md b/docs-source/erlang_actor.out.md index f062557a4..f94d1492c 100644 --- a/docs-source/erlang_actor.out.md +++ b/docs-source/erlang_actor.out.md @@ -276,7 +276,7 @@ The one exception from the original Erlang naming is exit. To avoid clashing with `Kernel#exit` it's called `terminate`. Until there is more information available here, the chapters listed below from -a book [lern you some Erlang](https://learnyousomeerlang.com) +a book [learn you some Erlang](https://learnyousomeerlang.com) are excellent source of information. The Ruby ErlangActor implementation has same behaviour. 
diff --git a/docs-source/signpost.md b/docs-source/signpost.md index cff62de7d..7b52f16ac 100644 --- a/docs-source/signpost.md +++ b/docs-source/signpost.md @@ -3,7 +3,7 @@ Pick a `concurrent-ruby` version: * [master](./master/index.html) -* [1.2.0 with edge 0.7.0](./1.2.0/index.html) +* [1.3.5 with edge 0.7.2](./1.3.5/index.html) * [1.1.10 with edge 0.6.0](./1.1.10/index.html) * [1.1.9 with edge 0.6.0](./1.1.9/index.html) * [1.1.8 with edge 0.6.0](./1.1.8/index.html) diff --git a/docs-source/throttle.in.md b/docs-source/throttle.in.md index ee2b317fc..a0d08b63a 100644 --- a/docs-source/throttle.in.md +++ b/docs-source/throttle.in.md @@ -34,7 +34,7 @@ Notice that the returned array has no number bigger than 2 therefore the concurrency level of the block with the `do_stuff` was never bigger than 2. ```ruby -# runs a block, and returns he observed concurrency level during the execution +# runs a block, and returns the observed concurrency level during the execution def monitor_concurrency_level(concurrency_level, &block) concurrency_level.increment block.call diff --git a/docs-source/throttle.out.md b/docs-source/throttle.out.md index 7ff59137e..944d1407b 100644 --- a/docs-source/throttle.out.md +++ b/docs-source/throttle.out.md @@ -36,7 +36,7 @@ Notice that the returned array has no number bigger than 2 therefore the concurrency level of the block with the `do_stuff` was never bigger than 2. 
```ruby -# runs a block, and returns he observed concurrency level during the execution +# runs a block, and returns the observed concurrency level during the execution def monitor_concurrency_level(concurrency_level, &block) concurrency_level.increment block.call diff --git a/examples/benchmark_atomic_1.rb b/examples/benchmark_atomic_1.rb index f3f09b08e..d9deedf1b 100755 --- a/examples/benchmark_atomic_1.rb +++ b/examples/benchmark_atomic_1.rb @@ -133,7 +133,7 @@ def para_prepare(&block) # NOTE: It seems to me that this measurement method # is sensible to how the system dispatches his resources. # -# More precise caluclation could be done using +# More precise calculation could be done using # getrusage's times ret = Benchmark.measure do $go = true diff --git a/examples/graph_atomic_bench.rb b/examples/graph_atomic_bench.rb index c50da1737..b124f3b35 100755 --- a/examples/graph_atomic_bench.rb +++ b/examples/graph_atomic_bench.rb @@ -47,7 +47,7 @@ end elsif conf[:vary] == "speed" # Varies the execution time of the update block - # by using long calulation (MD5) + # by using long calculation (MD5) # # NOTE: Thread.pass and sleep() are not usable by the atomic # lock. 
It needs to run the whole block without hitting diff --git a/examples/init.rb b/examples/init.rb index 03c91f7c1..92d2c63af 100644 --- a/examples/init.rb +++ b/examples/init.rb @@ -4,4 +4,4 @@ def do_stuff(*args) :stuff end -Concurrent.use_simple_logger Logger::DEBUG +Concurrent.use_simple_logger :DEBUG diff --git a/ext/concurrent-ruby-ext/atomic_reference.c b/ext/concurrent-ruby-ext/atomic_reference.c index e03e07eae..d99e5afd5 100644 --- a/ext/concurrent-ruby-ext/atomic_reference.c +++ b/ext/concurrent-ruby-ext/atomic_reference.c @@ -105,7 +105,7 @@ VALUE ir_compare_and_set(volatile VALUE self, VALUE expect_value, VALUE new_valu return Qtrue; } #else - if (__sync_bool_compare_and_swap(&DATA_PTR(self), expect_value, new_value)) { + if (__sync_bool_compare_and_swap(&DATA_PTR(self), (void *)expect_value, (void *)new_value)) { return Qtrue; } #endif diff --git a/ext/concurrent-ruby/com/concurrent_ruby/ext/jsr166e/ConcurrentHashMapV8.java b/ext/concurrent-ruby/com/concurrent_ruby/ext/jsr166e/ConcurrentHashMapV8.java index 86aa4eb06..dc9901fb7 100644 --- a/ext/concurrent-ruby/com/concurrent_ruby/ext/jsr166e/ConcurrentHashMapV8.java +++ b/ext/concurrent-ruby/com/concurrent_ruby/ext/jsr166e/ConcurrentHashMapV8.java @@ -481,7 +481,7 @@ public static interface Spliterator extends Iterator { * * Maintaining API and serialization compatibility with previous * versions of this class introduces several oddities. Mainly: We - * leave untouched but unused constructor arguments refering to + * leave untouched but unused constructor arguments referring to * concurrencyLevel. We accept a loadFactor constructor argument, * but apply it only to initial table capacity (which is the only * time that we can guarantee to honor it.) 
We also declare an diff --git a/ext/concurrent-ruby/com/concurrent_ruby/ext/jsr166e/nounsafe/ConcurrentHashMapV8.java b/ext/concurrent-ruby/com/concurrent_ruby/ext/jsr166e/nounsafe/ConcurrentHashMapV8.java index b7fc5a937..a4e73ea14 100644 --- a/ext/concurrent-ruby/com/concurrent_ruby/ext/jsr166e/nounsafe/ConcurrentHashMapV8.java +++ b/ext/concurrent-ruby/com/concurrent_ruby/ext/jsr166e/nounsafe/ConcurrentHashMapV8.java @@ -484,7 +484,7 @@ public static interface Spliterator extends Iterator { * * Maintaining API and serialization compatibility with previous * versions of this class introduces several oddities. Mainly: We - * leave untouched but unused constructor arguments refering to + * leave untouched but unused constructor arguments referring to * concurrencyLevel. We accept a loadFactor constructor argument, * but apply it only to initial table capacity (which is the only * time that we can guarantee to honor it.) We also declare an diff --git a/lib/concurrent-ruby-edge/concurrent/actor/context.rb b/lib/concurrent-ruby-edge/concurrent/actor/context.rb index 19ee14ab7..96252901e 100644 --- a/lib/concurrent-ruby-edge/concurrent/actor/context.rb +++ b/lib/concurrent-ruby-edge/concurrent/actor/context.rb @@ -84,7 +84,7 @@ def default_reference_class Reference end - # override to se different default executor, e.g. to change it to global_operation_pool + # override to se different default executor, e.g. 
to change it to global_fast_executor # @return [Executor] def default_executor Concurrent.global_io_executor @@ -109,7 +109,7 @@ def ask(message) # @example by option hash # inc2 = AdHoc.spawn(name: 'increment by 2', # args: [2], - # executor: Concurrent.configuration.global_task_pool) do |increment_by| + # executor: Concurrent.global_fast_executor) do |increment_by| # lambda { |number| number + increment_by } # end # inc2.ask!(2) # => 4 diff --git a/lib/concurrent-ruby-edge/concurrent/actor/internal_delegations.rb b/lib/concurrent-ruby-edge/concurrent/actor/internal_delegations.rb index ddca34883..9ed051b5a 100644 --- a/lib/concurrent-ruby-edge/concurrent/actor/internal_delegations.rb +++ b/lib/concurrent-ruby-edge/concurrent/actor/internal_delegations.rb @@ -1,11 +1,11 @@ -require 'logger' +require 'concurrent/concern/logging' require 'concurrent/actor/public_delegations' module Concurrent module Actor module InternalDelegations include PublicDelegations - include Logger::Severity + include Concurrent::Concern::Logging # @see Core#children def children diff --git a/lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb b/lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb index 2277a977c..6690e9e26 100644 --- a/lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb +++ b/lib/concurrent-ruby-edge/concurrent/edge/erlang_actor.rb @@ -674,7 +674,7 @@ def initialize(mailbox, environment, name, executor) end def tell_op(message) - log Logger::DEBUG, @Pid, told: message + log DEBUG, @Pid, told: message if (mailbox = @Mailbox) mailbox.push_op(message).then { @Pid } else @@ -683,7 +683,7 @@ def tell_op(message) end def tell(message, timeout = nil) - log Logger::DEBUG, @Pid, told: message + log DEBUG, @Pid, told: message if (mailbox = @Mailbox) timed_out = mailbox.push message, timeout timeout ? 
timed_out : @Pid @@ -693,7 +693,7 @@ def tell(message, timeout = nil) end def ask(message, timeout, timeout_value) - log Logger::DEBUG, @Pid, asked: message + log DEBUG, @Pid, asked: message if @Terminated.resolved? raise NoActor.new(@Pid) else @@ -724,7 +724,7 @@ def ask(message, timeout, timeout_value) end def ask_op(message, probe) - log Logger::DEBUG, @Pid, asked: message + log DEBUG, @Pid, asked: message if @Terminated.resolved? probe.reject NoActor.new(@Pid), false else @@ -1029,7 +1029,7 @@ def terminate_self(reason, value) end def after_termination(final_reason) - log Logger::DEBUG, @Pid, terminated: final_reason + log DEBUG, @Pid, terminated: final_reason clean_reply NoActor.new(@Pid) while true message = @Mailbox.try_pop NOTHING @@ -1071,7 +1071,7 @@ def run(*args, &body) inner_run(*args, &body). run(Run::TEST). then(&method(:after_termination)). - rescue { |e| log Logger::ERROR, e } + rescue { |e| log ERROR, e } end def receive(*rules, timeout: nil, timeout_value: nil, keep: false, &given_block) @@ -1163,7 +1163,7 @@ def internal_receive end message_future.then(start, self) do |message, s, _actor| - log Logger::DEBUG, pid, got: message + log DEBUG, pid, got: message catch(JUMP) do if (message = consume_signal(message)) == NOTHING @timeout = [@timeout + s - Concurrent.monotonic_time, 0].max if s @@ -1230,7 +1230,7 @@ def receive(*rules, timeout: nil, timeout_value: nil, &given_block) matcher = -> m { m.is_a?(Ask) ? 
rules_matcher === m.message : rules_matcher === m } while true message = @Mailbox.pop_matching(matcher, timeout, TIMEOUT) - log Logger::DEBUG, pid, got: message + log DEBUG, pid, got: message unless (message = consume_signal(message)) == NOTHING rules.each do |rule, job| return eval_task(message, job) if rule === message @@ -1535,7 +1535,7 @@ class NoReply < Error def self.create(type, channel, environment, name, executor) actor = KLASS_MAP.fetch(type).new(channel, environment, name, executor) ensure - log Logger::DEBUG, actor.pid, created: caller[1] if actor + log Concern::Logging::DEBUG, actor.pid, created: caller[1] if actor end KLASS_MAP = { diff --git a/lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/node.rb b/lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/node.rb index d7fafa370..f3682bbfc 100644 --- a/lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/node.rb +++ b/lib/concurrent-ruby-edge/concurrent/edge/lock_free_linked_set/node.rb @@ -38,7 +38,7 @@ def next_node @SuccessorReference.value end - # This method provides a unqiue key for the data which will be used for + # This method provides a unique key for the data which will be used for # ordering. This is configurable, and changes depending on how you wish # the nodes to be ordered. 
def key_for(data) diff --git a/lib/concurrent-ruby-edge/concurrent/edge/old_channel_integration.rb b/lib/concurrent-ruby-edge/concurrent/edge/old_channel_integration.rb index efb003eff..a215ad80c 100644 --- a/lib/concurrent-ruby-edge/concurrent/edge/old_channel_integration.rb +++ b/lib/concurrent-ruby-edge/concurrent/edge/old_channel_integration.rb @@ -36,7 +36,7 @@ module OldChannelIntegration # @!visibility private - # Zips with selected value form the suplied channels + # Zips with selected value form the supplied channels # @return [Future] def then_select(*channels) future = Concurrent::Promises.select(*channels) diff --git a/lib/concurrent-ruby-edge/concurrent/edge/version.rb b/lib/concurrent-ruby-edge/concurrent/edge/version.rb index 4a129936d..e3dd82c4a 100644 --- a/lib/concurrent-ruby-edge/concurrent/edge/version.rb +++ b/lib/concurrent-ruby-edge/concurrent/edge/version.rb @@ -1,3 +1,3 @@ module Concurrent - EDGE_VERSION = '0.7.0' + EDGE_VERSION = '0.7.2' end diff --git a/lib/concurrent-ruby/concurrent/agent.rb b/lib/concurrent-ruby/concurrent/agent.rb index 2d32926ba..dc8a26000 100644 --- a/lib/concurrent-ruby/concurrent/agent.rb +++ b/lib/concurrent-ruby/concurrent/agent.rb @@ -371,7 +371,7 @@ def await_for(timeout) # @param [Float] timeout the maximum number of seconds to wait # @return [Boolean] true if all actions complete before timeout # - # @raise [Concurrent::TimeoutError] when timout is reached + # @raise [Concurrent::TimeoutError] when timeout is reached # # @!macro agent_await_warning def await_for!(timeout) @@ -477,7 +477,7 @@ def await_for(timeout, *agents) # @param [Array] agents the Agents on which to wait # @return [Boolean] true if all actions complete before timeout # - # @raise [Concurrent::TimeoutError] when timout is reached + # @raise [Concurrent::TimeoutError] when timeout is reached # @!macro agent_await_warning def await_for!(timeout, *agents) raise Concurrent::TimeoutError unless await_for(timeout, *agents) diff --git 
a/lib/concurrent-ruby/concurrent/array.rb b/lib/concurrent-ruby/concurrent/array.rb index 96434a288..c8761af8a 100644 --- a/lib/concurrent-ruby/concurrent/array.rb +++ b/lib/concurrent-ruby/concurrent/array.rb @@ -21,9 +21,9 @@ module Concurrent # @!macro internal_implementation_note ArrayImplementation = case when Concurrent.on_cruby? - # Array is thread-safe in practice because CRuby runs - # threads one at a time and does not do context - # switching during the execution of C functions. + # Array is not fully thread-safe on CRuby, see + # https://github.com/ruby-concurrency/concurrent-ruby/issues/929 + # So we will need to add synchronization here ::Array when Concurrent.on_jruby? diff --git a/lib/concurrent-ruby/concurrent/async.rb b/lib/concurrent-ruby/concurrent/async.rb index f9f8adf00..97c5a6b2d 100644 --- a/lib/concurrent-ruby/concurrent/async.rb +++ b/lib/concurrent-ruby/concurrent/async.rb @@ -218,7 +218,7 @@ module Async # @!method self.new(*args, &block) # - # Instanciate a new object and ensure proper initialization of the + # Instantiate a new object and ensure proper initialization of the # synchronization mechanisms. # # @param [Array] args Zero or more arguments to be passed to the diff --git a/lib/concurrent-ruby/concurrent/atom.rb b/lib/concurrent-ruby/concurrent/atom.rb index 1074006d7..f590a23d9 100644 --- a/lib/concurrent-ruby/concurrent/atom.rb +++ b/lib/concurrent-ruby/concurrent/atom.rb @@ -113,7 +113,7 @@ class Atom < Synchronization::Object # @option opts [Proc] :validator (nil) Optional proc used to validate new # values. It must accept one and only one argument which will be the # intended new value. The validator will return true if the new value - # is acceptable else return false (preferrably) or raise an exception. + # is acceptable else return false (preferably) or raise an exception. 
# # @!macro deref_options # diff --git a/lib/concurrent-ruby/concurrent/collection/map/atomic_reference_map_backend.rb b/lib/concurrent-ruby/concurrent/collection/map/atomic_reference_map_backend.rb deleted file mode 100644 index dc5189389..000000000 --- a/lib/concurrent-ruby/concurrent/collection/map/atomic_reference_map_backend.rb +++ /dev/null @@ -1,927 +0,0 @@ -require 'concurrent/constants' -require 'concurrent/thread_safe/util' -require 'concurrent/thread_safe/util/adder' -require 'concurrent/thread_safe/util/cheap_lockable' -require 'concurrent/thread_safe/util/power_of_two_tuple' -require 'concurrent/thread_safe/util/volatile' -require 'concurrent/thread_safe/util/xor_shift_random' - -module Concurrent - - # @!visibility private - module Collection - - # A Ruby port of the Doug Lea's jsr166e.ConcurrentHashMapV8 class version 1.59 - # available in public domain. - # - # Original source code available here: - # http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/ConcurrentHashMapV8.java?revision=1.59 - # - # The Ruby port skips out the +TreeBin+ (red-black trees for use in bins whose - # size exceeds a threshold). - # - # A hash table supporting full concurrency of retrievals and high expected - # concurrency for updates. However, even though all operations are - # thread-safe, retrieval operations do _not_ entail locking, and there is - # _not_ any support for locking the entire table in a way that prevents all - # access. - # - # Retrieval operations generally do not block, so may overlap with update - # operations. Retrievals reflect the results of the most recently _completed_ - # update operations holding upon their onset. (More formally, an update - # operation for a given key bears a _happens-before_ relation with any (non - # +nil+) retrieval for that key reporting the updated value.) For aggregate - # operations such as +clear()+, concurrent retrievals may reflect insertion or - # removal of only some entries. 
Similarly, the +each_pair+ iterator yields - # elements reflecting the state of the hash table at some point at or since - # the start of the +each_pair+. Bear in mind that the results of aggregate - # status methods including +size()+ and +empty?+} are typically useful only - # when a map is not undergoing concurrent updates in other threads. Otherwise - # the results of these methods reflect transient states that may be adequate - # for monitoring or estimation purposes, but not for program control. - # - # The table is dynamically expanded when there are too many collisions (i.e., - # keys that have distinct hash codes but fall into the same slot modulo the - # table size), with the expected average effect of maintaining roughly two - # bins per mapping (corresponding to a 0.75 load factor threshold for - # resizing). There may be much variance around this average as mappings are - # added and removed, but overall, this maintains a commonly accepted - # time/space tradeoff for hash tables. However, resizing this or any other - # kind of hash table may be a relatively slow operation. When possible, it is - # a good idea to provide a size estimate as an optional :initial_capacity - # initializer argument. An additional optional :load_factor constructor - # argument provides a further means of customizing initial table capacity by - # specifying the table density to be used in calculating the amount of space - # to allocate for the given number of elements. Note that using many keys with - # exactly the same +hash+ is a sure way to slow down performance of any hash - # table. - # - # ## Design overview - # - # The primary design goal of this hash table is to maintain concurrent - # readability (typically method +[]+, but also iteration and related methods) - # while minimizing update contention. 
Secondary goals are to keep space - # consumption about the same or better than plain +Hash+, and to support high - # initial insertion rates on an empty table by many threads. - # - # Each key-value mapping is held in a +Node+. The validation-based approach - # explained below leads to a lot of code sprawl because retry-control - # precludes factoring into smaller methods. - # - # The table is lazily initialized to a power-of-two size upon the first - # insertion. Each bin in the table normally contains a list of +Node+s (most - # often, the list has only zero or one +Node+). Table accesses require - # volatile/atomic reads, writes, and CASes. The lists of nodes within bins are - # always accurately traversable under volatile reads, so long as lookups check - # hash code and non-nullness of value before checking key equality. - # - # We use the top two bits of +Node+ hash fields for control purposes -- they - # are available anyway because of addressing constraints. As explained further - # below, these top bits are used as follows: - # - # - 00 - Normal - # - 01 - Locked - # - 11 - Locked and may have a thread waiting for lock - # - 10 - +Node+ is a forwarding node - # - # The lower 28 bits of each +Node+'s hash field contain a the key's hash code, - # except for forwarding nodes, for which the lower bits are zero (and so - # always have hash field == +MOVED+). - # - # Insertion (via +[]=+ or its variants) of the first node in an empty bin is - # performed by just CASing it to the bin. This is by far the most common case - # for put operations under most key/hash distributions. Other update - # operations (insert, delete, and replace) require locks. We do not want to - # waste the space required to associate a distinct lock object with each bin, - # so instead use the first node of a bin list itself as a lock. Blocking - # support for these locks relies +Concurrent::ThreadSafe::Util::CheapLockable. 
However, we also need a - # +try_lock+ construction, so we overlay these by using bits of the +Node+ - # hash field for lock control (see above), and so normally use builtin - # monitors only for blocking and signalling using - # +cheap_wait+/+cheap_broadcast+ constructions. See +Node#try_await_lock+. - # - # Using the first node of a list as a lock does not by itself suffice though: - # When a node is locked, any update must first validate that it is still the - # first node after locking it, and retry if not. Because new nodes are always - # appended to lists, once a node is first in a bin, it remains first until - # deleted or the bin becomes invalidated (upon resizing). However, operations - # that only conditionally update may inspect nodes until the point of update. - # This is a converse of sorts to the lazy locking technique described by - # Herlihy & Shavit. - # - # The main disadvantage of per-bin locks is that other update operations on - # other nodes in a bin list protected by the same lock can stall, for example - # when user +eql?+ or mapping functions take a long time. However, - # statistically, under random hash codes, this is not a common problem. - # Ideally, the frequency of nodes in bins follows a Poisson distribution - # (http://en.wikipedia.org/wiki/Poisson_distribution) with a parameter of - # about 0.5 on average, given the resizing threshold of 0.75, although with a - # large variance because of resizing granularity. Ignoring variance, the - # expected occurrences of list size k are (exp(-0.5) * pow(0.5, k) / - # factorial(k)). The first values are: - # - # - 0: 0.60653066 - # - 1: 0.30326533 - # - 2: 0.07581633 - # - 3: 0.01263606 - # - 4: 0.00157952 - # - 5: 0.00015795 - # - 6: 0.00001316 - # - 7: 0.00000094 - # - 8: 0.00000006 - # - more: less than 1 in ten million - # - # Lock contention probability for two threads accessing distinct elements is - # roughly 1 / (8 * #elements) under random hashes. 
- # - # The table is resized when occupancy exceeds a percentage threshold - # (nominally, 0.75, but see below). Only a single thread performs the resize - # (using field +size_control+, to arrange exclusion), but the table otherwise - # remains usable for reads and updates. Resizing proceeds by transferring - # bins, one by one, from the table to the next table. Because we are using - # power-of-two expansion, the elements from each bin must either stay at same - # index, or move with a power of two offset. We eliminate unnecessary node - # creation by catching cases where old nodes can be reused because their next - # fields won't change. On average, only about one-sixth of them need cloning - # when a table doubles. The nodes they replace will be garbage collectable as - # soon as they are no longer referenced by any reader thread that may be in - # the midst of concurrently traversing table. Upon transfer, the old table bin - # contains only a special forwarding node (with hash field +MOVED+) that - # contains the next table as its key. On encountering a forwarding node, - # access and update operations restart, using the new table. - # - # Each bin transfer requires its bin lock. However, unlike other cases, a - # transfer can skip a bin if it fails to acquire its lock, and revisit it - # later. Method +rebuild+ maintains a buffer of TRANSFER_BUFFER_SIZE bins that - # have been skipped because of failure to acquire a lock, and blocks only if - # none are available (i.e., only very rarely). The transfer operation must - # also ensure that all accessible bins in both the old and new table are - # usable by any traversal. When there are no lock acquisition failures, this - # is arranged simply by proceeding from the last bin (+table.size - 1+) up - # towards the first. Upon seeing a forwarding node, traversals arrange to move - # to the new table without revisiting nodes. 
However, when any node is skipped - # during a transfer, all earlier table bins may have become visible, so are - # initialized with a reverse-forwarding node back to the old table until the - # new ones are established. (This sometimes requires transiently locking a - # forwarding node, which is possible under the above encoding.) These more - # expensive mechanics trigger only when necessary. - # - # The traversal scheme also applies to partial traversals of - # ranges of bins (via an alternate Traverser constructor) - # to support partitioned aggregate operations. Also, read-only - # operations give up if ever forwarded to a null table, which - # provides support for shutdown-style clearing, which is also not - # currently implemented. - # - # Lazy table initialization minimizes footprint until first use. - # - # The element count is maintained using a +Concurrent::ThreadSafe::Util::Adder+, - # which avoids contention on updates but can encounter cache thrashing - # if read too frequently during concurrent access. To avoid reading so - # often, resizing is attempted either when a bin lock is - # contended, or upon adding to a bin already holding two or more - # nodes (checked before adding in the +x_if_absent+ methods, after - # adding in others). Under uniform hash distributions, the - # probability of this occurring at threshold is around 13%, - # meaning that only about 1 in 8 puts check threshold (and after - # resizing, many fewer do so). But this approximation has high - # variance for small table sizes, so we check on any collision - # for sizes <= 64. The bulk putAll operation further reduces - # contention by only committing count updates upon these size - # checks. 
- # - # @!visibility private - class AtomicReferenceMapBackend - - # @!visibility private - class Table < Concurrent::ThreadSafe::Util::PowerOfTwoTuple - def cas_new_node(i, hash, key, value) - cas(i, nil, Node.new(hash, key, value)) - end - - def try_to_cas_in_computed(i, hash, key) - succeeded = false - new_value = nil - new_node = Node.new(locked_hash = hash | LOCKED, key, NULL) - if cas(i, nil, new_node) - begin - if NULL == (new_value = yield(NULL)) - was_null = true - else - new_node.value = new_value - end - succeeded = true - ensure - volatile_set(i, nil) if !succeeded || was_null - new_node.unlock_via_hash(locked_hash, hash) - end - end - return succeeded, new_value - end - - def try_lock_via_hash(i, node, node_hash) - node.try_lock_via_hash(node_hash) do - yield if volatile_get(i) == node - end - end - - def delete_node_at(i, node, predecessor_node) - if predecessor_node - predecessor_node.next = node.next - else - volatile_set(i, node.next) - end - end - end - - # Key-value entry. Nodes with a hash field of +MOVED+ are special, and do - # not contain user keys or values. Otherwise, keys are never +nil+, and - # +NULL+ +value+ fields indicate that a node is in the process of being - # deleted or created. For purposes of read-only access, a key may be read - # before a value, but can only be used after checking value to be +!= NULL+. - # - # @!visibility private - class Node - extend Concurrent::ThreadSafe::Util::Volatile - attr_volatile :hash, :value, :next - - include Concurrent::ThreadSafe::Util::CheapLockable - - bit_shift = Concurrent::ThreadSafe::Util::FIXNUM_BIT_SIZE - 2 # need 2 bits for ourselves - # Encodings for special uses of Node hash fields. See above for explanation. 
- MOVED = ('10' << ('0' * bit_shift)).to_i(2) # hash field for forwarding nodes - LOCKED = ('01' << ('0' * bit_shift)).to_i(2) # set/tested only as a bit - WAITING = ('11' << ('0' * bit_shift)).to_i(2) # both bits set/tested together - HASH_BITS = ('00' << ('1' * bit_shift)).to_i(2) # usable bits of normal node hash - - SPIN_LOCK_ATTEMPTS = Concurrent::ThreadSafe::Util::CPU_COUNT > 1 ? Concurrent::ThreadSafe::Util::CPU_COUNT * 2 : 0 - - attr_reader :key - - def initialize(hash, key, value, next_node = nil) - super() - @key = key - self.lazy_set_hash(hash) - self.lazy_set_value(value) - self.next = next_node - end - - # Spins a while if +LOCKED+ bit set and this node is the first of its bin, - # and then sets +WAITING+ bits on hash field and blocks (once) if they are - # still set. It is OK for this method to return even if lock is not - # available upon exit, which enables these simple single-wait mechanics. - # - # The corresponding signalling operation is performed within callers: Upon - # detecting that +WAITING+ has been set when unlocking lock (via a failed - # CAS from non-waiting +LOCKED+ state), unlockers acquire the - # +cheap_synchronize+ lock and perform a +cheap_broadcast+. - def try_await_lock(table, i) - if table && i >= 0 && i < table.size # bounds check, TODO: why are we bounds checking? - spins = SPIN_LOCK_ATTEMPTS - randomizer = base_randomizer = Concurrent::ThreadSafe::Util::XorShiftRandom.get - while equal?(table.volatile_get(i)) && self.class.locked_hash?(my_hash = hash) - if spins >= 0 - if (randomizer = (randomizer >> 1)).even? # spin at random - if (spins -= 1) == 0 - Thread.pass # yield before blocking - else - randomizer = base_randomizer = Concurrent::ThreadSafe::Util::XorShiftRandom.xorshift(base_randomizer) if randomizer.zero? 
- end - end - elsif cas_hash(my_hash, my_hash | WAITING) - force_acquire_lock(table, i) - break - end - end - end - end - - def key?(key) - @key.eql?(key) - end - - def matches?(key, hash) - pure_hash == hash && key?(key) - end - - def pure_hash - hash & HASH_BITS - end - - def try_lock_via_hash(node_hash = hash) - if cas_hash(node_hash, locked_hash = node_hash | LOCKED) - begin - yield - ensure - unlock_via_hash(locked_hash, node_hash) - end - end - end - - def locked? - self.class.locked_hash?(hash) - end - - def unlock_via_hash(locked_hash, node_hash) - unless cas_hash(locked_hash, node_hash) - self.hash = node_hash - cheap_synchronize { cheap_broadcast } - end - end - - private - def force_acquire_lock(table, i) - cheap_synchronize do - if equal?(table.volatile_get(i)) && (hash & WAITING) == WAITING - cheap_wait - else - cheap_broadcast # possibly won race vs signaller - end - end - end - - class << self - def locked_hash?(hash) - (hash & LOCKED) != 0 - end - end - end - - # shorthands - MOVED = Node::MOVED - LOCKED = Node::LOCKED - WAITING = Node::WAITING - HASH_BITS = Node::HASH_BITS - - NOW_RESIZING = -1 - DEFAULT_CAPACITY = 16 - MAX_CAPACITY = Concurrent::ThreadSafe::Util::MAX_INT - - # The buffer size for skipped bins during transfers. The - # value is arbitrary but should be large enough to avoid - # most locking stalls during resizes. - TRANSFER_BUFFER_SIZE = 32 - - extend Concurrent::ThreadSafe::Util::Volatile - attr_volatile :table, # The array of bins. Lazily initialized upon first insertion. Size is always a power of two. - - # Table initialization and resizing control. When negative, the - # table is being initialized or resized. Otherwise, when table is - # null, holds the initial table size to use upon creation, or 0 - # for default. After initialization, holds the next element count - # value upon which to resize the table. 
- :size_control - - def initialize(options = nil) - super() - @counter = Concurrent::ThreadSafe::Util::Adder.new - initial_capacity = options && options[:initial_capacity] || DEFAULT_CAPACITY - self.size_control = (capacity = table_size_for(initial_capacity)) > MAX_CAPACITY ? MAX_CAPACITY : capacity - end - - def get_or_default(key, else_value = nil) - hash = key_hash(key) - current_table = table - while current_table - node = current_table.volatile_get_by_hash(hash) - current_table = - while node - if (node_hash = node.hash) == MOVED - break node.key - elsif (node_hash & HASH_BITS) == hash && node.key?(key) && NULL != (value = node.value) - return value - end - node = node.next - end - end - else_value - end - - def [](key) - get_or_default(key) - end - - def key?(key) - get_or_default(key, NULL) != NULL - end - - def []=(key, value) - get_and_set(key, value) - value - end - - def compute_if_absent(key) - hash = key_hash(key) - current_table = table || initialize_table - while true - if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash))) - succeeded, new_value = current_table.try_to_cas_in_computed(i, hash, key) { yield } - if succeeded - increment_size - return new_value - end - elsif (node_hash = node.hash) == MOVED - current_table = node.key - elsif NULL != (current_value = find_value_in_node_list(node, key, hash, node_hash & HASH_BITS)) - return current_value - elsif Node.locked_hash?(node_hash) - try_await_lock(current_table, i, node) - else - succeeded, value = attempt_internal_compute_if_absent(key, hash, current_table, i, node, node_hash) { yield } - return value if succeeded - end - end - end - - def compute_if_present(key) - new_value = nil - internal_replace(key) do |old_value| - if (new_value = yield(NULL == old_value ? nil : old_value)).nil? - NULL - else - new_value - end - end - new_value - end - - def compute(key) - internal_compute(key) do |old_value| - if (new_value = yield(NULL == old_value ? nil : old_value)).nil? 
- NULL - else - new_value - end - end - end - - def merge_pair(key, value) - internal_compute(key) do |old_value| - if NULL == old_value || !(value = yield(old_value)).nil? - value - else - NULL - end - end - end - - def replace_pair(key, old_value, new_value) - NULL != internal_replace(key, old_value) { new_value } - end - - def replace_if_exists(key, new_value) - if (result = internal_replace(key) { new_value }) && NULL != result - result - end - end - - def get_and_set(key, value) # internalPut in the original CHMV8 - hash = key_hash(key) - current_table = table || initialize_table - while true - if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash))) - if current_table.cas_new_node(i, hash, key, value) - increment_size - break - end - elsif (node_hash = node.hash) == MOVED - current_table = node.key - elsif Node.locked_hash?(node_hash) - try_await_lock(current_table, i, node) - else - succeeded, old_value = attempt_get_and_set(key, value, hash, current_table, i, node, node_hash) - break old_value if succeeded - end - end - end - - def delete(key) - replace_if_exists(key, NULL) - end - - def delete_pair(key, value) - result = internal_replace(key, value) { NULL } - if result && NULL != result - !!result - else - false - end - end - - def each_pair - return self unless current_table = table - current_table_size = base_size = current_table.size - i = base_index = 0 - while base_index < base_size - if node = current_table.volatile_get(i) - if node.hash == MOVED - current_table = node.key - current_table_size = current_table.size - else - begin - if NULL != (value = node.value) # skip deleted or special nodes - yield node.key, value - end - end while node = node.next - end - end - - if (i_with_base = i + base_size) < current_table_size - i = i_with_base # visit upper slots if present - else - i = base_index += 1 - end - end - self - end - - def size - (sum = @counter.sum) < 0 ? 0 : sum # ignore transient negative values - end - - def empty? 
- size == 0 - end - - # Implementation for clear. Steps through each bin, removing all nodes. - def clear - return self unless current_table = table - current_table_size = current_table.size - deleted_count = i = 0 - while i < current_table_size - if !(node = current_table.volatile_get(i)) - i += 1 - elsif (node_hash = node.hash) == MOVED - current_table = node.key - current_table_size = current_table.size - elsif Node.locked_hash?(node_hash) - decrement_size(deleted_count) # opportunistically update count - deleted_count = 0 - node.try_await_lock(current_table, i) - else - current_table.try_lock_via_hash(i, node, node_hash) do - begin - deleted_count += 1 if NULL != node.value # recheck under lock - node.value = nil - end while node = node.next - current_table.volatile_set(i, nil) - i += 1 - end - end - end - decrement_size(deleted_count) - self - end - - private - # Internal versions of the insertion methods, each a - # little more complicated than the last. All have - # the same basic structure: - # 1. If table uninitialized, create - # 2. If bin empty, try to CAS new node - # 3. If bin stale, use new table - # 4. Lock and validate; if valid, scan and add or update - # - # The others interweave other checks and/or alternative actions: - # * Plain +get_and_set+ checks for and performs resize after insertion. - # * compute_if_absent prescans for mapping without lock (and fails to add - # if present), which also makes pre-emptive resize checks worthwhile. - # - # Someday when details settle down a bit more, it might be worth - # some factoring to reduce sprawl. 
- def internal_replace(key, expected_old_value = NULL, &block) - hash = key_hash(key) - current_table = table - while current_table - if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash))) - break - elsif (node_hash = node.hash) == MOVED - current_table = node.key - elsif (node_hash & HASH_BITS) != hash && !node.next # precheck - break # rules out possible existence - elsif Node.locked_hash?(node_hash) - try_await_lock(current_table, i, node) - else - succeeded, old_value = attempt_internal_replace(key, expected_old_value, hash, current_table, i, node, node_hash, &block) - return old_value if succeeded - end - end - NULL - end - - def attempt_internal_replace(key, expected_old_value, hash, current_table, i, node, node_hash) - current_table.try_lock_via_hash(i, node, node_hash) do - predecessor_node = nil - old_value = NULL - begin - if node.matches?(key, hash) && NULL != (current_value = node.value) - if NULL == expected_old_value || expected_old_value == current_value # NULL == expected_old_value means whatever value - old_value = current_value - if NULL == (node.value = yield(old_value)) - current_table.delete_node_at(i, node, predecessor_node) - decrement_size - end - end - break - end - - predecessor_node = node - end while node = node.next - - return true, old_value - end - end - - def find_value_in_node_list(node, key, hash, pure_hash) - do_check_for_resize = false - while true - if pure_hash == hash && node.key?(key) && NULL != (value = node.value) - return value - elsif node = node.next - do_check_for_resize = true # at least 2 nodes -> check for resize - pure_hash = node.pure_hash - else - return NULL - end - end - ensure - check_for_resize if do_check_for_resize - end - - def internal_compute(key, &block) - hash = key_hash(key) - current_table = table || initialize_table - while true - if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash))) - succeeded, new_value = current_table.try_to_cas_in_computed(i, 
hash, key, &block) - if succeeded - if NULL == new_value - break nil - else - increment_size - break new_value - end - end - elsif (node_hash = node.hash) == MOVED - current_table = node.key - elsif Node.locked_hash?(node_hash) - try_await_lock(current_table, i, node) - else - succeeded, new_value = attempt_compute(key, hash, current_table, i, node, node_hash, &block) - break new_value if succeeded - end - end - end - - def attempt_internal_compute_if_absent(key, hash, current_table, i, node, node_hash) - added = false - current_table.try_lock_via_hash(i, node, node_hash) do - while true - if node.matches?(key, hash) && NULL != (value = node.value) - return true, value - end - last = node - unless node = node.next - last.next = Node.new(hash, key, value = yield) - added = true - increment_size - return true, value - end - end - end - ensure - check_for_resize if added - end - - def attempt_compute(key, hash, current_table, i, node, node_hash) - added = false - current_table.try_lock_via_hash(i, node, node_hash) do - predecessor_node = nil - while true - if node.matches?(key, hash) && NULL != (value = node.value) - if NULL == (node.value = value = yield(value)) - current_table.delete_node_at(i, node, predecessor_node) - decrement_size - value = nil - end - return true, value - end - predecessor_node = node - unless node = node.next - if NULL == (value = yield(NULL)) - value = nil - else - predecessor_node.next = Node.new(hash, key, value) - added = true - increment_size - end - return true, value - end - end - end - ensure - check_for_resize if added - end - - def attempt_get_and_set(key, value, hash, current_table, i, node, node_hash) - node_nesting = nil - current_table.try_lock_via_hash(i, node, node_hash) do - node_nesting = 1 - old_value = nil - found_old_value = false - while node - if node.matches?(key, hash) && NULL != (old_value = node.value) - found_old_value = true - node.value = value - break - end - last = node - unless node = node.next - last.next = 
Node.new(hash, key, value) - break - end - node_nesting += 1 - end - - return true, old_value if found_old_value - increment_size - true - end - ensure - check_for_resize if node_nesting && (node_nesting > 1 || current_table.size <= 64) - end - - def initialize_copy(other) - super - @counter = Concurrent::ThreadSafe::Util::Adder.new - self.table = nil - self.size_control = (other_table = other.table) ? other_table.size : DEFAULT_CAPACITY - self - end - - def try_await_lock(current_table, i, node) - check_for_resize # try resizing if can't get lock - node.try_await_lock(current_table, i) - end - - def key_hash(key) - key.hash & HASH_BITS - end - - # Returns a power of two table size for the given desired capacity. - def table_size_for(entry_count) - size = 2 - size <<= 1 while size < entry_count - size - end - - # Initializes table, using the size recorded in +size_control+. - def initialize_table - until current_table ||= table - if (size_ctrl = size_control) == NOW_RESIZING - Thread.pass # lost initialization race; just spin - else - try_in_resize_lock(current_table, size_ctrl) do - initial_size = size_ctrl > 0 ? size_ctrl : DEFAULT_CAPACITY - current_table = self.table = Table.new(initial_size) - initial_size - (initial_size >> 2) # 75% load factor - end - end - end - current_table - end - - # If table is too small and not already resizing, creates next table and - # transfers bins. Rechecks occupancy after a transfer to see if another - # resize is already needed because resizings are lagging additions. 
- def check_for_resize - while (current_table = table) && MAX_CAPACITY > (table_size = current_table.size) && NOW_RESIZING != (size_ctrl = size_control) && size_ctrl < @counter.sum - try_in_resize_lock(current_table, size_ctrl) do - self.table = rebuild(current_table) - (table_size << 1) - (table_size >> 1) # 75% load factor - end - end - end - - def try_in_resize_lock(current_table, size_ctrl) - if cas_size_control(size_ctrl, NOW_RESIZING) - begin - if current_table == table # recheck under lock - size_ctrl = yield # get new size_control - end - ensure - self.size_control = size_ctrl - end - end - end - - # Moves and/or copies the nodes in each bin to new table. See above for explanation. - def rebuild(table) - old_table_size = table.size - new_table = table.next_in_size_table - # puts "#{old_table_size} -> #{new_table.size}" - forwarder = Node.new(MOVED, new_table, NULL) - rev_forwarder = nil - locked_indexes = nil # holds bins to revisit; nil until needed - locked_arr_idx = 0 - bin = old_table_size - 1 - i = bin - while true - if !(node = table.volatile_get(i)) - # no lock needed (or available) if bin >= 0, because we're not popping values from locked_indexes until we've run through the whole table - redo unless (bin >= 0 ? table.cas(i, nil, forwarder) : lock_and_clean_up_reverse_forwarders(table, old_table_size, new_table, i, forwarder)) - elsif Node.locked_hash?(node_hash = node.hash) - locked_indexes ||= ::Array.new - if bin < 0 && locked_arr_idx > 0 - locked_arr_idx -= 1 - i, locked_indexes[locked_arr_idx] = locked_indexes[locked_arr_idx], i # swap with another bin - redo - end - if bin < 0 || locked_indexes.size >= TRANSFER_BUFFER_SIZE - node.try_await_lock(table, i) # no other options -- block - redo - end - rev_forwarder ||= Node.new(MOVED, table, NULL) - redo unless table.volatile_get(i) == node && node.locked? 
# recheck before adding to list - locked_indexes << i - new_table.volatile_set(i, rev_forwarder) - new_table.volatile_set(i + old_table_size, rev_forwarder) - else - redo unless split_old_bin(table, new_table, i, node, node_hash, forwarder) - end - - if bin > 0 - i = (bin -= 1) - elsif locked_indexes && !locked_indexes.empty? - bin = -1 - i = locked_indexes.pop - locked_arr_idx = locked_indexes.size - 1 - else - return new_table - end - end - end - - def lock_and_clean_up_reverse_forwarders(old_table, old_table_size, new_table, i, forwarder) - # transiently use a locked forwarding node - locked_forwarder = Node.new(moved_locked_hash = MOVED | LOCKED, new_table, NULL) - if old_table.cas(i, nil, locked_forwarder) - new_table.volatile_set(i, nil) # kill the potential reverse forwarders - new_table.volatile_set(i + old_table_size, nil) # kill the potential reverse forwarders - old_table.volatile_set(i, forwarder) - locked_forwarder.unlock_via_hash(moved_locked_hash, MOVED) - true - end - end - - # Splits a normal bin with list headed by e into lo and hi parts; installs in given table. 
- def split_old_bin(table, new_table, i, node, node_hash, forwarder) - table.try_lock_via_hash(i, node, node_hash) do - split_bin(new_table, i, node, node_hash) - table.volatile_set(i, forwarder) - end - end - - def split_bin(new_table, i, node, node_hash) - bit = new_table.size >> 1 # bit to split on - run_bit = node_hash & bit - last_run = nil - low = nil - high = nil - current_node = node - # this optimises for the lowest amount of volatile writes and objects created - while current_node = current_node.next - unless (b = current_node.hash & bit) == run_bit - run_bit = b - last_run = current_node - end - end - if run_bit == 0 - low = last_run - else - high = last_run - end - current_node = node - until current_node == last_run - pure_hash = current_node.pure_hash - if (pure_hash & bit) == 0 - low = Node.new(pure_hash, current_node.key, current_node.value, low) - else - high = Node.new(pure_hash, current_node.key, current_node.value, high) - end - current_node = current_node.next - end - new_table.volatile_set(i, low) - new_table.volatile_set(i + bit, high) - end - - def increment_size - @counter.increment - end - - def decrement_size(by = 1) - @counter.add(-by) - end - end - end -end diff --git a/lib/concurrent-ruby/concurrent/collection/map/synchronized_map_backend.rb b/lib/concurrent-ruby/concurrent/collection/map/synchronized_map_backend.rb index 190c8d98d..efa161ed9 100644 --- a/lib/concurrent-ruby/concurrent/collection/map/synchronized_map_backend.rb +++ b/lib/concurrent-ruby/concurrent/collection/map/synchronized_map_backend.rb @@ -8,74 +8,77 @@ module Collection # @!visibility private class SynchronizedMapBackend < NonConcurrentMapBackend - require 'mutex_m' - include Mutex_m - # WARNING: Mutex_m is a non-reentrant lock, so the synchronized methods are - # not allowed to call each other. + def initialize(*args, &block) + super + + # WARNING: Mutex is a non-reentrant lock, so the synchronized methods are + # not allowed to call each other. 
+ @mutex = Mutex.new + end def [](key) - synchronize { super } + @mutex.synchronize { super } end def []=(key, value) - synchronize { super } + @mutex.synchronize { super } end def compute_if_absent(key) - synchronize { super } + @mutex.synchronize { super } end def compute_if_present(key) - synchronize { super } + @mutex.synchronize { super } end def compute(key) - synchronize { super } + @mutex.synchronize { super } end def merge_pair(key, value) - synchronize { super } + @mutex.synchronize { super } end def replace_pair(key, old_value, new_value) - synchronize { super } + @mutex.synchronize { super } end def replace_if_exists(key, new_value) - synchronize { super } + @mutex.synchronize { super } end def get_and_set(key, value) - synchronize { super } + @mutex.synchronize { super } end def key?(key) - synchronize { super } + @mutex.synchronize { super } end def delete(key) - synchronize { super } + @mutex.synchronize { super } end def delete_pair(key, value) - synchronize { super } + @mutex.synchronize { super } end def clear - synchronize { super } + @mutex.synchronize { super } end def size - synchronize { super } + @mutex.synchronize { super } end def get_or_default(key, default_value) - synchronize { super } + @mutex.synchronize { super } end private def dupped_backend - synchronize { super } + @mutex.synchronize { super } end end end diff --git a/lib/concurrent-ruby/concurrent/concern/logging.rb b/lib/concurrent-ruby/concurrent/concern/logging.rb index 568a539eb..d1aae81ae 100644 --- a/lib/concurrent-ruby/concurrent/concern/logging.rb +++ b/lib/concurrent-ruby/concurrent/concern/logging.rb @@ -1,4 +1,3 @@ -require 'logger' require 'concurrent/atomic/atomic_reference' module Concurrent @@ -8,10 +7,12 @@ module Concern # # @!visibility private module Logging - include Logger::Severity + # The same as Logger::Severity but we copy it here to avoid a dependency on the logger gem just for these 7 constants + DEBUG, INFO, WARN, ERROR, FATAL, UNKNOWN = 0, 1, 2, 3, 
4, 5 + SEV_LABEL = %w[DEBUG INFO WARN ERROR FATAL ANY].freeze # Logs through {Concurrent.global_logger}, it can be overridden by setting @logger - # @param [Integer] level one of Logger::Severity constants + # @param [Integer] level one of Concurrent::Concern::Logging constants # @param [String] progname e.g. a path of an Actor # @param [String, nil] message when nil block is used to generate the message # @yieldreturn [String] a message @@ -23,7 +24,7 @@ def log(level, progname, message = nil, &block) end logger.call level, progname, message, &block rescue => error - $stderr.puts "`Concurrent.configuration.logger` failed to log #{[level, progname, message, block]}\n" + + $stderr.puts "`Concurrent.global_logger` failed to log #{[level, progname, message, block]}\n" + "#{error.message} (#{error.class})\n#{error.backtrace.join "\n"}" end end @@ -33,8 +34,10 @@ def log(level, progname, message = nil, &block) module Concurrent extend Concern::Logging - # @return [Logger] Logger with provided level and output. - def self.create_simple_logger(level = Logger::FATAL, output = $stderr) + # Create a simple logger with provided level and output. + def self.create_simple_logger(level = :FATAL, output = $stderr) + level = Concern::Logging.const_get(level) unless level.is_a?(Integer) + # TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlocking lambda do |severity, progname, message = nil, &block| return false if severity < level @@ -52,7 +55,7 @@ def self.create_simple_logger(level = Logger::FATAL, output = $stderr) output.print format "[%s] %5s -- %s: %s\n", Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'), - Logger::SEV_LABEL[severity], + Concern::Logging::SEV_LABEL[severity], progname, formatted_message true @@ -60,13 +63,15 @@ def self.create_simple_logger(level = Logger::FATAL, output = $stderr) end # Use logger created by #create_simple_logger to log concurrent-ruby messages. 
- def self.use_simple_logger(level = Logger::FATAL, output = $stderr) + def self.use_simple_logger(level = :FATAL, output = $stderr) Concurrent.global_logger = create_simple_logger level, output end - # @return [Logger] Logger with provided level and output. + # Create a stdlib logger with provided level and output. + # If you use this deprecated method you might need to add logger to your Gemfile to avoid warnings from Ruby 3.3.5+. # @deprecated - def self.create_stdlib_logger(level = Logger::FATAL, output = $stderr) + def self.create_stdlib_logger(level = :FATAL, output = $stderr) + require 'logger' logger = Logger.new(output) logger.level = level logger.formatter = lambda do |severity, datetime, progname, msg| @@ -93,7 +98,7 @@ def self.create_stdlib_logger(level = Logger::FATAL, output = $stderr) # Use logger created by #create_stdlib_logger to log concurrent-ruby messages. # @deprecated - def self.use_stdlib_logger(level = Logger::FATAL, output = $stderr) + def self.use_stdlib_logger(level = :FATAL, output = $stderr) Concurrent.global_logger = create_stdlib_logger level, output end @@ -103,7 +108,7 @@ def self.use_stdlib_logger(level = Logger::FATAL, output = $stderr) NULL_LOGGER = lambda { |level, progname, message = nil, &block| } # @!visibility private - GLOBAL_LOGGER = AtomicReference.new(create_simple_logger(Logger::WARN)) + GLOBAL_LOGGER = AtomicReference.new(create_simple_logger(:WARN)) private_constant :GLOBAL_LOGGER def self.global_logger diff --git a/lib/concurrent-ruby/concurrent/delay.rb b/lib/concurrent-ruby/concurrent/delay.rb index 923773cbc..0d6d91a57 100644 --- a/lib/concurrent-ruby/concurrent/delay.rb +++ b/lib/concurrent-ruby/concurrent/delay.rb @@ -19,7 +19,7 @@ module Concurrent # # When a `Delay` is created its state is set to `pending`. The value and # reason are both `nil`. The first time the `#value` method is called the - # enclosed opration will be run and the calling thread will block. 
Other + # enclosed operation will be run and the calling thread will block. Other # threads attempting to call `#value` will block as well. Once the operation # is complete the *value* will be set to the result of the operation or the # *reason* will be set to the raised exception, as appropriate. All threads diff --git a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb index 4de512a5f..8324c0673 100644 --- a/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb +++ b/lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb @@ -39,6 +39,10 @@ module Concurrent # The number of tasks that have been completed by the pool since construction. # @return [Integer] The number of tasks that have been completed by the pool since construction. + # @!macro thread_pool_executor_method_active_count + # The number of threads that are actively executing tasks. + # @return [Integer] The number of threads that are actively executing tasks. + # @!macro thread_pool_executor_attr_reader_idletime # The number of seconds that a thread may be idle before being reclaimed. # @return [Integer] The number of seconds that a thread may be idle before being reclaimed. @@ -79,7 +83,7 @@ module Concurrent # # This is a no-op on some pool implementation (e.g. the Java one). The Ruby # pool will auto-prune each time a new job is posted. 
You will need to call - # this method explicitely in case your application post jobs in bursts (a + # this method explicitly in case your application post jobs in bursts (a # lot of jobs and then nothing for long periods) # @!macro thread_pool_executor_public_api diff --git a/lib/concurrent-ruby/concurrent/executor/java_executor_service.rb b/lib/concurrent-ruby/concurrent/executor/java_executor_service.rb index 9a8638552..b2bc69a6e 100644 --- a/lib/concurrent-ruby/concurrent/executor/java_executor_service.rb +++ b/lib/concurrent-ruby/concurrent/executor/java_executor_service.rb @@ -57,15 +57,11 @@ def ns_running? end def ns_shuttingdown? - if @executor.respond_to? :isTerminating - @executor.isTerminating - else - false - end + @executor.isShutdown && !@executor.isTerminated end def ns_shutdown? - @executor.isShutdown || @executor.isTerminated + @executor.isTerminated end class Job @@ -88,10 +84,11 @@ class DaemonThreadFactory def initialize(daemonize = true) @daemonize = daemonize + @java_thread_factory = java.util.concurrent.Executors.defaultThreadFactory end def newThread(runnable) - thread = java.util.concurrent.Executors.defaultThreadFactory().newThread(runnable) + thread = @java_thread_factory.newThread(runnable) thread.setDaemon(@daemonize) return thread end diff --git a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb index 1213a95fb..598a5f91f 100644 --- a/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb @@ -73,6 +73,11 @@ def completed_task_count @executor.getCompletedTaskCount end + # @!macro thread_pool_executor_method_active_count + def active_count + @executor.getActiveCount + end + # @!macro thread_pool_executor_attr_reader_idletime def idletime @executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS) diff --git 
a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb index 298dd7fed..9375acf38 100644 --- a/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb @@ -61,6 +61,13 @@ def completed_task_count synchronize { @completed_task_count } end + # @!macro thread_pool_executor_method_active_count + def active_count + synchronize do + @pool.length - @ready.length + end + end + # @!macro executor_service_method_can_overflow_question def can_overflow? synchronize { ns_limited_queue? } diff --git a/lib/concurrent-ruby/concurrent/executor/single_thread_executor.rb b/lib/concurrent-ruby/concurrent/executor/single_thread_executor.rb index f1474ea9f..220eb0ff6 100644 --- a/lib/concurrent-ruby/concurrent/executor/single_thread_executor.rb +++ b/lib/concurrent-ruby/concurrent/executor/single_thread_executor.rb @@ -27,7 +27,7 @@ module Concurrent # is received. This pattern has several issues. The thread itself is highly # susceptible to errors during processing. Also, the thread itself must be # constantly monitored and restarted should it die. `SingleThreadExecutor` - # encapsulates all these bahaviors. The task processor is highly resilient + # encapsulates all these behaviors. The task processor is highly resilient # to errors from within tasks. Also, should the thread die it will # automatically be restarted. 
# diff --git a/lib/concurrent-ruby/concurrent/executor/timer_set.rb b/lib/concurrent-ruby/concurrent/executor/timer_set.rb index 0dfaf1288..759dce093 100644 --- a/lib/concurrent-ruby/concurrent/executor/timer_set.rb +++ b/lib/concurrent-ruby/concurrent/executor/timer_set.rb @@ -3,7 +3,7 @@ require 'concurrent/collection/non_concurrent_priority_queue' require 'concurrent/executor/executor_service' require 'concurrent/executor/single_thread_executor' - +require 'concurrent/errors' require 'concurrent/options' module Concurrent @@ -162,7 +162,11 @@ def process_tasks # queue now must have the same pop time, or a closer one, as # when we peeked). task = synchronize { @queue.pop } - task.executor.post { task.process_task } + begin + task.executor.post { task.process_task } + rescue RejectedExecutionError + # ignore and continue + end else @condition.wait([diff, 60].min) end diff --git a/lib/concurrent-ruby/concurrent/hash.rb b/lib/concurrent-ruby/concurrent/hash.rb index 7902fe9d2..db0208e01 100644 --- a/lib/concurrent-ruby/concurrent/hash.rb +++ b/lib/concurrent-ruby/concurrent/hash.rb @@ -15,9 +15,11 @@ module Concurrent # @!macro internal_implementation_note HashImplementation = case when Concurrent.on_cruby? - # Hash is thread-safe in practice because CRuby runs - # threads one at a time and does not do context - # switching during the execution of C functions. + # Hash is not fully thread-safe on CRuby, see + # https://bugs.ruby-lang.org/issues/19237 + # https://github.com/ruby/ruby/commit/ffd52412ab + # https://github.com/ruby-concurrency/concurrent-ruby/issues/929 + # So we will need to add synchronization here (similar to Concurrent::Map). ::Hash when Concurrent.on_jruby? 
diff --git a/lib/concurrent-ruby/concurrent/map.rb b/lib/concurrent-ruby/concurrent/map.rb index 1b2224195..b263f83d8 100644 --- a/lib/concurrent-ruby/concurrent/map.rb +++ b/lib/concurrent-ruby/concurrent/map.rb @@ -20,8 +20,8 @@ module Collection require 'concurrent/collection/map/truffleruby_map_backend' TruffleRubyMapBackend else - require 'concurrent/collection/map/atomic_reference_map_backend' - AtomicReferenceMapBackend + require 'concurrent/collection/map/synchronized_map_backend' + SynchronizedMapBackend end else warn 'Concurrent::Map: unsupported Ruby engine, using a fully synchronized Concurrent::Map implementation' @@ -148,7 +148,7 @@ def [](key) if value = super # non-falsy value is an existing mapping, return it right away value # re-check is done with get_or_default(key, NULL) instead of a simple !key?(key) in order to avoid a race condition, whereby by the time the current thread gets to the key?(key) call - # a key => value mapping might have already been created by a different thread (key?(key) would then return true, this elsif branch wouldn't be taken and an incorrent +nil+ value + # a key => value mapping might have already been created by a different thread (key?(key) would then return true, this elsif branch wouldn't be taken and an incorrect +nil+ value # would be returned) # note: nil == value check is not technically necessary elsif @default_proc && nil == value && NULL == (value = get_or_default(key, NULL)) diff --git a/lib/concurrent-ruby/concurrent/mvar.rb b/lib/concurrent-ruby/concurrent/mvar.rb index dfc41950c..9777ba347 100644 --- a/lib/concurrent-ruby/concurrent/mvar.rb +++ b/lib/concurrent-ruby/concurrent/mvar.rb @@ -9,7 +9,7 @@ module Concurrent # queue of length one, or a special kind of mutable variable. # # On top of the fundamental `#put` and `#take` operations, we also provide a - # `#mutate` that is atomic with respect to operations on the same instance. 
+ # `#modify` that is atomic with respect to operations on the same instance. # These operations all support timeouts. # # We also support non-blocking operations `#try_put!` and `#try_take!`, a @@ -87,7 +87,7 @@ def borrow(timeout = nil) @mutex.synchronize do wait_for_full(timeout) - # if we timeoud out we'll still be empty + # If we timed out we'll still be empty if unlocked_full? yield @value else @@ -116,10 +116,10 @@ def put(value, timeout = nil) end # Atomically `take`, yield the value to a block for transformation, and then - # `put` the transformed value. Returns the transformed value. A timeout can + # `put` the transformed value. Returns the pre-transform value. A timeout can # be set to limit the time spent blocked, in which case it returns `TIMEOUT` # if the time is exceeded. - # @return [Object] the transformed value, or `TIMEOUT` + # @return [Object] the pre-transform value, or `TIMEOUT` def modify(timeout = nil) raise ArgumentError.new('no block given') unless block_given? diff --git a/lib/concurrent-ruby/concurrent/promise.rb b/lib/concurrent-ruby/concurrent/promise.rb index ccc47dd62..8df80b0c7 100644 --- a/lib/concurrent-ruby/concurrent/promise.rb +++ b/lib/concurrent-ruby/concurrent/promise.rb @@ -103,7 +103,7 @@ module Concurrent # - if parent is *rejected* the child will be *pending* (but will ultimately be *rejected*) # # Promises are executed asynchronously from the main thread. By the time a - # child Promise finishes intialization it may be in a different state than its + # child Promise finishes initialization it may be in a different state than its # parent (by the time a child is created its parent may have completed # execution and changed state). Despite being asynchronous, however, the order # of execution of Promise objects in a chain (or tree) is strictly defined. @@ -167,7 +167,7 @@ module Concurrent # c2 = p.then(-> reason { raise 'Boom!' 
}) # # c1.wait.state #=> :fulfilled - # c1.value #=> 45 + # c1.value #=> 42 # c2.wait.state #=> :rejected # c2.reason #=> # # ``` diff --git a/lib/concurrent-ruby/concurrent/promises.rb b/lib/concurrent-ruby/concurrent/promises.rb index 3cd17055c..c5df8fe9c 100644 --- a/lib/concurrent-ruby/concurrent/promises.rb +++ b/lib/concurrent-ruby/concurrent/promises.rb @@ -5,6 +5,7 @@ require 'concurrent/configuration' require 'concurrent/errors' require 'concurrent/re_include' +require 'concurrent/utility/monotonic_time' module Concurrent @@ -22,7 +23,7 @@ module Promises # # @!macro promises.param.args # @param [Object] args arguments which are passed to the task when it's executed. - # (It might be prepended with other arguments, see the @yeild section). + # (It might be prepended with other arguments, see the @yield section). # # @!macro promises.shortcut.on # Shortcut of {#$0_on} with default `:io` executor supplied. @@ -63,8 +64,8 @@ def resolvable_event resolvable_event_on default_executor end - # Created resolvable event, user is responsible for resolving the event once by - # {Promises::ResolvableEvent#resolve}. + # Creates a resolvable event, user is responsible for resolving the event once + # by calling {Promises::ResolvableEvent#resolve}. # # @!macro promises.param.default_executor # @return [ResolvableEvent] @@ -94,7 +95,7 @@ def future(*args, &task) future_on(default_executor, *args, &task) end - # Constructs new Future which will be resolved after block is evaluated on default executor. + # Constructs a new Future which will be resolved after block is evaluated on default executor. # Evaluation begins immediately. 
# # @!macro promises.param.default_executor @@ -106,7 +107,7 @@ def future_on(default_executor, *args, &task) ImmediateEventPromise.new(default_executor).future.then(*args, &task) end - # Creates resolved future with will be either fulfilled with the given value or rejection with + # Creates a resolved future which will be either fulfilled with the given value or rejected with # the given reason. # # @param [true, false] fulfilled @@ -118,7 +119,7 @@ def resolved_future(fulfilled, value, reason, default_executor = self.default_ex ImmediateFuturePromise.new(default_executor, fulfilled, value, reason).future end - # Creates resolved future with will be fulfilled with the given value. + # Creates a resolved future which will be fulfilled with the given value. # # @!macro promises.param.default_executor # @param [Object] value @@ -127,7 +128,7 @@ def fulfilled_future(value, default_executor = self.default_executor) resolved_future true, value, nil, default_executor end - # Creates resolved future with will be rejected with the given reason. + # Creates a resolved future which will be rejected with the given reason. # # @!macro promises.param.default_executor # @param [Object] reason @@ -190,7 +191,7 @@ def delay(*args, &task) delay_on default_executor, *args, &task end - # Creates new event or future which is resolved only after it is touched, + # Creates a new event or future which is resolved only after it is touched, # see {Concurrent::AbstractEventFuture#touch}. # # @!macro promises.param.default_executor @@ -214,7 +215,7 @@ def schedule(intended_time, *args, &task) schedule_on default_executor, intended_time, *args, &task end - # Creates new event or future which is resolved in intended_time. + # Creates a new event or future which is resolved in intended_time.
# # @!macro promises.param.default_executor # @!macro promises.param.intended_time @@ -240,8 +241,8 @@ def zip_futures(*futures_and_or_events) zip_futures_on default_executor, *futures_and_or_events end - # Creates new future which is resolved after all futures_and_or_events are resolved. - # Its value is array of zipped future values. Its reason is array of reasons for rejection. + # Creates a new future which is resolved after all futures_and_or_events are resolved. + # Its value is an array of zipped future values. Its reason is an array of reasons for rejection. # If there is an error it rejects. # @!macro promises.event-conversion # If event is supplied, which does not have value and can be only resolved, it's @@ -262,7 +263,7 @@ def zip_events(*futures_and_or_events) zip_events_on default_executor, *futures_and_or_events end - # Creates new event which is resolved after all futures_and_or_events are resolved. + # Creates a new event which is resolved after all futures_and_or_events are resolved. # (Future is resolved when fulfilled or rejected.) # # @!macro promises.param.default_executor @@ -280,8 +281,8 @@ def any_resolved_future(*futures_and_or_events) alias_method :any, :any_resolved_future - # Creates new future which is resolved after first futures_and_or_events is resolved. - # Its result equals result of the first resolved future. + # Creates a new future which is resolved after the first futures_and_or_events is resolved. + # Its result equals the result of the first resolved future. # @!macro promises.any-touch # If resolved it does not propagate {Concurrent::AbstractEventFuture#touch}, leaving delayed # futures un-executed if they are not required any more. @@ -300,9 +301,9 @@ def any_fulfilled_future(*futures_and_or_events) any_fulfilled_future_on default_executor, *futures_and_or_events end - # Creates new future which is resolved after first of futures_and_or_events is fulfilled. 
- # Its result equals result of the first resolved future or if all futures_and_or_events reject, - # it has reason of the last resolved future. + # Creates a new future which is resolved after the first futures_and_or_events is fulfilled. + # Its result equals the result of the first resolved future or if all futures_and_or_events reject, + # it has reason of the last rejected future. # @!macro promises.any-touch # @!macro promises.event-conversion # @@ -319,7 +320,7 @@ def any_event(*futures_and_or_events) any_event_on default_executor, *futures_and_or_events end - # Creates new event which becomes resolved after first of the futures_and_or_events resolves. + # Creates a new event which becomes resolved after the first futures_and_or_events resolves. # @!macro promises.any-touch # # @!macro promises.param.default_executor @@ -611,7 +612,7 @@ def chain(*args, &task) # @yieldparam [Object] value # @yieldparam [Object] reason def chain_on(executor, *args, &task) - ChainPromise.new_blocked_by1(self, @DefaultExecutor, executor, args, &task).future + ChainPromise.new_blocked_by1(self, executor, executor, args, &task).future end # @return [String] Short string representation. @@ -772,8 +773,17 @@ def wait_until_resolved(timeout) @Lock.synchronize do @Waiters.increment begin - unless resolved? - @Condition.wait @Lock, timeout + if timeout + start = Concurrent.monotonic_time + until resolved? + break if @Condition.wait(@Lock, timeout) == nil # nil means timeout + timeout -= (Concurrent.monotonic_time - start) + break if timeout <= 0 + end + else + until resolved? + @Condition.wait(@Lock, timeout) + end end ensure # JRuby may raise ConcurrencyError @@ -1034,7 +1044,7 @@ def then(*args, &task) # @return [Future] # @yield [value, *args] to the task. 
def then_on(executor, *args, &task) - ThenPromise.new_blocked_by1(self, @DefaultExecutor, executor, args, &task).future + ThenPromise.new_blocked_by1(self, executor, executor, args, &task).future end # @!macro promises.shortcut.on @@ -1052,7 +1062,7 @@ def rescue(*args, &task) # @return [Future] # @yield [reason, *args] to the task. def rescue_on(executor, *args, &task) - RescuePromise.new_blocked_by1(self, @DefaultExecutor, executor, args, &task).future + RescuePromise.new_blocked_by1(self, executor, executor, args, &task).future end # @!macro promises.method.zip diff --git a/lib/concurrent-ruby/concurrent/scheduled_task.rb b/lib/concurrent-ruby/concurrent/scheduled_task.rb index 429fc0683..efe9e193a 100644 --- a/lib/concurrent-ruby/concurrent/scheduled_task.rb +++ b/lib/concurrent-ruby/concurrent/scheduled_task.rb @@ -193,7 +193,7 @@ def initialize(delay, opts = {}, &task) end end - # The `delay` value given at instanciation. + # The `delay` value given at instantiation. # # @return [Float] the initial delay. def initial_delay diff --git a/lib/concurrent-ruby/concurrent/synchronization/abstract_struct.rb b/lib/concurrent-ruby/concurrent/synchronization/abstract_struct.rb index 1fe90c164..28816c518 100644 --- a/lib/concurrent-ruby/concurrent/synchronization/abstract_struct.rb +++ b/lib/concurrent-ruby/concurrent/synchronization/abstract_struct.rb @@ -157,7 +157,7 @@ def ns_initialize(*values) end end members.each_with_index do |member, index| - clazz.send :remove_method, member if clazz.instance_methods.include? member + clazz.send :remove_method, member if clazz.instance_methods(false).include? 
member clazz.send(:define_method, member) do @values[index] end diff --git a/lib/concurrent-ruby/concurrent/synchronization/object.rb b/lib/concurrent-ruby/concurrent/synchronization/object.rb index e839c9f18..592190708 100644 --- a/lib/concurrent-ruby/concurrent/synchronization/object.rb +++ b/lib/concurrent-ruby/concurrent/synchronization/object.rb @@ -58,7 +58,7 @@ def self.new(*args, &block) # Creates methods for reading and writing to a instance variable with # volatile (Java) semantic as {.attr_volatile} does. - # The instance variable should be accessed oly through generated methods. + # The instance variable should be accessed only through generated methods. # This method generates following methods: `value`, `value=(new_value) #=> new_value`, # `swap_value(new_value) #=> old_value`, # `compare_and_set_value(expected, value) #=> true || false`, `update_value(&block)`. diff --git a/lib/concurrent-ruby/concurrent/thread_safe/util/adder.rb b/lib/concurrent-ruby/concurrent/thread_safe/util/adder.rb index 7a6e8d5c0..852b403b8 100644 --- a/lib/concurrent-ruby/concurrent/thread_safe/util/adder.rb +++ b/lib/concurrent-ruby/concurrent/thread_safe/util/adder.rb @@ -9,7 +9,7 @@ module ThreadSafe # @!visibility private module Util - # A Ruby port of the Doug Lea's jsr166e.LondAdder class version 1.8 + # A Ruby port of the Doug Lea's jsr166e.LongAdder class version 1.8 # available in public domain. 
# # Original source code available here: diff --git a/lib/concurrent-ruby/concurrent/thread_safe/util/cheap_lockable.rb b/lib/concurrent-ruby/concurrent/thread_safe/util/cheap_lockable.rb deleted file mode 100644 index a07678df2..000000000 --- a/lib/concurrent-ruby/concurrent/thread_safe/util/cheap_lockable.rb +++ /dev/null @@ -1,81 +0,0 @@ -require 'concurrent/thread_safe/util' -require 'concurrent/thread_safe/util/volatile' -require 'concurrent/utility/engine' - -module Concurrent - - # @!visibility private - module ThreadSafe - - # @!visibility private - module Util - - # Provides a cheapest possible (mainly in terms of memory usage) +Mutex+ - # with the +ConditionVariable+ bundled in. - # - # Usage: - # class A - # include CheapLockable - # - # def do_exlusively - # cheap_synchronize { yield } - # end - # - # def wait_for_something - # cheap_synchronize do - # cheap_wait until resource_available? - # do_something - # cheap_broadcast # wake up others - # end - # end - # end - # - # @!visibility private - module CheapLockable - private - if Concurrent.on_jruby? - # Use Java's native synchronized (this) { wait(); notifyAll(); } to avoid the overhead of the extra Mutex objects - require 'jruby' - - def cheap_synchronize - JRuby.reference0(self).synchronized { yield } - end - - def cheap_wait - JRuby.reference0(self).wait - end - - def cheap_broadcast - JRuby.reference0(self).notify_all - end - else - require 'thread' - - extend Volatile - attr_volatile :mutex - - # Non-reentrant Mutex#syncrhonize - def cheap_synchronize - true until (my_mutex = mutex) || cas_mutex(nil, my_mutex = Mutex.new) - my_mutex.synchronize { yield } - end - - # Releases this object's +cheap_synchronize+ lock and goes to sleep waiting for other threads to +cheap_broadcast+, reacquires the lock on wakeup. - # Must only be called in +cheap_broadcast+'s block. 
- def cheap_wait - conditional_variable = @conditional_variable ||= ConditionVariable.new - conditional_variable.wait(mutex) - end - - # Wakes up all threads waiting for this object's +cheap_synchronize+ lock. - # Must only be called in +cheap_broadcast+'s block. - def cheap_broadcast - if conditional_variable = @conditional_variable - conditional_variable.broadcast - end - end - end - end - end - end -end diff --git a/lib/concurrent-ruby/concurrent/thread_safe/util/xor_shift_random.rb b/lib/concurrent-ruby/concurrent/thread_safe/util/xor_shift_random.rb index bdde2dd8b..c231d182c 100644 --- a/lib/concurrent-ruby/concurrent/thread_safe/util/xor_shift_random.rb +++ b/lib/concurrent-ruby/concurrent/thread_safe/util/xor_shift_random.rb @@ -15,7 +15,7 @@ module Util # Usage: # x = XorShiftRandom.get # uses Kernel.rand to generate an initial seed # while true - # if (x = XorShiftRandom.xorshift).odd? # thread-localy generate a next random number + # if (x = XorShiftRandom.xorshift).odd? # thread-locally generate a next random number # do_something_at_random # end # end diff --git a/lib/concurrent-ruby/concurrent/timer_task.rb b/lib/concurrent-ruby/concurrent/timer_task.rb index b69cfc8d8..dd2037f62 100644 --- a/lib/concurrent-ruby/concurrent/timer_task.rb +++ b/lib/concurrent-ruby/concurrent/timer_task.rb @@ -32,6 +32,17 @@ module Concurrent # be tested separately then passed to the `TimerTask` for scheduling and # running. # + # A `TimerTask` supports two different types of interval calculations. + # A fixed delay will always wait the same amount of time between the + # completion of one task and the start of the next. A fixed rate will + # attempt to maintain a constant rate of execution regardless of the + # duration of the task. For example, if a fixed rate task is scheduled + # to run every 60 seconds but the task itself takes 10 seconds to + # complete, the next task will be scheduled to run 50 seconds after + # the start of the previous task. 
If the task takes 70 seconds to + # complete, the next task will be start immediately after the previous + # task completes. Tasks will not be executed concurrently. + # # In some cases it may be necessary for a `TimerTask` to affect its own # execution cycle. To facilitate this, a reference to the TimerTask instance # is passed as an argument to the provided block every time the task is @@ -74,6 +85,12 @@ module Concurrent # # #=> 'Boom!' # + # @example Configuring `:interval_type` with either :fixed_delay or :fixed_rate, default is :fixed_delay + # task = Concurrent::TimerTask.new(execution_interval: 5, interval_type: :fixed_rate) do + # puts 'Boom!' + # end + # task.interval_type #=> :fixed_rate + # # @example Last `#value` and `Dereferenceable` mixin # task = Concurrent::TimerTask.new( # dup_on_deref: true, @@ -87,7 +104,7 @@ module Concurrent # # @example Controlling execution from within the block # timer_task = Concurrent::TimerTask.new(execution_interval: 1) do |task| - # task.execution_interval.times{ print 'Boom! ' } + # task.execution_interval.to_i.times{ print 'Boom! ' } # print "\n" # task.execution_interval += 1 # if task.execution_interval > 5 @@ -96,7 +113,7 @@ module Concurrent # end # end # - # timer_task.execute # blocking call - this task will stop itself + # timer_task.execute # #=> Boom! # #=> Boom! Boom! # #=> Boom! Boom! Boom! @@ -152,18 +169,30 @@ class TimerTask < RubyExecutorService # Default `:execution_interval` in seconds. EXECUTION_INTERVAL = 60 - # Default `:timeout_interval` in seconds. - TIMEOUT_INTERVAL = 30 + # Maintain the interval between the end of one execution and the start of the next execution. + FIXED_DELAY = :fixed_delay + + # Maintain the interval between the start of one execution and the start of the next. + # If execution time exceeds the interval, the next execution will start immediately + # after the previous execution finishes. Executions will not run concurrently. 
+ FIXED_RATE = :fixed_rate + + # Default `:interval_type` + DEFAULT_INTERVAL_TYPE = FIXED_DELAY # Create a new TimerTask with the given task and configuration. # # @!macro timer_task_initialize # @param [Hash] opts the options defining task execution. - # @option opts [Integer] :execution_interval number of seconds between + # @option opts [Float] :execution_interval number of seconds between # task executions (default: EXECUTION_INTERVAL) # @option opts [Boolean] :run_now Whether to run the task immediately # upon instantiation or to wait until the first # execution_interval # has passed (default: false) + # @options opts [Symbol] :interval_type method to calculate the interval + # between executions, can be either :fixed_rate or :fixed_delay. + # (default: :fixed_delay) + # @option opts [Executor] executor, default is `global_io_executor` # # @!macro deref_options # @@ -242,6 +271,10 @@ def execution_interval=(value) end end + # @!attribute [r] interval_type + # @return [Symbol] method to calculate the interval between executions + attr_reader :interval_type + # @!attribute [rw] timeout_interval # @return [Fixnum] Number of seconds the task can run before it is # considered to have failed. 
@@ -264,11 +297,17 @@ def ns_initialize(opts, &task) set_deref_options(opts) self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL + if opts[:interval_type] && ![FIXED_DELAY, FIXED_RATE].include?(opts[:interval_type]) + raise ArgumentError.new('interval_type must be either :fixed_delay or :fixed_rate') + end if opts[:timeout] || opts[:timeout_interval] warn 'TimeTask timeouts are now ignored as these were not able to be implemented correctly' end + @run_now = opts[:now] || opts[:run_now] - @executor = Concurrent::SafeTaskExecutor.new(task) + @interval_type = opts[:interval_type] || DEFAULT_INTERVAL_TYPE + @task = Concurrent::SafeTaskExecutor.new(task) + @executor = opts[:executor] || Concurrent.global_io_executor @running = Concurrent::AtomicBoolean.new(false) @value = nil @@ -289,17 +328,18 @@ def ns_kill_execution # @!visibility private def schedule_next_task(interval = execution_interval) - ScheduledTask.execute(interval, args: [Concurrent::Event.new], &method(:execute_task)) + ScheduledTask.execute(interval, executor: @executor, args: [Concurrent::Event.new], &method(:execute_task)) nil end # @!visibility private def execute_task(completion) return nil unless @running.true? - _success, value, reason = @executor.execute(self) + start_time = Concurrent.monotonic_time + _success, value, reason = @task.execute(self) if completion.try? 
self.value = value - schedule_next_task + schedule_next_task(calculate_next_interval(start_time)) time = Time.now observers.notify_observers do [time, self.value, reason] @@ -307,5 +347,15 @@ def execute_task(completion) end nil end + + # @!visibility private + def calculate_next_interval(start_time) + if @interval_type == FIXED_RATE + run_time = Concurrent.monotonic_time - start_time + [execution_interval - run_time, 0].max + else # FIXED_DELAY + execution_interval + end + end end end diff --git a/lib/concurrent-ruby/concurrent/utility/processor_counter.rb b/lib/concurrent-ruby/concurrent/utility/processor_counter.rb index 986e2d523..2489cbd76 100644 --- a/lib/concurrent-ruby/concurrent/utility/processor_counter.rb +++ b/lib/concurrent-ruby/concurrent/utility/processor_counter.rb @@ -11,6 +11,8 @@ class ProcessorCounter def initialize @processor_count = Delay.new { compute_processor_count } @physical_processor_count = Delay.new { compute_physical_processor_count } + @cpu_quota = Delay.new { compute_cpu_quota } + @cpu_shares = Delay.new { compute_cpu_shares } end def processor_count @@ -21,6 +23,29 @@ def physical_processor_count @physical_processor_count.value end + def available_processor_count + cpu_count = processor_count.to_f + quota = cpu_quota + + return cpu_count if quota.nil? + + # cgroup cpus quotas have no limits, so they can be set to higher than the + # real count of cores. 
+ if quota > cpu_count + cpu_count + else + quota + end + end + + def cpu_quota + @cpu_quota.value + end + + def cpu_shares + @cpu_shares.value + end + private def compute_processor_count @@ -48,10 +73,20 @@ def compute_physical_processor_count end cores.count when /mswin|mingw/ - require 'win32ole' - result_set = WIN32OLE.connect("winmgmts://").ExecQuery( - "select NumberOfCores from Win32_Processor") - result_set.to_enum.collect(&:NumberOfCores).reduce(:+) + # Get-CimInstance introduced in PowerShell 3 or earlier: https://learn.microsoft.com/en-us/previous-versions/powershell/module/cimcmdlets/get-ciminstance?view=powershell-3.0 + result = run('powershell -command "Get-CimInstance -ClassName Win32_Processor -Property NumberOfCores | Select-Object -Property NumberOfCores"') + if !result || $?.exitstatus != 0 + # fallback to deprecated wmic for older systems + result = run("wmic cpu get NumberOfCores") + end + if !result || $?.exitstatus != 0 + # Bail out if both commands returned something unexpected + processor_count + else + # powershell: "\nNumberOfCores\n-------------\n 4\n\n\n" + # wmic: "NumberOfCores \n\n4 \n\n\n\n" + result.scan(/\d+/).map(&:to_i).reduce(:+) + end else processor_count end @@ -60,6 +95,45 @@ def compute_physical_processor_count rescue return 1 end + + def run(command) + IO.popen(command, &:read) + rescue Errno::ENOENT + end + + def compute_cpu_quota + if RbConfig::CONFIG["target_os"].include?("linux") + if File.exist?("/sys/fs/cgroup/cpu.max") + # cgroups v2: https://docs.kernel.org/admin-guide/cgroup-v2.html#cpu-interface-files + cpu_max = File.read("/sys/fs/cgroup/cpu.max") + return nil if cpu_max.start_with?("max ") # no limit + max, period = cpu_max.split.map(&:to_f) + max / period + elsif File.exist?("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us") + # cgroups v1: https://kernel.googlesource.com/pub/scm/linux/kernel/git/glommer/memcg/+/cpu_stat/Documentation/cgroups/cpu.txt + max = 
File.read("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us").to_i + # If the cpu.cfs_quota_us is -1, cgroup does not adhere to any CPU time restrictions + # https://docs.kernel.org/scheduler/sched-bwc.html#management + return nil if max <= 0 + period = File.read("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_period_us").to_f + max / period + end + end + end + + def compute_cpu_shares + if RbConfig::CONFIG["target_os"].include?("linux") + if File.exist?("/sys/fs/cgroup/cpu.weight") + # cgroups v2: https://docs.kernel.org/admin-guide/cgroup-v2.html#cpu-interface-files + # Ref: https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2254-cgroup-v2#phase-1-convert-from-cgroups-v1-settings-to-v2 + weight = File.read("/sys/fs/cgroup/cpu.weight").to_f + ((((weight - 1) * 262142) / 9999) + 2) / 1024 + elsif File.exist?("/sys/fs/cgroup/cpu/cpu.shares") + # cgroups v1: https://kernel.googlesource.com/pub/scm/linux/kernel/git/glommer/memcg/+/cpu_stat/Documentation/cgroups/cpu.txt + File.read("/sys/fs/cgroup/cpu/cpu.shares").to_f / 1024 + end + end + end end end @@ -75,8 +149,8 @@ def compute_physical_processor_count # `java.lang.Runtime.getRuntime.availableProcessors` will be used. According # to the Java documentation this "value may change during a particular # invocation of the virtual machine... [applications] should therefore - # occasionally poll this property." Subsequently the result will NOT be - # memoized under JRuby. + # occasionally poll this property." We still memoize this value once under + # JRuby. # # Otherwise Ruby's Etc.nprocessors will be used. # @@ -107,4 +181,40 @@ def self.processor_count def self.physical_processor_count processor_counter.physical_processor_count end + + # Number of processors cores available for process scheduling. + # This method takes in account the CPU quota if the process is inside a cgroup with a + # dedicated CPU quota (typically Docker). + # Otherwise it returns the same value as #processor_count but as a Float. 
+ # + # For performance reasons the calculated value will be memoized on the first + # call. + # + # @return [Float] number of available processors + def self.available_processor_count + processor_counter.available_processor_count + end + + # The maximum number of processors cores available for process scheduling. + # Returns `nil` if there is no enforced limit, or a `Float` if the + # process is inside a cgroup with a dedicated CPU quota (typically Docker). + # + # Note that nothing prevents setting a CPU quota higher than the actual number of + # cores on the system. + # + # For performance reasons the calculated value will be memoized on the first + # call. + # + # @return [nil, Float] Maximum number of available processors as set by a cgroup CPU quota, or nil if none set + def self.cpu_quota + processor_counter.cpu_quota + end + + # The CPU shares requested by the process. For performance reasons the calculated + # value will be memoized on the first call. + # + # @return [Float, nil] CPU shares requested by the process, or nil if not set + def self.cpu_shares + processor_counter.cpu_shares + end end diff --git a/lib/concurrent-ruby/concurrent/version.rb b/lib/concurrent-ruby/concurrent/version.rb index d1c098956..f773e44f1 100644 --- a/lib/concurrent-ruby/concurrent/version.rb +++ b/lib/concurrent-ruby/concurrent/version.rb @@ -1,3 +1,3 @@ module Concurrent - VERSION = '1.2.2' + VERSION = '1.3.5' end diff --git a/spec/concurrent/actor_spec.rb b/spec/concurrent/actor_spec.rb index 5bb639fed..9f1e53b7c 100644 --- a/spec/concurrent/actor_spec.rb +++ b/spec/concurrent/actor_spec.rb @@ -160,7 +160,7 @@ def on_message(message) end describe 'dead letter routing' do - it 'logs by deafault' do + it 'logs by default' do ping = Ping.spawn! :ping, [] ping << :terminate! 
ping << 'asd' diff --git a/spec/concurrent/async_spec.rb b/spec/concurrent/async_spec.rb index c0ebfe8e6..aae2642db 100644 --- a/spec/concurrent/async_spec.rb +++ b/spec/concurrent/async_spec.rb @@ -153,7 +153,7 @@ def many(*args, &block) nil; end }.to raise_error(ArgumentError) end - it 'raises an error when pasing too many arguments (arity >= 0)' do + it 'raises an error when passing too many arguments (arity >= 0)' do expect { subject.async.echo(1, 2, 3, 4, 5) }.to raise_error(StandardError) @@ -224,7 +224,7 @@ def many(*args, &block) nil; end }.to raise_error(ArgumentError) end - it 'raises an error when pasing too many arguments (arity >= 0)' do + it 'raises an error when passing too many arguments (arity >= 0)' do expect { subject.await.echo(1, 2, 3, 4, 5) }.to raise_error(StandardError) diff --git a/spec/concurrent/atomic/atomic_boolean_spec.rb b/spec/concurrent/atomic/atomic_boolean_spec.rb index 133b7f2ce..76edd3e27 100644 --- a/spec/concurrent/atomic/atomic_boolean_spec.rb +++ b/spec/concurrent/atomic/atomic_boolean_spec.rb @@ -142,7 +142,7 @@ module Concurrent end end - if Concurrent.allow_c_extensions? + if Concurrent.c_extensions_loaded? RSpec.describe CAtomicBoolean do it_should_behave_like :atomic_boolean end @@ -165,7 +165,7 @@ module Concurrent it 'inherits from JavaAtomicBoolean' do expect(AtomicBoolean.ancestors).to include(JavaAtomicBoolean) end - elsif Concurrent.allow_c_extensions? + elsif Concurrent.c_extensions_loaded? it 'inherits from CAtomicBoolean' do expect(AtomicBoolean.ancestors).to include(CAtomicBoolean) end diff --git a/spec/concurrent/atomic/atomic_fixnum_spec.rb b/spec/concurrent/atomic/atomic_fixnum_spec.rb index 39259b13f..8222bf523 100644 --- a/spec/concurrent/atomic/atomic_fixnum_spec.rb +++ b/spec/concurrent/atomic/atomic_fixnum_spec.rb @@ -204,7 +204,7 @@ module Concurrent end end - if Concurrent.allow_c_extensions? + if Concurrent.c_extensions_loaded? 
RSpec.describe CAtomicFixnum do it_should_behave_like :atomic_fixnum end @@ -227,7 +227,7 @@ module Concurrent it 'inherits from JavaAtomicFixnum' do expect(AtomicFixnum.ancestors).to include(JavaAtomicFixnum) end - elsif Concurrent.allow_c_extensions? + elsif Concurrent.c_extensions_loaded? it 'inherits from CAtomicFixnum' do expect(AtomicFixnum.ancestors).to include(CAtomicFixnum) end diff --git a/spec/concurrent/atomic/atomic_reference_spec.rb b/spec/concurrent/atomic/atomic_reference_spec.rb index 2cc0caada..7eaf2e55e 100644 --- a/spec/concurrent/atomic/atomic_reference_spec.rb +++ b/spec/concurrent/atomic/atomic_reference_spec.rb @@ -163,7 +163,7 @@ module Concurrent it_should_behave_like :atomic_reference end - if Concurrent.allow_c_extensions? + if Concurrent.c_extensions_loaded? RSpec.describe CAtomicReference do it_should_behave_like :atomic_reference end @@ -190,7 +190,7 @@ module Concurrent it 'inherits from JavaAtomicReference' do expect(described_class.ancestors).to include(Concurrent::JavaAtomicReference) end - elsif Concurrent.allow_c_extensions? + elsif Concurrent.c_extensions_loaded? it 'inherits from CAtomicReference' do expect(described_class.ancestors).to include(Concurrent::CAtomicReference) end diff --git a/spec/concurrent/atomic/cyclic_barrier_spec.rb b/spec/concurrent/atomic/cyclic_barrier_spec.rb index 3a2bb2426..8ed899d67 100644 --- a/spec/concurrent/atomic/cyclic_barrier_spec.rb +++ b/spec/concurrent/atomic/cyclic_barrier_spec.rb @@ -71,6 +71,7 @@ module Concurrent end start_latch.wait(1) + repeat_until_success { expect(barrier.number_waiting).to eq 1 } barrier.reset expect(barrier).not_to be_broken diff --git a/spec/concurrent/channel/integration_spec.rb b/spec/concurrent/channel/integration_spec.rb index 01f490dc9..4d4c073ee 100644 --- a/spec/concurrent/channel/integration_spec.rb +++ b/spec/concurrent/channel/integration_spec.rb @@ -68,7 +68,7 @@ end specify 'default-selection.rb' do - skip('flaky') if Concurrent.on_jruby? 
|| Concurrent.on_truffleruby? + skip('flaky') if Concurrent.on_jruby? || Concurrent.on_truffleruby? || Concurrent.on_windows? expected = <<-STDOUT . . diff --git a/spec/concurrent/edge/erlang_actor_spec.rb b/spec/concurrent/edge/erlang_actor_spec.rb index ed94607dc..5ab7c4e48 100644 --- a/spec/concurrent/edge/erlang_actor_spec.rb +++ b/spec/concurrent/edge/erlang_actor_spec.rb @@ -918,7 +918,7 @@ end specify "timing out" do - skip('flaky on truffleruby') if Concurrent.on_truffleruby? + skip('flaky on truffleruby and jruby') if Concurrent.on_truffleruby? || Concurrent.on_jruby? count_down = Concurrent::CountDownLatch.new body = { on_thread: -> { m = receive; count_down.wait; reply m }, diff --git a/spec/concurrent/executor/executor_service_shared.rb b/spec/concurrent/executor/executor_service_shared.rb index e5ffa367c..7b87922d5 100644 --- a/spec/concurrent/executor/executor_service_shared.rb +++ b/spec/concurrent/executor/executor_service_shared.rb @@ -3,7 +3,7 @@ require 'concurrent/atomic/atomic_fixnum' require 'timeout' -RSpec.shared_examples :executor_service do +RSpec.shared_examples :executor_service do |immediate_type: false| after(:each) do subject.shutdown @@ -84,6 +84,48 @@ end end + context '#shuttingdown?' do + it 'returns false when the thread pool is running' do + expect(subject).not_to be_shuttingdown + end + + it 'returns true when the thread pool is shutting down' do + skip "will never be in shuttingdown? state" if immediate_type + + subject.post{ sleep(0.5) } + subject.shutdown + expect(subject).to be_shuttingdown + expect(subject.wait_for_termination(pool_termination_timeout)).to eq true + end + + it 'returns false when the thread pool is shutdown' do + subject.shutdown + expect(subject.wait_for_termination(pool_termination_timeout)).to eq true + expect(subject).not_to be_shuttingdown + end + end + + context '#shutdown?' 
do + it 'returns false when the thread pool is running' do + expect(subject).not_to be_shutdown + end + + it 'returns false when the thread pool is shutting down' do + skip "will never be in shuttingdown? state" if immediate_type + + subject.post{ sleep(0.5) } + subject.shutdown + expect(subject).not_to be_shutdown + expect(subject.wait_for_termination(pool_termination_timeout)).to eq true + end + + it 'returns true when the thread pool is shutdown' do + subject.shutdown + expect(subject.wait_for_termination(pool_termination_timeout)).to eq true + expect(subject).to be_shutdown + end + end + context '#shutdown' do it 'stops accepting new tasks' do diff --git a/spec/concurrent/executor/immediate_executor_spec.rb b/spec/concurrent/executor/immediate_executor_spec.rb index c53efd72e..53f2748ab 100644 --- a/spec/concurrent/executor/immediate_executor_spec.rb +++ b/spec/concurrent/executor/immediate_executor_spec.rb @@ -7,6 +7,6 @@ module Concurrent subject { ImmediateExecutor.new } - it_should_behave_like :executor_service + it_should_behave_like :executor_service, immediate_type: true end end diff --git a/spec/concurrent/executor/indirect_immediate_executor_spec.rb b/spec/concurrent/executor/indirect_immediate_executor_spec.rb index f40364f28..c3c043d73 100644 --- a/spec/concurrent/executor/indirect_immediate_executor_spec.rb +++ b/spec/concurrent/executor/indirect_immediate_executor_spec.rb @@ -7,7 +7,7 @@ module Concurrent subject { IndirectImmediateExecutor.new } - it_should_behave_like :executor_service + it_should_behave_like :executor_service, immediate_type: true it "runs its tasks synchronously" do start = Time.now diff --git a/spec/concurrent/executor/safe_task_executor_spec.rb b/spec/concurrent/executor/safe_task_executor_spec.rb index 5b4edf6a4..2907109e9 100644 --- a/spec/concurrent/executor/safe_task_executor_spec.rb +++ b/spec/concurrent/executor/safe_task_executor_spec.rb @@ -33,7 +33,7 @@ module Concurrent expect(expected).to eq [1, 2, 3] end - it 
'protectes #execute with a mutex' do + it 'protects #execute with a mutex' do expect(subject).to receive(:synchronize).with(no_args) subject.execute end diff --git a/spec/concurrent/executor/serialized_execution_spec.rb b/spec/concurrent/executor/serialized_execution_spec.rb index 5f0cb4172..11b9bc72c 100644 --- a/spec/concurrent/executor/serialized_execution_spec.rb +++ b/spec/concurrent/executor/serialized_execution_spec.rb @@ -8,6 +8,6 @@ module Concurrent subject { SerializedExecutionDelegator.new(ImmediateExecutor.new) } - it_should_behave_like :executor_service + it_should_behave_like :executor_service, immediate_type: true end end diff --git a/spec/concurrent/executor/thread_pool_executor_shared.rb b/spec/concurrent/executor/thread_pool_executor_shared.rb index bb91b3d4b..147e2c5ca 100644 --- a/spec/concurrent/executor/thread_pool_executor_shared.rb +++ b/spec/concurrent/executor/thread_pool_executor_shared.rb @@ -1,5 +1,6 @@ require_relative 'thread_pool_shared' require 'concurrent/atomic/atomic_fixnum' +require 'concurrent/atomic/cyclic_barrier' RSpec.shared_examples :thread_pool_executor do @@ -258,6 +259,36 @@ end end + context '#active_count' do + subject do + described_class.new( + min_threads: 5, + max_threads: 10, + idletime: 60, + max_queue: 0, + fallback_policy: :discard + ) + end + + it 'returns the number of threads that are actively executing tasks.' 
do + barrier = Concurrent::CyclicBarrier.new(4) + latch = Concurrent::CountDownLatch.new(1) + + 3.times do + subject.post do + barrier.wait + latch.wait + end + end + barrier.wait + + expect(subject.active_count).to eq 3 + + # release + latch.count_down + end + end + context '#fallback_policy' do let!(:min_threads){ 1 } @@ -380,7 +411,7 @@ subject << proc { all_tasks_posted.wait; initial_executed.increment; } end - # Inject 20 more tasks, which should throw an exeption + # Inject 20 more tasks, which should throw an exception 20.times do expect { subject << proc { subsequent_executed.increment; } @@ -627,33 +658,33 @@ max_threads: 1, max_queue: 1, fallback_policy: :caller_runs) - + worker_unblocker = Concurrent::CountDownLatch.new(1) executor_unblocker = Concurrent::CountDownLatch.new(1) queue_done = Concurrent::CountDownLatch.new(1) - + # Block the worker thread executor << proc { worker_unblocker.wait } - + # Fill the queue executor << proc { log.push :queued; queue_done.count_down } - + # Block in a caller_runs job caller_runs_thread = Thread.new { executor << proc { executor_unblocker.wait; log.push :unblocked } } - + # Wait until the caller_runs job is blocked Thread.pass until caller_runs_thread.status == 'sleep' - + # Now unblock the worker thread worker_unblocker.count_down queue_done.wait executor_unblocker.count_down - + # Tidy up caller_runs_thread.join - + # We will see the queued jobs run before the caller_runs job unblocks expect([log.pop, log.pop]).to eq [:queued, :unblocked] end diff --git a/spec/concurrent/executor/timer_set_spec.rb b/spec/concurrent/executor/timer_set_spec.rb index 372b9af5b..a386553b4 100644 --- a/spec/concurrent/executor/timer_set_spec.rb +++ b/spec/concurrent/executor/timer_set_spec.rb @@ -122,6 +122,20 @@ module Concurrent expect(task.value).to eq i end end + + it 'safely handles an executor raising RejectedExecutionError' do + # force a task's executor to raise RejectedExecutionError within the TimerSet + abort_executor = 
ImmediateExecutor.new + allow(abort_executor).to receive(:post).and_raise(Concurrent::RejectedExecutionError) + ScheduledTask.execute(0.2, executor: abort_executor, timer_set: subject){ nil } + abort_executor.shutdown + + latch = CountDownLatch.new(1) + ScheduledTask.execute(0.3, timer_set: subject) do + latch.count_down + end + expect(latch.wait(1)).to be_truthy + end end context 'resolution' do diff --git a/spec/concurrent/map_spec.rb b/spec/concurrent/map_spec.rb index d42f88607..4f1ff2bff 100644 --- a/spec/concurrent/map_spec.rb +++ b/spec/concurrent/map_spec.rb @@ -840,12 +840,18 @@ def key # assert_collision_resistance expects to be able to call .key to get the def with_or_without_default_proc(&block) block.call(false) - @cache = Concurrent::Map.new { |h, k| h[k] = :default_value } + @cache = Concurrent::Map.new { |h, k| + expect(h).to be_kind_of(Concurrent::Map) + h[k] = :default_value + } block.call(true) end def cache_with_default_proc(default_value = 1) - Concurrent::Map.new { |cache, k| cache[k] = default_value } + Concurrent::Map.new do |map, k| + expect(map).to be_kind_of(Concurrent::Map) + map[k] = default_value + end end def expect_size_change(change, cache = @cache, &block) diff --git a/spec/concurrent/processing_actor_spec.rb b/spec/concurrent/processing_actor_spec.rb index 36e3ac0e8..4632eaed8 100644 --- a/spec/concurrent/processing_actor_spec.rb +++ b/spec/concurrent/processing_actor_spec.rb @@ -29,7 +29,7 @@ def count(actor, count) # continue running count(actor, count) when Integer - # this will call count again to set up what to do on next message, based on new state `count + numer` + # this will call count again to set up what to do on next message, based on new state `count + number` count(actor, count + number_or_command) end end diff --git a/spec/concurrent/promises_spec.rb b/spec/concurrent/promises_spec.rb index 5e9d735d5..2aa88fdfa 100644 --- a/spec/concurrent/promises_spec.rb +++ b/spec/concurrent/promises_spec.rb @@ -380,12 +380,12 @@ 
def behaves_as_delay(delay, value) end it 'chains' do - future0 = future { 1 }.then { |v| v + 2 } # both executed on default FAST_EXECUTOR - future1 = future0.then_on(:fast) { raise 'boo' } # executed on IO_EXECUTOR - future2 = future1.then { |v| v + 1 } # will reject with 'boo' error, executed on default FAST_EXECUTOR - future3 = future1.rescue { |err| err.message } # executed on default FAST_EXECUTOR - future4 = future0.chain { |success, value, reason| success } # executed on default FAST_EXECUTOR - future5 = future3.with_default_executor(:fast) # connects new future with different executor, the new future is resolved when future3 is + future0 = future { 1 }.then { |v| v + 2 } # both executed on default IO_EXECUTOR + future1 = future0.then_on(:fast) { raise 'boo' } # executed on FAST_EXECUTOR + future2 = future1.then { |v| v + 1 } # will reject with 'boo' error, executed on FAST_EXECUTOR + future3 = future1.rescue { |err| err.message } # executed on FAST_EXECUTOR + future4 = future0.chain { |success, value, reason| success } # executed on FAST_EXECUTOR + future5 = future3.with_default_executor(:io) # connects new future with different executor, the new future is resolved when future3 is future6 = future5.then(&:capitalize) # executes on IO_EXECUTOR because default was set to :io on future5 future7 = future0 & future3 future8 = future0.rescue { raise 'never happens' } # future0 fulfills so future8'll have same value as future 0 @@ -402,12 +402,12 @@ def behaves_as_delay(delay, value) expect(table.join("\n")).to eq <<-TABLE.gsub(/^\s+\|/, '').strip |index success value reason pool d.pool | 0 true 3 io io - | 1 false boo fast io - | 2 false boo io io - | 3 true boo io io + | 1 false boo fast fast + | 2 false boo fast fast + | 3 true boo fast fast | 4 true true io io - | 5 true boo fast - | 6 true Boo fast fast + | 5 true boo io + | 6 true Boo io io | 7 true [3, "boo"] io | 8 true 3 io io TABLE @@ -754,4 +754,33 @@ def behaves_as_delay(delay, value) specify 
'zip_futures_over' do expect(zip_futures_over([1, 2]) { |v| v.succ }.value!).to eq [2, 3] end + + describe 'value!' do + %w[with without].each do |timeout| + it "does not return spuriously #{timeout} timeout" do + skip "SIGHUP not supported" if Concurrent.on_windows? + # https://github.com/ruby-concurrency/concurrent-ruby/issues/1015 + trapped = false + original_handler = Signal.trap(:SIGHUP) { trapped = true } + begin + task = Concurrent::Promises.future { sleep } + main = Thread.current + t = Thread.new do + Thread.pass until main.stop? + Process.kill(:SIGHUP, Process.pid) + sleep 0.1 + main.raise "Done" + end + expect { + timeout == 'with' ? task.value!(3600) : task.value! + expect(task).to be_resolved # fail if #value! returned + }.to raise_error(RuntimeError, "Done") + expect(trapped).to be true + t.join + ensure + Signal.trap(:SIGHUP, original_handler) + end + end + end + end end diff --git a/spec/concurrent/struct_shared.rb b/spec/concurrent/struct_shared.rb index ae812f223..4c73a9e40 100644 --- a/spec/concurrent/struct_shared.rb +++ b/spec/concurrent/struct_shared.rb @@ -26,6 +26,20 @@ expect(clazz.ancestors).to include described_class end + it 'ignores methods on ancestor classes' do + ancestor = described_class.ancestors.last + ancestor.class_eval { def foo(bar); end } + + clazz = described_class.new(:foo) + struct = clazz.new + + expect(struct).to respond_to :foo + method = struct.method(:foo) + expect(method.arity).to eq 0 + + ancestor.send :remove_method, :foo + end + it 'raises an exception when given an invalid class name' do expect{ described_class.new('lowercase') }.to raise_error(NameError) expect{ described_class.new('_') }.to raise_error(NameError) diff --git a/spec/concurrent/timer_task_spec.rb b/spec/concurrent/timer_task_spec.rb index 44cc3e22e..10fbf34db 100644 --- a/spec/concurrent/timer_task_spec.rb +++ b/spec/concurrent/timer_task_spec.rb @@ -82,6 +82,22 @@ def trigger_observable(observable) subject = TimerTask.new(execution_interval: 5) 
{ nil } expect(subject.execution_interval).to eq 5 end + + it 'raises an exception if :interval_type is not a valid value' do + expect { + Concurrent::TimerTask.new(interval_type: :cat) { nil } + }.to raise_error(ArgumentError) + end + + it 'uses the default :interval_type when no type is given' do + subject = TimerTask.new { nil } + expect(subject.interval_type).to eq TimerTask::FIXED_DELAY + end + + it 'uses the given interval type' do + subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE) { nil } + expect(subject.interval_type).to eq TimerTask::FIXED_RATE + end end context '#kill' do @@ -112,7 +128,6 @@ def trigger_observable(observable) end specify '#execution_interval is writeable' do - latch = CountDownLatch.new(1) subject = TimerTask.new(timeout_interval: 1, execution_interval: 1, @@ -132,6 +147,28 @@ def trigger_observable(observable) subject.kill end + it 'raises on invalid interval_type' do + expect { + fixed_delay = TimerTask.new(interval_type: TimerTask::FIXED_DELAY, + execution_interval: 0.1, + run_now: true) { nil } + fixed_delay.kill + }.not_to raise_error + + expect { + fixed_rate = TimerTask.new(interval_type: TimerTask::FIXED_RATE, + execution_interval: 0.1, + run_now: true) { nil } + fixed_rate.kill + }.not_to raise_error + + expect { + TimerTask.new(interval_type: :unknown, + execution_interval: 0.1, + run_now: true) { nil } + }.to raise_error(ArgumentError) + end + specify '#timeout_interval being written produces a warning' do subject = TimerTask.new(timeout_interval: 1, execution_interval: 0.1, @@ -181,6 +218,69 @@ def trigger_observable(observable) expect(latch.count).to eq(0) subject.kill end + + it 'uses the global executor by default' do + executor = Concurrent::ImmediateExecutor.new + allow(Concurrent).to receive(:global_io_executor).and_return(executor) + allow(executor).to receive(:post).and_call_original + + latch = CountDownLatch.new(1) + subject = TimerTask.new(execution_interval: 0.1, run_now: true) { latch.count_down } + 
subject.execute + expect(latch.wait(1)).to be_truthy + subject.kill + + expect(executor).to have_received(:post) + end + + it 'uses a custom executor when given' do + executor = Concurrent::ImmediateExecutor.new + allow(executor).to receive(:post).and_call_original + + latch = CountDownLatch.new(1) + subject = TimerTask.new(execution_interval: 0.1, run_now: true, executor: executor) { latch.count_down } + subject.execute + expect(latch.wait(1)).to be_truthy + subject.kill + + expect(executor).to have_received(:post) + end + + it 'uses a fixed delay when set' do + finished = [] + latch = CountDownLatch.new(2) + subject = TimerTask.new(interval_type: TimerTask::FIXED_DELAY, + execution_interval: 0.1, + run_now: true) do |task| + sleep(0.2) + finished << Concurrent.monotonic_time + latch.count_down + end + subject.execute + latch.wait(1) + subject.kill + + expect(latch.count).to eq(0) + expect(finished[1] - finished[0]).to be >= 0.3 + end + + it 'uses a fixed rate when set' do + finished = [] + latch = CountDownLatch.new(2) + subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE, + execution_interval: 0.1, + run_now: true) do |task| + sleep(0.2) + finished << Concurrent.monotonic_time + latch.count_down + end + subject.execute + latch.wait(1) + subject.kill + + expect(latch.count).to eq(0) + expect(finished[1] - finished[0]).to be < 0.3 + end end context 'observation' do diff --git a/spec/concurrent/utility/processor_count_spec.rb b/spec/concurrent/utility/processor_count_spec.rb index 229125feb..34d133f70 100644 --- a/spec/concurrent/utility/processor_count_spec.rb +++ b/spec/concurrent/utility/processor_count_spec.rb @@ -4,7 +4,7 @@ module Concurrent RSpec.describe '#processor_count' do - it 'retuns a positive integer' do + it 'returns a positive integer' do expect(Concurrent::processor_count).to be_a Integer expect(Concurrent::processor_count).to be >= 1 end @@ -12,9 +12,115 @@ module Concurrent RSpec.describe '#physical_processor_count' do - it 'retuns a 
positive integer' do + it 'returns a positive integer' do expect(Concurrent::physical_processor_count).to be_a Integer expect(Concurrent::physical_processor_count).to be >= 1 end end + + RSpec.describe '#cpu_quota' do + + let(:counter) { Concurrent::Utility::ProcessorCounter.new } + + it 'returns #compute_cpu_quota' do + expect(Concurrent::cpu_quota).to be == counter.cpu_quota + end + + it 'returns nil if no quota is detected' do + if RbConfig::CONFIG["target_os"].include?("linux") + expect(File).to receive(:exist?).twice.and_return(nil) # Checks for cgroups V1 and V2 + end + expect(counter.cpu_quota).to be_nil + end + + it 'returns nil if cgroups v2 sets no limit' do + expect(RbConfig::CONFIG).to receive(:[]).with("target_os").and_return("linux") + expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu.max").and_return(true) + expect(File).to receive(:read).with("/sys/fs/cgroup/cpu.max").and_return("max 100000\n") + expect(counter.cpu_quota).to be_nil + end + + it 'returns a float if cgroups v2 sets a limit' do + expect(RbConfig::CONFIG).to receive(:[]).with("target_os").and_return("linux") + expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu.max").and_return(true) + expect(File).to receive(:read).with("/sys/fs/cgroup/cpu.max").and_return("150000 100000\n") + expect(counter.cpu_quota).to be == 1.5 + end + + it 'returns nil if cgroups v1 sets no limit' do + expect(RbConfig::CONFIG).to receive(:[]).with("target_os").and_return("linux") + expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu.max").and_return(false) + expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us").and_return(true) + + expect(File).to receive(:read).with("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us").and_return("max\n") + expect(counter.cpu_quota).to be_nil + end + + it 'returns nil if cgroups v1 and cpu.cfs_quota_us is -1' do + expect(RbConfig::CONFIG).to receive(:[]).with("target_os").and_return("linux") + expect(File).to 
receive(:exist?).with("/sys/fs/cgroup/cpu.max").and_return(false) + expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us").and_return(true) + + expect(File).to receive(:read).with("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us").and_return("-1\n") + expect(counter.cpu_quota).to be_nil + end + + it 'returns a float if cgroups v1 sets a limit' do + expect(RbConfig::CONFIG).to receive(:[]).with("target_os").and_return("linux") + expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu.max").and_return(false) + expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us").and_return(true) + + expect(File).to receive(:read).with("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_quota_us").and_return("150000\n") + expect(File).to receive(:read).with("/sys/fs/cgroup/cpu,cpuacct/cpu.cfs_period_us").and_return("100000\n") + expect(counter.cpu_quota).to be == 1.5 + end + + end + + RSpec.describe '#available_processor_count' do + + it 'returns #processor_count if #cpu_quota is nil' do + expect(Concurrent::processor_counter).to receive(:cpu_quota).and_return(nil) + available_processor_count = Concurrent.available_processor_count + expect(available_processor_count).to be == Concurrent::processor_count + expect(available_processor_count).to be_a Float + end + + it 'returns #processor_count if #cpu_quota is higher' do + expect(Concurrent::processor_counter).to receive(:cpu_quota).and_return(Concurrent::processor_count.to_f * 2) + available_processor_count = Concurrent.available_processor_count + expect(available_processor_count).to be == Concurrent::processor_count + expect(available_processor_count).to be_a Float + end + + it 'returns #cpu_quota if #cpu_quota is lower than #processor_count' do + expect(Concurrent::processor_counter).to receive(:cpu_quota).and_return(Concurrent::processor_count.to_f / 2) + available_processor_count = Concurrent.available_processor_count + expect(available_processor_count).to be == 
Concurrent::processor_count.to_f / 2 + expect(available_processor_count).to be_a Float + end + + end + + RSpec.describe '#cpu_shares' do + let(:counter) { Concurrent::Utility::ProcessorCounter.new } + + it 'returns a float when cgroups v2 sets a cpu.weight' do + expect(RbConfig::CONFIG).to receive(:[]).with("target_os").and_return("linux") + expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu.weight").and_return(true) + + expect(File).to receive(:read).with("/sys/fs/cgroup/cpu.weight").and_return("10000\n") + expect(counter.cpu_shares).to be == 256.0 + end + + it 'returns a float if cgroups v1 sets a cpu.shares' do + expect(RbConfig::CONFIG).to receive(:[]).with("target_os").and_return("linux") + expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu.weight").and_return(false) + expect(File).to receive(:exist?).with("/sys/fs/cgroup/cpu/cpu.shares").and_return(true) + + expect(File).to receive(:read).with("/sys/fs/cgroup/cpu/cpu.shares").and_return("512\n") + expect(counter.cpu_shares).to be == 0.5 + end + + end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 2a8fbc54a..1f0048647 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -42,7 +42,7 @@ def requires=(paths) config.before :all do # Only configure logging if it has been required, to make sure the necessary require's are in place if Concurrent.respond_to? :use_simple_logger - Concurrent.use_simple_logger Logger::FATAL + Concurrent.use_simple_logger :FATAL end end @@ -53,7 +53,7 @@ def requires=(paths) config.after :each do while defined?(@created_threads) && @created_threads && (thread = (@created_threads.pop(true) rescue nil)) thread.kill - thread_join = thread.join(1.0) + thread_join = thread.join(10.0) expect(thread_join).not_to be_nil, thread.inspect end end