Split audio into 1s streams (#19)

This commit is contained in:
kixelated 2023-05-24 12:55:36 -07:00 committed by GitHub
parent 05fccc2ae8
commit fc958e11ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 181 additions and 140 deletions

View File

@ -1,4 +1,4 @@
use std::collections::{HashMap, VecDeque}; use std::collections::VecDeque;
use std::io::Read; use std::io::Read;
use std::{fs, io, time}; use std::{fs, io, time};
@ -17,8 +17,8 @@ pub struct Source {
// The initialization payload; ftyp + moov boxes. // The initialization payload; ftyp + moov boxes.
pub init: Vec<u8>, pub init: Vec<u8>,
// The timescale used for each track. // The parsed moov box.
timescales: HashMap<u32, u32>, moov: mp4::MoovBox,
// Any fragments parsed and ready to be returned by next(). // Any fragments parsed and ready to be returned by next().
fragments: VecDeque<Fragment>, fragments: VecDeque<Fragment>,
@ -34,7 +34,10 @@ pub struct Fragment {
// Whether this fragment is a keyframe. // Whether this fragment is a keyframe.
pub keyframe: bool, pub keyframe: bool,
// The timestamp of the fragment, in milliseconds, to simulate a live stream. // The number of samples that make up a second (ex. ms = 1000)
pub timescale: u64,
// The timestamp of the fragment, in timescale units, to simulate a live stream.
pub timestamp: u64, pub timestamp: u64,
} }
@ -65,7 +68,7 @@ impl Source {
reader, reader,
start, start,
init, init,
timescales: timescales(&moov), moov,
fragments: VecDeque::new(), fragments: VecDeque::new(),
}) })
} }
@ -101,11 +104,20 @@ impl Source {
anyhow::bail!("multiple tracks per moof atom") anyhow::bail!("multiple tracks per moof atom")
} }
let track_id = moof.trafs[0].tfhd.track_id;
let timestamp = sample_timestamp(&moof).expect("couldn't find timestamp");
// Detect if this is a keyframe.
let keyframe = sample_keyframe(&moof);
let timescale = track_timescale(&self.moov, track_id);
self.fragments.push_back(Fragment { self.fragments.push_back(Fragment {
track_id: moof.trafs[0].tfhd.track_id, track_id,
data: atom, data: atom,
keyframe: has_keyframe(&moof), keyframe,
timestamp: first_timestamp(&moof).expect("couldn't find timestamp"), timescale,
timestamp,
}) })
} }
mp4::BoxType::MdatBox => { mp4::BoxType::MdatBox => {
@ -115,6 +127,7 @@ impl Source {
track_id: moof.track_id, track_id: moof.track_id,
data: atom, data: atom,
keyframe: false, keyframe: false,
timescale: moof.timescale,
timestamp: moof.timestamp, timestamp: moof.timestamp,
}); });
@ -131,12 +144,8 @@ impl Source {
// Simulate a live stream by sleeping until the next timestamp in the media. // Simulate a live stream by sleeping until the next timestamp in the media.
pub fn timeout(&self) -> Option<time::Duration> { pub fn timeout(&self) -> Option<time::Duration> {
let next = self.fragments.front()?; let next = self.fragments.front()?;
let timestamp = next.timestamp;
// Find the timescale for the track. let delay = time::Duration::from_millis(1000 * next.timestamp / next.timescale);
let timescale = self.timescales.get(&next.track_id).unwrap();
let delay = time::Duration::from_millis(1000 * timestamp / *timescale as u64);
let elapsed = self.start.elapsed(); let elapsed = self.start.elapsed();
delay.checked_sub(elapsed) delay.checked_sub(elapsed)
@ -182,7 +191,18 @@ pub fn read_atom<R: Read>(reader: &mut R) -> anyhow::Result<Vec<u8>> {
Ok(raw) Ok(raw)
} }
fn has_keyframe(moof: &mp4::MoofBox) -> bool { // Find the timescale for the given track.
// Returns the timescale (units per second) of the track identified by `track_id`.
//
// Walks `moov.traks` in order and uses the first entry whose `tkhd.track_id`
// matches, widening its `mdhd.timescale` to `u64`.
//
// Panics with "failed to find trak" when no track in `moov` carries that id.
fn track_timescale(moov: &mp4::MoovBox, track_id: u32) -> u64 {
    for candidate in moov.traks.iter() {
        if candidate.tkhd.track_id == track_id {
            return candidate.mdia.mdhd.timescale as u64;
        }
    }

    panic!("failed to find trak")
}
fn sample_keyframe(moof: &mp4::MoofBox) -> bool {
for traf in &moof.trafs { for traf in &moof.trafs {
// TODO trak default flags if this is None // TODO trak default flags if this is None
let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default(); let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default();
@ -214,13 +234,18 @@ fn has_keyframe(moof: &mp4::MoofBox) -> bool {
false false
} }
fn first_timestamp(moof: &mp4::MoofBox) -> Option<u64> { fn sample_timestamp(moof: &mp4::MoofBox) -> Option<u64> {
Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time) Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time)
} }
fn timescales(moov: &mp4::MoovBox) -> HashMap<u32, u32> { /*
moov.traks fn track_type(moov: &mp4::MoovBox, track_id: u32) -> mp4::TrackType {
let trak = moov
.traks
.iter() .iter()
.map(|trak| (trak.tkhd.track_id, trak.mdia.mdhd.timescale)) .find(|trak| trak.tkhd.track_id == track_id)
.collect() .expect("failed to find trak");
mp4::TrackType::try_from(&trak.mdia.hdlr.handler_type).expect("unknown track type")
} }
*/

View File

@ -10,9 +10,25 @@ use crate::{media, transport};
#[derive(Default)] #[derive(Default)]
pub struct Session { pub struct Session {
// The media source, configured on CONNECT.
media: Option<media::Source>, media: Option<media::Source>,
streams: transport::Streams, // An easy way of buffering stream data.
tracks: hmap::HashMap<u32, u64>, // map from track_id to current stream_id // A helper for automatically buffering stream data.
streams: transport::Streams,
// Map from track_id to the Track state.
tracks: hmap::HashMap<u32, Track>,
}
// Per-track delivery state, stored in Session::tracks keyed by track_id.
pub struct Track {
// The stream currently carrying this track's fragments.
// None until the first fragment is sent, and again right after the stream is closed,
// which tells the sender to open a new stream for the next fragment.
stream_id: Option<u64>,
// The timescale used for this track (units per second), taken from the first
// fragment seen for the track.
timescale: u64,
// The timestamp, in timescale units, of the keyframe fragment that most recently
// caused the stream to be closed and replaced. Starts at 0 and is NOT updated for
// keyframes that arrive less than one second (one `timescale`) after this value.
keyframe: u64,
} }
impl transport::App for Session { impl transport::App for Session {
@ -95,25 +111,27 @@ impl Session {
None => return Ok(()), None => return Ok(()),
}; };
let stream_id = match self.tracks.get(&fragment.track_id) { // Get the track state or insert a new entry.
// Close the old stream. let track = self.tracks.entry(fragment.track_id).or_insert_with(|| Track {
Some(stream_id) if fragment.keyframe => { stream_id: None,
self.streams.send(conn, *stream_id, &[], true)?; timescale: fragment.timescale,
None keyframe: 0,
});
if let Some(stream_id) = track.stream_id {
// Existing stream, check if we should close it.
if fragment.keyframe && fragment.timestamp >= track.keyframe + track.timescale {
// Close the existing stream
self.streams.send(conn, stream_id, &[], true)?;
// Unset the stream id so we create a new one.
track.stream_id = None;
track.keyframe = fragment.timestamp;
} }
}
// Use the existing stream let stream_id = match track.stream_id {
Some(stream_id) => Some(*stream_id),
// No existing stream.
_ => None,
};
let stream_id = match stream_id {
// Use the existing stream,
Some(stream_id) => stream_id, Some(stream_id) => stream_id,
// Open a new stream.
None => { None => {
// Create a new unidirectional stream. // Create a new unidirectional stream.
let stream_id = session.open_stream(conn, false)?; let stream_id = session.open_stream(conn, false)?;
@ -134,9 +152,6 @@ impl Session {
let data = message.serialize()?; let data = message.serialize()?;
self.streams.send(conn, stream_id, &data, false)?; self.streams.send(conn, stream_id, &data, false)?;
// Keep a mapping from the track id to the current stream id.
self.tracks.insert(fragment.track_id, stream_id);
stream_id stream_id
} }
}; };
@ -145,6 +160,9 @@ impl Session {
let data = fragment.data.as_slice(); let data = fragment.data.as_slice();
self.streams.send(conn, stream_id, data, false)?; self.streams.send(conn, stream_id, data, false)?;
// Save the stream_id for the next fragment.
track.stream_id = Some(stream_id);
Ok(()) Ok(())
} }
} }

