From c999e418f53bcbcf0e27192a021f3e0f463e8e9b Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Sun, 13 Apr 2025 09:49:16 +0200 Subject: [PATCH] fixup! refactor(container): restart handling --- internal/app/integration_helpers_test.go | 34 ++++ internal/app/integration_test.go | 198 +++++++++++++++++++++-- internal/app/testdata/mediamtx.yml | 12 ++ 3 files changed, 231 insertions(+), 13 deletions(-) create mode 100644 internal/app/testdata/mediamtx.yml diff --git a/internal/app/integration_helpers_test.go b/internal/app/integration_helpers_test.go index b93cd8e..e4427e2 100644 --- a/internal/app/integration_helpers_test.go +++ b/internal/app/integration_helpers_test.go @@ -3,7 +3,10 @@ package app_test import ( + "encoding/json" "fmt" + "io" + "net/http" "os" "strings" "sync" @@ -14,6 +17,7 @@ import ( "git.netflux.io/rob/octoplex/internal/terminal" "github.com/gdamore/tcell/v2" "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" ) func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- terminal.ScreenCapture, func() []string) { @@ -128,3 +132,33 @@ func sendBackspaces(screen tcell.SimulationScreen, n int) { } time.Sleep(500 * time.Millisecond) } + +// kickFirstRTMPConn kicks the first RTMP connection from the mediaMTX server. +func kickFirstRTMPConn(t *testing.T, srv testcontainers.Container) { + type conn struct { + ID string `json:"id"` + } + + type apiResponse struct { + Items []conn `json:"items"` + } + + port, err := srv.MappedPort(t.Context(), "9997/tcp") + require.NoError(t, err) + + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v3/rtmpconns/list", port.Int())) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + respBody, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + var apiResp apiResponse + require.NoError(t, json.Unmarshal(respBody, &apiResp)) + require.NoError(t, err) + require.True(t, len(apiResp.Items) > 0, "No RTMP connections found") + + resp, err = http.Post(fmt.Sprintf("http://localhost:%d/v3/rtmpconns/kick/%s", port.Int(), apiResp.Items[0].ID), "application/json", nil) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) +} diff --git a/internal/app/integration_test.go b/internal/app/integration_test.go index eb0ae57..3b924bb 100644 --- a/internal/app/integration_test.go +++ b/internal/app/integration_test.go @@ -5,6 +5,7 @@ package app_test import ( "cmp" "context" + _ "embed" "errors" "fmt" "log/slog" @@ -21,7 +22,6 @@ import ( "git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/terminal" "git.netflux.io/rob/octoplex/internal/testhelpers" - typescontainer "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" dockerclient "github.com/docker/docker/client" "github.com/docker/docker/errdefs" @@ -43,6 +43,12 @@ func TestIntegration(t *testing.T) { }) } +// hostIP is the IP address of the Docker host from within the container. +// +// This probably only works for Linux. +// https://stackoverflow.com/a/60740997/62871 +const hostIP = "172.17.0.1" + func testIntegration(t *testing.T, streamKey string) { ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) defer cancel() @@ -68,18 +74,6 @@ func testIntegration(t *testing.T, streamKey string) { dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation()) require.NoError(t, err) - // List existing containers to debug Github Actions environment. - containers, err := dockerClient.ContainerList(ctx, typescontainer.ListOptions{}) - require.NoError(t, err) - - if len(containers) == 0 { - logger.Info("No existing containers found") - } else { - for _, ctr := range containers { - logger.Info("Container", "id", ctr.ID, "name", ctr.Names, "image", ctr.Image, "started", ctr.Created, "labels", ctr.Labels) - } - } - screen, screenCaptureC, getContents := setupSimulationScreen(t) // https://stackoverflow.com/a/60740997/62871 @@ -273,6 +267,184 @@ func testIntegration(t *testing.T, streamKey string) { <-done } +func TestIntegrationRestartDestination(t *testing.T) { + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) + defer cancel() + + destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "bluenviron/mediamtx:latest", + Env: map[string]string{"MTX_LOGLEVEL": "debug"}, + ExposedPorts: []string{"1936/tcp", "9997/tcp"}, + WaitingFor: wait.ForListeningPort("1936/tcp"), + }, + Started: false, + }) + testcontainers.CleanupContainer(t, destServer) + require.NoError(t, err) + + require.NoError(t, destServer.CopyFileToContainer(t.Context(), "testdata/mediamtx.yml", "/mediamtx.yml", 0600)) + require.NoError(t, destServer.Start(ctx)) + + destServerRTMPPort, err := destServer.MappedPort(ctx, "1936/tcp") + require.NoError(t, err) + + logger := testhelpers.NewTestLogger(t).With("component", "integration") + logger.Info("Initialised logger", "debug_level", logger.Enabled(ctx, slog.LevelDebug), "runner_debug", os.Getenv("RUNNER_DEBUG")) + dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation()) + require.NoError(t, err) + + screen, screenCaptureC, getContents := setupSimulationScreen(t) + + configService := setupConfigService(t, config.Config{ + Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}}, + Destinations: []config.Destination{{ + Name: "Local server 1", + URL: fmt.Sprintf("rtmp://%s:%d/live", hostIP, destServerRTMPPort.Int()), + }}, + }) + + done := make(chan struct{}) + go func() { + defer func() { + done <- struct{}{} + }() + + err := app.Run(ctx, app.RunParams{ + ConfigService: configService, + DockerClient: dockerClient, + Screen: &terminal.Screen{ + Screen: screen, + Width: 160, + Height: 25, + CaptureC: screenCaptureC, + }, + ClipboardAvailable: false, + BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"}, + Logger: logger, + }) + require.NoError(t, err) + }() + + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + contents := getContents() + require.True(t, len(contents) > 2, "expected at least 3 lines of output") + + assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting") + }, + 2*time.Minute, + time.Second, + "expected the mediaserver to start", + ) + printScreen(getContents, "After starting the mediaserver") + + // Start streaming a test video to the app: + testhelpers.StreamFLV(t, "rtmp://localhost:1935/live") + + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + contents := getContents() + require.True(t, len(contents) > 3, "expected at least 3 lines of output") + + assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") + }, + time.Minute, + time.Second, + "expected to receive an ingress stream", + ) + printScreen(getContents, "After receiving the ingress stream") + + // Start destination: + sendKey(screen, tcell.KeyRune, ' ') + + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + contents := getContents() + require.True(t, len(contents) > 4, "expected at least 5 lines of output") + + assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") + + require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending") + assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy") + }, + 2*time.Minute, + time.Second, + "expected to start the destination stream", + ) + printScreen(getContents, "After starting the destination stream") + + // Wait for enough time that the container will be restarted. + // Then, kick the connection to force a restart. + time.Sleep(15 * time.Second) + kickFirstRTMPConn(t, destServer) + + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + contents := getContents() + require.True(t, len(contents) > 3, "expected at least 3 lines of output") + + assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") + + require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(t, contents[2], "off-air", "expected local server 1 to be off-air") + assert.Contains(t, contents[2], "restarting", "expected local server 1 to be restarting") + }, + 20*time.Second, + time.Second, + "expected to begin restarting", + ) + printScreen(getContents, "After stopping the destination server") + + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + contents := getContents() + require.True(t, len(contents) > 4, "expected at least 4 lines of output") + + assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") + + require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending") + assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy") + }, + 2*time.Minute, + time.Second, + "expected to restart the destination stream", + ) + printScreen(getContents, "After restarting the destination stream") + + // Stop destination. + sendKey(screen, tcell.KeyRune, ' ') + + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + contents := getContents() + require.True(t, len(contents) > 4, "expected at least 4 lines of output") + + require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited") + + require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present") + }, + time.Minute, + time.Second, + "expected to stop the destination stream", + ) + + printScreen(getContents, "After stopping the destination") + + cancel() + + <-done +} + func TestIntegrationStartDestinationFailed(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) defer cancel() diff --git a/internal/app/testdata/mediamtx.yml b/internal/app/testdata/mediamtx.yml new file mode 100644 index 0000000..f0f2386 --- /dev/null +++ b/internal/app/testdata/mediamtx.yml @@ -0,0 +1,12 @@ +rtmp: true +rtmpAddress: :1936 +api: true +authInternalUsers: +- user: any + ips: [] + permissions: + - action: api + - action: read + - action: publish +paths: + live: