diff --git a/tower/CHANGELOG.md b/tower/CHANGELOG.md index 5bdc7e3da..63263f804 100644 --- a/tower/CHANGELOG.md +++ b/tower/CHANGELOG.md @@ -11,9 +11,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **util**: Removed deprecated `ServiceExt::ready_and` method and `ReadyAnd` future ([#652]) +- **retry**: **Breaking Change** `retry::Policy::retry` now accepts `&mut Req` and `&mut Res` instead of the previous mutable versions. This + increases the flexibility of the retry policy. To update, update your method signature to include `mut` for both parameters. ([#584]) [#652]: https://github.com/tower-rs/tower/pull/652 +[#584]: https://github.com/tower-rs/tower/pull/584 # 0.4.12 (February 16, 2022) diff --git a/tower/src/retry/future.rs b/tower/src/retry/future.rs index d18a5abb7..c23e6e3af 100644 --- a/tower/src/retry/future.rs +++ b/tower/src/retry/future.rs @@ -74,9 +74,9 @@ where loop { match this.state.as_mut().project() { StateProj::Called { future } => { - let result = ready!(future.poll(cx)); - if let Some(ref req) = this.request { - match this.retry.policy.retry(req, result.as_ref()) { + let mut result = ready!(future.poll(cx)); + if let Some(req) = &mut this.request { + match this.retry.policy.retry(req, &mut result) { Some(checking) => { this.state.set(State::Checking { checking }); } diff --git a/tower/src/retry/policy.rs b/tower/src/retry/policy.rs index 6f8a5b790..20adc3d6d 100644 --- a/tower/src/retry/policy.rs +++ b/tower/src/retry/policy.rs @@ -16,7 +16,7 @@ use std::future::Future; /// impl Policy for Attempts { /// type Future = future::Ready; /// -/// fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option { +/// fn retry(&self, req: &mut Req, result: &mut Result) -> Option { /// match result { /// Ok(_) => { /// // Treat all `Response`s as success, @@ -56,9 +56,28 @@ pub trait Policy: Sized { /// If the request *should* be retried, return `Some` future of a new /// policy that would apply for the next request attempt. /// + /// ## Mutating Requests + /// + /// The policy MAY chose to mutate the `req`: if the request is mutated, the + /// mutated request will be sent to the inner service in the next retry. + /// This can be helpful for use cases like tracking the retry count in a + /// header. + /// + /// ## Mutating Results + /// + /// The policy MAY chose to mutate the result. This enables the retry + /// policy to convert a failure into a success and vice versa. For example, + /// if the policy is used to poll while waiting for a state change, the + /// policy can switch the result to emit a specific error when retries are + /// exhausted. + /// + /// The policy can also record metadata on the request to include + /// information about the number of retries required or to record that a + /// failure failed after exhausting all retries. + /// /// [`Service::Response`]: crate::Service::Response /// [`Service::Error`]: crate::Service::Error - fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option; + fn retry(&self, req: &mut Req, result: &mut Result) -> Option; /// Tries to clone a request before being passed to the inner service. /// diff --git a/tower/tests/builder.rs b/tower/tests/builder.rs index 8bc8ce1c1..90bb70ae4 100644 --- a/tower/tests/builder.rs +++ b/tower/tests/builder.rs @@ -45,7 +45,7 @@ where { type Future = Ready; - fn retry(&self, _req: &Req, _result: Result<&Res, &E>) -> Option { + fn retry(&self, _req: &mut Req, _result: &mut Result) -> Option { None } diff --git a/tower/tests/retry/main.rs b/tower/tests/retry/main.rs index a07242b0b..d0d7a8aec 100644 --- a/tower/tests/retry/main.rs +++ b/tower/tests/retry/main.rs @@ -90,6 +90,27 @@ async fn success_with_cannot_clone() { assert_ready_ok!(fut.poll(), "world"); } +#[tokio::test(flavor = "current_thread")] +async fn retry_mutating_policy() { + let _t = support::trace_init(); + + let (mut service, mut handle) = new_service(MutatingPolicy { remaining: 2 }); + + assert_ready_ok!(service.poll_ready()); + let mut fut = task::spawn(service.call("hello")); + + assert_request_eq!(handle, "hello").send_response("world"); + assert_pending!(fut.poll()); + // the policy alters the request. in real life, this might be setting a header + assert_request_eq!(handle, "retrying").send_response("world"); + + assert_pending!(fut.poll()); + + assert_request_eq!(handle, "retrying").send_response("world"); + + assert_ready_err!(fut.poll(), "out of retries"); +} + type Req = &'static str; type Res = &'static str; type InnerError = &'static str; @@ -102,7 +123,7 @@ struct RetryErrors; impl Policy for RetryErrors { type Future = future::Ready; - fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { + fn retry(&self, _: &mut Req, result: &mut Result) -> Option { if result.is_err() { Some(future::ready(RetryErrors)) } else { @@ -120,7 +141,7 @@ struct Limit(usize); impl Policy for Limit { type Future = future::Ready; - fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { + fn retry(&self, _: &mut Req, result: &mut Result) -> Option { if result.is_err() && self.0 > 0 { Some(future::ready(Limit(self.0 - 1))) } else { @@ -138,8 +159,8 @@ struct UnlessErr(InnerError); impl Policy for UnlessErr { type Future = future::Ready; - fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { - result.err().and_then(|err| { + fn retry(&self, _: &mut Req, result: &mut Result) -> Option { + result.as_ref().err().and_then(|err| { if err.to_string() != self.0 { Some(future::ready(self.clone())) } else { @@ -158,7 +179,7 @@ struct CannotClone; impl Policy for CannotClone { type Future = future::Ready; - fn retry(&self, _: &Req, _: Result<&Res, &Error>) -> Option { + fn retry(&self, _: &mut Req, _: &mut Result) -> Option { unreachable!("retry cannot be called since request isn't cloned"); } @@ -167,6 +188,36 @@ impl Policy for CannotClone { } } +/// Test policy that changes the request to `retrying` during retries and the result to `"out of retries"` +/// when retries are exhausted. +#[derive(Clone)] +struct MutatingPolicy { + remaining: usize, +} + +impl Policy for MutatingPolicy +where + Error: From<&'static str>, +{ + type Future = future::Ready; + + fn retry(&self, req: &mut Req, result: &mut Result) -> Option { + if self.remaining == 0 { + *result = Err("out of retries".into()); + None + } else { + *req = "retrying"; + Some(future::ready(MutatingPolicy { + remaining: self.remaining - 1, + })) + } + } + + fn clone_request(&self, req: &Req) -> Option { + Some(*req) + } +} + fn new_service + Clone>( policy: P, ) -> (mock::Spawn>, Handle) {