Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion music_assistant/controllers/streams/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
MediaType,
PlayerFeature,
PlayerType,
ProviderFeature,
StreamType,
VolumeNormalizationMode,
)
Expand Down Expand Up @@ -87,7 +88,7 @@
resample_pcm_audio,
)
from music_assistant.helpers.dsp import filter_to_ffmpeg_params
from music_assistant.helpers.ffmpeg import FFMpeg, get_ffmpeg_stream
from music_assistant.helpers.ffmpeg import FFMpeg, get_ffmpeg_stream, mix_pcm_streams
from music_assistant.helpers.playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
from music_assistant.helpers.util import (
Expand All @@ -108,6 +109,7 @@
from music_assistant.mass import MusicAssistant
from music_assistant.models.music_provider import MusicProvider
from music_assistant.models.player import Player
from music_assistant.models.plugin import PluginProvider
from music_assistant.providers.sync_group import SyncGroupPlayer

# ruff: noqa: PLR0915
Expand Down Expand Up @@ -2251,3 +2253,47 @@ async def _update_hls_radio_metadata(

except Exception as err:
self.logger.debug("Error fetching HLS metadata: %s", err)

# ------------------------------------------------------------------
# Audio overlay support (used by Rain Mood and similar plugins)
# ------------------------------------------------------------------

def get_active_overlay_providers(self, player_id: str) -> list[PluginProvider]:
    """
    Collect the overlay-capable providers that are currently active for a player.

    Every provider that declares ProviderFeature.AUDIO_OVERLAY is asked through
    ``is_overlay_active`` whether it has an overlay running for the player; only
    the ones answering truthy are returned, in the order they were reported.

    :param player_id: The player to check.
    """
    active: list[PluginProvider] = []
    candidates = self.mass.get_providers_supporting_feature(ProviderFeature.AUDIO_OVERLAY)
    for candidate in candidates:
        if not candidate.is_overlay_active(player_id):  # type: ignore[union-attr]
            continue
        active.append(candidate)  # type: ignore[arg-type]
    return active

async def wrap_overlay_if_active(
    self,
    audio_input: AsyncGenerator[bytes, None],
    player_id: str,
    pcm_format: AudioFormat,
) -> AsyncGenerator[bytes, None]:
    """
    Wrap audio_input with every active overlay plugin stream mixed in.

    This is an async generator function: calling it returns the generator right away
    and no provider is queried until the first chunk is requested, so it is safe to
    call from both sync and async contexts.

    Overlays are optional decoration on top of the music. A provider whose
    ``get_overlay_stream`` raises is logged and skipped, so a faulty plugin cannot
    abort the underlying audio stream. When no overlay stream is available the
    input is passed through unchanged.

    :param audio_input: Async generator producing raw PCM bytes.
    :param player_id: The player (or queue) to check for active overlays.
    :param pcm_format: The PCM format used by audio_input.
    """
    overlay_gens: list[AsyncGenerator[bytes, None]] = []
    for prov in self.get_active_overlay_providers(player_id):
        try:
            gen = await prov.get_overlay_stream(player_id, pcm_format)
        except Exception as err:
            # Plugin boundary: keep the music playing without this overlay.
            # (asyncio.CancelledError is not an Exception and still propagates.)
            self.logger.warning(
                "Skipping audio overlay for %s, provider failed to start its stream: %s",
                player_id,
                err,
            )
            continue
        if gen is not None:
            overlay_gens.append(gen)
    if not overlay_gens:
        # Nothing to mix: plain passthrough, no extra ffmpeg process.
        async for chunk in audio_input:
            yield chunk
        return
    # The music goes first because the first input drives the mix duration.
    async for chunk in mix_pcm_streams([audio_input, *overlay_gens], pcm_format):
        yield chunk
21 changes: 16 additions & 5 deletions music_assistant/controllers/streams/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ async def serve_queue_item_stream(self, request: web.Request) -> web.StreamRespo
"float", queue_item.extra_attributes.get("playback_speed", 1.0)
),
)
audio_input = self.audio.wrap_overlay_if_active(audio_input, queue_id, pcm_format)

# stream the audio
# this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
# the desired output format for the player including any player specific filter params
Expand Down Expand Up @@ -590,7 +592,7 @@ async def serve_queue_item_stream(self, request: web.Request) -> web.StreamRespo
)
return resp

