Send INIT as a single message.

Much simpler on both the client and server side.
Luke Curley 2023-05-04 19:43:43 -07:00
parent e9663accc6
commit d7237c4926
28 changed files with 546 additions and 782 deletions
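The gist of the change, before the file-by-file diff: every WebTransport stream still starts with a small JSON header, but the init stream now carries the whole ftyp + moov payload as a single message, and segment headers identify a numeric track_id instead of a named init segment. A minimal sketch of the resulting wire shapes, mirroring the serde structs in message.rs below; the TypeScript types are illustrative, not part of the diff:

```typescript
// Illustrative model of the per-stream JSON headers after this commit.
// Shapes mirror message.rs: Init is empty, Segment carries a track_id.
type Header =
	| { init: Record<string, never> } // followed by the ftyp + moov bytes
	| { segment: { track_id: number } }; // followed by moof + mdat pairs

function describe(header: Header): string {
	if ("init" in header) return "init stream: ftyp + moov sent as one message";
	return `segment stream for track ${header.segment.track_id}`;
}

// Headers as they would appear on the wire:
console.log(describe(JSON.parse('{"init":{}}')));
console.log(describe(JSON.parse('{"segment":{"track_id":1}}')));
```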

View File

@ -1,121 +0,0 @@
import * as Message from "./message";
import * as MP4 from "../mp4"
import * as Stream from "../stream"
import * as Util from "../util"
import Renderer from "./renderer"
export default class Decoder {
// Store the init message for each track
tracks: Map<string, Util.Deferred<Message.Init>>;
decoder: AudioDecoder; // TODO one per track
sync: Message.Sync;
constructor(config: Message.Config, renderer: Renderer) {
this.tracks = new Map();
this.decoder = new AudioDecoder({
output: renderer.emit.bind(renderer),
error: console.warn,
});
}
init(msg: Message.Init) {
let defer = this.tracks.get(msg.track);
if (!defer) {
defer = new Util.Deferred()
this.tracks.set(msg.track, defer)
}
if (msg.info.audioTracks.length != 1 || msg.info.videoTracks.length != 0) {
throw new Error("Expected a single audio track")
}
const track = msg.info.audioTracks[0]
const audio = track.audio
defer.resolve(msg)
}
async decode(msg: Message.Segment) {
let track = this.tracks.get(msg.track);
if (!track) {
track = new Util.Deferred()
this.tracks.set(msg.track, track)
}
// Wait for the init segment to be fully received and parsed
const init = await track.promise;
const audio = init.info.audioTracks[0]
if (this.decoder.state == "unconfigured") {
this.decoder.configure({
codec: audio.codec,
numberOfChannels: audio.audio.channel_count,
sampleRate: audio.audio.sample_rate,
})
}
const input = MP4.New();
input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => {
for (let sample of samples) {
// Convert to microseconds
const timestamp = 1000 * 1000 * sample.dts / sample.timescale
const duration = 1000 * 1000 * sample.duration / sample.timescale
// This assumes that timescale == sample rate
this.decoder.decode(new EncodedAudioChunk({
type: sample.is_sync ? "key" : "delta",
data: sample.data,
duration: duration,
timestamp: timestamp,
}))
}
}
input.onReady = (info: any) => {
input.setExtractionOptions(info.tracks[0].id, {}, { nbSamples: 1 });
input.start();
}
// MP4box requires us to reparse the init segment unfortunately
let offset = 0;
for (let raw of init.raw) {
raw.fileStart = offset
input.appendBuffer(raw)
}
const stream = new Stream.Reader(msg.reader, msg.buffer)
/* TODO I'm not actually sure why this code doesn't work; something trips up the MP4 parser
while (1) {
const data = await stream.read()
if (!data) break
input.appendBuffer(data)
input.flush()
}
*/
// One day I'll figure it out; until then read one top-level atom at a time
while (!await stream.done()) {
const raw = await stream.peek(4)
const size = new DataView(raw.buffer, raw.byteOffset, raw.byteLength).getUint32(0)
const atom = await stream.bytes(size)
// Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately
let box = new Uint8Array(atom.byteLength);
box.set(atom)
// and for some reason we need to modify the underlying ArrayBuffer with offset
let buffer = box.buffer as MP4.ArrayBuffer
buffer.fileStart = offset
// Parse the data
offset = input.appendBuffer(buffer)
input.flush()
}
}
}

View File

@ -1,8 +1,8 @@
 import * as Message from "./message"
-import Renderer from "./renderer"
+import Renderer from "../media/audio"
 import Decoder from "./decoder"
-import { RingInit } from "./ring"
+import { RingInit } from "../media/ring"
 // Abstracts the Worker and Worklet into a simpler API
 // This class must be created on the main thread due to AudioContext.

View File

@ -1,85 +0,0 @@
import * as Message from "./message"
import { Ring } from "./ring"
export default class Renderer {
ring: Ring;
queue: Array<AudioData>;
sync?: DOMHighResTimeStamp
running: number;
constructor(config: Message.Config) {
this.ring = new Ring(config.ring)
this.queue = [];
this.running = 0
}
emit(frame: AudioData) {
if (!this.sync) {
// Save the frame as the sync point
this.sync = 1000 * performance.now() - frame.timestamp
}
// Insert the frame into the queue sorted by timestamp.
if (this.queue.length > 0 && this.queue[this.queue.length-1].timestamp <= frame.timestamp) {
// Fast path because we normally append to the end.
this.queue.push(frame)
} else {
// Do a full binary search
let low = 0
let high = this.queue.length;
while (low < high) {
var mid = (low + high) >>> 1;
if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1;
else high = mid;
}
this.queue.splice(low, 0, frame)
}
if (!this.running) {
// Wait for the next animation frame
this.running = self.requestAnimationFrame(this.render.bind(this))
}
}
render() {
// Determine the target timestamp.
const target = 1000 * performance.now() - this.sync!
// Check if we should skip some frames
while (this.queue.length) {
const next = this.queue[0]
if (next.timestamp >= target) {
break
}
console.warn("dropping audio")
this.queue.shift()
next.close()
}
// Push as many as we can to the ring buffer.
while (this.queue.length) {
let frame = this.queue[0]
let ok = this.ring.write(frame)
if (!ok) {
break
}
frame.close()
this.queue.shift()
}
if (this.queue.length) {
this.running = self.requestAnimationFrame(this.render.bind(this))
} else {
this.running = 0
}
}
play(play: Message.Play) {
this.ring.reset()
}
}

View File

@ -1,26 +0,0 @@
import Decoder from "./decoder"
import Renderer from "./renderer"
import * as Message from "./message"
let decoder: Decoder
let renderer: Renderer;
self.addEventListener('message', (e: MessageEvent) => {
if (e.data.config) {
renderer = new Renderer(e.data.config)
decoder = new Decoder(e.data.config, renderer)
}
if (e.data.init) {
decoder.init(e.data.init)
}
if (e.data.segment) {
decoder.decode(e.data.segment)
}
if (e.data.play) {
renderer.play(e.data.play)
}
})

player/src/media/decoder.ts Normal file
View File

@ -0,0 +1,160 @@
import * as Message from "./message";
import * as MP4 from "../mp4"
import * as Stream from "../stream"
import * as Util from "../util"
import Renderer from "./renderer"
export default class Decoder {
init: MP4.InitParser;
decoders: Map<number, AudioDecoder | VideoDecoder>;
renderer: Renderer;
constructor(renderer: Renderer) {
this.init = new MP4.InitParser();
this.decoders = new Map();
this.renderer = renderer;
}
async receiveInit(msg: Message.Init) {
let stream = new Stream.Reader(msg.reader, msg.buffer);
while (1) {
const data = await stream.read()
if (!data) break
this.init.push(data)
}
// TODO make sure the init segment is fully received
}
async receiveSegment(msg: Message.Segment) {
// Wait for the init segment to be fully received and parsed
const info = await this.init.info
const input = MP4.New();
input.onSamples = this.onSamples.bind(this);
input.onReady = (info: any) => {
// Extract all of the tracks, because we don't know if it's audio or video.
for (let track of info.tracks) {
input.setExtractionOptions(track.id, track, { nbSamples: 1 });
}
input.start();
}
// MP4box requires us to reparse the init segment unfortunately
let offset = 0;
for (let raw of this.init.raw) {
raw.fileStart = offset
offset = input.appendBuffer(raw)
}
const stream = new Stream.Reader(msg.reader, msg.buffer)
// For whatever reason, mp4box doesn't work until you read an atom at a time.
while (!await stream.done()) {
const raw = await stream.peek(4)
// TODO this doesn't support when size = 0 (until EOF) or size = 1 (extended size)
const size = new DataView(raw.buffer, raw.byteOffset, raw.byteLength).getUint32(0)
const atom = await stream.bytes(size)
// Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately
let box = new Uint8Array(atom.byteLength);
box.set(atom)
// and for some reason we need to modify the underlying ArrayBuffer with offset
let buffer = box.buffer as MP4.ArrayBuffer
buffer.fileStart = offset
// Parse the data
offset = input.appendBuffer(buffer)
input.flush()
}
}
onSamples(track_id: number, track: MP4.Track, samples: MP4.Sample[]) {
let decoder = this.decoders.get(track_id);
if (!decoder) {
// We need a sample to initialize the video decoder, because of mp4box limitations.
let sample = samples[0];
if (MP4.isVideoTrack(track)) {
// Configure the decoder using the AVC box for H.264
// TODO it should be easy to support other codecs, just need to know the right boxes.
const avcc = sample.description.avcC;
if (!avcc) throw new Error("TODO only h264 is supported");
const description = new MP4.Stream(new Uint8Array(avcc.size), 0, false)
avcc.write(description)
const videoDecoder = new VideoDecoder({
output: this.renderer.push.bind(this.renderer),
error: console.warn,
});
videoDecoder.configure({
codec: track.codec,
codedHeight: track.video.height,
codedWidth: track.video.width,
description: description.buffer?.slice(8),
// optimizeForLatency: true
})
decoder = videoDecoder
} else if (MP4.isAudioTrack(track)) {
const audioDecoder = new AudioDecoder({
output: this.renderer.push.bind(this.renderer),
error: console.warn,
});
audioDecoder.configure({
codec: track.codec,
numberOfChannels: track.audio.channel_count,
sampleRate: track.audio.sample_rate,
})
decoder = audioDecoder
} else {
throw new Error("unknown track type")
}
this.decoders.set(track_id, decoder)
}
for (let sample of samples) {
// Convert to microseconds
const timestamp = 1000 * 1000 * sample.dts / sample.timescale
const duration = 1000 * 1000 * sample.duration / sample.timescale
if (isAudioDecoder(decoder)) {
decoder.decode(new EncodedAudioChunk({
type: sample.is_sync ? "key" : "delta",
data: sample.data,
duration: duration,
timestamp: timestamp,
}))
} else if (isVideoDecoder(decoder)) {
decoder.decode(new EncodedVideoChunk({
type: sample.is_sync ? "key" : "delta",
data: sample.data,
duration: duration,
timestamp: timestamp,
}))
} else {
throw new Error("unknown decoder type")
}
}
}
}
function isAudioDecoder(decoder: AudioDecoder | VideoDecoder): decoder is AudioDecoder {
return decoder instanceof AudioDecoder
}
function isVideoDecoder(decoder: AudioDecoder | VideoDecoder): decoder is VideoDecoder {
return decoder instanceof VideoDecoder
}
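The TODO in receiveSegment notes that the atom reader only handles plain 32-bit sizes. In ISO BMFF, size == 0 means the atom extends to EOF and size == 1 means a 64-bit size follows the fourcc. A sketch of how the size probe could be extended, assuming the peek() semantics of this repo's Stream.Reader; this is not part of the commit:

```typescript
// Hypothetical extension of the size handling above.
async function atomSize(stream: { peek(n: number): Promise<Uint8Array> }): Promise<number> {
	let raw = await stream.peek(8);
	let view = new DataView(raw.buffer, raw.byteOffset, raw.byteLength);

	const size = view.getUint32(0);
	if (size === 1) {
		// Extended size: a 64-bit length follows the 8-byte header.
		raw = await stream.peek(16);
		view = new DataView(raw.buffer, raw.byteOffset, raw.byteLength);
		return Number(view.getBigUint64(8));
	}
	if (size === 0) {
		// The atom runs until EOF; the caller would read the rest of the stream.
		throw new Error("size 0 atoms not supported");
	}
	return size;
}
```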

player/src/media/index.ts Normal file
View File

@ -0,0 +1,82 @@
import * as Message from "./message"
import { RingInit } from "./ring"
// Abstracts the Worker and Worklet into a simpler API
// This class must be created on the main thread due to AudioContext.
export default class Media {
context: AudioContext;
worker: Worker;
worklet: Promise<AudioWorkletNode>;
constructor(videoConfig: Message.VideoConfig) {
// Assume 44.1kHz and two audio channels
const audioConfig = {
sampleRate: 44100,
ring: new RingInit(2, 4410), // 100ms at 44.1khz
}
const config = {
audio: audioConfig,
video: videoConfig,
}
this.context = new AudioContext({
latencyHint: "interactive",
sampleRate: config.audio.sampleRate,
})
this.worker = this.setupWorker(config)
this.worklet = this.setupWorklet(config)
}
init(init: Message.Init) {
this.worker.postMessage({ init }, [ init.buffer.buffer, init.reader ])
}
segment(segment: Message.Segment) {
this.worker.postMessage({ segment }, [ segment.buffer.buffer, segment.reader ])
}
play(play: Message.Play) {
this.context.resume()
//this.worker.postMessage({ play })
}
private setupWorker(config: Message.Config): Worker {
const url = new URL('worker.ts', import.meta.url)
const worker = new Worker(url, {
type: "module",
name: "media",
})
worker.postMessage({ config }, [ config.video.canvas ])
return worker
}
private async setupWorklet(config: Message.Config): Promise<AudioWorkletNode> {
// Load the worklet source code.
const url = new URL('worklet.ts', import.meta.url)
await this.context.audioWorklet.addModule(url)
const volume = this.context.createGain()
volume.gain.value = 2.0;
// Create a worklet
const worklet = new AudioWorkletNode(this.context, 'renderer');
worklet.onprocessorerror = (e: Event) => {
console.error("Audio worklet error:", e)
};
worklet.port.postMessage({ config })
// Connect the worklet to the volume node and then to the speakers
worklet.connect(volume)
volume.connect(this.context.destination)
return worklet
}
}
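Note the second argument to postMessage in init and segment: a transfer list that moves the ArrayBuffer and ReadableStream into the worker rather than structured-cloning them. A small demo of the semantics (the worker URL is a placeholder; transferable streams require a Chromium-based browser):

```typescript
// Demo: transferring a buffer and a stream into a worker, as Media.init does.
const worker = new Worker(new URL("worker.ts", import.meta.url), { type: "module" });

const buffer = new Uint8Array(1024);
const { readable } = new TransformStream<Uint8Array, Uint8Array>();

// Ownership moves to the worker; the sender's copies are detached.
worker.postMessage({ init: { buffer, reader: readable } }, [buffer.buffer, readable]);
console.log(buffer.byteLength); // 0: the underlying ArrayBuffer was detached
```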

View File

@ -1,28 +1,29 @@
 import * as MP4 from "../mp4"
-import { RingInit } from "./ring"
+import { RingInit } from "../media/ring"
 export interface Config {
-	// audio stuff
+	audio: AudioConfig;
+	video: VideoConfig;
+}
+export interface VideoConfig {
+	canvas: OffscreenCanvas;
+}
+export interface AudioConfig {
 	sampleRate: number;
 	ring: RingInit;
 }
 export interface Init {
-	track: string;
-	info: MP4.Info;
-	raw: MP4.ArrayBuffer[];
-}
-export interface Segment {
-	track: string;
 	buffer: Uint8Array; // unread buffered data
 	reader: ReadableStream; // unread unbuffered data
 }
-// Audio tells video when the given timestamp should be rendered.
-export interface Sync {
-	origin: number;
-	clock: DOMHighResTimeStamp;
-	timestamp: number;
+export interface Segment {
+	buffer: Uint8Array; // unread buffered data
+	reader: ReadableStream; // unread unbuffered data
 }
 export interface Play {

View File

@ -0,0 +1,138 @@
import * as Message from "./message";
import { Ring } from "./ring"
export default class Renderer {
audioRing: Ring;
audioQueue: Array<AudioData>;
videoCanvas: OffscreenCanvas;
videoQueue: Array<VideoFrame>;
render: number; // non-zero if requestAnimationFrame has been called
sync?: DOMHighResTimeStamp; // the wall clock value for timestamp 0, in microseconds
last?: number; // the timestamp of the last rendered frame, in microseconds
constructor(config: Message.Config) {
this.audioRing = new Ring(config.audio.ring);
this.audioQueue = [];
this.videoCanvas = config.video.canvas;
this.videoQueue = [];
this.render = 0;
}
push(frame: AudioData | VideoFrame) {
if (!this.sync) {
// Save the frame as the sync point
this.sync = 1000 * performance.now() - frame.timestamp
}
// Drop any old frames
if (this.last && frame.timestamp <= this.last) {
frame.close()
return
}
let queue
if (isAudioData(frame)) {
queue = this.audioQueue;
} else if (isVideoFrame(frame)) {
queue = this.videoQueue;
} else {
throw new Error("unknown frame type")
}
// Insert the frame into the queue sorted by timestamp.
if (queue.length > 0 && queue[queue.length-1].timestamp <= frame.timestamp) {
// Fast path because we normally append to the end.
queue.push(frame as any)
} else {
// Do a full binary search
let low = 0
let high = queue.length;
while (low < high) {
var mid = (low + high) >>> 1;
if (queue[mid].timestamp < frame.timestamp) low = mid + 1;
else high = mid;
}
queue.splice(low, 0, frame as any)
}
// Queue up to render the next frame.
if (!this.render) {
this.render = self.requestAnimationFrame(this.draw.bind(this))
}
}
draw(now: DOMHighResTimeStamp) {
// Determine the target timestamp.
const target = 1000 * now - this.sync!
this.drawAudio(now, target)
this.drawVideo(now, target)
if (this.audioQueue.length || this.videoQueue.length) {
this.render = self.requestAnimationFrame(this.draw.bind(this))
} else {
this.render = 0
}
}
drawAudio(now: DOMHighResTimeStamp, target: DOMHighResTimeStamp) {
// Check if we should skip some frames
while (this.audioQueue.length) {
const next = this.audioQueue[0]
if (next.timestamp >= target) {
let ok = this.audioRing.write(next)
if (!ok) {
// No more space in the ring
break
}
} else {
console.warn("dropping audio")
}
next.close()
this.audioQueue.shift()
}
}
drawVideo(now: DOMHighResTimeStamp, target: DOMHighResTimeStamp) {
if (this.videoQueue.length == 0) return;
let frame = this.videoQueue[0];
if (frame.timestamp >= target) {
// nothing to render yet, wait for the next animation frame
this.render = self.requestAnimationFrame(this.draw.bind(this))
return
}
this.videoQueue.shift();
// Check if we should skip some frames
while (this.videoQueue.length) {
const next = this.videoQueue[0]
if (next.timestamp > target) break
frame.close()
frame = this.videoQueue.shift()!;
}
const ctx = this.videoCanvas.getContext("2d");
ctx!.drawImage(frame, 0, 0, this.videoCanvas.width, this.videoCanvas.height) // TODO aspect ratio
this.last = frame.timestamp;
frame.close()
}
}
function isAudioData(frame: AudioData | VideoFrame): frame is AudioData {
return frame instanceof AudioData
}
function isVideoFrame(frame: AudioData | VideoFrame): frame is VideoFrame {
return frame instanceof VideoFrame
}
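push keeps each queue sorted with an O(1) append fast path, falling back to binary search for the occasional out-of-order frame (e.g. B-frames). The same logic as a standalone helper:

```typescript
// Standalone version of the sorted insert used by Renderer.push.
interface Timestamped { timestamp: number }

function insertByTimestamp<T extends Timestamped>(queue: T[], frame: T) {
	if (queue.length > 0 && queue[queue.length - 1].timestamp <= frame.timestamp) {
		queue.push(frame); // fast path: frames usually arrive in order
		return;
	}
	// Binary search for the insertion point.
	let low = 0;
	let high = queue.length;
	while (low < high) {
		const mid = (low + high) >>> 1;
		if (queue[mid].timestamp < frame.timestamp) low = mid + 1;
		else high = mid;
	}
	queue.splice(low, 0, frame);
}

// Out-of-order timestamps still end up sorted:
const q: Timestamped[] = [];
for (const ts of [0, 3000, 1000, 2000]) insertByTimestamp(q, { timestamp: ts });
console.log(q.map((f) => f.timestamp)); // [0, 1000, 2000, 3000]
```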

View File

@ -13,10 +13,10 @@ self.addEventListener('message', async (e: MessageEvent) => {
 	decoder = new Decoder(renderer)
 } else if (e.data.init) {
 	const init = e.data.init as Message.Init
-	await decoder.init(init)
+	await decoder.receiveInit(init)
 } else if (e.data.segment) {
 	const segment = e.data.segment as Message.Segment
-	await decoder.decode(segment)
+	await decoder.receiveSegment(segment)
 }
 })

View File

@ -25,7 +25,7 @@ class Renderer extends AudioWorkletProcessor {
 	}
 	config(config: Message.Config) {
-		this.ring = new Ring(config.ring)
+		this.ring = new Ring(config.audio.ring)
 	}
 	// Inputs and outputs in groups of 128 samples.

View File

@ -1,11 +1,12 @@
-// Rename some stuff so it's on brand.
-export {
-	createFile as New,
-	MP4File as File,
-	MP4ArrayBuffer as ArrayBuffer,
-	MP4Info as Info,
-	DataStream as Stream,
-	Sample,
-} from "mp4box"
+import * as MP4 from "./rename"
+export * from "./rename"
 export { Init, InitParser } from "./init"
+export function isAudioTrack(track: MP4.Track): track is MP4.AudioTrack {
+	return (track as MP4.AudioTrack).audio !== undefined;
+}
+export function isVideoTrack(track: MP4.Track): track is MP4.VideoTrack {
+	return (track as MP4.VideoTrack).video !== undefined;
+}
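isAudioTrack and isVideoTrack are user-defined type guards, which is what lets the decoder branch on the track union and have TypeScript narrow each arm. A self-contained demo with simplified track shapes (the real definitions live in mp4box.d.ts):

```typescript
// Simplified stand-ins for MP4.AudioTrack / MP4.VideoTrack.
interface AudioTrack { codec: string; audio: { channel_count: number; sample_rate: number } }
interface VideoTrack { codec: string; video: { width: number; height: number } }
type Track = AudioTrack | VideoTrack;

function isAudioTrack(track: Track): track is AudioTrack {
	return (track as AudioTrack).audio !== undefined;
}

function configure(track: Track) {
	if (isAudioTrack(track)) {
		console.log("audio:", track.audio.sample_rate); // narrowed to AudioTrack
	} else {
		console.log("video:", track.video.width); // narrowed to VideoTrack
	}
}

configure({ codec: "mp4a.40.2", audio: { channel_count: 2, sample_rate: 44100 } });
configure({ codec: "avc1.64001f", video: { width: 1280, height: 720 } });
```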

View File

@ -20,19 +20,7 @@ export class InitParser {
 		// Create a promise that gets resolved once the init segment has been parsed.
 		this.info = new Promise((resolve, reject) => {
 			this.mp4box.onError = reject
-			// https://github.com/gpac/mp4box.js#onreadyinfo
-			this.mp4box.onReady = (info: MP4.Info) => {
-				if (!info.isFragmented) {
-					reject("expected a fragmented mp4")
-				}
-				if (info.tracks.length != 1) {
-					reject("expected a single track")
-				}
-				resolve(info)
-			}
+			this.mp4box.onReady = resolve
 		})
 	}

View File

@ -1,7 +1,7 @@
 // https://github.com/gpac/mp4box.js/issues/233
 declare module "mp4box" {
-	interface MP4MediaTrack {
+	export interface MP4MediaTrack {
 		id: number;
 		created: Date;
 		modified: Date;
@ -19,26 +19,26 @@ declare module "mp4box" {
 		nb_samples: number;
 	}
-	interface MP4VideoData {
+	export interface MP4VideoData {
 		width: number;
 		height: number;
 	}
-	interface MP4VideoTrack extends MP4MediaTrack {
+	export interface MP4VideoTrack extends MP4MediaTrack {
 		video: MP4VideoData;
 	}
-	interface MP4AudioData {
+	export interface MP4AudioData {
 		sample_rate: number;
 		channel_count: number;
 		sample_size: number;
 	}
-	interface MP4AudioTrack extends MP4MediaTrack {
+	export interface MP4AudioTrack extends MP4MediaTrack {
 		audio: MP4AudioData;
 	}
-	type MP4Track = MP4VideoTrack | MP4AudioTrack;
+	export type MP4Track = MP4VideoTrack | MP4AudioTrack;
 	export interface MP4Info {
 		duration: number;

player/src/mp4/rename.ts Normal file
View File

@ -0,0 +1,12 @@
// Rename some stuff so it's on brand.
export {
createFile as New,
MP4File as File,
MP4ArrayBuffer as ArrayBuffer,
MP4Info as Info,
MP4Track as Track,
MP4AudioTrack as AudioTrack,
MP4VideoTrack as VideoTrack,
DataStream as Stream,
Sample,
} from "mp4box"

View File

@ -1,6 +1,5 @@
-import Audio from "../audio"
 import Transport from "../transport"
-import Video from "../video"
+import Media from "../media"
 export interface PlayerInit {
 	url: string;
@ -9,22 +8,18 @@ export interface PlayerInit {
 }
 export default class Player {
-	audio: Audio;
-	video: Video;
+	media: Media;
 	transport: Transport;
 	constructor(props: PlayerInit) {
-		this.audio = new Audio()
-		this.video = new Video({
+		this.media = new Media({
 			canvas: props.canvas.transferControlToOffscreen(),
 		})
 		this.transport = new Transport({
 			url: props.url,
 			fingerprint: props.fingerprint,
-			audio: this.audio,
-			video: this.video,
+			media: this.media,
 		})
 	}
@ -33,13 +28,6 @@ export default class Player {
 	}
 	play() {
-		this.audio.play({})
-		//this.video.play()
+		//this.media.play()
 	}
-	onMessage(msg: any) {
-		if (msg.sync) {
-			msg.sync
-		}
-	}
 }

View File

@ -2,30 +2,22 @@ import * as Message from "./message"
import * as Stream from "../stream" import * as Stream from "../stream"
import * as MP4 from "../mp4" import * as MP4 from "../mp4"
import Audio from "../audio" import Media from "../media"
import Video from "../video"
export interface TransportInit { export interface TransportInit {
url: string; url: string;
fingerprint?: WebTransportHash; // the certificate fingerprint, temporarily needed for local development fingerprint?: WebTransportHash; // the certificate fingerprint, temporarily needed for local development
media: Media;
audio: Audio;
video: Video;
} }
export default class Transport { export default class Transport {
quic: Promise<WebTransport>; quic: Promise<WebTransport>;
api: Promise<WritableStream>; api: Promise<WritableStream>;
tracks: Map<string, MP4.InitParser>
audio: Audio; media: Media;
video: Video;
constructor(props: TransportInit) { constructor(props: TransportInit) {
this.tracks = new Map(); this.media = props.media;
this.audio = props.audio;
this.video = props.video;
this.quic = this.connect(props) this.quic = this.connect(props)
@ -94,82 +86,18 @@ export default class Transport {
const msg = JSON.parse(payload) const msg = JSON.parse(payload)
if (msg.init) { if (msg.init) {
return this.handleInit(r, msg.init as Message.Init) return this.media.init({
buffer: r.buffer,
reader: r.reader,
})
} else if (msg.segment) { } else if (msg.segment) {
return this.handleSegment(r, msg.segment as Message.Segment) return this.media.segment({
buffer: r.buffer,
reader: r.reader,
})
} else { } else {
console.warn("unknown message", msg); console.warn("unknown message", msg);
} }
} }
} }
async handleInit(stream: Stream.Reader, msg: Message.Init) {
console.log("handle init", msg);
let track = this.tracks.get(msg.id);
if (!track) {
track = new MP4.InitParser()
this.tracks.set(msg.id, track)
}
while (1) {
const data = await stream.read()
if (!data) break
track.push(data)
}
const info = await track.info
console.log(info);
if (info.audioTracks.length + info.videoTracks.length != 1) {
throw new Error("expected a single track")
}
if (info.audioTracks.length) {
this.audio.init({
track: msg.id,
info: info,
raw: track.raw,
})
} else if (info.videoTracks.length) {
this.video.init({
track: msg.id,
info: info,
raw: track.raw,
})
} else {
throw new Error("init is neither audio nor video")
}
}
async handleSegment(stream: Stream.Reader, msg: Message.Segment) {
console.log("handle segment", msg);
let track = this.tracks.get(msg.init);
if (!track) {
track = new MP4.InitParser()
this.tracks.set(msg.init, track)
}
// Wait until we learn if this is an audio or video track
const info = await track.info
if (info.audioTracks.length) {
this.audio.segment({
track: msg.init,
buffer: stream.buffer,
reader: stream.reader,
})
} else if (info.videoTracks.length) {
this.video.segment({
track: msg.init,
buffer: stream.buffer,
reader: stream.reader,
})
} else {
throw new Error("segment is neither audio nor video")
}
}
} }

View File

@ -1,127 +0,0 @@
import * as Message from "./message";
import * as MP4 from "../mp4"
import * as Stream from "../stream"
import * as Util from "../util"
import Renderer from "./renderer"
export default class Decoder {
// Store the init message for each track
tracks: Map<string, Util.Deferred<Message.Init>>
renderer: Renderer;
constructor(renderer: Renderer) {
this.tracks = new Map();
this.renderer = renderer;
}
async init(msg: Message.Init) {
let track = this.tracks.get(msg.track);
if (!track) {
track = new Util.Deferred()
this.tracks.set(msg.track, track)
}
if (msg.info.videoTracks.length != 1 || msg.info.audioTracks.length != 0) {
throw new Error("Expected a single video track")
}
track.resolve(msg)
}
async decode(msg: Message.Segment) {
let track = this.tracks.get(msg.track);
if (!track) {
track = new Util.Deferred()
this.tracks.set(msg.track, track)
}
// Wait for the init segment to be fully received and parsed
const init = await track.promise;
const info = init.info;
const video = info.videoTracks[0]
const decoder = new VideoDecoder({
output: (frame: VideoFrame) => {
this.renderer.emit(frame)
},
error: (err: Error) => {
console.warn(err)
}
});
const input = MP4.New();
input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => {
for (let sample of samples) {
const timestamp = 1000 * sample.dts / sample.timescale // milliseconds
if (sample.is_sync) {
// Configure the decoder using the AVC box for H.264
const avcc = sample.description.avcC;
const description = new MP4.Stream(new Uint8Array(avcc.size), 0, false)
avcc.write(description)
decoder.configure({
codec: video.codec,
codedHeight: video.track_height,
codedWidth: video.track_width,
description: description.buffer?.slice(8),
// optimizeForLatency: true
})
}
decoder.decode(new EncodedVideoChunk({
data: sample.data,
duration: sample.duration,
timestamp: timestamp,
type: sample.is_sync ? "key" : "delta",
}))
}
}
input.onReady = (info: any) => {
input.setExtractionOptions(info.tracks[0].id, {}, { nbSamples: 1 });
input.start();
}
// MP4box requires us to reparse the init segment unfortunately
let offset = 0;
for (let raw of init.raw) {
raw.fileStart = offset
offset = input.appendBuffer(raw)
}
const stream = new Stream.Reader(msg.reader, msg.buffer)
/* TODO I'm not actually sure why this code doesn't work; something trips up the MP4 parser
while (1) {
const data = await stream.read()
if (!data) break
input.appendBuffer(data)
input.flush()
}
*/
// One day I'll figure it out; until then read one top-level atom at a time
while (!await stream.done()) {
const raw = await stream.peek(4)
const size = new DataView(raw.buffer, raw.byteOffset, raw.byteLength).getUint32(0)
const atom = await stream.bytes(size)
// Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately
let box = new Uint8Array(atom.byteLength);
box.set(atom)
// and for some reason we need to modify the underlying ArrayBuffer with offset
let buffer = box.buffer as MP4.ArrayBuffer
buffer.fileStart = offset
// Parse the data
offset = input.appendBuffer(buffer)
input.flush()
}
}
}

View File

@ -1,27 +0,0 @@
import * as Message from "./message"
// Wrapper around the WebWorker API
export default class Video {
worker: Worker;
constructor(config: Message.Config) {
const url = new URL('worker.ts', import.meta.url)
this.worker = new Worker(url, {
type: "module",
name: "video",
})
this.worker.postMessage({ config }, [ config.canvas ])
}
init(init: Message.Init) {
this.worker.postMessage({ init }) // note: we copy the raw init bytes each time
}
segment(segment: Message.Segment) {
this.worker.postMessage({ segment }, [ segment.buffer.buffer, segment.reader ])
}
play() {
// TODO
}
}

View File

@ -1,17 +0,0 @@
import * as MP4 from "../mp4"
export interface Config {
canvas: OffscreenCanvas;
}
export interface Init {
track: string;
info: MP4.Info;
raw: MP4.ArrayBuffer[];
}
export interface Segment {
track: string;
buffer: Uint8Array; // unread buffered data
reader: ReadableStream; // unread unbuffered data
}

View File

@ -1,91 +0,0 @@
import * as Message from "./message";
export default class Renderer {
canvas: OffscreenCanvas;
queue: Array<VideoFrame>;
render: number; // non-zero if requestAnimationFrame has been called
sync?: DOMHighResTimeStamp; // the wall clock value for timestamp 0
last?: number; // the timestamp of the last rendered frame
constructor(config: Message.Config) {
this.canvas = config.canvas;
this.queue = [];
this.render = 0;
}
emit(frame: VideoFrame) {
if (!this.sync) {
// Save the frame as the sync point
this.sync = performance.now() - frame.timestamp
}
// Drop any old frames
if (this.last && frame.timestamp <= this.last) {
frame.close()
return
}
// Insert the frame into the queue sorted by timestamp.
if (this.queue.length > 0 && this.queue[this.queue.length-1].timestamp <= frame.timestamp) {
// Fast path because we normally append to the end.
this.queue.push(frame)
} else {
// Do a full binary search
let low = 0
let high = this.queue.length;
while (low < high) {
var mid = (low + high) >>> 1;
if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1;
else high = mid;
}
this.queue.splice(low, 0, frame)
}
// Queue up to render the next frame.
if (!this.render) {
this.render = self.requestAnimationFrame(this.draw.bind(this))
}
}
draw(now: DOMHighResTimeStamp) {
// Determine the target timestamp.
const target = now - this.sync!
let frame = this.queue[0]
if (frame.timestamp >= target) {
// nothing to render yet, wait for the next animation frame
this.render = self.requestAnimationFrame(this.draw.bind(this))
return
}
this.queue.shift()
// Check if we should skip some frames
while (this.queue.length) {
const next = this.queue[0]
if (next.timestamp > target) {
break
}
frame.close()
this.queue.shift()
frame = next
}
const ctx = this.canvas.getContext("2d");
ctx!.drawImage(frame, 0, 0, this.canvas.width, this.canvas.height) // TODO aspect ratio
this.last = frame.timestamp;
frame.close()
if (this.queue.length > 0) {
this.render = self.requestAnimationFrame(this.draw.bind(this))
} else {
// Break the loop for now
this.render = 0
}
}
}

server/Cargo.lock generated
View File

@ -329,7 +329,6 @@ dependencies = [
 [[package]]
 name = "mp4"
 version = "0.13.0"
-source = "git+https://github.com/kixelated/mp4-rust.git?branch=trexs#efefcc47353f477518bff01493785ae0daa8efd4"
 dependencies = [
  "byteorder",
  "bytes",

View File

@ -13,6 +13,6 @@ mio = { version = "0.8", features = ["net", "os-poll"] }
 env_logger = "0.9.3"
 ring = "0.16"
 anyhow = "1.0.70"
-mp4 = { git = "https://github.com/kixelated/mp4-rust.git", branch = "trexs" }
+mp4 = { path = "../../mp4-rust" } # { git = "https://github.com/kixelated/mp4-rust.git", branch = "trexs" }
 serde = "1.0.160"
 serde_json = "1.0"

View File

@ -1,3 +1,3 @@
 mod source;
 pub use source::{Fragment, Source};

View File

@ -1,37 +1,32 @@
-use io::Read;
-use std::collections::VecDeque;
 use std::{fs, io, time};
+use std::collections::{HashMap,VecDeque};
-use std::io::Write;
+use std::io::Read;
 use anyhow;
 use mp4;
-use mp4::{ReadBox, WriteBox};
+use mp4::ReadBox;
 pub struct Source {
 	// We read the file once, in order, and don't seek backwards.
 	reader: io::BufReader<fs::File>,
-	// Any fragments parsed and ready to be returned by next().
-	fragments: VecDeque<Fragment>,
 	// The timestamp when the broadcast "started", so we can sleep to simulate a live stream.
 	start: time::Instant,
-	// The raw ftyp box, which we need duplicate for each track, but we don't know how many tracks exist yet.
-	ftyp: Vec<u8>,
-	// The parsed moov box, so we can look up track information later.
-	moov: Option<mp4::MoovBox>,
+	// The initialization payload; ftyp + moov boxes.
+	pub init: Vec<u8>,
+	// The timescale used for each track.
+	timescale: HashMap<u32, u32>,
+	// Any fragments parsed and ready to be returned by next().
+	fragments: VecDeque<Fragment>,
 }
 pub struct Fragment {
 	// The track ID for the fragment.
-	pub track: u32,
-	// The type of the fragment.
-	pub typ: mp4::BoxType,
+	pub track_id: u32,
 	// The data of the fragment.
 	pub data: Vec<u8>,
@ -44,21 +39,42 @@ pub struct Fragment {
 }
 impl Source {
-	pub fn new(path: &str) -> io::Result<Self> {
+	pub fn new(path: &str) -> anyhow::Result<Self> {
 		let f = fs::File::open(path)?;
-		let reader = io::BufReader::new(f);
+		let mut reader = io::BufReader::new(f);
 		let start = time::Instant::now();
+		let ftyp = read_atom(&mut reader)?;
+		anyhow::ensure!(&ftyp[4..8] == b"ftyp", "expected ftyp atom");
+		let moov = read_atom(&mut reader)?;
+		anyhow::ensure!(&moov[4..8] == b"moov", "expected moov atom");
+		let mut init = ftyp;
+		init.extend(&moov);
+		// We're going to parse the moov box.
+		// We have to read the moov box header to correctly advance the cursor for the mp4 crate.
+		let mut moov_reader = io::Cursor::new(&moov);
+		let moov_header = mp4::BoxHeader::read(&mut moov_reader)?;
+		// Parse the moov box so we can detect the timescales for each track.
+		let moov = mp4::MoovBox::read_box(&mut moov_reader, moov_header.size)?;
+		let timescale = moov.traks
+			.iter()
+			.map(|trak| (trak.tkhd.track_id, trak.mdia.mdhd.timescale))
+			.collect();
-		Ok(Self {
+		Ok(Self{
 			reader,
 			start,
+			init,
+			timescale,
 			fragments: VecDeque::new(),
-			ftyp: Vec::new(),
-			moov: None,
 		})
 	}
-	pub fn get(&mut self) -> anyhow::Result<Option<Fragment>> {
+	pub fn fragment(&mut self) -> anyhow::Result<Option<Fragment>> {
 		if self.fragments.is_empty() {
 			self.parse()?;
 		};
@ -72,63 +88,13 @@ impl Source {
 	fn parse(&mut self) -> anyhow::Result<()> {
 		loop {
-			// Read the next full atom.
-			let atom = read_box(&mut self.reader)?;
-			// Before we return it, let's do some simple parsing.
+			let atom = read_atom(&mut self.reader)?;
 			let mut reader = io::Cursor::new(&atom);
 			let header = mp4::BoxHeader::read(&mut reader)?;
 			match header.name {
-				mp4::BoxType::FtypBox => {
-					// Don't return anything until we know the total number of tracks.
-					// To be honest, I didn't expect the borrow checker to allow this, but it does!
-					self.ftyp = atom;
-				}
-				mp4::BoxType::MoovBox => {
-					// We need to split the moov based on the tracks.
-					let moov = mp4::MoovBox::read_box(&mut reader, header.size)?;
-					for trak in &moov.traks {
-						let track_id = trak.tkhd.track_id;
-						// Push the styp atom for each track.
-						self.fragments.push_back(Fragment {
-							track: track_id,
-							typ: mp4::BoxType::FtypBox,
-							data: self.ftyp.clone(),
-							keyframe: false,
-							timestamp: None,
-						});
-						// Unfortunately, we need to create a brand new moov atom for each track.
-						// We remove every box for other track IDs.
-						let mut toov = moov.clone();
-						toov.traks.retain(|t| t.tkhd.track_id == track_id);
-						toov.mvex
-							.as_mut()
-							.expect("missing mvex")
-							.trexs
-							.retain(|f| f.track_id == track_id);
-						// Marshal the box.
-						let mut toov_data = Vec::new();
-						toov.write_box(&mut toov_data)?;
-						let mut file = std::fs::File::create(format!("track{}.mp4", track_id))?;
-						file.write_all(toov_data.as_slice())?;
-						self.fragments.push_back(Fragment {
-							track: track_id,
-							typ: mp4::BoxType::MoovBox,
-							data: toov_data,
-							keyframe: false,
-							timestamp: None,
-						});
-					}
-					self.moov = Some(moov);
-				}
+				mp4::BoxType::FtypBox | mp4::BoxType::MoovBox => anyhow::bail!("must call init first"),
 				mp4::BoxType::MoofBox => {
 					let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
@ -138,8 +104,7 @@ impl Source {
 					}
 					self.fragments.push_back(Fragment {
-						track: moof.trafs[0].tfhd.track_id,
-						typ: mp4::BoxType::MoofBox,
+						track_id: moof.trafs[0].tfhd.track_id,
 						data: atom,
 						keyframe: has_keyframe(&moof),
 						timestamp: first_timestamp(&moof),
@ -147,11 +112,9 @@ impl Source {
 				}
 				mp4::BoxType::MdatBox => {
 					let moof = self.fragments.back().expect("no atom before mdat");
-					assert!(moof.typ == mp4::BoxType::MoofBox, "no moof before mdat");
 					self.fragments.push_back(Fragment {
-						track: moof.track,
-						typ: mp4::BoxType::MoofBox,
+						track_id: moof.track_id,
 						data: atom,
 						keyframe: false,
 						timestamp: None,
@ -160,7 +123,9 @@ impl Source {
 					// We have some media data, return so we can start sending it.
 					return Ok(());
 				}
-				_ => anyhow::bail!("unknown top-level atom: {:?}", header.name),
+				_ => {
+					// Skip unknown atoms
+				}
 			}
 		}
 	}
@ -171,15 +136,9 @@ impl Source {
 		let timestamp = next.timestamp?;
 		// Find the timescale for the track.
-		let track = self
-			.moov
-			.as_ref()?
-			.traks
-			.iter()
-			.find(|t| t.tkhd.track_id == next.track)?;
-		let timescale = track.mdia.mdhd.timescale as u64;
-		let delay = time::Duration::from_millis(1000 * timestamp / timescale);
+		let timescale = self.timescale.get(&next.track_id).unwrap();
+		let delay = time::Duration::from_millis(1000 * timestamp / *timescale as u64);
 		let elapsed = self.start.elapsed();
 		delay.checked_sub(elapsed)
@ -187,14 +146,16 @@ impl Source {
 }
 // Read a full MP4 atom into a vector.
-fn read_box<R: io::Read>(reader: &mut R) -> anyhow::Result<Vec<u8>> {
+pub fn read_atom<R: Read>(reader: &mut R) -> anyhow::Result<Vec<u8>> {
 	// Read the 8 bytes for the size + type
 	let mut buf = [0u8; 8];
 	reader.read_exact(&mut buf)?;
 	// Convert the first 4 bytes into the size.
 	let size = u32::from_be_bytes(buf[0..4].try_into()?) as u64;
-	let mut out = buf.to_vec();
+	//let typ = &buf[4..8].try_into().ok().unwrap();
+	let mut raw = buf.to_vec();
 	let mut limit = match size {
 		// Runs until the end of the file.
@ -222,9 +183,9 @@ fn read_box<R: io::Read>(reader: &mut R) -> anyhow::Result<Vec<u8>> {
 	};
 	// Append to the vector and return it.
-	limit.read_to_end(&mut out)?;
-	Ok(out)
+	limit.read_to_end(&mut raw)?;
+	Ok(raw)
 }
 fn has_keyframe(moof: &mp4::MoofBox) -> bool {
@ -261,4 +222,4 @@ fn has_keyframe(moof: &mp4::MoofBox) -> bool {
 fn first_timestamp(moof: &mp4::MoofBox) -> Option<u64> {
 	Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time)
 }

View File

@ -7,13 +7,11 @@ pub struct Message {
 }
 #[derive(Serialize, Deserialize)]
-pub struct Init {
-	pub id: String,
-}
+pub struct Init {}
 #[derive(Serialize, Deserialize)]
 pub struct Segment {
-	pub init: String,
+	pub track_id: u32,
 }
 impl Message {

View File

@ -1,6 +1,7 @@
 mod message;
 use std::time;
+use std::collections::hash_map as hmap;
 use quiche;
 use quiche::h3::webtransport;
@ -10,9 +11,8 @@ use crate::{media, transport};
 #[derive(Default)]
 pub struct Session {
 	media: Option<media::Source>,
-	stream_id: Option<u64>, // stream ID of the current segment
 	streams: transport::Streams, // An easy way of buffering stream data.
+	tracks: hmap::HashMap<u32, u64>, // map from track_id to current stream_id
 }
 impl transport::App for Session {
@ -38,12 +38,23 @@ impl transport::App for Session {
 			// req.authority()
 			// req.path()
 			// and you can validate this request with req.origin()
+			session.accept_connect_request(conn, None).unwrap();
 			// TODO
 			let media = media::Source::new("../media/fragmented.mp4")?;
-			self.media = Some(media);
-			session.accept_connect_request(conn, None).unwrap();
+			let init = &media.init;
+			// Create a JSON header.
+			let mut message = message::Message::new();
+			message.init = Some(message::Init{});
+			let data = message.serialize()?;
+			// Create a new stream and write the header.
+			let stream_id = session.open_stream(conn, false)?;
+			self.streams.send(conn, stream_id, data.as_slice(), false)?;
+			self.streams.send(conn, stream_id, init.as_slice(), true)?;
+			self.media = Some(media);
 		}
 		webtransport::ServerEvent::StreamData(stream_id) => {
 			let mut buf = vec![0; 10000];
@ -84,62 +95,54 @@ impl Session {
 		};
 		// Get the next media fragment.
-		let fragment = match media.get()? {
+		let fragment = match media.fragment()? {
 			Some(f) => f,
 			None => return Ok(()),
 		};
-		// Check if we have already created a stream for this fragment.
-		let stream_id = match self.stream_id {
-			Some(old_stream_id) if fragment.keyframe => {
-				// This is the start of a new segment.
-				// Close the prior stream.
-				self.streams.send(conn, old_stream_id, &[], true)?;
-				// Encode a JSON header indicating this is the video track.
-				let mut message = message::Message::new();
-				message.segment = Some(message::Segment {
-					init: "video".to_string(),
-				});
-				// Open a new stream.
-				let stream_id = session.open_stream(conn, false)?;
-				// TODO: conn.stream_priority(stream_id, urgency, incremental)
-				// Write the header.
-				let data = message.serialize()?;
-				self.streams.send(conn, stream_id, &data, false)?;
-				stream_id
-			}
-			None => {
-				// This is the start of an init segment.
-				// Create a JSON header.
-				let mut message = message::Message::new();
-				message.init = Some(message::Init {
-					id: "video".to_string(),
-				});
-				let data = message.serialize()?;
-				// Create a new stream and write the header.
-				let stream_id = session.open_stream(conn, false)?;
-				self.streams.send(conn, stream_id, data.as_slice(), false)?;
-				stream_id
-			}
-			Some(stream_id) => stream_id, // Continuation of init or segment
-		};
+		let stream_id = match self.tracks.get(&fragment.track_id) {
+			// Close the old stream.
+			Some(stream_id) if fragment.keyframe => {
+				self.streams.send(conn, *stream_id, &[], true)?;
+				None
+			},
+			// Use the existing stream
+			Some(stream_id) => Some(*stream_id),
+			// No existing stream.
+			_ => None,
+		};
+		let stream_id = match stream_id {
+			// Use the existing stream,
+			Some(stream_id) => stream_id,
+			// Open a new stream.
+			None => {
+				let stream_id = session.open_stream(conn, false)?;
+				// TODO: conn.stream_priority(stream_id, urgency, incremental)
+				// Encode a JSON header indicating this is a new track.
+				let mut message = message::Message::new();
+				message.segment = Some(message::Segment {
+					track_id: fragment.track_id,
+				});
+				// Write the header.
+				let data = message.serialize()?;
+				self.streams.send(conn, stream_id, &data, false)?;
+				self.tracks.insert(fragment.track_id, stream_id);
+				stream_id
+			},
+		};
 		// Write the current fragment.
 		let data = fragment.data.as_slice();
 		self.streams.send(conn, stream_id, data, false)?;
-		// Save the stream ID for the next fragment.
-		self.stream_id = Some(stream_id);
 		Ok(())
 	}
 }
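The segment logic above reduces to: keep writing a track's fragments to its current stream until a keyframe arrives, then close that stream and open a fresh one with a new segment header. A toy model of the decision, in TypeScript to match the player examples (stream IDs here are plain counters, not QUIC stream IDs):

```typescript
// Toy model of the per-track stream reuse in the session's send loop.
const tracks = new Map<number, number>(); // track_id -> current stream id
let nextStreamId = 0;

function streamFor(trackId: number, keyframe: boolean): number {
	const existing = tracks.get(trackId);
	if (existing !== undefined && !keyframe) return existing; // continue the segment
	// First fragment, or a keyframe: close the old stream and start a new one.
	const streamId = nextStreamId++;
	tracks.set(trackId, streamId);
	return streamId;
}

console.log(streamFor(1, true));  // 0: new stream for track 1
console.log(streamFor(1, false)); // 0: same segment continues
console.log(streamFor(1, true));  // 1: keyframe starts a new stream
```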

View File

@ -1 +0,0 @@