From cc00a79881d45fa8f0ad21ef8fb7b9d358cc924f Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sun, 26 Mar 2023 16:04:51 -0700 Subject: [PATCH] Initial progress on WebCodecs. I'm on a plane and it's $18 to get Wifi for an hour. --- player/src/audio/index.ts | 3 + player/src/index.html | 3 +- player/src/index.ts | 45 +-- player/src/{ => mp4}/mp4.ts | 2 +- player/src/{ => mp4}/mp4box.all.js | 0 player/src/player.ts | 374 --------------------- player/src/{ => player}/init.ts | 2 +- player/src/segment.ts | 150 --------- player/src/source.ts | 147 -------- player/src/{stream.ts => stream/reader.ts} | 143 ++------ player/src/stream/writer.ts | 100 ++++++ player/src/track.ts | 124 ------- player/src/transport/index.ts | 135 ++++++++ player/src/{ => transport}/message.ts | 9 +- player/src/util.ts | 4 - player/src/video/decoder.ts | 123 +++++++ player/src/video/index.ts | 19 ++ player/src/video/message.ts | 15 + player/src/video/renderer.ts | 72 ++++ player/src/video/track.ts | 58 ++++ player/src/video/worker.ts | 22 ++ player/tsconfig.json | 2 +- 22 files changed, 588 insertions(+), 964 deletions(-) create mode 100644 player/src/audio/index.ts rename player/src/{ => mp4}/mp4.ts (99%) rename player/src/{ => mp4}/mp4box.all.js (100%) delete mode 100644 player/src/player.ts rename player/src/{ => player}/init.ts (94%) delete mode 100644 player/src/segment.ts delete mode 100644 player/src/source.ts rename player/src/{stream.ts => stream/reader.ts} (56%) create mode 100644 player/src/stream/writer.ts delete mode 100644 player/src/track.ts create mode 100644 player/src/transport/index.ts rename player/src/{ => transport}/message.ts (60%) delete mode 100644 player/src/util.ts create mode 100644 player/src/video/decoder.ts create mode 100644 player/src/video/index.ts create mode 100644 player/src/video/message.ts create mode 100644 player/src/video/renderer.ts create mode 100644 player/src/video/track.ts create mode 100644 player/src/video/worker.ts diff --git a/player/src/audio/index.ts b/player/src/audio/index.ts new file mode 100644 index 0000000..ebbc720 --- /dev/null +++ b/player/src/audio/index.ts @@ -0,0 +1,3 @@ +self.addEventListener('message', (e: Event) => { + +}) \ No newline at end of file diff --git a/player/src/index.html b/player/src/index.html index 92e0bdd..7ec7729 100644 --- a/player/src/index.html +++ b/player/src/index.html @@ -11,8 +11,7 @@
-
click to play
- +
diff --git a/player/src/index.ts b/player/src/index.ts index 40dbb7a..39e074a 100644 --- a/player/src/index.ts +++ b/player/src/index.ts @@ -1,48 +1,11 @@ -import { Player } from "./player" - -// This is so ghetto but I'm too lazy to improve it right now -const videoRef = document.querySelector("video#vid")!; -const liveRef = document.querySelector("#live")!; -const throttleRef = document.querySelector("#throttle")!; -const statsRef = document.querySelector("#stats")!; -const playRef = document.querySelector("#play")!; +import { Player } from "./transport/index" const params = new URLSearchParams(window.location.search) const url = params.get("url") || "https://localhost:4443/watch" +const canvas = document.querySelector("canvas#video")!; const player = new Player({ url: url, - videoRef: videoRef, - statsRef: statsRef, - throttleRef: throttleRef, -}) - -liveRef.addEventListener("click", (e) => { - e.preventDefault() - player.goLive() -}) - -throttleRef.addEventListener("click", (e) => { - e.preventDefault() - player.throttle() -}) - -playRef.addEventListener('click', (e) => { - videoRef.play() - e.preventDefault() -}) - -function playFunc(e: Event) { - playRef.style.display = "none" - //player.goLive() - - // Only fire once to restore pause/play functionality - videoRef.removeEventListener('play', playFunc) -} - -videoRef.addEventListener('play', playFunc) -videoRef.volume = 0.5 - -// Try to autoplay but ignore errors on mobile; they need to click -//vidRef.play().catch((e) => console.warn(e)) \ No newline at end of file + canvas: canvas, +}) \ No newline at end of file diff --git a/player/src/mp4.ts b/player/src/mp4/mp4.ts similarity index 99% rename from player/src/mp4.ts rename to player/src/mp4/mp4.ts index c150364..fdf3841 100644 --- a/player/src/mp4.ts +++ b/player/src/mp4/mp4.ts @@ -1,6 +1,6 @@ // Wrapper around MP4Box to play nicely with MP4Box. // I tried getting a mp4box.all.d.ts file to work but just couldn't figure it out -import { createFile, ISOFile, DataStream, BoxParser } from "./mp4box.all" +import { createFile, ISOFile, DataStream, BoxParser } from "./mp4box.all.js" // Rename some stuff so it's on brand. export { createFile as MP4New, ISOFile as MP4File, DataStream as MP4Stream, BoxParser as MP4Parser } diff --git a/player/src/mp4box.all.js b/player/src/mp4/mp4box.all.js similarity index 100% rename from player/src/mp4box.all.js rename to player/src/mp4/mp4box.all.js diff --git a/player/src/player.ts b/player/src/player.ts deleted file mode 100644 index 46487b9..0000000 --- a/player/src/player.ts +++ /dev/null @@ -1,374 +0,0 @@ -import { Source } from "./source" -import { StreamReader, StreamWriter } from "./stream" -import { InitParser } from "./init" -import { Segment } from "./segment" -import { Track } from "./track" -import { Message, MessageInit, MessageSegment } from "./message" - -/// - -export interface PlayerInit { - url: string; - - videoRef: HTMLVideoElement; - statsRef: HTMLElement; - throttleRef: HTMLElement; -} - -/* -*/ - - -export class Player { - mediaSource: MediaSource; - - init: Map; - audio: Track; - video: Track; - - quic: Promise; - api: Promise; - - // References to elements in the DOM - vidRef: HTMLVideoElement; // The video element itself - statsRef: HTMLElement; // The stats div - throttleRef: HTMLElement; // The throttle button - throttleCount: number; // number of times we've clicked the button in a row - - interval: number; - - timeRef?: DOMHighResTimeStamp; - - constructor(props: PlayerInit) { - this.vidRef = props.videoRef - this.statsRef = props.statsRef - this.throttleRef = props.throttleRef - this.throttleCount = 0 - - this.mediaSource = new MediaSource() - this.vidRef.src = URL.createObjectURL(this.mediaSource) - - this.init = new Map() - this.audio = new Track(new Source(this.mediaSource)); - this.video = new Track(new Source(this.mediaSource)); - - this.interval = setInterval(this.tick.bind(this), 100) - this.vidRef.addEventListener("waiting", this.tick.bind(this)) - - this.quic = this.connect(props.url) - - // Create a unidirectional stream for all of our messages - this.api = this.quic.then((q) => { - return q.createUnidirectionalStream() - }) - - // async functions - this.receiveStreams() - - // Limit to 4Mb/s - this.sendThrottle() - } - - async close() { - clearInterval(this.interval); - (await this.quic).close() - } - - async connect(url: string): Promise { - // TODO remove this when WebTransport supports the system CA pool - const fingerprintURL = new URL(url); - fingerprintURL.pathname = "/fingerprint" - - const response = await fetch(fingerprintURL) - if (!response.ok) { - throw new Error('failed to get server fingerprint'); - } - - const hex = await response.text() - - // Convert the hex to binary. - let fingerprint = []; - for (let c = 0; c < hex.length; c += 2) { - fingerprint.push(parseInt(hex.substring(c, c+2), 16)); - } - - //const fingerprint = Uint8Array.from(atob(hex), c => c.charCodeAt(0)) - - const quic = new WebTransport(url, { - "serverCertificateHashes": [{ - "algorithm": "sha-256", - "value": new Uint8Array(fingerprint), - }] - }) - - await quic.ready - - return quic - } - - async sendMessage(msg: any) { - const payload = JSON.stringify(msg) - const size = payload.length + 8 - - const stream = await this.api - - const writer = new StreamWriter(stream) - await writer.uint32(size) - await writer.string("warp") - await writer.string(payload) - writer.release() - } - - throttle() { - // Throttle is incremented each time we click the throttle button - this.throttleCount += 1 - this.sendThrottle() - - // After 5 seconds disable the throttling - setTimeout(() => { - this.throttleCount -= 1 - this.sendThrottle() - }, 5000) - } - - sendThrottle() { - let rate = 0; - - if (this.throttleCount > 0) { - // TODO detect the incoming bitrate instead of hard-coding - // Right shift by throttle to divide by 2,4,8,16,etc each time - const bitrate = 4 * 1024 * 1024 // 4Mb/s - - rate = bitrate >> (this.throttleCount-1) - - const str = formatBits(rate) + "/s" - this.throttleRef.textContent = `Throttle: ${ str }`; - } else { - this.throttleRef.textContent = "Throttle: none"; - } - - // Send the server a message to fake network congestion. - this.sendMessage({ - "debug": { - max_bitrate: rate, - }, - }) - } - - tick() { - // Try skipping ahead if there's no data in the current buffer. - this.trySeek() - - // Try skipping video if it would fix any desync. - this.trySkip() - - // Update the stats at the end - this.updateStats() - } - - goLive() { - const ranges = this.vidRef.buffered - if (!ranges.length) { - return - } - - this.vidRef.currentTime = ranges.end(ranges.length-1); - this.vidRef.play(); - } - - // Try seeking ahead to the next buffered range if there's a gap - trySeek() { - if (this.vidRef.readyState > 2) { // HAVE_CURRENT_DATA - // No need to seek - return - } - - const ranges = this.vidRef.buffered - if (!ranges.length) { - // Video has not started yet - return - } - - for (let i = 0; i < ranges.length; i += 1) { - const pos = ranges.start(i) - - if (this.vidRef.currentTime >= pos) { - // This would involve seeking backwards - continue - } - - console.warn("seeking forward", pos - this.vidRef.currentTime) - - this.vidRef.currentTime = pos - return - } - } - - // Try dropping video frames if there is future data available. - trySkip() { - let playhead: number | undefined - - if (this.vidRef.readyState > 2) { - // If we're not buffering, only skip video if it's before the current playhead - playhead = this.vidRef.currentTime - } - - this.video.advance(playhead) - } - - async receiveStreams() { - const q = await this.quic - const streams = q.incomingUnidirectionalStreams.getReader() - - while (true) { - const result = await streams.read() - if (result.done) break - - const stream = result.value - this.handleStream(stream) // don't await - } - } - - async handleStream(stream: ReadableStream) { - let r = new StreamReader(stream.getReader()) - - while (!await r.done()) { - const size = await r.uint32(); - const typ = new TextDecoder('utf-8').decode(await r.bytes(4)); - - if (typ != "warp") throw "expected warp atom" - if (size < 8) throw "atom too small" - - const payload = new TextDecoder('utf-8').decode(await r.bytes(size - 8)); - const msg = JSON.parse(payload) as Message - - if (msg.init) { - return this.handleInit(r, msg.init) - } else if (msg.segment) { - return this.handleSegment(r, msg.segment) - } - } - } - - async handleInit(stream: StreamReader, msg: MessageInit) { - let init = this.init.get(msg.id); - if (!init) { - init = new InitParser() - this.init.set(msg.id, init) - } - - while (1) { - const data = await stream.read() - if (!data) break - - init.push(data) - } - } - - async handleSegment(stream: StreamReader, msg: MessageSegment) { - let pending = this.init.get(msg.init); - if (!pending) { - pending = new InitParser() - this.init.set(msg.init, pending) - } - - // Wait for the init segment to be fully received and parsed - const init = await pending.ready; - - let track: Track; - if (init.info.videoTracks.length) { - track = this.video - } else { - track = this.audio - } - - const segment = new Segment(track.source, init, msg.timestamp) - - // The track is responsible for flushing the segments in order - track.add(segment) - - /* TODO I'm not actually sure why this code doesn't work; something trips up the MP4 parser - while (1) { - const data = await stream.read() - if (!data) break - - segment.push(data) - track.flush() // Flushes if the active segment has samples - } - */ - - // One day I'll figure it out; until then read one top-level atom at a time - while (!await stream.done()) { - const raw = await stream.peek(4) - const size = new DataView(raw.buffer, raw.byteOffset, raw.byteLength).getUint32(0) - const atom = await stream.bytes(size) - - segment.push(atom) - track.flush() // Flushes if the active segment has new samples - } - - segment.finish() - } - - updateStats() { - for (const child of this.statsRef.children) { - if (child.className == "audio buffer") { - const ranges: any = (this.audio) ? this.audio.buffered() : { length: 0 } - this.visualizeBuffer(child as HTMLElement, ranges) - } else if (child.className == "video buffer") { - const ranges: any = (this.video) ? this.video.buffered() : { length: 0 } - this.visualizeBuffer(child as HTMLElement, ranges) - } - } - } - - visualizeBuffer(element: HTMLElement, ranges: TimeRanges) { - const children = element.children - const max = 5 - - let index = 0 - let prev = 0 - - for (let i = 0; i < ranges.length; i += 1) { - let start = ranges.start(i) - this.vidRef.currentTime - let end = ranges.end(i) - this.vidRef.currentTime - - if (end < 0 || start > max) { - continue - } - - let fill: HTMLElement; - - if (index < children.length) { - fill = children[index] as HTMLElement; - } else { - fill = document.createElement("div") - element.appendChild(fill) - } - - fill.className = "fill" - fill.innerHTML = end.toFixed(2) - fill.setAttribute('style', "left: " + (100 * Math.max(start, 0) / max) + "%; right: " + (100 - 100 * Math.min(end, max) / max) + "%") - index += 1 - - prev = end - } - - for (let i = index; i < children.length; i += 1) { - element.removeChild(children[i]) - } - } -} - -// https://stackoverflow.com/questions/15900485/correct-way-to-convert-size-in-bytes-to-kb-mb-gb-in-javascript -function formatBits(bits: number, decimals: number = 1) { - if (bits === 0) return '0 bits'; - - const k = 1024; - const dm = decimals < 0 ? 0 : decimals; - const sizes = ['b', 'Kb', 'Mb', 'Gb', 'Tb', 'Pb', 'Eb', 'Zb', 'Yb']; - - const i = Math.floor(Math.log(bits) / Math.log(k)); - - return parseFloat((bits / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i]; -} diff --git a/player/src/init.ts b/player/src/player/init.ts similarity index 94% rename from player/src/init.ts rename to player/src/player/init.ts index 6d258a7..11594fa 100644 --- a/player/src/init.ts +++ b/player/src/player/init.ts @@ -1,4 +1,4 @@ -import { MP4New, MP4File, MP4ArrayBuffer, MP4Info } from "./mp4" +import { MP4New, MP4File, MP4ArrayBuffer, MP4Info } from "../mp4/mp4" export class InitParser { mp4box: MP4File; diff --git a/player/src/segment.ts b/player/src/segment.ts deleted file mode 100644 index 09f7649..0000000 --- a/player/src/segment.ts +++ /dev/null @@ -1,150 +0,0 @@ -import { Source } from "./source" -import { Init } from "./init" -import { MP4New, MP4File, MP4Sample, MP4Stream, MP4Parser, MP4ArrayBuffer } from "./mp4" - -// Manage a segment download, keeping a buffer of a single sample to potentially rewrite the duration. -export class Segment { - source: Source; // The SourceBuffer used to decode media. - offset: number; // The byte offset in the received file so far - samples: MP4Sample[]; // The samples ready to be flushed to the source. - timestamp: number; // The expected timestamp of the first sample in milliseconds - init: Init; - - dts?: number; // The parsed DTS of the first sample - timescale?: number; // The parsed timescale of the segment - - input: MP4File; // MP4Box file used to parse the incoming atoms. - output: MP4File; // MP4Box file used to write the outgoing atoms after modification. - - done: boolean; // The segment has been completed - - constructor(source: Source, init: Init, timestamp: number) { - this.source = source - this.offset = 0 - this.done = false - this.timestamp = timestamp - this.init = init - - this.input = MP4New(); - this.output = MP4New(); - this.samples = []; - - this.input.onReady = (info: any) => { - this.input.setExtractionOptions(info.tracks[0].id, {}, { nbSamples: 1 }); - - this.input.onSamples = this.onSamples.bind(this) - this.input.start(); - } - - // We have to reparse the init segment to work with mp4box - for (let i = 0; i < init.raw.length; i += 1) { - this.offset = this.input.appendBuffer(init.raw[i]) - - // Also populate the output with our init segment so it knows about tracks - this.output.appendBuffer(init.raw[i]) - } - - this.input.flush() - this.output.flush() - } - - push(data: Uint8Array) { - if (this.done) return; // ignore new data after marked done - - // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately - let box = new Uint8Array(data.byteLength); - box.set(data) - - // and for some reason we need to modify the underlying ArrayBuffer with offset - let buffer = box.buffer as MP4ArrayBuffer - buffer.fileStart = this.offset - - // Parse the data - this.offset = this.input.appendBuffer(buffer) - this.input.flush() - } - - onSamples(id: number, user: any, samples: MP4Sample[]) { - if (!samples.length) return; - - if (this.dts === undefined) { - this.dts = samples[0].dts; - this.timescale = samples[0].timescale; - } - - // Add the samples to a queue - this.samples.push(...samples) - } - - // Flushes any pending samples, returning true if the stream has finished. - flush(): boolean { - let stream = new MP4Stream(new ArrayBuffer(0), 0, false); // big-endian - - while (this.samples.length) { - // Keep a single sample if we're not done yet - if (!this.done && this.samples.length < 2) break; - - const sample = this.samples.shift() - if (!sample) break; - - let moof = this.output.createSingleSampleMoof(sample); - moof.write(stream); - - // adjusting the data_offset now that the moof size is known - moof.trafs[0].truns[0].data_offset = moof.size+8; //8 is mdat header - stream.adjustUint32(moof.trafs[0].truns[0].data_offset_position, moof.trafs[0].truns[0].data_offset); - - // @ts-ignore - var mdat = new MP4Parser.mdatBox(); - mdat.data = sample.data; - mdat.write(stream); - } - - this.source.initialize(this.init) - this.source.append(stream.buffer as ArrayBuffer) - - return this.done - } - - // The segment has completed - finish() { - this.done = true - this.flush() - - // Trim the buffer to 30s long after each segment. - this.source.trim(30) - } - - // Extend the last sample so it reaches the provided timestamp - skipTo(pts: number) { - if (this.samples.length == 0) return - let last = this.samples[this.samples.length-1] - - const skip = pts - (last.dts + last.duration); - - if (skip == 0) return; - if (skip < 0) throw "can't skip backwards" - - last.duration += skip - - if (this.timescale) { - console.warn("skipping video", skip / this.timescale) - } - } - - buffered() { - // Ignore if we have a single sample - if (this.samples.length <= 1) return undefined; - if (!this.timescale) return undefined; - - const first = this.samples[0]; - const last = this.samples[this.samples.length-1] - - - return { - length: 1, - start: first.dts / this.timescale, - end: (last.dts + last.duration) / this.timescale, - } - } -} diff --git a/player/src/source.ts b/player/src/source.ts deleted file mode 100644 index dc299a7..0000000 --- a/player/src/source.ts +++ /dev/null @@ -1,147 +0,0 @@ -import { Init } from "./init" - -// Create a SourceBuffer with convenience methods -export class Source { - sourceBuffer?: SourceBuffer; - mediaSource: MediaSource; - queue: Array; - init?: Init; - - constructor(mediaSource: MediaSource) { - this.mediaSource = mediaSource; - this.queue = []; - } - - // (re)initialize the source using the provided init segment. - initialize(init: Init) { - // Check if the init segment is already in the queue. - for (let i = this.queue.length - 1; i >= 0; i--) { - if ((this.queue[i] as SourceInit).init == init) { - // Already queued up. - return - } - } - - // Check if the init segment has already been applied. - if (this.init == init) { - return - } - - // Add the init segment to the queue so we call addSourceBuffer or changeType - this.queue.push({ - kind: "init", - init: init, - }) - - for (let i = 0; i < init.raw.length; i += 1) { - this.queue.push({ - kind: "data", - data: init.raw[i], - }) - } - - this.flush() - } - - // Append the segment data to the buffer. - append(data: Uint8Array | ArrayBuffer) { - this.queue.push({ - kind: "data", - data: data, - }) - - this.flush() - } - - // Return the buffered range. - buffered() { - if (!this.sourceBuffer) { - return { length: 0 } - } - - return this.sourceBuffer.buffered - } - - // Delete any media older than x seconds from the buffer. - trim(duration: number) { - this.queue.push({ - kind: "trim", - trim: duration, - }) - - this.flush() - } - - // Flush any queued instructions - flush() { - while (1) { - // Check if the buffer is currently busy. - if (this.sourceBuffer && this.sourceBuffer.updating) { - break; - } - - // Process the next item in the queue. - const next = this.queue.shift() - if (!next) { - break; - } - - switch (next.kind) { - case "init": - this.init = next.init; - - if (!this.sourceBuffer) { - // Create a new source buffer. - this.sourceBuffer = this.mediaSource.addSourceBuffer(this.init.info.mime) - - // Call flush automatically after each update finishes. - this.sourceBuffer.addEventListener('updateend', this.flush.bind(this)) - } else { - this.sourceBuffer.changeType(next.init.info.mime) - } - - break; - case "data": - if (!this.sourceBuffer) { - throw "failed to call initailize before append" - } - - this.sourceBuffer.appendBuffer(next.data) - - break; - case "trim": - if (!this.sourceBuffer) { - throw "failed to call initailize before trim" - } - - const end = this.sourceBuffer.buffered.end(this.sourceBuffer.buffered.length - 1) - next.trim; - const start = this.sourceBuffer.buffered.start(0) - - if (end > start) { - this.sourceBuffer.remove(start, end) - } - - break; - default: - throw "impossible; unknown SourceItem" - } - } - } -} - -interface SourceItem {} - -class SourceInit implements SourceItem { - kind!: "init"; - init!: Init; -} - -class SourceData implements SourceItem { - kind!: "data"; - data!: Uint8Array | ArrayBuffer; -} - -class SourceTrim implements SourceItem { - kind!: "trim"; - trim!: number; -} \ No newline at end of file diff --git a/player/src/stream.ts b/player/src/stream/reader.ts similarity index 56% rename from player/src/stream.ts rename to player/src/stream/reader.ts index 7dd7120..edab7d0 100644 --- a/player/src/stream.ts +++ b/player/src/stream/reader.ts @@ -1,18 +1,13 @@ // Reader wraps a stream and provides convience methods for reading pieces from a stream -export class StreamReader { - reader: ReadableStreamDefaultReader; // TODO make a separate class without promises when null +export default class Reader { + reader: ReadableStream; buffer: Uint8Array; - constructor(reader: ReadableStreamDefaultReader, buffer: Uint8Array = new Uint8Array(0)) { + constructor(reader: ReadableStream, buffer: Uint8Array = new Uint8Array(0)) { this.reader = reader this.buffer = buffer } - // TODO implementing pipeTo seems more reasonable than releasing the lock - release() { - this.reader.releaseLock() - } - // Returns any number of bytes async read(): Promise { if (this.buffer.byteLength) { @@ -21,13 +16,38 @@ export class StreamReader { return buffer } - const result = await this.reader.read() + const result = await this.reader.getReader().read() return result.value } + async readAll(): Promise { + while (1) { + const result = await this.reader.getReader().read() + if (result.done) { + break + } + + const buffer = new Uint8Array(result.value) + + if (this.buffer.byteLength == 0) { + this.buffer = buffer + } else { + const temp = new Uint8Array(this.buffer.byteLength + buffer.byteLength) + temp.set(this.buffer) + temp.set(buffer, this.buffer.byteLength) + this.buffer = temp + } + } + + const result = this.buffer + this.buffer = new Uint8Array() + + return result + } + async bytes(size: number): Promise { while (this.buffer.byteLength < size) { - const result = await this.reader.read() + const result = await this.reader.getReader().read() if (result.done) { throw "short buffer" } @@ -52,7 +72,7 @@ export class StreamReader { async peek(size: number): Promise { while (this.buffer.byteLength < size) { - const result = await this.reader.read() + const result = await this.reader.getReader().read() if (result.done) { throw "short buffer" } @@ -151,104 +171,3 @@ export class StreamReader { } } } - -// StreamWriter wraps a stream and writes chunks of data -export class StreamWriter { - buffer: ArrayBuffer; - writer: WritableStreamDefaultWriter; - - constructor(stream: WritableStream) { - this.buffer = new ArrayBuffer(8) - this.writer = stream.getWriter() - } - - release() { - this.writer.releaseLock() - } - - async close() { - return this.writer.close() - } - - async uint8(v: number) { - const view = new DataView(this.buffer, 0, 1) - view.setUint8(0, v) - return this.writer.write(view) - } - - async uint16(v: number) { - const view = new DataView(this.buffer, 0, 2) - view.setUint16(0, v) - return this.writer.write(view) - } - - async uint24(v: number) { - const v1 = (v >> 16) & 0xff - const v2 = (v >> 8) & 0xff - const v3 = (v) & 0xff - - const view = new DataView(this.buffer, 0, 3) - view.setUint8(0, v1) - view.setUint8(1, v2) - view.setUint8(2, v3) - - return this.writer.write(view) - } - - async uint32(v: number) { - const view = new DataView(this.buffer, 0, 4) - view.setUint32(0, v) - return this.writer.write(view) - } - - async uint52(v: number) { - if (v > Number.MAX_SAFE_INTEGER) { - throw "value too large" - } - - this.uint64(BigInt(v)) - } - - async vint52(v: number) { - if (v > Number.MAX_SAFE_INTEGER) { - throw "value too large" - } - - if (v < (1 << 6)) { - return this.uint8(v) - } else if (v < (1 << 14)) { - return this.uint16(v|0x4000) - } else if (v < (1 << 30)) { - return this.uint32(v|0x80000000) - } else { - return this.uint64(BigInt(v) | 0xc000000000000000n) - } - } - - async uint64(v: bigint) { - const view = new DataView(this.buffer, 0, 8) - view.setBigUint64(0, v) - return this.writer.write(view) - } - - async vint64(v: bigint) { - if (v < (1 << 6)) { - return this.uint8(Number(v)) - } else if (v < (1 << 14)) { - return this.uint16(Number(v)|0x4000) - } else if (v < (1 << 30)) { - return this.uint32(Number(v)|0x80000000) - } else { - return this.uint64(v | 0xc000000000000000n) - } - } - - async bytes(buffer: ArrayBuffer) { - return this.writer.write(buffer) - } - - async string(str: string) { - const data = new TextEncoder().encode(str) - return this.writer.write(data) - } -} diff --git a/player/src/stream/writer.ts b/player/src/stream/writer.ts new file mode 100644 index 0000000..201a426 --- /dev/null +++ b/player/src/stream/writer.ts @@ -0,0 +1,100 @@ +// Writer wraps a stream and writes chunks of data +export default class Writer { + buffer: ArrayBuffer; + writer: WritableStreamDefaultWriter; + + constructor(stream: WritableStream) { + this.buffer = new ArrayBuffer(8) + this.writer = stream.getWriter() + } + + release() { + this.writer.releaseLock() + } + + async close() { + return this.writer.close() + } + + async uint8(v: number) { + const view = new DataView(this.buffer, 0, 1) + view.setUint8(0, v) + return this.writer.write(view) + } + + async uint16(v: number) { + const view = new DataView(this.buffer, 0, 2) + view.setUint16(0, v) + return this.writer.write(view) + } + + async uint24(v: number) { + const v1 = (v >> 16) & 0xff + const v2 = (v >> 8) & 0xff + const v3 = (v) & 0xff + + const view = new DataView(this.buffer, 0, 3) + view.setUint8(0, v1) + view.setUint8(1, v2) + view.setUint8(2, v3) + + return this.writer.write(view) + } + + async uint32(v: number) { + const view = new DataView(this.buffer, 0, 4) + view.setUint32(0, v) + return this.writer.write(view) + } + + async uint52(v: number) { + if (v > Number.MAX_SAFE_INTEGER) { + throw "value too large" + } + + this.uint64(BigInt(v)) + } + + async vint52(v: number) { + if (v > Number.MAX_SAFE_INTEGER) { + throw "value too large" + } + + if (v < (1 << 6)) { + return this.uint8(v) + } else if (v < (1 << 14)) { + return this.uint16(v|0x4000) + } else if (v < (1 << 30)) { + return this.uint32(v|0x80000000) + } else { + return this.uint64(BigInt(v) | 0xc000000000000000n) + } + } + + async uint64(v: bigint) { + const view = new DataView(this.buffer, 0, 8) + view.setBigUint64(0, v) + return this.writer.write(view) + } + + async vint64(v: bigint) { + if (v < (1 << 6)) { + return this.uint8(Number(v)) + } else if (v < (1 << 14)) { + return this.uint16(Number(v)|0x4000) + } else if (v < (1 << 30)) { + return this.uint32(Number(v)|0x80000000) + } else { + return this.uint64(v | 0xc000000000000000n) + } + } + + async bytes(buffer: ArrayBuffer) { + return this.writer.write(buffer) + } + + async string(str: string) { + const data = new TextEncoder().encode(str) + return this.writer.write(data) + } +} diff --git a/player/src/track.ts b/player/src/track.ts deleted file mode 100644 index b31cd7f..0000000 --- a/player/src/track.ts +++ /dev/null @@ -1,124 +0,0 @@ -import { Source } from "./source" -import { Segment } from "./segment" -import { TimeRange } from "./util" - -// An audio or video track that consists of multiple sequential segments. -// -// Instead of buffering, we want to drop video while audio plays uninterupted. -// Chrome actually plays up to 3s of audio without video before buffering when in low latency mode. -// Unforuntately, this does not recover correctly when there are gaps (pls fix). -// Our solution is to flush segments in decode order, buffering a single additional frame. -// We extend the duration of the buffered frame and flush it to cover any gaps. -export class Track { - source: Source; - segments: Segment[]; - - constructor(source: Source) { - this.source = source; - this.segments = []; - } - - add(segment: Segment) { - // TODO don't add if the segment is out of date already - this.segments.push(segment) - - // Sort by timestamp ascending - // NOTE: The timestamp is in milliseconds, and we need to parse the media to get the accurate PTS/DTS. - this.segments.sort((a: Segment, b: Segment): number => { - return a.timestamp - b.timestamp - }) - } - - buffered(): TimeRanges { - let ranges: TimeRange[] = [] - - const buffered = this.source.buffered() as TimeRanges - for (let i = 0; i < buffered.length; i += 1) { - // Convert the TimeRanges into an oject we can modify - ranges.push({ - start: buffered.start(i), - end: buffered.end(i) - }) - } - - // Loop over segments and add in their ranges, merging if possible. - for (let segment of this.segments) { - const buffered = segment.buffered() - if (!buffered) continue; - - if (ranges.length) { - // Try to merge with an existing range - const last = ranges[ranges.length-1]; - if (buffered.start < last.start) { - // Network buffer is old; ignore it - continue - } - - // Extend the end of the last range instead of pushing - if (buffered.start <= last.end && buffered.end > last.end) { - last.end = buffered.end - continue - } - } - - ranges.push(buffered) - } - - // TODO typescript - return { - length: ranges.length, - start: (x) => { return ranges[x].start }, - end: (x) => { return ranges[x].end }, - } - } - - flush() { - while (1) { - if (!this.segments.length) break - - const first = this.segments[0] - const done = first.flush() - if (!done) break - - this.segments.shift() - } - } - - // Given the current playhead, determine if we should drop any segments - // If playhead is undefined, it means we're buffering so skip to anything now. - advance(playhead: number | undefined) { - if (this.segments.length < 2) return - - while (this.segments.length > 1) { - const current = this.segments[0]; - const next = this.segments[1]; - - if (next.dts === undefined || next.timescale == undefined) { - // No samples have been parsed for the next segment yet. - break - } - - if (current.dts === undefined) { - // No samples have been parsed for the current segment yet. - // We can't cover the gap by extending the sample so we have to seek. - // TODO I don't think this can happen, but I guess we have to seek past the gap. - break - } - - if (playhead !== undefined) { - // Check if the next segment has playable media now. - // Otherwise give the current segment more time to catch up. - if ((next.dts / next.timescale) > playhead) { - return - } - } - - current.skipTo(next.dts || 0) // tell typescript that it's not undefined; we already checked - current.finish() - - // TODO cancel the QUIC stream to save bandwidth - - this.segments.shift() - } - } -} diff --git a/player/src/transport/index.ts b/player/src/transport/index.ts new file mode 100644 index 0000000..142cafb --- /dev/null +++ b/player/src/transport/index.ts @@ -0,0 +1,135 @@ +import * as Message from "./message" +import Reader from "../stream/reader" +import Writer from "../stream/writer" +import Video from "../video/index" + +/// + +export interface PlayerInit { + url: string; + canvas: HTMLCanvasElement; +} + +export class Player { + quic: Promise; + api: Promise; + + //audio: Worker; + video: Video; + + constructor(props: PlayerInit) { + //this.audio = new Worker("../audio") + this.video = new Video({ + canvas: props.canvas.transferControlToOffscreen(), + }) + + this.quic = this.connect(props.url) + + // Create a unidirectional stream for all of our messages + this.api = this.quic.then((q) => { + return q.createUnidirectionalStream() + }) + + // async functions + this.receiveStreams() + } + + async close() { + (await this.quic).close() + } + + async connect(url: string): Promise { + // TODO remove this when WebTransport supports the system CA pool + const fingerprintURL = new URL(url); + fingerprintURL.pathname = "/fingerprint" + + const response = await fetch(fingerprintURL) + if (!response.ok) { + throw new Error('failed to get server fingerprint'); + } + + const hex = await response.text() + + // Convert the hex to binary. + let fingerprint = []; + for (let c = 0; c < hex.length; c += 2) { + fingerprint.push(parseInt(hex.substring(c, c+2), 16)); + } + + //const fingerprint = Uint8Array.from(atob(hex), c => c.charCodeAt(0)) + + const quic = new WebTransport(url, { + "serverCertificateHashes": [{ + "algorithm": "sha-256", + "value": new Uint8Array(fingerprint), + }] + }) + + await quic.ready + + return quic + } + + async sendMessage(msg: any) { + const payload = JSON.stringify(msg) + const size = payload.length + 8 + + const stream = await this.api + + const writer = new Writer(stream) + await writer.uint32(size) + await writer.string("warp") + await writer.string(payload) + writer.release() + } + + async receiveStreams() { + const q = await this.quic + const streams = q.incomingUnidirectionalStreams.getReader() + + while (true) { + const result = await streams.read() + if (result.done) break + + const stream = result.value + this.handleStream(stream) // don't await + } + } + + async handleStream(stream: ReadableStream) { + let r = new Reader(stream.getReader()) + + while (!await r.done()) { + const size = await r.uint32(); + const typ = new TextDecoder('utf-8').decode(await r.bytes(4)); + + if (typ != "warp") throw "expected warp atom" + if (size < 8) throw "atom too small" + + const payload = new TextDecoder('utf-8').decode(await r.bytes(size - 8)); + const msg = JSON.parse(payload) + + if (msg.init) { + return this.handleInit(r, msg.init as Message.Init) + } else if (msg.segment) { + return this.handleSegment(r, msg.segment as Message.Segment) + } + } + } + + async handleInit(stream: Reader, msg: Message.Init) { + // TODO properly determine if audio or video + this.video.init({ + track: msg.id, + stream: stream, + }) + } + + async handleSegment(stream: Reader, msg: Message.Segment) { + // TODO properly determine if audio or video + this.video.segment({ + track: msg.init, + stream: stream, + }) + } +} \ No newline at end of file diff --git a/player/src/message.ts b/player/src/transport/message.ts similarity index 60% rename from player/src/message.ts rename to player/src/transport/message.ts index 026ebee..d151538 100644 --- a/player/src/message.ts +++ b/player/src/transport/message.ts @@ -1,13 +1,8 @@ -export interface Message { - init?: MessageInit - segment?: MessageSegment -} - -export interface MessageInit { +export interface Init { id: string } -export interface MessageSegment { +export interface Segment { init: string // id of the init segment timestamp: number // presentation timestamp in milliseconds of the first sample // TODO track would be nice diff --git a/player/src/util.ts b/player/src/util.ts deleted file mode 100644 index c3b6bda..0000000 --- a/player/src/util.ts +++ /dev/null @@ -1,4 +0,0 @@ -export interface TimeRange { - start: number; - end: number; -} diff --git a/player/src/video/decoder.ts b/player/src/video/decoder.ts new file mode 100644 index 0000000..4b8839a --- /dev/null +++ b/player/src/video/decoder.ts @@ -0,0 +1,123 @@ +import * as Message from "./message"; +import { Track } from "./track"; +import { Renderer } from "./renderer" +import { MP4New, MP4Sample, MP4ArrayBuffer } from "../mp4/mp4" + +export class Decoder { + tracks: Map; + renderer: Renderer; + + constructor(renderer: Renderer) { + this.tracks = new Map(); + this.renderer = renderer; + } + + async init(msg: Message.Init) { + let track = this.tracks.get(msg.track); + if (!track) { + track = new Track() + this.tracks.set(msg.track, track) + } + + while (1) { + const data = await msg.stream.read() + if (!data) break + + track.init(data) + } + + // TODO this will hang on incomplete data + const init = await track.ready; + const info = init.info; + + if (info.audioTracks.length + info.videoTracks.length != 1) { + throw new Error("expected a single track") + } + } + + async decode(msg: Message.Segment) { + let track = this.tracks.get(msg.track); + if (!track) { + track = new Track() + this.tracks.set(msg.track, track) + } + + // Wait for the init segment to be fully received and parsed + const init = await track.ready; + const info = init.info; + const video = info.videoTracks[0] + + const decoder = new VideoDecoder({ + output: (frame: VideoFrame) => { + this.renderer.push(frame) + }, + error: (err: Error) => { + console.warn(err) + } + }); + + decoder.configure({ + codec: info.mime, + codedHeight: video.track_height, + codedWidth: video.track_width, + // optimizeForLatency: true + }) + + const input = MP4New(); + + input.onSamples = (id: number, user: any, samples: MP4Sample[]) => { + for (let sample of samples) { + const timestamp = sample.dts / (1000 / info.timescale) // milliseconds + + decoder.decode(new EncodedVideoChunk({ + data: sample.data, + duration: sample.duration, + timestamp: timestamp, + type: sample.is_sync ? "key" : "delta", + })) + } + } + + input.onReady = (info: any) => { + input.setExtractionOptions(info.tracks[0].id, {}, { nbSamples: 1 }); + input.start(); + } + + let offset = 0 + + // MP4box requires us to reparse the init segment unfortunately + for (let raw of track.raw) { + offset = input.appendBuffer(raw) + } + + /* TODO I'm not actually sure why this code doesn't work; something trips up the MP4 parser + while (1) { + const data = await stream.read() + if (!data) break + + input.appendBuffer(data) + input.flush() + } + */ + + // One day I'll figure it out; until then read one top-level atom at a time + while (!await msg.stream.done()) { + const raw = await msg.stream.peek(4) + const size = new DataView(raw.buffer, raw.byteOffset, raw.byteLength).getUint32(0) + const atom = await msg.stream.bytes(size) + + // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately + let box = new Uint8Array(atom.byteLength); + box.set(atom) + + // and for some reason we need to modify the underlying ArrayBuffer with offset + let buffer = box.buffer as MP4ArrayBuffer + buffer.fileStart = offset + + // Parse the data + offset = input.appendBuffer(buffer) + input.flush() + } + + } +} \ No newline at end of file diff --git a/player/src/video/index.ts b/player/src/video/index.ts new file mode 100644 index 0000000..8ea58f4 --- /dev/null +++ b/player/src/video/index.ts @@ -0,0 +1,19 @@ +import * as Message from "./message" + +// Wrapper around the WebWorker API +export default class Video { + worker: Worker; + + constructor(config: Message.Config) { + this.worker = new Worker(new URL('worker.ts', import.meta.url), { type: "module" }) + this.worker.postMessage({ config }, [ config.canvas ]) + } + + init(init: Message.Init) { + this.worker.postMessage({ init }, [ init.stream.buffer, init.stream.reader ]) + } + + segment(segment: Message.Segment) { + this.worker.postMessage({ segment }, [ segment.stream.buffer, segment.stream.reader ]) + } +} \ No newline at end of file diff --git a/player/src/video/message.ts b/player/src/video/message.ts new file mode 100644 index 0000000..9845708 --- /dev/null +++ b/player/src/video/message.ts @@ -0,0 +1,15 @@ +import Reader from "../stream/reader"; + +export interface Config { + canvas: OffscreenCanvas; +} + +export interface Init { + track: string; + stream: Reader; +} + +export interface Segment { + track: string; + stream: Reader; +} \ No newline at end of file diff --git a/player/src/video/renderer.ts b/player/src/video/renderer.ts new file mode 100644 index 0000000..c5b40cf --- /dev/null +++ b/player/src/video/renderer.ts @@ -0,0 +1,72 @@ +import * as Message from "./message"; + +export class Renderer { + canvas: OffscreenCanvas; + queue: Array; + render: number; // non-zero if requestAnimationFrame has been called + sync: DOMHighResTimeStamp; // the wall clock value for timestamp 0 + last?: number; // the timestamp of the last rendered frame + + constructor(config: Message.Config) { + this.canvas = config.canvas; + this.queue = []; + this.render = 0; + this.sync = 0; + } + + push(frame: VideoFrame) { + if (!this.sync) { + // Save the frame as the sync point + this.sync = performance.now() - frame.timestamp + } + + // Drop any old frames + if (this.last && frame.timestamp <= this.last) { + frame.close() + return + } + + // Insert the frame into the queue sorted by timestamp. + // TODO loop backwards for better performance + let index = this.queue.findIndex(other => { + return frame.timestamp < other.timestamp; + }) + + // Insert into the queue. + this.queue.splice(index, 0, frame) + + if (!this.render) { + this.render = self.requestAnimationFrame(this.draw.bind(this)) + } + } + + draw(now: DOMHighResTimeStamp) { + // Determine the target timestamp. + const target = now - this.sync + + let frame = this.queue[0] + if (frame.timestamp > target) { + // nothing to render yet + return + } + + this.queue.shift() + + // Check if we should skip some frames + while (this.queue.length) { + const next = this.queue[0] + if (next.timestamp > target) { + break + } + + this.queue.shift() + frame = next + } + + const ctx = this.canvas.getContext("2d"); + ctx?.drawImage(frame, 0, 0) // TODO aspect ratio + + this.last = frame.timestamp; + frame.close() + } +} \ No newline at end of file diff --git a/player/src/video/track.ts b/player/src/video/track.ts new file mode 100644 index 0000000..60aaec0 --- /dev/null +++ b/player/src/video/track.ts @@ -0,0 +1,58 @@ +import { MP4New, MP4File, MP4ArrayBuffer, MP4Info } from "../mp4/mp4" + +export interface Init { + raw: MP4ArrayBuffer[]; + info: MP4Info; +} + +export class Track { + mp4box: MP4File; + offset: number; + + raw: MP4ArrayBuffer[]; + ready: Promise; + + constructor() { + this.mp4box = MP4New() + this.raw = [] + this.offset = 0 + + // Create a promise that gets resolved once the init segment has been parsed. + this.ready = new Promise((resolve, reject) => { + this.mp4box.onError = reject + + // https://github.com/gpac/mp4box.js#onreadyinfo + this.mp4box.onReady = (info: MP4Info) => { + if (!info.isFragmented) { + reject("expected a fragmented mp4") + } + + if (info.tracks.length != 1) { + reject("expected a single track") + } + + resolve({ + raw: this.raw, + info, + }) + } + }) + } + + init(data: Uint8Array) { + // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately + let box = new Uint8Array(data.byteLength); + box.set(data) + + // and for some reason we need to modify the underlying ArrayBuffer with fileStart + let buffer = box.buffer as MP4ArrayBuffer + buffer.fileStart = this.offset + + // Parse the data + this.offset = this.mp4box.appendBuffer(buffer) + this.mp4box.flush() + + // Add the box to our queue of chunks + this.raw.push(buffer) + } +} \ No newline at end of file diff --git a/player/src/video/worker.ts b/player/src/video/worker.ts new file mode 100644 index 0000000..b8ede50 --- /dev/null +++ b/player/src/video/worker.ts @@ -0,0 +1,22 @@ +import { Renderer } from "./renderer" +import { Decoder } from "./decoder" +import * as Message from "./message" + +let decoder: Decoder; +let renderer: Renderer; + +self.addEventListener('message', async (e: MessageEvent) => { + if (e.data.config) { + const config = e.data.config as Message.Config + + renderer = new Renderer(config) + decoder = new Decoder(renderer) + } + + if (e.data.segment) { + const segment = e.data.segment as Message.Segment + + await decoder.decode(segment) + } +}) + diff --git a/player/tsconfig.json b/player/tsconfig.json index 9762983..d778306 100644 --- a/player/tsconfig.json +++ b/player/tsconfig.json @@ -3,7 +3,7 @@ "src/**/*" ], "compilerOptions": { - "target": "es2021", + "target": "es2022", "strict": true, "typeRoots": [ "src/types"