Compare commits

...

4 Commits

Author SHA1 Message Date
Luke Curley d2ab194d03 Remove unused file. 2023-08-03 13:58:13 -07:00
Luke Curley 4e11e8bafc Actually make moq-transport generic.
pog
2023-08-03 13:57:33 -07:00
Luke Curley e79b37aa00 Remove unused var. 2023-08-02 11:28:53 -07:00
Luke Curley 975d6b2580 Fix the buffering used for parsing.
fill_buf didn't work like I expected. This code is much better anyway.
2023-08-02 11:25:41 -07:00
44 changed files with 806 additions and 665 deletions

29
Cargo.lock generated
View File

@ -919,7 +919,7 @@ dependencies = [
]
[[package]]
name = "moq-demo"
name = "moq-quinn"
version = "0.1.0"
dependencies = [
"anyhow",
@ -928,7 +928,6 @@ dependencies = [
"hex",
"log",
"moq-transport",
"moq-transport-quinn",
"moq-warp",
"quinn",
"ring",
@ -936,30 +935,19 @@ dependencies = [
"rustls-pemfile",
"tokio",
"warp",
"webtransport-generic",
"webtransport-quinn",
]
[[package]]
name = "moq-transport"
version = "0.1.0"
dependencies = [
"bytes",
"thiserror",
]
[[package]]
name = "moq-transport-quinn"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"http",
"log",
"moq-transport",
"quinn",
"thiserror",
"tokio",
"webtransport-quinn",
"webtransport-generic",
]
[[package]]
@ -967,14 +955,15 @@ name = "moq-warp"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"log",
"moq-transport",
"moq-transport-quinn",
"quinn",
"ring",
"rustls 0.21.2",
"rustls-pemfile",
"tokio",
"webtransport-generic",
]
[[package]]
@ -1912,19 +1901,13 @@ dependencies = [
[[package]]
name = "webtransport-generic"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ba4583e96bb0ef08142f868bf0d28f90211eced56a473768ee27446864a2310"
dependencies = [
"bytes",
"log",
"thiserror",
]
[[package]]
name = "webtransport-proto"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21fefb5728651d507b444659853b47896116179ea8fd0348d02de080250892c7"
dependencies = [
"bytes",
"http",
@ -1934,8 +1917,6 @@ dependencies = [
[[package]]
name = "webtransport-quinn"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96a4ac9975117d8c63c4d04577d594b3130fe2023b7363ebc613905acf98590a"
dependencies = [
"async-std",
"bytes",

View File

@ -1,7 +1,2 @@
[workspace]
members = [
"moq-transport",
"moq-transport-quinn",
"moq-demo",
"moq-warp",
]
members = ["moq-transport", "moq-quinn", "moq-warp"]

View File

@ -1,5 +1,5 @@
[package]
name = "moq-demo"
name = "moq-quinn"
description = "Media over QUIC"
authors = ["Luke Curley"]
repository = "https://github.com/kixelated/moq-rs"
@ -16,12 +16,12 @@ categories = ["multimedia", "network-programming", "web-programming"]
[dependencies]
moq-transport = { path = "../moq-transport" }
moq-transport-quinn = { path = "../moq-transport-quinn" }
moq-warp = { path = "../moq-warp" }
webtransport-generic = { path = "../../webtransport-rs/webtransport-generic", version = "0.3" }
# QUIC
quinn = "0.10"
webtransport-quinn = "0.4.2"
webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn", version = "0.4.2" }
# Crypto
ring = "0.16.20"

View File

@ -1,16 +1,15 @@
use moq_warp::relay::broker;
use std::{fs, io, net, path, sync, time};
use anyhow::Context;
use moq_warp::relay;
use tokio::task::JoinSet;
pub struct Server {
server: quinn::Endpoint,
// The media sources.
broker: broker::Broadcasts,
broker: relay::Broker,
// The active connections.
conns: JoinSet<anyhow::Result<()>>,
@ -62,7 +61,7 @@ impl Server {
server_config.transport = sync::Arc::new(transport_config);
let server = quinn::Endpoint::server(server_config, config.addr)?;
let broker = broker::Broadcasts::new();
let broker = relay::Broker::new();
let conns = JoinSet::new();
@ -88,7 +87,7 @@ impl Server {
}
}
async fn handle(conn: quinn::Connecting, broker: broker::Broadcasts) -> anyhow::Result<()> {
async fn handle(conn: quinn::Connecting, broker: relay::Broker) -> anyhow::Result<()> {
// Wait for the QUIC connection to be established.
let conn = conn.await.context("failed to establish QUIC connection")?;
@ -106,12 +105,12 @@ impl Server {
.context("failed to respond to WebTransport request")?;
// Perform the MoQ handshake.
let session = moq_transport_quinn::accept(session, moq_transport::Role::Both)
let session = moq_transport::Session::accept(session, moq_transport::setup::Role::Both)
.await
.context("failed to perform MoQ handshake")?;
// Run the relay code.
let session = moq_warp::relay::Session::new(session, broker);
let session = relay::Session::new(session, broker);
session.run().await
}
}

View File

@ -1,26 +0,0 @@
[package]
name = "moq-transport-quinn"
description = "Media over QUIC"
authors = ["Luke Curley"]
repository = "https://github.com/kixelated/moq-rs"
license = "MIT OR Apache-2.0"
version = "0.1.0"
edition = "2021"
keywords = ["quic", "http3", "webtransport", "media", "live"]
categories = ["multimedia", "network-programming", "web-programming"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
moq-transport = { path = "../moq-transport" }
quinn = "0.10"
http = "0.2"
webtransport-quinn = "0.4.2"
tokio = { version = "1.27", features = ["macros", "io-util"] }
bytes = "1"
log = "0.4"
anyhow = "1.0.70"
thiserror = "1.0.21"

View File

@ -1,96 +0,0 @@
use anyhow::Context;
use moq_transport::{Decode, DecodeError, Encode, Message};
use bytes::{Buf, BufMut, BytesMut};
use std::io::Cursor;
use std::sync::Arc;
use tokio::sync::Mutex;
use webtransport_quinn::{RecvStream, SendStream};
pub struct SendControl {
stream: SendStream,
buf: BytesMut, // reuse a buffer to encode messages.
}
impl SendControl {
pub fn new(stream: SendStream) -> Self {
Self {
buf: BytesMut::new(),
stream,
}
}
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
let msg = msg.into();
log::info!("sending message: {:?}", msg);
self.buf.clear();
msg.encode(&mut self.buf)?;
// TODO make this work with select!
self.stream.write_all(&self.buf).await?;
Ok(())
}
// Helper that lets multiple threads send control messages.
pub fn share(self) -> ControlShared {
ControlShared {
stream: Arc::new(Mutex::new(self)),
}
}
}
// Helper that allows multiple threads to send control messages.
// There's no equivalent for receiving since only one thread should be receiving at a time.
#[derive(Clone)]
pub struct ControlShared {
stream: Arc<Mutex<SendControl>>,
}
impl ControlShared {
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
let mut stream = self.stream.lock().await;
stream.send(msg).await
}
}
pub struct RecvControl {
stream: RecvStream,
buf: BytesMut, // data we've read but haven't fully decoded yet
}
impl RecvControl {
pub fn new(stream: RecvStream) -> Self {
Self {
buf: BytesMut::new(),
stream,
}
}
// Read the next full message from the stream.
pub async fn recv(&mut self) -> anyhow::Result<Message> {
loop {
// Read the contents of the buffer
let mut peek = Cursor::new(&self.buf);
match Message::decode(&mut peek) {
Ok(msg) => {
// We've successfully decoded a message, so we can advance the buffer.
self.buf.advance(peek.position() as usize);
log::info!("received message: {:?}", msg);
return Ok(msg);
}
Err(DecodeError::UnexpectedEnd) => {
// The decode failed, so we need to append more data.
let chunk = self.stream.read_chunk(1024, true).await?.context("stream closed")?;
self.buf.put(chunk.bytes);
}
Err(e) => return Err(e.into()),
}
}
}
}

View File

@ -1,9 +0,0 @@
mod control;
mod object;
mod session;
mod stream;
pub use control::*;
pub use object::*;
pub use session::*;
pub use stream::*;

View File

@ -1,147 +0,0 @@
use std::{collections::BinaryHeap, io::Cursor, sync::Arc};
use anyhow::Context;
use bytes::BytesMut;
use moq_transport::{Decode, DecodeError, Encode, Object};
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
use tokio::{io::AsyncBufReadExt, sync::Mutex};
use webtransport_quinn::Session;
use crate::{RecvStream, SendStream, SendStreamOrder};
// Allow this to be cloned so we can have multiple senders.
#[derive(Clone)]
pub struct SendObjects {
// This is a tokio mutex since we need to lock across await boundaries.
inner: Arc<Mutex<SendObjectsInner>>,
}
impl SendObjects {
pub fn new(session: Session) -> Self {
let inner = SendObjectsInner::new(session);
Self {
inner: Arc::new(Mutex::new(inner)),
}
}
pub async fn open(&mut self, header: Object) -> anyhow::Result<SendStream> {
let mut inner = self.inner.lock().await;
inner.open(header).await
}
}
struct SendObjectsInner {
session: Session,
// Quinn supports a i32 for priority, but the wire format is a u64.
// Our work around is to keep a list of streams in priority order and use the index as the priority.
// This involves more work, so TODO either increase the Quinn size or reduce the wire size.
ordered: BinaryHeap<SendStreamOrder>,
ordered_swap: BinaryHeap<SendStreamOrder>, // reuse memory to avoid allocations
// A reusable buffer for encoding headers.
// TODO figure out how to use BufMut on the stack and remove this.
buf: BytesMut,
}
impl SendObjectsInner {
fn new(session: Session) -> Self {
Self {
session,
ordered: BinaryHeap::new(),
ordered_swap: BinaryHeap::new(),
buf: BytesMut::new(),
}
}
pub async fn open(&mut self, header: Object) -> anyhow::Result<SendStream> {
let stream = self.session.open_uni().await.context("failed to open uni stream")?;
let (mut stream, priority) = SendStream::with_order(stream, header.send_order.into_inner());
// Add the priority to our existing list.
self.ordered.push(priority);
// Loop through the list and update the priorities of any still active streams.
let mut index = 0;
while let Some(stream) = self.ordered.pop() {
if stream.update(index).is_ok() {
// Add the stream to the new list so it'll be in sorted order.
self.ordered_swap.push(stream);
index += 1;
}
}
// Swap the lists so we can reuse the memory.
std::mem::swap(&mut self.ordered, &mut self.ordered_swap);
// Encode and write the stream header.
// TODO do this in SendStream so we don't hold the lock.
// Otherwise,
self.buf.clear();
header.encode(&mut self.buf).unwrap();
stream.write_all(&self.buf).await.context("failed to write header")?;
Ok(stream)
}
}
// Not clone, so we don't accidentally have two listners.
pub struct RecvObjects {
session: Session,
// Streams that we've accepted but haven't read the header from yet.
streams: JoinSet<anyhow::Result<(Object, RecvStream)>>,
}
impl RecvObjects {
pub fn new(session: Session) -> Self {
Self {
session,
streams: JoinSet::new(),
}
}
pub async fn recv(&mut self) -> anyhow::Result<(Object, RecvStream)> {
loop {
tokio::select! {
res = self.session.accept_uni() => {
let stream = res.context("failed to accept stream")?;
self.streams.spawn(async move { Self::read(stream).await });
},
res = self.streams.join_next(), if !self.streams.is_empty() => {
return res.unwrap().context("failed to run join set")?;
}
}
}
}
async fn read(stream: webtransport_quinn::RecvStream) -> anyhow::Result<(Object, RecvStream)> {
let mut stream = RecvStream::new(stream);
loop {
// Read more data into the buffer.
let data = stream.fill_buf().await?;
if data.is_empty() {
anyhow::bail!("stream closed before reading header");
}
// Use a cursor to read the buffer and remember how much we read.
let mut read = Cursor::new(data);
let header = match Object::decode(&mut read) {
Ok(header) => header,
Err(DecodeError::UnexpectedEnd) => continue,
Err(err) => return Err(err.into()),
};
// We parsed a full header, advance the cursor.
// The borrow checker requires these on separate lines.
let size = read.position() as usize;
stream.consume(size);
return Ok((header, stream));
}
}
}

View File

@ -1,98 +0,0 @@
use anyhow::Context;
use moq_transport::{Message, SetupClient, SetupServer};
use super::{RecvControl, RecvObjects, SendControl, SendObjects};
/// Called by a server with an established WebTransport session.
// TODO close the session with an error code
pub async fn accept(session: webtransport_quinn::Session, role: moq_transport::Role) -> anyhow::Result<Session> {
let (send, recv) = session.accept_bi().await.context("failed to accept bidi stream")?;
let mut send_control = SendControl::new(send);
let mut recv_control = RecvControl::new(recv);
let setup_client = match recv_control.recv().await.context("failed to read SETUP")? {
Message::SetupClient(setup) => setup,
_ => anyhow::bail!("expected CLIENT SETUP"),
};
setup_client
.versions
.iter()
.find(|version| **version == moq_transport::Version::DRAFT_00)
.context("no supported versions")?;
if !setup_client.role.compatible(role) {
anyhow::bail!("incompatible roles: {:?} {:?}", setup_client.role, role);
}
let setup_server = SetupServer {
role,
version: moq_transport::Version::DRAFT_00,
};
send_control
.send(moq_transport::Message::SetupServer(setup_server))
.await
.context("failed to send setup server")?;
let send_objects = SendObjects::new(session.clone());
let recv_objects = RecvObjects::new(session.clone());
Ok(Session {
send_control,
recv_control,
send_objects,
recv_objects,
})
}
/// Called by a client with an established WebTransport session.
pub async fn connect(session: webtransport_quinn::Session, role: moq_transport::Role) -> anyhow::Result<Session> {
let (send, recv) = session.open_bi().await.context("failed to oen bidi stream")?;
let mut send_control = SendControl::new(send);
let mut recv_control = RecvControl::new(recv);
let setup_client = SetupClient {
role,
versions: vec![moq_transport::Version::DRAFT_00].into(),
path: "".to_string(),
};
send_control
.send(moq_transport::Message::SetupClient(setup_client))
.await
.context("failed to send SETUP CLIENT")?;
let setup_server = match recv_control.recv().await.context("failed to read SETUP")? {
Message::SetupServer(setup) => setup,
_ => anyhow::bail!("expected SERVER SETUP"),
};
if setup_server.version != moq_transport::Version::DRAFT_00 {
anyhow::bail!("unsupported version: {:?}", setup_server.version);
}
if !setup_server.role.compatible(role) {
anyhow::bail!("incompatible roles: {:?} {:?}", role, setup_server.role);
}
let send_objects = SendObjects::new(session.clone());
let recv_objects = RecvObjects::new(session.clone());
Ok(Session {
send_control,
recv_control,
send_objects,
recv_objects,
})
}
pub struct Session {
pub send_control: SendControl,
pub recv_control: RecvControl,
pub send_objects: SendObjects,
pub recv_objects: RecvObjects,
}

View File

@ -1,115 +0,0 @@
use std::{
io,
ops::{Deref, DerefMut},
pin::Pin,
sync::{Arc, Mutex, Weak},
task,
};
use tokio::io::{AsyncWrite, BufReader};
// Ugh, so we need to wrap SendStream with a mutex because we need to be able to call set_priority on it.
// The problem is that set_priority takes a i32, while send_order is a VarInt
// So the solution is to maintain a priority queue of active streams and constantly update the priority with their index.
// So the library might update the priority of the stream at any point, while the application might similtaniously write to it.
// The only upside is that we don't expose set_priority, so the application can't screw with things.
pub struct SendStream {
stream: Arc<Mutex<webtransport_quinn::SendStream>>,
}
impl SendStream {
// Create a new stream with the given order, returning a handle that allows us to update the priority.
pub(crate) fn with_order(stream: webtransport_quinn::SendStream, order: u64) -> (Self, SendStreamOrder) {
let stream = Arc::new(Mutex::new(stream));
let weak = Arc::<Mutex<webtransport_quinn::SendStream>>::downgrade(&stream);
(SendStream { stream }, SendStreamOrder { stream: weak, order })
}
}
pub(crate) struct SendStreamOrder {
// We use Weak here so we don't prevent the stream from being closed when dereferenced.
// update() will return an error if the stream was closed instead.
stream: Weak<Mutex<webtransport_quinn::SendStream>>,
order: u64,
}
impl SendStreamOrder {
pub(crate) fn update(&self, index: i32) -> Result<(), webtransport_quinn::StreamClosed> {
let stream = self.stream.upgrade().ok_or(webtransport_quinn::StreamClosed)?;
let mut stream = stream.lock().unwrap();
stream.set_priority(index)
}
}
impl PartialEq for SendStreamOrder {
fn eq(&self, other: &Self) -> bool {
self.order == other.order
}
}
impl Eq for SendStreamOrder {}
impl PartialOrd for SendStreamOrder {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
// We reverse the order so the lower send order is higher priority.
other.order.partial_cmp(&self.order)
}
}
impl Ord for SendStreamOrder {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// We reverse the order so the lower send order is higher priority.
other.order.cmp(&self.order)
}
}
// We implement AsyncWrite so we can grab the mutex on each write attempt, instead of holding it for the entire async function.
impl AsyncWrite for SendStream {
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> task::Poll<io::Result<usize>> {
let mut stream = self.stream.lock().unwrap();
Pin::new(&mut *stream).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<io::Result<()>> {
let mut stream = self.stream.lock().unwrap();
Pin::new(&mut *stream).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<io::Result<()>> {
let mut stream = self.stream.lock().unwrap();
Pin::new(&mut *stream).poll_shutdown(cx)
}
}
// Unfortunately, we need to wrap RecvStream with a buffer since moq-transport::Coding only supports buffered reads.
// TODO support unbuffered reads so we only read the MoQ header and then hand off the stream.
// NOTE: We can't use AsyncRead::chain because we need to get the inner stream for stop.
pub struct RecvStream {
stream: BufReader<webtransport_quinn::RecvStream>,
}
impl RecvStream {
pub(crate) fn new(stream: webtransport_quinn::RecvStream) -> Self {
let stream = BufReader::new(stream);
Self { stream }
}
pub fn stop(self, code: u32) {
self.stream.into_inner().stop(code).ok();
}
}
impl Deref for RecvStream {
type Target = BufReader<webtransport_quinn::RecvStream>;
fn deref(&self) -> &Self::Target {
&self.stream
}
}
impl DerefMut for RecvStream {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.stream
}
}

View File

@ -16,4 +16,7 @@ categories = ["multimedia", "network-programming", "web-programming"]
[dependencies]
bytes = "1"
thiserror = "1.0.21"
thiserror = "1"
anyhow = "1"
webtransport-generic = { path = "../../webtransport-rs/webtransport-generic", version = "0.3" }
tokio = { version = "1.27", features = ["macros", "io-util"] }

View File

@ -10,9 +10,6 @@ pub enum EncodeError {
#[error("varint too large")]
BoundsExceeded(#[from] BoundsExceeded),
#[error("unknown error")]
Unknown,
}
pub trait Encode: Sized {

View File

@ -1,7 +1,9 @@
mod coding;
mod control;
mod object;
pub mod message;
pub mod object;
pub mod session;
pub mod setup;
pub use coding::*;
pub use control::*;
pub use object::*;
pub use coding::VarInt;
pub use message::Message;
pub use session::Session;

View File

@ -2,27 +2,24 @@ mod announce;
mod announce_error;
mod announce_ok;
mod go_away;
mod role;
mod setup_client;
mod setup_server;
mod receiver;
mod sender;
mod subscribe;
mod subscribe_error;
mod subscribe_ok;
mod version;
pub use announce::*;
pub use announce_error::*;
pub use announce_ok::*;
pub use go_away::*;
pub use role::*;
pub use setup_client::*;
pub use setup_server::*;
pub use receiver::*;
pub use sender::*;
pub use subscribe::*;
pub use subscribe_error::*;
pub use subscribe_ok::*;
pub use version::*;
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use crate::setup;
use bytes::{Buf, BufMut};
use std::fmt;
@ -87,6 +84,10 @@ macro_rules! message_types {
}
}
// Just so we can use the macro above.
type SetupClient = setup::Client;
type SetupServer = setup::Server;
// Each message is prefixed with the given VarInt type.
message_types! {
// NOTE: Object and Setup are in other modules.

View File

@ -0,0 +1,49 @@
use crate::coding::{Decode, DecodeError};
use crate::message::Message;
use bytes::{Buf, BytesMut};
use std::io::Cursor;
use webtransport_generic::AsyncRecvStream;
pub struct Receiver<R>
where
R: AsyncRecvStream, // TODO take RecvStream instead
{
stream: R,
buf: BytesMut, // data we've read but haven't fully decoded yet
}
impl<R> Receiver<R>
where
R: AsyncRecvStream,
{
pub fn new(stream: R) -> Self {
Self {
buf: BytesMut::new(),
stream,
}
}
// Read the next full message from the stream.
pub async fn recv(&mut self) -> anyhow::Result<Message> {
loop {
// Read the contents of the buffer
let mut peek = Cursor::new(&self.buf);
match Message::decode(&mut peek) {
Ok(msg) => {
// We've successfully decoded a message, so we can advance the buffer.
self.buf.advance(peek.position() as usize);
return Ok(msg);
}
Err(DecodeError::UnexpectedEnd) => {
// The decode failed, so we need to append more data.
self.stream.recv(&mut self.buf).await?;
}
Err(e) => return Err(e.into()),
}
}
}
}

View File

@ -0,0 +1,68 @@
use crate::coding::Encode;
use crate::message::Message;
use bytes::BytesMut;
use webtransport_generic::AsyncSendStream;
pub struct Sender<S>
where
S: AsyncSendStream, // TODO take SendStream instead
{
stream: S,
buf: BytesMut, // reuse a buffer to encode messages.
}
impl<S> Sender<S>
where
S: AsyncSendStream,
{
pub fn new(stream: S) -> Self {
Self {
buf: BytesMut::new(),
stream,
}
}
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
let msg = msg.into();
self.buf.clear();
msg.encode(&mut self.buf)?;
self.stream.send(&mut self.buf).await?;
Ok(())
}
/*
// Helper that lets multiple threads send control messages.
pub fn share(self) -> ControlShared<S> {
ControlShared {
stream: Arc::new(Mutex::new(self)),
}
}
*/
}
/*
// Helper that allows multiple threads to send control messages.
// There's no equivalent for receiving since only one thread should be receiving at a time.
#[derive(Clone)]
pub struct SendControlShared<S>
where
S: AsyncSendStream,
{
stream: Arc<Mutex<SendControl<S>>>,
}
impl<S> SendControlShared<S>
where
S: AsyncSendStream,
{
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
let mut stream = self.stream.lock().await;
stream.send(msg).await
}
}
*/

View File

@ -1,9 +1,8 @@
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use bytes::{Buf, BufMut};
#[derive(Debug)]
pub struct Object {
pub struct Header {
// An ID for this track.
// Proposal: https://github.com/moq-wg/moq-transport/issues/209
pub track: VarInt,
@ -18,7 +17,7 @@ pub struct Object {
pub send_order: VarInt,
}
impl Decode for Object {
impl Decode for Header {
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let typ = VarInt::decode(r)?;
if typ.into_inner() != 0 {
@ -41,7 +40,7 @@ impl Decode for Object {
}
}
impl Encode for Object {
impl Encode for Header {
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
VarInt::from_u32(0).encode(w)?;
self.track.encode(w)?;

View File

@ -0,0 +1,7 @@
mod header;
mod receiver;
mod sender;
pub use header::*;
pub use receiver::*;
pub use sender::*;

View File

@ -0,0 +1,128 @@
use std::io::Cursor;
use std::task::{self, Poll};
use crate::coding::{Decode, DecodeError};
use crate::object::Header;
use anyhow::Context;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use tokio::task::JoinSet;
use webtransport_generic::RecvStream as GenericRecvStream;
use webtransport_generic::{AsyncRecvStream, AsyncSession};
pub struct Receiver<S>
where
S: AsyncSession,
{
session: S,
// Streams that we've accepted but haven't read the header from yet.
streams: JoinSet<anyhow::Result<(Header, RecvStream<S::RecvStream>)>>,
}
impl<S> Receiver<S>
where
S: AsyncSession,
S::RecvStream: AsyncRecvStream,
{
pub fn new(session: S) -> Self {
Self {
session,
streams: JoinSet::new(),
}
}
pub async fn recv(&mut self) -> anyhow::Result<(Header, RecvStream<S::RecvStream>)> {
loop {
tokio::select! {
res = self.session.accept_uni() => {
let stream = res.context("failed to accept stream")?;
self.streams.spawn(async move { Self::read(stream).await });
},
res = self.streams.join_next(), if !self.streams.is_empty() => {
return res.unwrap().context("failed to run join set")?;
}
}
}
}
async fn read(mut stream: S::RecvStream) -> anyhow::Result<(Header, RecvStream<S::RecvStream>)> {
let mut buf = BytesMut::new();
loop {
// Read more data into the buffer.
stream.recv(&mut buf).await?;
// Use a cursor to read the buffer and remember how much we read.
let mut read = Cursor::new(&mut buf);
let header = match Header::decode(&mut read) {
Ok(header) => header,
Err(DecodeError::UnexpectedEnd) => continue,
Err(err) => return Err(err.into()),
};
// We parsed a full header, advance the buffer.
let size = read.position() as usize;
buf.advance(size);
let buf = buf.freeze();
// log::info!("received stream: {:?}", header);
let stream = RecvStream::new(buf, stream);
return Ok((header, stream));
}
}
}
// Unfortunately, we need to wrap RecvStream with a buffer since moq-transport::Coding only supports buffered reads.
// We first serve any data in the buffer, then we poll the stream.
// TODO fix this so we don't need the wrapper.
pub struct RecvStream<R>
where
R: GenericRecvStream,
{
buf: Bytes,
stream: R,
}
impl<R> RecvStream<R>
where
R: GenericRecvStream,
{
pub(crate) fn new(buf: Bytes, stream: R) -> Self {
Self { buf, stream }
}
pub fn stop(&mut self, code: u32) {
self.stream.stop(code)
}
}
impl<R> GenericRecvStream for RecvStream<R>
where
R: GenericRecvStream,
{
type Error = R::Error;
fn poll_recv<B: BufMut>(
&mut self,
cx: &mut task::Context<'_>,
buf: &mut B,
) -> Poll<Result<Option<usize>, Self::Error>> {
if !self.buf.is_empty() {
let size = self.buf.len();
buf.put(&mut self.buf);
let size = size - self.buf.len();
Poll::Ready(Ok(Some(size)))
} else {
self.stream.poll_recv(cx, buf)
}
}
fn stop(&mut self, error_code: u32) {
self.stream.stop(error_code)
}
}

View File

@ -0,0 +1,229 @@
use std::sync::{Mutex, Weak};
use std::task::{self, Poll};
use std::{collections::BinaryHeap, sync::Arc};
use anyhow::Context;
use bytes::{Buf, BytesMut};
use crate::coding::Encode;
use crate::object::Header;
use webtransport_generic::SendStream as GenericSendStream;
use webtransport_generic::{AsyncSendStream, AsyncSession};
// Allow this to be cloned so we can have multiple senders.
pub struct Sender<S>
where
S: AsyncSession,
S::SendStream: AsyncSendStream,
{
// The session.
session: S,
// A reusable buffer for the stream header.
buf: BytesMut,
// Register new streams with an inner object that will prioritize them.
inner: Arc<Mutex<SenderInner<S::SendStream>>>,
}
impl<S> Sender<S>
where
S: AsyncSession,
S::SendStream: AsyncSendStream,
{
pub fn new(session: S) -> Self {
let inner = SenderInner::new();
Self {
session,
buf: BytesMut::new(),
inner: Arc::new(Mutex::new(inner)),
}
}
pub async fn open(&mut self, header: Header) -> anyhow::Result<SendStream<S::SendStream>> {
let stream = self.session.open_uni().await.context("failed to open uni stream")?;
let mut stream = {
let mut inner = self.inner.lock().unwrap();
inner.register(stream, header.send_order.into_inner())?
};
self.buf.clear();
header.encode(&mut self.buf).unwrap();
stream.send_all(&mut self.buf).await.context("failed to write header")?;
// log::info!("created stream: {:?}", header);
header.encode(&mut self.buf).unwrap();
stream.send_all(&mut self.buf).await.context("failed to write header")?;
Ok(stream)
}
}
impl<S> Clone for Sender<S>
where
S: AsyncSession,
S::SendStream: AsyncSendStream,
{
fn clone(&self) -> Self {
Sender {
session: self.session.clone(),
buf: BytesMut::new(),
inner: self.inner.clone(),
}
}
}
struct SenderInner<S>
where
S: GenericSendStream,
{
// Quinn supports a i32 for priority, but the wire format is a u64.
// Our work around is to keep a list of streams in priority order and use the index as the priority.
// This involves more work, so TODO either increase the Quinn size or reduce the wire size.
ordered: BinaryHeap<SendOrder<S>>,
ordered_swap: BinaryHeap<SendOrder<S>>, // reuse memory to avoid allocations
}
impl<S> SenderInner<S>
where
S: GenericSendStream,
{
fn new() -> Self {
Self {
ordered: BinaryHeap::new(),
ordered_swap: BinaryHeap::new(),
}
}
pub fn register(&mut self, stream: S, order: u64) -> anyhow::Result<SendStream<S>> {
let stream = SendStream::new(stream);
let order = SendOrder::new(&stream, order);
// Add the priority to our existing list.
self.ordered.push(order);
// Loop through the list and update the priorities of any still active streams.
let mut index = 0;
while let Some(stream) = self.ordered.pop() {
if stream.set_priority(index).is_some() {
// Add the stream to the new list so it'll be in sorted order.
self.ordered_swap.push(stream);
index += 1;
}
}
// Swap the lists so we can reuse the memory.
std::mem::swap(&mut self.ordered, &mut self.ordered_swap);
Ok(stream)
}
}
struct SendOrder<S>
where
S: GenericSendStream,
{
// We use Weak here so we don't prevent the stream from being closed when dereferenced.
// set_priority() will return None if the stream was closed.
stream: Weak<Mutex<S>>,
order: u64,
}
impl<S> SendOrder<S>
where
S: GenericSendStream,
{
fn new(stream: &SendStream<S>, order: u64) -> Self {
let stream = stream.weak();
Self { stream, order }
}
fn set_priority(&self, index: i32) -> Option<()> {
let stream = self.stream.upgrade()?;
let mut stream = stream.lock().unwrap();
stream.set_priority(index);
Some(())
}
}
impl<S> PartialEq for SendOrder<S>
where
S: GenericSendStream,
{
fn eq(&self, other: &Self) -> bool {
self.order == other.order
}
}
impl<S> Eq for SendOrder<S> where S: GenericSendStream {}
impl<S> PartialOrd for SendOrder<S>
where
S: GenericSendStream,
{
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
// We reverse the order so the lower send order is higher priority.
other.order.partial_cmp(&self.order)
}
}
impl<S> Ord for SendOrder<S>
where
S: GenericSendStream,
{
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// We reverse the order so the lower send order is higher priority.
other.order.cmp(&self.order)
}
}
// Ugh, so we need to wrap SendStream with a mutex because we need to be able to call set_priority on it.
// The problem is that set_priority takes a i32, while send_order is a VarInt
// So the solution is to maintain a priority queue of active streams and constantly update the priority with their index.
// So the library might update the priority of the stream at any point, while the application might similtaniously write to it.
pub struct SendStream<S>
where
S: GenericSendStream,
{
// All SendStream methods are &mut, so we need to wrap them with an internal mutex.
inner: Arc<Mutex<S>>,
}
impl<S> SendStream<S>
where
S: GenericSendStream,
{
pub(crate) fn new(stream: S) -> Self {
Self {
inner: Arc::new(Mutex::new(stream)),
}
}
pub fn weak(&self) -> Weak<Mutex<S>> {
Arc::<Mutex<S>>::downgrade(&self.inner)
}
}
impl<S> GenericSendStream for SendStream<S>
where
S: GenericSendStream,
{
type Error = S::Error;
fn poll_send<B: Buf>(&mut self, cx: &mut task::Context<'_>, buf: &mut B) -> Poll<Result<usize, Self::Error>> {
self.inner.lock().unwrap().poll_send(cx, buf)
}
fn reset(&mut self, reset_code: u32) {
self.inner.lock().unwrap().reset(reset_code)
}
// The application should NOT use this method.
// The library will automatically set the stream priority on creation based on the header.
fn set_priority(&mut self, order: i32) {
self.inner.lock().unwrap().set_priority(order)
}
}

View File

@ -0,0 +1,109 @@
use anyhow::Context;
use crate::{message, object, setup};
use webtransport_generic::{AsyncRecvStream, AsyncSendStream, AsyncSession};
pub struct Session<S>
where
S: AsyncSession,
S::SendStream: AsyncSendStream,
S::RecvStream: AsyncRecvStream,
{
pub send_control: message::Sender<S::SendStream>,
pub recv_control: message::Receiver<S::RecvStream>,
pub send_objects: object::Sender<S>,
pub recv_objects: object::Receiver<S>,
}
impl<S> Session<S>
where
S: AsyncSession,
S::SendStream: AsyncSendStream,
S::RecvStream: AsyncRecvStream,
{
/// Called by a server with an established WebTransport session.
// TODO close the session with an error code
pub async fn accept(session: S, role: setup::Role) -> anyhow::Result<Self> {
let (send, recv) = session.accept_bi().await.context("failed to accept bidi stream")?;
let mut send_control = message::Sender::new(send);
let mut recv_control = message::Receiver::new(recv);
let setup_client = match recv_control.recv().await.context("failed to read SETUP")? {
message::Message::SetupClient(setup) => setup,
_ => anyhow::bail!("expected CLIENT SETUP"),
};
setup_client
.versions
.iter()
.find(|version| **version == setup::Version::DRAFT_00)
.context("no supported versions")?;
if !setup_client.role.compatible(role) {
anyhow::bail!("incompatible roles: {:?} {:?}", setup_client.role, role);
}
let setup_server = setup::Server {
role,
version: setup::Version::DRAFT_00,
};
send_control
.send(message::Message::SetupServer(setup_server))
.await
.context("failed to send setup server")?;
let send_objects = object::Sender::new(session.clone());
let recv_objects = object::Receiver::new(session.clone());
Ok(Session {
send_control,
recv_control,
send_objects,
recv_objects,
})
}
/// Called by a client with an established WebTransport session.
pub async fn connect(session: S, role: setup::Role) -> anyhow::Result<Self> {
let (send, recv) = session.open_bi().await.context("failed to oen bidi stream")?;
let mut send_control = message::Sender::new(send);
let mut recv_control = message::Receiver::new(recv);
let setup_client = setup::Client {
role,
versions: vec![setup::Version::DRAFT_00].into(),
path: "".to_string(),
};
send_control
.send(message::Message::SetupClient(setup_client))
.await
.context("failed to send SETUP CLIENT")?;
let setup_server = match recv_control.recv().await.context("failed to read SETUP")? {
message::Message::SetupServer(setup) => setup,
_ => anyhow::bail!("expected SERVER SETUP"),
};
if setup_server.version != setup::Version::DRAFT_00 {
anyhow::bail!("unsupported version: {:?}", setup_server.version);
}
if !setup_server.role.compatible(role) {
anyhow::bail!("incompatible roles: {:?} {:?}", role, setup_server.role);
}
let send_objects = object::Sender::new(session.clone());
let recv_objects = object::Receiver::new(session.clone());
Ok(Session {
send_control,
recv_control,
send_objects,
recv_objects,
})
}
}

View File

@ -5,7 +5,7 @@ use bytes::{Buf, BufMut};
// Sent by the client to setup up the session.
#[derive(Debug)]
pub struct SetupClient {
pub struct Client {
// NOTE: This is not a message type, but rather the control stream header.
// Proposal: https://github.com/moq-wg/moq-transport/issues/138
@ -20,7 +20,7 @@ pub struct SetupClient {
pub path: String,
}
impl Decode for SetupClient {
impl Decode for Client {
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let versions = Versions::decode(r)?;
let role = Role::decode(r)?;
@ -30,7 +30,7 @@ impl Decode for SetupClient {
}
}
impl Encode for SetupClient {
impl Encode for Client {
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.versions.encode(w)?;
self.role.encode(w)?;

View File

@ -0,0 +1,9 @@
mod client;
mod role;
mod server;
mod version;
pub use client::*;
pub use role::*;
pub use server::*;
pub use version::*;

View File

@ -7,7 +7,7 @@ use bytes::{Buf, BufMut};
// NOTE: This is not a message type, but rather the control stream header.
// Proposal: https://github.com/moq-wg/moq-transport/issues/138
#[derive(Debug)]
pub struct SetupServer {
pub struct Server {
// The list of supported versions in preferred order.
pub version: Version,
@ -16,7 +16,7 @@ pub struct SetupServer {
pub role: Role,
}
impl Decode for SetupServer {
impl Decode for Server {
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let version = Version::decode(r)?;
let role = Role::decode(r)?;
@ -25,7 +25,7 @@ impl Decode for SetupServer {
}
}
impl Encode for SetupServer {
impl Encode for Server {
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.version.encode(w)?;
self.role.encode(w)?;

View File

@ -16,11 +16,12 @@ categories = [ "multimedia", "network-programming", "web-programming" ]
[dependencies]
moq-transport = { path = "../moq-transport" }
moq-transport-quinn = { path = "../moq-transport-quinn" }
webtransport-generic = { path = "../../webtransport-rs/webtransport-generic", version = "0.3" }
tokio = "1.27"
anyhow = "1.0.70"
log = "0.4" # TODO remove
bytes = "1.4"
# QUIC stuff
quinn = "0.10"

View File

@ -1,10 +1,5 @@
use super::watch;
use std::sync::Arc;
use bytes::Bytes;
// Use Arc to avoid cloning the data for each subscriber.
pub type Shared = Arc<Vec<u8>>;
// TODO combine fragments into the same buffer, instead of separate buffers.
pub type Publisher = watch::Publisher<Shared>;
pub type Subscriber = watch::Subscriber<Shared>;
pub type Publisher = watch::Publisher<Bytes>;
pub type Subscriber = watch::Subscriber<Bytes>;

View File

@ -2,4 +2,4 @@ pub mod broadcast;
pub mod fragment;
pub mod segment;
pub mod track;
pub(crate) mod watch;
pub mod watch;

View File

@ -1,5 +1,6 @@
use super::{fragment, watch};
use super::watch;
use bytes::Bytes;
use moq_transport::VarInt;
use std::ops::Deref;
use std::sync::Arc;
@ -21,7 +22,7 @@ pub struct Publisher {
pub info: Arc<Info>,
// A list of fragments that make up the segment.
pub fragments: watch::Publisher<fragment::Shared>,
pub fragments: watch::Publisher<Bytes>,
}
impl Publisher {
@ -53,7 +54,7 @@ pub struct Subscriber {
pub info: Arc<Info>,
// A list of fragments that make up the segment.
pub fragments: watch::Subscriber<fragment::Shared>,
pub fragments: watch::Subscriber<Bytes>,
}
impl Deref for Subscriber {

View File

@ -7,20 +7,20 @@ use std::sync::{Arc, Mutex};
use anyhow::Context;
#[derive(Clone, Default)]
pub struct Broadcasts {
pub struct Broker {
// Operate on the inner struct so we can share/clone the outer struct.
inner: Arc<Mutex<BroadcastsInner>>,
inner: Arc<Mutex<BrokerInner>>,
}
#[derive(Default)]
struct BroadcastsInner {
struct BrokerInner {
// TODO Automatically reclaim dropped sources.
lookup: HashMap<String, Arc<contribute::Broadcast>>,
updates: watch::Publisher<Update>,
updates: watch::Publisher<BrokerUpdate>,
}
#[derive(Clone)]
pub enum Update {
pub enum BrokerUpdate {
// Broadcast was announced
Insert(String), // TODO include source?
@ -28,13 +28,13 @@ pub enum Update {
Remove(String, broadcast::Error),
}
impl Broadcasts {
impl Broker {
pub fn new() -> Self {
Default::default()
}
// Return the list of available broadcasts, and a subscriber that will return updates (add/remove).
pub fn available(&self) -> (Vec<String>, watch::Subscriber<Update>) {
pub fn available(&self) -> (Vec<String>, watch::Subscriber<BrokerUpdate>) {
// Grab the lock.
let this = self.inner.lock().unwrap();
@ -55,7 +55,7 @@ impl Broadcasts {
}
this.lookup.insert(namespace.to_string(), source);
this.updates.push(Update::Insert(namespace.to_string()));
this.updates.push(BrokerUpdate::Insert(namespace.to_string()));
Ok(())
}
@ -64,7 +64,7 @@ impl Broadcasts {
let mut this = self.inner.lock().unwrap();
this.lookup.remove(namespace).context("namespace was not published")?;
this.updates.push(Update::Remove(namespace.to_string(), error));
this.updates.push(BrokerUpdate::Remove(namespace.to_string(), error));
Ok(())
}

View File

@ -2,28 +2,36 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time;
use tokio::io::AsyncBufReadExt;
use tokio::sync::mpsc;
use tokio::task::JoinSet; // lock across await boundaries
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt};
use moq_transport_quinn::{RecvObjects, RecvStream};
use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk};
use moq_transport::{object, VarInt};
use webtransport_generic::{AsyncRecvStream, AsyncSendStream, AsyncSession};
use bytes::BytesMut;
use anyhow::Context;
use super::{broker, control};
use crate::model::{broadcast, segment, track};
use crate::relay::{
message::{Component, Contribute},
Broker,
};
// TODO experiment with making this Clone, so every task can have its own copy.
pub struct Session {
pub struct Session<S>
where
S: AsyncSession,
{
// Used to receive objects.
objects: RecvObjects,
objects: object::Receiver<S>,
// Used to send and receive control messages.
control: control::Component<control::Contribute>,
control: Component<Contribute>,
// Globally announced namespaces, which we can add ourselves to.
broker: broker::Broadcasts,
broker: Broker,
// The names of active broadcasts being produced.
broadcasts: HashMap<String, Arc<Broadcast>>,
@ -35,12 +43,13 @@ pub struct Session {
run_segments: JoinSet<anyhow::Result<()>>, // receiving objects
}
impl Session {
pub fn new(
objects: RecvObjects,
control: control::Component<control::Contribute>,
broker: broker::Broadcasts,
) -> Self {
impl<S> Session<S>
where
S: AsyncSession,
S::SendStream: AsyncSendStream,
S::RecvStream: AsyncRecvStream,
{
pub fn new(objects: object::Receiver<S>, control: Component<Contribute>, broker: Broker) -> Self {
Self {
objects,
control,
@ -79,23 +88,27 @@ impl Session {
}
}
async fn receive_message(&mut self, msg: control::Contribute) -> anyhow::Result<()> {
async fn receive_message(&mut self, msg: Contribute) -> anyhow::Result<()> {
match msg {
control::Contribute::Announce(msg) => self.receive_announce(msg).await,
control::Contribute::SubscribeOk(msg) => self.receive_subscribe_ok(msg),
control::Contribute::SubscribeError(msg) => self.receive_subscribe_error(msg),
Contribute::Announce(msg) => self.receive_announce(msg).await,
Contribute::SubscribeOk(msg) => self.receive_subscribe_ok(msg),
Contribute::SubscribeError(msg) => self.receive_subscribe_error(msg),
}
}
async fn receive_object(&mut self, object: Object, stream: RecvStream) -> anyhow::Result<()> {
let track = object.track;
async fn receive_object(
&mut self,
header: object::Header,
stream: object::RecvStream<S::RecvStream>,
) -> anyhow::Result<()> {
let track = header.track;
// Keep objects in memory for 10s
let expires = time::Instant::now() + time::Duration::from_secs(10);
let segment = segment::Info {
sequence: object.sequence,
send_order: object.send_order,
sequence: header.sequence,
send_order: header.send_order,
expires: Some(expires),
};
@ -113,18 +126,19 @@ impl Session {
Ok(())
}
async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> {
loop {
let buf = stream.fill_buf().await?;
if buf.is_empty() {
return Ok(());
async fn run_segment(
mut segment: segment::Publisher,
mut stream: object::RecvStream<S::RecvStream>,
) -> anyhow::Result<()> {
let mut buf = BytesMut::new();
while stream.recv(&mut buf).await?.is_some() {
// Split off the data we read into the buffer, freezing it so multiple threads can read simitaniously.
let data = buf.split().freeze();
segment.fragments.push(data);
}
let chunk = buf.to_vec();
stream.consume(chunk.len());
segment.fragments.push(chunk.into())
}
Ok(())
}
async fn receive_announce(&mut self, msg: Announce) -> anyhow::Result<()> {
@ -177,7 +191,10 @@ impl Session {
}
}
impl Drop for Session {
impl<S> Drop for Session<S>
where
S: AsyncSession,
{
fn drop(&mut self) {
// Unannounce all broadcasts we have announced.
// TODO make this automatic so we can't screw up?

View File

@ -1,33 +1,42 @@
use anyhow::Context;
use tokio::{io::AsyncWriteExt, task::JoinSet}; // allows locking across await
use tokio::task::JoinSet; // allows locking across await
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt};
use moq_transport_quinn::SendObjects;
use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk};
use moq_transport::{object, VarInt};
use webtransport_generic::{AsyncRecvStream, AsyncSendStream, AsyncSession};
use super::{broker, control};
use crate::model::{segment, track};
use crate::relay::{
message::{Component, Distribute},
Broker, BrokerUpdate,
};
pub struct Session {
pub struct Session<S>
where
S: AsyncSession,
S::SendStream: AsyncSendStream,
{
// Objects are sent to the client
objects: SendObjects,
objects: object::Sender<S>,
// Used to send and receive control messages.
control: control::Component<control::Distribute>,
control: Component<Distribute>,
// Globally announced namespaces, which can be subscribed to.
broker: broker::Broadcasts,
broker: Broker,
// A list of tasks that are currently running.
run_subscribes: JoinSet<SubscribeError>, // run subscriptions, sending the returned error if they fail
}
impl Session {
pub fn new(
objects: SendObjects,
control: control::Component<control::Distribute>,
broker: broker::Broadcasts,
) -> Self {
impl<S> Session<S>
where
S: AsyncSession,
S::SendStream: AsyncSendStream,
S::RecvStream: AsyncRecvStream,
{
pub fn new(objects: object::Sender<S>, control: Component<Distribute>, broker: Broker) -> Self {
Self {
objects,
control,
@ -40,7 +49,7 @@ impl Session {
// Announce all available tracks and get a stream of updates.
let (available, mut updates) = self.broker.available();
for namespace in available {
self.on_available(broker::Update::Insert(namespace)).await?;
self.on_available(BrokerUpdate::Insert(namespace)).await?;
}
loop {
@ -61,11 +70,11 @@ impl Session {
}
}
async fn receive_message(&mut self, msg: control::Distribute) -> anyhow::Result<()> {
async fn receive_message(&mut self, msg: Distribute) -> anyhow::Result<()> {
match msg {
control::Distribute::AnnounceOk(msg) => self.receive_announce_ok(msg),
control::Distribute::AnnounceError(msg) => self.receive_announce_error(msg),
control::Distribute::Subscribe(msg) => self.receive_subscribe(msg).await,
Distribute::AnnounceOk(msg) => self.receive_announce_ok(msg),
Distribute::AnnounceError(msg) => self.receive_announce_error(msg),
Distribute::Subscribe(msg) => self.receive_subscribe(msg).await,
}
}
@ -119,7 +128,11 @@ impl Session {
Ok(())
}
async fn run_subscribe(objects: SendObjects, track_id: VarInt, mut track: track::Subscriber) -> SubscribeError {
async fn run_subscribe(
objects: object::Sender<S>,
track_id: VarInt,
mut track: track::Subscriber,
) -> SubscribeError {
let mut tasks = JoinSet::new();
let mut result = None;
@ -154,11 +167,11 @@ impl Session {
}
async fn serve_group(
mut objects: SendObjects,
mut objects: object::Sender<S>,
track_id: VarInt,
mut segment: segment::Subscriber,
) -> anyhow::Result<()> {
let object = Object {
let object = object::Header {
track: track_id,
group: segment.sequence,
sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object
@ -168,8 +181,8 @@ impl Session {
let mut stream = objects.open(object).await?;
// Write each fragment as they are available.
while let Some(fragment) = segment.fragments.next().await {
stream.write_all(fragment.as_slice()).await?;
while let Some(mut fragment) = segment.fragments.next().await {
stream.send_all(&mut fragment).await?;
}
// NOTE: stream is automatically closed when dropped
@ -177,16 +190,16 @@ impl Session {
Ok(())
}
async fn on_available(&mut self, delta: broker::Update) -> anyhow::Result<()> {
async fn on_available(&mut self, delta: BrokerUpdate) -> anyhow::Result<()> {
match delta {
broker::Update::Insert(name) => {
BrokerUpdate::Insert(name) => {
self.control
.send(Announce {
track_namespace: name.clone(),
})
.await
}
broker::Update::Remove(name, error) => {
BrokerUpdate::Remove(name, error) => {
self.control
.send(AnnounceError {
track_namespace: name,

View File

@ -1,11 +1,18 @@
use tokio::sync::mpsc;
use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk};
use moq_transport_quinn::{RecvControl, SendControl};
use moq_transport::message::{
self, Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk,
};
use webtransport_generic::{AsyncRecvStream, AsyncSendStream, AsyncSession};
pub struct Main {
send_control: SendControl,
recv_control: RecvControl,
pub struct Main<S>
where
S: AsyncSession,
S::SendStream: AsyncSendStream,
S::RecvStream: AsyncRecvStream,
{
send_control: message::Sender<S::SendStream>,
recv_control: message::Receiver<S::RecvStream>,
outgoing: mpsc::Receiver<Message>,
@ -13,7 +20,12 @@ pub struct Main {
distribute: mpsc::Sender<Distribute>,
}
impl Main {
impl<S> Main<S>
where
S: AsyncSession,
S::SendStream: AsyncSendStream,
S::RecvStream: AsyncRecvStream,
{
pub async fn run(mut self) -> anyhow::Result<()> {
loop {
tokio::select! {
@ -53,10 +65,15 @@ impl<T> Component<T> {
}
// Splits a control stream into two components, based on if it's a message for contribution or distribution.
pub fn split(
send_control: SendControl,
recv_control: RecvControl,
) -> (Main, Component<Contribute>, Component<Distribute>) {
pub fn split<S>(
send_control: message::Sender<S::SendStream>,
recv_control: message::Receiver<S::RecvStream>,
) -> (Main<S>, Component<Contribute>, Component<Distribute>)
where
S: AsyncSession,
S::SendStream: AsyncSendStream,
S::RecvStream: AsyncRecvStream,
{
let (outgoing_tx, outgoing_rx) = mpsc::channel(1);
let (contribute_tx, contribute_rx) = mpsc::channel(1);
let (distribute_tx, distribute_rx) = mpsc::channel(1);

View File

@ -1,8 +1,8 @@
pub mod broker;
mod broker;
mod contribute;
mod control;
mod distribute;
mod message;
mod session;
pub use broker::*;
pub use session::*;

View File

@ -1,17 +1,29 @@
use super::{broker, contribute, control, distribute};
use crate::relay::{contribute, distribute, message, Broker};
pub struct Session {
use webtransport_generic::{AsyncRecvStream, AsyncSendStream, AsyncSession};
pub struct Session<S>
where
S: AsyncSession,
S::SendStream: AsyncSendStream,
S::RecvStream: AsyncRecvStream,
{
// Split logic into contribution/distribution to reduce the problem space.
contribute: contribute::Session,
distribute: distribute::Session,
contribute: contribute::Session<S>,
distribute: distribute::Session<S>,
// Used to receive control messages and forward to contribute/distribute.
control: control::Main,
control: message::Main<S>,
}
impl Session {
pub fn new(session: moq_transport_quinn::Session, broker: broker::Broadcasts) -> Session {
let (control, contribute, distribute) = control::split(session.send_control, session.recv_control);
impl<S> Session<S>
where
S: AsyncSession,
S::SendStream: AsyncSendStream,
S::RecvStream: AsyncRecvStream,
{
pub fn new(session: moq_transport::Session<S>, broker: Broker) -> Self {
let (control, contribute, distribute) = message::split(session.send_control, session.recv_control);
let contribute = contribute::Session::new(session.recv_objects, contribute, broker.clone());
let distribute = distribute::Session::new(session.send_objects, distribute, broker);