Fix some crashes with the server.
This commit is contained in:
parent
58a1aa85ac
commit
e15812ebed
|
@ -71,7 +71,7 @@ impl transport::App for Session {
|
||||||
self.streams.poll(conn);
|
self.streams.poll(conn);
|
||||||
|
|
||||||
// Fetch the next media fragment, possibly queuing up stream data.
|
// Fetch the next media fragment, possibly queuing up stream data.
|
||||||
self.poll_source(conn, session).expect("poll_source");
|
self.poll_source(conn, session)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -151,7 +151,6 @@ impl<T: app::App> Server<T> {
|
||||||
|
|
||||||
// Check if it's an existing connection.
|
// Check if it's an existing connection.
|
||||||
if let Some(conn) = self.conns.get_mut(&hdr.dcid) {
|
if let Some(conn) = self.conns.get_mut(&hdr.dcid) {
|
||||||
// initial or handshake traffic.
|
|
||||||
conn.quiche.recv(src, info)?;
|
conn.quiche.recv(src, info)?;
|
||||||
|
|
||||||
if conn.session.is_none() && conn.quiche.is_established() {
|
if conn.session.is_none() && conn.quiche.is_established() {
|
||||||
|
@ -162,7 +161,6 @@ impl<T: app::App> Server<T> {
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
} else if let Some(conn) = self.conns.get_mut(&conn_id) {
|
} else if let Some(conn) = self.conns.get_mut(&conn_id) {
|
||||||
// 1-RTT traffic.
|
|
||||||
conn.quiche.recv(src, info)?;
|
conn.quiche.recv(src, info)?;
|
||||||
|
|
||||||
// TODO is this needed here?
|
// TODO is this needed here?
|
||||||
|
@ -176,7 +174,8 @@ impl<T: app::App> Server<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
if hdr.ty != quiche::Type::Initial {
|
if hdr.ty != quiche::Type::Initial {
|
||||||
anyhow::bail!("unknown connection ID");
|
log::warn!("unknown connection ID");
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut dst = [0; MAX_DATAGRAM_SIZE];
|
let mut dst = [0; MAX_DATAGRAM_SIZE];
|
||||||
|
@ -222,11 +221,13 @@ impl<T: app::App> Server<T> {
|
||||||
// The token was not valid, meaning the retry failed, so
|
// The token was not valid, meaning the retry failed, so
|
||||||
// drop the packet.
|
// drop the packet.
|
||||||
if odcid.is_none() {
|
if odcid.is_none() {
|
||||||
anyhow::bail!("invalid token");
|
log::warn!("invalid token");
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if scid.len() != hdr.dcid.len() {
|
if scid.len() != hdr.dcid.len() {
|
||||||
anyhow::bail!("invalid connection ID");
|
log::warn!("invalid connection ID");
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reuse the source connection ID we sent in the Retry packet,
|
// Reuse the source connection ID we sent in the Retry packet,
|
||||||
|
@ -234,6 +235,8 @@ impl<T: app::App> Server<T> {
|
||||||
let conn_id = hdr.dcid.clone();
|
let conn_id = hdr.dcid.clone();
|
||||||
let local_addr = self.socket.local_addr().unwrap();
|
let local_addr = self.socket.local_addr().unwrap();
|
||||||
|
|
||||||
|
log::debug!("new connection: dcid={:?} scid={:?}", hdr.dcid, scid);
|
||||||
|
|
||||||
let mut conn =
|
let mut conn =
|
||||||
quiche::accept(&conn_id, odcid.as_ref(), local_addr, from, &mut self.quic)?;
|
quiche::accept(&conn_id, odcid.as_ref(), local_addr, from, &mut self.quic)?;
|
||||||
|
|
||||||
|
@ -246,13 +249,16 @@ impl<T: app::App> Server<T> {
|
||||||
app: T::default(),
|
app: T::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
self.conns
|
self.conns.insert(conn_id, user);
|
||||||
.insert(user.quiche.source_id().into_owned(), user);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn app(&mut self) -> anyhow::Result<()> {
|
pub fn app(&mut self) -> anyhow::Result<()> {
|
||||||
for conn in self.conns.values_mut() {
|
for conn in self.conns.values_mut() {
|
||||||
|
if conn.quiche.is_closed() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(session) = &mut conn.session {
|
if let Some(session) = &mut conn.session {
|
||||||
if let Err(e) = conn.app.poll(&mut conn.quiche, session) {
|
if let Err(e) = conn.app.poll(&mut conn.quiche, session) {
|
||||||
log::debug!("app error: {:?}", e);
|
log::debug!("app error: {:?}", e);
|
||||||
|
@ -271,23 +277,12 @@ impl<T: app::App> Server<T> {
|
||||||
// them on the UDP socket, until quiche reports that there are no more
|
// them on the UDP socket, until quiche reports that there are no more
|
||||||
// packets to be sent.
|
// packets to be sent.
|
||||||
pub fn send(&mut self) -> anyhow::Result<()> {
|
pub fn send(&mut self) -> anyhow::Result<()> {
|
||||||
let mut pkt = [0; MAX_DATAGRAM_SIZE];
|
|
||||||
|
|
||||||
for conn in self.conns.values_mut() {
|
for conn in self.conns.values_mut() {
|
||||||
loop {
|
let conn = &mut conn.quiche;
|
||||||
let (size, info) = match conn.quiche.send(&mut pkt) {
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(quiche::Error::Done) => return Ok(()),
|
|
||||||
Err(e) => return Err(e.into()),
|
|
||||||
};
|
|
||||||
|
|
||||||
let pkt = &pkt[..size];
|
if let Err(e) = send_conn(&self.socket, conn) {
|
||||||
|
log::error!("{} send failed: {:?}", conn.trace_id(), e);
|
||||||
match self.socket.send_to(pkt, info.to) {
|
conn.close(false, 0x1, b"fail").ok();
|
||||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => break,
|
|
||||||
Err(err) => return Err(err.into()),
|
|
||||||
Ok(_) => (),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,6 +295,27 @@ impl<T: app::App> Server<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send any pending packets for the connection over the socket.
|
||||||
|
fn send_conn(socket: &mio::net::UdpSocket, conn: &mut quiche::Connection) -> anyhow::Result<()> {
|
||||||
|
let mut pkt = [0; MAX_DATAGRAM_SIZE];
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (size, info) = match conn.send(&mut pkt) {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(quiche::Error::Done) => return Ok(()),
|
||||||
|
Err(e) => return Err(e.into()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let pkt = &pkt[..size];
|
||||||
|
|
||||||
|
match socket.send_to(pkt, info.to) {
|
||||||
|
Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(()),
|
||||||
|
Err(e) => return Err(e.into()),
|
||||||
|
Ok(_) => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Generate a stateless retry token.
|
/// Generate a stateless retry token.
|
||||||
///
|
///
|
||||||
/// The token includes the static string `"quiche"` followed by the IP address
|
/// The token includes the static string `"quiche"` followed by the IP address
|
||||||
|
|
Loading…
Reference in New Issue