diff --git a/moq-transport/src/model/segment.rs b/moq-transport/src/model/segment.rs index 9f21448..4e8bbe3 100644 --- a/moq-transport/src/model/segment.rs +++ b/moq-transport/src/model/segment.rs @@ -10,7 +10,7 @@ //! The segment is closed with [Error::Closed] when all publishers or subscribers are dropped. use core::fmt; use std::{ - future::{poll_fn, Future}, + future::poll_fn, io, ops::Deref, pin::Pin, @@ -21,7 +21,6 @@ use std::{ use crate::{Error, VarInt}; use bytes::{Bytes, BytesMut}; -use tokio::pin; use super::Watch; @@ -171,29 +170,29 @@ impl Subscriber { /// Check if there is a chunk available. pub fn poll_chunk(&mut self, cx: &mut Context<'_>) -> Poll, Error>> { + // If there's already buffered data, return it. if !self.buffer.is_empty() { let chunk = self.buffer.split().freeze(); return Poll::Ready(Ok(Some(chunk))); } + // Grab the lock and check if there's a new chunk available. let state = self.state.lock(); if self.index < state.data.len() { + // Yep, clone and return it. let chunk = state.data[self.index].clone(); self.index += 1; return Poll::Ready(Ok(Some(chunk))); } - let notify = match state.closed { + // Otherwise we wait until the state changes and try again. + match state.closed { Err(Error::Closed) => return Poll::Ready(Ok(None)), Err(err) => return Poll::Ready(Err(err)), - Ok(()) => state.changed(), // Wake up when the state changes + Ok(()) => state.waker(cx), // Wake us up when the state changes. }; - // Register our context with the notify waker. - pin!(notify); - let _ = notify.poll(cx); - Poll::Pending } @@ -227,7 +226,7 @@ impl tokio::io::AsyncRead for Subscriber { // No more data. Ok(None) => return Poll::Ready(Ok(())), - // TODO cast to io::Error + // Crudely cast to io::Error Err(err) => return Poll::Ready(Err(err.as_io())), }; diff --git a/moq-transport/src/model/watch.rs b/moq-transport/src/model/watch.rs index 93c8475..882d2f4 100644 --- a/moq-transport/src/model/watch.rs +++ b/moq-transport/src/model/watch.rs @@ -108,6 +108,11 @@ impl<'a, T> WatchRef<'a, T> { } } + // Release the lock and provide a context to wake when next updated. + pub fn waker(mut self, cx: &mut task::Context<'_>) { + self.lock.register(cx.waker()); + } + // Upgrade to a mutable references that automatically calls notify on drop. pub fn into_mut(self) -> WatchMut<'a, T> { WatchMut { lock: self.lock }