From 90818ac848959c7c90f1096a79300944f09dc6cd Mon Sep 17 00:00:00 2001 From: Mike English Date: Tue, 5 Sep 2023 15:08:35 -0400 Subject: [PATCH] moq-pub: JSON catalog support, bugfixes (#60) Fixes some bugs around subscription handling and adds support for the new JSON catalog format --- Cargo.lock | 54 +++++++++++++--- moq-pub/Cargo.toml | 2 + moq-pub/README.md | 11 ++-- moq-pub/build.rs | 2 +- moq-pub/src/cli.rs | 8 ++- moq-pub/src/main.rs | 11 +--- moq-pub/src/media.rs | 114 ++++++++++++++++++++++++++++++---- moq-pub/src/media_runner.rs | 22 ++++--- moq-pub/src/session_runner.rs | 11 +--- 9 files changed, 180 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f4ed144..6ed9c03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -489,6 +489,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "four-cc" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3958af68a31b1d1384d3f39b6aa33eb14b6009065b5ca305ddd9712a4237124f" + [[package]] name = "futures" version = "0.3.28" @@ -945,10 +951,12 @@ dependencies = [ "moq-warp", "mp4", "quinn", + "rfc6381-codec", "ring", "rustls 0.21.2", "rustls-native-certs", "rustls-pemfile", + "serde_json", "tokio", "webtransport-generic", "webtransport-quinn", @@ -1012,6 +1020,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "mp4ra-rust" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be9daf03b43bf3842962947c62ba40f411e46a58774c60838038f04a67d17626" +dependencies = [ + "four-cc", +] + +[[package]] +name = "mpeg4-audio-const" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96a1fe2275b68991faded2c80aa4a33dba398b77d276038b8f50701a22e55918" + [[package]] name = "multer" version = "2.1.0" @@ -1195,9 +1218,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.60" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dec2b086b7a862cf4de201096214fa870344cf922b2b30c167badb3af3195406" +checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" dependencies = [ "unicode-ident", ] @@ -1315,6 +1338,17 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" +[[package]] +name = "rfc6381-codec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4395f46a67f0d57c57f6a5361f3a9a0c0183a19cab3998892ecdc003de6d8037" +dependencies = [ + "four-cc", + "mp4ra-rust", + "mpeg4-audio-const", +] + [[package]] name = "ring" version = "0.16.20" @@ -1479,18 +1513,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.164" +version = "1.0.188" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d" +checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.164" +version = "1.0.188" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" +checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", @@ -1499,9 +1533,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.97" +version = "1.0.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdf3bf93142acad5821c99197022e170842cdbc1c30482b98750c688c640842a" +checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360" dependencies = [ "itoa", "ryu", @@ -1595,9 +1629,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" -version = "2.0.18" +version = "2.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e" +checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a" dependencies = [ "proc-macro2", "quote", diff --git a/moq-pub/Cargo.toml b/moq-pub/Cargo.toml index 80d6e3b..7d75245 100644 --- a/moq-pub/Cargo.toml +++ b/moq-pub/Cargo.toml @@ -39,6 +39,8 @@ env_logger = "0.9.3" anyhow = { version = "1.0.70", features = ["backtrace"]} mp4 = "0.13.0" rustls-native-certs = "0.6.3" +serde_json = "1.0.105" +rfc6381-codec = "0.1.0" [build-dependencies] http = "0.2.9" diff --git a/moq-pub/README.md b/moq-pub/README.md index da80283..7b32847 100644 --- a/moq-pub/README.md +++ b/moq-pub/README.md @@ -26,19 +26,16 @@ Longer term, I think it'd be interesting to refactor everything such that the `M Here's how I'm currently testing things, with a local copy of Big Buck Bunny named `bbb_source.mp4`: ``` -$ ffmpeg -hide_banner -v quiet -stream_loop 0 -re -i ../media/bbb_source.mp4 -an -f mp4 -movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset - | RUST_LOG=moq_pub=info cargo run -- -i - +$ ffmpeg -hide_banner -v quiet -stream_loop -1 -re -i bbb_source.mp4 -an -f mp4 -movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset - | RUST_LOG=moq_pub=info moq-pub -i - ``` This relies on having `moq-quinn` (the relay server) already running locally in another shell. -Here's we can (eventually) run `moq-pub` without dropping the audio track (omit the `-an` I'm using above): -``` -$ ffmpeg -hide_banner -v quiet -stream_loop 0 -re -i ../media/bbb_source.mp4 -f mp4 -movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset - | RUST_LOG=moq_pub=info cargo run -- -i - -``` +Note also that we're dropping the audio track (`-an`) above until audio playback is stabilized on the `moq-js` side. ### Known issues -- Catalog track is a raw binary MP4 init segment rather than the newer JSON format moq-js now expects -- Doesn't handle EOF - just send it media forever with `-stream_loop` +- Expects only one H.264/AVC1-encoded video track (catalog generation doesn't support audio tracks yet) +- Doesn't yet gracefully handle EOF - workaround: never stop sending it media (`-stream_loop -1`) - Probably still full of lots of bugs - Various other TODOs you can find in the code diff --git a/moq-pub/build.rs b/moq-pub/build.rs index 4a5d062..27f92f2 100644 --- a/moq-pub/build.rs +++ b/moq-pub/build.rs @@ -6,7 +6,7 @@ fn main() -> Result<(), Box> { let out_dir = std::path::PathBuf::from( std::env::var_os("OUT_DIR").ok_or(std::io::Error::new(std::io::ErrorKind::NotFound, "OUT_DIR not found"))?, ); - let cmd = Cli::command(); + let cmd = Config::command(); let man = clap_mangen::Man::new(cmd); let mut buffer: Vec = Default::default(); man.render(&mut buffer)?; diff --git a/moq-pub/src/cli.rs b/moq-pub/src/cli.rs index 6ebab84..a76ef55 100644 --- a/moq-pub/src/cli.rs +++ b/moq-pub/src/cli.rs @@ -3,7 +3,7 @@ use std::net; #[derive(Parser, Clone)] #[command(arg_required_else_help(true))] -pub struct Cli { +pub struct Config { #[arg(long, hide_short_help = true, default_value = "[::]:0")] pub bind_address: net::SocketAddr, @@ -12,6 +12,12 @@ pub struct Cli { #[arg(short, long, required = true, value_parser=input_parser)] input: InputValues, + + #[arg(long, hide_short_help = true, default_value = "24")] + pub catalog_fps: u8, + + #[arg(long, hide_short_help = true, default_value = "1500000")] + pub catalog_bit_rate: u32, } fn input_parser(s: &str) -> Result { diff --git a/moq-pub/src/main.rs b/moq-pub/src/main.rs index 0f7b0e9..8e285c8 100644 --- a/moq-pub/src/main.rs +++ b/moq-pub/src/main.rs @@ -23,15 +23,10 @@ use cli::*; async fn main() -> anyhow::Result<()> { env_logger::init(); - let args = Cli::parse(); + let config = Config::parse(); - let config = Config { - addr: args.bind_address, - uri: args.uri, - }; - - let mut media = Media::new().await?; - let session_runner = SessionRunner::new(config).await?; + let mut media = Media::new(&config).await?; + let session_runner = SessionRunner::new(&config).await?; let mut log_viewer = LogViewer::new(session_runner.get_incoming_receivers().await).await?; let mut media_runner = MediaRunner::new( session_runner.get_send_objects().await, diff --git a/moq-pub/src/media.rs b/moq-pub/src/media.rs index 4d10f3f..b8f9975 100644 --- a/moq-pub/src/media.rs +++ b/moq-pub/src/media.rs @@ -1,8 +1,10 @@ +use crate::cli::Config; use anyhow::{self, Context}; -use log::debug; +use log::{debug, info}; use moq_transport::VarInt; use moq_warp::model::{segment, track}; use mp4::{self, ReadBox}; +use serde_json::json; use std::collections::HashMap; use std::io::Cursor; use std::sync::Arc; @@ -17,7 +19,7 @@ pub struct Media { } impl Media { - pub async fn new() -> anyhow::Result { + pub async fn new(config: &Config) -> anyhow::Result { let mut stdin = tokio::io::stdin(); let ftyp = read_atom(&mut stdin).await?; anyhow::ensure!(&ftyp[4..8] == b"ftyp", "expected ftyp atom"); @@ -39,15 +41,17 @@ impl Media { // Create a source that can be subscribed to. let mut source = HashMap::default(); - // Create the catalog track - let (_catalog, subscriber) = Self::create_catalog(init); - source.insert("0".to_string(), subscriber); - let mut tracks = HashMap::new(); + // Create the init track + let init_track_name = "1.mp4"; + let (_init, subscriber) = Self::create_init(init); + source.insert(init_track_name.to_string(), subscriber); + for trak in &moov.traks { let id = trak.tkhd.track_id; let name = id.to_string(); + //let name = "2".to_string(); //dbg!("trak name: {}", &name); let timescale = track_timescale(&moov, id); @@ -59,6 +63,17 @@ impl Media { tracks.insert(name, track); } + // Create the catalog track + let namespace = "quic.video/moq-pub-foo"; + let (_catalog, subscriber) = Self::create_catalog( + config, + namespace.to_string(), + init_track_name.to_string(), + &moov, + &tracks, + )?; + source.insert(".catalog".to_string(), subscriber); + let source = Arc::new(MapSource(source)); Ok(Media { tracks, source }) @@ -107,12 +122,13 @@ impl Media { } } } - fn create_catalog(raw: Vec) -> (track::Publisher, track::Subscriber) { - // Create a track with a single segment containing the init data. - let mut catalog = track::Publisher::new("0"); - // Subscribe to the catalog before we push the segment. - let subscriber = catalog.subscribe(); + fn create_init(raw: Vec) -> (track::Publisher, track::Subscriber) { + // Create a track with a single segment containing the init data. + let mut init_track = track::Publisher::new("1.mp4"); + + // Subscribe to the init track before we push the segment. + let subscriber = init_track.subscribe(); let mut segment = segment::Publisher::new(segment::Info { sequence: VarInt::from_u32(0), // first and only segment @@ -121,11 +137,83 @@ impl Media { }); // Add the segment and add the fragment. - catalog.push_segment(segment.subscribe()); + init_track.push_segment(segment.subscribe()); segment.fragments.push(raw.into()); // Return the catalog - (catalog, subscriber) + (init_track, subscriber) + } + + fn create_catalog( + config: &Config, + namespace: String, + init_track_name: String, + moov: &mp4::MoovBox, + _tracks: &HashMap, + ) -> Result<(track::Publisher, track::Subscriber), anyhow::Error> { + // Create a track with a single segment containing the init data. + let mut catalog_track = track::Publisher::new(".catalog"); + + // Subscribe to the catalog before we push the segment. + let catalog_subscriber = catalog_track.subscribe(); + + let mut segment = segment::Publisher::new(segment::Info { + sequence: VarInt::from_u32(0), // first and only segment + send_order: i32::MIN, // highest priority + expires: None, // never delete from the cache + }); + + // avc1[.PPCCLL] + // + // let profile = 0x64; + // let constraints = 0x00; + // let level = 0x1f; + + // TODO: do build multi-track catalog by looping through moov.traks + let trak = moov.traks[0].clone(); + let avc1 = trak + .mdia + .minf + .stbl + .stsd + .avc1 + .ok_or(anyhow::anyhow!("avc1 atom not found"))?; + + let profile = avc1.avcc.avc_profile_indication; + let constraints = avc1.avcc.profile_compatibility; // Not 100% certain here, but it's 0x00 on my current test video + let level = avc1.avcc.avc_level_indication; + + let width = avc1.width; + let height = avc1.height; + + let codec = rfc6381_codec::Codec::avc1(profile, constraints, level); + let codec_str = codec.to_string(); + + let catalog = json!({ + "tracks": [ + { + "container": "mp4", + "namespace": namespace, + "kind": "video", + "init_track": init_track_name, + "data_track": "1", // assume just one track for now + "codec": codec_str, + "width": width, + "height": height, + "frame_rate": config.catalog_fps, + "bit_rate": config.catalog_bit_rate, + } + ] + }); + let catalog_str = serde_json::to_string_pretty(&catalog)?; + info!("catalog: {}", catalog_str); + + // Add the segment and add the fragment. + catalog_track.push_segment(segment.subscribe()); + segment.fragments.push(catalog_str.into()); + + // Return the catalog + Ok((catalog_track, catalog_subscriber)) } pub fn source(&self) -> Arc { self.source.clone() diff --git a/moq-pub/src/media_runner.rs b/moq-pub/src/media_runner.rs index b42ee26..fa1a02c 100644 --- a/moq-pub/src/media_runner.rs +++ b/moq-pub/src/media_runner.rs @@ -2,7 +2,7 @@ use crate::media::{self, MapSource}; use anyhow::bail; use log::{debug, error}; use moq_transport::message::Message; -use moq_transport::message::{Announce, SubscribeError}; +use moq_transport::message::{Announce, SubscribeError, SubscribeOk}; use moq_transport::{object, Object, VarInt}; use std::collections::HashMap; use std::sync::Arc; @@ -74,7 +74,7 @@ impl MediaRunner { debug!("media_runner.run()"); let source = self.source.clone(); let mut join_set: JoinSet> = tokio::task::JoinSet::new(); - let mut track_dispatcher: HashMap> = HashMap::new(); + let mut track_dispatcher: HashMap> = HashMap::new(); let mut incoming_ctl_receiver = self.incoming_ctl_receiver.resubscribe(); let outgoing_ctl_sender = self.outgoing_ctl_sender.clone(); @@ -86,13 +86,14 @@ impl MediaRunner { let mut objects = self.send_objects.clone(); let mut track = track.clone(); join_set.spawn(async move { - receiver.recv().await.ok_or(anyhow::anyhow!("channel closed"))?; + let track_id = receiver.recv().await.ok_or(anyhow::anyhow!("channel closed"))?; + // TODO: validate track_id is valid (not already in use), for now just trust subscribers are correct loop { let mut segment = track.next_segment().await?; debug!("segment: {:?}", &segment); let object = Object { - track: VarInt::from_u32(track.name.parse::()?), + track: track_id, group: segment.sequence, sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object send_order: segment.send_order, @@ -115,9 +116,10 @@ impl MediaRunner { debug!("Received a subscription request"); let track_id = subscribe.track_id; - debug!("Looking up track_id: {}", &track_id); + let track_name = subscribe.track_name; + debug!("Looking up track_name: {} (track_id: {})", &track_name, &track_id); // Look up track in source - match source.0.get(&track_id.to_string()) { + match source.0.get(&track_name.to_string()) { None => { // if track !exist, send subscribe error outgoing_ctl_sender @@ -134,7 +136,13 @@ impl MediaRunner { track_dispatcher .get(&track.name) .ok_or(anyhow::anyhow!("missing task for track"))? - .send(()) + .send(track_id) + .await?; + outgoing_ctl_sender + .send(Message::SubscribeOk(SubscribeOk { + track_id: subscribe.track_id, + expires: Some(VarInt::from_u32(0)), // valid until unsubscribed + })) .await?; } }; diff --git a/moq-pub/src/session_runner.rs b/moq-pub/src/session_runner.rs index 23d2953..4ee38cf 100644 --- a/moq-pub/src/session_runner.rs +++ b/moq-pub/src/session_runner.rs @@ -1,7 +1,7 @@ +use crate::cli::Config; use anyhow::Context; use log::debug; use moq_transport::{object, Object}; -use std::net; use tokio::sync::broadcast; use tokio::sync::mpsc; use tokio::task::JoinSet; @@ -14,13 +14,8 @@ pub struct SessionRunner { incoming_obj_sender: broadcast::Sender, } -pub struct Config { - pub addr: net::SocketAddr, - pub uri: http::uri::Uri, -} - impl SessionRunner { - pub async fn new(config: Config) -> anyhow::Result { + pub async fn new(config: &Config) -> anyhow::Result { let mut roots = rustls::RootCertStore::empty(); for cert in rustls_native_certs::load_native_certs().expect("could not load platform certs") { roots.add(&rustls::Certificate(cert.0)).unwrap(); @@ -36,7 +31,7 @@ impl SessionRunner { let arc_tls_config = std::sync::Arc::new(tls_config); let quinn_client_config = quinn::ClientConfig::new(arc_tls_config); - let mut endpoint = quinn::Endpoint::client(config.addr)?; + let mut endpoint = quinn::Endpoint::client(config.bind_address)?; endpoint.set_default_client_config(quinn_client_config); let webtransport_session = webtransport_quinn::connect(&endpoint, &config.uri)