diff --git a/Cargo.lock b/Cargo.lock index 75120d5..79f3b45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -967,6 +967,7 @@ name = "moq-warp" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "log", "moq-transport", "moq-transport-quinn", diff --git a/moq-transport-quinn/src/control.rs b/moq-transport-quinn/src/control.rs index 5dd3f84..b70578f 100644 --- a/moq-transport-quinn/src/control.rs +++ b/moq-transport-quinn/src/control.rs @@ -1,11 +1,10 @@ -use anyhow::Context; use moq_transport::{Decode, DecodeError, Encode, Message}; -use bytes::{Buf, BufMut, BytesMut}; +use bytes::{Buf, BytesMut}; use std::io::Cursor; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::{io::AsyncReadExt, sync::Mutex}; use webtransport_quinn::{RecvStream, SendStream}; @@ -86,8 +85,7 @@ impl RecvControl { } Err(DecodeError::UnexpectedEnd) => { // The decode failed, so we need to append more data. - let chunk = self.stream.read_chunk(1024, true).await?.context("stream closed")?; - self.buf.put(chunk.bytes); + self.stream.read_buf(&mut self.buf).await?; } Err(e) => return Err(e.into()), } diff --git a/moq-transport-quinn/src/object.rs b/moq-transport-quinn/src/object.rs index bac4716..d348fe5 100644 --- a/moq-transport-quinn/src/object.rs +++ b/moq-transport-quinn/src/object.rs @@ -1,12 +1,12 @@ use std::{collections::BinaryHeap, io::Cursor, sync::Arc}; use anyhow::Context; -use bytes::BytesMut; +use bytes::{Buf, BytesMut}; use moq_transport::{Decode, DecodeError, Encode, Object}; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::sync::Mutex; use tokio::task::JoinSet; -use tokio::{io::AsyncBufReadExt, sync::Mutex}; use webtransport_quinn::Session; use crate::{RecvStream, SendStream, SendStreamOrder}; @@ -83,6 +83,8 @@ impl SendObjectsInner { header.encode(&mut self.buf).unwrap(); stream.write_all(&self.buf).await.context("failed to write header")?; + // log::info!("created stream: {:?}", header); + Ok(stream) } } @@ -117,18 +119,15 @@ impl RecvObjects { } } - async fn read(stream: webtransport_quinn::RecvStream) -> anyhow::Result<(Object, RecvStream)> { - let mut stream = RecvStream::new(stream); + async fn read(mut stream: webtransport_quinn::RecvStream) -> anyhow::Result<(Object, RecvStream)> { + let mut buf = BytesMut::new(); loop { // Read more data into the buffer. - let data = stream.fill_buf().await?; - if data.is_empty() { - anyhow::bail!("stream closed before reading header"); - } + stream.read_buf(&mut buf).await?; // Use a cursor to read the buffer and remember how much we read. - let mut read = Cursor::new(data); + let mut read = Cursor::new(&mut buf); let header = match Object::decode(&mut read) { Ok(header) => header, @@ -136,10 +135,14 @@ impl RecvObjects { Err(err) => return Err(err.into()), }; - // We parsed a full header, advance the cursor. - // The borrow checker requires these on separate lines. + // We parsed a full header, advance the buffer. let size = read.position() as usize; - stream.consume(size); + buf.advance(size); + let buf = buf.freeze(); + + // log::info!("received stream: {:?}", header); + + let stream = RecvStream::new(buf, stream); return Ok((header, stream)); } diff --git a/moq-transport-quinn/src/stream.rs b/moq-transport-quinn/src/stream.rs index e5561ce..2063b6d 100644 --- a/moq-transport-quinn/src/stream.rs +++ b/moq-transport-quinn/src/stream.rs @@ -1,12 +1,12 @@ use std::{ io, - ops::{Deref, DerefMut}, - pin::Pin, + pin::{pin, Pin}, sync::{Arc, Mutex, Weak}, - task, + task::{self, Poll}, }; -use tokio::io::{AsyncWrite, BufReader}; +use bytes::{BufMut, Bytes}; +use tokio::io::{AsyncRead, AsyncWrite}; // Ugh, so we need to wrap SendStream with a mutex because we need to be able to call set_priority on it. // The problem is that set_priority takes a i32, while send_order is a VarInt @@ -83,33 +83,33 @@ impl AsyncWrite for SendStream { } // Unfortunately, we need to wrap RecvStream with a buffer since moq-transport::Coding only supports buffered reads. -// TODO support unbuffered reads so we only read the MoQ header and then hand off the stream. -// NOTE: We can't use AsyncRead::chain because we need to get the inner stream for stop. +// We first serve any data in the buffer, then we poll the stream. pub struct RecvStream { - stream: BufReader, + buf: Bytes, + stream: webtransport_quinn::RecvStream, } impl RecvStream { - pub(crate) fn new(stream: webtransport_quinn::RecvStream) -> Self { - let stream = BufReader::new(stream); - Self { stream } + pub(crate) fn new(buf: Bytes, stream: webtransport_quinn::RecvStream) -> Self { + Self { buf, stream } } - pub fn stop(self, code: u32) { - self.stream.into_inner().stop(code).ok(); + pub fn stop(&mut self, code: u32) { + self.stream.stop(code).ok(); } } -impl Deref for RecvStream { - type Target = BufReader; - - fn deref(&self) -> &Self::Target { - &self.stream - } -} - -impl DerefMut for RecvStream { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.stream +impl AsyncRead for RecvStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + if !self.buf.is_empty() { + buf.put(&mut pin!(self).buf); + Poll::Ready(Ok(())) + } else { + Pin::new(&mut self.stream).poll_read(cx, buf) + } } } diff --git a/moq-warp/Cargo.toml b/moq-warp/Cargo.toml index 81c42ff..d7cf872 100644 --- a/moq-warp/Cargo.toml +++ b/moq-warp/Cargo.toml @@ -1,15 +1,15 @@ [package] name = "moq-warp" description = "Media over QUIC" -authors = [ "Luke Curley" ] +authors = ["Luke Curley"] repository = "https://github.com/kixelated/moq-rs" license = "MIT OR Apache-2.0" version = "0.1.0" edition = "2021" -keywords = [ "quic", "http3", "webtransport", "media", "live" ] -categories = [ "multimedia", "network-programming", "web-programming" ] +keywords = ["quic", "http3", "webtransport", "media", "live"] +categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -20,7 +20,8 @@ moq-transport-quinn = { path = "../moq-transport-quinn" } tokio = "1.27" anyhow = "1.0.70" -log = "0.4" # TODO remove +log = "0.4" # TODO remove +bytes = "1.4" # QUIC stuff quinn = "0.10" diff --git a/moq-warp/src/model/fragment.rs b/moq-warp/src/model/fragment.rs index aaa4a48..c6a6aa4 100644 --- a/moq-warp/src/model/fragment.rs +++ b/moq-warp/src/model/fragment.rs @@ -1,10 +1,5 @@ use super::watch; -use std::sync::Arc; +use bytes::Bytes; -// Use Arc to avoid cloning the data for each subscriber. -pub type Shared = Arc>; - -// TODO combine fragments into the same buffer, instead of separate buffers. - -pub type Publisher = watch::Publisher; -pub type Subscriber = watch::Subscriber; +pub type Publisher = watch::Publisher; +pub type Subscriber = watch::Subscriber; diff --git a/moq-warp/src/model/segment.rs b/moq-warp/src/model/segment.rs index 7fb6533..685b7c5 100644 --- a/moq-warp/src/model/segment.rs +++ b/moq-warp/src/model/segment.rs @@ -1,5 +1,6 @@ -use super::{fragment, watch}; +use super::watch; +use bytes::Bytes; use moq_transport::VarInt; use std::ops::Deref; use std::sync::Arc; @@ -21,7 +22,7 @@ pub struct Publisher { pub info: Arc, // A list of fragments that make up the segment. - pub fragments: watch::Publisher, + pub fragments: watch::Publisher, } impl Publisher { @@ -53,7 +54,7 @@ pub struct Subscriber { pub info: Arc, // A list of fragments that make up the segment. - pub fragments: watch::Subscriber, + pub fragments: watch::Subscriber, } impl Deref for Subscriber { diff --git a/moq-warp/src/relay/contribute.rs b/moq-warp/src/relay/contribute.rs index 0b61fdc..c443723 100644 --- a/moq-warp/src/relay/contribute.rs +++ b/moq-warp/src/relay/contribute.rs @@ -2,13 +2,15 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time; -use tokio::io::AsyncBufReadExt; +use tokio::io::AsyncReadExt; use tokio::sync::mpsc; use tokio::task::JoinSet; // lock across await boundaries use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; use moq_transport_quinn::{RecvObjects, RecvStream}; +use bytes::BytesMut; + use anyhow::Context; use super::{broker, control}; @@ -114,16 +116,17 @@ impl Session { } async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> { + let mut buf = BytesMut::new(); + loop { - let buf = stream.fill_buf().await?; - if buf.is_empty() { + let size = stream.read_buf(&mut buf).await?; + if size == 0 { return Ok(()); } - let chunk = buf.to_vec(); - stream.consume(chunk.len()); - - segment.fragments.push(chunk.into()) + // Split off the data we read into the buffer, freezing it so multiple threads can read simitaniously. + let data = buf.split().freeze(); + segment.fragments.push(data); } } diff --git a/moq-warp/src/relay/distribute.rs b/moq-warp/src/relay/distribute.rs index f976414..33fa6f4 100644 --- a/moq-warp/src/relay/distribute.rs +++ b/moq-warp/src/relay/distribute.rs @@ -168,8 +168,8 @@ impl Session { let mut stream = objects.open(object).await?; // Write each fragment as they are available. - while let Some(fragment) = segment.fragments.next().await { - stream.write_all(fragment.as_slice()).await?; + while let Some(mut fragment) = segment.fragments.next().await { + stream.write_all_buf(&mut fragment).await?; } // NOTE: stream is automatically closed when dropped