Add a CLOCK. (#119)

kixelated 2023-11-07 15:41:15 +09:00 committed by GitHub
parent df5d362754
commit 93830a2f24
17 changed files with 570 additions and 117 deletions

Cargo.lock generated

@ -26,6 +26,21 @@ dependencies = [
"memchr",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.5.0"
@ -374,6 +389,20 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets",
]
[[package]]
name = "clap"
version = "4.4.2"
@ -872,6 +901,29 @@ dependencies = [
"tokio-native-tls",
]
[[package]]
name = "iana-time-zone"
version = "0.1.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "idna"
version = "0.4.0"
@ -1051,6 +1103,28 @@ dependencies = [
"url",
]
[[package]]
name = "moq-clock"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"clap",
"clap_mangen",
"env_logger",
"log",
"moq-transport",
"quinn",
"rustls",
"rustls-native-certs",
"rustls-pemfile",
"tokio",
"tracing",
"tracing-subscriber",
"url",
"webtransport-quinn",
]
[[package]]
name = "moq-pub"
version = "0.1.0"
@ -1107,14 +1181,26 @@ dependencies = [
name = "moq-transport"
version = "0.2.0"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"clap",
"env_logger",
"indexmap 2.0.0",
"log",
"mp4",
"paste",
"quinn",
"rfc6381-codec",
"rustls",
"rustls-native-certs",
"rustls-pemfile",
"serde_json",
"thiserror",
"tokio",
"tracing",
"tracing-subscriber",
"url",
"webtransport-quinn",
]
@ -2461,6 +2547,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.51.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-sys"
version = "0.48.0"


@ -1,3 +1,3 @@
[workspace]
members = ["moq-transport", "moq-relay", "moq-pub", "moq-api"]
members = ["moq-transport", "moq-relay", "moq-pub", "moq-api", "moq-clock"]
resolver = "2"


@ -22,11 +22,11 @@ RUN apt-get update && \
# Copy the publish script into the image
COPY deploy/publish.sh /usr/local/bin/publish
# Copy the compiled binary
COPY --from=builder /usr/local/cargo/bin/moq-pub /usr/local/cargo/bin/moq-pub
# Copy the compiled binaries
COPY --from=builder /usr/local/cargo/bin /usr/local/cargo/bin
CMD [ "publish" ]
# moq-rs image with just the binaries
# moq-rs image with just the binaries (default)
FROM rust:latest as moq-rs
LABEL org.opencontainers.image.source=https://github.com/kixelated/moq-rs


@ -1,53 +0,0 @@
# Hackathon
IETF Prague 118
## MoqTransport
Reference libraries are available at [moq-rs](https://github.com/kixelated/moq-rs) and [moq-js](https://github.com/kixelated/moq-js). The Rust library is [well documented](https://docs.rs/moq-transport/latest/moq_transport/) but the web library, not so much.
**TODO** Update both to draft-01.
**TODO** Switch any remaining forks over to extensions. ex: track_id in SUBSCRIBE
The stream mapping right now is quite rigid: `stream == group == object`.
**TODO** Support multiple objects per group. They MUST NOT use different priorities, different tracks, or out-of-order sequences.
The API and cache aren't designed to send/receive arbitrary objects over arbitrary streams as specified in the draft. I don't think they should be, and it wouldn't be possible to implement in time for the hackathon anyway.
**TODO** Make an extension to enforce this stream mapping?
## Generic Relay
I'm hosting a simple CDN at: `relay.quic.video`
The traffic is sharded based on the WebTransport path to avoid namespace collisions. Think of it like a customer ID, although it's completely unauthenticated for now. Use your username or whatever string you want: `CONNECT https://relay.quic.video/alan`.
**TODO** Currently, it performs an implicit `ANNOUNCE ""` when `role=publisher`. This means there can only be a single publisher per shard and `role=both` is not supported. I should have explicit `ANNOUNCE` messages supported before the hackathon to remove this limitation.
**TODO** I don't know if I will have subscribe hints fully working in time. They will be parsed but might be ignored.
## CMAF Media
You can [publish](https://quic.video/publish) and [watch](https://quic.video/watch) broadcasts.
There's a [24/7 bunny stream](https://quic.video/watch/bbb) or you can publish your own using [moq-pub](https://github.com/kixelated/moq-rs/tree/main/moq-pub).
If you want to fetch from the relay directly, the name of the broadcast is the path. For example, `https://quic.video/watch/bbb` can be accessed at `relay.quic.video/bbb`.
The namespace is empty and the catalog track is `.catalog`. I'm currently using a simple JSON catalog with no support for delta updates.
**TODO** Update to the proposed [Warp catalog](https://datatracker.ietf.org/doc/draft-wilaw-moq-catalogformat/).
Each media track uses a single (unbounded) object per group. Video groups are per GoP, while audio groups are per frame. There's also an init track containing information required to initialize the decoder.
**TODO** Base64 encode the init track in the catalog.
## Clock
**TODO** Host a clock demo that sends a group per second:
```
GROUP: YYYY-MM-DD HH:MM
OBJECT: SS
```

dev/clock Executable file

@ -0,0 +1,19 @@
#!/bin/bash
set -euo pipefail
# Change directory to the root of the project
cd "$(dirname "$0")/.."
# Use debug logging by default
export RUST_LOG="${RUST_LOG:-debug}"
# Connect to localhost by default.
HOST="${HOST:-localhost}"
PORT="${PORT:-4443}"
ADDR="${ADDR:-$HOST:$PORT}"
NAME="${NAME:-clock}"
# Combine the host and name into a URL.
URL="${URL:-"https://$ADDR/$NAME"}"
cargo run --bin moq-clock -- "$URL" "$@"
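For reference, a hedged usage sketch: the script assumes a relay is already listening on localhost:4443 (for example via a companion dev/relay script, which is an assumption here), and any extra arguments are forwarded to moq-clock:

```bash
# Terminal 1: publish the current time to the "clock" broadcast.
./dev/clock --publish

# Terminal 2: subscribe and print each second as it arrives.
./dev/clock
```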

moq-clock/Cargo.toml Normal file

@ -0,0 +1,46 @@
[package]
name = "moq-clock"
description = "CLOCK over QUIC"
authors = ["Luke Curley"]
repository = "https://github.com/kixelated/moq-rs"
license = "MIT OR Apache-2.0"
version = "0.1.0"
edition = "2021"
keywords = ["quic", "http3", "webtransport", "media", "live"]
categories = ["multimedia", "network-programming", "web-programming"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
moq-transport = { path = "../moq-transport" }
# QUIC
quinn = "0.10"
webtransport-quinn = "0.6"
url = "2"
# Crypto
rustls = { version = "0.21", features = ["dangerous_configuration"] }
rustls-native-certs = "0.6"
rustls-pemfile = "1"
# Async stuff
tokio = { version = "1", features = ["full"] }
# CLI, logging, error handling
clap = { version = "4", features = ["derive"] }
log = { version = "0.4", features = ["std"] }
env_logger = "0.9"
anyhow = { version = "1", features = ["backtrace"] }
tracing = "0.1"
tracing-subscriber = "0.3"
# CLOCK STUFF
chrono = "0.4"
[build-dependencies]
clap = { version = "4", features = ["derive"] }
clap_mangen = "0.2"
url = "2"

moq-clock/src/cli.rs Normal file

@ -0,0 +1,46 @@
use clap::Parser;
use std::{net, path};
use url::Url;
#[derive(Parser, Clone, Debug)]
pub struct Config {
/// Listen for UDP packets on the given address.
#[arg(long, default_value = "[::]:0")]
pub bind: net::SocketAddr,
/// Connect to the given URL starting with https://
#[arg(value_parser = moq_url)]
pub url: Url,
/// Use the TLS root CA at this path, encoded as PEM.
///
/// This value can be provided multiple times for multiple roots.
/// If this is empty, system roots will be used instead
#[arg(long)]
pub tls_root: Vec<path::PathBuf>,
/// Danger: Disable TLS certificate verification.
///
/// Fine for local development, but should be used with caution in production.
#[arg(long)]
pub tls_disable_verify: bool,
/// Publish the current time to the relay, otherwise only subscribe.
#[arg(long)]
pub publish: bool,
/// The name of the clock track.
#[arg(long, default_value = "now")]
pub track: String,
}
fn moq_url(s: &str) -> Result<Url, String> {
let url = Url::try_from(s).map_err(|e| e.to_string())?;
// Make sure the scheme is https, since that's what WebTransport requires
if url.scheme() != "https" {
return Err("url scheme must be https:// for WebTransport".to_string());
}
Ok(url)
}

moq-clock/src/clock.rs Normal file

@ -0,0 +1,148 @@
use std::time;
use anyhow::Context;
use moq_transport::{
cache::{fragment, segment, track},
VarInt,
};
use chrono::prelude::*;
pub struct Publisher {
track: track::Publisher,
}
impl Publisher {
pub fn new(track: track::Publisher) -> Self {
Self { track }
}
pub async fn run(mut self) -> anyhow::Result<()> {
let start = Utc::now();
let mut now = start;
// Just for fun, don't start at zero.
let mut sequence = start.minute();
loop {
let segment = self
.track
.create_segment(segment::Info {
sequence: VarInt::from_u32(sequence),
priority: 0,
expires: Some(time::Duration::from_secs(60)),
})
.context("failed to create minute segment")?;
sequence += 1;
tokio::spawn(async move {
if let Err(err) = Self::send_segment(segment, now).await {
log::warn!("failed to send minute: {:?}", err);
}
});
let next = now + chrono::Duration::minutes(1);
let next = next.with_second(0).unwrap().with_nanosecond(0).unwrap();
let delay = (next - now).to_std().unwrap();
tokio::time::sleep(delay).await;
now = next; // just assume we didn't undersleep
}
}
async fn send_segment(mut segment: segment::Publisher, mut now: DateTime<Utc>) -> anyhow::Result<()> {
// Everything but the second.
let base = now.format("%Y-%m-%d %H:%M:").to_string();
segment
.fragment(VarInt::ZERO, base.len())?
.chunk(base.clone().into())
.context("failed to write base")?;
loop {
let delta = now.format("%S").to_string();
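// Sequence 0 is reserved for the base fragment above, so per-second fragments start at 1.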
let sequence = VarInt::from_u32(now.second() + 1);
segment
.fragment(sequence, delta.len())?
.chunk(delta.clone().into())
.context("failed to write delta")?;
log::info!("{}{}", base, delta);
let next = now + chrono::Duration::seconds(1);
let next = next.with_nanosecond(0).unwrap();
let delay = (next - now).to_std().unwrap();
tokio::time::sleep(delay).await;
// Get the current time again to check if we overslept
let next = Utc::now();
if next.minute() != now.minute() {
return Ok(());
}
now = next;
}
}
}
pub struct Subscriber {
track: track::Subscriber,
}
impl Subscriber {
pub fn new(track: track::Subscriber) -> Self {
Self { track }
}
pub async fn run(mut self) -> anyhow::Result<()> {
while let Some(segment) = self.track.segment().await.context("failed to get segment")? {
log::debug!("got segment: {:?}", segment);
tokio::spawn(async move {
if let Err(err) = Self::recv_segment(segment).await {
log::warn!("failed to receive segment: {:?}", err);
}
});
}
Ok(())
}
async fn recv_segment(mut segment: segment::Subscriber) -> anyhow::Result<()> {
let first = segment
.fragment()
.await
.context("failed to get first fragment")?
.context("no fragments in segment")?;
log::debug!("got first: {:?}", first);
if first.sequence.into_inner() != 0 {
anyhow::bail!("first object must be zero; I'm not going to implement a reassembly buffer");
}
let base = Self::recv_fragment(first, Vec::new()).await?;
log::debug!("read base: {:?}", String::from_utf8_lossy(&base));
while let Some(fragment) = segment.fragment().await? {
log::debug!("next fragment: {:?}", fragment);
let value = Self::recv_fragment(fragment, base.clone()).await?;
let str = String::from_utf8(value).context("invalid UTF-8")?;
log::info!("{}", str);
}
Ok(())
}
async fn recv_fragment(mut fragment: fragment::Subscriber, mut buf: Vec<u8>) -> anyhow::Result<Vec<u8>> {
while let Some(data) = fragment.chunk().await? {
buf.extend_from_slice(&data);
}
Ok(buf)
}
}
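To make the output concrete: each one-minute segment starts with a base fragment (sequence 0) holding everything but the seconds, followed by one small fragment per second, which the subscriber concatenates with the base before printing. Roughly (timestamp illustrative):

```
SEGMENT (sequence = current minute, expires after 60s)
  fragment 0:  "2023-11-07 15:41:"   # base, written once
  fragment 1:  "00"                  # then one fragment per second
  fragment 2:  "01"
  ...
  fragment 60: "59"
```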

moq-clock/src/main.rs Normal file

@ -0,0 +1,123 @@
use std::{fs, io, sync::Arc, time};
use anyhow::Context;
use clap::Parser;
mod cli;
mod clock;
use moq_transport::cache::broadcast;
// TODO: clap complete
#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();
// Disable tracing so we don't get a bunch of Quinn spam.
let tracer = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::WARN)
.finish();
tracing::subscriber::set_global_default(tracer).unwrap();
let config = cli::Config::parse();
// Create a list of acceptable root certificates.
let mut roots = rustls::RootCertStore::empty();
if config.tls_root.is_empty() {
// Add the platform's native root certificates.
for cert in rustls_native_certs::load_native_certs().context("could not load platform certs")? {
roots
.add(&rustls::Certificate(cert.0))
.context("failed to add root cert")?;
}
} else {
// Add the specified root certificates.
for root in &config.tls_root {
let root = fs::File::open(root).context("failed to open root cert file")?;
let mut root = io::BufReader::new(root);
let root = rustls_pemfile::certs(&mut root).context("failed to read root cert")?;
anyhow::ensure!(root.len() == 1, "expected a single root cert");
let root = rustls::Certificate(root[0].to_owned());
roots.add(&root).context("failed to add root cert")?;
}
}
let mut tls_config = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(roots)
.with_no_client_auth();
// Allow disabling TLS verification altogether.
if config.tls_disable_verify {
let noop = NoCertificateVerification {};
tls_config.dangerous().set_certificate_verifier(Arc::new(noop));
}
tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important
let arc_tls_config = std::sync::Arc::new(tls_config);
let quinn_client_config = quinn::ClientConfig::new(arc_tls_config);
let mut endpoint = quinn::Endpoint::client(config.bind)?;
endpoint.set_default_client_config(quinn_client_config);
log::info!("connecting to relay: url={}", config.url);
let session = webtransport_quinn::connect(&endpoint, &config.url)
.await
.context("failed to create WebTransport session")?;
let (mut publisher, subscriber) = broadcast::new(""); // TODO config.namespace
if config.publish {
let session = moq_transport::session::Client::publisher(session, subscriber)
.await
.context("failed to create MoQ Transport session")?;
let publisher = publisher
.create_track(&config.track)
.context("failed to create clock track")?;
let clock = clock::Publisher::new(publisher);
tokio::select! {
res = session.run() => res.context("session error")?,
res = clock.run() => res.context("clock error")?,
}
} else {
let session = moq_transport::session::Client::subscriber(session, publisher)
.await
.context("failed to create MoQ Transport session")?;
let subscriber = subscriber
.get_track(&config.track)
.context("failed to get clock track")?;
let clock = clock::Subscriber::new(subscriber);
tokio::select! {
res = session.run() => res.context("session error")?,
res = clock.run() => res.context("clock error")?,
}
}
Ok(())
}
pub struct NoCertificateVerification {}
impl rustls::client::ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}


@ -42,19 +42,16 @@ impl Media {
// Create the catalog track with a single segment.
let mut init_track = broadcast.create_track("0.mp4")?;
let mut init_segment = init_track.create_segment(segment::Info {
let init_segment = init_track.create_segment(segment::Info {
sequence: VarInt::ZERO,
priority: 0,
expires: None,
})?;
// Create a single fragment, optionally setting the size
let mut init_fragment = init_segment.create_fragment(fragment::Info {
sequence: VarInt::ZERO,
size: None, // size is only needed when we have multiple fragments.
})?;
let mut init_fragment = init_segment.final_fragment(VarInt::ZERO)?;
init_fragment.write_chunk(init.into())?;
init_fragment.chunk(init.into())?;
let mut tracks = HashMap::new();
@ -132,7 +129,7 @@ impl Media {
init_track_name: &str,
moov: &mp4::MoovBox,
) -> Result<(), anyhow::Error> {
let mut segment = track.create_segment(segment::Info {
let segment = track.create_segment(segment::Info {
sequence: VarInt::ZERO,
priority: 0,
expires: None,
@ -218,13 +215,10 @@ impl Media {
log::info!("catalog: {}", catalog_str);
// Create a single fragment for the segment.
let mut fragment = segment.create_fragment(fragment::Info {
sequence: VarInt::ZERO,
size: None, // Size is only needed when we have multiple fragments.
})?;
let mut fragment = segment.final_fragment(VarInt::ZERO)?;
// Add the segment and add the fragment.
fragment.write_chunk(catalog_str.into())?;
fragment.chunk(catalog_str.into())?;
Ok(())
}
@ -295,7 +289,7 @@ impl Track {
if let Some(current) = self.current.as_mut() {
if !fragment.keyframe {
// Use the existing segment
current.write_chunk(raw.into())?;
current.chunk(raw.into())?;
return Ok(());
}
}
@ -311,7 +305,7 @@ impl Track {
.context("timestamp too large")?;
// Create a new segment.
let mut segment = self.track.create_segment(segment::Info {
let segment = self.track.create_segment(segment::Info {
sequence: VarInt::try_from(self.sequence).context("sequence too large")?,
// Newer segments are higher priority
@ -322,15 +316,12 @@ impl Track {
})?;
// Create a single fragment for the segment that we will keep appending.
let mut fragment = segment.create_fragment(fragment::Info {
sequence: VarInt::ZERO,
size: None,
})?;
let mut fragment = segment.final_fragment(VarInt::ZERO)?;
self.sequence += 1;
// Insert the raw atom into the segment.
fragment.write_chunk(raw.into())?;
fragment.chunk(raw.into())?;
// Save for the next iteration
self.current = Some(fragment);
@ -340,7 +331,7 @@ impl Track {
pub fn data(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
let fragment = self.current.as_mut().context("missing current fragment")?;
fragment.write_chunk(raw.into())?;
fragment.chunk(raw.into())?;
Ok(())
}


@ -27,3 +27,26 @@ webtransport-quinn = "0.6"
async-trait = "0.1"
paste = "1"
[dev-dependencies]
# QUIC
url = "2"
# Crypto
rustls = { version = "0.21", features = ["dangerous_configuration"] }
rustls-native-certs = "0.6"
rustls-pemfile = "1"
# Async stuff
tokio = { version = "1", features = ["full"] }
# CLI, logging, error handling
clap = { version = "4", features = ["derive"] }
log = { version = "0.4", features = ["std"] }
env_logger = "0.9"
mp4 = "0.13"
anyhow = { version = "1", features = ["backtrace"] }
serde_json = "1"
rfc6381-codec = "0.1"
tracing = "0.1"
tracing-subscriber = "0.3"


@ -36,7 +36,7 @@ pub struct Info {
// The size of the fragment, optionally None if this is the last fragment in a segment.
// TODO enforce this size.
pub size: Option<VarInt>,
pub size: Option<usize>,
}
struct State {
@ -53,10 +53,6 @@ impl State {
self.closed = Err(err);
Ok(())
}
pub fn bytes(&self) -> usize {
self.chunks.iter().map(|f| f.len()).sum::<usize>()
}
}
impl Default for State {
@ -71,11 +67,7 @@ impl Default for State {
impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// We don't want to print out the contents, so summarize.
f.debug_struct("State")
.field("chunks", &self.chunks.len().to_string())
.field("bytes", &self.bytes().to_string())
.field("closed", &self.closed)
.finish()
f.debug_struct("State").field("closed", &self.closed).finish()
}
}
@ -98,7 +90,7 @@ impl Publisher {
}
/// Write a new chunk of bytes.
pub fn write_chunk(&mut self, chunk: Bytes) -> Result<(), CacheError> {
pub fn chunk(&mut self, chunk: Bytes) -> Result<(), CacheError> {
let mut state = self.state.lock_mut();
state.closed.clone()?;
state.chunks.push(chunk);
@ -158,7 +150,7 @@ impl Subscriber {
}
/// Block until the next chunk of bytes is available.
pub async fn read_chunk(&mut self) -> Result<Option<Bytes>, CacheError> {
pub async fn chunk(&mut self) -> Result<Option<Bytes>, CacheError> {
loop {
let notify = {
let state = self.state.lock();


@ -91,18 +91,28 @@ impl Publisher {
Self { state, info, _dropped }
}
/// Write a fragment
pub fn push_fragment(&mut self, fragment: fragment::Subscriber) -> Result<(), CacheError> {
// Not public because it's a footgun.
pub(crate) fn push_fragment(
&mut self,
sequence: VarInt,
size: Option<usize>,
) -> Result<fragment::Publisher, CacheError> {
let (publisher, subscriber) = fragment::new(fragment::Info { sequence, size });
let mut state = self.state.lock_mut();
state.closed.clone()?;
state.fragments.push(fragment);
Ok(())
state.fragments.push(subscriber);
Ok(publisher)
}
pub fn create_fragment(&mut self, fragment: fragment::Info) -> Result<fragment::Publisher, CacheError> {
let (publisher, subscriber) = fragment::new(fragment);
self.push_fragment(subscriber)?;
Ok(publisher)
/// Write a fragment
pub fn fragment(&mut self, sequence: VarInt, size: usize) -> Result<fragment::Publisher, CacheError> {
self.push_fragment(sequence, Some(size))
}
/// Write the last fragment, which means size can be unknown.
pub fn final_fragment(mut self, sequence: VarInt) -> Result<fragment::Publisher, CacheError> {
self.push_fragment(sequence, None)
}
/// Close the segment with an error.
@ -158,7 +168,7 @@ impl Subscriber {
}
/// Block until the next fragment is available.
pub async fn next_fragment(&mut self) -> Result<Option<fragment::Subscriber>, CacheError> {
pub async fn fragment(&mut self) -> Result<Option<fragment::Subscriber>, CacheError> {
loop {
let notify = {
let state = self.state.lock();

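Taken together, the cache changes above replace `create_fragment(fragment::Info)` and `write_chunk` with `fragment(sequence, size)`, `final_fragment(sequence)`, and `chunk(bytes)`. A minimal hedged sketch of the publisher side, using only signatures shown in this diff (the segment and payload values are assumed to come from the caller; error handling is simplified):

```rust
use bytes::Bytes;
use moq_transport::{cache::segment, VarInt};

// Write one sized fragment followed by a final fragment of unknown size.
fn publish_two(mut segment: segment::Publisher, head: Bytes, tail: Bytes) -> anyhow::Result<()> {
    // fragment() declares the size up front (a TODO in fragment.rs notes it is not yet enforced).
    segment.fragment(VarInt::ZERO, head.len())?.chunk(head)?;

    // final_fragment() consumes the segment publisher, so no further fragments can follow.
    let mut last = segment.final_fragment(VarInt::from_u32(1))?;
    last.chunk(tail)?;
    Ok(())
}
```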

@ -207,7 +207,7 @@ impl Subscriber {
}
/// Block until the next segment arrives
pub async fn next_segment(&mut self) -> Result<Option<segment::Subscriber>, CacheError> {
pub async fn segment(&mut self) -> Result<Option<segment::Subscriber>, CacheError> {
loop {
let notify = {
let state = self.state.lock();


@ -48,6 +48,10 @@ pub enum SessionError {
#[error("required extension not offered: {0:?}")]
RequiredExtension(VarInt),
/// Some VarInt was too large and we were too lazy to handle it
#[error("varint bounds exceeded")]
BoundsExceeded(#[from] coding::BoundsExceeded),
/// An unclassified error because I'm lazy. TODO classify these errors
#[error("unknown error: {0}")]
Unknown(String),
@ -71,6 +75,7 @@ impl MoqError for SessionError {
Self::InvalidPriority(_) => 400,
Self::InvalidSize(_) => 400,
Self::RequiredExtension(_) => 426,
Self::BoundsExceeded(_) => 500,
}
}
@ -96,6 +101,7 @@ impl MoqError for SessionError {
Self::InvalidPriority(priority) => format!("invalid priority: {}", priority),
Self::InvalidSize(size) => format!("invalid size: {}", size),
Self::RequiredExtension(id) => format!("required extension was missing: {:?}", id),
Self::BoundsExceeded(_) => "varint bounds exceeded".to_string(),
}
}
}


@ -170,7 +170,7 @@ impl Publisher {
async fn run_subscribe(&self, id: VarInt, track: &mut track::Subscriber) -> Result<(), SessionError> {
// TODO add an Ok method to track::Publisher so we can send SUBSCRIBE_OK
while let Some(mut segment) = track.next_segment().await? {
while let Some(mut segment) = track.segment().await? {
// TODO only clone the fields we need
let this = self.clone();
@ -193,7 +193,9 @@ impl Publisher {
let priority = (segment.priority as i64 - i32::MAX as i64) as i32;
stream.set_priority(priority).ok();
while let Some(mut fragment) = segment.next_fragment().await? {
while let Some(mut fragment) = segment.fragment().await? {
log::trace!("serving fragment: {:?}", fragment);
let object = message::Object {
track: id,
@ -204,7 +206,7 @@ impl Publisher {
// Properties of the fragment
sequence: fragment.sequence,
size: fragment.size,
size: fragment.size.map(VarInt::try_from).transpose()?,
};
object
@ -212,7 +214,8 @@ impl Publisher {
.await
.map_err(|e| SessionError::Unknown(e.to_string()))?;
while let Some(chunk) = fragment.read_chunk().await? {
while let Some(chunk) = fragment.chunk().await? {
//log::trace!("writing chunk: {:?}", chunk);
stream.write_all(&chunk).await?;
}
}


@ -6,7 +6,7 @@ use std::{
};
use crate::{
cache::{broadcast, fragment, segment, track, CacheError},
cache::{broadcast, segment, track, CacheError},
coding::DecodeError,
message,
message::Message,
@ -110,7 +110,7 @@ impl Subscriber {
.await
.map_err(|e| SessionError::Unknown(e.to_string()))?;
log::trace!("received object: {:?}", object);
log::trace!("first object: {:?}", object);
// A new scope is needed because the async compiler is dumb
let mut segment = {
@ -124,12 +124,10 @@ impl Subscriber {
})?
};
// Create the first fragment
let mut fragment = segment.create_fragment(fragment::Info {
sequence: object.sequence,
size: object.size,
})?;
log::trace!("received segment: {:?}", segment);
// Create the first fragment
let mut fragment = segment.push_fragment(object.sequence, object.size.map(usize::from))?;
let mut remain = object.size.map(usize::from);
loop {
@ -145,6 +143,8 @@ impl Subscriber {
Err(err) => return Err(err.into()),
};
log::trace!("next object: {:?}", object);
// NOTE: This is a custom restriction; not part of the moq-transport draft.
// We require every OBJECT to contain the same priority since prioritization is done per-stream.
// We also require every OBJECT to contain the same group so we know when the group ends, and can detect gaps.
@ -152,14 +152,13 @@ impl Subscriber {
return Err(SessionError::StreamMapping);
}
// Create a new object.
fragment = segment.create_fragment(fragment::Info {
sequence: object.sequence,
size: object.size,
})?;
object = next;
// Create a new object.
fragment = segment.push_fragment(object.sequence, object.size.map(usize::from))?;
remain = object.size.map(usize::from);
log::trace!("next fragment: {:?}", fragment);
}
match stream.read_chunk(remain.unwrap_or(usize::MAX), true).await? {
@ -171,7 +170,12 @@ impl Subscriber {
// NOTE: This does not make a copy!
// Bytes are immutable and ref counted.
Some(data) => fragment.write_chunk(data.bytes)?,
Some(data) => {
remain = remain.map(|r| r - data.bytes.len());
log::trace!("next chunk: {:?}", data);
fragment.chunk(data.bytes)?;
}
}
}