From 8f24b845d44333bc35222a38282eaa4909616b7b Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Sat, 22 Feb 2025 19:11:30 +0100 Subject: [PATCH] feat: tracks --- domain/types.go | 1 + mediaserver/actor.go | 39 ++++++++++++++++--- mediaserver/api.go | 42 ++++++++++++++++++++ mediaserver/api_test.go | 68 +++++++++++++++++++++++++++++++-- mediaserver/integration_test.go | 14 ++++++- multiplexer/multiplexer.go | 4 +- terminal/actor.go | 20 +++++++++- 7 files changed, 175 insertions(+), 13 deletions(-) diff --git a/domain/types.go b/domain/types.go index fc1ee33..3bd5f54 100644 --- a/domain/types.go +++ b/domain/types.go @@ -13,6 +13,7 @@ type Source struct { Container Container Live bool Listeners int + Tracks []string RTMPURL string RTMPInternalURL string ExitReason string diff --git a/mediaserver/actor.go b/mediaserver/actor.go index 587fe61..de7f8b2 100644 --- a/mediaserver/actor.go +++ b/mediaserver/actor.go @@ -154,6 +154,14 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e fetchStateT := time.NewTicker(s.fetchIngressStateInterval) defer fetchStateT.Stop() + // fetchTracksT is used to signal that tracks should be fetched from the + // media server, after the stream goes on-air. A short delay is needed due to + // workaround a race condition in the media server. + var fetchTracksT *time.Timer + resetFetchTracksT := func(d time.Duration) { fetchTracksT = time.NewTimer(d) } + resetFetchTracksT(time.Second) + fetchTracksT.Stop() + sendState := func() { s.stateC <- *s.state } for { @@ -173,7 +181,7 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e } if err != nil { - s.logger.Error("Error from container client", "error", err, "id", shortID(s.state.Container.ID)) + s.logger.Error("Error from container client", "err", err, "id", shortID(s.state.Container.ID)) } fetchStateT.Stop() @@ -191,14 +199,29 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e sendState() case <-fetchStateT.C: - ingressState, err := fetchIngressState(s.apiURL(), s.httpClient) + ingressState, err := fetchIngressState(s.rtmpConnsURL(), s.httpClient) if err != nil { - s.logger.Error("Error fetching server state", "error", err) + s.logger.Error("Error fetching server state", "err", err) continue } if ingressState.ready != s.state.Live || ingressState.listeners != s.state.Listeners { s.state.Live = ingressState.ready s.state.Listeners = ingressState.listeners + resetFetchTracksT(time.Second) + sendState() + } + case <-fetchTracksT.C: + if !s.state.Live { + continue + } + + if tracks, err := fetchTracks(s.pathsURL(), s.httpClient); err != nil { + s.logger.Error("Error fetching tracks", "err", err) + resetFetchTracksT(3 * time.Second) + } else if len(tracks) == 0 { + resetFetchTracksT(time.Second) + } else { + s.state.Tracks = tracks sendState() } case action, ok := <-s.actorC: @@ -224,11 +247,17 @@ func (s *Actor) rtmpInternalURL() string { return fmt.Sprintf("rtmp://mediaserver:1935/%s", rtmpPath) } -// apiURL returns the API URL for the media server, accessible from the host. -func (s *Actor) apiURL() string { +// rtmpConnsURL returns the URL for fetching RTMP connections, accessible from +// the host. +func (s *Actor) rtmpConnsURL() string { return fmt.Sprintf("http://localhost:%d/v3/rtmpconns/list", s.apiPort) } +// pathsURL returns the URL for fetching paths, accessible from the host. +func (s *Actor) pathsURL() string { + return fmt.Sprintf("http://localhost:%d/v3/paths/list", s.apiPort) +} + // shortID returns the first 12 characters of the given container ID. func shortID(id string) string { if len(id) < 12 { diff --git a/mediaserver/api.go b/mediaserver/api.go index 1245be7..24b631b 100644 --- a/mediaserver/api.go +++ b/mediaserver/api.go @@ -31,6 +31,7 @@ type ingressStreamState struct { listeners int } +// TODO: handle pagination func fetchIngressState(apiURL string, httpClient httpClient) (state ingressStreamState, _ error) { req, err := http.NewRequest(http.MethodGet, apiURL, nil) if err != nil { @@ -74,3 +75,44 @@ func fetchIngressState(apiURL string, httpClient httpClient) (state ingressStrea return state, nil } + +type path struct { + Name string `json:"name"` + Tracks []string `json:"tracks"` +} + +// TODO: handle pagination +func fetchTracks(apiURL string, httpClient httpClient) ([]string, error) { + req, err := http.NewRequest(http.MethodGet, apiURL, nil) + if err != nil { + return nil, fmt.Errorf("new request: %w", err) + } + + httpResp, err := httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("do request: %w", err) + } + + if httpResp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode) + } + + respBody, err := io.ReadAll(httpResp.Body) + if err != nil { + return nil, fmt.Errorf("read body: %w", err) + } + + var resp apiResponse[path] + if err = json.Unmarshal(respBody, &resp); err != nil { + return nil, fmt.Errorf("unmarshal: %w", err) + } + + var tracks []string + for _, path := range resp.Items { + if path.Name == rtmpPath { + tracks = path.Tracks + } + } + + return tracks, nil +} diff --git a/mediaserver/api_test.go b/mediaserver/api_test.go index 058ea9e..6029e51 100644 --- a/mediaserver/api_test.go +++ b/mediaserver/api_test.go @@ -13,7 +13,7 @@ import ( ) func TestFetchIngressState(t *testing.T) { - const URL = "http://localhost:8989/v3/rtmpconns/list" + const url = "http://localhost:8989/v3/rtmpconns/list" testCases := []struct { name string @@ -75,11 +75,11 @@ func TestFetchIngressState(t *testing.T) { httpClient. EXPECT(). Do(mock.MatchedBy(func(req *http.Request) bool { - return req.URL.String() == URL && req.Method == http.MethodGet + return req.URL.String() == url && req.Method == http.MethodGet })). Return(tc.httpResponse, tc.httpError) - state, err := fetchIngressState(URL, &httpClient) + state, err := fetchIngressState(url, &httpClient) if tc.wantErr != nil { require.EqualError(t, err, tc.wantErr.Error()) } else { @@ -89,3 +89,65 @@ func TestFetchIngressState(t *testing.T) { }) } } + +func TestFetchTracks(t *testing.T) { + const url = "http://localhost:8989/v3/paths/list" + + testCases := []struct { + name string + httpResponse *http.Response + httpError error + wantTracks []string + wantErr error + }{ + { + name: "non-200 status", + httpResponse: &http.Response{StatusCode: http.StatusNotFound}, + wantErr: errors.New("unexpected status code: 404"), + }, + { + name: "unparseable response", + httpResponse: &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader([]byte("invalid json"))), + }, + wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"), + }, + { + name: "successful response, no tracks", + httpResponse: &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":[],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))), + }, + wantTracks: []string{}, + }, + { + name: "successful response, tracks", + httpResponse: &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":["H264","MPEG-4 Audio"],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))), + }, + wantTracks: []string{"H264", "MPEG-4 Audio"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var httpClient mocks.HTTPClient + httpClient. + EXPECT(). + Do(mock.MatchedBy(func(req *http.Request) bool { + return req.URL.String() == url && req.Method == http.MethodGet + })). + Return(tc.httpResponse, tc.httpError) + + tracks, err := fetchTracks(url, &httpClient) + if tc.wantErr != nil { + require.EqualError(t, err, tc.wantErr.Error()) + } else { + require.NoError(t, err) + require.Equal(t, tc.wantTracks, tracks) + } + }) + } +} diff --git a/mediaserver/integration_test.go b/mediaserver/integration_test.go index 23cb744..f0ac3de 100644 --- a/mediaserver/integration_test.go +++ b/mediaserver/integration_test.go @@ -36,6 +36,7 @@ func TestIntegrationMediaServerStartStop(t *testing.T) { Logger: logger, }) require.NoError(t, err) + t.Cleanup(func() { mediaServer.Close() }) testhelpers.ChanDiscard(mediaServer.C()) require.Eventually( @@ -66,6 +67,17 @@ func TestIntegrationMediaServerStartStop(t *testing.T) { "actor not healthy and/or in LIVE state", ) + require.Eventually( + t, + func() bool { + currState := mediaServer.State() + return len(currState.Tracks) == 1 && currState.Tracks[0] == "H264" + }, + time.Second*5, + time.Second, + "tracks not updated", + ) + require.Eventually( t, func() bool { @@ -74,7 +86,7 @@ func TestIntegrationMediaServerStartStop(t *testing.T) { }, time.Second*10, time.Second, - "actor not healthy and/or in LIVE state", + "rxRate not updated", ) mediaServer.Close() diff --git a/multiplexer/multiplexer.go b/multiplexer/multiplexer.go index 79642a8..e05d73d 100644 --- a/multiplexer/multiplexer.go +++ b/multiplexer/multiplexer.go @@ -87,7 +87,7 @@ func (a *Actor) ToggleDestination(url string) { if err := a.containerClient.RemoveContainers(a.ctx, labels); err != nil { // TODO: error handling - a.logger.Error("Failed to stop live stream", "url", url, "error", err) + a.logger.Error("Failed to stop live stream", "url", url, "err", err) } delete(a.currURLs, url) @@ -156,7 +156,7 @@ func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, er case err := <-errC: // TODO: error handling if err != nil { - a.logger.Error("Error from container client", "error", err) + a.logger.Error("Error from container client", "err", err) } return } diff --git a/terminal/actor.go b/terminal/actor.go index 441ca3a..ee17809 100644 --- a/terminal/actor.go +++ b/terminal/actor.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "log/slog" + "strings" "git.netflux.io/rob/termstream/domain" "github.com/gdamore/tcell/v2" @@ -14,6 +15,7 @@ import ( type sourceViews struct { url *tview.TextView status *tview.TextView + tracks *tview.TextView health *tview.TextView cpu *tview.TextView mem *tview.TextView @@ -57,13 +59,13 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) { sourceView.SetDirection(tview.FlexColumn) sourceView.SetBorder(true) sourceView.SetTitle("Ingress RTMP server") - sidebar.AddItem(sourceView, 8, 0, false) + sidebar.AddItem(sourceView, 9, 0, false) leftCol := tview.NewFlex() leftCol.SetDirection(tview.FlexRow) rightCol := tview.NewFlex() rightCol.SetDirection(tview.FlexRow) - sourceView.AddItem(leftCol, 8, 0, false) + sourceView.AddItem(leftCol, 9, 0, false) sourceView.AddItem(rightCol, 0, 1, false) urlHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerURL) @@ -76,6 +78,11 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) { statusTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash) rightCol.AddItem(statusTextView, 1, 0, false) + tracksHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerTracks) + leftCol.AddItem(tracksHeaderTextView, 1, 0, false) + tracksTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash) + rightCol.AddItem(tracksTextView, 1, 0, false) + healthHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerHealth) leftCol.AddItem(healthHeaderTextView, 1, 0, false) healthTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash) @@ -139,6 +146,7 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) { sourceViews: sourceViews{ url: urlTextView, status: statusTextView, + tracks: tracksTextView, health: healthTextView, cpu: cpuTextView, mem: memTextView, @@ -244,6 +252,7 @@ const ( headerRx = "Rx Kbps" headerTx = "Tx Kbps" headerAction = "Action" + headerTracks = "Tracks" ) func (a *Actor) redrawFromState(state domain.AppState) { @@ -256,6 +265,13 @@ func (a *Actor) redrawFromState(state domain.AppState) { } a.sourceViews.url.SetText(state.Source.RTMPURL) + + tracks := dash + if state.Source.Live && len(state.Source.Tracks) > 0 { + tracks = strings.Join(state.Source.Tracks, ", ") + } + a.sourceViews.tracks.SetText(tracks) + if state.Source.Live { a.sourceViews.status.SetText("[black:green]receiving") } else if state.Source.Container.State == "running" && state.Source.Container.HealthState == "healthy" {