View File

@ -12,5 +12,3 @@ export {
ISOFile, ISOFile,
Sample, Sample,
} from "mp4box" } from "mp4box"
export { Init, InitParser } from "./init"

View File

@ -1,43 +0,0 @@
import * as MP4 from "./index"
// A received init segment: its raw bytes together with the parsed description.
// NOTE(review): nothing in this file constructs an Init; confirm it is still used by callers.
export interface Init {
// The init segment bytes as an mp4box-style ArrayBuffer (presumably carrying fileStart; confirm).
raw: MP4.ArrayBuffer
// The info reported for those bytes (presumably what mp4box passes to onReady; confirm).
info: MP4.Info
}
// Incrementally feeds the atoms of an init segment into an mp4box file and keeps
// every pushed buffer so the init segment can be replayed later.
export class InitParser {
	// The mp4box file that parses the pushed atoms.
	mp4box: MP4.File
	// The fileStart to stamp on the next pushed buffer; updated from appendBuffer's return value.
	offset: number
	// Every buffer pushed so far, in arrival order.
	raw: MP4.ArrayBuffer[]
	// Settles when mp4box fires onReady (resolve) or onError (reject).
	info: Promise<MP4.Info>

	constructor() {
		this.offset = 0
		this.raw = []
		this.mp4box = MP4.New()

		// Create a promise that gets resolved once the init segment has been parsed.
		this.info = new Promise((onParsed, onFailed) => {
			this.mp4box.onReady = onParsed
			this.mp4box.onError = onFailed
		})
	}

