diff --git a/Cargo.lock b/Cargo.lock index 0a746fa..7c85782 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4fa78e18c64fce05e902adecd7a5eed15a5e0a3439f7b0e169f0252214865e3" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aho-corasick" version = "1.0.2" @@ -66,6 +81,113 @@ version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix", + "slab", + "socket2 0.4.9", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" + +[[package]] +name = "atomic-waker" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" + [[package]] name = "atty" version = "0.2.14" @@ -83,6 +205,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backtrace" +version = "0.3.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4319208da049c43661739c5fade2ba182f09d1dc2299b32298d3a31692b17e12" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.13.1" @@ -110,6 +247,21 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", + "log", +] + [[package]] name = "bumpalo" version = "3.13.0" @@ -188,6 +340,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "concurrent-queue" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -213,6 +374,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -276,6 +446,12 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fastrand" version = "1.9.0" @@ -348,6 +524,21 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.28" @@ -410,6 +601,24 @@ dependencies = [ "wasi", ] +[[package]] +name = "gimli" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" + +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.3.19" @@ -429,48 +638,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "h3" -version = "0.0.2" -source = "git+https://github.com/hyperium/h3?branch=master#3ef7c1a37b635e8446322d8f8d3a68580a208ad8" -dependencies = [ - "bytes", - "fastrand", - "futures-util", - "http", - "pin-project-lite", - "tokio", - "tracing", -] - -[[package]] -name = "h3-quinn" -version = "0.0.3" -source = "git+https://github.com/hyperium/h3?branch=master#3ef7c1a37b635e8446322d8f8d3a68580a208ad8" -dependencies = [ - "bytes", - "futures", - "h3", - "quinn", - "quinn-proto", - "tokio", - "tokio-util", -] - -[[package]] -name = "h3-webtransport" -version = "0.1.0" -source = "git+https://github.com/hyperium/h3?branch=master#3ef7c1a37b635e8446322d8f8d3a68580a208ad8" -dependencies = [ - "bytes", - "futures-util", - "h3", - "http", - "pin-project-lite", - "tokio", - "tracing", -] - [[package]] name = "hashbrown" version = "0.12.3" @@ -669,6 +836,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "libc" version = "0.2.146" @@ -696,6 +872,9 @@ name = "log" version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +dependencies = [ + "value-bag", +] [[package]] name = "memchr" @@ -719,6 +898,15 @@ dependencies = [ "unicase", ] +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.8.8" @@ -740,6 +928,7 @@ dependencies = [ "hex", "log", "moq-transport", + "moq-transport-quinn", "moq-warp", "quinn", "ring", @@ -747,6 +936,7 @@ dependencies = [ "rustls-pemfile", "tokio", "warp", + "webtransport-quinn", ] [[package]] @@ -764,15 +954,13 @@ version = "0.1.0" dependencies = [ "anyhow", "bytes", - "h3", - "h3-quinn", - "h3-webtransport", "http", "log", "moq-transport", "quinn", "thiserror", "tokio", + "webtransport-quinn", ] [[package]] @@ -876,6 +1064,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bda667d9f2b5051b8833f59f3bf748b28ef54f850f4fcb389a252aa383866d1" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.18.0" @@ -888,6 +1085,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "parking" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" + [[package]] name = "parking_lot" version = "0.12.1" @@ -949,6 +1152,22 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -971,7 +1190,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21252f1c0fc131f1b69182db8f34837e8a69737b8251dff75636a9be0518c324" dependencies = [ "bytes", - "futures-io", "pin-project-lite", "quinn-proto", "quinn-udp", @@ -1093,6 +1311,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + [[package]] name = "rustc-hash" version = "1.1.0" @@ -1401,11 +1625,12 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.28.2" +version = "1.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" +checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" dependencies = [ "autocfg", + "backtrace", "bytes", "libc", "mio", @@ -1606,12 +1831,24 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "value-bag" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3" + [[package]] name = "version_check" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "want" version = "0.3.1" @@ -1684,6 +1921,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.87" @@ -1733,6 +1982,46 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webtransport-generic" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ba4583e96bb0ef08142f868bf0d28f90211eced56a473768ee27446864a2310" +dependencies = [ + "bytes", + "log", + "thiserror", +] + +[[package]] +name = "webtransport-proto" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21fefb5728651d507b444659853b47896116179ea8fd0348d02de080250892c7" +dependencies = [ + "bytes", + "http", + "thiserror", +] + +[[package]] +name = "webtransport-quinn" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c645a48f4bac5ce504cef2dd02b373f8b8a2a7de9a72f59395a54799958f3cf2" +dependencies = [ + "async-std", + "bytes", + "futures", + "http", + "quinn", + "quinn-proto", + "thiserror", + "tokio", + "webtransport-generic", + "webtransport-proto", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/moq-demo/Cargo.toml b/moq-demo/Cargo.toml index de84b7e..e0228e0 100644 --- a/moq-demo/Cargo.toml +++ b/moq-demo/Cargo.toml @@ -1,25 +1,27 @@ [package] name = "moq-demo" description = "Media over QUIC" -authors = [ "Luke Curley" ] +authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" version = "0.1.0" edition = "2021" -keywords = [ "quic", "http3", "webtransport", "media", "live" ] -categories = [ "multimedia", "network-programming", "web-programming" ] +keywords = ["quic", "http3", "webtransport", "media", "live"] +categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] moq-transport = { path = "../moq-transport" } +moq-transport-quinn = { path = "../moq-transport-quinn" } moq-warp = { path = "../moq-warp" } # QUIC quinn = "0.10" +webtransport-quinn = "0.4" # Crypto ring = "0.16.20" @@ -34,7 +36,7 @@ warp = { version = "0.3.3", features = ["tls"] } hex = "0.4.3" # Logging -clap = { version = "4.0", features = [ "derive" ] } +clap = { version = "4.0", features = ["derive"] } log = { version = "0.4", features = ["std"] } env_logger = "0.9.3" anyhow = "1.0.70" diff --git a/moq-demo/src/main.rs b/moq-demo/src/main.rs index 2c1fdcc..59dc147 100644 --- a/moq-demo/src/main.rs +++ b/moq-demo/src/main.rs @@ -7,6 +7,9 @@ use warp::Filter; use moq_warp::{relay, source}; +mod server; +use server::*; + /// Search for a pattern in a file and display the lines that contain it. #[derive(Parser, Clone)] struct Cli { @@ -45,14 +48,14 @@ async fn main() -> anyhow::Result<()> { .context("failed to announce file source")?; // Create a server to actually serve the media - let config = relay::ServerConfig { + let config = ServerConfig { addr: args.addr, cert: args.cert, key: args.key, broker, }; - let server = relay::Server::new(config).context("failed to create server")?; + let server = Server::new(config).context("failed to create server")?; // Run all of the above tokio::select! { diff --git a/moq-warp/src/relay/server.rs b/moq-demo/src/server.rs similarity index 61% rename from moq-warp/src/relay/server.rs rename to moq-demo/src/server.rs index dfbbcfd..5c9692b 100644 --- a/moq-warp/src/relay/server.rs +++ b/moq-demo/src/server.rs @@ -1,4 +1,4 @@ -use super::{broker, Session}; +use moq_warp::relay::broker; use std::{fs, io, net, path, sync, time}; @@ -7,21 +7,19 @@ use anyhow::Context; use tokio::task::JoinSet; pub struct Server { - // The MoQ transport server. - server: moq_transport_quinn::Server, + server: quinn::Endpoint, // The media sources. broker: broker::Broadcasts, - // Sessions actively being run. - tasks: JoinSet>, + // The active connections. + conns: JoinSet>, } pub struct ServerConfig { pub addr: net::SocketAddr, pub cert: path::PathBuf, pub key: path::PathBuf, - pub broker: broker::Broadcasts, } @@ -53,14 +51,7 @@ impl Server { .with_single_cert(certs, key)?; tls_config.max_early_data_size = u32::MAX; - let alpn: Vec> = vec![ - b"h3".to_vec(), - b"h3-32".to_vec(), - b"h3-31".to_vec(), - b"h3-30".to_vec(), - b"h3-29".to_vec(), - ]; - tls_config.alpn_protocols = alpn; + tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; let mut server_config = quinn::ServerConfig::with_crypto(sync::Arc::new(tls_config)); @@ -74,32 +65,54 @@ impl Server { let server = quinn::Endpoint::server(server_config, config.addr)?; let broker = config.broker; - let server = moq_transport_quinn::Server::new(server); - let tasks = JoinSet::new(); + let conns = JoinSet::new(); - Ok(Self { server, broker, tasks }) + Ok(Self { server, broker, conns }) } pub async fn run(mut self) -> anyhow::Result<()> { loop { tokio::select! { res = self.server.accept() => { - let session = res.context("failed to accept connection")?; + let conn = res.context("failed to accept QUIC connection")?; let broker = self.broker.clone(); - self.tasks.spawn(async move { - let session: Session = Session::accept(session, broker).await?; - session.run().await - }); + self.conns.spawn(async move { Self::handle(conn, broker).await }); }, - res = self.tasks.join_next(), if !self.tasks.is_empty() => { + res = self.conns.join_next(), if !self.conns.is_empty() => { let res = res.expect("no tasks").expect("task aborted"); - if let Err(err) = res { - log::error!("session terminated: {:?}", err); + log::error!("connection terminated: {:?}", err); } }, } } } + + async fn handle(conn: quinn::Connecting, broker: broker::Broadcasts) -> anyhow::Result<()> { + // Wait for the QUIC connection to be established. + let conn = conn.await.context("failed to establish QUIC connection")?; + + // Wait for the CONNECT request. + let request = webtransport_quinn::accept(conn) + .await + .context("failed to receive WebTransport request")?; + + // TODO parse the request URI + + // Accept the CONNECT request. + let session = request + .ok() + .await + .context("failed to respond to WebTransport request")?; + + // Perform the MoQ handshake. + let session = moq_transport_quinn::accept(session, moq_transport::Role::Both) + .await + .context("failed to perform MoQ handshake")?; + + // Run the relay code. + let session = moq_warp::relay::Session::new(session, broker); + session.run().await + } } diff --git a/moq-transport-quinn/Cargo.toml b/moq-transport-quinn/Cargo.toml index b525beb..f640db7 100644 --- a/moq-transport-quinn/Cargo.toml +++ b/moq-transport-quinn/Cargo.toml @@ -1,31 +1,26 @@ [package] name = "moq-transport-quinn" description = "Media over QUIC" -authors = [ "Luke Curley" ] +authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" version = "0.1.0" edition = "2021" -keywords = [ "quic", "http3", "webtransport", "media", "live" ] -categories = [ "multimedia", "network-programming", "web-programming" ] +keywords = ["quic", "http3", "webtransport", "media", "live"] +categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] moq-transport = { path = "../moq-transport" } -# WebTransport support: TODO pin a version when released -h3 = { git = "https://github.com/hyperium/h3", branch = "master" } -h3-quinn = { git = "https://github.com/hyperium/h3", branch = "master" } -h3-webtransport = { git = "https://github.com/hyperium/h3", branch = "master" } quinn = "0.10" http = "0.2" - +webtransport-quinn = "0.4.1" tokio = { version = "1.27", features = ["macros"] } bytes = "1" - log = "0.4" anyhow = "1.0.70" thiserror = "1.0.21" diff --git a/moq-transport-quinn/src/control.rs b/moq-transport-quinn/src/control.rs index d4fd823..5dd3f84 100644 --- a/moq-transport-quinn/src/control.rs +++ b/moq-transport-quinn/src/control.rs @@ -1,51 +1,24 @@ +use anyhow::Context; use moq_transport::{Decode, DecodeError, Encode, Message}; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Buf, BufMut, BytesMut}; -use h3::quic::BidiStream; use std::io::Cursor; use std::sync::Arc; use tokio::sync::Mutex; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use webtransport_quinn::{RecvStream, SendStream}; -pub struct Control { - sender: ControlSend, - recver: ControlRecv, -} - -impl Control { - pub(crate) fn new(stream: h3_webtransport::stream::BidiStream, Bytes>) -> Self { - let (sender, recver) = stream.split(); - let sender = ControlSend::new(sender); - let recver = ControlRecv::new(recver); - - Self { sender, recver } - } - - pub fn split(self) -> (ControlSend, ControlRecv) { - (self.sender, self.recver) - } - - pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { - self.sender.send(msg).await - } - - pub async fn recv(&mut self) -> anyhow::Result { - self.recver.recv().await - } -} - -pub struct ControlSend { - stream: h3_webtransport::stream::SendStream, Bytes>, +pub struct SendControl { + stream: SendStream, buf: BytesMut, // reuse a buffer to encode messages. } -impl ControlSend { - pub fn new(inner: h3_webtransport::stream::SendStream, Bytes>) -> Self { +impl SendControl { + pub fn new(stream: SendStream) -> Self { Self { buf: BytesMut::new(), - stream: inner, + stream, } } @@ -74,7 +47,7 @@ impl ControlSend { // There's no equivalent for receiving since only one thread should be receiving at a time. #[derive(Clone)] pub struct ControlShared { - stream: Arc>, + stream: Arc>, } impl ControlShared { @@ -84,16 +57,16 @@ impl ControlShared { } } -pub struct ControlRecv { - stream: h3_webtransport::stream::RecvStream, +pub struct RecvControl { + stream: RecvStream, buf: BytesMut, // data we've read but haven't fully decoded yet } -impl ControlRecv { - pub fn new(inner: h3_webtransport::stream::RecvStream) -> Self { +impl RecvControl { + pub fn new(stream: RecvStream) -> Self { Self { buf: BytesMut::new(), - stream: inner, + stream, } } @@ -113,7 +86,8 @@ impl ControlRecv { } Err(DecodeError::UnexpectedEnd) => { // The decode failed, so we need to append more data. - self.stream.read_buf(&mut self.buf).await?; + let chunk = self.stream.read_chunk(1024, true).await?.context("stream closed")?; + self.buf.put(chunk.bytes); } Err(e) => return Err(e.into()), } diff --git a/moq-transport-quinn/src/lib.rs b/moq-transport-quinn/src/lib.rs index c07ce03..d210460 100644 --- a/moq-transport-quinn/src/lib.rs +++ b/moq-transport-quinn/src/lib.rs @@ -1,7 +1,9 @@ mod control; mod object; -mod server; +mod session; +mod stream; pub use control::*; pub use object::*; -pub use server::*; +pub use session::*; +pub use stream::*; diff --git a/moq-transport-quinn/src/object.rs b/moq-transport-quinn/src/object.rs index 6f64a50..bac4716 100644 --- a/moq-transport-quinn/src/object.rs +++ b/moq-transport-quinn/src/object.rs @@ -1,136 +1,147 @@ +use std::{collections::BinaryHeap, io::Cursor, sync::Arc}; + use anyhow::Context; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::BytesMut; use moq_transport::{Decode, DecodeError, Encode, Object}; -use std::{io::Cursor, sync::Arc}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncWriteExt; +use tokio::task::JoinSet; +use tokio::{io::AsyncBufReadExt, sync::Mutex}; +use webtransport_quinn::Session; -// TODO support clients -type WebTransportSession = h3_webtransport::server::WebTransportSession; - -// Reduce some typing -pub type SendStream = h3_webtransport::stream::SendStream, Bytes>; -pub type RecvStream = h3_webtransport::stream::RecvStream; - -pub struct Objects { - send: SendObjects, - recv: RecvObjects, -} - -impl Objects { - pub fn new(session: Arc) -> Self { - let send = SendObjects::new(session.clone()); - let recv = RecvObjects::new(session); - Self { send, recv } - } - - pub fn split(self) -> (SendObjects, RecvObjects) { - (self.send, self.recv) - } - - pub async fn recv(&mut self) -> anyhow::Result<(Object, RecvStream)> { - self.recv.recv().await - } - - pub async fn send(&mut self, header: Object) -> anyhow::Result { - self.send.send(header).await - } -} +use crate::{RecvStream, SendStream, SendStreamOrder}; +// Allow this to be cloned so we can have multiple senders. +#[derive(Clone)] pub struct SendObjects { - session: Arc, - - // A reusable buffer for encoding messages. - buf: BytesMut, + // This is a tokio mutex since we need to lock across await boundaries. + inner: Arc>, } impl SendObjects { - pub fn new(session: Arc) -> Self { + pub fn new(session: Session) -> Self { + let inner = SendObjectsInner::new(session); + Self { + inner: Arc::new(Mutex::new(inner)), + } + } + + pub async fn open(&mut self, header: Object) -> anyhow::Result { + let mut inner = self.inner.lock().await; + inner.open(header).await + } +} + +struct SendObjectsInner { + session: Session, + + // Quinn supports a i32 for priority, but the wire format is a u64. + // Our work around is to keep a list of streams in priority order and use the index as the priority. + // This involves more work, so TODO either increase the Quinn size or reduce the wire size. + ordered: BinaryHeap, + ordered_swap: BinaryHeap, // reuse memory to avoid allocations + + // A reusable buffer for encoding headers. + // TODO figure out how to use BufMut on the stack and remove this. + buf: BytesMut, +} + +impl SendObjectsInner { + fn new(session: Session) -> Self { Self { session, + ordered: BinaryHeap::new(), + ordered_swap: BinaryHeap::new(), buf: BytesMut::new(), } } - pub async fn send(&mut self, header: Object) -> anyhow::Result { + pub async fn open(&mut self, header: Object) -> anyhow::Result { + let stream = self.session.open_uni().await.context("failed to open uni stream")?; + let (mut stream, priority) = SendStream::with_order(stream, header.send_order.into_inner()); + + // Add the priority to our existing list. + self.ordered.push(priority); + + // Loop through the list and update the priorities of any still active streams. + let mut index = 0; + while let Some(stream) = self.ordered.pop() { + if stream.update(index).is_ok() { + // Add the stream to the new list so it'll be in sorted order. + self.ordered_swap.push(stream); + index += 1; + } + } + + // Swap the lists so we can reuse the memory. + std::mem::swap(&mut self.ordered, &mut self.ordered_swap); + + // Encode and write the stream header. + // TODO do this in SendStream so we don't hold the lock. + // Otherwise, self.buf.clear(); header.encode(&mut self.buf).unwrap(); - - let mut stream = self - .session - .open_uni(self.session.session_id()) - .await - .context("failed to open uni stream")?; - - // TODO support select! without making a new stream. - stream.write_all(&self.buf).await?; + stream.write_all(&self.buf).await.context("failed to write header")?; Ok(stream) } } -impl Clone for SendObjects { - fn clone(&self) -> Self { - Self { - session: self.session.clone(), - buf: BytesMut::new(), - } - } -} - // Not clone, so we don't accidentally have two listners. pub struct RecvObjects { - session: Arc, + session: Session, - // A uni stream that's been accepted but not fully read from yet. - stream: Option, - - // Data that we've read but haven't formed a full message yet. - buf: BytesMut, + // Streams that we've accepted but haven't read the header from yet. + streams: JoinSet>, } impl RecvObjects { - pub fn new(session: Arc) -> Self { + pub fn new(session: Session) -> Self { Self { session, - stream: None, - buf: BytesMut::new(), + streams: JoinSet::new(), } } pub async fn recv(&mut self) -> anyhow::Result<(Object, RecvStream)> { - // Make sure any state is saved across await boundaries so this works with select! - - let stream = match self.stream.as_mut() { - Some(stream) => stream, - None => { - let (_session_id, stream) = self - .session - .accept_uni() - .await - .context("failed to accept uni stream")? - .context("no uni stream")?; - - self.stream.insert(stream) - } - }; - loop { - // Read the contents of the buffer - let mut peek = Cursor::new(&self.buf); - - match Object::decode(&mut peek) { - Ok(header) => { - let stream = self.stream.take().unwrap(); - self.buf.advance(peek.position() as usize); - return Ok((header, stream)); + tokio::select! { + res = self.session.accept_uni() => { + let stream = res.context("failed to accept stream")?; + self.streams.spawn(async move { Self::read(stream).await }); + }, + res = self.streams.join_next(), if !self.streams.is_empty() => { + return res.unwrap().context("failed to run join set")?; } - Err(DecodeError::UnexpectedEnd) => { - // The decode failed, so we need to append more data. - stream.read_buf(&mut self.buf).await?; - } - Err(e) => return Err(e.into()), } } } + + async fn read(stream: webtransport_quinn::RecvStream) -> anyhow::Result<(Object, RecvStream)> { + let mut stream = RecvStream::new(stream); + + loop { + // Read more data into the buffer. + let data = stream.fill_buf().await?; + if data.is_empty() { + anyhow::bail!("stream closed before reading header"); + } + + // Use a cursor to read the buffer and remember how much we read. + let mut read = Cursor::new(data); + + let header = match Object::decode(&mut read) { + Ok(header) => header, + Err(DecodeError::UnexpectedEnd) => continue, + Err(err) => return Err(err.into()), + }; + + // We parsed a full header, advance the cursor. + // The borrow checker requires these on separate lines. + let size = read.position() as usize; + stream.consume(size); + + return Ok((header, stream)); + } + } } diff --git a/moq-transport-quinn/src/server.rs b/moq-transport-quinn/src/server.rs deleted file mode 100644 index 366036d..0000000 --- a/moq-transport-quinn/src/server.rs +++ /dev/null @@ -1,179 +0,0 @@ -use std::sync::Arc; - -use anyhow::Context; -use bytes::Bytes; -use tokio::task::JoinSet; - -use moq_transport::{Message, SetupClient, SetupServer}; - -use super::{Control, Objects}; - -pub struct Server { - // The QUIC server, yielding new connections and sessions. - endpoint: quinn::Endpoint, - - // A list of connections that are completing the WebTransport handshake. - handshake: JoinSet>, -} - -impl Server { - pub fn new(endpoint: quinn::Endpoint) -> Self { - let handshake = JoinSet::new(); - Self { endpoint, handshake } - } - - // Accept the next WebTransport session. - pub async fn accept(&mut self) -> anyhow::Result { - loop { - tokio::select!( - // Accept the connection and start the WebTransport handshake. - conn = self.endpoint.accept() => { - let conn = conn.context("failed to accept connection")?; - self.handshake.spawn(async move { - Connecting::new(conn).accept().await - }); - }, - // Return any mostly finished WebTransport handshakes. - res = self.handshake.join_next(), if !self.handshake.is_empty() => { - let res = res.expect("no tasks").expect("task aborted"); - match res { - Ok(session) => return Ok(session), - Err(err) => log::warn!("failed to accept session: {:?}", err), - } - }, - ) - } - } -} - -struct Connecting { - conn: quinn::Connecting, -} - -impl Connecting { - pub fn new(conn: quinn::Connecting) -> Self { - Self { conn } - } - - pub async fn accept(self) -> anyhow::Result { - let conn = self.conn.await.context("failed to accept h3 connection")?; - - let mut conn = h3::server::builder() - .enable_webtransport(true) - .enable_connect(true) - .enable_datagram(true) - .max_webtransport_sessions(1) - .send_grease(true) - .build(h3_quinn::Connection::new(conn)) - .await - .context("failed to create h3 server")?; - - let (req, stream) = conn - .accept() - .await - .context("failed to accept h3 session")? - .context("failed to accept h3 request")?; - - let ext = req.extensions(); - anyhow::ensure!(req.method() == http::Method::CONNECT, "expected CONNECT request"); - anyhow::ensure!( - ext.get::() == Some(&h3::ext::Protocol::WEB_TRANSPORT), - "expected WebTransport CONNECT" - ); - - // Let the application decide if we accept this CONNECT request. - Ok(Connect { conn, req, stream }) - } -} - -// The WebTransport CONNECT has arrived, and we need to decide if we accept it. -pub struct Connect { - // Inspect to decide whether to accept() or reject() the session. - req: http::Request<()>, - - conn: h3::server::Connection, - stream: h3::server::RequestStream, Bytes>, -} - -impl Connect { - // Expose the received URI - pub fn uri(&self) -> &http::Uri { - self.req.uri() - } - - // Accept the WebTransport session. - pub async fn accept(self) -> anyhow::Result { - let session = h3_webtransport::server::WebTransportSession::accept(self.req, self.stream, self.conn).await?; - let session = Arc::new(session); - - let stream = session - .accept_bi() - .await - .context("failed to accept bidi stream")? - .unwrap(); - - let objects = Objects::new(session.clone()); - - let stream = match stream { - h3_webtransport::server::AcceptedBi::BidiStream(_session_id, stream) => stream, - h3_webtransport::server::AcceptedBi::Request(..) => anyhow::bail!("additional http requests not supported"), - }; - - let mut control = Control::new(stream); - let setup = match control.recv().await.context("failed to read SETUP")? { - Message::SetupClient(setup) => setup, - _ => anyhow::bail!("expected CLIENT SETUP"), - }; - - // Let the application decide if we accept this MoQ session. - Ok(Setup { - setup, - control, - objects, - }) - } - - // Reject the WebTransport session with a HTTP response. - pub async fn reject(mut self, resp: http::Response<()>) -> anyhow::Result<()> { - self.stream.send_response(resp).await?; - Ok(()) - } -} - -pub struct Setup { - setup: SetupClient, - control: Control, - objects: Objects, -} - -impl Setup { - // Return the setup message we received. - pub fn setup(&self) -> &SetupClient { - &self.setup - } - - // Accept the session with our own setup message. - pub async fn accept(mut self, setup: SetupServer) -> anyhow::Result { - self.control.send(setup).await?; - Ok(Session { - control: self.control, - objects: self.objects, - }) - } - - pub async fn reject(self) -> anyhow::Result<()> { - // TODO Close the QUIC connection with an error code. - Ok(()) - } -} - -pub struct Session { - pub control: Control, - pub objects: Objects, -} - -impl Session { - pub fn split(self) -> (Control, Objects) { - (self.control, self.objects) - } -} diff --git a/moq-transport-quinn/src/session.rs b/moq-transport-quinn/src/session.rs new file mode 100644 index 0000000..4ae0130 --- /dev/null +++ b/moq-transport-quinn/src/session.rs @@ -0,0 +1,98 @@ +use anyhow::Context; + +use moq_transport::{Message, SetupClient, SetupServer}; + +use super::{RecvControl, RecvObjects, SendControl, SendObjects}; + +/// Called by a server with an established WebTransport session. +// TODO close the session with an error code +pub async fn accept(session: webtransport_quinn::Session, role: moq_transport::Role) -> anyhow::Result { + let (send, recv) = session.accept_bi().await.context("failed to accept bidi stream")?; + + let mut send_control = SendControl::new(send); + let mut recv_control = RecvControl::new(recv); + + let setup_client = match recv_control.recv().await.context("failed to read SETUP")? { + Message::SetupClient(setup) => setup, + _ => anyhow::bail!("expected CLIENT SETUP"), + }; + + setup_client + .versions + .iter() + .find(|version| **version == moq_transport::Version::DRAFT_00) + .context("no supported versions")?; + + if !setup_client.role.compatible(role) { + anyhow::bail!("incompatible roles: {:?} {:?}", setup_client.role, role); + } + + let setup_server = SetupServer { + role, + version: moq_transport::Version::DRAFT_00, + }; + + send_control + .send(moq_transport::Message::SetupServer(setup_server)) + .await + .context("failed to send setup server")?; + + let send_objects = SendObjects::new(session.clone()); + let recv_objects = RecvObjects::new(session.clone()); + + Ok(Session { + send_control, + recv_control, + send_objects, + recv_objects, + }) +} + +/// Called by a client with an established WebTransport session. +pub async fn connect(session: webtransport_quinn::Session, role: moq_transport::Role) -> anyhow::Result { + let (send, recv) = session.open_bi().await.context("failed to oen bidi stream")?; + + let mut send_control = SendControl::new(send); + let mut recv_control = RecvControl::new(recv); + + let setup_client = SetupClient { + role, + versions: vec![moq_transport::Version::DRAFT_00].into(), + path: "".to_string(), + }; + + send_control + .send(moq_transport::Message::SetupClient(setup_client)) + .await + .context("failed to send SETUP CLIENT")?; + + let setup_server = match recv_control.recv().await.context("failed to read SETUP")? { + Message::SetupServer(setup) => setup, + _ => anyhow::bail!("expected SERVER SETUP"), + }; + + if setup_server.version != moq_transport::Version::DRAFT_00 { + anyhow::bail!("unsupported version: {:?}", setup_server.version); + } + + if !setup_server.role.compatible(role) { + anyhow::bail!("incompatible roles: {:?} {:?}", role, setup_server.role); + } + + let send_objects = SendObjects::new(session.clone()); + let recv_objects = RecvObjects::new(session.clone()); + + Ok(Session { + send_control, + recv_control, + send_objects, + recv_objects, + }) +} + +pub struct Session { + pub send_control: SendControl, + pub recv_control: RecvControl, + pub send_objects: SendObjects, + pub recv_objects: RecvObjects, +} diff --git a/moq-transport-quinn/src/stream.rs b/moq-transport-quinn/src/stream.rs new file mode 100644 index 0000000..e5561ce --- /dev/null +++ b/moq-transport-quinn/src/stream.rs @@ -0,0 +1,115 @@ +use std::{ + io, + ops::{Deref, DerefMut}, + pin::Pin, + sync::{Arc, Mutex, Weak}, + task, +}; + +use tokio::io::{AsyncWrite, BufReader}; + +// Ugh, so we need to wrap SendStream with a mutex because we need to be able to call set_priority on it. +// The problem is that set_priority takes a i32, while send_order is a VarInt +// So the solution is to maintain a priority queue of active streams and constantly update the priority with their index. +// So the library might update the priority of the stream at any point, while the application might similtaniously write to it. +// The only upside is that we don't expose set_priority, so the application can't screw with things. +pub struct SendStream { + stream: Arc>, +} + +impl SendStream { + // Create a new stream with the given order, returning a handle that allows us to update the priority. + pub(crate) fn with_order(stream: webtransport_quinn::SendStream, order: u64) -> (Self, SendStreamOrder) { + let stream = Arc::new(Mutex::new(stream)); + let weak = Arc::>::downgrade(&stream); + + (SendStream { stream }, SendStreamOrder { stream: weak, order }) + } +} + +pub(crate) struct SendStreamOrder { + // We use Weak here so we don't prevent the stream from being closed when dereferenced. + // update() will return an error if the stream was closed instead. + stream: Weak>, + order: u64, +} + +impl SendStreamOrder { + pub(crate) fn update(&self, index: i32) -> Result<(), webtransport_quinn::StreamClosed> { + let stream = self.stream.upgrade().ok_or(webtransport_quinn::StreamClosed)?; + let mut stream = stream.lock().unwrap(); + stream.set_priority(index) + } +} + +impl PartialEq for SendStreamOrder { + fn eq(&self, other: &Self) -> bool { + self.order == other.order + } +} + +impl Eq for SendStreamOrder {} + +impl PartialOrd for SendStreamOrder { + fn partial_cmp(&self, other: &Self) -> Option { + // We reverse the order so the lower send order is higher priority. + other.order.partial_cmp(&self.order) + } +} + +impl Ord for SendStreamOrder { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // We reverse the order so the lower send order is higher priority. + other.order.cmp(&self.order) + } +} + +// We implement AsyncWrite so we can grab the mutex on each write attempt, instead of holding it for the entire async function. +impl AsyncWrite for SendStream { + fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> task::Poll> { + let mut stream = self.stream.lock().unwrap(); + Pin::new(&mut *stream).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll> { + let mut stream = self.stream.lock().unwrap(); + Pin::new(&mut *stream).poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll> { + let mut stream = self.stream.lock().unwrap(); + Pin::new(&mut *stream).poll_shutdown(cx) + } +} + +// Unfortunately, we need to wrap RecvStream with a buffer since moq-transport::Coding only supports buffered reads. +// TODO support unbuffered reads so we only read the MoQ header and then hand off the stream. +// NOTE: We can't use AsyncRead::chain because we need to get the inner stream for stop. +pub struct RecvStream { + stream: BufReader, +} + +impl RecvStream { + pub(crate) fn new(stream: webtransport_quinn::RecvStream) -> Self { + let stream = BufReader::new(stream); + Self { stream } + } + + pub fn stop(self, code: u32) { + self.stream.into_inner().stop(code).ok(); + } +} + +impl Deref for RecvStream { + type Target = BufReader; + + fn deref(&self) -> &Self::Target { + &self.stream + } +} + +impl DerefMut for RecvStream { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.stream + } +} diff --git a/moq-transport/src/control/role.rs b/moq-transport/src/control/role.rs index 15a1073..ea7d413 100644 --- a/moq-transport/src/control/role.rs +++ b/moq-transport/src/control/role.rs @@ -23,6 +23,10 @@ impl Role { Self::Publisher => false, } } + + pub fn compatible(&self, other: Role) -> bool { + self.is_publisher() == other.is_subscriber() && self.is_subscriber() == other.is_publisher() + } } impl From for VarInt { diff --git a/moq-transport/src/control/version.rs b/moq-transport/src/control/version.rs index 0ba323b..6c18cd5 100644 --- a/moq-transport/src/control/version.rs +++ b/moq-transport/src/control/version.rs @@ -73,3 +73,9 @@ impl Deref for Versions { &self.0 } } + +impl From> for Versions { + fn from(vs: Vec) -> Self { + Self(vs) + } +} diff --git a/moq-warp/src/relay/contribute.rs b/moq-warp/src/relay/contribute.rs index 7f3660d..ef0cb14 100644 --- a/moq-warp/src/relay/contribute.rs +++ b/moq-warp/src/relay/contribute.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time; -use tokio::io::AsyncReadExt; +use tokio::io::AsyncBufReadExt; use tokio::sync::mpsc; use tokio::task::JoinSet; // lock across await boundaries @@ -91,10 +91,13 @@ impl Session { async fn receive_object(&mut self, object: Object, stream: RecvStream) -> anyhow::Result<()> { let track = object.track; + // Keep objects in memory for 10s + let expires = time::Instant::now() + time::Duration::from_secs(10); + let segment = segment::Info { sequence: object.sequence, send_order: object.send_order, - expires: Some(time::Instant::now() + time::Duration::from_secs(2)), // TODO increase this once send_order is implemented + expires: Some(expires), }; let segment = segment::Publisher::new(segment); @@ -112,14 +115,15 @@ impl Session { } async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> { - let mut buf = [0u8; 32 * 1024]; loop { - let size = stream.read(&mut buf).await.context("failed to read from stream")?; - if size == 0 { + let buf = stream.fill_buf().await?; + if buf.is_empty() { return Ok(()); } - let chunk = buf[..size].to_vec(); + let chunk = buf.to_vec(); + stream.consume(chunk.len()); + segment.fragments.push(chunk.into()) } } diff --git a/moq-warp/src/relay/control.rs b/moq-warp/src/relay/control.rs index 2151594..c53a359 100644 --- a/moq-warp/src/relay/control.rs +++ b/moq-warp/src/relay/control.rs @@ -1,10 +1,12 @@ use tokio::sync::mpsc; use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk}; -use moq_transport_quinn::Control; +use moq_transport_quinn::{RecvControl, SendControl}; pub struct Main { - control: Control, + send_control: SendControl, + recv_control: RecvControl, + outgoing: mpsc::Receiver, contribute: mpsc::Sender, @@ -15,8 +17,8 @@ impl Main { pub async fn run(mut self) -> anyhow::Result<()> { loop { tokio::select! { - Some(msg) = self.outgoing.recv() => self.control.send(msg).await?, - Ok(msg) = self.control.recv() => self.handle(msg).await?, + Some(msg) = self.outgoing.recv() => self.send_control.send(msg).await?, + Ok(msg) = self.recv_control.recv() => self.handle(msg).await?, } } } @@ -51,13 +53,17 @@ impl Component { } // Splits a control stream into two components, based on if it's a message for contribution or distribution. -pub fn split(control: Control) -> (Main, Component, Component) { +pub fn split( + send_control: SendControl, + recv_control: RecvControl, +) -> (Main, Component, Component) { let (outgoing_tx, outgoing_rx) = mpsc::channel(1); let (contribute_tx, contribute_rx) = mpsc::channel(1); let (distribute_tx, distribute_rx) = mpsc::channel(1); let control = Main { - control, + send_control, + recv_control, outgoing: outgoing_rx, contribute: contribute_tx, distribute: distribute_tx, diff --git a/moq-warp/src/relay/distribute.rs b/moq-warp/src/relay/distribute.rs index 0db689d..3065a0a 100644 --- a/moq-warp/src/relay/distribute.rs +++ b/moq-warp/src/relay/distribute.rs @@ -1,7 +1,6 @@ use anyhow::Context; -use tokio::io::AsyncWriteExt; -use tokio::task::JoinSet; // allows locking across await +use tokio::{io::AsyncWriteExt, task::JoinSet}; // allows locking across await use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; use moq_transport_quinn::SendObjects; @@ -165,7 +164,7 @@ impl Session { send_order: segment.send_order, }; - let mut stream = objects.send(object).await?; + let mut stream = objects.open(object).await?; // Write each fragment as they are available. while let Some(fragment) = segment.fragments.next().await { diff --git a/moq-warp/src/relay/mod.rs b/moq-warp/src/relay/mod.rs index 485adfa..109e3a5 100644 --- a/moq-warp/src/relay/mod.rs +++ b/moq-warp/src/relay/mod.rs @@ -3,8 +3,6 @@ pub mod broker; mod contribute; mod control; mod distribute; -mod server; mod session; -pub use server::*; pub use session::*; diff --git a/moq-warp/src/relay/session.rs b/moq-warp/src/relay/session.rs index 20de40f..392ef96 100644 --- a/moq-warp/src/relay/session.rs +++ b/moq-warp/src/relay/session.rs @@ -1,10 +1,5 @@ -use anyhow::Context; - use super::{broker, contribute, control, distribute}; -use moq_transport::{Role, SetupServer, Version}; -use moq_transport_quinn::Connect; - pub struct Session { // Split logic into contribution/distribution to reduce the problem space. contribute: contribute::Session, @@ -15,50 +10,17 @@ pub struct Session { } impl Session { - pub async fn accept(session: Connect, broker: broker::Broadcasts) -> anyhow::Result { - // Accep the WebTransport session. - // OPTIONAL validate the conn.uri() otherwise call conn.reject() - let session = session - .accept() - .await - .context(": server::Setupfailed to accept WebTransport session")?; + pub fn new(session: moq_transport_quinn::Session, broker: broker::Broadcasts) -> Session { + let (control, contribute, distribute) = control::split(session.send_control, session.recv_control); - session - .setup() - .versions - .iter() - .find(|v| **v == Version::DRAFT_00) - .context("failed to find supported version")?; + let contribute = contribute::Session::new(session.recv_objects, contribute, broker.clone()); + let distribute = distribute::Session::new(session.send_objects, distribute, broker); - // Choose our role based on the client's role. - let role = match session.setup().role { - Role::Publisher => Role::Subscriber, - Role::Subscriber => Role::Publisher, - Role::Both => Role::Both, - }; - - let setup = SetupServer { - version: Version::DRAFT_00, - role, - }; - - let session = session.accept(setup).await?; - - let (control, objects) = session.split(); - let (objects_send, objects_recv) = objects.split(); - - let (control, contribute, distribute) = control::split(control); - - let contribute = contribute::Session::new(objects_recv, contribute, broker.clone()); - let distribute = distribute::Session::new(objects_send, distribute, broker); - - let session = Self { + Self { control, contribute, distribute, - }; - - Ok(session) + } } pub async fn run(self) -> anyhow::Result<()> {