diff --git a/Cargo.toml b/Cargo.toml
index f768f8183..80817fb8a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,8 +21,8 @@ features = ["docs"]
 rustdoc-args = ["--cfg", "feature=\"docs\""]
 
 [features]
-docs = []
-unstable = []
+docs = ["futures-channel-preview"]
+unstable = ["futures-channel-preview"]
 
 [dependencies]
 async-macros = "1.0.0"
@@ -30,6 +30,7 @@ async-task = "1.0.0"
 cfg-if = "0.1.9"
 crossbeam-channel = "0.3.9"
 crossbeam-deque = "0.7.1"
+futures-channel-preview = { version = "0.3.0-alpha.18", optional = true }
 futures-core-preview = "0.3.0-alpha.18"
 futures-io-preview = "0.3.0-alpha.18"
 futures-timer = "0.4.0"
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index 65992eb82..e41f4683e 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -71,6 +71,9 @@ use cfg_if::cfg_if;
 
 cfg_if! {
     if #[cfg(any(feature = "unstable", feature = "docs"))] {
+        mod unzip;
+        use unzip::UnzipSendFuture;
+
         use crate::stream::FromStream;
     }
 }
@@ -929,6 +932,47 @@ pub trait Stream {
     {
         FromStream::from_stream(self)
     }
+
+    /// Converts a stream of pairs into a pair of containers.
+    ///
+    /// ## Examples
+    ///
+    /// ```
+    /// # fn main() { async_std::task::block_on(async {
+    /// #
+    /// use std::collections::VecDeque;
+    /// use async_std::stream::Stream;
+    ///
+    /// let s: VecDeque<(usize, usize)> = vec![(1, 4), (2, 5), (3, 6)].into_iter().collect();
+    /// let (a, b): (Vec<_>, Vec<_>) = s.unzip().await;
+    ///
+    /// assert_eq!(a, vec![1, 2, 3]);
+    /// assert_eq!(b, vec![4, 5, 6]);
+    /// #
+    /// # }) }
+    /// ```
+    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
+    #[cfg(any(feature = "unstable", feature = "docs"))]
+    fn unzip<'a, A, B, FromA, FromB>(self) -> dyn_ret!('a, (FromA, FromB))
+    where
+        Self: 'a + Stream<Item = (A, B)> + Send + Sized,
+        FromA: 'a + FromStream<A> + Send,
+        FromB: 'a + FromStream<B> + Send,
+        A: 'a + Send,
+        B: 'a + Send,
+    {
+        let (tx_a, rx_a) = futures_channel::mpsc::channel(0);
+        let (tx_b, rx_b) = futures_channel::mpsc::channel(0);
+
+        let a_fut = FromA::from_stream(rx_a);
+        let b_fut = FromB::from_stream(rx_b);
+        let send_fut = UnzipSendFuture::new(self, tx_a, tx_b);
+
+        Box::pin(async move {
+            let (a, b, _): (_, _, ()) = async_macros::join!(a_fut, b_fut, send_fut).await;
+            (a, b)
+        })
+    }
 }
 
 impl<T: futures_core::stream::Stream + ?Sized> Stream for T {
diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs
new file mode 100644
index 000000000..3c8eaf0fa
--- /dev/null
+++ b/src/stream/stream/unzip.rs
@@ -0,0 +1,66 @@
+use std::pin::Pin;
+
+use futures_channel::mpsc::Sender;
+
+use crate::stream::Stream;
+use crate::task::{Context, Poll};
+
+pub(crate) struct UnzipSendFuture<S, A, B> {
+    stream: S,
+    a: (Option<A>, Sender<A>),
+    b: (Option<B>, Sender<B>),
+}
+
+impl<S, A, B> UnzipSendFuture<S, A, B> {
+    pub(crate) fn new(stream: S, tx_a: Sender<A>, tx_b: Sender<B>) -> Self {
+        UnzipSendFuture {
+            stream,
+            a: (None, tx_a),
+            b: (None, tx_b),
+        }
+    }
+
+    pin_utils::unsafe_pinned!(stream: S);
+    pin_utils::unsafe_unpinned!(a: (Option<A>, Sender<A>));
+    pin_utils::unsafe_unpinned!(b: (Option<B>, Sender<B>));
+}
+
+impl<S, A, B> crate::future::Future for UnzipSendFuture<S, A, B>
+where
+    S: Stream<Item = (A, B)>,
+{
+    type Output = ();
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+        // hand-written send-join
+        let (item, tx) = self.as_mut().a();
+        if item.is_some() {
+            let poll_result = tx.poll_ready(cx);
+            if poll_result.is_ready() {
+                let item = item.take().unwrap();
+                let _ = tx.start_send(item);
+            }
+        }
+        let (item, tx) = self.as_mut().b();
+        if item.is_some() {
+            let poll_result = tx.poll_ready(cx);
+            if poll_result.is_ready() {
+                let item = item.take().unwrap();
+                let _ = tx.start_send(item);
+            }
+        }
+        if self.as_mut().a().0.is_some() || self.as_mut().b().0.is_some() {
+            return Poll::Pending;
+        }
+
+        let item = futures_core::ready!(self.as_mut().stream().poll_next(cx));
+        if let Some((a, b)) = item {
+            self.as_mut().a().0 = Some(a);
+            self.as_mut().b().0 = Some(b);
+            cx.waker().wake_by_ref();
+            Poll::Pending
+        } else {
+            Poll::Ready(())
+        }
+    }
+}