Compare commits

..

4 Commits

Author SHA1 Message Date
Rob Watson
e49bbb6800 refactor(mediaserver): simplify API interactions
Some checks are pending
ci-build / lint (push) Waiting to run
ci-build / build (push) Blocked by required conditions
ci-build / release (push) Blocked by required conditions
ci-scan / Analyze (go) (push) Waiting to run
ci-scan / Analyze (actions) (push) Waiting to run
2025-04-18 09:51:56 +02:00
Rob Watson
c022c18a7f doc: add CONTRIBUTING.md 2025-04-17 11:28:25 +02:00
Rob Watson
b147da6d9b feat(mediaserver): configurable RTMP host and bind address 2025-04-17 11:15:08 +02:00
Rob Watson
e113d55044 build: fix homebrew tap permissions 2025-04-17 07:29:02 +02:00
15 changed files with 235 additions and 281 deletions

View File

@ -87,3 +87,4 @@ jobs:
args: release --clean args: release --clean
env: env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
HOMEBREW_TOKEN: ${{ secrets.HOMEBREW_TAP_GITHUB_TOKEN }}

View File

@ -33,6 +33,7 @@ brews:
repository: repository:
owner: rfwatson owner: rfwatson
name: homebrew-octoplex name: homebrew-octoplex
token: "{{ .Env.HOMEBREW_TOKEN }}"
install: | install: |
bin.install "octoplex" bin.install "octoplex"
test: | test: |

43
CONTRIBUTING.md Normal file
View File

