async_std/stream/stream/
zip.rs

1use core::fmt;
2use core::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::stream::Stream;
7use crate::task::{Context, Poll};
8
9pin_project! {
10    /// A stream that takes items from two other streams simultaneously.
11    ///
12    /// This `struct` is created by the [`zip`] method on [`Stream`]. See its
13    /// documentation for more.
14    ///
15    /// [`zip`]: trait.Stream.html#method.zip
16    /// [`Stream`]: trait.Stream.html
17    pub struct Zip<A: Stream, B> {
18        item_slot: Option<A::Item>,
19        #[pin]
20        first: A,
21        #[pin]
22        second: B,
23    }
24}
25
26impl<A: Stream + fmt::Debug, B: fmt::Debug> fmt::Debug for Zip<A, B> {
27    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
28        fmt.debug_struct("Zip")
29            .field("first", &self.first)
30            .field("second", &self.second)
31            .finish()
32    }
33}
34
35impl<A: Stream, B> Zip<A, B> {
36    pub(crate) fn new(first: A, second: B) -> Self {
37        Self {
38            item_slot: None,
39            first,
40            second,
41        }
42    }
43}
44
45impl<A: Stream, B: Stream> Stream for Zip<A, B> {
46    type Item = (A::Item, B::Item);
47
48    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49        let this = self.project();
50        if this.item_slot.is_none() {
51            match this.first.poll_next(cx) {
52                Poll::Pending => return Poll::Pending,
53                Poll::Ready(None) => return Poll::Ready(None),
54                Poll::Ready(Some(item)) => *this.item_slot = Some(item),
55            }
56        }
57        let second_item = futures_core::ready!(this.second.poll_next(cx));
58        let first_item = this.item_slot.take().unwrap();
59        Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
60    }
61}