Remove the file source, since it doesn't loop. (#47)

ffmpeg -> moq-warp coming soon
This commit is contained in:
kixelated 2023-07-21 21:03:09 -07:00 committed by GitHub
parent e99ecee40a
commit 891d6b167b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 21 additions and 547 deletions

73
Cargo.lock generated
View File

@ -944,7 +944,6 @@ name = "moq-transport"
version = "0.1.0"
dependencies = [
"bytes",
"log",
"thiserror",
]
@ -971,7 +970,6 @@ dependencies = [
"log",
"moq-transport",
"moq-transport-quinn",
"mp4",
"quinn",
"ring",
"rustls 0.21.2",
@ -979,20 +977,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "mp4"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "509348cba250e7b852a875100a2ddce7a36ee3abf881a681c756670c1774264d"
dependencies = [
"byteorder",
"bytes",
"num-rational",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "multer"
version = "2.1.0"
@ -1011,49 +995,6 @@ dependencies = [
"version_check",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-integer"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-rational"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0"
dependencies = [
"autocfg",
"num-bigint",
"num-integer",
"num-traits",
"serde",
]
[[package]]
name = "num-traits"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.15.0"
@ -1457,20 +1398,6 @@ name = "serde"
version = "1.0.164"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.164"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"

View File

@ -8,18 +8,6 @@ It requires a client, such as [moq-js](https://github.com/kixelated/moq-js).
## Setup
### Media
This demo simulates a live stream by reading a file from disk and sleeping based on media timestamps. Obviously you should hook this up to a real live stream to do anything useful.
Download your favorite media file and convert it to fragmented MP4.
This requires [ffmpeg](https://ffmpeg.org/)
```
wget http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4 -O media/source.mp4
./media/generate
```
### Certificates
Unfortunately, QUIC mandates TLS and makes local development difficult.
@ -50,9 +38,7 @@ Use a [MoQ client](https://github.com/kixelated/moq-js) to connect to the server
## License
Licensed under either of
Licensed under either:
* Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
at your option.
- Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)

1
media/.gitignore vendored
View File

@ -1 +0,0 @@
*.mp4

View File

@ -1,12 +0,0 @@
#!/bin/bash
cd "$(dirname "$0")"
# empty_moov: Uses moof fragments instead of one giant moov/mdat pair.
# frag_every_frame: Creates a moof for each frame.
# separate_moof: Splits audio and video into separate moof flags.
# omit_tfhd_offset: Removes absolute byte offsets so we can fragment.
ffmpeg -i source.mp4 -y \
-c copy \
-movflags empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset \
fragmented.mp4 2>&1

View File

@ -5,8 +5,6 @@ use clap::Parser;
use ring::digest::{digest, SHA256};
use warp::Filter;
use moq_warp::{relay, source};
mod server;
use server::*;
@ -24,10 +22,6 @@ struct Cli {
/// Use the private key at this path
#[arg(short, long, default_value = "cert/localhost.key")]
key: path::PathBuf,
/// Use the media file at this path
#[arg(short, long, default_value = "media/fragmented.mp4")]
media: path::PathBuf,
}
#[tokio::main]
@ -39,20 +33,11 @@ async fn main() -> anyhow::Result<()> {
// Create a web server to serve the fingerprint
let serve = serve_http(args.clone());
// Create a fake media source from disk.
let media = source::File::new(args.media).context("failed to open file source")?;
let broker = relay::broker::Broadcasts::new();
broker
.announce("quic.video/demo", media.source())
.context("failed to announce file source")?;
// Create a server to actually serve the media
let config = ServerConfig {
addr: args.addr,
cert: args.cert,
key: args.key,
broker,
};
let server = Server::new(config).context("failed to create server")?;
@ -60,7 +45,6 @@ async fn main() -> anyhow::Result<()> {
// Run all of the above
tokio::select! {
res = server.run() => res.context("failed to run server"),
res = media.run() => res.context("failed to run media source"),
res = serve => res.context("failed to run HTTP server"),
}
}

View File

@ -20,7 +20,6 @@ pub struct ServerConfig {
pub addr: net::SocketAddr,
pub cert: path::PathBuf,
pub key: path::PathBuf,
pub broker: broker::Broadcasts,
}
impl Server {
@ -63,7 +62,7 @@ impl Server {
server_config.transport = sync::Arc::new(transport_config);
let server = quinn::Endpoint::server(server_config, config.addr)?;
let broker = config.broker;
let broker = broker::Broadcasts::new();
let conns = JoinSet::new();

View File

@ -19,7 +19,7 @@ moq-transport = { path = "../moq-transport" }
quinn = "0.10"
http = "0.2"
webtransport-quinn = "0.4.2"
tokio = { version = "1.27", features = ["macros"] }
tokio = { version = "1.27", features = ["macros", "io-util"] }
bytes = "1"
log = "0.4"
anyhow = "1.0.70"

View File

@ -1,15 +1,15 @@
[package]
name = "moq-transport"
description = "Media over QUIC"
authors = [ "Luke Curley" ]
authors = ["Luke Curley"]
repository = "https://github.com/kixelated/moq-rs"
license = "MIT OR Apache-2.0"
version = "0.1.0"
edition = "2021"
keywords = [ "quic", "http3", "webtransport", "media", "live" ]
categories = [ "multimedia", "network-programming", "web-programming" ]
keywords = ["quic", "http3", "webtransport", "media", "live"]
categories = ["multimedia", "network-programming", "web-programming"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -17,4 +17,3 @@ categories = [ "multimedia", "network-programming", "web-programming" ]
[dependencies]
bytes = "1"
thiserror = "1.0.21"
log = "0.4"

View File

@ -19,7 +19,6 @@ moq-transport = { path = "../moq-transport" }
moq-transport-quinn = { path = "../moq-transport-quinn" }
tokio = "1.27"
mp4 = "0.13.0"
anyhow = "1.0.70"
log = "0.4" # TODO remove

View File

@ -1,3 +1,2 @@
pub mod model;
pub mod relay;
pub mod source;

View File

@ -1,7 +1,7 @@
use super::watch;
use std::sync::Arc;
// Use Arc to avoid cloning the entire MP4 data for each subscriber.
// Use Arc to avoid cloning the data for each subscriber.
pub type Shared = Arc<Vec<u8>>;
// TODO combine fragments into the same buffer, instead of separate buffers.

View File

@ -1,5 +1,5 @@
use crate::model::{broadcast, track, watch};
use crate::source::Source;
use crate::relay::contribute;
use std::collections::hash_map::HashMap;
use std::sync::{Arc, Mutex};
@ -15,7 +15,7 @@ pub struct Broadcasts {
#[derive(Default)]
struct BroadcastsInner {
// TODO Automatically reclaim dropped sources.
lookup: HashMap<String, Arc<dyn Source + Send + Sync>>,
lookup: HashMap<String, Arc<contribute::Broadcast>>,
updates: watch::Publisher<Update>,
}
@ -47,7 +47,7 @@ impl Broadcasts {
(keys, updates)
}
pub fn announce(&self, namespace: &str, source: Arc<dyn Source + Send + Sync>) -> anyhow::Result<()> {
pub fn announce(&self, namespace: &str, source: Arc<contribute::Broadcast>) -> anyhow::Result<()> {
let mut this = self.inner.lock().unwrap();
if let Some(_existing) = this.lookup.get(namespace) {
@ -71,7 +71,6 @@ impl Broadcasts {
pub fn subscribe(&self, namespace: &str, name: &str) -> Option<track::Subscriber> {
let this = self.inner.lock().unwrap();
this.lookup.get(namespace).and_then(|v| v.subscribe(name))
}
}

View File

@ -13,7 +13,6 @@ use anyhow::Context;
use super::{broker, control};
use crate::model::{broadcast, segment, track};
use crate::source::Source;
// TODO experiment with making this Clone, so every task can have its own copy.
pub struct Session {
@ -215,10 +214,8 @@ impl Broadcast {
queue: publishers.sender.clone(),
}
}
}
impl Source for Broadcast {
fn subscribe(&self, name: &str) -> Option<track::Subscriber> {
pub fn subscribe(&self, name: &str) -> Option<track::Subscriber> {
let mut subscriptions = self.subscriptions.lock().unwrap();
// Check if there's an existing subscription.
@ -255,8 +252,8 @@ pub struct Publishers {
sender: mpsc::UnboundedSender<(String, track::Publisher)>,
}
impl Publishers {
pub fn new() -> Self {
impl Default for Publishers {
fn default() -> Self {
let (sender, receiver) = mpsc::unbounded_channel();
Self {
@ -266,6 +263,12 @@ impl Publishers {
receiver,
}
}
}
impl Publishers {
pub fn new() -> Self {
Self::default()
}
pub fn push_segment(&mut self, id: VarInt, segment: segment::Subscriber) -> anyhow::Result<()> {
let track = self.tracks.get_mut(&id).context("no track with that ID")?;

View File

@ -1,388 +0,0 @@
use std::io::Read;
use std::{fs, io, path, time};
use mp4::ReadBox;
use anyhow::Context;
use std::collections::HashMap;
use std::sync::Arc;
use moq_transport::VarInt;
use super::MapSource;
use crate::model::{segment, track};
pub struct File {
// We read the file once, in order, and don't seek backwards.
reader: io::BufReader<fs::File>,
// The catalog for the broadcast, held just so it's closed only when the broadcast is over.
_catalog: track::Publisher,
// The tracks we're producing.
tracks: HashMap<String, Track>,
// A subscribable source.
source: Arc<MapSource>,
}
impl File {
pub fn new(path: path::PathBuf) -> anyhow::Result<Self> {
let f = fs::File::open(path)?;
let mut reader = io::BufReader::new(f);
let ftyp = read_atom(&mut reader)?;
anyhow::ensure!(&ftyp[4..8] == b"ftyp", "expected ftyp atom");
let moov = read_atom(&mut reader)?;
anyhow::ensure!(&moov[4..8] == b"moov", "expected moov atom");
let mut init = ftyp;
init.extend(&moov);
// We're going to parse the moov box.
// We have to read the moov box header to correctly advance the cursor for the mp4 crate.
let mut moov_reader = io::Cursor::new(&moov);
let moov_header = mp4::BoxHeader::read(&mut moov_reader)?;
// Parse the moov box so we can detect the timescales for each track.
let moov = mp4::MoovBox::read_box(&mut moov_reader, moov_header.size)?;
// Create a source that can be subscribed to.
let mut source = HashMap::default();
// Create the catalog track
let (_catalog, subscriber) = Self::create_catalog(init);
source.insert("0".to_string(), subscriber);
let mut tracks = HashMap::new();
for trak in &moov.traks {
let id = trak.tkhd.track_id;
let name = id.to_string();
let timescale = track_timescale(&moov, id);
// Store the track publisher in a map so we can update it later.
let track = Track::new(&name, timescale);
source.insert(name.to_string(), track.subscribe());
tracks.insert(name, track);
}
let source = Arc::new(MapSource(source));
Ok(Self {
reader,
_catalog,
tracks,
source,
})
}
fn create_catalog(raw: Vec<u8>) -> (track::Publisher, track::Subscriber) {
// Create a track with a single segment containing the init data.
let mut catalog = track::Publisher::new("0");
// Subscribe to the catalog before we push the segment.
let subscriber = catalog.subscribe();
let mut segment = segment::Publisher::new(segment::Info {
sequence: VarInt::from_u32(0), // first and only segment
send_order: VarInt::from_u32(0), // highest priority
expires: None, // never delete from the cache
});
// Add the segment and add the fragment.
catalog.push_segment(segment.subscribe());
segment.fragments.push(raw.into());
// Return the catalog
(catalog, subscriber)
}
pub async fn run(mut self) -> anyhow::Result<()> {
// The timestamp when the broadcast "started", so we can sleep to simulate a live stream.
let start = tokio::time::Instant::now();
// The current track name
let mut track_name = None;
loop {
let atom = read_atom(&mut self.reader)?;
let mut reader = io::Cursor::new(&atom);
let header = mp4::BoxHeader::read(&mut reader)?;
match header.name {
mp4::BoxType::MoofBox => {
let moof = mp4::MoofBox::read_box(&mut reader, header.size).context("failed to read MP4")?;
// Process the moof.
let fragment = Fragment::new(moof)?;
let name = fragment.track.to_string();
// Get the track for this moof.
let track = self.tracks.get_mut(&name).context("failed to find track")?;
// Sleep until we should publish this sample.
let timestamp = time::Duration::from_millis(1000 * fragment.timestamp / track.timescale);
tokio::time::sleep_until(start + timestamp).await;
// Save the track ID for the next iteration, which must be a mdat.
anyhow::ensure!(track_name.is_none(), "multiple moof atoms");
track_name.replace(name);
// Publish the moof header, creating a new segment if it's a keyframe.
track.header(atom, fragment).context("failed to publish moof")?;
}
mp4::BoxType::MdatBox => {
// Get the track ID from the previous moof.
let name = track_name.take().context("missing moof")?;
let track = self.tracks.get_mut(&name).context("failed to find track")?;
// Publish the mdat atom.
track.data(atom).context("failed to publish mdat")?;
}
_ => {
// Skip unknown atoms
}
}
}
}
pub fn source(&self) -> Arc<MapSource> {
self.source.clone()
}
}
struct Track {
// The track we're producing
track: track::Publisher,
// The current segment
segment: Option<segment::Publisher>,
// The number of units per second.
timescale: u64,
// The number of segments produced.
sequence: u64,
}
impl Track {
fn new(name: &str, timescale: u64) -> Self {
let track = track::Publisher::new(name);
Self {
track,
sequence: 0,
segment: None,
timescale,
}
}
pub fn header(&mut self, raw: Vec<u8>, fragment: Fragment) -> anyhow::Result<()> {
if let Some(segment) = self.segment.as_mut() {
if !fragment.keyframe {
// Use the existing segment
segment.fragments.push(raw.into());
return Ok(());
}
}
// Otherwise make a new segment
let now = time::Instant::now();
// Compute the timestamp in milliseconds.
// Overflows after 583 million years, so we're fine.
let timestamp = fragment
.timestamp(self.timescale)
.as_millis()
.try_into()
.context("timestamp too large")?;
// The send order is simple; newer timestamps are higher priority.
// TODO give audio a boost?
let send_order = VarInt::MAX
.into_inner()
.checked_sub(timestamp)
.context("timestamp too large")?
.try_into()
.unwrap();
// Delete segments after 10s.
let expires = Some(now + time::Duration::from_secs(2)); // TODO increase this once send order is implemented
let sequence = self.sequence.try_into().context("sequence too large")?;
self.sequence += 1;
// Create a new segment.
let segment = segment::Info {
sequence,
expires,
send_order,
};
let mut segment = segment::Publisher::new(segment);
self.track.push_segment(segment.subscribe());
// Insert the raw atom into the segment.
segment.fragments.push(raw.into());
// Save for the next iteration
self.segment = Some(segment);
// Remove any segments older than 10s.
// TODO This can only drain from the FRONT of the queue, so don't get clever with expirations.
self.track.drain_segments(now);
Ok(())
}
pub fn data(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
let segment = self.segment.as_mut().context("missing segment")?;
segment.fragments.push(raw.into());
Ok(())
}
pub fn subscribe(&self) -> track::Subscriber {
self.track.subscribe()
}
}
struct Fragment {
// The track for this fragment.
track: u32,
// The timestamp of the first sample in this fragment, in timescale units.
timestamp: u64,
// True if this fragment is a keyframe.
keyframe: bool,
}
impl Fragment {
fn new(moof: mp4::MoofBox) -> anyhow::Result<Self> {
// We can't split the mdat atom, so this is impossible to support
anyhow::ensure!(moof.trafs.len() == 1, "multiple tracks per moof atom");
let track = moof.trafs[0].tfhd.track_id;
// Parse the moof to get some timing information to sleep.
let timestamp = sample_timestamp(&moof).expect("couldn't find timestamp");
// Detect if we should start a new segment.
let keyframe = sample_keyframe(&moof);
Ok(Self {
track,
timestamp,
keyframe,
})
}
// Convert from timescale units to a duration.
fn timestamp(&self, timescale: u64) -> time::Duration {
time::Duration::from_millis(1000 * self.timestamp / timescale)
}
}
// Read a full MP4 atom into a vector.
fn read_atom<R: Read>(reader: &mut R) -> anyhow::Result<Vec<u8>> {
// Read the 8 bytes for the size + type
let mut buf = [0u8; 8];
reader.read_exact(&mut buf)?;
// Convert the first 4 bytes into the size.
let size = u32::from_be_bytes(buf[0..4].try_into()?) as u64;
//let typ = &buf[4..8].try_into().ok().unwrap();
let mut raw = buf.to_vec();
let mut limit = match size {
// Runs until the end of the file.
0 => reader.take(u64::MAX),
// The next 8 bytes are the extended size to be used instead.
1 => {
reader.read_exact(&mut buf)?;
let size_large = u64::from_be_bytes(buf);
anyhow::ensure!(size_large >= 16, "impossible extended box size: {}", size_large);
reader.take(size_large - 16)
}
2..=7 => {
anyhow::bail!("impossible box size: {}", size)
}
// Otherwise read based on the size.
size => reader.take(size - 8),
};
// Append to the vector and return it.
limit.read_to_end(&mut raw)?;
Ok(raw)
}
// Find the timescale for the given track.
fn track_timescale(moov: &mp4::MoovBox, track_id: u32) -> u64 {
let trak = moov
.traks
.iter()
.find(|trak| trak.tkhd.track_id == track_id)
.expect("failed to find trak");
trak.mdia.mdhd.timescale as u64
}
fn sample_keyframe(moof: &mp4::MoofBox) -> bool {
for traf in &moof.trafs {
// TODO trak default flags if this is None
let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default();
let trun = match &traf.trun {
Some(t) => t,
None => return false,
};
for i in 0..trun.sample_count {
let mut flags = match trun.sample_flags.get(i as usize) {
Some(f) => *f,
None => default_flags,
};
if i == 0 && trun.first_sample_flags.is_some() {
flags = trun.first_sample_flags.unwrap();
}
// https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177
let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther
let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample
if keyframe && !non_sync {
return true;
}
}
}
false
}
fn sample_timestamp(moof: &mp4::MoofBox) -> Option<u64> {
Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time)
}
/*
fn track_type(moov: &mp4::MoovBox, track_id: u32) -> mp4::TrackType {
let trak = moov
.traks
.iter()
.find(|trak| trak.tkhd.track_id == track_id)
.expect("failed to find trak");
mp4::TrackType::try_from(&trak.mdia.hdlr.handler_type).expect("unknown track type")
}
*/

View File

@ -1,20 +0,0 @@
mod file;
pub use file::*;
use crate::model::track;
use std::collections::HashMap;
// TODO move to model::Broadcast?
pub trait Source {
fn subscribe(&self, name: &str) -> Option<track::Subscriber>;
}
#[derive(Clone, Default)]
pub struct MapSource(pub HashMap<String, track::Subscriber>);
impl Source for MapSource {
fn subscribe(&self, name: &str) -> Option<track::Subscriber> {
self.0.get(name).cloned()
}
}