put generic transport behaviour in moq-transport

This commit is contained in:
François Michel 2023-07-15 11:13:40 +00:00
parent 4b9a31fa89
commit 05da131c9a
21 changed files with 203 additions and 732 deletions

View File

@ -2,9 +2,6 @@
members = [ members = [
"moq-transport", "moq-transport",
"moq-transport-quinn", "moq-transport-quinn",
"moq-demo-quiche",
"moq-demo-quinn", "moq-demo-quinn",
"moq-warp", "moq-warp",
"transport",
"moq-transport-generic",
] ]

View File

@ -1,46 +0,0 @@
[package]
name = "moq-demo-quiche"
description = "Media over QUIC"
authors = [ "Luke Curley" ]
repository = "https://github.com/kixelated/moq-rs"
license = "MIT OR Apache-2.0"
version = "0.1.0"
edition = "2021"
keywords = [ "quic", "http3", "webtransport", "media", "live" ]
categories = [ "multimedia", "network-programming", "web-programming" ]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
moq-transport = { path = "../moq-transport" }
moq-warp = { path = "../moq-warp" }
# QUIC
quinn = "0.10"
# Crypto
ring = "0.16.20"
rustls = "0.21.2"
rustls-pemfile = "1.0.2"
# Async stuff
tokio = { version = "1.29.1", features = ["full"] }
# Web server to serve the fingerprint
warp = { version = "0.3.3", features = ["tls"] }
hex = "0.4.3"
# Logging
clap = { version = "4.0", features = [ "derive" ] }
log = { version = "0.4", features = ["std"] }
env_logger = "0.9.3"
anyhow = "1.0.70"
moq-transport-generic = { path = "../moq-transport-generic" }
# moq-generic-transport = { path = "../transport" }
moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"}
webtransport_quiche = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" }
async_webtransport = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" }

View File

@ -1,218 +0,0 @@
use std::{fs, io, net, path, sync::{self, Arc}};
use anyhow::Context;
use async_webtransport_handler::{AsyncWebTransportServer, regex::Regex};
use clap::Parser;
use moq_transport::{Role, SetupServer, Version};
use ring::digest::{digest, SHA256};
use tokio::task::JoinSet;
use warp::Filter;
use moq_warp::{relay::{self, ServerConfig}, source};
use webtransport_quiche::quiche;
/// Search for a pattern in a file and display the lines that contain it.
#[derive(Parser, Clone)]
struct Cli {
/// Listen on this address
#[arg(short, long, default_value = "[::]:4443")]
addr: net::SocketAddr,
/// Use the certificate file at this path
#[arg(short, long, default_value = "cert/localhost.crt")]
cert: path::PathBuf,
/// Use the private key at this path
#[arg(short, long, default_value = "cert/localhost.key")]
key: path::PathBuf,
/// Use the media file at this path
#[arg(short, long, default_value = "media/fragmented.mp4")]
media: path::PathBuf,
/// use quiche instead of quinn
#[arg(short, long)]
quiche: bool,
}
// Create a new server
pub fn new_quiche(config: ServerConfig) -> anyhow::Result<(AsyncWebTransportServer, tokio::net::UdpSocket, Vec<Regex>)> {
let mut quic_config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
println!("loading cert {:?}, key {:?}", config.cert, config.key);
quic_config.load_cert_chain_from_pem_file(config.cert.to_str().unwrap()).unwrap();
quic_config.load_priv_key_from_pem_file(config.key.to_str().unwrap()).unwrap();
quic_config
.set_application_protos(quiche::h3::APPLICATION_PROTOCOL)
.unwrap();
quic_config.set_cc_algorithm_name("cubic").unwrap();
quic_config.set_max_idle_timeout(10000);
quic_config.set_max_recv_udp_payload_size(1200);
quic_config.set_max_send_udp_payload_size(1200);
quic_config.set_initial_max_data(1_000_000_000);
quic_config.set_initial_max_stream_data_bidi_local(100_000_000);
quic_config.set_initial_max_stream_data_bidi_remote(100_000_000);
quic_config.set_initial_max_stream_data_uni(100_000_000);
quic_config.set_initial_max_streams_bidi(1_000_000);
quic_config.set_initial_max_streams_uni(1_000_000);
quic_config.set_disable_active_migration(true);
quic_config.enable_early_data();
quic_config.grease(false);
// quic_config.set_fec_scheduler_algorithm(quiche::FECSchedulerAlgorithm::BurstsOnly);
// quic_config.send_fec(args.get_bool("--send-fec"));
// quic_config.receive_fec(args.get_bool("--receive-fec"));
// quic_config.set_real_time(args.get_bool("--real-time-cc"));
let h3_config = quiche::h3::Config::new().unwrap();
let keylog = if let Some(keylog_path) = std::env::var_os("SSLKEYLOGFILE") {
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(keylog_path)
.unwrap();
Some(file)
} else {
None
};
let (server, socket) = AsyncWebTransportServer::with_configs(config.addr,
quic_config, h3_config, keylog)?;
let uri_root = "/";
let regexes = [Regex::new(format!("{}", uri_root).as_str()).unwrap()];
Ok((server, socket, regexes.to_vec()))
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
let args = Cli::parse();
// Create a web server to serve the fingerprint
let serve = serve_http(args.clone());
let mut tasks = JoinSet::new();
tasks.spawn(async move {
serve.await.unwrap();
});
// Create a fake media source from disk.
let media = source::File::new(args.media).context("failed to open file source")?;
let broker = relay::broker::Broadcasts::new();
broker
.announce("quic.video/demo", media.source())
.context("failed to announce file source")?;
let mut tasks = JoinSet::new();
tasks.spawn(async move {
media.run().await.unwrap();
});
// Create a server to actually serve the media
let config = relay::ServerConfig {
addr: args.addr,
cert: args.cert,
key: args.key,
broker: broker.clone(),
};
let (server, socket, regexes) = new_quiche(config).unwrap();
let server = Arc::new(std::sync::Mutex::new(server));
let socket = Arc::new(socket);
let mut buf = vec![0; 10000];
let mut tasks = JoinSet::new();
'mainloop: loop {
println!("listen...");
let cid = {
// let mut server = endpoint.quiche_server.lock().await;
let ret = async_webtransport_handler::AsyncWebTransportServer::listen_ref(server.clone(), socket.clone(), &mut buf).await?;
println!("listen returned {:?}", ret);
match ret {
Some(cid) => cid,
None => continue 'mainloop,
}
};
loop {
println!("poll");
match server.lock().unwrap().poll(&cid, &regexes[..]) {
Ok(async_webtransport_handler::Event::NewSession(path, session_id, _regex_index)) => {
let server = server.clone();
let cid = cid.clone();
let broker = broker.clone();
tasks.spawn(async move {
let mut webtransport_session = async_webtransport_handler::WebTransportSession::new(server.clone(), cid.clone(), session_id);
let control_stream = moq_generic_transport::accept_bidi(&mut webtransport_session).await.unwrap().unwrap();
let received_client_setup = moq_transport_generic::Session::accept(Box::new(control_stream), Box::new(webtransport_session)).await.unwrap();
// TODO: maybe reject setup
let role = match received_client_setup.setup().role {
Role::Publisher => Role::Subscriber,
Role::Subscriber => Role::Publisher,
Role::Both => Role::Both,
};
let setup_server = SetupServer {
version: Version::DRAFT_00,
role,
};
let session = received_client_setup.accept(setup_server).await.unwrap();
let session = relay::Session::from_transport_session(session, broker.clone()).await.unwrap();
session.run().await
});
},
Ok(async_webtransport_handler::Event::StreamData(session_id, stream_id)) => {
log::trace!("new data!");
},
Ok(async_webtransport_handler::Event::Done) => {
println!("H3 Done");
break;
},
Ok(async_webtransport_handler::Event::GoAway) => {
println!("GOAWAY");
break;
},
Err(_) => todo!(),
}
}
}
}
// Run a HTTP server using Warp
// TODO remove this when Chrome adds support for self-signed certificates using WebTransport
async fn serve_http(args: Cli) -> anyhow::Result<()> {
// Read the PEM certificate file
let crt = fs::File::open(&args.cert)?;
let mut crt = io::BufReader::new(crt);
// Parse the DER certificate
let certs = rustls_pemfile::certs(&mut crt)?;
let cert = certs.first().expect("no certificate found");
// Compute the SHA-256 digest
let fingerprint = digest(&SHA256, cert.as_ref());
let fingerprint = hex::encode(fingerprint.as_ref());
let fingerprint = sync::Arc::new(fingerprint);
let cors = warp::cors().allow_any_origin();
// What an annoyingly complicated way to serve a static String
// I spent a long time trying to find the exact way of cloning and dereferencing the Arc.
let routes = warp::path!("fingerprint")
.map(move || (*(fingerprint.clone())).clone())
.with(cors);
warp::serve(routes)
.tls()
.cert_path(args.cert)
.key_path(args.key)
.run(args.addr)
.await;
Ok(())
}

