rustis/client/
pipeline.rs1#[cfg(feature = "redis-graph")]
2use crate::commands::GraphCommands;
3#[cfg(feature = "redis-json")]
4use crate::commands::JsonCommands;
5#[cfg(feature = "redis-search")]
6use crate::commands::SearchCommands;
7#[cfg(feature = "redis-time-series")]
8use crate::commands::TimeSeriesCommands;
9#[cfg(feature = "redis-bloom")]
10use crate::commands::{
11 BloomCommands, CountMinSketchCommands, CuckooCommands, TDigestCommands, TopKCommands,
12};
13use crate::{
14 client::{Client, PreparedCommand},
15 commands::{
16 BitmapCommands, ClusterCommands, ConnectionCommands, GenericCommands, GeoCommands,
17 HashCommands, HyperLogLogCommands, ListCommands, ScriptingCommands, ServerCommands,
18 SetCommands, SortedSetCommands, StreamCommands, StringCommands,
19 },
20 resp::{Command, RespBatchDeserializer, Response},
21 Result,
22};
23use serde::de::DeserializeOwned;
24use std::iter::zip;
25
26pub struct Pipeline<'a> {
28 client: &'a Client,
29 commands: Vec<Command>,
30 forget_flags: Vec<bool>,
31 retry_on_error: Option<bool>,
32}
33
34impl<'a> Pipeline<'a> {
35 pub(crate) fn new(client: &'a Client) -> Pipeline {
36 Pipeline {
37 client,
38 commands: Vec::new(),
39 forget_flags: Vec::new(),
40 retry_on_error: None,
41 }
42 }
43 pub fn retry_on_error(&mut self, retry_on_error: bool) {
47 self.retry_on_error = Some(retry_on_error);
48 }
49
50 pub fn queue(&mut self, command: Command) {
52 self.commands.push(command);
53 self.forget_flags.push(false);
54 }
55
56 pub fn forget(&mut self, command: Command) {
58 self.commands.push(command);
59 self.forget_flags.push(true);
60 }
61
62 pub async fn execute<T: DeserializeOwned>(self) -> Result<T> {
99 let num_commands = self.commands.len();
100 let results = self
101 .client
102 .send_batch(self.commands, self.retry_on_error)
103 .await?;
104
105 if num_commands > 1 {
106 let mut filtered_results = zip(results, self.forget_flags.iter())
107 .filter_map(|(value, forget_flag)| if *forget_flag { None } else { Some(value) })
108 .collect::<Vec<_>>();
109
110 if filtered_results.len() == 1 {
111 let result = filtered_results.pop().unwrap();
112 result.to()
113 } else {
114 let deserializer = RespBatchDeserializer::new(&filtered_results);
115 T::deserialize(&deserializer)
116 }
117 } else {
118 results[0].to()
119 }
120 }
121}
122
/// Terminal operations for a command that has been prepared against a batch
/// executor.
///
/// Both methods consume the prepared command and return nothing.
/// The type parameter `R` defaults to `()` and is not used by the method
/// signatures themselves.
pub trait BatchPreparedCommand<R = ()> {
    /// Adds the command to the batch.
    fn queue(self);

    /// Adds the command to the batch through the executor's `forget` path.
    fn forget(self);
}
133
134impl<'a, 'b, R: Response> BatchPreparedCommand for PreparedCommand<'a, &'a mut Pipeline<'b>, R> {
135 #[inline]
137 fn queue(self) {
138 self.executor.queue(self.command)
139 }
140
141 #[inline]
143 fn forget(self) {
144 self.executor.forget(self.command)
145 }
146}
147
148impl<'a, 'b> BitmapCommands<'a> for &'a mut Pipeline<'b> {}
149#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
150#[cfg(feature = "redis-bloom")]
151impl<'a, 'b> BloomCommands<'a> for &'a mut Pipeline<'b> {}
152impl<'a, 'b> ClusterCommands<'a> for &'a mut Pipeline<'b> {}
153impl<'a, 'b> ConnectionCommands<'a> for &'a mut Pipeline<'b> {}
154#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
155#[cfg(feature = "redis-bloom")]
156impl<'a, 'b> CountMinSketchCommands<'a> for &'a mut Pipeline<'b> {}
157#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
158#[cfg(feature = "redis-bloom")]
159impl<'a, 'b> CuckooCommands<'a> for &'a mut Pipeline<'b> {}
160impl<'a, 'b> GenericCommands<'a> for &'a mut Pipeline<'b> {}
161impl<'a, 'b> GeoCommands<'a> for &'a mut Pipeline<'b> {}
162#[cfg_attr(docsrs, doc(cfg(feature = "redis-graph")))]
163#[cfg(feature = "redis-graph")]
164impl<'a, 'b> GraphCommands<'a> for &'a mut Pipeline<'b> {}
165impl<'a, 'b> HashCommands<'a> for &'a mut Pipeline<'b> {}
166impl<'a, 'b> HyperLogLogCommands<'a> for &'a mut Pipeline<'b> {}
167#[cfg_attr(docsrs, doc(cfg(feature = "redis-json")))]
168#[cfg(feature = "redis-json")]
169impl<'a, 'b> JsonCommands<'a> for &'a mut Pipeline<'b> {}
170impl<'a, 'b> ListCommands<'a> for &'a mut Pipeline<'b> {}
171#[cfg_attr(docsrs, doc(cfg(feature = "redis-search")))]
172#[cfg(feature = "redis-search")]
173impl<'a, 'b> SearchCommands<'a> for &'a mut Pipeline<'b> {}
174impl<'a, 'b> SetCommands<'a> for &'a mut Pipeline<'b> {}
175impl<'a, 'b> ScriptingCommands<'a> for &'a mut Pipeline<'b> {}
176impl<'a, 'b> ServerCommands<'a> for &'a mut Pipeline<'b> {}
177impl<'a, 'b> SortedSetCommands<'a> for &'a mut Pipeline<'b> {}
178impl<'a, 'b> StreamCommands<'a> for &'a mut Pipeline<'b> {}
179impl<'a, 'b> StringCommands<'a> for &'a mut Pipeline<'b> {}
180#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
181#[cfg(feature = "redis-bloom")]
182impl<'a, 'b> TDigestCommands<'a> for &'a mut Pipeline<'b> {}
183#[cfg_attr(docsrs, doc(cfg(feature = "redis-time-series")))]
184#[cfg(feature = "redis-time-series")]
185impl<'a, 'b> TimeSeriesCommands<'a> for &'a mut Pipeline<'b> {}
186#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
187#[cfg(feature = "redis-bloom")]
188impl<'a, 'b> TopKCommands<'a> for &'a mut Pipeline<'b> {}