Video woooorks.

This commit is contained in:
Luke Curley 2023-04-27 13:21:16 -07:00
parent 15c3352d80
commit 2b1a3adecc
5 changed files with 133 additions and 49 deletions

View File

@ -1,6 +1,13 @@
#!/bin/bash
cd "$(dirname "$0")"
# empty_moov: Uses moof fragments instead of one giant moov/mdat pair.
# frag_every_frame: Creates a moof for each frame.
# separate_moof: Splits audio and video into separate moof flags.
# omit_tfhd_offset: Removes absolute byte offsets so we can fragment.
ffmpeg -i source.mp4 \
-movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset \
-c:v copy \
-an \
-movflags frag_every_frame+empty_moov \
fragmented.mp4

View File

@ -53,8 +53,6 @@ export default class Decoder {
const input = MP4.New();
input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => {
console.log(samples)
for (let sample of samples) {
const timestamp = 1000 * sample.dts / sample.timescale // milliseconds
@ -92,7 +90,7 @@ export default class Decoder {
for (let raw of init.raw) {
raw.fileStart = offset
input.appendBuffer(raw)
offset = input.appendBuffer(raw)
}
const stream = new Stream.Reader(msg.reader, msg.buffer)

View File

@ -6,13 +6,12 @@ use quiche::h3::webtransport;
use crate::{media,transport};
use super::message;
use mp4;
#[derive(Default)]
pub struct Session {
media: Option<media::Source>,
stream_id: Option<u64>, // stream ID of the current segment
styp: Option<Vec<u8>>,
streams: transport::Streams, // An easy way of buffering stream data.
}
impl transport::App for Session {
@ -55,6 +54,10 @@ impl transport::App for Session {
}
}
// Send any pending stream data.
self.streams.poll(conn)?;
// Fetch the next media fragment, possibly queuing up stream data.
self.poll_source(conn, session)?;
Ok(())
@ -67,19 +70,46 @@ impl transport::App for Session {
impl Session {
fn poll_source(&mut self, conn: &mut quiche::Connection, session: &mut webtransport::ServerSession) -> anyhow::Result<()> {
// Get the media source once the connection is established.
let media = match &mut self.media {
Some(m) => m,
None => return Ok(()),
};
// Get the next media fragment.
let fragment = match media.next()? {
Some(f) => f,
None => return Ok(()),
};
let mut stream_id = match self.stream_id {
Some(stream_id) => stream_id,
// Check if we have already created a stream for this fragment.
let stream_id = match self.stream_id {
Some(old_stream_id) if fragment.keyframe => {
// This is the start of a new segment.
// Close the prior stream.
self.streams.send(conn, old_stream_id, &[], true)?;
// Encode a JSON header indicating this is the video track.
let mut message = message::Message::new();
message.segment = Some(message::Segment{
init: "video".to_string(),
});
// Open a new stream.
let stream_id = session.open_stream(conn, false)?;
// TODO: conn.stream_priority(stream_id, urgency, incremental)
// Write the header.
let data = message.serialize()?;
self.streams.send(conn, stream_id, &data, false)?;
stream_id
},
None => {
// This is the start of an init segment.
// Create a JSON header.
let mut message = message::Message::new();
message.init = Some(message::Init{
id: "video".to_string(),
@ -87,54 +117,22 @@ impl Session {
let data = message.serialize()?;
// TODO handle when stream is full
// Create a new stream and write the header.
let stream_id = session.open_stream(conn, false)?;
session.send_stream_data(conn, stream_id, data.as_slice())?;
self.streams.send(conn, stream_id, data.as_slice(), false)?;
stream_id
},
}
Some(stream_id) => stream_id, // Continuation of init or segment
};
if fragment.keyframe {
// Close the prior stream.
conn.stream_send(stream_id, &[], true)?;
let mut message = message::Message::new();
message.segment = Some(message::Segment{
init: "video".to_string(),
});
let data = message.serialize()?;
// TODO: conn.stream_priority(stream_id, urgency, incremental)
// TODO handle when stream is full
stream_id = session.open_stream(conn, false)?;
session.send_stream_data(conn, stream_id, data.as_slice())?;
let styp = self.styp.as_ref().expect("missing ftyp mox");
session.send_stream_data(conn, stream_id, &styp)?;
}
// Write the current fragment.
let data = fragment.data.as_slice();
self.streams.send(conn, stream_id, data, false)?;
// TODO check if stream is writable
let size = session.send_stream_data(conn, stream_id, data)?;
if size < data.len() {
anyhow::bail!("partial write: {} < {}", size, data.len());
}
// Save for the next fragment
// Save the stream ID for the next fragment.
self.stream_id = Some(stream_id);
// Save the ftyp fragment but modify it to be a styp for furture segments.
if fragment.typ == mp4::BoxType::FtypBox {
let mut data = fragment.data;
data[4] = b's'; // ftyp to styp
self.styp = Some(data);
}
Ok(())
}
}

View File

@ -1,6 +1,8 @@
mod server;
mod connection;
mod app;
mod streams;
pub use app::App;
pub use server::{Config, Server};
pub use server::{Config, Server};
pub use streams::Streams;

View File

@ -0,0 +1,79 @@
use std::collections::hash_map as hmap;
use std::collections::VecDeque;
use quiche;
use anyhow;
#[derive(Default)]
pub struct Streams {
lookup: hmap::HashMap<u64, State>,
}
#[derive(Default)]
struct State {
buffer: VecDeque<u8>,
fin: bool,
}
impl Streams {
pub fn send(&mut self, conn: &mut quiche::Connection, id: u64, buf: &[u8], fin: bool) -> anyhow::Result<()> {
match self.lookup.entry(id) {
hmap::Entry::Occupied(mut entry) => {
// Add to the existing buffer.
let state = entry.get_mut();
state.buffer.extend(buf);
state.fin |= fin;
},
hmap::Entry::Vacant(entry) => {
let size = conn.stream_send(id, buf, fin)?;
if size < buf.len() {
// Short write, save the rest for later.
let mut buffer = VecDeque::with_capacity(buf.len());
buffer.extend(&buf[size..]);
entry.insert(State{buffer, fin});
}
},
};
Ok(())
}
pub fn poll(&mut self, conn: &mut quiche::Connection) -> anyhow::Result<()> {
'outer: for id in conn.writable() {
// Check if there's any buffered data for this stream.
let mut entry = match self.lookup.entry(id) {
hmap::Entry::Occupied(entry) => entry,
hmap::Entry::Vacant(_) => continue,
};
let state = entry.get_mut();
// Keep reading from the buffer until it's empty.
while state.buffer.len() > 0 {
// VecDeque is a ring buffer, so we can't write the whole thing at once.
let parts = state.buffer.as_slices();
let size = conn.stream_send(id, parts.0, false)?;
if size == 0 {
// No more space available for this stream.
continue 'outer
}
// Remove the bytes that were written.
state.buffer.drain(..size);
}
if state.fin {
// Write the stream done signal.
conn.stream_send(id, &[], true)?;
}
// We can remove the value from the lookup once we've flushed everything.
entry.remove();
}
Ok(())
}
}