From c5d8873e4e85bd63856fa2cde22eb4a62ea0ff09 Mon Sep 17 00:00:00 2001 From: kixelated Date: Tue, 15 Aug 2023 10:20:03 -0700 Subject: [PATCH] Webtransport generic (#51) Switched to the webtransport-generic crate so quinn or quiche (with adapter) can be used. This also involved switching out the decoder/encoder since it meant a wrapper was required. --- Cargo.lock | 35 ++-- Cargo.toml | 7 +- {moq-demo => moq-quinn}/Cargo.toml | 6 +- {moq-demo => moq-quinn}/src/main.rs | 0 {moq-demo => moq-quinn}/src/server.rs | 13 +- moq-transport-quinn/Cargo.toml | 26 --- moq-transport-quinn/src/control.rs | 94 ----------- moq-transport-quinn/src/lib.rs | 9 -- moq-transport-quinn/src/object.rs | 150 ------------------ moq-transport-quinn/src/session.rs | 98 ------------ moq-transport-quinn/src/stream.rs | 115 -------------- moq-transport/Cargo.toml | 7 +- moq-transport/src/coding/decode.rs | 61 +------ moq-transport/src/coding/duration.rs | 20 --- moq-transport/src/coding/encode.rs | 89 +---------- moq-transport/src/coding/mod.rs | 4 +- moq-transport/src/coding/string.rs | 22 +++ moq-transport/src/coding/varint.rs | 52 +++--- moq-transport/src/control/announce.rs | 23 --- moq-transport/src/control/announce_error.rs | 40 ----- moq-transport/src/control/announce_ok.rs | 23 --- moq-transport/src/control/go_away.rs | 21 --- moq-transport/src/control/subscribe.rs | 40 ----- moq-transport/src/control/subscribe_error.rs | 37 ----- moq-transport/src/control/subscribe_ok.rs | 36 ----- moq-transport/src/lib.rs | 12 +- moq-transport/src/message/announce.rs | 21 +++ moq-transport/src/message/announce_error.rs | 38 +++++ moq-transport/src/message/announce_ok.rs | 21 +++ moq-transport/src/message/go_away.rs | 19 +++ moq-transport/src/{control => message}/mod.rs | 40 +++-- moq-transport/src/message/receiver.rs | 19 +++ moq-transport/src/message/sender.rs | 21 +++ moq-transport/src/message/subscribe.rs | 40 +++++ moq-transport/src/message/subscribe_error.rs | 37 +++++ moq-transport/src/message/subscribe_ok.rs | 34 ++++ moq-transport/src/object.rs | 54 ------- moq-transport/src/object/header.rs | 54 +++++++ moq-transport/src/object/mod.rs | 7 + moq-transport/src/object/receiver.rs | 42 +++++ moq-transport/src/object/sender.rs | 29 ++++ moq-transport/src/session/mod.rs | 99 ++++++++++++ .../setup_client.rs => setup/client.rs} | 26 ++- moq-transport/src/setup/mod.rs | 9 ++ moq-transport/src/{control => setup}/role.rs | 16 +- .../setup_server.rs => setup/server.rs} | 22 ++- .../src/{control => setup}/version.rs | 33 ++-- moq-warp/Cargo.toml | 8 +- moq-warp/src/model/mod.rs | 2 +- moq-warp/src/model/segment.rs | 2 +- moq-warp/src/relay/broker.rs | 18 +-- moq-warp/src/relay/contribute.rs | 55 +++---- moq-warp/src/relay/distribute.rs | 59 +++---- moq-warp/src/relay/{control.rs => message.rs} | 22 +-- moq-warp/src/relay/mod.rs | 6 +- moq-warp/src/relay/session.rs | 18 ++- 56 files changed, 726 insertions(+), 1185 deletions(-) rename {moq-demo => moq-quinn}/Cargo.toml (89%) rename {moq-demo => moq-quinn}/src/main.rs (100%) rename {moq-demo => moq-quinn}/src/server.rs (90%) delete mode 100644 moq-transport-quinn/Cargo.toml delete mode 100644 moq-transport-quinn/src/control.rs delete mode 100644 moq-transport-quinn/src/lib.rs delete mode 100644 moq-transport-quinn/src/object.rs delete mode 100644 moq-transport-quinn/src/session.rs delete mode 100644 moq-transport-quinn/src/stream.rs delete mode 100644 moq-transport/src/coding/duration.rs create mode 100644 moq-transport/src/coding/string.rs delete mode 100644 moq-transport/src/control/announce.rs delete mode 100644 moq-transport/src/control/announce_error.rs delete mode 100644 moq-transport/src/control/announce_ok.rs delete mode 100644 moq-transport/src/control/go_away.rs delete mode 100644 moq-transport/src/control/subscribe.rs delete mode 100644 moq-transport/src/control/subscribe_error.rs delete mode 100644 moq-transport/src/control/subscribe_ok.rs create mode 100644 moq-transport/src/message/announce.rs create mode 100644 moq-transport/src/message/announce_error.rs create mode 100644 moq-transport/src/message/announce_ok.rs create mode 100644 moq-transport/src/message/go_away.rs rename moq-transport/src/{control => message}/mod.rs (75%) create mode 100644 moq-transport/src/message/receiver.rs create mode 100644 moq-transport/src/message/sender.rs create mode 100644 moq-transport/src/message/subscribe.rs create mode 100644 moq-transport/src/message/subscribe_error.rs create mode 100644 moq-transport/src/message/subscribe_ok.rs delete mode 100644 moq-transport/src/object.rs create mode 100644 moq-transport/src/object/header.rs create mode 100644 moq-transport/src/object/mod.rs create mode 100644 moq-transport/src/object/receiver.rs create mode 100644 moq-transport/src/object/sender.rs create mode 100644 moq-transport/src/session/mod.rs rename moq-transport/src/{control/setup_client.rs => setup/client.rs} (51%) create mode 100644 moq-transport/src/setup/mod.rs rename moq-transport/src/{control => setup}/role.rs (73%) rename moq-transport/src/{control/setup_server.rs => setup/server.rs} (53%) rename moq-transport/src/{control => setup}/version.rs (56%) rename moq-warp/src/relay/{control.rs => message.rs} (84%) diff --git a/Cargo.lock b/Cargo.lock index 79f3b45..339c1fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -919,7 +919,7 @@ dependencies = [ ] [[package]] -name = "moq-demo" +name = "moq-quinn" version = "0.1.0" dependencies = [ "anyhow", @@ -928,7 +928,6 @@ dependencies = [ "hex", "log", "moq-transport", - "moq-transport-quinn", "moq-warp", "quinn", "ring", @@ -936,30 +935,19 @@ dependencies = [ "rustls-pemfile", "tokio", "warp", + "webtransport-generic", "webtransport-quinn", ] [[package]] name = "moq-transport" version = "0.1.0" -dependencies = [ - "bytes", - "thiserror", -] - -[[package]] -name = "moq-transport-quinn" -version = "0.1.0" dependencies = [ "anyhow", "bytes", - "http", - "log", - "moq-transport", - "quinn", "thiserror", "tokio", - "webtransport-quinn", + "webtransport-generic", ] [[package]] @@ -970,12 +958,8 @@ dependencies = [ "bytes", "log", "moq-transport", - "moq-transport-quinn", - "quinn", - "ring", - "rustls 0.21.2", - "rustls-pemfile", "tokio", + "webtransport-generic", ] [[package]] @@ -1912,13 +1896,12 @@ dependencies = [ [[package]] name = "webtransport-generic" -version = "0.3.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ba4583e96bb0ef08142f868bf0d28f90211eced56a473768ee27446864a2310" +checksum = "df712317d761312996f654739debeb3838eb02c6fd9146d9efdfd08a46674e45" dependencies = [ "bytes", - "log", - "thiserror", + "tokio", ] [[package]] @@ -1934,9 +1917,9 @@ dependencies = [ [[package]] name = "webtransport-quinn" -version = "0.4.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96a4ac9975117d8c63c4d04577d594b3130fe2023b7363ebc613905acf98590a" +checksum = "b558ddb09b77347cca94bf2fd726d72c3753b60875eb3d2b7388adc12b9b4a1f" dependencies = [ "async-std", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 20c5996..5407a66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,2 @@ [workspace] -members = [ - "moq-transport", - "moq-transport-quinn", - "moq-demo", - "moq-warp", -] +members = ["moq-transport", "moq-quinn", "moq-warp"] diff --git a/moq-demo/Cargo.toml b/moq-quinn/Cargo.toml similarity index 89% rename from moq-demo/Cargo.toml rename to moq-quinn/Cargo.toml index fbffbb4..815adbb 100644 --- a/moq-demo/Cargo.toml +++ b/moq-quinn/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "moq-demo" +name = "moq-quinn" description = "Media over QUIC" authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" @@ -16,12 +16,12 @@ categories = ["multimedia", "network-programming", "web-programming"] [dependencies] moq-transport = { path = "../moq-transport" } -moq-transport-quinn = { path = "../moq-transport-quinn" } moq-warp = { path = "../moq-warp" } # QUIC quinn = "0.10" -webtransport-quinn = "0.4.2" +webtransport-generic = "0.5" +webtransport-quinn = "0.5" # Crypto ring = "0.16.20" diff --git a/moq-demo/src/main.rs b/moq-quinn/src/main.rs similarity index 100% rename from moq-demo/src/main.rs rename to moq-quinn/src/main.rs diff --git a/moq-demo/src/server.rs b/moq-quinn/src/server.rs similarity index 90% rename from moq-demo/src/server.rs rename to moq-quinn/src/server.rs index 178f3ea..59c227d 100644 --- a/moq-demo/src/server.rs +++ b/moq-quinn/src/server.rs @@ -1,16 +1,15 @@ -use moq_warp::relay::broker; - use std::{fs, io, net, path, sync, time}; use anyhow::Context; +use moq_warp::relay; use tokio::task::JoinSet; pub struct Server { server: quinn::Endpoint, // The media sources. - broker: broker::Broadcasts, + broker: relay::Broker, // The active connections. conns: JoinSet>, @@ -62,7 +61,7 @@ impl Server { server_config.transport = sync::Arc::new(transport_config); let server = quinn::Endpoint::server(server_config, config.addr)?; - let broker = broker::Broadcasts::new(); + let broker = relay::Broker::new(); let conns = JoinSet::new(); @@ -88,7 +87,7 @@ impl Server { } } - async fn handle(conn: quinn::Connecting, broker: broker::Broadcasts) -> anyhow::Result<()> { + async fn handle(conn: quinn::Connecting, broker: relay::Broker) -> anyhow::Result<()> { // Wait for the QUIC connection to be established. let conn = conn.await.context("failed to establish QUIC connection")?; @@ -106,12 +105,12 @@ impl Server { .context("failed to respond to WebTransport request")?; // Perform the MoQ handshake. - let session = moq_transport_quinn::accept(session, moq_transport::Role::Both) + let session = moq_transport::Session::accept(session, moq_transport::setup::Role::Both) .await .context("failed to perform MoQ handshake")?; // Run the relay code. - let session = moq_warp::relay::Session::new(session, broker); + let session = relay::Session::new(session, broker); session.run().await } } diff --git a/moq-transport-quinn/Cargo.toml b/moq-transport-quinn/Cargo.toml deleted file mode 100644 index dc40d99..0000000 --- a/moq-transport-quinn/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "moq-transport-quinn" -description = "Media over QUIC" -authors = ["Luke Curley"] -repository = "https://github.com/kixelated/moq-rs" -license = "MIT OR Apache-2.0" - -version = "0.1.0" -edition = "2021" - -keywords = ["quic", "http3", "webtransport", "media", "live"] -categories = ["multimedia", "network-programming", "web-programming"] - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -moq-transport = { path = "../moq-transport" } - -quinn = "0.10" -http = "0.2" -webtransport-quinn = "0.4.2" -tokio = { version = "1.27", features = ["macros", "io-util"] } -bytes = "1" -log = "0.4" -anyhow = "1.0.70" -thiserror = "1.0.21" diff --git a/moq-transport-quinn/src/control.rs b/moq-transport-quinn/src/control.rs deleted file mode 100644 index b70578f..0000000 --- a/moq-transport-quinn/src/control.rs +++ /dev/null @@ -1,94 +0,0 @@ -use moq_transport::{Decode, DecodeError, Encode, Message}; - -use bytes::{Buf, BytesMut}; - -use std::io::Cursor; -use std::sync::Arc; -use tokio::{io::AsyncReadExt, sync::Mutex}; - -use webtransport_quinn::{RecvStream, SendStream}; - -pub struct SendControl { - stream: SendStream, - buf: BytesMut, // reuse a buffer to encode messages. -} - -impl SendControl { - pub fn new(stream: SendStream) -> Self { - Self { - buf: BytesMut::new(), - stream, - } - } - - pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { - let msg = msg.into(); - log::info!("sending message: {:?}", msg); - - self.buf.clear(); - msg.encode(&mut self.buf)?; - - // TODO make this work with select! - self.stream.write_all(&self.buf).await?; - - Ok(()) - } - - // Helper that lets multiple threads send control messages. - pub fn share(self) -> ControlShared { - ControlShared { - stream: Arc::new(Mutex::new(self)), - } - } -} - -// Helper that allows multiple threads to send control messages. -// There's no equivalent for receiving since only one thread should be receiving at a time. -#[derive(Clone)] -pub struct ControlShared { - stream: Arc>, -} - -impl ControlShared { - pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { - let mut stream = self.stream.lock().await; - stream.send(msg).await - } -} - -pub struct RecvControl { - stream: RecvStream, - buf: BytesMut, // data we've read but haven't fully decoded yet -} - -impl RecvControl { - pub fn new(stream: RecvStream) -> Self { - Self { - buf: BytesMut::new(), - stream, - } - } - - // Read the next full message from the stream. - pub async fn recv(&mut self) -> anyhow::Result { - loop { - // Read the contents of the buffer - let mut peek = Cursor::new(&self.buf); - - match Message::decode(&mut peek) { - Ok(msg) => { - // We've successfully decoded a message, so we can advance the buffer. - self.buf.advance(peek.position() as usize); - - log::info!("received message: {:?}", msg); - return Ok(msg); - } - Err(DecodeError::UnexpectedEnd) => { - // The decode failed, so we need to append more data. - self.stream.read_buf(&mut self.buf).await?; - } - Err(e) => return Err(e.into()), - } - } - } -} diff --git a/moq-transport-quinn/src/lib.rs b/moq-transport-quinn/src/lib.rs deleted file mode 100644 index d210460..0000000 --- a/moq-transport-quinn/src/lib.rs +++ /dev/null @@ -1,9 +0,0 @@ -mod control; -mod object; -mod session; -mod stream; - -pub use control::*; -pub use object::*; -pub use session::*; -pub use stream::*; diff --git a/moq-transport-quinn/src/object.rs b/moq-transport-quinn/src/object.rs deleted file mode 100644 index d348fe5..0000000 --- a/moq-transport-quinn/src/object.rs +++ /dev/null @@ -1,150 +0,0 @@ -use std::{collections::BinaryHeap, io::Cursor, sync::Arc}; - -use anyhow::Context; -use bytes::{Buf, BytesMut}; -use moq_transport::{Decode, DecodeError, Encode, Object}; - -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::sync::Mutex; -use tokio::task::JoinSet; -use webtransport_quinn::Session; - -use crate::{RecvStream, SendStream, SendStreamOrder}; - -// Allow this to be cloned so we can have multiple senders. -#[derive(Clone)] -pub struct SendObjects { - // This is a tokio mutex since we need to lock across await boundaries. - inner: Arc>, -} - -impl SendObjects { - pub fn new(session: Session) -> Self { - let inner = SendObjectsInner::new(session); - Self { - inner: Arc::new(Mutex::new(inner)), - } - } - - pub async fn open(&mut self, header: Object) -> anyhow::Result { - let mut inner = self.inner.lock().await; - inner.open(header).await - } -} - -struct SendObjectsInner { - session: Session, - - // Quinn supports a i32 for priority, but the wire format is a u64. - // Our work around is to keep a list of streams in priority order and use the index as the priority. - // This involves more work, so TODO either increase the Quinn size or reduce the wire size. - ordered: BinaryHeap, - ordered_swap: BinaryHeap, // reuse memory to avoid allocations - - // A reusable buffer for encoding headers. - // TODO figure out how to use BufMut on the stack and remove this. - buf: BytesMut, -} - -impl SendObjectsInner { - fn new(session: Session) -> Self { - Self { - session, - ordered: BinaryHeap::new(), - ordered_swap: BinaryHeap::new(), - buf: BytesMut::new(), - } - } - - pub async fn open(&mut self, header: Object) -> anyhow::Result { - let stream = self.session.open_uni().await.context("failed to open uni stream")?; - let (mut stream, priority) = SendStream::with_order(stream, header.send_order.into_inner()); - - // Add the priority to our existing list. - self.ordered.push(priority); - - // Loop through the list and update the priorities of any still active streams. - let mut index = 0; - while let Some(stream) = self.ordered.pop() { - if stream.update(index).is_ok() { - // Add the stream to the new list so it'll be in sorted order. - self.ordered_swap.push(stream); - index += 1; - } - } - - // Swap the lists so we can reuse the memory. - std::mem::swap(&mut self.ordered, &mut self.ordered_swap); - - // Encode and write the stream header. - // TODO do this in SendStream so we don't hold the lock. - // Otherwise, - self.buf.clear(); - header.encode(&mut self.buf).unwrap(); - stream.write_all(&self.buf).await.context("failed to write header")?; - - // log::info!("created stream: {:?}", header); - - Ok(stream) - } -} - -// Not clone, so we don't accidentally have two listners. -pub struct RecvObjects { - session: Session, - - // Streams that we've accepted but haven't read the header from yet. - streams: JoinSet>, -} - -impl RecvObjects { - pub fn new(session: Session) -> Self { - Self { - session, - streams: JoinSet::new(), - } - } - - pub async fn recv(&mut self) -> anyhow::Result<(Object, RecvStream)> { - loop { - tokio::select! { - res = self.session.accept_uni() => { - let stream = res.context("failed to accept stream")?; - self.streams.spawn(async move { Self::read(stream).await }); - }, - res = self.streams.join_next(), if !self.streams.is_empty() => { - return res.unwrap().context("failed to run join set")?; - } - } - } - } - - async fn read(mut stream: webtransport_quinn::RecvStream) -> anyhow::Result<(Object, RecvStream)> { - let mut buf = BytesMut::new(); - - loop { - // Read more data into the buffer. - stream.read_buf(&mut buf).await?; - - // Use a cursor to read the buffer and remember how much we read. - let mut read = Cursor::new(&mut buf); - - let header = match Object::decode(&mut read) { - Ok(header) => header, - Err(DecodeError::UnexpectedEnd) => continue, - Err(err) => return Err(err.into()), - }; - - // We parsed a full header, advance the buffer. - let size = read.position() as usize; - buf.advance(size); - let buf = buf.freeze(); - - // log::info!("received stream: {:?}", header); - - let stream = RecvStream::new(buf, stream); - - return Ok((header, stream)); - } - } -} diff --git a/moq-transport-quinn/src/session.rs b/moq-transport-quinn/src/session.rs deleted file mode 100644 index 4ae0130..0000000 --- a/moq-transport-quinn/src/session.rs +++ /dev/null @@ -1,98 +0,0 @@ -use anyhow::Context; - -use moq_transport::{Message, SetupClient, SetupServer}; - -use super::{RecvControl, RecvObjects, SendControl, SendObjects}; - -/// Called by a server with an established WebTransport session. -// TODO close the session with an error code -pub async fn accept(session: webtransport_quinn::Session, role: moq_transport::Role) -> anyhow::Result { - let (send, recv) = session.accept_bi().await.context("failed to accept bidi stream")?; - - let mut send_control = SendControl::new(send); - let mut recv_control = RecvControl::new(recv); - - let setup_client = match recv_control.recv().await.context("failed to read SETUP")? { - Message::SetupClient(setup) => setup, - _ => anyhow::bail!("expected CLIENT SETUP"), - }; - - setup_client - .versions - .iter() - .find(|version| **version == moq_transport::Version::DRAFT_00) - .context("no supported versions")?; - - if !setup_client.role.compatible(role) { - anyhow::bail!("incompatible roles: {:?} {:?}", setup_client.role, role); - } - - let setup_server = SetupServer { - role, - version: moq_transport::Version::DRAFT_00, - }; - - send_control - .send(moq_transport::Message::SetupServer(setup_server)) - .await - .context("failed to send setup server")?; - - let send_objects = SendObjects::new(session.clone()); - let recv_objects = RecvObjects::new(session.clone()); - - Ok(Session { - send_control, - recv_control, - send_objects, - recv_objects, - }) -} - -/// Called by a client with an established WebTransport session. -pub async fn connect(session: webtransport_quinn::Session, role: moq_transport::Role) -> anyhow::Result { - let (send, recv) = session.open_bi().await.context("failed to oen bidi stream")?; - - let mut send_control = SendControl::new(send); - let mut recv_control = RecvControl::new(recv); - - let setup_client = SetupClient { - role, - versions: vec![moq_transport::Version::DRAFT_00].into(), - path: "".to_string(), - }; - - send_control - .send(moq_transport::Message::SetupClient(setup_client)) - .await - .context("failed to send SETUP CLIENT")?; - - let setup_server = match recv_control.recv().await.context("failed to read SETUP")? { - Message::SetupServer(setup) => setup, - _ => anyhow::bail!("expected SERVER SETUP"), - }; - - if setup_server.version != moq_transport::Version::DRAFT_00 { - anyhow::bail!("unsupported version: {:?}", setup_server.version); - } - - if !setup_server.role.compatible(role) { - anyhow::bail!("incompatible roles: {:?} {:?}", role, setup_server.role); - } - - let send_objects = SendObjects::new(session.clone()); - let recv_objects = RecvObjects::new(session.clone()); - - Ok(Session { - send_control, - recv_control, - send_objects, - recv_objects, - }) -} - -pub struct Session { - pub send_control: SendControl, - pub recv_control: RecvControl, - pub send_objects: SendObjects, - pub recv_objects: RecvObjects, -} diff --git a/moq-transport-quinn/src/stream.rs b/moq-transport-quinn/src/stream.rs deleted file mode 100644 index 2063b6d..0000000 --- a/moq-transport-quinn/src/stream.rs +++ /dev/null @@ -1,115 +0,0 @@ -use std::{ - io, - pin::{pin, Pin}, - sync::{Arc, Mutex, Weak}, - task::{self, Poll}, -}; - -use bytes::{BufMut, Bytes}; -use tokio::io::{AsyncRead, AsyncWrite}; - -// Ugh, so we need to wrap SendStream with a mutex because we need to be able to call set_priority on it. -// The problem is that set_priority takes a i32, while send_order is a VarInt -// So the solution is to maintain a priority queue of active streams and constantly update the priority with their index. -// So the library might update the priority of the stream at any point, while the application might similtaniously write to it. -// The only upside is that we don't expose set_priority, so the application can't screw with things. -pub struct SendStream { - stream: Arc>, -} - -impl SendStream { - // Create a new stream with the given order, returning a handle that allows us to update the priority. - pub(crate) fn with_order(stream: webtransport_quinn::SendStream, order: u64) -> (Self, SendStreamOrder) { - let stream = Arc::new(Mutex::new(stream)); - let weak = Arc::>::downgrade(&stream); - - (SendStream { stream }, SendStreamOrder { stream: weak, order }) - } -} - -pub(crate) struct SendStreamOrder { - // We use Weak here so we don't prevent the stream from being closed when dereferenced. - // update() will return an error if the stream was closed instead. - stream: Weak>, - order: u64, -} - -impl SendStreamOrder { - pub(crate) fn update(&self, index: i32) -> Result<(), webtransport_quinn::StreamClosed> { - let stream = self.stream.upgrade().ok_or(webtransport_quinn::StreamClosed)?; - let mut stream = stream.lock().unwrap(); - stream.set_priority(index) - } -} - -impl PartialEq for SendStreamOrder { - fn eq(&self, other: &Self) -> bool { - self.order == other.order - } -} - -impl Eq for SendStreamOrder {} - -impl PartialOrd for SendStreamOrder { - fn partial_cmp(&self, other: &Self) -> Option { - // We reverse the order so the lower send order is higher priority. - other.order.partial_cmp(&self.order) - } -} - -impl Ord for SendStreamOrder { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // We reverse the order so the lower send order is higher priority. - other.order.cmp(&self.order) - } -} - -// We implement AsyncWrite so we can grab the mutex on each write attempt, instead of holding it for the entire async function. -impl AsyncWrite for SendStream { - fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> task::Poll> { - let mut stream = self.stream.lock().unwrap(); - Pin::new(&mut *stream).poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll> { - let mut stream = self.stream.lock().unwrap(); - Pin::new(&mut *stream).poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll> { - let mut stream = self.stream.lock().unwrap(); - Pin::new(&mut *stream).poll_shutdown(cx) - } -} - -// Unfortunately, we need to wrap RecvStream with a buffer since moq-transport::Coding only supports buffered reads. -// We first serve any data in the buffer, then we poll the stream. -pub struct RecvStream { - buf: Bytes, - stream: webtransport_quinn::RecvStream, -} - -impl RecvStream { - pub(crate) fn new(buf: Bytes, stream: webtransport_quinn::RecvStream) -> Self { - Self { buf, stream } - } - - pub fn stop(&mut self, code: u32) { - self.stream.stop(code).ok(); - } -} - -impl AsyncRead for RecvStream { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - if !self.buf.is_empty() { - buf.put(&mut pin!(self).buf); - Poll::Ready(Ok(())) - } else { - Pin::new(&mut self.stream).poll_read(cx, buf) - } - } -} diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 1706a6a..5a8f3ba 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -15,5 +15,8 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -bytes = "1" -thiserror = "1.0.21" +bytes = "1.4" +thiserror = "1" +anyhow = "1" +webtransport-generic = "0.5" +tokio = { version = "1.27", features = ["macros", "io-util", "rt", "sync"] } diff --git a/moq-transport/src/coding/decode.rs b/moq-transport/src/coding/decode.rs index 4ed04cf..e2f8f3a 100644 --- a/moq-transport/src/coding/decode.rs +++ b/moq-transport/src/coding/decode.rs @@ -1,5 +1,4 @@ use super::VarInt; -use bytes::{Buf, Bytes}; use std::str; use thiserror::Error; @@ -15,62 +14,6 @@ pub enum DecodeError { #[error("invalid type: {0:?}")] InvalidType(VarInt), - #[error("unknown error")] - Unknown, + #[error("io error: {0}")] + IoError(#[from] std::io::Error), } - -pub trait Decode: Sized { - // Decodes a message, returning UnexpectedEnd if there's not enough bytes in the buffer. - fn decode(r: &mut R) -> Result; -} - -impl Decode for Bytes { - fn decode(r: &mut R) -> Result { - let size = VarInt::decode(r)?.into_inner() as usize; - if r.remaining() < size { - return Err(DecodeError::UnexpectedEnd); - } - - let buf = r.copy_to_bytes(size); - Ok(buf) - } -} - -impl Decode for Vec { - fn decode(r: &mut R) -> Result { - Bytes::decode(r).map(|b| b.to_vec()) - } -} - -impl Decode for String { - fn decode(r: &mut R) -> Result { - let data = Vec::decode(r)?; - let s = str::from_utf8(&data)?.to_string(); - Ok(s) - } -} - -impl Decode for u8 { - fn decode(r: &mut R) -> Result { - if r.remaining() < 1 { - return Err(DecodeError::UnexpectedEnd); - } - - Ok(r.get_u8()) - } -} - -/* -impl Decode for [u8; N] { - fn decode(r: &mut R) -> Result { - if r.remaining() < N { - return Err(DecodeError::UnexpectedEnd); - } - - let mut buf = [0; N]; - r.copy_to_slice(&mut buf); - - Ok(buf) - } -} -*/ diff --git a/moq-transport/src/coding/duration.rs b/moq-transport/src/coding/duration.rs deleted file mode 100644 index 2ac098a..0000000 --- a/moq-transport/src/coding/duration.rs +++ /dev/null @@ -1,20 +0,0 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - -use bytes::{Buf, BufMut}; - -use std::time::Duration; - -impl Encode for Duration { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - let ms = self.as_millis(); - let ms = VarInt::try_from(ms)?; - ms.encode(w) - } -} - -impl Decode for Duration { - fn decode(r: &mut R) -> Result { - let ms = VarInt::decode(r)?; - Ok(Self::from_millis(ms.into())) - } -} diff --git a/moq-transport/src/coding/encode.rs b/moq-transport/src/coding/encode.rs index 99c2712..cd0d928 100644 --- a/moq-transport/src/coding/encode.rs +++ b/moq-transport/src/coding/encode.rs @@ -1,95 +1,12 @@ -use super::{BoundsExceeded, VarInt}; -use bytes::{BufMut, Bytes}; +use super::BoundsExceeded; use thiserror::Error; #[derive(Error, Debug)] pub enum EncodeError { - #[error("unexpected end of buffer")] - UnexpectedEnd, - #[error("varint too large")] BoundsExceeded(#[from] BoundsExceeded), - #[error("unknown error")] - Unknown, -} - -pub trait Encode: Sized { - fn encode(&self, w: &mut W) -> Result<(), EncodeError>; -} - -impl Encode for Bytes { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.as_ref().encode(w) - } -} - -impl Encode for Vec { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.as_slice().encode(w) - } -} - -impl Encode for &[u8] { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - let size = VarInt::try_from(self.len())?; - size.encode(w)?; - - if w.remaining_mut() < self.len() { - return Err(EncodeError::UnexpectedEnd); - } - w.put_slice(self); - - Ok(()) - } -} - -impl Encode for String { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.as_bytes().encode(w) - } -} - -impl Encode for u8 { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - if w.remaining_mut() < 1 { - return Err(EncodeError::UnexpectedEnd); - } - - w.put_u8(*self); - Ok(()) - } -} - -impl Encode for u16 { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - if w.remaining_mut() < 2 { - return Err(EncodeError::UnexpectedEnd); - } - - w.put_u16(*self); - Ok(()) - } -} - -impl Encode for u32 { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - if w.remaining_mut() < 4 { - return Err(EncodeError::UnexpectedEnd); - } - - w.put_u32(*self); - Ok(()) - } -} -impl Encode for u64 { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - if w.remaining_mut() < 8 { - return Err(EncodeError::UnexpectedEnd); - } - - w.put_u64(*self); - Ok(()) - } + #[error("i/o error: {0}")] + IoError(#[from] std::io::Error), } diff --git a/moq-transport/src/coding/mod.rs b/moq-transport/src/coding/mod.rs index a632307..f3cc9c8 100644 --- a/moq-transport/src/coding/mod.rs +++ b/moq-transport/src/coding/mod.rs @@ -1,9 +1,9 @@ mod decode; -mod duration; mod encode; +mod string; mod varint; pub use decode::*; -pub use duration::*; pub use encode::*; +pub use string::*; pub use varint::*; diff --git a/moq-transport/src/coding/string.rs b/moq-transport/src/coding/string.rs new file mode 100644 index 0000000..24c7bb0 --- /dev/null +++ b/moq-transport/src/coding/string.rs @@ -0,0 +1,22 @@ +use std::cmp::min; + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use webtransport_generic::{RecvStream, SendStream}; + +use crate::VarInt; + +use super::{DecodeError, EncodeError}; + +pub async fn encode_string(s: &str, w: &mut W) -> Result<(), EncodeError> { + let size = VarInt::try_from(s.len())?; + size.encode(w).await?; + w.write_all(s.as_ref()).await?; + Ok(()) +} + +pub async fn decode_string(r: &mut R) -> Result { + let size = VarInt::decode(r).await?.into_inner(); + let mut str = String::with_capacity(min(1024, size) as usize); + r.take(size).read_to_string(&mut str).await?; + Ok(str) +} diff --git a/moq-transport/src/coding/varint.rs b/moq-transport/src/coding/varint.rs index 80afcd5..41a95ca 100644 --- a/moq-transport/src/coding/varint.rs +++ b/moq-transport/src/coding/varint.rs @@ -5,10 +5,11 @@ use std::convert::{TryFrom, TryInto}; use std::fmt; -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; - -use bytes::{Buf, BufMut}; use thiserror::Error; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use webtransport_generic::{RecvStream, SendStream}; + +use super::{DecodeError, EncodeError}; #[derive(Debug, Copy, Clone, Eq, PartialEq, Error)] #[error("value too large for varint encoding")] @@ -120,15 +121,10 @@ impl fmt::Display for VarInt { } } -impl Decode for VarInt { - fn decode(r: &mut R) -> Result { - let mut buf = [0; 8]; - - if r.remaining() < 1 { - return Err(DecodeError::UnexpectedEnd); - } - - buf[0] = r.get_u8(); +impl VarInt { + pub async fn decode(r: &mut R) -> Result { + let mut buf = [0u8; 8]; + r.read_exact(buf[0..1].as_mut()).await?; let tag = buf[0] >> 6; buf[0] &= 0b0011_1111; @@ -136,27 +132,15 @@ impl Decode for VarInt { let x = match tag { 0b00 => u64::from(buf[0]), 0b01 => { - if r.remaining() < 1 { - return Err(DecodeError::UnexpectedEnd); - } - - r.copy_to_slice(buf[1..2].as_mut()); + r.read_exact(buf[1..2].as_mut()).await?; u64::from(u16::from_be_bytes(buf[..2].try_into().unwrap())) } 0b10 => { - if r.remaining() < 3 { - return Err(DecodeError::UnexpectedEnd); - } - - r.copy_to_slice(buf[1..4].as_mut()); + r.read_exact(buf[1..4].as_mut()).await?; u64::from(u32::from_be_bytes(buf[..4].try_into().unwrap())) } 0b11 => { - if r.remaining() < 7 { - return Err(DecodeError::UnexpectedEnd); - } - - r.copy_to_slice(buf[1..8].as_mut()); + r.read_exact(buf[1..8].as_mut()).await?; u64::from_be_bytes(buf) } _ => unreachable!(), @@ -164,21 +148,21 @@ impl Decode for VarInt { Ok(Self(x)) } -} -impl Encode for VarInt { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { let x = self.0; if x < 2u64.pow(6) { - (x as u8).encode(w) + w.write_u8(x as u8).await?; } else if x < 2u64.pow(14) { - (0b01 << 14 | x as u16).encode(w) + w.write_u16(0b01 << 14 | x as u16).await?; } else if x < 2u64.pow(30) { - (0b10 << 30 | x as u32).encode(w) + w.write_u32(0b10 << 30 | x as u32).await?; } else if x < 2u64.pow(62) { - (0b11 << 62 | x).encode(w) + w.write_u64(0b11 << 62 | x).await?; } else { unreachable!("malformed VarInt"); } + + Ok(()) } } diff --git a/moq-transport/src/control/announce.rs b/moq-transport/src/control/announce.rs deleted file mode 100644 index 54424ac..0000000 --- a/moq-transport/src/control/announce.rs +++ /dev/null @@ -1,23 +0,0 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; - -use bytes::{Buf, BufMut}; - -#[derive(Debug)] -pub struct Announce { - // The track namespace - pub track_namespace: String, -} - -impl Decode for Announce { - fn decode(r: &mut R) -> Result { - let track_namespace = String::decode(r)?; - Ok(Self { track_namespace }) - } -} - -impl Encode for Announce { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.track_namespace.encode(w)?; - Ok(()) - } -} diff --git a/moq-transport/src/control/announce_error.rs b/moq-transport/src/control/announce_error.rs deleted file mode 100644 index ae09cfd..0000000 --- a/moq-transport/src/control/announce_error.rs +++ /dev/null @@ -1,40 +0,0 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - -use bytes::{Buf, BufMut}; - -#[derive(Debug)] -pub struct AnnounceError { - // Echo back the namespace that was announced. - // TODO Propose using an ID to save bytes. - pub track_namespace: String, - - // An error code. - pub code: VarInt, - - // An optional, human-readable reason. - pub reason: String, -} - -impl Decode for AnnounceError { - fn decode(r: &mut R) -> Result { - let track_namespace = String::decode(r)?; - let code = VarInt::decode(r)?; - let reason = String::decode(r)?; - - Ok(Self { - track_namespace, - code, - reason, - }) - } -} - -impl Encode for AnnounceError { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.track_namespace.encode(w)?; - self.code.encode(w)?; - self.reason.encode(w)?; - - Ok(()) - } -} diff --git a/moq-transport/src/control/announce_ok.rs b/moq-transport/src/control/announce_ok.rs deleted file mode 100644 index cb6d7a9..0000000 --- a/moq-transport/src/control/announce_ok.rs +++ /dev/null @@ -1,23 +0,0 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; - -use bytes::{Buf, BufMut}; - -#[derive(Debug)] -pub struct AnnounceOk { - // Echo back the namespace that was announced. - // TODO Propose using an ID to save bytes. - pub track_namespace: String, -} - -impl Decode for AnnounceOk { - fn decode(r: &mut R) -> Result { - let track_namespace = String::decode(r)?; - Ok(Self { track_namespace }) - } -} - -impl Encode for AnnounceOk { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.track_namespace.encode(w) - } -} diff --git a/moq-transport/src/control/go_away.rs b/moq-transport/src/control/go_away.rs deleted file mode 100644 index e91d933..0000000 --- a/moq-transport/src/control/go_away.rs +++ /dev/null @@ -1,21 +0,0 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; - -use bytes::{Buf, BufMut}; - -#[derive(Debug)] -pub struct GoAway { - pub url: String, -} - -impl Decode for GoAway { - fn decode(r: &mut R) -> Result { - let url = String::decode(r)?; - Ok(Self { url }) - } -} - -impl Encode for GoAway { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.url.encode(w) - } -} diff --git a/moq-transport/src/control/subscribe.rs b/moq-transport/src/control/subscribe.rs deleted file mode 100644 index abbdb92..0000000 --- a/moq-transport/src/control/subscribe.rs +++ /dev/null @@ -1,40 +0,0 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - -use bytes::{Buf, BufMut}; - -#[derive(Debug)] -pub struct Subscribe { - // An ID we choose so we can map to the track_name. - // Proposal: https://github.com/moq-wg/moq-transport/issues/209 - pub track_id: VarInt, - - // The track namespace. - pub track_namespace: String, - - // The track name. - pub track_name: String, -} - -impl Decode for Subscribe { - fn decode(r: &mut R) -> Result { - let track_id = VarInt::decode(r)?; - let track_namespace = String::decode(r)?; - let track_name = String::decode(r)?; - - Ok(Self { - track_id, - track_namespace, - track_name, - }) - } -} - -impl Encode for Subscribe { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.track_id.encode(w)?; - self.track_namespace.encode(w)?; - self.track_name.encode(w)?; - - Ok(()) - } -} diff --git a/moq-transport/src/control/subscribe_error.rs b/moq-transport/src/control/subscribe_error.rs deleted file mode 100644 index b7412c6..0000000 --- a/moq-transport/src/control/subscribe_error.rs +++ /dev/null @@ -1,37 +0,0 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - -use bytes::{Buf, BufMut}; - -#[derive(Debug)] -pub struct SubscribeError { - // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 - - // The ID for this track. - pub track_id: VarInt, - - // An error code. - pub code: VarInt, - - // An optional, human-readable reason. - pub reason: String, -} - -impl Decode for SubscribeError { - fn decode(r: &mut R) -> Result { - let track_id = VarInt::decode(r)?; - let code = VarInt::decode(r)?; - let reason = String::decode(r)?; - - Ok(Self { track_id, code, reason }) - } -} - -impl Encode for SubscribeError { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.track_id.encode(w)?; - self.code.encode(w)?; - self.reason.encode(w)?; - - Ok(()) - } -} diff --git a/moq-transport/src/control/subscribe_ok.rs b/moq-transport/src/control/subscribe_ok.rs deleted file mode 100644 index b502626..0000000 --- a/moq-transport/src/control/subscribe_ok.rs +++ /dev/null @@ -1,36 +0,0 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - -use std::time::Duration; - -use bytes::{Buf, BufMut}; - -#[derive(Debug)] -pub struct SubscribeOk { - // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 - - // The ID for this track. - pub track_id: VarInt, - - // The subscription will end after this duration has elapsed. - // A value of zero is invalid. - pub expires: Option, -} - -impl Decode for SubscribeOk { - fn decode(r: &mut R) -> Result { - let track_id = VarInt::decode(r)?; - let expires = Duration::decode(r)?; - let expires = if expires == Duration::ZERO { None } else { Some(expires) }; - - Ok(Self { track_id, expires }) - } -} - -impl Encode for SubscribeOk { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.track_id.encode(w)?; - self.expires.unwrap_or_default().encode(w)?; - - Ok(()) - } -} diff --git a/moq-transport/src/lib.rs b/moq-transport/src/lib.rs index 8d045ac..655a9d2 100644 --- a/moq-transport/src/lib.rs +++ b/moq-transport/src/lib.rs @@ -1,7 +1,9 @@ mod coding; -mod control; -mod object; +pub mod message; +pub mod object; +pub mod session; +pub mod setup; -pub use coding::*; -pub use control::*; -pub use object::*; +pub use coding::VarInt; +pub use message::Message; +pub use session::Session; diff --git a/moq-transport/src/message/announce.rs b/moq-transport/src/message/announce.rs new file mode 100644 index 0000000..c9491b1 --- /dev/null +++ b/moq-transport/src/message/announce.rs @@ -0,0 +1,21 @@ +use crate::coding::{decode_string, encode_string, DecodeError, EncodeError}; + +use webtransport_generic::{RecvStream, SendStream}; + +#[derive(Debug)] +pub struct Announce { + // The track namespace + pub track_namespace: String, +} + +impl Announce { + pub async fn decode(r: &mut R) -> Result { + let track_namespace = decode_string(r).await?; + Ok(Self { track_namespace }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + encode_string(&self.track_namespace, w).await?; + Ok(()) + } +} diff --git a/moq-transport/src/message/announce_error.rs b/moq-transport/src/message/announce_error.rs new file mode 100644 index 0000000..d83350a --- /dev/null +++ b/moq-transport/src/message/announce_error.rs @@ -0,0 +1,38 @@ +use crate::coding::{decode_string, encode_string, DecodeError, EncodeError, VarInt}; + +use webtransport_generic::{RecvStream, SendStream}; + +#[derive(Debug)] +pub struct AnnounceError { + // Echo back the namespace that was announced. + // TODO Propose using an ID to save bytes. + pub track_namespace: String, + + // An error code. + pub code: VarInt, + + // An optional, human-readable reason. + pub reason: String, +} + +impl AnnounceError { + pub async fn decode(r: &mut R) -> Result { + let track_namespace = decode_string(r).await?; + let code = VarInt::decode(r).await?; + let reason = decode_string(r).await?; + + Ok(Self { + track_namespace, + code, + reason, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + encode_string(&self.track_namespace, w).await?; + self.code.encode(w).await?; + encode_string(&self.reason, w).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/message/announce_ok.rs b/moq-transport/src/message/announce_ok.rs new file mode 100644 index 0000000..4dd20a6 --- /dev/null +++ b/moq-transport/src/message/announce_ok.rs @@ -0,0 +1,21 @@ +use crate::coding::{decode_string, encode_string, DecodeError, EncodeError}; + +use webtransport_generic::{RecvStream, SendStream}; + +#[derive(Debug)] +pub struct AnnounceOk { + // Echo back the namespace that was announced. + // TODO Propose using an ID to save bytes. + pub track_namespace: String, +} + +impl AnnounceOk { + pub async fn decode(r: &mut R) -> Result { + let track_namespace = decode_string(r).await?; + Ok(Self { track_namespace }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + encode_string(&self.track_namespace, w).await + } +} diff --git a/moq-transport/src/message/go_away.rs b/moq-transport/src/message/go_away.rs new file mode 100644 index 0000000..1ebaf53 --- /dev/null +++ b/moq-transport/src/message/go_away.rs @@ -0,0 +1,19 @@ +use crate::coding::{decode_string, encode_string, DecodeError, EncodeError}; + +use webtransport_generic::{RecvStream, SendStream}; + +#[derive(Debug)] +pub struct GoAway { + pub url: String, +} + +impl GoAway { + pub async fn decode(r: &mut R) -> Result { + let url = decode_string(r).await?; + Ok(Self { url }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + encode_string(&self.url, w).await + } +} diff --git a/moq-transport/src/control/mod.rs b/moq-transport/src/message/mod.rs similarity index 75% rename from moq-transport/src/control/mod.rs rename to moq-transport/src/message/mod.rs index 1070a1c..e142eb1 100644 --- a/moq-transport/src/control/mod.rs +++ b/moq-transport/src/message/mod.rs @@ -2,31 +2,29 @@ mod announce; mod announce_error; mod announce_ok; mod go_away; -mod role; -mod setup_client; -mod setup_server; +mod receiver; +mod sender; mod subscribe; mod subscribe_error; mod subscribe_ok; -mod version; pub use announce::*; pub use announce_error::*; pub use announce_ok::*; pub use go_away::*; -pub use role::*; -pub use setup_client::*; -pub use setup_server::*; +pub use receiver::*; +pub use sender::*; pub use subscribe::*; pub use subscribe_error::*; pub use subscribe_ok::*; -pub use version::*; -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; +use crate::coding::{DecodeError, EncodeError, VarInt}; +use crate::setup; -use bytes::{Buf, BufMut}; use std::fmt; +use webtransport_generic::{RecvStream, SendStream}; + // NOTE: This is forked from moq-transport-00. // 1. SETUP role indicates local support ("I can subscribe"), not remote support ("server must publish") // 2. SETUP_SERVER is id=2 to disambiguate @@ -43,28 +41,24 @@ macro_rules! message_types { $($name($name)),* } - - impl Decode for Message { - fn decode(r: &mut R) -> Result { - let t = VarInt::decode(r)?; + impl Message { + pub async fn decode(r: &mut R) -> Result { + let t = VarInt::decode(r).await?; match t.into_inner() { $($val => { - let msg = $name::decode(r)?; + let msg = $name::decode(r).await?; Ok(Self::$name(msg)) })* _ => Err(DecodeError::InvalidType(t)), } } - } - - impl Encode for Message { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { match self { $(Self::$name(ref m) => { - VarInt::from_u32($val).encode(w)?; - m.encode(w) + VarInt::from_u32($val).encode(w).await?; + m.encode(w).await },)* } } @@ -87,6 +81,10 @@ macro_rules! message_types { } } +// Just so we can use the macro above. +type SetupClient = setup::Client; +type SetupServer = setup::Server; + // Each message is prefixed with the given VarInt type. message_types! { // NOTE: Object and Setup are in other modules. diff --git a/moq-transport/src/message/receiver.rs b/moq-transport/src/message/receiver.rs new file mode 100644 index 0000000..29b0590 --- /dev/null +++ b/moq-transport/src/message/receiver.rs @@ -0,0 +1,19 @@ +use crate::{coding::DecodeError, message::Message}; + +use webtransport_generic::RecvStream; + +pub struct Receiver { + stream: R, +} + +impl Receiver { + pub fn new(stream: R) -> Self { + Self { stream } + } + + // Read the next full message from the stream. + // NOTE: This is not cancellable; you must poll the future to completion. + pub async fn recv(&mut self) -> Result { + Message::decode(&mut self.stream).await + } +} diff --git a/moq-transport/src/message/sender.rs b/moq-transport/src/message/sender.rs new file mode 100644 index 0000000..5cefa2f --- /dev/null +++ b/moq-transport/src/message/sender.rs @@ -0,0 +1,21 @@ +use crate::message::Message; + +use webtransport_generic::SendStream; + +pub struct Sender { + stream: S, +} + +impl Sender { + pub fn new(stream: S) -> Self { + Self { stream } + } + + // Read the next full message from the stream. + // NOTE: This is not cancellable; you must poll the future to completion. + pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { + let msg = msg.into(); + msg.encode(&mut self.stream).await?; + Ok(()) + } +} diff --git a/moq-transport/src/message/subscribe.rs b/moq-transport/src/message/subscribe.rs new file mode 100644 index 0000000..4688240 --- /dev/null +++ b/moq-transport/src/message/subscribe.rs @@ -0,0 +1,40 @@ +use crate::coding::{decode_string, encode_string, DecodeError, EncodeError, VarInt}; + +use webtransport_generic::{RecvStream, SendStream}; + +#[derive(Debug)] +pub struct Subscribe { + // An ID we choose so we can map to the track_name. + // Proposal: https://github.com/moq-wg/moq-transport/issues/209 + pub track_id: VarInt, + + // The track namespace. + pub track_namespace: String, + + // The track name. + pub track_name: String, +} + +impl Subscribe { + pub async fn decode(r: &mut R) -> Result { + let track_id = VarInt::decode(r).await?; + let track_namespace = decode_string(r).await?; + let track_name = decode_string(r).await?; + + Ok(Self { + track_id, + track_namespace, + track_name, + }) + } +} + +impl Subscribe { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.track_id.encode(w).await?; + encode_string(&self.track_namespace, w).await?; + encode_string(&self.track_name, w).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/message/subscribe_error.rs b/moq-transport/src/message/subscribe_error.rs new file mode 100644 index 0000000..d1ab9b2 --- /dev/null +++ b/moq-transport/src/message/subscribe_error.rs @@ -0,0 +1,37 @@ +use crate::coding::{decode_string, encode_string, DecodeError, EncodeError, VarInt}; + +use webtransport_generic::{RecvStream, SendStream}; + +#[derive(Debug)] +pub struct SubscribeError { + // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 + + // The ID for this track. + pub track_id: VarInt, + + // An error code. + pub code: VarInt, + + // An optional, human-readable reason. + pub reason: String, +} + +impl SubscribeError { + pub async fn decode(r: &mut R) -> Result { + let track_id = VarInt::decode(r).await?; + let code = VarInt::decode(r).await?; + let reason = decode_string(r).await?; + + Ok(Self { track_id, code, reason }) + } +} + +impl SubscribeError { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.track_id.encode(w).await?; + self.code.encode(w).await?; + encode_string(&self.reason, w).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/message/subscribe_ok.rs b/moq-transport/src/message/subscribe_ok.rs new file mode 100644 index 0000000..bb4d94d --- /dev/null +++ b/moq-transport/src/message/subscribe_ok.rs @@ -0,0 +1,34 @@ +use crate::coding::{DecodeError, EncodeError, VarInt}; + +use webtransport_generic::{RecvStream, SendStream}; + +#[derive(Debug)] +pub struct SubscribeOk { + // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 + + // The ID for this track. + pub track_id: VarInt, + + // The subscription will end after this duration has elapsed. + // A value of zero is invalid. + pub expires: Option, +} + +impl SubscribeOk { + pub async fn decode(r: &mut R) -> Result { + let track_id = VarInt::decode(r).await?; + let expires = VarInt::decode(r).await?; + let expires = if expires.into_inner() == 0 { None } else { Some(expires) }; + + Ok(Self { track_id, expires }) + } +} + +impl SubscribeOk { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.track_id.encode(w).await?; + self.expires.unwrap_or_default().encode(w).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/object.rs b/moq-transport/src/object.rs deleted file mode 100644 index d937a82..0000000 --- a/moq-transport/src/object.rs +++ /dev/null @@ -1,54 +0,0 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; - -use bytes::{Buf, BufMut}; - -#[derive(Debug)] -pub struct Object { - // An ID for this track. - // Proposal: https://github.com/moq-wg/moq-transport/issues/209 - pub track: VarInt, - - // The group sequence number. - pub group: VarInt, - - // The object sequence number. - pub sequence: VarInt, - - // The priority/send order. - pub send_order: VarInt, -} - -impl Decode for Object { - fn decode(r: &mut R) -> Result { - let typ = VarInt::decode(r)?; - if typ.into_inner() != 0 { - return Err(DecodeError::InvalidType(typ)); - } - - // NOTE: size has been omitted - - let track = VarInt::decode(r)?; - let group = VarInt::decode(r)?; - let sequence = VarInt::decode(r)?; - let send_order = VarInt::decode(r)?; - - Ok(Self { - track, - group, - sequence, - send_order, - }) - } -} - -impl Encode for Object { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - VarInt::from_u32(0).encode(w)?; - self.track.encode(w)?; - self.group.encode(w)?; - self.sequence.encode(w)?; - self.send_order.encode(w)?; - - Ok(()) - } -} diff --git a/moq-transport/src/object/header.rs b/moq-transport/src/object/header.rs new file mode 100644 index 0000000..9310b4c --- /dev/null +++ b/moq-transport/src/object/header.rs @@ -0,0 +1,54 @@ +use crate::coding::{DecodeError, EncodeError, VarInt}; + +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use webtransport_generic::{RecvStream, SendStream}; + +#[derive(Debug)] +pub struct Header { + // An ID for this track. + // Proposal: https://github.com/moq-wg/moq-transport/issues/209 + pub track: VarInt, + + // The group sequence number. + pub group: VarInt, + + // The object sequence number. + pub sequence: VarInt, + + // The priority/send order. + // Proposal: int32 instead of a varint. + pub send_order: i32, +} + +impl Header { + pub async fn decode(r: &mut R) -> Result { + let typ = VarInt::decode(r).await?; + if typ.into_inner() != 0 { + return Err(DecodeError::InvalidType(typ)); + } + + // NOTE: size has been omitted + + let track = VarInt::decode(r).await?; + let group = VarInt::decode(r).await?; + let sequence = VarInt::decode(r).await?; + let send_order = r.read_i32().await?; // big-endian + + Ok(Self { + track, + group, + sequence, + send_order, + }) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + VarInt::from_u32(0).encode(w).await?; + self.track.encode(w).await?; + self.group.encode(w).await?; + self.sequence.encode(w).await?; + w.write_i32(self.send_order).await?; + + Ok(()) + } +} diff --git a/moq-transport/src/object/mod.rs b/moq-transport/src/object/mod.rs new file mode 100644 index 0000000..56dace8 --- /dev/null +++ b/moq-transport/src/object/mod.rs @@ -0,0 +1,7 @@ +mod header; +mod receiver; +mod sender; + +pub use header::*; +pub use receiver::*; +pub use sender::*; diff --git a/moq-transport/src/object/receiver.rs b/moq-transport/src/object/receiver.rs new file mode 100644 index 0000000..ad1eb14 --- /dev/null +++ b/moq-transport/src/object/receiver.rs @@ -0,0 +1,42 @@ +use crate::object::Header; + +use anyhow::Context; + +use tokio::task::JoinSet; + +use webtransport_generic::Session; + +pub struct Receiver { + session: S, + + // Streams that we've accepted but haven't read the header from yet. + streams: JoinSet>, +} + +impl Receiver { + pub fn new(session: S) -> Self { + Self { + session, + streams: JoinSet::new(), + } + } + + pub async fn recv(&mut self) -> anyhow::Result<(Header, S::RecvStream)> { + loop { + tokio::select! { + res = self.session.accept_uni() => { + let stream = res.context("failed to accept stream")?; + self.streams.spawn(async move { Self::read(stream).await }); + }, + res = self.streams.join_next(), if !self.streams.is_empty() => { + return res.unwrap().context("failed to run join set")?; + } + } + } + } + + async fn read(mut stream: S::RecvStream) -> anyhow::Result<(Header, S::RecvStream)> { + let header = Header::decode(&mut stream).await?; + Ok((header, stream)) + } +} diff --git a/moq-transport/src/object/sender.rs b/moq-transport/src/object/sender.rs new file mode 100644 index 0000000..4db881b --- /dev/null +++ b/moq-transport/src/object/sender.rs @@ -0,0 +1,29 @@ +use anyhow::Context; + +use crate::object::Header; + +use webtransport_generic::{SendStream, Session}; + +// Allow this to be cloned so we can have multiple senders. +#[derive(Clone)] +pub struct Sender { + // The session. + session: S, +} + +impl Sender { + pub fn new(session: S) -> Self { + Self { session } + } + + pub async fn open(&mut self, header: Header) -> anyhow::Result { + let mut stream = self.session.open_uni().await.context("failed to open uni stream")?; + + stream.set_priority(header.send_order); + header.encode(&mut stream).await.context("failed to write header")?; + + // log::info!("created stream: {:?}", header); + + Ok(stream) + } +} diff --git a/moq-transport/src/session/mod.rs b/moq-transport/src/session/mod.rs new file mode 100644 index 0000000..2e7587e --- /dev/null +++ b/moq-transport/src/session/mod.rs @@ -0,0 +1,99 @@ +use anyhow::Context; + +use crate::{message, object, setup}; +use webtransport_generic::Session as WTSession; + +pub struct Session { + pub send_control: message::Sender, + pub recv_control: message::Receiver, + pub send_objects: object::Sender, + pub recv_objects: object::Receiver, +} + +impl Session { + /// Called by a server with an established WebTransport session. + // TODO close the session with an error code + pub async fn accept(session: S, role: setup::Role) -> anyhow::Result { + let (send, recv) = session.accept_bi().await.context("failed to accept bidi stream")?; + + let mut send_control = message::Sender::new(send); + let mut recv_control = message::Receiver::new(recv); + + let setup_client = match recv_control.recv().await.context("failed to read SETUP")? { + message::Message::SetupClient(setup) => setup, + _ => anyhow::bail!("expected CLIENT SETUP"), + }; + + setup_client + .versions + .iter() + .find(|version| **version == setup::Version::DRAFT_00) + .context("no supported versions")?; + + if !setup_client.role.compatible(role) { + anyhow::bail!("incompatible roles: {:?} {:?}", setup_client.role, role); + } + + let setup_server = setup::Server { + role, + version: setup::Version::DRAFT_00, + }; + + send_control + .send(message::Message::SetupServer(setup_server)) + .await + .context("failed to send setup server")?; + + let send_objects = object::Sender::new(session.clone()); + let recv_objects = object::Receiver::new(session.clone()); + + Ok(Session { + send_control, + recv_control, + send_objects, + recv_objects, + }) + } + + /// Called by a client with an established WebTransport session. + pub async fn connect(session: S, role: setup::Role) -> anyhow::Result { + let (send, recv) = session.open_bi().await.context("failed to oen bidi stream")?; + + let mut send_control = message::Sender::new(send); + let mut recv_control = message::Receiver::new(recv); + + let setup_client = setup::Client { + role, + versions: vec![setup::Version::DRAFT_00].into(), + path: "".to_string(), + }; + + send_control + .send(message::Message::SetupClient(setup_client)) + .await + .context("failed to send SETUP CLIENT")?; + + let setup_server = match recv_control.recv().await.context("failed to read SETUP")? { + message::Message::SetupServer(setup) => setup, + _ => anyhow::bail!("expected SERVER SETUP"), + }; + + if setup_server.version != setup::Version::DRAFT_00 { + anyhow::bail!("unsupported version: {:?}", setup_server.version); + } + + if !setup_server.role.compatible(role) { + anyhow::bail!("incompatible roles: {:?} {:?}", role, setup_server.role); + } + + let send_objects = object::Sender::new(session.clone()); + let recv_objects = object::Receiver::new(session.clone()); + + Ok(Session { + send_control, + recv_control, + send_objects, + recv_objects, + }) + } +} diff --git a/moq-transport/src/control/setup_client.rs b/moq-transport/src/setup/client.rs similarity index 51% rename from moq-transport/src/control/setup_client.rs rename to moq-transport/src/setup/client.rs index a2f3126..56bdb27 100644 --- a/moq-transport/src/control/setup_client.rs +++ b/moq-transport/src/setup/client.rs @@ -1,11 +1,11 @@ use super::{Role, Versions}; -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{decode_string, encode_string, DecodeError, EncodeError}; -use bytes::{Buf, BufMut}; +use webtransport_generic::{RecvStream, SendStream}; // Sent by the client to setup up the session. #[derive(Debug)] -pub struct SetupClient { +pub struct Client { // NOTE: This is not a message type, but rather the control stream header. // Proposal: https://github.com/moq-wg/moq-transport/issues/138 @@ -20,21 +20,19 @@ pub struct SetupClient { pub path: String, } -impl Decode for SetupClient { - fn decode(r: &mut R) -> Result { - let versions = Versions::decode(r)?; - let role = Role::decode(r)?; - let path = String::decode(r)?; +impl Client { + pub async fn decode(r: &mut R) -> Result { + let versions = Versions::decode(r).await?; + let role = Role::decode(r).await?; + let path = decode_string(r).await?; Ok(Self { versions, role, path }) } -} -impl Encode for SetupClient { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.versions.encode(w)?; - self.role.encode(w)?; - self.path.encode(w)?; + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.versions.encode(w).await?; + self.role.encode(w).await?; + encode_string(&self.path, w).await?; Ok(()) } diff --git a/moq-transport/src/setup/mod.rs b/moq-transport/src/setup/mod.rs new file mode 100644 index 0000000..3fc9ab7 --- /dev/null +++ b/moq-transport/src/setup/mod.rs @@ -0,0 +1,9 @@ +mod client; +mod role; +mod server; +mod version; + +pub use client::*; +pub use role::*; +pub use server::*; +pub use version::*; diff --git a/moq-transport/src/control/role.rs b/moq-transport/src/setup/role.rs similarity index 73% rename from moq-transport/src/control/role.rs rename to moq-transport/src/setup/role.rs index ea7d413..47e3d21 100644 --- a/moq-transport/src/control/role.rs +++ b/moq-transport/src/setup/role.rs @@ -1,6 +1,6 @@ -use bytes::{Buf, BufMut}; +use webtransport_generic::{RecvStream, SendStream}; -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; +use crate::coding::{DecodeError, EncodeError, VarInt}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Role { @@ -52,15 +52,13 @@ impl TryFrom for Role { } } -impl Decode for Role { - fn decode(r: &mut R) -> Result { - let v = VarInt::decode(r)?; +impl Role { + pub async fn decode(r: &mut R) -> Result { + let v = VarInt::decode(r).await?; v.try_into() } -} -impl Encode for Role { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - VarInt::from(*self).encode(w) + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + VarInt::from(*self).encode(w).await } } diff --git a/moq-transport/src/control/setup_server.rs b/moq-transport/src/setup/server.rs similarity index 53% rename from moq-transport/src/control/setup_server.rs rename to moq-transport/src/setup/server.rs index c1a6092..6ad6bee 100644 --- a/moq-transport/src/control/setup_server.rs +++ b/moq-transport/src/setup/server.rs @@ -1,13 +1,13 @@ use super::{Role, Version}; -use crate::coding::{Decode, DecodeError, Encode, EncodeError}; +use crate::coding::{DecodeError, EncodeError}; -use bytes::{Buf, BufMut}; +use webtransport_generic::{RecvStream, SendStream}; // Sent by the server in response to a client. // NOTE: This is not a message type, but rather the control stream header. // Proposal: https://github.com/moq-wg/moq-transport/issues/138 #[derive(Debug)] -pub struct SetupServer { +pub struct Server { // The list of supported versions in preferred order. pub version: Version, @@ -16,19 +16,17 @@ pub struct SetupServer { pub role: Role, } -impl Decode for SetupServer { - fn decode(r: &mut R) -> Result { - let version = Version::decode(r)?; - let role = Role::decode(r)?; +impl Server { + pub async fn decode(r: &mut R) -> Result { + let version = Version::decode(r).await?; + let role = Role::decode(r).await?; Ok(Self { version, role }) } -} -impl Encode for SetupServer { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.version.encode(w)?; - self.role.encode(w)?; + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.version.encode(w).await?; + self.role.encode(w).await?; Ok(()) } diff --git a/moq-transport/src/control/version.rs b/moq-transport/src/setup/version.rs similarity index 56% rename from moq-transport/src/control/version.rs rename to moq-transport/src/setup/version.rs index 6c18cd5..1fe75d4 100644 --- a/moq-transport/src/control/version.rs +++ b/moq-transport/src/setup/version.rs @@ -1,6 +1,6 @@ -use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; +use crate::coding::{DecodeError, EncodeError, VarInt}; -use bytes::{Buf, BufMut}; +use webtransport_generic::{RecvStream, SendStream}; use std::ops::Deref; @@ -23,43 +23,40 @@ impl From for VarInt { } } -impl Decode for Version { - fn decode(r: &mut R) -> Result { - let v = VarInt::decode(r)?; +impl Version { + pub async fn decode(r: &mut R) -> Result { + let v = VarInt::decode(r).await?; Ok(Self(v)) } -} -impl Encode for Version { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { - self.0.encode(w) + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + self.0.encode(w).await?; + Ok(()) } } #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Versions(pub Vec); -impl Decode for Versions { - fn decode(r: &mut R) -> Result { - let count = VarInt::decode(r)?.into_inner(); +impl Versions { + pub async fn decode(r: &mut R) -> Result { + let count = VarInt::decode(r).await?.into_inner(); let mut vs = Vec::new(); for _ in 0..count { - let v = Version::decode(r)?; + let v = Version::decode(r).await?; vs.push(v); } Ok(Self(vs)) } -} -impl Encode for Versions { - fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { let size: VarInt = self.0.len().try_into()?; - size.encode(w)?; + size.encode(w).await?; for v in &self.0 { - v.encode(w)?; + v.encode(w).await?; } Ok(()) diff --git a/moq-warp/Cargo.toml b/moq-warp/Cargo.toml index d7cf872..0e1b02b 100644 --- a/moq-warp/Cargo.toml +++ b/moq-warp/Cargo.toml @@ -16,15 +16,9 @@ categories = ["multimedia", "network-programming", "web-programming"] [dependencies] moq-transport = { path = "../moq-transport" } -moq-transport-quinn = { path = "../moq-transport-quinn" } +webtransport-generic = "0.5" tokio = "1.27" anyhow = "1.0.70" log = "0.4" # TODO remove bytes = "1.4" - -# QUIC stuff -quinn = "0.10" -ring = "0.16.20" -rustls = "0.21.2" -rustls-pemfile = "1.0.2" diff --git a/moq-warp/src/model/mod.rs b/moq-warp/src/model/mod.rs index 610d7b9..55e5425 100644 --- a/moq-warp/src/model/mod.rs +++ b/moq-warp/src/model/mod.rs @@ -2,4 +2,4 @@ pub mod broadcast; pub mod fragment; pub mod segment; pub mod track; -pub(crate) mod watch; +pub mod watch; diff --git a/moq-warp/src/model/segment.rs b/moq-warp/src/model/segment.rs index 685b7c5..5cac4e3 100644 --- a/moq-warp/src/model/segment.rs +++ b/moq-warp/src/model/segment.rs @@ -12,7 +12,7 @@ pub struct Info { pub sequence: VarInt, // The priority of the segment within the BROADCAST. - pub send_order: VarInt, + pub send_order: i32, // The time at which the segment expires for cache purposes. pub expires: Option, diff --git a/moq-warp/src/relay/broker.rs b/moq-warp/src/relay/broker.rs index bbcbc53..912c1e9 100644 --- a/moq-warp/src/relay/broker.rs +++ b/moq-warp/src/relay/broker.rs @@ -7,20 +7,20 @@ use std::sync::{Arc, Mutex}; use anyhow::Context; #[derive(Clone, Default)] -pub struct Broadcasts { +pub struct Broker { // Operate on the inner struct so we can share/clone the outer struct. - inner: Arc>, + inner: Arc>, } #[derive(Default)] -struct BroadcastsInner { +struct BrokerInner { // TODO Automatically reclaim dropped sources. lookup: HashMap>, - updates: watch::Publisher, + updates: watch::Publisher, } #[derive(Clone)] -pub enum Update { +pub enum BrokerUpdate { // Broadcast was announced Insert(String), // TODO include source? @@ -28,13 +28,13 @@ pub enum Update { Remove(String, broadcast::Error), } -impl Broadcasts { +impl Broker { pub fn new() -> Self { Default::default() } // Return the list of available broadcasts, and a subscriber that will return updates (add/remove). - pub fn available(&self) -> (Vec, watch::Subscriber) { + pub fn available(&self) -> (Vec, watch::Subscriber) { // Grab the lock. let this = self.inner.lock().unwrap(); @@ -55,7 +55,7 @@ impl Broadcasts { } this.lookup.insert(namespace.to_string(), source); - this.updates.push(Update::Insert(namespace.to_string())); + this.updates.push(BrokerUpdate::Insert(namespace.to_string())); Ok(()) } @@ -64,7 +64,7 @@ impl Broadcasts { let mut this = self.inner.lock().unwrap(); this.lookup.remove(namespace).context("namespace was not published")?; - this.updates.push(Update::Remove(namespace.to_string(), error)); + this.updates.push(BrokerUpdate::Remove(namespace.to_string(), error)); Ok(()) } diff --git a/moq-warp/src/relay/contribute.rs b/moq-warp/src/relay/contribute.rs index c443723..1b0c07d 100644 --- a/moq-warp/src/relay/contribute.rs +++ b/moq-warp/src/relay/contribute.rs @@ -6,26 +6,30 @@ use tokio::io::AsyncReadExt; use tokio::sync::mpsc; use tokio::task::JoinSet; // lock across await boundaries -use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; -use moq_transport_quinn::{RecvObjects, RecvStream}; +use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk}; +use moq_transport::{object, VarInt}; +use webtransport_generic::Session as WTSession; use bytes::BytesMut; use anyhow::Context; -use super::{broker, control}; use crate::model::{broadcast, segment, track}; +use crate::relay::{ + message::{Component, Contribute}, + Broker, +}; // TODO experiment with making this Clone, so every task can have its own copy. -pub struct Session { +pub struct Session { // Used to receive objects. - objects: RecvObjects, + objects: object::Receiver, // Used to send and receive control messages. - control: control::Component, + control: Component, // Globally announced namespaces, which we can add ourselves to. - broker: broker::Broadcasts, + broker: Broker, // The names of active broadcasts being produced. broadcasts: HashMap>, @@ -37,12 +41,8 @@ pub struct Session { run_segments: JoinSet>, // receiving objects } -impl Session { - pub fn new( - objects: RecvObjects, - control: control::Component, - broker: broker::Broadcasts, - ) -> Self { +impl Session { + pub fn new(objects: object::Receiver, control: Component, broker: Broker) -> Self { Self { objects, control, @@ -81,23 +81,23 @@ impl Session { } } - async fn receive_message(&mut self, msg: control::Contribute) -> anyhow::Result<()> { + async fn receive_message(&mut self, msg: Contribute) -> anyhow::Result<()> { match msg { - control::Contribute::Announce(msg) => self.receive_announce(msg).await, - control::Contribute::SubscribeOk(msg) => self.receive_subscribe_ok(msg), - control::Contribute::SubscribeError(msg) => self.receive_subscribe_error(msg), + Contribute::Announce(msg) => self.receive_announce(msg).await, + Contribute::SubscribeOk(msg) => self.receive_subscribe_ok(msg), + Contribute::SubscribeError(msg) => self.receive_subscribe_error(msg), } } - async fn receive_object(&mut self, object: Object, stream: RecvStream) -> anyhow::Result<()> { - let track = object.track; + async fn receive_object(&mut self, header: object::Header, stream: S::RecvStream) -> anyhow::Result<()> { + let track = header.track; // Keep objects in memory for 10s let expires = time::Instant::now() + time::Duration::from_secs(10); let segment = segment::Info { - sequence: object.sequence, - send_order: object.send_order, + sequence: header.sequence, + send_order: header.send_order, expires: Some(expires), }; @@ -115,19 +115,16 @@ impl Session { Ok(()) } - async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> { + async fn run_segment(mut segment: segment::Publisher, mut stream: S::RecvStream) -> anyhow::Result<()> { let mut buf = BytesMut::new(); - loop { - let size = stream.read_buf(&mut buf).await?; - if size == 0 { - return Ok(()); - } - + while stream.read_buf(&mut buf).await? > 0 { // Split off the data we read into the buffer, freezing it so multiple threads can read simitaniously. let data = buf.split().freeze(); segment.fragments.push(data); } + + Ok(()) } async fn receive_announce(&mut self, msg: Announce) -> anyhow::Result<()> { @@ -180,7 +177,7 @@ impl Session { } } -impl Drop for Session { +impl Drop for Session { fn drop(&mut self) { // Unannounce all broadcasts we have announced. // TODO make this automatic so we can't screw up? diff --git a/moq-warp/src/relay/distribute.rs b/moq-warp/src/relay/distribute.rs index 33fa6f4..ed7235d 100644 --- a/moq-warp/src/relay/distribute.rs +++ b/moq-warp/src/relay/distribute.rs @@ -1,33 +1,34 @@ use anyhow::Context; -use tokio::{io::AsyncWriteExt, task::JoinSet}; // allows locking across await +use tokio::io::AsyncWriteExt; +use tokio::task::JoinSet; // allows locking across await -use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; -use moq_transport_quinn::SendObjects; +use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk}; +use moq_transport::{object, VarInt}; +use webtransport_generic::Session as WTSession; -use super::{broker, control}; use crate::model::{segment, track}; +use crate::relay::{ + message::{Component, Distribute}, + Broker, BrokerUpdate, +}; -pub struct Session { +pub struct Session { // Objects are sent to the client - objects: SendObjects, + objects: object::Sender, // Used to send and receive control messages. - control: control::Component, + control: Component, // Globally announced namespaces, which can be subscribed to. - broker: broker::Broadcasts, + broker: Broker, // A list of tasks that are currently running. run_subscribes: JoinSet, // run subscriptions, sending the returned error if they fail } -impl Session { - pub fn new( - objects: SendObjects, - control: control::Component, - broker: broker::Broadcasts, - ) -> Self { +impl Session { + pub fn new(objects: object::Sender, control: Component, broker: Broker) -> Self { Self { objects, control, @@ -40,7 +41,7 @@ impl Session { // Announce all available tracks and get a stream of updates. let (available, mut updates) = self.broker.available(); for namespace in available { - self.on_available(broker::Update::Insert(namespace)).await?; + self.on_available(BrokerUpdate::Insert(namespace)).await?; } loop { @@ -61,11 +62,11 @@ impl Session { } } - async fn receive_message(&mut self, msg: control::Distribute) -> anyhow::Result<()> { + async fn receive_message(&mut self, msg: Distribute) -> anyhow::Result<()> { match msg { - control::Distribute::AnnounceOk(msg) => self.receive_announce_ok(msg), - control::Distribute::AnnounceError(msg) => self.receive_announce_error(msg), - control::Distribute::Subscribe(msg) => self.receive_subscribe(msg).await, + Distribute::AnnounceOk(msg) => self.receive_announce_ok(msg), + Distribute::AnnounceError(msg) => self.receive_announce_error(msg), + Distribute::Subscribe(msg) => self.receive_subscribe(msg).await, } } @@ -119,7 +120,11 @@ impl Session { Ok(()) } - async fn run_subscribe(objects: SendObjects, track_id: VarInt, mut track: track::Subscriber) -> SubscribeError { + async fn run_subscribe( + objects: object::Sender, + track_id: VarInt, + mut track: track::Subscriber, + ) -> SubscribeError { let mut tasks = JoinSet::new(); let mut result = None; @@ -154,11 +159,11 @@ impl Session { } async fn serve_group( - mut objects: SendObjects, + mut objects: object::Sender, track_id: VarInt, mut segment: segment::Subscriber, ) -> anyhow::Result<()> { - let object = Object { + let object = object::Header { track: track_id, group: segment.sequence, sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object @@ -168,8 +173,8 @@ impl Session { let mut stream = objects.open(object).await?; // Write each fragment as they are available. - while let Some(mut fragment) = segment.fragments.next().await { - stream.write_all_buf(&mut fragment).await?; + while let Some(fragment) = segment.fragments.next().await { + stream.write_all(&fragment).await?; } // NOTE: stream is automatically closed when dropped @@ -177,16 +182,16 @@ impl Session { Ok(()) } - async fn on_available(&mut self, delta: broker::Update) -> anyhow::Result<()> { + async fn on_available(&mut self, delta: BrokerUpdate) -> anyhow::Result<()> { match delta { - broker::Update::Insert(name) => { + BrokerUpdate::Insert(name) => { self.control .send(Announce { track_namespace: name.clone(), }) .await } - broker::Update::Remove(name, error) => { + BrokerUpdate::Remove(name, error) => { self.control .send(AnnounceError { track_namespace: name, diff --git a/moq-warp/src/relay/control.rs b/moq-warp/src/relay/message.rs similarity index 84% rename from moq-warp/src/relay/control.rs rename to moq-warp/src/relay/message.rs index c53a359..04c2f9c 100644 --- a/moq-warp/src/relay/control.rs +++ b/moq-warp/src/relay/message.rs @@ -1,11 +1,13 @@ use tokio::sync::mpsc; -use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk}; -use moq_transport_quinn::{RecvControl, SendControl}; +use moq_transport::message::{ + self, Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk, +}; +use webtransport_generic::Session; -pub struct Main { - send_control: SendControl, - recv_control: RecvControl, +pub struct Main { + send_control: message::Sender, + recv_control: message::Receiver, outgoing: mpsc::Receiver, @@ -13,7 +15,7 @@ pub struct Main { distribute: mpsc::Sender, } -impl Main { +impl Main { pub async fn run(mut self) -> anyhow::Result<()> { loop { tokio::select! { @@ -53,10 +55,10 @@ impl Component { } // Splits a control stream into two components, based on if it's a message for contribution or distribution. -pub fn split( - send_control: SendControl, - recv_control: RecvControl, -) -> (Main, Component, Component) { +pub fn split( + send_control: message::Sender, + recv_control: message::Receiver, +) -> (Main, Component, Component) { let (outgoing_tx, outgoing_rx) = mpsc::channel(1); let (contribute_tx, contribute_rx) = mpsc::channel(1); let (distribute_tx, distribute_rx) = mpsc::channel(1); diff --git a/moq-warp/src/relay/mod.rs b/moq-warp/src/relay/mod.rs index 109e3a5..6dcc217 100644 --- a/moq-warp/src/relay/mod.rs +++ b/moq-warp/src/relay/mod.rs @@ -1,8 +1,8 @@ -pub mod broker; - +mod broker; mod contribute; -mod control; mod distribute; +mod message; mod session; +pub use broker::*; pub use session::*; diff --git a/moq-warp/src/relay/session.rs b/moq-warp/src/relay/session.rs index 392ef96..e9ae7f0 100644 --- a/moq-warp/src/relay/session.rs +++ b/moq-warp/src/relay/session.rs @@ -1,17 +1,19 @@ -use super::{broker, contribute, control, distribute}; +use crate::relay::{contribute, distribute, message, Broker}; -pub struct Session { +use webtransport_generic::Session as WTSession; + +pub struct Session { // Split logic into contribution/distribution to reduce the problem space. - contribute: contribute::Session, - distribute: distribute::Session, + contribute: contribute::Session, + distribute: distribute::Session, // Used to receive control messages and forward to contribute/distribute. - control: control::Main, + control: message::Main, } -impl Session { - pub fn new(session: moq_transport_quinn::Session, broker: broker::Broadcasts) -> Session { - let (control, contribute, distribute) = control::split(session.send_control, session.recv_control); +impl Session { + pub fn new(session: moq_transport::Session, broker: Broker) -> Self { + let (control, contribute, distribute) = message::split(session.send_control, session.recv_control); let contribute = contribute::Session::new(session.recv_objects, contribute, broker.clone()); let distribute = distribute::Session::new(session.send_objects, distribute, broker);