feat: tracks

This commit is contained in:
Rob Watson 2025-02-22 19:11:30 +01:00
parent e984258444
commit 8f24b845d4
7 changed files with 175 additions and 13 deletions

View File

@ -13,6 +13,7 @@ type Source struct {
Container Container
Live bool
Listeners int
Tracks []string
RTMPURL string
RTMPInternalURL string
ExitReason string

View File

@ -154,6 +154,14 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e
fetchStateT := time.NewTicker(s.fetchIngressStateInterval)
defer fetchStateT.Stop()
// fetchTracksT is used to signal that tracks should be fetched from the
// media server, after the stream goes on-air. A short delay is needed due to
// workaround a race condition in the media server.
var fetchTracksT *time.Timer
resetFetchTracksT := func(d time.Duration) { fetchTracksT = time.NewTimer(d) }
resetFetchTracksT(time.Second)
fetchTracksT.Stop()
sendState := func() { s.stateC <- *s.state }
for {
@ -173,7 +181,7 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e
}
if err != nil {
s.logger.Error("Error from container client", "error", err, "id", shortID(s.state.Container.ID))
s.logger.Error("Error from container client", "err", err, "id", shortID(s.state.Container.ID))
}
fetchStateT.Stop()
@ -191,14 +199,29 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e
sendState()
case <-fetchStateT.C:
ingressState, err := fetchIngressState(s.apiURL(), s.httpClient)
ingressState, err := fetchIngressState(s.rtmpConnsURL(), s.httpClient)
if err != nil {
s.logger.Error("Error fetching server state", "error", err)
s.logger.Error("Error fetching server state", "err", err)
continue
}
if ingressState.ready != s.state.Live || ingressState.listeners != s.state.Listeners {
s.state.Live = ingressState.ready
s.state.Listeners = ingressState.listeners
resetFetchTracksT(time.Second)
sendState()
}
case <-fetchTracksT.C:
if !s.state.Live {
continue
}
if tracks, err := fetchTracks(s.pathsURL(), s.httpClient); err != nil {
s.logger.Error("Error fetching tracks", "err", err)
resetFetchTracksT(3 * time.Second)
} else if len(tracks) == 0 {
resetFetchTracksT(time.Second)
} else {
s.state.Tracks = tracks
sendState()
}
case action, ok := <-s.actorC:
@ -224,11 +247,17 @@ func (s *Actor) rtmpInternalURL() string {
return fmt.Sprintf("rtmp://mediaserver:1935/%s", rtmpPath)
}
// apiURL returns the API URL for the media server, accessible from the host.
func (s *Actor) apiURL() string {
// rtmpConnsURL returns the URL for fetching RTMP connections, accessible from
// the host.
func (s *Actor) rtmpConnsURL() string {
return fmt.Sprintf("http://localhost:%d/v3/rtmpconns/list", s.apiPort)
}
// pathsURL returns the URL for fetching paths, accessible from the host.
func (s *Actor) pathsURL() string {
return fmt.Sprintf("http://localhost:%d/v3/paths/list", s.apiPort)
}
// shortID returns the first 12 characters of the given container ID.
func shortID(id string) string {
if len(id) < 12 {

View File

@ -31,6 +31,7 @@ type ingressStreamState struct {
listeners int
}
// TODO: handle pagination
func fetchIngressState(apiURL string, httpClient httpClient) (state ingressStreamState, _ error) {
req, err := http.NewRequest(http.MethodGet, apiURL, nil)
if err != nil {
@ -74,3 +75,44 @@ func fetchIngressState(apiURL string, httpClient httpClient) (state ingressStrea
return state, nil
}
type path struct {
Name string `json:"name"`
Tracks []string `json:"tracks"`
}
// TODO: handle pagination
func fetchTracks(apiURL string, httpClient httpClient) ([]string, error) {
req, err := http.NewRequest(http.MethodGet, apiURL, nil)
if err != nil {
return nil, fmt.Errorf("new request: %w", err)
}
httpResp, err := httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("do request: %w", err)
}
if httpResp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
}
respBody, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, fmt.Errorf("read body: %w", err)
}
var resp apiResponse[path]
if err = json.Unmarshal(respBody, &resp); err != nil {
return nil, fmt.Errorf("unmarshal: %w", err)
}
var tracks []string
for _, path := range resp.Items {
if path.Name == rtmpPath {
tracks = path.Tracks
}
}
return tracks, nil
}

View File

@ -13,7 +13,7 @@ import (
)
func TestFetchIngressState(t *testing.T) {
const URL = "http://localhost:8989/v3/rtmpconns/list"
const url = "http://localhost:8989/v3/rtmpconns/list"
testCases := []struct {
name string
@ -75,11 +75,11 @@ func TestFetchIngressState(t *testing.T) {
httpClient.
EXPECT().
Do(mock.MatchedBy(func(req *http.Request) bool {
return req.URL.String() == URL && req.Method == http.MethodGet
return req.URL.String() == url && req.Method == http.MethodGet
})).
Return(tc.httpResponse, tc.httpError)
state, err := fetchIngressState(URL, &httpClient)
state, err := fetchIngressState(url, &httpClient)
if tc.wantErr != nil {
require.EqualError(t, err, tc.wantErr.Error())
} else {
@ -89,3 +89,65 @@ func TestFetchIngressState(t *testing.T) {
})
}
}
func TestFetchTracks(t *testing.T) {
const url = "http://localhost:8989/v3/paths/list"
testCases := []struct {
name string
httpResponse *http.Response
httpError error
wantTracks []string
wantErr error
}{
{
name: "non-200 status",
httpResponse: &http.Response{StatusCode: http.StatusNotFound},
wantErr: errors.New("unexpected status code: 404"),
},
{
name: "unparseable response",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte("invalid json"))),
},
wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"),
},
{
name: "successful response, no tracks",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":[],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))),
},
wantTracks: []string{},
},
{
name: "successful response, tracks",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":["H264","MPEG-4 Audio"],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))),
},
wantTracks: []string{"H264", "MPEG-4 Audio"},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var httpClient mocks.HTTPClient
httpClient.
EXPECT().
Do(mock.MatchedBy(func(req *http.Request) bool {
return req.URL.String() == url && req.Method == http.MethodGet
})).
Return(tc.httpResponse, tc.httpError)
tracks, err := fetchTracks(url, &httpClient)
if tc.wantErr != nil {
require.EqualError(t, err, tc.wantErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, tc.wantTracks, tracks)
}
})
}
}

