diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs new file mode 100644 index 000000000..b1d59e5e0 --- /dev/null +++ b/src/stream/from_fn.rs @@ -0,0 +1,104 @@ +use futures::stream::Stream; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; + +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Creates a new stream where each iteration calls the provided closure. +/// +/// This allows creating a custom stream with any behavior +/// without using the more verbose syntax of creating a dedicated type +/// and implementing the `Stream` trait for it. +/// +/// Note that the `FromFn` stream doesn’t make assumptions about the behavior of the closure, +/// and therefore conservatively does not implement [`FusedStream`](futures_core::stream::FusedStream). +/// +/// The closure can use captures and its environment to track state across iterations. Depending on +/// how the stream is used, this may require specifying the `move` keyword on the closure. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use async_std::{future, stream}; +/// use std::sync::atomic::{AtomicUsize, Ordering}; +/// +/// static COUNT: AtomicUsize = AtomicUsize::new(0); +/// let stream = stream::from_fn(|| { +/// // Increment our count. This is why we started at zero. +/// +/// let count = COUNT.fetch_add(1, Ordering::SeqCst); +/// +/// // Check to see if we've finished counting or not. +/// if count < 6 { +/// future::ready(Some(count)) +/// } else { +/// future::ready(None) +/// } +/// }); +/// assert_eq!(stream.collect::>().await, &[1, 2, 3, 4, 5]); +/// # }); +/// ``` +pub fn from_fn(f: F) -> FromFn +where + F: FnMut() -> Fut, + Fut: Future>, +{ + FromFn { f, fut: None } +} + +/// A stream where each iteration calls the provided closure. +/// +/// This `struct` is created by the [`stream::from_fn`] function. +/// See its documentation for more. +/// +/// [`stream::from_fn`]: fn.from_fn.html +#[must_use = "streams do nothing unless polled"] +pub struct FromFn { + f: F, + fut: Option, +} + +impl Unpin for FromFn {} + +impl fmt::Debug for FromFn +where + Fut: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FromFn").finish() + } +} + +impl FromFn { + unsafe_unpinned!(f: F); + unsafe_pinned!(fut: Option); +} + +impl Stream for FromFn +where + F: FnMut() -> Fut, + Fut: Future>, +{ + type Item = Item; + + #[inline] + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.fut.is_none() { + let fut = (self.as_mut().f())(); + self.as_mut().fut().set(Some(fut)); + } + + self.as_mut() + .fut() + .as_pin_mut() + .unwrap() + .poll(cx) + .map(|item| { + self.as_mut().fut().set(None); + item + }) + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 8dcc6d54a..19fa3b8f4 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -22,11 +22,13 @@ //! ``` pub use empty::{empty, Empty}; +pub use from_fn::{from_fn, FromFn}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use stream::{Stream, Take}; mod empty; +mod from_fn; mod once; mod repeat; mod stream;