Compare commits
24 Commits
main
...
generic-tr
Author | SHA1 | Date |
---|---|---|
Luke Curley | f9c3f8b898 | |
François Michel | a06d273e69 | |
François Michel | 2427ea09af | |
François Michel | 912c438cc9 | |
François Michel | 6212dc5e2f | |
François Michel | 6ba26c8a16 | |
François Michel | 05da131c9a | |
François Michel | 4b9a31fa89 | |
François Michel | 8e763e910a | |
François Michel | 880079e142 | |
François Michel | 94b8b55ea6 | |
François Michel | c360ea1416 | |
François Michel | 6ca7ab5124 | |
François Michel | f4353892ef | |
François Michel | 72d11851e1 | |
François Michel | 9f690f7f00 | |
François Michel | 7545027775 | |
François Michel | 17b96ff51b | |
François Michel | 31bd538481 | |
François Michel | 6657464462 | |
François Michel | c50da7ed37 | |
François Michel | f33251d69b | |
François Michel | 78f68a6fa0 | |
François Michel | 9d48201504 |
|
@ -66,6 +66,124 @@ version = "1.0.71"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8"
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"event-listener",
|
||||
"futures-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-executor"
|
||||
version = "1.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6fa3dc5f2a8564f07759c008b9109dc0d39de92a88d5588b8a5036d286383afb"
|
||||
dependencies = [
|
||||
"async-lock",
|
||||
"async-task",
|
||||
"concurrent-queue",
|
||||
"fastrand",
|
||||
"futures-lite",
|
||||
"slab",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-global-executor"
|
||||
version = "2.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776"
|
||||
dependencies = [
|
||||
"async-channel",
|
||||
"async-executor",
|
||||
"async-io",
|
||||
"async-lock",
|
||||
"blocking",
|
||||
"futures-lite",
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-io"
|
||||
version = "1.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af"
|
||||
dependencies = [
|
||||
"async-lock",
|
||||
"autocfg",
|
||||
"cfg-if",
|
||||
"concurrent-queue",
|
||||
"futures-lite",
|
||||
"log",
|
||||
"parking",
|
||||
"polling",
|
||||
"rustix",
|
||||
"slab",
|
||||
"socket2 0.4.9",
|
||||
"waker-fn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-lock"
|
||||
version = "2.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7"
|
||||
dependencies = [
|
||||
"event-listener",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-std"
|
||||
version = "1.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d"
|
||||
dependencies = [
|
||||
"async-channel",
|
||||
"async-global-executor",
|
||||
"async-io",
|
||||
"async-lock",
|
||||
"crossbeam-utils",
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-lite",
|
||||
"gloo-timers",
|
||||
"kv-log-macro",
|
||||
"log",
|
||||
"memchr",
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"pin-utils",
|
||||
"slab",
|
||||
"wasm-bindgen-futures",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-task"
|
||||
version = "4.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae"
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.69"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b2d0f03b3640e3a630367e40c468cb7f309529c708ed1d88597047b0e7c6ef7"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-waker"
|
||||
version = "1.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3"
|
||||
|
||||
[[package]]
|
||||
name = "atty"
|
||||
version = "0.2.14"
|
||||
|
@ -110,6 +228,21 @@ dependencies = [
|
|||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "blocking"
|
||||
version = "1.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65"
|
||||
dependencies = [
|
||||
"async-channel",
|
||||
"async-lock",
|
||||
"async-task",
|
||||
"atomic-waker",
|
||||
"fastrand",
|
||||
"futures-lite",
|
||||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bumpalo"
|
||||
version = "3.13.0"
|
||||
|
@ -188,6 +321,15 @@ version = "1.0.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "core-foundation"
|
||||
version = "0.9.3"
|
||||
|
@ -213,6 +355,15 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crypto-common"
|
||||
version = "0.1.6"
|
||||
|
@ -276,6 +427,12 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener"
|
||||
version = "2.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
|
||||
|
||||
[[package]]
|
||||
name = "fastrand"
|
||||
version = "1.9.0"
|
||||
|
@ -348,6 +505,21 @@ version = "0.3.28"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
|
||||
|
||||
[[package]]
|
||||
name = "futures-lite"
|
||||
version = "1.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
|
||||
dependencies = [
|
||||
"fastrand",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"memchr",
|
||||
"parking",
|
||||
"pin-project-lite",
|
||||
"waker-fn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-macro"
|
||||
version = "0.3.28"
|
||||
|
@ -410,6 +582,18 @@ dependencies = [
|
|||
"wasi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gloo-timers"
|
||||
version = "0.2.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-core",
|
||||
"js-sys",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.3.19"
|
||||
|
@ -669,6 +853,15 @@ dependencies = [
|
|||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kv-log-macro"
|
||||
version = "1.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
|
||||
dependencies = [
|
||||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.146"
|
||||
|
@ -696,6 +889,9 @@ name = "log"
|
|||
version = "0.4.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
|
||||
dependencies = [
|
||||
"value-bag",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
|
@ -735,6 +931,7 @@ name = "moq-demo"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
"clap",
|
||||
"env_logger",
|
||||
"hex",
|
||||
|
@ -747,15 +944,21 @@ dependencies = [
|
|||
"rustls-pemfile",
|
||||
"tokio",
|
||||
"warp",
|
||||
"webtransport-generic 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"webtransport-quinn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "moq-transport"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
"futures",
|
||||
"log",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"webtransport-generic 0.2.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -780,15 +983,15 @@ name = "moq-warp"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
"log",
|
||||
"moq-transport",
|
||||
"moq-transport-quinn",
|
||||
"mp4",
|
||||
"quinn",
|
||||
"ring",
|
||||
"rustls 0.21.2",
|
||||
"rustls-pemfile",
|
||||
"tokio",
|
||||
"webtransport-generic 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -888,6 +1091,12 @@ version = "0.1.5"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "parking"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e"
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.1"
|
||||
|
@ -949,6 +1158,22 @@ version = "0.1.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "polling"
|
||||
version = "2.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
"concurrent-queue",
|
||||
"libc",
|
||||
"log",
|
||||
"pin-project-lite",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.17"
|
||||
|
@ -1606,12 +1831,24 @@ version = "0.2.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
|
||||
|
||||
[[package]]
|
||||
name = "value-bag"
|
||||
version = "1.4.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3"
|
||||
|
||||
[[package]]
|
||||
name = "version_check"
|
||||
version = "0.9.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
|
||||
|
||||
[[package]]
|
||||
name = "waker-fn"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
|
||||
|
||||
[[package]]
|
||||
name = "want"
|
||||
version = "0.3.1"
|
||||
|
@ -1684,6 +1921,18 @@ dependencies = [
|
|||
"wasm-bindgen-shared",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-futures"
|
||||
version = "0.4.37"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"js-sys",
|
||||
"wasm-bindgen",
|
||||
"web-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasm-bindgen-macro"
|
||||
version = "0.2.87"
|
||||
|
@ -1733,6 +1982,56 @@ dependencies = [
|
|||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webtransport-generic"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"log",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webtransport-generic"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "db80267637ca8d24cd3425a3e993842ed9e128620f5fcd9603145fee8fe808fd"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"log",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webtransport-proto"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "24a891faa138075b2338bc3da6dc6a67c174118a088e2809db6882d410973cd6"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"http",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webtransport-quinn"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ea529a3044e99379b180581e8b0773fa7feb1fc53e341f36927a755ac05abd62"
|
||||
dependencies = [
|
||||
"async-std",
|
||||
"bytes",
|
||||
"futures",
|
||||
"http",
|
||||
"quinn",
|
||||
"quinn-proto",
|
||||
"thiserror",
|
||||
"tokio-util",
|
||||
"webtransport-generic 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"webtransport-proto",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winapi"
|
||||
version = "0.3.9"
|
||||
|
|
|
@ -1,7 +1,2 @@
|
|||
[workspace]
|
||||
members = [
|
||||
"moq-transport",
|
||||
"moq-transport-quinn",
|
||||
"moq-demo",
|
||||
"moq-warp",
|
||||
]
|
||||
members = ["moq-transport", "moq-transport-quinn", "moq-demo", "moq-warp"]
|
||||
|
|
|
@ -11,12 +11,9 @@ edition = "2021"
|
|||
keywords = ["quic", "http3", "webtransport", "media", "live"]
|
||||
categories = ["multimedia", "network-programming", "web-programming"]
|
||||
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
moq-transport = { path = "../moq-transport" }
|
||||
moq-warp = { path = "../moq-warp" }
|
||||
|
||||
# QUIC
|
||||
quinn = "0.10"
|
||||
|
@ -38,3 +35,11 @@ clap = { version = "4.0", features = [ "derive" ] }
|
|||
log = { version = "0.4", features = ["std"] }
|
||||
env_logger = "0.9.3"
|
||||
anyhow = "1.0.70"
|
||||
|
||||
bytes = "1"
|
||||
|
||||
webtransport-generic = "0.2"
|
||||
webtransport-quinn = "0.3"
|
||||
|
||||
moq-transport = { path = "../moq-transport" }
|
||||
moq-warp = { path = "../moq-warp" }
|
||||
|
|
|
@ -1,11 +1,18 @@
|
|||
use std::{fs, io, net, path, sync};
|
||||
use std::{fs, io, net, path};
|
||||
|
||||
use anyhow::Context;
|
||||
use clap::Parser;
|
||||
use moq_transport::{Role, SetupServer, Version};
|
||||
use ring::digest::{digest, SHA256};
|
||||
use tokio::task::JoinSet;
|
||||
use warp::Filter;
|
||||
|
||||
use moq_warp::{relay, source};
|
||||
use moq_warp::{
|
||||
relay::{self, broker::Broadcasts},
|
||||
source,
|
||||
};
|
||||
|
||||
mod server;
|
||||
|
||||
/// Search for a pattern in a file and display the lines that contain it.
|
||||
#[derive(Parser, Clone)]
|
||||
|
@ -49,19 +56,55 @@ async fn main() -> anyhow::Result<()> {
|
|||
addr: args.addr,
|
||||
cert: args.cert,
|
||||
key: args.key,
|
||||
broker,
|
||||
broker: broker.clone(),
|
||||
};
|
||||
|
||||
let server = relay::Server::new(config).context("failed to create server")?;
|
||||
|
||||
// Run all of the above
|
||||
tokio::select! {
|
||||
res = server.run() => res.context("failed to run server"),
|
||||
res = run_server(config, broker) => res.context("failed to run server"),
|
||||
res = media.run() => res.context("failed to run media source"),
|
||||
res = serve => res.context("failed to run HTTP server"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_server(config: relay::ServerConfig, broker: Broadcasts) -> anyhow::Result<()> {
|
||||
let quinn = server::Server::new(config).unwrap();
|
||||
|
||||
let mut tasks = JoinSet::new();
|
||||
loop {
|
||||
let broker = broker.clone();
|
||||
tokio::select! {
|
||||
connect = server::Server::accept_new_webtransport_session(&quinn) => {
|
||||
tasks.spawn(async move {
|
||||
let client_setup = connect?.accept().await?;
|
||||
// TODO: maybe reject setup
|
||||
let role = match client_setup.setup().role {
|
||||
Role::Publisher => Role::Subscriber,
|
||||
Role::Subscriber => Role::Publisher,
|
||||
Role::Both => Role::Both,
|
||||
};
|
||||
let setup_server = SetupServer {
|
||||
version: Version::DRAFT_00,
|
||||
role,
|
||||
};
|
||||
|
||||
let session = client_setup.accept(setup_server).await?;
|
||||
let session = relay::Session::from_transport_session(session, broker.clone()).await?;
|
||||
session.run().await?;
|
||||
let ret: anyhow::Result<()> = Ok(());
|
||||
ret
|
||||
});
|
||||
}
|
||||
res = tasks.join_next(), if !tasks.is_empty() => {
|
||||
let res = res.expect("no tasks").expect("task aborted");
|
||||
|
||||
if let Err(err) = res {
|
||||
log::error!("session terminated: {:?}", err);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run a HTTP server using Warp
|
||||
// TODO remove this when Chrome adds support for self-signed certificates using WebTransport
|
||||
async fn serve_http(args: Cli) -> anyhow::Result<()> {
|
||||
|
@ -76,7 +119,7 @@ async fn serve_http(args: Cli) -> anyhow::Result<()> {
|
|||
// Compute the SHA-256 digest
|
||||
let fingerprint = digest(&SHA256, cert.as_ref());
|
||||
let fingerprint = hex::encode(fingerprint.as_ref());
|
||||
let fingerprint = sync::Arc::new(fingerprint);
|
||||
let fingerprint = std::sync::Arc::new(fingerprint);
|
||||
|
||||
let cors = warp::cors().allow_any_origin();
|
||||
|
||||
|
|
|
@ -0,0 +1,209 @@
|
|||
use std::{fs, io, time};
|
||||
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use h3::quic::BidiStream;
|
||||
use h3_webtransport::server::AcceptedBi;
|
||||
use moq_transport::AcceptSetup;
|
||||
use moq_warp::relay::ServerConfig;
|
||||
use tokio::task::JoinSet;
|
||||
use warp::{http, Future};
|
||||
|
||||
pub struct Server {
|
||||
// The MoQ transport server.
|
||||
server: quinn::Endpoint,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
// Create a new server
|
||||
pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
|
||||
// Read the PEM certificate chain
|
||||
let certs = fs::File::open(config.cert).context("failed to open cert file")?;
|
||||
let mut certs = io::BufReader::new(certs);
|
||||
let certs = rustls_pemfile::certs(&mut certs)?
|
||||
.into_iter()
|
||||
.map(rustls::Certificate)
|
||||
.collect();
|
||||
|
||||
// Read the PEM private key
|
||||
let keys = fs::File::open(config.key).context("failed to open key file")?;
|
||||
let mut keys = io::BufReader::new(keys);
|
||||
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut keys)?;
|
||||
|
||||
anyhow::ensure!(keys.len() == 1, "expected a single key");
|
||||
let key = rustls::PrivateKey(keys.remove(0));
|
||||
|
||||
let mut tls_config = rustls::ServerConfig::builder()
|
||||
.with_safe_default_cipher_suites()
|
||||
.with_safe_default_kx_groups()
|
||||
.with_protocol_versions(&[&rustls::version::TLS13])
|
||||
.unwrap()
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(certs, key)?;
|
||||
|
||||
tls_config.max_early_data_size = u32::MAX;
|
||||
let alpn: Vec<Vec<u8>> = vec![webtransport_quinn::ALPN];
|
||||
tls_config.alpn_protocols = alpn;
|
||||
|
||||
let mut server_config = quinn::ServerConfig::with_crypto(std::sync::Arc::new(tls_config));
|
||||
|
||||
// Enable BBR congestion control
|
||||
// TODO validate the implementation
|
||||
let mut transport_config = quinn::TransportConfig::default();
|
||||
transport_config.keep_alive_interval(Some(time::Duration::from_secs(2)));
|
||||
transport_config.congestion_controller_factory(std::sync::Arc::new(quinn::congestion::BbrConfig::default()));
|
||||
|
||||
server_config.transport = std::sync::Arc::new(transport_config);
|
||||
let server = quinn::Endpoint::server(server_config, config.addr)?;
|
||||
|
||||
Ok(Self { server })
|
||||
}
|
||||
|
||||
pub async fn accept_new_webtransport_session(endpoint: &h3_quinn::Endpoint) -> anyhow::Result<Connect> {
|
||||
let mut handshake = JoinSet::new();
|
||||
// perform a quic handshake
|
||||
loop {
|
||||
tokio::select!(
|
||||
// Accept the connection and start the WebTransport handshake.
|
||||
conn = endpoint.accept() => {
|
||||
let conn = conn.context("failed to accept connection").unwrap();
|
||||
handshake.spawn(async move {
|
||||
|
||||
let conn = conn.await.context("failed to accept h3 connection")?;
|
||||
|
||||
let mut conn = h3::server::builder()
|
||||
.enable_webtransport(true)
|
||||
.enable_connect(true)
|
||||
.enable_datagram(true)
|
||||
.max_webtransport_sessions(1)
|
||||
.send_grease(true)
|
||||
.build(h3_quinn::Connection::new(conn))
|
||||
.await
|
||||
.context("failed to create h3 server")?;
|
||||
|
||||
let (req, stream) = conn
|
||||
.accept()
|
||||
.await
|
||||
.context("failed to accept h3 session")?
|
||||
.context("failed to accept h3 request")?;
|
||||
|
||||
let ext = req.extensions();
|
||||
anyhow::ensure!(req.method() == http::Method::CONNECT, "expected CONNECT request");
|
||||
anyhow::ensure!(
|
||||
ext.get::<h3::ext::Protocol>() == Some(&h3::ext::Protocol::WEB_TRANSPORT),
|
||||
"expected WebTransport CONNECT"
|
||||
);
|
||||
|
||||
// Let the application decide if we accept this CONNECT request.
|
||||
Ok(Connect { conn, req, stream })
|
||||
});
|
||||
},
|
||||
// Return any mostly finished WebTransport handshakes.
|
||||
res = handshake.join_next(), if !handshake.is_empty() => {
|
||||
let res = res.expect("no tasks").expect("task aborted");
|
||||
match res {
|
||||
Ok(connect_request) => return Ok(connect_request),
|
||||
Err(err) => log::warn!("failed to accept session: {:?}", err),
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The WebTransport CONNECT has arrived, and we need to decide if we accept it.
|
||||
pub struct Connect {
|
||||
// Inspect to decide whether to accept() or reject() the session.
|
||||
req: http::Request<()>,
|
||||
|
||||
conn: h3::server::Connection<h3_quinn::Connection, Bytes>,
|
||||
stream: h3::server::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
|
||||
}
|
||||
|
||||
impl Connect {
|
||||
// Accept the WebTransport session.
|
||||
pub async fn accept(self) -> anyhow::Result<AcceptSetup<Server>> {
|
||||
let session = h3_webtransport::server::WebTransportSession::accept(self.req, self.stream, self.conn).await?;
|
||||
let mut session = Server { server: session };
|
||||
|
||||
let (control_stream_send, control_stream_recv) = moq_transport::accept_bidi(&mut session)
|
||||
.await
|
||||
.context("failed to accept bidi stream")?
|
||||
.unwrap();
|
||||
|
||||
Ok(moq_transport::Session::accept(session).await?)
|
||||
}
|
||||
|
||||
// Reject the WebTransport session with a HTTP response.
|
||||
pub async fn reject(mut self, resp: http::Response<()>) -> anyhow::Result<()> {
|
||||
self.stream.send_response(resp).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl webtransport_generic::Connection for Server {
|
||||
type Error = anyhow::Error;
|
||||
type SendStream = QuinnSendStream;
|
||||
|
||||
type RecvStream = QuinnRecvStream;
|
||||
|
||||
fn poll_accept_uni(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<Option<Self::RecvStream>, Self::Error>> {
|
||||
let fut = self.server.accept_uni();
|
||||
let fut = std::pin::pin!(fut);
|
||||
fut.poll(cx)
|
||||
.map_ok(|opt| opt.map(|(_, s)| QuinnRecvStream::new(s)))
|
||||
.map_err(|e| anyhow::anyhow!("{:?}", e))
|
||||
}
|
||||
|
||||
fn poll_accept_bidi(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<Option<(Self::SendStream, Self::RecvStream)>, Self::Error>> {
|
||||
let fut = self.server.accept_bi();
|
||||
let fut = std::pin::pin!(fut);
|
||||
let res = std::task::ready!(fut.poll(cx).map_err(|e| anyhow::anyhow!("{:?}", e)));
|
||||
match res {
|
||||
Ok(Some(AcceptedBi::Request(_, _))) => {
|
||||
std::task::Poll::Ready(Err(anyhow::anyhow!("received new session whils accepting bidi stream")))
|
||||
}
|
||||
Ok(Some(AcceptedBi::BidiStream(_, s))) => {
|
||||
let (send, recv) = s.split();
|
||||
std::task::Poll::Ready(Ok(Some((QuinnSendStream::new(send), QuinnRecvStream::new(recv)))))
|
||||
}
|
||||
Ok(None) => std::task::Poll::Ready(Ok(None)),
|
||||
Err(e) => std::task::Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_open_bidi(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(Self::SendStream, Self::RecvStream), Self::Error>> {
|
||||
let fut = self.server.open_bi(self.server.session_id());
|
||||
let fut = std::pin::pin!(fut);
|
||||
fut.poll(cx)
|
||||
.map_ok(|s| {
|
||||
let (send, recv) = s.split();
|
||||
(QuinnSendStream::new(send), QuinnRecvStream::new(recv))
|
||||
})
|
||||
.map_err(|e| anyhow::anyhow!("{:?}", e))
|
||||
}
|
||||
|
||||
fn poll_open_uni(
|
||||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<Self::SendStream, Self::Error>> {
|
||||
let fut = self.server.open_uni(self.server.session_id());
|
||||
let fut = std::pin::pin!(fut);
|
||||
fut.poll(cx)
|
||||
.map_ok(|s| QuinnSendStream::new(s))
|
||||
.map_err(|e| anyhow::anyhow!("{:?}", e))
|
||||
}
|
||||
|
||||
fn close(&mut self, _code: u32, _reason: &[u8]) {
|
||||
todo!("not implemented")
|
||||
}
|
||||
}
|
|
@ -18,3 +18,7 @@ categories = [ "multimedia", "network-programming", "web-programming" ]
|
|||
bytes = "1"
|
||||
thiserror = "1.0.21"
|
||||
log = "0.4"
|
||||
tokio = { version = "1.27", features = ["full"] }
|
||||
anyhow = "1.0.70"
|
||||
webtransport-generic = { version = "0.2", path = "../../webtransport-rs/webtransport-generic" }
|
||||
futures = "0.3"
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
mod coding;
|
||||
mod control;
|
||||
mod object;
|
||||
mod network;
|
||||
|
||||
pub use coding::*;
|
||||
pub use control::*;
|
||||
pub use object::*;
|
||||
pub use network::*;
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
use crate::{Decode, DecodeError, Encode, Message};
|
||||
use webtransport_generic::{RecvStream, SendStream};
|
||||
|
||||
use bytes::{Buf, BytesMut};
|
||||
|
||||
use std::future;
|
||||
use std::io::Cursor;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use anyhow::Context;
|
||||
|
||||
pub struct SendControl<S: SendStream> {
|
||||
stream: S,
|
||||
buf: BytesMut, // reuse a buffer to encode messages.
|
||||
}
|
||||
|
||||
impl<S: SendStream> SendControl<S> {
|
||||
pub fn new(stream: S) -> Self {
|
||||
Self {
|
||||
buf: BytesMut::new(),
|
||||
stream,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
|
||||
let msg = msg.into();
|
||||
log::info!("sending message: {:?}", msg);
|
||||
|
||||
self.buf.clear();
|
||||
msg.encode(&mut self.buf)?;
|
||||
|
||||
// TODO make this work with select!
|
||||
self.stream
|
||||
.send(&mut self.buf)
|
||||
.await
|
||||
.context("error sending control message")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Helper that lets multiple threads send control messages.
|
||||
pub fn share(self) -> SharedControl<S> {
|
||||
SharedControl {
|
||||
stream: Arc::new(Mutex::new(self)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper that allows multiple threads to send control messages.
|
||||
// There's no equivalent for receiving since only one thread should be receiving at a time.
|
||||
#[derive(Clone)]
|
||||
pub struct SharedControl<S: SendStream> {
|
||||
stream: Arc<Mutex<SendControl<S>>>,
|
||||
}
|
||||
|
||||
impl<S: SendStream> SharedControl<S> {
|
||||
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
|
||||
let mut stream = self.stream.lock().await;
|
||||
stream.send(msg).await
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RecvControl<R: RecvStream> {
|
||||
stream: R,
|
||||
buf: BytesMut, // data we've read but haven't fully decoded yet
|
||||
}
|
||||
|
||||
impl<R: RecvStream> RecvControl<R> {
|
||||
pub fn new(stream: R) -> Self {
|
||||
Self {
|
||||
buf: BytesMut::new(),
|
||||
stream,
|
||||
}
|
||||
}
|
||||
|
||||
// Read the next full message from the stream.
|
||||
pub async fn recv(&mut self) -> anyhow::Result<Message> {
|
||||
loop {
|
||||
// Read the contents of the buffer
|
||||
let mut peek = Cursor::new(&self.buf);
|
||||
|
||||
match Message::decode(&mut peek) {
|
||||
Ok(msg) => {
|
||||
// We've successfully decoded a message, so we can advance the buffer.
|
||||
self.buf.advance(peek.position() as usize);
|
||||
|
||||
log::info!("received message: {:?}", msg);
|
||||
return Ok(msg);
|
||||
}
|
||||
Err(DecodeError::UnexpectedEnd) => {
|
||||
// The decode failed, so we need to append more data.
|
||||
future::poll_fn(|cx| self.stream.poll_recv(cx, &mut self.buf))
|
||||
.await
|
||||
.context("error receiving control message")?;
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
mod control;
|
||||
mod objects;
|
||||
mod server;
|
||||
|
||||
pub use control::*;
|
||||
pub use objects::*;
|
||||
pub use server::*;
|
|
@ -0,0 +1,103 @@
|
|||
use crate::{Decode, DecodeError, Encode, Object};
|
||||
use anyhow::Context;
|
||||
use bytes::{Buf, BytesMut};
|
||||
use futures::StreamExt;
|
||||
use futures::TryStreamExt;
|
||||
use std::{
|
||||
io::Cursor,
|
||||
pin::Pin,
|
||||
task::{ready, Poll},
|
||||
};
|
||||
use tokio::task::JoinSet;
|
||||
use webtransport_generic::{AsyncRecvStream, AsyncSendStream, AsyncSession};
|
||||
|
||||
// TODO support clients
|
||||
|
||||
pub struct SendObjects<S: AsyncSession> {
|
||||
session: S,
|
||||
|
||||
// A reusable buffer for encoding messages.
|
||||
buf: BytesMut,
|
||||
}
|
||||
|
||||
impl<S: AsyncSession> SendObjects<S> {
|
||||
pub fn new(session: S) -> Self {
|
||||
Self {
|
||||
session,
|
||||
buf: BytesMut::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send(&mut self, header: Object) -> anyhow::Result<S::SendStream> {
|
||||
self.buf.clear();
|
||||
header.encode(&mut self.buf).unwrap();
|
||||
|
||||
let mut stream = self.session.open_uni().await.context("failed to open uni stream")?;
|
||||
|
||||
stream
|
||||
.send(&mut self.buf)
|
||||
.await
|
||||
.context("failed to send data on stream")?;
|
||||
|
||||
Ok(stream)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RecvObjects<S: AsyncSession> {
|
||||
session: S,
|
||||
objects: JoinSet<anyhow::Result<(Object, S::RecvStream)>>,
|
||||
}
|
||||
|
||||
impl<S: AsyncSession + Clone> RecvObjects<S> {
|
||||
pub fn new(session: S) -> Self {
|
||||
let streams = futures::stream::unfold(session.clone(), |mut session| async move {
|
||||
match session.accept_uni().await {
|
||||
Ok(stream) => Some((stream, session)),
|
||||
Err(e) => None,
|
||||
}
|
||||
});
|
||||
|
||||
let objects = streams.map(|mut stream| async move {});
|
||||
|
||||
// Decode the object header for up to 16 streams in parallel.
|
||||
// Otherwise, a lost packet for the first chunk of a stream would block future streams.
|
||||
let objects = objects.buffer_unordered(16);
|
||||
|
||||
let objects = Box::pin(objects);
|
||||
|
||||
//try_buffer_unordered
|
||||
|
||||
let objects = JoinSet::new();
|
||||
|
||||
Self { session, objects }
|
||||
}
|
||||
|
||||
pub async fn next(&mut self) -> anyhow::Result<(Object, S::RecvStream)> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
res = self.objects.join_next(), if !self.objects.is_empty() => {
|
||||
return res.unwrap()?;
|
||||
},
|
||||
res = self.session.accept_uni() => {
|
||||
let stream = res?;
|
||||
self.objects.spawn(async move { Self::fetch(stream).await });
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch(mut stream: S::RecvStream) -> anyhow::Result<(Object, S::RecvStream)> {
|
||||
let mut buf = Vec::with_capacity(64);
|
||||
|
||||
loop {
|
||||
stream.recv(&mut buf).await?.context("no more data")?;
|
||||
let mut peek = Cursor::new(&buf);
|
||||
|
||||
match Object::decode(&mut peek) {
|
||||
Ok(header) => return Ok((header, stream)),
|
||||
Err(DecodeError::UnexpectedEnd) => continue,
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
use crate::{Message, SetupClient, SetupServer};
|
||||
use anyhow::Context;
|
||||
use webtransport_generic::Session as Generic;
|
||||
|
||||
use super::{RecvControl, RecvObjects, SendControl, SendObjects};
|
||||
|
||||
pub struct Session<S: Generic> {
|
||||
pub send_control: SendControl<S::SendStream>,
|
||||
pub recv_control: RecvControl<S::RecvStream>,
|
||||
pub send_objects: SendObjects<S>,
|
||||
pub recv_objects: RecvObjects<S>,
|
||||
}
|
||||
|
||||
impl<S: Generic + Send> Session<S> {
|
||||
pub async fn accept(
|
||||
session: S,
|
||||
) -> anyhow::Result<AcceptSetup<S>> {
|
||||
let send_objects = SendObjects::new(session.clone());
|
||||
let recv_objects = RecvObjects::new(session.clone());
|
||||
|
||||
let setup_client = match recv_control await.context("failed to read SETUP")? {
|
||||
Message::SetupClient(setup) => setup,
|
||||
_ => anyhow::bail!("expected CLIENT SETUP"),
|
||||
};
|
||||
|
||||
Ok(AcceptSetup {
|
||||
setup_client,
|
||||
control,
|
||||
objects,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn split(self) -> (Control<C>, Objects<C>) {
|
||||
(self.control, self.objects)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AcceptSetup<C: Generic + Send> {
|
||||
setup_client: SetupClient,
|
||||
control: Control<C>,
|
||||
objects: Objects<C>,
|
||||
}
|
||||
|
||||
impl<C: Generic + Send> AcceptSetup<C> {
|
||||
// Return the setup message we received.
|
||||
pub fn setup(&self) -> &SetupClient {
|
||||
&self.setup_client
|
||||
}
|
||||
|
||||
// Accept the session with our own setup message.
|
||||
pub async fn accept(mut self, setup_server: SetupServer) -> anyhow::Result<Session<C>> {
|
||||
self.control.send(setup_server).await?;
|
||||
Ok(Session {
|
||||
control: self.control,
|
||||
objects: self.objects,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn reject(self) -> anyhow::Result<()> {
|
||||
// TODO Close the QUIC connection with an error code.
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
|
@ -16,15 +16,15 @@ categories = [ "multimedia", "network-programming", "web-programming" ]
|
|||
|
||||
[dependencies]
|
||||
moq-transport = { path = "../moq-transport" }
|
||||
moq-transport-quinn = { path = "../moq-transport-quinn" }
|
||||
webtransport-generic = "0.2"
|
||||
|
||||
bytes = "1"
|
||||
tokio = "1.27"
|
||||
mp4 = "0.13.0"
|
||||
anyhow = "1.0.70"
|
||||
log = "0.4" # TODO remove
|
||||
|
||||
# QUIC stuff
|
||||
quinn = "0.10"
|
||||
ring = "0.16.20"
|
||||
rustls = "0.21.2"
|
||||
rustls-pemfile = "1.0.2"
|
||||
|
|
|
@ -2,12 +2,13 @@ use std::collections::HashMap;
|
|||
use std::sync::{Arc, Mutex};
|
||||
use std::time;
|
||||
|
||||
use tokio::io::AsyncReadExt;
|
||||
use bytes::Buf;
|
||||
use webtransport_generic::{Connection, RecvStream};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::JoinSet; // lock across await boundaries
|
||||
|
||||
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt};
|
||||
use moq_transport_quinn::{RecvObjects, RecvStream};
|
||||
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt, RecvObjects};
|
||||
|
||||
|
||||
use anyhow::Context;
|
||||
|
||||
|
@ -16,9 +17,9 @@ use crate::model::{broadcast, segment, track};
|
|||
use crate::source::Source;
|
||||
|
||||
// TODO experiment with making this Clone, so every task can have its own copy.
|
||||
pub struct Session {
|
||||
pub struct Session<C: Connection + Send> {
|
||||
// Used to receive objects.
|
||||
objects: RecvObjects,
|
||||
objects: RecvObjects<C>,
|
||||
|
||||
// Used to send and receive control messages.
|
||||
control: control::Component<control::Contribute>,
|
||||
|
@ -36,9 +37,9 @@ pub struct Session {
|
|||
run_segments: JoinSet<anyhow::Result<()>>, // receiving objects
|
||||
}
|
||||
|
||||
impl Session {
|
||||
impl<R: RecvStream + Send + 'static, C: Connection<RecvStream = R> + Send> Session<C> {
|
||||
pub fn new(
|
||||
objects: RecvObjects,
|
||||
objects: RecvObjects<C>,
|
||||
control: control::Component<control::Contribute>,
|
||||
broker: broker::Broadcasts,
|
||||
) -> Self {
|
||||
|
@ -63,7 +64,7 @@ impl Session {
|
|||
},
|
||||
object = self.objects.recv() => {
|
||||
let (object, stream) = object.context("failed to receive object")?;
|
||||
let res = self.receive_object(object, stream).await;
|
||||
let res = self.receive_object(object, Box::new(stream)).await;
|
||||
if let Err(err) = res {
|
||||
log::error!("failed to receive object: {:?}", err);
|
||||
}
|
||||
|
@ -88,7 +89,7 @@ impl Session {
|
|||
}
|
||||
}
|
||||
|
||||
async fn receive_object(&mut self, object: Object, stream: RecvStream) -> anyhow::Result<()> {
|
||||
async fn receive_object(&mut self, object: Object, stream: Box<R>) -> anyhow::Result<()> {
|
||||
let track = object.track;
|
||||
|
||||
let segment = segment::Info {
|
||||
|
@ -111,16 +112,18 @@ impl Session {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> {
|
||||
let mut buf = [0u8; 32 * 1024];
|
||||
async fn run_segment(mut segment: segment::Publisher, mut stream: Box<R>) -> anyhow::Result<()> {
|
||||
loop {
|
||||
let size = stream.read(&mut buf).await.context("failed to read from stream")?;
|
||||
if size == 0 {
|
||||
let mut b = bytes::BytesMut::new();
|
||||
let stream_finished = !moq_transport::recv(stream.as_mut(), &mut b)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{:?}", e.into()))
|
||||
.context("error receiving control message")?;
|
||||
if stream_finished {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let chunk = buf[..size].to_vec();
|
||||
segment.fragments.push(chunk.into())
|
||||
segment.fragments.push(b.chunk().to_vec().into())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -174,7 +177,7 @@ impl Session {
|
|||
}
|
||||
}
|
||||
|
||||
impl Drop for Session {
|
||||
impl<C: Connection + Send> Drop for Session<C> {
|
||||
fn drop(&mut self) {
|
||||
// Unannounce all broadcasts we have announced.
|
||||
// TODO make this automatic so we can't screw up?
|
||||
|
|
|
@ -1,17 +1,17 @@
|
|||
use webtransport_generic::{SendStream, RecvStream};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk};
|
||||
use moq_transport_quinn::Control;
|
||||
use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk, Control};
|
||||
|
||||
pub struct Main {
|
||||
control: Control,
|
||||
pub struct Main<S: SendStream, R: RecvStream> {
|
||||
control: Control<S, R>,
|
||||
outgoing: mpsc::Receiver<Message>,
|
||||
|
||||
contribute: mpsc::Sender<Contribute>,
|
||||
distribute: mpsc::Sender<Distribute>,
|
||||
}
|
||||
|
||||
impl Main {
|
||||
impl<S: SendStream, R: RecvStream> Main <S, R> {
|
||||
pub async fn run(mut self) -> anyhow::Result<()> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
|
@ -51,7 +51,7 @@ impl<T> Component<T> {
|
|||
}
|
||||
|
||||
// Splits a control stream into two components, based on if it's a message for contribution or distribution.
|
||||
pub fn split(control: Control) -> (Main, Component<Contribute>, Component<Distribute>) {
|
||||
pub fn split<S: SendStream, R: RecvStream>(control: Control<S, R>) -> (Main<S, R>, Component<Contribute>, Component<Distribute>) {
|
||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(1);
|
||||
let (contribute_tx, contribute_rx) = mpsc::channel(1);
|
||||
let (distribute_tx, distribute_rx) = mpsc::channel(1);
|
||||
|
|
|
@ -1,17 +1,17 @@
|
|||
use anyhow::Context;
|
||||
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use bytes::Buf;
|
||||
use webtransport_generic::{SendStream, Connection};
|
||||
use tokio::task::JoinSet; // allows locking across await
|
||||
|
||||
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt};
|
||||
use moq_transport_quinn::SendObjects;
|
||||
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt, SendObjects};
|
||||
|
||||
use super::{broker, control};
|
||||
use crate::model::{segment, track};
|
||||
|
||||
pub struct Session {
|
||||
pub struct Session<C: Connection + Send> {
|
||||
// Objects are sent to the client
|
||||
objects: SendObjects,
|
||||
objects: SendObjects<C>,
|
||||
|
||||
// Used to send and receive control messages.
|
||||
control: control::Component<control::Distribute>,
|
||||
|
@ -23,9 +23,11 @@ pub struct Session {
|
|||
run_subscribes: JoinSet<SubscribeError>, // run subscriptions, sending the returned error if they fail
|
||||
}
|
||||
|
||||
impl Session {
|
||||
impl<S, C> Session<C> where
|
||||
S: SendStream + Send,
|
||||
C: Connection<SendStream = S> + Send + 'static {
|
||||
pub fn new(
|
||||
objects: SendObjects,
|
||||
objects: SendObjects<C>,
|
||||
control: control::Component<control::Distribute>,
|
||||
broker: broker::Broadcasts,
|
||||
) -> Self {
|
||||
|
@ -119,7 +121,7 @@ impl Session {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_subscribe(objects: SendObjects, track_id: VarInt, mut track: track::Subscriber) -> SubscribeError {
|
||||
async fn run_subscribe(objects: SendObjects<C>, track_id: VarInt, mut track: track::Subscriber) -> SubscribeError {
|
||||
let mut tasks = JoinSet::new();
|
||||
let mut result = None;
|
||||
|
||||
|
@ -154,7 +156,7 @@ impl Session {
|
|||
}
|
||||
|
||||
async fn serve_group(
|
||||
mut objects: SendObjects,
|
||||
mut objects: SendObjects<C>,
|
||||
track_id: VarInt,
|
||||
mut segment: segment::Subscriber,
|
||||
) -> anyhow::Result<()> {
|
||||
|
@ -169,11 +171,16 @@ impl Session {
|
|||
|
||||
// Write each fragment as they are available.
|
||||
while let Some(fragment) = segment.fragments.next().await {
|
||||
stream.write_all(fragment.as_slice()).await?;
|
||||
let mut buf = bytes::Bytes::copy_from_slice(fragment.as_slice());
|
||||
while buf.has_remaining() {
|
||||
moq_transport::send(&mut stream, &mut buf)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("{:?}", e.into()))
|
||||
.context("error sending control message")?;
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: stream is automatically closed when dropped
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
@ -1,21 +1,6 @@
|
|||
use super::{broker, Session};
|
||||
use super::broker;
|
||||
|
||||
use std::{fs, io, net, path, sync, time};
|
||||
|
||||
use anyhow::Context;
|
||||
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
pub struct Server {
|
||||
// The MoQ transport server.
|
||||
server: moq_transport_quinn::Server,
|
||||
|
||||
// The media sources.
|
||||
broker: broker::Broadcasts,
|
||||
|
||||
// Sessions actively being run.
|
||||
tasks: JoinSet<anyhow::Result<()>>,
|
||||
}
|
||||
use std::{net, path};
|
||||
|
||||
pub struct ServerConfig {
|
||||
pub addr: net::SocketAddr,
|
||||
|
@ -24,82 +9,3 @@ pub struct ServerConfig {
|
|||
|
||||
pub broker: broker::Broadcasts,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
// Create a new server
|
||||
pub fn new(config: ServerConfig) -> anyhow::Result<Self> {
|
||||
// Read the PEM certificate chain
|
||||
let certs = fs::File::open(config.cert).context("failed to open cert file")?;
|
||||
let mut certs = io::BufReader::new(certs);
|
||||
let certs = rustls_pemfile::certs(&mut certs)?
|
||||
.into_iter()
|
||||
.map(rustls::Certificate)
|
||||
.collect();
|
||||
|
||||
// Read the PEM private key
|
||||
let keys = fs::File::open(config.key).context("failed to open key file")?;
|
||||
let mut keys = io::BufReader::new(keys);
|
||||
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut keys)?;
|
||||
|
||||
anyhow::ensure!(keys.len() == 1, "expected a single key");
|
||||
let key = rustls::PrivateKey(keys.remove(0));
|
||||
|
||||
let mut tls_config = rustls::ServerConfig::builder()
|
||||
.with_safe_default_cipher_suites()
|
||||
.with_safe_default_kx_groups()
|
||||
.with_protocol_versions(&[&rustls::version::TLS13])
|
||||
.unwrap()
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(certs, key)?;
|
||||
|
||||
tls_config.max_early_data_size = u32::MAX;
|
||||
let alpn: Vec<Vec<u8>> = vec![
|
||||
b"h3".to_vec(),
|
||||
b"h3-32".to_vec(),
|
||||
b"h3-31".to_vec(),
|
||||
b"h3-30".to_vec(),
|
||||
b"h3-29".to_vec(),
|
||||
];
|
||||
tls_config.alpn_protocols = alpn;
|
||||
|
||||
let mut server_config = quinn::ServerConfig::with_crypto(sync::Arc::new(tls_config));
|
||||
|
||||
// Enable BBR congestion control
|
||||
// TODO validate the implementation
|
||||
let mut transport_config = quinn::TransportConfig::default();
|
||||
transport_config.keep_alive_interval(Some(time::Duration::from_secs(2)));
|
||||
transport_config.congestion_controller_factory(sync::Arc::new(quinn::congestion::BbrConfig::default()));
|
||||
|
||||
server_config.transport = sync::Arc::new(transport_config);
|
||||
let server = quinn::Endpoint::server(server_config, config.addr)?;
|
||||
let broker = config.broker;
|
||||
|
||||
let server = moq_transport_quinn::Server::new(server);
|
||||
let tasks = JoinSet::new();
|
||||
|
||||
Ok(Self { server, broker, tasks })
|
||||
}
|
||||
|
||||
pub async fn run(mut self) -> anyhow::Result<()> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
res = self.server.accept() => {
|
||||
let session = res.context("failed to accept connection")?;
|
||||
let broker = self.broker.clone();
|
||||
|
||||
self.tasks.spawn(async move {
|
||||
let session: Session = Session::accept(session, broker).await?;
|
||||
session.run().await
|
||||
});
|
||||
},
|
||||
res = self.tasks.join_next(), if !self.tasks.is_empty() => {
|
||||
let res = res.expect("no tasks").expect("task aborted");
|
||||
|
||||
if let Err(err) = res {
|
||||
log::error!("session terminated: {:?}", err);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,49 +1,26 @@
|
|||
use anyhow::Context;
|
||||
use webtransport_generic::{SendStream, Connection, RecvStream};
|
||||
|
||||
use super::{broker, contribute, control, distribute};
|
||||
|
||||
use moq_transport::{Role, SetupServer, Version};
|
||||
use moq_transport_quinn::Connect;
|
||||
|
||||
pub struct Session {
|
||||
pub struct Session<C: Connection + Send> {
|
||||
// Split logic into contribution/distribution to reduce the problem space.
|
||||
contribute: contribute::Session,
|
||||
distribute: distribute::Session,
|
||||
contribute: contribute::Session<C>,
|
||||
distribute: distribute::Session<C>,
|
||||
|
||||
// Used to receive control messages and forward to contribute/distribute.
|
||||
control: control::Main,
|
||||
control: control::Main<C::SendStream, C::RecvStream>,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
pub async fn accept(session: Connect, broker: broker::Broadcasts) -> anyhow::Result<Session> {
|
||||
// Accep the WebTransport session.
|
||||
// OPTIONAL validate the conn.uri() otherwise call conn.reject()
|
||||
let session = session
|
||||
.accept()
|
||||
.await
|
||||
.context(": server::Setupfailed to accept WebTransport session")?;
|
||||
|
||||
session
|
||||
.setup()
|
||||
.versions
|
||||
.iter()
|
||||
.find(|v| **v == Version::DRAFT_00)
|
||||
.context("failed to find supported version")?;
|
||||
|
||||
// Choose our role based on the client's role.
|
||||
let role = match session.setup().role {
|
||||
Role::Publisher => Role::Subscriber,
|
||||
Role::Subscriber => Role::Publisher,
|
||||
Role::Both => Role::Both,
|
||||
};
|
||||
|
||||
let setup = SetupServer {
|
||||
version: Version::DRAFT_00,
|
||||
role,
|
||||
};
|
||||
|
||||
let session = session.accept(setup).await?;
|
||||
|
||||
impl<R, S, C> Session<C> where
|
||||
R: RecvStream + Send + 'static,
|
||||
S: SendStream + Send,
|
||||
C: Connection<RecvStream = R, SendStream = S> + Send + 'static
|
||||
{
|
||||
pub async fn from_transport_session(
|
||||
session: moq_transport::Session<C>,
|
||||
broker: broker::Broadcasts,
|
||||
) -> anyhow::Result<Session<C>> {
|
||||
let (control, objects) = session.split();
|
||||
let (objects_send, objects_recv) = objects.split();
|
||||
|
||||
|
|
Loading…
Reference in New Issue