diff --git a/server/src/lib.rs b/server/src/lib.rs index 2c973b8..6715979 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,3 +1,3 @@ -pub mod transport; +pub mod media; pub mod session; -pub mod media; \ No newline at end of file +pub mod transport; diff --git a/server/src/main.rs b/server/src/main.rs index 1ffa226..a6a1119 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,4 +1,4 @@ -use warp::{session,transport}; +use warp::{session, transport}; use clap::Parser; use env_logger; @@ -28,7 +28,7 @@ fn main() -> anyhow::Result<()> { let args = Cli::parse(); - let server_config = transport::Config{ + let server_config = transport::Config { addr: args.addr, cert: args.cert, key: args.key, @@ -36,4 +36,4 @@ fn main() -> anyhow::Result<()> { let mut server = transport::Server::::new(server_config).unwrap(); server.run() -} \ No newline at end of file +} diff --git a/server/src/media/mod.rs b/server/src/media/mod.rs index 80a709f..e7ec5eb 100644 --- a/server/src/media/mod.rs +++ b/server/src/media/mod.rs @@ -1,3 +1,3 @@ mod source; -pub use source::{Fragment,Source}; \ No newline at end of file +pub use source::{Fragment, Source}; diff --git a/server/src/media/source.rs b/server/src/media/source.rs index 0a7d019..6ce9dcb 100644 --- a/server/src/media/source.rs +++ b/server/src/media/source.rs @@ -1,13 +1,13 @@ -use std::{io,fs,time}; use io::Read; -use std::collections::{VecDeque}; +use std::collections::VecDeque; +use std::{fs, io, time}; use std::io::Write; -use mp4; use anyhow; +use mp4; -use mp4::{ReadBox,WriteBox}; +use mp4::{ReadBox, WriteBox}; pub struct Source { // We read the file once, in order, and don't seek backwards. @@ -40,7 +40,7 @@ pub struct Fragment { pub keyframe: bool, // The timestamp of the fragment, in milliseconds, to simulate a live stream. - pub timestamp: Option + pub timestamp: Option, } impl Source { @@ -49,7 +49,7 @@ impl Source { let reader = io::BufReader::new(f); let start = time::Instant::now(); - Ok(Self{ + Ok(Self { reader, start, fragments: VecDeque::new(), @@ -64,7 +64,7 @@ impl Source { }; if self.timeout().is_some() { - return Ok(None) + return Ok(None); } Ok(self.fragments.pop_front()) @@ -84,7 +84,7 @@ impl Source { // Don't return anything until we know the total number of tracks. // To be honest, I didn't expect the borrow checker to allow this, but it does! self.ftyp = atom; - }, + } mp4::BoxType::MoovBox => { // We need to split the moov based on the tracks. let moov = mp4::MoovBox::read_box(&mut reader, header.size)?; @@ -105,7 +105,11 @@ impl Source { // We remove every box for other track IDs. let mut toov = moov.clone(); toov.traks.retain(|t| t.tkhd.track_id == track_id); - toov.mvex.as_mut().expect("missing mvex").trexs.retain(|f| f.track_id == track_id); + toov.mvex + .as_mut() + .expect("missing mvex") + .trexs + .retain(|f| f.track_id == track_id); // Marshal the box. let mut toov_data = Vec::new(); @@ -124,7 +128,7 @@ impl Source { } self.moov = Some(moov); - }, + } mp4::BoxType::MoofBox => { let moof = mp4::MoofBox::read_box(&mut reader, header.size)?; @@ -133,19 +137,19 @@ impl Source { anyhow::bail!("multiple tracks per moof atom") } - self.fragments.push_back(Fragment{ + self.fragments.push_back(Fragment { track: moof.trafs[0].tfhd.track_id, typ: mp4::BoxType::MoofBox, data: atom, keyframe: has_keyframe(&moof), timestamp: first_timestamp(&moof), }) - }, + } mp4::BoxType::MdatBox => { let moof = self.fragments.back().expect("no atom before mdat"); assert!(moof.typ == mp4::BoxType::MoofBox, "no moof before mdat"); - self.fragments.push_back(Fragment{ + self.fragments.push_back(Fragment { track: moof.track, typ: mp4::BoxType::MoofBox, data: atom, @@ -154,8 +158,8 @@ impl Source { }); // We have some media data, return so we can start sending it. - return Ok(()) - }, + return Ok(()); + } _ => anyhow::bail!("unknown top-level atom: {:?}", header.name), } } @@ -167,7 +171,12 @@ impl Source { let timestamp = next.timestamp?; // Find the timescale for the track. - let track = self.moov.as_ref()?.traks.iter().find(|t| t.tkhd.track_id == next.track)?; + let track = self + .moov + .as_ref()? + .traks + .iter() + .find(|t| t.tkhd.track_id == next.track)?; let timescale = track.mdia.mdhd.timescale as u64; let delay = time::Duration::from_millis(1000 * timestamp / timescale); @@ -195,17 +204,21 @@ fn read_box(reader: &mut R) -> anyhow::Result> { 1 => { reader.read_exact(&mut buf)?; let size_large = u64::from_be_bytes(buf); - anyhow::ensure!(size_large >= 16, "impossible extended box size: {}", size_large); + anyhow::ensure!( + size_large >= 16, + "impossible extended box size: {}", + size_large + ); reader.take(size_large - 16) - }, + } 2..=7 => { anyhow::bail!("impossible box size: {}", size) } // Otherwise read based on the size. - size => reader.take(size - 8) + size => reader.take(size - 8), }; // Append to the vector and return it. @@ -238,7 +251,7 @@ fn has_keyframe(moof: &mp4::MoofBox) -> bool { let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample if keyframe && !non_sync { - return true + return true; } } } diff --git a/server/src/session/message.rs b/server/src/session/message.rs index 8718b81..74243e8 100644 --- a/server/src/session/message.rs +++ b/server/src/session/message.rs @@ -36,4 +36,4 @@ impl Message { Ok(out) } -} \ No newline at end of file +} diff --git a/server/src/session/mod.rs b/server/src/session/mod.rs index 031805c..592f525 100644 --- a/server/src/session/mod.rs +++ b/server/src/session/mod.rs @@ -1,4 +1,4 @@ -mod session; mod message; +mod session; -pub use session::Session; \ No newline at end of file +pub use session::Session; diff --git a/server/src/session/session.rs b/server/src/session/session.rs index d01dd00..8b48521 100644 --- a/server/src/session/session.rs +++ b/server/src/session/session.rs @@ -3,8 +3,8 @@ use std::time; use quiche; use quiche::h3::webtransport; -use crate::{media,transport}; use super::message; +use crate::{media, transport}; #[derive(Default)] pub struct Session { @@ -16,7 +16,11 @@ pub struct Session { impl transport::App for Session { // Process any updates to a session. - fn poll(&mut self, conn: &mut quiche::Connection, session: &mut webtransport::ServerSession) -> anyhow::Result<()> { + fn poll( + &mut self, + conn: &mut quiche::Connection, + session: &mut webtransport::ServerSession, + ) -> anyhow::Result<()> { loop { let event = match session.poll(conn) { Err(webtransport::Error::Done) => break, @@ -39,18 +43,16 @@ impl transport::App for Session { self.media = Some(media); session.accept_connect_request(conn, None).unwrap(); - }, + } webtransport::ServerEvent::StreamData(stream_id) => { let mut buf = vec![0; 10000]; - while let Ok(len) = - session.recv_stream_data(conn, stream_id, &mut buf) - { + while let Ok(len) = session.recv_stream_data(conn, stream_id, &mut buf) { let stream_data = &buf[0..len]; log::debug!("stream data {:?}", stream_data); } } - _ => {}, + _ => {} } } @@ -69,7 +71,11 @@ impl transport::App for Session { } impl Session { - fn poll_source(&mut self, conn: &mut quiche::Connection, session: &mut webtransport::ServerSession) -> anyhow::Result<()> { + fn poll_source( + &mut self, + conn: &mut quiche::Connection, + session: &mut webtransport::ServerSession, + ) -> anyhow::Result<()> { // Get the media source once the connection is established. let media = match &mut self.media { Some(m) => m, @@ -92,7 +98,7 @@ impl Session { // Encode a JSON header indicating this is the video track. let mut message = message::Message::new(); - message.segment = Some(message::Segment{ + message.segment = Some(message::Segment { init: "video".to_string(), }); @@ -105,13 +111,13 @@ impl Session { self.streams.send(conn, stream_id, &data, false)?; stream_id - }, + } None => { // This is the start of an init segment. // Create a JSON header. let mut message = message::Message::new(); - message.init = Some(message::Init{ + message.init = Some(message::Init { id: "video".to_string(), }); @@ -135,4 +141,4 @@ impl Session { Ok(()) } -} \ No newline at end of file +} diff --git a/server/src/transport/app.rs b/server/src/transport/app.rs index b55ce53..9024448 100644 --- a/server/src/transport/app.rs +++ b/server/src/transport/app.rs @@ -3,6 +3,10 @@ use std::time; use quiche::h3::webtransport; pub trait App: Default { - fn poll(&mut self, conn: &mut quiche::Connection, session: &mut webtransport::ServerSession) -> anyhow::Result<()>; + fn poll( + &mut self, + conn: &mut quiche::Connection, + session: &mut webtransport::ServerSession, + ) -> anyhow::Result<()>; fn timeout(&self) -> Option; } diff --git a/server/src/transport/connection.rs b/server/src/transport/connection.rs index 2fb12d9..e9766dc 100644 --- a/server/src/transport/connection.rs +++ b/server/src/transport/connection.rs @@ -12,4 +12,4 @@ pub struct Connection { pub quiche: quiche::Connection, pub session: Option, pub app: T, -} \ No newline at end of file +} diff --git a/server/src/transport/mod.rs b/server/src/transport/mod.rs index f8d86e8..60ef79d 100644 --- a/server/src/transport/mod.rs +++ b/server/src/transport/mod.rs @@ -1,8 +1,8 @@ -mod server; -mod connection; mod app; +mod connection; +mod server; mod streams; pub use app::App; pub use server::{Config, Server}; -pub use streams::Streams; \ No newline at end of file +pub use streams::Streams; diff --git a/server/src/transport/server.rs b/server/src/transport/server.rs index 6c3eebc..cc9cbe1 100644 --- a/server/src/transport/server.rs +++ b/server/src/transport/server.rs @@ -2,8 +2,8 @@ use std::io; use quiche::h3::webtransport; -use super::connection; use super::app; +use super::connection; const MAX_DATAGRAM_SIZE: usize = 1350; @@ -36,11 +36,9 @@ impl Server { let poll = mio::Poll::new().unwrap(); let events = mio::Events::with_capacity(1024); - poll.registry().register( - &mut socket, - mio::Token(0), - mio::Interest::READABLE, - ).unwrap(); + poll.registry() + .register(&mut socket, mio::Token(0), mio::Interest::READABLE) + .unwrap(); // Generate random values for connection IDs. let rng = ring::rand::SystemRandom::new(); @@ -50,7 +48,8 @@ impl Server { let mut quic = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap(); quic.load_cert_chain_from_pem_file(&config.cert).unwrap(); quic.load_priv_key_from_pem_file(&config.key).unwrap(); - quic.set_application_protos(quiche::h3::APPLICATION_PROTOCOL).unwrap(); + quic.set_application_protos(quiche::h3::APPLICATION_PROTOCOL) + .unwrap(); quic.set_max_idle_timeout(5000); quic.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE); quic.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE); @@ -92,17 +91,21 @@ impl Server { // Find the shorter timeout from all the active connections. // // TODO: use event loop that properly supports timers - let timeout = self.conns.values().filter_map(|c| { - let timeout = c.quiche.timeout(); - let expires = c.app.timeout(); + let timeout = self + .conns + .values() + .filter_map(|c| { + let timeout = c.quiche.timeout(); + let expires = c.app.timeout(); - match (timeout, expires) { - (Some(a), Some(b)) => Some(a.min(b)), - (Some(a), None) => Some(a), - (None, Some(b)) => Some(b), - (None, None) => None, - } - }).min(); + match (timeout, expires) { + (Some(a), Some(b)) => Some(a.min(b)), + (Some(a), None) => Some(a), + (None, Some(b)) => Some(b), + (None, None) => None, + } + }) + .min(); self.poll.poll(&mut self.events, timeout).unwrap(); @@ -120,7 +123,7 @@ impl Server { // Reads packets from the socket, updating any internal connection state. fn receive(&mut self) -> anyhow::Result<()> { - let mut src= [0; MAX_DATAGRAM_SIZE]; + let mut src = [0; MAX_DATAGRAM_SIZE]; // Try reading any data currently available on the socket. loop { @@ -150,20 +153,24 @@ impl Server { conn.quiche.recv(src, info)?; if conn.session.is_none() && conn.quiche.is_established() { - conn.session = Some(webtransport::ServerSession::with_transport(&mut conn.quiche)?) + conn.session = Some(webtransport::ServerSession::with_transport( + &mut conn.quiche, + )?) } - continue + continue; } else if let Some(conn) = self.conns.get_mut(&conn_id) { // 1-RTT traffic. conn.quiche.recv(src, info)?; // TODO is this needed here? if conn.session.is_none() && conn.quiche.is_established() { - conn.session = Some(webtransport::ServerSession::with_transport(&mut conn.quiche)?) + conn.session = Some(webtransport::ServerSession::with_transport( + &mut conn.quiche, + )?) } - continue + continue; } if hdr.ty != quiche::Type::Initial { @@ -174,10 +181,10 @@ impl Server { if !quiche::version_is_supported(hdr.version) { let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut dst).unwrap(); - let dst= &dst[..len]; + let dst = &dst[..len]; self.socket.send_to(dst, from).unwrap(); - continue + continue; } let mut scid = [0; quiche::MAX_CONN_ID_LEN]; @@ -202,10 +209,10 @@ impl Server { ) .unwrap(); - let dst= &dst[..len]; + let dst = &dst[..len]; self.socket.send_to(dst, from).unwrap(); - continue + continue; } let odcid = validate_token(&from, token); @@ -222,21 +229,23 @@ impl Server { // Reuse the source connection ID we sent in the Retry packet, // instead of changing it again. - let conn_id= hdr.dcid.clone(); + let conn_id = hdr.dcid.clone(); let local_addr = self.socket.local_addr().unwrap(); - let mut conn = quiche::accept(&conn_id, odcid.as_ref(), local_addr, from, &mut self.quic)?; + let mut conn = + quiche::accept(&conn_id, odcid.as_ref(), local_addr, from, &mut self.quic)?; // Process potentially coalesced packets. conn.recv(src, info)?; - let user = connection::Connection{ + let user = connection::Connection { quiche: conn, session: None, app: T::default(), }; - self.conns.insert(user.quiche.source_id().into_owned(), user); + self.conns + .insert(user.quiche.source_id().into_owned(), user); } } @@ -262,7 +271,7 @@ impl Server { for conn in self.conns.values_mut() { loop { - let (size , info) = match conn.quiche.send(&mut pkt) { + let (size, info) = match conn.quiche.send(&mut pkt) { Ok(v) => v, Err(quiche::Error::Done) => return Ok(()), Err(e) => return Err(e.into()), @@ -283,7 +292,7 @@ impl Server { pub fn cleanup(&mut self) { // Garbage collect closed connections. - self.conns.retain(|_, ref mut c| !c.quiche.is_closed() ); + self.conns.retain(|_, ref mut c| !c.quiche.is_closed()); } } @@ -319,7 +328,8 @@ fn mint_token(hdr: &quiche::Header, src: &std::net::SocketAddr) -> Vec { /// Note that this function is only an example and doesn't do any cryptographic /// authenticate of the token. *It should not be used in production system*. fn validate_token<'a>( - src: &std::net::SocketAddr, token: &'a [u8], + src: &std::net::SocketAddr, + token: &'a [u8], ) -> Option> { if token.len() < 6 { return None; @@ -341,4 +351,4 @@ fn validate_token<'a>( } Some(quiche::ConnectionId::from_ref(&token[addr.len()..])) -} \ No newline at end of file +} diff --git a/server/src/transport/streams.rs b/server/src/transport/streams.rs index 6b78233..a69717c 100644 --- a/server/src/transport/streams.rs +++ b/server/src/transport/streams.rs @@ -1,8 +1,8 @@ use std::collections::hash_map as hmap; use std::collections::VecDeque; -use quiche; use anyhow; +use quiche; #[derive(Default)] pub struct Streams { @@ -16,14 +16,20 @@ struct State { } impl Streams { - pub fn send(&mut self, conn: &mut quiche::Connection, id: u64, buf: &[u8], fin: bool) -> anyhow::Result<()> { + pub fn send( + &mut self, + conn: &mut quiche::Connection, + id: u64, + buf: &[u8], + fin: bool, + ) -> anyhow::Result<()> { match self.lookup.entry(id) { hmap::Entry::Occupied(mut entry) => { // Add to the existing buffer. let state = entry.get_mut(); state.buffer.extend(buf); state.fin |= fin; - }, + } hmap::Entry::Vacant(entry) => { let size = conn.stream_send(id, buf, fin)?; @@ -32,9 +38,9 @@ impl Streams { let mut buffer = VecDeque::with_capacity(buf.len()); buffer.extend(&buf[size..]); - entry.insert(State{buffer, fin}); + entry.insert(State { buffer, fin }); } - }, + } }; Ok(()) @@ -58,7 +64,7 @@ impl Streams { let size = conn.stream_send(id, parts.0, false)?; if size == 0 { // No more space available for this stream. - continue 'outer + continue 'outer; } // Remove the bytes that were written. @@ -76,4 +82,4 @@ impl Streams { Ok(()) } -} \ No newline at end of file +}