feat: validate destinations

This commit is contained in:
Rob Watson 2025-03-31 20:31:16 +02:00
parent cddcb0eb4d
commit d3a6d6acdb
8 changed files with 225 additions and 27 deletions

View File

@ -117,22 +117,27 @@ func Run(ctx context.Context, params RunParams) error {
logger.Debug("Command received", "cmd", cmd.Name())
switch c := cmd.(type) {
case terminal.CommandAddDestination:
cfg.Destinations = append(cfg.Destinations, config.Destination{
newCfg := cfg
newCfg.Destinations = append(newCfg.Destinations, config.Destination{
Name: c.DestinationName,
URL: c.URL,
})
if err := params.ConfigService.SetConfig(cfg); err != nil {
// TODO: error handling
logger.Error("Failed to set config", "err", err)
if err := params.ConfigService.SetConfig(newCfg); err != nil {
logger.Error("Config update failed", "err", err)
ui.ConfigUpdateFailed(err)
continue
}
ui.DestinationAdded()
case terminal.CommandRemoveDestination:
mp.StopDestination(c.URL) // no-op if not live
cfg.Destinations = slices.DeleteFunc(cfg.Destinations, func(dest config.Destination) bool {
newCfg := cfg
newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool {
return dest.URL == c.URL
})
if err := params.ConfigService.SetConfig(cfg); err != nil {
// TODO: error handling
logger.Error("Failed to set config", "err", err)
if err := params.ConfigService.SetConfig(newCfg); err != nil {
logger.Error("Config update failed", "err", err)
ui.ConfigUpdateFailed(err)
continue
}
case terminal.CommandStartDestination:
mp.StartDestination(c.URL)

View File

@ -8,6 +8,7 @@ import (
"log/slog"
"os"
"runtime"
"strings"
"sync"
"testing"
"time"
@ -43,7 +44,7 @@ func TestIntegration(t *testing.T) {
destServerPort, err := destServer.MappedPort(ctx, "1936/tcp")
require.NoError(t, err)
logger := testhelpers.NewTestLogger().With("component", "integration")
logger := testhelpers.NewTestLogger(t).With("component", "integration")
logger.Info("Initialised logger", "debug_level", logger.Enabled(ctx, slog.LevelDebug), "runner_debug", os.Getenv("RUNNER_DEBUG"))
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv)
require.NoError(t, err)
@ -240,6 +241,144 @@ func TestIntegration(t *testing.T) {
<-done
}
func TestIntegrationDestinationValidations(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
defer cancel()
logger := testhelpers.NewTestLogger(t).With("component", "integration")
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv)
require.NoError(t, err)
screen, screenCaptureC, getContents := setupSimulationScreen(t)
configService := setupConfigService(t, config.Config{
Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true, StreamKey: "live"}},
})
done := make(chan struct{})
go func() {
err := app.Run(ctx, app.RunParams{
ConfigService: configService,
DockerClient: dockerClient,
Screen: &terminal.Screen{
Screen: screen,
Width: 160,
Height: 25,
CaptureC: screenCaptureC,
},
ClipboardAvailable: false,
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
Logger: logger,
})
require.NoError(t, err)
done <- struct{}{}
}()
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
contents := getContents()
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
assert.Contains(t, contents[2], "Status ready", "expected mediaserver status to be ready")
},
2*time.Minute,
time.Second,
"expected the mediaserver to start",
)
printScreen(getContents, "After starting the mediaserver")
sendKey(screen, tcell.KeyRune, 'a')
sendKey(screen, tcell.KeyTab, ' ')
sendBackspaces(screen, 10)
sendKey(screen, tcell.KeyTab, ' ')
sendKey(screen, tcell.KeyEnter, ' ')
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
contents := getContents()
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
assert.True(t, contentsIncludes(contents, "validate: destination URL must start with"), "expected to see config update error")
},
10*time.Second,
time.Second,
"expected a validation error for an empty URL",
)
printScreen(getContents, "After entering an empty destination URL")
sendKey(screen, tcell.KeyEnter, ' ')
sendKey(screen, tcell.KeyBacktab, ' ')
sendKeys(screen, "nope")
sendKey(screen, tcell.KeyTab, ' ')
sendKey(screen, tcell.KeyEnter, ' ')
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
contents := getContents()
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
assert.True(t, contentsIncludes(contents, "validate: destination URL must start with"), "expected to see config update error")
},
10*time.Second,
time.Second,
"expected a validation error for an invalid URL",
)
printScreen(getContents, "After entering an invalid destination URL")
sendKey(screen, tcell.KeyEnter, ' ')
sendKey(screen, tcell.KeyBacktab, ' ')
sendBackspaces(screen, len("nope"))
sendKeys(screen, "rtmp://rtmp.youtube.com:1935/live")
sendKey(screen, tcell.KeyTab, ' ')
sendKey(screen, tcell.KeyEnter, ' ')
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
contents := getContents()
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
require.Contains(t, contents[2], "My stream", "expected new destination to be present")
assert.Contains(t, contents[2], "off-air", "expected new destination to be off-air")
},
10*time.Second,
time.Second,
"expected to add the destination",
)
printScreen(getContents, "After adding the destination")
sendKey(screen, tcell.KeyRune, 'a')
sendKey(screen, tcell.KeyTab, ' ')
sendBackspaces(screen, 10)
sendKeys(screen, "rtmp://rtmp.youtube.com:1935/live")
sendKey(screen, tcell.KeyTab, ' ')
sendKey(screen, tcell.KeyEnter, ' ')
// Start streaming a test video to the app:
testhelpers.StreamFLV(t, "rtmp://localhost:1935/live")
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
contents := getContents()
assert.True(t, contentsIncludes(contents, "Configuration update failed:"), "expected to see config update error")
assert.True(t, contentsIncludes(contents, "validate: duplicate destination URL: rtmp://"), "expected to see config update error")
},
10*time.Second,
time.Second,
"expected a validation error for a duplicate URL",
)
printScreen(getContents, "After entering a duplicate destination URL")
cancel()
<-done
}
func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- terminal.ScreenCapture, func() []string) {
// Fetching the screen contents is tricky at this level of the test pyramid,
// because we need to:
@ -299,6 +438,16 @@ func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- termina
return screen, screenCaptureC, getContents
}
func contentsIncludes(contents []string, search string) bool {
for _, line := range contents {
if strings.Contains(line, search) {
return true
}
}
return false
}
func setupConfigService(t *testing.T, cfg config.Config) *config.Service {
tmpDir, err := os.MkdirTemp("", "octoplex_"+t.Name())
require.NoError(t, err)
@ -321,6 +470,12 @@ func sendKey(screen tcell.SimulationScreen, key tcell.Key, ch rune) {
screen.InjectKey(key, ch, tcell.ModNone)
time.Sleep(50 * time.Millisecond)
}
func sendKeyShift(screen tcell.SimulationScreen, key tcell.Key, ch rune) {
screen.InjectKey(key, ch, tcell.ModShift)
time.Sleep(50 * time.Millisecond)
}
func sendKeys(screen tcell.SimulationScreen, keys string) {
screen.InjectKeyBytes([]byte(keys))
time.Sleep(500 * time.Millisecond)

View File

@ -96,6 +96,10 @@ func (s *Service) ReadOrCreateConfig() (cfg Config, _ error) {
// SetConfig sets the configuration to the given value and writes it to the
// file.
func (s *Service) SetConfig(cfg Config) error {
if err := validate(cfg); err != nil {
return fmt.Errorf("validate: %w", err)
}
cfgBytes, err := yaml.Marshal(cfg)
if err != nil {
return fmt.Errorf("marshal: %w", err)
@ -176,13 +180,24 @@ func (s *Service) setDefaults(cfg *Config) {
}
}
// TODO: validate URL format
func validate(cfg Config) error {
var err error
urlCounts := make(map[string]int)
for _, dest := range cfg.Destinations {
if !strings.HasPrefix(dest.URL, "rtmp://") {
err = errors.Join(err, fmt.Errorf("destination URL must start with rtmp://"))
}
urlCounts[dest.URL]++
}
for url, count := range urlCounts {
if count > 1 {
err = errors.Join(err, fmt.Errorf("duplicate destination URL: %s", url))
}
}
return err

View File

@ -22,7 +22,7 @@ import (
)
func TestClientRunContainer(t *testing.T) {
logger := testhelpers.NewTestLogger()
logger := testhelpers.NewTestLogger(t)
// channels returned by Docker's ContainerWait:
containerWaitC := make(chan dockercontainer.WaitResponse)
@ -128,7 +128,7 @@ func TestClientRunContainer(t *testing.T) {
}
func TestClientRunContainerErrorStartingContainer(t *testing.T) {
logger := testhelpers.NewTestLogger()
logger := testhelpers.NewTestLogger(t)
var dockerClient mocks.DockerClient
defer dockerClient.AssertExpectations(t)
@ -174,7 +174,7 @@ func TestClientRunContainerErrorStartingContainer(t *testing.T) {
}
func TestClientClose(t *testing.T) {
logger := testhelpers.NewTestLogger()
logger := testhelpers.NewTestLogger(t)
var dockerClient mocks.DockerClient
defer dockerClient.AssertExpectations(t)
@ -213,7 +213,7 @@ func TestClientClose(t *testing.T) {
}
func TestRemoveUnusedNetworks(t *testing.T) {
logger := testhelpers.NewTestLogger()
logger := testhelpers.NewTestLogger(t)
var dockerClient mocks.DockerClient
defer dockerClient.AssertExpectations(t)

View File

@ -22,7 +22,7 @@ func TestIntegrationClientStartStop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
logger := testhelpers.NewTestLogger()
logger := testhelpers.NewTestLogger(t)
apiClient, err := client.NewClientWithOpts(client.FromEnv)
require.NoError(t, err)
containerName := "octoplex-test-" + shortid.New().String()
@ -72,7 +72,7 @@ func TestIntegrationClientRemoveContainers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
logger := testhelpers.NewTestLogger()
logger := testhelpers.NewTestLogger(t)
apiClient, err := client.NewClientWithOpts(client.FromEnv)
require.NoError(t, err)
component := "test-remove-containers"
@ -171,7 +171,7 @@ func TestContainerRestart(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
logger := testhelpers.NewTestLogger()
logger := testhelpers.NewTestLogger(t)
apiClient, err := client.NewClientWithOpts(client.FromEnv)
require.NoError(t, err)
containerName := "octoplex-test-" + shortid.New().String()

View File

@ -33,7 +33,7 @@ func TestHandleStats(t *testing.T) {
Return(dockercontainer.StatsResponseReader{Body: pr}, nil)
networkCountConfig := NetworkCountConfig{Rx: "eth0", Tx: "eth1"}
logger := testhelpers.NewTestLogger()
logger := testhelpers.NewTestLogger(t)
ch := make(chan stats)
go func() {
@ -79,7 +79,7 @@ func TestHandleStatsWithContainerRestart(t *testing.T) {
Return(dockercontainer.StatsResponseReader{Body: pr}, nil)
networkCountConfig := NetworkCountConfig{Rx: "eth1", Tx: "eth0"}
logger := testhelpers.NewTestLogger()
logger := testhelpers.NewTestLogger(t)
ch := make(chan stats)
go func() {

View File

@ -222,9 +222,11 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
app.SetInputCapture(func(event *tcell.EventKey) *tcell.EventKey {
// Special case: allow all keys except Escape to be passed to the add
// destination modal.
//
// TODO: catch Ctrl-c
if pageName, _ := pages.GetFrontPage(); pageName == pageNameAddDestination {
if event.Key() == tcell.KeyEscape {
ui.closeDestinationForm()
ui.closeAddDestinationForm()
return nil
}
@ -468,6 +470,7 @@ const (
pageNameModalClipboard = "modal-clipboard"
pageNameModalPullProgress = "modal-pull-progress"
pageNameModalRemoveDestination = "modal-remove-destination"
pageNameConfigUpdateFailed = "modal-config-update-failed"
)
func (ui *UI) showModal(pageName string, text string, buttons []string, doneFunc func(int, string)) {
@ -659,8 +662,24 @@ func (ui *UI) Close() {
ui.app.Stop()
}
func (ui *UI) ConfigUpdateFailed(err error) {
ui.app.QueueUpdateDraw(func() {
ui.showModal(
pageNameConfigUpdateFailed,
"Configuration update failed:\n\n"+err.Error(),
[]string{"Ok"},
func(int, string) {
pageName, frontPage := ui.pages.GetFrontPage()
if pageName != pageNameAddDestination {
ui.logger.Warn("Unexpected page when configuration form closed", "page", pageName)
}
ui.app.SetFocus(frontPage)
},
)
})
}
func (ui *UI) addDestination() {
// TODO: check for existing
const (
inputLen = 60
inputLabelName = "Name"
@ -687,11 +706,8 @@ func (ui *UI) addDestination() {
DestinationName: form.GetFormItemByLabel(inputLabelName).(*tview.InputField).GetText(),
URL: form.GetFormItemByLabel(inputLabelURL).(*tview.InputField).GetText(),
}
ui.closeDestinationForm()
}).
AddButton("Cancel", func() {
ui.closeDestinationForm()
}).
AddButton("Cancel", func() { ui.closeAddDestinationForm() }).
SetFieldBackgroundColor(tcell.ColorDarkSlateGrey).
SetBorder(true).
SetTitle("Add a new destination").
@ -731,7 +747,13 @@ func (ui *UI) removeDestination() {
)
}
func (ui *UI) closeDestinationForm() {
func (ui *UI) DestinationAdded() {
ui.app.QueueUpdateDraw(func() {
ui.closeAddDestinationForm()
})
}
func (ui *UI) closeAddDestinationForm() {
ui.pages.RemovePage(pageNameAddDestination)
ui.app.SetFocus(ui.destView)
}

View File

@ -3,6 +3,7 @@ package testhelpers
import (
"log/slog"
"os"
"testing"
)
// NewNopLogger returns a logger that discards all log output.
@ -11,11 +12,11 @@ func NewNopLogger() *slog.Logger {
}
// NewTestLogger returns a logger that writes to stderr.
func NewTestLogger() *slog.Logger {
func NewTestLogger(t *testing.T) *slog.Logger {
var handlerOpts slog.HandlerOptions
// RUNNER_DEBUG is used in the GitHub actions runner to enable debug logging.
if os.Getenv("DEBUG") != "" || os.Getenv("RUNNER_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
}
return slog.New(slog.NewTextHandler(os.Stderr, &handlerOpts))
return slog.New(slog.NewTextHandler(os.Stderr, &handlerOpts)).With("test", t.Name())
}