add generic moq transport trait

This commit is contained in:
François Michel 2023-07-12 11:38:55 +00:00
parent 6ca7ab5124
commit c360ea1416
14 changed files with 668 additions and 86 deletions

View File

@ -5,4 +5,5 @@ members = [
"moq-demo", "moq-demo",
"moq-warp", "moq-warp",
"transport", "transport",
"moq-transport-trait",
] ]

View File

@ -27,7 +27,7 @@ rustls = "0.21.2"
rustls-pemfile = "1.0.2" rustls-pemfile = "1.0.2"
# Async stuff # Async stuff
tokio = { version = "1.27", features = ["full"] } tokio = { version = "1.29.1", features = ["full"] }
# Web server to serve the fingerprint # Web server to serve the fingerprint
warp = { version = "0.3.3", features = ["tls"] } warp = { version = "0.3.3", features = ["tls"] }
@ -38,3 +38,9 @@ clap = { version = "4.0", features = [ "derive" ] }
log = { version = "0.4", features = ["std"] } log = { version = "0.4", features = ["std"] }
env_logger = "0.9.3" env_logger = "0.9.3"
anyhow = "1.0.70" anyhow = "1.0.70"
moq-transport-trait = { path = "../moq-transport-trait" }
# moq-generic-transport = { path = "../transport" }
moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"}
webtransport_quiche = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" }
async_webtransport = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" }

View File

