Make moq-transport generic. (#41)

The API is now synchronous and any quinn stuff has been moved to another
package. The quinn stuff will be slowly moved into moq-transport with
generic traits.
This commit is contained in:
kixelated 2023-07-08 09:13:29 -07:00 committed by GitHub
parent 3c43eed8bd
commit 7c3eae0a7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 1001 additions and 948 deletions

24
Cargo.lock generated
View File

@ -66,17 +66,6 @@ version = "1.0.71"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8"
[[package]]
name = "async-trait"
version = "0.1.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.14" version = "0.2.14"
@ -763,15 +752,24 @@ dependencies = [
[[package]] [[package]]
name = "moq-transport" name = "moq-transport"
version = "0.1.0" version = "0.1.0"
dependencies = [
"bytes",
"log",
"thiserror",
]
[[package]]
name = "moq-transport-quinn"
version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"bytes", "bytes",
"h3", "h3",
"h3-quinn", "h3-quinn",
"h3-webtransport", "h3-webtransport",
"http", "http",
"log", "log",
"moq-transport",
"quinn", "quinn",
"thiserror", "thiserror",
"tokio", "tokio",
@ -782,9 +780,9 @@ name = "moq-warp"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"log", "log",
"moq-transport", "moq-transport",
"moq-transport-quinn",
"mp4", "mp4",
"quinn", "quinn",
"ring", "ring",

View File

@ -1,6 +1,7 @@
[workspace] [workspace]
members = [ members = [
"moq-transport", "moq-transport",
"moq-transport-quinn",
"moq-demo", "moq-demo",
"moq-warp", "moq-warp",
] ]

View File

@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> {
let broker = relay::broker::Broadcasts::new(); let broker = relay::broker::Broadcasts::new();
broker broker
.announce("demo", media.source()) .announce("quic.video/demo", media.source())
.context("failed to announce file source")?; .context("failed to announce file source")?;
// Create a server to actually serve the media // Create a server to actually serve the media

View File

@ -0,0 +1,31 @@
[package]
name = "moq-transport-quinn"
description = "Media over QUIC"
authors = [ "Luke Curley" ]
repository = "https://github.com/kixelated/moq-rs"
license = "MIT OR Apache-2.0"
version = "0.1.0"
edition = "2021"
keywords = [ "quic", "http3", "webtransport", "media", "live" ]
categories = [ "multimedia", "network-programming", "web-programming" ]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
moq-transport = { path = "../moq-transport" }
# WebTransport support: TODO pin a version when released
h3 = { git = "https://github.com/hyperium/h3", branch = "master" }
h3-quinn = { git = "https://github.com/hyperium/h3", branch = "master" }
h3-webtransport = { git = "https://github.com/hyperium/h3", branch = "master" }
quinn = "0.10"
http = "0.2"
tokio = { version = "1.27", features = ["macros"] }
bytes = "1"
log = "0.4"
anyhow = "1.0.70"
thiserror = "1.0.21"

View File

@ -0,0 +1,122 @@
use moq_transport::{Decode, DecodeError, Encode, Message};
use bytes::{Buf, Bytes, BytesMut};
use h3::quic::BidiStream;
use std::io::Cursor;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
pub struct Control {
sender: ControlSend,
recver: ControlRecv,
}
impl Control {
pub(crate) fn new(stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<Bytes>, Bytes>) -> Self {
let (sender, recver) = stream.split();
let sender = ControlSend::new(sender);
let recver = ControlRecv::new(recver);
Self { sender, recver }
}
pub fn split(self) -> (ControlSend, ControlRecv) {
(self.sender, self.recver)
}
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
self.sender.send(msg).await
}
pub async fn recv(&mut self) -> anyhow::Result<Message> {
self.recver.recv().await
}
}
pub struct ControlSend {
stream: h3_webtransport::stream::SendStream<h3_quinn::SendStream<Bytes>, Bytes>,
buf: BytesMut, // reuse a buffer to encode messages.
}
impl ControlSend {
pub fn new(inner: h3_webtransport::stream::SendStream<h3_quinn::SendStream<Bytes>, Bytes>) -> Self {
Self {
buf: BytesMut::new(),
stream: inner,
}
}
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
let msg = msg.into();
log::info!("sending message: {:?}", msg);
self.buf.clear();
msg.encode(&mut self.buf)?;
// TODO make this work with select!
self.stream.write_all(&self.buf).await?;
Ok(())
}
// Helper that lets multiple threads send control messages.
pub fn share(self) -> ControlShared {
ControlShared {
stream: Arc::new(Mutex::new(self)),
}
}
}
// Helper that allows multiple threads to send control messages.
// There's no equivalent for receiving since only one thread should be receiving at a time.
#[derive(Clone)]
pub struct ControlShared {
stream: Arc<Mutex<ControlSend>>,
}
impl ControlShared {
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
let mut stream = self.stream.lock().await;
stream.send(msg).await
}
}
pub struct ControlRecv {
stream: h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, Bytes>,
buf: BytesMut, // data we've read but haven't fully decoded yet
}
impl ControlRecv {
pub fn new(inner: h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, Bytes>) -> Self {
Self {
buf: BytesMut::new(),
stream: inner,
}
}
// Read the next full message from the stream.
pub async fn recv(&mut self) -> anyhow::Result<Message> {
loop {
// Read the contents of the buffer
let mut peek = Cursor::new(&self.buf);
match Message::decode(&mut peek) {
Ok(msg) => {
// We've successfully decoded a message, so we can advance the buffer.
self.buf.advance(peek.position() as usize);
log::info!("received message: {:?}", msg);
return Ok(msg);
}
Err(DecodeError::UnexpectedEnd) => {
// The decode failed, so we need to append more data.
self.stream.read_buf(&mut self.buf).await?;
}
Err(e) => return Err(e.into()),
}
}
}
}

View File

@ -0,0 +1,7 @@
mod control;
mod object;
mod server;
pub use control::*;
pub use object::*;
pub use server::*;

View File

@ -0,0 +1,136 @@
use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use moq_transport::{Decode, DecodeError, Encode, Object};
use std::{io::Cursor, sync::Arc};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// TODO support clients
type WebTransportSession = h3_webtransport::server::WebTransportSession<h3_quinn::Connection, Bytes>;
// Reduce some typing
pub type SendStream = h3_webtransport::stream::SendStream<h3_quinn::SendStream<Bytes>, Bytes>;
pub type RecvStream = h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, Bytes>;
pub struct Objects {
send: SendObjects,
recv: RecvObjects,
}
impl Objects {
pub fn new(session: Arc<WebTransportSession>) -> Self {
let send = SendObjects::new(session.clone());
let recv = RecvObjects::new(session);
Self { send, recv }
}
pub fn split(self) -> (SendObjects, RecvObjects) {
(self.send, self.recv)
}
pub async fn recv(&mut self) -> anyhow::Result<(Object, RecvStream)> {
self.recv.recv().await
}
pub async fn send(&mut self, header: Object) -> anyhow::Result<SendStream> {
self.send.send(header).await
}
}
pub struct SendObjects {
session: Arc<WebTransportSession>,
// A reusable buffer for encoding messages.
buf: BytesMut,
}
impl SendObjects {
pub fn new(session: Arc<WebTransportSession>) -> Self {
Self {
session,
buf: BytesMut::new(),
}
}
pub async fn send(&mut self, header: Object) -> anyhow::Result<SendStream> {
self.buf.clear();
header.encode(&mut self.buf).unwrap();
let mut stream = self
.session
.open_uni(self.session.session_id())
.await
.context("failed to open uni stream")?;
// TODO support select! without making a new stream.
stream.write_all(&self.buf).await?;
Ok(stream)
}
}
impl Clone for SendObjects {
fn clone(&self) -> Self {
Self {
session: self.session.clone(),
buf: BytesMut::new(),
}
}
}
// Not clone, so we don't accidentally have two listners.
pub struct RecvObjects {
session: Arc<WebTransportSession>,
// A uni stream that's been accepted but not fully read from yet.
stream: Option<RecvStream>,
// Data that we've read but haven't formed a full message yet.
buf: BytesMut,
}
impl RecvObjects {
pub fn new(session: Arc<WebTransportSession>) -> Self {
Self {
session,
stream: None,
buf: BytesMut::new(),
}
}
pub async fn recv(&mut self) -> anyhow::Result<(Object, RecvStream)> {
// Make sure any state is saved across await boundaries so this works with select!
let stream = match self.stream.as_mut() {
Some(stream) => stream,
None => {
let (_session_id, stream) = self
.session
.accept_uni()
.await
.context("failed to accept uni stream")?
.context("no uni stream")?;
self.stream.insert(stream)
}
};
loop {
// Read the contents of the buffer
let mut peek = Cursor::new(&self.buf);
match Object::decode(&mut peek) {
Ok(header) => {
let stream = self.stream.take().unwrap();
self.buf.advance(peek.position() as usize);
return Ok((header, stream));
}
Err(DecodeError::UnexpectedEnd) => {
// The decode failed, so we need to append more data.
stream.read_buf(&mut self.buf).await?;
}
Err(e) => return Err(e.into()),
}
}
}
}

View File

