Compare commits

...

6 Commits

Author SHA1 Message Date
kixelated df5d362754
Add optional/required extensions. (#117) 2023-11-03 15:10:15 +09:00
kixelated ea701bcf7e
Also build the moq-pub image in this repo. (#116) 2023-11-03 13:56:45 +09:00
kixelated ddfe7963e6
Initial moq-transport-01 support (#115)
Co-authored-by: Mike English <mike.english@gmail.com>
2023-11-03 13:19:41 +09:00
kixelated d55c4a80d1
Add `--tls-root` and `--tls-disable-verify` to moq-pub. (#114) 2023-10-30 22:54:27 +09:00
kixelated 24cf36e923
Update HACKATHON.md 2023-10-25 15:39:39 +09:00
kixelated d69c7491ba
Hackathon (#113) 2023-10-25 15:28:47 +09:00
65 changed files with 1751 additions and 511 deletions

View File

@ -1,2 +1,3 @@
target target
dev dev
*.mp4

View File

@ -6,7 +6,8 @@ on:
env: env:
REGISTRY: docker.io REGISTRY: docker.io
IMAGE: ${{ github.repository }} IMAGE: kixelated/moq-rs
IMAGE-PUB: kixelated/moq-pub
SERVICE: api # Restart the API service TODO and relays SERVICE: api # Restart the API service TODO and relays
jobs: jobs:
@ -42,6 +43,16 @@ jobs:
tags: ${{env.REGISTRY}}/${{env.IMAGE}} tags: ${{env.REGISTRY}}/${{env.IMAGE}}
platforms: linux/amd64,linux/arm64 platforms: linux/amd64,linux/arm64
# Same, but include ffmpeg for publishing BBB
- uses: depot/build-push-action@v1
with:
project: r257ctfqm6
context: .
push: true
target: moq-pub # instead of the default target
tags: ${{env.REGISTRY}}/${{env.IMAGE-PUB}}
platforms: linux/amd64,linux/arm64
# Log in to GCP # Log in to GCP
- uses: google-github-actions/auth@v1 - uses: google-github-actions/auth@v1
with: with:

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
.DS_Store .DS_Store
target/ target/
logs/ logs/
*.mp4

9
Cargo.lock generated
View File

@ -1066,6 +1066,7 @@ dependencies = [
"rfc6381-codec", "rfc6381-codec",
"rustls", "rustls",
"rustls-native-certs", "rustls-native-certs",
"rustls-pemfile",
"serde_json", "serde_json",
"tokio", "tokio",
"tracing", "tracing",
@ -1106,9 +1107,11 @@ dependencies = [
name = "moq-transport" name = "moq-transport"
version = "0.2.0" version = "0.2.0"
dependencies = [ dependencies = [
"async-trait",
"bytes", "bytes",
"indexmap 2.0.0", "indexmap 2.0.0",
"log", "log",
"paste",
"quinn", "quinn",
"thiserror", "thiserror",
"tokio", "tokio",
@ -1319,6 +1322,12 @@ dependencies = [
"windows-targets", "windows-targets",
] ]
[[package]]
name = "paste"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c"
[[package]] [[package]]
name = "percent-encoding" name = "percent-encoding"
version = "2.3.0" version = "2.3.0"

View File

@ -12,14 +12,28 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
--mount=type=cache,target=/build/target \ --mount=type=cache,target=/build/target \
cargo build --release && cp /build/target/release/moq-* /usr/local/cargo/bin cargo build --release && cp /build/target/release/moq-* /usr/local/cargo/bin
# Runtime image # Special image for moq-pub with ffmpeg and a publish script included.
FROM rust:latest FROM rust:latest as moq-pub
# Install required utilities and ffmpeg
RUN apt-get update && \
apt-get install -y ffmpeg wget
# Copy the publish script into the image
COPY deploy/publish.sh /usr/local/bin/publish
# Copy the compiled binary
COPY --from=builder /usr/local/cargo/bin/moq-pub /usr/local/cargo/bin/moq-pub
CMD [ "publish" ]
# moq-rs image with just the binaries
FROM rust:latest as moq-rs
LABEL org.opencontainers.image.source=https://github.com/kixelated/moq-rs LABEL org.opencontainers.image.source=https://github.com/kixelated/moq-rs
LABEL org.opencontainers.image.licenses="MIT OR Apache-2.0" LABEL org.opencontainers.image.licenses="MIT OR Apache-2.0"
# Fly.io entrypoint # Fly.io entrypoint
ADD fly-relay.sh . ADD deploy/fly-relay.sh .
# Copy the compiled binaries # Copy the compiled binaries
COPY --from=builder /usr/local/cargo/bin /usr/local/cargo/bin COPY --from=builder /usr/local/cargo/bin /usr/local/cargo/bin

53
HACKATHON.md Normal file
View File

@ -0,0 +1,53 @@
# Hackathon
IETF Prague 118
## MoqTransport
Reference libraries are available at [moq-rs](https://github.com/kixelated/moq-rs) and [moq-js](https://github.com/kixelated/moq-js). The Rust library is [well documented](https://docs.rs/moq-transport/latest/moq_transport/) but the web library, not so much.
**TODO** Update both to draft-01.
**TODO** Switch any remaining forks over to extensions. ex: track_id in SUBSCRIBE
The stream mapping right now is quite rigid: `stream == group == object`.
**TODO** Support multiple objects per group. They MUST NOT use different priorities, different tracks, or out-of-order sequences.
The API and cache aren't designed to send/receive arbitrary objects over arbitrary streams as specified in the draft. I don't think it should, and it wouldn't be possible to implement in time for the hackathon anyway.
**TODO** Make an extension to enforce this stream mapping?
## Generic Relay
I'm hosting a simple CDN at: `relay.quic.video`
The traffic is sharded based on the WebTransport path to avoid namespace collisions. Think of it like a customer ID, although it's completely unauthenticated for now. Use your username or whatever string you want: `CONNECT https://relay.quic.video/alan`.
**TODO** Currently, it performs an implicit `ANNOUNCE ""` when `role=publisher`. This means there can only be a single publisher per shard and `role=both` is not supported. I should have explicit `ANNOUNCE` messages supported before the hackathon to remove this limitation.
**TODO** I don't know if I will have subscribe hints fully working in time. They will be parsed but might be ignored.
## CMAF Media
You can [publish](https://quic.video/publish) and [watch](https://quic.video/watch) broadcasts.
There's a [24/7 bunny stream](https://quic.video/watch/bbb) or you can publish your own using [moq-pub](https://github.com/kixelated/moq-rs/tree/main/moq-pub).
If you want to fetch from the relay directly, the name of the broadcast is the path. For example, `https://quic.video/watch/bbb` can be accessed at `relay.quic.video/bbb`.
The namespace is empty and the catalog track is `.catalog`. I'm currently using simple JSON catalog with no support for delta updates.
**TODO** update to the proposed [Warp catalog](https://datatracker.ietf.org/doc/draft-wilaw-moq-catalogformat/).
The media tracks use a single (unbounded) object per group. Video groups are per GoP, while audio groups are per frame. There's also an init track containing information required to initialize the decoder.
**TODO** Base64 encode the init track in the catalog.
## Clock
**TODO** Host a clock demo that sends a group per second:
```
GROUP: YYYY-MM-DD HH:MM
OBJECT: SS
```

20
deploy/fly.toml Normal file
View File

@ -0,0 +1,20 @@
app = "englishm-moq-relay"
kill_signal = "SIGINT"
kill_timeout = 5
[env]
PORT = "4443"
[experimental]
cmd = "./fly-relay.sh"
[[services]]
internal_port = 4443
protocol = "udp"
[services.concurrency]
hard_limit = 25
soft_limit = 20
[[services.ports]]
port = "4443"

41
deploy/publish.sh Executable file
View File

@ -0,0 +1,41 @@
#!/bin/bash
set -euo pipefail
ADDR=${ADDR:-"https://relay.quic.video"}
NAME=${NAME:-"bbb"}
URL=${URL:-"http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"}
# Download the funny bunny
wget -nv "${URL}" -O "${NAME}.mp4"
# ffmpeg
# -hide_banner: Hide the banner
# -v quiet: and any other output
# -stats: But we still want some stats on stderr
# -stream_loop -1: Loop the broadcast an infinite number of times
# -re: Output in real-time
# -i "${INPUT}": Read from a file on disk
# -vf "drawtext": Render the current time in the corner of the video
# -an: Disable audio for now
# -b:v 3M: Output video at 3Mbps
# -preset ultrafast: Don't use much CPU at the cost of quality
# -tune zerolatency: Optimize for latency at the cost of quality
# -f mp4: Output to mp4 format
# -movflags: Build a fMP4 file with a frame per fragment
# - | moq-pub: Output to stdout and moq-pub to publish
# Run ffmpeg
ffmpeg \
-stream_loop -1 \
-hide_banner \
-v quiet \
-re \
-i "${NAME}.mp4" \
-vf "drawtext=fontfile=/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf:text='%{gmtime\: %H\\\\\:%M\\\\\:%S.%3N}':x=(W-tw)-24:y=24:fontsize=48:fontcolor=white:box=1:boxcolor=black@0.5" \
-an \
-b:v 3M \
-preset ultrafast \
-tune zerolatency \
-f mp4 \
-movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset \
- | moq-pub "${ADDR}/${NAME}"

View File

@ -83,9 +83,16 @@ The following command runs a development instance, broadcasting `dev/source.mp4`
``` ```
It will print out a URL that you can use to watch. It will print out a URL that you can use to watch.
This will contain a random broadcast name so the below link won't work: By default, the broadcast name is `dev` but you can override it with the `NAME` env.
> Watch URL: https://quic.video/watch/REPLACE_WITH_NAME?server=localhost:4443 > Watch URL: https://quic.video/watch/dev?server=localhost:4443
If you're debugging encoding issues, you can use this script to dump the file to disk instead, defaulting to
`dev/output.mp4`.
```bash
./dev/pub-file
```
### moq-api ### moq-api

17
dev/pub
View File

@ -13,21 +13,28 @@ PORT="${PORT:-4443}"
ADDR="${ADDR:-$HOST:$PORT}" ADDR="${ADDR:-$HOST:$PORT}"
# Generate a random 16 character name by default. # Generate a random 16 character name by default.
NAME="${NAME:-$(head /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c 16)}" #NAME="${NAME:-$(head /dev/urandom | LC_ALL=C tr -dc 'a-zA-Z0-9' | head -c 16)}"
# JK use the name "dev" instead
# TODO use that random name if the host is not localhost
NAME="${NAME:-dev}"
# Combine the host and name into a URL. # Combine the host and name into a URL.
URL="${URL:-"https://$ADDR/$NAME"}" URL="${URL:-"https://$ADDR/$NAME"}"
# Default to a source video # Default to a source video
MEDIA="${MEDIA:-dev/source.mp4}" INPUT="${INPUT:-dev/source.mp4}"
# Print out the watch URL # Print out the watch URL
echo "Watch URL: https://quic.video/watch/$NAME?server=$ADDR" echo "Watch URL: https://quic.video/watch/$NAME?server=$ADDR"
# Run ffmpeg and pipe the output to moq-pub # Run ffmpeg and pipe the output to moq-pub
# TODO enable audio again once fixed.
ffmpeg -hide_banner -v quiet \ ffmpeg -hide_banner -v quiet \
-stream_loop -1 -re \ -stream_loop -1 -re \
-i "$MEDIA" \ -i "$INPUT" \
-c copy \
-an \ -an \
-f mp4 -movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset - \ -f mp4 -movflags cmaf+separate_moof+delay_moov+skip_trailer \
| cargo run --bin moq-pub -- "$URL" "$@" -frag_duration 1 \
- | cargo run --bin moq-pub -- "$URL" "$@"

90
dev/pub-file Executable file
View File

@ -0,0 +1,90 @@
#!/bin/bash
set -euo pipefail
# Change directory to the root of the project
cd "$(dirname "$0")/.."
# Default to a source video
INPUT="${INPUT:-dev/source.mp4}"
# Output the fragmented MP4 to disk for testing.
OUTPUT="${OUTPUT:-dev/output.mp4}"
# Run ffmpeg the same as dev/pub, but:
# - print any errors/warnings
# - only loop twice
#
# Note this is artificially slowed down to real-time using the -re flag; you can remove it.
ffmpeg \
-re \
-y \
-i "$INPUT" \
-c copy \
-fps_mode passthrough \
-f mp4 -movflags cmaf+separate_moof+delay_moov+skip_trailer \
-frag_duration 1 \
"${OUTPUT}"
# % ffmpeg -f mp4 --ffmpeg -h muxer=mov
#
# ffmpeg version 6.0 Copyright (c) 2000-2023 the FFmpeg developers
# Muxer mov [QuickTime / MOV]:
# Common extensions: mov.
# Default video codec: h264.
# Default audio codec: aac.
# mov/mp4/tgp/psp/tg2/ipod/ismv/f4v muxer AVOptions:
# -movflags <flags> E.......... MOV muxer flags (default 0)
# rtphint E.......... Add RTP hint tracks
# empty_moov E.......... Make the initial moov atom empty
# frag_keyframe E.......... Fragment at video keyframes
# frag_every_frame E.......... Fragment at every frame
# separate_moof E.......... Write separate moof/mdat atoms for each track
# frag_custom E.......... Flush fragments on caller requests
# isml E.......... Create a live smooth streaming feed (for pushing to a publishing point)
# faststart E.......... Run a second pass to put the index (moov atom) at the beginning of the file
# omit_tfhd_offset E.......... Omit the base data offset in tfhd atoms
# disable_chpl E.......... Disable Nero chapter atom
# default_base_moof E.......... Set the default-base-is-moof flag in tfhd atoms
# dash E.......... Write DASH compatible fragmented MP4
# cmaf E.......... Write CMAF compatible fragmented MP4
# frag_discont E.......... Signal that the next fragment is discontinuous from earlier ones
# delay_moov E.......... Delay writing the initial moov until the first fragment is cut, or until the first fragment flush
# global_sidx E.......... Write a global sidx index at the start of the file
# skip_sidx E.......... Skip writing of sidx atom
# write_colr E.......... Write colr atom even if the color info is unspecified (Experimental, may be renamed or changed, do not use from scripts)
# prefer_icc E.......... If writing colr atom prioritise usage of ICC profile if it exists in stream packet side data
# write_gama E.......... Write deprecated gama atom
# use_metadata_tags E.......... Use mdta atom for metadata.
# skip_trailer E.......... Skip writing the mfra/tfra/mfro trailer for fragmented files
# negative_cts_offsets E.......... Use negative CTS offsets (reducing the need for edit lists)
# -moov_size <int> E.......... maximum moov size so it can be placed at the begin (from 0 to INT_MAX) (default 0)
# -rtpflags <flags> E.......... RTP muxer flags (default 0)
# latm E.......... Use MP4A-LATM packetization instead of MPEG4-GENERIC for AAC
# rfc2190 E.......... Use RFC 2190 packetization instead of RFC 4629 for H.263
# skip_rtcp E.......... Don't send RTCP sender reports
# h264_mode0 E.......... Use mode 0 for H.264 in RTP
# send_bye E.......... Send RTCP BYE packets when finishing
# -skip_iods <boolean> E.......... Skip writing iods atom. (default true)
# -iods_audio_profile <int> E.......... iods audio profile atom. (from -1 to 255) (default -1)
# -iods_video_profile <int> E.......... iods video profile atom. (from -1 to 255) (default -1)
# -frag_duration <int> E.......... Maximum fragment duration (from 0 to INT_MAX) (default 0)
# -min_frag_duration <int> E.......... Minimum fragment duration (from 0 to INT_MAX) (default 0)
# -frag_size <int> E.......... Maximum fragment size (from 0 to INT_MAX) (default 0)
# -ism_lookahead <int> E.......... Number of lookahead entries for ISM files (from 0 to 255) (default 0)
# -video_track_timescale <int> E.......... set timescale of all video tracks (from 0 to INT_MAX) (default 0)
# -brand <string> E.......... Override major brand
# -use_editlist <boolean> E.......... use edit list (default auto)
# -fragment_index <int> E.......... Fragment number of the next fragment (from 1 to INT_MAX) (default 1)
# -mov_gamma <float> E.......... gamma value for gama atom (from 0 to 10) (default 0)
# -frag_interleave <int> E.......... Interleave samples within fragments (max number of consecutive samples, lower is tighter interleaving, but with more overhead) (from 0 to INT_MAX) (default 0)
# -encryption_scheme <string> E.......... Configures the encryption scheme, allowed values are none, cenc-aes-ctr
# -encryption_key <binary> E.......... The media encryption key (hex)
# -encryption_kid <binary> E.......... The media encryption key identifier (hex)
# -use_stream_ids_as_track_ids <boolean> E.......... use stream ids as track ids (default false)
# -write_btrt <boolean> E.......... force or disable writing btrt (default auto)
# -write_tmcd <boolean> E.......... force or disable writing tmcd (default auto)
# -write_prft <int> E.......... Write producer reference time box with specified time source (from 0 to 2) (default 0)
# wallclock 1 E..........
# pts 2 E..........
# -empty_hdlr_name <boolean> E.......... write zero-length name string in hdlr atoms within mdia and minf atoms (default false)
# -movie_timescale <int> E.......... set movie timescale (from 1 to INT_MAX) (default 1000)

View File

@ -1,19 +0,0 @@
app = "englishm-moq-relay"
kill_signal = "SIGINT"
kill_timeout = 5
[env]
PORT = "4443"
[experimental]
cmd = "./fly-relay.sh"
[[services]]
internal_port = 4443
protocol = "udp"
[services.concurrency]
hard_limit = 25
soft_limit = 20
[[services.ports]]
port = "4443"

View File

@ -1,7 +1,7 @@
[package] [package]
name = "moq-pub" name = "moq-pub"
description = "Media over QUIC" description = "Media over QUIC"
authors = ["Mike English"] authors = ["Mike English", "Luke Curley"]
repository = "https://github.com/kixelated/moq-rs" repository = "https://github.com/kixelated/moq-rs"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
@ -23,8 +23,9 @@ webtransport-quinn = "0.6"
url = "2" url = "2"
# Crypto # Crypto
rustls = "0.21" rustls = { version = "0.21", features = ["dangerous_configuration"] }
rustls-native-certs = "0.6" rustls-native-certs = "0.6"
rustls-pemfile = "1"
# Async stuff # Async stuff
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }

View File

@ -5,7 +5,7 @@ A command line tool for publishing media via Media over QUIC (MoQ).
Expects to receive fragmented MP4 via standard input and connect to a MOQT relay. Expects to receive fragmented MP4 via standard input and connect to a MOQT relay.
``` ```
ffmpeg ... - | moq-pub -i - --host localhost:4443 ffmpeg ... - | moq-pub https://localhost:4443
``` ```
### Invoking `moq-pub`: ### Invoking `moq-pub`:
@ -13,7 +13,7 @@ ffmpeg ... - | moq-pub -i - --host localhost:4443
Here's how I'm currently testing things, with a local copy of Big Buck Bunny named `bbb_source.mp4`: Here's how I'm currently testing things, with a local copy of Big Buck Bunny named `bbb_source.mp4`:
``` ```
$ ffmpeg -hide_banner -v quiet -stream_loop -1 -re -i bbb_source.mp4 -an -f mp4 -movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset - | RUST_LOG=moq_pub=info moq-pub -i - $ ffmpeg -hide_banner -v quiet -stream_loop -1 -re -i bbb_source.mp4 -an -f mp4 -movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset - | RUST_LOG=moq_pub=info moq-pub https://localhost:4443
``` ```
This relies on having `moq-relay` (the relay server) already running locally in another shell. This relies on having `moq-relay` (the relay server) already running locally in another shell.

View File

@ -1,5 +1,5 @@
use clap::Parser; use clap::Parser;
use std::net; use std::{net, path};
use url::Url; use url::Url;
#[derive(Parser, Clone, Debug)] #[derive(Parser, Clone, Debug)]
@ -21,6 +21,19 @@ pub struct Config {
/// Connect to the given URL starting with https:// /// Connect to the given URL starting with https://
#[arg(value_parser = moq_url)] #[arg(value_parser = moq_url)]
pub url: Url, pub url: Url,
/// Use the TLS root CA at this path, encoded as PEM.
///
/// This value can be provided multiple times for multiple roots.
/// If this is empty, system roots will be used instead
#[arg(long)]
pub tls_root: Vec<path::PathBuf>,
/// Danger: Disable TLS certificate verification.
///
/// Fine for local development, but should be used with caution in production.
#[arg(long)]
pub tls_disable_verify: bool,
} }
fn moq_url(s: &str) -> Result<Url, String> { fn moq_url(s: &str) -> Result<Url, String> {

View File

@ -1,3 +1,5 @@
use std::{fs, io, sync::Arc, time};
use anyhow::Context; use anyhow::Context;
use clap::Parser; use clap::Parser;
@ -26,10 +28,28 @@ async fn main() -> anyhow::Result<()> {
let (publisher, subscriber) = broadcast::new(""); let (publisher, subscriber) = broadcast::new("");
let mut media = Media::new(&config, publisher).await?; let mut media = Media::new(&config, publisher).await?;
// Ugh, just let me use my native root certs already // Create a list of acceptable root certificates.
let mut roots = rustls::RootCertStore::empty(); let mut roots = rustls::RootCertStore::empty();
for cert in rustls_native_certs::load_native_certs().expect("could not load platform certs") {
roots.add(&rustls::Certificate(cert.0)).unwrap(); if config.tls_root.is_empty() {
// Add the platform's native root certificates.
for cert in rustls_native_certs::load_native_certs().context("could not load platform certs")? {
roots
.add(&rustls::Certificate(cert.0))
.context("failed to add root cert")?;
}
} else {
// Add the specified root certificates.
for root in &config.tls_root {
let root = fs::File::open(root).context("failed to open root cert file")?;
let mut root = io::BufReader::new(root);
let root = rustls_pemfile::certs(&mut root).context("failed to read root cert")?;
anyhow::ensure!(root.len() == 1, "expected a single root cert");
let root = rustls::Certificate(root[0].to_owned());
roots.add(&root).context("failed to add root cert")?;
}
} }
let mut tls_config = rustls::ClientConfig::builder() let mut tls_config = rustls::ClientConfig::builder()
@ -37,6 +57,12 @@ async fn main() -> anyhow::Result<()> {
.with_root_certificates(roots) .with_root_certificates(roots)
.with_no_client_auth(); .with_no_client_auth();
// Allow disabling TLS verification altogether.
if config.tls_disable_verify {
let noop = NoCertificateVerification {};
tls_config.dangerous().set_certificate_verifier(Arc::new(noop));
}
tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important
let arc_tls_config = std::sync::Arc::new(tls_config); let arc_tls_config = std::sync::Arc::new(tls_config);
@ -63,3 +89,19 @@ async fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
pub struct NoCertificateVerification {}
impl rustls::client::ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}

View File

@ -1,9 +1,10 @@
use crate::cli::Config; use crate::cli::Config;
use anyhow::{self, Context}; use anyhow::{self, Context};
use moq_transport::cache::{broadcast, segment, track}; use moq_transport::cache::{broadcast, fragment, segment, track};
use moq_transport::VarInt; use moq_transport::VarInt;
use mp4::{self, ReadBox}; use mp4::{self, ReadBox};
use serde_json::json; use serde_json::json;
use std::cmp::max;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Cursor; use std::io::Cursor;
use std::time; use std::time;
@ -15,11 +16,12 @@ pub struct Media {
_catalog: track::Publisher, _catalog: track::Publisher,
_init: track::Publisher, _init: track::Publisher,
tracks: HashMap<String, Track>, // Tracks based on their track ID.
tracks: HashMap<u32, Track>,
} }
impl Media { impl Media {
pub async fn new(config: &Config, mut broadcast: broadcast::Publisher) -> anyhow::Result<Self> { pub async fn new(_config: &Config, mut broadcast: broadcast::Publisher) -> anyhow::Result<Self> {
let mut stdin = tokio::io::stdin(); let mut stdin = tokio::io::stdin();
let ftyp = read_atom(&mut stdin).await?; let ftyp = read_atom(&mut stdin).await?;
anyhow::ensure!(&ftyp[4..8] == b"ftyp", "expected ftyp atom"); anyhow::ensure!(&ftyp[4..8] == b"ftyp", "expected ftyp atom");
@ -39,33 +41,39 @@ impl Media {
let moov = mp4::MoovBox::read_box(&mut moov_reader, moov_header.size)?; let moov = mp4::MoovBox::read_box(&mut moov_reader, moov_header.size)?;
// Create the catalog track with a single segment. // Create the catalog track with a single segment.
let mut init_track = broadcast.create_track("1.mp4")?; let mut init_track = broadcast.create_track("0.mp4")?;
let mut init_segment = init_track.create_segment(segment::Info { let mut init_segment = init_track.create_segment(segment::Info {
sequence: VarInt::ZERO, sequence: VarInt::ZERO,
priority: i32::MAX, priority: 0,
expires: None, expires: None,
})?; })?;
init_segment.write_chunk(init.into())?; // Create a single fragment, optionally setting the size
let mut init_fragment = init_segment.create_fragment(fragment::Info {
sequence: VarInt::ZERO,
size: None, // size is only needed when we have multiple fragments.
})?;
init_fragment.write_chunk(init.into())?;
let mut tracks = HashMap::new(); let mut tracks = HashMap::new();
for trak in &moov.traks { for trak in &moov.traks {
let id = trak.tkhd.track_id; let id = trak.tkhd.track_id;
let name = id.to_string(); let name = format!("{}.m4s", id);
let timescale = track_timescale(&moov, id); let timescale = track_timescale(&moov, id);
// Store the track publisher in a map so we can update it later. // Store the track publisher in a map so we can update it later.
let track = broadcast.create_track(&name)?; let track = broadcast.create_track(&name)?;
let track = Track::new(track, timescale); let track = Track::new(track, timescale);
tracks.insert(name, track); tracks.insert(id, track);
} }
let mut catalog = broadcast.create_track(".catalog")?; let mut catalog = broadcast.create_track(".catalog")?;
// Create the catalog track // Create the catalog track
Self::serve_catalog(&mut catalog, config, init_track.name.to_string(), &moov, &tracks)?; Self::serve_catalog(&mut catalog, &init_track.name, &moov)?;
Ok(Media { Ok(Media {
_broadcast: broadcast, _broadcast: broadcast,
@ -78,7 +86,7 @@ impl Media {
pub async fn run(&mut self) -> anyhow::Result<()> { pub async fn run(&mut self) -> anyhow::Result<()> {
let mut stdin = tokio::io::stdin(); let mut stdin = tokio::io::stdin();
// The current track name // The current track name
let mut track_name = None; let mut current = None;
loop { loop {
let atom = read_atom(&mut stdin).await?; let atom = read_atom(&mut stdin).await?;
@ -92,22 +100,21 @@ impl Media {
// Process the moof. // Process the moof.
let fragment = Fragment::new(moof)?; let fragment = Fragment::new(moof)?;
let name = fragment.track.to_string();
// Get the track for this moof. // Get the track for this moof.
let track = self.tracks.get_mut(&name).context("failed to find track")?; let track = self.tracks.get_mut(&fragment.track).context("failed to find track")?;
// Save the track ID for the next iteration, which must be a mdat. // Save the track ID for the next iteration, which must be a mdat.
anyhow::ensure!(track_name.is_none(), "multiple moof atoms"); anyhow::ensure!(current.is_none(), "multiple moof atoms");
track_name.replace(name); current.replace(fragment.track);
// Publish the moof header, creating a new segment if it's a keyframe. // Publish the moof header, creating a new segment if it's a keyframe.
track.header(atom, fragment).context("failed to publish moof")?; track.header(atom, fragment).context("failed to publish moof")?;
} }
mp4::BoxType::MdatBox => { mp4::BoxType::MdatBox => {
// Get the track ID from the previous moof. // Get the track ID from the previous moof.
let name = track_name.take().context("missing moof")?; let track = current.take().context("missing moof")?;
let track = self.tracks.get_mut(&name).context("failed to find track")?; let track = self.tracks.get_mut(&track).context("failed to find track")?;
// Publish the mdat atom. // Publish the mdat atom.
track.data(atom).context("failed to publish mdat")?; track.data(atom).context("failed to publish mdat")?;
@ -122,33 +129,31 @@ impl Media {
fn serve_catalog( fn serve_catalog(
track: &mut track::Publisher, track: &mut track::Publisher,
config: &Config, init_track_name: &str,
init_track_name: String,
moov: &mp4::MoovBox, moov: &mp4::MoovBox,
_tracks: &HashMap<String, Track>,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
let mut segment = track.create_segment(segment::Info { let mut segment = track.create_segment(segment::Info {
sequence: VarInt::ZERO, sequence: VarInt::ZERO,
priority: i32::MAX, priority: 0,
expires: None, expires: None,
})?; })?;
let mut tracks = Vec::new();
for trak in &moov.traks {
let mut track = json!({
"container": "mp4",
"init_track": init_track_name,
"data_track": format!("{}.m4s", trak.tkhd.track_id),
});
let stsd = &trak.mdia.minf.stbl.stsd;
if let Some(avc1) = &stsd.avc1 {
// avc1[.PPCCLL] // avc1[.PPCCLL]
// //
// let profile = 0x64; // let profile = 0x64;
// let constraints = 0x00; // let constraints = 0x00;
// let level = 0x1f; // let level = 0x1f;
// TODO: do build multi-track catalog by looping through moov.traks
let trak = moov.traks[0].clone();
let avc1 = trak
.mdia
.minf
.stbl
.stsd
.avc1
.ok_or(anyhow::anyhow!("avc1 atom not found"))?;
let profile = avc1.avcc.avc_profile_indication; let profile = avc1.avcc.avc_profile_indication;
let constraints = avc1.avcc.profile_compatibility; // Not 100% certain here, but it's 0x00 on my current test video let constraints = avc1.avcc.profile_compatibility; // Not 100% certain here, but it's 0x00 on my current test video
let level = avc1.avcc.avc_level_indication; let level = avc1.avcc.avc_level_indication;
@ -159,26 +164,67 @@ impl Media {
let codec = rfc6381_codec::Codec::avc1(profile, constraints, level); let codec = rfc6381_codec::Codec::avc1(profile, constraints, level);
let codec_str = codec.to_string(); let codec_str = codec.to_string();
let catalog = json!({ track["kind"] = json!("video");
"tracks": [ track["codec"] = json!(codec_str);
{ track["width"] = json!(width);
"container": "mp4", track["height"] = json!(height);
"kind": "video", } else if let Some(_hev1) = &stsd.hev1 {
"init_track": init_track_name, // TODO https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L106
"data_track": "1", // assume just one track for now anyhow::bail!("HEVC not yet supported")
"codec": codec_str, } else if let Some(mp4a) = &stsd.mp4a {
"width": width, let desc = &mp4a
"height": height, .esds
"frame_rate": config.fps, .as_ref()
"bit_rate": config.bitrate, .context("missing esds box for MP4a")?
.es_desc
.dec_config;
let codec_str = format!("mp4a.{:02x}.{}", desc.object_type_indication, desc.dec_specific.profile);
track["kind"] = json!("audio");
track["codec"] = json!(codec_str);
track["channel_count"] = json!(mp4a.channelcount);
track["sample_rate"] = json!(mp4a.samplerate.value());
track["sample_size"] = json!(mp4a.samplesize);
let bitrate = max(desc.max_bitrate, desc.avg_bitrate);
if bitrate > 0 {
track["bit_rate"] = json!(bitrate);
} }
] } else if let Some(vp09) = &stsd.vp09 {
// https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L238
let vpcc = &vp09.vpcc;
let codec_str = format!("vp09.0.{:02x}.{:02x}.{:02x}", vpcc.profile, vpcc.level, vpcc.bit_depth);
track["kind"] = json!("video");
track["codec"] = json!(codec_str);
track["width"] = json!(vp09.width); // no idea if this needs to be multiplied
track["height"] = json!(vp09.height); // no idea if this needs to be multiplied
// TODO Test if this actually works; I'm just guessing based on mp4box.js
anyhow::bail!("VP9 not yet supported")
} else {
// TODO add av01 support: https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L251
anyhow::bail!("unknown codec for track: {}", trak.tkhd.track_id);
}
tracks.push(track);
}
let catalog = json!({
"tracks": tracks
}); });
let catalog_str = serde_json::to_string_pretty(&catalog)?; let catalog_str = serde_json::to_string_pretty(&catalog)?;
log::info!("catalog: {}", catalog_str); log::info!("catalog: {}", catalog_str);
// Create a single fragment for the segment.
let mut fragment = segment.create_fragment(fragment::Info {
sequence: VarInt::ZERO,
size: None, // Size is only needed when we have multiple fragments.
})?;
// Add the segment and add the fragment. // Add the segment and add the fragment.
segment.write_chunk(catalog_str.into())?; fragment.write_chunk(catalog_str.into())?;
Ok(()) Ok(())
} }
@ -226,7 +272,7 @@ struct Track {
track: track::Publisher, track: track::Publisher,
// The current segment // The current segment
segment: Option<segment::Publisher>, current: Option<fragment::Publisher>,
// The number of units per second. // The number of units per second.
timescale: u64, timescale: u64,
@ -240,16 +286,16 @@ impl Track {
Self { Self {
track, track,
sequence: 0, sequence: 0,
segment: None, current: None,
timescale, timescale,
} }
} }
pub fn header(&mut self, raw: Vec<u8>, fragment: Fragment) -> anyhow::Result<()> { pub fn header(&mut self, raw: Vec<u8>, fragment: Fragment) -> anyhow::Result<()> {
if let Some(segment) = self.segment.as_mut() { if let Some(current) = self.current.as_mut() {
if !fragment.keyframe { if !fragment.keyframe {
// Use the existing segment // Use the existing segment
segment.write_chunk(raw.into())?; current.write_chunk(raw.into())?;
return Ok(()); return Ok(());
} }
} }
@ -258,7 +304,7 @@ impl Track {
// Compute the timestamp in milliseconds. // Compute the timestamp in milliseconds.
// Overflows after 583 million years, so we're fine. // Overflows after 583 million years, so we're fine.
let timestamp: i32 = fragment let timestamp: u32 = fragment
.timestamp(self.timescale) .timestamp(self.timescale)
.as_millis() .as_millis()
.try_into() .try_into()
@ -267,26 +313,34 @@ impl Track {
// Create a new segment. // Create a new segment.
let mut segment = self.track.create_segment(segment::Info { let mut segment = self.track.create_segment(segment::Info {
sequence: VarInt::try_from(self.sequence).context("sequence too large")?, sequence: VarInt::try_from(self.sequence).context("sequence too large")?,
priority: timestamp, // newer segments are higher priority
// Newer segments are higher priority
priority: u32::MAX.checked_sub(timestamp).context("priority too large")?,
// Delete segments after 10s. // Delete segments after 10s.
expires: Some(time::Duration::from_secs(10)), expires: Some(time::Duration::from_secs(10)),
})?; })?;
// Create a single fragment for the segment that we will keep appending.
let mut fragment = segment.create_fragment(fragment::Info {
sequence: VarInt::ZERO,
size: None,
})?;
self.sequence += 1; self.sequence += 1;
// Insert the raw atom into the segment. // Insert the raw atom into the segment.
segment.write_chunk(raw.into())?; fragment.write_chunk(raw.into())?;
// Save for the next iteration // Save for the next iteration
self.segment = Some(segment); self.current = Some(fragment);
Ok(()) Ok(())
} }
pub fn data(&mut self, raw: Vec<u8>) -> anyhow::Result<()> { pub fn data(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
let segment = self.segment.as_mut().context("missing segment")?; let fragment = self.current.as_mut().context("missing current fragment")?;
segment.write_chunk(raw.into())?; fragment.write_chunk(raw.into())?;
Ok(()) Ok(())
} }

View File

@ -23,7 +23,7 @@ url = "2"
# Crypto # Crypto
ring = "0.16" ring = "0.16"
rustls = "0.21" rustls = { version = "0.21", features = ["dangerous_configuration"] }
rustls-pemfile = "1" rustls-pemfile = "1"
rustls-native-certs = "0.6" rustls-native-certs = "0.6"
webpki = "0.22" webpki = "0.22"

View File

@ -31,6 +31,12 @@ pub struct Config {
#[arg(long)] #[arg(long)]
pub tls_root: Vec<path::PathBuf>, pub tls_root: Vec<path::PathBuf>,
/// Danger: Disable TLS certificate verification.
///
/// Fine for local development and between relays, but should be used with caution in production.
#[arg(long)]
pub tls_disable_verify: bool,
/// Optional: Use the moq-api via HTTP to store origin information. /// Optional: Use the moq-api via HTTP to store origin information.
#[arg(long)] #[arg(long)]
pub api: Option<Url>, pub api: Option<Url>,

View File

@ -37,15 +37,15 @@ impl moq_transport::MoqError for RelayError {
} }
} }
fn reason(&self) -> &str { fn reason(&self) -> String {
match self { match self {
Self::Transport(err) => err.reason(), Self::Transport(err) => format!("transport error: {}", err.reason()),
Self::Cache(err) => err.reason(), Self::Cache(err) => format!("cache error: {}", err.reason()),
Self::MoqApi(_err) => "api error", Self::MoqApi(err) => format!("api error: {}", err),
Self::Url(_) => "url error", Self::Url(err) => format!("url error: {}", err),
Self::MissingNode => "missing node", Self::MissingNode => "missing node".to_owned(),
Self::WebTransportServer(_) => "server error", Self::WebTransportServer(err) => format!("upstream server error: {}", err),
Self::WebTransportClient(_) => "upstream error", Self::WebTransportClient(err) => format!("upstream client error: {}", err),
} }
} }
} }

View File

@ -19,8 +19,8 @@ pub struct Quic {
impl Quic { impl Quic {
// Create a QUIC endpoint that can be used for both clients and servers. // Create a QUIC endpoint that can be used for both clients and servers.
pub async fn new(config: Config, tls: Tls) -> anyhow::Result<Self> { pub async fn new(config: Config, tls: Tls) -> anyhow::Result<Self> {
let mut client_config = tls.client(); let mut client_config = tls.client.clone();
let mut server_config = tls.server(); let mut server_config = tls.server.clone();
client_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; client_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()];
server_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; server_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()];

View File

@ -3,23 +3,19 @@ use ring::digest::{digest, SHA256};
use rustls::server::{ClientHello, ResolvesServerCert}; use rustls::server::{ClientHello, ResolvesServerCert};
use rustls::sign::CertifiedKey; use rustls::sign::CertifiedKey;
use rustls::{Certificate, PrivateKey, RootCertStore}; use rustls::{Certificate, PrivateKey, RootCertStore};
use rustls::{ClientConfig, ServerConfig};
use std::fs;
use std::io::{self, Cursor, Read}; use std::io::{self, Cursor, Read};
use std::path; use std::path;
use std::sync::Arc; use std::sync::Arc;
use std::{fs, time};
use webpki::{DnsNameRef, EndEntityCert}; use webpki::{DnsNameRef, EndEntityCert};
use crate::Config; use crate::Config;
#[derive(Clone)] #[derive(Clone)]
pub struct Tls { pub struct Tls {
// Support serving multiple certificates, choosing one that looks valid for the given SNI. pub server: rustls::ServerConfig,
// We store the parsed certificate, and the certified cert/key that rustls expects pub client: rustls::ClientConfig,
serve: Arc<ServeCerts>, pub fingerprints: Vec<String>,
// Accept any cert that is trusted by the system's native trust store.
accept: Arc<RootCertStore>,
} }
impl Tls { impl Tls {
@ -56,32 +52,34 @@ impl Tls {
} }
} }
// Create the TLS configuration we'll use as a client (relay -> relay)
let mut client = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(roots)
.with_no_client_auth();
// Allow disabling TLS verification altogether.
if config.tls_disable_verify {
let noop = NoCertificateVerification {};
client.dangerous().set_certificate_verifier(Arc::new(noop));
}
let fingerprints = serve.fingerprints();
// Create the TLS configuration we'll use as a server (relay <- browser)
let server = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_cert_resolver(Arc::new(serve));
let certs = Self { let certs = Self {
serve: Arc::new(serve), server,
accept: Arc::new(roots), client,
fingerprints,
}; };
Ok(certs) Ok(certs)
} }
pub fn client(&self) -> ClientConfig {
rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(self.accept.clone())
.with_no_client_auth()
}
pub fn server(&self) -> ServerConfig {
rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_cert_resolver(self.serve.clone())
}
// Return the SHA256 fingerprint of our certificates.
pub fn fingerprints(&self) -> Vec<String> {
self.serve.fingerprints()
}
} }
#[derive(Default)] #[derive(Default)]
@ -166,3 +164,19 @@ impl ResolvesServerCert for ServeCerts {
self.list.last().cloned() self.list.last().cloned()
} }
} }
pub struct NoCertificateVerification {}
impl rustls::client::ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}

View File

@ -17,9 +17,9 @@ impl Web {
pub fn new(config: Config, tls: Tls) -> Self { pub fn new(config: Config, tls: Tls) -> Self {
// Get the first certificate's fingerprint. // Get the first certificate's fingerprint.
// TODO serve all of them so we can support multiple signature algorithms. // TODO serve all of them so we can support multiple signature algorithms.
let fingerprint = tls.fingerprints().first().expect("missing certificate").clone(); let fingerprint = tls.fingerprints.first().expect("missing certificate").clone();
let mut tls_config = tls.server(); let mut tls_config = tls.server.clone();
tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
let tls_config = axum_server::tls_rustls::RustlsConfig::from_config(Arc::new(tls_config)); let tls_config = axum_server::tls_rustls::RustlsConfig::from_config(Arc::new(tls_config));

View File

@ -24,3 +24,6 @@ indexmap = "2"
quinn = "0.10" quinn = "0.10"
webtransport-quinn = "0.6" webtransport-quinn = "0.6"
#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" } #webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" }
async-trait = "0.1"
paste = "1"

View File

@ -136,12 +136,12 @@ impl Publisher {
} }
/// Block until the next track requested by a subscriber. /// Block until the next track requested by a subscriber.
pub async fn next_track(&mut self) -> Result<Option<track::Publisher>, CacheError> { pub async fn next_track(&mut self) -> Result<track::Publisher, CacheError> {
loop { loop {
let notify = { let notify = {
let state = self.state.lock(); let state = self.state.lock();
if state.has_next()? { if state.has_next()? {
return Ok(Some(state.into_mut().next())); return Ok(state.into_mut().next());
} }
state.changed() state.changed()

View File

@ -39,13 +39,13 @@ impl MoqError for CacheError {
} }
/// A reason that is sent over the wire. /// A reason that is sent over the wire.
fn reason(&self) -> &str { fn reason(&self) -> String {
match self { match self {
Self::Closed => "closed", Self::Closed => "closed".to_owned(),
Self::Reset(_) => "reset", Self::Reset(code) => format!("reset code: {}", code),
Self::Stop => "stop", Self::Stop => "stop".to_owned(),
Self::NotFound => "not found", Self::NotFound => "not found".to_owned(),
Self::Duplicate => "duplicate", Self::Duplicate => "duplicate".to_owned(),
} }
} }
} }

216
moq-transport/src/cache/fragment.rs vendored Normal file
View File

@ -0,0 +1,216 @@
//! A fragment is a stream of bytes with a header, split into a [Publisher] and [Subscriber] handle.
//!
//! A [Publisher] writes an ordered stream of bytes in chunks.
//! There's no framing, so these chunks can be of any size or position, and won't be maintained over the network.
//!
//! A [Subscriber] reads an ordered stream of bytes in chunks.
//! These chunks are returned directly from the QUIC connection, so they may be of any size or position.
//! You can clone the [Subscriber] and each will read a copy of of all future chunks. (fanout)
//!
//! The fragment is closed with [CacheError::Closed] when all publishers or subscribers are dropped.
use core::fmt;
use std::{ops::Deref, sync::Arc};
use crate::VarInt;
use bytes::Bytes;
use super::{CacheError, Watch};
/// Create a new fragment, returning the paired [Publisher] and [Subscriber] handles.
pub fn new(info: Info) -> (Publisher, Subscriber) {
	let info = Arc::new(info);
	let state = Watch::new(State::default());

	(
		Publisher::new(state.clone(), info.clone()),
		Subscriber::new(state, info),
	)
}
/// Static information about the fragment.
#[derive(Debug)]
pub struct Info {
	// The sequence number of the fragment within the segment.
	// NOTE: These may be received out of order or with gaps.
	pub sequence: VarInt,
	// The size of the fragment in bytes, or None if this is the last fragment in a segment.
	// TODO enforce this size.
	pub size: Option<VarInt>,
}
// Shared mutable state for a single fragment, guarded by a [Watch].
struct State {
	// The data that has been received thus far.
	chunks: Vec<Bytes>,
	// Set when the publisher is dropped.
	closed: Result<(), CacheError>,
}
impl State {
	// Mark the fragment as closed with the given error.
	// Fails (returning the prior error) if it was already closed.
	pub fn close(&mut self, err: CacheError) -> Result<(), CacheError> {
		match &self.closed {
			Err(prior) => Err(prior.clone()),
			Ok(()) => {
				self.closed = Err(err);
				Ok(())
			}
		}
	}

	// Total number of bytes buffered across all chunks.
	pub fn bytes(&self) -> usize {
		let mut total = 0;
		for chunk in &self.chunks {
			total += chunk.len();
		}
		total
	}
}
impl Default for State {
fn default() -> Self {
Self {
chunks: Vec::new(),
closed: Ok(()),
}
}
}
impl fmt::Debug for State {
	// Summarize instead of dumping the raw chunk contents.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let chunks = self.chunks.len().to_string();
		let bytes = self.bytes().to_string();

		f.debug_struct("State")
			.field("chunks", &chunks)
			.field("bytes", &bytes)
			.field("closed", &self.closed)
			.finish()
	}
}
/// Used to write data to a fragment and notify subscribers.
pub struct Publisher {
	// Mutable fragment state, shared with all subscribers.
	state: Watch<State>,
	// Immutable fragment metadata.
	info: Arc<Info>,
	// Closes the fragment when all Publishers are dropped.
	_dropped: Arc<Dropped>,
}
impl Publisher {
	fn new(state: Watch<State>, info: Arc<Info>) -> Self {
		let dropped = Dropped::new(state.clone());

		Self {
			state,
			info,
			_dropped: Arc::new(dropped),
		}
	}

	/// Write a new chunk of bytes.
	pub fn write_chunk(&mut self, chunk: Bytes) -> Result<(), CacheError> {
		let mut locked = self.state.lock_mut();

		// Refuse the write if the fragment was already closed.
		locked.closed.clone()?;

		locked.chunks.push(chunk);
		Ok(())
	}

	/// Close the fragment with an error.
	pub fn close(self, err: CacheError) -> Result<(), CacheError> {
		self.state.lock_mut().close(err)
	}
}
impl Deref for Publisher {
	type Target = Info;

	// Expose the immutable metadata directly on the handle.
	fn deref(&self) -> &Self::Target {
		self.info.as_ref()
	}
}
impl fmt::Debug for Publisher {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let mut debug = f.debug_struct("Publisher");
		debug.field("state", &self.state);
		debug.field("info", &self.info);
		debug.finish()
	}
}
/// Notified when a fragment has new data available.
#[derive(Clone)]
pub struct Subscriber {
	// Shared fragment state; also used to wait for changes.
	state: Watch<State>,
	// Immutable fragment metadata.
	info: Arc<Info>,
	// The number of chunks that we've read so far.
	// NOTE: Cloned subscribers inherit this index, but then run in parallel.
	index: usize,
	// Closes the fragment when all Subscribers are dropped.
	_dropped: Arc<Dropped>,
}
impl Subscriber {
	fn new(state: Watch<State>, info: Arc<Info>) -> Self {
		let _dropped = Arc::new(Dropped::new(state.clone()));

		Self {
			state,
			info,
			index: 0,
			_dropped,
		}
	}

	/// Block until the next chunk of bytes is available.
	pub async fn read_chunk(&mut self) -> Result<Option<Bytes>, CacheError> {
		loop {
			// Scope the lock so it is released before we await.
			let notify = {
				let state = self.state.lock();

				// Return the next unread chunk if one has arrived.
				if let Some(chunk) = state.chunks.get(self.index) {
					let chunk = chunk.clone();
					self.index += 1;
					return Ok(Some(chunk));
				}

				match &state.closed {
					// Cleanly closed: signal the end of the fragment.
					Err(CacheError::Closed) => return Ok(None),
					// Closed with a real error: surface it.
					Err(err) => return Err(err.clone()),
					// Still open: wait for the state to change.
					Ok(()) => state.changed(),
				}
			};

			// Try again once the state has changed.
			notify.await;
		}
	}
}
impl Deref for Subscriber {
	type Target = Info;

	// Expose the immutable metadata directly on the handle.
	fn deref(&self) -> &Self::Target {
		self.info.as_ref()
	}
}
impl fmt::Debug for Subscriber {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		let mut debug = f.debug_struct("Subscriber");
		debug.field("state", &self.state);
		debug.field("info", &self.info);
		debug.field("index", &self.index);
		debug.finish()
	}
}
// Held via Arc by every Publisher and Subscriber; closes the fragment when the
// final reference is dropped.
struct Dropped {
	// Shared fragment state to close on drop.
	state: Watch<State>,
}
impl Dropped {
fn new(state: Watch<State>) -> Self {
Self { state }
}
}
impl Drop for Dropped {
	fn drop(&mut self) {
		// Best-effort close; ignore the error if the fragment was already closed.
		let _ = self.state.lock_mut().close(CacheError::Closed);
	}
}

View File

@ -1,10 +1,17 @@
//! Allows a publisher to push updates, automatically caching and fanning it out to any subscribers. //! Allows a publisher to push updates, automatically caching and fanning it out to any subscribers.
//! //!
//! The naming scheme doesn't match the spec because it's vague and confusing. //! The hierarchy is: [broadcast] -> [track] -> [segment] -> [fragment] -> [Bytes](bytes::Bytes)
//! The hierarchy is: [broadcast] -> [track] -> [segment] -> [Bytes](bytes::Bytes) //!
//! The naming scheme doesn't match the spec because it's more strict, and bikeshedding of course:
//!
//! - [broadcast] is kinda like "track namespace"
//! - [track] is "track"
//! - [segment] is "group" but MUST use a single stream.
//! - [fragment] is "object" but MUST have the same properties as the segment.
pub mod broadcast; pub mod broadcast;
mod error; mod error;
pub mod fragment;
pub mod segment; pub mod segment;
pub mod track; pub mod track;

View File

@ -1,20 +1,18 @@
//! A segment is a stream of bytes with a header, split into a [Publisher] and [Subscriber] handle. //! A segment is a stream of fragments with a header, split into a [Publisher] and [Subscriber] handle.
//! //!
//! A [Publisher] writes an ordered stream of bytes in chunks. //! A [Publisher] writes an ordered stream of fragments.
//! There's no framing, so these chunks can be of any size or position, and won't be maintained over the network. //! Each fragment can have a sequence number, allowing the subscriber to detect gaps fragments.
//! //!
//! A [Subscriber] reads an ordered stream of bytes in chunks. //! A [Subscriber] reads an ordered stream of fragments.
//! These chunks are returned directly from the QUIC connection, so they may be of any size or position. //! The subscriber can be cloned, in which case each subscriber receives a copy of each fragment. (fanout)
//! A closed [Subscriber] will receive a copy of all future chunks. (fanout)
//! //!
//! The segment is closed with [CacheError::Closed] when all publishers or subscribers are dropped. //! The segment is closed with [CacheError::Closed] when all publishers or subscribers are dropped.
use core::fmt; use core::fmt;
use std::{ops::Deref, sync::Arc, time}; use std::{ops::Deref, sync::Arc, time};
use crate::VarInt; use crate::VarInt;
use bytes::Bytes;
use super::{CacheError, Watch}; use super::{fragment, CacheError, Watch};
/// Create a new segment with the given info. /// Create a new segment with the given info.
pub fn new(info: Info) -> (Publisher, Subscriber) { pub fn new(info: Info) -> (Publisher, Subscriber) {
@ -31,10 +29,11 @@ pub fn new(info: Info) -> (Publisher, Subscriber) {
#[derive(Debug)] #[derive(Debug)]
pub struct Info { pub struct Info {
// The sequence number of the segment within the track. // The sequence number of the segment within the track.
// NOTE: These may be received out of order or with gaps.
pub sequence: VarInt, pub sequence: VarInt,
// The priority of the segment within the BROADCAST. // The priority of the segment within the BROADCAST.
pub priority: i32, pub priority: u32,
// Cache the segment for at most this long. // Cache the segment for at most this long.
pub expires: Option<time::Duration>, pub expires: Option<time::Duration>,
@ -42,7 +41,7 @@ pub struct Info {
struct State { struct State {
// The data that has been received thus far. // The data that has been received thus far.
data: Vec<Bytes>, fragments: Vec<fragment::Subscriber>,
// Set when the publisher is dropped. // Set when the publisher is dropped.
closed: Result<(), CacheError>, closed: Result<(), CacheError>,
@ -59,7 +58,7 @@ impl State {
impl Default for State { impl Default for State {
fn default() -> Self { fn default() -> Self {
Self { Self {
data: Vec::new(), fragments: Vec::new(),
closed: Ok(()), closed: Ok(()),
} }
} }
@ -67,12 +66,8 @@ impl Default for State {
impl fmt::Debug for State { impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// We don't want to print out the contents, so summarize.
let size = self.data.iter().map(|chunk| chunk.len()).sum::<usize>();
let data = format!("size={} chunks={}", size, self.data.len());
f.debug_struct("State") f.debug_struct("State")
.field("data", &data) .field("fragments", &self.fragments)
.field("closed", &self.closed) .field("closed", &self.closed)
.finish() .finish()
} }
@ -96,14 +91,20 @@ impl Publisher {
Self { state, info, _dropped } Self { state, info, _dropped }
} }
/// Write a new chunk of bytes. /// Write a fragment
pub fn write_chunk(&mut self, data: Bytes) -> Result<(), CacheError> { pub fn push_fragment(&mut self, fragment: fragment::Subscriber) -> Result<(), CacheError> {
let mut state = self.state.lock_mut(); let mut state = self.state.lock_mut();
state.closed.clone()?; state.closed.clone()?;
state.data.push(data); state.fragments.push(fragment);
Ok(()) Ok(())
} }
pub fn create_fragment(&mut self, fragment: fragment::Info) -> Result<fragment::Publisher, CacheError> {
let (publisher, subscriber) = fragment::new(fragment);
self.push_fragment(subscriber)?;
Ok(publisher)
}
/// Close the segment with an error. /// Close the segment with an error.
pub fn close(self, err: CacheError) -> Result<(), CacheError> { pub fn close(self, err: CacheError) -> Result<(), CacheError> {
self.state.lock_mut().close(err) self.state.lock_mut().close(err)
@ -157,14 +158,14 @@ impl Subscriber {
} }
/// Block until the next chunk of bytes is available. /// Block until the next chunk of bytes is available.
pub async fn read_chunk(&mut self) -> Result<Option<Bytes>, CacheError> { pub async fn next_fragment(&mut self) -> Result<Option<fragment::Subscriber>, CacheError> {
loop { loop {
let notify = { let notify = {
let state = self.state.lock(); let state = self.state.lock();
if self.index < state.data.len() { if self.index < state.fragments.len() {
let chunk = state.data[self.index].clone(); let fragment = state.fragments[self.index].clone();
self.index += 1; self.index += 1;
return Ok(Some(chunk)); return Ok(Some(fragment));
} }
match &state.closed { match &state.closed {

View File

@ -206,7 +206,7 @@ impl Subscriber {
} }
} }
/// Block until the next segment arrives, or return None if the track is [CacheError::Closed]. /// Block until the next segment arrives
pub async fn next_segment(&mut self) -> Result<Option<segment::Subscriber>, CacheError> { pub async fn next_segment(&mut self) -> Result<Option<segment::Subscriber>, CacheError> {
loop { loop {
let notify = { let notify = {

View File

@ -1,5 +1,5 @@
use super::{BoundsExceeded, VarInt}; use super::{BoundsExceeded, VarInt};
use std::str; use std::{io, str};
use thiserror::Error; use thiserror::Error;
@ -7,6 +7,13 @@ use thiserror::Error;
// TODO Use trait aliases when they're stable, or add these bounds to every method. // TODO Use trait aliases when they're stable, or add these bounds to every method.
pub trait AsyncRead: tokio::io::AsyncRead + Unpin + Send {} pub trait AsyncRead: tokio::io::AsyncRead + Unpin + Send {}
impl AsyncRead for webtransport_quinn::RecvStream {} impl AsyncRead for webtransport_quinn::RecvStream {}
impl<T> AsyncRead for tokio::io::Take<&mut T> where T: AsyncRead {}
impl<T: AsRef<[u8]> + Unpin + Send> AsyncRead for io::Cursor<T> {}
#[async_trait::async_trait]
pub trait Decode: Sized {
async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError>;
}
/// A decode error. /// A decode error.
#[derive(Error, Debug)] #[derive(Error, Debug)]
@ -17,12 +24,32 @@ pub enum DecodeError {
#[error("invalid string")] #[error("invalid string")]
InvalidString(#[from] str::Utf8Error), InvalidString(#[from] str::Utf8Error),
#[error("invalid type: {0:?}")] #[error("invalid message: {0:?}")]
InvalidType(VarInt), InvalidMessage(VarInt),
#[error("invalid role: {0:?}")]
InvalidRole(VarInt),
#[error("invalid subscribe location")]
InvalidSubscribeLocation,
#[error("varint bounds exceeded")] #[error("varint bounds exceeded")]
BoundsExceeded(#[from] BoundsExceeded), BoundsExceeded(#[from] BoundsExceeded),
// TODO move these to ParamError
#[error("duplicate parameter")]
DupliateParameter,
#[error("missing parameter")]
MissingParameter,
#[error("invalid parameter")]
InvalidParameter,
#[error("io error: {0}")] #[error("io error: {0}")]
IoError(#[from] std::io::Error), IoError(#[from] std::io::Error),
// Used to signal that the stream has ended.
#[error("no more messages")]
Final,
} }

View File

@ -6,6 +6,12 @@ use thiserror::Error;
// TODO Use trait aliases when they're stable, or add these bounds to every method. // TODO Use trait aliases when they're stable, or add these bounds to every method.
pub trait AsyncWrite: tokio::io::AsyncWrite + Unpin + Send {} pub trait AsyncWrite: tokio::io::AsyncWrite + Unpin + Send {}
impl AsyncWrite for webtransport_quinn::SendStream {} impl AsyncWrite for webtransport_quinn::SendStream {}
impl AsyncWrite for Vec<u8> {}
#[async_trait::async_trait]
pub trait Encode: Sized {
async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError>;
}
/// An encode error. /// An encode error.
#[derive(Error, Debug)] #[derive(Error, Debug)]

View File

@ -1,9 +1,11 @@
mod decode; mod decode;
mod encode; mod encode;
mod params;
mod string; mod string;
mod varint; mod varint;
pub use decode::*; pub use decode::*;
pub use encode::*; pub use encode::*;
pub use params::*;
pub use string::*; pub use string::*;
pub use varint::*; pub use varint::*;

View File

@ -1,69 +1,85 @@
use std::cmp::min; use std::io::Cursor;
use std::{cmp::max, collections::HashMap};
use crate::VarInt; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::{AsyncRead, AsyncWrite, DecodeError, EncodeError}; use crate::coding::{AsyncRead, AsyncWrite, Decode, Encode};
use tokio::io::AsyncReadExt;
// I hate this parameter encoding so much. use crate::{
// i hate it i hate it i hate it coding::{DecodeError, EncodeError},
VarInt,
};
// TODO Use #[async_trait] so we can do Param<VarInt> instead. #[derive(Default, Debug, Clone)]
pub struct ParamInt(pub VarInt); pub struct Params(pub HashMap<VarInt, Vec<u8>>);
#[async_trait::async_trait]
impl Decode for Params {
async fn decode<R: AsyncRead>(mut r: &mut R) -> Result<Self, DecodeError> {
let mut params = HashMap::new();
// I hate this shit so much; let me encode my role and get on with my life.
let count = VarInt::decode(r).await?;
for _ in 0..count.into_inner() {
let kind = VarInt::decode(r).await?;
if params.contains_key(&kind) {
return Err(DecodeError::DupliateParameter);
}
impl ParamInt {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> {
// Why do we have a redundant size in front of each VarInt?
let size = VarInt::decode(r).await?; let size = VarInt::decode(r).await?;
let mut take = r.take(size.into_inner());
let value = VarInt::decode(&mut take).await?;
// Like seriously why do I have to check if the VarInt length mismatches. // Don't allocate the entire requested size to avoid a possible attack
if take.limit() != 0 { // Instead, we allocate up to 1024 and keep appending as we read further.
return Err(DecodeError::InvalidSize); let mut pr = r.take(size.into_inner());
let mut buf = Vec::with_capacity(max(1024, pr.limit() as usize));
pr.read_to_end(&mut buf).await?;
params.insert(kind, buf);
r = pr.into_inner();
} }
Ok(Self(value)) Ok(Params(params))
}
} }
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { #[async_trait::async_trait]
// Seriously why do I have to compute the size. impl Encode for Params {
let size = self.0.size(); async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> {
VarInt::try_from(size)?.encode(w).await?; VarInt::try_from(self.0.len())?.encode(w).await?;
self.0.encode(w).await?; for (kind, value) in self.0.iter() {
kind.encode(w).await?;
VarInt::try_from(value.len())?.encode(w).await?;
w.write_all(value).await?;
}
Ok(()) Ok(())
} }
} }
pub struct ParamBytes(pub Vec<u8>); impl Params {
pub fn new() -> Self {
impl ParamBytes { Self::default()
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> {
let size = VarInt::decode(r).await?;
let mut take = r.take(size.into_inner());
let mut buf = Vec::with_capacity(min(take.limit() as usize, 1024));
take.read_to_end(&mut buf).await?;
Ok(Self(buf))
} }
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn set<P: Encode>(&mut self, kind: VarInt, p: P) -> Result<(), EncodeError> {
let size = VarInt::try_from(self.0.len())?; let mut value = Vec::new();
size.encode(w).await?; p.encode(&mut value).await?;
w.write_all(&self.0).await?; self.0.insert(kind, value);
Ok(()) Ok(())
} }
pub fn has(&self, kind: VarInt) -> bool {
self.0.contains_key(&kind)
} }
pub struct ParamUnknown {} pub async fn get<P: Decode>(&mut self, kind: VarInt) -> Result<Option<P>, DecodeError> {
if let Some(value) = self.0.remove(&kind) {
impl ParamUnknown { let mut cursor = Cursor::new(value);
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<(), DecodeError> { Ok(Some(P::decode(&mut cursor).await?))
// Really? Is there no way to advance without reading? } else {
ParamBytes::decode(r).await?; Ok(None)
Ok(()) }
} }
} }

View File

@ -5,20 +5,25 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::VarInt; use crate::VarInt;
use super::{DecodeError, EncodeError}; use super::{Decode, DecodeError, Encode, EncodeError};
/// Encode a string with a varint length prefix. #[async_trait::async_trait]
pub async fn encode_string<W: AsyncWrite>(s: &str, w: &mut W) -> Result<(), EncodeError> { impl Encode for String {
let size = VarInt::try_from(s.len())?; async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> {
let size = VarInt::try_from(self.len())?;
size.encode(w).await?; size.encode(w).await?;
w.write_all(s.as_ref()).await?; w.write_all(self.as_ref()).await?;
Ok(()) Ok(())
} }
}
#[async_trait::async_trait]
impl Decode for String {
/// Decode a string with a varint length prefix. /// Decode a string with a varint length prefix.
pub async fn decode_string<R: AsyncRead>(r: &mut R) -> Result<String, DecodeError> { async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> {
let size = VarInt::decode(r).await?.into_inner(); let size = VarInt::decode(r).await?.into_inner();
let mut str = String::with_capacity(min(1024, size) as usize); let mut str = String::with_capacity(min(1024, size) as usize);
r.take(size).read_to_string(&mut str).await?; r.take(size).read_to_string(&mut str).await?;
Ok(str) Ok(str)
} }
}

View File

@ -9,7 +9,7 @@ use crate::coding::{AsyncRead, AsyncWrite};
use thiserror::Error; use thiserror::Error;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::{DecodeError, EncodeError}; use super::{Decode, DecodeError, Encode, EncodeError};
#[derive(Debug, Copy, Clone, Eq, PartialEq, Error)] #[derive(Debug, Copy, Clone, Eq, PartialEq, Error)]
#[error("value out of range")] #[error("value out of range")]
@ -164,14 +164,23 @@ impl fmt::Display for VarInt {
} }
} }
impl VarInt { #[async_trait::async_trait]
impl Decode for VarInt {
/// Decode a varint from the given reader. /// Decode a varint from the given reader.
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> {
let mut buf = [0u8; 8]; let b = r.read_u8().await?;
r.read_exact(buf[0..1].as_mut()).await?; Self::decode_byte(b, r).await
}
}
let tag = buf[0] >> 6; impl VarInt {
buf[0] &= 0b0011_1111; /// Decode a varint given the first byte, reading the rest as needed.
/// This is silly but useful for determining if the stream has ended.
pub async fn decode_byte<R: AsyncRead>(b: u8, r: &mut R) -> Result<Self, DecodeError> {
let tag = b >> 6;
let mut buf = [0u8; 8];
buf[0] = b & 0b0011_1111;
let x = match tag { let x = match tag {
0b00 => u64::from(buf[0]), 0b00 => u64::from(buf[0]),
@ -192,9 +201,12 @@ impl VarInt {
Ok(Self(x)) Ok(Self(x))
} }
}
#[async_trait::async_trait]
impl Encode for VarInt {
/// Encode a varint to the given writer. /// Encode a varint to the given writer.
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> {
let x = self.0; let x = self.0;
if x < 2u64.pow(6) { if x < 2u64.pow(6) {
w.write_u8(x as u8).await?; w.write_u8(x as u8).await?;

View File

@ -1,5 +1,7 @@
pub trait MoqError { pub trait MoqError {
/// An integer code that is sent over the wire. /// An integer code that is sent over the wire.
fn code(&self) -> u32; fn code(&self) -> u32;
fn reason(&self) -> &str;
/// An optional reason sometimes sent over the wire.
fn reason(&self) -> String;
} }

View File

@ -5,9 +5,7 @@
//! The specification is a work in progress and will change. //! The specification is a work in progress and will change.
//! See the [specification](https://datatracker.ietf.org/doc/draft-ietf-moq-transport/) and [github](https://github.com/moq-wg/moq-transport) for any updates. //! See the [specification](https://datatracker.ietf.org/doc/draft-ietf-moq-transport/) and [github](https://github.com/moq-wg/moq-transport) for any updates.
//! //!
//! **FORKED**: This is implementation makes extensive changes to the protocol. //! This implementation has some required extensions until the draft stablizes. See: [Extensions](crate::setup::Extensions)
//! See [KIXEL_00](crate::setup::Version::KIXEL_00) for a list of differences.
//! Many of these will get merged into the specification, so don't panic.
mod coding; mod coding;
mod error; mod error;

View File

@ -1,22 +1,30 @@
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params};
use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{AsyncRead, AsyncWrite};
use crate::setup::Extensions;
/// Sent by the publisher to announce the availability of a group of tracks. /// Sent by the publisher to announce the availability of a group of tracks.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Announce { pub struct Announce {
// The track namespace /// The track namespace
pub namespace: String, pub namespace: String,
/// Optional parameters
pub params: Params,
} }
impl Announce { impl Announce {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: AsyncRead>(r: &mut R, _ext: &Extensions) -> Result<Self, DecodeError> {
let namespace = decode_string(r).await?; let namespace = String::decode(r).await?;
Ok(Self { namespace }) let params = Params::decode(r).await?;
Ok(Self { namespace, params })
} }
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn encode<W: AsyncWrite>(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> {
encode_string(&self.namespace, w).await?; self.namespace.encode(w).await?;
self.params.encode(w).await?;
Ok(()) Ok(())
} }
} }

View File

@ -1,4 +1,7 @@
use crate::coding::{decode_string, encode_string, AsyncRead, AsyncWrite, DecodeError, EncodeError}; use crate::{
coding::{AsyncRead, AsyncWrite, Decode, DecodeError, Encode, EncodeError},
setup::Extensions,
};
/// Sent by the subscriber to accept an Announce. /// Sent by the subscriber to accept an Announce.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -9,12 +12,12 @@ pub struct AnnounceOk {
} }
impl AnnounceOk { impl AnnounceOk {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: AsyncRead>(r: &mut R, _ext: &Extensions) -> Result<Self, DecodeError> {
let namespace = decode_string(r).await?; let namespace = String::decode(r).await?;
Ok(Self { namespace }) Ok(Self { namespace })
} }
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn encode<W: AsyncWrite>(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> {
encode_string(&self.namespace, w).await self.namespace.encode(w).await
} }
} }

View File

@ -1,10 +1,11 @@
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{AsyncRead, AsyncWrite};
use crate::setup::Extensions;
/// Sent by the subscriber to reject an Announce. /// Sent by the subscriber to reject an Announce.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct AnnounceReset { pub struct AnnounceError {
// Echo back the namespace that was reset // Echo back the namespace that was reset
pub namespace: String, pub namespace: String,
@ -15,11 +16,11 @@ pub struct AnnounceReset {
pub reason: String, pub reason: String,
} }
impl AnnounceReset { impl AnnounceError {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: AsyncRead>(r: &mut R, _ext: &Extensions) -> Result<Self, DecodeError> {
let namespace = decode_string(r).await?; let namespace = String::decode(r).await?;
let code = VarInt::decode(r).await?.try_into()?; let code = VarInt::decode(r).await?.try_into()?;
let reason = decode_string(r).await?; let reason = String::decode(r).await?;
Ok(Self { Ok(Self {
namespace, namespace,
@ -28,10 +29,10 @@ impl AnnounceReset {
}) })
} }
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn encode<W: AsyncWrite>(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> {
encode_string(&self.namespace, w).await?; self.namespace.encode(w).await?;
VarInt::from_u32(self.code).encode(w).await?; VarInt::from_u32(self.code).encode(w).await?;
encode_string(&self.reason, w).await?; self.reason.encode(w).await?;
Ok(()) Ok(())
} }

View File

@ -1,24 +0,0 @@
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError};
use crate::coding::{AsyncRead, AsyncWrite};
/// Sent by the publisher to terminate an Announce.
#[derive(Clone, Debug)]
pub struct AnnounceStop {
// Echo back the namespace that was reset
pub namespace: String,
}
impl AnnounceStop {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> {
let namespace = decode_string(r).await?;
Ok(Self { namespace })
}
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> {
encode_string(&self.namespace, w).await?;
Ok(())
}
}

View File

@ -1,6 +1,7 @@
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError}; use crate::coding::{Decode, DecodeError, Encode, EncodeError};
use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{AsyncRead, AsyncWrite};
use crate::setup::Extensions;
/// Sent by the server to indicate that the client should connect to a different server. /// Sent by the server to indicate that the client should connect to a different server.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -9,12 +10,12 @@ pub struct GoAway {
} }
impl GoAway { impl GoAway {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: AsyncRead>(r: &mut R, _ext: &Extensions) -> Result<Self, DecodeError> {
let url = decode_string(r).await?; let url = String::decode(r).await?;
Ok(Self { url }) Ok(Self { url })
} }
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn encode<W: AsyncWrite>(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> {
encode_string(&self.url, w).await self.url.encode(w).await
} }
} }

View File

@ -6,16 +6,17 @@
//! //!
//! Messages sent by the publisher: //! Messages sent by the publisher:
//! - [Announce] //! - [Announce]
//! - [AnnounceReset] //! - [Unannounce]
//! - [SubscribeOk] //! - [SubscribeOk]
//! - [SubscribeError]
//! - [SubscribeReset] //! - [SubscribeReset]
//! - [Object] //! - [Object]
//! //!
//! Messages sent by the subscriber: //! Messages sent by the subscriber:
//! - [Subscribe] //! - [Subscribe]
//! - [SubscribeStop] //! - [Unsubscribe]
//! - [AnnounceOk] //! - [AnnounceOk]
//! - [AnnounceStop] //! - [AnnounceError]
//! //!
//! Example flow: //! Example flow:
//! ```test //! ```test
@ -32,30 +33,35 @@
mod announce; mod announce;
mod announce_ok; mod announce_ok;
mod announce_reset; mod announce_reset;
mod announce_stop;
mod go_away; mod go_away;
mod object; mod object;
mod subscribe; mod subscribe;
mod subscribe_error;
mod subscribe_fin;
mod subscribe_ok; mod subscribe_ok;
mod subscribe_reset; mod subscribe_reset;
mod subscribe_stop; mod unannounce;
mod unsubscribe;
pub use announce::*; pub use announce::*;
pub use announce_ok::*; pub use announce_ok::*;
pub use announce_reset::*; pub use announce_reset::*;
pub use announce_stop::*;
pub use go_away::*; pub use go_away::*;
pub use object::*; pub use object::*;
pub use subscribe::*; pub use subscribe::*;
pub use subscribe_error::*;
pub use subscribe_fin::*;
pub use subscribe_ok::*; pub use subscribe_ok::*;
pub use subscribe_reset::*; pub use subscribe_reset::*;
pub use subscribe_stop::*; pub use unannounce::*;
pub use unsubscribe::*;
use crate::coding::{DecodeError, EncodeError, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use std::fmt; use std::fmt;
use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{AsyncRead, AsyncWrite};
use crate::setup::Extensions;
// Use a macro to generate the message types rather than copy-paste. // Use a macro to generate the message types rather than copy-paste.
// This implements a decode/encode method that uses the specified type. // This implements a decode/encode method that uses the specified type.
@ -68,23 +74,23 @@ macro_rules! message_types {
} }
impl Message { impl Message {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: AsyncRead>(r: &mut R, ext: &Extensions) -> Result<Self, DecodeError> {
let t = VarInt::decode(r).await?; let t = VarInt::decode(r).await?;
match t.into_inner() { match t.into_inner() {
$($val => { $($val => {
let msg = $name::decode(r).await?; let msg = $name::decode(r, ext).await?;
Ok(Self::$name(msg)) Ok(Self::$name(msg))
})* })*
_ => Err(DecodeError::InvalidType(t)), _ => Err(DecodeError::InvalidMessage(t)),
} }
} }
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn encode<W: AsyncWrite>(&self, w: &mut W, ext: &Extensions) -> Result<(), EncodeError> {
match self { match self {
$(Self::$name(ref m) => { $(Self::$name(ref m) => {
VarInt::from_u32($val).encode(w).await?; VarInt::from_u32($val).encode(w).await?;
m.encode(w).await m.encode(w, ext).await
},)* },)*
} }
} }
@ -127,15 +133,28 @@ macro_rules! message_types {
message_types! { message_types! {
// NOTE: Object and Setup are in other modules. // NOTE: Object and Setup are in other modules.
// Object = 0x0 // Object = 0x0
// SetupClient = 0x1 // ObjectUnbounded = 0x2
// SetupServer = 0x2 // SetupClient = 0x40
// SetupServer = 0x41
// SUBSCRIBE family, sent by subscriber
Subscribe = 0x3, Subscribe = 0x3,
Unsubscribe = 0xa,
// SUBSCRIBE family, sent by publisher
SubscribeOk = 0x4, SubscribeOk = 0x4,
SubscribeReset = 0x5, SubscribeError = 0x5,
SubscribeStop = 0x15, SubscribeFin = 0xb,
SubscribeReset = 0xc,
// ANNOUNCE family, sent by publisher
Announce = 0x6, Announce = 0x6,
Unannounce = 0x9,
// ANNOUNCE family, sent by subscriber
AnnounceOk = 0x7, AnnounceOk = 0x7,
AnnounceReset = 0x8, AnnounceError = 0x8,
AnnounceStop = 0x18,
// Misc
GoAway = 0x10, GoAway = 0x10,
} }

View File

@ -1,9 +1,10 @@
use std::time; use std::{io, time};
use crate::coding::{DecodeError, EncodeError, VarInt}; use tokio::io::AsyncReadExt;
use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use crate::setup;
/// Sent by the publisher as the header of each data stream. /// Sent by the publisher as the header of each data stream.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -13,47 +14,78 @@ pub struct Object {
pub track: VarInt, pub track: VarInt,
// The sequence number within the track. // The sequence number within the track.
pub group: VarInt,
// The sequence number within the group.
pub sequence: VarInt, pub sequence: VarInt,
// The priority, where **larger** values are sent first. // The priority, where **smaller** values are sent first.
// Proposal: int32 instead of a varint. pub priority: u32,
pub priority: i32,
// Cache the object for at most this many seconds. // Cache the object for at most this many seconds.
// Zero means never expire. // Zero means never expire.
pub expires: Option<time::Duration>, pub expires: Option<time::Duration>,
/// An optional size, allowing multiple OBJECTs on the same stream.
pub size: Option<VarInt>,
} }
impl Object { impl Object {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: AsyncRead>(r: &mut R, extensions: &setup::Extensions) -> Result<Self, DecodeError> {
let typ = VarInt::decode(r).await?; // Try reading the first byte, returning a special error if the stream naturally ended.
if typ.into_inner() != 0 { let typ = match r.read_u8().await {
return Err(DecodeError::InvalidType(typ)); Ok(b) => VarInt::decode_byte(b, r).await?,
} Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Err(DecodeError::Final),
Err(e) => return Err(e.into()),
};
// NOTE: size has been omitted let size_present = match typ.into_inner() {
0 => false,
2 => true,
_ => return Err(DecodeError::InvalidMessage(typ)),
};
let track = VarInt::decode(r).await?; let track = VarInt::decode(r).await?;
let group = VarInt::decode(r).await?;
let sequence = VarInt::decode(r).await?; let sequence = VarInt::decode(r).await?;
let priority = r.read_i32().await?; // big-endian let priority = VarInt::decode(r).await?.try_into()?;
let expires = match VarInt::decode(r).await?.into_inner() {
let expires = match extensions.object_expires {
true => match VarInt::decode(r).await?.into_inner() {
0 => None, 0 => None,
secs => Some(time::Duration::from_secs(secs)), secs => Some(time::Duration::from_secs(secs)),
},
false => None,
};
// The presence of the size field depends on the type.
let size = match size_present {
true => Some(VarInt::decode(r).await?),
false => None,
}; };
Ok(Self { Ok(Self {
track, track,
group,
sequence, sequence,
priority, priority,
expires, expires,
size,
}) })
} }
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn encode<W: AsyncWrite>(&self, w: &mut W, extensions: &setup::Extensions) -> Result<(), EncodeError> {
VarInt::ZERO.encode(w).await?; // The kind changes based on the presence of the size.
let kind = match self.size {
Some(_) => VarInt::from_u32(2),
None => VarInt::ZERO,
};
kind.encode(w).await?;
self.track.encode(w).await?; self.track.encode(w).await?;
self.group.encode(w).await?;
self.sequence.encode(w).await?; self.sequence.encode(w).await?;
w.write_i32(self.priority).await?; VarInt::from_u32(self.priority).encode(w).await?;
// Round up if there's any decimal points. // Round up if there's any decimal points.
let expires = match self.expires { let expires = match self.expires {
@ -63,7 +95,13 @@ impl Object {
Some(expires) => expires.as_secs(), Some(expires) => expires.as_secs(),
}; };
if extensions.object_expires {
VarInt::try_from(expires)?.encode(w).await?; VarInt::try_from(expires)?.encode(w).await?;
}
if let Some(size) = self.size {
size.encode(w).await?;
}
Ok(()) Ok(())
} }

View File

@ -1,38 +1,141 @@
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params, VarInt};
use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{AsyncRead, AsyncWrite};
use crate::setup::Extensions;
/// Sent by the subscriber to request all future objects for the given track. /// Sent by the subscriber to request all future objects for the given track.
/// ///
/// Objects will use the provided ID instead of the full track name, to save bytes. /// Objects will use the provided ID instead of the full track name, to save bytes.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Subscribe { pub struct Subscribe {
// An ID we choose so we can map to the track_name. /// An ID we choose so we can map to the track_name.
// Proposal: https://github.com/moq-wg/moq-transport/issues/209 // Proposal: https://github.com/moq-wg/moq-transport/issues/209
pub id: VarInt, pub id: VarInt,
// The track namespace. /// The track namespace.
pub namespace: String, ///
/// Must be None if `extensions.subscribe_split` is false.
pub namespace: Option<String>,
// The track name. /// The track name.
pub name: String, pub name: String,
/// The start/end group/object.
pub start_group: SubscribeLocation,
pub start_object: SubscribeLocation,
pub end_group: SubscribeLocation,
pub end_object: SubscribeLocation,
/// Optional parameters
pub params: Params,
} }
impl Subscribe { impl Subscribe {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: AsyncRead>(r: &mut R, ext: &Extensions) -> Result<Self, DecodeError> {
let id = VarInt::decode(r).await?; let id = VarInt::decode(r).await?;
let namespace = decode_string(r).await?;
let name = decode_string(r).await?;
Ok(Self { id, namespace, name }) let namespace = match ext.subscribe_split {
} true => Some(String::decode(r).await?),
false => None,
};
let name = String::decode(r).await?;
let start_group = SubscribeLocation::decode(r).await?;
let start_object = SubscribeLocation::decode(r).await?;
let end_group = SubscribeLocation::decode(r).await?;
let end_object = SubscribeLocation::decode(r).await?;
// You can't have a start object without a start group.
if start_group == SubscribeLocation::None && start_object != SubscribeLocation::None {
return Err(DecodeError::InvalidSubscribeLocation);
} }
impl Subscribe { // You can't have an end object without an end group.
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { if end_group == SubscribeLocation::None && end_object != SubscribeLocation::None {
return Err(DecodeError::InvalidSubscribeLocation);
}
// NOTE: There's some more location restrictions in the draft, but they're enforced at a higher level.
let params = Params::decode(r).await?;
Ok(Self {
id,
namespace,
name,
start_group,
start_object,
end_group,
end_object,
params,
})
}
pub async fn encode<W: AsyncWrite>(&self, w: &mut W, ext: &Extensions) -> Result<(), EncodeError> {
self.id.encode(w).await?; self.id.encode(w).await?;
encode_string(&self.namespace, w).await?;
encode_string(&self.name, w).await?; if self.namespace.is_some() != ext.subscribe_split {
panic!("namespace must be None if subscribe_split is false");
}
if ext.subscribe_split {
self.namespace.as_ref().unwrap().encode(w).await?;
}
self.name.encode(w).await?;
self.start_group.encode(w).await?;
self.start_object.encode(w).await?;
self.end_group.encode(w).await?;
self.end_object.encode(w).await?;
self.params.encode(w).await?;
Ok(())
}
}
/// Signal where the subscription should begin, relative to the current cache.
#[derive(Clone, Debug, PartialEq)]
pub enum SubscribeLocation {
None,
Absolute(VarInt),
Latest(VarInt),
Future(VarInt),
}
impl SubscribeLocation {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> {
let kind = VarInt::decode(r).await?;
match kind.into_inner() {
0 => Ok(Self::None),
1 => Ok(Self::Absolute(VarInt::decode(r).await?)),
2 => Ok(Self::Latest(VarInt::decode(r).await?)),
3 => Ok(Self::Future(VarInt::decode(r).await?)),
_ => Err(DecodeError::InvalidSubscribeLocation),
}
}
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> {
match self {
Self::None => {
VarInt::from_u32(0).encode(w).await?;
}
Self::Absolute(val) => {
VarInt::from_u32(1).encode(w).await?;
val.encode(w).await?;
}
Self::Latest(val) => {
VarInt::from_u32(2).encode(w).await?;
val.encode(w).await?;
}
Self::Future(val) => {
VarInt::from_u32(3).encode(w).await?;
val.encode(w).await?;
}
}
Ok(()) Ok(())
} }

View File

@ -0,0 +1,36 @@
use crate::coding::{AsyncRead, AsyncWrite};
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use crate::setup::Extensions;
/// Sent by the publisher to reject a Subscribe.
#[derive(Clone, Debug)]
pub struct SubscribeError {
// NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209
// The ID for this subscription.
pub id: VarInt,
// An error code.
pub code: u32,
// An optional, human-readable reason.
pub reason: String,
}
impl SubscribeError {
pub async fn decode<R: AsyncRead>(r: &mut R, _ext: &Extensions) -> Result<Self, DecodeError> {
let id = VarInt::decode(r).await?;
let code = VarInt::decode(r).await?.try_into()?;
let reason = String::decode(r).await?;
Ok(Self { id, code, reason })
}
pub async fn encode<W: AsyncWrite>(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> {
self.id.encode(w).await?;
VarInt::from_u32(self.code).encode(w).await?;
self.reason.encode(w).await?;
Ok(())
}
}

View File

@ -0,0 +1,37 @@
use crate::coding::{AsyncRead, AsyncWrite};
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use crate::setup::Extensions;
/// Sent by the publisher to cleanly terminate a Subscribe.
#[derive(Clone, Debug)]
pub struct SubscribeFin {
// NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209
/// The ID for this subscription.
pub id: VarInt,
/// The final group/object sent on this subscription.
pub final_group: VarInt,
pub final_object: VarInt,
}
impl SubscribeFin {
pub async fn decode<R: AsyncRead>(r: &mut R, _ext: &Extensions) -> Result<Self, DecodeError> {
let id = VarInt::decode(r).await?;
let final_group = VarInt::decode(r).await?;
let final_object = VarInt::decode(r).await?;
Ok(Self {
id,
final_group,
final_object,
})
}
pub async fn encode<W: AsyncWrite>(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> {
self.id.encode(w).await?;
self.final_group.encode(w).await?;
self.final_object.encode(w).await?;
Ok(())
}
}

View File

@ -1,26 +1,31 @@
use crate::coding::{DecodeError, EncodeError, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{AsyncRead, AsyncWrite};
use crate::setup::Extensions;
/// Sent by the publisher to accept a Subscribe. /// Sent by the publisher to accept a Subscribe.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SubscribeOk { pub struct SubscribeOk {
// NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209
/// The ID for this track.
// The ID for this track.
pub id: VarInt, pub id: VarInt,
/// The subscription will expire in this many milliseconds.
pub expires: VarInt,
} }
impl SubscribeOk { impl SubscribeOk {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: AsyncRead>(r: &mut R, _ext: &Extensions) -> Result<Self, DecodeError> {
let id = VarInt::decode(r).await?; let id = VarInt::decode(r).await?;
Ok(Self { id }) let expires = VarInt::decode(r).await?;
Ok(Self { id, expires })
} }
} }
impl SubscribeOk { impl SubscribeOk {
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn encode<W: AsyncWrite>(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> {
self.id.encode(w).await?; self.id.encode(w).await?;
self.expires.encode(w).await?;
Ok(()) Ok(())
} }
} }

View File

@ -1,35 +1,49 @@
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError, VarInt};
use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{AsyncRead, AsyncWrite};
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use crate::setup::Extensions;
/// Sent by the publisher to reject a Subscribe. /// Sent by the publisher to terminate a Subscribe.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SubscribeReset { pub struct SubscribeReset {
// NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209
/// The ID for this subscription.
// The ID for this subscription.
pub id: VarInt, pub id: VarInt,
// An error code. /// An error code.
pub code: u32, pub code: u32,
// An optional, human-readable reason. /// An optional, human-readable reason.
pub reason: String, pub reason: String,
/// The final group/object sent on this subscription.
pub final_group: VarInt,
pub final_object: VarInt,
} }
impl SubscribeReset { impl SubscribeReset {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: AsyncRead>(r: &mut R, _ext: &Extensions) -> Result<Self, DecodeError> {
let id = VarInt::decode(r).await?; let id = VarInt::decode(r).await?;
let code = VarInt::decode(r).await?.try_into()?; let code = VarInt::decode(r).await?.try_into()?;
let reason = decode_string(r).await?; let reason = String::decode(r).await?;
let final_group = VarInt::decode(r).await?;
let final_object = VarInt::decode(r).await?;
Ok(Self { id, code, reason }) Ok(Self {
id,
code,
reason,
final_group,
final_object,
})
} }
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn encode<W: AsyncWrite>(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> {
self.id.encode(w).await?; self.id.encode(w).await?;
VarInt::from_u32(self.code).encode(w).await?; VarInt::from_u32(self.code).encode(w).await?;
encode_string(&self.reason, w).await?; self.reason.encode(w).await?;
self.final_group.encode(w).await?;
self.final_object.encode(w).await?;
Ok(()) Ok(())
} }

View File

@ -0,0 +1,25 @@
use crate::coding::{Decode, DecodeError, Encode, EncodeError};
use crate::coding::{AsyncRead, AsyncWrite};
use crate::setup::Extensions;
/// Sent by the publisher to terminate an Announce.
#[derive(Clone, Debug)]
pub struct Unannounce {
// Echo back the namespace that was reset
pub namespace: String,
}
impl Unannounce {
pub async fn decode<R: AsyncRead>(r: &mut R, _ext: &Extensions) -> Result<Self, DecodeError> {
let namespace = String::decode(r).await?;
Ok(Self { namespace })
}
pub async fn encode<W: AsyncWrite>(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> {
self.namespace.encode(w).await?;
Ok(())
}
}

View File

@ -1,25 +1,26 @@
use crate::coding::{DecodeError, EncodeError, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{AsyncRead, AsyncWrite};
use crate::setup::Extensions;
/// Sent by the subscriber to terminate a Subscribe. /// Sent by the subscriber to terminate a Subscribe.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SubscribeStop { pub struct Unsubscribe {
// NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209 // NOTE: No full track name because of this proposal: https://github.com/moq-wg/moq-transport/issues/209
// The ID for this subscription. // The ID for this subscription.
pub id: VarInt, pub id: VarInt,
} }
impl SubscribeStop { impl Unsubscribe {
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: AsyncRead>(r: &mut R, _ext: &Extensions) -> Result<Self, DecodeError> {
let id = VarInt::decode(r).await?; let id = VarInt::decode(r).await?;
Ok(Self { id }) Ok(Self { id })
} }
} }
impl SubscribeStop { impl Unsubscribe {
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn encode<W: AsyncWrite>(&self, w: &mut W, _ext: &Extensions) -> Result<(), EncodeError> {
self.id.encode(w).await?; self.id.encode(w).await?;
Ok(()) Ok(())
} }

View File

@ -1,6 +1,6 @@
use super::{Publisher, SessionError, Subscriber}; use super::{Control, Publisher, SessionError, Subscriber};
use crate::{cache::broadcast, setup}; use crate::{cache::broadcast, setup};
use webtransport_quinn::{RecvStream, SendStream, Session}; use webtransport_quinn::Session;
/// An endpoint that connects to a URL to publish and/or consume live streams. /// An endpoint that connects to a URL to publish and/or consume live streams.
pub struct Client {} pub struct Client {}
@ -9,7 +9,6 @@ impl Client {
/// Connect using an established WebTransport session, performing the MoQ handshake as a publisher. /// Connect using an established WebTransport session, performing the MoQ handshake as a publisher.
pub async fn publisher(session: Session, source: broadcast::Subscriber) -> Result<Publisher, SessionError> { pub async fn publisher(session: Session, source: broadcast::Subscriber) -> Result<Publisher, SessionError> {
let control = Self::send_setup(&session, setup::Role::Publisher).await?; let control = Self::send_setup(&session, setup::Role::Publisher).await?;
let publisher = Publisher::new(session, control, source); let publisher = Publisher::new(session, control, source);
Ok(publisher) Ok(publisher)
} }
@ -17,7 +16,6 @@ impl Client {
/// Connect using an established WebTransport session, performing the MoQ handshake as a subscriber. /// Connect using an established WebTransport session, performing the MoQ handshake as a subscriber.
pub async fn subscriber(session: Session, source: broadcast::Publisher) -> Result<Subscriber, SessionError> { pub async fn subscriber(session: Session, source: broadcast::Publisher) -> Result<Subscriber, SessionError> {
let control = Self::send_setup(&session, setup::Role::Subscriber).await?; let control = Self::send_setup(&session, setup::Role::Subscriber).await?;
let subscriber = Subscriber::new(session, control, source); let subscriber = Subscriber::new(session, control, source);
Ok(subscriber) Ok(subscriber)
} }
@ -29,26 +27,46 @@ impl Client {
} }
*/ */
async fn send_setup(session: &Session, role: setup::Role) -> Result<(SendStream, RecvStream), SessionError> { async fn send_setup(session: &Session, role: setup::Role) -> Result<Control, SessionError> {
let mut control = session.open_bi().await?; let mut control = session.open_bi().await?;
let versions: setup::Versions = [setup::Version::DRAFT_01, setup::Version::KIXEL_01].into();
let client = setup::Client { let client = setup::Client {
role, role,
versions: vec![setup::Version::KIXEL_00].into(), versions: versions.clone(),
params: Default::default(),
// Offer all extensions
extensions: setup::Extensions {
object_expires: true,
subscriber_id: true,
subscribe_split: true,
},
}; };
client.encode(&mut control.0).await?; client.encode(&mut control.0).await?;
let server = setup::Server::decode(&mut control.1).await?; let mut server = setup::Server::decode(&mut control.1).await?;
if server.version != setup::Version::KIXEL_00 { match server.version {
return Err(SessionError::Version(Some(server.version))); setup::Version::DRAFT_01 => {
// We always require this extension
server.extensions.require_subscriber_id()?;
if server.role.is_publisher() {
// We only require object expires if we're a subscriber, so we don't cache objects indefinitely.
server.extensions.require_object_expires()?;
}
}
setup::Version::KIXEL_01 => {
// KIXEL_01 didn't support extensions; all were enabled.
server.extensions = client.extensions.clone()
}
_ => return Err(SessionError::Version(versions, [server.version].into())),
} }
// Make sure the server replied with the let control = Control::new(control.0, control.1, server.extensions);
if !client.role.is_compatible(server.role) {
return Err(SessionError::RoleIncompatible(client.role, server.role));
}
Ok(control) Ok(control)
} }

View File

@ -6,19 +6,21 @@ use tokio::sync::Mutex;
use webtransport_quinn::{RecvStream, SendStream}; use webtransport_quinn::{RecvStream, SendStream};
use super::SessionError; use super::SessionError;
use crate::message::Message; use crate::{message::Message, setup::Extensions};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct Control { pub(crate) struct Control {
send: Arc<Mutex<SendStream>>, send: Arc<Mutex<SendStream>>,
recv: Arc<Mutex<RecvStream>>, recv: Arc<Mutex<RecvStream>>,
pub ext: Extensions,
} }
impl Control { impl Control {
pub fn new(send: SendStream, recv: RecvStream) -> Self { pub fn new(send: SendStream, recv: RecvStream, ext: Extensions) -> Self {
Self { Self {
send: Arc::new(Mutex::new(send)), send: Arc::new(Mutex::new(send)),
recv: Arc::new(Mutex::new(recv)), recv: Arc::new(Mutex::new(recv)),
ext,
} }
} }
@ -26,7 +28,7 @@ impl Control {
let mut stream = self.send.lock().await; let mut stream = self.send.lock().await;
log::info!("sending message: {:?}", msg); log::info!("sending message: {:?}", msg);
msg.into() msg.into()
.encode(&mut *stream) .encode(&mut *stream, &self.ext)
.await .await
.map_err(|e| SessionError::Unknown(e.to_string()))?; .map_err(|e| SessionError::Unknown(e.to_string()))?;
Ok(()) Ok(())
@ -35,7 +37,7 @@ impl Control {
// It's likely a mistake to call this from two different tasks, but it's easier to just support it. // It's likely a mistake to call this from two different tasks, but it's easier to just support it.
pub async fn recv(&self) -> Result<Message, SessionError> { pub async fn recv(&self) -> Result<Message, SessionError> {
let mut stream = self.recv.lock().await; let mut stream = self.recv.lock().await;
let msg = Message::decode(&mut *stream) let msg = Message::decode(&mut *stream, &self.ext)
.await .await
.map_err(|e| SessionError::Unknown(e.to_string()))?; .map_err(|e| SessionError::Unknown(e.to_string()))?;
Ok(msg) Ok(msg)

View File

@ -14,8 +14,8 @@ pub enum SessionError {
#[error("decode error: {0}")] #[error("decode error: {0}")]
Decode(#[from] coding::DecodeError), Decode(#[from] coding::DecodeError),
#[error("unsupported version: {0:?}")] #[error("unsupported versions: client={0:?} server={1:?}")]
Version(Option<setup::Version>), Version(setup::Versions, setup::Versions),
#[error("incompatible roles: client={0:?} server={1:?}")] #[error("incompatible roles: client={0:?} server={1:?}")]
RoleIncompatible(setup::Role, setup::Role), RoleIncompatible(setup::Role, setup::Role),
@ -32,6 +32,22 @@ pub enum SessionError {
#[error("role violation: msg={0}")] #[error("role violation: msg={0}")]
RoleViolation(VarInt), RoleViolation(VarInt),
/// Our enforced stream mapping was disrespected.
#[error("stream mapping conflict")]
StreamMapping,
/// The priority was invalid.
#[error("invalid priority: {0}")]
InvalidPriority(VarInt),
/// The size was invalid.
#[error("invalid size: {0}")]
InvalidSize(VarInt),
/// A required extension was not offered.
#[error("required extension not offered: {0:?}")]
RequiredExtension(VarInt),
/// An unclassified error because I'm lazy. TODO classify these errors /// An unclassified error because I'm lazy. TODO classify these errors
#[error("unknown error: {0}")] #[error("unknown error: {0}")]
Unknown(String), Unknown(String),
@ -44,29 +60,42 @@ impl MoqError for SessionError {
Self::Cache(err) => err.code(), Self::Cache(err) => err.code(),
Self::RoleIncompatible(..) => 406, Self::RoleIncompatible(..) => 406,
Self::RoleViolation(..) => 405, Self::RoleViolation(..) => 405,
Self::StreamMapping => 409,
Self::Unknown(_) => 500, Self::Unknown(_) => 500,
Self::Write(_) => 501, Self::Write(_) => 501,
Self::Read(_) => 502, Self::Read(_) => 502,
Self::Session(_) => 503, Self::Session(_) => 503,
Self::Version(_) => 406, Self::Version(..) => 406,
Self::Encode(_) => 500, Self::Encode(_) => 500,
Self::Decode(_) => 500, Self::Decode(_) => 500,
Self::InvalidPriority(_) => 400,
Self::InvalidSize(_) => 400,
Self::RequiredExtension(_) => 426,
} }
} }
/// A reason that is sent over the wire. /// A reason that is sent over the wire.
fn reason(&self) -> &str { fn reason(&self) -> String {
match self { match self {
Self::Cache(err) => err.reason(), Self::Cache(err) => err.reason(),
Self::RoleViolation(_) => "role violation", Self::RoleViolation(kind) => format!("role violation for message type {:?}", kind),
Self::RoleIncompatible(..) => "role incompatible", Self::RoleIncompatible(client, server) => {
Self::Read(_) => "read error", format!(
Self::Write(_) => "write error", "role incompatible: client wanted {:?} but server wanted {:?}",
Self::Session(_) => "session error", client, server
Self::Unknown(_) => "unknown", )
Self::Version(_) => "unsupported version", }
Self::Encode(_) => "encode error", Self::Read(err) => format!("read error: {}", err),
Self::Decode(_) => "decode error", Self::Write(err) => format!("write error: {}", err),
Self::Session(err) => format!("session error: {}", err),
Self::Unknown(err) => format!("unknown error: {}", err),
Self::Version(client, server) => format!("unsupported versions: client={:?} server={:?}", client, server),
Self::Encode(err) => format!("encode error: {}", err),
Self::Decode(err) => format!("decode error: {}", err),
Self::StreamMapping => "streaming mapping conflict".to_owned(),
Self::InvalidPriority(priority) => format!("invalid priority: {}", priority),
Self::InvalidSize(size) => format!("invalid size: {}", size),
Self::RequiredExtension(id) => format!("required extension was missing: {:?}", id),
} }
} }
} }

View File

@ -4,7 +4,7 @@ use std::{
}; };
use tokio::task::AbortHandle; use tokio::task::AbortHandle;
use webtransport_quinn::{RecvStream, SendStream, Session}; use webtransport_quinn::Session;
use crate::{ use crate::{
cache::{broadcast, segment, track, CacheError}, cache::{broadcast, segment, track, CacheError},
@ -27,13 +27,11 @@ pub struct Publisher {
} }
impl Publisher { impl Publisher {
pub(crate) fn new(webtransport: Session, control: (SendStream, RecvStream), source: broadcast::Subscriber) -> Self { pub(crate) fn new(webtransport: Session, control: Control, source: broadcast::Subscriber) -> Self {
let control = Control::new(control.0, control.1);
Self { Self {
webtransport, webtransport,
subscribes: Default::default(),
control, control,
subscribes: Default::default(),
source, source,
} }
} }
@ -85,9 +83,9 @@ impl Publisher {
async fn recv_message(&mut self, msg: &Message) -> Result<(), SessionError> { async fn recv_message(&mut self, msg: &Message) -> Result<(), SessionError> {
match msg { match msg {
Message::AnnounceOk(msg) => self.recv_announce_ok(msg).await, Message::AnnounceOk(msg) => self.recv_announce_ok(msg).await,
Message::AnnounceStop(msg) => self.recv_announce_stop(msg).await, Message::AnnounceError(msg) => self.recv_announce_error(msg).await,
Message::Subscribe(msg) => self.recv_subscribe(msg).await, Message::Subscribe(msg) => self.recv_subscribe(msg).await,
Message::SubscribeStop(msg) => self.recv_subscribe_stop(msg).await, Message::Unsubscribe(msg) => self.recv_unsubscribe(msg).await,
_ => Err(SessionError::RoleViolation(msg.id())), _ => Err(SessionError::RoleViolation(msg.id())),
} }
} }
@ -97,7 +95,7 @@ impl Publisher {
Err(CacheError::NotFound.into()) Err(CacheError::NotFound.into())
} }
async fn recv_announce_stop(&mut self, _msg: &message::AnnounceStop) -> Result<(), SessionError> { async fn recv_announce_error(&mut self, _msg: &message::AnnounceError) -> Result<(), SessionError> {
// We didn't send an announce. // We didn't send an announce.
Err(CacheError::NotFound.into()) Err(CacheError::NotFound.into())
} }
@ -115,14 +113,24 @@ impl Publisher {
hash_map::Entry::Vacant(entry) => entry.insert(abort), hash_map::Entry::Vacant(entry) => entry.insert(abort),
}; };
self.control.send(message::SubscribeOk { id: msg.id }).await self.control
.send(message::SubscribeOk {
id: msg.id,
expires: VarInt::ZERO,
})
.await
} }
async fn reset_subscribe<E: MoqError>(&mut self, id: VarInt, err: E) -> Result<(), SessionError> { async fn reset_subscribe<E: MoqError>(&mut self, id: VarInt, err: E) -> Result<(), SessionError> {
let msg = message::SubscribeReset { let msg = message::SubscribeReset {
id, id,
code: err.code(), code: err.code(),
reason: err.reason().to_string(), reason: err.reason(),
// TODO properly populate these
// But first: https://github.com/moq-wg/moq-transport/issues/313
final_group: VarInt::ZERO,
final_object: VarInt::ZERO,
}; };
self.control.send(msg).await self.control.send(msg).await
@ -130,7 +138,8 @@ impl Publisher {
fn start_subscribe(&mut self, msg: message::Subscribe) -> Result<AbortHandle, SessionError> { fn start_subscribe(&mut self, msg: message::Subscribe) -> Result<AbortHandle, SessionError> {
// We currently don't use the namespace field in SUBSCRIBE // We currently don't use the namespace field in SUBSCRIBE
if !msg.namespace.is_empty() { // Make sure the namespace is empty if it's provided.
if msg.namespace.as_ref().map_or(false, |namespace| !namespace.is_empty()) {
return Err(CacheError::NotFound.into()); return Err(CacheError::NotFound.into());
} }
@ -176,31 +185,42 @@ impl Publisher {
} }
async fn run_segment(&self, id: VarInt, segment: &mut segment::Subscriber) -> Result<(), SessionError> { async fn run_segment(&self, id: VarInt, segment: &mut segment::Subscriber) -> Result<(), SessionError> {
let object = message::Object { log::trace!("serving group: {:?}", segment);
track: id,
sequence: segment.sequence,
priority: segment.priority,
expires: segment.expires,
};
log::trace!("serving object: {:?}", object);
let mut stream = self.webtransport.open_uni().await?; let mut stream = self.webtransport.open_uni().await?;
stream.set_priority(object.priority).ok();
// Convert the u32 to a i32, since the Quinn set_priority is signed.
let priority = (segment.priority as i64 - i32::MAX as i64) as i32;
stream.set_priority(priority).ok();
while let Some(mut fragment) = segment.next_fragment().await? {
let object = message::Object {
track: id,
// Properties of the segment
group: segment.sequence,
priority: segment.priority,
expires: segment.expires,
// Properties of the fragment
sequence: fragment.sequence,
size: fragment.size,
};
object object
.encode(&mut stream) .encode(&mut stream, &self.control.ext)
.await .await
.map_err(|e| SessionError::Unknown(e.to_string()))?; .map_err(|e| SessionError::Unknown(e.to_string()))?;
while let Some(data) = segment.read_chunk().await? { while let Some(chunk) = fragment.read_chunk().await? {
stream.write_chunk(data).await?; stream.write_all(&chunk).await?;
}
} }
Ok(()) Ok(())
} }
async fn recv_subscribe_stop(&mut self, msg: &message::SubscribeStop) -> Result<(), SessionError> { async fn recv_unsubscribe(&mut self, msg: &message::Unsubscribe) -> Result<(), SessionError> {
let abort = self let abort = self
.subscribes .subscribes
.lock() .lock()

View File

@ -1,4 +1,4 @@
use super::{Publisher, SessionError, Subscriber}; use super::{Control, Publisher, SessionError, Subscriber};
use crate::{cache::broadcast, setup}; use crate::{cache::broadcast, setup};
use webtransport_quinn::{RecvStream, SendStream, Session}; use webtransport_quinn::{RecvStream, SendStream, Session};
@ -13,13 +13,32 @@ impl Server {
pub async fn accept(session: Session) -> Result<Request, SessionError> { pub async fn accept(session: Session) -> Result<Request, SessionError> {
let mut control = session.accept_bi().await?; let mut control = session.accept_bi().await?;
let client = setup::Client::decode(&mut control.1).await?; let mut client = setup::Client::decode(&mut control.1).await?;
client if client.versions.contains(&setup::Version::DRAFT_01) {
.versions // We always require subscriber ID.
.iter() client.extensions.require_subscriber_id()?;
.find(|version| **version == setup::Version::KIXEL_00)
.ok_or_else(|| SessionError::Version(client.versions.last().cloned()))?; // We require OBJECT_EXPIRES for publishers only.
if client.role.is_publisher() {
client.extensions.require_object_expires()?;
}
// We don't require SUBSCRIBE_SPLIT since it's easy enough to support, but it's clearly an oversight.
// client.extensions.require(&Extension::SUBSCRIBE_SPLIT)?;
} else if client.versions.contains(&setup::Version::KIXEL_01) {
// Extensions didn't exist in KIXEL_01, so we set them manually.
client.extensions = setup::Extensions {
object_expires: true,
subscriber_id: true,
subscribe_split: true,
};
} else {
return Err(SessionError::Version(
client.versions,
[setup::Version::DRAFT_01, setup::Version::KIXEL_01].into(),
));
}
Ok(Request { Ok(Request {
session, session,
@ -39,17 +58,21 @@ pub struct Request {
impl Request { impl Request {
/// Accept the session as a publisher, using the provided broadcast to serve subscriptions. /// Accept the session as a publisher, using the provided broadcast to serve subscriptions.
pub async fn publisher(mut self, source: broadcast::Subscriber) -> Result<Publisher, SessionError> { pub async fn publisher(mut self, source: broadcast::Subscriber) -> Result<Publisher, SessionError> {
self.send_setup(setup::Role::Publisher).await?; let setup = self.setup(setup::Role::Publisher)?;
setup.encode(&mut self.control.0).await?;
let publisher = Publisher::new(self.session, self.control, source); let control = Control::new(self.control.0, self.control.1, setup.extensions);
let publisher = Publisher::new(self.session, control, source);
Ok(publisher) Ok(publisher)
} }
/// Accept the session as a subscriber only. /// Accept the session as a subscriber only.
pub async fn subscriber(mut self, source: broadcast::Publisher) -> Result<Subscriber, SessionError> { pub async fn subscriber(mut self, source: broadcast::Publisher) -> Result<Subscriber, SessionError> {
self.send_setup(setup::Role::Subscriber).await?; let setup = self.setup(setup::Role::Subscriber)?;
setup.encode(&mut self.control.0).await?;
let subscriber = Subscriber::new(self.session, self.control, source); let control = Control::new(self.control.0, self.control.1, setup.extensions);
let subscriber = Subscriber::new(self.session, control, source);
Ok(subscriber) Ok(subscriber)
} }
@ -60,10 +83,12 @@ impl Request {
} }
*/ */
async fn send_setup(&mut self, role: setup::Role) -> Result<(), SessionError> { fn setup(&mut self, role: setup::Role) -> Result<setup::Server, SessionError> {
let server = setup::Server { let server = setup::Server {
role, role,
version: setup::Version::KIXEL_00, version: setup::Version::DRAFT_01,
extensions: self.client.extensions.clone(),
params: Default::default(),
}; };
// We need to sure we support the opposite of the client's role. // We need to sure we support the opposite of the client's role.
@ -72,9 +97,7 @@ impl Request {
return Err(SessionError::RoleIncompatible(self.client.role, server.role)); return Err(SessionError::RoleIncompatible(self.client.role, server.role));
} }
server.encode(&mut self.control.0).await?; Ok(server)
Ok(())
} }
/// Reject the request, closing the Webtransport session. /// Reject the request, closing the Webtransport session.

View File

@ -1,4 +1,4 @@
use webtransport_quinn::{RecvStream, SendStream, Session}; use webtransport_quinn::{RecvStream, Session};
use std::{ use std::{
collections::HashMap, collections::HashMap,
@ -6,7 +6,8 @@ use std::{
}; };
use crate::{ use crate::{
cache::{broadcast, segment, track, CacheError}, cache::{broadcast, fragment, segment, track, CacheError},
coding::DecodeError,
message, message,
message::Message, message::Message,
session::{Control, SessionError}, session::{Control, SessionError},
@ -34,9 +35,7 @@ pub struct Subscriber {
} }
impl Subscriber { impl Subscriber {
pub(crate) fn new(webtransport: Session, control: (SendStream, RecvStream), source: broadcast::Publisher) -> Self { pub(crate) fn new(webtransport: Session, control: Control, source: broadcast::Publisher) -> Self {
let control = Control::new(control.0, control.1);
Self { Self {
webtransport, webtransport,
subscribes: Default::default(), subscribes: Default::default(),
@ -64,28 +63,28 @@ impl Subscriber {
let msg = self.control.recv().await?; let msg = self.control.recv().await?;
log::info!("message received: {:?}", msg); log::info!("message received: {:?}", msg);
if let Err(err) = self.recv_message(&msg).await { if let Err(err) = self.recv_message(&msg) {
log::warn!("message error: {:?} {:?}", err, msg); log::warn!("message error: {:?} {:?}", err, msg);
} }
} }
} }
async fn recv_message(&mut self, msg: &Message) -> Result<(), SessionError> { fn recv_message(&mut self, msg: &Message) -> Result<(), SessionError> {
match msg { match msg {
Message::Announce(_) => Ok(()), // don't care Message::Announce(_) => Ok(()), // don't care
Message::AnnounceReset(_) => Ok(()), // also don't care Message::Unannounce(_) => Ok(()), // also don't care
Message::SubscribeOk(_) => Ok(()), // guess what, don't care Message::SubscribeOk(_msg) => Ok(()), // don't care
Message::SubscribeReset(msg) => self.recv_subscribe_reset(msg).await, Message::SubscribeReset(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code)),
Message::SubscribeFin(msg) => self.recv_subscribe_error(msg.id, CacheError::Closed),
Message::SubscribeError(msg) => self.recv_subscribe_error(msg.id, CacheError::Reset(msg.code)),
Message::GoAway(_msg) => unimplemented!("GOAWAY"), Message::GoAway(_msg) => unimplemented!("GOAWAY"),
_ => Err(SessionError::RoleViolation(msg.id())), _ => Err(SessionError::RoleViolation(msg.id())),
} }
} }
async fn recv_subscribe_reset(&mut self, msg: &message::SubscribeReset) -> Result<(), SessionError> { fn recv_subscribe_error(&mut self, id: VarInt, err: CacheError) -> Result<(), SessionError> {
let err = CacheError::Reset(msg.code);
let mut subscribes = self.subscribes.lock().unwrap(); let mut subscribes = self.subscribes.lock().unwrap();
let subscribe = subscribes.remove(&msg.id).ok_or(CacheError::NotFound)?; let subscribe = subscribes.remove(&id).ok_or(CacheError::NotFound)?;
subscribe.close(err)?; subscribe.close(err)?;
Ok(()) Ok(())
@ -107,36 +106,82 @@ impl Subscriber {
async fn run_stream(self, mut stream: RecvStream) -> Result<(), SessionError> { async fn run_stream(self, mut stream: RecvStream) -> Result<(), SessionError> {
// Decode the object on the data stream. // Decode the object on the data stream.
let object = message::Object::decode(&mut stream) let mut object = message::Object::decode(&mut stream, &self.control.ext)
.await .await
.map_err(|e| SessionError::Unknown(e.to_string()))?; .map_err(|e| SessionError::Unknown(e.to_string()))?;
log::trace!("received object: {:?}", object); log::trace!("received object: {:?}", object);
// A new scope is needed because the async compiler is dumb // A new scope is needed because the async compiler is dumb
let mut publisher = { let mut segment = {
let mut subscribes = self.subscribes.lock().unwrap(); let mut subscribes = self.subscribes.lock().unwrap();
let track = subscribes.get_mut(&object.track).ok_or(CacheError::NotFound)?; let track = subscribes.get_mut(&object.track).ok_or(CacheError::NotFound)?;
track.create_segment(segment::Info { track.create_segment(segment::Info {
sequence: object.sequence, sequence: object.group,
priority: object.priority, priority: object.priority,
expires: object.expires, expires: object.expires,
})? })?
}; };
while let Some(data) = stream.read_chunk(usize::MAX, true).await? { // Create the first fragment
let mut fragment = segment.create_fragment(fragment::Info {
sequence: object.sequence,
size: object.size,
})?;
let mut remain = object.size.map(usize::from);
loop {
if let Some(0) = remain {
// Decode the next object from the stream.
let next = match message::Object::decode(&mut stream, &self.control.ext).await {
Ok(next) => next,
// No more objects
Err(DecodeError::Final) => break,
// Unknown error
Err(err) => return Err(err.into()),
};
// NOTE: This is a custom restriction; not part of the moq-transport draft.
// We require every OBJECT to contain the same priority since prioritization is done per-stream.
// We also require every OBJECT to contain the same group so we know when the group ends, and can detect gaps.
if next.priority != object.priority && next.group != object.group {
return Err(SessionError::StreamMapping);
}
// Create a new object.
fragment = segment.create_fragment(fragment::Info {
sequence: object.sequence,
size: object.size,
})?;
object = next;
remain = object.size.map(usize::from);
}
match stream.read_chunk(remain.unwrap_or(usize::MAX), true).await? {
// Unbounded object has ended
None if remain.is_none() => break,
// Bounded object ended early, oops.
None => return Err(DecodeError::UnexpectedEnd.into()),
// NOTE: This does not make a copy! // NOTE: This does not make a copy!
// Bytes are immutable and ref counted. // Bytes are immutable and ref counted.
publisher.write_chunk(data.bytes)?; Some(data) => fragment.write_chunk(data.bytes)?,
}
} }
Ok(()) Ok(())
} }
async fn run_source(mut self) -> Result<(), SessionError> { async fn run_source(mut self) -> Result<(), SessionError> {
loop {
// NOTE: This returns Closed when the source is closed. // NOTE: This returns Closed when the source is closed.
while let Some(track) = self.source.next_track().await? { let track = self.source.next_track().await?;
let name = track.name.clone(); let name = track.name.clone();
let id = VarInt::from_u32(self.next.fetch_add(1, atomic::Ordering::SeqCst)); let id = VarInt::from_u32(self.next.fetch_add(1, atomic::Ordering::SeqCst));
@ -144,13 +189,19 @@ impl Subscriber {
let msg = message::Subscribe { let msg = message::Subscribe {
id, id,
namespace: "".to_string(), namespace: self.control.ext.subscribe_split.then(|| "".to_string()),
name, name,
// TODO correctly support these
start_group: message::SubscribeLocation::Latest(VarInt::ZERO),
start_object: message::SubscribeLocation::Absolute(VarInt::ZERO),
end_group: message::SubscribeLocation::None,
end_object: message::SubscribeLocation::None,
params: Default::default(),
}; };
self.control.send(msg).await?; self.control.send(msg).await?;
} }
Ok(())
} }
} }

View File

@ -1,6 +1,6 @@
use super::{Role, Versions}; use super::{Extensions, Role, Versions};
use crate::{ use crate::{
coding::{DecodeError, EncodeError}, coding::{Decode, DecodeError, Encode, EncodeError, Params},
VarInt, VarInt,
}; };
@ -15,29 +15,57 @@ pub struct Client {
pub versions: Versions, pub versions: Versions,
/// Indicate if the client is a publisher, a subscriber, or both. /// Indicate if the client is a publisher, a subscriber, or both.
// Proposal: moq-wg/moq-transport#151
pub role: Role, pub role: Role,
/// A list of known/offered extensions.
pub extensions: Extensions,
/// Unknown parameters.
pub params: Params,
} }
impl Client { impl Client {
/// Decode a client setup message. /// Decode a client setup message.
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> {
let typ = VarInt::decode(r).await?; let typ = VarInt::decode(r).await?;
if typ.into_inner() != 1 { if typ.into_inner() != 0x40 {
return Err(DecodeError::InvalidType(typ)); return Err(DecodeError::InvalidMessage(typ));
} }
let versions = Versions::decode(r).await?; let versions = Versions::decode(r).await?;
let role = Role::decode(r).await?; let mut params = Params::decode(r).await?;
Ok(Self { versions, role }) let role = params
.get::<Role>(VarInt::from_u32(0))
.await?
.ok_or(DecodeError::MissingParameter)?;
// Make sure the PATH parameter isn't used
// TODO: This assumes WebTransport support only
if params.has(VarInt::from_u32(1)) {
return Err(DecodeError::InvalidParameter);
}
let extensions = Extensions::load(&mut params).await?;
Ok(Self {
versions,
role,
extensions,
params,
})
} }
/// Encode a server setup message. /// Encode a server setup message.
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> {
VarInt::from_u32(1).encode(w).await?; VarInt::from_u32(0x40).encode(w).await?;
self.versions.encode(w).await?; self.versions.encode(w).await?;
self.role.encode(w).await?;
let mut params = self.params.clone();
params.set(VarInt::from_u32(0), self.role).await?;
self.extensions.store(&mut params).await?;
params.encode(w).await?;
Ok(()) Ok(())
} }

View File

@ -0,0 +1,84 @@
use tokio::io::{AsyncRead, AsyncWrite};
use crate::coding::{Decode, DecodeError, Encode, EncodeError, Params};
use crate::session::SessionError;
use crate::VarInt;
use paste::paste;
/// This is a custom extension scheme to allow/require draft PRs.
///
/// By convention, the extension number is the PR number + 0xe0000.
macro_rules! extensions {
{$($name:ident = $val:expr,)*} => {
#[derive(Clone, Default, Debug)]
pub struct Extensions {
$(
pub $name: bool,
)*
}
impl Extensions {
pub async fn load(params: &mut Params) -> Result<Self, DecodeError> {
let mut extensions = Self::default();
$(
if let Some(_) = params.get::<ExtensionExists>(VarInt::from_u32($val)).await? {
extensions.$name = true
}
)*
Ok(extensions)
}
pub async fn store(&self, params: &mut Params) -> Result<(), EncodeError> {
$(
if self.$name {
params.set(VarInt::from_u32($val), ExtensionExists{}).await?;
}
)*
Ok(())
}
paste! {
$(
pub fn [<require_ $name>](&self) -> Result<(), SessionError> {
match self.$name {
true => Ok(()),
false => Err(SessionError::RequiredExtension(VarInt::from_u32($val))),
}
}
)*
}
}
}
}
struct ExtensionExists;
#[async_trait::async_trait]
impl Decode for ExtensionExists {
async fn decode<R: AsyncRead>(_r: &mut R) -> Result<Self, DecodeError> {
Ok(ExtensionExists {})
}
}
#[async_trait::async_trait]
impl Encode for ExtensionExists {
async fn encode<W: AsyncWrite>(&self, _w: &mut W) -> Result<(), EncodeError> {
Ok(())
}
}
extensions! {
// required for publishers: OBJECT contains expires VarInt in seconds: https://github.com/moq-wg/moq-transport/issues/249
// TODO write up a PR
object_expires = 0xe00f9,
// required: SUBSCRIBE chooses track ID: https://github.com/moq-wg/moq-transport/pull/258
subscriber_id = 0xe0102,
// optional: SUBSCRIBE contains namespace/name tuple: https://github.com/moq-wg/moq-transport/pull/277
subscribe_split = 0xe0115,
}

View File

@ -5,11 +5,13 @@
//! Both sides negotate the [Version] and [Role]. //! Both sides negotate the [Version] and [Role].
mod client; mod client;
mod extension;
mod role; mod role;
mod server; mod server;
mod version; mod version;
pub use client::*; pub use client::*;
pub use extension::*;
pub use role::*; pub use role::*;
pub use server::*; pub use server::*;
pub use version::*; pub use version::*;

View File

@ -1,6 +1,6 @@
use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{AsyncRead, AsyncWrite};
use crate::coding::{DecodeError, EncodeError, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
/// Indicates the endpoint is a publisher, subscriber, or both. /// Indicates the endpoint is a publisher, subscriber, or both.
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -36,9 +36,9 @@ impl Role {
impl From<Role> for VarInt { impl From<Role> for VarInt {
fn from(r: Role) -> Self { fn from(r: Role) -> Self {
VarInt::from_u32(match r { VarInt::from_u32(match r {
Role::Publisher => 0x0, Role::Publisher => 0x1,
Role::Subscriber => 0x1, Role::Subscriber => 0x2,
Role::Both => 0x2, Role::Both => 0x3,
}) })
} }
} }
@ -48,23 +48,27 @@ impl TryFrom<VarInt> for Role {
fn try_from(v: VarInt) -> Result<Self, Self::Error> { fn try_from(v: VarInt) -> Result<Self, Self::Error> {
match v.into_inner() { match v.into_inner() {
0x0 => Ok(Self::Publisher), 0x1 => Ok(Self::Publisher),
0x1 => Ok(Self::Subscriber), 0x2 => Ok(Self::Subscriber),
0x2 => Ok(Self::Both), 0x3 => Ok(Self::Both),
_ => Err(DecodeError::InvalidType(v)), _ => Err(DecodeError::InvalidRole(v)),
} }
} }
} }
impl Role { #[async_trait::async_trait]
impl Decode for Role {
/// Decode the role. /// Decode the role.
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> {
let v = VarInt::decode(r).await?; let v = VarInt::decode(r).await?;
v.try_into() v.try_into()
} }
}
#[async_trait::async_trait]
impl Encode for Role {
/// Encode the role. /// Encode the role.
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> {
VarInt::from(*self).encode(w).await VarInt::from(*self).encode(w).await
} }
} }

View File

@ -1,6 +1,6 @@
use super::{Role, Version}; use super::{Extensions, Role, Version};
use crate::{ use crate::{
coding::{DecodeError, EncodeError}, coding::{Decode, DecodeError, Encode, EncodeError, Params},
VarInt, VarInt,
}; };
@ -17,27 +17,54 @@ pub struct Server {
/// Indicate if the server is a publisher, a subscriber, or both. /// Indicate if the server is a publisher, a subscriber, or both.
// Proposal: moq-wg/moq-transport#151 // Proposal: moq-wg/moq-transport#151
pub role: Role, pub role: Role,
/// Custom extensions.
pub extensions: Extensions,
/// Unknown parameters.
pub params: Params,
} }
impl Server { impl Server {
/// Decode the server setup. /// Decode the server setup.
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> {
let typ = VarInt::decode(r).await?; let typ = VarInt::decode(r).await?;
if typ.into_inner() != 2 { if typ.into_inner() != 0x41 {
return Err(DecodeError::InvalidType(typ)); return Err(DecodeError::InvalidMessage(typ));
} }
let version = Version::decode(r).await?; let version = Version::decode(r).await?;
let role = Role::decode(r).await?; let mut params = Params::decode(r).await?;
Ok(Self { version, role }) let role = params
.get::<Role>(VarInt::from_u32(0))
.await?
.ok_or(DecodeError::MissingParameter)?;
// Make sure the PATH parameter isn't used
if params.has(VarInt::from_u32(1)) {
return Err(DecodeError::InvalidParameter);
}
let extensions = Extensions::load(&mut params).await?;
Ok(Self {
version,
role,
extensions,
params,
})
} }
/// Encode the server setup. /// Encode the server setup.
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> {
VarInt::from_u32(2).encode(w).await?; VarInt::from_u32(0x41).encode(w).await?;
self.version.encode(w).await?; self.version.encode(w).await?;
self.role.encode(w).await?;
let mut params = self.params.clone();
params.set(VarInt::from_u32(0), self.role).await?;
self.extensions.store(&mut params).await?;
params.encode(w).await?;
Ok(()) Ok(())
} }

View File

@ -1,4 +1,4 @@
use crate::coding::{DecodeError, EncodeError, VarInt}; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};
use crate::coding::{AsyncRead, AsyncWrite}; use crate::coding::{AsyncRead, AsyncWrite};
@ -9,9 +9,12 @@ use std::ops::Deref;
pub struct Version(pub VarInt); pub struct Version(pub VarInt);
impl Version { impl Version {
/// <https://www.ietf.org/archive/id/draft-ietf-moq-transport-00.html> /// https://www.ietf.org/archive/id/draft-ietf-moq-transport-00.html
pub const DRAFT_00: Version = Version(VarInt::from_u32(0xff00)); pub const DRAFT_00: Version = Version(VarInt::from_u32(0xff00));
/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-01.html
pub const DRAFT_01: Version = Version(VarInt::from_u32(0xff01));
/// Fork of draft-ietf-moq-transport-00. /// Fork of draft-ietf-moq-transport-00.
/// ///
/// Rough list of differences: /// Rough list of differences:
@ -56,6 +59,18 @@ impl Version {
/// # GROUP /// # GROUP
/// - GROUP concept was removed, replaced with OBJECT as a QUIC stream. /// - GROUP concept was removed, replaced with OBJECT as a QUIC stream.
pub const KIXEL_00: Version = Version(VarInt::from_u32(0xbad00)); pub const KIXEL_00: Version = Version(VarInt::from_u32(0xbad00));
/// Fork of draft-ietf-moq-transport-01.
///
/// Most of the KIXEL_00 changes made it into the draft, or were reverted.
/// This was only used for a short time until extensions were created.
///
/// - SUBSCRIBE contains a separate track namespace and track name field (accidental revert). [#277](https://github.com/moq-wg/moq-transport/pull/277)
/// - SUBSCRIBE contains the `track_id` instead of SUBSCRIBE_OK. [#145](https://github.com/moq-wg/moq-transport/issues/145)
/// - SUBSCRIBE_* reference `track_id` the instead of the `track_full_name`. [#145](https://github.com/moq-wg/moq-transport/issues/145)
/// - OBJECT `priority` is still a VarInt, but the max value is a u32 (implementation reasons)
/// - OBJECT messages within the same `group` MUST be on the same QUIC stream.
pub const KIXEL_01: Version = Version(VarInt::from_u32(0xbad01));
} }
impl From<VarInt> for Version { impl From<VarInt> for Version {
@ -88,9 +103,10 @@ impl Version {
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Versions(Vec<Version>); pub struct Versions(Vec<Version>);
impl Versions { #[async_trait::async_trait]
impl Decode for Versions {
/// Decode the version list. /// Decode the version list.
pub async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> { async fn decode<R: AsyncRead>(r: &mut R) -> Result<Self, DecodeError> {
let count = VarInt::decode(r).await?.into_inner(); let count = VarInt::decode(r).await?.into_inner();
let mut vs = Vec::new(); let mut vs = Vec::new();
@ -101,9 +117,12 @@ impl Versions {
Ok(Self(vs)) Ok(Self(vs))
} }
}
#[async_trait::async_trait]
impl Encode for Versions {
/// Encode the version list. /// Encode the version list.
pub async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> { async fn encode<W: AsyncWrite>(&self, w: &mut W) -> Result<(), EncodeError> {
let size: VarInt = self.0.len().try_into()?; let size: VarInt = self.0.len().try_into()?;
size.encode(w).await?; size.encode(w).await?;
@ -128,3 +147,9 @@ impl From<Vec<Version>> for Versions {
Self(vs) Self(vs)
} }
} }
impl<const N: usize> From<[Version; N]> for Versions {
fn from(vs: [Version; N]) -> Self {
Self(vs.to_vec())
}
}