diff --git a/music_assistant/controllers/streams/audio.py b/music_assistant/controllers/streams/audio.py index fbb94ef4bc..3c1dba37ac 100644 --- a/music_assistant/controllers/streams/audio.py +++ b/music_assistant/controllers/streams/audio.py @@ -29,6 +29,7 @@ MediaType, PlayerFeature, PlayerType, + ProviderFeature, StreamType, VolumeNormalizationMode, ) @@ -87,7 +88,7 @@ resample_pcm_audio, ) from music_assistant.helpers.dsp import filter_to_ffmpeg_params -from music_assistant.helpers.ffmpeg import FFMpeg, get_ffmpeg_stream +from music_assistant.helpers.ffmpeg import FFMpeg, get_ffmpeg_stream, mix_pcm_streams from music_assistant.helpers.playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER from music_assistant.helpers.util import ( @@ -108,6 +109,7 @@ from music_assistant.mass import MusicAssistant from music_assistant.models.music_provider import MusicProvider from music_assistant.models.player import Player + from music_assistant.models.plugin import PluginProvider from music_assistant.providers.sync_group import SyncGroupPlayer # ruff: noqa: PLR0915 @@ -2251,3 +2253,47 @@ async def _update_hls_radio_metadata( except Exception as err: self.logger.debug("Error fetching HLS metadata: %s", err) + + # ------------------------------------------------------------------ + # Audio overlay support (used by Rain Mood and similar plugins) + # ------------------------------------------------------------------ + + def get_active_overlay_providers(self, player_id: str) -> list[PluginProvider]: + """ + Return all plugins with AUDIO_OVERLAY feature that are active for the given player. + + :param player_id: The player to check. + """ + return [ + prov # type: ignore[misc] + for prov in self.mass.get_providers_supporting_feature(ProviderFeature.AUDIO_OVERLAY) + if prov.is_overlay_active(player_id) # type: ignore[union-attr] + ] + + async def wrap_overlay_if_active( + self, + audio_input: AsyncGenerator[bytes, None], + player_id: str, + pcm_format: AudioFormat, + ) -> AsyncGenerator[bytes, None]: + """ + Wrap audio_input with every active overlay plugin stream mixed in. + + Calling this as an async generator function returns an AsyncGenerator immediately, + so it is safe to call from both sync and async contexts. + + :param audio_input: Async generator producing raw PCM bytes. + :param player_id: The player (or queue) to check for active overlays. + :param pcm_format: The PCM format used by audio_input. + """ + overlay_gens: list[AsyncGenerator[bytes, None]] = [] + for prov in self.get_active_overlay_providers(player_id): + gen = await prov.get_overlay_stream(player_id, pcm_format) + if gen is not None: + overlay_gens.append(gen) + if not overlay_gens: + async for chunk in audio_input: + yield chunk + return + async for chunk in mix_pcm_streams([audio_input, *overlay_gens], pcm_format): + yield chunk diff --git a/music_assistant/controllers/streams/controller.py b/music_assistant/controllers/streams/controller.py index d5df674535..b4577524ef 100644 --- a/music_assistant/controllers/streams/controller.py +++ b/music_assistant/controllers/streams/controller.py @@ -520,6 +520,8 @@ async def serve_queue_item_stream(self, request: web.Request) -> web.StreamRespo "float", queue_item.extra_attributes.get("playback_speed", 1.0) ), ) + audio_input = self.audio.wrap_overlay_if_active(audio_input, queue_id, pcm_format) + # stream the audio # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into # the desired output format for the player including any player specific filter params @@ -590,7 +592,7 @@ async def serve_queue_item_stream(self, request: web.Request) -> web.StreamRespo ) return resp - async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse: + async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse: # noqa: PLR0915 """Stream Queue Flow audio to player.""" self._log_request(request) queue_id = request.match_info["queue_id"] @@ -656,10 +658,15 @@ async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamRespo # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name) + flow_audio_input = self.audio.get_queue_flow_stream( + queue=queue, start_queue_item=start_queue_item, pcm_format=flow_pcm_format + ) + flow_audio_input = self.audio.wrap_overlay_if_active( + flow_audio_input, queue_id, flow_pcm_format + ) + async for chunk in get_ffmpeg_stream( - audio_input=self.audio.get_queue_flow_stream( - queue=queue, start_queue_item=start_queue_item, pcm_format=flow_pcm_format - ), + audio_input=flow_audio_input, input_format=flow_pcm_format, output_format=output_format, filter_params=self.audio.get_player_filter_params( @@ -955,19 +962,23 @@ def get_stream( flow_stream = self.audio.get_queue_flow_stream( queue=queue, start_queue_item=start_queue_item, pcm_format=pcm_format ) + # Use queue_id (the queue owner) not player_id (which may be a transport bridge). + flow_stream = self.audio.wrap_overlay_if_active(flow_stream, queue_id, pcm_format) if use_flow_stream_buffering: return buffered(flow_stream, buffer_size=30, min_buffer_before_yield=1) return flow_stream # single item stream (e.g. radio or non-flow mode) queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id) assert queue_item - return self.audio.get_queue_item_stream( + item_stream = self.audio.get_queue_item_stream( queue_item=queue_item, pcm_format=pcm_format, playback_speed=cast( "float", queue_item.extra_attributes.get("playback_speed", 1.0) ), ) + # Use queue_id (the queue owner) not player_id (which may be a transport bridge). + return self.audio.wrap_overlay_if_active(item_stream, queue_id, pcm_format) # assume url or some other direct path # NOTE: this will fail if its an uri not playable by ffmpeg return get_ffmpeg_stream( diff --git a/music_assistant/helpers/ffmpeg.py b/music_assistant/helpers/ffmpeg.py index 9b7b1bbbd4..5a9ea271d5 100644 --- a/music_assistant/helpers/ffmpeg.py +++ b/music_assistant/helpers/ffmpeg.py @@ -4,6 +4,7 @@ import asyncio import logging +import os import time from collections import deque from collections.abc import AsyncGenerator @@ -247,6 +248,140 @@ async def get_ffmpeg_stream( raise AudioError(log_tail) +async def mix_pcm_streams( # noqa: PLR0915 + inputs: list[AsyncGenerator[bytes, None]], + pcm_format: AudioFormat, +) -> AsyncGenerator[bytes, None]: + """ + Mix N PCM async generators of identical format using ffmpeg's amix filter. + + All inputs must produce raw PCM bytes in pcm_format. Output is the same format. + The mix duration is bound to the first input (intended to be the music); when + the first input ends the process terminates and any remaining bytes on the + other inputs are discarded. + + :param inputs: List of PCM async generators. First entry drives the duration. + :param pcm_format: PCM format shared by all inputs and the output. + """ + if not inputs: + return + if len(inputs) == 1: + async for chunk in inputs[0]: + yield chunk + return + + fmt = pcm_format.content_type.value + sample_rate = str(pcm_format.sample_rate) + channels = str(pcm_format.channels) + n_inputs = len(inputs) + + # Create one OS pipe per input. Child process reads from read_fds via `-i pipe:`; + # we (parent) feed into write_fds from each async generator and close on EOF. + pipes: list[tuple[int, int]] = [os.pipe() for _ in range(n_inputs)] + read_fds = [r for r, _ in pipes] + write_fds = [w for _, w in pipes] + + amix_filter = ( + "".join(f"[{i}:a]" for i in range(n_inputs)) + + f"amix=inputs={n_inputs}:duration=first:normalize=0[mix]" + ) + + cmd: list[str] = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-nostats"] + for read_fd in read_fds: + cmd += [ + "-f", + fmt, + "-ar", + sample_rate, + "-ac", + channels, + "-i", + f"pipe:{read_fd}", + ] + cmd += [ + "-filter_complex", + amix_filter, + "-map", + "[mix]", + "-f", + fmt, + "-ar", + sample_rate, + "-ac", + channels, + "pipe:1", + ] + + proc: asyncio.subprocess.Process | None = None + feeder_tasks: list[asyncio.Task[None]] = [] + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + pass_fds=tuple(read_fds), + ) + # The child now owns its inherited copies of read_fds; close ours. + for fd in read_fds: + with suppress(OSError): + os.close(fd) + read_fds.clear() + + loop = asyncio.get_running_loop() + + async def _feeder(write_fd: int, gen: AsyncGenerator[bytes, None]) -> None: + writer: asyncio.StreamWriter | None = None + try: + reader = asyncio.StreamReader() + protocol = asyncio.StreamReaderProtocol(reader) + transport, _ = await loop.connect_write_pipe( + lambda: protocol, os.fdopen(write_fd, "wb") + ) + writer = asyncio.StreamWriter(transport, protocol, reader, loop) + async for chunk in gen: + writer.write(chunk) + await writer.drain() + except (BrokenPipeError, ConnectionResetError, asyncio.CancelledError): + pass + finally: + if writer is not None: + with suppress(Exception): + writer.close() + else: + with suppress(OSError): + os.close(write_fd) + + feeder_tasks = [ + asyncio.create_task(_feeder(w, g)) for w, g in zip(write_fds, inputs, strict=True) + ] + write_fds.clear() # ownership moved to feeders + + assert proc.stdout is not None + while True: + chunk = await proc.stdout.read(65536) + if not chunk: + break + yield chunk + finally: + for task in feeder_tasks: + if not task.done(): + task.cancel() + for task in feeder_tasks: + with suppress(asyncio.CancelledError, Exception): + await task + for fd in read_fds: + with suppress(OSError): + os.close(fd) + for fd in write_fds: + with suppress(OSError): + os.close(fd) + if proc is not None and proc.returncode is None: + with suppress(ProcessLookupError): + proc.kill() + with suppress(Exception): + await proc.wait() + + def get_ffmpeg_args( # noqa: PLR0915 input_format: AudioFormat, output_format: AudioFormat, diff --git a/music_assistant/models/plugin.py b/music_assistant/models/plugin.py index 2de136fe27..74b7d77f06 100644 --- a/music_assistant/models/plugin.py +++ b/music_assistant/models/plugin.py @@ -260,6 +260,33 @@ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> lis """ raise NotImplementedError + def is_overlay_active(self, player_id: str) -> bool: + """ + Return whether an audio overlay is currently active for this player. + + Will only be called if ProviderFeature.AUDIO_OVERLAY is declared. + + :param player_id: The player to check. + """ + return False + + async def get_overlay_stream( + self, + player_id: str, + pcm_format: AudioFormat, + ) -> AsyncGenerator[bytes, None] | None: + """ + Return a volume-adjusted PCM overlay stream for the given player. + + Will only be called if ProviderFeature.AUDIO_OVERLAY is declared. + The returned bytes must be in the same format as pcm_format. + + :param player_id: The player for which the overlay is requested. + :param pcm_format: The PCM format the overlay must be produced in. + :returns: Async generator of raw PCM bytes, or None if no overlay is active. + """ + return None + async def resolve_image(self, path: str) -> str | bytes: """ Resolve an image from an image path. diff --git a/music_assistant/providers/rain_mood/__init__.py b/music_assistant/providers/rain_mood/__init__.py new file mode 100644 index 0000000000..3e7def181a --- /dev/null +++ b/music_assistant/providers/rain_mood/__init__.py @@ -0,0 +1,324 @@ +"""Rainy Mood Plugin Provider for Music Assistant. + +Mixes looping rain sounds from rainymood.com transparently into whatever the +player is already playing from its queue. The queue is never touched: the +rain audio is injected into the PCM stream that the streams controller serves +to the player, by implementing the AUDIO_OVERLAY plugin interface. + +A persistent FFmpeg subprocess is kept alive per player so that rain audio +continues seamlessly across track transitions and seeks. The subprocess is +only restarted when explicitly stopped and re-enabled, or when the PCM format +changes. +""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncGenerator, Callable +from contextlib import suppress +from typing import TYPE_CHECKING, Any, cast + +import numpy as np +from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig +from music_assistant_models.enums import ConfigEntryType, EventType, PlaybackState, ProviderFeature + +from music_assistant.models.plugin import PluginProvider + +if TYPE_CHECKING: + from music_assistant_models.event import MassEvent + from music_assistant_models.media_items.audio_format import AudioFormat + from music_assistant_models.provider import ProviderManifest + + from music_assistant.mass import MusicAssistant + from music_assistant.models import ProviderInstanceType + +RAIN_URL = "https://media.rainymood.com/0.mp3" + +CONF_RAIN_RATIO = "rain_ratio" + +SUPPORTED_FEATURES: set[ProviderFeature] = {ProviderFeature.AUDIO_OVERLAY} + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + return RainyMoodPlugin(mass, manifest, config, SUPPORTED_FEATURES) + + +async def get_config_entries( + mass: MusicAssistant, # noqa: ARG001 + instance_id: str | None = None, # noqa: ARG001 + action: str | None = None, # noqa: ARG001 + values: dict[str, ConfigValueType] | None = None, # noqa: ARG001 +) -> tuple[ConfigEntry, ...]: + """ + Return Config entries to setup this provider. + + :param mass: MusicAssistant instance. + :param instance_id: ID of an existing provider instance (None if new instance setup). + :param action: Optional action key called from config entries UI. + :param values: The (intermediate) raw values for config entries sent with the action. + """ + return ( + ConfigEntry( + key=CONF_RAIN_RATIO, + type=ConfigEntryType.INTEGER, + range=(0, 200), + default_value=100, + label="Rain Volume Ratio (%)", + description="Rain loudness relative to the music. 100 % = equally loud, 0 % = inaudible, 200 % = twice as loud.", + ), + ) + + +class RainBuffer: + """Persistent FFmpeg subprocess that streams looping rain PCM.""" + + def __init__(self) -> None: + """Initialize the RainBuffer.""" + self._proc: asyncio.subprocess.Process | None = None + self._pcm_format: AudioFormat | None = None + + async def start(self, pcm_format: AudioFormat) -> None: + """Start the FFmpeg rain process configured to output pcm_format.""" + await self.stop() + self._pcm_format = pcm_format + fmt = pcm_format.content_type.value + cmd = [ + "ffmpeg", + "-hide_banner", + "-loglevel", + "error", + "-nostats", + "-stream_loop", + "-1", + "-reconnect", + "1", + "-reconnect_streamed", + "1", + "-reconnect_on_http_error", + "5xx,429", + "-reconnect_delay_max", + "10", + "-i", + RAIN_URL, + "-f", + fmt, + "-ar", + str(pcm_format.sample_rate), + "-ac", + str(pcm_format.channels), + "pipe:1", + ] + self._proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + + async def stop(self) -> None: + """Kill the FFmpeg process.""" + proc, self._proc = self._proc, None + if proc is not None: + with suppress(Exception): + proc.kill() + + async def ensure_format(self, pcm_format: AudioFormat) -> None: + """Restart the subprocess if the requested format differs from the current one.""" + if self._pcm_format != pcm_format: + await self.start(pcm_format) + + async def read(self, n: int) -> bytes | None: + """ + Read exactly n bytes of rain PCM. + + :param n: Number of bytes to read. + :returns: Raw PCM bytes, or None if the process has ended. + """ + if self._proc is None or self._proc.stdout is None or self._proc.returncode is not None: + return None + try: + return await self._proc.stdout.readexactly(n) + except (asyncio.IncompleteReadError, Exception): + return None + + async def scaled_stream( + self, rain_vol: float, pcm_format: AudioFormat + ) -> AsyncGenerator[bytes, None]: + """ + Yield rain PCM chunks scaled by rain_vol. + + :param rain_vol: Volume multiplier (1.0 = same level as music). + :param pcm_format: PCM format for dtype selection. + """ + fmt = pcm_format.content_type.value + dtype: Any = np.float32 if "f32" in fmt else np.int16 + clip_min: float = -1.0 if dtype == np.float32 else -32768 + clip_max: float = 1.0 if dtype == np.float32 else 32767 + chunk_size = 4096 + while True: + data = await self.read(chunk_size) + if data is None: + return + arr = np.frombuffer(data, dtype=dtype) + scaled = np.clip(arr * rain_vol, clip_min, clip_max).astype(dtype) + yield scaled.tobytes() + + +class RainyMoodPlugin(PluginProvider): + """Rainy Mood Plugin Provider. + + When enabled for a player, implements the AUDIO_OVERLAY interface so the + streams controller can mix rain into the regular queue PCM output. + A persistent RainBuffer ensures rain audio continues seamlessly across + track transitions and seeks without restarting from the beginning. + """ + + def __init__( + self, + mass: MusicAssistant, + manifest: ProviderManifest, + config: ProviderConfig, + supported_features: set[ProviderFeature], + ) -> None: + """Initialize the Rainy Mood plugin.""" + super().__init__(mass, manifest, config, supported_features) + self._active_players: set[str] = set() + self._rain_buffers: dict[str, RainBuffer] = {} + self._unregister_handles: list[Callable[[], None]] = [] + + # ------------------------------------------------------------------ + # Provider lifecycle + # ------------------------------------------------------------------ + + async def loaded_in_mass(self) -> None: + """Call after the provider has been loaded.""" + self._unregister_handles.append( + self.mass.register_api_command("rain_mood/enable", self._enable) + ) + self._unregister_handles.append( + self.mass.register_api_command("rain_mood/disable", self._disable) + ) + self._unregister_handles.append( + self.mass.register_api_command("rain_mood/status", self._status) + ) + self._unregister_handles.append( + self.mass.subscribe(self._on_queue_updated, EventType.QUEUE_UPDATED) + ) + + async def unload(self, is_removed: bool = False) -> None: + """Handle unload/close of the provider.""" + for handle in self._unregister_handles: + handle() + self._unregister_handles.clear() + for buf in self._rain_buffers.values(): + await buf.stop() + self._rain_buffers.clear() + self._active_players.clear() + + # ------------------------------------------------------------------ + # AUDIO_OVERLAY interface (called by the streams controller) + # ------------------------------------------------------------------ + + def is_overlay_active(self, player_id: str) -> bool: + """ + Return whether the rain overlay is currently active for this player. + + :param player_id: The player to check. + """ + return player_id in self._active_players + + async def get_overlay_stream( + self, + player_id: str, + pcm_format: AudioFormat, + ) -> AsyncGenerator[bytes, None] | None: + """ + Return a volume-adjusted PCM rain stream matching pcm_format. + + :param player_id: The player for which the overlay is requested. + :param pcm_format: The PCM format the overlay must be produced in. + :returns: Async generator of raw PCM bytes, or None if overlay is not active. + """ + if player_id not in self._active_players: + return None + buf = self._rain_buffers.get(player_id) + if buf is None: + return None + await buf.ensure_format(pcm_format) + rain_vol = float(cast("int", self.config.get_value(CONF_RAIN_RATIO))) / 100.0 + return buf.scaled_stream(rain_vol, pcm_format) + + # ------------------------------------------------------------------ + # Event handlers + # ------------------------------------------------------------------ + + async def _on_queue_updated(self, event: MassEvent) -> None: + """Auto-disable when the queue finishes playing.""" + queue = event.data + if queue.queue_id not in self._active_players: + return + if queue.items == 0 and queue.state == PlaybackState.IDLE: + self._active_players.discard(queue.queue_id) + await self._stop_rain_buffer(queue.queue_id) + self.logger.info("Rainy Mood auto-disabled for player %s (queue ended)", queue.queue_id) + + # ------------------------------------------------------------------ + # API commands (called from the frontend) + # ------------------------------------------------------------------ + + async def _enable(self, player_id: str) -> dict[str, Any]: + """ + Enable rain overlay for the given player. + + :param player_id: The player on which to activate rain. + :returns: Status dict. + """ + if player_id in self._active_players: + return {"active": True} + self._active_players.add(player_id) + self._rain_buffers[player_id] = RainBuffer() + self.logger.info("Rainy Mood enabled for player %s", player_id) + queue = self.mass.player_queues.get(player_id) + if queue and queue.state == PlaybackState.PLAYING: + await self._restart_stream(player_id) + return {"active": True} + + async def _disable(self, player_id: str) -> dict[str, Any]: + """ + Disable rain overlay for the given player. + + :param player_id: The player on which to deactivate rain. + :returns: Status dict. + """ + if player_id not in self._active_players: + return {"active": False} + self._active_players.discard(player_id) + await self._stop_rain_buffer(player_id) + self.logger.info("Rainy Mood disabled for player %s", player_id) + queue = self.mass.player_queues.get(player_id) + if queue and queue.state == PlaybackState.PLAYING: + await self._restart_stream(player_id) + return {"active": False} + + async def _stop_rain_buffer(self, player_id: str) -> None: + buf = self._rain_buffers.pop(player_id, None) + if buf: + await buf.stop() + + async def _restart_stream(self, player_id: str) -> None: + """Resume playback from the current position to force a stream re-request.""" + try: + await self.mass.player_queues.resume(player_id) + except Exception as err: + self.logger.debug("Could not restart stream for %s: %s", player_id, err) + + async def _status(self, player_id: str) -> dict[str, Any]: + """ + Return whether rain is currently active for the given player. + + :param player_id: The player to query. + :returns: Status dict with 'active' bool. + """ + return {"active": player_id in self._active_players} diff --git a/music_assistant/providers/rain_mood/manifest.json b/music_assistant/providers/rain_mood/manifest.json new file mode 100644 index 0000000000..d96009c39a --- /dev/null +++ b/music_assistant/providers/rain_mood/manifest.json @@ -0,0 +1,12 @@ +{ + "type": "plugin", + "stage": "experimental", + "domain": "rain_mood", + "name": "Rainy Mood", + "description": "Mix ambient rain sounds from rainymood.com with your currently playing music.", + "codeowners": [], + "requirements": [], + "multi_instance": false, + "builtin": false, + "icon": "weather-rainy" +}