@ -0,0 +1,179 @@
use std::sync::Arc;
use anyhow::Context;
use bytes::Bytes;
use tokio::task::JoinSet;
use moq_transport::{Message, SetupClient, SetupServer};
use super::{Control, Objects};
pub struct Server {
// The QUIC server, yielding new connections and sessions.
endpoint: quinn::Endpoint,
// A list of connections that are completing the WebTransport handshake.
handshake: JoinSet<anyhow::Result<Connect>>,
}
impl Server {
pub fn new(endpoint: quinn::Endpoint) -> Self {
let handshake = JoinSet::new();
Self { endpoint, handshake }
}
// Accept the next WebTransport session.
pub async fn accept(&mut self) -> anyhow::Result<Connect> {
loop {
tokio::select!(
// Accept the connection and start the WebTransport handshake.
conn = self.endpoint.accept() => {
let conn = conn.context("failed to accept connection")?;
self.handshake.spawn(async move {
Connecting::new(conn).accept().await
});
},
// Return any mostly finished WebTransport handshakes.
res = self.handshake.join_next(), if !self.handshake.is_empty() => {
let res = res.expect("no tasks").expect("task aborted");
match res {
Ok(session) => return Ok(session),
Err(err) => log::warn!("failed to accept session: {:?}", err),
}
},
)
}
}
}
struct Connecting {
conn: quinn::Connecting,
}
impl Connecting {
pub fn new(conn: quinn::Connecting) -> Self {
Self { conn }
}
pub async fn accept(self) -> anyhow::Result<Connect> {
let conn = self.conn.await.context("failed to accept h3 connection")?;
let mut conn = h3::server::builder()
.enable_webtransport(true)
.enable_connect(true)
.enable_datagram(true)
.max_webtransport_sessions(1)
.send_grease(true)
.build(h3_quinn::Connection::new(conn))
.await
.context("failed to create h3 server")?;
let (req, stream) = conn
.accept()
.await
.context("failed to accept h3 session")?
.context("failed to accept h3 request")?;
let ext = req.extensions();
anyhow::ensure!(req.method() == http::Method::CONNECT, "expected CONNECT request");
anyhow::ensure!(
ext.get::<h3::ext::Protocol>() == Some(&h3::ext::Protocol::WEB_TRANSPORT),
"expected WebTransport CONNECT"
);
// Let the application decide if we accept this CONNECT request.
Ok(Connect { conn, req, stream })
}
}
// The WebTransport CONNECT has arrived, and we need to decide if we accept it.
pub struct Connect {
// Inspect to decide whether to accept() or reject() the session.
req: http::Request<()>,
conn: h3::server::Connection<h3_quinn::Connection, Bytes>,
stream: h3::server::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
}
impl Connect {
// Expose the received URI
pub fn uri(&self) -> &http::Uri {
self.req.uri()
}
// Accept the WebTransport session.
pub async fn accept(self) -> anyhow::Result<Setup> {
let session = h3_webtransport::server::WebTransportSession::accept(self.req, self.stream, self.conn).await?;
let session = Arc::new(session);
let stream = session
.accept_bi()
.await
.context("failed to accept bidi stream")?
.unwrap();
let objects = Objects::new(session.clone());
let stream = match stream {
h3_webtransport::server::AcceptedBi::BidiStream(_session_id, stream) => stream,
h3_webtransport::server::AcceptedBi::Request(..) => anyhow::bail!("additional http requests not supported"),
};
let mut control = Control::new(stream);
let setup = match control.recv().await.context("failed to read SETUP")? {
Message::SetupClient(setup) => setup,
_ => anyhow::bail!("expected CLIENT SETUP"),
};
// Let the application decide if we accept this MoQ session.
Ok(Setup {
setup,
control,
objects,
})
}
// Reject the WebTransport session with a HTTP response.
pub async fn reject(mut self, resp: http::Response<()>) -> anyhow::Result<()> {
self.stream.send_response(resp).await?;
Ok(())
}
}
pub struct Setup {
setup: SetupClient,
control: Control,
objects: Objects,
}
impl Setup {
// Return the setup message we received.
pub fn setup(&self) -> &SetupClient {
&self.setup
}
// Accept the session with our own setup message.
pub async fn accept(mut self, setup: SetupServer) -> anyhow::Result<Session> {
self.control.send(setup).await?;
Ok(Session {
control: self.control,
objects: self.objects,
})
}
pub async fn reject(self) -> anyhow::Result<()> {
// TODO Close the QUIC connection with an error code.
Ok(())
}
}
pub struct Session {
pub control: Control,
pub objects: Objects,
}
impl Session {
pub fn split(self) -> (Control, Objects) {
(self.control, self.objects)
}
}

View File

@ -15,17 +15,6 @@ categories = [ "multimedia", "network-programming", "web-programming" ]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
# WebTransport support: TODO pin a version when released
h3 = { git = "https://github.com/hyperium/h3", branch = "master" }
h3-quinn = { git = "https://github.com/hyperium/h3", branch = "master" }
h3-webtransport = { git = "https://github.com/hyperium/h3", branch = "master" }
quinn = "0.10"
http = "0.2"
tokio = { version = "1.27", features = ["macros"] }
bytes = "1" bytes = "1"
log = "0.4"
anyhow = "1.0.70"
thiserror = "1.0.21" thiserror = "1.0.21"
async-trait = "0.1" log = "0.4"

View File

@ -1,40 +1,76 @@
use super::VarInt; use super::VarInt;
use bytes::Bytes; use bytes::{Buf, Bytes};
use std::str; use std::str;
use async_trait::async_trait; use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt};
#[derive(Error, Debug)]
pub enum DecodeError {
#[error("unexpected end of buffer")]
UnexpectedEnd,
#[error("invalid string")]
InvalidString(#[from] str::Utf8Error),
#[error("invalid type: {0:?}")]
InvalidType(VarInt),
#[error("unknown error")]
Unknown,
}
#[async_trait]
pub trait Decode: Sized { pub trait Decode: Sized {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self>; // Decodes a message, returning UnexpectedEnd if there's not enough bytes in the buffer.
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError>;
} }
#[async_trait]
impl Decode for Bytes { impl Decode for Bytes {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
Vec::<u8>::decode(r).await.map(Bytes::from) let size = VarInt::decode(r)?.into_inner() as usize;
} if r.remaining() < size {
return Err(DecodeError::UnexpectedEnd);
} }
#[async_trait] let buf = r.copy_to_bytes(size);
impl Decode for Vec<u8> {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let size = VarInt::decode(r).await?;
// NOTE: we don't use with_capacity since size is from an untrusted source
let mut buf = Vec::new();
r.take(size.into()).read_to_end(&mut buf).await?;
Ok(buf) Ok(buf)
} }
} }
#[async_trait] impl Decode for Vec<u8> {
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
Bytes::decode(r).map(|b| b.to_vec())
}
}
impl Decode for String { impl Decode for String {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let data = Vec::decode(r).await?; let data = Vec::decode(r)?;
let s = str::from_utf8(&data)?.to_string(); let s = str::from_utf8(&data)?.to_string();
Ok(s) Ok(s)
} }
} }
impl Decode for u8 {
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
if r.remaining() < 1 {
return Err(DecodeError::UnexpectedEnd);
}
Ok(r.get_u8())
}
}
/*
impl<const N: usize> Decode for [u8; N] {
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
if r.remaining() < N {
return Err(DecodeError::UnexpectedEnd);
}
let mut buf = [0; N];
r.copy_to_slice(&mut buf);
Ok(buf)
}
}
*/

View File

@ -1,23 +1,20 @@
use crate::coding::{Decode, Encode, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use async_trait::async_trait; use bytes::{Buf, BufMut};
use tokio::io::{AsyncRead, AsyncWrite};
use std::time::Duration; use std::time::Duration;
#[async_trait]
impl Encode for Duration { impl Encode for Duration {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
let ms = self.as_millis(); let ms = self.as_millis();
let ms = VarInt::try_from(ms)?; let ms = VarInt::try_from(ms)?;
ms.encode(w).await ms.encode(w)
} }
} }
#[async_trait]
impl Decode for Duration { impl Decode for Duration {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let ms = VarInt::decode(r).await?; let ms = VarInt::decode(r)?;
Ok(Self::from_millis(ms.into())) Ok(Self::from_millis(ms.into()))
} }
} }

View File

@ -1,41 +1,95 @@
use async_trait::async_trait; use super::{BoundsExceeded, VarInt};
use tokio::io::{AsyncWrite, AsyncWriteExt}; use bytes::{BufMut, Bytes};
use super::VarInt; use thiserror::Error;
use bytes::Bytes;
#[derive(Error, Debug)]
pub enum EncodeError {
#[error("unexpected end of buffer")]
UnexpectedEnd,
#[error("varint too large")]
BoundsExceeded(#[from] BoundsExceeded),
#[error("unknown error")]
Unknown,
}
#[async_trait]
pub trait Encode: Sized { pub trait Encode: Sized {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()>; fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError>;
} }
#[async_trait]
impl Encode for Bytes { impl Encode for Bytes {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.as_ref().encode(w).await self.as_ref().encode(w)
} }
} }
#[async_trait]
impl Encode for Vec<u8> { impl Encode for Vec<u8> {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.as_slice().encode(w).await self.as_slice().encode(w)
} }
} }
#[async_trait]
impl Encode for &[u8] { impl Encode for &[u8] {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
let size: VarInt = self.len().try_into()?; let size = VarInt::try_from(self.len())?;
size.encode(w).await?; size.encode(w)?;
w.write_all(self).await?;
if w.remaining_mut() < self.len() {
return Err(EncodeError::UnexpectedEnd);
}
w.put_slice(self);
Ok(()) Ok(())
} }
} }
#[async_trait]
impl Encode for String { impl Encode for String {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.as_bytes().encode(w).await self.as_bytes().encode(w)
}
}
impl Encode for u8 {
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
if w.remaining_mut() < 1 {
return Err(EncodeError::UnexpectedEnd);
}
w.put_u8(*self);
Ok(())
}
}
impl Encode for u16 {
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
if w.remaining_mut() < 2 {
return Err(EncodeError::UnexpectedEnd);
}
w.put_u16(*self);
Ok(())
}
}
impl Encode for u32 {
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
if w.remaining_mut() < 4 {
return Err(EncodeError::UnexpectedEnd);
}
w.put_u32(*self);
Ok(())
}
}
impl Encode for u64 {
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
if w.remaining_mut() < 8 {
return Err(EncodeError::UnexpectedEnd);
}
w.put_u64(*self);
Ok(())
} }
} }

