Implement (forked) moq-transport-00 (#34)

Not backwards compatible.

JS side: https://github.com/kixelated/moq-js/pull/14
This commit is contained in:
kixelated 2023-06-16 11:38:19 -07:00 committed by GitHub
parent 9c17146746
commit d7872ef77d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 3673 additions and 455 deletions

376
Cargo.lock generated
View File

@ -4,9 +4,9 @@ version = 3
[[package]] [[package]]
name = "aho-corasick" name = "aho-corasick"
version = "1.0.1" version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67fc08ce920c31afb70f013dcce1bfc3a3195de6a228474e45e1f145b36f8d04" checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41"
dependencies = [ dependencies = [
"memchr", "memchr",
] ]
@ -47,7 +47,7 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b" checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b"
dependencies = [ dependencies = [
"windows-sys 0.48.0", "windows-sys",
] ]
[[package]] [[package]]
@ -57,7 +57,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188"
dependencies = [ dependencies = [
"anstyle", "anstyle",
"windows-sys 0.48.0", "windows-sys",
] ]
[[package]] [[package]]
@ -66,6 +66,17 @@ version = "1.0.71"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8"
[[package]]
name = "async-trait"
version = "0.1.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.14" version = "0.2.14"
@ -91,9 +102,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]] [[package]]
name = "base64" name = "base64"
version = "0.21.1" version = "0.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f1e31e207a6b8fb791a38ea3105e6cb541f55e4d029902d3039a4ad07cc4105" checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
@ -142,9 +153,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]] [[package]]
name = "clap" name = "clap"
version = "4.3.0" version = "4.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93aae7a4192245f70fe75dd9157fc7b4a5bf53e88d30bd4396f7d8f9284d5acc" checksum = "80672091db20273a15cf9fdd4e47ed43b5091ec9841bf4c6145c9dfbbcae09ed"
dependencies = [ dependencies = [
"clap_builder", "clap_builder",
"clap_derive", "clap_derive",
@ -153,9 +164,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_builder" name = "clap_builder"
version = "4.3.0" version = "4.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f423e341edefb78c9caba2d9c7f7687d0e72e89df3ce3394554754393ac3990" checksum = "c1458a1df40e1e2afebb7ab60ce55c1fa8f431146205aa5f4887e0b111c27636"
dependencies = [ dependencies = [
"anstream", "anstream",
"anstyle", "anstyle",
@ -166,9 +177,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_derive" name = "clap_derive"
version = "4.3.0" version = "4.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "191d9573962933b4027f932c600cd252ce27a8ad5979418fe78e43c07996f27b" checksum = "b8cd2b2a819ad6eec39e8f1d6b53001af1e5469f8c177579cdaeb313115b825f"
dependencies = [ dependencies = [
"heck", "heck",
"proc-macro2", "proc-macro2",
@ -190,9 +201,9 @@ checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]] [[package]]
name = "cpufeatures" name = "cpufeatures"
version = "0.2.7" version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e4c1eaa2012c47becbbad2ab175484c2a84d1185b566fb2cc5b8707343dfe58" checksum = "03e69e28e9f7f77debdedbaafa2866e1de9ba56df55a8bd7cfc724c25a09987c"
dependencies = [ dependencies = [
"libc", "libc",
] ]
@ -247,7 +258,7 @@ checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a"
dependencies = [ dependencies = [
"errno-dragonfly", "errno-dragonfly",
"libc", "libc",
"windows-sys 0.48.0", "windows-sys",
] ]
[[package]] [[package]]
@ -277,9 +288,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]] [[package]]
name = "form_urlencoded" name = "form_urlencoded"
version = "1.1.0" version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8" checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652"
dependencies = [ dependencies = [
"percent-encoding", "percent-encoding",
] ]
@ -385,9 +396,9 @@ dependencies = [
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.9" version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
@ -416,7 +427,21 @@ dependencies = [
[[package]] [[package]]
name = "h3" name = "h3"
version = "0.0.2" version = "0.0.2"
source = "git+https://github.com/security-union/h3?branch=add-webtransport#db5c723f653911a476bfd8ffcfebf0f8f2eb980d" source = "git+https://github.com/security-union/h3?branch=add-webtransport#fa956e0d44e66c04545741908fcb3690b0890be6"
dependencies = [
"bytes",
"fastrand",
"futures-util",
"http",
"pin-project-lite",
"tokio",
"tracing",
]
[[package]]
name = "h3"
version = "0.0.2"
source = "git+https://github.com/hyperium/h3?branch=master#3ef7c1a37b635e8446322d8f8d3a68580a208ad8"
dependencies = [ dependencies = [
"bytes", "bytes",
"fastrand", "fastrand",
@ -430,11 +455,25 @@ dependencies = [
[[package]] [[package]]
name = "h3-quinn" name = "h3-quinn"
version = "0.0.2" version = "0.0.2"
source = "git+https://github.com/security-union/h3?branch=add-webtransport#db5c723f653911a476bfd8ffcfebf0f8f2eb980d" source = "git+https://github.com/security-union/h3?branch=add-webtransport#fa956e0d44e66c04545741908fcb3690b0890be6"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures", "futures",
"h3", "h3 0.0.2 (git+https://github.com/security-union/h3?branch=add-webtransport)",
"quinn",
"quinn-proto",
"tokio",
"tokio-util",
]
[[package]]
name = "h3-quinn"
version = "0.0.3"
source = "git+https://github.com/hyperium/h3?branch=master#3ef7c1a37b635e8446322d8f8d3a68580a208ad8"
dependencies = [
"bytes",
"futures",
"h3 0.0.2 (git+https://github.com/hyperium/h3?branch=master)",
"quinn", "quinn",
"quinn-proto", "quinn-proto",
"tokio", "tokio",
@ -444,11 +483,25 @@ dependencies = [
[[package]] [[package]]
name = "h3-webtransport" name = "h3-webtransport"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/security-union/h3?branch=add-webtransport#db5c723f653911a476bfd8ffcfebf0f8f2eb980d" source = "git+https://github.com/security-union/h3?branch=add-webtransport#fa956e0d44e66c04545741908fcb3690b0890be6"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-util", "futures-util",
"h3", "h3 0.0.2 (git+https://github.com/security-union/h3?branch=add-webtransport)",
"http",
"pin-project-lite",
"tokio",
"tracing",
]
[[package]]
name = "h3-webtransport"
version = "0.1.0"
source = "git+https://github.com/hyperium/h3?branch=master#3ef7c1a37b635e8446322d8f8d3a68580a208ad8"
dependencies = [
"bytes",
"futures-util",
"h3 0.0.2 (git+https://github.com/hyperium/h3?branch=master)",
"http", "http",
"pin-project-lite", "pin-project-lite",
"tokio", "tokio",
@ -588,9 +641,9 @@ dependencies = [
[[package]] [[package]]
name = "idna" name = "idna"
version = "0.3.0" version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
dependencies = [ dependencies = [
"unicode-bidi", "unicode-bidi",
"unicode-normalization", "unicode-normalization",
@ -617,13 +670,13 @@ dependencies = [
[[package]] [[package]]
name = "io-lifetimes" name = "io-lifetimes"
version = "1.0.10" version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c66c74d2ae7e79a5a8f7ac924adbe38ee42a859c6539ad869eb51f0b52dc220" checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2"
dependencies = [ dependencies = [
"hermit-abi 0.3.1", "hermit-abi 0.3.1",
"libc", "libc",
"windows-sys 0.48.0", "windows-sys",
] ]
[[package]] [[package]]
@ -635,7 +688,7 @@ dependencies = [
"hermit-abi 0.3.1", "hermit-abi 0.3.1",
"io-lifetimes", "io-lifetimes",
"rustix", "rustix",
"windows-sys 0.48.0", "windows-sys",
] ]
[[package]] [[package]]
@ -646,18 +699,18 @@ checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6"
[[package]] [[package]]
name = "js-sys" name = "js-sys"
version = "0.3.63" version = "0.3.64"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f37a4a5928311ac501dee68b3c7613a1037d0edb30c8e5427bd832d55d1b790" checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a"
dependencies = [ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.144" version = "0.2.146"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1" checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b"
[[package]] [[package]]
name = "linux-raw-sys" name = "linux-raw-sys"
@ -667,9 +720,9 @@ checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
version = "0.4.9" version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"scopeguard", "scopeguard",
@ -677,12 +730,9 @@ dependencies = [
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.17" version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "memchr" name = "memchr"
@ -708,18 +758,41 @@ dependencies = [
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.8.6" version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
dependencies = [ dependencies = [
"libc", "libc",
"log",
"wasi", "wasi",
"windows-sys 0.45.0", "windows-sys",
] ]
[[package]] [[package]]
name = "moq" name = "moq-transport"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"clap",
"env_logger",
"futures",
"h3 0.0.2 (git+https://github.com/security-union/h3?branch=add-webtransport)",
"h3-quinn 0.0.2",
"h3-webtransport 0.1.0 (git+https://github.com/security-union/h3?branch=add-webtransport)",
"http",
"log",
"quinn",
"quinn-proto",
"ring",
"rustls 0.21.2",
"rustls-pemfile",
"thiserror",
"tokio",
]
[[package]]
name = "moq-warp"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
@ -727,19 +800,23 @@ dependencies = [
"clap", "clap",
"env_logger", "env_logger",
"futures", "futures",
"h3", "h3 0.0.2 (git+https://github.com/hyperium/h3?branch=master)",
"h3-quinn", "h3-quinn 0.0.3",
"h3-webtransport", "h3-webtransport 0.1.0 (git+https://github.com/hyperium/h3?branch=master)",
"hex", "hex",
"http", "http",
"log", "log",
"moq-transport",
"mp4", "mp4",
"paste",
"quinn", "quinn",
"quinn-proto",
"ring", "ring",
"rustls 0.21.1", "rustls 0.21.2",
"rustls-pemfile", "rustls-pemfile",
"serde", "serde",
"serde_json", "serde_json",
"thiserror",
"tokio", "tokio",
"warp", "warp",
] ]
@ -831,9 +908,9 @@ dependencies = [
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.17.1" version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
@ -847,22 +924,28 @@ dependencies = [
[[package]] [[package]]
name = "parking_lot_core" name = "parking_lot_core"
version = "0.9.7" version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521" checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"redox_syscall", "redox_syscall",
"smallvec", "smallvec",
"windows-sys 0.45.0", "windows-targets",
] ]
[[package]] [[package]]
name = "percent-encoding" name = "paste"
version = "2.2.0" version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" checksum = "9f746c4065a8fa3fe23974dd82f15431cc8d40779821001404d10d2e79ca7d79"
[[package]]
name = "percent-encoding"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]] [[package]]
name = "pin-project" name = "pin-project"
@ -904,9 +987,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.58" version = "1.0.60"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa1fb82fc0c281dd9671101b66b771ebbe1eaf967b96ac8740dcba4b70005ca8" checksum = "dec2b086b7a862cf4de201096214fa870344cf922b2b30c167badb3af3195406"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
@ -923,7 +1006,7 @@ dependencies = [
"quinn-proto", "quinn-proto",
"quinn-udp", "quinn-udp",
"rustc-hash", "rustc-hash",
"rustls 0.21.1", "rustls 0.21.2",
"thiserror", "thiserror",
"tokio", "tokio",
"tracing", "tracing",
@ -939,7 +1022,7 @@ dependencies = [
"rand", "rand",
"ring", "ring",
"rustc-hash", "rustc-hash",
"rustls 0.21.1", "rustls 0.21.2",
"slab", "slab",
"thiserror", "thiserror",
"tinyvec", "tinyvec",
@ -956,14 +1039,14 @@ dependencies = [
"libc", "libc",
"socket2 0.5.3", "socket2 0.5.3",
"tracing", "tracing",
"windows-sys 0.48.0", "windows-sys",
] ]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.27" version = "1.0.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f4f29d145265ec1c483c7c654450edde0bfe043d3938d6972630663356d9500" checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@ -1000,18 +1083,18 @@ dependencies = [
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.2.16" version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
dependencies = [ dependencies = [
"bitflags", "bitflags",
] ]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.8.1" version = "1.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370" checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
@ -1020,9 +1103,9 @@ dependencies = [
[[package]] [[package]]
name = "regex-syntax" name = "regex-syntax"
version = "0.7.1" version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c" checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
[[package]] [[package]]
name = "ring" name = "ring"
@ -1047,16 +1130,16 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.37.19" version = "0.37.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acf8729d8542766f1b2cf77eb034d52f40d375bb8b615d0b147089946e16613d" checksum = "b96e891d04aa506a6d1f318d2771bcb1c7dfda84e126660ace067c9b474bb2c0"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"errno", "errno",
"io-lifetimes", "io-lifetimes",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
"windows-sys 0.48.0", "windows-sys",
] ]
[[package]] [[package]]
@ -1073,9 +1156,9 @@ dependencies = [
[[package]] [[package]]
name = "rustls" name = "rustls"
version = "0.21.1" version = "0.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c911ba11bc8433e811ce56fde130ccf32f5127cab0e0194e9c68c5a5b671791e" checksum = "e32ca28af694bc1bbf399c33a516dbdf1c90090b8ab23c2bc24f834aa2247f5f"
dependencies = [ dependencies = [
"log", "log",
"ring", "ring",
@ -1089,7 +1172,7 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
dependencies = [ dependencies = [
"base64 0.21.1", "base64 0.21.2",
] ]
[[package]] [[package]]
@ -1132,18 +1215,18 @@ dependencies = [
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.163" version = "1.0.164"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2113ab51b87a539ae008b5c6c02dc020ffa39afd2d83cffcb3f4eb2722cebec2" checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.163" version = "1.0.164"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e" checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -1225,7 +1308,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys 0.48.0", "windows-sys",
] ]
[[package]] [[package]]
@ -1248,9 +1331,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.16" version = "2.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6f671d4b5ffdb8eadec19c0ae67fe2639df8684bd7bc4b83d986b8db549cf01" checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -1303,9 +1386,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.28.1" version = "1.28.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"bytes", "bytes",
@ -1317,7 +1400,7 @@ dependencies = [
"signal-hook-registry", "signal-hook-registry",
"socket2 0.4.9", "socket2 0.4.9",
"tokio-macros", "tokio-macros",
"windows-sys 0.48.0", "windows-sys",
] ]
[[package]] [[package]]
@ -1466,9 +1549,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460"
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.8" version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0"
[[package]] [[package]]
name = "unicode-normalization" name = "unicode-normalization"
@ -1487,9 +1570,9 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]] [[package]]
name = "url" name = "url"
version = "2.3.1" version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643" checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb"
dependencies = [ dependencies = [
"form_urlencoded", "form_urlencoded",
"idna", "idna",
@ -1516,11 +1599,10 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]] [[package]]
name = "want" name = "want"
version = "0.3.0" version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e"
dependencies = [ dependencies = [
"log",
"try-lock", "try-lock",
] ]
@ -1564,9 +1646,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]] [[package]]
name = "wasm-bindgen" name = "wasm-bindgen"
version = "0.2.86" version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bba0e8cb82ba49ff4e229459ff22a191bbe9a1cb3a341610c9c33efc27ddf73" checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"wasm-bindgen-macro", "wasm-bindgen-macro",
@ -1574,9 +1656,9 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-backend" name = "wasm-bindgen-backend"
version = "0.2.86" version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b04bc93f9d6bdee709f6bd2118f57dd6679cf1176a1af464fca3ab0d66d8fb" checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd"
dependencies = [ dependencies = [
"bumpalo", "bumpalo",
"log", "log",
@ -1589,9 +1671,9 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-macro" name = "wasm-bindgen-macro"
version = "0.2.86" version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14d6b024f1a526bb0234f52840389927257beb670610081360e5a03c5df9c258" checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d"
dependencies = [ dependencies = [
"quote", "quote",
"wasm-bindgen-macro-support", "wasm-bindgen-macro-support",
@ -1599,9 +1681,9 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-macro-support" name = "wasm-bindgen-macro-support"
version = "0.2.86" version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8" checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -1612,15 +1694,15 @@ dependencies = [
[[package]] [[package]]
name = "wasm-bindgen-shared" name = "wasm-bindgen-shared"
version = "0.2.86" version = "0.2.87"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9d5b4305409d1fc9482fee2d7f9bcbf24b3972bf59817ef757e23982242a93" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1"
[[package]] [[package]]
name = "web-sys" name = "web-sys"
version = "0.3.63" version = "0.3.64"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bdd9ef4e984da1187bf8110c5cf5b845fbc87a23602cdf912386a76fcd3a7c2" checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b"
dependencies = [ dependencies = [
"js-sys", "js-sys",
"wasm-bindgen", "wasm-bindgen",
@ -1667,37 +1749,13 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets 0.42.2",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [ dependencies = [
"windows-targets 0.48.0", "windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
] ]
[[package]] [[package]]
@ -1706,93 +1764,51 @@ version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5" checksum = "7b1eb6f0cd7c80c79759c929114ef071b87354ce476d9d94271031c0497adfd5"
dependencies = [ dependencies = [
"windows_aarch64_gnullvm 0.48.0", "windows_aarch64_gnullvm",
"windows_aarch64_msvc 0.48.0", "windows_aarch64_msvc",
"windows_i686_gnu 0.48.0", "windows_i686_gnu",
"windows_i686_msvc 0.48.0", "windows_i686_msvc",
"windows_x86_64_gnu 0.48.0", "windows_x86_64_gnu",
"windows_x86_64_gnullvm 0.48.0", "windows_x86_64_gnullvm",
"windows_x86_64_msvc 0.48.0", "windows_x86_64_msvc",
] ]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]] [[package]]
name = "windows_aarch64_gnullvm" name = "windows_aarch64_gnullvm"
version = "0.48.0" version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]] [[package]]
name = "windows_aarch64_msvc" name = "windows_aarch64_msvc"
version = "0.48.0" version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3"
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]] [[package]]
name = "windows_i686_gnu" name = "windows_i686_gnu"
version = "0.48.0" version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241"
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]] [[package]]
name = "windows_i686_msvc" name = "windows_i686_msvc"
version = "0.48.0" version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]] [[package]]
name = "windows_x86_64_gnu" name = "windows_x86_64_gnu"
version = "0.48.0" version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]] [[package]]
name = "windows_x86_64_gnullvm" name = "windows_x86_64_gnullvm"
version = "0.48.0" version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]] [[package]]
name = "windows_x86_64_msvc" name = "windows_x86_64_msvc"
version = "0.48.0" version = "0.48.0"

