From 28f5b973083cad5b42def2f78c1a1c4402bb6f42 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 9 May 2023 10:29:39 -0600 Subject: [PATCH] Implement prioritization. Not tested. --- server/src/media/source.rs | 8 +- server/src/session/mod.rs | 11 ++- server/src/transport/streams.rs | 128 ++++++++++++++++++++++---------- 3 files changed, 103 insertions(+), 44 deletions(-) diff --git a/server/src/media/source.rs b/server/src/media/source.rs index e639ce6..b8579bc 100644 --- a/server/src/media/source.rs +++ b/server/src/media/source.rs @@ -35,7 +35,7 @@ pub struct Fragment { pub keyframe: bool, // The timestamp of the fragment, in milliseconds, to simulate a live stream. - pub timestamp: Option, + pub timestamp: u64, } impl Source { @@ -107,7 +107,7 @@ impl Source { track_id: moof.trafs[0].tfhd.track_id, data: atom, keyframe: has_keyframe(&moof), - timestamp: first_timestamp(&moof), + timestamp: first_timestamp(&moof).expect("couldn't find timestamp"), }) } mp4::BoxType::MdatBox => { @@ -117,7 +117,7 @@ impl Source { track_id: moof.track_id, data: atom, keyframe: false, - timestamp: None, + timestamp: moof.timestamp, }); // We have some media data, return so we can start sending it. @@ -133,7 +133,7 @@ impl Source { // Simulate a live stream by sleeping until the next timestamp in the media. pub fn timeout(&self) -> Option { let next = self.fragments.front()?; - let timestamp = next.timestamp?; + let timestamp = next.timestamp; // Find the timescale for the track. let timescale = self.timescale.get(&next.track_id).unwrap(); diff --git a/server/src/session/mod.rs b/server/src/session/mod.rs index 0d59e8e..88360b7 100644 --- a/server/src/session/mod.rs +++ b/server/src/session/mod.rs @@ -120,11 +120,17 @@ impl Session { // Open a new stream. None => { + // Create a new unidirectional stream. let stream_id = session.open_stream(conn, false)?; - // TODO: conn.stream_priority(stream_id, urgency, incremental) + + // Set the stream priority to be equal to the timestamp. + // We subtract from u64::MAX so newer media is sent important. + // TODO prioritize audio + let order = u64::MAX - fragment.timestamp; + self.streams.send_order(conn, stream_id, order); // Encode a JSON header indicating this is a new track. - let mut message = message::Message::new(); + let mut message: message::Message = message::Message::new(); message.segment = Some(message::Segment { track_id: fragment.track_id, }); @@ -133,6 +139,7 @@ impl Session { let data = message.serialize()?; self.streams.send(conn, stream_id, &data, false)?; + // Keep a mapping from the track id to the current stream id. self.tracks.insert(fragment.track_id, stream_id); stream_id diff --git a/server/src/transport/streams.rs b/server/src/transport/streams.rs index 215731b..ccb929b 100644 --- a/server/src/transport/streams.rs +++ b/server/src/transport/streams.rs @@ -1,4 +1,3 @@ -use std::collections::hash_map as hmap; use std::collections::VecDeque; use anyhow; @@ -6,16 +5,19 @@ use quiche; #[derive(Default)] pub struct Streams { - lookup: hmap::HashMap, + ordered: Vec, } -#[derive(Default)] -struct State { +struct Stream { + id: u64, + order: u64, + buffer: VecDeque, fin: bool, } impl Streams { + // Write the data to the given stream, buffering it if needed. pub fn send( &mut self, conn: &mut quiche::Connection, @@ -23,63 +25,113 @@ impl Streams { buf: &[u8], fin: bool, ) -> anyhow::Result<()> { - match self.lookup.entry(id) { - hmap::Entry::Occupied(mut entry) => { - // Add to the existing buffer. - let state = entry.get_mut(); - state.buffer.extend(buf); - state.fin |= fin; - } - hmap::Entry::Vacant(entry) => { - let size = conn.stream_send(id, buf, fin)?; + if buf.is_empty() && !fin { + return Ok(()) + } - if size < buf.len() { - // Short write, save the rest for later. - let mut buffer = VecDeque::with_capacity(buf.len()); - buffer.extend(&buf[size..]); + // Get the index of the stream, or add it to the list of streams. + let pos = self.ordered.iter().position(|s| s.id == id).unwrap_or_else(|| { + // Create a new stream + let stream = Stream{ + id, + buffer: VecDeque::new(), + fin: false, + order: 0, // Default to highest priority until send_order is called. + }; - entry.insert(State { buffer, fin }); - } - } + self.insert(conn, stream) + }); + + let stream = &mut self.ordered[pos]; + + // Check if we've already closed the stream, just in case. + if stream.fin && !buf.is_empty() { + anyhow::bail!("stream is already finished"); + } + + // If there's no data buffered, try to write it immediately. + let size = if stream.buffer.is_empty() { + conn.stream_send(id, buf, fin)? + } else { + 0 }; + if size < buf.len() { + // Short write, save the rest for later. + stream.buffer.extend(&buf[size..]); + } + + stream.fin |= fin; + Ok(()) } + // Flush any pending stream data. pub fn poll(&mut self, conn: &mut quiche::Connection) -> anyhow::Result<()> { - 'outer: for id in conn.writable() { - // Check if there's any buffered data for this stream. - let mut entry = match self.lookup.entry(id) { - hmap::Entry::Occupied(entry) => entry, - hmap::Entry::Vacant(_) => continue, - }; - - let state = entry.get_mut(); - + // Loop over stream in order order. + 'outer: for stream in self.ordered.iter_mut() { // Keep reading from the buffer until it's empty. - while !state.buffer.is_empty() { + while !stream.buffer.is_empty() { // VecDeque is a ring buffer, so we can't write the whole thing at once. - let parts = state.buffer.as_slices(); + let parts = stream.buffer.as_slices(); - let size = conn.stream_send(id, parts.0, false)?; + let size = conn.stream_send(stream.id, parts.0, false)?; if size == 0 { // No more space available for this stream. continue 'outer; } // Remove the bytes that were written. - state.buffer.drain(..size); + stream.buffer.drain(..size); } - if state.fin { + if stream.fin { // Write the stream done signal. - conn.stream_send(id, &[], true)?; + conn.stream_send(stream.id, &[], true)?; } - - // We can remove the value from the lookup once we've flushed everything. - entry.remove(); } + // Remove streams that are done. + // No need to reprioritize, since the streams are still in order order. + self.ordered.retain(|stream| !stream.buffer.is_empty() || !stream.fin); + Ok(()) } + + // Set the send order of the stream. + pub fn send_order(&mut self, conn: &mut quiche::Connection, id: u64, order: u64) { + let mut stream = match self.ordered.iter().position(|s| s.id == id) { + // Remove the stream from the existing list. + Some(pos) => self.ordered.remove(pos), + + // This is a new stream, insert it into the list. + None => Stream{ + id, + buffer: VecDeque::new(), + fin: false, + order, + }, + }; + + stream.order = order; + + self.insert(conn, stream); + } + + fn insert(&mut self, conn: &mut quiche::Connection, stream: Stream) -> usize { + // Look for the position to insert the stream. + let pos = match self.ordered.binary_search_by_key(&stream.order, |s| s.order) { + Ok(pos) | Err(pos) => pos, + }; + + self.ordered.insert(pos, stream); + + // Reprioritize all later streams. + // TODO we can avoid this if stream_priorty takes a u64 + for (i, stream) in self.ordered[pos..].iter().enumerate() { + _ = conn.stream_priority(stream.id, (pos+i) as u8, true); + } + + pos + } }