Proggers.

This commit is contained in:
Luke Curley 2023-04-24 11:45:46 -07:00
parent bb0437a3bb
commit c3dd45b7a7
12 changed files with 160 additions and 414 deletions

View File

@ -44,7 +44,6 @@ export default class Transport {
// Helper function to make creating a promise easier // Helper function to make creating a promise easier
private async connect(props: TransportInit): Promise<WebTransport> { private async connect(props: TransportInit): Promise<WebTransport> {
let options: WebTransportOptions = {}; let options: WebTransportOptions = {};
if (props.fingerprint) { if (props.fingerprint) {
options.serverCertificateHashes = [ props.fingerprint ] options.serverCertificateHashes = [ props.fingerprint ]
@ -98,11 +97,15 @@ export default class Transport {
return this.handleInit(r, msg.init as Message.Init) return this.handleInit(r, msg.init as Message.Init)
} else if (msg.segment) { } else if (msg.segment) {
return this.handleSegment(r, msg.segment as Message.Segment) return this.handleSegment(r, msg.segment as Message.Segment)
} else {
console.warn("unknown message", msg);
} }
} }
} }
async handleInit(stream: Stream.Reader, msg: Message.Init) { async handleInit(stream: Stream.Reader, msg: Message.Init) {
console.log("handle init", msg);
let track = this.tracks.get(msg.id); let track = this.tracks.get(msg.id);
if (!track) { if (!track) {
track = new MP4.InitParser() track = new MP4.InitParser()
@ -118,6 +121,8 @@ export default class Transport {
const info = await track.info const info = await track.info
console.log(info);
if (info.audioTracks.length + info.videoTracks.length != 1) { if (info.audioTracks.length + info.videoTracks.length != 1) {
throw new Error("expected a single track") throw new Error("expected a single track")
} }
@ -140,6 +145,8 @@ export default class Transport {
} }
async handleSegment(stream: Stream.Reader, msg: Message.Segment) { async handleSegment(stream: Stream.Reader, msg: Message.Segment) {
console.log("handle segment", msg);
let track = this.tracks.get(msg.init); let track = this.tracks.get(msg.init);
if (!track) { if (!track) {
track = new MP4.InitParser() track = new MP4.InitParser()

View File

@ -53,6 +53,8 @@ export default class Decoder {
const input = MP4.New(); const input = MP4.New();
input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => { input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => {
console.log(samples)
for (let sample of samples) { for (let sample of samples) {
const timestamp = 1000 * sample.dts / sample.timescale // milliseconds const timestamp = 1000 * sample.dts / sample.timescale // milliseconds

View File

@ -1,2 +1,3 @@
pub mod transport; pub mod transport;
//mod media; pub mod session;
pub mod media;

View File

@ -1,7 +1,4 @@
use quiche::h3::webtransport; use warp::{session,transport};
use warp::transport;
use std::time;
use clap::Parser; use clap::Parser;
use env_logger; use env_logger;
@ -26,31 +23,6 @@ struct Cli {
media: String, media: String,
} }
#[derive(Default)]
struct Connection {
webtransport: Option<webtransport::ServerSession>,
}
impl transport::App for Connection {
fn poll(&mut self, conn: &mut quiche::Connection, session: &mut webtransport::ServerSession) -> anyhow::Result<()> {
if !conn.is_established() {
// Wait until the handshake finishes
return Ok(())
}
if self.webtransport.is_none() {
self.webtransport = Some(webtransport::ServerSession::with_transport(conn)?)
}
let webtransport = self.webtransport.as_mut().unwrap();
Ok(())
}
fn timeout(&self) -> Option<time::Duration> {
None
}
}
fn main() -> anyhow::Result<()> { fn main() -> anyhow::Result<()> {
env_logger::init(); env_logger::init();
@ -62,6 +34,6 @@ fn main() -> anyhow::Result<()> {
key: args.key, key: args.key,
}; };
let mut server = transport::Server::<Connection>::new(server_config).unwrap(); let mut server = transport::Server::<session::Session>::new(server_config).unwrap();
server.run() server.run()
} }

View File

@ -1,108 +0,0 @@
use std::{io,fs};
use mp4;
use anyhow;
use bytes;
use mp4::ReadBox;
pub struct Source {
pub segments: Vec<Segment>,
}
impl Source {
pub fn new(path: &str) -> anyhow::Result<Self> {
let f = fs::read(path)?;
let mut bytes = bytes::Bytes::from(f);
let mut segments = Vec::new();
let mut current = Segment::new();
while bytes.len() > 0 {
// NOTE: Cloning is cheap, since the underlying bytes are reference counted.
let mut reader = io::Cursor::new(bytes.clone());
let header = mp4::BoxHeader::read(&mut reader)?;
let size: usize = header.size as usize;
assert!(size > 0, "empty box");
let frag = bytes.split_to(size);
let fragment = Fragment{ bytes: frag };
match header.name {
/*
mp4::BoxType::FtypBox => {
}
mp4::BoxType::MoovBox => {
moov = mp4::MoovBox::read_box(&mut reader, size)?
}
mp4::BoxType::EmsgBox => {
let emsg = mp4::EmsgBox::read_box(&mut reader, size)?;
emsgs.push(emsg);
}
mp4::BoxType::MdatBox => {
mp4::skip_box(&mut reader, size)?;
}
*/
mp4::BoxType::MoofBox => {
let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
if has_keyframe(moof) {
segments.push(current);
current = Segment::new();
}
}
_ => (),
}
current.fragments.push(fragment);
}
segments.push(current);
Ok(Self { segments })
}
}
fn has_keyframe(moof: mp4::MoofBox) -> bool {
for traf in moof.trafs {
// TODO trak default flags if this is None
let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default();
let trun = traf.trun.expect("missing trun box");
for i in 0..trun.sample_count {
let mut flags = match trun.sample_flags.get(i as usize) {
Some(f) => *f,
None => default_flags,
};
if i == 0 && trun.first_sample_flags.is_some() {
flags = trun.first_sample_flags.unwrap();
}
// https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177
let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther
let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample
if keyframe && non_sync {
return true
}
}
}
false
}
pub struct Segment {
pub fragments: Vec<Fragment>,
}
impl Segment {
fn new() -> Self {
Segment { fragments: Vec::new() }
}
}
pub struct Fragment {
pub bytes: bytes::Bytes,
}

View File

@ -1 +1,3 @@
pub mod source; mod source;
pub use source::{Fragment,Source};

View File

@ -9,15 +9,13 @@ use mp4::ReadBox;
pub struct Source { pub struct Source {
reader: io::BufReader<fs::File>, reader: io::BufReader<fs::File>,
start: time::Instant, start: time::Instant,
pending: Option<Fragment>, pending: Option<Fragment>,
sequence: u64,
} }
pub struct Fragment { pub struct Fragment {
pub data: Vec<u8>, pub data: Vec<u8>,
pub segment_id: u64, pub keyframe: bool,
pub timestamp: u64, pub timestamp: u64, // only used to simulate a live stream
} }
impl Source { impl Source {
@ -30,7 +28,6 @@ impl Source {
reader, reader,
start, start,
pending: None, pending: None,
sequence: 0,
}) })
} }
@ -52,29 +49,23 @@ impl Source {
// Read the next full atom. // Read the next full atom.
let atom = read_box(&mut self.reader)?; let atom = read_box(&mut self.reader)?;
let mut timestamp = 0; let mut timestamp = 0;
let mut keyframe = false;
// Before we return it, let's do some simple parsing. // Before we return it, let's do some simple parsing.
let mut reader = io::Cursor::new(&atom); let mut reader = io::Cursor::new(&atom);
let header = mp4::BoxHeader::read(&mut reader)?; let header = mp4::BoxHeader::read(&mut reader)?;
if header.name == mp4::BoxType::MoofBox {
match header.name {
mp4::BoxType::MoofBox => {
let moof = mp4::MoofBox::read_box(&mut reader, header.size)?; let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
if has_keyframe(&moof) { keyframe = has_keyframe(&moof);
self.sequence += 1
}
timestamp = first_timestamp(&moof); timestamp = first_timestamp(&moof);
} }
_ => (),
}
Ok(Fragment { Ok(Fragment {
data: atom, data: atom,
segment_id: self.sequence, keyframe,
timestamp: timestamp, timestamp,
}) })
} }
} }
@ -139,7 +130,7 @@ fn has_keyframe(moof: &mp4::MoofBox) -> bool {
let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther
let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample
if keyframe && non_sync { if keyframe && !non_sync {
return true return true
} }
} }

View File

@ -31,8 +31,8 @@ impl Message {
let size = bytes.len() + 8; let size = bytes.len() + 8;
let mut out = Vec::with_capacity(size); let mut out = Vec::with_capacity(size);
out.extend_from_slice(&(size as u32).to_be_bytes());
out.extend_from_slice(b"warp"); out.extend_from_slice(b"warp");
out.extend_from_slice(&size.to_be_bytes());
out.extend_from_slice(bytes); out.extend_from_slice(bytes);
Ok(out) Ok(out)

View File

@ -0,0 +1,4 @@
mod session;
mod message;
pub use session::Session;

View File

@ -0,0 +1,128 @@
use std::time;
use quiche;
use quiche::h3::webtransport;
use crate::{media,transport};
use super::message;
#[derive(Default)]
pub struct Session {
media: Option<media::Source>,
stream_id: Option<u64>, // stream ID of the current segment
}
impl transport::App for Session {
// Process any updates to a session.
fn poll(&mut self, conn: &mut quiche::Connection, session: &mut webtransport::ServerSession) -> anyhow::Result<()> {
loop {
let event = match session.poll(conn) {
Err(webtransport::Error::Done) => break,
Err(e) => return Err(e.into()),
Ok(e) => e,
};
log::debug!("webtransport event: {:?}", event);
match event {
webtransport::ServerEvent::ConnectRequest(req) => {
log::debug!("new connect {:?}", req);
// you can handle request with
// req.authority()
// req.path()
// and you can validate this request with req.origin()
// TODO
let media = media::Source::new("../media/fragmented.mp4")?;
self.media = Some(media);
session.accept_connect_request(conn, None).unwrap();
},
webtransport::ServerEvent::StreamData(stream_id) => {
let mut buf = vec![0; 10000];
while let Ok(len) =
session.recv_stream_data(conn, stream_id, &mut buf)
{
let stream_data = &buf[0..len];
log::debug!("stream data {:?}", stream_data);
}
}
_ => {},
}
}
self.poll_source(conn, session)?;
Ok(())
}
fn timeout(&self) -> Option<time::Duration> {
None
}
}
impl Session {
fn poll_source(&mut self, conn: &mut quiche::Connection, session: &mut webtransport::ServerSession) -> anyhow::Result<()> {
let media = match &mut self.media {
Some(m) => m,
None => return Ok(()),
};
let fragment = match media.next()? {
Some(f) => f,
None => return Ok(()),
};
log::debug!("{} {}", fragment.keyframe, fragment.timestamp);
let mut stream_id = match self.stream_id {
Some(stream_id) => stream_id,
None => {
let mut message = message::Message::new();
message.init = Some(message::Init{
id: "video".to_string(),
});
let data = message.serialize()?;
// TODO handle when stream is full
let stream_id = session.open_stream(conn, false)?;
session.send_stream_data(conn, stream_id, data.as_slice())?;
stream_id
},
};
if fragment.keyframe {
// Close the prior stream.
conn.stream_send(stream_id, &[], true)?;
let mut message = message::Message::new();
message.segment = Some(message::Segment{
init: "video".to_string(),
timestamp: fragment.timestamp,
});
let data = message.serialize()?;
// TODO: conn.stream_priority(stream_id, urgency, incremental)
// TODO handle when stream is full
stream_id = session.open_stream(conn, false)?;
session.send_stream_data(conn, stream_id, data.as_slice())?;
}
let data = fragment.data.as_slice();
// TODO check if stream is writable
session.send_stream_data(conn, stream_id, data)?;
log::debug!("wrote {} to {}", std::str::from_utf8(&data[4..8]).unwrap(), stream_id);
// Save for the next fragment
self.stream_id = Some(stream_id);
Ok(())
}
}

View File

@ -1,5 +1,4 @@
mod server; mod server;
mod session;
mod connection; mod connection;
mod app; mod app;

View File

@ -1,252 +0,0 @@
use std::collections::hash_map as hmap;
use quiche::h3::webtransport;
type Session = webtransport::ServerSession;
type Map = hmap::HashMap<quiche::ConnectionId<'static>, Session>;
/*
impl Session {
pub fn with_transport(conn: &mut quiche::Connection) -> anyhow::Result<Self> {
let session = webtransport::ServerSession::with_transport(conn)?;
Ok(Self{
session
})
}
// Process any updates to a session.
pub fn poll(&mut self) -> anyhow::Result<()> {
log::debug!("poll conn");
while self.poll_once()? {}
log::debug!("poll streams");
self.poll_streams()?;
Ok(())
}
// Process any updates to a session.
pub fn poll_once(&mut self) -> anyhow::Result<bool> {
let session = match &mut self.session {
Some(s) => s,
None => return Ok(false),
};
let event = match session.poll(&mut self.conn) {
Err(webtransport::Error::Done) => return Ok(false),
Err(e) => return Err(e.into()),
Ok(e) => e,
};
match event {
webtransport::ServerEvent::ConnectRequest(req) => {
log::debug!("new connect {:?}", req);
// you can handle request with
// req.authority()
// req.path()
// and you can validate this request with req.origin()
session.accept_connect_request(&mut self.conn, None).unwrap();
},
webtransport::ServerEvent::StreamData(stream_id) => {
log::debug!("on stream data {}", stream_id);
let mut buf = vec![0; 10000];
while let Ok(len) =
session.recv_stream_data(&mut self.conn, stream_id, &mut buf)
{
let stream_data = &buf[0..len];
log::debug!("stream data {:?}", stream_data);
/*
// handle stream_data
if (stream_id & 0x2) == 0 {
// bidirectional stream
// you can send data through this stream.
session
.send_stream_data(&mut self.conn, stream_id, stream_data)
.unwrap();
} else {
// you cannot send data through client-initiated-unidirectional-stream.
// so, open new server-initiated-unidirectional-stream, and send data
// through it.
let new_stream_id =
session.open_stream(&mut self.conn, false).unwrap();
session
.send_stream_data(&mut self.conn, new_stream_id, stream_data)
.unwrap();
}
*/
}
}
webtransport::ServerEvent::StreamFinished(stream_id) => {
// A WebTrnasport stream finished, handle it.
log::debug!("stream finished {}", stream_id);
}
webtransport::ServerEvent::Datagram => {
log::debug!("datagram");
}
webtransport::ServerEvent::SessionReset(e) => {
log::debug!("session reset {}", e);
// Peer reset session stream, handle it.
}
webtransport::ServerEvent::SessionFinished => {
log::debug!("session finished");
// Peer finish session stream, handle it.
}
webtransport::ServerEvent::SessionGoAway => {
log::debug!("session go away");
// Peer signalled it is going away, handle it.
}
webtransport::ServerEvent::Other(stream_id, event) => {
log::debug!("session other: {} {:?}", stream_id, event);
// Original h3::Event which is not related to WebTransport.
}
}
Ok(true)
}
/*
fn poll_source(&mut self) -> anyhow::Result<()> {
let media = match &mut self.media {
Some(m) => m,
None => return Ok(()),
};
let fragment = match media.next()? {
Some(f) => f,
None => return Ok(()),
};
// Get or create a new stream for each unique segment ID.
let stream_id = match self.segments.entry(fragment.segment_id) {
map::Entry::Occupied(e) => e.into_mut(),
map::Entry::Vacant(e) => {
let stream_id = self.start_stream(&fragment)?;
e.insert(stream_id)
},
};
// Get or create a buffered object for each unique stream ID.
let buffered = match self.streams.entry(*stream_id) {
map::Entry::Occupied(e) => e.into_mut(),
map::Entry::Vacant(e) => e.insert(Buffered::new()),
};
let session = match &mut self.session {
Some(s) => s,
None => return Ok(()),
};
let data = fragment.data.as_slice();
match self.conn.stream_writable(*stream_id, data.len()) {
Ok(true) if buffered.len() == 0 => {
session.send_stream_data(&mut self.conn, *stream_id, data)?;
},
Ok(_) => buffered.push_back(fragment.data),
Err(quiche::Error::Done) => {}, // stream closed?
Err(e) => anyhow::bail!(e),
};
Ok(())
}
fn start_stream(&mut self, fragment: &source::Fragment) -> anyhow::Result<u64> {
let conn = &mut self.conn;
let session = self.session.as_mut().unwrap();
let stream_id = session.open_stream(conn, false)?;
// TODO: conn.stream_priority(stream_id, urgency, incremental)
let mut message = message::Message::new();
if fragment.segment_id == 0 {
message.init = Some(message::Init{
id: "video".to_string(),
});
} else {
message.segment = Some(message::Segment{
init: "video".to_string(),
timestamp: fragment.timestamp,
});
}
let data= message.serialize()?;
match conn.stream_writable(stream_id, data.len()) {
Ok(true) => {
session.send_stream_data(conn, stream_id, data.as_slice())?;
},
Ok(false) => {
let mut buffered = Buffered::new();
buffered.push_back(data);
self.streams.insert(stream_id, buffered);
},
Err(quiche::Error::Done) => {},
Err(e) => anyhow::bail!(e),
};
Ok(stream_id)
}
*/
fn poll_streams(&mut self) -> anyhow::Result<()> {
// TODO make sure this loops in priority order
for stream_id in self.conn.writable() {
self.poll_stream(stream_id)?;
}
// Remove any entry buffered values.
self.streams.retain(|_, buffered| buffered.len() > 0 );
Ok(())
}
pub fn poll_stream(&mut self, stream_id: u64) -> anyhow::Result<()> {
let buffered = match self.streams.get_mut(&stream_id) {
Some(b) => b,
None => return Ok(()),
};
let conn = &mut self.conn;
let session = match &mut self.session {
Some(s) => s,
None => return Ok(()),
};
while let Some(data) = buffered.pop_front() {
match conn.stream_writable(stream_id, data.len()) {
Ok(true) => {
session.send_stream_data(conn, stream_id, data.as_slice())?;
},
Ok(false) => {
buffered.push_front(data);
return Ok(());
},
Err(quiche::Error::Done) => {},
Err(e) => anyhow::bail!(e),
};
}
Ok(())
}
pub fn timeout(&self) -> Option<time::Duration> {
self.conn.timeout()
}
pub fn on_timeout(&mut self) {
self.conn.on_timeout()
// custom stuff here
}
}
*/