This commit is contained in:
Luke Curley 2022-11-18 15:13:35 -08:00
parent de12598549
commit 2608baac33
12 changed files with 201 additions and 170 deletions

View File

@ -68,13 +68,15 @@ mkcert -install
With no arguments, the server will generate self-signed cert using this root CA. With no arguments, the server will generate self-signed cert using this root CA.
## Server ## Server
The Warp server defaults to listening on UDP 4443. It supports HTTP/3 and WebTransport, pushing media over WebTransport streams once a connection has been established. A more refined implementation would load content based on the WebTransport URL or some other messaging scheme. The Warp server supports WebTransport, pushing media over streams once a connection has been established. A more refined implementation would load content based on the WebTransport URL or some other messaging scheme.
``` ```
cd server cd server
go run main.go go run main.go
``` ```
This can be accessed via WebTransport on `https://localhost:4443` by default.
## Web Player ## Web Player
The web assets need to be hosted with a HTTPS server. If you're using a self-signed certificate, you will need to ignore the security warning in Chrome (Advanced -> proceed to localhost). This can be avoided by adding your certificate to the root CA but I'm too lazy to do that. The web assets need to be hosted with a HTTPS server. If you're using a self-signed certificate, you will need to ignore the security warning in Chrome (Advanced -> proceed to localhost). This can be avoided by adding your certificate to the root CA but I'm too lazy to do that.
@ -84,7 +86,7 @@ yarn install
yarn serve yarn serve
``` ```
These can be accessed on `https://127.0.0.1:4444` by default. These can be accessed on `https://localhost:4444` by default.
## Chrome ## Chrome
Now we need to make Chrome accept these certificates, which normally would involve trusting a root CA but this was not working with WebTransport when I last tried. Now we need to make Chrome accept these certificates, which normally would involve trusting a root CA but this was not working with WebTransport when I last tried.
@ -93,5 +95,5 @@ Instead, we need to run a *fresh instance* of Chrome, instructing it to allow ou
Launch a new instance of Chrome Canary: Launch a new instance of Chrome Canary:
``` ```
/Applications/Google\ Chrome\ Canary.app/Contents/MacOS/Google\ Chrome\ Canary --allow-insecure-localhost --origin-to-force-quic-on=127.0.0.1:4443 https://127.0.0.1:4444 /Applications/Google\ Chrome\ Canary.app/Contents/MacOS/Google\ Chrome\ Canary --allow-insecure-localhost --origin-to-force-quic-on=localhost:4443 https://localhost:4444
``` ```

View File

