diff --git a/server/src/session/mod.rs b/server/src/session/mod.rs index 0b4af65..e5c7cb0 100644 --- a/server/src/session/mod.rs +++ b/server/src/session/mod.rs @@ -71,7 +71,7 @@ impl transport::App for Session { self.streams.poll(conn); // Fetch the next media fragment, possibly queuing up stream data. - self.poll_source(conn, session).expect("poll_source"); + self.poll_source(conn, session)?; Ok(()) } diff --git a/server/src/transport/server.rs b/server/src/transport/server.rs index d0ab1f7..19031af 100644 --- a/server/src/transport/server.rs +++ b/server/src/transport/server.rs @@ -151,7 +151,6 @@ impl Server { // Check if it's an existing connection. if let Some(conn) = self.conns.get_mut(&hdr.dcid) { - // initial or handshake traffic. conn.quiche.recv(src, info)?; if conn.session.is_none() && conn.quiche.is_established() { @@ -162,7 +161,6 @@ impl Server { continue; } else if let Some(conn) = self.conns.get_mut(&conn_id) { - // 1-RTT traffic. conn.quiche.recv(src, info)?; // TODO is this needed here? @@ -176,7 +174,8 @@ impl Server { } if hdr.ty != quiche::Type::Initial { - anyhow::bail!("unknown connection ID"); + log::warn!("unknown connection ID"); + continue; } let mut dst = [0; MAX_DATAGRAM_SIZE]; @@ -222,11 +221,13 @@ impl Server { // The token was not valid, meaning the retry failed, so // drop the packet. if odcid.is_none() { - anyhow::bail!("invalid token"); + log::warn!("invalid token"); + continue; } if scid.len() != hdr.dcid.len() { - anyhow::bail!("invalid connection ID"); + log::warn!("invalid connection ID"); + continue; } // Reuse the source connection ID we sent in the Retry packet, @@ -234,6 +235,8 @@ impl Server { let conn_id = hdr.dcid.clone(); let local_addr = self.socket.local_addr().unwrap(); + log::debug!("new connection: dcid={:?} scid={:?}", hdr.dcid, scid); + let mut conn = quiche::accept(&conn_id, odcid.as_ref(), local_addr, from, &mut self.quic)?; @@ -246,13 +249,16 @@ impl Server { app: T::default(), }; - self.conns - .insert(user.quiche.source_id().into_owned(), user); + self.conns.insert(conn_id, user); } } pub fn app(&mut self) -> anyhow::Result<()> { for conn in self.conns.values_mut() { + if conn.quiche.is_closed() { + continue; + } + if let Some(session) = &mut conn.session { if let Err(e) = conn.app.poll(&mut conn.quiche, session) { log::debug!("app error: {:?}", e); @@ -271,23 +277,12 @@ impl Server { // them on the UDP socket, until quiche reports that there are no more // packets to be sent. pub fn send(&mut self) -> anyhow::Result<()> { - let mut pkt = [0; MAX_DATAGRAM_SIZE]; - for conn in self.conns.values_mut() { - loop { - let (size, info) = match conn.quiche.send(&mut pkt) { - Ok(v) => v, - Err(quiche::Error::Done) => return Ok(()), - Err(e) => return Err(e.into()), - }; + let conn = &mut conn.quiche; - let pkt = &pkt[..size]; - - match self.socket.send_to(pkt, info.to) { - Err(err) if err.kind() == io::ErrorKind::WouldBlock => break, - Err(err) => return Err(err.into()), - Ok(_) => (), - } + if let Err(e) = send_conn(&self.socket, conn) { + log::error!("{} send failed: {:?}", conn.trace_id(), e); + conn.close(false, 0x1, b"fail").ok(); } } @@ -300,6 +295,27 @@ impl Server { } } +// Send any pending packets for the connection over the socket. +fn send_conn(socket: &mio::net::UdpSocket, conn: &mut quiche::Connection) -> anyhow::Result<()> { + let mut pkt = [0; MAX_DATAGRAM_SIZE]; + + loop { + let (size, info) = match conn.send(&mut pkt) { + Ok(v) => v, + Err(quiche::Error::Done) => return Ok(()), + Err(e) => return Err(e.into()), + }; + + let pkt = &pkt[..size]; + + match socket.send_to(pkt, info.to) { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(()), + Err(e) => return Err(e.into()), + Ok(_) => (), + } + } +} + /// Generate a stateless retry token. /// /// The token includes the static string `"quiche"` followed by the IP address