diff --git a/Cargo.lock b/Cargo.lock index a702427..5020f28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1107,6 +1107,7 @@ dependencies = [ name = "moq-transport" version = "0.2.0" dependencies = [ + "async-trait", "bytes", "indexmap 2.0.0", "log", diff --git a/moq-pub/src/media.rs b/moq-pub/src/media.rs index cb2c5a7..1eed6b9 100644 --- a/moq-pub/src/media.rs +++ b/moq-pub/src/media.rs @@ -1,6 +1,6 @@ use crate::cli::Config; use anyhow::{self, Context}; -use moq_transport::cache::{broadcast, segment, track}; +use moq_transport::cache::{broadcast, fragment, segment, track}; use moq_transport::VarInt; use mp4::{self, ReadBox}; use serde_json::json; @@ -44,11 +44,17 @@ impl Media { let mut init_track = broadcast.create_track("0.mp4")?; let mut init_segment = init_track.create_segment(segment::Info { sequence: VarInt::ZERO, - priority: i32::MAX, + priority: 0, expires: None, })?; - init_segment.write_chunk(init.into())?; + // Create a single fragment, optionally setting the size + let mut init_fragment = init_segment.create_fragment(fragment::Info { + sequence: VarInt::ZERO, + size: None, // size is only needed when we have multiple fragments. + })?; + + init_fragment.write_chunk(init.into())?; let mut tracks = HashMap::new(); @@ -128,7 +134,7 @@ impl Media { ) -> Result<(), anyhow::Error> { let mut segment = track.create_segment(segment::Info { sequence: VarInt::ZERO, - priority: i32::MAX, + priority: 0, expires: None, })?; @@ -211,8 +217,14 @@ impl Media { let catalog_str = serde_json::to_string_pretty(&catalog)?; log::info!("catalog: {}", catalog_str); + // Create a single fragment for the segment. + let mut fragment = segment.create_fragment(fragment::Info { + sequence: VarInt::ZERO, + size: None, // Size is only needed when we have multiple fragments. + })?; + // Add the segment and add the fragment. - segment.write_chunk(catalog_str.into())?; + fragment.write_chunk(catalog_str.into())?; Ok(()) } @@ -260,7 +272,7 @@ struct Track { track: track::Publisher, // The current segment - segment: Option, + current: Option, // The number of units per second. timescale: u64, @@ -274,16 +286,16 @@ impl Track { Self { track, sequence: 0, - segment: None, + current: None, timescale, } } pub fn header(&mut self, raw: Vec, fragment: Fragment) -> anyhow::Result<()> { - if let Some(segment) = self.segment.as_mut() { + if let Some(current) = self.current.as_mut() { if !fragment.keyframe { // Use the existing segment - segment.write_chunk(raw.into())?; + current.write_chunk(raw.into())?; return Ok(()); } } @@ -292,7 +304,7 @@ impl Track { // Compute the timestamp in milliseconds. // Overflows after 583 million years, so we're fine. - let timestamp: i32 = fragment + let timestamp: u32 = fragment .timestamp(self.timescale) .as_millis() .try_into() @@ -301,26 +313,34 @@ impl Track { // Create a new segment. let mut segment = self.track.create_segment(segment::Info { sequence: VarInt::try_from(self.sequence).context("sequence too large")?, - priority: timestamp, // newer segments are higher priority + + // Newer segments are higher priority + priority: u32::MAX.checked_sub(timestamp).context("priority too large")?, // Delete segments after 10s. expires: Some(time::Duration::from_secs(10)), })?; + // Create a single fragment for the segment that we will keep appending. + let mut fragment = segment.create_fragment(fragment::Info { + sequence: VarInt::ZERO, + size: None, + })?; + self.sequence += 1; // Insert the raw atom into the segment. - segment.write_chunk(raw.into())?; + fragment.write_chunk(raw.into())?; // Save for the next iteration - self.segment = Some(segment); + self.current = Some(fragment); Ok(()) } pub fn data(&mut self, raw: Vec) -> anyhow::Result<()> { - let segment = self.segment.as_mut().context("missing segment")?; - segment.write_chunk(raw.into())?; + let fragment = self.current.as_mut().context("missing current fragment")?; + fragment.write_chunk(raw.into())?; Ok(()) } diff --git a/moq-relay/src/error.rs b/moq-relay/src/error.rs index 54813ad..b943a93 100644 --- a/moq-relay/src/error.rs +++ b/moq-relay/src/error.rs @@ -37,15 +37,15 @@ impl moq_transport::MoqError for RelayError { } } - fn reason(&self) -> &str { + fn reason(&self) -> String { match self { - Self::Transport(err) => err.reason(), - Self::Cache(err) => err.reason(), - Self::MoqApi(_err) => "api error", - Self::Url(_) => "url error", - Self::MissingNode => "missing node", - Self::WebTransportServer(_) => "server error", - Self::WebTransportClient(_) => "upstream error", + Self::Transport(err) => format!("transport error: {}", err.reason()), + Self::Cache(err) => format!("cache error: {}", err.reason()), + Self::MoqApi(err) => format!("api error: {}", err), + Self::Url(err) => format!("url error: {}", err), + Self::MissingNode => "missing node".to_owned(), + Self::WebTransportServer(err) => format!("upstream server error: {}", err), + Self::WebTransportClient(err) => format!("upstream client error: {}", err), } } } diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 6f59f63..7c9a0bd 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -24,3 +24,5 @@ indexmap = "2" quinn = "0.10" webtransport-quinn = "0.6" #webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" } + +async-trait = "0.1" diff --git a/moq-transport/src/cache/broadcast.rs b/moq-transport/src/cache/broadcast.rs index d485c30..feb3824 100644 --- a/moq-transport/src/cache/broadcast.rs +++ b/moq-transport/src/cache/broadcast.rs @@ -136,12 +136,12 @@ impl Publisher { } /// Block until the next track requested by a subscriber. - pub async fn next_track(&mut self) -> Result, CacheError> { + pub async fn next_track(&mut self) -> Result { loop { let notify = { let state = self.state.lock(); if state.has_next()? { - return Ok(Some(state.into_mut().next())); + return Ok(state.into_mut().next()); } state.changed() diff --git a/moq-transport/src/cache/error.rs b/moq-transport/src/cache/error.rs index 99de101..d3f907b 100644 --- a/moq-transport/src/cache/error.rs +++ b/moq-transport/src/cache/error.rs @@ -39,13 +39,13 @@ impl MoqError for CacheError { } /// A reason that is sent over the wire. - fn reason(&self) -> &str { + fn reason(&self) -> String { match self { - Self::Closed => "closed", - Self::Reset(_) => "reset", - Self::Stop => "stop", - Self::NotFound => "not found", - Self::Duplicate => "duplicate", + Self::Closed => "closed".to_owned(), + Self::Reset(code) => format!("reset code: {}", code), + Self::Stop => "stop".to_owned(), + Self::NotFound => "not found".to_owned(), + Self::Duplicate => "duplicate".to_owned(), } } } diff --git a/moq-transport/src/cache/fragment.rs b/moq-transport/src/cache/fragment.rs new file mode 100644 index 0000000..49cab62 --- /dev/null +++ b/moq-transport/src/cache/fragment.rs @@ -0,0 +1,216 @@ +//! A fragment is a stream of bytes with a header, split into a [Publisher] and [Subscriber] handle. +//! +//! A [Publisher] writes an ordered stream of bytes in chunks. +//! There's no framing, so these chunks can be of any size or position, and won't be maintained over the network. +//! +//! A [Subscriber] reads an ordered stream of bytes in chunks. +//! These chunks are returned directly from the QUIC connection, so they may be of any size or position. +//! You can clone the [Subscriber] and each will read a copy of of all future chunks. (fanout) +//! +//! The fragment is closed with [CacheError::Closed] when all publishers or subscribers are dropped. +use core::fmt; +use std::{ops::Deref, sync::Arc}; + +use crate::VarInt; +use bytes::Bytes; + +use super::{CacheError, Watch}; + +/// Create a new segment with the given info. +pub fn new(info: Info) -> (Publisher, Subscriber) { + let state = Watch::new(State::default()); + let info = Arc::new(info); + + let publisher = Publisher::new(state.clone(), info.clone()); + let subscriber = Subscriber::new(state, info); + + (publisher, subscriber) +} + +/// Static information about the segment. +#[derive(Debug)] +pub struct Info { + // The sequence number of the fragment within the segment. + // NOTE: These may be received out of order or with gaps. + pub sequence: VarInt, + + // The size of the fragment, optionally None if this is the last fragment in a segment. + // TODO enforce this size. + pub size: Option, +} + +struct State { + // The data that has been received thus far. + chunks: Vec, + + // Set when the publisher is dropped. + closed: Result<(), CacheError>, +} + +impl State { + pub fn close(&mut self, err: CacheError) -> Result<(), CacheError> { + self.closed.clone()?; + self.closed = Err(err); + Ok(()) + } + + pub fn bytes(&self) -> usize { + self.chunks.iter().map(|f| f.len()).sum::() + } +} + +impl Default for State { + fn default() -> Self { + Self { + chunks: Vec::new(), + closed: Ok(()), + } + } +} + +impl fmt::Debug for State { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // We don't want to print out the contents, so summarize. + f.debug_struct("State") + .field("chunks", &self.chunks.len().to_string()) + .field("bytes", &self.bytes().to_string()) + .field("closed", &self.closed) + .finish() + } +} + +/// Used to write data to a segment and notify subscribers. +pub struct Publisher { + // Mutable segment state. + state: Watch, + + // Immutable segment state. + info: Arc, + + // Closes the segment when all Publishers are dropped. + _dropped: Arc, +} + +impl Publisher { + fn new(state: Watch, info: Arc) -> Self { + let _dropped = Arc::new(Dropped::new(state.clone())); + Self { state, info, _dropped } + } + + /// Write a new chunk of bytes. + pub fn write_chunk(&mut self, chunk: Bytes) -> Result<(), CacheError> { + let mut state = self.state.lock_mut(); + state.closed.clone()?; + state.chunks.push(chunk); + Ok(()) + } + + /// Close the segment with an error. + pub fn close(self, err: CacheError) -> Result<(), CacheError> { + self.state.lock_mut().close(err) + } +} + +impl Deref for Publisher { + type Target = Info; + + fn deref(&self) -> &Self::Target { + &self.info + } +} + +impl fmt::Debug for Publisher { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Publisher") + .field("state", &self.state) + .field("info", &self.info) + .finish() + } +} + +/// Notified when a segment has new data available. +#[derive(Clone)] +pub struct Subscriber { + // Modify the segment state. + state: Watch, + + // Immutable segment state. + info: Arc, + + // The number of chunks that we've read. + // NOTE: Cloned subscribers inherit this index, but then run in parallel. + index: usize, + + // Dropped when all Subscribers are dropped. + _dropped: Arc, +} + +impl Subscriber { + fn new(state: Watch, info: Arc) -> Self { + let _dropped = Arc::new(Dropped::new(state.clone())); + + Self { + state, + info, + index: 0, + _dropped, + } + } + + /// Block until the next chunk of bytes is available. + pub async fn read_chunk(&mut self) -> Result, CacheError> { + loop { + let notify = { + let state = self.state.lock(); + if self.index < state.chunks.len() { + let chunk = state.chunks[self.index].clone(); + self.index += 1; + return Ok(Some(chunk)); + } + + match &state.closed { + Err(CacheError::Closed) => return Ok(None), + Err(err) => return Err(err.clone()), + Ok(()) => state.changed(), + } + }; + + notify.await; // Try again when the state changes + } + } +} + +impl Deref for Subscriber { + type Target = Info; + + fn deref(&self) -> &Self::Target { + &self.info + } +} + +impl fmt::Debug for Subscriber { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Subscriber") + .field("state", &self.state) + .field("info", &self.info) + .field("index", &self.index) + .finish() + } +} + +struct Dropped { + // Modify the segment state. + state: Watch, +} + +impl Dropped { + fn new(state: Watch) -> Self { + Self { state } + } +} + +impl Drop for Dropped { + fn drop(&mut self) { + self.state.lock_mut().close(CacheError::Closed).ok(); + } +} diff --git a/moq-transport/src/cache/mod.rs b/moq-transport/src/cache/mod.rs index 175deed..96228cf 100644 --- a/moq-transport/src/cache/mod.rs +++ b/moq-transport/src/cache/mod.rs @@ -1,10 +1,17 @@ //! Allows a publisher to push updates, automatically caching and fanning it out to any subscribers. //! -//! The naming scheme doesn't match the spec because it's vague and confusing. -//! The hierarchy is: [broadcast] -> [track] -> [segment] -> [Bytes](bytes::Bytes) +//! The hierarchy is: [broadcast] -> [track] -> [segment] -> [fragment] -> [Bytes](bytes::Bytes) +//! +//! The naming scheme doesn't match the spec because it's more strict, and bikeshedding of course: +//! +//! - [broadcast] is kinda like "track namespace" +//! - [track] is "track" +//! - [segment] is "group" but MUST use a single stream. +//! - [fragment] is "object" but MUST have the same properties as the segment. pub mod broadcast; mod error; +pub mod fragment; pub mod segment; pub mod track; diff --git a/moq-transport/src/cache/segment.rs b/moq-transport/src/cache/segment.rs index 1a25c19..b4b31ef 100644 --- a/moq-transport/src/cache/segment.rs +++ b/moq-transport/src/cache/segment.rs @@ -1,20 +1,18 @@ -//! A segment is a stream of bytes with a header, split into a [Publisher] and [Subscriber] handle. +//! A segment is a stream of fragments with a header, split into a [Publisher] and [Subscriber] handle. //! -//! A [Publisher] writes an ordered stream of bytes in chunks. -//! There's no framing, so these chunks can be of any size or position, and won't be maintained over the network. +//! A [Publisher] writes an ordered stream of fragments. +//! Each fragment can have a sequence number, allowing the subscriber to detect gaps fragments. //! -//! A [Subscriber] reads an ordered stream of bytes in chunks. -//! These chunks are returned directly from the QUIC connection, so they may be of any size or position. -//! A closed [Subscriber] will receive a copy of all future chunks. (fanout) +//! A [Subscriber] reads an ordered stream of fragments. +//! The subscriber can be cloned, in which case each subscriber receives a copy of each fragment. (fanout) //! //! The segment is closed with [CacheError::Closed] when all publishers or subscribers are dropped. use core::fmt; use std::{ops::Deref, sync::Arc, time}; use crate::VarInt; -use bytes::Bytes; -use super::{CacheError, Watch}; +use super::{fragment, CacheError, Watch}; /// Create a new segment with the given info. pub fn new(info: Info) -> (Publisher, Subscriber) { @@ -31,10 +29,11 @@ pub fn new(info: Info) -> (Publisher, Subscriber) { #[derive(Debug)] pub struct Info { // The sequence number of the segment within the track. + // NOTE: These may be received out of order or with gaps. pub sequence: VarInt, // The priority of the segment within the BROADCAST. - pub priority: i32, + pub priority: u32, // Cache the segment for at most this long. pub expires: Option, @@ -42,7 +41,7 @@ pub struct Info { struct State { // The data that has been received thus far. - data: Vec, + fragments: Vec, // Set when the publisher is dropped. closed: Result<(), CacheError>, @@ -59,7 +58,7 @@ impl State { impl Default for State { fn default() -> Self { Self { - data: Vec::new(), + fragments: Vec::new(), closed: Ok(()), } } @@ -67,12 +66,8 @@ impl Default for State { impl fmt::Debug for State { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // We don't want to print out the contents, so summarize. - let size = self.data.iter().map(|chunk| chunk.len()).sum::(); - let data = format!("size={} chunks={}", size, self.data.len()); - f.debug_struct("State") - .field("data", &data) + .field("fragments", &self.fragments) .field("closed", &self.closed) .finish() } @@ -96,14 +91,20 @@ impl Publisher { Self { state, info, _dropped } } - /// Write a new chunk of bytes. - pub fn write_chunk(&mut self, data: Bytes) -> Result<(), CacheError> { + /// Write a fragment + pub fn push_fragment(&mut self, fragment: fragment::Subscriber) -> Result<(), CacheError> { let mut state = self.state.lock_mut(); state.closed.clone()?; - state.data.push(data); + state.fragments.push(fragment); Ok(()) } + pub fn create_fragment(&mut self, fragment: fragment::Info) -> Result { + let (publisher, subscriber) = fragment::new(fragment); + self.push_fragment(subscriber)?; + Ok(publisher) + } + /// Close the segment with an error. pub fn close(self, err: CacheError) -> Result<(), CacheError> { self.state.lock_mut().close(err) @@ -157,14 +158,14 @@ impl Subscriber { } /// Block until the next chunk of bytes is available. - pub async fn read_chunk(&mut self) -> Result, CacheError> { + pub async fn next_fragment(&mut self) -> Result, CacheError> { loop { let notify = { let state = self.state.lock(); - if self.index < state.data.len() { - let chunk = state.data[self.index].clone(); + if self.index < state.fragments.len() { + let fragment = state.fragments[self.index].clone(); self.index += 1; - return Ok(Some(chunk)); + return Ok(Some(fragment)); } match &state.closed { diff --git a/moq-transport/src/cache/track.rs b/moq-transport/src/cache/track.rs index 109a011..ad4d380 100644 --- a/moq-transport/src/cache/track.rs +++ b/moq-transport/src/cache/track.rs @@ -206,7 +206,7 @@ impl Subscriber { } } - /// Block until the next segment arrives, or return None if the track is [CacheError::Closed]. + /// Block until the next segment arrives pub async fn next_segment(&mut self) -> Result, CacheError> { loop { let notify = { diff --git a/moq-transport/src/coding/decode.rs b/moq-transport/src/coding/decode.rs index 7a84a55..a6fe94e 100644 --- a/moq-transport/src/coding/decode.rs +++ b/moq-transport/src/coding/decode.rs @@ -1,5 +1,5 @@ use super::{BoundsExceeded, VarInt}; -use std::str; +use std::{io, str}; use thiserror::Error; @@ -7,6 +7,13 @@ use thiserror::Error; // TODO Use trait aliases when they're stable, or add these bounds to every method. pub trait AsyncRead: tokio::io::AsyncRead + Unpin + Send {} impl AsyncRead for webtransport_quinn::RecvStream {} +impl AsyncRead for tokio::io::Take<&mut T> where T: AsyncRead {} +impl + Unpin + Send> AsyncRead for io::Cursor {} + +#[async_trait::async_trait] +pub trait Decode: Sized { + async fn decode(r: &mut R) -> Result; +} /// A decode error. #[derive(Error, Debug)] @@ -17,12 +24,32 @@ pub enum DecodeError { #[error("invalid string")] InvalidString(#[from] str::Utf8Error), - #[error("invalid type: {0:?}")] - InvalidType(VarInt), + #[error("invalid message: {0:?}")] + InvalidMessage(VarInt), + + #[error("invalid role: {0:?}")] + InvalidRole(VarInt), + + #[error("invalid subscribe location")] + InvalidSubscribeLocation, #[error("varint bounds exceeded")] BoundsExceeded(#[from] BoundsExceeded), + // TODO move these to ParamError + #[error("duplicate parameter")] + DupliateParameter, + + #[error("missing parameter")] + MissingParameter, + + #[error("invalid parameter")] + InvalidParameter, + #[error("io error: {0}")] IoError(#[from] std::io::Error), + + // Used to signal that the stream has ended. + #[error("no more messages")] + Final, } diff --git a/moq-transport/src/coding/encode.rs b/moq-transport/src/coding/encode.rs index 65bd697..b03cdb9 100644 --- a/moq-transport/src/coding/encode.rs +++ b/moq-transport/src/coding/encode.rs @@ -6,6 +6,12 @@ use thiserror::Error; // TODO Use trait aliases when they're stable, or add these bounds to every method. pub trait AsyncWrite: tokio::io::AsyncWrite + Unpin + Send {} impl AsyncWrite for webtransport_quinn::SendStream {} +impl AsyncWrite for Vec {} + +#[async_trait::async_trait] +pub trait Encode: Sized { + async fn encode(&self, w: &mut W) -> Result<(), EncodeError>; +} /// An encode error. #[derive(Error, Debug)] diff --git a/moq-transport/src/coding/mod.rs b/moq-transport/src/coding/mod.rs index f3cc9c8..ff57b4c 100644 --- a/moq-transport/src/coding/mod.rs +++ b/moq-transport/src/coding/mod.rs @@ -1,9 +1,11 @@ mod decode; mod encode; +mod params; mod string; mod varint; pub use decode::*; pub use encode::*; +pub use params::*; pub use string::*; pub use varint::*; diff --git a/moq-transport/src/coding/params.rs b/moq-transport/src/coding/params.rs index 7ee29d2..9cfd6f3 100644 --- a/moq-transport/src/coding/params.rs +++ b/moq-transport/src/coding/params.rs @@ -1,69 +1,85 @@ -use std::cmp::min; +use std::io::Cursor; +use std::{cmp::max, collections::HashMap}; -use crate::VarInt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use super::{AsyncRead, AsyncWrite, DecodeError, EncodeError}; -use tokio::io::AsyncReadExt; +use crate::coding::{AsyncRead, AsyncWrite, Decode, Encode}; -// I hate this parameter encoding so much. -// i hate it i hate it i hate it +use crate::{ + coding::{DecodeError, EncodeError}, + VarInt, +}; -// TODO Use #[async_trait] so we can do Param instead. -pub struct ParamInt(pub VarInt); +#[derive(Default, Debug, Clone)] +pub struct Params(pub HashMap>); -impl ParamInt { - pub async fn decode(r: &mut R) -> Result { - // Why do we have a redundant size in front of each VarInt? - let size = VarInt::decode(r).await?; - let mut take = r.take(size.into_inner()); - let value = VarInt::decode(&mut take).await?; +#[async_trait::async_trait] +impl Decode for Params { + async fn decode(mut r: &mut R) -> Result { + let mut params = HashMap::new(); - // Like seriously why do I have to check if the VarInt length mismatches. - if take.limit() != 0 { - return Err(DecodeError::InvalidSize); + // I hate this shit so much; let me encode my role and get on with my life. + let count = VarInt::decode(r).await?; + for _ in 0..count.into_inner() { + let kind = VarInt::decode(r).await?; + if params.contains_key(&kind) { + return Err(DecodeError::DupliateParameter); + } + + let size = VarInt::decode(r).await?; + + // Don't allocate the entire requested size to avoid a possible attack + // Instead, we allocate up to 1024 and keep appending as we read further. + let mut pr = r.take(size.into_inner()); + let mut buf = Vec::with_capacity(max(1024, pr.limit() as usize)); + pr.read_to_end(&mut buf).await?; + params.insert(kind, buf); + + r = pr.into_inner(); } - Ok(Self(value)) + Ok(Params(params)) } +} - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - // Seriously why do I have to compute the size. - let size = self.0.size(); - VarInt::try_from(size)?.encode(w).await?; +#[async_trait::async_trait] +impl Encode for Params { + async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + VarInt::try_from(self.0.len())?.encode(w).await?; - self.0.encode(w).await?; + for (kind, value) in self.0.iter() { + kind.encode(w).await?; + VarInt::try_from(value.len())?.encode(w).await?; + w.write_all(value).await?; + } Ok(()) } } -pub struct ParamBytes(pub Vec); - -impl ParamBytes { - pub async fn decode(r: &mut R) -> Result { - let size = VarInt::decode(r).await?; - let mut take = r.take(size.into_inner()); - let mut buf = Vec::with_capacity(min(take.limit() as usize, 1024)); - take.read_to_end(&mut buf).await?; - - Ok(Self(buf)) +impl Params { + pub fn new() -> Self { + Self::default() } - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - let size = VarInt::try_from(self.0.len())?; - size.encode(w).await?; - w.write_all(&self.0).await?; + pub async fn set(&mut self, kind: VarInt, p: P) -> Result<(), EncodeError> { + let mut value = Vec::new(); + p.encode(&mut value).await?; + self.0.insert(kind, value); Ok(()) } -} -pub struct ParamUnknown {} + pub fn has(&self, kind: VarInt) -> bool { + self.0.contains_key(&kind) + } -impl ParamUnknown { - pub async fn decode(r: &mut R) -> Result<(), DecodeError> { - // Really? Is there no way to advance without reading? - ParamBytes::decode(r).await?; - Ok(()) + pub async fn get(&mut self, kind: VarInt) -> Result, DecodeError> { + if let Some(value) = self.0.remove(&kind) { + let mut cursor = Cursor::new(value); + Ok(Some(P::decode(&mut cursor).await?)) + } else { + Ok(None) + } } } diff --git a/moq-transport/src/coding/string.rs b/moq-transport/src/coding/string.rs index 3bc912c..2cdff4a 100644 --- a/moq-transport/src/coding/string.rs +++ b/moq-transport/src/coding/string.rs @@ -5,20 +5,25 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::VarInt; -use super::{DecodeError, EncodeError}; +use super::{Decode, DecodeError, Encode, EncodeError}; -/// Encode a string with a varint length prefix. -pub async fn encode_string(s: &str, w: &mut W) -> Result<(), EncodeError> { - let size = VarInt::try_from(s.len())?; - size.encode(w).await?; - w.write_all(s.as_ref()).await?; - Ok(()) +#[async_trait::async_trait] +impl Encode for String { + async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + let size = VarInt::try_from(self.len())?; + size.encode(w).await?; + w.write_all(self.as_ref()).await?; + Ok(()) + } } -/// Decode a string with a varint length prefix. -pub async fn decode_string(r: &mut R) -> Result { - let size = VarInt::decode(r).await?.into_inner(); - let mut str = String::with_capacity(min(1024, size) as usize); - r.take(size).read_to_string(&mut str).await?; - Ok(str) +#[async_trait::async_trait] +impl Decode for String { + /// Decode a string with a varint length prefix. + async fn decode(r: &mut R) -> Result { + let size = VarInt::decode(r).await?.into_inner(); + let mut str = String::with_capacity(min(1024, size) as usize); + r.take(size).read_to_string(&mut str).await?; + Ok(str) + } } diff --git a/moq-transport/src/coding/varint.rs b/moq-transport/src/coding/varint.rs index 28542f3..8557de8 100644 --- a/moq-transport/src/coding/varint.rs +++ b/moq-transport/src/coding/varint.rs @@ -9,7 +9,7 @@ use crate::coding::{AsyncRead, AsyncWrite}; use thiserror::Error; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use super::{DecodeError, EncodeError}; +use super::{Decode, DecodeError, Encode, EncodeError}; #[derive(Debug, Copy, Clone, Eq, PartialEq, Error)] #[error("value out of range")] @@ -164,14 +164,23 @@ impl fmt::Display for VarInt { } } -impl VarInt { +#[async_trait::async_trait] +impl Decode for VarInt { /// Decode a varint from the given reader. - pub async fn decode(r: &mut R) -> Result { - let mut buf = [0u8; 8]; - r.read_exact(buf[0..1].as_mut()).await?; + async fn decode(r: &mut R) -> Result { + let b = r.read_u8().await?; + Self::decode_byte(b, r).await + } +} - let tag = buf[0] >> 6; - buf[0] &= 0b0011_1111; +impl VarInt { + /// Decode a varint given the first byte, reading the rest as needed. + /// This is silly but useful for determining if the stream has ended. + pub async fn decode_byte(b: u8, r: &mut R) -> Result { + let tag = b >> 6; + + let mut buf = [0u8; 8]; + buf[0] = b & 0b0011_1111; let x = match tag { 0b00 => u64::from(buf[0]), @@ -192,9 +201,12 @@ impl VarInt { Ok(Self(x)) } +} +#[async_trait::async_trait] +impl Encode for VarInt { /// Encode a varint to the given writer. - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { let x = self.0; if x < 2u64.pow(6) { w.write_u8(x as u8).await?; diff --git a/moq-transport/src/error.rs b/moq-transport/src/error.rs index 802caff..d070251 100644 --- a/moq-transport/src/error.rs +++ b/moq-transport/src/error.rs @@ -1,5 +1,7 @@ pub trait MoqError { /// An integer code that is sent over the wire. fn code(&self) -> u32; - fn reason(&self) -> &str; + + /// An optional reason sometimes sent over the wire. + fn reason(&self) -> String; } diff --git a/moq-transport/src/lib.rs b/moq-transport/src/lib.rs index bbde1d8..cdff944 100644 --- a/moq-transport/src/lib.rs +++ b/moq-transport/src/lib.rs @@ -5,8 +5,8 @@ //! The specification is a work in progress and will change. //! See the [specification](https://datatracker.ietf.org/doc/draft-ietf-moq-transport/) and [github](https://github.com/moq-wg/moq-transport) for any updates. //! -//! **FORKED**: This is implementation makes extensive changes to the protocol. -//! See [KIXEL_00](crate::setup::Version::KIXEL_00) for a list of differences. +//! **FORKED**: This implementation makes some changes to the protocol. +//! See [KIXEL_01](crate::setup::Version::KIXEL_01) for a list of differences. //! Many of these will get merged into the specification, so don't panic. mod coding; mod error; diff --git a/moq-transport/src/message/announce.rs b/moq-transport/src/message/announce.rs index cdc3ddd..709339d 100644 --- a/moq-transport/src/message/announce.rs +++ b/moq-transport/src/message/announce.rs @@ -1,22 +1,29 @@ -use crate::coding::{decode_string, encode_string, DecodeError, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; use crate::coding::{AsyncRead, AsyncWrite}; /// Sent by the publisher to announce the availability of a group of tracks. #[derive(Clone, Debug)] pub struct Announce { - // The track namespace + /// The track namespace pub namespace: String, + + /// Optional parameters + pub params: Params, } impl Announce { pub async fn decode(r: &mut R) -> Result { - let namespace = decode_string(r).await?; - Ok(Self { namespace }) + let namespace = String::decode(r).await?; + let params = Params::decode(r).await?; + + Ok(Self { namespace, params }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - encode_string(&self.namespace, w).await?; + self.namespace.encode(w).await?; + self.params.encode(w).await?; + Ok(()) } } diff --git a/moq-transport/src/message/announce_ok.rs b/moq-transport/src/message/announce_ok.rs index de8b4d3..a5c4792 100644 --- a/moq-transport/src/message/announce_ok.rs +++ b/moq-transport/src/message/announce_ok.rs @@ -1,4 +1,4 @@ -use crate::coding::{decode_string, encode_string, AsyncRead, AsyncWrite, DecodeError, EncodeError}; +use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError}; /// Sent by the subscriber to accept an Announce. #[derive(Clone, Debug)] @@ -10,11 +10,11 @@ pub struct AnnounceOk { impl AnnounceOk { pub async fn decode(r: &mut R) -> Result { - let namespace = decode_string(r).await?; + let namespace = String::decode(r).await?; Ok(Self { namespace }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - encode_string(&self.namespace, w).await + self.namespace.encode(w).await } } diff --git a/moq-transport/src/message/announce_reset.rs b/moq-transport/src/message/announce_reset.rs index 27e1326..e21886b 100644 --- a/moq-transport/src/message/announce_reset.rs +++ b/moq-transport/src/message/announce_reset.rs @@ -1,10 +1,10 @@ -use crate::coding::{decode_string, encode_string, DecodeError, EncodeError, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; /// Sent by the subscriber to reject an Announce. #[derive(Clone, Debug)] -pub struct AnnounceReset { +pub struct AnnounceError { // Echo back the namespace that was reset pub namespace: String, @@ -15,11 +15,11 @@ pub struct AnnounceReset { pub reason: String, } -impl AnnounceReset { +impl AnnounceError { pub async fn decode(r: &mut R) -> Result { - let namespace = decode_string(r).await?; + let namespace = String::decode(r).await?; let code = VarInt::decode(r).await?.try_into()?; - let reason = decode_string(r).await?; + let reason = String::decode(r).await?; Ok(Self { namespace, @@ -29,9 +29,9 @@ impl AnnounceReset { } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - encode_string(&self.namespace, w).await?; + self.namespace.encode(w).await?; VarInt::from_u32(self.code).encode(w).await?; - encode_string(&self.reason, w).await?; + self.reason.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/message/go_away.rs b/moq-transport/src/message/go_away.rs index 674cb5a..c86152a 100644 --- a/moq-transport/src/message/go_away.rs +++ b/moq-transport/src/message/go_away.rs @@ -1,4 +1,4 @@ -use crate::coding::{decode_string, encode_string, DecodeError, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use crate::coding::{AsyncRead, AsyncWrite}; @@ -10,11 +10,11 @@ pub struct GoAway { impl GoAway { pub async fn decode(r: &mut R) -> Result { - let url = decode_string(r).await?; + let url = String::decode(r).await?; Ok(Self { url }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - encode_string(&self.url, w).await + self.url.encode(w).await } } diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs index 28ced81..34260d4 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -6,16 +6,17 @@ //! //! Messages sent by the publisher: //! - [Announce] -//! - [AnnounceReset] +//! - [Unannounce] //! - [SubscribeOk] +//! - [SubscribeError] //! - [SubscribeReset] //! - [Object] //! //! Messages sent by the subscriber: //! - [Subscribe] -//! - [SubscribeStop] +//! - [Unsubscribe] //! - [AnnounceOk] -//! - [AnnounceStop] +//! - [AnnounceError] //! //! Example flow: //! ```test @@ -32,26 +33,30 @@ mod announce; mod announce_ok; mod announce_reset; -mod announce_stop; mod go_away; mod object; mod subscribe; +mod subscribe_error; +mod subscribe_fin; mod subscribe_ok; mod subscribe_reset; -mod subscribe_stop; +mod unannounce; +mod unsubscribe; pub use announce::*; pub use announce_ok::*; pub use announce_reset::*; -pub use announce_stop::*; pub use go_away::*; pub use object::*; pub use subscribe::*; +pub use subscribe_error::*; +pub use subscribe_fin::*; pub use subscribe_ok::*; pub use subscribe_reset::*; -pub use subscribe_stop::*; +pub use unannounce::*; +pub use unsubscribe::*; -use crate::coding::{DecodeError, EncodeError, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use std::fmt; @@ -76,7 +81,7 @@ macro_rules! message_types { let msg = $name::decode(r).await?; Ok(Self::$name(msg)) })* - _ => Err(DecodeError::InvalidType(t)), + _ => Err(DecodeError::InvalidMessage(t)), } } @@ -127,15 +132,28 @@ macro_rules! message_types { message_types! { // NOTE: Object and Setup are in other modules. // Object = 0x0 - // SetupClient = 0x1 - // SetupServer = 0x2 + // ObjectUnbounded = 0x2 + // SetupClient = 0x40 + // SetupServer = 0x41 + + // SUBSCRIBE family, sent by subscriber Subscribe = 0x3, + Unsubscribe = 0xa, + + // SUBSCRIBE family, sent by publisher SubscribeOk = 0x4, - SubscribeReset = 0x5, - SubscribeStop = 0x15, + SubscribeError = 0x5, + SubscribeFin = 0xb, + SubscribeReset = 0xc, + + // ANNOUNCE family, sent by publisher Announce = 0x6, + Unannounce = 0x9, + + // ANNOUNCE family, sent by subscriber AnnounceOk = 0x7, - AnnounceReset = 0x8, - AnnounceStop = 0x18, + AnnounceError = 0x8, + + // Misc GoAway = 0x10, } diff --git a/moq-transport/src/message/object.rs b/moq-transport/src/message/object.rs index a606f31..c4cb412 100644 --- a/moq-transport/src/message/object.rs +++ b/moq-transport/src/message/object.rs @@ -1,9 +1,9 @@ -use std::time; +use std::{io, time}; -use crate::coding::{DecodeError, EncodeError, VarInt}; +use tokio::io::AsyncReadExt; use crate::coding::{AsyncRead, AsyncWrite}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; /// Sent by the publisher as the header of each data stream. #[derive(Clone, Debug)] @@ -13,47 +13,75 @@ pub struct Object { pub track: VarInt, // The sequence number within the track. + pub group: VarInt, + + // The sequence number within the group. pub sequence: VarInt, - // The priority, where **larger** values are sent first. - // Proposal: int32 instead of a varint. - pub priority: i32, + // The priority, where **smaller** values are sent first. + pub priority: u32, // Cache the object for at most this many seconds. // Zero means never expire. pub expires: Option, + + /// An optional size, allowing multiple OBJECTs on the same stream. + pub size: Option, } impl Object { pub async fn decode(r: &mut R) -> Result { - let typ = VarInt::decode(r).await?; - if typ.into_inner() != 0 { - return Err(DecodeError::InvalidType(typ)); - } + // Try reading the first byte, returning a special error if the stream naturally ended. + let typ = match r.read_u8().await { + Ok(b) => VarInt::decode_byte(b, r).await?, + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Err(DecodeError::Final), + Err(e) => return Err(e.into()), + }; - // NOTE: size has been omitted + let size_present = match typ.into_inner() { + 0 => false, + 2 => true, + _ => return Err(DecodeError::InvalidMessage(typ)), + }; let track = VarInt::decode(r).await?; + let group = VarInt::decode(r).await?; let sequence = VarInt::decode(r).await?; - let priority = r.read_i32().await?; // big-endian + let priority = VarInt::decode(r).await?.try_into()?; + let expires = match VarInt::decode(r).await?.into_inner() { 0 => None, secs => Some(time::Duration::from_secs(secs)), }; + // The presence of the size field depends on the type. + let size = match size_present { + true => Some(VarInt::decode(r).await?), + false => None, + }; + Ok(Self { track, + group, sequence, priority, expires, + size, }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - VarInt::ZERO.encode(w).await?; + // The kind changes based on the presence of the size. + let kind = match self.size { + Some(_) => VarInt::from_u32(2), + None => VarInt::ZERO, + }; + + kind.encode(w).await?; self.track.encode(w).await?; + self.group.encode(w).await?; self.sequence.encode(w).await?; - w.write_i32(self.priority).await?; + VarInt::from_u32(self.priority).encode(w).await?; // Round up if there's any decimal points. let expires = match self.expires { @@ -65,6 +93,10 @@ impl Object { VarInt::try_from(expires)?.encode(w).await?; + if let Some(size) = self.size { + size.encode(w).await?; + } + Ok(()) } } diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs index 2f21a95..8f24f16 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/message/subscribe.rs @@ -1,4 +1,4 @@ -use crate::coding::{decode_string, encode_string, DecodeError, EncodeError, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; @@ -7,32 +7,119 @@ use crate::coding::{AsyncRead, AsyncWrite}; /// Objects will use the provided ID instead of the full track name, to save bytes. #[derive(Clone, Debug)] pub struct Subscribe { - // An ID we choose so we can map to the track_name. + /// An ID we choose so we can map to the track_name. // Proposal: https://github.com/moq-wg/moq-transport/issues/209 pub id: VarInt, - // The track namespace. + /// The track namespace. pub namespace: String, - // The track name. + /// The track name. pub name: String, + + /// The start/end group/object. + pub start_group: SubscribeLocation, + pub start_object: SubscribeLocation, + pub end_group: SubscribeLocation, + pub end_object: SubscribeLocation, + + /// Optional parameters + pub params: Params, } impl Subscribe { pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; - let namespace = decode_string(r).await?; - let name = decode_string(r).await?; + let namespace = String::decode(r).await?; + let name = String::decode(r).await?; - Ok(Self { id, namespace, name }) + let start_group = SubscribeLocation::decode(r).await?; + let start_object = SubscribeLocation::decode(r).await?; + let end_group = SubscribeLocation::decode(r).await?; + let end_object = SubscribeLocation::decode(r).await?; + + // You can't have a start object without a start group. + if start_group == SubscribeLocation::None && start_object != SubscribeLocation::None { + return Err(DecodeError::InvalidSubscribeLocation); + } + + // You can't have an end object without an end group. + if end_group == SubscribeLocation::None && end_object != SubscribeLocation::None { + return Err(DecodeError::InvalidSubscribeLocation); + } + + // NOTE: There's some more location restrictions in the draft, but they're enforced at a higher level. + + let params = Params::decode(r).await?; + + Ok(Self { + id, + namespace, + name, + start_group, + start_object, + end_group, + end_object, + params, + }) } -} -impl Subscribe { pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; - encode_string(&self.namespace, w).await?; - encode_string(&self.name, w).await?; + self.namespace.encode(w).await?; + self.name.encode(w).await?; + + self.start_group.encode(w).await?; + self.start_object.encode(w).await?; + self.end_group.encode(w).await?; + self.end_object.encode(w).await?; + + self.params.encode(w).await?; + + Ok(()) + } +} + +/// Signal where the subscription should begin, relative to the current cache. +#[derive(Clone, Debug, PartialEq)] +pub enum SubscribeLocation { + None, + Absolute(VarInt), + Latest(VarInt), + Future(VarInt), +} + +impl SubscribeLocation { + pub async fn decode(r: &mut R) -> Result { + let kind = VarInt::decode(r).await?; + + match kind.into_inner() { + 0 => Ok(Self::None), + 1 => Ok(Self::Absolute(VarInt::decode(r).await?)), + 2 => Ok(Self::Latest(VarInt::decode(r).await?)), + 3 => Ok(Self::Future(VarInt::decode(r).await?)), + _ => Err(DecodeError::InvalidSubscribeLocation), + } + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + match self { + Self::None => { + VarInt::from_u32(0).encode(w).await?; + } + Self::Absolute(val) => { + VarInt::from_u32(1).encode(w).await?; + val.encode(w).await?; + } + Self::Latest(val) => { + VarInt::from_u32(2).encode(w).await?; + val.encode(w).await?; + } + Self::Future(val) => { + VarInt::from_u32(3).encode(w).await?; + val.encode(w).await?; + } + } Ok(()) } diff --git a/moq-transport/src/message/subscribe_error.rs b/moq-transport/src/message/subscribe_error.rs new file mode 100644 index 0000000..fa481bf --- /dev/null +++ b/moq-transport/src/message/subscribe_error.rs @@ -0,0 +1,35 @@ +use crate::coding::{AsyncRead, AsyncWrite}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +/// Sent by the publisher to reject a Subscribe. +#[derive(Clone, Debug)] +pub struct SubscribeError { + // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 + + // The ID for this subscription. + pub id: VarInt, + + // An error code. + pub code: u32, + + // An optional, human-readable reason. + pub reason: String, +} + +impl SubscribeError { + pub async fn decode(r: &mut R) -> Result { + let id = VarInt::decode(r).await?; + let code = VarInt::decode(r).await?.try_into()?; + let reason = String::decode(r).await?; + + Ok(Self { id, code, reason }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.id.encode(w).await?; + VarInt::from_u32(self.code).encode(w).await?; + self.reason.encode(w).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/message/subscribe_fin.rs b/moq-transport/src/message/subscribe_fin.rs new file mode 100644 index 0000000..e7899fe --- /dev/null +++ b/moq-transport/src/message/subscribe_fin.rs @@ -0,0 +1,36 @@ +use crate::coding::{AsyncRead, AsyncWrite}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +/// Sent by the publisher to cleanly terminate a Subscribe. +#[derive(Clone, Debug)] +pub struct SubscribeFin { + // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 + /// The ID for this subscription. + pub id: VarInt, + + /// The final group/object sent on this subscription. + pub final_group: VarInt, + pub final_object: VarInt, +} + +impl SubscribeFin { + pub async fn decode(r: &mut R) -> Result { + let id = VarInt::decode(r).await?; + let final_group = VarInt::decode(r).await?; + let final_object = VarInt::decode(r).await?; + + Ok(Self { + id, + final_group, + final_object, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.id.encode(w).await?; + self.final_group.encode(w).await?; + self.final_object.encode(w).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/message/subscribe_ok.rs b/moq-transport/src/message/subscribe_ok.rs index bbc7f39..f96c022 100644 --- a/moq-transport/src/message/subscribe_ok.rs +++ b/moq-transport/src/message/subscribe_ok.rs @@ -1,4 +1,4 @@ -use crate::coding::{DecodeError, EncodeError, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; @@ -6,21 +6,25 @@ use crate::coding::{AsyncRead, AsyncWrite}; #[derive(Clone, Debug)] pub struct SubscribeOk { // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 - - // The ID for this track. + /// The ID for this track. pub id: VarInt, + + /// The subscription will expire in this many milliseconds. + pub expires: VarInt, } impl SubscribeOk { pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; - Ok(Self { id }) + let expires = VarInt::decode(r).await?; + Ok(Self { id, expires }) } } impl SubscribeOk { pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; + self.expires.encode(w).await?; Ok(()) } } diff --git a/moq-transport/src/message/subscribe_reset.rs b/moq-transport/src/message/subscribe_reset.rs index 2daf9b2..bd458b3 100644 --- a/moq-transport/src/message/subscribe_reset.rs +++ b/moq-transport/src/message/subscribe_reset.rs @@ -1,35 +1,48 @@ -use crate::coding::{decode_string, encode_string, DecodeError, EncodeError, VarInt}; - use crate::coding::{AsyncRead, AsyncWrite}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -/// Sent by the publisher to reject a Subscribe. +/// Sent by the publisher to terminate a Subscribe. #[derive(Clone, Debug)] pub struct SubscribeReset { // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 - - // The ID for this subscription. + /// The ID for this subscription. pub id: VarInt, - // An error code. + /// An error code. pub code: u32, - // An optional, human-readable reason. + /// An optional, human-readable reason. pub reason: String, + + /// The final group/object sent on this subscription. + pub final_group: VarInt, + pub final_object: VarInt, } impl SubscribeReset { pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; let code = VarInt::decode(r).await?.try_into()?; - let reason = decode_string(r).await?; + let reason = String::decode(r).await?; + let final_group = VarInt::decode(r).await?; + let final_object = VarInt::decode(r).await?; - Ok(Self { id, code, reason }) + Ok(Self { + id, + code, + reason, + final_group, + final_object, + }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; VarInt::from_u32(self.code).encode(w).await?; - encode_string(&self.reason, w).await?; + self.reason.encode(w).await?; + + self.final_group.encode(w).await?; + self.final_object.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/message/announce_stop.rs b/moq-transport/src/message/unannounce.rs similarity index 65% rename from moq-transport/src/message/announce_stop.rs rename to moq-transport/src/message/unannounce.rs index e184d90..e93188c 100644 --- a/moq-transport/src/message/announce_stop.rs +++ b/moq-transport/src/message/unannounce.rs @@ -1,23 +1,23 @@ -use crate::coding::{decode_string, encode_string, DecodeError, EncodeError}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use crate::coding::{AsyncRead, AsyncWrite}; /// Sent by the publisher to terminate an Announce. #[derive(Clone, Debug)] -pub struct AnnounceStop { +pub struct Unannounce { // Echo back the namespace that was reset pub namespace: String, } -impl AnnounceStop { +impl Unannounce { pub async fn decode(r: &mut R) -> Result { - let namespace = decode_string(r).await?; + let namespace = String::decode(r).await?; Ok(Self { namespace }) } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - encode_string(&self.namespace, w).await?; + self.namespace.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/message/subscribe_stop.rs b/moq-transport/src/message/unsubscribe.rs similarity index 80% rename from moq-transport/src/message/subscribe_stop.rs rename to moq-transport/src/message/unsubscribe.rs index a1170d2..d7d49e2 100644 --- a/moq-transport/src/message/subscribe_stop.rs +++ b/moq-transport/src/message/unsubscribe.rs @@ -1,24 +1,24 @@ -use crate::coding::{DecodeError, EncodeError, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; /// Sent by the subscriber to terminate a Subscribe. #[derive(Clone, Debug)] -pub struct SubscribeStop { +pub struct Unsubscribe { // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 // The ID for this subscription. pub id: VarInt, } -impl SubscribeStop { +impl Unsubscribe { pub async fn decode(r: &mut R) -> Result { let id = VarInt::decode(r).await?; Ok(Self { id }) } } -impl SubscribeStop { +impl Unsubscribe { pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.id.encode(w).await?; Ok(()) diff --git a/moq-transport/src/session/client.rs b/moq-transport/src/session/client.rs index c9b26c1..f12e8b8 100644 --- a/moq-transport/src/session/client.rs +++ b/moq-transport/src/session/client.rs @@ -34,20 +34,19 @@ impl Client { let client = setup::Client { role, - versions: vec![setup::Version::KIXEL_00].into(), + versions: vec![setup::Version::KIXEL_01].into(), + params: Default::default(), }; client.encode(&mut control.0).await?; let server = setup::Server::decode(&mut control.1).await?; - if server.version != setup::Version::KIXEL_00 { - return Err(SessionError::Version(Some(server.version))); - } - - // Make sure the server replied with the - if !client.role.is_compatible(server.role) { - return Err(SessionError::RoleIncompatible(client.role, server.role)); + if server.version != setup::Version::KIXEL_01 { + return Err(SessionError::Version( + vec![setup::Version::KIXEL_01].into(), + vec![server.version].into(), + )); } Ok(control) diff --git a/moq-transport/src/session/error.rs b/moq-transport/src/session/error.rs index d940d0c..c0bdee0 100644 --- a/moq-transport/src/session/error.rs +++ b/moq-transport/src/session/error.rs @@ -14,8 +14,8 @@ pub enum SessionError { #[error("decode error: {0}")] Decode(#[from] coding::DecodeError), - #[error("unsupported version: {0:?}")] - Version(Option), + #[error("unsupported versions: client={0:?} server={1:?}")] + Version(setup::Versions, setup::Versions), #[error("incompatible roles: client={0:?} server={1:?}")] RoleIncompatible(setup::Role, setup::Role), @@ -32,6 +32,18 @@ pub enum SessionError { #[error("role violation: msg={0}")] RoleViolation(VarInt), + /// Our enforced stream mapping was disrespected. + #[error("stream mapping conflict")] + StreamMapping, + + /// The priority was invalid. + #[error("invalid priority: {0}")] + InvalidPriority(VarInt), + + /// The size was invalid. + #[error("invalid size: {0}")] + InvalidSize(VarInt), + /// An unclassified error because I'm lazy. TODO classify these errors #[error("unknown error: {0}")] Unknown(String), @@ -44,29 +56,40 @@ impl MoqError for SessionError { Self::Cache(err) => err.code(), Self::RoleIncompatible(..) => 406, Self::RoleViolation(..) => 405, + Self::StreamMapping => 409, Self::Unknown(_) => 500, Self::Write(_) => 501, Self::Read(_) => 502, Self::Session(_) => 503, - Self::Version(_) => 406, + Self::Version(..) => 406, Self::Encode(_) => 500, Self::Decode(_) => 500, + Self::InvalidPriority(_) => 400, + Self::InvalidSize(_) => 400, } } /// A reason that is sent over the wire. - fn reason(&self) -> &str { + fn reason(&self) -> String { match self { Self::Cache(err) => err.reason(), - Self::RoleViolation(_) => "role violation", - Self::RoleIncompatible(..) => "role incompatible", - Self::Read(_) => "read error", - Self::Write(_) => "write error", - Self::Session(_) => "session error", - Self::Unknown(_) => "unknown", - Self::Version(_) => "unsupported version", - Self::Encode(_) => "encode error", - Self::Decode(_) => "decode error", + Self::RoleViolation(kind) => format!("role violation for message type {:?}", kind), + Self::RoleIncompatible(client, server) => { + format!( + "role incompatible: client wanted {:?} but server wanted {:?}", + client, server + ) + } + Self::Read(err) => format!("read error: {}", err), + Self::Write(err) => format!("write error: {}", err), + Self::Session(err) => format!("session error: {}", err), + Self::Unknown(err) => format!("unknown error: {}", err), + Self::Version(client, server) => format!("unsupported versions: client={:?} server={:?}", client, server), + Self::Encode(err) => format!("encode error: {}", err), + Self::Decode(err) => format!("decode error: {}", err), + Self::StreamMapping => "streaming mapping conflict".to_owned(), + Self::InvalidPriority(priority) => format!("invalid priority: {}", priority), + Self::InvalidSize(size) => format!("invalid size: {}", size), } } } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index 5c8e761..eb926f7 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -85,9 +85,9 @@ impl Publisher { async fn recv_message(&mut self, msg: &Message) -> Result<(), SessionError> { match msg { Message::AnnounceOk(msg) => self.recv_announce_ok(msg).await, - Message::AnnounceStop(msg) => self.recv_announce_stop(msg).await, + Message::AnnounceError(msg) => self.recv_announce_error(msg).await, Message::Subscribe(msg) => self.recv_subscribe(msg).await, - Message::SubscribeStop(msg) => self.recv_subscribe_stop(msg).await, + Message::Unsubscribe(msg) => self.recv_unsubscribe(msg).await, _ => Err(SessionError::RoleViolation(msg.id())), } } @@ -97,7 +97,7 @@ impl Publisher { Err(CacheError::NotFound.into()) } - async fn recv_announce_stop(&mut self, _msg: &message::AnnounceStop) -> Result<(), SessionError> { + async fn recv_announce_error(&mut self, _msg: &message::AnnounceError) -> Result<(), SessionError> { // We didn't send an announce. Err(CacheError::NotFound.into()) } @@ -115,14 +115,24 @@ impl Publisher { hash_map::Entry::Vacant(entry) => entry.insert(abort), }; - self.control.send(message::SubscribeOk { id: msg.id }).await + self.control + .send(message::SubscribeOk { + id: msg.id, + expires: VarInt::ZERO, + }) + .await } async fn reset_subscribe(&mut self, id: VarInt, err: E) -> Result<(), SessionError> { let msg = message::SubscribeReset { id, code: err.code(), - reason: err.reason().to_string(), + reason: err.reason(), + + // TODO properly populate these + // But first: https://github.com/moq-wg/moq-transport/issues/313 + final_group: VarInt::ZERO, + final_object: VarInt::ZERO, }; self.control.send(msg).await @@ -176,31 +186,42 @@ impl Publisher { } async fn run_segment(&self, id: VarInt, segment: &mut segment::Subscriber) -> Result<(), SessionError> { - let object = message::Object { - track: id, - sequence: segment.sequence, - priority: segment.priority, - expires: segment.expires, - }; - - log::trace!("serving object: {:?}", object); + log::trace!("serving group: {:?}", segment); let mut stream = self.webtransport.open_uni().await?; - stream.set_priority(object.priority).ok(); - object - .encode(&mut stream) - .await - .map_err(|e| SessionError::Unknown(e.to_string()))?; + // Convert the u32 to a i32, since the Quinn set_priority is signed. + let priority = (segment.priority as i64 - i32::MAX as i64) as i32; + stream.set_priority(priority).ok(); - while let Some(data) = segment.read_chunk().await? { - stream.write_chunk(data).await?; + while let Some(mut fragment) = segment.next_fragment().await? { + let object = message::Object { + track: id, + + // Properties of the segment + group: segment.sequence, + priority: segment.priority, + expires: segment.expires, + + // Properties of the fragment + sequence: fragment.sequence, + size: fragment.size, + }; + + object + .encode(&mut stream) + .await + .map_err(|e| SessionError::Unknown(e.to_string()))?; + + while let Some(chunk) = fragment.read_chunk().await? { + stream.write_all(&chunk).await?; + } } Ok(()) } - async fn recv_subscribe_stop(&mut self, msg: &message::SubscribeStop) -> Result<(), SessionError> { + async fn recv_unsubscribe(&mut self, msg: &message::Unsubscribe) -> Result<(), SessionError> { let abort = self .subscribes .lock() diff --git a/moq-transport/src/session/server.rs b/moq-transport/src/session/server.rs index 0c5205f..0bd6137 100644 --- a/moq-transport/src/session/server.rs +++ b/moq-transport/src/session/server.rs @@ -18,8 +18,8 @@ impl Server { client .versions .iter() - .find(|version| **version == setup::Version::KIXEL_00) - .ok_or_else(|| SessionError::Version(client.versions.last().cloned()))?; + .find(|version| **version == setup::Version::KIXEL_01) + .ok_or_else(|| SessionError::Version(client.versions.clone(), vec![setup::Version::KIXEL_01].into()))?; Ok(Request { session, @@ -63,7 +63,8 @@ impl Request { async fn send_setup(&mut self, role: setup::Role) -> Result<(), SessionError> { let server = setup::Server { role, - version: setup::Version::KIXEL_00, + version: setup::Version::KIXEL_01, + params: Default::default(), }; // We need to sure we support the opposite of the client's role. diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index c7fdde4..601bd2f 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -6,7 +6,8 @@ use std::{ }; use crate::{ - cache::{broadcast, segment, track, CacheError}, + cache::{broadcast, fragment, segment, track, CacheError}, + coding::DecodeError, message, message::Message, session::{Control, SessionError}, @@ -64,28 +65,28 @@ impl Subscriber { let msg = self.control.recv().await?; log::info!("message received: {:?}", msg); - if let Err(err) = self.recv_message(&msg).await { + if let Err(err) = self.recv_message(&msg) { log::warn!("message error: {:?} {:?}", err, msg); } } } - async fn recv_message(&mut self, msg: &Message) -> Result<(), SessionError> { + fn recv_message(&mut self, msg: &Message) -> Result<(), SessionError> { match msg { - Message::Announce(_) => Ok(()), // don't care - Message::AnnounceReset(_) => Ok(()), // also don't care - Message::SubscribeOk(_) => Ok(()), // guess what, don't care - Message::SubscribeReset(msg) => self.recv_subscribe_reset(msg).await, + Message::Announce(_) => Ok(()), // don't care + Message::Unannounce(_) => Ok(()), // also don't care + Message::SubscribeOk(_msg) => Ok(()), // don't care + Message::SubscribeReset(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code)), + Message::SubscribeFin(msg) => self.recv_subscribe_error(msg.id, CacheError::Closed), + Message::SubscribeError(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code)), Message::GoAway(_msg) => unimplemented!("GOAWAY"), _ => Err(SessionError::RoleViolation(msg.id())), } } - async fn recv_subscribe_reset(&mut self, msg: &message::SubscribeReset) -> Result<(), SessionError> { - let err = CacheError::Reset(msg.code); - + fn recv_subscribe_error(&mut self, id: VarInt, err: CacheError) -> Result<(), SessionError> { let mut subscribes = self.subscribes.lock().unwrap(); - let subscribe = subscribes.remove(&msg.id).ok_or(CacheError::NotFound)?; + let subscribe = subscribes.remove(&id).ok_or(CacheError::NotFound)?; subscribe.close(err)?; Ok(()) @@ -107,36 +108,82 @@ impl Subscriber { async fn run_stream(self, mut stream: RecvStream) -> Result<(), SessionError> { // Decode the object on the data stream. - let object = message::Object::decode(&mut stream) + let mut object = message::Object::decode(&mut stream) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; log::trace!("received object: {:?}", object); // A new scope is needed because the async compiler is dumb - let mut publisher = { + let mut segment = { let mut subscribes = self.subscribes.lock().unwrap(); let track = subscribes.get_mut(&object.track).ok_or(CacheError::NotFound)?; track.create_segment(segment::Info { - sequence: object.sequence, + sequence: object.group, priority: object.priority, expires: object.expires, })? }; - while let Some(data) = stream.read_chunk(usize::MAX, true).await? { - // NOTE: This does not make a copy! - // Bytes are immutable and ref counted. - publisher.write_chunk(data.bytes)?; + // Create the first fragment + let mut fragment = segment.create_fragment(fragment::Info { + sequence: object.sequence, + size: object.size, + })?; + + let mut remain = object.size.map(usize::from); + + loop { + if let Some(0) = remain { + // Decode the next object from the stream. + let next = match message::Object::decode(&mut stream).await { + Ok(next) => next, + + // No more objects + Err(DecodeError::Final) => break, + + // Unknown error + Err(err) => return Err(err.into()), + }; + + // NOTE: This is a custom restriction; not part of the moq-transport draft. + // We require every OBJECT to contain the same priority since prioritization is done per-stream. + // We also require every OBJECT to contain the same group so we know when the group ends, and can detect gaps. + if next.priority != object.priority && next.group != object.group { + return Err(SessionError::StreamMapping); + } + + // Create a new object. + fragment = segment.create_fragment(fragment::Info { + sequence: object.sequence, + size: object.size, + })?; + + object = next; + remain = object.size.map(usize::from); + } + + match stream.read_chunk(remain.unwrap_or(usize::MAX), true).await? { + // Unbounded object has ended + None if remain.is_none() => break, + + // Bounded object ended early, oops. + None => return Err(DecodeError::UnexpectedEnd.into()), + + // NOTE: This does not make a copy! + // Bytes are immutable and ref counted. + Some(data) => fragment.write_chunk(data.bytes)?, + } } Ok(()) } async fn run_source(mut self) -> Result<(), SessionError> { - // NOTE: This returns Closed when the source is closed. - while let Some(track) = self.source.next_track().await? { + loop { + // NOTE: This returns Closed when the source is closed. + let track = self.source.next_track().await?; let name = track.name.clone(); let id = VarInt::from_u32(self.next.fetch_add(1, atomic::Ordering::SeqCst)); @@ -146,11 +193,17 @@ impl Subscriber { id, namespace: "".to_string(), name, + + // TODO correctly support these + start_group: message::SubscribeLocation::Latest(VarInt::ZERO), + start_object: message::SubscribeLocation::Absolute(VarInt::ZERO), + end_group: message::SubscribeLocation::None, + end_object: message::SubscribeLocation::None, + + params: Default::default(), }; self.control.send(msg).await?; } - - Ok(()) } } diff --git a/moq-transport/src/setup/client.rs b/moq-transport/src/setup/client.rs index 220ff52..756ded8 100644 --- a/moq-transport/src/setup/client.rs +++ b/moq-transport/src/setup/client.rs @@ -1,6 +1,6 @@ use super::{Role, Versions}; use crate::{ - coding::{DecodeError, EncodeError}, + coding::{Decode, DecodeError, Encode, EncodeError, Params}, VarInt, }; @@ -15,29 +15,45 @@ pub struct Client { pub versions: Versions, /// Indicate if the client is a publisher, a subscriber, or both. - // Proposal: moq-wg/moq-transport#151 pub role: Role, + + /// Unknown parameters. + pub params: Params, } impl Client { /// Decode a client setup message. pub async fn decode(r: &mut R) -> Result { let typ = VarInt::decode(r).await?; - if typ.into_inner() != 1 { - return Err(DecodeError::InvalidType(typ)); + if typ.into_inner() != 0x40 { + return Err(DecodeError::InvalidMessage(typ)); } let versions = Versions::decode(r).await?; - let role = Role::decode(r).await?; + let mut params = Params::decode(r).await?; - Ok(Self { versions, role }) + let role = params + .get::(VarInt::from_u32(0)) + .await? + .ok_or(DecodeError::MissingParameter)?; + + // Make sure the PATH parameter isn't used + // TODO: This assumes WebTransport support only + if params.has(VarInt::from_u32(1)) { + return Err(DecodeError::InvalidParameter); + } + + Ok(Self { versions, role, params }) } /// Encode a server setup message. pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - VarInt::from_u32(1).encode(w).await?; + VarInt::from_u32(0x40).encode(w).await?; self.versions.encode(w).await?; - self.role.encode(w).await?; + + let mut params = self.params.clone(); + params.set(VarInt::from_u32(0), self.role).await?; + params.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/setup/role.rs b/moq-transport/src/setup/role.rs index 9620697..10b30e0 100644 --- a/moq-transport/src/setup/role.rs +++ b/moq-transport/src/setup/role.rs @@ -1,6 +1,6 @@ use crate::coding::{AsyncRead, AsyncWrite}; -use crate::coding::{DecodeError, EncodeError, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; /// Indicates the endpoint is a publisher, subscriber, or both. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -36,9 +36,9 @@ impl Role { impl From for VarInt { fn from(r: Role) -> Self { VarInt::from_u32(match r { - Role::Publisher => 0x0, - Role::Subscriber => 0x1, - Role::Both => 0x2, + Role::Publisher => 0x1, + Role::Subscriber => 0x2, + Role::Both => 0x3, }) } } @@ -48,23 +48,27 @@ impl TryFrom for Role { fn try_from(v: VarInt) -> Result { match v.into_inner() { - 0x0 => Ok(Self::Publisher), - 0x1 => Ok(Self::Subscriber), - 0x2 => Ok(Self::Both), - _ => Err(DecodeError::InvalidType(v)), + 0x1 => Ok(Self::Publisher), + 0x2 => Ok(Self::Subscriber), + 0x3 => Ok(Self::Both), + _ => Err(DecodeError::InvalidRole(v)), } } } -impl Role { +#[async_trait::async_trait] +impl Decode for Role { /// Decode the role. - pub async fn decode(r: &mut R) -> Result { + async fn decode(r: &mut R) -> Result { let v = VarInt::decode(r).await?; v.try_into() } +} +#[async_trait::async_trait] +impl Encode for Role { /// Encode the role. - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { VarInt::from(*self).encode(w).await } } diff --git a/moq-transport/src/setup/server.rs b/moq-transport/src/setup/server.rs index 819bb4c..aca9975 100644 --- a/moq-transport/src/setup/server.rs +++ b/moq-transport/src/setup/server.rs @@ -1,6 +1,6 @@ use super::{Role, Version}; use crate::{ - coding::{DecodeError, EncodeError}, + coding::{Decode, DecodeError, Encode, EncodeError, Params}, VarInt, }; @@ -17,27 +17,43 @@ pub struct Server { /// Indicate if the server is a publisher, a subscriber, or both. // Proposal: moq-wg/moq-transport#151 pub role: Role, + + /// Unknown parameters. + pub params: Params, } impl Server { /// Decode the server setup. pub async fn decode(r: &mut R) -> Result { let typ = VarInt::decode(r).await?; - if typ.into_inner() != 2 { - return Err(DecodeError::InvalidType(typ)); + if typ.into_inner() != 0x41 { + return Err(DecodeError::InvalidMessage(typ)); } let version = Version::decode(r).await?; - let role = Role::decode(r).await?; + let mut params = Params::decode(r).await?; - Ok(Self { version, role }) + let role = params + .get::(VarInt::from_u32(0)) + .await? + .ok_or(DecodeError::MissingParameter)?; + + // Make sure the PATH parameter isn't used + if params.has(VarInt::from_u32(1)) { + return Err(DecodeError::InvalidParameter); + } + + Ok(Self { version, role, params }) } /// Encode the server setup. pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - VarInt::from_u32(2).encode(w).await?; + VarInt::from_u32(0x41).encode(w).await?; self.version.encode(w).await?; - self.role.encode(w).await?; + + let mut params = self.params.clone(); + params.set(VarInt::from_u32(0), self.role).await?; + params.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/setup/version.rs b/moq-transport/src/setup/version.rs index 17b67d3..e67fe84 100644 --- a/moq-transport/src/setup/version.rs +++ b/moq-transport/src/setup/version.rs @@ -1,4 +1,4 @@ -use crate::coding::{DecodeError, EncodeError, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; @@ -56,6 +56,19 @@ impl Version { /// # GROUP /// - GROUP concept was removed, replaced with OBJECT as a QUIC stream. pub const KIXEL_00: Version = Version(VarInt::from_u32(0xbad00)); + + /// Fork of draft-ietf-moq-transport-01. + /// + /// Most of the KIXEL_00 changes made it into the draft, or were reverted. + /// Check out the referenced issue on: github.com/moq-wg/moq-transport + /// + /// - SUBSCRIBE contains a separate track namespace and track name field (accidental revert). [#277](https://github.com/moq-wg/moq-transport/pull/277) + /// - SUBSCRIBE contains the `track_id` instead of SUBSCRIBE_OK. [#145](https://github.com/moq-wg/moq-transport/issues/145) + /// - SUBSCRIBE_* reference `track_id` the instead of the `track_full_name`. [#145](https://github.com/moq-wg/moq-transport/issues/145) + /// - OBJECT `priority` is still a VarInt, but the max value is a u32 (implementation reasons) + /// - OBJECT `expires` was added, a VarInt in seconds. [#249](https://github.com/moq-wg/moq-transport/issues/249) + /// - OBJECT messages within the same `group` MUST be on the same QUIC stream. + pub const KIXEL_01: Version = Version(VarInt::from_u32(0xbad01)); } impl From for Version { @@ -88,9 +101,10 @@ impl Version { #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Versions(Vec); -impl Versions { +#[async_trait::async_trait] +impl Decode for Versions { /// Decode the version list. - pub async fn decode(r: &mut R) -> Result { + async fn decode(r: &mut R) -> Result { let count = VarInt::decode(r).await?.into_inner(); let mut vs = Vec::new(); @@ -101,9 +115,12 @@ impl Versions { Ok(Self(vs)) } +} +#[async_trait::async_trait] +impl Encode for Versions { /// Encode the version list. - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { let size: VarInt = self.0.len().try_into()?; size.encode(w).await?;