EMA based statistically adaptive thread pool design #108
@@ -0,0 +1,38 @@
#![feature(test)]

extern crate test;

use async_std::task;
use async_std::task::blocking::JoinHandle;
use futures::future::join_all;
use std::thread;
use std::time::Duration;
use test::Bencher;

// Benchmark for a 10K burst task spawn
#[bench]
fn blocking(b: &mut Bencher) {
    b.iter(|| {
        let handles = (0..10_000)
            .map(|_| {
                task::blocking::spawn(async {
                    let duration = Duration::from_millis(1);
                    thread::sleep(duration);
                })
            })
            .collect::<Vec<JoinHandle<()>>>();

        task::block_on(join_all(handles));
    });
}

// Benchmark for a single blocking task spawn
#[bench]
fn blocking_single(b: &mut Bencher) {
    b.iter(|| {
        task::blocking::spawn(async {
            let duration = Duration::from_millis(1);
            thread::sleep(duration);
        })
    });
}
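Since these benchmarks use the unstable `test` feature, they presumably need a nightly toolchain to run, along the lines of:

```text
cargo +nightly bench
```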
@@ -1,5 +1,57 @@
//! A thread pool for running blocking functions asynchronously.
//!
//! The blocking thread pool consists of four components:
//! * Frequency Detector
//! * Trend Estimator
//! * Predictive Upscaler
//! * Time-based Downscaler
//!
//! ## Frequency Detector
//! Detects how many tasks are submitted from the scheduler to the thread pool in a given time frame.
//! The pool manager thread performs this sampling every 200 milliseconds.
//! This value is used in the trend estimation phase.
//!
//! ## Trend Estimator
//! Holds up to a given number of frequency samples to create an estimation.
//! The trend estimator holds 10 frequencies at a time.
//! This value is stored as a constant in [FREQUENCY_QUEUE_SIZE](constant.FREQUENCY_QUEUE_SIZE.html).
//! Estimation and prediction use the Exponentially Weighted Moving Average algorithm.
//!
//! This algorithm is adapted from [A Novel Predictive and Self–Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61)
//! and altered to:
//! * utilize thread redundancy, the sum of the differences between the predicted and observed values, instead of a heavy trend calculation.
//! * use exponential trend estimation instead of linear trend estimation, with the formula below (a worked example follows this list):
//! ```text
//! LOW_WATERMARK * (predicted - observed) + LOW_WATERMARK
//! ```
//! *NOTE:* To tweak this algorithm, increase [LOW_WATERMARK](constant.LOW_WATERMARK.html); the additional dynamic thread spawn count will adapt automatically.
//! * operate without watermarking by timestamps (used in the paper to measure the algorithm's own performance during execution).
//! * operate without extensive subsampling, since extensive subsampling congests the pool manager thread.
//! * operate without keeping track of thread idle time or a job-out queue, unlike the TEMA and FOPS implementations.
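//! For example, with `LOW_WATERMARK = 2`, `predicted = 3.0` and `observed = 1.0`,
//! the estimation yields `2 * (3.0 - 1.0) + 2 = 6` additional threads
//! (illustrative numbers, not taken from the paper or from a real run).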
//!
//! ## Predictive Upscaler
//! The upscaler handles three cases (also described in the paper):
//! * The rate slightly increases and there are many idle threads.
//! * The number of worker threads tends to be reduced since the workload of the system is descending.
//! * The system has no requests or is stalled. (Our case here is when the current tasks block further tasks from being processed: throughput hogs.)
//!
//! For the first two cases, the EMA calculation and exponential trend estimation give good performance.
//! For the last case, the upscaler selects the upscaling amount based on the number of tasks mapped when throughput hogs happen.
//!
//! **Example scenario:** Let's say we have 10_000 tasks and every one of them blocks for 1 second. The scheduler will map plenty of tasks, but they will be rejected.
//! This drives the estimation calculation to nearly 0 for both the entering and exiting parts. When this happens and we still see tasks being mapped from the scheduler,
//! we start to slowly increase the thread count linearly by the frequency amount. Increasing this value sharply would either make us hit the thread threshold on
//! some OSes or congest the program's other thread usage because of context switches.
//!
//! Throughput hogs are determined by a combination of job-in/job-out frequency and the current scheduler task assignment frequency.
//! The threshold on the EMA difference is compared against machine epsilon to tolerate floating point arithmetic errors.
//!
//! ## Time-based Downscaler
//! When threads become idle, they do not shut down immediately.
//! Instead, they wait a random amount of time between 1 and 11 seconds
//! to even out the load.

use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
@@ -10,21 +62,44 @@ use crossbeam_channel::{bounded, Receiver, Sender};
use lazy_static::lazy_static;

use crate::future::Future;
use crate::io::ErrorKind;
use crate::task::{Context, Poll};
use crate::utils::abort_on_panic;
use std::sync::Mutex;

-const MAX_THREADS: u64 = 10_000;
/// Low watermark value, defines the bare minimum of the pool.
/// Spawns initial thread set.
const LOW_WATERMARK: u64 = 2;

-static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
/// Pool manager's interval time (milliseconds).
/// This is the interval at which the adaptation calculation is made.
const MANAGER_POLL_INTERVAL: u64 = 200;

/// Frequency histogram's sliding window size.
/// Defines how many frequencies will be considered for adaptation.
const FREQUENCY_QUEUE_SIZE: usize = 10;

/// Exponential moving average smoothing coefficient for the limited window.
/// The smoothing factor is estimated with: 2 / (N + 1) where N is the sample size.
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);

/// Pool task frequency variable.
/// Holds the number of tasks scheduled onto the thread pool for the calculation time window.
static FREQUENCY: AtomicU64 = AtomicU64::new(0);

/// Possible max threads (without OS contract).
static MAX_THREADS: AtomicU64 = AtomicU64::new(10_000);
|
||
/// Pool interface between the scheduler and thread pool | ||
struct Pool { | ||
sender: Sender<async_task::Task<()>>, | ||
receiver: Receiver<async_task::Task<()>>, | ||
} | ||
|
||
lazy_static! { | ||
/// Blocking pool with static starting thread count. | ||
static ref POOL: Pool = { | ||
for _ in 0..2 { | ||
for _ in 0..LOW_WATERMARK { | ||
thread::Builder::new() | ||
.name("async-blocking-driver".to_string()) | ||
.spawn(|| abort_on_panic(|| { | ||
|
@@ -35,6 +110,19 @@ lazy_static! { | |
.expect("cannot start a thread driving blocking tasks"); | ||
} | ||
|
||
// Pool manager to check frequency of task rates | ||
// and take action by scaling the pool accordingly. | ||
thread::Builder::new() | ||
.name("async-pool-manager".to_string()) | ||
.spawn(|| abort_on_panic(|| { | ||
let poll_interval = Duration::from_millis(MANAGER_POLL_INTERVAL); | ||
loop { | ||
scale_pool(); | ||
thread::sleep(poll_interval); | ||
} | ||
})) | ||
.expect("thread pool manager cannot be started"); | ||
|
||
// We want to use an unbuffered channel here to help | ||
// us drive our dynamic control. In effect, the | ||
// kernel's scheduler becomes the queue, reducing | ||
|
@@ -45,52 +133,175 @@ lazy_static! { | |
let (sender, receiver) = bounded(0); | ||
Pool { sender, receiver } | ||
}; | ||
|
||
/// Sliding window for pool task frequency calculation | ||
static ref FREQ_QUEUE: Mutex<VecDeque<u64>> = { | ||
Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE.saturating_add(1))) | ||
}; | ||
|
||
/// Dynamic pool thread count variable | ||
static ref POOL_SIZE: Mutex<u64> = Mutex::new(LOW_WATERMARK); | ||
} | ||

-// Create up to MAX_THREADS dynamic blocking task worker threads.
-// Dynamic threads will terminate themselves if they don't
-// receive any work after between one and ten seconds.
-fn maybe_create_another_blocking_thread() {
-    // We use a `Relaxed` atomic operation because
-    // it's just a heuristic, and would not lose correctness
-    // even if it's random.
-    let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
-    if workers >= MAX_THREADS {
-        return;
/// Exponentially Weighted Moving Average calculation
///
/// This allows us to find the EMA value.
/// This value represents the trend of tasks mapped onto the thread pool.
/// The calculation is as follows:
/// ```text
/// +--------+-----------------+----------------------------------+
/// | Symbol | Identifier      | Explanation                      |
/// +--------+-----------------+----------------------------------+
/// | α      | EMA_COEFFICIENT | smoothing factor between 0 and 1 |
/// | Yt     | freq            | frequency sample at time t       |
/// | St     | acc             | EMA at time t                    |
/// +--------+-----------------+----------------------------------+
/// ```
/// Under these definitions the formula is:
/// ```text
/// St = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ]
/// ```
/// # Arguments
///
/// * `freq_queue` - Sliding window of frequency samples
#[inline]

> **Review:** In general, humans are bad at guessing whether something should be inlined. Usually in Rust it's better to leave these attributes out unless some code is going to be executed in a super hot path and you've done work to ensure the improvement is measurable.
>
> **Author:** This comment is not contributing to the overall discussion + review. I've found it quite a bit … Mean of 10 consecutive runs of …
>
> That is also better for users of this library indirectly.
>
> **Review:** I'm a bit skeptical that change is related to the inline here, because this is a function that gets called 5 times per second, and that measurement skew is ~1ms. For a workload that lasts ~40ms, it's not clear to me how measuring things that happen on the order of once every 200ms is relevant. I'm asking you to simplify your proposed code because we are going to have to keep it working over time, and it's important not to add bits that add complexity for humans without clear benefits.
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
    freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
        acc + ((*freq as f64) * (1_f64 - EMA_COEFFICIENT).powf(i as f64))
    }) * EMA_COEFFICIENT
}
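As a sanity check, here is a self-contained sketch of the same fold with invented sample values; the constants simply mirror the ones defined above:

```rust
use std::collections::VecDeque;

const FREQUENCY_QUEUE_SIZE: usize = 10;
// Smoothing factor: 2 / (N + 1) = 2 / 11 ≈ 0.1818 for the 10-sample window.
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);

fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
    freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
        acc + ((*freq as f64) * (1_f64 - EMA_COEFFICIENT).powf(i as f64))
    }) * EMA_COEFFICIENT
}

fn main() {
    // Index 0 is the front of the queue (the oldest sample), so it carries
    // the largest weight, (1 - α)^0 = 1, in this formulation.
    let flat: VecDeque<u64> = vec![4, 4, 4].into();
    let tail_heavy: VecDeque<u64> = vec![0, 2, 8].into();
    println!("flat       ≈ {:.3}", calculate_ema(&flat));       // ≈ 1.809
    println!("tail-heavy ≈ {:.3}", calculate_ema(&tail_heavy)); // ≈ 1.271
}
```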

/// Adaptive pool scaling function
///
/// This allows spawning new threads to make room for incoming task pressure.
/// Works in the background, detached from the pool system, and scales up the pool based
/// on the request rate.
///
/// It uses a frequency-based calculation to define work, utilizing the average processing rate.
fn scale_pool() {

> **Review:** In another context there was some discussion around adding tracing (using log's `trace!` …) to async-std. If that were to happen, this would be a great place for it.
>
> **Reply:** I just know that the idea was thrown around, and it was probably an opt-in feature, too. @stjepang or @yoshuawuyts would know more :)
>
> **Reply (davidbarsky):** I think we should get the … I haven't considered how the differences between async-log's …
>
> **Reply (yoshuawuyts):** @davidbarsky we could use … From convos with Eliza during RustConf there's def a desire to bridge between the two approaches, and we mostly need to figure out how to go about it. async-rs/async-log#7 is what I've currently got (repo incoming soon), but need to check how well that works (:
>
> **Reply (davidbarsky):** @yoshuawuyts Yep! I think we're on the same page about wanting to bridge the two libraries. I'm spitballing one mechanism on bridging that gap, which consists of using mutually exclusive feature flags in a … I'm sorry if what I communicated wasn't clear!
    // Fetch the current frequency; ordering of operations matters in this approach.
    let current_frequency = FREQUENCY.swap(0, Ordering::SeqCst);
    let mut freq_queue = FREQ_QUEUE.lock().unwrap();

    // Make it safe to start calculations by seeding an initial frequency sample.
    if freq_queue.len() == 0 {
        freq_queue.push_back(0);
    }

    // Calculate the message rate for the given time window
    let frequency = (current_frequency as f64 / MANAGER_POLL_INTERVAL as f64) as u64;

    // Calculate the previous time window's EMA value (before the last sample)
    let prev_ema_frequency = calculate_ema(&freq_queue);

    // Add the observed frequency data to the frequency histogram.
    freq_queue.push_back(frequency);
    if freq_queue.len() == FREQUENCY_QUEUE_SIZE.saturating_add(1) {
        freq_queue.pop_front();
    }

    // Calculate the current time window's EMA value (including the last sample)
    let curr_ema_frequency = calculate_ema(&freq_queue);

    // Adapt the thread count of the pool.
    //
    // The sliding window of frequencies is visited by the pool manager.
    // The pool manager creates EMA values for the previous window and the current window,
    // then compares them to determine the scaling amount based on the trends.
    // If the current EMA value is bigger, we will scale up.
    if curr_ema_frequency > prev_ema_frequency {
        // "Scale by" amount can be seen as "how much load is coming".
        // "Scale" amount is "how many threads we should spawn".
        let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
        let scale = num_cpus::get()
            .min(((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as usize);

        // It is time to scale the pool!
        (0..scale).for_each(|_| {
            create_blocking_thread();
        });
    } else if (curr_ema_frequency - prev_ema_frequency).abs() < std::f64::EPSILON
        && current_frequency != 0
    {
        // Throughput is low. Allocate more threads to unblock the flow.
        // If we fall into this case, the scheduler is congested by long-hauling tasks.
        // To unblock the flow we should add some threads to the pool, but not so many as to
        // stagger the program's operation.
        (0..LOW_WATERMARK).for_each(|_| {
            create_blocking_thread();
        });
    }
}
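To make the upscaling branch concrete, here is a small worked example with invented EMA values (a standalone sketch, not part of the diff):

```rust
const LOW_WATERMARK: u64 = 2;

fn main() {
    // Invented values: the EMA rose between two manager ticks.
    let prev_ema_frequency = 1.25_f64;
    let curr_ema_frequency = 2.00_f64;
    let cpus: usize = 8; // stand-in for num_cpus::get()

    if curr_ema_frequency > prev_ema_frequency {
        let scale_by = curr_ema_frequency - prev_ema_frequency; // 0.75: incoming load
        // 2.0 * 0.75 + 2.0 = 3.5, truncated to 3, then capped at the CPU count.
        let scale = cpus.min(((LOW_WATERMARK as f64 * scale_by) + LOW_WATERMARK as f64) as usize);
        assert_eq!(scale, 3); // three new threads would be spawned
    }
}
```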
/// Creates a blocking thread to receive tasks.
/// Dynamic threads will terminate themselves if they don't
/// receive any work after somewhere between one and eleven seconds.
fn create_blocking_thread() {
    // Check that the thread is spawnable.
    // If it hits the OS limits, don't spawn it.
    {
        let pool_size = *POOL_SIZE.lock().unwrap();
        if pool_size >= MAX_THREADS.load(Ordering::SeqCst) {
            MAX_THREADS.store(10_000, Ordering::SeqCst);
            return;
        }
    }
    // We want to avoid having all threads terminate at
    // exactly the same time, causing thundering herd
    // effects. We want to stagger their destruction over
    // 10 seconds or so to make the costs fade into
    // background noise.
    //
    // Generate a simple random number of milliseconds
-   let rand_sleep_ms = u64::from(random(10_000));
    let rand_sleep_ms = 1000_u64
        .checked_add(u64::from(random(10_000)))
        .expect("shouldn't overflow");

-   thread::Builder::new()
    let _ = thread::Builder::new()
        .name("async-blocking-driver-dynamic".to_string())
        .spawn(move || {
-           let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);
            let wait_limit = Duration::from_millis(rand_sleep_ms);

-           DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
            // Adjust the pool size counter before and after spawn
            *POOL_SIZE.lock().unwrap() += 1;
            while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
                abort_on_panic(|| task.run());
            }
-           DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
            *POOL_SIZE.lock().unwrap() -= 1;
        })
-       .expect("cannot start a dynamic thread driving blocking tasks");
        .map_err(|err| {
            match err.kind() {
                ErrorKind::WouldBlock => {
                    // The maximum allowed number of threads per process varies from system to system.
                    // Also, some systems have it (like macOS), and some don't (Linux).
                    // This case is not expected to happen,
                    // but when it does it shouldn't cause a panic.
                    let guarded_count = POOL_SIZE
                        .lock()
                        .unwrap()
                        .checked_sub(1)
                        .expect("shouldn't underflow");
                    MAX_THREADS.store(guarded_count, Ordering::SeqCst);
                }
                _ => eprintln!(
                    "cannot start a dynamic thread driving blocking tasks: {}",
                    err
                ),
            }
        });
}
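Assuming the crate-internal `random(10_000)` returns a value in `0..10_000`, `rand_sleep_ms` lands in `1_000..11_000` milliseconds, which matches the 1 to 11 second idle window described in the module docs and staggers dynamic thread termination as intended.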

-// Enqueues work, attempting to send to the threadpool in a
-// nonblocking way and spinning up another worker thread if
-// there is not a thread ready to accept the work.
/// Enqueues work, attempting to send to the thread pool in a
/// nonblocking way and spinning up the needed amount of threads
/// based on the previous statistics, without relying on
/// whether there is a thread ready to accept the work or not.
fn schedule(t: async_task::Task<()>) {
    // Count every incoming scheduled task
    FREQUENCY.fetch_add(1, Ordering::Acquire);

    if let Err(err) = POOL.sender.try_send(t) {
        // We were not able to send to the channel without
-       // blocking. Try to spin up another thread and then
-       // retry sending while blocking.
-       maybe_create_another_blocking_thread();
        // blocking.
        POOL.sender.send(err.into_inner()).unwrap();
    }
}
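When the non-blocking `try_send` fails, the task is handed over with a blocking `send` instead; new capacity is expected to come from the manager thread's `scale_pool` decisions rather than from spawning a thread at the call site, which is the behavioral change this diff makes to `schedule`.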
@@ -34,4 +34,4 @@ mod pool;
mod sleep;
mod task;

-pub(crate) mod blocking;
pub mod blocking;
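With the module now public, callers can hand blocking work to the adaptive pool directly. A minimal usage sketch based on the paths the benchmark above exercises:

```rust
use async_std::task;
use std::thread;
use std::time::Duration;

fn main() {
    task::block_on(async {
        // Hand a blocking chore to the pool and wait for it to finish.
        let handle = task::blocking::spawn(async {
            thread::sleep(Duration::from_millis(1));
        });
        handle.await;
    });
}
```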