diff --git a/moq-transport-trait/src/control.rs b/moq-transport-trait/src/control.rs index f2d512f..9dca713 100644 --- a/moq-transport-trait/src/control.rs +++ b/moq-transport-trait/src/control.rs @@ -1,4 +1,4 @@ -use moq_generic_transport::{SendStream, RecvStream, BidiStream, SendStreamUnframed}; +use moq_generic_transport::{SendStream, RecvStream, BidiStream, SendStreamUnframed, Connection}; use moq_transport::{Decode, DecodeError, Encode, Message}; use bytes::{Buf, BytesMut}; @@ -9,12 +9,12 @@ use std::sync::Arc; use tokio::sync::Mutex; -pub struct Control> { - sender: ControlSend, +pub struct Control { + sender: ControlSend, recver: ControlRecv, } -impl> Control { +impl Control{ pub(crate) fn new(stream: Box) -> Self { let (sender, recver) = stream.split(); let sender = ControlSend::new(Box::new(sender)); @@ -23,7 +23,7 @@ impl> Control< Self { sender, recver } } - pub fn split(self) -> (ControlSend, ControlRecv) { + pub fn split(self) -> (ControlSend, ControlRecv) { (self.sender, self.recver) } @@ -36,13 +36,13 @@ impl> Control< } } -pub struct ControlSend { - stream: Box, +pub struct ControlSend { + stream: Box, buf: BytesMut, // reuse a buffer to encode messages. } -impl ControlSend { - pub fn new(inner: Box) -> Self { +impl ControlSend { + pub fn new(inner: Box) -> Self { Self { buf: BytesMut::new(), stream: inner, @@ -63,10 +63,9 @@ impl ControlSend { } // Helper that lets multiple threads send control messages. - pub fn share(self) -> ControlShared { + pub fn share(self) -> ControlShared { ControlShared { stream: Arc::new(Mutex::new(self)), - _marker: PhantomData, } } } @@ -74,12 +73,11 @@ impl ControlSend { // Helper that allows multiple threads to send control messages. // There's no equivalent for receiving since only one thread should be receiving at a time. #[derive(Clone)] -pub struct ControlShared { - stream: Arc>>, - _marker: PhantomData +pub struct ControlShared { + stream: Arc>>, } -impl ControlShared { +impl ControlShared { pub async fn send>(&mut self, msg: T) -> anyhow::Result<()> { let mut stream = self.stream.lock().await; stream.send(msg).await diff --git a/moq-transport-trait/src/server.rs b/moq-transport-trait/src/server.rs index 91273dd..d58c444 100644 --- a/moq-transport-trait/src/server.rs +++ b/moq-transport-trait/src/server.rs @@ -42,14 +42,14 @@ use super::{Control, Objects}; // } -pub struct Session, C: Connection + Send> { - pub control: Control, +pub struct Session { + pub control: Control, pub objects: Objects, } -impl, R: RecvStream + Send + 'static, C: Connection + Send> Session { +impl + Send> Session { - pub async fn accept(control_stream: Box, connection: Box) -> anyhow::Result> { + pub async fn accept(control_stream: Box, connection: Box) -> anyhow::Result> { let mut control = Control::new(control_stream); let objects = Objects::new(std::sync::Arc::new(std::sync::Mutex::new(connection))); @@ -60,26 +60,26 @@ impl (Control, Objects) { + pub fn split(self) -> (Control, Objects) { (self.control, self.objects) } } -pub struct AcceptSetup, C: Connection + Send> { +pub struct AcceptSetup { setup_client: SetupClient, - control: Control, + control: Control, objects: Objects, } -impl, C: Connection + Send> AcceptSetup { +impl AcceptSetup { // Return the setup message we received. pub fn setup(&self) -> &SetupClient { &self.setup_client } // Accept the session with our own setup message. - pub async fn accept(mut self, setup_server: SetupServer) -> anyhow::Result> { + pub async fn accept(mut self, setup_server: SetupServer) -> anyhow::Result> { self.control.send(setup_server).await?; Ok(Session { control: self.control, diff --git a/moq-warp/src/relay/contribute.rs b/moq-warp/src/relay/contribute.rs index dca150a..e940f0d 100644 --- a/moq-warp/src/relay/contribute.rs +++ b/moq-warp/src/relay/contribute.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::marker::PhantomData; use std::sync::{Arc, Mutex}; use std::time; @@ -19,7 +20,7 @@ use crate::model::{broadcast, segment, track}; use crate::source::Source; // TODO experiment with making this Clone, so every task can have its own copy. -pub struct Session, C: Connection + Send> { +pub struct Session { // Used to receive objects. objects: RecvObjects, @@ -37,9 +38,10 @@ pub struct Session>, // receiving objects + _marker: PhantomData, } -impl + Send + 'static, B: BidiStream, C: Connection + Send> Session { +impl + Send> Session { pub fn new( objects: RecvObjects, control: control::Component, @@ -52,6 +54,7 @@ impl broadcasts: HashMap::new(), publishers: Publishers::new(), run_segments: JoinSet::new(), + _marker: PhantomData, } } @@ -179,7 +182,7 @@ impl } } -impl, R: RecvStream + Send, C: Connection + Send> Drop for Session { +impl Drop for Session { fn drop(&mut self) { // Unannounce all broadcasts we have announced. // TODO make this automatic so we can't screw up? diff --git a/moq-warp/src/relay/control.rs b/moq-warp/src/relay/control.rs index 0be96da..0da62d6 100644 --- a/moq-warp/src/relay/control.rs +++ b/moq-warp/src/relay/control.rs @@ -4,15 +4,15 @@ use tokio::sync::mpsc; use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk}; use moq_transport_trait::Control; -pub struct Main> { - control: Control, +pub struct Main { + control: Control, outgoing: mpsc::Receiver, contribute: mpsc::Sender, distribute: mpsc::Sender, } -impl> Main { +impl Main { pub async fn run(mut self) -> anyhow::Result<()> { loop { tokio::select! { @@ -52,7 +52,7 @@ impl Component { } // Splits a control stream into two components, based on if it's a message for contribution or distribution. -pub fn split>(control: Control) -> (Main, Component, Component) { +pub fn split>(control: Control) -> (Main, Component, Component) { let (outgoing_tx, outgoing_rx) = mpsc::channel(1); let (contribute_tx, contribute_rx) = mpsc::channel(1); let (distribute_tx, distribute_rx) = mpsc::channel(1); diff --git a/moq-warp/src/relay/distribute.rs b/moq-warp/src/relay/distribute.rs index 734c10b..e5f6145 100644 --- a/moq-warp/src/relay/distribute.rs +++ b/moq-warp/src/relay/distribute.rs @@ -1,3 +1,5 @@ +use std::marker::PhantomData; + use anyhow::Context; use bytes::Buf; @@ -11,7 +13,7 @@ use moq_transport_trait::SendObjects; use super::{broker, control}; use crate::model::{segment, track}; -pub struct Session, C: Connection + Send> { +pub struct Session { // Objects are sent to the client objects: SendObjects, @@ -23,9 +25,13 @@ pub struct Session, // run subscriptions, sending the returned error if they fail + + _marker: PhantomData, } -impl, C: Connection + Send + 'static> Session { +impl Session where + S: SendStream + SendStreamUnframed + Send, + C: Connection + Send + 'static { pub fn new( objects: SendObjects, control: control::Component, @@ -36,6 +42,7 @@ impl, C control, broker, run_subscribes: JoinSet::new(), + _marker: PhantomData, } } diff --git a/moq-warp/src/relay/session.rs b/moq-warp/src/relay/session.rs index e9489f2..f265871 100644 --- a/moq-warp/src/relay/session.rs +++ b/moq-warp/src/relay/session.rs @@ -1,3 +1,5 @@ +use std::marker::PhantomData; + use anyhow::Context; use moq_generic_transport::{SendStream, SendStreamUnframed, BidiStream, Connection, RecvStream}; @@ -6,16 +8,23 @@ use super::{broker, contribute, control, distribute}; use moq_transport::{Role, SetupServer, Version}; use moq_transport_quinn::Connect; -pub struct Session, C: Connection + Send> { +pub struct Session { // Split logic into contribution/distribution to reduce the problem space. - contribute: contribute::Session, - distribute: distribute::Session, + contribute: contribute::Session, + distribute: distribute::Session, // Used to receive control messages and forward to contribute/distribute. - control: control::Main, + control: control::Main, + _marker: PhantomData, + _marker_r: PhantomData, } -impl, R: RecvStream + Send + 'static, C: Connection + Send + 'static> Session { +// impl + Send + 'static> Session { +impl Session where + R: RecvStream + Send + 'static, + S: SendStream + SendStreamUnframed + Send, + C: Connection + Send + 'static +{ // pub async fn accept(session: Connect, broker: broker::Broadcasts) -> anyhow::Result> { // // Accep the WebTransport session. // // OPTIONAL validate the conn.uri() otherwise call conn.reject() @@ -63,9 +72,9 @@ impl, + session: moq_transport_trait::Session, broker: broker::Broadcasts, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let (control, objects) = session.split(); let (objects_send, objects_recv) = objects.split(); @@ -78,6 +87,8 @@ impl