From 22f9ecc4e949dff5b428ef5105b957a8f5513bd5 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Mon, 30 Sep 2019 14:24:55 +0300 Subject: [PATCH 1/9] Async combinators --- src/stream/stream/all.rs | 46 +++++++++++++--------- src/stream/stream/any.rs | 46 +++++++++++++--------- src/stream/stream/filter_map.rs | 51 ++++++++++++++++++------- src/stream/stream/find_map.rs | 50 ++++++++++++++++-------- src/stream/stream/fold.rs | 36 +++++++++++------ src/stream/stream/mod.rs | 68 +++++++++++++++++++++------------ 6 files changed, 200 insertions(+), 97 deletions(-) diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs index 3b65fc764..fea093e2e 100644 --- a/src/stream/stream/all.rs +++ b/src/stream/stream/all.rs @@ -7,39 +7,51 @@ use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct AllFuture<'a, S, F, T> { +pub struct AllFuture<'a, S, F, Fut, T> { pub(crate) stream: &'a mut S, pub(crate) f: F, pub(crate) result: bool, + pub(crate) future: Option, pub(crate) _marker: PhantomData, } -impl Unpin for AllFuture<'_, S, F, T> {} +impl<'a, S, F, Fut, T> AllFuture<'a, S, F, Fut, T> { + pin_utils::unsafe_pinned!(stream: &'a mut S); + pin_utils::unsafe_unpinned!(f: F); + pin_utils::unsafe_pinned!(future: Option); +} -impl Future for AllFuture<'_, S, F, S::Item> +impl Future for AllFuture<'_, S, F, Fut, S::Item> where S: Stream + Unpin + Sized, - F: FnMut(S::Item) -> bool, + F: FnMut(S::Item) -> Fut, + Fut: Future, { type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); - - match next { - Some(v) => { - let result = (&mut self.f)(v); - self.result = result; + loop { + match self.future.is_some() { + false => { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => { + let fut = (self.as_mut().f())(v); + self.as_mut().future().set(Some(fut)); + } + None => return Poll::Ready(self.result), + } + } + true => { + let res = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); - if result { - // don't forget to wake this task again to pull the next item from stream - cx.waker().wake_by_ref(); - Poll::Pending - } else { - Poll::Ready(false) + self.as_mut().future().set(None); + if !res { + return Poll::Ready(false); + } } } - None => Poll::Ready(self.result), } } } diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs index a23adf4bf..ad0361229 100644 --- a/src/stream/stream/any.rs +++ b/src/stream/stream/any.rs @@ -7,39 +7,51 @@ use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct AnyFuture<'a, S, F, T> { +pub struct AnyFuture<'a, S, F, Fut, T> { pub(crate) stream: &'a mut S, pub(crate) f: F, pub(crate) result: bool, + pub(crate) future: Option, pub(crate) _marker: PhantomData, } -impl Unpin for AnyFuture<'_, S, F, T> {} +impl<'a, S, F, Fut, T> AnyFuture<'a, S, F, Fut, T> { + pin_utils::unsafe_unpinned!(f: F); + pin_utils::unsafe_pinned!(future: Option); + pin_utils::unsafe_pinned!(stream: &'a mut S); +} -impl Future for AnyFuture<'_, S, F, S::Item> +impl Future for AnyFuture<'_, S, F, Fut, S::Item> where S: Stream + Unpin + Sized, - F: FnMut(S::Item) -> bool, + F: FnMut(S::Item) -> Fut, + Fut: Future, { type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); - - match next { - Some(v) => { - let result = (&mut self.f)(v); - self.result = result; + loop { + match self.future.is_some() { + false => { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => { + let fut = (self.as_mut().f())(v); + self.as_mut().future().set(Some(fut)); + } + None => return Poll::Ready(self.result), + } + } + true => { + let res = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); - if result { - Poll::Ready(true) - } else { - // don't forget to wake this task again to pull the next item from stream - cx.waker().wake_by_ref(); - Poll::Pending + self.as_mut().future().set(None); + if res { + return Poll::Ready(true); + } } } - None => Poll::Ready(self.result), } } } diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs index 756efff18..f46ddbd42 100644 --- a/src/stream/stream/filter_map.rs +++ b/src/stream/stream/filter_map.rs @@ -2,49 +2,74 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; +use crate::future::Future; use crate::stream::Stream; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct FilterMap { +pub struct FilterMap { stream: S, f: F, + future: Option, __from: PhantomData, __to: PhantomData, } -impl FilterMap { +impl Unpin for FilterMap +where + S: Unpin, + Fut: Unpin, +{ +} + +impl FilterMap { pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_pinned!(future: Option); pin_utils::unsafe_unpinned!(f: F); pub(crate) fn new(stream: S, f: F) -> Self { FilterMap { stream, f, + future: None, __from: PhantomData, __to: PhantomData, } } } -impl Stream for FilterMap +impl Stream for FilterMap where S: Stream, - F: FnMut(S::Item) -> Option, + F: FnMut(S::Item) -> Fut, + Fut: Future>, { type Item = B; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); - match next { - Some(v) => match (self.as_mut().f())(v) { - Some(b) => Poll::Ready(Some(b)), - None => { - cx.waker().wake_by_ref(); - Poll::Pending + loop { + match self.future.is_some() { + false => { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => { + let fut = self.as_mut().f()(v); + self.as_mut().future().set(Some(fut)); + } + None => return Poll::Ready(None), + } + } + true => { + let res = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); + + self.as_mut().future().set(None); + + if let Some(b) = res { + return Poll::Ready(Some(b)); + } } - }, - None => Poll::Ready(None), + } } } } diff --git a/src/stream/stream/find_map.rs b/src/stream/stream/find_map.rs index dfcf92d66..be7f70730 100644 --- a/src/stream/stream/find_map.rs +++ b/src/stream/stream/find_map.rs @@ -7,45 +7,65 @@ use crate::stream::Stream; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct FindMapFuture<'a, S, F, T, B> { +pub struct FindMapFuture<'a, S, F, Fut, T, B> { stream: &'a mut S, f: F, + future: Option, __b: PhantomData, __t: PhantomData, } -impl<'a, S, B, F, T> FindMapFuture<'a, S, F, T, B> { +impl<'a, S, F, Fut, T, B> FindMapFuture<'a, S, F, Fut, T, B> { + pin_utils::unsafe_pinned!(stream: &'a mut S); + pin_utils::unsafe_unpinned!(f: F); + pin_utils::unsafe_pinned!(future: Option); + pub(super) fn new(stream: &'a mut S, f: F) -> Self { FindMapFuture { stream, f, + future: None, __b: PhantomData, __t: PhantomData, } } } -impl Unpin for FindMapFuture<'_, S, F, T, B> {} +impl Unpin for FindMapFuture<'_, S, F, Fut, T, B> {} -impl<'a, S, B, F> Future for FindMapFuture<'a, S, F, S::Item, B> +impl<'a, S, B, F, Fut> Future for FindMapFuture<'a, S, F, Fut, S::Item, B> where S: Stream + Unpin + Sized, - F: FnMut(S::Item) -> Option, + F: FnMut(S::Item) -> Fut, + Fut: Future>, { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); - - match item { - Some(v) => match (&mut self.f)(v) { - Some(v) => Poll::Ready(Some(v)), - None => { - cx.waker().wake_by_ref(); - Poll::Pending + loop { + match self.future.is_some() { + false => { + let item = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match item { + Some(v) => { + let fut = (self.as_mut().f())(v); + self.as_mut().future().set(Some(fut)); + } + None => return Poll::Ready(None), + } + } + true => { + let res = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); + self.as_mut().future().set(None); + + match res { + Some(v) => return Poll::Ready(Some(v)), + None => (), + } } - }, - None => Poll::Ready(None), + } } } } diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs index 18ddcd815..6577ba874 100644 --- a/src/stream/stream/fold.rs +++ b/src/stream/stream/fold.rs @@ -7,46 +7,60 @@ use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct FoldFuture { +pub struct FoldFuture { stream: S, f: F, + future: Option, acc: Option, __t: PhantomData, } -impl FoldFuture { +impl FoldFuture { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_unpinned!(f: F); pin_utils::unsafe_unpinned!(acc: Option); + pin_utils::unsafe_pinned!(future: Option); pub(super) fn new(stream: S, init: B, f: F) -> Self { FoldFuture { stream, f, + future: None, acc: Some(init), __t: PhantomData, } } } -impl Future for FoldFuture +impl Future for FoldFuture where S: Stream + Sized, - F: FnMut(B, S::Item) -> B, + F: FnMut(B, S::Item) -> Fut, + Fut: Future, { type Output = B; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match self.future.is_some() { + false => { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); - match next { - Some(v) => { - let old = self.as_mut().acc().take().unwrap(); - let new = (self.as_mut().f())(old, v); - *self.as_mut().acc() = Some(new); + match next { + Some(v) => { + let old = self.as_mut().acc().take().unwrap(); + let fut = (self.as_mut().f())(old, v); + self.as_mut().future().set(Some(fut)); + } + None => return Poll::Ready(self.as_mut().acc().take().unwrap()), + } + } + true => { + let res = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); + self.as_mut().future().set(None); + *self.as_mut().acc() = Some(res); } - None => return Poll::Ready(self.as_mut().acc().take().unwrap()), } } } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 07de323a6..d17312b97 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -69,6 +69,7 @@ use std::marker::PhantomData; use cfg_if::cfg_if; +use crate::future::Future; use crate::utils::extension_trait; cfg_if! { @@ -353,7 +354,7 @@ extension_trait! { .inspect(|x| println!("about to filter {}", x)) .filter(|x| x % 2 == 0) .inspect(|x| println!("made it through filter: {}", x)) - .fold(0, |sum, i| sum + i).await; + .fold(0, |sum, i| async move { sum + i }).await; assert_eq!(sum, 6); # @@ -447,7 +448,12 @@ extension_trait! { let s: VecDeque<&str> = vec!["1", "lol", "3", "NaN", "5"].into_iter().collect(); - let mut parsed = s.filter_map(|a| a.parse::().ok()); + let mut parsed = s.filter_map(|a| + async move { + a.parse::().ok() + }); + + let mut parsed = unsafe { std::pin::Pin::new_unchecked(&mut parsed) }; let one = parsed.next().await; assert_eq!(one, Some(1)); @@ -464,10 +470,11 @@ extension_trait! { # }) } ``` "#] - fn filter_map(self, f: F) -> FilterMap + fn filter_map(self, f: F) -> FilterMap where Self: Sized, - F: FnMut(Self::Item) -> Option, + F: FnMut(Self::Item) -> Fut, + Fut: Future>, { FilterMap::new(self, f) } @@ -602,7 +609,7 @@ extension_trait! { use async_std::stream; let mut s = stream::repeat::(42).take(3); - assert!(s.all(|x| x == 42).await); + assert!(s.all(|x| async move { x == 42 }).await); # # }) } @@ -617,23 +624,25 @@ extension_trait! { use async_std::stream; let mut s = stream::empty::(); - assert!(s.all(|_| false).await); + assert!(s.all(|_| async { false }).await); # # }) } ``` "#] #[inline] - fn all( + fn all( &mut self, f: F, - ) -> impl Future + '_ [AllFuture<'_, Self, F, Self::Item>] + ) -> impl Future + '_ [AllFuture<'_, Self, F, Fut, Self::Item>] where Self: Unpin + Sized, - F: FnMut(Self::Item) -> bool, + F: FnMut(Self::Item) -> Fut, + Fut: Future, { AllFuture { stream: self, result: true, // the default if the empty stream + future: None, _marker: PhantomData, f, } @@ -697,21 +706,26 @@ extension_trait! { use async_std::prelude::*; use std::collections::VecDeque; - let mut s: VecDeque<&str> = vec!["lol", "NaN", "2", "5"].into_iter().collect(); - let first_number = s.find_map(|s| s.parse().ok()).await; + let s: VecDeque<&str> = vec!["lol", "NaN", "2", "5"].into_iter().collect(); + pin_utils::pin_mut!(s); + + let first_number = s.find_map(|s| async move { + s.parse().ok() + }).await; - assert_eq!(first_number, Some(2)); + assert_eq!(first_number, Some(2u8)); # # }) } ``` "#] - fn find_map( + fn find_map( &mut self, f: F, - ) -> impl Future> + '_ [FindMapFuture<'_, Self, F, Self::Item, B>] + ) -> impl Future> + '_ [FindMapFuture<'_, Self, F, Fut, Self::Item, B>] where Self: Sized, - F: FnMut(Self::Item) -> Option, + F: FnMut(Self::Item) -> Fut, + Fut: Future>, { FindMapFuture::new(self, f) } @@ -731,21 +745,25 @@ extension_trait! { use std::collections::VecDeque; let s: VecDeque = vec![1, 2, 3].into_iter().collect(); - let sum = s.fold(0, |acc, x| acc + x).await; + pin_utils::pin_mut!(s); + let sum = s.fold(0, |acc, x| async move { + acc + x + }).await; assert_eq!(sum, 6); # # }) } ``` "#] - fn fold( + fn fold( self, init: B, f: F, - ) -> impl Future [FoldFuture] + ) -> impl Future [FoldFuture] where Self: Sized, - F: FnMut(B, Self::Item) -> B, + F: FnMut(B, Self::Item) -> Fut, + Fut: Future, { FoldFuture::new(self, init, f) } @@ -775,7 +793,7 @@ extension_trait! { use async_std::stream; let mut s = stream::repeat::(42).take(3); - assert!(s.any(|x| x == 42).await); + assert!(s.any(|x| async move { x == 42 }).await); # # }) } ``` @@ -789,23 +807,25 @@ extension_trait! { use async_std::stream; let mut s = stream::empty::(); - assert!(!s.any(|_| false).await); + assert!(!s.any(|_| async { false }).await); # # }) } ``` "#] #[inline] - fn any( + fn any( &mut self, f: F, - ) -> impl Future + '_ [AnyFuture<'_, Self, F, Self::Item>] + ) -> impl Future + '_ [AnyFuture<'_, Self, F, Fut, Self::Item>] where Self: Unpin + Sized, - F: FnMut(Self::Item) -> bool, + F: FnMut(Self::Item) -> Fut, + Fut: Future, { AnyFuture { stream: self, result: false, // the default if the empty stream + future: None, _marker: PhantomData, f, } From 1280e2fce17dcda0823009a9e26eaef8f138a208 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Wed, 2 Oct 2019 22:15:54 +0300 Subject: [PATCH 2/9] Fix build and update for_each --- src/stream/stream/for_each.rs | 33 +++++++++++++++++++++++++-------- src/stream/stream/mod.rs | 22 +++++++++++----------- 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs index 0406a5075..33839c917 100644 --- a/src/stream/stream/for_each.rs +++ b/src/stream/stream/for_each.rs @@ -7,39 +7,56 @@ use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct ForEachFuture { +pub struct ForEachFuture { stream: S, f: F, + future: Option, __t: PhantomData, } -impl ForEachFuture { +impl ForEachFuture { pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_pinned!(future: Option); pin_utils::unsafe_unpinned!(f: F); pub(super) fn new(stream: S, f: F) -> Self { ForEachFuture { stream, f, + future: None, __t: PhantomData, } } } -impl Future for ForEachFuture +impl Future for ForEachFuture where S: Stream + Sized, - F: FnMut(S::Item), + F: FnMut(S::Item) -> Fut, + Fut: Future, { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match self.future.is_some() { + false => { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); - match next { - Some(v) => (self.as_mut().f())(v), - None => return Poll::Ready(()), + match next { + Some(v) => { + let fut = (self.as_mut().f())(v); + self.as_mut().future().set(Some(fut)); + } + None => return Poll::Ready(()), + } + } + true => { + let _ = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); + + self.as_mut().future().set(None); + } } } } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 752fca12a..53b0775f2 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -88,7 +88,6 @@ cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { use std::pin::Pin; - use crate::future::Future; use crate::stream::FromStream; } } @@ -782,27 +781,28 @@ extension_trait! { # use async_std::prelude::*; use std::collections::VecDeque; - use std::sync::mpsc::channel; - - let (tx, rx) = channel(); - let s: VecDeque = vec![1, 2, 3].into_iter().collect(); - let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await; + let mut x = 0; - let v: Vec<_> = rx.iter().collect(); + let s: VecDeque<_> = vec![1u8, 2, 3].into_iter().collect(); + s.for_each(|item| { + x += item; + futures::future::ready(()) + }).await; - assert_eq!(v, vec![1, 2, 3]); + assert_eq!(x, 6); # # }) } ``` "#] - fn for_each( + fn for_each( self, f: F, - ) -> impl Future [ForEachFuture] + ) -> impl Future [ForEachFuture] where Self: Sized, - F: FnMut(Self::Item), + F: FnMut(Self::Item) -> Fut, + Fut: Future, { ForEachFuture::new(self, f) } From 37fed3a565e838697ec268bff09e9f75fb1ccde1 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Thu, 3 Oct 2019 08:41:12 +0300 Subject: [PATCH 3/9] Updates map to async closures --- src/stream/stream/map.rs | 37 +++++++++++++++++++++++++++++++------ src/stream/stream/mod.rs | 10 ++++++---- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs index 4bc2e366a..05c30be1c 100644 --- a/src/stream/stream/map.rs +++ b/src/stream/stream/map.rs @@ -1,41 +1,66 @@ use std::marker::PhantomData; use std::pin::Pin; +use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct Map { +pub struct Map { stream: S, f: F, + future: Option, __from: PhantomData, __to: PhantomData, } -impl Map { +impl Map { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_unpinned!(f: F); + pin_utils::unsafe_pinned!(future: Option); pub(crate) fn new(stream: S, f: F) -> Self { Map { stream, f, + future: None, __from: PhantomData, __to: PhantomData, } } } -impl Stream for Map +impl Stream for Map where S: Stream, - F: FnMut(S::Item) -> B, + F: FnMut(S::Item) -> Fut, + Fut: Future, { type Item = B; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); - Poll::Ready(next.map(self.as_mut().f())) + loop { + match self.future.is_some() { + false => { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => { + let fut = (self.as_mut().f())(v); + self.as_mut().future().set(Some(fut)); + } + None => { + return Poll::Ready(None); + } + } + } + true => { + let res = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); + self.as_mut().future().set(None); + return Poll::Ready(Some(res)); + } + } + } } } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index b29f5b54f..713ebcfe8 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -351,8 +351,9 @@ extension_trait! { use async_std::prelude::*; use std::collections::VecDeque; - let s: VecDeque<_> = vec![1, 2, 3].into_iter().collect(); - let mut s = s.map(|x| 2 * x); + let s: VecDeque<_> = vec![1u8, 2, 3].into_iter().collect(); + let s = s.map(|x| async move { 2 * x }); + pin_utils::pin_mut!(s); assert_eq!(s.next().await, Some(2)); assert_eq!(s.next().await, Some(4)); @@ -363,10 +364,11 @@ extension_trait! { # }) } ``` "#] - fn map(self, f: F) -> Map + fn map(self, f: F) -> Map where Self: Sized, - F: FnMut(Self::Item) -> B, + F: FnMut(Self::Item) -> Fut, + Fut: Future, { Map::new(self, f) } From ba245453d7b04d3d64d4e3323637f682b9d091b7 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sat, 5 Oct 2019 20:17:07 +0300 Subject: [PATCH 4/9] Nits from review --- src/stream/stream/mod.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 713ebcfe8..89d7bed4d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -486,12 +486,12 @@ extension_trait! { let s: VecDeque<&str> = vec!["1", "lol", "3", "NaN", "5"].into_iter().collect(); - let mut parsed = s.filter_map(|a| + let parsed = s.filter_map(|a| async move { a.parse::().ok() }); - let mut parsed = unsafe { std::pin::Pin::new_unchecked(&mut parsed) }; + pin_utils::pin_mut!(parsed); let one = parsed.next().await; assert_eq!(one, Some(1)); @@ -816,16 +816,19 @@ extension_trait! { # use async_std::prelude::*; use std::collections::VecDeque; + use std::sync::{Arc, Mutex}; - let mut x = 0; + let x = Arc::new(Mutex::new(0u8)); let s: VecDeque<_> = vec![1u8, 2, 3].into_iter().collect(); s.for_each(|item| { - x += item; - futures::future::ready(()) + let x = x.clone(); + async move { + *x.lock().unwrap() += item; + } }).await; - assert_eq!(x, 6); + assert_eq!(*x.lock().unwrap(), 6); # # }) } ``` From 7f6d8f9344894e7f054f98e54de50d59f7b69481 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sat, 5 Oct 2019 20:45:44 +0300 Subject: [PATCH 5/9] Use appropriate Mutex in async context --- src/stream/stream/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 89d7bed4d..8403fdcb0 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -815,8 +815,9 @@ extension_trait! { # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; + use async_std::sync::Mutex; use std::collections::VecDeque; - use std::sync::{Arc, Mutex}; + use std::sync::Arc; let x = Arc::new(Mutex::new(0u8)); @@ -824,11 +825,11 @@ extension_trait! { s.for_each(|item| { let x = x.clone(); async move { - *x.lock().unwrap() += item; + *x.lock().await += item; } }).await; - assert_eq!(*x.lock().unwrap(), 6); + assert_eq!(*x.lock().await, 6); # # }) } ``` From 9df3afc6e1c8c84f14ad83e3125ff8190e498fa4 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sat, 5 Oct 2019 22:32:41 +0300 Subject: [PATCH 6/9] Roll back the for_each example --- src/stream/stream/mod.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8403fdcb0..26f7c6313 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -815,21 +815,16 @@ extension_trait! { # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; - use async_std::sync::Mutex; use std::collections::VecDeque; - use std::sync::Arc; - - let x = Arc::new(Mutex::new(0u8)); + let mut x = 0; let s: VecDeque<_> = vec![1u8, 2, 3].into_iter().collect(); s.for_each(|item| { - let x = x.clone(); - async move { - *x.lock().await += item; - } + x += item; + futures::future::ready(()) }).await; - assert_eq!(*x.lock().await, 6); + assert_eq!(x, 6); # # }) } ``` From f2a61103d4250759b7561bb5219bda6bebbb2029 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Wed, 9 Oct 2019 10:44:14 +0300 Subject: [PATCH 7/9] Fixes for_each usage in extend implementations --- src/collections/binary_heap/extend.rs | 5 ++++- src/collections/btree_map/extend.rs | 1 + src/collections/btree_set/extend.rs | 1 + src/collections/hash_map/extend.rs | 1 + src/collections/hash_set/extend.rs | 1 + src/collections/linked_list/extend.rs | 5 ++++- src/collections/vec_deque/extend.rs | 5 ++++- src/string/extend.rs | 20 ++++++++++++++++---- src/unit/from_stream.rs | 2 +- src/vec/extend.rs | 5 ++++- 10 files changed, 37 insertions(+), 9 deletions(-) diff --git a/src/collections/binary_heap/extend.rs b/src/collections/binary_heap/extend.rs index 4503fe393..04b704c03 100644 --- a/src/collections/binary_heap/extend.rs +++ b/src/collections/binary_heap/extend.rs @@ -13,6 +13,9 @@ impl Extend for BinaryHeap { self.reserve(stream.size_hint().0); - Box::pin(stream.for_each(move |item| self.push(item))) + Box::pin(stream.for_each(move |item| { + self.push(item); + async {} + })) } } diff --git a/src/collections/btree_map/extend.rs b/src/collections/btree_map/extend.rs index ae02c4248..7f227089b 100644 --- a/src/collections/btree_map/extend.rs +++ b/src/collections/btree_map/extend.rs @@ -11,6 +11,7 @@ impl Extend<(K, V)> for BTreeMap { ) -> Pin + 'a>> { Box::pin(stream.into_stream().for_each(move |(k, v)| { self.insert(k, v); + async {} })) } } diff --git a/src/collections/btree_set/extend.rs b/src/collections/btree_set/extend.rs index ccf033783..b145e388b 100644 --- a/src/collections/btree_set/extend.rs +++ b/src/collections/btree_set/extend.rs @@ -11,6 +11,7 @@ impl Extend for BTreeSet { ) -> Pin + 'a>> { Box::pin(stream.into_stream().for_each(move |item| { self.insert(item); + async {} })) } } diff --git a/src/collections/hash_map/extend.rs b/src/collections/hash_map/extend.rs index c34bb9ed3..b0747d841 100644 --- a/src/collections/hash_map/extend.rs +++ b/src/collections/hash_map/extend.rs @@ -32,6 +32,7 @@ where Box::pin(stream.for_each(move |(k, v)| { self.insert(k, v); + async {} })) } } diff --git a/src/collections/hash_set/extend.rs b/src/collections/hash_set/extend.rs index 123e844e2..f67490c34 100644 --- a/src/collections/hash_set/extend.rs +++ b/src/collections/hash_set/extend.rs @@ -35,6 +35,7 @@ where Box::pin(stream.for_each(move |item| { self.insert(item); + async {} })) } } diff --git a/src/collections/linked_list/extend.rs b/src/collections/linked_list/extend.rs index 63a1a97c3..7d02a2453 100644 --- a/src/collections/linked_list/extend.rs +++ b/src/collections/linked_list/extend.rs @@ -10,6 +10,9 @@ impl Extend for LinkedList { stream: S, ) -> Pin + 'a>> { let stream = stream.into_stream(); - Box::pin(stream.for_each(move |item| self.push_back(item))) + Box::pin(stream.for_each(move |item| { + self.push_back(item); + async {} + })) } } diff --git a/src/collections/vec_deque/extend.rs b/src/collections/vec_deque/extend.rs index 17e396ab8..5599b3841 100644 --- a/src/collections/vec_deque/extend.rs +++ b/src/collections/vec_deque/extend.rs @@ -13,6 +13,9 @@ impl Extend for VecDeque { self.reserve(stream.size_hint().0); - Box::pin(stream.for_each(move |item| self.push_back(item))) + Box::pin(stream.for_each(move |item| { + self.push_back(item); + async {} + })) } } diff --git a/src/string/extend.rs b/src/string/extend.rs index 8572cc3ce..6a8c4c85d 100644 --- a/src/string/extend.rs +++ b/src/string/extend.rs @@ -13,7 +13,10 @@ impl Extend for String { self.reserve(stream.size_hint().0); - Box::pin(stream.for_each(move |c| self.push(c))) + Box::pin(stream.for_each(move |c| { + self.push(c); + async {} + })) } } @@ -40,7 +43,10 @@ impl<'b> Extend<&'b str> for String { where 'b: 'a, { - Box::pin(stream.into_stream().for_each(move |s| self.push_str(s))) + Box::pin(stream.into_stream().for_each(move |s| { + self.push_str(s); + async {} + })) } } @@ -49,7 +55,10 @@ impl Extend for String { &'a mut self, stream: S, ) -> Pin + 'a>> { - Box::pin(stream.into_stream().for_each(move |s| self.push_str(&s))) + Box::pin(stream.into_stream().for_each(move |s| { + self.push_str(&s); + async {} + })) } } @@ -61,6 +70,9 @@ impl<'b> Extend> for String { where 'b: 'a, { - Box::pin(stream.into_stream().for_each(move |s| self.push_str(&s))) + Box::pin(stream.into_stream().for_each(move |s| { + self.push_str(&s); + async {} + })) } } diff --git a/src/unit/from_stream.rs b/src/unit/from_stream.rs index a238982da..efa7f4ffe 100644 --- a/src/unit/from_stream.rs +++ b/src/unit/from_stream.rs @@ -11,6 +11,6 @@ impl FromStream<()> for () { where ::IntoStream: 'a, { - Box::pin(stream.into_stream().for_each(|_| ())) + Box::pin(stream.into_stream().for_each(|_| async {})) } } diff --git a/src/vec/extend.rs b/src/vec/extend.rs index ecf68c83f..80a561f59 100644 --- a/src/vec/extend.rs +++ b/src/vec/extend.rs @@ -12,6 +12,9 @@ impl Extend for Vec { self.reserve(stream.size_hint().0); - Box::pin(stream.for_each(move |item| self.push(item))) + Box::pin(stream.for_each(move |item| { + self.push(item); + async {} + })) } } From 27eee8c8a9b2801b5cea4d3a722cb68580fabe64 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Fri, 10 Apr 2020 11:12:18 +0300 Subject: [PATCH 8/9] Some fixes after a merge --- src/option/from_stream.rs | 2 +- src/option/product.rs | 7 ++++--- src/option/sum.rs | 4 ++-- src/result/from_stream.rs | 11 +++++++---- src/result/product.rs | 26 +++++++++++++++----------- src/result/sum.rs | 26 +++++++++++++++----------- src/stream/product.rs | 9 ++++----- src/stream/stream/flat_map.rs | 20 ++++++++++++++------ src/stream/stream/mod.rs | 16 ++++++++-------- src/stream/sum.rs | 8 ++++---- 10 files changed, 74 insertions(+), 55 deletions(-) diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs index de929ca94..6c03f4fa3 100644 --- a/src/option/from_stream.rs +++ b/src/option/from_stream.rs @@ -29,7 +29,7 @@ where false } }) - .filter_map(identity) + .filter_map(|a| async move { identity(a) }) .collect() .await; diff --git a/src/option/product.rs b/src/option/product.rs index b446c1ffe..6ae5a5a14 100644 --- a/src/option/product.rs +++ b/src/option/product.rs @@ -25,12 +25,13 @@ where use async_std::stream; let v = stream::from_iter(vec![1, 2, 4]); - let prod: Option = v.map(|x| + let prod: Option = v.map(|x| async move { if x < 0 { None } else { Some(x) - }).product().await; + } + }).product().await; assert_eq!(prod, Some(8)); # # }) } @@ -53,7 +54,7 @@ where false } }) - .filter_map(identity), + .filter_map(|a| async move { identity(a) }), ) .await; diff --git a/src/option/sum.rs b/src/option/sum.rs index de404f42d..422ce62c0 100644 --- a/src/option/sum.rs +++ b/src/option/sum.rs @@ -25,7 +25,7 @@ where use async_std::stream; let words = stream::from_iter(vec!["have", "a", "great", "day"]); - let total: Option = words.map(|w| w.find('a')).sum().await; + let total: Option = words.map(|w| async move { w.find('a') }).sum().await; assert_eq!(total, Some(5)); # # }) } @@ -48,7 +48,7 @@ where false } }) - .filter_map(identity), + .filter_map(|a| async move { identity(a) }), ) .await; diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index 8a8e0eaf3..7cfd02b77 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -20,9 +20,9 @@ where /// use async_std::stream; /// /// let v = stream::from_iter(vec![1, 2]); - /// let res: Result, &'static str> = v.map(|x: u32| + /// let res: Result, &'static str> = v.map(|x: u32| async move { /// x.checked_add(1).ok_or("Overflow!") - /// ).collect().await; + /// }).collect().await; /// assert_eq!(res, Ok(vec![2, 3])); /// # /// # }) } @@ -48,12 +48,15 @@ where true }) }) - .filter_map(|elem| match elem { + .filter_map(|elem| { + let res = match elem { Ok(value) => Some(value), Err(err) => { found_error = Some(err); None - } + }, + }; + async { res } }) .collect() .await; diff --git a/src/result/product.rs b/src/result/product.rs index 45782ff70..decebd368 100644 --- a/src/result/product.rs +++ b/src/result/product.rs @@ -24,11 +24,12 @@ where use async_std::stream; let v = stream::from_iter(vec![1, 2, 4]); - let res: Result = v.map(|x| - if x < 0 { - Err("Negative element found") - } else { - Ok(x) + let res: Result = v.map(|x| async move { + if x < 0 { + Err("Negative element found") + } else { + Ok(x) + } }).product().await; assert_eq!(res, Ok(8)); # @@ -55,12 +56,15 @@ where true }) }) - .filter_map(|elem| match elem { - Ok(value) => Some(value), - Err(err) => { - found_error = Some(err); - None - } + .filter_map(|elem| { + let res = match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None + } + }; + async { res } }), ) .await; diff --git a/src/result/sum.rs b/src/result/sum.rs index b6d84a0c4..4665b57b0 100644 --- a/src/result/sum.rs +++ b/src/result/sum.rs @@ -24,11 +24,12 @@ where use async_std::stream; let v = stream::from_iter(vec![1, 2]); - let res: Result = v.map(|x| - if x < 0 { - Err("Negative element found") - } else { - Ok(x) + let res: Result = v.map(|x| async move { + if x < 0 { + Err("Negative element found") + } else { + Ok(x) + } }).sum().await; assert_eq!(res, Ok(3)); # @@ -55,12 +56,15 @@ where true }) }) - .filter_map(|elem| match elem { - Ok(value) => Some(value), - Err(err) => { - found_error = Some(err); - None - } + .filter_map(|elem| { + let res = match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None + } + }; + async { res } }), ) .await; diff --git a/src/stream/product.rs b/src/stream/product.rs index 15497e87c..a32a9a5c2 100644 --- a/src/stream/product.rs +++ b/src/stream/product.rs @@ -23,7 +23,6 @@ pub trait Product: Sized { S: Stream + 'a; } -use core::ops::Mul; use core::num::Wrapping; use crate::stream::stream::StreamExt; @@ -34,7 +33,7 @@ macro_rules! integer_product { where S: Stream + 'a, { - Box::pin(async move { stream.fold($one, Mul::mul).await } ) + Box::pin(async move { stream.fold($one, |a, b| async move { a * b }).await } ) } } impl<'a> Product<&'a $a> for $a { @@ -42,7 +41,7 @@ macro_rules! integer_product { where S: Stream + 'b, { - Box::pin(async move { stream.fold($one, Mul::mul).await } ) + Box::pin(async move { stream.fold($one, |a, b| async move { a * b }).await } ) } } )*); @@ -58,14 +57,14 @@ macro_rules! float_product { fn product<'a, S>(stream: S) -> Pin+ 'a>> where S: Stream + 'a, { - Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } ) + Box::pin(async move { stream.fold(1.0, |a, b| async move { a * b }).await } ) } } impl<'a> Product<&'a $a> for $a { fn product<'b, S>(stream: S) -> Pin+ 'b>> where S: Stream + 'b, { - Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } ) + Box::pin(async move { stream.fold(1.0, |a, b| async move { a * b }).await } ) } } )*); diff --git a/src/stream/stream/flat_map.rs b/src/stream/stream/flat_map.rs index e07893a94..aea636e3d 100644 --- a/src/stream/stream/flat_map.rs +++ b/src/stream/stream/flat_map.rs @@ -1,7 +1,9 @@ use core::pin::Pin; +use core::marker::PhantomData; use pin_project_lite::pin_project; +use crate::future::Future; use crate::stream::stream::map::Map; use crate::stream::stream::StreamExt; use crate::stream::{IntoStream, Stream}; @@ -16,33 +18,39 @@ pin_project! { /// /// [`flat_map`]: trait.Stream.html#method.flat_map /// [`Stream`]: trait.Stream.html - pub struct FlatMap { + pub struct FlatMap { #[pin] - stream: Map, + stream: Map, #[pin] inner_stream: Option, + __fut: PhantomData, + __from: PhantomData, } } -impl FlatMap +impl FlatMap where S: Stream, U: IntoStream, - F: FnMut(S::Item) -> U, + F: FnMut(S::Item) -> Fut, + Fut: Future, { pub(super) fn new(stream: S, f: F) -> Self { Self { stream: stream.map(f), inner_stream: None, + __fut: PhantomData, + __from: PhantomData, } } } -impl Stream for FlatMap +impl Stream for FlatMap where S: Stream, U: Stream, - F: FnMut(S::Item) -> U, + F: FnMut(S::Item) -> Fut, + Fut: Future, { type Item = U::Item; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8f07f328a..a633af74d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -114,7 +114,6 @@ use std::cmp::Ordering; use crate::future::Future; cfg_unstable! { - use core::future::Future; use core::pin::Pin; use core::time::Duration; @@ -799,27 +798,28 @@ extension_trait! { let words = stream::from_iter(&["alpha", "beta", "gamma"]); let merged: String = words - .flat_map(|s| stream::from_iter(s.chars())) + .flat_map(|s| async move { stream::from_iter(s.chars()) }) .collect().await; assert_eq!(merged, "alphabetagamma"); - let d3 = stream::from_iter(&[[[1, 2], [3, 4]], [[5, 6], [7, 8]]]); + let d3 = stream::from_iter(&[[[1u8, 2], [3, 4]], [[5, 6], [7, 8]]]); let d1: Vec<_> = d3 - .flat_map(|item| stream::from_iter(item)) - .flat_map(|item| stream::from_iter(item)) + .flat_map(|item| async move { stream::from_iter(item) }) + .flat_map(|item| async move { stream::from_iter(item) }) .collect().await; - assert_eq!(d1, [&1, &2, &3, &4, &5, &6, &7, &8]); + assert_eq!(d1, [&1u8, &2, &3, &4, &5, &6, &7, &8]); # }); ``` "#] #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - fn flat_map(self, f: F) -> FlatMap + fn flat_map(self, f: F) -> FlatMap where Self: Sized, U: IntoStream, - F: FnMut(Self::Item) -> U, + F: FnMut(Self::Item) -> Fut, + Fut: Future, { FlatMap::new(self, f) } diff --git a/src/stream/sum.rs b/src/stream/sum.rs index 3b3144e5e..6d7981b2d 100644 --- a/src/stream/sum.rs +++ b/src/stream/sum.rs @@ -34,7 +34,7 @@ macro_rules! integer_sum { where S: Stream + 'a, { - Box::pin(async move { stream.fold($zero, Add::add).await } ) + Box::pin(async move { stream.fold($zero, |a, b| async move { a + b }).await } ) } } impl<'a> Sum<&'a $a> for $a { @@ -42,7 +42,7 @@ macro_rules! integer_sum { where S: Stream + 'b, { - Box::pin(async move { stream.fold($zero, Add::add).await } ) + Box::pin(async move { stream.fold($zero, |a, b| async move { a + b }).await } ) } } )*); @@ -58,14 +58,14 @@ macro_rules! float_sum { fn sum<'a, S>(stream: S) -> Pin + 'a>> where S: Stream + 'a, { - Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } ) + Box::pin(async move { stream.fold(0.0, |a, b| async move { a + b }).await } ) } } impl<'a> Sum<&'a $a> for $a { fn sum<'b, S>(stream: S) -> Pin + 'b>> where S: Stream + 'b, { - Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } ) + Box::pin(async move { stream.fold(0.0, |a, b| async move { a + b }).await } ) } } )*); From 6a91e524d3f69a3310ecf2aea82dcef0079a4375 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Fri, 10 Apr 2020 11:28:27 +0300 Subject: [PATCH 9/9] Remove a warning --- src/stream/sum.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stream/sum.rs b/src/stream/sum.rs index 6d7981b2d..d40c698b6 100644 --- a/src/stream/sum.rs +++ b/src/stream/sum.rs @@ -25,7 +25,6 @@ pub trait Sum: Sized { use crate::stream::stream::StreamExt; use core::num::Wrapping; -use core::ops::Add; macro_rules! integer_sum { (@impls $zero: expr, $($a:ty)*) => ($(