Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve flexiblity of retry policy #584

Merged
merged 9 commits into from
Aug 1, 2022
6 changes: 3 additions & 3 deletions tower/src/retry/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ where
loop {
match this.state.as_mut().project() {
StateProj::Called(future) => {
let result = ready!(future.poll(cx));
if let Some(ref req) = this.request {
match this.retry.policy.retry(req, result.as_ref()) {
let mut result = ready!(future.poll(cx));
if let Some(ref mut req) = this.request {
match this.retry.policy.retry(req, &mut result) {
Some(checking) => {
this.state.set(State::Checking(checking));
}
Expand Down
4 changes: 2 additions & 2 deletions tower/src/retry/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::future::Future;
/// impl<E> Policy<Req, Res, E> for Attempts {
/// type Future = future::Ready<Self>;
///
/// fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
/// fn retry(&self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> {
/// match result {
/// Ok(_) => {
/// // Treat all `Response`s as success,
Expand Down Expand Up @@ -58,7 +58,7 @@ pub trait Policy<Req, Res, E>: Sized {
///
/// [`Service::Response`]: crate::Service::Response
/// [`Service::Error`]: crate::Service::Error
fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future>;
fn retry(&self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess getting a &mut Result<Res, E> allows users to change Err(_) into Ok(_) which might be kinda odd. Not sure. Feels like getting Result<&mut Res, &mut E> would fit the use-cases as well but not allow changing the Result variant.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, that is a really good point david, what do you think about that @rcoh ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment wasn't threaded because of email:

The entire point is to change the variant (at least for me). Because there
are "success" cases the actually need to be retries and when I run out of
retries I need to change it to failure


/// Tries to clone a request before being passed to the inner service.
///
Expand Down
2 changes: 1 addition & 1 deletion tower/tests/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
{
type Future = Ready<Self>;

fn retry(&self, _req: &Req, _result: Result<&Res, &E>) -> Option<Self::Future> {
fn retry(&self, _req: &mut Req, _result: &mut Result<Res, E>) -> Option<Self::Future> {
None
}

Expand Down
62 changes: 57 additions & 5 deletions tower/tests/retry/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,30 @@ async fn success_with_cannot_clone() {
assert_ready_ok!(fut.poll(), "world");
}

#[tokio::test(flavor = "current_thread")]
async fn retry_mutating_policy() {
let _t = support::trace_init();

// Even though the request couldn't be cloned, if the first request succeeds,
// it should succeed overall.
let (mut service, mut handle) = new_service(MutatingPolicy { remaining: 2 });

assert_ready_ok!(service.poll_ready());
let mut fut = task::spawn(service.call("hello"));

assert_request_eq!(handle, "hello").send_response("world");
assert_pending!(fut.poll());
// the policy alters the request. in real life, this might be setting
// a header
assert_request_eq!(handle, "retrying").send_response("world");

assert_pending!(fut.poll());

assert_request_eq!(handle, "retrying").send_response("world");

assert_ready_err!(fut.poll(), "out of retries");
}

type Req = &'static str;
type Res = &'static str;
type InnerError = &'static str;
Expand All @@ -102,7 +126,7 @@ struct RetryErrors;

impl Policy<Req, Res, Error> for RetryErrors {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
fn retry(&self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
if result.is_err() {
Some(future::ready(RetryErrors))
} else {
Expand All @@ -120,7 +144,7 @@ struct Limit(usize);

impl Policy<Req, Res, Error> for Limit {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
fn retry(&self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
if result.is_err() && self.0 > 0 {
Some(future::ready(Limit(self.0 - 1)))
} else {
Expand All @@ -138,8 +162,8 @@ struct UnlessErr(InnerError);

impl Policy<Req, Res, Error> for UnlessErr {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option<Self::Future> {
result.err().and_then(|err| {
fn retry(&self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
result.as_ref().err().and_then(|err| {
if err.to_string() != self.0 {
Some(future::ready(self.clone()))
} else {
Expand All @@ -158,7 +182,7 @@ struct CannotClone;

impl Policy<Req, Res, Error> for CannotClone {
type Future = future::Ready<Self>;
fn retry(&self, _: &Req, _: Result<&Res, &Error>) -> Option<Self::Future> {
fn retry(&self, _: &mut Req, _: &mut Result<Res, Error>) -> Option<Self::Future> {
unreachable!("retry cannot be called since request isn't cloned");
}

Expand All @@ -167,6 +191,34 @@ impl Policy<Req, Res, Error> for CannotClone {
}
}

#[derive(Clone)]
struct MutatingPolicy {
remaining: usize,
}

impl Policy<Req, Res, Error> for MutatingPolicy
where
Error: From<&'static str>,
{
type Future = future::Ready<Self>;

fn retry(&self, req: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
if self.remaining == 0 {
*result = Err("out of retries".into());
None
} else {
*req = "retrying";
Some(future::ready(MutatingPolicy {
remaining: self.remaining - 1,
}))
}
}

fn clone_request(&self, req: &Req) -> Option<Req> {
Some(*req)
}
}

fn new_service<P: Policy<Req, Res, Error> + Clone>(
policy: P,
) -> (mock::Spawn<tower::retry::Retry<P, Mock>>, Handle) {
Expand Down