Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace use of r2d2 with redis-rs ConnectionManager #18

Merged
merged 2 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ edition = "2018"
travis-ci = { repository = "spk/rust-sidekiq" }

[dependencies]
futures = "*"
rand = "0.8"
serde = "1.0"
serde_json = "1.0"
r2d2 = "0.8"
r2d2_redis = "0.14"
redis = { version = "*", features = ["connection-manager", "async-std-comp", "async-std-tls-comp"] }
time = "0.3"
6 changes: 2 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
#![deny(warnings)]
#![crate_name = "sidekiq"]

extern crate r2d2;
extern crate r2d2_redis;
extern crate rand;
extern crate serde;
extern crate serde_json;

mod sidekiq;
pub use crate::sidekiq::{
create_redis_pool, Client, ClientError, ClientOpts, Job, JobOpts, RedisPool,
RedisPooledConnection,
create_async_redis_pool, create_redis_pool, Client, ClientError, ClientOpts, Job, JobOpts,
RedisPool,
};
pub use serde_json::value::Value;
138 changes: 72 additions & 66 deletions src/sidekiq/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ use std::fmt;
use std::time::{SystemTime, UNIX_EPOCH};

use crate::Value;
use r2d2_redis::{r2d2, redis, RedisConnectionManager};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};

use time::{Duration, OffsetDateTime};

use futures::executor::block_on;
use futures::future::TryFutureExt;
use redis::aio::ConnectionManager;

const REDIS_URL_ENV: &str = "REDIS_URL";
const REDIS_URL_DEFAULT: &str = "redis://127.0.0.1/";
pub type RedisPooledConnection = r2d2::PooledConnection<RedisConnectionManager>;
pub type RedisPool = r2d2::Pool<RedisConnectionManager>;
pub type RedisPool = ConnectionManager;

