moq-pub: JSON catalog support, bugfixes (#60)

Fixes some bugs around subscription handling and 
adds support for the new JSON catalog format
This commit is contained in:
Mike English 2023-09-05 15:08:35 -04:00 committed by GitHub
parent 2b1a9a4ce5
commit 90818ac848
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 180 additions and 55 deletions

54
Cargo.lock generated
View File

@ -489,6 +489,12 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "four-cc"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3958af68a31b1d1384d3f39b6aa33eb14b6009065b5ca305ddd9712a4237124f"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.28" version = "0.3.28"
@ -945,10 +951,12 @@ dependencies = [
"moq-warp", "moq-warp",
"mp4", "mp4",
"quinn", "quinn",
"rfc6381-codec",
"ring", "ring",
"rustls 0.21.2", "rustls 0.21.2",
"rustls-native-certs", "rustls-native-certs",
"rustls-pemfile", "rustls-pemfile",
"serde_json",
"tokio", "tokio",
"webtransport-generic", "webtransport-generic",
"webtransport-quinn", "webtransport-quinn",
@ -1012,6 +1020,21 @@ dependencies = [
"thiserror", "thiserror",
] ]
[[package]]
name = "mp4ra-rust"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be9daf03b43bf3842962947c62ba40f411e46a58774c60838038f04a67d17626"
dependencies = [
"four-cc",
]
[[package]]
name = "mpeg4-audio-const"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96a1fe2275b68991faded2c80aa4a33dba398b77d276038b8f50701a22e55918"
[[package]] [[package]]
name = "multer" name = "multer"
version = "2.1.0" version = "2.1.0"
@ -1195,9 +1218,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.60" version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dec2b086b7a862cf4de201096214fa870344cf922b2b30c167badb3af3195406" checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
@ -1315,6 +1338,17 @@ version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
[[package]]
name = "rfc6381-codec"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4395f46a67f0d57c57f6a5361f3a9a0c0183a19cab3998892ecdc003de6d8037"
dependencies = [
"four-cc",
"mp4ra-rust",
"mpeg4-audio-const",
]
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.16.20" version = "0.16.20"
@ -1479,18 +1513,18 @@ dependencies = [
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.164" version = "1.0.188"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d" checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.164" version = "1.0.188"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -1499,9 +1533,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.97" version = "1.0.105"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdf3bf93142acad5821c99197022e170842cdbc1c30482b98750c688c640842a" checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360"
dependencies = [ dependencies = [
"itoa", "itoa",
"ryu", "ryu",
@ -1595,9 +1629,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.18" version = "2.0.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32d41677bcbe24c20c52e7c70b0d8db04134c5d1066bf98662e2871ad200ea3e" checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",

View File

@ -39,6 +39,8 @@ env_logger = "0.9.3"
anyhow = { version = "1.0.70", features = ["backtrace"]} anyhow = { version = "1.0.70", features = ["backtrace"]}
mp4 = "0.13.0" mp4 = "0.13.0"
rustls-native-certs = "0.6.3" rustls-native-certs = "0.6.3"
serde_json = "1.0.105"
rfc6381-codec = "0.1.0"
[build-dependencies] [build-dependencies]
http = "0.2.9" http = "0.2.9"

View File

@ -26,19 +26,16 @@ Longer term, I think it'd be interesting to refactor everything such that the `M
Here's how I'm currently testing things, with a local copy of Big Buck Bunny named `bbb_source.mp4`: Here's how I'm currently testing things, with a local copy of Big Buck Bunny named `bbb_source.mp4`:
``` ```
$ ffmpeg -hide_banner -v quiet -stream_loop 0 -re -i ../media/bbb_source.mp4 -an -f mp4 -movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset - | RUST_LOG=moq_pub=info cargo run -- -i - $ ffmpeg -hide_banner -v quiet -stream_loop -1 -re -i bbb_source.mp4 -an -f mp4 -movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset - | RUST_LOG=moq_pub=info moq-pub -i -
``` ```
This relies on having `moq-quinn` (the relay server) already running locally in another shell. This relies on having `moq-quinn` (the relay server) already running locally in another shell.
Here's we can (eventually) run `moq-pub` without dropping the audio track (omit the `-an` I'm using above): Note also that we're dropping the audio track (`-an`) above until audio playback is stabilized on the `moq-js` side.
```
$ ffmpeg -hide_banner -v quiet -stream_loop 0 -re -i ../media/bbb_source.mp4 -f mp4 -movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset - | RUST_LOG=moq_pub=info cargo run -- -i -
```
### Known issues ### Known issues
- Catalog track is a raw binary MP4 init segment rather than the newer JSON format moq-js now expects - Expects only one H.264/AVC1-encoded video track (catalog generation doesn't support audio tracks yet)
- Doesn't handle EOF - just send it media forever with `-stream_loop` - Doesn't yet gracefully handle EOF - workaround: never stop sending it media (`-stream_loop -1`)
- Probably still full of lots of bugs - Probably still full of lots of bugs
- Various other TODOs you can find in the code - Various other TODOs you can find in the code

View File

@ -6,7 +6,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = std::path::PathBuf::from( let out_dir = std::path::PathBuf::from(
std::env::var_os("OUT_DIR").ok_or(std::io::Error::new(std::io::ErrorKind::NotFound, "OUT_DIR not found"))?, std::env::var_os("OUT_DIR").ok_or(std::io::Error::new(std::io::ErrorKind::NotFound, "OUT_DIR not found"))?,
); );
let cmd = Cli::command(); let cmd = Config::command();
let man = clap_mangen::Man::new(cmd); let man = clap_mangen::Man::new(cmd);
let mut buffer: Vec<u8> = Default::default(); let mut buffer: Vec<u8> = Default::default();
man.render(&mut buffer)?; man.render(&mut buffer)?;

View File

@ -3,7 +3,7 @@ use std::net;
#[derive(Parser, Clone)] #[derive(Parser, Clone)]
#[command(arg_required_else_help(true))] #[command(arg_required_else_help(true))]
pub struct Cli { pub struct Config {
#[arg(long, hide_short_help = true, default_value = "[::]:0")] #[arg(long, hide_short_help = true, default_value = "[::]:0")]
pub bind_address: net::SocketAddr, pub bind_address: net::SocketAddr,
@ -12,6 +12,12 @@ pub struct Cli {
#[arg(short, long, required = true, value_parser=input_parser)] #[arg(short, long, required = true, value_parser=input_parser)]
input: InputValues, input: InputValues,
#[arg(long, hide_short_help = true, default_value = "24")]
pub catalog_fps: u8,
#[arg(long, hide_short_help = true, default_value = "1500000")]
pub catalog_bit_rate: u32,
} }
fn input_parser(s: &str) -> Result<InputValues, String> { fn input_parser(s: &str) -> Result<InputValues, String> {

View File

@ -23,15 +23,10 @@ use cli::*;
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
env_logger::init(); env_logger::init();
let args = Cli::parse(); let config = Config::parse();
let config = Config { let mut media = Media::new(&config).await?;
addr: args.bind_address, let session_runner = SessionRunner::new(&config).await?;
uri: args.uri,
};
let mut media = Media::new().await?;
let session_runner = SessionRunner::new(config).await?;
let mut log_viewer = LogViewer::new(session_runner.get_incoming_receivers().await).await?; let mut log_viewer = LogViewer::new(session_runner.get_incoming_receivers().await).await?;
let mut media_runner = MediaRunner::new( let mut media_runner = MediaRunner::new(
session_runner.get_send_objects().await, session_runner.get_send_objects().await,

View File

@ -1,8 +1,10 @@
use crate::cli::Config;
use anyhow::{self, Context}; use anyhow::{self, Context};
use log::debug; use log::{debug, info};
use moq_transport::VarInt; use moq_transport::VarInt;
use moq_warp::model::{segment, track}; use moq_warp::model::{segment, track};
use mp4::{self, ReadBox}; use mp4::{self, ReadBox};
use serde_json::json;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Cursor; use std::io::Cursor;
use std::sync::Arc; use std::sync::Arc;
@ -17,7 +19,7 @@ pub struct Media {
} }
impl Media { impl Media {
pub async fn new() -> anyhow::Result<Self> { pub async fn new(config: &Config) -> anyhow::Result<Self> {
let mut stdin = tokio::io::stdin(); let mut stdin = tokio::io::stdin();
let ftyp = read_atom(&mut stdin).await?; let ftyp = read_atom(&mut stdin).await?;
anyhow::ensure!(&ftyp[4..8] == b"ftyp", "expected ftyp atom"); anyhow::ensure!(&ftyp[4..8] == b"ftyp", "expected ftyp atom");
@ -39,15 +41,17 @@ impl Media {
// Create a source that can be subscribed to. // Create a source that can be subscribed to.
let mut source = HashMap::default(); let mut source = HashMap::default();
// Create the catalog track
let (_catalog, subscriber) = Self::create_catalog(init);
source.insert("0".to_string(), subscriber);
let mut tracks = HashMap::new(); let mut tracks = HashMap::new();
// Create the init track
let init_track_name = "1.mp4";
let (_init, subscriber) = Self::create_init(init);
source.insert(init_track_name.to_string(), subscriber);
for trak in &moov.traks { for trak in &moov.traks {
let id = trak.tkhd.track_id; let id = trak.tkhd.track_id;
let name = id.to_string(); let name = id.to_string();
//let name = "2".to_string();
//dbg!("trak name: {}", &name); //dbg!("trak name: {}", &name);
let timescale = track_timescale(&moov, id); let timescale = track_timescale(&moov, id);
@ -59,6 +63,17 @@ impl Media {
tracks.insert(name, track); tracks.insert(name, track);
} }
// Create the catalog track
let namespace = "quic.video/moq-pub-foo";
let (_catalog, subscriber) = Self::create_catalog(
config,
namespace.to_string(),
init_track_name.to_string(),
&moov,
&tracks,
)?;
source.insert(".catalog".to_string(), subscriber);
let source = Arc::new(MapSource(source)); let source = Arc::new(MapSource(source));
Ok(Media { tracks, source }) Ok(Media { tracks, source })
@ -107,12 +122,13 @@ impl Media {
} }
} }
} }
fn create_catalog(raw: Vec<u8>) -> (track::Publisher, track::Subscriber) {
// Create a track with a single segment containing the init data.
let mut catalog = track::Publisher::new("0");
// Subscribe to the catalog before we push the segment. fn create_init(raw: Vec<u8>) -> (track::Publisher, track::Subscriber) {
let subscriber = catalog.subscribe(); // Create a track with a single segment containing the init data.
let mut init_track = track::Publisher::new("1.mp4");
// Subscribe to the init track before we push the segment.
let subscriber = init_track.subscribe();
let mut segment = segment::Publisher::new(segment::Info { let mut segment = segment::Publisher::new(segment::Info {
sequence: VarInt::from_u32(0), // first and only segment sequence: VarInt::from_u32(0), // first and only segment
@ -121,11 +137,83 @@ impl Media {
}); });
// Add the segment and add the fragment. // Add the segment and add the fragment.
catalog.push_segment(segment.subscribe()); init_track.push_segment(segment.subscribe());
segment.fragments.push(raw.into()); segment.fragments.push(raw.into());
// Return the catalog // Return the catalog
(catalog, subscriber) (init_track, subscriber)
}
fn create_catalog(
config: &Config,
namespace: String,
init_track_name: String,
moov: &mp4::MoovBox,
_tracks: &HashMap<String, Track>,
) -> Result<(track::Publisher, track::Subscriber), anyhow::Error> {
// Create a track with a single segment containing the init data.
let mut catalog_track = track::Publisher::new(".catalog");
// Subscribe to the catalog before we push the segment.
let catalog_subscriber = catalog_track.subscribe();
let mut segment = segment::Publisher::new(segment::Info {
sequence: VarInt::from_u32(0), // first and only segment
send_order: i32::MIN, // highest priority
expires: None, // never delete from the cache
});
// avc1[.PPCCLL]
//
// let profile = 0x64;
// let constraints = 0x00;
// let level = 0x1f;
// TODO: do build multi-track catalog by looping through moov.traks
let trak = moov.traks[0].clone();
let avc1 = trak
.mdia
.minf
.stbl
.stsd
.avc1
.ok_or(anyhow::anyhow!("avc1 atom not found"))?;
let profile = avc1.avcc.avc_profile_indication;
let constraints = avc1.avcc.profile_compatibility; // Not 100% certain here, but it's 0x00 on my current test video
let level = avc1.avcc.avc_level_indication;
let width = avc1.width;
let height = avc1.height;
let codec = rfc6381_codec::Codec::avc1(profile, constraints, level);
let codec_str = codec.to_string();
let catalog = json!({
"tracks": [
{
"container": "mp4",
"namespace": namespace,
"kind": "video",
"init_track": init_track_name,
"data_track": "1", // assume just one track for now
"codec": codec_str,
"width": width,
"height": height,
"frame_rate": config.catalog_fps,
"bit_rate": config.catalog_bit_rate,
}
]
});
let catalog_str = serde_json::to_string_pretty(&catalog)?;
info!("catalog: {}", catalog_str);
// Add the segment and add the fragment.
catalog_track.push_segment(segment.subscribe());
segment.fragments.push(catalog_str.into());
// Return the catalog
Ok((catalog_track, catalog_subscriber))
} }
pub fn source(&self) -> Arc<MapSource> { pub fn source(&self) -> Arc<MapSource> {
self.source.clone() self.source.clone()

View File

@ -2,7 +2,7 @@ use crate::media::{self, MapSource};
use anyhow::bail; use anyhow::bail;
use log::{debug, error}; use log::{debug, error};
use moq_transport::message::Message; use moq_transport::message::Message;
use moq_transport::message::{Announce, SubscribeError}; use moq_transport::message::{Announce, SubscribeError, SubscribeOk};
use moq_transport::{object, Object, VarInt}; use moq_transport::{object, Object, VarInt};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
@ -74,7 +74,7 @@ impl<S: WTSession> MediaRunner<S> {
debug!("media_runner.run()"); debug!("media_runner.run()");
let source = self.source.clone(); let source = self.source.clone();
let mut join_set: JoinSet<anyhow::Result<()>> = tokio::task::JoinSet::new(); let mut join_set: JoinSet<anyhow::Result<()>> = tokio::task::JoinSet::new();
let mut track_dispatcher: HashMap<String, tokio::sync::mpsc::Sender<()>> = HashMap::new(); let mut track_dispatcher: HashMap<String, tokio::sync::mpsc::Sender<VarInt>> = HashMap::new();
let mut incoming_ctl_receiver = self.incoming_ctl_receiver.resubscribe(); let mut incoming_ctl_receiver = self.incoming_ctl_receiver.resubscribe();
let outgoing_ctl_sender = self.outgoing_ctl_sender.clone(); let outgoing_ctl_sender = self.outgoing_ctl_sender.clone();
@ -86,13 +86,14 @@ impl<S: WTSession> MediaRunner<S> {
let mut objects = self.send_objects.clone(); let mut objects = self.send_objects.clone();
let mut track = track.clone(); let mut track = track.clone();
join_set.spawn(async move { join_set.spawn(async move {
receiver.recv().await.ok_or(anyhow::anyhow!("channel closed"))?; let track_id = receiver.recv().await.ok_or(anyhow::anyhow!("channel closed"))?;
// TODO: validate track_id is valid (not already in use), for now just trust subscribers are correct
loop { loop {
let mut segment = track.next_segment().await?; let mut segment = track.next_segment().await?;
debug!("segment: {:?}", &segment); debug!("segment: {:?}", &segment);
let object = Object { let object = Object {
track: VarInt::from_u32(track.name.parse::<u32>()?), track: track_id,
group: segment.sequence, group: segment.sequence,
sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object
send_order: segment.send_order, send_order: segment.send_order,
@ -115,9 +116,10 @@ impl<S: WTSession> MediaRunner<S> {
debug!("Received a subscription request"); debug!("Received a subscription request");
let track_id = subscribe.track_id; let track_id = subscribe.track_id;
debug!("Looking up track_id: {}", &track_id); let track_name = subscribe.track_name;
debug!("Looking up track_name: {} (track_id: {})", &track_name, &track_id);
// Look up track in source // Look up track in source
match source.0.get(&track_id.to_string()) { match source.0.get(&track_name.to_string()) {
None => { None => {
// if track !exist, send subscribe error // if track !exist, send subscribe error
outgoing_ctl_sender outgoing_ctl_sender
@ -134,7 +136,13 @@ impl<S: WTSession> MediaRunner<S> {
track_dispatcher track_dispatcher
.get(&track.name) .get(&track.name)
.ok_or(anyhow::anyhow!("missing task for track"))? .ok_or(anyhow::anyhow!("missing task for track"))?
.send(()) .send(track_id)
.await?;
outgoing_ctl_sender
.send(Message::SubscribeOk(SubscribeOk {
track_id: subscribe.track_id,
expires: Some(VarInt::from_u32(0)), // valid until unsubscribed
}))
.await?; .await?;
} }
}; };

View File

@ -1,7 +1,7 @@
use crate::cli::Config;
use anyhow::Context; use anyhow::Context;
use log::debug; use log::debug;
use moq_transport::{object, Object}; use moq_transport::{object, Object};
use std::net;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::task::JoinSet; use tokio::task::JoinSet;
@ -14,13 +14,8 @@ pub struct SessionRunner {
incoming_obj_sender: broadcast::Sender<Object>, incoming_obj_sender: broadcast::Sender<Object>,
} }
pub struct Config {
pub addr: net::SocketAddr,
pub uri: http::uri::Uri,
}
impl SessionRunner { impl SessionRunner {
pub async fn new(config: Config) -> anyhow::Result<Self> { pub async fn new(config: &Config) -> anyhow::Result<Self> {
let mut roots = rustls::RootCertStore::empty(); let mut roots = rustls::RootCertStore::empty();
for cert in rustls_native_certs::load_native_certs().expect("could not load platform certs") { for cert in rustls_native_certs::load_native_certs().expect("could not load platform certs") {
roots.add(&rustls::Certificate(cert.0)).unwrap(); roots.add(&rustls::Certificate(cert.0)).unwrap();
@ -36,7 +31,7 @@ impl SessionRunner {
let arc_tls_config = std::sync::Arc::new(tls_config); let arc_tls_config = std::sync::Arc::new(tls_config);
let quinn_client_config = quinn::ClientConfig::new(arc_tls_config); let quinn_client_config = quinn::ClientConfig::new(arc_tls_config);
let mut endpoint = quinn::Endpoint::client(config.addr)?; let mut endpoint = quinn::Endpoint::client(config.bind_address)?;
endpoint.set_default_client_config(quinn_client_config); endpoint.set_default_client_config(quinn_client_config);
let webtransport_session = webtransport_quinn::connect(&endpoint, &config.uri) let webtransport_session = webtransport_quinn::connect(&endpoint, &config.uri)