Commit b64160c

Provide owning Idle guard
1 parent b61d16d commit b64160c

3 files changed (+122 lines, -13 lines)

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 [package]
 name = "hdrhistogram"
-version = "6.3.1"
+version = "6.3.2"
 edition = "2018"
 
 description = "A port of HdrHistogram to Rust"

src/sync/mod.rs

Lines changed: 75 additions & 11 deletions
@@ -3,6 +3,8 @@
 use crate::errors::*;
 use crate::{Counter, Histogram};
 use std::borrow::Borrow;
+use std::borrow::BorrowMut;
+use std::marker::PhantomData;
 use std::ops::{AddAssign, Deref, DerefMut};
 use std::sync::{atomic, Arc, Mutex};
 use std::time;
@@ -88,15 +90,36 @@ struct Shared<C: Counter> {
     phase: atomic::AtomicUsize,
 }
 
+/// See [`IdleRecorder`]. This guard borrows the idle [`Recorder`].
+pub type IdleRecorderGuard<'a, C> = IdleRecorder<&'a mut Recorder<C>, C>;
+
 /// This guard denotes that a [`Recorder`] is currently idle, and should not be waited on by a
 /// [`SyncHistogram`] phase-shift.
-pub struct IdleRecorderGuard<'a, C: Counter>(&'a mut Recorder<C>);
+pub struct IdleRecorder<T, C: Counter>
+where
+    T: BorrowMut<Recorder<C>>,
+{
+    recorder: Option<T>,
+    c: PhantomData<C>,
+}
+
+impl<T, C: Counter> IdleRecorder<T, C>
+where
+    T: BorrowMut<Recorder<C>>,
+{
+    fn reactivate(&mut self) {
+        let recorder = if let Some(ref mut r) = self.recorder {
+            r
+        } else {
+            // already reactivated
+            return;
+        };
+
+        let recorder = recorder.borrow_mut();
 
-impl<'a, C: Counter> Drop for IdleRecorderGuard<'a, C> {
-    fn drop(&mut self) {
         // the Recorder is no longer idle, so the reader has to wait for us again
         // this basically means re-incrementing .recorders
-        let mut crit = self.0.shared.truth.lock().unwrap();
+        let mut crit = recorder.shared.truth.lock().unwrap();
         crit.recorders += 1;
 
         // we need to figure out what phase we're joining
@@ -107,13 +130,35 @@ impl<'a, C: Counter> Drop for IdleRecorderGuard<'a, C> {
         // to send), and bump the phase, all before we read it, which would lead us to believe that
         // we were already synchronized when in reality we were not, which would stall the reader
         // even if we issued more writes.
-        self.0.last_phase = self.0.shared.phase.load(atomic::Ordering::Acquire);
+        recorder.last_phase = recorder.shared.phase.load(atomic::Ordering::Acquire);
 
         // explicitly drop guard to ensure we don't accidentally drop it above
        drop(crit);
     }
 }
 
+impl<C: Counter> IdleRecorder<Recorder<C>, C> {
+    /// Mark the wrapped [`Recorder`] as active again and return it.
+    pub fn activate(mut self) -> Recorder<C> {
+        self.reactivate();
+        self.recorder.take().unwrap()
+    }
+
+    /// Clone the wrapped [`Recorder`].
+    pub fn recorder(&self) -> Recorder<C> {
+        self.recorder.as_ref().unwrap().clone()
+    }
+}
+
+impl<T, C: Counter> Drop for IdleRecorder<T, C>
+where
+    T: BorrowMut<Recorder<C>>,
+{
+    fn drop(&mut self) {
+        self.reactivate()
+    }
+}
+
 impl<C: Counter> Recorder<C> {
     fn with_hist<F, R>(&mut self, f: F) -> R
     where
@@ -139,11 +184,7 @@ impl<C: Counter> Recorder<C> {
         let _ = self.shared.sender.send(h).is_ok(); // if this is err, the reader went away
     }
 
-    /// Call this method if the Recorder will be idle for a while.
-    ///
-    /// Until the returned guard is dropped, the associated [`SyncHistogram`] will not wait for
-    /// this recorder on a phase shift.
-    pub fn idle(&mut self) -> IdleRecorderGuard<C> {
+    fn deactivate(&mut self) {
        let phase;
        {
            // we're leaving rotation, so we need to decrement .recorders
@@ -160,8 +201,31 @@ impl<C: Counter> Recorder<C> {
             }
         }
         self.last_phase = phase;
+    }
 
-        IdleRecorderGuard(self)
+    /// Call this method if the Recorder will be idle for a while.
+    ///
+    /// Until the returned guard is dropped, the associated [`SyncHistogram`] will not wait for
+    /// this recorder on a phase shift.
+    pub fn idle(&mut self) -> IdleRecorderGuard<C> {
+        self.deactivate();
+        IdleRecorder {
+            recorder: Some(self),
+            c: PhantomData,
+        }
+    }
+
+    /// Mark this `Recorder` as inactive.
+    ///
+    /// Until the returned guard is consumed, either by calling [`IdleRecorder::activate`] or by
+    /// dropping it, the associated [`SyncHistogram`] will not wait for this recorder on a phase
+    /// shift.
+    pub fn into_idle(mut self) -> IdleRecorder<Self, C> {
+        self.deactivate();
+        IdleRecorder {
+            recorder: Some(self),
+            c: PhantomData,
+        }
     }
 
     /// See [`Histogram::add`].
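For orientation, here is a minimal usage sketch that is not part of this commit: it shows the new owning guard carrying a Recorder across a thread boundary, which the borrowing IdleRecorderGuard cannot do since it holds &mut Recorder. It assumes only the API visible in this diff and in tests/sync.rs below (SyncHistogram::recorder, refresh, AddAssign recording, into_idle, activate); the histogram parameters and loop count are arbitrary.

use hdrhistogram::{sync::SyncHistogram, Histogram};
use std::thread;

fn main() {
    // 2 significant figures is an arbitrary choice for this sketch
    let mut h: SyncHistogram<u64> = Histogram::<u64>::new(2).unwrap().into();

    // The owning guard keeps the Recorder registered but idle, so this
    // refresh() does not wait on it.
    let idle = h.recorder().into_idle();
    h.refresh();

    // Because IdleRecorder owns its Recorder, it can leave this scope,
    // e.g. be moved into the thread that will eventually do the recording.
    let jh = thread::spawn(move || {
        let mut r = idle.activate(); // rejoin the phase-shift protocol
        for _ in 0..100 {
            r += 1u64; // same AddAssign recording used in tests/sync.rs
        }
        // r is dropped here, flushing its counts to the SyncHistogram
    });

    jh.join().unwrap();
    h.refresh(); // no live recorders remain, so this should not block
    assert_eq!(h.len(), 100);
}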

tests/sync.rs

Lines changed: 46 additions & 1 deletion
@@ -1,7 +1,7 @@
 #[cfg(all(feature = "sync", test))]
 mod sync {
     use hdrhistogram::{sync::SyncHistogram, Histogram};
-    use std::sync::Arc;
+    use std::sync::{atomic, Arc};
     use std::{thread, time};
 
     const TRACKABLE_MAX: u64 = 3600 * 1000 * 1000;
@@ -214,6 +214,51 @@ mod sync {
         jh.join().unwrap();
     }
 
+    #[test]
+    fn clone_idle_recorder() {
+        let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
+            .unwrap()
+            .into();
+
+        let done = Arc::new(atomic::AtomicBool::new(false));
+        let r = h.recorder().into_idle();
+        h.refresh(); // this should not block
+        h.refresh(); // nor should this
+        let mut r2 = r.recorder();
+        let d = Arc::clone(&done);
+        let jh = thread::spawn(move || {
+            let mut i = 0;
+            while !d.load(atomic::Ordering::SeqCst) {
+                r2 += TEST_VALUE_LEVEL;
+                i += 1;
+            }
+            i
+        });
+        h.refresh(); // this is released by the r2 += above
+        let mut r = r.activate();
+        // a call to refresh would block here now
+        let d = Arc::clone(&done);
+        let jh2 = thread::spawn(move || {
+            let mut i = 0;
+            while !d.load(atomic::Ordering::SeqCst) {
+                r += TEST_VALUE_LEVEL;
+                i += 1;
+            }
+            i
+        });
+
+        h.refresh(); // this is released by the second r2 _and_ the r += above
+
+        // tell recorders to exit
+        done.store(true, atomic::Ordering::SeqCst);
+        h.refresh(); // shouldn't block for long
+        let n = jh.join().unwrap() + jh2.join().unwrap();
+        h.refresh(); // no more recorders, so shouldn't block
+
+        assert_eq!(h.count_at(TEST_VALUE_LEVEL), n);
+        assert_eq!(h.len(), n);
+    }
+
     #[test]
     fn concurrent_writes() {
         let mut h: SyncHistogram<_> = Histogram::<u64>::new_with_max(TRACKABLE_MAX, SIGFIG)
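For comparison, a similar sketch, also not part of this commit, of the pre-existing borrowing guard that remains available through the IdleRecorderGuard alias: idle() suits marking a Recorder idle temporarily within a scope, while into_idle() suits handing the whole Recorder off. The loop counts and sleep duration are arbitrary.

use hdrhistogram::{sync::SyncHistogram, Histogram};
use std::{thread, time};

fn main() {
    let mut h: SyncHistogram<u64> = Histogram::<u64>::new(2).unwrap().into();
    let mut r = h.recorder();

    let jh = thread::spawn(move || {
        for _ in 0..3 {
            for _ in 0..10 {
                r += 1u64;
            }
            // While this worker pauses, the borrowing guard keeps a
            // concurrent refresh() from waiting on it; dropping the guard
            // at the end of the iteration re-registers the Recorder.
            let _idle = r.idle();
            thread::sleep(time::Duration::from_millis(10));
        }
        // r dropped: its remaining counts are flushed
    });

    jh.join().unwrap();
    h.refresh(); // all recorders are gone, so this merges without blocking
    assert_eq!(h.len(), 30);
}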
