From df5d3627549cecde838a9ce472ca2f5684df95b2 Mon Sep 17 00:00:00 2001 From: kixelated Date: Fri, 3 Nov 2023 15:10:15 +0900 Subject: [PATCH] Add optional/required extensions. (#117) --- Cargo.lock | 7 ++ moq-transport/Cargo.toml | 1 + moq-transport/src/lib.rs | 4 +- moq-transport/src/message/announce.rs | 5 +- moq-transport/src/message/announce_ok.rs | 9 ++- moq-transport/src/message/announce_reset.rs | 5 +- moq-transport/src/message/go_away.rs | 5 +- moq-transport/src/message/mod.rs | 9 ++- moq-transport/src/message/object.rs | 18 +++-- moq-transport/src/message/subscribe.rs | 26 ++++-- moq-transport/src/message/subscribe_error.rs | 5 +- moq-transport/src/message/subscribe_fin.rs | 5 +- moq-transport/src/message/subscribe_ok.rs | 5 +- moq-transport/src/message/subscribe_reset.rs | 5 +- moq-transport/src/message/unannounce.rs | 5 +- moq-transport/src/message/unsubscribe.rs | 5 +- moq-transport/src/session/client.rs | 43 +++++++--- moq-transport/src/session/control.rs | 10 ++- moq-transport/src/session/error.rs | 6 ++ moq-transport/src/session/publisher.rs | 13 ++- moq-transport/src/session/server.rs | 54 +++++++++---- moq-transport/src/session/subscriber.rs | 12 ++- moq-transport/src/setup/client.rs | 16 +++- moq-transport/src/setup/extension.rs | 84 ++++++++++++++++++++ moq-transport/src/setup/mod.rs | 2 + moq-transport/src/setup/server.rs | 15 +++- moq-transport/src/setup/version.rs | 14 +++- 27 files changed, 296 insertions(+), 92 deletions(-) create mode 100644 moq-transport/src/setup/extension.rs diff --git a/Cargo.lock b/Cargo.lock index 5020f28..6820423 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1111,6 +1111,7 @@ dependencies = [ "bytes", "indexmap 2.0.0", "log", + "paste", "quinn", "thiserror", "tokio", @@ -1321,6 +1322,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "paste" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" + [[package]] name = "percent-encoding" version = "2.3.0" diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 7c9a0bd..661a117 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -26,3 +26,4 @@ webtransport-quinn = "0.6" #webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" } async-trait = "0.1" +paste = "1" diff --git a/moq-transport/src/lib.rs b/moq-transport/src/lib.rs index cdff944..08f4485 100644 --- a/moq-transport/src/lib.rs +++ b/moq-transport/src/lib.rs @@ -5,9 +5,7 @@ //! The specification is a work in progress and will change. //! See the [specification](https://datatracker.ietf.org/doc/draft-ietf-moq-transport/) and [github](https://github.com/moq-wg/moq-transport) for any updates. //! -//! **FORKED**: This implementation makes some changes to the protocol. -//! See [KIXEL_01](crate::setup::Version::KIXEL_01) for a list of differences. -//! Many of these will get merged into the specification, so don't panic. +//! This implementation has some required extensions until the draft stablizes. See: [Extensions](crate::setup::Extensions) mod coding; mod error; diff --git a/moq-transport/src/message/announce.rs b/moq-transport/src/message/announce.rs index 709339d..281fffa 100644 --- a/moq-transport/src/message/announce.rs +++ b/moq-transport/src/message/announce.rs @@ -1,6 +1,7 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; use crate::coding::{AsyncRead, AsyncWrite}; +use crate::setup::Extensions; /// Sent by the publisher to announce the availability of a group of tracks. #[derive(Clone, Debug)] @@ -13,14 +14,14 @@ pub struct Announce { } impl Announce { - pub async fn decode(r: &mut R) -> Result { + pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { let namespace = String::decode(r).await?; let params = Params::decode(r).await?; Ok(Self { namespace, params }) } - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { self.namespace.encode(w).await?; self.params.encode(w).await?; diff --git a/moq-transport/src/message/announce_ok.rs b/moq-transport/src/message/announce_ok.rs index a5c4792..300279e 100644 --- a/moq-transport/src/message/announce_ok.rs +++ b/moq-transport/src/message/announce_ok.rs @@ -1,4 +1,7 @@ -use crate::coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError}; +use crate::{ + coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError}, + setup::Extensions, +}; /// Sent by the subscriber to accept an Announce. #[derive(Clone, Debug)] @@ -9,12 +12,12 @@ pub struct AnnounceOk { } impl AnnounceOk { - pub async fn decode(r: &mut R) -> Result { + pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { let namespace = String::decode(r).await?; Ok(Self { namespace }) } - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { self.namespace.encode(w).await } } diff --git a/moq-transport/src/message/announce_reset.rs b/moq-transport/src/message/announce_reset.rs index e21886b..24d3f81 100644 --- a/moq-transport/src/message/announce_reset.rs +++ b/moq-transport/src/message/announce_reset.rs @@ -1,6 +1,7 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; +use crate::setup::Extensions; /// Sent by the subscriber to reject an Announce. #[derive(Clone, Debug)] @@ -16,7 +17,7 @@ pub struct AnnounceError { } impl AnnounceError { - pub async fn decode(r: &mut R) -> Result { + pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { let namespace = String::decode(r).await?; let code = VarInt::decode(r).await?.try_into()?; let reason = String::decode(r).await?; @@ -28,7 +29,7 @@ impl AnnounceError { }) } - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { self.namespace.encode(w).await?; VarInt::from_u32(self.code).encode(w).await?; self.reason.encode(w).await?; diff --git a/moq-transport/src/message/go_away.rs b/moq-transport/src/message/go_away.rs index c86152a..7999c9a 100644 --- a/moq-transport/src/message/go_away.rs +++ b/moq-transport/src/message/go_away.rs @@ -1,6 +1,7 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use crate::coding::{AsyncRead, AsyncWrite}; +use crate::setup::Extensions; /// Sent by the server to indicate that the client should connect to a different server. #[derive(Clone, Debug)] @@ -9,12 +10,12 @@ pub struct GoAway { } impl GoAway { - pub async fn decode(r: &mut R) -> Result { + pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { let url = String::decode(r).await?; Ok(Self { url }) } - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { self.url.encode(w).await } } diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs index 34260d4..d32a936 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -61,6 +61,7 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use std::fmt; use crate::coding::{AsyncRead, AsyncWrite}; +use crate::setup::Extensions; // Use a macro to generate the message types rather than copy-paste. // This implements a decode/encode method that uses the specified type. @@ -73,23 +74,23 @@ macro_rules! message_types { } impl Message { - pub async fn decode(r: &mut R) -> Result { + pub async fn decode(r: &mut R, ext: &Extensions) -> Result { let t = VarInt::decode(r).await?; match t.into_inner() { $($val => { - let msg = $name::decode(r).await?; + let msg = $name::decode(r, ext).await?; Ok(Self::$name(msg)) })* _ => Err(DecodeError::InvalidMessage(t)), } } - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W, ext: &Extensions) -> Result<(), EncodeError> { match self { $(Self::$name(ref m) => { VarInt::from_u32($val).encode(w).await?; - m.encode(w).await + m.encode(w, ext).await },)* } } diff --git a/moq-transport/src/message/object.rs b/moq-transport/src/message/object.rs index c4cb412..90efa23 100644 --- a/moq-transport/src/message/object.rs +++ b/moq-transport/src/message/object.rs @@ -4,6 +4,7 @@ use tokio::io::AsyncReadExt; use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; +use crate::setup; /// Sent by the publisher as the header of each data stream. #[derive(Clone, Debug)] @@ -30,7 +31,7 @@ pub struct Object { } impl Object { - pub async fn decode(r: &mut R) -> Result { + pub async fn decode(r: &mut R, extensions: &setup::Extensions) -> Result { // Try reading the first byte, returning a special error if the stream naturally ended. let typ = match r.read_u8().await { Ok(b) => VarInt::decode_byte(b, r).await?, @@ -49,9 +50,12 @@ impl Object { let sequence = VarInt::decode(r).await?; let priority = VarInt::decode(r).await?.try_into()?; - let expires = match VarInt::decode(r).await?.into_inner() { - 0 => None, - secs => Some(time::Duration::from_secs(secs)), + let expires = match extensions.object_expires { + true => match VarInt::decode(r).await?.into_inner() { + 0 => None, + secs => Some(time::Duration::from_secs(secs)), + }, + false => None, }; // The presence of the size field depends on the type. @@ -70,7 +74,7 @@ impl Object { }) } - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W, extensions: &setup::Extensions) -> Result<(), EncodeError> { // The kind changes based on the presence of the size. let kind = match self.size { Some(_) => VarInt::from_u32(2), @@ -91,7 +95,9 @@ impl Object { Some(expires) => expires.as_secs(), }; - VarInt::try_from(expires)?.encode(w).await?; + if extensions.object_expires { + VarInt::try_from(expires)?.encode(w).await?; + } if let Some(size) = self.size { size.encode(w).await?; diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs index 8f24f16..e64d5a1 100644 --- a/moq-transport/src/message/subscribe.rs +++ b/moq-transport/src/message/subscribe.rs @@ -1,6 +1,7 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; +use crate::setup::Extensions; /// Sent by the subscriber to request all future objects for the given track. /// @@ -12,7 +13,9 @@ pub struct Subscribe { pub id: VarInt, /// The track namespace. - pub namespace: String, + /// + /// Must be None if `extensions.subscribe_split` is false. + pub namespace: Option, /// The track name. pub name: String, @@ -28,9 +31,14 @@ pub struct Subscribe { } impl Subscribe { - pub async fn decode(r: &mut R) -> Result { + pub async fn decode(r: &mut R, ext: &Extensions) -> Result { let id = VarInt::decode(r).await?; - let namespace = String::decode(r).await?; + + let namespace = match ext.subscribe_split { + true => Some(String::decode(r).await?), + false => None, + }; + let name = String::decode(r).await?; let start_group = SubscribeLocation::decode(r).await?; @@ -64,9 +72,17 @@ impl Subscribe { }) } - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W, ext: &Extensions) -> Result<(), EncodeError> { self.id.encode(w).await?; - self.namespace.encode(w).await?; + + if self.namespace.is_some() != ext.subscribe_split { + panic!("namespace must be None if subscribe_split is false"); + } + + if ext.subscribe_split { + self.namespace.as_ref().unwrap().encode(w).await?; + } + self.name.encode(w).await?; self.start_group.encode(w).await?; diff --git a/moq-transport/src/message/subscribe_error.rs b/moq-transport/src/message/subscribe_error.rs index fa481bf..9ef4c91 100644 --- a/moq-transport/src/message/subscribe_error.rs +++ b/moq-transport/src/message/subscribe_error.rs @@ -1,5 +1,6 @@ use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; +use crate::setup::Extensions; /// Sent by the publisher to reject a Subscribe. #[derive(Clone, Debug)] @@ -17,7 +18,7 @@ pub struct SubscribeError { } impl SubscribeError { - pub async fn decode(r: &mut R) -> Result { + pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { let id = VarInt::decode(r).await?; let code = VarInt::decode(r).await?.try_into()?; let reason = String::decode(r).await?; @@ -25,7 +26,7 @@ impl SubscribeError { Ok(Self { id, code, reason }) } - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { self.id.encode(w).await?; VarInt::from_u32(self.code).encode(w).await?; self.reason.encode(w).await?; diff --git a/moq-transport/src/message/subscribe_fin.rs b/moq-transport/src/message/subscribe_fin.rs index e7899fe..b070971 100644 --- a/moq-transport/src/message/subscribe_fin.rs +++ b/moq-transport/src/message/subscribe_fin.rs @@ -1,5 +1,6 @@ use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; +use crate::setup::Extensions; /// Sent by the publisher to cleanly terminate a Subscribe. #[derive(Clone, Debug)] @@ -14,7 +15,7 @@ pub struct SubscribeFin { } impl SubscribeFin { - pub async fn decode(r: &mut R) -> Result { + pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { let id = VarInt::decode(r).await?; let final_group = VarInt::decode(r).await?; let final_object = VarInt::decode(r).await?; @@ -26,7 +27,7 @@ impl SubscribeFin { }) } - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { self.id.encode(w).await?; self.final_group.encode(w).await?; self.final_object.encode(w).await?; diff --git a/moq-transport/src/message/subscribe_ok.rs b/moq-transport/src/message/subscribe_ok.rs index f96c022..11864e6 100644 --- a/moq-transport/src/message/subscribe_ok.rs +++ b/moq-transport/src/message/subscribe_ok.rs @@ -1,6 +1,7 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; +use crate::setup::Extensions; /// Sent by the publisher to accept a Subscribe. #[derive(Clone, Debug)] @@ -14,7 +15,7 @@ pub struct SubscribeOk { } impl SubscribeOk { - pub async fn decode(r: &mut R) -> Result { + pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { let id = VarInt::decode(r).await?; let expires = VarInt::decode(r).await?; Ok(Self { id, expires }) @@ -22,7 +23,7 @@ impl SubscribeOk { } impl SubscribeOk { - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { self.id.encode(w).await?; self.expires.encode(w).await?; Ok(()) diff --git a/moq-transport/src/message/subscribe_reset.rs b/moq-transport/src/message/subscribe_reset.rs index bd458b3..e488b28 100644 --- a/moq-transport/src/message/subscribe_reset.rs +++ b/moq-transport/src/message/subscribe_reset.rs @@ -1,5 +1,6 @@ use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; +use crate::setup::Extensions; /// Sent by the publisher to terminate a Subscribe. #[derive(Clone, Debug)] @@ -20,7 +21,7 @@ pub struct SubscribeReset { } impl SubscribeReset { - pub async fn decode(r: &mut R) -> Result { + pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { let id = VarInt::decode(r).await?; let code = VarInt::decode(r).await?.try_into()?; let reason = String::decode(r).await?; @@ -36,7 +37,7 @@ impl SubscribeReset { }) } - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { self.id.encode(w).await?; VarInt::from_u32(self.code).encode(w).await?; self.reason.encode(w).await?; diff --git a/moq-transport/src/message/unannounce.rs b/moq-transport/src/message/unannounce.rs index e93188c..a2c2e39 100644 --- a/moq-transport/src/message/unannounce.rs +++ b/moq-transport/src/message/unannounce.rs @@ -1,6 +1,7 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError}; use crate::coding::{AsyncRead, AsyncWrite}; +use crate::setup::Extensions; /// Sent by the publisher to terminate an Announce. #[derive(Clone, Debug)] @@ -10,13 +11,13 @@ pub struct Unannounce { } impl Unannounce { - pub async fn decode(r: &mut R) -> Result { + pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { let namespace = String::decode(r).await?; Ok(Self { namespace }) } - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { self.namespace.encode(w).await?; Ok(()) diff --git a/moq-transport/src/message/unsubscribe.rs b/moq-transport/src/message/unsubscribe.rs index d7d49e2..5361f59 100644 --- a/moq-transport/src/message/unsubscribe.rs +++ b/moq-transport/src/message/unsubscribe.rs @@ -1,6 +1,7 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use crate::coding::{AsyncRead, AsyncWrite}; +use crate::setup::Extensions; /// Sent by the subscriber to terminate a Subscribe. #[derive(Clone, Debug)] @@ -12,14 +13,14 @@ pub struct Unsubscribe { } impl Unsubscribe { - pub async fn decode(r: &mut R) -> Result { + pub async fn decode(r: &mut R, _ext: &Extensions) -> Result { let id = VarInt::decode(r).await?; Ok(Self { id }) } } impl Unsubscribe { - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> { self.id.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/session/client.rs b/moq-transport/src/session/client.rs index f12e8b8..2918e46 100644 --- a/moq-transport/src/session/client.rs +++ b/moq-transport/src/session/client.rs @@ -1,6 +1,6 @@ -use super::{Publisher, SessionError, Subscriber}; +use super::{Control, Publisher, SessionError, Subscriber}; use crate::{cache::broadcast, setup}; -use webtransport_quinn::{RecvStream, SendStream, Session}; +use webtransport_quinn::Session; /// An endpoint that connects to a URL to publish and/or consume live streams. pub struct Client {} @@ -9,7 +9,6 @@ impl Client { /// Connect using an established WebTransport session, performing the MoQ handshake as a publisher. pub async fn publisher(session: Session, source: broadcast::Subscriber) -> Result { let control = Self::send_setup(&session, setup::Role::Publisher).await?; - let publisher = Publisher::new(session, control, source); Ok(publisher) } @@ -17,7 +16,6 @@ impl Client { /// Connect using an established WebTransport session, performing the MoQ handshake as a subscriber. pub async fn subscriber(session: Session, source: broadcast::Publisher) -> Result { let control = Self::send_setup(&session, setup::Role::Subscriber).await?; - let subscriber = Subscriber::new(session, control, source); Ok(subscriber) } @@ -29,26 +27,47 @@ impl Client { } */ - async fn send_setup(session: &Session, role: setup::Role) -> Result<(SendStream, RecvStream), SessionError> { + async fn send_setup(session: &Session, role: setup::Role) -> Result { let mut control = session.open_bi().await?; + let versions: setup::Versions = [setup::Version::DRAFT_01, setup::Version::KIXEL_01].into(); + let client = setup::Client { role, - versions: vec![setup::Version::KIXEL_01].into(), + versions: versions.clone(), params: Default::default(), + + // Offer all extensions + extensions: setup::Extensions { + object_expires: true, + subscriber_id: true, + subscribe_split: true, + }, }; client.encode(&mut control.0).await?; - let server = setup::Server::decode(&mut control.1).await?; + let mut server = setup::Server::decode(&mut control.1).await?; - if server.version != setup::Version::KIXEL_01 { - return Err(SessionError::Version( - vec![setup::Version::KIXEL_01].into(), - vec![server.version].into(), - )); + match server.version { + setup::Version::DRAFT_01 => { + // We always require this extension + server.extensions.require_subscriber_id()?; + + if server.role.is_publisher() { + // We only require object expires if we're a subscriber, so we don't cache objects indefinitely. + server.extensions.require_object_expires()?; + } + } + setup::Version::KIXEL_01 => { + // KIXEL_01 didn't support extensions; all were enabled. + server.extensions = client.extensions.clone() + } + _ => return Err(SessionError::Version(versions, [server.version].into())), } + let control = Control::new(control.0, control.1, server.extensions); + Ok(control) } } diff --git a/moq-transport/src/session/control.rs b/moq-transport/src/session/control.rs index 7c84e80..0686650 100644 --- a/moq-transport/src/session/control.rs +++ b/moq-transport/src/session/control.rs @@ -6,19 +6,21 @@ use tokio::sync::Mutex; use webtransport_quinn::{RecvStream, SendStream}; use super::SessionError; -use crate::message::Message; +use crate::{message::Message, setup::Extensions}; #[derive(Debug, Clone)] pub(crate) struct Control { send: Arc>, recv: Arc>, + pub ext: Extensions, } impl Control { - pub fn new(send: SendStream, recv: RecvStream) -> Self { + pub fn new(send: SendStream, recv: RecvStream, ext: Extensions) -> Self { Self { send: Arc::new(Mutex::new(send)), recv: Arc::new(Mutex::new(recv)), + ext, } } @@ -26,7 +28,7 @@ impl Control { let mut stream = self.send.lock().await; log::info!("sending message: {:?}", msg); msg.into() - .encode(&mut *stream) + .encode(&mut *stream, &self.ext) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; Ok(()) @@ -35,7 +37,7 @@ impl Control { // It's likely a mistake to call this from two different tasks, but it's easier to just support it. pub async fn recv(&self) -> Result { let mut stream = self.recv.lock().await; - let msg = Message::decode(&mut *stream) + let msg = Message::decode(&mut *stream, &self.ext) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; Ok(msg) diff --git a/moq-transport/src/session/error.rs b/moq-transport/src/session/error.rs index c0bdee0..cb816db 100644 --- a/moq-transport/src/session/error.rs +++ b/moq-transport/src/session/error.rs @@ -44,6 +44,10 @@ pub enum SessionError { #[error("invalid size: {0}")] InvalidSize(VarInt), + /// A required extension was not offered. + #[error("required extension not offered: {0:?}")] + RequiredExtension(VarInt), + /// An unclassified error because I'm lazy. TODO classify these errors #[error("unknown error: {0}")] Unknown(String), @@ -66,6 +70,7 @@ impl MoqError for SessionError { Self::Decode(_) => 500, Self::InvalidPriority(_) => 400, Self::InvalidSize(_) => 400, + Self::RequiredExtension(_) => 426, } } @@ -90,6 +95,7 @@ impl MoqError for SessionError { Self::StreamMapping => "streaming mapping conflict".to_owned(), Self::InvalidPriority(priority) => format!("invalid priority: {}", priority), Self::InvalidSize(size) => format!("invalid size: {}", size), + Self::RequiredExtension(id) => format!("required extension was missing: {:?}", id), } } } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index eb926f7..05cb462 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -4,7 +4,7 @@ use std::{ }; use tokio::task::AbortHandle; -use webtransport_quinn::{RecvStream, SendStream, Session}; +use webtransport_quinn::Session; use crate::{ cache::{broadcast, segment, track, CacheError}, @@ -27,13 +27,11 @@ pub struct Publisher { } impl Publisher { - pub(crate) fn new(webtransport: Session, control: (SendStream, RecvStream), source: broadcast::Subscriber) -> Self { - let control = Control::new(control.0, control.1); - + pub(crate) fn new(webtransport: Session, control: Control, source: broadcast::Subscriber) -> Self { Self { webtransport, - subscribes: Default::default(), control, + subscribes: Default::default(), source, } } @@ -140,7 +138,8 @@ impl Publisher { fn start_subscribe(&mut self, msg: message::Subscribe) -> Result { // We currently don't use the namespace field in SUBSCRIBE - if !msg.namespace.is_empty() { + // Make sure the namespace is empty if it's provided. + if msg.namespace.as_ref().map_or(false, |namespace| !namespace.is_empty()) { return Err(CacheError::NotFound.into()); } @@ -209,7 +208,7 @@ impl Publisher { }; object - .encode(&mut stream) + .encode(&mut stream, &self.control.ext) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; diff --git a/moq-transport/src/session/server.rs b/moq-transport/src/session/server.rs index 0bd6137..b41871f 100644 --- a/moq-transport/src/session/server.rs +++ b/moq-transport/src/session/server.rs @@ -1,4 +1,4 @@ -use super::{Publisher, SessionError, Subscriber}; +use super::{Control, Publisher, SessionError, Subscriber}; use crate::{cache::broadcast, setup}; use webtransport_quinn::{RecvStream, SendStream, Session}; @@ -13,13 +13,32 @@ impl Server { pub async fn accept(session: Session) -> Result { let mut control = session.accept_bi().await?; - let client = setup::Client::decode(&mut control.1).await?; + let mut client = setup::Client::decode(&mut control.1).await?; - client - .versions - .iter() - .find(|version| **version == setup::Version::KIXEL_01) - .ok_or_else(|| SessionError::Version(client.versions.clone(), vec![setup::Version::KIXEL_01].into()))?; + if client.versions.contains(&setup::Version::DRAFT_01) { + // We always require subscriber ID. + client.extensions.require_subscriber_id()?; + + // We require OBJECT_EXPIRES for publishers only. + if client.role.is_publisher() { + client.extensions.require_object_expires()?; + } + + // We don't require SUBSCRIBE_SPLIT since it's easy enough to support, but it's clearly an oversight. + // client.extensions.require(&Extension::SUBSCRIBE_SPLIT)?; + } else if client.versions.contains(&setup::Version::KIXEL_01) { + // Extensions didn't exist in KIXEL_01, so we set them manually. + client.extensions = setup::Extensions { + object_expires: true, + subscriber_id: true, + subscribe_split: true, + }; + } else { + return Err(SessionError::Version( + client.versions, + [setup::Version::DRAFT_01, setup::Version::KIXEL_01].into(), + )); + } Ok(Request { session, @@ -39,17 +58,21 @@ pub struct Request { impl Request { /// Accept the session as a publisher, using the provided broadcast to serve subscriptions. pub async fn publisher(mut self, source: broadcast::Subscriber) -> Result { - self.send_setup(setup::Role::Publisher).await?; + let setup = self.setup(setup::Role::Publisher)?; + setup.encode(&mut self.control.0).await?; - let publisher = Publisher::new(self.session, self.control, source); + let control = Control::new(self.control.0, self.control.1, setup.extensions); + let publisher = Publisher::new(self.session, control, source); Ok(publisher) } /// Accept the session as a subscriber only. pub async fn subscriber(mut self, source: broadcast::Publisher) -> Result { - self.send_setup(setup::Role::Subscriber).await?; + let setup = self.setup(setup::Role::Subscriber)?; + setup.encode(&mut self.control.0).await?; - let subscriber = Subscriber::new(self.session, self.control, source); + let control = Control::new(self.control.0, self.control.1, setup.extensions); + let subscriber = Subscriber::new(self.session, control, source); Ok(subscriber) } @@ -60,10 +83,11 @@ impl Request { } */ - async fn send_setup(&mut self, role: setup::Role) -> Result<(), SessionError> { + fn setup(&mut self, role: setup::Role) -> Result { let server = setup::Server { role, - version: setup::Version::KIXEL_01, + version: setup::Version::DRAFT_01, + extensions: self.client.extensions.clone(), params: Default::default(), }; @@ -73,9 +97,7 @@ impl Request { return Err(SessionError::RoleIncompatible(self.client.role, server.role)); } - server.encode(&mut self.control.0).await?; - - Ok(()) + Ok(server) } /// Reject the request, closing the Webtransport session. diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 601bd2f..a538dcf 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -1,4 +1,4 @@ -use webtransport_quinn::{RecvStream, SendStream, Session}; +use webtransport_quinn::{RecvStream, Session}; use std::{ collections::HashMap, @@ -35,9 +35,7 @@ pub struct Subscriber { } impl Subscriber { - pub(crate) fn new(webtransport: Session, control: (SendStream, RecvStream), source: broadcast::Publisher) -> Self { - let control = Control::new(control.0, control.1); - + pub(crate) fn new(webtransport: Session, control: Control, source: broadcast::Publisher) -> Self { Self { webtransport, subscribes: Default::default(), @@ -108,7 +106,7 @@ impl Subscriber { async fn run_stream(self, mut stream: RecvStream) -> Result<(), SessionError> { // Decode the object on the data stream. - let mut object = message::Object::decode(&mut stream) + let mut object = message::Object::decode(&mut stream, &self.control.ext) .await .map_err(|e| SessionError::Unknown(e.to_string()))?; @@ -137,7 +135,7 @@ impl Subscriber { loop { if let Some(0) = remain { // Decode the next object from the stream. - let next = match message::Object::decode(&mut stream).await { + let next = match message::Object::decode(&mut stream, &self.control.ext).await { Ok(next) => next, // No more objects @@ -191,7 +189,7 @@ impl Subscriber { let msg = message::Subscribe { id, - namespace: "".to_string(), + namespace: self.control.ext.subscribe_split.then(|| "".to_string()), name, // TODO correctly support these diff --git a/moq-transport/src/setup/client.rs b/moq-transport/src/setup/client.rs index 756ded8..a18eb7d 100644 --- a/moq-transport/src/setup/client.rs +++ b/moq-transport/src/setup/client.rs @@ -1,4 +1,4 @@ -use super::{Role, Versions}; +use super::{Extensions, Role, Versions}; use crate::{ coding::{Decode, DecodeError, Encode, EncodeError, Params}, VarInt, @@ -17,6 +17,9 @@ pub struct Client { /// Indicate if the client is a publisher, a subscriber, or both. pub role: Role, + /// A list of known/offered extensions. + pub extensions: Extensions, + /// Unknown parameters. pub params: Params, } @@ -43,7 +46,14 @@ impl Client { return Err(DecodeError::InvalidParameter); } - Ok(Self { versions, role, params }) + let extensions = Extensions::load(&mut params).await?; + + Ok(Self { + versions, + role, + extensions, + params, + }) } /// Encode a server setup message. @@ -53,6 +63,8 @@ impl Client { let mut params = self.params.clone(); params.set(VarInt::from_u32(0), self.role).await?; + self.extensions.store(&mut params).await?; + params.encode(w).await?; Ok(()) diff --git a/moq-transport/src/setup/extension.rs b/moq-transport/src/setup/extension.rs new file mode 100644 index 0000000..9e8c8cc --- /dev/null +++ b/moq-transport/src/setup/extension.rs @@ -0,0 +1,84 @@ +use tokio::io::{AsyncRead, AsyncWrite}; + +use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params}; +use crate::session::SessionError; +use crate::VarInt; +use paste::paste; + +/// This is a custom extension scheme to allow/require draft PRs. +/// +/// By convention, the extension number is the PR number + 0xe0000. + +macro_rules! extensions { + {$($name:ident = $val:expr,)*} => { + #[derive(Clone, Default, Debug)] + pub struct Extensions { + $( + pub $name: bool, + )* + } + + impl Extensions { + pub async fn load(params: &mut Params) -> Result { + let mut extensions = Self::default(); + + $( + if let Some(_) = params.get::(VarInt::from_u32($val)).await? { + extensions.$name = true + } + )* + + Ok(extensions) + } + + pub async fn store(&self, params: &mut Params) -> Result<(), EncodeError> { + $( + if self.$name { + params.set(VarInt::from_u32($val), ExtensionExists{}).await?; + } + )* + + Ok(()) + } + + paste! { + $( + pub fn [](&self) -> Result<(), SessionError> { + match self.$name { + true => Ok(()), + false => Err(SessionError::RequiredExtension(VarInt::from_u32($val))), + } + } + )* + } + } + } +} + +struct ExtensionExists; + +#[async_trait::async_trait] +impl Decode for ExtensionExists { + async fn decode(_r: &mut R) -> Result { + Ok(ExtensionExists {}) + } +} + +#[async_trait::async_trait] +impl Encode for ExtensionExists { + async fn encode(&self, _w: &mut W) -> Result<(), EncodeError> { + Ok(()) + } +} + +extensions! { + // required for publishers: OBJECT contains expires VarInt in seconds: https://github.com/moq-wg/moq-transport/issues/249 + // TODO write up a PR + object_expires = 0xe00f9, + + // required: SUBSCRIBE chooses track ID: https://github.com/moq-wg/moq-transport/pull/258 + subscriber_id = 0xe0102, + + // optional: SUBSCRIBE contains namespace/name tuple: https://github.com/moq-wg/moq-transport/pull/277 + subscribe_split = 0xe0115, +} diff --git a/moq-transport/src/setup/mod.rs b/moq-transport/src/setup/mod.rs index e5c59c8..e7662e7 100644 --- a/moq-transport/src/setup/mod.rs +++ b/moq-transport/src/setup/mod.rs @@ -5,11 +5,13 @@ //! Both sides negotate the [Version] and [Role]. mod client; +mod extension; mod role; mod server; mod version; pub use client::*; +pub use extension::*; pub use role::*; pub use server::*; pub use version::*; diff --git a/moq-transport/src/setup/server.rs b/moq-transport/src/setup/server.rs index aca9975..7f73119 100644 --- a/moq-transport/src/setup/server.rs +++ b/moq-transport/src/setup/server.rs @@ -1,4 +1,4 @@ -use super::{Role, Version}; +use super::{Extensions, Role, Version}; use crate::{ coding::{Decode, DecodeError, Encode, EncodeError, Params}, VarInt, @@ -18,6 +18,9 @@ pub struct Server { // Proposal: moq-wg/moq-transport#151 pub role: Role, + /// Custom extensions. + pub extensions: Extensions, + /// Unknown parameters. pub params: Params, } @@ -43,7 +46,14 @@ impl Server { return Err(DecodeError::InvalidParameter); } - Ok(Self { version, role, params }) + let extensions = Extensions::load(&mut params).await?; + + Ok(Self { + version, + role, + extensions, + params, + }) } /// Encode the server setup. @@ -53,6 +63,7 @@ impl Server { let mut params = self.params.clone(); params.set(VarInt::from_u32(0), self.role).await?; + self.extensions.store(&mut params).await?; params.encode(w).await?; Ok(()) diff --git a/moq-transport/src/setup/version.rs b/moq-transport/src/setup/version.rs index e67fe84..5f7dca5 100644 --- a/moq-transport/src/setup/version.rs +++ b/moq-transport/src/setup/version.rs @@ -9,9 +9,12 @@ use std::ops::Deref; pub struct Version(pub VarInt); impl Version { - /// + /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-00.html pub const DRAFT_00: Version = Version(VarInt::from_u32(0xff00)); + /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-01.html + pub const DRAFT_01: Version = Version(VarInt::from_u32(0xff01)); + /// Fork of draft-ietf-moq-transport-00. /// /// Rough list of differences: @@ -60,13 +63,12 @@ impl Version { /// Fork of draft-ietf-moq-transport-01. /// /// Most of the KIXEL_00 changes made it into the draft, or were reverted. - /// Check out the referenced issue on: github.com/moq-wg/moq-transport + /// This was only used for a short time until extensions were created. /// /// - SUBSCRIBE contains a separate track namespace and track name field (accidental revert). [#277](https://github.com/moq-wg/moq-transport/pull/277) /// - SUBSCRIBE contains the `track_id` instead of SUBSCRIBE_OK. [#145](https://github.com/moq-wg/moq-transport/issues/145) /// - SUBSCRIBE_* reference `track_id` the instead of the `track_full_name`. [#145](https://github.com/moq-wg/moq-transport/issues/145) /// - OBJECT `priority` is still a VarInt, but the max value is a u32 (implementation reasons) - /// - OBJECT `expires` was added, a VarInt in seconds. [#249](https://github.com/moq-wg/moq-transport/issues/249) /// - OBJECT messages within the same `group` MUST be on the same QUIC stream. pub const KIXEL_01: Version = Version(VarInt::from_u32(0xbad01)); } @@ -145,3 +147,9 @@ impl From> for Versions { Self(vs) } } + +impl From<[Version; N]> for Versions { + fn from(vs: [Version; N]) -> Self { + Self(vs.to_vec()) + } +}