Skip to content

Commit feb8366

Browse files
committed
Replace use of r2d2 with redis-rs ConnectionManager
Using r2d2 with redis doesn't really make sense, as the redis crate can multiplex a connection, and in fact its the recommended approach for most use cases: redis-rs/redis-rs#388 (comment) Therefore this changes things to use redis-rs internally, with the intent of adding an async API to this crate in the future. Additionally this enables TLS for Redis connections, to support Redis servers that require TLS. This change is mostly API-compatible, but does change the underlying RedisPool type (its still called "pool", but its actually a ConnectionManager), and drops the exported type RedisPooledConnection.
1 parent fa2db19 commit feb8366

File tree

3 files changed

+29
-52
lines changed

3 files changed

+29
-52
lines changed

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ edition = "2018"
1616
travis-ci = { repository = "spk/rust-sidekiq" }
1717

1818
[dependencies]
19+
futures = "*"
1920
rand = "0.8"
2021
serde = "1.0"
2122
serde_json = "1.0"
22-
r2d2 = "0.8"
23-
r2d2_redis = "0.14"
23+
redis = { version = "*", features = ["connection-manager", "async-std-comp", "async-std-tls-comp"] }
2424
time = "0.3"

src/lib.rs

-3
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,12 @@
1010
#![deny(warnings)]
1111
#![crate_name = "sidekiq"]
1212

13-
extern crate r2d2;
14-
extern crate r2d2_redis;
1513
extern crate rand;
1614
extern crate serde;
1715
extern crate serde_json;
1816

1917
mod sidekiq;
2018
pub use crate::sidekiq::{
2119
create_redis_pool, Client, ClientError, ClientOpts, Job, JobOpts, RedisPool,
22-
RedisPooledConnection,
2320
};
2421
pub use serde_json::value::Value;

src/sidekiq/mod.rs

+27-47
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,20 @@ use std::fmt;
44
use std::time::{SystemTime, UNIX_EPOCH};
55

66
use crate::Value;
7-
use r2d2_redis::{r2d2, redis, RedisConnectionManager};
87
use rand::distributions::Alphanumeric;
98
use rand::{thread_rng, Rng};
109
use serde::ser::SerializeStruct;
1110
use serde::{Serialize, Serializer};
1211

1312
use time::{Duration, OffsetDateTime};
1413

14+
use futures::executor::block_on;
15+
use futures::future::{TryFutureExt};
16+
use redis::aio::ConnectionManager;
17+
1518
const REDIS_URL_ENV: &str = "REDIS_URL";
1619
const REDIS_URL_DEFAULT: &str = "redis://127.0.0.1/";
17-
pub type RedisPooledConnection = r2d2::PooledConnection<RedisConnectionManager>;
18-
pub type RedisPool = r2d2::Pool<RedisConnectionManager>;
20+
pub type RedisPool = ConnectionManager;
1921

