diff --git a/internal/app/app.go b/internal/app/app.go index 88593c1..9bb6507 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -12,14 +12,13 @@ import ( "git.netflux.io/rob/octoplex/internal/mediaserver" "git.netflux.io/rob/octoplex/internal/multiplexer" "git.netflux.io/rob/octoplex/internal/terminal" - "github.com/gdamore/tcell/v2" ) // RunParams holds the parameters for running the application. type RunParams struct { Config config.Config DockerClient container.DockerClient - Screen tcell.Screen + Screen *terminal.Screen // Screen may be nil. ClipboardAvailable bool ConfigFilePath string BuildInfo domain.BuildInfo diff --git a/internal/app/integration_test.go b/internal/app/integration_test.go index fb5d648..a97c835 100644 --- a/internal/app/integration_test.go +++ b/internal/app/integration_test.go @@ -4,47 +4,161 @@ package app_test import ( "context" + "sync" "testing" "time" "git.netflux.io/rob/octoplex/internal/app" "git.netflux.io/rob/octoplex/internal/config" "git.netflux.io/rob/octoplex/internal/domain" + "git.netflux.io/rob/octoplex/internal/terminal" "git.netflux.io/rob/octoplex/internal/testhelpers" dockerclient "github.com/docker/docker/client" "github.com/gdamore/tcell/v2" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestIntegration(t *testing.T) { - ctx, cancel := context.WithCancel(t.Context()) + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Minute) defer cancel() - logger := testhelpers.NewTestLogger() + logger := testhelpers.NewTestLogger().With("component", "integration") dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv) require.NoError(t, err) + // Fetching the screen contents is tricky at this level of the test pyramid, + // because we need to: + // + // 1. Somehow capture the screen contents, which is only available via the + // tcell.SimulationScreen, and... + // 2. Do so without triggering data races. + // + // We can achieve this by passing a channel into the terminal actor, which + // will send screen captures after each render. This can be stored locally + // and asserted against when needed. + var ( + screenCells []tcell.SimCell + screenWidth int + screenMu sync.Mutex + ) + + getContents := func() []string { + screenMu.Lock() + defer screenMu.Unlock() + + var lines []string + for n, _ := range screenCells { + y := n / screenWidth + + if y > len(lines)-1 { + lines = append(lines, "") + } + lines[y] += string(screenCells[n].Runes[0]) + } + + return lines + } + + screen := tcell.NewSimulationScreen("") + screenCaptureC := make(chan terminal.ScreenCapture, 1) + go func() { + for { + select { + case <-ctx.Done(): + return + case capture := <-screenCaptureC: + screenMu.Lock() + screenCells = capture.Cells + screenWidth = capture.Width + screenMu.Unlock() + } + } + }() + done := make(chan struct{}) go func() { - require.NoError(t, app.Run(ctx, app.RunParams{ - Config: config.Config{}, - DockerClient: dockerClient, - Screen: tcell.NewSimulationScreen(""), + err := app.Run(ctx, app.RunParams{ + Config: config.Config{ + // We use the mediaserver as the destination server, just because it is + // reachable from the docker network via mediaserver:1935. + Destinations: []config.Destination{ + { + Name: "Local server 1", + URL: "rtmp://mediaserver:1935/live/dest1", + }, + { + Name: "Local server 2", + URL: "rtmp://mediaserver:1935/live/dest2", + }, + }, + }, + DockerClient: dockerClient, + Screen: &terminal.Screen{ + Screen: screen, + Width: 160, + Height: 25, + CaptureC: screenCaptureC, + }, ClipboardAvailable: false, BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"}, Logger: logger, - })) + }) + require.NoError(t, err) done <- struct{}{} }() - // For now, just launch the app and wait for a few seconds. - // This is mostly useful to verify there are no obvious data races (when - // running with -race). - // See https://github.com/rivo/tview/wiki/Concurrency. - // - // TODO: test more user journeys. - time.Sleep(time.Second * 5) + // Wait for mediaserver container to start: + time.Sleep(5 * time.Second) + + // Start streaming a test video to the app: + testhelpers.StreamFLV(t, "rtmp://localhost:1935/live") + time.Sleep(10 * time.Second) + + // Start destinations: + screen.PostEvent(tcell.NewEventKey(tcell.KeyRune, ' ', tcell.ModNone)) + screen.PostEvent(tcell.NewEventKey(tcell.KeyDown, ' ', tcell.ModNone)) + screen.PostEvent(tcell.NewEventKey(tcell.KeyRune, ' ', tcell.ModNone)) + + time.Sleep(15 * time.Second) + + contents := getContents() + require.True(t, len(contents) > 2, "expected at least 3 lines of output") + + require.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") + require.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264") + require.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy") + + require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending") + assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy") + + require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present") + assert.Contains(t, contents[3], "sending", "expected local server 2 to be sending") + assert.Contains(t, contents[3], "healthy", "expected local server 2 to be healthy") + + // Stop destinations: + screen.PostEvent(tcell.NewEventKey(tcell.KeyRune, ' ', tcell.ModNone)) + screen.PostEvent(tcell.NewEventKey(tcell.KeyUp, ' ', tcell.ModNone)) + screen.PostEvent(tcell.NewEventKey(tcell.KeyRune, ' ', tcell.ModNone)) + + time.Sleep(10 * time.Second) + + contents = getContents() + require.True(t, len(contents) > 2, "expected at least 3 lines of output") + + require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited") + + require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present") + assert.Contains(t, contents[3], "exited", "expected local server 2 to have exited") + + // TODO: + // - Source error + // - Destination error + // - Additional features (copy URL, etc.) + cancel() <-done diff --git a/internal/mediaserver/integration_test.go b/internal/mediaserver/integration_test.go deleted file mode 100644 index 151a58c..0000000 --- a/internal/mediaserver/integration_test.go +++ /dev/null @@ -1,110 +0,0 @@ -//go:build integration - -package mediaserver_test - -import ( - "fmt" - "testing" - "time" - - "git.netflux.io/rob/octoplex/internal/container" - "git.netflux.io/rob/octoplex/internal/mediaserver" - "git.netflux.io/rob/octoplex/internal/testhelpers" - "github.com/docker/docker/client" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const component = "mediaserver" - -func TestIntegrationMediaServerStartStop(t *testing.T) { - logger := testhelpers.NewTestLogger() - apiClient, err := client.NewClientWithOpts(client.FromEnv) - require.NoError(t, err) - - containerClient, err := container.NewClient(t.Context(), apiClient, logger) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, containerClient.Close()) }) - - running, err := containerClient.ContainerRunning(t.Context(), containerClient.ContainersWithLabels(map[string]string{container.LabelComponent: component})) - require.NoError(t, err) - assert.False(t, running) - - // We need to avoid clashing with other integration tests, e.g. multiplexer. - const ( - apiPort = 9999 - rtmpPort = 1937 - ) - - rtmpURL := fmt.Sprintf("rtmp://localhost:%d/live", rtmpPort) - - mediaServer := mediaserver.StartActor(t.Context(), mediaserver.StartActorParams{ - RTMPPort: rtmpPort, - APIPort: apiPort, - FetchIngressStateInterval: 500 * time.Millisecond, - ChanSize: 1, - ContainerClient: containerClient, - Logger: logger, - }) - require.NoError(t, err) - t.Cleanup(func() { mediaServer.Close() }) - testhelpers.ChanDiscard(mediaServer.C()) - - require.Eventually( - t, - func() bool { - running, err = containerClient.ContainerRunning(t.Context(), containerClient.ContainersWithLabels(map[string]string{container.LabelComponent: component})) - return err == nil && running - }, - time.Second*10, - time.Second, - "container not in RUNNING state", - ) - - state := mediaServer.State() - assert.False(t, state.Live) - assert.Equal(t, rtmpURL, state.RTMPURL) - - testhelpers.StreamFLV(t, rtmpURL) - - require.Eventually( - t, - func() bool { - currState := mediaServer.State() - return currState.Live && - !currState.LiveChangedAt.IsZero() && - currState.Container.HealthState == "healthy" - }, - time.Second*5, - time.Second, - "actor not healthy and/or in LIVE state", - ) - - require.Eventually( - t, - func() bool { - currState := mediaServer.State() - return len(currState.Tracks) == 1 && currState.Tracks[0] == "H264" - }, - time.Second*5, - time.Second, - "tracks not updated", - ) - - require.Eventually( - t, - func() bool { - currState := mediaServer.State() - return currState.Container.RxRate > 500 - }, - time.Second*10, - time.Second, - "rxRate not updated", - ) - - mediaServer.Close() - - running, err = containerClient.ContainerRunning(t.Context(), containerClient.ContainersWithLabels(map[string]string{container.LabelComponent: component})) - require.NoError(t, err) - assert.False(t, running) -} diff --git a/internal/multiplexer/integration_test.go b/internal/multiplexer/integration_test.go deleted file mode 100644 index f06a630..0000000 --- a/internal/multiplexer/integration_test.go +++ /dev/null @@ -1,93 +0,0 @@ -//go:build integration - -package multiplexer_test - -import ( - "testing" - "time" - - "git.netflux.io/rob/octoplex/internal/container" - "git.netflux.io/rob/octoplex/internal/mediaserver" - "git.netflux.io/rob/octoplex/internal/multiplexer" - "git.netflux.io/rob/octoplex/internal/testhelpers" - "github.com/docker/docker/client" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const component = "multiplexer" - -func TestIntegrationMultiplexer(t *testing.T) { - logger := testhelpers.NewTestLogger() - apiClient, err := client.NewClientWithOpts(client.FromEnv) - require.NoError(t, err) - - containerClient, err := container.NewClient(t.Context(), apiClient, logger) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, containerClient.Close()) }) - - running, err := containerClient.ContainerRunning(t.Context(), containerClient.ContainersWithLabels(map[string]string{container.LabelComponent: component})) - require.NoError(t, err) - assert.False(t, running) - - // We need to avoid clashing with other integration tests, e.g. mediaserver. - const ( - apiPort = 9998 - rtmpPort = 19350 - ) - - srv := mediaserver.StartActor(t.Context(), mediaserver.StartActorParams{ - RTMPPort: rtmpPort, - APIPort: apiPort, - FetchIngressStateInterval: 250 * time.Millisecond, - ContainerClient: containerClient, - ChanSize: 1, - Logger: logger, - }) - defer srv.Close() - testhelpers.ChanDiscard(srv.C()) - - time.Sleep(2 * time.Second) - testhelpers.StreamFLV(t, srv.State().RTMPURL) - - require.Eventually( - t, - func() bool { return srv.State().Live }, - time.Second*10, - time.Second, - "source not live", - ) - - mp := multiplexer.NewActor(t.Context(), multiplexer.NewActorParams{ - SourceURL: srv.State().RTMPInternalURL, - ChanSize: 1, - ContainerClient: containerClient, - Logger: logger, - }) - defer mp.Close() - testhelpers.ChanDiscard(mp.C()) - - requireListeners(t, srv, 0) - - mp.StartDestination("rtmp://mediaserver:19350/destination/test1") - mp.StartDestination("rtmp://mediaserver:19350/destination/test2") - mp.StartDestination("rtmp://mediaserver:19350/destination/test3") - requireListeners(t, srv, 3) - - mp.StopDestination("rtmp://mediaserver:19350/destination/test3") - requireListeners(t, srv, 2) - - mp.StopDestination("rtmp://mediaserver:19350/destination/test2") - mp.StopDestination("rtmp://mediaserver:19350/destination/test1") - requireListeners(t, srv, 0) -} - -func requireListeners(t *testing.T, srv *mediaserver.Actor, expected int) { - require.Eventually( - t, - func() bool { return srv.State().Listeners == expected }, - time.Second*10, - time.Second, - "expected %d listeners", expected, - ) -} diff --git a/internal/terminal/terminal.go b/internal/terminal/terminal.go index caedc7a..f2dfd7e 100644 --- a/internal/terminal/terminal.go +++ b/internal/terminal/terminal.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "log/slog" + "slices" "strconv" "strings" "sync" @@ -44,10 +45,12 @@ type UI struct { // tview state - app *tview.Application - pages *tview.Pages - sourceViews sourceViews - destView *tview.Table + app *tview.Application + screen tcell.Screen + screenCaptureC chan<- ScreenCapture + pages *tview.Pages + sourceViews sourceViews + destView *tview.Table // other mutable state @@ -56,6 +59,21 @@ type UI struct { allowQuit bool } +// Screen represents a terminal screen. This includes its desired dimensions, +// which is required to initialize the tcell.SimulationScreen. +type Screen struct { + Screen tcell.Screen + Width, Height int + CaptureC chan<- ScreenCapture +} + +// ScreenCapture represents a screen capture, which is used for integration +// testing with the tcell.SimulationScreen. +type ScreenCapture struct { + Cells []tcell.SimCell + Width, Height int +} + // StartParams contains the parameters for starting a new terminal user // interface. type StartParams struct { @@ -64,7 +82,7 @@ type StartParams struct { ClipboardAvailable bool ConfigFilePath string BuildInfo domain.BuildInfo - Screen tcell.Screen + Screen *Screen // Screen may be nil. } const defaultChanSize = 64 @@ -76,9 +94,17 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { app := tview.NewApplication() - // Allow the tcell screen to be overridden for integration tests. If - // params.Screen is nil, the real terminal is used. - app.SetScreen(params.Screen) + var screen tcell.Screen + var screenCaptureC chan<- ScreenCapture + if params.Screen != nil { + screen = params.Screen.Screen + screenCaptureC = params.Screen.CaptureC + // Allow the tcell screen to be overridden for integration tests. If + // params.Screen is nil, the real terminal is used. + app.SetScreen(screen) + // SetSize must be called after SetScreen: + screen.SetSize(params.Screen.Width, params.Screen.Height) + } sidebar := tview.NewFlex() sidebar.SetDirection(tview.FlexRow) @@ -162,11 +188,13 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { app.EnableMouse(false) ui := &UI{ - commandCh: commandCh, - buildInfo: params.BuildInfo, - logger: params.Logger, - app: app, - pages: pages, + commandCh: commandCh, + buildInfo: params.BuildInfo, + logger: params.Logger, + app: app, + screen: screen, + screenCaptureC: screenCaptureC, + pages: pages, sourceViews: sourceViews{ url: urlTextView, status: statusTextView, @@ -201,6 +229,10 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { return event }) + if ui.screenCaptureC != nil { + app.SetAfterDrawFunc(ui.captureScreen) + } + go ui.run(ctx) return ui, nil @@ -297,6 +329,25 @@ func (ui *UI) AllowQuit() { ui.allowQuit = true } +// captureScreen captures the screen and sends it to the screenCaptureC +// channel, which must have been set in StartParams. +// +// This is required for integration testing because GetContents() must be +// called inside the tview goroutine to avoid data races. +func (ui *UI) captureScreen(screen tcell.Screen) { + simScreen, ok := screen.(tcell.SimulationScreen) + if !ok { + ui.logger.Error("simulation screen not available") + } + + cells, w, h := simScreen.GetContents() + ui.screenCaptureC <- ScreenCapture{ + Cells: slices.Clone(cells), + Width: w, + Height: h, + } +} + // SetState sets the state of the terminal user interface. func (ui *UI) SetState(state domain.AppState) { if state.Source.ExitReason != "" { diff --git a/internal/testhelpers/ffmpeg.go b/internal/testhelpers/ffmpeg.go index c4d50bb..ad756dd 100644 --- a/internal/testhelpers/ffmpeg.go +++ b/internal/testhelpers/ffmpeg.go @@ -27,6 +27,8 @@ func StreamFLV(t *testing.T, destURL string) { "-f", "flv", destURL, ) + // Uncomment to view output: + // cmd.Stderr = os.Stderr require.NoError(t, cmd.Start()) t.Cleanup(func() { diff --git a/mise/config.toml b/mise/config.toml index b1afba8..fdee14e 100644 --- a/mise/config.toml +++ b/mise/config.toml @@ -16,12 +16,12 @@ alias = "ti" [tasks.test_ci] description = "Run tests in CI" dir = "{{cwd}}" -run = "go test -v -count 1 -parallel 1 -race ./..." +run = "go test -v -count 1 -race ./..." [tasks.test_integration_ci] description = "Run integration tests in CI" dir = "{{cwd}}" -run = "go test -v -race -tags=integration -run TestIntegration ./..." +run = "go test -v -count 1 -race -parallel 1 -tags=integration -run TestIntegration ./..." [tasks.lint] description = "Run linters"