From d703fb412afe39b437361304fff11619f0e45f07 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sat, 5 Oct 2019 19:41:43 +0300 Subject: [PATCH 1/4] Adds a from_fn stream implementation --- src/stream/from_fn.rs | 99 +++++++++++++++++++++++++++++++++++++++++++ src/stream/mod.rs | 2 + 2 files changed, 101 insertions(+) create mode 100644 src/stream/from_fn.rs diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs new file mode 100644 index 000000000..868652948 --- /dev/null +++ b/src/stream/from_fn.rs @@ -0,0 +1,99 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream that yeilds elements by calling a closure. +/// +/// This stream is constructed by [`from_fn`] function. +/// +/// [`from_fn`]: fn.from_fn.html +#[derive(Debug)] +pub struct FromFn { + f: F, + future: Option, + __t: PhantomData, +} + +/// Creates a new stream where to produce each new element a provided closure is called. +/// +/// This allows creating a custom stream with any behaviour without using the more verbose +/// syntax of creating a dedicated type and implementing a `Stream` trait for it. +/// +/// # Examples +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::prelude::*; +/// use std::sync::{Mutex, Arc}; +/// use async_std::stream; +/// +/// let count = Arc::new(Mutex::new(0u8)); +/// let s = stream::from_fn(|| { +/// let count = Arc::clone(&count); +/// +/// async move { +/// *count.lock().unwrap() += 1; +/// +/// if *count.lock().unwrap() > 3 { +/// None +/// } else { +/// Some(*count.lock().unwrap()) +/// } +/// } +/// }); +/// +/// pin_utils::pin_mut!(s); +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(2)); +/// assert_eq!(s.next().await, Some(3)); +/// assert_eq!(s.next().await, None); +/// # +/// # }) } +/// +/// ``` +pub fn from_fn(f: F) -> FromFn +where + F: FnMut() -> Fut, + Fut: Future>, +{ + FromFn { + f, + future: None, + __t: PhantomData, + } +} + +impl FromFn { + pin_utils::unsafe_unpinned!(f: F); + pin_utils::unsafe_pinned!(future: Option); +} + +impl Stream for FromFn +where + F: FnMut() -> Fut, + Fut: Future>, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match self.future.is_some() { + true => { + let next = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); + self.as_mut().future().set(None); + + return Poll::Ready(next); + } + false => { + let fut = (self.as_mut().f())(); + self.as_mut().future().set(Some(fut)); + } + } + } + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 8aa12a2f1..8435a3911 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -24,6 +24,7 @@ use cfg_if::cfg_if; pub use empty::{empty, Empty}; +pub use from_fn::{from_fn, FromFn}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use stream::{Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, Zip}; @@ -31,6 +32,7 @@ pub use stream::{Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, St pub(crate) mod stream; mod empty; +mod from_fn; mod once; mod repeat; From 403da96c295c21ed99715aa9b9631add419878fd Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sun, 6 Oct 2019 08:35:25 +0300 Subject: [PATCH 2/4] Update src/stream/from_fn.rs Co-Authored-By: Yoshua Wuyts --- src/stream/from_fn.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index 868652948..6971deb9a 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -5,7 +5,7 @@ use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; -/// A stream that yeilds elements by calling a closure. +/// A stream that yields elements by calling a closure. /// /// This stream is constructed by [`from_fn`] function. /// From 0be05ba0559bb411ca3ac6b46bbcce6c2dcd84bd Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sun, 6 Oct 2019 08:38:29 +0300 Subject: [PATCH 3/4] Fix review nits --- src/stream/from_fn.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index 6971deb9a..c6e48a637 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -81,15 +81,15 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - match self.future.is_some() { - true => { + match &self.future { + Some(_) => { let next = futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); self.as_mut().future().set(None); return Poll::Ready(next); } - false => { + None => { let fut = (self.as_mut().f())(); self.as_mut().future().set(Some(fut)); } From d1f7b05d5edbce33cc30c7a4e33fb372864a8ef1 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sun, 6 Oct 2019 08:41:02 +0300 Subject: [PATCH 4/4] Use async_std Mutex --- src/stream/from_fn.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index c6e48a637..c1cb97af4 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -28,7 +28,8 @@ pub struct FromFn { /// # fn main() { async_std::task::block_on(async { /// # /// use async_std::prelude::*; -/// use std::sync::{Mutex, Arc}; +/// use async_std::sync::Mutex; +/// use std::sync::Arc; /// use async_std::stream; /// /// let count = Arc::new(Mutex::new(0u8)); @@ -36,12 +37,12 @@ pub struct FromFn { /// let count = Arc::clone(&count); /// /// async move { -/// *count.lock().unwrap() += 1; +/// *count.lock().await += 1; /// -/// if *count.lock().unwrap() > 3 { +/// if *count.lock().await > 3 { /// None /// } else { -/// Some(*count.lock().unwrap()) +/// Some(*count.lock().await) /// } /// } /// });