	// Copy `data`, hand the copy to mp4box at the current offset, and remember it.
	push(data: Uint8Array) {
		// mp4box only accepts an ArrayBuffer, so copy the atom into a buffer of its own
		const copy = new Uint8Array(data)

		// mp4box also needs fileStart set directly on the underlying ArrayBuffer
		const chunk = copy.buffer as MP4.ArrayBuffer
		chunk.fileStart = this.offset

		// Parse the data; appendBuffer's return value becomes the next fileStart
		this.offset = this.mp4box.appendBuffer(chunk)
		this.mp4box.flush()

		// Add the box to our queue of chunks
		this.raw.push(chunk)
	}
}

View File

@ -3,33 +3,48 @@ import * as MP4 from "../mp4"
import * as Stream from "../stream" import * as Stream from "../stream"
import Renderer from "./renderer" import Renderer from "./renderer"
import { Deferred } from "../util"
export default class Decoder { export default class Decoder {
init: MP4.InitParser
decoders: Map<number, AudioDecoder | VideoDecoder> decoders: Map<number, AudioDecoder | VideoDecoder>
renderer: Renderer renderer: Renderer
init: Deferred<MP4.ArrayBuffer[]>
constructor(renderer: Renderer) { constructor(renderer: Renderer) {
this.init = new MP4.InitParser() this.init = new Deferred()
this.decoders = new Map() this.decoders = new Map()
this.renderer = renderer this.renderer = renderer
} }
async receiveInit(msg: Message.Init) { async receiveInit(msg: Message.Init) {
const init = new Array<MP4.ArrayBuffer>()
let offset = 0
const stream = new Stream.Reader(msg.reader, msg.buffer) const stream = new Stream.Reader(msg.reader, msg.buffer)
for (;;) { for (;;) {
const data = await stream.read() const data = await stream.read()
if (!data) break if (!data) break
this.init.push(data) // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately
const box = new Uint8Array(data.byteLength)
box.set(data)
// and for some reason we need to modify the underlying ArrayBuffer with fileStart
const buffer = box.buffer as MP4.ArrayBuffer
buffer.fileStart = offset
// Add the box to our queue of chunks
init.push(buffer)
offset += data.byteLength
} }
// TODO make sure the init segment is fully received this.init.resolve(init)
} }
async receiveSegment(msg: Message.Segment) { async receiveSegment(msg: Message.Segment) {
// Wait for the init segment to be fully received and parsed // Wait for the init segment to be fully received and parsed
await this.init.info
const input = MP4.New() const input = MP4.New()
input.onSamples = this.onSamples.bind(this) input.onSamples = this.onSamples.bind(this)
@ -42,11 +57,12 @@ export default class Decoder {
input.start() input.start()
} }
// MP4box requires us to reparse the init segment unfortunately // MP4box requires us to parse the init segment for each segment unfortunately
// TODO If this sees production usage, I would recommend caching this somehow.
let offset = 0 let offset = 0
for (const raw of this.init.raw) { const init = await this.init.promise
raw.fileStart = offset for (const raw of init) {
offset = input.appendBuffer(raw) offset = input.appendBuffer(raw)
} }
@ -74,54 +90,20 @@ export default class Decoder {
} }
} }
onSamples(track_id: number, track: MP4.Track, samples: MP4.Sample[]) { onSamples(_track_id: number, track: MP4.Track, samples: MP4.Sample[]) {
let decoder = this.decoders.get(track_id) if (!track.track_width) {
// TODO ignoring audio to debug
return
}
if (!decoder) { let decoder
if (isVideoTrack(track)) {
// We need a sample to initialize the video decoder, because of mp4box limitations.
const sample = samples[0] decoder = this.videoDecoder(track, samples[0])
} else if (isAudioTrack(track)) {
if (isVideoTrack(track)) { decoder = this.audioDecoder(track)
// Configure the decoder using the AVC box for H.264 } else {
// TODO it should be easy to support other codecs, just need to know the right boxes. throw new Error("unknown track type")
const avcc = sample.description.avcC
if (!avcc) throw new Error("TODO only h264 is supported")
const description = new MP4.Stream(new Uint8Array(avcc.size), 0, false)
avcc.write(description)
const videoDecoder = new VideoDecoder({
output: this.renderer.push.bind(this.renderer),
error: console.warn,
})
videoDecoder.configure({
codec: track.codec,
codedHeight: track.video.height,
codedWidth: track.video.width,
description: description.buffer?.slice(8),
// optimizeForLatency: true
})
decoder = videoDecoder
} else if (isAudioTrack(track)) {
const audioDecoder = new AudioDecoder({
output: this.renderer.push.bind(this.renderer),
error: console.warn,
})
audioDecoder.configure({
codec: track.codec,
numberOfChannels: track.audio.channel_count,
sampleRate: track.audio.sample_rate,
})
decoder = audioDecoder
} else {
throw new Error("unknown track type")
}
this.decoders.set(track_id, decoder)
} }
for (const sample of samples) { for (const sample of samples) {
@ -129,7 +111,9 @@ export default class Decoder {
const timestamp = (1000 * 1000 * sample.dts) / sample.timescale const timestamp = (1000 * 1000 * sample.dts) / sample.timescale
const duration = (1000 * 1000 * sample.duration) / sample.timescale const duration = (1000 * 1000 * sample.duration) / sample.timescale
if (isAudioDecoder(decoder)) { if (!decoder) {
throw new Error("decoder not initialized")
} else if (isAudioDecoder(decoder)) {
decoder.decode( decoder.decode(
new EncodedAudioChunk({ new EncodedAudioChunk({
type: sample.is_sync ? "key" : "delta", type: sample.is_sync ? "key" : "delta",
@ -152,6 +136,65 @@ export default class Decoder {
} }
} }
} }
// Return the AudioDecoder for `track`, creating and configuring one on first use.
// An already-registered audio decoder is reused to avoid glitches.
// TODO detect when the codec changes and make a new decoder.
audioDecoder(track: MP4.AudioTrack): AudioDecoder {
	const cached = this.decoders.get(track.id)
	if (cached && isAudioDecoder(cached)) return cached

	// Decoded output is pushed to the renderer; decoder errors are only logged.
	const created = new AudioDecoder({
		output: this.renderer.push.bind(this.renderer),
		error: console.error,
	})

	const { channel_count, sample_rate } = track.audio
	created.configure({
		codec: track.codec,
		numberOfChannels: channel_count,
		sampleRate: sample_rate,
	})

	// Register the decoder under the track id so later calls find it.
	this.decoders.set(track.id, created)
	return created
}
// Return the VideoDecoder to use for `sample`.
// A sync sample (keyframe) always gets a freshly configured decoder; any other sample
// reuses the video decoder registered for the track, if there is one.
// NOTE(review): the decoder previously registered for the track is replaced in the map
// without being closed here.
videoDecoder(track: MP4.VideoTrack, sample: MP4.Sample): VideoDecoder {
	const cached = sample.is_sync ? undefined : this.decoders.get(track.id)
	if (cached && isVideoDecoder(cached)) return cached

	// Configure the decoder using the AVC box for H.264
	// TODO it should be easy to support other codecs, just need to know the right boxes.
	const avcc = sample.description.avcC
	if (!avcc) throw new Error("TODO only h264 is supported")

	// Serialize the avcC box into a scratch stream sized to the box.
	const serialized = new MP4.Stream(new Uint8Array(avcc.size), 0, false)
	avcc.write(serialized)

	// Decoded output is pushed to the renderer; decoder errors are only logged.
	const created = new VideoDecoder({
		output: this.renderer.push.bind(this.renderer),
		error: console.error,
	})

	created.configure({
		codec: track.codec,
		codedHeight: track.video.height,
		codedWidth: track.video.width,
		// Drops the first 8 bytes of the serialized box (presumably the box header; confirm).
		description: serialized.buffer?.slice(8),
		// optimizeForLatency: true
	})

	// Register the decoder under the track id so non-keyframe samples reuse it.
	this.decoders.set(track.id, created)
	return created
}
} }
function isAudioDecoder(decoder: AudioDecoder | VideoDecoder): decoder is AudioDecoder { function isAudioDecoder(decoder: AudioDecoder | VideoDecoder): decoder is AudioDecoder {