test(integration): extend app test

This commit is contained in:
Rob Watson 2025-03-15 22:31:45 +01:00
parent 65db62166e
commit 9314506c75
7 changed files with 197 additions and 234 deletions

View File

@ -12,14 +12,13 @@ import (
"git.netflux.io/rob/octoplex/internal/mediaserver"
"git.netflux.io/rob/octoplex/internal/multiplexer"
"git.netflux.io/rob/octoplex/internal/terminal"
"github.com/gdamore/tcell/v2"
)
// RunParams holds the parameters for running the application.
type RunParams struct {
Config config.Config
DockerClient container.DockerClient
Screen tcell.Screen
Screen *terminal.Screen // Screen may be nil.
ClipboardAvailable bool
ConfigFilePath string
BuildInfo domain.BuildInfo

View File

@ -4,47 +4,161 @@ package app_test
import (
"context"
"sync"
"testing"
"time"
"git.netflux.io/rob/octoplex/internal/app"
"git.netflux.io/rob/octoplex/internal/config"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/terminal"
"git.netflux.io/rob/octoplex/internal/testhelpers"
dockerclient "github.com/docker/docker/client"
"github.com/gdamore/tcell/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestIntegration(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
ctx, cancel := context.WithTimeout(t.Context(), 2*time.Minute)
defer cancel()
logger := testhelpers.NewTestLogger()
logger := testhelpers.NewTestLogger().With("component", "integration")
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv)
require.NoError(t, err)
// Fetching the screen contents is tricky at this level of the test pyramid,
// because we need to:
//
// 1. Somehow capture the screen contents, which is only available via the
// tcell.SimulationScreen, and...
// 2. Do so without triggering data races.
//
// We can achieve this by passing a channel into the terminal actor, which
// will send screen captures after each render. This can be stored locally
// and asserted against when needed.
var (
screenCells []tcell.SimCell
screenWidth int
screenMu sync.Mutex
)
getContents := func() []string {
screenMu.Lock()
defer screenMu.Unlock()
var lines []string
for n, _ := range screenCells {
y := n / screenWidth
if y > len(lines)-1 {
lines = append(lines, "")
}
lines[y] += string(screenCells[n].Runes[0])
}
return lines
}
screen := tcell.NewSimulationScreen("")
screenCaptureC := make(chan terminal.ScreenCapture, 1)
go func() {
for {
select {
case <-ctx.Done():
return
case capture := <-screenCaptureC:
screenMu.Lock()
screenCells = capture.Cells
screenWidth = capture.Width
screenMu.Unlock()
}
}
}()
done := make(chan struct{})
go func() {
require.NoError(t, app.Run(ctx, app.RunParams{
Config: config.Config{},
DockerClient: dockerClient,
Screen: tcell.NewSimulationScreen(""),
err := app.Run(ctx, app.RunParams{
Config: config.Config{
// We use the mediaserver as the destination server, just because it is
// reachable from the docker network via mediaserver:1935.
Destinations: []config.Destination{
{
Name: "Local server 1",
URL: "rtmp://mediaserver:1935/live/dest1",
},
{
Name: "Local server 2",
URL: "rtmp://mediaserver:1935/live/dest2",
},
},
},
DockerClient: dockerClient,
Screen: &terminal.Screen{
Screen: screen,
Width: 160,
Height: 25,
CaptureC: screenCaptureC,
},
ClipboardAvailable: false,
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
Logger: logger,
}))
})
require.NoError(t, err)
done <- struct{}{}
}()
// For now, just launch the app and wait for a few seconds.
// This is mostly useful to verify there are no obvious data races (when
// running with -race).
// See https://github.com/rivo/tview/wiki/Concurrency.
//
// TODO: test more user journeys.
time.Sleep(time.Second * 5)
// Wait for mediaserver container to start:
time.Sleep(5 * time.Second)
// Start streaming a test video to the app:
testhelpers.StreamFLV(t, "rtmp://localhost:1935/live")
time.Sleep(10 * time.Second)
// Start destinations:
screen.PostEvent(tcell.NewEventKey(tcell.KeyRune, ' ', tcell.ModNone))
screen.PostEvent(tcell.NewEventKey(tcell.KeyDown, ' ', tcell.ModNone))
screen.PostEvent(tcell.NewEventKey(tcell.KeyRune, ' ', tcell.ModNone))
time.Sleep(15 * time.Second)
contents := getContents()
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
require.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
require.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
require.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present")
assert.Contains(t, contents[3], "sending", "expected local server 2 to be sending")
assert.Contains(t, contents[3], "healthy", "expected local server 2 to be healthy")
// Stop destinations:
screen.PostEvent(tcell.NewEventKey(tcell.KeyRune, ' ', tcell.ModNone))
screen.PostEvent(tcell.NewEventKey(tcell.KeyUp, ' ', tcell.ModNone))
screen.PostEvent(tcell.NewEventKey(tcell.KeyRune, ' ', tcell.ModNone))
time.Sleep(10 * time.Second)
contents = getContents()
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited")
require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present")
assert.Contains(t, contents[3], "exited", "expected local server 2 to have exited")
// TODO:
// - Source error
// - Destination error
// - Additional features (copy URL, etc.)
cancel()
<-done

View File

@ -1,110 +0,0 @@
//go:build integration
package mediaserver_test
import (
"fmt"
"testing"
"time"
"git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/mediaserver"
"git.netflux.io/rob/octoplex/internal/testhelpers"
"github.com/docker/docker/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const component = "mediaserver"
func TestIntegrationMediaServerStartStop(t *testing.T) {
logger := testhelpers.NewTestLogger()
apiClient, err := client.NewClientWithOpts(client.FromEnv)
require.NoError(t, err)
containerClient, err := container.NewClient(t.Context(), apiClient, logger)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, containerClient.Close()) })
running, err := containerClient.ContainerRunning(t.Context(), containerClient.ContainersWithLabels(map[string]string{container.LabelComponent: component}))
require.NoError(t, err)
assert.False(t, running)
// We need to avoid clashing with other integration tests, e.g. multiplexer.
const (
apiPort = 9999
rtmpPort = 1937
)
rtmpURL := fmt.Sprintf("rtmp://localhost:%d/live", rtmpPort)
mediaServer := mediaserver.StartActor(t.Context(), mediaserver.StartActorParams{
RTMPPort: rtmpPort,
APIPort: apiPort,
FetchIngressStateInterval: 500 * time.Millisecond,
ChanSize: 1,
ContainerClient: containerClient,
Logger: logger,
})
require.NoError(t, err)
t.Cleanup(func() { mediaServer.Close() })
testhelpers.ChanDiscard(mediaServer.C())
require.Eventually(
t,
func() bool {
running, err = containerClient.ContainerRunning(t.Context(), containerClient.ContainersWithLabels(map[string]string{container.LabelComponent: component}))
return err == nil && running
},
time.Second*10,
time.Second,
"container not in RUNNING state",
)
state := mediaServer.State()
assert.False(t, state.Live)
assert.Equal(t, rtmpURL, state.RTMPURL)
testhelpers.StreamFLV(t, rtmpURL)
require.Eventually(
t,
func() bool {
currState := mediaServer.State()
return currState.Live &&
!currState.LiveChangedAt.IsZero() &&
currState.Container.HealthState == "healthy"
},
time.Second*5,
time.Second,
"actor not healthy and/or in LIVE state",
)
require.Eventually(
t,
func() bool {
currState := mediaServer.State()
return len(currState.Tracks) == 1 && currState.Tracks[0] == "H264"
},
time.Second*5,
time.Second,
"tracks not updated",
)
require.Eventually(
t,
func() bool {
currState := mediaServer.State()
return currState.Container.RxRate > 500
},
time.Second*10,
time.Second,
"rxRate not updated",
)
mediaServer.Close()
running, err = containerClient.ContainerRunning(t.Context(), containerClient.ContainersWithLabels(map[string]string{container.LabelComponent: component}))
require.NoError(t, err)
assert.False(t, running)
}

