From d7237c4926db49d3588d87bf26a08eba57938e56 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Thu, 4 May 2023 19:43:43 -0700 Subject: [PATCH] Send INIT as a single message. Much simpler on both the client and server side. --- player/src/audio/decoder.ts | 121 ------------------- player/src/audio/index.ts | 4 +- player/src/audio/renderer.ts | 85 ------------- player/src/audio/worker.ts | 26 ---- player/src/media/decoder.ts | 160 +++++++++++++++++++++++++ player/src/media/index.ts | 82 +++++++++++++ player/src/{audio => media}/message.ts | 27 +++-- player/src/media/renderer.ts | 138 +++++++++++++++++++++ player/src/{audio => media}/ring.ts | 0 player/src/{video => media}/worker.ts | 4 +- player/src/{audio => media}/worklet.ts | 2 +- player/src/mp4/index.ts | 21 ++-- player/src/mp4/init.ts | 14 +-- player/src/mp4/mp4box.d.ts | 12 +- player/src/mp4/rename.ts | 12 ++ player/src/player/index.ts | 22 +--- player/src/transport/index.ts | 96 ++------------- player/src/video/decoder.ts | 127 -------------------- player/src/video/index.ts | 27 ----- player/src/video/message.ts | 17 --- player/src/video/renderer.ts | 91 -------------- server/Cargo.lock | 1 - server/Cargo.toml | 2 +- server/src/media/mod.rs | 2 +- server/src/media/source.rs | 149 +++++++++-------------- server/src/session/message.rs | 6 +- server/src/session/mod.rs | 79 ++++++------ server/src/session/session.rs | 1 - 28 files changed, 546 insertions(+), 782 deletions(-) delete mode 100644 player/src/audio/decoder.ts delete mode 100644 player/src/audio/renderer.ts delete mode 100644 player/src/audio/worker.ts create mode 100644 player/src/media/decoder.ts create mode 100644 player/src/media/index.ts rename player/src/{audio => media}/message.ts (50%) create mode 100644 player/src/media/renderer.ts rename player/src/{audio => media}/ring.ts (100%) rename player/src/{video => media}/worker.ts (86%) rename player/src/{audio => media}/worklet.ts (93%) create mode 100644 player/src/mp4/rename.ts delete mode 100644 player/src/video/decoder.ts delete mode 100644 player/src/video/index.ts delete mode 100644 player/src/video/message.ts delete mode 100644 player/src/video/renderer.ts delete mode 100644 server/src/session/session.rs diff --git a/player/src/audio/decoder.ts b/player/src/audio/decoder.ts deleted file mode 100644 index d23bdc3..0000000 --- a/player/src/audio/decoder.ts +++ /dev/null @@ -1,121 +0,0 @@ -import * as Message from "./message"; -import * as MP4 from "../mp4" -import * as Stream from "../stream" -import * as Util from "../util" - -import Renderer from "./renderer" - -export default class Decoder { - // Store the init message for each track - tracks: Map>; - decoder: AudioDecoder; // TODO one per track - sync: Message.Sync; - - constructor(config: Message.Config, renderer: Renderer) { - this.tracks = new Map(); - - this.decoder = new AudioDecoder({ - output: renderer.emit.bind(renderer), - error: console.warn, - }); - } - - init(msg: Message.Init) { - let defer = this.tracks.get(msg.track); - if (!defer) { - defer = new Util.Deferred() - this.tracks.set(msg.track, defer) - } - - if (msg.info.audioTracks.length != 1 || msg.info.videoTracks.length != 0) { - throw new Error("Expected a single audio track") - } - - const track = msg.info.audioTracks[0] - const audio = track.audio - - defer.resolve(msg) - } - - async decode(msg: Message.Segment) { - let track = this.tracks.get(msg.track); - if (!track) { - track = new Util.Deferred() - this.tracks.set(msg.track, track) - } - - // Wait for the init segment to be fully received and parsed - const init = await track.promise; - const audio = init.info.audioTracks[0] - - if (this.decoder.state == "unconfigured") { - this.decoder.configure({ - codec: audio.codec, - numberOfChannels: audio.audio.channel_count, - sampleRate: audio.audio.sample_rate, - }) - } - - const input = MP4.New(); - - input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => { - for (let sample of samples) { - // Convert to microseconds - const timestamp = 1000 * 1000 * sample.dts / sample.timescale - const duration = 1000 * 1000 * sample.duration / sample.timescale - - // This assumes that timescale == sample rate - this.decoder.decode(new EncodedAudioChunk({ - type: sample.is_sync ? "key" : "delta", - data: sample.data, - duration: duration, - timestamp: timestamp, - })) - } - } - - input.onReady = (info: any) => { - input.setExtractionOptions(info.tracks[0].id, {}, { nbSamples: 1 }); - input.start(); - } - - // MP4box requires us to reparse the init segment unfortunately - let offset = 0; - - for (let raw of init.raw) { - raw.fileStart = offset - input.appendBuffer(raw) - } - - const stream = new Stream.Reader(msg.reader, msg.buffer) - - /* TODO I'm not actually sure why this code doesn't work; something trips up the MP4 parser - while (1) { - const data = await stream.read() - if (!data) break - - input.appendBuffer(data) - input.flush() - } - */ - - // One day I'll figure it out; until then read one top-level atom at a time - while (!await stream.done()) { - const raw = await stream.peek(4) - const size = new DataView(raw.buffer, raw.byteOffset, raw.byteLength).getUint32(0) - const atom = await stream.bytes(size) - - // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately - let box = new Uint8Array(atom.byteLength); - box.set(atom) - - // and for some reason we need to modify the underlying ArrayBuffer with offset - let buffer = box.buffer as MP4.ArrayBuffer - buffer.fileStart = offset - - // Parse the data - offset = input.appendBuffer(buffer) - input.flush() - } - } -} \ No newline at end of file diff --git a/player/src/audio/index.ts b/player/src/audio/index.ts index 725bc11..7076cd3 100644 --- a/player/src/audio/index.ts +++ b/player/src/audio/index.ts @@ -1,8 +1,8 @@ import * as Message from "./message" -import Renderer from "./renderer" +import Renderer from "../media/audio" import Decoder from "./decoder" -import { RingInit } from "./ring" +import { RingInit } from "../media/ring" // Abstracts the Worker and Worklet into a simpler API // This class must be created on the main thread due to AudioContext. diff --git a/player/src/audio/renderer.ts b/player/src/audio/renderer.ts deleted file mode 100644 index 7c5ec9e..0000000 --- a/player/src/audio/renderer.ts +++ /dev/null @@ -1,85 +0,0 @@ -import * as Message from "./message" -import { Ring } from "./ring" - -export default class Renderer { - ring: Ring; - queue: Array; - sync?: DOMHighResTimeStamp - running: number; - - constructor(config: Message.Config) { - this.ring = new Ring(config.ring) - this.queue = []; - this.running = 0 - } - - emit(frame: AudioData) { - if (!this.sync) { - // Save the frame as the sync point - this.sync = 1000 * performance.now() - frame.timestamp - } - - // Insert the frame into the queue sorted by timestamp. - if (this.queue.length > 0 && this.queue[this.queue.length-1].timestamp <= frame.timestamp) { - // Fast path because we normally append to the end. - this.queue.push(frame) - } else { - // Do a full binary search - let low = 0 - let high = this.queue.length; - - while (low < high) { - var mid = (low + high) >>> 1; - if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1; - else high = mid; - } - - this.queue.splice(low, 0, frame) - } - - if (!this.running) { - // Wait for the next animation frame - this.running = self.requestAnimationFrame(this.render.bind(this)) - } - } - - render() { - // Determine the target timestamp. - const target = 1000 * performance.now() - this.sync! - - // Check if we should skip some frames - while (this.queue.length) { - const next = this.queue[0] - if (next.timestamp >= target) { - break - } - - console.warn("dropping audio") - - this.queue.shift() - next.close() - } - - // Push as many as we can to the ring buffer. - while (this.queue.length) { - let frame = this.queue[0] - let ok = this.ring.write(frame) - if (!ok) { - break - } - - frame.close() - this.queue.shift() - } - - if (this.queue.length) { - this.running = self.requestAnimationFrame(this.render.bind(this)) - } else { - this.running = 0 - } - } - - play(play: Message.Play) { - this.ring.reset() - } -} \ No newline at end of file diff --git a/player/src/audio/worker.ts b/player/src/audio/worker.ts deleted file mode 100644 index 7ed9003..0000000 --- a/player/src/audio/worker.ts +++ /dev/null @@ -1,26 +0,0 @@ -import Decoder from "./decoder" -import Renderer from "./renderer" - -import * as Message from "./message" - -let decoder: Decoder -let renderer: Renderer; - -self.addEventListener('message', (e: MessageEvent) => { - if (e.data.config) { - renderer = new Renderer(e.data.config) - decoder = new Decoder(e.data.config, renderer) - } - - if (e.data.init) { - decoder.init(e.data.init) - } - - if (e.data.segment) { - decoder.decode(e.data.segment) - } - - if (e.data.play) { - renderer.play(e.data.play) - } -}) \ No newline at end of file diff --git a/player/src/media/decoder.ts b/player/src/media/decoder.ts new file mode 100644 index 0000000..ca2f639 --- /dev/null +++ b/player/src/media/decoder.ts @@ -0,0 +1,160 @@ +import * as Message from "./message"; +import * as MP4 from "../mp4" +import * as Stream from "../stream" +import * as Util from "../util" + +import Renderer from "./renderer" + +export default class Decoder { + init: MP4.InitParser; + decoders: Map; + renderer: Renderer; + + constructor(renderer: Renderer) { + this.init = new MP4.InitParser(); + this.decoders = new Map(); + this.renderer = renderer; + } + + async receiveInit(msg: Message.Init) { + let stream = new Stream.Reader(msg.reader, msg.buffer); + while (1) { + const data = await stream.read() + if (!data) break + + this.init.push(data) + } + + // TODO make sure the init segment is fully received + } + + async receiveSegment(msg: Message.Segment) { + // Wait for the init segment to be fully received and parsed + const info = await this.init.info + const input = MP4.New(); + + input.onSamples = this.onSamples.bind(this); + input.onReady = (info: any) => { + // Extract all of the tracks, because we don't know if it's audio or video. + for (let track of info.tracks) { + input.setExtractionOptions(track.id, track, { nbSamples: 1 }); + } + + input.start(); + } + + // MP4box requires us to reparse the init segment unfortunately + let offset = 0; + + for (let raw of this.init.raw) { + raw.fileStart = offset + offset = input.appendBuffer(raw) + } + + const stream = new Stream.Reader(msg.reader, msg.buffer) + + // For whatever reason, mp4box doesn't work until you read an atom at a time. + while (!await stream.done()) { + const raw = await stream.peek(4) + + // TODO this doesn't support when size = 0 (until EOF) or size = 1 (extended size) + const size = new DataView(raw.buffer, raw.byteOffset, raw.byteLength).getUint32(0) + const atom = await stream.bytes(size) + + // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately + let box = new Uint8Array(atom.byteLength); + box.set(atom) + + // and for some reason we need to modify the underlying ArrayBuffer with offset + let buffer = box.buffer as MP4.ArrayBuffer + buffer.fileStart = offset + + // Parse the data + offset = input.appendBuffer(buffer) + input.flush() + } + } + + onSamples(track_id: number, track: MP4.Track, samples: MP4.Sample[]) { + let decoder = this.decoders.get(track_id); + + if (!decoder) { + // We need a sample to initalize the video decoder, because of mp4box limitations. + let sample = samples[0]; + + if (MP4.isVideoTrack(track)) { + // Configure the decoder using the AVC box for H.264 + // TODO it should be easy to support other codecs, just need to know the right boxes. + const avcc = sample.description.avcC; + if (!avcc) throw new Error("TODO only h264 is supported"); + + const description = new MP4.Stream(new Uint8Array(avcc.size), 0, false) + avcc.write(description) + + const videoDecoder = new VideoDecoder({ + output: this.renderer.push.bind(this.renderer), + error: console.warn, + }); + + videoDecoder.configure({ + codec: track.codec, + codedHeight: track.video.height, + codedWidth: track.video.width, + description: description.buffer?.slice(8), + // optimizeForLatency: true + }) + + decoder = videoDecoder + } else if (MP4.isAudioTrack(track)) { + const audioDecoder = new AudioDecoder({ + output: this.renderer.push.bind(this.renderer), + error: console.warn, + }); + + audioDecoder.configure({ + codec: track.codec, + numberOfChannels: track.audio.channel_count, + sampleRate: track.audio.sample_rate, + }) + + decoder = audioDecoder + } else { + throw new Error("unknown track type") + } + + this.decoders.set(track_id, decoder) + } + + for (let sample of samples) { + // Convert to microseconds + const timestamp = 1000 * 1000 * sample.dts / sample.timescale + const duration = 1000 * 1000 * sample.duration / sample.timescale + + if (isAudioDecoder(decoder)) { + decoder.decode(new EncodedAudioChunk({ + type: sample.is_sync ? "key" : "delta", + data: sample.data, + duration: duration, + timestamp: timestamp, + })) + } else if (isVideoDecoder(decoder)) { + decoder.decode(new EncodedVideoChunk({ + type: sample.is_sync ? "key" : "delta", + data: sample.data, + duration: duration, + timestamp: timestamp, + })) + } else { + throw new Error("unknown decoder type") + } + } + } +} + +function isAudioDecoder(decoder: AudioDecoder | VideoDecoder): decoder is AudioDecoder { + return decoder instanceof AudioDecoder +} + +function isVideoDecoder(decoder: AudioDecoder | VideoDecoder): decoder is VideoDecoder { + return decoder instanceof VideoDecoder +} \ No newline at end of file diff --git a/player/src/media/index.ts b/player/src/media/index.ts new file mode 100644 index 0000000..fd42ed9 --- /dev/null +++ b/player/src/media/index.ts @@ -0,0 +1,82 @@ +import * as Message from "./message" +import { RingInit } from "./ring" + +// Abstracts the Worker and Worklet into a simpler API +// This class must be created on the main thread due to AudioContext. +export default class Media { + context: AudioContext; + worker: Worker; + worklet: Promise; + + constructor(videoConfig: Message.VideoConfig) { + // Assume 44.1kHz and two audio channels + const audioConfig = { + sampleRate: 44100, + ring: new RingInit(2, 4410), // 100ms at 44.1khz + } + + const config = { + audio: audioConfig, + video: videoConfig, + } + + this.context = new AudioContext({ + latencyHint: "interactive", + sampleRate: config.audio.sampleRate, + }) + + + this.worker = this.setupWorker(config) + this.worklet = this.setupWorklet(config) + } + + init(init: Message.Init) { + this.worker.postMessage({ init }, [ init.buffer.buffer, init.reader ]) + } + + segment(segment: Message.Segment) { + this.worker.postMessage({ segment }, [ segment.buffer.buffer, segment.reader ]) + } + + play(play: Message.Play) { + this.context.resume() + //this.worker.postMessage({ play }) + } + + private setupWorker(config: Message.Config): Worker { + const url = new URL('worker.ts', import.meta.url) + + const worker = new Worker(url, { + type: "module", + name: "media", + }) + + worker.postMessage({ config }, [ config.video.canvas ]) + + return worker + } + + private async setupWorklet(config: Message.Config): Promise { + // Load the worklet source code. + const url = new URL('worklet.ts', import.meta.url) + await this.context.audioWorklet.addModule(url) + + const volume = this.context.createGain() + volume.gain.value = 2.0; + + // Create a worklet + const worklet = new AudioWorkletNode(this.context, 'renderer'); + worklet.onprocessorerror = (e: Event) => { + console.error("Audio worklet error:", e) + }; + + worklet.port.postMessage({ config }) + + // Connect the worklet to the volume node and then to the speakers + worklet.connect(volume) + volume.connect(this.context.destination) + + return worklet + } + +} \ No newline at end of file diff --git a/player/src/audio/message.ts b/player/src/media/message.ts similarity index 50% rename from player/src/audio/message.ts rename to player/src/media/message.ts index 73d4f1c..a457200 100644 --- a/player/src/audio/message.ts +++ b/player/src/media/message.ts @@ -1,28 +1,29 @@ import * as MP4 from "../mp4" -import { RingInit } from "./ring" +import { RingInit } from "../media/ring" export interface Config { + audio: AudioConfig; + video: VideoConfig; +} + +export interface VideoConfig { + canvas: OffscreenCanvas; +} + +export interface AudioConfig { + // audio stuff sampleRate: number; ring: RingInit; } export interface Init { - track: string; - info: MP4.Info; - raw: MP4.ArrayBuffer[]; -} - -export interface Segment { - track: string; buffer: Uint8Array; // unread buffered data reader: ReadableStream; // unread unbuffered data } -// Audio tells video when the given timestamp should be rendered. -export interface Sync { - origin: number; - clock: DOMHighResTimeStamp; - timestamp: number; +export interface Segment { + buffer: Uint8Array; // unread buffered data + reader: ReadableStream; // unread unbuffered data } export interface Play { diff --git a/player/src/media/renderer.ts b/player/src/media/renderer.ts new file mode 100644 index 0000000..af56e2a --- /dev/null +++ b/player/src/media/renderer.ts @@ -0,0 +1,138 @@ +import * as Message from "./message"; +import { Ring } from "./ring" + +export default class Renderer { + audioRing: Ring; + audioQueue: Array; + + videoCanvas: OffscreenCanvas; + videoQueue: Array; + + render: number; // non-zero if requestAnimationFrame has been called + sync?: DOMHighResTimeStamp; // the wall clock value for timestamp 0, in microseconds + last?: number; // the timestamp of the last rendered frame, in microseconds + + constructor(config: Message.Config) { + this.audioRing = new Ring(config.audio.ring); + this.audioQueue = []; + + this.videoCanvas = config.video.canvas; + this.videoQueue = []; + + this.render = 0; + } + + push(frame: AudioData | VideoFrame) { + if (!this.sync) { + // Save the frame as the sync point + this.sync = 1000 * performance.now() - frame.timestamp + } + + // Drop any old frames + if (this.last && frame.timestamp <= this.last) { + frame.close() + return + } + + let queue + if (isAudioData(frame)) { + queue = this.audioQueue; + } else if (isVideoFrame(frame)) { + queue = this.videoQueue; + } else { + throw new Error("unknown frame type") + } + + // Insert the frame into the queue sorted by timestamp. + if (queue.length > 0 && queue[queue.length-1].timestamp <= frame.timestamp) { + // Fast path because we normally append to the end. + queue.push(frame as any) + } else { + // Do a full binary search + let low = 0 + let high = queue.length; + + while (low < high) { + var mid = (low + high) >>> 1; + if (queue[mid].timestamp < frame.timestamp) low = mid + 1; + else high = mid; + } + + queue.splice(low, 0, frame as any) + } + + // Queue up to render the next frame. + if (!this.render) { + this.render = self.requestAnimationFrame(this.draw.bind(this)) + } + } + + draw(now: DOMHighResTimeStamp) { + // Determine the target timestamp. + const target = 1000 * now - this.sync! + + this.drawAudio(now, target) + this.drawVideo(now, target) + + if (this.audioQueue.length || this.videoQueue.length) { + this.render = self.requestAnimationFrame(this.draw.bind(this)) + } else { + this.render = 0 + } + } + + drawAudio(now: DOMHighResTimeStamp, target: DOMHighResTimeStamp) { + // Check if we should skip some frames + while (this.audioQueue.length) { + const next = this.audioQueue[0] + if (next.timestamp >= target) { + let ok = this.audioRing.write(next) + if (!ok) { + // No more space in the ring + break + } + } else { + console.warn("dropping audio") + } + + next.close() + this.audioQueue.shift() + } + } + + drawVideo(now: DOMHighResTimeStamp, target: DOMHighResTimeStamp) { + if (this.videoQueue.length == 0) return; + + let frame = this.videoQueue[0]; + if (frame.timestamp >= target) { + // nothing to render yet, wait for the next animation frame + this.render = self.requestAnimationFrame(this.draw.bind(this)) + return + } + + this.videoQueue.shift(); + + // Check if we should skip some frames + while (this.videoQueue.length) { + const next = this.videoQueue[0] + if (next.timestamp > target) break + + frame.close() + frame = this.videoQueue.shift()!; + } + + const ctx = this.videoCanvas.getContext("2d"); + ctx!.drawImage(frame, 0, 0, this.videoCanvas.width, this.videoCanvas.height) // TODO aspect ratio + + this.last = frame.timestamp; + frame.close() + } +} + +function isAudioData(frame: AudioData | VideoFrame): frame is AudioData { + return frame instanceof AudioData +} + +function isVideoFrame(frame: AudioData | VideoFrame): frame is VideoFrame { + return frame instanceof VideoFrame +} \ No newline at end of file diff --git a/player/src/audio/ring.ts b/player/src/media/ring.ts similarity index 100% rename from player/src/audio/ring.ts rename to player/src/media/ring.ts diff --git a/player/src/video/worker.ts b/player/src/media/worker.ts similarity index 86% rename from player/src/video/worker.ts rename to player/src/media/worker.ts index 8eeede8..4597c29 100644 --- a/player/src/video/worker.ts +++ b/player/src/media/worker.ts @@ -13,10 +13,10 @@ self.addEventListener('message', async (e: MessageEvent) => { decoder = new Decoder(renderer) } else if (e.data.init) { const init = e.data.init as Message.Init - await decoder.init(init) + await decoder.receiveInit(init) } else if (e.data.segment) { const segment = e.data.segment as Message.Segment - await decoder.decode(segment) + await decoder.receiveSegment(segment) } }) diff --git a/player/src/audio/worklet.ts b/player/src/media/worklet.ts similarity index 93% rename from player/src/audio/worklet.ts rename to player/src/media/worklet.ts index 401eec9..5961c32 100644 --- a/player/src/audio/worklet.ts +++ b/player/src/media/worklet.ts @@ -25,7 +25,7 @@ class Renderer extends AudioWorkletProcessor { } config(config: Message.Config) { - this.ring = new Ring(config.ring) + this.ring = new Ring(config.audio.ring) } // Inputs and outputs in groups of 128 samples. diff --git a/player/src/mp4/index.ts b/player/src/mp4/index.ts index e05fbce..ca1a0bd 100644 --- a/player/src/mp4/index.ts +++ b/player/src/mp4/index.ts @@ -1,11 +1,12 @@ -// Rename some stuff so it's on brand. -export { - createFile as New, - MP4File as File, - MP4ArrayBuffer as ArrayBuffer, - MP4Info as Info, - DataStream as Stream, - Sample, -} from "mp4box" +import * as MP4 from "./rename" +export * from "./rename" -export { Init, InitParser } from "./init" \ No newline at end of file +export { Init, InitParser } from "./init" + +export function isAudioTrack(track: MP4.Track): track is MP4.AudioTrack { + return (track as MP4.AudioTrack).audio !== undefined; +} + +export function isVideoTrack(track: MP4.Track): track is MP4.VideoTrack { + return (track as MP4.VideoTrack).video !== undefined; +} \ No newline at end of file diff --git a/player/src/mp4/init.ts b/player/src/mp4/init.ts index d36c25b..45e7c42 100644 --- a/player/src/mp4/init.ts +++ b/player/src/mp4/init.ts @@ -20,19 +20,7 @@ export class InitParser { // Create a promise that gets resolved once the init segment has been parsed. this.info = new Promise((resolve, reject) => { this.mp4box.onError = reject - - // https://github.com/gpac/mp4box.js#onreadyinfo - this.mp4box.onReady = (info: MP4.Info) => { - if (!info.isFragmented) { - reject("expected a fragmented mp4") - } - - if (info.tracks.length != 1) { - reject("expected a single track") - } - - resolve(info) - } + this.mp4box.onReady = resolve }) } diff --git a/player/src/mp4/mp4box.d.ts b/player/src/mp4/mp4box.d.ts index 7ce91f7..018f185 100644 --- a/player/src/mp4/mp4box.d.ts +++ b/player/src/mp4/mp4box.d.ts @@ -1,7 +1,7 @@ // https://github.com/gpac/mp4box.js/issues/233 declare module "mp4box" { - interface MP4MediaTrack { + export interface MP4MediaTrack { id: number; created: Date; modified: Date; @@ -19,26 +19,26 @@ declare module "mp4box" { nb_samples: number; } - interface MP4VideoData { + export interface MP4VideoData { width: number; height: number; } - interface MP4VideoTrack extends MP4MediaTrack { + export interface MP4VideoTrack extends MP4MediaTrack { video: MP4VideoData; } - interface MP4AudioData { + export interface MP4AudioData { sample_rate: number; channel_count: number; sample_size: number; } - interface MP4AudioTrack extends MP4MediaTrack { + export interface MP4AudioTrack extends MP4MediaTrack { audio: MP4AudioData; } - type MP4Track = MP4VideoTrack | MP4AudioTrack; + export type MP4Track = MP4VideoTrack | MP4AudioTrack; export interface MP4Info { duration: number; diff --git a/player/src/mp4/rename.ts b/player/src/mp4/rename.ts new file mode 100644 index 0000000..d45682b --- /dev/null +++ b/player/src/mp4/rename.ts @@ -0,0 +1,12 @@ +// Rename some stuff so it's on brand. +export { + createFile as New, + MP4File as File, + MP4ArrayBuffer as ArrayBuffer, + MP4Info as Info, + MP4Track as Track, + MP4AudioTrack as AudioTrack, + MP4VideoTrack as VideoTrack, + DataStream as Stream, + Sample, +} from "mp4box" \ No newline at end of file diff --git a/player/src/player/index.ts b/player/src/player/index.ts index b18a625..7fafccb 100644 --- a/player/src/player/index.ts +++ b/player/src/player/index.ts @@ -1,6 +1,5 @@ -import Audio from "../audio" import Transport from "../transport" -import Video from "../video" +import Media from "../media" export interface PlayerInit { url: string; @@ -9,22 +8,18 @@ export interface PlayerInit { } export default class Player { - audio: Audio; - video: Video; + media: Media; transport: Transport; constructor(props: PlayerInit) { - this.audio = new Audio() - this.video = new Video({ + this.media = new Media({ canvas: props.canvas.transferControlToOffscreen(), }) this.transport = new Transport({ url: props.url, fingerprint: props.fingerprint, - - audio: this.audio, - video: this.video, + media: this.media, }) } @@ -33,13 +28,6 @@ export default class Player { } play() { - this.audio.play({}) - //this.video.play() + //this.media.play() } - - onMessage(msg: any) { - if (msg.sync) { - msg.sync - } - } } \ No newline at end of file diff --git a/player/src/transport/index.ts b/player/src/transport/index.ts index ee58ed1..e588a8f 100644 --- a/player/src/transport/index.ts +++ b/player/src/transport/index.ts @@ -2,30 +2,22 @@ import * as Message from "./message" import * as Stream from "../stream" import * as MP4 from "../mp4" -import Audio from "../audio" -import Video from "../video" +import Media from "../media" export interface TransportInit { url: string; fingerprint?: WebTransportHash; // the certificate fingerprint, temporarily needed for local development - - audio: Audio; - video: Video; + media: Media; } export default class Transport { quic: Promise; api: Promise; - tracks: Map - audio: Audio; - video: Video; + media: Media; constructor(props: TransportInit) { - this.tracks = new Map(); - - this.audio = props.audio; - this.video = props.video; + this.media = props.media; this.quic = this.connect(props) @@ -94,82 +86,18 @@ export default class Transport { const msg = JSON.parse(payload) if (msg.init) { - return this.handleInit(r, msg.init as Message.Init) + return this.media.init({ + buffer: r.buffer, + reader: r.reader, + }) } else if (msg.segment) { - return this.handleSegment(r, msg.segment as Message.Segment) + return this.media.segment({ + buffer: r.buffer, + reader: r.reader, + }) } else { console.warn("unknown message", msg); } } } - - async handleInit(stream: Stream.Reader, msg: Message.Init) { - console.log("handle init", msg); - - let track = this.tracks.get(msg.id); - if (!track) { - track = new MP4.InitParser() - this.tracks.set(msg.id, track) - } - - while (1) { - const data = await stream.read() - if (!data) break - - track.push(data) - } - - const info = await track.info - - console.log(info); - - if (info.audioTracks.length + info.videoTracks.length != 1) { - throw new Error("expected a single track") - } - - if (info.audioTracks.length) { - this.audio.init({ - track: msg.id, - info: info, - raw: track.raw, - }) - } else if (info.videoTracks.length) { - this.video.init({ - track: msg.id, - info: info, - raw: track.raw, - }) - } else { - throw new Error("init is neither audio nor video") - } - } - - async handleSegment(stream: Stream.Reader, msg: Message.Segment) { - console.log("handle segment", msg); - - let track = this.tracks.get(msg.init); - if (!track) { - track = new MP4.InitParser() - this.tracks.set(msg.init, track) - } - - // Wait until we learn if this is an audio or video track - const info = await track.info - - if (info.audioTracks.length) { - this.audio.segment({ - track: msg.init, - buffer: stream.buffer, - reader: stream.reader, - }) - } else if (info.videoTracks.length) { - this.video.segment({ - track: msg.init, - buffer: stream.buffer, - reader: stream.reader, - }) - } else { - throw new Error("segment is neither audio nor video") - } - } } \ No newline at end of file diff --git a/player/src/video/decoder.ts b/player/src/video/decoder.ts deleted file mode 100644 index 582dcd2..0000000 --- a/player/src/video/decoder.ts +++ /dev/null @@ -1,127 +0,0 @@ -import * as Message from "./message"; -import * as MP4 from "../mp4" -import * as Stream from "../stream" -import * as Util from "../util" - -import Renderer from "./renderer" - -export default class Decoder { - // Store the init message for each track - tracks: Map> - renderer: Renderer; - - constructor(renderer: Renderer) { - this.tracks = new Map(); - this.renderer = renderer; - } - - async init(msg: Message.Init) { - let track = this.tracks.get(msg.track); - if (!track) { - track = new Util.Deferred() - this.tracks.set(msg.track, track) - } - - if (msg.info.videoTracks.length != 1 || msg.info.audioTracks.length != 0) { - throw new Error("Expected a single video track") - } - - track.resolve(msg) - } - - async decode(msg: Message.Segment) { - let track = this.tracks.get(msg.track); - if (!track) { - track = new Util.Deferred() - this.tracks.set(msg.track, track) - } - - // Wait for the init segment to be fully received and parsed - const init = await track.promise; - const info = init.info; - const video = info.videoTracks[0] - - const decoder = new VideoDecoder({ - output: (frame: VideoFrame) => { - this.renderer.emit(frame) - }, - error: (err: Error) => { - console.warn(err) - } - }); - - const input = MP4.New(); - - input.onSamples = (id: number, user: any, samples: MP4.Sample[]) => { - for (let sample of samples) { - const timestamp = 1000 * sample.dts / sample.timescale // milliseconds - - if (sample.is_sync) { - // Configure the decoder using the AVC box for H.264 - const avcc = sample.description.avcC; - const description = new MP4.Stream(new Uint8Array(avcc.size), 0, false) - avcc.write(description) - - decoder.configure({ - codec: video.codec, - codedHeight: video.track_height, - codedWidth: video.track_width, - description: description.buffer?.slice(8), - // optimizeForLatency: true - }) - } - - decoder.decode(new EncodedVideoChunk({ - data: sample.data, - duration: sample.duration, - timestamp: timestamp, - type: sample.is_sync ? "key" : "delta", - })) - } - } - - input.onReady = (info: any) => { - input.setExtractionOptions(info.tracks[0].id, {}, { nbSamples: 1 }); - input.start(); - } - - // MP4box requires us to reparse the init segment unfortunately - let offset = 0; - - for (let raw of init.raw) { - raw.fileStart = offset - offset = input.appendBuffer(raw) - } - - const stream = new Stream.Reader(msg.reader, msg.buffer) - - /* TODO I'm not actually sure why this code doesn't work; something trips up the MP4 parser - while (1) { - const data = await stream.read() - if (!data) break - - input.appendBuffer(data) - input.flush() - } - */ - - // One day I'll figure it out; until then read one top-level atom at a time - while (!await stream.done()) { - const raw = await stream.peek(4) - const size = new DataView(raw.buffer, raw.byteOffset, raw.byteLength).getUint32(0) - const atom = await stream.bytes(size) - - // Make a copy of the atom because mp4box only accepts an ArrayBuffer unfortunately - let box = new Uint8Array(atom.byteLength); - box.set(atom) - - // and for some reason we need to modify the underlying ArrayBuffer with offset - let buffer = box.buffer as MP4.ArrayBuffer - buffer.fileStart = offset - - // Parse the data - offset = input.appendBuffer(buffer) - input.flush() - } - } -} \ No newline at end of file diff --git a/player/src/video/index.ts b/player/src/video/index.ts deleted file mode 100644 index f447072..0000000 --- a/player/src/video/index.ts +++ /dev/null @@ -1,27 +0,0 @@ -import * as Message from "./message" - -// Wrapper around the WebWorker API -export default class Video { - worker: Worker; - - constructor(config: Message.Config) { - const url = new URL('worker.ts', import.meta.url) - this.worker = new Worker(url, { - type: "module", - name: "video", - }) - this.worker.postMessage({ config }, [ config.canvas ]) - } - - init(init: Message.Init) { - this.worker.postMessage({ init }) // note: we copy the raw init bytes each time - } - - segment(segment: Message.Segment) { - this.worker.postMessage({ segment }, [ segment.buffer.buffer, segment.reader ]) - } - - play() { - // TODO - } -} \ No newline at end of file diff --git a/player/src/video/message.ts b/player/src/video/message.ts deleted file mode 100644 index 61e4b6d..0000000 --- a/player/src/video/message.ts +++ /dev/null @@ -1,17 +0,0 @@ -import * as MP4 from "../mp4" - -export interface Config { - canvas: OffscreenCanvas; -} - -export interface Init { - track: string; - info: MP4.Info; - raw: MP4.ArrayBuffer[]; -} - -export interface Segment { - track: string; - buffer: Uint8Array; // unread buffered data - reader: ReadableStream; // unread unbuffered data -} \ No newline at end of file diff --git a/player/src/video/renderer.ts b/player/src/video/renderer.ts deleted file mode 100644 index 98fd3b6..0000000 --- a/player/src/video/renderer.ts +++ /dev/null @@ -1,91 +0,0 @@ -import * as Message from "./message"; - -export default class Renderer { - canvas: OffscreenCanvas; - queue: Array; - render: number; // non-zero if requestAnimationFrame has been called - sync?: DOMHighResTimeStamp; // the wall clock value for timestamp 0 - last?: number; // the timestamp of the last rendered frame - - constructor(config: Message.Config) { - this.canvas = config.canvas; - this.queue = []; - this.render = 0; - } - - emit(frame: VideoFrame) { - if (!this.sync) { - // Save the frame as the sync point - this.sync = performance.now() - frame.timestamp - } - - // Drop any old frames - if (this.last && frame.timestamp <= this.last) { - frame.close() - return - } - - // Insert the frame into the queue sorted by timestamp. - if (this.queue.length > 0 && this.queue[this.queue.length-1].timestamp <= frame.timestamp) { - // Fast path because we normally append to the end. - this.queue.push(frame) - } else { - // Do a full binary search - let low = 0 - let high = this.queue.length; - - while (low < high) { - var mid = (low + high) >>> 1; - if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1; - else high = mid; - } - - this.queue.splice(low, 0, frame) - } - - // Queue up to render the next frame. - if (!this.render) { - this.render = self.requestAnimationFrame(this.draw.bind(this)) - } - } - - draw(now: DOMHighResTimeStamp) { - // Determine the target timestamp. - const target = now - this.sync! - - let frame = this.queue[0] - if (frame.timestamp >= target) { - // nothing to render yet, wait for the next animation frame - this.render = self.requestAnimationFrame(this.draw.bind(this)) - return - } - - this.queue.shift() - - // Check if we should skip some frames - while (this.queue.length) { - const next = this.queue[0] - if (next.timestamp > target) { - break - } - - frame.close() - - this.queue.shift() - frame = next - } - - const ctx = this.canvas.getContext("2d"); - ctx!.drawImage(frame, 0, 0, this.canvas.width, this.canvas.height) // TODO aspect ratio - - this.last = frame.timestamp; - frame.close() - - if (this.queue.length > 0) { - this.render = self.requestAnimationFrame(this.draw.bind(this)) - } else { - // Break the loop for now - this.render = 0 - } - } -} \ No newline at end of file diff --git a/server/Cargo.lock b/server/Cargo.lock index b470484..0ae81ac 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -329,7 +329,6 @@ dependencies = [ [[package]] name = "mp4" version = "0.13.0" -source = "git+https://github.com/kixelated/mp4-rust.git?branch=trexs#efefcc47353f477518bff01493785ae0daa8efd4" dependencies = [ "byteorder", "bytes", diff --git a/server/Cargo.toml b/server/Cargo.toml index 660ab93..af201c6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -13,6 +13,6 @@ mio = { version = "0.8", features = ["net", "os-poll"] } env_logger = "0.9.3" ring = "0.16" anyhow = "1.0.70" -mp4 = { git = "https://github.com/kixelated/mp4-rust.git", branch = "trexs" } +mp4 = { path = "../../mp4-rust" } # { git = "https://github.com/kixelated/mp4-rust.git", branch = "trexs" } serde = "1.0.160" serde_json = "1.0" \ No newline at end of file diff --git a/server/src/media/mod.rs b/server/src/media/mod.rs index e7ec5eb..f73c588 100644 --- a/server/src/media/mod.rs +++ b/server/src/media/mod.rs @@ -1,3 +1,3 @@ mod source; -pub use source::{Fragment, Source}; +pub use source::{Fragment, Source}; \ No newline at end of file diff --git a/server/src/media/source.rs b/server/src/media/source.rs index 38cfb97..e639ce6 100644 --- a/server/src/media/source.rs +++ b/server/src/media/source.rs @@ -1,37 +1,32 @@ -use io::Read; -use std::collections::VecDeque; use std::{fs, io, time}; - -use std::io::Write; +use std::collections::{HashMap,VecDeque}; +use std::io::Read; use anyhow; -use mp4; -use mp4::{ReadBox, WriteBox}; +use mp4; +use mp4::ReadBox; pub struct Source { // We read the file once, in order, and don't seek backwards. reader: io::BufReader, - // Any fragments parsed and ready to be returned by next(). - fragments: VecDeque, - // The timestamp when the broadcast "started", so we can sleep to simulate a live stream. start: time::Instant, - // The raw ftyp box, which we need duplicate for each track, but we don't know how many tracks exist yet. - ftyp: Vec, + // The initialization payload; ftyp + moov boxes. + pub init: Vec, - // The parsed moov box, so we can look up track information later. - moov: Option, + // The timescale used for each track. + timescale: HashMap, + + // Any fragments parsed and ready to be returned by next(). + fragments: VecDeque, } pub struct Fragment { // The track ID for the fragment. - pub track: u32, - - // The type of the fragment. - pub typ: mp4::BoxType, + pub track_id: u32, // The data of the fragment. pub data: Vec, @@ -44,21 +39,42 @@ pub struct Fragment { } impl Source { - pub fn new(path: &str) -> io::Result { + pub fn new(path: &str) -> anyhow::Result { let f = fs::File::open(path)?; - let reader = io::BufReader::new(f); + let mut reader = io::BufReader::new(f); let start = time::Instant::now(); - Ok(Self { + let ftyp = read_atom(&mut reader)?; + anyhow::ensure!(&ftyp[4..8] == b"ftyp", "expected ftyp atom"); + + let moov = read_atom(&mut reader)?; + anyhow::ensure!(&moov[4..8] == b"moov", "expected moov atom"); + + let mut init = ftyp; + init.extend(&moov); + + // We're going to parse the moov box. + // We have to read the moov box header to correctly advance the cursor for the mp4 crate. + let mut moov_reader = io::Cursor::new(&moov); + let moov_header = mp4::BoxHeader::read(&mut moov_reader)?; + + // Parse the moov box so we can detect the timescales for each track. + let moov = mp4::MoovBox::read_box(&mut moov_reader, moov_header.size)?; + let timescale = moov.traks + .iter() + .map(|trak| (trak.tkhd.track_id, trak.mdia.mdhd.timescale)) + .collect(); + + Ok(Self{ reader, start, + init, + timescale, fragments: VecDeque::new(), - ftyp: Vec::new(), - moov: None, }) } - pub fn get(&mut self) -> anyhow::Result> { + pub fn fragment(&mut self) -> anyhow::Result> { if self.fragments.is_empty() { self.parse()?; }; @@ -72,63 +88,13 @@ impl Source { fn parse(&mut self) -> anyhow::Result<()> { loop { - // Read the next full atom. - let atom = read_box(&mut self.reader)?; + let atom = read_atom(&mut self.reader)?; - // Before we return it, let's do some simple parsing. let mut reader = io::Cursor::new(&atom); let header = mp4::BoxHeader::read(&mut reader)?; match header.name { - mp4::BoxType::FtypBox => { - // Don't return anything until we know the total number of tracks. - // To be honest, I didn't expect the borrow checker to allow this, but it does! - self.ftyp = atom; - } - mp4::BoxType::MoovBox => { - // We need to split the moov based on the tracks. - let moov = mp4::MoovBox::read_box(&mut reader, header.size)?; - - for trak in &moov.traks { - let track_id = trak.tkhd.track_id; - - // Push the styp atom for each track. - self.fragments.push_back(Fragment { - track: track_id, - typ: mp4::BoxType::FtypBox, - data: self.ftyp.clone(), - keyframe: false, - timestamp: None, - }); - - // Unfortunately, we need to create a brand new moov atom for each track. - // We remove every box for other track IDs. - let mut toov = moov.clone(); - toov.traks.retain(|t| t.tkhd.track_id == track_id); - toov.mvex - .as_mut() - .expect("missing mvex") - .trexs - .retain(|f| f.track_id == track_id); - - // Marshal the box. - let mut toov_data = Vec::new(); - toov.write_box(&mut toov_data)?; - - let mut file = std::fs::File::create(format!("track{}.mp4", track_id))?; - file.write_all(toov_data.as_slice())?; - - self.fragments.push_back(Fragment { - track: track_id, - typ: mp4::BoxType::MoovBox, - data: toov_data, - keyframe: false, - timestamp: None, - }); - } - - self.moov = Some(moov); - } + mp4::BoxType::FtypBox | mp4::BoxType::MoovBox => anyhow::bail!("must call init first"), mp4::BoxType::MoofBox => { let moof = mp4::MoofBox::read_box(&mut reader, header.size)?; @@ -138,8 +104,7 @@ impl Source { } self.fragments.push_back(Fragment { - track: moof.trafs[0].tfhd.track_id, - typ: mp4::BoxType::MoofBox, + track_id: moof.trafs[0].tfhd.track_id, data: atom, keyframe: has_keyframe(&moof), timestamp: first_timestamp(&moof), @@ -147,11 +112,9 @@ impl Source { } mp4::BoxType::MdatBox => { let moof = self.fragments.back().expect("no atom before mdat"); - assert!(moof.typ == mp4::BoxType::MoofBox, "no moof before mdat"); self.fragments.push_back(Fragment { - track: moof.track, - typ: mp4::BoxType::MoofBox, + track_id: moof.track_id, data: atom, keyframe: false, timestamp: None, @@ -160,7 +123,9 @@ impl Source { // We have some media data, return so we can start sending it. return Ok(()); } - _ => anyhow::bail!("unknown top-level atom: {:?}", header.name), + _ => { + // Skip unknown atoms + } } } } @@ -171,15 +136,9 @@ impl Source { let timestamp = next.timestamp?; // Find the timescale for the track. - let track = self - .moov - .as_ref()? - .traks - .iter() - .find(|t| t.tkhd.track_id == next.track)?; - let timescale = track.mdia.mdhd.timescale as u64; + let timescale = self.timescale.get(&next.track_id).unwrap(); - let delay = time::Duration::from_millis(1000 * timestamp / timescale); + let delay = time::Duration::from_millis(1000 * timestamp / *timescale as u64); let elapsed = self.start.elapsed(); delay.checked_sub(elapsed) @@ -187,14 +146,16 @@ impl Source { } // Read a full MP4 atom into a vector. -fn read_box(reader: &mut R) -> anyhow::Result> { +pub fn read_atom(reader: &mut R) -> anyhow::Result> { // Read the 8 bytes for the size + type let mut buf = [0u8; 8]; reader.read_exact(&mut buf)?; // Convert the first 4 bytes into the size. let size = u32::from_be_bytes(buf[0..4].try_into()?) as u64; - let mut out = buf.to_vec(); + //let typ = &buf[4..8].try_into().ok().unwrap(); + + let mut raw = buf.to_vec(); let mut limit = match size { // Runs until the end of the file. @@ -222,9 +183,9 @@ fn read_box(reader: &mut R) -> anyhow::Result> { }; // Append to the vector and return it. - limit.read_to_end(&mut out)?; + limit.read_to_end(&mut raw)?; - Ok(out) + Ok(raw) } fn has_keyframe(moof: &mp4::MoofBox) -> bool { @@ -261,4 +222,4 @@ fn has_keyframe(moof: &mp4::MoofBox) -> bool { fn first_timestamp(moof: &mp4::MoofBox) -> Option { Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time) -} +} \ No newline at end of file diff --git a/server/src/session/message.rs b/server/src/session/message.rs index 74243e8..0849e2e 100644 --- a/server/src/session/message.rs +++ b/server/src/session/message.rs @@ -7,13 +7,11 @@ pub struct Message { } #[derive(Serialize, Deserialize)] -pub struct Init { - pub id: String, -} +pub struct Init {} #[derive(Serialize, Deserialize)] pub struct Segment { - pub init: String, + pub track_id: u32, } impl Message { diff --git a/server/src/session/mod.rs b/server/src/session/mod.rs index 47e0074..0d59e8e 100644 --- a/server/src/session/mod.rs +++ b/server/src/session/mod.rs @@ -1,6 +1,7 @@ mod message; use std::time; +use std::collections::hash_map as hmap; use quiche; use quiche::h3::webtransport; @@ -10,9 +11,8 @@ use crate::{media, transport}; #[derive(Default)] pub struct Session { media: Option, - stream_id: Option, // stream ID of the current segment - streams: transport::Streams, // An easy way of buffering stream data. + tracks: hmap::HashMap, // map from track_id to current stream_id } impl transport::App for Session { @@ -38,12 +38,23 @@ impl transport::App for Session { // req.authority() // req.path() // and you can validate this request with req.origin() + session.accept_connect_request(conn, None).unwrap(); // TODO let media = media::Source::new("../media/fragmented.mp4")?; - self.media = Some(media); + let init = &media.init; - session.accept_connect_request(conn, None).unwrap(); + // Create a JSON header. + let mut message = message::Message::new(); + message.init = Some(message::Init{}); + let data = message.serialize()?; + + // Create a new stream and write the header. + let stream_id = session.open_stream(conn, false)?; + self.streams.send(conn, stream_id, data.as_slice(), false)?; + self.streams.send(conn, stream_id, init.as_slice(), true)?; + + self.media = Some(media); } webtransport::ServerEvent::StreamData(stream_id) => { let mut buf = vec![0; 10000]; @@ -84,62 +95,54 @@ impl Session { }; // Get the next media fragment. - let fragment = match media.get()? { + let fragment = match media.fragment()? { Some(f) => f, None => return Ok(()), }; - // Check if we have already created a stream for this fragment. - let stream_id = match self.stream_id { - Some(old_stream_id) if fragment.keyframe => { - // This is the start of a new segment. + let stream_id = match self.tracks.get(&fragment.track_id) { + // Close the old stream. + Some(stream_id) if fragment.keyframe => { + self.streams.send(conn, *stream_id, &[], true)?; + None + }, - // Close the prior stream. - self.streams.send(conn, old_stream_id, &[], true)?; + // Use the existing stream + Some(stream_id) => Some(*stream_id), - // Encode a JSON header indicating this is the video track. - let mut message = message::Message::new(); - message.segment = Some(message::Segment { - init: "video".to_string(), - }); + // No existing stream. + _ => None, + }; - // Open a new stream. + let stream_id = match stream_id { + // Use the existing stream, + Some(stream_id) => stream_id, + + // Open a new stream. + None => { let stream_id = session.open_stream(conn, false)?; // TODO: conn.stream_priority(stream_id, urgency, incremental) + // Encode a JSON header indicating this is a new track. + let mut message = message::Message::new(); + message.segment = Some(message::Segment { + track_id: fragment.track_id, + }); + // Write the header. let data = message.serialize()?; self.streams.send(conn, stream_id, &data, false)?; - stream_id - } - None => { - // This is the start of an init segment. - - // Create a JSON header. - let mut message = message::Message::new(); - message.init = Some(message::Init { - id: "video".to_string(), - }); - - let data = message.serialize()?; - - // Create a new stream and write the header. - let stream_id = session.open_stream(conn, false)?; - self.streams.send(conn, stream_id, data.as_slice(), false)?; + self.tracks.insert(fragment.track_id, stream_id); stream_id - } - Some(stream_id) => stream_id, // Continuation of init or segment + }, }; // Write the current fragment. let data = fragment.data.as_slice(); self.streams.send(conn, stream_id, data, false)?; - // Save the stream ID for the next fragment. - self.stream_id = Some(stream_id); - Ok(()) } } diff --git a/server/src/session/session.rs b/server/src/session/session.rs deleted file mode 100644 index 8b13789..0000000 --- a/server/src/session/session.rs +++ /dev/null @@ -1 +0,0 @@ -