Skip to content

Spawn several threads when we fail to enqueue work in the blocki… #181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
2 commits merged into from
Nov 1, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 25 additions & 21 deletions src/task/spawn_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::Duration;

use crossbeam_channel::{bounded, Receiver, Sender};
use crossbeam_channel::{unbounded, Receiver, Sender};
use once_cell::sync::Lazy;

use crate::task::{JoinHandle, Task};
Expand Down Expand Up @@ -79,7 +79,7 @@ static POOL: Lazy<Pool> = Lazy::new(|| {
// before being acted on by a core. This helps keep
// latency snappy in the overall async system by
// reducing bufferbloat.
let (sender, receiver) = bounded(0);
let (sender, receiver) = unbounded();
Pool { sender, receiver }
});

Expand All @@ -95,27 +95,31 @@ fn maybe_create_another_blocking_thread() {
return;
}

// We want to avoid having all threads terminate at
// exactly the same time, causing thundering herd
// effects. We want to stagger their destruction over
// 10 seconds or so to make the costs fade into
// background noise.
//
// Generate a simple random number of milliseconds
let rand_sleep_ms = u64::from(random(10_000));
let n_to_spawn = std::cmp::min(2 + (workers / 10), 10);

thread::Builder::new()
.name("async-std/blocking".to_string())
.spawn(move || {
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);
for _ in 0..n_to_spawn {
// We want to avoid having all threads terminate at
// exactly the same time, causing thundering herd
// effects. We want to stagger their destruction over
// 10 seconds or so to make the costs fade into
// background noise.
//
// Generate a simple random number of milliseconds
let rand_sleep_ms = u64::from(random(10_000));

DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
abort_on_panic(|| task.run());
}
DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
})
.expect("cannot start a dynamic thread driving blocking tasks");
thread::Builder::new()
.name("async-std/blocking".to_string())
.spawn(move || {
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);

DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
abort_on_panic(|| task.run());
}
DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
})
.expect("cannot start a dynamic thread driving blocking tasks");
}
}

// Enqueues work, attempting to send to the threadpool in a
Expand Down