Make a docker-compose ez mode.

This commit is contained in:
Luke Curley 2023-05-16 10:23:50 -07:00
parent 0f4d823d39
commit 4675c27179
25 changed files with 425 additions and 96 deletions

View File

@ -1,64 +1,39 @@
# Warp
Segmented live media delivery protocol utilizing QUIC streams. See the [Warp draft](https://datatracker.ietf.org/doc/draft-lcurley-warp/).
Live media delivery protocol utilizing QUIC streams. See the [Warp draft](https://datatracker.ietf.org/doc/draft-lcurley-warp/).
Warp works by delivering each audio and video segment as a separate QUIC stream. These streams are assigned a priority such that old video will arrive last and can be dropped. This avoids buffering in many cases, offering the viewer a potentially better experience.
Warp works by delivering media over independent QUIC stream. These streams are assigned a priority such that old video will arrive last and can be dropped. This avoids buffering in many cases, offering the viewer a potentially better experience.
# Limitations
## Browser Support
This demo currently only works on Chrome for two reasons:
This demo requires WebTransport and WebCodecs, which currently (May 2023) only works on Chrome.
1. WebTransport support.
2. [Media underflow behavior](https://github.com/whatwg/html/issues/6359).
# Development
## Easy Mode
Requires Docker *only*.
The ability to skip video abuses the fact that Chrome can play audio without video for up to 3 seconds (hardcoded!) when using MSE. It is possible to use something like WebCodecs instead... but that's still Chrome only at the moment.
```
docker-compose up --build
```
## Streaming
This demo works by reading pre-encoded media and sleeping based on media timestamps. Obviously this is not a live stream; you should plug in your own encoder or source.
Then open [https://localhost:4444/](https://localhost:4444) in a browser. You'll have to click past the TLS error, but that's the price you pay for being lazy. Follow the more in-depth instructions if you want a better development experience.
The media is encoded on disk as a LL-DASH playlist. There's a crude parser and I haven't used DASH before so don't expect it to work with arbitrary inputs.
## QUIC Implementation
This demo uses a fork of [quic-go](https://github.com/lucas-clemente/quic-go). There are two critical features missing upstream:
1. ~~[WebTransport](https://github.com/lucas-clemente/quic-go/issues/3191)~~
2. [Prioritization](https://github.com/lucas-clemente/quic-go/pull/3442)
## Congestion Control
This demo uses a single rendition. A production implementation will want to:
1. Change the rendition bitrate to match the estimated bitrate.
2. Switch renditions at segment boundaries based on the estimated bitrate.
3. or both!
Also, quic-go ships with the default New Reno congestion control. Something like [BBRv2](https://github.com/lucas-clemente/quic-go/issues/341) will work much better for live video as it limits RTT growth.
# Setup
## Requirements
* Go
* Rust
* ffmpeg
* openssl
* Chrome Canary
* Chrome
## Media
This demo simulates a live stream by reading a file from disk and sleeping based on media timestamps. Obviously you should hook this up to a real live stream to do anything useful.
Download your favorite media file:
Download your favorite media file and convert it to fragmented MP4:
```
wget http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4 -O media/source.mp4
./media/fragment
```
Use ffmpeg to create a LL-DASH playlist. This creates a segment every 2s and MP4 fragment every 10ms.
```
./media/generate
```
You can increase the `frag_duration` (microseconds) to slightly reduce the file size in exchange for higher latency.
## TLS
## Certificates
Unfortunately, QUIC mandates TLS and makes local development difficult.
If you have a valid certificate you can use it instead of self-signing. The go binaries take a `-tls-cert` and `-tls-key` argument. Skip the remaining steps in this section and use your hostname instead.
If you have a valid certificate you can use it instead of self-signing.
Otherwise, we use [mkcert](https://github.com/FiloSottile/mkcert) to install a self-signed CA:
```
@ -72,13 +47,13 @@ The Warp server supports WebTransport, pushing media over streams once a connect
```
cd server
go run main.go
cargo run
```
This can be accessed via WebTransport on `https://localhost:4443` by default.
This listens for WebTransport connections (not HTTP) on `https://localhost:4443` by default.
## Web Player
The web assets need to be hosted with a HTTPS server. If you're using a self-signed certificate, you may need to ignore the security warning in Chrome (Advanced -> proceed to localhost).
## Web
The web assets need to be hosted with a HTTPS server.
```
cd web
@ -87,5 +62,3 @@ yarn serve
```
These can be accessed on `https://localhost:4444` by default.
If you use a custom domain for the Warp server, make sure to override the server URL with the `url` query string parameter, e.g. `https://localhost:4444/?url=https://warp.demo`.

3
cert/.dockerignore Normal file
View File

@ -0,0 +1,3 @@
*.crt
*.key
*.hex

1
cert/.gitignore vendored
View File

@ -1,2 +1,3 @@
*.crt
*.key
*.hex

20
cert/Dockerfile Normal file
View File

@ -0,0 +1,20 @@
# Use ubuntu because it's ez
FROM ubuntu:latest
# Use openssl and golang to generate certificates
RUN apt-get update && \
apt-get install -y ca-certificates openssl golang
# Save the certificates to a volume
VOLUME /cert
WORKDIR /cert
# Download the go modules
COPY go.mod go.sum ./
RUN go mod download
# Copy over the remaining files.
COPY . .
# TODO support an output directory
CMD ./generate

View File

@ -17,4 +17,4 @@ go run filippo.io/mkcert -ecdsa -install
go run filippo.io/mkcert -ecdsa -days 10 -cert-file "$CRT" -key-file "$KEY" localhost 127.0.0.1 ::1
# Compute the sha256 fingerprint of the certificate for WebTransport
openssl x509 -in "$CRT" -outform der | openssl dgst -sha256 > ../web/fingerprint.hex
openssl x509 -in "$CRT" -outform der | openssl dgst -sha256 > localhost.hex

45
docker-compose.yml Normal file
View File

@ -0,0 +1,45 @@
version: '3'
services:
# Generate certificates only valid for 14 days.
cert:
build: ./cert
volumes:
- cert:/cert
# Generate a fragmented MP4 file for testing.
media:
build: ./media
volumes:
- media:/media
# Serve the web code once we have certificates.
web:
build: ./web
ports:
- "4444:4444"
volumes:
- cert:/cert
depends_on:
cert:
condition: service_completed_successfully
# Run the server once we have certificates and media.
server:
build: ./server
environment:
- RUST_LOG=debug
ports:
- "4443:4443/udp"
volumes:
- cert:/cert
- media:/media
depends_on:
cert:
condition: service_completed_successfully
media:
condition: service_completed_successfully
volumes:
cert:
media:

1
media/.dockerignore Normal file
View File

@ -0,0 +1 @@
fragmented.mp4

20
media/Dockerfile Normal file
View File

@ -0,0 +1,20 @@
# Create a build image
FROM ubuntu:latest
# Install necessary packages
RUN apt-get update && \
apt-get install -y \
ca-certificates \
wget \
ffmpeg
# Create a media volume
VOLUME /media
WORKDIR /media
# Download a file from the internet, in this case my boy big buck bunny
RUN wget http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4 -O source.mp4
# Copy an run a script to create a fragmented mp4 (more overhead, easier to split)
COPY fragment .
CMD ./fragment

1
server/.dockerignore Normal file
View File

@ -0,0 +1 @@
target

6
server/Cargo.lock generated
View File

@ -329,6 +329,8 @@ dependencies = [
[[package]]
name = "mp4"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "509348cba250e7b852a875100a2ddce7a36ee3abf881a681c756670c1774264d"
dependencies = [
"byteorder",
"bytes",
@ -384,7 +386,7 @@ dependencies = [
[[package]]
name = "octets"
version = "0.2.0"
source = "git+https://github.com/n8o/quiche.git?branch=master#0137dc3ca6f4f31e3175d0a0868acb9c64b46cc7"
source = "git+https://github.com/kixelated/quiche.git?branch=master#007a25b35b9509d673466fed8ddc73fd8d9b4184"
[[package]]
name = "once_cell"
@ -404,7 +406,7 @@ dependencies = [
[[package]]
name = "quiche"
version = "0.17.1"
source = "git+https://github.com/n8o/quiche.git?branch=master#0137dc3ca6f4f31e3175d0a0868acb9c64b46cc7"
source = "git+https://github.com/kixelated/quiche.git?branch=master#007a25b35b9509d673466fed8ddc73fd8d9b4184"
dependencies = [
"cmake",
"lazy_static",

View File

@ -6,13 +6,13 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
quiche = { git = "https://github.com/n8o/quiche.git", branch = "master" } # WebTransport fork
quiche = { git = "https://github.com/kixelated/quiche.git", branch = "master" } # WebTransport fork
clap = { version = "4.0", features = [ "derive" ] }
log = { version = "0.4", features = ["std"] }
mio = { version = "0.8", features = ["net", "os-poll"] }
env_logger = "0.9.3"
ring = "0.16"
anyhow = "1.0.70"
mp4 = { path = "../../mp4-rust" } # { git = "https://github.com/kixelated/mp4-rust.git", branch = "trexs" }
mp4 = "0.13.0"
serde = "1.0.160"
serde_json = "1.0"

42
server/Dockerfile Normal file
View File

@ -0,0 +1,42 @@
# Use the official Rust image as the base image
FROM rust:latest as build
# Quiche requires docker
RUN apt-get update && \
apt-get install -y cmake
# Set the build directory
WORKDIR /warp
# Create an empty project
RUN cargo init --bin
# Copy the Cargo.toml and Cargo.lock files to the container
COPY Cargo.toml Cargo.lock ./
# Build the empty project so we download/cache dependencies
RUN cargo build --release
# Copy the entire project to the container
COPY . .
# Build the project
RUN cargo build --release
# Make a new image to run the binary
FROM ubuntu:latest
# Use a volume to access certificates
VOLUME /cert
# Use another volume to access the media
VOLUME /media
# Expose port 4443 for the server
EXPOSE 4443/udp
# Copy the built binary
COPY --from=build /warp/target/release/warp /bin
# Set the startup command to run the binary
CMD warp --cert /cert/localhost.crt --key /cert/localhost.key --media /media/fragmented.mp4

View File

@ -6,7 +6,7 @@ use clap::Parser;
#[derive(Parser)]
struct Cli {
/// Listen on this address
#[arg(short, long, default_value = "127.0.0.1:4443")]
#[arg(short, long, default_value = "0.0.0.0:4443")]
addr: String,
/// Use the certificate file at this path

View File

@ -29,13 +29,15 @@ impl transport::App for Session {
Ok(e) => e,
};
log::debug!("webtransport event {:?}", event);
match event {
webtransport::ServerEvent::ConnectRequest(_req) => {
// you can handle request with
// req.authority()
// req.path()
// and you can validate this request with req.origin()
session.accept_connect_request(conn, None).unwrap();
session.accept_connect_request(conn, None)?;
// TODO
let media = media::Source::new("../media/fragmented.mp4")?;
@ -65,10 +67,11 @@ impl transport::App for Session {
}
// Send any pending stream data.
self.streams.poll(conn)?;
// NOTE: This doesn't return an error because it's async, and would be confusing.
self.streams.poll(conn);
// Fetch the next media fragment, possibly queuing up stream data.
self.poll_source(conn, session)?;
self.poll_source(conn, session).expect("poll_source");
Ok(())
}

View File

@ -78,6 +78,8 @@ impl<T: app::App> Server<T> {
}
pub fn run(&mut self) -> anyhow::Result<()> {
log::info!("listening on {}", self.socket.local_addr()?);
loop {
self.wait()?;
self.receive()?;
@ -253,6 +255,8 @@ impl<T: app::App> Server<T> {
for conn in self.conns.values_mut() {
if let Some(session) = &mut conn.session {
if let Err(e) = conn.app.poll(&mut conn.quiche, session) {
log::debug!("app error: {:?}", e);
// Close the connection on any application error
let reason = format!("app error: {:?}", e);
conn.quiche.close(true, 0xff, reason.as_bytes()).ok();

View File

@ -55,7 +55,11 @@ impl Streams {
// If there's no data buffered, try to write it immediately.
let size = if stream.buffer.is_empty() {
conn.stream_send(id, buf, fin)?
match conn.stream_send(id, buf, fin) {
Ok(size) => size,
Err(quiche::Error::Done) => 0,
Err(e) => anyhow::bail!(e),
}
} else {
0
};
@ -71,36 +75,8 @@ impl Streams {
}
// Flush any pending stream data.
pub fn poll(&mut self, conn: &mut quiche::Connection) -> anyhow::Result<()> {
// Loop over stream in order order.
'outer: for stream in self.ordered.iter_mut() {
// Keep reading from the buffer until it's empty.
while !stream.buffer.is_empty() {
// VecDeque is a ring buffer, so we can't write the whole thing at once.
let parts = stream.buffer.as_slices();
let size = conn.stream_send(stream.id, parts.0, false)?;
if size == 0 {
// No more space available for this stream.
continue 'outer;
}
// Remove the bytes that were written.
stream.buffer.drain(..size);
}
if stream.fin {
// Write the stream done signal.
conn.stream_send(stream.id, &[], true)?;
}
}
// Remove streams that are done.
// No need to reprioritize, since the streams are still in order order.
self.ordered
.retain(|stream| !stream.buffer.is_empty() || !stream.fin);
Ok(())
pub fn poll(&mut self, conn: &mut quiche::Connection) {
self.ordered.retain_mut(|s| s.poll(conn).is_ok());
}
// Set the send order of the stream.
@ -143,3 +119,31 @@ impl Streams {
pos
}
}
impl Stream {
fn poll(&mut self, conn: &mut quiche::Connection) -> quiche::Result<()> {
// Keep reading from the buffer until it's empty.
while !self.buffer.is_empty() {
// VecDeque is a ring buffer, so we can't write the whole thing at once.
let parts = self.buffer.as_slices();
let size = conn.stream_send(self.id, parts.0, false)?;
if size == 0 {
// No more space available for this stream.
return Ok(());
}
// Remove the bytes that were written.
self.buffer.drain(..size);
}
if self.fin {
// Write the stream done signal.
conn.stream_send(self.id, &[], true)?;
Err(quiche::Error::Done)
} else {
Ok(())
}
}
}

4
web/.dockerignore Normal file
View File

@ -0,0 +1,4 @@
dist
.parcel-cache
node_modules
fingerprint.hex

13
web/.eslintrc.cjs Normal file
View File

@ -0,0 +1,13 @@
/* eslint-env node */
module.exports = {
extends: ['eslint:recommended', 'plugin:@typescript-eslint/recommended'],
parser: '@typescript-eslint/parser',
plugins: ['@typescript-eslint'],
root: true,
ignorePatterns: [ 'dist', 'node_modules' ],
rules: {
"@typescript-eslint/ban-ts-comment": "off",
"@typescript-eslint/no-non-null-assertion": "off",
"@typescript-eslint/no-explicit-any": "off",
}
};

26
web/Dockerfile Normal file
View File

@ -0,0 +1,26 @@
# Use the official Node.js image as the build image
FROM node:latest
# Set the build directory
WORKDIR /build
# Copy the package.json and yarn.lock files to the container
COPY package*.json yarn.lock ./
# Install dependencies
RUN yarn install
# Copy the entire project to the container
COPY . .
# Expose port 4444 for serving the project
EXPOSE 4444
# Copy the certificate hash before running
VOLUME /cert
# Make a symlink to the certificate fingerprint
RUN ln -s /cert/localhost.hex fingerprint.hex
# Copy the certificate fingerprint and start the web server
CMD yarn parcel serve --https --cert /cert/localhost.crt --key /cert/localhost.key --port 4444

View File

@ -1,7 +1,8 @@
{
"license": "Apache-2.0",
"source": "src/index.html",
"scripts": {
"serve": "parcel serve --https --cert ../cert/localhost.crt --key ../cert/localhost.key --host localhost --port 4444 --open",
"serve": "parcel serve --https --cert ../cert/localhost.crt --key ../cert/localhost.key --port 4444 --open",
"build": "parcel build",
"check": "tsc --noEmit"
},

View File

@ -12,7 +12,7 @@ for (let c = 0; c < fingerprintHex.length-1; c += 2) {
const params = new URLSearchParams(window.location.search)
const url = params.get("url") || "https://127.0.0.1:4443/watch"
const url = params.get("url") || "https://localhost:4443/watch"
const canvas = document.querySelector<HTMLCanvasElement>("canvas#video")!
const transport = new Transport({
@ -30,7 +30,7 @@ const player = new Player({
const play = document.querySelector<HTMLElement>("#screen #play")!
let playFunc = (e: Event) => {
const playFunc = (e: Event) => {
player.play({})
e.preventDefault()

75
web/src/player/audio.ts Normal file
View File

@ -0,0 +1,75 @@
import * as Message from "./message";
import { Ring } from "./ring"
export default class Audio {
ring: Ring;
queue: Array<AudioData>;
sync?: DOMHighResTimeStamp; // the wall clock value for timestamp 0, in microseconds
last?: number; // the timestamp of the last rendered frame, in microseconds
constructor(config: Message.AudioConfig) {
this.ring = new Ring(config.ring);
this.queue = [];
}
push(frame: AudioData) {
if (!this.sync) {
// Save the frame as the sync point
// TODO sync with video
this.sync = 1000 * performance.now() - frame.timestamp
}
// Drop any old frames
if (this.last && frame.timestamp <= this.last) {
frame.close()
return
}
// Insert the frame into the queue sorted by timestamp.
if (this.queue.length > 0 && this.queue[this.queue.length-1].timestamp <= frame.timestamp) {
// Fast path because we normally append to the end.
this.queue.push(frame)
} else {
// Do a full binary search
let low = 0
let high = this.queue.length;
while (low < high) {
const mid = (low + high) >>> 1;
if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1;
else high = mid;
}
this.queue.splice(low, 0, frame)
}
}
draw() {
// Convert to microseconds
const now = 1000 * performance.now();
// Determine the target timestamp.
const target = now - this.sync!
// Check if we should skip some frames
while (this.queue.length) {
const next = this.queue[0]
if (next.timestamp > target) {
const ok = this.ring.write(next)
if (!ok) {
console.warn("ring buffer is full")
// No more space in the ring
break
}
} else {
console.warn("dropping audio")
}
next.close()
this.queue.shift()
}
}
}

91
web/src/player/video.ts Normal file
View File

@ -0,0 +1,91 @@
import * as Message from "./message";
export default class Video {
canvas: OffscreenCanvas;
queue: Array<VideoFrame>;
render: number; // non-zero if requestAnimationFrame has been called
sync?: DOMHighResTimeStamp; // the wall clock value for timestamp 0, in microseconds
last?: number; // the timestamp of the last rendered frame, in microseconds
constructor(config: Message.VideoConfig) {
this.canvas = config.canvas;
this.queue = [];
this.render = 0;
}
push(frame: VideoFrame) {
if (!this.sync) {
// Save the frame as the sync point
this.sync = 1000 * performance.now() - frame.timestamp
}
// Drop any old frames
if (this.last && frame.timestamp <= this.last) {
frame.close()
return
}
// Insert the frame into the queue sorted by timestamp.
if (this.queue.length > 0 && this.queue[this.queue.length-1].timestamp <= frame.timestamp) {
// Fast path because we normally append to the end.
this.queue.push(frame)
} else {
// Do a full binary search
let low = 0
let high = this.queue.length;
while (low < high) {
const mid = (low + high) >>> 1;
if (this.queue[mid].timestamp < frame.timestamp) low = mid + 1;
else high = mid;
}
this.queue.splice(low, 0, frame)
}
// Queue up to render the next frame.
if (!this.render) {
this.render = self.requestAnimationFrame(this.draw.bind(this))
}
}
draw(now: DOMHighResTimeStamp) {
// Convert to microseconds
now *= 1000;
// Determine the target timestamp.
const target = now - this.sync!
let frame = this.queue[0];
if (frame.timestamp >= target) {
// nothing to render yet, wait for the next animation frame
this.render = self.requestAnimationFrame(this.draw.bind(this))
return
}
this.queue.shift();
// Check if we should skip some frames
while (this.queue.length) {
const next = this.queue[0]
if (next.timestamp > target) break
frame.close()
frame = this.queue.shift()!;
}
const ctx = this.canvas.getContext("2d");
ctx!.drawImage(frame, 0, 0, this.canvas.width, this.canvas.height) // TODO aspect ratio
this.last = frame.timestamp;
frame.close()
if (this.queue.length) {
this.render = self.requestAnimationFrame(this.draw.bind(this))
} else {
this.render = 0
}
}
}

View File

@ -56,7 +56,7 @@ export default class Transport {
const q = await this.quic
const streams = q.incomingUnidirectionalStreams.getReader()
while (true) {
for (;;) {
const result = await streams.read()
if (result.done) break
@ -66,7 +66,7 @@ export default class Transport {
}
async handleStream(stream: ReadableStream) {
let r = new Stream.Reader(stream)
const r = new Stream.Reader(stream)
while (!await r.done()) {
const size = await r.uint32();