View File

@ -1,41 +1,6 @@
[package] [workspace]
name = "moq"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html members = [
"moq-transport",
[dependencies] "moq-warp"
# Fork of h3 with WebTransport support ]
h3 = { git = "https://github.com/security-union/h3", branch = "add-webtransport" }
h3-quinn = { git = "https://github.com/security-union/h3", branch = "add-webtransport" }
h3-webtransport = { git = "https://github.com/security-union/h3", branch = "add-webtransport" }
quinn = { version = "0.10", default-features = false, features = ["runtime-tokio", "tls-rustls", "ring"] }
# Crypto dependencies
ring = "0.16"
rustls = { version = "0.21", features = ["dangerous_configuration"] }
rustls-pemfile = "1.0.2"
# Async stuff
tokio = { version = "1.27", features = ["full"] }
futures = "0.3"
# Media
mp4 = "0.13.0"
# Encoding
bytes = "1"
serde = "1.0.160"
serde_json = "1.0"
# Web server to serve the fingerprint
http = "0.2"
warp = { version = "0.3.3", features = ["tls"] }
hex = "0.4.3"
# Logging and utility
clap = { version = "4.0", features = [ "derive" ] }
log = { version = "0.4", features = ["std"] }
env_logger = "0.9.3"
anyhow = "1.0.70"

37
moq-transport/Cargo.toml Normal file
View File

@ -0,0 +1,37 @@
[package]
name = "moq-transport"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Fork of h3 with WebTransport support
h3 = { git = "https://github.com/security-union/h3", branch = "add-webtransport" }
h3-quinn = { git = "https://github.com/security-union/h3", branch = "add-webtransport" }
h3-webtransport = { git = "https://github.com/security-union/h3", branch = "add-webtransport" }
quinn = { version = "0.10", default-features = false, features = ["runtime-tokio", "tls-rustls", "ring"] }
quinn-proto = "0.10"
http = "0.2"
# Crypto dependencies
ring = "0.16"
rustls = { version = "0.21", features = ["dangerous_configuration"] }
rustls-pemfile = "1.0.2"
# Async stuff
tokio = { version = "1.27", features = ["full"] }
futures = "0.3"
# Encoding
bytes = "1"
# Logging
clap = { version = "4.0", features = [ "derive" ] }
log = { version = "0.4", features = ["std"] }
env_logger = "0.9.3"
# Utility
anyhow = "1.0.70"
thiserror = "1.0.21"
async-trait = "0.1"

View File

@ -0,0 +1,54 @@
use super::VarInt;
use bytes::Bytes;
use std::str;
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncReadExt};
#[async_trait]
pub trait Decode: Sized {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self>;
}
#[async_trait]
impl Decode for Bytes {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
Vec::<u8>::decode(r).await.map(Bytes::from)
}
}
#[async_trait]
impl Decode for Vec<u8> {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let size = u64::decode(r).await?;
// NOTE: we don't use with_capacity since size is from an untrusted source
let mut buf = Vec::new();
r.take(size).read_to_end(&mut buf).await?;
Ok(buf)
}
}
#[async_trait]
impl Decode for String {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let data = Vec::decode(r).await?;
let s = str::from_utf8(&data)?.to_string();
Ok(s)
}
}
#[async_trait]
impl Decode for u64 {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
VarInt::decode(r).await.map(Into::into)
}
}
#[async_trait]
impl Decode for usize {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
VarInt::decode(r).await.map(Into::into)
}
}

