fixup! refactor(container): restart handling

This commit is contained in:
Rob Watson 2025-04-13 09:49:16 +02:00
parent 3bcc682df5
commit c999e418f5
3 changed files with 231 additions and 13 deletions

View File

@ -3,7 +3,10 @@
package app_test package app_test
import ( import (
"encoding/json"
"fmt" "fmt"
"io"
"net/http"
"os" "os"
"strings" "strings"
"sync" "sync"
@ -14,6 +17,7 @@ import (
"git.netflux.io/rob/octoplex/internal/terminal" "git.netflux.io/rob/octoplex/internal/terminal"
"github.com/gdamore/tcell/v2" "github.com/gdamore/tcell/v2"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
) )
func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- terminal.ScreenCapture, func() []string) { func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- terminal.ScreenCapture, func() []string) {
@ -128,3 +132,33 @@ func sendBackspaces(screen tcell.SimulationScreen, n int) {
} }
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
} }
// kickFirstRTMPConn kicks the first RTMP connection from the mediaMTX server.
func kickFirstRTMPConn(t *testing.T, srv testcontainers.Container) {
type conn struct {
ID string `json:"id"`
}
type apiResponse struct {
Items []conn `json:"items"`
}
port, err := srv.MappedPort(t.Context(), "9997/tcp")
require.NoError(t, err)
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v3/rtmpconns/list", port.Int()))
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
respBody, err := io.ReadAll(resp.Body)
require.NoError(t, err)
var apiResp apiResponse
require.NoError(t, json.Unmarshal(respBody, &apiResp))
require.NoError(t, err)
require.True(t, len(apiResp.Items) > 0, "No RTMP connections found")
resp, err = http.Post(fmt.Sprintf("http://localhost:%d/v3/rtmpconns/kick/%s", port.Int(), apiResp.Items[0].ID), "application/json", nil)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
}

View File