@ -1,11 +1,15 @@
use std::{fs, io, net, path, sync}; use std::{fs, io, net, path, sync::{self, Arc}};
use anyhow::Context; use anyhow::Context;
use async_webtransport_handler::{AsyncWebTransportServer, regex::Regex};
use clap::Parser; use clap::Parser;
use moq_transport::{Role, SetupServer, Version};
use ring::digest::{digest, SHA256}; use ring::digest::{digest, SHA256};
use tokio::task::JoinSet;
use warp::Filter; use warp::Filter;
use moq_warp::{relay, source}; use moq_warp::{relay::{self, ServerConfig}, source};
use webtransport_quiche::quiche;
/// Search for a pattern in a file and display the lines that contain it. /// Search for a pattern in a file and display the lines that contain it.
#[derive(Parser, Clone)] #[derive(Parser, Clone)]
@ -25,6 +29,61 @@ struct Cli {
/// Use the media file at this path /// Use the media file at this path
#[arg(short, long, default_value = "media/fragmented.mp4")] #[arg(short, long, default_value = "media/fragmented.mp4")]
media: path::PathBuf, media: path::PathBuf,
/// use quiche instead of quinn
#[arg(short, long)]
quiche: bool,
}
// Create a new server
pub fn new_quiche(config: ServerConfig) -> anyhow::Result<(AsyncWebTransportServer, tokio::net::UdpSocket, Vec<Regex>)> {
let mut quic_config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
println!("loading cert {:?}, key {:?}", config.cert, config.key);
quic_config.load_cert_chain_from_pem_file(config.cert.to_str().unwrap()).unwrap();
quic_config.load_priv_key_from_pem_file(config.key.to_str().unwrap()).unwrap();
quic_config
.set_application_protos(quiche::h3::APPLICATION_PROTOCOL)
.unwrap();
quic_config.set_cc_algorithm_name("cubic").unwrap();
quic_config.set_max_idle_timeout(10000);
quic_config.set_max_recv_udp_payload_size(1200);
quic_config.set_max_send_udp_payload_size(1200);
quic_config.set_initial_max_data(1_000_000_000);
quic_config.set_initial_max_stream_data_bidi_local(100_000_000);
quic_config.set_initial_max_stream_data_bidi_remote(100_000_000);
quic_config.set_initial_max_stream_data_uni(100_000_000);
quic_config.set_initial_max_streams_bidi(1_000_000);
quic_config.set_initial_max_streams_uni(1_000_000);
quic_config.set_disable_active_migration(true);
quic_config.enable_early_data();
quic_config.grease(false);
// quic_config.set_fec_scheduler_algorithm(quiche::FECSchedulerAlgorithm::BurstsOnly);
// quic_config.send_fec(args.get_bool("--send-fec"));
// quic_config.receive_fec(args.get_bool("--receive-fec"));
// quic_config.set_real_time(args.get_bool("--real-time-cc"));
let h3_config = quiche::h3::Config::new().unwrap();
let keylog = if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(keylog_path)
.unwrap();
Some(file)
} else {
None
};
let (server, socket) = AsyncWebTransportServer::with_configs(config.addr,
quic_config, h3_config, keylog)?;
let uri_root = "/";
let regexes = [Regex::new(format!("{}", uri_root).as_str()).unwrap()];
Ok((server, socket, regexes.to_vec()))
} }
#[tokio::main] #[tokio::main]
@ -35,6 +94,10 @@ async fn main() -> anyhow::Result<()> {
// Create a web server to serve the fingerprint // Create a web server to serve the fingerprint
let serve = serve_http(args.clone()); let serve = serve_http(args.clone());
let mut tasks = JoinSet::new();
tasks.spawn(async move {
serve.await.unwrap();
});
// Create a fake media source from disk. // Create a fake media source from disk.
let media = source::File::new(args.media).context("failed to open file source")?; let media = source::File::new(args.media).context("failed to open file source")?;
@ -44,22 +107,105 @@ async fn main() -> anyhow::Result<()> {
.announce("quic.video/demo", media.source()) .announce("quic.video/demo", media.source())
.context("failed to announce file source")?; .context("failed to announce file source")?;
let mut tasks = JoinSet::new();
tasks.spawn(async move {
media.run().await.unwrap();
});
// Create a server to actually serve the media // Create a server to actually serve the media
let config = relay::ServerConfig { let config = relay::ServerConfig {
addr: args.addr, addr: args.addr,
cert: args.cert, cert: args.cert,
key: args.key, key: args.key,
broker, broker: broker.clone(),
}; };
let server = relay::Server::new(config).context("failed to create server")?; if args.quiche {
let (server, socket, regexes) = new_quiche(config).unwrap();
let server = Arc::new(std::sync::Mutex::new(server));
let socket = Arc::new(socket);
let mut buf = vec![0; 10000];
let mut tasks = JoinSet::new();
'mainloop: loop {
println!("listen...");
let cid = {
// let mut server = endpoint.quiche_server.lock().await;
let ret = async_webtransport_handler::AsyncWebTransportServer::listen_ref(server.clone(), socket.clone(), &mut buf).await?;
println!("listen returned {:?}", ret);
match ret {
Some(cid) => cid,
None => continue 'mainloop,
}
};
loop {
println!("poll");
match server.lock().unwrap().poll(&cid, &regexes[..]) {
Ok(async_webtransport_handler::Event::NewSession(path, session_id, _regex_index)) => {
let server = server.clone();
let cid = cid.clone();
let broker = broker.clone();
tasks.spawn(async move {
// let control_stream = async_webtransport_handler::ServerBidiStream::new(server.clone(), cid.clone(), session_id, session_id);
let mut webtransport_session = async_webtransport_handler::WebTransportSession::new(server.clone(), cid.clone(), session_id);
let control_stream = moq_generic_transport::accept_bidi(&mut webtransport_session).await.unwrap().unwrap();
// let control_stream = async_webtransport_handler::ServerBidiStream::new(server.clone(), cid.clone(), session_id, control_stream_id);
// let session = moq_transport_trait::Session::new(Box::new(control_stream), Box::new(webtransport_session));
let received_client_setup = moq_transport_trait::Session::accept(Box::new(control_stream), Box::new(webtransport_session)).await.unwrap();
// TODO: maybe reject setup
let role = match received_client_setup.setup().role {
Role::Publisher => Role::Subscriber,
Role::Subscriber => Role::Publisher,
Role::Both => Role::Both,
};
let setup_server = SetupServer {
version: Version::DRAFT_00,
role,
};
let session = received_client_setup.accept(setup_server).await.unwrap();
let session = relay::Session::from_session(session, broker.clone()).await.unwrap();
session.run().await
});
},
Ok(async_webtransport_handler::Event::StreamData(session_id, stream_id)) => {
log::trace!("new data!");
},
Ok(async_webtransport_handler::Event::Done) => {
println!("H3 Done");
break;
},
Ok(async_webtransport_handler::Event::GoAway) => {
println!("GOAWAY");
break;
},
Err(_) => todo!(),
}
}
}
// let session = moq_transport_trait::Session::new(control_stream, connection)
// let server = relay::Server::new(config).context("failed to create server")?;
// // Run all of the above
// tokio::select! {
// res = server.run() => res.context("failed to run server"),
// res = media.run() => res.context("failed to run media source"),
// res = serve => res.context("failed to run HTTP server"),
// }
} else {
// let server = relay::Server::new(config).context("failed to create server")?;
// Run all of the above // Run all of the above
tokio::select! { // tokio::select! {
res = server.run() => res.context("failed to run server"), // res = server.run() => res.context("failed to run server"),
res = media.run() => res.context("failed to run media source"), // res = media.run() => res.context("failed to run media source"),
res = serve => res.context("failed to run HTTP server"), // res = serve => res.context("failed to run HTTP server"),
// }
} }
Ok(())
} }
// Run a HTTP server using Warp // Run a HTTP server using Warp

