rustis/client/
pipeline.rs

1#[cfg(feature = "redis-graph")]
2use crate::commands::GraphCommands;
3use crate::commands::{
4    BloomCommands, CountMinSketchCommands, CuckooCommands, JsonCommands, SearchCommands,
5    TDigestCommands, TimeSeriesCommands, TopKCommands, VectorSetCommands,
6};
7use crate::{
8    client::{Client, PreparedCommand},
9    commands::{
10        BitmapCommands, ClusterCommands, ConnectionCommands, GenericCommands, GeoCommands,
11        HashCommands, HyperLogLogCommands, ListCommands, ScriptingCommands, ServerCommands,
12        SetCommands, SortedSetCommands, StreamCommands, StringCommands,
13    },
14    resp::{Command, RespBatchDeserializer, Response},
15    Result,
16};
17use serde::de::DeserializeOwned;
18use std::iter::zip;
19
20/// Represents a Redis command pipeline.
21pub struct Pipeline<'a> {
22    client: &'a Client,
23    commands: Vec<Command>,
24    forget_flags: Vec<bool>,
25    retry_on_error: Option<bool>,
26}
27
28impl Pipeline<'_> {
29    pub(crate) fn new(client: &Client) -> Pipeline {
30        Pipeline {
31            client,
32            commands: Vec::new(),
33            forget_flags: Vec::new(),
34            retry_on_error: None,
35        }
36    }
37    /// Set a flag to override default `retry_on_error` behavior.
38    ///
39    /// See [Config::retry_on_error](crate::client::Config::retry_on_error)
40    pub fn retry_on_error(&mut self, retry_on_error: bool) {
41        self.retry_on_error = Some(retry_on_error);
42    }
43
44    /// Queue a command
45    pub fn queue(&mut self, command: Command) {
46        self.commands.push(command);
47        self.forget_flags.push(false);
48    }
49
50    /// Queue a command and forget its response
51    pub fn forget(&mut self, command: Command) {
52        self.commands.push(command);
53        self.forget_flags.push(true);
54    }
55
56    /// Execute the pipeline by the sending the queued command
57    /// as a whole batch to the Redis server.
58    ///
59    /// # Return
60    /// It is the caller responsability to use the right type to cast the server response
61    /// to the right tuple or collection depending on which command has been
62    /// [queued](BatchPreparedCommand::queue) or [forgotten](BatchPreparedCommand::forget).
63    ///
64    /// The most generic type that can be requested as a result is `Vec<resp::Value>`
65    ///
66    /// # Example
67    /// ```
68    /// use rustis::{
69    ///     client::{Client, Pipeline, BatchPreparedCommand},
70    ///     commands::StringCommands,
71    ///     resp::{cmd, Value}, Result,
72    /// };
73    ///
74    /// #[cfg_attr(feature = "tokio-runtime", tokio::main)]
75    /// #[cfg_attr(feature = "async-std-runtime", async_std::main)]
76    /// async fn main() -> Result<()> {
77    ///     let client = Client::connect("127.0.0.1:6379").await?;
78    ///
79    ///     let mut pipeline = client.create_pipeline();
80    ///     pipeline.set("key1", "value1").forget();
81    ///     pipeline.set("key2", "value2").forget();
82    ///     pipeline.get::<_, String>("key1").queue();
83    ///     pipeline.get::<_, String>("key2").queue();
84    ///
85    ///     let (value1, value2): (String, String) = pipeline.execute().await?;
86    ///     assert_eq!("value1", value1);
87    ///     assert_eq!("value2", value2);
88    ///
89    ///     Ok(())
90    /// }
91    /// ```    
92    pub async fn execute<T: DeserializeOwned>(self) -> Result<T> {
93        let num_commands = self.commands.len();
94        let results = self
95            .client
96            .send_batch(self.commands, self.retry_on_error)
97            .await?;
98
99        if num_commands > 1 {
100            let mut filtered_results = zip(results, self.forget_flags.iter())
101                .filter_map(|(value, forget_flag)| if *forget_flag { None } else { Some(value) })
102                .collect::<Vec<_>>();
103
104            if filtered_results.len() == 1 {
105                let result = filtered_results.pop().unwrap();
106                result.to()
107            } else {
108                let deserializer = RespBatchDeserializer::new(&filtered_results);
109                T::deserialize(&deserializer)
110            }
111        } else {
112            results[0].to()
113        }
114    }
115}
116
/// Extension trait for [`PreparedCommand`](crate::client::PreparedCommand)
/// providing the methods specific to the batch executors:
/// [`Pipeline`](crate::client::Pipeline) and
/// [`Transaction`](crate::client::Transaction).
pub trait BatchPreparedCommand<R = ()> {
    /// Queue the prepared command on its executor, keeping its response.
    fn queue(self);

    /// Queue the prepared command on its executor and forget its response.
    fn forget(self);
}
127
128impl<'a, R: Response> BatchPreparedCommand for PreparedCommand<'a, &'a mut Pipeline<'_>, R> {
129    /// Queue a command.
130    #[inline]
131    fn queue(self) {
132        self.executor.queue(self.command)
133    }
134
135    /// Queue a command and forget its response.
136    #[inline]
137    fn forget(self) {
138        self.executor.forget(self.command)
139    }
140}
141
142impl<'a> BitmapCommands<'a> for &'a mut Pipeline<'_> {}
143impl<'a> BloomCommands<'a> for &'a mut Pipeline<'_> {}
144impl<'a> ClusterCommands<'a> for &'a mut Pipeline<'_> {}
145impl<'a> ConnectionCommands<'a> for &'a mut Pipeline<'_> {}
146impl<'a> CountMinSketchCommands<'a> for &'a mut Pipeline<'_> {}
147impl<'a> CuckooCommands<'a> for &'a mut Pipeline<'_> {}
148impl<'a> GenericCommands<'a> for &'a mut Pipeline<'_> {}
149impl<'a> GeoCommands<'a> for &'a mut Pipeline<'_> {}
150#[cfg_attr(docsrs, doc(cfg(feature = "redis-graph")))]
151#[cfg(feature = "redis-graph")]
152impl<'a> GraphCommands<'a> for &'a mut Pipeline<'_> {}
153impl<'a> HashCommands<'a> for &'a mut Pipeline<'_> {}
154impl<'a> HyperLogLogCommands<'a> for &'a mut Pipeline<'_> {}
155impl<'a> JsonCommands<'a> for &'a mut Pipeline<'_> {}
156impl<'a> ListCommands<'a> for &'a mut Pipeline<'_> {}
157impl<'a> SearchCommands<'a> for &'a mut Pipeline<'_> {}
158impl<'a> SetCommands<'a> for &'a mut Pipeline<'_> {}
159impl<'a> ScriptingCommands<'a> for &'a mut Pipeline<'_> {}
160impl<'a> ServerCommands<'a> for &'a mut Pipeline<'_> {}
161impl<'a> SortedSetCommands<'a> for &'a mut Pipeline<'_> {}
162impl<'a> StreamCommands<'a> for &'a mut Pipeline<'_> {}
163impl<'a> StringCommands<'a> for &'a mut Pipeline<'_> {}
164impl<'a> TDigestCommands<'a> for &'a mut Pipeline<'_> {}
165impl<'a> TimeSeriesCommands<'a> for &'a mut Pipeline<'_> {}
166impl<'a> TopKCommands<'a> for &'a mut Pipeline<'_> {}
167impl<'a> VectorSetCommands<'a> for &'a Pipeline<'_> {}