From c360ea141664b6d09d1c39c4a7341f7f6915ac1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Michel?= Date: Wed, 12 Jul 2023 11:38:55 +0000 Subject: [PATCH] add generic moq transport trait --- Cargo.toml | 1 + moq-demo/Cargo.toml | 8 +- moq-demo/src/main.rs | 164 +++++++++++++++++++++++++++-- moq-transport-trait/Cargo.toml | 28 +++++ moq-transport-trait/src/control.rs | 124 ++++++++++++++++++++++ moq-transport-trait/src/lib.rs | 11 ++ moq-transport-trait/src/object.rs | 135 ++++++++++++++++++++++++ moq-transport-trait/src/server.rs | 94 +++++++++++++++++ moq-warp/Cargo.toml | 3 + moq-warp/src/relay/contribute.rs | 33 +++--- moq-warp/src/relay/control.rs | 11 +- moq-warp/src/relay/distribute.rs | 22 ++-- moq-warp/src/relay/server.rs | 40 +++---- moq-warp/src/relay/session.rs | 80 +++++++++----- 14 files changed, 668 insertions(+), 86 deletions(-) create mode 100644 moq-transport-trait/Cargo.toml create mode 100644 moq-transport-trait/src/control.rs create mode 100644 moq-transport-trait/src/lib.rs create mode 100644 moq-transport-trait/src/object.rs create mode 100644 moq-transport-trait/src/server.rs diff --git a/Cargo.toml b/Cargo.toml index bfe7c31..ff2de30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,4 +5,5 @@ members = [ "moq-demo", "moq-warp", "transport", + "moq-transport-trait", ] diff --git a/moq-demo/Cargo.toml b/moq-demo/Cargo.toml index de84b7e..023672b 100644 --- a/moq-demo/Cargo.toml +++ b/moq-demo/Cargo.toml @@ -27,7 +27,7 @@ rustls = "0.21.2" rustls-pemfile = "1.0.2" # Async stuff -tokio = { version = "1.27", features = ["full"] } +tokio = { version = "1.29.1", features = ["full"] } # Web server to serve the fingerprint warp = { version = "0.3.3", features = ["tls"] } @@ -38,3 +38,9 @@ clap = { version = "4.0", features = [ "derive" ] } log = { version = "0.4", features = ["std"] } env_logger = "0.9.3" anyhow = "1.0.70" + +moq-transport-trait = { path = "../moq-transport-trait" } +# moq-generic-transport = { path = "../transport" } +moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"} +webtransport_quiche = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" } +async_webtransport = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" } \ No newline at end of file diff --git a/moq-demo/src/main.rs b/moq-demo/src/main.rs index 2c1fdcc..a10da91 100644 --- a/moq-demo/src/main.rs +++ b/moq-demo/src/main.rs @@ -1,11 +1,15 @@ -use std::{fs, io, net, path, sync}; +use std::{fs, io, net, path, sync::{self, Arc}}; use anyhow::Context; +use async_webtransport_handler::{AsyncWebTransportServer, regex::Regex}; use clap::Parser; +use moq_transport::{Role, SetupServer, Version}; use ring::digest::{digest, SHA256}; +use tokio::task::JoinSet; use warp::Filter; -use moq_warp::{relay, source}; +use moq_warp::{relay::{self, ServerConfig}, source}; +use webtransport_quiche::quiche; /// Search for a pattern in a file and display the lines that contain it. #[derive(Parser, Clone)] @@ -25,6 +29,61 @@ struct Cli { /// Use the media file at this path #[arg(short, long, default_value = "media/fragmented.mp4")] media: path::PathBuf, + + /// use quiche instead of quinn + #[arg(short, long)] + quiche: bool, +} + + +// Create a new server +pub fn new_quiche(config: ServerConfig) -> anyhow::Result<(AsyncWebTransportServer, tokio::net::UdpSocket, Vec)> { + let mut quic_config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap(); + + println!("loading cert {:?}, key {:?}", config.cert, config.key); + quic_config.load_cert_chain_from_pem_file(config.cert.to_str().unwrap()).unwrap(); + quic_config.load_priv_key_from_pem_file(config.key.to_str().unwrap()).unwrap(); + quic_config + .set_application_protos(quiche::h3::APPLICATION_PROTOCOL) + .unwrap(); + + quic_config.set_cc_algorithm_name("cubic").unwrap(); + quic_config.set_max_idle_timeout(10000); + quic_config.set_max_recv_udp_payload_size(1200); + quic_config.set_max_send_udp_payload_size(1200); + quic_config.set_initial_max_data(1_000_000_000); + quic_config.set_initial_max_stream_data_bidi_local(100_000_000); + quic_config.set_initial_max_stream_data_bidi_remote(100_000_000); + quic_config.set_initial_max_stream_data_uni(100_000_000); + quic_config.set_initial_max_streams_bidi(1_000_000); + quic_config.set_initial_max_streams_uni(1_000_000); + quic_config.set_disable_active_migration(true); + quic_config.enable_early_data(); + quic_config.grease(false); + // quic_config.set_fec_scheduler_algorithm(quiche::FECSchedulerAlgorithm::BurstsOnly); + // quic_config.send_fec(args.get_bool("--send-fec")); + // quic_config.receive_fec(args.get_bool("--receive-fec")); + // quic_config.set_real_time(args.get_bool("--real-time-cc")); + let h3_config = quiche::h3::Config::new().unwrap(); + + let keylog = if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") { + let file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(keylog_path) + .unwrap(); + + Some(file) + } else { + None + }; + + let (server, socket) = AsyncWebTransportServer::with_configs(config.addr, + quic_config, h3_config, keylog)?; + let uri_root = "/"; + let regexes = [Regex::new(format!("{}", uri_root).as_str()).unwrap()]; + + Ok((server, socket, regexes.to_vec())) } #[tokio::main] @@ -35,6 +94,10 @@ async fn main() -> anyhow::Result<()> { // Create a web server to serve the fingerprint let serve = serve_http(args.clone()); + let mut tasks = JoinSet::new(); + tasks.spawn(async move { + serve.await.unwrap(); + }); // Create a fake media source from disk. let media = source::File::new(args.media).context("failed to open file source")?; @@ -44,22 +107,105 @@ async fn main() -> anyhow::Result<()> { .announce("quic.video/demo", media.source()) .context("failed to announce file source")?; + let mut tasks = JoinSet::new(); + tasks.spawn(async move { + media.run().await.unwrap(); + }); + // Create a server to actually serve the media let config = relay::ServerConfig { addr: args.addr, cert: args.cert, key: args.key, - broker, + broker: broker.clone(), }; - let server = relay::Server::new(config).context("failed to create server")?; + if args.quiche { + let (server, socket, regexes) = new_quiche(config).unwrap(); + let server = Arc::new(std::sync::Mutex::new(server)); + let socket = Arc::new(socket); + let mut buf = vec![0; 10000]; + let mut tasks = JoinSet::new(); + 'mainloop: loop { + println!("listen..."); + let cid = { + // let mut server = endpoint.quiche_server.lock().await; + let ret = async_webtransport_handler::AsyncWebTransportServer::listen_ref(server.clone(), socket.clone(), &mut buf).await?; + println!("listen returned {:?}", ret); + match ret { + Some(cid) => cid, + None => continue 'mainloop, + } + }; + + loop { + println!("poll"); + match server.lock().unwrap().poll(&cid, ®exes[..]) { + Ok(async_webtransport_handler::Event::NewSession(path, session_id, _regex_index)) => { + + let server = server.clone(); + let cid = cid.clone(); + let broker = broker.clone(); + tasks.spawn(async move { + // let control_stream = async_webtransport_handler::ServerBidiStream::new(server.clone(), cid.clone(), session_id, session_id); + let mut webtransport_session = async_webtransport_handler::WebTransportSession::new(server.clone(), cid.clone(), session_id); + let control_stream = moq_generic_transport::accept_bidi(&mut webtransport_session).await.unwrap().unwrap(); + // let control_stream = async_webtransport_handler::ServerBidiStream::new(server.clone(), cid.clone(), session_id, control_stream_id); + // let session = moq_transport_trait::Session::new(Box::new(control_stream), Box::new(webtransport_session)); + let received_client_setup = moq_transport_trait::Session::accept(Box::new(control_stream), Box::new(webtransport_session)).await.unwrap(); + // TODO: maybe reject setup + let role = match received_client_setup.setup().role { + Role::Publisher => Role::Subscriber, + Role::Subscriber => Role::Publisher, + Role::Both => Role::Both, + }; + let setup_server = SetupServer { + version: Version::DRAFT_00, + role, + }; + + let session = received_client_setup.accept(setup_server).await.unwrap(); + let session = relay::Session::from_session(session, broker.clone()).await.unwrap(); + session.run().await + }); + }, + Ok(async_webtransport_handler::Event::StreamData(session_id, stream_id)) => { + log::trace!("new data!"); + }, + Ok(async_webtransport_handler::Event::Done) => { + println!("H3 Done"); + break; + }, + Ok(async_webtransport_handler::Event::GoAway) => { + println!("GOAWAY"); + break; + }, + + Err(_) => todo!(), + } + } + } + + // let session = moq_transport_trait::Session::new(control_stream, connection) + // let server = relay::Server::new(config).context("failed to create server")?; + // // Run all of the above + // tokio::select! { + // res = server.run() => res.context("failed to run server"), + // res = media.run() => res.context("failed to run media source"), + // res = serve => res.context("failed to run HTTP server"), + // } + } else { + // let server = relay::Server::new(config).context("failed to create server")?; + + // Run all of the above + // tokio::select! { + // res = server.run() => res.context("failed to run server"), + // res = media.run() => res.context("failed to run media source"), + // res = serve => res.context("failed to run HTTP server"), + // } - // Run all of the above - tokio::select! { - res = server.run() => res.context("failed to run server"), - res = media.run() => res.context("failed to run media source"), - res = serve => res.context("failed to run HTTP server"), } + Ok(()) } // Run a HTTP server using Warp diff --git a/moq-transport-trait/Cargo.toml b/moq-transport-trait/Cargo.toml new file mode 100644 index 0000000..2f494a0 --- /dev/null +++ b/moq-transport-trait/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "moq-transport-trait" +description = "Media over QUIC" +authors = [ "Luke Curley" ] +repository = "https://github.com/kixelated/moq-rs" +license = "MIT OR Apache-2.0" + +version = "0.1.0" +edition = "2021" + +keywords = [ "quic", "http3", "webtransport", "media", "live" ] +categories = [ "multimedia", "network-programming", "web-programming" ] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +moq-transport = { path = "../moq-transport" } + +# WebTransport support: TODO pin a version when released +moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"} +http = "0.2" + +tokio = { version = "1.27", features = ["macros", "sync"] } +bytes = "1" + +log = "0.4" +anyhow = "1.0.70" +thiserror = "1.0.21" diff --git a/moq-transport-trait/src/control.rs b/moq-transport-trait/src/control.rs new file mode 100644 index 0000000..f2d512f --- /dev/null +++ b/moq-transport-trait/src/control.rs @@ -0,0 +1,124 @@ +use moq_generic_transport::{SendStream, RecvStream, BidiStream, SendStreamUnframed}; +use moq_transport::{Decode, DecodeError, Encode, Message}; + +use bytes::{Buf, BytesMut}; + +use std::io::Cursor; +use std::marker::PhantomData; +use std::sync::Arc; +use tokio::sync::Mutex; + + +pub struct Control> { + sender: ControlSend, + recver: ControlRecv, +} + +impl> Control { + pub(crate) fn new(stream: Box) -> Self { + let (sender, recver) = stream.split(); + let sender = ControlSend::new(Box::new(sender)); + let recver = ControlRecv::new(Box::new(recver)); + + Self { sender, recver } + } + + pub fn split(self) -> (ControlSend, ControlRecv) { + (self.sender, self.recver) + } + + pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { + self.sender.send(msg).await + } + + pub async fn recv(&mut self) -> anyhow::Result { + self.recver.recv().await + } +} + +pub struct ControlSend { + stream: Box, + buf: BytesMut, // reuse a buffer to encode messages. +} + +impl ControlSend { + pub fn new(inner: Box) -> Self { + Self { + buf: BytesMut::new(), + stream: inner, + } + } + + pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { + let msg = msg.into(); + log::info!("sending message: {:?}", msg); + + self.buf.clear(); + msg.encode(&mut self.buf)?; + + // TODO make this work with select! + moq_generic_transport::send(self.stream.as_mut(), &mut self.buf).await?; + + Ok(()) + } + + // Helper that lets multiple threads send control messages. + pub fn share(self) -> ControlShared { + ControlShared { + stream: Arc::new(Mutex::new(self)), + _marker: PhantomData, + } + } +} + +// Helper that allows multiple threads to send control messages. +// There's no equivalent for receiving since only one thread should be receiving at a time. +#[derive(Clone)] +pub struct ControlShared { + stream: Arc>>, + _marker: PhantomData +} + +impl ControlShared { + pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { + let mut stream = self.stream.lock().await; + stream.send(msg).await + } +} + +pub struct ControlRecv { + stream: Box, + buf: BytesMut, // data we've read but haven't fully decoded yet +} + +impl ControlRecv { + pub fn new(inner: Box) -> Self { + Self { + buf: BytesMut::new(), + stream: inner, + } + } + + // Read the next full message from the stream. + pub async fn recv(&mut self) -> anyhow::Result { + loop { + // Read the contents of the buffer + let mut peek = Cursor::new(&self.buf); + + match Message::decode(&mut peek) { + Ok(msg) => { + // We've successfully decoded a message, so we can advance the buffer. + self.buf.advance(peek.position() as usize); + + log::info!("received message: {:?}", msg); + return Ok(msg); + } + Err(DecodeError::UnexpectedEnd) => { + // The decode failed, so we need to append more data. + moq_generic_transport::recv(self.stream.as_mut(), &mut self.buf).await?; + } + Err(e) => return Err(e.into()), + } + } + } +} diff --git a/moq-transport-trait/src/lib.rs b/moq-transport-trait/src/lib.rs new file mode 100644 index 0000000..e08eca1 --- /dev/null +++ b/moq-transport-trait/src/lib.rs @@ -0,0 +1,11 @@ +mod control; +mod object; +mod server; + +use std::sync::Arc; +use std::sync::Mutex; +pub type SharedConnection = Arc>>; + +pub use control::*; +pub use object::*; +pub use server::*; diff --git a/moq-transport-trait/src/object.rs b/moq-transport-trait/src/object.rs new file mode 100644 index 0000000..bebca90 --- /dev/null +++ b/moq-transport-trait/src/object.rs @@ -0,0 +1,135 @@ +use anyhow::Context; +use bytes::{Buf, BytesMut}; +use moq_generic_transport::{Connection, SendStream, SendStreamUnframed, RecvStream}; +use moq_transport::{Decode, DecodeError, Encode, Object}; +use std::{io::Cursor, marker::PhantomData}; + +use crate::SharedConnection; + +// TODO support clients + +// We could replace this generic soup by just if we forced Connection's SendStream +// to provide SendStreamUnframes's send() method. Without that, we have to make Connection's +// SendStream type more specific and force it to implement SendStreamUnframes as well. +pub struct Objects { + send: SendObjects, + recv: RecvObjects, +} + +impl + Send> Objects { + pub fn new(session: SharedConnection) -> Self { + let send = SendObjects::new(session.clone()); + let recv = RecvObjects::new(session); + Self { send, recv } + } + + pub fn split(self) -> (SendObjects, RecvObjects) { + (self.send, self.recv) + } + + pub async fn recv(&mut self) -> anyhow::Result<(Object, R)> { + self.recv.recv().await + } + + pub async fn send(&mut self, header: Object) -> anyhow::Result { + self.send.send(header).await + } +} + +pub struct SendObjects { + session: SharedConnection, + + // A reusable buffer for encoding messages. + buf: BytesMut, + _marker: PhantomData, +} + +impl> SendObjects { + pub fn new(session: SharedConnection) -> Self { + Self { + session, + buf: BytesMut::new(), + _marker: PhantomData, + } + } + + pub async fn send(&mut self, header: Object) -> anyhow::Result { + self.buf.clear(); + header.encode(&mut self.buf).unwrap(); + + // TODO support select! without making a new stream. + let mut stream = moq_generic_transport::open_send_shared(self.session.clone()) + .await + .context("failed to open uni stream")?; + + moq_generic_transport::send(&mut stream, &mut self.buf).await?; + + Ok(stream) + } +} + +impl> Clone for SendObjects { + fn clone(&self) -> Self { + Self { + session: self.session.clone(), + buf: BytesMut::new(), + _marker: PhantomData, + } + } +} + +// Not clone, so we don't accidentally have two listners. +pub struct RecvObjects { + session: SharedConnection, + + // A uni stream that's been accepted but not fully read from yet. + stream: Option>, + + // Data that we've read but haven't formed a full message yet. + buf: BytesMut, +} + +impl> RecvObjects { + pub fn new(session: SharedConnection) -> Self { + Self { + session, + stream: None, + buf: BytesMut::new(), + } + } + + pub async fn recv(&mut self) -> anyhow::Result<(Object, R)> { + // Make sure any state is saved across await boundaries so this works with select! + + let stream = match self.stream.as_mut() { + Some(stream) => stream, + None => { + let stream = moq_generic_transport::accept_recv_shared(self.session.clone()) + .await + .context("failed to accept uni stream")? + .context("no uni stream")?; + + self.stream.insert(Box::new(stream)) + } + }; + + loop { + // Read the contents of the buffer + let mut peek = Cursor::new(&self.buf); + + match Object::decode(&mut peek) { + Ok(header) => { + let stream = self.stream.take().unwrap(); + self.buf.advance(peek.position() as usize); + + return Ok((header, *stream)); + } + Err(DecodeError::UnexpectedEnd) => { + // The decode failed, so we need to append more data. + moq_generic_transport::recv(stream.as_mut(), &mut self.buf).await?; + } + Err(e) => return Err(e.into()), + } + } + } +} diff --git a/moq-transport-trait/src/server.rs b/moq-transport-trait/src/server.rs new file mode 100644 index 0000000..91273dd --- /dev/null +++ b/moq-transport-trait/src/server.rs @@ -0,0 +1,94 @@ + +use anyhow::Context; +use moq_generic_transport::{Connection, BidiStream, SendStream, SendStreamUnframed, RecvStream}; +use moq_transport::{Message, SetupClient, SetupServer}; + +use crate::SharedConnection; + +use super::{Control, Objects}; +// pub struct Server { +// // The Webtransport/QUIC server, with an already established session/connection. +// endpoint: Box, +// } + +// impl Server { +// pub fn new(endpoint: Box) -> Self { +// let handshake = JoinSet::new(); +// Self { endpoint } +// } + +// // Accept the next WebTransport session. +// pub async fn accept(&mut self) -> anyhow::Result { +// loop { +// tokio::select!( +// // Accept the connection and start the WebTransport handshake. +// conn = self.endpoint.accept() => { +// let conn = conn.context("failed to accept connection")?; +// self.handshake.spawn(async move { +// Connecting::new(conn).accept().await +// }); +// }, +// // Return any mostly finished WebTransport handshakes. +// res = self.handshake.join_next(), if !self.handshake.is_empty() => { +// let res = res.expect("no tasks").expect("task aborted"); +// match res { +// Ok(session) => return Ok(session), +// Err(err) => log::warn!("failed to accept session: {:?}", err), +// } +// }, +// ) +// } +// } +// } + + +pub struct Session, C: Connection + Send> { + pub control: Control, + pub objects: Objects, +} + +impl, R: RecvStream + Send + 'static, C: Connection + Send> Session { + + pub async fn accept(control_stream: Box, connection: Box) -> anyhow::Result> { + let mut control = Control::new(control_stream); + let objects = Objects::new(std::sync::Arc::new(std::sync::Mutex::new(connection))); + + let setup_client = match control.recv().await.context("failed to read SETUP")? { + Message::SetupClient(setup) => setup, + _ => anyhow::bail!("expected CLIENT SETUP"), + }; + Ok(AcceptSetup { setup_client, control, objects }) + + } + pub fn split(self) -> (Control, Objects) { + (self.control, self.objects) + } +} + + +pub struct AcceptSetup, C: Connection + Send> { + setup_client: SetupClient, + control: Control, + objects: Objects, +} + +impl, C: Connection + Send> AcceptSetup { + // Return the setup message we received. + pub fn setup(&self) -> &SetupClient { + &self.setup_client + } + + // Accept the session with our own setup message. + pub async fn accept(mut self, setup_server: SetupServer) -> anyhow::Result> { + self.control.send(setup_server).await?; + Ok(Session { + control: self.control, + objects: self.objects, + }) + } + + pub async fn reject(self) -> anyhow::Result<()> { + // TODO Close the QUIC connection with an error code. + Ok(()) + } +} \ No newline at end of file diff --git a/moq-warp/Cargo.toml b/moq-warp/Cargo.toml index a87babe..3ee9355 100644 --- a/moq-warp/Cargo.toml +++ b/moq-warp/Cargo.toml @@ -17,7 +17,10 @@ categories = [ "multimedia", "network-programming", "web-programming" ] [dependencies] moq-transport = { path = "../moq-transport" } moq-transport-quinn = { path = "../moq-transport-quinn" } +moq-transport-trait = { path = "../moq-transport-trait" } +moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"} +bytes = "1" tokio = "1.27" mp4 = "0.13.0" anyhow = "1.0.70" diff --git a/moq-warp/src/relay/contribute.rs b/moq-warp/src/relay/contribute.rs index 7f3660d..dca150a 100644 --- a/moq-warp/src/relay/contribute.rs +++ b/moq-warp/src/relay/contribute.rs @@ -2,12 +2,15 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time; +use bytes::Buf; +use moq_generic_transport::{Connection, RecvStream, SendStream, SendStreamUnframed, BidiStream}; use tokio::io::AsyncReadExt; use tokio::sync::mpsc; use tokio::task::JoinSet; // lock across await boundaries use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; -use moq_transport_quinn::{RecvObjects, RecvStream}; +use moq_transport_trait::{RecvObjects}; + use anyhow::Context; @@ -16,9 +19,9 @@ use crate::model::{broadcast, segment, track}; use crate::source::Source; // TODO experiment with making this Clone, so every task can have its own copy. -pub struct Session { +pub struct Session, C: Connection + Send> { // Used to receive objects. - objects: RecvObjects, + objects: RecvObjects, // Used to send and receive control messages. control: control::Component, @@ -36,9 +39,9 @@ pub struct Session { run_segments: JoinSet>, // receiving objects } -impl Session { +impl + Send + 'static, B: BidiStream, C: Connection + Send> Session { pub fn new( - objects: RecvObjects, + objects: RecvObjects, control: control::Component, broker: broker::Broadcasts, ) -> Self { @@ -63,7 +66,7 @@ impl Session { }, object = self.objects.recv() => { let (object, stream) = object.context("failed to receive object")?; - let res = self.receive_object(object, stream).await; + let res = self.receive_object(object, Box::new(stream)).await; if let Err(err) = res { log::error!("failed to receive object: {:?}", err); } @@ -88,7 +91,7 @@ impl Session { } } - async fn receive_object(&mut self, object: Object, stream: RecvStream) -> anyhow::Result<()> { + async fn receive_object + Send + 'static>(&mut self, object: Object, stream: Box) -> anyhow::Result<()> { let track = object.track; let segment = segment::Info { @@ -111,16 +114,18 @@ impl Session { Ok(()) } - async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> { - let mut buf = [0u8; 32 * 1024]; + async fn run_segment + Send + 'static>(mut segment: segment::Publisher, mut stream: Box) -> anyhow::Result<()> { + // let mut buf = [0u8; 32 * 1024]; loop { - let size = stream.read(&mut buf).await.context("failed to read from stream")?; - if size == 0 { + let mut b = bytes::BytesMut::new(); + let stream_finished = !moq_generic_transport::recv(stream.as_mut(), &mut b).await?; + // let size = stream.read(&mut buf).await.context("failed to read from stream")?; + if stream_finished { return Ok(()); } - let chunk = buf[..size].to_vec(); - segment.fragments.push(chunk.into()) + // let chunk = buf[..size].to_vec(); + segment.fragments.push(b.chunk().to_vec().into()) } } @@ -174,7 +179,7 @@ impl Session { } } -impl Drop for Session { +impl, R: RecvStream + Send, C: Connection + Send> Drop for Session { fn drop(&mut self) { // Unannounce all broadcasts we have announced. // TODO make this automatic so we can't screw up? diff --git a/moq-warp/src/relay/control.rs b/moq-warp/src/relay/control.rs index 2151594..0be96da 100644 --- a/moq-warp/src/relay/control.rs +++ b/moq-warp/src/relay/control.rs @@ -1,17 +1,18 @@ +use moq_generic_transport::{SendStream, SendStreamUnframed, BidiStream}; use tokio::sync::mpsc; use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk}; -use moq_transport_quinn::Control; +use moq_transport_trait::Control; -pub struct Main { - control: Control, +pub struct Main> { + control: Control, outgoing: mpsc::Receiver, contribute: mpsc::Sender, distribute: mpsc::Sender, } -impl Main { +impl> Main { pub async fn run(mut self) -> anyhow::Result<()> { loop { tokio::select! { @@ -51,7 +52,7 @@ impl Component { } // Splits a control stream into two components, based on if it's a message for contribution or distribution. -pub fn split(control: Control) -> (Main, Component, Component) { +pub fn split>(control: Control) -> (Main, Component, Component) { let (outgoing_tx, outgoing_rx) = mpsc::channel(1); let (contribute_tx, contribute_rx) = mpsc::channel(1); let (distribute_tx, distribute_rx) = mpsc::channel(1); diff --git a/moq-warp/src/relay/distribute.rs b/moq-warp/src/relay/distribute.rs index 0db689d..734c10b 100644 --- a/moq-warp/src/relay/distribute.rs +++ b/moq-warp/src/relay/distribute.rs @@ -1,17 +1,19 @@ use anyhow::Context; +use bytes::Buf; +use moq_generic_transport::{SendStream, SendStreamUnframed, BidiStream, Connection}; use tokio::io::AsyncWriteExt; use tokio::task::JoinSet; // allows locking across await use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; -use moq_transport_quinn::SendObjects; +use moq_transport_trait::SendObjects; use super::{broker, control}; use crate::model::{segment, track}; -pub struct Session { +pub struct Session, C: Connection + Send> { // Objects are sent to the client - objects: SendObjects, + objects: SendObjects, // Used to send and receive control messages. control: control::Component, @@ -23,9 +25,9 @@ pub struct Session { run_subscribes: JoinSet, // run subscriptions, sending the returned error if they fail } -impl Session { +impl, C: Connection + Send + 'static> Session { pub fn new( - objects: SendObjects, + objects: SendObjects, control: control::Component, broker: broker::Broadcasts, ) -> Self { @@ -119,7 +121,7 @@ impl Session { Ok(()) } - async fn run_subscribe(objects: SendObjects, track_id: VarInt, mut track: track::Subscriber) -> SubscribeError { + async fn run_subscribe(objects: SendObjects, track_id: VarInt, mut track: track::Subscriber) -> SubscribeError { let mut tasks = JoinSet::new(); let mut result = None; @@ -154,7 +156,7 @@ impl Session { } async fn serve_group( - mut objects: SendObjects, + mut objects: SendObjects, track_id: VarInt, mut segment: segment::Subscriber, ) -> anyhow::Result<()> { @@ -169,7 +171,11 @@ impl Session { // Write each fragment as they are available. while let Some(fragment) = segment.fragments.next().await { - stream.write_all(fragment.as_slice()).await?; + let mut buf = bytes::Bytes::copy_from_slice(fragment.as_slice()); + while buf.has_remaining() { + moq_generic_transport::send(&mut stream, &mut buf).await?; + } + // stream.write_all(fragment.as_slice()).await?; } // NOTE: stream is automatically closed when dropped diff --git a/moq-warp/src/relay/server.rs b/moq-warp/src/relay/server.rs index dfbbcfd..c22a6a6 100644 --- a/moq-warp/src/relay/server.rs +++ b/moq-warp/src/relay/server.rs @@ -80,26 +80,26 @@ impl Server { Ok(Self { server, broker, tasks }) } - pub async fn run(mut self) -> anyhow::Result<()> { - loop { - tokio::select! { - res = self.server.accept() => { - let session = res.context("failed to accept connection")?; - let broker = self.broker.clone(); + // pub async fn run(mut self) -> anyhow::Result<()> { + // loop { + // tokio::select! { + // res = self.server.accept() => { + // let session = res.context("failed to accept connection")?; + // let broker = self.broker.clone(); - self.tasks.spawn(async move { - let session: Session = Session::accept(session, broker).await?; - session.run().await - }); - }, - res = self.tasks.join_next(), if !self.tasks.is_empty() => { - let res = res.expect("no tasks").expect("task aborted"); + // self.tasks.spawn(async move { + // let session: Session = Session::accept(session, broker).await?; + // session.run().await + // }); + // }, + // res = self.tasks.join_next(), if !self.tasks.is_empty() => { + // let res = res.expect("no tasks").expect("task aborted"); - if let Err(err) = res { - log::error!("session terminated: {:?}", err); - } - }, - } - } - } + // if let Err(err) = res { + // log::error!("session terminated: {:?}", err); + // } + // }, + // } + // } + // } } diff --git a/moq-warp/src/relay/session.rs b/moq-warp/src/relay/session.rs index 20de40f..e9489f2 100644 --- a/moq-warp/src/relay/session.rs +++ b/moq-warp/src/relay/session.rs @@ -1,49 +1,71 @@ use anyhow::Context; +use moq_generic_transport::{SendStream, SendStreamUnframed, BidiStream, Connection, RecvStream}; use super::{broker, contribute, control, distribute}; use moq_transport::{Role, SetupServer, Version}; use moq_transport_quinn::Connect; -pub struct Session { +pub struct Session, C: Connection + Send> { // Split logic into contribution/distribution to reduce the problem space. - contribute: contribute::Session, - distribute: distribute::Session, + contribute: contribute::Session, + distribute: distribute::Session, // Used to receive control messages and forward to contribute/distribute. - control: control::Main, + control: control::Main, } -impl Session { - pub async fn accept(session: Connect, broker: broker::Broadcasts) -> anyhow::Result { - // Accep the WebTransport session. - // OPTIONAL validate the conn.uri() otherwise call conn.reject() - let session = session - .accept() - .await - .context(": server::Setupfailed to accept WebTransport session")?; +impl, R: RecvStream + Send + 'static, C: Connection + Send + 'static> Session { + // pub async fn accept(session: Connect, broker: broker::Broadcasts) -> anyhow::Result> { + // // Accep the WebTransport session. + // // OPTIONAL validate the conn.uri() otherwise call conn.reject() + // let session = session + // .accept() + // .await + // .context(": server::Setupfailed to accept WebTransport session")?; - session - .setup() - .versions - .iter() - .find(|v| **v == Version::DRAFT_00) - .context("failed to find supported version")?; + // session + // .setup() + // .versions + // .iter() + // .find(|v| **v == Version::DRAFT_00) + // .context("failed to find supported version")?; - // Choose our role based on the client's role. - let role = match session.setup().role { - Role::Publisher => Role::Subscriber, - Role::Subscriber => Role::Publisher, - Role::Both => Role::Both, - }; + // // Choose our role based on the client's role. + // let role = match session.setup().role { + // Role::Publisher => Role::Subscriber, + // Role::Subscriber => Role::Publisher, + // Role::Both => Role::Both, + // }; - let setup = SetupServer { - version: Version::DRAFT_00, - role, - }; + // let setup = SetupServer { + // version: Version::DRAFT_00, + // role, + // }; - let session = session.accept(setup).await?; + // let session = session.accept(setup).await?; + // let (control, objects) = session.split(); + // let (objects_send, objects_recv) = objects.split(); + + // let (control, contribute, distribute) = control::split(control); + + // let contribute = contribute::Session::new(objects_recv, contribute, broker.clone()); + // let distribute = distribute::Session::new(objects_send, distribute, broker); + + // let session = Self { + // control, + // contribute, + // distribute, + // }; + + // Ok(session) + // } + + pub async fn from_session( + session: moq_transport_trait::Session, + broker: broker::Broadcasts, + ) -> anyhow::Result> { let (control, objects) = session.split(); let (objects_send, objects_recv) = objects.split();