From 8f2ed482e0662b4b0dd53b5d8868a1ac78438cc4 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Thu, 17 Oct 2019 23:36:51 +0200 Subject: [PATCH 1/7] add stream::peekable peek() implementation is still missing --- src/stream/stream/mod.rs | 33 +++++++++++++++++++++++ src/stream/stream/peekable.rs | 49 +++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 src/stream/stream/peekable.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 764dc9782..c9d70c5d2 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -44,6 +44,7 @@ mod min_by; mod next; mod nth; mod partial_cmp; +mod peekable; mod scan; mod skip; mod skip_while; @@ -80,6 +81,7 @@ pub use filter::Filter; pub use fuse::Fuse; pub use inspect::Inspect; pub use map::Map; +pub use peekable::Peekable; pub use scan::Scan; pub use skip::Skip; pub use skip_while::SkipWhile; @@ -984,6 +986,37 @@ extension_trait! { } } + #[doc = r#" + ## Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + let mut s = s.peekable(); + + assert_eq!(s.next().await, Some(1)); + assert_eq!(s.peek().await, Some(1)); + assert_eq!(s.next().await, Some(2)); + assert_eq!(s.next().await, Some(3)); + assert_eq!(s.next().await, None); + # + # }) } + ``` + "#] + #[inline] + fn peekable(self) -> Peekable + where + Self: Sized, + { + Peekable::new(self) + } + + #[doc = r#" A stream adaptor similar to [`fold`] that holds internal state and produces a new stream. diff --git a/src/stream/stream/peekable.rs b/src/stream/stream/peekable.rs new file mode 100644 index 000000000..9be76a0ff --- /dev/null +++ b/src/stream/stream/peekable.rs @@ -0,0 +1,49 @@ +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct Peekable { + stream: S, + peeked: Option>, +} + + +impl Peekable +where + S: Stream, +{ + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(peeked: Option>); + + pub(crate) fn new(stream: S) -> Self { + Peekable{ + stream: stream, + peeked: None, + } + } + + pub fn peek(&mut self) -> Poll> { + Poll::Ready(None) + } + +} + +impl Stream for Peekable +where + S: Stream, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &self.peeked { + Some(_) => Poll::Ready(self.as_mut().peeked().take().unwrap()) , + None => { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + Poll::Ready(next) + } + } + } +} From a2dc7530d02d5d1f05e453b4e8fcfe8b9ab8fe8c Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Fri, 18 Oct 2019 00:14:24 +0200 Subject: [PATCH 2/7] change type of peeked --- src/stream/stream/peekable.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/stream/stream/peekable.rs b/src/stream/stream/peekable.rs index 9be76a0ff..bc76f038a 100644 --- a/src/stream/stream/peekable.rs +++ b/src/stream/stream/peekable.rs @@ -7,7 +7,14 @@ use crate::task::{Context, Poll}; #[allow(missing_debug_implementations)] pub struct Peekable { stream: S, - peeked: Option>, + peeked: Option>>, +} + +pub struct PeekFuture<'a, T: Unpin + ?Sized> { + pub(crate) stream: &'a T, +} + +impl Future for PeekFuture<'_, T> { } From d644eee36d1768fcf542c98cb56d4a4234d3512f Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Mon, 21 Oct 2019 15:32:13 +0200 Subject: [PATCH 3/7] match next in poll_next --- src/stream/stream/peekable.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/stream/stream/peekable.rs b/src/stream/stream/peekable.rs index bc76f038a..dd164ae92 100644 --- a/src/stream/stream/peekable.rs +++ b/src/stream/stream/peekable.rs @@ -32,7 +32,7 @@ where } } - pub fn peek(&mut self) -> Poll> { + pub fn peek(&mut self) -> PeekFuture> { Poll::Ready(None) } @@ -46,11 +46,18 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &self.peeked { - Some(_) => Poll::Ready(self.as_mut().peeked().take().unwrap()) , + Some(_) => { + let v = Poll::Ready(self.as_mut().peeked().take().unwrap()); + self.peeked = None; + v + }, None => { let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); - Poll::Ready(next) - } + match next { + Some(v) => Poll::Ready(Some(v)), + None => Poll::Ready(None), + } + }, } } } From 0aef382e80c252f24374a4eed95b6945223aa9bb Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Mon, 21 Oct 2019 15:49:14 +0200 Subject: [PATCH 4/7] fix types and it compiles --- src/stream/stream/peekable.rs | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/stream/stream/peekable.rs b/src/stream/stream/peekable.rs index dd164ae92..503d7385b 100644 --- a/src/stream/stream/peekable.rs +++ b/src/stream/stream/peekable.rs @@ -1,22 +1,25 @@ use std::pin::Pin; +use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct Peekable { +pub struct Peekable +where + S: Sized, +{ stream: S, - peeked: Option>>, -} - -pub struct PeekFuture<'a, T: Unpin + ?Sized> { - pub(crate) stream: &'a T, + peeked: Option>, } -impl Future for PeekFuture<'_, T> { -} +//pub struct PeekFuture<'a, T: Unpin + ?Sized> { +//pub(crate) stream: &'a T, +//} +//impl Future for PeekFuture<'_, T> { +//} impl Peekable where @@ -26,16 +29,15 @@ where pin_utils::unsafe_unpinned!(peeked: Option>); pub(crate) fn new(stream: S) -> Self { - Peekable{ + Peekable { stream: stream, peeked: None, } } - pub fn peek(&mut self) -> PeekFuture> { - Poll::Ready(None) + pub fn peek(&mut self) -> impl Future> { + async { None } } - } impl Stream for Peekable @@ -46,18 +48,18 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &self.peeked { - Some(_) => { + Some(_) => { let v = Poll::Ready(self.as_mut().peeked().take().unwrap()); - self.peeked = None; + *self.as_mut().peeked() = None; v - }, + } None => { let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); match next { Some(v) => Poll::Ready(Some(v)), None => Poll::Ready(None), } - }, + } } } } From bd104c2d831b40caa50042f7e93f8ff567620658 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Tue, 29 Oct 2019 11:53:17 +0100 Subject: [PATCH 5/7] using next as peekable value --- src/stream/stream/peekable.rs | 39 +++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/src/stream/stream/peekable.rs b/src/stream/stream/peekable.rs index 503d7385b..32c4810cc 100644 --- a/src/stream/stream/peekable.rs +++ b/src/stream/stream/peekable.rs @@ -11,22 +11,15 @@ where S: Sized, { stream: S, - peeked: Option>, + peeked: Option>>, } -//pub struct PeekFuture<'a, T: Unpin + ?Sized> { -//pub(crate) stream: &'a T, -//} - -//impl Future for PeekFuture<'_, T> { -//} - impl Peekable where S: Stream, { pin_utils::unsafe_pinned!(stream: S); - pin_utils::unsafe_unpinned!(peeked: Option>); + pin_utils::unsafe_unpinned!(peeked: Option>>); pub(crate) fn new(stream: S) -> Self { Peekable { @@ -35,8 +28,15 @@ where } } - pub fn peek(&mut self) -> impl Future> { - async { None } + pub fn peek(mut self: Pin<&mut Self>) -> &Poll> { + match &self.peeked { + Some(peeked) => &Poll::Ready(None), + None => { + let next = self.stream.next(); + *self.as_mut().peeked() = next; + &next + } + } } } @@ -47,18 +47,21 @@ where type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match &self.peeked { - Some(_) => { - let v = Poll::Ready(self.as_mut().peeked().take().unwrap()); + Some(peeked) => { + let v = self.as_mut().peeked().take().unwrap(); + //*self.as_mut().peeked() = Some(Poll::Ready(next)); *self.as_mut().peeked() = None; + //*peeked + v + //Poll::Ready() // wrong thing to return just a dummy to match types + } None => { - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); - match next { - Some(v) => Poll::Ready(Some(v)), - None => Poll::Ready(None), - } + Poll::Ready(next) } } } From dc04fde2d74b83a967ffa879cd41434bcf68f1e3 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Tue, 29 Oct 2019 11:56:36 +0100 Subject: [PATCH 6/7] tidy up --- src/stream/stream/peekable.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/stream/stream/peekable.rs b/src/stream/stream/peekable.rs index 32c4810cc..8166e9579 100644 --- a/src/stream/stream/peekable.rs +++ b/src/stream/stream/peekable.rs @@ -30,8 +30,10 @@ where pub fn peek(mut self: Pin<&mut Self>) -> &Poll> { match &self.peeked { - Some(peeked) => &Poll::Ready(None), + Some(peeked) => + self.as_ref().peeked() None => { + // how to get the next element from here? let next = self.stream.next(); *self.as_mut().peeked() = next; &next @@ -52,13 +54,9 @@ where match &self.peeked { Some(peeked) => { let v = self.as_mut().peeked().take().unwrap(); - //*self.as_mut().peeked() = Some(Poll::Ready(next)); *self.as_mut().peeked() = None; - //*peeked v - //Poll::Ready() // wrong thing to return just a dummy to match types - } None => { Poll::Ready(next) From f37916c92d640dc69888edfee8f2150b6814b2e5 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Tue, 29 Oct 2019 14:13:26 +0100 Subject: [PATCH 7/7] fix types --- src/stream/stream/peekable.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/stream/stream/peekable.rs b/src/stream/stream/peekable.rs index 8166e9579..4095f84c2 100644 --- a/src/stream/stream/peekable.rs +++ b/src/stream/stream/peekable.rs @@ -3,6 +3,7 @@ use std::pin::Pin; use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; +use crate::stream::stream::StreamExt; #[doc(hidden)] #[allow(missing_debug_implementations)] @@ -30,12 +31,11 @@ where pub fn peek(mut self: Pin<&mut Self>) -> &Poll> { match &self.peeked { - Some(peeked) => - self.as_ref().peeked() + Some(peeked) => &peeked, None => { - // how to get the next element from here? + // how to get the next `next` value? What about `Context` let next = self.stream.next(); - *self.as_mut().peeked() = next; + *self.as_mut().peeked() = Some(next); &next } }