Skip to content

Commit eb28641

Browse files
Ruben24240xE282B0
authored andcommitted
feat(client): Make clients able to use non-Send executor (hyperium#3184)
Closes hyperium#3017 BREAKING CHANGE: `client::conn::http2` types now use another generic for an `Executor`. Code that names `Connection` needs to include the additional generic parameter. Signed-off-by: Sven Pfennig <[email protected]>
1 parent c25a4b8 commit eb28641

File tree

9 files changed

+652
-213
lines changed

9 files changed

+652
-213
lines changed

examples/single_threaded.rs

+157-14
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
#![deny(warnings)]
22

3+
use http_body_util::BodyExt;
34
use hyper::server::conn::http2;
45
use std::cell::Cell;
56
use std::net::SocketAddr;
67
use std::rc::Rc;
8+
use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt};
79
use tokio::net::TcpListener;
810

911
use hyper::body::{Body as HttpBody, Bytes, Frame};
1012
use hyper::service::service_fn;
13+
use hyper::Request;
1114
use hyper::{Error, Response};
1215
use std::marker::PhantomData;
1316
use std::pin::Pin;
1417
use std::task::{Context, Poll};
18+
use std::thread;
19+
use tokio::net::TcpStream;
1520

1621
struct Body {
1722
// Our Body type is !Send and !Sync:
@@ -40,28 +45,57 @@ impl HttpBody for Body {
4045
}
4146
}
4247

43-
fn main() -> Result<(), Box<dyn std::error::Error>> {
48+
fn main() {
4449
pretty_env_logger::init();
4550

46-
// Configure a runtime that runs everything on the current thread
47-
let rt = tokio::runtime::Builder::new_current_thread()
48-
.enable_all()
49-
.build()
50-
.expect("build runtime");
51-
52-
// Combine it with a `LocalSet, which means it can spawn !Send futures...
53-
let local = tokio::task::LocalSet::new();
54-
local.block_on(&rt, run())
51+
let server = thread::spawn(move || {
52+
// Configure a runtime for the server that runs everything on the current thread
53+
let rt = tokio::runtime::Builder::new_current_thread()
54+
.enable_all()
55+
.build()
56+
.expect("build runtime");
57+
58+
// Combine it with a `LocalSet, which means it can spawn !Send futures...
59+
let local = tokio::task::LocalSet::new();
60+
local.block_on(&rt, server()).unwrap();
61+
});
62+
63+
let client = thread::spawn(move || {
64+
// Configure a runtime for the client that runs everything on the current thread
65+
let rt = tokio::runtime::Builder::new_current_thread()
66+
.enable_all()
67+
.build()
68+
.expect("build runtime");
69+
70+
// Combine it with a `LocalSet, which means it can spawn !Send futures...
71+
let local = tokio::task::LocalSet::new();
72+
local
73+
.block_on(
74+
&rt,
75+
client("http://localhost:3000".parse::<hyper::Uri>().unwrap()),
76+
)
77+
.unwrap();
78+
});
79+
80+
server.join().unwrap();
81+
client.join().unwrap();
5582
}
5683

57-
async fn run() -> Result<(), Box<dyn std::error::Error>> {
58-
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
84+
async fn server() -> Result<(), Box<dyn std::error::Error>> {
85+
let mut stdout = io::stdout();
5986

87+
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
6088
// Using a !Send request counter is fine on 1 thread...
6189
let counter = Rc::new(Cell::new(0));
6290

6391
let listener = TcpListener::bind(addr).await?;
64-
println!("Listening on http://{}", addr);
92+
93+
stdout
94+
.write_all(format!("Listening on http://{}", addr).as_bytes())
95+
.await
96+
.unwrap();
97+
stdout.flush().await.unwrap();
98+
6599
loop {
66100
let (stream, _) = listener.accept().await?;
67101

@@ -80,12 +114,121 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
80114
.serve_connection(stream, service)
81115
.await
82116
{
83-
println!("Error serving connection: {:?}", err);
117+
let mut stdout = io::stdout();
118+
stdout
119+
.write_all(format!("Error serving connection: {:?}", err).as_bytes())
120+
.await
121+
.unwrap();
122+
stdout.flush().await.unwrap();
84123
}
85124
});
86125
}
87126
}
88127

