Skip to content

Commit faf9306

Browse files
committed
io: remove poll_{read,write}_buf from traits
These functions have object safety issues. It also has been decided to avoid vectored operations on the I/O traits. A later PR will bring back vectored operations on specific types that support them. Refs: #2879, #2716
1 parent ffa5bdb commit faf9306

19 files changed

+34
-580
lines changed

tokio-test/src/io.rs

-11
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
2222
use tokio::sync::mpsc;
2323
use tokio::time::{self, Delay, Duration, Instant};
2424

25-
use bytes::Buf;
2625
use futures_core::ready;
2726
use std::collections::VecDeque;
2827
use std::future::Future;
@@ -439,16 +438,6 @@ impl AsyncWrite for Mock {
439438
}
440439
}
441440

442-
fn poll_write_buf<B: Buf>(
443-
self: Pin<&mut Self>,
444-
cx: &mut task::Context<'_>,
445-
buf: &mut B,
446-
) -> Poll<io::Result<usize>> {
447-
let n = ready!(self.poll_write(cx, buf.bytes()))?;
448-
buf.advance(n);
449-
Poll::Ready(Ok(n))
450-
}
451-
452441
fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
453442
Poll::Ready(Ok(()))
454443
}

tokio-util/src/codec/framed_impl.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ where
118118
type Item = Result<U::Item, U::Error>;
119119

