Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions rs/hang/src/import/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fmt, str::FromStr};

use bytes::Buf;

use crate::{self as hang, import::Aac, Error};
use crate::{self as hang, import::Aac, import::Opus, Error};

use super::{Avc3, Fmp4};

Expand All @@ -14,6 +14,8 @@ pub enum DecoderFormat {
Fmp4,
/// Raw AAC frames (not ADTS).
Aac,
/// Raw Opus frames (not Ogg).
Opus,
}

impl FromStr for DecoderFormat {
Expand All @@ -28,6 +30,7 @@ impl FromStr for DecoderFormat {
}
"fmp4" | "cmaf" => Ok(DecoderFormat::Fmp4),
"aac" => Ok(DecoderFormat::Aac),
"opus" => Ok(DecoderFormat::Opus),
_ => Err(Error::UnknownFormat(s.to_string())),
}
}
Expand All @@ -39,6 +42,7 @@ impl fmt::Display for DecoderFormat {
DecoderFormat::Avc3 => write!(f, "avc3"),
DecoderFormat::Fmp4 => write!(f, "fmp4"),
DecoderFormat::Aac => write!(f, "aac"),
DecoderFormat::Opus => write!(f, "opus"),
}
}
}
Expand All @@ -50,6 +54,7 @@ enum DecoderKind {
// Boxed because it's a large struct and clippy complains about the size.
Fmp4(Box<Fmp4>),
Aac(Aac),
Opus(Opus),
}

/// A generic interface for importing a stream of media into a hang broadcast.
Expand All @@ -67,6 +72,7 @@ impl Decoder {
DecoderFormat::Avc3 => Avc3::new(broadcast).into(),
DecoderFormat::Fmp4 => Box::new(Fmp4::new(broadcast)).into(),
DecoderFormat::Aac => Aac::new(broadcast).into(),
DecoderFormat::Opus => Opus::new(broadcast).into(),
};

Self { decoder }
Expand All @@ -83,6 +89,7 @@ impl Decoder {
DecoderKind::Avc3(decoder) => decoder.initialize(buf)?,
DecoderKind::Fmp4(decoder) => decoder.decode(buf)?,
DecoderKind::Aac(decoder) => decoder.initialize(buf)?,
DecoderKind::Opus(decoder) => decoder.initialize(buf)?,
}

anyhow::ensure!(!buf.has_remaining(), "buffer was not fully consumed");
Expand All @@ -107,8 +114,9 @@ impl Decoder {
match &mut self.decoder {
DecoderKind::Avc3(decoder) => decoder.decode_stream(buf, None)?,
DecoderKind::Fmp4(decoder) => decoder.decode(buf)?,
// TODO Fix or make this more type safe.
// TODO Fix or make these more type safe.
DecoderKind::Aac(_) => anyhow::bail!("AAC does not support stream decoding"),
DecoderKind::Opus(_) => anyhow::bail!("Opus does not support stream decoding"),
}

Ok(())
Expand All @@ -133,6 +141,7 @@ impl Decoder {
DecoderKind::Avc3(decoder) => decoder.decode_frame(buf, pts)?,
DecoderKind::Fmp4(decoder) => decoder.decode(buf)?,
DecoderKind::Aac(decoder) => decoder.decode(buf, pts)?,
DecoderKind::Opus(decoder) => decoder.decode(buf, pts)?,
}

Ok(())
Expand All @@ -144,6 +153,7 @@ impl Decoder {
DecoderKind::Avc3(decoder) => decoder.is_initialized(),
DecoderKind::Fmp4(decoder) => decoder.is_initialized(),
DecoderKind::Aac(decoder) => decoder.is_initialized(),
DecoderKind::Opus(decoder) => decoder.is_initialized(),
}
}
}
2 changes: 2 additions & 0 deletions rs/hang/src/import/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ mod avc3;
mod decoder;
mod fmp4;
mod hls;
mod opus;

pub use aac::*;
pub use avc3::*;
pub use decoder::*;
pub use fmp4::*;
pub use hls::*;
pub use opus::*;
114 changes: 114 additions & 0 deletions rs/hang/src/import/opus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use crate as hang;
use anyhow::Context;
use buf_list::BufList;
use bytes::Buf;
use moq_lite as moq;

/// Opus decoder, initialized via OpusHead.
pub struct Opus {
broadcast: hang::BroadcastProducer,
track: Option<hang::TrackProducer>,
zero: Option<tokio::time::Instant>,
}

