-
Notifications
You must be signed in to change notification settings - Fork 134
Add Opus decoder #811
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Opus decoder #811
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| use crate as hang; | ||
| use anyhow::Context; | ||
| use buf_list::BufList; | ||
| use bytes::Buf; | ||
| use moq_lite as moq; | ||
|
|
||
| /// Opus decoder, initialized via OpusHead. | ||
| pub struct Opus { | ||
| broadcast: hang::BroadcastProducer, | ||
| track: Option<hang::TrackProducer>, | ||
| zero: Option<tokio::time::Instant>, | ||
| } | ||
|
|
||
| impl Opus { | ||
| pub fn new(broadcast: hang::BroadcastProducer) -> Self { | ||
| Self { | ||
| broadcast, | ||
| track: None, | ||
| zero: None, | ||
| } | ||
| } | ||
|
|
||
| pub fn initialize<T: Buf>(&mut self, buf: &mut T) -> anyhow::Result<()> { | ||
| // Parse OpusHead (https://datatracker.ietf.org/doc/html/rfc7845#section-5.1) | ||
| // - Verifies "OpusHead" magic signature | ||
| // - Reads channel count | ||
| // - Reads sample rate | ||
| // - Ignores pre-skip, gain, channel mapping for now | ||
|
|
||
| anyhow::ensure!(buf.remaining() >= 19, "OpusHead must be at least 19 bytes"); | ||
| const OPUS_HEAD: u64 = u64::from_be_bytes(*b"OpusHead"); | ||
| let signature = buf.get_u64(); | ||
| anyhow::ensure!(signature == OPUS_HEAD, "invalid OpusHead signature"); | ||
|
|
||
| buf.advance(1); // Skip version | ||
| let channel_count = buf.get_u8() as u32; | ||
| buf.advance(2); // Skip pre-skip (lol) | ||
| let sample_rate = buf.get_u32_le(); | ||
|
|
||
| // Skip gain, channel mapping until if/when we support them | ||
| if buf.remaining() > 0 { | ||
| buf.advance(buf.remaining()); | ||
| } | ||
|
|
||
| let track = moq::Track { | ||
| name: self.broadcast.track_name("audio"), | ||
| priority: 2, | ||
| }; | ||
|
|
||
| let config = hang::catalog::AudioConfig { | ||
| codec: hang::catalog::AudioCodec::Opus, | ||
| sample_rate, | ||
| channel_count, | ||
| bitrate: None, | ||
| description: None, | ||
| }; | ||
|
|
||
| tracing::debug!(name = ?track.name, ?config, "starting track"); | ||
|
|
||
| let track = track.produce(); | ||
| self.broadcast.insert_track(track.consumer); | ||
|
|
||
| let mut catalog = self.broadcast.catalog.lock(); | ||
| let audio = catalog.insert_audio(track.producer.info.name.clone(), config); | ||
| audio.priority = 2; | ||
|
|
||
| self.track = Some(track.producer.into()); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| pub fn decode<T: Buf>(&mut self, buf: &mut T, pts: Option<hang::Timestamp>) -> anyhow::Result<()> { | ||
| let pts = self.pts(pts)?; | ||
| let track = self.track.as_mut().context("not initialized")?; | ||
|
|
||
| // Create a BufList at chunk boundaries, potentially avoiding allocations. | ||
| let mut payload = BufList::new(); | ||
| while !buf.chunk().is_empty() { | ||
| payload.push_chunk(buf.copy_to_bytes(buf.chunk().len())); | ||
| } | ||
|
|
||
| let frame = hang::Frame { | ||
| timestamp: pts, | ||
| keyframe: true, | ||
| payload, | ||
| }; | ||
|
|
||
| track.write(frame)?; | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| pub fn is_initialized(&self) -> bool { | ||
| self.track.is_some() | ||
| } | ||
|
|
||
| fn pts(&mut self, hint: Option<hang::Timestamp>) -> anyhow::Result<hang::Timestamp> { | ||
| if let Some(pts) = hint { | ||
| return Ok(pts); | ||
| } | ||
|
|
||
| let zero = self.zero.get_or_insert_with(tokio::time::Instant::now); | ||
| Ok(hang::Timestamp::from_micros(zero.elapsed().as_micros() as u64)?) | ||
| } | ||
|
Comment on lines
+97
to
+104
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Timestamp truncation could silently wrap after ~584k years. Line 103 casts 🔎 Suggested fix fn pts(&mut self, hint: Option<hang::Timestamp>) -> anyhow::Result<hang::Timestamp> {
if let Some(pts) = hint {
return Ok(pts);
}
let zero = self.zero.get_or_insert_with(tokio::time::Instant::now);
- Ok(hang::Timestamp::from_micros(zero.elapsed().as_micros() as u64)?)
+ let micros: u64 = zero.elapsed().as_micros().try_into()
+ .context("timestamp overflow")?;
+ Ok(hang::Timestamp::from_micros(micros)?)
}🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| impl Drop for Opus { | ||
| fn drop(&mut self) { | ||
| if let Some(track) = self.track.take() { | ||
| tracing::debug!(name = ?track.info.name, "ending track"); | ||
| self.broadcast.catalog.lock().remove_audio(&track.info.name); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another option are the
try_get_xxxmethods inbytes::Buf. I really don't like how the default behavior is to panic; it's not like it's faster anyway because it still performs the range check.