refactor(mediaserver): simplify API interactions
Some checks are pending
ci-build / lint (push) Waiting to run
ci-build / build (push) Blocked by required conditions
ci-build / release (push) Blocked by required conditions
ci-scan / Analyze (go) (push) Waiting to run
ci-scan / Analyze (actions) (push) Waiting to run

This commit is contained in:
Rob Watson 2025-04-18 09:51:50 +02:00
parent c022c18a7f
commit e49bbb6800
4 changed files with 80 additions and 268 deletions

View File

@ -34,7 +34,6 @@ type Source struct {
Container Container
Live bool
LiveChangedAt time.Time
Listeners int
Tracks []string
RTMPURL string
ExitReason string

View File

@ -26,7 +26,7 @@ import (
type StreamKey string
const (
defaultFetchIngressStateInterval = 5 * time.Second // default interval to fetch the state of the media server
defaultUpdateStateInterval = 5 * time.Second // default interval to update the state of the media server
defaultAPIPort = 9997 // default API host port for the media server
defaultRTMPIP = "127.0.0.1" // default RTMP host IP, bound to localhost for security
defaultRTMPPort = 1935 // default RTMP host port for the media server
@ -51,7 +51,7 @@ type Actor struct {
rtmpAddr domain.NetAddr
rtmpHost string
streamKey StreamKey
fetchIngressStateInterval time.Duration
updateStateInterval time.Duration
pass string // password for the media server
tlsCert, tlsKey []byte // TLS cert and key for the media server
logger *slog.Logger
@ -69,7 +69,7 @@ type NewActorParams struct {
RTMPHost string // defaults to "localhost"
StreamKey StreamKey // defaults to "live"
ChanSize int // defaults to 64
FetchIngressStateInterval time.Duration // defaults to 5 seconds
UpdateStateInterval time.Duration // defaults to 5 seconds
ContainerClient *container.Client
Logger *slog.Logger
}
@ -97,7 +97,7 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error)
rtmpAddr: rtmpAddr,
rtmpHost: cmp.Or(params.RTMPHost, defaultRTMPHost),
streamKey: cmp.Or(params.StreamKey, defaultStreamKey),
fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval),
updateStateInterval: cmp.Or(params.UpdateStateInterval, defaultUpdateStateInterval),
tlsCert: tlsCert,
tlsKey: tlsKey,
pass: generatePassword(),
@ -255,16 +255,8 @@ func (s *Actor) Close() error {
// actorLoop is the main loop of the media server actor. It exits when the
// actor is closed, or the parent context is cancelled.
func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Container, errC <-chan error) {
fetchStateT := time.NewTicker(s.fetchIngressStateInterval)
defer fetchStateT.Stop()
// fetchTracksT is used to signal that tracks should be fetched from the
// media server, after the stream goes on-air. A short delay is needed due to
// workaround a race condition in the media server.
var fetchTracksT *time.Timer
resetFetchTracksT := func(d time.Duration) { fetchTracksT = time.NewTimer(d) }
resetFetchTracksT(time.Second)
fetchTracksT.Stop()
updateStateT := time.NewTicker(s.updateStateInterval)
defer updateStateT.Stop()
sendState := func() { s.stateC <- *s.state }
@ -274,7 +266,7 @@ func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Con
s.state.Container = containerState
if s.state.Container.Status == domain.ContainerStatusExited {
fetchStateT.Stop()
updateStateT.Stop()
s.handleContainerExit(nil)
}
@ -293,43 +285,21 @@ func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Con
s.logger.Error("Error from container client", "err", err, "id", shortID(s.state.Container.ID))
}
fetchStateT.Stop()
updateStateT.Stop()
s.handleContainerExit(err)
sendState()
case <-fetchStateT.C:
ingressState, err := fetchIngressState(s.rtmpConnsURL(), s.streamKey, s.apiClient)
case <-updateStateT.C:
path, err := fetchPath(s.pathURL(string(s.streamKey)), s.apiClient)
if err != nil {
s.logger.Error("Error fetching server state", "err", err)
s.logger.Error("Error fetching path", "err", err)
continue
}
var shouldSendState bool
if ingressState.ready != s.state.Live {
s.state.Live = ingressState.ready
if path.Ready != s.state.Live {
s.state.Live = path.Ready
s.state.LiveChangedAt = time.Now()
resetFetchTracksT(time.Second)
shouldSendState = true
}
if ingressState.listeners != s.state.Listeners {
s.state.Listeners = ingressState.listeners
shouldSendState = true
}
if shouldSendState {
sendState()
}
case <-fetchTracksT.C:
if !s.state.Live {
continue
}
if tracks, err := fetchTracks(s.pathsURL(), s.streamKey, s.apiClient); err != nil {
s.logger.Error("Error fetching tracks", "err", err)
resetFetchTracksT(3 * time.Second)
} else if len(tracks) == 0 {
resetFetchTracksT(time.Second)
} else {
s.state.Tracks = tracks
s.state.Tracks = path.Tracks
sendState()
}
case action, ok := <-s.actorC:
@ -369,15 +339,9 @@ func (s *Actor) RTMPInternalURL() string {
return fmt.Sprintf("rtmp://mediaserver:1935/%s?user=api&pass=%s", s.streamKey, s.pass)
}
// rtmpConnsURL returns the URL for fetching RTMP connections, accessible from
// the host.
func (s *Actor) rtmpConnsURL() string {
return fmt.Sprintf("https://api:%s@localhost:%d/v3/rtmpconns/list", s.pass, s.apiPort)
}
// pathsURL returns the URL for fetching paths, accessible from the host.
func (s *Actor) pathsURL() string {
return fmt.Sprintf("https://api:%s@localhost:%d/v3/paths/list", s.pass, s.apiPort)
// pathURL returns the URL for fetching a path, accessible from the host.
func (s *Actor) pathURL(path string) string {
return fmt.Sprintf("https://api:%s@localhost:%d/v3/paths/get/%s", s.pass, s.apiPort, path)
}
// healthCheckURL returns the URL for the health check, accessible from the

View File

@ -8,7 +8,6 @@ import (
"fmt"
"io"
"net/http"
"time"
)
type httpClient interface {
@ -44,109 +43,37 @@ func buildAPIClient(certPEM []byte) (*http.Client, error) {
const userAgent = "octoplex-client"
type apiResponse[T any] struct {
Items []T `json:"items"`
}
type rtmpConnsResponse struct {
ID string `json:"id"`
CreatedAt time.Time `json:"created"`
State string `json:"state"`
Path string `json:"path"`
BytesReceived int64 `json:"bytesReceived"`
BytesSent int64 `json:"bytesSent"`
RemoteAddr string `json:"remoteAddr"`
}
type ingressStreamState struct {
ready bool
listeners int
}
// TODO: handle pagination
func fetchIngressState(apiURL string, streamKey StreamKey, httpClient httpClient) (state ingressStreamState, _ error) {
req, err := http.NewRequest(http.MethodGet, apiURL, nil)
if err != nil {
return state, fmt.Errorf("new request: %w", err)
}
req.Header.Set("User-Agent", userAgent)
httpResp, err := httpClient.Do(req)
if err != nil {
return state, fmt.Errorf("do request: %w", err)
}
if httpResp.StatusCode != http.StatusOK {
return state, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
}
respBody, err := io.ReadAll(httpResp.Body)
if err != nil {
return state, fmt.Errorf("read body: %w", err)
}
var resp apiResponse[rtmpConnsResponse]
if err = json.Unmarshal(respBody, &resp); err != nil {
return state, fmt.Errorf("unmarshal: %w", err)
}
for _, conn := range resp.Items {
if conn.Path != string(streamKey) {
continue
}
switch conn.State {
case "publish":
// mediamtx may report a stream as being in publish state via the API,
// but still refuse to serve them due to being unpublished. This seems to
// be a bug, this is a hacky workaround.
state.ready = conn.BytesReceived > 20_000
case "read":
state.listeners++
}
}
return state, nil
}
type path struct {
type apiPath struct {
Name string `json:"name"`
Ready bool `json:"ready"`
Tracks []string `json:"tracks"`
}
// TODO: handle pagination
func fetchTracks(apiURL string, streamKey StreamKey, httpClient httpClient) ([]string, error) {
func fetchPath(apiURL string, httpClient httpClient) (apiPath, error) {
req, err := http.NewRequest(http.MethodGet, apiURL, nil)
if err != nil {
return nil, fmt.Errorf("new request: %w", err)
return apiPath{}, fmt.Errorf("new request: %w", err)
}
req.Header.Set("User-Agent", userAgent)
httpResp, err := httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("do request: %w", err)
return apiPath{}, fmt.Errorf("do request: %w", err)
}
if httpResp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
return apiPath{}, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
}
respBody, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, fmt.Errorf("read body: %w", err)
return apiPath{}, fmt.Errorf("read body: %w", err)
}
var resp apiResponse[path]
if err = json.Unmarshal(respBody, &resp); err != nil {
return nil, fmt.Errorf("unmarshal: %w", err)
var path apiPath
if err = json.Unmarshal(respBody, &path); err != nil {
return apiPath{}, fmt.Errorf("unmarshal: %w", err)
}
var tracks []string
for _, path := range resp.Items {
if path.Name == string(streamKey) {
tracks = path.Tracks
}
}
return tracks, nil
return path, nil
}

View File

@ -12,14 +12,14 @@ import (
"github.com/stretchr/testify/require"
)
func TestFetchIngressState(t *testing.T) {
const url = "http://localhost:8989/v3/rtmpconns/list"
func TestFetchPath(t *testing.T) {
const url = "http://localhost:8989/v3/paths/get/live"
testCases := []struct {
name string
httpResponse *http.Response
httpError error
wantState ingressStreamState
wantPath apiPath
wantErr error
}{
{
@ -36,36 +36,20 @@ func TestFetchIngressState(t *testing.T) {
wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"),
},
{
name: "successful response, no streams",
name: "successful response, not ready",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":0,"pageCount":0,"items":[]}`))),
Body: io.NopCloser(bytes.NewReader([]byte(`{"name":"live","confName":"live","source":null,"ready":false,"readyTime":null,"tracks":[],"bytesReceived":0,"bytesSent":0,"readers":[]}`))),
},
wantState: ingressStreamState{ready: false, listeners: 0},
},
{
name: "successful response, not yet ready",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"id":"d2953cf8-9cd6-4c30-816f-807b80b6a71f","created":"2025-02-15T08:19:00.616220354Z","remoteAddr":"172.17.0.1:32972","state":"publish","path":"live","query":"","bytesReceived":15462,"bytesSent":3467}]}`))),
},
wantState: ingressStreamState{ready: false, listeners: 0},
wantPath: apiPath{Name: "live", Ready: false, Tracks: []string{}},
},
{
name: "successful response, ready",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"id":"d2953cf8-9cd6-4c30-816f-807b80b6a71f","created":"2025-02-15T08:19:00.616220354Z","remoteAddr":"172.17.0.1:32972","state":"publish","path":"live","query":"","bytesReceived":27832,"bytesSent":3467}]}`))),
Body: io.NopCloser(bytes.NewReader([]byte(`{"name":"live","confName":"live","source":{"type":"rtmpConn","id":"fd2d79a8-bab9-4141-a1b5-55bd1a8649df"},"ready":true,"readyTime":"2025-04-18T07:44:53.683627506Z","tracks":["H264"],"bytesReceived":254677,"bytesSent":0,"readers":[]}`))),
},
wantState: ingressStreamState{ready: true, listeners: 0},
},
{
name: "successful response, ready, with listeners",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":2,"pageCount":1,"items":[{"id":"12668315-0572-41f1-8384-fe7047cc73be","created":"2025-02-15T08:23:43.836589664Z","remoteAddr":"172.17.0.1:40026","state":"publish","path":"live","query":"","bytesReceived":7180753,"bytesSent":3467},{"id":"079370fd-43bb-4798-b079-860cc3159e4e","created":"2025-02-15T08:24:32.396794364Z","remoteAddr":"192.168.48.3:44736","state":"read","path":"live","query":"","bytesReceived":333435,"bytesSent":24243}]}`))),
},
wantState: ingressStreamState{ready: true, listeners: 1},
wantPath: apiPath{Name: "live", Ready: true, Tracks: []string{"H264"}},
},
}
@ -79,74 +63,12 @@ func TestFetchIngressState(t *testing.T) {
})).
Return(tc.httpResponse, tc.httpError)
state, err := fetchIngressState(url, StreamKey("live"), &httpClient)
path, err := fetchPath(url, &httpClient)
if tc.wantErr != nil {
require.EqualError(t, err, tc.wantErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, tc.wantState, state)
}
})
}
}
func TestFetchTracks(t *testing.T) {
const url = "http://localhost:8989/v3/paths/list"
testCases := []struct {
name string
httpResponse *http.Response
httpError error
wantTracks []string
wantErr error
}{
{
name: "non-200 status",
httpResponse: &http.Response{StatusCode: http.StatusNotFound},
wantErr: errors.New("unexpected status code: 404"),
},
{
name: "unparseable response",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte("invalid json"))),
},
wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"),
},
{
name: "successful response, no tracks",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":[],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))),
},
wantTracks: []string{},
},
{
name: "successful response, tracks",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":["H264","MPEG-4 Audio"],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))),
},
wantTracks: []string{"H264", "MPEG-4 Audio"},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var httpClient mocks.HTTPClient
httpClient.
EXPECT().
Do(mock.MatchedBy(func(req *http.Request) bool {
return req.URL.String() == url && req.Method == http.MethodGet
})).
Return(tc.httpResponse, tc.httpError)
tracks, err := fetchTracks(url, StreamKey("live"), &httpClient)
if tc.wantErr != nil {
require.EqualError(t, err, tc.wantErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, tc.wantTracks, tracks)
require.Equal(t, tc.wantPath, path)
}
})
}