Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http server): configurable request body limit #162

Merged
merged 10 commits into from
Nov 20, 2020
2 changes: 1 addition & 1 deletion benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn concurrent_tasks() -> Vec<usize> {
}

async fn http_server(tx: Sender<SocketAddr>) {
let server = HttpServer::new("127.0.0.1:0").await.unwrap();
let server = HttpServer::new("127.0.0.1:0", HttpConfig { max_request_body_size: u32::MAX }).await.unwrap();
let mut say_hello = server.register_method("say_hello".to_string()).unwrap();
tx.send(*server.local_addr()).unwrap();
loop {
Expand Down
3 changes: 1 addition & 2 deletions examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

async fn run_server(server_started_tx: Sender<()>, url: &str) {
let server = HttpServer::new(url).await.unwrap();
let server = HttpServer::new(url, HttpConfig::default()).await.unwrap();
let mut say_hello = server.register_method("say_hello".to_string()).unwrap();

server_started_tx.send(()).unwrap();
loop {
let r = say_hello.next().await;
Expand Down
22 changes: 3 additions & 19 deletions src/client/http/client.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,9 @@
use crate::client::http::transport::HttpTransportClient;
use crate::types::client::{Error, Mismatch};
use crate::types::error::{Error, Mismatch};
use crate::types::http::HttpConfig;
use crate::types::jsonrpc::{self, JsonValue};
use std::sync::atomic::{AtomicU64, Ordering};

/// Default maximum request body size (10 MB).
const DEFAULT_MAX_BODY_SIZE_TEN_MB: u32 = 10 * 1024 * 1024;

/// HTTP configuration.
#[derive(Copy, Clone)]
pub struct HttpConfig {
/// Maximum request body size in bytes.
pub max_request_body_size: u32,
}

/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications.
///
/// WARNING: The async methods must be executed on [Tokio 0.2](https://docs.rs/tokio/0.2.22/tokio).
Expand All @@ -23,19 +14,12 @@ pub struct HttpClient {
request_id: AtomicU64,
}

impl Default for HttpConfig {
fn default() -> Self {
Self { max_request_body_size: DEFAULT_MAX_BODY_SIZE_TEN_MB }
}
}

impl HttpClient {
/// Initializes a new HTTP client.
///
/// Fails when the URL is invalid.
pub fn new(target: impl AsRef<str>, config: HttpConfig) -> Result<Self, Error> {
let transport = HttpTransportClient::new(target, config.max_request_body_size)
.map_err(|e| Error::TransportError(Box::new(e)))?;
let transport = HttpTransportClient::new(target, config).map_err(|e| Error::TransportError(Box::new(e)))?;
Ok(Self { transport, request_id: AtomicU64::new(0) })
}

Expand Down
3 changes: 2 additions & 1 deletion src/client/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ mod transport;
#[cfg(test)]
mod tests;

pub use client::{HttpClient, HttpConfig};
pub use crate::types::http::HttpConfig;
pub use client::HttpClient;
pub use transport::HttpTransportClient;
2 changes: 1 addition & 1 deletion src/client/http/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::client::{HttpClient, HttpConfig};
use crate::types::client::Error;
use crate::types::error::Error;
use crate::types::jsonrpc::{self, ErrorCode, JsonValue, Params};

use jsonrpsee_test_utils::helpers::*;
Expand Down
81 changes: 27 additions & 54 deletions src/client/http/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
// that we need to be guaranteed that hyper doesn't re-use an existing connection if we ever reset
// the JSON-RPC request id to a value that might have already been used.

use crate::types::jsonrpc;
use futures::StreamExt;
use crate::types::{error::GenericTransportError, http::HttpConfig, jsonrpc};
use crate::utils::http::hyper_helpers;
use thiserror::Error;

const CONTENT_TYPE_JSON: &str = "application/json";
Expand All @@ -20,15 +20,15 @@ pub struct HttpTransportClient {
/// HTTP client,
client: hyper::Client<hyper::client::HttpConnector>,
/// Configurable max request body size
max_request_body_size: u32,
config: HttpConfig,
}

impl HttpTransportClient {
/// Initializes a new HTTP client.
pub fn new(target: impl AsRef<str>, max_request_body_size: u32) -> Result<Self, Error> {
pub fn new(target: impl AsRef<str>, config: HttpConfig) -> Result<Self, Error> {
let target = url::Url::parse(target.as_ref()).map_err(|e| Error::Url(format!("Invalid URL: {}", e)))?;
if target.scheme() == "http" {
Ok(HttpTransportClient { client: hyper::Client::new(), target, max_request_body_size })
Ok(HttpTransportClient { client: hyper::Client::new(), target, config })
} else {
Err(Error::Url("URL scheme not supported, expects 'http'".into()))
}
Expand All @@ -39,7 +39,7 @@ impl HttpTransportClient {
let body = jsonrpc::to_vec(&request).map_err(Error::Serialization)?;
log::debug!("send: {}", request);

if body.len() > self.max_request_body_size as usize {
if body.len() > self.config.max_request_body_size as usize {
return Err(Error::RequestTooLarge);
}

Expand Down Expand Up @@ -70,22 +70,8 @@ impl HttpTransportClient {
request: jsonrpc::Request,
) -> Result<jsonrpc::Response, Error> {
let response = self.send_request(request).await?;
let body_size = read_content_length(response.headers()).unwrap_or(0);
let mut body_fut: hyper::Body = response.into_body();

if body_size > self.max_request_body_size {
return Err(Error::RequestTooLarge);
}

let mut body = Vec::with_capacity(body_size as usize);

while let Some(chunk) = body_fut.next().await {
let chunk = chunk.map_err(|e| Error::Http(Box::new(e)))?;
if chunk.len() + body.len() > self.max_request_body_size as usize {
return Err(Error::RequestTooLarge);
}
body.extend_from_slice(&chunk);
}
let (parts, body) = response.into_parts();
let body = hyper_helpers::read_response_to_body(&parts.headers, body, self.config).await?;

// Note that we don't check the Content-Type of the request. This is deemed
// unnecessary, as a parsing error while happen anyway.
Expand All @@ -95,23 +81,6 @@ impl HttpTransportClient {
}
}

// Read `content_length` from HTTP Header.
//
// Returns `Some(val)` if `content_length` contains exactly one value.
// None otherwise.
fn read_content_length(header: &hyper::header::HeaderMap) -> Option<u32> {
let values = header.get_all("content-length");
let mut iter = values.iter();
let content_length = iter.next()?;
if iter.next().is_some() {
return None;
}

// HTTP Content-Length indicates number of bytes in decimal.
let length = content_length.to_str().ok()?;
u32::from_str_radix(length, 10).ok()
}

/// Error that can happen during a request.
#[derive(Debug, Error)]
pub enum Error {
Expand Down Expand Up @@ -144,26 +113,40 @@ pub enum Error {
ParseError(#[source] serde_json::error::Error),

/// Request body too large.
#[error("The request body was to large")]
#[error("The request body was too large")]
RequestTooLarge,
}

impl<T> From<GenericTransportError<T>> for Error
where
T: std::error::Error + Send + Sync + 'static,
{
fn from(err: GenericTransportError<T>) -> Self {
match err {
GenericTransportError::<T>::TooLarge => Self::RequestTooLarge,
GenericTransportError::<T>::Inner(e) => Self::Http(Box::new(e)),
}
}
}

#[cfg(test)]
mod tests {
use super::{read_content_length, Error, HttpTransportClient};
use super::{Error, HttpTransportClient};
use crate::types::http::HttpConfig;
use crate::types::jsonrpc::{Call, Id, MethodCall, Params, Request, Version};

#[test]
fn invalid_http_url_rejected() {
let err = HttpTransportClient::new("ws://localhost:9933", 1337).unwrap_err();
let err = HttpTransportClient::new("ws://localhost:9933", HttpConfig::default()).unwrap_err();
assert!(matches!(err, Error::Url(_)));
}

#[tokio::test]
async fn request_limit_works() {
let eighty_bytes_limit = 80;
let client = HttpTransportClient::new("http://localhost:9933", eighty_bytes_limit).unwrap();
assert_eq!(client.max_request_body_size, eighty_bytes_limit);
let client =
HttpTransportClient::new("http://localhost:9933", HttpConfig { max_request_body_size: 80 }).unwrap();
assert_eq!(client.config.max_request_body_size, eighty_bytes_limit);

let request = Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
Expand All @@ -176,14 +159,4 @@ mod tests {
let response = client.send_request(request).await.unwrap_err();
assert!(matches!(response, Error::RequestTooLarge));
}

#[test]
fn read_content_length_works() {
let mut header = hyper::header::HeaderMap::new();
header.insert(hyper::header::CONTENT_LENGTH, "177".parse().unwrap());
assert_eq!(read_content_length(&header), Some(177));

header.append(hyper::header::CONTENT_LENGTH, "999".parse().unwrap());
assert_eq!(read_content_length(&header), None);
}
}
13 changes: 9 additions & 4 deletions src/client/ws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::client::ws::{RawClient, RawClientEvent, RawClientRequestId, WsTransportClient};
use crate::types::client::Error;
use crate::types::error::Error;
use crate::types::jsonrpc::{self, JsonValue};

use futures::{
Expand Down Expand Up @@ -123,7 +123,7 @@ impl Client {
let method = method.into();
let params = params.into();
log::trace!("[frontend]: send notification: method={:?}, params={:?}", method, params);
self.to_back.clone().send(FrontToBack::Notification { method, params }).await.map_err(Error::InternalChannel)
self.to_back.clone().send(FrontToBack::Notification { method, params }).await.map_err(Error::Internal)
}

/// Perform a request towards the server.
Expand All @@ -139,7 +139,11 @@ impl Client {
let params = params.into();
log::trace!("[frontend]: send request: method={:?}, params={:?}", method, params);
let (send_back_tx, send_back_rx) = oneshot::channel();
self.to_back.clone().send(FrontToBack::StartRequest { method, params, send_back: send_back_tx }).await?;
self.to_back
.clone()
.send(FrontToBack::StartRequest { method, params, send_back: send_back_tx })
.await
.map_err(Error::Internal)?;

// TODO: send a `ChannelClosed` message if we close the channel unexpectedly

Expand Down Expand Up @@ -181,7 +185,8 @@ impl Client {
params: params.into(),
send_back: send_back_tx,
})
.await?;
.await
.map_err(Error::Internal)?;

let notifs_rx = match send_back_rx.await {
Ok(Ok(v)) => v,
Expand Down
2 changes: 0 additions & 2 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@

mod raw;
mod server;
#[allow(unused)]
mod server_utils;
mod transport;

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion src/http/raw/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl From<HttpTransportServer> for RawServer {
impl<'a> RawServerRequest<'a> {
/// Returns the id of the request.
///
/// If this request object is dropped, you can retreive it again later by calling
/// If this request object is dropped, you can retrieve it again later by calling
/// [`request_by_id`](crate::raw::RawServer::request_by_id).
pub fn id(&self) -> RawServerRequestId {
RawServerRequestId { inner: self.inner.id() }
Expand Down
5 changes: 3 additions & 2 deletions src/http/raw/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@

use crate::client::HttpTransportClient;
use crate::http::{HttpRawServer, HttpRawServerEvent, HttpTransportServer};
use crate::types::http::HttpConfig;
use crate::types::jsonrpc::{self, Call, MethodCall, Notification, Params, Request, Version};
use serde_json::Value;

async fn connection_context() -> (HttpTransportClient, HttpRawServer) {
let server = HttpTransportServer::new(&"127.0.0.1:0".parse().unwrap()).await.unwrap();
let server = HttpTransportServer::new(&"127.0.0.1:0".parse().unwrap(), HttpConfig::default()).await.unwrap();
let uri = format!("http://{}", server.local_addr());
let client = HttpTransportClient::new(&uri, 10 * 1024 * 1024).unwrap();
let client = HttpTransportClient::new(&uri, HttpConfig::default()).unwrap();
(client, server.into())
}

Expand Down
23 changes: 12 additions & 11 deletions src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@

use crate::http::raw::{RawServer, RawServerEvent, RawServerRequestId};
use crate::http::transport::HttpTransportServer;
use crate::types::error::Error;
use crate::types::http::HttpConfig;
use crate::types::jsonrpc::{self, JsonValue};
use crate::types::server::Error;

use futures::{channel::mpsc, future::Either, pin_mut, prelude::*};
use parking_lot::Mutex;
Expand All @@ -49,7 +50,7 @@ pub struct Server {
local_addr: SocketAddr,
/// Channel to send requests to the background task.
to_back: mpsc::UnboundedSender<FrontToBack>,
/// List of methods (for RPC queries, subscriptions, and unsubscriptions) that have been
/// List of methods (for RPC queries and notifications) that have been
/// registered. Serves no purpose except to check for duplicates.
registered_methods: Arc<Mutex<HashSet<String>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random thought: maybe this HashSet could use Fnv hashing? Or do we ever expose this on the wire?

Copy link
Member Author

@niklasad1 niklasad1 Nov 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be fine, it's internal in the server

EDIT: FNV hashing is only faster for short keys because Strings are used we can't know for sure but most likely yes ^^

/// Next unique ID used when registering a subscription.
Expand Down Expand Up @@ -111,9 +112,9 @@ enum FrontToBack {

impl Server {
/// Initializes a new server based upon this raw server.
pub async fn new(url: &str) -> Result<Self, Box<dyn error::Error + Send + Sync>> {
let sockaddr = url.parse()?;
let transport_server = HttpTransportServer::new(&sockaddr).await?;
pub async fn new(url: impl AsRef<str>, config: HttpConfig) -> Result<Self, Box<dyn error::Error + Send + Sync>> {
let sockaddr = url.as_ref().parse()?;
let transport_server = HttpTransportServer::new(&sockaddr, config).await?;
let local_addr = *transport_server.local_addr();

// We use an unbounded channel because the only exchanged messages concern registering
Expand All @@ -129,7 +130,7 @@ impl Server {
Ok(Server {
local_addr,
to_back,
registered_methods: Arc::new(Mutex::new(Default::default())),
registered_methods: Arc::new(Mutex::new(HashSet::new())),
next_subscription_unique_id: Arc::new(atomic::AtomicUsize::new(0)),
})
}
Expand All @@ -155,15 +156,15 @@ impl Server {
allow_losses: bool,
) -> Result<RegisteredNotification, Error> {
if !self.registered_methods.lock().insert(method_name.clone()) {
return Err(Error::AlreadyRegistered(method_name));
return Err(Error::MethodAlreadyRegistered(method_name));
}

log::trace!("[frontend]: register_notification={}", method_name);
let (tx, rx) = mpsc::channel(32);

self.to_back
.unbounded_send(FrontToBack::RegisterNotifications { name: method_name, handler: tx, allow_losses })
.map_err(|e| Error::InternalChannel(e.into_send_error()))?;
.map_err(|e| Error::Internal(e.into_send_error()))?;

Ok(RegisteredNotification { queries_rx: rx })
}
Expand All @@ -180,15 +181,15 @@ impl Server {
/// Returns an error if the method name was already registered.
pub fn register_method(&self, method_name: String) -> Result<RegisteredMethod, Error> {
if !self.registered_methods.lock().insert(method_name.clone()) {
return Err(Error::AlreadyRegistered(method_name));
return Err(Error::MethodAlreadyRegistered(method_name));
}

log::trace!("[frontend]: register_method={}", method_name);
let (tx, rx) = mpsc::channel(32);

self.to_back
.unbounded_send(FrontToBack::RegisterMethod { name: method_name, handler: tx })
.map_err(|e| Error::InternalChannel(e.into_send_error()))?;
.map_err(|e| Error::Internal(e.into_send_error()))?;

Ok(RegisteredMethod { to_back: self.to_back.clone(), queries_rx: rx })
}
Expand Down Expand Up @@ -230,7 +231,7 @@ impl IncomingRequest {
self.to_back
.send(FrontToBack::AnswerRequest { request_id: self.request_id, answer: response.into() })
.await
.map_err(Error::InternalChannel)
.map_err(Error::Internal)
}
}

Expand Down
Loading