From e578b757e59a074f38ea4f7c243f767bb8a35374 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 2 May 2023 11:05:05 -0700 Subject: [PATCH] wip --- server/Cargo.lock | 3 +- server/Cargo.toml | 2 +- server/src/media/source.rs | 160 +++++++++++++++++++++++++-------- server/src/transport/server.rs | 12 ++- 4 files changed, 135 insertions(+), 42 deletions(-) diff --git a/server/Cargo.lock b/server/Cargo.lock index 00d51d5..b470484 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -329,8 +329,7 @@ dependencies = [ [[package]] name = "mp4" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "509348cba250e7b852a875100a2ddce7a36ee3abf881a681c756670c1774264d" +source = "git+https://github.com/kixelated/mp4-rust.git?branch=trexs#efefcc47353f477518bff01493785ae0daa8efd4" dependencies = [ "byteorder", "bytes", diff --git a/server/Cargo.toml b/server/Cargo.toml index b397086..660ab93 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -13,6 +13,6 @@ mio = { version = "0.8", features = ["net", "os-poll"] } env_logger = "0.9.3" ring = "0.16" anyhow = "1.0.70" -mp4 = "0.13.0" +mp4 = { git = "https://github.com/kixelated/mp4-rust.git", branch = "trexs" } serde = "1.0.160" serde_json = "1.0" \ No newline at end of file diff --git a/server/src/media/source.rs b/server/src/media/source.rs index 6566f6a..0a7d019 100644 --- a/server/src/media/source.rs +++ b/server/src/media/source.rs @@ -1,24 +1,46 @@ use std::{io,fs,time}; use io::Read; +use std::collections::{VecDeque}; + +use std::io::Write; use mp4; use anyhow; -use mp4::ReadBox; +use mp4::{ReadBox,WriteBox}; pub struct Source { + // We read the file once, in order, and don't seek backwards. reader: io::BufReader, - pending: Option, + // Any fragments parsed and ready to be returned by next(). + fragments: VecDeque, + + // The timestamp when the broadcast "started", so we can sleep to simulate a live stream. start: time::Instant, - timescale: Option, + + // The raw ftyp box, which we need duplicate for each track, but we don't know how many tracks exist yet. + ftyp: Vec, + + // The parsed moov box, so we can look up track information later. + moov: Option, } pub struct Fragment { + // The track ID for the fragment. + pub track: u32, + + // The type of the fragment. pub typ: mp4::BoxType, + + // The data of the fragment. pub data: Vec, + + // Whether this fragment is a keyframe. pub keyframe: bool, - pub timestamp: Option, // only used to simulate a live stream + + // The timestamp of the fragment, in milliseconds, to simulate a live stream. + pub timestamp: Option } impl Source { @@ -30,61 +52,123 @@ impl Source { Ok(Self{ reader, start, - pending: None, - timescale: None, + fragments: VecDeque::new(), + ftyp: Vec::new(), + moov: None, }) } pub fn next(&mut self) -> anyhow::Result> { - if self.pending.is_none() { - self.pending = Some(self.next_inner()?); + if self.fragments.is_empty() { + self.parse()?; }; if self.timeout().is_some() { return Ok(None) } - let pending = self.pending.take(); - Ok(pending) + Ok(self.fragments.pop_front()) } - fn next_inner(&mut self) -> anyhow::Result { - // Read the next full atom. - let atom = read_box(&mut self.reader)?; - let mut timestamp = None; - let mut keyframe = false; + fn parse(&mut self) -> anyhow::Result<()> { + loop { + // Read the next full atom. + let atom = read_box(&mut self.reader)?; - // Before we return it, let's do some simple parsing. - let mut reader = io::Cursor::new(&atom); - let header = mp4::BoxHeader::read(&mut reader)?; + // Before we return it, let's do some simple parsing. + let mut reader = io::Cursor::new(&atom); + let header = mp4::BoxHeader::read(&mut reader)?; - match header.name { - mp4::BoxType::MoovBox => { - // We need to parse the moov to get the timescale. - let moov = mp4::MoovBox::read_box(&mut reader, header.size)?; - self.timescale = Some(moov.traks[0].mdia.mdhd.timescale.into()); - }, - mp4::BoxType::MoofBox => { - let moof = mp4::MoofBox::read_box(&mut reader, header.size)?; + match header.name { + mp4::BoxType::FtypBox => { + // Don't return anything until we know the total number of tracks. + // To be honest, I didn't expect the borrow checker to allow this, but it does! + self.ftyp = atom; + }, + mp4::BoxType::MoovBox => { + // We need to split the moov based on the tracks. + let moov = mp4::MoovBox::read_box(&mut reader, header.size)?; - keyframe = has_keyframe(&moof); - timestamp = first_timestamp(&moof); + for trak in &moov.traks { + let track_id = trak.tkhd.track_id; + + // Push the styp atom for each track. + self.fragments.push_back(Fragment { + track: track_id, + typ: mp4::BoxType::FtypBox, + data: self.ftyp.clone(), + keyframe: false, + timestamp: None, + }); + + // Unfortunately, we need to create a brand new moov atom for each track. + // We remove every box for other track IDs. + let mut toov = moov.clone(); + toov.traks.retain(|t| t.tkhd.track_id == track_id); + toov.mvex.as_mut().expect("missing mvex").trexs.retain(|f| f.track_id == track_id); + + // Marshal the box. + let mut toov_data = Vec::new(); + toov.write_box(&mut toov_data)?; + + let mut file = std::fs::File::create(format!("track{}.mp4", track_id))?; + file.write_all(toov_data.as_slice()); + + self.fragments.push_back(Fragment { + track: track_id, + typ: mp4::BoxType::MoovBox, + data: toov_data, + keyframe: false, + timestamp: None, + }); + } + + self.moov = Some(moov); + }, + mp4::BoxType::MoofBox => { + let moof = mp4::MoofBox::read_box(&mut reader, header.size)?; + + if moof.trafs.len() != 1 { + // We can't split the mdat atom, so this is impossible to support + anyhow::bail!("multiple tracks per moof atom") + } + + self.fragments.push_back(Fragment{ + track: moof.trafs[0].tfhd.track_id, + typ: mp4::BoxType::MoofBox, + data: atom, + keyframe: has_keyframe(&moof), + timestamp: first_timestamp(&moof), + }) + }, + mp4::BoxType::MdatBox => { + let moof = self.fragments.back().expect("no atom before mdat"); + assert!(moof.typ == mp4::BoxType::MoofBox, "no moof before mdat"); + + self.fragments.push_back(Fragment{ + track: moof.track, + typ: mp4::BoxType::MoofBox, + data: atom, + keyframe: false, + timestamp: None, + }); + + // We have some media data, return so we can start sending it. + return Ok(()) + }, + _ => anyhow::bail!("unknown top-level atom: {:?}", header.name), } - _ => {}, } - - Ok(Fragment { - typ: header.name, - data: atom, - keyframe, - timestamp, - }) } // Simulate a live stream by sleeping until the next timestamp in the media. pub fn timeout(&self) -> Option { - let timestamp = self.pending.as_ref()?.timestamp?; - let timescale = self.timescale?; + let next = self.fragments.front()?; + let timestamp = next.timestamp?; + + // Find the timescale for the track. + let track = self.moov.as_ref()?.traks.iter().find(|t| t.tkhd.track_id == next.track)?; + let timescale = track.mdia.mdhd.timescale as u64; let delay = time::Duration::from_millis(1000 * timestamp / timescale); let elapsed = self.start.elapsed(); diff --git a/server/src/transport/server.rs b/server/src/transport/server.rs index bbc2a94..6c3eebc 100644 --- a/server/src/transport/server.rs +++ b/server/src/transport/server.rs @@ -84,6 +84,7 @@ impl Server { self.receive()?; self.app()?; self.send()?; + self.cleanup(); } } @@ -242,7 +243,11 @@ impl Server { pub fn app(&mut self) -> anyhow::Result<()> { for (_, conn) in &mut self.conns { if let Some(session) = &mut conn.session { - conn.app.poll(&mut conn.quiche, session)?; + if let Err(e) = conn.app.poll(&mut conn.quiche, session) { + // Close the connection on any application error + let reason = format!("app error: {:?}", e); + conn.quiche.close(true, 0xff, reason.as_bytes()).ok(); + } } } @@ -275,6 +280,11 @@ impl Server { Ok(()) } + + pub fn cleanup(&mut self) { + // Garbage collect closed connections. + self.conns.retain(|_, ref mut c| !c.quiche.is_closed() ); + } } /// Generate a stateless retry token.