diff --git a/.editorconfig b/.editorconfig index 6773ee4..5e16f8e 100644 --- a/.editorconfig +++ b/.editorconfig @@ -8,3 +8,10 @@ insert_final_newline = true indent_style = tab indent_size = 4 max_line_length = 120 + +[*.md] +trim_trailing_whitespace = false + +[*.yml] +indent_style = space +indent_size = 2 diff --git a/Cargo.lock b/Cargo.lock index 7187584..66dcfde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,6 +83,12 @@ dependencies = [ "backtrace", ] +[[package]] +name = "arc-swap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" + [[package]] name = "async-channel" version = "1.9.0" @@ -103,7 +109,7 @@ dependencies = [ "async-lock", "async-task", "concurrent-queue", - "fastrand", + "fastrand 1.9.0", "futures-lite", "slab", ] @@ -137,7 +143,7 @@ dependencies = [ "log", "parking", "polling", - "rustix", + "rustix 0.37.23", "slab", "socket2 0.4.9", "waker-fn", @@ -184,6 +190,17 @@ version = "4.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" +[[package]] +name = "async-trait" +version = "0.1.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic-waker" version = "1.1.1" @@ -207,6 +224,55 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -240,6 +306,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" + [[package]] name = "block-buffer" version = "0.10.4" @@ -259,7 +331,7 @@ dependencies = [ "async-lock", "async-task", "atomic-waker", - "fastrand", + "fastrand 1.9.0", "futures-lite", "log", ] @@ -353,6 +425,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "concurrent-queue" version = "2.2.0" @@ -480,12 +566,33 @@ dependencies = [ "instant", ] +[[package]] +name = "fastrand" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" + [[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.0" @@ -555,7 +662,7 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" dependencies = [ - "fastrand", + "fastrand 1.9.0", "futures-core", "futures-io", "memchr", @@ -790,6 +897,33 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" +dependencies = [ + "futures-util", + "http", + "hyper", + "rustls 0.21.7", + "tokio", + "tokio-rustls 0.24.1", +] + +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "idna" version = "0.4.0" @@ -840,6 +974,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "ipnet" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" + [[package]] name = "itoa" version = "1.0.9" @@ -882,6 +1022,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" +[[package]] +name = "linux-raw-sys" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" + [[package]] name = "lock_api" version = "0.4.10" @@ -901,6 +1047,12 @@ dependencies = [ "value-bag", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "memchr" version = "2.6.3" @@ -943,6 +1095,25 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "moq-api" +version = "0.0.1" +dependencies = [ + "axum", + "clap", + "env_logger", + "hyper", + "log", + "redis", + "reqwest", + "serde", + "serde_json", + "thiserror", + "tokio", + "tower", + "url", +] + [[package]] name = "moq-pub" version = "0.1.0" @@ -951,7 +1122,6 @@ dependencies = [ "clap", "clap_mangen", "env_logger", - "http", "log", "moq-transport", "mp4", @@ -963,7 +1133,7 @@ dependencies = [ "rustls-pemfile", "serde_json", "tokio", - "webtransport-generic", + "url", "webtransport-quinn", ] @@ -976,16 +1146,18 @@ dependencies = [ "env_logger", "hex", "log", + "moq-api", "moq-transport", "quinn", "ring", "rustls 0.21.7", + "rustls-native-certs", "rustls-pemfile", + "thiserror", "tokio", "tracing", - "tracing-subscriber", + "url", "warp", - "webtransport-generic", "webtransport-quinn", ] @@ -993,7 +1165,6 @@ dependencies = [ name = "moq-transport" version = "0.2.0" dependencies = [ - "anyhow", "bytes", "indexmap 2.0.0", "log", @@ -1051,13 +1222,21 @@ dependencies = [ ] [[package]] -name = "nu-ansi-term" -version = "0.46.0" +name = "native-tls" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" dependencies = [ - "overload", - "winapi", + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", ] [[package]] @@ -1128,6 +1307,32 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +[[package]] +name = "openssl" +version = "0.10.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c" +dependencies = [ + "bitflags 2.4.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.1.5" @@ -1135,10 +1340,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] -name = "overload" -version = "0.1.1" +name = "openssl-sys" +version = "0.9.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +checksum = "db4d56a4c0478783083cfafcc42493dd4a981d41669da64b4572a2a089b51b1d" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] [[package]] name = "parking" @@ -1207,6 +1418,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" + [[package]] name = "polling" version = "2.8.0" @@ -1214,7 +1431,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" dependencies = [ "autocfg", - "bitflags", + "bitflags 1.3.2", "cfg-if", "concurrent-queue", "libc", @@ -1325,13 +1542,40 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redis" +version = "0.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f49cdc0bb3f412bf8e7d1bd90fe1d9eb10bc5c399ba90973c14662a27b3f8ba" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "futures", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "rustls 0.21.7", + "rustls-native-certs", + "ryu", + "sha1_smol", + "socket2 0.4.9", + "tokio", + "tokio-retry", + "tokio-rustls 0.24.1", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] @@ -1363,6 +1607,49 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +[[package]] +name = "reqwest" +version = "0.11.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +dependencies = [ + "base64 0.21.4", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-rustls", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls 0.21.7", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "system-configuration", + "tokio", + "tokio-native-tls", + "tokio-rustls 0.24.1", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots", + "winreg", +] + [[package]] name = "rfc6381-codec" version = "0.1.0" @@ -1413,11 +1700,24 @@ version = "0.37.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" dependencies = [ - "bitflags", + "bitflags 1.3.2", "errno", "io-lifetimes", "libc", - "linux-raw-sys", + "linux-raw-sys 0.3.8", + "windows-sys", +] + +[[package]] +name = "rustix" +version = "0.38.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7db8590df6dfcd144d22afd1b83b36c21a18d7cbc1dc4bb5295a8712e9eb662" +dependencies = [ + "bitflags 2.4.0", + "errno", + "libc", + "linux-raw-sys 0.4.10", "windows-sys", ] @@ -1476,6 +1776,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" + [[package]] name = "ryu" version = "1.0.15" @@ -1519,7 +1825,7 @@ version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", @@ -1567,6 +1873,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1591,13 +1907,10 @@ dependencies = [ ] [[package]] -name = "sharded-slab" -version = "0.1.4" +name = "sha1_smol" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" -dependencies = [ - "lazy_static", -] +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" [[package]] name = "signal-hook-registry" @@ -1672,6 +1985,46 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "tempfile" +version = "3.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" +dependencies = [ + "cfg-if", + "fastrand 2.0.1", + "redox_syscall", + "rustix 0.38.13", + "windows-sys", +] + [[package]] name = "termcolor" version = "1.2.0" @@ -1701,16 +2054,6 @@ dependencies = [ "syn", ] -[[package]] -name = "thread_local" -version = "1.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" -dependencies = [ - "cfg-if", - "once_cell", -] - [[package]] name = "tinyvec" version = "1.6.0" @@ -1756,6 +2099,27 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.23.4" @@ -1767,6 +2131,16 @@ dependencies = [ "webpki", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.7", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.14" @@ -1804,6 +2178,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -1841,32 +2237,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" dependencies = [ "once_cell", - "valuable", -] - -[[package]] -name = "tracing-log" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - -[[package]] -name = "tracing-subscriber" -version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" -dependencies = [ - "nu-ansi-term", - "sharded-slab", - "smallvec", - "thread_local", - "tracing-core", - "tracing-log", ] [[package]] @@ -1945,6 +2315,7 @@ dependencies = [ "form_urlencoded", "idna", "percent-encoding", + "serde", ] [[package]] @@ -1959,18 +2330,18 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" -[[package]] -name = "valuable" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" - [[package]] name = "value-bag" version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" @@ -2016,7 +2387,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "tokio", - "tokio-rustls", + "tokio-rustls 0.23.4", "tokio-stream", "tokio-tungstenite", "tokio-util", @@ -2116,6 +2487,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webpki-roots" +version = "0.25.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" + [[package]] name = "webtransport-generic" version = "0.5.0" @@ -2128,20 +2505,21 @@ dependencies = [ [[package]] name = "webtransport-proto" -version = "0.5.4" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54d41127a79f4d34112114b626f71d197c3ddf4fc82d56ccddc03a851bd0ea4f" +checksum = "ebeada5037d6302980ae2e0ab8d840e329c1697c612c6c077172de2b7631a276" dependencies = [ "bytes", "http", "thiserror", + "url", ] [[package]] name = "webtransport-quinn" -version = "0.5.4" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a7cccdcf10a2fb3a18ebd51fb8734e385624cb04fde38b239dbda0f1e40ba21" +checksum = "cceb876dbd00a87b3fd8869d1c315e07c28b0eb54d59b592a07a634f5e2b64e1" dependencies = [ "async-std", "bytes", @@ -2151,6 +2529,7 @@ dependencies = [ "quinn-proto", "thiserror", "tokio", + "url", "webtransport-generic", "webtransport-proto", ] @@ -2251,3 +2630,13 @@ name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys", +] diff --git a/Cargo.toml b/Cargo.toml index 142b191..a5bae56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ [workspace] -members = ["moq-transport", "moq-relay", "moq-pub"] +members = ["moq-transport", "moq-relay", "moq-pub", "moq-api"] resolver = "2" diff --git a/README.md b/README.md index 2f8d342..392a03b 100644 --- a/README.md +++ b/README.md @@ -6,39 +6,17 @@ Media over QUIC (MoQ) is a live media delivery protocol utilizing QUIC streams. See [quic.video](https://quic.video) for more information. This repository contains a few crates: -- **moq-relay**: A relay server, accepting content from publishers and fanning it out to subscribers. -- **moq-pub**: A publish client, accepting media from stdin (ex. via ffmpeg) and sending it to a remote server. -- **moq-transport**: An async implementation of the underlying MoQ protocol. -There's currently no way to actually view content with `moq-rs`; you'll need to use [moq-js](https://github.com/kixelated/moq-js) for that. +- **moq-relay**: A relay server, accepting content from publishers and fanning it out to subscribers. +- **moq-pub**: A publish client, accepting media from stdin (ex. via ffmpeg) and sending it to a remote server. +- **moq-transport**: An async implementation of the underlying MoQ protocol. +- **moq-api**: A HTTP API server that stores the origin for each broadcast, backed by redis. -## Setup +There's currently no way to view media with `moq-rs`; you'll need to use [moq-js](https://github.com/kixelated/moq-js) for that. -### Certificates +## Development -Unfortunately, QUIC mandates TLS and makes local development difficult. -If you have a valid certificate you can use it instead of self-signing. - -Use [mkcert](https://github.com/FiloSottile/mkcert) to generate a self-signed certificate. -Unfortunately, this currently requires Go in order to [fork](https://github.com/FiloSottile/mkcert/pull/513) the tool. - -```bash -./dev/cert -``` - -Unfortunately, WebTransport in Chrome currently (May 2023) doesn't verify certificates using the root CA. -The workaround is to use the `serverFingerprints` options, which requires the certificate MUST be only valid for at most **14 days**. -This is also why we're using a fork of mkcert, because it generates certificates valid for years by default. -This limitation will be removed once Chrome uses the system CA for WebTransport. - -### Media - -If you're using `moq-pub` then you'll want some test footage to broadcast. - -```bash -mkdir media -wget http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4 -O dev/source.mp4 -``` +See the [dev/README.md] helper scripts for local development. ## Usage @@ -46,53 +24,42 @@ wget http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBun **moq-relay** is a server that forwards subscriptions from publishers to subscribers, caching and deduplicating along the way. It's designed to be run in a datacenter, relaying media across multiple hops to deduplicate and improve QoS. - -You can run the development server with the following command, automatically using the self-signed certificate generated earlier: - -```bash -./dev/relay -``` +The relays register themselves via the [moq-api] endpoints, which is used to discover other relays and share broadcasts. Notable arguments: -- `--bind ` Listen on this address [default: [::]:4443] +- `--listen ` Listen on this address [default: [::]:4443] - `--cert ` Use the certificate file at this path - `--key ` Use the private key at this path +- `--fingerprint` Listen via HTTPS as well, serving the `/fingerprint` of the self-signed certificate. (dev only) This listens for WebTransport connections on `UDP https://localhost:4443` by default. You need a client to connect to that address, to both publish and consume media. -The server also listens on `TCP localhost:4443` when in development mode. -This is exclusively to serve a `/fingerprint` endpoint via HTTPS for self-signed certificates, which are not needed in production. - ### moq-pub This is a client that publishes a fMP4 stream from stdin over MoQ. This can be combined with ffmpeg (and other tools) to produce a live stream. -The following command runs a development instance, broadcasing `dev/source.mp4` to `localhost:4443`: - -```bash -./dev/pub -``` - Notable arguments: -- `` connect to the given address, which must start with moq://. +- `` connect to the given address, which must start with https:// for WebTransport. -### moq-js +**NOTE**: We're very particular about the fMP4 ingested. See [dev/pub] for the required ffmpeg flags. -There's currently no way to consume broadcasts with `moq-rs`, at least until somebody writes `moq-sub`. -Until then, you can use [moq.js](https://github.com/kixelated/moq-js) both watch broadcasts and publish broadcasts. +### moq-transport -There's a hosted version available at [quic.video](https://quic.video/). -There's a secret `?server` parameter that can be used to connect to a different address. +A media-agnostic library used by [moq-relay] and [moq-pub] to serve the underlying subscriptions. +It has caching/deduplication built-in, so your application is oblivious to the number of connections under the hood. +Somebody build a non-media application using this library and I'll link it here! -- Publish to localhost: `https://quic.video/publish/?server=localhost:4443` -- Watch from localhost: `https://quic.video/watch//?server=localhost:4443` +See the published [crate](https://crates.io/crates/moq-transport) and [documentation](https://docs.rs/moq-transport/latest/moq_transport/). -Note that self-signed certificates are ONLY supported if the server name starts with `localhost`. -You'll need to add an entry to `/etc/hosts` if you want to use a self-signed certs and an IP address. +### moq-api + +This is a API server that exposes a REST API. +It's used by relays to inserts themselves as origins when publishing, and to find the origin when subscribing. +It's basically just a thin wrapper around redis. ## License diff --git a/dev/README.md b/dev/README.md new file mode 100644 index 0000000..d952b72 --- /dev/null +++ b/dev/README.md @@ -0,0 +1,102 @@ +# dev + +This is a collection of helpful scripts for local development ONLY. + +## Setup + +### moq-relay + +Unfortunately, QUIC mandates TLS and makes local development difficult. +If you have a valid certificate you can use it instead of self-signing. + +Use [mkcert](https://github.com/FiloSottile/mkcert) to generate a self-signed certificate. +Unfortunately, this currently requires [Go](https://golang.org/) to be installed in order to [fork](https://github.com/FiloSottile/mkcert/pull/513) the tool. +Somebody should get that merged or make something similar in Rust... + +```bash +./dev/cert +``` + +Unfortunately, WebTransport in Chrome currently (May 2023) doesn't verify certificates using the root CA. +The workaround is to use the `serverFingerprints` options, which requires the certificate MUST be only valid for at most **14 days**. +This is also why we're using a fork of mkcert, because it generates certificates valid for years by default. +This limitation will be removed once Chrome uses the system CA for WebTransport. + +### moq-pub + +You'll want some test footage to broadcast. +Anything works, but make sure the codec is supported by the player since `moq-pub` does not re-encode. + +Here's a criticially acclaimed short film: + +```bash +mkdir media +wget http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4 -O dev/source.mp4 +``` + +`moq-pub` uses [ffmpeg](https://ffmpeg.org/) to convert the media to fMP4. +You should have it installed already if you're a video nerd, otherwise: + +```bash +brew install ffmpeg +``` + +### moq-api + +`moq-api` uses a redis instance to store active origins for clustering. +This is not relevant for most local development and the code path is skipped by default. + +However, if you want to test the clustering, you'll need either either [Docker](https://www.docker.com/) or [Podman](https://podman.io/) installed. +We run the redis instance via a container automatically as part of `dev/api`. + +## Development + +**tl;dr** run these commands in seperate terminals: + +```bash +./dev/cert +./dev/relay +./dev/pub +``` + +They will each print out a URL you can use to publish/watch broadcasts. + +### moq-relay + +You can run the relay with the following command, automatically using the self-signed certificates generated earlier. +This listens for WebTransport connections on WebTransport `https://localhost:4443` by default. + +```bash +./dev/relay +``` + +### moq-pub + +The following command runs a development instance, broadcasing `dev/source.mp4` to WebTransport `https://localhost:4443`: + +```bash +./dev/pub +``` + +### moq-api + +The following commands runs an API server, listening for HTTP requests on `http://localhost:4442` by default. + +```bash +./dev/api +``` + +Nodes can now register themselves via the API, which means you can run multiple interconnected relays. +There's two separate `dev/relay-0` and `dev/relay-1` scripts to test clustering locally: + +```bash +./dev/relay-0 +./dev/relay-1 +``` + +These listen on `:4443` and `:4444` respectively, inserting themselves into the origin database as `localhost:$PORT`. + +There's also a separate `dev/pub-1` script to publish to the `:4444` instance. +You can use the exisitng `dev/pub` script to publish to the `:4443` instance. + +If all goes well, you would be able to publish to one relay and watch from the other. diff --git a/dev/api b/dev/api new file mode 100755 index 0000000..f59b60d --- /dev/null +++ b/dev/api @@ -0,0 +1,45 @@ +#!/bin/bash +set -euo pipefail + +# Change directory to the root of the project +cd "$(dirname "$0")/.." + +# Run the API server on port 4442 by default +HOST="${HOST:-[::]}" +PORT="${PORT:-4442}" +LISTEN="${LISTEN:-$HOST:$PORT}" + +# Default to info log level +export RUST_LOG="${RUST_LOG:-info}" + +# Check for Podman/Docker and set runtime accordingly +if command -v podman &> /dev/null; then + RUNTIME=podman +elif command -v docker &> /dev/null; then + RUNTIME=docker +else + echo "Neither podman or docker found in PATH. Exiting." + exit 1 +fi + +REDIS_PORT=${REDIS_PORT:-6400} # The default is 6379, but we'll use 6400 to avoid conflicts + +# Cleanup function to stop Redis when script exits +cleanup() { + $RUNTIME rm -f moq-redis || true +} + +# Stop the redis instance if it's still running +cleanup + +# Run a Redis instance +REDIS_CONTAINER=$($RUNTIME run --rm --name moq-redis -d -p "$REDIS_PORT:6379" redis:latest) + +# Cleanup function to stop Redis when script exits +trap cleanup EXIT + +# Default to a sqlite database in memory +DATABASE="${DATABASE-sqlite::memory:}" + +# Run the relay and forward any arguments +cargo run --bin moq-api -- --listen "$LISTEN" --redis "redis://localhost:$REDIS_PORT" "$@" diff --git a/dev/pub b/dev/pub index 8bc94c4..35fbca6 100755 --- a/dev/pub +++ b/dev/pub @@ -4,22 +4,29 @@ set -euo pipefail # Change directory to the root of the project cd "$(dirname "$0")/.." +export RUST_LOG="${RUST_LOG:-info}" + # Connect to localhost by default. -HOST="${HOST:-localhost:4443}" +HOST="${HOST:-localhost}" +PORT="${PORT:-4443}" +ADDR="${ADDR:-$HOST:$PORT}" # Generate a random 16 character name by default. NAME="${NAME:-$(head /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c 16)}" -# Combine the host and name into a URI. -URI="${URI:-"moq://$HOST/$NAME"}" +# Combine the host and name into a URL. +URL="${URL:-"https://$ADDR/$NAME"}" # Default to a source video MEDIA="${MEDIA:-dev/source.mp4}" +# Print out the watch URL +echo "Watch URL: https://quic.video/watch/$NAME?server=$ADDR" + # Run ffmpeg and pipe the output to moq-pub ffmpeg -hide_banner -v quiet \ -stream_loop -1 -re \ -i "$MEDIA" \ -an \ -f mp4 -movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset - \ - | RUST_LOG=info cargo run --bin moq-pub -- "$URI" "$@" + | RUST_LOG=info cargo run --bin moq-pub -- "$URL" "$@" diff --git a/dev/pub-1 b/dev/pub-1 new file mode 100755 index 0000000..8a70a5c --- /dev/null +++ b/dev/pub-1 @@ -0,0 +1,10 @@ +#!/bin/bash +set -euo pipefail + +# Change directory to the root of the project +cd "$(dirname "$0")/.." + +# Connect to the 2nd relay by default. +export PORT="${PORT:-4444}" + +./dev/pub diff --git a/dev/relay b/dev/relay index 8646e3c..c5fd379 100755 --- a/dev/relay +++ b/dev/relay @@ -4,10 +4,34 @@ set -euo pipefail # Change directory to the root of the project cd "$(dirname "$0")/.." +# Use info logging by default +export RUST_LOG="${RUST_LOG:-info}" + # Default to a self-signed certificate # TODO automatically generate if it doesn't exist. CERT="${CERT:-dev/localhost.crt}" KEY="${KEY:-dev/localhost.key}" +# Default to listening on localhost:4443 +HOST="${HOST:-[::]}" +PORT="${PORT:-4443}" +LISTEN="${LISTEN:-$HOST:$PORT}" + +# A list of optional args +ARGS="" + +# Connect to the given URL to get origins. +# TODO default to a public instance? +if [ -n "$API" ]; then + ARGS="$ARGS --api $API" +fi + +# Provide our node URL when registering origins. +if [ -n "$NODE" ]; then + ARGS="$ARGS --node $NODE" +fi + +echo "Publish URL: https://quic.video/publish/?server=localhost:${PORT}" + # Run the relay and forward any arguments -RUST_LOG=info cargo run --bin moq-relay -- --cert "$CERT" --key "$KEY" --fingerprint "$@" +cargo run --bin moq-relay -- --listen "$LISTEN" --cert "$CERT" --key "$KEY" --fingerprint $ARGS -- "$@" diff --git a/dev/relay-0 b/dev/relay-0 new file mode 100755 index 0000000..ec67141 --- /dev/null +++ b/dev/relay-0 @@ -0,0 +1,12 @@ +#!/bin/bash +set -euo pipefail + +# Change directory to the root of the project +cd "$(dirname "$0")/.." + +# Run an instance that advertises itself to the origin API. +export PORT="${PORT:-4443}" +export API="${API:-http://localhost:4442}" # TODO support HTTPS +export NODE="${NODE:-https://localhost:$PORT}" + +./dev/relay diff --git a/dev/relay-1 b/dev/relay-1 new file mode 100755 index 0000000..0f5acc5 --- /dev/null +++ b/dev/relay-1 @@ -0,0 +1,12 @@ +#!/bin/bash +set -euo pipefail + +# Change directory to the root of the project +cd "$(dirname "$0")/.." + +# Run an instance that advertises itself to the origin API. +export PORT="${PORT:-4444}" +export API="${API:-http://localhost:4442}" # TODO support HTTPS +export NODE="${NODE:-https://localhost:$PORT}" + +./dev/relay diff --git a/dev/setup b/dev/setup new file mode 100644 index 0000000..e5adf60 --- /dev/null +++ b/dev/setup @@ -0,0 +1,2 @@ +#!/bin/bash +set -euo pipefail diff --git a/moq-api/Cargo.toml b/moq-api/Cargo.toml new file mode 100644 index 0000000..282e82b --- /dev/null +++ b/moq-api/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "moq-api" +description = "Media over QUIC" +authors = ["Luke Curley"] +repository = "https://github.com/kixelated/moq-rs" +license = "MIT OR Apache-2.0" + +version = "0.0.1" +edition = "2021" + +keywords = ["quic", "http3", "webtransport", "media", "live"] +categories = ["multimedia", "network-programming", "web-programming"] + + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +# HTTP server +axum = "0.6" +hyper = { version = "0.14", features = ["full"] } +tokio = { version = "1", features = ["full"] } +tower = "0.4" + +# HTTP client +reqwest = { version = "0.11", features = ["json", "rustls-tls"] } + +# JSON encoding +serde = "1" +serde_json = "1" + +# CLI +clap = { version = "4", features = ["derive"] } + +# Database +redis = { version = "0.23", features = [ + "tokio-rustls-comp", + "connection-manager", +] } +url = { version = "2", features = ["serde"] } + +# Error handling +log = "0.4" +env_logger = "0.9" +thiserror = "1" diff --git a/moq-api/README.md b/moq-api/README.md new file mode 100644 index 0000000..324dad8 --- /dev/null +++ b/moq-api/README.md @@ -0,0 +1,4 @@ +# moq-api + +A thin HTTP API that wraps Redis. +Basically I didn't want the relays connecting to Redis directly. diff --git a/moq-api/src/client.rs b/moq-api/src/client.rs new file mode 100644 index 0000000..d60a417 --- /dev/null +++ b/moq-api/src/client.rs @@ -0,0 +1,47 @@ +use url::Url; + +use crate::{ApiError, Origin}; + +#[derive(Clone)] +pub struct Client { + // The address of the moq-api server + url: Url, + + client: reqwest::Client, +} + +impl Client { + pub fn new(url: Url) -> Self { + let client = reqwest::Client::new(); + Self { url, client } + } + + pub async fn get_origin(&self, id: &str) -> Result, ApiError> { + let url = self.url.join("origin/")?.join(id)?; + let resp = self.client.get(url).send().await?; + if resp.status() == reqwest::StatusCode::NOT_FOUND { + return Ok(None); + } + + let origin: Origin = resp.json().await?; + Ok(Some(origin)) + } + + pub async fn set_origin(&mut self, id: &str, origin: Origin) -> Result<(), ApiError> { + let url = self.url.join("origin/")?.join(id)?; + + let resp = self.client.post(url).json(&origin).send().await?; + resp.error_for_status()?; + + Ok(()) + } + + pub async fn delete_origin(&mut self, id: &str) -> Result<(), ApiError> { + let url = self.url.join("origin/")?.join(id)?; + + let resp = self.client.delete(url).send().await?; + resp.error_for_status()?; + + Ok(()) + } +} diff --git a/moq-api/src/error.rs b/moq-api/src/error.rs new file mode 100644 index 0000000..e1891d9 --- /dev/null +++ b/moq-api/src/error.rs @@ -0,0 +1,16 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum ApiError { + #[error("redis error: {0}")] + Redis(#[from] redis::RedisError), + + #[error("reqwest error: {0}")] + Request(#[from] reqwest::Error), + + #[error("hyper error: {0}")] + Hyper(#[from] hyper::Error), + + #[error("url error: {0}")] + Url(#[from] url::ParseError), +} diff --git a/moq-api/src/lib.rs b/moq-api/src/lib.rs new file mode 100644 index 0000000..be117a0 --- /dev/null +++ b/moq-api/src/lib.rs @@ -0,0 +1,7 @@ +mod client; +mod error; +mod model; + +pub use client::*; +pub use error::*; +pub use model::*; diff --git a/moq-api/src/main.rs b/moq-api/src/main.rs new file mode 100644 index 0000000..daeebe5 --- /dev/null +++ b/moq-api/src/main.rs @@ -0,0 +1,14 @@ +use clap::Parser; + +mod server; +use moq_api::ApiError; +use server::{Server, ServerConfig}; + +#[tokio::main] +async fn main() -> Result<(), ApiError> { + env_logger::init(); + + let config = ServerConfig::parse(); + let server = Server::new(config); + server.run().await +} diff --git a/moq-api/src/model.rs b/moq-api/src/model.rs new file mode 100644 index 0000000..18ab5ed --- /dev/null +++ b/moq-api/src/model.rs @@ -0,0 +1,8 @@ +use serde::{Deserialize, Serialize}; + +use url::Url; + +#[derive(Serialize, Deserialize)] +pub struct Origin { + pub url: Url, +} diff --git a/moq-api/src/server.rs b/moq-api/src/server.rs new file mode 100644 index 0000000..671d83a --- /dev/null +++ b/moq-api/src/server.rs @@ -0,0 +1,145 @@ +use std::net; + +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::get, + Json, Router, +}; + +use clap::Parser; + +use redis::{aio::ConnectionManager, AsyncCommands}; + +use moq_api::{ApiError, Origin}; + +/// Runs a HTTP API to create/get origins for broadcasts. +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +pub struct ServerConfig { + /// Listen for HTTP requests on the given address + #[arg(long)] + pub listen: net::SocketAddr, + + /// Connect to the given redis instance + #[arg(long)] + pub redis: url::Url, +} + +pub struct Server { + config: ServerConfig, +} + +impl Server { + pub fn new(config: ServerConfig) -> Self { + Self { config } + } + + pub async fn run(self) -> Result<(), ApiError> { + log::info!("connecting to redis: url={}", self.config.redis); + + // Create the redis client. + let redis = redis::Client::open(self.config.redis)?; + let redis = redis + .get_tokio_connection_manager() // TODO get_tokio_connection_manager_with_backoff? + .await?; + + let app = Router::new() + .route("/origin/:id", get(get_origin).post(set_origin).delete(delete_origin)) + .with_state(redis); + + log::info!("serving requests: bind={}", self.config.listen); + + axum::Server::bind(&self.config.listen) + .serve(app.into_make_service()) + .await?; + + Ok(()) + } +} + +async fn get_origin( + Path(id): Path, + State(mut redis): State, +) -> Result, AppError> { + let key = origin_key(&id); + + log::debug!("get_origin: id={}", id); + + let payload: String = match redis.get(&key).await? { + Some(payload) => payload, + None => return Err(AppError::NotFound), + }; + + let origin: Origin = serde_json::from_str(&payload)?; + + Ok(Json(origin)) +} + +async fn set_origin( + State(mut redis): State, + Path(id): Path, + Json(origin): Json, +) -> Result<(), AppError> { + // TODO validate origin + + let key = origin_key(&id); + + // Convert the input back to JSON after validating it add adding any fields (TODO) + let payload = serde_json::to_string(&origin)?; + + let res: Option = redis::cmd("SET") + .arg(key) + .arg(payload) + .arg("NX") + .arg("EX") + .arg(60 * 60 * 24 * 2) // Set the key to expire in 2 days; just in case we forget to remove it. + .query_async(&mut redis) + .await?; + + if res.is_none() { + return Err(AppError::Duplicate); + } + + Ok(()) +} + +async fn delete_origin(Path(id): Path, State(mut redis): State) -> Result<(), AppError> { + let key = origin_key(&id); + match redis.del(key).await? { + 0 => Err(AppError::NotFound), + _ => Ok(()), + } +} + +fn origin_key(id: &str) -> String { + format!("origin.{}", id) +} + +#[derive(thiserror::Error, Debug)] +enum AppError { + #[error("redis error")] + Redis(#[from] redis::RedisError), + + #[error("json error")] + Json(#[from] serde_json::Error), + + #[error("not found")] + NotFound, + + #[error("duplicate ID")] + Duplicate, +} + +// Tell axum how to convert `AppError` into a response. +impl IntoResponse for AppError { + fn into_response(self) -> Response { + match self { + AppError::Redis(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("redis error: {}", e)).into_response(), + AppError::Json(e) => (StatusCode::INTERNAL_SERVER_ERROR, format!("json error: {}", e)).into_response(), + AppError::NotFound => StatusCode::NOT_FOUND.into_response(), + AppError::Duplicate => StatusCode::CONFLICT.into_response(), + } + } +} diff --git a/moq-pub/Cargo.toml b/moq-pub/Cargo.toml index e5aa7d2..7c323bf 100644 --- a/moq-pub/Cargo.toml +++ b/moq-pub/Cargo.toml @@ -18,29 +18,29 @@ moq-transport = { path = "../moq-transport" } # QUIC quinn = "0.10" -webtransport-quinn = "0.5" -webtransport-generic = "0.5" -http = "0.2.9" +webtransport-quinn = "0.6" +#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" } +url = "2" # Crypto -ring = "0.16.20" -rustls = "0.21.2" -rustls-pemfile = "1.0.2" +ring = "0.16" +rustls = "0.21" +rustls-pemfile = "1" # Async stuff -tokio = { version = "1.27", features = ["full"] } +tokio = { version = "1", features = ["full"] } # CLI, logging, error handling -clap = { version = "4.0", features = ["derive"] } +clap = { version = "4", features = ["derive"] } log = { version = "0.4", features = ["std"] } -env_logger = "0.9.3" -mp4 = "0.13.0" -rustls-native-certs = "0.6.3" -anyhow = { version = "1.0.70", features = ["backtrace"] } -serde_json = "1.0.105" -rfc6381-codec = "0.1.0" +env_logger = "0.9" +mp4 = "0.13" +rustls-native-certs = "0.6" +anyhow = { version = "1", features = ["backtrace"] } +serde_json = "1" +rfc6381-codec = "0.1" [build-dependencies] -http = "0.2.9" -clap = { version = "4.0", features = ["derive"] } -clap_mangen = "0.2.12" +clap = { version = "4", features = ["derive"] } +clap_mangen = "0.2" +url = "2" diff --git a/moq-pub/src/cli.rs b/moq-pub/src/cli.rs index 824f0dd..1e34dcb 100644 --- a/moq-pub/src/cli.rs +++ b/moq-pub/src/cli.rs @@ -1,5 +1,6 @@ use clap::Parser; use std::net; +use url::Url; #[derive(Parser, Clone, Debug)] pub struct Config { @@ -17,18 +18,18 @@ pub struct Config { #[arg(long, default_value = "1500000")] pub bitrate: u32, - /// Connect to the given URI starting with moq:// - #[arg(value_parser = moq_uri)] - pub uri: http::Uri, + /// Connect to the given URL starting with https:// + #[arg(value_parser = moq_url)] + pub url: Url, } -fn moq_uri(s: &str) -> Result { - let uri = http::Uri::try_from(s).map_err(|e| e.to_string())?; +fn moq_url(s: &str) -> Result { + let url = Url::try_from(s).map_err(|e| e.to_string())?; // Make sure the scheme is moq - if uri.scheme_str() != Some("moq") { - return Err("uri scheme must be moq".to_string()); + if url.scheme() != "https" { + return Err("url scheme must be https:// for WebTransport".to_string()); } - Ok(uri) + Ok(url) } diff --git a/moq-pub/src/main.rs b/moq-pub/src/main.rs index 0f01e32..2f0d2a8 100644 --- a/moq-pub/src/main.rs +++ b/moq-pub/src/main.rs @@ -7,7 +7,7 @@ use cli::*; mod media; use media::*; -use moq_transport::model::broadcast; +use moq_transport::cache::broadcast; // TODO: clap complete @@ -39,14 +39,9 @@ async fn main() -> anyhow::Result<()> { let mut endpoint = quinn::Endpoint::client(config.bind)?; endpoint.set_default_client_config(quinn_client_config); - log::info!("connecting to {}", config.uri); + log::info!("connecting to relay: url={}", config.url); - // Change the uri scheme to "https" for WebTransport - let mut parts = config.uri.into_parts(); - parts.scheme = Some(http::uri::Scheme::HTTPS); - let uri = http::Uri::from_parts(parts)?; - - let session = webtransport_quinn::connect(&endpoint, &uri) + let session = webtransport_quinn::connect(&endpoint, &config.url) .await .context("failed to create WebTransport session")?; diff --git a/moq-pub/src/media.rs b/moq-pub/src/media.rs index e74ddd4..d3babd6 100644 --- a/moq-pub/src/media.rs +++ b/moq-pub/src/media.rs @@ -1,6 +1,6 @@ use crate::cli::Config; use anyhow::{self, Context}; -use moq_transport::model::{broadcast, segment, track}; +use moq_transport::cache::{broadcast, segment, track}; use moq_transport::VarInt; use mp4::{self, ReadBox}; use serde_json::json; diff --git a/moq-relay/Cargo.toml b/moq-relay/Cargo.toml index b6907fd..35d4ca6 100644 --- a/moq-relay/Cargo.toml +++ b/moq-relay/Cargo.toml @@ -13,28 +13,35 @@ categories = ["multimedia", "network-programming", "web-programming"] [dependencies] moq-transport = { path = "../moq-transport" } +moq-api = { path = "../moq-api" } # QUIC quinn = "0.10" -webtransport-generic = "0.5" -webtransport-quinn = "0.5" +webtransport-quinn = "0.6" +#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" } +url = "2" # Crypto -ring = "0.16.20" -rustls = "0.21.2" -rustls-pemfile = "1.0.2" +ring = "0.16" +rustls = "0.21" +rustls-pemfile = "1" +rustls-native-certs = "0.6" # Async stuff -tokio = { version = "1.27", features = ["full"] } +tokio = { version = "1", features = ["full"] } # Web server to serve the fingerprint -warp = { version = "0.3.3", features = ["tls"] } -hex = "0.4.3" +warp = { version = "0.3", features = ["tls"] } +hex = "0.4" + +# Error handling +anyhow = { version = "1", features = ["backtrace"] } +thiserror = "1" + +# CLI +clap = { version = "4", features = ["derive"] } # Logging -clap = { version = "4.0", features = ["derive"] } log = { version = "0.4", features = ["std"] } -env_logger = "0.9.3" -anyhow = "1.0.70" +env_logger = "0.9" tracing = "0.1" -tracing-subscriber = "0.3.0" diff --git a/moq-relay/src/config.rs b/moq-relay/src/config.rs index 070edce..7ca82aa 100644 --- a/moq-relay/src/config.rs +++ b/moq-relay/src/config.rs @@ -1,4 +1,5 @@ use std::{net, path}; +use url::Url; use clap::Parser; @@ -7,7 +8,7 @@ use clap::Parser; pub struct Config { /// Listen on this address #[arg(long, default_value = "[::]:4443")] - pub bind: net::SocketAddr, + pub listen: net::SocketAddr, /// Use the certificate file at this path #[arg(long)] @@ -20,4 +21,15 @@ pub struct Config { /// Listen on HTTPS and serve /fingerprint, for self-signed certificates #[arg(long, action)] pub fingerprint: bool, + + /// Optional: Use the moq-api via HTTP to store origin information. + #[arg(long)] + pub api: Option, + + /// Our internal address which we advertise to other origins. + /// We use QUIC, so the certificate must be valid for this address. + /// This needs to be prefixed with https:// to use WebTransport + /// This is only used when --api is set. + #[arg(long)] + pub node: Option, } diff --git a/moq-relay/src/error.rs b/moq-relay/src/error.rs new file mode 100644 index 0000000..54813ad --- /dev/null +++ b/moq-relay/src/error.rs @@ -0,0 +1,51 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum RelayError { + #[error("transport error: {0}")] + Transport(#[from] moq_transport::session::SessionError), + + #[error("cache error: {0}")] + Cache(#[from] moq_transport::cache::CacheError), + + #[error("api error: {0}")] + MoqApi(#[from] moq_api::ApiError), + + #[error("url error: {0}")] + Url(#[from] url::ParseError), + + #[error("webtransport client error: {0}")] + WebTransportClient(#[from] webtransport_quinn::ClientError), + + #[error("webtransport server error: {0}")] + WebTransportServer(#[from] webtransport_quinn::ServerError), + + #[error("missing node")] + MissingNode, +} + +impl moq_transport::MoqError for RelayError { + fn code(&self) -> u32 { + match self { + Self::Transport(err) => err.code(), + Self::Cache(err) => err.code(), + Self::MoqApi(_err) => 504, + Self::Url(_) => 500, + Self::MissingNode => 500, + Self::WebTransportClient(_) => 504, + Self::WebTransportServer(_) => 500, + } + } + + fn reason(&self) -> &str { + match self { + Self::Transport(err) => err.reason(), + Self::Cache(err) => err.reason(), + Self::MoqApi(_err) => "api error", + Self::Url(_) => "url error", + Self::MissingNode => "missing node", + Self::WebTransportServer(_) => "server error", + Self::WebTransportClient(_) => "upstream error", + } + } +} diff --git a/moq-relay/src/main.rs b/moq-relay/src/main.rs index 471eea7..960a49b 100644 --- a/moq-relay/src/main.rs +++ b/moq-relay/src/main.rs @@ -6,10 +6,14 @@ use ring::digest::{digest, SHA256}; use warp::Filter; mod config; +mod error; +mod origin; mod server; mod session; pub use config::*; +pub use error::*; +pub use origin::*; pub use server::*; pub use session::*; @@ -18,15 +22,17 @@ async fn main() -> anyhow::Result<()> { env_logger::init(); // Disable tracing so we don't get a bunch of Quinn spam. + /* let tracer = tracing_subscriber::FmtSubscriber::builder() .with_max_level(tracing::Level::WARN) .finish(); tracing::subscriber::set_global_default(tracer).unwrap(); + */ let config = Config::parse(); // Create a server to actually serve the media - let server = Server::new(config.clone()).context("failed to create server")?; + let server = Server::new(config.clone()).await.context("failed to create server")?; // Run all of the above tokio::select! { @@ -63,7 +69,7 @@ async fn serve_http(config: Config) -> anyhow::Result<()> { .tls() .cert_path(config.cert) .key_path(config.key) - .run(config.bind) + .run(config.listen) .await; Ok(()) diff --git a/moq-relay/src/origin.rs b/moq-relay/src/origin.rs new file mode 100644 index 0000000..b9f96fb --- /dev/null +++ b/moq-relay/src/origin.rs @@ -0,0 +1,144 @@ +use std::{ + collections::{hash_map, HashMap}, + sync::{Arc, Mutex}, +}; + +use moq_transport::cache::{broadcast, CacheError}; +use url::Url; + +use crate::RelayError; + +#[derive(Clone)] +pub struct Origin { + // An API client used to get/set broadcasts. + // If None then we never use a remote origin. + api: Option, + + // The internal address of our node. + // If None then we can never advertise ourselves as an origin. + node: Option, + + // A map of active broadcasts. + lookup: Arc>>, + + // A QUIC endpoint we'll use to fetch from other origins. + quic: quinn::Endpoint, +} + +impl Origin { + pub fn new(api: Option, node: Option, quic: quinn::Endpoint) -> Self { + Self { + api, + node, + lookup: Default::default(), + quic, + } + } + + pub async fn create_broadcast(&mut self, id: &str) -> Result { + let (publisher, subscriber) = broadcast::new(); + + // Check if a broadcast already exists by that id. + match self.lookup.lock().unwrap().entry(id.to_string()) { + hash_map::Entry::Occupied(_) => return Err(CacheError::Duplicate.into()), + hash_map::Entry::Vacant(v) => v.insert(subscriber), + }; + + if let Some(ref mut api) = self.api { + // Make a URL for the broadcast. + let url = self.node.as_ref().ok_or(RelayError::MissingNode)?.clone().join(id)?; + + log::info!("announcing origin: id={} url={}", id, url); + + let entry = moq_api::Origin { url }; + + if let Err(err) = api.set_origin(id, entry).await { + self.lookup.lock().unwrap().remove(id); + return Err(err.into()); + } + } + + Ok(publisher) + } + + pub fn get_broadcast(&self, id: &str) -> broadcast::Subscriber { + let mut lookup = self.lookup.lock().unwrap(); + + if let Some(broadcast) = lookup.get(id) { + if broadcast.closed().is_none() { + return broadcast.clone(); + } + } + + let (publisher, subscriber) = broadcast::new(); + lookup.insert(id.to_string(), subscriber.clone()); + + let mut this = self.clone(); + let id = id.to_string(); + + // Rather than fetching from the API and connecting via QUIC inline, we'll spawn a task to do it. + // This way we could stop polling this session and it won't impact other session. + // It also means we'll only connect the API and QUIC once if N subscribers suddenly show up. + // However, the downside is that we don't return an error immediately. + // If that's important, it can be done but it gets a bit racey. + tokio::spawn(async move { + match this.fetch_broadcast(&id).await { + Ok(session) => { + if let Err(err) = this.run_broadcast(session, publisher).await { + log::warn!("failed to run broadcast: id={} err={:#?}", id, err); + } + } + Err(err) => { + log::warn!("failed to fetch broadcast: id={} err={:#?}", id, err); + publisher.close(CacheError::NotFound).ok(); + } + } + }); + + subscriber + } + + async fn fetch_broadcast(&mut self, id: &str) -> Result { + // Fetch the origin from the API. + let api = match self.api { + Some(ref mut api) => api, + + // We return NotFound here instead of earlier just to simulate an API fetch. + None => return Err(CacheError::NotFound.into()), + }; + + log::info!("fetching origin: id={}", id); + + let origin = api.get_origin(id).await?.ok_or(CacheError::NotFound)?; + + log::info!("connecting to origin: url={}", origin.url); + + // Establish the webtransport session. + let session = webtransport_quinn::connect(&self.quic, &origin.url).await?; + + Ok(session) + } + + async fn run_broadcast( + &mut self, + session: webtransport_quinn::Session, + broadcast: broadcast::Publisher, + ) -> Result<(), RelayError> { + let session = moq_transport::session::Client::subscriber(session, broadcast).await?; + + session.run().await?; + + Ok(()) + } + + pub async fn remove_broadcast(&mut self, id: &str) -> Result<(), RelayError> { + self.lookup.lock().unwrap().remove(id).ok_or(CacheError::NotFound)?; + + if let Some(ref mut api) = self.api { + log::info!("deleting origin: id={}", id); + api.delete_origin(id).await?; + } + + Ok(()) + } +} diff --git a/moq-relay/src/server.rs b/moq-relay/src/server.rs index cf4fffb..e45fb2c 100644 --- a/moq-relay/src/server.rs +++ b/moq-relay/src/server.rs @@ -1,39 +1,40 @@ use std::{ - collections::HashMap, fs, io::{self, Read}, - sync::{Arc, Mutex}, + sync::Arc, time, }; use anyhow::Context; -use moq_transport::model::broadcast; use tokio::task::JoinSet; -use crate::{Config, Session}; +use crate::{Config, Origin, Session}; pub struct Server { - server: quinn::Endpoint, + quic: quinn::Endpoint, // The active connections. conns: JoinSet>, // The map of active broadcasts by path. - broadcasts: Arc>>, + origin: Origin, } impl Server { // Create a new server - pub fn new(config: Config) -> anyhow::Result { + pub async fn new(config: Config) -> anyhow::Result { // Read the PEM certificate chain let certs = fs::File::open(config.cert).context("failed to open cert file")?; let mut certs = io::BufReader::new(certs); - let certs = rustls_pemfile::certs(&mut certs)? + + let certs: Vec = rustls_pemfile::certs(&mut certs)? .into_iter() .map(rustls::Certificate) .collect(); + anyhow::ensure!(!certs.is_empty(), "could not find certificate"); + // Read the PEM private key let mut keys = fs::File::open(config.key).context("failed to open key file")?; @@ -56,46 +57,84 @@ impl Server { let key = rustls::PrivateKey(keys.remove(0)); - let mut tls_config = rustls::ServerConfig::builder() + // Set up a QUIC endpoint that can act as both a client and server. + + // Create a list of acceptable root certificates. + let mut client_roots = rustls::RootCertStore::empty(); + + // For local development, we'll accept our own certificate. + for cert in &certs { + client_roots.add(cert).context("failed to add our cert to roots")?; + } + + // Add the platform's native root certificates. + for cert in rustls_native_certs::load_native_certs().expect("could not load platform certs") { + client_roots.add(&rustls::Certificate(cert.0)).unwrap(); + } + + let mut client_config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(client_roots) + .with_no_client_auth(); + + let mut server_config = rustls::ServerConfig::builder() .with_safe_default_cipher_suites() .with_safe_default_kx_groups() .with_protocol_versions(&[&rustls::version::TLS13]) - .unwrap() + .context("failed to create server config")? .with_no_client_auth() .with_single_cert(certs, key)?; - tls_config.max_early_data_size = u32::MAX; - tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; - - let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(tls_config)); + server_config.max_early_data_size = u32::MAX; + client_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; + server_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // Enable BBR congestion control // TODO validate the implementation let mut transport_config = quinn::TransportConfig::default(); - transport_config.keep_alive_interval(Some(time::Duration::from_secs(2))); + transport_config.max_idle_timeout(Some(time::Duration::from_secs(10).try_into().unwrap())); + transport_config.keep_alive_interval(Some(time::Duration::from_secs(4))); // TODO make this smarter transport_config.congestion_controller_factory(Arc::new(quinn::congestion::BbrConfig::default())); + let transport_config = Arc::new(transport_config); - server_config.transport = Arc::new(transport_config); - let server = quinn::Endpoint::server(server_config, config.bind)?; + let mut client_config = quinn::ClientConfig::new(Arc::new(client_config)); + let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(server_config)); + server_config.transport_config(transport_config.clone()); + client_config.transport_config(transport_config); - let broadcasts = Default::default(); + // There's a bit more boilerplate to make a generic endpoint. + let runtime = quinn::default_runtime().context("no async runtime")?; + let endpoint_config = quinn::EndpointConfig::default(); + let socket = std::net::UdpSocket::bind(config.listen).context("failed to bind UDP socket")?; + + // Create the generic QUIC endpoint. + let mut quic = quinn::Endpoint::new(endpoint_config, Some(server_config), socket, runtime) + .context("failed to create QUIC endpoint")?; + quic.set_default_client_config(client_config); + + let api = config.api.map(|url| { + log::info!("using moq-api: url={}", url); + moq_api::Client::new(url) + }); + + if let Some(ref node) = config.node { + log::info!("advertising origin: url={}", node); + } + + let origin = Origin::new(api, config.node, quic.clone()); let conns = JoinSet::new(); - Ok(Self { - server, - broadcasts, - conns, - }) + Ok(Self { quic, origin, conns }) } pub async fn run(mut self) -> anyhow::Result<()> { - log::info!("listening on {}", self.server.local_addr()?); + log::info!("listening on {}", self.quic.local_addr()?); loop { tokio::select! { - res = self.server.accept() => { + res = self.quic.accept() => { let conn = res.context("failed to accept QUIC connection")?; - let mut session = Session::new(self.broadcasts.clone()); + let mut session = Session::new(self.origin.clone()); self.conns.spawn(async move { session.run(conn).await }); }, res = self.conns.join_next(), if !self.conns.is_empty() => { diff --git a/moq-relay/src/session.rs b/moq-relay/src/session.rs index 0a8c28d..59403cf 100644 --- a/moq-relay/src/session.rs +++ b/moq-relay/src/session.rs @@ -1,20 +1,17 @@ -use std::{ - collections::{hash_map, HashMap}, - sync::{Arc, Mutex}, -}; - use anyhow::Context; -use moq_transport::{model::broadcast, session::Request, setup::Role}; +use moq_transport::{cache::broadcast, session::Request, setup::Role, MoqError}; + +use crate::Origin; #[derive(Clone)] pub struct Session { - broadcasts: Arc>>, + origin: Origin, } impl Session { - pub fn new(broadcasts: Arc>>) -> Self { - Self { broadcasts } + pub fn new(origin: Origin) -> Self { + Self { origin } } pub async fn run(&mut self, conn: quinn::Connecting) -> anyhow::Result<()> { @@ -35,7 +32,8 @@ impl Session { .await .context("failed to receive WebTransport request")?; - let path = request.uri().path().to_string(); + // Strip any leading and trailing slashes to get the broadcast name. + let path = request.url().path().trim_matches('/').to_string(); log::debug!("received WebTransport CONNECT: id={} path={}", id, path); @@ -45,8 +43,6 @@ impl Session { .await .context("failed to respond to WebTransport request")?; - log::debug!("accepted WebTransport CONNECT: id={} path={}", id, path); - // Perform the MoQ handshake. let request = moq_transport::session::Server::accept(session) .await @@ -59,7 +55,10 @@ impl Session { match role { Role::Publisher => self.serve_publisher(id, request, &path).await, Role::Subscriber => self.serve_subscriber(id, request, &path).await, - Role::Both => request.reject(300), + Role::Both => { + log::warn!("role both not supported: id={}", id); + request.reject(300); + } }; log::debug!("closing connection: id={}", id); @@ -70,18 +69,20 @@ impl Session { async fn serve_publisher(&mut self, id: usize, request: Request, path: &str) { log::info!("serving publisher: id={}, path={}", id, path); - let (publisher, subscriber) = broadcast::new(); - - match self.broadcasts.lock().unwrap().entry(path.to_string()) { - hash_map::Entry::Occupied(_) => return request.reject(409), - hash_map::Entry::Vacant(entry) => entry.insert(subscriber), + let broadcast = match self.origin.create_broadcast(path).await { + Ok(broadcast) => broadcast, + Err(err) => { + log::warn!("error accepting publisher: id={} path={} err={:#?}", id, path, err); + return request.reject(err.code()); + } }; - if let Err(err) = self.run_publisher(request, publisher).await { - log::warn!("error serving pubisher: id={} path={} err={:?}", id, path, err); + if let Err(err) = self.run_publisher(request, broadcast).await { + log::warn!("error serving publisher: id={} path={} err={:#?}", id, path, err); } - self.broadcasts.lock().unwrap().remove(path); + // TODO can we do this on drop? Otherwise we might miss it. + self.origin.remove_broadcast(path).await.ok(); } async fn run_publisher(&mut self, request: Request, publisher: broadcast::Publisher) -> anyhow::Result<()> { @@ -93,15 +94,10 @@ impl Session { async fn serve_subscriber(&mut self, id: usize, request: Request, path: &str) { log::info!("serving subscriber: id={} path={}", id, path); - let broadcast = match self.broadcasts.lock().unwrap().get(path) { - Some(broadcast) => broadcast.clone(), - None => { - return request.reject(404); - } - }; + let broadcast = self.origin.get_broadcast(path); if let Err(err) = self.run_subscriber(request, broadcast).await { - log::warn!("error serving subscriber: id={} path={} err={:?}", id, path, err); + log::warn!("error serving subscriber: id={} path={} err={:#?}", id, path, err); } } diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index d4b63f5..6f59f63 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -15,12 +15,12 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -bytes = "1.4" +bytes = "1" thiserror = "1" -anyhow = "1" -tokio = { version = "1.27", features = ["macros", "io-util", "sync"] } +tokio = { version = "1", features = ["macros", "io-util", "sync"] } log = "0.4" indexmap = "2" quinn = "0.10" -webtransport-quinn = "0.5.4" +webtransport-quinn = "0.6" +#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" } diff --git a/moq-transport/src/model/broadcast.rs b/moq-transport/src/cache/broadcast.rs similarity index 83% rename from moq-transport/src/model/broadcast.rs rename to moq-transport/src/cache/broadcast.rs index 316b7aa..58c28fa 100644 --- a/moq-transport/src/model/broadcast.rs +++ b/moq-transport/src/cache/broadcast.rs @@ -2,23 +2,21 @@ //! //! The [Publisher] can create tracks, either manually or on request. //! It receives all requests by a [Subscriber] for a tracks that don't exist. -//! The simplest implementation is to close every unknown track with [Error::NotFound]. +//! The simplest implementation is to close every unknown track with [CacheError::NotFound]. //! //! A [Subscriber] can request tracks by name. //! If the track already exists, it will be returned. //! If the track doesn't exist, it will be sent to [Unknown] to be handled. //! A [Subscriber] can be cloned to create multiple subscriptions. //! -//! The broadcast is automatically closed with [Error::Closed] when [Publisher] is dropped, or all [Subscriber]s are dropped. +//! The broadcast is automatically closed with [CacheError::Closed] when [Publisher] is dropped, or all [Subscriber]s are dropped. use std::{ collections::{hash_map, HashMap, VecDeque}, fmt, sync::Arc, }; -use crate::Error; - -use super::{track, Watch}; +use super::{track, CacheError, Watch}; /// Create a new broadcast. pub fn new() -> (Publisher, Subscriber) { @@ -35,27 +33,27 @@ pub fn new() -> (Publisher, Subscriber) { struct State { tracks: HashMap, requested: VecDeque, - closed: Result<(), Error>, + closed: Result<(), CacheError>, } impl State { - pub fn get(&self, name: &str) -> Result, Error> { + pub fn get(&self, name: &str) -> Result, CacheError> { // Don't check closed, so we can return from cache. Ok(self.tracks.get(name).cloned()) } - pub fn insert(&mut self, track: track::Subscriber) -> Result<(), Error> { + pub fn insert(&mut self, track: track::Subscriber) -> Result<(), CacheError> { self.closed.clone()?; match self.tracks.entry(track.name.clone()) { - hash_map::Entry::Occupied(_) => return Err(Error::Duplicate), + hash_map::Entry::Occupied(_) => return Err(CacheError::Duplicate), hash_map::Entry::Vacant(v) => v.insert(track), }; Ok(()) } - pub fn request(&mut self, name: &str) -> Result { + pub fn request(&mut self, name: &str) -> Result { self.closed.clone()?; // Create a new track. @@ -70,7 +68,7 @@ impl State { Ok(subscriber) } - pub fn has_next(&self) -> Result { + pub fn has_next(&self) -> Result { // Check if there's any elements in the queue before checking closed. if !self.requested.is_empty() { return Ok(true); @@ -85,7 +83,7 @@ impl State { self.requested.pop_front().expect("no entry in queue") } - pub fn close(&mut self, err: Error) -> Result<(), Error> { + pub fn close(&mut self, err: CacheError) -> Result<(), CacheError> { self.closed.clone()?; self.closed = Err(err); Ok(()) @@ -117,19 +115,19 @@ impl Publisher { } /// Create a new track with the given name, inserting it into the broadcast. - pub fn create_track(&mut self, name: &str) -> Result { + pub fn create_track(&mut self, name: &str) -> Result { let (publisher, subscriber) = track::new(name); self.state.lock_mut().insert(subscriber)?; Ok(publisher) } /// Insert a track into the broadcast. - pub fn insert_track(&mut self, track: track::Subscriber) -> Result<(), Error> { + pub fn insert_track(&mut self, track: track::Subscriber) -> Result<(), CacheError> { self.state.lock_mut().insert(track) } /// Block until the next track requested by a subscriber. - pub async fn next_track(&mut self) -> Result, Error> { + pub async fn next_track(&mut self) -> Result, CacheError> { loop { let notify = { let state = self.state.lock(); @@ -145,7 +143,7 @@ impl Publisher { } /// Close the broadcast with an error. - pub fn close(self, err: Error) -> Result<(), Error> { + pub fn close(self, err: CacheError) -> Result<(), CacheError> { self.state.lock_mut().close(err) } } @@ -173,8 +171,8 @@ impl Subscriber { /// Get a track from the broadcast by name. /// If the track does not exist, it will be created and potentially fufilled by the publisher (via Unknown). - /// Otherwise, it will return [Error::NotFound]. - pub fn get_track(&self, name: &str) -> Result { + /// Otherwise, it will return [CacheError::NotFound]. + pub fn get_track(&self, name: &str) -> Result { let state = self.state.lock(); if let Some(track) = state.get(name)? { return Ok(track); @@ -183,6 +181,11 @@ impl Subscriber { // Request a new track if it does not exist. state.into_mut().request(name) } + + /// Return if the broadcast is closed, either because the publisher was dropped or called [Publisher::close]. + pub fn closed(&self) -> Option { + self.state.lock().closed.as_ref().err().cloned() + } } impl fmt::Debug for Subscriber { @@ -206,6 +209,6 @@ impl Dropped { impl Drop for Dropped { fn drop(&mut self) { - self.state.lock_mut().close(Error::Closed).ok(); + self.state.lock_mut().close(CacheError::Closed).ok(); } } diff --git a/moq-transport/src/cache/error.rs b/moq-transport/src/cache/error.rs new file mode 100644 index 0000000..99de101 --- /dev/null +++ b/moq-transport/src/cache/error.rs @@ -0,0 +1,51 @@ +use thiserror::Error; + +use crate::MoqError; + +#[derive(Clone, Debug, Error)] +pub enum CacheError { + /// A clean termination, represented as error code 0. + /// This error is automatically used when publishers or subscribers are dropped without calling close. + #[error("closed")] + Closed, + + /// An ANNOUNCE_RESET or SUBSCRIBE_RESET was sent by the publisher. + #[error("reset code={0:?}")] + Reset(u32), + + /// An ANNOUNCE_STOP or SUBSCRIBE_STOP was sent by the subscriber. + #[error("stop")] + Stop, + + /// The requested resource was not found. + #[error("not found")] + NotFound, + + /// A resource already exists with that ID. + #[error("duplicate")] + Duplicate, +} + +impl MoqError for CacheError { + /// An integer code that is sent over the wire. + fn code(&self) -> u32 { + match self { + Self::Closed => 0, + Self::Reset(code) => *code, + Self::Stop => 206, + Self::NotFound => 404, + Self::Duplicate => 409, + } + } + + /// A reason that is sent over the wire. + fn reason(&self) -> &str { + match self { + Self::Closed => "closed", + Self::Reset(_) => "reset", + Self::Stop => "stop", + Self::NotFound => "not found", + Self::Duplicate => "duplicate", + } + } +} diff --git a/moq-transport/src/model/mod.rs b/moq-transport/src/cache/mod.rs similarity index 92% rename from moq-transport/src/model/mod.rs rename to moq-transport/src/cache/mod.rs index aa56585..175deed 100644 --- a/moq-transport/src/model/mod.rs +++ b/moq-transport/src/cache/mod.rs @@ -4,8 +4,11 @@ //! The hierarchy is: [broadcast] -> [track] -> [segment] -> [Bytes](bytes::Bytes) pub mod broadcast; +mod error; pub mod segment; pub mod track; pub(crate) mod watch; pub(crate) use watch::*; + +pub use error::*; diff --git a/moq-transport/src/model/segment.rs b/moq-transport/src/cache/segment.rs similarity index 88% rename from moq-transport/src/model/segment.rs rename to moq-transport/src/cache/segment.rs index b338ef7..1a25c19 100644 --- a/moq-transport/src/model/segment.rs +++ b/moq-transport/src/cache/segment.rs @@ -7,14 +7,14 @@ //! These chunks are returned directly from the QUIC connection, so they may be of any size or position. //! A closed [Subscriber] will receive a copy of all future chunks. (fanout) //! -//! The segment is closed with [Error::Closed] when all publishers or subscribers are dropped. +//! The segment is closed with [CacheError::Closed] when all publishers or subscribers are dropped. use core::fmt; use std::{ops::Deref, sync::Arc, time}; -use crate::{Error, VarInt}; +use crate::VarInt; use bytes::Bytes; -use super::Watch; +use super::{CacheError, Watch}; /// Create a new segment with the given info. pub fn new(info: Info) -> (Publisher, Subscriber) { @@ -45,11 +45,11 @@ struct State { data: Vec, // Set when the publisher is dropped. - closed: Result<(), Error>, + closed: Result<(), CacheError>, } impl State { - pub fn close(&mut self, err: Error) -> Result<(), Error> { + pub fn close(&mut self, err: CacheError) -> Result<(), CacheError> { self.closed.clone()?; self.closed = Err(err); Ok(()) @@ -97,7 +97,7 @@ impl Publisher { } /// Write a new chunk of bytes. - pub fn write_chunk(&mut self, data: Bytes) -> Result<(), Error> { + pub fn write_chunk(&mut self, data: Bytes) -> Result<(), CacheError> { let mut state = self.state.lock_mut(); state.closed.clone()?; state.data.push(data); @@ -105,7 +105,7 @@ impl Publisher { } /// Close the segment with an error. - pub fn close(self, err: Error) -> Result<(), Error> { + pub fn close(self, err: CacheError) -> Result<(), CacheError> { self.state.lock_mut().close(err) } } @@ -157,7 +157,7 @@ impl Subscriber { } /// Block until the next chunk of bytes is available. - pub async fn read_chunk(&mut self) -> Result, Error> { + pub async fn read_chunk(&mut self) -> Result, CacheError> { loop { let notify = { let state = self.state.lock(); @@ -168,7 +168,7 @@ impl Subscriber { } match &state.closed { - Err(Error::Closed) => return Ok(None), + Err(CacheError::Closed) => return Ok(None), Err(err) => return Err(err.clone()), Ok(()) => state.changed(), } @@ -210,6 +210,6 @@ impl Dropped { impl Drop for Dropped { fn drop(&mut self) { - self.state.lock_mut().close(Error::Closed).ok(); + self.state.lock_mut().close(CacheError::Closed).ok(); } } diff --git a/moq-transport/src/model/track.rs b/moq-transport/src/cache/track.rs similarity index 92% rename from moq-transport/src/model/track.rs rename to moq-transport/src/cache/track.rs index 9b146f5..109a011 100644 --- a/moq-transport/src/model/track.rs +++ b/moq-transport/src/cache/track.rs @@ -10,14 +10,14 @@ //! Segments will be cached for a potentially limited duration added to the unreliable nature. //! A cloned [Subscriber] will receive a copy of all new segment going forward (fanout). //! -//! The track is closed with [Error::Closed] when all publishers or subscribers are dropped. +//! The track is closed with [CacheError::Closed] when all publishers or subscribers are dropped. use std::{collections::BinaryHeap, fmt, ops::Deref, sync::Arc, time}; use indexmap::IndexMap; -use super::{segment, Watch}; -use crate::{Error, VarInt}; +use super::{segment, CacheError, Watch}; +use crate::VarInt; /// Create a track with the given name. pub fn new(name: &str) -> (Publisher, Subscriber) { @@ -49,21 +49,21 @@ struct State { pruned: usize, // Set when the publisher is closed/dropped, or all subscribers are dropped. - closed: Result<(), Error>, + closed: Result<(), CacheError>, } impl State { - pub fn close(&mut self, err: Error) -> Result<(), Error> { + pub fn close(&mut self, err: CacheError) -> Result<(), CacheError> { self.closed.clone()?; self.closed = Err(err); Ok(()) } - pub fn insert(&mut self, segment: segment::Subscriber) -> Result<(), Error> { + pub fn insert(&mut self, segment: segment::Subscriber) -> Result<(), CacheError> { self.closed.clone()?; let entry = match self.lookup.entry(segment.sequence) { - indexmap::map::Entry::Occupied(_entry) => return Err(Error::Duplicate), + indexmap::map::Entry::Occupied(_entry) => return Err(CacheError::Duplicate), indexmap::map::Entry::Vacant(entry) => entry, }; @@ -144,19 +144,19 @@ impl Publisher { } /// Insert a new segment. - pub fn insert_segment(&mut self, segment: segment::Subscriber) -> Result<(), Error> { + pub fn insert_segment(&mut self, segment: segment::Subscriber) -> Result<(), CacheError> { self.state.lock_mut().insert(segment) } /// Create an insert a segment with the given info. - pub fn create_segment(&mut self, info: segment::Info) -> Result { + pub fn create_segment(&mut self, info: segment::Info) -> Result { let (publisher, subscriber) = segment::new(info); self.insert_segment(subscriber)?; Ok(publisher) } /// Close the segment with an error. - pub fn close(self, err: Error) -> Result<(), Error> { + pub fn close(self, err: CacheError) -> Result<(), CacheError> { self.state.lock_mut().close(err) } } @@ -206,8 +206,8 @@ impl Subscriber { } } - /// Block until the next segment arrives, or return None if the track is [Error::Closed]. - pub async fn next_segment(&mut self) -> Result, Error> { + /// Block until the next segment arrives, or return None if the track is [CacheError::Closed]. + pub async fn next_segment(&mut self) -> Result, CacheError> { loop { let notify = { let state = self.state.lock(); @@ -237,7 +237,7 @@ impl Subscriber { // Otherwise check if we need to return an error. match &state.closed { - Err(Error::Closed) => return Ok(None), + Err(CacheError::Closed) => return Ok(None), Err(err) => return Err(err.clone()), Ok(()) => state.changed(), } @@ -279,7 +279,7 @@ impl Dropped { impl Drop for Dropped { fn drop(&mut self) { - self.state.lock_mut().close(Error::Closed).ok(); + self.state.lock_mut().close(CacheError::Closed).ok(); } } diff --git a/moq-transport/src/model/watch.rs b/moq-transport/src/cache/watch.rs similarity index 100% rename from moq-transport/src/model/watch.rs rename to moq-transport/src/cache/watch.rs diff --git a/moq-transport/src/coding/params.rs b/moq-transport/src/coding/params.rs new file mode 100644 index 0000000..7ee29d2 --- /dev/null +++ b/moq-transport/src/coding/params.rs @@ -0,0 +1,69 @@ +use std::cmp::min; + +use crate::VarInt; + +use super::{AsyncRead, AsyncWrite, DecodeError, EncodeError}; +use tokio::io::AsyncReadExt; + +// I hate this parameter encoding so much. +// i hate it i hate it i hate it + +// TODO Use #[async_trait] so we can do Param instead. +pub struct ParamInt(pub VarInt); + +impl ParamInt { + pub async fn decode(r: &mut R) -> Result { + // Why do we have a redundant size in front of each VarInt? + let size = VarInt::decode(r).await?; + let mut take = r.take(size.into_inner()); + let value = VarInt::decode(&mut take).await?; + + // Like seriously why do I have to check if the VarInt length mismatches. + if take.limit() != 0 { + return Err(DecodeError::InvalidSize); + } + + Ok(Self(value)) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + // Seriously why do I have to compute the size. + let size = self.0.size(); + VarInt::try_from(size)?.encode(w).await?; + + self.0.encode(w).await?; + + Ok(()) + } +} + +pub struct ParamBytes(pub Vec); + +impl ParamBytes { + pub async fn decode(r: &mut R) -> Result { + let size = VarInt::decode(r).await?; + let mut take = r.take(size.into_inner()); + let mut buf = Vec::with_capacity(min(take.limit() as usize, 1024)); + take.read_to_end(&mut buf).await?; + + Ok(Self(buf)) + } + + pub async fn encode(&self, w: &mut W) -> Result<(), EncodeError> { + let size = VarInt::try_from(self.0.len())?; + size.encode(w).await?; + w.write_all(&self.0).await?; + + Ok(()) + } +} + +pub struct ParamUnknown {} + +impl ParamUnknown { + pub async fn decode(r: &mut R) -> Result<(), DecodeError> { + // Really? Is there no way to advance without reading? + ParamBytes::decode(r).await?; + Ok(()) + } +} diff --git a/moq-transport/src/error.rs b/moq-transport/src/error.rs index 56238b5..802caff 100644 --- a/moq-transport/src/error.rs +++ b/moq-transport/src/error.rs @@ -1,82 +1,5 @@ -use thiserror::Error; - -use crate::VarInt; - -/// A MoQTransport error with an associated error code. -#[derive(Clone, Debug, Error)] -pub enum Error { - /// A clean termination, represented as error code 0. - /// This error is automatically used when publishers or subscribers are dropped without calling close. - #[error("closed")] - Closed, - - /// A session error occured. - #[error("session error: {0}")] - Session(#[from] webtransport_quinn::SessionError), - - /// An ANNOUNCE_RESET or SUBSCRIBE_RESET was sent by the publisher. - #[error("reset code={0:?}")] - Reset(u32), - - /// An ANNOUNCE_STOP or SUBSCRIBE_STOP was sent by the subscriber. - #[error("stop")] - Stop, - - /// The requested resource was not found. - #[error("not found")] - NotFound, - - /// A resource already exists with that ID. - #[error("duplicate")] - Duplicate, - - /// The role negiotiated in the handshake was violated. For example, a publisher sent a SUBSCRIBE, or a subscriber sent an OBJECT. - #[error("role violation: msg={0}")] - Role(VarInt), - - /// An error occured while reading from the QUIC stream. - #[error("failed to read from stream: {0}")] - Read(#[from] webtransport_quinn::ReadError), - - /// An error occured while writing to the QUIC stream. - #[error("failed to write to stream: {0}")] - Write(#[from] webtransport_quinn::WriteError), - - /// An unclassified error because I'm lazy. TODO classify these errors - #[error("unknown error: {0}")] - Unknown(String), -} - -impl Error { +pub trait MoqError { /// An integer code that is sent over the wire. - pub fn code(&self) -> u32 { - match self { - Self::Closed => 0, - Self::Reset(code) => *code, - Self::Stop => 206, - Self::NotFound => 404, - Self::Role(_) => 405, - Self::Duplicate => 409, - Self::Unknown(_) => 500, - Self::Write(_) => 501, - Self::Read(_) => 502, - Self::Session(_) => 503, - } - } - - /// A reason that is sent over the wire. - pub fn reason(&self) -> &str { - match self { - Self::Closed => "closed", - Self::Reset(_) => "reset", - Self::Stop => "stop", - Self::NotFound => "not found", - Self::Duplicate => "duplicate", - Self::Role(_) => "role violation", - Self::Read(_) => "read error", - Self::Write(_) => "write error", - Self::Session(_) => "session error", - Self::Unknown(_) => "unknown", - } - } + fn code(&self) -> u32; + fn reason(&self) -> &str; } diff --git a/moq-transport/src/lib.rs b/moq-transport/src/lib.rs index 4516ea8..bbde1d8 100644 --- a/moq-transport/src/lib.rs +++ b/moq-transport/src/lib.rs @@ -11,10 +11,10 @@ mod coding; mod error; +pub mod cache; pub mod message; -pub mod model; pub mod session; pub mod setup; pub use coding::VarInt; -pub use error::*; +pub use error::MoqError; diff --git a/moq-transport/src/session/client.rs b/moq-transport/src/session/client.rs index c9ceffc..c9b26c1 100644 --- a/moq-transport/src/session/client.rs +++ b/moq-transport/src/session/client.rs @@ -1,15 +1,13 @@ -use super::{Publisher, Subscriber}; -use crate::{model::broadcast, setup}; +use super::{Publisher, SessionError, Subscriber}; +use crate::{cache::broadcast, setup}; use webtransport_quinn::{RecvStream, SendStream, Session}; -use anyhow::Context; - /// An endpoint that connects to a URL to publish and/or consume live streams. pub struct Client {} impl Client { /// Connect using an established WebTransport session, performing the MoQ handshake as a publisher. - pub async fn publisher(session: Session, source: broadcast::Subscriber) -> anyhow::Result { + pub async fn publisher(session: Session, source: broadcast::Subscriber) -> Result { let control = Self::send_setup(&session, setup::Role::Publisher).await?; let publisher = Publisher::new(session, control, source); @@ -17,7 +15,7 @@ impl Client { } /// Connect using an established WebTransport session, performing the MoQ handshake as a subscriber. - pub async fn subscriber(session: Session, source: broadcast::Publisher) -> anyhow::Result { + pub async fn subscriber(session: Session, source: broadcast::Publisher) -> Result { let control = Self::send_setup(&session, setup::Role::Subscriber).await?; let subscriber = Subscriber::new(session, control, source); @@ -31,30 +29,25 @@ impl Client { } */ - async fn send_setup(session: &Session, role: setup::Role) -> anyhow::Result<(SendStream, RecvStream)> { - let mut control = session.open_bi().await.context("failed to oen bidi stream")?; + async fn send_setup(session: &Session, role: setup::Role) -> Result<(SendStream, RecvStream), SessionError> { + let mut control = session.open_bi().await?; let client = setup::Client { role, versions: vec![setup::Version::KIXEL_00].into(), }; - client - .encode(&mut control.0) - .await - .context("failed to send SETUP CLIENT")?; + client.encode(&mut control.0).await?; - let server = setup::Server::decode(&mut control.1) - .await - .context("failed to read SETUP")?; + let server = setup::Server::decode(&mut control.1).await?; if server.version != setup::Version::KIXEL_00 { - anyhow::bail!("unsupported version: {:?}", server.version); + return Err(SessionError::Version(Some(server.version))); } // Make sure the server replied with the if !client.role.is_compatible(server.role) { - anyhow::bail!("incompatible roles: client={:?} server={:?}", client.role, server.role); + return Err(SessionError::RoleIncompatible(client.role, server.role)); } Ok(control) diff --git a/moq-transport/src/session/control.rs b/moq-transport/src/session/control.rs index b981553..7c84e80 100644 --- a/moq-transport/src/session/control.rs +++ b/moq-transport/src/session/control.rs @@ -5,7 +5,8 @@ use std::{fmt, sync::Arc}; use tokio::sync::Mutex; use webtransport_quinn::{RecvStream, SendStream}; -use crate::{message::Message, Error}; +use super::SessionError; +use crate::message::Message; #[derive(Debug, Clone)] pub(crate) struct Control { @@ -21,22 +22,22 @@ impl Control { } } - pub async fn send + fmt::Debug>(&self, msg: T) -> Result<(), Error> { + pub async fn send + fmt::Debug>(&self, msg: T) -> Result<(), SessionError> { let mut stream = self.send.lock().await; log::info!("sending message: {:?}", msg); msg.into() .encode(&mut *stream) .await - .map_err(|e| Error::Unknown(e.to_string()))?; + .map_err(|e| SessionError::Unknown(e.to_string()))?; Ok(()) } // It's likely a mistake to call this from two different tasks, but it's easier to just support it. - pub async fn recv(&self) -> Result { + pub async fn recv(&self) -> Result { let mut stream = self.recv.lock().await; let msg = Message::decode(&mut *stream) .await - .map_err(|e| Error::Unknown(e.to_string()))?; + .map_err(|e| SessionError::Unknown(e.to_string()))?; Ok(msg) } } diff --git a/moq-transport/src/session/error.rs b/moq-transport/src/session/error.rs new file mode 100644 index 0000000..d940d0c --- /dev/null +++ b/moq-transport/src/session/error.rs @@ -0,0 +1,72 @@ +use crate::{cache, coding, setup, MoqError, VarInt}; + +#[derive(thiserror::Error, Debug)] +pub enum SessionError { + #[error("webtransport error: {0}")] + Session(#[from] webtransport_quinn::SessionError), + + #[error("cache error: {0}")] + Cache(#[from] cache::CacheError), + + #[error("encode error: {0}")] + Encode(#[from] coding::EncodeError), + + #[error("decode error: {0}")] + Decode(#[from] coding::DecodeError), + + #[error("unsupported version: {0:?}")] + Version(Option), + + #[error("incompatible roles: client={0:?} server={1:?}")] + RoleIncompatible(setup::Role, setup::Role), + + /// An error occured while reading from the QUIC stream. + #[error("failed to read from stream: {0}")] + Read(#[from] webtransport_quinn::ReadError), + + /// An error occured while writing to the QUIC stream. + #[error("failed to write to stream: {0}")] + Write(#[from] webtransport_quinn::WriteError), + + /// The role negiotiated in the handshake was violated. For example, a publisher sent a SUBSCRIBE, or a subscriber sent an OBJECT. + #[error("role violation: msg={0}")] + RoleViolation(VarInt), + + /// An unclassified error because I'm lazy. TODO classify these errors + #[error("unknown error: {0}")] + Unknown(String), +} + +impl MoqError for SessionError { + /// An integer code that is sent over the wire. + fn code(&self) -> u32 { + match self { + Self::Cache(err) => err.code(), + Self::RoleIncompatible(..) => 406, + Self::RoleViolation(..) => 405, + Self::Unknown(_) => 500, + Self::Write(_) => 501, + Self::Read(_) => 502, + Self::Session(_) => 503, + Self::Version(_) => 406, + Self::Encode(_) => 500, + Self::Decode(_) => 500, + } + } + + /// A reason that is sent over the wire. + fn reason(&self) -> &str { + match self { + Self::Cache(err) => err.reason(), + Self::RoleViolation(_) => "role violation", + Self::RoleIncompatible(..) => "role incompatible", + Self::Read(_) => "read error", + Self::Write(_) => "write error", + Self::Session(_) => "session error", + Self::Unknown(_) => "unknown", + Self::Version(_) => "unsupported version", + Self::Encode(_) => "encode error", + Self::Decode(_) => "decode error", + } + } +} diff --git a/moq-transport/src/session/mod.rs b/moq-transport/src/session/mod.rs index d6e7ca2..50b36a9 100644 --- a/moq-transport/src/session/mod.rs +++ b/moq-transport/src/session/mod.rs @@ -14,12 +14,14 @@ mod client; mod control; +mod error; mod publisher; mod server; mod subscriber; pub use client::*; pub(crate) use control::*; +pub use error::*; pub use publisher::*; pub use server::*; pub use subscriber::*; diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index d277447..4ae3c60 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -7,13 +7,13 @@ use tokio::task::AbortHandle; use webtransport_quinn::{RecvStream, SendStream, Session}; use crate::{ + cache::{broadcast, segment, track, CacheError}, message, message::Message, - model::{broadcast, segment, track}, - Error, VarInt, + MoqError, VarInt, }; -use super::Control; +use super::{Control, SessionError}; /// Serves broadcasts over the network, automatically handling subscriptions and caching. // TODO Clone specific fields when a task actually needs it. @@ -39,16 +39,30 @@ impl Publisher { } // TODO Serve a broadcast without sending an ANNOUNCE. - // fn serve(&mut self, broadcast: broadcast::Subscriber) -> Result<(), Error> { + // fn serve(&mut self, broadcast: broadcast::Subscriber) -> Result<(), SessionError> { // TODO Wait until the next subscribe that doesn't route to an ANNOUNCE. - // pub async fn subscribed(&mut self) -> Result { + // pub async fn subscribed(&mut self) -> Result { - pub async fn run(mut self) -> Result<(), Error> { + pub async fn run(mut self) -> Result<(), SessionError> { + let res = self.run_inner().await; + + // Terminate all active subscribes on error. + self.subscribes + .lock() + .unwrap() + .drain() + .for_each(|(_, abort)| abort.abort()); + + res + } + + pub async fn run_inner(&mut self) -> Result<(), SessionError> { loop { tokio::select! { - _stream = self.webtransport.accept_uni() => { - return Err(Error::Role(VarInt::ZERO)); + stream = self.webtransport.accept_uni() => { + stream?; + return Err(SessionError::RoleViolation(VarInt::ZERO)); } // NOTE: this is not cancel safe, but it's fine since the other branch is a fatal error. msg = self.control.recv() => { @@ -63,27 +77,27 @@ impl Publisher { } } - async fn recv_message(&mut self, msg: &Message) -> Result<(), Error> { + async fn recv_message(&mut self, msg: &Message) -> Result<(), SessionError> { match msg { Message::AnnounceOk(msg) => self.recv_announce_ok(msg).await, Message::AnnounceStop(msg) => self.recv_announce_stop(msg).await, Message::Subscribe(msg) => self.recv_subscribe(msg).await, Message::SubscribeStop(msg) => self.recv_subscribe_stop(msg).await, - _ => Err(Error::Role(msg.id())), + _ => Err(SessionError::RoleViolation(msg.id())), } } - async fn recv_announce_ok(&mut self, _msg: &message::AnnounceOk) -> Result<(), Error> { + async fn recv_announce_ok(&mut self, _msg: &message::AnnounceOk) -> Result<(), SessionError> { // We didn't send an announce. - Err(Error::NotFound) + Err(CacheError::NotFound.into()) } - async fn recv_announce_stop(&mut self, _msg: &message::AnnounceStop) -> Result<(), Error> { + async fn recv_announce_stop(&mut self, _msg: &message::AnnounceStop) -> Result<(), SessionError> { // We didn't send an announce. - Err(Error::NotFound) + Err(CacheError::NotFound.into()) } - async fn recv_subscribe(&mut self, msg: &message::Subscribe) -> Result<(), Error> { + async fn recv_subscribe(&mut self, msg: &message::Subscribe) -> Result<(), SessionError> { // Assume that the subscribe ID is unique for now. let abort = match self.start_subscribe(msg.clone()) { Ok(abort) => abort, @@ -92,14 +106,14 @@ impl Publisher { // Insert the abort handle into the lookup table. match self.subscribes.lock().unwrap().entry(msg.id) { - hash_map::Entry::Occupied(_) => return Err(Error::Duplicate), // TODO fatal, because we already started the task + hash_map::Entry::Occupied(_) => return Err(CacheError::Duplicate.into()), // TODO fatal, because we already started the task hash_map::Entry::Vacant(entry) => entry.insert(abort), }; self.control.send(message::SubscribeOk { id: msg.id }).await } - async fn reset_subscribe(&mut self, id: VarInt, err: Error) -> Result<(), Error> { + async fn reset_subscribe(&mut self, id: VarInt, err: E) -> Result<(), SessionError> { let msg = message::SubscribeReset { id, code: err.code(), @@ -109,10 +123,10 @@ impl Publisher { self.control.send(msg).await } - fn start_subscribe(&mut self, msg: message::Subscribe) -> Result { + fn start_subscribe(&mut self, msg: message::Subscribe) -> Result { // We currently don't use the namespace field in SUBSCRIBE if !msg.namespace.is_empty() { - return Err(Error::NotFound); + return Err(CacheError::NotFound.into()); } let mut track = self.source.get_track(&msg.name)?; @@ -125,11 +139,11 @@ impl Publisher { let res = this.run_subscribe(msg.id, &mut track).await; if let Err(err) = &res { - log::warn!("failed to serve track: name={} err={:?}", track.name, err); + log::warn!("failed to serve track: name={} err={:#?}", track.name, err); } // Make sure we send a reset at the end. - let err = res.err().unwrap_or(Error::Closed); + let err = res.err().unwrap_or(CacheError::Closed.into()); this.reset_subscribe(msg.id, err).await.ok(); // We're all done, so clean up the abort handle. @@ -139,7 +153,7 @@ impl Publisher { Ok(handle.abort_handle()) } - async fn run_subscribe(&self, id: VarInt, track: &mut track::Subscriber) -> Result<(), Error> { + async fn run_subscribe(&self, id: VarInt, track: &mut track::Subscriber) -> Result<(), SessionError> { // TODO add an Ok method to track::Publisher so we can send SUBSCRIBE_OK while let Some(mut segment) = track.next_segment().await? { @@ -156,7 +170,7 @@ impl Publisher { Ok(()) } - async fn run_segment(&self, id: VarInt, segment: &mut segment::Subscriber) -> Result<(), Error> { + async fn run_segment(&self, id: VarInt, segment: &mut segment::Subscriber) -> Result<(), SessionError> { let object = message::Object { track: id, sequence: segment.sequence, @@ -172,7 +186,7 @@ impl Publisher { object .encode(&mut stream) .await - .map_err(|e| Error::Unknown(e.to_string()))?; + .map_err(|e| SessionError::Unknown(e.to_string()))?; while let Some(data) = segment.read_chunk().await? { stream.write_chunk(data).await?; @@ -181,10 +195,15 @@ impl Publisher { Ok(()) } - async fn recv_subscribe_stop(&mut self, msg: &message::SubscribeStop) -> Result<(), Error> { - let abort = self.subscribes.lock().unwrap().remove(&msg.id).ok_or(Error::NotFound)?; + async fn recv_subscribe_stop(&mut self, msg: &message::SubscribeStop) -> Result<(), SessionError> { + let abort = self + .subscribes + .lock() + .unwrap() + .remove(&msg.id) + .ok_or(CacheError::NotFound)?; abort.abort(); - self.reset_subscribe(msg.id, Error::Stop).await + self.reset_subscribe(msg.id, CacheError::Stop).await } } diff --git a/moq-transport/src/session/server.rs b/moq-transport/src/session/server.rs index b571209..0c5205f 100644 --- a/moq-transport/src/session/server.rs +++ b/moq-transport/src/session/server.rs @@ -1,10 +1,8 @@ -use super::{Publisher, Subscriber}; -use crate::{model::broadcast, setup}; +use super::{Publisher, SessionError, Subscriber}; +use crate::{cache::broadcast, setup}; use webtransport_quinn::{RecvStream, SendStream, Session}; -use anyhow::Context; - /// An endpoint that accepts connections, publishing and/or consuming live streams. pub struct Server {} @@ -12,18 +10,16 @@ impl Server { /// Accept an established Webtransport session, performing the MoQ handshake. /// /// This returns a [Request] half-way through the handshake that allows the application to accept or deny the session. - pub async fn accept(session: Session) -> anyhow::Result { - let mut control = session.accept_bi().await.context("failed to accept bidi stream")?; + pub async fn accept(session: Session) -> Result { + let mut control = session.accept_bi().await?; - let client = setup::Client::decode(&mut control.1) - .await - .context("failed to read CLIENT SETUP")?; + let client = setup::Client::decode(&mut control.1).await?; client .versions .iter() .find(|version| **version == setup::Version::KIXEL_00) - .context("no supported versions")?; + .ok_or_else(|| SessionError::Version(client.versions.last().cloned()))?; Ok(Request { session, @@ -42,7 +38,7 @@ pub struct Request { impl Request { /// Accept the session as a publisher, using the provided broadcast to serve subscriptions. - pub async fn publisher(mut self, source: broadcast::Subscriber) -> anyhow::Result { + pub async fn publisher(mut self, source: broadcast::Subscriber) -> Result { self.send_setup(setup::Role::Publisher).await?; let publisher = Publisher::new(self.session, self.control, source); @@ -50,7 +46,7 @@ impl Request { } /// Accept the session as a subscriber only. - pub async fn subscriber(mut self, source: broadcast::Publisher) -> anyhow::Result { + pub async fn subscriber(mut self, source: broadcast::Publisher) -> Result { self.send_setup(setup::Role::Subscriber).await?; let subscriber = Subscriber::new(self.session, self.control, source); @@ -64,7 +60,7 @@ impl Request { } */ - async fn send_setup(&mut self, role: setup::Role) -> anyhow::Result<()> { + async fn send_setup(&mut self, role: setup::Role) -> Result<(), SessionError> { let server = setup::Server { role, version: setup::Version::KIXEL_00, @@ -73,17 +69,10 @@ impl Request { // We need to sure we support the opposite of the client's role. // ex. if the client is a publisher, we must be a subscriber ONLY. if !self.client.role.is_compatible(server.role) { - anyhow::bail!( - "incompatible roles: client={:?} server={:?}", - self.client.role, - server.role - ); + return Err(SessionError::RoleIncompatible(self.client.role, server.role)); } - server - .encode(&mut self.control.0) - .await - .context("failed to send setup server")?; + server.encode(&mut self.control.0).await?; Ok(()) } diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index 2accd6e..3148c2b 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -6,14 +6,13 @@ use std::{ }; use crate::{ + cache::{broadcast, segment, track, CacheError}, message, message::Message, - model::{broadcast, segment, track}, - Error, VarInt, + session::{Control, SessionError}, + VarInt, }; -use super::Control; - /// Receives broadcasts over the network, automatically handling subscriptions and caching. // TODO Clone specific fields when a task actually needs it. #[derive(Clone, Debug)] @@ -47,7 +46,7 @@ impl Subscriber { } } - pub async fn run(self) -> Result<(), Error> { + pub async fn run(self) -> Result<(), SessionError> { let inbound = self.clone().run_inbound(); let streams = self.clone().run_streams(); let source = self.clone().run_source(); @@ -60,7 +59,7 @@ impl Subscriber { } } - async fn run_inbound(mut self) -> Result<(), Error> { + async fn run_inbound(mut self) -> Result<(), SessionError> { loop { let msg = self.control.recv().await?; @@ -71,28 +70,28 @@ impl Subscriber { } } - async fn recv_message(&mut self, msg: &Message) -> Result<(), Error> { + async fn recv_message(&mut self, msg: &Message) -> Result<(), SessionError> { match msg { Message::Announce(_) => Ok(()), // don't care Message::AnnounceReset(_) => Ok(()), // also don't care Message::SubscribeOk(_) => Ok(()), // guess what, don't care Message::SubscribeReset(msg) => self.recv_subscribe_reset(msg).await, Message::GoAway(_msg) => unimplemented!("GOAWAY"), - _ => Err(Error::Role(msg.id())), + _ => Err(SessionError::RoleViolation(msg.id())), } } - async fn recv_subscribe_reset(&mut self, msg: &message::SubscribeReset) -> Result<(), Error> { - let err = Error::Reset(msg.code); + async fn recv_subscribe_reset(&mut self, msg: &message::SubscribeReset) -> Result<(), SessionError> { + let err = CacheError::Reset(msg.code); let mut subscribes = self.subscribes.lock().unwrap(); - let subscribe = subscribes.remove(&msg.id).ok_or(Error::NotFound)?; + let subscribe = subscribes.remove(&msg.id).ok_or(CacheError::NotFound)?; subscribe.close(err)?; Ok(()) } - async fn run_streams(self) -> Result<(), Error> { + async fn run_streams(self) -> Result<(), SessionError> { loop { // Accept all incoming unidirectional streams. let stream = self.webtransport.accept_uni().await?; @@ -100,24 +99,24 @@ impl Subscriber { tokio::spawn(async move { if let Err(err) = this.run_stream(stream).await { - log::warn!("failed to receive stream: err={:?}", err); + log::warn!("failed to receive stream: err={:#?}", err); } }); } } - async fn run_stream(self, mut stream: RecvStream) -> Result<(), Error> { + async fn run_stream(self, mut stream: RecvStream) -> Result<(), SessionError> { // Decode the object on the data stream. let object = message::Object::decode(&mut stream) .await - .map_err(|e| Error::Unknown(e.to_string()))?; + .map_err(|e| SessionError::Unknown(e.to_string()))?; log::debug!("received object: {:?}", object); // A new scope is needed because the async compiler is dumb let mut publisher = { let mut subscribes = self.subscribes.lock().unwrap(); - let track = subscribes.get_mut(&object.track).ok_or(Error::NotFound)?; + let track = subscribes.get_mut(&object.track).ok_or(CacheError::NotFound)?; track.create_segment(segment::Info { sequence: object.sequence, @@ -127,13 +126,15 @@ impl Subscriber { }; while let Some(data) = stream.read_chunk(usize::MAX, true).await? { + // NOTE: This does not make a copy! + // Bytes are immutable and ref counted. publisher.write_chunk(data.bytes)?; } Ok(()) } - async fn run_source(mut self) -> Result<(), Error> { + async fn run_source(mut self) -> Result<(), SessionError> { while let Some(track) = self.source.next_track().await? { let name = track.name.clone();