Webtransport generic (#51)
Switched to the webtransport-generic crate so quinn or quiche (with adapter) can be used. This also involved switching out the decoder/encoder since it meant a wrapper was required.
This commit is contained in:
parent
3a65873055
commit
c5d8873e4e
|
@ -919,7 +919,7 @@ dependencies = [
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "moq-demo"
|
name = "moq-quinn"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
@ -928,7 +928,6 @@ dependencies = [
|
||||||
"hex",
|
"hex",
|
||||||
"log",
|
"log",
|
||||||
"moq-transport",
|
"moq-transport",
|
||||||
"moq-transport-quinn",
|
|
||||||
"moq-warp",
|
"moq-warp",
|
||||||
"quinn",
|
"quinn",
|
||||||
"ring",
|
"ring",
|
||||||
|
@ -936,30 +935,19 @@ dependencies = [
|
||||||
"rustls-pemfile",
|
"rustls-pemfile",
|
||||||
"tokio",
|
"tokio",
|
||||||
"warp",
|
"warp",
|
||||||
|
"webtransport-generic",
|
||||||
"webtransport-quinn",
|
"webtransport-quinn",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "moq-transport"
|
name = "moq-transport"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
|
||||||
"bytes",
|
|
||||||
"thiserror",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "moq-transport-quinn"
|
|
||||||
version = "0.1.0"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"bytes",
|
"bytes",
|
||||||
"http",
|
|
||||||
"log",
|
|
||||||
"moq-transport",
|
|
||||||
"quinn",
|
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"webtransport-quinn",
|
"webtransport-generic",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -970,12 +958,8 @@ dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"log",
|
"log",
|
||||||
"moq-transport",
|
"moq-transport",
|
||||||
"moq-transport-quinn",
|
|
||||||
"quinn",
|
|
||||||
"ring",
|
|
||||||
"rustls 0.21.2",
|
|
||||||
"rustls-pemfile",
|
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"webtransport-generic",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -1912,13 +1896,12 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "webtransport-generic"
|
name = "webtransport-generic"
|
||||||
version = "0.3.0"
|
version = "0.5.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0ba4583e96bb0ef08142f868bf0d28f90211eced56a473768ee27446864a2310"
|
checksum = "df712317d761312996f654739debeb3838eb02c6fd9146d9efdfd08a46674e45"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"log",
|
"tokio",
|
||||||
"thiserror",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -1934,9 +1917,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "webtransport-quinn"
|
name = "webtransport-quinn"
|
||||||
version = "0.4.2"
|
version = "0.5.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "96a4ac9975117d8c63c4d04577d594b3130fe2023b7363ebc613905acf98590a"
|
checksum = "b558ddb09b77347cca94bf2fd726d72c3753b60875eb3d2b7388adc12b9b4a1f"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-std",
|
"async-std",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
|
|
@ -1,7 +1,2 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
members = [
|
members = ["moq-transport", "moq-quinn", "moq-warp"]
|
||||||
"moq-transport",
|
|
||||||
"moq-transport-quinn",
|
|
||||||
"moq-demo",
|
|
||||||
"moq-warp",
|
|
||||||
]
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
[package]
|
[package]
|
||||||
name = "moq-demo"
|
name = "moq-quinn"
|
||||||
description = "Media over QUIC"
|
description = "Media over QUIC"
|
||||||
authors = ["Luke Curley"]
|
authors = ["Luke Curley"]
|
||||||
repository = "https://github.com/kixelated/moq-rs"
|
repository = "https://github.com/kixelated/moq-rs"
|
||||||
|
@ -16,12 +16,12 @@ categories = ["multimedia", "network-programming", "web-programming"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
moq-transport = { path = "../moq-transport" }
|
moq-transport = { path = "../moq-transport" }
|
||||||
moq-transport-quinn = { path = "../moq-transport-quinn" }
|
|
||||||
moq-warp = { path = "../moq-warp" }
|
moq-warp = { path = "../moq-warp" }
|
||||||
|
|
||||||
# QUIC
|
# QUIC
|
||||||
quinn = "0.10"
|
quinn = "0.10"
|
||||||
webtransport-quinn = "0.4.2"
|
webtransport-generic = "0.5"
|
||||||
|
webtransport-quinn = "0.5"
|
||||||
|
|
||||||
# Crypto
|
# Crypto
|
||||||
ring = "0.16.20"
|
ring = "0.16.20"
|
|
@ -1,16 +1,15 @@
|
||||||
use moq_warp::relay::broker;
|
|
||||||
|
|
||||||
use std::{fs, io, net, path, sync, time};
|
use std::{fs, io, net, path, sync, time};
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
|
|
||||||
|
use moq_warp::relay;
|
||||||
use tokio::task::JoinSet;
|
use tokio::task::JoinSet;
|
||||||
|
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
server: quinn::Endpoint,
|
server: quinn::Endpoint,
|
||||||
|
|
||||||
// The media sources.
|
// The media sources.
|
||||||
broker: broker::Broadcasts,
|
broker: relay::Broker,
|
||||||
|
|
||||||
// The active connections.
|
// The active connections.
|
||||||
conns: JoinSet<anyhow::Result<()>>,
|
conns: JoinSet<anyhow::Result<()>>,
|
||||||
|
@ -62,7 +61,7 @@ impl Server {
|
||||||
|
|
||||||
server_config.transport = sync::Arc::new(transport_config);
|
server_config.transport = sync::Arc::new(transport_config);
|
||||||
let server = quinn::Endpoint::server(server_config, config.addr)?;
|
let server = quinn::Endpoint::server(server_config, config.addr)?;
|
||||||
let broker = broker::Broadcasts::new();
|
let broker = relay::Broker::new();
|
||||||
|
|
||||||
let conns = JoinSet::new();
|
let conns = JoinSet::new();
|
||||||
|
|
||||||
|
@ -88,7 +87,7 @@ impl Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle(conn: quinn::Connecting, broker: broker::Broadcasts) -> anyhow::Result<()> {
|
async fn handle(conn: quinn::Connecting, broker: relay::Broker) -> anyhow::Result<()> {
|
||||||
// Wait for the QUIC connection to be established.
|
// Wait for the QUIC connection to be established.
|
||||||
let conn = conn.await.context("failed to establish QUIC connection")?;
|
let conn = conn.await.context("failed to establish QUIC connection")?;
|
||||||
|
|
||||||
|
@ -106,12 +105,12 @@ impl Server {
|
||||||
.context("failed to respond to WebTransport request")?;
|
.context("failed to respond to WebTransport request")?;
|
||||||
|
|
||||||
// Perform the MoQ handshake.
|
// Perform the MoQ handshake.
|
||||||
let session = moq_transport_quinn::accept(session, moq_transport::Role::Both)
|
let session = moq_transport::Session::accept(session, moq_transport::setup::Role::Both)
|
||||||
.await
|
.await
|
||||||
.context("failed to perform MoQ handshake")?;
|
.context("failed to perform MoQ handshake")?;
|
||||||
|
|
||||||
// Run the relay code.
|
// Run the relay code.
|
||||||
let session = moq_warp::relay::Session::new(session, broker);
|
let session = relay::Session::new(session, broker);
|
||||||
session.run().await
|
session.run().await
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,26 +0,0 @@
|
||||||
[package]
|
|
||||||
name = "moq-transport-quinn"
|
|
||||||
description = "Media over QUIC"
|
|
||||||
authors = ["Luke Curley"]
|
|
||||||
repository = "https://github.com/kixelated/moq-rs"
|
|
||||||
license = "MIT OR Apache-2.0"
|
|
||||||
|
|
||||||
version = "0.1.0"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
keywords = ["quic", "http3", "webtransport", "media", "live"]
|
|
||||||
categories = ["multimedia", "network-programming", "web-programming"]
|
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
moq-transport = { path = "../moq-transport" }
|
|
||||||
|
|
||||||
quinn = "0.10"
|
|
||||||
http = "0.2"
|
|
||||||
webtransport-quinn = "0.4.2"
|
|
||||||
tokio = { version = "1.27", features = ["macros", "io-util"] }
|
|
||||||
bytes = "1"
|
|
||||||
log = "0.4"
|
|
||||||
anyhow = "1.0.70"
|
|
||||||
thiserror = "1.0.21"
|
|
|
@ -1,94 +0,0 @@
|
||||||
use moq_transport::{Decode, DecodeError, Encode, Message};
|
|
||||||
|
|
||||||
use bytes::{Buf, BytesMut};
|
|
||||||
|
|
||||||
use std::io::Cursor;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use tokio::{io::AsyncReadExt, sync::Mutex};
|
|
||||||
|
|
||||||
use webtransport_quinn::{RecvStream, SendStream};
|
|
||||||
|
|
||||||
pub struct SendControl {
|
|
||||||
stream: SendStream,
|
|
||||||
buf: BytesMut, // reuse a buffer to encode messages.
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SendControl {
|
|
||||||
pub fn new(stream: SendStream) -> Self {
|
|
||||||
Self {
|
|
||||||
buf: BytesMut::new(),
|
|
||||||
stream,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
|
|
||||||
let msg = msg.into();
|
|
||||||
log::info!("sending message: {:?}", msg);
|
|
||||||
|
|
||||||
self.buf.clear();
|
|
||||||
msg.encode(&mut self.buf)?;
|
|
||||||
|
|
||||||
// TODO make this work with select!
|
|
||||||
self.stream.write_all(&self.buf).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper that lets multiple threads send control messages.
|
|
||||||
pub fn share(self) -> ControlShared {
|
|
||||||
ControlShared {
|
|
||||||
stream: Arc::new(Mutex::new(self)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper that allows multiple threads to send control messages.
|
|
||||||
// There's no equivalent for receiving since only one thread should be receiving at a time.
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct ControlShared {
|
|
||||||
stream: Arc<Mutex<SendControl>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ControlShared {
|
|
||||||
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
|
|
||||||
let mut stream = self.stream.lock().await;
|
|
||||||
stream.send(msg).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct RecvControl {
|
|
||||||
stream: RecvStream,
|
|
||||||
buf: BytesMut, // data we've read but haven't fully decoded yet
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RecvControl {
|
|
||||||
pub fn new(stream: RecvStream) -> Self {
|
|
||||||
Self {
|
|
||||||
buf: BytesMut::new(),
|
|
||||||
stream,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read the next full message from the stream.
|
|
||||||
pub async fn recv(&mut self) -> anyhow::Result<Message> {
|
|
||||||
loop {
|
|
||||||
// Read the contents of the buffer
|
|
||||||
let mut peek = Cursor::new(&self.buf);
|
|
||||||
|
|
||||||
match Message::decode(&mut peek) {
|
|
||||||
Ok(msg) => {
|
|
||||||
// We've successfully decoded a message, so we can advance the buffer.
|
|
||||||
self.buf.advance(peek.position() as usize);
|
|
||||||
|
|
||||||
log::info!("received message: {:?}", msg);
|
|
||||||
return Ok(msg);
|
|
||||||
}
|
|
||||||
Err(DecodeError::UnexpectedEnd) => {
|
|
||||||
// The decode failed, so we need to append more data.
|
|
||||||
self.stream.read_buf(&mut self.buf).await?;
|
|
||||||
}
|
|
||||||
Err(e) => return Err(e.into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,9 +0,0 @@
|
||||||
mod control;
|
|
||||||
mod object;
|
|
||||||
mod session;
|
|
||||||
mod stream;
|
|
||||||
|
|
||||||
pub use control::*;
|
|
||||||
pub use object::*;
|
|
||||||
pub use session::*;
|
|
||||||
pub use stream::*;
|
|
|
@ -1,150 +0,0 @@
|
||||||
use std::{collections::BinaryHeap, io::Cursor, sync::Arc};
|
|
||||||
|
|
||||||
use anyhow::Context;
|
|
||||||
use bytes::{Buf, BytesMut};
|
|
||||||
use moq_transport::{Decode, DecodeError, Encode, Object};
|
|
||||||
|
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
||||||
use tokio::sync::Mutex;
|
|
||||||
use tokio::task::JoinSet;
|
|
||||||
use webtransport_quinn::Session;
|
|
||||||
|
|
||||||
use crate::{RecvStream, SendStream, SendStreamOrder};
|
|
||||||
|
|
||||||
// Allow this to be cloned so we can have multiple senders.
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct SendObjects {
|
|
||||||
// This is a tokio mutex since we need to lock across await boundaries.
|
|
||||||
inner: Arc<Mutex<SendObjectsInner>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SendObjects {
|
|
||||||
pub fn new(session: Session) -> Self {
|
|
||||||
let inner = SendObjectsInner::new(session);
|
|
||||||
Self {
|
|
||||||
inner: Arc::new(Mutex::new(inner)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn open(&mut self, header: Object) -> anyhow::Result<SendStream> {
|
|
||||||
let mut inner = self.inner.lock().await;
|
|
||||||
inner.open(header).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct SendObjectsInner {
|
|
||||||
session: Session,
|
|
||||||
|
|
||||||
// Quinn supports a i32 for priority, but the wire format is a u64.
|
|
||||||
// Our work around is to keep a list of streams in priority order and use the index as the priority.
|
|
||||||
// This involves more work, so TODO either increase the Quinn size or reduce the wire size.
|
|
||||||
ordered: BinaryHeap<SendStreamOrder>,
|
|
||||||
ordered_swap: BinaryHeap<SendStreamOrder>, // reuse memory to avoid allocations
|
|
||||||
|
|
||||||
// A reusable buffer for encoding headers.
|
|
||||||
// TODO figure out how to use BufMut on the stack and remove this.
|
|
||||||
buf: BytesMut,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SendObjectsInner {
|
|
||||||
fn new(session: Session) -> Self {
|
|
||||||
Self {
|
|
||||||
session,
|
|
||||||
ordered: BinaryHeap::new(),
|
|
||||||
ordered_swap: BinaryHeap::new(),
|
|
||||||
buf: BytesMut::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn open(&mut self, header: Object) -> anyhow::Result<SendStream> {
|
|
||||||
let stream = self.session.open_uni().await.context("failed to open uni stream")?;
|
|
||||||
let (mut stream, priority) = SendStream::with_order(stream, header.send_order.into_inner());
|
|
||||||
|
|
||||||
// Add the priority to our existing list.
|
|
||||||
self.ordered.push(priority);
|
|
||||||
|
|
||||||
// Loop through the list and update the priorities of any still active streams.
|
|
||||||
let mut index = 0;
|
|
||||||
while let Some(stream) = self.ordered.pop() {
|
|
||||||
if stream.update(index).is_ok() {
|
|
||||||
// Add the stream to the new list so it'll be in sorted order.
|
|
||||||
self.ordered_swap.push(stream);
|
|
||||||
index += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Swap the lists so we can reuse the memory.
|
|
||||||
std::mem::swap(&mut self.ordered, &mut self.ordered_swap);
|
|
||||||
|
|
||||||
// Encode and write the stream header.
|
|
||||||
// TODO do this in SendStream so we don't hold the lock.
|
|
||||||
// Otherwise,
|
|
||||||
self.buf.clear();
|
|
||||||
header.encode(&mut self.buf).unwrap();
|
|
||||||
stream.write_all(&self.buf).await.context("failed to write header")?;
|
|
||||||
|
|
||||||
// log::info!("created stream: {:?}", header);
|
|
||||||
|
|
||||||
Ok(stream)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Not clone, so we don't accidentally have two listners.
|
|
||||||
pub struct RecvObjects {
|
|
||||||
session: Session,
|
|
||||||
|
|
||||||
// Streams that we've accepted but haven't read the header from yet.
|
|
||||||
streams: JoinSet<anyhow::Result<(Object, RecvStream)>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RecvObjects {
|
|
||||||
pub fn new(session: Session) -> Self {
|
|
||||||
Self {
|
|
||||||
session,
|
|
||||||
streams: JoinSet::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn recv(&mut self) -> anyhow::Result<(Object, RecvStream)> {
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
res = self.session.accept_uni() => {
|
|
||||||
let stream = res.context("failed to accept stream")?;
|
|
||||||
self.streams.spawn(async move { Self::read(stream).await });
|
|
||||||
},
|
|
||||||
res = self.streams.join_next(), if !self.streams.is_empty() => {
|
|
||||||
return res.unwrap().context("failed to run join set")?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read(mut stream: webtransport_quinn::RecvStream) -> anyhow::Result<(Object, RecvStream)> {
|
|
||||||
let mut buf = BytesMut::new();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
// Read more data into the buffer.
|
|
||||||
stream.read_buf(&mut buf).await?;
|
|
||||||
|
|
||||||
// Use a cursor to read the buffer and remember how much we read.
|
|
||||||
let mut read = Cursor::new(&mut buf);
|
|
||||||
|
|
||||||
let header = match Object::decode(&mut read) {
|
|
||||||
Ok(header) => header,
|
|
||||||
Err(DecodeError::UnexpectedEnd) => continue,
|
|
||||||
Err(err) => return Err(err.into()),
|
|
||||||
};
|
|
||||||
|
|
||||||
// We parsed a full header, advance the buffer.
|
|
||||||
let size = read.position() as usize;
|
|
||||||
buf.advance(size);
|
|
||||||
let buf = buf.freeze();
|
|
||||||
|
|
||||||
// log::info!("received stream: {:?}", header);
|
|
||||||
|
|
||||||
let stream = RecvStream::new(buf, stream);
|
|
||||||
|
|
||||||
return Ok((header, stream));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,98 +0,0 @@
|
||||||
use anyhow::Context;
|
|
||||||
|
|
||||||
use moq_transport::{Message, SetupClient, SetupServer};
|
|
||||||
|
|
||||||
use super::{RecvControl, RecvObjects, SendControl, SendObjects};
|
|
||||||
|
|
||||||
/// Called by a server with an established WebTransport session.
|
|
||||||
// TODO close the session with an error code
|
|
||||||
pub async fn accept(session: webtransport_quinn::Session, role: moq_transport::Role) -> anyhow::Result<Session> {
|
|
||||||
let (send, recv) = session.accept_bi().await.context("failed to accept bidi stream")?;
|
|
||||||
|
|
||||||
let mut send_control = SendControl::new(send);
|
|
||||||
let mut recv_control = RecvControl::new(recv);
|
|
||||||
|
|
||||||
let setup_client = match recv_control.recv().await.context("failed to read SETUP")? {
|
|
||||||
Message::SetupClient(setup) => setup,
|
|
||||||
_ => anyhow::bail!("expected CLIENT SETUP"),
|
|
||||||
};
|
|
||||||
|
|
||||||
setup_client
|
|
||||||
.versions
|
|
||||||
.iter()
|
|
||||||
.find(|version| **version == moq_transport::Version::DRAFT_00)
|
|
||||||
.context("no supported versions")?;
|
|
||||||
|
|
||||||
if !setup_client.role.compatible(role) {
|
|
||||||
anyhow::bail!("incompatible roles: {:?} {:?}", setup_client.role, role);
|
|
||||||
}
|
|
||||||
|
|
||||||
let setup_server = SetupServer {
|
|
||||||
role,
|
|
||||||
version: moq_transport::Version::DRAFT_00,
|
|
||||||
};
|
|
||||||
|
|
||||||
send_control
|
|
||||||
.send(moq_transport::Message::SetupServer(setup_server))
|
|
||||||
.await
|
|
||||||
.context("failed to send setup server")?;
|
|
||||||
|
|
||||||
let send_objects = SendObjects::new(session.clone());
|
|
||||||
let recv_objects = RecvObjects::new(session.clone());
|
|
||||||
|
|
||||||
Ok(Session {
|
|
||||||
send_control,
|
|
||||||
recv_control,
|
|
||||||
send_objects,
|
|
||||||
recv_objects,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Called by a client with an established WebTransport session.
|
|
||||||
pub async fn connect(session: webtransport_quinn::Session, role: moq_transport::Role) -> anyhow::Result<Session> {
|
|
||||||
let (send, recv) = session.open_bi().await.context("failed to oen bidi stream")?;
|
|
||||||
|
|
||||||
let mut send_control = SendControl::new(send);
|
|
||||||
let mut recv_control = RecvControl::new(recv);
|
|
||||||
|
|
||||||
let setup_client = SetupClient {
|
|
||||||
role,
|
|
||||||
versions: vec![moq_transport::Version::DRAFT_00].into(),
|
|
||||||
path: "".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
send_control
|
|
||||||
.send(moq_transport::Message::SetupClient(setup_client))
|
|
||||||
.await
|
|
||||||
.context("failed to send SETUP CLIENT")?;
|
|
||||||
|
|
||||||
let setup_server = match recv_control.recv().await.context("failed to read SETUP")? {
|
|
||||||
Message::SetupServer(setup) => setup,
|
|
||||||
_ => anyhow::bail!("expected SERVER SETUP"),
|
|
||||||
};
|
|
||||||
|
|
||||||
if setup_server.version != moq_transport::Version::DRAFT_00 {
|
|
||||||
anyhow::bail!("unsupported version: {:?}", setup_server.version);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !setup_server.role.compatible(role) {
|
|
||||||
anyhow::bail!("incompatible roles: {:?} {:?}", role, setup_server.role);
|
|
||||||
}
|
|
||||||
|
|
||||||
let send_objects = SendObjects::new(session.clone());
|
|
||||||
let recv_objects = RecvObjects::new(session.clone());
|
|
||||||
|
|
||||||
Ok(Session {
|
|
||||||
send_control,
|
|
||||||
recv_control,
|
|
||||||
send_objects,
|
|
||||||
recv_objects,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Session {
|
|
||||||
pub send_control: SendControl,
|
|
||||||
pub recv_control: RecvControl,
|
|
||||||
pub send_objects: SendObjects,
|
|
||||||
pub recv_objects: RecvObjects,
|
|
||||||
}
|
|
|
@ -1,115 +0,0 @@
|
||||||
use std::{
|
|
||||||
io,
|
|
||||||
pin::{pin, Pin},
|
|
||||||
sync::{Arc, Mutex, Weak},
|
|
||||||
task::{self, Poll},
|
|
||||||
};
|
|
||||||
|
|
||||||
use bytes::{BufMut, Bytes};
|
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
|
||||||
|
|
||||||
// Ugh, so we need to wrap SendStream with a mutex because we need to be able to call set_priority on it.
|
|
||||||
// The problem is that set_priority takes a i32, while send_order is a VarInt
|
|
||||||
// So the solution is to maintain a priority queue of active streams and constantly update the priority with their index.
|
|
||||||
// So the library might update the priority of the stream at any point, while the application might similtaniously write to it.
|
|
||||||
// The only upside is that we don't expose set_priority, so the application can't screw with things.
|
|
||||||
pub struct SendStream {
|
|
||||||
stream: Arc<Mutex<webtransport_quinn::SendStream>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SendStream {
|
|
||||||
// Create a new stream with the given order, returning a handle that allows us to update the priority.
|
|
||||||
pub(crate) fn with_order(stream: webtransport_quinn::SendStream, order: u64) -> (Self, SendStreamOrder) {
|
|
||||||
let stream = Arc::new(Mutex::new(stream));
|
|
||||||
let weak = Arc::<Mutex<webtransport_quinn::SendStream>>::downgrade(&stream);
|
|
||||||
|
|
||||||
(SendStream { stream }, SendStreamOrder { stream: weak, order })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct SendStreamOrder {
|
|
||||||
// We use Weak here so we don't prevent the stream from being closed when dereferenced.
|
|
||||||
// update() will return an error if the stream was closed instead.
|
|
||||||
stream: Weak<Mutex<webtransport_quinn::SendStream>>,
|
|
||||||
order: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SendStreamOrder {
|
|
||||||
pub(crate) fn update(&self, index: i32) -> Result<(), webtransport_quinn::StreamClosed> {
|
|
||||||
let stream = self.stream.upgrade().ok_or(webtransport_quinn::StreamClosed)?;
|
|
||||||
let mut stream = stream.lock().unwrap();
|
|
||||||
stream.set_priority(index)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PartialEq for SendStreamOrder {
|
|
||||||
fn eq(&self, other: &Self) -> bool {
|
|
||||||
self.order == other.order
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Eq for SendStreamOrder {}
|
|
||||||
|
|
||||||
impl PartialOrd for SendStreamOrder {
|
|
||||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
|
||||||
// We reverse the order so the lower send order is higher priority.
|
|
||||||
other.order.partial_cmp(&self.order)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Ord for SendStreamOrder {
|
|
||||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
|
||||||
// We reverse the order so the lower send order is higher priority.
|
|
||||||
other.order.cmp(&self.order)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// We implement AsyncWrite so we can grab the mutex on each write attempt, instead of holding it for the entire async function.
|
|
||||||
impl AsyncWrite for SendStream {
|
|
||||||
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> task::Poll<io::Result<usize>> {
|
|
||||||
let mut stream = self.stream.lock().unwrap();
|
|
||||||
Pin::new(&mut *stream).poll_write(cx, buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<io::Result<()>> {
|
|
||||||
let mut stream = self.stream.lock().unwrap();
|
|
||||||
Pin::new(&mut *stream).poll_flush(cx)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<io::Result<()>> {
|
|
||||||
let mut stream = self.stream.lock().unwrap();
|
|
||||||
Pin::new(&mut *stream).poll_shutdown(cx)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unfortunately, we need to wrap RecvStream with a buffer since moq-transport::Coding only supports buffered reads.
|
|
||||||
// We first serve any data in the buffer, then we poll the stream.
|
|
||||||
pub struct RecvStream {
|
|
||||||
buf: Bytes,
|
|
||||||
stream: webtransport_quinn::RecvStream,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RecvStream {
|
|
||||||
pub(crate) fn new(buf: Bytes, stream: webtransport_quinn::RecvStream) -> Self {
|
|
||||||
Self { buf, stream }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn stop(&mut self, code: u32) {
|
|
||||||
self.stream.stop(code).ok();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsyncRead for RecvStream {
|
|
||||||
fn poll_read(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut task::Context<'_>,
|
|
||||||
buf: &mut tokio::io::ReadBuf<'_>,
|
|
||||||
) -> Poll<io::Result<()>> {
|
|
||||||
if !self.buf.is_empty() {
|
|
||||||
buf.put(&mut pin!(self).buf);
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
} else {
|
|
||||||
Pin::new(&mut self.stream).poll_read(cx, buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -15,5 +15,8 @@ categories = ["multimedia", "network-programming", "web-programming"]
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bytes = "1"
|
bytes = "1.4"
|
||||||
thiserror = "1.0.21"
|
thiserror = "1"
|
||||||
|
anyhow = "1"
|
||||||
|
webtransport-generic = "0.5"
|
||||||
|
tokio = { version = "1.27", features = ["macros", "io-util", "rt", "sync"] }
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use super::VarInt;
|
use super::VarInt;
|
||||||
use bytes::{Buf, Bytes};
|
|
||||||
use std::str;
|
use std::str;
|
||||||
|
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
@ -15,62 +14,6 @@ pub enum DecodeError {
|
||||||
#[error("invalid type: {0:?}")]
|
#[error("invalid type: {0:?}")]
|
||||||
InvalidType(VarInt),
|
InvalidType(VarInt),
|
||||||
|
|
||||||
#[error("unknown error")]
|
#[error("io error: {0}")]
|
||||||
Unknown,
|
IoError(#[from] std::io::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait Decode: Sized {
|
|
||||||
// Decodes a message, returning UnexpectedEnd if there's not enough bytes in the buffer.
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decode for Bytes {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
let size = VarInt::decode(r)?.into_inner() as usize;
|
|
||||||
if r.remaining() < size {
|
|
||||||
return Err(DecodeError::UnexpectedEnd);
|
|
||||||
}
|
|
||||||
|
|
||||||
let buf = r.copy_to_bytes(size);
|
|
||||||
Ok(buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decode for Vec<u8> {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
Bytes::decode(r).map(|b| b.to_vec())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decode for String {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
let data = Vec::decode(r)?;
|
|
||||||
let s = str::from_utf8(&data)?.to_string();
|
|
||||||
Ok(s)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decode for u8 {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
if r.remaining() < 1 {
|
|
||||||
return Err(DecodeError::UnexpectedEnd);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(r.get_u8())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
impl<const N: usize> Decode for [u8; N] {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
if r.remaining() < N {
|
|
||||||
return Err(DecodeError::UnexpectedEnd);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut buf = [0; N];
|
|
||||||
r.copy_to_slice(&mut buf);
|
|
||||||
|
|
||||||
Ok(buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
|
@ -1,20 +0,0 @@
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
|
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
impl Encode for Duration {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
let ms = self.as_millis();
|
|
||||||
let ms = VarInt::try_from(ms)?;
|
|
||||||
ms.encode(w)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decode for Duration {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
let ms = VarInt::decode(r)?;
|
|
||||||
Ok(Self::from_millis(ms.into()))
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,95 +1,12 @@
|
||||||
use super::{BoundsExceeded, VarInt};
|
use super::BoundsExceeded;
|
||||||
use bytes::{BufMut, Bytes};
|
|
||||||
|
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum EncodeError {
|
pub enum EncodeError {
|
||||||
#[error("unexpected end of buffer")]
|
|
||||||
UnexpectedEnd,
|
|
||||||
|
|
||||||
#[error("varint too large")]
|
#[error("varint too large")]
|
||||||
BoundsExceeded(#[from] BoundsExceeded),
|
BoundsExceeded(#[from] BoundsExceeded),
|
||||||
|
|
||||||
#[error("unknown error")]
|
#[error("i/o error: {0}")]
|
||||||
Unknown,
|
IoError(#[from] std::io::Error),
|
||||||
}
|
|
||||||
|
|
||||||
pub trait Encode: Sized {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for Bytes {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
self.as_ref().encode(w)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for Vec<u8> {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
self.as_slice().encode(w)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for &[u8] {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
let size = VarInt::try_from(self.len())?;
|
|
||||||
size.encode(w)?;
|
|
||||||
|
|
||||||
if w.remaining_mut() < self.len() {
|
|
||||||
return Err(EncodeError::UnexpectedEnd);
|
|
||||||
}
|
|
||||||
w.put_slice(self);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for String {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
self.as_bytes().encode(w)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for u8 {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
if w.remaining_mut() < 1 {
|
|
||||||
return Err(EncodeError::UnexpectedEnd);
|
|
||||||
}
|
|
||||||
|
|
||||||
w.put_u8(*self);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for u16 {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
if w.remaining_mut() < 2 {
|
|
||||||
return Err(EncodeError::UnexpectedEnd);
|
|
||||||
}
|
|
||||||
|
|
||||||
w.put_u16(*self);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for u32 {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
if w.remaining_mut() < 4 {
|
|
||||||
return Err(EncodeError::UnexpectedEnd);
|
|
||||||
}
|
|
||||||
|
|
||||||
w.put_u32(*self);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl Encode for u64 {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
if w.remaining_mut() < 8 {
|
|
||||||
return Err(EncodeError::UnexpectedEnd);
|
|
||||||
}
|
|
||||||
|
|
||||||
w.put_u64(*self);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
mod decode;
|
mod decode;
|
||||||
mod duration;
|
|
||||||
mod encode;
|
mod encode;
|
||||||
|
mod string;
|
||||||
mod varint;
|
mod varint;
|
||||||
|
|
||||||
pub use decode::*;
|
pub use decode::*;
|
||||||
pub use duration::*;
|
|
||||||
pub use encode::*;
|
pub use encode::*;
|
||||||
|
pub use string::*;
|
||||||
pub use varint::*;
|
pub use varint::*;
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
use std::cmp::min;
|
||||||
|
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
|
use crate::VarInt;
|
||||||
|
|
||||||
|
use super::{DecodeError, EncodeError};
|
||||||
|
|
||||||
|
pub async fn encode_string<W: SendStream>(s: &str, w: &mut W) -> Result<(), EncodeError> {
|
||||||
|
let size = VarInt::try_from(s.len())?;
|
||||||
|
size.encode(w).await?;
|
||||||
|
w.write_all(s.as_ref()).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn decode_string<R: RecvStream>(r: &mut R) -> Result<String, DecodeError> {
|
||||||
|
let size = VarInt::decode(r).await?.into_inner();
|
||||||
|
let mut str = String::with_capacity(min(1024, size) as usize);
|
||||||
|
r.take(size).read_to_string(&mut str).await?;
|
||||||
|
Ok(str)
|
||||||
|
}
|
|
@ -5,10 +5,11 @@
|
||||||
use std::convert::{TryFrom, TryInto};
|
use std::convert::{TryFrom, TryInto};
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
|
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
|
use super::{DecodeError, EncodeError};
|
||||||
|
|
||||||
#[derive(Debug, Copy, Clone, Eq, PartialEq, Error)]
|
#[derive(Debug, Copy, Clone, Eq, PartialEq, Error)]
|
||||||
#[error("value too large for varint encoding")]
|
#[error("value too large for varint encoding")]
|
||||||
|
@ -120,15 +121,10 @@ impl fmt::Display for VarInt {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Decode for VarInt {
|
impl VarInt {
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
let mut buf = [0; 8];
|
let mut buf = [0u8; 8];
|
||||||
|
r.read_exact(buf[0..1].as_mut()).await?;
|
||||||
if r.remaining() < 1 {
|
|
||||||
return Err(DecodeError::UnexpectedEnd);
|
|
||||||
}
|
|
||||||
|
|
||||||
buf[0] = r.get_u8();
|
|
||||||
|
|
||||||
let tag = buf[0] >> 6;
|
let tag = buf[0] >> 6;
|
||||||
buf[0] &= 0b0011_1111;
|
buf[0] &= 0b0011_1111;
|
||||||
|
@ -136,27 +132,15 @@ impl Decode for VarInt {
|
||||||
let x = match tag {
|
let x = match tag {
|
||||||
0b00 => u64::from(buf[0]),
|
0b00 => u64::from(buf[0]),
|
||||||
0b01 => {
|
0b01 => {
|
||||||
if r.remaining() < 1 {
|
r.read_exact(buf[1..2].as_mut()).await?;
|
||||||
return Err(DecodeError::UnexpectedEnd);
|
|
||||||
}
|
|
||||||
|
|
||||||
r.copy_to_slice(buf[1..2].as_mut());
|
|
||||||
u64::from(u16::from_be_bytes(buf[..2].try_into().unwrap()))
|
u64::from(u16::from_be_bytes(buf[..2].try_into().unwrap()))
|
||||||
}
|
}
|
||||||
0b10 => {
|
0b10 => {
|
||||||
if r.remaining() < 3 {
|
r.read_exact(buf[1..4].as_mut()).await?;
|
||||||
return Err(DecodeError::UnexpectedEnd);
|
|
||||||
}
|
|
||||||
|
|
||||||
r.copy_to_slice(buf[1..4].as_mut());
|
|
||||||
u64::from(u32::from_be_bytes(buf[..4].try_into().unwrap()))
|
u64::from(u32::from_be_bytes(buf[..4].try_into().unwrap()))
|
||||||
}
|
}
|
||||||
0b11 => {
|
0b11 => {
|
||||||
if r.remaining() < 7 {
|
r.read_exact(buf[1..8].as_mut()).await?;
|
||||||
return Err(DecodeError::UnexpectedEnd);
|
|
||||||
}
|
|
||||||
|
|
||||||
r.copy_to_slice(buf[1..8].as_mut());
|
|
||||||
u64::from_be_bytes(buf)
|
u64::from_be_bytes(buf)
|
||||||
}
|
}
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
|
@ -164,21 +148,21 @@ impl Decode for VarInt {
|
||||||
|
|
||||||
Ok(Self(x))
|
Ok(Self(x))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for VarInt {
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
let x = self.0;
|
let x = self.0;
|
||||||
if x < 2u64.pow(6) {
|
if x < 2u64.pow(6) {
|
||||||
(x as u8).encode(w)
|
w.write_u8(x as u8).await?;
|
||||||
} else if x < 2u64.pow(14) {
|
} else if x < 2u64.pow(14) {
|
||||||
(0b01 << 14 | x as u16).encode(w)
|
w.write_u16(0b01 << 14 | x as u16).await?;
|
||||||
} else if x < 2u64.pow(30) {
|
} else if x < 2u64.pow(30) {
|
||||||
(0b10 << 30 | x as u32).encode(w)
|
w.write_u32(0b10 << 30 | x as u32).await?;
|
||||||
} else if x < 2u64.pow(62) {
|
} else if x < 2u64.pow(62) {
|
||||||
(0b11 << 62 | x).encode(w)
|
w.write_u64(0b11 << 62 | x).await?;
|
||||||
} else {
|
} else {
|
||||||
unreachable!("malformed VarInt");
|
unreachable!("malformed VarInt");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,23 +0,0 @@
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
|
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Announce {
|
|
||||||
// The track namespace
|
|
||||||
pub track_namespace: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decode for Announce {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
let track_namespace = String::decode(r)?;
|
|
||||||
Ok(Self { track_namespace })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for Announce {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
self.track_namespace.encode(w)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,40 +0,0 @@
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
|
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct AnnounceError {
|
|
||||||
// Echo back the namespace that was announced.
|
|
||||||
// TODO Propose using an ID to save bytes.
|
|
||||||
pub track_namespace: String,
|
|
||||||
|
|
||||||
// An error code.
|
|
||||||
pub code: VarInt,
|
|
||||||
|
|
||||||
// An optional, human-readable reason.
|
|
||||||
pub reason: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decode for AnnounceError {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
let track_namespace = String::decode(r)?;
|
|
||||||
let code = VarInt::decode(r)?;
|
|
||||||
let reason = String::decode(r)?;
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
track_namespace,
|
|
||||||
code,
|
|
||||||
reason,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for AnnounceError {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
self.track_namespace.encode(w)?;
|
|
||||||
self.code.encode(w)?;
|
|
||||||
self.reason.encode(w)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,23 +0,0 @@
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
|
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct AnnounceOk {
|
|
||||||
// Echo back the namespace that was announced.
|
|
||||||
// TODO Propose using an ID to save bytes.
|
|
||||||
pub track_namespace: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decode for AnnounceOk {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
let track_namespace = String::decode(r)?;
|
|
||||||
Ok(Self { track_namespace })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for AnnounceOk {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
self.track_namespace.encode(w)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,21 +0,0 @@
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
|
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct GoAway {
|
|
||||||
pub url: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decode for GoAway {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
let url = String::decode(r)?;
|
|
||||||
Ok(Self { url })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for GoAway {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
self.url.encode(w)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,40 +0,0 @@
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
|
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Subscribe {
|
|
||||||
// An ID we choose so we can map to the track_name.
|
|
||||||
// Proposal: https://github.com/moq-wg/moq-transport/issues/209
|
|
||||||
pub track_id: VarInt,
|
|
||||||
|
|
||||||
// The track namespace.
|
|
||||||
pub track_namespace: String,
|
|
||||||
|
|
||||||
// The track name.
|
|
||||||
pub track_name: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decode for Subscribe {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
let track_id = VarInt::decode(r)?;
|
|
||||||
let track_namespace = String::decode(r)?;
|
|
||||||
let track_name = String::decode(r)?;
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
track_id,
|
|
||||||
track_namespace,
|
|
||||||
track_name,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for Subscribe {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
self.track_id.encode(w)?;
|
|
||||||
self.track_namespace.encode(w)?;
|
|
||||||
self.track_name.encode(w)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,37 +0,0 @@
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
|
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct SubscribeError {
|
|
||||||
// NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209
|
|
||||||
|
|
||||||
// The ID for this track.
|
|
||||||
pub track_id: VarInt,
|
|
||||||
|
|
||||||
// An error code.
|
|
||||||
pub code: VarInt,
|
|
||||||
|
|
||||||
// An optional, human-readable reason.
|
|
||||||
pub reason: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decode for SubscribeError {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
let track_id = VarInt::decode(r)?;
|
|
||||||
let code = VarInt::decode(r)?;
|
|
||||||
let reason = String::decode(r)?;
|
|
||||||
|
|
||||||
Ok(Self { track_id, code, reason })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for SubscribeError {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
self.track_id.encode(w)?;
|
|
||||||
self.code.encode(w)?;
|
|
||||||
self.reason.encode(w)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,36 +0,0 @@
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
|
|
||||||
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct SubscribeOk {
|
|
||||||
// NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209
|
|
||||||
|
|
||||||
// The ID for this track.
|
|
||||||
pub track_id: VarInt,
|
|
||||||
|
|
||||||
// The subscription will end after this duration has elapsed.
|
|
||||||
// A value of zero is invalid.
|
|
||||||
pub expires: Option<Duration>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decode for SubscribeOk {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
let track_id = VarInt::decode(r)?;
|
|
||||||
let expires = Duration::decode(r)?;
|
|
||||||
let expires = if expires == Duration::ZERO { None } else { Some(expires) };
|
|
||||||
|
|
||||||
Ok(Self { track_id, expires })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for SubscribeOk {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
self.track_id.encode(w)?;
|
|
||||||
self.expires.unwrap_or_default().encode(w)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,7 +1,9 @@
|
||||||
mod coding;
|
mod coding;
|
||||||
mod control;
|
pub mod message;
|
||||||
mod object;
|
pub mod object;
|
||||||
|
pub mod session;
|
||||||
|
pub mod setup;
|
||||||
|
|
||||||
pub use coding::*;
|
pub use coding::VarInt;
|
||||||
pub use control::*;
|
pub use message::Message;
|
||||||
pub use object::*;
|
pub use session::Session;
|
||||||
|
|
|
@ -0,0 +1,21 @@
|
||||||
|
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError};
|
||||||
|
|
||||||
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Announce {
|
||||||
|
// The track namespace
|
||||||
|
pub track_namespace: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Announce {
|
||||||
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
|
let track_namespace = decode_string(r).await?;
|
||||||
|
Ok(Self { track_namespace })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
|
encode_string(&self.track_namespace, w).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError, VarInt};
|
||||||
|
|
||||||
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct AnnounceError {
|
||||||
|
// Echo back the namespace that was announced.
|
||||||
|
// TODO Propose using an ID to save bytes.
|
||||||
|
pub track_namespace: String,
|
||||||
|
|
||||||
|
// An error code.
|
||||||
|
pub code: VarInt,
|
||||||
|
|
||||||
|
// An optional, human-readable reason.
|
||||||
|
pub reason: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AnnounceError {
|
||||||
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
|
let track_namespace = decode_string(r).await?;
|
||||||
|
let code = VarInt::decode(r).await?;
|
||||||
|
let reason = decode_string(r).await?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
track_namespace,
|
||||||
|
code,
|
||||||
|
reason,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
|
encode_string(&self.track_namespace, w).await?;
|
||||||
|
self.code.encode(w).await?;
|
||||||
|
encode_string(&self.reason, w).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,21 @@
|
||||||
|
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError};
|
||||||
|
|
||||||
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct AnnounceOk {
|
||||||
|
// Echo back the namespace that was announced.
|
||||||
|
// TODO Propose using an ID to save bytes.
|
||||||
|
pub track_namespace: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AnnounceOk {
|
||||||
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
|
let track_namespace = decode_string(r).await?;
|
||||||
|
Ok(Self { track_namespace })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
|
encode_string(&self.track_namespace, w).await
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,19 @@
|
||||||
|
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError};
|
||||||
|
|
||||||
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct GoAway {
|
||||||
|
pub url: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GoAway {
|
||||||
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
|
let url = decode_string(r).await?;
|
||||||
|
Ok(Self { url })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
|
encode_string(&self.url, w).await
|
||||||
|
}
|
||||||
|
}
|
|
@ -2,31 +2,29 @@ mod announce;
|
||||||
mod announce_error;
|
mod announce_error;
|
||||||
mod announce_ok;
|
mod announce_ok;
|
||||||
mod go_away;
|
mod go_away;
|
||||||
mod role;
|
mod receiver;
|
||||||
mod setup_client;
|
mod sender;
|
||||||
mod setup_server;
|
|
||||||
mod subscribe;
|
mod subscribe;
|
||||||
mod subscribe_error;
|
mod subscribe_error;
|
||||||
mod subscribe_ok;
|
mod subscribe_ok;
|
||||||
mod version;
|
|
||||||
|
|
||||||
pub use announce::*;
|
pub use announce::*;
|
||||||
pub use announce_error::*;
|
pub use announce_error::*;
|
||||||
pub use announce_ok::*;
|
pub use announce_ok::*;
|
||||||
pub use go_away::*;
|
pub use go_away::*;
|
||||||
pub use role::*;
|
pub use receiver::*;
|
||||||
pub use setup_client::*;
|
pub use sender::*;
|
||||||
pub use setup_server::*;
|
|
||||||
pub use subscribe::*;
|
pub use subscribe::*;
|
||||||
pub use subscribe_error::*;
|
pub use subscribe_error::*;
|
||||||
pub use subscribe_ok::*;
|
pub use subscribe_ok::*;
|
||||||
pub use version::*;
|
|
||||||
|
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
|
use crate::coding::{DecodeError, EncodeError, VarInt};
|
||||||
|
use crate::setup;
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
|
||||||
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
// NOTE: This is forked from moq-transport-00.
|
// NOTE: This is forked from moq-transport-00.
|
||||||
// 1. SETUP role indicates local support ("I can subscribe"), not remote support ("server must publish")
|
// 1. SETUP role indicates local support ("I can subscribe"), not remote support ("server must publish")
|
||||||
// 2. SETUP_SERVER is id=2 to disambiguate
|
// 2. SETUP_SERVER is id=2 to disambiguate
|
||||||
|
@ -43,28 +41,24 @@ macro_rules! message_types {
|
||||||
$($name($name)),*
|
$($name($name)),*
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Message {
|
||||||
impl Decode for Message {
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
let t = VarInt::decode(r).await?;
|
||||||
let t = VarInt::decode(r)?;
|
|
||||||
|
|
||||||
match t.into_inner() {
|
match t.into_inner() {
|
||||||
$($val => {
|
$($val => {
|
||||||
let msg = $name::decode(r)?;
|
let msg = $name::decode(r).await?;
|
||||||
Ok(Self::$name(msg))
|
Ok(Self::$name(msg))
|
||||||
})*
|
})*
|
||||||
_ => Err(DecodeError::InvalidType(t)),
|
_ => Err(DecodeError::InvalidType(t)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
impl Encode for Message {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
match self {
|
match self {
|
||||||
$(Self::$name(ref m) => {
|
$(Self::$name(ref m) => {
|
||||||
VarInt::from_u32($val).encode(w)?;
|
VarInt::from_u32($val).encode(w).await?;
|
||||||
m.encode(w)
|
m.encode(w).await
|
||||||
},)*
|
},)*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -87,6 +81,10 @@ macro_rules! message_types {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Just so we can use the macro above.
|
||||||
|
type SetupClient = setup::Client;
|
||||||
|
type SetupServer = setup::Server;
|
||||||
|
|
||||||
// Each message is prefixed with the given VarInt type.
|
// Each message is prefixed with the given VarInt type.
|
||||||
message_types! {
|
message_types! {
|
||||||
// NOTE: Object and Setup are in other modules.
|
// NOTE: Object and Setup are in other modules.
|
|
@ -0,0 +1,19 @@
|
||||||
|
use crate::{coding::DecodeError, message::Message};
|
||||||
|
|
||||||
|
use webtransport_generic::RecvStream;
|
||||||
|
|
||||||
|
pub struct Receiver<R: RecvStream> {
|
||||||
|
stream: R,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: RecvStream> Receiver<R> {
|
||||||
|
pub fn new(stream: R) -> Self {
|
||||||
|
Self { stream }
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the next full message from the stream.
|
||||||
|
// NOTE: This is not cancellable; you must poll the future to completion.
|
||||||
|
pub async fn recv(&mut self) -> Result<Message, DecodeError> {
|
||||||
|
Message::decode(&mut self.stream).await
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,21 @@
|
||||||
|
use crate::message::Message;
|
||||||
|
|
||||||
|
use webtransport_generic::SendStream;
|
||||||
|
|
||||||
|
pub struct Sender<S: SendStream> {
|
||||||
|
stream: S,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: SendStream> Sender<S> {
|
||||||
|
pub fn new(stream: S) -> Self {
|
||||||
|
Self { stream }
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the next full message from the stream.
|
||||||
|
// NOTE: This is not cancellable; you must poll the future to completion.
|
||||||
|
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
|
||||||
|
let msg = msg.into();
|
||||||
|
msg.encode(&mut self.stream).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError, VarInt};
|
||||||
|
|
||||||
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Subscribe {
|
||||||
|
// An ID we choose so we can map to the track_name.
|
||||||
|
// Proposal: https://github.com/moq-wg/moq-transport/issues/209
|
||||||
|
pub track_id: VarInt,
|
||||||
|
|
||||||
|
// The track namespace.
|
||||||
|
pub track_namespace: String,
|
||||||
|
|
||||||
|
// The track name.
|
||||||
|
pub track_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Subscribe {
|
||||||
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
|
let track_id = VarInt::decode(r).await?;
|
||||||
|
let track_namespace = decode_string(r).await?;
|
||||||
|
let track_name = decode_string(r).await?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
track_id,
|
||||||
|
track_namespace,
|
||||||
|
track_name,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Subscribe {
|
||||||
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
|
self.track_id.encode(w).await?;
|
||||||
|
encode_string(&self.track_namespace, w).await?;
|
||||||
|
encode_string(&self.track_name, w).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError, VarInt};
|
||||||
|
|
||||||
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct SubscribeError {
|
||||||
|
// NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209
|
||||||
|
|
||||||
|
// The ID for this track.
|
||||||
|
pub track_id: VarInt,
|
||||||
|
|
||||||
|
// An error code.
|
||||||
|
pub code: VarInt,
|
||||||
|
|
||||||
|
// An optional, human-readable reason.
|
||||||
|
pub reason: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SubscribeError {
|
||||||
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
|
let track_id = VarInt::decode(r).await?;
|
||||||
|
let code = VarInt::decode(r).await?;
|
||||||
|
let reason = decode_string(r).await?;
|
||||||
|
|
||||||
|
Ok(Self { track_id, code, reason })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SubscribeError {
|
||||||
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
|
self.track_id.encode(w).await?;
|
||||||
|
self.code.encode(w).await?;
|
||||||
|
encode_string(&self.reason, w).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
use crate::coding::{DecodeError, EncodeError, VarInt};
|
||||||
|
|
||||||
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct SubscribeOk {
|
||||||
|
// NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209
|
||||||
|
|
||||||
|
// The ID for this track.
|
||||||
|
pub track_id: VarInt,
|
||||||
|
|
||||||
|
// The subscription will end after this duration has elapsed.
|
||||||
|
// A value of zero is invalid.
|
||||||
|
pub expires: Option<VarInt>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SubscribeOk {
|
||||||
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
|
let track_id = VarInt::decode(r).await?;
|
||||||
|
let expires = VarInt::decode(r).await?;
|
||||||
|
let expires = if expires.into_inner() == 0 { None } else { Some(expires) };
|
||||||
|
|
||||||
|
Ok(Self { track_id, expires })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SubscribeOk {
|
||||||
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
|
self.track_id.encode(w).await?;
|
||||||
|
self.expires.unwrap_or_default().encode(w).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,54 +0,0 @@
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
|
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Object {
|
|
||||||
// An ID for this track.
|
|
||||||
// Proposal: https://github.com/moq-wg/moq-transport/issues/209
|
|
||||||
pub track: VarInt,
|
|
||||||
|
|
||||||
// The group sequence number.
|
|
||||||
pub group: VarInt,
|
|
||||||
|
|
||||||
// The object sequence number.
|
|
||||||
pub sequence: VarInt,
|
|
||||||
|
|
||||||
// The priority/send order.
|
|
||||||
pub send_order: VarInt,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Decode for Object {
|
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
|
||||||
let typ = VarInt::decode(r)?;
|
|
||||||
if typ.into_inner() != 0 {
|
|
||||||
return Err(DecodeError::InvalidType(typ));
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOTE: size has been omitted
|
|
||||||
|
|
||||||
let track = VarInt::decode(r)?;
|
|
||||||
let group = VarInt::decode(r)?;
|
|
||||||
let sequence = VarInt::decode(r)?;
|
|
||||||
let send_order = VarInt::decode(r)?;
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
track,
|
|
||||||
group,
|
|
||||||
sequence,
|
|
||||||
send_order,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for Object {
|
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
VarInt::from_u32(0).encode(w)?;
|
|
||||||
self.track.encode(w)?;
|
|
||||||
self.group.encode(w)?;
|
|
||||||
self.sequence.encode(w)?;
|
|
||||||
self.send_order.encode(w)?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,54 @@
|
||||||
|
use crate::coding::{DecodeError, EncodeError, VarInt};
|
||||||
|
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Header {
|
||||||
|
// An ID for this track.
|
||||||
|
// Proposal: https://github.com/moq-wg/moq-transport/issues/209
|
||||||
|
pub track: VarInt,
|
||||||
|
|
||||||
|
// The group sequence number.
|
||||||
|
pub group: VarInt,
|
||||||
|
|
||||||
|
// The object sequence number.
|
||||||
|
pub sequence: VarInt,
|
||||||
|
|
||||||
|
// The priority/send order.
|
||||||
|
// Proposal: int32 instead of a varint.
|
||||||
|
pub send_order: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Header {
|
||||||
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
|
let typ = VarInt::decode(r).await?;
|
||||||
|
if typ.into_inner() != 0 {
|
||||||
|
return Err(DecodeError::InvalidType(typ));
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: size has been omitted
|
||||||
|
|
||||||
|
let track = VarInt::decode(r).await?;
|
||||||
|
let group = VarInt::decode(r).await?;
|
||||||
|
let sequence = VarInt::decode(r).await?;
|
||||||
|
let send_order = r.read_i32().await?; // big-endian
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
track,
|
||||||
|
group,
|
||||||
|
sequence,
|
||||||
|
send_order,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
|
VarInt::from_u32(0).encode(w).await?;
|
||||||
|
self.track.encode(w).await?;
|
||||||
|
self.group.encode(w).await?;
|
||||||
|
self.sequence.encode(w).await?;
|
||||||
|
w.write_i32(self.send_order).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,7 @@
|
||||||
|
mod header;
|
||||||
|
mod receiver;
|
||||||
|
mod sender;
|
||||||
|
|
||||||
|
pub use header::*;
|
||||||
|
pub use receiver::*;
|
||||||
|
pub use sender::*;
|
|
@ -0,0 +1,42 @@
|
||||||
|
use crate::object::Header;
|
||||||
|
|
||||||
|
use anyhow::Context;
|
||||||
|
|
||||||
|
use tokio::task::JoinSet;
|
||||||
|
|
||||||
|
use webtransport_generic::Session;
|
||||||
|
|
||||||
|
pub struct Receiver<S: Session> {
|
||||||
|
session: S,
|
||||||
|
|
||||||
|
// Streams that we've accepted but haven't read the header from yet.
|
||||||
|
streams: JoinSet<anyhow::Result<(Header, S::RecvStream)>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Session> Receiver<S> {
|
||||||
|
pub fn new(session: S) -> Self {
|
||||||
|
Self {
|
||||||
|
session,
|
||||||
|
streams: JoinSet::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn recv(&mut self) -> anyhow::Result<(Header, S::RecvStream)> {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
res = self.session.accept_uni() => {
|
||||||
|
let stream = res.context("failed to accept stream")?;
|
||||||
|
self.streams.spawn(async move { Self::read(stream).await });
|
||||||
|
},
|
||||||
|
res = self.streams.join_next(), if !self.streams.is_empty() => {
|
||||||
|
return res.unwrap().context("failed to run join set")?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read(mut stream: S::RecvStream) -> anyhow::Result<(Header, S::RecvStream)> {
|
||||||
|
let header = Header::decode(&mut stream).await?;
|
||||||
|
Ok((header, stream))
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
use anyhow::Context;
|
||||||
|
|
||||||
|
use crate::object::Header;
|
||||||
|
|
||||||
|
use webtransport_generic::{SendStream, Session};
|
||||||
|
|
||||||
|
// Allow this to be cloned so we can have multiple senders.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Sender<S: Session> {
|
||||||
|
// The session.
|
||||||
|
session: S,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Session> Sender<S> {
|
||||||
|
pub fn new(session: S) -> Self {
|
||||||
|
Self { session }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn open(&mut self, header: Header) -> anyhow::Result<S::SendStream> {
|
||||||
|
let mut stream = self.session.open_uni().await.context("failed to open uni stream")?;
|
||||||
|
|
||||||
|
stream.set_priority(header.send_order);
|
||||||
|
header.encode(&mut stream).await.context("failed to write header")?;
|
||||||
|
|
||||||
|
// log::info!("created stream: {:?}", header);
|
||||||
|
|
||||||
|
Ok(stream)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,99 @@
|
||||||
|
use anyhow::Context;
|
||||||
|
|
||||||
|
use crate::{message, object, setup};
|
||||||
|
use webtransport_generic::Session as WTSession;
|
||||||
|
|
||||||
|
pub struct Session<S: WTSession> {
|
||||||
|
pub send_control: message::Sender<S::SendStream>,
|
||||||
|
pub recv_control: message::Receiver<S::RecvStream>,
|
||||||
|
pub send_objects: object::Sender<S>,
|
||||||
|
pub recv_objects: object::Receiver<S>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: WTSession> Session<S> {
|
||||||
|
/// Called by a server with an established WebTransport session.
|
||||||
|
// TODO close the session with an error code
|
||||||
|
pub async fn accept(session: S, role: setup::Role) -> anyhow::Result<Self> {
|
||||||
|
let (send, recv) = session.accept_bi().await.context("failed to accept bidi stream")?;
|
||||||
|
|
||||||
|
let mut send_control = message::Sender::new(send);
|
||||||
|
let mut recv_control = message::Receiver::new(recv);
|
||||||
|
|
||||||
|
let setup_client = match recv_control.recv().await.context("failed to read SETUP")? {
|
||||||
|
message::Message::SetupClient(setup) => setup,
|
||||||
|
_ => anyhow::bail!("expected CLIENT SETUP"),
|
||||||
|
};
|
||||||
|
|
||||||
|
setup_client
|
||||||
|
.versions
|
||||||
|
.iter()
|
||||||
|
.find(|version| **version == setup::Version::DRAFT_00)
|
||||||
|
.context("no supported versions")?;
|
||||||
|
|
||||||
|
if !setup_client.role.compatible(role) {
|
||||||
|
anyhow::bail!("incompatible roles: {:?} {:?}", setup_client.role, role);
|
||||||
|
}
|
||||||
|
|
||||||
|
let setup_server = setup::Server {
|
||||||
|
role,
|
||||||
|
version: setup::Version::DRAFT_00,
|
||||||
|
};
|
||||||
|
|
||||||
|
send_control
|
||||||
|
.send(message::Message::SetupServer(setup_server))
|
||||||
|
.await
|
||||||
|
.context("failed to send setup server")?;
|
||||||
|
|
||||||
|
let send_objects = object::Sender::new(session.clone());
|
||||||
|
let recv_objects = object::Receiver::new(session.clone());
|
||||||
|
|
||||||
|
Ok(Session {
|
||||||
|
send_control,
|
||||||
|
recv_control,
|
||||||
|
send_objects,
|
||||||
|
recv_objects,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Called by a client with an established WebTransport session.
|
||||||
|
pub async fn connect(session: S, role: setup::Role) -> anyhow::Result<Self> {
|
||||||
|
let (send, recv) = session.open_bi().await.context("failed to oen bidi stream")?;
|
||||||
|
|
||||||
|
let mut send_control = message::Sender::new(send);
|
||||||
|
let mut recv_control = message::Receiver::new(recv);
|
||||||
|
|
||||||
|
let setup_client = setup::Client {
|
||||||
|
role,
|
||||||
|
versions: vec![setup::Version::DRAFT_00].into(),
|
||||||
|
path: "".to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
send_control
|
||||||
|
.send(message::Message::SetupClient(setup_client))
|
||||||
|
.await
|
||||||
|
.context("failed to send SETUP CLIENT")?;
|
||||||
|
|
||||||
|
let setup_server = match recv_control.recv().await.context("failed to read SETUP")? {
|
||||||
|
message::Message::SetupServer(setup) => setup,
|
||||||
|
_ => anyhow::bail!("expected SERVER SETUP"),
|
||||||
|
};
|
||||||
|
|
||||||
|
if setup_server.version != setup::Version::DRAFT_00 {
|
||||||
|
anyhow::bail!("unsupported version: {:?}", setup_server.version);
|
||||||
|
}
|
||||||
|
|
||||||
|
if !setup_server.role.compatible(role) {
|
||||||
|
anyhow::bail!("incompatible roles: {:?} {:?}", role, setup_server.role);
|
||||||
|
}
|
||||||
|
|
||||||
|
let send_objects = object::Sender::new(session.clone());
|
||||||
|
let recv_objects = object::Receiver::new(session.clone());
|
||||||
|
|
||||||
|
Ok(Session {
|
||||||
|
send_control,
|
||||||
|
recv_control,
|
||||||
|
send_objects,
|
||||||
|
recv_objects,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,11 +1,11 @@
|
||||||
use super::{Role, Versions};
|
use super::{Role, Versions};
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
|
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError};
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
// Sent by the client to setup up the session.
|
// Sent by the client to setup up the session.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct SetupClient {
|
pub struct Client {
|
||||||
// NOTE: This is not a message type, but rather the control stream header.
|
// NOTE: This is not a message type, but rather the control stream header.
|
||||||
// Proposal: https://github.com/moq-wg/moq-transport/issues/138
|
// Proposal: https://github.com/moq-wg/moq-transport/issues/138
|
||||||
|
|
||||||
|
@ -20,21 +20,19 @@ pub struct SetupClient {
|
||||||
pub path: String,
|
pub path: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Decode for SetupClient {
|
impl Client {
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
let versions = Versions::decode(r)?;
|
let versions = Versions::decode(r).await?;
|
||||||
let role = Role::decode(r)?;
|
let role = Role::decode(r).await?;
|
||||||
let path = String::decode(r)?;
|
let path = decode_string(r).await?;
|
||||||
|
|
||||||
Ok(Self { versions, role, path })
|
Ok(Self { versions, role, path })
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for SetupClient {
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
self.versions.encode(w).await?;
|
||||||
self.versions.encode(w)?;
|
self.role.encode(w).await?;
|
||||||
self.role.encode(w)?;
|
encode_string(&self.path, w).await?;
|
||||||
self.path.encode(w)?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
|
@ -0,0 +1,9 @@
|
||||||
|
mod client;
|
||||||
|
mod role;
|
||||||
|
mod server;
|
||||||
|
mod version;
|
||||||
|
|
||||||
|
pub use client::*;
|
||||||
|
pub use role::*;
|
||||||
|
pub use server::*;
|
||||||
|
pub use version::*;
|
|
@ -1,6 +1,6 @@
|
||||||
use bytes::{Buf, BufMut};
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
|
use crate::coding::{DecodeError, EncodeError, VarInt};
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
pub enum Role {
|
pub enum Role {
|
||||||
|
@ -52,15 +52,13 @@ impl TryFrom<VarInt> for Role {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Decode for Role {
|
impl Role {
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
let v = VarInt::decode(r)?;
|
let v = VarInt::decode(r).await?;
|
||||||
v.try_into()
|
v.try_into()
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for Role {
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
VarInt::from(*self).encode(w).await
|
||||||
VarInt::from(*self).encode(w)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,13 +1,13 @@
|
||||||
use super::{Role, Version};
|
use super::{Role, Version};
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
|
use crate::coding::{DecodeError, EncodeError};
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
// Sent by the server in response to a client.
|
// Sent by the server in response to a client.
|
||||||
// NOTE: This is not a message type, but rather the control stream header.
|
// NOTE: This is not a message type, but rather the control stream header.
|
||||||
// Proposal: https://github.com/moq-wg/moq-transport/issues/138
|
// Proposal: https://github.com/moq-wg/moq-transport/issues/138
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct SetupServer {
|
pub struct Server {
|
||||||
// The list of supported versions in preferred order.
|
// The list of supported versions in preferred order.
|
||||||
pub version: Version,
|
pub version: Version,
|
||||||
|
|
||||||
|
@ -16,19 +16,17 @@ pub struct SetupServer {
|
||||||
pub role: Role,
|
pub role: Role,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Decode for SetupServer {
|
impl Server {
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
let version = Version::decode(r)?;
|
let version = Version::decode(r).await?;
|
||||||
let role = Role::decode(r)?;
|
let role = Role::decode(r).await?;
|
||||||
|
|
||||||
Ok(Self { version, role })
|
Ok(Self { version, role })
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for SetupServer {
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
self.version.encode(w).await?;
|
||||||
self.version.encode(w)?;
|
self.role.encode(w).await?;
|
||||||
self.role.encode(w)?;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
|
@ -1,6 +1,6 @@
|
||||||
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
|
use crate::coding::{DecodeError, EncodeError, VarInt};
|
||||||
|
|
||||||
use bytes::{Buf, BufMut};
|
use webtransport_generic::{RecvStream, SendStream};
|
||||||
|
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
|
|
||||||
|
@ -23,43 +23,40 @@ impl From<Version> for VarInt {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Decode for Version {
|
impl Version {
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
let v = VarInt::decode(r)?;
|
let v = VarInt::decode(r).await?;
|
||||||
Ok(Self(v))
|
Ok(Self(v))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for Version {
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
self.0.encode(w).await?;
|
||||||
self.0.encode(w)
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||||
pub struct Versions(pub Vec<Version>);
|
pub struct Versions(pub Vec<Version>);
|
||||||
|
|
||||||
impl Decode for Versions {
|
impl Versions {
|
||||||
fn decode<R: Buf>(r: &mut R) -> Result<Self, DecodeError> {
|
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
|
||||||
let count = VarInt::decode(r)?.into_inner();
|
let count = VarInt::decode(r).await?.into_inner();
|
||||||
let mut vs = Vec::new();
|
let mut vs = Vec::new();
|
||||||
|
|
||||||
for _ in 0..count {
|
for _ in 0..count {
|
||||||
let v = Version::decode(r)?;
|
let v = Version::decode(r).await?;
|
||||||
vs.push(v);
|
vs.push(v);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Self(vs))
|
Ok(Self(vs))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl Encode for Versions {
|
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
|
||||||
fn encode<W: BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
|
|
||||||
let size: VarInt = self.0.len().try_into()?;
|
let size: VarInt = self.0.len().try_into()?;
|
||||||
size.encode(w)?;
|
size.encode(w).await?;
|
||||||
|
|
||||||
for v in &self.0 {
|
for v in &self.0 {
|
||||||
v.encode(w)?;
|
v.encode(w).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
|
@ -16,15 +16,9 @@ categories = ["multimedia", "network-programming", "web-programming"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
moq-transport = { path = "../moq-transport" }
|
moq-transport = { path = "../moq-transport" }
|
||||||
moq-transport-quinn = { path = "../moq-transport-quinn" }
|
webtransport-generic = "0.5"
|
||||||
|
|
||||||
tokio = "1.27"
|
tokio = "1.27"
|
||||||
anyhow = "1.0.70"
|
anyhow = "1.0.70"
|
||||||
log = "0.4" # TODO remove
|
log = "0.4" # TODO remove
|
||||||
bytes = "1.4"
|
bytes = "1.4"
|
||||||
|
|
||||||
# QUIC stuff
|
|
||||||
quinn = "0.10"
|
|
||||||
ring = "0.16.20"
|
|
||||||
rustls = "0.21.2"
|
|
||||||
rustls-pemfile = "1.0.2"
|
|
||||||
|
|
|
@ -2,4 +2,4 @@ pub mod broadcast;
|
||||||
pub mod fragment;
|
pub mod fragment;
|
||||||
pub mod segment;
|
pub mod segment;
|
||||||
pub mod track;
|
pub mod track;
|
||||||
pub(crate) mod watch;
|
pub mod watch;
|
||||||
|
|
|
@ -12,7 +12,7 @@ pub struct Info {
|
||||||
pub sequence: VarInt,
|
pub sequence: VarInt,
|
||||||
|
|
||||||
// The priority of the segment within the BROADCAST.
|
// The priority of the segment within the BROADCAST.
|
||||||
pub send_order: VarInt,
|
pub send_order: i32,
|
||||||
|
|
||||||
// The time at which the segment expires for cache purposes.
|
// The time at which the segment expires for cache purposes.
|
||||||
pub expires: Option<time::Instant>,
|
pub expires: Option<time::Instant>,
|
||||||
|
|
|
@ -7,20 +7,20 @@ use std::sync::{Arc, Mutex};
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone, Default)]
|
||||||
pub struct Broadcasts {
|
pub struct Broker {
|
||||||
// Operate on the inner struct so we can share/clone the outer struct.
|
// Operate on the inner struct so we can share/clone the outer struct.
|
||||||
inner: Arc<Mutex<BroadcastsInner>>,
|
inner: Arc<Mutex<BrokerInner>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
struct BroadcastsInner {
|
struct BrokerInner {
|
||||||
// TODO Automatically reclaim dropped sources.
|
// TODO Automatically reclaim dropped sources.
|
||||||
lookup: HashMap<String, Arc<contribute::Broadcast>>,
|
lookup: HashMap<String, Arc<contribute::Broadcast>>,
|
||||||
updates: watch::Publisher<Update>,
|
updates: watch::Publisher<BrokerUpdate>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub enum Update {
|
pub enum BrokerUpdate {
|
||||||
// Broadcast was announced
|
// Broadcast was announced
|
||||||
Insert(String), // TODO include source?
|
Insert(String), // TODO include source?
|
||||||
|
|
||||||
|
@ -28,13 +28,13 @@ pub enum Update {
|
||||||
Remove(String, broadcast::Error),
|
Remove(String, broadcast::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Broadcasts {
|
impl Broker {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Default::default()
|
Default::default()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return the list of available broadcasts, and a subscriber that will return updates (add/remove).
|
// Return the list of available broadcasts, and a subscriber that will return updates (add/remove).
|
||||||
pub fn available(&self) -> (Vec<String>, watch::Subscriber<Update>) {
|
pub fn available(&self) -> (Vec<String>, watch::Subscriber<BrokerUpdate>) {
|
||||||
// Grab the lock.
|
// Grab the lock.
|
||||||
let this = self.inner.lock().unwrap();
|
let this = self.inner.lock().unwrap();
|
||||||
|
|
||||||
|
@ -55,7 +55,7 @@ impl Broadcasts {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.lookup.insert(namespace.to_string(), source);
|
this.lookup.insert(namespace.to_string(), source);
|
||||||
this.updates.push(Update::Insert(namespace.to_string()));
|
this.updates.push(BrokerUpdate::Insert(namespace.to_string()));
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -64,7 +64,7 @@ impl Broadcasts {
|
||||||
let mut this = self.inner.lock().unwrap();
|
let mut this = self.inner.lock().unwrap();
|
||||||
|
|
||||||
this.lookup.remove(namespace).context("namespace was not published")?;
|
this.lookup.remove(namespace).context("namespace was not published")?;
|
||||||
this.updates.push(Update::Remove(namespace.to_string(), error));
|
this.updates.push(BrokerUpdate::Remove(namespace.to_string(), error));
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,26 +6,30 @@ use tokio::io::AsyncReadExt;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::task::JoinSet; // lock across await boundaries
|
use tokio::task::JoinSet; // lock across await boundaries
|
||||||
|
|
||||||
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt};
|
use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk};
|
||||||
use moq_transport_quinn::{RecvObjects, RecvStream};
|
use moq_transport::{object, VarInt};
|
||||||
|
use webtransport_generic::Session as WTSession;
|
||||||
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
|
|
||||||
use super::{broker, control};
|
|
||||||
use crate::model::{broadcast, segment, track};
|
use crate::model::{broadcast, segment, track};
|
||||||
|
use crate::relay::{
|
||||||
|
message::{Component, Contribute},
|
||||||
|
Broker,
|
||||||
|
};
|
||||||
|
|
||||||
// TODO experiment with making this Clone, so every task can have its own copy.
|
// TODO experiment with making this Clone, so every task can have its own copy.
|
||||||
pub struct Session {
|
pub struct Session<S: WTSession> {
|
||||||
// Used to receive objects.
|
// Used to receive objects.
|
||||||
objects: RecvObjects,
|
objects: object::Receiver<S>,
|
||||||
|
|
||||||
// Used to send and receive control messages.
|
// Used to send and receive control messages.
|
||||||
control: control::Component<control::Contribute>,
|
control: Component<Contribute>,
|
||||||
|
|
||||||
// Globally announced namespaces, which we can add ourselves to.
|
// Globally announced namespaces, which we can add ourselves to.
|
||||||
broker: broker::Broadcasts,
|
broker: Broker,
|
||||||
|
|
||||||
// The names of active broadcasts being produced.
|
// The names of active broadcasts being produced.
|
||||||
broadcasts: HashMap<String, Arc<Broadcast>>,
|
broadcasts: HashMap<String, Arc<Broadcast>>,
|
||||||
|
@ -37,12 +41,8 @@ pub struct Session {
|
||||||
run_segments: JoinSet<anyhow::Result<()>>, // receiving objects
|
run_segments: JoinSet<anyhow::Result<()>>, // receiving objects
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Session {
|
impl<S: WTSession> Session<S> {
|
||||||
pub fn new(
|
pub fn new(objects: object::Receiver<S>, control: Component<Contribute>, broker: Broker) -> Self {
|
||||||
objects: RecvObjects,
|
|
||||||
control: control::Component<control::Contribute>,
|
|
||||||
broker: broker::Broadcasts,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
Self {
|
||||||
objects,
|
objects,
|
||||||
control,
|
control,
|
||||||
|
@ -81,23 +81,23 @@ impl Session {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_message(&mut self, msg: control::Contribute) -> anyhow::Result<()> {
|
async fn receive_message(&mut self, msg: Contribute) -> anyhow::Result<()> {
|
||||||
match msg {
|
match msg {
|
||||||
control::Contribute::Announce(msg) => self.receive_announce(msg).await,
|
Contribute::Announce(msg) => self.receive_announce(msg).await,
|
||||||
control::Contribute::SubscribeOk(msg) => self.receive_subscribe_ok(msg),
|
Contribute::SubscribeOk(msg) => self.receive_subscribe_ok(msg),
|
||||||
control::Contribute::SubscribeError(msg) => self.receive_subscribe_error(msg),
|
Contribute::SubscribeError(msg) => self.receive_subscribe_error(msg),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_object(&mut self, object: Object, stream: RecvStream) -> anyhow::Result<()> {
|
async fn receive_object(&mut self, header: object::Header, stream: S::RecvStream) -> anyhow::Result<()> {
|
||||||
let track = object.track;
|
let track = header.track;
|
||||||
|
|
||||||
// Keep objects in memory for 10s
|
// Keep objects in memory for 10s
|
||||||
let expires = time::Instant::now() + time::Duration::from_secs(10);
|
let expires = time::Instant::now() + time::Duration::from_secs(10);
|
||||||
|
|
||||||
let segment = segment::Info {
|
let segment = segment::Info {
|
||||||
sequence: object.sequence,
|
sequence: header.sequence,
|
||||||
send_order: object.send_order,
|
send_order: header.send_order,
|
||||||
expires: Some(expires),
|
expires: Some(expires),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -115,19 +115,16 @@ impl Session {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> {
|
async fn run_segment(mut segment: segment::Publisher, mut stream: S::RecvStream) -> anyhow::Result<()> {
|
||||||
let mut buf = BytesMut::new();
|
let mut buf = BytesMut::new();
|
||||||
|
|
||||||
loop {
|
while stream.read_buf(&mut buf).await? > 0 {
|
||||||
let size = stream.read_buf(&mut buf).await?;
|
|
||||||
if size == 0 {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Split off the data we read into the buffer, freezing it so multiple threads can read simitaniously.
|
// Split off the data we read into the buffer, freezing it so multiple threads can read simitaniously.
|
||||||
let data = buf.split().freeze();
|
let data = buf.split().freeze();
|
||||||
segment.fragments.push(data);
|
segment.fragments.push(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_announce(&mut self, msg: Announce) -> anyhow::Result<()> {
|
async fn receive_announce(&mut self, msg: Announce) -> anyhow::Result<()> {
|
||||||
|
@ -180,7 +177,7 @@ impl Session {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Session {
|
impl<S: WTSession> Drop for Session<S> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// Unannounce all broadcasts we have announced.
|
// Unannounce all broadcasts we have announced.
|
||||||
// TODO make this automatic so we can't screw up?
|
// TODO make this automatic so we can't screw up?
|
||||||
|
|
|
@ -1,33 +1,34 @@
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
|
|
||||||
use tokio::{io::AsyncWriteExt, task::JoinSet}; // allows locking across await
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::task::JoinSet; // allows locking across await
|
||||||
|
|
||||||
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt};
|
use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk};
|
||||||
use moq_transport_quinn::SendObjects;
|
use moq_transport::{object, VarInt};
|
||||||
|
use webtransport_generic::Session as WTSession;
|
||||||
|
|
||||||
use super::{broker, control};
|
|
||||||
use crate::model::{segment, track};
|
use crate::model::{segment, track};
|
||||||
|
use crate::relay::{
|
||||||
|
message::{Component, Distribute},
|
||||||
|
Broker, BrokerUpdate,
|
||||||
|
};
|
||||||
|
|
||||||
pub struct Session {
|
pub struct Session<S: WTSession> {
|
||||||
// Objects are sent to the client
|
// Objects are sent to the client
|
||||||
objects: SendObjects,
|
objects: object::Sender<S>,
|
||||||
|
|
||||||
// Used to send and receive control messages.
|
// Used to send and receive control messages.
|
||||||
control: control::Component<control::Distribute>,
|
control: Component<Distribute>,
|
||||||
|
|
||||||
// Globally announced namespaces, which can be subscribed to.
|
// Globally announced namespaces, which can be subscribed to.
|
||||||
broker: broker::Broadcasts,
|
broker: Broker,
|
||||||
|
|
||||||
// A list of tasks that are currently running.
|
// A list of tasks that are currently running.
|
||||||
run_subscribes: JoinSet<SubscribeError>, // run subscriptions, sending the returned error if they fail
|
run_subscribes: JoinSet<SubscribeError>, // run subscriptions, sending the returned error if they fail
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Session {
|
impl<S: WTSession> Session<S> {
|
||||||
pub fn new(
|
pub fn new(objects: object::Sender<S>, control: Component<Distribute>, broker: Broker) -> Self {
|
||||||
objects: SendObjects,
|
|
||||||
control: control::Component<control::Distribute>,
|
|
||||||
broker: broker::Broadcasts,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
Self {
|
||||||
objects,
|
objects,
|
||||||
control,
|
control,
|
||||||
|
@ -40,7 +41,7 @@ impl Session {
|
||||||
// Announce all available tracks and get a stream of updates.
|
// Announce all available tracks and get a stream of updates.
|
||||||
let (available, mut updates) = self.broker.available();
|
let (available, mut updates) = self.broker.available();
|
||||||
for namespace in available {
|
for namespace in available {
|
||||||
self.on_available(broker::Update::Insert(namespace)).await?;
|
self.on_available(BrokerUpdate::Insert(namespace)).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -61,11 +62,11 @@ impl Session {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_message(&mut self, msg: control::Distribute) -> anyhow::Result<()> {
|
async fn receive_message(&mut self, msg: Distribute) -> anyhow::Result<()> {
|
||||||
match msg {
|
match msg {
|
||||||
control::Distribute::AnnounceOk(msg) => self.receive_announce_ok(msg),
|
Distribute::AnnounceOk(msg) => self.receive_announce_ok(msg),
|
||||||
control::Distribute::AnnounceError(msg) => self.receive_announce_error(msg),
|
Distribute::AnnounceError(msg) => self.receive_announce_error(msg),
|
||||||
control::Distribute::Subscribe(msg) => self.receive_subscribe(msg).await,
|
Distribute::Subscribe(msg) => self.receive_subscribe(msg).await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,7 +120,11 @@ impl Session {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_subscribe(objects: SendObjects, track_id: VarInt, mut track: track::Subscriber) -> SubscribeError {
|
async fn run_subscribe(
|
||||||
|
objects: object::Sender<S>,
|
||||||
|
track_id: VarInt,
|
||||||
|
mut track: track::Subscriber,
|
||||||
|
) -> SubscribeError {
|
||||||
let mut tasks = JoinSet::new();
|
let mut tasks = JoinSet::new();
|
||||||
let mut result = None;
|
let mut result = None;
|
||||||
|
|
||||||
|
@ -154,11 +159,11 @@ impl Session {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn serve_group(
|
async fn serve_group(
|
||||||
mut objects: SendObjects,
|
mut objects: object::Sender<S>,
|
||||||
track_id: VarInt,
|
track_id: VarInt,
|
||||||
mut segment: segment::Subscriber,
|
mut segment: segment::Subscriber,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let object = Object {
|
let object = object::Header {
|
||||||
track: track_id,
|
track: track_id,
|
||||||
group: segment.sequence,
|
group: segment.sequence,
|
||||||
sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object
|
sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object
|
||||||
|
@ -168,8 +173,8 @@ impl Session {
|
||||||
let mut stream = objects.open(object).await?;
|
let mut stream = objects.open(object).await?;
|
||||||
|
|
||||||
// Write each fragment as they are available.
|
// Write each fragment as they are available.
|
||||||
while let Some(mut fragment) = segment.fragments.next().await {
|
while let Some(fragment) = segment.fragments.next().await {
|
||||||
stream.write_all_buf(&mut fragment).await?;
|
stream.write_all(&fragment).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: stream is automatically closed when dropped
|
// NOTE: stream is automatically closed when dropped
|
||||||
|
@ -177,16 +182,16 @@ impl Session {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn on_available(&mut self, delta: broker::Update) -> anyhow::Result<()> {
|
async fn on_available(&mut self, delta: BrokerUpdate) -> anyhow::Result<()> {
|
||||||
match delta {
|
match delta {
|
||||||
broker::Update::Insert(name) => {
|
BrokerUpdate::Insert(name) => {
|
||||||
self.control
|
self.control
|
||||||
.send(Announce {
|
.send(Announce {
|
||||||
track_namespace: name.clone(),
|
track_namespace: name.clone(),
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
broker::Update::Remove(name, error) => {
|
BrokerUpdate::Remove(name, error) => {
|
||||||
self.control
|
self.control
|
||||||
.send(AnnounceError {
|
.send(AnnounceError {
|
||||||
track_namespace: name,
|
track_namespace: name,
|
||||||
|
|
|
@ -1,11 +1,13 @@
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk};
|
use moq_transport::message::{
|
||||||
use moq_transport_quinn::{RecvControl, SendControl};
|
self, Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk,
|
||||||
|
};
|
||||||
|
use webtransport_generic::Session;
|
||||||
|
|
||||||
pub struct Main {
|
pub struct Main<S: Session> {
|
||||||
send_control: SendControl,
|
send_control: message::Sender<S::SendStream>,
|
||||||
recv_control: RecvControl,
|
recv_control: message::Receiver<S::RecvStream>,
|
||||||
|
|
||||||
outgoing: mpsc::Receiver<Message>,
|
outgoing: mpsc::Receiver<Message>,
|
||||||
|
|
||||||
|
@ -13,7 +15,7 @@ pub struct Main {
|
||||||
distribute: mpsc::Sender<Distribute>,
|
distribute: mpsc::Sender<Distribute>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Main {
|
impl<S: Session> Main<S> {
|
||||||
pub async fn run(mut self) -> anyhow::Result<()> {
|
pub async fn run(mut self) -> anyhow::Result<()> {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
@ -53,10 +55,10 @@ impl<T> Component<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Splits a control stream into two components, based on if it's a message for contribution or distribution.
|
// Splits a control stream into two components, based on if it's a message for contribution or distribution.
|
||||||
pub fn split(
|
pub fn split<S: Session>(
|
||||||
send_control: SendControl,
|
send_control: message::Sender<S::SendStream>,
|
||||||
recv_control: RecvControl,
|
recv_control: message::Receiver<S::RecvStream>,
|
||||||
) -> (Main, Component<Contribute>, Component<Distribute>) {
|
) -> (Main<S>, Component<Contribute>, Component<Distribute>) {
|
||||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(1);
|
let (outgoing_tx, outgoing_rx) = mpsc::channel(1);
|
||||||
let (contribute_tx, contribute_rx) = mpsc::channel(1);
|
let (contribute_tx, contribute_rx) = mpsc::channel(1);
|
||||||
let (distribute_tx, distribute_rx) = mpsc::channel(1);
|
let (distribute_tx, distribute_rx) = mpsc::channel(1);
|
|
@ -1,8 +1,8 @@
|
||||||
pub mod broker;
|
mod broker;
|
||||||
|
|
||||||
mod contribute;
|
mod contribute;
|
||||||
mod control;
|
|
||||||
mod distribute;
|
mod distribute;
|
||||||
|
mod message;
|
||||||
mod session;
|
mod session;
|
||||||
|
|
||||||
|
pub use broker::*;
|
||||||
pub use session::*;
|
pub use session::*;
|
||||||
|
|
|
@ -1,17 +1,19 @@
|
||||||
use super::{broker, contribute, control, distribute};
|
use crate::relay::{contribute, distribute, message, Broker};
|
||||||
|
|
||||||
pub struct Session {
|
use webtransport_generic::Session as WTSession;
|
||||||
|
|
||||||
|
pub struct Session<S: WTSession> {
|
||||||
// Split logic into contribution/distribution to reduce the problem space.
|
// Split logic into contribution/distribution to reduce the problem space.
|
||||||
contribute: contribute::Session,
|
contribute: contribute::Session<S>,
|
||||||
distribute: distribute::Session,
|
distribute: distribute::Session<S>,
|
||||||
|
|
||||||
// Used to receive control messages and forward to contribute/distribute.
|
// Used to receive control messages and forward to contribute/distribute.
|
||||||
control: control::Main,
|
control: message::Main<S>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Session {
|
impl<S: WTSession> Session<S> {
|
||||||
pub fn new(session: moq_transport_quinn::Session, broker: broker::Broadcasts) -> Session {
|
pub fn new(session: moq_transport::Session<S>, broker: Broker) -> Self {
|
||||||
let (control, contribute, distribute) = control::split(session.send_control, session.recv_control);
|
let (control, contribute, distribute) = message::split(session.send_control, session.recv_control);
|
||||||
|
|
||||||
let contribute = contribute::Session::new(session.recv_objects, contribute, broker.clone());
|
let contribute = contribute::Session::new(session.recv_objects, contribute, broker.clone());
|
||||||
let distribute = distribute::Session::new(session.send_objects, distribute, broker);
|
let distribute = distribute::Session::new(session.send_objects, distribute, broker);
|
||||||
|
|
Loading…
Reference in New Issue