From 7c3eae0a7a7f0f0cdf9180467502423cc3c70b1a Mon Sep 17 00:00:00 2001 From: kixelated Date: Sat, 8 Jul 2023 09:13:29 -0700 Subject: [PATCH] Make moq-transport generic. (#41) The API is now synchronous and any quinn stuff has been moved to another package. The quinn stuff will be slowly moved into moq-transport with generic traits. --- Cargo.lock | 24 ++- Cargo.toml | 1 + moq-demo/src/main.rs | 2 +- moq-transport-quinn/Cargo.toml | 31 +++ moq-transport-quinn/src/control.rs | 122 ++++++++++++ moq-transport-quinn/src/lib.rs | 7 + moq-transport-quinn/src/object.rs | 136 +++++++++++++ moq-transport-quinn/src/server.rs | 179 ++++++++++++++++++ moq-transport/Cargo.toml | 13 +- moq-transport/src/coding/decode.rs | 80 +++++--- moq-transport/src/coding/duration.rs | 15 +- moq-transport/src/coding/encode.rs | 94 +++++++-- moq-transport/src/coding/varint.rs | 51 +++-- moq-transport/src/control/announce.rs | 15 +- moq-transport/src/control/announce_error.rs | 23 +-- moq-transport/src/control/announce_ok.rs | 15 +- moq-transport/src/control/go_away.rs | 15 +- moq-transport/src/control/mod.rs | 70 ++++--- moq-transport/src/{setup => control}/role.rs | 29 ++- moq-transport/src/control/setup_client.rs | 41 ++++ moq-transport/src/control/setup_server.rs | 35 ++++ moq-transport/src/control/stream.rs | 79 -------- moq-transport/src/control/subscribe.rs | 23 +-- moq-transport/src/control/subscribe_error.rs | 23 +-- moq-transport/src/control/subscribe_ok.rs | 19 +- .../src/{setup => control}/version.rs | 31 ++- moq-transport/src/lib.rs | 12 +- moq-transport/src/object.rs | 54 ++++++ moq-transport/src/object/header.rs | 56 ------ moq-transport/src/object/mod.rs | 5 - moq-transport/src/object/recv.rs | 26 --- moq-transport/src/object/send.rs | 24 --- moq-transport/src/object/session.rs | 35 ---- moq-transport/src/object/transport.rs | 51 ----- moq-transport/src/server/endpoint.rs | 42 ---- moq-transport/src/server/handshake.rs | 114 ----------- moq-transport/src/server/mod.rs | 6 - moq-transport/src/server/setup.rs | 42 ---- moq-transport/src/setup/client.rs | 54 ------ moq-transport/src/setup/mod.rs | 15 -- moq-transport/src/setup/server.rs | 44 ----- moq-warp/Cargo.toml | 3 +- moq-warp/src/relay/contribute.rs | 45 +++-- moq-warp/src/relay/control.rs | 50 ++--- moq-warp/src/relay/distribute.rs | 60 +++--- moq-warp/src/relay/server.rs | 6 +- moq-warp/src/relay/session.rs | 32 ++-- 47 files changed, 1001 insertions(+), 948 deletions(-) create mode 100644 moq-transport-quinn/Cargo.toml create mode 100644 moq-transport-quinn/src/control.rs create mode 100644 moq-transport-quinn/src/lib.rs create mode 100644 moq-transport-quinn/src/object.rs create mode 100644 moq-transport-quinn/src/server.rs rename moq-transport/src/{setup => control}/role.rs (55%) create mode 100644 moq-transport/src/control/setup_client.rs create mode 100644 moq-transport/src/control/setup_server.rs delete mode 100644 moq-transport/src/control/stream.rs rename moq-transport/src/{setup => control}/version.rs (55%) create mode 100644 moq-transport/src/object.rs delete mode 100644 moq-transport/src/object/header.rs delete mode 100644 moq-transport/src/object/mod.rs delete mode 100644 moq-transport/src/object/recv.rs delete mode 100644 moq-transport/src/object/send.rs delete mode 100644 moq-transport/src/object/session.rs delete mode 100644 moq-transport/src/object/transport.rs delete mode 100644 moq-transport/src/server/endpoint.rs delete mode 100644 moq-transport/src/server/handshake.rs delete mode 100644 moq-transport/src/server/mod.rs delete mode 100644 moq-transport/src/server/setup.rs delete mode 100644 moq-transport/src/setup/client.rs delete mode 100644 moq-transport/src/setup/mod.rs delete mode 100644 moq-transport/src/setup/server.rs diff --git a/Cargo.lock b/Cargo.lock index aa74566..0a746fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,17 +66,6 @@ version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" -[[package]] -name = "async-trait" -version = "0.1.68" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "atty" version = "0.2.14" @@ -763,15 +752,24 @@ dependencies = [ [[package]] name = "moq-transport" version = "0.1.0" +dependencies = [ + "bytes", + "log", + "thiserror", +] + +[[package]] +name = "moq-transport-quinn" +version = "0.1.0" dependencies = [ "anyhow", - "async-trait", "bytes", "h3", "h3-quinn", "h3-webtransport", "http", "log", + "moq-transport", "quinn", "thiserror", "tokio", @@ -782,9 +780,9 @@ name = "moq-warp" version = "0.1.0" dependencies = [ "anyhow", - "async-trait", "log", "moq-transport", + "moq-transport-quinn", "mp4", "quinn", "ring", diff --git a/Cargo.toml b/Cargo.toml index 2f6f7f7..20c5996 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "moq-transport", + "moq-transport-quinn", "moq-demo", "moq-warp", ] diff --git a/moq-demo/src/main.rs b/moq-demo/src/main.rs index 7f9c7bd..2c1fdcc 100644 --- a/moq-demo/src/main.rs +++ b/moq-demo/src/main.rs @@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> { let broker = relay::broker::Broadcasts::new(); broker - .announce("demo", media.source()) + .announce("quic.video/demo", media.source()) .context("failed to announce file source")?; // Create a server to actually serve the media diff --git a/moq-transport-quinn/Cargo.toml b/moq-transport-quinn/Cargo.toml new file mode 100644 index 0000000..b525beb --- /dev/null +++ b/moq-transport-quinn/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "moq-transport-quinn" +description = "Media over QUIC" +authors = [ "Luke Curley" ] +repository = "https://github.com/kixelated/moq-rs" +license = "MIT OR Apache-2.0" + +version = "0.1.0" +edition = "2021" + +keywords = [ "quic", "http3", "webtransport", "media", "live" ] +categories = [ "multimedia", "network-programming", "web-programming" ] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +moq-transport = { path = "../moq-transport" } + +# WebTransport support: TODO pin a version when released +h3 = { git = "https://github.com/hyperium/h3", branch = "master" } +h3-quinn = { git = "https://github.com/hyperium/h3", branch = "master" } +h3-webtransport = { git = "https://github.com/hyperium/h3", branch = "master" } +quinn = "0.10" +http = "0.2" + +tokio = { version = "1.27", features = ["macros"] } +bytes = "1" + +log = "0.4" +anyhow = "1.0.70" +thiserror = "1.0.21" diff --git a/moq-transport-quinn/src/control.rs b/moq-transport-quinn/src/control.rs new file mode 100644 index 0000000..d4fd823 --- /dev/null +++ b/moq-transport-quinn/src/control.rs @@ -0,0 +1,122 @@ +use moq_transport::{Decode, DecodeError, Encode, Message}; + +use bytes::{Buf, Bytes, BytesMut}; + +use h3::quic::BidiStream; +use std::io::Cursor; +use std::sync::Arc; +use tokio::sync::Mutex; + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +pub struct Control { + sender: ControlSend, + recver: ControlRecv, +} + +impl Control { + pub(crate) fn new(stream: h3_webtransport::stream::BidiStream, Bytes>) -> Self { + let (sender, recver) = stream.split(); + let sender = ControlSend::new(sender); + let recver = ControlRecv::new(recver); + + Self { sender, recver } + } + + pub fn split(self) -> (ControlSend, ControlRecv) { + (self.sender, self.recver) + } + + pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { + self.sender.send(msg).await + } + + pub async fn recv(&mut self) -> anyhow::Result { + self.recver.recv().await + } +} + +pub struct ControlSend { + stream: h3_webtransport::stream::SendStream, Bytes>, + buf: BytesMut, // reuse a buffer to encode messages. +} + +impl ControlSend { + pub fn new(inner: h3_webtransport::stream::SendStream, Bytes>) -> Self { + Self { + buf: BytesMut::new(), + stream: inner, + } + } + + pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { + let msg = msg.into(); + log::info!("sending message: {:?}", msg); + + self.buf.clear(); + msg.encode(&mut self.buf)?; + + // TODO make this work with select! + self.stream.write_all(&self.buf).await?; + + Ok(()) + } + + // Helper that lets multiple threads send control messages. + pub fn share(self) -> ControlShared { + ControlShared { + stream: Arc::new(Mutex::new(self)), + } + } +} + +// Helper that allows multiple threads to send control messages. +// There's no equivalent for receiving since only one thread should be receiving at a time. +#[derive(Clone)] +pub struct ControlShared { + stream: Arc>, +} + +impl ControlShared { + pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { + let mut stream = self.stream.lock().await; + stream.send(msg).await + } +} + +pub struct ControlRecv { + stream: h3_webtransport::stream::RecvStream, + buf: BytesMut, // data we've read but haven't fully decoded yet +} + +impl ControlRecv { + pub fn new(inner: h3_webtransport::stream::RecvStream) -> Self { + Self { + buf: BytesMut::new(), + stream: inner, + } + } + + // Read the next full message from the stream. + pub async fn recv(&mut self) -> anyhow::Result { + loop { + // Read the contents of the buffer + let mut peek = Cursor::new(&self.buf); + + match Message::decode(&mut peek) { + Ok(msg) => { + // We've successfully decoded a message, so we can advance the buffer. + self.buf.advance(peek.position() as usize); + + log::info!("received message: {:?}", msg); + return Ok(msg); + } + Err(DecodeError::UnexpectedEnd) => { + // The decode failed, so we need to append more data. + self.stream.read_buf(&mut self.buf).await?; + } + Err(e) => return Err(e.into()), + } + } + } +} diff --git a/moq-transport-quinn/src/lib.rs b/moq-transport-quinn/src/lib.rs new file mode 100644 index 0000000..c07ce03 --- /dev/null +++ b/moq-transport-quinn/src/lib.rs @@ -0,0 +1,7 @@ +mod control; +mod object; +mod server; + +pub use control::*; +pub use object::*; +pub use server::*; diff --git a/moq-transport-quinn/src/object.rs b/moq-transport-quinn/src/object.rs new file mode 100644 index 0000000..6f64a50 --- /dev/null +++ b/moq-transport-quinn/src/object.rs @@ -0,0 +1,136 @@ +use anyhow::Context; +use bytes::{Buf, Bytes, BytesMut}; +use moq_transport::{Decode, DecodeError, Encode, Object}; +use std::{io::Cursor, sync::Arc}; + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +// TODO support clients +type WebTransportSession = h3_webtransport::server::WebTransportSession; + +// Reduce some typing +pub type SendStream = h3_webtransport::stream::SendStream, Bytes>; +pub type RecvStream = h3_webtransport::stream::RecvStream; + +pub struct Objects { + send: SendObjects, + recv: RecvObjects, +} + +impl Objects { + pub fn new(session: Arc) -> Self { + let send = SendObjects::new(session.clone()); + let recv = RecvObjects::new(session); + Self { send, recv } + } + + pub fn split(self) -> (SendObjects, RecvObjects) { + (self.send, self.recv) + } + + pub async fn recv(&mut self) -> anyhow::Result<(Object, RecvStream)> { + self.recv.recv().await + } + + pub async fn send(&mut self, header: Object) -> anyhow::Result { + self.send.send(header).await + } +} + +pub struct SendObjects { + session: Arc, + + // A reusable buffer for encoding messages. + buf: BytesMut, +} + +impl SendObjects { + pub fn new(session: Arc) -> Self { + Self { + session, + buf: BytesMut::new(), + } + } + + pub async fn send(&mut self, header: Object) -> anyhow::Result { + self.buf.clear(); + header.encode(&mut self.buf).unwrap(); + + let mut stream = self + .session + .open_uni(self.session.session_id()) + .await + .context("failed to open uni stream")?; + + // TODO support select! without making a new stream. + stream.write_all(&self.buf).await?; + + Ok(stream) + } +} + +impl Clone for SendObjects { + fn clone(&self) -> Self { + Self { + session: self.session.clone(), + buf: BytesMut::new(), + } + } +} + +// Not clone, so we don't accidentally have two listners. +pub struct RecvObjects { + session: Arc, + + // A uni stream that's been accepted but not fully read from yet. + stream: Option, + + // Data that we've read but haven't formed a full message yet. + buf: BytesMut, +} + +impl RecvObjects { + pub fn new(session: Arc) -> Self { + Self { + session, + stream: None, + buf: BytesMut::new(), + } + } + + pub async fn recv(&mut self) -> anyhow::Result<(Object, RecvStream)> { + // Make sure any state is saved across await boundaries so this works with select! + + let stream = match self.stream.as_mut() { + Some(stream) => stream, + None => { + let (_session_id, stream) = self + .session + .accept_uni() + .await + .context("failed to accept uni stream")? + .context("no uni stream")?; + + self.stream.insert(stream) + } + }; + + loop { + // Read the contents of the buffer + let mut peek = Cursor::new(&self.buf); + + match Object::decode(&mut peek) { + Ok(header) => { + let stream = self.stream.take().unwrap(); + self.buf.advance(peek.position() as usize); + return Ok((header, stream)); + } + Err(DecodeError::UnexpectedEnd) => { + // The decode failed, so we need to append more data. + stream.read_buf(&mut self.buf).await?; + } + Err(e) => return Err(e.into()), + } + } + } +} diff --git a/moq-transport-quinn/src/server.rs b/moq-transport-quinn/src/server.rs new file mode 100644 index 0000000..366036d --- /dev/null +++ b/moq-transport-quinn/src/server.rs @@ -0,0 +1,179 @@ +use std::sync::Arc; + +use anyhow::Context; +use bytes::Bytes; +use tokio::task::JoinSet; + +use moq_transport::{Message, SetupClient, SetupServer}; + +use super::{Control, Objects}; + +pub struct Server { + // The QUIC server, yielding new connections and sessions. + endpoint: quinn::Endpoint, + + // A list of connections that are completing the WebTransport handshake. + handshake: JoinSet>, +} + +impl Server { + pub fn new(endpoint: quinn::Endpoint) -> Self { + let handshake = JoinSet::new(); + Self { endpoint, handshake } + } + + // Accept the next WebTransport session. + pub async fn accept(&mut self) -> anyhow::Result { + loop { + tokio::select!( + // Accept the connection and start the WebTransport handshake. + conn = self.endpoint.accept() => { + let conn = conn.context("failed to accept connection")?; + self.handshake.spawn(async move { + Connecting::new(conn).accept().await + }); + }, + // Return any mostly finished WebTransport handshakes. + res = self.handshake.join_next(), if !self.handshake.is_empty() => { + let res = res.expect("no tasks").expect("task aborted"); + match res { + Ok(session) => return Ok(session), + Err(err) => log::warn!("failed to accept session: {:?}", err), + } + }, + ) + } + } +} + +struct Connecting { + conn: quinn::Connecting, +} + +impl Connecting { + pub fn new(conn: quinn::Connecting) -> Self { + Self { conn } + } + + pub async fn accept(self) -> anyhow::Result { + let conn = self.conn.await.context("failed to accept h3 connection")?; + + let mut conn = h3::server::builder() + .enable_webtransport(true) + .enable_connect(true) + .enable_datagram(true) + .max_webtransport_sessions(1) + .send_grease(true) + .build(h3_quinn::Connection::new(conn)) + .await + .context("failed to create h3 server")?; + + let (req, stream) = conn + .accept() + .await + .context("failed to accept h3 session")? + .context("failed to accept h3 request")?; + + let ext = req.extensions(); + anyhow::ensure!(req.method() == http::Method::CONNECT, "expected CONNECT request"); + anyhow::ensure!( + ext.get::() == Some(&h3::ext::Protocol::WEB_TRANSPORT), + "expected WebTransport CONNECT" + ); + + // Let the application decide if we accept this CONNECT request. + Ok(Connect { conn, req, stream }) + } +} + +// The WebTransport CONNECT has arrived, and we need to decide if we accept it. +pub struct Connect { + // Inspect to decide whether to accept() or reject() the session. + req: http::Request<()>, + + conn: h3::server::Connection, + stream: h3::server::RequestStream, Bytes>, +} + +impl Connect { + // Expose the received URI + pub fn uri(&self) -> &http::Uri { + self.req.uri() + } + + // Accept the WebTransport session. + pub async fn accept(self) -> anyhow::Result { + let session = h3_webtransport::server::WebTransportSession::accept(self.req, self.stream, self.conn).await?; + let session = Arc::new(session); + + let stream = session + .accept_bi() + .await + .context("failed to accept bidi stream")? + .unwrap(); + + let objects = Objects::new(session.clone()); + + let stream = match stream { + h3_webtransport::server::AcceptedBi::BidiStream(_session_id, stream) => stream, + h3_webtransport::server::AcceptedBi::Request(..) => anyhow::bail!("additional http requests not supported"), + }; + + let mut control = Control::new(stream); + let setup = match control.recv().await.context("failed to read SETUP")? { + Message::SetupClient(setup) => setup, + _ => anyhow::bail!("expected CLIENT SETUP"), + }; + + // Let the application decide if we accept this MoQ session. + Ok(Setup { + setup, + control, + objects, + }) + } + + // Reject the WebTransport session with a HTTP response. + pub async fn reject(mut self, resp: http::Response<()>) -> anyhow::Result<()> { + self.stream.send_response(resp).await?; + Ok(()) + } +} + +pub struct Setup { + setup: SetupClient, + control: Control, + objects: Objects, +} + +impl Setup { + // Return the setup message we received. + pub fn setup(&self) -> &SetupClient { + &self.setup + } + + // Accept the session with our own setup message. + pub async fn accept(mut self, setup: SetupServer) -> anyhow::Result { + self.control.send(setup).await?; + Ok(Session { + control: self.control, + objects: self.objects, + }) + } + + pub async fn reject(self) -> anyhow::Result<()> { + // TODO Close the QUIC connection with an error code. + Ok(()) + } +} + +pub struct Session { + pub control: Control, + pub objects: Objects, +} + +impl Session { + pub fn split(self) -> (Control, Objects) { + (self.control, self.objects) + } +} diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index a1df185..5d1d559 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -15,17 +15,6 @@ categories = [ "multimedia", "network-programming", "web-programming" ] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -# WebTransport support: TODO pin a version when released -h3 = { git = "https://github.com/hyperium/h3", branch = "master" } -h3-quinn = { git = "https://github.com/hyperium/h3", branch = "master" } -h3-webtransport = { git = "https://github.com/hyperium/h3", branch = "master" } -quinn = "0.10" -http = "0.2" - -tokio = { version = "1.27", features = ["macros"] } bytes = "1" - -log = "0.4" -anyhow = "1.0.70" thiserror = "1.0.21" -async-trait = "0.1" +log = "0.4" diff --git a/moq-transport/src/coding/decode.rs b/moq-transport/src/coding/decode.rs index c817e46..4ed04cf 100644 --- a/moq-transport/src/coding/decode.rs +++ b/moq-transport/src/coding/decode.rs @@ -1,40 +1,76 @@ use super::VarInt; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use std::str; -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncReadExt}; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum DecodeError { + #[error("unexpected end of buffer")] + UnexpectedEnd, + + #[error("invalid string")] + InvalidString(#[from] str::Utf8Error), + + #[error("invalid type: {0:?}")] + InvalidType(VarInt), + + #[error("unknown error")] + Unknown, +} -#[async_trait] pub trait Decode: Sized { - async fn decode(r: &mut R) -> anyhow::Result; + // Decodes a message, returning UnexpectedEnd if there's not enough bytes in the buffer. + fn decode(r: &mut R) -> Result; } -#[async_trait] impl Decode for Bytes { - async fn decode(r: &mut R) -> anyhow::Result { - Vec::::decode(r).await.map(Bytes::from) - } -} - -#[async_trait] -impl Decode for Vec { - async fn decode(r: &mut R) -> anyhow::Result { - let size = VarInt::decode(r).await?; - - // NOTE: we don't use with_capacity since size is from an untrusted source - let mut buf = Vec::new(); - r.take(size.into()).read_to_end(&mut buf).await?; + fn decode(r: &mut R) -> Result { + let size = VarInt::decode(r)?.into_inner() as usize; + if r.remaining() < size { + return Err(DecodeError::UnexpectedEnd); + } + let buf = r.copy_to_bytes(size); Ok(buf) } } -#[async_trait] +impl Decode for Vec { + fn decode(r: &mut R) -> Result { + Bytes::decode(r).map(|b| b.to_vec()) + } +} + impl Decode for String { - async fn decode(r: &mut R) -> anyhow::Result { - let data = Vec::decode(r).await?; + fn decode(r: &mut R) -> Result { + let data = Vec::decode(r)?; let s = str::from_utf8(&data)?.to_string(); Ok(s) } } + +impl Decode for u8 { + fn decode(r: &mut R) -> Result { + if r.remaining() < 1 { + return Err(DecodeError::UnexpectedEnd); + } + + Ok(r.get_u8()) + } +} + +/* +impl Decode for [u8; N] { + fn decode(r: &mut R) -> Result { + if r.remaining() < N { + return Err(DecodeError::UnexpectedEnd); + } + + let mut buf = [0; N]; + r.copy_to_slice(&mut buf); + + Ok(buf) + } +} +*/ diff --git a/moq-transport/src/coding/duration.rs b/moq-transport/src/coding/duration.rs index 7d0ab37..2ac098a 100644 --- a/moq-transport/src/coding/duration.rs +++ b/moq-transport/src/coding/duration.rs @@ -1,23 +1,20 @@ -use crate::coding::{Decode, Encode, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncWrite}; +use bytes::{Buf, BufMut}; use std::time::Duration; -#[async_trait] impl Encode for Duration { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { let ms = self.as_millis(); let ms = VarInt::try_from(ms)?; - ms.encode(w).await + ms.encode(w) } } -#[async_trait] impl Decode for Duration { - async fn decode(r: &mut R) -> anyhow::Result { - let ms = VarInt::decode(r).await?; + fn decode(r: &mut R) -> Result { + let ms = VarInt::decode(r)?; Ok(Self::from_millis(ms.into())) } } diff --git a/moq-transport/src/coding/encode.rs b/moq-transport/src/coding/encode.rs index be57bd1..99c2712 100644 --- a/moq-transport/src/coding/encode.rs +++ b/moq-transport/src/coding/encode.rs @@ -1,41 +1,95 @@ -use async_trait::async_trait; -use tokio::io::{AsyncWrite, AsyncWriteExt}; +use super::{BoundsExceeded, VarInt}; +use bytes::{BufMut, Bytes}; -use super::VarInt; -use bytes::Bytes; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum EncodeError { + #[error("unexpected end of buffer")] + UnexpectedEnd, + + #[error("varint too large")] + BoundsExceeded(#[from] BoundsExceeded), + + #[error("unknown error")] + Unknown, +} -#[async_trait] pub trait Encode: Sized { - async fn encode(&self, w: &mut W) -> anyhow::Result<()>; + fn encode(&self, w: &mut W) -> Result<(), EncodeError>; } -#[async_trait] impl Encode for Bytes { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - self.as_ref().encode(w).await + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.as_ref().encode(w) } } -#[async_trait] impl Encode for Vec { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - self.as_slice().encode(w).await + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.as_slice().encode(w) } } -#[async_trait] impl Encode for &[u8] { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - let size: VarInt = self.len().try_into()?; - size.encode(w).await?; - w.write_all(self).await?; + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + let size = VarInt::try_from(self.len())?; + size.encode(w)?; + + if w.remaining_mut() < self.len() { + return Err(EncodeError::UnexpectedEnd); + } + w.put_slice(self); + Ok(()) } } -#[async_trait] impl Encode for String { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - self.as_bytes().encode(w).await + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.as_bytes().encode(w) + } +} + +impl Encode for u8 { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + if w.remaining_mut() < 1 { + return Err(EncodeError::UnexpectedEnd); + } + + w.put_u8(*self); + Ok(()) + } +} + +impl Encode for u16 { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + if w.remaining_mut() < 2 { + return Err(EncodeError::UnexpectedEnd); + } + + w.put_u16(*self); + Ok(()) + } +} + +impl Encode for u32 { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + if w.remaining_mut() < 4 { + return Err(EncodeError::UnexpectedEnd); + } + + w.put_u32(*self); + Ok(()) + } +} +impl Encode for u64 { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + if w.remaining_mut() < 8 { + return Err(EncodeError::UnexpectedEnd); + } + + w.put_u64(*self); + Ok(()) } } diff --git a/moq-transport/src/coding/varint.rs b/moq-transport/src/coding/varint.rs index c275cc4..80afcd5 100644 --- a/moq-transport/src/coding/varint.rs +++ b/moq-transport/src/coding/varint.rs @@ -5,12 +5,11 @@ use std::convert::{TryFrom, TryInto}; use std::fmt; -use crate::coding::{Decode, Encode}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use bytes::{Buf, BufMut}; use thiserror::Error; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; - #[derive(Debug, Copy, Clone, Eq, PartialEq, Error)] #[error("value too large for varint encoding")] pub struct BoundsExceeded; @@ -102,6 +101,7 @@ impl TryFrom for VarInt { impl TryFrom for VarInt { type Error = BoundsExceeded; + /// Succeeds iff `x` < 2^62 fn try_from(x: usize) -> Result { Self::try_from(x as u64) @@ -120,13 +120,15 @@ impl fmt::Display for VarInt { } } -use async_trait::async_trait; - -#[async_trait] impl Decode for VarInt { - async fn decode(r: &mut R) -> anyhow::Result { + fn decode(r: &mut R) -> Result { let mut buf = [0; 8]; - r.read_exact(buf[0..1].as_mut()).await?; + + if r.remaining() < 1 { + return Err(DecodeError::UnexpectedEnd); + } + + buf[0] = r.get_u8(); let tag = buf[0] >> 6; buf[0] &= 0b0011_1111; @@ -134,15 +136,27 @@ impl Decode for VarInt { let x = match tag { 0b00 => u64::from(buf[0]), 0b01 => { - r.read_exact(buf[1..2].as_mut()).await?; + if r.remaining() < 1 { + return Err(DecodeError::UnexpectedEnd); + } + + r.copy_to_slice(buf[1..2].as_mut()); u64::from(u16::from_be_bytes(buf[..2].try_into().unwrap())) } 0b10 => { - r.read_exact(buf[1..4].as_mut()).await?; + if r.remaining() < 3 { + return Err(DecodeError::UnexpectedEnd); + } + + r.copy_to_slice(buf[1..4].as_mut()); u64::from(u32::from_be_bytes(buf[..4].try_into().unwrap())) } 0b11 => { - r.read_exact(buf[1..8].as_mut()).await?; + if r.remaining() < 7 { + return Err(DecodeError::UnexpectedEnd); + } + + r.copy_to_slice(buf[1..8].as_mut()); u64::from_be_bytes(buf) } _ => unreachable!(), @@ -152,22 +166,19 @@ impl Decode for VarInt { } } -#[async_trait] impl Encode for VarInt { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { let x = self.0; if x < 2u64.pow(6) { - w.write_u8(x as u8).await?; + (x as u8).encode(w) } else if x < 2u64.pow(14) { - w.write_u16(0b01 << 14 | x as u16).await?; + (0b01 << 14 | x as u16).encode(w) } else if x < 2u64.pow(30) { - w.write_u32(0b10 << 30 | x as u32).await?; + (0b10 << 30 | x as u32).encode(w) } else if x < 2u64.pow(62) { - w.write_u64(0b11 << 62 | x).await?; + (0b11 << 62 | x).encode(w) } else { - anyhow::bail!("malformed VarInt"); + unreachable!("malformed VarInt"); } - - Ok(()) } } diff --git a/moq-transport/src/control/announce.rs b/moq-transport/src/control/announce.rs index 1bffe11..54424ac 100644 --- a/moq-transport/src/control/announce.rs +++ b/moq-transport/src/control/announce.rs @@ -1,7 +1,6 @@ -use crate::coding::{Decode, Encode}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncWrite}; +use bytes::{Buf, BufMut}; #[derive(Debug)] pub struct Announce { @@ -9,18 +8,16 @@ pub struct Announce { pub track_namespace: String, } -#[async_trait] impl Decode for Announce { - async fn decode(r: &mut R) -> anyhow::Result { - let track_namespace = String::decode(r).await?; + fn decode(r: &mut R) -> Result { + let track_namespace = String::decode(r)?; Ok(Self { track_namespace }) } } -#[async_trait] impl Encode for Announce { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - self.track_namespace.encode(w).await?; + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.track_namespace.encode(w)?; Ok(()) } } diff --git a/moq-transport/src/control/announce_error.rs b/moq-transport/src/control/announce_error.rs index d8c49b1..ae09cfd 100644 --- a/moq-transport/src/control/announce_error.rs +++ b/moq-transport/src/control/announce_error.rs @@ -1,7 +1,6 @@ -use crate::coding::{Decode, Encode, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncWrite}; +use bytes::{Buf, BufMut}; #[derive(Debug)] pub struct AnnounceError { @@ -16,12 +15,11 @@ pub struct AnnounceError { pub reason: String, } -#[async_trait] impl Decode for AnnounceError { - async fn decode(r: &mut R) -> anyhow::Result { - let track_namespace = String::decode(r).await?; - let code = VarInt::decode(r).await?; - let reason = String::decode(r).await?; + fn decode(r: &mut R) -> Result { + let track_namespace = String::decode(r)?; + let code = VarInt::decode(r)?; + let reason = String::decode(r)?; Ok(Self { track_namespace, @@ -31,12 +29,11 @@ impl Decode for AnnounceError { } } -#[async_trait] impl Encode for AnnounceError { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - self.track_namespace.encode(w).await?; - self.code.encode(w).await?; - self.reason.encode(w).await?; + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.track_namespace.encode(w)?; + self.code.encode(w)?; + self.reason.encode(w)?; Ok(()) } diff --git a/moq-transport/src/control/announce_ok.rs b/moq-transport/src/control/announce_ok.rs index 63049ad..cb6d7a9 100644 --- a/moq-transport/src/control/announce_ok.rs +++ b/moq-transport/src/control/announce_ok.rs @@ -1,7 +1,6 @@ -use crate::coding::{Decode, Encode}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncWrite}; +use bytes::{Buf, BufMut}; #[derive(Debug)] pub struct AnnounceOk { @@ -10,17 +9,15 @@ pub struct AnnounceOk { pub track_namespace: String, } -#[async_trait] impl Decode for AnnounceOk { - async fn decode(r: &mut R) -> anyhow::Result { - let track_namespace = String::decode(r).await?; + fn decode(r: &mut R) -> Result { + let track_namespace = String::decode(r)?; Ok(Self { track_namespace }) } } -#[async_trait] impl Encode for AnnounceOk { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - self.track_namespace.encode(w).await + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.track_namespace.encode(w) } } diff --git a/moq-transport/src/control/go_away.rs b/moq-transport/src/control/go_away.rs index 6d798db..e91d933 100644 --- a/moq-transport/src/control/go_away.rs +++ b/moq-transport/src/control/go_away.rs @@ -1,24 +1,21 @@ -use crate::coding::{Decode, Encode}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncWrite}; +use bytes::{Buf, BufMut}; #[derive(Debug)] pub struct GoAway { pub url: String, } -#[async_trait] impl Decode for GoAway { - async fn decode(r: &mut R) -> anyhow::Result { - let url = String::decode(r).await?; + fn decode(r: &mut R) -> Result { + let url = String::decode(r)?; Ok(Self { url }) } } -#[async_trait] impl Encode for GoAway { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - self.url.encode(w).await + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.url.encode(w) } } diff --git a/moq-transport/src/control/mod.rs b/moq-transport/src/control/mod.rs index cb34047..1070a1c 100644 --- a/moq-transport/src/control/mod.rs +++ b/moq-transport/src/control/mod.rs @@ -2,27 +2,38 @@ mod announce; mod announce_error; mod announce_ok; mod go_away; -mod stream; +mod role; +mod setup_client; +mod setup_server; mod subscribe; mod subscribe_error; mod subscribe_ok; +mod version; pub use announce::*; pub use announce_error::*; pub use announce_ok::*; pub use go_away::*; -pub use stream::*; +pub use role::*; +pub use setup_client::*; +pub use setup_server::*; pub use subscribe::*; pub use subscribe_error::*; pub use subscribe_ok::*; +pub use version::*; -use crate::coding::{Decode, Encode, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use async_trait::async_trait; +use bytes::{Buf, BufMut}; use std::fmt; -use tokio::io::{AsyncRead, AsyncWrite}; -use anyhow::Context; +// NOTE: This is forked from moq-transport-00. +// 1. SETUP role indicates local support ("I can subscribe"), not remote support ("server must publish") +// 2. SETUP_SERVER is id=2 to disambiguate +// 3. messages do not have a specified length. +// 4. messages are sent over a single bidrectional stream (after SETUP), not unidirectional streams. +// 5. SUBSCRIBE specifies the track_id, not SUBSCRIBE_OK +// 6. optional parameters are written in order, and zero when unset (setup, announce, subscribe) // Use a macro to generate the message types rather than copy-paste. // This implements a decode/encode method that uses the specified type. @@ -32,45 +43,33 @@ macro_rules! message_types { $($name($name)),* } - #[async_trait] - impl Decode for Message { - async fn decode(r: &mut R) -> anyhow::Result { - let t = VarInt::decode(r).await.context("failed to decode type")?; - Ok(match t.into_inner() { + impl Decode for Message { + fn decode(r: &mut R) -> Result { + let t = VarInt::decode(r)?; + + match t.into_inner() { $($val => { - let msg = $name::decode(r).await.context(concat!("failed to decode ", stringify!($name)))?; - Self::$name(msg) + let msg = $name::decode(r)?; + Ok(Self::$name(msg)) })* - _ => anyhow::bail!("invalid type: {}", t), - }) + _ => Err(DecodeError::InvalidType(t)), + } } } - #[async_trait] + impl Encode for Message { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { match self { $(Self::$name(ref m) => { - VarInt::from_u32($val).encode(w).await.context("failed to encode type")?; - m.encode(w).await.context("failed to encode message") + VarInt::from_u32($val).encode(w)?; + m.encode(w) },)* } } } - // Unwrap the enum into the specified type. - $(impl TryFrom for $name { - type Error = anyhow::Error; - - fn try_from(m: Message) -> Result { - match m { - Message::$name(m) => Ok(m), - _ => anyhow::bail!("invalid message type"), - } - } - })* - $(impl From<$name> for Message { fn from(m: $name) -> Self { Message::$name(m) @@ -88,17 +87,12 @@ macro_rules! message_types { } } -// NOTE: These messages are forked from moq-transport-00. -// 1. subscribe specifies the track_id, not subscribe_ok -// 2. messages lack a specified length -// 3. optional parameters are not supported (announce, subscribe) -// 4. not allowed on undirectional streams; only after SETUP on the bidirectional stream - // Each message is prefixed with the given VarInt type. message_types! { // NOTE: Object and Setup are in other modules. // Object = 0x0 - // Setup = 0x1 + SetupClient = 0x1, + SetupServer = 0x2, Subscribe = 0x3, SubscribeOk = 0x4, SubscribeError = 0x5, diff --git a/moq-transport/src/setup/role.rs b/moq-transport/src/control/role.rs similarity index 55% rename from moq-transport/src/setup/role.rs rename to moq-transport/src/control/role.rs index f18518d..15a1073 100644 --- a/moq-transport/src/setup/role.rs +++ b/moq-transport/src/control/role.rs @@ -1,7 +1,6 @@ -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncWrite}; +use bytes::{Buf, BufMut}; -use crate::coding::{Decode, Encode, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Role { @@ -37,29 +36,27 @@ impl From for VarInt { } impl TryFrom for Role { - type Error = anyhow::Error; + type Error = DecodeError; fn try_from(v: VarInt) -> Result { - Ok(match v.into_inner() { - 0x0 => Self::Publisher, - 0x1 => Self::Subscriber, - 0x2 => Self::Both, - _ => anyhow::bail!("invalid role: {}", v), - }) + match v.into_inner() { + 0x0 => Ok(Self::Publisher), + 0x1 => Ok(Self::Subscriber), + 0x2 => Ok(Self::Both), + _ => Err(DecodeError::InvalidType(v)), + } } } -#[async_trait] impl Decode for Role { - async fn decode(r: &mut R) -> anyhow::Result { - let v = VarInt::decode(r).await?; + fn decode(r: &mut R) -> Result { + let v = VarInt::decode(r)?; v.try_into() } } -#[async_trait] impl Encode for Role { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - VarInt::from(*self).encode(w).await + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + VarInt::from(*self).encode(w) } } diff --git a/moq-transport/src/control/setup_client.rs b/moq-transport/src/control/setup_client.rs new file mode 100644 index 0000000..a2f3126 --- /dev/null +++ b/moq-transport/src/control/setup_client.rs @@ -0,0 +1,41 @@ +use super::{Role, Versions}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; + +use bytes::{Buf, BufMut}; + +// Sent by the client to setup up the session. +#[derive(Debug)] +pub struct SetupClient { + // NOTE: This is not a message type, but rather the control stream header. + // Proposal: https://github.com/moq-wg/moq-transport/issues/138 + + // The list of supported versions in preferred order. + pub versions: Versions, + + // Indicate if the client is a publisher, a subscriber, or both. + // Proposal: moq-wg/moq-transport#151 + pub role: Role, + + // The path, non-empty ONLY when not using WebTransport. + pub path: String, +} + +impl Decode for SetupClient { + fn decode(r: &mut R) -> Result { + let versions = Versions::decode(r)?; + let role = Role::decode(r)?; + let path = String::decode(r)?; + + Ok(Self { versions, role, path }) + } +} + +impl Encode for SetupClient { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.versions.encode(w)?; + self.role.encode(w)?; + self.path.encode(w)?; + + Ok(()) + } +} diff --git a/moq-transport/src/control/setup_server.rs b/moq-transport/src/control/setup_server.rs new file mode 100644 index 0000000..c1a6092 --- /dev/null +++ b/moq-transport/src/control/setup_server.rs @@ -0,0 +1,35 @@ +use super::{Role, Version}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError}; + +use bytes::{Buf, BufMut}; + +// Sent by the server in response to a client. +// NOTE: This is not a message type, but rather the control stream header. +// Proposal: https://github.com/moq-wg/moq-transport/issues/138 +#[derive(Debug)] +pub struct SetupServer { + // The list of supported versions in preferred order. + pub version: Version, + + // param: 0x0: Indicate if the server is a publisher, a subscriber, or both. + // Proposal: moq-wg/moq-transport#151 + pub role: Role, +} + +impl Decode for SetupServer { + fn decode(r: &mut R) -> Result { + let version = Version::decode(r)?; + let role = Role::decode(r)?; + + Ok(Self { version, role }) + } +} + +impl Encode for SetupServer { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.version.encode(w)?; + self.role.encode(w)?; + + Ok(()) + } +} diff --git a/moq-transport/src/control/stream.rs b/moq-transport/src/control/stream.rs deleted file mode 100644 index 170a667..0000000 --- a/moq-transport/src/control/stream.rs +++ /dev/null @@ -1,79 +0,0 @@ -use crate::coding::{Decode, Encode}; -use crate::control::Message; - -use bytes::Bytes; - -use h3::quic::BidiStream; -use std::sync::Arc; -use tokio::sync::Mutex; - -pub struct Stream { - sender: SendStream, - recver: RecvStream, -} - -impl Stream { - pub(crate) fn new(stream: h3_webtransport::stream::BidiStream, Bytes>) -> Self { - let (sender, recver) = stream.split(); - let sender = SendStream { stream: sender }; - let recver = RecvStream { stream: recver }; - - Self { sender, recver } - } - - pub fn split(self) -> (SendStream, RecvStream) { - (self.sender, self.recver) - } - - pub async fn send(&mut self, msg: Message) -> anyhow::Result<()> { - self.sender.send(msg).await - } - - pub async fn recv(&mut self) -> anyhow::Result { - self.recver.recv().await - } -} - -pub struct SendStream { - stream: h3_webtransport::stream::SendStream, Bytes>, -} - -impl SendStream { - pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { - let msg = msg.into(); - log::info!("sending message: {:?}", msg); - msg.encode(&mut self.stream).await - } - - // Helper that lets multiple threads send control messages. - pub fn share(self) -> SendShared { - SendShared { - stream: Arc::new(Mutex::new(self)), - } - } -} - -// Helper that allows multiple threads to send control messages. -#[derive(Clone)] -pub struct SendShared { - stream: Arc>, -} - -impl SendShared { - pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { - let mut stream = self.stream.lock().await; - stream.send(msg).await - } -} - -pub struct RecvStream { - stream: h3_webtransport::stream::RecvStream, -} - -impl RecvStream { - pub async fn recv(&mut self) -> anyhow::Result { - let msg = Message::decode(&mut self.stream).await?; - log::info!("received message: {:?}", msg); - Ok(msg) - } -} diff --git a/moq-transport/src/control/subscribe.rs b/moq-transport/src/control/subscribe.rs index 36a1be2..abbdb92 100644 --- a/moq-transport/src/control/subscribe.rs +++ b/moq-transport/src/control/subscribe.rs @@ -1,7 +1,6 @@ -use crate::coding::{Decode, Encode, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncWrite}; +use bytes::{Buf, BufMut}; #[derive(Debug)] pub struct Subscribe { @@ -16,12 +15,11 @@ pub struct Subscribe { pub track_name: String, } -#[async_trait] impl Decode for Subscribe { - async fn decode(r: &mut R) -> anyhow::Result { - let track_id = VarInt::decode(r).await?; - let track_namespace = String::decode(r).await?; - let track_name = String::decode(r).await?; + fn decode(r: &mut R) -> Result { + let track_id = VarInt::decode(r)?; + let track_namespace = String::decode(r)?; + let track_name = String::decode(r)?; Ok(Self { track_id, @@ -31,12 +29,11 @@ impl Decode for Subscribe { } } -#[async_trait] impl Encode for Subscribe { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - self.track_id.encode(w).await?; - self.track_namespace.encode(w).await?; - self.track_name.encode(w).await?; + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.track_id.encode(w)?; + self.track_namespace.encode(w)?; + self.track_name.encode(w)?; Ok(()) } diff --git a/moq-transport/src/control/subscribe_error.rs b/moq-transport/src/control/subscribe_error.rs index 6455ed6..b7412c6 100644 --- a/moq-transport/src/control/subscribe_error.rs +++ b/moq-transport/src/control/subscribe_error.rs @@ -1,7 +1,6 @@ -use crate::coding::{Decode, Encode, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncWrite}; +use bytes::{Buf, BufMut}; #[derive(Debug)] pub struct SubscribeError { @@ -17,23 +16,21 @@ pub struct SubscribeError { pub reason: String, } -#[async_trait] impl Decode for SubscribeError { - async fn decode(r: &mut R) -> anyhow::Result { - let track_id = VarInt::decode(r).await?; - let code = VarInt::decode(r).await?; - let reason = String::decode(r).await?; + fn decode(r: &mut R) -> Result { + let track_id = VarInt::decode(r)?; + let code = VarInt::decode(r)?; + let reason = String::decode(r)?; Ok(Self { track_id, code, reason }) } } -#[async_trait] impl Encode for SubscribeError { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - self.track_id.encode(w).await?; - self.code.encode(w).await?; - self.reason.encode(w).await?; + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.track_id.encode(w)?; + self.code.encode(w)?; + self.reason.encode(w)?; Ok(()) } diff --git a/moq-transport/src/control/subscribe_ok.rs b/moq-transport/src/control/subscribe_ok.rs index 23454ed..b502626 100644 --- a/moq-transport/src/control/subscribe_ok.rs +++ b/moq-transport/src/control/subscribe_ok.rs @@ -1,9 +1,8 @@ -use crate::coding::{Decode, Encode, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use std::time::Duration; -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncWrite}; +use bytes::{Buf, BufMut}; #[derive(Debug)] pub struct SubscribeOk { @@ -17,22 +16,20 @@ pub struct SubscribeOk { pub expires: Option, } -#[async_trait] impl Decode for SubscribeOk { - async fn decode(r: &mut R) -> anyhow::Result { - let track_id = VarInt::decode(r).await?; - let expires = Duration::decode(r).await?; + fn decode(r: &mut R) -> Result { + let track_id = VarInt::decode(r)?; + let expires = Duration::decode(r)?; let expires = if expires == Duration::ZERO { None } else { Some(expires) }; Ok(Self { track_id, expires }) } } -#[async_trait] impl Encode for SubscribeOk { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - self.track_id.encode(w).await?; - self.expires.unwrap_or_default().encode(w).await?; + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.track_id.encode(w)?; + self.expires.unwrap_or_default().encode(w)?; Ok(()) } diff --git a/moq-transport/src/setup/version.rs b/moq-transport/src/control/version.rs similarity index 55% rename from moq-transport/src/setup/version.rs rename to moq-transport/src/control/version.rs index 847f3e8..0ba323b 100644 --- a/moq-transport/src/setup/version.rs +++ b/moq-transport/src/control/version.rs @@ -1,7 +1,6 @@ -use crate::coding::{Decode, Encode, VarInt}; +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncWrite}; +use bytes::{Buf, BufMut}; use std::ops::Deref; @@ -24,32 +23,29 @@ impl From for VarInt { } } -#[async_trait] impl Decode for Version { - async fn decode(r: &mut R) -> anyhow::Result { - let v = VarInt::decode(r).await?; + fn decode(r: &mut R) -> Result { + let v = VarInt::decode(r)?; Ok(Self(v)) } } -#[async_trait] impl Encode for Version { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - self.0.encode(w).await + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.0.encode(w) } } #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Versions(pub Vec); -#[async_trait] impl Decode for Versions { - async fn decode(r: &mut R) -> anyhow::Result { - let count = VarInt::decode(r).await?.into_inner(); + fn decode(r: &mut R) -> Result { + let count = VarInt::decode(r)?.into_inner(); let mut vs = Vec::new(); for _ in 0..count { - let v = Version::decode(r).await?; + let v = Version::decode(r)?; vs.push(v); } @@ -57,14 +53,15 @@ impl Decode for Versions { } } -#[async_trait] impl Encode for Versions { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { let size: VarInt = self.0.len().try_into()?; - size.encode(w).await?; + size.encode(w)?; + for v in &self.0 { - v.encode(w).await?; + v.encode(w)?; } + Ok(()) } } diff --git a/moq-transport/src/lib.rs b/moq-transport/src/lib.rs index eb66a07..8d045ac 100644 --- a/moq-transport/src/lib.rs +++ b/moq-transport/src/lib.rs @@ -1,7 +1,7 @@ -pub mod coding; -pub mod control; -pub mod object; -pub mod server; -pub mod setup; +mod coding; +mod control; +mod object; -pub use coding::VarInt; +pub use coding::*; +pub use control::*; +pub use object::*; diff --git a/moq-transport/src/object.rs b/moq-transport/src/object.rs new file mode 100644 index 0000000..d937a82 --- /dev/null +++ b/moq-transport/src/object.rs @@ -0,0 +1,54 @@ +use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; + +use bytes::{Buf, BufMut}; + +#[derive(Debug)] +pub struct Object { + // An ID for this track. + // Proposal: https://github.com/moq-wg/moq-transport/issues/209 + pub track: VarInt, + + // The group sequence number. + pub group: VarInt, + + // The object sequence number. + pub sequence: VarInt, + + // The priority/send order. + pub send_order: VarInt, +} + +impl Decode for Object { + fn decode(r: &mut R) -> Result { + let typ = VarInt::decode(r)?; + if typ.into_inner() != 0 { + return Err(DecodeError::InvalidType(typ)); + } + + // NOTE: size has been omitted + + let track = VarInt::decode(r)?; + let group = VarInt::decode(r)?; + let sequence = VarInt::decode(r)?; + let send_order = VarInt::decode(r)?; + + Ok(Self { + track, + group, + sequence, + send_order, + }) + } +} + +impl Encode for Object { + fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + VarInt::from_u32(0).encode(w)?; + self.track.encode(w)?; + self.group.encode(w)?; + self.sequence.encode(w)?; + self.send_order.encode(w)?; + + Ok(()) + } +} diff --git a/moq-transport/src/object/header.rs b/moq-transport/src/object/header.rs deleted file mode 100644 index b1ca425..0000000 --- a/moq-transport/src/object/header.rs +++ /dev/null @@ -1,56 +0,0 @@ -use crate::coding::{Decode, Encode, VarInt}; - -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncWrite}; - -// Another name for OBJECT, sent as a header for data streams. -#[derive(Debug)] -pub struct Header { - // An ID for this track. - // Proposal: https://github.com/moq-wg/moq-transport/issues/209 - pub track_id: VarInt, - - // The group sequence number. - pub group_sequence: VarInt, - - // The object sequence number. - pub object_sequence: VarInt, - - // The priority/send order. - pub send_order: VarInt, -} - -#[async_trait] -impl Decode for Header { - async fn decode(r: &mut R) -> anyhow::Result { - let typ = VarInt::decode(r).await?; - anyhow::ensure!(u64::from(typ) == 0, "OBJECT type must be 0"); - - // NOTE: size has been omitted - - let track_id = VarInt::decode(r).await?; - let group_sequence = VarInt::decode(r).await?; - let object_sequence = VarInt::decode(r).await?; - let send_order = VarInt::decode(r).await?; - - Ok(Self { - track_id, - group_sequence, - object_sequence, - send_order, - }) - } -} - -#[async_trait] -impl Encode for Header { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - VarInt::from_u32(0).encode(w).await?; - self.track_id.encode(w).await?; - self.group_sequence.encode(w).await?; - self.object_sequence.encode(w).await?; - self.send_order.encode(w).await?; - - Ok(()) - } -} diff --git a/moq-transport/src/object/mod.rs b/moq-transport/src/object/mod.rs deleted file mode 100644 index bd5b430..0000000 --- a/moq-transport/src/object/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod header; -mod transport; - -pub use header::*; -pub use transport::*; diff --git a/moq-transport/src/object/recv.rs b/moq-transport/src/object/recv.rs deleted file mode 100644 index f9526e8..0000000 --- a/moq-transport/src/object/recv.rs +++ /dev/null @@ -1,26 +0,0 @@ -use super::Header; - -use std::sync::Arc; - -// Reduce some typing for implementors. -pub type RecvStream = h3_webtransport::stream::RecvStream; - -// Not clone, so we don't accidentally have two listners. -pub struct Receiver { - transport: Arc, -} - -impl Receiver { - pub async fn recv(&mut self) -> anyhow::Result<(Header, RecvStream)> { - let (_session_id, mut stream) = self - .transport - .accept_uni() - .await - .context("failed to accept uni stream")? - .context("no uni stream")?; - - let header = Header::decode(&mut stream).await?; - - Ok((header, stream)) - } -} diff --git a/moq-transport/src/object/send.rs b/moq-transport/src/object/send.rs deleted file mode 100644 index 9f1bc87..0000000 --- a/moq-transport/src/object/send.rs +++ /dev/null @@ -1,24 +0,0 @@ -use super::{Header, SendStream, WebTransportSession}; - -pub type SendStream = h3_webtransport::stream::SendStream, Bytes>; - -#[derive(Clone)] -pub struct Sender { - transport: Arc, -} - -impl Sender { - pub async fn send(&self, header: Header) -> anyhow::Result { - let mut stream = self - .transport - .open_uni(self.transport.session_id()) - .await - .context("failed to open uni stream")?; - - // TODO set send_order based on header - - header.encode(&mut stream).await?; - - Ok(stream) - } -} diff --git a/moq-transport/src/object/session.rs b/moq-transport/src/object/session.rs deleted file mode 100644 index 61a7dff..0000000 --- a/moq-transport/src/object/session.rs +++ /dev/null @@ -1,35 +0,0 @@ -use super::{Header, Receiver, RecvStream, SendStream, Sender}; - -use anyhow::Context; -use bytes::Bytes; - -use crate::coding::{Decode, Encode}; - -use std::sync::Arc; - -// TODO support clients -type WebTransportSession = h3_webtransport::server::WebTransportSession; - -pub struct Session { - pub send: Sender, - pub recv: Receiver, -} - -impl Session { - pub fn new(transport: WebTransportSession) -> Self { - let shared = Arc::new(transport); - - Self { - send: Sender::new(shared.clone()), - recv: Sender::new(shared), - } - } - - pub async fn recv(&mut self) -> anyhow::Result<(Header, RecvStream)> { - self.recv.recv().await - } - - pub async fn send(&self, header: Header) -> anyhow::Result { - self.send.send(header).await - } -} diff --git a/moq-transport/src/object/transport.rs b/moq-transport/src/object/transport.rs deleted file mode 100644 index f59c2cf..0000000 --- a/moq-transport/src/object/transport.rs +++ /dev/null @@ -1,51 +0,0 @@ -use super::Header; -use anyhow::Context; -use bytes::Bytes; - -use crate::coding::{Decode, Encode}; - -// TODO support clients -type WebTransportSession = h3_webtransport::server::WebTransportSession; - -// Reduce some typing for implementors. -pub type SendStream = h3_webtransport::stream::SendStream, Bytes>; -pub type RecvStream = h3_webtransport::stream::RecvStream; - -pub struct Transport { - transport: WebTransportSession, -} - -impl Transport { - pub fn new(transport: WebTransportSession) -> Self { - Self { transport } - } - - // TODO This should be &mut self to prevent multiple threads trying to read objects - pub async fn recv(&self) -> anyhow::Result<(Header, RecvStream)> { - let (_session_id, mut stream) = self - .transport - .accept_uni() - .await - .context("failed to accept uni stream")? - .context("no uni stream")?; - - let header = Header::decode(&mut stream).await?; - - Ok((header, stream)) - } - - // This can be &self since threads can create streams in parallel. - pub async fn send(&self, header: Header) -> anyhow::Result { - let mut stream = self - .transport - .open_uni(self.transport.session_id()) - .await - .context("failed to open uni stream")?; - - // TODO set send_order based on header - - header.encode(&mut stream).await?; - - Ok(stream) - } -} diff --git a/moq-transport/src/server/endpoint.rs b/moq-transport/src/server/endpoint.rs deleted file mode 100644 index 7ab1e61..0000000 --- a/moq-transport/src/server/endpoint.rs +++ /dev/null @@ -1,42 +0,0 @@ -use super::handshake::{Accept, Connecting}; - -use anyhow::Context; -use tokio::task::JoinSet; - -pub struct Endpoint { - // The QUIC server, yielding new connections and sessions. - endpoint: quinn::Endpoint, - - // A list of connections that are completing the WebTransport handshake. - handshake: JoinSet>, -} - -impl Endpoint { - pub fn new(endpoint: quinn::Endpoint) -> Self { - let handshake = JoinSet::new(); - Self { endpoint, handshake } - } - - // Accept the next WebTransport session. - pub async fn accept(&mut self) -> anyhow::Result { - loop { - tokio::select!( - // Accept the connection and start the WebTransport handshake. - conn = self.endpoint.accept() => { - let conn = conn.context("failed to accept connection")?; - self.handshake.spawn(async move { - Connecting::new(conn).accept().await - }); - }, - // Return any mostly finished WebTransport handshakes. - res = self.handshake.join_next(), if !self.handshake.is_empty() => { - let res = res.expect("no tasks").expect("task aborted"); - match res { - Ok(session) => return Ok(session), - Err(err) => log::warn!("failed to accept session: {:?}", err), - } - }, - ) - } - } -} diff --git a/moq-transport/src/server/handshake.rs b/moq-transport/src/server/handshake.rs deleted file mode 100644 index f4416c7..0000000 --- a/moq-transport/src/server/handshake.rs +++ /dev/null @@ -1,114 +0,0 @@ -use super::setup::{RecvSetup, SendSetup}; -use crate::{control, object, setup}; - -use anyhow::Context; -use bytes::Bytes; - -pub struct Connecting { - conn: quinn::Connecting, -} - -impl Connecting { - pub fn new(conn: quinn::Connecting) -> Self { - Self { conn } - } - - pub async fn accept(self) -> anyhow::Result { - let conn = self.conn.await.context("failed to accept h3 connection")?; - - let mut conn = h3::server::builder() - .enable_webtransport(true) - .enable_connect(true) - .enable_datagram(true) - .max_webtransport_sessions(1) - .send_grease(true) - .build(h3_quinn::Connection::new(conn)) - .await - .context("failed to create h3 server")?; - - let (req, stream) = conn - .accept() - .await - .context("failed to accept h3 session")? - .context("failed to accept h3 request")?; - - let ext = req.extensions(); - anyhow::ensure!(req.method() == http::Method::CONNECT, "expected CONNECT request"); - anyhow::ensure!( - ext.get::() == Some(&h3::ext::Protocol::WEB_TRANSPORT), - "expected WebTransport CONNECT" - ); - - // Return the request after validating the bare minimum. - let accept = Accept { conn, req, stream }; - - Ok(accept) - } -} - -// The WebTransport handshake is complete, but we need to decide if we accept it or return 404. -pub struct Accept { - // Inspect to decide whether to accept() or reject() the session. - req: http::Request<()>, - - conn: h3::server::Connection, - stream: h3::server::RequestStream, Bytes>, -} - -impl Accept { - // Expose the received URI - pub fn uri(&self) -> &http::Uri { - self.req.uri() - } - - // Accept the WebTransport session. - pub async fn accept(self) -> anyhow::Result { - let transport = h3_webtransport::server::WebTransportSession::accept(self.req, self.stream, self.conn).await?; - - let stream = transport - .accept_bi() - .await - .context("failed to accept bidi stream")? - .unwrap(); - - let transport = object::Transport::new(transport); - - let stream = match stream { - h3_webtransport::server::AcceptedBi::BidiStream(_session_id, stream) => stream, - h3_webtransport::server::AcceptedBi::Request(..) => anyhow::bail!("additional http requests not supported"), - }; - - let setup = RecvSetup::new(stream).recv().await?; - - Ok(Setup { transport, setup }) - } - - // Reject the WebTransport session with a HTTP response. - pub async fn reject(mut self, resp: http::Response<()>) -> anyhow::Result<()> { - self.stream.send_response(resp).await?; - Ok(()) - } -} - -pub struct Setup { - setup: SendSetup, - transport: object::Transport, -} - -impl Setup { - // Return the setup message we received. - pub fn setup(&self) -> &setup::Client { - &self.setup.client - } - - // Accept the session with our own setup message. - pub async fn accept(self, setup: setup::Server) -> anyhow::Result<(object::Transport, control::Stream)> { - let control = self.setup.send(setup).await?; - Ok((self.transport, control)) - } - - pub async fn reject(self) -> anyhow::Result<()> { - // TODO Close the QUIC connection with an error code. - Ok(()) - } -} diff --git a/moq-transport/src/server/mod.rs b/moq-transport/src/server/mod.rs deleted file mode 100644 index 80df7ee..0000000 --- a/moq-transport/src/server/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -mod endpoint; -mod handshake; -mod setup; - -pub use endpoint::*; -pub use handshake::*; diff --git a/moq-transport/src/server/setup.rs b/moq-transport/src/server/setup.rs deleted file mode 100644 index 0e821af..0000000 --- a/moq-transport/src/server/setup.rs +++ /dev/null @@ -1,42 +0,0 @@ -use crate::coding::{Decode, Encode}; -use crate::{control, setup}; - -use anyhow::Context; -use bytes::Bytes; - -pub(crate) struct RecvSetup { - stream: h3_webtransport::stream::BidiStream, Bytes>, -} - -impl RecvSetup { - pub fn new(stream: h3_webtransport::stream::BidiStream, Bytes>) -> Self { - Self { stream } - } - - pub async fn recv(mut self) -> anyhow::Result { - let setup = setup::Client::decode(&mut self.stream) - .await - .context("failed to read client SETUP message")?; - - Ok(SendSetup::new(self.stream, setup)) - } -} - -pub(crate) struct SendSetup { - pub client: setup::Client, - stream: h3_webtransport::stream::BidiStream, Bytes>, -} - -impl SendSetup { - pub fn new( - stream: h3_webtransport::stream::BidiStream, Bytes>, - client: setup::Client, - ) -> Self { - Self { stream, client } - } - - pub async fn send(mut self, setup: setup::Server) -> anyhow::Result { - setup.encode(&mut self.stream).await?; - Ok(control::Stream::new(self.stream)) - } -} diff --git a/moq-transport/src/setup/client.rs b/moq-transport/src/setup/client.rs deleted file mode 100644 index f23e861..0000000 --- a/moq-transport/src/setup/client.rs +++ /dev/null @@ -1,54 +0,0 @@ -use super::{Role, Versions}; -use crate::coding::{Decode, Encode, VarInt}; - -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncWrite}; - -use anyhow::Context; - -// Sent by the client to setup up the session. -#[derive(Debug)] -pub struct Client { - // NOTE: This is not a message type, but rather the control stream header. - // Proposal: https://github.com/moq-wg/moq-transport/issues/138 - - // The list of supported versions in preferred order. - pub versions: Versions, - - // Indicate if the client is a publisher, a subscriber, or both. - // Proposal: moq-wg/moq-transport#151 - pub role: Role, - - // The path, non-empty ONLY when not using WebTransport. - pub path: String, -} - -#[async_trait] -impl Decode for Client { - async fn decode(r: &mut R) -> anyhow::Result { - let typ = VarInt::decode(r).await.context("failed to read type")?; - anyhow::ensure!(typ.into_inner() == 1, "client SETUP must be type 1"); - - let versions = Versions::decode(r).await.context("failed to read supported versions")?; - anyhow::ensure!(!versions.is_empty(), "client must support at least one version"); - - let role = Role::decode(r).await.context("failed to decode role")?; - let path = String::decode(r).await.context("failed to read path")?; - - Ok(Self { versions, role, path }) - } -} - -#[async_trait] -impl Encode for Client { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - VarInt::from_u32(1).encode(w).await?; - - anyhow::ensure!(!self.versions.is_empty(), "client must support at least one version"); - self.versions.encode(w).await?; - self.role.encode(w).await?; - self.path.encode(w).await?; - - Ok(()) - } -} diff --git a/moq-transport/src/setup/mod.rs b/moq-transport/src/setup/mod.rs deleted file mode 100644 index e420112..0000000 --- a/moq-transport/src/setup/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -mod client; -mod role; -mod server; -mod version; - -pub use client::*; -pub use role::*; -pub use server::*; -pub use version::*; - -// NOTE: These are forked from moq-transport-00. -// 1. messages lack a sized length -// 2. parameters are not optional and written in order (role + path) -// 3. role indicates local support only, not remote support -// 4. server setup is id=2 to disambiguate diff --git a/moq-transport/src/setup/server.rs b/moq-transport/src/setup/server.rs deleted file mode 100644 index 3f2f43b..0000000 --- a/moq-transport/src/setup/server.rs +++ /dev/null @@ -1,44 +0,0 @@ -use super::{Role, Version}; -use crate::coding::{Decode, Encode, VarInt}; - -use anyhow::Context; -use async_trait::async_trait; -use tokio::io::{AsyncRead, AsyncWrite}; - -// Sent by the server in response to a client. -// NOTE: This is not a message type, but rather the control stream header. -// Proposal: https://github.com/moq-wg/moq-transport/issues/138 -#[derive(Debug)] -pub struct Server { - // The list of supported versions in preferred order. - pub version: Version, - - // param: 0x0: Indicate if the server is a publisher, a subscriber, or both. - // Proposal: moq-wg/moq-transport#151 - pub role: Role, -} - -#[async_trait] -impl Decode for Server { - async fn decode(r: &mut R) -> anyhow::Result { - let typ = VarInt::decode(r).await.context("failed to read type")?; - anyhow::ensure!(typ.into_inner() == 2, "server SETUP must be type 2"); - - let version = Version::decode(r).await.context("failed to read version")?; - let role = Role::decode(r).await.context("failed to read role")?; - - Ok(Self { version, role }) - } -} - -#[async_trait] -impl Encode for Server { - async fn encode(&self, w: &mut W) -> anyhow::Result<()> { - VarInt::from_u32(2).encode(w).await?; // setup type - - self.version.encode(w).await?; - self.role.encode(w).await?; - - Ok(()) - } -} diff --git a/moq-warp/Cargo.toml b/moq-warp/Cargo.toml index 4141c14..a87babe 100644 --- a/moq-warp/Cargo.toml +++ b/moq-warp/Cargo.toml @@ -16,6 +16,7 @@ categories = [ "multimedia", "network-programming", "web-programming" ] [dependencies] moq-transport = { path = "../moq-transport" } +moq-transport-quinn = { path = "../moq-transport-quinn" } tokio = "1.27" mp4 = "0.13.0" @@ -27,5 +28,3 @@ quinn = "0.10" ring = "0.16.20" rustls = "0.21.2" rustls-pemfile = "1.0.2" - -async-trait = "0.1" diff --git a/moq-warp/src/relay/contribute.rs b/moq-warp/src/relay/contribute.rs index d6dfd79..d21b684 100644 --- a/moq-warp/src/relay/contribute.rs +++ b/moq-warp/src/relay/contribute.rs @@ -6,8 +6,8 @@ use tokio::io::AsyncReadExt; use tokio::sync::mpsc; use tokio::task::JoinSet; // lock across await boundaries -use moq_transport::coding::VarInt; -use moq_transport::object; +use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; +use moq_transport_quinn::{RecvObjects, RecvStream}; use anyhow::Context; @@ -18,8 +18,7 @@ use crate::source::Source; // TODO experiment with making this Clone, so every task can have its own copy. pub struct Session { // Used to receive objects. - // TODO split into send/receive halves. - transport: Arc, + objects: RecvObjects, // Used to send and receive control messages. control: control::Component, @@ -39,12 +38,12 @@ pub struct Session { impl Session { pub fn new( - transport: Arc, + objects: RecvObjects, control: control::Component, broker: broker::Broadcasts, ) -> Self { Self { - transport, + objects, control, broker, broadcasts: HashMap::new(), @@ -62,9 +61,9 @@ impl Session { log::error!("failed to produce segment: {:?}", err); } }, - object = self.transport.recv() => { - let (header, stream )= object.context("failed to receive object")?; - let res = self.receive_object(header, stream).await; + object = self.objects.recv() => { + let (object, stream) = object.context("failed to receive object")?; + let res = self.receive_object(object, stream).await; if let Err(err) = res { log::error!("failed to receive object: {:?}", err); } @@ -89,19 +88,19 @@ impl Session { } } - async fn receive_object(&mut self, header: object::Header, stream: object::RecvStream) -> anyhow::Result<()> { - let id = header.track_id; + async fn receive_object(&mut self, object: Object, stream: RecvStream) -> anyhow::Result<()> { + let track = object.track; let segment = segment::Info { - sequence: header.object_sequence, - send_order: header.send_order, + sequence: object.sequence, + send_order: object.send_order, expires: Some(time::Instant::now() + time::Duration::from_secs(10)), }; let segment = segment::Publisher::new(segment); self.publishers - .push_segment(id, segment.subscribe()) + .push_segment(track, segment.subscribe()) .context("failed to publish segment")?; // TODO implement a timeout @@ -112,7 +111,7 @@ impl Session { Ok(()) } - async fn run_segment(mut segment: segment::Publisher, mut stream: object::RecvStream) -> anyhow::Result<()> { + async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> { let mut buf = [0u8; 32 * 1024]; loop { let size = stream.read(&mut buf).await.context("failed to read from stream")?; @@ -125,16 +124,16 @@ impl Session { } } - async fn receive_announce(&mut self, msg: control::Announce) -> anyhow::Result<()> { + async fn receive_announce(&mut self, msg: Announce) -> anyhow::Result<()> { match self.receive_announce_inner(&msg).await { Ok(()) => { - let msg = control::AnnounceOk { + let msg = AnnounceOk { track_namespace: msg.track_namespace, }; self.control.send(msg).await } Err(e) => { - let msg = control::AnnounceError { + let msg = AnnounceError { track_namespace: msg.track_namespace, code: VarInt::from_u32(1), reason: e.to_string(), @@ -144,7 +143,7 @@ impl Session { } } - async fn receive_announce_inner(&mut self, msg: &control::Announce) -> anyhow::Result<()> { + async fn receive_announce_inner(&mut self, msg: &Announce) -> anyhow::Result<()> { // Create a broadcast and announce it. // We don't actually start producing the broadcast until we receive a subscription. let broadcast = Arc::new(Broadcast::new(&msg.track_namespace, &self.publishers)); @@ -155,12 +154,12 @@ impl Session { Ok(()) } - fn receive_subscribe_ok(&mut self, _msg: control::SubscribeOk) -> anyhow::Result<()> { + fn receive_subscribe_ok(&mut self, _msg: SubscribeOk) -> anyhow::Result<()> { // TODO make sure this is for a track we are subscribed to Ok(()) } - fn receive_subscribe_error(&mut self, msg: control::SubscribeError) -> anyhow::Result<()> { + fn receive_subscribe_error(&mut self, msg: SubscribeError) -> anyhow::Result<()> { let error = track::Error { code: msg.code, reason: format!("upstream error: {}", msg.reason), @@ -282,10 +281,10 @@ impl Publishers { } // Returns the next subscribe message we need to issue. - pub async fn incoming(&mut self) -> anyhow::Result { + pub async fn incoming(&mut self) -> anyhow::Result { let (namespace, track) = self.receiver.recv().await.context("no more subscriptions")?; - let msg = control::Subscribe { + let msg = Subscribe { track_id: VarInt::try_from(self.next)?, track_namespace: namespace, track_name: track.name, diff --git a/moq-warp/src/relay/control.rs b/moq-warp/src/relay/control.rs index 9339e6a..2151594 100644 --- a/moq-warp/src/relay/control.rs +++ b/moq-warp/src/relay/control.rs @@ -1,11 +1,11 @@ -use moq_transport::control; use tokio::sync::mpsc; -pub use control::*; +use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk}; +use moq_transport_quinn::Control; pub struct Main { - control: control::Stream, - outgoing: mpsc::Receiver, + control: Control, + outgoing: mpsc::Receiver, contribute: mpsc::Sender, distribute: mpsc::Sender, @@ -21,7 +21,7 @@ impl Main { } } - pub async fn handle(&mut self, msg: control::Message) -> anyhow::Result<()> { + pub async fn handle(&mut self, msg: Message) -> anyhow::Result<()> { match msg.try_into() { Ok(msg) => self.contribute.send(msg).await?, Err(msg) => match msg.try_into() { @@ -36,7 +36,7 @@ impl Main { pub struct Component { incoming: mpsc::Receiver, - outgoing: mpsc::Sender, + outgoing: mpsc::Sender, } impl Component { @@ -51,7 +51,7 @@ impl Component { } // Splits a control stream into two components, based on if it's a message for contribution or distribution. -pub fn split(control: control::Stream) -> (Main, Component, Component) { +pub fn split(control: Control) -> (Main, Component, Component) { let (outgoing_tx, outgoing_rx) = mpsc::channel(1); let (contribute_tx, contribute_rx) = mpsc::channel(1); let (distribute_tx, distribute_rx) = mpsc::channel(1); @@ -79,19 +79,19 @@ pub fn split(control: control::Stream) -> (Main, Component, Componen // Messages we expect to receive from the client for contribution. #[derive(Debug)] pub enum Contribute { - Announce(control::Announce), - SubscribeOk(control::SubscribeOk), - SubscribeError(control::SubscribeError), + Announce(Announce), + SubscribeOk(SubscribeOk), + SubscribeError(SubscribeError), } -impl TryFrom for Contribute { - type Error = control::Message; +impl TryFrom for Contribute { + type Error = Message; - fn try_from(msg: control::Message) -> Result { + fn try_from(msg: Message) -> Result { match msg { - control::Message::Announce(msg) => Ok(Self::Announce(msg)), - control::Message::SubscribeOk(msg) => Ok(Self::SubscribeOk(msg)), - control::Message::SubscribeError(msg) => Ok(Self::SubscribeError(msg)), + Message::Announce(msg) => Ok(Self::Announce(msg)), + Message::SubscribeOk(msg) => Ok(Self::SubscribeOk(msg)), + Message::SubscribeError(msg) => Ok(Self::SubscribeError(msg)), _ => Err(msg), } } @@ -100,19 +100,19 @@ impl TryFrom for Contribute { // Messages we expect to receive from the client for distribution. #[derive(Debug)] pub enum Distribute { - AnnounceOk(control::AnnounceOk), - AnnounceError(control::AnnounceError), - Subscribe(control::Subscribe), + AnnounceOk(AnnounceOk), + AnnounceError(AnnounceError), + Subscribe(Subscribe), } -impl TryFrom for Distribute { - type Error = control::Message; +impl TryFrom for Distribute { + type Error = Message; - fn try_from(value: control::Message) -> Result { + fn try_from(value: Message) -> Result { match value { - control::Message::AnnounceOk(msg) => Ok(Self::AnnounceOk(msg)), - control::Message::AnnounceError(msg) => Ok(Self::AnnounceError(msg)), - control::Message::Subscribe(msg) => Ok(Self::Subscribe(msg)), + Message::AnnounceOk(msg) => Ok(Self::AnnounceOk(msg)), + Message::AnnounceError(msg) => Ok(Self::AnnounceError(msg)), + Message::Subscribe(msg) => Ok(Self::Subscribe(msg)), _ => Err(value), } } diff --git a/moq-warp/src/relay/distribute.rs b/moq-warp/src/relay/distribute.rs index 152ca90..0db689d 100644 --- a/moq-warp/src/relay/distribute.rs +++ b/moq-warp/src/relay/distribute.rs @@ -3,17 +3,15 @@ use anyhow::Context; use tokio::io::AsyncWriteExt; use tokio::task::JoinSet; // allows locking across await -use std::sync::Arc; - -use moq_transport::coding::VarInt; -use moq_transport::object; +use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; +use moq_transport_quinn::SendObjects; use super::{broker, control}; use crate::model::{segment, track}; pub struct Session { - // Objects are sent to the client using this transport. - transport: Arc, + // Objects are sent to the client + objects: SendObjects, // Used to send and receive control messages. control: control::Component, @@ -22,17 +20,17 @@ pub struct Session { broker: broker::Broadcasts, // A list of tasks that are currently running. - run_subscribes: JoinSet, // run subscriptions, sending the returned error if they fail + run_subscribes: JoinSet, // run subscriptions, sending the returned error if they fail } impl Session { pub fn new( - transport: Arc, + objects: SendObjects, control: control::Component, broker: broker::Broadcasts, ) -> Self { Self { - transport, + objects, control, broker, run_subscribes: JoinSet::new(), @@ -72,22 +70,22 @@ impl Session { } } - fn receive_announce_ok(&mut self, _msg: control::AnnounceOk) -> anyhow::Result<()> { + fn receive_announce_ok(&mut self, _msg: AnnounceOk) -> anyhow::Result<()> { // TODO make sure we sent this announce Ok(()) } - fn receive_announce_error(&mut self, msg: control::AnnounceError) -> anyhow::Result<()> { + fn receive_announce_error(&mut self, msg: AnnounceError) -> anyhow::Result<()> { // TODO make sure we sent this announce // TODO remove this from the list of subscribable broadcasts. anyhow::bail!("received ANNOUNCE_ERROR({:?}): {}", msg.code, msg.reason) } - async fn receive_subscribe(&mut self, msg: control::Subscribe) -> anyhow::Result<()> { + async fn receive_subscribe(&mut self, msg: Subscribe) -> anyhow::Result<()> { match self.receive_subscribe_inner(&msg).await { Ok(()) => { self.control - .send(control::SubscribeOk { + .send(SubscribeOk { track_id: msg.track_id, expires: None, }) @@ -95,7 +93,7 @@ impl Session { } Err(e) => { self.control - .send(control::SubscribeError { + .send(SubscribeError { track_id: msg.track_id, code: VarInt::from_u32(1), reason: e.to_string(), @@ -105,27 +103,23 @@ impl Session { } } - async fn receive_subscribe_inner(&mut self, msg: &control::Subscribe) -> anyhow::Result<()> { + async fn receive_subscribe_inner(&mut self, msg: &Subscribe) -> anyhow::Result<()> { let track = self .broker .subscribe(&msg.track_namespace, &msg.track_name) .context("could not find broadcast")?; // TODO can we just clone self? - let transport = self.transport.clone(); + let objects = self.objects.clone(); let track_id = msg.track_id; self.run_subscribes - .spawn(async move { Self::run_subscribe(transport, track_id, track).await }); + .spawn(async move { Self::run_subscribe(objects, track_id, track).await }); Ok(()) } - async fn run_subscribe( - transport: Arc, - track_id: VarInt, - mut track: track::Subscriber, - ) -> control::SubscribeError { + async fn run_subscribe(objects: SendObjects, track_id: VarInt, mut track: track::Subscriber) -> SubscribeError { let mut tasks = JoinSet::new(); let mut result = None; @@ -135,11 +129,11 @@ impl Session { segment = track.next_segment(), if result.is_none() => { match segment { Ok(segment) => { - let transport = transport.clone(); - tasks.spawn(async move { Self::serve_group(transport, track_id, segment).await }); + let objects = objects.clone(); + tasks.spawn(async move { Self::serve_group(objects, track_id, segment).await }); }, Err(e) => { - result = Some(control::SubscribeError { + result = Some(SubscribeError { track_id, code: e.code, reason: e.reason, @@ -160,18 +154,18 @@ impl Session { } async fn serve_group( - transport: Arc, + mut objects: SendObjects, track_id: VarInt, mut segment: segment::Subscriber, ) -> anyhow::Result<()> { - let header = object::Header { - track_id, - group_sequence: segment.sequence, - object_sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object + let object = Object { + track: track_id, + group: segment.sequence, + sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object send_order: segment.send_order, }; - let mut stream = transport.send(header).await?; + let mut stream = objects.send(object).await?; // Write each fragment as they are available. while let Some(fragment) = segment.fragments.next().await { @@ -187,14 +181,14 @@ impl Session { match delta { broker::Update::Insert(name) => { self.control - .send(control::Announce { + .send(Announce { track_namespace: name.clone(), }) .await } broker::Update::Remove(name, error) => { self.control - .send(control::AnnounceError { + .send(AnnounceError { track_namespace: name, code: error.code, reason: error.reason, diff --git a/moq-warp/src/relay/server.rs b/moq-warp/src/relay/server.rs index e39c66b..dfbbcfd 100644 --- a/moq-warp/src/relay/server.rs +++ b/moq-warp/src/relay/server.rs @@ -1,7 +1,5 @@ use super::{broker, Session}; -use moq_transport::server::Endpoint; - use std::{fs, io, net, path, sync, time}; use anyhow::Context; @@ -10,7 +8,7 @@ use tokio::task::JoinSet; pub struct Server { // The MoQ transport server. - server: Endpoint, + server: moq_transport_quinn::Server, // The media sources. broker: broker::Broadcasts, @@ -76,7 +74,7 @@ impl Server { let server = quinn::Endpoint::server(server_config, config.addr)?; let broker = config.broker; - let server = Endpoint::new(server); + let server = moq_transport_quinn::Server::new(server); let tasks = JoinSet::new(); Ok(Self { server, broker, tasks }) diff --git a/moq-warp/src/relay/session.rs b/moq-warp/src/relay/session.rs index 0c1e9e4..90d1908 100644 --- a/moq-warp/src/relay/session.rs +++ b/moq-warp/src/relay/session.rs @@ -1,11 +1,10 @@ use anyhow::Context; -use std::sync::Arc; - -use moq_transport::{server, setup}; - use super::{broker, contribute, control, distribute}; +use moq_transport::{Role, SetupServer, Version}; +use moq_transport_quinn::Connect; + pub struct Session { // Split logic into contribution/distribution to reduce the problem space. contribute: contribute::Session, @@ -16,7 +15,7 @@ pub struct Session { } impl Session { - pub async fn accept(session: server::Accept, broker: broker::Broadcasts) -> anyhow::Result { + pub async fn accept(session: Connect, broker: broker::Broadcasts) -> anyhow::Result { // Accep the WebTransport session. // OPTIONAL validate the conn.uri() otherwise call conn.reject() let session = session @@ -28,26 +27,25 @@ impl Session { .setup() .versions .iter() - .find(|v| **v == setup::Version::DRAFT_00) + .find(|v| **v == Version::DRAFT_00) .context("failed to find supported version")?; - match session.setup().role { - setup::Role::Subscriber => {} - _ => anyhow::bail!("TODO publishing not yet supported"), - } + // TODO use the role to decide if we can publish or subscribe - let setup = setup::Server { - version: setup::Version::DRAFT_00, - role: setup::Role::Publisher, + let setup = SetupServer { + version: Version::DRAFT_00, + role: Role::Publisher, }; - let (transport, control) = session.accept(setup).await?; - let transport = Arc::new(transport); + let session = session.accept(setup).await?; + + let (control, objects) = session.split(); + let (objects_send, objects_recv) = objects.split(); let (control, contribute, distribute) = control::split(control); - let contribute = contribute::Session::new(transport.clone(), contribute, broker.clone()); - let distribute = distribute::Session::new(transport, distribute, broker); + let contribute = contribute::Session::new(objects_recv, contribute, broker.clone()); + let distribute = distribute::Session::new(objects_send, distribute, broker); let session = Self { control,