View File

@ -0,0 +1,24 @@
use super::{Decode, Encode};
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
use std::time::Duration;
#[async_trait]
impl Encode for Duration {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
let ms = self.as_millis();
let ms = u64::try_from(ms)?;
ms.encode(w).await
}
}
#[async_trait]
impl Decode for Duration {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let ms = u64::decode(r).await?;
let ms = ms;
Ok(Self::from_millis(ms))
}
}

View File

@ -0,0 +1,58 @@
use async_trait::async_trait;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use super::VarInt;
use bytes::Bytes;
#[async_trait]
pub trait Encode: Sized {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()>;
}
#[async_trait]
impl Encode for Bytes {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
self.len().encode(w).await?;
w.write_all(self).await?;
Ok(())
}
}
#[async_trait]
impl Encode for Vec<u8> {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
self.len().encode(w).await?;
w.write_all(self).await?;
Ok(())
}
}
#[async_trait]
impl Encode for &[u8] {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
self.len().encode(w).await?;
w.write_all(self).await?;
Ok(())
}
}
#[async_trait]
impl Encode for String {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
self.as_bytes().encode(w).await
}
}
#[async_trait]
impl Encode for u64 {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
VarInt::try_from(*self)?.encode(w).await
}
}
#[async_trait]
impl Encode for usize {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
VarInt::try_from(*self)?.encode(w).await
}
}

View File

@ -0,0 +1,9 @@
mod decode;
mod duration;
mod encode;
mod varint;
pub use decode::*;
pub use duration::*;
pub use encode::*;
pub use varint::*;

View File

@ -0,0 +1,139 @@
// Based on quinn-proto
// https://github.com/quinn-rs/quinn/blob/main/quinn-proto/src/varint.rs
// Licensed via Apache 2.0 and MIT
use std::convert::{TryFrom, TryInto};
use std::fmt;
use super::{Decode, Encode};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
#[derive(Debug, Copy, Clone, Eq, PartialEq, Error)]
#[error("value too large for varint encoding")]
pub struct BoundsExceeded;
/// An integer less than 2^62
///
/// Values of this type are suitable for encoding as QUIC variable-length integer.
// It would be neat if we could express to Rust that the top two bits are available for use as enum
// discriminants
#[derive(Default, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct VarInt(u64);
impl From<VarInt> for u64 {
fn from(x: VarInt) -> Self {
x.0
}
}
impl From<VarInt> for usize {
fn from(x: VarInt) -> Self {
x.0 as usize
}
}
impl From<u8> for VarInt {
fn from(x: u8) -> Self {
Self(x.into())
}
}
impl From<u16> for VarInt {
fn from(x: u16) -> Self {
Self(x.into())
}
}
impl From<u32> for VarInt {
fn from(x: u32) -> Self {
Self(x.into())
}
}
impl TryFrom<u64> for VarInt {
type Error = BoundsExceeded;
/// Succeeds iff `x` < 2^62
fn try_from(x: u64) -> Result<Self, BoundsExceeded> {
if x < 2u64.pow(62) {
Ok(Self(x))
} else {
Err(BoundsExceeded)
}
}
}
impl TryFrom<usize> for VarInt {
type Error = BoundsExceeded;
/// Succeeds iff `x` < 2^62
fn try_from(x: usize) -> Result<Self, BoundsExceeded> {
Self::try_from(x as u64)
}
}
impl fmt::Debug for VarInt {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl fmt::Display for VarInt {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
use async_trait::async_trait;
#[async_trait]
impl Decode for VarInt {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let mut buf = [0; 8];
r.read_exact(buf[0..1].as_mut()).await?;
let tag = buf[0] >> 6;
buf[0] &= 0b0011_1111;
let x = match tag {
0b00 => u64::from(buf[0]),
0b01 => {
r.read_exact(buf[1..2].as_mut()).await?;
u64::from(u16::from_be_bytes(buf[..2].try_into().unwrap()))
}
0b10 => {
r.read_exact(buf[1..4].as_mut()).await?;
u64::from(u32::from_be_bytes(buf[..4].try_into().unwrap()))
}
0b11 => {
r.read_exact(buf[1..8].as_mut()).await?;
u64::from_be_bytes(buf)
}
_ => unreachable!(),
};
Ok(Self(x))
}
}
#[async_trait]
impl Encode for VarInt {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
let x = self.0;
if x < 2u64.pow(6) {
w.write_u8(x as u8).await?;
} else if x < 2u64.pow(14) {
w.write_u16(0b01 << 14 | x as u16).await?;
} else if x < 2u64.pow(30) {
w.write_u32(0b10 << 30 | x as u32).await?;
} else if x < 2u64.pow(62) {
w.write_u64(0b11 << 62 | x).await?;
} else {
anyhow::bail!("malformed VarInt");
}
Ok(())
}
}

