async_std/stream/stream/
merge.rs

1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use pin_project_lite::pin_project;
5
6use crate::stream::stream::StreamExt;
7use crate::stream::Fuse;
8use crate::stream::Stream;
9use crate::utils;
10
11pin_project! {
12    /// A stream that merges two other streams into a single stream.
13    ///
14    /// This `struct` is created by the [`merge`] method on [`Stream`]. See its
15    /// documentation for more.
16    ///
17    /// [`merge`]: trait.Stream.html#method.merge
18    /// [`Stream`]: trait.Stream.html
19    #[cfg(feature = "unstable")]
20    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
21    #[derive(Debug)]
22    pub struct Merge<L, R> {
23        #[pin]
24        left: Fuse<L>,
25        #[pin]
26        right: Fuse<R>,
27    }
28}
29
30impl<L: Stream, R: Stream> Merge<L, R> {
31    pub(crate) fn new(left: L, right: R) -> Self {
32        Self {
33            left: left.fuse(),
34            right: right.fuse(),
35        }
36    }
37}
38
39impl<L, R, T> Stream for Merge<L, R>
40where
41    L: Stream<Item = T>,
42    R: Stream<Item = T>,
43{
44    type Item = T;
45
46    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47        let this = self.project();
48        if utils::random(2) == 0 {
49            poll_next_in_order(this.left, this.right, cx)
50        } else {
51            poll_next_in_order(this.right, this.left, cx)
52        }
53    }
54}
55
56fn poll_next_in_order<F, S, I>(
57    first: Pin<&mut F>,
58    second: Pin<&mut S>,
59    cx: &mut Context<'_>,
60) -> Poll<Option<I>>
61where
62    F: Stream<Item = I>,
63    S: Stream<Item = I>,
64{
65    match first.poll_next(cx) {
66        Poll::Ready(None) => second.poll_next(cx),
67        Poll::Ready(item) => Poll::Ready(item),
68        Poll::Pending => match second.poll_next(cx) {
69            Poll::Ready(None) | Poll::Pending => Poll::Pending,
70            Poll::Ready(item) => Poll::Ready(item),
71        },
72    }
73}