From e49bbb6800658c7807fdf9ab4f2ad6b1b656d8d5 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Fri, 18 Apr 2025 09:51:50 +0200 Subject: [PATCH] refactor(mediaserver): simplify API interactions --- internal/domain/types.go | 1 - internal/mediaserver/actor.go | 154 ++++++++++++------------------- internal/mediaserver/api.go | 95 +++---------------- internal/mediaserver/api_test.go | 98 ++------------------ 4 files changed, 80 insertions(+), 268 deletions(-) diff --git a/internal/domain/types.go b/internal/domain/types.go index f9181b0..ceaa1e4 100644 --- a/internal/domain/types.go +++ b/internal/domain/types.go @@ -34,7 +34,6 @@ type Source struct { Container Container Live bool LiveChangedAt time.Time - Listeners int Tracks []string RTMPURL string ExitReason string diff --git a/internal/mediaserver/actor.go b/internal/mediaserver/actor.go index 6971b86..6fc584c 100644 --- a/internal/mediaserver/actor.go +++ b/internal/mediaserver/actor.go @@ -26,16 +26,16 @@ import ( type StreamKey string const ( - defaultFetchIngressStateInterval = 5 * time.Second // default interval to fetch the state of the media server - defaultAPIPort = 9997 // default API host port for the media server - defaultRTMPIP = "127.0.0.1" // default RTMP host IP, bound to localhost for security - defaultRTMPPort = 1935 // default RTMP host port for the media server - defaultRTMPHost = "localhost" // default RTMP host name, used for the RTMP URL - defaultChanSize = 64 // default channel size for asynchronous non-error channels - imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx - defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey]. - componentName = "mediaserver" // component name, mostly used for Docker labels - httpClientTimeout = time.Second // timeout for outgoing HTTP client requests + defaultUpdateStateInterval = 5 * time.Second // default interval to update the state of the media server + defaultAPIPort = 9997 // default API host port for the media server + defaultRTMPIP = "127.0.0.1" // default RTMP host IP, bound to localhost for security + defaultRTMPPort = 1935 // default RTMP host port for the media server + defaultRTMPHost = "localhost" // default RTMP host name, used for the RTMP URL + defaultChanSize = 64 // default channel size for asynchronous non-error channels + imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx + defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey]. + componentName = "mediaserver" // component name, mostly used for Docker labels + httpClientTimeout = time.Second // timeout for outgoing HTTP client requests ) // action is an action to be performed by the actor. @@ -43,19 +43,19 @@ type action func() // Actor is responsible for managing the media server. type Actor struct { - actorC chan action - stateC chan domain.Source - chanSize int - containerClient *container.Client - apiPort int - rtmpAddr domain.NetAddr - rtmpHost string - streamKey StreamKey - fetchIngressStateInterval time.Duration - pass string // password for the media server - tlsCert, tlsKey []byte // TLS cert and key for the media server - logger *slog.Logger - apiClient *http.Client + actorC chan action + stateC chan domain.Source + chanSize int + containerClient *container.Client + apiPort int + rtmpAddr domain.NetAddr + rtmpHost string + streamKey StreamKey + updateStateInterval time.Duration + pass string // password for the media server + tlsCert, tlsKey []byte // TLS cert and key for the media server + logger *slog.Logger + apiClient *http.Client // mutable state state *domain.Source @@ -64,14 +64,14 @@ type Actor struct { // NewActorParams contains the parameters for building a new media server // actor. type NewActorParams struct { - APIPort int // defaults to 9997 - RTMPAddr domain.NetAddr // defaults to 127.0.0.1:1935 - RTMPHost string // defaults to "localhost" - StreamKey StreamKey // defaults to "live" - ChanSize int // defaults to 64 - FetchIngressStateInterval time.Duration // defaults to 5 seconds - ContainerClient *container.Client - Logger *slog.Logger + APIPort int // defaults to 9997 + RTMPAddr domain.NetAddr // defaults to 127.0.0.1:1935 + RTMPHost string // defaults to "localhost" + StreamKey StreamKey // defaults to "live" + ChanSize int // defaults to 64 + UpdateStateInterval time.Duration // defaults to 5 seconds + ContainerClient *container.Client + Logger *slog.Logger } // NewActor creates a new media server actor. @@ -93,21 +93,21 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error) chanSize := cmp.Or(params.ChanSize, defaultChanSize) return &Actor{ - apiPort: cmp.Or(params.APIPort, defaultAPIPort), - rtmpAddr: rtmpAddr, - rtmpHost: cmp.Or(params.RTMPHost, defaultRTMPHost), - streamKey: cmp.Or(params.StreamKey, defaultStreamKey), - fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval), - tlsCert: tlsCert, - tlsKey: tlsKey, - pass: generatePassword(), - actorC: make(chan action, chanSize), - state: new(domain.Source), - stateC: make(chan domain.Source, chanSize), - chanSize: chanSize, - containerClient: params.ContainerClient, - logger: params.Logger, - apiClient: apiClient, + apiPort: cmp.Or(params.APIPort, defaultAPIPort), + rtmpAddr: rtmpAddr, + rtmpHost: cmp.Or(params.RTMPHost, defaultRTMPHost), + streamKey: cmp.Or(params.StreamKey, defaultStreamKey), + updateStateInterval: cmp.Or(params.UpdateStateInterval, defaultUpdateStateInterval), + tlsCert: tlsCert, + tlsKey: tlsKey, + pass: generatePassword(), + actorC: make(chan action, chanSize), + state: new(domain.Source), + stateC: make(chan domain.Source, chanSize), + chanSize: chanSize, + containerClient: params.ContainerClient, + logger: params.Logger, + apiClient: apiClient, }, nil } @@ -255,16 +255,8 @@ func (s *Actor) Close() error { // actorLoop is the main loop of the media server actor. It exits when the // actor is closed, or the parent context is cancelled. func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Container, errC <-chan error) { - fetchStateT := time.NewTicker(s.fetchIngressStateInterval) - defer fetchStateT.Stop() - - // fetchTracksT is used to signal that tracks should be fetched from the - // media server, after the stream goes on-air. A short delay is needed due to - // workaround a race condition in the media server. - var fetchTracksT *time.Timer - resetFetchTracksT := func(d time.Duration) { fetchTracksT = time.NewTimer(d) } - resetFetchTracksT(time.Second) - fetchTracksT.Stop() + updateStateT := time.NewTicker(s.updateStateInterval) + defer updateStateT.Stop() sendState := func() { s.stateC <- *s.state } @@ -274,7 +266,7 @@ func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Con s.state.Container = containerState if s.state.Container.Status == domain.ContainerStatusExited { - fetchStateT.Stop() + updateStateT.Stop() s.handleContainerExit(nil) } @@ -293,43 +285,21 @@ func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Con s.logger.Error("Error from container client", "err", err, "id", shortID(s.state.Container.ID)) } - fetchStateT.Stop() + updateStateT.Stop() s.handleContainerExit(err) sendState() - case <-fetchStateT.C: - ingressState, err := fetchIngressState(s.rtmpConnsURL(), s.streamKey, s.apiClient) + case <-updateStateT.C: + path, err := fetchPath(s.pathURL(string(s.streamKey)), s.apiClient) if err != nil { - s.logger.Error("Error fetching server state", "err", err) + s.logger.Error("Error fetching path", "err", err) continue } - var shouldSendState bool - if ingressState.ready != s.state.Live { - s.state.Live = ingressState.ready + if path.Ready != s.state.Live { + s.state.Live = path.Ready s.state.LiveChangedAt = time.Now() - resetFetchTracksT(time.Second) - shouldSendState = true - } - if ingressState.listeners != s.state.Listeners { - s.state.Listeners = ingressState.listeners - shouldSendState = true - } - if shouldSendState { - sendState() - } - case <-fetchTracksT.C: - if !s.state.Live { - continue - } - - if tracks, err := fetchTracks(s.pathsURL(), s.streamKey, s.apiClient); err != nil { - s.logger.Error("Error fetching tracks", "err", err) - resetFetchTracksT(3 * time.Second) - } else if len(tracks) == 0 { - resetFetchTracksT(time.Second) - } else { - s.state.Tracks = tracks + s.state.Tracks = path.Tracks sendState() } case action, ok := <-s.actorC: @@ -369,15 +339,9 @@ func (s *Actor) RTMPInternalURL() string { return fmt.Sprintf("rtmp://mediaserver:1935/%s?user=api&pass=%s", s.streamKey, s.pass) } -// rtmpConnsURL returns the URL for fetching RTMP connections, accessible from -// the host. -func (s *Actor) rtmpConnsURL() string { - return fmt.Sprintf("https://api:%s@localhost:%d/v3/rtmpconns/list", s.pass, s.apiPort) -} - -// pathsURL returns the URL for fetching paths, accessible from the host. -func (s *Actor) pathsURL() string { - return fmt.Sprintf("https://api:%s@localhost:%d/v3/paths/list", s.pass, s.apiPort) +// pathURL returns the URL for fetching a path, accessible from the host. +func (s *Actor) pathURL(path string) string { + return fmt.Sprintf("https://api:%s@localhost:%d/v3/paths/get/%s", s.pass, s.apiPort, path) } // healthCheckURL returns the URL for the health check, accessible from the diff --git a/internal/mediaserver/api.go b/internal/mediaserver/api.go index df7b30e..05dbbb7 100644 --- a/internal/mediaserver/api.go +++ b/internal/mediaserver/api.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "net/http" - "time" ) type httpClient interface { @@ -44,109 +43,37 @@ func buildAPIClient(certPEM []byte) (*http.Client, error) { const userAgent = "octoplex-client" -type apiResponse[T any] struct { - Items []T `json:"items"` -} - -type rtmpConnsResponse struct { - ID string `json:"id"` - CreatedAt time.Time `json:"created"` - State string `json:"state"` - Path string `json:"path"` - BytesReceived int64 `json:"bytesReceived"` - BytesSent int64 `json:"bytesSent"` - RemoteAddr string `json:"remoteAddr"` -} - -type ingressStreamState struct { - ready bool - listeners int -} - -// TODO: handle pagination -func fetchIngressState(apiURL string, streamKey StreamKey, httpClient httpClient) (state ingressStreamState, _ error) { - req, err := http.NewRequest(http.MethodGet, apiURL, nil) - if err != nil { - return state, fmt.Errorf("new request: %w", err) - } - req.Header.Set("User-Agent", userAgent) - - httpResp, err := httpClient.Do(req) - if err != nil { - return state, fmt.Errorf("do request: %w", err) - } - - if httpResp.StatusCode != http.StatusOK { - return state, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode) - } - - respBody, err := io.ReadAll(httpResp.Body) - if err != nil { - return state, fmt.Errorf("read body: %w", err) - } - - var resp apiResponse[rtmpConnsResponse] - if err = json.Unmarshal(respBody, &resp); err != nil { - return state, fmt.Errorf("unmarshal: %w", err) - } - - for _, conn := range resp.Items { - if conn.Path != string(streamKey) { - continue - } - - switch conn.State { - case "publish": - // mediamtx may report a stream as being in publish state via the API, - // but still refuse to serve them due to being unpublished. This seems to - // be a bug, this is a hacky workaround. - state.ready = conn.BytesReceived > 20_000 - case "read": - state.listeners++ - } - } - - return state, nil -} - -type path struct { +type apiPath struct { Name string `json:"name"` + Ready bool `json:"ready"` Tracks []string `json:"tracks"` } -// TODO: handle pagination -func fetchTracks(apiURL string, streamKey StreamKey, httpClient httpClient) ([]string, error) { +func fetchPath(apiURL string, httpClient httpClient) (apiPath, error) { req, err := http.NewRequest(http.MethodGet, apiURL, nil) if err != nil { - return nil, fmt.Errorf("new request: %w", err) + return apiPath{}, fmt.Errorf("new request: %w", err) } req.Header.Set("User-Agent", userAgent) httpResp, err := httpClient.Do(req) if err != nil { - return nil, fmt.Errorf("do request: %w", err) + return apiPath{}, fmt.Errorf("do request: %w", err) } if httpResp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode) + return apiPath{}, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode) } respBody, err := io.ReadAll(httpResp.Body) if err != nil { - return nil, fmt.Errorf("read body: %w", err) + return apiPath{}, fmt.Errorf("read body: %w", err) } - var resp apiResponse[path] - if err = json.Unmarshal(respBody, &resp); err != nil { - return nil, fmt.Errorf("unmarshal: %w", err) + var path apiPath + if err = json.Unmarshal(respBody, &path); err != nil { + return apiPath{}, fmt.Errorf("unmarshal: %w", err) } - var tracks []string - for _, path := range resp.Items { - if path.Name == string(streamKey) { - tracks = path.Tracks - } - } - - return tracks, nil + return path, nil } diff --git a/internal/mediaserver/api_test.go b/internal/mediaserver/api_test.go index 1c0951b..236c14d 100644 --- a/internal/mediaserver/api_test.go +++ b/internal/mediaserver/api_test.go @@ -12,14 +12,14 @@ import ( "github.com/stretchr/testify/require" ) -func TestFetchIngressState(t *testing.T) { - const url = "http://localhost:8989/v3/rtmpconns/list" +func TestFetchPath(t *testing.T) { + const url = "http://localhost:8989/v3/paths/get/live" testCases := []struct { name string httpResponse *http.Response httpError error - wantState ingressStreamState + wantPath apiPath wantErr error }{ { @@ -36,36 +36,20 @@ func TestFetchIngressState(t *testing.T) { wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"), }, { - name: "successful response, no streams", + name: "successful response, not ready", httpResponse: &http.Response{ StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":0,"pageCount":0,"items":[]}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"name":"live","confName":"live","source":null,"ready":false,"readyTime":null,"tracks":[],"bytesReceived":0,"bytesSent":0,"readers":[]}`))), }, - wantState: ingressStreamState{ready: false, listeners: 0}, - }, - { - name: "successful response, not yet ready", - httpResponse: &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"id":"d2953cf8-9cd6-4c30-816f-807b80b6a71f","created":"2025-02-15T08:19:00.616220354Z","remoteAddr":"172.17.0.1:32972","state":"publish","path":"live","query":"","bytesReceived":15462,"bytesSent":3467}]}`))), - }, - wantState: ingressStreamState{ready: false, listeners: 0}, + wantPath: apiPath{Name: "live", Ready: false, Tracks: []string{}}, }, { name: "successful response, ready", httpResponse: &http.Response{ StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"id":"d2953cf8-9cd6-4c30-816f-807b80b6a71f","created":"2025-02-15T08:19:00.616220354Z","remoteAddr":"172.17.0.1:32972","state":"publish","path":"live","query":"","bytesReceived":27832,"bytesSent":3467}]}`))), + Body: io.NopCloser(bytes.NewReader([]byte(`{"name":"live","confName":"live","source":{"type":"rtmpConn","id":"fd2d79a8-bab9-4141-a1b5-55bd1a8649df"},"ready":true,"readyTime":"2025-04-18T07:44:53.683627506Z","tracks":["H264"],"bytesReceived":254677,"bytesSent":0,"readers":[]}`))), }, - wantState: ingressStreamState{ready: true, listeners: 0}, - }, - { - name: "successful response, ready, with listeners", - httpResponse: &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":2,"pageCount":1,"items":[{"id":"12668315-0572-41f1-8384-fe7047cc73be","created":"2025-02-15T08:23:43.836589664Z","remoteAddr":"172.17.0.1:40026","state":"publish","path":"live","query":"","bytesReceived":7180753,"bytesSent":3467},{"id":"079370fd-43bb-4798-b079-860cc3159e4e","created":"2025-02-15T08:24:32.396794364Z","remoteAddr":"192.168.48.3:44736","state":"read","path":"live","query":"","bytesReceived":333435,"bytesSent":24243}]}`))), - }, - wantState: ingressStreamState{ready: true, listeners: 1}, + wantPath: apiPath{Name: "live", Ready: true, Tracks: []string{"H264"}}, }, } @@ -79,74 +63,12 @@ func TestFetchIngressState(t *testing.T) { })). Return(tc.httpResponse, tc.httpError) - state, err := fetchIngressState(url, StreamKey("live"), &httpClient) + path, err := fetchPath(url, &httpClient) if tc.wantErr != nil { require.EqualError(t, err, tc.wantErr.Error()) } else { require.NoError(t, err) - require.Equal(t, tc.wantState, state) - } - }) - } -} - -func TestFetchTracks(t *testing.T) { - const url = "http://localhost:8989/v3/paths/list" - - testCases := []struct { - name string - httpResponse *http.Response - httpError error - wantTracks []string - wantErr error - }{ - { - name: "non-200 status", - httpResponse: &http.Response{StatusCode: http.StatusNotFound}, - wantErr: errors.New("unexpected status code: 404"), - }, - { - name: "unparseable response", - httpResponse: &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewReader([]byte("invalid json"))), - }, - wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"), - }, - { - name: "successful response, no tracks", - httpResponse: &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":[],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))), - }, - wantTracks: []string{}, - }, - { - name: "successful response, tracks", - httpResponse: &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":["H264","MPEG-4 Audio"],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))), - }, - wantTracks: []string{"H264", "MPEG-4 Audio"}, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - var httpClient mocks.HTTPClient - httpClient. - EXPECT(). - Do(mock.MatchedBy(func(req *http.Request) bool { - return req.URL.String() == url && req.Method == http.MethodGet - })). - Return(tc.httpResponse, tc.httpError) - - tracks, err := fetchTracks(url, StreamKey("live"), &httpClient) - if tc.wantErr != nil { - require.EqualError(t, err, tc.wantErr.Error()) - } else { - require.NoError(t, err) - require.Equal(t, tc.wantTracks, tracks) + require.Equal(t, tc.wantPath, path) } }) }