rustis/client/
pipeline.rs

1#[cfg(feature = "redis-graph")]
2use crate::commands::GraphCommands;
3#[cfg(feature = "redis-json")]
4use crate::commands::JsonCommands;
5#[cfg(feature = "redis-search")]
6use crate::commands::SearchCommands;
7#[cfg(feature = "redis-time-series")]
8use crate::commands::TimeSeriesCommands;
9#[cfg(feature = "redis-bloom")]
10use crate::commands::{
11    BloomCommands, CountMinSketchCommands, CuckooCommands, TDigestCommands, TopKCommands,
12};
13use crate::{
14    client::{Client, PreparedCommand},
15    commands::{
16        BitmapCommands, ClusterCommands, ConnectionCommands, GenericCommands, GeoCommands,
17        HashCommands, HyperLogLogCommands, ListCommands, ScriptingCommands, ServerCommands,
18        SetCommands, SortedSetCommands, StreamCommands, StringCommands,
19    },
20    resp::{Command, RespBatchDeserializer, Response},
21    Result,
22};
23use serde::de::DeserializeOwned;
24use std::iter::zip;
25
26/// Represents a Redis command pipeline.
27pub struct Pipeline<'a> {
28    client: &'a Client,
29    commands: Vec<Command>,
30    forget_flags: Vec<bool>,
31    retry_on_error: Option<bool>,
32}
33
34impl<'a> Pipeline<'a> {
35    pub(crate) fn new(client: &'a Client) -> Pipeline {
36        Pipeline {
37            client,
38            commands: Vec::new(),
39            forget_flags: Vec::new(),
40            retry_on_error: None,
41        }
42    }
43    /// Set a flag to override default `retry_on_error` behavior.
44    ///
45    /// See [Config::retry_on_error](crate::client::Config::retry_on_error)
46    pub fn retry_on_error(&mut self, retry_on_error: bool) {
47        self.retry_on_error = Some(retry_on_error);
48    }
49
50    /// Queue a command
51    pub fn queue(&mut self, command: Command) {
52        self.commands.push(command);
53        self.forget_flags.push(false);
54    }
55
56    /// Queue a command and forget its response
57    pub fn forget(&mut self, command: Command) {
58        self.commands.push(command);
59        self.forget_flags.push(true);
60    }
61
62    /// Execute the pipeline by the sending the queued command
63    /// as a whole batch to the Redis server.
64    ///
65    /// # Return
66    /// It is the caller responsability to use the right type to cast the server response
67    /// to the right tuple or collection depending on which command has been
68    /// [queued](BatchPreparedCommand::queue) or [forgotten](BatchPreparedCommand::forget).
69    ///
70    /// The most generic type that can be requested as a result is `Vec<resp::Value>`
71    ///
72    /// # Example
73    /// ```
74    /// use rustis::{
75    ///     client::{Client, Pipeline, BatchPreparedCommand},
76    ///     commands::StringCommands,
77    ///     resp::{cmd, Value}, Result,
78    /// };
79    ///
80    /// #[cfg_attr(feature = "tokio-runtime", tokio::main)]
81    /// #[cfg_attr(feature = "async-std-runtime", async_std::main)]
82    /// async fn main() -> Result<()> {
83    ///     let client = Client::connect("127.0.0.1:6379").await?;
84    ///
85    ///     let mut pipeline = client.create_pipeline();
86    ///     pipeline.set("key1", "value1").forget();
87    ///     pipeline.set("key2", "value2").forget();
88    ///     pipeline.get::<_, String>("key1").queue();
89    ///     pipeline.get::<_, String>("key2").queue();
90    ///
91    ///     let (value1, value2): (String, String) = pipeline.execute().await?;
92    ///     assert_eq!("value1", value1);
93    ///     assert_eq!("value2", value2);
94    ///
95    ///     Ok(())
96    /// }
97    /// ```    
98    pub async fn execute<T: DeserializeOwned>(self) -> Result<T> {
99        let num_commands = self.commands.len();
100        let results = self
101            .client
102            .send_batch(self.commands, self.retry_on_error)
103            .await?;
104
105        if num_commands > 1 {
106            let mut filtered_results = zip(results, self.forget_flags.iter())
107                .filter_map(|(value, forget_flag)| if *forget_flag { None } else { Some(value) })
108                .collect::<Vec<_>>();
109
110            if filtered_results.len() == 1 {
111                let result = filtered_results.pop().unwrap();
112                result.to()
113            } else {
114                let deserializer = RespBatchDeserializer::new(&filtered_results);
115                T::deserialize(&deserializer)
116            }
117        } else {
118            results[0].to()
119        }
120    }
121}
122
123/// Extension trait dedicated to [`PreparedCommand`](crate::client::PreparedCommand)
124/// to add specific methods for the [`Pipeline`](crate::client::Pipeline) &
125/// the [`Transaction`](crate::client::Transaction) executors
126pub trait BatchPreparedCommand<R = ()> {
127    /// Queue a command.
128    fn queue(self);
129
130    /// Queue a command and forget its response.
131    fn forget(self);
132}
133
134impl<'a, 'b, R: Response> BatchPreparedCommand for PreparedCommand<'a, &'a mut Pipeline<'b>, R> {
135    /// Queue a command.
136    #[inline]
137    fn queue(self) {
138        self.executor.queue(self.command)
139    }
140
141    /// Queue a command and forget its response.
142    #[inline]
143    fn forget(self) {
144        self.executor.forget(self.command)
145    }
146}
147
148impl<'a, 'b> BitmapCommands<'a> for &'a mut Pipeline<'b> {}
149#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
150#[cfg(feature = "redis-bloom")]
151impl<'a, 'b> BloomCommands<'a> for &'a mut Pipeline<'b> {}
152impl<'a, 'b> ClusterCommands<'a> for &'a mut Pipeline<'b> {}
153impl<'a, 'b> ConnectionCommands<'a> for &'a mut Pipeline<'b> {}
154#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
155#[cfg(feature = "redis-bloom")]
156impl<'a, 'b> CountMinSketchCommands<'a> for &'a mut Pipeline<'b> {}
157#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
158#[cfg(feature = "redis-bloom")]
159impl<'a, 'b> CuckooCommands<'a> for &'a mut Pipeline<'b> {}
160impl<'a, 'b> GenericCommands<'a> for &'a mut Pipeline<'b> {}
161impl<'a, 'b> GeoCommands<'a> for &'a mut Pipeline<'b> {}
162#[cfg_attr(docsrs, doc(cfg(feature = "redis-graph")))]
163#[cfg(feature = "redis-graph")]
164impl<'a, 'b> GraphCommands<'a> for &'a mut Pipeline<'b> {}
165impl<'a, 'b> HashCommands<'a> for &'a mut Pipeline<'b> {}
166impl<'a, 'b> HyperLogLogCommands<'a> for &'a mut Pipeline<'b> {}
167#[cfg_attr(docsrs, doc(cfg(feature = "redis-json")))]
168#[cfg(feature = "redis-json")]
169impl<'a, 'b> JsonCommands<'a> for &'a mut Pipeline<'b> {}
170impl<'a, 'b> ListCommands<'a> for &'a mut Pipeline<'b> {}
171#[cfg_attr(docsrs, doc(cfg(feature = "redis-search")))]
172#[cfg(feature = "redis-search")]
173impl<'a, 'b> SearchCommands<'a> for &'a mut Pipeline<'b> {}
174impl<'a, 'b> SetCommands<'a> for &'a mut Pipeline<'b> {}
175impl<'a, 'b> ScriptingCommands<'a> for &'a mut Pipeline<'b> {}
176impl<'a, 'b> ServerCommands<'a> for &'a mut Pipeline<'b> {}
177impl<'a, 'b> SortedSetCommands<'a> for &'a mut Pipeline<'b> {}
178impl<'a, 'b> StreamCommands<'a> for &'a mut Pipeline<'b> {}
179impl<'a, 'b> StringCommands<'a> for &'a mut Pipeline<'b> {}
180#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
181#[cfg(feature = "redis-bloom")]
182impl<'a, 'b> TDigestCommands<'a> for &'a mut Pipeline<'b> {}
183#[cfg_attr(docsrs, doc(cfg(feature = "redis-time-series")))]
184#[cfg(feature = "redis-time-series")]
185impl<'a, 'b> TimeSeriesCommands<'a> for &'a mut Pipeline<'b> {}
186#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
187#[cfg(feature = "redis-bloom")]
188impl<'a, 'b> TopKCommands<'a> for &'a mut Pipeline<'b> {}