async_std/stream/stream/
flat_map.rs1use core::pin::Pin;
2
3use pin_project_lite::pin_project;
4
5use crate::stream::stream::map::Map;
6use crate::stream::stream::StreamExt;
7use crate::stream::{IntoStream, Stream};
8use crate::task::{Context, Poll};
9
10pin_project! {
11 pub struct FlatMap<S, U, F> {
20 #[pin]
21 stream: Map<S, F>,
22 #[pin]
23 inner_stream: Option<U>,
24 }
25}
26
27impl<S, U, F> FlatMap<S, U, F>
28where
29 S: Stream,
30 U: IntoStream,
31 F: FnMut(S::Item) -> U,
32{
33 pub(super) fn new(stream: S, f: F) -> Self {
34 Self {
35 stream: stream.map(f),
36 inner_stream: None,
37 }
38 }
39}
40
41impl<S, U, F> Stream for FlatMap<S, U, F>
42where
43 S: Stream,
44 U: Stream,
45 F: FnMut(S::Item) -> U,
46{
47 type Item = U::Item;
48
49 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50 let mut this = self.project();
51 loop {
52 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
53 match futures_core::ready!(inner.poll_next(cx)) {
54 item @ Some(_) => return Poll::Ready(item),
55 None => this.inner_stream.set(None),
56 }
57 }
58
59 match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
60 inner @ Some(_) => this.inner_stream.set(inner.map(IntoStream::into_stream)),
61 None => return Poll::Ready(None),
62 }
63 }
64 }
65}