View File

@ -1,93 +0,0 @@
//go:build integration
package multiplexer_test
import (
"testing"
"time"
"git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/mediaserver"
"git.netflux.io/rob/octoplex/internal/multiplexer"
"git.netflux.io/rob/octoplex/internal/testhelpers"
"github.com/docker/docker/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const component = "multiplexer"
func TestIntegrationMultiplexer(t *testing.T) {
logger := testhelpers.NewTestLogger()
apiClient, err := client.NewClientWithOpts(client.FromEnv)
require.NoError(t, err)
containerClient, err := container.NewClient(t.Context(), apiClient, logger)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, containerClient.Close()) })
running, err := containerClient.ContainerRunning(t.Context(), containerClient.ContainersWithLabels(map[string]string{container.LabelComponent: component}))
require.NoError(t, err)
assert.False(t, running)
// We need to avoid clashing with other integration tests, e.g. mediaserver.
const (
apiPort = 9998
rtmpPort = 19350
)
srv := mediaserver.StartActor(t.Context(), mediaserver.StartActorParams{
RTMPPort: rtmpPort,
APIPort: apiPort,
FetchIngressStateInterval: 250 * time.Millisecond,
ContainerClient: containerClient,
ChanSize: 1,
Logger: logger,
})
defer srv.Close()
testhelpers.ChanDiscard(srv.C())
time.Sleep(2 * time.Second)
testhelpers.StreamFLV(t, srv.State().RTMPURL)
require.Eventually(
t,
func() bool { return srv.State().Live },
time.Second*10,
time.Second,
"source not live",
)
mp := multiplexer.NewActor(t.Context(), multiplexer.NewActorParams{
SourceURL: srv.State().RTMPInternalURL,
ChanSize: 1,
ContainerClient: containerClient,
Logger: logger,
})
defer mp.Close()
testhelpers.ChanDiscard(mp.C())
requireListeners(t, srv, 0)
mp.StartDestination("rtmp://mediaserver:19350/destination/test1")
mp.StartDestination("rtmp://mediaserver:19350/destination/test2")
mp.StartDestination("rtmp://mediaserver:19350/destination/test3")
requireListeners(t, srv, 3)
mp.StopDestination("rtmp://mediaserver:19350/destination/test3")
requireListeners(t, srv, 2)
mp.StopDestination("rtmp://mediaserver:19350/destination/test2")
mp.StopDestination("rtmp://mediaserver:19350/destination/test1")
requireListeners(t, srv, 0)
}
func requireListeners(t *testing.T, srv *mediaserver.Actor, expected int) {
require.Eventually(
t,
func() bool { return srv.State().Listeners == expected },
time.Second*10,
time.Second,
"expected %d listeners", expected,
)
}