View File

@ -0,0 +1,26 @@
use crate::coding::{Decode, Encode};
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)]
pub struct Announce {
// The track namespace
pub track_namespace: String,
}
#[async_trait]
impl Decode for Announce {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let track_namespace = String::decode(r).await?;
Ok(Self { track_namespace })
}
}
#[async_trait]
impl Encode for Announce {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
self.track_namespace.encode(w).await?;
Ok(())
}
}

View File

@ -0,0 +1,43 @@
use crate::coding::{Decode, Encode};
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)]
pub struct AnnounceError {
// Echo back the namespace that was announced.
// TODO Propose using an ID to save bytes.
pub track_namespace: String,
// An error code.
pub code: u64,
// An optional, human-readable reason.
pub reason: String,
}
#[async_trait]
impl Decode for AnnounceError {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let track_namespace = String::decode(r).await?;
let code = u64::decode(r).await?;
let reason = String::decode(r).await?;
Ok(Self {
track_namespace,
code,
reason,
})
}
}
#[async_trait]
impl Encode for AnnounceError {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
self.track_namespace.encode(w).await?;
self.code.encode(w).await?;
self.reason.encode(w).await?;
Ok(())
}
}

View File

@ -0,0 +1,26 @@
use crate::coding::{Decode, Encode};
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)]
pub struct AnnounceOk {
// Echo back the namespace that was announced.
// TODO Propose using an ID to save bytes.
pub track_namespace: String,
}
#[async_trait]
impl Decode for AnnounceOk {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let track_namespace = String::decode(r).await?;
Ok(Self { track_namespace })
}
}
#[async_trait]
impl Encode for AnnounceOk {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
self.track_namespace.encode(w).await
}
}

View File

@ -0,0 +1,24 @@
use crate::coding::{Decode, Encode};
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)]
pub struct GoAway {
pub url: String,
}
#[async_trait]
impl Decode for GoAway {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let url = String::decode(r).await?;
Ok(Self { url })
}
}
#[async_trait]
impl Encode for GoAway {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
self.url.encode(w).await
}
}

View File

@ -0,0 +1,110 @@
mod announce;
mod announce_error;
mod announce_ok;
mod go_away;
mod stream;
mod subscribe;
mod subscribe_error;
mod subscribe_ok;
pub use announce::*;
pub use announce_error::*;
pub use announce_ok::*;
pub use go_away::*;
pub use stream::*;
pub use subscribe::*;
pub use subscribe_error::*;
pub use subscribe_ok::*;
use crate::coding::{Decode, Encode};
use async_trait::async_trait;
use std::fmt;
use tokio::io::{AsyncRead, AsyncWrite};
use anyhow::Context;
// Use a macro to generate the message types rather than copy-paste.
// This implements a decode/encode method that uses the specified type.
macro_rules! message_types {
{$($name:ident = $val:expr,)*} => {
pub enum Message {
$($name($name)),*
}
#[async_trait]
impl Decode for Message {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let t = u64::decode(r).await.context("failed to decode type")?;
Ok(match u64::from(t) {
$($val => {
let msg = $name::decode(r).await.context(concat!("failed to decode ", stringify!($name)))?;
Self::$name(msg)
})*
_ => anyhow::bail!("invalid type: {}", t),
})
}
}
#[async_trait]
impl Encode for Message {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
match self {
$(Self::$name(ref m) => {
let id: u64 = $val; // tell the compiler this is a u64
id.encode(w).await.context("failed to encode type")?;
m.encode(w).await.context("failed to encode message")
},)*
}
}
}
// Unwrap the enum into the specified type.
$(impl TryFrom<Message> for $name {
type Error = anyhow::Error;
fn try_from(m: Message) -> Result<Self, Self::Error> {
match m {
Message::$name(m) => Ok(m),
_ => anyhow::bail!("invalid message type"),
}
}
})*
$(impl From<$name> for Message {
fn from(m: $name) -> Self {
Message::$name(m)
}
})*
impl fmt::Debug for Message {
// Delegate to the message formatter
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
$(Self::$name(ref m) => m.fmt(f),)*
}
}
}
}
}
// NOTE: These messages are forked from moq-transport-00.
// 1. subscribe specifies the track_id, not subscribe_ok
// 2. messages lack a specified length
// 3. optional parameters are not supported (announce, subscribe)
// 4. not allowed on undirectional streams; only after SETUP on the bidirectional stream
// Each message is prefixed with the given VarInt type.
message_types! {
// NOTE: Object and Setup are in other modules.
// Object = 0x0
// Setup = 0x1
Subscribe = 0x3,
SubscribeOk = 0x4,
SubscribeError = 0x5,
Announce = 0x6,
AnnounceOk = 0x7,
AnnounceError = 0x8,
GoAway = 0x10,
}

View File

@ -0,0 +1,61 @@
use crate::coding::{Decode, Encode};
use crate::control::Message;
use bytes::Bytes;
use h3::quic::BidiStream;
pub struct Stream {
sender: SendStream,
recver: RecvStream,
}
impl Stream {
pub(crate) fn new(stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<Bytes>, Bytes>) -> Self {
let (sender, recver) = stream.split();
let sender = SendStream::new(sender);
let recver = RecvStream::new(recver);
Self { sender, recver }
}
pub fn split(self) -> (SendStream, RecvStream) {
(self.sender, self.recver)
}
pub async fn send(&mut self, msg: Message) -> anyhow::Result<()> {
self.sender.send(msg).await
}
pub async fn recv(&mut self) -> anyhow::Result<Message> {
self.recver.recv().await
}
}
pub struct SendStream {
stream: h3_webtransport::stream::SendStream<h3_quinn::SendStream<Bytes>, Bytes>,
}
impl SendStream {
pub(crate) fn new(stream: h3_webtransport::stream::SendStream<h3_quinn::SendStream<Bytes>, Bytes>) -> Self {
Self { stream }
}
pub async fn send(&mut self, msg: Message) -> anyhow::Result<()> {
msg.encode(&mut self.stream).await
}
}
pub struct RecvStream {
stream: h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, Bytes>,
}
impl RecvStream {
pub(crate) fn new(stream: h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, Bytes>) -> Self {
Self { stream }
}
pub async fn recv(&mut self) -> anyhow::Result<Message> {
Message::decode(&mut self.stream).await
}
}

View File

@ -0,0 +1,43 @@
use crate::coding::{Decode, Encode};
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)]
pub struct Subscribe {
// An ID we choose so we can map to the track_name.
// Proposal: https://github.com/moq-wg/moq-transport/issues/209
pub track_id: u64,
// The track namespace.
pub track_namespace: String,
// The track name.
pub track_name: String,
}
#[async_trait]
impl Decode for Subscribe {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let track_id = u64::decode(r).await?;
let track_namespace = String::decode(r).await?;
let track_name = String::decode(r).await?;
Ok(Self {
track_id,
track_namespace,
track_name,
})
}
}
#[async_trait]
impl Encode for Subscribe {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
self.track_id.encode(w).await?;
self.track_namespace.encode(w).await?;
self.track_name.encode(w).await?;
Ok(())
}
}

View File

@ -0,0 +1,40 @@
use crate::coding::{Decode, Encode};
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)]
pub struct SubscribeError {
// NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209
// The ID for this track.
pub track_id: u64,
// An error code.
pub code: u64,
// An optional, human-readable reason.
pub reason: String,
}
#[async_trait]
impl Decode for SubscribeError {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let track_id = u64::decode(r).await?;
let code = u64::decode(r).await?;
let reason = String::decode(r).await?;
Ok(Self { track_id, code, reason })
}
}
#[async_trait]
impl Encode for SubscribeError {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
self.track_id.encode(w).await?;
self.code.encode(w).await?;
self.reason.encode(w).await?;
Ok(())
}
}