View File

@ -0,0 +1,28 @@
[package]
name = "moq-transport-trait"
description = "Media over QUIC"
authors = [ "Luke Curley" ]
repository = "https://github.com/kixelated/moq-rs"
license = "MIT OR Apache-2.0"
version = "0.1.0"
edition = "2021"
keywords = [ "quic", "http3", "webtransport", "media", "live" ]
categories = [ "multimedia", "network-programming", "web-programming" ]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
moq-transport = { path = "../moq-transport" }
# WebTransport support: TODO pin a version when released
moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"}
http = "0.2"
tokio = { version = "1.27", features = ["macros", "sync"] }
bytes = "1"
log = "0.4"
anyhow = "1.0.70"
thiserror = "1.0.21"

View File

@ -0,0 +1,124 @@
use moq_generic_transport::{SendStream, RecvStream, BidiStream, SendStreamUnframed};
use moq_transport::{Decode, DecodeError, Encode, Message};
use bytes::{Buf, BytesMut};
use std::io::Cursor;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::Mutex;
pub struct Control<S: SendStream + SendStreamUnframed, B: BidiStream<SendStream = S>> {
sender: ControlSend<B::SendStream>,
recver: ControlRecv<B::RecvStream>,
}
impl<S: SendStream + SendStreamUnframed, B: BidiStream<SendStream = S>> Control<S, B> {
pub(crate) fn new(stream: Box<B>) -> Self {
let (sender, recver) = stream.split();
let sender = ControlSend::new(Box::new(sender));
let recver = ControlRecv::new(Box::new(recver));
Self { sender, recver }
}
pub fn split(self) -> (ControlSend<B::SendStream>, ControlRecv<B::RecvStream>) {
(self.sender, self.recver)
}
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
self.sender.send(msg).await
}
pub async fn recv(&mut self) -> anyhow::Result<Message> {
self.recver.recv().await
}
}
pub struct ControlSend<S> {
stream: Box<S>,
buf: BytesMut, // reuse a buffer to encode messages.
}
impl<S: SendStream + SendStreamUnframed> ControlSend<S> {
pub fn new(inner: Box<S>) -> Self {
Self {
buf: BytesMut::new(),
stream: inner,
}
}
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
let msg = msg.into();
log::info!("sending message: {:?}", msg);
self.buf.clear();
msg.encode(&mut self.buf)?;
// TODO make this work with select!
moq_generic_transport::send(self.stream.as_mut(), &mut self.buf).await?;
Ok(())
}
// Helper that lets multiple threads send control messages.
pub fn share(self) -> ControlShared<S> {
ControlShared {
stream: Arc::new(Mutex::new(self)),
_marker: PhantomData,
}
}
}
// Helper that allows multiple threads to send control messages.
// There's no equivalent for receiving since only one thread should be receiving at a time.
#[derive(Clone)]
pub struct ControlShared<S: SendStream + SendStreamUnframed> {
stream: Arc<Mutex<ControlSend<S>>>,
_marker: PhantomData<S>
}
impl<S: SendStream + SendStreamUnframed> ControlShared<S> {
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
let mut stream = self.stream.lock().await;
stream.send(msg).await
}
}
pub struct ControlRecv<R: RecvStream> {
stream: Box<R>,
buf: BytesMut, // data we've read but haven't fully decoded yet
}
impl<R: RecvStream> ControlRecv<R> {
pub fn new(inner: Box<R>) -> Self {
Self {
buf: BytesMut::new(),
stream: inner,
}
}
// Read the next full message from the stream.
pub async fn recv(&mut self) -> anyhow::Result<Message> {
loop {
// Read the contents of the buffer
let mut peek = Cursor::new(&self.buf);
match Message::decode(&mut peek) {
Ok(msg) => {
// We've successfully decoded a message, so we can advance the buffer.
self.buf.advance(peek.position() as usize);
log::info!("received message: {:?}", msg);
return Ok(msg);
}
Err(DecodeError::UnexpectedEnd) => {
// The decode failed, so we need to append more data.
moq_generic_transport::recv(self.stream.as_mut(), &mut self.buf).await?;
}
Err(e) => return Err(e.into()),
}
}
}
}

View File

@ -0,0 +1,11 @@
mod control;
mod object;
mod server;
use std::sync::Arc;
use std::sync::Mutex;
pub type SharedConnection<C> = Arc<Mutex<Box<C>>>;
pub use control::*;
pub use object::*;
pub use server::*;

