diff --git a/moq-transport/src/object/stream.rs b/moq-transport/src/object/stream.rs deleted file mode 100644 index 911ebba..0000000 --- a/moq-transport/src/object/stream.rs +++ /dev/null @@ -1,103 +0,0 @@ -use std::{ - sync::{Arc, Mutex, Weak}, - task::{Context, Poll}, -}; - -use bytes::{Buf, BufMut, Bytes}; - -use webtransport_generic::RecvStream as GenericRecvStream; -use webtransport_generic::SendStream as GenericSendStream; - -// Ugh, so we need to wrap SendStream with a mutex because we need to be able to call set_priority on it. -// The problem is that set_priority takes a i32, while send_order is a VarInt -// So the solution is to maintain a priority queue of active streams and constantly update the priority with their index. -// So the library might update the priority of the stream at any point, while the application might similtaniously write to it. -pub struct SendStream -where - S: GenericSendStream, -{ - // All SendStream methods are &mut, so we need to wrap them with an internal mutex. - inner: Arc>, -} - -impl SendStream -where - S: GenericSendStream, -{ - pub(crate) fn new(stream: S) -> Self { - Self { - inner: Arc::new(Mutex::new(stream)), - } - } - - pub fn weak(&self) -> Weak> { - Arc::>::downgrade(&self.inner) - } -} - -impl GenericSendStream for SendStream -where - S: GenericSendStream, -{ - type Error = S::Error; - - fn poll_send(&mut self, cx: &mut Context<'_>, buf: &mut B) -> Poll> { - self.inner.lock().unwrap().poll_send(cx, buf) - } - - fn reset(&mut self, reset_code: u32) { - self.inner.lock().unwrap().reset(reset_code) - } - - // The application should NOT use this method. - // The library will automatically set the stream priority on creation based on the header. - fn set_priority(&mut self, order: i32) { - self.inner.lock().unwrap().set_priority(order) - } -} - -// Unfortunately, we need to wrap RecvStream with a buffer since moq-transport::Coding only supports buffered reads. -// We first serve any data in the buffer, then we poll the stream. -// TODO fix this so we don't need the wrapper. -pub struct RecvStream -where - R: GenericRecvStream, -{ - buf: Bytes, - stream: R, -} - -impl RecvStream -where - R: GenericRecvStream, -{ - pub(crate) fn new(buf: Bytes, stream: R) -> Self { - Self { buf, stream } - } - - pub fn stop(&mut self, code: u32) { - self.stream.stop(code) - } -} - -impl GenericRecvStream for RecvStream -where - R: GenericRecvStream, -{ - type Error = R::Error; - - fn poll_recv(&mut self, cx: &mut Context<'_>, buf: &mut B) -> Poll, Self::Error>> { - if !self.buf.is_empty() { - let size = self.buf.len(); - buf.put(&mut self.buf); - let size = size - self.buf.len(); - Poll::Ready(Ok(Some(size))) - } else { - self.stream.poll_recv(cx, buf) - } - } - - fn stop(&mut self, error_code: u32) { - self.stream.stop(error_code) - } -}