diff --git a/server/src/media/mod.rs b/server/src/media/mod.rs index f73c588..e7ec5eb 100644 --- a/server/src/media/mod.rs +++ b/server/src/media/mod.rs @@ -1,3 +1,3 @@ mod source; -pub use source::{Fragment, Source}; \ No newline at end of file +pub use source::{Fragment, Source}; diff --git a/server/src/media/source.rs b/server/src/media/source.rs index b8579bc..ebbb5e4 100644 --- a/server/src/media/source.rs +++ b/server/src/media/source.rs @@ -1,6 +1,6 @@ -use std::{fs, io, time}; -use std::collections::{HashMap,VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::io::Read; +use std::{fs, io, time}; use anyhow; @@ -18,7 +18,7 @@ pub struct Source { pub init: Vec, // The timescale used for each track. - timescale: HashMap, + timescales: HashMap, // Any fragments parsed and ready to be returned by next(). fragments: VecDeque, @@ -60,16 +60,12 @@ impl Source { // Parse the moov box so we can detect the timescales for each track. let moov = mp4::MoovBox::read_box(&mut moov_reader, moov_header.size)?; - let timescale = moov.traks - .iter() - .map(|trak| (trak.tkhd.track_id, trak.mdia.mdhd.timescale)) - .collect(); - Ok(Self{ + Ok(Self { reader, start, init, - timescale, + timescales: timescales(&moov), fragments: VecDeque::new(), }) } @@ -94,7 +90,9 @@ impl Source { let header = mp4::BoxHeader::read(&mut reader)?; match header.name { - mp4::BoxType::FtypBox | mp4::BoxType::MoovBox => anyhow::bail!("must call init first"), + mp4::BoxType::FtypBox | mp4::BoxType::MoovBox => { + anyhow::bail!("must call init first") + } mp4::BoxType::MoofBox => { let moof = mp4::MoofBox::read_box(&mut reader, header.size)?; @@ -136,7 +134,7 @@ impl Source { let timestamp = next.timestamp; // Find the timescale for the track. - let timescale = self.timescale.get(&next.track_id).unwrap(); + let timescale = self.timescales.get(&next.track_id).unwrap(); let delay = time::Duration::from_millis(1000 * timestamp / *timescale as u64); let elapsed = self.start.elapsed(); @@ -222,4 +220,11 @@ fn has_keyframe(moof: &mp4::MoofBox) -> bool { fn first_timestamp(moof: &mp4::MoofBox) -> Option { Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time) -} \ No newline at end of file +} + +fn timescales(moov: &mp4::MoovBox) -> HashMap { + moov.traks + .iter() + .map(|trak| (trak.tkhd.track_id, trak.mdia.mdhd.timescale)) + .collect() +} diff --git a/server/src/session/mod.rs b/server/src/session/mod.rs index 88360b7..b3b38e7 100644 --- a/server/src/session/mod.rs +++ b/server/src/session/mod.rs @@ -1,7 +1,7 @@ mod message; -use std::time; use std::collections::hash_map as hmap; +use std::time; use quiche; use quiche::h3::webtransport; @@ -29,11 +29,8 @@ impl transport::App for Session { Ok(e) => e, }; - log::debug!("webtransport event: {:?}", event); - match event { - webtransport::ServerEvent::ConnectRequest(req) => { - log::debug!("new connect {:?}", req); + webtransport::ServerEvent::ConnectRequest(_req) => { // you can handle request with // req.authority() // req.path() @@ -46,7 +43,7 @@ impl transport::App for Session { // Create a JSON header. let mut message = message::Message::new(); - message.init = Some(message::Init{}); + message.init = Some(message::Init {}); let data = message.serialize()?; // Create a new stream and write the header. @@ -59,8 +56,7 @@ impl transport::App for Session { webtransport::ServerEvent::StreamData(stream_id) => { let mut buf = vec![0; 10000]; while let Ok(len) = session.recv_stream_data(conn, stream_id, &mut buf) { - let stream_data = &buf[0..len]; - log::debug!("stream data {:?}", stream_data); + let _stream_data = &buf[0..len]; } } @@ -105,7 +101,7 @@ impl Session { Some(stream_id) if fragment.keyframe => { self.streams.send(conn, *stream_id, &[], true)?; None - }, + } // Use the existing stream Some(stream_id) => Some(*stream_id), @@ -143,7 +139,7 @@ impl Session { self.tracks.insert(fragment.track_id, stream_id); stream_id - }, + } }; // Write the current fragment. diff --git a/server/src/transport/streams.rs b/server/src/transport/streams.rs index ccb929b..149e5de 100644 --- a/server/src/transport/streams.rs +++ b/server/src/transport/streams.rs @@ -26,21 +26,25 @@ impl Streams { fin: bool, ) -> anyhow::Result<()> { if buf.is_empty() && !fin { - return Ok(()) + return Ok(()); } // Get the index of the stream, or add it to the list of streams. - let pos = self.ordered.iter().position(|s| s.id == id).unwrap_or_else(|| { - // Create a new stream - let stream = Stream{ - id, - buffer: VecDeque::new(), - fin: false, - order: 0, // Default to highest priority until send_order is called. - }; + let pos = self + .ordered + .iter() + .position(|s| s.id == id) + .unwrap_or_else(|| { + // Create a new stream + let stream = Stream { + id, + buffer: VecDeque::new(), + fin: false, + order: 0, // Default to highest priority until send_order is called. + }; - self.insert(conn, stream) - }); + self.insert(conn, stream) + }); let stream = &mut self.ordered[pos]; @@ -93,7 +97,8 @@ impl Streams { // Remove streams that are done. // No need to reprioritize, since the streams are still in order order. - self.ordered.retain(|stream| !stream.buffer.is_empty() || !stream.fin); + self.ordered + .retain(|stream| !stream.buffer.is_empty() || !stream.fin); Ok(()) } @@ -105,7 +110,7 @@ impl Streams { Some(pos) => self.ordered.remove(pos), // This is a new stream, insert it into the list. - None => Stream{ + None => Stream { id, buffer: VecDeque::new(), fin: false, @@ -120,7 +125,10 @@ impl Streams { fn insert(&mut self, conn: &mut quiche::Connection, stream: Stream) -> usize { // Look for the position to insert the stream. - let pos = match self.ordered.binary_search_by_key(&stream.order, |s| s.order) { + let pos = match self + .ordered + .binary_search_by_key(&stream.order, |s| s.order) + { Ok(pos) | Err(pos) => pos, }; @@ -129,7 +137,7 @@ impl Streams { // Reprioritize all later streams. // TODO we can avoid this if stream_priorty takes a u64 for (i, stream) in self.ordered[pos..].iter().enumerate() { - _ = conn.stream_priority(stream.id, (pos+i) as u8, true); + _ = conn.stream_priority(stream.id, (pos + i) as u8, true); } pos