diff --git a/player/src/audio/decoder.ts b/player/src/audio/decoder.ts index c1b813f..5c1d654 100644 --- a/player/src/audio/decoder.ts +++ b/player/src/audio/decoder.ts @@ -1,10 +1,13 @@ import * as Message from "./message"; -import { InitParser } from "../mp4/init"; +import * as MP4 from "../mp4" +import * as Stream from "../stream" +import * as Util from "../util" + import { Renderer } from "./renderer" -import { MP4New, MP4Sample, MP4ArrayBuffer } from "../mp4/index" export class Decoder { - tracks: Map; + // Store the init message for each track + tracks: Map> renderer: Renderer; constructor(renderer: Renderer) { @@ -13,43 +16,34 @@ export class Decoder { } async init(msg: Message.Init) { - let track = this.tracks.get(msg.track); - if (!track) { - track = new InitParser() - this.tracks.set(msg.track, track) + let track = this.tracks.get(msg.track); + if (!track) { + track = new Util.Deferred() + this.tracks.set(msg.track, track) + } + + if (msg.info.audioTracks.length != 1 || msg.info.videoTracks.length != 0) { + throw new Error("Expected a single audio track") } - while (1) { - const data = await msg.stream.read() - if (!data) break - - track.init(data) - } - - // TODO this will hang on incomplete data - const init = await track.ready; - const info = init.info; - - if (info.audioTracks.length != 1 || info.videoTracks.length != 0) { - throw new Error("expected a single audio track") - } + track.resolve(msg) } async decode(msg: Message.Segment) { let track = this.tracks.get(msg.track); if (!track) { - track = new InitParser() + track = new Util.Deferred() this.tracks.set(msg.track, track) } // Wait for the init segment to be fully received and parsed - const init = await track.ready; + const init = await track.promise; const info = init.info; - const video = info.videoTracks[0] + const audio = info.audioTracks[0] const decoder = new AudioDecoder({ output: (frame: AudioFrame) => { - this.renderer.push(frame) + this.renderer.emit(frame) }, error: (err: Error) => { console.warn(err) @@ -57,21 +51,19 @@ export class Decoder { }); decoder.configure({ - codec: info.mime, - // TODO what else? + codec: audio.codec, // optimizeForLatency: true }) - const input = MP4New(); + const input = MP4.New(); - input.onSamples = (id: number, user: any, samples: MP4Sample[]) => { + input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => { for (let sample of samples) { - const timestamp = sample.dts / (1000 / info.timescale) // milliseconds - + // TODO this assumes that timescale == sample rate decoder.decode(new EncodedAudioChunk({ data: sample.data, duration: sample.duration, - timestamp: timestamp, + timestamp: sample.dts, })) } } @@ -81,13 +73,16 @@ export class Decoder { input.start(); } - let offset = 0 - // MP4box requires us to reparse the init segment unfortunately - for (let raw of track.raw) { - offset = input.appendBuffer(raw) + let offset = 0; + + for (let raw of init.raw) { + raw.fileStart = offset + input.appendBuffer(raw) } + const stream = new Stream.Reader(msg.reader, msg.buffer) + /* TODO I'm not actually sure why this code doesn't work; something trips up the MP4 parser while (1) { const data = await stream.read() @@ -99,17 +94,17 @@ export class Decoder { */ // One day I'll figure it out; until then read one top-level atom at a time - while (!await msg.stream.done()) { - const raw = await msg.stream.peek(4) + while (!await stream.done()) { + const raw = await stream.peek(4) const size = new DataView(raw.buffer, raw.byteOffset, raw.byteLength).getUint32(0) - const atom = await msg.stream.bytes(size) + const atom = await stream.bytes(size) // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately let box = new Uint8Array(atom.byteLength); box.set(atom) // and for some reason we need to modify the underlying ArrayBuffer with offset - let buffer = box.buffer as MP4ArrayBuffer + let buffer = box.buffer as MP4.ArrayBufferOffset buffer.fileStart = offset // Parse the data diff --git a/player/src/audio/index.ts b/player/src/audio/index.ts index 517a3f4..3c06976 100644 --- a/player/src/audio/index.ts +++ b/player/src/audio/index.ts @@ -6,14 +6,14 @@ export default class Audio { constructor(config: Message.Config) { this.worker = new Worker(new URL('worker.ts', import.meta.url), { type: "module" }) - this.worker.postMessage({ config }, [ ]) + this.worker.postMessage({ config }, []) } init(init: Message.Init) { - this.worker.postMessage({ init }, [ init.stream.buffer, init.stream.reader ]) + this.worker.postMessage({ init }) // note: we copy the raw init bytes each time } segment(segment: Message.Segment) { - this.worker.postMessage({ segment }, [ segment.stream.buffer, segment.stream.reader ]) + this.worker.postMessage({ segment }, [ segment.buffer.buffer, segment.reader ]) } } \ No newline at end of file diff --git a/player/src/audio/message.ts b/player/src/audio/message.ts index 9845708..f9e39e5 100644 --- a/player/src/audio/message.ts +++ b/player/src/audio/message.ts @@ -1,15 +1,17 @@ -import Reader from "../stream/reader"; +import * as MP4 from "../mp4" export interface Config { - canvas: OffscreenCanvas; + // temporarily empty } export interface Init { track: string; - stream: Reader; + info: MP4.Info; + raw: MP4.ArrayBufferOffset[]; } export interface Segment { track: string; - stream: Reader; -} \ No newline at end of file + buffer: Uint8Array; // unread buffered data + reader: ReadableStream; // unread unbuffered data +} diff --git a/player/src/audio/renderer.ts b/player/src/audio/renderer.ts index 774e11b..5a704cd 100644 --- a/player/src/audio/renderer.ts +++ b/player/src/audio/renderer.ts @@ -1,12 +1,26 @@ import * as Message from "./message"; +import Source from "./source"; + export class Renderer { + ctx: AudioContext; + source: Source; + render: number; // non-zero if requestAnimationFrame has been called - sync: DOMHighResTimeStamp; // the wall clock value for timestamp 0 last?: number; // the timestamp of the last rendered frame - constructor(config: Message.Config) { + maxDuration: number; // the maximum duration allowed in the buffer + + constructor() { this.render = 0; - this.sync = 0; + this.maxDuration = 10 * 1000 + + // TODO evaluate { latencyHint: "interactive" } + this.ctx = new AudioContext() + this.source = new Source(this.ctx) + } + + emit(frame: AudioFrame) { + this.source.emit(frame) } } \ No newline at end of file diff --git a/player/src/audio/source/index.ts b/player/src/audio/source/index.ts new file mode 100644 index 0000000..c0f8421 --- /dev/null +++ b/player/src/audio/source/index.ts @@ -0,0 +1,64 @@ +import * as Message from "./message" +import Ring from "./ring" + +// Wrapper around the AudioWorklet API to make it easier to use. +export default class Source { + ctx: AudioContext; + worklet?: AudioWorkletNode; // async initialization + channels: Ring[]; + + ready: Promise; + + constructor(ctx: AudioContext) { + this.ctx = ctx + + // two channels, holding a maximum of 1s at 44khz + this.channels = [ + new Ring(44000), + new Ring(44000), + ] + + // Start loading the worklet + this.ready = this.setup() + } + + private async setup(): Promise { + // Load the worklet source code. + await this.ctx.audioWorklet.addModule('worklet.ts') + + // Create a worklet + this.worklet = new AudioWorkletNode(this.ctx, 'source'); + + this.worklet.port.onmessage = this.onMessage.bind(this) + + this.worklet.onprocessorerror = (e: Event) => { + console.error("Audio worklet error:", e); + }; + + const config: Message.Config = { + channels: this.channels, + } + + this.worklet.port.postMessage({ config }) + } + + private async onMessage(e: MessageEvent) { + if (e.data.configReply) { + const reply = e.data.configReply as Message.ConfigReply + + if (reply.error) { + throw reply.error + } + + // Start playback + this.worklet?.connect(this.ctx.destination); + } + } + + emit(frame: AudioFrame) { + for (let i = 0; i < frame.channels; i += 1) { + const ring = this.channels[i] + ring.set(frame, i) + } + } +} \ No newline at end of file diff --git a/player/src/audio/source/message.ts b/player/src/audio/source/message.ts new file mode 100644 index 0000000..9f21a71 --- /dev/null +++ b/player/src/audio/source/message.ts @@ -0,0 +1,11 @@ +import Ring from "./ring" + +// Sent to the worklet to share ring buffers. +export interface Config { + channels: Ring[]; +} + +// Reply from the worklet indicating when the configuration was suscessful. +export interface ConfigReply { + error?: Error; +} \ No newline at end of file diff --git a/player/src/audio/source/ring.ts b/player/src/audio/source/ring.ts new file mode 100644 index 0000000..dedb1d0 --- /dev/null +++ b/player/src/audio/source/ring.ts @@ -0,0 +1,131 @@ +// Ring buffer with audio samples. + +// TODO typescript enums when I have internet access +const STATE = { + START: 0, + END: 1, +} + +export default class Ring { + state: SharedArrayBuffer; + stateView: Int32Array; + + buffer: SharedArrayBuffer; + capacity: number; + + constructor(samples: number) { + this.state = new SharedArrayBuffer(Object.keys(STATE).length * Int32Array.BYTES_PER_ELEMENT) + this.stateView = new Int32Array(this.state) + + this.setStart(0) + this.setEnd(0) + + this.capacity = samples; + + // TODO better way to loop in modern Javascript? + this.buffer = new SharedArrayBuffer(samples * Float32Array.BYTES_PER_ELEMENT) + } + + setStart(start: number) { + return Atomics.store(this.stateView, STATE.START, start); + } + + getStart(): number { + return Atomics.load(this.stateView, STATE.START); + } + + setEnd(end: number) { + return Atomics.store(this.stateView, STATE.START, end); + } + + getEnd(): number { + return Atomics.load(this.stateView, STATE.END); + } + + set(frame: AudioFrame, channel: number) { + let start = this.getStart() + + // The number of samples to skip at the start. + let offset = start - frame.timestamp; + if (offset > 0) { + console.warn("dropping old samples", offset) + } else { + offset = 0 + } + + let count = frame.numberOfFrames - offset; + if (count <= 0) { + frame.close() + + // Skip the entire frame + return + } + + if (start + this.capacity < frame.timestamp + count) { + // The renderer has to buffer frames; we have a fixed capacity. + // TODO maybe it's better to buffer here instead. + throw new Error("exceeded capacity") + } + + let end = this.getEnd() + + const startIndex = start % this.capacity; + const endIndex = end % this.capacity; + + if (startIndex < endIndex) { + // One continuous range to copy. + const full = new Float32Array(this.buffer, startIndex, endIndex-startIndex) + + frame.copyTo(full, { + planeIndex: channel, + frameOffset: offset, + frameCount: count, + }) + } else { + // Wrapped around the ring buffer, so we have to copy twice. + const wrap = this.capacity - startIndex; + + const first = new Float32Array(this.buffer, startIndex) + const second = new Float32Array(this.buffer, 0, endIndex) + + frame.copyTo(first, { + planeIndex: channel, + frameOffset: offset, + frameCount: wrap, + }) + + frame.copyTo(second, { + planeIndex: channel, + frameOffset: offset + wrap, + frameCount: endIndex, + }) + } + + // TODO insert silence when index > end + if (frame.timestamp + count > end) { + end = frame.timestamp + count + this.setEnd(end) + } + } + + peek(count: number): Float32Array[] { + const start = this.getStart() + const end = this.getEnd() + + const startIndex = start % this.capacity; + const endIndex = end % this.capacity; + + if (startIndex < endIndex) { + const full = new Float32Array(this.buffer, startIndex, endIndex - startIndex) + return [ full ] + } else { + const first = new Float32Array(this.buffer, startIndex) + const second = new Float32Array(this.buffer, 0, endIndex) + return [ first, second ] + } + } + + advance(count: number) { + this.setStart(this.getStart() + count) + } +} \ No newline at end of file diff --git a/player/src/audio/source/worklet.ts b/player/src/audio/source/worklet.ts new file mode 100644 index 0000000..8ade137 --- /dev/null +++ b/player/src/audio/source/worklet.ts @@ -0,0 +1,68 @@ +// This is an AudioWorklet that acts as a media source. +// The renderer copies audio samples to a ring buffer read by this worklet. +// The worklet then outputs those samples to emit audio. + +import * as Message from "../message" +import * as Util from "../../util" + +import Ring from "./ring" + +class Source extends AudioWorkletProcessor { + channels?: Ring[]; + + constructor() { + // The super constructor call is required. + super(); + + this.port.onmessage = (e: MessageEvent) => { + if (e.data.config) { + this.config(e.data.config as Message.Config) + } + } + } + + static get parameterDescriptors() { + return []; + } + + config(config: Message.Config) { + this.channels = config.channels; + } + + // TODO correct types + process(inputs: any, outputs: any, parameters: any) { + if (!this.channels) { + return + } + + if (outputs.length != 1) { + throw new Error("only a single track is supported") + } + + const track = outputs[0]; + + for (let i = 0; i < track.length; i += 1) { + const input = this.channels[i] + const output = track[i]; + + const parts = input.peek(output.length) + + let offset = 0 + for (let i = 0; i < parts.length; i += 1) { + output.set(parts[i], offset) + offset += parts[i].length + } + + if (offset < output.length) { + // TODO render silence + } + + // Always advance the full amount. + input.advance(output.length) + } + + return true; + } +} + +self.registerProcessor("source", Source); \ No newline at end of file diff --git a/player/src/audio/worker.ts b/player/src/audio/worker.ts index dec1c3f..028fc65 100644 --- a/player/src/audio/worker.ts +++ b/player/src/audio/worker.ts @@ -2,20 +2,24 @@ import { Renderer } from "./renderer" import { Decoder } from "./decoder" import * as Message from "./message" -let decoder: Decoder; let renderer: Renderer; +let decoder: Decoder; self.addEventListener('message', async (e: MessageEvent) => { if (e.data.config) { - const config = e.data.config as Message.Config + const config = e.data.config as Message.Config; - renderer = new Renderer(config) + renderer = new Renderer() decoder = new Decoder(renderer) } + if (e.data.init) { + const init = e.data.init as Message.Init + await decoder.init(init) + } + if (e.data.segment) { const segment = e.data.segment as Message.Segment - await decoder.decode(segment) } }) diff --git a/player/src/transport/index.ts b/player/src/transport/index.ts index 677ac0d..f9df8ee 100644 --- a/player/src/transport/index.ts +++ b/player/src/transport/index.ts @@ -2,7 +2,8 @@ import * as Message from "./message" import * as Stream from "../stream" import * as MP4 from "../mp4" -import Video from "../video/index" +import Audio from "../audio" +import Video from "../video" // @ts-ignore bundler embeds data import fingerprint from 'bundle-text:./fingerprint.hex'; @@ -19,13 +20,14 @@ export class Player { api: Promise; tracks: Map - //audio: Worker; + audio: Audio; video: Video; constructor(props: PlayerInit) { this.tracks = new Map(); - //this.audio = new Worker("../audio") + // TODO move these to another class so this only deals with the transport. + this.audio = new Audio({}) this.video = new Video({ canvas: props.canvas.transferControlToOffscreen(), }) @@ -52,7 +54,7 @@ export class Player { hash.push(parseInt(fingerprint.substring(c, c+2), 16)); } - const quic = new WebTransport(url, { + const quic = new WebTransport(url, { "serverCertificateHashes": [{ "algorithm": "sha-256", "value": new Uint8Array(hash), @@ -131,12 +133,20 @@ export class Player { throw new Error("expected a single track") } - if (info.videoTracks.length) { + if (info.audioTracks) { + this.audio.init({ + track: msg.id, + info: info, + raw: track.raw, + }) + } else if (info.videoTracks) { this.video.init({ track: msg.id, info: info, raw: track.raw, }) + } else { + throw new Error("init is neither audio nor video") } } @@ -147,16 +157,23 @@ export class Player { this.tracks.set(msg.init, track) } + // Wait until we learn if this is an audio or video track const info = await track.info - // Wait until we learn if this is an audio or video track - - if (info.videoTracks.length) { + if (info.audioTracks) { + this.audio.segment({ + track: msg.init, + buffer: stream.buffer, + reader: stream.reader, + }) + } else if (info.videoTracks) { this.video.segment({ track: msg.init, buffer: stream.buffer, reader: stream.reader, }) + } else { + throw new Error("segment is neither audio nor video") } } } \ No newline at end of file diff --git a/player/src/video/decoder.ts b/player/src/video/decoder.ts index b4d8c4e..276e2ff 100644 --- a/player/src/video/decoder.ts +++ b/player/src/video/decoder.ts @@ -22,6 +22,10 @@ export class Decoder { this.tracks.set(msg.track, track) } + if (msg.info.videoTracks.length != 1 || msg.info.audioTracks.length != 0) { + throw new Error("Expected a single video track") + } + track.resolve(msg) } @@ -39,7 +43,7 @@ export class Decoder { const decoder = new VideoDecoder({ output: (frame: VideoFrame) => { - this.renderer.push(frame) + this.renderer.emit(frame) }, error: (err: Error) => { console.warn(err) @@ -83,7 +87,7 @@ export class Decoder { // MP4box requires us to reparse the init segment unfortunately let offset = 0; - + for (let raw of init.raw) { raw.fileStart = offset input.appendBuffer(raw) diff --git a/player/src/video/renderer.ts b/player/src/video/renderer.ts index 98170ab..e863746 100644 --- a/player/src/video/renderer.ts +++ b/player/src/video/renderer.ts @@ -17,7 +17,7 @@ export class Renderer { this.maxDuration = 10 * 1000 } - push(frame: VideoFrame) { + emit(frame: VideoFrame) { if (!this.sync) { // Save the frame as the sync point this.sync = performance.now() - frame.timestamp