Implement prioritization.

Not tested.
Luke Curley 2023-05-09 10:29:39 -06:00
parent 29921ba46d
commit 28f5b97308
3 changed files with 103 additions and 44 deletions

View File

@@ -35,7 +35,7 @@ pub struct Fragment {
     pub keyframe: bool,
     // The timestamp of the fragment, in milliseconds, to simulate a live stream.
-    pub timestamp: Option<u64>,
+    pub timestamp: u64,
 }
 impl Source {
@@ -107,7 +107,7 @@ impl Source {
                     track_id: moof.trafs[0].tfhd.track_id,
                     data: atom,
                     keyframe: has_keyframe(&moof),
-                    timestamp: first_timestamp(&moof),
+                    timestamp: first_timestamp(&moof).expect("couldn't find timestamp"),
                 })
             }
             mp4::BoxType::MdatBox => {
@@ -117,7 +117,7 @@ impl Source {
                     track_id: moof.track_id,
                     data: atom,
                     keyframe: false,
-                    timestamp: None,
+                    timestamp: moof.timestamp,
                 });
                 // We have some media data, return so we can start sending it.
@@ -133,7 +133,7 @@ impl Source {
     // Simulate a live stream by sleeping until the next timestamp in the media.
     pub fn timeout(&self) -> Option<time::Duration> {
         let next = self.fragments.front()?;
-        let timestamp = next.timestamp?;
+        let timestamp = next.timestamp;
         // Find the timescale for the track.
         let timescale = self.timescale.get(&next.track_id).unwrap();

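Aside: with `timestamp` now a required `u64`, `Source::timeout` can compute the sleep interval directly from the next fragment's timestamp and the per-track timescale (the struct comment still says milliseconds, but `timeout` looks up a timescale, so timescale units are assumed here). A minimal sketch of that conversion; the `fragment_delay` helper and the 90 kHz timescale are illustrative, not part of this commit:

use std::time::Duration;

// Convert a fragment timestamp (in track timescale units) into an offset from
// the start of playback. With a hypothetical 90 kHz video timescale, a
// timestamp of 180_000 maps to 2 seconds.
fn fragment_delay(timestamp: u64, timescale: u64) -> Duration {
    let secs = timestamp / timescale;
    let nanos = (timestamp % timescale) * 1_000_000_000 / timescale;
    Duration::new(secs, nanos as u32)
}

fn main() {
    let delay = fragment_delay(180_000, 90_000);
    assert_eq!(delay, Duration::from_secs(2));
    println!("next fragment due {delay:?} after playback start");
}

Presumably `timeout` then subtracts the wall-clock time already elapsed since playback started and sleeps for the remainder.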
View File

@@ -120,11 +120,17 @@ impl Session {
             // Open a new stream.
             None => {
+                // Create a new unidirectional stream.
                 let stream_id = session.open_stream(conn, false)?;
-                // TODO: conn.stream_priority(stream_id, urgency, incremental)
+                // Set the stream priority to be equal to the timestamp.
+                // We subtract from u64::MAX so newer media is sent first.
+                // TODO prioritize audio
+                let order = u64::MAX - fragment.timestamp;
+                self.streams.send_order(conn, stream_id, order);
                 // Encode a JSON header indicating this is a new track.
-                let mut message = message::Message::new();
+                let mut message: message::Message = message::Message::new();
                 message.segment = Some(message::Segment {
                     track_id: fragment.track_id,
                 });
@@ -133,6 +139,7 @@ impl Session {
                 let data = message.serialize()?;
                 self.streams.send(conn, stream_id, &data, false)?;
+                // Keep a mapping from the track id to the current stream id.
                 self.tracks.insert(fragment.track_id, stream_id);
                 stream_id

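For context on the ordering math above: subtracting the timestamp from `u64::MAX` gives newer fragments (larger timestamps) a smaller `order`, and `Streams::insert` later maps smaller orders to lower quiche urgency values, which are sent first. A standalone sketch; the free `send_order` function below is illustrative only:

// Newer fragments have larger timestamps, so subtracting from u64::MAX gives
// them a smaller order value, which sorts them ahead of stale media.
fn send_order(timestamp: u64) -> u64 {
    u64::MAX - timestamp
}

fn main() {
    let older = send_order(1_000);
    let newer = send_order(5_000);
    // The newer fragment sorts first, so it is flushed before the stale one.
    assert!(newer < older);
    println!("older={older} newer={newer}");
}

Per the TODO, audio is not yet prioritized above video regardless of timestamp.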
View File

@@ -1,4 +1,3 @@
-use std::collections::hash_map as hmap;
 use std::collections::VecDeque;
 use anyhow;
@@ -6,16 +5,19 @@ use quiche;
 #[derive(Default)]
 pub struct Streams {
-    lookup: hmap::HashMap<u64, State>,
+    ordered: Vec<Stream>,
 }
-#[derive(Default)]
-struct State {
+struct Stream {
+    id: u64,
+    order: u64,
     buffer: VecDeque<u8>,
     fin: bool,
 }
 impl Streams {
+    // Write the data to the given stream, buffering it if needed.
     pub fn send(
         &mut self,
         conn: &mut quiche::Connection,
@@ -23,63 +25,113 @@ impl Streams {
         buf: &[u8],
         fin: bool,
     ) -> anyhow::Result<()> {
-        match self.lookup.entry(id) {
-            hmap::Entry::Occupied(mut entry) => {
-                // Add to the existing buffer.
-                let state = entry.get_mut();
-                state.buffer.extend(buf);
-                state.fin |= fin;
-            }
-            hmap::Entry::Vacant(entry) => {
-                let size = conn.stream_send(id, buf, fin)?;
-                if size < buf.len() {
-                    // Short write, save the rest for later.
-                    let mut buffer = VecDeque::with_capacity(buf.len());
-                    buffer.extend(&buf[size..]);
-                    entry.insert(State { buffer, fin });
-                }
-            }
-        };
+        if buf.is_empty() && !fin {
+            return Ok(())
+        }
+        // Get the index of the stream, or add it to the list of streams.
+        let pos = self.ordered.iter().position(|s| s.id == id).unwrap_or_else(|| {
+            // Create a new stream
+            let stream = Stream {
+                id,
+                buffer: VecDeque::new(),
+                fin: false,
+                order: 0, // Default to highest priority until send_order is called.
+            };
+            self.insert(conn, stream)
+        });
+        let stream = &mut self.ordered[pos];
+        // Check if we've already closed the stream, just in case.
+        if stream.fin && !buf.is_empty() {
+            anyhow::bail!("stream is already finished");
+        }
+        // If there's no data buffered, try to write it immediately.
+        let size = if stream.buffer.is_empty() {
+            conn.stream_send(id, buf, fin)?
+        } else {
+            0
+        };
+        if size < buf.len() {
+            // Short write, save the rest for later.
+            stream.buffer.extend(&buf[size..]);
+        }
+        stream.fin |= fin;
         Ok(())
     }
+    // Flush any pending stream data.
     pub fn poll(&mut self, conn: &mut quiche::Connection) -> anyhow::Result<()> {
-        'outer: for id in conn.writable() {
-            // Check if there's any buffered data for this stream.
-            let mut entry = match self.lookup.entry(id) {
-                hmap::Entry::Occupied(entry) => entry,
-                hmap::Entry::Vacant(_) => continue,
-            };
-            let state = entry.get_mut();
+        // Loop over the streams in priority order.
+        'outer: for stream in self.ordered.iter_mut() {
             // Keep reading from the buffer until it's empty.
-            while !state.buffer.is_empty() {
+            while !stream.buffer.is_empty() {
                 // VecDeque is a ring buffer, so we can't write the whole thing at once.
-                let parts = state.buffer.as_slices();
-                let size = conn.stream_send(id, parts.0, false)?;
+                let parts = stream.buffer.as_slices();
+                let size = conn.stream_send(stream.id, parts.0, false)?;
                 if size == 0 {
                     // No more space available for this stream.
                     continue 'outer;
                 }
                 // Remove the bytes that were written.
-                state.buffer.drain(..size);
+                stream.buffer.drain(..size);
             }
-            if state.fin {
+            if stream.fin {
                 // Write the stream done signal.
-                conn.stream_send(id, &[], true)?;
+                conn.stream_send(stream.id, &[], true)?;
             }
-            // We can remove the value from the lookup once we've flushed everything.
-            entry.remove();
         }
+        // Remove streams that are done.
+        // No need to reprioritize, since the streams are still in priority order.
+        self.ordered.retain(|stream| !stream.buffer.is_empty() || !stream.fin);
         Ok(())
     }
+    // Set the send order of the stream.
+    pub fn send_order(&mut self, conn: &mut quiche::Connection, id: u64, order: u64) {
+        let mut stream = match self.ordered.iter().position(|s| s.id == id) {
+            // Remove the stream from the existing list.
+            Some(pos) => self.ordered.remove(pos),
+            // This is a new stream, insert it into the list.
+            None => Stream {
+                id,
+                buffer: VecDeque::new(),
+                fin: false,
+                order,
+            },
+        };
+        stream.order = order;
+        self.insert(conn, stream);
+    }
+    fn insert(&mut self, conn: &mut quiche::Connection, stream: Stream) -> usize {
+        // Look for the position to insert the stream.
+        let pos = match self.ordered.binary_search_by_key(&stream.order, |s| s.order) {
+            Ok(pos) | Err(pos) => pos,
+        };
+        self.ordered.insert(pos, stream);
+        // Reprioritize all later streams.
+        // TODO we can avoid this if stream_priority takes a u64
+        for (i, stream) in self.ordered[pos..].iter().enumerate() {
+            _ = conn.stream_priority(stream.id, (pos + i) as u8, true);
+        }
+        pos
+    }
 }
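Since the commit itself is marked "Not tested", here is a quiche-free sketch of the invariant that `Streams::insert` relies on: the streams stay sorted by `order`, so each stream's index is exactly the urgency it would pass to `conn.stream_priority`. The `MockStream` type and the free `insert` function are stand-ins for illustration:

// A simplified model of the ordered Vec kept by Streams.
struct MockStream {
    id: u64,
    order: u64,
}

fn insert(ordered: &mut Vec<MockStream>, stream: MockStream) -> usize {
    // Same binary search as the diff: both a tie and a miss yield an index.
    let pos = match ordered.binary_search_by_key(&stream.order, |s| s.order) {
        Ok(pos) | Err(pos) => pos,
    };
    ordered.insert(pos, stream);
    // Everything from pos onward would be reprioritized with urgency pos + i.
    pos
}

fn main() {
    let mut ordered = Vec::new();
    insert(&mut ordered, MockStream { id: 4, order: u64::MAX - 1_000 });
    insert(&mut ordered, MockStream { id: 8, order: u64::MAX - 5_000 });
    insert(&mut ordered, MockStream { id: 12, order: u64::MAX - 3_000 });

    // The newest fragment (largest timestamp, smallest order) ends up at index 0,
    // which maps to the most urgent priority.
    let ids: Vec<u64> = ordered.iter().map(|s| s.id).collect();
    assert_eq!(ids, vec![8, 12, 4]);
    for (urgency, stream) in ordered.iter().enumerate() {
        println!("stream {} -> urgency {}", stream.id, urgency);
    }
}

Note that quiche's `stream_priority` takes a `u8` urgency, so the `(pos + i) as u8` cast in the diff would wrap with more than 256 active streams; the TODO about accepting a `u64` seems to acknowledge that limitation.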