#[derive(Debug)]
pub struct ClientError {
Expand All @@ -25,19 +27,25 @@ pub struct ClientError {
#[derive(Debug)]
enum ErrorKind {
Redis(redis::RedisError),
PoolInit(r2d2::Error),
}

impl std::error::Error for ClientError {}

pub fn create_redis_pool() -> Result<RedisPool, ClientError> {
pub fn create_redis_pool() -> Result<ConnectionManager, ClientError> {
block_on(create_async_redis_pool())
}

pub async fn create_async_redis_pool() -> Result<ConnectionManager, ClientError> {
let redis_url =
&env::var(&REDIS_URL_ENV.to_owned()).unwrap_or_else(|_| REDIS_URL_DEFAULT.to_owned());
let url = redis::parse_redis_url(redis_url).unwrap();
let manager = RedisConnectionManager::new(url).unwrap();
r2d2::Pool::new(manager).map_err(|err| ClientError {
kind: ErrorKind::PoolInit(err),
})
// Note: this connection is multiplexed. Users of this object will call clone(), but the same underlying connection will be used.
// https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html
match ConnectionManager::new(redis::Client::open((*redis_url).clone()).unwrap()).await {
Ok(pool) => Ok(pool),
Err(err) => Err(ClientError {
kind: ErrorKind::Redis(err),
}),
}
}

pub struct Job {
Expand All @@ -54,7 +62,6 @@ impl fmt::Display for ClientError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.kind {
ErrorKind::Redis(ref err) => err.fmt(f),
ErrorKind::PoolInit(ref err) => err.fmt(f),
}
}
}
Expand All @@ -67,14 +74,6 @@ impl From<redis::RedisError> for ClientError {
}
}

impl From<r2d2::Error> for ClientError {
fn from(error: r2d2::Error) -> ClientError {
ClientError {
kind: ErrorKind::PoolInit(error),
}
}
}

pub struct JobOpts {
pub retry: i64,
pub queue: String,
Expand Down Expand Up @@ -157,7 +156,7 @@ pub struct ClientOpts {
}

pub struct Client {
pub redis_pool: RedisPool,
pub redis_pool: ConnectionManager,
pub namespace: Option<String>,
}

Expand Down Expand Up @@ -202,22 +201,13 @@ pub struct Client {
/// }
/// ```
impl Client {
pub fn new(redis_pool: RedisPool, opts: ClientOpts) -> Client {
pub fn new(redis_pool: ConnectionManager, opts: ClientOpts) -> Client {
Client {
redis_pool,
namespace: opts.namespace,
}
}

fn connect(&self) -> Result<RedisPooledConnection, ClientError> {
match self.redis_pool.get() {
Ok(conn) => Ok(conn),
Err(err) => Err(ClientError {
kind: ErrorKind::PoolInit(err),
}),
}
}

fn calc_at(&self, target_millsec_number: f64) -> Option<f64> {
let maximum_target: f64 = 1_000_000_000_f64;
let target_millsec: f64 = target_millsec_number;
Expand All @@ -237,61 +227,77 @@ impl Client {
}

pub fn perform_in(&self, interval: Duration, job: Job) -> Result<(), ClientError> {
let interval: f64 = interval.whole_seconds() as f64;
self.raw_push(&[job], self.calc_at(interval))
block_on(self.perform_in_async(interval, job))
}

pub fn perform_at(&self, datetime: OffsetDateTime, job: Job) -> Result<(), ClientError> {
let timestamp: f64 = datetime.unix_timestamp() as f64;
self.raw_push(&[job], self.calc_at(timestamp))
block_on(self.perform_at_async(datetime, job))
}

pub fn push(&self, job: Job) -> Result<(), ClientError> {
self.raw_push(&[job], None)
block_on(self.push_async(job))
}

pub fn push_bulk(&self, jobs: &[Job]) -> Result<(), ClientError> {
self.raw_push(jobs, None)
block_on(self.push_bulk_async(jobs))
}

pub async fn perform_in_async(&self, interval: Duration, job: Job) -> Result<(), ClientError> {
let interval: f64 = interval.whole_seconds() as f64;
self.raw_push(&[job], self.calc_at(interval)).await
}

pub async fn perform_at_async(
&self,
datetime: OffsetDateTime,
job: Job,
) -> Result<(), ClientError> {
let timestamp: f64 = datetime.unix_timestamp() as f64;
self.raw_push(&[job], self.calc_at(timestamp)).await
}

pub async fn push_async(&self, job: Job) -> Result<(), ClientError> {
self.raw_push(&[job], None).await
}

pub async fn push_bulk_async(&self, jobs: &[Job]) -> Result<(), ClientError> {
self.raw_push(jobs, None).await
}

fn raw_push(&self, payloads: &[Job], at: Option<f64>) -> Result<(), ClientError> {
async fn raw_push(&self, payloads: &[Job], at: Option<f64>) -> Result<(), ClientError> {
let payload = &payloads[0];
let to_push = payloads
.iter()
.map(|entry| serde_json::to_string(&entry).unwrap())
.collect::<Vec<_>>();

if let Some(value) = at {
match self.connect() {
Ok(mut conn) => redis::pipe()
.atomic()
.cmd("ZADD")
.arg(self.schedule_queue_name())
.arg(value)
.arg(to_push)
.query(&mut *conn)
.map_err(|err| ClientError {
kind: ErrorKind::Redis(err),
}),
Err(err) => Err(err),
}
redis::pipe()
.atomic()
.cmd("ZADD")
.arg(self.schedule_queue_name())
.arg(value)
.arg(to_push)
.query_async(&mut self.redis_pool.clone())
.map_err(|err| ClientError {
kind: ErrorKind::Redis(err),
})
.await
} else {
match self.connect() {
Ok(mut conn) => redis::pipe()
.atomic()
.cmd("SADD")
.arg("queues")
.arg(payload.queue.to_string())
.ignore()
.cmd("LPUSH")
.arg(self.queue_name(&payload.queue))
.arg(to_push)
.query(&mut *conn)
.map_err(|err| ClientError {
kind: ErrorKind::Redis(err),
}),
Err(err) => Err(err),
}
redis::pipe()
.atomic()
.cmd("SADD")
.arg("queues")
.arg(payload.queue.to_string())
.ignore()
.cmd("LPUSH")
.arg(self.queue_name(&payload.queue))
.arg(to_push)
.query_async(&mut self.redis_pool.clone())
.map_err(|err| ClientError {
kind: ErrorKind::Redis(err),
})
.await
}
}

Expand Down