View File

@ -0,0 +1,135 @@
use anyhow::Context;
use bytes::{Buf, BytesMut};
use moq_generic_transport::{Connection, SendStream, SendStreamUnframed, RecvStream};
use moq_transport::{Decode, DecodeError, Encode, Object};
use std::{io::Cursor, marker::PhantomData};
use crate::SharedConnection;
// TODO support clients
// We could replace this generic soup by just <C: Connection> if we forced Connection's SendStream
// to provide SendStreamUnframes's send() method. Without that, we have to make Connection's
// SendStream type more specific and force it to implement SendStreamUnframes as well.
pub struct Objects<C: Connection> {
send: SendObjects<C>,
recv: RecvObjects<C>,
}
impl<S: SendStream + SendStreamUnframed, R: RecvStream + 'static, C: Connection<SendStream = S, RecvStream = R> + Send> Objects<C> {
pub fn new(session: SharedConnection<C>) -> Self {
let send = SendObjects::new(session.clone());
let recv = RecvObjects::new(session);
Self { send, recv }
}
pub fn split(self) -> (SendObjects<C>, RecvObjects<C>) {
(self.send, self.recv)
}
pub async fn recv(&mut self) -> anyhow::Result<(Object, R)> {
self.recv.recv().await
}
pub async fn send(&mut self, header: Object) -> anyhow::Result<C::SendStream> {
self.send.send(header).await
}
}
pub struct SendObjects<C: Connection> {
session: SharedConnection<C>,
// A reusable buffer for encoding messages.
buf: BytesMut,
_marker: PhantomData<C>,
}
impl<S: SendStream + SendStreamUnframed, C: Connection<SendStream = S>> SendObjects<C> {
pub fn new(session: SharedConnection<C>) -> Self {
Self {
session,
buf: BytesMut::new(),
_marker: PhantomData,
}
}
pub async fn send(&mut self, header: Object) -> anyhow::Result<C::SendStream> {
self.buf.clear();
header.encode(&mut self.buf).unwrap();
// TODO support select! without making a new stream.
let mut stream = moq_generic_transport::open_send_shared(self.session.clone())
.await
.context("failed to open uni stream")?;
moq_generic_transport::send(&mut stream, &mut self.buf).await?;
Ok(stream)
}
}
impl<S: SendStream + SendStreamUnframed, C: Connection<SendStream = S>> Clone for SendObjects<C> {
fn clone(&self) -> Self {
Self {
session: self.session.clone(),
buf: BytesMut::new(),
_marker: PhantomData,
}
}
}
// Not clone, so we don't accidentally have two listners.
pub struct RecvObjects<C: Connection> {
session: SharedConnection<C>,
// A uni stream that's been accepted but not fully read from yet.
stream: Option<Box<C::RecvStream>>,
// Data that we've read but haven't formed a full message yet.
buf: BytesMut,
}
impl<R: RecvStream + 'static, C: Connection<RecvStream = R>> RecvObjects<C> {
pub fn new(session: SharedConnection<C>) -> Self {
Self {
session,
stream: None,
buf: BytesMut::new(),
}
}
pub async fn recv(&mut self) -> anyhow::Result<(Object, R)> {
// Make sure any state is saved across await boundaries so this works with select!
let stream = match self.stream.as_mut() {
Some(stream) => stream,
None => {
let stream = moq_generic_transport::accept_recv_shared(self.session.clone())
.await
.context("failed to accept uni stream")?
.context("no uni stream")?;
self.stream.insert(Box::new(stream))
}
};
loop {
// Read the contents of the buffer
let mut peek = Cursor::new(&self.buf);
match Object::decode(&mut peek) {
Ok(header) => {
let stream = self.stream.take().unwrap();
self.buf.advance(peek.position() as usize);
return Ok((header, *stream));
}
Err(DecodeError::UnexpectedEnd) => {
// The decode failed, so we need to append more data.
moq_generic_transport::recv(stream.as_mut(), &mut self.buf).await?;
}
Err(e) => return Err(e.into()),
}
}
}
}

View File