async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse:
async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse: # noqa: PLR0915
"""Stream Queue Flow audio to player."""
self._log_request(request)
queue_id = request.match_info["queue_id"]
Expand Down Expand Up @@ -656,10 +658,15 @@ async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamRespo
# such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)

flow_audio_input = self.audio.get_queue_flow_stream(
queue=queue, start_queue_item=start_queue_item, pcm_format=flow_pcm_format
)
flow_audio_input = self.audio.wrap_overlay_if_active(
flow_audio_input, queue_id, flow_pcm_format
)

async for chunk in get_ffmpeg_stream(
audio_input=self.audio.get_queue_flow_stream(
queue=queue, start_queue_item=start_queue_item, pcm_format=flow_pcm_format
),
audio_input=flow_audio_input,
input_format=flow_pcm_format,
output_format=output_format,
filter_params=self.audio.get_player_filter_params(
Expand Down Expand Up @@ -955,19 +962,23 @@ def get_stream(
flow_stream = self.audio.get_queue_flow_stream(
queue=queue, start_queue_item=start_queue_item, pcm_format=pcm_format
)
# Use queue_id (the queue owner) not player_id (which may be a transport bridge).
flow_stream = self.audio.wrap_overlay_if_active(flow_stream, queue_id, pcm_format)
if use_flow_stream_buffering:
return buffered(flow_stream, buffer_size=30, min_buffer_before_yield=1)
return flow_stream
# single item stream (e.g. radio or non-flow mode)
queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
assert queue_item
return self.audio.get_queue_item_stream(
item_stream = self.audio.get_queue_item_stream(
queue_item=queue_item,
pcm_format=pcm_format,
playback_speed=cast(
"float", queue_item.extra_attributes.get("playback_speed", 1.0)
),
)
# Use queue_id (the queue owner) not player_id (which may be a transport bridge).
return self.audio.wrap_overlay_if_active(item_stream, queue_id, pcm_format)
# assume url or some other direct path
# NOTE: this will fail if its an uri not playable by ffmpeg
return get_ffmpeg_stream(
Expand Down
135 changes: 135 additions & 0 deletions music_assistant/helpers/ffmpeg.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import asyncio
import logging
import os
import time
from collections import deque
from collections.abc import AsyncGenerator
Expand Down Expand Up @@ -247,6 +248,140 @@ async def get_ffmpeg_stream(
raise AudioError(log_tail)


async def mix_pcm_streams(  # noqa: PLR0915
    inputs: list[AsyncGenerator[bytes, None]],
    pcm_format: AudioFormat,
) -> AsyncGenerator[bytes, None]:
    """
    Mix N PCM async generators of identical format using ffmpeg's amix filter.

    All inputs must produce raw PCM bytes in pcm_format. Output is the same format.
    The mix duration is bound to the first input (intended to be the music); when
    the first input ends the process terminates and any remaining bytes on the
    other inputs are discarded.

    With no inputs nothing is yielded, and a single input is passed through
    untouched without starting ffmpeg. When mixing, every input generator is
    explicitly closed as soon as its feeder stops (input exhausted, ffmpeg gone,
    or the consumer stopped early), so upstream resources are released promptly.

    :param inputs: List of PCM async generators. First entry drives the duration.
    :param pcm_format: PCM format shared by all inputs and the output.
    """
    if not inputs:
        return
    if len(inputs) == 1:
        async for chunk in inputs[0]:
            yield chunk
        return

    fmt = pcm_format.content_type.value
    sample_rate = str(pcm_format.sample_rate)
    channels = str(pcm_format.channels)
    n_inputs = len(inputs)

    # Create one OS pipe per input. Child process reads from read_fds via `-i pipe:<fd>`;
    # we (parent) feed into write_fds from each async generator and close on EOF.
    pipes: list[tuple[int, int]] = [os.pipe() for _ in range(n_inputs)]
    read_fds = [r for r, _ in pipes]
    write_fds = [w for _, w in pipes]

    amix_filter = (
        "".join(f"[{i}:a]" for i in range(n_inputs))
        + f"amix=inputs={n_inputs}:duration=first:normalize=0[mix]"
    )

    cmd: list[str] = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-nostats"]
    for read_fd in read_fds:
        cmd += [
            "-f",
            fmt,
            "-ar",
            sample_rate,
            "-ac",
            channels,
            "-i",
            f"pipe:{read_fd}",
        ]
    cmd += [
        "-filter_complex",
        amix_filter,
        "-map",
        "[mix]",
        "-f",
        fmt,
        "-ar",
        sample_rate,
        "-ac",
        channels,
        "pipe:1",
    ]

    proc: asyncio.subprocess.Process | None = None
    feeder_tasks: list[asyncio.Task[None]] = []
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
            pass_fds=tuple(read_fds),
        )
        # The child now owns its inherited copies of read_fds; close ours.
        for fd in read_fds:
            with suppress(OSError):
                os.close(fd)
        read_fds.clear()

        loop = asyncio.get_running_loop()

        async def _feeder(write_fd: int, gen: AsyncGenerator[bytes, None]) -> None:
            """Pump one input generator into its pipe, then release pipe and generator."""
            pipe_file = None
            writer: asyncio.StreamWriter | None = None
            try:
                # From here on the file object owns write_fd; never os.close() it directly.
                pipe_file = os.fdopen(write_fd, "wb")
                reader = asyncio.StreamReader()
                protocol = asyncio.StreamReaderProtocol(reader)
                transport, _ = await loop.connect_write_pipe(lambda: protocol, pipe_file)
                writer = asyncio.StreamWriter(transport, protocol, reader, loop)
                async for chunk in gen:
                    writer.write(chunk)
                    await writer.drain()
            except (BrokenPipeError, ConnectionResetError):
                # ffmpeg stopped reading (e.g. the first input ended); nothing left to feed.
                # Cancellation is deliberately NOT swallowed here so the task ends cancelled.
                pass
            finally:
                if writer is not None:
                    # Closing the transport also closes the underlying pipe file.
                    with suppress(Exception):
                        writer.close()
                elif pipe_file is not None:
                    with suppress(OSError):
                        pipe_file.close()
                else:
                    with suppress(OSError):
                        os.close(write_fd)
                # Close the source generator so it does not linger until garbage collection.
                aclose = getattr(gen, "aclose", None)
                if aclose is not None:
                    with suppress(Exception):
                        await aclose()

        feeder_tasks = [
            asyncio.create_task(_feeder(w, g)) for w, g in zip(write_fds, inputs, strict=True)
        ]
        write_fds.clear()  # ownership moved to feeders

        assert proc.stdout is not None
        while True:
            chunk = await proc.stdout.read(65536)
            if not chunk:
                break
            yield chunk
    finally:
        for task in feeder_tasks:
            if not task.done():
                task.cancel()
        for task in feeder_tasks:
            with suppress(asyncio.CancelledError, Exception):
                await task
        # Surface real feeder failures (e.g. a failing source) instead of dropping them.
        for index, task in enumerate(feeder_tasks):
            if not task.done() or task.cancelled():
                continue
            if (err := task.exception()) is not None:
                logging.getLogger(__name__).warning(
                    "PCM mix input %s failed while feeding ffmpeg: %s", index, err
                )
        # Only non-empty when we failed before handing the fds over.
        for fd in read_fds:
            with suppress(OSError):
                os.close(fd)
        for fd in write_fds:
            with suppress(OSError):
                os.close(fd)
        if proc is not None and proc.returncode is None:
            with suppress(ProcessLookupError):
                proc.kill()
            with suppress(Exception):
                await proc.wait()


def get_ffmpeg_args( # noqa: PLR0915
input_format: AudioFormat,
output_format: AudioFormat,
Expand Down
27 changes: 27 additions & 0 deletions music_assistant/models/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,33 @@ async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> lis
"""
raise NotImplementedError

def is_overlay_active(self, player_id: str) -> bool:
    """
    Tell whether this plugin currently overlays audio on the given player.

    This base implementation always reports that no overlay is active.
    It is only consulted when ProviderFeature.AUDIO_OVERLAY is declared.

    :param player_id: The player to check.
    """
    return False

async def get_overlay_stream(
    self,
    player_id: str,
    pcm_format: AudioFormat,
) -> AsyncGenerator[bytes, None] | None:
    """
    Provide the volume-adjusted PCM overlay stream for a player.

    This base implementation has no overlay and always returns None.
    It is only consulted when ProviderFeature.AUDIO_OVERLAY is declared.
    Implementations must emit bytes in exactly the format described by pcm_format.

    :param player_id: The player for which the overlay is requested.
    :param pcm_format: The PCM format the overlay must be produced in.
    :returns: Async generator of raw PCM bytes, or None if no overlay is active.
    """
    return None

async def resolve_image(self, path: str) -> str | bytes:
"""
Resolve an image from an image path.
Expand Down
Loading
Loading