View File

@ -5,6 +5,7 @@ import (
"context"
"fmt"
"log/slog"
"slices"
"strconv"
"strings"
"sync"
@ -44,10 +45,12 @@ type UI struct {
// tview state
app *tview.Application
pages *tview.Pages
sourceViews sourceViews
destView *tview.Table
app *tview.Application
screen tcell.Screen
screenCaptureC chan<- ScreenCapture
pages *tview.Pages
sourceViews sourceViews
destView *tview.Table
// other mutable state
@ -56,6 +59,21 @@ type UI struct {
allowQuit bool
}
// Screen represents a terminal screen. This includes its desired dimensions,
// which is required to initialize the tcell.SimulationScreen.
type Screen struct {
Screen tcell.Screen
Width, Height int
CaptureC chan<- ScreenCapture
}
// ScreenCapture represents a screen capture, which is used for integration
// testing with the tcell.SimulationScreen.
type ScreenCapture struct {
Cells []tcell.SimCell
Width, Height int
}
// StartParams contains the parameters for starting a new terminal user
// interface.
type StartParams struct {
@ -64,7 +82,7 @@ type StartParams struct {
ClipboardAvailable bool
ConfigFilePath string
BuildInfo domain.BuildInfo
Screen tcell.Screen
Screen *Screen // Screen may be nil.
}
const defaultChanSize = 64
@ -76,9 +94,17 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
app := tview.NewApplication()
// Allow the tcell screen to be overridden for integration tests. If
// params.Screen is nil, the real terminal is used.
app.SetScreen(params.Screen)
var screen tcell.Screen
var screenCaptureC chan<- ScreenCapture
if params.Screen != nil {
screen = params.Screen.Screen
screenCaptureC = params.Screen.CaptureC
// Allow the tcell screen to be overridden for integration tests. If
// params.Screen is nil, the real terminal is used.
app.SetScreen(screen)
// SetSize must be called after SetScreen:
screen.SetSize(params.Screen.Width, params.Screen.Height)
}
sidebar := tview.NewFlex()
sidebar.SetDirection(tview.FlexRow)
@ -162,11 +188,13 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
app.EnableMouse(false)
ui := &UI{
commandCh: commandCh,
buildInfo: params.BuildInfo,
logger: params.Logger,
app: app,
pages: pages,
commandCh: commandCh,
buildInfo: params.BuildInfo,
logger: params.Logger,
app: app,
screen: screen,
screenCaptureC: screenCaptureC,
pages: pages,
sourceViews: sourceViews{
url: urlTextView,
status: statusTextView,
@ -201,6 +229,10 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
return event
})
if ui.screenCaptureC != nil {
app.SetAfterDrawFunc(ui.captureScreen)
}
go ui.run(ctx)
return ui, nil
@ -297,6 +329,25 @@ func (ui *UI) AllowQuit() {
ui.allowQuit = true
}
// captureScreen captures the screen and sends it to the screenCaptureC
// channel, which must have been set in StartParams.
//
// This is required for integration testing because GetContents() must be
// called inside the tview goroutine to avoid data races.
func (ui *UI) captureScreen(screen tcell.Screen) {
simScreen, ok := screen.(tcell.SimulationScreen)
if !ok {
ui.logger.Error("simulation screen not available")
}
cells, w, h := simScreen.GetContents()
ui.screenCaptureC <- ScreenCapture{
Cells: slices.Clone(cells),
Width: w,
Height: h,
}
}
// SetState sets the state of the terminal user interface.
func (ui *UI) SetState(state domain.AppState) {
if state.Source.ExitReason != "" {

View File

@ -27,6 +27,8 @@ func StreamFLV(t *testing.T, destURL string) {
"-f", "flv",
destURL,
)
// Uncomment to view output:
// cmd.Stderr = os.Stderr
require.NoError(t, cmd.Start())
t.Cleanup(func() {

View File

@ -16,12 +16,12 @@ alias = "ti"
[tasks.test_ci]
description = "Run tests in CI"
dir = "{{cwd}}"
run = "go test -v -count 1 -parallel 1 -race ./..."
run = "go test -v -count 1 -race ./..."
[tasks.test_integration_ci]
description = "Run integration tests in CI"
dir = "{{cwd}}"
run = "go test -v -race -tags=integration -run TestIntegration ./..."
run = "go test -v -count 1 -race -parallel 1 -tags=integration -run TestIntegration ./..."
[tasks.lint]
description = "Run linters"