From f9c3f8b8985a75396592c7fd08fc0434759d331b Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 18 Jul 2023 10:42:38 -0700 Subject: [PATCH] WIP --- Cargo.lock | 303 +++++++++++++++++- Cargo.toml | 7 +- moq-demo-quinn/src/server/stream.rs | 60 ---- {moq-demo-quinn => moq-demo}/Cargo.toml | 21 +- {moq-demo-quinn => moq-demo}/src/main.rs | 14 +- .../src/server/mod.rs | 101 +++--- moq-transport/Cargo.toml | 10 +- moq-transport/src/network/control.rs | 76 ++--- moq-transport/src/network/mod.rs | 10 +- moq-transport/src/network/object.rs | 142 -------- moq-transport/src/network/objects.rs | 103 ++++++ moq-transport/src/network/server.rs | 49 +-- moq-transport/src/network/stream.rs | 60 ---- moq-warp/Cargo.toml | 10 +- 14 files changed, 529 insertions(+), 437 deletions(-) delete mode 100644 moq-demo-quinn/src/server/stream.rs rename {moq-demo-quinn => moq-demo}/Cargo.toml (56%) rename {moq-demo-quinn => moq-demo}/src/main.rs (96%) rename {moq-demo-quinn => moq-demo}/src/server/mod.rs (74%) delete mode 100644 moq-transport/src/network/object.rs create mode 100644 moq-transport/src/network/objects.rs delete mode 100644 moq-transport/src/network/stream.rs diff --git a/Cargo.lock b/Cargo.lock index 0a746fa..bf12a38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,6 +66,124 @@ version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix", + "slab", + "socket2 0.4.9", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" + +[[package]] +name = "async-trait" +version = "0.1.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2d0f03b3640e3a630367e40c468cb7f309529c708ed1d88597047b0e7c6ef7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "atomic-waker" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" + [[package]] name = "atty" version = "0.2.14" @@ -110,6 +228,21 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", + "log", +] + [[package]] name = "bumpalo" version = "3.13.0" @@ -188,6 +321,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "concurrent-queue" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -213,6 +355,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -276,6 +427,12 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fastrand" version = "1.9.0" @@ -348,6 +505,21 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.28" @@ -410,6 +582,18 @@ dependencies = [ "wasi", ] +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.3.19" @@ -669,6 +853,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "libc" version = "0.2.146" @@ -696,6 +889,9 @@ name = "log" version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +dependencies = [ + "value-bag", +] [[package]] name = "memchr" @@ -735,6 +931,7 @@ name = "moq-demo" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "clap", "env_logger", "hex", @@ -747,15 +944,21 @@ dependencies = [ "rustls-pemfile", "tokio", "warp", + "webtransport-generic 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "webtransport-quinn", ] [[package]] name = "moq-transport" version = "0.1.0" dependencies = [ + "anyhow", "bytes", + "futures", "log", "thiserror", + "tokio", + "webtransport-generic 0.2.0", ] [[package]] @@ -780,15 +983,15 @@ name = "moq-warp" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "log", "moq-transport", - "moq-transport-quinn", "mp4", - "quinn", "ring", "rustls 0.21.2", "rustls-pemfile", "tokio", + "webtransport-generic 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -888,6 +1091,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "parking" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" + [[package]] name = "parking_lot" version = "0.12.1" @@ -949,6 +1158,22 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1606,12 +1831,24 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "value-bag" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3" + [[package]] name = "version_check" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "want" version = "0.3.1" @@ -1684,6 +1921,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.87" @@ -1733,6 +1982,56 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webtransport-generic" +version = "0.2.0" +dependencies = [ + "async-trait", + "bytes", + "log", + "thiserror", +] + +[[package]] +name = "webtransport-generic" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db80267637ca8d24cd3425a3e993842ed9e128620f5fcd9603145fee8fe808fd" +dependencies = [ + "bytes", + "log", + "thiserror", +] + +[[package]] +name = "webtransport-proto" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24a891faa138075b2338bc3da6dc6a67c174118a088e2809db6882d410973cd6" +dependencies = [ + "bytes", + "http", + "thiserror", +] + +[[package]] +name = "webtransport-quinn" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea529a3044e99379b180581e8b0773fa7feb1fc53e341f36927a755ac05abd62" +dependencies = [ + "async-std", + "bytes", + "futures", + "http", + "quinn", + "quinn-proto", + "thiserror", + "tokio-util", + "webtransport-generic 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "webtransport-proto", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 37d1cdf..125e35a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,2 @@ [workspace] -members = [ - "moq-transport", - "moq-transport-quinn", - "moq-demo-quinn", - "moq-warp", -] +members = ["moq-transport", "moq-transport-quinn", "moq-demo", "moq-warp"] diff --git a/moq-demo-quinn/src/server/stream.rs b/moq-demo-quinn/src/server/stream.rs deleted file mode 100644 index 1eb4834..0000000 --- a/moq-demo-quinn/src/server/stream.rs +++ /dev/null @@ -1,60 +0,0 @@ -use h3::quic::SendStream; -use h3::quic::RecvStream; -use h3::quic::SendStreamUnframed; - -pub struct QuinnSendStream { - stream: h3_webtransport::stream::SendStream, bytes::Bytes> -} - -impl QuinnSendStream { - pub fn new(stream: h3_webtransport::stream::SendStream, bytes::Bytes>) -> QuinnSendStream { - QuinnSendStream { stream } - } -} - -impl webtransport_generic::SendStream for QuinnSendStream { - - type Error = anyhow::Error; - - fn poll_finish(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { - self.stream.poll_finish(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) - } - - fn reset(&mut self, reset_code: u32) { - self.stream.reset(reset_code as u64) - } - - fn poll_send( - &mut self, - cx: &mut std::task::Context<'_>, - buf: &mut D, - ) -> std::task::Poll> { - self.stream.poll_send(cx, buf).map_err(|e| anyhow::anyhow!("{:?}", e)) - } -} - -pub struct QuinnRecvStream { - stream: h3_webtransport::stream::RecvStream -} - -impl QuinnRecvStream { - pub fn new(stream: h3_webtransport::stream::RecvStream) -> QuinnRecvStream { - QuinnRecvStream { stream } - } -} - -impl webtransport_generic::RecvStream for QuinnRecvStream { - type Error = anyhow::Error; - type Buf = bytes::Bytes; - - fn poll_data( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll, Self::Error>> { - self.stream.poll_data(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) - } - - fn stop_sending(&mut self, error_code: u32) { - self.stream.stop_sending(error_code as u64) - } -} diff --git a/moq-demo-quinn/Cargo.toml b/moq-demo/Cargo.toml similarity index 56% rename from moq-demo-quinn/Cargo.toml rename to moq-demo/Cargo.toml index 592e8ab..1384ff8 100644 --- a/moq-demo-quinn/Cargo.toml +++ b/moq-demo/Cargo.toml @@ -1,16 +1,15 @@ [package] -name = "moq-demo-quinn" +name = "moq-demo" description = "Media over QUIC" -authors = [ "Luke Curley" ] +authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" version = "0.1.0" edition = "2021" -keywords = [ "quic", "http3", "webtransport", "media", "live" ] -categories = [ "multimedia", "network-programming", "web-programming" ] - +keywords = ["quic", "http3", "webtransport", "media", "live"] +categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -32,17 +31,15 @@ warp = { version = "0.3.3", features = ["tls"] } hex = "0.4.3" # Logging -clap = { version = "4.0", features = [ "derive" ] } +clap = { version = "4.0", features = ["derive"] } log = { version = "0.4", features = ["std"] } env_logger = "0.9.3" anyhow = "1.0.70" -bytes= "1" +bytes = "1" + +webtransport-generic = "0.2" +webtransport-quinn = "0.3" -webtransport-generic = {git = "https://github.com/kixelated/webtransport-rs"} moq-transport = { path = "../moq-transport" } moq-warp = { path = "../moq-warp" } - -h3 = { git = "https://github.com/hyperium/h3", branch = "master" } -h3-quinn = { git = "https://github.com/hyperium/h3", branch = "master" } -h3-webtransport = { git = "https://github.com/hyperium/h3", branch = "master" } diff --git a/moq-demo-quinn/src/main.rs b/moq-demo/src/main.rs similarity index 96% rename from moq-demo-quinn/src/main.rs rename to moq-demo/src/main.rs index 5fd3536..d7fec18 100644 --- a/moq-demo-quinn/src/main.rs +++ b/moq-demo/src/main.rs @@ -7,7 +7,10 @@ use ring::digest::{digest, SHA256}; use tokio::task::JoinSet; use warp::Filter; -use moq_warp::{relay::{self, broker::Broadcasts}, source}; +use moq_warp::{ + relay::{self, broker::Broadcasts}, + source, +}; mod server; @@ -29,10 +32,8 @@ struct Cli { /// Use the media file at this path #[arg(short, long, default_value = "media/fragmented.mp4")] media: path::PathBuf, - } - #[tokio::main] async fn main() -> anyhow::Result<()> { env_logger::init(); @@ -49,7 +50,7 @@ async fn main() -> anyhow::Result<()> { broker .announce("quic.video/demo", media.source()) .context("failed to announce file source")?; - + // Create a server to actually serve the media let config = relay::ServerConfig { addr: args.addr, @@ -63,12 +64,11 @@ async fn main() -> anyhow::Result<()> { res = media.run() => res.context("failed to run media source"), res = serve => res.context("failed to run HTTP server"), } - } async fn run_server(config: relay::ServerConfig, broker: Broadcasts) -> anyhow::Result<()> { + let quinn = server::Server::new(config).unwrap(); - let quinn = server::Server::new_quinn_connection(config).unwrap(); let mut tasks = JoinSet::new(); loop { let broker = broker.clone(); @@ -86,7 +86,7 @@ async fn run_server(config: relay::ServerConfig, broker: Broadcasts) -> anyhow:: version: Version::DRAFT_00, role, }; - + let session = client_setup.accept(setup_server).await?; let session = relay::Session::from_transport_session(session, broker.clone()).await?; session.run().await?; diff --git a/moq-demo-quinn/src/server/mod.rs b/moq-demo/src/server/mod.rs similarity index 74% rename from moq-demo-quinn/src/server/mod.rs rename to moq-demo/src/server/mod.rs index 0cb2b35..ace4b48 100644 --- a/moq-demo-quinn/src/server/mod.rs +++ b/moq-demo/src/server/mod.rs @@ -7,20 +7,16 @@ use h3_webtransport::server::AcceptedBi; use moq_transport::AcceptSetup; use moq_warp::relay::ServerConfig; use tokio::task::JoinSet; -use warp::{Future, http}; - -use self::stream::{QuinnSendStream, QuinnRecvStream}; - -mod stream; +use warp::{http, Future}; pub struct Server { // The MoQ transport server. - server: h3_webtransport::server::WebTransportSession, + server: quinn::Endpoint, } impl Server { // Create a new server - pub fn new_quinn_connection(config: ServerConfig) -> anyhow::Result { + pub fn new(config: ServerConfig) -> anyhow::Result { // Read the PEM certificate chain let certs = fs::File::open(config.cert).context("failed to open cert file")?; let mut certs = io::BufReader::new(certs); @@ -46,13 +42,7 @@ impl Server { .with_single_cert(certs, key)?; tls_config.max_early_data_size = u32::MAX; - let alpn: Vec> = vec![ - b"h3".to_vec(), - b"h3-32".to_vec(), - b"h3-31".to_vec(), - b"h3-30".to_vec(), - b"h3-29".to_vec(), - ]; + let alpn: Vec> = vec![webtransport_quinn::ALPN]; tls_config.alpn_protocols = alpn; let mut server_config = quinn::ServerConfig::with_crypto(std::sync::Arc::new(tls_config)); @@ -66,7 +56,7 @@ impl Server { server_config.transport = std::sync::Arc::new(transport_config); let server = quinn::Endpoint::server(server_config, config.addr)?; - Ok(server) + Ok(Self { server }) } pub async fn accept_new_webtransport_session(endpoint: &h3_quinn::Endpoint) -> anyhow::Result { @@ -78,7 +68,7 @@ impl Server { conn = endpoint.accept() => { let conn = conn.context("failed to accept connection").unwrap(); handshake.spawn(async move { - + let conn = conn.await.context("failed to accept h3 connection")?; let mut conn = h3::server::builder() @@ -119,7 +109,6 @@ impl Server { ) } } - } // The WebTransport CONNECT has arrived, and we need to decide if we accept it. @@ -132,18 +121,17 @@ pub struct Connect { } impl Connect { - // Accept the WebTransport session. pub async fn accept(self) -> anyhow::Result> { let session = h3_webtransport::server::WebTransportSession::accept(self.req, self.stream, self.conn).await?; - let mut session = Server{server: session}; + let mut session = Server { server: session }; let (control_stream_send, control_stream_recv) = moq_transport::accept_bidi(&mut session) .await .context("failed to accept bidi stream")? .unwrap(); - Ok(moq_transport::Session::accept(Box::new(control_stream_send), Box::new(control_stream_recv), Box::new(session)).await?) + Ok(moq_transport::Session::accept(session).await?) } // Reject the WebTransport session with a HTTP response. @@ -153,34 +141,34 @@ impl Connect { } } - impl webtransport_generic::Connection for Server { - type Error = anyhow::Error; - type SendStream = QuinnSendStream; + type SendStream = QuinnSendStream; - type RecvStream = QuinnRecvStream; + type RecvStream = QuinnRecvStream; - fn poll_accept_uni( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll, Self::Error>> { + fn poll_accept_uni( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, Self::Error>> { let fut = self.server.accept_uni(); let fut = std::pin::pin!(fut); fut.poll(cx) - .map_ok(|opt| opt.map(|(_, s)| QuinnRecvStream::new(s))) - .map_err(|e| anyhow::anyhow!("{:?}", e)) - } + .map_ok(|opt| opt.map(|(_, s)| QuinnRecvStream::new(s))) + .map_err(|e| anyhow::anyhow!("{:?}", e)) + } - fn poll_accept_bidi( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll, Self::Error>> { + fn poll_accept_bidi( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, Self::Error>> { let fut = self.server.accept_bi(); let fut = std::pin::pin!(fut); let res = std::task::ready!(fut.poll(cx).map_err(|e| anyhow::anyhow!("{:?}", e))); match res { - Ok(Some(AcceptedBi::Request(_, _))) => std::task::Poll::Ready(Err(anyhow::anyhow!("received new session whils accepting bidi stream"))), + Ok(Some(AcceptedBi::Request(_, _))) => { + std::task::Poll::Ready(Err(anyhow::anyhow!("received new session whils accepting bidi stream"))) + } Ok(Some(AcceptedBi::BidiStream(_, s))) => { let (send, recv) = s.split(); std::task::Poll::Ready(Ok(Some((QuinnSendStream::new(send), QuinnRecvStream::new(recv))))) @@ -188,35 +176,34 @@ impl webtransport_generic::Connection for Server { Ok(None) => std::task::Poll::Ready(Ok(None)), Err(e) => std::task::Poll::Ready(Err(e)), } - } + } - fn poll_open_bidi( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + fn poll_open_bidi( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { let fut = self.server.open_bi(self.server.session_id()); let fut = std::pin::pin!(fut); fut.poll(cx) - .map_ok(|s| { + .map_ok(|s| { let (send, recv) = s.split(); (QuinnSendStream::new(send), QuinnRecvStream::new(recv)) - } - ) - .map_err(|e| anyhow::anyhow!("{:?}", e)) - } + }) + .map_err(|e| anyhow::anyhow!("{:?}", e)) + } - fn poll_open_uni( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + fn poll_open_uni( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { let fut = self.server.open_uni(self.server.session_id()); let fut = std::pin::pin!(fut); fut.poll(cx) - .map_ok(|s| QuinnSendStream::new(s)) - .map_err(|e| anyhow::anyhow!("{:?}", e)) - } + .map_ok(|s| QuinnSendStream::new(s)) + .map_err(|e| anyhow::anyhow!("{:?}", e)) + } - fn close(&mut self, _code: u32, _reason: &[u8]) { - todo!("not implemented") - } -} \ No newline at end of file + fn close(&mut self, _code: u32, _reason: &[u8]) { + todo!("not implemented") + } +} diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 4b63293..c691dd6 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -1,15 +1,15 @@ [package] name = "moq-transport" description = "Media over QUIC" -authors = [ "Luke Curley" ] +authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" version = "0.1.0" edition = "2021" -keywords = [ "quic", "http3", "webtransport", "media", "live" ] -categories = [ "multimedia", "network-programming", "web-programming" ] +keywords = ["quic", "http3", "webtransport", "media", "live"] +categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -20,5 +20,5 @@ thiserror = "1.0.21" log = "0.4" tokio = { version = "1.27", features = ["full"] } anyhow = "1.0.70" - -webtransport-generic = {git = "https://github.com/kixelated/webtransport-rs"} \ No newline at end of file +webtransport-generic = { version = "0.2", path = "../../webtransport-rs/webtransport-generic" } +futures = "0.3" diff --git a/moq-transport/src/network/control.rs b/moq-transport/src/network/control.rs index 0388953..e1e6a40 100644 --- a/moq-transport/src/network/control.rs +++ b/moq-transport/src/network/control.rs @@ -1,55 +1,25 @@ -use webtransport_generic::{RecvStream, SendStream}; use crate::{Decode, DecodeError, Encode, Message}; -use crate::network::stream::{recv, send}; +use webtransport_generic::{RecvStream, SendStream}; use bytes::{Buf, BytesMut}; +use std::future; use std::io::Cursor; use std::sync::Arc; use tokio::sync::Mutex; use anyhow::Context; - -pub struct Control { - sender: ControlSend, - recver: ControlRecv, -} - -impl Control{ - pub(crate) fn new(sender: Box, recver: Box) -> Self { - let sender = ControlSend::new(sender); - let recver = ControlRecv::new(recver); - - Self { sender, recver } - } - - pub fn split(self) -> (ControlSend, ControlRecv) { - (self.sender, self.recver) - } - - pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { - self.sender.send(msg) - .await - .map_err(|e| anyhow::anyhow!("{:?}", e)) - .context("error sending control message") - } - - pub async fn recv(&mut self) -> anyhow::Result { - self.recver.recv().await - } -} - -pub struct ControlSend { - stream: Box, +pub struct SendControl { + stream: S, buf: BytesMut, // reuse a buffer to encode messages. } -impl ControlSend { - pub fn new(inner: Box) -> Self { +impl SendControl { + pub fn new(stream: S) -> Self { Self { buf: BytesMut::new(), - stream: inner, + stream, } } @@ -61,16 +31,17 @@ impl ControlSend { msg.encode(&mut self.buf)?; // TODO make this work with select! - send(self.stream.as_mut(), &mut self.buf) - .await - .map_err(|e| anyhow::anyhow!("{:?}", e.into())) - .context("error sending control message")?; + self.stream + .send(&mut self.buf) + .await + .context("error sending control message")?; + Ok(()) } // Helper that lets multiple threads send control messages. - pub fn share(self) -> ControlShared { - ControlShared { + pub fn share(self) -> SharedControl { + SharedControl { stream: Arc::new(Mutex::new(self)), } } @@ -79,27 +50,27 @@ impl ControlSend { // Helper that allows multiple threads to send control messages. // There's no equivalent for receiving since only one thread should be receiving at a time. #[derive(Clone)] -pub struct ControlShared { - stream: Arc>>, +pub struct SharedControl { + stream: Arc>>, } -impl ControlShared { +impl SharedControl { pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { let mut stream = self.stream.lock().await; stream.send(msg).await } } -pub struct ControlRecv { - stream: Box, +pub struct RecvControl { + stream: R, buf: BytesMut, // data we've read but haven't fully decoded yet } -impl ControlRecv { - pub fn new(inner: Box) -> Self { +impl RecvControl { + pub fn new(stream: R) -> Self { Self { buf: BytesMut::new(), - stream: inner, + stream, } } @@ -119,9 +90,8 @@ impl ControlRecv { } Err(DecodeError::UnexpectedEnd) => { // The decode failed, so we need to append more data. - recv(self.stream.as_mut(), &mut self.buf) + future::poll_fn(|cx| self.stream.poll_recv(cx, &mut self.buf)) .await - .map_err(|e| anyhow::anyhow!("{:?}", e.into())) .context("error receiving control message")?; } Err(e) => return Err(e.into()), diff --git a/moq-transport/src/network/mod.rs b/moq-transport/src/network/mod.rs index fb0bf32..aa5c6dd 100644 --- a/moq-transport/src/network/mod.rs +++ b/moq-transport/src/network/mod.rs @@ -1,13 +1,7 @@ -mod stream; mod control; -mod object; +mod objects; mod server; -use std::sync::Arc; -use std::sync::Mutex; -pub type SharedConnection = Arc>>; - -pub use stream::*; pub use control::*; -pub use object::*; +pub use objects::*; pub use server::*; diff --git a/moq-transport/src/network/object.rs b/moq-transport/src/network/object.rs deleted file mode 100644 index e78adaf..0000000 --- a/moq-transport/src/network/object.rs +++ /dev/null @@ -1,142 +0,0 @@ -use anyhow::Context; -use bytes::{Buf, BytesMut}; -use webtransport_generic::{Connection, SendStream, RecvStream}; -use crate::{Decode, DecodeError, Encode, Object}; -use std::{io::Cursor, marker::PhantomData}; - -use crate::network::SharedConnection; - -use super::stream::{open_uni_shared, send, recv, accept_uni_shared}; - -// TODO support clients - -pub struct Objects { - send: SendObjects, - recv: RecvObjects, -} - -impl + Send> Objects { - pub fn new(session: SharedConnection) -> Self { - let send = SendObjects::new(session.clone()); - let recv = RecvObjects::new(session); - Self { send, recv } - } - - pub fn split(self) -> (SendObjects, RecvObjects) { - (self.send, self.recv) - } - - pub async fn recv(&mut self) -> anyhow::Result<(Object, R)> { - self.recv.recv().await - } - - pub async fn send(&mut self, header: Object) -> anyhow::Result { - self.send.send(header).await - } -} - -pub struct SendObjects { - session: SharedConnection, - - // A reusable buffer for encoding messages. - buf: BytesMut, - _marker: PhantomData, -} - -impl> SendObjects { - pub fn new(session: SharedConnection) -> Self { - Self { - session, - buf: BytesMut::new(), - _marker: PhantomData, - } - } - - pub async fn send(&mut self, header: Object) -> anyhow::Result { - self.buf.clear(); - header.encode(&mut self.buf).unwrap(); - - // TODO support select! without making a new stream. - let mut stream = open_uni_shared(self.session.clone()) - .await - .map_err(|e| anyhow::anyhow!("{:?}", e.into())) - .context("failed to open uni stream")?; - - send(&mut stream, &mut self.buf) - .await - .map_err(|e| anyhow::anyhow!("{:?}", e.into())) - .context("failed to send data on stream")?; - - Ok(stream) - } -} - -impl> Clone for SendObjects { - fn clone(&self) -> Self { - Self { - session: self.session.clone(), - buf: BytesMut::new(), - _marker: PhantomData, - } - } -} - -// Not clone, so we don't accidentally have two listners. -pub struct RecvObjects { - session: SharedConnection, - - // A uni stream that's been accepted but not fully read from yet. - stream: Option>, - - // Data that we've read but haven't formed a full message yet. - buf: BytesMut, -} - -impl> RecvObjects { - pub fn new(session: SharedConnection) -> Self { - Self { - session, - stream: None, - buf: BytesMut::new(), - } - } - - pub async fn recv(&mut self) -> anyhow::Result<(Object, R)> { - // Make sure any state is saved across await boundaries so this works with select! - - let stream = match self.stream.as_mut() { - Some(stream) => stream, - None => { - let stream = accept_uni_shared(self.session.clone()) - .await - .map_err(|e| anyhow::anyhow!("{:?}", e.into())) - .context("failed to accept uni stream")? - .context("no uni stream")?; - - self.stream.insert(Box::new(stream)) - } - }; - - loop { - // Read the contents of the buffer - let mut peek = Cursor::new(&self.buf); - - match Object::decode(&mut peek) { - Ok(header) => { - let stream = self.stream.take().unwrap(); - self.buf.advance(peek.position() as usize); - - return Ok((header, *stream)); - } - Err(DecodeError::UnexpectedEnd) => { - // The decode failed, so we need to append more data. - recv(stream.as_mut(), &mut self.buf) - .await - .map_err(|e| anyhow::anyhow!("{:?}", e.into())) - .context("failed to recv data on stream")?; - } - Err(e) => return Err(e.into()), - } - } - } -} diff --git a/moq-transport/src/network/objects.rs b/moq-transport/src/network/objects.rs new file mode 100644 index 0000000..a48d32f --- /dev/null +++ b/moq-transport/src/network/objects.rs @@ -0,0 +1,103 @@ +use crate::{Decode, DecodeError, Encode, Object}; +use anyhow::Context; +use bytes::{Buf, BytesMut}; +use futures::StreamExt; +use futures::TryStreamExt; +use std::{ + io::Cursor, + pin::Pin, + task::{ready, Poll}, +}; +use tokio::task::JoinSet; +use webtransport_generic::{AsyncRecvStream, AsyncSendStream, AsyncSession}; + +// TODO support clients + +pub struct SendObjects { + session: S, + + // A reusable buffer for encoding messages. + buf: BytesMut, +} + +impl SendObjects { + pub fn new(session: S) -> Self { + Self { + session, + buf: BytesMut::new(), + } + } + + pub async fn send(&mut self, header: Object) -> anyhow::Result { + self.buf.clear(); + header.encode(&mut self.buf).unwrap(); + + let mut stream = self.session.open_uni().await.context("failed to open uni stream")?; + + stream + .send(&mut self.buf) + .await + .context("failed to send data on stream")?; + + Ok(stream) + } +} + +pub struct RecvObjects { + session: S, + objects: JoinSet>, +} + +impl RecvObjects { + pub fn new(session: S) -> Self { + let streams = futures::stream::unfold(session.clone(), |mut session| async move { + match session.accept_uni().await { + Ok(stream) => Some((stream, session)), + Err(e) => None, + } + }); + + let objects = streams.map(|mut stream| async move {}); + + // Decode the object header for up to 16 streams in parallel. + // Otherwise, a lost packet for the first chunk of a stream would block future streams. + let objects = objects.buffer_unordered(16); + + let objects = Box::pin(objects); + + //try_buffer_unordered + + let objects = JoinSet::new(); + + Self { session, objects } + } + + pub async fn next(&mut self) -> anyhow::Result<(Object, S::RecvStream)> { + loop { + tokio::select! { + res = self.objects.join_next(), if !self.objects.is_empty() => { + return res.unwrap()?; + }, + res = self.session.accept_uni() => { + let stream = res?; + self.objects.spawn(async move { Self::fetch(stream).await }); + }, + } + } + } + + async fn fetch(mut stream: S::RecvStream) -> anyhow::Result<(Object, S::RecvStream)> { + let mut buf = Vec::with_capacity(64); + + loop { + stream.recv(&mut buf).await?.context("no more data")?; + let mut peek = Cursor::new(&buf); + + match Object::decode(&mut peek) { + Ok(header) => return Ok((header, stream)), + Err(DecodeError::UnexpectedEnd) => continue, + Err(err) => return Err(err.into()), + } + } + } +} diff --git a/moq-transport/src/network/server.rs b/moq-transport/src/network/server.rs index 82727f9..e6e1c10 100644 --- a/moq-transport/src/network/server.rs +++ b/moq-transport/src/network/server.rs @@ -1,41 +1,48 @@ - -use anyhow::Context; -use webtransport_generic::{Connection, RecvStream}; +/* use crate::{Message, SetupClient, SetupServer}; +use anyhow::Context; +use webtransport_generic::Session as Generic; +use super::{RecvControl, RecvObjects, SendControl, SendObjects}; -use super::{Control, Objects}; -pub struct Session { - pub control: Control, - pub objects: Objects, +pub struct Session { + pub send_control: SendControl, + pub recv_control: RecvControl, + pub send_objects: SendObjects, + pub recv_objects: RecvObjects, } -impl + Send> Session { +impl Session { + pub async fn accept( + session: S, + ) -> anyhow::Result> { + let send_objects = SendObjects::new(session.clone()); + let recv_objects = RecvObjects::new(session.clone()); - pub async fn accept(control_stream_send: Box, control_stream_recv: Box::, connection: Box) -> anyhow::Result> { - let mut control = Control::new(control_stream_send, control_stream_recv); - let objects = Objects::new(std::sync::Arc::new(std::sync::Mutex::new(connection))); - - let setup_client = match control.recv().await.context("failed to read SETUP")? { + let setup_client = match recv_control await.context("failed to read SETUP")? { Message::SetupClient(setup) => setup, _ => anyhow::bail!("expected CLIENT SETUP"), }; - Ok(AcceptSetup { setup_client, control, objects }) + Ok(AcceptSetup { + setup_client, + control, + objects, + }) } - pub fn split(self) -> (Control, Objects) { + + pub fn split(self) -> (Control, Objects) { (self.control, self.objects) } } - -pub struct AcceptSetup { +pub struct AcceptSetup { setup_client: SetupClient, - control: Control, + control: Control, objects: Objects, } -impl AcceptSetup { +impl AcceptSetup { // Return the setup message we received. pub fn setup(&self) -> &SetupClient { &self.setup_client @@ -54,4 +61,6 @@ impl AcceptSetup { // TODO Close the QUIC connection with an error code. Ok(()) } -} \ No newline at end of file +} + +*/ diff --git a/moq-transport/src/network/stream.rs b/moq-transport/src/network/stream.rs deleted file mode 100644 index f3e7821..0000000 --- a/moq-transport/src/network/stream.rs +++ /dev/null @@ -1,60 +0,0 @@ -use std::sync::Arc; - -use bytes::{Buf, BytesMut, BufMut}; -use webtransport_generic::{Connection, RecvStream, SendStream}; - - - -pub async fn accept_uni(conn: &mut C) -> Result, C::Error> { - std::future::poll_fn(|cx| conn.poll_accept_uni(cx)).await -} - -pub async fn accept_uni_shared(conn: Arc>>) -> Result, C::Error> { - Ok(std::future::poll_fn(|cx| conn.lock().unwrap().poll_accept_uni(cx)).await?) - -} - -pub async fn accept_bidi(conn: &mut C) -> Result, C::Error> { - std::future::poll_fn(|cx| conn.poll_accept_bidi(cx)).await - -} - -pub async fn accept_bidi_shared(conn: Arc>>) -> Result, C::Error> { - std::future::poll_fn(|cx| conn.lock().unwrap().poll_accept_bidi(cx)).await - -} - -pub async fn open_uni(conn: &mut C) -> anyhow::Result { - std::future::poll_fn(|cx| conn.poll_open_uni(cx)).await - -} - -pub async fn open_uni_shared(conn: Arc>>) -> Result { - std::future::poll_fn(|cx| conn.lock().unwrap().poll_open_uni(cx)).await - -} - -pub async fn open_bidi(conn: &mut C) -> anyhow::Result<(C::SendStream, C::RecvStream), C::Error> { - std::future::poll_fn(|cx| conn.poll_open_bidi(cx)).await - -} - -pub async fn open_bidi_shared(conn: Arc>>) -> Result<(C::SendStream, C::RecvStream), C::Error> { - std::future::poll_fn(|cx| conn.lock().unwrap().poll_open_bidi(cx)).await - -} - -pub async fn recv(recv: &mut R , outbuf: &mut BytesMut) -> Result { - let buf = std::future::poll_fn(|cx| recv.poll_data(cx)).await?; - match buf { - Some(buf) => { - outbuf.put(buf); - Ok(true) - } - None => Ok(false) // stream finished - } -} - -pub async fn send(send: &mut S, buf: &mut B) -> Result { - std::future::poll_fn(|cx| send.poll_send(cx, buf)).await -} diff --git a/moq-warp/Cargo.toml b/moq-warp/Cargo.toml index 4c2f1ee..e96425b 100644 --- a/moq-warp/Cargo.toml +++ b/moq-warp/Cargo.toml @@ -1,28 +1,28 @@ [package] name = "moq-warp" description = "Media over QUIC" -authors = [ "Luke Curley" ] +authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" version = "0.1.0" edition = "2021" -keywords = [ "quic", "http3", "webtransport", "media", "live" ] -categories = [ "multimedia", "network-programming", "web-programming" ] +keywords = ["quic", "http3", "webtransport", "media", "live"] +categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] moq-transport = { path = "../moq-transport" } -webtransport-generic = {git = "https://github.com/kixelated/webtransport-rs"} +webtransport-generic = "0.2" bytes = "1" tokio = "1.27" mp4 = "0.13.0" anyhow = "1.0.70" -log = "0.4" # TODO remove +log = "0.4" # TODO remove # QUIC stuff ring = "0.16.20"