Skip to content

Commit

Permalink
Add async methods for callers that run inside an async executor
Browse files Browse the repository at this point in the history
This otherwise errors out with an error like this, since you can't
use block_on inside another block_on:

"cannot execute `LocalPool` executor from within another executor"
  • Loading branch information
lfittl committed Jul 28, 2022
1 parent 8b1f259 commit 8547860
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ extern crate serde_json;

mod sidekiq;
pub use crate::sidekiq::{
create_redis_pool, Client, ClientError, ClientOpts, Job, JobOpts, RedisPool,
create_redis_pool, create_async_redis_pool, Client, ClientError, ClientOpts, Job, JobOpts, RedisPool,
};
pub use serde_json::value::Value;
43 changes: 37 additions & 6 deletions src/sidekiq/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,21 @@ pub fn create_redis_pool() -> Result<ConnectionManager, ClientError> {
}
}

pub async fn create_async_redis_pool() -> Result<ConnectionManager, ClientError> {
let redis_url =
&env::var(&REDIS_URL_ENV.to_owned()).unwrap_or_else(|_| REDIS_URL_DEFAULT.to_owned());
// Note: this connection is multiplexed. Users of this object will call clone(), but the same underlying connection will be used.
// https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html
match ConnectionManager::new(
redis::Client::open((*redis_url).clone()).unwrap(),
).await {
Ok(pool) => Ok(pool),
Err(err) => Err(ClientError {
kind: ErrorKind::Redis(err),
}),
}
}

pub struct Job {
pub class: String,
pub args: Vec<Value>,
Expand Down Expand Up @@ -225,21 +240,37 @@ impl Client {
}

pub fn perform_in(&self, interval: Duration, job: Job) -> Result<(), ClientError> {
let interval: f64 = interval.whole_seconds() as f64;
block_on(self.raw_push(&[job], self.calc_at(interval)))
block_on(self.perform_in_async(interval, job))
}

pub fn perform_at(&self, datetime: OffsetDateTime, job: Job) -> Result<(), ClientError> {
let timestamp: f64 = datetime.unix_timestamp() as f64;
block_on(self.raw_push(&[job], self.calc_at(timestamp)))
block_on(self.perform_at_async(datetime, job))
}

pub fn push(&self, job: Job) -> Result<(), ClientError> {
block_on(self.raw_push(&[job], None))
block_on(self.push_async(job))
}

pub fn push_bulk(&self, jobs: &[Job]) -> Result<(), ClientError> {
block_on(self.raw_push(jobs, None))
block_on(self.push_bulk_async(jobs))
}

pub async fn perform_in_async(&self, interval: Duration, job: Job) -> Result<(), ClientError> {
let interval: f64 = interval.whole_seconds() as f64;
self.raw_push(&[job], self.calc_at(interval)).await
}

pub async fn perform_at_async(&self, datetime: OffsetDateTime, job: Job) -> Result<(), ClientError> {
let timestamp: f64 = datetime.unix_timestamp() as f64;
self.raw_push(&[job], self.calc_at(timestamp)).await
}

pub async fn push_async(&self, job: Job) -> Result<(), ClientError> {
self.raw_push(&[job], None).await
}

pub async fn push_bulk_async(&self, jobs: &[Job]) -> Result<(), ClientError> {
self.raw_push(jobs, None).await
}

async fn raw_push(&self, payloads: &[Job], at: Option<f64>) -> Result<(), ClientError> {
Expand Down

0 comments on commit 8547860

Please sign in to comment.