View File

@ -15,13 +15,6 @@ categories = [ "multimedia", "network-programming", "web-programming" ]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
moq-transport = { path = "../moq-transport" }
moq-transport-quinn = { path = "../moq-transport-quinn" }
moq-warp = { path = "../moq-warp" }
h3 = { git = "https://github.com/hyperium/h3", branch = "master" }
h3-quinn = { git = "https://github.com/hyperium/h3", branch = "master" }
h3-webtransport = { git = "https://github.com/hyperium/h3", branch = "master" }
# QUIC # QUIC
quinn = "0.10" quinn = "0.10"
@ -46,8 +39,11 @@ anyhow = "1.0.70"
bytes= "1" bytes= "1"
moq-transport-generic = { path = "../moq-transport-generic" } webtransport-generic = {git = "https://github.com/kixelated/webtransport-rs"}
# moq-generic-transport = { path = "../transport" } moq-transport = { path = "../moq-transport" }
moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"} moq-transport-quinn = { path = "../moq-transport-quinn" }
webtransport_quiche = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" } moq-warp = { path = "../moq-warp" }
async_webtransport = { git = "ssh://git@github.com/francoismichel/webtransport-quiche.git" }
h3 = { git = "https://github.com/hyperium/h3", branch = "master" }
h3-quinn = { git = "https://github.com/hyperium/h3", branch = "master" }
h3-webtransport = { git = "https://github.com/hyperium/h3", branch = "master" }

View File

