diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5a717826f..a3eb933dc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, windows-latest, macOS-latest] - rust: [nightly, beta, stable] + rust: [nightly, beta, stable, 1.63] steps: - uses: actions/checkout@v3 @@ -108,6 +108,21 @@ jobs: with: command: check args: --all --features tokio02 + + check_io_safety_feature: + name: Check io_safety feature + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest, windows-latest] + steps: + - uses: actions/checkout@v3 + - name: check io_safety + uses: actions-rs/cargo@v1 + with: + command: check + args: --all --features io_safety + cross: name: Cross compile @@ -117,8 +132,8 @@ jobs: target: - i686-unknown-linux-gnu - powerpc-unknown-linux-gnu -# - powerpc64-unknown-linux-gnu - - mips-unknown-linux-gnu + - powerpc64-unknown-linux-gnu +# - mips-unknown-linux-gnu - arm-linux-androideabi steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index e8a0a0a76..ebb3ffd80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,25 @@ All notable changes to async-std will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://book.async.rs/overview/stability-guarantees.html). +# [1.13.1] - 2025-02-21 + +`async-std` has officially been discontinued. We recommend that all users and +libraries migrate to the excellent [`smol`](https://github.com/smol-rs/smol/) +project. + +# [1.13.0] - 2024-09-06 + +## Added +- IO Safety traits implementations + +## Changed +- Various dependencies updates +- Export `BufReadExt` and `SeekExt` from `async_std::io` + # [1.12.0] - 2022-06-18 ## Added -- `std::task_spawn_blocking` is now stabilized. We consider it a fundamental API for bridging between blocking code and async code, and we widely use it within async-std's own implementation. +- `async_std::task::spawn_blocking` is now stabilized. We consider it a fundamental API for bridging between blocking code and async code, and we widely use it within async-std's own implementation. - Add `TryFrom` implementations to convert `TcpListener`, `TcpStream`, `UdpSocket`, `UnixDatagram`, `UnixListener`, and `UnixStream` to their synchronous equivalents, including putting them back into blocking mode. ## Changed @@ -61,7 +76,7 @@ deprecated `sync::channel` types, and introduces the `tokio1` feature. As part of our `1.8.0` release last month we introduced the new `async_std::channel` submodule and deprecated the unstable -`async_std::sync::channel` types. You can read our full motiviation for this +`async_std::sync::channel` types. You can read our full motivation for this change in the last patch notes. But the short version is that the old channels had some fundamental problems, and the `sync` submodule is a bit of a mess. @@ -298,7 +313,7 @@ Including improved performance, stability, and the addition of various - Fixed documentation for `UdpSocket::send` ([#671](https://github.com/async-rs/async-std/pull/671)) - Fixed typo in stream documentation ([#650](https://github.com/async-rs/async-std/pull/650)) - Fixed typo on `sync::JoinHandle` documentation ([#659](https://github.com/async-rs/async-std/pull/659)) -- Removed use of `std::error::Error::description` which failed CI ([#661](https://github.com/async-rs/async-std/pull/662)) +- Removed use of `std::error::Error::description` which failed CI ([#661](https://github.com/async-rs/async-std/pull/661)) - Removed the use of rustfmt's unstable `format_code_in_doc_comments` option which failed CI ([#685](https://github.com/async-rs/async-std/pull/685)) - Fixed a code typo in the `task::sleep` example ([#688](https://github.com/async-rs/async-std/pull/688)) @@ -686,9 +701,9 @@ use async_std::prelude::*; use async_std::task; task::spawn(async { - let x = fibonnacci(1000); // Do expensive work + let x = fibonacci(1000); // Do expensive work task::yield_now().await; // Allow other tasks to run - x + fibonnacci(100) // Do more work + x + fibonacci(100) // Do more work }) ``` diff --git a/Cargo.toml b/Cargo.toml index 9a7a3fe2a..fed440172 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "async-std" -version = "1.12.0" +version = "1.13.1" authors = [ "Stjepan Glavina ", "Yoshua Wuyts ", @@ -8,10 +8,11 @@ authors = [ "Contributors to async-std", ] edition = "2018" -license = "Apache-2.0/MIT" +rust-version = "1.63" +license = "Apache-2.0 OR MIT" repository = "https://github.com/async-rs/async-std" homepage = "https://async.rs" -description = "Async version of the Rust standard library" +description = "Deprecated in favor of `smol` - Async version of the Rust standard library" keywords = ["async", "await", "future", "std", "task"] categories = ["asynchronous", "concurrency", "network-programming"] @@ -58,10 +59,11 @@ alloc = [ tokio1 = ["async-global-executor/tokio"] tokio02 = ["async-global-executor/tokio02"] tokio03 = ["async-global-executor/tokio03"] +io_safety = [] [dependencies] -async-attributes = { version = "1.1.1", optional = true } -async-lock = { version = "2.3.0", optional = true } +async-attributes = { version = "1.1.2", optional = true } +async-lock = { version = "3.1.0", optional = true } crossbeam-utils = { version = "0.8.0", optional = true } futures-core = { version = "0.3.4", optional = true, default-features = false } futures-io = { version = "0.3.4", optional = true } @@ -72,20 +74,20 @@ once_cell = { version = "1.3.1", optional = true } pin-project-lite = { version = "0.2.0", optional = true } pin-utils = { version = "0.1.0-alpha.4", optional = true } slab = { version = "0.4.2", optional = true } -async-channel = { version = "1.5.1", optional = true } +async-channel = { version = "1.8.0", optional = true } # dev dependency, but they are not allowed to be optional :/ surf = { version = "2.0.0", optional = true } [target.'cfg(not(target_os = "unknown"))'.dependencies] -async-global-executor = { version = "2.0.0", optional = true, features = ["async-io"] } -async-io = { version = "1.0.1", optional = true } -futures-lite = { version = "1.0.0", optional = true } -async-process = { version = "1.0.1", optional = true } +async-global-executor = { version = "2.4.0", optional = true, features = ["async-io"] } +async-io = { version = "2.2.0", optional = true } +futures-lite = { version = "2.0.0", optional = true } +async-process = { version = "2.0.0", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] -gloo-timers = { version = "0.2.1", features = ["futures"], optional = true } +gloo-timers = { version = "0.3.0", features = ["futures"], optional = true } wasm-bindgen-futures = { version = "0.4.10", optional = true } futures-channel = { version = "0.3.4", optional = true } diff --git a/README.md b/README.md index a6d8276db..f432898dc 100644 --- a/README.md +++ b/README.md @@ -1,39 +1,19 @@ -