View File

@ -0,0 +1,39 @@
use crate::coding::{Decode, Encode};
use std::time::Duration;
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)]
pub struct SubscribeOk {
// NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209
// The ID for this track.
pub track_id: u64,
// The subscription will end after this duration has elapsed.
// A value of zero is invalid.
pub expires: Option<Duration>,
}
#[async_trait]
impl Decode for SubscribeOk {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let track_id = u64::decode(r).await?;
let expires = Duration::decode(r).await?;
let expires = if expires == Duration::ZERO { None } else { Some(expires) };
Ok(Self { track_id, expires })
}
}
#[async_trait]
impl Encode for SubscribeOk {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
self.track_id.encode(w).await?;
self.expires.unwrap_or_default().encode(w).await?;
Ok(())
}
}

View File

@ -0,0 +1,56 @@
use crate::coding::{Decode, Encode};
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
// Another name for OBJECT, sent as a header for data streams.
#[derive(Debug)]
pub struct Header {
// An ID for this track.
// Proposal: https://github.com/moq-wg/moq-transport/issues/209
pub track_id: u64,
// The group sequence number.
pub group_sequence: u64,
// The object sequence number.
pub object_sequence: u64,
// The priority/send order.
pub send_order: u64,
}
#[async_trait]
impl Decode for Header {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let typ = u64::decode(r).await?;
anyhow::ensure!(typ == 0, "OBJECT type must be 0");
// NOTE: size has been omitted
let track_id = u64::decode(r).await?;
let group_sequence = u64::decode(r).await?;
let object_sequence = u64::decode(r).await?;
let send_order = u64::decode(r).await?;
Ok(Self {
track_id,
group_sequence,
object_sequence,
send_order,
})
}
}
#[async_trait]
impl Encode for Header {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
0u64.encode(w).await?;
self.track_id.encode(w).await?;
self.group_sequence.encode(w).await?;
self.object_sequence.encode(w).await?;
self.send_order.encode(w).await?;
Ok(())
}
}

View File

@ -0,0 +1,5 @@
mod header;
mod transport;
pub use header::*;
pub use transport::*;

View File

@ -0,0 +1,49 @@
use super::Header;
use anyhow::Context;
use bytes::Bytes;
use crate::coding::{Decode, Encode};
// TODO support clients
type WebTransportSession = h3_webtransport::server::WebTransportSession<h3_quinn::Connection, Bytes>;
// Reduce some typing for implementors.
pub type SendStream = h3_webtransport::stream::SendStream<h3_quinn::SendStream<Bytes>, Bytes>;
pub type RecvStream = h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, Bytes>;
pub struct Transport {
transport: WebTransportSession,
}
impl Transport {
pub fn new(transport: WebTransportSession) -> Self {
Self { transport }
}
pub async fn recv(&self) -> anyhow::Result<(Header, RecvStream)> {
let (_session_id, mut stream) = self
.transport
.accept_uni()
.await
.context("failed to accept uni stream")?
.context("no uni stream")?;
let header = Header::decode(&mut stream).await?;
Ok((header, stream))
}
pub async fn send(&self, header: Header) -> anyhow::Result<SendStream> {
let mut stream = self
.transport
.open_uni(self.transport.session_id())
.await
.context("failed to open uni stream")?;
// TODO set send_order based on header
header.encode(&mut stream).await?;
Ok(stream)
}
}

5
moq-transport/src/lib.rs Normal file
View File

@ -0,0 +1,5 @@
pub mod coding;
pub mod control;
pub mod data;
pub mod server;
pub mod setup;

View File

@ -0,0 +1,42 @@
use super::handshake::{Accept, Connecting};
use anyhow::Context;
use tokio::task::JoinSet;
pub struct Endpoint {
// The QUIC server, yielding new connections and sessions.
endpoint: quinn::Endpoint,
// A list of connections that are completing the WebTransport handshake.
handshake: JoinSet<anyhow::Result<Accept>>,
}
impl Endpoint {
pub fn new(endpoint: quinn::Endpoint) -> Self {
let handshake = JoinSet::new();
Self { endpoint, handshake }
}
// Accept the next WebTransport session.
pub async fn accept(&mut self) -> anyhow::Result<Accept> {
loop {
tokio::select!(
// Accept the connection and start the WebTransport handshake.
conn = self.endpoint.accept() => {
let conn = conn.context("failed to accept connection")?;
self.handshake.spawn(async move {
Connecting::new(conn).accept().await
});
},
// Return any mostly finished WebTransport handshakes.
res = self.handshake.join_next(), if !self.handshake.is_empty() => {
let res = res.expect("no tasks").expect("task aborted");
match res {
Ok(session) => return Ok(session),
Err(err) => log::warn!("failed to accept session: {:?}", err),
}
},
)
}
}
}

View File

@ -0,0 +1,114 @@
use super::setup::{RecvSetup, SendSetup};
use crate::{control, data, setup};
use anyhow::Context;
use bytes::Bytes;
pub struct Connecting {
conn: quinn::Connecting,
}
impl Connecting {
pub fn new(conn: quinn::Connecting) -> Self {
Self { conn }
}
pub async fn accept(self) -> anyhow::Result<Accept> {
let conn = self.conn.await.context("failed to accept h3 connection")?;
let mut conn = h3::server::builder()
.enable_webtransport(true)
.enable_connect(true)
.enable_datagram(true)
.max_webtransport_sessions(1)
.send_grease(true)
.build(h3_quinn::Connection::new(conn))
.await
.context("failed to create h3 server")?;
let (req, stream) = conn
.accept()
.await
.context("failed to accept h3 session")?
.context("failed to accept h3 request")?;
let ext = req.extensions();
anyhow::ensure!(req.method() == http::Method::CONNECT, "expected CONNECT request");
anyhow::ensure!(
ext.get::<h3::ext::Protocol>() == Some(&h3::ext::Protocol::WEB_TRANSPORT),
"expected WebTransport CONNECT"
);
// Return the request after validating the bare minimum.
let accept = Accept { conn, req, stream };
Ok(accept)
}
}
// The WebTransport handshake is complete, but we need to decide if we accept it or return 404.
pub struct Accept {
// Inspect to decide whether to accept() or reject() the session.
req: http::Request<()>,
conn: h3::server::Connection<h3_quinn::Connection, Bytes>,
stream: h3::server::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
}
impl Accept {
// Expose the received URI
pub fn uri(&self) -> &http::Uri {
self.req.uri()
}
// Accept the WebTransport session.
pub async fn accept(self) -> anyhow::Result<Setup> {
let transport = h3_webtransport::server::WebTransportSession::accept(self.req, self.stream, self.conn).await?;
let stream = transport
.accept_bi()
.await
.context("failed to accept bidi stream")?
.unwrap();
let transport = data::Transport::new(transport);
let stream = match stream {
h3_webtransport::server::AcceptedBi::BidiStream(_session_id, stream) => stream,
h3_webtransport::server::AcceptedBi::Request(..) => anyhow::bail!("additional http requests not supported"),
};
let setup = RecvSetup::new(stream).recv().await?;
Ok(Setup { transport, setup })
}
// Reject the WebTransport session with a HTTP response.
pub async fn reject(mut self, resp: http::Response<()>) -> anyhow::Result<()> {
self.stream.send_response(resp).await?;
Ok(())
}
}
pub struct Setup {
setup: SendSetup,
transport: data::Transport,
}
impl Setup {
// Return the setup message we received.
pub fn setup(&self) -> &setup::Client {
&self.setup.client
}
// Accept the session with our own setup message.
pub async fn accept(self, setup: setup::Server) -> anyhow::Result<(data::Transport, control::Stream)> {
let control = self.setup.send(setup).await?;
Ok((self.transport, control))
}
pub async fn reject(self) -> anyhow::Result<()> {
// TODO Close the QUIC connection with an error code.
Ok(())
}
}

View File

@ -0,0 +1,6 @@
mod endpoint;
mod handshake;
mod setup;
pub use endpoint::*;
pub use handshake::*;

View File