@ -42,7 +42,7 @@
const params = new URLSearchParams(window.location.search) const params = new URLSearchParams(window.location.search)
const player = new Player({ const player = new Player({
url: params.get("url") || "https://127.0.0.1:4443", url: params.get("url") || "https://localhost:4443",
vid: vidRef, vid: vidRef,
stats: statsRef, stats: statsRef,

View File

@ -4,11 +4,15 @@ export interface Message {
} }
export interface MessageInit { export interface MessageInit {
id: number // integer id id: string
} }
export interface MessageSegment { export interface MessageSegment {
init: number // integer id of the init segment init: string // id of the init segment
timestamp: number // presentation timestamp in milliseconds of the first sample timestamp: number // presentation timestamp in milliseconds of the first sample
// TODO track would be nice // TODO track would be nice
} }
export interface Debug {
max_bitrate: number
}

View File

@ -10,7 +10,7 @@ import { Message, MessageInit, MessageSegment } from "./message"
export class Player { export class Player {
mediaSource: MediaSource; mediaSource: MediaSource;
init: Map<number, InitParser>; init: Map<string, InitParser>;
audio: Track; audio: Track;
video: Track; video: Track;
@ -89,27 +89,25 @@ export class Player {
} }
sendThrottle() { sendThrottle() {
let rate = 0;
if (this.throttleCount > 0) {
// TODO detect the incoming bitrate instead of hard-coding // TODO detect the incoming bitrate instead of hard-coding
// Right shift by throttle to divide by 2,4,8,16,etc each time
const bitrate = 4 * 1024 * 1024 // 4Mb/s const bitrate = 4 * 1024 * 1024 // 4Mb/s
// Right shift by throttle to divide by 2,4,8,16,etc each time rate = bitrate >> (this.throttleCount-1)
// Right shift by 3 more to divide by 8 to convert bits to bytes
// Right shift by another 2 to divide by 4 to get the number of bytes in a quarter of a second
let rate = bitrate >> (this.throttleCount + 3)
let buffer = bitrate >> (this.throttleCount + 5) // 250ms before dropping
const str = formatBits(8*rate) + "/s" const str = formatBits(rate) + "/s"
this.throttleRef.textContent = `Throttle: ${ str }`; this.throttleRef.textContent = `Throttle: ${ str }`;
} else {
// NOTE: We don't use random packet loss because it's not a good simulator of how congestion works. this.throttleRef.textContent = "Throttle: none";
// Delay-based congestion control like BBR most ignores packet loss, rightfully so. }
// Send the server a message to fake network congestion. // Send the server a message to fake network congestion.
// This is done on the server side at the socket-level for maximum accuracy (impacts all packets).
this.sendMessage({ this.sendMessage({
"x-throttle": { "debug": {
rate: rate, max_bitrate: rate,
buffer: buffer,
}, },
}) })
} }

View File

@ -4,7 +4,7 @@ go 1.18
require ( require (
github.com/abema/go-mp4 v0.7.2 github.com/abema/go-mp4 v0.7.2
github.com/kixelated/invoker v0.9.2 github.com/kixelated/invoker v0.10.0
github.com/lucas-clemente/quic-go v0.30.0 github.com/lucas-clemente/quic-go v0.30.0
github.com/marten-seemann/webtransport-go v0.2.0 github.com/marten-seemann/webtransport-go v0.2.0
github.com/zencoder/go-dash/v3 v3.0.2 github.com/zencoder/go-dash/v3 v3.0.2
@ -28,3 +28,7 @@ require (
golang.org/x/text v0.3.7 // indirect golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.12 // indirect golang.org/x/tools v0.1.12 // indirect
) )
replace github.com/lucas-clemente/quic-go => ../../quic-go
replace github.com/marten-seemann/webtransport-go => ../../webtransport-go

View File

@ -68,13 +68,13 @@ github.com/kisielk/errcheck v1.4.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kixelated/invoker v0.9.2 h1:Pz8JDiRs8EzGc4EGVMZ4RYvFh+iQLXGZ4PG2KZyAh/0= github.com/kixelated/invoker v0.9.2 h1:Pz8JDiRs8EzGc4EGVMZ4RYvFh+iQLXGZ4PG2KZyAh/0=
github.com/kixelated/invoker v0.9.2/go.mod h1:RjG3iqm/sKwZjOpcW4SGq+l+4DJCDR/yUtc70VjCRB8= github.com/kixelated/invoker v0.9.2/go.mod h1:RjG3iqm/sKwZjOpcW4SGq+l+4DJCDR/yUtc70VjCRB8=
github.com/kixelated/invoker v0.10.0 h1:M93MfnlR5OjKoQNcM2mvEVwKeA3wolxEeqShCkLbcUE=
github.com/kixelated/invoker v0.10.0/go.mod h1:RjG3iqm/sKwZjOpcW4SGq+l+4DJCDR/yUtc70VjCRB8=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lucas-clemente/quic-go v0.30.0 h1:nwLW0h8ahVQ5EPTIM7uhl/stHqQDea15oRlYKZmw2O0=
github.com/lucas-clemente/quic-go v0.30.0/go.mod h1:ssOrRsOmdxa768Wr78vnh2B8JozgLsMzG/g+0qEC7uk=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/marten-seemann/qpack v0.3.0 h1:UiWstOgT8+znlkDPOg2+3rIuYXJ2CnGDkGUXN6ki6hE= github.com/marten-seemann/qpack v0.3.0 h1:UiWstOgT8+znlkDPOg2+3rIuYXJ2CnGDkGUXN6ki6hE=
@ -83,8 +83,6 @@ github.com/marten-seemann/qtls-go1-18 v0.1.3 h1:R4H2Ks8P6pAtUagjFty2p7BVHn3XiwDA
github.com/marten-seemann/qtls-go1-18 v0.1.3/go.mod h1:mJttiymBAByA49mhlNZZGrH5u1uXYZJ+RW28Py7f4m4= github.com/marten-seemann/qtls-go1-18 v0.1.3/go.mod h1:mJttiymBAByA49mhlNZZGrH5u1uXYZJ+RW28Py7f4m4=
github.com/marten-seemann/qtls-go1-19 v0.1.1 h1:mnbxeq3oEyQxQXwI4ReCgW9DPoPR94sNlqWoDZnjRIE= github.com/marten-seemann/qtls-go1-19 v0.1.1 h1:mnbxeq3oEyQxQXwI4ReCgW9DPoPR94sNlqWoDZnjRIE=
github.com/marten-seemann/qtls-go1-19 v0.1.1/go.mod h1:5HTDWtVudo/WFsHKRNuOhWlbdjrfs5JHrYb0wIJqGpI= github.com/marten-seemann/qtls-go1-19 v0.1.1/go.mod h1:5HTDWtVudo/WFsHKRNuOhWlbdjrfs5JHrYb0wIJqGpI=
github.com/marten-seemann/webtransport-go v0.2.0 h1:987jPVqcyE3vF+CHNIxDhT0P21O+bI4fVF+0NoRujSo=
github.com/marten-seemann/webtransport-go v0.2.0/go.mod h1:XmnWYsWXaxUF7kjeIIzLWPyS+q0OcBY5vA64NuyK0ps=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=

View File

@ -22,8 +22,9 @@ import (
// It's just much easier to read from disk and "fake" being live. // It's just much easier to read from disk and "fake" being live.
type Media struct { type Media struct {
base fs.FS base fs.FS
audio *mpd.Representation inits map[string]*MediaInit
video *mpd.Representation video []*mpd.Representation
audio []*mpd.Representation
} }
func NewMedia(playlistPath string) (m *Media, err error) { func NewMedia(playlistPath string) (m *Media, err error) {
@ -51,132 +52,148 @@ func NewMedia(playlistPath string) (m *Media, err error) {
return nil, fmt.Errorf("missing representation mime type") return nil, fmt.Errorf("missing representation mime type")
} }
if representation.Bandwidth == nil {
return nil, fmt.Errorf("missing representation bandwidth")
}
switch *representation.MimeType { switch *representation.MimeType {
case "video/mp4": case "video/mp4":
m.video = representation m.video = append(m.video, representation)
case "audio/mp4": case "audio/mp4":
m.audio = representation m.audio = append(m.audio, representation)
} }
} }
if m.video == nil { if len(m.video) == 0 {
return nil, fmt.Errorf("no video representation found") return nil, fmt.Errorf("no video representation found")
} }
if m.audio == nil { if len(m.audio) == 0 {
return nil, fmt.Errorf("no audio representation found") return nil, fmt.Errorf("no audio representation found")
} }
m.inits = make(map[string]*MediaInit)
var reps []*mpd.Representation
reps = append(reps, m.audio...)
reps = append(reps, m.video...)
for _, rep := range reps {
path := *rep.SegmentTemplate.Initialization
// TODO Support the full template engine
path = strings.ReplaceAll(path, "$RepresentationID$", *rep.ID)
f, err := fs.ReadFile(m.base, path)
if err != nil {
return nil, fmt.Errorf("failed to read init file: %w", err)
}
init, err := newMediaInit(*rep.ID, f)
if err != nil {
return nil, fmt.Errorf("failed to create init segment: %w", err)
}
m.inits[*rep.ID] = init
}
return m, nil return m, nil
} }
func (m *Media) Start() (audio *MediaStream, video *MediaStream, err error) { func (m *Media) Start(bitrate func() uint64) (inits map[string]*MediaInit, audio *MediaStream, video *MediaStream, err error) {
start := time.Now() start := time.Now()
audio, err = newMediaStream(m, m.audio, start) audio, err = newMediaStream(m, m.audio, start, bitrate)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, nil, err
} }
video, err = newMediaStream(m, m.video, start) video, err = newMediaStream(m, m.video, start, bitrate)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, nil, err
} }
return audio, video, nil return m.inits, audio, video, nil
} }
type MediaStream struct { type MediaStream struct {
media *Media Media *Media
init *MediaInit
start time.Time start time.Time
rep *mpd.Representation reps []*mpd.Representation
sequence int sequence int
bitrate func() uint64 // returns the current estimated bitrate
} }
func newMediaStream(m *Media, rep *mpd.Representation, start time.Time) (ms *MediaStream, err error) { func newMediaStream(m *Media, reps []*mpd.Representation, start time.Time, bitrate func() uint64) (ms *MediaStream, err error) {
ms = new(MediaStream) ms = new(MediaStream)
ms.media = m ms.Media = m
ms.rep = rep ms.reps = reps
ms.start = start ms.start = start
ms.bitrate = bitrate
return ms, nil
}
func (ms *MediaStream) chooseRepresentation() (choice *mpd.Representation) {
bitrate := ms.bitrate()
// Loop over the renditions and pick the highest bitrate we can support
for _, r := range ms.reps {
if uint64(*r.Bandwidth) <= bitrate && (choice == nil || *r.Bandwidth > *choice.Bandwidth) {
choice = r
}
}
if choice != nil {
return choice
}
// We can't support any of the bitrates, so find the lowest one.
for _, r := range ms.reps {
if choice == nil || *r.Bandwidth < *choice.Bandwidth {
choice = r
}
}
return choice
}
// Returns the next segment in the stream
func (ms *MediaStream) Next(ctx context.Context) (segment *MediaSegment, err error) {
rep := ms.chooseRepresentation()
if rep.SegmentTemplate == nil { if rep.SegmentTemplate == nil {
return nil, fmt.Errorf("missing segment template") return nil, fmt.Errorf("missing segment template")
} }
if rep.SegmentTemplate.Media == nil {
return nil, fmt.Errorf("no media template")
}
if rep.SegmentTemplate.StartNumber == nil { if rep.SegmentTemplate.StartNumber == nil {
return nil, fmt.Errorf("missing start number") return nil, fmt.Errorf("missing start number")
} }
ms.sequence = int(*rep.SegmentTemplate.StartNumber) path := *rep.SegmentTemplate.Media
sequence := ms.sequence + int(*rep.SegmentTemplate.StartNumber)
return ms, nil
}
// Returns the init segment for the stream
func (ms *MediaStream) Init(ctx context.Context) (init *MediaInit, err error) {
// Cache the init segment
if ms.init != nil {
return ms.init, nil
}
if ms.rep.SegmentTemplate.Initialization == nil {
return nil, fmt.Errorf("no init template")
}
path := *ms.rep.SegmentTemplate.Initialization
// TODO Support the full template engine // TODO Support the full template engine
path = strings.ReplaceAll(path, "$RepresentationID$", *ms.rep.ID) path = strings.ReplaceAll(path, "$RepresentationID$", *rep.ID)
path = strings.ReplaceAll(path, "$Number%05d$", fmt.Sprintf("%05d", sequence)) // TODO TODO
f, err := fs.ReadFile(ms.media.base, path)
if err != nil {
return nil, fmt.Errorf("failed to read init file: %w", err)
}
ms.init, err = newMediaInit(f)
if err != nil {
return nil, fmt.Errorf("failed to create init segment: %w", err)
}
return ms.init, nil
}
// Returns the next segment in the stream
func (ms *MediaStream) Segment(ctx context.Context) (segment *MediaSegment, err error) {
if ms.rep.SegmentTemplate.Media == nil {
return nil, fmt.Errorf("no media template")
}
path := *ms.rep.SegmentTemplate.Media
// TODO Support the full template engine
path = strings.ReplaceAll(path, "$RepresentationID$", *ms.rep.ID)
path = strings.ReplaceAll(path, "$Number%05d$", fmt.Sprintf("%05d", ms.sequence)) // TODO TODO
// Check if this is the first segment in the playlist
first := ms.sequence == int(*ms.rep.SegmentTemplate.StartNumber)
// Try openning the file // Try openning the file
f, err := ms.media.base.Open(path) f, err := ms.Media.base.Open(path)
if !first && errors.Is(err, os.ErrNotExist) { if errors.Is(err, os.ErrNotExist) && ms.sequence != 0 {
// Return EOF if the next file is missing // Return EOF if the next file is missing
return nil, nil return nil, nil
} else if err != nil { } else if err != nil {
return nil, fmt.Errorf("failed to open segment file: %w", err) return nil, fmt.Errorf("failed to open segment file: %w", err)
} }
offset := ms.sequence - int(*ms.rep.SegmentTemplate.StartNumber) duration := time.Duration(*rep.SegmentTemplate.Duration) / time.Nanosecond
duration := time.Duration(*ms.rep.SegmentTemplate.Duration) / time.Nanosecond timestamp := time.Duration(ms.sequence) * duration
timestamp := time.Duration(offset) * duration init := ms.Media.inits[*rep.ID]
// We need the init segment to properly parse the media segment
init, err := ms.Init(ctx)
if err != nil {
return nil, fmt.Errorf("failed to open init file: %w", err)
}
segment, err = newMediaSegment(ms, init, f, timestamp) segment, err = newMediaSegment(ms, init, f, timestamp)
if err != nil { if err != nil {
@ -189,12 +206,14 @@ func (ms *MediaStream) Segment(ctx context.Context) (segment *MediaSegment, err
} }
type MediaInit struct { type MediaInit struct {
ID string
Raw []byte Raw []byte
Timescale int Timescale int
} }
func newMediaInit(raw []byte) (mi *MediaInit, err error) { func newMediaInit(id string, raw []byte) (mi *MediaInit, err error) {
mi = new(MediaInit) mi = new(MediaInit)
mi.ID = id
mi.Raw = raw mi.Raw = raw
err = mi.parse() err = mi.parse()
@ -241,18 +260,21 @@ func (mi *MediaInit) parse() (err error) {
} }
type MediaSegment struct { type MediaSegment struct {
stream *MediaStream Stream *MediaStream
init *MediaInit Init *MediaInit
file fs.File file fs.File
timestamp time.Duration timestamp time.Duration
} }
func newMediaSegment(s *MediaStream, init *MediaInit, file fs.File, timestamp time.Duration) (ms *MediaSegment, err error) { func newMediaSegment(s *MediaStream, init *MediaInit, file fs.File, timestamp time.Duration) (ms *MediaSegment, err error) {
ms = new(MediaSegment) ms = new(MediaSegment)
ms.stream = s ms.Stream = s
ms.init = init ms.Init = init
ms.file = file ms.file = file
ms.timestamp = timestamp ms.timestamp = timestamp
return ms, nil return ms, nil
} }
@ -287,7 +309,7 @@ func (ms *MediaSegment) Read(ctx context.Context) (chunk []byte, err error) {
if sample != nil { if sample != nil {
// Simulate a live stream by sleeping before we write this sample. // Simulate a live stream by sleeping before we write this sample.
// Figure out how much time has elapsed since the start // Figure out how much time has elapsed since the start
elapsed := time.Since(ms.stream.start) elapsed := time.Since(ms.Stream.start)
delay := sample.Timestamp - elapsed delay := sample.Timestamp - elapsed
if delay > 0 { if delay > 0 {
@ -329,13 +351,13 @@ func (ms *MediaSegment) parseAtom(ctx context.Context, buf []byte) (sample *medi
dts = time.Duration(box.BaseMediaDecodeTimeV1) dts = time.Duration(box.BaseMediaDecodeTimeV1)
} }
if ms.init.Timescale == 0 { if ms.Init.Timescale == 0 {
return nil, fmt.Errorf("missing timescale") return nil, fmt.Errorf("missing timescale")
} }
// Convert to seconds // Convert to seconds
// TODO What about PTS? // TODO What about PTS?
sample.Timestamp = dts * time.Second / time.Duration(ms.init.Timescale) sample.Timestamp = dts * time.Second / time.Duration(ms.Init.Timescale)
} }
// Expands children // Expands children

View File

@ -3,20 +3,18 @@ package warp
type Message struct { type Message struct {
Init *MessageInit `json:"init,omitempty"` Init *MessageInit `json:"init,omitempty"`
Segment *MessageSegment `json:"segment,omitempty"` Segment *MessageSegment `json:"segment,omitempty"`
Throttle *MessageThrottle `json:"x-throttle,omitempty"` Debug *MessageDebug `json:"debug,omitempty"`
} }
type MessageInit struct { type MessageInit struct {
Id int `json:"id"` // ID of the init segment Id string `json:"id"` // ID of the init segment
} }
type MessageSegment struct { type MessageSegment struct {
Init int `json:"init"` // ID of the init segment to use for this segment Init string `json:"init"` // ID of the init segment to use for this segment
Timestamp int `json:"timestamp"` // PTS of the first frame in milliseconds Timestamp int `json:"timestamp"` // PTS of the first frame in milliseconds
} }
type MessageThrottle struct { type MessageDebug struct {
Rate int `json:"rate"` // Artificially limit the socket byte rate per second MaxBitrate int `json:"max_bitrate"` // Artificially limit the QUIC max bitrate
Buffer int `json:"buffer"` // Artificially limit the socket buffer to the number of bytes
Loss float64 `json:"loss"` // Artificially increase packet loss percentage from 0.0 - 1.0
} }

View File

@ -76,21 +76,28 @@ func NewServer(config ServerConfig, media *Media) (s *Server, err error) {
return return
} }
ss, err := NewSession(session, s.media) defer session.Close()
hijacker, ok := w.(http3.Hijacker)
if !ok {
log.Printf("unable to hijack connection")
return
}
conn := hijacker.Connection()
ss, err := NewSession(conn, session, s.media)
if err != nil { if err != nil {
http.Error(w, err.Error(), 500) log.Printf("failed to create session: %s", err)
return return
} }
// Run the session in parallel, logging errors instead of crashing // Run the session in parallel, logging errors instead of crashing
s.sessions.Add(func(ctx context.Context) (err error) { err = ss.Run(r.Context())
err = ss.Run(ctx)
if err != nil { if err != nil {
log.Printf("terminated session: %s", err) log.Printf("terminated session: %s", err)
return
} }
return nil
})
}) })
return s, nil return s, nil

View File

@ -7,25 +7,31 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"log"
"math" "math"
"time" "time"
"github.com/kixelated/invoker" "github.com/kixelated/invoker"
"github.com/lucas-clemente/quic-go"
"github.com/marten-seemann/webtransport-go" "github.com/marten-seemann/webtransport-go"
) )
// A single WebTransport session // A single WebTransport session
type Session struct { type Session struct {
conn quic.Connection
inner *webtransport.Session inner *webtransport.Session
media *Media media *Media
inits map[string]*MediaInit
audio *MediaStream audio *MediaStream
video *MediaStream video *MediaStream
streams invoker.Tasks streams invoker.Tasks
} }
func NewSession(session *webtransport.Session, media *Media) (s *Session, err error) { func NewSession(connection quic.Connection, session *webtransport.Session, media *Media) (s *Session, err error) {
s = new(Session) s = new(Session)
s.conn = connection
s.inner = session s.inner = session
s.media = media s.media = media
return s, nil return s, nil
@ -34,13 +40,13 @@ func NewSession(session *webtransport.Session, media *Media) (s *Session, err er
func (s *Session) Run(ctx context.Context) (err error) { func (s *Session) Run(ctx context.Context) (err error) {
defer s.inner.Close() defer s.inner.Close()
s.audio, s.video, err = s.media.Start() s.inits, s.audio, s.video, err = s.media.Start(s.conn.GetMaxBandwidth)
if err != nil { if err != nil {
return fmt.Errorf("failed to start media: %w", err) return fmt.Errorf("failed to start media: %w", err)
} }
// Once we've validated the session, now we can start accessing the streams // Once we've validated the session, now we can start accessing the streams
return invoker.Run(ctx, s.runAccept, s.runAcceptUni, s.runAudio, s.runVideo, s.streams.Repeat) return invoker.Run(ctx, s.runAccept, s.runAcceptUni, s.runInit, s.runAudio, s.runVideo, s.streams.Repeat)
} }
func (s *Session) runAccept(ctx context.Context) (err error) { func (s *Session) runAccept(ctx context.Context) (err error) {
@ -103,6 +109,8 @@ func (s *Session) handleStream(ctx context.Context, stream webtransport.ReceiveS
return fmt.Errorf("failed to read atom payload: %w", err) return fmt.Errorf("failed to read atom payload: %w", err)
} }
log.Println("received message:", string(payload))
msg := Message{} msg := Message{}
err = json.Unmarshal(payload, &msg) err = json.Unmarshal(payload, &msg)
@ -110,26 +118,26 @@ func (s *Session) handleStream(ctx context.Context, stream webtransport.ReceiveS
return fmt.Errorf("failed to decode json payload: %w", err) return fmt.Errorf("failed to decode json payload: %w", err)
} }
if msg.Throttle != nil { if msg.Debug != nil {
s.setThrottle(msg.Throttle) s.setDebug(msg.Debug)
} }
} }
} }
func (s *Session) runAudio(ctx context.Context) (err error) { func (s *Session) runInit(ctx context.Context) (err error) {
init, err := s.audio.Init(ctx) for _, init := range s.inits {
if err != nil { err = s.writeInit(ctx, init)
return fmt.Errorf("failed to fetch init segment: %w", err)
}
// NOTE: Assumes a single init segment
err = s.writeInit(ctx, init, 1)
if err != nil { if err != nil {
return fmt.Errorf("failed to write init stream: %w", err) return fmt.Errorf("failed to write init stream: %w", err)
} }
}
return nil
}
func (s *Session) runAudio(ctx context.Context) (err error) {
for { for {
segment, err := s.audio.Segment(ctx) segment, err := s.audio.Next(ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to get next segment: %w", err) return fmt.Errorf("failed to get next segment: %w", err)
} }
@ -138,7 +146,7 @@ func (s *Session) runAudio(ctx context.Context) (err error) {
return nil return nil
} }
err = s.writeSegment(ctx, segment, 1) err = s.writeSegment(ctx, segment)
if err != nil { if err != nil {
return fmt.Errorf("failed to write segment stream: %w", err) return fmt.Errorf("failed to write segment stream: %w", err)
} }
@ -146,19 +154,8 @@ func (s *Session) runAudio(ctx context.Context) (err error) {
} }
func (s *Session) runVideo(ctx context.Context) (err error) { func (s *Session) runVideo(ctx context.Context) (err error) {
init, err := s.video.Init(ctx)
if err != nil {
return fmt.Errorf("failed to fetch init segment: %w", err)
}
// NOTE: Assumes a single init segment
err = s.writeInit(ctx, init, 2)
if err != nil {
return fmt.Errorf("failed to write init stream: %w", err)
}
for { for {
segment, err := s.video.Segment(ctx) segment, err := s.video.Next(ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to get next segment: %w", err) return fmt.Errorf("failed to get next segment: %w", err)
} }
@ -167,7 +164,7 @@ func (s *Session) runVideo(ctx context.Context) (err error) {
return nil return nil
} }
err = s.writeSegment(ctx, segment, 2) err = s.writeSegment(ctx, segment)
if err != nil { if err != nil {
return fmt.Errorf("failed to write segment stream: %w", err) return fmt.Errorf("failed to write segment stream: %w", err)
} }
@ -175,7 +172,7 @@ func (s *Session) runVideo(ctx context.Context) (err error) {
} }
// Create a stream for an INIT segment and write the container. // Create a stream for an INIT segment and write the container.
func (s *Session) writeInit(ctx context.Context, init *MediaInit, id int) (err error) { func (s *Session) writeInit(ctx context.Context, init *MediaInit) (err error) {
temp, err := s.inner.OpenUniStreamSync(ctx) temp, err := s.inner.OpenUniStreamSync(ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to create stream: %w", err) return fmt.Errorf("failed to create stream: %w", err)
@ -194,7 +191,7 @@ func (s *Session) writeInit(ctx context.Context, init *MediaInit, id int) (err e
stream.SetPriority(math.MaxInt) stream.SetPriority(math.MaxInt)
err = stream.WriteMessage(Message{ err = stream.WriteMessage(Message{
Init: &MessageInit{Id: id}, Init: &MessageInit{Id: init.ID},
}) })
if err != nil { if err != nil {
return fmt.Errorf("failed to write init header: %w", err) return fmt.Errorf("failed to write init header: %w", err)
@ -209,7 +206,7 @@ func (s *Session) writeInit(ctx context.Context, init *MediaInit, id int) (err e
} }
// Create a stream for a segment and write the contents, chunk by chunk. // Create a stream for a segment and write the contents, chunk by chunk.
func (s *Session) writeSegment(ctx context.Context, segment *MediaSegment, init int) (err error) { func (s *Session) writeSegment(ctx context.Context, segment *MediaSegment) (err error) {
temp, err := s.inner.OpenUniStreamSync(ctx) temp, err := s.inner.OpenUniStreamSync(ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to create stream: %w", err) return fmt.Errorf("failed to create stream: %w", err)
@ -232,7 +229,7 @@ func (s *Session) writeSegment(ctx context.Context, segment *MediaSegment, init
err = stream.WriteMessage(Message{ err = stream.WriteMessage(Message{
Segment: &MessageSegment{ Segment: &MessageSegment{
Init: init, Init: segment.Init.ID,
Timestamp: ms, Timestamp: ms,
}, },
}) })
@ -264,6 +261,6 @@ func (s *Session) writeSegment(ctx context.Context, segment *MediaSegment, init
return nil return nil
} }
func (s *Session) setThrottle(msg *MessageThrottle) { func (s *Session) setDebug(msg *MessageDebug) {
// TODO s.conn.SetMaxBandwidth(uint64(msg.MaxBitrate))
} }

View File

@ -123,8 +123,7 @@ func (s *Stream) WriteCancel(code webtransport.ErrorCode) {
} }
func (s *Stream) SetPriority(prio int) { func (s *Stream) SetPriority(prio int) {
// TODO s.inner.SetPriority(prio)
// s.inner.SetPriority(prio)
} }
func (s *Stream) Close() (err error) { func (s *Stream) Close() (err error) {

View File

@ -16,6 +16,8 @@ import (
) )
func main() { func main() {
invoker.Panic = true
err := run(context.Background()) err := run(context.Background())
if err == nil { if err == nil {
return return
@ -39,7 +41,7 @@ func main() {
} }
func run(ctx context.Context) (err error) { func run(ctx context.Context) (err error) {
addr := flag.String("addr", "127.0.0.1:4443", "HTTPS server address") addr := flag.String("addr", ":4443", "HTTPS server address")
cert := flag.String("tls-cert", "../cert/localhost.crt", "TLS certificate file path") cert := flag.String("tls-cert", "../cert/localhost.crt", "TLS certificate file path")
key := flag.String("tls-key", "../cert/localhost.key", "TLS certificate file path") key := flag.String("tls-key", "../cert/localhost.key", "TLS certificate file path")
logDir := flag.String("log-dir", "", "logs will be written to the provided directory") logDir := flag.String("log-dir", "", "logs will be written to the provided directory")