diff --git a/player/src/transport/index.ts b/player/src/transport/index.ts index 90bb29b..ee58ed1 100644 --- a/player/src/transport/index.ts +++ b/player/src/transport/index.ts @@ -44,7 +44,6 @@ export default class Transport { // Helper function to make creating a promise easier private async connect(props: TransportInit): Promise { - let options: WebTransportOptions = {}; if (props.fingerprint) { options.serverCertificateHashes = [ props.fingerprint ] @@ -98,11 +97,15 @@ export default class Transport { return this.handleInit(r, msg.init as Message.Init) } else if (msg.segment) { return this.handleSegment(r, msg.segment as Message.Segment) + } else { + console.warn("unknown message", msg); } } } async handleInit(stream: Stream.Reader, msg: Message.Init) { + console.log("handle init", msg); + let track = this.tracks.get(msg.id); if (!track) { track = new MP4.InitParser() @@ -118,6 +121,8 @@ export default class Transport { const info = await track.info + console.log(info); + if (info.audioTracks.length + info.videoTracks.length != 1) { throw new Error("expected a single track") } @@ -140,6 +145,8 @@ export default class Transport { } async handleSegment(stream: Stream.Reader, msg: Message.Segment) { + console.log("handle segment", msg); + let track = this.tracks.get(msg.init); if (!track) { track = new MP4.InitParser() diff --git a/player/src/video/decoder.ts b/player/src/video/decoder.ts index 376f99a..61c8113 100644 --- a/player/src/video/decoder.ts +++ b/player/src/video/decoder.ts @@ -53,6 +53,8 @@ export default class Decoder { const input = MP4.New(); input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => { + console.log(samples) + for (let sample of samples) { const timestamp = 1000 * sample.dts / sample.timescale // milliseconds diff --git a/server/src/lib.rs b/server/src/lib.rs index 915e3b1..2c973b8 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,2 +1,3 @@ pub mod transport; -//mod media; \ No newline at end of file +pub mod session; +pub mod media; \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index 9c44d0f..1ffa226 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,7 +1,4 @@ -use quiche::h3::webtransport; -use warp::transport; - -use std::time; +use warp::{session,transport}; use clap::Parser; use env_logger; @@ -26,31 +23,6 @@ struct Cli { media: String, } -#[derive(Default)] -struct Connection { - webtransport: Option, -} - -impl transport::App for Connection { - fn poll(&mut self, conn: &mut quiche::Connection, session: &mut webtransport::ServerSession) -> anyhow::Result<()> { - if !conn.is_established() { - // Wait until the handshake finishes - return Ok(()) - } - - if self.webtransport.is_none() { - self.webtransport = Some(webtransport::ServerSession::with_transport(conn)?) - } - - let webtransport = self.webtransport.as_mut().unwrap(); - - Ok(()) - } - - fn timeout(&self) -> Option { - None - } -} fn main() -> anyhow::Result<()> { env_logger::init(); @@ -62,6 +34,6 @@ fn main() -> anyhow::Result<()> { key: args.key, }; - let mut server = transport::Server::::new(server_config).unwrap(); + let mut server = transport::Server::::new(server_config).unwrap(); server.run() } \ No newline at end of file diff --git a/server/src/media.rs b/server/src/media.rs deleted file mode 100644 index 5e86adb..0000000 --- a/server/src/media.rs +++ /dev/null @@ -1,108 +0,0 @@ -use std::{io,fs}; - -use mp4; -use anyhow; -use bytes; - -use mp4::ReadBox; - -pub struct Source { - pub segments: Vec, -} - -impl Source { - pub fn new(path: &str) -> anyhow::Result { - let f = fs::read(path)?; - let mut bytes = bytes::Bytes::from(f); - - let mut segments = Vec::new(); - let mut current = Segment::new(); - - while bytes.len() > 0 { - // NOTE: Cloning is cheap, since the underlying bytes are reference counted. - let mut reader = io::Cursor::new(bytes.clone()); - - let header = mp4::BoxHeader::read(&mut reader)?; - let size: usize = header.size as usize; - - assert!(size > 0, "empty box"); - - let frag = bytes.split_to(size); - let fragment = Fragment{ bytes: frag }; - - match header.name { - /* - mp4::BoxType::FtypBox => { - } - mp4::BoxType::MoovBox => { - moov = mp4::MoovBox::read_box(&mut reader, size)? - } - mp4::BoxType::EmsgBox => { - let emsg = mp4::EmsgBox::read_box(&mut reader, size)?; - emsgs.push(emsg); - } - mp4::BoxType::MdatBox => { - mp4::skip_box(&mut reader, size)?; - } - */ - mp4::BoxType::MoofBox => { - let moof = mp4::MoofBox::read_box(&mut reader, header.size)?; - if has_keyframe(moof) { - segments.push(current); - current = Segment::new(); - } - } - _ => (), - } - - current.fragments.push(fragment); - } - - segments.push(current); - - Ok(Self { segments }) - } -} - -fn has_keyframe(moof: mp4::MoofBox) -> bool { - for traf in moof.trafs { - // TODO trak default flags if this is None - let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default(); - let trun = traf.trun.expect("missing trun box"); - - for i in 0..trun.sample_count { - let mut flags = match trun.sample_flags.get(i as usize) { - Some(f) => *f, - None => default_flags, - }; - - if i == 0 && trun.first_sample_flags.is_some() { - flags = trun.first_sample_flags.unwrap(); - } - - // https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177 - let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther - let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample - - if keyframe && non_sync { - return true - } - } - } - - false -} - -pub struct Segment { - pub fragments: Vec, -} - -impl Segment { - fn new() -> Self { - Segment { fragments: Vec::new() } - } -} - -pub struct Fragment { - pub bytes: bytes::Bytes, -} diff --git a/server/src/media/mod.rs b/server/src/media/mod.rs index b5cb700..80a709f 100644 --- a/server/src/media/mod.rs +++ b/server/src/media/mod.rs @@ -1 +1,3 @@ -pub mod source; \ No newline at end of file +mod source; + +pub use source::{Fragment,Source}; \ No newline at end of file diff --git a/server/src/media/source.rs b/server/src/media/source.rs index 7d83b5b..45320b1 100644 --- a/server/src/media/source.rs +++ b/server/src/media/source.rs @@ -9,15 +9,13 @@ use mp4::ReadBox; pub struct Source { reader: io::BufReader, start: time::Instant, - pending: Option, - sequence: u64, } pub struct Fragment { pub data: Vec, - pub segment_id: u64, - pub timestamp: u64, + pub keyframe: bool, + pub timestamp: u64, // only used to simulate a live stream } impl Source { @@ -30,7 +28,6 @@ impl Source { reader, start, pending: None, - sequence: 0, }) } @@ -52,29 +49,23 @@ impl Source { // Read the next full atom. let atom = read_box(&mut self.reader)?; let mut timestamp = 0; + let mut keyframe = false; // Before we return it, let's do some simple parsing. let mut reader = io::Cursor::new(&atom); let header = mp4::BoxHeader::read(&mut reader)?; + if header.name == mp4::BoxType::MoofBox { + let moof = mp4::MoofBox::read_box(&mut reader, header.size)?; - match header.name { - mp4::BoxType::MoofBox => { - let moof = mp4::MoofBox::read_box(&mut reader, header.size)?; - - if has_keyframe(&moof) { - self.sequence += 1 - } - - timestamp = first_timestamp(&moof); - } - _ => (), + keyframe = has_keyframe(&moof); + timestamp = first_timestamp(&moof); } Ok(Fragment { data: atom, - segment_id: self.sequence, - timestamp: timestamp, + keyframe, + timestamp, }) } } @@ -139,7 +130,7 @@ fn has_keyframe(moof: &mp4::MoofBox) -> bool { let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample - if keyframe && non_sync { + if keyframe && !non_sync { return true } } diff --git a/server/src/message.rs b/server/src/session/message.rs similarity index 92% rename from server/src/message.rs rename to server/src/session/message.rs index 1421e91..314660a 100644 --- a/server/src/message.rs +++ b/server/src/session/message.rs @@ -31,8 +31,8 @@ impl Message { let size = bytes.len() + 8; let mut out = Vec::with_capacity(size); + out.extend_from_slice(&(size as u32).to_be_bytes()); out.extend_from_slice(b"warp"); - out.extend_from_slice(&size.to_be_bytes()); out.extend_from_slice(bytes); Ok(out) diff --git a/server/src/session/mod.rs b/server/src/session/mod.rs new file mode 100644 index 0000000..031805c --- /dev/null +++ b/server/src/session/mod.rs @@ -0,0 +1,4 @@ +mod session; +mod message; + +pub use session::Session; \ No newline at end of file diff --git a/server/src/session/session.rs b/server/src/session/session.rs new file mode 100644 index 0000000..d694956 --- /dev/null +++ b/server/src/session/session.rs @@ -0,0 +1,128 @@ +use std::time; + +use quiche; +use quiche::h3::webtransport; + +use crate::{media,transport}; +use super::message; + +#[derive(Default)] +pub struct Session { + media: Option, + stream_id: Option, // stream ID of the current segment +} + +impl transport::App for Session { + // Process any updates to a session. + fn poll(&mut self, conn: &mut quiche::Connection, session: &mut webtransport::ServerSession) -> anyhow::Result<()> { + loop { + let event = match session.poll(conn) { + Err(webtransport::Error::Done) => break, + Err(e) => return Err(e.into()), + Ok(e) => e, + }; + + log::debug!("webtransport event: {:?}", event); + + match event { + webtransport::ServerEvent::ConnectRequest(req) => { + log::debug!("new connect {:?}", req); + // you can handle request with + // req.authority() + // req.path() + // and you can validate this request with req.origin() + + // TODO + let media = media::Source::new("../media/fragmented.mp4")?; + self.media = Some(media); + + session.accept_connect_request(conn, None).unwrap(); + }, + webtransport::ServerEvent::StreamData(stream_id) => { + let mut buf = vec![0; 10000]; + while let Ok(len) = + session.recv_stream_data(conn, stream_id, &mut buf) + { + let stream_data = &buf[0..len]; + log::debug!("stream data {:?}", stream_data); + } + } + + _ => {}, + } + } + + self.poll_source(conn, session)?; + + Ok(()) + } + + fn timeout(&self) -> Option { + None + } +} + +impl Session { + fn poll_source(&mut self, conn: &mut quiche::Connection, session: &mut webtransport::ServerSession) -> anyhow::Result<()> { + let media = match &mut self.media { + Some(m) => m, + None => return Ok(()), + }; + + let fragment = match media.next()? { + Some(f) => f, + None => return Ok(()), + }; + + log::debug!("{} {}", fragment.keyframe, fragment.timestamp); + + let mut stream_id = match self.stream_id { + Some(stream_id) => stream_id, + None => { + let mut message = message::Message::new(); + message.init = Some(message::Init{ + id: "video".to_string(), + }); + + let data = message.serialize()?; + + // TODO handle when stream is full + let stream_id = session.open_stream(conn, false)?; + session.send_stream_data(conn, stream_id, data.as_slice())?; + + stream_id + }, + }; + + if fragment.keyframe { + // Close the prior stream. + conn.stream_send(stream_id, &[], true)?; + + let mut message = message::Message::new(); + message.segment = Some(message::Segment{ + init: "video".to_string(), + timestamp: fragment.timestamp, + }); + + let data = message.serialize()?; + + // TODO: conn.stream_priority(stream_id, urgency, incremental) + + // TODO handle when stream is full + stream_id = session.open_stream(conn, false)?; + session.send_stream_data(conn, stream_id, data.as_slice())?; + } + + let data = fragment.data.as_slice(); + + // TODO check if stream is writable + session.send_stream_data(conn, stream_id, data)?; + + log::debug!("wrote {} to {}", std::str::from_utf8(&data[4..8]).unwrap(), stream_id); + + // Save for the next fragment + self.stream_id = Some(stream_id); + + Ok(()) + } +} \ No newline at end of file diff --git a/server/src/transport/mod.rs b/server/src/transport/mod.rs index c003fa3..c7b8325 100644 --- a/server/src/transport/mod.rs +++ b/server/src/transport/mod.rs @@ -1,5 +1,4 @@ mod server; -mod session; mod connection; mod app; diff --git a/server/src/transport/session.rs b/server/src/transport/session.rs deleted file mode 100644 index 1334704..0000000 --- a/server/src/transport/session.rs +++ /dev/null @@ -1,252 +0,0 @@ -use std::collections::hash_map as hmap; -use quiche::h3::webtransport; - -type Session = webtransport::ServerSession; -type Map = hmap::HashMap, Session>; - -/* -impl Session { - pub fn with_transport(conn: &mut quiche::Connection) -> anyhow::Result { - let session = webtransport::ServerSession::with_transport(conn)?; - - Ok(Self{ - session - }) - } - - // Process any updates to a session. - pub fn poll(&mut self) -> anyhow::Result<()> { - log::debug!("poll conn"); - while self.poll_once()? {} - - log::debug!("poll streams"); - self.poll_streams()?; - - Ok(()) - } - - // Process any updates to a session. - pub fn poll_once(&mut self) -> anyhow::Result { - let session = match &mut self.session { - Some(s) => s, - None => return Ok(false), - }; - - let event = match session.poll(&mut self.conn) { - Err(webtransport::Error::Done) => return Ok(false), - Err(e) => return Err(e.into()), - Ok(e) => e, - }; - - match event { - webtransport::ServerEvent::ConnectRequest(req) => { - log::debug!("new connect {:?}", req); - // you can handle request with - // req.authority() - // req.path() - // and you can validate this request with req.origin() - session.accept_connect_request(&mut self.conn, None).unwrap(); - }, - webtransport::ServerEvent::StreamData(stream_id) => { - log::debug!("on stream data {}", stream_id); - - let mut buf = vec![0; 10000]; - while let Ok(len) = - session.recv_stream_data(&mut self.conn, stream_id, &mut buf) - { - let stream_data = &buf[0..len]; - log::debug!("stream data {:?}", stream_data); - -/* - // handle stream_data - if (stream_id & 0x2) == 0 { - // bidirectional stream - // you can send data through this stream. - session - .send_stream_data(&mut self.conn, stream_id, stream_data) - .unwrap(); - } else { - // you cannot send data through client-initiated-unidirectional-stream. - // so, open new server-initiated-unidirectional-stream, and send data - // through it. - let new_stream_id = - session.open_stream(&mut self.conn, false).unwrap(); - session - .send_stream_data(&mut self.conn, new_stream_id, stream_data) - .unwrap(); - } - */ - } - } - - webtransport::ServerEvent::StreamFinished(stream_id) => { - // A WebTrnasport stream finished, handle it. - log::debug!("stream finished {}", stream_id); - } - - webtransport::ServerEvent::Datagram => { - log::debug!("datagram"); - } - - webtransport::ServerEvent::SessionReset(e) => { - log::debug!("session reset {}", e); - // Peer reset session stream, handle it. - } - - webtransport::ServerEvent::SessionFinished => { - log::debug!("session finished"); - // Peer finish session stream, handle it. - } - - webtransport::ServerEvent::SessionGoAway => { - log::debug!("session go away"); - // Peer signalled it is going away, handle it. - } - - webtransport::ServerEvent::Other(stream_id, event) => { - log::debug!("session other: {} {:?}", stream_id, event); - // Original h3::Event which is not related to WebTransport. - } - } - - Ok(true) - } - -/* - fn poll_source(&mut self) -> anyhow::Result<()> { - let media = match &mut self.media { - Some(m) => m, - None => return Ok(()), - }; - - let fragment = match media.next()? { - Some(f) => f, - None => return Ok(()), - }; - - // Get or create a new stream for each unique segment ID. - let stream_id = match self.segments.entry(fragment.segment_id) { - map::Entry::Occupied(e) => e.into_mut(), - map::Entry::Vacant(e) => { - let stream_id = self.start_stream(&fragment)?; - e.insert(stream_id) - }, - }; - - // Get or create a buffered object for each unique stream ID. - let buffered = match self.streams.entry(*stream_id) { - map::Entry::Occupied(e) => e.into_mut(), - map::Entry::Vacant(e) => e.insert(Buffered::new()), - }; - - let session = match &mut self.session { - Some(s) => s, - None => return Ok(()), - }; - - let data = fragment.data.as_slice(); - - match self.conn.stream_writable(*stream_id, data.len()) { - Ok(true) if buffered.len() == 0 => { - session.send_stream_data(&mut self.conn, *stream_id, data)?; - }, - Ok(_) => buffered.push_back(fragment.data), - Err(quiche::Error::Done) => {}, // stream closed? - Err(e) => anyhow::bail!(e), - }; - - Ok(()) - } - - fn start_stream(&mut self, fragment: &source::Fragment) -> anyhow::Result { - let conn = &mut self.conn; - let session = self.session.as_mut().unwrap(); - - let stream_id = session.open_stream(conn, false)?; - - // TODO: conn.stream_priority(stream_id, urgency, incremental) - - let mut message = message::Message::new(); - if fragment.segment_id == 0 { - message.init = Some(message::Init{ - id: "video".to_string(), - }); - } else { - message.segment = Some(message::Segment{ - init: "video".to_string(), - timestamp: fragment.timestamp, - }); - } - - let data= message.serialize()?; - match conn.stream_writable(stream_id, data.len()) { - Ok(true) => { - session.send_stream_data(conn, stream_id, data.as_slice())?; - }, - Ok(false) => { - let mut buffered = Buffered::new(); - buffered.push_back(data); - - self.streams.insert(stream_id, buffered); - }, - Err(quiche::Error::Done) => {}, - Err(e) => anyhow::bail!(e), - }; - - Ok(stream_id) - } -*/ - - fn poll_streams(&mut self) -> anyhow::Result<()> { - // TODO make sure this loops in priority order - for stream_id in self.conn.writable() { - self.poll_stream(stream_id)?; - } - - // Remove any entry buffered values. - self.streams.retain(|_, buffered| buffered.len() > 0 ); - - Ok(()) - } - - pub fn poll_stream(&mut self, stream_id: u64) -> anyhow::Result<()> { - let buffered = match self.streams.get_mut(&stream_id) { - Some(b) => b, - None => return Ok(()), - }; - - let conn = &mut self.conn; - - let session = match &mut self.session { - Some(s) => s, - None => return Ok(()), - }; - - while let Some(data) = buffered.pop_front() { - match conn.stream_writable(stream_id, data.len()) { - Ok(true) => { - session.send_stream_data(conn, stream_id, data.as_slice())?; - }, - Ok(false) => { - buffered.push_front(data); - return Ok(()); - }, - Err(quiche::Error::Done) => {}, - Err(e) => anyhow::bail!(e), - }; - } - - Ok(()) - } - - pub fn timeout(&self) -> Option { - self.conn.timeout() - } - - pub fn on_timeout(&mut self) { - self.conn.on_timeout() - - // custom stuff here - } -} -*/ \ No newline at end of file