-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathsentinel_connection.rs
156 lines (137 loc) · 5.11 KB
/
sentinel_connection.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use crate::{
client::{Config, SentinelConfig},
commands::{RoleResult, SentinelCommands, ServerCommands},
resp::{Command, RespBuf},
sleep, Error, Result, RetryReason, StandaloneConnection,
};
use log::debug;
use smallvec::SmallVec;
pub struct SentinelConnection {
sentinel_config: SentinelConfig,
config: Config,
pub inner_connection: StandaloneConnection,
}
impl SentinelConnection {
#[inline]
pub async fn write(&mut self, command: &Command) -> Result<()> {
self.inner_connection.write(command).await
}
#[inline]
pub async fn write_batch(
&mut self,
commands: SmallVec<[&mut Command; 10]>,
retry_reasons: &[RetryReason],
) -> Result<()> {
self.inner_connection
.write_batch(commands, retry_reasons)
.await
}
#[inline]
pub async fn read(&mut self) -> Option<Result<RespBuf>> {
self.inner_connection.read().await
}
#[inline]
pub async fn reconnect(&mut self) -> Result<()> {
self.inner_connection =
Self::connect_to_sentinel(&self.sentinel_config, &self.config).await?;
Ok(())
}
/// Follow `Redis service discovery via Sentinel` documentation
/// #See <https://redis.io/docs/reference/sentinel-clients/#redis-service-discovery-via-sentinel>
///
/// # Remark
/// this function must be desugared because of async recursion:
/// <https://doc.rust-lang.org/error-index.html#E0733>
pub async fn connect(
sentinel_config: &SentinelConfig,
config: &Config,
) -> Result<SentinelConnection> {
let inner_connection = Self::connect_to_sentinel(sentinel_config, config).await?;
Ok(SentinelConnection {
sentinel_config: sentinel_config.clone(),
config: config.clone(),
inner_connection,
})
}
async fn connect_to_sentinel(
sentinel_config: &SentinelConfig,
config: &Config,
) -> Result<StandaloneConnection> {
let mut restart = false;
let mut unreachable_sentinel = true;
let mut sentinel_node_config = config.clone();
sentinel_node_config
.username
.clone_from(&sentinel_config.username);
sentinel_node_config
.password
.clone_from(&sentinel_config.password);
loop {
for sentinel_instance in &sentinel_config.instances {
// Step 1: connecting to Sentinel
let (host, port) = sentinel_instance;
let mut sentinel_connection =
match StandaloneConnection::connect(host, *port, &sentinel_node_config).await {
Ok(sentinel_connection) => sentinel_connection,
Err(e) => {
debug!("Cannot connect to Sentinel {}:{} : {}", *host, *port, e);
continue;
}
};
// Step 2: ask for master address
let (master_host, master_port) = match sentinel_connection
.sentinel_get_master_addr_by_name(sentinel_config.service_name.clone())
.await
{
Ok(Some((master_host, master_port))) => (master_host, master_port),
Ok(None) => {
debug!(
"Sentinel {}:{} does not know master `{}`",
*host, *port, sentinel_config.service_name
);
unreachable_sentinel = false;
continue;
}
Err(e) => {
debug!("Cannot execute command `SENTINEL get-master-addr-by-name` with Sentinel {}:{}: {}", *host, *port, e);
continue;
}
};
// Step 3: call the ROLE command in the target instance
let mut master_connection =
StandaloneConnection::connect(&master_host, master_port, config).await?;
let role: RoleResult = master_connection.role().await?;
if let RoleResult::Master {
master_replication_offset: _,
replica_infos: _,
} = role
{
return Ok(master_connection);
} else {
sleep(sentinel_config.wait_between_failures).await;
// restart from the beginning
restart = true;
break;
}
}
if !restart {
break;
} else {
restart = false;
}
}
if unreachable_sentinel {
Err(Error::Sentinel(
"All Sentinel instances are unreachable".to_owned(),
))
} else {
Err(Error::Sentinel(format!(
"master {} is unknown by all Sentinel instances",
sentinel_config.service_name
)))
}
}
pub(crate) fn tag(&self) -> &str {
self.inner_connection.tag()
}
}