View File

@ -36,6 +36,7 @@ func TestIntegrationMediaServerStartStop(t *testing.T) {
Logger: logger,
})
require.NoError(t, err)
t.Cleanup(func() { mediaServer.Close() })
testhelpers.ChanDiscard(mediaServer.C())
require.Eventually(
@ -66,6 +67,17 @@ func TestIntegrationMediaServerStartStop(t *testing.T) {
"actor not healthy and/or in LIVE state",
)
require.Eventually(
t,
func() bool {
currState := mediaServer.State()
return len(currState.Tracks) == 1 && currState.Tracks[0] == "H264"
},
time.Second*5,
time.Second,
"tracks not updated",
)
require.Eventually(
t,
func() bool {
@ -74,7 +86,7 @@ func TestIntegrationMediaServerStartStop(t *testing.T) {
},
time.Second*10,
time.Second,
"actor not healthy and/or in LIVE state",
"rxRate not updated",
)
mediaServer.Close()

View File

@ -87,7 +87,7 @@ func (a *Actor) ToggleDestination(url string) {
if err := a.containerClient.RemoveContainers(a.ctx, labels); err != nil {
// TODO: error handling
a.logger.Error("Failed to stop live stream", "url", url, "error", err)
a.logger.Error("Failed to stop live stream", "url", url, "err", err)
}
delete(a.currURLs, url)
@ -156,7 +156,7 @@ func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, er
case err := <-errC:
// TODO: error handling
if err != nil {
a.logger.Error("Error from container client", "error", err)
a.logger.Error("Error from container client", "err", err)
}
return
}

View File

@ -5,6 +5,7 @@ import (
"context"
"fmt"
"log/slog"
"strings"
"git.netflux.io/rob/termstream/domain"
"github.com/gdamore/tcell/v2"
@ -14,6 +15,7 @@ import (
type sourceViews struct {
url *tview.TextView
status *tview.TextView
tracks *tview.TextView
health *tview.TextView
cpu *tview.TextView
mem *tview.TextView
@ -57,13 +59,13 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) {
sourceView.SetDirection(tview.FlexColumn)
sourceView.SetBorder(true)
sourceView.SetTitle("Ingress RTMP server")
sidebar.AddItem(sourceView, 8, 0, false)
sidebar.AddItem(sourceView, 9, 0, false)
leftCol := tview.NewFlex()
leftCol.SetDirection(tview.FlexRow)
rightCol := tview.NewFlex()
rightCol.SetDirection(tview.FlexRow)
sourceView.AddItem(leftCol, 8, 0, false)
sourceView.AddItem(leftCol, 9, 0, false)
sourceView.AddItem(rightCol, 0, 1, false)
urlHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerURL)
@ -76,6 +78,11 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) {
statusTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash)
rightCol.AddItem(statusTextView, 1, 0, false)
tracksHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerTracks)
leftCol.AddItem(tracksHeaderTextView, 1, 0, false)
tracksTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash)
rightCol.AddItem(tracksTextView, 1, 0, false)
healthHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerHealth)
leftCol.AddItem(healthHeaderTextView, 1, 0, false)
healthTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash)
@ -139,6 +146,7 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) {
sourceViews: sourceViews{
url: urlTextView,
status: statusTextView,
tracks: tracksTextView,
health: healthTextView,
cpu: cpuTextView,
mem: memTextView,
@ -244,6 +252,7 @@ const (
headerRx = "Rx Kbps"
headerTx = "Tx Kbps"
headerAction = "Action"
headerTracks = "Tracks"
)
func (a *Actor) redrawFromState(state domain.AppState) {
@ -256,6 +265,13 @@ func (a *Actor) redrawFromState(state domain.AppState) {
}
a.sourceViews.url.SetText(state.Source.RTMPURL)
tracks := dash
if state.Source.Live && len(state.Source.Tracks) > 0 {
tracks = strings.Join(state.Source.Tracks, ", ")
}
a.sourceViews.tracks.SetText(tracks)
if state.Source.Live {
a.sourceViews.status.SetText("[black:green]receiving")
} else if state.Source.Container.State == "running" && state.Source.Container.HealthState == "healthy" {