-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement AlsaMixerObserver with actor model approach #34
Open
st8ed
wants to merge
9
commits into
mopidy:main
Choose a base branch
from
st8ed:feature/implement-observer-actor
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
63caa27
Implement AlsaMixerObserver as an actor
st8ed 8f51dfa
Add tests for AlsaMixerObserver
st8ed 5389a0c
Regroup imports in package init (isort)
st8ed a460ba8
Ignore non-severe errors when accessing mixer
st8ed 414d880
Fix crash caused by invalid poll descriptors
st8ed 71025c3
Fix inconsistent observer tests
st8ed 7ed2a04
Fix missing timeout unit conversion
st8ed 6f8ab46
Improve logging
st8ed c02d3b1
Clarify pyalsaaudio mitigation
st8ed File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,9 @@ | ||
import logging | ||
import math | ||
import random | ||
import select | ||
import threading | ||
import struct | ||
import time | ||
|
||
import alsaaudio | ||
import gi | ||
|
@@ -12,6 +14,7 @@ | |
|
||
from mopidy import exceptions, mixer # noqa isort:skip | ||
|
||
from .polling_actor import PollingActor # noqa | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
@@ -55,19 +58,33 @@ def __init__(self, config): | |
|
||
self._last_volume = None | ||
self._last_mute = None | ||
self._observer = None | ||
|
||
logger.info( | ||
f"Mixing using ALSA, {self.device_title}, " | ||
f"mixer control {self.control!r}." | ||
) | ||
|
||
def on_start(self): | ||
self._observer = AlsaMixerObserver( | ||
device=self.device, | ||
control=self.control, | ||
callback=self.actor_ref.proxy().trigger_events_for_changed_values, | ||
self._observer = AlsaMixerObserver.start( | ||
self._await_mixer(), self.actor_ref.proxy() | ||
) | ||
self._observer.start() | ||
|
||
def on_stop(self): | ||
self._stop_observer() | ||
|
||
def on_failure(self, exception_type, exception_value, traceback): | ||
self._stop_observer() | ||
|
||
def restart_observer(self, exc=None): | ||
self._stop_observer() | ||
self._observer = AlsaMixerObserver.start( | ||
self._await_mixer(exc), self.actor_ref.proxy() | ||
) | ||
|
||
def _stop_observer(self): | ||
if self._observer is not None and self._observer.is_alive(): | ||
self._observer.stop() | ||
|
||
@property | ||
def _mixer(self): | ||
|
@@ -78,18 +95,50 @@ def _mixer(self): | |
control=self.control, | ||
) | ||
|
||
def _await_mixer(self, exc_0=None, sleep=True): | ||
while True: | ||
try: | ||
if exc_0 is not None: | ||
exc, exc_0 = exc_0, None | ||
raise exc | ||
|
||
return self._mixer | ||
|
||
except (alsaaudio.ALSAAudioError, OSError) as exc: | ||
logger.info( | ||
f"Could not open ALSA {self.device_title}. " | ||
"Retrying in a few seconds... " | ||
f"Error: {exc}" | ||
) | ||
|
||
if sleep: | ||
time.sleep(random.uniform(7, 10)) | ||
|
||
def get_volume(self): | ||
channels = self._mixer.getvolume() | ||
try: | ||
channels = self._mixer.getvolume() | ||
except alsaaudio.ALSAAudioError as exc: | ||
logger.debug(f"Could not get ALSA mixer volume: {exc}") | ||
return None | ||
|
||
if not channels: | ||
return None | ||
elif channels.count(channels[0]) == len(channels): | ||
return self.mixer_volume_to_volume(channels[0]) | ||
else: | ||
# Not all channels have the same volume | ||
logger.debug( | ||
"Could not determine single ALSA mixer volume " | ||
"because channels have different volumes" | ||
) | ||
return None | ||
|
||
def set_volume(self, volume): | ||
self._mixer.setvolume(self.volume_to_mixer_volume(volume)) | ||
try: | ||
self._mixer.setvolume(self.volume_to_mixer_volume(volume)) | ||
except alsaaudio.ALSAAudioError as exc: | ||
logger.debug(f"Could not set ALSA mixer volume: {exc}") | ||
return False | ||
|
||
return True | ||
|
||
def mixer_volume_to_volume(self, mixer_volume): | ||
|
@@ -144,22 +193,26 @@ def get_mute(self): | |
try: | ||
channels_muted = self._mixer.getmute() | ||
except alsaaudio.ALSAAudioError as exc: | ||
logger.debug(f"Getting mute state failed: {exc}") | ||
logger.debug(f"Could not get ALSA mixer mute state: {exc}") | ||
return None | ||
|
||
if all(channels_muted): | ||
return True | ||
elif not any(channels_muted): | ||
return False | ||
else: | ||
# Not all channels have the same mute state | ||
logger.debug( | ||
"Could not determine single ALSA mixer mute state " | ||
"because channels have different mute states" | ||
) | ||
return None | ||
|
||
def set_mute(self, mute): | ||
try: | ||
self._mixer.setmute(int(mute)) | ||
return True | ||
except alsaaudio.ALSAAudioError as exc: | ||
logger.debug(f"Setting mute state failed: {exc}") | ||
logger.debug(f"Could not set ALSA mixer mute state: {exc}") | ||
return False | ||
|
||
def trigger_events_for_changed_values(self): | ||
|
@@ -173,36 +226,44 @@ def trigger_events_for_changed_values(self): | |
self.trigger_mute_changed(self._last_mute) | ||
|
||
|
||
class AlsaMixerObserver(threading.Thread): | ||
daemon = True | ||
name = "AlsaMixerObserver" | ||
class AlsaMixerObserver(PollingActor): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could set Ref https://pykka.readthedocs.io/en/stable/runtimes/threading/#pykka.ThreadingActor.use_daemon_thread |
||
|
||
def __init__(self, device, control, callback=None): | ||
super().__init__() | ||
self.running = True | ||
name = "alsamixer-observer" | ||
|
||
# Keep the mixer instance alive for the descriptors to work | ||
self.mixer = alsaaudio.Mixer(device=device, control=control) | ||
combine_events = True | ||
|
||
descriptors = self.mixer.polldescriptors() | ||
assert len(descriptors) == 1 | ||
self.fd = descriptors[0][0] | ||
self.event_mask = descriptors[0][1] | ||
def __init__(self, mixer, parent): | ||
# Note: ALSA mixer instance must be kept alive | ||
# to keep poll descriptors open | ||
self._mixer = mixer | ||
self._parent = parent | ||
|
||
self.callback = callback | ||
# TODO: When a yet unreleased version of pyalsaaudio is used (> 0.9.0) | ||
# this function with its call below can be safely removed. | ||
# See https://github.com/larsimmisch/pyalsaaudio/pull/108 | ||
def mitigate_invalid_fd_conversion(fd): | ||
return fd != struct.unpack("I", b"\xFF\xFF\xFF\xFF")[0] | ||
|
||
def stop(self): | ||
self.running = False | ||
super().__init__( | ||
fds=tuple( | ||
(fd, event_mask | select.EPOLLET) | ||
for (fd, event_mask) in self._mixer.polldescriptors() | ||
if fd != -1 and mitigate_invalid_fd_conversion(fd) | ||
) | ||
) | ||
|
||
def run(self): | ||
poller = select.epoll() | ||
poller.register(self.fd, self.event_mask | select.EPOLLET) | ||
while self.running: | ||
try: | ||
events = poller.poll(timeout=1) | ||
if events and self.callback is not None: | ||
self.callback() | ||
except OSError as exc: | ||
# poller.poll() will raise an IOError because of the | ||
# interrupted system call when suspending the machine. | ||
logger.debug(f"Ignored IO error: {exc}") | ||
def on_start(self): | ||
logger.debug( | ||
f"Starting AlsaMixerObserver with {len(self._fds)} valid poll descriptors" | ||
) | ||
|
||
def on_failure(self, exception_type, exception_value, traceback): | ||
if exception_type is OSError: | ||
# OSError can normally occur after suspend/resume or device disconnection | ||
self._parent.restart_observer(exception_value) | ||
|
||
def on_poll(self, fd, event): | ||
if event & (select.EPOLLHUP | select.EPOLLERR): | ||
self._parent.restart_observer() | ||
else: | ||
self._parent.trigger_events_for_changed_values().get() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
import logging | ||
import os | ||
import queue | ||
import select | ||
import sys | ||
from typing import Any, NamedTuple, Tuple | ||
|
||
import pykka | ||
import pykka._envelope | ||
import pykka.messages | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class PollingActor(pykka.ThreadingActor): | ||
|
||
combine_events = False | ||
|
||
def __init__(self, fds=tuple()): | ||
super().__init__() | ||
|
||
self._fds = fds | ||
self._poll = None | ||
|
||
def _start_actor_loop(self): | ||
try: | ||
self._wake_fd_read, self._wake_fd_write = os.pipe() | ||
logging.debug( | ||
f"Wake channel for {self} is opened with " | ||
f"rfd={self._wake_fd_read:d} and wfd={self._wake_fd_write:d}" | ||
) | ||
|
||
self._poll = select.epoll() | ||
self._poll.register( | ||
self._wake_fd_read, select.EPOLLIN | select.EPOLLET | ||
) | ||
|
||
for fd, event_mask in self._fds: | ||
self._poll.register(fd, event_mask) | ||
|
||
self.actor_inbox._actor = self | ||
except Exception: | ||
self._handle_failure(*sys.exc_info()) | ||
return | ||
|
||
super()._start_actor_loop() | ||
|
||
def _stop(self): | ||
super()._stop() | ||
|
||
os.close(self._wake_fd_write) | ||
os.close(self._wake_fd_read) | ||
|
||
def _listen(self, timeout): | ||
assert ( | ||
self._poll is not None | ||
), "Must not request events before poll initialization" | ||
|
||
logging.debug( | ||
f"Actor {self} is entering poll sleep with timeout = {timeout!r}" | ||
) | ||
events = self._poll.poll(timeout) | ||
logging.debug(f"Actor {self} has been woken with events {events!r}") | ||
|
||
# Don't handle any events if | ||
# actor has been woken during stopping, | ||
# so it can quickly finish its lifecycle | ||
if not self.actor_ref.is_alive(): | ||
return tuple() | ||
|
||
return ( | ||
(fd, event) for (fd, event) in events if fd != self._wake_fd_read | ||
) | ||
|
||
def _wake(self): | ||
logging.debug(f"Waking actor {self}") | ||
os.write(self._wake_fd_write, b"\xFF") | ||
|
||
def _handle_receive(self, message): | ||
if isinstance(message, ActorError): | ||
self._handle_failure(*message.exc_info) | ||
try: | ||
self.on_failure(*message.exc_info) | ||
except Exception: | ||
self._handle_failure(*sys.exc_info()) | ||
return | ||
|
||
if isinstance(message, PollEvent): | ||
return self.on_poll(message.fd, message.event) | ||
|
||
return super()._handle_receive(message) | ||
|
||
def on_poll(self, fd, event): | ||
raise NotImplementedError("Use a subclass of PollingActor") | ||
|
||
@classmethod | ||
def _create_actor_inbox(cls): | ||
return PollingActorInbox(cls.combine_events) | ||
|
||
|
||
class PollingActorInbox(queue.Queue): | ||
def __init__(self, combine_events=False): | ||
super().__init__() | ||
|
||
self._actor = None | ||
self._combine_events = combine_events | ||
|
||
def put(self, item, block=True, timeout=None): | ||
if self._actor is not None: | ||
self._actor._wake() | ||
|
||
super().put(item, block, timeout) | ||
|
||
def get(self, block=True, timeout=None): | ||
assert ( | ||
self._actor is not None | ||
), "Actor must be set before starting polling" | ||
|
||
while True: | ||
if not self.empty(): | ||
return super().get(False) | ||
|
||
try: | ||
# If a non-blocking call is requested simulate | ||
# it with the minimal timeout of 1 millisecond | ||
if not block: | ||
events = self._actor._listen(1) | ||
else: | ||
# TODO: Since this can be called more than once | ||
# we need to properly update timeout if it isn't None | ||
events = self._actor._listen( | ||
timeout * 1000 if timeout is not None else None | ||
) | ||
except Exception: | ||
return pykka._envelope.Envelope( | ||
ActorError(exc_info=sys.exc_info()) | ||
) | ||
|
||
if self._combine_events: | ||
events = filter(PollingActorInbox._combine_filter(), events) | ||
|
||
for event in events: | ||
super().put(pykka._envelope.Envelope(PollEvent(*event))) | ||
|
||
if not block and self.empty(): | ||
raise queue.Empty | ||
|
||
def _combine_filter(): | ||
trigger = False | ||
|
||
def combiner(event): | ||
nonlocal trigger | ||
|
||
if event[1] & ~(select.EPOLLIN | select.EPOLLOUT | select.EPOLLPRI): | ||
return True | ||
|
||
if trigger: | ||
return False | ||
|
||
trigger = True | ||
return True | ||
|
||
return combiner | ||
|
||
|
||
class PollEvent(NamedTuple): | ||
|
||
fd: int | ||
|
||
event: int | ||
|
||
|
||
class ActorError(NamedTuple): | ||
exc_info: Tuple[Any] |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the call to
AlsaMixerObserver.start()
should be moved to a new method named_start_observer()
, to match what you've done for the "stop observer" call.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It came to my mind, but there's more than one way to implement
_start_observer
:I struggled to decide which one to choose, so being guided by "There should be one -- and preferably only one -- obvious way to do it" I introduced
restart_observer()
as public method. This way_start_observer()
would then need to be implemented as no. 1 from above, which is very repetitive in my opinion and unnecessary because it's internal method.Does such reasoning make sense or it's still better to explicitly define
_start_observer()
as in no. 1 only for the sake of consistency?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think it's important for consistency. It helps define the One True Way of doing what needs to be done.