Skip to content

Commit

Permalink
fs,sync: expose poll_ fns on misc types (#3308)
Browse files Browse the repository at this point in the history
Includes methods on:

* fs::DirEntry
* io::Lines
* io::Split
* sync::mpsc::Receiver
* sync::misc::UnboundedReceiver
  • Loading branch information
Darksonn authored Dec 22, 2020
1 parent f95ad18 commit 0b83b3b
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 39 deletions.
20 changes: 19 additions & 1 deletion tokio/src/fs/read_dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,25 @@ impl ReadDir {
poll_fn(|cx| self.poll_next_entry(cx)).await
}

fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Option<DirEntry>>> {
/// Polls for the next directory entry in the stream.
///
/// This method returns:
///
/// * `Poll::Pending` if the next directory entry is not yet available.
/// * `Poll::Ready(Ok(Some(entry)))` if the next directory entry is available.
/// * `Poll::Ready(Ok(None))` if there are no more directory entries in this
/// stream.
/// * `Poll::Ready(Err(err))` if an IO error occurred while reading the next
/// directory entry.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when the next directory entry
/// becomes available on the underlying IO resource.
///
/// Note that on multiple calls to `poll_next_entry`, only the `Waker` from
/// the `Context` passed to the most recent call is scheduled to receive a
/// wakeup.
pub fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<Option<DirEntry>>> {
loop {
match self.0 {
State::Idle(ref mut std) => {
Expand Down
18 changes: 17 additions & 1 deletion tokio/src/io/util/lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,23 @@ impl<R> Lines<R>
where
R: AsyncBufRead,
{
fn poll_next_line(
/// Polls for the next line in the stream.
///
/// This method returns:
///
/// * `Poll::Pending` if the next line is not yet available.
/// * `Poll::Ready(Ok(Some(line)))` if the next line is available.
/// * `Poll::Ready(Ok(None))` if there are no more lines in this stream.
/// * `Poll::Ready(Err(err))` if an IO error occurred while reading the next line.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when more bytes become
/// available on the underlying IO resource.
///
/// Note that on multiple calls to `poll_next_line`, only the `Waker` from
/// the `Context` passed to the most recent call is scheduled to receive a
/// wakeup.
pub fn poll_next_line(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<Option<String>>> {
Expand Down
19 changes: 18 additions & 1 deletion tokio/src/io/util/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,24 @@ impl<R> Split<R>
where
R: AsyncBufRead,
{
fn poll_next_segment(
/// Polls for the next segment in the stream.
///
/// This method returns:
///
/// * `Poll::Pending` if the next segment is not yet available.
/// * `Poll::Ready(Ok(Some(segment)))` if the next segment is available.
/// * `Poll::Ready(Ok(None))` if there are no more segments in this stream.
/// * `Poll::Ready(Err(err))` if an IO error occurred while reading the
/// next segment.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when more bytes become
/// available on the underlying IO resource.
///
/// Note that on multiple calls to `poll_next_segment`, only the `Waker`
/// from the `Context` passed to the most recent call is scheduled to
/// receive a wakeup.
pub fn poll_next_segment(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<Option<Vec<u8>>>> {
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ mod util;
/// release, most of the Tokio stream utilities have been moved into the [`tokio-stream`]
/// crate.
///
/// # Why was `Stream` no included in Tokio 1.0?
/// # Why was `Stream` not included in Tokio 1.0?
///
/// Originally, we had planned to ship Tokio 1.0 with a stable `Stream` type
/// but unfortunetly the [RFC] had not been merged in time for `Stream` to
Expand All @@ -424,6 +424,7 @@ mod util;
/// to create a `impl Stream` from `async fn` using the [`async-stream`] crate.
///
/// [`tokio-stream`]: https://docs.rs/tokio-stream
/// [`async-stream`]: https://docs.rs/async-stream
/// [RFC]: https://github.com/rust-lang/rfcs/pull/2996
///
/// # Example
Expand Down
8 changes: 3 additions & 5 deletions tokio/src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,9 @@ impl TcpListener {
/// Polls to accept a new incoming connection to this listener.
///
/// If there is no connection to accept, `Poll::Pending` is returned and the
/// current task will be notified by a waker.
///
/// When ready, the most recent task that called `poll_accept` is notified.
/// The caller is responsible to ensure that `poll_accept` is called from a
/// single task. Failing to do this could result in tasks hanging.
/// current task will be notified by a waker. Note that on multiple calls
/// to `poll_accept`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
loop {
let ev = ready!(self.io.registration().poll_read_ready(cx))?;
Expand Down
10 changes: 4 additions & 6 deletions tokio/src/net/unix/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,10 @@ impl UnixListener {

/// Polls to accept a new incoming connection to this listener.
///
/// If there is no connection to accept, `Poll::Pending` is returned and
/// the current task will be notified by a waker.
///
/// When ready, the most recent task that called `poll_accept` is notified.
/// The caller is responsible to ensure that `poll_accept` is called from a
/// single task. Failing to do this could result in tasks hanging.
/// If there is no connection to accept, `Poll::Pending` is returned and the
/// current task will be notified by a waker. Note that on multiple calls
/// to `poll_accept`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
let (sock, addr) = ready!(self.io.registration().poll_read_io(cx, || self.io.accept()))?;
let addr = SocketAddr(addr);
Expand Down
25 changes: 19 additions & 6 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ cfg_time! {
}

use std::fmt;
#[cfg(any(feature = "signal", feature = "process"))]
use std::task::{Context, Poll};

/// Send values to the associated `Receiver`.
Expand Down Expand Up @@ -147,11 +146,6 @@ impl<T> Receiver<T> {
poll_fn(|cx| self.chan.recv(cx)).await
}

#[cfg(any(feature = "signal", feature = "process"))]
pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Blocking receive to call outside of asynchronous contexts.
///
/// # Panics
Expand Down Expand Up @@ -243,6 +237,25 @@ impl<T> Receiver<T> {
pub fn close(&mut self) {
self.chan.close();
}

/// Polls to receive the next message on this channel.
///
/// This method returns:
///
/// * `Poll::Pending` if no messages are available but the channel is not
/// closed.
/// * `Poll::Ready(Some(message))` if a message is available.
/// * `Poll::Ready(None)` if the channel has been closed and all messages
/// sent before it was closed have been received.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
}

impl<T> fmt::Debug for Receiver<T> {
Expand Down
23 changes: 19 additions & 4 deletions tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ impl<T> UnboundedReceiver<T> {
UnboundedReceiver { chan }
}

fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Receives the next value for this receiver.
///
/// `None` is returned when all `Sender` halves have dropped, indicating
Expand Down Expand Up @@ -159,6 +155,25 @@ impl<T> UnboundedReceiver<T> {
pub fn close(&mut self) {
self.chan.close();
}

/// Polls to receive the next message on this channel.
///
/// This method returns:
///
/// * `Poll::Pending` if no messages are available but the channel is not
/// closed.
/// * `Poll::Ready(Some(message))` if a message is available.
/// * `Poll::Ready(None)` if the channel has been closed and all messages
/// sent before it was closed have been received.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// receiver, or when the channel is closed. Note that on multiple calls to
/// `poll_recv`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup.
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
}

impl<T> UnboundedSender<T> {
Expand Down
41 changes: 27 additions & 14 deletions tokio/tests/support/mpsc_stream.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,42 @@
#![allow(dead_code)]

use async_stream::stream;
use tokio::sync::mpsc::{self, Sender, UnboundedSender};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
use tokio_stream::Stream;

struct UnboundedStream<T> {
recv: UnboundedReceiver<T>,
}
impl<T> Stream for UnboundedStream<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
Pin::into_inner(self).recv.poll_recv(cx)
}
}

pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) {
let (tx, mut rx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel();

let stream = stream! {
while let Some(item) = rx.recv().await {
yield item;
}
};
let stream = UnboundedStream { recv: rx };

(tx, stream)
}

struct BoundedStream<T> {
recv: Receiver<T>,
}
impl<T> Stream for BoundedStream<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
Pin::into_inner(self).recv.poll_recv(cx)
}
}

pub fn channel_stream<T: Unpin>(size: usize) -> (Sender<T>, impl Stream<Item = T>) {
let (tx, mut rx) = mpsc::channel(size);
let (tx, rx) = mpsc::channel(size);

let stream = stream! {
while let Some(item) = rx.recv().await {
yield item;
}
};
let stream = BoundedStream { recv: rx };

(tx, stream)
}

0 comments on commit 0b83b3b

Please sign in to comment.