@ -5,6 +5,7 @@ package app_test
import ( import (
"cmp" "cmp"
"context" "context"
_ "embed"
"errors" "errors"
"fmt" "fmt"
"log/slog" "log/slog"
@ -21,7 +22,6 @@ import (
"git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/terminal" "git.netflux.io/rob/octoplex/internal/terminal"
"git.netflux.io/rob/octoplex/internal/testhelpers" "git.netflux.io/rob/octoplex/internal/testhelpers"
typescontainer "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/network"
dockerclient "github.com/docker/docker/client" dockerclient "github.com/docker/docker/client"
"github.com/docker/docker/errdefs" "github.com/docker/docker/errdefs"
@ -43,6 +43,12 @@ func TestIntegration(t *testing.T) {
}) })
} }
// hostIP is the IP address of the Docker host from within the container.
//
// This probably only works for Linux.
// https://stackoverflow.com/a/60740997/62871
const hostIP = "172.17.0.1"
func testIntegration(t *testing.T, streamKey string) { func testIntegration(t *testing.T, streamKey string) {
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
defer cancel() defer cancel()
@ -68,18 +74,6 @@ func testIntegration(t *testing.T, streamKey string) {
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation()) dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
require.NoError(t, err) require.NoError(t, err)
// List existing containers to debug Github Actions environment.
containers, err := dockerClient.ContainerList(ctx, typescontainer.ListOptions{})
require.NoError(t, err)
if len(containers) == 0 {
logger.Info("No existing containers found")
} else {
for _, ctr := range containers {
logger.Info("Container", "id", ctr.ID, "name", ctr.Names, "image", ctr.Image, "started", ctr.Created, "labels", ctr.Labels)
}
}
screen, screenCaptureC, getContents := setupSimulationScreen(t) screen, screenCaptureC, getContents := setupSimulationScreen(t)
// https://stackoverflow.com/a/60740997/62871 // https://stackoverflow.com/a/60740997/62871
@ -273,6 +267,184 @@ func testIntegration(t *testing.T, streamKey string) {
<-done <-done
} }
func TestIntegrationRestartDestination(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
defer cancel()
destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "bluenviron/mediamtx:latest",
Env: map[string]string{"MTX_LOGLEVEL": "debug"},
ExposedPorts: []string{"1936/tcp", "9997/tcp"},
WaitingFor: wait.ForListeningPort("1936/tcp"),
},
Started: false,
})
testcontainers.CleanupContainer(t, destServer)
require.NoError(t, err)
require.NoError(t, destServer.CopyFileToContainer(t.Context(), "testdata/mediamtx.yml", "/mediamtx.yml", 0600))
require.NoError(t, destServer.Start(ctx))
destServerRTMPPort, err := destServer.MappedPort(ctx, "1936/tcp")
require.NoError(t, err)
logger := testhelpers.NewTestLogger(t).With("component", "integration")
logger.Info("Initialised logger", "debug_level", logger.Enabled(ctx, slog.LevelDebug), "runner_debug", os.Getenv("RUNNER_DEBUG"))
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
require.NoError(t, err)
screen, screenCaptureC, getContents := setupSimulationScreen(t)
configService := setupConfigService(t, config.Config{
Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}},
Destinations: []config.Destination{{
Name: "Local server 1",
URL: fmt.Sprintf("rtmp://%s:%d/live", hostIP, destServerRTMPPort.Int()),
}},
})
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
err := app.Run(ctx, app.RunParams{
ConfigService: configService,
DockerClient: dockerClient,
Screen: &terminal.Screen{
Screen: screen,
Width: 160,
Height: 25,
CaptureC: screenCaptureC,
},
ClipboardAvailable: false,
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
Logger: logger,
})
require.NoError(t, err)
}()
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
contents := getContents()
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
},
2*time.Minute,
time.Second,
"expected the mediaserver to start",
)
printScreen(getContents, "After starting the mediaserver")
// Start streaming a test video to the app:
testhelpers.StreamFLV(t, "rtmp://localhost:1935/live")
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
contents := getContents()
require.True(t, len(contents) > 3, "expected at least 3 lines of output")
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
},
time.Minute,
time.Second,
"expected to receive an ingress stream",
)
printScreen(getContents, "After receiving the ingress stream")
// Start destination:
sendKey(screen, tcell.KeyRune, ' ')
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
contents := getContents()
require.True(t, len(contents) > 4, "expected at least 5 lines of output")
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
},
2*time.Minute,
time.Second,
"expected to start the destination stream",
)
printScreen(getContents, "After starting the destination stream")
// Wait for enough time that the container will be restarted.
// Then, kick the connection to force a restart.
time.Sleep(15 * time.Second)
kickFirstRTMPConn(t, destServer)
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
contents := getContents()
require.True(t, len(contents) > 3, "expected at least 3 lines of output")
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
assert.Contains(t, contents[2], "off-air", "expected local server 1 to be off-air")
assert.Contains(t, contents[2], "restarting", "expected local server 1 to be restarting")
},
20*time.Second,
time.Second,
"expected to begin restarting",
)
printScreen(getContents, "After stopping the destination server")
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
contents := getContents()
require.True(t, len(contents) > 4, "expected at least 4 lines of output")
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
},
2*time.Minute,
time.Second,
"expected to restart the destination stream",
)
printScreen(getContents, "After restarting the destination stream")
// Stop destination.
sendKey(screen, tcell.KeyRune, ' ')
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
contents := getContents()
require.True(t, len(contents) > 4, "expected at least 4 lines of output")
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited")
require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present")
},
time.Minute,
time.Second,
"expected to stop the destination stream",
)
printScreen(getContents, "After stopping the destination")
cancel()
<-done
}
func TestIntegrationStartDestinationFailed(t *testing.T) { func TestIntegrationStartDestinationFailed(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
defer cancel() defer cancel()

12
internal/app/testdata/mediamtx.yml vendored Normal file
View File

@ -0,0 +1,12 @@
rtmp: true
rtmpAddress: :1936
api: true
authInternalUsers:
- user: any
ips: []
permissions:
- action: api
- action: read
- action: publish
paths:
live: