Better read/write error messages (#75)
Still need to properly support encode/decode though. The problem there is that encode/decode uses AsyncRead, which means we get io::Error instead of quinn::ReadError and quinn::WriteError. The io::Error type is not clonable so we just can't use it, well unless it's wrapped in an Arc or something gross.
This commit is contained in:
parent
89f1bc430d
commit
eaa8abcdc6
|
@ -3,13 +3,17 @@ use thiserror::Error;
|
||||||
use crate::VarInt;
|
use crate::VarInt;
|
||||||
|
|
||||||
/// A MoQTransport error with an associated error code.
|
/// A MoQTransport error with an associated error code.
|
||||||
#[derive(Copy, Clone, Debug, Error)]
|
#[derive(Clone, Debug, Error)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
/// A clean termination, represented as error code 0.
|
/// A clean termination, represented as error code 0.
|
||||||
/// This error is automatically used when publishers or subscribers are dropped without calling close.
|
/// This error is automatically used when publishers or subscribers are dropped without calling close.
|
||||||
#[error("closed")]
|
#[error("closed")]
|
||||||
Closed,
|
Closed,
|
||||||
|
|
||||||
|
/// A session error occured.
|
||||||
|
#[error("session error: {0}")]
|
||||||
|
Session(#[from] webtransport_quinn::SessionError),
|
||||||
|
|
||||||
/// An ANNOUNCE_RESET or SUBSCRIBE_RESET was sent by the publisher.
|
/// An ANNOUNCE_RESET or SUBSCRIBE_RESET was sent by the publisher.
|
||||||
#[error("reset code={0:?}")]
|
#[error("reset code={0:?}")]
|
||||||
Reset(u32),
|
Reset(u32),
|
||||||
|
@ -31,16 +35,16 @@ pub enum Error {
|
||||||
Role(VarInt),
|
Role(VarInt),
|
||||||
|
|
||||||
/// An error occured while reading from the QUIC stream.
|
/// An error occured while reading from the QUIC stream.
|
||||||
#[error("failed to read from stream")]
|
#[error("failed to read from stream: {0}")]
|
||||||
Read,
|
Read(#[from] webtransport_quinn::ReadError),
|
||||||
|
|
||||||
/// An error occured while writing to the QUIC stream.
|
/// An error occured while writing to the QUIC stream.
|
||||||
#[error("failed to write to stream")]
|
#[error("failed to write to stream: {0}")]
|
||||||
Write,
|
Write(#[from] webtransport_quinn::WriteError),
|
||||||
|
|
||||||
/// An unclassified error because I'm lazy. TODO classify these errors
|
/// An unclassified error because I'm lazy. TODO classify these errors
|
||||||
#[error("unknown error")]
|
#[error("unknown error: {0}")]
|
||||||
Unknown,
|
Unknown(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Error {
|
impl Error {
|
||||||
|
@ -53,9 +57,10 @@ impl Error {
|
||||||
Self::NotFound => 404,
|
Self::NotFound => 404,
|
||||||
Self::Role(_) => 405,
|
Self::Role(_) => 405,
|
||||||
Self::Duplicate => 409,
|
Self::Duplicate => 409,
|
||||||
Self::Unknown => 500,
|
Self::Unknown(_) => 500,
|
||||||
Self::Write => 501,
|
Self::Write(_) => 501,
|
||||||
Self::Read => 502,
|
Self::Read(_) => 502,
|
||||||
|
Self::Session(_) => 503,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,10 +72,11 @@ impl Error {
|
||||||
Self::Stop => "stop",
|
Self::Stop => "stop",
|
||||||
Self::NotFound => "not found",
|
Self::NotFound => "not found",
|
||||||
Self::Duplicate => "duplicate",
|
Self::Duplicate => "duplicate",
|
||||||
Self::Role(_msg) => "role violation",
|
Self::Role(_) => "role violation",
|
||||||
Self::Unknown => "unknown",
|
Self::Read(_) => "read error",
|
||||||
Self::Read => "read error",
|
Self::Write(_) => "write error",
|
||||||
Self::Write => "write error",
|
Self::Session(_) => "session error",
|
||||||
|
Self::Unknown(_) => "unknown",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ impl State {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert(&mut self, track: track::Subscriber) -> Result<(), Error> {
|
pub fn insert(&mut self, track: track::Subscriber) -> Result<(), Error> {
|
||||||
self.closed?;
|
self.closed.clone()?;
|
||||||
|
|
||||||
match self.tracks.entry(track.name.clone()) {
|
match self.tracks.entry(track.name.clone()) {
|
||||||
hash_map::Entry::Occupied(_) => return Err(Error::Duplicate),
|
hash_map::Entry::Occupied(_) => return Err(Error::Duplicate),
|
||||||
|
@ -56,7 +56,7 @@ impl State {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn request(&mut self, name: &str) -> Result<track::Subscriber, Error> {
|
pub fn request(&mut self, name: &str) -> Result<track::Subscriber, Error> {
|
||||||
self.closed?;
|
self.closed.clone()?;
|
||||||
|
|
||||||
// Create a new track.
|
// Create a new track.
|
||||||
let (publisher, subscriber) = track::new(name);
|
let (publisher, subscriber) = track::new(name);
|
||||||
|
@ -76,7 +76,7 @@ impl State {
|
||||||
return Ok(true);
|
return Ok(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.closed?;
|
self.closed.clone()?;
|
||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,7 +86,7 @@ impl State {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn close(&mut self, err: Error) -> Result<(), Error> {
|
pub fn close(&mut self, err: Error) -> Result<(), Error> {
|
||||||
self.closed?;
|
self.closed.clone()?;
|
||||||
self.closed = Err(err);
|
self.closed = Err(err);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,7 @@ struct State {
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
pub fn close(&mut self, err: Error) -> Result<(), Error> {
|
pub fn close(&mut self, err: Error) -> Result<(), Error> {
|
||||||
self.closed?;
|
self.closed.clone()?;
|
||||||
self.closed = Err(err);
|
self.closed = Err(err);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,7 @@ impl Publisher {
|
||||||
/// Write a new chunk of bytes.
|
/// Write a new chunk of bytes.
|
||||||
pub fn write_chunk(&mut self, data: Bytes) -> Result<(), Error> {
|
pub fn write_chunk(&mut self, data: Bytes) -> Result<(), Error> {
|
||||||
let mut state = self.state.lock_mut();
|
let mut state = self.state.lock_mut();
|
||||||
state.closed?;
|
state.closed.clone()?;
|
||||||
state.data.push(data);
|
state.data.push(data);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -167,9 +167,9 @@ impl Subscriber {
|
||||||
return Ok(Some(chunk));
|
return Ok(Some(chunk));
|
||||||
}
|
}
|
||||||
|
|
||||||
match state.closed {
|
match &state.closed {
|
||||||
Err(Error::Closed) => return Ok(None),
|
Err(Error::Closed) => return Ok(None),
|
||||||
Err(err) => return Err(err),
|
Err(err) => return Err(err.clone()),
|
||||||
Ok(()) => state.changed(),
|
Ok(()) => state.changed(),
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -54,13 +54,13 @@ struct State {
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
pub fn close(&mut self, err: Error) -> Result<(), Error> {
|
pub fn close(&mut self, err: Error) -> Result<(), Error> {
|
||||||
self.closed?;
|
self.closed.clone()?;
|
||||||
self.closed = Err(err);
|
self.closed = Err(err);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert(&mut self, segment: segment::Subscriber) -> Result<(), Error> {
|
pub fn insert(&mut self, segment: segment::Subscriber) -> Result<(), Error> {
|
||||||
self.closed?;
|
self.closed.clone()?;
|
||||||
|
|
||||||
let entry = match self.lookup.entry(segment.sequence) {
|
let entry = match self.lookup.entry(segment.sequence) {
|
||||||
indexmap::map::Entry::Occupied(_entry) => return Err(Error::Duplicate),
|
indexmap::map::Entry::Occupied(_entry) => return Err(Error::Duplicate),
|
||||||
|
@ -236,9 +236,9 @@ impl Subscriber {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise check if we need to return an error.
|
// Otherwise check if we need to return an error.
|
||||||
match state.closed {
|
match &state.closed {
|
||||||
Err(Error::Closed) => return Ok(None),
|
Err(Error::Closed) => return Ok(None),
|
||||||
Err(err) => return Err(err),
|
Err(err) => return Err(err.clone()),
|
||||||
Ok(()) => state.changed(),
|
Ok(()) => state.changed(),
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -24,12 +24,19 @@ impl Control {
|
||||||
pub async fn send<T: Into<Message> + fmt::Debug>(&self, msg: T) -> Result<(), Error> {
|
pub async fn send<T: Into<Message> + fmt::Debug>(&self, msg: T) -> Result<(), Error> {
|
||||||
let mut stream = self.send.lock().await;
|
let mut stream = self.send.lock().await;
|
||||||
log::info!("sending message: {:?}", msg);
|
log::info!("sending message: {:?}", msg);
|
||||||
msg.into().encode(&mut *stream).await.map_err(|_e| Error::Write)
|
msg.into()
|
||||||
|
.encode(&mut *stream)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::Unknown(e.to_string()))?;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// It's likely a mistake to call this from two different tasks, but it's easier to just support it.
|
// It's likely a mistake to call this from two different tasks, but it's easier to just support it.
|
||||||
pub async fn recv(&self) -> Result<Message, Error> {
|
pub async fn recv(&self) -> Result<Message, Error> {
|
||||||
let mut stream = self.recv.lock().await;
|
let mut stream = self.recv.lock().await;
|
||||||
Message::decode(&mut *stream).await.map_err(|_e| Error::Read)
|
let msg = Message::decode(&mut *stream)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::Unknown(e.to_string()))?;
|
||||||
|
Ok(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,7 @@ impl Publisher {
|
||||||
}
|
}
|
||||||
// NOTE: this is not cancel safe, but it's fine since the other branch is a fatal error.
|
// NOTE: this is not cancel safe, but it's fine since the other branch is a fatal error.
|
||||||
msg = self.control.recv() => {
|
msg = self.control.recv() => {
|
||||||
let msg = msg.map_err(|_x| Error::Read)?;
|
let msg = msg?;
|
||||||
|
|
||||||
log::info!("message received: {:?}", msg);
|
log::info!("message received: {:?}", msg);
|
||||||
if let Err(err) = self.recv_message(&msg).await {
|
if let Err(err) = self.recv_message(&msg).await {
|
||||||
|
@ -166,15 +166,16 @@ impl Publisher {
|
||||||
|
|
||||||
log::debug!("serving object: {:?}", object);
|
log::debug!("serving object: {:?}", object);
|
||||||
|
|
||||||
let mut stream = self.webtransport.open_uni().await.map_err(|_e| Error::Write)?;
|
let mut stream = self.webtransport.open_uni().await?;
|
||||||
|
|
||||||
stream.set_priority(object.priority).ok();
|
stream.set_priority(object.priority).ok();
|
||||||
|
|
||||||
// TODO better handle the error.
|
object
|
||||||
object.encode(&mut stream).await.map_err(|_e| Error::Write)?;
|
.encode(&mut stream)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::Unknown(e.to_string()))?;
|
||||||
|
|
||||||
while let Some(data) = segment.read_chunk().await? {
|
while let Some(data) = segment.read_chunk().await? {
|
||||||
stream.write_chunk(data).await.map_err(|_e| Error::Write)?;
|
stream.write_chunk(data).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -62,7 +62,7 @@ impl Subscriber {
|
||||||
|
|
||||||
async fn run_inbound(mut self) -> Result<(), Error> {
|
async fn run_inbound(mut self) -> Result<(), Error> {
|
||||||
loop {
|
loop {
|
||||||
let msg = self.control.recv().await.map_err(|_e| Error::Read)?;
|
let msg = self.control.recv().await?;
|
||||||
|
|
||||||
log::info!("message received: {:?}", msg);
|
log::info!("message received: {:?}", msg);
|
||||||
if let Err(err) = self.recv_message(&msg).await {
|
if let Err(err) = self.recv_message(&msg).await {
|
||||||
|
@ -95,7 +95,7 @@ impl Subscriber {
|
||||||
async fn run_streams(self) -> Result<(), Error> {
|
async fn run_streams(self) -> Result<(), Error> {
|
||||||
loop {
|
loop {
|
||||||
// Accept all incoming unidirectional streams.
|
// Accept all incoming unidirectional streams.
|
||||||
let stream = self.webtransport.accept_uni().await.map_err(|_| Error::Read)?;
|
let stream = self.webtransport.accept_uni().await?;
|
||||||
let this = self.clone();
|
let this = self.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
@ -108,7 +108,9 @@ impl Subscriber {
|
||||||
|
|
||||||
async fn run_stream(self, mut stream: RecvStream) -> Result<(), Error> {
|
async fn run_stream(self, mut stream: RecvStream) -> Result<(), Error> {
|
||||||
// Decode the object on the data stream.
|
// Decode the object on the data stream.
|
||||||
let object = message::Object::decode(&mut stream).await.map_err(|_| Error::Read)?;
|
let object = message::Object::decode(&mut stream)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::Unknown(e.to_string()))?;
|
||||||
|
|
||||||
log::debug!("received object: {:?}", object);
|
log::debug!("received object: {:?}", object);
|
||||||
|
|
||||||
|
@ -124,7 +126,7 @@ impl Subscriber {
|
||||||
})?
|
})?
|
||||||
};
|
};
|
||||||
|
|
||||||
while let Some(data) = stream.read_chunk(usize::MAX, true).await.map_err(|_| Error::Read)? {
|
while let Some(data) = stream.read_chunk(usize::MAX, true).await? {
|
||||||
publisher.write_chunk(data.bytes)?;
|
publisher.write_chunk(data.bytes)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue