From 93830a2f24325b953ca0bf20ea6ed5324b6d92d4 Mon Sep 17 00:00:00 2001 From: kixelated Date: Tue, 7 Nov 2023 15:41:15 +0900 Subject: [PATCH] Add a CLOCK. (#119) --- Cargo.lock | 95 +++++++++++++++ Cargo.toml | 2 +- Dockerfile | 6 +- HACKATHON.md | 53 --------- dev/clock | 19 +++ moq-clock/Cargo.toml | 46 ++++++++ moq-clock/src/cli.rs | 46 ++++++++ moq-clock/src/clock.rs | 148 ++++++++++++++++++++++++ moq-clock/src/main.rs | 123 ++++++++++++++++++++ moq-pub/src/media.rs | 31 ++--- moq-transport/Cargo.toml | 23 ++++ moq-transport/src/cache/fragment.rs | 16 +-- moq-transport/src/cache/segment.rs | 28 +++-- moq-transport/src/cache/track.rs | 2 +- moq-transport/src/session/error.rs | 6 + moq-transport/src/session/publisher.rs | 11 +- moq-transport/src/session/subscriber.rs | 32 ++--- 17 files changed, 570 insertions(+), 117 deletions(-) delete mode 100644 HACKATHON.md create mode 100755 dev/clock create mode 100644 moq-clock/Cargo.toml create mode 100644 moq-clock/src/cli.rs create mode 100644 moq-clock/src/clock.rs create mode 100644 moq-clock/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 6820423..b8225b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.5.0" @@ -374,6 +389,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets", +] + [[package]] name = "clap" version = "4.4.2" @@ -872,6 +901,29 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "iana-time-zone" +version = "0.1.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.4.0" @@ -1051,6 +1103,28 @@ dependencies = [ "url", ] +[[package]] +name = "moq-clock" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "clap", + "clap_mangen", + "env_logger", + "log", + "moq-transport", + "quinn", + "rustls", + "rustls-native-certs", + "rustls-pemfile", + "tokio", + "tracing", + "tracing-subscriber", + "url", + "webtransport-quinn", +] + [[package]] name = "moq-pub" version = "0.1.0" @@ -1107,14 +1181,26 @@ dependencies = [ name = "moq-transport" version = "0.2.0" dependencies = [ + "anyhow", "async-trait", "bytes", + "clap", + "env_logger", "indexmap 2.0.0", "log", + "mp4", "paste", "quinn", + "rfc6381-codec", + "rustls", + "rustls-native-certs", + "rustls-pemfile", + "serde_json", "thiserror", "tokio", + "tracing", + "tracing-subscriber", + "url", "webtransport-quinn", ] @@ -2461,6 +2547,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.51.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index a5bae56..fbe60d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ [workspace] -members = ["moq-transport", "moq-relay", "moq-pub", "moq-api"] +members = ["moq-transport", "moq-relay", "moq-pub", "moq-api", "moq-clock"] resolver = "2" diff --git a/Dockerfile b/Dockerfile index de4a70f..0545e38 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,11 +22,11 @@ RUN apt-get update && \ # Copy the publish script into the image COPY deploy/publish.sh /usr/local/bin/publish -# Copy the compiled binary -COPY --from=builder /usr/local/cargo/bin/moq-pub /usr/local/cargo/bin/moq-pub +# Copy the compiled binaries +COPY --from=builder /usr/local/cargo/bin /usr/local/cargo/bin CMD [ "publish" ] -# moq-rs image with just the binaries +# moq-rs image with just the binaries (default) FROM rust:latest as moq-rs LABEL org.opencontainers.image.source=https://github.com/kixelated/moq-rs diff --git a/HACKATHON.md b/HACKATHON.md deleted file mode 100644 index 6f5ad79..0000000 --- a/HACKATHON.md +++ /dev/null @@ -1,53 +0,0 @@ -# Hackathon - -IETF Prague 118 - -## MoqTransport - -Reference libraries are available at [moq-rs](https://github.com/kixelated/moq-rs) and [moq-js](https://github.com/kixelated/moq-js). The Rust library is [well documented](https://docs.rs/moq-transport/latest/moq_transport/) but the web library, not so much. - -**TODO** Update both to draft-01. -**TODO** Switch any remaining forks over to extensions. ex: track_id in SUBSCRIBE - -The stream mapping right now is quite rigid: `stream == group == object`. - -**TODO** Support multiple objects per group. They MUST NOT use different priorities, different tracks, or out-of-order sequences. - -The API and cache aren't designed to send/receive arbitrary objects over arbitrary streams as specified in the draft. I don't think it should, and it wouldn't be possible to implement in time for the hackathon anyway. - -**TODO** Make an extension to enforce this stream mapping? - -## Generic Relay - -I'm hosting a simple CDN at: `relay.quic.video` - -The traffic is sharded based on the WebTransport path to avoid namespace collisions. Think of it like a customer ID, although it's completely unauthenticated for now. Use your username or whatever string you want: `CONNECT https://relay.quic.video/alan`. - -**TODO** Currently, it performs an implicit `ANNOUNCE ""` when `role=publisher`. This means there can only be a single publisher per shard and `role=both` is not supported. I should have explicit `ANNOUNCE` messages supported before the hackathon to remove this limitation. - -**TODO** I don't know if I will have subscribe hints fully working in time. They will be parsed but might be ignored. - -## CMAF Media - -You can [publish](https://quic.video/publish) and [watch](https://quic.video/watch) broadcasts. -There's a [24/7 bunny stream](https://quic.video/watch/bbb) or you can publish your own using [moq-pub](https://github.com/kixelated/moq-rs/tree/main/moq-pub). - -If you want to fetch from the relay directly, the name of the broadcast is the path. For example, `https://quic.video/watch/bbb` can be accessed at `relay.quic.video/bbb`. - -The namespace is empty and the catalog track is `.catalog`. I'm currently using simple JSON catalog with no support for delta updates. - -**TODO** update to the proposed [Warp catalog](https://datatracker.ietf.org/doc/draft-wilaw-moq-catalogformat/). - -The media tracks uses a single (unbounded) object per group. Video groups are per GoP, while audio groups are per frame. There's also an init track containing information required to initialize the decoder. - -**TODO** Base64 encode the init track in the catalog. - - -## Clock - -**TODO** Host a clock demo that sends a group per second: - -``` -GROUP: YYYY-MM-DD HH:MM -OBJECT: SS -``` diff --git a/dev/clock b/dev/clock new file mode 100755 index 0000000..ee2f91c --- /dev/null +++ b/dev/clock @@ -0,0 +1,19 @@ +#!/bin/bash +set -euo pipefail + +# Change directory to the root of the project +cd "$(dirname "$0")/.." + +# Use debug logging by default +export RUST_LOG="${RUST_LOG:-debug}" + +# Connect to localhost by default. +HOST="${HOST:-localhost}" +PORT="${PORT:-4443}" +ADDR="${ADDR:-$HOST:$PORT}" +NAME="${NAME:-clock}" + +# Combine the host and name into a URL. +URL="${URL:-"https://$ADDR/$NAME"}" + +cargo run --bin moq-clock -- "$URL" "$@" diff --git a/moq-clock/Cargo.toml b/moq-clock/Cargo.toml new file mode 100644 index 0000000..d5664f7 --- /dev/null +++ b/moq-clock/Cargo.toml @@ -0,0 +1,46 @@ +[package] +name = "moq-clock" +description = "CLOCK over QUIC" +authors = ["Luke Curley"] +repository = "https://github.com/kixelated/moq-rs" +license = "MIT OR Apache-2.0" + +version = "0.1.0" +edition = "2021" + +keywords = ["quic", "http3", "webtransport", "media", "live"] +categories = ["multimedia", "network-programming", "web-programming"] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +moq-transport = { path = "../moq-transport" } + +# QUIC +quinn = "0.10" +webtransport-quinn = "0.6" +url = "2" + +# Crypto +rustls = { version = "0.21", features = ["dangerous_configuration"] } +rustls-native-certs = "0.6" +rustls-pemfile = "1" + +# Async stuff +tokio = { version = "1", features = ["full"] } + +# CLI, logging, error handling +clap = { version = "4", features = ["derive"] } +log = { version = "0.4", features = ["std"] } +env_logger = "0.9" +anyhow = { version = "1", features = ["backtrace"] } +tracing = "0.1" +tracing-subscriber = "0.3" + +# CLOCK STUFF +chrono = "0.4" + +[build-dependencies] +clap = { version = "4", features = ["derive"] } +clap_mangen = "0.2" +url = "2" diff --git a/moq-clock/src/cli.rs b/moq-clock/src/cli.rs new file mode 100644 index 0000000..32debc1 --- /dev/null +++ b/moq-clock/src/cli.rs @@ -0,0 +1,46 @@ +use clap::Parser; +use std::{net, path}; +use url::Url; + +#[derive(Parser, Clone, Debug)] +pub struct Config { + /// Listen for UDP packets on the given address. + #[arg(long, default_value = "[::]:0")] + pub bind: net::SocketAddr, + + /// Connect to the given URL starting with https:// + #[arg(value_parser = moq_url)] + pub url: Url, + + /// Use the TLS root CA at this path, encoded as PEM. + /// + /// This value can be provided multiple times for multiple roots. + /// If this is empty, system roots will be used instead + #[arg(long)] + pub tls_root: Vec, + + /// Danger: Disable TLS certificate verification. + /// + /// Fine for local development, but should be used in caution in production. + #[arg(long)] + pub tls_disable_verify: bool, + + /// Publish the current time to the relay, otherwise only subscribe. + #[arg(long)] + pub publish: bool, + + /// The name of the clock track. + #[arg(long, default_value = "now")] + pub track: String, +} + +fn moq_url(s: &str) -> Result { + let url = Url::try_from(s).map_err(|e| e.to_string())?; + + // Make sure the scheme is moq + if url.scheme() != "https" { + return Err("url scheme must be https:// for WebTransport".to_string()); + } + + Ok(url) +} diff --git a/moq-clock/src/clock.rs b/moq-clock/src/clock.rs new file mode 100644 index 0000000..5faf56e --- /dev/null +++ b/moq-clock/src/clock.rs @@ -0,0 +1,148 @@ +use std::time; + +use anyhow::Context; +use moq_transport::{ + cache::{fragment, segment, track}, + VarInt, +}; + +use chrono::prelude::*; + +pub struct Publisher { + track: track::Publisher, +} + +impl Publisher { + pub fn new(track: track::Publisher) -> Self { + Self { track } + } + + pub async fn run(mut self) -> anyhow::Result<()> { + let start = Utc::now(); + let mut now = start; + + // Just for fun, don't start at zero. + let mut sequence = start.minute(); + + loop { + let segment = self + .track + .create_segment(segment::Info { + sequence: VarInt::from_u32(sequence), + priority: 0, + expires: Some(time::Duration::from_secs(60)), + }) + .context("failed to create minute segment")?; + + sequence += 1; + + tokio::spawn(async move { + if let Err(err) = Self::send_segment(segment, now).await { + log::warn!("failed to send minute: {:?}", err); + } + }); + + let next = now + chrono::Duration::minutes(1); + let next = next.with_second(0).unwrap().with_nanosecond(0).unwrap(); + + let delay = (next - now).to_std().unwrap(); + tokio::time::sleep(delay).await; + + now = next; // just assume we didn't undersleep + } + } + + async fn send_segment(mut segment: segment::Publisher, mut now: DateTime) -> anyhow::Result<()> { + // Everything but the second. + let base = now.format("%Y-%m-%d %H:%M:").to_string(); + + segment + .fragment(VarInt::ZERO, base.len())? + .chunk(base.clone().into()) + .context("failed to write base")?; + + loop { + let delta = now.format("%S").to_string(); + let sequence = VarInt::from_u32(now.second() + 1); + + segment + .fragment(sequence, delta.len())? + .chunk(delta.clone().into()) + .context("failed to write delta")?; + + log::info!("{}{}", base, delta); + + let next = now + chrono::Duration::seconds(1); + let next = next.with_nanosecond(0).unwrap(); + + let delay = (next - now).to_std().unwrap(); + tokio::time::sleep(delay).await; + + // Get the current time again to check if we overslept + let next = Utc::now(); + if next.minute() != now.minute() { + return Ok(()); + } + + now = next; + } + } +} +pub struct Subscriber { + track: track::Subscriber, +} + +impl Subscriber { + pub fn new(track: track::Subscriber) -> Self { + Self { track } + } + + pub async fn run(mut self) -> anyhow::Result<()> { + while let Some(segment) = self.track.segment().await.context("failed to get segment")? { + log::debug!("got segment: {:?}", segment); + tokio::spawn(async move { + if let Err(err) = Self::recv_segment(segment).await { + log::warn!("failed to receive segment: {:?}", err); + } + }); + } + + Ok(()) + } + + async fn recv_segment(mut segment: segment::Subscriber) -> anyhow::Result<()> { + let first = segment + .fragment() + .await + .context("failed to get first fragment")? + .context("no fragments in segment")?; + + log::debug!("got first: {:?}", first); + + if first.sequence.into_inner() != 0 { + anyhow::bail!("first object must be zero; I'm not going to implement a reassembly buffer"); + } + + let base = Self::recv_fragment(first, Vec::new()).await?; + + log::debug!("read base: {:?}", String::from_utf8_lossy(&base)); + + while let Some(fragment) = segment.fragment().await? { + log::debug!("next fragment: {:?}", fragment); + let value = Self::recv_fragment(fragment, base.clone()).await?; + let str = String::from_utf8(value).context("invalid UTF-8")?; + + log::info!("{}", str); + } + + Ok(()) + } + + async fn recv_fragment(mut fragment: fragment::Subscriber, mut buf: Vec) -> anyhow::Result> { + while let Some(data) = fragment.chunk().await? { + buf.extend_from_slice(&data); + } + + Ok(buf) + } +} diff --git a/moq-clock/src/main.rs b/moq-clock/src/main.rs new file mode 100644 index 0000000..219dab7 --- /dev/null +++ b/moq-clock/src/main.rs @@ -0,0 +1,123 @@ +use std::{fs, io, sync::Arc, time}; + +use anyhow::Context; +use clap::Parser; + +mod cli; +mod clock; + +use moq_transport::cache::broadcast; + +// TODO: clap complete + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + + // Disable tracing so we don't get a bunch of Quinn spam. + let tracer = tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::WARN) + .finish(); + tracing::subscriber::set_global_default(tracer).unwrap(); + + let config = cli::Config::parse(); + + // Create a list of acceptable root certificates. + let mut roots = rustls::RootCertStore::empty(); + + if config.tls_root.is_empty() { + // Add the platform's native root certificates. + for cert in rustls_native_certs::load_native_certs().context("could not load platform certs")? { + roots + .add(&rustls::Certificate(cert.0)) + .context("failed to add root cert")?; + } + } else { + // Add the specified root certificates. + for root in &config.tls_root { + let root = fs::File::open(root).context("failed to open root cert file")?; + let mut root = io::BufReader::new(root); + + let root = rustls_pemfile::certs(&mut root).context("failed to read root cert")?; + anyhow::ensure!(root.len() == 1, "expected a single root cert"); + let root = rustls::Certificate(root[0].to_owned()); + + roots.add(&root).context("failed to add root cert")?; + } + } + + let mut tls_config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(roots) + .with_no_client_auth(); + + // Allow disabling TLS verification altogether. + if config.tls_disable_verify { + let noop = NoCertificateVerification {}; + tls_config.dangerous().set_certificate_verifier(Arc::new(noop)); + } + + tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important + + let arc_tls_config = std::sync::Arc::new(tls_config); + let quinn_client_config = quinn::ClientConfig::new(arc_tls_config); + + let mut endpoint = quinn::Endpoint::client(config.bind)?; + endpoint.set_default_client_config(quinn_client_config); + + log::info!("connecting to relay: url={}", config.url); + + let session = webtransport_quinn::connect(&endpoint, &config.url) + .await + .context("failed to create WebTransport session")?; + + let (mut publisher, subscriber) = broadcast::new(""); // TODO config.namespace + + if config.publish { + let session = moq_transport::session::Client::publisher(session, subscriber) + .await + .context("failed to create MoQ Transport session")?; + + let publisher = publisher + .create_track(&config.track) + .context("failed to create clock track")?; + let clock = clock::Publisher::new(publisher); + + tokio::select! { + res = session.run() => res.context("session error")?, + res = clock.run() => res.context("clock error")?, + } + } else { + let session = moq_transport::session::Client::subscriber(session, publisher) + .await + .context("failed to create MoQ Transport session")?; + + let subscriber = subscriber + .get_track(&config.track) + .context("failed to get clock track")?; + let clock = clock::Subscriber::new(subscriber); + + tokio::select! { + res = session.run() => res.context("session error")?, + res = clock.run() => res.context("clock error")?, + } + } + + Ok(()) +} + +pub struct NoCertificateVerification {} + +impl rustls::client::ServerCertVerifier for NoCertificateVerification { + fn verify_server_cert( + &self, + _end_entity: &rustls::Certificate, + _intermediates: &[rustls::Certificate], + _server_name: &rustls::ServerName, + _scts: &mut dyn Iterator, + _ocsp_response: &[u8], + _now: time::SystemTime, + ) -> Result { + Ok(rustls::client::ServerCertVerified::assertion()) + } +} diff --git a/moq-pub/src/media.rs b/moq-pub/src/media.rs index 1eed6b9..a651f33 100644 --- a/moq-pub/src/media.rs +++ b/moq-pub/src/media.rs @@ -42,19 +42,16 @@ impl Media { // Create the catalog track with a single segment. let mut init_track = broadcast.create_track("0.mp4")?; - let mut init_segment = init_track.create_segment(segment::Info { + let init_segment = init_track.create_segment(segment::Info { sequence: VarInt::ZERO, priority: 0, expires: None, })?; // Create a single fragment, optionally setting the size - let mut init_fragment = init_segment.create_fragment(fragment::Info { - sequence: VarInt::ZERO, - size: None, // size is only needed when we have multiple fragments. - })?; + let mut init_fragment = init_segment.final_fragment(VarInt::ZERO)?; - init_fragment.write_chunk(init.into())?; + init_fragment.chunk(init.into())?; let mut tracks = HashMap::new(); @@ -132,7 +129,7 @@ impl Media { init_track_name: &str, moov: &mp4::MoovBox, ) -> Result<(), anyhow::Error> { - let mut segment = track.create_segment(segment::Info { + let segment = track.create_segment(segment::Info { sequence: VarInt::ZERO, priority: 0, expires: None, @@ -218,13 +215,10 @@ impl Media { log::info!("catalog: {}", catalog_str); // Create a single fragment for the segment. - let mut fragment = segment.create_fragment(fragment::Info { - sequence: VarInt::ZERO, - size: None, // Size is only needed when we have multiple fragments. - })?; + let mut fragment = segment.final_fragment(VarInt::ZERO)?; // Add the segment and add the fragment. - fragment.write_chunk(catalog_str.into())?; + fragment.chunk(catalog_str.into())?; Ok(()) } @@ -295,7 +289,7 @@ impl Track { if let Some(current) = self.current.as_mut() { if !fragment.keyframe { // Use the existing segment - current.write_chunk(raw.into())?; + current.chunk(raw.into())?; return Ok(()); } } @@ -311,7 +305,7 @@ impl Track { .context("timestamp too large")?; // Create a new segment. - let mut segment = self.track.create_segment(segment::Info { + let segment = self.track.create_segment(segment::Info { sequence: VarInt::try_from(self.sequence).context("sequence too large")?, // Newer segments are higher priority @@ -322,15 +316,12 @@ impl Track { })?; // Create a single fragment for the segment that we will keep appending. - let mut fragment = segment.create_fragment(fragment::Info { - sequence: VarInt::ZERO, - size: None, - })?; + let mut fragment = segment.final_fragment(VarInt::ZERO)?; self.sequence += 1; // Insert the raw atom into the segment. - fragment.write_chunk(raw.into())?; + fragment.chunk(raw.into())?; // Save for the next iteration self.current = Some(fragment); @@ -340,7 +331,7 @@ impl Track { pub fn data(&mut self, raw: Vec) -> anyhow::Result<()> { let fragment = self.current.as_mut().context("missing current fragment")?; - fragment.write_chunk(raw.into())?; + fragment.chunk(raw.into())?; Ok(()) } diff --git a/moq-transport/Cargo.toml b/moq-transport/Cargo.toml index 661a117..93203e5 100644 --- a/moq-transport/Cargo.toml +++ b/moq-transport/Cargo.toml @@ -27,3 +27,26 @@ webtransport-quinn = "0.6" async-trait = "0.1" paste = "1" + +[dev-dependencies] +# QUIC +url = "2" + +# Crypto +rustls = { version = "0.21", features = ["dangerous_configuration"] } +rustls-native-certs = "0.6" +rustls-pemfile = "1" + +# Async stuff +tokio = { version = "1", features = ["full"] } + +# CLI, logging, error handling +clap = { version = "4", features = ["derive"] } +log = { version = "0.4", features = ["std"] } +env_logger = "0.9" +mp4 = "0.13" +anyhow = { version = "1", features = ["backtrace"] } +serde_json = "1" +rfc6381-codec = "0.1" +tracing = "0.1" +tracing-subscriber = "0.3" diff --git a/moq-transport/src/cache/fragment.rs b/moq-transport/src/cache/fragment.rs index 49cab62..4e08333 100644 --- a/moq-transport/src/cache/fragment.rs +++ b/moq-transport/src/cache/fragment.rs @@ -36,7 +36,7 @@ pub struct Info { // The size of the fragment, optionally None if this is the last fragment in a segment. // TODO enforce this size. - pub size: Option, + pub size: Option, } struct State { @@ -53,10 +53,6 @@ impl State { self.closed = Err(err); Ok(()) } - - pub fn bytes(&self) -> usize { - self.chunks.iter().map(|f| f.len()).sum::() - } } impl Default for State { @@ -71,11 +67,7 @@ impl Default for State { impl fmt::Debug for State { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // We don't want to print out the contents, so summarize. - f.debug_struct("State") - .field("chunks", &self.chunks.len().to_string()) - .field("bytes", &self.bytes().to_string()) - .field("closed", &self.closed) - .finish() + f.debug_struct("State").field("closed", &self.closed).finish() } } @@ -98,7 +90,7 @@ impl Publisher { } /// Write a new chunk of bytes. - pub fn write_chunk(&mut self, chunk: Bytes) -> Result<(), CacheError> { + pub fn chunk(&mut self, chunk: Bytes) -> Result<(), CacheError> { let mut state = self.state.lock_mut(); state.closed.clone()?; state.chunks.push(chunk); @@ -158,7 +150,7 @@ impl Subscriber { } /// Block until the next chunk of bytes is available. - pub async fn read_chunk(&mut self) -> Result, CacheError> { + pub async fn chunk(&mut self) -> Result, CacheError> { loop { let notify = { let state = self.state.lock(); diff --git a/moq-transport/src/cache/segment.rs b/moq-transport/src/cache/segment.rs index b4b31ef..ecd27de 100644 --- a/moq-transport/src/cache/segment.rs +++ b/moq-transport/src/cache/segment.rs @@ -91,18 +91,28 @@ impl Publisher { Self { state, info, _dropped } } - /// Write a fragment - pub fn push_fragment(&mut self, fragment: fragment::Subscriber) -> Result<(), CacheError> { + // Not public because it's a footgun. + pub(crate) fn push_fragment( + &mut self, + sequence: VarInt, + size: Option, + ) -> Result { + let (publisher, subscriber) = fragment::new(fragment::Info { sequence, size }); + let mut state = self.state.lock_mut(); state.closed.clone()?; - state.fragments.push(fragment); - Ok(()) + state.fragments.push(subscriber); + Ok(publisher) } - pub fn create_fragment(&mut self, fragment: fragment::Info) -> Result { - let (publisher, subscriber) = fragment::new(fragment); - self.push_fragment(subscriber)?; - Ok(publisher) + /// Write a fragment + pub fn fragment(&mut self, sequence: VarInt, size: usize) -> Result { + self.push_fragment(sequence, Some(size)) + } + + /// Write the last fragment, which means size can be unknown. + pub fn final_fragment(mut self, sequence: VarInt) -> Result { + self.push_fragment(sequence, None) } /// Close the segment with an error. @@ -158,7 +168,7 @@ impl Subscriber { } /// Block until the next chunk of bytes is available. - pub async fn next_fragment(&mut self) -> Result, CacheError> { + pub async fn fragment(&mut self) -> Result, CacheError> { loop { let notify = { let state = self.state.lock(); diff --git a/moq-transport/src/cache/track.rs b/moq-transport/src/cache/track.rs index ad4d380..6d2d405 100644 --- a/moq-transport/src/cache/track.rs +++ b/moq-transport/src/cache/track.rs @@ -207,7 +207,7 @@ impl Subscriber { } /// Block until the next segment arrives - pub async fn next_segment(&mut self) -> Result, CacheError> { + pub async fn segment(&mut self) -> Result, CacheError> { loop { let notify = { let state = self.state.lock(); diff --git a/moq-transport/src/session/error.rs b/moq-transport/src/session/error.rs index cb816db..228a4c8 100644 --- a/moq-transport/src/session/error.rs +++ b/moq-transport/src/session/error.rs @@ -48,6 +48,10 @@ pub enum SessionError { #[error("required extension not offered: {0:?}")] RequiredExtension(VarInt), + /// Some VarInt was too large and we were too lazy to handle it + #[error("varint bounds exceeded")] + BoundsExceeded(#[from] coding::BoundsExceeded), + /// An unclassified error because I'm lazy. TODO classify these errors #[error("unknown error: {0}")] Unknown(String), @@ -71,6 +75,7 @@ impl MoqError for SessionError { Self::InvalidPriority(_) => 400, Self::InvalidSize(_) => 400, Self::RequiredExtension(_) => 426, + Self::BoundsExceeded(_) => 500, } } @@ -96,6 +101,7 @@ impl MoqError for SessionError { Self::InvalidPriority(priority) => format!("invalid priority: {}", priority), Self::InvalidSize(size) => format!("invalid size: {}", size), Self::RequiredExtension(id) => format!("required extension was missing: {:?}", id), + Self::BoundsExceeded(_) => "varint bounds exceeded".to_string(), } } } diff --git a/moq-transport/src/session/publisher.rs b/moq-transport/src/session/publisher.rs index 05cb462..cf19bd3 100644 --- a/moq-transport/src/session/publisher.rs +++ b/moq-transport/src/session/publisher.rs @@ -170,7 +170,7 @@ impl Publisher { async fn run_subscribe(&self, id: VarInt, track: &mut track::Subscriber) -> Result<(), SessionError> { // TODO add an Ok method to track::Publisher so we can send SUBSCRIBE_OK - while let Some(mut segment) = track.next_segment().await? { + while let Some(mut segment) = track.segment().await? { // TODO only clone the fields we need let this = self.clone(); @@ -193,7 +193,9 @@ impl Publisher { let priority = (segment.priority as i64 - i32::MAX as i64) as i32; stream.set_priority(priority).ok(); - while let Some(mut fragment) = segment.next_fragment().await? { + while let Some(mut fragment) = segment.fragment().await? { + log::trace!("serving fragment: {:?}", fragment); + let object = message::Object { track: id, @@ -204,7 +206,7 @@ impl Publisher { // Properties of the fragment sequence: fragment.sequence, - size: fragment.size, + size: fragment.size.map(VarInt::try_from).transpose()?, }; object @@ -212,7 +214,8 @@ impl Publisher { .await .map_err(|e| SessionError::Unknown(e.to_string()))?; - while let Some(chunk) = fragment.read_chunk().await? { + while let Some(chunk) = fragment.chunk().await? { + //log::trace!("writing chunk: {:?}", chunk); stream.write_all(&chunk).await?; } } diff --git a/moq-transport/src/session/subscriber.rs b/moq-transport/src/session/subscriber.rs index a538dcf..02b5fbd 100644 --- a/moq-transport/src/session/subscriber.rs +++ b/moq-transport/src/session/subscriber.rs @@ -6,7 +6,7 @@ use std::{ }; use crate::{ - cache::{broadcast, fragment, segment, track, CacheError}, + cache::{broadcast, segment, track, CacheError}, coding::DecodeError, message, message::Message, @@ -110,7 +110,7 @@ impl Subscriber { .await .map_err(|e| SessionError::Unknown(e.to_string()))?; - log::trace!("received object: {:?}", object); + log::trace!("first object: {:?}", object); // A new scope is needed because the async compiler is dumb let mut segment = { @@ -124,12 +124,10 @@ impl Subscriber { })? }; - // Create the first fragment - let mut fragment = segment.create_fragment(fragment::Info { - sequence: object.sequence, - size: object.size, - })?; + log::trace!("received segment: {:?}", segment); + // Create the first fragment + let mut fragment = segment.push_fragment(object.sequence, object.size.map(usize::from))?; let mut remain = object.size.map(usize::from); loop { @@ -145,6 +143,8 @@ impl Subscriber { Err(err) => return Err(err.into()), }; + log::trace!("next object: {:?}", object); + // NOTE: This is a custom restriction; not part of the moq-transport draft. // We require every OBJECT to contain the same priority since prioritization is done per-stream. // We also require every OBJECT to contain the same group so we know when the group ends, and can detect gaps. @@ -152,14 +152,13 @@ impl Subscriber { return Err(SessionError::StreamMapping); } - // Create a new object. - fragment = segment.create_fragment(fragment::Info { - sequence: object.sequence, - size: object.size, - })?; - object = next; + + // Create a new object. + fragment = segment.push_fragment(object.sequence, object.size.map(usize::from))?; remain = object.size.map(usize::from); + + log::trace!("next fragment: {:?}", fragment); } match stream.read_chunk(remain.unwrap_or(usize::MAX), true).await? { @@ -171,7 +170,12 @@ impl Subscriber { // NOTE: This does not make a copy! // Bytes are immutable and ref counted. - Some(data) => fragment.write_chunk(data.bytes)?, + Some(data) => { + remain = remain.map(|r| r - data.bytes.len()); + + log::trace!("next chunk: {:?}", data); + fragment.chunk(data.bytes)?; + } } }