diff --git a/moq-relay/src/server.rs b/moq-relay/src/server.rs index f7b69a5..cf4fffb 100644 --- a/moq-relay/src/server.rs +++ b/moq-relay/src/server.rs @@ -89,6 +89,8 @@ impl Server { } pub async fn run(mut self) -> anyhow::Result<()> { + log::info!("listening on {}", self.server.local_addr()?); + loop { tokio::select! { res = self.server.accept() => { diff --git a/moq-relay/src/session.rs b/moq-relay/src/session.rs index dcc367d..0a8c28d 100644 --- a/moq-relay/src/session.rs +++ b/moq-relay/src/session.rs @@ -18,9 +18,18 @@ impl Session { } pub async fn run(&mut self, conn: quinn::Connecting) -> anyhow::Result<()> { + log::debug!("received QUIC handshake: ip={:?}", conn.remote_address()); + // Wait for the QUIC connection to be established. let conn = conn.await.context("failed to establish QUIC connection")?; + log::debug!( + "established QUIC connection: ip={:?} id={}", + conn.remote_address(), + conn.stable_id() + ); + let id = conn.stable_id(); + // Wait for the CONNECT request. let request = webtransport_quinn::accept(conn) .await @@ -28,30 +37,38 @@ impl Session { let path = request.uri().path().to_string(); + log::debug!("received WebTransport CONNECT: id={} path={}", id, path); + // Accept the CONNECT request. let session = request .ok() .await .context("failed to respond to WebTransport request")?; + log::debug!("accepted WebTransport CONNECT: id={} path={}", id, path); + // Perform the MoQ handshake. let request = moq_transport::session::Server::accept(session) .await .context("failed to accept handshake")?; + log::debug!("received MoQ SETUP: id={} role={:?}", id, request.role()); + let role = request.role(); match role { - Role::Publisher => self.serve_publisher(request, &path).await, - Role::Subscriber => self.serve_subscriber(request, &path).await, + Role::Publisher => self.serve_publisher(id, request, &path).await, + Role::Subscriber => self.serve_subscriber(id, request, &path).await, Role::Both => request.reject(300), }; + log::debug!("closing connection: id={}", id); + Ok(()) } - async fn serve_publisher(&mut self, request: Request, path: &str) { - log::info!("publisher: path={}", path); + async fn serve_publisher(&mut self, id: usize, request: Request, path: &str) { + log::info!("serving publisher: id={}, path={}", id, path); let (publisher, subscriber) = broadcast::new(); @@ -61,7 +78,7 @@ impl Session { }; if let Err(err) = self.run_publisher(request, publisher).await { - log::warn!("pubisher error: path={} err={:?}", path, err); + log::warn!("error serving pubisher: id={} path={} err={:?}", id, path, err); } self.broadcasts.lock().unwrap().remove(path); @@ -73,8 +90,8 @@ impl Session { Ok(()) } - async fn serve_subscriber(&mut self, request: Request, path: &str) { - log::info!("subscriber: path={}", path); + async fn serve_subscriber(&mut self, id: usize, request: Request, path: &str) { + log::info!("serving subscriber: id={} path={}", id, path); let broadcast = match self.broadcasts.lock().unwrap().get(path) { Some(broadcast) => broadcast.clone(), @@ -84,7 +101,7 @@ impl Session { }; if let Err(err) = self.run_subscriber(request, broadcast).await { - log::warn!("subscriber error: path={} err={:?}", path, err); + log::warn!("error serving subscriber: id={} path={} err={:?}", id, path, err); } }