Fix the buffering used for parsing. (#50)

fill_buf didn't work like I expected. This code is much better anyway.
This commit is contained in:
kixelated 2023-08-02 11:41:28 -07:00 committed by GitHub
parent 0e239935a6
commit 3a65873055
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 67 additions and 65 deletions

1
Cargo.lock generated
View File

@ -967,6 +967,7 @@ name = "moq-warp"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"log",
"moq-transport",
"moq-transport-quinn",

View File

@ -1,11 +1,10 @@
use anyhow::Context;
use moq_transport::{Decode, DecodeError, Encode, Message};
use bytes::{Buf, BufMut, BytesMut};
use bytes::{Buf, BytesMut};
use std::io::Cursor;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::{io::AsyncReadExt, sync::Mutex};
use webtransport_quinn::{RecvStream, SendStream};
@ -86,8 +85,7 @@ impl RecvControl {
}
Err(DecodeError::UnexpectedEnd) => {
// The decode failed, so we need to append more data.
let chunk = self.stream.read_chunk(1024, true).await?.context("stream closed")?;
self.buf.put(chunk.bytes);
self.stream.read_buf(&mut self.buf).await?;
}
Err(e) => return Err(e.into()),
}

View File

@ -1,12 +1,12 @@
use std::{collections::BinaryHeap, io::Cursor, sync::Arc};
use anyhow::Context;
use bytes::BytesMut;
use bytes::{Buf, BytesMut};
use moq_transport::{Decode, DecodeError, Encode, Object};
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tokio::{io::AsyncBufReadExt, sync::Mutex};
use webtransport_quinn::Session;
use crate::{RecvStream, SendStream, SendStreamOrder};
@ -83,6 +83,8 @@ impl SendObjectsInner {
header.encode(&mut self.buf).unwrap();
stream.write_all(&self.buf).await.context("failed to write header")?;
// log::info!("created stream: {:?}", header);
Ok(stream)
}
}
@ -117,18 +119,15 @@ impl RecvObjects {
}
}
async fn read(stream: webtransport_quinn::RecvStream) -> anyhow::Result<(Object, RecvStream)> {
let mut stream = RecvStream::new(stream);
async fn read(mut stream: webtransport_quinn::RecvStream) -> anyhow::Result<(Object, RecvStream)> {
let mut buf = BytesMut::new();
loop {
// Read more data into the buffer.
let data = stream.fill_buf().await?;
if data.is_empty() {
anyhow::bail!("stream closed before reading header");
}
stream.read_buf(&mut buf).await?;
// Use a cursor to read the buffer and remember how much we read.
let mut read = Cursor::new(data);
let mut read = Cursor::new(&mut buf);
let header = match Object::decode(&mut read) {
Ok(header) => header,
@ -136,10 +135,14 @@ impl RecvObjects {
Err(err) => return Err(err.into()),
};
// We parsed a full header, advance the cursor.
// The borrow checker requires these on separate lines.
// We parsed a full header, advance the buffer.
let size = read.position() as usize;
stream.consume(size);
buf.advance(size);
let buf = buf.freeze();
// log::info!("received stream: {:?}", header);
let stream = RecvStream::new(buf, stream);
return Ok((header, stream));
}

View File

@ -1,12 +1,12 @@
use std::{
io,
ops::{Deref, DerefMut},
pin::Pin,
pin::{pin, Pin},
sync::{Arc, Mutex, Weak},
task,
task::{self, Poll},
};
use tokio::io::{AsyncWrite, BufReader};
use bytes::{BufMut, Bytes};
use tokio::io::{AsyncRead, AsyncWrite};
// Ugh, so we need to wrap SendStream with a mutex because we need to be able to call set_priority on it.
// The problem is that set_priority takes a i32, while send_order is a VarInt
@ -83,33 +83,33 @@ impl AsyncWrite for SendStream {
}
// Unfortunately, we need to wrap RecvStream with a buffer since moq-transport::Coding only supports buffered reads.
// TODO support unbuffered reads so we only read the MoQ header and then hand off the stream.
// NOTE: We can't use AsyncRead::chain because we need to get the inner stream for stop.
// We first serve any data in the buffer, then we poll the stream.
pub struct RecvStream {
stream: BufReader<webtransport_quinn::RecvStream>,
buf: Bytes,
stream: webtransport_quinn::RecvStream,
}
impl RecvStream {
pub(crate) fn new(stream: webtransport_quinn::RecvStream) -> Self {
let stream = BufReader::new(stream);
Self { stream }
pub(crate) fn new(buf: Bytes, stream: webtransport_quinn::RecvStream) -> Self {
Self { buf, stream }
}
pub fn stop(self, code: u32) {
self.stream.into_inner().stop(code).ok();
pub fn stop(&mut self, code: u32) {
self.stream.stop(code).ok();
}
}
impl Deref for RecvStream {
type Target = BufReader<webtransport_quinn::RecvStream>;
fn deref(&self) -> &Self::Target {
&self.stream
impl AsyncRead for RecvStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
if !self.buf.is_empty() {
buf.put(&mut pin!(self).buf);
Poll::Ready(Ok(()))
} else {
Pin::new(&mut self.stream).poll_read(cx, buf)
}
}
impl DerefMut for RecvStream {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.stream
}
}

View File

@ -21,6 +21,7 @@ moq-transport-quinn = { path = "../moq-transport-quinn" }
tokio = "1.27"
anyhow = "1.0.70"
log = "0.4" # TODO remove
bytes = "1.4"
# QUIC stuff
quinn = "0.10"

View File

@ -1,10 +1,5 @@
use super::watch;
use std::sync::Arc;
use bytes::Bytes;
// Use Arc to avoid cloning the data for each subscriber.
pub type Shared = Arc<Vec<u8>>;
// TODO combine fragments into the same buffer, instead of separate buffers.
pub type Publisher = watch::Publisher<Shared>;
pub type Subscriber = watch::Subscriber<Shared>;
pub type Publisher = watch::Publisher<Bytes>;
pub type Subscriber = watch::Subscriber<Bytes>;

View File

@ -1,5 +1,6 @@
use super::{fragment, watch};
use super::watch;
use bytes::Bytes;
use moq_transport::VarInt;
use std::ops::Deref;
use std::sync::Arc;
@ -21,7 +22,7 @@ pub struct Publisher {
pub info: Arc<Info>,
// A list of fragments that make up the segment.
pub fragments: watch::Publisher<fragment::Shared>,
pub fragments: watch::Publisher<Bytes>,
}
impl Publisher {
@ -53,7 +54,7 @@ pub struct Subscriber {
pub info: Arc<Info>,
// A list of fragments that make up the segment.
pub fragments: watch::Subscriber<fragment::Shared>,
pub fragments: watch::Subscriber<Bytes>,
}
impl Deref for Subscriber {

View File

@ -2,13 +2,15 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc;
use tokio::task::JoinSet; // lock across await boundaries
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt};
use moq_transport_quinn::{RecvObjects, RecvStream};
use bytes::BytesMut;
use anyhow::Context;
use super::{broker, control};
@ -114,16 +116,17 @@ impl Session {
}
async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> {
let mut buf = BytesMut::new();
loop {
let buf = stream.fill_buf().await?;
if buf.is_empty() {
let size = stream.read_buf(&mut buf).await?;
if size == 0 {
return Ok(());
}
let chunk = buf.to_vec();
stream.consume(chunk.len());
segment.fragments.push(chunk.into())
// Split off the data we read into the buffer, freezing it so multiple threads can read simitaniously.
let data = buf.split().freeze();
segment.fragments.push(data);
}
}

View File

@ -168,8 +168,8 @@ impl Session {
let mut stream = objects.open(object).await?;
// Write each fragment as they are available.
while let Some(fragment) = segment.fragments.next().await {
stream.write_all(fragment.as_slice()).await?;
while let Some(mut fragment) = segment.fragments.next().await {
stream.write_all_buf(&mut fragment).await?;
}
// NOTE: stream is automatically closed when dropped