2022
#[derive(Debug)]
2123
pub struct ClientError {
@@ -25,19 +27,21 @@ pub struct ClientError {
2527
#[derive(Debug)]
2628
enum ErrorKind {
2729
Redis(redis::RedisError),
28-
PoolInit(r2d2::Error),
2930
}
3031

3132
impl std::error::Error for ClientError {}
3233

33-
pub fn create_redis_pool() -> Result<RedisPool, ClientError> {
34+
pub fn create_redis_pool() -> Result<ConnectionManager, ClientError> {
3435
let redis_url =
3536
&env::var(&REDIS_URL_ENV.to_owned()).unwrap_or_else(|_| REDIS_URL_DEFAULT.to_owned());
36-
let url = redis::parse_redis_url(redis_url).unwrap();
37-
let manager = RedisConnectionManager::new(url).unwrap();
38-
r2d2::Pool::new(manager).map_err(|err| ClientError {
39-
kind: ErrorKind::PoolInit(err),
40-
})
37+
// Note: this connection is multiplexed. Users of this object will call clone(), but the same underlying connection will be used.
38+
// https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html
39+
match block_on(ConnectionManager::new(redis::Client::open((*redis_url).clone()).unwrap())) {
40+
Ok(pool) => Ok(pool),
41+
Err(err) => Err(ClientError {
42+
kind: ErrorKind::Redis(err),
43+
}),
44+
}
4145
}
4246

4347
pub struct Job {
@@ -54,7 +58,6 @@ impl fmt::Display for ClientError {
5458
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
5559
match self.kind {
5660
ErrorKind::Redis(ref err) => err.fmt(f),
57-
ErrorKind::PoolInit(ref err) => err.fmt(f),
5861
}
5962
}
6063
}
@@ -67,14 +70,6 @@ impl From<redis::RedisError> for ClientError {
6770
}
6871
}
6972

70-
impl From<r2d2::Error> for ClientError {
71-
fn from(error: r2d2::Error) -> ClientError {
72-
ClientError {
73-
kind: ErrorKind::PoolInit(error),
74-
}
75-
}
76-
}
77-
7873
pub struct JobOpts {
7974
pub retry: i64,
8075
pub queue: String,
@@ -157,7 +152,7 @@ pub struct ClientOpts {
157152
}
158153

159154
pub struct Client {
160-
pub redis_pool: RedisPool,
155+
pub redis_pool: ConnectionManager,
161156
pub namespace: Option<String>,
162157
}
163158

@@ -202,22 +197,13 @@ pub struct Client {
202197
/// }
203198
/// ```
204199
impl Client {
205-
pub fn new(redis_pool: RedisPool, opts: ClientOpts) -> Client {
200+
pub fn new(redis_pool: ConnectionManager, opts: ClientOpts) -> Client {
206201
Client {
207202
redis_pool,
208203
namespace: opts.namespace,
209204
}
210205
}
211206

212-
fn connect(&self) -> Result<RedisPooledConnection, ClientError> {
213-
match self.redis_pool.get() {
214-
Ok(conn) => Ok(conn),
215-
Err(err) => Err(ClientError {
216-
kind: ErrorKind::PoolInit(err),
217-
}),
218-
}
219-
}
220-
221207
fn calc_at(&self, target_millsec_number: f64) -> Option<f64> {
222208
let maximum_target: f64 = 1_000_000_000_f64;
223209
let target_millsec: f64 = target_millsec_number;
@@ -238,46 +224,42 @@ impl Client {
238224

239225
pub fn perform_in(&self, interval: Duration, job: Job) -> Result<(), ClientError> {
240226
let interval: f64 = interval.whole_seconds() as f64;
241-
self.raw_push(&[job], self.calc_at(interval))
227+
block_on(self.raw_push(&[job], self.calc_at(interval)))
242228
}
243229

244230
pub fn perform_at(&self, datetime: OffsetDateTime, job: Job) -> Result<(), ClientError> {
245231
let timestamp: f64 = datetime.unix_timestamp() as f64;
246-
self.raw_push(&[job], self.calc_at(timestamp))
232+
block_on(self.raw_push(&[job], self.calc_at(timestamp)))
247233
}
248234

249235
pub fn push(&self, job: Job) -> Result<(), ClientError> {
250-
self.raw_push(&[job], None)
236+
block_on(self.raw_push(&[job], None))
251237
}
252238

253239
pub fn push_bulk(&self, jobs: &[Job]) -> Result<(), ClientError> {
254-
self.raw_push(jobs, None)
240+
block_on(self.raw_push(jobs, None))
255241
}
256242

257-
fn raw_push(&self, payloads: &[Job], at: Option<f64>) -> Result<(), ClientError> {
243+
async fn raw_push(&self, payloads: &[Job], at: Option<f64>) -> Result<(), ClientError> {
258244
let payload = &payloads[0];
259245
let to_push = payloads
260246
.iter()
261247
.map(|entry| serde_json::to_string(&entry).unwrap())
262248
.collect::<Vec<_>>();
263249

264250
if let Some(value) = at {
265-
match self.connect() {
266-
Ok(mut conn) => redis::pipe()
251+
redis::pipe()
267252
.atomic()
268253
.cmd("ZADD")
269254
.arg(self.schedule_queue_name())
270255
.arg(value)
271256
.arg(to_push)
272-
.query(&mut *conn)
257+
.query_async(&mut self.redis_pool.clone())
273258
.map_err(|err| ClientError {
274259
kind: ErrorKind::Redis(err),
275-
}),
276-
Err(err) => Err(err),
277-
}
260+
}).await
278261
} else {
279-
match self.connect() {
280-
Ok(mut conn) => redis::pipe()
262+
redis::pipe()
281263
.atomic()
282264
.cmd("SADD")
283265
.arg("queues")
@@ -286,12 +268,10 @@ impl Client {
286268
.cmd("LPUSH")
287269
.arg(self.queue_name(&payload.queue))
288270
.arg(to_push)
289-
.query(&mut *conn)
271+
.query_async(&mut self.redis_pool.clone())
290272
.map_err(|err| ClientError {
291273
kind: ErrorKind::Redis(err),
292-
}),
293-
Err(err) => Err(err),
294-
}
274+
}).await
295275
}
296276
}
297277

0 commit comments

Comments
 (0)