View File

@ -5,12 +5,11 @@
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use std::fmt; use std::fmt;
use crate::coding::{Decode, Encode}; use crate::coding::{Decode, DecodeError, Encode, EncodeError};
use bytes::{Buf, BufMut};
use thiserror::Error; use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
#[derive(Debug, Copy, Clone, Eq, PartialEq, Error)] #[derive(Debug, Copy, Clone, Eq, PartialEq, Error)]
#[error("value too large for varint encoding")] #[error("value too large for varint encoding")]
pub struct BoundsExceeded; pub struct BoundsExceeded;
@ -102,6 +101,7 @@ impl TryFrom<u128> for VarInt {
impl TryFrom<usize> for VarInt { impl TryFrom<usize> for VarInt {
type Error = BoundsExceeded; type Error = BoundsExceeded;
/// Succeeds iff `x` < 2^62 /// Succeeds iff `x` < 2^62
fn try_from(x: usize) -> Result<Self, BoundsExceeded> { fn try_from(x: usize) -> Result<Self, BoundsExceeded> {
Self::try_from(x as u64) Self::try_from(x as u64)
@ -120,13 +120,15 @@ impl fmt::Display for VarInt {
} }
} }
use async_trait::async_trait;
#[async_trait]
impl Decode for VarInt { impl Decode for VarInt {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let mut buf = [0; 8]; let mut buf = [0; 8];
r.read_exact(buf[0..1].as_mut()).await?;
if r.remaining() < 1 {
return Err(DecodeError::UnexpectedEnd);
}
buf[0] = r.get_u8();
let tag = buf[0] >> 6; let tag = buf[0] >> 6;
buf[0] &= 0b0011_1111; buf[0] &= 0b0011_1111;
@ -134,15 +136,27 @@ impl Decode for VarInt {
let x = match tag { let x = match tag {
0b00 => u64::from(buf[0]), 0b00 => u64::from(buf[0]),
0b01 => { 0b01 => {
r.read_exact(buf[1..2].as_mut()).await?; if r.remaining() < 1 {
return Err(DecodeError::UnexpectedEnd);
}
r.copy_to_slice(buf[1..2].as_mut());
u64::from(u16::from_be_bytes(buf[..2].try_into().unwrap())) u64::from(u16::from_be_bytes(buf[..2].try_into().unwrap()))
} }
0b10 => { 0b10 => {
r.read_exact(buf[1..4].as_mut()).await?; if r.remaining() < 3 {
return Err(DecodeError::UnexpectedEnd);
}
r.copy_to_slice(buf[1..4].as_mut());
u64::from(u32::from_be_bytes(buf[..4].try_into().unwrap())) u64::from(u32::from_be_bytes(buf[..4].try_into().unwrap()))
} }
0b11 => { 0b11 => {
r.read_exact(buf[1..8].as_mut()).await?; if r.remaining() < 7 {
return Err(DecodeError::UnexpectedEnd);
}
r.copy_to_slice(buf[1..8].as_mut());
u64::from_be_bytes(buf) u64::from_be_bytes(buf)
} }
_ => unreachable!(), _ => unreachable!(),
@ -152,22 +166,19 @@ impl Decode for VarInt {
} }
} }
#[async_trait]
impl Encode for VarInt { impl Encode for VarInt {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
let x = self.0; let x = self.0;
if x < 2u64.pow(6) { if x < 2u64.pow(6) {
w.write_u8(x as u8).await?; (x as u8).encode(w)
} else if x < 2u64.pow(14) { } else if x < 2u64.pow(14) {
w.write_u16(0b01 << 14 | x as u16).await?; (0b01 << 14 | x as u16).encode(w)
} else if x < 2u64.pow(30) { } else if x < 2u64.pow(30) {
w.write_u32(0b10 << 30 | x as u32).await?; (0b10 << 30 | x as u32).encode(w)
} else if x < 2u64.pow(62) { } else if x < 2u64.pow(62) {
w.write_u64(0b11 << 62 | x).await?; (0b11 << 62 | x).encode(w)
} else { } else {
anyhow::bail!("malformed VarInt"); unreachable!("malformed VarInt");
} }
Ok(())
} }
} }

View File

@ -1,7 +1,6 @@
use crate::coding::{Decode, Encode}; use crate::coding::{Decode, DecodeError, Encode, EncodeError};
use async_trait::async_trait; use bytes::{Buf, BufMut};
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)] #[derive(Debug)]
pub struct Announce { pub struct Announce {
@ -9,18 +8,16 @@ pub struct Announce {
pub track_namespace: String, pub track_namespace: String,
} }
#[async_trait]
impl Decode for Announce { impl Decode for Announce {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let track_namespace = String::decode(r).await?; let track_namespace = String::decode(r)?;
Ok(Self { track_namespace }) Ok(Self { track_namespace })
} }
} }
#[async_trait]
impl Encode for Announce { impl Encode for Announce {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.track_namespace.encode(w).await?; self.track_namespace.encode(w)?;
Ok(()) Ok(())
} }
} }

View File

@ -1,7 +1,6 @@
use crate::coding::{Decode, Encode, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use async_trait::async_trait; use bytes::{Buf, BufMut};
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)] #[derive(Debug)]
pub struct AnnounceError { pub struct AnnounceError {
@ -16,12 +15,11 @@ pub struct AnnounceError {
pub reason: String, pub reason: String,
} }
#[async_trait]
impl Decode for AnnounceError { impl Decode for AnnounceError {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let track_namespace = String::decode(r).await?; let track_namespace = String::decode(r)?;
let code = VarInt::decode(r).await?; let code = VarInt::decode(r)?;
let reason = String::decode(r).await?; let reason = String::decode(r)?;
Ok(Self { Ok(Self {
track_namespace, track_namespace,
@ -31,12 +29,11 @@ impl Decode for AnnounceError {
} }
} }
#[async_trait]
impl Encode for AnnounceError { impl Encode for AnnounceError {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.track_namespace.encode(w).await?; self.track_namespace.encode(w)?;
self.code.encode(w).await?; self.code.encode(w)?;
self.reason.encode(w).await?; self.reason.encode(w)?;
Ok(()) Ok(())
} }

View File

@ -1,7 +1,6 @@
use crate::coding::{Decode, Encode}; use crate::coding::{Decode, DecodeError, Encode, EncodeError};
use async_trait::async_trait; use bytes::{Buf, BufMut};
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)] #[derive(Debug)]
pub struct AnnounceOk { pub struct AnnounceOk {
@ -10,17 +9,15 @@ pub struct AnnounceOk {
pub track_namespace: String, pub track_namespace: String,
} }
#[async_trait]
impl Decode for AnnounceOk { impl Decode for AnnounceOk {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let track_namespace = String::decode(r).await?; let track_namespace = String::decode(r)?;
Ok(Self { track_namespace }) Ok(Self { track_namespace })
} }
} }
#[async_trait]
impl Encode for AnnounceOk { impl Encode for AnnounceOk {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.track_namespace.encode(w).await self.track_namespace.encode(w)
} }
} }

View File

@ -1,24 +1,21 @@
use crate::coding::{Decode, Encode}; use crate::coding::{Decode, DecodeError, Encode, EncodeError};
use async_trait::async_trait; use bytes::{Buf, BufMut};
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)] #[derive(Debug)]
pub struct GoAway { pub struct GoAway {
pub url: String, pub url: String,
} }
#[async_trait]
impl Decode for GoAway { impl Decode for GoAway {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let url = String::decode(r).await?; let url = String::decode(r)?;
Ok(Self { url }) Ok(Self { url })
} }
} }
#[async_trait]
impl Encode for GoAway { impl Encode for GoAway {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.url.encode(w).await self.url.encode(w)
} }
} }

View File

