From b147da6d9bd79e1e1513ed460ae49274de621cdf Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Thu, 17 Apr 2025 10:49:23 +0200 Subject: [PATCH] feat(mediaserver): configurable RTMP host and bind address --- README.md | 10 +++- internal/app/app.go | 2 + internal/app/integration_test.go | 67 +++++++++++++++++++++++---- internal/config/config.go | 12 ++++- internal/config/service_test.go | 5 ++ internal/config/testdata/complete.yml | 4 ++ internal/domain/types.go | 6 +++ internal/mediaserver/actor.go | 33 +++++++------ internal/terminal/terminal.go | 2 +- 9 files changed, 116 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 1237e82..5aafd9e 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,10 @@ sources: rtmp: enabled: true # must be true streamKey: live # defaults to "live" + host: rtmp.example.com # defaults to "localhost" + bindAddr: # optional + ip: 0.0.0.0 # defaults to 127.0.0.1 + port: 1935 # defaults to 1935 destinations: - name: YouTube # Destination name, used only for display url: rtmp://rtmp.youtube.com/12345 # Destination URL with stream key @@ -108,9 +112,13 @@ destinations: # other destinations here ``` -:warning: It is also possible to add and remove destinations directly from the +:information_source: It is also possible to add and remove destinations directly from the terminal user interface. +:warning: `sources.rtmp.bindAddr.ip` must be set to a valid IP address if you want +to accept connections from other hosts. Leave it blank to bind only to +localhost (`127.0.0.1`) or use `0.0.0.0` to bind to all network interfaces. + ## Contributing ### Bug reports diff --git a/internal/app/app.go b/internal/app/app.go index 8887d05..e1223ad 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -89,6 +89,8 @@ func Run(ctx context.Context, params RunParams) error { updateUI() srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{ + RTMPAddr: domain.NetAddr(cfg.Sources.RTMP.BindAddr), + RTMPHost: cfg.Sources.RTMP.Host, StreamKey: mediaserver.StreamKey(cfg.Sources.RTMP.StreamKey), ContainerClient: containerClient, Logger: logger.With("component", "mediaserver"), diff --git a/internal/app/integration_test.go b/internal/app/integration_test.go index e0a4f58..c4d0fde 100644 --- a/internal/app/integration_test.go +++ b/internal/app/integration_test.go @@ -30,12 +30,12 @@ import ( ) func TestIntegration(t *testing.T) { - t.Run("with default stream key", func(t *testing.T) { - testIntegration(t, "") + t.Run("with default host, port and stream key", func(t *testing.T) { + testIntegration(t, "", "", 0, "") }) - t.Run("with custom stream key", func(t *testing.T) { - testIntegration(t, "s0meK3y") + t.Run("with custom host, port and stream key", func(t *testing.T) { + testIntegration(t, "localhost", "0.0.0.0", 3000, "s0meK3y") }) } @@ -45,11 +45,14 @@ func TestIntegration(t *testing.T) { // https://stackoverflow.com/a/60740997/62871 const hostIP = "172.17.0.1" -func testIntegration(t *testing.T, streamKey string) { +func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int, streamKey string) { ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) defer cancel() + wantRTMPHost := cmp.Or(rtmpHost, "localhost") + wantRTMPPort := cmp.Or(rtmpPort, 1935) wantStreamKey := cmp.Or(streamKey, "live") + wantRTMPURL := fmt.Sprintf("rtmp://%s:%d/%s", wantRTMPHost, wantRTMPPort, wantStreamKey) destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ @@ -74,7 +77,13 @@ func testIntegration(t *testing.T, streamKey string) { destURL1 := fmt.Sprintf("rtmp://%s:%d/%s/dest1", hostIP, destServerPort.Int(), wantStreamKey) destURL2 := fmt.Sprintf("rtmp://%s:%d/%s/dest2", hostIP, destServerPort.Int(), wantStreamKey) configService := setupConfigService(t, config.Config{ - Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true, StreamKey: streamKey}}, + Sources: config.Sources{ + RTMP: config.RTMPSource{ + Enabled: true, + Host: rtmpHost, + BindAddr: config.NetAddr{IP: rtmpIP, Port: rtmpPort}, + StreamKey: streamKey, + }}, // Load one destination from config, add the other in-app. Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}}, }) @@ -116,7 +125,7 @@ func testIntegration(t *testing.T, streamKey string) { printScreen(t, getContents, "After starting the mediaserver") // Start streaming a test video to the app: - testhelpers.StreamFLV(t, "rtmp://localhost:1935/"+wantStreamKey) + testhelpers.StreamFLV(t, wantRTMPURL) require.EventuallyWithT( t, @@ -124,7 +133,7 @@ func testIntegration(t *testing.T, streamKey string) { contents := getContents() require.True(t, len(contents) > 4, "expected at least 5 lines of output") - assert.Contains(t, contents[1], "URL rtmp://localhost:1935/"+wantStreamKey, "expected mediaserver status to be receiving") + assert.Contains(t, contents[1], "URL "+wantRTMPURL, "expected mediaserver status to be receiving") assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264") assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy") @@ -256,6 +265,48 @@ func testIntegration(t *testing.T, streamKey string) { <-done } +func TestIntegrationCustomRTMPURL(t *testing.T) { + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) + defer cancel() + + logger := testhelpers.NewTestLogger(t).With("component", "integration") + dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation()) + require.NoError(t, err) + + configService := setupConfigService(t, config.Config{ + Sources: config.Sources{ + RTMP: config.RTMPSource{ + Enabled: true, + Host: "rtmp.live.tv", + }, + }, + }) + screen, screenCaptureC, getContents := setupSimulationScreen(t) + + done := make(chan struct{}) + go func() { + defer func() { + done <- struct{}{} + }() + + require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) + }() + + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + assert.True(t, contentsIncludes(getContents(), "URL rtmp://rtmp.live.tv:1935/live"), "expected to see custom host name") + }, + 5*time.Second, + time.Second, + "expected to see custom host name", + ) + printScreen(t, getContents, "Ater displaying the fatal error modal") + + cancel() + + <-done +} func TestIntegrationRestartDestination(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) defer cancel() diff --git a/internal/config/config.go b/internal/config/config.go index 84d51a0..535e044 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -22,10 +22,18 @@ func (l LogFile) GetPath() string { return cmp.Or(l.Path, l.defaultPath) } +// NetAddr holds an IP and/or port. +type NetAddr struct { + IP string `yaml:"ip,omitempty"` + Port int `yaml:"port,omitempty"` +} + // RTMPSource holds the configuration for the RTMP source. type RTMPSource struct { - Enabled bool `yaml:"enabled"` - StreamKey string `yaml:"streamKey,omitempty"` + Enabled bool `yaml:"enabled"` + StreamKey string `yaml:"streamKey,omitempty"` + Host string `yaml:"host,omitempty"` + BindAddr NetAddr `yaml:"bindAddr,omitempty"` } // Sources holds the configuration for the sources. diff --git a/internal/config/service_test.go b/internal/config/service_test.go index 91531a5..eb0b09d 100644 --- a/internal/config/service_test.go +++ b/internal/config/service_test.go @@ -93,6 +93,11 @@ func TestConfigServiceReadConfig(t *testing.T) { RTMP: config.RTMPSource{ Enabled: true, StreamKey: "s3cr3t", + Host: "rtmp.example.com", + BindAddr: config.NetAddr{ + IP: "0.0.0.0", + Port: 19350, + }, }, }, Destinations: []config.Destination{ diff --git a/internal/config/testdata/complete.yml b/internal/config/testdata/complete.yml index 4be07e0..e3d7fa8 100644 --- a/internal/config/testdata/complete.yml +++ b/internal/config/testdata/complete.yml @@ -6,6 +6,10 @@ sources: rtmp: enabled: true streamKey: s3cr3t + host: rtmp.example.com + bindAddr: + ip: 0.0.0.0 + port: 19350 destinations: - name: my stream url: rtmp://rtmp.example.com:1935/live diff --git a/internal/domain/types.go b/internal/domain/types.go index 74d4357..f9181b0 100644 --- a/internal/domain/types.go +++ b/internal/domain/types.go @@ -57,6 +57,12 @@ type Destination struct { URL string } +// NetAddr holds a network address. +type NetAddr struct { + IP string + Port int +} + // Container status strings. // // TODO: refactor to strictly reflect Docker status strings. diff --git a/internal/mediaserver/actor.go b/internal/mediaserver/actor.go index 17da238..6971b86 100644 --- a/internal/mediaserver/actor.go +++ b/internal/mediaserver/actor.go @@ -8,7 +8,6 @@ import ( "fmt" "log/slog" "net/http" - "strconv" "time" typescontainer "github.com/docker/docker/api/types/container" @@ -29,7 +28,9 @@ type StreamKey string const ( defaultFetchIngressStateInterval = 5 * time.Second // default interval to fetch the state of the media server defaultAPIPort = 9997 // default API host port for the media server + defaultRTMPIP = "127.0.0.1" // default RTMP host IP, bound to localhost for security defaultRTMPPort = 1935 // default RTMP host port for the media server + defaultRTMPHost = "localhost" // default RTMP host name, used for the RTMP URL defaultChanSize = 64 // default channel size for asynchronous non-error channels imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey]. @@ -47,7 +48,8 @@ type Actor struct { chanSize int containerClient *container.Client apiPort int - rtmpPort int + rtmpAddr domain.NetAddr + rtmpHost string streamKey StreamKey fetchIngressStateInterval time.Duration pass string // password for the media server @@ -62,11 +64,12 @@ type Actor struct { // NewActorParams contains the parameters for building a new media server // actor. type NewActorParams struct { - APIPort int // defaults to 9997 - RTMPPort int // defaults to 1935 - StreamKey StreamKey // defaults to "live" - ChanSize int // defaults to 64 - FetchIngressStateInterval time.Duration // defaults to 5 seconds + APIPort int // defaults to 9997 + RTMPAddr domain.NetAddr // defaults to 127.0.0.1:1935 + RTMPHost string // defaults to "localhost" + StreamKey StreamKey // defaults to "live" + ChanSize int // defaults to 64 + FetchIngressStateInterval time.Duration // defaults to 5 seconds ContainerClient *container.Client Logger *slog.Logger } @@ -84,10 +87,15 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error) return nil, fmt.Errorf("build API client: %w", err) } + rtmpAddr := params.RTMPAddr + rtmpAddr.IP = cmp.Or(rtmpAddr.IP, defaultRTMPIP) + rtmpAddr.Port = cmp.Or(rtmpAddr.Port, defaultRTMPPort) + chanSize := cmp.Or(params.ChanSize, defaultChanSize) return &Actor{ apiPort: cmp.Or(params.APIPort, defaultAPIPort), - rtmpPort: cmp.Or(params.RTMPPort, defaultRTMPPort), + rtmpAddr: rtmpAddr, + rtmpHost: cmp.Or(params.RTMPHost, defaultRTMPHost), streamKey: cmp.Or(params.StreamKey, defaultStreamKey), fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval), tlsCert: tlsCert, @@ -104,10 +112,8 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error) } func (a *Actor) Start(ctx context.Context) error { - // Exposed ports are bound to 127.0.0.1 for security. - // TODO: configurable RTMP bind address - apiPortSpec := nat.Port("127.0.0.1:" + strconv.Itoa(a.apiPort) + ":9997") - rtmpPortSpec := nat.Port("127.0.0.1:" + strconv.Itoa(+a.rtmpPort) + ":1935") + apiPortSpec := nat.Port(fmt.Sprintf("127.0.0.1:%d:9997", a.apiPort)) + rtmpPortSpec := nat.Port(fmt.Sprintf("%s:%d:%d", a.rtmpAddr.IP, a.rtmpAddr.Port, 1935)) exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)}) // The RTMP URL is passed to the UI via the state. @@ -155,6 +161,7 @@ func (a *Actor) Start(ctx context.Context) error { return fmt.Errorf("marshal config: %w", err) } + a.logger.Info("Starting media server", "host", a.rtmpHost, "bind_ip", a.rtmpAddr.IP, "bind_port", a.rtmpAddr.Port) containerStateC, errC := a.containerClient.RunContainer( ctx, container.RunContainerParams{ @@ -352,7 +359,7 @@ func (s *Actor) handleContainerExit(err error) { // RTMPURL returns the RTMP URL for the media server, accessible from the host. func (s *Actor) RTMPURL() string { - return fmt.Sprintf("rtmp://localhost:%d/%s", s.rtmpPort, s.streamKey) + return fmt.Sprintf("rtmp://%s:%d/%s", s.rtmpHost, s.rtmpAddr.Port, s.streamKey) } // RTMPInternalURL returns the RTMP URL for the media server, accessible from diff --git a/internal/terminal/terminal.go b/internal/terminal/terminal.go index e21ab19..00cd8ca 100644 --- a/internal/terminal/terminal.go +++ b/internal/terminal/terminal.go @@ -696,7 +696,7 @@ func (ui *UI) redrawFromState(state domain.AppState) { SetSelectable(false) } - ui.sourceViews.url.SetText(state.Source.RTMPURL) + ui.sourceViews.url.SetText(cmp.Or(state.Source.RTMPURL, dash)) tracks := dash if state.Source.Live && len(state.Source.Tracks) > 0 {