Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement AlsaMixerObserver with actor model approach #34

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion mopidy_alsamixer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pathlib

import pkg_resources

from mopidy import config, ext

__version__ = pkg_resources.get_distribution("Mopidy-ALSAMixer").version
Expand Down
123 changes: 86 additions & 37 deletions mopidy_alsamixer/mixer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging
import math
import random
import select
import threading
import struct
import time

import alsaaudio
import gi
Expand All @@ -12,6 +14,7 @@

from mopidy import exceptions, mixer # noqa isort:skip

from .polling_actor import PollingActor # noqa

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -55,19 +58,33 @@ def __init__(self, config):

self._last_volume = None
self._last_mute = None
self._observer = None

logger.info(
f"Mixing using ALSA, {self.device_title}, "
f"mixer control {self.control!r}."
)

def on_start(self):
self._observer = AlsaMixerObserver(
device=self.device,
control=self.control,
callback=self.actor_ref.proxy().trigger_events_for_changed_values,
self._observer = AlsaMixerObserver.start(
self._await_mixer(), self.actor_ref.proxy()
)

def on_stop(self):
self._stop_observer()

def on_failure(self, exception_type, exception_value, traceback):
self._stop_observer()

def restart_observer(self, exc=None):
self._stop_observer()
self._observer = AlsaMixerObserver.start(
self._await_mixer(exc), self.actor_ref.proxy()
)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the call to AlsaMixerObserver.start() should be moved to a new method named _start_observer(), to match what you've done for the "stop observer" call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It came to my mind, but there's more than one way to implement _start_observer:

def _start_observer(self, exc):
    # Perhaps we should raise an exception is observer is already running?
    self._observer = AlsaMixerObserver.start(exc, self.actor_ref.proxy())
def _start_observer(self, exc):
    # This check is meaningless iif _start_observer()
    # is used only within AlsaMixer class
    if self._observer is None:
        self._observer = AlsaMixerObserver.start(exc, self.actor_ref.proxy())
def _start_observer(self, exc):
    # This is actually restart_observer()
    self._stop_observer()
    self._observer = AlsaMixerObserver.start(exc, self.actor_ref.proxy())

I struggled to decide which one to choose, so being guided by "There should be one -- and preferably only one -- obvious way to do it" I introduced restart_observer() as public method. This way _start_observer() would then need to be implemented as no. 1 from above, which is very repetitive in my opinion and unnecessary because it's internal method.

Does such reasoning make sense or it's still better to explicitly define _start_observer() as in no. 1 only for the sake of consistency?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think it's important for consistency. It helps define the One True Way of doing what needs to be done.

self._observer.start()

def _stop_observer(self):
if self._observer is not None and self._observer.is_alive():
self._observer.stop()

@property
def _mixer(self):
Expand All @@ -78,8 +95,30 @@ def _mixer(self):
control=self.control,
)

def _await_mixer(self, exc_0=None, sleep=True):
while True:
try:
if exc_0 is not None:
exc, exc_0 = exc_0, None
raise exc

return self._mixer

except (alsaaudio.ALSAAudioError, OSError) as exc:
logger.info(
f"Could not open ALSA {self.device_title}. "
"Retrying in a few seconds... "
f"Error: {exc}"
)

if sleep:
time.sleep(random.uniform(7, 10))

def get_volume(self):
channels = self._mixer.getvolume()
try:
channels = self._mixer.getvolume()
except alsaaudio.ALSAAudioError:
return None
if not channels:
return None
elif channels.count(channels[0]) == len(channels):
Expand All @@ -89,7 +128,11 @@ def get_volume(self):
return None

def set_volume(self, volume):
self._mixer.setvolume(self.volume_to_mixer_volume(volume))
try:
self._mixer.setvolume(self.volume_to_mixer_volume(volume))
except alsaaudio.ALSAAudioError as exc:
logger.debug(f"Setting volume failed: {exc}")
return False
return True

def mixer_volume_to_volume(self, mixer_volume):
Expand Down Expand Up @@ -143,8 +186,7 @@ def volume_to_mixer_volume(self, volume):
def get_mute(self):
try:
channels_muted = self._mixer.getmute()
except alsaaudio.ALSAAudioError as exc:
logger.debug(f"Getting mute state failed: {exc}")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious why this logging has been removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed it for consistency (get_volume has no such logging) and because I thought this method (as well as get_volume) may be called repeatedly generating lots of error messages due to the actual error being suppressed.

I'm grateful you bring this up because now I think it's better to do the opposite and add logging to get_volume too, as ALSAAudioError can occur during alsaaudio.Mixer.getvolume, not only during alsaaudio.Mixer.

except alsaaudio.ALSAAudioError:
return None
if all(channels_muted):
return True
Expand Down Expand Up @@ -173,36 +215,43 @@ def trigger_events_for_changed_values(self):
self.trigger_mute_changed(self._last_mute)


class AlsaMixerObserver(threading.Thread):
daemon = True
name = "AlsaMixerObserver"
class AlsaMixerObserver(PollingActor):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could set use_daemon_thread = True as a class variable here to make sure this thread will not block the application from exiting.

Ref https://pykka.readthedocs.io/en/stable/runtimes/threading/#pykka.ThreadingActor.use_daemon_thread


def __init__(self, device, control, callback=None):
super().__init__()
self.running = True
name = "alsamixer-observer"

# Keep the mixer instance alive for the descriptors to work
self.mixer = alsaaudio.Mixer(device=device, control=control)
combine_events = True

descriptors = self.mixer.polldescriptors()
assert len(descriptors) == 1
self.fd = descriptors[0][0]
self.event_mask = descriptors[0][1]
def __init__(self, mixer, parent):
# Note: ALSA mixer instance must be kept alive
# to keep poll descriptors open
self._mixer = mixer
self._parent = parent