@ -2,27 +2,38 @@ mod announce;
mod announce_error; mod announce_error;
mod announce_ok; mod announce_ok;
mod go_away; mod go_away;
mod stream; mod role;
mod setup_client;
mod setup_server;
mod subscribe; mod subscribe;
mod subscribe_error; mod subscribe_error;
mod subscribe_ok; mod subscribe_ok;
mod version;
pub use announce::*; pub use announce::*;
pub use announce_error::*; pub use announce_error::*;
pub use announce_ok::*; pub use announce_ok::*;
pub use go_away::*; pub use go_away::*;
pub use stream::*; pub use role::*;
pub use setup_client::*;
pub use setup_server::*;
pub use subscribe::*; pub use subscribe::*;
pub use subscribe_error::*; pub use subscribe_error::*;
pub use subscribe_ok::*; pub use subscribe_ok::*;
pub use version::*;
use crate::coding::{Decode, Encode, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use async_trait::async_trait; use bytes::{Buf, BufMut};
use std::fmt; use std::fmt;
use tokio::io::{AsyncRead, AsyncWrite};
use anyhow::Context; // NOTE: This is forked from moq-transport-00.
// 1. SETUP role indicates local support ("I can subscribe"), not remote support ("server must publish")
// 2. SETUP_SERVER is id=2 to disambiguate
// 3. messages do not have a specified length.
// 4. messages are sent over a single bidrectional stream (after SETUP), not unidirectional streams.
// 5. SUBSCRIBE specifies the track_id, not SUBSCRIBE_OK
// 6. optional parameters are written in order, and zero when unset (setup, announce, subscribe)
// Use a macro to generate the message types rather than copy-paste. // Use a macro to generate the message types rather than copy-paste.
// This implements a decode/encode method that uses the specified type. // This implements a decode/encode method that uses the specified type.
@ -32,45 +43,33 @@ macro_rules! message_types {
$($name($name)),* $($name($name)),*
} }
#[async_trait]
impl Decode for Message { impl Decode for Message {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let t = VarInt::decode(r).await.context("failed to decode type")?; let t = VarInt::decode(r)?;
Ok(match t.into_inner() { match t.into_inner() {
$($val => { $($val => {
let msg = $name::decode(r).await.context(concat!("failed to decode ", stringify!($name)))?; let msg = $name::decode(r)?;
Self::$name(msg) Ok(Self::$name(msg))
})* })*
_ => anyhow::bail!("invalid type: {}", t), _ => Err(DecodeError::InvalidType(t)),
}) }
} }
} }
#[async_trait]
impl Encode for Message { impl Encode for Message {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
match self { match self {
$(Self::$name(ref m) => { $(Self::$name(ref m) => {
VarInt::from_u32($val).encode(w).await.context("failed to encode type")?; VarInt::from_u32($val).encode(w)?;
m.encode(w).await.context("failed to encode message") m.encode(w)
},)* },)*
} }
} }
} }
// Unwrap the enum into the specified type.
$(impl TryFrom<Message> for $name {
type Error = anyhow::Error;
fn try_from(m: Message) -> Result<Self, Self::Error> {
match m {
Message::$name(m) => Ok(m),
_ => anyhow::bail!("invalid message type"),
}
}
})*
$(impl From<$name> for Message { $(impl From<$name> for Message {
fn from(m: $name) -> Self { fn from(m: $name) -> Self {
Message::$name(m) Message::$name(m)
@ -88,17 +87,12 @@ macro_rules! message_types {
} }
} }
// NOTE: These messages are forked from moq-transport-00.
// 1. subscribe specifies the track_id, not subscribe_ok
// 2. messages lack a specified length
// 3. optional parameters are not supported (announce, subscribe)
// 4. not allowed on undirectional streams; only after SETUP on the bidirectional stream
// Each message is prefixed with the given VarInt type. // Each message is prefixed with the given VarInt type.
message_types! { message_types! {
// NOTE: Object and Setup are in other modules. // NOTE: Object and Setup are in other modules.
// Object = 0x0 // Object = 0x0
// Setup = 0x1 SetupClient = 0x1,
SetupServer = 0x2,
Subscribe = 0x3, Subscribe = 0x3,
SubscribeOk = 0x4, SubscribeOk = 0x4,
SubscribeError = 0x5, SubscribeError = 0x5,

View File

@ -1,7 +1,6 @@
use async_trait::async_trait; use bytes::{Buf, BufMut};
use tokio::io::{AsyncRead, AsyncWrite};
use crate::coding::{Decode, Encode, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Role { pub enum Role {
@ -37,29 +36,27 @@ impl From<Role> for VarInt {
} }
impl TryFrom<VarInt> for Role { impl TryFrom<VarInt> for Role {
type Error = anyhow::Error; type Error = DecodeError;
fn try_from(v: VarInt) -> Result<Self, Self::Error> { fn try_from(v: VarInt) -> Result<Self, Self::Error> {
Ok(match v.into_inner() { match v.into_inner() {
0x0 => Self::Publisher, 0x0 => Ok(Self::Publisher),
0x1 => Self::Subscriber, 0x1 => Ok(Self::Subscriber),
0x2 => Self::Both, 0x2 => Ok(Self::Both),
_ => anyhow::bail!("invalid role: {}", v), _ => Err(DecodeError::InvalidType(v)),
}) }
} }
} }
#[async_trait]
impl Decode for Role { impl Decode for Role {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let v = VarInt::decode(r).await?; let v = VarInt::decode(r)?;
v.try_into() v.try_into()
} }
} }
#[async_trait]
impl Encode for Role { impl Encode for Role {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
VarInt::from(*self).encode(w).await VarInt::from(*self).encode(w)
} }
} }

View File

@ -0,0 +1,41 @@
use super::{Role, Versions};
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
use bytes::{Buf, BufMut};
// Sent by the client to setup up the session.
#[derive(Debug)]
pub struct SetupClient {
// NOTE: This is not a message type, but rather the control stream header.
// Proposal: https://github.com/moq-wg/moq-transport/issues/138
// The list of supported versions in preferred order.
pub versions: Versions,
// Indicate if the client is a publisher, a subscriber, or both.
// Proposal: moq-wg/moq-transport#151
pub role: Role,
// The path, non-empty ONLY when not using WebTransport.
pub path: String,
}
impl Decode for SetupClient {
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let versions = Versions::decode(r)?;
let role = Role::decode(r)?;
let path = String::decode(r)?;
Ok(Self { versions, role, path })
}
}
impl Encode for SetupClient {
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.versions.encode(w)?;
self.role.encode(w)?;
self.path.encode(w)?;
Ok(())
}
}

View File

@ -0,0 +1,35 @@
use super::{Role, Version};
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
use bytes::{Buf, BufMut};
// Sent by the server in response to a client.
// NOTE: This is not a message type, but rather the control stream header.
// Proposal: https://github.com/moq-wg/moq-transport/issues/138
#[derive(Debug)]
pub struct SetupServer {
// The list of supported versions in preferred order.
pub version: Version,
// param: 0x0: Indicate if the server is a publisher, a subscriber, or both.
// Proposal: moq-wg/moq-transport#151
pub role: Role,
}
impl Decode for SetupServer {
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let version = Version::decode(r)?;
let role = Role::decode(r)?;
Ok(Self { version, role })
}
}
impl Encode for SetupServer {
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.version.encode(w)?;
self.role.encode(w)?;
Ok(())
}
}

View File

@ -1,79 +0,0 @@
use crate::coding::{Decode, Encode};
use crate::control::Message;
use bytes::Bytes;
use h3::quic::BidiStream;
use std::sync::Arc;
use tokio::sync::Mutex;
pub struct Stream {
sender: SendStream,
recver: RecvStream,
}
impl Stream {
pub(crate) fn new(stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<Bytes>, Bytes>) -> Self {
let (sender, recver) = stream.split();
let sender = SendStream { stream: sender };
let recver = RecvStream { stream: recver };
Self { sender, recver }
}
pub fn split(self) -> (SendStream, RecvStream) {
(self.sender, self.recver)
}
pub async fn send(&mut self, msg: Message) -> anyhow::Result<()> {
self.sender.send(msg).await
}
pub async fn recv(&mut self) -> anyhow::Result<Message> {
self.recver.recv().await
}
}
pub struct SendStream {
stream: h3_webtransport::stream::SendStream<h3_quinn::SendStream<Bytes>, Bytes>,
}
impl SendStream {
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
let msg = msg.into();
log::info!("sending message: {:?}", msg);
msg.encode(&mut self.stream).await
}
// Helper that lets multiple threads send control messages.
pub fn share(self) -> SendShared {
SendShared {
stream: Arc::new(Mutex::new(self)),
}
}
}
// Helper that allows multiple threads to send control messages.
#[derive(Clone)]
pub struct SendShared {
stream: Arc<Mutex<SendStream>>,
}
impl SendShared {
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
let mut stream = self.stream.lock().await;
stream.send(msg).await
}
}
pub struct RecvStream {
stream: h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, Bytes>,
}
impl RecvStream {
pub async fn recv(&mut self) -> anyhow::Result<Message> {
let msg = Message::decode(&mut self.stream).await?;
log::info!("received message: {:?}", msg);
Ok(msg)
}
}

View File

@ -1,7 +1,6 @@
use crate::coding::{Decode, Encode, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use async_trait::async_trait; use bytes::{Buf, BufMut};
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)] #[derive(Debug)]
pub struct Subscribe { pub struct Subscribe {
@ -16,12 +15,11 @@ pub struct Subscribe {
pub track_name: String, pub track_name: String,
} }
#[async_trait]
impl Decode for Subscribe { impl Decode for Subscribe {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let track_id = VarInt::decode(r).await?; let track_id = VarInt::decode(r)?;
let track_namespace = String::decode(r).await?; let track_namespace = String::decode(r)?;
let track_name = String::decode(r).await?; let track_name = String::decode(r)?;
Ok(Self { Ok(Self {
track_id, track_id,
@ -31,12 +29,11 @@ impl Decode for Subscribe {
} }
} }
#[async_trait]
impl Encode for Subscribe { impl Encode for Subscribe {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.track_id.encode(w).await?; self.track_id.encode(w)?;
self.track_namespace.encode(w).await?; self.track_namespace.encode(w)?;
self.track_name.encode(w).await?; self.track_name.encode(w)?;
Ok(()) Ok(())
} }

View File

@ -1,7 +1,6 @@
use crate::coding::{Decode, Encode, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use async_trait::async_trait; use bytes::{Buf, BufMut};
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)] #[derive(Debug)]
pub struct SubscribeError { pub struct SubscribeError {
@ -17,23 +16,21 @@ pub struct SubscribeError {
pub reason: String, pub reason: String,
} }
#[async_trait]
impl Decode for SubscribeError { impl Decode for SubscribeError {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let track_id = VarInt::decode(r).await?; let track_id = VarInt::decode(r)?;
let code = VarInt::decode(r).await?; let code = VarInt::decode(r)?;
let reason = String::decode(r).await?; let reason = String::decode(r)?;
Ok(Self { track_id, code, reason }) Ok(Self { track_id, code, reason })
} }
} }
#[async_trait]
impl Encode for SubscribeError { impl Encode for SubscribeError {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.track_id.encode(w).await?; self.track_id.encode(w)?;
self.code.encode(w).await?; self.code.encode(w)?;
self.reason.encode(w).await?; self.reason.encode(w)?;
Ok(()) Ok(())
} }

View File

@ -1,9 +1,8 @@
use crate::coding::{Decode, Encode, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use std::time::Duration; use std::time::Duration;
use async_trait::async_trait; use bytes::{Buf, BufMut};
use tokio::io::{AsyncRead, AsyncWrite};
#[derive(Debug)] #[derive(Debug)]
pub struct SubscribeOk { pub struct SubscribeOk {
@ -17,22 +16,20 @@ pub struct SubscribeOk {
pub expires: Option<Duration>, pub expires: Option<Duration>,
} }
#[async_trait]
impl Decode for SubscribeOk { impl Decode for SubscribeOk {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let track_id = VarInt::decode(r).await?; let track_id = VarInt::decode(r)?;
let expires = Duration::decode(r).await?; let expires = Duration::decode(r)?;
let expires = if expires == Duration::ZERO { None } else { Some(expires) }; let expires = if expires == Duration::ZERO { None } else { Some(expires) };
Ok(Self { track_id, expires }) Ok(Self { track_id, expires })
} }
} }
#[async_trait]
impl Encode for SubscribeOk { impl Encode for SubscribeOk {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.track_id.encode(w).await?; self.track_id.encode(w)?;
self.expires.unwrap_or_default().encode(w).await?; self.expires.unwrap_or_default().encode(w)?;
Ok(()) Ok(())
} }

View File

@ -1,7 +1,6 @@
use crate::coding::{Decode, Encode, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use async_trait::async_trait; use bytes::{Buf, BufMut};
use tokio::io::{AsyncRead, AsyncWrite};
use std::ops::Deref; use std::ops::Deref;
@ -24,32 +23,29 @@ impl From<Version> for VarInt {
} }
} }
#[async_trait]
impl Decode for Version { impl Decode for Version {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let v = VarInt::decode(r).await?; let v = VarInt::decode(r)?;
Ok(Self(v)) Ok(Self(v))
} }
} }
#[async_trait]
impl Encode for Version { impl Encode for Version {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
self.0.encode(w).await self.0.encode(w)
} }
} }
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Versions(pub Vec<Version>); pub struct Versions(pub Vec<Version>);
#[async_trait]
impl Decode for Versions { impl Decode for Versions {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> { fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let count = VarInt::decode(r).await?.into_inner(); let count = VarInt::decode(r)?.into_inner();
let mut vs = Vec::new(); let mut vs = Vec::new();
for _ in 0..count { for _ in 0..count {
let v = Version::decode(r).await?; let v = Version::decode(r)?;
vs.push(v); vs.push(v);
} }
@ -57,14 +53,15 @@ impl Decode for Versions {
} }
} }
#[async_trait]
impl Encode for Versions { impl Encode for Versions {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> { fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
let size: VarInt = self.0.len().try_into()?; let size: VarInt = self.0.len().try_into()?;
size.encode(w).await?; size.encode(w)?;
for v in &self.0 { for v in &self.0 {
v.encode(w).await?; v.encode(w)?;
} }
Ok(()) Ok(())
} }
} }

