From 2b1a9a4ce57aeac83bff94df5701b07c08b23389 Mon Sep 17 00:00:00 2001 From: Mike English Date: Wed, 30 Aug 2023 09:32:42 -0400 Subject: [PATCH] Add moq-pub (#54) Initial version of a CLI publisher / contribution tool --- Cargo.lock | 113 +++++++++++ Cargo.toml | 7 +- moq-pub/Cargo.toml | 46 +++++ moq-pub/README.md | 44 +++++ moq-pub/build.rs | 15 ++ moq-pub/src/cli.rs | 27 +++ moq-pub/src/log_viewer.rs | 39 ++++ moq-pub/src/main.rs | 60 ++++++ moq-pub/src/media.rs | 362 ++++++++++++++++++++++++++++++++++ moq-pub/src/media_runner.rs | 151 ++++++++++++++ moq-pub/src/session_runner.rs | 127 ++++++++++++ moq-quinn/Cargo.toml | 2 + 12 files changed, 992 insertions(+), 1 deletion(-) create mode 100644 moq-pub/Cargo.toml create mode 100644 moq-pub/README.md create mode 100644 moq-pub/build.rs create mode 100644 moq-pub/src/cli.rs create mode 100644 moq-pub/src/log_viewer.rs create mode 100644 moq-pub/src/main.rs create mode 100644 moq-pub/src/media.rs create mode 100644 moq-pub/src/media_runner.rs create mode 100644 moq-pub/src/session_runner.rs diff --git a/Cargo.lock b/Cargo.lock index 94d344f..f4ed144 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,6 +80,9 @@ name = "anyhow" version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +dependencies = [ + "backtrace", +] [[package]] name = "async-channel" @@ -334,6 +337,16 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" +[[package]] +name = "clap_mangen" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f2e32b579dae093c2424a8b7e2bea09c89da01e1ce5065eb2f0a6f1cc15cc1f" +dependencies = [ + "clap", + "roff", +] + [[package]] name = "colorchoice" version = "1.0.0" @@ -918,6 +931,29 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "moq-pub" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "clap_mangen", + "env_logger", + "http", + "log", + "moq-transport", + "moq-warp", + "mp4", + "quinn", + "ring", + "rustls 0.21.2", + "rustls-native-certs", + "rustls-pemfile", + "tokio", + "webtransport-generic", + "webtransport-quinn", +] + [[package]] name = "moq-quinn" version = "0.1.0" @@ -962,6 +998,20 @@ dependencies = [ "webtransport-generic", ] +[[package]] +name = "mp4" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "509348cba250e7b852a875100a2ddce7a36ee3abf881a681c756670c1774264d" +dependencies = [ + "byteorder", + "bytes", + "num-rational", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "multer" version = "2.1.0" @@ -980,6 +1030,49 @@ dependencies = [ "version_check", ] +[[package]] +name = "num-bigint" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" +dependencies = [ + "autocfg", + "num-bigint", + "num-integer", + "num-traits", + "serde", +] + +[[package]] +name = "num-traits" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.15.0" @@ -1237,6 +1330,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "roff" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b833d8d034ea094b1ea68aa6d5c740e0d04bad9d16568d08ba6f76823a114316" + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -1383,6 +1482,20 @@ name = "serde" version = "1.0.164" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.164" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "serde_json" diff --git a/Cargo.toml b/Cargo.toml index 5407a66..1dda68f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,7 @@ [workspace] -members = ["moq-transport", "moq-quinn", "moq-warp"] +members = [ + "moq-transport", + "moq-quinn", + "moq-pub", + "moq-warp", +] diff --git a/moq-pub/Cargo.toml b/moq-pub/Cargo.toml new file mode 100644 index 0000000..80d6e3b --- /dev/null +++ b/moq-pub/Cargo.toml @@ -0,0 +1,46 @@ +[package] +name = "moq-pub" +description = "Media over QUIC" +authors = ["Mike English"] +repository = "https://github.com/kixelated/moq-rs" +license = "MIT OR Apache-2.0" + +version = "0.1.0" +edition = "2021" + +keywords = ["quic", "http3", "webtransport", "media", "live"] +categories = ["multimedia", "network-programming", "web-programming"] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +moq-transport = { path = "../moq-transport" } +#moq-transport-quinn = { path = "../moq-transport-quinn" } +moq-warp = { path = "../moq-warp" } + +# QUIC +quinn = "0.10" +webtransport-quinn = "0.5" +webtransport-generic = "0.5" +http = "0.2.9" + +# Crypto +ring = "0.16.20" +rustls = "0.21.2" +rustls-pemfile = "1.0.2" + +# Async stuff +tokio = { version = "1.27", features = ["full"] } + +# CLI, logging, error handling +clap = { version = "4.0", features = ["derive"] } +log = { version = "0.4", features = ["std"] } +env_logger = "0.9.3" +anyhow = { version = "1.0.70", features = ["backtrace"]} +mp4 = "0.13.0" +rustls-native-certs = "0.6.3" + +[build-dependencies] +http = "0.2.9" +clap = { version = "4.0", features = ["derive"] } +clap_mangen = "0.2.12" diff --git a/moq-pub/README.md b/moq-pub/README.md new file mode 100644 index 0000000..da80283 --- /dev/null +++ b/moq-pub/README.md @@ -0,0 +1,44 @@ +# moq-pub + +A command line tool for publishing media via Media over QUIC (MoQ). + +Expects to receive fragmented MP4 via standard input and connect to a MOQT relay. + +``` +ffmpeg ... - | moq-pub -i - -u https://localhost:4443 +``` + +### A note on the `moq-pub` code organization + +- `Media` is responsible for reading from stdin and parsing MP4 boxes. It populates a `MapSource` of `Track`s for which it holds the producer side, pushing segments of video/audio into them and notifying consumers via tokio watch async primitives. + +- `SessionRunner` is where we create and hold the MOQT Session from the `moq_transport` library. We currently hard-code our implementation to use `quinn` as the underlying WebTranport implementation. We use a series of `mpsc` and `broadcast` channels to make it possible for other parts of our code to send/recieve control messages via that Session. Sending Objects is handled a little differently because we are able to clone the MOQT Session's sender wherever we need to do that. + +- `MediaRunner` is responsible for consuming the `Track`s that `Media` produces and populates. `MediaRunner` spawns tasks for each `Track` to `.await` new segments and then put the media data into Objects and onto the wire (via channels into `SessionRunner`). Note that these tasks are created, but block waiting un the reception of a MOQT SUBSCRIBE message before they actually send any segments on the wire. `MediaRunner` is also responsible for sending the initial MOQT ANNOUNCE message announcing the namespace for the tracks we will send. + +- `LogViewer` as the name implies is responsible for logging. It snoops on some channels going in/out of `SessionRunner` and logs MOQT control messages. + +Longer term, I think it'd be interesting to refactor everything such that the `Media` + `MediaRunner` bits consume an interface that's _closer_ to what we'd like to eventually expose as a C FFI for consumption by external tools. That probably means greatly reducing the use of async Rust in the parts of this code that make up both sides of that interface boundary. + + +### Invoking `moq-pub`: + +Here's how I'm currently testing things, with a local copy of Big Buck Bunny named `bbb_source.mp4`: + +``` +$ ffmpeg -hide_banner -v quiet -stream_loop 0 -re -i ../media/bbb_source.mp4 -an -f mp4 -movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset - | RUST_LOG=moq_pub=info cargo run -- -i - +``` + +This relies on having `moq-quinn` (the relay server) already running locally in another shell. + +Here's we can (eventually) run `moq-pub` without dropping the audio track (omit the `-an` I'm using above): +``` +$ ffmpeg -hide_banner -v quiet -stream_loop 0 -re -i ../media/bbb_source.mp4 -f mp4 -movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset - | RUST_LOG=moq_pub=info cargo run -- -i - +``` + +### Known issues + +- Catalog track is a raw binary MP4 init segment rather than the newer JSON format moq-js now expects +- Doesn't handle EOF - just send it media forever with `-stream_loop` +- Probably still full of lots of bugs +- Various other TODOs you can find in the code diff --git a/moq-pub/build.rs b/moq-pub/build.rs new file mode 100644 index 0000000..4a5d062 --- /dev/null +++ b/moq-pub/build.rs @@ -0,0 +1,15 @@ +include!("src/cli.rs"); + +use clap::CommandFactory; + +fn main() -> Result<(), Box> { + let out_dir = std::path::PathBuf::from( + std::env::var_os("OUT_DIR").ok_or(std::io::Error::new(std::io::ErrorKind::NotFound, "OUT_DIR not found"))?, + ); + let cmd = Cli::command(); + let man = clap_mangen::Man::new(cmd); + let mut buffer: Vec = Default::default(); + man.render(&mut buffer)?; + std::fs::write(out_dir.join("moq-pub.1"), buffer)?; + Ok(()) +} diff --git a/moq-pub/src/cli.rs b/moq-pub/src/cli.rs new file mode 100644 index 0000000..6ebab84 --- /dev/null +++ b/moq-pub/src/cli.rs @@ -0,0 +1,27 @@ +use clap::{Parser, ValueEnum}; +use std::net; + +#[derive(Parser, Clone)] +#[command(arg_required_else_help(true))] +pub struct Cli { + #[arg(long, hide_short_help = true, default_value = "[::]:0")] + pub bind_address: net::SocketAddr, + + #[arg(short, long, default_value = "https://localhost:4443")] + pub uri: http::uri::Uri, + + #[arg(short, long, required = true, value_parser=input_parser)] + input: InputValues, +} + +fn input_parser(s: &str) -> Result { + if s == "-" { + return Ok(InputValues::Stdin); + } + Err("The only currently supported input value is: '-' (stdin)".to_string()) +} + +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] +pub enum InputValues { + Stdin, +} diff --git a/moq-pub/src/log_viewer.rs b/moq-pub/src/log_viewer.rs new file mode 100644 index 0000000..92de7ea --- /dev/null +++ b/moq-pub/src/log_viewer.rs @@ -0,0 +1,39 @@ +use log::{debug, info}; +use tokio::{select, sync::broadcast}; + +pub struct LogViewer { + incoming_ctl_receiver: broadcast::Receiver, + incoming_obj_receiver: broadcast::Receiver, +} + +impl LogViewer { + pub async fn new( + incoming: ( + broadcast::Receiver, + broadcast::Receiver, + ), + ) -> anyhow::Result { + Ok(Self { + incoming_ctl_receiver: incoming.0, + incoming_obj_receiver: incoming.1, + }) + } + pub async fn run(&mut self) -> anyhow::Result<()> { + debug!("log_viewer.run()"); + + loop { + select! { + msg = self.incoming_ctl_receiver.recv() => { + info!( + "Received incoming MOQT Control message: {:?}", + &msg? + );} + obj = self.incoming_obj_receiver.recv() => { + info!( + "Received incoming MOQT Object with header: {:?}", + &obj? + );} + } + } + } +} diff --git a/moq-pub/src/main.rs b/moq-pub/src/main.rs new file mode 100644 index 0000000..0f7b0e9 --- /dev/null +++ b/moq-pub/src/main.rs @@ -0,0 +1,60 @@ +use anyhow::Context; +use clap::Parser; +use tokio::task::JoinSet; + +mod session_runner; +use session_runner::*; + +mod media_runner; +use media_runner::*; + +mod log_viewer; +use log_viewer::*; + +mod media; +use media::*; + +mod cli; +use cli::*; + +// TODO: clap complete + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + env_logger::init(); + + let args = Cli::parse(); + + let config = Config { + addr: args.bind_address, + uri: args.uri, + }; + + let mut media = Media::new().await?; + let session_runner = SessionRunner::new(config).await?; + let mut log_viewer = LogViewer::new(session_runner.get_incoming_receivers().await).await?; + let mut media_runner = MediaRunner::new( + session_runner.get_send_objects().await, + session_runner.get_outgoing_senders().await, + session_runner.get_incoming_receivers().await, + ) + .await?; + + let mut join_set: JoinSet> = tokio::task::JoinSet::new(); + + join_set.spawn(async { session_runner.run().await.context("failed to run session runner") }); + join_set.spawn(async move { log_viewer.run().await.context("failed to run media source") }); + + // TODO: generate unique namespace with UUID and/or take a command line arg + media_runner.announce("quic.video/moq-pub-foo", media.source()).await?; + + join_set.spawn(async move { media.run().await.context("failed to run media source") }); + join_set.spawn(async move { media_runner.run().await.context("failed to run client") }); + + while let Some(res) = join_set.join_next().await { + dbg!(&res); + res??; + } + + Ok(()) +} diff --git a/moq-pub/src/media.rs b/moq-pub/src/media.rs new file mode 100644 index 0000000..4d10f3f --- /dev/null +++ b/moq-pub/src/media.rs @@ -0,0 +1,362 @@ +use anyhow::{self, Context}; +use log::debug; +use moq_transport::VarInt; +use moq_warp::model::{segment, track}; +use mp4::{self, ReadBox}; +use std::collections::HashMap; +use std::io::Cursor; +use std::sync::Arc; +use std::time; +use tokio::io::AsyncReadExt; + +pub struct Media { + // The tracks we're producing. + tracks: HashMap, + + source: Arc, +} + +impl Media { + pub async fn new() -> anyhow::Result { + let mut stdin = tokio::io::stdin(); + let ftyp = read_atom(&mut stdin).await?; + anyhow::ensure!(&ftyp[4..8] == b"ftyp", "expected ftyp atom"); + + let moov = read_atom(&mut stdin).await?; + anyhow::ensure!(&moov[4..8] == b"moov", "expected moov atom"); + + let mut init = ftyp; + init.extend(&moov); + + // We're going to parse the moov box. + // We have to read the moov box header to correctly advance the cursor for the mp4 crate. + let mut moov_reader = Cursor::new(&moov); + let moov_header = mp4::BoxHeader::read(&mut moov_reader)?; + + // Parse the moov box so we can detect the timescales for each track. + let moov = mp4::MoovBox::read_box(&mut moov_reader, moov_header.size)?; + + // Create a source that can be subscribed to. + let mut source = HashMap::default(); + + // Create the catalog track + let (_catalog, subscriber) = Self::create_catalog(init); + source.insert("0".to_string(), subscriber); + + let mut tracks = HashMap::new(); + + for trak in &moov.traks { + let id = trak.tkhd.track_id; + let name = id.to_string(); + //dbg!("trak name: {}", &name); + + let timescale = track_timescale(&moov, id); + + // Store the track publisher in a map so we can update it later. + let track = Track::new(&name, timescale); + source.insert(name.to_string(), track.subscribe()); + + tracks.insert(name, track); + } + + let source = Arc::new(MapSource(source)); + + Ok(Media { tracks, source }) + } + pub async fn run(&mut self) -> anyhow::Result<()> { + let mut stdin = tokio::io::stdin(); + // The current track name + let mut track_name = None; + + loop { + let atom = read_atom(&mut stdin).await?; + + let mut reader = Cursor::new(&atom); + let header = mp4::BoxHeader::read(&mut reader)?; + + match header.name { + mp4::BoxType::MoofBox => { + let moof = mp4::MoofBox::read_box(&mut reader, header.size).context("failed to read MP4")?; + + // Process the moof. + let fragment = Fragment::new(moof)?; + let name = fragment.track.to_string(); + + // Get the track for this moof. + let track = self.tracks.get_mut(&name).context("failed to find track")?; + + // Save the track ID for the next iteration, which must be a mdat. + anyhow::ensure!(track_name.is_none(), "multiple moof atoms"); + track_name.replace(name); + + // Publish the moof header, creating a new segment if it's a keyframe. + track.header(atom, fragment).context("failed to publish moof")?; + } + mp4::BoxType::MdatBox => { + // Get the track ID from the previous moof. + let name = track_name.take().context("missing moof")?; + let track = self.tracks.get_mut(&name).context("failed to find track")?; + + // Publish the mdat atom. + track.data(atom).context("failed to publish mdat")?; + } + + _ => { + // Skip unknown atoms + } + } + } + } + fn create_catalog(raw: Vec) -> (track::Publisher, track::Subscriber) { + // Create a track with a single segment containing the init data. + let mut catalog = track::Publisher::new("0"); + + // Subscribe to the catalog before we push the segment. + let subscriber = catalog.subscribe(); + + let mut segment = segment::Publisher::new(segment::Info { + sequence: VarInt::from_u32(0), // first and only segment + send_order: i32::MIN, // highest priority + expires: None, // never delete from the cache + }); + + // Add the segment and add the fragment. + catalog.push_segment(segment.subscribe()); + segment.fragments.push(raw.into()); + + // Return the catalog + (catalog, subscriber) + } + pub fn source(&self) -> Arc { + self.source.clone() + } +} + +// Read a full MP4 atom into a vector. +async fn read_atom(reader: &mut R) -> anyhow::Result> { + // Read the 8 bytes for the size + type + let mut buf = [0u8; 8]; + reader.read_exact(&mut buf).await?; + + // Convert the first 4 bytes into the size. + let size = u32::from_be_bytes(buf[0..4].try_into()?) as u64; + + let mut raw = buf.to_vec(); + + debug!("size: {}", &size); + + let mut limit = match size { + // Runs until the end of the file. + 0 => reader.take(u64::MAX), + + // The next 8 bytes are the extended size to be used instead. + 1 => { + reader.read_exact(&mut buf).await?; + let size_large = u64::from_be_bytes(buf); + anyhow::ensure!(size_large >= 16, "impossible extended box size: {}", size_large); + + reader.take(size_large - 16) + } + + 2..=7 => { + anyhow::bail!("impossible box size: {}", size) + } + + // Otherwise read based on the size. + size => reader.take(size - 8), + }; + + // Append to the vector and return it. + let read_bytes = limit.read_to_end(&mut raw).await?; + debug!("read_bytes: {}", read_bytes); + + Ok(raw) +} + +struct Track { + // The track we're producing + track: track::Publisher, + + // The current segment + segment: Option, + + // The number of units per second. + timescale: u64, + + // The number of segments produced. + sequence: u64, +} + +impl Track { + fn new(name: &str, timescale: u64) -> Self { + let track = track::Publisher::new(name); + + Self { + track, + sequence: 0, + segment: None, + timescale, + } + } + + pub fn header(&mut self, raw: Vec, fragment: Fragment) -> anyhow::Result<()> { + if let Some(segment) = self.segment.as_mut() { + if !fragment.keyframe { + // Use the existing segment + segment.fragments.push(raw.into()); + return Ok(()); + } + } + + // Otherwise make a new segment + let now = time::Instant::now(); + + // Compute the timestamp in milliseconds. + // Overflows after 583 million years, so we're fine. + let _timestamp: i32 = fragment + .timestamp(self.timescale) + .as_millis() + .try_into() + .context("timestamp too large")?; + + // The send order is simple; newer timestamps should be higher priority. + // TODO give audio a boost? + // TODO Use timestamps for prioritization again after quinn priority bug fixed + let send_order = i32::MIN; + + // Delete segments after 10s. + let expires = Some(now + time::Duration::from_secs(10)); // TODO increase this once send order is implemented + let sequence = self.sequence.try_into().context("sequence too large")?; + + self.sequence += 1; + + // Create a new segment. + let segment = segment::Info { + sequence, + expires, + send_order, + }; + + let mut segment = segment::Publisher::new(segment); + self.track.push_segment(segment.subscribe()); + + // Insert the raw atom into the segment. + segment.fragments.push(raw.into()); + + // Save for the next iteration + self.segment = Some(segment); + + // Remove any segments older than 10s. + // TODO This can only drain from the FRONT of the queue, so don't get clever with expirations. + self.track.drain_segments(now); + + Ok(()) + } + + pub fn data(&mut self, raw: Vec) -> anyhow::Result<()> { + let segment = self.segment.as_mut().context("missing segment")?; + segment.fragments.push(raw.into()); + + Ok(()) + } + + pub fn subscribe(&self) -> track::Subscriber { + self.track.subscribe() + } +} + +struct Fragment { + // The track for this fragment. + track: u32, + + // The timestamp of the first sample in this fragment, in timescale units. + timestamp: u64, + + // True if this fragment is a keyframe. + keyframe: bool, +} + +impl Fragment { + fn new(moof: mp4::MoofBox) -> anyhow::Result { + // We can't split the mdat atom, so this is impossible to support + anyhow::ensure!(moof.trafs.len() == 1, "multiple tracks per moof atom"); + let track = moof.trafs[0].tfhd.track_id; + + // Parse the moof to get some timing information to sleep. + let timestamp = sample_timestamp(&moof).expect("couldn't find timestamp"); + + // Detect if we should start a new segment. + let keyframe = sample_keyframe(&moof); + + Ok(Self { + track, + timestamp, + keyframe, + }) + } + + // Convert from timescale units to a duration. + fn timestamp(&self, timescale: u64) -> time::Duration { + time::Duration::from_millis(1000 * self.timestamp / timescale) + } +} + +fn sample_timestamp(moof: &mp4::MoofBox) -> Option { + Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time) +} + +fn sample_keyframe(moof: &mp4::MoofBox) -> bool { + for traf in &moof.trafs { + // TODO trak default flags if this is None + let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default(); + let trun = match &traf.trun { + Some(t) => t, + None => return false, + }; + + for i in 0..trun.sample_count { + let mut flags = match trun.sample_flags.get(i as usize) { + Some(f) => *f, + None => default_flags, + }; + + if i == 0 && trun.first_sample_flags.is_some() { + flags = trun.first_sample_flags.unwrap(); + } + + // https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177 + let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther + let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample + + if keyframe && !non_sync { + return true; + } + } + } + + false +} + +// Find the timescale for the given track. +fn track_timescale(moov: &mp4::MoovBox, track_id: u32) -> u64 { + let trak = moov + .traks + .iter() + .find(|trak| trak.tkhd.track_id == track_id) + .expect("failed to find trak"); + + trak.mdia.mdhd.timescale as u64 +} + +pub trait Source { + fn subscribe(&self, name: &str) -> Option; +} + +#[derive(Clone, Default, Debug)] +pub struct MapSource(pub HashMap); + +impl Source for MapSource { + fn subscribe(&self, name: &str) -> Option { + self.0.get(name).cloned() + } +} diff --git a/moq-pub/src/media_runner.rs b/moq-pub/src/media_runner.rs new file mode 100644 index 0000000..b42ee26 --- /dev/null +++ b/moq-pub/src/media_runner.rs @@ -0,0 +1,151 @@ +use crate::media::{self, MapSource}; +use anyhow::bail; +use log::{debug, error}; +use moq_transport::message::Message; +use moq_transport::message::{Announce, SubscribeError}; +use moq_transport::{object, Object, VarInt}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::io::AsyncWriteExt; +use tokio::sync::broadcast; +use tokio::sync::mpsc; +use tokio::task::JoinSet; + +use webtransport_generic::Session as WTSession; + +pub struct MediaRunner { + send_objects: object::Sender, + outgoing_ctl_sender: mpsc::Sender, + incoming_ctl_receiver: broadcast::Receiver, + source: Arc, +} + +impl MediaRunner { + pub async fn new( + send_objects: object::Sender, + outgoing: mpsc::Sender, + incoming: (broadcast::Receiver, broadcast::Receiver), + ) -> anyhow::Result { + let outgoing_ctl_sender = outgoing; + let (incoming_ctl_receiver, _incoming_obj_receiver) = incoming; + Ok(Self { + send_objects, + outgoing_ctl_sender, + incoming_ctl_receiver, + source: Arc::new(MapSource::default()), + }) + } + pub async fn announce(&mut self, namespace: &str, source: Arc) -> anyhow::Result<()> { + debug!("media_runner.announce()"); + // Only allow one souce at a time for now? + self.source = source; + + // ANNOUNCE the namespace + self.outgoing_ctl_sender + .send(Message::Announce(Announce { + track_namespace: namespace.to_string(), + })) + .await?; + + // wait for the go ahead + loop { + match self.incoming_ctl_receiver.recv().await? { + Message::AnnounceOk(_) => { + break; + } + Message::AnnounceError(announce_error) => { + error!( + "Failed to announce namespace '{}' with error code '{}' and reason '{}'", + &namespace, &announce_error.code, &announce_error.reason + ); + // TODO: Think about how to recover here? Retry? + bail!("Failed to announce namespace"); + } + _ => { + // TODO: work out how to ignore unknown/unrelated messages here without consuming them prematurely + } + } + } + + Ok(()) + } + + pub async fn run(&mut self) -> anyhow::Result<()> { + debug!("media_runner.run()"); + let source = self.source.clone(); + let mut join_set: JoinSet> = tokio::task::JoinSet::new(); + let mut track_dispatcher: HashMap> = HashMap::new(); + let mut incoming_ctl_receiver = self.incoming_ctl_receiver.resubscribe(); + let outgoing_ctl_sender = self.outgoing_ctl_sender.clone(); + + // Pre-spawn tasks for each track we have + // and let them .await on receiving the go ahead via a channel + for (track_name, track) in source.0.iter() { + let (sender, mut receiver) = tokio::sync::mpsc::channel(1); + track_dispatcher.insert(track_name.to_string(), sender); + let mut objects = self.send_objects.clone(); + let mut track = track.clone(); + join_set.spawn(async move { + receiver.recv().await.ok_or(anyhow::anyhow!("channel closed"))?; + loop { + let mut segment = track.next_segment().await?; + + debug!("segment: {:?}", &segment); + let object = Object { + track: VarInt::from_u32(track.name.parse::()?), + group: segment.sequence, + sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object + send_order: segment.send_order, + }; + debug!("object: {:?}", &object); + + let mut stream = objects.open(object).await?; + + // Write each fragment as they are available. + while let Some(fragment) = segment.fragments.next().await { + stream.write_all(&fragment).await?; + } + } + }); + } + + join_set.spawn(async move { + loop { + if let Message::Subscribe(subscribe) = incoming_ctl_receiver.recv().await? { + debug!("Received a subscription request"); + + let track_id = subscribe.track_id; + debug!("Looking up track_id: {}", &track_id); + // Look up track in source + match source.0.get(&track_id.to_string()) { + None => { + // if track !exist, send subscribe error + outgoing_ctl_sender + .send(Message::SubscribeError(SubscribeError { + track_id: subscribe.track_id, + code: moq_transport::VarInt::from_u32(1), + reason: "Only bad reasons (don't know what that track is)".to_string(), + })) + .await?; + } + // if track exists, send go-ahead signal to unblock task to send data to subscriber + Some(track) => { + debug!("We have the track! (Good news everyone)"); + track_dispatcher + .get(&track.name) + .ok_or(anyhow::anyhow!("missing task for track"))? + .send(()) + .await?; + } + }; + } + } + }); + + while let Some(res) = join_set.join_next().await { + debug!("MediaRunner task finished with result: {:?}", &res); + } + + Ok(()) + } +} diff --git a/moq-pub/src/session_runner.rs b/moq-pub/src/session_runner.rs new file mode 100644 index 0000000..23d2953 --- /dev/null +++ b/moq-pub/src/session_runner.rs @@ -0,0 +1,127 @@ +use anyhow::Context; +use log::debug; +use moq_transport::{object, Object}; +use std::net; +use tokio::sync::broadcast; +use tokio::sync::mpsc; +use tokio::task::JoinSet; + +pub struct SessionRunner { + moq_transport_session: moq_transport::Session, + outgoing_ctl_sender: mpsc::Sender, + outgoing_ctl_receiver: mpsc::Receiver, + incoming_ctl_sender: broadcast::Sender, + incoming_obj_sender: broadcast::Sender, +} + +pub struct Config { + pub addr: net::SocketAddr, + pub uri: http::uri::Uri, +} + +impl SessionRunner { + pub async fn new(config: Config) -> anyhow::Result { + let mut roots = rustls::RootCertStore::empty(); + for cert in rustls_native_certs::load_native_certs().expect("could not load platform certs") { + roots.add(&rustls::Certificate(cert.0)).unwrap(); + } + + let mut tls_config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(roots) + .with_no_client_auth(); + + tls_config.alpn_protocols = vec![webtransport_quinn::ALPN.to_vec()]; // this one is important + + let arc_tls_config = std::sync::Arc::new(tls_config); + let quinn_client_config = quinn::ClientConfig::new(arc_tls_config); + + let mut endpoint = quinn::Endpoint::client(config.addr)?; + endpoint.set_default_client_config(quinn_client_config); + + let webtransport_session = webtransport_quinn::connect(&endpoint, &config.uri) + .await + .context("failed to create WebTransport session")?; + let moq_transport_session = + moq_transport::Session::connect(webtransport_session, moq_transport::setup::Role::Both) + .await + .context("failed to create MoQ Transport session")?; + + // outgoing ctl msgs + let (outgoing_ctl_sender, outgoing_ctl_receiver) = mpsc::channel(5); + // incoming ctl msg + let (incoming_ctl_sender, _incoming_ctl_receiver) = broadcast::channel(5); + // incoming objs + let (incoming_obj_sender, _incoming_obj_receiver) = broadcast::channel(5); + + Ok(SessionRunner { + moq_transport_session, + outgoing_ctl_sender, + outgoing_ctl_receiver, + incoming_ctl_sender, + incoming_obj_sender, + }) + } + pub async fn get_outgoing_senders(&self) -> mpsc::Sender { + self.outgoing_ctl_sender.clone() + } + pub async fn get_incoming_receivers( + &self, + ) -> ( + broadcast::Receiver, + broadcast::Receiver, + ) { + ( + self.incoming_ctl_sender.subscribe(), + self.incoming_obj_sender.subscribe(), + ) + } + pub async fn run(mut self) -> anyhow::Result<()> { + debug!("session_runner.run()"); + + let mut join_set: JoinSet> = tokio::task::JoinSet::new(); + + // Send outgoing control messages + join_set.spawn(async move { + loop { + let msg = self + .outgoing_ctl_receiver + .recv() + .await + .ok_or(anyhow::anyhow!("error receiving outbound control message"))?; + debug!("Sending outgoing MOQT Control Message: {:?}", &msg); + self.moq_transport_session.send_control.send(msg).await?; + } + }); + + // Route incoming Control messages + join_set.spawn(async move { + loop { + let msg = self.moq_transport_session.recv_control.recv().await?; + self.incoming_ctl_sender.send(msg)?; + } + }); + + // Route incoming Objects headers + // NOTE: Only sends the headers for incoming objects, not the associated streams + // We don't currently expose any way to read incoming bytestreams because we don't expect any + join_set.spawn(async move { + loop { + let receive_stream = self.moq_transport_session.recv_objects.recv().await?; + + self.incoming_obj_sender.send(receive_stream.0)?; + } + }); + + while let Some(res) = join_set.join_next().await { + debug!("SessionRunner task finished with result: {:?}", &res); + let _ = res?; // if we finish, it'll be with an error, which we can return + } + + Ok(()) + } + + pub async fn get_send_objects(&self) -> object::Sender { + self.moq_transport_session.send_objects.clone() + } +} diff --git a/moq-quinn/Cargo.toml b/moq-quinn/Cargo.toml index 815adbb..ac4b500 100644 --- a/moq-quinn/Cargo.toml +++ b/moq-quinn/Cargo.toml @@ -11,6 +11,8 @@ edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] +default-run = "moq-quinn" + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html