Compare commits
No commits in common. "feat/docker" and "main" have entirely different histories.
feat/docke
...
main
|
@ -26,21 +26,6 @@ dependencies = [
|
||||||
"memchr",
|
"memchr",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "android-tzdata"
|
|
||||||
version = "0.1.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "android_system_properties"
|
|
||||||
version = "0.1.5"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
|
|
||||||
dependencies = [
|
|
||||||
"libc",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "anstream"
|
name = "anstream"
|
||||||
version = "0.5.0"
|
version = "0.5.0"
|
||||||
|
@ -389,20 +374,6 @@ version = "1.0.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "chrono"
|
|
||||||
version = "0.4.31"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
|
|
||||||
dependencies = [
|
|
||||||
"android-tzdata",
|
|
||||||
"iana-time-zone",
|
|
||||||
"js-sys",
|
|
||||||
"num-traits",
|
|
||||||
"wasm-bindgen",
|
|
||||||
"windows-targets",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "clap"
|
name = "clap"
|
||||||
version = "4.4.2"
|
version = "4.4.2"
|
||||||
|
@ -901,29 +872,6 @@ dependencies = [
|
||||||
"tokio-native-tls",
|
"tokio-native-tls",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "iana-time-zone"
|
|
||||||
version = "0.1.58"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20"
|
|
||||||
dependencies = [
|
|
||||||
"android_system_properties",
|
|
||||||
"core-foundation-sys",
|
|
||||||
"iana-time-zone-haiku",
|
|
||||||
"js-sys",
|
|
||||||
"wasm-bindgen",
|
|
||||||
"windows-core",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "iana-time-zone-haiku"
|
|
||||||
version = "0.1.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
|
|
||||||
dependencies = [
|
|
||||||
"cc",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "idna"
|
name = "idna"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
|
@ -1103,28 +1051,6 @@ dependencies = [
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "moq-clock"
|
|
||||||
version = "0.1.0"
|
|
||||||
dependencies = [
|
|
||||||
"anyhow",
|
|
||||||
"chrono",
|
|
||||||
"clap",
|
|
||||||
"clap_mangen",
|
|
||||||
"env_logger",
|
|
||||||
"log",
|
|
||||||
"moq-transport",
|
|
||||||
"quinn",
|
|
||||||
"rustls",
|
|
||||||
"rustls-native-certs",
|
|
||||||
"rustls-pemfile",
|
|
||||||
"tokio",
|
|
||||||
"tracing",
|
|
||||||
"tracing-subscriber",
|
|
||||||
"url",
|
|
||||||
"webtransport-quinn",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "moq-pub"
|
name = "moq-pub"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
@ -1181,26 +1107,14 @@ dependencies = [
|
||||||
name = "moq-transport"
|
name = "moq-transport"
|
||||||
version = "0.2.0"
|
version = "0.2.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"bytes",
|
"bytes",
|
||||||
"clap",
|
|
||||||
"env_logger",
|
|
||||||
"indexmap 2.0.0",
|
"indexmap 2.0.0",
|
||||||
"log",
|
"log",
|
||||||
"mp4",
|
|
||||||
"paste",
|
"paste",
|
||||||
"quinn",
|
"quinn",
|
||||||
"rfc6381-codec",
|
|
||||||
"rustls",
|
|
||||||
"rustls-native-certs",
|
|
||||||
"rustls-pemfile",
|
|
||||||
"serde_json",
|
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
|
||||||
"tracing-subscriber",
|
|
||||||
"url",
|
|
||||||
"webtransport-quinn",
|
"webtransport-quinn",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -2499,15 +2413,14 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "webtransport-quinn"
|
name = "webtransport-quinn"
|
||||||
version = "0.6.1"
|
version = "0.6.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0b6536bd7382e3ecaeaf791fefbe8aa98d987eb5809ba7f1bd20617161d3a319"
|
checksum = "cceb876dbd00a87b3fd8869d1c315e07c28b0eb54d59b592a07a634f5e2b64e1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-std",
|
"async-std",
|
||||||
"bytes",
|
"bytes",
|
||||||
"futures",
|
"futures",
|
||||||
"http",
|
"http",
|
||||||
"log",
|
|
||||||
"quinn",
|
"quinn",
|
||||||
"quinn-proto",
|
"quinn-proto",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
|
@ -2548,15 +2461,6 @@ version = "0.4.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows-core"
|
|
||||||
version = "0.51.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64"
|
|
||||||
dependencies = [
|
|
||||||
"windows-targets",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows-sys"
|
name = "windows-sys"
|
||||||
version = "0.48.0"
|
version = "0.48.0"
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
[workspace]
|
[workspace]
|
||||||
members = ["moq-transport", "moq-relay", "moq-pub", "moq-api", "moq-clock"]
|
members = ["moq-transport", "moq-relay", "moq-pub", "moq-api"]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
32
Dockerfile
32
Dockerfile
|
@ -1,8 +1,8 @@
|
||||||
FROM rust:bookworm as builder
|
FROM rust:latest as builder
|
||||||
|
|
||||||
# Create a build directory and copy over all of the files
|
# Create a build directory and copy over all of the files
|
||||||
WORKDIR /build
|
WORKDIR /build
|
||||||
COPY . ./
|
COPY . .
|
||||||
|
|
||||||
# Reuse a cache between builds.
|
# Reuse a cache between builds.
|
||||||
# I tried to `cargo install`, but it doesn't seem to work with workspaces.
|
# I tried to `cargo install`, but it doesn't seem to work with workspaces.
|
||||||
|
@ -12,20 +12,28 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
|
||||||
--mount=type=cache,target=/build/target \
|
--mount=type=cache,target=/build/target \
|
||||||
cargo build --release && cp /build/target/release/moq-* /usr/local/cargo/bin
|
cargo build --release && cp /build/target/release/moq-* /usr/local/cargo/bin
|
||||||
|
|
||||||
# moq-rs image with just the binaries
|
# Special image for moq-pub with ffmpeg and a publish script included.
|
||||||
FROM debian:bookworm-slim
|
FROM rust:latest as moq-pub
|
||||||
|
|
||||||
|
# Install required utilities and ffmpeg
|
||||||
RUN apt-get update && \
|
RUN apt-get update && \
|
||||||
apt-get install -y --no-install-recommends curl libssl3 && \
|
apt-get install -y ffmpeg wget
|
||||||
rm -rf /var/lib/apt/lists/*
|
|
||||||
|
# Copy the publish script into the image
|
||||||
|
COPY deploy/publish.sh /usr/local/bin/publish
|
||||||
|
|
||||||
|
# Copy the compiled binary
|
||||||
|
COPY --from=builder /usr/local/cargo/bin/moq-pub /usr/local/cargo/bin/moq-pub
|
||||||
|
CMD [ "publish" ]
|
||||||
|
|
||||||
|
# moq-rs image with just the binaries
|
||||||
|
FROM rust:latest as moq-rs
|
||||||
|
|
||||||
LABEL org.opencontainers.image.source=https://github.com/kixelated/moq-rs
|
LABEL org.opencontainers.image.source=https://github.com/kixelated/moq-rs
|
||||||
LABEL org.opencontainers.image.licenses="MIT OR Apache-2.0"
|
LABEL org.opencontainers.image.licenses="MIT OR Apache-2.0"
|
||||||
|
|
||||||
COPY --from=builder /usr/local/cargo/bin /usr/local/bin
|
# Fly.io entrypoint
|
||||||
|
ADD deploy/fly-relay.sh .
|
||||||
|
|
||||||
# Entrypoint to load relay TLS config in Fly:
|
# Copy the compiled binaries
|
||||||
COPY deploy/fly-relay.sh .
|
COPY --from=builder /usr/local/cargo/bin /usr/local/cargo/bin
|
||||||
|
|
||||||
# Default to moq-relay:
|
|
||||||
CMD ["moq-relay"]
|
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
# Hackathon
|
||||||
|
|
||||||
|
IETF Prague 118
|
||||||
|
|
||||||
|
## MoqTransport
|
||||||
|
|
||||||
|
Reference libraries are available at [moq-rs](https://github.com/kixelated/moq-rs) and [moq-js](https://github.com/kixelated/moq-js). The Rust library is [well documented](https://docs.rs/moq-transport/latest/moq_transport/) but the web library, not so much.
|
||||||
|
|
||||||
|
**TODO** Update both to draft-01.
|
||||||
|
**TODO** Switch any remaining forks over to extensions. ex: track_id in SUBSCRIBE
|
||||||
|
|
||||||
|
The stream mapping right now is quite rigid: `stream == group == object`.
|
||||||
|
|
||||||
|
**TODO** Support multiple objects per group. They MUST NOT use different priorities, different tracks, or out-of-order sequences.
|
||||||
|
|
||||||
|
The API and cache aren't designed to send/receive arbitrary objects over arbitrary streams as specified in the draft. I don't think it should, and it wouldn't be possible to implement in time for the hackathon anyway.
|
||||||
|
|
||||||
|
**TODO** Make an extension to enforce this stream mapping?
|
||||||
|
|
||||||
|
## Generic Relay
|
||||||
|
|
||||||
|
I'm hosting a simple CDN at: `relay.quic.video`
|
||||||
|
|
||||||
|
The traffic is sharded based on the WebTransport path to avoid namespace collisions. Think of it like a customer ID, although it's completely unauthenticated for now. Use your username or whatever string you want: `CONNECT https://relay.quic.video/alan`.
|
||||||
|
|
||||||
|
**TODO** Currently, it performs an implicit `ANNOUNCE ""` when `role=publisher`. This means there can only be a single publisher per shard and `role=both` is not supported. I should have explicit `ANNOUNCE` messages supported before the hackathon to remove this limitation.
|
||||||
|
|
||||||
|
**TODO** I don't know if I will have subscribe hints fully working in time. They will be parsed but might be ignored.
|
||||||
|
|
||||||
|
## CMAF Media
|
||||||
|
|
||||||
|
You can [publish](https://quic.video/publish) and [watch](https://quic.video/watch) broadcasts.
|
||||||
|
There's a [24/7 bunny stream](https://quic.video/watch/bbb) or you can publish your own using [moq-pub](https://github.com/kixelated/moq-rs/tree/main/moq-pub).
|
||||||
|
|
||||||
|
If you want to fetch from the relay directly, the name of the broadcast is the path. For example, `https://quic.video/watch/bbb` can be accessed at `relay.quic.video/bbb`.
|
||||||
|
|
||||||
|
The namespace is empty and the catalog track is `.catalog`. I'm currently using simple JSON catalog with no support for delta updates.
|
||||||
|
|
||||||
|
**TODO** update to the proposed [Warp catalog](https://datatracker.ietf.org/doc/draft-wilaw-moq-catalogformat/).
|
||||||
|
|
||||||
|
The media tracks uses a single (unbounded) object per group. Video groups are per GoP, while audio groups are per frame. There's also an init track containing information required to initialize the decoder.
|
||||||
|
|
||||||
|
**TODO** Base64 encode the init track in the catalog.
|
||||||
|
|
||||||
|
|
||||||
|
## Clock
|
||||||
|
|
||||||
|
**TODO** Host a clock demo that sends a group per second:
|
||||||
|
|
||||||
|
```
|
||||||
|
GROUP: YYYY-MM-DD HH:MM
|
||||||
|
OBJECT: SS
|
||||||
|
```
|
8
Makefile
8
Makefile
|
@ -1,8 +0,0 @@
|
||||||
export CAROOT ?= $(shell cd dev ; go run filippo.io/mkcert -CAROOT)
|
|
||||||
|
|
||||||
.PHONY: run
|
|
||||||
run: dev/localhost.crt
|
|
||||||
@docker-compose up --build --remove-orphans
|
|
||||||
|
|
||||||
dev/localhost.crt:
|
|
||||||
@dev/cert
|
|
13
README.md
13
README.md
|
@ -11,23 +11,12 @@ This repository contains a few crates:
|
||||||
- **moq-pub**: A publish client, accepting media from stdin (ex. via ffmpeg) and sending it to a remote server.
|
- **moq-pub**: A publish client, accepting media from stdin (ex. via ffmpeg) and sending it to a remote server.
|
||||||
- **moq-transport**: An async implementation of the underlying MoQ protocol.
|
- **moq-transport**: An async implementation of the underlying MoQ protocol.
|
||||||
- **moq-api**: A HTTP API server that stores the origin for each broadcast, backed by redis.
|
- **moq-api**: A HTTP API server that stores the origin for each broadcast, backed by redis.
|
||||||
- **moq-clock**: A dumb clock client/server just to prove MoQ is more than media.
|
|
||||||
|
|
||||||
There's currently no way to view media with this repo; you'll need to use [moq-js](https://github.com/kixelated/moq-js) for that.
|
There's currently no way to view media with this repo; you'll need to use [moq-js](https://github.com/kixelated/moq-js) for that.
|
||||||
|
|
||||||
## Development
|
## Development
|
||||||
|
|
||||||
Launch backend containers (including two relays, API and a Redis instance) using docker-compose:
|
Use the [dev helper scripts](dev/README.md) for local development.
|
||||||
|
|
||||||
```
|
|
||||||
make run
|
|
||||||
```
|
|
||||||
|
|
||||||
As well as launching the containers, this handles the creation and transfer of TLS certificates.
|
|
||||||
|
|
||||||
Then, visit https://quic.video/publish/?server=localhost:4443.
|
|
||||||
|
|
||||||
Alternatively, use the [dev helper scripts](dev/README.md).
|
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
|
|
|
@ -1,19 +1,9 @@
|
||||||
# Local Development
|
# Local Development
|
||||||
|
|
||||||
## Quickstart with Docker
|
|
||||||
|
|
||||||
Launch a basic cluster, including provisioning certs and deploying root certificates:
|
|
||||||
|
|
||||||
```
|
|
||||||
# From repo root:
|
|
||||||
make run
|
|
||||||
```
|
|
||||||
|
|
||||||
|
|
||||||
## Manual setup
|
|
||||||
|
|
||||||
This is a collection of helpful scripts for local development.
|
This is a collection of helpful scripts for local development.
|
||||||
|
|
||||||
|
## Setup
|
||||||
|
|
||||||
### moq-relay
|
### moq-relay
|
||||||
|
|
||||||
Unfortunately, QUIC mandates TLS and makes local development difficult.
|
Unfortunately, QUIC mandates TLS and makes local development difficult.
|
||||||
|
|
2
dev/cert
2
dev/cert
|
@ -15,4 +15,4 @@ go run filippo.io/mkcert -ecdsa -install
|
||||||
# Generate a new certificate for localhost
|
# Generate a new certificate for localhost
|
||||||
# This fork of mkcert supports the -days flag.
|
# This fork of mkcert supports the -days flag.
|
||||||
# TODO remove the -days flag when Chrome accepts self-signed certs.
|
# TODO remove the -days flag when Chrome accepts self-signed certs.
|
||||||
go run filippo.io/mkcert -ecdsa -days 10 -cert-file "$CRT" -key-file "$KEY" localhost 127.0.0.1 ::1 relay1 relay2
|
go run filippo.io/mkcert -ecdsa -days 10 -cert-file "$CRT" -key-file "$KEY" localhost 127.0.0.1 ::1
|
||||||
|
|
19
dev/clock
19
dev/clock
|
@ -1,19 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
set -euo pipefail
|
|
||||||
|
|
||||||
# Change directory to the root of the project
|
|
||||||
cd "$(dirname "$0")/.."
|
|
||||||
|
|
||||||
# Use debug logging by default
|
|
||||||
export RUST_LOG="${RUST_LOG:-debug}"
|
|
||||||
|
|
||||||
# Connect to localhost by default.
|
|
||||||
HOST="${HOST:-localhost}"
|
|
||||||
PORT="${PORT:-4443}"
|
|
||||||
ADDR="${ADDR:-$HOST:$PORT}"
|
|
||||||
NAME="${NAME:-clock}"
|
|
||||||
|
|
||||||
# Combine the host and name into a URL.
|
|
||||||
URL="${URL:-"https://$ADDR/$NAME"}"
|
|
||||||
|
|
||||||
cargo run --bin moq-clock -- "$URL" "$@"
|
|
|
@ -1,54 +0,0 @@
|
||||||
version: "3.8"
|
|
||||||
|
|
||||||
x-relay: &x-relay
|
|
||||||
build: .
|
|
||||||
entrypoint: ["moq-relay"]
|
|
||||||
environment:
|
|
||||||
RUST_LOG: ${RUST_LOG:-debug}
|
|
||||||
volumes:
|
|
||||||
- ./dev/localhost.crt:/etc/tls/cert:ro
|
|
||||||
- ./dev/localhost.key:/etc/tls/key:ro
|
|
||||||
- certs:/etc/ssl/certs
|
|
||||||
depends_on:
|
|
||||||
install-certs:
|
|
||||||
condition: service_completed_successfully
|
|
||||||
|
|
||||||
services:
|
|
||||||
redis:
|
|
||||||
image: redis:7
|
|
||||||
ports:
|
|
||||||
- "6400:6379"
|
|
||||||
|
|
||||||
api:
|
|
||||||
build: .
|
|
||||||
entrypoint: moq-api
|
|
||||||
command: --listen [::]:4442 --redis redis://redis:6379
|
|
||||||
|
|
||||||
relay1:
|
|
||||||
<<: *x-relay
|
|
||||||
command: --listen [::]:4443 --tls-cert /etc/tls/cert --tls-key /etc/tls/key --api http://api:4442 --api-node https://relay1:4443 --dev
|
|
||||||
ports:
|
|
||||||
- "4443:4443"
|
|
||||||
- "4443:4443/udp"
|
|
||||||
|
|
||||||
relay2:
|
|
||||||
<<: *x-relay
|
|
||||||
command: --listen [::]:4443 --tls-cert /etc/tls/cert --tls-key /etc/tls/key --api http://api:4442 --api-node https://relay2:4443 --dev
|
|
||||||
ports:
|
|
||||||
- "4444:4443"
|
|
||||||
- "4444:4443/udp"
|
|
||||||
|
|
||||||
install-certs:
|
|
||||||
image: golang:latest
|
|
||||||
working_dir: /work
|
|
||||||
command: go run filippo.io/mkcert -install
|
|
||||||
environment:
|
|
||||||
CAROOT: /work/caroot
|
|
||||||
volumes:
|
|
||||||
- ${CAROOT:-.}:/work/caroot
|
|
||||||
- certs:/etc/ssl/certs
|
|
||||||
- ./dev/go.mod:/work/go.mod:ro
|
|
||||||
- ./dev/go.sum:/work/go.sum:ro
|
|
||||||
|
|
||||||
volumes:
|
|
||||||
certs:
|
|
|
@ -1,46 +0,0 @@
|
||||||
[package]
|
|
||||||
name = "moq-clock"
|
|
||||||
description = "CLOCK over QUIC"
|
|
||||||
authors = ["Luke Curley"]
|
|
||||||
repository = "https://github.com/kixelated/moq-rs"
|
|
||||||
license = "MIT OR Apache-2.0"
|
|
||||||
|
|
||||||
version = "0.1.0"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
keywords = ["quic", "http3", "webtransport", "media", "live"]
|
|
||||||
categories = ["multimedia", "network-programming", "web-programming"]
|
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
moq-transport = { path = "../moq-transport" }
|
|
||||||
|
|
||||||
# QUIC
|
|
||||||
quinn = "0.10"
|
|
||||||
webtransport-quinn = "0.6.1"
|
|
||||||
url = "2"
|
|
||||||
|
|
||||||
# Crypto
|
|
||||||
rustls = { version = "0.21", features = ["dangerous_configuration"] }
|
|
||||||
rustls-native-certs = "0.6"
|
|
||||||
rustls-pemfile = "1"
|
|
||||||
|
|
||||||
# Async stuff
|
|
||||||
tokio = { version = "1", features = ["full"] }
|
|
||||||
|
|
||||||
# CLI, logging, error handling
|
|
||||||
clap = { version = "4", features = ["derive"] }
|
|
||||||
log = { version = "0.4", features = ["std"] }
|
|
||||||
env_logger = "0.9"
|
|
||||||
anyhow = { version = "1", features = ["backtrace"] }
|
|
||||||
tracing = "0.1"
|
|
||||||
tracing-subscriber = "0.3"
|
|
||||||
|
|
||||||
# CLOCK STUFF
|
|
||||||
chrono = "0.4"
|
|
||||||
|
|
||||||
[build-dependencies]
|
|
||||||
clap = { version = "4", features = ["derive"] }
|
|
||||||
clap_mangen = "0.2"
|
|
||||||
url = "2"
|
|
|
@ -1,46 +0,0 @@
|
||||||
use clap::Parser;
|
|
||||||
use std::{net, path};
|
|
||||||
use url::Url;
|
|
||||||
|
|
||||||
#[derive(Parser, Clone, Debug)]
|
|
||||||
pub struct Config {
|
|
||||||
/// Listen for UDP packets on the given address.
|
|
||||||
#[arg(long, default_value = "[::]:0")]
|
|
||||||
pub bind: net::SocketAddr,
|
|
||||||
|
|
||||||
/// Connect to the given URL starting with https://
|
|
||||||
#[arg(value_parser = moq_url)]
|
|
||||||
pub url: Url,
|
|
||||||
|
|
||||||
/// Use the TLS root CA at this path, encoded as PEM.
|
|
||||||
///
|
|
||||||
/// This value can be provided multiple times for multiple roots.
|
|
||||||
/// If this is empty, system roots will be used instead
|
|
||||||
#[arg(long)]
|
|
||||||
pub tls_root: Vec<path::PathBuf>,
|
|
||||||
|
|
||||||
/// Danger: Disable TLS certificate verification.
|
|
||||||
///
|
|
||||||
/// Fine for local development, but should be used in caution in production.
|
|
||||||
#[arg(long)]
|
|
||||||
pub tls_disable_verify: bool,
|
|
||||||
|
|
||||||
/// Publish the current time to the relay, otherwise only subscribe.
|
|
||||||
#[arg(long)]
|
|
||||||
pub publish: bool,
|
|
||||||
|
|
||||||
/// The name of the clock track.
|
|
||||||
#[arg(long, default_value = "now")]
|
|
||||||
pub track: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn moq_url(s: &str) -> Result<Url, String> {
|
|
||||||
let url = Url::try_from(s).map_err(|e| e.to_string())?;
|
|
||||||
|
|
||||||
// Make sure the scheme is moq
|
|
||||||
if url.scheme() != "https" {
|
|
||||||
return Err("url scheme must be https:// for WebTransport".to_string());
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(url)
|
|
||||||
}
|
|
|
@ -1,148 +0,0 @@
|
||||||
use std::time;
|
|
||||||
|
|
||||||
use anyhow::Context;
|
|
||||||
use moq_transport::{
|
|
||||||
cache::{fragment, segment, track},
|
|
||||||
VarInt,
|
|
||||||
};
|
|
||||||
|
|
||||||
use chrono::prelude::*;
|
|
||||||
|
|
||||||
pub struct Publisher {
|
|
||||||
track: track::Publisher,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Publisher {
|
|
||||||
pub fn new(track: track::Publisher) -> Self {
|
|
||||||
Self { track }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run(mut self) -> anyhow::Result<()> {
|
|
||||||
let start = Utc::now();
|
|
||||||
let mut now = start;
|
|
||||||
|
|
||||||
// Just for fun, don't start at zero.
|
|
||||||
let mut sequence = start.minute();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let segment = self
|
|
||||||
.track
|
|
||||||
.create_segment(segment::Info {
|
|
||||||
sequence: VarInt::from_u32(sequence),
|
|
||||||
priority: 0,
|
|
||||||
expires: Some(time::Duration::from_secs(60)),
|
|
||||||
})
|
|
||||||
.context("failed to create minute segment")?;
|
|
||||||
|
|
||||||
sequence += 1;
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(err) = Self::send_segment(segment, now).await {
|
|
||||||
log::warn!("failed to send minute: {:?}", err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let next = now + chrono::Duration::minutes(1);
|
|
||||||
let next = next.with_second(0).unwrap().with_nanosecond(0).unwrap();
|
|
||||||
|
|
||||||
let delay = (next - now).to_std().unwrap();
|
|
||||||
tokio::time::sleep(delay).await;
|
|
||||||
|
|
||||||
now = next; // just assume we didn't undersleep
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn send_segment(mut segment: segment::Publisher, mut now: DateTime<Utc>) -> anyhow::Result<()> {
|
|
||||||
// Everything but the second.
|
|
||||||
let base = now.format("%Y-%m-%d %H:%M:").to_string();
|
|
||||||
|
|
||||||
segment
|
|
||||||
.fragment(VarInt::ZERO, base.len())?
|
|
||||||
.chunk(base.clone().into())
|
|
||||||
.context("failed to write base")?;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let delta = now.format("%S").to_string();
|
|
||||||
let sequence = VarInt::from_u32(now.second() + 1);
|
|
||||||
|
|
||||||
segment
|
|
||||||
.fragment(sequence, delta.len())?
|
|
||||||
.chunk(delta.clone().into())
|
|
||||||
.context("failed to write delta")?;
|
|
||||||
|
|
||||||
println!("{}{}", base, delta);
|
|
||||||
|
|
||||||
let next = now + chrono::Duration::seconds(1);
|
|
||||||
let next = next.with_nanosecond(0).unwrap();
|
|
||||||
|
|
||||||
let delay = (next - now).to_std().unwrap();
|
|
||||||
tokio::time::sleep(delay).await;
|
|
||||||
|
|
||||||
// Get the current time again to check if we overslept
|
|
||||||
let next = Utc::now();
|
|
||||||
if next.minute() != now.minute() {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
now = next;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub struct Subscriber {
|
|
||||||
track: track::Subscriber,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Subscriber {
|
|
||||||
pub fn new(track: track::Subscriber) -> Self {
|
|
||||||
Self { track }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run(mut self) -> anyhow::Result<()> {
|
|
||||||
while let Some(segment) = self.track.segment().await.context("failed to get segment")? {
|
|
||||||
log::debug!("got segment: {:?}", segment);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(err) = Self::recv_segment(segment).await {
|
|
||||||
log::warn!("failed to receive segment: {:?}", err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn recv_segment(mut segment: segment::Subscriber) -> anyhow::Result<()> {
|
|
||||||
let first = segment
|
|
||||||
.fragment()
|
|
||||||
.await
|
|
||||||
.context("failed to get first fragment")?
|
|
||||||
.context("no fragments in segment")?;
|
|
||||||
|
|
||||||
log::debug!("got first: {:?}", first);
|
|
||||||
|
|
||||||
if first.sequence.into_inner() != 0 {
|
|
||||||
anyhow::bail!("first object must be zero; I'm not going to implement a reassembly buffer");
|
|
||||||
}
|
|
||||||
|
|
||||||
let base = Self::recv_fragment(first, Vec::new()).await?;
|
|
||||||
|
|
||||||
log::debug!("read base: {:?}", String::from_utf8_lossy(&base));
|
|
||||||
|
|
||||||
while let Some(fragment) = segment.fragment().await? {
|
|
||||||
log::debug!("next fragment: {:?}", fragment);
|
|
||||||
let value = Self::recv_fragment(fragment, base.clone()).await?;
|
|
||||||
let str = String::from_utf8(value).context("invalid UTF-8")?;
|
|
||||||
|
|
||||||
println!("{}", str);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn recv_fragment(mut fragment: fragment::Subscriber, mut buf: Vec<u8>) -> anyhow::Result<Vec<u8>> {
|
|
||||||
while let Some(data) = fragment.chunk().await? {
|
|
||||||
buf.extend_from_slice(&data);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(buf)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,123 +0,0 @@
|
||||||
use std::{fs, io, sync::Arc, time};
|
|
||||||
|
|
||||||
use anyhow::Context;
|
|
||||||
use clap::Parser;
|
|
||||||
|
|
||||||
mod cli;
|
|
||||||
mod clock;
|
|
||||||
|
|
||||||
use moq_transport::cache::broadcast;
|
|
||||||
|
|
||||||
// TODO: clap complete
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> anyhow::Result<()> {
|
|
||||||
env_logger::init();
|
|
||||||
|
|
||||||
// Disable tracing so we don't get a bunch of Quinn spam.
|
|
||||||
let tracer = tracing_subscriber::FmtSubscriber::builder()
|
|
||||||
.with_max_level(tracing::Level::WARN)
|
|
||||||
.finish();
|
|
||||||
tracing::subscriber::set_global_default(tracer).unwrap();
|
|
||||||
|
|
||||||
let config = cli::Config::parse();
|
|
||||||
|
|
||||||
// Create a list of acceptable root certificates.
|
|
||||||
let mut roots = rustls::RootCertStore::empty();
|
|
||||||
|
|
||||||
if config.tls_root.is_empty() {
|
|
||||||
// Add the platform's native root certificates.
|
|
||||||
for cert in rustls_native_certs::load_native_certs().context("could not load platform certs")? {
|
|
||||||
roots
|
|
||||||
.add(&rustls::Certificate(cert.0))
|
|
||||||
.context("failed to add root cert")?;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Add the specified root certificates.
|
|
||||||
for root in &config.tls_root {
|
|
||||||
let root = fs::File::open(root).context("failed to open root cert file")?;
|
|
||||||
let mut root = io::BufReader::new(root);
|
|
||||||
|
|
||||||
let root = rustls_pemfile::certs(&mut root).context("failed to read root cert")?;
|
|
||||||
anyhow::ensure!(root.len() == 1, "expected a single root cert");
|
|
||||||
let root = rustls::Certificate(root[0].to_owned());
|
|
||||||
|
|
||||||
roots.add(&root).context("failed to add root cert")?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut tls_config = rustls::ClientConfig::builder()
|
|
||||||
.with_safe_defaults()
|
|
||||||
.with_root_certificates(roots)
|
|
||||||
.with_no_client_auth();
|
|
||||||
|
|
||||||
// Allow disabling TLS verification altogether.
|
|
||||||
if config.tls_disable_verify {
|
|
||||||
let noop = NoCertificateVerification {};
|
|
||||||
tls_config.dangerous().set_certificate_verifier(Arc::new(noop));
|
|
||||||
}
|
|
||||||
|
|
||||||
tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important
|
|
||||||
|
|
||||||
let arc_tls_config = std::sync::Arc::new(tls_config);
|
|
||||||
let quinn_client_config = quinn::ClientConfig::new(arc_tls_config);
|
|
||||||
|
|
||||||
let mut endpoint = quinn::Endpoint::client(config.bind)?;
|
|
||||||
endpoint.set_default_client_config(quinn_client_config);
|
|
||||||
|
|
||||||
log::info!("connecting to relay: url={}", config.url);
|
|
||||||
|
|
||||||
let session = webtransport_quinn::connect(&endpoint, &config.url)
|
|
||||||
.await
|
|
||||||
.context("failed to create WebTransport session")?;
|
|
||||||
|
|
||||||
let (mut publisher, subscriber) = broadcast::new(""); // TODO config.namespace
|
|
||||||
|
|
||||||
if config.publish {
|
|
||||||
let session = moq_transport::session::Client::publisher(session, subscriber)
|
|
||||||
.await
|
|
||||||
.context("failed to create MoQ Transport session")?;
|
|
||||||
|
|
||||||
let publisher = publisher
|
|
||||||
.create_track(&config.track)
|
|
||||||
.context("failed to create clock track")?;
|
|
||||||
let clock = clock::Publisher::new(publisher);
|
|
||||||
|
|
||||||
tokio::select! {
|
|
||||||
res = session.run() => res.context("session error")?,
|
|
||||||
res = clock.run() => res.context("clock error")?,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
let session = moq_transport::session::Client::subscriber(session, publisher)
|
|
||||||
.await
|
|
||||||
.context("failed to create MoQ Transport session")?;
|
|
||||||
|
|
||||||
let subscriber = subscriber
|
|
||||||
.get_track(&config.track)
|
|
||||||
.context("failed to get clock track")?;
|
|
||||||
let clock = clock::Subscriber::new(subscriber);
|
|
||||||
|
|
||||||
tokio::select! {
|
|
||||||
res = session.run() => res.context("session error")?,
|
|
||||||
res = clock.run() => res.context("clock error")?,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct NoCertificateVerification {}
|
|
||||||
|
|
||||||
impl rustls::client::ServerCertVerifier for NoCertificateVerification {
|
|
||||||
fn verify_server_cert(
|
|
||||||
&self,
|
|
||||||
_end_entity: &rustls::Certificate,
|
|
||||||
_intermediates: &[rustls::Certificate],
|
|
||||||
_server_name: &rustls::ServerName,
|
|
||||||
_scts: &mut dyn Iterator<Item = &[u8]>,
|
|
||||||
_ocsp_response: &[u8],
|
|
||||||
_now: time::SystemTime,
|
|
||||||
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
|
|
||||||
Ok(rustls::client::ServerCertVerified::assertion())
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -18,7 +18,7 @@ moq-transport = { path = "../moq-transport" }
|
||||||
|
|
||||||
# QUIC
|
# QUIC
|
||||||
quinn = "0.10"
|
quinn = "0.10"
|
||||||
webtransport-quinn = "0.6.1"
|
webtransport-quinn = "0.6"
|
||||||
#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" }
|
#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" }
|
||||||
url = "2"
|
url = "2"
|
||||||
|
|
||||||
|
|
|
@ -42,16 +42,19 @@ impl Media {
|
||||||
|
|
||||||
// Create the catalog track with a single segment.
|
// Create the catalog track with a single segment.
|
||||||
let mut init_track = broadcast.create_track("0.mp4")?;
|
let mut init_track = broadcast.create_track("0.mp4")?;
|
||||||
let init_segment = init_track.create_segment(segment::Info {
|
let mut init_segment = init_track.create_segment(segment::Info {
|
||||||
sequence: VarInt::ZERO,
|
sequence: VarInt::ZERO,
|
||||||
priority: 0,
|
priority: 0,
|
||||||
expires: None,
|
expires: None,
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// Create a single fragment, optionally setting the size
|
// Create a single fragment, optionally setting the size
|
||||||
let mut init_fragment = init_segment.final_fragment(VarInt::ZERO)?;
|
let mut init_fragment = init_segment.create_fragment(fragment::Info {
|
||||||
|
sequence: VarInt::ZERO,
|
||||||
|
size: None, // size is only needed when we have multiple fragments.
|
||||||
|
})?;
|
||||||
|
|
||||||
init_fragment.chunk(init.into())?;
|
init_fragment.write_chunk(init.into())?;
|
||||||
|
|
||||||
let mut tracks = HashMap::new();
|
let mut tracks = HashMap::new();
|
||||||
|
|
||||||
|
@ -129,7 +132,7 @@ impl Media {
|
||||||
init_track_name: &str,
|
init_track_name: &str,
|
||||||
moov: &mp4::MoovBox,
|
moov: &mp4::MoovBox,
|
||||||
) -> Result<(), anyhow::Error> {
|
) -> Result<(), anyhow::Error> {
|
||||||
let segment = track.create_segment(segment::Info {
|
let mut segment = track.create_segment(segment::Info {
|
||||||
sequence: VarInt::ZERO,
|
sequence: VarInt::ZERO,
|
||||||
priority: 0,
|
priority: 0,
|
||||||
expires: None,
|
expires: None,
|
||||||
|
@ -215,10 +218,13 @@ impl Media {
|
||||||
log::info!("catalog: {}", catalog_str);
|
log::info!("catalog: {}", catalog_str);
|
||||||
|
|
||||||
// Create a single fragment for the segment.
|
// Create a single fragment for the segment.
|
||||||
let mut fragment = segment.final_fragment(VarInt::ZERO)?;
|
let mut fragment = segment.create_fragment(fragment::Info {
|
||||||
|
sequence: VarInt::ZERO,
|
||||||
|
size: None, // Size is only needed when we have multiple fragments.
|
||||||
|
})?;
|
||||||
|
|
||||||
// Add the segment and add the fragment.
|
// Add the segment and add the fragment.
|
||||||
fragment.chunk(catalog_str.into())?;
|
fragment.write_chunk(catalog_str.into())?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -289,7 +295,7 @@ impl Track {
|
||||||
if let Some(current) = self.current.as_mut() {
|
if let Some(current) = self.current.as_mut() {
|
||||||
if !fragment.keyframe {
|
if !fragment.keyframe {
|
||||||
// Use the existing segment
|
// Use the existing segment
|
||||||
current.chunk(raw.into())?;
|
current.write_chunk(raw.into())?;
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -305,7 +311,7 @@ impl Track {
|
||||||
.context("timestamp too large")?;
|
.context("timestamp too large")?;
|
||||||
|
|
||||||
// Create a new segment.
|
// Create a new segment.
|
||||||
let segment = self.track.create_segment(segment::Info {
|
let mut segment = self.track.create_segment(segment::Info {
|
||||||
sequence: VarInt::try_from(self.sequence).context("sequence too large")?,
|
sequence: VarInt::try_from(self.sequence).context("sequence too large")?,
|
||||||
|
|
||||||
// Newer segments are higher priority
|
// Newer segments are higher priority
|
||||||
|
@ -316,12 +322,15 @@ impl Track {
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// Create a single fragment for the segment that we will keep appending.
|
// Create a single fragment for the segment that we will keep appending.
|
||||||
let mut fragment = segment.final_fragment(VarInt::ZERO)?;
|
let mut fragment = segment.create_fragment(fragment::Info {
|
||||||
|
sequence: VarInt::ZERO,
|
||||||
|
size: None,
|
||||||
|
})?;
|
||||||
|
|
||||||
self.sequence += 1;
|
self.sequence += 1;
|
||||||
|
|
||||||
// Insert the raw atom into the segment.
|
// Insert the raw atom into the segment.
|
||||||
fragment.chunk(raw.into())?;
|
fragment.write_chunk(raw.into())?;
|
||||||
|
|
||||||
// Save for the next iteration
|
// Save for the next iteration
|
||||||
self.current = Some(fragment);
|
self.current = Some(fragment);
|
||||||
|
@ -331,7 +340,7 @@ impl Track {
|
||||||
|
|
||||||
pub fn data(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
|
pub fn data(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
|
||||||
let fragment = self.current.as_mut().context("missing current fragment")?;
|
let fragment = self.current.as_mut().context("missing current fragment")?;
|
||||||
fragment.chunk(raw.into())?;
|
fragment.write_chunk(raw.into())?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ moq-api = { path = "../moq-api" }
|
||||||
|
|
||||||
# QUIC
|
# QUIC
|
||||||
quinn = "0.10"
|
quinn = "0.10"
|
||||||
webtransport-quinn = "0.6.1"
|
webtransport-quinn = "0.6"
|
||||||
#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" }
|
#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" }
|
||||||
url = "2"
|
url = "2"
|
||||||
|
|
||||||
|
|
|
@ -22,31 +22,8 @@ log = "0.4"
|
||||||
indexmap = "2"
|
indexmap = "2"
|
||||||
|
|
||||||
quinn = "0.10"
|
quinn = "0.10"
|
||||||
webtransport-quinn = "0.6.1"
|
webtransport-quinn = "0.6"
|
||||||
#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" }
|
#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" }
|
||||||
|
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
paste = "1"
|
paste = "1"
|
||||||
|
|
||||||
[dev-dependencies]
|
|
||||||
# QUIC
|
|
||||||
url = "2"
|
|
||||||
|
|
||||||
# Crypto
|
|
||||||
rustls = { version = "0.21", features = ["dangerous_configuration"] }
|
|
||||||
rustls-native-certs = "0.6"
|
|
||||||
rustls-pemfile = "1"
|
|
||||||
|
|
||||||
# Async stuff
|
|
||||||
tokio = { version = "1", features = ["full"] }
|
|
||||||
|
|
||||||
# CLI, logging, error handling
|
|
||||||
clap = { version = "4", features = ["derive"] }
|
|
||||||
log = { version = "0.4", features = ["std"] }
|
|
||||||
env_logger = "0.9"
|
|
||||||
mp4 = "0.13"
|
|
||||||
anyhow = { version = "1", features = ["backtrace"] }
|
|
||||||
serde_json = "1"
|
|
||||||
rfc6381-codec = "0.1"
|
|
||||||
tracing = "0.1"
|
|
||||||
tracing-subscriber = "0.3"
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ pub struct Info {
|
||||||
|
|
||||||
// The size of the fragment, optionally None if this is the last fragment in a segment.
|
// The size of the fragment, optionally None if this is the last fragment in a segment.
|
||||||
// TODO enforce this size.
|
// TODO enforce this size.
|
||||||
pub size: Option<usize>,
|
pub size: Option<VarInt>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct State {
|
struct State {
|
||||||
|
@ -53,6 +53,10 @@ impl State {
|
||||||
self.closed = Err(err);
|
self.closed = Err(err);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn bytes(&self) -> usize {
|
||||||
|
self.chunks.iter().map(|f| f.len()).sum::<usize>()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for State {
|
impl Default for State {
|
||||||
|
@ -67,7 +71,11 @@ impl Default for State {
|
||||||
impl fmt::Debug for State {
|
impl fmt::Debug for State {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
// We don't want to print out the contents, so summarize.
|
// We don't want to print out the contents, so summarize.
|
||||||
f.debug_struct("State").field("closed", &self.closed).finish()
|
f.debug_struct("State")
|
||||||
|
.field("chunks", &self.chunks.len().to_string())
|
||||||
|
.field("bytes", &self.bytes().to_string())
|
||||||
|
.field("closed", &self.closed)
|
||||||
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,7 +98,7 @@ impl Publisher {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Write a new chunk of bytes.
|
/// Write a new chunk of bytes.
|
||||||
pub fn chunk(&mut self, chunk: Bytes) -> Result<(), CacheError> {
|
pub fn write_chunk(&mut self, chunk: Bytes) -> Result<(), CacheError> {
|
||||||
let mut state = self.state.lock_mut();
|
let mut state = self.state.lock_mut();
|
||||||
state.closed.clone()?;
|
state.closed.clone()?;
|
||||||
state.chunks.push(chunk);
|
state.chunks.push(chunk);
|
||||||
|
@ -150,7 +158,7 @@ impl Subscriber {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Block until the next chunk of bytes is available.
|
/// Block until the next chunk of bytes is available.
|
||||||
pub async fn chunk(&mut self) -> Result<Option<Bytes>, CacheError> {
|
pub async fn read_chunk(&mut self) -> Result<Option<Bytes>, CacheError> {
|
||||||
loop {
|
loop {
|
||||||
let notify = {
|
let notify = {
|
||||||
let state = self.state.lock();
|
let state = self.state.lock();
|
||||||
|
|
|
@ -91,30 +91,20 @@ impl Publisher {
|
||||||
Self { state, info, _dropped }
|
Self { state, info, _dropped }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not public because it's a footgun.
|
/// Write a fragment
|
||||||
pub(crate) fn push_fragment(
|
pub fn push_fragment(&mut self, fragment: fragment::Subscriber) -> Result<(), CacheError> {
|
||||||
&mut self,
|
|
||||||
sequence: VarInt,
|
|
||||||
size: Option<usize>,
|
|
||||||
) -> Result<fragment::Publisher, CacheError> {
|
|
||||||
let (publisher, subscriber) = fragment::new(fragment::Info { sequence, size });
|
|
||||||
|
|
||||||
let mut state = self.state.lock_mut();
|
let mut state = self.state.lock_mut();
|
||||||
state.closed.clone()?;
|
state.closed.clone()?;
|
||||||
state.fragments.push(subscriber);
|
state.fragments.push(fragment);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_fragment(&mut self, fragment: fragment::Info) -> Result<fragment::Publisher, CacheError> {
|
||||||
|
let (publisher, subscriber) = fragment::new(fragment);
|
||||||
|
self.push_fragment(subscriber)?;
|
||||||
Ok(publisher)
|
Ok(publisher)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Write a fragment
|
|
||||||
pub fn fragment(&mut self, sequence: VarInt, size: usize) -> Result<fragment::Publisher, CacheError> {
|
|
||||||
self.push_fragment(sequence, Some(size))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Write the last fragment, which means size can be unknown.
|
|
||||||
pub fn final_fragment(mut self, sequence: VarInt) -> Result<fragment::Publisher, CacheError> {
|
|
||||||
self.push_fragment(sequence, None)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Close the segment with an error.
|
/// Close the segment with an error.
|
||||||
pub fn close(self, err: CacheError) -> Result<(), CacheError> {
|
pub fn close(self, err: CacheError) -> Result<(), CacheError> {
|
||||||
self.state.lock_mut().close(err)
|
self.state.lock_mut().close(err)
|
||||||
|
@ -168,7 +158,7 @@ impl Subscriber {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Block until the next chunk of bytes is available.
|
/// Block until the next chunk of bytes is available.
|
||||||
pub async fn fragment(&mut self) -> Result<Option<fragment::Subscriber>, CacheError> {
|
pub async fn next_fragment(&mut self) -> Result<Option<fragment::Subscriber>, CacheError> {
|
||||||
loop {
|
loop {
|
||||||
let notify = {
|
let notify = {
|
||||||
let state = self.state.lock();
|
let state = self.state.lock();
|
||||||
|
|
|
@ -207,7 +207,7 @@ impl Subscriber {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Block until the next segment arrives
|
/// Block until the next segment arrives
|
||||||
pub async fn segment(&mut self) -> Result<Option<segment::Subscriber>, CacheError> {
|
pub async fn next_segment(&mut self) -> Result<Option<segment::Subscriber>, CacheError> {
|
||||||
loop {
|
loop {
|
||||||
let notify = {
|
let notify = {
|
||||||
let state = self.state.lock();
|
let state = self.state.lock();
|
||||||
|
|
|
@ -45,13 +45,10 @@ impl Client {
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
log::debug!("sending client SETUP: {:?}", client);
|
|
||||||
client.encode(&mut control.0).await?;
|
client.encode(&mut control.0).await?;
|
||||||
|
|
||||||
let mut server = setup::Server::decode(&mut control.1).await?;
|
let mut server = setup::Server::decode(&mut control.1).await?;
|
||||||
|
|
||||||
log::debug!("received server SETUP: {:?}", server);
|
|
||||||
|
|
||||||
match server.version {
|
match server.version {
|
||||||
setup::Version::DRAFT_01 => {
|
setup::Version::DRAFT_01 => {
|
||||||
// We always require this extension
|
// We always require this extension
|
||||||
|
|
|
@ -48,10 +48,6 @@ pub enum SessionError {
|
||||||
#[error("required extension not offered: {0:?}")]
|
#[error("required extension not offered: {0:?}")]
|
||||||
RequiredExtension(VarInt),
|
RequiredExtension(VarInt),
|
||||||
|
|
||||||
/// Some VarInt was too large and we were too lazy to handle it
|
|
||||||
#[error("varint bounds exceeded")]
|
|
||||||
BoundsExceeded(#[from] coding::BoundsExceeded),
|
|
||||||
|
|
||||||
/// An unclassified error because I'm lazy. TODO classify these errors
|
/// An unclassified error because I'm lazy. TODO classify these errors
|
||||||
#[error("unknown error: {0}")]
|
#[error("unknown error: {0}")]
|
||||||
Unknown(String),
|
Unknown(String),
|
||||||
|
@ -75,7 +71,6 @@ impl MoqError for SessionError {
|
||||||
Self::InvalidPriority(_) => 400,
|
Self::InvalidPriority(_) => 400,
|
||||||
Self::InvalidSize(_) => 400,
|
Self::InvalidSize(_) => 400,
|
||||||
Self::RequiredExtension(_) => 426,
|
Self::RequiredExtension(_) => 426,
|
||||||
Self::BoundsExceeded(_) => 500,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,7 +96,6 @@ impl MoqError for SessionError {
|
||||||
Self::InvalidPriority(priority) => format!("invalid priority: {}", priority),
|
Self::InvalidPriority(priority) => format!("invalid priority: {}", priority),
|
||||||
Self::InvalidSize(size) => format!("invalid size: {}", size),
|
Self::InvalidSize(size) => format!("invalid size: {}", size),
|
||||||
Self::RequiredExtension(id) => format!("required extension was missing: {:?}", id),
|
Self::RequiredExtension(id) => format!("required extension was missing: {:?}", id),
|
||||||
Self::BoundsExceeded(_) => "varint bounds exceeded".to_string(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -170,7 +170,7 @@ impl Publisher {
|
||||||
async fn run_subscribe(&self, id: VarInt, track: &mut track::Subscriber) -> Result<(), SessionError> {
|
async fn run_subscribe(&self, id: VarInt, track: &mut track::Subscriber) -> Result<(), SessionError> {
|
||||||
// TODO add an Ok method to track::Publisher so we can send SUBSCRIBE_OK
|
// TODO add an Ok method to track::Publisher so we can send SUBSCRIBE_OK
|
||||||
|
|
||||||
while let Some(mut segment) = track.segment().await? {
|
while let Some(mut segment) = track.next_segment().await? {
|
||||||
// TODO only clone the fields we need
|
// TODO only clone the fields we need
|
||||||
let this = self.clone();
|
let this = self.clone();
|
||||||
|
|
||||||
|
@ -193,9 +193,7 @@ impl Publisher {
|
||||||
let priority = (segment.priority as i64 - i32::MAX as i64) as i32;
|
let priority = (segment.priority as i64 - i32::MAX as i64) as i32;
|
||||||
stream.set_priority(priority).ok();
|
stream.set_priority(priority).ok();
|
||||||
|
|
||||||
while let Some(mut fragment) = segment.fragment().await? {
|
while let Some(mut fragment) = segment.next_fragment().await? {
|
||||||
log::trace!("serving fragment: {:?}", fragment);
|
|
||||||
|
|
||||||
let object = message::Object {
|
let object = message::Object {
|
||||||
track: id,
|
track: id,
|
||||||
|
|
||||||
|
@ -206,7 +204,7 @@ impl Publisher {
|
||||||
|
|
||||||
// Properties of the fragment
|
// Properties of the fragment
|
||||||
sequence: fragment.sequence,
|
sequence: fragment.sequence,
|
||||||
size: fragment.size.map(VarInt::try_from).transpose()?,
|
size: fragment.size,
|
||||||
};
|
};
|
||||||
|
|
||||||
object
|
object
|
||||||
|
@ -214,8 +212,7 @@ impl Publisher {
|
||||||
.await
|
.await
|
||||||
.map_err(|e| SessionError::Unknown(e.to_string()))?;
|
.map_err(|e| SessionError::Unknown(e.to_string()))?;
|
||||||
|
|
||||||
while let Some(chunk) = fragment.chunk().await? {
|
while let Some(chunk) = fragment.read_chunk().await? {
|
||||||
//log::trace!("writing chunk: {:?}", chunk);
|
|
||||||
stream.write_all(&chunk).await?;
|
stream.write_all(&chunk).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,8 +15,6 @@ impl Server {
|
||||||
|
|
||||||
let mut client = setup::Client::decode(&mut control.1).await?;
|
let mut client = setup::Client::decode(&mut control.1).await?;
|
||||||
|
|
||||||
log::debug!("received client SETUP: {:?}", client);
|
|
||||||
|
|
||||||
if client.versions.contains(&setup::Version::DRAFT_01) {
|
if client.versions.contains(&setup::Version::DRAFT_01) {
|
||||||
// We always require subscriber ID.
|
// We always require subscriber ID.
|
||||||
client.extensions.require_subscriber_id()?;
|
client.extensions.require_subscriber_id()?;
|
||||||
|
@ -93,8 +91,6 @@ impl Request {
|
||||||
params: Default::default(),
|
params: Default::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
log::debug!("sending server SETUP: {:?}", server);
|
|
||||||
|
|
||||||
// We need to sure we support the opposite of the client's role.
|
// We need to sure we support the opposite of the client's role.
|
||||||
// ex. if the client is a publisher, we must be a subscriber ONLY.
|
// ex. if the client is a publisher, we must be a subscriber ONLY.
|
||||||
if !self.client.role.is_compatible(server.role) {
|
if !self.client.role.is_compatible(server.role) {
|
||||||
|
|
|
@ -6,7 +6,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
cache::{broadcast, segment, track, CacheError},
|
cache::{broadcast, fragment, segment, track, CacheError},
|
||||||
coding::DecodeError,
|
coding::DecodeError,
|
||||||
message,
|
message,
|
||||||
message::Message,
|
message::Message,
|
||||||
|
@ -110,7 +110,7 @@ impl Subscriber {
|
||||||
.await
|
.await
|
||||||
.map_err(|e| SessionError::Unknown(e.to_string()))?;
|
.map_err(|e| SessionError::Unknown(e.to_string()))?;
|
||||||
|
|
||||||
log::trace!("first object: {:?}", object);
|
log::trace!("received object: {:?}", object);
|
||||||
|
|
||||||
// A new scope is needed because the async compiler is dumb
|
// A new scope is needed because the async compiler is dumb
|
||||||
let mut segment = {
|
let mut segment = {
|
||||||
|
@ -124,10 +124,12 @@ impl Subscriber {
|
||||||
})?
|
})?
|
||||||
};
|
};
|
||||||
|
|
||||||
log::trace!("received segment: {:?}", segment);
|
|
||||||
|
|
||||||
// Create the first fragment
|
// Create the first fragment
|
||||||
let mut fragment = segment.push_fragment(object.sequence, object.size.map(usize::from))?;
|
let mut fragment = segment.create_fragment(fragment::Info {
|
||||||
|
sequence: object.sequence,
|
||||||
|
size: object.size,
|
||||||
|
})?;
|
||||||
|
|
||||||
let mut remain = object.size.map(usize::from);
|
let mut remain = object.size.map(usize::from);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -143,8 +145,6 @@ impl Subscriber {
|
||||||
Err(err) => return Err(err.into()),
|
Err(err) => return Err(err.into()),
|
||||||
};
|
};
|
||||||
|
|
||||||
log::trace!("next object: {:?}", object);
|
|
||||||
|
|
||||||
// NOTE: This is a custom restriction; not part of the moq-transport draft.
|
// NOTE: This is a custom restriction; not part of the moq-transport draft.
|
||||||
// We require every OBJECT to contain the same priority since prioritization is done per-stream.
|
// We require every OBJECT to contain the same priority since prioritization is done per-stream.
|
||||||
// We also require every OBJECT to contain the same group so we know when the group ends, and can detect gaps.
|
// We also require every OBJECT to contain the same group so we know when the group ends, and can detect gaps.
|
||||||
|
@ -152,13 +152,14 @@ impl Subscriber {
|
||||||
return Err(SessionError::StreamMapping);
|
return Err(SessionError::StreamMapping);
|
||||||
}
|
}
|
||||||
|
|
||||||
object = next;
|
|
||||||
|
|
||||||
// Create a new object.
|
// Create a new object.
|
||||||
fragment = segment.push_fragment(object.sequence, object.size.map(usize::from))?;
|
fragment = segment.create_fragment(fragment::Info {
|
||||||
remain = object.size.map(usize::from);
|
sequence: object.sequence,
|
||||||
|
size: object.size,
|
||||||
|
})?;
|
||||||
|
|
||||||
log::trace!("next fragment: {:?}", fragment);
|
object = next;
|
||||||
|
remain = object.size.map(usize::from);
|
||||||
}
|
}
|
||||||
|
|
||||||
match stream.read_chunk(remain.unwrap_or(usize::MAX), true).await? {
|
match stream.read_chunk(remain.unwrap_or(usize::MAX), true).await? {
|
||||||
|
@ -170,12 +171,7 @@ impl Subscriber {
|
||||||
|
|
||||||
// NOTE: This does not make a copy!
|
// NOTE: This does not make a copy!
|
||||||
// Bytes are immutable and ref counted.
|
// Bytes are immutable and ref counted.
|
||||||
Some(data) => {
|
Some(data) => fragment.write_chunk(data.bytes)?,
|
||||||
remain = remain.map(|r| r - data.bytes.len());
|
|
||||||
|
|
||||||
log::trace!("next chunk: {:?}", data);
|
|
||||||
fragment.chunk(data.bytes)?;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,10 +10,10 @@ pub struct Version(pub VarInt);
|
||||||
|
|
||||||
impl Version {
|
impl Version {
|
||||||
/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-00.html
|
/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-00.html
|
||||||
pub const DRAFT_00: Version = Version(VarInt::from_u32(0xff000000));
|
pub const DRAFT_00: Version = Version(VarInt::from_u32(0xff00));
|
||||||
|
|
||||||
/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-01.html
|
/// https://www.ietf.org/archive/id/draft-ietf-moq-transport-01.html
|
||||||
pub const DRAFT_01: Version = Version(VarInt::from_u32(0xff000001));
|
pub const DRAFT_01: Version = Version(VarInt::from_u32(0xff01));
|
||||||
|
|
||||||
/// Fork of draft-ietf-moq-transport-00.
|
/// Fork of draft-ietf-moq-transport-00.
|
||||||
///
|
///
|
||||||
|
|
Loading…
Reference in New Issue