From 98d93ad28606b394ca859ac363bb690abf96790f Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Fri, 18 Apr 2025 22:08:18 +0200 Subject: [PATCH] feat(mediaserver): RTMPS --- README.md | 9 +- internal/app/app.go | 22 +- internal/app/integration_helpers_test.go | 2 + internal/app/integration_test.go | 470 ++++++++++++++--------- internal/config/config.go | 9 +- internal/config/service.go | 9 +- internal/config/service_test.go | 40 +- internal/config/testdata/complete.yml | 7 +- internal/config/testdata/rtmps-only.yml | 9 + internal/domain/types.go | 6 +- internal/domain/types_test.go | 9 + internal/mediaserver/actor.go | 200 +++++++--- internal/mediaserver/config.go | 6 +- internal/terminal/terminal.go | 98 +++-- 14 files changed, 616 insertions(+), 280 deletions(-) create mode 100644 internal/config/testdata/rtmps-only.yml diff --git a/README.md b/README.md index 07d65b4..43ba0e7 100644 --- a/README.md +++ b/README.md @@ -100,9 +100,14 @@ sources: mediaServer: streamKey: live # defaults to "live" host: rtmp.example.com # defaults to "localhost" - rtmp: # must be present, use `rtmp: {}` for defaults - ip: 0.0.0.0 # defaults to 127.0.0.1 + rtmp: + enabled: true # defaults to false + ip: 127.0.0.1 # defaults to 127.0.0.1 port: 1935 # defaults to 1935 + rtmps: + enabled: true # defaults to false + ip: 0.0.0.0 # defaults to 127.0.0.1 + port: 1936 # defaults to 1936 destinations: - name: YouTube # Destination name, used only for display url: rtmp://rtmp.youtube.com/12345 # Destination URL with stream key diff --git a/internal/app/app.go b/internal/app/app.go index 68c9216..f3df7b9 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -38,9 +38,9 @@ func Run(ctx context.Context, params RunParams) error { state := new(domain.AppState) applyConfig(cfg, state) - // While RTMP is the only source, it doesn't make sense to disable it. - if cfg.Sources.MediaServer.RTMP == nil { - return errors.New("config: sources.mediaServer.rtmp is required") + // Ensure there is at least one active source. + if !cfg.Sources.MediaServer.RTMP.Enabled && !cfg.Sources.MediaServer.RTMPS.Enabled { + return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set") } logger := params.Logger @@ -89,7 +89,8 @@ func Run(ctx context.Context, params RunParams) error { updateUI() srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{ - RTMPAddr: domain.NetAddr(cfg.Sources.MediaServer.RTMP.NetAddr), + RTMPAddr: buildNetAddr(cfg.Sources.MediaServer.RTMP), + RTMPSAddr: buildNetAddr(cfg.Sources.MediaServer.RTMPS), Host: cfg.Sources.MediaServer.Host, StreamKey: mediaserver.StreamKey(cfg.Sources.MediaServer.StreamKey), ContainerClient: containerClient, @@ -104,6 +105,10 @@ func Run(ctx context.Context, params RunParams) error { } defer srv.Close() + // Set the RTMP and RTMPS URLs in the UI, which are only known after the + // MediaServer is available. + ui.SetRTMPURLs(srv.RTMPURL(), srv.RTMPSURL()) + repl := replicator.StartActor(ctx, replicator.StartActorParams{ SourceURL: srv.RTMPInternalURL(), ContainerClient: containerClient, @@ -321,3 +326,12 @@ func doStartupCheck(ctx context.Context, containerClient *container.Client, show return ch } + +// buildNetAddr builds a [mediaserver.OptionalNetAddr] from the config. +func buildNetAddr(src config.RTMPSource) mediaserver.OptionalNetAddr { + if !src.Enabled { + return mediaserver.OptionalNetAddr{Enabled: false} + } + + return mediaserver.OptionalNetAddr{Enabled: true, NetAddr: domain.NetAddr(src.NetAddr)} +} diff --git a/internal/app/integration_helpers_test.go b/internal/app/integration_helpers_test.go index 5081fb2..be288b0 100644 --- a/internal/app/integration_helpers_test.go +++ b/internal/app/integration_helpers_test.go @@ -85,6 +85,8 @@ func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- termina lines[y] += string(screenCells[n].Runes[0]) } + require.GreaterOrEqual(t, len(lines), 5, "Screen contents should have at least 5 lines") + return lines } diff --git a/internal/app/integration_test.go b/internal/app/integration_test.go index 3342ec6..ed2d1e4 100644 --- a/internal/app/integration_test.go +++ b/internal/app/integration_test.go @@ -29,13 +29,41 @@ import ( "github.com/testcontainers/testcontainers-go/wait" ) +const waitTime = time.Minute + func TestIntegration(t *testing.T) { - t.Run("with default host, port and stream key", func(t *testing.T) { - testIntegration(t, "", "", 0, "") + t.Run("RTMP with default host, port and stream key", func(t *testing.T) { + testIntegration(t, config.MediaServerSource{ + RTMP: config.RTMPSource{Enabled: true}, + }) }) - t.Run("with custom host, port and stream key", func(t *testing.T) { - testIntegration(t, "localhost", "0.0.0.0", 3000, "s0meK3y") + t.Run("RTMPS with default host, port and stream key", func(t *testing.T) { + testIntegration(t, config.MediaServerSource{ + RTMPS: config.RTMPSource{Enabled: true}, + }) + }) + + t.Run("RTMP with custom host, port and stream key", func(t *testing.T) { + testIntegration(t, config.MediaServerSource{ + StreamKey: "s0meK3y", + Host: "localhost", + RTMP: config.RTMPSource{ + Enabled: true, + NetAddr: config.NetAddr{IP: "0.0.0.0", Port: 3000}, + }, + }) + }) + + t.Run("RTMPS with custom host, port and stream key", func(t *testing.T) { + testIntegration(t, config.MediaServerSource{ + StreamKey: "an0therK3y", + Host: "localhost", + RTMPS: config.RTMPSource{ + Enabled: true, + NetAddr: config.NetAddr{IP: "0.0.0.0", Port: 443}, + }, + }) }) } @@ -45,27 +73,39 @@ func TestIntegration(t *testing.T) { // https://stackoverflow.com/a/60740997/62871 const hostIP = "172.17.0.1" -func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int, streamKey string) { +func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) { ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) defer cancel() - wantRTMPHost := cmp.Or(rtmpHost, "localhost") - wantRTMPPort := cmp.Or(rtmpPort, 1935) - wantStreamKey := cmp.Or(streamKey, "live") - wantRTMPURL := fmt.Sprintf("rtmp://%s:%d/%s", wantRTMPHost, wantRTMPPort, wantStreamKey) + var rtmpConfig config.RTMPSource + var proto string + var defaultPort int + if mediaServerConfig.RTMP.Enabled { + rtmpConfig = mediaServerConfig.RTMP + proto = "rtmp://" + defaultPort = 1935 + } else { + rtmpConfig = mediaServerConfig.RTMPS + proto = "rtmps://" + defaultPort = 1936 + } + + wantHost := cmp.Or(mediaServerConfig.Host, "localhost") + wantRTMPPort := cmp.Or(rtmpConfig.Port, defaultPort) + wantStreamKey := cmp.Or(mediaServerConfig.StreamKey, "live") + wantRTMPURL := fmt.Sprintf("%s%s:%d/%s", proto, wantHost, wantRTMPPort, wantStreamKey) destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ Image: "bluenviron/mediamtx:latest", - Env: map[string]string{"MTX_RTMPADDRESS": ":1936"}, - ExposedPorts: []string{"1936/tcp"}, - WaitingFor: wait.ForListeningPort("1936/tcp"), + ExposedPorts: []string{"1935/tcp"}, + WaitingFor: wait.ForListeningPort("1935/tcp"), }, Started: true, }) testcontainers.CleanupContainer(t, destServer) require.NoError(t, err) - destServerPort, err := destServer.MappedPort(ctx, "1936/tcp") + destServerPort, err := destServer.MappedPort(ctx, "1935/tcp") require.NoError(t, err) logger := testhelpers.NewTestLogger(t).With("component", "integration") @@ -74,17 +114,10 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int, screen, screenCaptureC, getContents := setupSimulationScreen(t) - destURL1 := fmt.Sprintf("rtmp://%s:%d/%s/dest1", hostIP, destServerPort.Int(), wantStreamKey) - destURL2 := fmt.Sprintf("rtmp://%s:%d/%s/dest2", hostIP, destServerPort.Int(), wantStreamKey) + destURL1 := fmt.Sprintf("rtmp://%s:%d/live/dest1", hostIP, destServerPort.Int()) + destURL2 := fmt.Sprintf("rtmp://%s:%d/live/dest2", hostIP, destServerPort.Int()) configService := setupConfigService(t, config.Config{ - Sources: config.Sources{ - MediaServer: config.MediaServerSource{ - Host: rtmpHost, - StreamKey: streamKey, - RTMP: &config.RTMPSource{ - NetAddr: config.NetAddr{IP: rtmpIP, Port: rtmpPort}, - }}, - }, + Sources: config.Sources{MediaServer: mediaServerConfig}, // Load one destination from config, add the other in-app. Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}}, }) @@ -113,13 +146,12 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int, require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 2, "expected at least 3 lines of output") - assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting") + assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting") }, - 2*time.Minute, + waitTime, time.Second, "expected the mediaserver to start", ) @@ -130,16 +162,14 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int, require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 4, "expected at least 5 lines of output") - assert.Contains(t, contents[1], "URL "+wantRTMPURL, "expected mediaserver status to be receiving") - assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") - assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264") - assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy") + assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving") + assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264") + assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy") }, - time.Minute, + waitTime, time.Second, "expected to receive an ingress stream", ) @@ -159,22 +189,21 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int, require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 4, "expected at least 5 lines of output") - assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") - assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264") - assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy") + assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving") + assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264") + assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy") - require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") - assert.Contains(t, contents[2], "off-air", "expected local server 1 to be off-air") + require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(c, contents[3], "off-air", "expected local server 0 to be off-air") - require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present") - assert.Contains(t, contents[3], "off-air", "expected local server 2 to be off-air") + require.Contains(c, contents[3], "Local server 2", "expected local server 2 to be present") + assert.Contains(c, contents[3], "off-air", "expected local server 2 to be off-air") }, - 2*time.Minute, + waitTime, time.Second, "expected to add the destinations", ) @@ -187,23 +216,22 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int, require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 4, "expected at least 5 lines of output") - assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") - assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264") - assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy") + assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving") + assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264") + assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy") - require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") - assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending") - assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy") + require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending") + assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy") - require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present") - assert.Contains(t, contents[3], "sending", "expected local server 2 to be sending") - assert.Contains(t, contents[3], "healthy", "expected local server 2 to be healthy") + require.Contains(c, contents[3], "Local server 2", "expected local server 2 to be present") + assert.Contains(c, contents[3], "sending", "expected local server 2 to be sending") + assert.Contains(c, contents[3], "healthy", "expected local server 2 to be healthy") }, - 2*time.Minute, + waitTime, time.Second, "expected to start the destination streams", ) @@ -215,22 +243,21 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int, require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 4, "expected at least 5 lines of output") - assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") - assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264") - assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy") + assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving") + assert.Contains(c, contents[2], "Tracks H264", "expected mediaserver tracks to be H264") + assert.Contains(c, contents[3], "Health healthy", "expected mediaserver to be healthy") - require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") - assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending") - assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy") + require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending") + assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy") - require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present") + require.NotContains(c, contents[3], "Local server 2", "expected local server 2 to not be present") }, - 2*time.Minute, + waitTime, time.Second, "expected to remove the second destination", ) @@ -241,16 +268,15 @@ func testIntegration(t *testing.T, rtmpHost string, rtmpIP string, rtmpPort int, require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 4, "expected at least 5 lines of output") - require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") - assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited") + require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(c, contents[2], "exited", "expected local server 1 to have exited") - require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present") + require.NotContains(c, contents[3], "Local server 2", "expected local server 2 to not be present") }, - time.Minute, + waitTime, time.Second, "expected to stop the first destination stream", ) @@ -278,7 +304,7 @@ func TestIntegrationCustomRTMPURL(t *testing.T) { Sources: config.Sources{ MediaServer: config.MediaServerSource{ Host: "rtmp.live.tv", - RTMP: &config.RTMPSource{}, + RTMP: config.RTMPSource{Enabled: true}, }, }, }) @@ -293,12 +319,15 @@ func TestIntegrationCustomRTMPURL(t *testing.T) { require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) }() + time.Sleep(time.Second) + sendKey(t, screen, tcell.KeyF1, ' ') + require.EventuallyWithT( t, func(t *assert.CollectT) { - assert.True(t, contentsIncludes(getContents(), "URL rtmp://rtmp.live.tv:1935/live"), "expected to see custom host name") + assert.True(t, contentsIncludes(getContents(), "rtmp://rtmp.live.tv:1935/live"), "expected to see custom host name") }, - 5*time.Second, + waitTime, time.Second, "expected to see custom host name", ) @@ -308,6 +337,7 @@ func TestIntegrationCustomRTMPURL(t *testing.T) { <-done } + func TestIntegrationRestartDestination(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) defer cancel() @@ -337,7 +367,7 @@ func TestIntegrationRestartDestination(t *testing.T) { screen, screenCaptureC, getContents := setupSimulationScreen(t) configService := setupConfigService(t, config.Config{ - Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: &config.RTMPSource{}}}, + Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}, Destinations: []config.Destination{{ Name: "Local server 1", URL: fmt.Sprintf("rtmp://%s:%d/live", hostIP, destServerRTMPPort.Int()), @@ -355,13 +385,12 @@ func TestIntegrationRestartDestination(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 2, "expected at least 3 lines of output") - assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting") + assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting") }, - 2*time.Minute, + waitTime, time.Second, "expected the mediaserver to start", ) @@ -372,13 +401,12 @@ func TestIntegrationRestartDestination(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 3, "expected at least 3 lines of output") - assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") + assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving") }, - time.Minute, + waitTime, time.Second, "expected to receive an ingress stream", ) @@ -389,17 +417,16 @@ func TestIntegrationRestartDestination(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 4, "expected at least 5 lines of output") - assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") + assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving") - require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") - assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending") - assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy") + require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending") + assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy") }, - 2*time.Minute, + waitTime, time.Second, "expected to start the destination stream", ) @@ -412,17 +439,16 @@ func TestIntegrationRestartDestination(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 3, "expected at least 3 lines of output") - assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") + assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving") - require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") - assert.Contains(t, contents[2], "off-air", "expected local server 1 to be off-air") - assert.Contains(t, contents[2], "restarting", "expected local server 1 to be restarting") + require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(c, contents[2], "off-air", "expected local server 1 to be off-air") + assert.Contains(c, contents[2], "restarting", "expected local server 1 to be restarting") }, - 20*time.Second, + waitTime, time.Second, "expected to begin restarting", ) @@ -430,17 +456,16 @@ func TestIntegrationRestartDestination(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 4, "expected at least 4 lines of output") - assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") + assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving") - require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") - assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending") - assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy") + require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(c, contents[2], "sending", "expected local server 1 to be sending") + assert.Contains(c, contents[2], "healthy", "expected local server 1 to be healthy") }, - 2*time.Minute, + waitTime, time.Second, "expected to restart the destination stream", ) @@ -451,16 +476,15 @@ func TestIntegrationRestartDestination(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 4, "expected at least 4 lines of output") - require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") - assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited") + require.Contains(c, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(c, contents[2], "exited", "expected local server 1 to have exited") - require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present") + require.NotContains(c, contents[3], "Local server 2", "expected local server 2 to not be present") }, - time.Minute, + waitTime, time.Second, "expected to stop the destination stream", ) @@ -483,7 +507,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) { screen, screenCaptureC, getContents := setupSimulationScreen(t) configService := setupConfigService(t, config.Config{ - Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: &config.RTMPSource{}}}, + Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}, Destinations: []config.Destination{{Name: "Example server", URL: "rtmp://rtmp.example.com/live"}}, }) @@ -498,13 +522,12 @@ func TestIntegrationStartDestinationFailed(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 2, "expected at least 3 lines of output") - assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting") + assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting") }, - 2*time.Minute, + waitTime, time.Second, "expected the mediaserver to start", ) @@ -515,13 +538,12 @@ func TestIntegrationStartDestinationFailed(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 3, "expected at least 3 lines of output") - assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") + assert.Contains(c, contents[1], "Status receiving", "expected mediaserver status to be receiving") }, - time.Minute, + waitTime, time.Second, "expected to receive an ingress stream", ) @@ -532,12 +554,12 @@ func TestIntegrationStartDestinationFailed(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - assert.True(t, contentsIncludes(contents, "Streaming to Example server failed:"), "expected to see destination error") - assert.True(t, contentsIncludes(contents, "Error opening output files: I/O error"), "expected to see destination error") + assert.True(c, contentsIncludes(contents, "Streaming to Example server failed:"), "expected to see destination error") + assert.True(c, contentsIncludes(contents, "Error opening output files: I/O error"), "expected to see destination error") }, - time.Minute, + waitTime, time.Second, "expected to see the destination start error modal", ) @@ -559,7 +581,7 @@ func TestIntegrationDestinationValidations(t *testing.T) { screen, screenCaptureC, getContents := setupSimulationScreen(t) configService := setupConfigService(t, config.Config{ - Sources: config.Sources{MediaServer: config.MediaServerSource{StreamKey: "live", RTMP: &config.RTMPSource{}}}, + Sources: config.Sources{MediaServer: config.MediaServerSource{StreamKey: "live", RTMP: config.RTMPSource{Enabled: true}}}, }) done := make(chan struct{}) @@ -573,14 +595,14 @@ func TestIntegrationDestinationValidations(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 2, "expected at least 3 lines of output") + require.True(c, len(contents) > 2, "expected at least 3 lines of output") - assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting") - assert.True(t, contentsIncludes(contents, "No destinations added yet. Press [a] to add a new destination."), "expected to see no destinations message") + assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting") + assert.True(c, contentsIncludes(contents, "No destinations added yet. Press [a] to add a new destination."), "expected to see no destinations message") }, - 2*time.Minute, + waitTime, time.Second, "expected the mediaserver to start", ) @@ -594,13 +616,13 @@ func TestIntegrationDestinationValidations(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error") - assert.True(t, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error") + assert.True(c, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error") + assert.True(c, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error") }, - 10*time.Second, + waitTime, time.Second, "expected a validation error for an empty URL", ) @@ -614,13 +636,13 @@ func TestIntegrationDestinationValidations(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error") - assert.True(t, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error") + assert.True(c, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error") + assert.True(c, contentsIncludes(contents, "validate: destination URL must be an RTMP URL"), "expected to see invalid RTMP URL error") }, - 10*time.Second, + waitTime, time.Second, "expected a validation error for an invalid URL", ) @@ -635,15 +657,14 @@ func TestIntegrationDestinationValidations(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 2, "expected at least 3 lines of output") - require.Contains(t, contents[2], "My stream", "expected new destination to be present") - assert.Contains(t, contents[2], "off-air", "expected new destination to be off-air") - assert.False(t, contentsIncludes(contents, "No destinations added yet. Press [a] to add a new destination."), "expected to not see no destinations message") + require.Contains(c, contents[2], "My stream", "expected new destination to be present") + assert.Contains(c, contents[2], "off-air", "expected new destination to be off-air") + assert.False(c, contentsIncludes(contents, "No destinations added yet"), "expected to not see no destinations message") }, - 10*time.Second, + waitTime, time.Second, "expected to add the destination", ) @@ -661,13 +682,13 @@ func TestIntegrationDestinationValidations(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error") - assert.True(t, contentsIncludes(contents, "validate: duplicate destination URL: rtmp://"), "expected to see config update error") + assert.True(c, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error") + assert.True(c, contentsIncludes(contents, "validate: duplicate destination URL: rtmp://"), "expected to see config update error") }, - 10*time.Second, + waitTime, time.Second, "expected a validation error for a duplicate URL", ) @@ -702,7 +723,7 @@ func TestIntegrationStartupCheck(t *testing.T) { dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation()) require.NoError(t, err) - configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: &config.RTMPSource{}}}}) + configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}}) screen, screenCaptureC, getContents := setupSimulationScreen(t) done := make(chan struct{}) @@ -716,10 +737,10 @@ func TestIntegrationStartupCheck(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { - assert.True(t, contentsIncludes(getContents(), "Another instance of Octoplex may already be running."), "expected to see startup check modal") + func(c *assert.CollectT) { + assert.True(c, contentsIncludes(getContents(), "Another instance of Octoplex may already be running."), "expected to see startup check modal") }, - 30*time.Second, + waitTime, time.Second, "expected to see startup check modal", ) @@ -729,13 +750,13 @@ func TestIntegrationStartupCheck(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { _, err := staleContainer.State(context.Background()) // IsRunning() does not work, probably because we're undercutting the // testcontainers API. - require.True(t, errdefs.IsNotFound(err), "expected to not find the container") + require.True(c, errdefs.IsNotFound(err), "expected to not find the container") }, - time.Minute, + waitTime, 2*time.Second, "expected to quit the other containers", ) @@ -743,13 +764,13 @@ func TestIntegrationStartupCheck(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { + func(c *assert.CollectT) { contents := getContents() - require.True(t, len(contents) > 2, "expected at least 3 lines of output") + require.True(c, len(contents) > 2, "expected at least 3 lines of output") - assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting") + assert.Contains(c, contents[1], "Status waiting for stream", "expected mediaserver status to be waiting") }, - 10*time.Second, + waitTime, time.Second, "expected the mediaserver to start", ) @@ -771,7 +792,7 @@ func TestIntegrationMediaServerError(t *testing.T) { dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation()) require.NoError(t, err) - configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: &config.RTMPSource{}}}}) + configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}}) screen, screenCaptureC, getContents := setupSimulationScreen(t) done := make(chan struct{}) @@ -785,11 +806,11 @@ func TestIntegrationMediaServerError(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { - assert.True(t, contentsIncludes(getContents(), "Mediaserver error: Server process exited unexpectedly."), "expected to see title") - assert.True(t, contentsIncludes(getContents(), "address already in use"), "expected to see message") + func(c *assert.CollectT) { + assert.True(c, contentsIncludes(getContents(), "Mediaserver error: Server process exited unexpectedly."), "expected to see title") + assert.True(c, contentsIncludes(getContents(), "address already in use"), "expected to see message") }, - time.Minute, + waitTime, time.Second, "expected to see media server error modal", ) @@ -810,7 +831,7 @@ func TestIntegrationDockerClientError(t *testing.T) { var dockerClient mocks.DockerClient dockerClient.EXPECT().NetworkCreate(mock.Anything, mock.Anything, mock.Anything).Return(network.CreateResponse{}, errors.New("boom")) - configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: &config.RTMPSource{}}}}) + configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}}) screen, screenCaptureC, getContents := setupSimulationScreen(t) done := make(chan struct{}) @@ -828,11 +849,11 @@ func TestIntegrationDockerClientError(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { - assert.True(t, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message") - assert.True(t, contentsIncludes(getContents(), "create container client: network create: boom"), "expected to see message") + func(c *assert.CollectT) { + assert.True(c, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message") + assert.True(c, contentsIncludes(getContents(), "create container client: network create: boom"), "expected to see message") }, - 5*time.Second, + waitTime, time.Second, "expected to see fatal error modal", ) @@ -843,6 +864,7 @@ func TestIntegrationDockerClientError(t *testing.T) { <-done } + func TestIntegrationDockerConnectionError(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) defer cancel() @@ -851,7 +873,7 @@ func TestIntegrationDockerConnectionError(t *testing.T) { dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.WithHost("http://docker.example.com")) require.NoError(t, err) - configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: &config.RTMPSource{}}}}) + configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}}) screen, screenCaptureC, getContents := setupSimulationScreen(t) done := make(chan struct{}) @@ -867,11 +889,11 @@ func TestIntegrationDockerConnectionError(t *testing.T) { require.EventuallyWithT( t, - func(t *assert.CollectT) { - assert.True(t, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message") - assert.True(t, contentsIncludes(getContents(), "Could not connect to Docker. Is Docker installed"), "expected to see message") + func(c *assert.CollectT) { + assert.True(c, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message") + assert.True(c, contentsIncludes(getContents(), "Could not connect to Docker. Is Docker installed"), "expected to see message") }, - 5*time.Second, + waitTime, time.Second, "expected to see fatal error modal", ) @@ -882,3 +904,103 @@ func TestIntegrationDockerConnectionError(t *testing.T) { <-done } + +func TestIntegrationCopyURLs(t *testing.T) { + type binding struct { + key tcell.Key + content string + url string + } + + testCases := []struct { + name string + mediaServerConfig config.MediaServerSource + wantBindings []binding + wantNot []string + }{ + { + name: "RTMP only", + mediaServerConfig: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}, + wantBindings: []binding{ + { + key: tcell.KeyF1, + content: "F1 Copy source RTMP URL", + url: "rtmp://localhost:1935/live", + }, + }, + wantNot: []string{"F2"}, + }, + { + name: "RTMPS only", + mediaServerConfig: config.MediaServerSource{RTMPS: config.RTMPSource{Enabled: true}}, + wantBindings: []binding{ + { + key: tcell.KeyF1, + content: "F1 Copy source RTMPS URL", + url: "rtmps://localhost:1936/live", + }, + }, + wantNot: []string{"F2"}, + }, + { + name: "RTMP and RTMPS", + mediaServerConfig: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}, RTMPS: config.RTMPSource{Enabled: true}}, + wantBindings: []binding{ + { + key: tcell.KeyF1, + content: "F1 Copy source RTMP URL", + url: "rtmp://localhost:1935/live", + }, + { + key: tcell.KeyF2, + content: "F2 Copy source RTMPS URL", + url: "rtmps://localhost:1936/live", + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(t.Context(), time.Minute) + defer cancel() + + logger := testhelpers.NewTestLogger(t).With("component", "integration") + dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation()) + require.NoError(t, err) + + configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: tc.mediaServerConfig}}) + screen, screenCaptureC, getContents := setupSimulationScreen(t) + + done := make(chan struct{}) + go func() { + defer func() { + done <- struct{}{} + }() + + require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) + }() + + time.Sleep(3 * time.Second) + printScreen(t, getContents, "Ater loading the app") + + for _, want := range tc.wantBindings { + assert.True(t, contentsIncludes(getContents(), want.content), "expected to see %q", want) + + sendKey(t, screen, want.key, ' ') + time.Sleep(3 * time.Second) + assert.True(t, contentsIncludes(getContents(), want.url), "expected to see copied message") + + sendKey(t, screen, tcell.KeyEscape, ' ') + } + + for _, wantNot := range tc.wantNot { + assert.False(t, contentsIncludes(getContents(), wantNot), "expected to not see %q", wantNot) + } + + cancel() + + <-done + }) + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 3abbda4..76c1113 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -30,14 +30,17 @@ type NetAddr struct { // RTMPSource holds the configuration for the RTMP source. type RTMPSource struct { + Enabled bool `yaml:"enabled"` + NetAddr `yaml:",inline"` } // MediaServerSource holds the configuration for the media server source. type MediaServerSource struct { - StreamKey string `yaml:"streamKey,omitempty"` - Host string `yaml:"host,omitempty"` - RTMP *RTMPSource `yaml:"rtmp,omitempty"` + StreamKey string `yaml:"streamKey,omitempty"` + Host string `yaml:"host,omitempty"` + RTMP RTMPSource `yaml:"rtmp,omitempty"` + RTMPS RTMPSource `yaml:"rtmps,omitempty"` } // Sources holds the configuration for the sources. diff --git a/internal/config/service.go b/internal/config/service.go index 3fec40c..8233d2b 100644 --- a/internal/config/service.go +++ b/internal/config/service.go @@ -180,10 +180,13 @@ func (s *Service) writeConfig(cfgBytes []byte) error { // populateConfigOnBuild is called to set default values for a new, empty // configuration. // -// This function may set exported fields to arbitrary values. +// This function may set serialized fields to arbitrary values. func (s *Service) populateConfigOnBuild(cfg *Config) { cfg.Sources.MediaServer.StreamKey = "live" - cfg.Sources.MediaServer.RTMP = &RTMPSource{NetAddr{"127.0.0.1", 1935}} + cfg.Sources.MediaServer.RTMP = RTMPSource{ + Enabled: true, + NetAddr: NetAddr{IP: "127.0.0.1", Port: 1935}, + } s.populateConfigOnRead(cfg) } @@ -191,7 +194,7 @@ func (s *Service) populateConfigOnBuild(cfg *Config) { // populateConfigOnRead is called to set default values for a configuration // read from an existing file. // -// This function should not update any exported values, which would be a +// This function should not update any serialized values, which would be a // confusing experience for the user. func (s *Service) populateConfigOnRead(cfg *Config) { cfg.LogFile.defaultPath = filepath.Join(s.appStateDir, "octoplex.log") diff --git a/internal/config/service_test.go b/internal/config/service_test.go index ce3dd12..f124d11 100644 --- a/internal/config/service_test.go +++ b/internal/config/service_test.go @@ -19,6 +19,9 @@ import ( //go:embed testdata/complete.yml var configComplete []byte +//go:embed testdata/rtmps-only.yml +var configRTMPSOnly []byte + //go:embed testdata/logfile.yml var configLogfile []byte @@ -97,12 +100,20 @@ func TestConfigServiceReadConfig(t *testing.T) { MediaServer: config.MediaServerSource{ StreamKey: "s3cr3t", Host: "rtmp.example.com", - RTMP: &config.RTMPSource{ + RTMP: config.RTMPSource{ + Enabled: true, NetAddr: config.NetAddr{ IP: "0.0.0.0", Port: 19350, }, }, + RTMPS: config.RTMPSource{ + Enabled: true, + NetAddr: config.NetAddr{ + IP: "0.0.0.0", + Port: 19443, + }, + }, }, }, Destinations: []config.Destination{ @@ -118,6 +129,33 @@ func TestConfigServiceReadConfig(t *testing.T) { ) }, }, + { + name: "RTMPS only", + configBytes: configRTMPSOnly, + want: func(t *testing.T, cfg config.Config) { + require.Empty( + t, + gocmp.Diff( + config.Config{ + LogFile: config.LogFile{Enabled: true}, + Sources: config.Sources{ + MediaServer: config.MediaServerSource{ + RTMPS: config.RTMPSource{ + Enabled: true, + NetAddr: config.NetAddr{ + IP: "0.0.0.0", + Port: 1935, + }, + }, + }, + }, + }, + cfg, + cmpopts.IgnoreUnexported(config.LogFile{}), + ), + ) + }, + }, { name: "logging enabled, logfile", configBytes: configLogfile, diff --git a/internal/config/testdata/complete.yml b/internal/config/testdata/complete.yml index 04c77c1..3a98d50 100644 --- a/internal/config/testdata/complete.yml +++ b/internal/config/testdata/complete.yml @@ -7,8 +7,13 @@ sources: streamKey: s3cr3t host: rtmp.example.com rtmp: + enabled: true ip: 0.0.0.0 - port: 19350 + port: 19350 + rtmps: + enabled: true + ip: 0.0.0.0 + port: 19443 destinations: - name: my stream url: rtmp://rtmp.example.com:1935/live diff --git a/internal/config/testdata/rtmps-only.yml b/internal/config/testdata/rtmps-only.yml new file mode 100644 index 0000000..efa459e --- /dev/null +++ b/internal/config/testdata/rtmps-only.yml @@ -0,0 +1,9 @@ +--- +logfile: + enabled: true +sources: + mediaServer: + rtmps: + enabled: true + ip: 0.0.0.0 + port: 1935 diff --git a/internal/domain/types.go b/internal/domain/types.go index ceaa1e4..0bdc66a 100644 --- a/internal/domain/types.go +++ b/internal/domain/types.go @@ -35,7 +35,6 @@ type Source struct { Live bool LiveChangedAt time.Time Tracks []string - RTMPURL string ExitReason string } @@ -62,6 +61,11 @@ type NetAddr struct { Port int } +// IsZero returns true if the NetAddr is zero value. +func (n NetAddr) IsZero() bool { + return n.IP == "" && n.Port == 0 +} + // Container status strings. // // TODO: refactor to strictly reflect Docker status strings. diff --git a/internal/domain/types_test.go b/internal/domain/types_test.go index e5838cc..2e2323c 100644 --- a/internal/domain/types_test.go +++ b/internal/domain/types_test.go @@ -31,3 +31,12 @@ func TestAppStateClone(t *testing.T) { s.Destinations[0].Name = "Twitch" assert.Equal(t, "YouTube", s2.Destinations[0].Name) } + +func TestNetAddr(t *testing.T) { + var addr domain.NetAddr + assert.True(t, addr.IsZero()) + + addr.IP = "127.0.0.1" + addr.Port = 3000 + assert.False(t, addr.IsZero()) +} diff --git a/internal/mediaserver/actor.go b/internal/mediaserver/actor.go index b2c883d..810bcdd 100644 --- a/internal/mediaserver/actor.go +++ b/internal/mediaserver/actor.go @@ -30,7 +30,8 @@ const ( defaultAPIPort = 9997 // default API host port for the media server defaultRTMPIP = "127.0.0.1" // default RTMP host IP, bound to localhost for security defaultRTMPPort = 1935 // default RTMP host port for the media server - defaultHost = "localhost" // default RTMP host name, used for the RTMP URL + defaultRTMPSPort = 1936 // default RTMPS host port for the media server + defaultHost = "localhost" // default mediaserver host name defaultChanSize = 64 // default channel size for asynchronous non-error channels imageNameMediaMTX = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx defaultStreamKey StreamKey = "live" // Default stream key. See [StreamKey]. @@ -47,9 +48,10 @@ type Actor struct { stateC chan domain.Source chanSize int containerClient *container.Client - apiPort int rtmpAddr domain.NetAddr - rtmpHost string + rtmpsAddr domain.NetAddr + apiPort int + host string streamKey StreamKey updateStateInterval time.Duration pass string // password for the media server @@ -64,16 +66,25 @@ type Actor struct { // NewActorParams contains the parameters for building a new media server // actor. type NewActorParams struct { - APIPort int // defaults to 9997 - RTMPAddr domain.NetAddr // defaults to 127.0.0.1:1935 - Host string // defaults to "localhost" - StreamKey StreamKey // defaults to "live" - ChanSize int // defaults to 64 - UpdateStateInterval time.Duration // defaults to 5 seconds + RTMPAddr OptionalNetAddr // defaults to disabled, or 127.0.0.1:1935 + RTMPSAddr OptionalNetAddr // defaults to disabled, or 127.0.0.1:1936 + APIPort int // defaults to 9997 + Host string // defaults to "localhost" + StreamKey StreamKey // defaults to "live" + ChanSize int // defaults to 64 + UpdateStateInterval time.Duration // defaults to 5 seconds ContainerClient *container.Client Logger *slog.Logger } +// OptionalNetAddr is a wrapper around domain.NetAddr that indicates whether it +// is enabled or not. +type OptionalNetAddr struct { + domain.NetAddr + + Enabled bool +} + // NewActor creates a new media server actor. // // Callers must consume the state channel exposed via [C]. @@ -87,15 +98,12 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error) return nil, fmt.Errorf("build API client: %w", err) } - rtmpAddr := params.RTMPAddr - rtmpAddr.IP = cmp.Or(rtmpAddr.IP, defaultRTMPIP) - rtmpAddr.Port = cmp.Or(rtmpAddr.Port, defaultRTMPPort) - chanSize := cmp.Or(params.ChanSize, defaultChanSize) return &Actor{ + rtmpAddr: toRTMPAddr(params.RTMPAddr, defaultRTMPPort), + rtmpsAddr: toRTMPAddr(params.RTMPSAddr, defaultRTMPSPort), apiPort: cmp.Or(params.APIPort, defaultAPIPort), - rtmpAddr: rtmpAddr, - rtmpHost: cmp.Or(params.Host, defaultHost), + host: cmp.Or(params.Host, defaultHost), streamKey: cmp.Or(params.StreamKey, defaultStreamKey), updateStateInterval: cmp.Or(params.UpdateStateInterval, defaultUpdateStateInterval), tlsCert: tlsCert, @@ -112,56 +120,37 @@ func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error) } func (a *Actor) Start(ctx context.Context) error { - apiPortSpec := nat.Port(fmt.Sprintf("127.0.0.1:%d:9997", a.apiPort)) - rtmpPortSpec := nat.Port(fmt.Sprintf("%s:%d:%d", a.rtmpAddr.IP, a.rtmpAddr.Port, 1935)) - exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)}) - - // The RTMP URL is passed to the UI via the state. - // This could be refactored, it's not really stateful data. - a.state.RTMPURL = a.RTMPURL() - - cfg, err := yaml.Marshal( - Config{ - LogLevel: "info", - LogDestinations: []string{"stdout"}, - AuthMethod: "internal", - AuthInternalUsers: []User{ - { - User: "any", - IPs: []string{}, // any IP - Permissions: []UserPermission{ - {Action: "publish"}, - }, - }, - { - User: "api", - Pass: a.pass, - IPs: []string{}, // any IP - Permissions: []UserPermission{ - {Action: "read"}, - }, - }, - { - User: "api", - Pass: a.pass, - IPs: []string{}, // any IP - Permissions: []UserPermission{{Action: "api"}}, - }, - }, - API: true, - APIEncryption: true, - APIServerCert: "/etc/tls.crt", - APIServerKey: "/etc/tls.key", - Paths: map[string]Path{ - string(a.streamKey): {Source: "publisher"}, - }, - }, - ) - if err != nil { // should never happen - return fmt.Errorf("marshal config: %w", err) + var portSpecs []string + portSpecs = append(portSpecs, fmt.Sprintf("127.0.0.1:%d:9997", a.apiPort)) + if !a.rtmpAddr.IsZero() { + portSpecs = append(portSpecs, fmt.Sprintf("%s:%d:%d", a.rtmpAddr.IP, a.rtmpAddr.Port, 1935)) + } + if !a.rtmpsAddr.IsZero() { + portSpecs = append(portSpecs, fmt.Sprintf("%s:%d:%d", a.rtmpsAddr.IP, a.rtmpsAddr.Port, 1936)) + } + exposedPorts, portBindings, err := nat.ParsePortSpecs(portSpecs) + if err != nil { + return fmt.Errorf("parse port specs: %w", err) } - a.logger.Info("Starting media server", "host", a.rtmpHost, "bind_ip", a.rtmpAddr.IP, "bind_port", a.rtmpAddr.Port) + cfg, err := a.buildServerConfig() + if err != nil { + return fmt.Errorf("build server config: %w", err) + } + + args := []any{"host", a.host} + if a.rtmpAddr.IsZero() { + args = append(args, "rtmp.enabled", false) + } else { + args = append(args, "rtmp.enabled", true, "rtmp.bind_addr", a.rtmpAddr.IP, "rtmp.bind_port", a.rtmpAddr.Port) + } + if a.rtmpsAddr.IsZero() { + args = append(args, "rtmps.enabled", false) + } else { + args = append(args, "rtmps.enabled", true, "rtmps.bind_addr", a.rtmpsAddr.IP, "rtmps.bind_port", a.rtmpsAddr.Port) + } + a.logger.Info("Starting media server", args...) + containerStateC, errC := a.containerClient.RunContainer( ctx, container.RunContainerParams{ @@ -224,6 +213,62 @@ func (a *Actor) Start(ctx context.Context) error { return nil } +func (a *Actor) buildServerConfig() ([]byte, error) { + // NOTE: Regardless of the user configuration (which mostly affects exposed + // ports and UI rendering) plain RTMP must be enabled at the container level, + // for internal connections. + var encryptionString string + if a.rtmpsAddr.IsZero() { + encryptionString = "no" + } else { + encryptionString = "optional" + } + + return yaml.Marshal( + Config{ + LogLevel: "debug", + LogDestinations: []string{"stdout"}, + AuthMethod: "internal", + AuthInternalUsers: []User{ + { + User: "any", + IPs: []string{}, // any IP + Permissions: []UserPermission{ + {Action: "publish"}, + }, + }, + { + User: "api", + Pass: a.pass, + IPs: []string{}, // any IP + Permissions: []UserPermission{ + {Action: "read"}, + }, + }, + { + User: "api", + Pass: a.pass, + IPs: []string{}, // any IP + Permissions: []UserPermission{{Action: "api"}}, + }, + }, + RTMP: true, + RTMPEncryption: encryptionString, + RTMPAddress: ":1935", + RTMPSAddress: ":1936", + RTMPServerCert: "/etc/tls.crt", // TODO: custom certs + RTMPServerKey: "/etc/tls.key", // TODO: custom certs + API: true, + APIEncryption: true, + APIServerCert: "/etc/tls.crt", + APIServerKey: "/etc/tls.key", + Paths: map[string]Path{ + string(a.streamKey): {Source: "publisher"}, + }, + }, + ) +} + // C returns a channel that will receive the current state of the media server. func (s *Actor) C() <-chan domain.Source { return s.stateC @@ -329,7 +374,20 @@ func (s *Actor) handleContainerExit(err error) { // RTMPURL returns the RTMP URL for the media server, accessible from the host. func (s *Actor) RTMPURL() string { - return fmt.Sprintf("rtmp://%s:%d/%s", s.rtmpHost, s.rtmpAddr.Port, s.streamKey) + if s.rtmpAddr.IsZero() { + return "" + } + + return fmt.Sprintf("rtmp://%s:%d/%s", s.host, s.rtmpAddr.Port, s.streamKey) +} + +// RTMPSURL returns the RTMPS URL for the media server, accessible from the host. +func (s *Actor) RTMPSURL() string { + if s.rtmpsAddr.IsZero() { + return "" + } + + return fmt.Sprintf("rtmps://%s:%d/%s", s.host, s.rtmpsAddr.Port, s.streamKey) } // RTMPInternalURL returns the RTMP URL for the media server, accessible from @@ -367,3 +425,17 @@ func generatePassword() string { _, _ = rand.Read(p) return fmt.Sprintf("%x", []byte(p)) } + +// toRTMPAddr builds a domain.NetAddr from an OptionalNetAddr, with default +// values set to RTMP default bind config if needed. If the OptionalNetAddr is +// not enabled, a zero value is returned. +func toRTMPAddr(a OptionalNetAddr, defaultPort int) domain.NetAddr { + if !a.Enabled { + return domain.NetAddr{} + } + + return domain.NetAddr{ + IP: cmp.Or(a.IP, defaultRTMPIP), + Port: cmp.Or(a.Port, defaultPort), + } +} diff --git a/internal/mediaserver/config.go b/internal/mediaserver/config.go index 45713b6..5af3157 100644 --- a/internal/mediaserver/config.go +++ b/internal/mediaserver/config.go @@ -17,8 +17,12 @@ type Config struct { APIEncryption bool `yaml:"apiEncryption,omitempty"` APIServerCert string `yaml:"apiServerCert,omitempty"` APIServerKey string `yaml:"apiServerKey,omitempty"` - RTMP bool `yaml:"rtmp,omitempty"` + RTMP bool `yaml:"rtmp"` + RTMPEncryption string `yaml:"rtmpEncryption,omitempty"` RTMPAddress string `yaml:"rtmpAddress,omitempty"` + RTMPSAddress string `yaml:"rtmpsAddress,omitempty"` + RTMPServerCert string `yaml:"rtmpServerCert,omitempty"` + RTMPServerKey string `yaml:"rtmpServerKey,omitempty"` HLS bool `yaml:"hls"` RTSP bool `yaml:"rtsp"` WebRTC bool `yaml:"webrtc"` diff --git a/internal/terminal/terminal.go b/internal/terminal/terminal.go index 00cd8ca..7693cfe 100644 --- a/internal/terminal/terminal.go +++ b/internal/terminal/terminal.go @@ -20,7 +20,6 @@ import ( ) type sourceViews struct { - url *tview.TextView status *tview.TextView tracks *tview.TextView health *tview.TextView @@ -44,6 +43,7 @@ type UI struct { commandC chan Command clipboardAvailable bool configFilePath string + rtmpURL, rtmpsURL string buildInfo domain.BuildInfo logger *slog.Logger @@ -57,6 +57,7 @@ type UI struct { sourceViews sourceViews destView *tview.Table noDestView *tview.TextView + aboutView *tview.Flex pullProgressModal *tview.Modal // other mutable state @@ -127,8 +128,8 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { sourceView := tview.NewFlex() sourceView.SetDirection(tview.FlexColumn) sourceView.SetBorder(true) - sourceView.SetTitle("Source RTMP server") - sidebar.AddItem(sourceView, 9, 0, false) + sourceView.SetTitle("Source") + sidebar.AddItem(sourceView, 8, 0, false) leftCol := tview.NewFlex() leftCol.SetDirection(tview.FlexRow) @@ -137,11 +138,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { sourceView.AddItem(leftCol, 9, 0, false) sourceView.AddItem(rightCol, 0, 1, false) - urlHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerURL) - leftCol.AddItem(urlHeaderTextView, 1, 0, false) - urlTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash) - rightCol.AddItem(urlTextView, 1, 0, false) - statusHeaderTextView := tview.NewTextView().SetDynamicColors(true).SetText("[grey]" + headerStatus) leftCol.AddItem(statusHeaderTextView, 1, 0, false) statusTextView := tview.NewTextView().SetDynamicColors(true).SetText("[white]" + dash) @@ -176,13 +172,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { aboutView.SetDirection(tview.FlexRow) aboutView.SetBorder(true) aboutView.SetTitle("Actions") - aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]a[-] Add destination"), 1, 0, false) - aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Del[-] Remove destination"), 1, 0, false) - aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Space[-] Start/stop destination"), 1, 0, false) - aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText(""), 1, 0, false) - aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]u[-] Copy source RTMP URL"), 1, 0, false) - aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false) - aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false) sidebar.AddItem(aboutView, 0, 1, false) @@ -232,7 +221,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { pages: pages, container: container, sourceViews: sourceViews{ - url: urlTextView, status: statusTextView, tracks: tracksTextView, health: healthTextView, @@ -242,6 +230,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { }, destView: destView, noDestView: noDestView, + aboutView: aboutView, pullProgressModal: pullProgressModal, urlsToStartState: make(map[string]startState), } @@ -254,6 +243,30 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { return ui, nil } +func (ui *UI) renderAboutView() { + ui.aboutView.Clear() + + ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]a[-] Add destination"), 1, 0, false) + ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Del[-] Remove destination"), 1, 0, false) + ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]Space[-] Start/stop destination"), 1, 0, false) + ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText(""), 1, 0, false) + + i := 1 + if ui.rtmpURL != "" { + rtmpURLView := tview.NewTextView().SetDynamicColors(true).SetText(fmt.Sprintf("[grey]F%d[-] Copy source RTMP URL", i)) + ui.aboutView.AddItem(rtmpURLView, 1, 0, false) + i++ + } + + if ui.rtmpsURL != "" { + rtmpsURLView := tview.NewTextView().SetDynamicColors(true).SetText(fmt.Sprintf("[grey]F%d[-] Copy source RTMPS URL", i)) + ui.aboutView.AddItem(rtmpsURLView, 1, 0, false) + } + + ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false) + ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false) +} + // C returns a channel that receives commands from the user interface. func (ui *UI) C() <-chan Command { return ui.commandC @@ -283,6 +296,17 @@ func (ui *UI) run(ctx context.Context) { } } +// SetRTMPURLs sets the RTMP and RTMPS URLs for the user interface, which are +// unavailable when the UI is first created. +func (ui *UI) SetRTMPURLs(rtmpURL, rtmpsURL string) { + ui.mu.Lock() + ui.rtmpURL = rtmpURL + ui.rtmpsURL = rtmpsURL + ui.mu.Unlock() + + ui.renderAboutView() +} + func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey { // Special case: handle CTRL-C even when a modal is visible. if event.Key() == tcell.KeyCtrlC { @@ -309,8 +333,6 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey { return nil case ' ': ui.toggleDestination() - case 'u', 'U': - ui.copySourceURLToClipboard(ui.clipboardAvailable) case 'c', 'C': ui.copyConfigFilePathToClipboard(ui.clipboardAvailable, ui.configFilePath) case '?': @@ -318,6 +340,8 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey { case 'k': // tview vim bindings handleKeyUp() } + case tcell.KeyF1, tcell.KeyF2: + ui.fkeyHandler(event.Key()) case tcell.KeyDelete, tcell.KeyBackspace, tcell.KeyBackspace2: ui.removeDestination() return nil @@ -328,11 +352,34 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey { return event } +func (ui *UI) fkeyHandler(key tcell.Key) { + var urls []string + if ui.rtmpURL != "" { + urls = append(urls, ui.rtmpURL) + } + if ui.rtmpsURL != "" { + urls = append(urls, ui.rtmpsURL) + } + + switch key { + case tcell.KeyF1: + if len(urls) == 0 { + return + } + ui.copySourceURLToClipboard(urls[0]) + case tcell.KeyF2: + if len(urls) < 2 { + return + } + ui.copySourceURLToClipboard(urls[1]) + } +} + func (ui *UI) ShowSourceNotLiveModal() { ui.app.QueueUpdateDraw(func() { ui.showModal( - pageNameModalStartupCheck, - fmt.Sprintf("Waiting for stream.\nStart streaming to the source URL then try again:\n\n%s", ui.sourceViews.url.GetText(true)), + pageNameModalNotLive, + "Waiting for stream.\n\nStart streaming to a source URL then try again.", []string{"Ok"}, false, nil, @@ -519,6 +566,7 @@ func (ui *UI) updateProgressModal(container domain.Container) { const ( pageNameMain = "main" pageNameAddDestination = "add-destination" + pageNameViewURLs = "view-urls" pageNameConfigUpdateFailed = "modal-config-update-failed" pageNameNoDestinations = "no-destinations" pageNameModalAbout = "modal-about" @@ -530,6 +578,7 @@ const ( pageNameModalRemoveDestination = "modal-remove-destination" pageNameModalSourceError = "modal-source-error" pageNameModalStartupCheck = "modal-startup-check" + pageNameModalNotLive = "modal-not-live" ) // modalVisible returns true if any modal, including the add destination form, @@ -696,8 +745,6 @@ func (ui *UI) redrawFromState(state domain.AppState) { SetSelectable(false) } - ui.sourceViews.url.SetText(cmp.Or(state.Source.RTMPURL, dash)) - tracks := dash if state.Source.Live && len(state.Source.Tracks) > 0 { tracks = strings.Join(state.Source.Tracks, ", ") @@ -971,13 +1018,12 @@ func (ui *UI) toggleDestination() { } } -func (ui *UI) copySourceURLToClipboard(clipboardAvailable bool) { +func (ui *UI) copySourceURLToClipboard(url string) { var text string - url := ui.sourceViews.url.GetText(true) - if clipboardAvailable { + if ui.clipboardAvailable { clipboard.Write(clipboard.FmtText, []byte(url)) - text = "Source URL copied to clipboard:\n\n" + url + text = "URL copied to clipboard:\n\n" + url } else { text = "Copy to clipboard not available:\n\n" + url }