@ -0,0 +1,43 @@
# Contributing
Thanks for contributing to Octoplex!
## Development
### Mise
Octoplex uses [mise](https://mise.jdx.dev/installing-mise.html) as a task
runner and environment management tool.
Once installed, you can run common development tasks easily:
Command|Shortcut|Description
---|---|---
`mise run test`|`mise run t`|Run unit tests
`mise run test_integration`|`mise run ti`|Run integration tests
`mise run lint`|`mise run l`|Run linter
`mise run format`|`mise run f`|Run formatter
`mise run generate_mocks`|`mise run m`|Re-generate mocks
### Tests
#### Integration tests
The integration tests (mostly in `/internal/app/integration_test.go`) attempt
to exercise the entire app, including launching containers and rendering the
terminal output.
Sometimes they can be flaky. Always ensure there are no stale Docker containers
present from previous runs, and that nothing is listening or attempting to
broadcast to localhost:1935 or localhost:1936.
## Opening a pull request
Pull requests are welcome, but please propose significant changes in a
[discussion](https://github.com/rfwatson/octoplex/discussions) first.
1. Fork the repo
2. Make your changes, including test coverage
3. Push the changes to a branch
4. Ensure the branch is passing
5. Open a pull request

View File

@ -100,6 +100,10 @@ sources:
rtmp: rtmp:
enabled: true # must be true enabled: true # must be true
streamKey: live # defaults to "live" streamKey: live # defaults to "live"
host: rtmp.example.com # defaults to "localhost"
bindAddr: # optional
ip: 0.0.0.0 # defaults to 127.0.0.1
port: 1935 # defaults to 1935
destinations: destinations:
- name: YouTube # Destination name, used only for display - name: YouTube # Destination name, used only for display
url: rtmp://rtmp.youtube.com/12345 # Destination URL with stream key url: rtmp://rtmp.youtube.com/12345 # Destination URL with stream key
@ -108,9 +112,13 @@ destinations:
# other destinations here # other destinations here
``` ```
:warning: It is also possible to add and remove destinations directly from the :information_source: It is also possible to add and remove destinations directly from the
terminal user interface. terminal user interface.
:warning: `sources.rtmp.bindAddr.ip` must be set to a valid IP address if you want
to accept connections from other hosts. Leave it blank to bind only to
localhost (`127.0.0.1`) or use `0.0.0.0` to bind to all network interfaces.
## Contributing ## Contributing
### Bug reports ### Bug reports

View File

@ -89,6 +89,8 @@ func Run(ctx context.Context, params RunParams) error {
updateUI() updateUI()
srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{ srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{
RTMPAddr: domain.NetAddr(cfg.Sources.RTMP.BindAddr),
RTMPHost: cfg.Sources.RTMP.Host,
StreamKey: mediaserver.StreamKey(cfg.Sources.RTMP.StreamKey), StreamKey: mediaserver.StreamKey(cfg.Sources.RTMP.StreamKey),
ContainerClient: containerClient, ContainerClient: containerClient,
Logger: logger.With("component", "mediaserver"), Logger: logger.With("component", "mediaserver"),

View File

@ -30,12 +30,12 @@ import (
) )
func TestIntegration(t *testing.T) { func TestIntegration(t *testing.T) {
t.Run("with default stream key", func(t *testing.T) { t.Run("with default host, port and stream key", func(t *testing.T) {
testIntegration(t, "") testIntegration(t, "", "", 0, "")
}) })
t.Run("with custom stream key", func(t *testing.T) { t.Run("with custom host, port and stream key", func(t *testing.T) {
testIntegration(t, "s0meK3y") testIntegration(t, "localhost", "0.0.0.0", 3000, "s0meK3y")
}) })
} }
@ -45,11 +45,14 @@ func TestIntegration(t *testing.T) {
// https://stackoverflow.com/a/60740997/62871 // https://stackoverflow.com/a/60740997/62871
const hostIP = "172.17.0.1" const hostIP = "172.17.0.1"
func testIntegration(t *testing.T, streamKey string) { func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int, streamKey string) {
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
defer cancel() defer cancel()
wantRTMPHost := cmp.Or(rtmpHost, "localhost")
wantRTMPPort := cmp.Or(rtmpPort, 1935)
wantStreamKey := cmp.Or(streamKey, "live") wantStreamKey := cmp.Or(streamKey, "live")
wantRTMPURL := fmt.Sprintf("rtmp://%s:%d/%s", wantRTMPHost, wantRTMPPort, wantStreamKey)
destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{
@ -74,7 +77,13 @@ func testIntegration(t *testing.T, streamKey string) {
destURL1 := fmt.Sprintf("rtmp://%s:%d/%s/dest1", hostIP, destServerPort.Int(), wantStreamKey) destURL1 := fmt.Sprintf("rtmp://%s:%d/%s/dest1", hostIP, destServerPort.Int(), wantStreamKey)
destURL2 := fmt.Sprintf("rtmp://%s:%d/%s/dest2", hostIP, destServerPort.Int(), wantStreamKey) destURL2 := fmt.Sprintf("rtmp://%s:%d/%s/dest2", hostIP, destServerPort.Int(), wantStreamKey)
configService := setupConfigService(t, config.Config{ configService := setupConfigService(t, config.Config{
Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true, StreamKey: streamKey}}, Sources: config.Sources{
RTMP: config.RTMPSource{
Enabled: true,
Host: rtmpHost,
BindAddr: config.NetAddr{IP: rtmpIP, Port: rtmpPort},
StreamKey: streamKey,
}},
// Load one destination from config, add the other in-app. // Load one destination from config, add the other in-app.
Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}}, Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}},
}) })
@ -116,7 +125,7 @@ func testIntegration(t *testing.T, streamKey string) {
printScreen(t, getContents, "After starting the mediaserver") printScreen(t, getContents, "After starting the mediaserver")
// Start streaming a test video to the app: // Start streaming a test video to the app:
testhelpers.StreamFLV(t, "rtmp://localhost:1935/"+wantStreamKey) testhelpers.StreamFLV(t, wantRTMPURL)
require.EventuallyWithT( require.EventuallyWithT(
t, t,
@ -124,7 +133,7 @@ func testIntegration(t *testing.T, streamKey string) {
contents := getContents() contents := getContents()
require.True(t, len(contents) > 4, "expected at least 5 lines of output") require.True(t, len(contents) > 4, "expected at least 5 lines of output")
assert.Contains(t, contents[1], "URL rtmp://localhost:1935/"+wantStreamKey, "expected mediaserver status to be receiving") assert.Contains(t, contents[1], "URL "+wantRTMPURL, "expected mediaserver status to be receiving")
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264") assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy") assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
@ -256,6 +265,48 @@ func testIntegration(t *testing.T, streamKey string) {
<-done <-done
} }
func TestIntegrationCustomRTMPURL(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
defer cancel()
logger := testhelpers.NewTestLogger(t).With("component", "integration")
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
require.NoError(t, err)
configService := setupConfigService(t, config.Config{
Sources: config.Sources{
RTMP: config.RTMPSource{
Enabled: true,
Host: "rtmp.live.tv",
},
},
})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
}()
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
assert.True(t, contentsIncludes(getContents(), "URL rtmp://rtmp.live.tv:1935/live"), "expected to see custom host name")
},
5*time.Second,
time.Second,
"expected to see custom host name",
)
printScreen(t, getContents, "Ater displaying the fatal error modal")
cancel()
<-done
}
func TestIntegrationRestartDestination(t *testing.T) { func TestIntegrationRestartDestination(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
defer cancel() defer cancel()

View File

@ -22,10 +22,18 @@ func (l LogFile) GetPath() string {
return cmp.Or(l.Path, l.defaultPath) return cmp.Or(l.Path, l.defaultPath)
} }
// NetAddr holds an IP and/or port.
type NetAddr struct {
IP string `yaml:"ip,omitempty"`
Port int `yaml:"port,omitempty"`
}
// RTMPSource holds the configuration for the RTMP source. // RTMPSource holds the configuration for the RTMP source.
type RTMPSource struct { type RTMPSource struct {
Enabled bool `yaml:"enabled"` Enabled bool `yaml:"enabled"`
StreamKey string `yaml:"streamKey,omitempty"` StreamKey string `yaml:"streamKey,omitempty"`
Host string `yaml:"host,omitempty"`
BindAddr NetAddr `yaml:"bindAddr,omitempty"`
} }
// Sources holds the configuration for the sources. // Sources holds the configuration for the sources.

View File

@ -93,6 +93,11 @@ func TestConfigServiceReadConfig(t *testing.T) {
RTMP: config.RTMPSource{ RTMP: config.RTMPSource{
Enabled: true, Enabled: true,
StreamKey: "s3cr3t", StreamKey: "s3cr3t",
Host: "rtmp.example.com",
BindAddr: config.NetAddr{
IP: "0.0.0.0",
Port: 19350,
},
}, },
}, },
Destinations: []config.Destination{ Destinations: []config.Destination{

View File

@ -6,6 +6,10 @@ sources:
rtmp: rtmp:
enabled: true enabled: true
streamKey: s3cr3t streamKey: s3cr3t
host: rtmp.example.com
bindAddr:
ip: 0.0.0.0
port: 19350
destinations: destinations:
- name: my stream - name: my stream
url: rtmp://rtmp.example.com:1935/live url: rtmp://rtmp.example.com:1935/live

View File

@ -34,7 +34,6 @@ type Source struct {
Container Container Container Container
Live bool Live bool
LiveChangedAt time.Time LiveChangedAt time.Time
Listeners int
Tracks []string Tracks []string
RTMPURL string RTMPURL string
ExitReason string ExitReason string
@ -57,6 +56,12 @@ type Destination struct {
URL string URL string
} }
// NetAddr holds a network address.
type NetAddr struct {
IP string
Port int
}
// Container status strings. // Container status strings.
// //
// TODO: refactor to strictly reflect Docker status strings. // TODO: refactor to strictly reflect Docker status strings.

View File

@ -8,7 +8,6 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"strconv"
"time" "time"
typescontainer "github.com/docker/docker/api/types/container" typescontainer "github.com/docker/docker/api/types/container"
@ -27,14 +26,16 @@ import (
type StreamKey string type StreamKey string
const ( const (
defaultFetchIngressStateInterval = 5 * time.Second // default interval to fetch the state of the media server defaultUpdateStateInterval = 5 * time.Second // default interval to update the state of the media server
defaultAPIPort = 9997 // default API host port for the media server defaultAPIPort = 9997 // default API host port for the media server
defaultRTMPPort = 1935 // default RTMP host port for the media server defaultRTMPIP = "127.0.0.1" // default RTMP host IP, bound to localhost for security
defaultChanSize = 64 // default channel size for asynchronous non-error channels defaultRTMPPort = 1935 // default RTMP host port for the media server
imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx defaultRTMPHost = "localhost" // default RTMP host name, used for the RTMP URL
defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey]. defaultChanSize = 64 // default channel size for asynchronous non-error channels
componentName = "mediaserver" // component name, mostly used for Docker labels imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx
httpClientTimeout = time.Second // timeout for outgoing HTTP client requests defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey].
componentName = "mediaserver" // component name, mostly used for Docker labels
httpClientTimeout = time.Second // timeout for outgoing HTTP client requests
) )
// action is an action to be performed by the actor. // action is an action to be performed by the actor.
@ -42,18 +43,19 @@ type action func()
// Actor is responsible for managing the media server. // Actor is responsible for managing the media server.
type Actor struct { type Actor struct {
actorC chan action actorC chan action
stateC chan domain.Source stateC chan domain.Source
chanSize int chanSize int
containerClient *container.Client containerClient *container.Client
apiPort int apiPort int
rtmpPort int rtmpAddr domain.NetAddr
streamKey StreamKey rtmpHost string
fetchIngressStateInterval time.Duration streamKey StreamKey
pass string // password for the media server updateStateInterval time.Duration
tlsCert, tlsKey []byte // TLS cert and key for the media server pass string // password for the media server
logger *slog.Logger tlsCert, tlsKey []byte // TLS cert and key for the media server
apiClient *http.Client logger *slog.Logger
apiClient *http.Client
// mutable state // mutable state
state *domain.Source state *domain.Source
@ -62,13 +64,14 @@ type Actor struct {
// NewActorParams contains the parameters for building a new media server // NewActorParams contains the parameters for building a new media server
// actor. // actor.
type NewActorParams struct { type NewActorParams struct {
APIPort int // defaults to 9997 APIPort int // defaults to 9997
RTMPPort int // defaults to 1935 RTMPAddr domain.NetAddr // defaults to 127.0.0.1:1935
StreamKey StreamKey // defaults to "live" RTMPHost string // defaults to "localhost"
ChanSize int // defaults to 64 StreamKey StreamKey // defaults to "live"
FetchIngressStateInterval time.Duration // defaults to 5 seconds ChanSize int // defaults to 64
ContainerClient *container.Client UpdateStateInterval time.Duration // defaults to 5 seconds
Logger *slog.Logger ContainerClient *container.Client
Logger *slog.Logger
} }
// NewActor creates a new media server actor. // NewActor creates a new media server actor.
@ -84,30 +87,33 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error)
return nil, fmt.Errorf("build API client: %w", err) return nil, fmt.Errorf("build API client: %w", err)
} }
rtmpAddr := params.RTMPAddr
rtmpAddr.IP = cmp.Or(rtmpAddr.IP, defaultRTMPIP)
rtmpAddr.Port = cmp.Or(rtmpAddr.Port, defaultRTMPPort)
chanSize := cmp.Or(params.ChanSize, defaultChanSize) chanSize := cmp.Or(params.ChanSize, defaultChanSize)
return &Actor{ return &Actor{
apiPort: cmp.Or(params.APIPort, defaultAPIPort), apiPort: cmp.Or(params.APIPort, defaultAPIPort),
rtmpPort: cmp.Or(params.RTMPPort, defaultRTMPPort), rtmpAddr: rtmpAddr,
streamKey: cmp.Or(params.StreamKey, defaultStreamKey), rtmpHost: cmp.Or(params.RTMPHost, defaultRTMPHost),
fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval), streamKey: cmp.Or(params.StreamKey, defaultStreamKey),
tlsCert: tlsCert, updateStateInterval: cmp.Or(params.UpdateStateInterval, defaultUpdateStateInterval),
tlsKey: tlsKey, tlsCert: tlsCert,
pass: generatePassword(), tlsKey: tlsKey,
actorC: make(chan action, chanSize), pass: generatePassword(),
state: new(domain.Source), actorC: make(chan action, chanSize),
stateC: make(chan domain.Source, chanSize), state: new(domain.Source),
chanSize: chanSize, stateC: make(chan domain.Source, chanSize),
containerClient: params.ContainerClient, chanSize: chanSize,
logger: params.Logger, containerClient: params.ContainerClient,
apiClient: apiClient, logger: params.Logger,
apiClient: apiClient,
}, nil }, nil
} }
func (a *Actor) Start(ctx context.Context) error { func (a *Actor) Start(ctx context.Context) error {
// Exposed ports are bound to 127.0.0.1 for security. apiPortSpec := nat.Port(fmt.Sprintf("127.0.0.1:%d:9997", a.apiPort))
// TODO: configurable RTMP bind address rtmpPortSpec := nat.Port(fmt.Sprintf("%s:%d:%d", a.rtmpAddr.IP, a.rtmpAddr.Port, 1935))
apiPortSpec := nat.Port("127.0.0.1:" + strconv.Itoa(a.apiPort) + ":9997")
rtmpPortSpec := nat.Port("127.0.0.1:" + strconv.Itoa(+a.rtmpPort) + ":1935")
exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)}) exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)})
// The RTMP URL is passed to the UI via the state. // The RTMP URL is passed to the UI via the state.
@ -155,6 +161,7 @@ func (a *Actor) Start(ctx context.Context) error {
return fmt.Errorf("marshal config: %w", err) return fmt.Errorf("marshal config: %w", err)
} }
a.logger.Info("Starting media server", "host", a.rtmpHost, "bind_ip", a.rtmpAddr.IP, "bind_port", a.rtmpAddr.Port)
containerStateC, errC := a.containerClient.RunContainer( containerStateC, errC := a.containerClient.RunContainer(
ctx, ctx,
container.RunContainerParams{ container.RunContainerParams{
@ -248,16 +255,8 @@ func (s *Actor) Close() error {
// actorLoop is the main loop of the media server actor. It exits when the // actorLoop is the main loop of the media server actor. It exits when the
// actor is closed, or the parent context is cancelled. // actor is closed, or the parent context is cancelled.
func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Container, errC <-chan error) { func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Container, errC <-chan error) {
fetchStateT := time.NewTicker(s.fetchIngressStateInterval) updateStateT := time.NewTicker(s.updateStateInterval)
defer fetchStateT.Stop() defer updateStateT.Stop()
// fetchTracksT is used to signal that tracks should be fetched from the
// media server, after the stream goes on-air. A short delay is needed due to
// workaround a race condition in the media server.
var fetchTracksT *time.Timer
resetFetchTracksT := func(d time.Duration) { fetchTracksT = time.NewTimer(d) }
resetFetchTracksT(time.Second)
fetchTracksT.Stop()
sendState := func() { s.stateC <- *s.state } sendState := func() { s.stateC <- *s.state }
@ -267,7 +266,7 @@ func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Con
s.state.Container = containerState s.state.Container = containerState
if s.state.Container.Status == domain.ContainerStatusExited { if s.state.Container.Status == domain.ContainerStatusExited {
fetchStateT.Stop() updateStateT.Stop()
s.handleContainerExit(nil) s.handleContainerExit(nil)
} }
@ -286,43 +285,21 @@ func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Con
s.logger.Error("Error from container client", "err", err, "id", shortID(s.state.Container.ID)) s.logger.Error("Error from container client", "err", err, "id", shortID(s.state.Container.ID))
} }
fetchStateT.Stop() updateStateT.Stop()
s.handleContainerExit(err) s.handleContainerExit(err)
sendState() sendState()
case <-fetchStateT.C: case <-updateStateT.C:
ingressState, err := fetchIngressState(s.rtmpConnsURL(), s.streamKey, s.apiClient) path, err := fetchPath(s.pathURL(string(s.streamKey)), s.apiClient)
if err != nil { if err != nil {
s.logger.Error("Error fetching server state", "err", err) s.logger.Error("Error fetching path", "err", err)
continue continue
} }
var shouldSendState bool if path.Ready != s.state.Live {
if ingressState.ready != s.state.Live { s.state.Live = path.Ready
s.state.Live = ingressState.ready
s.state.LiveChangedAt = time.Now() s.state.LiveChangedAt = time.Now()
resetFetchTracksT(time.Second) s.state.Tracks = path.Tracks
shouldSendState = true
}
if ingressState.listeners != s.state.Listeners {
s.state.Listeners = ingressState.listeners
shouldSendState = true
}
if shouldSendState {
sendState()
}
case <-fetchTracksT.C:
if !s.state.Live {
continue
}
if tracks, err := fetchTracks(s.pathsURL(), s.streamKey, s.apiClient); err != nil {
s.logger.Error("Error fetching tracks", "err", err)
resetFetchTracksT(3 * time.Second)
} else if len(tracks) == 0 {
resetFetchTracksT(time.Second)
} else {
s.state.Tracks = tracks
sendState() sendState()
} }
case action, ok := <-s.actorC: case action, ok := <-s.actorC:
@ -352,7 +329,7 @@ func (s *Actor) handleContainerExit(err error) {
// RTMPURL returns the RTMP URL for the media server, accessible from the host. // RTMPURL returns the RTMP URL for the media server, accessible from the host.
func (s *Actor) RTMPURL() string { func (s *Actor) RTMPURL() string {
return fmt.Sprintf("rtmp://localhost:%d/%s", s.rtmpPort, s.streamKey) return fmt.Sprintf("rtmp://%s:%d/%s", s.rtmpHost, s.rtmpAddr.Port, s.streamKey)
} }
// RTMPInternalURL returns the RTMP URL for the media server, accessible from // RTMPInternalURL returns the RTMP URL for the media server, accessible from
@ -362,15 +339,9 @@ func (s *Actor) RTMPInternalURL() string {
return fmt.Sprintf("rtmp://mediaserver:1935/%s?user=api&pass=%s", s.streamKey, s.pass) return fmt.Sprintf("rtmp://mediaserver:1935/%s?user=api&pass=%s", s.streamKey, s.pass)
} }
// rtmpConnsURL returns the URL for fetching RTMP connections, accessible from // pathURL returns the URL for fetching a path, accessible from the host.
// the host. func (s *Actor) pathURL(path string) string {
func (s *Actor) rtmpConnsURL() string { return fmt.Sprintf("https://api:%s@localhost:%d/v3/paths/get/%s", s.pass, s.apiPort, path)
return fmt.Sprintf("https://api:%s@localhost:%d/v3/rtmpconns/list", s.pass, s.apiPort)
}
// pathsURL returns the URL for fetching paths, accessible from the host.
func (s *Actor) pathsURL() string {
return fmt.Sprintf("https://api:%s@localhost:%d/v3/paths/list", s.pass, s.apiPort)
} }
// healthCheckURL returns the URL for the health check, accessible from the // healthCheckURL returns the URL for the health check, accessible from the

View File

@ -8,7 +8,6 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"time"
) )
type httpClient interface { type httpClient interface {
@ -44,109 +43,37 @@ func buildAPIClient(certPEM []byte) (*http.Client, error) {
const userAgent = "octoplex-client" const userAgent = "octoplex-client"
type apiResponse[T any] struct { type apiPath struct {
Items []T `json:"items"`
}
type rtmpConnsResponse struct {
ID string `json:"id"`
CreatedAt time.Time `json:"created"`
State string `json:"state"`
Path string `json:"path"`
BytesReceived int64 `json:"bytesReceived"`
BytesSent int64 `json:"bytesSent"`
RemoteAddr string `json:"remoteAddr"`
}
type ingressStreamState struct {
ready bool
listeners int
}
// TODO: handle pagination
func fetchIngressState(apiURL string, streamKey StreamKey, httpClient httpClient) (state ingressStreamState, _ error) {
req, err := http.NewRequest(http.MethodGet, apiURL, nil)
if err != nil {
return state, fmt.Errorf("new request: %w", err)
}
req.Header.Set("User-Agent", userAgent)
httpResp, err := httpClient.Do(req)
if err != nil {
return state, fmt.Errorf("do request: %w", err)
}
if httpResp.StatusCode != http.StatusOK {
return state, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
}
respBody, err := io.ReadAll(httpResp.Body)
if err != nil {
return state, fmt.Errorf("read body: %w", err)
}
var resp apiResponse[rtmpConnsResponse]
if err = json.Unmarshal(respBody, &resp); err != nil {
return state, fmt.Errorf("unmarshal: %w", err)
}
for _, conn := range resp.Items {
if conn.Path != string(streamKey) {
continue
}
switch conn.State {
case "publish":
// mediamtx may report a stream as being in publish state via the API,
// but still refuse to serve them due to being unpublished. This seems to
// be a bug, this is a hacky workaround.
state.ready = conn.BytesReceived > 20_000
case "read":
state.listeners++
}
}
return state, nil
}
type path struct {
Name string `json:"name"` Name string `json:"name"`
Ready bool `json:"ready"`
Tracks []string `json:"tracks"` Tracks []string `json:"tracks"`
} }
// TODO: handle pagination func fetchPath(apiURL string, httpClient httpClient) (apiPath, error) {
func fetchTracks(apiURL string, streamKey StreamKey, httpClient httpClient) ([]string, error) {
req, err := http.NewRequest(http.MethodGet, apiURL, nil) req, err := http.NewRequest(http.MethodGet, apiURL, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("new request: %w", err) return apiPath{}, fmt.Errorf("new request: %w", err)
} }
req.Header.Set("User-Agent", userAgent) req.Header.Set("User-Agent", userAgent)
httpResp, err := httpClient.Do(req) httpResp, err := httpClient.Do(req)
if err != nil { if err != nil {
return nil, fmt.Errorf("do request: %w", err) return apiPath{}, fmt.Errorf("do request: %w", err)
} }
if httpResp.StatusCode != http.StatusOK { if httpResp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode) return apiPath{}, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode)
} }
respBody, err := io.ReadAll(httpResp.Body) respBody, err := io.ReadAll(httpResp.Body)
if err != nil { if err != nil {
return nil, fmt.Errorf("read body: %w", err) return apiPath{}, fmt.Errorf("read body: %w", err)
} }
var resp apiResponse[path] var path apiPath
if err = json.Unmarshal(respBody, &resp); err != nil { if err = json.Unmarshal(respBody, &path); err != nil {
return nil, fmt.Errorf("unmarshal: %w", err) return apiPath{}, fmt.Errorf("unmarshal: %w", err)
} }
var tracks []string return path, nil
for _, path := range resp.Items {
if path.Name == string(streamKey) {
tracks = path.Tracks
}
}
return tracks, nil
} }

