Skip to content

Commit

Permalink
Upgrade hyper to 1.0. (#955)
Browse files Browse the repository at this point in the history
This PR updates hyper to 1.0 and adjusts the built-in HTTP server accordingly.
  • Loading branch information
partim authored Apr 29, 2024
1 parent 7410894 commit bf9ee15
Show file tree
Hide file tree
Showing 15 changed files with 536 additions and 202 deletions.
441 changes: 355 additions & 86 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ crossbeam-queue = "0.3.1"
dirs = "5"
form_urlencoded = "1.0"
futures = "0.3.4"
hyper = { version = "0.14", features = [ "server", "stream" ] }
http-body-util = "0.1"
hyper = { version = "1.2", features = [ "server" ] }
hyper-util = { version = "0.1", features = [ "server" ] }
listenfd = "1"
log = "0.4.8"
pin-project-lite = "0.2.4"
rand = "0.8.1"
reqwest = { version = "0.11.0", default-features = false, features = ["blocking", "rustls-tls" ] }
reqwest = { version = "0.12.4", default-features = false, features = ["blocking", "rustls-tls" ] }
ring = "0.17"
#rpki = { version = "0.18", features = [ "repository", "rrdp", "rtr", "serde", "slurm" ] }
rpki = { git = "https://github.com/NLnetLabs/rpki-rs.git", features = [ "repository", "rrdp", "rtr", "serde", "slurm" ] }
Expand Down
46 changes: 23 additions & 23 deletions src/http/delta.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
//! Handles endpoints related to output of payload deltas.
use std::convert::Infallible;
use std::str::FromStr;
use std::sync::Arc;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream;
use hyper::{Body, Method, Request};
use rpki::rtr::Serial;
use rpki::rtr::payload::{Action, PayloadRef};
use rpki::rtr::server::{NotifySender, PayloadDiff};
Expand All @@ -15,12 +14,13 @@ use crate::payload::{
use crate::utils::fmt::WriteOrPanic;
use crate::utils::date::format_iso_date;
use crate::utils::json::JsonBuilder;
use super::request::Request;
use super::response::{ContentType, Response, ResponseBuilder};

//------------ handle_get_or_head --------------------------------------------

pub fn handle_get_or_head(
req: &Request<Body>,
req: &Request,
history: &SharedHistory,
) -> Option<Response> {
if req.uri().path() != "/json-delta" {
Expand All @@ -37,7 +37,7 @@ pub fn handle_get_or_head(
Err(response) => return Some(response)
};

if *req.method() == Method::HEAD {
if req.is_head() {
return Some(
ResponseBuilder::ok().content_type(ContentType::JSON).empty()
)
Expand Down Expand Up @@ -68,29 +68,29 @@ fn handle_delta(
session: u64, from_serial: Serial, to_serial: Serial,
delta: Arc<PayloadDelta>, created: DateTime<Utc>,
) -> Response {
ResponseBuilder::ok().content_type(ContentType::JSON)
.body(Body::wrap_stream(stream::iter(
DeltaStream::new(session, from_serial, to_serial, delta, created)
.map(Result::<_, Infallible>::Ok)
)))
ResponseBuilder::ok().content_type(ContentType::JSON).stream(
stream::iter(
DeltaStream::new(session, from_serial, to_serial, delta, created)
)
)
}

fn handle_reset(
session: u64, to_serial: Serial, snapshot: Arc<PayloadSnapshot>,
created: DateTime<Utc>,
) -> Response {
ResponseBuilder::ok().content_type(ContentType::JSON)
.body(Body::wrap_stream(stream::iter(
SnapshotStream::new(session, to_serial, snapshot, created)
.map(Result::<_, Infallible>::Ok)
)))
ResponseBuilder::ok().content_type(ContentType::JSON).stream(
stream::iter(
SnapshotStream::new(session, to_serial, snapshot, created)
)
)
}


//------------ handle_notify_get_or_head -------------------------------------

pub async fn handle_notify_get_or_head(
req: &Request<Body>,
req: &Request,
history: &SharedHistory,
notify: &NotifySender,
) -> Option<Response> {
Expand All @@ -107,7 +107,7 @@ pub async fn handle_notify_get_or_head(
notify.subscribe().recv().await;
}

if *req.method() == Method::HEAD {
if req.is_head() {
Some(
ResponseBuilder::ok().content_type(ContentType::JSON).empty()
)
Expand All @@ -126,7 +126,7 @@ pub async fn handle_notify_get_or_head(
}

fn need_wait(
req: &Request<Body>,
req: &Request,
history: &SharedHistory,
) -> Result<bool, Response> {
let version = match version_from_query(req.uri().query())? {
Expand Down Expand Up @@ -327,7 +327,7 @@ impl DeltaStream {
}

impl Iterator for DeltaStream {
type Item = Vec<u8>;
type Item = Bytes;

fn next(&mut self) -> Option<Self::Item> {
#[allow(clippy::question_mark)]
Expand All @@ -337,13 +337,13 @@ impl Iterator for DeltaStream {
let mut vec = self.header.take().unwrap_or_default();
loop {
if vec.len() > 64000 {
return Some(vec)
return Some(vec.into())
}
if self.next_announce(&mut vec) {
continue;
}
if !self.next_withdraw(&mut vec) {
return Some(vec)
return Some(vec.into())
}
}
}
Expand Down Expand Up @@ -450,7 +450,7 @@ impl SnapshotStream {
}

impl Iterator for SnapshotStream {
type Item = Vec<u8>;
type Item = Bytes;

fn next(&mut self) -> Option<Self::Item> {
use rpki::rtr::server::PayloadSet;
Expand All @@ -460,7 +460,7 @@ impl Iterator for SnapshotStream {
let mut vec = self.header.take().unwrap_or_default();
loop {
if vec.len() > 64000 {
return Some(vec)
return Some(vec.into())
}
match iter.next() {
Some(payload) => {
Expand All @@ -477,7 +477,7 @@ impl Iterator for SnapshotStream {

self.iter = None;
DeltaStream::append_footer(&mut vec);
Some(vec)
Some(vec.into())
}
}

9 changes: 3 additions & 6 deletions src/http/dispatch.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! Rules on how to dispatch a request.
use std::sync::Arc;
use hyper::{Body, Method, Request};
use rpki::rtr::server::NotifySender;
use crate::config::Config;
use crate::metrics::{HttpServerMetrics, SharedRtrServerMetrics};
use crate::payload::SharedHistory;
use crate::process::LogOutput;
use super::{delta, log, metrics, payload, status, validity};
use super::request::Request;
use super::response::Response;

//------------ State ---------------------------------------------------------
Expand Down Expand Up @@ -43,12 +43,9 @@ impl State {
&self.metrics
}

pub async fn handle_request(
&self,
req: Request<Body>,
) -> Response {
pub async fn handle_request(&self, req: Request) -> Response {
self.metrics.inc_requests();
if *req.method() != Method::GET && *req.method() != Method::HEAD {
if !req.is_get_or_head() {
return Response::method_not_allowed()
}

Expand Down
74 changes: 32 additions & 42 deletions src/http/listener.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
//! The HTTP listener.
use std::io;
use std::convert::Infallible;
use std::future::Future;
use std::net::{SocketAddr, TcpListener as StdListener};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::pin_mut;
use futures::future::{pending, select_all};
use hyper::Server;
use hyper::server::accept::Accept;
use hyper::service::{make_service_fn, service_fn};
use hyper::service::service_fn;
use hyper_util::rt::{TokioExecutor, TokioIo};
use log::error;
use rpki::rtr::server::NotifySender;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
Expand Down Expand Up @@ -109,19 +107,6 @@ async fn single_http_listener(
listener: StdListener,
state: Arc<State>,
) {
let make_service = make_service_fn(|_conn| {
let state = state.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let state = state.clone();
async move {
Ok::<_, Infallible>(
state.handle_request(req).await.into_hyper()
)
}
}))
}
});
let listener = HttpAccept {
sock: match TcpListener::from_std(listener) {
Ok(listener) => listener,
Expand All @@ -133,8 +118,28 @@ async fn single_http_listener(
tls: tls_config.map(Into::into),
metrics: state.metrics().clone(),
};
if let Err(err) = Server::builder(listener).serve(make_service).await {
error!("Fatal error in HTTP server {}: {}", addr, err);
loop {
let stream = match listener.accept().await {
Ok(some) => some,
Err(err) => {
error!("Fatal error in HTTP server {}: {}", addr, err);
break;
}
};
let service_state = state.clone();
tokio::task::spawn(async move {
let _ = hyper_util::server::conn::auto::Builder::new(
TokioExecutor::new()
).serve_connection(
TokioIo::new(stream),
service_fn(move |req| {
let state = service_state.clone();
async move {
state.handle_request(req.into()).await.into_hyper()
}
})
).await;
});
}
}

Expand All @@ -147,29 +152,14 @@ struct HttpAccept {
metrics: Arc<HttpServerMetrics>,
}

impl Accept for HttpAccept {
type Conn = HttpStream;
type Error = io::Error;

fn poll_accept(
mut self: Pin<&mut Self>,
cx: &mut Context
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let sock = &mut self.sock;
pin_mut!(sock);
match sock.poll_accept(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok((sock, _addr))) => {
self.metrics.inc_conn_open();
Poll::Ready(Some(Ok(HttpStream {
sock: MaybeTlsTcpStream::new(sock, self.tls.as_ref()),
metrics: self.metrics.clone()
})))
}
Poll::Ready(Err(err)) => {
Poll::Ready(Some(Err(err)))
}
}
impl HttpAccept {
async fn accept(&self) -> Result<HttpStream, io::Error> {
let (sock, _) = self.sock.accept().await?;
self.metrics.inc_conn_open();
Ok(HttpStream {
sock: MaybeTlsTcpStream::new(sock, self.tls.as_ref()),
metrics: self.metrics.clone()
})
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/http/log.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Handles endpoints related to the log.
use std::sync::Arc;
use hyper::{Body, Method, Request};
use crate::process::LogOutput;
use super::request::Request;
use super::response::{ContentType, Response, ResponseBuilder};

//------------ State ---------------------------------------------------------
Expand All @@ -18,11 +18,11 @@ impl State {

pub fn handle_get_or_head(
&self,
req: &Request<Body>,
req: &Request,
) -> Option<Response> {
if req.uri().path() == "/log" {
let res = ResponseBuilder::ok().content_type(ContentType::TEXT);
if *req.method() == Method::HEAD {
if req.is_head() {
Some(res.empty())
}
else {
Expand Down
9 changes: 5 additions & 4 deletions src/http/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,29 @@
use std::{cmp, fmt};
use std::fmt::Write;
use chrono::Utc;
use hyper::{Body, Method, Request};
use crate::config::FilterPolicy;
use crate::metrics::{
HttpServerMetrics, Metrics, PayloadMetrics, PublicationMetrics,
RrdpRepositoryMetrics, RsyncModuleMetrics, SharedRtrServerMetrics,
VrpMetrics
};
use crate::payload::SharedHistory;
use super::request::Request;
use super::response::{ContentType, Response, ResponseBuilder};


//------------ handle_get ----------------------------------------------------

pub async fn handle_get_or_head(
req: &Request<Body>,
req: &Request,
history: &SharedHistory,
http: &HttpServerMetrics,
rtr: &SharedRtrServerMetrics,
) -> Option<Response> {
let head = *req.method() == Method::HEAD;
match req.uri().path() {
"/metrics" => Some(handle_metrics(head, history, http, rtr).await),
"/metrics" => {
Some(handle_metrics(req.is_head(), history, http, rtr).await)
}
_ => None
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ pub use self::response::ContentType;
mod dispatch;
mod listener;

// The following module helps making responses.
// The following modules helps dealing with requests and responses
mod request;
mod response;

// Finally, these modules actually handle requests.
Expand Down
Loading

0 comments on commit bf9ee15

Please sign in to comment.