diff --git a/moq-transport/src/error.rs b/moq-transport/src/error.rs index e147e23..56238b5 100644 --- a/moq-transport/src/error.rs +++ b/moq-transport/src/error.rs @@ -3,13 +3,17 @@ use thiserror::Error; use crate::VarInt; /// A MoQTransport error with an associated error code. -#[derive(Copy, Clone, Debug, Error)] +#[derive(Clone, Debug, Error)] pub enum Error { /// A clean termination, represented as error code 0. /// This error is automatically used when publishers or subscribers are dropped without calling close. #[error("closed")] Closed, + /// A session error occured. + #[error("session error: {0}")] + Session(#[from] webtransport_quinn::SessionError), + /// An ANNOUNCE_RESET or SUBSCRIBE_RESET was sent by the publisher. #[error("reset code={0:?}")] Reset(u32), @@ -31,16 +35,16 @@ pub enum Error { Role(VarInt), /// An error occured while reading from the QUIC stream. - #[error("failed to read from stream")] - Read, + #[error("failed to read from stream: {0}")] + Read(#[from] webtransport_quinn::ReadError), /// An error occured while writing to the QUIC stream. - #[error("failed to write to stream")] - Write, + #[error("failed to write to stream: {0}")] + Write(#[from] webtransport_quinn::WriteError), /// An unclassified error because I'm lazy. TODO classify these errors - #[error("unknown error")] - Unknown, + #[error("unknown error: {0}")] + Unknown(String), } impl Error { @@ -53,9 +57,10 @@ impl Error { Self::NotFound => 404, Self::Role(_) => 405, Self::Duplicate => 409, - Self::Unknown => 500, - Self::Write => 501, - Self::Read => 502, + Self::Unknown(_) => 500, + Self::Write(_) => 501, + Self::Read(_) => 502, + Self::Session(_) => 503, } } @@ -67,10 +72,11 @@ impl Error { Self::Stop => "stop", Self::NotFound => "not found", Self::Duplicate => "duplicate", - Self::Role(_msg) => "role violation", - Self::Unknown => "unknown", - Self::Read => "read error", - Self::Write => "write error", + Self::Role(_) => "role violation", + Self::Read(_) => "read error", + Self::Write(_) => "write error", + Self::Session(_) => "session error", + Self::Unknown(_) => "unknown", } } } diff --git a/moq-transport/src/model/broadcast.rs b/moq-transport/src/model/broadcast.rs index 652f105..316b7aa 100644 --- a/moq-transport/src/model/broadcast.rs +++ b/moq-transport/src/model/broadcast.rs @@ -45,7 +45,7 @@ impl State { } pub fn insert(&mut self, track: track::Subscriber) -> Result<(), Error> { - self.closed?; + self.closed.clone()?; match self.tracks.entry(track.name.clone()) { hash_map::Entry::Occupied(_) => return Err(Error::Duplicate), @@ -56,7 +56,7 @@ impl State { } pub fn request(&mut self, name: &str) -> Result { - self.closed?; + self.closed.clone()?; // Create a new track. let (publisher, subscriber) = track::new(name); @@ -76,7 +76,7 @@ impl State { return Ok(true); } - self.closed?; + self.closed.clone()?; Ok(false) } @@ -86,7 +86,7 @@ impl State { } pub fn close(&mut self, err: Error) -> Result<(), Error> { - self.closed?; + self.closed.clone()?; self.closed = Err(err); Ok(()) } diff --git a/moq-transport/src/model/segment.rs b/moq-transport/src/model/segment.rs index d2db43a..b338ef7 100644 --- a/moq-transport/src/model/segment.rs +++ b/moq-transport/src/model/segment.rs @@ -50,7 +50,7 @@ struct State { impl State { pub fn close(&mut self, err: Error) -> Result<(), Error> { - self.closed?; + self.closed.clone()?; self.closed = Err(err); Ok(()) } @@ -99,7 +99,7 @@ impl Publisher { /// Write a new chunk of bytes. pub fn write_chunk(&mut self, data: Bytes) -> Result<(), Error> { let mut state = self.state.lock_mut(); - state.closed?; + state.closed.clone()?; state.data.push(data); Ok(()) } @@ -167,9 +167,9 @@ impl Subscriber { return Ok(Some(chunk)); } - match state.closed { + match &state.closed { Err(Error::Closed) => return Ok(None), - Err(err) => return Err(err), + Err(err) => return Err(err.clone()), Ok(()) => state.changed(), } }; diff --git a/moq-transport/src/model/track.rs b/moq-transport/src/model/track.rs index 02ad04a..9b146f5 100644 --- a/moq-transport/src/model/track.rs +++ b/moq-transport/src/model/track.rs @@ -54,13 +54,13 @@ struct State { impl State { pub fn close(&mut self, err: Error) -> Result<(), Error> { - self.closed?; + self.closed.clone()?; self.closed = Err(err); Ok(()) } pub fn insert(&mut self, segment: segment::Subscriber) -> Result<(), Error> { - self.closed?; + self.closed.clone()?; let entry = match self.lookup.entry(segment.sequence) { indexmap::map::Entry::Occupied(_entry) => return Err(Error::Duplicate), @@ -236,9 +236,9 @@ impl Subscriber { } // Otherwise check if we need to return an error. - match state.closed { + match &state.closed { Err(Error::Closed) => return Ok(None), - Err(err) => return Err(err), + Err(err) => return Err(err.clone()), Ok(()) => state.changed(), } }; diff --git a/moq-transport/src/session/control.rs b/moq-transport/src/session/control.rs index 65295a7..b981553 100644 --- a/moq-transport/src/session/control.rs +++ b/moq-transport/src/session/control.rs @@ -24,12 +24,19 @@ impl Control { pub async fn send + fmt::Debug>(&self, msg: T) -> Result<(), Error> { let mut stream = self.send.lock().await; log::info!("sending message: {:?}", msg); - msg.into().encode(&mut *stream).await.map_err(|_e| Error::Write) + msg.into() + .encode(&mut *stream) + .await + .map_err(|e| Error::Unknown(e.to_string()))?; + Ok(()) } // It's likely a mistake to call this from two different tasks, but it's easier to just support it. pub async fn recv(&self) -> Result { let mut stream = self.recv.lock().await; - Message::decode(&mut *stream).await.map_err(|_e| Error::Read) + let msg = Message::decode(&mut *stream) + .await + .map_err(|e| Error::Unknown(e.to_string()))?; + Ok(msg) } } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index bffdf6c..d277447 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -52,7 +52,7 @@ impl Publisher { } // NOTE: this is not cancel safe, but it's fine since the other branch is a fatal error. msg = self.control.recv() => { - let msg = msg.map_err(|_x| Error::Read)?; + let msg = msg?; log::info!("message received: {:?}", msg); if let Err(err) = self.recv_message(&msg).await { @@ -166,15 +166,16 @@ impl Publisher { log::debug!("serving object: {:?}", object); - let mut stream = self.webtransport.open_uni().await.map_err(|_e| Error::Write)?; - + let mut stream = self.webtransport.open_uni().await?; stream.set_priority(object.priority).ok(); - // TODO better handle the error. - object.encode(&mut stream).await.map_err(|_e| Error::Write)?; + object + .encode(&mut stream) + .await + .map_err(|e| Error::Unknown(e.to_string()))?; while let Some(data) = segment.read_chunk().await? { - stream.write_chunk(data).await.map_err(|_e| Error::Write)?; + stream.write_chunk(data).await?; } Ok(()) diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index f89f400..2accd6e 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -62,7 +62,7 @@ impl Subscriber { async fn run_inbound(mut self) -> Result<(), Error> { loop { - let msg = self.control.recv().await.map_err(|_e| Error::Read)?; + let msg = self.control.recv().await?; log::info!("message received: {:?}", msg); if let Err(err) = self.recv_message(&msg).await { @@ -95,7 +95,7 @@ impl Subscriber { async fn run_streams(self) -> Result<(), Error> { loop { // Accept all incoming unidirectional streams. - let stream = self.webtransport.accept_uni().await.map_err(|_| Error::Read)?; + let stream = self.webtransport.accept_uni().await?; let this = self.clone(); tokio::spawn(async move { @@ -108,7 +108,9 @@ impl Subscriber { async fn run_stream(self, mut stream: RecvStream) -> Result<(), Error> { // Decode the object on the data stream. - let object = message::Object::decode(&mut stream).await.map_err(|_| Error::Read)?; + let object = message::Object::decode(&mut stream) + .await + .map_err(|e| Error::Unknown(e.to_string()))?; log::debug!("received object: {:?}", object); @@ -124,7 +126,7 @@ impl Subscriber { })? }; - while let Some(data) = stream.read_chunk(usize::MAX, true).await.map_err(|_| Error::Read)? { + while let Some(data) = stream.read_chunk(usize::MAX, true).await? { publisher.write_chunk(data.bytes)?; }