wip
This commit is contained in:
parent
2b1a3adecc
commit
e578b757e5
|
@ -329,8 +329,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mp4"
|
name = "mp4"
|
||||||
version = "0.13.0"
|
version = "0.13.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "git+https://github.com/kixelated/mp4-rust.git?branch=trexs#efefcc47353f477518bff01493785ae0daa8efd4"
|
||||||
checksum = "509348cba250e7b852a875100a2ddce7a36ee3abf881a681c756670c1774264d"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"byteorder",
|
"byteorder",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
|
|
@ -13,6 +13,6 @@ mio = { version = "0.8", features = ["net", "os-poll"] }
|
||||||
env_logger = "0.9.3"
|
env_logger = "0.9.3"
|
||||||
ring = "0.16"
|
ring = "0.16"
|
||||||
anyhow = "1.0.70"
|
anyhow = "1.0.70"
|
||||||
mp4 = "0.13.0"
|
mp4 = { git = "https://github.com/kixelated/mp4-rust.git", branch = "trexs" }
|
||||||
serde = "1.0.160"
|
serde = "1.0.160"
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
|
@ -1,24 +1,46 @@
|
||||||
use std::{io,fs,time};
|
use std::{io,fs,time};
|
||||||
use io::Read;
|
use io::Read;
|
||||||
|
use std::collections::{VecDeque};
|
||||||
|
|
||||||
|
use std::io::Write;
|
||||||
|
|
||||||
use mp4;
|
use mp4;
|
||||||
use anyhow;
|
use anyhow;
|
||||||
|
|
||||||
use mp4::ReadBox;
|
use mp4::{ReadBox,WriteBox};
|
||||||
|
|
||||||
pub struct Source {
|
pub struct Source {
|
||||||
|
// We read the file once, in order, and don't seek backwards.
|
||||||
reader: io::BufReader<fs::File>,
|
reader: io::BufReader<fs::File>,
|
||||||
pending: Option<Fragment>,
|
|
||||||
|
|
||||||
|
// Any fragments parsed and ready to be returned by next().
|
||||||
|
fragments: VecDeque<Fragment>,
|
||||||
|
|
||||||
|
// The timestamp when the broadcast "started", so we can sleep to simulate a live stream.
|
||||||
start: time::Instant,
|
start: time::Instant,
|
||||||
timescale: Option<u64>,
|
|
||||||
|
// The raw ftyp box, which we need duplicate for each track, but we don't know how many tracks exist yet.
|
||||||
|
ftyp: Vec<u8>,
|
||||||
|
|
||||||
|
// The parsed moov box, so we can look up track information later.
|
||||||
|
moov: Option<mp4::MoovBox>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Fragment {
|
pub struct Fragment {
|
||||||
|
// The track ID for the fragment.
|
||||||
|
pub track: u32,
|
||||||
|
|
||||||
|
// The type of the fragment.
|
||||||
pub typ: mp4::BoxType,
|
pub typ: mp4::BoxType,
|
||||||
|
|
||||||
|
// The data of the fragment.
|
||||||
pub data: Vec<u8>,
|
pub data: Vec<u8>,
|
||||||
|
|
||||||
|
// Whether this fragment is a keyframe.
|
||||||
pub keyframe: bool,
|
pub keyframe: bool,
|
||||||
pub timestamp: Option<u64>, // only used to simulate a live stream
|
|
||||||
|
// The timestamp of the fragment, in milliseconds, to simulate a live stream.
|
||||||
|
pub timestamp: Option<u64>
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Source {
|
impl Source {
|
||||||
|
@ -30,61 +52,123 @@ impl Source {
|
||||||
Ok(Self{
|
Ok(Self{
|
||||||
reader,
|
reader,
|
||||||
start,
|
start,
|
||||||
pending: None,
|
fragments: VecDeque::new(),
|
||||||
timescale: None,
|
ftyp: Vec::new(),
|
||||||
|
moov: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn next(&mut self) -> anyhow::Result<Option<Fragment>> {
|
pub fn next(&mut self) -> anyhow::Result<Option<Fragment>> {
|
||||||
if self.pending.is_none() {
|
if self.fragments.is_empty() {
|
||||||
self.pending = Some(self.next_inner()?);
|
self.parse()?;
|
||||||
};
|
};
|
||||||
|
|
||||||
if self.timeout().is_some() {
|
if self.timeout().is_some() {
|
||||||
return Ok(None)
|
return Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
let pending = self.pending.take();
|
Ok(self.fragments.pop_front())
|
||||||
Ok(pending)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn next_inner(&mut self) -> anyhow::Result<Fragment> {
|
fn parse(&mut self) -> anyhow::Result<()> {
|
||||||
|
loop {
|
||||||
// Read the next full atom.
|
// Read the next full atom.
|
||||||
let atom = read_box(&mut self.reader)?;
|
let atom = read_box(&mut self.reader)?;
|
||||||
let mut timestamp = None;
|
|
||||||
let mut keyframe = false;
|
|
||||||
|
|
||||||
// Before we return it, let's do some simple parsing.
|
// Before we return it, let's do some simple parsing.
|
||||||
let mut reader = io::Cursor::new(&atom);
|
let mut reader = io::Cursor::new(&atom);
|
||||||
let header = mp4::BoxHeader::read(&mut reader)?;
|
let header = mp4::BoxHeader::read(&mut reader)?;
|
||||||
|
|
||||||
match header.name {
|
match header.name {
|
||||||
|
mp4::BoxType::FtypBox => {
|
||||||
|
// Don't return anything until we know the total number of tracks.
|
||||||
|
// To be honest, I didn't expect the borrow checker to allow this, but it does!
|
||||||
|
self.ftyp = atom;
|
||||||
|
},
|
||||||
mp4::BoxType::MoovBox => {
|
mp4::BoxType::MoovBox => {
|
||||||
// We need to parse the moov to get the timescale.
|
// We need to split the moov based on the tracks.
|
||||||
let moov = mp4::MoovBox::read_box(&mut reader, header.size)?;
|
let moov = mp4::MoovBox::read_box(&mut reader, header.size)?;
|
||||||
self.timescale = Some(moov.traks[0].mdia.mdhd.timescale.into());
|
|
||||||
|
for trak in &moov.traks {
|
||||||
|
let track_id = trak.tkhd.track_id;
|
||||||
|
|
||||||
|
// Push the styp atom for each track.
|
||||||
|
self.fragments.push_back(Fragment {
|
||||||
|
track: track_id,
|
||||||
|
typ: mp4::BoxType::FtypBox,
|
||||||
|
data: self.ftyp.clone(),
|
||||||
|
keyframe: false,
|
||||||
|
timestamp: None,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Unfortunately, we need to create a brand new moov atom for each track.
|
||||||
|
// We remove every box for other track IDs.
|
||||||
|
let mut toov = moov.clone();
|
||||||
|
toov.traks.retain(|t| t.tkhd.track_id == track_id);
|
||||||
|
toov.mvex.as_mut().expect("missing mvex").trexs.retain(|f| f.track_id == track_id);
|
||||||
|
|
||||||
|
// Marshal the box.
|
||||||
|
let mut toov_data = Vec::new();
|
||||||
|
toov.write_box(&mut toov_data)?;
|
||||||
|
|
||||||
|
let mut file = std::fs::File::create(format!("track{}.mp4", track_id))?;
|
||||||
|
file.write_all(toov_data.as_slice());
|
||||||
|
|
||||||
|
self.fragments.push_back(Fragment {
|
||||||
|
track: track_id,
|
||||||
|
typ: mp4::BoxType::MoovBox,
|
||||||
|
data: toov_data,
|
||||||
|
keyframe: false,
|
||||||
|
timestamp: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
self.moov = Some(moov);
|
||||||
},
|
},
|
||||||
mp4::BoxType::MoofBox => {
|
mp4::BoxType::MoofBox => {
|
||||||
let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
|
let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
|
||||||
|
|
||||||
keyframe = has_keyframe(&moof);
|
if moof.trafs.len() != 1 {
|
||||||
timestamp = first_timestamp(&moof);
|
// We can't split the mdat atom, so this is impossible to support
|
||||||
}
|
anyhow::bail!("multiple tracks per moof atom")
|
||||||
_ => {},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Fragment {
|
self.fragments.push_back(Fragment{
|
||||||
typ: header.name,
|
track: moof.trafs[0].tfhd.track_id,
|
||||||
|
typ: mp4::BoxType::MoofBox,
|
||||||
data: atom,
|
data: atom,
|
||||||
keyframe,
|
keyframe: has_keyframe(&moof),
|
||||||
timestamp,
|
timestamp: first_timestamp(&moof),
|
||||||
})
|
})
|
||||||
|
},
|
||||||
|
mp4::BoxType::MdatBox => {
|
||||||
|
let moof = self.fragments.back().expect("no atom before mdat");
|
||||||
|
assert!(moof.typ == mp4::BoxType::MoofBox, "no moof before mdat");
|
||||||
|
|
||||||
|
self.fragments.push_back(Fragment{
|
||||||
|
track: moof.track,
|
||||||
|
typ: mp4::BoxType::MoofBox,
|
||||||
|
data: atom,
|
||||||
|
keyframe: false,
|
||||||
|
timestamp: None,
|
||||||
|
});
|
||||||
|
|
||||||
|
// We have some media data, return so we can start sending it.
|
||||||
|
return Ok(())
|
||||||
|
},
|
||||||
|
_ => anyhow::bail!("unknown top-level atom: {:?}", header.name),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Simulate a live stream by sleeping until the next timestamp in the media.
|
// Simulate a live stream by sleeping until the next timestamp in the media.
|
||||||
pub fn timeout(&self) -> Option<time::Duration> {
|
pub fn timeout(&self) -> Option<time::Duration> {
|
||||||
let timestamp = self.pending.as_ref()?.timestamp?;
|
let next = self.fragments.front()?;
|
||||||
let timescale = self.timescale?;
|
let timestamp = next.timestamp?;
|
||||||
|
|
||||||
|
// Find the timescale for the track.
|
||||||
|
let track = self.moov.as_ref()?.traks.iter().find(|t| t.tkhd.track_id == next.track)?;
|
||||||
|
let timescale = track.mdia.mdhd.timescale as u64;
|
||||||
|
|
||||||
let delay = time::Duration::from_millis(1000 * timestamp / timescale);
|
let delay = time::Duration::from_millis(1000 * timestamp / timescale);
|
||||||
let elapsed = self.start.elapsed();
|
let elapsed = self.start.elapsed();
|
||||||
|
|
|
@ -84,6 +84,7 @@ impl<T: app::App> Server<T> {
|
||||||
self.receive()?;
|
self.receive()?;
|
||||||
self.app()?;
|
self.app()?;
|
||||||
self.send()?;
|
self.send()?;
|
||||||
|
self.cleanup();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,7 +243,11 @@ impl<T: app::App> Server<T> {
|
||||||
pub fn app(&mut self) -> anyhow::Result<()> {
|
pub fn app(&mut self) -> anyhow::Result<()> {
|
||||||
for (_, conn) in &mut self.conns {
|
for (_, conn) in &mut self.conns {
|
||||||
if let Some(session) = &mut conn.session {
|
if let Some(session) = &mut conn.session {
|
||||||
conn.app.poll(&mut conn.quiche, session)?;
|
if let Err(e) = conn.app.poll(&mut conn.quiche, session) {
|
||||||
|
// Close the connection on any application error
|
||||||
|
let reason = format!("app error: {:?}", e);
|
||||||
|
conn.quiche.close(true, 0xff, reason.as_bytes()).ok();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -275,6 +280,11 @@ impl<T: app::App> Server<T> {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn cleanup(&mut self) {
|
||||||
|
// Garbage collect closed connections.
|
||||||
|
self.conns.retain(|_, ref mut c| !c.quiche.is_closed() );
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generate a stateless retry token.
|
/// Generate a stateless retry token.
|
||||||
|
|
Loading…
Reference in New Issue