Skip to content

Commit 8a85a0b

Browse files
committed
Add async methods for callers that run inside an async executor
This otherwise errors out with an error like this, since you can't use block_on inside another block_on: "cannot execute `LocalPool` executor from within another executor"
1 parent 8b1f259 commit 8a85a0b

File tree

2 files changed

+33
-10
lines changed

2 files changed

+33
-10
lines changed

src/lib.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ extern crate serde_json;
1616

1717
mod sidekiq;
1818
pub use crate::sidekiq::{
19-
create_redis_pool, Client, ClientError, ClientOpts, Job, JobOpts, RedisPool,
19+
create_async_redis_pool, create_redis_pool, Client, ClientError, ClientOpts, Job, JobOpts,
20+
RedisPool,
2021
};
2122
pub use serde_json::value::Value;

src/sidekiq/mod.rs

+31-9
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,15 @@ enum ErrorKind {
3232
impl std::error::Error for ClientError {}
3333

3434
pub fn create_redis_pool() -> Result<ConnectionManager, ClientError> {
35+
block_on(create_async_redis_pool())
36+
}
37+
38+
pub async fn create_async_redis_pool() -> Result<ConnectionManager, ClientError> {
3539
let redis_url =
3640
&env::var(&REDIS_URL_ENV.to_owned()).unwrap_or_else(|_| REDIS_URL_DEFAULT.to_owned());
3741
// Note: this connection is multiplexed. Users of this object will call clone(), but the same underlying connection will be used.
3842
// https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html
39-
match block_on(ConnectionManager::new(
40-
redis::Client::open((*redis_url).clone()).unwrap(),
41-
)) {
43+
match ConnectionManager::new(redis::Client::open((*redis_url).clone()).unwrap()).await {
4244
Ok(pool) => Ok(pool),
4345
Err(err) => Err(ClientError {
4446
kind: ErrorKind::Redis(err),
@@ -225,21 +227,41 @@ impl Client {
225227
}
226228

227229
pub fn perform_in(&self, interval: Duration, job: Job) -> Result<(), ClientError> {
228-
let interval: f64 = interval.whole_seconds() as f64;
229-
block_on(self.raw_push(&[job], self.calc_at(interval)))
230+
block_on(self.perform_in_async(interval, job))
230231
}
231232

232233
pub fn perform_at(&self, datetime: OffsetDateTime, job: Job) -> Result<(), ClientError> {
233-
let timestamp: f64 = datetime.unix_timestamp() as f64;
234-
block_on(self.raw_push(&[job], self.calc_at(timestamp)))
234+
block_on(self.perform_at_async(datetime, job))
235235
}
236236

237237
pub fn push(&self, job: Job) -> Result<(), ClientError> {
238-
block_on(self.raw_push(&[job], None))
238+
block_on(self.push_async(job))
239239
}
240240

241241
pub fn push_bulk(&self, jobs: &[Job]) -> Result<(), ClientError> {
242-
block_on(self.raw_push(jobs, None))
242+
block_on(self.push_bulk_async(jobs))
243+
}
244+
245+
pub async fn perform_in_async(&self, interval: Duration, job: Job) -> Result<(), ClientError> {
246+
let interval: f64 = interval.whole_seconds() as f64;
247+
self.raw_push(&[job], self.calc_at(interval)).await
248+
}
249+
250+
pub async fn perform_at_async(
251+
&self,
252+
datetime: OffsetDateTime,
253+
job: Job,
254+
) -> Result<(), ClientError> {
255+
let timestamp: f64 = datetime.unix_timestamp() as f64;
256+
self.raw_push(&[job], self.calc_at(timestamp)).await
257+
}
258+
259+
pub async fn push_async(&self, job: Job) -> Result<(), ClientError> {
260+
self.raw_push(&[job], None).await
261+
}
262+
263+
pub async fn push_bulk_async(&self, jobs: &[Job]) -> Result<(), ClientError> {
264+
self.raw_push(jobs, None).await
243265
}
244266

245267
async fn raw_push(&self, payloads: &[Job], at: Option<f64>) -> Result<(), ClientError> {

0 commit comments

Comments
 (0)