Pretty gud.

This commit is contained in:
Luke Curley 2023-04-24 13:07:06 -07:00
parent c3dd45b7a7
commit 15c3352d80
3 changed files with 57 additions and 33 deletions

View File

@ -8,14 +8,17 @@ use mp4::ReadBox;
pub struct Source {
reader: io::BufReader<fs::File>,
start: time::Instant,
pending: Option<Fragment>,
start: time::Instant,
timescale: Option<u64>,
}
pub struct Fragment {
pub typ: mp4::BoxType,
pub data: Vec<u8>,
pub keyframe: bool,
pub timestamp: u64, // only used to simulate a live stream
pub timestamp: Option<u64>, // only used to simulate a live stream
}
impl Source {
@ -28,46 +31,66 @@ impl Source {
reader,
start,
pending: None,
timescale: None,
})
}
pub fn next(&mut self) -> anyhow::Result<Option<Fragment>> {
let pending = match self.pending.take() {
Some(f) => f,
None => self.next_inner()?,
if self.pending.is_none() {
self.pending = Some(self.next_inner()?);
};
if pending.timestamp > 0 && pending.timestamp < self.start.elapsed().as_millis() as u64 {
self.pending = Some(pending);
if self.timeout().is_some() {
return Ok(None)
}
Ok(Some(pending))
let pending = self.pending.take();
Ok(pending)
}
fn next_inner(&mut self) -> anyhow::Result<Fragment> {
// Read the next full atom.
let atom = read_box(&mut self.reader)?;
let mut timestamp = 0;
let mut timestamp = None;
let mut keyframe = false;
// Before we return it, let's do some simple parsing.
let mut reader = io::Cursor::new(&atom);
let header = mp4::BoxHeader::read(&mut reader)?;
if header.name == mp4::BoxType::MoofBox {
let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
match header.name {
mp4::BoxType::MoovBox => {
// We need to parse the moov to get the timescale.
let moov = mp4::MoovBox::read_box(&mut reader, header.size)?;
self.timescale = Some(moov.traks[0].mdia.mdhd.timescale.into());
},
mp4::BoxType::MoofBox => {
let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
keyframe = has_keyframe(&moof);
timestamp = first_timestamp(&moof);
keyframe = has_keyframe(&moof);
timestamp = first_timestamp(&moof);
}
_ => {},
}
Ok(Fragment {
typ: header.name,
data: atom,
keyframe,
timestamp,
})
}
// Simulate a live stream by sleeping until the next timestamp in the media.
pub fn timeout(&self) -> Option<time::Duration> {
let timestamp = self.pending.as_ref()?.timestamp?;
let timescale = self.timescale?;
let delay = time::Duration::from_millis(1000 * timestamp / timescale);
let elapsed = self.start.elapsed();
delay.checked_sub(elapsed)
}
}
// Read a full MP4 atom into a vector.
@ -139,16 +162,6 @@ fn has_keyframe(moof: &mp4::MoofBox) -> bool {
false
}
fn first_timestamp(moof: &mp4::MoofBox) -> u64 {
let traf = match moof.trafs.first() {
Some(t) => t,
None => return 0,
};
let tfdt = match &traf.tfdt {
Some(t) => t,
None => return 0,
};
tfdt.base_media_decode_time
fn first_timestamp(moof: &mp4::MoofBox) -> Option<u64> {
Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time)
}

View File

@ -14,7 +14,6 @@ pub struct Init {
#[derive(Serialize, Deserialize)]
pub struct Segment {
pub init: String,
pub timestamp: u64,
}
impl Message {

View File

@ -6,10 +6,13 @@ use quiche::h3::webtransport;
use crate::{media,transport};
use super::message;
use mp4;
#[derive(Default)]
pub struct Session {
media: Option<media::Source>,
stream_id: Option<u64>, // stream ID of the current segment
styp: Option<Vec<u8>>,
}
impl transport::App for Session {
@ -58,7 +61,7 @@ impl transport::App for Session {
}
fn timeout(&self) -> Option<time::Duration> {
None
self.media.as_ref().and_then(|m| m.timeout())
}
}
@ -74,8 +77,6 @@ impl Session {
None => return Ok(()),
};
log::debug!("{} {}", fragment.keyframe, fragment.timestamp);
let mut stream_id = match self.stream_id {
Some(stream_id) => stream_id,
None => {
@ -101,7 +102,6 @@ impl Session {
let mut message = message::Message::new();
message.segment = Some(message::Segment{
init: "video".to_string(),
timestamp: fragment.timestamp,
});
let data = message.serialize()?;
@ -111,18 +111,30 @@ impl Session {
// TODO handle when stream is full
stream_id = session.open_stream(conn, false)?;
session.send_stream_data(conn, stream_id, data.as_slice())?;
let styp = self.styp.as_ref().expect("missing ftyp mox");
session.send_stream_data(conn, stream_id, &styp)?;
}
let data = fragment.data.as_slice();
// TODO check if stream is writable
session.send_stream_data(conn, stream_id, data)?;
log::debug!("wrote {} to {}", std::str::from_utf8(&data[4..8]).unwrap(), stream_id);
let size = session.send_stream_data(conn, stream_id, data)?;
if size < data.len() {
anyhow::bail!("partial write: {} < {}", size, data.len());
}
// Save for the next fragment
self.stream_id = Some(stream_id);
// Save the ftyp fragment but modify it to be a styp for furture segments.
if fragment.typ == mp4::BoxType::FtypBox {
let mut data = fragment.data;
data[4] = b's'; // ftyp to styp
self.styp = Some(data);
}
Ok(())
}
}