diff --git a/moq-warp/src/relay/contribute.rs b/moq-warp/src/relay/contribute.rs index d21b684..7f3660d 100644 --- a/moq-warp/src/relay/contribute.rs +++ b/moq-warp/src/relay/contribute.rs @@ -94,7 +94,7 @@ impl Session { let segment = segment::Info { sequence: object.sequence, send_order: object.send_order, - expires: Some(time::Instant::now() + time::Duration::from_secs(10)), + expires: Some(time::Instant::now() + time::Duration::from_secs(2)), // TODO increase this once send_order is implemented }; let segment = segment::Publisher::new(segment); @@ -162,7 +162,7 @@ impl Session { fn receive_subscribe_error(&mut self, msg: SubscribeError) -> anyhow::Result<()> { let error = track::Error { code: msg.code, - reason: format!("upstream error: {}", msg.reason), + reason: msg.reason, }; // Stop producing the track. @@ -284,13 +284,16 @@ impl Publishers { pub async fn incoming(&mut self) -> anyhow::Result { let (namespace, track) = self.receiver.recv().await.context("no more subscriptions")?; + let id = VarInt::try_from(self.next)?; + self.next += 1; + let msg = Subscribe { - track_id: VarInt::try_from(self.next)?, + track_id: id, track_namespace: namespace, - track_name: track.name, + track_name: track.name.clone(), }; - self.next += 1; + self.tracks.insert(id, Some(track)); Ok(msg) } diff --git a/moq-warp/src/relay/session.rs b/moq-warp/src/relay/session.rs index 90d1908..20de40f 100644 --- a/moq-warp/src/relay/session.rs +++ b/moq-warp/src/relay/session.rs @@ -30,11 +30,16 @@ impl Session { .find(|v| **v == Version::DRAFT_00) .context("failed to find supported version")?; - // TODO use the role to decide if we can publish or subscribe + // Choose our role based on the client's role. + let role = match session.setup().role { + Role::Publisher => Role::Subscriber, + Role::Subscriber => Role::Publisher, + Role::Both => Role::Both, + }; let setup = SetupServer { version: Version::DRAFT_00, - role: Role::Publisher, + role, }; let session = session.accept(setup).await?; diff --git a/moq-warp/src/source/file.rs b/moq-warp/src/source/file.rs index 9993eaf..0d99812 100644 --- a/moq-warp/src/source/file.rs +++ b/moq-warp/src/source/file.rs @@ -54,7 +54,7 @@ impl File { // Create the catalog track let (_catalog, subscriber) = Self::create_catalog(init); - source.insert("catalog".to_string(), subscriber); + source.insert("0".to_string(), subscriber); let mut tracks = HashMap::new(); @@ -83,7 +83,7 @@ impl File { fn create_catalog(raw: Vec) -> (track::Publisher, track::Subscriber) { // Create a track with a single segment containing the init data. - let mut catalog = track::Publisher::new("catalog"); + let mut catalog = track::Publisher::new("0"); // Subscribe to the catalog before we push the segment. let subscriber = catalog.subscribe(); @@ -213,7 +213,7 @@ impl Track { .unwrap(); // Delete segments after 10s. - let expires = Some(now + time::Duration::from_secs(10)); + let expires = Some(now + time::Duration::from_secs(2)); // TODO increase this once send order is implemented let sequence = self.sequence.try_into().context("sequence too large")?; self.sequence += 1;