cargo fmt

This commit is contained in:
Luke Curley 2023-05-09 14:24:14 -06:00
parent 28f5b97308
commit 0f4d823d39
4 changed files with 47 additions and 38 deletions

View File

@ -1,6 +1,6 @@
use std::{fs, io, time}; use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap,VecDeque};
use std::io::Read; use std::io::Read;
use std::{fs, io, time};
use anyhow; use anyhow;
@ -18,7 +18,7 @@ pub struct Source {
pub init: Vec<u8>, pub init: Vec<u8>,
// The timescale used for each track. // The timescale used for each track.
timescale: HashMap<u32, u32>, timescales: HashMap<u32, u32>,
// Any fragments parsed and ready to be returned by next(). // Any fragments parsed and ready to be returned by next().
fragments: VecDeque<Fragment>, fragments: VecDeque<Fragment>,
@ -60,16 +60,12 @@ impl Source {
// Parse the moov box so we can detect the timescales for each track. // Parse the moov box so we can detect the timescales for each track.
let moov = mp4::MoovBox::read_box(&mut moov_reader, moov_header.size)?; let moov = mp4::MoovBox::read_box(&mut moov_reader, moov_header.size)?;
let timescale = moov.traks
.iter()
.map(|trak| (trak.tkhd.track_id, trak.mdia.mdhd.timescale))
.collect();
Ok(Self{ Ok(Self {
reader, reader,
start, start,
init, init,
timescale, timescales: timescales(&moov),
fragments: VecDeque::new(), fragments: VecDeque::new(),
}) })
} }
@ -94,7 +90,9 @@ impl Source {
let header = mp4::BoxHeader::read(&mut reader)?; let header = mp4::BoxHeader::read(&mut reader)?;
match header.name { match header.name {
mp4::BoxType::FtypBox | mp4::BoxType::MoovBox => anyhow::bail!("must call init first"), mp4::BoxType::FtypBox | mp4::BoxType::MoovBox => {
anyhow::bail!("must call init first")
}
mp4::BoxType::MoofBox => { mp4::BoxType::MoofBox => {
let moof = mp4::MoofBox::read_box(&mut reader, header.size)?; let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
@ -136,7 +134,7 @@ impl Source {
let timestamp = next.timestamp; let timestamp = next.timestamp;
// Find the timescale for the track. // Find the timescale for the track.
let timescale = self.timescale.get(&next.track_id).unwrap(); let timescale = self.timescales.get(&next.track_id).unwrap();
let delay = time::Duration::from_millis(1000 * timestamp / *timescale as u64); let delay = time::Duration::from_millis(1000 * timestamp / *timescale as u64);
let elapsed = self.start.elapsed(); let elapsed = self.start.elapsed();
@ -223,3 +221,10 @@ fn has_keyframe(moof: &mp4::MoofBox) -> bool {
fn first_timestamp(moof: &mp4::MoofBox) -> Option<u64> { fn first_timestamp(moof: &mp4::MoofBox) -> Option<u64> {
Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time) Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time)
} }
fn timescales(moov: &mp4::MoovBox) -> HashMap<u32, u32> {
moov.traks
.iter()
.map(|trak| (trak.tkhd.track_id, trak.mdia.mdhd.timescale))
.collect()
}

View File

@ -1,7 +1,7 @@
mod message; mod message;
use std::time;
use std::collections::hash_map as hmap; use std::collections::hash_map as hmap;
use std::time;
use quiche; use quiche;
use quiche::h3::webtransport; use quiche::h3::webtransport;
@ -29,11 +29,8 @@ impl transport::App for Session {
Ok(e) => e, Ok(e) => e,
}; };
log::debug!("webtransport event: {:?}", event);
match event { match event {
webtransport::ServerEvent::ConnectRequest(req) => { webtransport::ServerEvent::ConnectRequest(_req) => {
log::debug!("new connect {:?}", req);
// you can handle request with // you can handle request with
// req.authority() // req.authority()
// req.path() // req.path()
@ -46,7 +43,7 @@ impl transport::App for Session {
// Create a JSON header. // Create a JSON header.
let mut message = message::Message::new(); let mut message = message::Message::new();
message.init = Some(message::Init{}); message.init = Some(message::Init {});
let data = message.serialize()?; let data = message.serialize()?;
// Create a new stream and write the header. // Create a new stream and write the header.
@ -59,8 +56,7 @@ impl transport::App for Session {
webtransport::ServerEvent::StreamData(stream_id) => { webtransport::ServerEvent::StreamData(stream_id) => {
let mut buf = vec![0; 10000]; let mut buf = vec![0; 10000];
while let Ok(len) = session.recv_stream_data(conn, stream_id, &mut buf) { while let Ok(len) = session.recv_stream_data(conn, stream_id, &mut buf) {
let stream_data = &buf[0..len]; let _stream_data = &buf[0..len];
log::debug!("stream data {:?}", stream_data);
} }
} }
@ -105,7 +101,7 @@ impl Session {
Some(stream_id) if fragment.keyframe => { Some(stream_id) if fragment.keyframe => {
self.streams.send(conn, *stream_id, &[], true)?; self.streams.send(conn, *stream_id, &[], true)?;
None None
}, }
// Use the existing stream // Use the existing stream
Some(stream_id) => Some(*stream_id), Some(stream_id) => Some(*stream_id),
@ -143,7 +139,7 @@ impl Session {
self.tracks.insert(fragment.track_id, stream_id); self.tracks.insert(fragment.track_id, stream_id);
stream_id stream_id
}, }
}; };
// Write the current fragment. // Write the current fragment.

View File

@ -26,13 +26,17 @@ impl Streams {
fin: bool, fin: bool,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if buf.is_empty() && !fin { if buf.is_empty() && !fin {
return Ok(()) return Ok(());
} }
// Get the index of the stream, or add it to the list of streams. // Get the index of the stream, or add it to the list of streams.
let pos = self.ordered.iter().position(|s| s.id == id).unwrap_or_else(|| { let pos = self
.ordered
.iter()
.position(|s| s.id == id)
.unwrap_or_else(|| {
// Create a new stream // Create a new stream
let stream = Stream{ let stream = Stream {
id, id,
buffer: VecDeque::new(), buffer: VecDeque::new(),
fin: false, fin: false,
@ -93,7 +97,8 @@ impl Streams {
// Remove streams that are done. // Remove streams that are done.
// No need to reprioritize, since the streams are still in order order. // No need to reprioritize, since the streams are still in order order.
self.ordered.retain(|stream| !stream.buffer.is_empty() || !stream.fin); self.ordered
.retain(|stream| !stream.buffer.is_empty() || !stream.fin);
Ok(()) Ok(())
} }
@ -105,7 +110,7 @@ impl Streams {
Some(pos) => self.ordered.remove(pos), Some(pos) => self.ordered.remove(pos),
// This is a new stream, insert it into the list. // This is a new stream, insert it into the list.
None => Stream{ None => Stream {
id, id,
buffer: VecDeque::new(), buffer: VecDeque::new(),
fin: false, fin: false,
@ -120,7 +125,10 @@ impl Streams {
fn insert(&mut self, conn: &mut quiche::Connection, stream: Stream) -> usize { fn insert(&mut self, conn: &mut quiche::Connection, stream: Stream) -> usize {
// Look for the position to insert the stream. // Look for the position to insert the stream.
let pos = match self.ordered.binary_search_by_key(&stream.order, |s| s.order) { let pos = match self
.ordered
.binary_search_by_key(&stream.order, |s| s.order)
{
Ok(pos) | Err(pos) => pos, Ok(pos) | Err(pos) => pos,
}; };
@ -129,7 +137,7 @@ impl Streams {
// Reprioritize all later streams. // Reprioritize all later streams.
// TODO we can avoid this if stream_priorty takes a u64 // TODO we can avoid this if stream_priorty takes a u64
for (i, stream) in self.ordered[pos..].iter().enumerate() { for (i, stream) in self.ordered[pos..].iter().enumerate() {
_ = conn.stream_priority(stream.id, (pos+i) as u8, true); _ = conn.stream_priority(stream.id, (pos + i) as u8, true);
} }
pos pos