View File

@ -1,7 +1,7 @@
pub mod coding; mod coding;
pub mod control; mod control;
pub mod object; mod object;
pub mod server;
pub mod setup;
pub use coding::VarInt; pub use coding::*;
pub use control::*;
pub use object::*;

View File

@ -0,0 +1,54 @@
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use bytes::{Buf, BufMut};
#[derive(Debug)]
pub struct Object {
// An ID for this track.
// Proposal: https://github.com/moq-wg/moq-transport/issues/209
pub track: VarInt,
// The group sequence number.
pub group: VarInt,
// The object sequence number.
pub sequence: VarInt,
// The priority/send order.
pub send_order: VarInt,
}
impl Decode for Object {
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
let typ = VarInt::decode(r)?;
if typ.into_inner() != 0 {
return Err(DecodeError::InvalidType(typ));
}
// NOTE: size has been omitted
let track = VarInt::decode(r)?;
let group = VarInt::decode(r)?;
let sequence = VarInt::decode(r)?;
let send_order = VarInt::decode(r)?;
Ok(Self {
track,
group,
sequence,
send_order,
})
}
}
impl Encode for Object {
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
VarInt::from_u32(0).encode(w)?;
self.track.encode(w)?;
self.group.encode(w)?;
self.sequence.encode(w)?;
self.send_order.encode(w)?;
Ok(())
}
}

View File

@ -1,56 +0,0 @@
use crate::coding::{Decode, Encode, VarInt};
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
// Another name for OBJECT, sent as a header for data streams.
#[derive(Debug)]
pub struct Header {
// An ID for this track.
// Proposal: https://github.com/moq-wg/moq-transport/issues/209
pub track_id: VarInt,
// The group sequence number.
pub group_sequence: VarInt,
// The object sequence number.
pub object_sequence: VarInt,
// The priority/send order.
pub send_order: VarInt,
}
#[async_trait]
impl Decode for Header {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let typ = VarInt::decode(r).await?;
anyhow::ensure!(u64::from(typ) == 0, "OBJECT type must be 0");
// NOTE: size has been omitted
let track_id = VarInt::decode(r).await?;
let group_sequence = VarInt::decode(r).await?;
let object_sequence = VarInt::decode(r).await?;
let send_order = VarInt::decode(r).await?;
Ok(Self {
track_id,
group_sequence,
object_sequence,
send_order,
})
}
}
#[async_trait]
impl Encode for Header {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
VarInt::from_u32(0).encode(w).await?;
self.track_id.encode(w).await?;
self.group_sequence.encode(w).await?;
self.object_sequence.encode(w).await?;
self.send_order.encode(w).await?;
Ok(())
}
}

View File

@ -1,5 +0,0 @@
mod header;
mod transport;
pub use header::*;
pub use transport::*;

View File

@ -1,26 +0,0 @@
use super::Header;
use std::sync::Arc;
// Reduce some typing for implementors.
pub type RecvStream = h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, Bytes>;
// Not clone, so we don't accidentally have two listners.
pub struct Receiver {
transport: Arc<Transport>,
}
impl Receiver {
pub async fn recv(&mut self) -> anyhow::Result<(Header, RecvStream)> {
let (_session_id, mut stream) = self
.transport
.accept_uni()
.await
.context("failed to accept uni stream")?
.context("no uni stream")?;
let header = Header::decode(&mut stream).await?;
Ok((header, stream))
}
}

View File

@ -1,24 +0,0 @@
use super::{Header, SendStream, WebTransportSession};
pub type SendStream = h3_webtransport::stream::SendStream<h3_quinn::SendStream<Bytes>, Bytes>;
#[derive(Clone)]
pub struct Sender {
transport: Arc<WebTransportSession>,
}
impl Sender {
pub async fn send(&self, header: Header) -> anyhow::Result<SendStream> {
let mut stream = self
.transport
.open_uni(self.transport.session_id())
.await
.context("failed to open uni stream")?;
// TODO set send_order based on header
header.encode(&mut stream).await?;
Ok(stream)
}
}

View File

@ -1,35 +0,0 @@
use super::{Header, Receiver, RecvStream, SendStream, Sender};
use anyhow::Context;
use bytes::Bytes;
use crate::coding::{Decode, Encode};
use std::sync::Arc;
// TODO support clients
type WebTransportSession = h3_webtransport::server::WebTransportSession<h3_quinn::Connection, Bytes>;
pub struct Session {
pub send: Sender,
pub recv: Receiver,
}
impl Session {
pub fn new(transport: WebTransportSession) -> Self {
let shared = Arc::new(transport);
Self {
send: Sender::new(shared.clone()),
recv: Sender::new(shared),
}
}
pub async fn recv(&mut self) -> anyhow::Result<(Header, RecvStream)> {
self.recv.recv().await
}
pub async fn send(&self, header: Header) -> anyhow::Result<SendStream> {
self.send.send(header).await
}
}

View File

@ -1,51 +0,0 @@
use super::Header;
use anyhow::Context;
use bytes::Bytes;
use crate::coding::{Decode, Encode};
// TODO support clients
type WebTransportSession = h3_webtransport::server::WebTransportSession<h3_quinn::Connection, Bytes>;
// Reduce some typing for implementors.
pub type SendStream = h3_webtransport::stream::SendStream<h3_quinn::SendStream<Bytes>, Bytes>;
pub type RecvStream = h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, Bytes>;
pub struct Transport {
transport: WebTransportSession,
}
impl Transport {
pub fn new(transport: WebTransportSession) -> Self {
Self { transport }
}
// TODO This should be &mut self to prevent multiple threads trying to read objects
pub async fn recv(&self) -> anyhow::Result<(Header, RecvStream)> {
let (_session_id, mut stream) = self
.transport
.accept_uni()
.await
.context("failed to accept uni stream")?
.context("no uni stream")?;
let header = Header::decode(&mut stream).await?;
Ok((header, stream))
}
// This can be &self since threads can create streams in parallel.
pub async fn send(&self, header: Header) -> anyhow::Result<SendStream> {
let mut stream = self
.transport
.open_uni(self.transport.session_id())
.await
.context("failed to open uni stream")?;
// TODO set send_order based on header
header.encode(&mut stream).await?;
Ok(stream)
}
}

View File

@ -1,42 +0,0 @@
use super::handshake::{Accept, Connecting};
use anyhow::Context;
use tokio::task::JoinSet;
pub struct Endpoint {
// The QUIC server, yielding new connections and sessions.
endpoint: quinn::Endpoint,
// A list of connections that are completing the WebTransport handshake.
handshake: JoinSet<anyhow::Result<Accept>>,
}
impl Endpoint {
pub fn new(endpoint: quinn::Endpoint) -> Self {
let handshake = JoinSet::new();
Self { endpoint, handshake }
}
// Accept the next WebTransport session.
pub async fn accept(&mut self) -> anyhow::Result<Accept> {
loop {
tokio::select!(
// Accept the connection and start the WebTransport handshake.
conn = self.endpoint.accept() => {
let conn = conn.context("failed to accept connection")?;
self.handshake.spawn(async move {
Connecting::new(conn).accept().await
});
},
// Return any mostly finished WebTransport handshakes.
res = self.handshake.join_next(), if !self.handshake.is_empty() => {
let res = res.expect("no tasks").expect("task aborted");
match res {
Ok(session) => return Ok(session),
Err(err) => log::warn!("failed to accept session: {:?}", err),
}
},
)
}
}
}

View File

