diff --git a/Cargo.toml b/Cargo.toml index 2ce6f6d..37d1cdf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,9 +2,6 @@ members = [ "moq-transport", "moq-transport-quinn", - "moq-demo-quiche", "moq-demo-quinn", "moq-warp", - "transport", - "moq-transport-generic", ] diff --git a/moq-demo-quiche/Cargo.toml b/moq-demo-quiche/Cargo.toml deleted file mode 100644 index f6dfdfc..0000000 --- a/moq-demo-quiche/Cargo.toml +++ /dev/null @@ -1,46 +0,0 @@ -[package] -name = "moq-demo-quiche" -description = "Media over QUIC" -authors = [ "Luke Curley" ] -repository = "https://github.com/kixelated/moq-rs" -license = "MIT OR Apache-2.0" - -version = "0.1.0" -edition = "2021" - -keywords = [ "quic", "http3", "webtransport", "media", "live" ] -categories = [ "multimedia", "network-programming", "web-programming" ] - - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -moq-transport = { path = "../moq-transport" } -moq-warp = { path = "../moq-warp" } - -# QUIC -quinn = "0.10" - -# Crypto -ring = "0.16.20" -rustls = "0.21.2" -rustls-pemfile = "1.0.2" - -# Async stuff -tokio = { version = "1.29.1", features = ["full"] } - -# Web server to serve the fingerprint -warp = { version = "0.3.3", features = ["tls"] } -hex = "0.4.3" - -# Logging -clap = { version = "4.0", features = [ "derive" ] } -log = { version = "0.4", features = ["std"] } -env_logger = "0.9.3" -anyhow = "1.0.70" - -moq-transport-generic = { path = "../moq-transport-generic" } -# moq-generic-transport = { path = "../transport" } -moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"} -webtransport_quiche = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" } -async_webtransport = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" } \ No newline at end of file diff --git a/moq-demo-quiche/src/main.rs b/moq-demo-quiche/src/main.rs deleted file mode 100644 index 9306429..0000000 --- a/moq-demo-quiche/src/main.rs +++ /dev/null @@ -1,218 +0,0 @@ -use std::{fs, io, net, path, sync::{self, Arc}}; - -use anyhow::Context; -use async_webtransport_handler::{AsyncWebTransportServer, regex::Regex}; -use clap::Parser; -use moq_transport::{Role, SetupServer, Version}; -use ring::digest::{digest, SHA256}; -use tokio::task::JoinSet; -use warp::Filter; - -use moq_warp::{relay::{self, ServerConfig}, source}; -use webtransport_quiche::quiche; - -/// Search for a pattern in a file and display the lines that contain it. -#[derive(Parser, Clone)] -struct Cli { - /// Listen on this address - #[arg(short, long, default_value = "[::]:4443")] - addr: net::SocketAddr, - - /// Use the certificate file at this path - #[arg(short, long, default_value = "cert/localhost.crt")] - cert: path::PathBuf, - - /// Use the private key at this path - #[arg(short, long, default_value = "cert/localhost.key")] - key: path::PathBuf, - - /// Use the media file at this path - #[arg(short, long, default_value = "media/fragmented.mp4")] - media: path::PathBuf, - - /// use quiche instead of quinn - #[arg(short, long)] - quiche: bool, -} - - -// Create a new server -pub fn new_quiche(config: ServerConfig) -> anyhow::Result<(AsyncWebTransportServer, tokio::net::UdpSocket, Vec)> { - let mut quic_config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap(); - - println!("loading cert {:?}, key {:?}", config.cert, config.key); - quic_config.load_cert_chain_from_pem_file(config.cert.to_str().unwrap()).unwrap(); - quic_config.load_priv_key_from_pem_file(config.key.to_str().unwrap()).unwrap(); - quic_config - .set_application_protos(quiche::h3::APPLICATION_PROTOCOL) - .unwrap(); - - quic_config.set_cc_algorithm_name("cubic").unwrap(); - quic_config.set_max_idle_timeout(10000); - quic_config.set_max_recv_udp_payload_size(1200); - quic_config.set_max_send_udp_payload_size(1200); - quic_config.set_initial_max_data(1_000_000_000); - quic_config.set_initial_max_stream_data_bidi_local(100_000_000); - quic_config.set_initial_max_stream_data_bidi_remote(100_000_000); - quic_config.set_initial_max_stream_data_uni(100_000_000); - quic_config.set_initial_max_streams_bidi(1_000_000); - quic_config.set_initial_max_streams_uni(1_000_000); - quic_config.set_disable_active_migration(true); - quic_config.enable_early_data(); - quic_config.grease(false); - // quic_config.set_fec_scheduler_algorithm(quiche::FECSchedulerAlgorithm::BurstsOnly); - // quic_config.send_fec(args.get_bool("--send-fec")); - // quic_config.receive_fec(args.get_bool("--receive-fec")); - // quic_config.set_real_time(args.get_bool("--real-time-cc")); - let h3_config = quiche::h3::Config::new().unwrap(); - - let keylog = if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") { - let file = std::fs::OpenOptions::new() - .create(true) - .append(true) - .open(keylog_path) - .unwrap(); - - Some(file) - } else { - None - }; - - let (server, socket) = AsyncWebTransportServer::with_configs(config.addr, - quic_config, h3_config, keylog)?; - let uri_root = "/"; - let regexes = [Regex::new(format!("{}", uri_root).as_str()).unwrap()]; - - Ok((server, socket, regexes.to_vec())) -} - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - env_logger::init(); - - let args = Cli::parse(); - - // Create a web server to serve the fingerprint - let serve = serve_http(args.clone()); - let mut tasks = JoinSet::new(); - tasks.spawn(async move { - serve.await.unwrap(); - }); - - // Create a fake media source from disk. - let media = source::File::new(args.media).context("failed to open file source")?; - - let broker = relay::broker::Broadcasts::new(); - broker - .announce("quic.video/demo", media.source()) - .context("failed to announce file source")?; - - let mut tasks = JoinSet::new(); - tasks.spawn(async move { - media.run().await.unwrap(); - }); - - // Create a server to actually serve the media - let config = relay::ServerConfig { - addr: args.addr, - cert: args.cert, - key: args.key, - broker: broker.clone(), - }; - - let (server, socket, regexes) = new_quiche(config).unwrap(); - let server = Arc::new(std::sync::Mutex::new(server)); - let socket = Arc::new(socket); - let mut buf = vec![0; 10000]; - let mut tasks = JoinSet::new(); - 'mainloop: loop { - println!("listen..."); - let cid = { - // let mut server = endpoint.quiche_server.lock().await; - let ret = async_webtransport_handler::AsyncWebTransportServer::listen_ref(server.clone(), socket.clone(), &mut buf).await?; - println!("listen returned {:?}", ret); - match ret { - Some(cid) => cid, - None => continue 'mainloop, - } - }; - - loop { - println!("poll"); - match server.lock().unwrap().poll(&cid, ®exes[..]) { - Ok(async_webtransport_handler::Event::NewSession(path, session_id, _regex_index)) => { - - let server = server.clone(); - let cid = cid.clone(); - let broker = broker.clone(); - tasks.spawn(async move { - let mut webtransport_session = async_webtransport_handler::WebTransportSession::new(server.clone(), cid.clone(), session_id); - let control_stream = moq_generic_transport::accept_bidi(&mut webtransport_session).await.unwrap().unwrap(); - let received_client_setup = moq_transport_generic::Session::accept(Box::new(control_stream), Box::new(webtransport_session)).await.unwrap(); - // TODO: maybe reject setup - let role = match received_client_setup.setup().role { - Role::Publisher => Role::Subscriber, - Role::Subscriber => Role::Publisher, - Role::Both => Role::Both, - }; - let setup_server = SetupServer { - version: Version::DRAFT_00, - role, - }; - - let session = received_client_setup.accept(setup_server).await.unwrap(); - let session = relay::Session::from_transport_session(session, broker.clone()).await.unwrap(); - session.run().await - }); - }, - Ok(async_webtransport_handler::Event::StreamData(session_id, stream_id)) => { - log::trace!("new data!"); - }, - Ok(async_webtransport_handler::Event::Done) => { - println!("H3 Done"); - break; - }, - Ok(async_webtransport_handler::Event::GoAway) => { - println!("GOAWAY"); - break; - }, - - Err(_) => todo!(), - } - } - } -} - -// Run a HTTP server using Warp -// TODO remove this when Chrome adds support for self-signed certificates using WebTransport -async fn serve_http(args: Cli) -> anyhow::Result<()> { - // Read the PEM certificate file - let crt = fs::File::open(&args.cert)?; - let mut crt = io::BufReader::new(crt); - - // Parse the DER certificate - let certs = rustls_pemfile::certs(&mut crt)?; - let cert = certs.first().expect("no certificate found"); - - // Compute the SHA-256 digest - let fingerprint = digest(&SHA256, cert.as_ref()); - let fingerprint = hex::encode(fingerprint.as_ref()); - let fingerprint = sync::Arc::new(fingerprint); - - let cors = warp::cors().allow_any_origin(); - - // What an annoyingly complicated way to serve a static String - // I spent a long time trying to find the exact way of cloning and dereferencing the Arc. - let routes = warp::path!("fingerprint") - .map(move || (*(fingerprint.clone())).clone()) - .with(cors); - - warp::serve(routes) - .tls() - .cert_path(args.cert) - .key_path(args.key) - .run(args.addr) - .await; - - Ok(()) -} diff --git a/moq-demo-quinn/Cargo.toml b/moq-demo-quinn/Cargo.toml index aaa64dd..eeca77d 100644 --- a/moq-demo-quinn/Cargo.toml +++ b/moq-demo-quinn/Cargo.toml @@ -15,13 +15,6 @@ categories = [ "multimedia", "network-programming", "web-programming" ] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-transport = { path = "../moq-transport" } -moq-transport-quinn = { path = "../moq-transport-quinn" } -moq-warp = { path = "../moq-warp" } - -h3 = { git = "https://github.com/hyperium/h3", branch = "master" } -h3-quinn = { git = "https://github.com/hyperium/h3", branch = "master" } -h3-webtransport = { git = "https://github.com/hyperium/h3", branch = "master" } # QUIC quinn = "0.10" @@ -46,8 +39,11 @@ anyhow = "1.0.70" bytes= "1" -moq-transport-generic = { path = "../moq-transport-generic" } -# moq-generic-transport = { path = "../transport" } -moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"} -webtransport_quiche = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" } -async_webtransport = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" } \ No newline at end of file +webtransport-generic = {git = "https://github.com/kixelated/webtransport-rs"} +moq-transport = { path = "../moq-transport" } +moq-transport-quinn = { path = "../moq-transport-quinn" } +moq-warp = { path = "../moq-warp" } + +h3 = { git = "https://github.com/hyperium/h3", branch = "master" } +h3-quinn = { git = "https://github.com/hyperium/h3", branch = "master" } +h3-webtransport = { git = "https://github.com/hyperium/h3", branch = "master" } diff --git a/moq-demo-quinn/src/server/mod.rs b/moq-demo-quinn/src/server/mod.rs index 207ed6f..b2a831a 100644 --- a/moq-demo-quinn/src/server/mod.rs +++ b/moq-demo-quinn/src/server/mod.rs @@ -1,24 +1,18 @@ -use std::{fs, io, sync::{self, Arc}, time}; +use std::{fs, io, time}; use anyhow::Context; use bytes::Bytes; -use h3::{quic::StreamId, proto::varint::VarInt}; +use h3::quic::BidiStream; use h3_webtransport::server::AcceptedBi; -use moq_transport::Message; -use moq_transport_generic::{AcceptSetup, Control, Objects}; -use moq_warp::relay::{broker, ServerConfig}; +use moq_transport::AcceptSetup; +use moq_warp::relay::ServerConfig; use tokio::task::JoinSet; use warp::{Future, http}; -use self::stream::{QuinnBidiStream, QuinnSendStream, QuinnRecvStream}; +use self::stream::{QuinnSendStream, QuinnRecvStream}; mod stream; -fn stream_id_to_u64(stream_id: StreamId) -> u64 { - let val: VarInt = stream_id.into(); - val.into_inner() -} - pub struct Server { // The MoQ transport server. server: h3_webtransport::server::WebTransportSession, @@ -61,15 +55,15 @@ impl Server { ]; tls_config.alpn_protocols = alpn; - let mut server_config = quinn::ServerConfig::with_crypto(sync::Arc::new(tls_config)); + let mut server_config = quinn::ServerConfig::with_crypto(std::sync::Arc::new(tls_config)); // Enable BBR congestion control // TODO validate the implementation let mut transport_config = quinn::TransportConfig::default(); transport_config.keep_alive_interval(Some(time::Duration::from_secs(2))); - transport_config.congestion_controller_factory(sync::Arc::new(quinn::congestion::BbrConfig::default())); + transport_config.congestion_controller_factory(std::sync::Arc::new(quinn::congestion::BbrConfig::default())); - server_config.transport = sync::Arc::new(transport_config); + server_config.transport = std::sync::Arc::new(transport_config); let server = quinn::Endpoint::server(server_config, config.addr)?; Ok(server) @@ -160,22 +154,18 @@ pub struct Connect { } impl Connect { - // Expose the received URI - pub fn uri(&self) -> &http::Uri { - self.req.uri() - } // Accept the WebTransport session. pub async fn accept(self) -> anyhow::Result> { let session = h3_webtransport::server::WebTransportSession::accept(self.req, self.stream, self.conn).await?; let mut session = Server{server: session}; - let control_stream = moq_generic_transport::accept_bidi(&mut session) + let (control_stream_send, control_stream_recv) = moq_transport::accept_bidi(&mut session) .await .context("failed to accept bidi stream")? .unwrap(); - Ok(moq_transport_generic::Session::accept(Box::new(control_stream), Box::new(session)).await?) + Ok(moq_transport::Session::accept(Box::new(control_stream_send), Box::new(control_stream_recv), Box::new(session)).await?) } // Reject the WebTransport session with a HTTP response. @@ -186,17 +176,17 @@ impl Connect { } -impl moq_generic_transport::Connection for Server { - type BidiStream = QuinnBidiStream; +impl webtransport_generic::Connection for Server { + type Error = anyhow::Error; type SendStream = QuinnSendStream; type RecvStream = QuinnRecvStream; - fn poll_accept_recv( + fn poll_accept_uni( &mut self, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll, anyhow::Error>> { + ) -> std::task::Poll, Self::Error>> { let fut = self.server.accept_uni(); let fut = std::pin::pin!(fut); fut.poll(cx) @@ -207,13 +197,16 @@ impl moq_generic_transport::Connection for Server { fn poll_accept_bidi( &mut self, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll, anyhow::Error>> { + ) -> std::task::Poll, Self::Error>> { let fut = self.server.accept_bi(); let fut = std::pin::pin!(fut); let res = std::task::ready!(fut.poll(cx).map_err(|e| anyhow::anyhow!("{:?}", e))); match res { Ok(Some(AcceptedBi::Request(_, _))) => std::task::Poll::Ready(Err(anyhow::anyhow!("received new session whils accepting bidi stream"))), - Ok(Some(AcceptedBi::BidiStream(_, s))) => std::task::Poll::Ready(Ok(Some(QuinnBidiStream::new(s)))), + Ok(Some(AcceptedBi::BidiStream(_, s))) => { + let (send, recv) = s.split(); + std::task::Poll::Ready(Ok(Some((QuinnSendStream::new(send), QuinnRecvStream::new(recv))))) + } Ok(None) => std::task::Poll::Ready(Ok(None)), Err(e) => std::task::Poll::Ready(Err(e)), } @@ -222,18 +215,22 @@ impl moq_generic_transport::Connection for Server { fn poll_open_bidi( &mut self, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + ) -> std::task::Poll> { let fut = self.server.open_bi(self.server.session_id()); let fut = std::pin::pin!(fut); fut.poll(cx) - .map_ok(|s| QuinnBidiStream::new(s)) + .map_ok(|s| { + let (send, recv) = s.split(); + (QuinnSendStream::new(send), QuinnRecvStream::new(recv)) + } + ) .map_err(|e| anyhow::anyhow!("{:?}", e)) } - fn poll_open_send( + fn poll_open_uni( &mut self, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + ) -> std::task::Poll> { let fut = self.server.open_uni(self.server.session_id()); let fut = std::pin::pin!(fut); fut.poll(cx) @@ -241,7 +238,7 @@ impl moq_generic_transport::Connection for Server { .map_err(|e| anyhow::anyhow!("{:?}", e)) } - fn close(&mut self, _code: u64, _reason: &[u8]) { + fn close(&mut self, _code: u32, _reason: &[u8]) { todo!("not implemented") } } \ No newline at end of file diff --git a/moq-demo-quinn/src/server/stream.rs b/moq-demo-quinn/src/server/stream.rs index d81fdbe..1eb4834 100644 --- a/moq-demo-quinn/src/server/stream.rs +++ b/moq-demo-quinn/src/server/stream.rs @@ -1,10 +1,7 @@ use h3::quic::SendStream; use h3::quic::RecvStream; -use h3::quic::BidiStream; use h3::quic::SendStreamUnframed; -use super::stream_id_to_u64; - pub struct QuinnSendStream { stream: h3_webtransport::stream::SendStream, bytes::Bytes> } @@ -15,25 +12,18 @@ impl QuinnSendStream { } } -impl moq_generic_transport::SendStream for QuinnSendStream { - fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { - self.stream.poll_ready(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) - } +impl webtransport_generic::SendStream for QuinnSendStream { - fn poll_finish(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + type Error = anyhow::Error; + + fn poll_finish(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { self.stream.poll_finish(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) } - fn reset(&mut self, reset_code: u64) { - self.stream.reset(reset_code) + fn reset(&mut self, reset_code: u32) { + self.stream.reset(reset_code as u64) } - fn send_id(&self) -> u64 { - stream_id_to_u64(self.stream.send_id()) - } -} - -impl moq_generic_transport::SendStreamUnframed for QuinnSendStream { fn poll_send( &mut self, cx: &mut std::task::Context<'_>, @@ -43,7 +33,6 @@ impl moq_generic_transport::SendStreamUnframed for QuinnSendStream { } } - pub struct QuinnRecvStream { stream: h3_webtransport::stream::RecvStream } @@ -54,96 +43,18 @@ impl QuinnRecvStream { } } -impl moq_generic_transport::RecvStream for QuinnRecvStream { +impl webtransport_generic::RecvStream for QuinnRecvStream { + type Error = anyhow::Error; type Buf = bytes::Bytes; fn poll_data( &mut self, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll, anyhow::Error>> { + ) -> std::task::Poll, Self::Error>> { self.stream.poll_data(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) } - fn stop_sending(&mut self, error_code: u64) { - self.stream.stop_sending(error_code) - } - - fn recv_id(&self) -> u64 { - stream_id_to_u64(self.stream.recv_id()) + fn stop_sending(&mut self, error_code: u32) { + self.stream.stop_sending(error_code as u64) } } - -pub struct QuinnBidiStream { - stream: h3_webtransport::stream::BidiStream, bytes::Bytes> -} - -impl QuinnBidiStream { - pub fn new(stream: h3_webtransport::stream::BidiStream, bytes::Bytes>) -> QuinnBidiStream { - QuinnBidiStream { stream } - } -} - -impl moq_generic_transport::BidiStream for QuinnBidiStream { - type SendStream = QuinnSendStream; - - type RecvStream = QuinnRecvStream; - - fn split(self) -> (Self::SendStream, Self::RecvStream) { - let (send, recv) = self.stream.split(); - let send = QuinnSendStream{ - stream: send, - }; - let recv = QuinnRecvStream{ - stream: recv, - }; - (send, recv) - } -} - -impl moq_generic_transport::RecvStream for QuinnBidiStream { - type Buf = bytes::Bytes; - - fn poll_data( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll, anyhow::Error>> { - self.stream.poll_data(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) - } - - fn stop_sending(&mut self, error_code: u64) { - self.stream.stop_sending(error_code) - } - - fn recv_id(&self) -> u64 { - stream_id_to_u64(self.stream.recv_id()) - } -} - -impl moq_generic_transport::SendStream for QuinnBidiStream { - fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { - self.stream.poll_ready(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) - } - - fn poll_finish(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { - self.stream.poll_finish(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) - } - - fn reset(&mut self, reset_code: u64) { - self.stream.reset(reset_code) - } - - fn send_id(&self) -> u64 { - stream_id_to_u64(self.stream.send_id()) - } -} - -impl moq_generic_transport::SendStreamUnframed for QuinnBidiStream { - fn poll_send( - &mut self, - cx: &mut std::task::Context<'_>, - buf: &mut D, - ) -> std::task::Poll> { - self.stream.poll_send(cx, buf).map_err(|e| anyhow::anyhow!("{:?}", e)) - } -} - diff --git a/moq-transport-generic/Cargo.toml b/moq-transport-generic/Cargo.toml deleted file mode 100644 index 2c54182..0000000 --- a/moq-transport-generic/Cargo.toml +++ /dev/null @@ -1,28 +0,0 @@ -[package] -name = "moq-transport-generic" -description = "Media over QUIC" -authors = [ "Luke Curley" ] -repository = "https://github.com/kixelated/moq-rs" -license = "MIT OR Apache-2.0" - -version = "0.1.0" -edition = "2021" - -keywords = [ "quic", "http3", "webtransport", "media", "live" ] -categories = [ "multimedia", "network-programming", "web-programming" ] - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -moq-transport = { path = "../moq-transport" } - -# WebTransport support: TODO pin a version when released -moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"} -http = "0.2" - -tokio = { version = "1.27", features = ["macros", "sync"] } -bytes = "1" - -log = "0.4" -anyhow = "1.0.70" -thiserror = "1.0.21" diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 5d1d559..4b63293 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -18,3 +18,7 @@ categories = [ "multimedia", "network-programming", "web-programming" ] bytes = "1" thiserror = "1.0.21" log = "0.4" +tokio = { version = "1.27", features = ["full"] } +anyhow = "1.0.70" + +webtransport-generic = {git = "https://github.com/kixelated/webtransport-rs"} \ No newline at end of file diff --git a/moq-transport/src/lib.rs b/moq-transport/src/lib.rs index 8d045ac..3d5316f 100644 --- a/moq-transport/src/lib.rs +++ b/moq-transport/src/lib.rs @@ -1,7 +1,9 @@ mod coding; mod control; mod object; +mod network; pub use coding::*; pub use control::*; pub use object::*; +pub use network::*; diff --git a/moq-transport-generic/src/control.rs b/moq-transport/src/network/control.rs similarity index 62% rename from moq-transport-generic/src/control.rs rename to moq-transport/src/network/control.rs index 1be280c..0388953 100644 --- a/moq-transport-generic/src/control.rs +++ b/moq-transport/src/network/control.rs @@ -1,5 +1,6 @@ -use moq_generic_transport::{RecvStream, BidiStream}; -use moq_transport::{Decode, DecodeError, Encode, Message}; +use webtransport_generic::{RecvStream, SendStream}; +use crate::{Decode, DecodeError, Encode, Message}; +use crate::network::stream::{recv, send}; use bytes::{Buf, BytesMut}; @@ -7,27 +8,31 @@ use std::io::Cursor; use std::sync::Arc; use tokio::sync::Mutex; +use anyhow::Context; -pub struct Control { - sender: ControlSend, - recver: ControlRecv, + +pub struct Control { + sender: ControlSend, + recver: ControlRecv, } -impl Control{ - pub(crate) fn new(stream: Box) -> Self { - let (sender, recver) = stream.split(); - let sender = ControlSend::new(Box::new(sender)); - let recver = ControlRecv::new(Box::new(recver)); +impl Control{ + pub(crate) fn new(sender: Box, recver: Box) -> Self { + let sender = ControlSend::new(sender); + let recver = ControlRecv::new(recver); Self { sender, recver } } - pub fn split(self) -> (ControlSend, ControlRecv) { + pub fn split(self) -> (ControlSend, ControlRecv) { (self.sender, self.recver) } pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { - self.sender.send(msg).await + self.sender.send(msg) + .await + .map_err(|e| anyhow::anyhow!("{:?}", e)) + .context("error sending control message") } pub async fn recv(&mut self) -> anyhow::Result { @@ -35,13 +40,13 @@ impl Control{ } } -pub struct ControlSend { - stream: Box, +pub struct ControlSend { + stream: Box, buf: BytesMut, // reuse a buffer to encode messages. } -impl ControlSend { - pub fn new(inner: Box) -> Self { +impl ControlSend { + pub fn new(inner: Box) -> Self { Self { buf: BytesMut::new(), stream: inner, @@ -56,13 +61,15 @@ impl ControlSend { msg.encode(&mut self.buf)?; // TODO make this work with select! - moq_generic_transport::send(self.stream.as_mut(), &mut self.buf).await?; - + send(self.stream.as_mut(), &mut self.buf) + .await + .map_err(|e| anyhow::anyhow!("{:?}", e.into())) + .context("error sending control message")?; Ok(()) } // Helper that lets multiple threads send control messages. - pub fn share(self) -> ControlShared { + pub fn share(self) -> ControlShared { ControlShared { stream: Arc::new(Mutex::new(self)), } @@ -72,11 +79,11 @@ impl ControlSend { // Helper that allows multiple threads to send control messages. // There's no equivalent for receiving since only one thread should be receiving at a time. #[derive(Clone)] -pub struct ControlShared { - stream: Arc>>, +pub struct ControlShared { + stream: Arc>>, } -impl ControlShared { +impl ControlShared { pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { let mut stream = self.stream.lock().await; stream.send(msg).await @@ -112,7 +119,10 @@ impl ControlRecv { } Err(DecodeError::UnexpectedEnd) => { // The decode failed, so we need to append more data. - moq_generic_transport::recv(self.stream.as_mut(), &mut self.buf).await?; + recv(self.stream.as_mut(), &mut self.buf) + .await + .map_err(|e| anyhow::anyhow!("{:?}", e.into())) + .context("error receiving control message")?; } Err(e) => return Err(e.into()), } diff --git a/moq-transport-generic/src/lib.rs b/moq-transport/src/network/mod.rs similarity index 85% rename from moq-transport-generic/src/lib.rs rename to moq-transport/src/network/mod.rs index e08eca1..fb0bf32 100644 --- a/moq-transport-generic/src/lib.rs +++ b/moq-transport/src/network/mod.rs @@ -1,3 +1,4 @@ +mod stream; mod control; mod object; mod server; @@ -6,6 +7,7 @@ use std::sync::Arc; use std::sync::Mutex; pub type SharedConnection = Arc>>; +pub use stream::*; pub use control::*; pub use object::*; pub use server::*; diff --git a/moq-transport-generic/src/object.rs b/moq-transport/src/network/object.rs similarity index 71% rename from moq-transport-generic/src/object.rs rename to moq-transport/src/network/object.rs index bebca90..e78adaf 100644 --- a/moq-transport-generic/src/object.rs +++ b/moq-transport/src/network/object.rs @@ -1,22 +1,21 @@ use anyhow::Context; use bytes::{Buf, BytesMut}; -use moq_generic_transport::{Connection, SendStream, SendStreamUnframed, RecvStream}; -use moq_transport::{Decode, DecodeError, Encode, Object}; +use webtransport_generic::{Connection, SendStream, RecvStream}; +use crate::{Decode, DecodeError, Encode, Object}; use std::{io::Cursor, marker::PhantomData}; -use crate::SharedConnection; +use crate::network::SharedConnection; + +use super::stream::{open_uni_shared, send, recv, accept_uni_shared}; // TODO support clients -// We could replace this generic soup by just if we forced Connection's SendStream -// to provide SendStreamUnframes's send() method. Without that, we have to make Connection's -// SendStream type more specific and force it to implement SendStreamUnframes as well. pub struct Objects { send: SendObjects, recv: RecvObjects, } -impl + Send> Objects { +impl + Send> Objects { pub fn new(session: SharedConnection) -> Self { let send = SendObjects::new(session.clone()); let recv = RecvObjects::new(session); @@ -44,7 +43,7 @@ pub struct SendObjects { _marker: PhantomData, } -impl> SendObjects { +impl> SendObjects { pub fn new(session: SharedConnection) -> Self { Self { session, @@ -58,17 +57,21 @@ impl> SendObje header.encode(&mut self.buf).unwrap(); // TODO support select! without making a new stream. - let mut stream = moq_generic_transport::open_send_shared(self.session.clone()) + let mut stream = open_uni_shared(self.session.clone()) .await + .map_err(|e| anyhow::anyhow!("{:?}", e.into())) .context("failed to open uni stream")?; - moq_generic_transport::send(&mut stream, &mut self.buf).await?; + send(&mut stream, &mut self.buf) + .await + .map_err(|e| anyhow::anyhow!("{:?}", e.into())) + .context("failed to send data on stream")?; Ok(stream) } } -impl> Clone for SendObjects { +impl> Clone for SendObjects { fn clone(&self) -> Self { Self { session: self.session.clone(), @@ -104,8 +107,9 @@ impl> RecvObjects { let stream = match self.stream.as_mut() { Some(stream) => stream, None => { - let stream = moq_generic_transport::accept_recv_shared(self.session.clone()) + let stream = accept_uni_shared(self.session.clone()) .await + .map_err(|e| anyhow::anyhow!("{:?}", e.into())) .context("failed to accept uni stream")? .context("no uni stream")?; @@ -126,7 +130,10 @@ impl> RecvObjects { } Err(DecodeError::UnexpectedEnd) => { // The decode failed, so we need to append more data. - moq_generic_transport::recv(stream.as_mut(), &mut self.buf).await?; + recv(stream.as_mut(), &mut self.buf) + .await + .map_err(|e| anyhow::anyhow!("{:?}", e.into())) + .context("failed to recv data on stream")?; } Err(e) => return Err(e.into()), } diff --git a/moq-transport-generic/src/server.rs b/moq-transport/src/network/server.rs similarity index 69% rename from moq-transport-generic/src/server.rs rename to moq-transport/src/network/server.rs index 9e45e07..82727f9 100644 --- a/moq-transport-generic/src/server.rs +++ b/moq-transport/src/network/server.rs @@ -1,19 +1,19 @@ use anyhow::Context; -use moq_generic_transport::{Connection, RecvStream}; -use moq_transport::{Message, SetupClient, SetupServer}; +use webtransport_generic::{Connection, RecvStream}; +use crate::{Message, SetupClient, SetupServer}; use super::{Control, Objects}; pub struct Session { - pub control: Control, + pub control: Control, pub objects: Objects, } impl + Send> Session { - pub async fn accept(control_stream: Box, connection: Box) -> anyhow::Result> { - let mut control = Control::new(control_stream); + pub async fn accept(control_stream_send: Box, control_stream_recv: Box::, connection: Box) -> anyhow::Result> { + let mut control = Control::new(control_stream_send, control_stream_recv); let objects = Objects::new(std::sync::Arc::new(std::sync::Mutex::new(connection))); let setup_client = match control.recv().await.context("failed to read SETUP")? { @@ -23,7 +23,7 @@ impl + Send> Session { Ok(AcceptSetup { setup_client, control, objects }) } - pub fn split(self) -> (Control, Objects) { + pub fn split(self) -> (Control, Objects) { (self.control, self.objects) } } @@ -31,7 +31,7 @@ impl + Send> Session { pub struct AcceptSetup { setup_client: SetupClient, - control: Control, + control: Control, objects: Objects, } diff --git a/moq-transport/src/network/stream.rs b/moq-transport/src/network/stream.rs new file mode 100644 index 0000000..f3e7821 --- /dev/null +++ b/moq-transport/src/network/stream.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; + +use bytes::{Buf, BytesMut, BufMut}; +use webtransport_generic::{Connection, RecvStream, SendStream}; + + + +pub async fn accept_uni(conn: &mut C) -> Result, C::Error> { + std::future::poll_fn(|cx| conn.poll_accept_uni(cx)).await +} + +pub async fn accept_uni_shared(conn: Arc>>) -> Result, C::Error> { + Ok(std::future::poll_fn(|cx| conn.lock().unwrap().poll_accept_uni(cx)).await?) + +} + +pub async fn accept_bidi(conn: &mut C) -> Result, C::Error> { + std::future::poll_fn(|cx| conn.poll_accept_bidi(cx)).await + +} + +pub async fn accept_bidi_shared(conn: Arc>>) -> Result, C::Error> { + std::future::poll_fn(|cx| conn.lock().unwrap().poll_accept_bidi(cx)).await + +} + +pub async fn open_uni(conn: &mut C) -> anyhow::Result { + std::future::poll_fn(|cx| conn.poll_open_uni(cx)).await + +} + +pub async fn open_uni_shared(conn: Arc>>) -> Result { + std::future::poll_fn(|cx| conn.lock().unwrap().poll_open_uni(cx)).await + +} + +pub async fn open_bidi(conn: &mut C) -> anyhow::Result<(C::SendStream, C::RecvStream), C::Error> { + std::future::poll_fn(|cx| conn.poll_open_bidi(cx)).await + +} + +pub async fn open_bidi_shared(conn: Arc>>) -> Result<(C::SendStream, C::RecvStream), C::Error> { + std::future::poll_fn(|cx| conn.lock().unwrap().poll_open_bidi(cx)).await + +} + +pub async fn recv(recv: &mut R , outbuf: &mut BytesMut) -> Result { + let buf = std::future::poll_fn(|cx| recv.poll_data(cx)).await?; + match buf { + Some(buf) => { + outbuf.put(buf); + Ok(true) + } + None => Ok(false) // stream finished + } +} + +pub async fn send(send: &mut S, buf: &mut B) -> Result { + std::future::poll_fn(|cx| send.poll_send(cx, buf)).await +} diff --git a/moq-warp/Cargo.toml b/moq-warp/Cargo.toml index 028d853..02519e0 100644 --- a/moq-warp/Cargo.toml +++ b/moq-warp/Cargo.toml @@ -17,8 +17,7 @@ categories = [ "multimedia", "network-programming", "web-programming" ] [dependencies] moq-transport = { path = "../moq-transport" } moq-transport-quinn = { path = "../moq-transport-quinn" } -moq-transport-generic = { path = "../moq-transport-generic" } -moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"} +webtransport-generic = {git = "https://github.com/kixelated/webtransport-rs"} bytes = "1" tokio = "1.27" diff --git a/moq-warp/src/relay/contribute.rs b/moq-warp/src/relay/contribute.rs index afa0aad..ae76a1e 100644 --- a/moq-warp/src/relay/contribute.rs +++ b/moq-warp/src/relay/contribute.rs @@ -4,12 +4,11 @@ use std::sync::{Arc, Mutex}; use std::time; use bytes::Buf; -use moq_generic_transport::{Connection, RecvStream}; +use webtransport_generic::{Connection, RecvStream}; use tokio::sync::mpsc; use tokio::task::JoinSet; // lock across await boundaries -use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; -use moq_transport_generic::RecvObjects; +use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt, RecvObjects}; use anyhow::Context; @@ -93,7 +92,7 @@ impl + Send> Sessi } } - async fn receive_object + Send + 'static>(&mut self, object: Object, stream: Box) -> anyhow::Result<()> { + async fn receive_object(&mut self, object: Object, stream: Box) -> anyhow::Result<()> { let track = object.track; let segment = segment::Info { @@ -116,17 +115,17 @@ impl + Send> Sessi Ok(()) } - async fn run_segment + Send + 'static>(mut segment: segment::Publisher, mut stream: Box) -> anyhow::Result<()> { - // let mut buf = [0u8; 32 * 1024]; + async fn run_segment(mut segment: segment::Publisher, mut stream: Box) -> anyhow::Result<()> { loop { let mut b = bytes::BytesMut::new(); - let stream_finished = !moq_generic_transport::recv(stream.as_mut(), &mut b).await?; - // let size = stream.read(&mut buf).await.context("failed to read from stream")?; + let stream_finished = !moq_transport::recv(stream.as_mut(), &mut b) + .await + .map_err(|e| anyhow::anyhow!("{:?}", e.into())) + .context("error receiving control message")?; if stream_finished { return Ok(()); } - // let chunk = buf[..size].to_vec(); segment.fragments.push(b.chunk().to_vec().into()) } } diff --git a/moq-warp/src/relay/control.rs b/moq-warp/src/relay/control.rs index 1400334..24258c2 100644 --- a/moq-warp/src/relay/control.rs +++ b/moq-warp/src/relay/control.rs @@ -1,18 +1,17 @@ -use moq_generic_transport::{SendStream, SendStreamUnframed, BidiStream}; +use webtransport_generic::{SendStream, RecvStream}; use tokio::sync::mpsc; -use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk}; -use moq_transport_generic::Control; +use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk, Control}; -pub struct Main { - control: Control, +pub struct Main { + control: Control, outgoing: mpsc::Receiver, contribute: mpsc::Sender, distribute: mpsc::Sender, } -impl Main { +impl Main { pub async fn run(mut self) -> anyhow::Result<()> { loop { tokio::select! { @@ -52,7 +51,7 @@ impl Component { } // Splits a control stream into two components, based on if it's a message for contribution or distribution. -pub fn split>(control: Control) -> (Main, Component, Component) { +pub fn split(control: Control) -> (Main, Component, Component) { let (outgoing_tx, outgoing_rx) = mpsc::channel(1); let (contribute_tx, contribute_rx) = mpsc::channel(1); let (distribute_tx, distribute_rx) = mpsc::channel(1); diff --git a/moq-warp/src/relay/distribute.rs b/moq-warp/src/relay/distribute.rs index 541aedd..e70230c 100644 --- a/moq-warp/src/relay/distribute.rs +++ b/moq-warp/src/relay/distribute.rs @@ -3,16 +3,15 @@ use std::marker::PhantomData; use anyhow::Context; use bytes::Buf; -use moq_generic_transport::{SendStream, SendStreamUnframed, Connection}; +use webtransport_generic::{SendStream, Connection}; use tokio::task::JoinSet; // allows locking across await -use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; -use moq_transport_generic::SendObjects; +use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt, SendObjects}; use super::{broker, control}; use crate::model::{segment, track}; -pub struct Session { +pub struct Session { // Objects are sent to the client objects: SendObjects, @@ -29,7 +28,7 @@ pub struct Session Session where - S: SendStream + SendStreamUnframed + Send, + S: SendStream + Send, C: Connection + Send + 'static { pub fn new( objects: SendObjects, @@ -179,13 +178,14 @@ impl Session where while let Some(fragment) = segment.fragments.next().await { let mut buf = bytes::Bytes::copy_from_slice(fragment.as_slice()); while buf.has_remaining() { - moq_generic_transport::send(&mut stream, &mut buf).await?; + moq_transport::send(&mut stream, &mut buf) + .await + .map_err(|e| anyhow::anyhow!("{:?}", e.into())) + .context("error sending control message")?; } - // stream.write_all(fragment.as_slice()).await?; } // NOTE: stream is automatically closed when dropped - Ok(()) } diff --git a/moq-warp/src/relay/session.rs b/moq-warp/src/relay/session.rs index df87781..46007b4 100644 --- a/moq-warp/src/relay/session.rs +++ b/moq-warp/src/relay/session.rs @@ -1,29 +1,28 @@ use std::marker::PhantomData; -use moq_generic_transport::{SendStream, SendStreamUnframed, Connection, RecvStream}; +use webtransport_generic::{SendStream, Connection, RecvStream}; use super::{broker, contribute, control, distribute}; -pub struct Session { +pub struct Session { // Split logic into contribution/distribution to reduce the problem space. contribute: contribute::Session, distribute: distribute::Session, // Used to receive control messages and forward to contribute/distribute. - control: control::Main, + control: control::Main, _marker: PhantomData, _marker_r: PhantomData, } -// impl + Send + 'static> Session { impl Session where R: RecvStream + Send + 'static, - S: SendStream + SendStreamUnframed + Send, + S: SendStream + Send, C: Connection + Send + 'static { pub async fn from_transport_session( - session: moq_transport_generic::Session, + session: moq_transport::Session, broker: broker::Broadcasts, ) -> anyhow::Result> { let (control, objects) = session.split(); diff --git a/transport/Cargo.toml b/transport/Cargo.toml deleted file mode 100644 index 1378e37..0000000 --- a/transport/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "moq-generic-transport" -description = "Media over QUIC" -authors = [ "Luke Curley" ] -repository = "https://github.com/kixelated/moq-rs" -license = "MIT OR Apache-2.0" - -version = "0.1.0" -edition = "2021" - -keywords = [ "quic", "http3", "webtransport", "media", "live" ] -categories = [ "multimedia", "network-programming", "web-programming" ] - - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -bytes = "1" -thiserror = "1.0.21" -log = "0.4" - -anyhow = "1.0.70" diff --git a/transport/src/lib.rs b/transport/src/lib.rs deleted file mode 100644 index a837d05..0000000 --- a/transport/src/lib.rs +++ /dev/null @@ -1,197 +0,0 @@ -// Coming from https://github.com/hyperium/h3, the goal is to -// do a PR with the changes afterwards - -use std::{task::{self, Poll}, sync::Arc}; -use bytes::{Buf, BufMut}; -use anyhow::Error; - -type ErrorCode = u64; -type StreamId = u64; - -/// Trait representing a QUIC connection. -pub trait Connection { - /// The type produced by `poll_accept_bidi()` - type BidiStream: BidiStream; - /// The type of the sending part of `BidiStream` - type SendStream: SendStream + SendStreamUnframed; - /// The type produced by `poll_accept_recv()` - type RecvStream: RecvStream; - - /// Accept an incoming unidirectional stream - /// - /// Returning `None` implies the connection is closing or closed. - fn poll_accept_recv( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, Error>>; - - /// Accept an incoming bidirectional stream - /// - /// Returning `None` implies the connection is closing or closed. - fn poll_accept_bidi( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, Error>>; - - /// Poll the connection to create a new bidirectional stream. - fn poll_open_bidi( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll>; - - /// Poll the connection to create a new unidirectional stream. - fn poll_open_send( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll>; - - /// Close the connection immediately - fn close(&mut self, code: ErrorCode, reason: &[u8]); -} - -/// Trait for opening outgoing streams -pub trait OpenStreams { - /// The type produced by `poll_open_bidi()` - type BidiStream: BidiStream; - /// The type produced by `poll_open_send()` - type SendStream: SendStream + SendStreamUnframed; - /// The type of the receiving part of `BidiStream` - type RecvStream: RecvStream; - - /// Poll the connection to create a new bidirectional stream. - fn poll_open_bidi( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll>; - - /// Poll the connection to create a new unidirectional stream. - fn poll_open_send( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll>; - - /// Close the connection immediately - fn close(&mut self, code: ErrorCode, reason: &[u8]); -} - -/// A trait describing the "send" actions of a QUIC stream. -pub trait SendStream { - - /// Polls if the stream can send more data. - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; - - /// Poll to finish the sending side of the stream. - fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll>; - - /// Send a QUIC reset code. - fn reset(&mut self, reset_code: u64); - - /// Get QUIC send stream id - fn send_id(&self) -> StreamId; -} - -/// Allows sending unframed pure bytes to a stream. Similar to [`AsyncWrite`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncWrite.html) -pub trait SendStreamUnframed: SendStream { - /// Attempts write data into the stream. - /// - /// Returns the number of bytes written. - /// - /// `buf` is advanced by the number of bytes written. - fn poll_send( - &mut self, - cx: &mut task::Context<'_>, - buf: &mut D, - ) -> Poll>; -} - -/// A trait describing the "receive" actions of a QUIC stream. -pub trait RecvStream { - /// The type of `Buf` for data received on this stream. - type Buf: Buf + Send; - - /// Poll the stream for more data. - /// - /// When the receive side will no longer receive more data (such as because - /// the peer closed their sending side), this should return `None`. - fn poll_data( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, anyhow::Error>>; - - /// Send a `STOP_SENDING` QUIC code. - fn stop_sending(&mut self, error_code: u64); - - /// Get QUIC send stream id - fn recv_id(&self) -> StreamId; - -} - -pub async fn accept_recv(conn: &mut C) -> anyhow::Result, Error> { - Ok(std::future::poll_fn(|cx| conn.poll_accept_recv(cx)).await?) - -} - -pub async fn accept_recv_shared(conn: Arc>>) -> anyhow::Result, Error> { - Ok(std::future::poll_fn(|cx| conn.lock().unwrap().poll_accept_recv(cx)).await?) - -} - -pub async fn accept_bidi(conn: &mut C) -> anyhow::Result, Error> { - Ok(std::future::poll_fn(|cx| conn.poll_accept_bidi(cx)).await?) - -} - -pub async fn accept_bidi_shared(conn: Arc>>) -> anyhow::Result, Error> { - Ok(std::future::poll_fn(|cx| conn.lock().unwrap().poll_accept_bidi(cx)).await?) - -} - -pub async fn open_send(conn: &mut C) -> anyhow::Result { - Ok(std::future::poll_fn(|cx| conn.poll_open_send(cx)).await?) - -} - -pub async fn open_send_shared(conn: Arc>>) -> anyhow::Result { - Ok(std::future::poll_fn(|cx| conn.lock().unwrap().poll_open_send(cx)).await?) - -} - -pub async fn open_bidi(conn: &mut C) -> anyhow::Result { - Ok(std::future::poll_fn(|cx| conn.poll_open_bidi(cx)).await?) - -} - -pub async fn open_bidi_shared(conn: Arc>>) -> anyhow::Result { - Ok(std::future::poll_fn(|cx| conn.lock().unwrap().poll_open_bidi(cx)).await?) - -} - - - -pub async fn recv>(recv: &mut R , outbuf: &mut BM) -> anyhow::Result { - let buf = std::future::poll_fn(|cx| recv.poll_data(cx)).await?; - match buf { - Some(buf) => { - outbuf.put(buf); - Ok(true) - } - None => Ok(false) // stream finished - } -} - -pub async fn send(send: &mut S, buf: &mut B) -> anyhow::Result { - Ok(std::future::poll_fn(|cx| send.poll_send(cx, buf)).await?) -} - - - -/// Optional trait to allow "splitting" a bidirectional stream into two sides. -pub trait BidiStream: SendStream + RecvStream { - /// The type for the send half. - type SendStream: SendStream + SendStreamUnframed; - /// The type for the receive half. - type RecvStream: RecvStream; - - /// Split this stream into two halves. - fn split(self) -> (Self::SendStream, Self::RecvStream); -} \ No newline at end of file