From 5c3f7940534b072140f3cae3a471ffa2233cf991 Mon Sep 17 00:00:00 2001 From: kixelated Date: Wed, 23 Aug 2023 15:28:27 -0700 Subject: [PATCH] A few minor changes to the API. (#52) The only salvagable remains from a multi-day refactoring effort. The main benefit is that Setup messages are no longer part of the Message enum, so match will be a lot easier. --- moq-transport/src/lib.rs | 1 + moq-transport/src/message/mod.rs | 9 +-- moq-transport/src/object/header.rs | 54 ------------------ moq-transport/src/object/mod.rs | 57 ++++++++++++++++++- moq-transport/src/object/receiver.rs | 10 ++-- moq-transport/src/object/sender.rs | 8 +-- .../src/{session/mod.rs => session.rs} | 36 ++++++------ moq-transport/src/setup/client.rs | 11 +++- moq-transport/src/setup/server.rs | 11 +++- moq-warp/src/relay/contribute.rs | 10 ++-- moq-warp/src/relay/distribute.rs | 4 +- 11 files changed, 110 insertions(+), 101 deletions(-) delete mode 100644 moq-transport/src/object/header.rs rename moq-transport/src/{session/mod.rs => session.rs} (68%) diff --git a/moq-transport/src/lib.rs b/moq-transport/src/lib.rs index 655a9d2..cda369a 100644 --- a/moq-transport/src/lib.rs +++ b/moq-transport/src/lib.rs @@ -6,4 +6,5 @@ pub mod setup; pub use coding::VarInt; pub use message::Message; +pub use object::Object; pub use session::Session; diff --git a/moq-transport/src/message/mod.rs b/moq-transport/src/message/mod.rs index e142eb1..5281d04 100644 --- a/moq-transport/src/message/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -19,7 +19,6 @@ pub use subscribe_error::*; pub use subscribe_ok::*; use crate::coding::{DecodeError, EncodeError, VarInt}; -use crate::setup; use std::fmt; @@ -81,16 +80,12 @@ macro_rules! message_types { } } -// Just so we can use the macro above. -type SetupClient = setup::Client; -type SetupServer = setup::Server; - // Each message is prefixed with the given VarInt type. message_types! { // NOTE: Object and Setup are in other modules. // Object = 0x0 - SetupClient = 0x1, - SetupServer = 0x2, + // SetupClient = 0x1 + // SetupServer = 0x2 Subscribe = 0x3, SubscribeOk = 0x4, SubscribeError = 0x5, diff --git a/moq-transport/src/object/header.rs b/moq-transport/src/object/header.rs deleted file mode 100644 index 9310b4c..0000000 --- a/moq-transport/src/object/header.rs +++ /dev/null @@ -1,54 +0,0 @@ -use crate::coding::{DecodeError, EncodeError, VarInt}; - -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use webtransport_generic::{RecvStream, SendStream}; - -#[derive(Debug)] -pub struct Header { - // An ID for this track. - // Proposal: https://github.com/moq-wg/moq-transport/issues/209 - pub track: VarInt, - - // The group sequence number. - pub group: VarInt, - - // The object sequence number. - pub sequence: VarInt, - - // The priority/send order. - // Proposal: int32 instead of a varint. - pub send_order: i32, -} - -impl Header { - pub async fn decode(r: &mut R) -> Result { - let typ = VarInt::decode(r).await?; - if typ.into_inner() != 0 { - return Err(DecodeError::InvalidType(typ)); - } - - // NOTE: size has been omitted - - let track = VarInt::decode(r).await?; - let group = VarInt::decode(r).await?; - let sequence = VarInt::decode(r).await?; - let send_order = r.read_i32().await?; // big-endian - - Ok(Self { - track, - group, - sequence, - send_order, - }) - } - - pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - VarInt::from_u32(0).encode(w).await?; - self.track.encode(w).await?; - self.group.encode(w).await?; - self.sequence.encode(w).await?; - w.write_i32(self.send_order).await?; - - Ok(()) - } -} diff --git a/moq-transport/src/object/mod.rs b/moq-transport/src/object/mod.rs index 56dace8..b70bfcd 100644 --- a/moq-transport/src/object/mod.rs +++ b/moq-transport/src/object/mod.rs @@ -1,7 +1,60 @@ -mod header; mod receiver; mod sender; -pub use header::*; pub use receiver::*; pub use sender::*; + +use crate::coding::{DecodeError, EncodeError, VarInt}; + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use webtransport_generic::{RecvStream, SendStream}; + +#[derive(Debug)] +pub struct Object { + // An ID for this track. + // Proposal: https://github.com/moq-wg/moq-transport/issues/209 + pub track: VarInt, + + // The group sequence number. + pub group: VarInt, + + // The object sequence number. + pub sequence: VarInt, + + // The priority/send order. + // Proposal: int32 instead of a varint. + pub send_order: i32, +} + +impl Object { + pub async fn decode(r: &mut R) -> Result { + let typ = VarInt::decode(r).await?; + if typ.into_inner() != 0 { + return Err(DecodeError::InvalidType(typ)); + } + + // NOTE: size has been omitted + + let track = VarInt::decode(r).await?; + let group = VarInt::decode(r).await?; + let sequence = VarInt::decode(r).await?; + let send_order = r.read_i32().await?; // big-endian + + Ok(Self { + track, + group, + sequence, + send_order, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + VarInt::from_u32(0).encode(w).await?; + self.track.encode(w).await?; + self.group.encode(w).await?; + self.sequence.encode(w).await?; + w.write_i32(self.send_order).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/object/receiver.rs b/moq-transport/src/object/receiver.rs index ad1eb14..521efb5 100644 --- a/moq-transport/src/object/receiver.rs +++ b/moq-transport/src/object/receiver.rs @@ -1,4 +1,4 @@ -use crate::object::Header; +use crate::Object; use anyhow::Context; @@ -10,7 +10,7 @@ pub struct Receiver { session: S, // Streams that we've accepted but haven't read the header from yet. - streams: JoinSet>, + streams: JoinSet>, } impl Receiver { @@ -21,7 +21,7 @@ impl Receiver { } } - pub async fn recv(&mut self) -> anyhow::Result<(Header, S::RecvStream)> { + pub async fn recv(&mut self) -> anyhow::Result<(Object, S::RecvStream)> { loop { tokio::select! { res = self.session.accept_uni() => { @@ -35,8 +35,8 @@ impl Receiver { } } - async fn read(mut stream: S::RecvStream) -> anyhow::Result<(Header, S::RecvStream)> { - let header = Header::decode(&mut stream).await?; + async fn read(mut stream: S::RecvStream) -> anyhow::Result<(Object, S::RecvStream)> { + let header = Object::decode(&mut stream).await?; Ok((header, stream)) } } diff --git a/moq-transport/src/object/sender.rs b/moq-transport/src/object/sender.rs index 4db881b..d075658 100644 --- a/moq-transport/src/object/sender.rs +++ b/moq-transport/src/object/sender.rs @@ -1,6 +1,6 @@ use anyhow::Context; -use crate::object::Header; +use crate::Object; use webtransport_generic::{SendStream, Session}; @@ -16,11 +16,11 @@ impl Sender { Self { session } } - pub async fn open(&mut self, header: Header) -> anyhow::Result { + pub async fn open(&mut self, object: Object) -> anyhow::Result { let mut stream = self.session.open_uni().await.context("failed to open uni stream")?; - stream.set_priority(header.send_order); - header.encode(&mut stream).await.context("failed to write header")?; + stream.set_priority(object.send_order); + object.encode(&mut stream).await.context("failed to write header")?; // log::info!("created stream: {:?}", header); diff --git a/moq-transport/src/session/mod.rs b/moq-transport/src/session.rs similarity index 68% rename from moq-transport/src/session/mod.rs rename to moq-transport/src/session.rs index 2e7587e..a5760e6 100644 --- a/moq-transport/src/session/mod.rs +++ b/moq-transport/src/session.rs @@ -14,15 +14,11 @@ impl Session { /// Called by a server with an established WebTransport session. // TODO close the session with an error code pub async fn accept(session: S, role: setup::Role) -> anyhow::Result { - let (send, recv) = session.accept_bi().await.context("failed to accept bidi stream")?; + let (mut send, mut recv) = session.accept_bi().await.context("failed to accept bidi stream")?; - let mut send_control = message::Sender::new(send); - let mut recv_control = message::Receiver::new(recv); - - let setup_client = match recv_control.recv().await.context("failed to read SETUP")? { - message::Message::SetupClient(setup) => setup, - _ => anyhow::bail!("expected CLIENT SETUP"), - }; + let setup_client = setup::Client::decode(&mut recv) + .await + .context("failed to read CLIENT SETUP")?; setup_client .versions @@ -39,11 +35,14 @@ impl Session { version: setup::Version::DRAFT_00, }; - send_control - .send(message::Message::SetupServer(setup_server)) + setup_server + .encode(&mut send) .await .context("failed to send setup server")?; + let send_control = message::Sender::new(send); + let recv_control = message::Receiver::new(recv); + let send_objects = object::Sender::new(session.clone()); let recv_objects = object::Receiver::new(session.clone()); @@ -57,10 +56,7 @@ impl Session { /// Called by a client with an established WebTransport session. pub async fn connect(session: S, role: setup::Role) -> anyhow::Result { - let (send, recv) = session.open_bi().await.context("failed to oen bidi stream")?; - - let mut send_control = message::Sender::new(send); - let mut recv_control = message::Receiver::new(recv); + let (mut send, mut recv) = session.open_bi().await.context("failed to oen bidi stream")?; let setup_client = setup::Client { role, @@ -68,15 +64,12 @@ impl Session { path: "".to_string(), }; - send_control - .send(message::Message::SetupClient(setup_client)) + setup_client + .encode(&mut send) .await .context("failed to send SETUP CLIENT")?; - let setup_server = match recv_control.recv().await.context("failed to read SETUP")? { - message::Message::SetupServer(setup) => setup, - _ => anyhow::bail!("expected SERVER SETUP"), - }; + let setup_server = setup::Server::decode(&mut recv).await.context("failed to read SETUP")?; if setup_server.version != setup::Version::DRAFT_00 { anyhow::bail!("unsupported version: {:?}", setup_server.version); @@ -86,6 +79,9 @@ impl Session { anyhow::bail!("incompatible roles: {:?} {:?}", role, setup_server.role); } + let send_control = message::Sender::new(send); + let recv_control = message::Receiver::new(recv); + let send_objects = object::Sender::new(session.clone()); let recv_objects = object::Receiver::new(session.clone()); diff --git a/moq-transport/src/setup/client.rs b/moq-transport/src/setup/client.rs index 56bdb27..6648c66 100644 --- a/moq-transport/src/setup/client.rs +++ b/moq-transport/src/setup/client.rs @@ -1,5 +1,8 @@ use super::{Role, Versions}; -use crate::coding::{decode_string, encode_string, DecodeError, EncodeError}; +use crate::{ + coding::{decode_string, encode_string, DecodeError, EncodeError}, + VarInt, +}; use webtransport_generic::{RecvStream, SendStream}; @@ -22,6 +25,11 @@ pub struct Client { impl Client { pub async fn decode(r: &mut R) -> Result { + let typ = VarInt::decode(r).await?; + if typ.into_inner() != 1 { + return Err(DecodeError::InvalidType(typ)); + } + let versions = Versions::decode(r).await?; let role = Role::decode(r).await?; let path = decode_string(r).await?; @@ -30,6 +38,7 @@ impl Client { } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + VarInt::from_u32(1).encode(w).await?; self.versions.encode(w).await?; self.role.encode(w).await?; encode_string(&self.path, w).await?; diff --git a/moq-transport/src/setup/server.rs b/moq-transport/src/setup/server.rs index 6ad6bee..85ebacc 100644 --- a/moq-transport/src/setup/server.rs +++ b/moq-transport/src/setup/server.rs @@ -1,5 +1,8 @@ use super::{Role, Version}; -use crate::coding::{DecodeError, EncodeError}; +use crate::{ + coding::{DecodeError, EncodeError}, + VarInt, +}; use webtransport_generic::{RecvStream, SendStream}; @@ -18,6 +21,11 @@ pub struct Server { impl Server { pub async fn decode(r: &mut R) -> Result { + let typ = VarInt::decode(r).await?; + if typ.into_inner() != 2 { + return Err(DecodeError::InvalidType(typ)); + } + let version = Version::decode(r).await?; let role = Role::decode(r).await?; @@ -25,6 +33,7 @@ impl Server { } pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + VarInt::from_u32(2).encode(w).await?; self.version.encode(w).await?; self.role.encode(w).await?; diff --git a/moq-warp/src/relay/contribute.rs b/moq-warp/src/relay/contribute.rs index 1b0c07d..c36eb0e 100644 --- a/moq-warp/src/relay/contribute.rs +++ b/moq-warp/src/relay/contribute.rs @@ -7,7 +7,7 @@ use tokio::sync::mpsc; use tokio::task::JoinSet; // lock across await boundaries use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk}; -use moq_transport::{object, VarInt}; +use moq_transport::{object, Object, VarInt}; use webtransport_generic::Session as WTSession; use bytes::BytesMut; @@ -89,15 +89,15 @@ impl Session { } } - async fn receive_object(&mut self, header: object::Header, stream: S::RecvStream) -> anyhow::Result<()> { - let track = header.track; + async fn receive_object(&mut self, obj: Object, stream: S::RecvStream) -> anyhow::Result<()> { + let track = obj.track; // Keep objects in memory for 10s let expires = time::Instant::now() + time::Duration::from_secs(10); let segment = segment::Info { - sequence: header.sequence, - send_order: header.send_order, + sequence: obj.sequence, + send_order: obj.send_order, expires: Some(expires), }; diff --git a/moq-warp/src/relay/distribute.rs b/moq-warp/src/relay/distribute.rs index ed7235d..f1c275a 100644 --- a/moq-warp/src/relay/distribute.rs +++ b/moq-warp/src/relay/distribute.rs @@ -4,7 +4,7 @@ use tokio::io::AsyncWriteExt; use tokio::task::JoinSet; // allows locking across await use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk}; -use moq_transport::{object, VarInt}; +use moq_transport::{object, Object, VarInt}; use webtransport_generic::Session as WTSession; use crate::model::{segment, track}; @@ -163,7 +163,7 @@ impl Session { track_id: VarInt, mut segment: segment::Subscriber, ) -> anyhow::Result<()> { - let object = object::Header { + let object = Object { track: track_id, group: segment.sequence, sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object