@ -1,114 +0,0 @@
use super::setup::{RecvSetup, SendSetup};
use crate::{control, object, setup};
use anyhow::Context;
use bytes::Bytes;
pub struct Connecting {
conn: quinn::Connecting,
}
impl Connecting {
pub fn new(conn: quinn::Connecting) -> Self {
Self { conn }
}
pub async fn accept(self) -> anyhow::Result<Accept> {
let conn = self.conn.await.context("failed to accept h3 connection")?;
let mut conn = h3::server::builder()
.enable_webtransport(true)
.enable_connect(true)
.enable_datagram(true)
.max_webtransport_sessions(1)
.send_grease(true)
.build(h3_quinn::Connection::new(conn))
.await
.context("failed to create h3 server")?;
let (req, stream) = conn
.accept()
.await
.context("failed to accept h3 session")?
.context("failed to accept h3 request")?;
let ext = req.extensions();
anyhow::ensure!(req.method() == http::Method::CONNECT, "expected CONNECT request");
anyhow::ensure!(
ext.get::<h3::ext::Protocol>() == Some(&h3::ext::Protocol::WEB_TRANSPORT),
"expected WebTransport CONNECT"
);
// Return the request after validating the bare minimum.
let accept = Accept { conn, req, stream };
Ok(accept)
}
}
// The WebTransport handshake is complete, but we need to decide if we accept it or return 404.
pub struct Accept {
// Inspect to decide whether to accept() or reject() the session.
req: http::Request<()>,
conn: h3::server::Connection<h3_quinn::Connection, Bytes>,
stream: h3::server::RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
}
impl Accept {
// Expose the received URI
pub fn uri(&self) -> &http::Uri {
self.req.uri()
}
// Accept the WebTransport session.
pub async fn accept(self) -> anyhow::Result<Setup> {
let transport = h3_webtransport::server::WebTransportSession::accept(self.req, self.stream, self.conn).await?;
let stream = transport
.accept_bi()
.await
.context("failed to accept bidi stream")?
.unwrap();
let transport = object::Transport::new(transport);
let stream = match stream {
h3_webtransport::server::AcceptedBi::BidiStream(_session_id, stream) => stream,
h3_webtransport::server::AcceptedBi::Request(..) => anyhow::bail!("additional http requests not supported"),
};
let setup = RecvSetup::new(stream).recv().await?;
Ok(Setup { transport, setup })
}
// Reject the WebTransport session with a HTTP response.
pub async fn reject(mut self, resp: http::Response<()>) -> anyhow::Result<()> {
self.stream.send_response(resp).await?;
Ok(())
}
}
pub struct Setup {
setup: SendSetup,
transport: object::Transport,
}
impl Setup {
// Return the setup message we received.
pub fn setup(&self) -> &setup::Client {
&self.setup.client
}
// Accept the session with our own setup message.
pub async fn accept(self, setup: setup::Server) -> anyhow::Result<(object::Transport, control::Stream)> {
let control = self.setup.send(setup).await?;
Ok((self.transport, control))
}
pub async fn reject(self) -> anyhow::Result<()> {
// TODO Close the QUIC connection with an error code.
Ok(())
}
}

View File

@ -1,6 +0,0 @@
mod endpoint;
mod handshake;
mod setup;
pub use endpoint::*;
pub use handshake::*;

View File

@ -1,42 +0,0 @@
use crate::coding::{Decode, Encode};
use crate::{control, setup};
use anyhow::Context;
use bytes::Bytes;
pub(crate) struct RecvSetup {
stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<Bytes>, Bytes>,
}
impl RecvSetup {
pub fn new(stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<Bytes>, Bytes>) -> Self {
Self { stream }
}
pub async fn recv(mut self) -> anyhow::Result<SendSetup> {
let setup = setup::Client::decode(&mut self.stream)
.await
.context("failed to read client SETUP message")?;
Ok(SendSetup::new(self.stream, setup))
}
}
pub(crate) struct SendSetup {
pub client: setup::Client,
stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<Bytes>, Bytes>,
}
impl SendSetup {
pub fn new(
stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<Bytes>, Bytes>,
client: setup::Client,
) -> Self {
Self { stream, client }
}
pub async fn send(mut self, setup: setup::Server) -> anyhow::Result<control::Stream> {
setup.encode(&mut self.stream).await?;
Ok(control::Stream::new(self.stream))
}
}

View File

@ -1,54 +0,0 @@
use super::{Role, Versions};
use crate::coding::{Decode, Encode, VarInt};
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
use anyhow::Context;
// Sent by the client to setup up the session.
#[derive(Debug)]
pub struct Client {
// NOTE: This is not a message type, but rather the control stream header.
// Proposal: https://github.com/moq-wg/moq-transport/issues/138
// The list of supported versions in preferred order.
pub versions: Versions,
// Indicate if the client is a publisher, a subscriber, or both.
// Proposal: moq-wg/moq-transport#151
pub role: Role,
// The path, non-empty ONLY when not using WebTransport.
pub path: String,
}
#[async_trait]
impl Decode for Client {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let typ = VarInt::decode(r).await.context("failed to read type")?;
anyhow::ensure!(typ.into_inner() == 1, "client SETUP must be type 1");
let versions = Versions::decode(r).await.context("failed to read supported versions")?;
anyhow::ensure!(!versions.is_empty(), "client must support at least one version");
let role = Role::decode(r).await.context("failed to decode role")?;
let path = String::decode(r).await.context("failed to read path")?;
Ok(Self { versions, role, path })
}
}
#[async_trait]
impl Encode for Client {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
VarInt::from_u32(1).encode(w).await?;
anyhow::ensure!(!self.versions.is_empty(), "client must support at least one version");
self.versions.encode(w).await?;
self.role.encode(w).await?;
self.path.encode(w).await?;
Ok(())
}
}

View File

@ -1,15 +0,0 @@
mod client;
mod role;
mod server;
mod version;
pub use client::*;
pub use role::*;
pub use server::*;
pub use version::*;
// NOTE: These are forked from moq-transport-00.
// 1. messages lack a sized length
// 2. parameters are not optional and written in order (role + path)
// 3. role indicates local support only, not remote support
// 4. server setup is id=2 to disambiguate

View File

@ -1,44 +0,0 @@
use super::{Role, Version};
use crate::coding::{Decode, Encode, VarInt};
use anyhow::Context;
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
// Sent by the server in response to a client.
// NOTE: This is not a message type, but rather the control stream header.
// Proposal: https://github.com/moq-wg/moq-transport/issues/138
#[derive(Debug)]
pub struct Server {
// The list of supported versions in preferred order.
pub version: Version,
// param: 0x0: Indicate if the server is a publisher, a subscriber, or both.
// Proposal: moq-wg/moq-transport#151
pub role: Role,
}
#[async_trait]
impl Decode for Server {
async fn decode<R: AsyncRead + Unpin + Send>(r: &mut R) -> anyhow::Result<Self> {
let typ = VarInt::decode(r).await.context("failed to read type")?;
anyhow::ensure!(typ.into_inner() == 2, "server SETUP must be type 2");
let version = Version::decode(r).await.context("failed to read version")?;
let role = Role::decode(r).await.context("failed to read role")?;
Ok(Self { version, role })
}
}
#[async_trait]
impl Encode for Server {
async fn encode<W: AsyncWrite + Unpin + Send>(&self, w: &mut W) -> anyhow::Result<()> {
VarInt::from_u32(2).encode(w).await?; // setup type
self.version.encode(w).await?;
self.role.encode(w).await?;
Ok(())
}
}

View File

@ -16,6 +16,7 @@ categories = [ "multimedia", "network-programming", "web-programming" ]
[dependencies] [dependencies]
moq-transport = { path = "../moq-transport" } moq-transport = { path = "../moq-transport" }
moq-transport-quinn = { path = "../moq-transport-quinn" }
tokio = "1.27" tokio = "1.27"
mp4 = "0.13.0" mp4 = "0.13.0"
@ -27,5 +28,3 @@ quinn = "0.10"
ring = "0.16.20" ring = "0.16.20"
rustls = "0.21.2" rustls = "0.21.2"
rustls-pemfile = "1.0.2" rustls-pemfile = "1.0.2"
async-trait = "0.1"

View File

