diff --git a/native/audio-engine/src/audio_output.rs b/native/audio-engine/src/audio_output.rs new file mode 100644 index 00000000..ad6ab266 --- /dev/null +++ b/native/audio-engine/src/audio_output.rs @@ -0,0 +1,161 @@ +//! 跨线程安全的音频输出 +//! +//! `cpal::Stream`(以及包装它的 `rodio::OutputStream`)是 `!Send` 的—— +//! cpal 文档明确要求 Stream 的创建、持有和 drop 都在同一线程上完成 +//! (macOS CoreAudio 是真雷区,Windows WASAPI / Linux ALSA 是契约要求)。 +//! +//! 但 NAPI 的 async fn 跑在多线程 tokio runtime 上,`.await` 后 Future +//! 可能在任意 worker thread 恢复,原本通过 `unsafe impl Send` 绕过类型系统的 +//! 做法在 macOS 上是真 UB,其它平台属于"现在凑合能跑"的契约违反。 +//! +//! 本模块的做法:开一个专用 `audio-output-owner` 线程独占持有 `OutputStream`, +//! 对外只暴露 `Send` 的 `OutputStreamHandle`(rodio 文档承诺该类型跨线程安全)。 +//! Stream 在该线程上创建,在该线程上 drop,永远不会被跨线程访问。 + +use std::sync::mpsc; +use std::thread::{self, JoinHandle}; + +use anyhow::{Context, Result}; +use cpal::traits::{DeviceTrait, HostTrait}; +use rodio::{OutputStream, OutputStreamHandle}; +use tracing::{debug, warn}; + +/// 持有音频输出的跨线程句柄。`Send`,可放进 `InnerPlayer` 而不需 `unsafe impl Send`。 +/// +/// 内部专用线程独占 `OutputStream`,drop 这个结构会通过 channel 通知线程退出, +/// 线程退出时 drop `OutputStream`——确保 `cpal::Stream` 创建和销毁都在同一线程。 +/// +/// # Examples +/// +/// ```ignore +/// // 走系统默认设备 +/// let output = AudioOutput::new(None)?; +/// let sink = Sink::try_new(output.handle())?; +/// // sink 可在任意线程上使用;output 持有的 cpal::Stream 始终在专用线程上 +/// ``` +pub struct AudioOutput { + handle: OutputStreamHandle, + /// drop 这个 sender 会让 owner 线程的 recv 返回 Err,从而退出并释放 Stream + /// 包成 Option 是为了 Drop 里能 take() 出来显式 drop,从而在 join 前先关闭 channel + shutdown: Option>, + /// owner 线程句柄,Drop 时 join 等待 cpal stream 在该线程真正释放 + thread: Option>, +} + +impl AudioOutput { + /// 在专用线程上创建音频输出 + /// + /// # Arguments + /// * `device_name` - 输出设备名,`None` 走系统默认设备 + /// + /// # Errors + /// - 找不到指定设备 + /// - 无可用音频设备 + /// - 专用线程 spawn 失败 + pub fn new(device_name: Option<&str>) -> Result { + let device_name = device_name.map(String::from); + + // 把构建结果回传给调用线程;用 sync_channel 容量 1 避免发送方阻塞 + let (result_tx, result_rx) = mpsc::sync_channel::>(1); + // 调用方 drop AudioOutput 时关闭,触发 owner 线程退出 + let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(); + + let thread = thread::Builder::new() + .name("audio-output-owner".to_string()) + .spawn(move || { + debug!(device = ?device_name, "audio-output-owner: starting"); + let build_result = build_output_stream(device_name.as_deref()); + match build_result { + Ok((stream, handle)) => { + if result_tx.send(Ok(handle)).is_err() { + // 调用方已放弃接收:在本线程 drop stream 后退出 + warn!("audio-output-owner: receiver dropped before handshake"); + drop(stream); + return; + } + // 持有 stream,等待 shutdown 信号或 channel 关闭 + let _ = shutdown_rx.recv(); + debug!("audio-output-owner: shutting down, dropping cpal stream"); + drop(stream); + } + Err(err) => { + warn!(error = %err, "audio-output-owner: build_output_stream failed"); + let _ = result_tx.send(Err(err)); + } + } + }) + .context("failed to spawn audio-output-owner thread")?; + + let handle = result_rx + .recv() + .context("audio output owner thread terminated unexpectedly")??; + + Ok(Self { + handle, + shutdown: Some(shutdown_tx), + thread: Some(thread), + }) + } + + /// 借出 `OutputStreamHandle`,用于创建 `Sink` 等 + pub fn handle(&self) -> &OutputStreamHandle { + &self.handle + } +} + +impl Drop for AudioOutput { + /// 确定性释放:先 drop 发送端通知 owner 线程退出,再 join 等待 cpal stream 真正释放 + /// + /// 这样 `set_output_device` 等场景里新旧 stream 不会重叠占用设备, + /// 在 macOS / Linux 上避免 "device busy" 风险 + fn drop(&mut self) { + // 先 drop sender 让 owner 线程的 shutdown_rx.recv() 返回 Err 退出 + drop(self.shutdown.take()); + if let Some(thread) = self.thread.take() { + // 忽略 join 错误:owner 线程已经在 stream drop 时尽力清理过了 + let _ = thread.join(); + } + } +} + +/// 构建 cpal/rodio 输出流;**仅在 `audio-output-owner` 线程内调用**, +/// 保证 `OutputStream` 的创建、持有和 drop 都发生在同一线程上 +fn build_output_stream(device_name: Option<&str>) -> Result<(OutputStream, OutputStreamHandle)> { + match device_name { + Some(name) => { + let host = cpal::default_host(); + let device = host + .output_devices() + .context("Failed to enumerate output devices")? + .find(|d| d.name().map(|got| got == name).unwrap_or(false)) + .with_context(|| format!("Output device '{}' not found", name))?; + OutputStream::try_from_device(&device).context("Failed to open named output device") + } + None => OutputStream::try_default().context("Failed to open default output device"), + } +} + +/// 枚举所有输出设备,返回 `(name, is_default)` 列表 +/// 纯查询,不涉及 `!Send` 状态,调用方任意线程都能用 +pub fn list_output_devices() -> Vec<(String, bool)> { + let host = cpal::default_host(); + let default_name = host.default_output_device().and_then(|d| d.name().ok()); + host.output_devices() + .map(|devices| { + devices + .filter_map(|device| { + let name = device.name().ok()?; + let is_default = default_name.as_ref() == Some(&name); + Some((name, is_default)) + }) + .collect() + }) + .unwrap_or_default() +} + +/// 取系统默认输出设备名 +pub fn default_device_name() -> Option { + cpal::default_host() + .default_output_device() + .and_then(|d| d.name().ok()) +} diff --git a/native/audio-engine/src/lib.rs b/native/audio-engine/src/lib.rs index f8f6056f..60eeea81 100644 --- a/native/audio-engine/src/lib.rs +++ b/native/audio-engine/src/lib.rs @@ -1,6 +1,7 @@ //! FFmpeg 音频解码 + rodio 播放 + FFT 频谱分析。 //! 通过 NAPI-RS 暴露给 Node.js,作为 Electron 主进程的原生模块。 +mod audio_output; mod decoder; mod equalizer; mod fft; @@ -541,7 +542,7 @@ impl AudioPlayer { /// 获取所有音频输出设备列表 #[napi] pub fn get_output_devices(&self) -> Vec { - player::InnerPlayer::get_output_devices() + audio_output::list_output_devices() .into_iter() .map(|(name, is_default)| JsAudioDevice { name, is_default }) .collect() @@ -550,7 +551,7 @@ impl AudioPlayer { /// 获取系统默认输出设备名称 #[napi] pub fn get_default_device_name(&self) -> Option { - player::InnerPlayer::get_default_device_name() + audio_output::default_device_name() } /// 切换输出设备(传 None/undefined 使用系统默认) diff --git a/native/audio-engine/src/player.rs b/native/audio-engine/src/player.rs index c51810b2..b9e8cdbc 100644 --- a/native/audio-engine/src/player.rs +++ b/native/audio-engine/src/player.rs @@ -4,11 +4,11 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; use anyhow::{Context, Result}; -use cpal::traits::{DeviceTrait, HostTrait}; use parking_lot::Mutex; -use rodio::{OutputStream, OutputStreamHandle, Sink}; +use rodio::Sink; use tracing::{debug, info}; +use crate::audio_output::AudioOutput; use crate::decoder; use crate::equalizer::{Equalizer, EQ_BAND_COUNT}; use crate::fft::FftAnalyzer; @@ -65,8 +65,9 @@ pub enum PlayerState { /// 内部播放器,管理音频输出、解码和状态 pub struct InnerPlayer { - stream: Option, - stream_handle: Option, + /// 跨线程安全的音频输出包装:内部专用线程独占 cpal::Stream, + /// 此处只持有 Send 的 OutputStreamHandle,保证 InnerPlayer 整体是 Send 的 + output: Option, /// 使用 Arc 包装,允许 fade 线程在 Mutex 外操作音量 sink: Option>, shared: Option>, @@ -157,12 +158,12 @@ pub struct SeekTake { pub was_playing: bool, } -// SAFETY: InnerPlayer 持有 cpal::Stream(!Send),但 NAPI async fn 要求 -// `&AudioPlayer: Send` → `Arc>: Sync` → `InnerPlayer: Send`。 -// 这里仅为类型系统占位。**运行时严格保证 InnerPlayer 不跨线程访问**: -// - lib.rs async load/seek 中 spawn_blocking 闭包不持有 inner 引用,仅持有纯数据 -// - 所有 inner.lock() 都在主线程上执行,cpal Stream 不会跨线程被 mutate -unsafe impl Send for InnerPlayer {} +/// 编译期保证 `InnerPlayer: Send`:cpal::Stream(!Send)已通过 AudioOutput 隔离到专用线程, +/// 此处不再需要 `unsafe impl Send`。如果未来有人加了 !Send 字段,这条断言会编译失败提醒。 +const _: fn() = || { + fn assert_send() {} + assert_send::(); +}; /// 等待线程退出,最多等 timeout_ms 毫秒,超时则放弃 fn join_with_timeout(handle: JoinHandle<()>, timeout_ms: u64) { @@ -174,45 +175,29 @@ fn join_with_timeout(handle: JoinHandle<()>, timeout_ms: u64) { } impl InnerPlayer { - /// 确保音频输出已就绪,未初始化或设备丢失时重新打开 - fn ensure_output(&mut self) -> Result<&OutputStreamHandle> { - if self.stream_handle.is_none() { - let (stream, handle) = Self::build_output_stream(self.selected_device_name.as_ref())?; - self.stream = Some(stream); - self.stream_handle = Some(handle); + /// 未初始化时通过 `AudioOutput::new` 懒构造音频输出,返回当前输出。 + /// 设备失效时的重建由 `reinit_output` 显式处理,不在此函数内自动恢复。 + fn ensure_output(&mut self) -> Result<&AudioOutput> { + if self.output.is_none() { + self.output = Some(AudioOutput::new(self.selected_device_name.as_deref())?); } - Ok(self.stream_handle.as_ref().unwrap()) - } - - /// 根据选择的设备名称构建音频输出流 - fn build_output_stream( - device_name: Option<&String>, - ) -> Result<(OutputStream, OutputStreamHandle)> { - match device_name { - Some(name) => { - let host = cpal::default_host(); - let device = host - .output_devices() - .context("Failed to enumerate output devices")? - .find(|d| d.name().map(|n| n == *name).unwrap_or(false)) - .with_context(|| format!("Output device '{}' not found", name))?; - OutputStream::try_from_device(&device).context("Failed to open named output device") - } - None => OutputStream::try_default().context("Failed to open default output device"), + // None 分支不可达:上面分支若进入则 output 已被赋为 Some,否则进入这里之前就是 Some + // 仍写成 match:在 NLL 限制下既能让借用检查通过,又规避非测试代码 unwrap/expect + match self.output { + Some(ref output) => Ok(output), + None => Err(anyhow::anyhow!( + "ensure_output post-condition violated: output should be Some" + )), } } pub fn new() -> Result { // 延迟初始化:构造时不要求有音频设备,load 时再打开 - let (stream, stream_handle) = match Self::build_output_stream(None) { - Ok(s) => (Some(s.0), Some(s.1)), - Err(_) => (None, None), - }; + let output = AudioOutput::new(None).ok(); debug!("InnerPlayer 已创建"); Ok(Self { - stream, - stream_handle, + output, sink: None, shared: None, decoder_thread: None, @@ -246,31 +231,6 @@ impl InnerPlayer { }) } - /// 获取所有输出设备列表 - pub fn get_output_devices() -> Vec<(String, bool)> { - let host = cpal::default_host(); - let default_name = host.default_output_device().and_then(|d| d.name().ok()); - - host.output_devices() - .map(|devices| { - devices - .filter_map(|d| { - let name = d.name().ok()?; - let is_default = default_name.as_ref() == Some(&name); - Some((name, is_default)) - }) - .collect() - }) - .unwrap_or_default() - } - - /// 获取系统默认输出设备名称 - pub fn get_default_device_name() -> Option { - cpal::default_host() - .default_output_device() - .and_then(|d| d.name().ok()) - } - /// 切换输出设备(None = 系统默认) pub fn set_output_device(&mut self, device_name: Option) -> Result<()> { info!(device = ?device_name, "切换输出设备"); @@ -444,9 +404,11 @@ impl InnerPlayer { self.fft_enabled.load(Ordering::Relaxed) } - /// 重新初始化音频输出设备(系统休眠唤醒后调用,恢复失效的 OutputStream 句柄) + /// 重新初始化音频输出设备(系统休眠唤醒、设备热拔插等场景调用) /// - /// 保存当前播放状态(来源、位置、音量),重建音频输出后自动恢复: + /// 通过赋值新的 `AudioOutput` 触发旧 `AudioOutput` 的 `Drop`:旧 owner 线程 + /// 退出 + 旧 `cpal::Stream` 在该线程内释放,再创建新 `AudioOutput`(新 owner 线程)。 + /// 保存当前播放状态(来源、位置、音量),重建后自动恢复: /// - Playing → seek 到原位置继续播放 /// - Paused → seek 到原位置并暂停 /// - 其他状态 → 仅重建输出,不恢复播放 @@ -462,9 +424,8 @@ impl InnerPlayer { self.stop_internal(); // 重建音频输出(使用用户选择的设备或系统默认) - let (stream, stream_handle) = Self::build_output_stream(self.selected_device_name.as_ref())?; - self.stream = Some(stream); - self.stream_handle = Some(stream_handle); + // 旧 AudioOutput 在赋值时被 drop,其 owner 线程随之退出并 drop 旧 cpal::Stream + self.output = Some(AudioOutput::new(self.selected_device_name.as_deref())?); // 恢复播放状态 match prev_state { @@ -555,7 +516,6 @@ impl InnerPlayer { (old_threads, token) } - /// 给 lib.rs async seek 用:原子发出停止信号 + take 所有旧线程 handle(不 join) pub fn take_for_async_seek(&mut self) -> SeekTake { if let Some(flag) = self.fade_cancel.take() { @@ -608,8 +568,10 @@ impl InnerPlayer { shared: Arc, handle: JoinHandle, ) -> Result<()> { - let stream_handle = self.ensure_output()?; - let sink = Arc::new(Sink::try_new(stream_handle).context("Failed to create audio sink")?); + let sink = { + let output = self.ensure_output()?; + Arc::new(Sink::try_new(output.handle()).context("Failed to create audio sink")?) + }; self.equalizer.lock().reset_state(); self.tempo.lock().reset(); @@ -676,8 +638,10 @@ impl InnerPlayer { return Ok(None); } - let stream_handle = self.ensure_output()?; - let sink = Arc::new(Sink::try_new(stream_handle).context("Failed to create audio sink")?); + let sink = { + let output = self.ensure_output()?; + Arc::new(Sink::try_new(output.handle()).context("Failed to create audio sink")?) + }; let decoder_source = DecoderSource::new( Arc::clone(&shared), @@ -740,10 +704,10 @@ impl InnerPlayer { let (mut metadata, decode_handle) = decoder::start_decode(source, Arc::clone(&shared), self.cover_cache_dir.as_deref())?; - let sink = Arc::new( - Sink::try_new(self.stream_handle.as_ref().unwrap()) - .context("Failed to create audio sink")?, - ); + let sink = { + let output = self.ensure_output()?; + Arc::new(Sink::try_new(output.handle()).context("Failed to create audio sink")?) + }; let decoder_source = DecoderSource::new( Arc::clone(&shared), @@ -928,9 +892,7 @@ impl InnerPlayer { } // 在已有上下文上 seek(不重新打开文件) - let seek_ok = decoder_data - .as_mut() - .is_some_and(|d| d.seek(position_secs)); + let seek_ok = decoder_data.as_mut().is_some_and(|d| d.seek(position_secs)); if !seek_ok { // 回收失败(线程 panic 或 seek 出错),回退到从头 load,不再递归 seek @@ -954,9 +916,10 @@ impl InnerPlayer { } let handle = decoder::resume_decode(decoder_data, Arc::clone(&shared)); - let out_handle = self.ensure_output()?; - let sink = - Arc::new(Sink::try_new(out_handle).context("Failed to create audio sink")?); + let sink = { + let output = self.ensure_output()?; + Arc::new(Sink::try_new(output.handle()).context("Failed to create audio sink")?) + }; // seek 时同样清空滤波器状态,避免不连续样本导致瞬态 self.equalizer.lock().reset_state();