Add some more connection logging. (#78)
So I can debug why my handshake is failing.
This commit is contained in:
parent
fbd06da2ee
commit
11f8be65d5
|
@ -89,6 +89,8 @@ impl Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(mut self) -> anyhow::Result<()> {
|
pub async fn run(mut self) -> anyhow::Result<()> {
|
||||||
|
log::info!("listening on {}", self.server.local_addr()?);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
res = self.server.accept() => {
|
res = self.server.accept() => {
|
||||||
|
|
|
@ -18,9 +18,18 @@ impl Session {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(&mut self, conn: quinn::Connecting) -> anyhow::Result<()> {
|
pub async fn run(&mut self, conn: quinn::Connecting) -> anyhow::Result<()> {
|
||||||
|
log::debug!("received QUIC handshake: ip={:?}", conn.remote_address());
|
||||||
|
|
||||||
// Wait for the QUIC connection to be established.
|
// Wait for the QUIC connection to be established.
|
||||||
let conn = conn.await.context("failed to establish QUIC connection")?;
|
let conn = conn.await.context("failed to establish QUIC connection")?;
|
||||||
|
|
||||||
|
log::debug!(
|
||||||
|
"established QUIC connection: ip={:?} id={}",
|
||||||
|
conn.remote_address(),
|
||||||
|
conn.stable_id()
|
||||||
|
);
|
||||||
|
let id = conn.stable_id();
|
||||||
|
|
||||||
// Wait for the CONNECT request.
|
// Wait for the CONNECT request.
|
||||||
let request = webtransport_quinn::accept(conn)
|
let request = webtransport_quinn::accept(conn)
|
||||||
.await
|
.await
|
||||||
|
@ -28,30 +37,38 @@ impl Session {
|
||||||
|
|
||||||
let path = request.uri().path().to_string();
|
let path = request.uri().path().to_string();
|
||||||
|
|
||||||
|
log::debug!("received WebTransport CONNECT: id={} path={}", id, path);
|
||||||
|
|
||||||
// Accept the CONNECT request.
|
// Accept the CONNECT request.
|
||||||
let session = request
|
let session = request
|
||||||
.ok()
|
.ok()
|
||||||
.await
|
.await
|
||||||
.context("failed to respond to WebTransport request")?;
|
.context("failed to respond to WebTransport request")?;
|
||||||
|
|
||||||
|
log::debug!("accepted WebTransport CONNECT: id={} path={}", id, path);
|
||||||
|
|
||||||
// Perform the MoQ handshake.
|
// Perform the MoQ handshake.
|
||||||
let request = moq_transport::session::Server::accept(session)
|
let request = moq_transport::session::Server::accept(session)
|
||||||
.await
|
.await
|
||||||
.context("failed to accept handshake")?;
|
.context("failed to accept handshake")?;
|
||||||
|
|
||||||
|
log::debug!("received MoQ SETUP: id={} role={:?}", id, request.role());
|
||||||
|
|
||||||
let role = request.role();
|
let role = request.role();
|
||||||
|
|
||||||
match role {
|
match role {
|
||||||
Role::Publisher => self.serve_publisher(request, &path).await,
|
Role::Publisher => self.serve_publisher(id, request, &path).await,
|
||||||
Role::Subscriber => self.serve_subscriber(request, &path).await,
|
Role::Subscriber => self.serve_subscriber(id, request, &path).await,
|
||||||
Role::Both => request.reject(300),
|
Role::Both => request.reject(300),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
log::debug!("closing connection: id={}", id);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn serve_publisher(&mut self, request: Request, path: &str) {
|
async fn serve_publisher(&mut self, id: usize, request: Request, path: &str) {
|
||||||
log::info!("publisher: path={}", path);
|
log::info!("serving publisher: id={}, path={}", id, path);
|
||||||
|
|
||||||
let (publisher, subscriber) = broadcast::new();
|
let (publisher, subscriber) = broadcast::new();
|
||||||
|
|
||||||
|
@ -61,7 +78,7 @@ impl Session {
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Err(err) = self.run_publisher(request, publisher).await {
|
if let Err(err) = self.run_publisher(request, publisher).await {
|
||||||
log::warn!("pubisher error: path={} err={:?}", path, err);
|
log::warn!("error serving pubisher: id={} path={} err={:?}", id, path, err);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.broadcasts.lock().unwrap().remove(path);
|
self.broadcasts.lock().unwrap().remove(path);
|
||||||
|
@ -73,8 +90,8 @@ impl Session {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn serve_subscriber(&mut self, request: Request, path: &str) {
|
async fn serve_subscriber(&mut self, id: usize, request: Request, path: &str) {
|
||||||
log::info!("subscriber: path={}", path);
|
log::info!("serving subscriber: id={} path={}", id, path);
|
||||||
|
|
||||||
let broadcast = match self.broadcasts.lock().unwrap().get(path) {
|
let broadcast = match self.broadcasts.lock().unwrap().get(path) {
|
||||||
Some(broadcast) => broadcast.clone(),
|
Some(broadcast) => broadcast.clone(),
|
||||||
|
@ -84,7 +101,7 @@ impl Session {
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Err(err) = self.run_subscriber(request, broadcast).await {
|
if let Err(err) = self.run_subscriber(request, broadcast).await {
|
||||||
log::warn!("subscriber error: path={} err={:?}", path, err);
|
log::warn!("error serving subscriber: id={} path={} err={:?}", id, path, err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue