Skip to content

Commit b142824

Browse files
Add Future::join and Future::try_join
1 parent ab2f64c commit b142824

File tree

3 files changed

+222
-0
lines changed

3 files changed

+222
-0
lines changed

src/future/future/join.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use std::pin::Pin;
2+
3+
use async_macros::MaybeDone;
4+
use pin_project_lite::pin_project;
5+
6+
use crate::task::{Context, Poll};
7+
use std::future::Future;
8+
9+
pin_project! {
10+
#[allow(missing_docs)]
11+
#[allow(missing_debug_implementations)]
12+
pub struct Join<L, R>
13+
where
14+
L: Future,
15+
R: Future<Output = L::Output>
16+
{
17+
#[pin] left: MaybeDone<L>,
18+
#[pin] right: MaybeDone<R>,
19+
}
20+
}
21+
22+
impl<L, R> Join<L, R>
23+
where
24+
L: Future,
25+
R: Future<Output = L::Output>,
26+
{
27+
pub(crate) fn new(left: L, right: R) -> Self {
28+
Self {
29+
left: MaybeDone::new(left),
30+
right: MaybeDone::new(right),
31+
}
32+
}
33+
}
34+
35+
impl<L, R> Future for Join<L, R>
36+
where
37+
L: Future,
38+
R: Future<Output = L::Output>,
39+
{
40+
type Output = (L::Output, R::Output);
41+
42+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
43+
let this = self.project();
44+
45+
let mut left = this.left;
46+
let mut right = this.right;
47+
48+
if Future::poll(Pin::new(&mut left), cx).is_ready() {
49+
if right.as_ref().output().is_some() {
50+
return Poll::Ready((left.take().unwrap(), right.take().unwrap()));
51+
}
52+
}
53+
54+
if Future::poll(Pin::new(&mut right), cx).is_ready() {
55+
if left.as_ref().output().is_some() {
56+
return Poll::Ready((left.take().unwrap(), right.take().unwrap()));
57+
}
58+
}
59+
60+
Poll::Pending
61+
}
62+
}

src/future/future/mod.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ cfg_unstable! {
33
mod flatten;
44
mod race;
55
mod try_race;
6+
mod join;
7+
mod try_join;
68

79
use std::time::Duration;
810

@@ -11,6 +13,8 @@ cfg_unstable! {
1113
use crate::future::IntoFuture;
1214
use race::Race;
1315
use try_race::TryRace;
16+
use join::Join;
17+
use try_join::TryJoin;
1418
}
1519

1620
extension_trait! {
@@ -264,6 +268,90 @@ extension_trait! {
264268
{
265269
TryRace::new(self, other)
266270
}
271+
272+
#[doc = r#"
273+
Waits for two similarly-typed futures to complete.
274+
275+
Awaits multiple futures simultaneously, returning the output of the
276+
futures once both complete.
277+
278+
This function returns a new future which polls both futures
279+
concurrently.
280+
281+
# Examples
282+
283+
```
284+
# async_std::task::block_on(async {
285+
use async_std::prelude::*;
286+
use async_std::future;
287+
288+
let a = future::ready(1u8);
289+
let b = future::ready(2u8);
290+
291+
let f = a.join(b);
292+
assert_eq!(f.await, (1u8, 2u8));
293+
# });
294+
```
295+
"#]
296+
#[cfg(any(feature = "unstable", feature = "docs"))]
297+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
298+
fn join<F>(
299+
self,
300+
other: F
301+
) -> impl Future<Output = (<Self as std::future::Future>::Output, <F as std::future::Future>::Output)> [Join<Self, F>]
302+
where
303+
Self: std::future::Future + Sized,
304+
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
305+
{
306+
Join::new(self, other)
307+
}
308+
309+
#[doc = r#"
310+
Waits for two similarly-typed fallible futures to complete.
311+
312+
Awaits multiple futures simultaneously, returning all results once
313+
complete.
314+
315+
`try_join` is similar to [`join`], but returns an error immediately
316+
if a future resolves to an error.
317+
318+
[`join`]: #method.join
319+
320+
# Examples
321+
322+
```
323+
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
324+
#
325+
use async_std::prelude::*;
326+
use async_std::future;
327+
328+
let a = future::ready(Err("Error"));
329+
let b = future::ready(Ok(1u8));
330+
331+
let f = a.try_join(b);
332+
assert_eq!(f.await, Err("Error"));
333+
334+
let a = future::ready(Ok::<u8, String>(1u8));
335+
let b = future::ready(Ok::<u8, String>(2u8));
336+
337+
let f = a.try_join(b);
338+
assert_eq!(f.await, Ok((1u8, 2u8)));
339+
#
340+
# Ok(()) }) }
341+
```
342+
"#]
343+
#[cfg(any(feature = "unstable", feature = "docs"))]
344+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
345+
fn try_join<F, T, E>(
346+
self,
347+
other: F
348+
) -> impl Future<Output = Result<(T, T), E>> [TryJoin<Self, F>]
349+
where
350+
Self: std::future::Future<Output = Result<T, E>> + Sized,
351+
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
352+
{
353+
TryJoin::new(self, other)
354+
}
267355
}
268356

269357
impl<F: Future + Unpin + ?Sized> Future for Box<F> {

src/future/future/try_join.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
use std::pin::Pin;
2+
3+
use async_macros::MaybeDone;
4+
use pin_project_lite::pin_project;
5+
6+
use crate::task::{Context, Poll};
7+
use std::future::Future;
8+
9+
pin_project! {
10+
#[allow(missing_docs)]
11+
#[allow(missing_debug_implementations)]
12+
pub struct TryJoin<L, R>
13+
where
14+
L: Future,
15+
R: Future<Output = L::Output>
16+
{
17+
#[pin] left: MaybeDone<L>,
18+
#[pin] right: MaybeDone<R>,
19+
}
20+
}
21+
22+
impl<L, R> TryJoin<L, R>
23+
where
24+
L: Future,
25+
R: Future<Output = L::Output>,
26+
{
27+
pub(crate) fn new(left: L, right: R) -> Self {
28+
Self {
29+
left: MaybeDone::new(left),
30+
right: MaybeDone::new(right),
31+
}
32+
}
33+
}
34+
35+
impl<L, R, T, E> Future for TryJoin<L, R>
36+
where
37+
L: Future<Output = Result<T, E>>,
38+
R: Future<Output = L::Output>,
39+
{
40+
type Output = Result<(T, T), E>;
41+
42+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
43+
let this = self.project();
44+
45+
let mut left = this.left;
46+
let mut right = this.right;
47+
48+
if Future::poll(Pin::new(&mut left), cx).is_ready() {
49+
if left.as_ref().output().unwrap().is_err() {
50+
return Poll::Ready(Err(left.take().unwrap().err().unwrap()));
51+
} else if right.as_ref().output().is_some() {
52+
return Poll::Ready(Ok((
53+
left.take().unwrap().ok().unwrap(),
54+
right.take().unwrap().ok().unwrap(),
55+
)));
56+
}
57+
}
58+
59+
if Future::poll(Pin::new(&mut right), cx).is_ready() {
60+
if right.as_ref().output().unwrap().is_err() {
61+
return Poll::Ready(Err(right.take().unwrap().err().unwrap()));
62+
} else if left.as_ref().output().is_some() {
63+
return Poll::Ready(Ok((
64+
left.take().unwrap().ok().unwrap(),
65+
right.take().unwrap().ok().unwrap(),
66+
)));
67+
}
68+
}
69+
70+
Poll::Pending
71+
}
72+
}

0 commit comments

Comments
 (0)