@@ -4,18 +4,20 @@ use std::fmt;
4
4
use std:: time:: { SystemTime , UNIX_EPOCH } ;
5
5
6
6
use crate :: Value ;
7
- use r2d2_redis:: { r2d2, redis, RedisConnectionManager } ;
8
7
use rand:: distributions:: Alphanumeric ;
9
8
use rand:: { thread_rng, Rng } ;
10
9
use serde:: ser:: SerializeStruct ;
11
10
use serde:: { Serialize , Serializer } ;
12
11
13
12
use time:: { Duration , OffsetDateTime } ;
14
13
14
+ use futures:: executor:: block_on;
15
+ use futures:: future:: { TryFutureExt } ;
16
+ use redis:: aio:: ConnectionManager ;
17
+
15
18
const REDIS_URL_ENV : & str = "REDIS_URL" ;
16
19
const REDIS_URL_DEFAULT : & str = "redis://127.0.0.1/" ;
17
- pub type RedisPooledConnection = r2d2:: PooledConnection < RedisConnectionManager > ;
18
- pub type RedisPool = r2d2:: Pool < RedisConnectionManager > ;
20
+ pub type RedisPool = ConnectionManager ;
19
21
20
22
#[ derive( Debug ) ]
21
23
pub struct ClientError {
@@ -25,19 +27,21 @@ pub struct ClientError {
25
27
#[ derive( Debug ) ]
26
28
enum ErrorKind {
27
29
Redis ( redis:: RedisError ) ,
28
- PoolInit ( r2d2:: Error ) ,
29
30
}
30
31
31
32
impl std:: error:: Error for ClientError { }
32
33
33
- pub fn create_redis_pool ( ) -> Result < RedisPool , ClientError > {
34
+ pub fn create_redis_pool ( ) -> Result < ConnectionManager , ClientError > {
34
35
let redis_url =
35
36
& env:: var ( & REDIS_URL_ENV . to_owned ( ) ) . unwrap_or_else ( |_| REDIS_URL_DEFAULT . to_owned ( ) ) ;
36
- let url = redis:: parse_redis_url ( redis_url) . unwrap ( ) ;
37
- let manager = RedisConnectionManager :: new ( url) . unwrap ( ) ;
38
- r2d2:: Pool :: new ( manager) . map_err ( |err| ClientError {
39
- kind : ErrorKind :: PoolInit ( err) ,
40
- } )
37
+ // Note: this connection is multiplexed. Users of this object will call clone(), but the same underlying connection will be used.
38
+ // https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html
39
+ match block_on ( ConnectionManager :: new ( redis:: Client :: open ( ( * redis_url) . clone ( ) ) . unwrap ( ) ) ) {
40
+ Ok ( pool) => Ok ( pool) ,
41
+ Err ( err) => Err ( ClientError {
42
+ kind : ErrorKind :: Redis ( err) ,
43
+ } ) ,
44
+ }
41
45
}
42
46
43
47
pub struct Job {
@@ -54,7 +58,6 @@ impl fmt::Display for ClientError {
54
58
fn fmt ( & self , f : & mut fmt:: Formatter ) -> fmt:: Result {
55
59
match self . kind {
56
60
ErrorKind :: Redis ( ref err) => err. fmt ( f) ,
57
- ErrorKind :: PoolInit ( ref err) => err. fmt ( f) ,
58
61
}
59
62
}
60
63
}
@@ -67,14 +70,6 @@ impl From<redis::RedisError> for ClientError {
67
70
}
68
71
}
69
72
70
- impl From < r2d2:: Error > for ClientError {
71
- fn from ( error : r2d2:: Error ) -> ClientError {
72
- ClientError {
73
- kind : ErrorKind :: PoolInit ( error) ,
74
- }
75
- }
76
- }
77
-
78
73
pub struct JobOpts {
79
74
pub retry : i64 ,
80
75
pub queue : String ,
@@ -157,7 +152,7 @@ pub struct ClientOpts {
157
152
}
158
153
159
154
pub struct Client {
160
- pub redis_pool : RedisPool ,
155
+ pub redis_pool : ConnectionManager ,
161
156
pub namespace : Option < String > ,
162
157
}
163
158
@@ -202,22 +197,13 @@ pub struct Client {
202
197
/// }
203
198
/// ```
204
199
impl Client {
205
- pub fn new ( redis_pool : RedisPool , opts : ClientOpts ) -> Client {
200
+ pub fn new ( redis_pool : ConnectionManager , opts : ClientOpts ) -> Client {
206
201
Client {
207
202
redis_pool,
208
203
namespace : opts. namespace ,
209
204
}
210
205
}
211
206
212
- fn connect ( & self ) -> Result < RedisPooledConnection , ClientError > {
213
- match self . redis_pool . get ( ) {
214
- Ok ( conn) => Ok ( conn) ,
215
- Err ( err) => Err ( ClientError {
216
- kind : ErrorKind :: PoolInit ( err) ,
217
- } ) ,
218
- }
219
- }
220
-
221
207
fn calc_at ( & self , target_millsec_number : f64 ) -> Option < f64 > {
222
208
let maximum_target: f64 = 1_000_000_000_f64 ;
223
209
let target_millsec: f64 = target_millsec_number;
@@ -238,46 +224,42 @@ impl Client {
238
224
239
225
pub fn perform_in ( & self , interval : Duration , job : Job ) -> Result < ( ) , ClientError > {
240
226
let interval: f64 = interval. whole_seconds ( ) as f64 ;
241
- self . raw_push ( & [ job] , self . calc_at ( interval) )
227
+ block_on ( self . raw_push ( & [ job] , self . calc_at ( interval) ) )
242
228
}
243
229
244
230
pub fn perform_at ( & self , datetime : OffsetDateTime , job : Job ) -> Result < ( ) , ClientError > {
245
231
let timestamp: f64 = datetime. unix_timestamp ( ) as f64 ;
246
- self . raw_push ( & [ job] , self . calc_at ( timestamp) )
232
+ block_on ( self . raw_push ( & [ job] , self . calc_at ( timestamp) ) )
247
233
}
248
234
249
235
pub fn push ( & self , job : Job ) -> Result < ( ) , ClientError > {
250
- self . raw_push ( & [ job] , None )
236
+ block_on ( self . raw_push ( & [ job] , None ) )
251
237
}
252
238
253
239
pub fn push_bulk ( & self , jobs : & [ Job ] ) -> Result < ( ) , ClientError > {
254
- self . raw_push ( jobs, None )
240
+ block_on ( self . raw_push ( jobs, None ) )
255
241
}
256
242
257
- fn raw_push ( & self , payloads : & [ Job ] , at : Option < f64 > ) -> Result < ( ) , ClientError > {
243
+ async fn raw_push ( & self , payloads : & [ Job ] , at : Option < f64 > ) -> Result < ( ) , ClientError > {
258
244
let payload = & payloads[ 0 ] ;
259
245
let to_push = payloads
260
246
. iter ( )
261
247
. map ( |entry| serde_json:: to_string ( & entry) . unwrap ( ) )
262
248
. collect :: < Vec < _ > > ( ) ;
263
249
264
250
if let Some ( value) = at {
265
- match self . connect ( ) {
266
- Ok ( mut conn) => redis:: pipe ( )
251
+ redis:: pipe ( )
267
252
. atomic ( )
268
253
. cmd ( "ZADD" )
269
254
. arg ( self . schedule_queue_name ( ) )
270
255
. arg ( value)
271
256
. arg ( to_push)
272
- . query ( & mut * conn )
257
+ . query_async ( & mut self . redis_pool . clone ( ) )
273
258
. map_err ( |err| ClientError {
274
259
kind : ErrorKind :: Redis ( err) ,
275
- } ) ,
276
- Err ( err) => Err ( err) ,
277
- }
260
+ } ) . await
278
261
} else {
279
- match self . connect ( ) {
280
- Ok ( mut conn) => redis:: pipe ( )
262
+ redis:: pipe ( )
281
263
. atomic ( )
282
264
. cmd ( "SADD" )
283
265
. arg ( "queues" )
@@ -286,12 +268,10 @@ impl Client {
286
268
. cmd ( "LPUSH" )
287
269
. arg ( self . queue_name ( & payload. queue ) )
288
270
. arg ( to_push)
289
- . query ( & mut * conn )
271
+ . query_async ( & mut self . redis_pool . clone ( ) )
290
272
. map_err ( |err| ClientError {
291
273
kind : ErrorKind :: Redis ( err) ,
292
- } ) ,
293
- Err ( err) => Err ( err) ,
294
- }
274
+ } ) . await
295
275
}
296
276
}
297
277
0 commit comments