From 948d2ea1f1bfde6be4baf5a88402e3fc3bfc0243 Mon Sep 17 00:00:00 2001
From: Luke Curley
Date: Thu, 6 Apr 2023 13:55:35 -0700
Subject: [PATCH] Good enough 5 me.

---
 player/src/audio/{decoder => }/decoder.ts  |  75 +++-----
 player/src/audio/decoder/index.ts          |  29 ---
 player/src/audio/index.ts                  |  64 +++++--
 player/src/audio/message.ts                |  15 +-
 player/src/audio/renderer.ts               |  85 ++++++++
 player/src/audio/renderer/index.ts         |  47 -----
 player/src/audio/ring.ts                   | 213 +++++++++++----------
 player/src/audio/{decoder => }/worker.ts   |  11 +-
 player/src/audio/{renderer => }/worklet.ts |  36 +---
 player/src/player/index.ts                 |  10 +-
 player/src/video/renderer.ts               |  32 +---
 11 files changed, 315 insertions(+), 302 deletions(-)
 rename player/src/audio/{decoder => }/decoder.ts (59%)
 delete mode 100644 player/src/audio/decoder/index.ts
 create mode 100644 player/src/audio/renderer.ts
 delete mode 100644 player/src/audio/renderer/index.ts
 rename player/src/audio/{decoder => }/worker.ts (51%)
 rename player/src/audio/{renderer => }/worklet.ts (53%)

diff --git a/player/src/audio/decoder/decoder.ts b/player/src/audio/decoder.ts
similarity index 59%
rename from player/src/audio/decoder/decoder.ts
rename to player/src/audio/decoder.ts
index 47d7fb3..d23bdc3 100644
--- a/player/src/audio/decoder/decoder.ts
+++ b/player/src/audio/decoder.ts
@@ -1,28 +1,23 @@
-import * as Message from "../message";
-import * as MP4 from "../../mp4"
-import * as Stream from "../../stream"
-import * as Util from "../../util"
-import { Ring, RingState } from "../ring"
+import * as Message from "./message";
+import * as MP4 from "../mp4"
+import * as Stream from "../stream"
+import * as Util from "../util"
 
-// Ignore the timestamp output by WebCodecs since it's in microseconds
-// We will manually set the timestamp based on the sample rate.
-let frameCount = 0
+import Renderer from "./renderer"
 
 export default class Decoder {
     // Store the init message for each track
     tracks: Map<string, Util.Deferred<Message.Init>>;
-    sampleRate: number;
-    channels: Ring[];
-    sync?: number; // the first timestamp
+    decoder: AudioDecoder; // TODO one per track
+    sync: Message.Sync;
 
-    constructor(config: Message.Config) {
+    constructor(config: Message.Config, renderer: Renderer) {
         this.tracks = new Map();
-        this.sampleRate = config.sampleRate
-        this.channels = []
-        for (let state of config.channels) {
-            this.channels.push(new Ring(state))
-        }
+        this.decoder = new AudioDecoder({
+            output: renderer.emit.bind(renderer),
+            error: console.warn,
+        });
     }
 
     init(msg: Message.Init) {
@@ -39,14 +34,6 @@ export default class Decoder {
         const track = msg.info.audioTracks[0]
         const audio = track.audio
 
-        if (audio.sample_rate != this.sampleRate) {
-            throw new Error("sample rate not supported")
-        }
-
-        if (audio.channel_count > this.channels.length) {
-            throw new Error("channel count not supported")
-        }
-
         defer.resolve(msg)
     }
 
@@ -61,40 +48,24 @@ export default class Decoder {
         const init = await track.promise;
         const audio = init.info.audioTracks[0]
 
-        const decoder = new AudioDecoder({
-            output: (frame: AudioData) => {
-                for (let i = 0; i < frame.numberOfChannels; i += 1) {
-                    this.channels[i].emit(frameCount, frame, i)
-                }
-
-                frameCount += frame.numberOfFrames;
-            },
-            error: (err: Error) => {
-                console.warn(err)
-            }
-        });
-
-        decoder.configure({
-            codec: audio.codec,
-            numberOfChannels: audio.audio.channel_count,
-            sampleRate: audio.audio.sample_rate,
-            // optimizeForLatency: true
-        })
+        if (this.decoder.state == "unconfigured") {
+            this.decoder.configure({
+                codec: audio.codec,
+                numberOfChannels: audio.audio.channel_count,
+                sampleRate: audio.audio.sample_rate,
+            })
+        }
 
         const input = MP4.New();
         input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => {
             for (let sample of samples) {
-                if (!this.sync) {
-                    this.sync = sample.dts;
-                }
-
-                // Convert to milliseconds
-                const timestamp = 1000 * (sample.dts - this.sync) / sample.timescale
-                const duration = 1000 * sample.duration / sample.timescale
+                // Convert to microseconds
+                const timestamp = 1000 * 1000 * sample.dts / sample.timescale
+                const duration = 1000 * 1000 * sample.duration / sample.timescale
 
                 // This assumes that timescale == sample rate
-                decoder.decode(new EncodedAudioChunk({
+                this.decoder.decode(new EncodedAudioChunk({
                     type: sample.is_sync ? "key" : "delta",
"key" : "delta", data: sample.data, duration: duration, diff --git a/player/src/audio/decoder/index.ts b/player/src/audio/decoder/index.ts deleted file mode 100644 index d181e43..0000000 --- a/player/src/audio/decoder/index.ts +++ /dev/null @@ -1,29 +0,0 @@ -import * as Message from "../message" - -// Wrapper to run the decoder in a Worker -export default class Decoder { - worker: Worker; - - constructor(config: Message.Config) { - const url = new URL('worker.ts', import.meta.url) - this.worker = new Worker(url, { - name: "audio", - type: "module", - }) - - this.worker.onmessage = this.onMessage.bind(this) - this.worker.postMessage({ config }) - } - - init(init: Message.Init) { - this.worker.postMessage({ init }) - } - - segment(segment: Message.Segment) { - this.worker.postMessage({ segment }, [ segment.buffer.buffer, segment.reader ]) - } - - private onMessage(e: MessageEvent) { - // TODO - } -} diff --git a/player/src/audio/index.ts b/player/src/audio/index.ts index 3847d3f..725bc11 100644 --- a/player/src/audio/index.ts +++ b/player/src/audio/index.ts @@ -1,35 +1,77 @@ import * as Message from "./message" import Renderer from "./renderer" import Decoder from "./decoder" -import { RingState } from "./ring" + +import { RingInit } from "./ring" // Abstracts the Worker and Worklet into a simpler API // This class must be created on the main thread due to AudioContext. export default class Audio { - decoder: Decoder; // WebWorker - renderer: Renderer; // AudioWorklet + context: AudioContext; + worker: Worker; + worklet: Promise; constructor() { // Assume 44.1kHz and two audio channels const config = { sampleRate: 44100, - channels: [ new RingState(44100), new RingState(44100) ], + ring: new RingInit(2, 4410), // 100ms at 44.1khz } - // Start loading the worker script - this.decoder = new Decoder(config) - this.renderer = new Renderer(config) + this.context = new AudioContext({ + latencyHint: "interactive", + sampleRate: config.sampleRate, + }) + + this.worker = this.setupWorker(config) + this.worklet = this.setupWorklet(config) + } + + private setupWorker(config: Message.Config): Worker { + const url = new URL('worker.ts', import.meta.url) + const worker = new Worker(url, { + name: "audio", + type: "module", + }) + + worker.postMessage({ config }) + + return worker + } + + private async setupWorklet(config: Message.Config): Promise { + // Load the worklet source code. 
+        const url = new URL('worklet.ts', import.meta.url)
+        await this.context.audioWorklet.addModule(url)
+
+        const volume = this.context.createGain()
+        volume.gain.value = 2.0;
+
+        // Create a worklet
+        const worklet = new AudioWorkletNode(this.context, 'renderer');
+        worklet.onprocessorerror = (e: Event) => {
+            console.error("Audio worklet error:", e)
+        };
+
+        worklet.port.postMessage({ config })
+
+        // Connect the worklet to the volume node and then to the speakers
+        worklet.connect(volume)
+        volume.connect(this.context.destination)
+
+        return worklet
     }
 
     init(init: Message.Init) {
-        this.decoder.init(init)
+        this.worker.postMessage({ init })
     }
 
     segment(segment: Message.Segment) {
-        this.decoder.segment(segment)
+        this.worker.postMessage({ segment }, [ segment.buffer.buffer, segment.reader ])
     }
 
-    play() {
-        this.renderer.play()
+    play(play: Message.Play) {
+        this.context.resume()
+        //this.worker.postMessage({ play })
     }
 }
\ No newline at end of file
diff --git a/player/src/audio/message.ts b/player/src/audio/message.ts
index 752e72c..73d4f1c 100644
--- a/player/src/audio/message.ts
+++ b/player/src/audio/message.ts
@@ -1,9 +1,9 @@
 import * as MP4 from "../mp4"
-import { RingState } from "./ring"
+import { RingInit } from "./ring"
 
 export interface Config {
     sampleRate: number;
-    channels: RingState[];
+    ring: RingInit;
 }
 
 export interface Init {
@@ -16,4 +16,15 @@ export interface Segment {
     track: string;
     buffer: Uint8Array; // unread buffered data
     reader: ReadableStream; // unread unbuffered data
+}
+
+// Audio tells video when the given timestamp should be rendered.
+export interface Sync {
+    origin: number;
+    clock: DOMHighResTimeStamp;
+    timestamp: number;
+}
+
+export interface Play {
+    timestamp?: number;
 }
\ No newline at end of file
diff --git a/player/src/audio/renderer.ts b/player/src/audio/renderer.ts
new file mode 100644
index 0000000..7c5ec9e
--- /dev/null
+++ b/player/src/audio/renderer.ts
@@ -0,0 +1,85 @@
+import * as Message from "./message"
+import { Ring } from "./ring"
+
+export default class Renderer {
+    ring: Ring;
+    queue: Array<AudioData>;
+    sync?: DOMHighResTimeStamp
+    running: number;
+
+    constructor(config: Message.Config) {
+        this.ring = new Ring(config.ring)
+        this.queue = [];
+        this.running = 0
+    }
+
+    emit(frame: AudioData) {
+        if (!this.sync) {
+            // Save the frame as the sync point
+            this.sync = 1000 * performance.now() - frame.timestamp
+        }
+
+        // Insert the frame into the queue sorted by timestamp.
+        if (this.queue.length > 0 && this.queue[this.queue.length-1].timestamp <= frame.timestamp) {
+            // Fast path because we normally append to the end.
+            this.queue.push(frame)
+        } else {
+            // Do a full binary search
+            let low = 0
+            let high = this.queue.length;
+
+            while (low < high) {
+                var mid = (low + high) >>> 1;
+                if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1;
+                else high = mid;
+            }
+
+            this.queue.splice(low, 0, frame)
+        }
+
+        if (!this.running) {
+            // Wait for the next animation frame
+            this.running = self.requestAnimationFrame(this.render.bind(this))
+        }
+    }
+
+    render() {
+        // Determine the target timestamp.
+        const target = 1000 * performance.now() - this.sync!
+
+        // Check if we should skip some frames
+        while (this.queue.length) {
+            const next = this.queue[0]
+            if (next.timestamp >= target) {
+                break
+            }
+
+            console.warn("dropping audio")
+
+            this.queue.shift()
+            next.close()
+        }
+
+        // Push as many as we can to the ring buffer.
+        while (this.queue.length) {
+            let frame = this.queue[0]
+            let ok = this.ring.write(frame)
+            if (!ok) {
+                break
+            }
+
+            frame.close()
+            this.queue.shift()
+        }
+
+        if (this.queue.length) {
+            this.running = self.requestAnimationFrame(this.render.bind(this))
+        } else {
+            this.running = 0
+        }
+    }
+
+    play(play: Message.Play) {
+        this.ring.reset()
+    }
+}
\ No newline at end of file
diff --git a/player/src/audio/renderer/index.ts b/player/src/audio/renderer/index.ts
deleted file mode 100644
index 38bb07e..0000000
--- a/player/src/audio/renderer/index.ts
+++ /dev/null
@@ -1,47 +0,0 @@
-import * as Message from "../message"
-
-export default class Renderer {
-    context: AudioContext;
-    worklet: Promise<AudioWorkletNode>;
-
-    constructor(config: Message.Config) {
-        this.context = new AudioContext({
-            latencyHint: "interactive",
-            sampleRate: config.sampleRate,
-        })
-
-        this.worklet = this.setup(config)
-    }
-
-    private async setup(config: Message.Config): Promise<AudioWorkletNode> {
-        // Load the worklet source code.
-        const url = new URL('worklet.ts', import.meta.url)
-        await this.context.audioWorklet.addModule(url)
-
-        const volume = this.context.createGain()
-        volume.gain.value = 2.0;
-
-        // Create a worklet
-        const worklet = new AudioWorkletNode(this.context, 'renderer');
-        worklet.onprocessorerror = (e: Event) => {
-            console.error("Audio worklet error:", e)
-        };
-
-        worklet.port.onmessage = this.onMessage.bind(this)
-        worklet.port.postMessage({ config })
-
-        // Connect the worklet to the volume node and then to the speakers
-        worklet.connect(volume)
-        volume.connect(this.context.destination)
-
-        return worklet
-    }
-
-    private onMessage(e: MessageEvent) {
-        // TODO
-    }
-
-    play() {
-        this.context.resume()
-    }
-}
\ No newline at end of file
diff --git a/player/src/audio/ring.ts b/player/src/audio/ring.ts
index c6a4a51..cd796ee 100644
--- a/player/src/audio/ring.ts
+++ b/player/src/audio/ring.ts
@@ -1,142 +1,143 @@
 // Ring buffer with audio samples.
 
 enum STATE {
-    START = 0, // First timestamp in the ring buffer.
-    END, // Last timestamp in the ring buffer.
-    LENGTH // Clever way of saving the total number of enums values.
+    READ_INDEX = 0, // Index of the current read position (mod capacity)
+    WRITE_INDEX, // Index of the current write position (mod capacity)
+    LENGTH // Clever way of saving the total number of enums values.
 }
 
 export class Ring {
-    state: RingState;
+    state: Int32Array;
+    channels: Float32Array[];
+    capacity: number;
 
-    constructor(state: RingState) {
-        this.state = state
-    }
+    constructor(init: RingInit) {
+        this.state = new Int32Array(init.state)
 
-    // Add the samples for single audio frame with the given channel
-    emit(timestamp: number, frame: AudioData, channel: number) {
-        let start = timestamp;
-
-        // The number of samples to skip at the start.
-        let offset = this.start - timestamp;
-        if (offset > 0) {
-            console.warn("dropping old samples", offset)
-            start += offset;
-        } else {
-            offset = 0
+        this.channels = []
+        for (let channel of init.channels) {
+            this.channels.push(new Float32Array(channel))
         }
 
-        let count = frame.numberOfFrames - offset;
-        if (count <= 0) {
-            frame.close()
+        this.capacity = init.capacity
+    }
 
-            // Skip the entire frame
+    // Add the samples for single audio frame
+    write(frame: AudioData): boolean {
+        let count = frame.numberOfFrames;
+
+        let readIndex = Atomics.load(this.state, STATE.READ_INDEX)
+        let writeIndex = Atomics.load(this.state, STATE.WRITE_INDEX)
+        let writeIndexNew = writeIndex + count;
+
+        // There's not enough space in the ring buffer
+        if (writeIndexNew - readIndex > this.capacity) {
+            return false
+        }
+
+        let startIndex = writeIndex % this.capacity;
+        let endIndex = writeIndexNew % this.capacity;
+
+        // Loop over each channel
+        for (let i = 0; i < this.channels.length; i += 1) {
+            const channel = this.channels[i]
+
+            if (startIndex < endIndex) {
+                // One continuous range to copy.
+                const full = channel.subarray(startIndex, endIndex)
+
+                frame.copyTo(full, {
+                    planeIndex: i,
+                    frameCount: count,
+                })
+            } else {
+                const first = channel.subarray(startIndex)
+                const second = channel.subarray(0, endIndex)
+
+                frame.copyTo(first, {
+                    planeIndex: i,
+                    frameCount: first.length,
+                })
+
+                frame.copyTo(second, {
+                    planeIndex: i,
+                    frameOffset: first.length,
+                    frameCount: second.length,
+                })
+            }
+        }
+
+        Atomics.store(this.state, STATE.WRITE_INDEX, writeIndexNew)
+
+        return true
+    }
+
+    read(dst: Float32Array[]) {
+        let readIndex = Atomics.load(this.state, STATE.READ_INDEX)
+        let writeIndex = Atomics.load(this.state, STATE.WRITE_INDEX)
+        if (readIndex >= writeIndex) {
+            // nothing to read
             return
         }
 
-        let end = timestamp + count;
-
-        if (end >= start + this.state.capacity) {
-            // The renderer has to buffer frames; we have a fixed capacity.
-            // TODO maybe it's better to buffer here instead.
-            throw new Error("exceeded capacity")
+        let readIndexNew = readIndex + dst[0].length
+        if (readIndexNew > writeIndex) {
+            // Partial read
+            readIndexNew = writeIndex
         }
 
-        const startIndex = start % this.state.capacity;
-        const endIndex = end % this.state.capacity;
+        let startIndex = readIndex % this.capacity;
+        let endIndex = readIndexNew % this.capacity;
 
-        if (startIndex < endIndex) {
-            // One continuous range to copy.
-            const full = new Float32Array(this.state.buffer, 4*startIndex, endIndex-startIndex)
+        // Loop over each channel
+        for (let i = 0; i < dst.length; i += 1) {
+            if (i >= this.channels.length) {
+                // ignore excess channels
+            }
 
-            frame.copyTo(full, {
-                planeIndex: channel,
-                frameOffset: offset,
-                frameCount: count,
-            })
-        } else {
-            // Wrapped around the ring buffer, so we have to copy twice.
-            const wrap = this.state.capacity - startIndex;
+            const input = this.channels[i]
+            const output = dst[i]
 
-            const first = new Float32Array(this.state.buffer, 4*startIndex, this.state.capacity - startIndex)
-            const second = new Float32Array(this.state.buffer, 0, endIndex)
+            if (startIndex < endIndex) {
+                const full = input.subarray(startIndex, endIndex)
+                output.set(full)
+            } else {
+                const first = input.subarray(startIndex)
+                const second = input.subarray(0, endIndex)
 
-            frame.copyTo(first, {
-                planeIndex: channel,
-                frameOffset: offset,
-                frameCount: wrap,
-            })
-
-            frame.copyTo(second, {
-                planeIndex: channel,
-                frameOffset: offset + wrap,
-                frameCount: endIndex,
-            })
+                output.set(first)
+                output.set(second, first.length)
+            }
         }
 
-        if (this.end < end) {
-            this.end = end
-        }
+        Atomics.store(this.state, STATE.READ_INDEX, readIndexNew)
     }
 
-    peek(count: number): Float32Array[] {
-        const start = this.start
-
-        let end = this.end
-        if (end > start + count) {
-            end = start + count
-        }
-
-        const startIndex = start % this.state.capacity;
-        const endIndex = end % this.state.capacity;
-
-        if (startIndex < endIndex) {
-            const full = new Float32Array(this.state.buffer, 4*startIndex, endIndex - startIndex)
-            return [ full ]
-        } else {
-            const first = new Float32Array(this.state.buffer, 4*startIndex, this.state.capacity - startIndex)
-            const second = new Float32Array(this.state.buffer, 0, endIndex)
-            return [ first, second ]
-        }
-    }
-
-    advance(count: number) {
-        this.start += count
-    }
-
-    set start(start: number) {
-        Atomics.store(this.state.stateView, STATE.START, start);
-    }
-
-    get start(): number {
-        return Atomics.load(this.state.stateView, STATE.START);
-    }
-
-    set end(end: number) {
-        Atomics.store(this.state.stateView, STATE.END, end);
-    }
-
-    get end(): number {
-        return Atomics.load(this.state.stateView, STATE.END);
+    // TODO not thread safe
+    clear() {
+        const writeIndex = Atomics.load(this.state, STATE.WRITE_INDEX)
+        Atomics.store(this.state, STATE.READ_INDEX, writeIndex)
     }
 }
 
 // No prototype to make this easier to send via postMessage
-export class RingState {
+export class RingInit {
     state: SharedArrayBuffer;
-    stateView: Int32Array;
-
-    buffer: SharedArrayBuffer;
+    channels: SharedArrayBuffer[];
     capacity: number;
 
-    constructor(capacity: number) {
-        // Store this many samples in a ring buffer.
-        this.buffer = new SharedArrayBuffer(capacity * Float32Array.BYTES_PER_ELEMENT)
-        this.capacity = capacity
-
+    constructor(channels: number, capacity: number) {
         // Store the current state in a separate ring buffer.
         this.state = new SharedArrayBuffer(STATE.LENGTH * Int32Array.BYTES_PER_ELEMENT)
-        this.stateView = new Int32Array(this.state)
+
+        // Create a buffer for each audio channel
+        this.channels = []
+        for (let i = 0; i < channels; i += 1) {
+            const buffer = new SharedArrayBuffer(capacity * Float32Array.BYTES_PER_ELEMENT)
+            this.channels.push(buffer)
+        }
+
+        this.capacity = capacity
     }
 }
\ No newline at end of file
diff --git a/player/src/audio/decoder/worker.ts b/player/src/audio/worker.ts
similarity index 51%
rename from player/src/audio/decoder/worker.ts
rename to player/src/audio/worker.ts
index 3914e14..7ed9003 100644
--- a/player/src/audio/decoder/worker.ts
+++ b/player/src/audio/worker.ts
@@ -1,10 +1,15 @@
 import Decoder from "./decoder"
+import Renderer from "./renderer"
+
+import * as Message from "./message"
 
 let decoder: Decoder
+let renderer: Renderer;
 
 self.addEventListener('message', (e: MessageEvent) => {
     if (e.data.config) {
-        decoder = new Decoder(e.data.config)
+        renderer = new Renderer(e.data.config)
+        decoder = new Decoder(e.data.config, renderer)
     }
 
     if (e.data.init) {
@@ -14,4 +19,8 @@ self.addEventListener('message', (e: MessageEvent) => {
     if (e.data.segment) {
         decoder.decode(e.data.segment)
     }
+
+    if (e.data.play) {
+        renderer.play(e.data.play)
+    }
 })
\ No newline at end of file
diff --git a/player/src/audio/renderer/worklet.ts b/player/src/audio/worklet.ts
similarity index 53%
rename from player/src/audio/renderer/worklet.ts
rename to player/src/audio/worklet.ts
index 068645c..401eec9 100644
--- a/player/src/audio/renderer/worklet.ts
+++ b/player/src/audio/worklet.ts
@@ -2,17 +2,19 @@
 // The renderer copies audio samples to a ring buffer read by this worklet.
 // The worklet then outputs those samples to emit audio.
 
-import * as Message from "../message"
+import * as Message from "./message"
 
-import { Ring, RingState } from "../ring"
+import { Ring } from "./ring"
 
 class Renderer extends AudioWorkletProcessor {
-    channels?: Ring[];
+    ring?: Ring;
+    base: number;
 
     constructor(params: AudioWorkletNodeOptions) {
         // The super constructor call is required.
         super();
 
+        this.base = 0
        this.port.onmessage = this.onMessage.bind(this)
     }
 
@@ -23,15 +25,12 @@ class Renderer extends AudioWorkletProcessor {
     }
 
     config(config: Message.Config) {
-        this.channels = []
-        for (let state of config.channels) {
-            this.channels.push(new Ring(state))
-        }
+        this.ring = new Ring(config.ring)
     }
 
     // Inputs and outputs in groups of 128 samples.
     process(inputs: Float32Array[][], outputs: Float32Array[][], parameters: Record<string, Float32Array>): boolean {
-        if (!this.channels) {
+        if (!this.ring) {
            // Not initialized yet
             return true
         }
 
@@ -41,26 +40,7 @@ class Renderer extends AudioWorkletProcessor {
         }
 
         const output = outputs[0]
-
-        for (let i = 0; i < output.length; i += 1) {
-            const source = this.channels[i]
-            const channel = output[i];
-
-            const parts = source.peek(channel.length)
-
-            let offset = 0
-            for (let i = 0; i < parts.length; i += 1) {
-                channel.set(parts[i], offset)
-                offset += parts[i].length
-            }
-
-            if (offset < channel.length) {
-                // TODO render silence
-            }
-
-            // Always advance the full amount.
-            source.advance(channel.length)
-        }
+        this.ring.read(output)
 
         return true;
     }
diff --git a/player/src/player/index.ts b/player/src/player/index.ts
index 2d44048..00438b2 100644
--- a/player/src/player/index.ts
+++ b/player/src/player/index.ts
@@ -34,7 +34,13 @@ export default class Player {
     }
 
     play() {
-        this.audio.play()
-        this.video.play()
+        this.audio.play({})
+        //this.video.play()
     }
+
+    onMessage(msg: any) {
+        if (msg.sync) {
+            msg.sync
+        }
+    }
 }
\ No newline at end of file
diff --git a/player/src/video/renderer.ts b/player/src/video/renderer.ts
index 71e6a59..98fd3b6 100644
--- a/player/src/video/renderer.ts
+++ b/player/src/video/renderer.ts
@@ -4,17 +4,13 @@ export default class Renderer {
     canvas: OffscreenCanvas;
     queue: Array<VideoFrame>;
     render: number; // non-zero if requestAnimationFrame has been called
-    sync: DOMHighResTimeStamp; // the wall clock value for timestamp 0
+    sync?: DOMHighResTimeStamp; // the wall clock value for timestamp 0
     last?: number; // the timestamp of the last rendered frame
 
-    maxDuration: number; // the maximum duration allowed in the buffer
-
     constructor(config: Message.Config) {
         this.canvas = config.canvas;
         this.queue = [];
         this.render = 0;
-        this.sync = 0;
-        this.maxDuration = 10 * 1000
     }
 
     emit(frame: VideoFrame) {
@@ -30,14 +26,14 @@
         }
 
         // Insert the frame into the queue sorted by timestamp.
-        let low = 0
-        let high = this.queue.length;
-
-        // Fast path because we normally append to the end.
         if (this.queue.length > 0 && this.queue[this.queue.length-1].timestamp <= frame.timestamp) {
+            // Fast path because we normally append to the end.
             this.queue.push(frame)
         } else {
             // Do a full binary search
+            let low = 0
+            let high = this.queue.length;
+
             while (low < high) {
                 var mid = (low + high) >>> 1;
                 if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1;
                 else high = mid;
             }
@@ -47,18 +43,6 @@
             this.queue.splice(low, 0, frame)
         }
 
-        // Trim the max size of the buffer
-        const last = this.queue[this.queue.length-1].timestamp
-        while (1) {
-            const first = this.queue[0]
-            if (first.timestamp + this.maxDuration >= last) {
-                break
-            }
-
-            first.close()
-            this.queue.shift()
-        }
-
         // Queue up to render the next frame.
         if (!this.render) {
             this.render = self.requestAnimationFrame(this.draw.bind(this))
@@ -67,10 +51,10 @@
     draw(now: DOMHighResTimeStamp) {
         // Determine the target timestamp.
-        const target = now - this.sync
+        const target = now - this.sync!
 
         let frame = this.queue[0]
-        if (frame.timestamp > target) {
+        if (frame.timestamp >= target) {
             // nothing to render yet, wait for the next animation frame
             this.render = self.requestAnimationFrame(this.draw.bind(this))
             return
         }
@@ -92,7 +76,7 @@
         }
 
         const ctx = this.canvas.getContext("2d");
-        ctx?.drawImage(frame, 0, 0, this.canvas.width, this.canvas.height) // TODO aspect ratio
+        ctx!.drawImage(frame, 0, 0, this.canvas.width, this.canvas.height) // TODO aspect ratio
 
         this.last = frame.timestamp;
        frame.close()
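
Not part of the patch: a rough, single-threaded sketch of how the new Ring / RingInit pair is meant to be used. In the player the worker-side Renderer is the writer and the AudioWorklet is the reader; this just exercises the same calls in one place. It assumes a cross-origin-isolated page (so SharedArrayBuffer exists) and a browser with WebCodecs (for the AudioData constructor).

import { Ring, RingInit } from "./ring"

// One RingInit owns the SharedArrayBuffers; it is posted to the worker and the
// worklet, and each side wraps the same buffers in its own Ring instance.
const init = new RingInit(2, 4410)   // 2 channels, 100ms at 44.1kHz
const producer = new Ring(init)      // worker side (Renderer)
const consumer = new Ring(init)      // worklet side

// 128 frames of planar f32 silence; WebCodecs timestamps are in microseconds.
const frame = new AudioData({
    format: "f32-planar",
    sampleRate: 44100,
    numberOfFrames: 128,
    numberOfChannels: 2,
    timestamp: 0,
    data: new Float32Array(2 * 128),
})

// write() returns false when the frame does not fit, so the caller can keep it queued.
if (!producer.write(frame)) console.warn("ring full, keeping frame queued")
frame.close()

// The worklet passes outputs[0] here; a partial read leaves the tail of the
// already-zeroed output buffers untouched, i.e. silence.
const out = [new Float32Array(128), new Float32Array(128)]
consumer.read(out)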