self.callback = callback
# FIXME: Remove when a new version of pyalsaaudio is released
# See https://github.com/larsimmisch/pyalsaaudio/pull/108
def check_fd(fd):
return fd != -1 and fd != struct.unpack("I", b"\xFF\xFF\xFF\xFF")[0]

def stop(self):
self.running = False
super().__init__(
fds=tuple(
(fd, event_mask | select.EPOLLET)
for (fd, event_mask) in self._mixer.polldescriptors()
if check_fd(fd)
)
)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't super clear exactly what code needs updating (and how) when a new version of pyalsaaudio is released.

Is this code forwards-compatible, or will it break with newer versions of pyalsaaudio?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added clarifications. It's compatible with newer versions, I tried to underline it now phrasing it as "it can be removed".

Compatibility follows from the fact that max absolute value of correct file descriptor data type -- signed int32 -- is about a half of unsigned int32 (used in current latest release). So this check essentially becomes just garbage.


def run(self):
poller = select.epoll()
poller.register(self.fd, self.event_mask | select.EPOLLET)
while self.running:
try:
events = poller.poll(timeout=1)
if events and self.callback is not None:
self.callback()
except OSError as exc:
# poller.poll() will raise an IOError because of the
# interrupted system call when suspending the machine.
logger.debug(f"Ignored IO error: {exc}")
def on_start(self):
logger.debug(
f"Starting AlsaMixerObserver with {len(self._fds)} valid poll descriptors"
)

def on_failure(self, exception_type, exception_value, traceback):
if exception_type is OSError:
# OSError can normally occur after suspend/resume or device disconnection
self._parent.restart_observer(exception_value)

def on_poll(self, fd, event):
if event & (select.EPOLLHUP | select.EPOLLERR):
self._parent.restart_observer()
else:
self._parent.trigger_events_for_changed_values().get()
172 changes: 172 additions & 0 deletions mopidy_alsamixer/polling_actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import logging
import os
import queue
import select
import sys
from typing import Any, NamedTuple, Tuple

import pykka
import pykka._envelope
import pykka.messages

logger = logging.getLogger(__name__)


class PollingActor(pykka.ThreadingActor):

combine_events = False

def __init__(self, fds=tuple()):
super().__init__()

self._fds = fds
self._poll = None

def _start_actor_loop(self):
try:
self._wake_fd_read, self._wake_fd_write = os.pipe()
logging.debug(
f"Wake channel for {self} is opened with "
f"rfd={self._wake_fd_read:d} and wfd={self._wake_fd_write:d}"
)

self._poll = select.epoll()
self._poll.register(
self._wake_fd_read, select.EPOLLIN | select.EPOLLET
)

for fd, event_mask in self._fds:
self._poll.register(fd, event_mask)

self.actor_inbox._actor = self
except Exception:
self._handle_failure(*sys.exc_info())
return

super()._start_actor_loop()

def _stop(self):
super()._stop()

os.close(self._wake_fd_write)
os.close(self._wake_fd_read)

def _listen(self, timeout):
assert (
self._poll is not None
), "Must not request events before poll initialization"

logging.debug(
f"Actor {self} is entering poll sleep with timeout = {timeout!r}"
)
events = self._poll.poll(timeout)
logging.debug(f"Actor {self} has been woken with events {events!r}")

# Don't handle any events if
# actor has been woken during stopping,
# so it can quickly finish its lifecycle
if not self.actor_ref.is_alive():
return tuple()

return (
(fd, event) for (fd, event) in events if fd != self._wake_fd_read
)

def _wake(self):
logging.debug(f"Waking actor {self}")
os.write(self._wake_fd_write, b"\xFF")

def _handle_receive(self, message):
if isinstance(message, ActorError):
self._handle_failure(*message.exc_info)
try:
self.on_failure(*message.exc_info)
except Exception:
self._handle_failure(*sys.exc_info())
return

if isinstance(message, PollEvent):
return self.on_poll(message.fd, message.event)

return super()._handle_receive(message)

def on_poll(self, fd, event):
raise NotImplementedError("Use a subclass of PollingActor")

@classmethod
def _create_actor_inbox(cls):
return PollingActorInbox(cls.combine_events)


class PollingActorInbox(queue.Queue):
def __init__(self, combine_events=False):
super().__init__()

self._actor = None
self._combine_events = combine_events

def put(self, item, block=True, timeout=None):
if self._actor is not None:
self._actor._wake()

super().put(item, block, timeout)

def get(self, block=True, timeout=None):
assert (
self._actor is not None
), "Actor must be set before starting polling"

while True:
if not self.empty():
return super().get(False)

try:
# If a non-blocking call is requested simulate
# it with the minimal timeout of 1 millisecond
if not block:
events = self._actor._listen(1)
else:
# TODO: Since this can be called more than once
# we need to properly update timeout if it isn't None
events = self._actor._listen(timeout)
except Exception:
return pykka._envelope.Envelope(
ActorError(exc_info=sys.exc_info())
)

if self._combine_events:
events = filter(PollingActorInbox._combine_filter(), events)

for event in events:
super().put(pykka._envelope.Envelope(PollEvent(*event)))

if not block and self.empty():
raise queue.Empty

def _combine_filter():
trigger = False

def combiner(event):
nonlocal trigger

if event[1] & ~(select.EPOLLIN | select.EPOLLOUT | select.EPOLLPRI):
return True

if trigger:
return False

trigger = True
return True

return combiner


class PollEvent(NamedTuple):

fd: int

event: int


class ActorError(NamedTuple):
exc_info: Tuple[Any]
Loading