Skip to content

Commit fc72fc4

Browse files
committed
Replace the std mpsc channels with a crossbeam channel
1 parent f0a1e58 commit fc72fc4

File tree

2 files changed

+20
-20
lines changed

2 files changed

+20
-20
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ path = "lib.rs"
2323
slog = "2.1"
2424
thread_local = "0.3.3"
2525
take_mut = "0.2.0"
26+
crossbeam-channel = "0.3"
2627

2728
[package.metadata.docs.rs]
2829
features = ["nested-values", "dynamic-keys"]

lib.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@
5252
extern crate slog;
5353
extern crate thread_local;
5454
extern crate take_mut;
55+
extern crate crossbeam_channel;
56+
57+
use crossbeam_channel::Sender;
5558

5659
use slog::{Record, RecordStatic, Level, SingleKV, KV, BorrowedKV};
5760
use slog::{Serializer, OwnedKVList, Key};
@@ -62,7 +65,7 @@ use std::error::Error;
6265
use std::fmt;
6366
use std::sync;
6467

65-
use std::sync::{mpsc, Mutex};
68+
use std::sync::Mutex;
6669
use std::sync::atomic::AtomicUsize;
6770
use std::sync::atomic::Ordering;
6871
use take_mut::take;
@@ -184,19 +187,14 @@ pub enum AsyncError {
184187
Fatal(Box<std::error::Error>),
185188
}
186189

187-
impl<T> From<mpsc::TrySendError<T>> for AsyncError {
188-
fn from(_: mpsc::TrySendError<T>) -> AsyncError {
189-
AsyncError::Full
190-
}
191-
}
192-
impl<T> From<std::sync::TryLockError<T>> for AsyncError {
193-
fn from(_: std::sync::TryLockError<T>) -> AsyncError {
190+
impl<T> From<crossbeam_channel::TrySendError<T>> for AsyncError {
191+
fn from(_: crossbeam_channel::TrySendError<T>) -> AsyncError {
194192
AsyncError::Full
195193
}
196194
}
197195

198-
impl<T> From<mpsc::SendError<T>> for AsyncError {
199-
fn from(_: mpsc::SendError<T>) -> AsyncError {
196+
impl<T> From<crossbeam_channel::SendError<T>> for AsyncError {
197+
fn from(_: crossbeam_channel::SendError<T>) -> AsyncError {
200198
AsyncError::Fatal(Box::new(
201199
io::Error::new(io::ErrorKind::BrokenPipe, "The logger thread terminated"),
202200
))
@@ -210,6 +208,7 @@ impl<T> From<std::sync::PoisonError<T>> for AsyncError {
210208
))
211209
}
212210
}
211+
213212
/// `AsyncResult` alias
214213
pub type AsyncResult<T> = std::result::Result<T, AsyncError>;
215214

@@ -271,8 +270,8 @@ where
271270

272271
fn spawn_thread(
273272
self,
274-
) -> (thread::JoinHandle<()>, mpsc::SyncSender<AsyncMsg>) {
275-
let (tx, rx) = mpsc::sync_channel(self.chan_size);
273+
) -> (thread::JoinHandle<()>, Sender<AsyncMsg>) {
274+
let (tx, rx) = crossbeam_channel::bounded(self.chan_size);
276275
let mut builder = thread::Builder::new();
277276
if let Some(thread_name) = self.thread_name {
278277
builder = builder.name(thread_name);
@@ -316,7 +315,7 @@ where
316315
let (join, tx) = self.spawn_thread();
317316

318317
AsyncCore {
319-
ref_sender: Mutex::new(tx),
318+
ref_sender: tx,
320319
tl_sender: thread_local::ThreadLocal::new(),
321320
join: Mutex::new(Some(join)),
322321
blocking,
@@ -332,7 +331,7 @@ where
332331

333332
(
334333
AsyncCore {
335-
ref_sender: Mutex::new(tx.clone()),
334+
ref_sender: tx.clone(),
336335
tl_sender: thread_local::ThreadLocal::new(),
337336
join: Mutex::new(None),
338337
blocking,
@@ -363,7 +362,7 @@ pub struct AsyncGuard {
363362
// Should always be `Some`. `None` only
364363
// after `drop`
365364
join: Option<thread::JoinHandle<()>>,
366-
tx: mpsc::SyncSender<AsyncMsg>,
365+
tx: Sender<AsyncMsg>,
367366
}
368367

369368
impl Drop for AsyncGuard {
@@ -395,8 +394,8 @@ impl Drop for AsyncGuard {
395394
/// handling all previous `Record`s sent to it). If you can't tolerate the
396395
/// delay, make sure you drop it eg. in another thread.
397396
pub struct AsyncCore {
398-
ref_sender: Mutex<mpsc::SyncSender<AsyncMsg>>,
399-
tl_sender: thread_local::ThreadLocal<mpsc::SyncSender<AsyncMsg>>,
397+
ref_sender: Sender<AsyncMsg>,
398+
tl_sender: thread_local::ThreadLocal<Sender<AsyncMsg>>,
400399
join: Mutex<Option<thread::JoinHandle<()>>>,
401400
blocking: bool,
402401
}
@@ -422,11 +421,11 @@ impl AsyncCore {
422421
fn get_sender(
423422
&self,
424423
) -> Result<
425-
&mpsc::SyncSender<AsyncMsg>,
426-
std::sync::PoisonError<sync::MutexGuard<mpsc::SyncSender<AsyncMsg>>>,
424+
&crossbeam_channel::Sender<AsyncMsg>,
425+
std::sync::PoisonError<sync::MutexGuard<crossbeam_channel::Sender<AsyncMsg>>>,
427426
> {
428427
self.tl_sender
429-
.get_or_try(|| Ok(Box::new(self.ref_sender.lock()?.clone())))
428+
.get_or_try(|| Ok(Box::new(self.ref_sender.clone())))
430429
}
431430

432431
/// Send `AsyncRecord` to a worker thread.

0 commit comments

Comments
 (0)