async_std/stream/stream/
merge.rs1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use pin_project_lite::pin_project;
5
6use crate::stream::stream::StreamExt;
7use crate::stream::Fuse;
8use crate::stream::Stream;
9use crate::utils;
10
11pin_project! {
12 #[cfg(feature = "unstable")]
20 #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
21 #[derive(Debug)]
22 pub struct Merge<L, R> {
23 #[pin]
24 left: Fuse<L>,
25 #[pin]
26 right: Fuse<R>,
27 }
28}
29
30impl<L: Stream, R: Stream> Merge<L, R> {
31 pub(crate) fn new(left: L, right: R) -> Self {
32 Self {
33 left: left.fuse(),
34 right: right.fuse(),
35 }
36 }
37}
38
39impl<L, R, T> Stream for Merge<L, R>
40where
41 L: Stream<Item = T>,
42 R: Stream<Item = T>,
43{
44 type Item = T;
45
46 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47 let this = self.project();
48 if utils::random(2) == 0 {
49 poll_next_in_order(this.left, this.right, cx)
50 } else {
51 poll_next_in_order(this.right, this.left, cx)
52 }
53 }
54}
55
56fn poll_next_in_order<F, S, I>(
57 first: Pin<&mut F>,
58 second: Pin<&mut S>,
59 cx: &mut Context<'_>,
60) -> Poll<Option<I>>
61where
62 F: Stream<Item = I>,
63 S: Stream<Item = I>,
64{
65 match first.poll_next(cx) {
66 Poll::Ready(None) => second.poll_next(cx),
67 Poll::Ready(item) => Poll::Ready(item),
68 Poll::Pending => match second.poll_next(cx) {
69 Poll::Ready(None) | Poll::Pending => Poll::Pending,
70 Poll::Ready(item) => Poll::Ready(item),
71 },
72 }
73}