128+
struct IOTypeNotSend {
129+
_marker: PhantomData<*const ()>,
130+
stream: TcpStream,
131+
}
132+
133+
impl IOTypeNotSend {
134+
fn new(stream: TcpStream) -> Self {
135+
Self {
136+
_marker: PhantomData,
137+
stream,
138+
}
139+
}
140+
}
141+
142+
impl AsyncWrite for IOTypeNotSend {
143+
fn poll_write(
144+
mut self: Pin<&mut Self>,
145+
cx: &mut Context<'_>,
146+
buf: &[u8],
147+
) -> Poll<Result<usize, std::io::Error>> {
148+
Pin::new(&mut self.stream).poll_write(cx, buf)
149+
}
150+
151+
fn poll_flush(
152+
mut self: Pin<&mut Self>,
153+
cx: &mut Context<'_>,
154+
) -> Poll<Result<(), std::io::Error>> {
155+
Pin::new(&mut self.stream).poll_flush(cx)
156+
}
157+
158+
fn poll_shutdown(
159+
mut self: Pin<&mut Self>,
160+
cx: &mut Context<'_>,
161+
) -> Poll<Result<(), std::io::Error>> {
162+
Pin::new(&mut self.stream).poll_shutdown(cx)
163+
}
164+
}
165+
166+
impl AsyncRead for IOTypeNotSend {
167+
fn poll_read(
168+
mut self: Pin<&mut Self>,
169+
cx: &mut Context<'_>,
170+
buf: &mut tokio::io::ReadBuf<'_>,
171+
) -> Poll<std::io::Result<()>> {
172+
Pin::new(&mut self.stream).poll_read(cx, buf)
173+
}
174+
}
175+
176+
async fn client(url: hyper::Uri) -> Result<(), Box<dyn std::error::Error>> {
177+
let host = url.host().expect("uri has no host");
178+
let port = url.port_u16().unwrap_or(80);
179+
let addr = format!("{}:{}", host, port);
180+
let stream = TcpStream::connect(addr).await?;
181+
182+
let stream = IOTypeNotSend::new(stream);
183+
184+
let (mut sender, conn) = hyper::client::conn::http2::handshake(LocalExec, stream).await?;
185+
186+
tokio::task::spawn_local(async move {
187+
if let Err(err) = conn.await {
188+
let mut stdout = io::stdout();
189+
stdout
190+
.write_all(format!("Connection failed: {:?}", err).as_bytes())
191+
.await
192+
.unwrap();
193+
stdout.flush().await.unwrap();
194+
}
195+
});
196+
197+
let authority = url.authority().unwrap().clone();
198+
199+
// Make 4 requests
200+
for _ in 0..4 {
201+
let req = Request::builder()
202+
.uri(url.clone())
203+
.header(hyper::header::HOST, authority.as_str())
204+
.body(Body::from("test".to_string()))?;
205+
206+
let mut res = sender.send_request(req).await?;
207+
208+
let mut stdout = io::stdout();
209+
stdout
210+
.write_all(format!("Response: {}\n", res.status()).as_bytes())
211+
.await
212+
.unwrap();
213+
stdout
214+
.write_all(format!("Headers: {:#?}\n", res.headers()).as_bytes())
215+
.await
216+
.unwrap();
217+
stdout.flush().await.unwrap();
218+
219+
// Print the response body
220+
while let Some(next) = res.frame().await {
221+
let frame = next?;
222+
if let Some(chunk) = frame.data_ref() {
223+
stdout.write_all(&chunk).await.unwrap();
224+
}
225+
}
226+
stdout.write_all(b"\n-----------------\n").await.unwrap();
227+
stdout.flush().await.unwrap();
228+
}
229+
Ok(())
230+
}
231+
89232
// NOTE: This part is only needed for HTTP/2. HTTP/1 doesn't need an executor.
90233
//
91234
// Since the Server needs to spawn some background tasks, we needed

src/client/conn/http2.rs

+43-34
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! HTTP/2 client connections
22
3-
use std::error::Error as StdError;
3+
use std::error::Error;
44
use std::fmt;
55
use std::marker::PhantomData;
66
use std::sync::Arc;
@@ -12,12 +12,10 @@ use tokio::io::{AsyncRead, AsyncWrite};
1212
use super::super::dispatch;
1313
use crate::body::{Body, Incoming as IncomingBody};
1414
use crate::common::time::Time;
15-
use crate::common::{
16-
exec::{BoxSendFuture, Exec},
17-
task, Future, Pin, Poll,
18-
};
15+
use crate::common::{task, Future, Pin, Poll};
1916
use crate::proto;
20-
use crate::rt::{Executor, Timer};
17+
use crate::rt::bounds::ExecutorClient;
18+
use crate::rt::Timer;
2119

