diff --git a/Cargo.toml b/Cargo.toml index ff2de30..2ce6f6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,8 +2,9 @@ members = [ "moq-transport", "moq-transport-quinn", - "moq-demo", + "moq-demo-quiche", + "moq-demo-quinn", "moq-warp", "transport", - "moq-transport-trait", + "moq-transport-generic", ] diff --git a/moq-demo/Cargo.toml b/moq-demo-quiche/Cargo.toml similarity index 91% rename from moq-demo/Cargo.toml rename to moq-demo-quiche/Cargo.toml index 023672b..f6dfdfc 100644 --- a/moq-demo/Cargo.toml +++ b/moq-demo-quiche/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "moq-demo" +name = "moq-demo-quiche" description = "Media over QUIC" authors = [ "Luke Curley" ] repository = "https://github.com/kixelated/moq-rs" @@ -39,7 +39,7 @@ log = { version = "0.4", features = ["std"] } env_logger = "0.9.3" anyhow = "1.0.70" -moq-transport-trait = { path = "../moq-transport-trait" } +moq-transport-generic = { path = "../moq-transport-generic" } # moq-generic-transport = { path = "../transport" } moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"} webtransport_quiche = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" } diff --git a/moq-demo/src/main.rs b/moq-demo-quiche/src/main.rs similarity index 59% rename from moq-demo/src/main.rs rename to moq-demo-quiche/src/main.rs index a10da91..9306429 100644 --- a/moq-demo/src/main.rs +++ b/moq-demo-quiche/src/main.rs @@ -120,92 +120,67 @@ async fn main() -> anyhow::Result<()> { broker: broker.clone(), }; - if args.quiche { - let (server, socket, regexes) = new_quiche(config).unwrap(); - let server = Arc::new(std::sync::Mutex::new(server)); - let socket = Arc::new(socket); - let mut buf = vec![0; 10000]; - let mut tasks = JoinSet::new(); - 'mainloop: loop { - println!("listen..."); - let cid = { - // let mut server = endpoint.quiche_server.lock().await; - let ret = async_webtransport_handler::AsyncWebTransportServer::listen_ref(server.clone(), socket.clone(), &mut buf).await?; - println!("listen returned {:?}", ret); - match ret { - Some(cid) => cid, - None => continue 'mainloop, - } - }; - - loop { - println!("poll"); - match server.lock().unwrap().poll(&cid, ®exes[..]) { - Ok(async_webtransport_handler::Event::NewSession(path, session_id, _regex_index)) => { + let (server, socket, regexes) = new_quiche(config).unwrap(); + let server = Arc::new(std::sync::Mutex::new(server)); + let socket = Arc::new(socket); + let mut buf = vec![0; 10000]; + let mut tasks = JoinSet::new(); + 'mainloop: loop { + println!("listen..."); + let cid = { + // let mut server = endpoint.quiche_server.lock().await; + let ret = async_webtransport_handler::AsyncWebTransportServer::listen_ref(server.clone(), socket.clone(), &mut buf).await?; + println!("listen returned {:?}", ret); + match ret { + Some(cid) => cid, + None => continue 'mainloop, + } + }; + + loop { + println!("poll"); + match server.lock().unwrap().poll(&cid, ®exes[..]) { + Ok(async_webtransport_handler::Event::NewSession(path, session_id, _regex_index)) => { - let server = server.clone(); - let cid = cid.clone(); - let broker = broker.clone(); - tasks.spawn(async move { - // let control_stream = async_webtransport_handler::ServerBidiStream::new(server.clone(), cid.clone(), session_id, session_id); - let mut webtransport_session = async_webtransport_handler::WebTransportSession::new(server.clone(), cid.clone(), session_id); - let control_stream = moq_generic_transport::accept_bidi(&mut webtransport_session).await.unwrap().unwrap(); - // let control_stream = async_webtransport_handler::ServerBidiStream::new(server.clone(), cid.clone(), session_id, control_stream_id); - // let session = moq_transport_trait::Session::new(Box::new(control_stream), Box::new(webtransport_session)); - let received_client_setup = moq_transport_trait::Session::accept(Box::new(control_stream), Box::new(webtransport_session)).await.unwrap(); - // TODO: maybe reject setup - let role = match received_client_setup.setup().role { - Role::Publisher => Role::Subscriber, - Role::Subscriber => Role::Publisher, - Role::Both => Role::Both, - }; - let setup_server = SetupServer { - version: Version::DRAFT_00, - role, - }; - - let session = received_client_setup.accept(setup_server).await.unwrap(); - let session = relay::Session::from_session(session, broker.clone()).await.unwrap(); - session.run().await - }); - }, - Ok(async_webtransport_handler::Event::StreamData(session_id, stream_id)) => { - log::trace!("new data!"); - }, - Ok(async_webtransport_handler::Event::Done) => { - println!("H3 Done"); - break; - }, - Ok(async_webtransport_handler::Event::GoAway) => { - println!("GOAWAY"); - break; - }, + let server = server.clone(); + let cid = cid.clone(); + let broker = broker.clone(); + tasks.spawn(async move { + let mut webtransport_session = async_webtransport_handler::WebTransportSession::new(server.clone(), cid.clone(), session_id); + let control_stream = moq_generic_transport::accept_bidi(&mut webtransport_session).await.unwrap().unwrap(); + let received_client_setup = moq_transport_generic::Session::accept(Box::new(control_stream), Box::new(webtransport_session)).await.unwrap(); + // TODO: maybe reject setup + let role = match received_client_setup.setup().role { + Role::Publisher => Role::Subscriber, + Role::Subscriber => Role::Publisher, + Role::Both => Role::Both, + }; + let setup_server = SetupServer { + version: Version::DRAFT_00, + role, + }; + + let session = received_client_setup.accept(setup_server).await.unwrap(); + let session = relay::Session::from_transport_session(session, broker.clone()).await.unwrap(); + session.run().await + }); + }, + Ok(async_webtransport_handler::Event::StreamData(session_id, stream_id)) => { + log::trace!("new data!"); + }, + Ok(async_webtransport_handler::Event::Done) => { + println!("H3 Done"); + break; + }, + Ok(async_webtransport_handler::Event::GoAway) => { + println!("GOAWAY"); + break; + }, - Err(_) => todo!(), - } + Err(_) => todo!(), } } - - // let session = moq_transport_trait::Session::new(control_stream, connection) - // let server = relay::Server::new(config).context("failed to create server")?; - // // Run all of the above - // tokio::select! { - // res = server.run() => res.context("failed to run server"), - // res = media.run() => res.context("failed to run media source"), - // res = serve => res.context("failed to run HTTP server"), - // } - } else { - // let server = relay::Server::new(config).context("failed to create server")?; - - // Run all of the above - // tokio::select! { - // res = server.run() => res.context("failed to run server"), - // res = media.run() => res.context("failed to run media source"), - // res = serve => res.context("failed to run HTTP server"), - // } - } - Ok(()) } // Run a HTTP server using Warp diff --git a/moq-demo-quinn/Cargo.toml b/moq-demo-quinn/Cargo.toml new file mode 100644 index 0000000..aaa64dd --- /dev/null +++ b/moq-demo-quinn/Cargo.toml @@ -0,0 +1,53 @@ +[package] +name = "moq-demo-quinn" +description = "Media over QUIC" +authors = [ "Luke Curley" ] +repository = "https://github.com/kixelated/moq-rs" +license = "MIT OR Apache-2.0" + +version = "0.1.0" +edition = "2021" + +keywords = [ "quic", "http3", "webtransport", "media", "live" ] +categories = [ "multimedia", "network-programming", "web-programming" ] + + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +moq-transport = { path = "../moq-transport" } +moq-transport-quinn = { path = "../moq-transport-quinn" } +moq-warp = { path = "../moq-warp" } + +h3 = { git = "https://github.com/hyperium/h3", branch = "master" } +h3-quinn = { git = "https://github.com/hyperium/h3", branch = "master" } +h3-webtransport = { git = "https://github.com/hyperium/h3", branch = "master" } + +# QUIC +quinn = "0.10" + +# Crypto +ring = "0.16.20" +rustls = "0.21.2" +rustls-pemfile = "1.0.2" + +# Async stuff +tokio = { version = "1.29.1", features = ["full"] } + +# Web server to serve the fingerprint +warp = { version = "0.3.3", features = ["tls"] } +hex = "0.4.3" + +# Logging +clap = { version = "4.0", features = [ "derive" ] } +log = { version = "0.4", features = ["std"] } +env_logger = "0.9.3" +anyhow = "1.0.70" + +bytes= "1" + +moq-transport-generic = { path = "../moq-transport-generic" } +# moq-generic-transport = { path = "../transport" } +moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"} +webtransport_quiche = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" } +async_webtransport = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" } \ No newline at end of file diff --git a/moq-demo-quinn/src/main.rs b/moq-demo-quinn/src/main.rs new file mode 100644 index 0000000..46c6b95 --- /dev/null +++ b/moq-demo-quinn/src/main.rs @@ -0,0 +1,152 @@ +use std::{fs, io, net, path}; + +use anyhow::Context; +use clap::Parser; +use moq_transport::{Role, SetupServer, Version}; +use ring::digest::{digest, SHA256}; +use tokio::task::JoinSet; +use warp::Filter; + +use moq_warp::{relay::{self}, source}; + +mod server; + +/// Search for a pattern in a file and display the lines that contain it. +#[derive(Parser, Clone)] +struct Cli { + /// Listen on this address + #[arg(short, long, default_value = "[::]:4443")] + addr: net::SocketAddr, + + /// Use the certificate file at this path + #[arg(short, long, default_value = "cert/localhost.crt")] + cert: path::PathBuf, + + /// Use the private key at this path + #[arg(short, long, default_value = "cert/localhost.key")] + key: path::PathBuf, + + /// Use the media file at this path + #[arg(short, long, default_value = "media/fragmented.mp4")] + media: path::PathBuf, + +} + + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + + let args = Cli::parse(); + + // Create a web server to serve the fingerprint + let serve = serve_http(args.clone()); + let mut tasks = JoinSet::new(); + tasks.spawn(async move { + serve.await.unwrap(); + }); + + // Create a fake media source from disk. + let media = source::File::new(args.media).context("failed to open file source")?; + + let broker = relay::broker::Broadcasts::new(); + broker + .announce("quic.video/demo", media.source()) + .context("failed to announce file source")?; + + let mut tasks = JoinSet::new(); + tasks.spawn(async move { + media.run().await.unwrap(); + }); + + // Create a server to actually serve the media + let config = relay::ServerConfig { + addr: args.addr, + cert: args.cert, + key: args.key, + broker: broker.clone(), + }; + + let quinn = server::Server::new_quinn_connection(config).unwrap(); + + let mut tasks = JoinSet::new(); + loop { + let broker = broker.clone(); + tokio::select! { + connect = server::Server::accept_new_webtransport_session(&quinn) => { + tasks.spawn(async move { + let client_setup = connect?.accept().await?; + // TODO: maybe reject setup + let role = match client_setup.setup().role { + Role::Publisher => Role::Subscriber, + Role::Subscriber => Role::Publisher, + Role::Both => Role::Both, + }; + let setup_server = SetupServer { + version: Version::DRAFT_00, + role, + }; + + let session = client_setup.accept(setup_server).await.unwrap(); + let session = relay::Session::from_transport_session(session, broker.clone()).await.unwrap(); + session.run().await?; + let ret: anyhow::Result<()> = Ok(()); + ret + }); + } + res = tasks.join_next(), if !tasks.is_empty() => { + let res = res.expect("no tasks").expect("task aborted"); + + if let Err(err) = res { + log::error!("session terminated: {:?}", err); + } + }, + } + } + + + + + // let server = relay::Server::new(config).context("failed to create server")?; + + // Run all of the above + // tokio::select! { + // res = server.run() => res.context("failed to run server"), + // res = media.run() => res.context("failed to run media source"), + // res = serve => res.context("failed to run HTTP server"), + // } +} + +// Run a HTTP server using Warp +// TODO remove this when Chrome adds support for self-signed certificates using WebTransport +async fn serve_http(args: Cli) -> anyhow::Result<()> { + // Read the PEM certificate file + let crt = fs::File::open(&args.cert)?; + let mut crt = io::BufReader::new(crt); + + // Parse the DER certificate + let certs = rustls_pemfile::certs(&mut crt)?; + let cert = certs.first().expect("no certificate found"); + + // Compute the SHA-256 digest + let fingerprint = digest(&SHA256, cert.as_ref()); + let fingerprint = hex::encode(fingerprint.as_ref()); + let fingerprint = std::sync::Arc::new(fingerprint); + + let cors = warp::cors().allow_any_origin(); + + // What an annoyingly complicated way to serve a static String + // I spent a long time trying to find the exact way of cloning and dereferencing the Arc. + let routes = warp::path!("fingerprint") + .map(move || (*(fingerprint.clone())).clone()) + .with(cors); + + warp::serve(routes) + .tls() + .cert_path(args.cert) + .key_path(args.key) + .run(args.addr) + .await; + + Ok(()) +} diff --git a/moq-demo-quinn/src/server/mod.rs b/moq-demo-quinn/src/server/mod.rs new file mode 100644 index 0000000..207ed6f --- /dev/null +++ b/moq-demo-quinn/src/server/mod.rs @@ -0,0 +1,247 @@ +use std::{fs, io, sync::{self, Arc}, time}; + +use anyhow::Context; +use bytes::Bytes; +use h3::{quic::StreamId, proto::varint::VarInt}; +use h3_webtransport::server::AcceptedBi; +use moq_transport::Message; +use moq_transport_generic::{AcceptSetup, Control, Objects}; +use moq_warp::relay::{broker, ServerConfig}; +use tokio::task::JoinSet; +use warp::{Future, http}; + +use self::stream::{QuinnBidiStream, QuinnSendStream, QuinnRecvStream}; + +mod stream; + +fn stream_id_to_u64(stream_id: StreamId) -> u64 { + let val: VarInt = stream_id.into(); + val.into_inner() +} + +pub struct Server { + // The MoQ transport server. + server: h3_webtransport::server::WebTransportSession, +} + +impl Server { + // Create a new server + pub fn new_quinn_connection(config: ServerConfig) -> anyhow::Result { + // Read the PEM certificate chain + let certs = fs::File::open(config.cert).context("failed to open cert file")?; + let mut certs = io::BufReader::new(certs); + let certs = rustls_pemfile::certs(&mut certs)? + .into_iter() + .map(rustls::Certificate) + .collect(); + + // Read the PEM private key + let keys = fs::File::open(config.key).context("failed to open key file")?; + let mut keys = io::BufReader::new(keys); + let mut keys = rustls_pemfile::pkcs8_private_keys(&mut keys)?; + + anyhow::ensure!(keys.len() == 1, "expected a single key"); + let key = rustls::PrivateKey(keys.remove(0)); + + let mut tls_config = rustls::ServerConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_protocol_versions(&[&rustls::version::TLS13]) + .unwrap() + .with_no_client_auth() + .with_single_cert(certs, key)?; + + tls_config.max_early_data_size = u32::MAX; + let alpn: Vec> = vec![ + b"h3".to_vec(), + b"h3-32".to_vec(), + b"h3-31".to_vec(), + b"h3-30".to_vec(), + b"h3-29".to_vec(), + ]; + tls_config.alpn_protocols = alpn; + + let mut server_config = quinn::ServerConfig::with_crypto(sync::Arc::new(tls_config)); + + // Enable BBR congestion control + // TODO validate the implementation + let mut transport_config = quinn::TransportConfig::default(); + transport_config.keep_alive_interval(Some(time::Duration::from_secs(2))); + transport_config.congestion_controller_factory(sync::Arc::new(quinn::congestion::BbrConfig::default())); + + server_config.transport = sync::Arc::new(transport_config); + let server = quinn::Endpoint::server(server_config, config.addr)?; + + Ok(server) + } + + pub async fn accept_new_webtransport_session(endpoint: &h3_quinn::Endpoint) -> anyhow::Result { + let mut handshake = JoinSet::new(); + // perform a quic handshake + loop { + tokio::select!( + // Accept the connection and start the WebTransport handshake. + conn = endpoint.accept() => { + let conn = conn.context("failed to accept connection").unwrap(); + handshake.spawn(async move { + + let conn = conn.await.context("failed to accept h3 connection")?; + + let mut conn = h3::server::builder() + .enable_webtransport(true) + .enable_connect(true) + .enable_datagram(true) + .max_webtransport_sessions(1) + .send_grease(true) + .build(h3_quinn::Connection::new(conn)) + .await + .context("failed to create h3 server")?; + + let (req, stream) = conn + .accept() + .await + .context("failed to accept h3 session")? + .context("failed to accept h3 request")?; + + let ext = req.extensions(); + anyhow::ensure!(req.method() == http::Method::CONNECT, "expected CONNECT request"); + anyhow::ensure!( + ext.get::() == Some(&h3::ext::Protocol::WEB_TRANSPORT), + "expected WebTransport CONNECT" + ); + + // Let the application decide if we accept this CONNECT request. + Ok(Connect { conn, req, stream }) + }); + }, + // Return any mostly finished WebTransport handshakes. + res = handshake.join_next(), if !handshake.is_empty() => { + let res = res.expect("no tasks").expect("task aborted"); + match res { + Ok(connect_request) => return Ok(connect_request), + Err(err) => log::warn!("failed to accept session: {:?}", err), + } + }, + ) + } + } + + // pub async fn run(mut self) -> anyhow::Result<()> { + // loop { + // tokio::select! { + // res = self.server.accept() => { + // let session = res.context("failed to accept connection")?; + // let broker = self.broker.clone(); + + // self.tasks.spawn(async move { + // let session: Session = Session::accept(session, broker).await?; + // session.run().await + // }); + // }, + // res = self.tasks.join_next(), if !self.tasks.is_empty() => { + // let res = res.expect("no tasks").expect("task aborted"); + + // if let Err(err) = res { + // log::error!("session terminated: {:?}", err); + // } + // }, + // } + // } + // } +} + +// The WebTransport CONNECT has arrived, and we need to decide if we accept it. +pub struct Connect { + // Inspect to decide whether to accept() or reject() the session. + req: http::Request<()>, + + conn: h3::server::Connection, + stream: h3::server::RequestStream, Bytes>, +} + +impl Connect { + // Expose the received URI + pub fn uri(&self) -> &http::Uri { + self.req.uri() + } + + // Accept the WebTransport session. + pub async fn accept(self) -> anyhow::Result> { + let session = h3_webtransport::server::WebTransportSession::accept(self.req, self.stream, self.conn).await?; + let mut session = Server{server: session}; + + let control_stream = moq_generic_transport::accept_bidi(&mut session) + .await + .context("failed to accept bidi stream")? + .unwrap(); + + Ok(moq_transport_generic::Session::accept(Box::new(control_stream), Box::new(session)).await?) + } + + // Reject the WebTransport session with a HTTP response. + pub async fn reject(mut self, resp: http::Response<()>) -> anyhow::Result<()> { + self.stream.send_response(resp).await?; + Ok(()) + } +} + + +impl moq_generic_transport::Connection for Server { + type BidiStream = QuinnBidiStream; + + type SendStream = QuinnSendStream; + + type RecvStream = QuinnRecvStream; + + fn poll_accept_recv( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, anyhow::Error>> { + let fut = self.server.accept_uni(); + let fut = std::pin::pin!(fut); + fut.poll(cx) + .map_ok(|opt| opt.map(|(_, s)| QuinnRecvStream::new(s))) + .map_err(|e| anyhow::anyhow!("{:?}", e)) + } + + fn poll_accept_bidi( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, anyhow::Error>> { + let fut = self.server.accept_bi(); + let fut = std::pin::pin!(fut); + let res = std::task::ready!(fut.poll(cx).map_err(|e| anyhow::anyhow!("{:?}", e))); + match res { + Ok(Some(AcceptedBi::Request(_, _))) => std::task::Poll::Ready(Err(anyhow::anyhow!("received new session whils accepting bidi stream"))), + Ok(Some(AcceptedBi::BidiStream(_, s))) => std::task::Poll::Ready(Ok(Some(QuinnBidiStream::new(s)))), + Ok(None) => std::task::Poll::Ready(Ok(None)), + Err(e) => std::task::Poll::Ready(Err(e)), + } + } + + fn poll_open_bidi( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let fut = self.server.open_bi(self.server.session_id()); + let fut = std::pin::pin!(fut); + fut.poll(cx) + .map_ok(|s| QuinnBidiStream::new(s)) + .map_err(|e| anyhow::anyhow!("{:?}", e)) + } + + fn poll_open_send( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let fut = self.server.open_uni(self.server.session_id()); + let fut = std::pin::pin!(fut); + fut.poll(cx) + .map_ok(|s| QuinnSendStream::new(s)) + .map_err(|e| anyhow::anyhow!("{:?}", e)) + } + + fn close(&mut self, _code: u64, _reason: &[u8]) { + todo!("not implemented") + } +} \ No newline at end of file diff --git a/moq-demo-quinn/src/server/stream.rs b/moq-demo-quinn/src/server/stream.rs new file mode 100644 index 0000000..d81fdbe --- /dev/null +++ b/moq-demo-quinn/src/server/stream.rs @@ -0,0 +1,149 @@ +use h3::quic::SendStream; +use h3::quic::RecvStream; +use h3::quic::BidiStream; +use h3::quic::SendStreamUnframed; + +use super::stream_id_to_u64; + +pub struct QuinnSendStream { + stream: h3_webtransport::stream::SendStream, bytes::Bytes> +} + +impl QuinnSendStream { + pub fn new(stream: h3_webtransport::stream::SendStream, bytes::Bytes>) -> QuinnSendStream { + QuinnSendStream { stream } + } +} + +impl moq_generic_transport::SendStream for QuinnSendStream { + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.stream.poll_ready(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) + } + + fn poll_finish(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.stream.poll_finish(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) + } + + fn reset(&mut self, reset_code: u64) { + self.stream.reset(reset_code) + } + + fn send_id(&self) -> u64 { + stream_id_to_u64(self.stream.send_id()) + } +} + +impl moq_generic_transport::SendStreamUnframed for QuinnSendStream { + fn poll_send( + &mut self, + cx: &mut std::task::Context<'_>, + buf: &mut D, + ) -> std::task::Poll> { + self.stream.poll_send(cx, buf).map_err(|e| anyhow::anyhow!("{:?}", e)) + } +} + + +pub struct QuinnRecvStream { + stream: h3_webtransport::stream::RecvStream +} + +impl QuinnRecvStream { + pub fn new(stream: h3_webtransport::stream::RecvStream) -> QuinnRecvStream { + QuinnRecvStream { stream } + } +} + +impl moq_generic_transport::RecvStream for QuinnRecvStream { + type Buf = bytes::Bytes; + + fn poll_data( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, anyhow::Error>> { + self.stream.poll_data(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) + } + + fn stop_sending(&mut self, error_code: u64) { + self.stream.stop_sending(error_code) + } + + fn recv_id(&self) -> u64 { + stream_id_to_u64(self.stream.recv_id()) + } +} + +pub struct QuinnBidiStream { + stream: h3_webtransport::stream::BidiStream, bytes::Bytes> +} + +impl QuinnBidiStream { + pub fn new(stream: h3_webtransport::stream::BidiStream, bytes::Bytes>) -> QuinnBidiStream { + QuinnBidiStream { stream } + } +} + +impl moq_generic_transport::BidiStream for QuinnBidiStream { + type SendStream = QuinnSendStream; + + type RecvStream = QuinnRecvStream; + + fn split(self) -> (Self::SendStream, Self::RecvStream) { + let (send, recv) = self.stream.split(); + let send = QuinnSendStream{ + stream: send, + }; + let recv = QuinnRecvStream{ + stream: recv, + }; + (send, recv) + } +} + +impl moq_generic_transport::RecvStream for QuinnBidiStream { + type Buf = bytes::Bytes; + + fn poll_data( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, anyhow::Error>> { + self.stream.poll_data(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) + } + + fn stop_sending(&mut self, error_code: u64) { + self.stream.stop_sending(error_code) + } + + fn recv_id(&self) -> u64 { + stream_id_to_u64(self.stream.recv_id()) + } +} + +impl moq_generic_transport::SendStream for QuinnBidiStream { + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.stream.poll_ready(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) + } + + fn poll_finish(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.stream.poll_finish(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) + } + + fn reset(&mut self, reset_code: u64) { + self.stream.reset(reset_code) + } + + fn send_id(&self) -> u64 { + stream_id_to_u64(self.stream.send_id()) + } +} + +impl moq_generic_transport::SendStreamUnframed for QuinnBidiStream { + fn poll_send( + &mut self, + cx: &mut std::task::Context<'_>, + buf: &mut D, + ) -> std::task::Poll> { + self.stream.poll_send(cx, buf).map_err(|e| anyhow::anyhow!("{:?}", e)) + } +} + diff --git a/moq-transport-trait/Cargo.toml b/moq-transport-generic/Cargo.toml similarity index 96% rename from moq-transport-trait/Cargo.toml rename to moq-transport-generic/Cargo.toml index 2f494a0..2c54182 100644 --- a/moq-transport-trait/Cargo.toml +++ b/moq-transport-generic/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "moq-transport-trait" +name = "moq-transport-generic" description = "Media over QUIC" authors = [ "Luke Curley" ] repository = "https://github.com/kixelated/moq-rs" diff --git a/moq-transport-trait/src/control.rs b/moq-transport-generic/src/control.rs similarity index 95% rename from moq-transport-trait/src/control.rs rename to moq-transport-generic/src/control.rs index 9dca713..1be280c 100644 --- a/moq-transport-trait/src/control.rs +++ b/moq-transport-generic/src/control.rs @@ -1,10 +1,9 @@ -use moq_generic_transport::{SendStream, RecvStream, BidiStream, SendStreamUnframed, Connection}; +use moq_generic_transport::{RecvStream, BidiStream}; use moq_transport::{Decode, DecodeError, Encode, Message}; use bytes::{Buf, BytesMut}; use std::io::Cursor; -use std::marker::PhantomData; use std::sync::Arc; use tokio::sync::Mutex; diff --git a/moq-transport-trait/src/lib.rs b/moq-transport-generic/src/lib.rs similarity index 100% rename from moq-transport-trait/src/lib.rs rename to moq-transport-generic/src/lib.rs diff --git a/moq-transport-trait/src/object.rs b/moq-transport-generic/src/object.rs similarity index 100% rename from moq-transport-trait/src/object.rs rename to moq-transport-generic/src/object.rs diff --git a/moq-transport-trait/src/server.rs b/moq-transport-generic/src/server.rs similarity index 55% rename from moq-transport-trait/src/server.rs rename to moq-transport-generic/src/server.rs index d58c444..9e45e07 100644 --- a/moq-transport-trait/src/server.rs +++ b/moq-transport-generic/src/server.rs @@ -1,47 +1,10 @@ use anyhow::Context; -use moq_generic_transport::{Connection, BidiStream, SendStream, SendStreamUnframed, RecvStream}; +use moq_generic_transport::{Connection, RecvStream}; use moq_transport::{Message, SetupClient, SetupServer}; -use crate::SharedConnection; use super::{Control, Objects}; -// pub struct Server { -// // The Webtransport/QUIC server, with an already established session/connection. -// endpoint: Box, -// } - -// impl Server { -// pub fn new(endpoint: Box) -> Self { -// let handshake = JoinSet::new(); -// Self { endpoint } -// } - -// // Accept the next WebTransport session. -// pub async fn accept(&mut self) -> anyhow::Result { -// loop { -// tokio::select!( -// // Accept the connection and start the WebTransport handshake. -// conn = self.endpoint.accept() => { -// let conn = conn.context("failed to accept connection")?; -// self.handshake.spawn(async move { -// Connecting::new(conn).accept().await -// }); -// }, -// // Return any mostly finished WebTransport handshakes. -// res = self.handshake.join_next(), if !self.handshake.is_empty() => { -// let res = res.expect("no tasks").expect("task aborted"); -// match res { -// Ok(session) => return Ok(session), -// Err(err) => log::warn!("failed to accept session: {:?}", err), -// } -// }, -// ) -// } -// } -// } - - pub struct Session { pub control: Control, pub objects: Objects, diff --git a/moq-warp/Cargo.toml b/moq-warp/Cargo.toml index 3ee9355..028d853 100644 --- a/moq-warp/Cargo.toml +++ b/moq-warp/Cargo.toml @@ -17,7 +17,7 @@ categories = [ "multimedia", "network-programming", "web-programming" ] [dependencies] moq-transport = { path = "../moq-transport" } moq-transport-quinn = { path = "../moq-transport-quinn" } -moq-transport-trait = { path = "../moq-transport-trait" } +moq-transport-generic = { path = "../moq-transport-generic" } moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"} bytes = "1" diff --git a/moq-warp/src/relay/contribute.rs b/moq-warp/src/relay/contribute.rs index e940f0d..afa0aad 100644 --- a/moq-warp/src/relay/contribute.rs +++ b/moq-warp/src/relay/contribute.rs @@ -4,13 +4,12 @@ use std::sync::{Arc, Mutex}; use std::time; use bytes::Buf; -use moq_generic_transport::{Connection, RecvStream, SendStream, SendStreamUnframed, BidiStream}; -use tokio::io::AsyncReadExt; +use moq_generic_transport::{Connection, RecvStream}; use tokio::sync::mpsc; use tokio::task::JoinSet; // lock across await boundaries use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; -use moq_transport_trait::{RecvObjects}; +use moq_transport_generic::RecvObjects; use anyhow::Context; diff --git a/moq-warp/src/relay/control.rs b/moq-warp/src/relay/control.rs index 0da62d6..1400334 100644 --- a/moq-warp/src/relay/control.rs +++ b/moq-warp/src/relay/control.rs @@ -2,7 +2,7 @@ use moq_generic_transport::{SendStream, SendStreamUnframed, BidiStream}; use tokio::sync::mpsc; use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk}; -use moq_transport_trait::Control; +use moq_transport_generic::Control; pub struct Main { control: Control, diff --git a/moq-warp/src/relay/distribute.rs b/moq-warp/src/relay/distribute.rs index e5f6145..541aedd 100644 --- a/moq-warp/src/relay/distribute.rs +++ b/moq-warp/src/relay/distribute.rs @@ -3,12 +3,11 @@ use std::marker::PhantomData; use anyhow::Context; use bytes::Buf; -use moq_generic_transport::{SendStream, SendStreamUnframed, BidiStream, Connection}; -use tokio::io::AsyncWriteExt; +use moq_generic_transport::{SendStream, SendStreamUnframed, Connection}; use tokio::task::JoinSet; // allows locking across await use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; -use moq_transport_trait::SendObjects; +use moq_transport_generic::SendObjects; use super::{broker, control}; use crate::model::{segment, track}; diff --git a/moq-warp/src/relay/server.rs b/moq-warp/src/relay/server.rs index c22a6a6..dd89f8a 100644 --- a/moq-warp/src/relay/server.rs +++ b/moq-warp/src/relay/server.rs @@ -1,21 +1,6 @@ -use super::{broker, Session}; +use super::broker; -use std::{fs, io, net, path, sync, time}; - -use anyhow::Context; - -use tokio::task::JoinSet; - -pub struct Server { - // The MoQ transport server. - server: moq_transport_quinn::Server, - - // The media sources. - broker: broker::Broadcasts, - - // Sessions actively being run. - tasks: JoinSet>, -} +use std::{net, path}; pub struct ServerConfig { pub addr: net::SocketAddr, @@ -24,82 +9,3 @@ pub struct ServerConfig { pub broker: broker::Broadcasts, } - -impl Server { - // Create a new server - pub fn new(config: ServerConfig) -> anyhow::Result { - // Read the PEM certificate chain - let certs = fs::File::open(config.cert).context("failed to open cert file")?; - let mut certs = io::BufReader::new(certs); - let certs = rustls_pemfile::certs(&mut certs)? - .into_iter() - .map(rustls::Certificate) - .collect(); - - // Read the PEM private key - let keys = fs::File::open(config.key).context("failed to open key file")?; - let mut keys = io::BufReader::new(keys); - let mut keys = rustls_pemfile::pkcs8_private_keys(&mut keys)?; - - anyhow::ensure!(keys.len() == 1, "expected a single key"); - let key = rustls::PrivateKey(keys.remove(0)); - - let mut tls_config = rustls::ServerConfig::builder() - .with_safe_default_cipher_suites() - .with_safe_default_kx_groups() - .with_protocol_versions(&[&rustls::version::TLS13]) - .unwrap() - .with_no_client_auth() - .with_single_cert(certs, key)?; - - tls_config.max_early_data_size = u32::MAX; - let alpn: Vec> = vec![ - b"h3".to_vec(), - b"h3-32".to_vec(), - b"h3-31".to_vec(), - b"h3-30".to_vec(), - b"h3-29".to_vec(), - ]; - tls_config.alpn_protocols = alpn; - - let mut server_config = quinn::ServerConfig::with_crypto(sync::Arc::new(tls_config)); - - // Enable BBR congestion control - // TODO validate the implementation - let mut transport_config = quinn::TransportConfig::default(); - transport_config.keep_alive_interval(Some(time::Duration::from_secs(2))); - transport_config.congestion_controller_factory(sync::Arc::new(quinn::congestion::BbrConfig::default())); - - server_config.transport = sync::Arc::new(transport_config); - let server = quinn::Endpoint::server(server_config, config.addr)?; - let broker = config.broker; - - let server = moq_transport_quinn::Server::new(server); - let tasks = JoinSet::new(); - - Ok(Self { server, broker, tasks }) - } - - // pub async fn run(mut self) -> anyhow::Result<()> { - // loop { - // tokio::select! { - // res = self.server.accept() => { - // let session = res.context("failed to accept connection")?; - // let broker = self.broker.clone(); - - // self.tasks.spawn(async move { - // let session: Session = Session::accept(session, broker).await?; - // session.run().await - // }); - // }, - // res = self.tasks.join_next(), if !self.tasks.is_empty() => { - // let res = res.expect("no tasks").expect("task aborted"); - - // if let Err(err) = res { - // log::error!("session terminated: {:?}", err); - // } - // }, - // } - // } - // } -} diff --git a/moq-warp/src/relay/session.rs b/moq-warp/src/relay/session.rs index f265871..df87781 100644 --- a/moq-warp/src/relay/session.rs +++ b/moq-warp/src/relay/session.rs @@ -1,12 +1,9 @@ use std::marker::PhantomData; -use anyhow::Context; -use moq_generic_transport::{SendStream, SendStreamUnframed, BidiStream, Connection, RecvStream}; +use moq_generic_transport::{SendStream, SendStreamUnframed, Connection, RecvStream}; use super::{broker, contribute, control, distribute}; -use moq_transport::{Role, SetupServer, Version}; -use moq_transport_quinn::Connect; pub struct Session { // Split logic into contribution/distribution to reduce the problem space. @@ -25,54 +22,8 @@ impl Session where S: SendStream + SendStreamUnframed + Send, C: Connection + Send + 'static { - // pub async fn accept(session: Connect, broker: broker::Broadcasts) -> anyhow::Result> { - // // Accep the WebTransport session. - // // OPTIONAL validate the conn.uri() otherwise call conn.reject() - // let session = session - // .accept() - // .await - // .context(": server::Setupfailed to accept WebTransport session")?; - - // session - // .setup() - // .versions - // .iter() - // .find(|v| **v == Version::DRAFT_00) - // .context("failed to find supported version")?; - - // // Choose our role based on the client's role. - // let role = match session.setup().role { - // Role::Publisher => Role::Subscriber, - // Role::Subscriber => Role::Publisher, - // Role::Both => Role::Both, - // }; - - // let setup = SetupServer { - // version: Version::DRAFT_00, - // role, - // }; - - // let session = session.accept(setup).await?; - - // let (control, objects) = session.split(); - // let (objects_send, objects_recv) = objects.split(); - - // let (control, contribute, distribute) = control::split(control); - - // let contribute = contribute::Session::new(objects_recv, contribute, broker.clone()); - // let distribute = distribute::Session::new(objects_send, distribute, broker); - - // let session = Self { - // control, - // contribute, - // distribute, - // }; - - // Ok(session) - // } - - pub async fn from_session( - session: moq_transport_trait::Session, + pub async fn from_transport_session( + session: moq_transport_generic::Session, broker: broker::Broadcasts, ) -> anyhow::Result> { let (control, objects) = session.split();