@ -0,0 +1,42 @@
use crate::coding::{Decode, Encode};
use crate::{control, setup};
use anyhow::Context;
use bytes::Bytes;
pub(crate) struct RecvSetup {
stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<Bytes>, Bytes>,
}
impl RecvSetup {
pub fn new(stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<Bytes>, Bytes>) -> Self {
Self { stream }
}
pub async fn recv(mut self) -> anyhow::Result<SendSetup> {
let setup = setup::Client::decode(&mut self.stream)
.await
.context("failed to read client SETUP message")?;
Ok(SendSetup::new(self.stream, setup))
}
}
pub(crate) struct SendSetup {
pub client: setup::Client,
stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<Bytes>, Bytes>,
}
impl SendSetup {
pub fn new(
stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<Bytes>, Bytes>,
client: setup::Client,
) -> Self {
Self { stream, client }
}
pub async fn send(mut self, setup: setup::Server) -> anyhow::Result<control::Stream> {
setup.encode(&mut self.stream).await?;
Ok(control::Stream::new(self.stream))
}
}

View File

@ -0,0 +1,54 @@
use super::{Role, Versions};
use crate::coding::{Decode, Encode};
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
use anyhow::Context;
// Sent by the client to setup up the session.
#[derive(Debug)]
pub struct Client {
// NOTE: This is not a message type, but rather the control stream header.
// Proposal: https://github.com/moq-wg/moq-transport/issues/138
// The list of supported versions in preferred order.
pub versions: Versions,
// Indicate if the client is a publisher, a subscriber, or both.
// Proposal: moq-wg/moq-transport#151
pub role: Role,
// The path, non-empty ONLY when not using WebTransport.
pub path: String,
}
#[async_trait]
impl Decode for Client {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let typ = u64::decode(r).await.context("failed to read type")?;
anyhow::ensure!(typ == 1, "client SETUP must be type 1");
let versions = Versions::decode(r).await.context("failed to read supported versions")?;
anyhow::ensure!(!versions.is_empty(), "client must support at least one version");
let role = Role::decode(r).await.context("failed to decode role")?;
let path = String::decode(r).await.context("failed to read path")?;
Ok(Self { versions, role, path })
}
}
#[async_trait]
impl Encode for Client {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
1u64.encode(w).await?;
anyhow::ensure!(!self.versions.is_empty(), "client must support at least one version");
self.versions.encode(w).await?;
self.role.encode(w).await?;
self.path.encode(w).await?;
Ok(())
}
}

View File

@ -0,0 +1,15 @@
mod client;
mod role;
mod server;
mod version;
pub use client::*;
pub use role::*;
pub use server::*;
pub use version::*;
// NOTE: These are forked from moq-transport-00.
// 1. messages lack a sized length
// 2. parameters are not optional and written in order (role + path)
// 3. role indicates local support only, not remote support
// 4. server setup is id=2 to disambiguate

View File

@ -0,0 +1,65 @@
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
use crate::coding::{Decode, Encode};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Role {
Publisher,
Subscriber,
Both,
}
impl Role {
pub fn is_publisher(&self) -> bool {
match self {
Self::Publisher | Self::Both => true,
Self::Subscriber => false,
}
}
pub fn is_subscriber(&self) -> bool {
match self {
Self::Subscriber | Self::Both => true,
Self::Publisher => false,
}
}
}
impl From<Role> for u64 {
fn from(r: Role) -> Self {
match r {
Role::Publisher => 0x0,
Role::Subscriber => 0x1,
Role::Both => 0x2,
}
}
}
impl TryFrom<u64> for Role {
type Error = anyhow::Error;
fn try_from(v: u64) -> Result<Self, Self::Error> {
Ok(match v {
0x0 => Self::Publisher,
0x1 => Self::Subscriber,
0x2 => Self::Both,
_ => anyhow::bail!("invalid role: {}", v),
})
}
}
#[async_trait]
impl Decode for Role {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let v = u64::decode(r).await?;
v.try_into()
}
}
#[async_trait]
impl Encode for Role {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
u64::from(*self).encode(w).await
}
}

View File

@ -0,0 +1,44 @@
use super::{Role, Version};
use crate::coding::{Decode, Encode};
use anyhow::Context;
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
// Sent by the server in response to a client.
// NOTE: This is not a message type, but rather the control stream header.
// Proposal: https://github.com/moq-wg/moq-transport/issues/138
#[derive(Debug)]
pub struct Server {
// The list of supported versions in preferred order.
pub version: Version,
// param: 0x0: Indicate if the server is a publisher, a subscriber, or both.
// Proposal: moq-wg/moq-transport#151
pub role: Role,
}
#[async_trait]
impl Decode for Server {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let typ = u64::decode(r).await.context("failed to read type")?;
anyhow::ensure!(typ == 2, "server SETUP must be type 2");
let version = Version::decode(r).await.context("failed to read version")?;
let role = Role::decode(r).await.context("failed to read role")?;
Ok(Self { version, role })
}
}
#[async_trait]
impl Encode for Server {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
2u64.encode(w).await?; // setup type
self.version.encode(w).await?;
self.role.encode(w).await?;
Ok(())
}
}

View File

@ -0,0 +1,77 @@
use crate::coding::{Decode, Encode};
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
use std::ops::Deref;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Version(pub u64);
impl Version {
pub const DRAFT_00: Version = Version(0xff00);
}
impl From<u64> for Version {
fn from(v: u64) -> Self {
Self(v)
}
}
impl From<Version> for u64 {
fn from(v: Version) -> Self {
v.0
}
}
#[async_trait]
impl Decode for Version {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let v = u64::decode(r).await?;
Ok(Self(v))
}
}
#[async_trait]
impl Encode for Version {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
self.0.encode(w).await
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Versions(pub Vec<Version>);
#[async_trait]
impl Decode for Versions {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let count = u64::decode(r).await?;
let mut vs = Vec::new();
for _ in 0..count {
let v = Version::decode(r).await?;
vs.push(v);
}
Ok(Self(vs))
}
}
#[async_trait]
impl Encode for Versions {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
self.0.len().encode(w).await?;
for v in &self.0 {
v.encode(w).await?;
}
Ok(())
}
}
impl Deref for Versions {
type Target = Vec<Version>;
fn deref(&self) -> &Self::Target {
&self.0
}
}

1809
moq-warp/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

48
moq-warp/Cargo.toml Normal file
View File

@ -0,0 +1,48 @@
[package]
name = "moq-warp"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# WebTransport support: TODO pin a version when released
h3 = { git = "https://github.com/hyperium/h3", branch = "master" }
h3-quinn = { git = "https://github.com/hyperium/h3", branch = "master" }
h3-webtransport = { git = "https://github.com/hyperium/h3", branch = "master" }
quinn = { version = "0.10", default-features = false, features = ["runtime-tokio", "tls-rustls", "ring"] }
quinn-proto = "0.10"
# Crypto dependencies
ring = "0.16"
rustls = { version = "0.21", features = ["dangerous_configuration"] }
rustls-pemfile = "1.0.2"
# Async stuff
tokio = { version = "1.27", features = ["full"] }
futures = "0.3"
# Media
mp4 = "0.13.0"
# Encoding
bytes = "1"
serde = "1.0.160"
serde_json = "1.0"
# Web server to serve the fingerprint
http = "0.2"
warp = { version = "0.3.3", features = ["tls"] }
hex = "0.4.3"
# Logging
clap = { version = "4.0", features = [ "derive" ] }
log = { version = "0.4", features = ["std"] }
env_logger = "0.9.3"
# Utility
anyhow = "1.0.70"
thiserror = "1.0.21"
paste = "1.0"
moq-transport = { path = "../moq-transport" }

5
moq-warp/src/app/mod.rs Normal file
View File

@ -0,0 +1,5 @@
mod server;
mod session;
pub use server::*;
pub use session::*;

View File