2220
/// The sender side of an established connection.
2321
pub struct SendRequest<B> {
@@ -37,20 +35,22 @@ impl<B> Clone for SendRequest<B> {
3735
/// In most cases, this should just be spawned into an executor, so that it
3836
/// can process incoming and outgoing messages, notice hangups, and the like.
3937
#[must_use = "futures do nothing unless polled"]
40-
pub struct Connection<T, B>
38+
pub struct Connection<T, B, E>
4139
where
42-
T: AsyncRead + AsyncWrite + Send + 'static,
40+
T: AsyncRead + AsyncWrite + 'static + Unpin,
4341
B: Body + 'static,
42+
E: ExecutorClient<B, T> + Unpin,
43+
B::Error: Into<Box<dyn Error + Send + Sync>>,
4444
{
45-
inner: (PhantomData<T>, proto::h2::ClientTask<B>),
45+
inner: (PhantomData<T>, proto::h2::ClientTask<B, E, T>),
4646
}
4747

4848
/// A builder to configure an HTTP connection.
4949
///
5050
/// After setting options, the builder is used to create a handshake future.
5151
#[derive(Clone, Debug)]
52-
pub struct Builder {
53-
pub(super) exec: Exec,
52+
pub struct Builder<Ex> {
53+
pub(super) exec: Ex,
5454
pub(super) timer: Time,
5555
h2_builder: proto::h2::client::Config,
5656
}
@@ -59,13 +59,16 @@ pub struct Builder {
5959
///
6060
/// This is a shortcut for `Builder::new().handshake(io)`.
6161
/// See [`client::conn`](crate::client::conn) for more.
62-
pub async fn handshake<E, T, B>(exec: E, io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
62+
pub async fn handshake<E, T, B>(
63+
exec: E,
64+
io: T,
65+
) -> crate::Result<(SendRequest<B>, Connection<T, B, E>)>
6366
where
64-
E: Executor<BoxSendFuture> + Send + Sync + 'static,
65-
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
67+
T: AsyncRead + AsyncWrite + Unpin + 'static,
6668
B: Body + 'static,
6769
B::Data: Send,
68-
B::Error: Into<Box<dyn StdError + Send + Sync>>,
70+
B::Error: Into<Box<dyn Error + Send + Sync>>,
71+
E: ExecutorClient<B, T> + Unpin + Clone,
6972
{
7073
Builder::new(exec).handshake(io).await
7174
}
@@ -188,12 +191,13 @@ impl<B> fmt::Debug for SendRequest<B> {
188191

189192
// ===== impl Connection
190193

191-
impl<T, B> Connection<T, B>
194+
impl<T, B, E> Connection<T, B, E>
192195
where
193-
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
196+
T: AsyncRead + AsyncWrite + Unpin + 'static,
194197
B: Body + Unpin + Send + 'static,
195198
B::Data: Send,
196-
B::Error: Into<Box<dyn StdError + Send + Sync>>,
199+
B::Error: Into<Box<dyn Error + Send + Sync>>,
200+
E: ExecutorClient<B, T> + Unpin,
197201
{
198202
/// Returns whether the [extended CONNECT protocol][1] is enabled or not.
199203
///
@@ -209,22 +213,26 @@ where
209213
}
210214
}
211215

212-
impl<T, B> fmt::Debug for Connection<T, B>
216+
impl<T, B, E> fmt::Debug for Connection<T, B, E>
213217
where
214-
T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
218+
T: AsyncRead + AsyncWrite + fmt::Debug + 'static + Unpin,
215219
B: Body + 'static,
220+
E: ExecutorClient<B, T> + Unpin,
221+
B::Error: Into<Box<dyn Error + Send + Sync>>,
216222
{
217223
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218224
f.debug_struct("Connection").finish()
219225
}
220226
}
221227

222-
impl<T, B> Future for Connection<T, B>
228+
impl<T, B, E> Future for Connection<T, B, E>
223229
where
224-
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
225-
B: Body + Send + 'static,
230+
T: AsyncRead + AsyncWrite + Unpin + 'static,
231+
B: Body + 'static + Unpin,
226232
B::Data: Send,
227-
B::Error: Into<Box<dyn StdError + Send + Sync>>,
233+
E: Unpin,
234+
B::Error: Into<Box<dyn Error + Send + Sync>>,
235+
E: ExecutorClient<B, T> + 'static + Send + Sync + Unpin,
228236
{
229237
type Output = crate::Result<()>;
230238

@@ -239,22 +247,22 @@ where
239247

240248
// ===== impl Builder
241249

242-
impl Builder {
250+
impl<Ex> Builder<Ex>
251+
where
252+
Ex: Clone,
253+
{
243254
/// Creates a new connection builder.
244255
#[inline]
245-
pub fn new<E>(exec: E) -> Builder
246-
where
247-
E: Executor<BoxSendFuture> + Send + Sync + 'static,
248-
{
256+
pub fn new(exec: Ex) -> Builder<Ex> {
249257
Builder {
250-
exec: Exec::new(exec),
258+
exec,
251259
timer: Time::Empty,
252260
h2_builder: Default::default(),
253261
}
254262
}
255263

256264
/// Provide a timer to execute background HTTP2 tasks.
257-
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
265+
pub fn timer<M>(&mut self, timer: M) -> &mut Builder<Ex>
258266
where
259267
M: Timer + Send + Sync + 'static,
260268
{
@@ -388,12 +396,13 @@ impl Builder {
388396
pub fn handshake<T, B>(
389397
&self,
390398
io: T,
391-
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
399+
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B, Ex>)>>
392400
where
393-
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
401+
T: AsyncRead + AsyncWrite + Unpin + 'static,
394402
B: Body + 'static,
395403
B::Data: Send,
396-
B::Error: Into<Box<dyn StdError + Send + Sync>>,
404+
B::Error: Into<Box<dyn Error + Send + Sync>>,
405+
Ex: ExecutorClient<B, T> + Unpin,
397406
{
398407
let opts = self.clone();
399408

0 commit comments

Comments
 (0)