@ -0,0 +1,94 @@
use anyhow::Context;
use moq_generic_transport::{Connection, BidiStream, SendStream, SendStreamUnframed, RecvStream};
use moq_transport::{Message, SetupClient, SetupServer};
use crate::SharedConnection;
use super::{Control, Objects};
// pub struct Server<C: Connection> {
// // The Webtransport/QUIC server, with an already established session/connection.
// endpoint: Box<C>,
// }
// impl<C: Connection> Server<C> {
// pub fn new(endpoint: Box<C>) -> Self {
// let handshake = JoinSet::new();
// Self { endpoint }
// }
// // Accept the next WebTransport session.
// pub async fn accept(&mut self) -> anyhow::Result<Connect> {
// loop {
// tokio::select!(
// // Accept the connection and start the WebTransport handshake.
// conn = self.endpoint.accept() => {
// let conn = conn.context("failed to accept connection")?;
// self.handshake.spawn(async move {
// Connecting::new(conn).accept().await
// });
// },
// // Return any mostly finished WebTransport handshakes.
// res = self.handshake.join_next(), if !self.handshake.is_empty() => {
// let res = res.expect("no tasks").expect("task aborted");
// match res {
// Ok(session) => return Ok(session),
// Err(err) => log::warn!("failed to accept session: {:?}", err),
// }
// },
// )
// }
// }
// }
pub struct Session<S: SendStream + SendStreamUnframed, R: RecvStream + Send, B: BidiStream<SendStream = S, RecvStream = R>, C: Connection<SendStream = S, RecvStream = R, BidiStream = B> + Send> {
pub control: Control<S, C::BidiStream>,
pub objects: Objects<C>,
}
impl<S: SendStream + SendStreamUnframed, B: BidiStream<SendStream = S, RecvStream = R>, R: RecvStream + Send + 'static, C: Connection<SendStream = S, RecvStream = R, BidiStream = B> + Send> Session<S, R, B, C> {
pub async fn accept(control_stream: Box<C::BidiStream>, connection: Box<C>) -> anyhow::Result<AcceptSetup<S, R, B, C>> {
let mut control = Control::new(control_stream);
let objects = Objects::new(std::sync::Arc::new(std::sync::Mutex::new(connection)));
let setup_client = match control.recv().await.context("failed to read SETUP")? {
Message::SetupClient(setup) => setup,
_ => anyhow::bail!("expected CLIENT SETUP"),
};
Ok(AcceptSetup { setup_client, control, objects })
}
pub fn split(self) -> (Control<B::SendStream, C::BidiStream>, Objects<C>) {
(self.control, self.objects)
}
}
pub struct AcceptSetup<S: SendStream + SendStreamUnframed, R: RecvStream + Send, B: BidiStream<SendStream = S, RecvStream = R>, C: Connection<SendStream = S, RecvStream = R, BidiStream = B> + Send> {
setup_client: SetupClient,
control: Control<S, C::BidiStream>,
objects: Objects<C>,
}
impl<S: SendStream + SendStreamUnframed, R: RecvStream + Send, B: BidiStream<SendStream = S, RecvStream = R>, C: Connection<SendStream = S, RecvStream = R, BidiStream = B> + Send> AcceptSetup<S, R, B, C> {
// Return the setup message we received.
pub fn setup(&self) -> &SetupClient {
&self.setup_client
}
// Accept the session with our own setup message.
pub async fn accept(mut self, setup_server: SetupServer) -> anyhow::Result<Session<S, R, B, C>> {
self.control.send(setup_server).await?;
Ok(Session {
control: self.control,
objects: self.objects,
})
}
pub async fn reject(self) -> anyhow::Result<()> {
// TODO Close the QUIC connection with an error code.
Ok(())
}
}

View File

@ -17,7 +17,10 @@ categories = [ "multimedia", "network-programming", "web-programming" ]
[dependencies] [dependencies]
moq-transport = { path = "../moq-transport" } moq-transport = { path = "../moq-transport" }
moq-transport-quinn = { path = "../moq-transport-quinn" } moq-transport-quinn = { path = "../moq-transport-quinn" }
moq-transport-trait = { path = "../moq-transport-trait" }
moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"}
bytes = "1"
tokio = "1.27" tokio = "1.27"
mp4 = "0.13.0" mp4 = "0.13.0"
anyhow = "1.0.70" anyhow = "1.0.70"

View File

