Switch to webtransport-quinn (#44)
This commit is contained in:
parent
a2dd281126
commit
52d3fc81be
|
@ -2,6 +2,21 @@
|
||||||
# It is not intended for manual editing.
|
# It is not intended for manual editing.
|
||||||
version = 3
|
version = 3
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "addr2line"
|
||||||
|
version = "0.20.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f4fa78e18c64fce05e902adecd7a5eed15a5e0a3439f7b0e169f0252214865e3"
|
||||||
|
dependencies = [
|
||||||
|
"gimli",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "adler"
|
||||||
|
version = "1.0.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "aho-corasick"
|
name = "aho-corasick"
|
||||||
version = "1.0.2"
|
version = "1.0.2"
|
||||||
|
@ -66,6 +81,113 @@ version = "1.0.71"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8"
|
checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-channel"
|
||||||
|
version = "1.9.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
|
||||||
|
dependencies = [
|
||||||
|
"concurrent-queue",
|
||||||
|
"event-listener",
|
||||||
|
"futures-core",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-executor"
|
||||||
|
version = "1.5.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb"
|
||||||
|
dependencies = [
|
||||||
|
"async-lock",
|
||||||
|
"async-task",
|
||||||
|
"concurrent-queue",
|
||||||
|
"fastrand",
|
||||||
|
"futures-lite",
|
||||||
|
"slab",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-global-executor"
|
||||||
|
version = "2.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776"
|
||||||
|
dependencies = [
|
||||||
|
"async-channel",
|
||||||
|
"async-executor",
|
||||||
|
"async-io",
|
||||||
|
"async-lock",
|
||||||
|
"blocking",
|
||||||
|
"futures-lite",
|
||||||
|
"once_cell",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-io"
|
||||||
|
version = "1.13.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af"
|
||||||
|
dependencies = [
|
||||||
|
"async-lock",
|
||||||
|
"autocfg",
|
||||||
|
"cfg-if",
|
||||||
|
"concurrent-queue",
|
||||||
|
"futures-lite",
|
||||||
|
"log",
|
||||||
|
"parking",
|
||||||
|
"polling",
|
||||||
|
"rustix",
|
||||||
|
"slab",
|
||||||
|
"socket2 0.4.9",
|
||||||
|
"waker-fn",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-lock"
|
||||||
|
version = "2.7.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7"
|
||||||
|
dependencies = [
|
||||||
|
"event-listener",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-std"
|
||||||
|
version = "1.12.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d"
|
||||||
|
dependencies = [
|
||||||
|
"async-channel",
|
||||||
|
"async-global-executor",
|
||||||
|
"async-io",
|
||||||
|
"async-lock",
|
||||||
|
"crossbeam-utils",
|
||||||
|
"futures-channel",
|
||||||
|
"futures-core",
|
||||||
|
"futures-io",
|
||||||
|
"futures-lite",
|
||||||
|
"gloo-timers",
|
||||||
|
"kv-log-macro",
|
||||||
|
"log",
|
||||||
|
"memchr",
|
||||||
|
"once_cell",
|
||||||
|
"pin-project-lite",
|
||||||
|
"pin-utils",
|
||||||
|
"slab",
|
||||||
|
"wasm-bindgen-futures",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-task"
|
||||||
|
version = "4.4.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "atomic-waker"
|
||||||
|
version = "1.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "atty"
|
name = "atty"
|
||||||
version = "0.2.14"
|
version = "0.2.14"
|
||||||
|
@ -83,6 +205,21 @@ version = "1.1.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "backtrace"
|
||||||
|
version = "0.3.68"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4319208da049c43661739c5fade2ba182f09d1dc2299b32298d3a31692b17e12"
|
||||||
|
dependencies = [
|
||||||
|
"addr2line",
|
||||||
|
"cc",
|
||||||
|
"cfg-if",
|
||||||
|
"libc",
|
||||||
|
"miniz_oxide",
|
||||||
|
"object",
|
||||||
|
"rustc-demangle",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "base64"
|
name = "base64"
|
||||||
version = "0.13.1"
|
version = "0.13.1"
|
||||||
|
@ -110,6 +247,21 @@ dependencies = [
|
||||||
"generic-array",
|
"generic-array",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "blocking"
|
||||||
|
version = "1.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65"
|
||||||
|
dependencies = [
|
||||||
|
"async-channel",
|
||||||
|
"async-lock",
|
||||||
|
"async-task",
|
||||||
|
"atomic-waker",
|
||||||
|
"fastrand",
|
||||||
|
"futures-lite",
|
||||||
|
"log",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bumpalo"
|
name = "bumpalo"
|
||||||
version = "3.13.0"
|
version = "3.13.0"
|
||||||
|
@ -188,6 +340,15 @@ version = "1.0.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
|
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "concurrent-queue"
|
||||||
|
version = "2.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c"
|
||||||
|
dependencies = [
|
||||||
|
"crossbeam-utils",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "core-foundation"
|
name = "core-foundation"
|
||||||
version = "0.9.3"
|
version = "0.9.3"
|
||||||
|
@ -213,6 +374,15 @@ dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "crossbeam-utils"
|
||||||
|
version = "0.8.16"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crypto-common"
|
name = "crypto-common"
|
||||||
version = "0.1.6"
|
version = "0.1.6"
|
||||||
|
@ -276,6 +446,12 @@ dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "event-listener"
|
||||||
|
version = "2.5.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fastrand"
|
name = "fastrand"
|
||||||
version = "1.9.0"
|
version = "1.9.0"
|
||||||
|
@ -348,6 +524,21 @@ version = "0.3.28"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
|
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-lite"
|
||||||
|
version = "1.13.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
|
||||||
|
dependencies = [
|
||||||
|
"fastrand",
|
||||||
|
"futures-core",
|
||||||
|
"futures-io",
|
||||||
|
"memchr",
|
||||||
|
"parking",
|
||||||
|
"pin-project-lite",
|
||||||
|
"waker-fn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-macro"
|
name = "futures-macro"
|
||||||
version = "0.3.28"
|
version = "0.3.28"
|
||||||
|
@ -410,6 +601,24 @@ dependencies = [
|
||||||
"wasi",
|
"wasi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "gimli"
|
||||||
|
version = "0.27.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "gloo-timers"
|
||||||
|
version = "0.2.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c"
|
||||||
|
dependencies = [
|
||||||
|
"futures-channel",
|
||||||
|
"futures-core",
|
||||||
|
"js-sys",
|
||||||
|
"wasm-bindgen",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "h2"
|
name = "h2"
|
||||||
version = "0.3.19"
|
version = "0.3.19"
|
||||||
|
@ -429,48 +638,6 @@ dependencies = [
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "h3"
|
|
||||||
version = "0.0.2"
|
|
||||||
source = "git+https://github.com/hyperium/h3?branch=master#3ef7c1a37b635e8446322d8f8d3a68580a208ad8"
|
|
||||||
dependencies = [
|
|
||||||
"bytes",
|
|
||||||
"fastrand",
|
|
||||||
"futures-util",
|
|
||||||
"http",
|
|
||||||
"pin-project-lite",
|
|
||||||
"tokio",
|
|
||||||
"tracing",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "h3-quinn"
|
|
||||||
version = "0.0.3"
|
|
||||||
source = "git+https://github.com/hyperium/h3?branch=master#3ef7c1a37b635e8446322d8f8d3a68580a208ad8"
|
|
||||||
dependencies = [
|
|
||||||
"bytes",
|
|
||||||
"futures",
|
|
||||||
"h3",
|
|
||||||
"quinn",
|
|
||||||
"quinn-proto",
|
|
||||||
"tokio",
|
|
||||||
"tokio-util",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "h3-webtransport"
|
|
||||||
version = "0.1.0"
|
|
||||||
source = "git+https://github.com/hyperium/h3?branch=master#3ef7c1a37b635e8446322d8f8d3a68580a208ad8"
|
|
||||||
dependencies = [
|
|
||||||
"bytes",
|
|
||||||
"futures-util",
|
|
||||||
"h3",
|
|
||||||
"http",
|
|
||||||
"pin-project-lite",
|
|
||||||
"tokio",
|
|
||||||
"tracing",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hashbrown"
|
name = "hashbrown"
|
||||||
version = "0.12.3"
|
version = "0.12.3"
|
||||||
|
@ -669,6 +836,15 @@ dependencies = [
|
||||||
"wasm-bindgen",
|
"wasm-bindgen",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "kv-log-macro"
|
||||||
|
version = "1.0.7"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
|
||||||
|
dependencies = [
|
||||||
|
"log",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libc"
|
name = "libc"
|
||||||
version = "0.2.146"
|
version = "0.2.146"
|
||||||
|
@ -696,6 +872,9 @@ name = "log"
|
||||||
version = "0.4.19"
|
version = "0.4.19"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
|
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
|
||||||
|
dependencies = [
|
||||||
|
"value-bag",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "memchr"
|
name = "memchr"
|
||||||
|
@ -719,6 +898,15 @@ dependencies = [
|
||||||
"unicase",
|
"unicase",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "miniz_oxide"
|
||||||
|
version = "0.7.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
|
||||||
|
dependencies = [
|
||||||
|
"adler",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mio"
|
name = "mio"
|
||||||
version = "0.8.8"
|
version = "0.8.8"
|
||||||
|
@ -740,6 +928,7 @@ dependencies = [
|
||||||
"hex",
|
"hex",
|
||||||
"log",
|
"log",
|
||||||
"moq-transport",
|
"moq-transport",
|
||||||
|
"moq-transport-quinn",
|
||||||
"moq-warp",
|
"moq-warp",
|
||||||
"quinn",
|
"quinn",
|
||||||
"ring",
|
"ring",
|
||||||
|
@ -747,6 +936,7 @@ dependencies = [
|
||||||
"rustls-pemfile",
|
"rustls-pemfile",
|
||||||
"tokio",
|
"tokio",
|
||||||
"warp",
|
"warp",
|
||||||
|
"webtransport-quinn",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -764,15 +954,13 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"bytes",
|
"bytes",
|
||||||
"h3",
|
|
||||||
"h3-quinn",
|
|
||||||
"h3-webtransport",
|
|
||||||
"http",
|
"http",
|
||||||
"log",
|
"log",
|
||||||
"moq-transport",
|
"moq-transport",
|
||||||
"quinn",
|
"quinn",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"webtransport-quinn",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -876,6 +1064,15 @@ dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "object"
|
||||||
|
version = "0.31.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8bda667d9f2b5051b8833f59f3bf748b28ef54f850f4fcb389a252aa383866d1"
|
||||||
|
dependencies = [
|
||||||
|
"memchr",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "once_cell"
|
name = "once_cell"
|
||||||
version = "1.18.0"
|
version = "1.18.0"
|
||||||
|
@ -888,6 +1085,12 @@ version = "0.1.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "parking"
|
||||||
|
version = "2.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "parking_lot"
|
name = "parking_lot"
|
||||||
version = "0.12.1"
|
version = "0.12.1"
|
||||||
|
@ -949,6 +1152,22 @@ version = "0.1.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "polling"
|
||||||
|
version = "2.8.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce"
|
||||||
|
dependencies = [
|
||||||
|
"autocfg",
|
||||||
|
"bitflags",
|
||||||
|
"cfg-if",
|
||||||
|
"concurrent-queue",
|
||||||
|
"libc",
|
||||||
|
"log",
|
||||||
|
"pin-project-lite",
|
||||||
|
"windows-sys 0.48.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ppv-lite86"
|
name = "ppv-lite86"
|
||||||
version = "0.2.17"
|
version = "0.2.17"
|
||||||
|
@ -971,7 +1190,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "21252f1c0fc131f1b69182db8f34837e8a69737b8251dff75636a9be0518c324"
|
checksum = "21252f1c0fc131f1b69182db8f34837e8a69737b8251dff75636a9be0518c324"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"futures-io",
|
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"quinn-proto",
|
"quinn-proto",
|
||||||
"quinn-udp",
|
"quinn-udp",
|
||||||
|
@ -1093,6 +1311,12 @@ dependencies = [
|
||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rustc-demangle"
|
||||||
|
version = "0.1.23"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustc-hash"
|
name = "rustc-hash"
|
||||||
version = "1.1.0"
|
version = "1.1.0"
|
||||||
|
@ -1401,11 +1625,12 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio"
|
name = "tokio"
|
||||||
version = "1.28.2"
|
version = "1.29.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2"
|
checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"autocfg",
|
"autocfg",
|
||||||
|
"backtrace",
|
||||||
"bytes",
|
"bytes",
|
||||||
"libc",
|
"libc",
|
||||||
"mio",
|
"mio",
|
||||||
|
@ -1606,12 +1831,24 @@ version = "0.2.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
|
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "value-bag"
|
||||||
|
version = "1.4.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "version_check"
|
name = "version_check"
|
||||||
version = "0.9.4"
|
version = "0.9.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
|
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "waker-fn"
|
||||||
|
version = "1.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "want"
|
name = "want"
|
||||||
version = "0.3.1"
|
version = "0.3.1"
|
||||||
|
@ -1684,6 +1921,18 @@ dependencies = [
|
||||||
"wasm-bindgen-shared",
|
"wasm-bindgen-shared",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "wasm-bindgen-futures"
|
||||||
|
version = "0.4.37"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"js-sys",
|
||||||
|
"wasm-bindgen",
|
||||||
|
"web-sys",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "wasm-bindgen-macro"
|
name = "wasm-bindgen-macro"
|
||||||
version = "0.2.87"
|
version = "0.2.87"
|
||||||
|
@ -1733,6 +1982,46 @@ dependencies = [
|
||||||
"untrusted",
|
"untrusted",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "webtransport-generic"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0ba4583e96bb0ef08142f868bf0d28f90211eced56a473768ee27446864a2310"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"log",
|
||||||
|
"thiserror",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "webtransport-proto"
|
||||||
|
version = "0.4.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "21fefb5728651d507b444659853b47896116179ea8fd0348d02de080250892c7"
|
||||||
|
dependencies = [
|
||||||
|
"bytes",
|
||||||
|
"http",
|
||||||
|
"thiserror",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "webtransport-quinn"
|
||||||
|
version = "0.4.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c645a48f4bac5ce504cef2dd02b373f8b8a2a7de9a72f59395a54799958f3cf2"
|
||||||
|
dependencies = [
|
||||||
|
"async-std",
|
||||||
|
"bytes",
|
||||||
|
"futures",
|
||||||
|
"http",
|
||||||
|
"quinn",
|
||||||
|
"quinn-proto",
|
||||||
|
"thiserror",
|
||||||
|
"tokio",
|
||||||
|
"webtransport-generic",
|
||||||
|
"webtransport-proto",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "winapi"
|
name = "winapi"
|
||||||
version = "0.3.9"
|
version = "0.3.9"
|
||||||
|
|
|
@ -16,10 +16,12 @@ categories = [ "multimedia", "network-programming", "web-programming" ]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
moq-transport = { path = "../moq-transport" }
|
moq-transport = { path = "../moq-transport" }
|
||||||
|
moq-transport-quinn = { path = "../moq-transport-quinn" }
|
||||||
moq-warp = { path = "../moq-warp" }
|
moq-warp = { path = "../moq-warp" }
|
||||||
|
|
||||||
# QUIC
|
# QUIC
|
||||||
quinn = "0.10"
|
quinn = "0.10"
|
||||||
|
webtransport-quinn = "0.4"
|
||||||
|
|
||||||
# Crypto
|
# Crypto
|
||||||
ring = "0.16.20"
|
ring = "0.16.20"
|
||||||
|
|
|
@ -7,6 +7,9 @@ use warp::Filter;
|
||||||
|
|
||||||
use moq_warp::{relay, source};
|
use moq_warp::{relay, source};
|
||||||
|
|
||||||
|
mod server;
|
||||||
|
use server::*;
|
||||||
|
|
||||||
/// Search for a pattern in a file and display the lines that contain it.
|
/// Search for a pattern in a file and display the lines that contain it.
|
||||||
#[derive(Parser, Clone)]
|
#[derive(Parser, Clone)]
|
||||||
struct Cli {
|
struct Cli {
|
||||||
|
@ -45,14 +48,14 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.context("failed to announce file source")?;
|
.context("failed to announce file source")?;
|
||||||
|
|
||||||
// Create a server to actually serve the media
|
// Create a server to actually serve the media
|
||||||
let config = relay::ServerConfig {
|
let config = ServerConfig {
|
||||||
addr: args.addr,
|
addr: args.addr,
|
||||||
cert: args.cert,
|
cert: args.cert,
|
||||||
key: args.key,
|
key: args.key,
|
||||||
broker,
|
broker,
|
||||||
};
|
};
|
||||||
|
|
||||||
let server = relay::Server::new(config).context("failed to create server")?;
|
let server = Server::new(config).context("failed to create server")?;
|
||||||
|
|
||||||
// Run all of the above
|
// Run all of the above
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use super::{broker, Session};
|
use moq_warp::relay::broker;
|
||||||
|
|
||||||
use std::{fs, io, net, path, sync, time};
|
use std::{fs, io, net, path, sync, time};
|
||||||
|
|
||||||
|
@ -7,21 +7,19 @@ use anyhow::Context;
|
||||||
use tokio::task::JoinSet;
|
use tokio::task::JoinSet;
|
||||||
|
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
// The MoQ transport server.
|
server: quinn::Endpoint,
|
||||||
server: moq_transport_quinn::Server,
|
|
||||||
|
|
||||||
// The media sources.
|
// The media sources.
|
||||||
broker: broker::Broadcasts,
|
broker: broker::Broadcasts,
|
||||||
|
|
||||||
// Sessions actively being run.
|
// The active connections.
|
||||||
tasks: JoinSet<anyhow::Result<()>>,
|
conns: JoinSet<anyhow::Result<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ServerConfig {
|
pub struct ServerConfig {
|
||||||
pub addr: net::SocketAddr,
|
pub addr: net::SocketAddr,
|
||||||
pub cert: path::PathBuf,
|
pub cert: path::PathBuf,
|
||||||
pub key: path::PathBuf,
|
pub key: path::PathBuf,
|
||||||
|
|
||||||
pub broker: broker::Broadcasts,
|
pub broker: broker::Broadcasts,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,14 +51,7 @@ impl Server {
|
||||||
.with_single_cert(certs, key)?;
|
.with_single_cert(certs, key)?;
|
||||||
|
|
||||||
tls_config.max_early_data_size = u32::MAX;
|
tls_config.max_early_data_size = u32::MAX;
|
||||||
let alpn: Vec<Vec<u8>> = vec![
|
tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()];
|
||||||
b"h3".to_vec(),
|
|
||||||
b"h3-32".to_vec(),
|
|
||||||
b"h3-31".to_vec(),
|
|
||||||
b"h3-30".to_vec(),
|
|
||||||
b"h3-29".to_vec(),
|
|
||||||
];
|
|
||||||
tls_config.alpn_protocols = alpn;
|
|
||||||
|
|
||||||
let mut server_config = quinn::ServerConfig::with_crypto(sync::Arc::new(tls_config));
|
let mut server_config = quinn::ServerConfig::with_crypto(sync::Arc::new(tls_config));
|
||||||
|
|
||||||
|
@ -74,32 +65,54 @@ impl Server {
|
||||||
let server = quinn::Endpoint::server(server_config, config.addr)?;
|
let server = quinn::Endpoint::server(server_config, config.addr)?;
|
||||||
let broker = config.broker;
|
let broker = config.broker;
|
||||||
|
|
||||||
let server = moq_transport_quinn::Server::new(server);
|
let conns = JoinSet::new();
|
||||||
let tasks = JoinSet::new();
|
|
||||||
|
|
||||||
Ok(Self { server, broker, tasks })
|
Ok(Self { server, broker, conns })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(mut self) -> anyhow::Result<()> {
|
pub async fn run(mut self) -> anyhow::Result<()> {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
res = self.server.accept() => {
|
res = self.server.accept() => {
|
||||||
let session = res.context("failed to accept connection")?;
|
let conn = res.context("failed to accept QUIC connection")?;
|
||||||
let broker = self.broker.clone();
|
let broker = self.broker.clone();
|
||||||
|
|
||||||
self.tasks.spawn(async move {
|
self.conns.spawn(async move { Self::handle(conn, broker).await });
|
||||||
let session: Session = Session::accept(session, broker).await?;
|
|
||||||
session.run().await
|
|
||||||
});
|
|
||||||
},
|
},
|
||||||
res = self.tasks.join_next(), if !self.tasks.is_empty() => {
|
res = self.conns.join_next(), if !self.conns.is_empty() => {
|
||||||
let res = res.expect("no tasks").expect("task aborted");
|
let res = res.expect("no tasks").expect("task aborted");
|
||||||
|
|
||||||
if let Err(err) = res {
|
if let Err(err) = res {
|
||||||
log::error!("session terminated: {:?}", err);
|
log::error!("connection terminated: {:?}", err);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle(conn: quinn::Connecting, broker: broker::Broadcasts) -> anyhow::Result<()> {
|
||||||
|
// Wait for the QUIC connection to be established.
|
||||||
|
let conn = conn.await.context("failed to establish QUIC connection")?;
|
||||||
|
|
||||||
|
// Wait for the CONNECT request.
|
||||||
|
let request = webtransport_quinn::accept(conn)
|
||||||
|
.await
|
||||||
|
.context("failed to receive WebTransport request")?;
|
||||||
|
|
||||||
|
// TODO parse the request URI
|
||||||
|
|
||||||
|
// Accept the CONNECT request.
|
||||||
|
let session = request
|
||||||
|
.ok()
|
||||||
|
.await
|
||||||
|
.context("failed to respond to WebTransport request")?;
|
||||||
|
|
||||||
|
// Perform the MoQ handshake.
|
||||||
|
let session = moq_transport_quinn::accept(session, moq_transport::Role::Both)
|
||||||
|
.await
|
||||||
|
.context("failed to perform MoQ handshake")?;
|
||||||
|
|
||||||
|
// Run the relay code.
|
||||||
|
let session = moq_warp::relay::Session::new(session, broker);
|
||||||
|
session.run().await
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -16,16 +16,11 @@ categories = [ "multimedia", "network-programming", "web-programming" ]
|
||||||
[dependencies]
|
[dependencies]
|
||||||
moq-transport = { path = "../moq-transport" }
|
moq-transport = { path = "../moq-transport" }
|
||||||
|
|
||||||
# WebTransport support: TODO pin a version when released
|
|
||||||
h3 = { git = "https://github.com/hyperium/h3", branch = "master" }
|
|
||||||
h3-quinn = { git = "https://github.com/hyperium/h3", branch = "master" }
|
|
||||||
h3-webtransport = { git = "https://github.com/hyperium/h3", branch = "master" }
|
|
||||||
quinn = "0.10"
|
quinn = "0.10"
|
||||||
http = "0.2"
|
http = "0.2"
|
||||||
|
webtransport-quinn = "0.4.1"
|
||||||
tokio = { version = "1.27", features = ["macros"] }
|
tokio = { version = "1.27", features = ["macros"] }
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
|
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
anyhow = "1.0.70"
|
anyhow = "1.0.70"
|
||||||
thiserror = "1.0.21"
|
thiserror = "1.0.21"
|
||||||
|
|
|
@ -1,51 +1,24 @@
|
||||||
|
use anyhow::Context;
|
||||||
use moq_transport::{Decode, DecodeError, Encode, Message};
|
use moq_transport::{Decode, DecodeError, Encode, Message};
|
||||||
|
|
||||||
use bytes::{Buf, Bytes, BytesMut};
|
use bytes::{Buf, BufMut, BytesMut};
|
||||||
|
|
||||||
use h3::quic::BidiStream;
|
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use webtransport_quinn::{RecvStream, SendStream};
|
||||||
|
|
||||||
pub struct Control {
|
pub struct SendControl {
|
||||||
sender: ControlSend,
|
stream: SendStream,
|
||||||
recver: ControlRecv,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Control {
|
|
||||||
pub(crate) fn new(stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<Bytes>, Bytes>) -> Self {
|
|
||||||
let (sender, recver) = stream.split();
|
|
||||||
let sender = ControlSend::new(sender);
|
|
||||||
let recver = ControlRecv::new(recver);
|
|
||||||
|
|
||||||
Self { sender, recver }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn split(self) -> (ControlSend, ControlRecv) {
|
|
||||||
(self.sender, self.recver)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
|
|
||||||
self.sender.send(msg).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn recv(&mut self) -> anyhow::Result<Message> {
|
|
||||||
self.recver.recv().await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ControlSend {
|
|
||||||
stream: h3_webtransport::stream::SendStream<h3_quinn::SendStream<Bytes>, Bytes>,
|
|
||||||
buf: BytesMut, // reuse a buffer to encode messages.
|
buf: BytesMut, // reuse a buffer to encode messages.
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ControlSend {
|
impl SendControl {
|
||||||
pub fn new(inner: h3_webtransport::stream::SendStream<h3_quinn::SendStream<Bytes>, Bytes>) -> Self {
|
pub fn new(stream: SendStream) -> Self {
|
||||||
Self {
|
Self {
|
||||||
buf: BytesMut::new(),
|
buf: BytesMut::new(),
|
||||||
stream: inner,
|
stream,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,7 +47,7 @@ impl ControlSend {
|
||||||
// There's no equivalent for receiving since only one thread should be receiving at a time.
|
// There's no equivalent for receiving since only one thread should be receiving at a time.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ControlShared {
|
pub struct ControlShared {
|
||||||
stream: Arc<Mutex<ControlSend>>,
|
stream: Arc<Mutex<SendControl>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ControlShared {
|
impl ControlShared {
|
||||||
|
@ -84,16 +57,16 @@ impl ControlShared {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ControlRecv {
|
pub struct RecvControl {
|
||||||
stream: h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, Bytes>,
|
stream: RecvStream,
|
||||||
buf: BytesMut, // data we've read but haven't fully decoded yet
|
buf: BytesMut, // data we've read but haven't fully decoded yet
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ControlRecv {
|
impl RecvControl {
|
||||||
pub fn new(inner: h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, Bytes>) -> Self {
|
pub fn new(stream: RecvStream) -> Self {
|
||||||
Self {
|
Self {
|
||||||
buf: BytesMut::new(),
|
buf: BytesMut::new(),
|
||||||
stream: inner,
|
stream,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,7 +86,8 @@ impl ControlRecv {
|
||||||
}
|
}
|
||||||
Err(DecodeError::UnexpectedEnd) => {
|
Err(DecodeError::UnexpectedEnd) => {
|
||||||
// The decode failed, so we need to append more data.
|
// The decode failed, so we need to append more data.
|
||||||
self.stream.read_buf(&mut self.buf).await?;
|
let chunk = self.stream.read_chunk(1024, true).await?.context("stream closed")?;
|
||||||
|
self.buf.put(chunk.bytes);
|
||||||
}
|
}
|
||||||
Err(e) => return Err(e.into()),
|
Err(e) => return Err(e.into()),
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
mod control;
|
mod control;
|
||||||
mod object;
|
mod object;
|
||||||
mod server;
|
mod session;
|
||||||
|
mod stream;
|
||||||
|
|
||||||
pub use control::*;
|
pub use control::*;
|
||||||
pub use object::*;
|
pub use object::*;
|
||||||
pub use server::*;
|
pub use session::*;
|
||||||
|
pub use stream::*;
|
||||||
|
|
|
@ -1,136 +1,147 @@
|
||||||
|
use std::{collections::BinaryHeap, io::Cursor, sync::Arc};
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use bytes::{Buf, Bytes, BytesMut};
|
use bytes::BytesMut;
|
||||||
use moq_transport::{Decode, DecodeError, Encode, Object};
|
use moq_transport::{Decode, DecodeError, Encode, Object};
|
||||||
use std::{io::Cursor, sync::Arc};
|
|
||||||
|
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::task::JoinSet;
|
||||||
|
use tokio::{io::AsyncBufReadExt, sync::Mutex};
|
||||||
|
use webtransport_quinn::Session;
|
||||||
|
|
||||||
// TODO support clients
|
use crate::{RecvStream, SendStream, SendStreamOrder};
|
||||||
type WebTransportSession = h3_webtransport::server::WebTransportSession<h3_quinn::Connection, Bytes>;
|
|
||||||
|
|
||||||
// Reduce some typing
|
|
||||||
pub type SendStream = h3_webtransport::stream::SendStream<h3_quinn::SendStream<Bytes>, Bytes>;
|
|
||||||
pub type RecvStream = h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, Bytes>;
|
|
||||||
|
|
||||||
pub struct Objects {
|
|
||||||
send: SendObjects,
|
|
||||||
recv: RecvObjects,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Objects {
|
|
||||||
pub fn new(session: Arc<WebTransportSession>) -> Self {
|
|
||||||
let send = SendObjects::new(session.clone());
|
|
||||||
let recv = RecvObjects::new(session);
|
|
||||||
Self { send, recv }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn split(self) -> (SendObjects, RecvObjects) {
|
|
||||||
(self.send, self.recv)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn recv(&mut self) -> anyhow::Result<(Object, RecvStream)> {
|
|
||||||
self.recv.recv().await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn send(&mut self, header: Object) -> anyhow::Result<SendStream> {
|
|
||||||
self.send.send(header).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Allow this to be cloned so we can have multiple senders.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct SendObjects {
|
pub struct SendObjects {
|
||||||
session: Arc<WebTransportSession>,
|
// This is a tokio mutex since we need to lock across await boundaries.
|
||||||
|
inner: Arc<Mutex<SendObjectsInner>>,
|
||||||
// A reusable buffer for encoding messages.
|
|
||||||
buf: BytesMut,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SendObjects {
|
impl SendObjects {
|
||||||
pub fn new(session: Arc<WebTransportSession>) -> Self {
|
pub fn new(session: Session) -> Self {
|
||||||
|
let inner = SendObjectsInner::new(session);
|
||||||
|
Self {
|
||||||
|
inner: Arc::new(Mutex::new(inner)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn open(&mut self, header: Object) -> anyhow::Result<SendStream> {
|
||||||
|
let mut inner = self.inner.lock().await;
|
||||||
|
inner.open(header).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SendObjectsInner {
|
||||||
|
session: Session,
|
||||||
|
|
||||||
|
// Quinn supports a i32 for priority, but the wire format is a u64.
|
||||||
|
// Our work around is to keep a list of streams in priority order and use the index as the priority.
|
||||||
|
// This involves more work, so TODO either increase the Quinn size or reduce the wire size.
|
||||||
|
ordered: BinaryHeap<SendStreamOrder>,
|
||||||
|
ordered_swap: BinaryHeap<SendStreamOrder>, // reuse memory to avoid allocations
|
||||||
|
|
||||||
|
// A reusable buffer for encoding headers.
|
||||||
|
// TODO figure out how to use BufMut on the stack and remove this.
|
||||||
|
buf: BytesMut,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SendObjectsInner {
|
||||||
|
fn new(session: Session) -> Self {
|
||||||
Self {
|
Self {
|
||||||
session,
|
session,
|
||||||
|
ordered: BinaryHeap::new(),
|
||||||
|
ordered_swap: BinaryHeap::new(),
|
||||||
buf: BytesMut::new(),
|
buf: BytesMut::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send(&mut self, header: Object) -> anyhow::Result<SendStream> {
|
pub async fn open(&mut self, header: Object) -> anyhow::Result<SendStream> {
|
||||||
|
let stream = self.session.open_uni().await.context("failed to open uni stream")?;
|
||||||
|
let (mut stream, priority) = SendStream::with_order(stream, header.send_order.into_inner());
|
||||||
|
|
||||||
|
// Add the priority to our existing list.
|
||||||
|
self.ordered.push(priority);
|
||||||
|
|
||||||
|
// Loop through the list and update the priorities of any still active streams.
|
||||||
|
let mut index = 0;
|
||||||
|
while let Some(stream) = self.ordered.pop() {
|
||||||
|
if stream.update(index).is_ok() {
|
||||||
|
// Add the stream to the new list so it'll be in sorted order.
|
||||||
|
self.ordered_swap.push(stream);
|
||||||
|
index += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Swap the lists so we can reuse the memory.
|
||||||
|
std::mem::swap(&mut self.ordered, &mut self.ordered_swap);
|
||||||
|
|
||||||
|
// Encode and write the stream header.
|
||||||
|
// TODO do this in SendStream so we don't hold the lock.
|
||||||
|
// Otherwise,
|
||||||
self.buf.clear();
|
self.buf.clear();
|
||||||
header.encode(&mut self.buf).unwrap();
|
header.encode(&mut self.buf).unwrap();
|
||||||
|
stream.write_all(&self.buf).await.context("failed to write header")?;
|
||||||
let mut stream = self
|
|
||||||
.session
|
|
||||||
.open_uni(self.session.session_id())
|
|
||||||
.await
|
|
||||||
.context("failed to open uni stream")?;
|
|
||||||
|
|
||||||
// TODO support select! without making a new stream.
|
|
||||||
stream.write_all(&self.buf).await?;
|
|
||||||
|
|
||||||
Ok(stream)
|
Ok(stream)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Clone for SendObjects {
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
Self {
|
|
||||||
session: self.session.clone(),
|
|
||||||
buf: BytesMut::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Not clone, so we don't accidentally have two listners.
|
// Not clone, so we don't accidentally have two listners.
|
||||||
pub struct RecvObjects {
|
pub struct RecvObjects {
|
||||||
session: Arc<WebTransportSession>,
|
session: Session,
|
||||||
|
|
||||||
// A uni stream that's been accepted but not fully read from yet.
|
// Streams that we've accepted but haven't read the header from yet.
|
||||||
stream: Option<RecvStream>,
|
streams: JoinSet<anyhow::Result<(Object, RecvStream)>>,
|
||||||
|
|
||||||
// Data that we've read but haven't formed a full message yet.
|
|
||||||
buf: BytesMut,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RecvObjects {
|
impl RecvObjects {
|
||||||
pub fn new(session: Arc<WebTransportSession>) -> Self {
|
pub fn new(session: Session) -> Self {
|
||||||
Self {
|
Self {
|
||||||
session,
|
session,
|
||||||
stream: None,
|
streams: JoinSet::new(),
|
||||||
buf: BytesMut::new(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn recv(&mut self) -> anyhow::Result<(Object, RecvStream)> {
|
pub async fn recv(&mut self) -> anyhow::Result<(Object, RecvStream)> {
|
||||||
// Make sure any state is saved across await boundaries so this works with select!
|
loop {
|
||||||
|
tokio::select! {
|
||||||
let stream = match self.stream.as_mut() {
|
res = self.session.accept_uni() => {
|
||||||
Some(stream) => stream,
|
let stream = res.context("failed to accept stream")?;
|
||||||
None => {
|
self.streams.spawn(async move { Self::read(stream).await });
|
||||||
let (_session_id, stream) = self
|
},
|
||||||
.session
|
res = self.streams.join_next(), if !self.streams.is_empty() => {
|
||||||
.accept_uni()
|
return res.unwrap().context("failed to run join set")?;
|
||||||
.await
|
|
||||||
.context("failed to accept uni stream")?
|
|
||||||
.context("no uni stream")?;
|
|
||||||
|
|
||||||
self.stream.insert(stream)
|
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read(stream: webtransport_quinn::RecvStream) -> anyhow::Result<(Object, RecvStream)> {
|
||||||
|
let mut stream = RecvStream::new(stream);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// Read the contents of the buffer
|
// Read more data into the buffer.
|
||||||
let mut peek = Cursor::new(&self.buf);
|
let data = stream.fill_buf().await?;
|
||||||
|
if data.is_empty() {
|
||||||
|
anyhow::bail!("stream closed before reading header");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use a cursor to read the buffer and remember how much we read.
|
||||||
|
let mut read = Cursor::new(data);
|
||||||
|
|
||||||
|
let header = match Object::decode(&mut read) {
|
||||||
|
Ok(header) => header,
|
||||||
|
Err(DecodeError::UnexpectedEnd) => continue,
|
||||||
|
Err(err) => return Err(err.into()),
|
||||||
|
};
|
||||||
|
|
||||||
|
// We parsed a full header, advance the cursor.
|
||||||
|
// The borrow checker requires these on separate lines.
|
||||||
|
let size = read.position() as usize;
|
||||||
|
stream.consume(size);
|
||||||
|
|
||||||
match Object::decode(&mut peek) {
|
|
||||||
Ok(header) => {
|
|
||||||
let stream = self.stream.take().unwrap();
|
|
||||||
self.buf.advance(peek.position() as usize);
|
|
||||||
return Ok((header, stream));
|
return Ok((header, stream));
|
||||||
}
|
}
|
||||||
Err(DecodeError::UnexpectedEnd) => {
|
|
||||||
// The decode failed, so we need to append more data.
|
|
||||||
stream.read_buf(&mut self.buf).await?;
|
|
||||||
}
|
|
||||||
Err(e) => return Err(e.into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,179 +0,0 @@
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use anyhow::Context;
|
|
||||||
use bytes::Bytes;
|
|
||||||
use tokio::task::JoinSet;
|
|
||||||
|
|
||||||
use moq_transport::{Message, SetupClient, SetupServer};
|
|
||||||
|
|
||||||
use super::{Control, Objects};
|
|
||||||
|
|
||||||
pub struct Server {
|
|
||||||
// The QUIC server, yielding new connections and sessions.
|
|
||||||
endpoint: quinn::Endpoint,
|
|
||||||
|
|
||||||
// A list of connections that are completing the WebTransport handshake.
|
|
||||||
handshake: JoinSet<anyhow::Result<Connect>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Server {
|
|
||||||
pub fn new(endpoint: quinn::Endpoint) -> Self {
|
|
||||||
let handshake = JoinSet::new();
|
|
||||||
Self { endpoint, handshake }
|
|
||||||
}
|
|
||||||
|
|
||||||
// Accept the next WebTransport session.
|
|
||||||
pub async fn accept(&mut self) -> anyhow::Result<Connect> {
|
|
||||||
loop {
|
|
||||||
tokio::select!(
|
|
||||||
// Accept the connection and start the WebTransport handshake.
|
|
||||||
conn = self.endpoint.accept() => {
|
|
||||||
let conn = conn.context("failed to accept connection")?;
|
|
||||||
self.handshake.spawn(async move {
|
|
||||||
Connecting::new(conn).accept().await
|
|
||||||
});
|
|
||||||
},
|
|
||||||
// Return any mostly finished WebTransport handshakes.
|
|
||||||
res = self.handshake.join_next(), if !self.handshake.is_empty() => {
|
|
||||||
let res = res.expect("no tasks").expect("task aborted");
|
|
||||||
match res {
|
|
||||||
Ok(session) => return Ok(session),
|
|
||||||
Err(err) => log::warn!("failed to accept session: {:?}", err),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Connecting {
|
|
||||||
conn: quinn::Connecting,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Connecting {
|
|
||||||
pub fn new(conn: quinn::Connecting) -> Self {
|
|
||||||
Self { conn }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn accept(self) -> anyhow::Result<Connect> {
|
|
||||||
let conn = self.conn.await.context("failed to accept h3 connection")?;
|
|
||||||
|
|
||||||
let mut conn = h3::server::builder()
|
|
||||||
.enable_webtransport(true)
|
|
||||||
.enable_connect(true)
|
|
||||||
.enable_datagram(true)
|
|
||||||
.max_webtransport_sessions(1)
|
|
||||||
.send_grease(true)
|
|
||||||
.build(h3_quinn::Connection::new(conn))
|
|
||||||
.await
|
|
||||||
.context("failed to create h3 server")?;
|
|
||||||
|
|
||||||
let (req, stream) = conn
|
|
||||||
.accept()
|
|
||||||
.await
|
|
||||||
.context("failed to accept h3 session")?
|
|
||||||
.context("failed to accept h3 request")?;
|
|
||||||
|
|
||||||
let ext = req.extensions();
|
|
||||||
anyhow::ensure!(req.method() == http::Method::CONNECT, "expected CONNECT request");
|
|
||||||
anyhow::ensure!(
|
|
||||||
ext.get::<h3::ext::Protocol>() == Some(&h3::ext::Protocol::WEB_TRANSPORT),
|
|
||||||
"expected WebTransport CONNECT"
|
|
||||||
);
|
|
||||||
|
|
||||||
// Let the application decide if we accept this CONNECT request.
|
|
||||||
Ok(Connect { conn, req, stream })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// The WebTransport CONNECT has arrived, and we need to decide if we accept it.
|
|
||||||
pub struct Connect {
|
|
||||||
// Inspect to decide whether to accept() or reject() the session.
|
|
||||||
req: http::Request<()>,
|
|
||||||
|
|
||||||
conn: h3::server::Connection<h3_quinn::Connection, Bytes>,
|
|
||||||
stream: h3::server::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Connect {
|
|
||||||
// Expose the received URI
|
|
||||||
pub fn uri(&self) -> &http::Uri {
|
|
||||||
self.req.uri()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Accept the WebTransport session.
|
|
||||||
pub async fn accept(self) -> anyhow::Result<Setup> {
|
|
||||||
let session = h3_webtransport::server::WebTransportSession::accept(self.req, self.stream, self.conn).await?;
|
|
||||||
let session = Arc::new(session);
|
|
||||||
|
|
||||||
let stream = session
|
|
||||||
.accept_bi()
|
|
||||||
.await
|
|
||||||
.context("failed to accept bidi stream")?
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let objects = Objects::new(session.clone());
|
|
||||||
|
|
||||||
let stream = match stream {
|
|
||||||
h3_webtransport::server::AcceptedBi::BidiStream(_session_id, stream) => stream,
|
|
||||||
h3_webtransport::server::AcceptedBi::Request(..) => anyhow::bail!("additional http requests not supported"),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut control = Control::new(stream);
|
|
||||||
let setup = match control.recv().await.context("failed to read SETUP")? {
|
|
||||||
Message::SetupClient(setup) => setup,
|
|
||||||
_ => anyhow::bail!("expected CLIENT SETUP"),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Let the application decide if we accept this MoQ session.
|
|
||||||
Ok(Setup {
|
|
||||||
setup,
|
|
||||||
control,
|
|
||||||
objects,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reject the WebTransport session with a HTTP response.
|
|
||||||
pub async fn reject(mut self, resp: http::Response<()>) -> anyhow::Result<()> {
|
|
||||||
self.stream.send_response(resp).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Setup {
|
|
||||||
setup: SetupClient,
|
|
||||||
control: Control,
|
|
||||||
objects: Objects,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Setup {
|
|
||||||
// Return the setup message we received.
|
|
||||||
pub fn setup(&self) -> &SetupClient {
|
|
||||||
&self.setup
|
|
||||||
}
|
|
||||||
|
|
||||||
// Accept the session with our own setup message.
|
|
||||||
pub async fn accept(mut self, setup: SetupServer) -> anyhow::Result<Session> {
|
|
||||||
self.control.send(setup).await?;
|
|
||||||
Ok(Session {
|
|
||||||
control: self.control,
|
|
||||||
objects: self.objects,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn reject(self) -> anyhow::Result<()> {
|
|
||||||
// TODO Close the QUIC connection with an error code.
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Session {
|
|
||||||
pub control: Control,
|
|
||||||
pub objects: Objects,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Session {
|
|
||||||
pub fn split(self) -> (Control, Objects) {
|
|
||||||
(self.control, self.objects)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,98 @@
|
||||||
|
use anyhow::Context;
|
||||||
|
|
||||||
|
use moq_transport::{Message, SetupClient, SetupServer};
|
||||||
|
|
||||||
|
use super::{RecvControl, RecvObjects, SendControl, SendObjects};
|
||||||
|
|
||||||
|
/// Called by a server with an established WebTransport session.
|
||||||
|
// TODO close the session with an error code
|
||||||
|
pub async fn accept(session: webtransport_quinn::Session, role: moq_transport::Role) -> anyhow::Result<Session> {
|
||||||
|
let (send, recv) = session.accept_bi().await.context("failed to accept bidi stream")?;
|
||||||
|
|
||||||
|
let mut send_control = SendControl::new(send);
|
||||||
|
let mut recv_control = RecvControl::new(recv);
|
||||||
|
|
||||||
|
let setup_client = match recv_control.recv().await.context("failed to read SETUP")? {
|
||||||
|
Message::SetupClient(setup) => setup,
|
||||||
|
_ => anyhow::bail!("expected CLIENT SETUP"),
|
||||||
|
};
|
||||||
|
|
||||||
|
setup_client
|
||||||
|
.versions
|
||||||
|
.iter()
|
||||||
|
.find(|version| **version == moq_transport::Version::DRAFT_00)
|
||||||
|
.context("no supported versions")?;
|
||||||
|
|
||||||
|
if !setup_client.role.compatible(role) {
|
||||||
|
anyhow::bail!("incompatible roles: {:?} {:?}", setup_client.role, role);
|
||||||
|
}
|
||||||
|
|
||||||
|
let setup_server = SetupServer {
|
||||||
|
role,
|
||||||
|
version: moq_transport::Version::DRAFT_00,
|
||||||
|
};
|
||||||
|
|
||||||
|
send_control
|
||||||
|
.send(moq_transport::Message::SetupServer(setup_server))
|
||||||
|
.await
|
||||||
|
.context("failed to send setup server")?;
|
||||||
|
|
||||||
|
let send_objects = SendObjects::new(session.clone());
|
||||||
|
let recv_objects = RecvObjects::new(session.clone());
|
||||||
|
|
||||||
|
Ok(Session {
|
||||||
|
send_control,
|
||||||
|
recv_control,
|
||||||
|
send_objects,
|
||||||
|
recv_objects,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Called by a client with an established WebTransport session.
|
||||||
|
pub async fn connect(session: webtransport_quinn::Session, role: moq_transport::Role) -> anyhow::Result<Session> {
|
||||||
|
let (send, recv) = session.open_bi().await.context("failed to oen bidi stream")?;
|
||||||
|
|
||||||
|
let mut send_control = SendControl::new(send);
|
||||||
|
let mut recv_control = RecvControl::new(recv);
|
||||||
|
|
||||||
|
let setup_client = SetupClient {
|
||||||
|
role,
|
||||||
|
versions: vec![moq_transport::Version::DRAFT_00].into(),
|
||||||
|
path: "".to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
send_control
|
||||||
|
.send(moq_transport::Message::SetupClient(setup_client))
|
||||||
|
.await
|
||||||
|
.context("failed to send SETUP CLIENT")?;
|
||||||
|
|
||||||
|
let setup_server = match recv_control.recv().await.context("failed to read SETUP")? {
|
||||||
|
Message::SetupServer(setup) => setup,
|
||||||
|
_ => anyhow::bail!("expected SERVER SETUP"),
|
||||||
|
};
|
||||||
|
|
||||||
|
if setup_server.version != moq_transport::Version::DRAFT_00 {
|
||||||
|
anyhow::bail!("unsupported version: {:?}", setup_server.version);
|
||||||
|
}
|
||||||
|
|
||||||
|
if !setup_server.role.compatible(role) {
|
||||||
|
anyhow::bail!("incompatible roles: {:?} {:?}", role, setup_server.role);
|
||||||
|
}
|
||||||
|
|
||||||
|
let send_objects = SendObjects::new(session.clone());
|
||||||
|
let recv_objects = RecvObjects::new(session.clone());
|
||||||
|
|
||||||
|
Ok(Session {
|
||||||
|
send_control,
|
||||||
|
recv_control,
|
||||||
|
send_objects,
|
||||||
|
recv_objects,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Session {
|
||||||
|
pub send_control: SendControl,
|
||||||
|
pub recv_control: RecvControl,
|
||||||
|
pub send_objects: SendObjects,
|
||||||
|
pub recv_objects: RecvObjects,
|
||||||
|
}
|
|
@ -0,0 +1,115 @@
|
||||||
|
use std::{
|
||||||
|
io,
|
||||||
|
ops::{Deref, DerefMut},
|
||||||
|
pin::Pin,
|
||||||
|
sync::{Arc, Mutex, Weak},
|
||||||
|
task,
|
||||||
|
};
|
||||||
|
|
||||||
|
use tokio::io::{AsyncWrite, BufReader};
|
||||||
|
|
||||||
|
// Ugh, so we need to wrap SendStream with a mutex because we need to be able to call set_priority on it.
|
||||||
|
// The problem is that set_priority takes a i32, while send_order is a VarInt
|
||||||
|
// So the solution is to maintain a priority queue of active streams and constantly update the priority with their index.
|
||||||
|
// So the library might update the priority of the stream at any point, while the application might similtaniously write to it.
|
||||||
|
// The only upside is that we don't expose set_priority, so the application can't screw with things.
|
||||||
|
pub struct SendStream {
|
||||||
|
stream: Arc<Mutex<webtransport_quinn::SendStream>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SendStream {
|
||||||
|
// Create a new stream with the given order, returning a handle that allows us to update the priority.
|
||||||
|
pub(crate) fn with_order(stream: webtransport_quinn::SendStream, order: u64) -> (Self, SendStreamOrder) {
|
||||||
|
let stream = Arc::new(Mutex::new(stream));
|
||||||
|
let weak = Arc::<Mutex<webtransport_quinn::SendStream>>::downgrade(&stream);
|
||||||
|
|
||||||
|
(SendStream { stream }, SendStreamOrder { stream: weak, order })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct SendStreamOrder {
|
||||||
|
// We use Weak here so we don't prevent the stream from being closed when dereferenced.
|
||||||
|
// update() will return an error if the stream was closed instead.
|
||||||
|
stream: Weak<Mutex<webtransport_quinn::SendStream>>,
|
||||||
|
order: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SendStreamOrder {
|
||||||
|
pub(crate) fn update(&self, index: i32) -> Result<(), webtransport_quinn::StreamClosed> {
|
||||||
|
let stream = self.stream.upgrade().ok_or(webtransport_quinn::StreamClosed)?;
|
||||||
|
let mut stream = stream.lock().unwrap();
|
||||||
|
stream.set_priority(index)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PartialEq for SendStreamOrder {
|
||||||
|
fn eq(&self, other: &Self) -> bool {
|
||||||
|
self.order == other.order
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Eq for SendStreamOrder {}
|
||||||
|
|
||||||
|
impl PartialOrd for SendStreamOrder {
|
||||||
|
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||||
|
// We reverse the order so the lower send order is higher priority.
|
||||||
|
other.order.partial_cmp(&self.order)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Ord for SendStreamOrder {
|
||||||
|
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||||
|
// We reverse the order so the lower send order is higher priority.
|
||||||
|
other.order.cmp(&self.order)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We implement AsyncWrite so we can grab the mutex on each write attempt, instead of holding it for the entire async function.
|
||||||
|
impl AsyncWrite for SendStream {
|
||||||
|
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> task::Poll<io::Result<usize>> {
|
||||||
|
let mut stream = self.stream.lock().unwrap();
|
||||||
|
Pin::new(&mut *stream).poll_write(cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<io::Result<()>> {
|
||||||
|
let mut stream = self.stream.lock().unwrap();
|
||||||
|
Pin::new(&mut *stream).poll_flush(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<io::Result<()>> {
|
||||||
|
let mut stream = self.stream.lock().unwrap();
|
||||||
|
Pin::new(&mut *stream).poll_shutdown(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unfortunately, we need to wrap RecvStream with a buffer since moq-transport::Coding only supports buffered reads.
|
||||||
|
// TODO support unbuffered reads so we only read the MoQ header and then hand off the stream.
|
||||||
|
// NOTE: We can't use AsyncRead::chain because we need to get the inner stream for stop.
|
||||||
|
pub struct RecvStream {
|
||||||
|
stream: BufReader<webtransport_quinn::RecvStream>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RecvStream {
|
||||||
|
pub(crate) fn new(stream: webtransport_quinn::RecvStream) -> Self {
|
||||||
|
let stream = BufReader::new(stream);
|
||||||
|
Self { stream }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn stop(self, code: u32) {
|
||||||
|
self.stream.into_inner().stop(code).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for RecvStream {
|
||||||
|
type Target = BufReader<webtransport_quinn::RecvStream>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.stream
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DerefMut for RecvStream {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.stream
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,6 +23,10 @@ impl Role {
|
||||||
Self::Publisher => false,
|
Self::Publisher => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn compatible(&self, other: Role) -> bool {
|
||||||
|
self.is_publisher() == other.is_subscriber() && self.is_subscriber() == other.is_publisher()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Role> for VarInt {
|
impl From<Role> for VarInt {
|
||||||
|
|
|
@ -73,3 +73,9 @@ impl Deref for Versions {
|
||||||
&self.0
|
&self.0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<Vec<Version>> for Versions {
|
||||||
|
fn from(vs: Vec<Version>) -> Self {
|
||||||
|
Self(vs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@ use std::collections::HashMap;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time;
|
use std::time;
|
||||||
|
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncBufReadExt;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::task::JoinSet; // lock across await boundaries
|
use tokio::task::JoinSet; // lock across await boundaries
|
||||||
|
|
||||||
|
@ -91,10 +91,13 @@ impl Session {
|
||||||
async fn receive_object(&mut self, object: Object, stream: RecvStream) -> anyhow::Result<()> {
|
async fn receive_object(&mut self, object: Object, stream: RecvStream) -> anyhow::Result<()> {
|
||||||
let track = object.track;
|
let track = object.track;
|
||||||
|
|
||||||
|
// Keep objects in memory for 10s
|
||||||
|
let expires = time::Instant::now() + time::Duration::from_secs(10);
|
||||||
|
|
||||||
let segment = segment::Info {
|
let segment = segment::Info {
|
||||||
sequence: object.sequence,
|
sequence: object.sequence,
|
||||||
send_order: object.send_order,
|
send_order: object.send_order,
|
||||||
expires: Some(time::Instant::now() + time::Duration::from_secs(2)), // TODO increase this once send_order is implemented
|
expires: Some(expires),
|
||||||
};
|
};
|
||||||
|
|
||||||
let segment = segment::Publisher::new(segment);
|
let segment = segment::Publisher::new(segment);
|
||||||
|
@ -112,14 +115,15 @@ impl Session {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> {
|
async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> {
|
||||||
let mut buf = [0u8; 32 * 1024];
|
|
||||||
loop {
|
loop {
|
||||||
let size = stream.read(&mut buf).await.context("failed to read from stream")?;
|
let buf = stream.fill_buf().await?;
|
||||||
if size == 0 {
|
if buf.is_empty() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let chunk = buf[..size].to_vec();
|
let chunk = buf.to_vec();
|
||||||
|
stream.consume(chunk.len());
|
||||||
|
|
||||||
segment.fragments.push(chunk.into())
|
segment.fragments.push(chunk.into())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,12 @@
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk};
|
use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk};
|
||||||
use moq_transport_quinn::Control;
|
use moq_transport_quinn::{RecvControl, SendControl};
|
||||||
|
|
||||||
pub struct Main {
|
pub struct Main {
|
||||||
control: Control,
|
send_control: SendControl,
|
||||||
|
recv_control: RecvControl,
|
||||||
|
|
||||||
outgoing: mpsc::Receiver<Message>,
|
outgoing: mpsc::Receiver<Message>,
|
||||||
|
|
||||||
contribute: mpsc::Sender<Contribute>,
|
contribute: mpsc::Sender<Contribute>,
|
||||||
|
@ -15,8 +17,8 @@ impl Main {
|
||||||
pub async fn run(mut self) -> anyhow::Result<()> {
|
pub async fn run(mut self) -> anyhow::Result<()> {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Some(msg) = self.outgoing.recv() => self.control.send(msg).await?,
|
Some(msg) = self.outgoing.recv() => self.send_control.send(msg).await?,
|
||||||
Ok(msg) = self.control.recv() => self.handle(msg).await?,
|
Ok(msg) = self.recv_control.recv() => self.handle(msg).await?,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -51,13 +53,17 @@ impl<T> Component<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Splits a control stream into two components, based on if it's a message for contribution or distribution.
|
// Splits a control stream into two components, based on if it's a message for contribution or distribution.
|
||||||
pub fn split(control: Control) -> (Main, Component<Contribute>, Component<Distribute>) {
|
pub fn split(
|
||||||
|
send_control: SendControl,
|
||||||
|
recv_control: RecvControl,
|
||||||
|
) -> (Main, Component<Contribute>, Component<Distribute>) {
|
||||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(1);
|
let (outgoing_tx, outgoing_rx) = mpsc::channel(1);
|
||||||
let (contribute_tx, contribute_rx) = mpsc::channel(1);
|
let (contribute_tx, contribute_rx) = mpsc::channel(1);
|
||||||
let (distribute_tx, distribute_rx) = mpsc::channel(1);
|
let (distribute_tx, distribute_rx) = mpsc::channel(1);
|
||||||
|
|
||||||
let control = Main {
|
let control = Main {
|
||||||
control,
|
send_control,
|
||||||
|
recv_control,
|
||||||
outgoing: outgoing_rx,
|
outgoing: outgoing_rx,
|
||||||
contribute: contribute_tx,
|
contribute: contribute_tx,
|
||||||
distribute: distribute_tx,
|
distribute: distribute_tx,
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
|
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::{io::AsyncWriteExt, task::JoinSet}; // allows locking across await
|
||||||
use tokio::task::JoinSet; // allows locking across await
|
|
||||||
|
|
||||||
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt};
|
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt};
|
||||||
use moq_transport_quinn::SendObjects;
|
use moq_transport_quinn::SendObjects;
|
||||||
|
@ -165,7 +164,7 @@ impl Session {
|
||||||
send_order: segment.send_order,
|
send_order: segment.send_order,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut stream = objects.send(object).await?;
|
let mut stream = objects.open(object).await?;
|
||||||
|
|
||||||
// Write each fragment as they are available.
|
// Write each fragment as they are available.
|
||||||
while let Some(fragment) = segment.fragments.next().await {
|
while let Some(fragment) = segment.fragments.next().await {
|
||||||
|
|
|
@ -3,8 +3,6 @@ pub mod broker;
|
||||||
mod contribute;
|
mod contribute;
|
||||||
mod control;
|
mod control;
|
||||||
mod distribute;
|
mod distribute;
|
||||||
mod server;
|
|
||||||
mod session;
|
mod session;
|
||||||
|
|
||||||
pub use server::*;
|
|
||||||
pub use session::*;
|
pub use session::*;
|
||||||
|
|
|
@ -1,10 +1,5 @@
|
||||||
use anyhow::Context;
|
|
||||||
|
|
||||||
use super::{broker, contribute, control, distribute};
|
use super::{broker, contribute, control, distribute};
|
||||||
|
|
||||||
use moq_transport::{Role, SetupServer, Version};
|
|
||||||
use moq_transport_quinn::Connect;
|
|
||||||
|
|
||||||
pub struct Session {
|
pub struct Session {
|
||||||
// Split logic into contribution/distribution to reduce the problem space.
|
// Split logic into contribution/distribution to reduce the problem space.
|
||||||
contribute: contribute::Session,
|
contribute: contribute::Session,
|
||||||
|
@ -15,50 +10,17 @@ pub struct Session {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Session {
|
impl Session {
|
||||||
pub async fn accept(session: Connect, broker: broker::Broadcasts) -> anyhow::Result<Session> {
|
pub fn new(session: moq_transport_quinn::Session, broker: broker::Broadcasts) -> Session {
|
||||||
// Accep the WebTransport session.
|
let (control, contribute, distribute) = control::split(session.send_control, session.recv_control);
|
||||||
// OPTIONAL validate the conn.uri() otherwise call conn.reject()
|
|
||||||
let session = session
|
|
||||||
.accept()
|
|
||||||
.await
|
|
||||||
.context(": server::Setupfailed to accept WebTransport session")?;
|
|
||||||
|
|
||||||
session
|
let contribute = contribute::Session::new(session.recv_objects, contribute, broker.clone());
|
||||||
.setup()
|
let distribute = distribute::Session::new(session.send_objects, distribute, broker);
|
||||||
.versions
|
|
||||||
.iter()
|
|
||||||
.find(|v| **v == Version::DRAFT_00)
|
|
||||||
.context("failed to find supported version")?;
|
|
||||||
|
|
||||||
// Choose our role based on the client's role.
|
Self {
|
||||||
let role = match session.setup().role {
|
|
||||||
Role::Publisher => Role::Subscriber,
|
|
||||||
Role::Subscriber => Role::Publisher,
|
|
||||||
Role::Both => Role::Both,
|
|
||||||
};
|
|
||||||
|
|
||||||
let setup = SetupServer {
|
|
||||||
version: Version::DRAFT_00,
|
|
||||||
role,
|
|
||||||
};
|
|
||||||
|
|
||||||
let session = session.accept(setup).await?;
|
|
||||||
|
|
||||||
let (control, objects) = session.split();
|
|
||||||
let (objects_send, objects_recv) = objects.split();
|
|
||||||
|
|
||||||
let (control, contribute, distribute) = control::split(control);
|
|
||||||
|
|
||||||
let contribute = contribute::Session::new(objects_recv, contribute, broker.clone());
|
|
||||||
let distribute = distribute::Session::new(objects_send, distribute, broker);
|
|
||||||
|
|
||||||
let session = Self {
|
|
||||||
control,
|
control,
|
||||||
contribute,
|
contribute,
|
||||||
distribute,
|
distribute,
|
||||||
};
|
}
|
||||||
|
|
||||||
Ok(session)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(self) -> anyhow::Result<()> {
|
pub async fn run(self) -> anyhow::Result<()> {
|
||||||
|
|
Loading…
Reference in New Issue