async-std

-
- - Async version of the Rust standard library - -
+# `async-std` has been discontinued; use `smol` instead -
+We created `async-std` to demonstrate the value of making a library as close to +`std` as possible, but async. We think that demonstration was successful, and +we hope it will influence future design and development directions of async in +`std`. However, in the meantime, the [`smol`](https://github.com/smol-rs/smol/) +project came about and provided a great executor and libraries for asynchronous +use in the Rust ecosystem. We think that resources would be better spent +consolidating around `smol`, rather than continuing to provide occasional +maintenance of `async-std`. As such, we recommend that all users of +`async-std`, and all libraries built on `async-std`, switch to `smol` instead. + +In addition to the `smol` project as a direct replacement, you may find other +parts of the futures ecosystem useful, including `futures-concurrency`, +`async-io`, `futures-lite`, and `async-compat`. -
- - - CI Status - - - - Crates.io version - - - - Download - - - - docs.rs docs - - - - chat - -

@@ -44,14 +24,6 @@ Book - | - - Releases - - | - - Contributing -

@@ -111,38 +83,6 @@ creation, with an adaptive lock-free executor, threadpool and network driver to create a smooth system that processes work at a high pace with low latency, using Rust's familiar stdlib API. -## Installation - -With [cargo-edit](https://github.com/killercup/cargo-edit) installed run: - -```sh -$ cargo add async-std -``` - -We also provide a set of "unstable" features with async-std. See the [features -documentation] on how to enable them. - -[cargo-add]: https://github.com/killercup/cargo-edit -[features documentation]: https://docs.rs/async-std/#features - -## Ecosystem - - * [async-tls](https://crates.io/crates/async-tls) — Async TLS/SSL streams using **Rustls**. - - * [async-native-tls](https://crates.io/crates/async-native-tls) — **Native TLS** for Async. Native TLS for futures and async-std. - - * [async-tungstenite](https://crates.io/crates/async-tungstenite) — Asynchronous **WebSockets** for async-std, tokio, gio and any std Futures runtime. - - * [Tide](https://crates.io/crates/tide) — Serve the web. A modular **web framework** built around async/await. - - * [SQLx](https://crates.io/crates/sqlx) — The Rust **SQL** Toolkit. SQLx is a 100% safe Rust library for Postgres and MySQL with compile-time checked queries. - - * [Surf](https://crates.io/crates/surf) — Surf the web. Surf is a friendly **HTTP client** built for casual Rustaceans and veterans alike. - - * [Xactor](https://crates.io/crates/xactor) — Xactor is a rust actors framework based on async-std. - - * [async-graphql](https://crates.io/crates/async-graphql) — A GraphQL server library implemented in rust, with full support for async/await. - ## License diff --git a/docs/src/security/index.md b/docs/src/security/index.md index 5a94a5401..02fef4c0a 100644 --- a/docs/src/security/index.md +++ b/docs/src/security/index.md @@ -1,6 +1,6 @@ # Security -Writing a highly perfomant async core library is a task involving some instances of unsafe code. +Writing a highly performant async core library is a task involving some instances of unsafe code. We take great care in vetting all unsafe code included in `async-std` and do follow generally accepted practices. diff --git a/docs/src/tutorial/clean_shutdown.md b/docs/src/tutorial/clean_shutdown.md index bd112c934..0937d7086 100644 --- a/docs/src/tutorial/clean_shutdown.md +++ b/docs/src/tutorial/clean_shutdown.md @@ -245,7 +245,7 @@ async fn broker_loop(mut events: Receiver) -> Result<()> { Notice what happens with all of the channels once we exit the accept loop: 1. First, we drop the main broker's sender. - That way when the readers are done, there's no sender for the broker's channel, and the chanel closes. + That way when the readers are done, there's no sender for the broker's channel, and the channel closes. 2. Next, the broker exits `while let Some(event) = events.next().await` loop. 3. It's crucial that, at this stage, we drop the `peers` map. This drops writer's senders. diff --git a/docs/src/tutorial/receiving_messages.md b/docs/src/tutorial/receiving_messages.md index f62b65d9c..036bc4597 100644 --- a/docs/src/tutorial/receiving_messages.md +++ b/docs/src/tutorial/receiving_messages.md @@ -118,7 +118,7 @@ handle.await? The `.await` waits until the client finishes, and `?` propagates the result. There are two problems with this solution however! -*First*, because we immediately await the client, we can only handle one client at time, and that completely defeats the purpose of async! +*First*, because we immediately await the client, we can only handle one client at a time, and that completely defeats the purpose of async! *Second*, if a client encounters an IO error, the whole server immediately exits. That is, a flaky internet connection of one peer brings down the whole chat room! diff --git a/examples/a-chat/server.rs b/examples/a-chat/server.rs index e049a490e..d3ac74699 100644 --- a/examples/a-chat/server.rs +++ b/examples/a-chat/server.rs @@ -96,6 +96,7 @@ async fn connection_writer_loop( None => break, }, void = shutdown.next().fuse() => match void { + #[allow(unreachable_patterns)] Some(void) => match void {}, None => break, } diff --git a/src/collections/mod.rs b/src/collections/mod.rs index ae9efaa92..745aed01c 100644 --- a/src/collections/mod.rs +++ b/src/collections/mod.rs @@ -11,10 +11,17 @@ pub mod hash_set; pub mod linked_list; pub mod vec_deque; +#[allow(unused)] pub use binary_heap::BinaryHeap; +#[allow(unused)] pub use btree_map::BTreeMap; +#[allow(unused)] pub use btree_set::BTreeSet; +#[allow(unused)] pub use hash_map::HashMap; +#[allow(unused)] pub use hash_set::HashSet; +#[allow(unused)] pub use linked_list::LinkedList; +#[allow(unused)] pub use vec_deque::VecDeque; diff --git a/src/fs/file.rs b/src/fs/file.rs index aae201b6d..7dc12dd0e 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -15,6 +15,8 @@ use crate::prelude::*; use crate::task::{spawn_blocking, Context, Poll, Waker}; use crate::utils::Context as _; +const ARC_TRY_UNWRAP_EXPECT: &str = "cannot acquire ownership of the file handle after drop"; + /// An open file on the filesystem. /// /// Depending on what options the file was opened with, this type can be used for reading and/or @@ -415,6 +417,15 @@ impl From for File { cfg_unix! { use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; + impl File { + fn into_std_file(self) -> std::fs::File { + let file = self.file.clone(); + drop(self); + Arc::try_unwrap(file) + .expect(ARC_TRY_UNWRAP_EXPECT) + } + } + impl AsRawFd for File { fn as_raw_fd(&self) -> RawFd { self.file.as_raw_fd() @@ -429,11 +440,29 @@ cfg_unix! { impl IntoRawFd for File { fn into_raw_fd(self) -> RawFd { - let file = self.file.clone(); - drop(self); - Arc::try_unwrap(file) - .expect("cannot acquire ownership of the file handle after drop") - .into_raw_fd() + self.into_std_file().into_raw_fd() + } + } + + cfg_io_safety! { + use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; + + impl AsFd for File { + fn as_fd(&self) -> BorrowedFd<'_> { + self.file.as_fd() + } + } + + impl From for File { + fn from(fd: OwnedFd) -> Self { + std::fs::File::from(fd).into() + } + } + + impl From for OwnedFd { + fn from(val: File) -> OwnedFd { + val.into_std_file().into() + } } } } @@ -458,10 +487,36 @@ cfg_windows! { let file = self.file.clone(); drop(self); Arc::try_unwrap(file) - .expect("cannot acquire ownership of the file handle after drop") + .expect(ARC_TRY_UNWRAP_EXPECT) .into_raw_handle() } } + + cfg_io_safety! { + use crate::os::windows::io::{AsHandle, BorrowedHandle, OwnedHandle}; + + impl AsHandle for File { + fn as_handle(&self) -> BorrowedHandle<'_> { + self.file.as_handle() + } + } + + impl From for File { + fn from(handle: OwnedHandle) -> Self { + std::fs::File::from(handle).into() + } + } + + impl From for OwnedHandle { + fn from(val: File) -> OwnedHandle { + let file = val.file.clone(); + drop(val); + Arc::try_unwrap(file) + .expect(ARC_TRY_UNWRAP_EXPECT) + .into() + } + } + } } /// An async mutex with non-borrowing lock guards. @@ -517,14 +572,16 @@ impl Lock { } // The lock was successfully acquired. - Poll::Ready(LockGuard(self.0.clone())) + Poll::Ready(LockGuard(Some(self.0.clone()))) } } /// A lock guard. /// /// When dropped, ownership of the inner value is returned back to the lock. -struct LockGuard(Arc>); +/// The inner value is always Some, except when the lock is dropped, where we +/// set it to None. See comment in drop(). +struct LockGuard(Option>>); unsafe impl Send for LockGuard {} unsafe impl Sync for LockGuard {} @@ -534,7 +591,7 @@ impl LockGuard { /// /// When this lock guard gets dropped, all registered tasks will be woken up. fn register(&self, cx: &Context<'_>) { - let mut list = self.0.wakers.lock().unwrap(); + let mut list = self.0.as_ref().unwrap().wakers.lock().unwrap(); if list.iter().all(|w| !w.will_wake(cx.waker())) { list.push(cx.waker().clone()); @@ -544,11 +601,22 @@ impl LockGuard { impl Drop for LockGuard { fn drop(&mut self) { + // Set the Option to None and take its value so we can drop the Arc + // before we wake up the tasks. + let lock = self.0.take().unwrap(); + + // Prepare to wake up all registered tasks interested in acquiring the lock. + let wakers: Vec<_> = lock.wakers.lock().unwrap().drain(..).collect(); + // Release the lock. - self.0.locked.store(false, Ordering::Release); + lock.locked.store(false, Ordering::Release); + + // Drop the Arc _before_ waking up the tasks, to avoid races. See + // reproducer and discussion in https://github.com/async-rs/async-std/issues/1001. + drop(lock); // Wake up all registered tasks interested in acquiring the lock. - for w in self.0.wakers.lock().unwrap().drain(..) { + for w in wakers { w.wake(); } } @@ -558,13 +626,19 @@ impl Deref for LockGuard { type Target = T; fn deref(&self) -> &T { - unsafe { &*self.0.value.get() } + // SAFETY: Safe because the lock is held when this method is called. And + // the inner value is always Some since it is only set to None in + // drop(). + unsafe { &*self.0.as_ref().unwrap().value.get() } } } impl DerefMut for LockGuard { fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.0.value.get() } + // SAFETY: Safe because the lock is held when this method is called. And + // the inner value is always Some since it is only set to None in + // drop(). + unsafe { &mut *self.0.as_ref().unwrap().value.get() } } } @@ -670,14 +744,19 @@ impl LockGuard { match self.mode { Mode::Idle => {} + Mode::Reading(0) if self.cache.is_empty() => { + // If the cache is empty in reading mode, the last operation didn't read any bytes, + // which indicates that it reached the end of the file. In this case we need to + // reset the mode to idle so that next time we try to read again, since the file + // may grow after the first EOF. + self.mode = Mode::Idle; + return Poll::Ready(Ok(0)); + } Mode::Reading(start) => { // How many bytes in the cache are available for reading. let available = self.cache.len() - start; - // If there is cached unconsumed data or if the cache is empty, we can read from - // it. Empty cache in reading mode indicates that the last operation didn't read - // any bytes, i.e. it reached the end of the file. - if available > 0 || self.cache.is_empty() { + if available > 0 { // Copy data from the cache into the buffer. let n = cmp::min(available, buf.len()); buf[..n].copy_from_slice(&self.cache[start..(start + n)]); @@ -904,13 +983,49 @@ mod tests { } #[test] - fn async_file_create_error () { + fn async_file_create_error() { let file_name = Path::new("/tmp/does_not_exist/test"); let expect = std::fs::File::create(file_name).unwrap_err(); crate::task::block_on(async move { let actual = File::create(file_name).await.unwrap_err(); - assert_eq!(format!("{}", expect), format!("{}", actual)); + assert_eq!(format!("{}", expect), format!("{}", actual)); + }) + } + + #[test] + fn file_eof_is_not_permanent() -> crate::io::Result<()> { + let tempdir = tempfile::Builder::new() + .prefix("async-std-file-eof-test") + .tempdir()?; + let path = tempdir.path().join("testfile"); + + crate::task::block_on(async { + let mut file_w = File::create(&path).await?; + let mut file_r = File::open(&path).await?; + + file_w.write_all(b"data").await?; + file_w.flush().await?; + + let mut buf = [0u8; 4]; + let mut len = file_r.read(&mut buf).await?; + assert_eq!(len, 4); + assert_eq!(&buf, b"data"); + + len = file_r.read(&mut buf).await?; + assert_eq!(len, 0); + + file_w.write_all(b"more").await?; + file_w.flush().await?; + + len = file_r.read(&mut buf).await?; + assert_eq!(len, 4); + assert_eq!(&buf, b"more"); + + len = file_r.read(&mut buf).await?; + assert_eq!(len, 0); + + Ok(()) }) } } diff --git a/src/fs/open_options.rs b/src/fs/open_options.rs index 91ad8cab5..18aa89eb3 100644 --- a/src/fs/open_options.rs +++ b/src/fs/open_options.rs @@ -136,7 +136,7 @@ impl OpenOptions { /// Configures the option for append mode. /// /// When set to `true`, this option means the file will be writable after opening and the file - /// cursor will be moved to the end of file before every write operaiton. + /// cursor will be moved to the end of file before every write operation. /// /// # Examples /// diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs index 47187b235..9a8bfcc1f 100644 --- a/src/future/future/mod.rs +++ b/src/future/future/mod.rs @@ -27,6 +27,7 @@ pub use core::future::Future as Future; [`Future`]: ../future/trait.Future.html "#] +#[cfg(any(feature = "std", feature = "docs"))] pub trait FutureExt: Future { /// Returns a Future that delays execution for a specified time. /// @@ -284,5 +285,6 @@ pub trait FutureExt: Future { } } +#[cfg(any(feature = "std", feature = "docs"))] impl FutureExt for T {} diff --git a/src/io/buf_read/mod.rs b/src/io/buf_read/mod.rs index 75247a516..bcb9d907c 100644 --- a/src/io/buf_read/mod.rs +++ b/src/io/buf_read/mod.rs @@ -239,7 +239,7 @@ pub trait BufReadExt: BufRead { impl BufReadExt for T {} -pub fn read_until_internal( +pub(crate) fn read_until_internal( mut reader: Pin<&mut R>, cx: &mut Context<'_>, byte: u8, diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index c972937fd..230954e88 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -114,7 +114,7 @@ pin_project! { /// # Ok(()) }) } ///``` #[derive(Debug)] -pub struct IntoInnerError(W, crate::io::Error); +pub struct IntoInnerError(W, #[allow(dead_code)] crate::io::Error); impl BufWriter { /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, diff --git a/src/io/copy.rs b/src/io/copy.rs index f05ed0e17..e8d5d733f 100644 --- a/src/io/copy.rs +++ b/src/io/copy.rs @@ -7,6 +7,12 @@ use crate::io::{self, BufRead, BufReader, Read, Write}; use crate::task::{Context, Poll}; use crate::utils::Context as _; +// Note: There are two otherwise-identical implementations of this +// function because unstable has removed the `?Sized` bound for the +// reader and writer and accepts `R` and `W` instead of `&mut R` and +// `&mut W`. If making a change to either of the implementations, +// ensure that you copy it into the other. + /// Copies the entire contents of a reader into a writer. /// /// This function will continuously read data from `reader` and then @@ -57,6 +63,7 @@ where #[pin] writer: W, amt: u64, + reader_eof: bool } } @@ -69,13 +76,20 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); + loop { - let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if buffer.is_empty() { + if *this.reader_eof { futures_core::ready!(this.writer.as_mut().poll_flush(cx))?; return Poll::Ready(Ok(*this.amt)); } + let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?; + + if buffer.is_empty() { + *this.reader_eof = true; + continue; + } + let i = futures_core::ready!(this.writer.as_mut().poll_write(cx, buffer))?; if i == 0 { return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); @@ -89,6 +103,7 @@ where let future = CopyFuture { reader: BufReader::new(reader), writer, + reader_eof: false, amt: 0, }; future.await.context(|| String::from("io::copy failed")) @@ -144,6 +159,7 @@ where #[pin] writer: W, amt: u64, + reader_eof: bool } } @@ -156,13 +172,20 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); + loop { - let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if buffer.is_empty() { + if *this.reader_eof { futures_core::ready!(this.writer.as_mut().poll_flush(cx))?; return Poll::Ready(Ok(*this.amt)); } + let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?; + + if buffer.is_empty() { + *this.reader_eof = true; + continue; + } + let i = futures_core::ready!(this.writer.as_mut().poll_write(cx, buffer))?; if i == 0 { return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); @@ -176,6 +199,7 @@ where let future = CopyFuture { reader: BufReader::new(reader), writer, + reader_eof: false, amt: 0, }; future.await.context(|| String::from("io::copy failed")) diff --git a/src/io/mod.rs b/src/io/mod.rs index e20ed800b..8a415eb71 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -275,7 +275,7 @@ cfg_std! { #[doc(inline)] pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; - pub use buf_read::{BufRead, Lines, Split}; + pub use buf_read::*; pub use buf_reader::BufReader; pub use buf_writer::{BufWriter, IntoInnerError}; pub use copy::copy; @@ -283,7 +283,7 @@ cfg_std! { pub use empty::{empty, Empty}; pub use read::*; pub use repeat::{repeat, Repeat}; - pub use seek::Seek; + pub use seek::*; pub use sink::{sink, Sink}; pub use write::*; diff --git a/src/io/read/bytes.rs b/src/io/read/bytes.rs index ab9259611..786fdaa57 100644 --- a/src/io/read/bytes.rs +++ b/src/io/read/bytes.rs @@ -32,7 +32,7 @@ impl Stream for Bytes { } } -#[cfg(all(test, default))] +#[cfg(all(test, feature = "default", not(target_arch = "wasm32")))] mod tests { use crate::io; use crate::prelude::*; diff --git a/src/io/read/chain.rs b/src/io/read/chain.rs index 4fcdb0ec9..b5eac5814 100644 --- a/src/io/read/chain.rs +++ b/src/io/read/chain.rs @@ -165,7 +165,7 @@ impl BufRead for Chain { } } -#[cfg(all(test, default))] +#[cfg(all(test, feature = "default", not(target_arch = "wasm32")))] mod tests { use crate::io; use crate::prelude::*; diff --git a/src/io/stderr.rs b/src/io/stderr.rs index 22dadd1f6..81cc197b9 100644 --- a/src/io/stderr.rs +++ b/src/io/stderr.rs @@ -180,6 +180,18 @@ cfg_unix! { std::io::stderr().as_raw_fd() } } + + cfg_io_safety! { + use crate::os::unix::io::{AsFd, BorrowedFd}; + + impl AsFd for Stderr { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { + BorrowedFd::borrow_raw(std::io::stderr().as_raw_fd()) + } + } + } + } } cfg_windows! { @@ -190,4 +202,16 @@ cfg_windows! { std::io::stderr().as_raw_handle() } } + + cfg_io_safety! { + use crate::os::windows::io::{AsHandle, BorrowedHandle}; + + impl AsHandle for Stderr { + fn as_handle(&self) -> BorrowedHandle<'_> { + unsafe { + BorrowedHandle::borrow_raw(std::io::stderr().as_raw_handle()) + } + } + } + } } diff --git a/src/io/stdin.rs b/src/io/stdin.rs index c969574e5..d8f96d49e 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -206,6 +206,18 @@ cfg_unix! { std::io::stdin().as_raw_fd() } } + + cfg_io_safety! { + use crate::os::unix::io::{AsFd, BorrowedFd}; + + impl AsFd for Stdin { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { + BorrowedFd::borrow_raw(std::io::stdin().as_raw_fd()) + } + } + } + } } cfg_windows! { @@ -216,4 +228,16 @@ cfg_windows! { std::io::stdin().as_raw_handle() } } + + cfg_io_safety! { + use crate::os::windows::io::{AsHandle, BorrowedHandle}; + + impl AsHandle for Stdin { + fn as_handle(&self) -> BorrowedHandle<'_> { + unsafe { + BorrowedHandle::borrow_raw(std::io::stdin().as_raw_handle()) + } + } + } + } } diff --git a/src/io/stdout.rs b/src/io/stdout.rs index 45244b140..3cc570dc8 100644 --- a/src/io/stdout.rs +++ b/src/io/stdout.rs @@ -180,6 +180,18 @@ cfg_unix! { std::io::stdout().as_raw_fd() } } + + cfg_io_safety! { + use crate::os::unix::io::{AsFd, BorrowedFd}; + + impl AsFd for Stdout { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { + BorrowedFd::borrow_raw(std::io::stdout().as_raw_fd()) + } + } + } + } } cfg_windows! { @@ -190,4 +202,16 @@ cfg_windows! { std::io::stdout().as_raw_handle() } } + + cfg_io_safety! { + use crate::os::windows::io::{AsHandle, BorrowedHandle}; + + impl AsHandle for Stdout { + fn as_handle(&self) -> BorrowedHandle<'_> { + unsafe { + BorrowedHandle::borrow_raw(std::io::stdout().as_raw_handle()) + } + } + } + } } diff --git a/src/io/write/write_fmt.rs b/src/io/write/write_fmt.rs index 318b1c37f..a65e06a64 100644 --- a/src/io/write/write_fmt.rs +++ b/src/io/write/write_fmt.rs @@ -18,7 +18,7 @@ impl Future for WriteFmtFuture<'_, T> { type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Process the interal Result the first time we run. + // Process the internal Result the first time we run. if self.buffer.is_none() { match self.res.take().unwrap() { Err(err) => return Poll::Ready(Err(err)), diff --git a/src/lib.rs b/src/lib.rs index 86786e814..b807d448d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(rustdoc::invalid_html_tags)] //! # Async version of the Rust standard library //! //! `async-std` is a foundation of portable Rust software, a set of minimal and battle-tested @@ -191,7 +192,7 @@ //! unstable +//! > unstable //! are available only when the `unstable` Cargo feature is enabled: //! //! ```toml @@ -204,7 +205,7 @@ //! attributes +//! > attributes //! are available only when the `attributes` Cargo feature is enabled: //! //! ```toml @@ -280,12 +281,10 @@ #![cfg_attr(feature = "docs", feature(doc_cfg))] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![allow(clippy::mutex_atomic, clippy::module_inception)] -#![doc(test(attr(deny(rust_2018_idioms, warnings))))] +#![doc(test(attr(deny(rust_2018_idioms))))] #![doc(test(attr(allow(unused_extern_crates, unused_variables))))] #![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")] -extern crate alloc; - #[macro_use] mod utils; diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 69340db4e..d014f3387 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -282,6 +282,28 @@ cfg_unix! { self.watcher.into_inner().unwrap().into_raw_fd() } } + + cfg_io_safety! { + use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; + + impl AsFd for TcpListener { + fn as_fd(&self) -> BorrowedFd<'_> { + self.watcher.get_ref().as_fd() + } + } + + impl From for TcpListener { + fn from(fd: OwnedFd) -> TcpListener { + std::net::TcpListener::from(fd).into() + } + } + + impl From for OwnedFd { + fn from(listener: TcpListener) -> OwnedFd { + listener.watcher.into_inner().unwrap().into() + } + } + } } cfg_windows! { @@ -306,4 +328,26 @@ cfg_windows! { self.watcher.into_inner().unwrap().into_raw_socket() } } + + cfg_io_safety! { + use crate::os::windows::io::{AsSocket, BorrowedSocket, OwnedSocket}; + + impl AsSocket for TcpListener { + fn as_socket(&self) -> BorrowedSocket<'_> { + self.watcher.get_ref().as_socket() + } + } + + impl From for TcpListener { + fn from(fd: OwnedSocket) -> TcpListener { + std::net::TcpListener::from(fd).into() + } + } + + impl From for OwnedSocket { + fn from(listener: TcpListener) -> OwnedSocket { + listener.watcher.into_inner().unwrap().into() + } + } + } } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index f30e4714c..3311f904c 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -416,6 +416,28 @@ cfg_unix! { self.as_raw_fd() } } + + cfg_io_safety! { + use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; + + impl AsFd for TcpStream { + fn as_fd(&self) -> BorrowedFd<'_> { + self.watcher.get_ref().as_fd() + } + } + + impl From for TcpStream { + fn from(fd: OwnedFd) -> TcpStream { + std::net::TcpStream::from(fd).into() + } + } + + impl From for OwnedFd { + fn from(stream: TcpStream) -> OwnedFd { + stream.watcher.get_ref().try_clone().unwrap().into() + } + } + } } cfg_windows! { @@ -443,4 +465,26 @@ cfg_windows! { self.as_raw_socket() } } + + cfg_io_safety! { + use crate::os::windows::io::{AsSocket, BorrowedSocket, OwnedSocket}; + + impl AsSocket for TcpStream { + fn as_socket(&self) -> BorrowedSocket<'_> { + self.watcher.get_ref().as_socket() + } + } + + impl From for TcpStream { + fn from(fd: OwnedSocket) -> TcpStream { + std::net::TcpStream::from(fd).into() + } + } + + impl From for OwnedSocket { + fn from(stream: TcpStream) -> OwnedSocket { + stream.watcher.get_ref().try_clone().unwrap().into() + } + } + } } diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index e216af43a..3bb2c6e9c 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -562,6 +562,28 @@ cfg_unix! { self.watcher.into_inner().unwrap().into_raw_fd() } } + + cfg_io_safety! { + use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; + + impl AsFd for UdpSocket { + fn as_fd(&self) -> BorrowedFd<'_> { + self.watcher.get_ref().as_fd() + } + } + + impl From for UdpSocket { + fn from(fd: OwnedFd) -> UdpSocket { + std::net::UdpSocket::from(fd).into() + } + } + + impl From for OwnedFd { + fn from(stream: UdpSocket) -> OwnedFd { + stream.watcher.into_inner().unwrap().into() + } + } + } } cfg_windows! { @@ -586,4 +608,26 @@ cfg_windows! { self.watcher.into_inner().unwrap().into_raw_socket() } } + + cfg_io_safety! { + use crate::os::windows::io::{AsSocket, BorrowedSocket, OwnedSocket}; + + impl AsSocket for UdpSocket { + fn as_socket(&self) -> BorrowedSocket<'_> { + self.watcher.get_ref().as_socket() + } + } + + impl From for UdpSocket { + fn from(fd: OwnedSocket) -> UdpSocket { + std::net::UdpSocket::from(fd).into() + } + } + + impl From for OwnedSocket { + fn from(stream: UdpSocket) -> OwnedSocket { + stream.watcher.into_inner().unwrap().into() + } + } + } } diff --git a/src/option/mod.rs b/src/option/mod.rs index 76f096b3f..f0d67b77b 100644 --- a/src/option/mod.rs +++ b/src/option/mod.rs @@ -5,6 +5,7 @@ mod from_stream; +#[allow(unused)] #[doc(inline)] pub use std::option::Option; diff --git a/src/os/unix/io.rs b/src/os/unix/io.rs index 0b9846074..9d1fbaede 100644 --- a/src/os/unix/io.rs +++ b/src/os/unix/io.rs @@ -2,6 +2,10 @@ cfg_not_docs! { pub use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; + + cfg_io_safety! { + pub use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; + } } cfg_docs! { @@ -51,4 +55,9 @@ cfg_docs! { /// and must close the descriptor once it's no longer needed. fn into_raw_fd(self) -> RawFd; } + + cfg_io_safety! { + #[doc(inline)] + pub use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; + } } diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs index 7b0fc32ec..792860223 100644 --- a/src/os/unix/net/datagram.rs +++ b/src/os/unix/net/datagram.rs @@ -340,3 +340,25 @@ impl IntoRawFd for UnixDatagram { self.watcher.into_inner().unwrap().into_raw_fd() } } + +cfg_io_safety! { + use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; + + impl AsFd for UnixDatagram { + fn as_fd(&self) -> BorrowedFd<'_> { + self.watcher.get_ref().as_fd() + } + } + + impl From for UnixDatagram { + fn from(fd: OwnedFd) -> UnixDatagram { + StdUnixDatagram::from(fd).into() + } + } + + impl From for OwnedFd { + fn from(stream: UnixDatagram) -> OwnedFd { + stream.watcher.into_inner().unwrap().into() + } + } +} diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index 1f983656f..b87b781f2 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -233,3 +233,25 @@ impl IntoRawFd for UnixListener { self.watcher.into_inner().unwrap().into_raw_fd() } } + +cfg_io_safety! { + use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; + + impl AsFd for UnixListener { + fn as_fd(&self) -> BorrowedFd<'_> { + self.watcher.get_ref().as_fd() + } + } + + impl From for UnixListener { + fn from(fd: OwnedFd) -> UnixListener { + std::os::unix::net::UnixListener::from(fd).into() + } + } + + impl From for OwnedFd { + fn from(stream: UnixListener) -> OwnedFd { + stream.watcher.into_inner().unwrap().into() + } + } +} diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index 8674e7c32..4a4f45ad6 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -264,3 +264,25 @@ impl IntoRawFd for UnixStream { (*self.watcher).get_ref().try_clone().unwrap().into_raw_fd() } } + +cfg_io_safety! { + use crate::os::unix::io::{AsFd, BorrowedFd, OwnedFd}; + + impl AsFd for UnixStream { + fn as_fd(&self) -> BorrowedFd<'_> { + self.watcher.get_ref().as_fd() + } + } + + impl From for UnixStream { + fn from(fd: OwnedFd) -> UnixStream { + std::os::unix::net::UnixStream::from(fd).into() + } + } + + impl From for OwnedFd { + fn from(stream: UnixStream) -> OwnedFd { + stream.watcher.get_ref().try_clone().unwrap().into() + } + } +} diff --git a/src/os/windows/io.rs b/src/os/windows/io.rs index 30d37a0ef..caffc6fc6 100644 --- a/src/os/windows/io.rs +++ b/src/os/windows/io.rs @@ -5,6 +5,13 @@ cfg_not_docs! { AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle, AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket, }; + + cfg_io_safety! { + pub use std::os::windows::io::{ + AsHandle, BorrowedHandle, OwnedHandle, + AsSocket, BorrowedSocket, OwnedSocket, + }; + } } cfg_docs! { @@ -75,4 +82,12 @@ cfg_docs! { /// it once it's no longer needed. fn into_raw_socket(self) -> RawSocket; } + + cfg_io_safety! { + #[doc(inline)] + pub use std::os::windows::io::{ + AsHandle, BorrowedHandle, OwnedHandle, + AsSocket, BorrowedSocket, OwnedSocket, + }; + } } diff --git a/src/result/mod.rs b/src/result/mod.rs index cae0ebd93..fc263318b 100644 --- a/src/result/mod.rs +++ b/src/result/mod.rs @@ -5,6 +5,7 @@ mod from_stream; +#[allow(unused)] #[doc(inline)] pub use std::result::Result; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f7f2727a1..8dd7d6339 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -34,12 +34,17 @@ //! [`Stream`] looks like this: //! //! ``` +//! #![allow(dead_code)] //! # use async_std::task::{Context, Poll}; //! # use std::pin::Pin; -//! trait Stream { +//! pub trait Stream { //! type Item; //! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; //! } +//! # impl Stream for () { +//! # type Item = (); +//! # fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Poll::Pending } +//! # } //! ``` //! //! A stream has a method, [`next`], which when called, returns an diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index b0bf1f339..144194d24 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1485,7 +1485,7 @@ pub trait StreamExt: Stream { the stream and ignore elements until it returns `false`. After `false` is returned, `SkipWhile`'s job is over and all further - elements in the strem are yielded. + elements in the stream are yielded. ## Examples diff --git a/src/stream/successors.rs b/src/stream/successors.rs index a9ce40ffe..eb0e4bf48 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -42,7 +42,7 @@ pin_project! { /// /// This stream is constructed by [`successors`] function /// - /// [`successors`]: fn.succssors.html + /// [`successors`]: fn.successors.html #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[derive(Debug)] diff --git a/src/string/mod.rs b/src/string/mod.rs index e382fcf2c..3f6141fa3 100644 --- a/src/string/mod.rs +++ b/src/string/mod.rs @@ -5,5 +5,6 @@ mod extend; mod from_stream; +#[allow(unused)] #[doc(inline)] pub use std::string::String; diff --git a/src/sync/condvar.rs b/src/sync/condvar.rs index 1a208efdd..b7e81ed29 100644 --- a/src/sync/condvar.rs +++ b/src/sync/condvar.rs @@ -135,7 +135,7 @@ impl Condvar { } } - /// Blocks the current taks until this condition variable receives a notification and the + /// Blocks the current task until this condition variable receives a notification and the /// required condition is met. Spurious wakeups are ignored and this function will only /// return once the condition has been met. /// diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 35203a6ea..1d8579614 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -95,9 +95,9 @@ //! at the same time: In multi-threaded scenarios, you can use two //! kinds of primitives to deal with synchronization: //! - [memory fences] to ensure memory accesses are made visible to -//! other CPUs in the right order. +//! other CPUs in the right order. //! - [atomic operations] to ensure simultaneous access to the same -//! memory location doesn't lead to undefined behavior. +//! memory location doesn't lead to undefined behavior. //! //! [prefetching]: https://en.wikipedia.org/wiki/Cache_prefetching //! [compiler fences]: https://doc.rust-lang.org/std/sync/atomic/fn.compiler_fence.html diff --git a/src/sync/waker_set.rs b/src/sync/waker_set.rs index 243b3e33e..05590f488 100644 --- a/src/sync/waker_set.rs +++ b/src/sync/waker_set.rs @@ -149,7 +149,7 @@ impl WakerSet { /// Returns `true` if at least one operation was notified. #[cold] fn notify(&self, n: Notify) -> bool { - let mut inner = &mut *self.lock(); + let inner = &mut *self.lock(); let mut notified = false; for (_, opt_waker) in inner.entries.iter_mut() { diff --git a/src/task/block_on.rs b/src/task/block_on.rs index fa66f915b..3ab4dc06f 100644 --- a/src/task/block_on.rs +++ b/src/task/block_on.rs @@ -19,11 +19,9 @@ use crate::task::Builder; /// ```no_run /// use async_std::task; /// -/// fn main() { -/// task::block_on(async { -/// println!("Hello, world!"); -/// }) -/// } +/// task::block_on(async { +/// println!("Hello, world!"); +/// }) /// ``` #[cfg(not(target_os = "unknown"))] pub fn block_on(future: F) -> T diff --git a/src/utils.rs b/src/utils.rs index d1b497273..d1cb063bd 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,5 +1,3 @@ -use alloc::string::String; - /// Calls a function and aborts if it panics. /// /// This is useful in unsafe code where we can't recover from panics. @@ -55,6 +53,7 @@ pub fn random(n: u32) -> u32 { } /// Add additional context to errors +#[cfg(feature = "std")] pub(crate) trait Context { fn context(self, message: impl Fn() -> String) -> Self; } @@ -84,7 +83,13 @@ mod timer { impl Timer { pub(crate) fn after(dur: std::time::Duration) -> Self { - Timer(TimeoutFuture::new(dur.as_millis() as u32)) + // Round up to the nearest millisecond. + let mut timeout_ms = dur.as_millis() as u32; + if std::time::Duration::from_millis(timeout_ms as u64) < dur { + timeout_ms += 1; + } + + Timer(TimeoutFuture::new(timeout_ms)) } } @@ -142,7 +147,7 @@ macro_rules! cfg_unstable_default { ($($item:item)*) => { $( #[cfg(all(feature = "default", feature = "unstable"))] - #[cfg_attr(feature = "docs", doc(unstable))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] $item )* } @@ -155,7 +160,6 @@ macro_rules! cfg_unix { ($($item:item)*) => { $( #[cfg(any(unix, feature = "docs"))] - #[cfg_attr(feature = "docs", doc(cfg(unix)))] $item )* } @@ -168,7 +172,6 @@ macro_rules! cfg_windows { ($($item:item)*) => { $( #[cfg(any(windows, feature = "docs"))] - #[cfg_attr(feature = "docs", doc(cfg(windows)))] $item )* } @@ -233,3 +236,15 @@ macro_rules! cfg_default { )* } } + +/// Declares items that use I/O safety. +#[allow(unused_macros)] +#[doc(hidden)] +macro_rules! cfg_io_safety { + ($($item:item)*) => { + $( + #[cfg(feature = "io_safety")] + $item + )* + } +} diff --git a/src/vec/mod.rs b/src/vec/mod.rs index 77a0b746b..2efd3c3f9 100644 --- a/src/vec/mod.rs +++ b/src/vec/mod.rs @@ -6,5 +6,6 @@ mod extend; mod from_stream; +#[allow(unused)] #[doc(inline)] pub use std::vec::Vec; diff --git a/tests/io_copy.rs b/tests/io_copy.rs new file mode 100644 index 000000000..394c34f8e --- /dev/null +++ b/tests/io_copy.rs @@ -0,0 +1,72 @@ +use std::{ + io::Result, + pin::Pin, + task::{Context, Poll}, +}; + +struct ReaderThatPanicsAfterEof { + read_count: usize, + has_sent_eof: bool, + max_read: usize, +} + +impl async_std::io::Read for ReaderThatPanicsAfterEof { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + if self.has_sent_eof { + panic!("this should be unreachable because we should not poll after eof (Ready(Ok(0)))") + } else if self.read_count >= self.max_read { + self.has_sent_eof = true; + Poll::Ready(Ok(0)) + } else { + self.read_count += 1; + Poll::Ready(Ok(buf.len())) + } + } +} + +struct WriterThatTakesAWhileToFlush { + max_flush: usize, + flush_count: usize, +} + +impl async_std::io::Write for WriterThatTakesAWhileToFlush { + fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.flush_count += 1; + if self.flush_count >= self.max_flush { + Poll::Ready(Ok(())) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +#[test] +fn io_copy_does_not_poll_after_eof() { + async_std::task::block_on(async { + let mut reader = ReaderThatPanicsAfterEof { + has_sent_eof: false, + max_read: 10, + read_count: 0, + }; + + let mut writer = WriterThatTakesAWhileToFlush { + flush_count: 0, + max_flush: 10, + }; + + assert!(async_std::io::copy(&mut reader, &mut writer).await.is_ok()); + }) +}