diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index fa2529794..311383373 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -10,6 +10,10 @@ env: CARGO_TERM_COLOR: always REDIS_RS_REDIS_JSON_PATH: "/tmp/librejson.so" +concurrency: + group: "${{ github.workflow }}-${{ github.head_ref || github.run_id || github.ref }}" + cancel-in-progress: true + jobs: build: @@ -25,7 +29,7 @@ jobs: - stable - beta - nightly - - 1.65.0 + - 1.70.0 steps: diff --git a/Cargo.lock b/Cargo.lock index 8cfaa8be1..3195f46c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "afl" -version = "0.15.8" +version = "0.15.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ff7c9e6d8b0f28402139fcbff21a22038212c94c44ecf90812ce92f384308f6" +checksum = "c21e10b6947189c5ff61343b5354e9ad1c1722bd47b69cd0a6b49e5fa7f7ecf6" dependencies = [ "home", "libc", @@ -250,9 +250,9 @@ checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" [[package]] name = "async-trait" -version = "0.1.80" +version = "0.1.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" +checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", @@ -305,9 +305,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bigdecimal" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9324c8014cd04590682b34f1e9448d38f0674d0f7b2dc553331016ef0e4e9ebc" +checksum = "51d712318a27c7150326677b321a5fa91b55f6d9034ffd67f20319e147d40cee" dependencies = [ "autocfg", "libm", @@ -410,9 +410,9 @@ dependencies = [ [[package]] name = "bytes" -version = "1.6.0" +version = "1.7.1" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "cast" @@ -923,9 +923,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.4" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "home" @@ -981,7 +981,7 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" dependencies = [ - "hermit-abi 0.3.4", + "hermit-abi 0.3.9", "libc", "windows-sys 0.48.0", ] @@ -1085,22 +1085,22 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.11" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4" dependencies = [ + "hermit-abi 0.3.9", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "native-tls" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" dependencies = [ - "lazy_static", "libc", "log", "openssl", @@ -1140,16 +1140,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi 0.3.4", - "libc", -] - [[package]] name = "object" version 
= "0.32.2" @@ -1530,7 +1520,7 @@ dependencies = [ [[package]] name = "redis" -version = "0.26.0" +version = "0.27.0" dependencies = [ "ahash 0.8.11", "anyhow", @@ -1583,7 +1573,7 @@ dependencies = [ [[package]] name = "redis-test" -version = "0.5.0" +version = "0.6.0" dependencies = [ "bytes", "futures", @@ -1683,9 +1673,9 @@ dependencies = [ [[package]] name = "rust_decimal" -version = "1.35.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1790d1c4c0ca81211399e0e0af16333276f375209e71a37b67698a373db5b47a" +checksum = "b082d80e3e3cc52b2ed634388d436fe1f4de6af5786cc2de9ba9737527bdf555" dependencies = [ "arrayvec", "borsh", @@ -1741,9 +1731,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.10" +version = "0.23.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402" +checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" dependencies = [ "once_cell", "ring", @@ -1768,9 +1758,9 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "2.1.2" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +checksum = "196fe16b00e106300d3e45ecfcb764fa292a535d7326a29a5875c579c7417425" dependencies = [ "base64", "rustls-pki-types", @@ -1784,9 +1774,9 @@ checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" [[package]] name = "rustls-webpki" -version = "0.102.4" +version = "0.102.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" +checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e" dependencies = [ "ring", "rustls-pki-types", @@ -1869,18 +1859,18 @@ checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca" [[package]] name = 
"serde" -version = "1.0.203" +version = "1.0.209" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" +checksum = "99fce0ffe7310761ca6bf9faf5115afbc19688edd00171d81b1bb1b116c63e09" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.203" +version = "1.0.209" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" +checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170" dependencies = [ "proc-macro2", "quote", @@ -1889,20 +1879,21 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.119" +version = "1.0.127" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8eddb61f0697cc3989c5d64b452f5488e2b8a60fd7d5076a3045076ffef8cb0" +checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad" dependencies = [ "itoa", + "memchr", "ryu", "serde", ] [[package]] name = "sha1_smol" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" [[package]] name = "simdutf8" @@ -2062,26 +2053,25 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.0" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "pin-project-lite", "socket2 0.5.7", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "tokio-macros" -version = "2.3.0" +version = "2.4.0" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -2206,9 +2196,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.9.1" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" [[package]] name = "valkey" @@ -2332,9 +2322,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.26.1" +version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" +checksum = "bd7c23921eeb1713a4e851530e9b9756e4fb0e89978582942612524cf09f01cd" dependencies = [ "rustls-pki-types", ] diff --git a/Makefile b/Makefile index 07dd7b3b9..c667ab831 100644 --- a/Makefile +++ b/Makefile @@ -40,7 +40,7 @@ test: @echo "====================================================================" @echo "Testing Connection Type UNIX SOCKETS" @echo "====================================================================" - @RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=unix RUST_BACKTRACE=1 cargo test --locked -p redis --all-features -- --test-threads=1 --skip test_cluster --skip test_async_cluster --skip test_module + @RUSTFLAGS="-D warnings" REDISRS_SERVER_TYPE=unix RUST_BACKTRACE=1 cargo test --locked -p redis --all-features -- --test-threads=1 --skip test_cluster --skip cluster_async --skip test_module @echo "====================================================================" @echo "Testing async-std with Rustls" diff --git a/README.md b/README.md index 3df4ed0e7..2d6501d1a 100644 --- a/README.md +++ b/README.md @@ -14,14 +14,12 @@ The crate is called `redis` and you can depend on it via 
cargo: ```ini [dependencies] -redis = "0.26.0" +redis = "0.27.0" ``` Documentation on the library can be found at [docs.rs/redis](https://docs.rs/redis). -**Note: redis-rs requires at least Rust 1.60.** - ## Basic Operation To open a connection you need to create a client and then to fetch a @@ -59,10 +57,10 @@ To enable asynchronous clients, enable the relevant feature in your Cargo.toml, ``` # if you use tokio -redis = { version = "0.26.0", features = ["tokio-comp"] } +redis = { version = "0.27.0", features = ["tokio-comp"] } # if you use async-std -redis = { version = "0.26.0", features = ["async-std-comp"] } +redis = { version = "0.27.0", features = ["async-std-comp"] } ``` ## TLS Support @@ -73,25 +71,25 @@ Currently, `native-tls` and `rustls` are supported. To use `native-tls`: ``` -redis = { version = "0.26.0", features = ["tls-native-tls"] } +redis = { version = "0.27.0", features = ["tls-native-tls"] } # if you use tokio -redis = { version = "0.26.0", features = ["tokio-native-tls-comp"] } +redis = { version = "0.27.0", features = ["tokio-native-tls-comp"] } # if you use async-std -redis = { version = "0.26.0", features = ["async-std-native-tls-comp"] } +redis = { version = "0.27.0", features = ["async-std-native-tls-comp"] } ``` To use `rustls`: ``` -redis = { version = "0.26.0", features = ["tls-rustls"] } +redis = { version = "0.27.0", features = ["tls-rustls"] } # if you use tokio -redis = { version = "0.26.0", features = ["tokio-rustls-comp"] } +redis = { version = "0.27.0", features = ["tokio-rustls-comp"] } # if you use async-std -redis = { version = "0.26.0", features = ["async-std-rustls-comp"] } +redis = { version = "0.27.0", features = ["async-std-rustls-comp"] } ``` With `rustls`, you can add the following feature flags on top of other feature flags to enable additional features: @@ -117,7 +115,7 @@ let client = redis::Client::open("rediss://127.0.0.1/#insecure")?; Support for Redis Cluster can be enabled by enabling the `cluster` feature in 
your Cargo.toml: -`redis = { version = "0.26.0", features = [ "cluster"] }` +`redis = { version = "0.27.0", features = [ "cluster"] }` Then you can simply use the `ClusterClient`, which accepts a list of available nodes. Note that only one node in the cluster needs to be specified when instantiating the client, though @@ -140,7 +138,7 @@ fn fetch_an_integer() -> String { Async Redis Cluster support can be enabled by enabling the `cluster-async` feature, along with your preferred async runtime, e.g.: -`redis = { version = "0.26.0", features = [ "cluster-async", "tokio-std-comp" ] }` +`redis = { version = "0.27.0", features = [ "cluster-async", "tokio-std-comp" ] }` ```rust use redis::cluster::ClusterClient; @@ -160,7 +158,7 @@ async fn fetch_an_integer() -> String { Support for the RedisJSON Module can be enabled by specifying "json" as a feature in your Cargo.toml. -`redis = { version = "0.26.0", features = ["json"] }` +`redis = { version = "0.27.0", features = ["json"] }` Then you can simply import the `JsonCommands` trait which will add the `json` commands to all Redis Connections (not to be confused with just `Commands` which only adds the default commands) diff --git a/redis-test/CHANGELOG.md b/redis-test/CHANGELOG.md index 3e45593e8..2a73f1ed5 100644 --- a/redis-test/CHANGELOG.md +++ b/redis-test/CHANGELOG.md @@ -1,3 +1,6 @@ +### 0.6.0 (2024-09-07) +* Track redis 0.27.0 release + ### 0.5.0 (2024-07-26) * Track redis 0.26.0 release diff --git a/redis-test/Cargo.toml b/redis-test/Cargo.toml index a1bf8c18a..41ec1d230 100644 --- a/redis-test/Cargo.toml +++ b/redis-test/Cargo.toml @@ -1,19 +1,19 @@ [package] name = "redis-test" -version = "0.5.0" +version = "0.6.0" edition = "2021" description = "Testing helpers for the `redis` crate" homepage = "https://github.com/redis-rs/redis-rs" repository = "https://github.com/redis-rs/redis-rs" documentation = "https://docs.rs/redis-test" license = "BSD-3-Clause" -rust-version = "1.65" +rust-version = "1.70" [lib] bench = 
false [dependencies] -redis = { version = "0.26.0", path = "../redis" } +redis = { version = "0.27.0", path = "../redis" } bytes = { version = "1", optional = true } futures = { version = "0.3", optional = true } @@ -22,5 +22,5 @@ futures = { version = "0.3", optional = true } aio = ["futures", "redis/aio"] [dev-dependencies] -redis = { version = "0.26.0", path = "../redis", features = ["aio", "tokio-comp"] } +redis = { version = "0.27.0", path = "../redis", features = ["aio", "tokio-comp"] } tokio = { version = "1", features = ["rt", "macros", "rt-multi-thread", "time"] } diff --git a/redis/CHANGELOG.md b/redis/CHANGELOG.md index ecc0b6119..71f5df501 100644 --- a/redis/CHANGELOG.md +++ b/redis/CHANGELOG.md @@ -1,12 +1,33 @@ +### 0.27.0 (2024-09-07) + +#### Features + +* **Breaking change**: Abort backing task to multiplexed connection on drop ([#1264](https://github.com/redis-rs/redis-rs/pull/1264)) +* Add r2d2 support for SentinelClient ([#1297](https://github.com/redis-rs/redis-rs/pull/1297) @smf8) +* Xinfo groups lag and entries-read support ([#837](https://github.com/redis-rs/redis-rs/pull/837) @massimiliano-mantione) +* Improve cluster documentation. [#1263](https://github.com/redis-rs/redis-rs/pull/1263) +* Allow splitting async PubSub to Sink & Stream. 
[#1144](https://github.com/redis-rs/redis-rs/pull/1144) +* Default for ConnectionManagerConfig ([#1308](https://github.com/redis-rs/redis-rs/pull/1308) @feelingsonice) + +#### Changes & Bug fixes + +* Fix new lints [#1310](https://github.com/redis-rs/redis-rs/pull/1310) +* Use pipelines to setup connections [#1250](https://github.com/redis-rs/redis-rs/pull/1250) +* Bump MSRV to 1.70 [#1286](https://github.com/redis-rs/redis-rs/pull/1286) + +### 0.26.1 (2024-08-02) + +* bug: Exported configured-out item ([#1273](https://github.com/redis-rs/redis-rs/pull/1273) @EmilyMatt) + ### 0.26.0 (2024-07-26) #### Features * **Breaking change**: Add RESP3 support ([#1058](https://github.com/redis-rs/redis-rs/pull/1058) @altanozlu) -* **Breaking change**: Expose Errors in `Value` [1093](https://github.com/redis-rs/redis-rs/pull/1093) +* **Breaking change**: Expose Errors in `Value` [#1093](https://github.com/redis-rs/redis-rs/pull/1093) * Add max retry delay for every reconnect ([#1194](https://github.com/redis-rs/redis-rs/pull/1194) tonynguyen-sotatek) * Add support for routing by node address. [#1062](https://github.com/redis-rs/redis-rs/pull/1062) -* **Breaking change**: Deprecate function that erroneously use tokio in its name. [1087](https://github.com/redis-rs/redis-rs/pull/1087) +* **Breaking change**: Deprecate function that erroneously use tokio in its name. [1087](https://github.com/redis-rs/redis-rs/pull/1087) * **Breaking change**: Change is_single_arg to num_of_args in ToRedisArgs trait ([1238](https://github.com/redis-rs/redis-rs/pull/1238) @git-hulk) * feat: add implementation of `ToRedisArgs`,`FromRedisValue` traits for `Arc`,`Box`,`Rc` ([1088](https://github.com/redis-rs/redis-rs/pull/1088) @xoac) * MultiplexedConnection: Relax type requirements for pubsub functions. 
[1129](https://github.com/redis-rs/redis-rs/pull/1129) diff --git a/redis/Cargo.toml b/redis/Cargo.toml index 179b5a3b9..484a83cff 100644 --- a/redis/Cargo.toml +++ b/redis/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "redis" -version = "0.26.0" +version = "0.27.0" keywords = ["redis", "database"] description = "Redis driver for Rust." homepage = "https://github.com/redis-rs/redis-rs" @@ -8,7 +8,7 @@ repository = "https://github.com/redis-rs/redis-rs" documentation = "https://docs.rs/redis" license = "BSD-3-Clause" edition = "2021" -rust-version = "1.65" +rust-version = "1.70" readme = "../README.md" [package.metadata.docs.rs] @@ -57,7 +57,7 @@ crc16 = { version = "0.4", optional = true } rand = { version = "0.8", optional = true } # Only needed for async_std support async-std = { version = "1.8.0", optional = true } -async-trait = { version = "0.1.80", optional = true } +async-trait = { version = "0.1.81", optional = true } # Only needed for native tls native-tls = { version = "0.2", optional = true } @@ -74,12 +74,12 @@ rustls-pemfile = { version = "2", optional = true } rustls-pki-types = { version = "1", optional = true } # Only needed for RedisJSON Support -serde = { version = "1.0.203", optional = true } -serde_json = { version = "1.0.119", optional = true } +serde = { version = "1.0.209", optional = true } +serde_json = { version = "1.0.127", optional = true } # Only needed for bignum Support -rust_decimal = { version = "1.35.0", optional = true } -bigdecimal = { version = "0.4.3", optional = true } +rust_decimal = { version = "1.36.0", optional = true } +bigdecimal = { version = "0.4.5", optional = true } num-bigint = "0.4.6" # Optional aHash support @@ -88,7 +88,7 @@ ahash = { version = "0.8.11", optional = true } log = { version = "0.4", optional = true } # Optional uuid support -uuid = { version = "1.9.1", optional = true } +uuid = { version = "1.10.0", optional = true } [features] default = ["acl", "streams", "geospatial", "script", "keep-alive"] diff 
--git a/redis/src/aio/async_std.rs b/redis/src/aio/async_std.rs index 19c54d3b3..ce4994002 100644 --- a/redis/src/aio/async_std.rs +++ b/redis/src/aio/async_std.rs @@ -21,6 +21,7 @@ use crate::connection::create_rustls_config; #[cfg(feature = "tls-rustls")] use futures_rustls::{client::TlsStream, TlsConnector}; +use super::TaskHandle; use async_std::net::TcpStream; #[cfg(unix)] use async_std::os::unix::net::UnixStream; @@ -250,8 +251,8 @@ impl RedisRuntime for AsyncStd { .map(|con| Self::Unix(AsyncStdWrapped::new(con)))?) } - fn spawn(f: impl Future + Send + 'static) { - async_std::task::spawn(f); + fn spawn(f: impl Future + Send + 'static) -> TaskHandle { + TaskHandle::AsyncStd(async_std::task::spawn(f)) } fn boxed(self) -> Pin> { diff --git a/redis/src/aio/connection_manager.rs b/redis/src/aio/connection_manager.rs index 06864938f..19f9f2669 100644 --- a/redis/src/aio/connection_manager.rs +++ b/redis/src/aio/connection_manager.rs @@ -18,7 +18,7 @@ use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; /// ConnectionManager is the configuration for reconnect mechanism and request timing -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct ConnectionManagerConfig { /// The resulting duration is calculated by taking the base to the `n`-th power, /// where `n` denotes the number of past attempts. @@ -48,15 +48,7 @@ impl ConnectionManagerConfig { /// Creates a new instance of the options with nothing set pub fn new() -> Self { - Self { - exponent_base: Self::DEFAULT_CONNECTION_RETRY_EXPONENT_BASE, - factor: Self::DEFAULT_CONNECTION_RETRY_FACTOR, - number_of_retries: Self::DEFAULT_NUMBER_OF_CONNECTION_RETRIES, - max_delay: None, - response_timeout: Self::DEFAULT_RESPONSE_TIMEOUT, - connection_timeout: Self::DEFAULT_CONNECTION_TIMEOUT, - push_sender: None, - } + Self::default() } /// A multiplicative factor that will be applied to the retry delay. 
@@ -110,6 +102,21 @@ impl ConnectionManagerConfig { self } } + +impl Default for ConnectionManagerConfig { + fn default() -> Self { + Self { + exponent_base: Self::DEFAULT_CONNECTION_RETRY_EXPONENT_BASE, + factor: Self::DEFAULT_CONNECTION_RETRY_FACTOR, + number_of_retries: Self::DEFAULT_NUMBER_OF_CONNECTION_RETRIES, + max_delay: None, + response_timeout: Self::DEFAULT_RESPONSE_TIMEOUT, + connection_timeout: Self::DEFAULT_CONNECTION_TIMEOUT, + push_sender: None, + } + } +} + /// A `ConnectionManager` is a proxy that wraps a [multiplexed /// connection][multiplexed-connection] and automatically reconnects to the /// server when necessary. diff --git a/redis/src/aio/mod.rs b/redis/src/aio/mod.rs index bb2083f06..c504c7372 100644 --- a/redis/src/aio/mod.rs +++ b/redis/src/aio/mod.rs @@ -1,8 +1,10 @@ //! Adds async IO support to redis. -use crate::cmd::{cmd, Cmd}; -use crate::connection::get_resp3_hello_command_error; -use crate::connection::RedisConnectionInfo; -use crate::types::{ErrorKind, ProtocolVersion, RedisFuture, RedisResult, Value}; +use crate::cmd::Cmd; +use crate::connection::{ + check_connection_setup, connection_setup_pipeline, AuthResult, ConnectionSetupComponents, + RedisConnectionInfo, +}; +use crate::types::{RedisFuture, RedisResult, Value}; use ::tokio::io::{AsyncRead, AsyncWrite}; use async_trait::async_trait; use futures_util::Future; @@ -27,6 +29,9 @@ use crate::connection::TlsConnParams; #[cfg_attr(docsrs, doc(cfg(feature = "tokio-comp")))] pub mod tokio; +mod pubsub; +pub use pubsub::PubSub; + /// Represents the ability of connecting via TCP or via Unix socket #[async_trait] pub(crate) trait RedisRuntime: AsyncStream + Send + Sync + Sized + 'static { @@ -46,7 +51,7 @@ pub(crate) trait RedisRuntime: AsyncStream + Send + Sync + Sized + 'static { #[cfg(unix)] async fn connect_unix(path: &Path) -> RedisResult; - fn spawn(f: impl Future + Send + 'static); + fn spawn(f: impl Future + Send + 'static) -> TaskHandle; fn boxed(self) -> Pin> { 
Box::pin(self) @@ -85,73 +90,29 @@ pub trait ConnectionLike { fn get_db(&self) -> i64; } -// Initial setup for every connection. -async fn setup_connection(connection_info: &RedisConnectionInfo, con: &mut C) -> RedisResult<()> -where - C: ConnectionLike, -{ - if connection_info.protocol != ProtocolVersion::RESP2 { - let hello_cmd = resp3_hello(connection_info); - let val: RedisResult = hello_cmd.query_async(con).await; - if let Err(err) = val { - return Err(get_resp3_hello_command_error(err)); - } - } else if let Some(password) = &connection_info.password { - let mut command = cmd("AUTH"); - if let Some(username) = &connection_info.username { - command.arg(username); - } - match command.arg(password).query_async(con).await { - Ok(Value::Okay) => (), - Err(e) => { - let err_msg = e.detail().ok_or(( - ErrorKind::AuthenticationFailed, - "Password authentication failed", - ))?; - - if !err_msg.contains("wrong number of arguments for 'auth' command") { - fail!(( - ErrorKind::AuthenticationFailed, - "Password authentication failed", - )); - } - - let mut command = cmd("AUTH"); - match command.arg(password).query_async(con).await { - Ok(Value::Okay) => (), - _ => { - fail!(( - ErrorKind::AuthenticationFailed, - "Password authentication failed" - )); - } - } - } - _ => { - fail!(( - ErrorKind::AuthenticationFailed, - "Password authentication failed" - )); - } - } +async fn execute_connection_pipeline( + rv: &mut impl ConnectionLike, + (pipeline, instructions): (crate::Pipeline, ConnectionSetupComponents), +) -> RedisResult { + if pipeline.len() == 0 { + return Ok(AuthResult::Succeeded); } - if connection_info.db != 0 { - match cmd("SELECT").arg(connection_info.db).query_async(con).await { - Ok(Value::Okay) => (), - _ => fail!(( - ErrorKind::ResponseError, - "Redis server refused to switch database" - )), - } - } + let results = rv.req_packed_commands(&pipeline, 0, pipeline.len()).await?; + + check_connection_setup(results, instructions) +} - // result is ignored, as per 
the command's instructions. - // https://redis.io/commands/client-setinfo/ - #[cfg(not(feature = "disable-client-setinfo"))] - let _: RedisResult<()> = crate::connection::client_set_info_pipeline() - .query_async(con) - .await; +// Initial setup for every connection. +async fn setup_connection( + connection_info: &RedisConnectionInfo, + con: &mut impl ConnectionLike, +) -> RedisResult<()> { + if execute_connection_pipeline(con, connection_setup_pipeline(connection_info, true)).await? + == AuthResult::ShouldRetryWithoutUsername + { + execute_connection_pipeline(con, connection_setup_pipeline(connection_info, false)).await?; + } Ok(()) } @@ -166,7 +127,6 @@ mod connection_manager; #[cfg_attr(docsrs, doc(cfg(feature = "connection-manager")))] pub use connection_manager::*; mod runtime; -use crate::commands::resp3_hello; pub(super) use runtime::*; macro_rules! check_resp3 { diff --git a/redis/src/aio/multiplexed_connection.rs b/redis/src/aio/multiplexed_connection.rs index 48585f879..1341e750d 100644 --- a/redis/src/aio/multiplexed_connection.rs +++ b/redis/src/aio/multiplexed_connection.rs @@ -1,10 +1,14 @@ -use super::{ConnectionLike, Runtime}; +use super::{ConnectionLike, Runtime, SharedHandleContainer, TaskHandle}; use crate::aio::{check_resp3, setup_connection}; use crate::cmd::Cmd; #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] use crate::parser::ValueCodec; -use crate::types::{AsyncPushSender, RedisError, RedisFuture, RedisResult, Value}; -use crate::{cmd, AsyncConnectionConfig, ConnectionInfo, ProtocolVersion, PushInfo, ToRedisArgs}; +use crate::types::{ + closed_connection_error, AsyncPushSender, RedisError, RedisFuture, RedisResult, Value, +}; +use crate::{ + cmd, AsyncConnectionConfig, ProtocolVersion, PushInfo, RedisConnectionInfo, ToRedisArgs, +}; use ::tokio::{ io::{AsyncRead, AsyncWrite}, sync::{mpsc, oneshot}, @@ -19,7 +23,6 @@ use pin_project_lite::pin_project; use std::collections::VecDeque; use std::fmt; use std::fmt::Debug; -use 
std::io; use std::pin::Pin; use std::task::{self, Poll}; use std::time::Duration; @@ -402,16 +405,20 @@ impl Pipeline { /// A side-effect of this is that the underlying connection won't be closed until all sent requests have been answered, /// which means that in case of blocking commands, the underlying connection resource might not be released, /// even when all clones of the multiplexed connection have been dropped (see ). -/// If that is an issue, the user can, instead of using [crate::Client::get_multiplexed_async_connection], use either [MultiplexedConnection::new] or -/// [crate::Client::create_multiplexed_tokio_connection]/[crate::Client::create_multiplexed_async_std_connection], -/// manually spawn the returned driver function, keep the spawned task's handle and abort the task whenever they want, -/// at the cost of effectively closing the clones of the multiplexed connection. +/// This isn't an issue in a connection that was created in a canonical way, which ensures that `_task_handle` is set, so that +/// once all of the connection's clones are dropped, the task will also be dropped. If the user creates the connection in +/// another way and `_task_handle` isn't set, they should manually spawn the returned driver function, keep the spawned task's +/// handle and abort the task whenever they want, at the risk of effectively closing the clones of the multiplexed connection. #[derive(Clone)] pub struct MultiplexedConnection { pipeline: Pipeline, db: i64, response_timeout: Option, protocol: ProtocolVersion, + // This handle ensures that once all the clones of the connection will be dropped, the underlying task will stop. + // This handle is only set for connection whose task was spawned by the crate, not for users who spawned their own + // task. 
+ _task_handle: Option, } impl Debug for MultiplexedConnection { @@ -425,9 +432,9 @@ impl Debug for MultiplexedConnection { impl MultiplexedConnection { /// Constructs a new `MultiplexedConnection` out of a `AsyncRead + AsyncWrite` object - /// and a `ConnectionInfo` + /// and a `RedisConnectionInfo` pub async fn new( - connection_info: &ConnectionInfo, + connection_info: &RedisConnectionInfo, stream: C, ) -> RedisResult<(Self, impl Future)> where @@ -437,9 +444,9 @@ impl MultiplexedConnection { } /// Constructs a new `MultiplexedConnection` out of a `AsyncRead + AsyncWrite` object - /// and a `ConnectionInfo`. The new object will wait on operations for the given `response_timeout`. + /// and a `RedisConnectionInfo`. The new object will wait on operations for the given `response_timeout`. pub async fn new_with_response_timeout( - connection_info: &ConnectionInfo, + connection_info: &RedisConnectionInfo, stream: C, response_timeout: Option, ) -> RedisResult<(Self, impl Future)> @@ -458,41 +465,36 @@ impl MultiplexedConnection { .await } - pub(crate) async fn new_with_config( - connection_info: &ConnectionInfo, + /// Constructs a new `MultiplexedConnection` out of a `AsyncRead + AsyncWrite` object + /// , a `RedisConnectionInfo` and a `AsyncConnectionConfig`. 
+ pub async fn new_with_config( + connection_info: &RedisConnectionInfo, stream: C, config: AsyncConnectionConfig, ) -> RedisResult<(Self, impl Future)> where C: Unpin + AsyncRead + AsyncWrite + Send + 'static, { - fn boxed( - f: impl Future + Send + 'static, - ) -> Pin + Send>> { - Box::pin(f) - } - #[cfg(all(not(feature = "tokio-comp"), not(feature = "async-std-comp")))] compile_error!("tokio-comp or async-std-comp features required for aio feature"); - let redis_connection_info = &connection_info.redis; let codec = ValueCodec::default().framed(stream); if config.push_sender.is_some() { check_resp3!( - redis_connection_info.protocol, + connection_info.protocol, "Can only pass push sender to a connection using RESP3" ); } let (pipeline, driver) = Pipeline::new(codec, config.push_sender); - let driver = boxed(driver); let mut con = MultiplexedConnection { pipeline, - db: connection_info.redis.db, + db: connection_info.db, response_timeout: config.response_timeout, - protocol: redis_connection_info.protocol, + protocol: connection_info.protocol, + _task_handle: None, }; let driver = { - let auth = setup_connection(&connection_info.redis, &mut con); + let auth = setup_connection(connection_info, &mut con); futures_util::pin_mut!(auth); @@ -512,6 +514,12 @@ impl MultiplexedConnection { Ok((con, driver)) } + /// This should be called strictly before the multiplexed connection is cloned - that is, before it is returned to the user. + /// Otherwise some clones will be able to kill the backing task, while other clones are still alive. + pub(crate) fn set_task_handle(&mut self, handle: TaskHandle) { + self._task_handle = Some(SharedHandleContainer::new(handle)); + } + /// Sets the time that the multiplexer will wait for responses on operations before failing. 
pub fn set_response_timeout(&mut self, timeout: std::time::Duration) { self.response_timeout = Some(timeout); @@ -523,9 +531,7 @@ impl MultiplexedConnection { self.pipeline .send_single(cmd.get_packed_command(), self.response_timeout) .await - .map_err(|err| { - err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe))) - }) + .map_err(|err| err.unwrap_or_else(closed_connection_error)) } /// Sends multiple already encoded (packed) command into the TCP socket @@ -545,9 +551,7 @@ impl MultiplexedConnection { self.response_timeout, ) .await - .map_err(|err| { - err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe))) - }); + .map_err(|err| err.unwrap_or_else(closed_connection_error)); let value = result?; match value { diff --git a/redis/src/aio/pubsub.rs b/redis/src/aio/pubsub.rs new file mode 100644 index 000000000..279e05c55 --- /dev/null +++ b/redis/src/aio/pubsub.rs @@ -0,0 +1,434 @@ +use crate::aio::Runtime; +use crate::connection::{ + check_connection_setup, connection_setup_pipeline, AuthResult, ConnectionSetupComponents, +}; +#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] +use crate::parser::ValueCodec; +use crate::types::{closed_connection_error, RedisError, RedisResult, Value}; +use crate::{cmd, Msg, RedisConnectionInfo, ToRedisArgs}; +use ::tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::oneshot, +}; +use futures_util::{ + future::{Future, FutureExt}, + ready, + sink::{Sink, SinkExt}, + stream::{self, Stream, StreamExt}, +}; +use pin_project_lite::pin_project; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::{self, Poll}; +use tokio::sync::mpsc::unbounded_channel; +use tokio::sync::mpsc::UnboundedSender; +#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] +use tokio_util::codec::Decoder; + +use super::SharedHandleContainer; + +// A signal that a un/subscribe request has completed. 
+type RequestCompletedSignal = oneshot::Sender>; + +// A single message sent through the pipeline +struct PipelineMessage { + input: Vec, + output: RequestCompletedSignal, +} + +/// A sender to an async task passing the messages to a `Stream + Sink`. +pub struct PubSubSink { + sender: UnboundedSender, +} + +pin_project! { + pub struct PubSubStream { + #[pin] + receiver: tokio::sync::mpsc::UnboundedReceiver, + // This handle ensures that once the stream will be dropped, the underlying task will stop. + _task_handle: Option, + } +} + +impl Clone for PubSubSink { + fn clone(&self) -> Self { + PubSubSink { + sender: self.sender.clone(), + } + } +} + +pin_project! { + struct PipelineSink { + // The `Sink + Stream` that sends requests and receives values from the server. + #[pin] + sink_stream: T, + // The requests that were sent and are awaiting a response. + in_flight: VecDeque, + // A sender for the push messages received from the server. + sender: UnboundedSender, + } +} + +impl PipelineSink +where + T: Stream> + 'static, +{ + fn new(sink_stream: T, sender: UnboundedSender) -> Self + where + T: Sink, Error = RedisError> + Stream> + 'static, + { + PipelineSink { + sink_stream, + in_flight: VecDeque::new(), + sender, + } + } + + // Read messages from the stream and handle them. 
+ fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { + loop { + let self_ = self.as_mut().project(); + if self_.sender.is_closed() { + return Poll::Ready(Err(())); + } + + let item = match ready!(self.as_mut().project().sink_stream.poll_next(cx)) { + Some(result) => result, + // The redis response stream is not going to produce any more items so we `Err` + // to break out of the `forward` combinator and stop handling requests + None => return Poll::Ready(Err(())), + }; + self.as_mut().handle_message(item)?; + } + } + + fn handle_message(self: Pin<&mut Self>, result: RedisResult) -> Result<(), ()> { + let self_ = self.project(); + + match result { + Ok(Value::Array(value)) => { + if let Some(Value::BulkString(kind)) = value.first() { + if matches!( + kind.as_slice(), + b"subscribe" | b"psubscribe" | b"unsubscribe" | b"punsubscribe" + ) { + if let Some(entry) = self_.in_flight.pop_front() { + let _ = entry.send(Ok(())); + }; + return Ok(()); + } + } + + if let Some(msg) = Msg::from_owned_value(Value::Array(value)) { + let _ = self_.sender.send(msg); + Ok(()) + } else { + Err(()) + } + } + + Ok(Value::Push { kind, data }) => { + if kind.has_reply() { + if let Some(entry) = self_.in_flight.pop_front() { + let _ = entry.send(Ok(())); + }; + return Ok(()); + } + + if let Some(msg) = Msg::from_push_info(crate::PushInfo { kind, data }) { + let _ = self_.sender.send(msg); + Ok(()) + } else { + Err(()) + } + } + + Err(err) if err.is_unrecoverable_error() => Err(()), + + _ => { + if let Some(entry) = self_.in_flight.pop_front() { + let _ = entry.send(result.map(|_| ())); + Ok(()) + } else { + Err(()) + } + } + } + } +} + +impl Sink for PipelineSink +where + T: Sink, Error = RedisError> + Stream> + 'static, +{ + type Error = (); + + // Retrieve incoming messages and write them to the sink + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut task::Context, + ) -> Poll> { + self.as_mut() + .project() + .sink_stream + .poll_ready(cx) + .map_err(|_| ()) + 
} + + fn start_send( + mut self: Pin<&mut Self>, + PipelineMessage { input, output }: PipelineMessage, + ) -> Result<(), Self::Error> { + let self_ = self.as_mut().project(); + + match self_.sink_stream.start_send(input) { + Ok(()) => { + self_.in_flight.push_back(output); + Ok(()) + } + Err(err) => { + let _ = output.send(Err(err)); + Err(()) + } + } + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut task::Context, + ) -> Poll> { + ready!(self + .as_mut() + .project() + .sink_stream + .poll_flush(cx) + .map_err(|err| { + let _ = self.as_mut().handle_message(Err(err)); + }))?; + self.poll_read(cx) + } + + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut task::Context, + ) -> Poll> { + // No new requests will come in after the first call to `close` but we need to complete any + // in progress requests before closing + if !self.in_flight.is_empty() { + ready!(self.as_mut().poll_flush(cx))?; + } + let this = self.as_mut().project(); + + if this.sender.is_closed() { + return Poll::Ready(Ok(())); + } + + match ready!(this.sink_stream.poll_next(cx)) { + Some(result) => { + let _ = self.handle_message(result); + Poll::Pending + } + None => Poll::Ready(Ok(())), + } + } +} + +impl PubSubSink { + fn new( + sink_stream: T, + messages_sender: UnboundedSender, + ) -> (Self, impl Future) + where + T: Sink, Error = RedisError> + Stream> + Send + 'static, + T::Item: Send, + T::Error: Send, + T::Error: ::std::fmt::Debug, + { + let (sender, mut receiver) = unbounded_channel(); + let sink = PipelineSink::new(sink_stream, messages_sender); + let f = stream::poll_fn(move |cx| receiver.poll_recv(cx)) + .map(Ok) + .forward(sink) + .map(|_| ()); + (PubSubSink { sender }, f) + } + + async fn send_recv(&mut self, input: Vec) -> Result<(), RedisError> { + let (sender, receiver) = oneshot::channel(); + + self.sender + .send(PipelineMessage { + input, + output: sender, + }) + .map_err(|_| closed_connection_error())?; + match receiver.await { + Ok(result) => result, + Err(_) => 
Err(closed_connection_error()), + } + } + + /// Subscribes to a new channel. + pub async fn subscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> { + let mut cmd = cmd("SUBSCRIBE"); + cmd.arg(channel_name); + self.send_recv(cmd.get_packed_command()).await + } + + /// Unsubscribes from channel. + pub async fn unsubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> { + let mut cmd = cmd("UNSUBSCRIBE"); + cmd.arg(channel_name); + self.send_recv(cmd.get_packed_command()).await + } + + /// Subscribes to a new channel with pattern. + pub async fn psubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> { + let mut cmd = cmd("PSUBSCRIBE"); + cmd.arg(channel_pattern); + self.send_recv(cmd.get_packed_command()).await + } + + /// Unsubscribes from channel pattern. + pub async fn punsubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> { + let mut cmd = cmd("PUNSUBSCRIBE"); + cmd.arg(channel_pattern); + self.send_recv(cmd.get_packed_command()).await + } +} + +/// A connection dedicated to pubsub messages. 
+pub struct PubSub { + sink: PubSubSink, + stream: PubSubStream, +} + +async fn execute_connection_pipeline( + codec: &mut T, + (pipeline, instructions): (crate::Pipeline, ConnectionSetupComponents), +) -> RedisResult +where + T: Sink, Error = RedisError> + Stream> + 'static, + T: Send + 'static, + T::Item: Send, + T::Error: Send, + T::Error: ::std::fmt::Debug, + T: Unpin, +{ + let count = pipeline.len(); + if count == 0 { + return Ok(AuthResult::Succeeded); + } + codec.send(pipeline.get_packed_pipeline()).await?; + + let mut results = Vec::with_capacity(count); + for _ in 0..count { + let value = codec.next().await; + match value { + Some(Ok(val)) => results.push(val), + _ => return Err(closed_connection_error()), + } + } + + check_connection_setup(results, instructions) +} + +async fn setup_connection( + codec: &mut T, + connection_info: &RedisConnectionInfo, +) -> RedisResult<()> +where + T: Sink, Error = RedisError> + Stream> + 'static, + T: Send + 'static, + T::Item: Send, + T::Error: Send, + T::Error: ::std::fmt::Debug, + T: Unpin, +{ + if execute_connection_pipeline(codec, connection_setup_pipeline(connection_info, true)).await? 
+ == AuthResult::ShouldRetryWithoutUsername + { + execute_connection_pipeline(codec, connection_setup_pipeline(connection_info, false)) + .await?; + } + + Ok(()) +} + +impl PubSub { + /// Constructs a new `MultiplexedConnection` out of a `AsyncRead + AsyncWrite` object + /// and a `ConnectionInfo` + pub async fn new(connection_info: &RedisConnectionInfo, stream: C) -> RedisResult + where + C: Unpin + AsyncRead + AsyncWrite + Send + 'static, + { + #[cfg(all(not(feature = "tokio-comp"), not(feature = "async-std-comp")))] + compile_error!("tokio-comp or async-std-comp features required for aio feature"); + + let mut codec = ValueCodec::default().framed(stream); + setup_connection(&mut codec, connection_info).await?; + let (sender, receiver) = unbounded_channel(); + let (sink, driver) = PubSubSink::new(codec, sender); + let handle = Runtime::locate().spawn(driver); + let _task_handle = Some(SharedHandleContainer::new(handle)); + let stream = PubSubStream { + receiver, + _task_handle, + }; + let con = PubSub { sink, stream }; + Ok(con) + } + + /// Subscribes to a new channel. + pub async fn subscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> { + self.sink.subscribe(channel_name).await + } + + /// Unsubscribes from channel. + pub async fn unsubscribe(&mut self, channel_name: impl ToRedisArgs) -> RedisResult<()> { + self.sink.unsubscribe(channel_name).await + } + + /// Subscribes to a new channel with pattern. + pub async fn psubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> { + self.sink.psubscribe(channel_pattern).await + } + + /// Unsubscribes from channel pattern. + pub async fn punsubscribe(&mut self, channel_pattern: impl ToRedisArgs) -> RedisResult<()> { + self.sink.punsubscribe(channel_pattern).await + } + + /// Returns [`Stream`] of [`Msg`]s from this [`PubSub`]s subscriptions. + /// + /// The message itself is still generic and can be converted into an appropriate type through + /// the helper methods on it. 
+ pub fn on_message(&mut self) -> impl Stream + '_ { + &mut self.stream + } + + /// Returns [`Stream`] of [`Msg`]s from this [`PubSub`]s subscriptions consuming it. + /// + /// The message itself is still generic and can be converted into an appropriate type through + /// the helper methods on it. + /// This can be useful in cases where the stream needs to be returned or held by something other + /// than the [`PubSub`]. + pub fn into_on_message(self) -> PubSubStream { + self.stream + } + + /// Splits the PubSub into separate sink and stream components, so that subscriptions could be + /// updated through the `Sink` while concurrently waiting for new messages on the `Stream`. + pub fn split(self) -> (PubSubSink, PubSubStream) { + (self.sink, self.stream) + } +} + +impl Stream for PubSubStream { + type Item = Msg; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + self.project().receiver.poll_recv(cx) + } +} diff --git a/redis/src/aio/runtime.rs b/redis/src/aio/runtime.rs index 5755f62c9..f5f9a34e1 100644 --- a/redis/src/aio/runtime.rs +++ b/redis/src/aio/runtime.rs @@ -1,11 +1,11 @@ -use std::{io, time::Duration}; +use std::{io, sync::Arc, time::Duration}; use futures_util::Future; #[cfg(feature = "async-std-comp")] -use super::async_std; +use super::async_std as crate_async_std; #[cfg(feature = "tokio-comp")] -use super::tokio; +use super::tokio as crate_tokio; use super::RedisRuntime; use crate::types::RedisError; @@ -17,6 +17,47 @@ pub(crate) enum Runtime { AsyncStd, } +pub(crate) enum TaskHandle { + #[cfg(feature = "tokio-comp")] + Tokio(tokio::task::JoinHandle<()>), + #[cfg(feature = "async-std-comp")] + AsyncStd(async_std::task::JoinHandle<()>), +} + +pub(crate) struct HandleContainer(Option); + +impl HandleContainer { + pub(crate) fn new(handle: TaskHandle) -> Self { + Self(Some(handle)) + } +} + +impl Drop for HandleContainer { + fn drop(&mut self) { + match self.0.take() { + None => {} + #[cfg(feature = "tokio-comp")] + 
Some(TaskHandle::Tokio(handle)) => handle.abort(), + #[cfg(feature = "async-std-comp")] + Some(TaskHandle::AsyncStd(handle)) => { + // schedule for cancellation without waiting for result. + Runtime::locate().spawn(async move { handle.cancel().await.unwrap_or_default() }); + } + } + } +} + +#[derive(Clone)] +// we allow dead code here because the container isn't used directly, only in the derived drop. +#[allow(dead_code)] +pub(crate) struct SharedHandleContainer(Arc); + +impl SharedHandleContainer { + pub(crate) fn new(handle: TaskHandle) -> Self { + Self(Arc::new(HandleContainer::new(handle))) + } +} + impl Runtime { pub(crate) fn locate() -> Self { #[cfg(all(feature = "tokio-comp", not(feature = "async-std-comp")))] @@ -45,12 +86,12 @@ impl Runtime { } #[allow(dead_code)] - pub(super) fn spawn(&self, f: impl Future + Send + 'static) { + pub(crate) fn spawn(&self, f: impl Future + Send + 'static) -> TaskHandle { match self { #[cfg(feature = "tokio-comp")] - Runtime::Tokio => tokio::Tokio::spawn(f), + Runtime::Tokio => crate_tokio::Tokio::spawn(f), #[cfg(feature = "async-std-comp")] - Runtime::AsyncStd => async_std::AsyncStd::spawn(f), + Runtime::AsyncStd => crate_async_std::AsyncStd::spawn(f), } } @@ -61,11 +102,11 @@ impl Runtime { ) -> Result { match self { #[cfg(feature = "tokio-comp")] - Runtime::Tokio => ::tokio::time::timeout(duration, future) + Runtime::Tokio => tokio::time::timeout(duration, future) .await .map_err(|_| Elapsed(())), #[cfg(feature = "async-std-comp")] - Runtime::AsyncStd => ::async_std::future::timeout(duration, future) + Runtime::AsyncStd => async_std::future::timeout(duration, future) .await .map_err(|_| Elapsed(())), } diff --git a/redis/src/aio/tokio.rs b/redis/src/aio/tokio.rs index 73f3bf48f..56d5f74a3 100644 --- a/redis/src/aio/tokio.rs +++ b/redis/src/aio/tokio.rs @@ -1,4 +1,4 @@ -use super::{AsyncStream, RedisResult, RedisRuntime, SocketAddr}; +use super::{AsyncStream, RedisResult, RedisRuntime, SocketAddr, TaskHandle}; use 
async_trait::async_trait; use std::{ future::Future, @@ -174,12 +174,12 @@ impl RedisRuntime for Tokio { } #[cfg(feature = "tokio-comp")] - fn spawn(f: impl Future + Send + 'static) { - tokio::spawn(f); + fn spawn(f: impl Future + Send + 'static) -> TaskHandle { + TaskHandle::Tokio(tokio::spawn(f)) } #[cfg(not(feature = "tokio-comp"))] - fn spawn(_: impl Future + Send + 'static) { + fn spawn(_: impl Future + Send + 'static) -> TokioTaskHandle { unreachable!() } diff --git a/redis/src/client.rs b/redis/src/client.rs index 1bfcf3f43..32294cf33 100644 --- a/redis/src/client.rs +++ b/redis/src/client.rs @@ -644,10 +644,11 @@ impl Client { where T: crate::aio::RedisRuntime, { - let (connection, driver) = self + let (mut connection, driver) = self .create_multiplexed_async_connection_inner::(config) .await?; - T::spawn(driver); + let handle = T::spawn(driver); + connection.set_task_handle(handle); Ok(connection) } @@ -663,7 +664,7 @@ impl Client { { let con = self.get_simple_async_connection::().await?; crate::aio::MultiplexedConnection::new_with_config( - &self.connection_info, + &self.connection_info.redis, con, config.clone(), ) @@ -778,10 +779,21 @@ impl Client { #[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))] // TODO - do we want to type-erase pubsub using a trait, to allow us to replace it with a different implementation later? pub async fn get_async_pubsub(&self) -> RedisResult { - #[allow(deprecated)] - self.get_async_connection() - .await - .map(|connection| connection.into_pubsub()) + let connection = match Runtime::locate() { + #[cfg(feature = "tokio-comp")] + Runtime::Tokio => { + self.get_simple_async_connection::() + .await? + } + + #[cfg(feature = "async-std-comp")] + Runtime::AsyncStd => { + self.get_simple_async_connection::() + .await? + } + }; + + crate::aio::PubSub::new(&self.connection_info.redis, connection).await } /// Returns an async receiver for monitor messages. 
diff --git a/redis/src/cluster.rs b/redis/src/cluster.rs index 3c3e2640d..88eb57f53 100644 --- a/redis/src/cluster.rs +++ b/redis/src/cluster.rs @@ -1,7 +1,13 @@ //! This module extends the library to support Redis Cluster. //! -//! Note that this module does not currently provide pubsub -//! functionality. +//! The cluster connection is meant to abstract the fact that a cluster is composed of multiple nodes, +//! and to provide an API which is as close as possible to that of a single node connection. In order to do that, +//! the cluster connection maintains connections to each node in the Redis/ Valkey cluster, and can route +//! requests automatically to the relevant nodes. In cases that the cluster connection receives indications +//! that the cluster topology has changed, it will query nodes in order to find the current cluster topology. +//! If it disconnects from some nodes, it will automatically reconnect to those nodes. +//! +//! Note that pubsub & push sending functionality is not currently provided by this module. //! //! # Example //! ```rust,no_run diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 402a2c777..2ad400f44 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -1,10 +1,17 @@ -//! This module provides async functionality for Redis Cluster. +//! This module provides async functionality for connecting to Redis / Valkey Clusters. +//! +//! The cluster connection is meant to abstract the fact that a cluster is composed of multiple nodes, +//! and to provide an API which is as close as possible to that of a single node connection. In order to do that, +//! the cluster connection maintains connections to each node in the Redis/ Valkey cluster, and can route +//! requests automatically to the relevant nodes. In cases that the cluster connection receives indications +//! that the cluster topology has changed, it will query nodes in order to find the current cluster topology. +//! 
If it disconnects from some nodes, it will automatically reconnect to those nodes. +//! +//! Note that pubsub & push sending functionality is not currently provided by this module. //! //! By default, [`ClusterConnection`] makes use of [`MultiplexedConnection`] and maintains a pool //! of connections to each node in the cluster. //! -//! Also note that pubsub functionality is not currently provided by this module. -//! //! # Example //! ```rust,no_run //! use redis::cluster::ClusterClient; @@ -72,7 +79,7 @@ use std::{ mod request; mod routing; use crate::{ - aio::{ConnectionLike, MultiplexedConnection}, + aio::{ConnectionLike, MultiplexedConnection, Runtime, SharedHandleContainer}, cluster::{get_connection_info, slot_cmd}, cluster_client::ClusterParams, cluster_routing::{ @@ -80,12 +87,11 @@ use crate::{ Slot, SlotMap, }, cluster_topology::parse_slots, + types::closed_connection_error, Cmd, ConnectionInfo, ErrorKind, IntoConnectionInfo, RedisError, RedisFuture, RedisResult, Value, }; -#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] -use crate::aio::{async_std::AsyncStd, RedisRuntime}; use futures::{future::BoxFuture, prelude::*, ready}; use log::{trace, warn}; use rand::{seq::IteratorRandom, thread_rng}; @@ -97,7 +103,10 @@ use tokio::sync::{mpsc, oneshot, RwLock}; /// underlying connections maintained for each node in the cluster, as well /// as common parameters for connecting to nodes and executing commands. 
#[derive(Clone)] -pub struct ClusterConnection(mpsc::Sender>); +pub struct ClusterConnection { + sender: mpsc::Sender>, + _task_handle: SharedHandleContainer, +} impl ClusterConnection where @@ -110,19 +119,19 @@ where ClusterConnInner::new(initial_nodes, cluster_params) .await .map(|inner| { - let (tx, mut rx) = mpsc::channel::>(100); + let (sender, mut receiver) = mpsc::channel::>(100); let stream = async move { - let _ = stream::poll_fn(move |cx| rx.poll_recv(cx)) + let _ = stream::poll_fn(move |cx| receiver.poll_recv(cx)) .map(Ok) .forward(inner) .await; }; - #[cfg(feature = "tokio-comp")] - tokio::spawn(stream); - #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))] - AsyncStd::spawn(stream); + let _task_handle = SharedHandleContainer::new(Runtime::locate().spawn(stream)); - ClusterConnection(tx) + ClusterConnection { + sender, + _task_handle, + } }) } @@ -130,7 +139,7 @@ where pub async fn route_command(&mut self, cmd: &Cmd, routing: RoutingInfo) -> RedisResult { trace!("send_packed_command"); let (sender, receiver) = oneshot::channel(); - self.0 + self.sender .send(Message { cmd: CmdArg::Cmd { cmd: Arc::new(cmd.clone()), // TODO Remove this clone? @@ -168,7 +177,7 @@ where route: SingleNodeRoutingInfo, ) -> RedisResult> { let (sender, receiver) = oneshot::channel(); - self.0 + self.sender .send(Message { cmd: CmdArg::Pipeline { pipeline: Arc::new(pipeline.clone()), // TODO Remove this clone? 
@@ -179,11 +188,11 @@ where sender, }) .await - .map_err(|_| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))?; + .map_err(|_| closed_connection_error())?; receiver .await - .unwrap_or_else(|_| Err(RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))) + .unwrap_or_else(|_| Err(closed_connection_error())) .map(|response| match response { Response::Multiple(values) => values, Response::Single(_) => unreachable!(), @@ -194,6 +203,9 @@ where type ConnectionFuture = future::Shared>; type ConnectionMap = HashMap>; +/// This is the internal representation of an async Redis Cluster connection. It stores the +/// underlying connections maintained for each node in the cluster, as well +/// as common parameters for connecting to nodes and executing commands. struct InnerCore { conn_lock: RwLock<(ConnectionMap, SlotMap)>, cluster_params: ClusterParams, @@ -203,6 +215,9 @@ struct InnerCore { type Core = Arc>; +/// This is the sink for requests sent by the user. +/// It holds the stream of requests which are "in flight", E.G. on their way to the server, +/// and the inner representation of the connection. 
struct ClusterConnInner { inner: Core, state: ConnectionState, diff --git a/redis/src/commands/mod.rs b/redis/src/commands/mod.rs index d0c8244f4..e8e8e7a3b 100644 --- a/redis/src/commands/mod.rs +++ b/redis/src/commands/mod.rs @@ -2487,15 +2487,13 @@ impl ToRedisArgs for SetOptions { pub fn resp3_hello(connection_info: &RedisConnectionInfo) -> Cmd { let mut hello_cmd = cmd("HELLO"); hello_cmd.arg("3"); - if connection_info.password.is_some() { + if let Some(password) = &connection_info.password { let username: &str = match connection_info.username.as_ref() { None => "default", Some(username) => username, }; - hello_cmd - .arg("AUTH") - .arg(username) - .arg(connection_info.password.as_ref().unwrap()); + hello_cmd.arg("AUTH").arg(username).arg(password); } + hello_cmd } diff --git a/redis/src/connection.rs b/redis/src/connection.rs index dfdedf270..15180e5d5 100644 --- a/redis/src/connection.rs +++ b/redis/src/connection.rs @@ -208,7 +208,7 @@ pub struct ConnectionInfo { /// A connection address for where to connect to. pub addr: ConnectionAddr, - /// A boxed connection address for where to connect to. + /// A redis connection info for how to handshake with redis. 
pub redis: RedisConnectionInfo, } @@ -920,71 +920,207 @@ pub(crate) fn create_rustls_config( } } -fn connect_auth( - con: &mut Connection, +fn authenticate_cmd( connection_info: &RedisConnectionInfo, + check_username: bool, password: &str, -) -> RedisResult<()> { +) -> Cmd { let mut command = cmd("AUTH"); - if let Some(username) = &connection_info.username { - command.arg(username); - } - let err = match command.arg(password).query::(con) { - Ok(Value::Okay) => return Ok(()), - Ok(_) => { - fail!(( - ErrorKind::ResponseError, - "Redis server refused to authenticate, returns Ok() != Value::Okay" - )); + if check_username { + if let Some(username) = &connection_info.username { + command.arg(username); } - Err(e) => e, - }; - let err_msg = err.detail().ok_or(( - ErrorKind::AuthenticationFailed, - "Password authentication failed", - ))?; - if !err_msg.contains("wrong number of arguments for 'auth' command") { - fail!(( - ErrorKind::AuthenticationFailed, - "Password authentication failed", - )); - } - - // fallback to AUTH version <= 5 - let mut command = cmd("AUTH"); - match command.arg(password).query::(con) { - Ok(Value::Okay) => Ok(()), - _ => fail!(( - ErrorKind::AuthenticationFailed, - "Password authentication failed", - )), } + command.arg(password); + command } pub fn connect( connection_info: &ConnectionInfo, timeout: Option, ) -> RedisResult { - let con = ActualConnection::new(&connection_info.addr, timeout)?; + let con: ActualConnection = ActualConnection::new(&connection_info.addr, timeout)?; setup_connection(con, &connection_info.redis) } -#[cfg(not(feature = "disable-client-setinfo"))] -pub(crate) fn client_set_info_pipeline() -> Pipeline { - let mut pipeline = crate::pipe(); +pub(crate) struct ConnectionSetupComponents { + resp3_auth_cmd_idx: Option, + resp2_auth_cmd_idx: Option, + select_cmd_idx: Option, +} + +pub(crate) fn connection_setup_pipeline( + connection_info: &RedisConnectionInfo, + check_username: bool, +) -> (crate::Pipeline, 
ConnectionSetupComponents) { + let mut last_cmd_index = 0; + + let mut get_next_command_index = |condition| { + if condition { + last_cmd_index += 1; + Some(last_cmd_index - 1) + } else { + None + } + }; + + let authenticate_with_resp3_cmd_index = + get_next_command_index(connection_info.protocol != ProtocolVersion::RESP2); + let authenticate_with_resp2_cmd_index = get_next_command_index( + authenticate_with_resp3_cmd_index.is_none() && connection_info.password.is_some(), + ); + let select_db_cmd_index = get_next_command_index(connection_info.db != 0); + + let mut pipeline = pipe(); + + if authenticate_with_resp3_cmd_index.is_some() { + pipeline.add_command(resp3_hello(connection_info)); + } else if authenticate_with_resp2_cmd_index.is_some() { + pipeline.add_command(authenticate_cmd( + connection_info, + check_username, + connection_info.password.as_ref().unwrap(), + )); + } + + if select_db_cmd_index.is_some() { + pipeline.cmd("SELECT").arg(connection_info.db); + } + + // result is ignored, as per the command's instructions. 
+ // https://redis.io/commands/client-setinfo/ + #[cfg(not(feature = "disable-client-setinfo"))] pipeline .cmd("CLIENT") .arg("SETINFO") .arg("LIB-NAME") .arg("redis-rs") .ignore(); + #[cfg(not(feature = "disable-client-setinfo"))] pipeline .cmd("CLIENT") .arg("SETINFO") .arg("LIB-VER") .arg(env!("CARGO_PKG_VERSION")) .ignore(); - pipeline + + ( + pipeline, + ConnectionSetupComponents { + resp3_auth_cmd_idx: authenticate_with_resp3_cmd_index, + resp2_auth_cmd_idx: authenticate_with_resp2_cmd_index, + select_cmd_idx: select_db_cmd_index, + }, + ) +} + +fn check_resp3_auth(result: &Value) -> RedisResult<()> { + if let Value::ServerError(err) = result { + return Err(get_resp3_hello_command_error(err.clone().into())); + } + Ok(()) +} + +#[derive(PartialEq)] +pub(crate) enum AuthResult { + Succeeded, + ShouldRetryWithoutUsername, +} + +fn check_resp2_auth(result: &Value) -> RedisResult { + let err = match result { + Value::Okay => { + return Ok(AuthResult::Succeeded); + } + Value::ServerError(err) => err, + _ => { + return Err(( + ErrorKind::ResponseError, + "Redis server refused to authenticate, returns Ok() != Value::Okay", + ) + .into()); + } + }; + + let err_msg = err.details().ok_or(( + ErrorKind::AuthenticationFailed, + "Password authentication failed", + ))?; + if !err_msg.contains("wrong number of arguments for 'auth' command") { + return Err(( + ErrorKind::AuthenticationFailed, + "Password authentication failed", + ) + .into()); + } + Ok(AuthResult::ShouldRetryWithoutUsername) +} + +fn check_db_select(value: &Value) -> RedisResult<()> { + let Value::ServerError(err) = value else { + return Ok(()); + }; + + match err.details() { + Some(err_msg) => Err(( + ErrorKind::ResponseError, + "Redis server refused to switch database", + err_msg.to_string(), + ) + .into()), + None => Err(( + ErrorKind::ResponseError, + "Redis server refused to switch database", + ) + .into()), + } +} + +pub(crate) fn check_connection_setup( + results: Vec, + ConnectionSetupComponents { + 
resp3_auth_cmd_idx, + resp2_auth_cmd_idx, + select_cmd_idx, + }: ConnectionSetupComponents, +) -> RedisResult { + // can't have both values set + assert!(!(resp2_auth_cmd_idx.is_some() && resp3_auth_cmd_idx.is_some())); + + if let Some(index) = resp3_auth_cmd_idx { + let Some(value) = results.get(index) else { + return Err((ErrorKind::ClientError, "Missing RESP3 auth response").into()); + }; + check_resp3_auth(value)?; + } else if let Some(index) = resp2_auth_cmd_idx { + let Some(value) = results.get(index) else { + return Err((ErrorKind::ClientError, "Missing RESP2 auth response").into()); + }; + if check_resp2_auth(value)? == AuthResult::ShouldRetryWithoutUsername { + return Ok(AuthResult::ShouldRetryWithoutUsername); + } + } + + if let Some(index) = select_cmd_idx { + let Some(value) = results.get(index) else { + return Err((ErrorKind::ClientError, "Missing SELECT DB response").into()); + }; + check_db_select(value)?; + } + + Ok(AuthResult::Succeeded) +} + +fn execute_connection_pipeline( + rv: &mut Connection, + (pipeline, instructions): (crate::Pipeline, ConnectionSetupComponents), +) -> RedisResult { + if pipeline.len() == 0 { + return Ok(AuthResult::Succeeded); + } + let results = rv.req_packed_commands(&pipeline.get_packed_pipeline(), 0, pipeline.len())?; + + check_connection_setup(results, instructions) } fn setup_connection( @@ -1000,33 +1136,12 @@ fn setup_connection( push_sender: None, }; - if connection_info.protocol != ProtocolVersion::RESP2 { - let hello_cmd = resp3_hello(connection_info); - let val: RedisResult = hello_cmd.query(&mut rv); - if let Err(err) = val { - return Err(get_resp3_hello_command_error(err)); - } - } else if let Some(password) = &connection_info.password { - connect_auth(&mut rv, connection_info, password)?; - } - if connection_info.db != 0 { - match cmd("SELECT") - .arg(connection_info.db) - .query::(&mut rv) - { - Ok(Value::Okay) => {} - _ => fail!(( - ErrorKind::ResponseError, - "Redis server refused to switch database" - )), 
- } + if execute_connection_pipeline(&mut rv, connection_setup_pipeline(connection_info, true))? + == AuthResult::ShouldRetryWithoutUsername + { + execute_connection_pipeline(&mut rv, connection_setup_pipeline(connection_info, false))?; } - // result is ignored, as per the command's instructions. - // https://redis.io/commands/client-setinfo/ - #[cfg(not(feature = "disable-client-setinfo"))] - let _: RedisResult<()> = client_set_info_pipeline().query(&mut rv); - Ok(rv) } diff --git a/redis/src/lib.rs b/redis/src/lib.rs index 6f292a72e..d25fbaec2 100644 --- a/redis/src/lib.rs +++ b/redis/src/lib.rs @@ -281,10 +281,8 @@ //! //! # PubSub //! -//! Pubsub is currently work in progress but provided through the `PubSub` -//! connection object. Due to the fact that Rust does not have support -//! for async IO in libnative yet, the API does not provide a way to -//! read messages with any form of timeout yet. +//! Pubsub is provided through the `PubSub` connection object for sync usage, or the `aio::PubSub` +//! for async usage. //! //! Example usage: //! @@ -303,6 +301,24 @@ //! } //! # } //! ``` +//! In order to update subscriptions while concurrently waiting for messages, the async PubSub can be split into separate sink & stream components. The sink can be receive subscription requests while the stream is awaited for messages. +//! +//! ```rust,no_run +//! # #[cfg(feature = "aio")] +//! # use futures_util::StreamExt; +//! # #[cfg(feature = "aio")] +//! # async fn do_something() -> redis::RedisResult<()> { +//! let client = redis::Client::open("redis://127.0.0.1/")?; +//! let (mut sink, mut stream) = client.get_async_pubsub().await?.split(); +//! sink.subscribe("channel_1").await?; +//! +//! loop { +//! let msg = stream.next().await.unwrap(); +//! let payload : String = msg.get_payload().unwrap(); +//! println!("channel '{}': {}", msg.get_channel_name(), payload); +//! } +//! # Ok(()) } +//! ``` //! //! ## RESP3 async pubsub //! 
If you're targeting a Redis/Valkey server of version 6 or above, you can receive diff --git a/redis/src/pipeline.rs b/redis/src/pipeline.rs index e809b1e06..08bc25154 100644 --- a/redis/src/pipeline.rs +++ b/redis/src/pipeline.rs @@ -80,6 +80,10 @@ impl Pipeline { write_pipeline(out, &self.commands, self.transaction_mode) } + pub(crate) fn len(&self) -> usize { + self.commands.len() + } + fn execute_pipelined(&self, con: &mut dyn ConnectionLike) -> RedisResult { self.make_pipeline_results(con.req_packed_commands( &encode_pipeline(&self.commands, false), diff --git a/redis/src/r2d2.rs b/redis/src/r2d2.rs index 79cc22dd4..f26b559ea 100644 --- a/redis/src/r2d2.rs +++ b/redis/src/r2d2.rs @@ -1,6 +1,6 @@ -use std::io; - +use crate::sentinel::LockedSentinelClient; use crate::{ConnectionLike, RedisError}; +use std::io; macro_rules! impl_manage_connection { ($client:ty, $connection:ty) => { @@ -34,3 +34,6 @@ impl_manage_connection!( crate::cluster::ClusterClient, crate::cluster::ClusterConnection ); + +#[cfg(feature = "sentinel")] +impl_manage_connection!(LockedSentinelClient, crate::Connection); diff --git a/redis/src/sentinel.rs b/redis/src/sentinel.rs index ade86e344..c2c3de406 100644 --- a/redis/src/sentinel.rs +++ b/redis/src/sentinel.rs @@ -103,16 +103,19 @@ //! ``` //! 
-use std::{collections::HashMap, num::NonZeroUsize};
-
 #[cfg(feature = "aio")]
 use futures_util::StreamExt;
 use rand::Rng;
+#[cfg(feature = "r2d2")]
+use std::sync::Mutex;
+use std::{collections::HashMap, num::NonZeroUsize};
 
 #[cfg(feature = "aio")]
 use crate::aio::MultiplexedConnection as AsyncConnection;
+#[cfg(feature = "aio")]
 use crate::client::AsyncConnectionConfig;
+
 use crate::{
     connection::ConnectionInfo, types::RedisResult, Client, Cmd, Connection, ErrorKind,
     FromRedisValue, IntoConnectionInfo, RedisConnectionInfo, TlsMode, Value,
@@ -691,6 +694,26 @@ pub enum SentinelServerType {
     Replica,
 }
 
+/// LockedSentinelClient is a wrapper around SentinelClient since
+/// SentinelClient requires a &mut ref for get_connection()
+/// and we can't use it like this inside the r2d2 Manager
+#[cfg(feature = "r2d2")]
+pub struct LockedSentinelClient(pub(crate) Mutex<SentinelClient>);
+
+#[cfg(feature = "r2d2")]
+impl LockedSentinelClient {
+    /// new creates a LockedSentinelClient by wrapping a new Mutex around the SentinelClient
+    pub fn new(client: SentinelClient) -> Self {
+        LockedSentinelClient(Mutex::new(client))
+    }
+
+    /// get_connection is the override for LockedSentinelClient to make it possible to
+    /// use LockedSentinelClient with r2d2 macro.
+    pub fn get_connection(&self) -> RedisResult<Connection> {
+        self.0.lock().unwrap().get_connection()
+    }
+}
+
 /// An alternative to the Client type which creates connections from clients created
 /// on-demand based on information fetched from the sentinels. Uses the Sentinel type
 /// internally. This is basic an utility to help make it easier to use sentinels but
diff --git a/redis/src/streams.rs b/redis/src/streams.rs
index dd2df0b65..fd8710e28 100644
--- a/redis/src/streams.rs
+++ b/redis/src/streams.rs
@@ -450,6 +450,12 @@ pub struct StreamInfoGroup {
     pub pending: usize,
     /// Last ID delivered to this group.
pub last_delivered_id: String, + /// The logical "read counter" of the last entry delivered to group's consumers + /// (or `None` if the server does not provide the value). + pub entries_read: Option, + /// The number of entries in the stream that are still waiting to be delivered to the + /// group's consumers, or a `None` when that number can't be determined. + pub lag: Option, } /// Represents a pending message parsed from [`xpending`] methods. @@ -781,6 +787,20 @@ impl FromRedisValue for StreamInfoGroupsReply { if let Some(v) = &map.get("last-delivered-id") { g.last_delivered_id = from_redis_value(v)?; } + if let Some(v) = &map.get("entries-read") { + g.entries_read = if let Value::Nil = v { + None + } else { + Some(from_redis_value(v)?) + }; + } + if let Some(v) = &map.get("lag") { + g.lag = if let Value::Nil = v { + None + } else { + Some(from_redis_value(v)?) + }; + } reply.groups.push(g); } Ok(reply) diff --git a/redis/src/types.rs b/redis/src/types.rs index 49816ee1c..c610f7cb0 100644 --- a/redis/src/types.rs +++ b/redis/src/types.rs @@ -1499,23 +1499,23 @@ macro_rules! deref_to_write_redis_args_impl { } deref_to_write_redis_args_impl! { - <'a, T: ?Sized> ToRedisArgs for &'a T where T: ToRedisArgs + <'a, T> ToRedisArgs for &'a T where T: ToRedisArgs } deref_to_write_redis_args_impl! { - <'a, T: ?Sized> ToRedisArgs for &'a mut T where T: ToRedisArgs + <'a, T> ToRedisArgs for &'a mut T where T: ToRedisArgs } deref_to_write_redis_args_impl! { - ToRedisArgs for Box where T: ToRedisArgs + ToRedisArgs for Box where T: ToRedisArgs } deref_to_write_redis_args_impl! { - ToRedisArgs for std::sync::Arc where T: ToRedisArgs + ToRedisArgs for std::sync::Arc where T: ToRedisArgs } deref_to_write_redis_args_impl! 
{ - ToRedisArgs for std::rc::Rc where T: ToRedisArgs + ToRedisArgs for std::rc::Rc where T: ToRedisArgs } /// @note: Redis cannot store empty sets so the application has to @@ -1678,7 +1678,7 @@ fn vec_to_array(items: Vec, original_value: &Value) -> Red } } -impl FromRedisValue for [T; N] { +impl FromRedisValue for [T; N] { fn from_redis_value(value: &Value) -> RedisResult<[T; N]> { match *value { Value::BulkString(ref bytes) => match FromRedisValue::from_byte_vec(bytes) { @@ -1983,7 +1983,7 @@ macro_rules! pointer_from_redis_value_impl { $id:ident, $ty:ty, $func:expr ) => { $(#[$attr])* - impl<$id: ?Sized + FromRedisValue> FromRedisValue for $ty { + impl<$id: FromRedisValue> FromRedisValue for $ty { fn from_redis_value(v: &Value) -> RedisResult { FromRedisValue::from_redis_value(v).map($func) @@ -2565,3 +2565,9 @@ pub struct PushInfo { pub(crate) type AsyncPushSender = tokio::sync::mpsc::UnboundedSender; pub(crate) type SyncPushSender = std::sync::mpsc::Sender; + +// A consistent error value for connections closed without a reason. 
+#[cfg(feature = "aio")] +pub(crate) fn closed_connection_error() -> RedisError { + RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)) +} diff --git a/redis/tests/support/mod.rs b/redis/tests/support/mod.rs index ec11f5dad..572057156 100644 --- a/redis/tests/support/mod.rs +++ b/redis/tests/support/mod.rs @@ -508,6 +508,12 @@ impl TestContext { self.client.get_connection().unwrap() } + #[cfg(feature = "aio")] + #[allow(deprecated)] + pub async fn deprecated_async_connection(&self) -> redis::RedisResult { + self.client.get_async_connection().await + } + #[cfg(feature = "aio")] pub async fn async_connection(&self) -> RedisResult { self.client.get_multiplexed_async_connection().await @@ -655,7 +661,7 @@ pub fn build_keys_and_certs_for_tls(tempdir: &TempDir) -> TlsFilePaths { .arg("genrsa") .arg("-out") .arg(name) - .arg(&format!("{size}")) + .arg(format!("{size}")) .stdout(process::Stdio::piped()) .stderr(process::Stdio::piped()) .spawn() diff --git a/redis/tests/test_async.rs b/redis/tests/test_async.rs index a7ebe5cbd..d24633a8d 100644 --- a/redis/tests/test_async.rs +++ b/redis/tests/test_async.rs @@ -5,6 +5,7 @@ mod basic_async { use std::{collections::HashMap, time::Duration}; use futures::{prelude::*, StreamExt}; + use futures_time::task::sleep; #[cfg(feature = "connection-manager")] use redis::aio::ConnectionManager; use redis::{ @@ -671,6 +672,8 @@ mod basic_async { mod pub_sub { use std::time::Duration; + use futures_time::task::sleep; + use super::*; #[test] @@ -686,6 +689,30 @@ mod basic_async { let msg_payload: String = pubsub_stream.next().await.unwrap().get_payload()?; assert_eq!("banana".to_string(), msg_payload); + Ok(()) + }) + .unwrap(); + } + + #[test] + fn pub_sub_subscription_to_multiple_channels() { + use redis::RedisError; + + let ctx = TestContext::new(); + block_on_all(async move { + let mut pubsub_conn = ctx.async_pubsub().await?; + let _: () = pubsub_conn.subscribe(&["phonewave", "foo", "bar"]).await?; + let mut pubsub_stream = 
pubsub_conn.on_message(); + let mut publish_conn = ctx.async_connection().await?; + let _: () = publish_conn.publish("phonewave", "banana").await?; + + let msg_payload: String = pubsub_stream.next().await.unwrap().get_payload()?; + assert_eq!("banana".to_string(), msg_payload); + + let _: () = publish_conn.publish("foo", "foobar").await?; + let msg_payload: String = pubsub_stream.next().await.unwrap().get_payload()?; + assert_eq!("foobar".to_string(), msg_payload); + Ok::<_, RedisError>(()) }) .unwrap(); @@ -710,7 +737,62 @@ mod basic_async { let subscription_count = *subscriptions_counts.get(SUBSCRIPTION_KEY).unwrap(); assert_eq!(subscription_count, 0); - Ok::<_, RedisError>(()) + Ok(()) + }) + .unwrap(); + } + + #[test] + fn can_receive_messages_while_sending_requests_from_split_pub_sub() { + let ctx = TestContext::new(); + block_on_all(async move { + let (mut sink, mut stream) = ctx.async_pubsub().await?.split(); + let mut publish_conn = ctx.async_connection().await?; + let spawned_read = tokio::spawn(async move { stream.next().await }); + + let _: () = sink.subscribe("phonewave").await?; + let _: () = publish_conn.publish("phonewave", "banana").await?; + + let message: String = spawned_read.await.unwrap().unwrap().get_payload().unwrap(); + + assert_eq!("banana".to_string(), message); + + Ok(()) + }) + .unwrap(); + } + + #[test] + fn can_receive_messages_from_split_pub_sub_after_sink_was_dropped() { + let ctx = TestContext::new(); + block_on_all(async move { + let (mut sink, mut stream) = ctx.async_pubsub().await?.split(); + let mut publish_conn = ctx.async_connection().await?; + let spawned_read = tokio::spawn(async move { stream.next().await }); + + let _: () = sink.subscribe("phonewave").await?; + drop(sink); + let _: () = publish_conn.publish("phonewave", "banana").await?; + + let message: String = spawned_read.await.unwrap().unwrap().get_payload().unwrap(); + + assert_eq!("banana".to_string(), message); + + Ok(()) + }) + .unwrap(); + } + + #[test] + fn 
cannot_subscribe_on_split_pub_sub_after_stream_was_dropped() { + let ctx = TestContext::new(); + block_on_all(async move { + let (mut sink, stream) = ctx.async_pubsub().await?.split(); + drop(stream); + + assert!(sink.subscribe("phonewave").await.is_err()); + + Ok(()) }) .unwrap(); } @@ -748,11 +830,57 @@ mod basic_async { .unwrap(); } + #[test] + fn automatic_unsubscription_on_split() { + const SUBSCRIPTION_KEY: &str = "phonewave-automatic-unsubscription-on-split"; + + let ctx = TestContext::new(); + block_on_all(async move { + let (mut sink, stream) = ctx.async_pubsub().await?.split(); + sink.subscribe(SUBSCRIPTION_KEY).await?; + let mut conn = ctx.async_connection().await?; + sleep(Duration::from_millis(100).into()).await; + + let subscriptions_counts: HashMap = redis::cmd("PUBSUB") + .arg("NUMSUB") + .arg(SUBSCRIPTION_KEY) + .query_async(&mut conn) + .await?; + let mut subscription_count = *subscriptions_counts.get(SUBSCRIPTION_KEY).unwrap(); + assert_eq!(subscription_count, 1); + + drop(stream); + + // Allow for the unsubscription to occur within 5 seconds + for _ in 0..100 { + let subscriptions_counts: HashMap = redis::cmd("PUBSUB") + .arg("NUMSUB") + .arg(SUBSCRIPTION_KEY) + .query_async(&mut conn) + .await?; + subscription_count = *subscriptions_counts.get(SUBSCRIPTION_KEY).unwrap(); + if subscription_count == 0 { + break; + } + + sleep(Duration::from_millis(50).into()).await; + } + assert_eq!(subscription_count, 0); + + // verify that the sink is unusable after the stream is dropped. 
+ let err = sink.subscribe(SUBSCRIPTION_KEY).await.unwrap_err(); + assert!(err.is_unrecoverable_error(), "{err:?}"); + + Ok::<_, RedisError>(()) + }) + .unwrap(); + } + #[test] fn pub_sub_conn_reuse() { let ctx = TestContext::new(); block_on_all(async move { - let mut pubsub_conn = ctx.async_pubsub().await?; + let mut pubsub_conn = ctx.deprecated_async_connection().await?.into_pubsub(); pubsub_conn.subscribe("phonewave").await?; pubsub_conn.psubscribe("*").await?; @@ -777,13 +905,11 @@ mod basic_async { test_with_all_connection_types(|mut conn| async move { conn.lpush::<&str, &str, ()>("key", "value").await?; - let res: Result<(String, usize), redis::RedisError> = redis::pipe() + redis::pipe() .get("key") // WRONGTYPE .llen("key") - .query_async(&mut conn) - .await; - - assert!(res.is_err()); + .exec_async(&mut conn) + .await.unwrap_err(); let list: Vec = conn.lrange("key", 0, -1).await?; @@ -793,6 +919,39 @@ mod basic_async { }); } + #[test] + fn multiplexed_pub_sub_subscribe_on_multiple_channels() { + let ctx = TestContext::new(); + if ctx.protocol == ProtocolVersion::RESP2 { + return; + } + block_on_all(async move { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let config = redis::AsyncConnectionConfig::new().set_push_sender(tx); + let mut conn = ctx + .client + .get_multiplexed_async_connection_with_config(&config) + .await?; + let _: () = conn.subscribe(&["phonewave", "foo", "bar"]).await?; + let mut publish_conn = ctx.async_connection().await?; + + let msg_payload = rx.recv().await.unwrap(); + assert_eq!(msg_payload.kind, PushKind::Subscribe); + + let _: () = publish_conn.publish("foo", "foobar").await?; + + let msg_payload = rx.recv().await.unwrap(); + assert_eq!(msg_payload.kind, PushKind::Subscribe); + let msg_payload = rx.recv().await.unwrap(); + assert_eq!(msg_payload.kind, PushKind::Subscribe); + let msg_payload = rx.recv().await.unwrap(); + assert_eq!(msg_payload.kind, PushKind::Message); + + Ok(()) + }) + .unwrap(); + } + #[test] fn 
pub_sub_multiple() { use redis::RedisError; @@ -986,6 +1145,47 @@ mod basic_async { .unwrap(); } + #[test] + fn test_multiplexed_connection_kills_connection_on_drop_even_when_blocking() { + let ctx = TestContext::new(); + block_on_all(async move { + let mut conn = ctx.async_connection().await.unwrap(); + let mut connection_to_dispose_of = ctx.async_connection().await.unwrap(); + connection_to_dispose_of.set_response_timeout(Duration::from_millis(1)); + + async fn count_ids(conn: &mut impl redis::aio::ConnectionLike) -> RedisResult { + let initial_connections: String = + cmd("CLIENT").arg("LIST").query_async(conn).await?; + + Ok(initial_connections + .as_bytes() + .windows(3) + .filter(|substr| substr == b"id=") + .count()) + } + + assert_eq!(count_ids(&mut conn).await.unwrap(), 2); + + let command_that_blocks = cmd("BLPOP") + .arg("foo") + .arg(0) + .exec_async(&mut connection_to_dispose_of) + .await; + + let err = command_that_blocks.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::IoError); + + drop(connection_to_dispose_of); + + sleep(Duration::from_millis(10).into()).await; + + assert_eq!(count_ids(&mut conn).await.unwrap(), 1); + + Ok(()) + }) + .unwrap(); + } + #[cfg(feature = "tls-rustls")] mod mtls_test { use super::*; @@ -1085,4 +1285,25 @@ mod basic_async { }) .unwrap(); } + + #[test] + fn test_select_db() { + let ctx = TestContext::new(); + let mut connection_info = ctx.client.get_connection_info().clone(); + connection_info.redis.db = 5; + let client = redis::Client::open(connection_info).unwrap(); + block_on_all(async move { + let mut connection = client.get_multiplexed_async_connection().await.unwrap(); + + let info: String = redis::cmd("CLIENT") + .arg("info") + .query_async(&mut connection) + .await + .unwrap(); + assert!(info.contains("db=5")); + + Ok(()) + }) + .unwrap(); + } } diff --git a/redis/tests/test_basic.rs b/redis/tests/test_basic.rs index 12f279300..54ddf3ca6 100644 --- a/redis/tests/test_basic.rs +++ b/redis/tests/test_basic.rs @@ 
-880,6 +880,23 @@ mod basic { } } + #[test] + fn pub_sub_subscription_to_multiple_channels() { + let ctx = TestContext::new(); + let mut conn = ctx.connection(); + let mut pubsub_conn = conn.as_pubsub(); + pubsub_conn.subscribe(&["phonewave", "foo", "bar"]).unwrap(); + let mut publish_conn = ctx.connection(); + + let _: () = publish_conn.publish("phonewave", "banana").unwrap(); + let msg_payload: String = pubsub_conn.get_message().unwrap().get_payload().unwrap(); + assert_eq!("banana".to_string(), msg_payload); + + let _: () = publish_conn.publish("foo", "foobar").unwrap(); + let msg_payload: String = pubsub_conn.get_message().unwrap().get_payload().unwrap(); + assert_eq!("foobar".to_string(), msg_payload); + } + #[test] fn test_pubsub_unsubscribe() { let ctx = TestContext::new(); @@ -1873,4 +1890,18 @@ mod basic { (kind, data) ); } + + #[test] + fn test_select_db() { + let ctx = TestContext::new(); + let mut connection_info = ctx.client.get_connection_info().clone(); + connection_info.redis.db = 5; + let client = redis::Client::open(connection_info).unwrap(); + let mut connection = client.get_connection().unwrap(); + let info: String = redis::cmd("CLIENT") + .arg("info") + .query(&mut connection) + .unwrap(); + assert!(info.contains("db=5")); + } } diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index 4c6c6bbb5..f4f8b62ca 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -19,8 +19,9 @@ mod cluster_async { cluster::ClusterClient, cluster_async::Connect, cluster_routing::{MultipleNodeRoutingInfo, RoutingInfo, SingleNodeRoutingInfo}, - cmd, from_owned_redis_value, parse_redis_value, AsyncCommands, Cmd, ErrorKind, InfoDict, - IntoConnectionInfo, ProtocolVersion, RedisError, RedisFuture, RedisResult, Script, Value, + cmd, from_owned_redis_value, parse_redis_value, pipe, AsyncCommands, Cmd, ErrorKind, + InfoDict, IntoConnectionInfo, ProtocolVersion, RedisError, RedisFuture, RedisResult, + Script, 
Value, }; use crate::support::*; @@ -2123,6 +2124,63 @@ mod cluster_async { assert_eq!(ping_attempts.load(Ordering::Acquire), 5); } + #[test] + fn test_kill_connection_on_drop_even_when_blocking() { + let ctx = TestClusterContext::new_with_cluster_client_builder(|builder| builder.retries(3)); + + block_on_all(async move { + async fn count_ids(conn: &mut impl redis::aio::ConnectionLike) -> RedisResult { + // we use a pipeline with a fake command in order to ensure that the CLIENT LIST command gets routed to the correct node. + // we use LIST as the key, in order to ensure that adding CLIENT LIST doesn't trigger a CROSSSLOTS error. + let initial_connections: String = pipe() + .cmd("GET") + .arg("LIST") + .cmd("CLIENT") + .arg("LIST") + .query_async::>>(conn) + .await? + .pop() + .unwrap() + .unwrap(); + + Ok(initial_connections + .as_bytes() + .windows(3) + .filter(|substr| substr == b"id=") + .count()) + } + + let mut conn = ctx.async_connection().await; + let mut connection_to_dispose_of = ctx.async_connection().await; + + assert_eq!(count_ids(&mut conn).await.unwrap(), 2); + + let mut cmd = cmd("BLPOP"); + let command_that_blocks = Box::pin(async move { + () = cmd + .arg("LIST") + .arg(0) + .exec_async(&mut connection_to_dispose_of) + .await + .unwrap(); + unreachable!("This shouldn't happen"); + }) + .fuse(); + let timeout = + futures_time::task::sleep(futures_time::time::Duration::from_millis(1)).fuse(); + + let others = futures::future::select(command_that_blocks, timeout).await; + drop(others); + + futures_time::task::sleep(futures_time::time::Duration::from_millis(100)).await; + + assert_eq!(count_ids(&mut conn).await.unwrap(), 1); + + Ok(()) + }) + .unwrap(); + } + #[cfg(feature = "tls-rustls")] mod mtls_test { use crate::support::mtls_test::create_cluster_client_from_cluster; diff --git a/redis/tests/test_sentinel.rs b/redis/tests/test_sentinel.rs index b03ddb28a..357c62d56 100644 --- a/redis/tests/test_sentinel.rs +++ b/redis/tests/test_sentinel.rs @@ 
-10,6 +10,14 @@ use redis::{ use crate::support::*; +fn parse_client_info(value: &str) -> HashMap<&str, &str> { + let info_map: std::collections::HashMap<&str, &str> = value + .split(" ") + .filter_map(|line| line.split_once('=')) + .collect(); + info_map +} + fn parse_replication_info(value: &str) -> HashMap<&str, &str> { let info_map: std::collections::HashMap<&str, &str> = value .split("\r\n") @@ -650,3 +658,62 @@ pub mod async_tests { .unwrap(); } } + +#[cfg(feature = "r2d2")] +pub mod pool_tests { + use crate::support::TestSentinelContext; + use crate::{assert_is_connection_to_master, parse_client_info}; + use r2d2::Pool; + use redis::sentinel::{LockedSentinelClient, SentinelClient}; + use std::collections::HashSet; + use std::ops::DerefMut; + + #[test] + fn test_sentinel_client() { + let master_name = "master1"; + let context = TestSentinelContext::new(2, 3, 3); + let master_client = SentinelClient::build( + context.sentinels_connection_info().clone(), + String::from(master_name), + Some(context.sentinel_node_connection_info()), + redis::sentinel::SentinelServerType::Master, + ) + .unwrap(); + + let pool = Pool::builder() + .max_size(5) + .build(LockedSentinelClient::new(master_client)) + .unwrap(); + + let mut conns = Vec::new(); + for _ in 0..5 { + let conn = pool.get().unwrap(); + + conns.push(conn); + } + + // since max_size is 5 and we haven't freed any connection this try should fail + let try_conn = pool.try_get(); + assert!(try_conn.is_none()); + + let mut client_id_set = HashSet::new(); + + for mut conn in conns { + let client_info_str: String = redis::cmd("CLIENT") + .arg("INFO") + .query(conn.deref_mut()) + .unwrap(); + + let client_info_parsed = parse_client_info(client_info_str.as_str()); + + // assert if all connections have different IDs + assert!(client_id_set.insert(client_info_parsed.get("id").unwrap().to_string())); + + assert_is_connection_to_master(conn.deref_mut()); + } + + // since previous connections are freed, this should work + let 
try_conn = pool.try_get(); + assert!(try_conn.is_some()); + } +} diff --git a/redis/tests/test_streams.rs b/redis/tests/test_streams.rs index 776fee528..526b6db85 100644 --- a/redis/tests/test_streams.rs +++ b/redis/tests/test_streams.rs @@ -281,12 +281,26 @@ fn test_assorted_2() { assert_eq!(&reply.groups.len(), &1); assert_eq!(&reply.groups[0].name, &"g99"); assert_eq!(&reply.groups[0].last_delivered_id, &"0-0"); + if let Some(lag) = reply.groups[0].lag { + assert_eq!(lag, 0); + } // call xadd on k99 just so we can read from it // using consumer g99 and test xinfo_consumers let _: RedisResult = con.xadd("k99", "1000-0", &[("a", "b"), ("c", "d")]); let _: RedisResult = con.xadd("k99", "1000-1", &[("e", "f"), ("g", "h")]); + // Two messages have been added but not acked: + // this should give us a `lag` of 2 (if the server supports it) + let result: RedisResult = con.xinfo_groups("k99"); + assert!(result.is_ok()); + let reply = result.unwrap(); + assert_eq!(&reply.groups.len(), &1); + assert_eq!(&reply.groups[0].name, &"g99"); + if let Some(lag) = reply.groups[0].lag { + assert_eq!(lag, 2); + } + // test empty PEL let empty_reply: StreamPendingReply = con.xpending("k99", "g99").unwrap();