@ -3,16 +3,20 @@ use crate::media;
use std::{fs, io, net, path, sync, time}; use std::{fs, io, net, path, sync, time};
use super::WebTransportSession;
use anyhow::Context; use anyhow::Context;
pub struct Server { use moq_transport::server;
// The QUIC server, yielding new connections and sessions. use tokio::task::JoinSet;
server: quinn::Endpoint,
// The media source pub struct Server {
broadcast: media::Broadcast, // The MoQ transport server.
server: server::Endpoint,
// The media source.
broadcasts: media::Broadcasts,
// Sessions actively being run.
sessions: JoinSet<anyhow::Result<()>>,
} }
pub struct ServerConfig { pub struct ServerConfig {
@ -20,7 +24,7 @@ pub struct ServerConfig {
pub cert: path::PathBuf, pub cert: path::PathBuf,
pub key: path::PathBuf, pub key: path::PathBuf,
pub broadcast: media::Broadcast, pub broadcasts: media::Broadcasts,
} }
impl Server { impl Server {
@ -70,56 +74,38 @@ impl Server {
server_config.transport = sync::Arc::new(transport_config); server_config.transport = sync::Arc::new(transport_config);
let server = quinn::Endpoint::server(server_config, config.addr)?; let server = quinn::Endpoint::server(server_config, config.addr)?;
let broadcast = config.broadcast; let broadcasts = config.broadcasts;
Ok(Self { server, broadcast }) let server = server::Endpoint::new(server);
let sessions = JoinSet::new();
Ok(Self {
server,
broadcasts,
sessions,
})
} }
pub async fn run(&mut self) -> anyhow::Result<()> { pub async fn run(&mut self) -> anyhow::Result<()> {
loop { loop {
let conn = self.server.accept().await.context("failed to accept connection")?; tokio::select! {
let broadcast = self.broadcast.clone(); res = self.server.accept() => {
let session = res.context("failed to accept connection")?;
let broadcasts = self.broadcasts.clone();
tokio::spawn(async move { self.sessions.spawn(async move {
let session = Self::accept_session(conn).await.context("failed to accept session")?; let session: Session = Session::accept(session, broadcasts).await?;
session.serve().await
});
},
res = self.sessions.join_next(), if !self.sessions.is_empty() => {
let res = res.expect("no tasks").expect("task aborted");
// Use a wrapper run the session. if let Err(err) = res {
let session = Session::new(session); log::error!("session terminated: {:?}", err);
session.serve_broadcast(broadcast).await }
}); },
}
} }
} }
async fn accept_session(conn: quinn::Connecting) -> anyhow::Result<WebTransportSession> {
let conn = conn.await.context("failed to accept h3 connection")?;
let mut conn = h3::server::builder()
.enable_webtransport(true)
.enable_connect(true)
.enable_datagram(true)
.max_webtransport_sessions(1)
.send_grease(true)
.build(h3_quinn::Connection::new(conn))
.await
.context("failed to create h3 server")?;
let (req, stream) = conn
.accept()
.await
.context("failed to accept h3 session")?
.context("failed to accept h3 request")?;
let ext = req.extensions();
anyhow::ensure!(req.method() == http::Method::CONNECT, "expected CONNECT request");
anyhow::ensure!(
ext.get::<h3::ext::Protocol>() == Some(&h3::ext::Protocol::WEB_TRANSPORT),
"expected WebTransport CONNECT"
);
let session = WebTransportSession::accept(req, stream, conn)
.await
.context("failed to accept WebTransport session")?;
Ok(session)
}
} }

225
moq-warp/src/app/session.rs Normal file
View File

@ -0,0 +1,225 @@
use crate::media;
use anyhow::Context;
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
use std::sync::Arc;
use moq_transport::{control, data, server, setup};
pub struct Session {
// Used to send/receive data streams.
transport: Arc<data::Transport>,
// Used to send/receive control messages.
control: control::Stream,
// The list of available broadcasts for the session.
media: media::Broadcasts,
// The list of active subscriptions.
tasks: JoinSet<anyhow::Result<()>>,
}
impl Session {
pub async fn accept(session: server::Accept, media: media::Broadcasts) -> anyhow::Result<Session> {
// Accep the WebTransport session.
// OPTIONAL validate the conn.uri() otherwise call conn.reject()
let session = session
.accept()
.await
.context("failed to accept WebTransport session")?;
session
.setup()
.versions
.iter()
.find(|v| **v == setup::Version::DRAFT_00)
.context("failed to find supported version")?;
match session.setup().role {
setup::Role::Subscriber => {}
_ => anyhow::bail!("TODO publishing not yet supported"),
}
let setup = setup::Server {
version: setup::Version::DRAFT_00,
role: setup::Role::Publisher,
};
let (transport, control) = session.accept(setup).await?;
let session = Self {
transport: Arc::new(transport),
control,
media,
tasks: JoinSet::new(),
};
Ok(session)
}
pub async fn serve(mut self) -> anyhow::Result<()> {
// TODO fix lazy: make a copy of the strings to avoid the borrow checker on self.
let broadcasts: Vec<String> = self.media.keys().cloned().collect();
// Announce each available broadcast immediately.
for name in broadcasts {
self.send_message(control::Announce {
track_namespace: name.clone(),
})
.await?;
}
loop {
tokio::select! {
msg = self.control.recv() => {
let msg = msg.context("failed to receive control message")?;
self.handle_message(msg).await?;
},
res = self.tasks.join_next(), if !self.tasks.is_empty() => {
let res = res.expect("no tasks").expect("task aborted");
if let Err(err) = res {
log::warn!("failed to serve subscription: {:?}", err);
}
}
}
}
}
async fn handle_message(&mut self, msg: control::Message) -> anyhow::Result<()> {
log::info!("received message: {:?}", msg);
// TODO implement publish and subscribe halves of the protocol.
match msg {
control::Message::Announce(_) => anyhow::bail!("ANNOUNCE not supported"),
control::Message::AnnounceOk(ref _ok) => Ok(()), // noop
control::Message::AnnounceError(ref err) => {
anyhow::bail!("received ANNOUNCE_ERROR({:?}): {}", err.code, err.reason)
}
control::Message::Subscribe(ref sub) => self.receive_subscribe(sub).await,
control::Message::SubscribeOk(_) => anyhow::bail!("SUBSCRIBE OK not supported"),
control::Message::SubscribeError(_) => anyhow::bail!("SUBSCRIBE ERROR not supported"),
control::Message::GoAway(_) => anyhow::bail!("goaway not supported"),
}
}
async fn send_message<T: Into<control::Message>>(&mut self, msg: T) -> anyhow::Result<()> {
let msg = msg.into();
log::info!("sending message: {:?}", msg);
self.control.send(msg).await
}
async fn receive_subscribe(&mut self, sub: &control::Subscribe) -> anyhow::Result<()> {
match self.subscribe(sub) {
Ok(()) => {
self.send_message(control::SubscribeOk {
track_id: sub.track_id,
expires: None,
})
.await
}
Err(e) => {
self.send_message(control::SubscribeError {
track_id: sub.track_id,
code: 1,
reason: e.to_string(),
})
.await
}
}
}
fn subscribe(&mut self, sub: &control::Subscribe) -> anyhow::Result<()> {
let broadcast = self
.media
.get(&sub.track_namespace)
.context("unknown track namespace")?;
let track = broadcast
.tracks
.get(&sub.track_name)
.context("unknown track name")?
.clone();
let sub = Subscription {
track,
track_id: sub.track_id,
transport: self.transport.clone(),
};
self.tasks.spawn(async move { sub.serve().await });
Ok(())
}
}
pub struct Subscription {
transport: Arc<data::Transport>,
track_id: u64,
track: media::Track,
}
impl Subscription {
pub async fn serve(mut self) -> anyhow::Result<()> {
let mut tasks = JoinSet::new();
let mut done = false;
loop {
tokio::select! {
// Accept new tracks added to the broadcast.
segment = self.track.segments.next(), if !done => {
match segment {
Some(segment) => {
let group = Group {
segment,
transport: self.transport.clone(),
track_id: self.track_id,
};
tasks.spawn(async move { group.serve().await });
},
None => done = true, // no more segments in the track
}
},
// Poll any pending segments until they exit.
res = tasks.join_next(), if !tasks.is_empty() => {
let res = res.expect("no tasks").expect("task aborted");
res.context("failed serve segment")?
},
else => return Ok(()), // all segments received and finished serving
}
}
}
}
struct Group {
transport: Arc<data::Transport>,
track_id: u64,
segment: media::Segment,
}
impl Group {
pub async fn serve(mut self) -> anyhow::Result<()> {
// TODO proper values
let header = moq_transport::data::Header {
track_id: self.track_id,
group_sequence: 0, // TODO
object_sequence: 0, // Always zero since we send an entire group as an object
send_order: 0, // TODO
};
let mut stream = self.transport.send(header).await?;
// Write each fragment as they are available.
while let Some(fragment) = self.segment.fragments.next().await {
stream.write_all(fragment.as_slice()).await?;
}
// NOTE: stream is automatically closed when dropped
Ok(())
}
}

View File

