Skip to content

Commit dd54f20

Browse files
committed
feat(server): add experimental pipeline flush aggregation option to Http
By enabling `Http::pipeline`, the connection will aggregate response writes to try to improve sending more responses in a single syscall.
1 parent 16e834d commit dd54f20

File tree

4 files changed

+90
-7
lines changed

4 files changed

+90
-7
lines changed

src/http/conn.rs

+3
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ where I: AsyncRead + AsyncWrite,
4747
}
4848
}
4949

50+
pub fn set_flush_pipeline(&mut self, enabled: bool) {
51+
self.io.set_flush_pipeline(enabled);
52+
}
5053

5154
fn poll2(&mut self) -> Poll<Option<Frame<http::MessageHead<T::Incoming>, http::Chunk, ::Error>>, io::Error> {
5255
trace!("Conn::poll()");

src/http/io.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const INIT_BUFFER_SIZE: usize = 8192;
1313
pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
1414

1515
pub struct Buffered<T> {
16+
flush_pipeline: bool,
1617
io: T,
1718
read_blocked: bool,
1819
read_buf: BytesMut,
@@ -31,13 +32,18 @@ impl<T> fmt::Debug for Buffered<T> {
3132
impl<T: AsyncRead + AsyncWrite> Buffered<T> {
3233
pub fn new(io: T) -> Buffered<T> {
3334
Buffered {
35+
flush_pipeline: false,
3436
io: io,
3537
read_buf: BytesMut::with_capacity(0),
3638
write_buf: WriteBuf::new(),
3739
read_blocked: false,
3840
}
3941
}
4042

43+
pub fn set_flush_pipeline(&mut self, enabled: bool) {
44+
self.flush_pipeline = enabled;
45+
}
46+
4147
pub fn read_buf(&self) -> &[u8] {
4248
self.read_buf.as_ref()
4349
}
@@ -139,7 +145,9 @@ impl<T: Write> Write for Buffered<T> {
139145
}
140146

141147
fn flush(&mut self) -> io::Result<()> {
142-
if self.write_buf.remaining() == 0 {
148+
if self.flush_pipeline && self.read_buf.is_empty() {
149+
Ok(())
150+
} else if self.write_buf.remaining() == 0 {
143151
self.io.flush()
144152
} else {
145153
loop {

src/server/mod.rs

+16-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ pub use http::request::Request;
4949
/// configured with various protocol-level options such as keepalive.
5050
pub struct Http<B = ::Chunk> {
5151
keep_alive: bool,
52+
pipeline: bool,
5253
_marker: PhantomData<B>,
5354
}
5455

@@ -73,6 +74,7 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
7374
pub fn new() -> Http<B> {
7475
Http {
7576
keep_alive: true,
77+
pipeline: false,
7678
_marker: PhantomData,
7779
}
7880
}
@@ -85,6 +87,16 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
8587
self
8688
}
8789

90+
/// Aggregates flushes to better support pipelined responses.
91+
///
92+
/// Experimental, may be have bugs.
93+
///
94+
/// Default is false.
95+
pub fn pipeline(&mut self, enabled: bool) -> &mut Self {
96+
self.pipeline = enabled;
97+
self
98+
}
99+
88100
/// Bind the provided `addr` and return a server ready to handle
89101
/// connections.
90102
///
@@ -185,6 +197,7 @@ impl<B> fmt::Debug for Http<B> {
185197
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
186198
f.debug_struct("Http")
187199
.field("keep_alive", &self.keep_alive)
200+
.field("pipeline", &self.pipeline)
188201
.finish()
189202
}
190203
}
@@ -223,8 +236,10 @@ impl<T, B> ServerProto<T> for Http<B>
223236
} else {
224237
http::KA::Disabled
225238
};
239+
let mut conn = http::Conn::new(io, ka);
240+
conn.set_flush_pipeline(self.pipeline);
226241
__ProtoBindTransport {
227-
inner: future::ok(http::Conn::new(io, ka)),
242+
inner: future::ok(conn),
228243
}
229244
}
230245
}

tests/server.rs

+62-5
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,58 @@ fn expect_continue() {
437437
assert_eq!(body, msg);
438438
}
439439

440+
#[test]
441+
fn pipline_disabled() {
442+
let server = serve();
443+
let mut req = connect(server.addr());
444+
server.reply().status(hyper::Ok);
445+
server.reply().status(hyper::Ok);
446+
447+
req.write_all(b"\
448+
GET / HTTP/1.1\r\n\
449+
Host: example.domain\r\n\
450+
\r\n\
451+
GET / HTTP/1.1\r\n\
452+
Host: example.domain\r\n\
453+
\r\n\
454+
").expect("write 1");
455+
456+
let mut buf = vec![0; 4096];
457+
let n = req.read(&mut buf).expect("read 1");
458+
assert_ne!(n, 0);
459+
let n = req.read(&mut buf).expect("read 2");
460+
assert_ne!(n, 0);
461+
}
462+
463+
#[test]
464+
fn pipeline_enabled() {
465+
let server = serve_with_options(ServeOptions {
466+
pipeline: true,
467+
.. Default::default()
468+
});
469+
let mut req = connect(server.addr());
470+
server.reply().status(hyper::Ok);
471+
server.reply().status(hyper::Ok);
472+
473+
req.write_all(b"\
474+
GET / HTTP/1.1\r\n\
475+
Host: example.domain\r\n\
476+
\r\n\
477+
GET / HTTP/1.1\r\n\
478+
Host: example.domain\r\n\
479+
Connection: close\r\n\
480+
\r\n\
481+
").expect("write 1");
482+
483+
let mut buf = vec![0; 4096];
484+
let n = req.read(&mut buf).expect("read 1");
485+
assert_ne!(n, 0);
486+
// with pipeline enabled, both responses should have been in the first read
487+
// so a second read should be EOF
488+
let n = req.read(&mut buf).expect("read 2");
489+
assert_eq!(n, 0);
490+
}
491+
440492
// -------------------------------------------------
441493
// the Server that is used to run all the tests with
442494
// -------------------------------------------------
@@ -577,6 +629,7 @@ fn serve() -> Serve {
577629
#[derive(Default)]
578630
struct ServeOptions {
579631
keep_alive_disabled: bool,
632+
pipeline: bool,
580633
timeout: Option<Duration>,
581634
}
582635

@@ -591,15 +644,19 @@ fn serve_with_options(options: ServeOptions) -> Serve {
591644
let addr = "127.0.0.1:0".parse().unwrap();
592645

593646
let keep_alive = !options.keep_alive_disabled;
647+
let pipeline = options.pipeline;
594648
let dur = options.timeout;
595649

596650
let thread_name = format!("test-server-{:?}", dur);
597651
let thread = thread::Builder::new().name(thread_name).spawn(move || {
598-
let srv = Http::new().keep_alive(keep_alive).bind(&addr, TestService {
599-
tx: Arc::new(Mutex::new(msg_tx.clone())),
600-
_timeout: dur,
601-
reply: reply_rx,
602-
}).unwrap();
652+
let srv = Http::new()
653+
.keep_alive(keep_alive)
654+
.pipeline(pipeline)
655+
.bind(&addr, TestService {
656+
tx: Arc::new(Mutex::new(msg_tx.clone())),
657+
_timeout: dur,
658+
reply: reply_rx,
659+
}).unwrap();
603660
addr_tx.send(srv.local_addr().unwrap()).unwrap();
604661
srv.run_until(shutdown_rx.then(|_| Ok(()))).unwrap();
605662
}).unwrap();

0 commit comments

Comments
 (0)