rustis/client/
pipeline.rs1#[cfg(feature = "redis-graph")]
2use crate::commands::GraphCommands;
3use crate::commands::{
4 BloomCommands, CountMinSketchCommands, CuckooCommands, JsonCommands, SearchCommands,
5 TDigestCommands, TimeSeriesCommands, TopKCommands, VectorSetCommands,
6};
7use crate::{
8 client::{Client, PreparedCommand},
9 commands::{
10 BitmapCommands, ClusterCommands, ConnectionCommands, GenericCommands, GeoCommands,
11 HashCommands, HyperLogLogCommands, ListCommands, ScriptingCommands, ServerCommands,
12 SetCommands, SortedSetCommands, StreamCommands, StringCommands,
13 },
14 resp::{Command, RespBatchDeserializer, Response},
15 Result,
16};
17use serde::de::DeserializeOwned;
18use std::iter::zip;
19
20pub struct Pipeline<'a> {
22 client: &'a Client,
23 commands: Vec<Command>,
24 forget_flags: Vec<bool>,
25 retry_on_error: Option<bool>,
26}
27
28impl Pipeline<'_> {
29 pub(crate) fn new(client: &Client) -> Pipeline {
30 Pipeline {
31 client,
32 commands: Vec::new(),
33 forget_flags: Vec::new(),
34 retry_on_error: None,
35 }
36 }
37 pub fn retry_on_error(&mut self, retry_on_error: bool) {
41 self.retry_on_error = Some(retry_on_error);
42 }
43
44 pub fn queue(&mut self, command: Command) {
46 self.commands.push(command);
47 self.forget_flags.push(false);
48 }
49
50 pub fn forget(&mut self, command: Command) {
52 self.commands.push(command);
53 self.forget_flags.push(true);
54 }
55
56 pub async fn execute<T: DeserializeOwned>(self) -> Result<T> {
93 let num_commands = self.commands.len();
94 let results = self
95 .client
96 .send_batch(self.commands, self.retry_on_error)
97 .await?;
98
99 if num_commands > 1 {
100 let mut filtered_results = zip(results, self.forget_flags.iter())
101 .filter_map(|(value, forget_flag)| if *forget_flag { None } else { Some(value) })
102 .collect::<Vec<_>>();
103
104 if filtered_results.len() == 1 {
105 let result = filtered_results.pop().unwrap();
106 result.to()
107 } else {
108 let deserializer = RespBatchDeserializer::new(&filtered_results);
109 T::deserialize(&deserializer)
110 }
111 } else {
112 results[0].to()
113 }
114 }
115}
116
117pub trait BatchPreparedCommand<R = ()> {
121 fn queue(self);
123
124 fn forget(self);
126}
127
128impl<'a, R: Response> BatchPreparedCommand for PreparedCommand<'a, &'a mut Pipeline<'_>, R> {
129 #[inline]
131 fn queue(self) {
132 self.executor.queue(self.command)
133 }
134
135 #[inline]
137 fn forget(self) {
138 self.executor.forget(self.command)
139 }
140}
141
142impl<'a> BitmapCommands<'a> for &'a mut Pipeline<'_> {}
143impl<'a> BloomCommands<'a> for &'a mut Pipeline<'_> {}
144impl<'a> ClusterCommands<'a> for &'a mut Pipeline<'_> {}
145impl<'a> ConnectionCommands<'a> for &'a mut Pipeline<'_> {}
146impl<'a> CountMinSketchCommands<'a> for &'a mut Pipeline<'_> {}
147impl<'a> CuckooCommands<'a> for &'a mut Pipeline<'_> {}
148impl<'a> GenericCommands<'a> for &'a mut Pipeline<'_> {}
149impl<'a> GeoCommands<'a> for &'a mut Pipeline<'_> {}
150#[cfg_attr(docsrs, doc(cfg(feature = "redis-graph")))]
151#[cfg(feature = "redis-graph")]
152impl<'a> GraphCommands<'a> for &'a mut Pipeline<'_> {}
153impl<'a> HashCommands<'a> for &'a mut Pipeline<'_> {}
154impl<'a> HyperLogLogCommands<'a> for &'a mut Pipeline<'_> {}
155impl<'a> JsonCommands<'a> for &'a mut Pipeline<'_> {}
156impl<'a> ListCommands<'a> for &'a mut Pipeline<'_> {}
157impl<'a> SearchCommands<'a> for &'a mut Pipeline<'_> {}
158impl<'a> SetCommands<'a> for &'a mut Pipeline<'_> {}
159impl<'a> ScriptingCommands<'a> for &'a mut Pipeline<'_> {}
160impl<'a> ServerCommands<'a> for &'a mut Pipeline<'_> {}
161impl<'a> SortedSetCommands<'a> for &'a mut Pipeline<'_> {}
162impl<'a> StreamCommands<'a> for &'a mut Pipeline<'_> {}
163impl<'a> StringCommands<'a> for &'a mut Pipeline<'_> {}
164impl<'a> TDigestCommands<'a> for &'a mut Pipeline<'_> {}
165impl<'a> TimeSeriesCommands<'a> for &'a mut Pipeline<'_> {}
166impl<'a> TopKCommands<'a> for &'a mut Pipeline<'_> {}
167impl<'a> VectorSetCommands<'a> for &'a Pipeline<'_> {}