@ -1,6 +1,9 @@
use moq::{app, media}; use moq_warp::{app, media};
use std::{fs, io, net, path, sync}; use std::{fs, io, net, path, sync};
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Context; use anyhow::Context;
use clap::Parser; use clap::Parser;
use ring::digest::{digest, SHA256}; use ring::digest::{digest, SHA256};
@ -38,12 +41,15 @@ async fn main() -> anyhow::Result<()> {
// Create a fake media source from disk. // Create a fake media source from disk.
let mut media = media::Source::new(args.media).context("failed to open fragmented.mp4")?; let mut media = media::Source::new(args.media).context("failed to open fragmented.mp4")?;
let mut broadcasts = HashMap::new();
broadcasts.insert("demo".to_string(), media.broadcast());
// Create a server to actually serve the media // Create a server to actually serve the media
let config = app::ServerConfig { let config = app::ServerConfig {
addr: args.addr, addr: args.addr,
cert: args.cert, cert: args.cert,
key: args.key, key: args.key,
broadcast: media.broadcast(), broadcasts: Arc::new(broadcasts),
}; };
let mut server = app::Server::new(config).context("failed to create server")?; let mut server = app::Server::new(config).context("failed to create server")?;

View File

@ -1,16 +1,20 @@
use super::Subscriber; use super::Subscriber;
use std::{sync, time}; use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
// Map from track namespace to broadcast.
// TODO support updates
pub type Broadcasts = Arc<HashMap<String, Broadcast>>;
#[derive(Clone)] #[derive(Clone)]
pub struct Broadcast { pub struct Broadcast {
pub tracks: Subscriber<Track>, // TODO support updates.
pub tracks: Arc<HashMap<String, Track>>,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct Track { pub struct Track {
// The track ID as stored in the MP4
pub id: u32,
// A list of segments, which are independently decodable. // A list of segments, which are independently decodable.
pub segments: Subscriber<Segment>, pub segments: Subscriber<Segment>,
} }
@ -18,11 +22,11 @@ pub struct Track {
#[derive(Clone)] #[derive(Clone)]
pub struct Segment { pub struct Segment {
// The timestamp of the segment. // The timestamp of the segment.
pub timestamp: time::Duration, pub timestamp: Duration,
// A list of fragments that make up the segment. // A list of fragments that make up the segment.
pub fragments: Subscriber<Fragment>, pub fragments: Subscriber<Fragment>,
} }
// Use Arc to avoid cloning the entire MP4 data for each subscriber. // Use Arc to avoid cloning the entire MP4 data for each subscriber.
pub type Fragment = sync::Arc<Vec<u8>>; pub type Fragment = Arc<Vec<u8>>;

View File

@ -9,6 +9,7 @@ use mp4::ReadBox;
use anyhow::Context; use anyhow::Context;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
use super::{Broadcast, Fragment, Producer, Segment, Track}; use super::{Broadcast, Fragment, Producer, Segment, Track};
@ -16,7 +17,7 @@ pub struct Source {
// We read the file once, in order, and don't seek backwards. // We read the file once, in order, and don't seek backwards.
reader: io::BufReader<fs::File>, reader: io::BufReader<fs::File>,
// The tracks we're producing // The subscribable broadcast.
broadcast: Broadcast, broadcast: Broadcast,
// The tracks we're producing. // The tracks we're producing.
@ -45,20 +46,15 @@ impl Source {
// Parse the moov box so we can detect the timescales for each track. // Parse the moov box so we can detect the timescales for each track.
let moov = mp4::MoovBox::read_box(&mut moov_reader, moov_header.size)?; let moov = mp4::MoovBox::read_box(&mut moov_reader, moov_header.size)?;
// Create a producer to populate the tracks. let mut tracks = HashMap::new();
let mut tracks = Producer::<Track>::new();
let broadcast = Broadcast {
tracks: tracks.subscribe(),
};
// Create the init track // Create the init track
let init_track = Self::create_init_track(init); let init_track = Self::create_init_track(init);
tracks.push(init_track); tracks.insert("catalog".to_string(), init_track);
// Create a map with the current segment for each track. // Create a map with the current segment for each track.
// NOTE: We don't add the init track to this, since it's not part of the MP4. // NOTE: We don't add the init track to this, since it's not part of the MP4.
let mut lookup = HashMap::new(); let mut sources = HashMap::new();
for trak in &moov.traks { for trak in &moov.traks {
let track_id = trak.tkhd.track_id; let track_id = trak.tkhd.track_id;
@ -68,20 +64,29 @@ impl Source {
let segments = Producer::<Segment>::new(); let segments = Producer::<Segment>::new();
tracks.push(Track { // Insert the subscribable track for consumerts.
id: track_id, // The track_name is just the integer track ID.
segments: segments.subscribe(), let track_name = track_id.to_string();
}); tracks.insert(
track_name,
Track {
segments: segments.subscribe(),
},
);
// Store the track publisher in a map so we can update it later. // Store the track publisher in a map so we can update it later.
let track = SourceTrack::new(segments, timescale); let source = SourceTrack::new(segments, timescale);
lookup.insert(track_id, track); sources.insert(track_id, source);
} }
let broadcast = Broadcast {
tracks: Arc::new(tracks),
};
Ok(Self { Ok(Self {
reader, reader,
broadcast, broadcast,
tracks: lookup, tracks: sources,
}) })
} }
@ -98,7 +103,6 @@ impl Source {
}); });
Track { Track {
id: 0xff,
segments: segments.subscribe(), segments: segments.subscribe(),
} }
} }

View File

@ -121,7 +121,7 @@ impl<T: Clone> Subscriber<T> {
// Return None if we've consumed all entries and the queue is closed. // Return None if we've consumed all entries and the queue is closed.
None None
} else { } else {
panic!("impossible subscriber state") unreachable!("impossible subscriber state")
} }
} }
} }

View File

@ -1,34 +0,0 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Default)]
pub struct Message {
pub init: Option<Init>,
pub segment: Option<Segment>,
}
#[derive(Serialize, Deserialize)]
pub struct Init {}
#[derive(Serialize, Deserialize)]
pub struct Segment {
pub track_id: u32,
}
impl Message {
pub fn new() -> Self {
Default::default()
}
pub fn serialize(&self) -> anyhow::Result<Vec<u8>> {
let str = serde_json::to_string(self)?;
let bytes = str.as_bytes();
let size = bytes.len() + 8;
let mut out = Vec::with_capacity(size);
out.extend_from_slice(&(size as u32).to_be_bytes());
out.extend_from_slice(b"warp");
out.extend_from_slice(bytes);
Ok(out)
}
}

View File

@ -1,8 +0,0 @@
mod message;
mod server;
mod session;
pub use server::{Server, ServerConfig};
// Reduce the amount of typing
type WebTransportSession = h3_webtransport::server::WebTransportSession<h3_quinn::Connection, bytes::Bytes>;

View File

@ -1,115 +0,0 @@
use crate::media;
use anyhow::Context;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
use super::WebTransportSession;
use super::message;
#[derive(Clone)]
pub struct Session {
// The underlying transport session
transport: Arc<WebTransportSession>,
}
impl Session {
pub fn new(transport: WebTransportSession) -> Self {
let transport = Arc::new(transport);
Self { transport }
}
pub async fn serve_broadcast(&self, mut broadcast: media::Broadcast) -> anyhow::Result<()> {
let mut tasks = JoinSet::new();
let mut done = false;
loop {
tokio::select! {
// Accept new tracks added to the broadcast.
track = broadcast.tracks.next(), if !done => {
match track {
Some(track) => {
let session = self.clone();
tasks.spawn(async move {
session.serve_track(track).await
});
},
None => done = true,
}
},
// Poll any pending tracks until they exit.
res = tasks.join_next(), if !tasks.is_empty() => {
let res = res.context("no tracks running")?;
let res = res.context("failed to run track")?;
res.context("failed to serve track")?;
},
else => return Ok(()),
}
}
}
pub async fn serve_track(&self, mut track: media::Track) -> anyhow::Result<()> {
let mut tasks = JoinSet::new();
let mut done = false;
loop {
tokio::select! {
// Accept new tracks added to the broadcast.
segment = track.segments.next(), if !done => {
match segment {
Some(segment) => {
let track = track.clone();
let session = self.clone();
tasks.spawn(async move {
session.serve_segment(track, segment).await
});
},
None => done = true,
}
},
// Poll any pending segments until they exit.
res = tasks.join_next(), if !tasks.is_empty() => {
let res = res.context("no tasks running")?;
let res = res.context("failed to run segment")?;
res.context("failed serve segment")?
},
else => return Ok(()),
}
}
}
pub async fn serve_segment(&self, track: media::Track, mut segment: media::Segment) -> anyhow::Result<()> {
let mut stream = self.transport.open_uni(self.transport.session_id()).await?;
// TODO support prioirty
// stream.set_priority(0);
// Encode a JSON header indicating this is a new segment.
let mut message: message::Message = message::Message::new();
// TODO combine init and segment messages into one.
if track.id == 0xff {
message.init = Some(message::Init {});
} else {
message.segment = Some(message::Segment { track_id: track.id });
}
// Write the JSON header.
let data = message.serialize()?;
stream.write_all(data.as_slice()).await?;
// Write each fragment as they are available.
while let Some(fragment) = segment.fragments.next().await {
stream.write_all(fragment.as_slice()).await?;
}
// NOTE: stream is automatically closed when dropped
Ok(())
}
}