@ -6,8 +6,8 @@ use tokio::io::AsyncReadExt;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::task::JoinSet; // lock across await boundaries use tokio::task::JoinSet; // lock across await boundaries
use moq_transport::coding::VarInt; use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt};
use moq_transport::object; use moq_transport_quinn::{RecvObjects, RecvStream};
use anyhow::Context; use anyhow::Context;
@ -18,8 +18,7 @@ use crate::source::Source;
// TODO experiment with making this Clone, so every task can have its own copy. // TODO experiment with making this Clone, so every task can have its own copy.
pub struct Session { pub struct Session {
// Used to receive objects. // Used to receive objects.
// TODO split into send/receive halves. objects: RecvObjects,
transport: Arc<object::Transport>,
// Used to send and receive control messages. // Used to send and receive control messages.
control: control::Component<control::Contribute>, control: control::Component<control::Contribute>,
@ -39,12 +38,12 @@ pub struct Session {
impl Session { impl Session {
pub fn new( pub fn new(
transport: Arc<object::Transport>, objects: RecvObjects,
control: control::Component<control::Contribute>, control: control::Component<control::Contribute>,
broker: broker::Broadcasts, broker: broker::Broadcasts,
) -> Self { ) -> Self {
Self { Self {
transport, objects,
control, control,
broker, broker,
broadcasts: HashMap::new(), broadcasts: HashMap::new(),
@ -62,9 +61,9 @@ impl Session {
log::error!("failed to produce segment: {:?}", err); log::error!("failed to produce segment: {:?}", err);
} }
}, },
object = self.transport.recv() => { object = self.objects.recv() => {
let (header, stream )= object.context("failed to receive object")?; let (object, stream) = object.context("failed to receive object")?;
let res = self.receive_object(header, stream).await; let res = self.receive_object(object, stream).await;
if let Err(err) = res { if let Err(err) = res {
log::error!("failed to receive object: {:?}", err); log::error!("failed to receive object: {:?}", err);
} }
@ -89,19 +88,19 @@ impl Session {
} }
} }
async fn receive_object(&mut self, header: object::Header, stream: object::RecvStream) -> anyhow::Result<()> { async fn receive_object(&mut self, object: Object, stream: RecvStream) -> anyhow::Result<()> {
let id = header.track_id; let track = object.track;
let segment = segment::Info { let segment = segment::Info {
sequence: header.object_sequence, sequence: object.sequence,
send_order: header.send_order, send_order: object.send_order,
expires: Some(time::Instant::now() + time::Duration::from_secs(10)), expires: Some(time::Instant::now() + time::Duration::from_secs(10)),
}; };
let segment = segment::Publisher::new(segment); let segment = segment::Publisher::new(segment);
self.publishers self.publishers
.push_segment(id, segment.subscribe()) .push_segment(track, segment.subscribe())
.context("failed to publish segment")?; .context("failed to publish segment")?;
// TODO implement a timeout // TODO implement a timeout
@ -112,7 +111,7 @@ impl Session {
Ok(()) Ok(())
} }
async fn run_segment(mut segment: segment::Publisher, mut stream: object::RecvStream) -> anyhow::Result<()> { async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> {
let mut buf = [0u8; 32 * 1024]; let mut buf = [0u8; 32 * 1024];
loop { loop {
let size = stream.read(&mut buf).await.context("failed to read from stream")?; let size = stream.read(&mut buf).await.context("failed to read from stream")?;
@ -125,16 +124,16 @@ impl Session {
} }
} }
async fn receive_announce(&mut self, msg: control::Announce) -> anyhow::Result<()> { async fn receive_announce(&mut self, msg: Announce) -> anyhow::Result<()> {
match self.receive_announce_inner(&msg).await { match self.receive_announce_inner(&msg).await {
Ok(()) => { Ok(()) => {
let msg = control::AnnounceOk { let msg = AnnounceOk {
track_namespace: msg.track_namespace, track_namespace: msg.track_namespace,
}; };
self.control.send(msg).await self.control.send(msg).await
} }
Err(e) => { Err(e) => {
let msg = control::AnnounceError { let msg = AnnounceError {
track_namespace: msg.track_namespace, track_namespace: msg.track_namespace,
code: VarInt::from_u32(1), code: VarInt::from_u32(1),
reason: e.to_string(), reason: e.to_string(),
@ -144,7 +143,7 @@ impl Session {
} }
} }
async fn receive_announce_inner(&mut self, msg: &control::Announce) -> anyhow::Result<()> { async fn receive_announce_inner(&mut self, msg: &Announce) -> anyhow::Result<()> {
// Create a broadcast and announce it. // Create a broadcast and announce it.
// We don't actually start producing the broadcast until we receive a subscription. // We don't actually start producing the broadcast until we receive a subscription.
let broadcast = Arc::new(Broadcast::new(&msg.track_namespace, &self.publishers)); let broadcast = Arc::new(Broadcast::new(&msg.track_namespace, &self.publishers));
@ -155,12 +154,12 @@ impl Session {
Ok(()) Ok(())
} }
fn receive_subscribe_ok(&mut self, _msg: control::SubscribeOk) -> anyhow::Result<()> { fn receive_subscribe_ok(&mut self, _msg: SubscribeOk) -> anyhow::Result<()> {
// TODO make sure this is for a track we are subscribed to // TODO make sure this is for a track we are subscribed to
Ok(()) Ok(())
} }
fn receive_subscribe_error(&mut self, msg: control::SubscribeError) -> anyhow::Result<()> { fn receive_subscribe_error(&mut self, msg: SubscribeError) -> anyhow::Result<()> {
let error = track::Error { let error = track::Error {
code: msg.code, code: msg.code,
reason: format!("upstream error: {}", msg.reason), reason: format!("upstream error: {}", msg.reason),
@ -282,10 +281,10 @@ impl Publishers {
} }
// Returns the next subscribe message we need to issue. // Returns the next subscribe message we need to issue.
pub async fn incoming(&mut self) -> anyhow::Result<control::Subscribe> { pub async fn incoming(&mut self) -> anyhow::Result<Subscribe> {
let (namespace, track) = self.receiver.recv().await.context("no more subscriptions")?; let (namespace, track) = self.receiver.recv().await.context("no more subscriptions")?;
let msg = control::Subscribe { let msg = Subscribe {
track_id: VarInt::try_from(self.next)?, track_id: VarInt::try_from(self.next)?,
track_namespace: namespace, track_namespace: namespace,
track_name: track.name, track_name: track.name,

View File

@ -1,11 +1,11 @@
use moq_transport::control;
use tokio::sync::mpsc; use tokio::sync::mpsc;
pub use control::*; use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk};
use moq_transport_quinn::Control;
pub struct Main { pub struct Main {
control: control::Stream, control: Control,
outgoing: mpsc::Receiver<control::Message>, outgoing: mpsc::Receiver<Message>,
contribute: mpsc::Sender<Contribute>, contribute: mpsc::Sender<Contribute>,
distribute: mpsc::Sender<Distribute>, distribute: mpsc::Sender<Distribute>,
@ -21,7 +21,7 @@ impl Main {
} }
} }
pub async fn handle(&mut self, msg: control::Message) -> anyhow::Result<()> { pub async fn handle(&mut self, msg: Message) -> anyhow::Result<()> {
match msg.try_into() { match msg.try_into() {
Ok(msg) => self.contribute.send(msg).await?, Ok(msg) => self.contribute.send(msg).await?,
Err(msg) => match msg.try_into() { Err(msg) => match msg.try_into() {
@ -36,7 +36,7 @@ impl Main {
pub struct Component<T> { pub struct Component<T> {
incoming: mpsc::Receiver<T>, incoming: mpsc::Receiver<T>,
outgoing: mpsc::Sender<control::Message>, outgoing: mpsc::Sender<Message>,
} }
impl<T> Component<T> { impl<T> Component<T> {
@ -51,7 +51,7 @@ impl<T> Component<T> {
} }
// Splits a control stream into two components, based on if it's a message for contribution or distribution. // Splits a control stream into two components, based on if it's a message for contribution or distribution.
pub fn split(control: control::Stream) -> (Main, Component<Contribute>, Component<Distribute>) { pub fn split(control: Control) -> (Main, Component<Contribute>, Component<Distribute>) {
let (outgoing_tx, outgoing_rx) = mpsc::channel(1); let (outgoing_tx, outgoing_rx) = mpsc::channel(1);
let (contribute_tx, contribute_rx) = mpsc::channel(1); let (contribute_tx, contribute_rx) = mpsc::channel(1);
let (distribute_tx, distribute_rx) = mpsc::channel(1); let (distribute_tx, distribute_rx) = mpsc::channel(1);
@ -79,19 +79,19 @@ pub fn split(control: control::Stream) -> (Main, Component<Contribute>, Componen
// Messages we expect to receive from the client for contribution. // Messages we expect to receive from the client for contribution.
#[derive(Debug)] #[derive(Debug)]
pub enum Contribute { pub enum Contribute {
Announce(control::Announce), Announce(Announce),
SubscribeOk(control::SubscribeOk), SubscribeOk(SubscribeOk),
SubscribeError(control::SubscribeError), SubscribeError(SubscribeError),
} }
impl TryFrom<control::Message> for Contribute { impl TryFrom<Message> for Contribute {
type Error = control::Message; type Error = Message;
fn try_from(msg: control::Message) -> Result<Self, Self::Error> { fn try_from(msg: Message) -> Result<Self, Self::Error> {
match msg { match msg {
control::Message::Announce(msg) => Ok(Self::Announce(msg)), Message::Announce(msg) => Ok(Self::Announce(msg)),
control::Message::SubscribeOk(msg) => Ok(Self::SubscribeOk(msg)), Message::SubscribeOk(msg) => Ok(Self::SubscribeOk(msg)),
control::Message::SubscribeError(msg) => Ok(Self::SubscribeError(msg)), Message::SubscribeError(msg) => Ok(Self::SubscribeError(msg)),
_ => Err(msg), _ => Err(msg),
} }
} }
@ -100,19 +100,19 @@ impl TryFrom<control::Message> for Contribute {
// Messages we expect to receive from the client for distribution. // Messages we expect to receive from the client for distribution.
#[derive(Debug)] #[derive(Debug)]
pub enum Distribute { pub enum Distribute {
AnnounceOk(control::AnnounceOk), AnnounceOk(AnnounceOk),
AnnounceError(control::AnnounceError), AnnounceError(AnnounceError),
Subscribe(control::Subscribe), Subscribe(Subscribe),
} }
impl TryFrom<control::Message> for Distribute { impl TryFrom<Message> for Distribute {
type Error = control::Message; type Error = Message;
fn try_from(value: control::Message) -> Result<Self, Self::Error> { fn try_from(value: Message) -> Result<Self, Self::Error> {
match value { match value {
control::Message::AnnounceOk(msg) => Ok(Self::AnnounceOk(msg)), Message::AnnounceOk(msg) => Ok(Self::AnnounceOk(msg)),
control::Message::AnnounceError(msg) => Ok(Self::AnnounceError(msg)), Message::AnnounceError(msg) => Ok(Self::AnnounceError(msg)),
control::Message::Subscribe(msg) => Ok(Self::Subscribe(msg)), Message::Subscribe(msg) => Ok(Self::Subscribe(msg)),
_ => Err(value), _ => Err(value),
} }
} }

View File

@ -3,17 +3,15 @@ use anyhow::Context;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet; // allows locking across await use tokio::task::JoinSet; // allows locking across await
use std::sync::Arc; use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt};
use moq_transport_quinn::SendObjects;
use moq_transport::coding::VarInt;
use moq_transport::object;
use super::{broker, control}; use super::{broker, control};
use crate::model::{segment, track}; use crate::model::{segment, track};
pub struct Session { pub struct Session {
// Objects are sent to the client using this transport. // Objects are sent to the client
transport: Arc<object::Transport>, objects: SendObjects,
// Used to send and receive control messages. // Used to send and receive control messages.
control: control::Component<control::Distribute>, control: control::Component<control::Distribute>,
@ -22,17 +20,17 @@ pub struct Session {
broker: broker::Broadcasts, broker: broker::Broadcasts,
// A list of tasks that are currently running. // A list of tasks that are currently running.
run_subscribes: JoinSet<control::SubscribeError>, // run subscriptions, sending the returned error if they fail run_subscribes: JoinSet<SubscribeError>, // run subscriptions, sending the returned error if they fail
} }
impl Session { impl Session {
pub fn new( pub fn new(
transport: Arc<object::Transport>, objects: SendObjects,
control: control::Component<control::Distribute>, control: control::Component<control::Distribute>,
broker: broker::Broadcasts, broker: broker::Broadcasts,
) -> Self { ) -> Self {
Self { Self {
transport, objects,
control, control,
broker, broker,
run_subscribes: JoinSet::new(), run_subscribes: JoinSet::new(),
@ -72,22 +70,22 @@ impl Session {
} }
} }
fn receive_announce_ok(&mut self, _msg: control::AnnounceOk) -> anyhow::Result<()> { fn receive_announce_ok(&mut self, _msg: AnnounceOk) -> anyhow::Result<()> {
// TODO make sure we sent this announce // TODO make sure we sent this announce
Ok(()) Ok(())
} }
fn receive_announce_error(&mut self, msg: control::AnnounceError) -> anyhow::Result<()> { fn receive_announce_error(&mut self, msg: AnnounceError) -> anyhow::Result<()> {
// TODO make sure we sent this announce // TODO make sure we sent this announce
// TODO remove this from the list of subscribable broadcasts. // TODO remove this from the list of subscribable broadcasts.
anyhow::bail!("received ANNOUNCE_ERROR({:?}): {}", msg.code, msg.reason) anyhow::bail!("received ANNOUNCE_ERROR({:?}): {}", msg.code, msg.reason)
} }
async fn receive_subscribe(&mut self, msg: control::Subscribe) -> anyhow::Result<()> { async fn receive_subscribe(&mut self, msg: Subscribe) -> anyhow::Result<()> {
match self.receive_subscribe_inner(&msg).await { match self.receive_subscribe_inner(&msg).await {
Ok(()) => { Ok(()) => {
self.control self.control
.send(control::SubscribeOk { .send(SubscribeOk {
track_id: msg.track_id, track_id: msg.track_id,
expires: None, expires: None,
}) })
@ -95,7 +93,7 @@ impl Session {
} }
Err(e) => { Err(e) => {
self.control self.control
.send(control::SubscribeError { .send(SubscribeError {
track_id: msg.track_id, track_id: msg.track_id,
code: VarInt::from_u32(1), code: VarInt::from_u32(1),
reason: e.to_string(), reason: e.to_string(),
@ -105,27 +103,23 @@ impl Session {
} }
} }
async fn receive_subscribe_inner(&mut self, msg: &control::Subscribe) -> anyhow::Result<()> { async fn receive_subscribe_inner(&mut self, msg: &Subscribe) -> anyhow::Result<()> {
let track = self let track = self
.broker .broker
.subscribe(&msg.track_namespace, &msg.track_name) .subscribe(&msg.track_namespace, &msg.track_name)
.context("could not find broadcast")?; .context("could not find broadcast")?;
// TODO can we just clone self? // TODO can we just clone self?
let transport = self.transport.clone(); let objects = self.objects.clone();
let track_id = msg.track_id; let track_id = msg.track_id;
self.run_subscribes self.run_subscribes
.spawn(async move { Self::run_subscribe(transport, track_id, track).await }); .spawn(async move { Self::run_subscribe(objects, track_id, track).await });
Ok(()) Ok(())
} }
async fn run_subscribe( async fn run_subscribe(objects: SendObjects, track_id: VarInt, mut track: track::Subscriber) -> SubscribeError {
transport: Arc<object::Transport>,
track_id: VarInt,
mut track: track::Subscriber,
) -> control::SubscribeError {
let mut tasks = JoinSet::new(); let mut tasks = JoinSet::new();
let mut result = None; let mut result = None;
@ -135,11 +129,11 @@ impl Session {
segment = track.next_segment(), if result.is_none() => { segment = track.next_segment(), if result.is_none() => {
match segment { match segment {
Ok(segment) => { Ok(segment) => {
let transport = transport.clone(); let objects = objects.clone();
tasks.spawn(async move { Self::serve_group(transport, track_id, segment).await }); tasks.spawn(async move { Self::serve_group(objects, track_id, segment).await });
}, },
Err(e) => { Err(e) => {
result = Some(control::SubscribeError { result = Some(SubscribeError {
track_id, track_id,
code: e.code, code: e.code,
reason: e.reason, reason: e.reason,
@ -160,18 +154,18 @@ impl Session {
} }
async fn serve_group( async fn serve_group(
transport: Arc<object::Transport>, mut objects: SendObjects,
track_id: VarInt, track_id: VarInt,
mut segment: segment::Subscriber, mut segment: segment::Subscriber,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let header = object::Header { let object = Object {
track_id, track: track_id,
group_sequence: segment.sequence, group: segment.sequence,
object_sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object
send_order: segment.send_order, send_order: segment.send_order,
}; };
let mut stream = transport.send(header).await?; let mut stream = objects.send(object).await?;
// Write each fragment as they are available. // Write each fragment as they are available.
while let Some(fragment) = segment.fragments.next().await { while let Some(fragment) = segment.fragments.next().await {
@ -187,14 +181,14 @@ impl Session {
match delta { match delta {
broker::Update::Insert(name) => { broker::Update::Insert(name) => {
self.control self.control
.send(control::Announce { .send(Announce {
track_namespace: name.clone(), track_namespace: name.clone(),
}) })
.await .await
} }
broker::Update::Remove(name, error) => { broker::Update::Remove(name, error) => {
self.control self.control
.send(control::AnnounceError { .send(AnnounceError {
track_namespace: name, track_namespace: name,
code: error.code, code: error.code,
reason: error.reason, reason: error.reason,

View File

@ -1,7 +1,5 @@
use super::{broker, Session}; use super::{broker, Session};
use moq_transport::server::Endpoint;
use std::{fs, io, net, path, sync, time}; use std::{fs, io, net, path, sync, time};
use anyhow::Context; use anyhow::Context;
@ -10,7 +8,7 @@ use tokio::task::JoinSet;
pub struct Server { pub struct Server {
// The MoQ transport server. // The MoQ transport server.
server: Endpoint, server: moq_transport_quinn::Server,
// The media sources. // The media sources.
broker: broker::Broadcasts, broker: broker::Broadcasts,
@ -76,7 +74,7 @@ impl Server {
let server = quinn::Endpoint::server(server_config, config.addr)?; let server = quinn::Endpoint::server(server_config, config.addr)?;
let broker = config.broker; let broker = config.broker;
let server = Endpoint::new(server); let server = moq_transport_quinn::Server::new(server);
let tasks = JoinSet::new(); let tasks = JoinSet::new();
Ok(Self { server, broker, tasks }) Ok(Self { server, broker, tasks })

View File

@ -1,11 +1,10 @@
use anyhow::Context; use anyhow::Context;
use std::sync::Arc;
use moq_transport::{server, setup};
use super::{broker, contribute, control, distribute}; use super::{broker, contribute, control, distribute};
use moq_transport::{Role, SetupServer, Version};
use moq_transport_quinn::Connect;
pub struct Session { pub struct Session {
// Split logic into contribution/distribution to reduce the problem space. // Split logic into contribution/distribution to reduce the problem space.
contribute: contribute::Session, contribute: contribute::Session,
@ -16,7 +15,7 @@ pub struct Session {
} }
impl Session { impl Session {
pub async fn accept(session: server::Accept, broker: broker::Broadcasts) -> anyhow::Result<Session> { pub async fn accept(session: Connect, broker: broker::Broadcasts) -> anyhow::Result<Session> {
// Accep the WebTransport session. // Accep the WebTransport session.
// OPTIONAL validate the conn.uri() otherwise call conn.reject() // OPTIONAL validate the conn.uri() otherwise call conn.reject()
let session = session let session = session
@ -28,26 +27,25 @@ impl Session {
.setup() .setup()
.versions .versions
.iter() .iter()
.find(|v| **v == setup::Version::DRAFT_00) .find(|v| **v == Version::DRAFT_00)
.context("failed to find supported version")?; .context("failed to find supported version")?;
match session.setup().role { // TODO use the role to decide if we can publish or subscribe
setup::Role::Subscriber => {}
_ => anyhow::bail!("TODO publishing not yet supported"),
}
let setup = setup::Server { let setup = SetupServer {
version: setup::Version::DRAFT_00, version: Version::DRAFT_00,
role: setup::Role::Publisher, role: Role::Publisher,
}; };
let (transport, control) = session.accept(setup).await?; let session = session.accept(setup).await?;
let transport = Arc::new(transport);
let (control, objects) = session.split();
let (objects_send, objects_recv) = objects.split();
let (control, contribute, distribute) = control::split(control); let (control, contribute, distribute) = control::split(control);
let contribute = contribute::Session::new(transport.clone(), contribute, broker.clone()); let contribute = contribute::Session::new(objects_recv, contribute, broker.clone());
let distribute = distribute::Session::new(transport, distribute, broker); let distribute = distribute::Session::new(objects_send, distribute, broker);
let session = Self { let session = Self {
control, control,