Audio worksish.

This commit is contained in:
Luke Curley 2023-04-03 12:48:02 -07:00
parent a5278e6e04
commit 9e29d552a0
24 changed files with 468 additions and 384 deletions

View File

@ -1,33 +1,53 @@
import * as Message from "./message";
import * as MP4 from "../mp4"
import * as Stream from "../stream"
import * as Util from "../util"
import * as Message from "../message";
import * as MP4 from "../../mp4"
import * as Stream from "../../stream"
import * as Util from "../../util"
import { Ring, RingState } from "../ring"
import { Renderer } from "./renderer"
// Ignore the timestamp output by WebCodecs since it's in microseconds
// We will manually set the timestamp based on the sample rate.
let frameCount = 0
export class Decoder {
export default class Decoder {
// Store the init message for each track
tracks: Map<string, Util.Deferred<Message.Init>>
renderer: Renderer;
tracks: Map<string, Util.Deferred<Message.Init>>;
sampleRate: number;
channels: Ring[];
sync?: number; // the first timestamp
constructor(config: Message.Config, renderer: Renderer) {
constructor(config: Message.Config) {
this.tracks = new Map();
this.renderer = renderer;
this.sampleRate = config.sampleRate
this.channels = []
for (let state of config.channels) {
this.channels.push(new Ring(state))
}
}
async init(msg: Message.Init) {
let track = this.tracks.get(msg.track);
if (!track) {
track = new Util.Deferred()
this.tracks.set(msg.track, track)
init(msg: Message.Init) {
let defer = this.tracks.get(msg.track);
if (!defer) {
defer = new Util.Deferred()
this.tracks.set(msg.track, defer)
}
console.log(msg.info)
if (msg.info.audioTracks.length != 1 || msg.info.videoTracks.length != 0) {
throw new Error("Expected a single audio track")
}
track.resolve(msg)
const track = msg.info.audioTracks[0]
const audio = track.audio
if (audio.sample_rate != this.sampleRate) {
throw new Error("sample rate not supported")
}
if (audio.channel_count > this.channels.length) {
throw new Error("channel count not supported")
}
defer.resolve(msg)
}
async decode(msg: Message.Segment) {
@ -39,18 +59,20 @@ export class Decoder {
// Wait for the init segment to be fully received and parsed
const init = await track.promise;
const info = init.info;
const audio = info.audioTracks[0]
const audio = init.info.audioTracks[0]
const decoder = new AudioDecoder({
output: (frame: AudioData) => {
this.renderer.emit(frame)
for (let i = 0; i < frame.numberOfChannels; i += 1) {
this.channels[i].emit(frameCount, frame, i)
}
frameCount += frame.numberOfFrames;
},
error: (err: Error) => {
console.warn(err)
}
});
console.log(audio)
decoder.configure({
codec: audio.codec,
@ -63,12 +85,20 @@ export class Decoder {
input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => {
for (let sample of samples) {
// TODO this assumes that timescale == sample rate
if (!this.sync) {
this.sync = sample.dts;
}
// Convert to milliseconds
const timestamp = 1000 * (sample.dts - this.sync) / sample.timescale
const duration = 1000 * sample.duration / sample.timescale
// This assumes that timescale == sample rate
decoder.decode(new EncodedAudioChunk({
type: sample.is_sync ? "key" : "delta",
data: sample.data,
duration: sample.duration,
timestamp: sample.dts,
duration: duration,
timestamp: timestamp,
}))
}
}

View File

@ -0,0 +1,29 @@
import * as Message from "../message"
/**
 * Main-thread handle for the audio decoder.
 *
 * The decoding itself happens in a dedicated Worker; this class only spawns
 * the worker and forwards messages to it.
 */
export default class Decoder {
	worker: Worker;

	/** Spawns the "audio" module worker and sends it the configuration. */
	constructor(config: Message.Config) {
		// Resolve the worker script relative to this module.
		const url = new URL('worker.ts', import.meta.url)

		const worker = new Worker(url, {
			name: "audio",
			type: "module",
		})
		worker.onmessage = (e: MessageEvent) => this.onMessage(e)

		this.worker = worker
		this.worker.postMessage({ config })
	}

	/** Forwards an init message to the worker. */
	init(init: Message.Init) {
		this.worker.postMessage({ init })
	}

	/**
	 * Forwards a media segment to the worker.
	 * The buffered bytes and the stream are listed as transferables, so they
	 * are moved to the worker rather than copied.
	 */
	segment(segment: Message.Segment) {
		const message = { segment }
		this.worker.postMessage(message, [ segment.buffer.buffer, segment.reader ])
	}

	/** Handler for messages posted back by the worker; nothing is handled yet. */
	private onMessage(e: MessageEvent) {
		// TODO
	}
}

View File

@ -0,0 +1,17 @@
import Decoder from "./decoder"
// Created by the first `config` message; undefined until then.
let decoder: Decoder | undefined

/**
 * Worker entry point: dispatches messages from the main thread to the decoder.
 *
 * A message may carry any of `config`, `init` and `segment`; they are handled
 * in that order. `init` or `segment` arriving before any `config` is reported
 * with an explicit error instead of a TypeError on an undefined decoder.
 */
self.addEventListener('message', (e: MessageEvent) => {
	if (e.data.config) {
		decoder = new Decoder(e.data.config)
	}

	if (e.data.init) {
		if (!decoder) {
			throw new Error("received init before config")
		}

		decoder.init(e.data.init)
	}

	if (e.data.segment) {
		if (!decoder) {
			throw new Error("received segment before config")
		}

		// decode() is async; report failures here rather than leaving an
		// unhandled promise rejection in the worker.
		decoder.decode(e.data.segment).catch((err: unknown) => {
			console.error("failed to decode audio segment:", err)
		})
	}
})

View File

@ -1,21 +1,35 @@
import * as Message from "./message"
import { Renderer } from "./renderer"
import { Decoder } from "./decoder"
import Renderer from "./renderer"
import Decoder from "./decoder"
import { RingState } from "./ring"
// Abstracts the Worker and Worklet into a simpler API
// This class must be created on the main thread due to AudioContext.
export default class Audio {
renderer: Renderer;
decoder: Decoder;
decoder: Decoder; // WebWorker
renderer: Renderer; // AudioWorklet
constructor(config: Message.Config) {
constructor() {
// Assume 44.1kHz and two audio channels
const config = {
sampleRate: 44100,
channels: [ new RingState(44100), new RingState(44100) ],
}
// Start loading the worker script
this.decoder = new Decoder(config)
this.renderer = new Renderer(config)
this.decoder = new Decoder(config, this.renderer)
}
async init(init: Message.Init) {
await this.decoder.init(init)
init(init: Message.Init) {
this.decoder.init(init)
}
async segment(segment: Message.Segment) {
await this.decoder.decode(segment)
segment(segment: Message.Segment) {
this.decoder.segment(segment)
}
play() {
this.renderer.play()
}
}

View File

@ -1,17 +1,19 @@
import * as MP4 from "../mp4"
import { RingState } from "./ring"
export interface Config {
ctx: AudioContext;
sampleRate: number;
channels: RingState[];
}
export interface Init {
track: string;
info: MP4.Info;
raw: MP4.ArrayBufferOffset[];
raw: MP4.ArrayBuffer[];
}
export interface Segment {
track: string;
buffer: Uint8Array; // unread buffered data
reader: ReadableStream; // unread unbuffered data
}
}

View File

@ -1,24 +0,0 @@
import * as Message from "./message";
import Source from "./source";
/** Hands decoded audio frames to the worklet-backed Source. */
export class Renderer {
	source: Source;
	render = 0; // non-zero if requestAnimationFrame has been called
	last?: number; // the timestamp of the last rendered frame
	maxDuration = 10 * 1000; // the maximum duration allowed in the buffer

	/** Builds the Source on top of the AudioContext supplied in the config. */
	constructor(config: Message.Config) {
		// TODO evaluate { latencyHint: "interactive" }
		this.source = new Source(config.ctx)
	}

	/** Passes a decoded frame straight through to the source. */
	emit(frame: AudioData) {
		this.source.emit(frame)
	}
}

View File

@ -0,0 +1,47 @@
import * as Message from "../message"
/**
 * Owns the AudioContext and the AudioWorklet node that plays the samples.
 * Must be constructed where AudioContext is available.
 */
export default class Renderer {
	context: AudioContext;
	worklet: Promise<AudioWorkletNode>;

	/** Creates the context at the configured sample rate and starts loading the worklet. */
	constructor(config: Message.Config) {
		const options: AudioContextOptions = {
			latencyHint: "interactive",
			sampleRate: config.sampleRate,
		}

		this.context = new AudioContext(options)
		this.worklet = this.setup(config)
	}

	/**
	 * Loads the worklet module, builds the worklet -> gain -> destination
	 * graph and sends the configuration to the processor.
	 */
	private async setup(config: Message.Config): Promise<AudioWorkletNode> {
		// Load the worklet source code.
		const url = new URL('worklet.ts', import.meta.url)
		await this.context.audioWorklet.addModule(url)

		// Gain stage sitting between the worklet and the speakers.
		const gain = this.context.createGain()
		gain.gain.value = 2.0;

		// Instantiate the processor registered under the name 'renderer'.
		const node = new AudioWorkletNode(this.context, 'renderer');
		node.onprocessorerror = (e: Event) => {
			console.error("Audio worklet error:", e)
		};
		node.port.onmessage = (e: MessageEvent) => this.onMessage(e)
		node.port.postMessage({ config })

		// connect() returns its destination node, so the graph can be chained:
		// worklet -> gain -> speakers
		node.connect(gain).connect(this.context.destination)

		return node
	}

	/** Handler for messages posted by the worklet; nothing is handled yet. */
	private onMessage(e: MessageEvent) {
		// TODO
	}

	/** Resumes the audio context; the returned promise is intentionally not awaited. */
	play() {
		void this.context.resume()
	}
}

View File

@ -0,0 +1,69 @@
// This is an AudioWorklet that acts as a media source.
// The renderer copies audio samples to a ring buffer read by this worklet.
// The worklet then outputs those samples to emit audio.
import * as Message from "../message"
import { Ring, RingState } from "../ring"
/**
 * AudioWorklet processor that plays samples out of shared ring buffers.
 *
 * The rings are attached by a `config` message on the processor port; until
 * that arrives process() leaves its output untouched.
 */
class Renderer extends AudioWorkletProcessor {
	// One ring per channel; undefined until the config message arrives.
	channels?: Ring[];

	constructor(params: AudioWorkletNodeOptions) {
		// The super constructor call is required.
		super();

		this.port.onmessage = this.onMessage.bind(this)
	}

	/** Handles messages from the main thread; only `config` is understood. */
	onMessage(e: MessageEvent) {
		if (e.data.config) {
			this.config(e.data.config)
		}
	}

	/** Wraps each RingState from the config in a Ring so it can be read from. */
	config(config: Message.Config) {
		this.channels = []
		for (let state of config.channels) {
			this.channels.push(new Ring(state))
		}
	}

	/**
	 * Fills the single output with samples from the rings.
	 * Inputs and outputs in groups of 128 samples.
	 *
	 * @returns always true, so the processor is kept alive.
	 * @throws Error if the node has more than one input or output.
	 */
	process(inputs: Float32Array[][], outputs: Float32Array[][], parameters: Record<string, Float32Array>): boolean {
		if (!this.channels) {
			// Not initialized yet
			return true
		}

		// Reject the node unless it has exactly one input AND one output.
		if (inputs.length != 1 || outputs.length != 1) {
			throw new Error("only a single track is supported")
		}

		const output = outputs[0]

		for (let c = 0; c < output.length; c += 1) {
			const source = this.channels[c]
			if (!source) {
				// No ring was configured for this output channel; leave it as-is.
				continue
			}

			const channel = output[c];

			let offset = 0
			for (const part of source.peek(channel.length)) {
				const room = channel.length - offset
				if (room <= 0) {
					break
				}

				// Never write past the end of the output channel.
				const chunk = part.length > room ? part.subarray(0, room) : part
				channel.set(chunk, offset)
				offset += chunk.length
			}

			// On underrun (offset < channel.length) the rest of the channel is not written.
			// NOTE(review): Web Audio hands process() zero-filled output buffers, so this
			// should already be silence — confirm before adding an explicit fill.

			// Always advance the full amount.
			source.advance(channel.length)
		}

		return true;
	}
}

registerProcessor("renderer", Renderer);

142
player/src/audio/ring.ts Normal file
View File

@ -0,0 +1,142 @@
// Ring buffer with audio samples.
// Slots of the shared Int32Array holding the ring's positions.
enum STATE {
	START = 0, // First timestamp in the ring buffer.
	END, // Last timestamp in the ring buffer.
	LENGTH // Clever way of saving the total number of enums values.
}

/**
 * Ring buffer of Float32 samples for one audio channel, backed by the shared
 * memory in a RingState.
 *
 * Positions (`start`, `end` and the `timestamp` passed to emit) are absolute
 * sample counts; the index into the buffer is the position modulo capacity.
 * emit() writes samples and moves `end`; peek()/advance() read samples and
 * move `start`.
 *
 * NOTE(review): positions live in an Int32Array, so they stop being valid
 * after 2^31 samples.
 */
export class Ring {
	state: RingState;

	constructor(state: RingState) {
		this.state = state
	}

	/**
	 * Add the samples of one channel of a single audio frame.
	 *
	 * @param timestamp position of the frame's first sample, in samples
	 * @param frame     the decoded audio; only plane `channel` is read
	 * @param channel   plane index to copy
	 * @throws Error("exceeded capacity") if the write would overwrite samples
	 *         that have not been consumed yet.
	 */
	emit(timestamp: number, frame: AudioData, channel: number) {
		// Read the consumer position once; it may move concurrently.
		const readPos = this.start

		// The number of samples to skip at the start (already consumed).
		let offset = readPos - timestamp;
		if (offset > 0) {
			console.warn("dropping old samples", offset)
		} else {
			offset = 0
		}

		const count = frame.numberOfFrames - offset;
		if (count <= 0) {
			// NOTE(review): the same frame may be passed again for other channels;
			// closing it here looks like it belongs to the caller — confirm.
			frame.close()

			// Skip the entire frame
			return
		}

		// Positions actually written: [start, end)
		const start = timestamp + offset;
		const end = start + count;

		if (end > readPos + this.state.capacity) {
			// The renderer has to buffer frames; we have a fixed capacity.
			// TODO maybe it's better to buffer here instead.
			throw new Error("exceeded capacity")
		}

		const startIndex = start % this.state.capacity;

		// Copy up to the end of the buffer first...
		const firstCount = Math.min(count, this.state.capacity - startIndex)
		const first = new Float32Array(this.state.buffer, Float32Array.BYTES_PER_ELEMENT * startIndex, firstCount)

		frame.copyTo(first, {
			planeIndex: channel,
			frameOffset: offset,
			frameCount: firstCount,
		})

		// ...then wrap around for whatever is left. Skipped when nothing is left,
		// because copyTo rejects a frameOffset equal to the frame length.
		const secondCount = count - firstCount
		if (secondCount > 0) {
			const second = new Float32Array(this.state.buffer, 0, secondCount)

			frame.copyTo(second, {
				planeIndex: channel,
				frameOffset: offset + firstCount,
				frameCount: secondCount,
			})
		}

		if (this.end < end) {
			this.end = end
		}
	}

	/**
	 * Returns views over up to `count` unread samples, without consuming them.
	 *
	 * @returns zero arrays when nothing is buffered, one when the range is
	 *          contiguous, two when it wraps around the end of the buffer.
	 */
	peek(count: number): Float32Array[] {
		const start = this.start
		const end = Math.min(this.end, start + count)

		// Never expose more than one full lap of the ring.
		const available = Math.min(end - start, this.state.capacity)
		if (available <= 0) {
			// Empty, or the reader has advanced past the writer.
			return []
		}

		const startIndex = start % this.state.capacity;

		const firstCount = Math.min(available, this.state.capacity - startIndex)
		const first = new Float32Array(this.state.buffer, Float32Array.BYTES_PER_ELEMENT * startIndex, firstCount)

		const secondCount = available - firstCount
		if (secondCount === 0) {
			return [ first ]
		}

		const second = new Float32Array(this.state.buffer, 0, secondCount)
		return [ first, second ]
	}

	/** Marks `count` samples as consumed, whether or not that many were buffered. */
	advance(count: number) {
		this.start += count
	}

	set start(start: number) {
		Atomics.store(this.state.stateView, STATE.START, start);
	}

	get start(): number {
		return Atomics.load(this.state.stateView, STATE.START);
	}

	set end(end: number) {
		Atomics.store(this.state.stateView, STATE.END, end);
	}

	get end(): number {
		return Atomics.load(this.state.stateView, STATE.END);
	}
}

// No prototype to make this easier to send via postMessage
export class RingState {
	state: SharedArrayBuffer; // backing memory for the START/END positions
	stateView: Int32Array; // Int32 view over `state`, indexed by STATE

	buffer: SharedArrayBuffer; // backing memory for the Float32 samples
	capacity: number; // number of samples `buffer` can hold

	constructor(capacity: number) {
		// Store this many samples in a ring buffer.
		this.buffer = new SharedArrayBuffer(capacity * Float32Array.BYTES_PER_ELEMENT)
		this.capacity = capacity

		// Store the current state in a separate ring buffer.
		this.state = new SharedArrayBuffer(STATE.LENGTH * Int32Array.BYTES_PER_ELEMENT)
		this.stateView = new Int32Array(this.state)
	}
}

View File

@ -1,64 +0,0 @@
import * as Message from "./message"
import Ring from "./ring"
/**
 * Thin layer over the AudioWorklet API.
 *
 * Owns one ring buffer per channel and an AudioWorkletNode that is created
 * asynchronously; `ready` settles once the node exists and has been sent the
 * ring buffers.
 */
export default class Source {
	ctx: AudioContext;
	worklet?: AudioWorkletNode; // async initialization
	channels: Ring[];

	ready: Promise<void>;

	constructor(ctx: AudioContext) {
		this.ctx = ctx

		// two channels, holding a maximum of 1s at 44khz
		const left = new Ring(44000)
		const right = new Ring(44000)
		this.channels = [ left, right ]

		// Start loading the worklet
		this.ready = this.setup()
	}

	/** Loads the worklet module, creates the node and posts it the ring buffers. */
	private async setup(): Promise<void> {
		// Load the worklet source code.
		await this.ctx.audioWorklet.addModule('worklet.ts')

		// Create a worklet
		const node = new AudioWorkletNode(this.ctx, 'source');
		this.worklet = node

		node.port.onmessage = (e: MessageEvent) => this.onMessage(e)
		node.onprocessorerror = (e: Event) => {
			console.error("Audio worklet error:", e);
		};

		const config: Message.Config = { channels: this.channels }
		node.port.postMessage({ config })
	}

	/** Connects the node to the speakers once the worklet confirms its configuration. */
	private async onMessage(e: MessageEvent) {
		if (!e.data.configReply) {
			return
		}

		const reply = e.data.configReply as Message.ConfigReply
		if (reply.error) {
			throw reply.error
		}

		// Start playback
		this.worklet?.connect(this.ctx.destination);
	}

	/** Writes every channel of the frame into the ring with the same index. */
	emit(frame: AudioData) {
		const total = frame.numberOfChannels
		for (let channel = 0; channel < total; channel += 1) {
			this.channels[channel].set(frame, channel)
		}
	}
}

View File

@ -1,11 +0,0 @@
import Ring from "./ring"
// Sent to the worklet to share ring buffers.
export interface Config {
// The ring buffers to share, as an array of Ring.
channels: Ring[];
}
// Reply from the worklet indicating whether the configuration was successful.
export interface ConfigReply {
// Present when the configuration failed.
error?: Error;
}

View File

@ -1,131 +0,0 @@
// Ring buffer with audio samples.
// TODO typescript enums when I have internet access
// Slots of the shared Int32Array holding the ring's positions.
const STATE = {
	START: 0,
	END: 1,
}

/**
 * Ring buffer of Float32 samples for one audio channel, backed by
 * SharedArrayBuffers.
 *
 * Positions are absolute sample counts; the index into the buffer is the
 * position modulo capacity. set() writes samples and moves the end position,
 * peek()/advance() read samples and move the start position.
 *
 * NOTE(review): set() uses frame.timestamp as a position in samples — confirm
 * the caller supplies it in that unit.
 */
export default class Ring {
	state: SharedArrayBuffer;
	stateView: Int32Array;

	buffer: SharedArrayBuffer;
	capacity: number;

	/** @param samples number of samples the ring can hold */
	constructor(samples: number) {
		this.state = new SharedArrayBuffer(Object.keys(STATE).length * Int32Array.BYTES_PER_ELEMENT)
		this.stateView = new Int32Array(this.state)

		this.setStart(0)
		this.setEnd(0)

		this.capacity = samples;

		// TODO better way to loop in modern Javascript?
		this.buffer = new SharedArrayBuffer(samples * Float32Array.BYTES_PER_ELEMENT)
	}

	setStart(start: number) {
		return Atomics.store(this.stateView, STATE.START, start);
	}

	getStart(): number {
		return Atomics.load(this.stateView, STATE.START);
	}

	setEnd(end: number) {
		// Must target the END slot; writing START here would move the read position.
		return Atomics.store(this.stateView, STATE.END, end);
	}

	getEnd(): number {
		return Atomics.load(this.stateView, STATE.END);
	}

	/**
	 * Writes one channel of the frame at the position given by frame.timestamp.
	 *
	 * @param frame   the decoded audio; only plane `channel` is read
	 * @param channel plane index to copy
	 * @throws Error("exceeded capacity") if the write would overwrite samples
	 *         that have not been consumed yet.
	 */
	set(frame: AudioFrame, channel: number) {
		const readPos = this.getStart()

		// The number of samples to skip at the start (already consumed).
		let offset = readPos - frame.timestamp;
		if (offset > 0) {
			console.warn("dropping old samples", offset)
		} else {
			offset = 0
		}

		const count = frame.numberOfFrames - offset;
		if (count <= 0) {
			frame.close()

			// Skip the entire frame
			return
		}

		// Positions actually written: [start, end)
		const start = frame.timestamp + offset
		const end = start + count

		if (end > readPos + this.capacity) {
			// The renderer has to buffer frames; we have a fixed capacity.
			// TODO maybe it's better to buffer here instead.
			throw new Error("exceeded capacity")
		}

		const startIndex = start % this.capacity;

		// Copy up to the end of the buffer first. Typed-array offsets are in bytes.
		const firstCount = Math.min(count, this.capacity - startIndex)
		const first = new Float32Array(this.buffer, Float32Array.BYTES_PER_ELEMENT * startIndex, firstCount)

		frame.copyTo(first, {
			planeIndex: channel,
			frameOffset: offset,
			frameCount: firstCount,
		})

		// Then wrap around for whatever is left, if anything.
		const secondCount = count - firstCount
		if (secondCount > 0) {
			const second = new Float32Array(this.buffer, 0, secondCount)

			frame.copyTo(second, {
				planeIndex: channel,
				frameOffset: offset + firstCount,
				frameCount: secondCount,
			})
		}

		// TODO insert silence when index > end
		if (end > this.getEnd()) {
			this.setEnd(end)
		}
	}

	/**
	 * Returns views over up to `count` unread samples, without consuming them.
	 *
	 * @returns zero arrays when nothing is buffered, one when the range is
	 *          contiguous, two when it wraps around the end of the buffer.
	 */
	peek(count: number): Float32Array[] {
		const start = this.getStart()
		const end = Math.min(this.getEnd(), start + count)

		// Never expose more than one full lap of the ring.
		const available = Math.min(end - start, this.capacity)
		if (available <= 0) {
			// Empty, or the reader has advanced past the writer.
			return []
		}

		const startIndex = start % this.capacity;

		const firstCount = Math.min(available, this.capacity - startIndex)
		const first = new Float32Array(this.buffer, Float32Array.BYTES_PER_ELEMENT * startIndex, firstCount)

		const secondCount = available - firstCount
		if (secondCount === 0) {
			return [ first ]
		}

		const second = new Float32Array(this.buffer, 0, secondCount)
		return [ first, second ]
	}

	/** Marks `count` samples as consumed, whether or not that many were buffered. */
	advance(count: number) {
		this.setStart(this.getStart() + count)
	}
}

View File

@ -1,68 +0,0 @@
// This is an AudioWorklet that acts as a media source.
// The renderer copies audio samples to a ring buffer read by this worklet.
// The worklet then outputs those samples to emit audio.
import * as Message from "../message"
import * as Util from "../../util"
import Ring from "./ring"
/**
 * AudioWorklet processor that plays samples out of ring buffers.
 *
 * The rings arrive in a `config` message on the processor port; until then
 * process() leaves its output untouched.
 *
 * NOTE(review): the rings are taken from the message as-is; confirm they
 * still carry the Ring methods after being posted.
 */
class Source extends AudioWorkletProcessor {
	// One ring per channel; undefined until the config message arrives.
	channels?: Ring[];

	constructor() {
		// The super constructor call is required.
		super();

		this.port.onmessage = (e: MessageEvent) => {
			if (e.data.config) {
				this.config(e.data.config as Message.Config)
			}
		}
	}

	static get parameterDescriptors() {
		return [];
	}

	/** Stores the ring buffers to read from. */
	config(config: Message.Config) {
		this.channels = config.channels;
	}

	/**
	 * Fills the single output with samples from the rings.
	 *
	 * @returns always true, so the processor is kept alive even before it has
	 *          been configured.
	 * @throws Error if the node has more than one output.
	 */
	// TODO correct types
	process(inputs: any, outputs: any, parameters: any): boolean {
		if (!this.channels) {
			// Not configured yet. A falsy return value here would allow the
			// browser to retire the processor before the config arrives.
			return true
		}

		if (outputs.length != 1) {
			throw new Error("only a single track is supported")
		}

		const track = outputs[0];

		for (let c = 0; c < track.length; c += 1) {
			const input = this.channels[c]
			if (!input) {
				// No ring for this output channel; leave it as-is.
				continue
			}

			const output = track[c];

			let offset = 0
			for (const part of input.peek(output.length)) {
				const room = output.length - offset
				if (room <= 0) {
					break
				}

				// Never write past the end of the output channel.
				const chunk = part.length > room ? part.subarray(0, room) : part
				output.set(chunk, offset)
				offset += chunk.length
			}

			if (offset < output.length) {
				// TODO render silence
			}

			// Always advance the full amount.
			input.advance(output.length)
		}

		return true;
	}
}

self.registerProcessor("source", Source);

View File

@ -1,25 +0,0 @@
import { Renderer } from "./renderer"
import { Decoder } from "./decoder"
import * as Message from "./message"
// Both are created when the first `config` message arrives.
let renderer: Renderer;
let decoder: Decoder;

/**
 * Worker entry point. A message may carry `config`, `init` and/or `segment`;
 * they are handled in that order, awaiting each step before the next.
 */
self.addEventListener('message', async (e: MessageEvent) => {
	const data = e.data

	if (data.config) {
		const config = data.config as Message.Config;
		renderer = new Renderer(config)
		decoder = new Decoder(config, renderer)
	}

	if (data.init) {
		await decoder.init(data.init as Message.Init)
	}

	if (data.segment) {
		await decoder.decode(data.segment as Message.Segment)
	}
})

View File

@ -16,7 +16,7 @@ body {
position: relative;
}
#play {
#screen #play {
position: absolute;
width: 100%;
height: 100%;

View File

@ -11,6 +11,7 @@
<body>
<div id="player">
<div id="screen">
<div id="play"><span>click for audio</span></div>
<canvas id="video" width="1280" height="720"></canvas>
</div>

View File

@ -1,11 +1,23 @@
import { Player } from "./transport/index"
import Player from "./player"
const params = new URLSearchParams(window.location.search)
const url = params.get("url") || "https://localhost:4443/watch"
const canvas = document.querySelector<HTMLCanvasElement>("canvas#video")!;
const canvas = document.querySelector<HTMLCanvasElement>("canvas#video")!
const player = new Player({
url: url,
canvas: canvas,
})
})
const play = document.querySelector<HTMLElement>("#screen #play")!
let playFunc = (e: Event) => {
player.play()
e.preventDefault()
play.removeEventListener('click', playFunc)
play.style.display = "none"
}
play.addEventListener('click', playFunc)

View File

@ -0,0 +1,40 @@
import Audio from "../audio"
import Transport from "../transport"
import Video from "../video"
// Options accepted by the Player constructor.
export interface PlayerInit {
// Passed through to the Transport as its `url`.
url: string;
// Canvas whose control is transferred offscreen and handed to Video.
canvas: HTMLCanvasElement;
}
/**
 * Top-level player: builds the audio and video pipelines and the transport
 * that is given both of them.
 */
export default class Player {
	audio: Audio;
	video: Video;
	transport: Transport;

	/** Creates audio, then video (taking over the canvas), then the transport. */
	constructor(props: PlayerInit) {
		const audio = new Audio()
		const video = new Video({
			canvas: props.canvas.transferControlToOffscreen(),
		})

		this.audio = audio
		this.video = video
		this.transport = new Transport({ url: props.url, audio, video })
	}

	/** Closes the transport. */
	async close() {
		this.transport.close()
	}

	/** Connects the transport to the given URL. */
	async connect(url: string) {
		await this.transport.connect(url)
	}

	/** Starts audio playback, then video playback. */
	play() {
		this.audio.play()
		this.video.play()
	}
}

View File

@ -8,12 +8,13 @@ import Video from "../video"
// @ts-ignore bundler embeds data
import fingerprint from 'bundle-text:./fingerprint.hex';
export interface PlayerInit {
export interface TransportInit {
url: string;
canvas: HTMLCanvasElement;
audio: Audio;
video: Video;
}
export class Player {
export default class Transport {
quic: Promise<WebTransport>;
api: Promise<WritableStream>;
tracks: Map<string, MP4.InitParser>
@ -21,16 +22,11 @@ export class Player {
audio: Audio;
video: Video;
constructor(props: PlayerInit) {
constructor(props: TransportInit) {
this.tracks = new Map();
// TODO move these to another class so this only deals with the transport.
this.audio = new Audio({
ctx: new AudioContext(),
})
this.video = new Video({
canvas: props.canvas.transferControlToOffscreen(),
})
this.audio = props.audio;
this.video = props.video;
this.quic = this.connect(props.url)

View File

@ -3,9 +3,9 @@ import * as MP4 from "../mp4"
import * as Stream from "../stream"
import * as Util from "../util"
import { Renderer } from "./renderer"
import Renderer from "./renderer"
export class Decoder {
export default class Decoder {
// Store the init message for each track
tracks: Map<string, Util.Deferred<Message.Init>>
renderer: Renderer;
@ -116,7 +116,7 @@ export class Decoder {
box.set(atom)
// and for some reason we need to modify the underlying ArrayBuffer with offset
let buffer = box.buffer as MP4.ArrayBufferOffset
let buffer = box.buffer as MP4.ArrayBuffer
buffer.fileStart = offset
// Parse the data

View File

@ -5,7 +5,11 @@ export default class Video {
worker: Worker;
constructor(config: Message.Config) {
this.worker = new Worker(new URL('worker.ts', import.meta.url), { type: "module" })
const url = new URL('worker.ts', import.meta.url)
this.worker = new Worker(url, {
type: "module",
name: "video",
})
this.worker.postMessage({ config }, [ config.canvas ])
}
@ -16,4 +20,8 @@ export default class Video {
segment(segment: Message.Segment) {
this.worker.postMessage({ segment }, [ segment.buffer.buffer, segment.reader ])
}
play() {
// TODO
}
}

View File

@ -7,7 +7,7 @@ export interface Config {
export interface Init {
track: string;
info: MP4.Info;
raw: MP4.ArrayBufferOffset[];
raw: MP4.ArrayBuffer[];
}
export interface Segment {

View File

@ -1,6 +1,6 @@
import * as Message from "./message";
export class Renderer {
export default class Renderer {
canvas: OffscreenCanvas;
queue: Array<VideoFrame>;
render: number; // non-zero if requestAnimationFrame has been called
@ -34,7 +34,7 @@ export class Renderer {
let high = this.queue.length;
// Fast path because we normally append to the end.
if (this.queue.length > 0 && this.queue[this.queue.length].timestamp <= frame.timestamp) {
if (this.queue.length > 0 && this.queue[this.queue.length-1].timestamp <= frame.timestamp) {
this.queue.push(frame)
} else {
// Do a full binary search

View File

@ -1,5 +1,5 @@
import { Renderer } from "./renderer"
import { Decoder } from "./decoder"
import Renderer from "./renderer"
import Decoder from "./decoder"
import * as Message from "./message"
let decoder: Decoder;