View File

@ -12,14 +12,14 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestFetchIngressState(t *testing.T) { func TestFetchPath(t *testing.T) {
const url = "http://localhost:8989/v3/rtmpconns/list" const url = "http://localhost:8989/v3/paths/get/live"
testCases := []struct { testCases := []struct {
name string name string
httpResponse *http.Response httpResponse *http.Response
httpError error httpError error
wantState ingressStreamState wantPath apiPath
wantErr error wantErr error
}{ }{
{ {
@ -36,36 +36,20 @@ func TestFetchIngressState(t *testing.T) {
wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"), wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"),
}, },
{ {
name: "successful response, no streams", name: "successful response, not ready",
httpResponse: &http.Response{ httpResponse: &http.Response{
StatusCode: http.StatusOK, StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":0,"pageCount":0,"items":[]}`))), Body: io.NopCloser(bytes.NewReader([]byte(`{"name":"live","confName":"live","source":null,"ready":false,"readyTime":null,"tracks":[],"bytesReceived":0,"bytesSent":0,"readers":[]}`))),
}, },
wantState: ingressStreamState{ready: false, listeners: 0}, wantPath: apiPath{Name: "live", Ready: false, Tracks: []string{}},
},
{
name: "successful response, not yet ready",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"id":"d2953cf8-9cd6-4c30-816f-807b80b6a71f","created":"2025-02-15T08:19:00.616220354Z","remoteAddr":"172.17.0.1:32972","state":"publish","path":"live","query":"","bytesReceived":15462,"bytesSent":3467}]}`))),
},
wantState: ingressStreamState{ready: false, listeners: 0},
}, },
{ {
name: "successful response, ready", name: "successful response, ready",
httpResponse: &http.Response{ httpResponse: &http.Response{
StatusCode: http.StatusOK, StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"id":"d2953cf8-9cd6-4c30-816f-807b80b6a71f","created":"2025-02-15T08:19:00.616220354Z","remoteAddr":"172.17.0.1:32972","state":"publish","path":"live","query":"","bytesReceived":27832,"bytesSent":3467}]}`))), Body: io.NopCloser(bytes.NewReader([]byte(`{"name":"live","confName":"live","source":{"type":"rtmpConn","id":"fd2d79a8-bab9-4141-a1b5-55bd1a8649df"},"ready":true,"readyTime":"2025-04-18T07:44:53.683627506Z","tracks":["H264"],"bytesReceived":254677,"bytesSent":0,"readers":[]}`))),
}, },
wantState: ingressStreamState{ready: true, listeners: 0}, wantPath: apiPath{Name: "live", Ready: true, Tracks: []string{"H264"}},
},
{
name: "successful response, ready, with listeners",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":2,"pageCount":1,"items":[{"id":"12668315-0572-41f1-8384-fe7047cc73be","created":"2025-02-15T08:23:43.836589664Z","remoteAddr":"172.17.0.1:40026","state":"publish","path":"live","query":"","bytesReceived":7180753,"bytesSent":3467},{"id":"079370fd-43bb-4798-b079-860cc3159e4e","created":"2025-02-15T08:24:32.396794364Z","remoteAddr":"192.168.48.3:44736","state":"read","path":"live","query":"","bytesReceived":333435,"bytesSent":24243}]}`))),
},
wantState: ingressStreamState{ready: true, listeners: 1},
}, },
} }
@ -79,74 +63,12 @@ func TestFetchIngressState(t *testing.T) {
})). })).
Return(tc.httpResponse, tc.httpError) Return(tc.httpResponse, tc.httpError)
state, err := fetchIngressState(url, StreamKey("live"), &httpClient) path, err := fetchPath(url, &httpClient)
if tc.wantErr != nil { if tc.wantErr != nil {
require.EqualError(t, err, tc.wantErr.Error()) require.EqualError(t, err, tc.wantErr.Error())
} else { } else {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, tc.wantState, state) require.Equal(t, tc.wantPath, path)
}
})
}
}
func TestFetchTracks(t *testing.T) {
const url = "http://localhost:8989/v3/paths/list"
testCases := []struct {
name string
httpResponse *http.Response
httpError error
wantTracks []string
wantErr error
}{
{
name: "non-200 status",
httpResponse: &http.Response{StatusCode: http.StatusNotFound},
wantErr: errors.New("unexpected status code: 404"),
},
{
name: "unparseable response",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte("invalid json"))),
},
wantErr: errors.New("unmarshal: invalid character 'i' looking for beginning of value"),
},
{
name: "successful response, no tracks",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":[],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))),
},
wantTracks: []string{},
},
{
name: "successful response, tracks",
httpResponse: &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader([]byte(`{"itemCount":1,"pageCount":1,"items":[{"name":"live","confName":"all_others","source":{"type":"rtmpConn","id":"287340b2-04c2-4fcc-ab9c-089f4ff15aeb"},"ready":true,"readyTime":"2025-02-22T17:26:05.527206818Z","tracks":["H264","MPEG-4 Audio"],"bytesReceived":94430983,"bytesSent":0,"readers":[]}]}`))),
},
wantTracks: []string{"H264", "MPEG-4 Audio"},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var httpClient mocks.HTTPClient
httpClient.
EXPECT().
Do(mock.MatchedBy(func(req *http.Request) bool {
return req.URL.String() == url && req.Method == http.MethodGet
})).
Return(tc.httpResponse, tc.httpError)
tracks, err := fetchTracks(url, StreamKey("live"), &httpClient)
if tc.wantErr != nil {
require.EqualError(t, err, tc.wantErr.Error())
} else {
require.NoError(t, err)
require.Equal(t, tc.wantTracks, tracks)
} }
}) })
} }

View File

@ -696,7 +696,7 @@ func (ui *UI) redrawFromState(state domain.AppState) {
SetSelectable(false) SetSelectable(false)
} }
ui.sourceViews.url.SetText(state.Source.RTMPURL) ui.sourceViews.url.SetText(cmp.Or(state.Source.RTMPURL, dash))
tracks := dash tracks := dash
if state.Source.Live && len(state.Source.Tracks) > 0 { if state.Source.Live && len(state.Source.Tracks) > 0 {

View File

@ -29,6 +29,12 @@ dir = "{{cwd}}"
run = "golangci-lint run" run = "golangci-lint run"
alias = "l" alias = "l"
[tasks.fmt]
description = "Run formatter"
dir = "{{cwd}}"
run = "goimports -w ."
alias = "f"
[tasks.generate_mocks] [tasks.generate_mocks]
description = "Generate mocks" description = "Generate mocks"
dir = "{{cwd}}" dir = "{{cwd}}"