impl Opus {
pub fn new(broadcast: hang::BroadcastProducer) -> Self {
Self {
broadcast,
track: None,
zero: None,
}
}

pub fn initialize<T: Buf>(&mut self, buf: &mut T) -> anyhow::Result<()> {
// Parse OpusHead (https://datatracker.ietf.org/doc/html/rfc7845#section-5.1)
// - Verifies "OpusHead" magic signature
// - Reads channel count
// - Reads sample rate
// - Ignores pre-skip, gain, channel mapping for now

anyhow::ensure!(buf.remaining() >= 19, "OpusHead must be at least 19 bytes");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option are thetry_get_xxx methods in bytes::Buf. I really don't like how the default behavior is to panic; it's not like it's faster anyway because it still performs the range check.

const OPUS_HEAD: u64 = u64::from_be_bytes(*b"OpusHead");
let signature = buf.get_u64();
anyhow::ensure!(signature == OPUS_HEAD, "invalid OpusHead signature");

buf.advance(1); // Skip version
let channel_count = buf.get_u8() as u32;
buf.advance(2); // Skip pre-skip (lol)
let sample_rate = buf.get_u32_le();

// Skip gain, channel mapping until if/when we support them
if buf.remaining() > 0 {
buf.advance(buf.remaining());
}

let track = moq::Track {
name: self.broadcast.track_name("audio"),
priority: 2,
};

let config = hang::catalog::AudioConfig {
codec: hang::catalog::AudioCodec::Opus,
sample_rate,
channel_count,
bitrate: None,
description: None,
};

tracing::debug!(name = ?track.name, ?config, "starting track");

let track = track.produce();
self.broadcast.insert_track(track.consumer);

let mut catalog = self.broadcast.catalog.lock();
let audio = catalog.insert_audio(track.producer.info.name.clone(), config);
audio.priority = 2;

self.track = Some(track.producer.into());

Ok(())
}

pub fn decode<T: Buf>(&mut self, buf: &mut T, pts: Option<hang::Timestamp>) -> anyhow::Result<()> {
let pts = self.pts(pts)?;
let track = self.track.as_mut().context("not initialized")?;

// Create a BufList at chunk boundaries, potentially avoiding allocations.
let mut payload = BufList::new();
while !buf.chunk().is_empty() {
payload.push_chunk(buf.copy_to_bytes(buf.chunk().len()));
}

let frame = hang::Frame {
timestamp: pts,
keyframe: true,
payload,
};

track.write(frame)?;

Ok(())
}

pub fn is_initialized(&self) -> bool {
self.track.is_some()
}

fn pts(&mut self, hint: Option<hang::Timestamp>) -> anyhow::Result<hang::Timestamp> {
if let Some(pts) = hint {
return Ok(pts);
}

let zero = self.zero.get_or_insert_with(tokio::time::Instant::now);
Ok(hang::Timestamp::from_micros(zero.elapsed().as_micros() as u64)?)
}
Comment on lines +97 to +104
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Timestamp truncation could silently wrap after ~584k years.

Line 103 casts as_micros() (u128) to u64 before calling from_micros, which silently truncates overflow. While not realistic in practice, using .try_into()? would properly propagate overflow errors.

🔎 Suggested fix
 	fn pts(&mut self, hint: Option<hang::Timestamp>) -> anyhow::Result<hang::Timestamp> {
 		if let Some(pts) = hint {
 			return Ok(pts);
 		}
 
 		let zero = self.zero.get_or_insert_with(tokio::time::Instant::now);
-		Ok(hang::Timestamp::from_micros(zero.elapsed().as_micros() as u64)?)
+		let micros: u64 = zero.elapsed().as_micros().try_into()
+			.context("timestamp overflow")?;
+		Ok(hang::Timestamp::from_micros(micros)?)
 	}
🤖 Prompt for AI Agents
In rs/hang/src/import/opus.rs around lines 97 to 104, the code casts
zero.elapsed().as_micros() (u128) to u64 using as which silently truncates on
overflow; replace the direct cast with a fallible conversion (e.g.
.as_micros().try_into()?), then pass the resulting u64 into
hang::Timestamp::from_micros so any overflow becomes a propagated error rather
than silent truncation.

}

impl Drop for Opus {
fn drop(&mut self) {
if let Some(track) = self.track.take() {
tracing::debug!(name = ?track.info.name, "ending track");
self.broadcast.catalog.lock().remove_audio(&track.info.name);
}
}
}
Loading