Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async-std support #1343

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions quinn-udp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@ all-features = true
maintenance = { status = "experimental" }

[dependencies]
async-io = { version = "1.6", optional = true }
libc = "0.2.69"
mio = { version = "0.8", features = ["net", "os-poll"] }
mio = { version = "0.8", features = ["net", "os-poll"], optional = true }
proto = { package = "quinn-proto", path = "../quinn-proto", version = "0.8", default-features = false }
socket2 = "0.4"
tracing = "0.1.10"
tokio = { version = "1.0.1", features = ["net"] }
tokio = { version = "1.0.1", features = ["net"], optional = true }

[features]
runtime-async-std = [ "async-io" ]
runtime-tokio = [ "tokio", "mio" ]
108 changes: 108 additions & 0 deletions quinn-udp/src/async-std/fallback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use std::{
io::{self, IoSliceMut},
net::SocketAddr,
task::{Context, Poll},
time::Instant,
};

use proto::Transmit;

use super::{log_sendmsg_error, RecvMeta, UdpState, IO_ERROR_LOG_INTERVAL};

/// Tokio-compatible UDP socket with some useful specializations.
///
/// Unlike a standard tokio UDP socket, this allows ECN bits to be read and written on some
/// platforms.
#[derive(Debug)]
pub struct UdpSocket {
io: async_io::Async<std::net::UdpSocket>,
last_send_error: Instant,
}

impl UdpSocket {
pub fn from_std(socket: std::net::UdpSocket) -> io::Result<UdpSocket> {
socket.set_nonblocking(true)?;
let now = Instant::now();
Ok(UdpSocket {
io: async_io::Async::new(socket)?,
last_send_error: now.checked_sub(2 * IO_ERROR_LOG_INTERVAL).unwrap_or(now),
})
}

pub fn poll_send(
&mut self,
_state: &UdpState,
cx: &mut Context,
transmits: &[Transmit],
) -> Poll<Result<usize, io::Error>> {
let mut sent = 0;
for transmit in transmits {
let e = match self.io.poll_writable(cx) {
Poll::Ready(Ok(_)) => {
match self
.io
.get_mut()
.send_to(&transmit.contents, transmit.destination)
{
Ok(_) => {
sent += 1;
None
}
Err(e) => Some(e),
}
}
// We need to report that some packets were sent in this case, so we rely on
// errors being either harmlessly transient (in the case of WouldBlock) or
// recurring on the next call.
Poll::Ready(Err(_)) | Poll::Pending if sent != 0 => return Poll::Ready(Ok(sent)),
Poll::Ready(Err(e)) => Some(e),
Poll::Pending => return Poll::Pending,
};
if let Some(e) = e {
// WouldBlock is expected to be returned as `Poll::Pending`
debug_assert!(e.kind() != io::ErrorKind::WouldBlock);

// Errors are ignored, since they will ususally be handled
// by higher level retransmits and timeouts.
// - PermissionDenied errors have been observed due to iptable rules.
// Those are not fatal errors, since the
// configuration can be dynamically changed.
// - Destination unreachable errors have been observed for other
log_sendmsg_error(&mut self.last_send_error, e, transmit);
sent += 1;
}
}
Poll::Ready(Ok(sent))
}

pub fn poll_recv(
&mut self,
cx: &mut Context,
bufs: &mut [IoSliceMut<'_>],
meta: &mut [RecvMeta],
) -> Poll<io::Result<usize>> {
debug_assert!(!bufs.is_empty());
ready!(self.io.poll_readable(cx))?;
let (len, addr) = self.io.get_mut().recv_from(&mut bufs[0])?;
meta[0] = RecvMeta {
len,
addr,
ecn: None,
dst_ip: None,
};
Poll::Ready(Ok(1))
}

pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.get_ref().local_addr()
}
}

/// Returns the platforms UDP socket capabilities
pub fn udp_state() -> super::UdpState {
super::UdpState {
max_gso_segments: std::sync::atomic::AtomicUsize::new(1),
}
}

pub const BATCH_SIZE: usize = 1;
21 changes: 16 additions & 5 deletions quinn-udp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,26 @@ macro_rules! ready {
};
}

#[cfg(unix)]
#[cfg(all(feature = "runtime-tokio", feature = "runtime-async-std"))]
compile_error!("runtime-tokio and runtime-async-std are mutually exclusive");

#[cfg(not(any(feature = "runtime-tokio", feature = "runtime-async-std")))]
compile_error!("choose either runtime-tokio or runtime-async-std");

#[cfg(all(feature = "runtime-tokio", unix))]
mod cmsg;
#[cfg(unix)]
#[path = "unix.rs"]

#[cfg(all(feature = "runtime-tokio", unix))]
#[path = "tokio/unix.rs"]
mod imp;

// No ECN support
#[cfg(not(unix))]
#[path = "fallback.rs"]
#[cfg(all(feature = "runtime-tokio", not(unix)))]
#[path = "tokio/fallback.rs"]
mod imp;

