diff --git a/server/src/media/source.rs b/server/src/media/source.rs
index 45320b1..6566f6a 100644
--- a/server/src/media/source.rs
+++ b/server/src/media/source.rs
@@ -8,14 +8,17 @@ use mp4::ReadBox;
 
 pub struct Source {
     reader: io::BufReader<fs::File>,
-    start: time::Instant,
     pending: Option<Fragment>,
+
+    start: time::Instant,
+    timescale: Option<u64>,
 }
 
 pub struct Fragment {
+    pub typ: mp4::BoxType,
     pub data: Vec<u8>,
     pub keyframe: bool,
-    pub timestamp: u64, // only used to simulate a live stream
+    pub timestamp: Option<u64>, // only used to simulate a live stream
 }
 
 impl Source {
@@ -28,46 +31,66 @@ impl Source {
             reader,
             start,
             pending: None,
+            timescale: None,
         })
     }
 
     pub fn next(&mut self) -> anyhow::Result<Option<Fragment>> {
-        let pending = match self.pending.take() {
-            Some(f) => f,
-            None => self.next_inner()?,
+        if self.pending.is_none() {
+            self.pending = Some(self.next_inner()?);
         };
 
-        if pending.timestamp > 0 && pending.timestamp < self.start.elapsed().as_millis() as u64 {
-            self.pending = Some(pending);
+        if self.timeout().is_some() {
             return Ok(None)
         }
 
-        Ok(Some(pending))
+        let pending = self.pending.take();
+        Ok(pending)
    }
 
     fn next_inner(&mut self) -> anyhow::Result<Fragment> {
         // Read the next full atom.
         let atom = read_box(&mut self.reader)?;
-        let mut timestamp = 0;
+        let mut timestamp = None;
         let mut keyframe = false;
 
         // Before we return it, let's do some simple parsing.
         let mut reader = io::Cursor::new(&atom);
         let header = mp4::BoxHeader::read(&mut reader)?;
 
-        if header.name == mp4::BoxType::MoofBox {
-            let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
+        match header.name {
+            mp4::BoxType::MoovBox => {
+                // We need to parse the moov to get the timescale.
+                let moov = mp4::MoovBox::read_box(&mut reader, header.size)?;
+                self.timescale = Some(moov.traks[0].mdia.mdhd.timescale.into());
+            },
+            mp4::BoxType::MoofBox => {
+                let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
 
-            keyframe = has_keyframe(&moof);
-            timestamp = first_timestamp(&moof);
+                keyframe = has_keyframe(&moof);
+                timestamp = first_timestamp(&moof);
+            }
+            _ => {},
         }
 
         Ok(Fragment {
+            typ: header.name,
             data: atom,
             keyframe,
             timestamp,
         })
     }
+
+    // Simulate a live stream by sleeping until the next timestamp in the media.
+    pub fn timeout(&self) -> Option<time::Duration> {
+        let timestamp = self.pending.as_ref()?.timestamp?;
+        let timescale = self.timescale?;
+
+        let delay = time::Duration::from_millis(1000 * timestamp / timescale);
+        let elapsed = self.start.elapsed();
+
+        delay.checked_sub(elapsed)
+    }
 }
 
 // Read a full MP4 atom into a vector.
@@ -139,16 +162,6 @@ fn has_keyframe(moof: &mp4::MoofBox) -> bool {
     false
 }
 
-fn first_timestamp(moof: &mp4::MoofBox) -> u64 {
-    let traf = match moof.trafs.first() {
-        Some(t) => t,
-        None => return 0,
-    };
-
-    let tfdt = match &traf.tfdt {
-        Some(t) => t,
-        None => return 0,
-    };
-
-    tfdt.base_media_decode_time
+fn first_timestamp(moof: &mp4::MoofBox) -> Option<u64> {
+    Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time)
 }
diff --git a/server/src/session/message.rs b/server/src/session/message.rs
index 314660a..8718b81 100644
--- a/server/src/session/message.rs
+++ b/server/src/session/message.rs
@@ -14,7 +14,6 @@ pub struct Init {
 #[derive(Serialize, Deserialize)]
 pub struct Segment {
     pub init: String,
-    pub timestamp: u64,
 }
 
 impl Message {
diff --git a/server/src/session/session.rs b/server/src/session/session.rs
index d694956..9edd481 100644
--- a/server/src/session/session.rs
+++ b/server/src/session/session.rs
@@ -6,10 +6,13 @@ use quiche::h3::webtransport;
 use crate::{media,transport};
 use super::message;
 
+use mp4;
+
 #[derive(Default)]
 pub struct Session {
     media: Option<media::Source>,
     stream_id: Option<u64>, // stream ID of the current segment
+    styp: Option<Vec<u8>>,
 }
 
 impl transport::App for Session {
@@ -58,7 +61,7 @@
     }
 
     fn timeout(&self) -> Option<time::Duration> {
-        None
+        self.media.as_ref().and_then(|m| m.timeout())
     }
 }
 
@@ -74,8 +77,6 @@ impl Session {
             None => return Ok(()),
         };
 
-        log::debug!("{} {}", fragment.keyframe, fragment.timestamp);
-
         let mut stream_id = match self.stream_id {
             Some(stream_id) => stream_id,
             None => {
@@ -101,7 +102,6 @@
             let mut message = message::Message::new();
             message.segment = Some(message::Segment{
                 init: "video".to_string(),
-                timestamp: fragment.timestamp,
             });
 
             let data = message.serialize()?;
@@ -111,18 +111,30 @@
             // TODO handle when stream is full
             stream_id = session.open_stream(conn, false)?;
             session.send_stream_data(conn, stream_id, data.as_slice())?;
+
+            let styp = self.styp.as_ref().expect("missing ftyp box");
+            session.send_stream_data(conn, stream_id, &styp)?;
         }
 
         let data = fragment.data.as_slice();
 
         // TODO check if stream is writable
-        session.send_stream_data(conn, stream_id, data)?;
-
-        log::debug!("wrote {} to {}", std::str::from_utf8(&data[4..8]).unwrap(), stream_id);
+        let size = session.send_stream_data(conn, stream_id, data)?;
+        if size < data.len() {
+            anyhow::bail!("partial write: {} < {}", size, data.len());
+        }
 
         // Save for the next fragment
        self.stream_id = Some(stream_id);
 
+        // Save the ftyp fragment but modify it to be a styp for future segments.
+        if fragment.typ == mp4::BoxType::FtypBox {
+            let mut data = fragment.data;
+            data[4] = b's'; // ftyp to styp
+
+            self.styp = Some(data);
+        }
+
         Ok(())
     }
 }
\ No newline at end of file