Good enough 5 me.

This commit is contained in:
Luke Curley 2023-04-06 13:55:35 -07:00
parent 9e29d552a0
commit 948d2ea1f1
11 changed files with 315 additions and 302 deletions

View File

@ -1,28 +1,23 @@
import * as Message from "../message"; import * as Message from "./message";
import * as MP4 from "../../mp4" import * as MP4 from "../mp4"
import * as Stream from "../../stream" import * as Stream from "../stream"
import * as Util from "../../util" import * as Util from "../util"
import { Ring, RingState } from "../ring"
// Ignore the timestamp output by WebCodecs since it's in microseconds import Renderer from "./renderer"
// We will manually set the timestamp based on the sample rate.
let frameCount = 0
export default class Decoder { export default class Decoder {
// Store the init message for each track // Store the init message for each track
tracks: Map<string, Util.Deferred<Message.Init>>; tracks: Map<string, Util.Deferred<Message.Init>>;
sampleRate: number; decoder: AudioDecoder; // TODO one per track
channels: Ring[]; sync: Message.Sync;
sync?: number; // the first timestamp
constructor(config: Message.Config) { constructor(config: Message.Config, renderer: Renderer) {
this.tracks = new Map(); this.tracks = new Map();
this.sampleRate = config.sampleRate
this.channels = [] this.decoder = new AudioDecoder({
for (let state of config.channels) { output: renderer.emit.bind(renderer),
this.channels.push(new Ring(state)) error: console.warn,
} });
} }
init(msg: Message.Init) { init(msg: Message.Init) {
@ -39,14 +34,6 @@ export default class Decoder {
const track = msg.info.audioTracks[0] const track = msg.info.audioTracks[0]
const audio = track.audio const audio = track.audio
if (audio.sample_rate != this.sampleRate) {
throw new Error("sample rate not supported")
}
if (audio.channel_count > this.channels.length) {
throw new Error("channel count not supported")
}
defer.resolve(msg) defer.resolve(msg)
} }
@ -61,40 +48,24 @@ export default class Decoder {
const init = await track.promise; const init = await track.promise;
const audio = init.info.audioTracks[0] const audio = init.info.audioTracks[0]
const decoder = new AudioDecoder({ if (this.decoder.state == "unconfigured") {
output: (frame: AudioData) => { this.decoder.configure({
for (let i = 0; i < frame.numberOfChannels; i += 1) {
this.channels[i].emit(frameCount, frame, i)
}
frameCount += frame.numberOfFrames;
},
error: (err: Error) => {
console.warn(err)
}
});
decoder.configure({
codec: audio.codec, codec: audio.codec,
numberOfChannels: audio.audio.channel_count, numberOfChannels: audio.audio.channel_count,
sampleRate: audio.audio.sample_rate, sampleRate: audio.audio.sample_rate,
// optimizeForLatency: true
}) })
}
const input = MP4.New(); const input = MP4.New();
input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => { input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => {
for (let sample of samples) { for (let sample of samples) {
if (!this.sync) { // Convert to microseconds
this.sync = sample.dts; const timestamp = 1000 * 1000 * sample.dts / sample.timescale
} const duration = 1000 * 1000 * sample.duration / sample.timescale
// Convert to milliseconds
const timestamp = 1000 * (sample.dts - this.sync) / sample.timescale
const duration = 1000 * sample.duration / sample.timescale
// This assumes that timescale == sample rate // This assumes that timescale == sample rate
decoder.decode(new EncodedAudioChunk({ this.decoder.decode(new EncodedAudioChunk({
type: sample.is_sync ? "key" : "delta", type: sample.is_sync ? "key" : "delta",
data: sample.data, data: sample.data,
duration: duration, duration: duration,

View File

@ -1,29 +0,0 @@
import * as Message from "../message"
// Wrapper to run the decoder in a Worker
export default class Decoder {
worker: Worker;
constructor(config: Message.Config) {
const url = new URL('worker.ts', import.meta.url)
this.worker = new Worker(url, {
name: "audio",
type: "module",
})
this.worker.onmessage = this.onMessage.bind(this)
this.worker.postMessage({ config })
}
init(init: Message.Init) {
this.worker.postMessage({ init })
}
segment(segment: Message.Segment) {
this.worker.postMessage({ segment }, [ segment.buffer.buffer, segment.reader ])
}
private onMessage(e: MessageEvent) {
// TODO
}
}

View File

@ -1,35 +1,77 @@
import * as Message from "./message" import * as Message from "./message"
import Renderer from "./renderer" import Renderer from "./renderer"
import Decoder from "./decoder" import Decoder from "./decoder"
import { RingState } from "./ring"
import { RingInit } from "./ring"
// Abstracts the Worker and Worklet into a simpler API // Abstracts the Worker and Worklet into a simpler API
// This class must be created on the main thread due to AudioContext. // This class must be created on the main thread due to AudioContext.
export default class Audio { export default class Audio {
decoder: Decoder; // WebWorker context: AudioContext;
renderer: Renderer; // AudioWorklet worker: Worker;
worklet: Promise<AudioWorkletNode>;
constructor() { constructor() {
// Assume 44.1kHz and two audio channels // Assume 44.1kHz and two audio channels
const config = { const config = {
sampleRate: 44100, sampleRate: 44100,
channels: [ new RingState(44100), new RingState(44100) ], ring: new RingInit(2, 4410), // 100ms at 44.1khz
} }
// Start loading the worker script this.context = new AudioContext({
this.decoder = new Decoder(config) latencyHint: "interactive",
this.renderer = new Renderer(config) sampleRate: config.sampleRate,
})
this.worker = this.setupWorker(config)
this.worklet = this.setupWorklet(config)
}
private setupWorker(config: Message.Config): Worker {
const url = new URL('worker.ts', import.meta.url)
const worker = new Worker(url, {
name: "audio",
type: "module",
})
worker.postMessage({ config })
return worker
}
private async setupWorklet(config: Message.Config): Promise<AudioWorkletNode> {
// Load the worklet source code.
const url = new URL('worklet.ts', import.meta.url)
await this.context.audioWorklet.addModule(url)
const volume = this.context.createGain()
volume.gain.value = 2.0;
// Create a worklet
const worklet = new AudioWorkletNode(this.context, 'renderer');
worklet.onprocessorerror = (e: Event) => {
console.error("Audio worklet error:", e)
};
worklet.port.postMessage({ config })
// Connect the worklet to the volume node and then to the speakers
worklet.connect(volume)
volume.connect(this.context.destination)
return worklet
} }
init(init: Message.Init) { init(init: Message.Init) {
this.decoder.init(init) this.worker.postMessage({ init })
} }
segment(segment: Message.Segment) { segment(segment: Message.Segment) {
this.decoder.segment(segment) this.worker.postMessage({ segment }, [ segment.buffer.buffer, segment.reader ])
} }
play() { play(play: Message.Play) {
this.renderer.play() this.context.resume()
//this.worker.postMessage({ play })
} }
} }

View File

@ -1,9 +1,9 @@
import * as MP4 from "../mp4" import * as MP4 from "../mp4"
import { RingState } from "./ring" import { RingInit } from "./ring"
export interface Config { export interface Config {
sampleRate: number; sampleRate: number;
channels: RingState[]; ring: RingInit;
} }
export interface Init { export interface Init {
@ -17,3 +17,14 @@ export interface Segment {
buffer: Uint8Array; // unread buffered data buffer: Uint8Array; // unread buffered data
reader: ReadableStream; // unread unbuffered data reader: ReadableStream; // unread unbuffered data
} }
// Audio tells video when the given timestamp should be rendered.
export interface Sync {
origin: number;
clock: DOMHighResTimeStamp;
timestamp: number;
}
export interface Play {
timestamp?: number;
}

View File

@ -0,0 +1,85 @@
import * as Message from "./message"
import { Ring } from "./ring"
export default class Renderer {
ring: Ring;
queue: Array<AudioData>;
sync?: DOMHighResTimeStamp
running: number;
constructor(config: Message.Config) {
this.ring = new Ring(config.ring)
this.queue = [];
this.running = 0
}
emit(frame: AudioData) {
if (!this.sync) {
// Save the frame as the sync point
this.sync = 1000 * performance.now() - frame.timestamp
}
// Insert the frame into the queue sorted by timestamp.
if (this.queue.length > 0 && this.queue[this.queue.length-1].timestamp <= frame.timestamp) {
// Fast path because we normally append to the end.
this.queue.push(frame)
} else {
// Do a full binary search
let low = 0
let high = this.queue.length;
while (low < high) {
var mid = (low + high) >>> 1;
if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1;
else high = mid;
}
this.queue.splice(low, 0, frame)
}
if (!this.running) {
// Wait for the next animation frame
this.running = self.requestAnimationFrame(this.render.bind(this))
}
}
render() {
// Determine the target timestamp.
const target = 1000 * performance.now() - this.sync!
// Check if we should skip some frames
while (this.queue.length) {
const next = this.queue[0]
if (next.timestamp >= target) {
break
}
console.warn("dropping audio")
this.queue.shift()
next.close()
}
// Push as many as we can to the ring buffer.
while (this.queue.length) {
let frame = this.queue[0]
let ok = this.ring.write(frame)
if (!ok) {
break
}
frame.close()
this.queue.shift()
}
if (this.queue.length) {
this.running = self.requestAnimationFrame(this.render.bind(this))
} else {
this.running = 0
}
}
play(play: Message.Play) {
this.ring.reset()
}
}

View File

@ -1,47 +0,0 @@
import * as Message from "../message"
export default class Renderer {
context: AudioContext;
worklet: Promise<AudioWorkletNode>;
constructor(config: Message.Config) {
this.context = new AudioContext({
latencyHint: "interactive",
sampleRate: config.sampleRate,
})
this.worklet = this.setup(config)
}
private async setup(config: Message.Config): Promise<AudioWorkletNode> {
// Load the worklet source code.
const url = new URL('worklet.ts', import.meta.url)
await this.context.audioWorklet.addModule(url)
const volume = this.context.createGain()
volume.gain.value = 2.0;
// Create a worklet
const worklet = new AudioWorkletNode(this.context, 'renderer');
worklet.onprocessorerror = (e: Event) => {
console.error("Audio worklet error:", e)
};
worklet.port.onmessage = this.onMessage.bind(this)
worklet.port.postMessage({ config })
// Connect the worklet to the volume node and then to the speakers
worklet.connect(volume)
volume.connect(this.context.destination)
return worklet
}
private onMessage(e: MessageEvent) {
// TODO
}
play() {
this.context.resume()
}
}

View File

@ -1,142 +1,143 @@
// Ring buffer with audio samples. // Ring buffer with audio samples.
enum STATE { enum STATE {
START = 0, // First timestamp in the ring buffer. READ_INDEX = 0, // Index of the current read position (mod capacity)
END, // Last timestamp in the ring buffer. WRITE_INDEX, // Index of the current write position (mod capacity)
LENGTH // Clever way of saving the total number of enums values. LENGTH // Clever way of saving the total number of enums values.
} }
export class Ring { export class Ring {
state: RingState; state: Int32Array;
channels: Float32Array[];
capacity: number;
constructor(state: RingState) { constructor(init: RingInit) {
this.state = state this.state = new Int32Array(init.state)
this.channels = []
for (let channel of init.channels) {
this.channels.push(new Float32Array(channel))
} }
// Add the samples for single audio frame with the given channel this.capacity = init.capacity
emit(timestamp: number, frame: AudioData, channel: number) {
let start = timestamp;
// The number of samples to skip at the start.
let offset = this.start - timestamp;
if (offset > 0) {
console.warn("dropping old samples", offset)
start += offset;
} else {
offset = 0
} }
let count = frame.numberOfFrames - offset; // Add the samples for single audio frame
if (count <= 0) { write(frame: AudioData): boolean {
frame.close() let count = frame.numberOfFrames;
// Skip the entire frame let readIndex = Atomics.load(this.state, STATE.READ_INDEX)
return let writeIndex = Atomics.load(this.state, STATE.WRITE_INDEX)
let writeIndexNew = writeIndex + count;
// There's not enough space in the ring buffer
if (writeIndexNew - readIndex > this.capacity) {
return false
} }
let end = timestamp + count; let startIndex = writeIndex % this.capacity;
let endIndex = writeIndexNew % this.capacity;
if (end >= start + this.state.capacity) { // Loop over each channel
// The renderer has to buffer frames; we have a fixed capacity. for (let i = 0; i < this.channels.length; i += 1) {
// TODO maybe it's better to buffer here instead. const channel = this.channels[i]
throw new Error("exceeded capacity")
}
const startIndex = start % this.state.capacity;
const endIndex = end % this.state.capacity;
if (startIndex < endIndex) { if (startIndex < endIndex) {
// One continuous range to copy. // One continuous range to copy.
const full = new Float32Array(this.state.buffer, 4*startIndex, endIndex-startIndex) const full = channel.subarray(startIndex, endIndex)
frame.copyTo(full, { frame.copyTo(full, {
planeIndex: channel, planeIndex: i,
frameOffset: offset,
frameCount: count, frameCount: count,
}) })
} else { } else {
// Wrapped around the ring buffer, so we have to copy twice. const first = channel.subarray(startIndex)
const wrap = this.state.capacity - startIndex; const second = channel.subarray(0, endIndex)
const first = new Float32Array(this.state.buffer, 4*startIndex, this.state.capacity - startIndex)
const second = new Float32Array(this.state.buffer, 0, endIndex)
frame.copyTo(first, { frame.copyTo(first, {
planeIndex: channel, planeIndex: i,
frameOffset: offset, frameCount: first.length,
frameCount: wrap,
}) })
frame.copyTo(second, { frame.copyTo(second, {
planeIndex: channel, planeIndex: i,
frameOffset: offset + wrap, frameOffset: first.length,
frameCount: endIndex, frameCount: second.length,
}) })
} }
if (this.end < end) {
this.end = end
}
} }
peek(count: number): Float32Array[] { Atomics.store(this.state, STATE.WRITE_INDEX, writeIndexNew)
const start = this.start
let end = this.end return true
if (end > start + count) {
end = start + count
} }
const startIndex = start % this.state.capacity; read(dst: Float32Array[]) {
const endIndex = end % this.state.capacity; let readIndex = Atomics.load(this.state, STATE.READ_INDEX)
let writeIndex = Atomics.load(this.state, STATE.WRITE_INDEX)
if (readIndex >= writeIndex) {
// nothing to read
return
}
let readIndexNew = readIndex + dst[0].length
if (readIndexNew > writeIndex) {
// Partial read
readIndexNew = writeIndex
}
let startIndex = readIndex % this.capacity;
let endIndex = readIndexNew % this.capacity;
// Loop over each channel
for (let i = 0; i < dst.length; i += 1) {
if (i >= this.channels.length) {
// ignore excess channels
}
const input = this.channels[i]
const output = dst[i]
if (startIndex < endIndex) { if (startIndex < endIndex) {
const full = new Float32Array(this.state.buffer, 4*startIndex, endIndex - startIndex) const full = input.subarray(startIndex, endIndex)
return [ full ] output.set(full)
} else { } else {
const first = new Float32Array(this.state.buffer, 4*startIndex, this.state.capacity - startIndex) const first = input.subarray(startIndex)
const second = new Float32Array(this.state.buffer, 0, endIndex) const second = input.subarray(0, endIndex)
return [ first, second ]
output.set(first)
output.set(second, first.length)
} }
} }
advance(count: number) { Atomics.store(this.state, STATE.READ_INDEX, readIndexNew)
this.start += count
} }
set start(start: number) { // TODO not thread safe
Atomics.store(this.state.stateView, STATE.START, start); clear() {
} const writeIndex = Atomics.load(this.state, STATE.WRITE_INDEX)
Atomics.store(this.state, STATE.READ_INDEX, writeIndex)
get start(): number {
return Atomics.load(this.state.stateView, STATE.START);
}
set end(end: number) {
Atomics.store(this.state.stateView, STATE.END, end);
}
get end(): number {
return Atomics.load(this.state.stateView, STATE.END);
} }
} }
// No prototype to make this easier to send via postMessage // No prototype to make this easier to send via postMessage
export class RingState { export class RingInit {
state: SharedArrayBuffer; state: SharedArrayBuffer;
stateView: Int32Array;
buffer: SharedArrayBuffer;
channels: SharedArrayBuffer[];
capacity: number; capacity: number;
constructor(capacity: number) { constructor(channels: number, capacity: number) {
// Store this many samples in a ring buffer.
this.buffer = new SharedArrayBuffer(capacity * Float32Array.BYTES_PER_ELEMENT)
this.capacity = capacity
// Store the current state in a separate ring buffer. // Store the current state in a separate ring buffer.
this.state = new SharedArrayBuffer(STATE.LENGTH * Int32Array.BYTES_PER_ELEMENT) this.state = new SharedArrayBuffer(STATE.LENGTH * Int32Array.BYTES_PER_ELEMENT)
this.stateView = new Int32Array(this.state)
// Create a buffer for each audio channel
this.channels = []
for (let i = 0; i < channels; i += 1) {
const buffer = new SharedArrayBuffer(capacity * Float32Array.BYTES_PER_ELEMENT)
this.channels.push(buffer)
}
this.capacity = capacity
} }
} }

View File

@ -1,10 +1,15 @@
import Decoder from "./decoder" import Decoder from "./decoder"
import Renderer from "./renderer"
import * as Message from "./message"
let decoder: Decoder let decoder: Decoder
let renderer: Renderer;
self.addEventListener('message', (e: MessageEvent) => { self.addEventListener('message', (e: MessageEvent) => {
if (e.data.config) { if (e.data.config) {
decoder = new Decoder(e.data.config) renderer = new Renderer(e.data.config)
decoder = new Decoder(e.data.config, renderer)
} }
if (e.data.init) { if (e.data.init) {
@ -14,4 +19,8 @@ self.addEventListener('message', (e: MessageEvent) => {
if (e.data.segment) { if (e.data.segment) {
decoder.decode(e.data.segment) decoder.decode(e.data.segment)
} }
if (e.data.play) {
renderer.play(e.data.play)
}
}) })

View File

@ -2,17 +2,19 @@
// The renderer copies audio samples to a ring buffer read by this worklet. // The renderer copies audio samples to a ring buffer read by this worklet.
// The worklet then outputs those samples to emit audio. // The worklet then outputs those samples to emit audio.
import * as Message from "../message" import * as Message from "./message"
import { Ring, RingState } from "../ring" import { Ring } from "./ring"
class Renderer extends AudioWorkletProcessor { class Renderer extends AudioWorkletProcessor {
channels?: Ring[]; ring?: Ring;
base: number;
constructor(params: AudioWorkletNodeOptions) { constructor(params: AudioWorkletNodeOptions) {
// The super constructor call is required. // The super constructor call is required.
super(); super();
this.base = 0
this.port.onmessage = this.onMessage.bind(this) this.port.onmessage = this.onMessage.bind(this)
} }
@ -23,15 +25,12 @@ class Renderer extends AudioWorkletProcessor {
} }
config(config: Message.Config) { config(config: Message.Config) {
this.channels = [] this.ring = new Ring(config.ring)
for (let state of config.channels) {
this.channels.push(new Ring(state))
}
} }
// Inputs and outputs in groups of 128 samples. // Inputs and outputs in groups of 128 samples.
process(inputs: Float32Array[][], outputs: Float32Array[][], parameters: Record<string, Float32Array>): boolean { process(inputs: Float32Array[][], outputs: Float32Array[][], parameters: Record<string, Float32Array>): boolean {
if (!this.channels) { if (!this.ring) {
// Not initialized yet // Not initialized yet
return true return true
} }
@ -41,26 +40,7 @@ class Renderer extends AudioWorkletProcessor {
} }
const output = outputs[0] const output = outputs[0]
this.ring.read(output)
for (let i = 0; i < output.length; i += 1) {
const source = this.channels[i]
const channel = output[i];
const parts = source.peek(channel.length)
let offset = 0
for (let i = 0; i < parts.length; i += 1) {
channel.set(parts[i], offset)
offset += parts[i].length
}
if (offset < channel.length) {
// TODO render silence
}
// Always advance the full amount.
source.advance(channel.length)
}
return true; return true;
} }

View File

@ -34,7 +34,13 @@ export default class Player {
} }
play() { play() {
this.audio.play() this.audio.play({})
this.video.play() //this.video.play()
}
onMessage(msg: any) {
if (msg.sync) {
msg.sync
}
} }
} }

View File

@ -4,17 +4,13 @@ export default class Renderer {
canvas: OffscreenCanvas; canvas: OffscreenCanvas;
queue: Array<VideoFrame>; queue: Array<VideoFrame>;
render: number; // non-zero if requestAnimationFrame has been called render: number; // non-zero if requestAnimationFrame has been called
sync: DOMHighResTimeStamp; // the wall clock value for timestamp 0 sync?: DOMHighResTimeStamp; // the wall clock value for timestamp 0
last?: number; // the timestamp of the last rendered frame last?: number; // the timestamp of the last rendered frame
maxDuration: number; // the maximum duration allowed in the buffer
constructor(config: Message.Config) { constructor(config: Message.Config) {
this.canvas = config.canvas; this.canvas = config.canvas;
this.queue = []; this.queue = [];
this.render = 0; this.render = 0;
this.sync = 0;
this.maxDuration = 10 * 1000
} }
emit(frame: VideoFrame) { emit(frame: VideoFrame) {
@ -30,14 +26,14 @@ export default class Renderer {
} }
// Insert the frame into the queue sorted by timestamp. // Insert the frame into the queue sorted by timestamp.
let low = 0
let high = this.queue.length;
// Fast path because we normally append to the end.
if (this.queue.length > 0 && this.queue[this.queue.length-1].timestamp <= frame.timestamp) { if (this.queue.length > 0 && this.queue[this.queue.length-1].timestamp <= frame.timestamp) {
// Fast path because we normally append to the end.
this.queue.push(frame) this.queue.push(frame)
} else { } else {
// Do a full binary search // Do a full binary search
let low = 0
let high = this.queue.length;
while (low < high) { while (low < high) {
var mid = (low + high) >>> 1; var mid = (low + high) >>> 1;
if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1; if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1;
@ -47,18 +43,6 @@ export default class Renderer {
this.queue.splice(low, 0, frame) this.queue.splice(low, 0, frame)
} }
// Trim the max size of the buffer
const last = this.queue[this.queue.length-1].timestamp
while (1) {
const first = this.queue[0]
if (first.timestamp + this.maxDuration >= last) {
break
}
first.close()
this.queue.shift()
}
// Queue up to render the next frame. // Queue up to render the next frame.
if (!this.render) { if (!this.render) {
this.render = self.requestAnimationFrame(this.draw.bind(this)) this.render = self.requestAnimationFrame(this.draw.bind(this))
@ -67,10 +51,10 @@ export default class Renderer {
draw(now: DOMHighResTimeStamp) { draw(now: DOMHighResTimeStamp) {
// Determine the target timestamp. // Determine the target timestamp.
const target = now - this.sync const target = now - this.sync!
let frame = this.queue[0] let frame = this.queue[0]
if (frame.timestamp > target) { if (frame.timestamp >= target) {
// nothing to render yet, wait for the next animation frame // nothing to render yet, wait for the next animation frame
this.render = self.requestAnimationFrame(this.draw.bind(this)) this.render = self.requestAnimationFrame(this.draw.bind(this))
return return
@ -92,7 +76,7 @@ export default class Renderer {
} }
const ctx = this.canvas.getContext("2d"); const ctx = this.canvas.getContext("2d");
ctx?.drawImage(frame, 0, 0, this.canvas.width, this.canvas.height) // TODO aspect ratio ctx!.drawImage(frame, 0, 0, this.canvas.width, this.canvas.height) // TODO aspect ratio
this.last = frame.timestamp; this.last = frame.timestamp;
frame.close() frame.close()