@ -2,12 +2,15 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time; use std::time;
use bytes::Buf;
use moq_generic_transport::{Connection, RecvStream, SendStream, SendStreamUnframed, BidiStream};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::task::JoinSet; // lock across await boundaries use tokio::task::JoinSet; // lock across await boundaries
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt};
use moq_transport_quinn::{RecvObjects, RecvStream}; use moq_transport_trait::{RecvObjects};
use anyhow::Context; use anyhow::Context;
@ -16,9 +19,9 @@ use crate::model::{broadcast, segment, track};
use crate::source::Source; use crate::source::Source;
// TODO experiment with making this Clone, so every task can have its own copy. // TODO experiment with making this Clone, so every task can have its own copy.
pub struct Session { pub struct Session<S: SendStream + SendStreamUnframed, R: RecvStream + Send, B: BidiStream<SendStream = S, RecvStream = R>, C: Connection<SendStream = S, RecvStream = R, BidiStream = B> + Send> {
// Used to receive objects. // Used to receive objects.
objects: RecvObjects, objects: RecvObjects<C>,
// Used to send and receive control messages. // Used to send and receive control messages.
control: control::Component<control::Contribute>, control: control::Component<control::Contribute>,
@ -36,9 +39,9 @@ pub struct Session {
run_segments: JoinSet<anyhow::Result<()>>, // receiving objects run_segments: JoinSet<anyhow::Result<()>>, // receiving objects
} }
impl Session { impl<Bu: Buf + Send, S: SendStream + SendStreamUnframed, R: RecvStream<Buf = Bu> + Send + 'static, B: BidiStream<SendStream = S, RecvStream = R>, C: Connection<SendStream = S, RecvStream = R, BidiStream = B> + Send> Session<S, R, B, C> {
pub fn new( pub fn new(
objects: RecvObjects, objects: RecvObjects<C>,
control: control::Component<control::Contribute>, control: control::Component<control::Contribute>,
broker: broker::Broadcasts, broker: broker::Broadcasts,
) -> Self { ) -> Self {
@ -63,7 +66,7 @@ impl Session {
}, },
object = self.objects.recv() => { object = self.objects.recv() => {
let (object, stream) = object.context("failed to receive object")?; let (object, stream) = object.context("failed to receive object")?;
let res = self.receive_object(object, stream).await; let res = self.receive_object(object, Box::new(stream)).await;
if let Err(err) = res { if let Err(err) = res {
log::error!("failed to receive object: {:?}", err); log::error!("failed to receive object: {:?}", err);
} }
@ -88,7 +91,7 @@ impl Session {
} }
} }
async fn receive_object(&mut self, object: Object, stream: RecvStream) -> anyhow::Result<()> { async fn receive_object<Buu: Buf + Send, Re: RecvStream<Buf = Buu> + Send + 'static>(&mut self, object: Object, stream: Box<Re>) -> anyhow::Result<()> {
let track = object.track; let track = object.track;
let segment = segment::Info { let segment = segment::Info {
@ -111,16 +114,18 @@ impl Session {
Ok(()) Ok(())
} }
async fn run_segment(mut segment: segment::Publisher, mut stream: RecvStream) -> anyhow::Result<()> { async fn run_segment<Buu: Buf + Send, Re: RecvStream<Buf = Buu> + Send + 'static>(mut segment: segment::Publisher, mut stream: Box<Re>) -> anyhow::Result<()> {
let mut buf = [0u8; 32 * 1024]; // let mut buf = [0u8; 32 * 1024];
loop { loop {
let size = stream.read(&mut buf).await.context("failed to read from stream")?; let mut b = bytes::BytesMut::new();
if size == 0 { let stream_finished = !moq_generic_transport::recv(stream.as_mut(), &mut b).await?;
// let size = stream.read(&mut buf).await.context("failed to read from stream")?;
if stream_finished {
return Ok(()); return Ok(());
} }
let chunk = buf[..size].to_vec(); // let chunk = buf[..size].to_vec();
segment.fragments.push(chunk.into()) segment.fragments.push(b.chunk().to_vec().into())
} }
} }
@ -174,7 +179,7 @@ impl Session {
} }
} }
impl Drop for Session { impl<S: SendStream + SendStreamUnframed, B: BidiStream<SendStream = S, RecvStream = R>, R: RecvStream + Send, C: Connection<SendStream = S, RecvStream = R, BidiStream = B> + Send> Drop for Session<S, R, B, C> {
fn drop(&mut self) { fn drop(&mut self) {
// Unannounce all broadcasts we have announced. // Unannounce all broadcasts we have announced.
// TODO make this automatic so we can't screw up? // TODO make this automatic so we can't screw up?

View File

@ -1,17 +1,18 @@
use moq_generic_transport::{SendStream, SendStreamUnframed, BidiStream};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk}; use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk};
use moq_transport_quinn::Control; use moq_transport_trait::Control;
pub struct Main { pub struct Main<S: SendStream + SendStreamUnframed, B: BidiStream<SendStream = S>> {
control: Control, control: Control<S, B>,
outgoing: mpsc::Receiver<Message>, outgoing: mpsc::Receiver<Message>,
contribute: mpsc::Sender<Contribute>, contribute: mpsc::Sender<Contribute>,
distribute: mpsc::Sender<Distribute>, distribute: mpsc::Sender<Distribute>,
} }
impl Main { impl<S: SendStream + SendStreamUnframed, B: BidiStream<SendStream = S>> Main <S, B> {
pub async fn run(mut self) -> anyhow::Result<()> { pub async fn run(mut self) -> anyhow::Result<()> {
loop { loop {
tokio::select! { tokio::select! {
@ -51,7 +52,7 @@ impl<T> Component<T> {
} }
// Splits a control stream into two components, based on if it's a message for contribution or distribution. // Splits a control stream into two components, based on if it's a message for contribution or distribution.
pub fn split(control: Control) -> (Main, Component<Contribute>, Component<Distribute>) { pub fn split<S: SendStream + SendStreamUnframed, B: BidiStream<SendStream = S>>(control: Control<S, B>) -> (Main<S, B>, Component<Contribute>, Component<Distribute>) {
let (outgoing_tx, outgoing_rx) = mpsc::channel(1); let (outgoing_tx, outgoing_rx) = mpsc::channel(1);
let (contribute_tx, contribute_rx) = mpsc::channel(1); let (contribute_tx, contribute_rx) = mpsc::channel(1);
let (distribute_tx, distribute_rx) = mpsc::channel(1); let (distribute_tx, distribute_rx) = mpsc::channel(1);

View File

@ -1,17 +1,19 @@
use anyhow::Context; use anyhow::Context;
use bytes::Buf;
use moq_generic_transport::{SendStream, SendStreamUnframed, BidiStream, Connection};
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet; // allows locking across await use tokio::task::JoinSet; // allows locking across await
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt};
use moq_transport_quinn::SendObjects; use moq_transport_trait::SendObjects;
use super::{broker, control}; use super::{broker, control};
use crate::model::{segment, track}; use crate::model::{segment, track};
pub struct Session { pub struct Session<S: SendStream + SendStreamUnframed + Send, B: BidiStream<SendStream = S>, C: Connection<SendStream = S, BidiStream = B> + Send> {
// Objects are sent to the client // Objects are sent to the client
objects: SendObjects, objects: SendObjects<C>,
// Used to send and receive control messages. // Used to send and receive control messages.
control: control::Component<control::Distribute>, control: control::Component<control::Distribute>,
@ -23,9 +25,9 @@ pub struct Session {
run_subscribes: JoinSet<SubscribeError>, // run subscriptions, sending the returned error if they fail run_subscribes: JoinSet<SubscribeError>, // run subscriptions, sending the returned error if they fail
} }
impl Session { impl<S: SendStream + SendStreamUnframed + Send, B: BidiStream<SendStream = S>, C: Connection<SendStream = S, BidiStream = B> + Send + 'static> Session<S, B, C> {
pub fn new( pub fn new(
objects: SendObjects, objects: SendObjects<C>,
control: control::Component<control::Distribute>, control: control::Component<control::Distribute>,
broker: broker::Broadcasts, broker: broker::Broadcasts,
) -> Self { ) -> Self {
@ -119,7 +121,7 @@ impl Session {
Ok(()) Ok(())
} }
async fn run_subscribe(objects: SendObjects, track_id: VarInt, mut track: track::Subscriber) -> SubscribeError { async fn run_subscribe(objects: SendObjects<C>, track_id: VarInt, mut track: track::Subscriber) -> SubscribeError {
let mut tasks = JoinSet::new(); let mut tasks = JoinSet::new();
let mut result = None; let mut result = None;
@ -154,7 +156,7 @@ impl Session {
} }
async fn serve_group( async fn serve_group(
mut objects: SendObjects, mut objects: SendObjects<C>,
track_id: VarInt, track_id: VarInt,
mut segment: segment::Subscriber, mut segment: segment::Subscriber,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -169,7 +171,11 @@ impl Session {
// Write each fragment as they are available. // Write each fragment as they are available.
while let Some(fragment) = segment.fragments.next().await { while let Some(fragment) = segment.fragments.next().await {
stream.write_all(fragment.as_slice()).await?; let mut buf = bytes::Bytes::copy_from_slice(fragment.as_slice());
while buf.has_remaining() {
moq_generic_transport::send(&mut stream, &mut buf).await?;
}
// stream.write_all(fragment.as_slice()).await?;
} }
// NOTE: stream is automatically closed when dropped // NOTE: stream is automatically closed when dropped

View File

@ -80,26 +80,26 @@ impl Server {
Ok(Self { server, broker, tasks }) Ok(Self { server, broker, tasks })
} }
pub async fn run(mut self) -> anyhow::Result<()> { // pub async fn run(mut self) -> anyhow::Result<()> {
loop { // loop {
tokio::select! { // tokio::select! {
res = self.server.accept() => { // res = self.server.accept() => {
let session = res.context("failed to accept connection")?; // let session = res.context("failed to accept connection")?;
let broker = self.broker.clone(); // let broker = self.broker.clone();
self.tasks.spawn(async move { // self.tasks.spawn(async move {
let session: Session = Session::accept(session, broker).await?; // let session: Session = Session::accept(session, broker).await?;
session.run().await // session.run().await
}); // });
}, // },
res = self.tasks.join_next(), if !self.tasks.is_empty() => { // res = self.tasks.join_next(), if !self.tasks.is_empty() => {
let res = res.expect("no tasks").expect("task aborted"); // let res = res.expect("no tasks").expect("task aborted");
if let Err(err) = res { // if let Err(err) = res {
log::error!("session terminated: {:?}", err); // log::error!("session terminated: {:?}", err);
} // }
}, // },
} // }
} // }
} // }
} }

View File

@ -1,49 +1,71 @@
use anyhow::Context; use anyhow::Context;
use moq_generic_transport::{SendStream, SendStreamUnframed, BidiStream, Connection, RecvStream};
use super::{broker, contribute, control, distribute}; use super::{broker, contribute, control, distribute};
use moq_transport::{Role, SetupServer, Version}; use moq_transport::{Role, SetupServer, Version};
use moq_transport_quinn::Connect; use moq_transport_quinn::Connect;
pub struct Session { pub struct Session<S: SendStream + SendStreamUnframed + Send, R: RecvStream + Send, B: BidiStream<SendStream = S, RecvStream = R>, C: Connection<SendStream = S, RecvStream = R, BidiStream = B> + Send> {
// Split logic into contribution/distribution to reduce the problem space. // Split logic into contribution/distribution to reduce the problem space.
contribute: contribute::Session, contribute: contribute::Session<S, R, B, C>,
distribute: distribute::Session, distribute: distribute::Session<S, B, C>,
// Used to receive control messages and forward to contribute/distribute. // Used to receive control messages and forward to contribute/distribute.
control: control::Main, control: control::Main<S, B>,
} }
impl Session { impl<S: SendStream + SendStreamUnframed + Send + 'static, B: BidiStream<SendStream = S, RecvStream = R>, R: RecvStream + Send + 'static, C: Connection<SendStream = S, RecvStream = R, BidiStream = B> + Send + 'static> Session<S, R, B, C> {
pub async fn accept(session: Connect, broker: broker::Broadcasts) -> anyhow::Result<Session> { // pub async fn accept(session: Connect, broker: broker::Broadcasts) -> anyhow::Result<Session<S, R, B, C>> {
// Accep the WebTransport session. // // Accep the WebTransport session.
// OPTIONAL validate the conn.uri() otherwise call conn.reject() // // OPTIONAL validate the conn.uri() otherwise call conn.reject()
let session = session // let session = session
.accept() // .accept()
.await // .await
.context(": server::Setupfailed to accept WebTransport session")?; // .context(": server::Setupfailed to accept WebTransport session")?;
session // session
.setup() // .setup()
.versions // .versions
.iter() // .iter()
.find(|v| **v == Version::DRAFT_00) // .find(|v| **v == Version::DRAFT_00)
.context("failed to find supported version")?; // .context("failed to find supported version")?;
// Choose our role based on the client's role. // // Choose our role based on the client's role.
let role = match session.setup().role { // let role = match session.setup().role {
Role::Publisher => Role::Subscriber, // Role::Publisher => Role::Subscriber,
Role::Subscriber => Role::Publisher, // Role::Subscriber => Role::Publisher,
Role::Both => Role::Both, // Role::Both => Role::Both,
}; // };
let setup = SetupServer { // let setup = SetupServer {
version: Version::DRAFT_00, // version: Version::DRAFT_00,
role, // role,
}; // };
let session = session.accept(setup).await?; // let session = session.accept(setup).await?;
// let (control, objects) = session.split();
// let (objects_send, objects_recv) = objects.split();
// let (control, contribute, distribute) = control::split(control);
// let contribute = contribute::Session::new(objects_recv, contribute, broker.clone());
// let distribute = distribute::Session::new(objects_send, distribute, broker);
// let session = Self {
// control,
// contribute,
// distribute,
// };
// Ok(session)
// }
pub async fn from_session(
session: moq_transport_trait::Session<S, R, B, C>,
broker: broker::Broadcasts,
) -> anyhow::Result<Session<S, R, B, C>> {
let (control, objects) = session.split(); let (control, objects) = session.split();
let (objects_send, objects_recv) = objects.split(); let (objects_send, objects_recv) = objects.split();