From 4e11e8bafce7f23a4c80a5244efcd84a184d9b1e Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Thu, 3 Aug 2023 13:49:42 -0700 Subject: [PATCH] Actually make moq-transport generic. pog --- Cargo.lock | 28 +-- Cargo.toml | 7 +- {moq-demo => moq-quinn}/Cargo.toml | 6 +- {moq-demo => moq-quinn}/src/main.rs | 0 {moq-demo => moq-quinn}/src/server.rs | 13 +- moq-transport-quinn/Cargo.toml | 26 -- moq-transport-quinn/src/control.rs | 94 ------- moq-transport-quinn/src/lib.rs | 9 - moq-transport-quinn/src/object.rs | 150 ------------ moq-transport-quinn/src/session.rs | 98 -------- moq-transport-quinn/src/stream.rs | 115 --------- moq-transport/Cargo.toml | 5 +- moq-transport/src/coding/encode.rs | 3 - moq-transport/src/lib.rs | 12 +- .../src/{control => message}/announce.rs | 0 .../{control => message}/announce_error.rs | 0 .../src/{control => message}/announce_ok.rs | 0 .../src/{control => message}/go_away.rs | 0 moq-transport/src/{control => message}/mod.rs | 17 +- moq-transport/src/message/receiver.rs | 49 ++++ moq-transport/src/message/sender.rs | 68 ++++++ .../src/{control => message}/subscribe.rs | 0 .../{control => message}/subscribe_error.rs | 0 .../src/{control => message}/subscribe_ok.rs | 0 .../src/{object.rs => object/header.rs} | 7 +- moq-transport/src/object/mod.rs | 7 + moq-transport/src/object/receiver.rs | 128 ++++++++++ moq-transport/src/object/sender.rs | 229 ++++++++++++++++++ moq-transport/src/object/stream.rs | 103 ++++++++ moq-transport/src/session/mod.rs | 109 +++++++++ .../setup_client.rs => setup/client.rs} | 6 +- moq-transport/src/setup/mod.rs | 9 + moq-transport/src/{control => setup}/role.rs | 0 .../setup_server.rs => setup/server.rs} | 6 +- .../src/{control => setup}/version.rs | 0 moq-warp/Cargo.toml | 2 +- moq-warp/src/model/mod.rs | 2 +- moq-warp/src/relay/broker.rs | 18 +- moq-warp/src/relay/contribute.rs | 74 +++--- moq-warp/src/relay/distribute.rs | 65 +++-- moq-warp/src/relay/{control.rs => message.rs} | 37 ++- moq-warp/src/relay/mod.rs | 6 +- moq-warp/src/relay/session.rs | 28 ++- 43 files changed, 889 insertions(+), 647 deletions(-) rename {moq-demo => moq-quinn}/Cargo.toml (81%) rename {moq-demo => moq-quinn}/src/main.rs (100%) rename {moq-demo => moq-quinn}/src/server.rs (90%) delete mode 100644 moq-transport-quinn/Cargo.toml delete mode 100644 moq-transport-quinn/src/control.rs delete mode 100644 moq-transport-quinn/src/lib.rs delete mode 100644 moq-transport-quinn/src/object.rs delete mode 100644 moq-transport-quinn/src/session.rs delete mode 100644 moq-transport-quinn/src/stream.rs rename moq-transport/src/{control => message}/announce.rs (100%) rename moq-transport/src/{control => message}/announce_error.rs (100%) rename moq-transport/src/{control => message}/announce_ok.rs (100%) rename moq-transport/src/{control => message}/go_away.rs (100%) rename moq-transport/src/{control => message}/mod.rs (92%) create mode 100644 moq-transport/src/message/receiver.rs create mode 100644 moq-transport/src/message/sender.rs rename moq-transport/src/{control => message}/subscribe.rs (100%) rename moq-transport/src/{control => message}/subscribe_error.rs (100%) rename moq-transport/src/{control => message}/subscribe_ok.rs (100%) rename moq-transport/src/{object.rs => object/header.rs} (93%) create mode 100644 moq-transport/src/object/mod.rs create mode 100644 moq-transport/src/object/receiver.rs create mode 100644 moq-transport/src/object/sender.rs create mode 100644 moq-transport/src/object/stream.rs create mode 100644 moq-transport/src/session/mod.rs rename moq-transport/src/{control/setup_client.rs => setup/client.rs} (91%) create mode 100644 moq-transport/src/setup/mod.rs rename moq-transport/src/{control => setup}/role.rs (100%) rename moq-transport/src/{control/setup_server.rs => setup/server.rs} (90%) rename moq-transport/src/{control => setup}/version.rs (100%) rename moq-warp/src/relay/{control.rs => message.rs} (77%) diff --git a/Cargo.lock b/Cargo.lock index 79f3b45..d7ee5a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -919,7 +919,7 @@ dependencies = [ ] [[package]] -name = "moq-demo" +name = "moq-quinn" version = "0.1.0" dependencies = [ "anyhow", @@ -928,7 +928,6 @@ dependencies = [ "hex", "log", "moq-transport", - "moq-transport-quinn", "moq-warp", "quinn", "ring", @@ -936,30 +935,19 @@ dependencies = [ "rustls-pemfile", "tokio", "warp", + "webtransport-generic", "webtransport-quinn", ] [[package]] name = "moq-transport" version = "0.1.0" -dependencies = [ - "bytes", - "thiserror", -] - -[[package]] -name = "moq-transport-quinn" -version = "0.1.0" dependencies = [ "anyhow", "bytes", - "http", - "log", - "moq-transport", - "quinn", "thiserror", "tokio", - "webtransport-quinn", + "webtransport-generic", ] [[package]] @@ -970,12 +958,12 @@ dependencies = [ "bytes", "log", "moq-transport", - "moq-transport-quinn", "quinn", "ring", "rustls 0.21.2", "rustls-pemfile", "tokio", + "webtransport-generic", ] [[package]] @@ -1913,19 +1901,13 @@ dependencies = [ [[package]] name = "webtransport-generic" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ba4583e96bb0ef08142f868bf0d28f90211eced56a473768ee27446864a2310" dependencies = [ "bytes", - "log", - "thiserror", ] [[package]] name = "webtransport-proto" version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21fefb5728651d507b444659853b47896116179ea8fd0348d02de080250892c7" dependencies = [ "bytes", "http", @@ -1935,8 +1917,6 @@ dependencies = [ [[package]] name = "webtransport-quinn" version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96a4ac9975117d8c63c4d04577d594b3130fe2023b7363ebc613905acf98590a" dependencies = [ "async-std", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 20c5996..5407a66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,2 @@ [workspace] -members = [ - "moq-transport", - "moq-transport-quinn", - "moq-demo", - "moq-warp", -] +members = ["moq-transport", "moq-quinn", "moq-warp"] diff --git a/moq-demo/Cargo.toml b/moq-quinn/Cargo.toml similarity index 81% rename from moq-demo/Cargo.toml rename to moq-quinn/Cargo.toml index fbffbb4..bf050b6 100644 --- a/moq-demo/Cargo.toml +++ b/moq-quinn/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "moq-demo" +name = "moq-quinn" description = "Media over QUIC" authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" @@ -16,12 +16,12 @@ categories = ["multimedia", "network-programming", "web-programming"] [dependencies] moq-transport = { path = "../moq-transport" } -moq-transport-quinn = { path = "../moq-transport-quinn" } moq-warp = { path = "../moq-warp" } +webtransport-generic = { path = "../../webtransport-rs/webtransport-generic", version = "0.3" } # QUIC quinn = "0.10" -webtransport-quinn = "0.4.2" +webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn", version = "0.4.2" } # Crypto ring = "0.16.20" diff --git a/moq-demo/src/main.rs b/moq-quinn/src/main.rs similarity index 100% rename from moq-demo/src/main.rs rename to moq-quinn/src/main.rs diff --git a/moq-demo/src/server.rs b/moq-quinn/src/server.rs similarity index 90% rename from moq-demo/src/server.rs rename to moq-quinn/src/server.rs index 178f3ea..59c227d 100644 --- a/moq-demo/src/server.rs +++ b/moq-quinn/src/server.rs @@ -1,16 +1,15 @@ -use moq_warp::relay::broker; - use std::{fs, io, net, path, sync, time}; use anyhow::Context; +use moq_warp::relay; use tokio::task::JoinSet; pub struct Server { server: quinn::Endpoint, // The media sources. - broker: broker::Broadcasts, + broker: relay::Broker, // The active connections. conns: JoinSet>, @@ -62,7 +61,7 @@ impl Server { server_config.transport = sync::Arc::new(transport_config); let server = quinn::Endpoint::server(server_config, config.addr)?; - let broker = broker::Broadcasts::new(); + let broker = relay::Broker::new(); let conns = JoinSet::new(); @@ -88,7 +87,7 @@ impl Server { } } - async fn handle(conn: quinn::Connecting, broker: broker::Broadcasts) -> anyhow::Result<()> { + async fn handle(conn: quinn::Connecting, broker: relay::Broker) -> anyhow::Result<()> { // Wait for the QUIC connection to be established. let conn = conn.await.context("failed to establish QUIC connection")?; @@ -106,12 +105,12 @@ impl Server { .context("failed to respond to WebTransport request")?; // Perform the MoQ handshake. - let session = moq_transport_quinn::accept(session, moq_transport::Role::Both) + let session = moq_transport::Session::accept(session, moq_transport::setup::Role::Both) .await .context("failed to perform MoQ handshake")?; // Run the relay code. - let session = moq_warp::relay::Session::new(session, broker); + let session = relay::Session::new(session, broker); session.run().await } } diff --git a/moq-transport-quinn/Cargo.toml b/moq-transport-quinn/Cargo.toml deleted file mode 100644 index dc40d99..0000000 --- a/moq-transport-quinn/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "moq-transport-quinn" -description = "Media over QUIC" -authors = ["Luke Curley"] -repository = "https://github.com/kixelated/moq-rs" -license = "MIT OR Apache-2.0" - -version = "0.1.0" -edition = "2021" - -keywords = ["quic", "http3", "webtransport", "media", "live"] -categories = ["multimedia", "network-programming", "web-programming"] - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -moq-transport = { path = "../moq-transport" } - -quinn = "0.10" -http = "0.2" -webtransport-quinn = "0.4.2" -tokio = { version = "1.27", features = ["macros", "io-util"] } -bytes = "1" -log = "0.4" -anyhow = "1.0.70" -thiserror = "1.0.21" diff --git a/moq-transport-quinn/src/control.rs b/moq-transport-quinn/src/control.rs deleted file mode 100644 index b70578f..0000000 --- a/moq-transport-quinn/src/control.rs +++ /dev/null @@ -1,94 +0,0 @@ -use moq_transport::{Decode, DecodeError, Encode, Message}; - -use bytes::{Buf, BytesMut}; - -use std::io::Cursor; -use std::sync::Arc; -use tokio::{io::AsyncReadExt, sync::Mutex}; - -use webtransport_quinn::{RecvStream, SendStream}; - -pub struct SendControl { - stream: SendStream, - buf: BytesMut, // reuse a buffer to encode messages. -} - -impl SendControl { - pub fn new(stream: SendStream) -> Self { - Self { - buf: BytesMut::new(), - stream, - } - } - - pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { - let msg = msg.into(); - log::info!("sending message: {:?}", msg); - - self.buf.clear(); - msg.encode(&mut self.buf)?; - - // TODO make this work with select! - self.stream.write_all(&self.buf).await?; - - Ok(()) - } - - // Helper that lets multiple threads send control messages. - pub fn share(self) -> ControlShared { - ControlShared { - stream: Arc::new(Mutex::new(self)), - } - } -} - -// Helper that allows multiple threads to send control messages. -// There's no equivalent for receiving since only one thread should be receiving at a time. -#[derive(Clone)] -pub struct ControlShared { - stream: Arc>, -} - -impl ControlShared { - pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { - let mut stream = self.stream.lock().await; - stream.send(msg).await - } -} - -pub struct RecvControl { - stream: RecvStream, - buf: BytesMut, // data we've read but haven't fully decoded yet -} - -impl RecvControl { - pub fn new(stream: RecvStream) -> Self { - Self { - buf: BytesMut::new(), - stream, - } - } - - // Read the next full message from the stream. - pub async fn recv(&mut self) -> anyhow::Result { - loop { - // Read the contents of the buffer - let mut peek = Cursor::new(&self.buf); - - match Message::decode(&mut peek) { - Ok(msg) => { - // We've successfully decoded a message, so we can advance the buffer. - self.buf.advance(peek.position() as usize); - - log::info!("received message: {:?}", msg); - return Ok(msg); - } - Err(DecodeError::UnexpectedEnd) => { - // The decode failed, so we need to append more data. - self.stream.read_buf(&mut self.buf).await?; - } - Err(e) => return Err(e.into()), - } - } - } -} diff --git a/moq-transport-quinn/src/lib.rs b/moq-transport-quinn/src/lib.rs deleted file mode 100644 index d210460..0000000 --- a/moq-transport-quinn/src/lib.rs +++ /dev/null @@ -1,9 +0,0 @@ -mod control; -mod object; -mod session; -mod stream; - -pub use control::*; -pub use object::*; -pub use session::*; -pub use stream::*; diff --git a/moq-transport-quinn/src/object.rs b/moq-transport-quinn/src/object.rs deleted file mode 100644 index d348fe5..0000000 --- a/moq-transport-quinn/src/object.rs +++ /dev/null @@ -1,150 +0,0 @@ -use std::{collections::BinaryHeap, io::Cursor, sync::Arc}; - -use anyhow::Context; -use bytes::{Buf, BytesMut}; -use moq_transport::{Decode, DecodeError, Encode, Object}; - -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::sync::Mutex; -use tokio::task::JoinSet; -use webtransport_quinn::Session; - -use crate::{RecvStream, SendStream, SendStreamOrder}; - -// Allow this to be cloned so we can have multiple senders. -#[derive(Clone)] -pub struct SendObjects { - // This is a tokio mutex since we need to lock across await boundaries. - inner: Arc>, -} - -impl SendObjects { - pub fn new(session: Session) -> Self { - let inner = SendObjectsInner::new(session); - Self { - inner: Arc::new(Mutex::new(inner)), - } - } - - pub async fn open(&mut self, header: Object) -> anyhow::Result { - let mut inner = self.inner.lock().await; - inner.open(header).await - } -} - -struct SendObjectsInner { - session: Session, - - // Quinn supports a i32 for priority, but the wire format is a u64. - // Our work around is to keep a list of streams in priority order and use the index as the priority. - // This involves more work, so TODO either increase the Quinn size or reduce the wire size. - ordered: BinaryHeap, - ordered_swap: BinaryHeap, // reuse memory to avoid allocations - - // A reusable buffer for encoding headers. - // TODO figure out how to use BufMut on the stack and remove this. - buf: BytesMut, -} - -impl SendObjectsInner { - fn new(session: Session) -> Self { - Self { - session, - ordered: BinaryHeap::new(), - ordered_swap: BinaryHeap::new(), - buf: BytesMut::new(), - } - } - - pub async fn open(&mut self, header: Object) -> anyhow::Result { - let stream = self.session.open_uni().await.context("failed to open uni stream")?; - let (mut stream, priority) = SendStream::with_order(stream, header.send_order.into_inner()); - - // Add the priority to our existing list. - self.ordered.push(priority); - - // Loop through the list and update the priorities of any still active streams. - let mut index = 0; - while let Some(stream) = self.ordered.pop() { - if stream.update(index).is_ok() { - // Add the stream to the new list so it'll be in sorted order. - self.ordered_swap.push(stream); - index += 1; - } - } - - // Swap the lists so we can reuse the memory. - std::mem::swap(&mut self.ordered, &mut self.ordered_swap); - - // Encode and write the stream header. - // TODO do this in SendStream so we don't hold the lock. - // Otherwise, - self.buf.clear(); - header.encode(&mut self.buf).unwrap(); - stream.write_all(&self.buf).await.context("failed to write header")?; - - // log::info!("created stream: {:?}", header); - - Ok(stream) - } -} - -// Not clone, so we don't accidentally have two listners. -pub struct RecvObjects { - session: Session, - - // Streams that we've accepted but haven't read the header from yet. - streams: JoinSet>, -} - -impl RecvObjects { - pub fn new(session: Session) -> Self { - Self { - session, - streams: JoinSet::new(), - } - } - - pub async fn recv(&mut self) -> anyhow::Result<(Object, RecvStream)> { - loop { - tokio::select! { - res = self.session.accept_uni() => { - let stream = res.context("failed to accept stream")?; - self.streams.spawn(async move { Self::read(stream).await }); - }, - res = self.streams.join_next(), if !self.streams.is_empty() => { - return res.unwrap().context("failed to run join set")?; - } - } - } - } - - async fn read(mut stream: webtransport_quinn::RecvStream) -> anyhow::Result<(Object, RecvStream)> { - let mut buf = BytesMut::new(); - - loop { - // Read more data into the buffer. - stream.read_buf(&mut buf).await?; - - // Use a cursor to read the buffer and remember how much we read. - let mut read = Cursor::new(&mut buf); - - let header = match Object::decode(&mut read) { - Ok(header) => header, - Err(DecodeError::UnexpectedEnd) => continue, - Err(err) => return Err(err.into()), - }; - - // We parsed a full header, advance the buffer. - let size = read.position() as usize; - buf.advance(size); - let buf = buf.freeze(); - - // log::info!("received stream: {:?}", header); - - let stream = RecvStream::new(buf, stream); - - return Ok((header, stream)); - } - } -} diff --git a/moq-transport-quinn/src/session.rs b/moq-transport-quinn/src/session.rs deleted file mode 100644 index 4ae0130..0000000 --- a/moq-transport-quinn/src/session.rs +++ /dev/null @@ -1,98 +0,0 @@ -use anyhow::Context; - -use moq_transport::{Message, SetupClient, SetupServer}; - -use super::{RecvControl, RecvObjects, SendControl, SendObjects}; - -/// Called by a server with an established WebTransport session. -// TODO close the session with an error code -pub async fn accept(session: webtransport_quinn::Session, role: moq_transport::Role) -> anyhow::Result { - let (send, recv) = session.accept_bi().await.context("failed to accept bidi stream")?; - - let mut send_control = SendControl::new(send); - let mut recv_control = RecvControl::new(recv); - - let setup_client = match recv_control.recv().await.context("failed to read SETUP")? { - Message::SetupClient(setup) => setup, - _ => anyhow::bail!("expected CLIENT SETUP"), - }; - - setup_client - .versions - .iter() - .find(|version| **version == moq_transport::Version::DRAFT_00) - .context("no supported versions")?; - - if !setup_client.role.compatible(role) { - anyhow::bail!("incompatible roles: {:?} {:?}", setup_client.role, role); - } - - let setup_server = SetupServer { - role, - version: moq_transport::Version::DRAFT_00, - }; - - send_control - .send(moq_transport::Message::SetupServer(setup_server)) - .await - .context("failed to send setup server")?; - - let send_objects = SendObjects::new(session.clone()); - let recv_objects = RecvObjects::new(session.clone()); - - Ok(Session { - send_control, - recv_control, - send_objects, - recv_objects, - }) -} - -/// Called by a client with an established WebTransport session. -pub async fn connect(session: webtransport_quinn::Session, role: moq_transport::Role) -> anyhow::Result { - let (send, recv) = session.open_bi().await.context("failed to oen bidi stream")?; - - let mut send_control = SendControl::new(send); - let mut recv_control = RecvControl::new(recv); - - let setup_client = SetupClient { - role, - versions: vec![moq_transport::Version::DRAFT_00].into(), - path: "".to_string(), - }; - - send_control - .send(moq_transport::Message::SetupClient(setup_client)) - .await - .context("failed to send SETUP CLIENT")?; - - let setup_server = match recv_control.recv().await.context("failed to read SETUP")? { - Message::SetupServer(setup) => setup, - _ => anyhow::bail!("expected SERVER SETUP"), - }; - - if setup_server.version != moq_transport::Version::DRAFT_00 { - anyhow::bail!("unsupported version: {:?}", setup_server.version); - } - - if !setup_server.role.compatible(role) { - anyhow::bail!("incompatible roles: {:?} {:?}", role, setup_server.role); - } - - let send_objects = SendObjects::new(session.clone()); - let recv_objects = RecvObjects::new(session.clone()); - - Ok(Session { - send_control, - recv_control, - send_objects, - recv_objects, - }) -} - -pub struct Session { - pub send_control: SendControl, - pub recv_control: RecvControl, - pub send_objects: SendObjects, - pub recv_objects: RecvObjects, -} diff --git a/moq-transport-quinn/src/stream.rs b/moq-transport-quinn/src/stream.rs deleted file mode 100644 index 2063b6d..0000000 --- a/moq-transport-quinn/src/stream.rs +++ /dev/null @@ -1,115 +0,0 @@ -use std::{ - io, - pin::{pin, Pin}, - sync::{Arc, Mutex, Weak}, - task::{self, Poll}, -}; - -use bytes::{BufMut, Bytes}; -use tokio::io::{AsyncRead, AsyncWrite}; - -// Ugh, so we need to wrap SendStream with a mutex because we need to be able to call set_priority on it. -// The problem is that set_priority takes a i32, while send_order is a VarInt -// So the solution is to maintain a priority queue of active streams and constantly update the priority with their index. -// So the library might update the priority of the stream at any point, while the application might similtaniously write to it. -// The only upside is that we don't expose set_priority, so the application can't screw with things. -pub struct SendStream { - stream: Arc>, -} - -impl SendStream { - // Create a new stream with the given order, returning a handle that allows us to update the priority. - pub(crate) fn with_order(stream: webtransport_quinn::SendStream, order: u64) -> (Self, SendStreamOrder) { - let stream = Arc::new(Mutex::new(stream)); - let weak = Arc::>::downgrade(&stream); - - (SendStream { stream }, SendStreamOrder { stream: weak, order }) - } -} - -pub(crate) struct SendStreamOrder { - // We use Weak here so we don't prevent the stream from being closed when dereferenced. - // update() will return an error if the stream was closed instead. - stream: Weak>, - order: u64, -} - -impl SendStreamOrder { - pub(crate) fn update(&self, index: i32) -> Result<(), webtransport_quinn::StreamClosed> { - let stream = self.stream.upgrade().ok_or(webtransport_quinn::StreamClosed)?; - let mut stream = stream.lock().unwrap(); - stream.set_priority(index) - } -} - -impl PartialEq for SendStreamOrder { - fn eq(&self, other: &Self) -> bool { - self.order == other.order - } -} - -impl Eq for SendStreamOrder {} - -impl PartialOrd for SendStreamOrder { - fn partial_cmp(&self, other: &Self) -> Option { - // We reverse the order so the lower send order is higher priority. - other.order.partial_cmp(&self.order) - } -} - -impl Ord for SendStreamOrder { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // We reverse the order so the lower send order is higher priority. - other.order.cmp(&self.order) - } -} - -// We implement AsyncWrite so we can grab the mutex on each write attempt, instead of holding it for the entire async function. -impl AsyncWrite for SendStream { - fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> task::Poll> { - let mut stream = self.stream.lock().unwrap(); - Pin::new(&mut *stream).poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll> { - let mut stream = self.stream.lock().unwrap(); - Pin::new(&mut *stream).poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll> { - let mut stream = self.stream.lock().unwrap(); - Pin::new(&mut *stream).poll_shutdown(cx) - } -} - -// Unfortunately, we need to wrap RecvStream with a buffer since moq-transport::Coding only supports buffered reads. -// We first serve any data in the buffer, then we poll the stream. -pub struct RecvStream { - buf: Bytes, - stream: webtransport_quinn::RecvStream, -} - -impl RecvStream { - pub(crate) fn new(buf: Bytes, stream: webtransport_quinn::RecvStream) -> Self { - Self { buf, stream } - } - - pub fn stop(&mut self, code: u32) { - self.stream.stop(code).ok(); - } -} - -impl AsyncRead for RecvStream { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - if !self.buf.is_empty() { - buf.put(&mut pin!(self).buf); - Poll::Ready(Ok(())) - } else { - Pin::new(&mut self.stream).poll_read(cx, buf) - } - } -} diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 1706a6a..6aee5fa 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -16,4 +16,7 @@ categories = ["multimedia", "network-programming", "web-programming"] [dependencies] bytes = "1" -thiserror = "1.0.21" +thiserror = "1" +anyhow = "1" +webtransport-generic = { path = "../../webtransport-rs/webtransport-generic", version = "0.3" } +tokio = { version = "1.27", features = ["macros", "io-util"] } diff --git a/moq-transport/src/coding/encode.rs b/moq-transport/src/coding/encode.rs index 99c2712..f8db06a 100644 --- a/moq-transport/src/coding/encode.rs +++ b/moq-transport/src/coding/encode.rs @@ -10,9 +10,6 @@ pub enum EncodeError { #[error("varint too large")] BoundsExceeded(#[from] BoundsExceeded), - - #[error("unknown error")] - Unknown, } pub trait Encode: Sized { diff --git a/moq-transport/src/lib.rs b/moq-transport/src/lib.rs index 8d045ac..655a9d2 100644 --- a/moq-transport/src/lib.rs +++ b/moq-transport/src/lib.rs @@ -1,7 +1,9 @@ mod coding; -mod control; -mod object; +pub mod message; +pub mod object; +pub mod session; +pub mod setup; -pub use coding::*; -pub use control::*; -pub use object::*; +pub use coding::VarInt; +pub use message::Message; +pub use session::Session; diff --git a/moq-transport/src/control/announce.rs b/moq-transport/src/message/announce.rs similarity index 100% rename from moq-transport/src/control/announce.rs rename to moq-transport/src/message/announce.rs diff --git a/moq-transport/src/control/announce_error.rs b/moq-transport/src/message/announce_error.rs similarity index 100% rename from moq-transport/src/control/announce_error.rs rename to moq-transport/src/message/announce_error.rs diff --git a/moq-transport/src/control/announce_ok.rs b/moq-transport/src/message/announce_ok.rs similarity index 100% rename from moq-transport/src/control/announce_ok.rs rename to moq-transport/src/message/announce_ok.rs diff --git a/moq-transport/src/control/go_away.rs b/moq-transport/src/message/go_away.rs similarity index 100% rename from moq-transport/src/control/go_away.rs rename to moq-transport/src/message/go_away.rs diff --git a/moq-transport/src/control/mod.rs b/moq-transport/src/message/mod.rs similarity index 92% rename from moq-transport/src/control/mod.rs rename to moq-transport/src/message/mod.rs index 1070a1c..54e1f71 100644 --- a/moq-transport/src/control/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -2,27 +2,24 @@ mod announce; mod announce_error; mod announce_ok; mod go_away; -mod role; -mod setup_client; -mod setup_server; +mod receiver; +mod sender; mod subscribe; mod subscribe_error; mod subscribe_ok; -mod version; pub use announce::*; pub use announce_error::*; pub use announce_ok::*; pub use go_away::*; -pub use role::*; -pub use setup_client::*; -pub use setup_server::*; +pub use receiver::*; +pub use sender::*; pub use subscribe::*; pub use subscribe_error::*; pub use subscribe_ok::*; -pub use version::*; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; +use crate::setup; use bytes::{Buf, BufMut}; use std::fmt; @@ -87,6 +84,10 @@ macro_rules! message_types { } } +// Just so we can use the macro above. +type SetupClient = setup::Client; +type SetupServer = setup::Server; + // Each message is prefixed with the given VarInt type. message_types! { // NOTE: Object and Setup are in other modules. diff --git a/moq-transport/src/message/receiver.rs b/moq-transport/src/message/receiver.rs new file mode 100644 index 0000000..b359df7 --- /dev/null +++ b/moq-transport/src/message/receiver.rs @@ -0,0 +1,49 @@ +use crate::coding::{Decode, DecodeError}; +use crate::message::Message; + +use bytes::{Buf, BytesMut}; + +use std::io::Cursor; + +use webtransport_generic::AsyncRecvStream; + +pub struct Receiver +where + R: AsyncRecvStream, // TODO take RecvStream instead +{ + stream: R, + buf: BytesMut, // data we've read but haven't fully decoded yet +} + +impl Receiver +where + R: AsyncRecvStream, +{ + pub fn new(stream: R) -> Self { + Self { + buf: BytesMut::new(), + stream, + } + } + + // Read the next full message from the stream. + pub async fn recv(&mut self) -> anyhow::Result { + loop { + // Read the contents of the buffer + let mut peek = Cursor::new(&self.buf); + + match Message::decode(&mut peek) { + Ok(msg) => { + // We've successfully decoded a message, so we can advance the buffer. + self.buf.advance(peek.position() as usize); + return Ok(msg); + } + Err(DecodeError::UnexpectedEnd) => { + // The decode failed, so we need to append more data. + self.stream.recv(&mut self.buf).await?; + } + Err(e) => return Err(e.into()), + } + } + } +} diff --git a/moq-transport/src/message/sender.rs b/moq-transport/src/message/sender.rs new file mode 100644 index 0000000..cdee6bf --- /dev/null +++ b/moq-transport/src/message/sender.rs @@ -0,0 +1,68 @@ +use crate::coding::Encode; +use crate::message::Message; + +use bytes::BytesMut; + +use webtransport_generic::AsyncSendStream; + +pub struct Sender +where + S: AsyncSendStream, // TODO take SendStream instead +{ + stream: S, + buf: BytesMut, // reuse a buffer to encode messages. +} + +impl Sender +where + S: AsyncSendStream, +{ + pub fn new(stream: S) -> Self { + Self { + buf: BytesMut::new(), + stream, + } + } + + pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { + let msg = msg.into(); + + self.buf.clear(); + msg.encode(&mut self.buf)?; + + self.stream.send(&mut self.buf).await?; + + Ok(()) + } + + /* + // Helper that lets multiple threads send control messages. + pub fn share(self) -> ControlShared { + ControlShared { + stream: Arc::new(Mutex::new(self)), + } + } + */ +} + +/* +// Helper that allows multiple threads to send control messages. +// There's no equivalent for receiving since only one thread should be receiving at a time. +#[derive(Clone)] +pub struct SendControlShared +where + S: AsyncSendStream, +{ + stream: Arc>>, +} + +impl SendControlShared +where + S: AsyncSendStream, +{ + pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { + let mut stream = self.stream.lock().await; + stream.send(msg).await + } +} +*/ diff --git a/moq-transport/src/control/subscribe.rs b/moq-transport/src/message/subscribe.rs similarity index 100% rename from moq-transport/src/control/subscribe.rs rename to moq-transport/src/message/subscribe.rs diff --git a/moq-transport/src/control/subscribe_error.rs b/moq-transport/src/message/subscribe_error.rs similarity index 100% rename from moq-transport/src/control/subscribe_error.rs rename to moq-transport/src/message/subscribe_error.rs diff --git a/moq-transport/src/control/subscribe_ok.rs b/moq-transport/src/message/subscribe_ok.rs similarity index 100% rename from moq-transport/src/control/subscribe_ok.rs rename to moq-transport/src/message/subscribe_ok.rs diff --git a/moq-transport/src/object.rs b/moq-transport/src/object/header.rs similarity index 93% rename from moq-transport/src/object.rs rename to moq-transport/src/object/header.rs index d937a82..7e8a7ce 100644 --- a/moq-transport/src/object.rs +++ b/moq-transport/src/object/header.rs @@ -1,9 +1,8 @@ use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - use bytes::{Buf, BufMut}; #[derive(Debug)] -pub struct Object { +pub struct Header { // An ID for this track. // Proposal: https://github.com/moq-wg/moq-transport/issues/209 pub track: VarInt, @@ -18,7 +17,7 @@ pub struct Object { pub send_order: VarInt, } -impl Decode for Object { +impl Decode for Header { fn decode(r: &mut R) -> Result { let typ = VarInt::decode(r)?; if typ.into_inner() != 0 { @@ -41,7 +40,7 @@ impl Decode for Object { } } -impl Encode for Object { +impl Encode for Header { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { VarInt::from_u32(0).encode(w)?; self.track.encode(w)?; diff --git a/moq-transport/src/object/mod.rs b/moq-transport/src/object/mod.rs new file mode 100644 index 0000000..56dace8 --- /dev/null +++ b/moq-transport/src/object/mod.rs @@ -0,0 +1,7 @@ +mod header; +mod receiver; +mod sender; + +pub use header::*; +pub use receiver::*; +pub use sender::*; diff --git a/moq-transport/src/object/receiver.rs b/moq-transport/src/object/receiver.rs new file mode 100644 index 0000000..e9667c6 --- /dev/null +++ b/moq-transport/src/object/receiver.rs @@ -0,0 +1,128 @@ +use std::io::Cursor; +use std::task::{self, Poll}; + +use crate::coding::{Decode, DecodeError}; +use crate::object::Header; + +use anyhow::Context; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use tokio::task::JoinSet; + +use webtransport_generic::RecvStream as GenericRecvStream; +use webtransport_generic::{AsyncRecvStream, AsyncSession}; + +pub struct Receiver +where + S: AsyncSession, +{ + session: S, + + // Streams that we've accepted but haven't read the header from yet. + streams: JoinSet)>>, +} + +impl Receiver +where + S: AsyncSession, + S::RecvStream: AsyncRecvStream, +{ + pub fn new(session: S) -> Self { + Self { + session, + streams: JoinSet::new(), + } + } + + pub async fn recv(&mut self) -> anyhow::Result<(Header, RecvStream)> { + loop { + tokio::select! { + res = self.session.accept_uni() => { + let stream = res.context("failed to accept stream")?; + self.streams.spawn(async move { Self::read(stream).await }); + }, + res = self.streams.join_next(), if !self.streams.is_empty() => { + return res.unwrap().context("failed to run join set")?; + } + } + } + } + + async fn read(mut stream: S::RecvStream) -> anyhow::Result<(Header, RecvStream)> { + let mut buf = BytesMut::new(); + + loop { + // Read more data into the buffer. + stream.recv(&mut buf).await?; + + // Use a cursor to read the buffer and remember how much we read. + let mut read = Cursor::new(&mut buf); + + let header = match Header::decode(&mut read) { + Ok(header) => header, + Err(DecodeError::UnexpectedEnd) => continue, + Err(err) => return Err(err.into()), + }; + + // We parsed a full header, advance the buffer. + let size = read.position() as usize; + buf.advance(size); + let buf = buf.freeze(); + + // log::info!("received stream: {:?}", header); + + let stream = RecvStream::new(buf, stream); + + return Ok((header, stream)); + } + } +} + +// Unfortunately, we need to wrap RecvStream with a buffer since moq-transport::Coding only supports buffered reads. +// We first serve any data in the buffer, then we poll the stream. +// TODO fix this so we don't need the wrapper. +pub struct RecvStream +where + R: GenericRecvStream, +{ + buf: Bytes, + stream: R, +} + +impl RecvStream +where + R: GenericRecvStream, +{ + pub(crate) fn new(buf: Bytes, stream: R) -> Self { + Self { buf, stream } + } + + pub fn stop(&mut self, code: u32) { + self.stream.stop(code) + } +} + +impl GenericRecvStream for RecvStream +where + R: GenericRecvStream, +{ + type Error = R::Error; + + fn poll_recv( + &mut self, + cx: &mut task::Context<'_>, + buf: &mut B, + ) -> Poll, Self::Error>> { + if !self.buf.is_empty() { + let size = self.buf.len(); + buf.put(&mut self.buf); + let size = size - self.buf.len(); + Poll::Ready(Ok(Some(size))) + } else { + self.stream.poll_recv(cx, buf) + } + } + + fn stop(&mut self, error_code: u32) { + self.stream.stop(error_code) + } +} diff --git a/moq-transport/src/object/sender.rs b/moq-transport/src/object/sender.rs new file mode 100644 index 0000000..2937305 --- /dev/null +++ b/moq-transport/src/object/sender.rs @@ -0,0 +1,229 @@ +use std::sync::{Mutex, Weak}; +use std::task::{self, Poll}; +use std::{collections::BinaryHeap, sync::Arc}; + +use anyhow::Context; +use bytes::{Buf, BytesMut}; + +use crate::coding::Encode; +use crate::object::Header; + +use webtransport_generic::SendStream as GenericSendStream; +use webtransport_generic::{AsyncSendStream, AsyncSession}; + +// Allow this to be cloned so we can have multiple senders. +pub struct Sender +where + S: AsyncSession, + S::SendStream: AsyncSendStream, +{ + // The session. + session: S, + + // A reusable buffer for the stream header. + buf: BytesMut, + + // Register new streams with an inner object that will prioritize them. + inner: Arc>>, +} + +impl Sender +where + S: AsyncSession, + S::SendStream: AsyncSendStream, +{ + pub fn new(session: S) -> Self { + let inner = SenderInner::new(); + Self { + session, + buf: BytesMut::new(), + inner: Arc::new(Mutex::new(inner)), + } + } + + pub async fn open(&mut self, header: Header) -> anyhow::Result> { + let stream = self.session.open_uni().await.context("failed to open uni stream")?; + + let mut stream = { + let mut inner = self.inner.lock().unwrap(); + inner.register(stream, header.send_order.into_inner())? + }; + + self.buf.clear(); + header.encode(&mut self.buf).unwrap(); + stream.send_all(&mut self.buf).await.context("failed to write header")?; + + // log::info!("created stream: {:?}", header); + + header.encode(&mut self.buf).unwrap(); + stream.send_all(&mut self.buf).await.context("failed to write header")?; + + Ok(stream) + } +} + +impl Clone for Sender +where + S: AsyncSession, + S::SendStream: AsyncSendStream, +{ + fn clone(&self) -> Self { + Sender { + session: self.session.clone(), + buf: BytesMut::new(), + inner: self.inner.clone(), + } + } +} + +struct SenderInner +where + S: GenericSendStream, +{ + // Quinn supports a i32 for priority, but the wire format is a u64. + // Our work around is to keep a list of streams in priority order and use the index as the priority. + // This involves more work, so TODO either increase the Quinn size or reduce the wire size. + ordered: BinaryHeap>, + ordered_swap: BinaryHeap>, // reuse memory to avoid allocations +} + +impl SenderInner +where + S: GenericSendStream, +{ + fn new() -> Self { + Self { + ordered: BinaryHeap::new(), + ordered_swap: BinaryHeap::new(), + } + } + + pub fn register(&mut self, stream: S, order: u64) -> anyhow::Result> { + let stream = SendStream::new(stream); + let order = SendOrder::new(&stream, order); + + // Add the priority to our existing list. + self.ordered.push(order); + + // Loop through the list and update the priorities of any still active streams. + let mut index = 0; + while let Some(stream) = self.ordered.pop() { + if stream.set_priority(index).is_some() { + // Add the stream to the new list so it'll be in sorted order. + self.ordered_swap.push(stream); + index += 1; + } + } + + // Swap the lists so we can reuse the memory. + std::mem::swap(&mut self.ordered, &mut self.ordered_swap); + + Ok(stream) + } +} + +struct SendOrder +where + S: GenericSendStream, +{ + // We use Weak here so we don't prevent the stream from being closed when dereferenced. + // set_priority() will return None if the stream was closed. + stream: Weak>, + order: u64, +} + +impl SendOrder +where + S: GenericSendStream, +{ + fn new(stream: &SendStream, order: u64) -> Self { + let stream = stream.weak(); + Self { stream, order } + } + + fn set_priority(&self, index: i32) -> Option<()> { + let stream = self.stream.upgrade()?; + let mut stream = stream.lock().unwrap(); + stream.set_priority(index); + Some(()) + } +} + +impl PartialEq for SendOrder +where + S: GenericSendStream, +{ + fn eq(&self, other: &Self) -> bool { + self.order == other.order + } +} + +impl Eq for SendOrder where S: GenericSendStream {} + +impl PartialOrd for SendOrder +where + S: GenericSendStream, +{ + fn partial_cmp(&self, other: &Self) -> Option { + // We reverse the order so the lower send order is higher priority. + other.order.partial_cmp(&self.order) + } +} + +impl Ord for SendOrder +where + S: GenericSendStream, +{ + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // We reverse the order so the lower send order is higher priority. + other.order.cmp(&self.order) + } +} + +// Ugh, so we need to wrap SendStream with a mutex because we need to be able to call set_priority on it. +// The problem is that set_priority takes a i32, while send_order is a VarInt +// So the solution is to maintain a priority queue of active streams and constantly update the priority with their index. +// So the library might update the priority of the stream at any point, while the application might similtaniously write to it. +pub struct SendStream +where + S: GenericSendStream, +{ + // All SendStream methods are &mut, so we need to wrap them with an internal mutex. + inner: Arc>, +} + +impl SendStream +where + S: GenericSendStream, +{ + pub(crate) fn new(stream: S) -> Self { + Self { + inner: Arc::new(Mutex::new(stream)), + } + } + + pub fn weak(&self) -> Weak> { + Arc::>::downgrade(&self.inner) + } +} + +impl GenericSendStream for SendStream +where + S: GenericSendStream, +{ + type Error = S::Error; + + fn poll_send(&mut self, cx: &mut task::Context<'_>, buf: &mut B) -> Poll> { + self.inner.lock().unwrap().poll_send(cx, buf) + } + + fn reset(&mut self, reset_code: u32) { + self.inner.lock().unwrap().reset(reset_code) + } + + // The application should NOT use this method. + // The library will automatically set the stream priority on creation based on the header. + fn set_priority(&mut self, order: i32) { + self.inner.lock().unwrap().set_priority(order) + } +} diff --git a/moq-transport/src/object/stream.rs b/moq-transport/src/object/stream.rs new file mode 100644 index 0000000..911ebba --- /dev/null +++ b/moq-transport/src/object/stream.rs @@ -0,0 +1,103 @@ +use std::{ + sync::{Arc, Mutex, Weak}, + task::{Context, Poll}, +}; + +use bytes::{Buf, BufMut, Bytes}; + +use webtransport_generic::RecvStream as GenericRecvStream; +use webtransport_generic::SendStream as GenericSendStream; + +// Ugh, so we need to wrap SendStream with a mutex because we need to be able to call set_priority on it. +// The problem is that set_priority takes a i32, while send_order is a VarInt +// So the solution is to maintain a priority queue of active streams and constantly update the priority with their index. +// So the library might update the priority of the stream at any point, while the application might similtaniously write to it. +pub struct SendStream +where + S: GenericSendStream, +{ + // All SendStream methods are &mut, so we need to wrap them with an internal mutex. + inner: Arc>, +} + +impl SendStream +where + S: GenericSendStream, +{ + pub(crate) fn new(stream: S) -> Self { + Self { + inner: Arc::new(Mutex::new(stream)), + } + } + + pub fn weak(&self) -> Weak> { + Arc::>::downgrade(&self.inner) + } +} + +impl GenericSendStream for SendStream +where + S: GenericSendStream, +{ + type Error = S::Error; + + fn poll_send(&mut self, cx: &mut Context<'_>, buf: &mut B) -> Poll> { + self.inner.lock().unwrap().poll_send(cx, buf) + } + + fn reset(&mut self, reset_code: u32) { + self.inner.lock().unwrap().reset(reset_code) + } + + // The application should NOT use this method. + // The library will automatically set the stream priority on creation based on the header. + fn set_priority(&mut self, order: i32) { + self.inner.lock().unwrap().set_priority(order) + } +} + +// Unfortunately, we need to wrap RecvStream with a buffer since moq-transport::Coding only supports buffered reads. +// We first serve any data in the buffer, then we poll the stream. +// TODO fix this so we don't need the wrapper. +pub struct RecvStream +where + R: GenericRecvStream, +{ + buf: Bytes, + stream: R, +} + +impl RecvStream +where + R: GenericRecvStream, +{ + pub(crate) fn new(buf: Bytes, stream: R) -> Self { + Self { buf, stream } + } + + pub fn stop(&mut self, code: u32) { + self.stream.stop(code) + } +} + +impl GenericRecvStream for RecvStream +where + R: GenericRecvStream, +{ + type Error = R::Error; + + fn poll_recv(&mut self, cx: &mut Context<'_>, buf: &mut B) -> Poll, Self::Error>> { + if !self.buf.is_empty() { + let size = self.buf.len(); + buf.put(&mut self.buf); + let size = size - self.buf.len(); + Poll::Ready(Ok(Some(size))) + } else { + self.stream.poll_recv(cx, buf) + } + } + + fn stop(&mut self, error_code: u32) { + self.stream.stop(error_code) + } +} diff --git a/moq-transport/src/session/mod.rs b/moq-transport/src/session/mod.rs new file mode 100644 index 0000000..3ce9732 --- /dev/null +++ b/moq-transport/src/session/mod.rs @@ -0,0 +1,109 @@ +use anyhow::Context; + +use crate::{message, object, setup}; +use webtransport_generic::{AsyncRecvStream, AsyncSendStream, AsyncSession}; + +pub struct Session +where + S: AsyncSession, + S::SendStream: AsyncSendStream, + S::RecvStream: AsyncRecvStream, +{ + pub send_control: message::Sender, + pub recv_control: message::Receiver, + pub send_objects: object::Sender, + pub recv_objects: object::Receiver, +} + +impl Session +where + S: AsyncSession, + S::SendStream: AsyncSendStream, + S::RecvStream: AsyncRecvStream, +{ + /// Called by a server with an established WebTransport session. + // TODO close the session with an error code + pub async fn accept(session: S, role: setup::Role) -> anyhow::Result { + let (send, recv) = session.accept_bi().await.context("failed to accept bidi stream")?; + + let mut send_control = message::Sender::new(send); + let mut recv_control = message::Receiver::new(recv); + + let setup_client = match recv_control.recv().await.context("failed to read SETUP")? { + message::Message::SetupClient(setup) => setup, + _ => anyhow::bail!("expected CLIENT SETUP"), + }; + + setup_client + .versions + .iter() + .find(|version| **version == setup::Version::DRAFT_00) + .context("no supported versions")?; + + if !setup_client.role.compatible(role) { + anyhow::bail!("incompatible roles: {:?} {:?}", setup_client.role, role); + } + + let setup_server = setup::Server { + role, + version: setup::Version::DRAFT_00, + }; + + send_control + .send(message::Message::SetupServer(setup_server)) + .await + .context("failed to send setup server")?; + + let send_objects = object::Sender::new(session.clone()); + let recv_objects = object::Receiver::new(session.clone()); + + Ok(Session { + send_control, + recv_control, + send_objects, + recv_objects, + }) + } + + /// Called by a client with an established WebTransport session. + pub async fn connect(session: S, role: setup::Role) -> anyhow::Result { + let (send, recv) = session.open_bi().await.context("failed to oen bidi stream")?; + + let mut send_control = message::Sender::new(send); + let mut recv_control = message::Receiver::new(recv); + + let setup_client = setup::Client { + role, + versions: vec![setup::Version::DRAFT_00].into(), + path: "".to_string(), + }; + + send_control + .send(message::Message::SetupClient(setup_client)) + .await + .context("failed to send SETUP CLIENT")?; + + let setup_server = match recv_control.recv().await.context("failed to read SETUP")? { + message::Message::SetupServer(setup) => setup, + _ => anyhow::bail!("expected SERVER SETUP"), + }; + + if setup_server.version != setup::Version::DRAFT_00 { + anyhow::bail!("unsupported version: {:?}", setup_server.version); + } + + if !setup_server.role.compatible(role) { + anyhow::bail!("incompatible roles: {:?} {:?}", role, setup_server.role); + } + + let send_objects = object::Sender::new(session.clone()); + let recv_objects = object::Receiver::new(session.clone()); + + Ok(Session { + send_control, + recv_control, + send_objects, + recv_objects, + }) + } +} diff --git a/moq-transport/src/control/setup_client.rs b/moq-transport/src/setup/client.rs similarity index 91% rename from moq-transport/src/control/setup_client.rs rename to moq-transport/src/setup/client.rs index a2f3126..f284911 100644 --- a/moq-transport/src/control/setup_client.rs +++ b/moq-transport/src/setup/client.rs @@ -5,7 +5,7 @@ use bytes::{Buf, BufMut}; // Sent by the client to setup up the session. #[derive(Debug)] -pub struct SetupClient { +pub struct Client { // NOTE: This is not a message type, but rather the control stream header. // Proposal: https://github.com/moq-wg/moq-transport/issues/138 @@ -20,7 +20,7 @@ pub struct SetupClient { pub path: String, } -impl Decode for SetupClient { +impl Decode for Client { fn decode(r: &mut R) -> Result { let versions = Versions::decode(r)?; let role = Role::decode(r)?; @@ -30,7 +30,7 @@ impl Decode for SetupClient { } } -impl Encode for SetupClient { +impl Encode for Client { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.versions.encode(w)?; self.role.encode(w)?; diff --git a/moq-transport/src/setup/mod.rs b/moq-transport/src/setup/mod.rs new file mode 100644 index 0000000..3fc9ab7 --- /dev/null +++ b/moq-transport/src/setup/mod.rs @@ -0,0 +1,9 @@ +mod client; +mod role; +mod server; +mod version; + +pub use client::*; +pub use role::*; +pub use server::*; +pub use version::*; diff --git a/moq-transport/src/control/role.rs b/moq-transport/src/setup/role.rs similarity index 100% rename from moq-transport/src/control/role.rs rename to moq-transport/src/setup/role.rs diff --git a/moq-transport/src/control/setup_server.rs b/moq-transport/src/setup/server.rs similarity index 90% rename from moq-transport/src/control/setup_server.rs rename to moq-transport/src/setup/server.rs index c1a6092..1fba799 100644 --- a/moq-transport/src/control/setup_server.rs +++ b/moq-transport/src/setup/server.rs @@ -7,7 +7,7 @@ use bytes::{Buf, BufMut}; // NOTE: This is not a message type, but rather the control stream header. // Proposal: https://github.com/moq-wg/moq-transport/issues/138 #[derive(Debug)] -pub struct SetupServer { +pub struct Server { // The list of supported versions in preferred order. pub version: Version, @@ -16,7 +16,7 @@ pub struct SetupServer { pub role: Role, } -impl Decode for SetupServer { +impl Decode for Server { fn decode(r: &mut R) -> Result { let version = Version::decode(r)?; let role = Role::decode(r)?; @@ -25,7 +25,7 @@ impl Decode for SetupServer { } } -impl Encode for SetupServer { +impl Encode for Server { fn encode(&self, w: &mut W) -> Result<(), EncodeError> { self.version.encode(w)?; self.role.encode(w)?; diff --git a/moq-transport/src/control/version.rs b/moq-transport/src/setup/version.rs similarity index 100% rename from moq-transport/src/control/version.rs rename to moq-transport/src/setup/version.rs diff --git a/moq-warp/Cargo.toml b/moq-warp/Cargo.toml index d7cf872..ce11f6b 100644 --- a/moq-warp/Cargo.toml +++ b/moq-warp/Cargo.toml @@ -16,7 +16,7 @@ categories = ["multimedia", "network-programming", "web-programming"] [dependencies] moq-transport = { path = "../moq-transport" } -moq-transport-quinn = { path = "../moq-transport-quinn" } +webtransport-generic = { path = "../../webtransport-rs/webtransport-generic", version = "0.3" } tokio = "1.27" anyhow = "1.0.70" diff --git a/moq-warp/src/model/mod.rs b/moq-warp/src/model/mod.rs index 610d7b9..55e5425 100644 --- a/moq-warp/src/model/mod.rs +++ b/moq-warp/src/model/mod.rs @@ -2,4 +2,4 @@ pub mod broadcast; pub mod fragment; pub mod segment; pub mod track; -pub(crate) mod watch; +pub mod watch; diff --git a/moq-warp/src/relay/broker.rs b/moq-warp/src/relay/broker.rs index bbcbc53..912c1e9 100644 --- a/moq-warp/src/relay/broker.rs +++ b/moq-warp/src/relay/broker.rs @@ -7,20 +7,20 @@ use std::sync::{Arc, Mutex}; use anyhow::Context; #[derive(Clone, Default)] -pub struct Broadcasts { +pub struct Broker { // Operate on the inner struct so we can share/clone the outer struct. - inner: Arc>, + inner: Arc>, } #[derive(Default)] -struct BroadcastsInner { +struct BrokerInner { // TODO Automatically reclaim dropped sources. lookup: HashMap>, - updates: watch::Publisher, + updates: watch::Publisher, } #[derive(Clone)] -pub enum Update { +pub enum BrokerUpdate { // Broadcast was announced Insert(String), // TODO include source? @@ -28,13 +28,13 @@ pub enum Update { Remove(String, broadcast::Error), } -impl Broadcasts { +impl Broker { pub fn new() -> Self { Default::default() } // Return the list of available broadcasts, and a subscriber that will return updates (add/remove). - pub fn available(&self) -> (Vec, watch::Subscriber) { + pub fn available(&self) -> (Vec, watch::Subscriber) { // Grab the lock. let this = self.inner.lock().unwrap(); @@ -55,7 +55,7 @@ impl Broadcasts { } this.lookup.insert(namespace.to_string(), source); - this.updates.push(Update::Insert(namespace.to_string())); + this.updates.push(BrokerUpdate::Insert(namespace.to_string())); Ok(()) } @@ -64,7 +64,7 @@ impl Broadcasts { let mut this = self.inner.lock().unwrap(); this.lookup.remove(namespace).context("namespace was not published")?; - this.updates.push(Update::Remove(namespace.to_string(), error)); + this.updates.push(BrokerUpdate::Remove(namespace.to_string(), error)); Ok(()) } diff --git a/moq-warp/src/relay/contribute.rs b/moq-warp/src/relay/contribute.rs index c443723..d18041e 100644 --- a/moq-warp/src/relay/contribute.rs +++ b/moq-warp/src/relay/contribute.rs @@ -2,30 +2,36 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time; -use tokio::io::AsyncReadExt; use tokio::sync::mpsc; use tokio::task::JoinSet; // lock across await boundaries -use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; -use moq_transport_quinn::{RecvObjects, RecvStream}; +use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk}; +use moq_transport::{object, VarInt}; +use webtransport_generic::{AsyncRecvStream, AsyncSendStream, AsyncSession}; use bytes::BytesMut; use anyhow::Context; -use super::{broker, control}; use crate::model::{broadcast, segment, track}; +use crate::relay::{ + message::{Component, Contribute}, + Broker, +}; // TODO experiment with making this Clone, so every task can have its own copy. -pub struct Session { +pub struct Session +where + S: AsyncSession, +{ // Used to receive objects. - objects: RecvObjects, + objects: object::Receiver, // Used to send and receive control messages. - control: control::Component, + control: Component, // Globally announced namespaces, which we can add ourselves to. - broker: broker::Broadcasts, + broker: Broker, // The names of active broadcasts being produced. broadcasts: HashMap>, @@ -37,12 +43,13 @@ pub struct Session { run_segments: JoinSet>, // receiving objects } -impl Session { - pub fn new( - objects: RecvObjects, - control: control::Component, - broker: broker::Broadcasts, - ) -> Self { +impl Session +where + S: AsyncSession, + S::SendStream: AsyncSendStream, + S::RecvStream: AsyncRecvStream, +{ + pub fn new(objects: object::Receiver, control: Component, broker: Broker) -> Self { Self { objects, control, @@ -81,23 +88,27 @@ impl Session { } } - async fn receive_message(&mut self, msg: control::Contribute) -> anyhow::Result<()> { + async fn receive_message(&mut self, msg: Contribute) -> anyhow::Result<()> { match msg { - control::Contribute::Announce(msg) => self.receive_announce(msg).await, - control::Contribute::SubscribeOk(msg) => self.receive_subscribe_ok(msg), - control::Contribute::SubscribeError(msg) => self.receive_subscribe_error(msg), + Contribute::Announce(msg) => self.receive_announce(msg).await, + Contribute::SubscribeOk(msg) => self.receive_subscribe_ok(msg), + Contribute::SubscribeError(msg) => self.receive_subscribe_error(msg), } } - async fn receive_object(&mut self, object: Object, stream: RecvStream) -> anyhow::Result<()> { - let track = object.track; + async fn receive_object( + &mut self, + header: object::Header, + stream: object::RecvStream, + ) -> anyhow::Result<()> { + let track = header.track; // Keep objects in memory for 10s let expires = time::Instant::now() + time::Duration::from_secs(10); let segment = segment::Info { - sequence: object.sequence, - send_order: object.send_order, + sequence: header.sequence, + send_order: header.send_order, expires: Some(expires), }; @@ -115,19 +126,19 @@ impl Session { Ok(()) } - async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> { + async fn run_segment( + mut segment: segment::Publisher, + mut stream: object::RecvStream, + ) -> anyhow::Result<()> { let mut buf = BytesMut::new(); - loop { - let size = stream.read_buf(&mut buf).await?; - if size == 0 { - return Ok(()); - } - + while stream.recv(&mut buf).await?.is_some() { // Split off the data we read into the buffer, freezing it so multiple threads can read simitaniously. let data = buf.split().freeze(); segment.fragments.push(data); } + + Ok(()) } async fn receive_announce(&mut self, msg: Announce) -> anyhow::Result<()> { @@ -180,7 +191,10 @@ impl Session { } } -impl Drop for Session { +impl Drop for Session +where + S: AsyncSession, +{ fn drop(&mut self) { // Unannounce all broadcasts we have announced. // TODO make this automatic so we can't screw up? diff --git a/moq-warp/src/relay/distribute.rs b/moq-warp/src/relay/distribute.rs index 33fa6f4..e43bd34 100644 --- a/moq-warp/src/relay/distribute.rs +++ b/moq-warp/src/relay/distribute.rs @@ -1,33 +1,42 @@ use anyhow::Context; -use tokio::{io::AsyncWriteExt, task::JoinSet}; // allows locking across await +use tokio::task::JoinSet; // allows locking across await -use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; -use moq_transport_quinn::SendObjects; +use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk}; +use moq_transport::{object, VarInt}; +use webtransport_generic::{AsyncRecvStream, AsyncSendStream, AsyncSession}; -use super::{broker, control}; use crate::model::{segment, track}; +use crate::relay::{ + message::{Component, Distribute}, + Broker, BrokerUpdate, +}; -pub struct Session { +pub struct Session +where + S: AsyncSession, + S::SendStream: AsyncSendStream, +{ // Objects are sent to the client - objects: SendObjects, + objects: object::Sender, // Used to send and receive control messages. - control: control::Component, + control: Component, // Globally announced namespaces, which can be subscribed to. - broker: broker::Broadcasts, + broker: Broker, // A list of tasks that are currently running. run_subscribes: JoinSet, // run subscriptions, sending the returned error if they fail } -impl Session { - pub fn new( - objects: SendObjects, - control: control::Component, - broker: broker::Broadcasts, - ) -> Self { +impl Session +where + S: AsyncSession, + S::SendStream: AsyncSendStream, + S::RecvStream: AsyncRecvStream, +{ + pub fn new(objects: object::Sender, control: Component, broker: Broker) -> Self { Self { objects, control, @@ -40,7 +49,7 @@ impl Session { // Announce all available tracks and get a stream of updates. let (available, mut updates) = self.broker.available(); for namespace in available { - self.on_available(broker::Update::Insert(namespace)).await?; + self.on_available(BrokerUpdate::Insert(namespace)).await?; } loop { @@ -61,11 +70,11 @@ impl Session { } } - async fn receive_message(&mut self, msg: control::Distribute) -> anyhow::Result<()> { + async fn receive_message(&mut self, msg: Distribute) -> anyhow::Result<()> { match msg { - control::Distribute::AnnounceOk(msg) => self.receive_announce_ok(msg), - control::Distribute::AnnounceError(msg) => self.receive_announce_error(msg), - control::Distribute::Subscribe(msg) => self.receive_subscribe(msg).await, + Distribute::AnnounceOk(msg) => self.receive_announce_ok(msg), + Distribute::AnnounceError(msg) => self.receive_announce_error(msg), + Distribute::Subscribe(msg) => self.receive_subscribe(msg).await, } } @@ -119,7 +128,11 @@ impl Session { Ok(()) } - async fn run_subscribe(objects: SendObjects, track_id: VarInt, mut track: track::Subscriber) -> SubscribeError { + async fn run_subscribe( + objects: object::Sender, + track_id: VarInt, + mut track: track::Subscriber, + ) -> SubscribeError { let mut tasks = JoinSet::new(); let mut result = None; @@ -154,11 +167,11 @@ impl Session { } async fn serve_group( - mut objects: SendObjects, + mut objects: object::Sender, track_id: VarInt, mut segment: segment::Subscriber, ) -> anyhow::Result<()> { - let object = Object { + let object = object::Header { track: track_id, group: segment.sequence, sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object @@ -169,7 +182,7 @@ impl Session { // Write each fragment as they are available. while let Some(mut fragment) = segment.fragments.next().await { - stream.write_all_buf(&mut fragment).await?; + stream.send_all(&mut fragment).await?; } // NOTE: stream is automatically closed when dropped @@ -177,16 +190,16 @@ impl Session { Ok(()) } - async fn on_available(&mut self, delta: broker::Update) -> anyhow::Result<()> { + async fn on_available(&mut self, delta: BrokerUpdate) -> anyhow::Result<()> { match delta { - broker::Update::Insert(name) => { + BrokerUpdate::Insert(name) => { self.control .send(Announce { track_namespace: name.clone(), }) .await } - broker::Update::Remove(name, error) => { + BrokerUpdate::Remove(name, error) => { self.control .send(AnnounceError { track_namespace: name, diff --git a/moq-warp/src/relay/control.rs b/moq-warp/src/relay/message.rs similarity index 77% rename from moq-warp/src/relay/control.rs rename to moq-warp/src/relay/message.rs index c53a359..202f49e 100644 --- a/moq-warp/src/relay/control.rs +++ b/moq-warp/src/relay/message.rs @@ -1,11 +1,18 @@ use tokio::sync::mpsc; -use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk}; -use moq_transport_quinn::{RecvControl, SendControl}; +use moq_transport::message::{ + self, Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk, +}; +use webtransport_generic::{AsyncRecvStream, AsyncSendStream, AsyncSession}; -pub struct Main { - send_control: SendControl, - recv_control: RecvControl, +pub struct Main +where + S: AsyncSession, + S::SendStream: AsyncSendStream, + S::RecvStream: AsyncRecvStream, +{ + send_control: message::Sender, + recv_control: message::Receiver, outgoing: mpsc::Receiver, @@ -13,7 +20,12 @@ pub struct Main { distribute: mpsc::Sender, } -impl Main { +impl Main +where + S: AsyncSession, + S::SendStream: AsyncSendStream, + S::RecvStream: AsyncRecvStream, +{ pub async fn run(mut self) -> anyhow::Result<()> { loop { tokio::select! { @@ -53,10 +65,15 @@ impl Component { } // Splits a control stream into two components, based on if it's a message for contribution or distribution. -pub fn split( - send_control: SendControl, - recv_control: RecvControl, -) -> (Main, Component, Component) { +pub fn split( + send_control: message::Sender, + recv_control: message::Receiver, +) -> (Main, Component, Component) +where + S: AsyncSession, + S::SendStream: AsyncSendStream, + S::RecvStream: AsyncRecvStream, +{ let (outgoing_tx, outgoing_rx) = mpsc::channel(1); let (contribute_tx, contribute_rx) = mpsc::channel(1); let (distribute_tx, distribute_rx) = mpsc::channel(1); diff --git a/moq-warp/src/relay/mod.rs b/moq-warp/src/relay/mod.rs index 109e3a5..6dcc217 100644 --- a/moq-warp/src/relay/mod.rs +++ b/moq-warp/src/relay/mod.rs @@ -1,8 +1,8 @@ -pub mod broker; - +mod broker; mod contribute; -mod control; mod distribute; +mod message; mod session; +pub use broker::*; pub use session::*; diff --git a/moq-warp/src/relay/session.rs b/moq-warp/src/relay/session.rs index 392ef96..0eea68c 100644 --- a/moq-warp/src/relay/session.rs +++ b/moq-warp/src/relay/session.rs @@ -1,17 +1,29 @@ -use super::{broker, contribute, control, distribute}; +use crate::relay::{contribute, distribute, message, Broker}; -pub struct Session { +use webtransport_generic::{AsyncRecvStream, AsyncSendStream, AsyncSession}; + +pub struct Session +where + S: AsyncSession, + S::SendStream: AsyncSendStream, + S::RecvStream: AsyncRecvStream, +{ // Split logic into contribution/distribution to reduce the problem space. - contribute: contribute::Session, - distribute: distribute::Session, + contribute: contribute::Session, + distribute: distribute::Session, // Used to receive control messages and forward to contribute/distribute. - control: control::Main, + control: message::Main, } -impl Session { - pub fn new(session: moq_transport_quinn::Session, broker: broker::Broadcasts) -> Session { - let (control, contribute, distribute) = control::split(session.send_control, session.recv_control); +impl Session +where + S: AsyncSession, + S::SendStream: AsyncSendStream, + S::RecvStream: AsyncRecvStream, +{ + pub fn new(session: moq_transport::Session, broker: Broker) -> Self { + let (control, contribute, distribute) = message::split(session.send_control, session.recv_control); let contribute = contribute::Session::new(session.recv_objects, contribute, broker.clone()); let distribute = distribute::Session::new(session.send_objects, distribute, broker);