async_std/stream/stream/
flat_map.rs

1use core::pin::Pin;
2
3use pin_project_lite::pin_project;
4
5use crate::stream::stream::map::Map;
6use crate::stream::stream::StreamExt;
7use crate::stream::{IntoStream, Stream};
8use crate::task::{Context, Poll};
9
10pin_project! {
11    /// A stream that maps each element to a stream, and yields the elements of the produced
12    /// streams.
13    ///
14    /// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its
15    /// documentation for more.
16    ///
17    /// [`flat_map`]: trait.Stream.html#method.flat_map
18    /// [`Stream`]: trait.Stream.html
19    pub struct FlatMap<S, U, F> {
20        #[pin]
21        stream: Map<S, F>,
22        #[pin]
23        inner_stream: Option<U>,
24    }
25}
26
27impl<S, U, F> FlatMap<S, U, F>
28where
29    S: Stream,
30    U: IntoStream,
31    F: FnMut(S::Item) -> U,
32{
33    pub(super) fn new(stream: S, f: F) -> Self {
34        Self {
35            stream: stream.map(f),
36            inner_stream: None,
37        }
38    }
39}
40
41impl<S, U, F> Stream for FlatMap<S, U, F>
42where
43    S: Stream,
44    U: Stream,
45    F: FnMut(S::Item) -> U,
46{
47    type Item = U::Item;
48
49    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50        let mut this = self.project();
51        loop {
52            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
53                match futures_core::ready!(inner.poll_next(cx)) {
54                    item @ Some(_) => return Poll::Ready(item),
55                    None => this.inner_stream.set(None),
56                }
57            }
58
59            match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
60                inner @ Some(_) => this.inner_stream.set(inner.map(IntoStream::into_stream)),
61                None => return Poll::Ready(None),
62            }
63        }
64    }
65}