#[cfg(feature = "runtime-async-std")]
#[path = "async-std/fallback.rs"]
mod imp;

pub use imp::UdpSocket;
Expand Down
File renamed without changes.
File renamed without changes.
17 changes: 14 additions & 3 deletions quinn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,39 @@ native-certs = ["proto/native-certs"]
tls-rustls = ["rustls", "webpki", "proto/tls-rustls", "ring"]
# Enables `Endpoint::client` and `Endpoint::server` conveniences
ring = ["proto/ring"]
runtime-tokio = ["tokio", "udp/runtime-tokio"]
runtime-async-std = ["futures-channel", "futures-lite", "async-notify", "async-io", "async-std", "udp/runtime-async-std"]

[badges]
codecov = { repository = "djc/quinn" }
maintenance = { status = "experimental" }

[dependencies]
# for async-std runtime support
async-io = { version = "1.6", optional = true }
# for async-std runtime support
async-notify = { version = "0.2", optional = true }
# for async-std runtime support
async-std = { version = "1.11", optional = true }
bytes = "1"
# Enables futures::io::{AsyncRead, AsyncWrite} support for streams
futures-io = { version = "0.3.19", optional = true }
# for async-std runtime support
futures-channel = { version = "0.3", optional = true }
# Implements futures::Stream for async streams such as `Incoming`
futures-core = { version = "0.3.19", optional = true }
# for async-std runtime support
futures-lite = { version = "1.12", optional = true }
rustc-hash = "1.1"
proto = { package = "quinn-proto", path = "../quinn-proto", version = "0.8", default-features = false }
rustls = { version = "0.20.3", default-features = false, features = ["quic"], optional = true }
thiserror = "1.0.21"
tracing = "0.1.10"
tokio = { version = "1.0.1", features = ["rt", "time", "sync"] }
tokio = { version = "1.0.1", features = ["rt", "time", "sync"], optional = true }
udp = { package = "quinn-udp", path = "../quinn-udp", version = "0.1.0" }
webpki = { version = "0.22", default-features = false, optional = true }

[dev-dependencies]
anyhow = "1.0.22"
async-std = { version = "1.11", features = [ "attributes" ] }
crc = "2"
bencher = "0.1.5"
directories-next = "2"
Expand Down
7 changes: 4 additions & 3 deletions quinn/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ struct Opt {
rebind: bool,
}

fn main() {
#[cfg_attr(feature = "async-std", async_std::main)]
#[cfg_attr(feature = "tokio", tokio::main)]
async fn main() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
Expand All @@ -50,7 +52,7 @@ fn main() {
.unwrap();
let opt = Opt::from_args();
let code = {
if let Err(e) = run(opt) {
if let Err(e) = run(opt).await {
eprintln!("ERROR: {}", e);
1
} else {
Expand All @@ -60,7 +62,6 @@ fn main() {
::std::process::exit(code);
}

#[tokio::main]
async fn run(options: Opt) -> Result<()> {
let url = options.url;
let remote = (url.host_str().unwrap(), url.port().unwrap_or(4433))
Expand Down
16 changes: 11 additions & 5 deletions quinn/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ use structopt::{self, StructOpt};
use tracing::{error, info, info_span};
use tracing_futures::Instrument as _;

#[cfg(feature = "runtime-async-std")]
use async_std::task::spawn;
#[cfg(feature = "runtime-tokio")]
use tokio::spawn;

mod common;

#[derive(StructOpt, Debug)]
Expand All @@ -40,7 +45,9 @@ struct Opt {
listen: SocketAddr,
}

fn main() {
#[cfg_attr(feature = "async-std", async_std::main)]
#[cfg_attr(feature = "tokio", tokio::main)]
async fn main() {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
Expand All @@ -49,7 +56,7 @@ fn main() {
.unwrap();
let opt = Opt::from_args();
let code = {
if let Err(e) = run(opt) {
if let Err(e) = run(opt).await {
eprintln!("ERROR: {}", e);
1
} else {
Expand All @@ -59,7 +66,6 @@ fn main() {
::std::process::exit(code);
}

#[tokio::main]
#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
async fn run(options: Opt) -> Result<()> {
let (certs, key) = if let (Some(key_path), Some(cert_path)) = (&options.key, &options.cert) {
Expand Down Expand Up @@ -150,7 +156,7 @@ async fn run(options: Opt) -> Result<()> {
while let Some(conn) = incoming.next().await {
info!("connection incoming");
let fut = handle_connection(root.clone(), conn);
tokio::spawn(async move {
spawn(async move {
if let Err(e) = fut.await {
error!("connection failed: {reason}", reason = e.to_string())
}
Expand Down Expand Up @@ -192,7 +198,7 @@ async fn handle_connection(root: Arc<Path>, conn: quinn::Connecting) -> Result<(
Ok(s) => s,
};
let fut = handle_request(root.clone(), stream);
tokio::spawn(
spawn(
async move {
if let Err(e) = fut.await {
error!("failed: {reason}", reason = e.to_string());
Expand Down
Loading