-
Notifications
You must be signed in to change notification settings - Fork 290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace mio polling with filedescriptor's poll() #735
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,241 +1,11 @@ | ||
use std::{collections::VecDeque, io, time::Duration}; | ||
#[cfg(feature = "use-dev-tty")] | ||
pub(crate) mod tty; | ||
|
||
use mio::{unix::SourceFd, Events, Interest, Poll, Token}; | ||
use signal_hook_mio::v0_8::Signals; | ||
#[cfg(not(feature = "use-dev-tty"))] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then here we can use the mio feature flag |
||
pub(crate) mod mio; | ||
|
||
use crate::Result; | ||
#[cfg(feature = "use-dev-tty")] | ||
pub(crate) use self::tty::UnixInternalEventSource; | ||
|
||
#[cfg(feature = "event-stream")] | ||
use super::super::sys::Waker; | ||
use super::super::{ | ||
source::EventSource, | ||
sys::unix::{ | ||
file_descriptor::{tty_fd, FileDesc}, | ||
parse::parse_event, | ||
}, | ||
timeout::PollTimeout, | ||
Event, InternalEvent, | ||
}; | ||
|
||
// Tokens to identify file descriptor | ||
const TTY_TOKEN: Token = Token(0); | ||
const SIGNAL_TOKEN: Token = Token(1); | ||
#[cfg(feature = "event-stream")] | ||
const WAKE_TOKEN: Token = Token(2); | ||
|
||
// I (@zrzka) wasn't able to read more than 1_022 bytes when testing | ||
// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes | ||
// is enough. | ||
const TTY_BUFFER_SIZE: usize = 1_024; | ||
|
||
pub(crate) struct UnixInternalEventSource { | ||
poll: Poll, | ||
events: Events, | ||
parser: Parser, | ||
tty_buffer: [u8; TTY_BUFFER_SIZE], | ||
tty_fd: FileDesc, | ||
signals: Signals, | ||
#[cfg(feature = "event-stream")] | ||
waker: Waker, | ||
} | ||
|
||
impl UnixInternalEventSource { | ||
pub fn new() -> Result<Self> { | ||
UnixInternalEventSource::from_file_descriptor(tty_fd()?) | ||
} | ||
|
||
pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result<Self> { | ||
let poll = Poll::new()?; | ||
let registry = poll.registry(); | ||
|
||
let tty_raw_fd = input_fd.raw_fd(); | ||
let mut tty_ev = SourceFd(&tty_raw_fd); | ||
registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?; | ||
|
||
let mut signals = Signals::new(&[signal_hook::consts::SIGWINCH])?; | ||
registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?; | ||
|
||
#[cfg(feature = "event-stream")] | ||
let waker = Waker::new(registry, WAKE_TOKEN)?; | ||
|
||
Ok(UnixInternalEventSource { | ||
poll, | ||
events: Events::with_capacity(3), | ||
parser: Parser::default(), | ||
tty_buffer: [0u8; TTY_BUFFER_SIZE], | ||
tty_fd: input_fd, | ||
signals, | ||
#[cfg(feature = "event-stream")] | ||
waker, | ||
}) | ||
} | ||
} | ||
|
||
impl EventSource for UnixInternalEventSource { | ||
fn try_read(&mut self, timeout: Option<Duration>) -> Result<Option<InternalEvent>> { | ||
if let Some(event) = self.parser.next() { | ||
return Ok(Some(event)); | ||
} | ||
|
||
let timeout = PollTimeout::new(timeout); | ||
|
||
loop { | ||
if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) { | ||
// Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds. | ||
// Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned). | ||
// https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes | ||
if e.kind() == io::ErrorKind::Interrupted { | ||
continue; | ||
} else { | ||
return Err(e); | ||
} | ||
}; | ||
|
||
if self.events.is_empty() { | ||
// No readiness events = timeout | ||
return Ok(None); | ||
} | ||
|
||
for token in self.events.iter().map(|x| x.token()) { | ||
match token { | ||
TTY_TOKEN => { | ||
loop { | ||
match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) { | ||
Ok(read_count) => { | ||
if read_count > 0 { | ||
self.parser.advance( | ||
&self.tty_buffer[..read_count], | ||
read_count == TTY_BUFFER_SIZE, | ||
); | ||
} | ||
} | ||
Err(e) => { | ||
// No more data to read at the moment. We will receive another event | ||
if e.kind() == io::ErrorKind::WouldBlock { | ||
break; | ||
} | ||
// once more data is available to read. | ||
else if e.kind() == io::ErrorKind::Interrupted { | ||
continue; | ||
} | ||
} | ||
}; | ||
|
||
if let Some(event) = self.parser.next() { | ||
return Ok(Some(event)); | ||
} | ||
} | ||
} | ||
SIGNAL_TOKEN => { | ||
for signal in self.signals.pending() { | ||
match signal { | ||
signal_hook::consts::SIGWINCH => { | ||
// TODO Should we remove tput? | ||
// | ||
// This can take a really long time, because terminal::size can | ||
// launch new process (tput) and then it parses its output. It's | ||
// not a really long time from the absolute time point of view, but | ||
// it's a really long time from the mio, async-std/tokio executor, ... | ||
// point of view. | ||
let new_size = crate::terminal::size()?; | ||
return Ok(Some(InternalEvent::Event(Event::Resize( | ||
new_size.0, new_size.1, | ||
)))); | ||
} | ||
_ => unreachable!("Synchronize signal registration & handling"), | ||
}; | ||
} | ||
} | ||
#[cfg(feature = "event-stream")] | ||
WAKE_TOKEN => { | ||
return Err(std::io::Error::new( | ||
std::io::ErrorKind::Interrupted, | ||
"Poll operation was woken up by `Waker::wake`", | ||
)); | ||
} | ||
_ => unreachable!("Synchronize Evented handle registration & token handling"), | ||
} | ||
} | ||
|
||
// Processing above can take some time, check if timeout expired | ||
if timeout.elapsed() { | ||
return Ok(None); | ||
} | ||
} | ||
} | ||
|
||
#[cfg(feature = "event-stream")] | ||
fn waker(&self) -> Waker { | ||
self.waker.clone() | ||
} | ||
} | ||
|
||
// | ||
// Following `Parser` structure exists for two reasons: | ||
// | ||
// * mimic anes Parser interface | ||
// * move the advancing, parsing, ... stuff out of the `try_read` method | ||
// | ||
#[derive(Debug)] | ||
struct Parser { | ||
buffer: Vec<u8>, | ||
internal_events: VecDeque<InternalEvent>, | ||
} | ||
|
||
impl Default for Parser { | ||
fn default() -> Self { | ||
Parser { | ||
// This buffer is used for -> 1 <- ANSI escape sequence. Are we | ||
// aware of any ANSI escape sequence that is bigger? Can we make | ||
// it smaller? | ||
// | ||
// Probably not worth spending more time on this as "there's a plan" | ||
// to use the anes crate parser. | ||
buffer: Vec::with_capacity(256), | ||
// TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can | ||
// fit? What is an average sequence length? Let's guess here | ||
// and say that the average ANSI escape sequence length is 8 bytes. Thus | ||
// the buffer size should be 1024/8=128 to avoid additional allocations | ||
// when processing large amounts of data. | ||
// | ||
// There's no need to make it bigger, because when you look at the `try_read` | ||
// method implementation, all events are consumed before the next TTY_BUFFER | ||
// is processed -> events pushed. | ||
internal_events: VecDeque::with_capacity(128), | ||
} | ||
} | ||
} | ||
|
||
impl Parser { | ||
fn advance(&mut self, buffer: &[u8], more: bool) { | ||
for (idx, byte) in buffer.iter().enumerate() { | ||
let more = idx + 1 < buffer.len() || more; | ||
|
||
self.buffer.push(*byte); | ||
|
||
match parse_event(&self.buffer, more) { | ||
Ok(Some(ie)) => { | ||
self.internal_events.push_back(ie); | ||
self.buffer.clear(); | ||
} | ||
Ok(None) => { | ||
// Event can't be parsed, because we don't have enough bytes for | ||
// the current sequence. Keep the buffer and process next bytes. | ||
} | ||
Err(_) => { | ||
// Event can't be parsed (not enough parameters, parameter is not a number, ...). | ||
// Clear the buffer and continue with another sequence. | ||
self.buffer.clear(); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl Iterator for Parser { | ||
type Item = InternalEvent; | ||
|
||
fn next(&mut self) -> Option<Self::Item> { | ||
self.internal_events.pop_front() | ||
} | ||
} | ||
#[cfg(not(feature = "use-dev-tty"))] | ||
pub(crate) use self::mio::UnixInternalEventSource; |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dont we want to make a freature flag: mio and refer to the mio dependency, and make this enabled by default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which would you like to be enabled by default?
I think default features are a bit of a pain to deal with since you can't disable them individually, so if we keep mio as the default by making it a default feature it would be troublesome for users to opt out, as well as possibly breaking. this seemed cleaner since it makes the new behavior opt in and the feature can be deprecated easily later. Also it's intended to include #407.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. So then we temp can not opt out from mio but can opt in for the new feature.