From fc958e11ae62d79d94576d16559b53f3806e126f Mon Sep 17 00:00:00 2001 From: kixelated Date: Wed, 24 May 2023 12:55:36 -0700 Subject: [PATCH] Split audio into 1s streams (#19) --- server/src/media/source.rs | 63 ++++++++++----- server/src/session/mod.rs | 60 ++++++++++----- web/src/mp4/index.ts | 2 - web/src/mp4/init.ts | 43 ----------- web/src/player/decoder.ts | 153 ++++++++++++++++++++++++------------- 5 files changed, 181 insertions(+), 140 deletions(-) delete mode 100644 web/src/mp4/init.ts diff --git a/server/src/media/source.rs b/server/src/media/source.rs index 660647e..9a87c0a 100644 --- a/server/src/media/source.rs +++ b/server/src/media/source.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, VecDeque}; +use std::collections::VecDeque; use std::io::Read; use std::{fs, io, time}; @@ -17,8 +17,8 @@ pub struct Source { // The initialization payload; ftyp + moov boxes. pub init: Vec, - // The timescale used for each track. - timescales: HashMap, + // The parsed moov box. + moov: mp4::MoovBox, // Any fragments parsed and ready to be returned by next(). fragments: VecDeque, @@ -34,7 +34,10 @@ pub struct Fragment { // Whether this fragment is a keyframe. pub keyframe: bool, - // The timestamp of the fragment, in milliseconds, to simulate a live stream. + // The number of samples that make up a second (ex. ms = 1000) + pub timescale: u64, + + // The timestamp of the fragment, in timescale units, to simulate a live stream. pub timestamp: u64, } @@ -65,7 +68,7 @@ impl Source { reader, start, init, - timescales: timescales(&moov), + moov, fragments: VecDeque::new(), }) } @@ -101,11 +104,20 @@ impl Source { anyhow::bail!("multiple tracks per moof atom") } + let track_id = moof.trafs[0].tfhd.track_id; + let timestamp = sample_timestamp(&moof).expect("couldn't find timestamp"); + + // Detect if this is a keyframe. + let keyframe = sample_keyframe(&moof); + + let timescale = track_timescale(&self.moov, track_id); + self.fragments.push_back(Fragment { - track_id: moof.trafs[0].tfhd.track_id, + track_id, data: atom, - keyframe: has_keyframe(&moof), - timestamp: first_timestamp(&moof).expect("couldn't find timestamp"), + keyframe, + timescale, + timestamp, }) } mp4::BoxType::MdatBox => { @@ -115,6 +127,7 @@ impl Source { track_id: moof.track_id, data: atom, keyframe: false, + timescale: moof.timescale, timestamp: moof.timestamp, }); @@ -131,12 +144,8 @@ impl Source { // Simulate a live stream by sleeping until the next timestamp in the media. pub fn timeout(&self) -> Option { let next = self.fragments.front()?; - let timestamp = next.timestamp; - // Find the timescale for the track. - let timescale = self.timescales.get(&next.track_id).unwrap(); - - let delay = time::Duration::from_millis(1000 * timestamp / *timescale as u64); + let delay = time::Duration::from_millis(1000 * next.timestamp / next.timescale); let elapsed = self.start.elapsed(); delay.checked_sub(elapsed) @@ -182,7 +191,18 @@ pub fn read_atom(reader: &mut R) -> anyhow::Result> { Ok(raw) } -fn has_keyframe(moof: &mp4::MoofBox) -> bool { +// Find the timescale for the given track. +fn track_timescale(moov: &mp4::MoovBox, track_id: u32) -> u64 { + let trak = moov + .traks + .iter() + .find(|trak| trak.tkhd.track_id == track_id) + .expect("failed to find trak"); + + trak.mdia.mdhd.timescale as u64 +} + +fn sample_keyframe(moof: &mp4::MoofBox) -> bool { for traf in &moof.trafs { // TODO trak default flags if this is None let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default(); @@ -214,13 +234,18 @@ fn has_keyframe(moof: &mp4::MoofBox) -> bool { false } -fn first_timestamp(moof: &mp4::MoofBox) -> Option { +fn sample_timestamp(moof: &mp4::MoofBox) -> Option { Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time) } -fn timescales(moov: &mp4::MoovBox) -> HashMap { - moov.traks +/* +fn track_type(moov: &mp4::MoovBox, track_id: u32) -> mp4::TrackType { + let trak = moov + .traks .iter() - .map(|trak| (trak.tkhd.track_id, trak.mdia.mdhd.timescale)) - .collect() + .find(|trak| trak.tkhd.track_id == track_id) + .expect("failed to find trak"); + + mp4::TrackType::try_from(&trak.mdia.hdlr.handler_type).expect("unknown track type") } +*/ diff --git a/server/src/session/mod.rs b/server/src/session/mod.rs index 2a28200..1282b92 100644 --- a/server/src/session/mod.rs +++ b/server/src/session/mod.rs @@ -10,9 +10,25 @@ use crate::{media, transport}; #[derive(Default)] pub struct Session { + // The media source, configured on CONNECT. media: Option, - streams: transport::Streams, // An easy way of buffering stream data. - tracks: hmap::HashMap, // map from track_id to current stream_id + + // A helper for automatically buffering stream data. + streams: transport::Streams, + + // Map from track_id to the the Track state. + tracks: hmap::HashMap, +} + +pub struct Track { + // Current stream_id + stream_id: Option, + + // The timescale used for this track. + timescale: u64, + + // The timestamp of the last keyframe. + keyframe: u64, } impl transport::App for Session { @@ -95,25 +111,27 @@ impl Session { None => return Ok(()), }; - let stream_id = match self.tracks.get(&fragment.track_id) { - // Close the old stream. - Some(stream_id) if fragment.keyframe => { - self.streams.send(conn, *stream_id, &[], true)?; - None + // Get the track state or insert a new entry. + let track = self.tracks.entry(fragment.track_id).or_insert_with(|| Track { + stream_id: None, + timescale: fragment.timescale, + keyframe: 0, + }); + + if let Some(stream_id) = track.stream_id { + // Existing stream, check if we should close it. + if fragment.keyframe && fragment.timestamp >= track.keyframe + track.timescale { + // Close the existing stream + self.streams.send(conn, stream_id, &[], true)?; + + // Unset the stream id so we create a new one. + track.stream_id = None; + track.keyframe = fragment.timestamp; } + } - // Use the existing stream - Some(stream_id) => Some(*stream_id), - - // No existing stream. - _ => None, - }; - - let stream_id = match stream_id { - // Use the existing stream, + let stream_id = match track.stream_id { Some(stream_id) => stream_id, - - // Open a new stream. None => { // Create a new unidirectional stream. let stream_id = session.open_stream(conn, false)?; @@ -134,9 +152,6 @@ impl Session { let data = message.serialize()?; self.streams.send(conn, stream_id, &data, false)?; - // Keep a mapping from the track id to the current stream id. - self.tracks.insert(fragment.track_id, stream_id); - stream_id } }; @@ -145,6 +160,9 @@ impl Session { let data = fragment.data.as_slice(); self.streams.send(conn, stream_id, data, false)?; + // Save the stream_id for the next fragment. + track.stream_id = Some(stream_id); + Ok(()) } } diff --git a/web/src/mp4/index.ts b/web/src/mp4/index.ts index 8c4cd6f..f520a0e 100644 --- a/web/src/mp4/index.ts +++ b/web/src/mp4/index.ts @@ -12,5 +12,3 @@ export { ISOFile, Sample, } from "mp4box" - -export { Init, InitParser } from "./init" diff --git a/web/src/mp4/init.ts b/web/src/mp4/init.ts deleted file mode 100644 index 5ff2802..0000000 --- a/web/src/mp4/init.ts +++ /dev/null @@ -1,43 +0,0 @@ -import * as MP4 from "./index" - -export interface Init { - raw: MP4.ArrayBuffer - info: MP4.Info -} - -export class InitParser { - mp4box: MP4.File - offset: number - - raw: MP4.ArrayBuffer[] - info: Promise - - constructor() { - this.mp4box = MP4.New() - this.raw = [] - this.offset = 0 - - // Create a promise that gets resolved once the init segment has been parsed. - this.info = new Promise((resolve, reject) => { - this.mp4box.onError = reject - this.mp4box.onReady = resolve - }) - } - - push(data: Uint8Array) { - // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately - const box = new Uint8Array(data.byteLength) - box.set(data) - - // and for some reason we need to modify the underlying ArrayBuffer with fileStart - const buffer = box.buffer as MP4.ArrayBuffer - buffer.fileStart = this.offset - - // Parse the data - this.offset = this.mp4box.appendBuffer(buffer) - this.mp4box.flush() - - // Add the box to our queue of chunks - this.raw.push(buffer) - } -} diff --git a/web/src/player/decoder.ts b/web/src/player/decoder.ts index bec4bad..889cfaa 100644 --- a/web/src/player/decoder.ts +++ b/web/src/player/decoder.ts @@ -3,33 +3,48 @@ import * as MP4 from "../mp4" import * as Stream from "../stream" import Renderer from "./renderer" +import { Deferred } from "../util" export default class Decoder { - init: MP4.InitParser decoders: Map renderer: Renderer + init: Deferred + constructor(renderer: Renderer) { - this.init = new MP4.InitParser() + this.init = new Deferred() this.decoders = new Map() this.renderer = renderer } async receiveInit(msg: Message.Init) { + const init = new Array() + let offset = 0 + const stream = new Stream.Reader(msg.reader, msg.buffer) for (;;) { const data = await stream.read() if (!data) break - this.init.push(data) + // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately + const box = new Uint8Array(data.byteLength) + box.set(data) + + // and for some reason we need to modify the underlying ArrayBuffer with fileStart + const buffer = box.buffer as MP4.ArrayBuffer + buffer.fileStart = offset + + // Add the box to our queue of chunks + init.push(buffer) + + offset += data.byteLength } - // TODO make sure the init segment is fully received + this.init.resolve(init) } async receiveSegment(msg: Message.Segment) { // Wait for the init segment to be fully received and parsed - await this.init.info const input = MP4.New() input.onSamples = this.onSamples.bind(this) @@ -42,11 +57,12 @@ export default class Decoder { input.start() } - // MP4box requires us to reparse the init segment unfortunately + // MP4box requires us to parse the init segment for each segment unfortunately + // TODO If this sees production usage, I would recommend caching this somehow. let offset = 0 - for (const raw of this.init.raw) { - raw.fileStart = offset + const init = await this.init.promise + for (const raw of init) { offset = input.appendBuffer(raw) } @@ -74,54 +90,20 @@ export default class Decoder { } } - onSamples(track_id: number, track: MP4.Track, samples: MP4.Sample[]) { - let decoder = this.decoders.get(track_id) + onSamples(_track_id: number, track: MP4.Track, samples: MP4.Sample[]) { + if (!track.track_width) { + // TODO ignoring audio to debug + return + } - if (!decoder) { + let decoder + if (isVideoTrack(track)) { // We need a sample to initalize the video decoder, because of mp4box limitations. - const sample = samples[0] - - if (isVideoTrack(track)) { - // Configure the decoder using the AVC box for H.264 - // TODO it should be easy to support other codecs, just need to know the right boxes. - const avcc = sample.description.avcC - if (!avcc) throw new Error("TODO only h264 is supported") - - const description = new MP4.Stream(new Uint8Array(avcc.size), 0, false) - avcc.write(description) - - const videoDecoder = new VideoDecoder({ - output: this.renderer.push.bind(this.renderer), - error: console.warn, - }) - - videoDecoder.configure({ - codec: track.codec, - codedHeight: track.video.height, - codedWidth: track.video.width, - description: description.buffer?.slice(8), - // optimizeForLatency: true - }) - - decoder = videoDecoder - } else if (isAudioTrack(track)) { - const audioDecoder = new AudioDecoder({ - output: this.renderer.push.bind(this.renderer), - error: console.warn, - }) - - audioDecoder.configure({ - codec: track.codec, - numberOfChannels: track.audio.channel_count, - sampleRate: track.audio.sample_rate, - }) - - decoder = audioDecoder - } else { - throw new Error("unknown track type") - } - - this.decoders.set(track_id, decoder) + decoder = this.videoDecoder(track, samples[0]) + } else if (isAudioTrack(track)) { + decoder = this.audioDecoder(track) + } else { + throw new Error("unknown track type") } for (const sample of samples) { @@ -129,7 +111,9 @@ export default class Decoder { const timestamp = (1000 * 1000 * sample.dts) / sample.timescale const duration = (1000 * 1000 * sample.duration) / sample.timescale - if (isAudioDecoder(decoder)) { + if (!decoder) { + throw new Error("decoder not initialized") + } else if (isAudioDecoder(decoder)) { decoder.decode( new EncodedAudioChunk({ type: sample.is_sync ? "key" : "delta", @@ -152,6 +136,65 @@ export default class Decoder { } } } + + audioDecoder(track: MP4.AudioTrack): AudioDecoder { + // Reuse the audio decoder when possible to avoid glitches. + // TODO detect when the codec changes and make a new decoder. + const decoder = this.decoders.get(track.id) + if (decoder && isAudioDecoder(decoder)) { + return decoder + } + + const audioDecoder = new AudioDecoder({ + output: this.renderer.push.bind(this.renderer), + error: console.error, + }) + + audioDecoder.configure({ + codec: track.codec, + numberOfChannels: track.audio.channel_count, + sampleRate: track.audio.sample_rate, + }) + + this.decoders.set(track.id, audioDecoder) + + return audioDecoder + } + + videoDecoder(track: MP4.VideoTrack, sample: MP4.Sample): VideoDecoder { + // Make a new video decoder for each keyframe. + if (!sample.is_sync) { + const decoder = this.decoders.get(track.id) + if (decoder && isVideoDecoder(decoder)) { + return decoder + } + } + + // Configure the decoder using the AVC box for H.264 + // TODO it should be easy to support other codecs, just need to know the right boxes. + const avcc = sample.description.avcC + if (!avcc) throw new Error("TODO only h264 is supported") + + const description = new MP4.Stream(new Uint8Array(avcc.size), 0, false) + avcc.write(description) + + const videoDecoder = new VideoDecoder({ + output: this.renderer.push.bind(this.renderer), + error: console.error, + }) + + videoDecoder.configure({ + codec: track.codec, + codedHeight: track.video.height, + codedWidth: track.video.width, + description: description.buffer?.slice(8), + // optimizeForLatency: true + }) + + this.decoders.set(track.id, videoDecoder) + + return videoDecoder + } } function isAudioDecoder(decoder: AudioDecoder | VideoDecoder): decoder is AudioDecoder {