From ec528d62a2dd88c2f04223946e8b5e5c163fea81 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sat, 21 Sep 2019 15:01:31 +0200 Subject: [PATCH 1/3] Make async-std compile for wasm32-unknown-unknown --- .travis.yml | 8 + Cargo.toml | 6 +- src/net/driver/mod.rs | 2 + src/net/tcp/listener.rs | 7 +- src/net/tcp/stream.rs | 7 +- src/net/udp/mod.rs | 382 +++++++++++++++++++++++++++++++--------- 6 files changed, 320 insertions(+), 92 deletions(-) diff --git a/.travis.yml b/.travis.yml index d2862fcae..4115cb90c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,6 +32,14 @@ matrix: - rust: nightly-x86_64-pc-windows-msvc os: windows + - name: docs + rust: nightly + os: linux + before_script: | + rustup target add wasm32-unknown-unknown + script: + - cargo check --target wasm32-unknown-unknown --features unstable --all --benches --bins --examples --tests + - name: fmt rust: nightly os: linux diff --git a/Cargo.toml b/Cargo.toml index f768f8183..5e7fea009 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,13 +36,15 @@ futures-timer = "0.4.0" lazy_static = "1.4.0" log = { version = "0.4.8", features = ["kv_unstable"] } memchr = "2.2.1" -mio = "0.6.19" -mio-uds = "0.6.7" num_cpus = "1.10.1" pin-utils = "0.1.0-alpha.4" slab = "0.4.2" kv-log-macro = "1.0.4" +[target.'cfg(not(target_os = "unknown"))'.dependencies] +mio = "0.6.19" +mio-uds = "0.6.7" + [dev-dependencies] femme = "1.2.0" surf = "1.0.2" diff --git a/src/net/driver/mod.rs b/src/net/driver/mod.rs index 806acdbe4..759125872 100644 --- a/src/net/driver/mod.rs +++ b/src/net/driver/mod.rs @@ -1,3 +1,5 @@ +#![cfg(not(target_os = "unknown"))] + use std::fmt; use std::sync::{Arc, Mutex}; diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 4afa57451..d57918a42 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -6,10 +6,15 @@ use cfg_if::cfg_if; use super::TcpStream; use crate::future::{self, Future}; use crate::io; -use crate::net::driver::Watcher; use crate::net::ToSocketAddrs; use crate::task::{Context, Poll}; +cfg_if! { + if #[cfg(not(target_os = "unknown"))] { + use crate::net::driver::Watcher; + } +} + /// A TCP socket server, listening for connections. /// /// After creating a `TcpListener` by [`bind`]ing it to a socket address, it listens for incoming diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 7c10602c7..b95a316ce 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -7,11 +7,16 @@ use futures_io::{AsyncRead, AsyncWrite}; use crate::future; use crate::io; -use crate::net::driver::Watcher; use crate::net::ToSocketAddrs; use crate::task::blocking; use crate::task::{Context, Poll}; +cfg_if! { + if #[cfg(not(target_os = "unknown"))] { + use crate::net::driver::Watcher; + } +} + /// A TCP stream between a local and a remote socket. /// /// A `TcpStream` can either be created by connecting to an endpoint, via the [`connect`] method, diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index a750899d4..7fc36168d 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -4,10 +4,15 @@ use std::net::SocketAddr; use cfg_if::cfg_if; use std::net::{Ipv4Addr, Ipv6Addr}; -use crate::future; -use crate::net::driver::Watcher; use crate::net::ToSocketAddrs; +cfg_if! { + if #[cfg(not(target_os = "unknown"))] { + use crate::future; + use crate::net::driver::Watcher; + } +} + /// A UDP socket. /// /// After creating a `UdpSocket` by [`bind`]ing it to a socket address, data can be [sent to] and @@ -46,7 +51,19 @@ use crate::net::ToSocketAddrs; /// ``` #[derive(Debug)] pub struct UdpSocket { - watcher: Watcher, + inner: Inner, +} + +cfg_if! { + if #[cfg(not(target_os = "unknown"))] { + #[derive(Debug)] + struct Inner { + watcher: Watcher + } + } else { + #[derive(Debug)] + struct Inner; + } } impl UdpSocket { @@ -69,25 +86,7 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub async fn bind(addr: A) -> io::Result { - let mut last_err = None; - - for addr in addr.to_socket_addrs().await? { - match mio::net::UdpSocket::bind(&addr) { - Ok(mio_socket) => { - return Ok(UdpSocket { - watcher: Watcher::new(mio_socket), - }); - } - Err(err) => last_err = Some(err), - } - } - - Err(last_err.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "could not resolve to any addresses", - ) - })) + bind(addr).await } /// Returns the local address that this listener is bound to. @@ -108,7 +107,7 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub fn local_addr(&self) -> io::Result { - self.watcher.get_ref().local_addr() + local_addr(self) } /// Sends data on the socket to the given address. @@ -138,21 +137,7 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub async fn send_to(&self, buf: &[u8], addrs: A) -> io::Result { - let addr = match addrs.to_socket_addrs().await?.next() { - Some(addr) => addr, - None => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "no addresses to send data to", - )); - } - }; - - future::poll_fn(|cx| { - self.watcher - .poll_write_with(cx, |inner| inner.send_to(buf, &addr)) - }) - .await + send_to(self, buf, addrs).await } /// Receives data from the socket. @@ -175,11 +160,7 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - future::poll_fn(|cx| { - self.watcher - .poll_read_with(cx, |inner| inner.recv_from(buf)) - }) - .await + recv_from(self, buf).await } /// Connects the UDP socket to a remote address. @@ -205,22 +186,7 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub async fn connect(&self, addrs: A) -> io::Result<()> { - let mut last_err = None; - - for addr in addrs.to_socket_addrs().await? { - // TODO(stjepang): connect on the blocking pool - match self.watcher.get_ref().connect(addr) { - Ok(()) => return Ok(()), - Err(err) => last_err = Some(err), - } - } - - Err(last_err.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "could not resolve to any addresses", - ) - })) + connect(self, addrs).await } /// Sends data on the socket to the given address. @@ -250,7 +216,7 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result { - future::poll_fn(|cx| self.watcher.poll_write_with(cx, |inner| inner.send(buf))).await + send(self, buf).await } /// Receives data from the socket. @@ -273,7 +239,7 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result { - future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.recv(buf))).await + recv(self, buf).await } /// Gets the value of the `SO_BROADCAST` option for this socket. @@ -282,14 +248,14 @@ impl UdpSocket { /// /// [`set_broadcast`]: #method.set_broadcast pub fn broadcast(&self) -> io::Result { - self.watcher.get_ref().broadcast() + broadcast(self) } /// Sets the value of the `SO_BROADCAST` option for this socket. /// /// When enabled, this socket is allowed to send packets to a broadcast address. pub fn set_broadcast(&self, on: bool) -> io::Result<()> { - self.watcher.get_ref().set_broadcast(on) + set_broadcast(self, on) } /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. @@ -298,7 +264,7 @@ impl UdpSocket { /// /// [`set_multicast_loop_v4`]: #method.set_multicast_loop_v4 pub fn multicast_loop_v4(&self) -> io::Result { - self.watcher.get_ref().multicast_loop_v4() + multicast_loop_v4(self) } /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. @@ -309,7 +275,7 @@ impl UdpSocket { /// /// This may not have any affect on IPv6 sockets. pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { - self.watcher.get_ref().set_multicast_loop_v4(on) + set_multicast_loop_v4(self, on) } /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. @@ -318,7 +284,7 @@ impl UdpSocket { /// /// [`set_multicast_ttl_v4`]: #method.set_multicast_ttl_v4 pub fn multicast_ttl_v4(&self) -> io::Result { - self.watcher.get_ref().multicast_ttl_v4() + multicast_ttl_v4(self) } /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. @@ -331,7 +297,7 @@ impl UdpSocket { /// /// This may not have any affect on IPv6 sockets. pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { - self.watcher.get_ref().set_multicast_ttl_v4(ttl) + set_multicast_ttl_v4(self, ttl) } /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. @@ -340,7 +306,7 @@ impl UdpSocket { /// /// [`set_multicast_loop_v6`]: #method.set_multicast_loop_v6 pub fn multicast_loop_v6(&self) -> io::Result { - self.watcher.get_ref().multicast_loop_v6() + multicast_loop_v6(self) } /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. @@ -351,7 +317,7 @@ impl UdpSocket { /// /// This may not have any affect on IPv4 sockets. pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { - self.watcher.get_ref().set_multicast_loop_v6(on) + set_multicast_loop_v6(self, on) } /// Gets the value of the `IP_TTL` option for this socket. @@ -360,7 +326,7 @@ impl UdpSocket { /// /// [`set_ttl`]: #method.set_ttl pub fn ttl(&self) -> io::Result { - self.watcher.get_ref().ttl() + ttl(self) } /// Sets the value for the `IP_TTL` option on this socket. @@ -368,7 +334,7 @@ impl UdpSocket { /// This value sets the time-to-live field that is used in every packet sent /// from this socket. pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.watcher.get_ref().set_ttl(ttl) + set_ttl(self, ttl) } /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. @@ -396,9 +362,7 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub fn join_multicast_v4(&self, multiaddr: &Ipv4Addr, interface: &Ipv4Addr) -> io::Result<()> { - self.watcher - .get_ref() - .join_multicast_v4(multiaddr, interface) + join_multicast_v4(self, multiaddr, interface) } /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. @@ -425,9 +389,7 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.watcher - .get_ref() - .join_multicast_v6(multiaddr, interface) + join_multicast_v6(self, multiaddr, interface) } /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. @@ -436,9 +398,7 @@ impl UdpSocket { /// /// [`join_multicast_v4`]: #method.join_multicast_v4 pub fn leave_multicast_v4(&self, multiaddr: &Ipv4Addr, interface: &Ipv4Addr) -> io::Result<()> { - self.watcher - .get_ref() - .leave_multicast_v4(multiaddr, interface) + leave_multicast_v4(self, multiaddr, interface) } /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. @@ -447,18 +407,264 @@ impl UdpSocket { /// /// [`join_multicast_v6`]: #method.join_multicast_v6 pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.watcher - .get_ref() - .leave_multicast_v6(multiaddr, interface) + leave_multicast_v6(self, multiaddr, interface) } } impl From for UdpSocket { /// Converts a `std::net::UdpSocket` into its asynchronous equivalent. fn from(socket: std::net::UdpSocket) -> UdpSocket { - let mio_socket = mio::net::UdpSocket::from_socket(socket).unwrap(); - UdpSocket { - watcher: Watcher::new(mio_socket), + from(socket) + } +} + +cfg_if! { + if #[cfg(not(target_os = "unknown"))] { + async fn bind(addr: A) -> io::Result { + let mut last_err = None; + + for addr in addr.to_socket_addrs().await? { + match mio::net::UdpSocket::bind(&addr) { + Ok(mio_socket) => { + return Ok(UdpSocket { + inner: Inner { + watcher: Watcher::new(mio_socket), + }, + }); + } + Err(err) => last_err = Some(err), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any addresses", + ) + })) + } + + fn local_addr(socket: &UdpSocket) -> io::Result { + socket.inner.watcher.get_ref().local_addr() + } + + async fn send_to(socket: &UdpSocket, buf: &[u8], addrs: A) -> io::Result { + let addr = match addrs.to_socket_addrs().await?.next() { + Some(addr) => addr, + None => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "no addresses to send data to", + )); + } + }; + + future::poll_fn(|cx| { + socket.inner.watcher + .poll_write_with(cx, |inner| inner.send_to(buf, &addr)) + }) + .await + } + + async fn recv_from(socket: &UdpSocket, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + future::poll_fn(|cx| { + socket.inner.watcher + .poll_read_with(cx, |inner| inner.recv_from(buf)) + }) + .await + } + + async fn connect(socket: &UdpSocket, addrs: A) -> io::Result<()> { + let mut last_err = None; + + for addr in addrs.to_socket_addrs().await? { + // TODO(stjepang): connect on the blocking pool + match socket.inner.watcher.get_ref().connect(addr) { + Ok(()) => return Ok(()), + Err(err) => last_err = Some(err), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any addresses", + ) + })) + } + + async fn send(socket: &UdpSocket, buf: &[u8]) -> io::Result { + future::poll_fn(|cx| socket.inner.watcher.poll_write_with(cx, |inner| inner.send(buf))).await + } + + async fn recv(socket: &UdpSocket, buf: &mut [u8]) -> io::Result { + future::poll_fn(|cx| socket.inner.watcher.poll_read_with(cx, |inner| inner.recv(buf))).await + } + + fn broadcast(socket: &UdpSocket) -> io::Result { + socket.inner.watcher.get_ref().broadcast() + } + + fn set_broadcast(socket: &UdpSocket, on: bool) -> io::Result<()> { + socket.inner.watcher.get_ref().set_broadcast(on) + } + + fn multicast_loop_v4(socket: &UdpSocket) -> io::Result { + socket.inner.watcher.get_ref().multicast_loop_v4() + } + + fn set_multicast_loop_v4(socket: &UdpSocket, on: bool) -> io::Result<()> { + socket.inner.watcher.get_ref().set_multicast_loop_v4(on) + } + + fn multicast_ttl_v4(socket: &UdpSocket) -> io::Result { + socket.inner.watcher.get_ref().multicast_ttl_v4() + } + + fn set_multicast_ttl_v4(socket: &UdpSocket, ttl: u32) -> io::Result<()> { + socket.inner.watcher.get_ref().set_multicast_ttl_v4(ttl) + } + + fn multicast_loop_v6(socket: &UdpSocket) -> io::Result { + socket.inner.watcher.get_ref().multicast_loop_v6() + } + + fn set_multicast_loop_v6(socket: &UdpSocket, on: bool) -> io::Result<()> { + socket.inner.watcher.get_ref().set_multicast_loop_v6(on) + } + + fn ttl(socket: &UdpSocket) -> io::Result { + socket.inner.watcher.get_ref().ttl() + } + + fn set_ttl(socket: &UdpSocket, ttl: u32) -> io::Result<()> { + socket.inner.watcher.get_ref().set_ttl(ttl) + } + + fn join_multicast_v4(socket: &UdpSocket, multiaddr: &Ipv4Addr, interface: &Ipv4Addr) -> io::Result<()> { + socket.inner.watcher + .get_ref() + .join_multicast_v4(multiaddr, interface) + } + + fn join_multicast_v6(socket: &UdpSocket, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { + socket.inner.watcher + .get_ref() + .join_multicast_v6(multiaddr, interface) + } + + fn leave_multicast_v4(socket: &UdpSocket, multiaddr: &Ipv4Addr, interface: &Ipv4Addr) -> io::Result<()> { + socket.inner.watcher + .get_ref() + .leave_multicast_v4(multiaddr, interface) + } + + fn leave_multicast_v6(socket: &UdpSocket, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { + socket.inner.watcher + .get_ref() + .leave_multicast_v6(multiaddr, interface) + } + + fn from(socket: std::net::UdpSocket) -> UdpSocket { + let mio_socket = mio::net::UdpSocket::from_socket(socket).unwrap(); + UdpSocket { + inner: Inner { + watcher: Watcher::new(mio_socket), + }, + } + } + + } else { + async fn bind(_: A) -> io::Result { + Err(io::Error::new( + io::ErrorKind::Other, + "UDP sockets unsupported on this platform", + )) + } + + fn local_addr(_: &UdpSocket) -> io::Result { + unreachable!() + } + + async fn send_to(_: &UdpSocket, _: &[u8], _: A) -> io::Result { + unreachable!() + } + + async fn recv_from(_: &UdpSocket, _: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + unreachable!() + } + + async fn connect(_: &UdpSocket, _: A) -> io::Result<()> { + unreachable!() + } + + async fn send(_: &UdpSocket, _: &[u8]) -> io::Result { + unreachable!() + } + + async fn recv(_: &UdpSocket, _: &mut [u8]) -> io::Result { + unreachable!() + } + + fn broadcast(_: &UdpSocket) -> io::Result { + unreachable!() + } + + fn set_broadcast(_: &UdpSocket, _: bool) -> io::Result<()> { + unreachable!() + } + + fn multicast_loop_v4(_: &UdpSocket) -> io::Result { + unreachable!() + } + + fn set_multicast_loop_v4(_: &UdpSocket, _: bool) -> io::Result<()> { + unreachable!() + } + + fn multicast_ttl_v4(_: &UdpSocket) -> io::Result { + unreachable!() + } + + fn set_multicast_ttl_v4(_: &UdpSocket, _: u32) -> io::Result<()> { + unreachable!() + } + + fn multicast_loop_v6(_: &UdpSocket) -> io::Result { + unreachable!() + } + + fn set_multicast_loop_v6(_: &UdpSocket, _: bool) -> io::Result<()> { + unreachable!() + } + + fn ttl(_: &UdpSocket) -> io::Result { + unreachable!() + } + + fn set_ttl(_: &UdpSocket, _: u32) -> io::Result<()> { + unreachable!() + } + + fn join_multicast_v4(_: &UdpSocket, _: &Ipv4Addr, _: &Ipv4Addr) -> io::Result<()> { + unreachable!() + } + + fn join_multicast_v6(_: &UdpSocket, _: &Ipv6Addr, _: u32) -> io::Result<()> { + unreachable!() + } + + fn leave_multicast_v4(_: &UdpSocket, _: &Ipv4Addr, _: &Ipv4Addr) -> io::Result<()> { + unreachable!() + } + + fn leave_multicast_v6(_: &UdpSocket, _: &Ipv6Addr, _: u32) -> io::Result<()> { + unreachable!() + } + + fn from(_: std::net::UdpSocket) -> UdpSocket { + // We can never successfully build a `std::net::UdpSocket` on an unknown OS. + unreachable!() } } } @@ -479,7 +685,7 @@ cfg_if! { if #[cfg(any(unix, feature = "docs"))] { impl AsRawFd for UdpSocket { fn as_raw_fd(&self) -> RawFd { - self.watcher.get_ref().as_raw_fd() + self.inner.watcher.get_ref().as_raw_fd() } } @@ -491,7 +697,7 @@ cfg_if! { impl IntoRawFd for UdpSocket { fn into_raw_fd(self) -> RawFd { - self.watcher.into_inner().into_raw_fd() + self.inner.watcher.into_inner().into_raw_fd() } } } From 015abf5ee4790f07e79b39267b9eee476cb4ca7c Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sat, 21 Sep 2019 15:11:50 +0200 Subject: [PATCH 2/3] Fix travis wasm build name --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 4115cb90c..2ce0befbd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,7 +32,7 @@ matrix: - rust: nightly-x86_64-pc-windows-msvc os: windows - - name: docs + - name: wasm rust: nightly os: linux before_script: | From 21e30b60bf4c756cf95c04525cdc4868125408d9 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 24 Sep 2019 09:33:15 +0200 Subject: [PATCH 3/3] Update TCP as well --- src/net/tcp/listener.rs | 135 ++++++++++++++------ src/net/tcp/stream.rs | 272 ++++++++++++++++++++++++++++++---------- 2 files changed, 304 insertions(+), 103 deletions(-) diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index d57918a42..da82e0ce5 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -4,14 +4,16 @@ use std::pin::Pin; use cfg_if::cfg_if; use super::TcpStream; -use crate::future::{self, Future}; +use crate::future::Future; use crate::io; use crate::net::ToSocketAddrs; use crate::task::{Context, Poll}; cfg_if! { if #[cfg(not(target_os = "unknown"))] { + use crate::future; use crate::net::driver::Watcher; + use super::stream::Inner as TcpStreamInner; } } @@ -54,7 +56,19 @@ cfg_if! { /// ``` #[derive(Debug)] pub struct TcpListener { - watcher: Watcher, + inner: Inner, +} + +cfg_if! { + if #[cfg(not(target_os = "unknown"))] { + #[derive(Debug)] + struct Inner { + watcher: Watcher, + } + } else { + #[derive(Debug)] + struct Inner; + } } impl TcpListener { @@ -80,25 +94,7 @@ impl TcpListener { /// /// [`local_addr`]: #method.local_addr pub async fn bind(addrs: A) -> io::Result { - let mut last_err = None; - - for addr in addrs.to_socket_addrs().await? { - match mio::net::TcpListener::bind(&addr) { - Ok(mio_listener) => { - return Ok(TcpListener { - watcher: Watcher::new(mio_listener), - }); - } - Err(err) => last_err = Some(err), - } - } - - Err(last_err.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "could not resolve to any addresses", - ) - })) + bind(addrs).await } /// Accepts a new incoming connection to this listener. @@ -118,15 +114,7 @@ impl TcpListener { /// # Ok(()) }) } /// ``` pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { - let (io, addr) = - future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.accept_std())) - .await?; - - let mio_stream = mio::net::TcpStream::from_stream(io)?; - let stream = TcpStream { - watcher: Watcher::new(mio_stream), - }; - Ok((stream, addr)) + accept(self).await } /// Returns a stream of incoming connections. @@ -177,7 +165,7 @@ impl TcpListener { /// # Ok(()) }) } /// ``` pub fn local_addr(&self) -> io::Result { - self.watcher.get_ref().local_addr() + local_addr(self) } } @@ -210,10 +198,7 @@ impl<'a> futures_core::stream::Stream for Incoming<'a> { impl From for TcpListener { /// Converts a `std::net::TcpListener` into its asynchronous equivalent. fn from(listener: std::net::TcpListener) -> TcpListener { - let mio_listener = mio::net::TcpListener::from_std(listener).unwrap(); - TcpListener { - watcher: Watcher::new(mio_listener), - } + from(listener) } } @@ -233,7 +218,7 @@ cfg_if! { if #[cfg(any(unix, feature = "docs"))] { impl AsRawFd for TcpListener { fn as_raw_fd(&self) -> RawFd { - self.watcher.get_ref().as_raw_fd() + self.inner.watcher.get_ref().as_raw_fd() } } @@ -245,7 +230,7 @@ cfg_if! { impl IntoRawFd for TcpListener { fn into_raw_fd(self) -> RawFd { - self.watcher.into_inner().into_raw_fd() + self.inner.watcher.into_inner().into_raw_fd() } } } @@ -273,3 +258,79 @@ cfg_if! { // } } } + +cfg_if! { + if #[cfg(not(target_os = "unknown"))] { + async fn bind(addrs: A) -> io::Result { + let mut last_err = None; + + for addr in addrs.to_socket_addrs().await? { + match mio::net::TcpListener::bind(&addr) { + Ok(mio_listener) => { + return Ok(TcpListener { + inner: Inner { + watcher: Watcher::new(mio_listener), + }, + }); + } + Err(err) => last_err = Some(err), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any addresses", + ) + })) + } + + async fn accept(listener: &TcpListener) -> io::Result<(TcpStream, SocketAddr)> { + let (io, addr) = + future::poll_fn(|cx| listener.inner.watcher.poll_read_with(cx, |inner| inner.accept_std())) + .await?; + + let mio_stream = mio::net::TcpStream::from_stream(io)?; + let stream = TcpStream { + inner: TcpStreamInner { + watcher: Watcher::new(mio_stream), + }, + }; + Ok((stream, addr)) + } + + fn local_addr(listener: &TcpListener) -> io::Result { + listener.inner.watcher.get_ref().local_addr() + } + + fn from(listener: std::net::TcpListener) -> TcpListener { + let mio_listener = mio::net::TcpListener::from_std(listener).unwrap(); + TcpListener { + inner: Inner { + watcher: Watcher::new(mio_listener), + }, + } + } + + } else { + async fn bind(_: A) -> io::Result { + Err(io::Error::new( + io::ErrorKind::Other, + "TCP sockets unsupported on this platform", + )) + } + + async fn accept(_: &TcpListener) -> io::Result<(TcpStream, SocketAddr)> { + unreachable!() + } + + fn local_addr(_: &TcpListener) -> io::Result { + unreachable!() + } + + fn from(_: std::net::TcpListener) -> TcpListener { + // We can never successfully build a `std::net::TcpListener` on an unknown OS. + unreachable!() + } + } +} diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index b95a316ce..0032d83eb 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -1,19 +1,21 @@ -use std::io::{IoSlice, IoSliceMut, Read as _, Write as _}; +use std::io::{IoSlice, IoSliceMut}; use std::net::SocketAddr; use std::pin::Pin; use cfg_if::cfg_if; use futures_io::{AsyncRead, AsyncWrite}; -use crate::future; use crate::io; use crate::net::ToSocketAddrs; -use crate::task::blocking; use crate::task::{Context, Poll}; cfg_if! { if #[cfg(not(target_os = "unknown"))] { + use std::io::{Read as _, Write as _}; + + use crate::future; use crate::net::driver::Watcher; + use crate::task::blocking; } } @@ -55,7 +57,19 @@ cfg_if! { /// ``` #[derive(Debug)] pub struct TcpStream { - pub(super) watcher: Watcher, + pub(super) inner: Inner, +} + +cfg_if! { + if #[cfg(not(target_os = "unknown"))] { + #[derive(Debug)] + pub(super) struct Inner { + pub(super) watcher: Watcher, + } + } else { + #[derive(Debug)] + pub(super) struct Inner; + } } impl TcpStream { @@ -79,30 +93,7 @@ impl TcpStream { /// # Ok(()) }) } /// ``` pub async fn connect(addrs: A) -> io::Result { - let mut last_err = None; - - for addr in addrs.to_socket_addrs().await? { - let res = blocking::spawn(async move { - let std_stream = std::net::TcpStream::connect(addr)?; - let mio_stream = mio::net::TcpStream::from_stream(std_stream)?; - Ok(TcpStream { - watcher: Watcher::new(mio_stream), - }) - }) - .await; - - match res { - Ok(stream) => return Ok(stream), - Err(err) => last_err = Some(err), - } - } - - Err(last_err.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "could not resolve to any addresses", - ) - })) + connect(addrs).await } /// Returns the local address that this stream is connected to. @@ -120,7 +111,7 @@ impl TcpStream { /// # Ok(()) }) } /// ``` pub fn local_addr(&self) -> io::Result { - self.watcher.get_ref().local_addr() + local_addr(self) } /// Returns the remote address that this stream is connected to. @@ -138,7 +129,7 @@ impl TcpStream { /// # Ok(()) }) } /// ``` pub fn peer_addr(&self) -> io::Result { - self.watcher.get_ref().peer_addr() + peer_addr(self) } /// Gets the value of the `IP_TTL` option for this socket. @@ -162,7 +153,7 @@ impl TcpStream { /// # Ok(()) }) } /// ``` pub fn ttl(&self) -> io::Result { - self.watcher.get_ref().ttl() + ttl(self) } /// Sets the value for the `IP_TTL` option on this socket. @@ -185,7 +176,7 @@ impl TcpStream { /// # Ok(()) }) } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.watcher.get_ref().set_ttl(ttl) + set_ttl(self, ttl) } /// Receives data on the socket from the remote address to which it is connected, without @@ -211,7 +202,7 @@ impl TcpStream { /// # Ok(()) }) } /// ``` pub async fn peek(&self, buf: &mut [u8]) -> io::Result { - future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.peek(buf))).await + peek(self, buf).await } /// Gets the value of the `TCP_NODELAY` option on this socket. @@ -235,7 +226,7 @@ impl TcpStream { /// # Ok(()) }) } /// ``` pub fn nodelay(&self) -> io::Result { - self.watcher.get_ref().nodelay() + nodelay(self) } /// Sets the value of the `TCP_NODELAY` option on this socket. @@ -261,7 +252,7 @@ impl TcpStream { /// # Ok(()) }) } /// ``` pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { - self.watcher.get_ref().set_nodelay(nodelay) + set_nodelay(self, nodelay) } /// Shuts down the read, write, or both halves of this connection. @@ -286,7 +277,7 @@ impl TcpStream { /// # Ok(()) }) } /// ``` pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> { - self.watcher.get_ref().shutdown(how) + shutdown(self, how) } } @@ -308,16 +299,6 @@ impl AsyncRead for TcpStream { } } -impl AsyncRead for &TcpStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - self.watcher.poll_read_with(cx, |mut inner| inner.read(buf)) - } -} - impl AsyncWrite for TcpStream { fn poll_write( self: Pin<&mut Self>, @@ -344,32 +325,72 @@ impl AsyncWrite for TcpStream { } } -impl AsyncWrite for &TcpStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.watcher - .poll_write_with(cx, |mut inner| inner.write(buf)) - } +cfg_if! { + if #[cfg(not(target_os = "unknown"))] { + impl AsyncRead for &TcpStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.inner.watcher.poll_read_with(cx, |mut inner| inner.read(buf)) + } + } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.watcher.poll_write_with(cx, |mut inner| inner.flush()) - } + impl AsyncWrite for &TcpStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.inner.watcher + .poll_write_with(cx, |mut inner| inner.write(buf)) + } - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.watcher.poll_write_with(cx, |mut inner| inner.flush()) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + } + + } else { + impl AsyncRead for &TcpStream { + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context<'_>, + _: &mut [u8], + ) -> Poll> { + unreachable!() + } + } + + impl AsyncWrite for &TcpStream { + fn poll_write( + self: Pin<&mut Self>, + _: &mut Context<'_>, + _: &[u8], + ) -> Poll> { + unreachable!() + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + unreachable!() + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + unreachable!() + } + } } } impl From for TcpStream { /// Converts a `std::net::TcpStream` into its asynchronous equivalent. fn from(stream: std::net::TcpStream) -> TcpStream { - let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap(); - TcpStream { - watcher: Watcher::new(mio_stream), - } + from(stream) } } @@ -389,7 +410,7 @@ cfg_if! { if #[cfg(any(unix, feature = "docs"))] { impl AsRawFd for TcpStream { fn as_raw_fd(&self) -> RawFd { - self.watcher.get_ref().as_raw_fd() + self.inner.watcher.get_ref().as_raw_fd() } } @@ -401,7 +422,7 @@ cfg_if! { impl IntoRawFd for TcpStream { fn into_raw_fd(self) -> RawFd { - self.watcher.into_inner().into_raw_fd() + self.inner.watcher.into_inner().into_raw_fd() } } } @@ -429,3 +450,122 @@ cfg_if! { // } } } + +cfg_if! { + if #[cfg(not(target_os = "unknown"))] { + async fn connect(addrs: A) -> io::Result { + let mut last_err = None; + + for addr in addrs.to_socket_addrs().await? { + let res = blocking::spawn(async move { + let std_stream = std::net::TcpStream::connect(addr)?; + let mio_stream = mio::net::TcpStream::from_stream(std_stream)?; + Ok(TcpStream { + inner: Inner { + watcher: Watcher::new(mio_stream), + }, + }) + }) + .await; + + match res { + Ok(stream) => return Ok(stream), + Err(err) => last_err = Some(err), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any addresses", + ) + })) + } + + fn local_addr(socket: &TcpStream) -> io::Result { + socket.inner.watcher.get_ref().local_addr() + } + + fn peer_addr(socket: &TcpStream) -> io::Result { + socket.inner.watcher.get_ref().peer_addr() + } + + fn ttl(socket: &TcpStream) -> io::Result { + socket.inner.watcher.get_ref().ttl() + } + + fn set_ttl(socket: &TcpStream, ttl: u32) -> io::Result<()> { + socket.inner.watcher.get_ref().set_ttl(ttl) + } + + async fn peek(socket: &TcpStream, buf: &mut [u8]) -> io::Result { + future::poll_fn(|cx| socket.inner.watcher.poll_read_with(cx, |inner| inner.peek(buf))).await + } + + fn nodelay(socket: &TcpStream) -> io::Result { + socket.inner.watcher.get_ref().nodelay() + } + + fn set_nodelay(socket: &TcpStream, nodelay: bool) -> io::Result<()> { + socket.inner.watcher.get_ref().set_nodelay(nodelay) + } + + fn shutdown(socket: &TcpStream, how: std::net::Shutdown) -> std::io::Result<()> { + socket.inner.watcher.get_ref().shutdown(how) + } + + fn from(stream: std::net::TcpStream) -> TcpStream { + let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap(); + TcpStream { + inner: Inner { + watcher: Watcher::new(mio_stream), + }, + } + } + + } else { + async fn connect(_: A) -> io::Result { + Err(io::Error::new( + io::ErrorKind::Other, + "TCP sockets unsupported on this platform", + )) + } + + fn local_addr(_: &TcpStream) -> io::Result { + unreachable!() + } + + fn peer_addr(_: &TcpStream) -> io::Result { + unreachable!() + } + + fn ttl(_: &TcpStream) -> io::Result { + unreachable!() + } + + fn set_ttl(_: &TcpStream, _: u32) -> io::Result<()> { + unreachable!() + } + + async fn peek(_: &TcpStream, _: &mut [u8]) -> io::Result { + unreachable!() + } + + fn nodelay(_: &TcpStream) -> io::Result { + unreachable!() + } + + fn set_nodelay(_: &TcpStream, _: bool) -> io::Result<()> { + unreachable!() + } + + fn shutdown(_: &TcpStream, _: std::net::Shutdown) -> std::io::Result<()> { + unreachable!() + } + + fn from(_: std::net::TcpStream) -> TcpStream { + // We can never successfully build a `std::net::TcpStream` on an unknown OS. + unreachable!() + } + } +}