From ee50156eae3e8a269a978dad123d8eeda7e13659 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Fri, 1 Nov 2019 20:55:26 +0100 Subject: [PATCH 1/2] Rebase onto master --- src/task/spawn_blocking.rs | 42 +++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/src/task/spawn_blocking.rs b/src/task/spawn_blocking.rs index b6b5ea34b..33ace29e3 100644 --- a/src/task/spawn_blocking.rs +++ b/src/task/spawn_blocking.rs @@ -95,27 +95,31 @@ fn maybe_create_another_blocking_thread() { return; } - // We want to avoid having all threads terminate at - // exactly the same time, causing thundering herd - // effects. We want to stagger their destruction over - // 10 seconds or so to make the costs fade into - // background noise. - // - // Generate a simple random number of milliseconds - let rand_sleep_ms = u64::from(random(10_000)); + let n_to_spawn = std::cmp::min(2 + (workers / 10), 10); - thread::Builder::new() - .name("async-std/blocking".to_string()) - .spawn(move || { - let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); + for _ in 0..n_to_spawn { + // We want to avoid having all threads terminate at + // exactly the same time, causing thundering herd + // effects. We want to stagger their destruction over + // 10 seconds or so to make the costs fade into + // background noise. + // + // Generate a simple random number of milliseconds + let rand_sleep_ms = u64::from(random(10_000)); - DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); - while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { - abort_on_panic(|| task.run()); - } - DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed); - }) - .expect("cannot start a dynamic thread driving blocking tasks"); + thread::Builder::new() + .name("async-std/blocking".to_string()) + .spawn(move || { + let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); + + DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); + while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { + abort_on_panic(|| task.run()); + } + DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed); + }) + .expect("cannot start a dynamic thread driving blocking tasks"); + } } // Enqueues work, attempting to send to the threadpool in a From 20d39ef2318344bc346c57d8f18795918c0140e3 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Fri, 1 Nov 2019 21:49:49 +0100 Subject: [PATCH 2/2] Switch to unbounded channels --- src/task/spawn_blocking.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/task/spawn_blocking.rs b/src/task/spawn_blocking.rs index 33ace29e3..3f4f18a17 100644 --- a/src/task/spawn_blocking.rs +++ b/src/task/spawn_blocking.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::thread; use std::time::Duration; -use crossbeam_channel::{bounded, Receiver, Sender}; +use crossbeam_channel::{unbounded, Receiver, Sender}; use once_cell::sync::Lazy; use crate::task::{JoinHandle, Task}; @@ -79,7 +79,7 @@ static POOL: Lazy = Lazy::new(|| { // before being acted on by a core. This helps keep // latency snappy in the overall async system by // reducing bufferbloat. - let (sender, receiver) = bounded(0); + let (sender, receiver) = unbounded(); Pool { sender, receiver } });