From e9fa5db5b622f91445f2163df0bc45339aed4cff Mon Sep 17 00:00:00 2001 From: Jonathan Goren Date: Tue, 6 Sep 2022 14:10:33 +0300 Subject: [PATCH 1/4] replace mio polling with filedescriptor since mio doesn't handle ptys correctly macOS, use filedescriptor's poll() which falls back to select(). --- Cargo.toml | 3 +- src/event/source/unix.rs | 251 ++++++++++++++------------ src/event/sys/unix/file_descriptor.rs | 11 +- src/event/sys/unix/waker.rs | 24 +-- 4 files changed, 161 insertions(+), 128 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 56558baef..cc3b12d9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,9 +56,8 @@ crossterm_winapi = "0.9" # [target.'cfg(unix)'.dependencies] libc = "0.2" -mio = { version = "0.8", features = ["os-poll"] } signal-hook = { version = "0.3.13" } -signal-hook-mio = { version = "0.2.3", features = ["support-v0_8"] } +filedescriptor = "0.8" # # Dev dependencies (examples, ...) diff --git a/src/event/source/unix.rs b/src/event/source/unix.rs index 8a44d5a5d..5f5a26cc2 100644 --- a/src/event/source/unix.rs +++ b/src/event/source/unix.rs @@ -1,27 +1,45 @@ -use std::{collections::VecDeque, io, time::Duration}; +use std::os::unix::prelude::AsRawFd; +use std::{collections::VecDeque, io, os::unix::net::UnixStream, time::Duration}; -use mio::{unix::SourceFd, Events, Interest, Poll, Token}; -use signal_hook_mio::v0_8::Signals; +use signal_hook::low_level::pipe; +use crate::event::timeout::PollTimeout; +use crate::event::Event; use crate::Result; +use filedescriptor::{poll, pollfd, POLLIN}; + #[cfg(feature = "event-stream")] use super::super::sys::Waker; -use super::super::{ - source::EventSource, - sys::unix::{ - file_descriptor::{tty_fd, FileDesc}, - parse::parse_event, + +use super::{ + super::{ + sys::unix::{ + file_descriptor::{tty_fd, FileDesc}, + parse::parse_event, + }, + InternalEvent, }, - timeout::PollTimeout, - Event, InternalEvent, + EventSource, }; -// Tokens to identify file descriptor -const TTY_TOKEN: Token = Token(0); -const SIGNAL_TOKEN: Token = Token(1); +/// Holds a prototypical Waker and a receiver we can wait on when doing select(). #[cfg(feature = "event-stream")] -const WAKE_TOKEN: Token = Token(2); +struct WakePipe { + receiver: UnixStream, + waker: Waker, +} + +#[cfg(feature = "event-stream")] +impl WakePipe { + fn new() -> Result { + let (receiver, sender) = nonblocking_unix_pair()?; + Ok(WakePipe { + receiver, + waker: Waker::new(sender), + }) + } +} // I (@zrzka) wasn't able to read more than 1_022 bytes when testing // reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes @@ -29,14 +47,19 @@ const WAKE_TOKEN: Token = Token(2); const TTY_BUFFER_SIZE: usize = 1_024; pub(crate) struct UnixInternalEventSource { - poll: Poll, - events: Events, parser: Parser, tty_buffer: [u8; TTY_BUFFER_SIZE], - tty_fd: FileDesc, - signals: Signals, + tty: FileDesc, + winch_signal_receiver: UnixStream, #[cfg(feature = "event-stream")] - waker: Waker, + wake_pipe: WakePipe, +} + +fn nonblocking_unix_pair() -> Result<(UnixStream, UnixStream)> { + let (receiver, sender) = UnixStream::pair()?; + receiver.set_nonblocking(true)?; + sender.set_nonblocking(true)?; + Ok((receiver, sender)) } impl UnixInternalEventSource { @@ -45,128 +68,128 @@ impl UnixInternalEventSource { } pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result { - let poll = Poll::new()?; - let registry = poll.registry(); - - let tty_raw_fd = input_fd.raw_fd(); - let mut tty_ev = SourceFd(&tty_raw_fd); - registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?; - - let mut signals = Signals::new(&[signal_hook::consts::SIGWINCH])?; - registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?; - - #[cfg(feature = "event-stream")] - let waker = Waker::new(registry, WAKE_TOKEN)?; - Ok(UnixInternalEventSource { - poll, - events: Events::with_capacity(3), parser: Parser::default(), tty_buffer: [0u8; TTY_BUFFER_SIZE], - tty_fd: input_fd, - signals, + tty: input_fd, + winch_signal_receiver: { + let (receiver, sender) = nonblocking_unix_pair()?; + // Unregistering is unnecessary because EventSource is a singleton + pipe::register(libc::SIGWINCH, sender)?; + receiver + }, #[cfg(feature = "event-stream")] - waker, + wake_pipe: WakePipe::new()?, }) } } +/// read_complete reads from a non-blocking file descriptor +/// until the buffer is full or it would block. +/// +/// Similar to `std::io::Read::read_to_end`, except this function +/// only fills the given buffer and does not read beyond that. +fn read_complete(fd: &FileDesc, buf: &mut [u8]) -> Result { + loop { + match fd.read(buf, buf.len()) { + Ok(x) => return Ok(x), + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock => return Ok(0), + io::ErrorKind::Interrupted => continue, + _ => return Err(e), + }, + } + } +} + impl EventSource for UnixInternalEventSource { fn try_read(&mut self, timeout: Option) -> Result> { - if let Some(event) = self.parser.next() { - return Ok(Some(event)); + let timeout = PollTimeout::new(timeout); + + fn make_pollfd(fd: &F) -> pollfd { + pollfd { + fd: fd.as_raw_fd(), + events: POLLIN, + revents: 0, + } } - let timeout = PollTimeout::new(timeout); + #[cfg(not(feature = "event-stream"))] + let mut fds = [ + make_pollfd(&self.tty), + make_pollfd(&self.winch_signal_receiver), + ]; - loop { - if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) { - // Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds. - // Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned). - // https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes - if e.kind() == io::ErrorKind::Interrupted { - continue; - } else { - return Err(e); - } - }; + #[cfg(feature = "event-stream")] + let mut fds = [ + make_pollfd(&self.tty), + make_pollfd(&self.winch_signal_receiver), + make_pollfd(&self.wake_pipe.receiver), + ]; - if self.events.is_empty() { - // No readiness events = timeout - return Ok(None); + while timeout.leftover().map_or(true, |t| !t.is_zero()) { + // check if there are buffered events from the last read + if let Some(event) = self.parser.next() { + return Ok(Some(event)); } - - for token in self.events.iter().map(|x| x.token()) { - match token { - TTY_TOKEN => { - loop { - match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) { - Ok(read_count) => { - if read_count > 0 { - self.parser.advance( - &self.tty_buffer[..read_count], - read_count == TTY_BUFFER_SIZE, - ); - } - } - Err(e) => { - // No more data to read at the moment. We will receive another event - if e.kind() == io::ErrorKind::WouldBlock { - break; - } - // once more data is available to read. - else if e.kind() == io::ErrorKind::Interrupted { - continue; - } - } - }; - - if let Some(event) = self.parser.next() { - return Ok(Some(event)); - } - } + match poll(&mut fds, timeout.leftover()) { + Err(filedescriptor::Error::Io(e)) => return Err(e), + res => res.expect("polling tty"), + }; + if fds[0].events & POLLIN != 0 { + loop { + let read_count = read_complete(&self.tty, &mut self.tty_buffer)?; + if read_count > 0 { + self.parser.advance( + &self.tty_buffer[..read_count], + read_count == TTY_BUFFER_SIZE, + ); } - SIGNAL_TOKEN => { - for signal in self.signals.pending() { - match signal { - signal_hook::consts::SIGWINCH => { - // TODO Should we remove tput? - // - // This can take a really long time, because terminal::size can - // launch new process (tput) and then it parses its output. It's - // not a really long time from the absolute time point of view, but - // it's a really long time from the mio, async-std/tokio executor, ... - // point of view. - let new_size = crate::terminal::size()?; - return Ok(Some(InternalEvent::Event(Event::Resize( - new_size.0, new_size.1, - )))); - } - _ => unreachable!("Synchronize signal registration & handling"), - }; - } + + if let Some(event) = self.parser.next() { + return Ok(Some(event)); } - #[cfg(feature = "event-stream")] - WAKE_TOKEN => { - return Err(std::io::Error::new( - std::io::ErrorKind::Interrupted, - "Poll operation was woken up by `Waker::wake`", - )); + + if read_count == 0 { + break; } - _ => unreachable!("Synchronize Evented handle registration & token handling"), } } + if fds[1].events & POLLIN != 0 { + let fd = FileDesc::new(self.winch_signal_receiver.as_raw_fd(), false); + // drain the pipe + while read_complete(&fd, &mut [0; 1024])? != 0 {} + // TODO Should we remove tput? + // + // This can take a really long time, because terminal::size can + // launch new process (tput) and then it parses its output. It's + // not a really long time from the absolute time point of view, but + // it's a really long time from the mio, async-std/tokio executor, ... + // point of view. + let new_size = crate::terminal::size()?; + return Ok(Some(InternalEvent::Event(Event::Resize( + new_size.0, new_size.1, + )))); + } + + #[cfg(feature = "event-stream")] + if fds[2].events & POLLIN != 0 { + let fd = FileDesc::new(self.wake_pipe.receiver.as_raw_fd(), false); + // drain the pipe + while read_complete(&fd, &mut [0; 1024])? != 0 {} - // Processing above can take some time, check if timeout expired - if timeout.elapsed() { - return Ok(None); + return Err(std::io::Error::new( + std::io::ErrorKind::Interrupted, + "Poll operation was woken up by `Waker::wake`", + )); } } + Ok(None) } #[cfg(feature = "event-stream")] fn waker(&self) -> Waker { - self.waker.clone() + self.wake_pipe.waker.clone() } } diff --git a/src/event/sys/unix/file_descriptor.rs b/src/event/sys/unix/file_descriptor.rs index 8b481766b..025845e86 100644 --- a/src/event/sys/unix/file_descriptor.rs +++ b/src/event/sys/unix/file_descriptor.rs @@ -1,6 +1,9 @@ use std::{ fs, io, - os::unix::io::{IntoRawFd, RawFd}, + os::unix::{ + io::{IntoRawFd, RawFd}, + prelude::AsRawFd, + }, }; use libc::size_t; @@ -63,6 +66,12 @@ impl Drop for FileDesc { } } +impl AsRawFd for FileDesc { + fn as_raw_fd(&self) -> RawFd { + self.raw_fd() + } +} + /// Creates a file descriptor pointing to the standard input or `/dev/tty`. pub fn tty_fd() -> Result { let (fd, close_on_drop) = if unsafe { libc::isatty(libc::STDIN_FILENO) == 1 } { diff --git a/src/event/sys/unix/waker.rs b/src/event/sys/unix/waker.rs index 4509e9ba3..47868f129 100644 --- a/src/event/sys/unix/waker.rs +++ b/src/event/sys/unix/waker.rs @@ -1,29 +1,31 @@ -use std::sync::{Arc, Mutex}; - -use mio::{Registry, Token}; +use std::{ + io::Write, + os::unix::net::UnixStream, + sync::{Arc, Mutex}, +}; use crate::Result; -/// Allows to wake up the `mio::Poll::poll()` method. -/// This type wraps `mio::Waker`, for more information see its documentation. +/// Allows to wake up the EventSource::try_read() method. #[derive(Clone, Debug)] pub(crate) struct Waker { - inner: Arc>, + inner: Arc>, } impl Waker { /// Create a new `Waker`. - pub(crate) fn new(registry: &Registry, waker_token: Token) -> Result { - Ok(Self { - inner: Arc::new(Mutex::new(mio::Waker::new(registry, waker_token)?)), - }) + pub(crate) fn new(writer: UnixStream) -> Self { + Self { + inner: Arc::new(Mutex::new(writer)), + } } /// Wake up the [`Poll`] associated with this `Waker`. /// /// Readiness is set to `Ready::readable()`. pub(crate) fn wake(&self) -> Result<()> { - self.inner.lock().unwrap().wake() + self.inner.lock().unwrap().write(&[0])?; + Ok(()) } /// Resets the state so the same waker can be reused. From a1373d7e8f9aa66e3105063476b0cc83609f1698 Mon Sep 17 00:00:00 2001 From: Jonathan Goren Date: Thu, 5 Jan 2023 13:39:00 +0200 Subject: [PATCH 2/4] fix incorrect check of poll output --- src/event/source/unix.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/event/source/unix.rs b/src/event/source/unix.rs index 5f5a26cc2..01f63f0c3 100644 --- a/src/event/source/unix.rs +++ b/src/event/source/unix.rs @@ -136,7 +136,7 @@ impl EventSource for UnixInternalEventSource { Err(filedescriptor::Error::Io(e)) => return Err(e), res => res.expect("polling tty"), }; - if fds[0].events & POLLIN != 0 { + if fds[0].revents & POLLIN != 0 { loop { let read_count = read_complete(&self.tty, &mut self.tty_buffer)?; if read_count > 0 { @@ -155,7 +155,7 @@ impl EventSource for UnixInternalEventSource { } } } - if fds[1].events & POLLIN != 0 { + if fds[1].revents & POLLIN != 0 { let fd = FileDesc::new(self.winch_signal_receiver.as_raw_fd(), false); // drain the pipe while read_complete(&fd, &mut [0; 1024])? != 0 {} @@ -173,7 +173,7 @@ impl EventSource for UnixInternalEventSource { } #[cfg(feature = "event-stream")] - if fds[2].events & POLLIN != 0 { + if fds[2].revents & POLLIN != 0 { let fd = FileDesc::new(self.wake_pipe.receiver.as_raw_fd(), false); // drain the pipe while read_complete(&fd, &mut [0; 1024])? != 0 {} From be8032343090498ecc9cbe544f44614d4a1eab87 Mon Sep 17 00:00:00 2001 From: Jonathan Goren Date: Mon, 9 Jan 2023 11:18:48 +0200 Subject: [PATCH 3/4] add use-dev-tty feature flag --- Cargo.toml | 5 +- src/event/source/unix.rs | 269 +------------------------------- src/event/source/unix/mio.rs | 241 ++++++++++++++++++++++++++++ src/event/source/unix/tty.rs | 264 +++++++++++++++++++++++++++++++ src/event/sys/unix/waker.rs | 43 +---- src/event/sys/unix/waker/mio.rs | 36 +++++ src/event/sys/unix/waker/tty.rs | 38 +++++ 7 files changed, 599 insertions(+), 297 deletions(-) create mode 100644 src/event/source/unix/mio.rs create mode 100644 src/event/source/unix/tty.rs create mode 100644 src/event/sys/unix/waker/mio.rs create mode 100644 src/event/sys/unix/waker/tty.rs diff --git a/Cargo.toml b/Cargo.toml index cc3b12d9b..7ec03d44f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ all-features = true default = ["bracketed-paste"] bracketed-paste = [] event-stream = ["futures-core"] +use-dev-tty = ["filedescriptor"] # # Shared dependencies @@ -57,7 +58,9 @@ crossterm_winapi = "0.9" [target.'cfg(unix)'.dependencies] libc = "0.2" signal-hook = { version = "0.3.13" } -filedescriptor = "0.8" +filedescriptor = { version = "0.8", optional = true } +mio = { version = "0.8", features = ["os-poll"] } +signal-hook-mio = { version = "0.2.3", features = ["support-v0_8"] } # # Dev dependencies (examples, ...) diff --git a/src/event/source/unix.rs b/src/event/source/unix.rs index 01f63f0c3..b866d7d93 100644 --- a/src/event/source/unix.rs +++ b/src/event/source/unix.rs @@ -1,264 +1,11 @@ -use std::os::unix::prelude::AsRawFd; -use std::{collections::VecDeque, io, os::unix::net::UnixStream, time::Duration}; +#[cfg(feature = "use-dev-tty")] +pub(crate) mod tty; -use signal_hook::low_level::pipe; +#[cfg(not(feature = "use-dev-tty"))] +pub(crate) mod mio; -use crate::event::timeout::PollTimeout; -use crate::event::Event; -use crate::Result; +#[cfg(feature = "use-dev-tty")] +pub(crate) use self::tty::UnixInternalEventSource; -use filedescriptor::{poll, pollfd, POLLIN}; - -#[cfg(feature = "event-stream")] -use super::super::sys::Waker; - -use super::{ - super::{ - sys::unix::{ - file_descriptor::{tty_fd, FileDesc}, - parse::parse_event, - }, - InternalEvent, - }, - EventSource, -}; - -/// Holds a prototypical Waker and a receiver we can wait on when doing select(). -#[cfg(feature = "event-stream")] -struct WakePipe { - receiver: UnixStream, - waker: Waker, -} - -#[cfg(feature = "event-stream")] -impl WakePipe { - fn new() -> Result { - let (receiver, sender) = nonblocking_unix_pair()?; - Ok(WakePipe { - receiver, - waker: Waker::new(sender), - }) - } -} - -// I (@zrzka) wasn't able to read more than 1_022 bytes when testing -// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes -// is enough. -const TTY_BUFFER_SIZE: usize = 1_024; - -pub(crate) struct UnixInternalEventSource { - parser: Parser, - tty_buffer: [u8; TTY_BUFFER_SIZE], - tty: FileDesc, - winch_signal_receiver: UnixStream, - #[cfg(feature = "event-stream")] - wake_pipe: WakePipe, -} - -fn nonblocking_unix_pair() -> Result<(UnixStream, UnixStream)> { - let (receiver, sender) = UnixStream::pair()?; - receiver.set_nonblocking(true)?; - sender.set_nonblocking(true)?; - Ok((receiver, sender)) -} - -impl UnixInternalEventSource { - pub fn new() -> Result { - UnixInternalEventSource::from_file_descriptor(tty_fd()?) - } - - pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result { - Ok(UnixInternalEventSource { - parser: Parser::default(), - tty_buffer: [0u8; TTY_BUFFER_SIZE], - tty: input_fd, - winch_signal_receiver: { - let (receiver, sender) = nonblocking_unix_pair()?; - // Unregistering is unnecessary because EventSource is a singleton - pipe::register(libc::SIGWINCH, sender)?; - receiver - }, - #[cfg(feature = "event-stream")] - wake_pipe: WakePipe::new()?, - }) - } -} - -/// read_complete reads from a non-blocking file descriptor -/// until the buffer is full or it would block. -/// -/// Similar to `std::io::Read::read_to_end`, except this function -/// only fills the given buffer and does not read beyond that. -fn read_complete(fd: &FileDesc, buf: &mut [u8]) -> Result { - loop { - match fd.read(buf, buf.len()) { - Ok(x) => return Ok(x), - Err(e) => match e.kind() { - io::ErrorKind::WouldBlock => return Ok(0), - io::ErrorKind::Interrupted => continue, - _ => return Err(e), - }, - } - } -} - -impl EventSource for UnixInternalEventSource { - fn try_read(&mut self, timeout: Option) -> Result> { - let timeout = PollTimeout::new(timeout); - - fn make_pollfd(fd: &F) -> pollfd { - pollfd { - fd: fd.as_raw_fd(), - events: POLLIN, - revents: 0, - } - } - - #[cfg(not(feature = "event-stream"))] - let mut fds = [ - make_pollfd(&self.tty), - make_pollfd(&self.winch_signal_receiver), - ]; - - #[cfg(feature = "event-stream")] - let mut fds = [ - make_pollfd(&self.tty), - make_pollfd(&self.winch_signal_receiver), - make_pollfd(&self.wake_pipe.receiver), - ]; - - while timeout.leftover().map_or(true, |t| !t.is_zero()) { - // check if there are buffered events from the last read - if let Some(event) = self.parser.next() { - return Ok(Some(event)); - } - match poll(&mut fds, timeout.leftover()) { - Err(filedescriptor::Error::Io(e)) => return Err(e), - res => res.expect("polling tty"), - }; - if fds[0].revents & POLLIN != 0 { - loop { - let read_count = read_complete(&self.tty, &mut self.tty_buffer)?; - if read_count > 0 { - self.parser.advance( - &self.tty_buffer[..read_count], - read_count == TTY_BUFFER_SIZE, - ); - } - - if let Some(event) = self.parser.next() { - return Ok(Some(event)); - } - - if read_count == 0 { - break; - } - } - } - if fds[1].revents & POLLIN != 0 { - let fd = FileDesc::new(self.winch_signal_receiver.as_raw_fd(), false); - // drain the pipe - while read_complete(&fd, &mut [0; 1024])? != 0 {} - // TODO Should we remove tput? - // - // This can take a really long time, because terminal::size can - // launch new process (tput) and then it parses its output. It's - // not a really long time from the absolute time point of view, but - // it's a really long time from the mio, async-std/tokio executor, ... - // point of view. - let new_size = crate::terminal::size()?; - return Ok(Some(InternalEvent::Event(Event::Resize( - new_size.0, new_size.1, - )))); - } - - #[cfg(feature = "event-stream")] - if fds[2].revents & POLLIN != 0 { - let fd = FileDesc::new(self.wake_pipe.receiver.as_raw_fd(), false); - // drain the pipe - while read_complete(&fd, &mut [0; 1024])? != 0 {} - - return Err(std::io::Error::new( - std::io::ErrorKind::Interrupted, - "Poll operation was woken up by `Waker::wake`", - )); - } - } - Ok(None) - } - - #[cfg(feature = "event-stream")] - fn waker(&self) -> Waker { - self.wake_pipe.waker.clone() - } -} - -// -// Following `Parser` structure exists for two reasons: -// -// * mimic anes Parser interface -// * move the advancing, parsing, ... stuff out of the `try_read` method -// -#[derive(Debug)] -struct Parser { - buffer: Vec, - internal_events: VecDeque, -} - -impl Default for Parser { - fn default() -> Self { - Parser { - // This buffer is used for -> 1 <- ANSI escape sequence. Are we - // aware of any ANSI escape sequence that is bigger? Can we make - // it smaller? - // - // Probably not worth spending more time on this as "there's a plan" - // to use the anes crate parser. - buffer: Vec::with_capacity(256), - // TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can - // fit? What is an average sequence length? Let's guess here - // and say that the average ANSI escape sequence length is 8 bytes. Thus - // the buffer size should be 1024/8=128 to avoid additional allocations - // when processing large amounts of data. - // - // There's no need to make it bigger, because when you look at the `try_read` - // method implementation, all events are consumed before the next TTY_BUFFER - // is processed -> events pushed. - internal_events: VecDeque::with_capacity(128), - } - } -} - -impl Parser { - fn advance(&mut self, buffer: &[u8], more: bool) { - for (idx, byte) in buffer.iter().enumerate() { - let more = idx + 1 < buffer.len() || more; - - self.buffer.push(*byte); - - match parse_event(&self.buffer, more) { - Ok(Some(ie)) => { - self.internal_events.push_back(ie); - self.buffer.clear(); - } - Ok(None) => { - // Event can't be parsed, because we don't have enough bytes for - // the current sequence. Keep the buffer and process next bytes. - } - Err(_) => { - // Event can't be parsed (not enough parameters, parameter is not a number, ...). - // Clear the buffer and continue with another sequence. - self.buffer.clear(); - } - } - } - } -} - -impl Iterator for Parser { - type Item = InternalEvent; - - fn next(&mut self) -> Option { - self.internal_events.pop_front() - } -} +#[cfg(not(feature = "use-dev-tty"))] +pub(crate) use self::mio::UnixInternalEventSource; \ No newline at end of file diff --git a/src/event/source/unix/mio.rs b/src/event/source/unix/mio.rs new file mode 100644 index 000000000..622e9f35a --- /dev/null +++ b/src/event/source/unix/mio.rs @@ -0,0 +1,241 @@ +use std::{collections::VecDeque, io, time::Duration}; + +use mio::{unix::SourceFd, Events, Interest, Poll, Token}; +use signal_hook_mio::v0_8::Signals; + +use crate::Result; + +#[cfg(feature = "event-stream")] +use crate::event::sys::Waker; +use crate::event::{ + source::EventSource, + sys::unix::{ + file_descriptor::{tty_fd, FileDesc}, + parse::parse_event, + }, + timeout::PollTimeout, + Event, InternalEvent, +}; + +// Tokens to identify file descriptor +const TTY_TOKEN: Token = Token(0); +const SIGNAL_TOKEN: Token = Token(1); +#[cfg(feature = "event-stream")] +const WAKE_TOKEN: Token = Token(2); + +// I (@zrzka) wasn't able to read more than 1_022 bytes when testing +// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes +// is enough. +const TTY_BUFFER_SIZE: usize = 1_024; + +pub(crate) struct UnixInternalEventSource { + poll: Poll, + events: Events, + parser: Parser, + tty_buffer: [u8; TTY_BUFFER_SIZE], + tty_fd: FileDesc, + signals: Signals, + #[cfg(feature = "event-stream")] + waker: Waker, +} + +impl UnixInternalEventSource { + pub fn new() -> Result { + UnixInternalEventSource::from_file_descriptor(tty_fd()?) + } + + pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result { + let poll = Poll::new()?; + let registry = poll.registry(); + + let tty_raw_fd = input_fd.raw_fd(); + let mut tty_ev = SourceFd(&tty_raw_fd); + registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?; + + let mut signals = Signals::new(&[signal_hook::consts::SIGWINCH])?; + registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?; + + #[cfg(feature = "event-stream")] + let waker = Waker::new(registry, WAKE_TOKEN)?; + + Ok(UnixInternalEventSource { + poll, + events: Events::with_capacity(3), + parser: Parser::default(), + tty_buffer: [0u8; TTY_BUFFER_SIZE], + tty_fd: input_fd, + signals, + #[cfg(feature = "event-stream")] + waker, + }) + } +} + +impl EventSource for UnixInternalEventSource { + fn try_read(&mut self, timeout: Option) -> Result> { + if let Some(event) = self.parser.next() { + return Ok(Some(event)); + } + + let timeout = PollTimeout::new(timeout); + + loop { + if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) { + // Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds. + // Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned). + // https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes + if e.kind() == io::ErrorKind::Interrupted { + continue; + } else { + return Err(e); + } + }; + + if self.events.is_empty() { + // No readiness events = timeout + return Ok(None); + } + + for token in self.events.iter().map(|x| x.token()) { + match token { + TTY_TOKEN => { + loop { + match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) { + Ok(read_count) => { + if read_count > 0 { + self.parser.advance( + &self.tty_buffer[..read_count], + read_count == TTY_BUFFER_SIZE, + ); + } + } + Err(e) => { + // No more data to read at the moment. We will receive another event + if e.kind() == io::ErrorKind::WouldBlock { + break; + } + // once more data is available to read. + else if e.kind() == io::ErrorKind::Interrupted { + continue; + } + } + }; + + if let Some(event) = self.parser.next() { + return Ok(Some(event)); + } + } + } + SIGNAL_TOKEN => { + for signal in self.signals.pending() { + match signal { + signal_hook::consts::SIGWINCH => { + // TODO Should we remove tput? + // + // This can take a really long time, because terminal::size can + // launch new process (tput) and then it parses its output. It's + // not a really long time from the absolute time point of view, but + // it's a really long time from the mio, async-std/tokio executor, ... + // point of view. + let new_size = crate::terminal::size()?; + return Ok(Some(InternalEvent::Event(Event::Resize( + new_size.0, new_size.1, + )))); + } + _ => unreachable!("Synchronize signal registration & handling"), + }; + } + } + #[cfg(feature = "event-stream")] + WAKE_TOKEN => { + return Err(std::io::Error::new( + std::io::ErrorKind::Interrupted, + "Poll operation was woken up by `Waker::wake`", + )); + } + _ => unreachable!("Synchronize Evented handle registration & token handling"), + } + } + + // Processing above can take some time, check if timeout expired + if timeout.elapsed() { + return Ok(None); + } + } + } + + #[cfg(feature = "event-stream")] + fn waker(&self) -> Waker { + self.waker.clone() + } +} + +// +// Following `Parser` structure exists for two reasons: +// +// * mimic anes Parser interface +// * move the advancing, parsing, ... stuff out of the `try_read` method +// +#[derive(Debug)] +struct Parser { + buffer: Vec, + internal_events: VecDeque, +} + +impl Default for Parser { + fn default() -> Self { + Parser { + // This buffer is used for -> 1 <- ANSI escape sequence. Are we + // aware of any ANSI escape sequence that is bigger? Can we make + // it smaller? + // + // Probably not worth spending more time on this as "there's a plan" + // to use the anes crate parser. + buffer: Vec::with_capacity(256), + // TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can + // fit? What is an average sequence length? Let's guess here + // and say that the average ANSI escape sequence length is 8 bytes. Thus + // the buffer size should be 1024/8=128 to avoid additional allocations + // when processing large amounts of data. + // + // There's no need to make it bigger, because when you look at the `try_read` + // method implementation, all events are consumed before the next TTY_BUFFER + // is processed -> events pushed. + internal_events: VecDeque::with_capacity(128), + } + } +} + +impl Parser { + fn advance(&mut self, buffer: &[u8], more: bool) { + for (idx, byte) in buffer.iter().enumerate() { + let more = idx + 1 < buffer.len() || more; + + self.buffer.push(*byte); + + match parse_event(&self.buffer, more) { + Ok(Some(ie)) => { + self.internal_events.push_back(ie); + self.buffer.clear(); + } + Ok(None) => { + // Event can't be parsed, because we don't have enough bytes for + // the current sequence. Keep the buffer and process next bytes. + } + Err(_) => { + // Event can't be parsed (not enough parameters, parameter is not a number, ...). + // Clear the buffer and continue with another sequence. + self.buffer.clear(); + } + } + } + } +} + +impl Iterator for Parser { + type Item = InternalEvent; + + fn next(&mut self) -> Option { + self.internal_events.pop_front() + } +} diff --git a/src/event/source/unix/tty.rs b/src/event/source/unix/tty.rs new file mode 100644 index 000000000..a4ab1f24f --- /dev/null +++ b/src/event/source/unix/tty.rs @@ -0,0 +1,264 @@ +use std::os::unix::prelude::AsRawFd; +use std::{collections::VecDeque, io, os::unix::net::UnixStream, time::Duration}; + +use signal_hook::low_level::pipe; + +use crate::event::timeout::PollTimeout; +use crate::event::Event; +use crate::Result; + +use filedescriptor::{poll, pollfd, POLLIN}; + +#[cfg(feature = "event-stream")] +use crate::event::sys::Waker; + +use crate::{ + event::{ + sys::unix::{ + file_descriptor::{tty_fd, FileDesc}, + parse::parse_event, + }, + InternalEvent, + source::EventSource, + }, +}; + +/// Holds a prototypical Waker and a receiver we can wait on when doing select(). +#[cfg(feature = "event-stream")] +struct WakePipe { + receiver: UnixStream, + waker: Waker, +} + +#[cfg(feature = "event-stream")] +impl WakePipe { + fn new() -> Result { + let (receiver, sender) = nonblocking_unix_pair()?; + Ok(WakePipe { + receiver, + waker: Waker::new(sender), + }) + } +} + +// I (@zrzka) wasn't able to read more than 1_022 bytes when testing +// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes +// is enough. +const TTY_BUFFER_SIZE: usize = 1_024; + +pub(crate) struct UnixInternalEventSource { + parser: Parser, + tty_buffer: [u8; TTY_BUFFER_SIZE], + tty: FileDesc, + winch_signal_receiver: UnixStream, + #[cfg(feature = "event-stream")] + wake_pipe: WakePipe, +} + +fn nonblocking_unix_pair() -> Result<(UnixStream, UnixStream)> { + let (receiver, sender) = UnixStream::pair()?; + receiver.set_nonblocking(true)?; + sender.set_nonblocking(true)?; + Ok((receiver, sender)) +} + +impl UnixInternalEventSource { + pub fn new() -> Result { + UnixInternalEventSource::from_file_descriptor(tty_fd()?) + } + + pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result { + Ok(UnixInternalEventSource { + parser: Parser::default(), + tty_buffer: [0u8; TTY_BUFFER_SIZE], + tty: input_fd, + winch_signal_receiver: { + let (receiver, sender) = nonblocking_unix_pair()?; + // Unregistering is unnecessary because EventSource is a singleton + pipe::register(libc::SIGWINCH, sender)?; + receiver + }, + #[cfg(feature = "event-stream")] + wake_pipe: WakePipe::new()?, + }) + } +} + +/// read_complete reads from a non-blocking file descriptor +/// until the buffer is full or it would block. +/// +/// Similar to `std::io::Read::read_to_end`, except this function +/// only fills the given buffer and does not read beyond that. +fn read_complete(fd: &FileDesc, buf: &mut [u8]) -> Result { + loop { + match fd.read(buf, buf.len()) { + Ok(x) => return Ok(x), + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock => return Ok(0), + io::ErrorKind::Interrupted => continue, + _ => return Err(e), + }, + } + } +} + +impl EventSource for UnixInternalEventSource { + fn try_read(&mut self, timeout: Option) -> Result> { + let timeout = PollTimeout::new(timeout); + + fn make_pollfd(fd: &F) -> pollfd { + pollfd { + fd: fd.as_raw_fd(), + events: POLLIN, + revents: 0, + } + } + + #[cfg(not(feature = "event-stream"))] + let mut fds = [ + make_pollfd(&self.tty), + make_pollfd(&self.winch_signal_receiver), + ]; + + #[cfg(feature = "event-stream")] + let mut fds = [ + make_pollfd(&self.tty), + make_pollfd(&self.winch_signal_receiver), + make_pollfd(&self.wake_pipe.receiver), + ]; + + while timeout.leftover().map_or(true, |t| !t.is_zero()) { + // check if there are buffered events from the last read + if let Some(event) = self.parser.next() { + return Ok(Some(event)); + } + match poll(&mut fds, timeout.leftover()) { + Err(filedescriptor::Error::Io(e)) => return Err(e), + res => res.expect("polling tty"), + }; + if fds[0].revents & POLLIN != 0 { + loop { + let read_count = read_complete(&self.tty, &mut self.tty_buffer)?; + if read_count > 0 { + self.parser.advance( + &self.tty_buffer[..read_count], + read_count == TTY_BUFFER_SIZE, + ); + } + + if let Some(event) = self.parser.next() { + return Ok(Some(event)); + } + + if read_count == 0 { + break; + } + } + } + if fds[1].revents & POLLIN != 0 { + let fd = FileDesc::new(self.winch_signal_receiver.as_raw_fd(), false); + // drain the pipe + while read_complete(&fd, &mut [0; 1024])? != 0 {} + // TODO Should we remove tput? + // + // This can take a really long time, because terminal::size can + // launch new process (tput) and then it parses its output. It's + // not a really long time from the absolute time point of view, but + // it's a really long time from the mio, async-std/tokio executor, ... + // point of view. + let new_size = crate::terminal::size()?; + return Ok(Some(InternalEvent::Event(Event::Resize( + new_size.0, new_size.1, + )))); + } + + #[cfg(feature = "event-stream")] + if fds[2].revents & POLLIN != 0 { + let fd = FileDesc::new(self.wake_pipe.receiver.as_raw_fd(), false); + // drain the pipe + while read_complete(&fd, &mut [0; 1024])? != 0 {} + + return Err(std::io::Error::new( + std::io::ErrorKind::Interrupted, + "Poll operation was woken up by `Waker::wake`", + )); + } + } + Ok(None) + } + + #[cfg(feature = "event-stream")] + fn waker(&self) -> Waker { + self.wake_pipe.waker.clone() + } +} + +// +// Following `Parser` structure exists for two reasons: +// +// * mimic anes Parser interface +// * move the advancing, parsing, ... stuff out of the `try_read` method +// +#[derive(Debug)] +struct Parser { + buffer: Vec, + internal_events: VecDeque, +} + +impl Default for Parser { + fn default() -> Self { + Parser { + // This buffer is used for -> 1 <- ANSI escape sequence. Are we + // aware of any ANSI escape sequence that is bigger? Can we make + // it smaller? + // + // Probably not worth spending more time on this as "there's a plan" + // to use the anes crate parser. + buffer: Vec::with_capacity(256), + // TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can + // fit? What is an average sequence length? Let's guess here + // and say that the average ANSI escape sequence length is 8 bytes. Thus + // the buffer size should be 1024/8=128 to avoid additional allocations + // when processing large amounts of data. + // + // There's no need to make it bigger, because when you look at the `try_read` + // method implementation, all events are consumed before the next TTY_BUFFER + // is processed -> events pushed. + internal_events: VecDeque::with_capacity(128), + } + } +} + +impl Parser { + fn advance(&mut self, buffer: &[u8], more: bool) { + for (idx, byte) in buffer.iter().enumerate() { + let more = idx + 1 < buffer.len() || more; + + self.buffer.push(*byte); + + match parse_event(&self.buffer, more) { + Ok(Some(ie)) => { + self.internal_events.push_back(ie); + self.buffer.clear(); + } + Ok(None) => { + // Event can't be parsed, because we don't have enough bytes for + // the current sequence. Keep the buffer and process next bytes. + } + Err(_) => { + // Event can't be parsed (not enough parameters, parameter is not a number, ...). + // Clear the buffer and continue with another sequence. + self.buffer.clear(); + } + } + } + } +} + +impl Iterator for Parser { + type Item = InternalEvent; + + fn next(&mut self) -> Option { + self.internal_events.pop_front() + } +} diff --git a/src/event/sys/unix/waker.rs b/src/event/sys/unix/waker.rs index 47868f129..8ac63676c 100644 --- a/src/event/sys/unix/waker.rs +++ b/src/event/sys/unix/waker.rs @@ -1,38 +1,11 @@ -use std::{ - io::Write, - os::unix::net::UnixStream, - sync::{Arc, Mutex}, -}; +#[cfg(feature = "use-dev-tty")] +pub(crate) mod tty; -use crate::Result; +#[cfg(not(feature = "use-dev-tty"))] +pub(crate) mod mio; -/// Allows to wake up the EventSource::try_read() method. -#[derive(Clone, Debug)] -pub(crate) struct Waker { - inner: Arc>, -} +#[cfg(feature = "use-dev-tty")] +pub(crate) use self::tty::Waker; -impl Waker { - /// Create a new `Waker`. - pub(crate) fn new(writer: UnixStream) -> Self { - Self { - inner: Arc::new(Mutex::new(writer)), - } - } - - /// Wake up the [`Poll`] associated with this `Waker`. - /// - /// Readiness is set to `Ready::readable()`. - pub(crate) fn wake(&self) -> Result<()> { - self.inner.lock().unwrap().write(&[0])?; - Ok(()) - } - - /// Resets the state so the same waker can be reused. - /// - /// This function is not impl - #[allow(dead_code, clippy::clippy::unnecessary_wraps)] - pub(crate) fn reset(&self) -> Result<()> { - Ok(()) - } -} +#[cfg(not(feature = "use-dev-tty"))] +pub(crate) use self::mio::Waker; \ No newline at end of file diff --git a/src/event/sys/unix/waker/mio.rs b/src/event/sys/unix/waker/mio.rs new file mode 100644 index 000000000..891f006ac --- /dev/null +++ b/src/event/sys/unix/waker/mio.rs @@ -0,0 +1,36 @@ +use std::sync::{Arc, Mutex}; + +use mio::{Registry, Token}; + +use crate::Result; + +/// Allows to wake up the `mio::Poll::poll()` method. +/// This type wraps `mio::Waker`, for more information see its documentation. +#[derive(Clone, Debug)] +pub(crate) struct Waker { + inner: Arc>, +} + +impl Waker { + /// Create a new `Waker`. + pub(crate) fn new(registry: &Registry, waker_token: Token) -> Result { + Ok(Self { + inner: Arc::new(Mutex::new(mio::Waker::new(registry, waker_token)?)), + }) + } + + /// Wake up the [`Poll`] associated with this `Waker`. + /// + /// Readiness is set to `Ready::readable()`. + pub(crate) fn wake(&self) -> Result<()> { + self.inner.lock().unwrap().wake() + } + + /// Resets the state so the same waker can be reused. + /// + /// This function is not impl + #[allow(dead_code, clippy::clippy::unnecessary_wraps)] + pub(crate) fn reset(&self) -> Result<()> { + Ok(()) + } +} \ No newline at end of file diff --git a/src/event/sys/unix/waker/tty.rs b/src/event/sys/unix/waker/tty.rs new file mode 100644 index 000000000..47868f129 --- /dev/null +++ b/src/event/sys/unix/waker/tty.rs @@ -0,0 +1,38 @@ +use std::{ + io::Write, + os::unix::net::UnixStream, + sync::{Arc, Mutex}, +}; + +use crate::Result; + +/// Allows to wake up the EventSource::try_read() method. +#[derive(Clone, Debug)] +pub(crate) struct Waker { + inner: Arc>, +} + +impl Waker { + /// Create a new `Waker`. + pub(crate) fn new(writer: UnixStream) -> Self { + Self { + inner: Arc::new(Mutex::new(writer)), + } + } + + /// Wake up the [`Poll`] associated with this `Waker`. + /// + /// Readiness is set to `Ready::readable()`. + pub(crate) fn wake(&self) -> Result<()> { + self.inner.lock().unwrap().write(&[0])?; + Ok(()) + } + + /// Resets the state so the same waker can be reused. + /// + /// This function is not impl + #[allow(dead_code, clippy::clippy::unnecessary_wraps)] + pub(crate) fn reset(&self) -> Result<()> { + Ok(()) + } +} From 6b390dce280572f9fc778f00831d9be74db310cc Mon Sep 17 00:00:00 2001 From: Jonathan Goren Date: Tue, 10 Jan 2023 11:23:59 +0200 Subject: [PATCH 4/4] fix clippy warnings --- src/event/source/unix/mio.rs | 2 +- src/event/sys/unix/file_descriptor.rs | 2 +- src/event/sys/unix/parse.rs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/event/source/unix/mio.rs b/src/event/source/unix/mio.rs index 622e9f35a..67bda3735 100644 --- a/src/event/source/unix/mio.rs +++ b/src/event/source/unix/mio.rs @@ -52,7 +52,7 @@ impl UnixInternalEventSource { let mut tty_ev = SourceFd(&tty_raw_fd); registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?; - let mut signals = Signals::new(&[signal_hook::consts::SIGWINCH])?; + let mut signals = Signals::new([signal_hook::consts::SIGWINCH])?; registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?; #[cfg(feature = "event-stream")] diff --git a/src/event/sys/unix/file_descriptor.rs b/src/event/sys/unix/file_descriptor.rs index 025845e86..268d57ebf 100644 --- a/src/event/sys/unix/file_descriptor.rs +++ b/src/event/sys/unix/file_descriptor.rs @@ -37,7 +37,7 @@ impl FileDesc { self.fd, buffer.as_mut_ptr() as *mut libc::c_void, size as size_t, - ) as isize + ) }; if result < 0 { diff --git a/src/event/sys/unix/parse.rs b/src/event/sys/unix/parse.rs index 0cfbd1a5e..5d0aa1716 100644 --- a/src/event/sys/unix/parse.rs +++ b/src/event/sys/unix/parse.rs @@ -104,11 +104,11 @@ pub(crate) fn parse_event(buffer: &[u8], input_available: bool) -> Result