From 2608baac3346f1c9e763039bfe8947be5d29a60c Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 18 Nov 2022 15:13:35 -0800 Subject: [PATCH 1/4] WIP --- README.md | 8 +- player/src/index.html | 2 +- player/src/message.ts | 8 +- player/src/player.ts | 30 +++-- server/go.mod | 6 +- server/go.sum | 6 +- server/internal/warp/media.go | 188 ++++++++++++++++++-------------- server/internal/warp/message.go | 18 ++- server/internal/warp/server.go | 27 +++-- server/internal/warp/session.go | 71 ++++++------ server/internal/warp/stream.go | 3 +- server/main.go | 4 +- 12 files changed, 201 insertions(+), 170 deletions(-) diff --git a/README.md b/README.md index 8942275..7ff852b 100644 --- a/README.md +++ b/README.md @@ -68,13 +68,15 @@ mkcert -install With no arguments, the server will generate self-signed cert using this root CA. ## Server -The Warp server defaults to listening on UDP 4443. It supports HTTP/3 and WebTransport, pushing media over WebTransport streams once a connection has been established. A more refined implementation would load content based on the WebTransport URL or some other messaging scheme. +The Warp server supports WebTransport, pushing media over streams once a connection has been established. A more refined implementation would load content based on the WebTransport URL or some other messaging scheme. ``` cd server go run main.go ``` +This can be accessed via WebTransport on `https://localhost:4443` by default. + ## Web Player The web assets need to be hosted with a HTTPS server. If you're using a self-signed certificate, you will need to ignore the security warning in Chrome (Advanced -> proceed to localhost). This can be avoided by adding your certificate to the root CA but I'm too lazy to do that. @@ -84,7 +86,7 @@ yarn install yarn serve ``` -These can be accessed on `https://127.0.0.1:4444` by default. +These can be accessed on `https://localhost:4444` by default. ## Chrome Now we need to make Chrome accept these certificates, which normally would involve trusting a root CA but this was not working with WebTransport when I last tried. @@ -93,5 +95,5 @@ Instead, we need to run a *fresh instance* of Chrome, instructing it to allow ou Launch a new instance of Chrome Canary: ``` -/Applications/Google\ Chrome\ Canary.app/Contents/MacOS/Google\ Chrome\ Canary --allow-insecure-localhost --origin-to-force-quic-on=127.0.0.1:4443 https://127.0.0.1:4444 +/Applications/Google\ Chrome\ Canary.app/Contents/MacOS/Google\ Chrome\ Canary --allow-insecure-localhost --origin-to-force-quic-on=localhost:4443 https://localhost:4444 ``` diff --git a/player/src/index.html b/player/src/index.html index c5cfde7..efb254c 100644 --- a/player/src/index.html +++ b/player/src/index.html @@ -42,7 +42,7 @@ const params = new URLSearchParams(window.location.search) const player = new Player({ - url: params.get("url") || "https://127.0.0.1:4443", + url: params.get("url") || "https://localhost:4443", vid: vidRef, stats: statsRef, diff --git a/player/src/message.ts b/player/src/message.ts index 4173619..026ebee 100644 --- a/player/src/message.ts +++ b/player/src/message.ts @@ -4,11 +4,15 @@ export interface Message { } export interface MessageInit { - id: number // integer id + id: string } export interface MessageSegment { - init: number // integer id of the init segment + init: string // id of the init segment timestamp: number // presentation timestamp in milliseconds of the first sample // TODO track would be nice } + +export interface Debug { + max_bitrate: number +} \ No newline at end of file diff --git a/player/src/player.ts b/player/src/player.ts index c31724a..3e68f0e 100644 --- a/player/src/player.ts +++ b/player/src/player.ts @@ -10,7 +10,7 @@ import { Message, MessageInit, MessageSegment } from "./message" export class Player { mediaSource: MediaSource; - init: Map; + init: Map; audio: Track; video: Track; @@ -89,27 +89,25 @@ export class Player { } sendThrottle() { - // TODO detect the incoming bitrate instead of hard-coding - const bitrate = 4 * 1024 * 1024 // 4Mb/s + let rate = 0; - // Right shift by throttle to divide by 2,4,8,16,etc each time - // Right shift by 3 more to divide by 8 to convert bits to bytes - // Right shift by another 2 to divide by 4 to get the number of bytes in a quarter of a second - let rate = bitrate >> (this.throttleCount + 3) - let buffer = bitrate >> (this.throttleCount + 5) // 250ms before dropping + if (this.throttleCount > 0) { + // TODO detect the incoming bitrate instead of hard-coding + // Right shift by throttle to divide by 2,4,8,16,etc each time + const bitrate = 4 * 1024 * 1024 // 4Mb/s - const str = formatBits(8*rate) + "/s" - this.throttleRef.textContent = `Throttle: ${ str }`; + rate = bitrate >> (this.throttleCount-1) - // NOTE: We don't use random packet loss because it's not a good simulator of how congestion works. - // Delay-based congestion control like BBR most ignores packet loss, rightfully so. + const str = formatBits(rate) + "/s" + this.throttleRef.textContent = `Throttle: ${ str }`; + } else { + this.throttleRef.textContent = "Throttle: none"; + } // Send the server a message to fake network congestion. - // This is done on the server side at the socket-level for maximum accuracy (impacts all packets). this.sendMessage({ - "x-throttle": { - rate: rate, - buffer: buffer, + "debug": { + max_bitrate: rate, }, }) } diff --git a/server/go.mod b/server/go.mod index 962370c..c1054d2 100644 --- a/server/go.mod +++ b/server/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/abema/go-mp4 v0.7.2 - github.com/kixelated/invoker v0.9.2 + github.com/kixelated/invoker v0.10.0 github.com/lucas-clemente/quic-go v0.30.0 github.com/marten-seemann/webtransport-go v0.2.0 github.com/zencoder/go-dash/v3 v3.0.2 @@ -28,3 +28,7 @@ require ( golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.12 // indirect ) + +replace github.com/lucas-clemente/quic-go => ../../quic-go + +replace github.com/marten-seemann/webtransport-go => ../../webtransport-go diff --git a/server/go.sum b/server/go.sum index d89d1e0..0b7d22e 100644 --- a/server/go.sum +++ b/server/go.sum @@ -68,13 +68,13 @@ github.com/kisielk/errcheck v1.4.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kixelated/invoker v0.9.2 h1:Pz8JDiRs8EzGc4EGVMZ4RYvFh+iQLXGZ4PG2KZyAh/0= github.com/kixelated/invoker v0.9.2/go.mod h1:RjG3iqm/sKwZjOpcW4SGq+l+4DJCDR/yUtc70VjCRB8= +github.com/kixelated/invoker v0.10.0 h1:M93MfnlR5OjKoQNcM2mvEVwKeA3wolxEeqShCkLbcUE= +github.com/kixelated/invoker v0.10.0/go.mod h1:RjG3iqm/sKwZjOpcW4SGq+l+4DJCDR/yUtc70VjCRB8= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lucas-clemente/quic-go v0.30.0 h1:nwLW0h8ahVQ5EPTIM7uhl/stHqQDea15oRlYKZmw2O0= -github.com/lucas-clemente/quic-go v0.30.0/go.mod h1:ssOrRsOmdxa768Wr78vnh2B8JozgLsMzG/g+0qEC7uk= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/marten-seemann/qpack v0.3.0 h1:UiWstOgT8+znlkDPOg2+3rIuYXJ2CnGDkGUXN6ki6hE= @@ -83,8 +83,6 @@ github.com/marten-seemann/qtls-go1-18 v0.1.3 h1:R4H2Ks8P6pAtUagjFty2p7BVHn3XiwDA github.com/marten-seemann/qtls-go1-18 v0.1.3/go.mod h1:mJttiymBAByA49mhlNZZGrH5u1uXYZJ+RW28Py7f4m4= github.com/marten-seemann/qtls-go1-19 v0.1.1 h1:mnbxeq3oEyQxQXwI4ReCgW9DPoPR94sNlqWoDZnjRIE= github.com/marten-seemann/qtls-go1-19 v0.1.1/go.mod h1:5HTDWtVudo/WFsHKRNuOhWlbdjrfs5JHrYb0wIJqGpI= -github.com/marten-seemann/webtransport-go v0.2.0 h1:987jPVqcyE3vF+CHNIxDhT0P21O+bI4fVF+0NoRujSo= -github.com/marten-seemann/webtransport-go v0.2.0/go.mod h1:XmnWYsWXaxUF7kjeIIzLWPyS+q0OcBY5vA64NuyK0ps= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/server/internal/warp/media.go b/server/internal/warp/media.go index 416c184..86d6f93 100644 --- a/server/internal/warp/media.go +++ b/server/internal/warp/media.go @@ -22,8 +22,9 @@ import ( // It's just much easier to read from disk and "fake" being live. type Media struct { base fs.FS - audio *mpd.Representation - video *mpd.Representation + inits map[string]*MediaInit + video []*mpd.Representation + audio []*mpd.Representation } func NewMedia(playlistPath string) (m *Media, err error) { @@ -51,132 +52,148 @@ func NewMedia(playlistPath string) (m *Media, err error) { return nil, fmt.Errorf("missing representation mime type") } + if representation.Bandwidth == nil { + return nil, fmt.Errorf("missing representation bandwidth") + } + switch *representation.MimeType { case "video/mp4": - m.video = representation + m.video = append(m.video, representation) case "audio/mp4": - m.audio = representation + m.audio = append(m.audio, representation) } } - if m.video == nil { + if len(m.video) == 0 { return nil, fmt.Errorf("no video representation found") } - if m.audio == nil { + if len(m.audio) == 0 { return nil, fmt.Errorf("no audio representation found") } + m.inits = make(map[string]*MediaInit) + + var reps []*mpd.Representation + reps = append(reps, m.audio...) + reps = append(reps, m.video...) + + for _, rep := range reps { + path := *rep.SegmentTemplate.Initialization + + // TODO Support the full template engine + path = strings.ReplaceAll(path, "$RepresentationID$", *rep.ID) + + f, err := fs.ReadFile(m.base, path) + if err != nil { + return nil, fmt.Errorf("failed to read init file: %w", err) + } + + init, err := newMediaInit(*rep.ID, f) + if err != nil { + return nil, fmt.Errorf("failed to create init segment: %w", err) + } + + m.inits[*rep.ID] = init + } + return m, nil } -func (m *Media) Start() (audio *MediaStream, video *MediaStream, err error) { +func (m *Media) Start(bitrate func() uint64) (inits map[string]*MediaInit, audio *MediaStream, video *MediaStream, err error) { start := time.Now() - audio, err = newMediaStream(m, m.audio, start) + audio, err = newMediaStream(m, m.audio, start, bitrate) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - video, err = newMediaStream(m, m.video, start) + video, err = newMediaStream(m, m.video, start, bitrate) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - return audio, video, nil + return m.inits, audio, video, nil } type MediaStream struct { - media *Media - init *MediaInit + Media *Media start time.Time - rep *mpd.Representation + reps []*mpd.Representation sequence int + bitrate func() uint64 // returns the current estimated bitrate } -func newMediaStream(m *Media, rep *mpd.Representation, start time.Time) (ms *MediaStream, err error) { +func newMediaStream(m *Media, reps []*mpd.Representation, start time.Time, bitrate func() uint64) (ms *MediaStream, err error) { ms = new(MediaStream) - ms.media = m - ms.rep = rep + ms.Media = m + ms.reps = reps ms.start = start + ms.bitrate = bitrate + return ms, nil +} + +func (ms *MediaStream) chooseRepresentation() (choice *mpd.Representation) { + bitrate := ms.bitrate() + + // Loop over the renditions and pick the highest bitrate we can support + for _, r := range ms.reps { + if uint64(*r.Bandwidth) <= bitrate && (choice == nil || *r.Bandwidth > *choice.Bandwidth) { + choice = r + } + } + + if choice != nil { + return choice + } + + // We can't support any of the bitrates, so find the lowest one. + for _, r := range ms.reps { + if choice == nil || *r.Bandwidth < *choice.Bandwidth { + choice = r + } + } + + return choice +} + +// Returns the next segment in the stream +func (ms *MediaStream) Next(ctx context.Context) (segment *MediaSegment, err error) { + rep := ms.chooseRepresentation() if rep.SegmentTemplate == nil { return nil, fmt.Errorf("missing segment template") } + if rep.SegmentTemplate.Media == nil { + return nil, fmt.Errorf("no media template") + } + if rep.SegmentTemplate.StartNumber == nil { return nil, fmt.Errorf("missing start number") } - ms.sequence = int(*rep.SegmentTemplate.StartNumber) - - return ms, nil -} - -// Returns the init segment for the stream -func (ms *MediaStream) Init(ctx context.Context) (init *MediaInit, err error) { - // Cache the init segment - if ms.init != nil { - return ms.init, nil - } - - if ms.rep.SegmentTemplate.Initialization == nil { - return nil, fmt.Errorf("no init template") - } - - path := *ms.rep.SegmentTemplate.Initialization + path := *rep.SegmentTemplate.Media + sequence := ms.sequence + int(*rep.SegmentTemplate.StartNumber) // TODO Support the full template engine - path = strings.ReplaceAll(path, "$RepresentationID$", *ms.rep.ID) - - f, err := fs.ReadFile(ms.media.base, path) - if err != nil { - return nil, fmt.Errorf("failed to read init file: %w", err) - } - - ms.init, err = newMediaInit(f) - if err != nil { - return nil, fmt.Errorf("failed to create init segment: %w", err) - } - - return ms.init, nil -} - -// Returns the next segment in the stream -func (ms *MediaStream) Segment(ctx context.Context) (segment *MediaSegment, err error) { - if ms.rep.SegmentTemplate.Media == nil { - return nil, fmt.Errorf("no media template") - } - - path := *ms.rep.SegmentTemplate.Media - - // TODO Support the full template engine - path = strings.ReplaceAll(path, "$RepresentationID$", *ms.rep.ID) - path = strings.ReplaceAll(path, "$Number%05d$", fmt.Sprintf("%05d", ms.sequence)) // TODO TODO - - // Check if this is the first segment in the playlist - first := ms.sequence == int(*ms.rep.SegmentTemplate.StartNumber) + path = strings.ReplaceAll(path, "$RepresentationID$", *rep.ID) + path = strings.ReplaceAll(path, "$Number%05d$", fmt.Sprintf("%05d", sequence)) // TODO TODO // Try openning the file - f, err := ms.media.base.Open(path) - if !first && errors.Is(err, os.ErrNotExist) { + f, err := ms.Media.base.Open(path) + if errors.Is(err, os.ErrNotExist) && ms.sequence != 0 { // Return EOF if the next file is missing return nil, nil } else if err != nil { return nil, fmt.Errorf("failed to open segment file: %w", err) } - offset := ms.sequence - int(*ms.rep.SegmentTemplate.StartNumber) - duration := time.Duration(*ms.rep.SegmentTemplate.Duration) / time.Nanosecond + duration := time.Duration(*rep.SegmentTemplate.Duration) / time.Nanosecond + timestamp := time.Duration(ms.sequence) * duration - timestamp := time.Duration(offset) * duration - - // We need the init segment to properly parse the media segment - init, err := ms.Init(ctx) - if err != nil { - return nil, fmt.Errorf("failed to open init file: %w", err) - } + init := ms.Media.inits[*rep.ID] segment, err = newMediaSegment(ms, init, f, timestamp) if err != nil { @@ -189,12 +206,14 @@ func (ms *MediaStream) Segment(ctx context.Context) (segment *MediaSegment, err } type MediaInit struct { + ID string Raw []byte Timescale int } -func newMediaInit(raw []byte) (mi *MediaInit, err error) { +func newMediaInit(id string, raw []byte) (mi *MediaInit, err error) { mi = new(MediaInit) + mi.ID = id mi.Raw = raw err = mi.parse() @@ -241,18 +260,21 @@ func (mi *MediaInit) parse() (err error) { } type MediaSegment struct { - stream *MediaStream - init *MediaInit + Stream *MediaStream + Init *MediaInit + file fs.File timestamp time.Duration } func newMediaSegment(s *MediaStream, init *MediaInit, file fs.File, timestamp time.Duration) (ms *MediaSegment, err error) { ms = new(MediaSegment) - ms.stream = s - ms.init = init + ms.Stream = s + ms.Init = init + ms.file = file ms.timestamp = timestamp + return ms, nil } @@ -287,7 +309,7 @@ func (ms *MediaSegment) Read(ctx context.Context) (chunk []byte, err error) { if sample != nil { // Simulate a live stream by sleeping before we write this sample. // Figure out how much time has elapsed since the start - elapsed := time.Since(ms.stream.start) + elapsed := time.Since(ms.Stream.start) delay := sample.Timestamp - elapsed if delay > 0 { @@ -329,13 +351,13 @@ func (ms *MediaSegment) parseAtom(ctx context.Context, buf []byte) (sample *medi dts = time.Duration(box.BaseMediaDecodeTimeV1) } - if ms.init.Timescale == 0 { + if ms.Init.Timescale == 0 { return nil, fmt.Errorf("missing timescale") } // Convert to seconds // TODO What about PTS? - sample.Timestamp = dts * time.Second / time.Duration(ms.init.Timescale) + sample.Timestamp = dts * time.Second / time.Duration(ms.Init.Timescale) } // Expands children diff --git a/server/internal/warp/message.go b/server/internal/warp/message.go index e84aead..5514b78 100644 --- a/server/internal/warp/message.go +++ b/server/internal/warp/message.go @@ -1,22 +1,20 @@ package warp type Message struct { - Init *MessageInit `json:"init,omitempty"` - Segment *MessageSegment `json:"segment,omitempty"` - Throttle *MessageThrottle `json:"x-throttle,omitempty"` + Init *MessageInit `json:"init,omitempty"` + Segment *MessageSegment `json:"segment,omitempty"` + Debug *MessageDebug `json:"debug,omitempty"` } type MessageInit struct { - Id int `json:"id"` // ID of the init segment + Id string `json:"id"` // ID of the init segment } type MessageSegment struct { - Init int `json:"init"` // ID of the init segment to use for this segment - Timestamp int `json:"timestamp"` // PTS of the first frame in milliseconds + Init string `json:"init"` // ID of the init segment to use for this segment + Timestamp int `json:"timestamp"` // PTS of the first frame in milliseconds } -type MessageThrottle struct { - Rate int `json:"rate"` // Artificially limit the socket byte rate per second - Buffer int `json:"buffer"` // Artificially limit the socket buffer to the number of bytes - Loss float64 `json:"loss"` // Artificially increase packet loss percentage from 0.0 - 1.0 +type MessageDebug struct { + MaxBitrate int `json:"max_bitrate"` // Artificially limit the QUIC max bitrate } diff --git a/server/internal/warp/server.go b/server/internal/warp/server.go index f2af2e3..3cd1391 100644 --- a/server/internal/warp/server.go +++ b/server/internal/warp/server.go @@ -76,21 +76,28 @@ func NewServer(config ServerConfig, media *Media) (s *Server, err error) { return } - ss, err := NewSession(session, s.media) + defer session.Close() + + hijacker, ok := w.(http3.Hijacker) + if !ok { + log.Printf("unable to hijack connection") + return + } + + conn := hijacker.Connection() + + ss, err := NewSession(conn, session, s.media) if err != nil { - http.Error(w, err.Error(), 500) + log.Printf("failed to create session: %s", err) return } // Run the session in parallel, logging errors instead of crashing - s.sessions.Add(func(ctx context.Context) (err error) { - err = ss.Run(ctx) - if err != nil { - log.Printf("terminated session: %s", err) - } - - return nil - }) + err = ss.Run(r.Context()) + if err != nil { + log.Printf("terminated session: %s", err) + return + } }) return s, nil diff --git a/server/internal/warp/session.go b/server/internal/warp/session.go index 6f91120..6a98b02 100644 --- a/server/internal/warp/session.go +++ b/server/internal/warp/session.go @@ -7,25 +7,31 @@ import ( "errors" "fmt" "io" + "log" "math" "time" "github.com/kixelated/invoker" + "github.com/lucas-clemente/quic-go" "github.com/marten-seemann/webtransport-go" ) // A single WebTransport session type Session struct { + conn quic.Connection inner *webtransport.Session + media *Media + inits map[string]*MediaInit audio *MediaStream video *MediaStream streams invoker.Tasks } -func NewSession(session *webtransport.Session, media *Media) (s *Session, err error) { +func NewSession(connection quic.Connection, session *webtransport.Session, media *Media) (s *Session, err error) { s = new(Session) + s.conn = connection s.inner = session s.media = media return s, nil @@ -34,13 +40,13 @@ func NewSession(session *webtransport.Session, media *Media) (s *Session, err er func (s *Session) Run(ctx context.Context) (err error) { defer s.inner.Close() - s.audio, s.video, err = s.media.Start() + s.inits, s.audio, s.video, err = s.media.Start(s.conn.GetMaxBandwidth) if err != nil { return fmt.Errorf("failed to start media: %w", err) } // Once we've validated the session, now we can start accessing the streams - return invoker.Run(ctx, s.runAccept, s.runAcceptUni, s.runAudio, s.runVideo, s.streams.Repeat) + return invoker.Run(ctx, s.runAccept, s.runAcceptUni, s.runInit, s.runAudio, s.runVideo, s.streams.Repeat) } func (s *Session) runAccept(ctx context.Context) (err error) { @@ -103,6 +109,8 @@ func (s *Session) handleStream(ctx context.Context, stream webtransport.ReceiveS return fmt.Errorf("failed to read atom payload: %w", err) } + log.Println("received message:", string(payload)) + msg := Message{} err = json.Unmarshal(payload, &msg) @@ -110,26 +118,26 @@ func (s *Session) handleStream(ctx context.Context, stream webtransport.ReceiveS return fmt.Errorf("failed to decode json payload: %w", err) } - if msg.Throttle != nil { - s.setThrottle(msg.Throttle) + if msg.Debug != nil { + s.setDebug(msg.Debug) } } } +func (s *Session) runInit(ctx context.Context) (err error) { + for _, init := range s.inits { + err = s.writeInit(ctx, init) + if err != nil { + return fmt.Errorf("failed to write init stream: %w", err) + } + } + + return nil +} + func (s *Session) runAudio(ctx context.Context) (err error) { - init, err := s.audio.Init(ctx) - if err != nil { - return fmt.Errorf("failed to fetch init segment: %w", err) - } - - // NOTE: Assumes a single init segment - err = s.writeInit(ctx, init, 1) - if err != nil { - return fmt.Errorf("failed to write init stream: %w", err) - } - for { - segment, err := s.audio.Segment(ctx) + segment, err := s.audio.Next(ctx) if err != nil { return fmt.Errorf("failed to get next segment: %w", err) } @@ -138,7 +146,7 @@ func (s *Session) runAudio(ctx context.Context) (err error) { return nil } - err = s.writeSegment(ctx, segment, 1) + err = s.writeSegment(ctx, segment) if err != nil { return fmt.Errorf("failed to write segment stream: %w", err) } @@ -146,19 +154,8 @@ func (s *Session) runAudio(ctx context.Context) (err error) { } func (s *Session) runVideo(ctx context.Context) (err error) { - init, err := s.video.Init(ctx) - if err != nil { - return fmt.Errorf("failed to fetch init segment: %w", err) - } - - // NOTE: Assumes a single init segment - err = s.writeInit(ctx, init, 2) - if err != nil { - return fmt.Errorf("failed to write init stream: %w", err) - } - for { - segment, err := s.video.Segment(ctx) + segment, err := s.video.Next(ctx) if err != nil { return fmt.Errorf("failed to get next segment: %w", err) } @@ -167,7 +164,7 @@ func (s *Session) runVideo(ctx context.Context) (err error) { return nil } - err = s.writeSegment(ctx, segment, 2) + err = s.writeSegment(ctx, segment) if err != nil { return fmt.Errorf("failed to write segment stream: %w", err) } @@ -175,7 +172,7 @@ func (s *Session) runVideo(ctx context.Context) (err error) { } // Create a stream for an INIT segment and write the container. -func (s *Session) writeInit(ctx context.Context, init *MediaInit, id int) (err error) { +func (s *Session) writeInit(ctx context.Context, init *MediaInit) (err error) { temp, err := s.inner.OpenUniStreamSync(ctx) if err != nil { return fmt.Errorf("failed to create stream: %w", err) @@ -194,7 +191,7 @@ func (s *Session) writeInit(ctx context.Context, init *MediaInit, id int) (err e stream.SetPriority(math.MaxInt) err = stream.WriteMessage(Message{ - Init: &MessageInit{Id: id}, + Init: &MessageInit{Id: init.ID}, }) if err != nil { return fmt.Errorf("failed to write init header: %w", err) @@ -209,7 +206,7 @@ func (s *Session) writeInit(ctx context.Context, init *MediaInit, id int) (err e } // Create a stream for a segment and write the contents, chunk by chunk. -func (s *Session) writeSegment(ctx context.Context, segment *MediaSegment, init int) (err error) { +func (s *Session) writeSegment(ctx context.Context, segment *MediaSegment) (err error) { temp, err := s.inner.OpenUniStreamSync(ctx) if err != nil { return fmt.Errorf("failed to create stream: %w", err) @@ -232,7 +229,7 @@ func (s *Session) writeSegment(ctx context.Context, segment *MediaSegment, init err = stream.WriteMessage(Message{ Segment: &MessageSegment{ - Init: init, + Init: segment.Init.ID, Timestamp: ms, }, }) @@ -264,6 +261,6 @@ func (s *Session) writeSegment(ctx context.Context, segment *MediaSegment, init return nil } -func (s *Session) setThrottle(msg *MessageThrottle) { - // TODO +func (s *Session) setDebug(msg *MessageDebug) { + s.conn.SetMaxBandwidth(uint64(msg.MaxBitrate)) } diff --git a/server/internal/warp/stream.go b/server/internal/warp/stream.go index bd10399..34ff30c 100644 --- a/server/internal/warp/stream.go +++ b/server/internal/warp/stream.go @@ -123,8 +123,7 @@ func (s *Stream) WriteCancel(code webtransport.ErrorCode) { } func (s *Stream) SetPriority(prio int) { - // TODO - // s.inner.SetPriority(prio) + s.inner.SetPriority(prio) } func (s *Stream) Close() (err error) { diff --git a/server/main.go b/server/main.go index 7f562e9..340f223 100644 --- a/server/main.go +++ b/server/main.go @@ -16,6 +16,8 @@ import ( ) func main() { + invoker.Panic = true + err := run(context.Background()) if err == nil { return @@ -39,7 +41,7 @@ func main() { } func run(ctx context.Context) (err error) { - addr := flag.String("addr", "127.0.0.1:4443", "HTTPS server address") + addr := flag.String("addr", ":4443", "HTTPS server address") cert := flag.String("tls-cert", "../cert/localhost.crt", "TLS certificate file path") key := flag.String("tls-key", "../cert/localhost.key", "TLS certificate file path") logDir := flag.String("log-dir", "", "logs will be written to the provided directory") From 894f26c5af0c2d36a08a7a11154a9ec76bb5d550 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Mon, 5 Dec 2022 14:07:31 -0800 Subject: [PATCH 2/4] Add support for switching between tracks to the player. --- player/src/player.ts | 1 - player/src/segment.ts | 6 +- player/src/source.ts | 158 ++++++++++++++++++++++++++++++------------ 3 files changed, 117 insertions(+), 48 deletions(-) diff --git a/player/src/player.ts b/player/src/player.ts index 3e68f0e..5eac5d8 100644 --- a/player/src/player.ts +++ b/player/src/player.ts @@ -242,7 +242,6 @@ export class Player { const segment = new Segment(track.source, init, msg.timestamp) // The track is responsible for flushing the segments in order - track.source.initialize(init) track.add(segment) /* TODO I'm not actually sure why this code doesn't work; something trips up the MP4 parser diff --git a/player/src/segment.ts b/player/src/segment.ts index 3e7a63d..09f7649 100644 --- a/player/src/segment.ts +++ b/player/src/segment.ts @@ -100,7 +100,8 @@ export class Segment { mdat.write(stream); } - this.source.appendBuffer(stream.buffer as ArrayBuffer) + this.source.initialize(this.init) + this.source.append(stream.buffer as ArrayBuffer) return this.done } @@ -109,6 +110,9 @@ export class Segment { finish() { this.done = true this.flush() + + // Trim the buffer to 30s long after each segment. + this.source.trim(30) } // Extend the last sample so it reaches the provided timestamp diff --git a/player/src/source.ts b/player/src/source.ts index 90bdd56..dc299a7 100644 --- a/player/src/source.ts +++ b/player/src/source.ts @@ -4,46 +4,56 @@ import { Init } from "./init" export class Source { sourceBuffer?: SourceBuffer; mediaSource: MediaSource; - queue: Array; - mime: string; + queue: Array; + init?: Init; constructor(mediaSource: MediaSource) { this.mediaSource = mediaSource; this.queue = []; - this.mime = ""; } + // (re)initialize the source using the provided init segment. initialize(init: Init) { - if (!this.sourceBuffer) { - this.sourceBuffer = this.mediaSource.addSourceBuffer(init.info.mime) - this.sourceBuffer.addEventListener('updateend', this.flush.bind(this)) - - // Add the init data to the front of the queue - for (let i = init.raw.length - 1; i >= 0; i -= 1) { - this.queue.unshift(init.raw[i]) - } - - this.flush() - } else if (init.info.mime != this.mime) { - this.sourceBuffer.changeType(init.info.mime) - - // Add the init data to the front of the queue - for (let i = init.raw.length - 1; i >= 0; i -= 1) { - this.queue.unshift(init.raw[i]) + // Check if the init segment is already in the queue. + for (let i = this.queue.length - 1; i >= 0; i--) { + if ((this.queue[i] as SourceInit).init == init) { + // Already queued up. + return } } - this.mime = init.info.mime - } - - appendBuffer(data: Uint8Array | ArrayBuffer) { - if (!this.sourceBuffer || this.sourceBuffer.updating || this.queue.length) { - this.queue.push(data) - } else { - this.sourceBuffer.appendBuffer(data) + // Check if the init segment has already been applied. + if (this.init == init) { + return } + + // Add the init segment to the queue so we call addSourceBuffer or changeType + this.queue.push({ + kind: "init", + init: init, + }) + + for (let i = 0; i < init.raw.length; i += 1) { + this.queue.push({ + kind: "data", + data: init.raw[i], + }) + } + + this.flush() } + // Append the segment data to the buffer. + append(data: Uint8Array | ArrayBuffer) { + this.queue.push({ + kind: "data", + data: data, + }) + + this.flush() + } + + // Return the buffered range. buffered() { if (!this.sourceBuffer) { return { length: 0 } @@ -52,30 +62,86 @@ export class Source { return this.sourceBuffer.buffered } + // Delete any media older than x seconds from the buffer. + trim(duration: number) { + this.queue.push({ + kind: "trim", + trim: duration, + }) + + this.flush() + } + + // Flush any queued instructions flush() { - // Check if we have a mime yet - if (!this.sourceBuffer) { - return - } + while (1) { + // Check if the buffer is currently busy. + if (this.sourceBuffer && this.sourceBuffer.updating) { + break; + } - // Check if the buffer is currently busy. - if (this.sourceBuffer.updating) { - return - } + // Process the next item in the queue. + const next = this.queue.shift() + if (!next) { + break; + } - const data = this.queue.shift() - if (data) { - // If there's data in the queue, flush it. - this.sourceBuffer.appendBuffer(data) - } else if (this.sourceBuffer.buffered.length) { - // Otherwise with no data, trim anything older than 30s. - const end = this.sourceBuffer.buffered.end(this.sourceBuffer.buffered.length - 1) - 30.0 - const start = this.sourceBuffer.buffered.start(0) + switch (next.kind) { + case "init": + this.init = next.init; - // Remove any range larger than 1s. - if (end > start && end - start > 1.0) { - this.sourceBuffer.remove(start, end) + if (!this.sourceBuffer) { + // Create a new source buffer. + this.sourceBuffer = this.mediaSource.addSourceBuffer(this.init.info.mime) + + // Call flush automatically after each update finishes. + this.sourceBuffer.addEventListener('updateend', this.flush.bind(this)) + } else { + this.sourceBuffer.changeType(next.init.info.mime) + } + + break; + case "data": + if (!this.sourceBuffer) { + throw "failed to call initailize before append" + } + + this.sourceBuffer.appendBuffer(next.data) + + break; + case "trim": + if (!this.sourceBuffer) { + throw "failed to call initailize before trim" + } + + const end = this.sourceBuffer.buffered.end(this.sourceBuffer.buffered.length - 1) - next.trim; + const start = this.sourceBuffer.buffered.start(0) + + if (end > start) { + this.sourceBuffer.remove(start, end) + } + + break; + default: + throw "impossible; unknown SourceItem" } } } } + +interface SourceItem {} + +class SourceInit implements SourceItem { + kind!: "init"; + init!: Init; +} + +class SourceData implements SourceItem { + kind!: "data"; + data!: Uint8Array | ArrayBuffer; +} + +class SourceTrim implements SourceItem { + kind!: "trim"; + trim!: number; +} \ No newline at end of file From f1311a3d26604be20df0efa9b9483277e10801e9 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Mon, 5 Dec 2022 14:07:58 -0800 Subject: [PATCH 3/4] Upgrade invoker. --- server/go.mod | 2 +- server/go.sum | 6 ++---- server/main.go | 26 ++------------------------ 3 files changed, 5 insertions(+), 29 deletions(-) diff --git a/server/go.mod b/server/go.mod index c1054d2..dcb38e7 100644 --- a/server/go.mod +++ b/server/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/abema/go-mp4 v0.7.2 - github.com/kixelated/invoker v0.10.0 + github.com/kixelated/invoker v1.0.0 github.com/lucas-clemente/quic-go v0.30.0 github.com/marten-seemann/webtransport-go v0.2.0 github.com/zencoder/go-dash/v3 v3.0.2 diff --git a/server/go.sum b/server/go.sum index 0b7d22e..0e33684 100644 --- a/server/go.sum +++ b/server/go.sum @@ -66,10 +66,8 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/kisielk/errcheck v1.4.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/kixelated/invoker v0.9.2 h1:Pz8JDiRs8EzGc4EGVMZ4RYvFh+iQLXGZ4PG2KZyAh/0= -github.com/kixelated/invoker v0.9.2/go.mod h1:RjG3iqm/sKwZjOpcW4SGq+l+4DJCDR/yUtc70VjCRB8= -github.com/kixelated/invoker v0.10.0 h1:M93MfnlR5OjKoQNcM2mvEVwKeA3wolxEeqShCkLbcUE= -github.com/kixelated/invoker v0.10.0/go.mod h1:RjG3iqm/sKwZjOpcW4SGq+l+4DJCDR/yUtc70VjCRB8= +github.com/kixelated/invoker v1.0.0 h1:0wYlvK39yQPbkwIFy+YN41AhF89WOtGyWqV2pZB39xw= +github.com/kixelated/invoker v1.0.0/go.mod h1:RjG3iqm/sKwZjOpcW4SGq+l+4DJCDR/yUtc70VjCRB8= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/server/main.go b/server/main.go index 340f223..39753ba 100644 --- a/server/main.go +++ b/server/main.go @@ -1,43 +1,21 @@ package main import ( - "bufio" "context" "crypto/tls" - "errors" "flag" "fmt" "log" - "os" - "strings" "github.com/kixelated/invoker" "github.com/kixelated/warp-demo/server/internal/warp" ) func main() { - invoker.Panic = true - err := run(context.Background()) - if err == nil { - return + if err != nil { + log.Fatal(err) } - - log.Println(err) - - var errPanic invoker.ErrPanic - - // TODO use an interface - if errors.As(err, &errPanic) { - stack := string(errPanic.Stack()) - - scanner := bufio.NewScanner(strings.NewReader(stack)) - for scanner.Scan() { - log.Println(scanner.Text()) - } - } - - os.Exit(1) } func run(ctx context.Context) (err error) { From 3d5015e73772776be64ba3252073d29d914249b9 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Mon, 5 Dec 2022 16:26:35 -0800 Subject: [PATCH 4/4] Update to kixel fork. --- server/go.mod | 10 ++---- server/go.sum | 9 +++-- server/internal/warp/server.go | 63 +++++++++++++++++++-------------- server/internal/warp/session.go | 6 ++-- server/internal/warp/stream.go | 4 +-- 5 files changed, 51 insertions(+), 41 deletions(-) diff --git a/server/go.mod b/server/go.mod index dcb38e7..09af253 100644 --- a/server/go.mod +++ b/server/go.mod @@ -5,8 +5,8 @@ go 1.18 require ( github.com/abema/go-mp4 v0.7.2 github.com/kixelated/invoker v1.0.0 - github.com/lucas-clemente/quic-go v0.30.0 - github.com/marten-seemann/webtransport-go v0.2.0 + github.com/kixelated/quic-go v1.31.0 + github.com/kixelated/webtransport-go v1.4.1 github.com/zencoder/go-dash/v3 v3.0.2 ) @@ -24,11 +24,7 @@ require ( golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect - golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect + golang.org/x/sys v0.1.1-0.20221102194838-fc697a31fa06 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.12 // indirect ) - -replace github.com/lucas-clemente/quic-go => ../../quic-go - -replace github.com/marten-seemann/webtransport-go => ../../webtransport-go diff --git a/server/go.sum b/server/go.sum index 0e33684..fd6ebf9 100644 --- a/server/go.sum +++ b/server/go.sum @@ -68,11 +68,16 @@ github.com/kisielk/errcheck v1.4.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kixelated/invoker v1.0.0 h1:0wYlvK39yQPbkwIFy+YN41AhF89WOtGyWqV2pZB39xw= github.com/kixelated/invoker v1.0.0/go.mod h1:RjG3iqm/sKwZjOpcW4SGq+l+4DJCDR/yUtc70VjCRB8= +github.com/kixelated/quic-go v1.31.0 h1:O3JomeXPnLNSCNpZF415NWOyfpzbFfuvP6dlIDg8VEA= +github.com/kixelated/quic-go v1.31.0/go.mod h1:AO7pURnb8HXHmdalp5e09UxQfsuwseEhl0NLmwiSOFY= +github.com/kixelated/webtransport-go v1.4.1 h1:ZtY3P7hVe1wK5fAt71b+HHnNISFDcQ913v+bvaNATxA= +github.com/kixelated/webtransport-go v1.4.1/go.mod h1:6RV5pTXF7oP53T83bosSDsLdSdw31j5cfpMDqsO4D5k= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lucas-clemente/quic-go v0.31.0 h1:MfNp3fk0wjWRajw6quMFA3ap1AVtlU+2mtwmbVogB2M= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/marten-seemann/qpack v0.3.0 h1:UiWstOgT8+znlkDPOg2+3rIuYXJ2CnGDkGUXN6ki6hE= @@ -198,8 +203,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.1-0.20221102194838-fc697a31fa06 h1:E1pm64FqQa4v8dHd/bAneyMkR4hk8LTJhoSlc5mc1cM= +golang.org/x/sys v0.1.1-0.20221102194838-fc697a31fa06/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/server/internal/warp/server.go b/server/internal/warp/server.go index 3cd1391..1b78d17 100644 --- a/server/internal/warp/server.go +++ b/server/internal/warp/server.go @@ -12,11 +12,11 @@ import ( "path/filepath" "github.com/kixelated/invoker" - "github.com/lucas-clemente/quic-go" - "github.com/lucas-clemente/quic-go/http3" - "github.com/lucas-clemente/quic-go/logging" - "github.com/lucas-clemente/quic-go/qlog" - "github.com/marten-seemann/webtransport-go" + "github.com/kixelated/quic-go" + "github.com/kixelated/quic-go/http3" + "github.com/kixelated/quic-go/logging" + "github.com/kixelated/quic-go/qlog" + "github.com/kixelated/webtransport-go" ) type Server struct { @@ -70,33 +70,22 @@ func NewServer(config ServerConfig, media *Media) (s *Server, err error) { s.media = media mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - session, err := s.inner.Upgrade(w, r) + hijacker, ok := w.(http3.Hijacker) + if !ok { + panic("unable to hijack connection: must use kixelated/quic-go") + } + + conn := hijacker.Connection() + + sess, err := s.inner.Upgrade(w, r) if err != nil { http.Error(w, "failed to upgrade session", 500) return } - defer session.Close() - - hijacker, ok := w.(http3.Hijacker) - if !ok { - log.Printf("unable to hijack connection") - return - } - - conn := hijacker.Connection() - - ss, err := NewSession(conn, session, s.media) + err = s.serve(r.Context(), conn, sess) if err != nil { - log.Printf("failed to create session: %s", err) - return - } - - // Run the session in parallel, logging errors instead of crashing - err = ss.Run(r.Context()) - if err != nil { - log.Printf("terminated session: %s", err) - return + log.Println(err) } }) @@ -116,3 +105,25 @@ func (s *Server) runShutdown(ctx context.Context) (err error) { func (s *Server) Run(ctx context.Context) (err error) { return invoker.Run(ctx, s.runServe, s.runShutdown, s.sessions.Repeat) } + +func (s *Server) serve(ctx context.Context, conn quic.Connection, sess *webtransport.Session) (err error) { + defer func() { + if err != nil { + sess.CloseWithError(1, err.Error()) + } else { + sess.CloseWithError(0, "end of broadcast") + } + }() + + ss, err := NewSession(conn, sess, s.media) + if err != nil { + return fmt.Errorf("failed to create session: %w", err) + } + + err = ss.Run(ctx) + if err != nil { + return fmt.Errorf("terminated session: %w", err) + } + + return nil +} diff --git a/server/internal/warp/session.go b/server/internal/warp/session.go index 6a98b02..dac1cb1 100644 --- a/server/internal/warp/session.go +++ b/server/internal/warp/session.go @@ -12,8 +12,8 @@ import ( "time" "github.com/kixelated/invoker" - "github.com/lucas-clemente/quic-go" - "github.com/marten-seemann/webtransport-go" + "github.com/kixelated/quic-go" + "github.com/kixelated/webtransport-go" ) // A single WebTransport session @@ -38,8 +38,6 @@ func NewSession(connection quic.Connection, session *webtransport.Session, media } func (s *Session) Run(ctx context.Context) (err error) { - defer s.inner.Close() - s.inits, s.audio, s.video, err = s.media.Start(s.conn.GetMaxBandwidth) if err != nil { return fmt.Errorf("failed to start media: %w", err) diff --git a/server/internal/warp/stream.go b/server/internal/warp/stream.go index 34ff30c..2ba56b6 100644 --- a/server/internal/warp/stream.go +++ b/server/internal/warp/stream.go @@ -7,7 +7,7 @@ import ( "fmt" "sync" - "github.com/marten-seemann/webtransport-go" + "github.com/kixelated/webtransport-go" ) // Wrapper around quic.SendStream to make Write non-blocking. @@ -118,7 +118,7 @@ func (s *Stream) WriteMessage(msg Message) (err error) { return nil } -func (s *Stream) WriteCancel(code webtransport.ErrorCode) { +func (s *Stream) WriteCancel(code webtransport.StreamErrorCode) { s.inner.CancelWrite(code) }