Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract runtime support #1364

Merged
merged 4 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion perf/src/bin/perf_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
use anyhow::{Context, Result};
use bytes::Bytes;
use clap::Parser;
use quinn::TokioRuntime;
use tokio::sync::Semaphore;
use tracing::{debug, error, info};

Expand Down Expand Up @@ -103,7 +104,7 @@ async fn run(opt: Opt) -> Result<()> {

let socket = bind_socket(bind_addr, opt.send_buffer_size, opt.recv_buffer_size)?;

let (endpoint, _) = quinn::Endpoint::new(Default::default(), None, socket)?;
let (endpoint, _) = quinn::Endpoint::new(Default::default(), None, socket, TokioRuntime)?;

let mut crypto = rustls::ClientConfig::builder()
.with_cipher_suites(perf::PERF_CIPHER_SUITES)
Expand Down
11 changes: 8 additions & 3 deletions perf/src/bin/perf_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{fs, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use anyhow::{Context, Result};
use bytes::Bytes;
use clap::Parser;
use quinn::TokioRuntime;
use tracing::{debug, error, info};

use perf::bind_socket;
Expand Down Expand Up @@ -77,9 +78,13 @@ async fn run(opt: Opt) -> Result<()> {

let socket = bind_socket(opt.listen, opt.send_buffer_size, opt.recv_buffer_size)?;

let (endpoint, mut incoming) =
quinn::Endpoint::new(Default::default(), Some(server_config), socket)
.context("creating endpoint")?;
let (endpoint, mut incoming) = quinn::Endpoint::new(
Default::default(),
Some(server_config),
socket,
TokioRuntime,
)
.context("creating endpoint")?;

info!("listening on {}", endpoint.local_addr().unwrap());

Expand Down
2 changes: 1 addition & 1 deletion quinn-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ description = "State machine for the QUIC transport protocol"
keywords = ["quic"]
categories = [ "network-programming", "asynchronous" ]
workspace = ".."
edition = "2018"
edition = "2021"
rust-version = "1.57"

[package.metadata.docs.rs]
Expand Down
3 changes: 1 addition & 2 deletions quinn-udp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "quinn-udp"
version = "0.1.0"
version = "0.2.0"
license = "MIT OR Apache-2.0"
repository = "https://github.com/quinn-rs/quinn"
description = "UDP sockets with ECN information for the QUIC transport protocol"
Expand All @@ -20,4 +20,3 @@ libc = "0.2.69"
proto = { package = "quinn-proto", path = "../quinn-proto", version = "0.8", default-features = false }
socket2 = "0.4"
tracing = "0.1.10"
tokio = { version = "1.0.1", features = ["net"] }
86 changes: 45 additions & 41 deletions quinn-udp/src/fallback.rs
Original file line number Diff line number Diff line change
@@ -1,59 +1,58 @@
use std::{
io::{self, IoSliceMut},
net::SocketAddr,
task::{Context, Poll},
time::Instant,
};

use proto::Transmit;
use tokio::io::ReadBuf;

use super::{log_sendmsg_error, RecvMeta, UdpState, IO_ERROR_LOG_INTERVAL};
use super::{log_sendmsg_error, RecvMeta, UdpSockRef, UdpState, IO_ERROR_LOG_INTERVAL};

/// Tokio-compatible UDP socket with some useful specializations.
/// Fallback UDP socket interface that stubs out all special functionality
///
/// Unlike a standard tokio UDP socket, this allows ECN bits to be read and written on some
/// platforms.
/// Used when a better implementation is not available for a particular target, at the cost of
/// reduced performance compared to that enabled by some target-specific interfaces.
#[derive(Debug)]
pub struct UdpSocket {
io: tokio::net::UdpSocket,
pub struct UdpSocketState {
last_send_error: Instant,
}

impl UdpSocket {
pub fn from_std(socket: std::net::UdpSocket) -> io::Result<UdpSocket> {
socket.set_nonblocking(true)?;
impl UdpSocketState {
pub fn new() -> Self {
let now = Instant::now();
Ok(UdpSocket {
io: tokio::net::UdpSocket::from_std(socket)?,
Self {
last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
})
}
}

pub fn configure(socket: UdpSockRef<'_>) -> io::Result<()> {
socket.0.set_nonblocking(true)
}

pub fn poll_send(
pub fn send(
&mut self,
socket: UdpSockRef<'_>,
_state: &UdpState,
cx: &mut Context,
transmits: &[Transmit],
) -> Poll<Result<usize, io::Error>> {
) -> Result<usize, io::Error> {
let mut sent = 0;
for transmit in transmits {
match self
.io
.poll_send_to(cx, &transmit.contents, transmit.destination)
{
Poll::Ready(Ok(_)) => {
match socket.0.send_to(
&transmit.contents,
&socket2::SockAddr::from(transmit.destination),
) {
Ok(_) => {
sent += 1;
}
// We need to report that some packets were sent in this case, so we rely on
// errors being either harmlessly transient (in the case of WouldBlock) or
// recurring on the next call.
Poll::Ready(Err(_)) | Poll::Pending if sent != 0 => return Poll::Ready(Ok(sent)),
Poll::Ready(Err(e)) => {
// WouldBlock is expected to be returned as `Poll::Pending`
debug_assert!(e.kind() != io::ErrorKind::WouldBlock);
Err(_) if sent != 0 => return Ok(sent),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
return Err(e);
}

// Errors are ignored, since they will ususally be handled
// Other errors are ignored, since they will ususally be handled
// by higher level retransmits and timeouts.
// - PermissionDenied errors have been observed due to iptable rules.
// Those are not fatal errors, since the
Expand All @@ -62,34 +61,39 @@ impl UdpSocket {
log_sendmsg_error(&mut self.last_send_error, e, transmit);
sent += 1;
}
Poll::Pending => return Poll::Pending,
}
}
Poll::Ready(Ok(sent))
Ok(sent)
}

pub fn poll_recv(
pub fn recv(
&self,
cx: &mut Context,
socket: UdpSockRef<'_>,
bufs: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> Poll<io::Result<usize>> {
debug_assert!(!bufs.is_empty());
let mut buf = ReadBuf::new(&mut bufs[0]);
let addr = ready!(self.io.poll_recv_from(cx, &mut buf))?;
let len = buf.filled().len();
) -> io::Result<usize> {
// Safety: both `IoSliceMut` and `MaybeUninitSlice` promise to have the
// same layout, that of `iovec`/`WSABUF`. Furthermore `recv_vectored`
// promises to not write unitialised bytes to the `bufs` and pass it
// directly to the `recvmsg` system call, so this is safe.
let bufs = unsafe {
&mut *(bufs as *mut [IoSliceMut<'_>] as *mut [socket2::MaybeUninitSlice<'_>])
};
let (len, _flags, addr) = socket.0.recv_from_vectored(bufs)?;
meta[0] = RecvMeta {
len,
stride: len,
addr,
addr: addr.as_socket().unwrap(),
ecn: None,
dst_ip: None,
};
Poll::Ready(Ok(1))
Ok(1)
}
}

pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.local_addr()
impl Default for UdpSocketState {
fn default() -> Self {
Self::new()
}
}

Expand Down
42 changes: 32 additions & 10 deletions quinn-udp/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
//! Uniform interface to send/recv UDP packets with ECN information.
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
#[cfg(windows)]
use std::os::windows::io::AsRawSocket;
use std::{
net::{IpAddr, Ipv6Addr, SocketAddr},
sync::atomic::{AtomicUsize, Ordering},
Expand All @@ -8,15 +12,6 @@ use std::{
use proto::{EcnCodepoint, Transmit};
use tracing::warn;

macro_rules! ready {
($e:expr $(,)?) => {
match $e {
std::task::Poll::Ready(t) => t,
std::task::Poll::Pending => return std::task::Poll::Pending,
}
};
}

#[cfg(unix)]
mod cmsg;
#[cfg(unix)]
Expand All @@ -28,7 +23,7 @@ mod imp;
#[path = "fallback.rs"]
mod imp;

pub use imp::UdpSocket;
pub use imp::UdpSocketState;

/// Number of UDP packets to send/receive at a time
pub const BATCH_SIZE: usize = imp::BATCH_SIZE;
Expand Down Expand Up @@ -114,3 +109,30 @@ fn log_sendmsg_error(
err, transmit.destination, transmit.src_ip, transmit.ecn, transmit.contents.len(), transmit.segment_size);
}
}

/// A borrowed UDP socket
///
/// On Unix, constructible via `From<T: AsRawFd>`. On Windows, constructible via `From<T:
/// AsRawSocket>`.
// Wrapper around socket2 to avoid making it a public dependency and incurring stability risk
pub struct UdpSockRef<'a>(socket2::SockRef<'a>);

#[cfg(unix)]
impl<'s, S> From<&'s S> for UdpSockRef<'s>
where
S: AsRawFd,
{
fn from(socket: &'s S) -> Self {
Self(socket.into())
}
}

#[cfg(windows)]
impl<'s, S> From<&'s S> for UdpSockRef<'s>
where
S: AsRawSocket,
{
fn from(socket: &'s S) -> Self {
Self(socket.into())
}
}
Loading