Skip to content

Commit

Permalink
Replace use of r2d2 with redis-rs ConnectionManager
Browse files Browse the repository at this point in the history
Using r2d2 with redis isn't really necessary, as the redis crate can
multiplex a connection, and in fact its the recommended approach for most
use cases: redis-rs/redis-rs#388 (comment)

Therefore this changes things to use redis-rs internally, with the intent
of adding an async API to this crate in the future. Additionally this
enables TLS for Redis connections, to support Redis servers that require
TLS.

This change is mostly API-compatible, but does change the underlying
RedisPool type (its still called "pool", but its actually a
ConnectionManager), and drops the exported type RedisPooledConnection.
  • Loading branch information
lfittl committed Jul 28, 2022
1 parent fa2db19 commit 8b1f259
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 69 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ edition = "2018"
travis-ci = { repository = "spk/rust-sidekiq" }

[dependencies]
futures = "*"
rand = "0.8"
serde = "1.0"
serde_json = "1.0"
r2d2 = "0.8"
r2d2_redis = "0.14"
redis = { version = "*", features = ["connection-manager", "async-std-comp", "async-std-tls-comp"] }
time = "0.3"
3 changes: 0 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,12 @@
#![deny(warnings)]
#![crate_name = "sidekiq"]

extern crate r2d2;
extern crate r2d2_redis;
extern crate rand;
extern crate serde;
extern crate serde_json;

mod sidekiq;
pub use crate::sidekiq::{
create_redis_pool, Client, ClientError, ClientOpts, Job, JobOpts, RedisPool,
RedisPooledConnection,
};
pub use serde_json::value::Value;
112 changes: 48 additions & 64 deletions src/sidekiq/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ use std::fmt;
use std::time::{SystemTime, UNIX_EPOCH};

use crate::Value;
use r2d2_redis::{r2d2, redis, RedisConnectionManager};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};

use time::{Duration, OffsetDateTime};

use futures::executor::block_on;
use futures::future::TryFutureExt;
use redis::aio::ConnectionManager;

const REDIS_URL_ENV: &str = "REDIS_URL";
const REDIS_URL_DEFAULT: &str = "redis://127.0.0.1/";
pub type RedisPooledConnection = r2d2::PooledConnection<RedisConnectionManager>;
pub type RedisPool = r2d2::Pool<RedisConnectionManager>;
pub type RedisPool = ConnectionManager;