@ -1,24 +1,18 @@
use std::{fs, io, sync::{self, Arc}, time}; use std::{fs, io, time};
use anyhow::Context; use anyhow::Context;
use bytes::Bytes; use bytes::Bytes;
use h3::{quic::StreamId, proto::varint::VarInt}; use h3::quic::BidiStream;
use h3_webtransport::server::AcceptedBi; use h3_webtransport::server::AcceptedBi;
use moq_transport::Message; use moq_transport::AcceptSetup;
use moq_transport_generic::{AcceptSetup, Control, Objects}; use moq_warp::relay::ServerConfig;
use moq_warp::relay::{broker, ServerConfig};
use tokio::task::JoinSet; use tokio::task::JoinSet;
use warp::{Future, http}; use warp::{Future, http};
use self::stream::{QuinnBidiStream, QuinnSendStream, QuinnRecvStream}; use self::stream::{QuinnSendStream, QuinnRecvStream};
mod stream; mod stream;
fn stream_id_to_u64(stream_id: StreamId) -> u64 {
let val: VarInt = stream_id.into();
val.into_inner()
}
pub struct Server { pub struct Server {
// The MoQ transport server. // The MoQ transport server.
server: h3_webtransport::server::WebTransportSession<h3_quinn::Connection, bytes::Bytes>, server: h3_webtransport::server::WebTransportSession<h3_quinn::Connection, bytes::Bytes>,
@ -61,15 +55,15 @@ impl Server {
]; ];
tls_config.alpn_protocols = alpn; tls_config.alpn_protocols = alpn;
let mut server_config = quinn::ServerConfig::with_crypto(sync::Arc::new(tls_config)); let mut server_config = quinn::ServerConfig::with_crypto(std::sync::Arc::new(tls_config));
// Enable BBR congestion control // Enable BBR congestion control
// TODO validate the implementation // TODO validate the implementation
let mut transport_config = quinn::TransportConfig::default(); let mut transport_config = quinn::TransportConfig::default();
transport_config.keep_alive_interval(Some(time::Duration::from_secs(2))); transport_config.keep_alive_interval(Some(time::Duration::from_secs(2)));
transport_config.congestion_controller_factory(sync::Arc::new(quinn::congestion::BbrConfig::default())); transport_config.congestion_controller_factory(std::sync::Arc::new(quinn::congestion::BbrConfig::default()));
server_config.transport = sync::Arc::new(transport_config); server_config.transport = std::sync::Arc::new(transport_config);
let server = quinn::Endpoint::server(server_config, config.addr)?; let server = quinn::Endpoint::server(server_config, config.addr)?;
Ok(server) Ok(server)
@ -160,22 +154,18 @@ pub struct Connect {
} }
impl Connect { impl Connect {
// Expose the received URI
pub fn uri(&self) -> &http::Uri {
self.req.uri()
}
// Accept the WebTransport session. // Accept the WebTransport session.
pub async fn accept(self) -> anyhow::Result<AcceptSetup<Server>> { pub async fn accept(self) -> anyhow::Result<AcceptSetup<Server>> {
let session = h3_webtransport::server::WebTransportSession::accept(self.req, self.stream, self.conn).await?; let session = h3_webtransport::server::WebTransportSession::accept(self.req, self.stream, self.conn).await?;
let mut session = Server{server: session}; let mut session = Server{server: session};
let control_stream = moq_generic_transport::accept_bidi(&mut session) let (control_stream_send, control_stream_recv) = moq_transport::accept_bidi(&mut session)
.await .await
.context("failed to accept bidi stream")? .context("failed to accept bidi stream")?
.unwrap(); .unwrap();
Ok(moq_transport_generic::Session::accept(Box::new(control_stream), Box::new(session)).await?) Ok(moq_transport::Session::accept(Box::new(control_stream_send), Box::new(control_stream_recv), Box::new(session)).await?)
} }
// Reject the WebTransport session with a HTTP response. // Reject the WebTransport session with a HTTP response.
@ -186,17 +176,17 @@ impl Connect {
} }
impl moq_generic_transport::Connection for Server { impl webtransport_generic::Connection for Server {
type BidiStream = QuinnBidiStream;
type Error = anyhow::Error;
type SendStream = QuinnSendStream; type SendStream = QuinnSendStream;
type RecvStream = QuinnRecvStream; type RecvStream = QuinnRecvStream;
fn poll_accept_recv( fn poll_accept_uni(
&mut self, &mut self,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Option<Self::RecvStream>, anyhow::Error>> { ) -> std::task::Poll<Result<Option<Self::RecvStream>, Self::Error>> {
let fut = self.server.accept_uni(); let fut = self.server.accept_uni();
let fut = std::pin::pin!(fut); let fut = std::pin::pin!(fut);
fut.poll(cx) fut.poll(cx)
@ -207,13 +197,16 @@ impl moq_generic_transport::Connection for Server {
fn poll_accept_bidi( fn poll_accept_bidi(
&mut self, &mut self,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Option<Self::BidiStream>, anyhow::Error>> { ) -> std::task::Poll<Result<Option<(Self::SendStream, Self::RecvStream)>, Self::Error>> {
let fut = self.server.accept_bi(); let fut = self.server.accept_bi();
let fut = std::pin::pin!(fut); let fut = std::pin::pin!(fut);
let res = std::task::ready!(fut.poll(cx).map_err(|e| anyhow::anyhow!("{:?}", e))); let res = std::task::ready!(fut.poll(cx).map_err(|e| anyhow::anyhow!("{:?}", e)));
match res { match res {
Ok(Some(AcceptedBi::Request(_, _))) => std::task::Poll::Ready(Err(anyhow::anyhow!("received new session whils accepting bidi stream"))), Ok(Some(AcceptedBi::Request(_, _))) => std::task::Poll::Ready(Err(anyhow::anyhow!("received new session whils accepting bidi stream"))),
Ok(Some(AcceptedBi::BidiStream(_, s))) => std::task::Poll::Ready(Ok(Some(QuinnBidiStream::new(s)))), Ok(Some(AcceptedBi::BidiStream(_, s))) => {
let (send, recv) = s.split();
std::task::Poll::Ready(Ok(Some((QuinnSendStream::new(send), QuinnRecvStream::new(recv)))))
}
Ok(None) => std::task::Poll::Ready(Ok(None)), Ok(None) => std::task::Poll::Ready(Ok(None)),
Err(e) => std::task::Poll::Ready(Err(e)), Err(e) => std::task::Poll::Ready(Err(e)),
} }
@ -222,18 +215,22 @@ impl moq_generic_transport::Connection for Server {
fn poll_open_bidi( fn poll_open_bidi(
&mut self, &mut self,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Self::BidiStream, anyhow::Error>> { ) -> std::task::Poll<Result<(Self::SendStream, Self::RecvStream), Self::Error>> {
let fut = self.server.open_bi(self.server.session_id()); let fut = self.server.open_bi(self.server.session_id());
let fut = std::pin::pin!(fut); let fut = std::pin::pin!(fut);
fut.poll(cx) fut.poll(cx)
.map_ok(|s| QuinnBidiStream::new(s)) .map_ok(|s| {
let (send, recv) = s.split();
(QuinnSendStream::new(send), QuinnRecvStream::new(recv))
}
)
.map_err(|e| anyhow::anyhow!("{:?}", e)) .map_err(|e| anyhow::anyhow!("{:?}", e))
} }
fn poll_open_send( fn poll_open_uni(
&mut self, &mut self,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Self::SendStream, anyhow::Error>> { ) -> std::task::Poll<Result<Self::SendStream, Self::Error>> {
let fut = self.server.open_uni(self.server.session_id()); let fut = self.server.open_uni(self.server.session_id());
let fut = std::pin::pin!(fut); let fut = std::pin::pin!(fut);
fut.poll(cx) fut.poll(cx)
@ -241,7 +238,7 @@ impl moq_generic_transport::Connection for Server {
.map_err(|e| anyhow::anyhow!("{:?}", e)) .map_err(|e| anyhow::anyhow!("{:?}", e))
} }
fn close(&mut self, _code: u64, _reason: &[u8]) { fn close(&mut self, _code: u32, _reason: &[u8]) {
todo!("not implemented") todo!("not implemented")
} }
} }

View File

@ -1,10 +1,7 @@
use h3::quic::SendStream; use h3::quic::SendStream;
use h3::quic::RecvStream; use h3::quic::RecvStream;
use h3::quic::BidiStream;
use h3::quic::SendStreamUnframed; use h3::quic::SendStreamUnframed;
use super::stream_id_to_u64;
pub struct QuinnSendStream { pub struct QuinnSendStream {
stream: h3_webtransport::stream::SendStream<h3_quinn::SendStream<bytes::Bytes>, bytes::Bytes> stream: h3_webtransport::stream::SendStream<h3_quinn::SendStream<bytes::Bytes>, bytes::Bytes>
} }
@ -15,25 +12,18 @@ impl QuinnSendStream {
} }
} }
impl moq_generic_transport::SendStream for QuinnSendStream { impl webtransport_generic::SendStream for QuinnSendStream {
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), anyhow::Error>> {
self.stream.poll_ready(cx).map_err(|e| anyhow::anyhow!("{:?}", e))
}
fn poll_finish(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), anyhow::Error>> { type Error = anyhow::Error;
fn poll_finish(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
self.stream.poll_finish(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) self.stream.poll_finish(cx).map_err(|e| anyhow::anyhow!("{:?}", e))
} }
fn reset(&mut self, reset_code: u64) { fn reset(&mut self, reset_code: u32) {
self.stream.reset(reset_code) self.stream.reset(reset_code as u64)
} }
fn send_id(&self) -> u64 {
stream_id_to_u64(self.stream.send_id())
}
}
impl moq_generic_transport::SendStreamUnframed for QuinnSendStream {
fn poll_send<D: bytes::Buf>( fn poll_send<D: bytes::Buf>(
&mut self, &mut self,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
@ -43,7 +33,6 @@ impl moq_generic_transport::SendStreamUnframed for QuinnSendStream {
} }
} }
pub struct QuinnRecvStream { pub struct QuinnRecvStream {
stream: h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, bytes::Bytes> stream: h3_webtransport::stream::RecvStream<h3_quinn::RecvStream, bytes::Bytes>
} }
@ -54,96 +43,18 @@ impl QuinnRecvStream {
} }
} }
impl moq_generic_transport::RecvStream for QuinnRecvStream { impl webtransport_generic::RecvStream for QuinnRecvStream {
type Error = anyhow::Error;
type Buf = bytes::Bytes; type Buf = bytes::Bytes;
fn poll_data( fn poll_data(
&mut self, &mut self,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Option<Self::Buf>, anyhow::Error>> { ) -> std::task::Poll<Result<Option<Self::Buf>, Self::Error>> {
self.stream.poll_data(cx).map_err(|e| anyhow::anyhow!("{:?}", e)) self.stream.poll_data(cx).map_err(|e| anyhow::anyhow!("{:?}", e))
} }
fn stop_sending(&mut self, error_code: u64) { fn stop_sending(&mut self, error_code: u32) {
self.stream.stop_sending(error_code) self.stream.stop_sending(error_code as u64)
}
fn recv_id(&self) -> u64 {
stream_id_to_u64(self.stream.recv_id())
} }
} }
pub struct QuinnBidiStream {
stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<bytes::Bytes>, bytes::Bytes>
}
impl QuinnBidiStream {
pub fn new(stream: h3_webtransport::stream::BidiStream<h3_quinn::BidiStream<bytes::Bytes>, bytes::Bytes>) -> QuinnBidiStream {
QuinnBidiStream { stream }
}
}
impl moq_generic_transport::BidiStream for QuinnBidiStream {
type SendStream = QuinnSendStream;
type RecvStream = QuinnRecvStream;
fn split(self) -> (Self::SendStream, Self::RecvStream) {
let (send, recv) = self.stream.split();
let send = QuinnSendStream{
stream: send,
};
let recv = QuinnRecvStream{
stream: recv,
};
(send, recv)
}
}
impl moq_generic_transport::RecvStream for QuinnBidiStream {
type Buf = bytes::Bytes;
fn poll_data(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Option<Self::Buf>, anyhow::Error>> {
self.stream.poll_data(cx).map_err(|e| anyhow::anyhow!("{:?}", e))
}
fn stop_sending(&mut self, error_code: u64) {
self.stream.stop_sending(error_code)
}
fn recv_id(&self) -> u64 {
stream_id_to_u64(self.stream.recv_id())
}
}
impl moq_generic_transport::SendStream for QuinnBidiStream {
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), anyhow::Error>> {
self.stream.poll_ready(cx).map_err(|e| anyhow::anyhow!("{:?}", e))
}
fn poll_finish(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), anyhow::Error>> {
self.stream.poll_finish(cx).map_err(|e| anyhow::anyhow!("{:?}", e))
}
fn reset(&mut self, reset_code: u64) {
self.stream.reset(reset_code)
}
fn send_id(&self) -> u64 {
stream_id_to_u64(self.stream.send_id())
}
}
impl moq_generic_transport::SendStreamUnframed for QuinnBidiStream {
fn poll_send<D: bytes::Buf>(
&mut self,
cx: &mut std::task::Context<'_>,
buf: &mut D,
) -> std::task::Poll<Result<usize, anyhow::Error>> {
self.stream.poll_send(cx, buf).map_err(|e| anyhow::anyhow!("{:?}", e))
}
}

View File

@ -1,28 +0,0 @@
[package]
name = "moq-transport-generic"
description = "Media over QUIC"
authors = [ "Luke Curley" ]
repository = "https://github.com/kixelated/moq-rs"
license = "MIT OR Apache-2.0"
version = "0.1.0"
edition = "2021"
keywords = [ "quic", "http3", "webtransport", "media", "live" ]
categories = [ "multimedia", "network-programming", "web-programming" ]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
moq-transport = { path = "../moq-transport" }
# WebTransport support: TODO pin a version when released
moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"}
http = "0.2"
tokio = { version = "1.27", features = ["macros", "sync"] }
bytes = "1"
log = "0.4"
anyhow = "1.0.70"
thiserror = "1.0.21"

View File

@ -18,3 +18,7 @@ categories = [ "multimedia", "network-programming", "web-programming" ]
bytes = "1" bytes = "1"
thiserror = "1.0.21" thiserror = "1.0.21"
log = "0.4" log = "0.4"
tokio = { version = "1.27", features = ["full"] }
anyhow = "1.0.70"
webtransport-generic = {git = "https://github.com/kixelated/webtransport-rs"}

View File

@ -1,7 +1,9 @@
mod coding; mod coding;
mod control; mod control;
mod object; mod object;
mod network;
pub use coding::*; pub use coding::*;
pub use control::*; pub use control::*;
pub use object::*; pub use object::*;
pub use network::*;

View File

@ -1,5 +1,6 @@
use moq_generic_transport::{RecvStream, BidiStream}; use webtransport_generic::{RecvStream, SendStream};
use moq_transport::{Decode, DecodeError, Encode, Message}; use crate::{Decode, DecodeError, Encode, Message};
use crate::network::stream::{recv, send};
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
@ -7,27 +8,31 @@ use std::io::Cursor;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use anyhow::Context;
pub struct Control<B: BidiStream> {
sender: ControlSend<B>, pub struct Control<S: SendStream, R: RecvStream> {
recver: ControlRecv<B::RecvStream>, sender: ControlSend<S>,
recver: ControlRecv<R>,
} }
impl<B: BidiStream> Control<B>{ impl<S: SendStream, R: RecvStream> Control<S, R>{
pub(crate) fn new(stream: Box<B>) -> Self { pub(crate) fn new(sender: Box<S>, recver: Box<R>) -> Self {
let (sender, recver) = stream.split(); let sender = ControlSend::new(sender);
let sender = ControlSend::new(Box::new(sender)); let recver = ControlRecv::new(recver);
let recver = ControlRecv::new(Box::new(recver));
Self { sender, recver } Self { sender, recver }
} }
pub fn split(self) -> (ControlSend<B>, ControlRecv<B::RecvStream>) { pub fn split(self) -> (ControlSend<S>, ControlRecv<R>) {
(self.sender, self.recver) (self.sender, self.recver)
} }
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> { pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
self.sender.send(msg).await self.sender.send(msg)
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))
.context("error sending control message")
} }
pub async fn recv(&mut self) -> anyhow::Result<Message> { pub async fn recv(&mut self) -> anyhow::Result<Message> {
@ -35,13 +40,13 @@ impl<B: BidiStream> Control<B>{
} }
} }
pub struct ControlSend<B: BidiStream> { pub struct ControlSend<S: SendStream> {
stream: Box<B::SendStream>, stream: Box<S>,
buf: BytesMut, // reuse a buffer to encode messages. buf: BytesMut, // reuse a buffer to encode messages.
} }
impl<B: BidiStream> ControlSend<B> { impl<S: SendStream> ControlSend<S> {
pub fn new(inner: Box<B::SendStream>) -> Self { pub fn new(inner: Box<S>) -> Self {
Self { Self {
buf: BytesMut::new(), buf: BytesMut::new(),
stream: inner, stream: inner,
@ -56,13 +61,15 @@ impl<B: BidiStream> ControlSend<B> {
msg.encode(&mut self.buf)?; msg.encode(&mut self.buf)?;
// TODO make this work with select! // TODO make this work with select!
moq_generic_transport::send(self.stream.as_mut(), &mut self.buf).await?; send(self.stream.as_mut(), &mut self.buf)
.await
.map_err(|e| anyhow::anyhow!("{:?}", e.into()))
.context("error sending control message")?;
Ok(()) Ok(())
} }
// Helper that lets multiple threads send control messages. // Helper that lets multiple threads send control messages.
pub fn share(self) -> ControlShared<B> { pub fn share(self) -> ControlShared<S> {
ControlShared { ControlShared {
stream: Arc::new(Mutex::new(self)), stream: Arc::new(Mutex::new(self)),
} }
@ -72,11 +79,11 @@ impl<B: BidiStream> ControlSend<B> {
// Helper that allows multiple threads to send control messages. // Helper that allows multiple threads to send control messages.
// There's no equivalent for receiving since only one thread should be receiving at a time. // There's no equivalent for receiving since only one thread should be receiving at a time.
#[derive(Clone)] #[derive(Clone)]
pub struct ControlShared<B: BidiStream> { pub struct ControlShared<S: SendStream> {
stream: Arc<Mutex<ControlSend<B>>>, stream: Arc<Mutex<ControlSend<S>>>,
} }
impl<B: BidiStream> ControlShared<B> { impl<S: SendStream> ControlShared<S> {
pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> { pub async fn send<T: Into<Message>>(&mut self, msg: T) -> anyhow::Result<()> {
let mut stream = self.stream.lock().await; let mut stream = self.stream.lock().await;
stream.send(msg).await stream.send(msg).await
@ -112,7 +119,10 @@ impl<R: RecvStream> ControlRecv<R> {
} }
Err(DecodeError::UnexpectedEnd) => { Err(DecodeError::UnexpectedEnd) => {
// The decode failed, so we need to append more data. // The decode failed, so we need to append more data.
moq_generic_transport::recv(self.stream.as_mut(), &mut self.buf).await?; recv(self.stream.as_mut(), &mut self.buf)
.await
.map_err(|e| anyhow::anyhow!("{:?}", e.into()))
.context("error receiving control message")?;
} }
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
} }

View File

@ -1,3 +1,4 @@
mod stream;
mod control; mod control;
mod object; mod object;
mod server; mod server;
@ -6,6 +7,7 @@ use std::sync::Arc;
use std::sync::Mutex; use std::sync::Mutex;
pub type SharedConnection<C> = Arc<Mutex<Box<C>>>; pub type SharedConnection<C> = Arc<Mutex<Box<C>>>;
pub use stream::*;
pub use control::*; pub use control::*;
pub use object::*; pub use object::*;
pub use server::*; pub use server::*;

View File

@ -1,22 +1,21 @@
use anyhow::Context; use anyhow::Context;
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use moq_generic_transport::{Connection, SendStream, SendStreamUnframed, RecvStream}; use webtransport_generic::{Connection, SendStream, RecvStream};
use moq_transport::{Decode, DecodeError, Encode, Object}; use crate::{Decode, DecodeError, Encode, Object};
use std::{io::Cursor, marker::PhantomData}; use std::{io::Cursor, marker::PhantomData};
use crate::SharedConnection; use crate::network::SharedConnection;
use super::stream::{open_uni_shared, send, recv, accept_uni_shared};
// TODO support clients // TODO support clients
// We could replace this generic soup by just <C: Connection> if we forced Connection's SendStream
// to provide SendStreamUnframes's send() method. Without that, we have to make Connection's
// SendStream type more specific and force it to implement SendStreamUnframes as well.
pub struct Objects<C: Connection> { pub struct Objects<C: Connection> {
send: SendObjects<C>, send: SendObjects<C>,
recv: RecvObjects<C>, recv: RecvObjects<C>,
} }
impl<S: SendStream + SendStreamUnframed, R: RecvStream + 'static, C: Connection<SendStream = S, RecvStream = R> + Send> Objects<C> { impl<S: SendStream, R: RecvStream + 'static, C: Connection<SendStream = S, RecvStream = R> + Send> Objects<C> {
pub fn new(session: SharedConnection<C>) -> Self { pub fn new(session: SharedConnection<C>) -> Self {
let send = SendObjects::new(session.clone()); let send = SendObjects::new(session.clone());
let recv = RecvObjects::new(session); let recv = RecvObjects::new(session);
@ -44,7 +43,7 @@ pub struct SendObjects<C: Connection> {
_marker: PhantomData<C>, _marker: PhantomData<C>,
} }
impl<S: SendStream + SendStreamUnframed, C: Connection<SendStream = S>> SendObjects<C> { impl<S: SendStream, C: Connection<SendStream = S>> SendObjects<C> {
pub fn new(session: SharedConnection<C>) -> Self { pub fn new(session: SharedConnection<C>) -> Self {
Self { Self {
session, session,
@ -58,17 +57,21 @@ impl<S: SendStream + SendStreamUnframed, C: Connection<SendStream = S>> SendObje
header.encode(&mut self.buf).unwrap(); header.encode(&mut self.buf).unwrap();
// TODO support select! without making a new stream. // TODO support select! without making a new stream.
let mut stream = moq_generic_transport::open_send_shared(self.session.clone()) let mut stream = open_uni_shared(self.session.clone())
.await .await
.map_err(|e| anyhow::anyhow!("{:?}", e.into()))
.context("failed to open uni stream")?; .context("failed to open uni stream")?;
moq_generic_transport::send(&mut stream, &mut self.buf).await?; send(&mut stream, &mut self.buf)
.await
.map_err(|e| anyhow::anyhow!("{:?}", e.into()))
.context("failed to send data on stream")?;
Ok(stream) Ok(stream)
} }
} }
impl<S: SendStream + SendStreamUnframed, C: Connection<SendStream = S>> Clone for SendObjects<C> { impl<S: SendStream, C: Connection<SendStream = S>> Clone for SendObjects<C> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
session: self.session.clone(), session: self.session.clone(),
@ -104,8 +107,9 @@ impl<R: RecvStream + 'static, C: Connection<RecvStream = R>> RecvObjects<C> {
let stream = match self.stream.as_mut() { let stream = match self.stream.as_mut() {
Some(stream) => stream, Some(stream) => stream,
None => { None => {
let stream = moq_generic_transport::accept_recv_shared(self.session.clone()) let stream = accept_uni_shared(self.session.clone())
.await .await
.map_err(|e| anyhow::anyhow!("{:?}", e.into()))
.context("failed to accept uni stream")? .context("failed to accept uni stream")?
.context("no uni stream")?; .context("no uni stream")?;
@ -126,7 +130,10 @@ impl<R: RecvStream + 'static, C: Connection<RecvStream = R>> RecvObjects<C> {
} }
Err(DecodeError::UnexpectedEnd) => { Err(DecodeError::UnexpectedEnd) => {
// The decode failed, so we need to append more data. // The decode failed, so we need to append more data.
moq_generic_transport::recv(stream.as_mut(), &mut self.buf).await?; recv(stream.as_mut(), &mut self.buf)
.await
.map_err(|e| anyhow::anyhow!("{:?}", e.into()))
.context("failed to recv data on stream")?;
} }
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
} }

View File

@ -1,19 +1,19 @@
use anyhow::Context; use anyhow::Context;
use moq_generic_transport::{Connection, RecvStream}; use webtransport_generic::{Connection, RecvStream};
use moq_transport::{Message, SetupClient, SetupServer}; use crate::{Message, SetupClient, SetupServer};
use super::{Control, Objects}; use super::{Control, Objects};
pub struct Session<C: Connection + Send> { pub struct Session<C: Connection + Send> {
pub control: Control<C::BidiStream>, pub control: Control<C::SendStream, C::RecvStream>,
pub objects: Objects<C>, pub objects: Objects<C>,
} }
impl<R: RecvStream + 'static, C: Connection<RecvStream = R> + Send> Session<C> { impl<R: RecvStream + 'static, C: Connection<RecvStream = R> + Send> Session<C> {
pub async fn accept(control_stream: Box<C::BidiStream>, connection: Box<C>) -> anyhow::Result<AcceptSetup<C>> { pub async fn accept(control_stream_send: Box<C::SendStream>, control_stream_recv: Box::<C::RecvStream>, connection: Box<C>) -> anyhow::Result<AcceptSetup<C>> {
let mut control = Control::new(control_stream); let mut control = Control::new(control_stream_send, control_stream_recv);
let objects = Objects::new(std::sync::Arc::new(std::sync::Mutex::new(connection))); let objects = Objects::new(std::sync::Arc::new(std::sync::Mutex::new(connection)));
let setup_client = match control.recv().await.context("failed to read SETUP")? { let setup_client = match control.recv().await.context("failed to read SETUP")? {
@ -23,7 +23,7 @@ impl<R: RecvStream + 'static, C: Connection<RecvStream = R> + Send> Session<C> {
Ok(AcceptSetup { setup_client, control, objects }) Ok(AcceptSetup { setup_client, control, objects })
} }
pub fn split(self) -> (Control<C::BidiStream>, Objects<C>) { pub fn split(self) -> (Control<C::SendStream, C::RecvStream>, Objects<C>) {
(self.control, self.objects) (self.control, self.objects)
} }
} }
@ -31,7 +31,7 @@ impl<R: RecvStream + 'static, C: Connection<RecvStream = R> + Send> Session<C> {
pub struct AcceptSetup<C: Connection + Send> { pub struct AcceptSetup<C: Connection + Send> {
setup_client: SetupClient, setup_client: SetupClient,
control: Control<C::BidiStream>, control: Control<C::SendStream, C::RecvStream>,
objects: Objects<C>, objects: Objects<C>,
} }

View File

@ -0,0 +1,60 @@
use std::sync::Arc;
use bytes::{Buf, BytesMut, BufMut};
use webtransport_generic::{Connection, RecvStream, SendStream};
pub async fn accept_uni<C: Connection>(conn: &mut C) -> Result<Option<C::RecvStream>, C::Error> {
std::future::poll_fn(|cx| conn.poll_accept_uni(cx)).await
}
pub async fn accept_uni_shared<C: Connection>(conn: Arc<std::sync::Mutex<Box<C>>>) -> Result<Option<C::RecvStream>, C::Error> {
Ok(std::future::poll_fn(|cx| conn.lock().unwrap().poll_accept_uni(cx)).await?)
}
pub async fn accept_bidi<C: Connection>(conn: &mut C) -> Result<Option<(C::SendStream, C::RecvStream)>, C::Error> {
std::future::poll_fn(|cx| conn.poll_accept_bidi(cx)).await
}
pub async fn accept_bidi_shared<C: Connection>(conn: Arc<std::sync::Mutex<Box<C>>>) -> Result<Option<(C::SendStream, C::RecvStream)>, C::Error> {
std::future::poll_fn(|cx| conn.lock().unwrap().poll_accept_bidi(cx)).await
}
pub async fn open_uni<C: Connection>(conn: &mut C) -> anyhow::Result<C::SendStream, C::Error> {
std::future::poll_fn(|cx| conn.poll_open_uni(cx)).await
}
pub async fn open_uni_shared<C: Connection>(conn: Arc<std::sync::Mutex<Box<C>>>) -> Result<C::SendStream, C::Error> {
std::future::poll_fn(|cx| conn.lock().unwrap().poll_open_uni(cx)).await
}
pub async fn open_bidi<C: Connection>(conn: &mut C) -> anyhow::Result<(C::SendStream, C::RecvStream), C::Error> {
std::future::poll_fn(|cx| conn.poll_open_bidi(cx)).await
}
pub async fn open_bidi_shared<C: Connection>(conn: Arc<std::sync::Mutex<Box<C>>>) -> Result<(C::SendStream, C::RecvStream), C::Error> {
std::future::poll_fn(|cx| conn.lock().unwrap().poll_open_bidi(cx)).await
}
pub async fn recv<R: RecvStream>(recv: &mut R , outbuf: &mut BytesMut) -> Result<bool, R::Error> {
let buf = std::future::poll_fn(|cx| recv.poll_data(cx)).await?;
match buf {
Some(buf) => {
outbuf.put(buf);
Ok(true)
}
None => Ok(false) // stream finished
}
}
pub async fn send<B: Buf, S: SendStream>(send: &mut S, buf: &mut B) -> Result<usize, S::Error> {
std::future::poll_fn(|cx| send.poll_send(cx, buf)).await
}

View File

@ -17,8 +17,7 @@ categories = [ "multimedia", "network-programming", "web-programming" ]
[dependencies] [dependencies]
moq-transport = { path = "../moq-transport" } moq-transport = { path = "../moq-transport" }
moq-transport-quinn = { path = "../moq-transport-quinn" } moq-transport-quinn = { path = "../moq-transport-quinn" }
moq-transport-generic = { path = "../moq-transport-generic" } webtransport-generic = {git = "https://github.com/kixelated/webtransport-rs"}
moq-generic-transport = {git = "https://github.com/francoismichel/moq-rs-quiche", branch = "generic-transport-trait"}
bytes = "1" bytes = "1"
tokio = "1.27" tokio = "1.27"

View File

@ -4,12 +4,11 @@ use std::sync::{Arc, Mutex};
use std::time; use std::time;
use bytes::Buf; use bytes::Buf;
use moq_generic_transport::{Connection, RecvStream}; use webtransport_generic::{Connection, RecvStream};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::task::JoinSet; // lock across await boundaries use tokio::task::JoinSet; // lock across await boundaries
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt, RecvObjects};
use moq_transport_generic::RecvObjects;
use anyhow::Context; use anyhow::Context;
@ -93,7 +92,7 @@ impl<R: RecvStream + Send + 'static, C: Connection<RecvStream = R> + Send> Sessi
} }
} }
async fn receive_object<Buu: Buf + Send, Re: RecvStream<Buf = Buu> + Send + 'static>(&mut self, object: Object, stream: Box<Re>) -> anyhow::Result<()> { async fn receive_object(&mut self, object: Object, stream: Box<R>) -> anyhow::Result<()> {
let track = object.track; let track = object.track;
let segment = segment::Info { let segment = segment::Info {
@ -116,17 +115,17 @@ impl<R: RecvStream + Send + 'static, C: Connection<RecvStream = R> + Send> Sessi
Ok(()) Ok(())
} }
async fn run_segment<Buu: Buf + Send, Re: RecvStream<Buf = Buu> + Send + 'static>(mut segment: segment::Publisher, mut stream: Box<Re>) -> anyhow::Result<()> { async fn run_segment(mut segment: segment::Publisher, mut stream: Box<R>) -> anyhow::Result<()> {
// let mut buf = [0u8; 32 * 1024];
loop { loop {
let mut b = bytes::BytesMut::new(); let mut b = bytes::BytesMut::new();
let stream_finished = !moq_generic_transport::recv(stream.as_mut(), &mut b).await?; let stream_finished = !moq_transport::recv(stream.as_mut(), &mut b)
// let size = stream.read(&mut buf).await.context("failed to read from stream")?; .await
.map_err(|e| anyhow::anyhow!("{:?}", e.into()))
.context("error receiving control message")?;
if stream_finished { if stream_finished {
return Ok(()); return Ok(());
} }
// let chunk = buf[..size].to_vec();
segment.fragments.push(b.chunk().to_vec().into()) segment.fragments.push(b.chunk().to_vec().into())
} }
} }

View File

@ -1,18 +1,17 @@
use moq_generic_transport::{SendStream, SendStreamUnframed, BidiStream}; use webtransport_generic::{SendStream, RecvStream};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk}; use moq_transport::{Announce, AnnounceError, AnnounceOk, Message, Subscribe, SubscribeError, SubscribeOk, Control};
use moq_transport_generic::Control;
pub struct Main<B: BidiStream> { pub struct Main<S: SendStream, R: RecvStream> {
control: Control<B>, control: Control<S, R>,
outgoing: mpsc::Receiver<Message>, outgoing: mpsc::Receiver<Message>,
contribute: mpsc::Sender<Contribute>, contribute: mpsc::Sender<Contribute>,
distribute: mpsc::Sender<Distribute>, distribute: mpsc::Sender<Distribute>,
} }
impl<B: BidiStream> Main <B> { impl<S: SendStream, R: RecvStream> Main <S, R> {
pub async fn run(mut self) -> anyhow::Result<()> { pub async fn run(mut self) -> anyhow::Result<()> {
loop { loop {
tokio::select! { tokio::select! {
@ -52,7 +51,7 @@ impl<T> Component<T> {
} }
// Splits a control stream into two components, based on if it's a message for contribution or distribution. // Splits a control stream into two components, based on if it's a message for contribution or distribution.
pub fn split<S: SendStream + SendStreamUnframed, B: BidiStream<SendStream = S>>(control: Control<B>) -> (Main<B>, Component<Contribute>, Component<Distribute>) { pub fn split<S: SendStream, R: RecvStream>(control: Control<S, R>) -> (Main<S, R>, Component<Contribute>, Component<Distribute>) {
let (outgoing_tx, outgoing_rx) = mpsc::channel(1); let (outgoing_tx, outgoing_rx) = mpsc::channel(1);
let (contribute_tx, contribute_rx) = mpsc::channel(1); let (contribute_tx, contribute_rx) = mpsc::channel(1);
let (distribute_tx, distribute_rx) = mpsc::channel(1); let (distribute_tx, distribute_rx) = mpsc::channel(1);

View File

@ -3,16 +3,15 @@ use std::marker::PhantomData;
use anyhow::Context; use anyhow::Context;
use bytes::Buf; use bytes::Buf;
use moq_generic_transport::{SendStream, SendStreamUnframed, Connection}; use webtransport_generic::{SendStream, Connection};
use tokio::task::JoinSet; // allows locking across await use tokio::task::JoinSet; // allows locking across await
use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt}; use moq_transport::{Announce, AnnounceError, AnnounceOk, Object, Subscribe, SubscribeError, SubscribeOk, VarInt, SendObjects};
use moq_transport_generic::SendObjects;
use super::{broker, control}; use super::{broker, control};
use crate::model::{segment, track}; use crate::model::{segment, track};
pub struct Session<S: SendStream + SendStreamUnframed + Send, C: Connection + Send> { pub struct Session<S: SendStream + Send, C: Connection + Send> {
// Objects are sent to the client // Objects are sent to the client
objects: SendObjects<C>, objects: SendObjects<C>,
@ -29,7 +28,7 @@ pub struct Session<S: SendStream + SendStreamUnframed + Send, C: Connection + Se
} }
impl<S, C> Session<S, C> where impl<S, C> Session<S, C> where
S: SendStream + SendStreamUnframed + Send, S: SendStream + Send,
C: Connection<SendStream = S> + Send + 'static { C: Connection<SendStream = S> + Send + 'static {
pub fn new( pub fn new(
objects: SendObjects<C>, objects: SendObjects<C>,
@ -179,13 +178,14 @@ impl<S, C> Session<S, C> where
while let Some(fragment) = segment.fragments.next().await { while let Some(fragment) = segment.fragments.next().await {
let mut buf = bytes::Bytes::copy_from_slice(fragment.as_slice()); let mut buf = bytes::Bytes::copy_from_slice(fragment.as_slice());
while buf.has_remaining() { while buf.has_remaining() {
moq_generic_transport::send(&mut stream, &mut buf).await?; moq_transport::send(&mut stream, &mut buf)
.await
.map_err(|e| anyhow::anyhow!("{:?}", e.into()))
.context("error sending control message")?;
} }
// stream.write_all(fragment.as_slice()).await?;
} }
// NOTE: stream is automatically closed when dropped // NOTE: stream is automatically closed when dropped
Ok(()) Ok(())
} }

View File

@ -1,29 +1,28 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use moq_generic_transport::{SendStream, SendStreamUnframed, Connection, RecvStream}; use webtransport_generic::{SendStream, Connection, RecvStream};
use super::{broker, contribute, control, distribute}; use super::{broker, contribute, control, distribute};
pub struct Session<R: RecvStream + Send, S: SendStream + SendStreamUnframed + Send, C: Connection + Send> { pub struct Session<R: RecvStream + Send, S: SendStream + Send, C: Connection + Send> {
// Split logic into contribution/distribution to reduce the problem space. // Split logic into contribution/distribution to reduce the problem space.
contribute: contribute::Session<R, C>, contribute: contribute::Session<R, C>,
distribute: distribute::Session<S, C>, distribute: distribute::Session<S, C>,
// Used to receive control messages and forward to contribute/distribute. // Used to receive control messages and forward to contribute/distribute.
control: control::Main<C::BidiStream>, control: control::Main<S, R>,
_marker: PhantomData<S>, _marker: PhantomData<S>,
_marker_r: PhantomData<R>, _marker_r: PhantomData<R>,
} }
// impl<R: RecvStream + Send + 'static, S: SendStream + SendStreamUnframed + Send, C: Connection<RecvStream = R, SendStream = S> + Send + 'static> Session<R, S, C> {
impl<R, S, C> Session<R, S, C> where impl<R, S, C> Session<R, S, C> where
R: RecvStream + Send + 'static, R: RecvStream + Send + 'static,
S: SendStream + SendStreamUnframed + Send, S: SendStream + Send,
C: Connection<RecvStream = R, SendStream = S> + Send + 'static C: Connection<RecvStream = R, SendStream = S> + Send + 'static
{ {
pub async fn from_transport_session( pub async fn from_transport_session(
session: moq_transport_generic::Session<C>, session: moq_transport::Session<C>,
broker: broker::Broadcasts, broker: broker::Broadcasts,
) -> anyhow::Result<Session<R, S, C>> { ) -> anyhow::Result<Session<R, S, C>> {
let (control, objects) = session.split(); let (control, objects) = session.split();

View File

@ -1,22 +0,0 @@
[package]
name = "moq-generic-transport"
description = "Media over QUIC"
authors = [ "Luke Curley" ]
repository = "https://github.com/kixelated/moq-rs"
license = "MIT OR Apache-2.0"
version = "0.1.0"
edition = "2021"
keywords = [ "quic", "http3", "webtransport", "media", "live" ]
categories = [ "multimedia", "network-programming", "web-programming" ]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bytes = "1"
thiserror = "1.0.21"
log = "0.4"
anyhow = "1.0.70"

View File

@ -1,197 +0,0 @@
// Coming from https://github.com/hyperium/h3, the goal is to
// do a PR with the changes afterwards
use std::{task::{self, Poll}, sync::Arc};
use bytes::{Buf, BufMut};
use anyhow::Error;
type ErrorCode = u64;
type StreamId = u64;
/// Trait representing a QUIC connection.
pub trait Connection {
/// The type produced by `poll_accept_bidi()`
type BidiStream: BidiStream;
/// The type of the sending part of `BidiStream`
type SendStream: SendStream + SendStreamUnframed;
/// The type produced by `poll_accept_recv()`
type RecvStream: RecvStream;
/// Accept an incoming unidirectional stream
///
/// Returning `None` implies the connection is closing or closed.
fn poll_accept_recv(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::RecvStream>, Error>>;
/// Accept an incoming bidirectional stream
///
/// Returning `None` implies the connection is closing or closed.
fn poll_accept_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::BidiStream>, Error>>;
/// Poll the connection to create a new bidirectional stream.
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Error>>;
/// Poll the connection to create a new unidirectional stream.
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Error>>;
/// Close the connection immediately
fn close(&mut self, code: ErrorCode, reason: &[u8]);
}
/// Trait for opening outgoing streams
pub trait OpenStreams {
/// The type produced by `poll_open_bidi()`
type BidiStream: BidiStream;
/// The type produced by `poll_open_send()`
type SendStream: SendStream + SendStreamUnframed;
/// The type of the receiving part of `BidiStream`
type RecvStream: RecvStream;
/// Poll the connection to create a new bidirectional stream.
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Error>>;
/// Poll the connection to create a new unidirectional stream.
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Error>>;
/// Close the connection immediately
fn close(&mut self, code: ErrorCode, reason: &[u8]);
}
/// A trait describing the "send" actions of a QUIC stream.
pub trait SendStream {
/// Polls if the stream can send more data.
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>>;
/// Poll to finish the sending side of the stream.
fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>>;
/// Send a QUIC reset code.
fn reset(&mut self, reset_code: u64);
/// Get QUIC send stream id
fn send_id(&self) -> StreamId;
}
/// Allows sending unframed pure bytes to a stream. Similar to [`AsyncWrite`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncWrite.html)
pub trait SendStreamUnframed: SendStream {
/// Attempts write data into the stream.
///
/// Returns the number of bytes written.
///
/// `buf` is advanced by the number of bytes written.
fn poll_send<D: Buf>(
&mut self,
cx: &mut task::Context<'_>,
buf: &mut D,
) -> Poll<Result<usize, Error>>;
}
/// A trait describing the "receive" actions of a QUIC stream.
pub trait RecvStream {
/// The type of `Buf` for data received on this stream.
type Buf: Buf + Send;
/// Poll the stream for more data.
///
/// When the receive side will no longer receive more data (such as because
/// the peer closed their sending side), this should return `None`.
fn poll_data(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::Buf>, anyhow::Error>>;
/// Send a `STOP_SENDING` QUIC code.
fn stop_sending(&mut self, error_code: u64);
/// Get QUIC send stream id
fn recv_id(&self) -> StreamId;
}
pub async fn accept_recv<C: Connection>(conn: &mut C) -> anyhow::Result<Option<C::RecvStream>, Error> {
Ok(std::future::poll_fn(|cx| conn.poll_accept_recv(cx)).await?)
}
pub async fn accept_recv_shared<C: Connection>(conn: Arc<std::sync::Mutex<Box<C>>>) -> anyhow::Result<Option<C::RecvStream>, Error> {
Ok(std::future::poll_fn(|cx| conn.lock().unwrap().poll_accept_recv(cx)).await?)
}
pub async fn accept_bidi<C: Connection>(conn: &mut C) -> anyhow::Result<Option<C::BidiStream>, Error> {
Ok(std::future::poll_fn(|cx| conn.poll_accept_bidi(cx)).await?)
}
pub async fn accept_bidi_shared<C: Connection>(conn: Arc<std::sync::Mutex<Box<C>>>) -> anyhow::Result<Option<C::BidiStream>, Error> {
Ok(std::future::poll_fn(|cx| conn.lock().unwrap().poll_accept_bidi(cx)).await?)
}
pub async fn open_send<C: Connection>(conn: &mut C) -> anyhow::Result<C::SendStream, Error> {
Ok(std::future::poll_fn(|cx| conn.poll_open_send(cx)).await?)
}
pub async fn open_send_shared<C: Connection>(conn: Arc<std::sync::Mutex<Box<C>>>) -> anyhow::Result<C::SendStream, Error> {
Ok(std::future::poll_fn(|cx| conn.lock().unwrap().poll_open_send(cx)).await?)
}
pub async fn open_bidi<C: Connection>(conn: &mut C) -> anyhow::Result<C::BidiStream, Error> {
Ok(std::future::poll_fn(|cx| conn.poll_open_bidi(cx)).await?)
}
pub async fn open_bidi_shared<C: Connection>(conn: Arc<std::sync::Mutex<Box<C>>>) -> anyhow::Result<C::BidiStream, Error> {
Ok(std::future::poll_fn(|cx| conn.lock().unwrap().poll_open_bidi(cx)).await?)
}
pub async fn recv<B: Buf, BM: BufMut, R: RecvStream<Buf = B>>(recv: &mut R , outbuf: &mut BM) -> anyhow::Result<bool> {
let buf = std::future::poll_fn(|cx| recv.poll_data(cx)).await?;
match buf {
Some(buf) => {
outbuf.put(buf);
Ok(true)
}
None => Ok(false) // stream finished
}
}
pub async fn send<B: Buf, S: SendStreamUnframed>(send: &mut S, buf: &mut B) -> anyhow::Result<usize> {
Ok(std::future::poll_fn(|cx| send.poll_send(cx, buf)).await?)
}
/// Optional trait to allow "splitting" a bidirectional stream into two sides.
pub trait BidiStream: SendStream + RecvStream {
/// The type for the send half.
type SendStream: SendStream + SendStreamUnframed;
/// The type for the receive half.
type RecvStream: RecvStream;
/// Split this stream into two halves.
fn split(self) -> (Self::SendStream, Self::RecvStream);
}