-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathpipeline.rs
68 lines (56 loc) · 2.09 KB
/
pipeline.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use crate::{
client::BatchPreparedCommand,
commands::{FlushingMode, ServerCommands, StringCommands},
resp::{cmd, Value},
tests::{get_cluster_test_client, get_test_client},
Result,
};
use serial_test::serial;
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
#[serial]
async fn pipeline() -> Result<()> {
let client = get_test_client().await?;
client.flushdb(FlushingMode::Sync).await?;
let mut pipeline = client.create_pipeline();
pipeline.set("key1", "value1").forget();
pipeline.set("key2", "value2").forget();
pipeline.get::<_, ()>("key1").queue();
pipeline.get::<_, ()>("key2").queue();
let (value1, value2): (String, String) = pipeline.execute().await?;
assert_eq!("value1", value1);
assert_eq!("value2", value2);
Ok(())
}
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
#[serial]
async fn error() -> Result<()> {
let client = get_test_client().await?;
client.flushdb(FlushingMode::Sync).await?;
let mut pipeline = client.create_pipeline();
pipeline.set("key1", "value1").forget();
pipeline.set("key2", "value2").forget();
pipeline.queue(cmd("UNKNOWN"));
pipeline.get::<_, ()>("key1").queue();
pipeline.get::<_, ()>("key2").queue();
let result: Result<(Value, String, String)> = pipeline.execute().await;
assert!(result.is_err());
Ok(())
}
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
#[serial]
async fn pipeline_on_cluster() -> Result<()> {
let client = get_cluster_test_client().await?;
client.flushall(FlushingMode::Sync).await?;
let mut pipeline = client.create_pipeline();
pipeline.set("key1", "value1").forget();
pipeline.set("key2", "value2").forget();
pipeline.get::<_, ()>("key1").queue();
pipeline.get::<_, ()>("key2").queue();
let (value1, value2): (String, String) = pipeline.execute().await?;
assert_eq!("value1", value1);
assert_eq!("value2", value2);
Ok(())
}