EMA based statistically adaptive thread pool design #108


Closed
Doc comments for the module
vertexclique committed Aug 30, 2019
commit e765cee30dbdab56db8c113c872a6c20e735ca1f
119 changes: 76 additions & 43 deletions src/task/blocking.rs
@@ -1,4 +1,30 @@
//! A thread pool for running blocking functions asynchronously.
//!
//! The blocking thread pool consists of four components:
//! * Frequency Detector
//! * Trend Estimator
//! * Predictive Upscaler
//! * Time-based Downscaler
//!
//! ## Frequency Detector
//! Detects how many tasks are submitted from the scheduler to the thread pool in a given time frame.
//! The pool manager thread performs this sampling every 200 milliseconds.
//! The sampled value is used in the trend estimation phase.
//!
//! ## Trend Estimator
//! Holds up to a given number of frequency samples to create an estimation.
//! This pool holds 10 frequencies at a time.
//! Estimation and prediction use the Exponentially Weighted Moving Average algorithm.
//!
//! The algorithm is altered and adapted from [A Novel Predictive and Self–Adaptive Dynamic Thread Pool Management](https://doi.org/10.1109/ISPA.2011.61).
//!
//! ## Predictive Upscaler
//! Selects the upscaling amount based on the estimation, or on the number of mapped tasks when throughput hogs occur.
//! Throughput hogs are detected by a combination of job-in/job-out frequency and the current scheduler task assignment frequency.
//!
//! ## Time-based Downscaler
//! Dynamic threads spawned by the upscaler keep working for between 1 and 10 seconds.
//! When a thread receives no work from the channels within that window, it terminates and joins back.
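Putting the four components together, one tick of the manager's control loop can be sketched as follows. This is a simplified standalone model, not the crate's actual code: the function name `manager_tick` and the exact window handling are illustrative.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};

const FREQUENCY_QUEUE_SIZE: usize = 10;
const EMA_COEFFICIENT: f64 = 2.0 / (FREQUENCY_QUEUE_SIZE as f64 + 1.0);

// Tasks scheduled onto the pool during the current 200 ms interval.
static FREQUENCY: AtomicU64 = AtomicU64::new(0);

// One iteration of the manager loop: sample the frequency (Frequency
// Detector), slide the window, and estimate the trend (Trend Estimator).
fn manager_tick(window: &mut VecDeque<u64>) -> f64 {
    let sample = FREQUENCY.swap(0, Ordering::SeqCst); // reset for next interval
    window.push_front(sample);                        // newest sample first
    window.truncate(FREQUENCY_QUEUE_SIZE);            // sliding window
    // EMA over the window; the result drives the upscaling decision.
    window.iter().enumerate().fold(0.0, |acc, (i, f)| {
        acc + (*f as f64) * (1.0 - EMA_COEFFICIENT).powf(i as f64)
    }) * EMA_COEFFICIENT
}

fn main() {
    let mut window = VecDeque::new();
    FREQUENCY.store(40, Ordering::SeqCst); // tasks seen in this interval
    let trend = manager_tick(&mut window);
    assert!(trend > 0.0);
    println!("estimated trend = {:.2}", trend);
}
```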

use std::collections::VecDeque;
use std::fmt;
@@ -16,35 +42,37 @@ use crate::task::{Context, Poll};
use crate::utils::abort_on_panic;
use std::sync::{Arc, Mutex};

// Low watermark value, defines the bare minimum of the pool.
// Spawns initial thread set.
/// Low watermark value; defines the bare minimum size of the pool.
/// The initial thread set is spawned at this count.
const LOW_WATERMARK: u64 = 2;

// Pool managers interval time (milliseconds)
// This is the actual interval which makes adaptation calculation
/// Pool manager's interval time (in milliseconds).
/// This is the interval at which the adaptation calculation runs.
const MANAGER_POLL_INTERVAL: u64 = 200;

// Frequency histogram's sliding window size
// Defines how many frequencies will be considered for adaptation.
/// Frequency histogram's sliding window size.
/// Defines how many frequencies will be considered for adaptation.
const FREQUENCY_QUEUE_SIZE: usize = 10;

// Exponential moving average smoothing coefficient for limited window
// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size.
/// Exponential moving average smoothing coefficient for limited window.
/// Smoothing factor is estimated with: 2 / (N + 1) where N is sample size.
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);
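For the window size used here (N = 10), a quick standalone check of the smoothing-factor arithmetic:

```rust
fn main() {
    let n = 10.0_f64; // FREQUENCY_QUEUE_SIZE
    let alpha = 2.0 / (n + 1.0); // 2 / (N + 1) = 2 / 11
    assert!((alpha - 0.1818).abs() < 1e-3);
    println!("EMA smoothing factor ≈ {:.4}", alpha);
}
```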

// Pool task frequency variable
// Holds scheduled tasks onto the thread pool for the calculation window
/// Pool task frequency variable.
/// Holds the number of tasks scheduled onto the thread pool during the calculation window.
static FREQUENCY: AtomicU64 = AtomicU64::new(0);

// Possible max threads (without OS contract)
/// Possible max threads (without OS contract).
static MAX_THREADS: AtomicU64 = AtomicU64::new(10_000);

/// Pool interface between the scheduler and the thread pool.
struct Pool {
sender: Sender<async_task::Task<()>>,
receiver: Receiver<async_task::Task<()>>,
}

lazy_static! {
/// Blocking pool with static starting thread count.
static ref POOL: Pool = {
for _ in 0..LOW_WATERMARK {
thread::Builder::new()
@@ -81,45 +109,50 @@ lazy_static! {
Pool { sender, receiver }
};

// Pool task frequency calculation variables
/// Sliding window for pool task frequency calculation
static ref FREQ_QUEUE: Arc<Mutex<VecDeque<u64>>> = {
Arc::new(Mutex::new(VecDeque::with_capacity(FREQUENCY_QUEUE_SIZE + 1)))
};

// Pool size variable
/// Dynamic pool thread count variable
static ref POOL_SIZE: Arc<Mutex<u64>> = Arc::new(Mutex::new(LOW_WATERMARK));
}

// Exponentially Weighted Moving Average calculation
//
// This allows us to find the EMA value.
// This value represents the trend of tasks mapped onto the thread pool.
// Calculation is following:
//
// +--------+-----------------+----------------------------------+
// | Symbol | Identifier | Explanation |
// +--------+-----------------+----------------------------------+
// | α | EMA_COEFFICIENT | smoothing factor between 0 and 1 |
// | Yt | freq | frequency sample at time t |
// | St | acc | EMA at time t |
// +--------+-----------------+----------------------------------+
//
// Under these definitions formula is following:
// EMA = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 ... ] + St
/// Exponentially Weighted Moving Average calculation
///
/// This allows us to find the EMA value.
/// This value represents the trend of tasks mapped onto the thread pool.
/// The calculation is as follows:
/// ```text
/// +--------+-----------------+----------------------------------+
/// | Symbol | Identifier      | Explanation                      |
/// +--------+-----------------+----------------------------------+
/// | α      | EMA_COEFFICIENT | smoothing factor between 0 and 1 |
/// | Yt     | freq            | frequency sample at time t       |
/// | St     | acc             | EMA at time t                    |
/// +--------+-----------------+----------------------------------+
/// ```
/// Under these definitions, the formula is:
/// ```text
/// St = α * [ Yt + (1 - α)*Yt-1 + ((1 - α)^2)*Yt-2 + ((1 - α)^3)*Yt-3 + ... ]
/// ```
/// # Arguments
///
/// * `freq_queue` - Sliding window of frequency samples
#[inline]
@spacejam (Contributor) commented on Sep 2, 2019:

in general, humans are bad at guessing whether something should be inlined. Usually in rust it's better to leave these attributes out unless some code is going to be executed in a super hot path and you've done work to ensure the improvement is measurable

@vertexclique (Member, Author) replied:

This comment is not contributing to the overall discussion + review. I’ve found it quite a bit out of context and also from the book. I've made the benchmarks already with and without and decided to leave it inlined. You can see below my last run again after your comment (run shouldn't be needed because of how the compiler inlines and unrolls):

Mean of 10 consecutive runs of blocking benchmark:

43057207.9 ns without inlining
41816626.1 ns with inlining

That is also better for users of this library indirectly.

@spacejam (Contributor) commented on Sep 2, 2019:

I'm a bit skeptical that change is related to the inline here because this is a function that gets called 5 times per second, and that measurement skew is ~1ms. For a workload that lasts ~40ms, it's not clear to me how measuring things that happen on the order of once every 200ms is relevant.

I'm asking you to simplify your proposed code because we are going to have to keep it working over time, and it's important not to add bits that add complexity for humans without clear benefits.
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
    freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
        acc + (*freq as f64) * (1_f64 - EMA_COEFFICIENT).powf(i as f64)
    }) * EMA_COEFFICIENT
}
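A standalone sketch of how this fold behaves, with the constants reproduced from above and made-up sample values. Note that the sample at index 0 carries the largest weight, `(1 - α)^0 = 1`, so a recent burst moves the estimate more than an old one:

```rust
use std::collections::VecDeque;

const FREQUENCY_QUEUE_SIZE: usize = 10;
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);

fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
    freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
        acc + (*freq as f64) * (1_f64 - EMA_COEFFICIENT).powf(i as f64)
    }) * EMA_COEFFICIENT
}

fn main() {
    // An all-zero window yields an EMA of exactly zero.
    let idle: VecDeque<u64> = VecDeque::from(vec![0u64; 10]);
    assert_eq!(calculate_ema(&idle), 0.0);

    // A burst at index 0 outweighs the same burst further back in the window.
    let recent_burst: VecDeque<u64> = VecDeque::from(vec![100, 0, 0, 0]);
    let old_burst: VecDeque<u64> = VecDeque::from(vec![0, 0, 0, 100]);
    assert!(calculate_ema(&recent_burst) > calculate_ema(&old_burst));

    println!("recent burst EMA = {:.2}", calculate_ema(&recent_burst));
}
```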

// Adaptive pool scaling function
//
// This allows to spawn new threads to make room for incoming task pressure.
// Works in the background detached from the pool system and scales up the pool based
// on the request rate.
//
// It uses frequency based calculation to define work. Utilizing average processing rate.
/// Adaptive pool scaling function
///
/// This allows us to spawn new threads to make room for incoming task pressure.
/// It works in the background, detached from the pool system, and scales up the pool
/// based on the request rate.
///
/// It uses a frequency-based calculation to define work, utilizing the average processing rate.
fn scale_pool() {
Contributor commented:

In another context there was some discussion around adding tracing (using log's trace! macro) to async-std. This function seems like a good place for that, too.

Contributor replied:

log::trace or the new tracing library? wondering about how much overhead each would add

@killercup (Contributor) commented on Aug 28, 2019:

I just know that the idea was thrown around, and it was probably an opt-in feature, too. @stjepang or @yoshuawuyts would know more :)

Contributor replied:

log now has kv support, which we're already using in other places too. tracing can pick these up too, so using log is probably the best choice here overall (:

Contributor replied:

I think we should get the kv support in tracing-log soon-ish? However I don't believe that log supports the span-like macros, so async-std might need to have its own, internal span-like macro and dispatch conditionally to async-log or tracing.

I haven't considered how the differences between async-log's span and tracing's span can be bridged, but I think it's feasible if tracing's span!().enter() is used to mimic async-log's nested span block.

@yoshuawuyts (Contributor) commented on Sep 1, 2019:

@davidbarsky we could use async-log::span for this, probably. Though there's still a few open issues on the repo, the interface wouldn't change.

From convos with Eliza during RustConf there's def a desire to bridge between the two approaches, and we mostly need to figure out how to go about it. async-rs/async-log#7 is what I've currently got (repo incoming soon), but need to check how well that works (:

@davidbarsky commented on Sep 1, 2019:

@yoshuawuyts Yep! I think we're on the same page about wanting to bridge the two libraries.

I'm spitballing one mechanism on bridging that gap, which consists of using mutually exclusive feature flags in a build.rs to dispatch to either async-log or tracing. This would entail using an async-std-specific span macro until the gaps—if any exist—between async-std's span and tracing's span are closed. I'm personally excited to dig into rust-lang/log#353!

I'm sorry if what I communicated wasn't clear!
// Fetch the current frequency; the ordering of operations matters in this approach.
let current_frequency = FREQUENCY.load(Ordering::SeqCst);
@@ -180,9 +213,9 @@ fn scale_pool() {
FREQUENCY.store(0, Ordering::Release);
}

// Creates yet another thread to receive tasks.
// Dynamic threads will terminate themselves if they don't
// receive any work after between one and ten seconds.
/// Creates a blocking thread to receive tasks.
/// Dynamic threads terminate themselves if they don't
/// receive any work after between one and ten seconds.
fn create_blocking_thread() {
// Check that thread is spawnable.
// If it hits to the OS limits don't spawn it.
@@ -239,10 +272,10 @@ fn create_blocking_thread() {
});
}
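The idle-timeout behavior described above can be sketched with std's mpsc channel. This is a simplified model under stated assumptions: the real pool uses crossbeam channels and a randomized one-to-ten-second timeout, and the helper name `spawn_dynamic_worker` is illustrative, not the crate's API.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::time::Duration;

// Illustrative sketch: a dynamic worker thread that terminates itself
// after receiving no work within the idle-timeout window.
fn spawn_dynamic_worker(idle_timeout: Duration) -> Sender<Box<dyn FnOnce() + Send>> {
    let (tx, rx) = channel::<Box<dyn FnOnce() + Send>>();
    thread::spawn(move || loop {
        match rx.recv_timeout(idle_timeout) {
            Ok(job) => job(),
            // Timed out (or all senders dropped): detach and terminate.
            Err(_) => break,
        }
    });
    tx
}

fn main() {
    let tx = spawn_dynamic_worker(Duration::from_millis(200));
    let (done_tx, done_rx) = channel();
    tx.send(Box::new(move || { done_tx.send(()).unwrap(); })).unwrap();
    // The job ran before the worker's idle timeout expired.
    assert!(done_rx.recv_timeout(Duration::from_secs(1)).is_ok());
}
```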

// Enqueues work, attempting to send to the thread pool in a
// nonblocking way and spinning up needed amount of threads
// based on the previous statistics without relying on
// if there is not a thread ready to accept the work or not.
/// Enqueues work, attempting to send it to the thread pool in a
/// nonblocking way and spinning up the needed number of threads
/// based on previous statistics, without relying on
/// whether there is a thread ready to accept the work or not.
fn schedule(t: async_task::Task<()>) {
// Add up for every incoming scheduled task
FREQUENCY.fetch_add(1, Ordering::Acquire);
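The nonblocking enqueue described above follows a try-send-then-fall-back pattern. A minimal sketch with std's bounded channel, under stated assumptions: the real code uses crossbeam's channel, and on the fallback path it scales the pool up using the collected statistics before blocking. `try_enqueue` is an illustrative name, not the crate's API.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

// Illustrative: push work without blocking; on a full queue, hand the
// item back so the caller can take the slow path (scale up, then send).
fn try_enqueue<T>(tx: &SyncSender<T>, item: T) -> Result<(), T> {
    match tx.try_send(item) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(item)) | Err(TrySendError::Disconnected(item)) => Err(item),
    }
}

fn main() {
    let (tx, rx) = sync_channel::<u32>(1);
    assert!(try_enqueue(&tx, 1).is_ok());  // fits in the buffer
    assert!(try_enqueue(&tx, 2).is_err()); // queue full: caller falls back
    assert_eq!(rx.recv().unwrap(), 1);
}
```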