120120
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121+
use crate::io::poll_read_buf;
122+
121123
let mut pinned = self.project();
122124
let state: &mut ReadFrame = pinned.state.borrow_mut();
123125
loop {
@@ -148,7 +150,7 @@ where
148150
// got room for at least one byte to read to ensure that we don't
149151
// get a spurious 0 that looks like EOF
150152
state.buffer.reserve(1);
151-
let bytect = match pinned.inner.as_mut().poll_read_buf(cx, &mut state.buffer)? {
153+
let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? {
152154
Poll::Ready(ct) => ct,
153155
Poll::Pending => return Poll::Pending,
154156
};

tokio-util/src/io/mod.rs

+25
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,28 @@ mod stream_reader;
1111

1212
pub use self::reader_stream::ReaderStream;
1313
pub use self::stream_reader::StreamReader;
14+
15+
use tokio::io::{AsyncRead, ReadBuf};
16+
17+
use bytes::BufMut;
18+
use futures_core::ready;
19+
use std::io;
20+
use std::pin::Pin;
21+
use std::task::{Context, Poll};
22+
23+
pub(crate) fn poll_read_buf<T: AsyncRead>(cx: &mut Context<'_>, io: Pin<&mut T>, buf: &mut impl BufMut) -> Poll<io::Result<usize>> {
24+
if !buf.has_remaining_mut() {
25+
return Poll::Ready(Ok(0));
26+
}
27+
28+
let mut b = ReadBuf::uninit(buf.bytes_mut());
29+
30+
ready!(io.poll_read(cx, &mut b))?;
31+
let n = b.filled().len();
32+
33+
// Safety: we can assume `n` bytes were read, since they are in`filled`.
34+
unsafe {
35+
buf.advance_mut(n);
36+
}
37+
Poll::Ready(Ok(n))
38+
}

tokio-util/src/io/reader_stream.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ impl<R: AsyncRead> ReaderStream<R> {
7070
impl<R: AsyncRead> Stream for ReaderStream<R> {
7171
type Item = std::io::Result<Bytes>;
7272
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
73+
use crate::io::poll_read_buf;
74+
7375
let mut this = self.as_mut().project();
7476

7577
let reader = match this.reader.as_pin_mut() {
@@ -81,7 +83,7 @@ impl<R: AsyncRead> Stream for ReaderStream<R> {
8183
this.buf.reserve(CAPACITY);
8284
}
8385

84-
match reader.poll_read_buf(cx, &mut this.buf) {
86+
match poll_read_buf(cx, reader, &mut this.buf) {
8587
Poll::Pending => Poll::Pending,
8688
Poll::Ready(Err(err)) => {
8789
self.project().reader.set(None);

tokio-util/src/io/stream_reader.rs

+1-24
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use bytes::{Buf, BufMut};
1+
use bytes::Buf;
22
use futures_core::stream::Stream;
33
use pin_project_lite::pin_project;
44
use std::io;
@@ -119,29 +119,6 @@ where
119119
self.consume(len);
120120
Poll::Ready(Ok(()))
121121
}
122-
fn poll_read_buf<BM: BufMut>(
123-
mut self: Pin<&mut Self>,
124-
cx: &mut Context<'_>,
125-
buf: &mut BM,
126-
) -> Poll<io::Result<usize>>
127-
where
128-
Self: Sized,
129-
{
130-
if !buf.has_remaining_mut() {
131-
return Poll::Ready(Ok(0));
132-
}
133-
134-
let inner_buf = match self.as_mut().poll_fill_buf(cx) {
135-
Poll::Ready(Ok(buf)) => buf,
136-
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
137-
Poll::Pending => return Poll::Pending,
138-
};
139-
let len = std::cmp::min(inner_buf.len(), buf.remaining_mut());
140-
buf.put_slice(&inner_buf[..len]);
141-
142-
self.consume(len);
143-
Poll::Ready(Ok(len))
144-
}
145122
}
146123

147124
impl<S, B, E> AsyncBufRead for StreamReader<S, B>

tokio/Cargo.toml

+1-2
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ signal = [
8383
stream = ["futures-core"]
8484
sync = ["fnv"]
8585
test-util = []
86-
tcp = ["io-driver", "iovec"]
86+
tcp = ["io-driver"]
8787
time = ["slab"]
8888
udp = ["io-driver"]
8989
uds = ["io-driver", "mio-uds", "libc"]
@@ -100,7 +100,6 @@ futures-core = { version = "0.3.0", optional = true }
100100
lazy_static = { version = "1.0.2", optional = true }
101101
memchr = { version = "2.2", optional = true }
102102
mio = { version = "0.6.20", optional = true }
103-
iovec = { version = "0.1.4", optional = true }
104103
num_cpus = { version = "1.8.0", optional = true }
105104
parking_lot = { version = "0.11.0", optional = true } # Not in full
106105
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`

tokio/src/io/async_read.rs

-31
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use super::ReadBuf;
2-
use bytes::BufMut;
32
use std::io;
43
use std::ops::DerefMut;
54
use std::pin::Pin;
@@ -54,36 +53,6 @@ pub trait AsyncRead {
5453
cx: &mut Context<'_>,
5554
buf: &mut ReadBuf<'_>,
5655
) -> Poll<io::Result<()>>;
57-
58-
/// Pulls some bytes from this source into the specified `BufMut`, returning
59-
/// how many bytes were read.
60-
///
61-
/// The `buf` provided will have bytes read into it and the internal cursor
62-
/// will be advanced if any bytes were read. Note that this method typically
63-
/// will not reallocate the buffer provided.
64-
fn poll_read_buf<B: BufMut>(
65-
self: Pin<&mut Self>,
66-
cx: &mut Context<'_>,
67-
buf: &mut B,
68-
) -> Poll<io::Result<usize>>
69-
where
70-
Self: Sized,
71-
{
72-
if !buf.has_remaining_mut() {
73-
return Poll::Ready(Ok(0));
74-
}
75-
76-
let mut b = ReadBuf::uninit(buf.bytes_mut());
77-
78-
ready!(self.poll_read(cx, &mut b))?;
79-
let n = b.filled().len();
80-
81-
// Safety: we can assume `n` bytes were read, since they are in`filled`.
82-
unsafe {
83-
buf.advance_mut(n);
84-
}
85-
Poll::Ready(Ok(n))
86-
}
8756
}
8857

8958
macro_rules! deref_async_read {

tokio/src/io/async_write.rs

-22
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use bytes::Buf;
21
use std::io;
32
use std::ops::DerefMut;
43
use std::pin::Pin;
@@ -128,27 +127,6 @@ pub trait AsyncWrite {
128127
/// This function will panic if not called within the context of a future's
129128
/// task.
130129
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
131-
132-
/// Writes a `Buf` into this value, returning how many bytes were written.
133-
///
134-
/// Note that this method will advance the `buf` provided automatically by
135-
/// the number of bytes written.
136-
fn poll_write_buf<B: Buf>(
137-
self: Pin<&mut Self>,
138-
cx: &mut Context<'_>,
139-
buf: &mut B,
140-
) -> Poll<Result<usize, io::Error>>
141-
where
142-
Self: Sized,
143-
{
144-
if !buf.has_remaining() {
145-
return Poll::Ready(Ok(0));
146-
}
147-
148-
let n = ready!(self.poll_write(cx, buf.bytes()))?;
149-
buf.advance(n);
150-
Poll::Ready(Ok(n))
151-
}
152130
}
153131

154132
macro_rules! deref_async_write {

tokio/src/io/split.rs

-19
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
77
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
88

9-
use bytes::{Buf, BufMut};
109
use std::cell::UnsafeCell;
1110
use std::fmt;
1211
use std::io;
@@ -107,15 +106,6 @@ impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
107106
let mut inner = ready!(self.inner.poll_lock(cx));
108107
inner.stream_pin().poll_read(cx, buf)
109108
}
110-
111-
fn poll_read_buf<B: BufMut>(
112-
self: Pin<&mut Self>,
113-
cx: &mut Context<'_>,
114-
buf: &mut B,
115-
) -> Poll<io::Result<usize>> {
116-
let mut inner = ready!(self.inner.poll_lock(cx));
117-
inner.stream_pin().poll_read_buf(cx, buf)
118-
}
119109
}
120110

121111
impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
@@ -137,15 +127,6 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
137127
let mut inner = ready!(self.inner.poll_lock(cx));
138128
inner.stream_pin().poll_shutdown(cx)
139129
}
140-
141-
fn poll_write_buf<B: Buf>(
142-
self: Pin<&mut Self>,
143-
cx: &mut Context<'_>,
144-
buf: &mut B,
145-
) -> Poll<Result<usize, io::Error>> {
146-
let mut inner = ready!(self.inner.poll_lock(cx));
147-
inner.stream_pin().poll_write_buf(cx, buf)
148-
}
149130
}
150131

151132
impl<T> Inner<T> {

tokio/src/io/util/async_read_ext.rs

-68
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use crate::io::util::chain::{chain, Chain};
22
use crate::io::util::read::{read, Read};
3-
use crate::io::util::read_buf::{read_buf, ReadBuf};
43
use crate::io::util::read_exact::{read_exact, ReadExact};
54
use crate::io::util::read_int::{
65
ReadI128, ReadI128Le, ReadI16, ReadI16Le, ReadI32, ReadI32Le, ReadI64, ReadI64Le, ReadI8,
@@ -13,8 +12,6 @@ use crate::io::util::read_to_string::{read_to_string, ReadToString};
1312
use crate::io::util::take::{take, Take};
1413
use crate::io::AsyncRead;
1514

16-
use bytes::BufMut;
17-
1815
cfg_io_util! {
1916
/// Defines numeric reader
2017
macro_rules! read_impl {
@@ -166,71 +163,6 @@ cfg_io_util! {
166163
read(self, buf)
167164
}
168165

169-
/// Pulls some bytes from this source into the specified buffer,
170-
/// advancing the buffer's internal cursor.
171-
///
172-
/// Equivalent to:
173-
///
174-
/// ```ignore
175-
/// async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> io::Result<usize>;
176-
/// ```
177-
///
178-
/// Usually, only a single `read` syscall is issued, even if there is
179-
/// more space in the supplied buffer.
180-
///
181-
/// This function does not provide any guarantees about whether it
182-
/// completes immediately or asynchronously
183-
///
184-
/// # Return
185-
///
186-
/// On a successful read, the number of read bytes is returned. If the
187-
/// supplied buffer is not empty and the function returns `Ok(0)` then
188-
/// the source as reached an "end-of-file" event.
189-
///
190-
/// # Errors
191-
///
192-
/// If this function encounters any form of I/O or other error, an error
193-
/// variant will be returned. If an error is returned then it must be
194-
/// guaranteed that no bytes were read.
195-
///
196-
/// # Examples
197-
///
198-
/// [`File`] implements `Read` and [`BytesMut`] implements [`BufMut`]:
199-
///
200-
/// [`File`]: crate::fs::File
201-
/// [`BytesMut`]: bytes::BytesMut
202-
/// [`BufMut`]: bytes::BufMut
203-
///
204-
/// ```no_run
205-
/// use tokio::fs::File;
206-
/// use tokio::io::{self, AsyncReadExt};
207-
///
208-
/// use bytes::BytesMut;
209-
///
210-
/// #[tokio::main]
211-
/// async fn main() -> io::Result<()> {
212-
/// let mut f = File::open("foo.txt").await?;
213-
/// let mut buffer = BytesMut::with_capacity(10);
214-
///
215-
/// assert!(buffer.is_empty());
216-
///
217-
/// // read up to 10 bytes, note that the return value is not needed
218-
/// // to access the data that was read as `buffer`'s internal
219-
/// // cursor is updated.
220-
/// f.read_buf(&mut buffer).await?;
221-
///
222-
/// println!("The bytes: {:?}", &buffer[..]);
223-
/// Ok(())
224-
/// }
225-
/// ```
226-
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
227-
where
228-
Self: Sized + Unpin,
229-
B: BufMut,
230-
{
231-
read_buf(self, buf)
232-
}
233-
234166
/// Reads the exact number of bytes required to fill `buf`.
235167
///
236168
/// Equivalent to:

0 commit comments

Comments
 (0)