491 lines
15 KiB
Go
491 lines
15 KiB
Go
//go:build integration
|
|
|
|
package app_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"git.netflux.io/rob/octoplex/internal/app"
|
|
"git.netflux.io/rob/octoplex/internal/config"
|
|
"git.netflux.io/rob/octoplex/internal/domain"
|
|
"git.netflux.io/rob/octoplex/internal/terminal"
|
|
"git.netflux.io/rob/octoplex/internal/testhelpers"
|
|
dockerclient "github.com/docker/docker/client"
|
|
"github.com/gdamore/tcell/v2"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/testcontainers/testcontainers-go"
|
|
"github.com/testcontainers/testcontainers-go/wait"
|
|
)
|
|
|
|
func TestIntegration(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
|
defer cancel()
|
|
|
|
destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
|
|
ContainerRequest: testcontainers.ContainerRequest{
|
|
Image: "bluenviron/mediamtx:latest",
|
|
Env: map[string]string{"MTX_RTMPADDRESS": ":1936"},
|
|
ExposedPorts: []string{"1936/tcp"},
|
|
WaitingFor: wait.ForListeningPort("1936/tcp"),
|
|
},
|
|
Started: true,
|
|
})
|
|
testcontainers.CleanupContainer(t, destServer)
|
|
require.NoError(t, err)
|
|
destServerPort, err := destServer.MappedPort(ctx, "1936/tcp")
|
|
require.NoError(t, err)
|
|
|
|
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
|
logger.Info("Initialised logger", "debug_level", logger.Enabled(ctx, slog.LevelDebug), "runner_debug", os.Getenv("RUNNER_DEBUG"))
|
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv)
|
|
require.NoError(t, err)
|
|
|
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
|
|
|
// https://stackoverflow.com/a/60740997/62871
|
|
if runtime.GOOS != "linux" {
|
|
panic("TODO: try host.docker.internal or Mac equivalent here")
|
|
}
|
|
const destHost = "172.17.0.1"
|
|
|
|
destURL1 := fmt.Sprintf("rtmp://%s:%d/live/dest1", destHost, destServerPort.Int())
|
|
destURL2 := fmt.Sprintf("rtmp://%s:%d/live/dest2", destHost, destServerPort.Int())
|
|
configService := setupConfigService(t, config.Config{
|
|
Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true, StreamKey: "live"}},
|
|
// Load one destination from config, add the other in-app.
|
|
Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}},
|
|
})
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
err := app.Run(ctx, app.RunParams{
|
|
ConfigService: configService,
|
|
DockerClient: dockerClient,
|
|
Screen: &terminal.Screen{
|
|
Screen: screen,
|
|
Width: 160,
|
|
Height: 25,
|
|
CaptureC: screenCaptureC,
|
|
},
|
|
ClipboardAvailable: false,
|
|
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
|
|
Logger: logger,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
done <- struct{}{}
|
|
}()
|
|
|
|
require.EventuallyWithT(
|
|
t,
|
|
func(t *assert.CollectT) {
|
|
contents := getContents()
|
|
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
|
|
assert.Contains(t, contents[2], "Status waiting", "expected mediaserver status to be waiting")
|
|
},
|
|
2*time.Minute,
|
|
time.Second,
|
|
"expected the mediaserver to start",
|
|
)
|
|
printScreen(getContents, "After starting the mediaserver")
|
|
|
|
// Start streaming a test video to the app:
|
|
testhelpers.StreamFLV(t, "rtmp://localhost:1935/live")
|
|
|
|
require.EventuallyWithT(
|
|
t,
|
|
func(t *assert.CollectT) {
|
|
contents := getContents()
|
|
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
|
|
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
|
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
|
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
|
},
|
|
time.Minute,
|
|
time.Second,
|
|
"expected to receive an ingress stream",
|
|
)
|
|
printScreen(getContents, "After receiving the ingress stream")
|
|
|
|
// Add a second destination in-app:
|
|
sendKey(screen, tcell.KeyRune, 'a')
|
|
|
|
sendBackspaces(screen, 30)
|
|
sendKeys(screen, "Local server 2")
|
|
sendKey(screen, tcell.KeyTab, ' ')
|
|
|
|
sendBackspaces(screen, 30)
|
|
sendKeys(screen, destURL2)
|
|
sendKey(screen, tcell.KeyTab, ' ')
|
|
sendKey(screen, tcell.KeyEnter, ' ')
|
|
|
|
require.EventuallyWithT(
|
|
t,
|
|
func(t *assert.CollectT) {
|
|
contents := getContents()
|
|
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
|
|
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
|
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
|
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
|
|
|
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
|
assert.Contains(t, contents[2], "off-air", "expected local server 1 to be off-air")
|
|
|
|
require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present")
|
|
assert.Contains(t, contents[3], "off-air", "expected local server 2 to be off-air")
|
|
|
|
},
|
|
2*time.Minute,
|
|
time.Second,
|
|
"expected to add the destinations",
|
|
)
|
|
printScreen(getContents, "After adding the destinations")
|
|
|
|
// Start destinations:
|
|
sendKey(screen, tcell.KeyRune, ' ')
|
|
sendKey(screen, tcell.KeyDown, ' ')
|
|
sendKey(screen, tcell.KeyRune, ' ')
|
|
|
|
require.EventuallyWithT(
|
|
t,
|
|
func(t *assert.CollectT) {
|
|
contents := getContents()
|
|
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
|
|
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
|
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
|
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
|
|
|
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
|
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
|
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
|
|
|
require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present")
|
|
assert.Contains(t, contents[3], "sending", "expected local server 2 to be sending")
|
|
assert.Contains(t, contents[3], "healthy", "expected local server 2 to be healthy")
|
|
},
|
|
2*time.Minute,
|
|
time.Second,
|
|
"expected to start the destination streams",
|
|
)
|
|
printScreen(getContents, "After starting the destination streams")
|
|
|
|
sendKey(screen, tcell.KeyRune, 'r')
|
|
sendKey(screen, tcell.KeyEnter, ' ')
|
|
|
|
require.EventuallyWithT(
|
|
t,
|
|
func(t *assert.CollectT) {
|
|
contents := getContents()
|
|
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
|
|
assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving")
|
|
assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264")
|
|
assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy")
|
|
|
|
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
|
assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending")
|
|
assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy")
|
|
|
|
require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present")
|
|
|
|
},
|
|
2*time.Minute,
|
|
time.Second,
|
|
"expected to remove the second destination",
|
|
)
|
|
printScreen(getContents, "After removing the second destination")
|
|
|
|
// Stop remaining destination.
|
|
// It is currently necessary to press down to re-focus the destination:
|
|
sendKey(screen, tcell.KeyDown, ' ')
|
|
sendKey(screen, tcell.KeyRune, ' ')
|
|
|
|
require.EventuallyWithT(
|
|
t,
|
|
func(t *assert.CollectT) {
|
|
contents := getContents()
|
|
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
|
|
require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present")
|
|
assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited")
|
|
|
|
require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present")
|
|
},
|
|
time.Minute,
|
|
time.Second,
|
|
"expected to stop the first destination stream",
|
|
)
|
|
|
|
printScreen(getContents, "After stopping the first destination")
|
|
|
|
// TODO:
|
|
// - Source error
|
|
// - Destination error
|
|
// - Additional features (copy URL, etc.)
|
|
|
|
cancel()
|
|
|
|
<-done
|
|
}
|
|
|
|
func TestIntegrationDestinationValidations(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
|
defer cancel()
|
|
|
|
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv)
|
|
require.NoError(t, err)
|
|
|
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
|
|
|
configService := setupConfigService(t, config.Config{
|
|
Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true, StreamKey: "live"}},
|
|
})
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
err := app.Run(ctx, app.RunParams{
|
|
ConfigService: configService,
|
|
DockerClient: dockerClient,
|
|
Screen: &terminal.Screen{
|
|
Screen: screen,
|
|
Width: 160,
|
|
Height: 25,
|
|
CaptureC: screenCaptureC,
|
|
},
|
|
ClipboardAvailable: false,
|
|
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
|
|
Logger: logger,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
done <- struct{}{}
|
|
}()
|
|
|
|
require.EventuallyWithT(
|
|
t,
|
|
func(t *assert.CollectT) {
|
|
contents := getContents()
|
|
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
|
|
assert.Contains(t, contents[2], "Status waiting", "expected mediaserver status to be waiting")
|
|
},
|
|
2*time.Minute,
|
|
time.Second,
|
|
"expected the mediaserver to start",
|
|
)
|
|
printScreen(getContents, "After starting the mediaserver")
|
|
|
|
sendKey(screen, tcell.KeyRune, 'a')
|
|
sendKey(screen, tcell.KeyTab, ' ')
|
|
sendBackspaces(screen, 10)
|
|
sendKey(screen, tcell.KeyTab, ' ')
|
|
sendKey(screen, tcell.KeyEnter, ' ')
|
|
|
|
require.EventuallyWithT(
|
|
t,
|
|
func(t *assert.CollectT) {
|
|
contents := getContents()
|
|
|
|
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
|
assert.True(t, contentsIncludes(contents, "validate: destination URL must start with"), "expected to see config update error")
|
|
},
|
|
10*time.Second,
|
|
time.Second,
|
|
"expected a validation error for an empty URL",
|
|
)
|
|
printScreen(getContents, "After entering an empty destination URL")
|
|
|
|
sendKey(screen, tcell.KeyEnter, ' ')
|
|
sendKey(screen, tcell.KeyBacktab, ' ')
|
|
sendKeys(screen, "nope")
|
|
sendKey(screen, tcell.KeyTab, ' ')
|
|
sendKey(screen, tcell.KeyEnter, ' ')
|
|
|
|
require.EventuallyWithT(
|
|
t,
|
|
func(t *assert.CollectT) {
|
|
contents := getContents()
|
|
|
|
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
|
assert.True(t, contentsIncludes(contents, "validate: destination URL must start with"), "expected to see config update error")
|
|
},
|
|
10*time.Second,
|
|
time.Second,
|
|
"expected a validation error for an invalid URL",
|
|
)
|
|
printScreen(getContents, "After entering an invalid destination URL")
|
|
|
|
sendKey(screen, tcell.KeyEnter, ' ')
|
|
sendKey(screen, tcell.KeyBacktab, ' ')
|
|
sendBackspaces(screen, len("nope"))
|
|
sendKeys(screen, "rtmp://rtmp.youtube.com:1935/live")
|
|
sendKey(screen, tcell.KeyTab, ' ')
|
|
sendKey(screen, tcell.KeyEnter, ' ')
|
|
|
|
require.EventuallyWithT(
|
|
t,
|
|
func(t *assert.CollectT) {
|
|
contents := getContents()
|
|
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
|
|
|
|
require.Contains(t, contents[2], "My stream", "expected new destination to be present")
|
|
assert.Contains(t, contents[2], "off-air", "expected new destination to be off-air")
|
|
},
|
|
10*time.Second,
|
|
time.Second,
|
|
"expected to add the destination",
|
|
)
|
|
printScreen(getContents, "After adding the destination")
|
|
|
|
sendKey(screen, tcell.KeyRune, 'a')
|
|
sendKey(screen, tcell.KeyTab, ' ')
|
|
sendBackspaces(screen, 10)
|
|
sendKeys(screen, "rtmp://rtmp.youtube.com:1935/live")
|
|
sendKey(screen, tcell.KeyTab, ' ')
|
|
sendKey(screen, tcell.KeyEnter, ' ')
|
|
|
|
// Start streaming a test video to the app:
|
|
testhelpers.StreamFLV(t, "rtmp://localhost:1935/live")
|
|
|
|
require.EventuallyWithT(
|
|
t,
|
|
func(t *assert.CollectT) {
|
|
contents := getContents()
|
|
|
|
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
|
|
assert.True(t, contentsIncludes(contents, "validate: duplicate destination URL: rtmp://"), "expected to see config update error")
|
|
},
|
|
10*time.Second,
|
|
time.Second,
|
|
"expected a validation error for a duplicate URL",
|
|
)
|
|
printScreen(getContents, "After entering a duplicate destination URL")
|
|
cancel()
|
|
|
|
<-done
|
|
}
|
|
|
|
func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- terminal.ScreenCapture, func() []string) {
|
|
// Fetching the screen contents is tricky at this level of the test pyramid,
|
|
// because we need to:
|
|
//
|
|
// 1. Somehow capture the screen contents, which is only available via the
|
|
// tcell.SimulationScreen, and...
|
|
// 2. Do so without triggering data races.
|
|
//
|
|
// We can achieve this by passing a channel into the terminal actor, which
|
|
// will send screen captures after each render. This can be stored locally
|
|
// and asserted against when needed.
|
|
var (
|
|
screenCells []tcell.SimCell
|
|
screenWidth int
|
|
screenMu sync.Mutex
|
|
)
|
|
|
|
getContents := func() []string {
|
|
screenMu.Lock()
|
|
defer screenMu.Unlock()
|
|
|
|
var lines []string
|
|
for n, _ := range screenCells {
|
|
y := n / screenWidth
|
|
|
|
if y > len(lines)-1 {
|
|
lines = append(lines, "")
|
|
}
|
|
lines[y] += string(screenCells[n].Runes[0])
|
|
}
|
|
|
|
return lines
|
|
}
|
|
|
|
t.Cleanup(func() {
|
|
if t.Failed() {
|
|
printScreen(getContents, "After failing")
|
|
}
|
|
})
|
|
|
|
screen := tcell.NewSimulationScreen("")
|
|
screenCaptureC := make(chan terminal.ScreenCapture, 1)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-t.Context().Done():
|
|
return
|
|
case capture := <-screenCaptureC:
|
|
screenMu.Lock()
|
|
screenCells = capture.Cells
|
|
screenWidth = capture.Width
|
|
screenMu.Unlock()
|
|
}
|
|
}
|
|
}()
|
|
|
|
return screen, screenCaptureC, getContents
|
|
}
|
|
|
|
func contentsIncludes(contents []string, search string) bool {
|
|
for _, line := range contents {
|
|
if strings.Contains(line, search) {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func setupConfigService(t *testing.T, cfg config.Config) *config.Service {
|
|
tmpDir, err := os.MkdirTemp("", "octoplex_"+t.Name())
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() { os.RemoveAll(tmpDir) })
|
|
configService, err := config.NewService(func() (string, error) { return tmpDir, nil }, 1)
|
|
require.NoError(t, err)
|
|
require.NoError(t, configService.SetConfig(cfg))
|
|
|
|
return configService
|
|
}
|
|
|
|
func printScreen(getContents func() []string, label string) {
|
|
fmt.Println(label + ":")
|
|
for _, line := range getContents() {
|
|
fmt.Println(line)
|
|
}
|
|
}
|
|
|
|
func sendKey(screen tcell.SimulationScreen, key tcell.Key, ch rune) {
|
|
screen.InjectKey(key, ch, tcell.ModNone)
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
|
|
func sendKeyShift(screen tcell.SimulationScreen, key tcell.Key, ch rune) {
|
|
screen.InjectKey(key, ch, tcell.ModShift)
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
|
|
func sendKeys(screen tcell.SimulationScreen, keys string) {
|
|
screen.InjectKeyBytes([]byte(keys))
|
|
time.Sleep(500 * time.Millisecond)
|
|
}
|
|
|
|
func sendBackspaces(screen tcell.SimulationScreen, n int) {
|
|
for range n {
|
|
screen.InjectKey(tcell.KeyBackspace, ' ', tcell.ModNone)
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
time.Sleep(500 * time.Millisecond)
|
|
}
|