Skip to content

Commit ddc64e8

Browse files
hawkwLucioFranco
andauthored
update to Tokio 0.3 (#476)
This branch updates Tower to Tokio 0.3. Unlike #474, this branch uses Tokio 0.3's synchronization primitives, rather than continuing to depend on Tokio 0.2. I think that we ought to try to use Tokio 0.3's channels whenever feasible, because the 0.2 channels have pathological memory usage patterns in some cases (see tokio-rs/tokio#2637). @LucioFranco let me know what you think of the approach used here and we can compare notes! For the most part, this was a pretty mechanical change: updating versions in Cargo.toml, tracking feature flag changes, renaming `tokio::time::delay` to `sleep`, and so on. Tokio's channel receivers also lost their `poll_recv` methods, but we can easily replicate that by enabling the `"stream"` feature and using `poll_next` instead. The one actually significant change is that `tokio::sync::mpsc::Sender` lost its `poll_ready` method, which impacts the way `tower::buffer` is implemeted. When the buffer's channel is full, we want to exert backpressure in `poll_ready`, so that callers such as load balancers could choose to call another service rather than waiting for buffer capacity. Previously, we did this by calling `poll_ready` on the underlying channel sender. Unfortunately, this can't be done easily using Tokio 0.3's bounded MPSC channel, because it no longer exposes a polling-based interface, only an `async fn ready`, which borrows the sender. Therefore, we implement our own bounded MPSC on top of the unbounded channel, using a semaphore to limit how many items are in the channel. I factored out the code for polling a semaphore acquire future from `limit::concurrency` into its own module, and reused it in `Buffer`. Additionally, the buffer tests needed to be updated, because they currently don't actually poll the buffer service before calling it. This violates the `Service` contract, and the new code actually fails as a result. Closes #473 Closes #474 Co-authored-by: Lucio Franco <[email protected]> Signed-off-by: Eliza Weisman <[email protected]>
1 parent 1a84543 commit ddc64e8

File tree

22 files changed

+316
-149
lines changed

22 files changed

+316
-149
lines changed

deny.toml

+6-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@ confidence-threshold = 0.8
1515
[bans]
1616
multiple-versions = "deny"
1717
highlight = "all"
18-
skip-tree = [
19-
{ name = "tower", version = "=0.3"}
18+
skip-tree = [{ name = "tower", version = ">=0.3, <=0.4" }]
19+
skip = [
20+
# `quickcheck` and `tracing-subscriber` depend on incompatible versions of
21+
# `wasi` via their dependencies on `rand` and `chrono`, respectively; we
22+
# can't really fix this.
23+
{ name = "wasi" },
2024
]
2125

2226
[sources]

examples/Cargo.toml

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[package]
2+
name = "examples"
3+
version = "0.0.0"
4+
publish = false
5+
edition = "2018"
6+
7+
# If you copy one of the examples into a new project, you should be using
8+
# [dependencies] instead.
9+
[dev-dependencies]
10+
tower = { version = "0.4", path = "../tower", features = ["full"] }
11+
tower-service = "0.3"
12+
tokio = { version = "0.3", features = ["full"] }
13+
rand = "0.7"
14+
pin-project = "1.0"
15+
futures = "0.3"
16+
tracing = "0.1"
17+
tracing-subscriber = "0.2"
18+
hdrhistogram = "7"
19+
20+
[[example]]
21+
name = "balance"
22+
path = "balance.rs"

tower-test/Cargo.toml

+5-5
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ name = "tower-test"
88
# - README.md
99
# - Update CHANGELOG.md.
1010
# - Create "v0.1.x" git tag.
11-
version = "0.3.0"
11+
version = "0.4.0"
1212
authors = ["Tower Maintainers <[email protected]>"]
1313
license = "MIT"
1414
readme = "README.md"
@@ -23,11 +23,11 @@ edition = "2018"
2323

2424
[dependencies]
2525
futures-util = { version = "0.3", default-features = false }
26-
tokio = { version = "0.2", features = ["sync"]}
26+
tokio = { version = "0.3", features = ["sync"] }
27+
tokio-test = { version = "0.3" }
2728
tower-layer = { version = "0.3", path = "../tower-layer" }
28-
tokio-test = "0.2"
2929
tower-service = { version = "0.3" }
30-
pin-project = "0.4.17"
30+
pin-project = "1"
3131

3232
[dev-dependencies]
33-
tokio = { version = "0.2", features = ["macros"] }
33+
tokio = { version = "0.3", features = ["macros"] }

tower/Cargo.toml

+11-11
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ name = "tower"
88
# - README.md
99
# - Update CHANGELOG.md.
1010
# - Create "vX.X.X" git tag.
11-
version = "0.3.1"
11+
version = "0.4.0"
1212
authors = ["Tower Maintainers <[email protected]>"]
1313
license = "MIT"
1414
readme = "README.md"
@@ -26,26 +26,26 @@ edition = "2018"
2626
[features]
2727
default = ["log"]
2828
log = ["tracing/log"]
29-
balance = ["discover", "load", "ready-cache", "make", "rand", "slab"]
30-
buffer = ["tokio/sync", "tokio/rt-core"]
29+
balance = ["discover", "load", "ready-cache", "make", "rand", "slab", "tokio/stream"]
30+
buffer = ["tokio/sync", "tokio/rt", "tokio/stream"]
3131
discover = []
3232
filter = []
3333
hedge = ["util", "filter", "futures-util", "hdrhistogram", "tokio/time"]
34-
limit = ["tokio/time"]
34+
limit = ["tokio/time", "tokio/sync"]
3535
load = ["tokio/time"]
3636
load-shed = []
3737
make = ["tokio/io-std"]
3838
ready-cache = ["futures-util", "indexmap", "tokio/sync"]
3939
reconnect = ["make", "tokio/io-std"]
4040
retry = ["tokio/time"]
41-
spawn-ready = ["futures-util", "tokio/sync", "tokio/rt-core"]
41+
spawn-ready = ["futures-util", "tokio/sync", "tokio/rt"]
4242
steer = ["futures-util"]
4343
timeout = ["tokio/time"]
4444
util = ["futures-util"]
4545

4646
[dependencies]
4747
futures-core = "0.3"
48-
pin-project = "0.4.17"
48+
pin-project = "1"
4949
tower-layer = { version = "0.3", path = "../tower-layer" }
5050
tower-service = { version = "0.3" }
5151
tracing = "0.1.2"
@@ -55,16 +55,16 @@ hdrhistogram = { version = "6.0", optional = true }
5555
indexmap = { version = "1.0.2", optional = true }
5656
rand = { version = "0.7", features = ["small_rng"], optional = true }
5757
slab = { version = "0.4", optional = true }
58-
tokio = { version = "0.2", optional = true, features = ["sync"] }
58+
tokio = { version = "0.3", optional = true, features = ["sync"] }
5959

6060
[dev-dependencies]
6161
futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await"] }
6262
hdrhistogram = "6.0"
6363
quickcheck = { version = "0.9", default-features = false }
64-
tokio = { version = "0.2", features = ["macros", "stream", "sync", "test-util" ] }
65-
tokio-test = "0.2"
66-
tower-test = { version = "0.3", path = "../tower-test" }
67-
tracing-subscriber = "0.1.1"
64+
tokio = { version = "0.3", features = ["macros", "stream", "sync", "test-util", "rt-multi-thread"] }
65+
tokio-test = "0.3"
66+
tower-test = { version = "0.4", path = "../tower-test" }
67+
tracing-subscriber = "0.2.14"
6868
# env_logger = { version = "0.5.3", default-features = false }
6969
# log = "0.4.1"
7070

tower/examples/tower-balance.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ fn gen_disco() -> impl Discover<
118118
let latency = Duration::from_millis(rand::thread_rng().gen_range(0, maxms));
119119

120120
async move {
121-
time::delay_until(start + latency).await;
121+
time::sleep_until(start + latency).await;
122122
let latency = start.elapsed();
123123
Ok(Rsp { latency, instance })
124124
}

tower/src/balance/pool/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ where
9090
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
9191
let mut this = self.project();
9292

93-
while let Poll::Ready(Some(sid)) = this.died_rx.as_mut().poll_recv(cx) {
93+
while let Poll::Ready(Some(sid)) = this.died_rx.as_mut().poll_next(cx) {
9494
this.services.remove(sid);
9595
tracing::trace!(
9696
pool.services = this.services.len(),

tower/src/buffer/message.rs

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub(crate) struct Message<Request, Fut> {
77
pub(crate) request: Request,
88
pub(crate) tx: Tx<Fut>,
99
pub(crate) span: tracing::Span,
10+
pub(super) _permit: crate::semaphore::Permit,
1011
}
1112

1213
/// Response sender

tower/src/buffer/service.rs

+55-31
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use super::{
44
worker::{Handle, Worker},
55
};
66

7+
use crate::semaphore::Semaphore;
78
use futures_core::ready;
89
use std::task::{Context, Poll};
910
use tokio::sync::{mpsc, oneshot};
@@ -17,7 +18,19 @@ pub struct Buffer<T, Request>
1718
where
1819
T: Service<Request>,
1920
{
20-
tx: mpsc::Sender<Message<Request, T::Future>>,
21+
// Note: this actually _is_ bounded, but rather than using Tokio's unbounded
22+
// channel, we use tokio's semaphore separately to implement the bound.
23+
tx: mpsc::UnboundedSender<Message<Request, T::Future>>,
24+
// When the buffer's channel is full, we want to exert backpressure in
25+
// `poll_ready`, so that callers such as load balancers could choose to call
26+
// another service rather than waiting for buffer capacity.
27+
//
28+
// Unfortunately, this can't be done easily using Tokio's bounded MPSC
29+
// channel, because it doesn't expose a polling-based interface, only an
30+
// `async fn ready`, which borrows the sender. Therefore, we implement our
31+
// own bounded MPSC on top of the unbounded channel, using a semaphore to
32+
// limit how many items are in the channel.
33+
semaphore: Semaphore,
2134
handle: Handle,
2235
}
2336

@@ -50,10 +63,9 @@ where
5063
T::Error: Send + Sync,
5164
Request: Send + 'static,
5265
{
53-
let (tx, rx) = mpsc::channel(bound);
54-
let (handle, worker) = Worker::new(service, rx);
66+
let (service, worker) = Self::pair(service, bound);
5567
tokio::spawn(worker);
56-
Buffer { tx, handle }
68+
service
5769
}
5870

5971
/// Creates a new `Buffer` wrapping `service`, but returns the background worker.
@@ -67,9 +79,17 @@ where
6779
T::Error: Send + Sync,
6880
Request: Send + 'static,
6981
{
70-
let (tx, rx) = mpsc::channel(bound);
82+
let (tx, rx) = mpsc::unbounded_channel();
7183
let (handle, worker) = Worker::new(service, rx);
72-
(Buffer { tx, handle }, worker)
84+
let semaphore = Semaphore::new(bound);
85+
(
86+
Buffer {
87+
tx,
88+
handle,
89+
semaphore,
90+
},
91+
worker,
92+
)
7393
}
7494

7595
fn get_worker_error(&self) -> crate::BoxError {
@@ -87,40 +107,43 @@ where
87107
type Future = ResponseFuture<T::Future>;
88108

89109
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
90-
// If the inner service has errored, then we error here.
91-
if let Err(_) = ready!(self.tx.poll_ready(cx)) {
92-
Poll::Ready(Err(self.get_worker_error()))
93-
} else {
94-
Poll::Ready(Ok(()))
110+
// First, check if the worker is still alive.
111+
if self.tx.is_closed() {
112+
// If the inner service has errored, then we error here.
113+
return Poll::Ready(Err(self.get_worker_error()));
95114
}
115+
116+
// Then, poll to acquire a semaphore permit. If we acquire a permit,
117+
// then there's enough buffer capacity to send a new request. Otherwise,
118+
// we need to wait for capacity.
119+
ready!(self.semaphore.poll_acquire(cx));
120+
121+
Poll::Ready(Ok(()))
96122
}
97123

98124
fn call(&mut self, request: Request) -> Self::Future {
99-
// TODO:
100-
// ideally we'd poll_ready again here so we don't allocate the oneshot
101-
// if the try_send is about to fail, but sadly we can't call poll_ready
102-
// outside of task context.
103-
let (tx, rx) = oneshot::channel();
125+
tracing::trace!("sending request to buffer worker");
126+
let _permit = self
127+
.semaphore
128+
.take_permit()
129+
.expect("buffer full; poll_ready must be called first");
104130

105131
// get the current Span so that we can explicitly propagate it to the worker
106132
// if we didn't do this, events on the worker related to this span wouldn't be counted
107133
// towards that span since the worker would have no way of entering it.
108134
let span = tracing::Span::current();
109-
tracing::trace!(parent: &span, "sending request to buffer worker");
110-
match self.tx.try_send(Message { request, span, tx }) {
111-
Err(mpsc::error::TrySendError::Closed(_)) => {
112-
ResponseFuture::failed(self.get_worker_error())
113-
}
114-
Err(mpsc::error::TrySendError::Full(_)) => {
115-
// When `mpsc::Sender::poll_ready` returns `Ready`, a slot
116-
// in the channel is reserved for the handle. Other `Sender`
117-
// handles may not send a message using that slot. This
118-
// guarantees capacity for `request`.
119-
//
120-
// Given this, the only way to hit this code path is if
121-
// `poll_ready` has not been called & `Ready` returned.
122-
panic!("buffer full; poll_ready must be called first");
123-
}
135+
136+
// If we've made it here, then a semaphore permit has already been
137+
// acquired, so we can freely allocate a oneshot.
138+
let (tx, rx) = oneshot::channel();
139+
140+
match self.tx.send(Message {
141+
request,
142+
span,
143+
tx,
144+
_permit,
145+
}) {
146+
Err(_) => ResponseFuture::failed(self.get_worker_error()),
124147
Ok(_) => ResponseFuture::new(rx),
125148
}
126149
}
@@ -134,6 +157,7 @@ where
134157
Self {
135158
tx: self.tx.clone(),
136159
handle: self.handle.clone(),
160+
semaphore: self.semaphore.clone(),
137161
}
138162
}
139163
}

tower/src/buffer/worker.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010
pin::Pin,
1111
task::{Context, Poll},
1212
};
13-
use tokio::sync::mpsc;
13+
use tokio::{stream::Stream, sync::mpsc};
1414
use tower_service::Service;
1515

1616
/// Task that handles processing the buffer. This type should not be used
@@ -28,7 +28,7 @@ where
2828
T::Error: Into<crate::BoxError>,
2929
{
3030
current_message: Option<Message<Request, T::Future>>,
31-
rx: mpsc::Receiver<Message<Request, T::Future>>,
31+
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
3232
service: T,
3333
finish: bool,
3434
failed: Option<ServiceError>,
@@ -48,7 +48,7 @@ where
4848
{
4949
pub(crate) fn new(
5050
service: T,
51-
rx: mpsc::Receiver<Message<Request, T::Future>>,
51+
rx: mpsc::UnboundedReceiver<Message<Request, T::Future>>,
5252
) -> (Handle, Worker<T, Request>) {
5353
let handle = Handle {
5454
inner: Arc::new(Mutex::new(None)),
@@ -80,11 +80,11 @@ where
8080
}
8181

8282
tracing::trace!("worker polling for next message");
83-
if let Some(mut msg) = self.current_message.take() {
84-
// poll_closed returns Poll::Ready is the receiver is dropped.
85-
// Returning Pending means it is still alive, so we should still
86-
// use it.
87-
if msg.tx.poll_closed(cx).is_pending() {
83+
if let Some(msg) = self.current_message.take() {
84+
// If the oneshot sender is closed, then the receiver is dropped,
85+
// and nobody cares about the response. If this is the case, we
86+
// should continue to the next request.
87+
if !msg.tx.is_closed() {
8888
tracing::trace!("resuming buffered request");
8989
return Poll::Ready(Some((msg, false)));
9090
}
@@ -93,8 +93,8 @@ where
9393
}
9494

9595
// Get the next request
96-
while let Some(mut msg) = ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
97-
if msg.tx.poll_closed(cx).is_pending() {
96+
while let Some(msg) = ready!(Pin::new(&mut self.rx).poll_next(cx)) {
97+
if !msg.tx.is_closed() {
9898
tracing::trace!("processing new request");
9999
return Poll::Ready(Some((msg, true)));
100100
}

tower/src/hedge/delay.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ where
3737
#[pin_project(project = StateProj)]
3838
#[derive(Debug)]
3939
enum State<Request, F> {
40-
Delaying(#[pin] tokio::time::Delay, Option<Request>),
40+
Delaying(#[pin] tokio::time::Sleep, Option<Request>),
4141
Called(#[pin] F),
4242
}
4343

@@ -70,10 +70,10 @@ where
7070
}
7171

7272
fn call(&mut self, request: Request) -> Self::Future {
73-
let deadline = tokio::time::Instant::now() + self.policy.delay(&request);
73+
let delay = self.policy.delay(&request);
7474
ResponseFuture {
7575
service: Some(self.service.clone()),
76-
state: State::Delaying(tokio::time::delay_until(deadline), Some(request)),
76+
state: State::Delaying(tokio::time::sleep(delay), Some(request)),
7777
}
7878
}
7979
}

tower/src/hedge/mod.rs

+1-6
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
//! Pre-emptively retry requests which have been outstanding for longer
22
//! than a given latency percentile.
33
4-
#![warn(
5-
missing_debug_implementations,
6-
missing_docs,
7-
rust_2018_idioms,
8-
unreachable_pub
9-
)]
4+
#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]
105

116
use crate::filter::Filter;
127
use futures_util::future;

tower/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ pub use tower_layer::Layer;
7777
#[doc(inline)]
7878
pub use tower_service::Service;
7979

80+
#[cfg(any(feature = "buffer", feature = "limit"))]
81+
mod semaphore;
82+
8083
#[allow(unreachable_pub)]
8184
mod sealed {
8285
pub trait Sealed<T> {}

0 commit comments

Comments
 (0)