diff --git a/Cargo.toml b/Cargo.toml index 56558baef..7ec03d44f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ all-features = true default = ["bracketed-paste"] bracketed-paste = [] event-stream = ["futures-core"] +use-dev-tty = ["filedescriptor"] # # Shared dependencies @@ -56,8 +57,9 @@ crossterm_winapi = "0.9" # [target.'cfg(unix)'.dependencies] libc = "0.2" -mio = { version = "0.8", features = ["os-poll"] } signal-hook = { version = "0.3.13" } +filedescriptor = { version = "0.8", optional = true } +mio = { version = "0.8", features = ["os-poll"] } signal-hook-mio = { version = "0.2.3", features = ["support-v0_8"] } # diff --git a/src/event/source/unix.rs b/src/event/source/unix.rs index 8a44d5a5d..b866d7d93 100644 --- a/src/event/source/unix.rs +++ b/src/event/source/unix.rs @@ -1,241 +1,11 @@ -use std::{collections::VecDeque, io, time::Duration}; +#[cfg(feature = "use-dev-tty")] +pub(crate) mod tty; -use mio::{unix::SourceFd, Events, Interest, Poll, Token}; -use signal_hook_mio::v0_8::Signals; +#[cfg(not(feature = "use-dev-tty"))] +pub(crate) mod mio; -use crate::Result; +#[cfg(feature = "use-dev-tty")] +pub(crate) use self::tty::UnixInternalEventSource; -#[cfg(feature = "event-stream")] -use super::super::sys::Waker; -use super::super::{ - source::EventSource, - sys::unix::{ - file_descriptor::{tty_fd, FileDesc}, - parse::parse_event, - }, - timeout::PollTimeout, - Event, InternalEvent, -}; - -// Tokens to identify file descriptor -const TTY_TOKEN: Token = Token(0); -const SIGNAL_TOKEN: Token = Token(1); -#[cfg(feature = "event-stream")] -const WAKE_TOKEN: Token = Token(2); - -// I (@zrzka) wasn't able to read more than 1_022 bytes when testing -// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes -// is enough. -const TTY_BUFFER_SIZE: usize = 1_024; - -pub(crate) struct UnixInternalEventSource { - poll: Poll, - events: Events, - parser: Parser, - tty_buffer: [u8; TTY_BUFFER_SIZE], - tty_fd: FileDesc, - signals: Signals, - #[cfg(feature = "event-stream")] - waker: Waker, -} - -impl UnixInternalEventSource { - pub fn new() -> Result { - UnixInternalEventSource::from_file_descriptor(tty_fd()?) - } - - pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result { - let poll = Poll::new()?; - let registry = poll.registry(); - - let tty_raw_fd = input_fd.raw_fd(); - let mut tty_ev = SourceFd(&tty_raw_fd); - registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?; - - let mut signals = Signals::new(&[signal_hook::consts::SIGWINCH])?; - registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?; - - #[cfg(feature = "event-stream")] - let waker = Waker::new(registry, WAKE_TOKEN)?; - - Ok(UnixInternalEventSource { - poll, - events: Events::with_capacity(3), - parser: Parser::default(), - tty_buffer: [0u8; TTY_BUFFER_SIZE], - tty_fd: input_fd, - signals, - #[cfg(feature = "event-stream")] - waker, - }) - } -} - -impl EventSource for UnixInternalEventSource { - fn try_read(&mut self, timeout: Option) -> Result> { - if let Some(event) = self.parser.next() { - return Ok(Some(event)); - } - - let timeout = PollTimeout::new(timeout); - - loop { - if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) { - // Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds. - // Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned). - // https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes - if e.kind() == io::ErrorKind::Interrupted { - continue; - } else { - return Err(e); - } - }; - - if self.events.is_empty() { - // No readiness events = timeout - return Ok(None); - } - - for token in self.events.iter().map(|x| x.token()) { - match token { - TTY_TOKEN => { - loop { - match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) { - Ok(read_count) => { - if read_count > 0 { - self.parser.advance( - &self.tty_buffer[..read_count], - read_count == TTY_BUFFER_SIZE, - ); - } - } - Err(e) => { - // No more data to read at the moment. We will receive another event - if e.kind() == io::ErrorKind::WouldBlock { - break; - } - // once more data is available to read. - else if e.kind() == io::ErrorKind::Interrupted { - continue; - } - } - }; - - if let Some(event) = self.parser.next() { - return Ok(Some(event)); - } - } - } - SIGNAL_TOKEN => { - for signal in self.signals.pending() { - match signal { - signal_hook::consts::SIGWINCH => { - // TODO Should we remove tput? - // - // This can take a really long time, because terminal::size can - // launch new process (tput) and then it parses its output. It's - // not a really long time from the absolute time point of view, but - // it's a really long time from the mio, async-std/tokio executor, ... - // point of view. - let new_size = crate::terminal::size()?; - return Ok(Some(InternalEvent::Event(Event::Resize( - new_size.0, new_size.1, - )))); - } - _ => unreachable!("Synchronize signal registration & handling"), - }; - } - } - #[cfg(feature = "event-stream")] - WAKE_TOKEN => { - return Err(std::io::Error::new( - std::io::ErrorKind::Interrupted, - "Poll operation was woken up by `Waker::wake`", - )); - } - _ => unreachable!("Synchronize Evented handle registration & token handling"), - } - } - - // Processing above can take some time, check if timeout expired - if timeout.elapsed() { - return Ok(None); - } - } - } - - #[cfg(feature = "event-stream")] - fn waker(&self) -> Waker { - self.waker.clone() - } -} - -// -// Following `Parser` structure exists for two reasons: -// -// * mimic anes Parser interface -// * move the advancing, parsing, ... stuff out of the `try_read` method -// -#[derive(Debug)] -struct Parser { - buffer: Vec, - internal_events: VecDeque, -} - -impl Default for Parser { - fn default() -> Self { - Parser { - // This buffer is used for -> 1 <- ANSI escape sequence. Are we - // aware of any ANSI escape sequence that is bigger? Can we make - // it smaller? - // - // Probably not worth spending more time on this as "there's a plan" - // to use the anes crate parser. - buffer: Vec::with_capacity(256), - // TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can - // fit? What is an average sequence length? Let's guess here - // and say that the average ANSI escape sequence length is 8 bytes. Thus - // the buffer size should be 1024/8=128 to avoid additional allocations - // when processing large amounts of data. - // - // There's no need to make it bigger, because when you look at the `try_read` - // method implementation, all events are consumed before the next TTY_BUFFER - // is processed -> events pushed. - internal_events: VecDeque::with_capacity(128), - } - } -} - -impl Parser { - fn advance(&mut self, buffer: &[u8], more: bool) { - for (idx, byte) in buffer.iter().enumerate() { - let more = idx + 1 < buffer.len() || more; - - self.buffer.push(*byte); - - match parse_event(&self.buffer, more) { - Ok(Some(ie)) => { - self.internal_events.push_back(ie); - self.buffer.clear(); - } - Ok(None) => { - // Event can't be parsed, because we don't have enough bytes for - // the current sequence. Keep the buffer and process next bytes. - } - Err(_) => { - // Event can't be parsed (not enough parameters, parameter is not a number, ...). - // Clear the buffer and continue with another sequence. - self.buffer.clear(); - } - } - } - } -} - -impl Iterator for Parser { - type Item = InternalEvent; - - fn next(&mut self) -> Option { - self.internal_events.pop_front() - } -} +#[cfg(not(feature = "use-dev-tty"))] +pub(crate) use self::mio::UnixInternalEventSource; \ No newline at end of file diff --git a/src/event/source/unix/mio.rs b/src/event/source/unix/mio.rs new file mode 100644 index 000000000..67bda3735 --- /dev/null +++ b/src/event/source/unix/mio.rs @@ -0,0 +1,241 @@ +use std::{collections::VecDeque, io, time::Duration}; + +use mio::{unix::SourceFd, Events, Interest, Poll, Token}; +use signal_hook_mio::v0_8::Signals; + +use crate::Result; + +#[cfg(feature = "event-stream")] +use crate::event::sys::Waker; +use crate::event::{ + source::EventSource, + sys::unix::{ + file_descriptor::{tty_fd, FileDesc}, + parse::parse_event, + }, + timeout::PollTimeout, + Event, InternalEvent, +}; + +// Tokens to identify file descriptor +const TTY_TOKEN: Token = Token(0); +const SIGNAL_TOKEN: Token = Token(1); +#[cfg(feature = "event-stream")] +const WAKE_TOKEN: Token = Token(2); + +// I (@zrzka) wasn't able to read more than 1_022 bytes when testing +// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes +// is enough. +const TTY_BUFFER_SIZE: usize = 1_024; + +pub(crate) struct UnixInternalEventSource { + poll: Poll, + events: Events, + parser: Parser, + tty_buffer: [u8; TTY_BUFFER_SIZE], + tty_fd: FileDesc, + signals: Signals, + #[cfg(feature = "event-stream")] + waker: Waker, +} + +impl UnixInternalEventSource { + pub fn new() -> Result { + UnixInternalEventSource::from_file_descriptor(tty_fd()?) + } + + pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result { + let poll = Poll::new()?; + let registry = poll.registry(); + + let tty_raw_fd = input_fd.raw_fd(); + let mut tty_ev = SourceFd(&tty_raw_fd); + registry.register(&mut tty_ev, TTY_TOKEN, Interest::READABLE)?; + + let mut signals = Signals::new([signal_hook::consts::SIGWINCH])?; + registry.register(&mut signals, SIGNAL_TOKEN, Interest::READABLE)?; + + #[cfg(feature = "event-stream")] + let waker = Waker::new(registry, WAKE_TOKEN)?; + + Ok(UnixInternalEventSource { + poll, + events: Events::with_capacity(3), + parser: Parser::default(), + tty_buffer: [0u8; TTY_BUFFER_SIZE], + tty_fd: input_fd, + signals, + #[cfg(feature = "event-stream")] + waker, + }) + } +} + +impl EventSource for UnixInternalEventSource { + fn try_read(&mut self, timeout: Option) -> Result> { + if let Some(event) = self.parser.next() { + return Ok(Some(event)); + } + + let timeout = PollTimeout::new(timeout); + + loop { + if let Err(e) = self.poll.poll(&mut self.events, timeout.leftover()) { + // Mio will throw an interrupted error in case of cursor position retrieval. We need to retry until it succeeds. + // Previous versions of Mio (< 0.7) would automatically retry the poll call if it was interrupted (if EINTR was returned). + // https://docs.rs/mio/0.7.0/mio/struct.Poll.html#notes + if e.kind() == io::ErrorKind::Interrupted { + continue; + } else { + return Err(e); + } + }; + + if self.events.is_empty() { + // No readiness events = timeout + return Ok(None); + } + + for token in self.events.iter().map(|x| x.token()) { + match token { + TTY_TOKEN => { + loop { + match self.tty_fd.read(&mut self.tty_buffer, TTY_BUFFER_SIZE) { + Ok(read_count) => { + if read_count > 0 { + self.parser.advance( + &self.tty_buffer[..read_count], + read_count == TTY_BUFFER_SIZE, + ); + } + } + Err(e) => { + // No more data to read at the moment. We will receive another event + if e.kind() == io::ErrorKind::WouldBlock { + break; + } + // once more data is available to read. + else if e.kind() == io::ErrorKind::Interrupted { + continue; + } + } + }; + + if let Some(event) = self.parser.next() { + return Ok(Some(event)); + } + } + } + SIGNAL_TOKEN => { + for signal in self.signals.pending() { + match signal { + signal_hook::consts::SIGWINCH => { + // TODO Should we remove tput? + // + // This can take a really long time, because terminal::size can + // launch new process (tput) and then it parses its output. It's + // not a really long time from the absolute time point of view, but + // it's a really long time from the mio, async-std/tokio executor, ... + // point of view. + let new_size = crate::terminal::size()?; + return Ok(Some(InternalEvent::Event(Event::Resize( + new_size.0, new_size.1, + )))); + } + _ => unreachable!("Synchronize signal registration & handling"), + }; + } + } + #[cfg(feature = "event-stream")] + WAKE_TOKEN => { + return Err(std::io::Error::new( + std::io::ErrorKind::Interrupted, + "Poll operation was woken up by `Waker::wake`", + )); + } + _ => unreachable!("Synchronize Evented handle registration & token handling"), + } + } + + // Processing above can take some time, check if timeout expired + if timeout.elapsed() { + return Ok(None); + } + } + } + + #[cfg(feature = "event-stream")] + fn waker(&self) -> Waker { + self.waker.clone() + } +} + +// +// Following `Parser` structure exists for two reasons: +// +// * mimic anes Parser interface +// * move the advancing, parsing, ... stuff out of the `try_read` method +// +#[derive(Debug)] +struct Parser { + buffer: Vec, + internal_events: VecDeque, +} + +impl Default for Parser { + fn default() -> Self { + Parser { + // This buffer is used for -> 1 <- ANSI escape sequence. Are we + // aware of any ANSI escape sequence that is bigger? Can we make + // it smaller? + // + // Probably not worth spending more time on this as "there's a plan" + // to use the anes crate parser. + buffer: Vec::with_capacity(256), + // TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can + // fit? What is an average sequence length? Let's guess here + // and say that the average ANSI escape sequence length is 8 bytes. Thus + // the buffer size should be 1024/8=128 to avoid additional allocations + // when processing large amounts of data. + // + // There's no need to make it bigger, because when you look at the `try_read` + // method implementation, all events are consumed before the next TTY_BUFFER + // is processed -> events pushed. + internal_events: VecDeque::with_capacity(128), + } + } +} + +impl Parser { + fn advance(&mut self, buffer: &[u8], more: bool) { + for (idx, byte) in buffer.iter().enumerate() { + let more = idx + 1 < buffer.len() || more; + + self.buffer.push(*byte); + + match parse_event(&self.buffer, more) { + Ok(Some(ie)) => { + self.internal_events.push_back(ie); + self.buffer.clear(); + } + Ok(None) => { + // Event can't be parsed, because we don't have enough bytes for + // the current sequence. Keep the buffer and process next bytes. + } + Err(_) => { + // Event can't be parsed (not enough parameters, parameter is not a number, ...). + // Clear the buffer and continue with another sequence. + self.buffer.clear(); + } + } + } + } +} + +impl Iterator for Parser { + type Item = InternalEvent; + + fn next(&mut self) -> Option { + self.internal_events.pop_front() + } +} diff --git a/src/event/source/unix/tty.rs b/src/event/source/unix/tty.rs new file mode 100644 index 000000000..a4ab1f24f --- /dev/null +++ b/src/event/source/unix/tty.rs @@ -0,0 +1,264 @@ +use std::os::unix::prelude::AsRawFd; +use std::{collections::VecDeque, io, os::unix::net::UnixStream, time::Duration}; + +use signal_hook::low_level::pipe; + +use crate::event::timeout::PollTimeout; +use crate::event::Event; +use crate::Result; + +use filedescriptor::{poll, pollfd, POLLIN}; + +#[cfg(feature = "event-stream")] +use crate::event::sys::Waker; + +use crate::{ + event::{ + sys::unix::{ + file_descriptor::{tty_fd, FileDesc}, + parse::parse_event, + }, + InternalEvent, + source::EventSource, + }, +}; + +/// Holds a prototypical Waker and a receiver we can wait on when doing select(). +#[cfg(feature = "event-stream")] +struct WakePipe { + receiver: UnixStream, + waker: Waker, +} + +#[cfg(feature = "event-stream")] +impl WakePipe { + fn new() -> Result { + let (receiver, sender) = nonblocking_unix_pair()?; + Ok(WakePipe { + receiver, + waker: Waker::new(sender), + }) + } +} + +// I (@zrzka) wasn't able to read more than 1_022 bytes when testing +// reading on macOS/Linux -> we don't need bigger buffer and 1k of bytes +// is enough. +const TTY_BUFFER_SIZE: usize = 1_024; + +pub(crate) struct UnixInternalEventSource { + parser: Parser, + tty_buffer: [u8; TTY_BUFFER_SIZE], + tty: FileDesc, + winch_signal_receiver: UnixStream, + #[cfg(feature = "event-stream")] + wake_pipe: WakePipe, +} + +fn nonblocking_unix_pair() -> Result<(UnixStream, UnixStream)> { + let (receiver, sender) = UnixStream::pair()?; + receiver.set_nonblocking(true)?; + sender.set_nonblocking(true)?; + Ok((receiver, sender)) +} + +impl UnixInternalEventSource { + pub fn new() -> Result { + UnixInternalEventSource::from_file_descriptor(tty_fd()?) + } + + pub(crate) fn from_file_descriptor(input_fd: FileDesc) -> Result { + Ok(UnixInternalEventSource { + parser: Parser::default(), + tty_buffer: [0u8; TTY_BUFFER_SIZE], + tty: input_fd, + winch_signal_receiver: { + let (receiver, sender) = nonblocking_unix_pair()?; + // Unregistering is unnecessary because EventSource is a singleton + pipe::register(libc::SIGWINCH, sender)?; + receiver + }, + #[cfg(feature = "event-stream")] + wake_pipe: WakePipe::new()?, + }) + } +} + +/// read_complete reads from a non-blocking file descriptor +/// until the buffer is full or it would block. +/// +/// Similar to `std::io::Read::read_to_end`, except this function +/// only fills the given buffer and does not read beyond that. +fn read_complete(fd: &FileDesc, buf: &mut [u8]) -> Result { + loop { + match fd.read(buf, buf.len()) { + Ok(x) => return Ok(x), + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock => return Ok(0), + io::ErrorKind::Interrupted => continue, + _ => return Err(e), + }, + } + } +} + +impl EventSource for UnixInternalEventSource { + fn try_read(&mut self, timeout: Option) -> Result> { + let timeout = PollTimeout::new(timeout); + + fn make_pollfd(fd: &F) -> pollfd { + pollfd { + fd: fd.as_raw_fd(), + events: POLLIN, + revents: 0, + } + } + + #[cfg(not(feature = "event-stream"))] + let mut fds = [ + make_pollfd(&self.tty), + make_pollfd(&self.winch_signal_receiver), + ]; + + #[cfg(feature = "event-stream")] + let mut fds = [ + make_pollfd(&self.tty), + make_pollfd(&self.winch_signal_receiver), + make_pollfd(&self.wake_pipe.receiver), + ]; + + while timeout.leftover().map_or(true, |t| !t.is_zero()) { + // check if there are buffered events from the last read + if let Some(event) = self.parser.next() { + return Ok(Some(event)); + } + match poll(&mut fds, timeout.leftover()) { + Err(filedescriptor::Error::Io(e)) => return Err(e), + res => res.expect("polling tty"), + }; + if fds[0].revents & POLLIN != 0 { + loop { + let read_count = read_complete(&self.tty, &mut self.tty_buffer)?; + if read_count > 0 { + self.parser.advance( + &self.tty_buffer[..read_count], + read_count == TTY_BUFFER_SIZE, + ); + } + + if let Some(event) = self.parser.next() { + return Ok(Some(event)); + } + + if read_count == 0 { + break; + } + } + } + if fds[1].revents & POLLIN != 0 { + let fd = FileDesc::new(self.winch_signal_receiver.as_raw_fd(), false); + // drain the pipe + while read_complete(&fd, &mut [0; 1024])? != 0 {} + // TODO Should we remove tput? + // + // This can take a really long time, because terminal::size can + // launch new process (tput) and then it parses its output. It's + // not a really long time from the absolute time point of view, but + // it's a really long time from the mio, async-std/tokio executor, ... + // point of view. + let new_size = crate::terminal::size()?; + return Ok(Some(InternalEvent::Event(Event::Resize( + new_size.0, new_size.1, + )))); + } + + #[cfg(feature = "event-stream")] + if fds[2].revents & POLLIN != 0 { + let fd = FileDesc::new(self.wake_pipe.receiver.as_raw_fd(), false); + // drain the pipe + while read_complete(&fd, &mut [0; 1024])? != 0 {} + + return Err(std::io::Error::new( + std::io::ErrorKind::Interrupted, + "Poll operation was woken up by `Waker::wake`", + )); + } + } + Ok(None) + } + + #[cfg(feature = "event-stream")] + fn waker(&self) -> Waker { + self.wake_pipe.waker.clone() + } +} + +// +// Following `Parser` structure exists for two reasons: +// +// * mimic anes Parser interface +// * move the advancing, parsing, ... stuff out of the `try_read` method +// +#[derive(Debug)] +struct Parser { + buffer: Vec, + internal_events: VecDeque, +} + +impl Default for Parser { + fn default() -> Self { + Parser { + // This buffer is used for -> 1 <- ANSI escape sequence. Are we + // aware of any ANSI escape sequence that is bigger? Can we make + // it smaller? + // + // Probably not worth spending more time on this as "there's a plan" + // to use the anes crate parser. + buffer: Vec::with_capacity(256), + // TTY_BUFFER_SIZE is 1_024 bytes. How many ANSI escape sequences can + // fit? What is an average sequence length? Let's guess here + // and say that the average ANSI escape sequence length is 8 bytes. Thus + // the buffer size should be 1024/8=128 to avoid additional allocations + // when processing large amounts of data. + // + // There's no need to make it bigger, because when you look at the `try_read` + // method implementation, all events are consumed before the next TTY_BUFFER + // is processed -> events pushed. + internal_events: VecDeque::with_capacity(128), + } + } +} + +impl Parser { + fn advance(&mut self, buffer: &[u8], more: bool) { + for (idx, byte) in buffer.iter().enumerate() { + let more = idx + 1 < buffer.len() || more; + + self.buffer.push(*byte); + + match parse_event(&self.buffer, more) { + Ok(Some(ie)) => { + self.internal_events.push_back(ie); + self.buffer.clear(); + } + Ok(None) => { + // Event can't be parsed, because we don't have enough bytes for + // the current sequence. Keep the buffer and process next bytes. + } + Err(_) => { + // Event can't be parsed (not enough parameters, parameter is not a number, ...). + // Clear the buffer and continue with another sequence. + self.buffer.clear(); + } + } + } + } +} + +impl Iterator for Parser { + type Item = InternalEvent; + + fn next(&mut self) -> Option { + self.internal_events.pop_front() + } +} diff --git a/src/event/sys/unix/file_descriptor.rs b/src/event/sys/unix/file_descriptor.rs index 8b481766b..268d57ebf 100644 --- a/src/event/sys/unix/file_descriptor.rs +++ b/src/event/sys/unix/file_descriptor.rs @@ -1,6 +1,9 @@ use std::{ fs, io, - os::unix::io::{IntoRawFd, RawFd}, + os::unix::{ + io::{IntoRawFd, RawFd}, + prelude::AsRawFd, + }, }; use libc::size_t; @@ -34,7 +37,7 @@ impl FileDesc { self.fd, buffer.as_mut_ptr() as *mut libc::c_void, size as size_t, - ) as isize + ) }; if result < 0 { @@ -63,6 +66,12 @@ impl Drop for FileDesc { } } +impl AsRawFd for FileDesc { + fn as_raw_fd(&self) -> RawFd { + self.raw_fd() + } +} + /// Creates a file descriptor pointing to the standard input or `/dev/tty`. pub fn tty_fd() -> Result { let (fd, close_on_drop) = if unsafe { libc::isatty(libc::STDIN_FILENO) == 1 } { diff --git a/src/event/sys/unix/parse.rs b/src/event/sys/unix/parse.rs index 0cfbd1a5e..5d0aa1716 100644 --- a/src/event/sys/unix/parse.rs +++ b/src/event/sys/unix/parse.rs @@ -104,11 +104,11 @@ pub(crate) fn parse_event(buffer: &[u8], input_available: bool) -> Result