From 2b1a3adecc4464a38abba39c17d12d51ca2fa0f5 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Thu, 27 Apr 2023 13:21:16 -0700 Subject: [PATCH] Video woooorks. --- media/generate | 9 +++- player/src/video/decoder.ts | 4 +- server/src/session/session.rs | 86 ++++++++++++++++----------------- server/src/transport/mod.rs | 4 +- server/src/transport/streams.rs | 79 ++++++++++++++++++++++++++++++ 5 files changed, 133 insertions(+), 49 deletions(-) create mode 100644 server/src/transport/streams.rs diff --git a/media/generate b/media/generate index b387ac9..922d2bc 100755 --- a/media/generate +++ b/media/generate @@ -1,6 +1,13 @@ #!/bin/bash +cd "$(dirname "$0")" + +# empty_moov: Uses moof fragments instead of one giant moov/mdat pair. +# frag_every_frame: Creates a moof for each frame. +# separate_moof: Splits audio and video into separate moof flags. +# omit_tfhd_offset: Removes absolute byte offsets so we can fragment. + ffmpeg -i source.mp4 \ + -movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset \ -c:v copy \ -an \ - -movflags frag_every_frame+empty_moov \ fragmented.mp4 diff --git a/player/src/video/decoder.ts b/player/src/video/decoder.ts index 61c8113..582dcd2 100644 --- a/player/src/video/decoder.ts +++ b/player/src/video/decoder.ts @@ -53,8 +53,6 @@ export default class Decoder { const input = MP4.New(); input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => { - console.log(samples) - for (let sample of samples) { const timestamp = 1000 * sample.dts / sample.timescale // milliseconds @@ -92,7 +90,7 @@ export default class Decoder { for (let raw of init.raw) { raw.fileStart = offset - input.appendBuffer(raw) + offset = input.appendBuffer(raw) } const stream = new Stream.Reader(msg.reader, msg.buffer) diff --git a/server/src/session/session.rs b/server/src/session/session.rs index 9edd481..d01dd00 100644 --- a/server/src/session/session.rs +++ b/server/src/session/session.rs @@ -6,13 +6,12 @@ use quiche::h3::webtransport; use crate::{media,transport}; use super::message; -use mp4; - #[derive(Default)] pub struct Session { media: Option, stream_id: Option, // stream ID of the current segment - styp: Option>, + + streams: transport::Streams, // An easy way of buffering stream data. } impl transport::App for Session { @@ -55,6 +54,10 @@ impl transport::App for Session { } } + // Send any pending stream data. + self.streams.poll(conn)?; + + // Fetch the next media fragment, possibly queuing up stream data. self.poll_source(conn, session)?; Ok(()) @@ -67,19 +70,46 @@ impl transport::App for Session { impl Session { fn poll_source(&mut self, conn: &mut quiche::Connection, session: &mut webtransport::ServerSession) -> anyhow::Result<()> { + // Get the media source once the connection is established. let media = match &mut self.media { Some(m) => m, None => return Ok(()), }; + // Get the next media fragment. let fragment = match media.next()? { Some(f) => f, None => return Ok(()), }; - let mut stream_id = match self.stream_id { - Some(stream_id) => stream_id, + // Check if we have already created a stream for this fragment. + let stream_id = match self.stream_id { + Some(old_stream_id) if fragment.keyframe => { + // This is the start of a new segment. + + // Close the prior stream. + self.streams.send(conn, old_stream_id, &[], true)?; + + // Encode a JSON header indicating this is the video track. + let mut message = message::Message::new(); + message.segment = Some(message::Segment{ + init: "video".to_string(), + }); + + // Open a new stream. + let stream_id = session.open_stream(conn, false)?; + // TODO: conn.stream_priority(stream_id, urgency, incremental) + + // Write the header. + let data = message.serialize()?; + self.streams.send(conn, stream_id, &data, false)?; + + stream_id + }, None => { + // This is the start of an init segment. + + // Create a JSON header. let mut message = message::Message::new(); message.init = Some(message::Init{ id: "video".to_string(), @@ -87,54 +117,22 @@ impl Session { let data = message.serialize()?; - // TODO handle when stream is full + // Create a new stream and write the header. let stream_id = session.open_stream(conn, false)?; - session.send_stream_data(conn, stream_id, data.as_slice())?; + self.streams.send(conn, stream_id, data.as_slice(), false)?; stream_id - }, + } + Some(stream_id) => stream_id, // Continuation of init or segment }; - if fragment.keyframe { - // Close the prior stream. - conn.stream_send(stream_id, &[], true)?; - - let mut message = message::Message::new(); - message.segment = Some(message::Segment{ - init: "video".to_string(), - }); - - let data = message.serialize()?; - - // TODO: conn.stream_priority(stream_id, urgency, incremental) - - // TODO handle when stream is full - stream_id = session.open_stream(conn, false)?; - session.send_stream_data(conn, stream_id, data.as_slice())?; - - let styp = self.styp.as_ref().expect("missing ftyp mox"); - session.send_stream_data(conn, stream_id, &styp)?; - } - + // Write the current fragment. let data = fragment.data.as_slice(); + self.streams.send(conn, stream_id, data, false)?; - // TODO check if stream is writable - let size = session.send_stream_data(conn, stream_id, data)?; - if size < data.len() { - anyhow::bail!("partial write: {} < {}", size, data.len()); - } - - // Save for the next fragment + // Save the stream ID for the next fragment. self.stream_id = Some(stream_id); - // Save the ftyp fragment but modify it to be a styp for furture segments. - if fragment.typ == mp4::BoxType::FtypBox { - let mut data = fragment.data; - data[4] = b's'; // ftyp to styp - - self.styp = Some(data); - } - Ok(()) } } \ No newline at end of file diff --git a/server/src/transport/mod.rs b/server/src/transport/mod.rs index c7b8325..f8d86e8 100644 --- a/server/src/transport/mod.rs +++ b/server/src/transport/mod.rs @@ -1,6 +1,8 @@ mod server; mod connection; mod app; +mod streams; pub use app::App; -pub use server::{Config, Server}; \ No newline at end of file +pub use server::{Config, Server}; +pub use streams::Streams; \ No newline at end of file diff --git a/server/src/transport/streams.rs b/server/src/transport/streams.rs new file mode 100644 index 0000000..6b78233 --- /dev/null +++ b/server/src/transport/streams.rs @@ -0,0 +1,79 @@ +use std::collections::hash_map as hmap; +use std::collections::VecDeque; + +use quiche; +use anyhow; + +#[derive(Default)] +pub struct Streams { + lookup: hmap::HashMap, +} + +#[derive(Default)] +struct State { + buffer: VecDeque, + fin: bool, +} + +impl Streams { + pub fn send(&mut self, conn: &mut quiche::Connection, id: u64, buf: &[u8], fin: bool) -> anyhow::Result<()> { + match self.lookup.entry(id) { + hmap::Entry::Occupied(mut entry) => { + // Add to the existing buffer. + let state = entry.get_mut(); + state.buffer.extend(buf); + state.fin |= fin; + }, + hmap::Entry::Vacant(entry) => { + let size = conn.stream_send(id, buf, fin)?; + + if size < buf.len() { + // Short write, save the rest for later. + let mut buffer = VecDeque::with_capacity(buf.len()); + buffer.extend(&buf[size..]); + + entry.insert(State{buffer, fin}); + } + }, + }; + + Ok(()) + } + + pub fn poll(&mut self, conn: &mut quiche::Connection) -> anyhow::Result<()> { + 'outer: for id in conn.writable() { + // Check if there's any buffered data for this stream. + let mut entry = match self.lookup.entry(id) { + hmap::Entry::Occupied(entry) => entry, + hmap::Entry::Vacant(_) => continue, + }; + + let state = entry.get_mut(); + + // Keep reading from the buffer until it's empty. + while state.buffer.len() > 0 { + // VecDeque is a ring buffer, so we can't write the whole thing at once. + let parts = state.buffer.as_slices(); + + let size = conn.stream_send(id, parts.0, false)?; + if size == 0 { + // No more space available for this stream. + continue 'outer + } + + // Remove the bytes that were written. + state.buffer.drain(..size); + } + + if state.fin { + // Write the stream done signal. + conn.stream_send(id, &[], true)?; + } + + // We can remove the value from the lookup once we've flushed everything. + entry.remove(); + } + + Ok(()) + } +} \ No newline at end of file