#[derive(Debug)]
pub struct ClientError {
Expand All @@ -25,19 +27,23 @@ pub struct ClientError {
#[derive(Debug)]
enum ErrorKind {
Redis(redis::RedisError),
PoolInit(r2d2::Error),
}

impl std::error::Error for ClientError {}

pub fn create_redis_pool() -> Result<RedisPool, ClientError> {
pub fn create_redis_pool() -> Result<ConnectionManager, ClientError> {
let redis_url =
&env::var(&REDIS_URL_ENV.to_owned()).unwrap_or_else(|_| REDIS_URL_DEFAULT.to_owned());
let url = redis::parse_redis_url(redis_url).unwrap();
let manager = RedisConnectionManager::new(url).unwrap();
r2d2::Pool::new(manager).map_err(|err| ClientError {
kind: ErrorKind::PoolInit(err),
})
// Note: this connection is multiplexed. Users of this object will call clone(), but the same underlying connection will be used.
// https://docs.rs/redis/latest/redis/aio/struct.ConnectionManager.html
match block_on(ConnectionManager::new(
redis::Client::open((*redis_url).clone()).unwrap(),
)) {
Ok(pool) => Ok(pool),
Err(err) => Err(ClientError {
kind: ErrorKind::Redis(err),
}),
}
}

pub struct Job {
Expand All @@ -54,7 +60,6 @@ impl fmt::Display for ClientError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.kind {
ErrorKind::Redis(ref err) => err.fmt(f),
ErrorKind::PoolInit(ref err) => err.fmt(f),
}
}
}
Expand All @@ -67,14 +72,6 @@ impl From<redis::RedisError> for ClientError {
}
}

impl From<r2d2::Error> for ClientError {
fn from(error: r2d2::Error) -> ClientError {
ClientError {
kind: ErrorKind::PoolInit(error),
}
}
}

pub struct JobOpts {
pub retry: i64,
pub queue: String,
Expand Down Expand Up @@ -157,7 +154,7 @@ pub struct ClientOpts {
}

pub struct Client {
pub redis_pool: RedisPool,
pub redis_pool: ConnectionManager,
pub namespace: Option<String>,
}

Expand Down Expand Up @@ -202,22 +199,13 @@ pub struct Client {
/// }
/// ```
impl Client {
pub fn new(redis_pool: RedisPool, opts: ClientOpts) -> Client {
pub fn new(redis_pool: ConnectionManager, opts: ClientOpts) -> Client {
Client {
redis_pool,
namespace: opts.namespace,
}
}

fn connect(&self) -> Result<RedisPooledConnection, ClientError> {
match self.redis_pool.get() {
Ok(conn) => Ok(conn),
Err(err) => Err(ClientError {
kind: ErrorKind::PoolInit(err),
}),
}
}

fn calc_at(&self, target_millsec_number: f64) -> Option<f64> {
let maximum_target: f64 = 1_000_000_000_f64;
let target_millsec: f64 = target_millsec_number;
Expand All @@ -238,60 +226,56 @@ impl Client {

pub fn perform_in(&self, interval: Duration, job: Job) -> Result<(), ClientError> {
let interval: f64 = interval.whole_seconds() as f64;
self.raw_push(&[job], self.calc_at(interval))
block_on(self.raw_push(&[job], self.calc_at(interval)))
}

pub fn perform_at(&self, datetime: OffsetDateTime, job: Job) -> Result<(), ClientError> {
let timestamp: f64 = datetime.unix_timestamp() as f64;
self.raw_push(&[job], self.calc_at(timestamp))
block_on(self.raw_push(&[job], self.calc_at(timestamp)))
}

pub fn push(&self, job: Job) -> Result<(), ClientError> {
self.raw_push(&[job], None)
block_on(self.raw_push(&[job], None))
}

pub fn push_bulk(&self, jobs: &[Job]) -> Result<(), ClientError> {
self.raw_push(jobs, None)
block_on(self.raw_push(jobs, None))
}

fn raw_push(&self, payloads: &[Job], at: Option<f64>) -> Result<(), ClientError> {
async fn raw_push(&self, payloads: &[Job], at: Option<f64>) -> Result<(), ClientError> {
let payload = &payloads[0];
let to_push = payloads
.iter()
.map(|entry| serde_json::to_string(&entry).unwrap())
.collect::<Vec<_>>();

if let Some(value) = at {
match self.connect() {
Ok(mut conn) => redis::pipe()
.atomic()
.cmd("ZADD")
.arg(self.schedule_queue_name())
.arg(value)
.arg(to_push)
.query(&mut *conn)
.map_err(|err| ClientError {
kind: ErrorKind::Redis(err),
}),
Err(err) => Err(err),
}
redis::pipe()
.atomic()
.cmd("ZADD")
.arg(self.schedule_queue_name())
.arg(value)
.arg(to_push)
.query_async(&mut self.redis_pool.clone())
.map_err(|err| ClientError {
kind: ErrorKind::Redis(err),
})
.await
} else {
match self.connect() {
Ok(mut conn) => redis::pipe()
.atomic()
.cmd("SADD")
.arg("queues")
.arg(payload.queue.to_string())
.ignore()
.cmd("LPUSH")
.arg(self.queue_name(&payload.queue))
.arg(to_push)
.query(&mut *conn)
.map_err(|err| ClientError {
kind: ErrorKind::Redis(err),
}),
Err(err) => Err(err),
}
redis::pipe()
.atomic()
.cmd("SADD")
.arg("queues")
.arg(payload.queue.to_string())
.ignore()
.cmd("LPUSH")
.arg(self.queue_name(&payload.queue))
.arg(to_push)
.query_async(&mut self.redis_pool.clone())
.map_err(|err| ClientError {
kind: ErrorKind::Redis(err),
})
.await
}
}

Expand Down

0 comments on commit 8b1f259

Please sign in to comment.