diff --git a/server/src/main.rs b/server/src/main.rs index a6a1119..3d4d422 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,7 +1,6 @@ use warp::{session, transport}; use clap::Parser; -use env_logger; /// Search for a pattern in a file and display the lines that contain it. #[derive(Parser)] diff --git a/server/src/media/source.rs b/server/src/media/source.rs index 6ce9dcb..38cfb97 100644 --- a/server/src/media/source.rs +++ b/server/src/media/source.rs @@ -58,7 +58,7 @@ impl Source { }) } - pub fn next(&mut self) -> anyhow::Result> { + pub fn get(&mut self) -> anyhow::Result> { if self.fragments.is_empty() { self.parse()?; }; @@ -116,7 +116,7 @@ impl Source { toov.write_box(&mut toov_data)?; let mut file = std::fs::File::create(format!("track{}.mp4", track_id))?; - file.write_all(toov_data.as_slice()); + file.write_all(toov_data.as_slice())?; self.fragments.push_back(Fragment { track: track_id, diff --git a/server/src/session/mod.rs b/server/src/session/mod.rs index 592f525..47e0074 100644 --- a/server/src/session/mod.rs +++ b/server/src/session/mod.rs @@ -1,4 +1,145 @@ mod message; -mod session; -pub use session::Session; +use std::time; + +use quiche; +use quiche::h3::webtransport; + +use crate::{media, transport}; + +#[derive(Default)] +pub struct Session { + media: Option, + stream_id: Option, // stream ID of the current segment + + streams: transport::Streams, // An easy way of buffering stream data. +} + +impl transport::App for Session { + // Process any updates to a session. + fn poll( + &mut self, + conn: &mut quiche::Connection, + session: &mut webtransport::ServerSession, + ) -> anyhow::Result<()> { + loop { + let event = match session.poll(conn) { + Err(webtransport::Error::Done) => break, + Err(e) => return Err(e.into()), + Ok(e) => e, + }; + + log::debug!("webtransport event: {:?}", event); + + match event { + webtransport::ServerEvent::ConnectRequest(req) => { + log::debug!("new connect {:?}", req); + // you can handle request with + // req.authority() + // req.path() + // and you can validate this request with req.origin() + + // TODO + let media = media::Source::new("../media/fragmented.mp4")?; + self.media = Some(media); + + session.accept_connect_request(conn, None).unwrap(); + } + webtransport::ServerEvent::StreamData(stream_id) => { + let mut buf = vec![0; 10000]; + while let Ok(len) = session.recv_stream_data(conn, stream_id, &mut buf) { + let stream_data = &buf[0..len]; + log::debug!("stream data {:?}", stream_data); + } + } + + _ => {} + } + } + + // Send any pending stream data. + self.streams.poll(conn)?; + + // Fetch the next media fragment, possibly queuing up stream data. + self.poll_source(conn, session)?; + + Ok(()) + } + + fn timeout(&self) -> Option { + self.media.as_ref().and_then(|m| m.timeout()) + } +} + +impl Session { + fn poll_source( + &mut self, + conn: &mut quiche::Connection, + session: &mut webtransport::ServerSession, + ) -> anyhow::Result<()> { + // Get the media source once the connection is established. + let media = match &mut self.media { + Some(m) => m, + None => return Ok(()), + }; + + // Get the next media fragment. + let fragment = match media.get()? { + Some(f) => f, + None => return Ok(()), + }; + + // Check if we have already created a stream for this fragment. + let stream_id = match self.stream_id { + Some(old_stream_id) if fragment.keyframe => { + // This is the start of a new segment. + + // Close the prior stream. + self.streams.send(conn, old_stream_id, &[], true)?; + + // Encode a JSON header indicating this is the video track. + let mut message = message::Message::new(); + message.segment = Some(message::Segment { + init: "video".to_string(), + }); + + // Open a new stream. + let stream_id = session.open_stream(conn, false)?; + // TODO: conn.stream_priority(stream_id, urgency, incremental) + + // Write the header. + let data = message.serialize()?; + self.streams.send(conn, stream_id, &data, false)?; + + stream_id + } + None => { + // This is the start of an init segment. + + // Create a JSON header. + let mut message = message::Message::new(); + message.init = Some(message::Init { + id: "video".to_string(), + }); + + let data = message.serialize()?; + + // Create a new stream and write the header. + let stream_id = session.open_stream(conn, false)?; + self.streams.send(conn, stream_id, data.as_slice(), false)?; + + stream_id + } + Some(stream_id) => stream_id, // Continuation of init or segment + }; + + // Write the current fragment. + let data = fragment.data.as_slice(); + self.streams.send(conn, stream_id, data, false)?; + + // Save the stream ID for the next fragment. + self.stream_id = Some(stream_id); + + Ok(()) + } +} diff --git a/server/src/session/session.rs b/server/src/session/session.rs index 8b48521..8b13789 100644 --- a/server/src/session/session.rs +++ b/server/src/session/session.rs @@ -1,144 +1 @@ -use std::time; -use quiche; -use quiche::h3::webtransport; - -use super::message; -use crate::{media, transport}; - -#[derive(Default)] -pub struct Session { - media: Option, - stream_id: Option, // stream ID of the current segment - - streams: transport::Streams, // An easy way of buffering stream data. -} - -impl transport::App for Session { - // Process any updates to a session. - fn poll( - &mut self, - conn: &mut quiche::Connection, - session: &mut webtransport::ServerSession, - ) -> anyhow::Result<()> { - loop { - let event = match session.poll(conn) { - Err(webtransport::Error::Done) => break, - Err(e) => return Err(e.into()), - Ok(e) => e, - }; - - log::debug!("webtransport event: {:?}", event); - - match event { - webtransport::ServerEvent::ConnectRequest(req) => { - log::debug!("new connect {:?}", req); - // you can handle request with - // req.authority() - // req.path() - // and you can validate this request with req.origin() - - // TODO - let media = media::Source::new("../media/fragmented.mp4")?; - self.media = Some(media); - - session.accept_connect_request(conn, None).unwrap(); - } - webtransport::ServerEvent::StreamData(stream_id) => { - let mut buf = vec![0; 10000]; - while let Ok(len) = session.recv_stream_data(conn, stream_id, &mut buf) { - let stream_data = &buf[0..len]; - log::debug!("stream data {:?}", stream_data); - } - } - - _ => {} - } - } - - // Send any pending stream data. - self.streams.poll(conn)?; - - // Fetch the next media fragment, possibly queuing up stream data. - self.poll_source(conn, session)?; - - Ok(()) - } - - fn timeout(&self) -> Option { - self.media.as_ref().and_then(|m| m.timeout()) - } -} - -impl Session { - fn poll_source( - &mut self, - conn: &mut quiche::Connection, - session: &mut webtransport::ServerSession, - ) -> anyhow::Result<()> { - // Get the media source once the connection is established. - let media = match &mut self.media { - Some(m) => m, - None => return Ok(()), - }; - - // Get the next media fragment. - let fragment = match media.next()? { - Some(f) => f, - None => return Ok(()), - }; - - // Check if we have already created a stream for this fragment. - let stream_id = match self.stream_id { - Some(old_stream_id) if fragment.keyframe => { - // This is the start of a new segment. - - // Close the prior stream. - self.streams.send(conn, old_stream_id, &[], true)?; - - // Encode a JSON header indicating this is the video track. - let mut message = message::Message::new(); - message.segment = Some(message::Segment { - init: "video".to_string(), - }); - - // Open a new stream. - let stream_id = session.open_stream(conn, false)?; - // TODO: conn.stream_priority(stream_id, urgency, incremental) - - // Write the header. - let data = message.serialize()?; - self.streams.send(conn, stream_id, &data, false)?; - - stream_id - } - None => { - // This is the start of an init segment. - - // Create a JSON header. - let mut message = message::Message::new(); - message.init = Some(message::Init { - id: "video".to_string(), - }); - - let data = message.serialize()?; - - // Create a new stream and write the header. - let stream_id = session.open_stream(conn, false)?; - self.streams.send(conn, stream_id, data.as_slice(), false)?; - - stream_id - } - Some(stream_id) => stream_id, // Continuation of init or segment - }; - - // Write the current fragment. - let data = fragment.data.as_slice(); - self.streams.send(conn, stream_id, data, false)?; - - // Save the stream ID for the next fragment. - self.stream_id = Some(stream_id); - - Ok(()) - } -} diff --git a/server/src/transport/server.rs b/server/src/transport/server.rs index cc9cbe1..202d526 100644 --- a/server/src/transport/server.rs +++ b/server/src/transport/server.rs @@ -250,7 +250,7 @@ impl Server { } pub fn app(&mut self) -> anyhow::Result<()> { - for (_, conn) in &mut self.conns { + for conn in self.conns.values_mut() { if let Some(session) = &mut conn.session { if let Err(e) = conn.app.poll(&mut conn.quiche, session) { // Close the connection on any application error @@ -279,7 +279,7 @@ impl Server { let pkt = &pkt[..size]; - match self.socket.send_to(&pkt, info.to) { + match self.socket.send_to(pkt, info.to) { Err(err) if err.kind() == io::ErrorKind::WouldBlock => break, Err(err) => return Err(err.into()), Ok(_) => (), diff --git a/server/src/transport/streams.rs b/server/src/transport/streams.rs index a69717c..215731b 100644 --- a/server/src/transport/streams.rs +++ b/server/src/transport/streams.rs @@ -57,7 +57,7 @@ impl Streams { let state = entry.get_mut(); // Keep reading from the buffer until it's empty. - while state.buffer.len() > 0 { + while !state.buffer.is_empty() { // VecDeque is a ring buffer, so we can't write the whole thing at once. let parts = state.buffer.as_slices();