From 7edb975b8e74c3b1f72bc8de52f6d6f816625d98 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Fri, 28 Mar 2025 06:31:54 +0100 Subject: [PATCH] feat(ui): add and remove destinations --- internal/app/app.go | 71 ++++++++--- internal/app/app_test.go | 71 +++++++++++ internal/app/integration_test.go | 203 +++++++++++++++++++++++++------ internal/config/service.go | 103 ++++++++++++---- internal/config/service_test.go | 37 +++++- internal/config/xdg.go | 2 +- internal/terminal/command.go | 21 ++++ internal/terminal/terminal.go | 155 ++++++++++++++++++----- main.go | 2 +- 9 files changed, 555 insertions(+), 110 deletions(-) create mode 100644 internal/app/app_test.go diff --git a/internal/app/app.go b/internal/app/app.go index e5a64a6..dbed4d4 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "slices" "time" "git.netflux.io/rob/octoplex/internal/config" @@ -17,7 +18,7 @@ import ( // RunParams holds the parameters for running the application. type RunParams struct { - Config config.Config + ConfigService *config.Service DockerClient container.DockerClient Screen *terminal.Screen // Screen may be nil. ClipboardAvailable bool @@ -28,9 +29,15 @@ type RunParams struct { // Run starts the application, and blocks until it exits. func Run(ctx context.Context, params RunParams) error { - state := newStateFromRunParams(params) - logger := params.Logger + // cfg is the current configuration of the application, as reflected in the + // config file. + cfg := params.ConfigService.Current() + // state is the current state of the application, as reflected in the UI. + state := new(domain.AppState) + applyConfig(cfg, state) + + logger := params.Logger ui, err := terminal.StartUI(ctx, terminal.StartParams{ Screen: params.Screen, ClipboardAvailable: params.ClipboardAvailable, @@ -52,7 +59,6 @@ func Run(ctx context.Context, params RunParams) error { updateUI := func() { ui.SetState(*state) } updateUI() - // TODO: check for unused networks. var exists bool if exists, err = containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil { return fmt.Errorf("check existing containers: %w", err) @@ -71,12 +77,12 @@ func Run(ctx context.Context, params RunParams) error { ui.AllowQuit() // While RTMP is the only source, it doesn't make sense to disable it. - if !params.Config.Sources.RTMP.Enabled { + if !cfg.Sources.RTMP.Enabled { return errors.New("config: sources.rtmp.enabled must be set to true") } srv, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ - StreamKey: mediaserver.StreamKey(params.Config.Sources.RTMP.StreamKey), + StreamKey: mediaserver.StreamKey(cfg.Sources.RTMP.StreamKey), ContainerClient: containerClient, Logger: logger.With("component", "mediaserver"), }) @@ -98,6 +104,9 @@ func Run(ctx context.Context, params RunParams) error { for { select { + case cfg = <-params.ConfigService.C(): + applyConfig(cfg, state) + updateUI() case cmd, ok := <-ui.C(): if !ok { // TODO: keep UI open until all containers have closed @@ -107,6 +116,24 @@ func Run(ctx context.Context, params RunParams) error { logger.Debug("Command received", "cmd", cmd.Name()) switch c := cmd.(type) { + case terminal.CommandAddDestination: + cfg.Destinations = append(cfg.Destinations, config.Destination{ + Name: c.DestinationName, + URL: c.URL, + }) + if err := params.ConfigService.SetConfig(cfg); err != nil { + // TODO: error handling + logger.Error("Failed to set config", "err", err) + } + case terminal.CommandRemoveDestination: + mp.StopDestination(c.URL) // no-op if not live + cfg.Destinations = slices.DeleteFunc(cfg.Destinations, func(dest config.Destination) bool { + return dest.URL == c.URL + }) + if err := params.ConfigService.SetConfig(cfg); err != nil { + // TODO: error handling + logger.Error("Failed to set config", "err", err) + } case terminal.CommandStartDestination: mp.StartDestination(c.URL) case terminal.CommandStopDestination: @@ -183,17 +210,31 @@ func handleDestError(destError destinationError, mp *multiplexer.Actor, ui *term mp.StopDestination(destError.url) } -// newStateFromRunParams creates a new app state from the run parameters. -func newStateFromRunParams(params RunParams) *domain.AppState { - var state domain.AppState +// applyConfig applies the config to the app state. For now we only set the +// destinations. +func applyConfig(cfg config.Config, appState *domain.AppState) { + appState.Destinations = resolveDestinations(appState.Destinations, cfg.Destinations) +} - state.Destinations = make([]domain.Destination, 0, len(params.Config.Destinations)) - for _, dest := range params.Config.Destinations { - state.Destinations = append(state.Destinations, domain.Destination{ - Name: dest.Name, - URL: dest.URL, +// resolveDestinations merges the current destinations with newly configured +// destinations. +func resolveDestinations(destinations []domain.Destination, inDestinations []config.Destination) []domain.Destination { + destinations = slices.DeleteFunc(destinations, func(dest domain.Destination) bool { + return !slices.ContainsFunc(inDestinations, func(inDest config.Destination) bool { + return inDest.URL == dest.URL + }) + }) + + for i, inDest := range inDestinations { + if i < len(destinations) && destinations[i].URL == inDest.URL { + continue + } + + destinations = slices.Insert(destinations, i, domain.Destination{ + Name: inDest.Name, + URL: inDest.URL, }) } - return &state + return destinations[:len(inDestinations)] } diff --git a/internal/app/app_test.go b/internal/app/app_test.go new file mode 100644 index 0000000..c9985ef --- /dev/null +++ b/internal/app/app_test.go @@ -0,0 +1,71 @@ +package app + +import ( + "testing" + + "git.netflux.io/rob/octoplex/internal/config" + "git.netflux.io/rob/octoplex/internal/domain" + "github.com/stretchr/testify/assert" +) + +func TestResolveDestinations(t *testing.T) { + testCases := []struct { + name string + in []config.Destination + existing []domain.Destination + want []domain.Destination + }{ + { + name: "nil slices", + existing: nil, + want: nil, + }, + { + name: "empty slices", + existing: []domain.Destination{}, + want: []domain.Destination{}, + }, + { + name: "identical slices", + in: []config.Destination{{URL: "rtmp://rtmp.youtube.com/live"}, {URL: "rtmp://rtmp.twitch.tv/live"}, {URL: "rtmp://rtmp.facebook.com/live"}, {URL: "rtmp://rtmp.tiktok.com/live"}}, + existing: []domain.Destination{{URL: "rtmp://rtmp.youtube.com/live"}, {URL: "rtmp://rtmp.twitch.tv/live"}, {URL: "rtmp://rtmp.facebook.com/live"}, {URL: "rtmp://rtmp.tiktok.com/live"}}, + want: []domain.Destination{{URL: "rtmp://rtmp.youtube.com/live"}, {URL: "rtmp://rtmp.twitch.tv/live"}, {URL: "rtmp://rtmp.facebook.com/live"}, {URL: "rtmp://rtmp.tiktok.com/live"}}, + }, + { + name: "adding a new destination", + in: []config.Destination{{URL: "rtmp://rtmp.youtube.com/live"}, {URL: "rtmp://rtmp.twitch.tv/live"}}, + existing: []domain.Destination{{URL: "rtmp://rtmp.youtube.com/live"}}, + want: []domain.Destination{{URL: "rtmp://rtmp.youtube.com/live"}, {URL: "rtmp://rtmp.twitch.tv/live"}}, + }, + { + name: "removing a destination", + in: []config.Destination{{URL: "rtmp://rtmp.twitch.tv/live"}}, + existing: []domain.Destination{{URL: "rtmp://rtmp.youtube.com/live"}, {URL: "rtmp://rtmp.twitch.tv/live"}}, + want: []domain.Destination{{URL: "rtmp://rtmp.twitch.tv/live"}}, + }, + { + name: "switching order, two items", + in: []config.Destination{{URL: "rtmp://rtmp.twitch.tv/live"}, {URL: "rtmp://rtmp.youtube.com/live"}}, + existing: []domain.Destination{{URL: "rtmp://rtmp.youtube.com/live"}, {URL: "rtmp://rtmp.twitch.tv/live"}}, + want: []domain.Destination{{URL: "rtmp://rtmp.twitch.tv/live"}, {URL: "rtmp://rtmp.youtube.com/live"}}, + }, + { + name: "switching order, several items", + in: []config.Destination{{URL: "rtmp://rtmp.twitch.tv/live"}, {URL: "rtmp://rtmp.youtube.com/live"}, {URL: "rtmp://rtmp.facebook.com/live"}, {URL: "rtmp://rtmp.tiktok.com/live"}}, + existing: []domain.Destination{{URL: "rtmp://rtmp.youtube.com/live"}, {URL: "rtmp://rtmp.twitch.tv/live"}, {URL: "rtmp://rtmp.tiktok.com/live"}, {URL: "rtmp://rtmp.facebook.com/live"}}, + want: []domain.Destination{{URL: "rtmp://rtmp.twitch.tv/live"}, {URL: "rtmp://rtmp.youtube.com/live"}, {URL: "rtmp://rtmp.facebook.com/live"}, {URL: "rtmp://rtmp.tiktok.com/live"}}, + }, + { + name: "removing all destinations", + in: []config.Destination{}, + existing: []domain.Destination{{URL: "rtmp://rtmp.youtube.com/live"}, {URL: "rtmp://rtmp.twitch.tv/live"}, {URL: "rtmp://rtmp.facebook.com/live"}, {URL: "rtmp://rtmp.tiktok.com/live"}}, + want: []domain.Destination{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, resolveDestinations(tc.existing, tc.in)) + }) + } +} diff --git a/internal/app/integration_test.go b/internal/app/integration_test.go index ff860ff..4921ea4 100644 --- a/internal/app/integration_test.go +++ b/internal/app/integration_test.go @@ -5,6 +5,8 @@ package app_test import ( "context" "fmt" + "log/slog" + "os" "runtime" "sync" "testing" @@ -24,9 +26,12 @@ import ( ) func TestIntegration(t *testing.T) { - ctx, cancel := context.WithTimeout(t.Context(), 2*time.Minute) + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) defer cancel() + configService, err := config.NewDefaultService() + require.NoError(t, err) + destServer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ Image: "bluenviron/mediamtx:latest", @@ -42,6 +47,7 @@ func TestIntegration(t *testing.T) { require.NoError(t, err) logger := testhelpers.NewTestLogger().With("component", "integration") + logger.Info("Initialised logger", "debug_level", logger.Enabled(ctx, slog.LevelDebug), "runner_debug", os.Getenv("RUNNER_DEBUG")) dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv) require.NoError(t, err) @@ -78,6 +84,12 @@ func TestIntegration(t *testing.T) { return lines } + t.Cleanup(func() { + if t.Failed() { + printScreen(getContents, "After failing") + } + }) + screen := tcell.NewSimulationScreen("") screenCaptureC := make(chan terminal.ScreenCapture, 1) go func() { @@ -94,34 +106,32 @@ func TestIntegration(t *testing.T) { } }() + // https://stackoverflow.com/a/60740997/62871 + if runtime.GOOS != "linux" { + panic("TODO: try host.docker.internal or Mac equivalent here") + } + const destHost = "172.17.0.1" + + destURL1 := fmt.Sprintf("rtmp://%s:%d/live/dest1", destHost, destServerPort.Int()) + destURL2 := fmt.Sprintf("rtmp://%s:%d/live/dest2", destHost, destServerPort.Int()) + cfg := config.Config{ + Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true, StreamKey: "live"}}, + // Load one destination from config, add the other in-app. + Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}}, + } + + tmpDir, err := os.MkdirTemp("", "octoplex-app-test-integration") + require.NoError(t, err) + t.Cleanup(func() { os.RemoveAll(tmpDir) }) + configService, err = config.NewService(func() (string, error) { return tmpDir, nil }, 1) + require.NoError(t, err) + require.NoError(t, configService.SetConfig(cfg)) + done := make(chan struct{}) go func() { - // https://stackoverflow.com/a/60740997/62871 - if runtime.GOOS != "linux" { - panic("TODO: try host.docker.internal or Mac equivalent here") - } - const destHost = "172.17.0.1" - err := app.Run(ctx, app.RunParams{ - Config: config.Config{ - Sources: config.Sources{ - RTMP: config.RTMPSource{ - Enabled: true, - StreamKey: "live", - }, - }, - Destinations: []config.Destination{ - { - Name: "Local server 1", - URL: fmt.Sprintf("rtmp://%s:%d/live/dest1", destHost, destServerPort.Int()), - }, - { - Name: "Local server 2", - URL: fmt.Sprintf("rtmp://%s:%d/live/dest2", destHost, destServerPort.Int()), - }, - }, - }, - DockerClient: dockerClient, + ConfigService: configService, + DockerClient: dockerClient, Screen: &terminal.Screen{ Screen: screen, Width: 160, @@ -137,17 +147,79 @@ func TestIntegration(t *testing.T) { done <- struct{}{} }() - // Wait for mediaserver container to start: time.Sleep(5 * time.Second) + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + contents := getContents() + require.True(t, len(contents) > 2, "expected at least 3 lines of output") + + assert.Contains(t, contents[2], "Status ready", "expected mediaserver status to be ready") + }, + 2*time.Minute, + time.Second, + "expected the mediaserver to start", + ) + printScreen(getContents, "After starting the mediaserver") // Start streaming a test video to the app: testhelpers.StreamFLV(t, "rtmp://localhost:1935/live") - time.Sleep(10 * time.Second) + + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + contents := getContents() + require.True(t, len(contents) > 2, "expected at least 3 lines of output") + + assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") + assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264") + assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy") + }, + time.Minute, + time.Second, + "expected to receive an ingress stream", + ) + printScreen(getContents, "After receiving the ingress stream") + + // Add a second destination in-app: + sendKey(screen, tcell.KeyRune, 'a') + + sendBackspaces(screen, 30) + sendKeys(screen, "Local server 2") + sendKey(screen, tcell.KeyTab, ' ') + + sendBackspaces(screen, 30) + sendKeys(screen, destURL2) + sendKey(screen, tcell.KeyTab, ' ') + sendKey(screen, tcell.KeyEnter, ' ') + + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + contents := getContents() + require.True(t, len(contents) > 2, "expected at least 3 lines of output") + + assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") + assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264") + assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy") + + require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(t, contents[2], "off-air", "expected local server 1 to be off-air") + + require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present") + assert.Contains(t, contents[3], "off-air", "expected local server 2 to be off-air") + + }, + 2*time.Minute, + time.Second, + "expected to add the destinations", + ) + printScreen(getContents, "After adding the destinations") // Start destinations: - screen.PostEvent(tcell.NewEventKey(tcell.KeyRune, ' ', tcell.ModNone)) - screen.PostEvent(tcell.NewEventKey(tcell.KeyDown, ' ', tcell.ModNone)) - screen.PostEvent(tcell.NewEventKey(tcell.KeyRune, ' ', tcell.ModNone)) + sendKey(screen, tcell.KeyRune, ' ') + sendKey(screen, tcell.KeyDown, ' ') + sendKey(screen, tcell.KeyRune, ' ') require.EventuallyWithT( t, @@ -166,18 +238,43 @@ func TestIntegration(t *testing.T) { require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present") assert.Contains(t, contents[3], "sending", "expected local server 2 to be sending") assert.Contains(t, contents[3], "healthy", "expected local server 2 to be healthy") + }, + 2*time.Minute, + time.Second, + "expected to start the destination streams", + ) + printScreen(getContents, "After starting the destination streams") + + sendKey(screen, tcell.KeyRune, 'r') + sendKey(screen, tcell.KeyEnter, ' ') + + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + contents := getContents() + require.True(t, len(contents) > 2, "expected at least 3 lines of output") + + assert.Contains(t, contents[2], "Status receiving", "expected mediaserver status to be receiving") + assert.Contains(t, contents[3], "Tracks H264", "expected mediaserver tracks to be H264") + assert.Contains(t, contents[4], "Health healthy", "expected mediaserver to be healthy") + + require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") + assert.Contains(t, contents[2], "sending", "expected local server 1 to be sending") + assert.Contains(t, contents[2], "healthy", "expected local server 1 to be healthy") + + require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present") }, 2*time.Minute, time.Second, + "expected to remove the second destination", ) + printScreen(getContents, "After removing the second destination") - // Stop destinations: - screen.PostEvent(tcell.NewEventKey(tcell.KeyRune, ' ', tcell.ModNone)) - screen.PostEvent(tcell.NewEventKey(tcell.KeyUp, ' ', tcell.ModNone)) - screen.PostEvent(tcell.NewEventKey(tcell.KeyRune, ' ', tcell.ModNone)) - - time.Sleep(10 * time.Second) + // Stop remaining destination. + // It is currently necessary to press down to re-focus the destination: + sendKey(screen, tcell.KeyDown, ' ') + sendKey(screen, tcell.KeyRune, ' ') require.EventuallyWithT( t, @@ -188,13 +285,15 @@ func TestIntegration(t *testing.T) { require.Contains(t, contents[2], "Local server 1", "expected local server 1 to be present") assert.Contains(t, contents[2], "exited", "expected local server 1 to have exited") - require.Contains(t, contents[3], "Local server 2", "expected local server 2 to be present") - assert.Contains(t, contents[3], "exited", "expected local server 2 to have exited") + require.NotContains(t, contents[3], "Local server 2", "expected local server 2 to not be present") }, - 2*time.Minute, + time.Minute, time.Second, + "expected to stop the first destination stream", ) + printScreen(getContents, "After stopping the first destination") + // TODO: // - Source error // - Destination error @@ -204,3 +303,27 @@ func TestIntegration(t *testing.T) { <-done } + +func printScreen(getContents func() []string, label string) { + fmt.Println(label + ":") + for _, line := range getContents() { + fmt.Println(line) + } +} + +func sendKey(screen tcell.SimulationScreen, key tcell.Key, ch rune) { + screen.InjectKey(key, ch, tcell.ModNone) + time.Sleep(50 * time.Millisecond) +} +func sendKeys(screen tcell.SimulationScreen, keys string) { + screen.InjectKeyBytes([]byte(keys)) + time.Sleep(500 * time.Millisecond) +} + +func sendBackspaces(screen tcell.SimulationScreen, n int) { + for range n { + screen.InjectKey(tcell.KeyBackspace, ' ', tcell.ModNone) + time.Sleep(50 * time.Millisecond) + } + time.Sleep(500 * time.Millisecond) +} diff --git a/internal/config/service.go b/internal/config/service.go index e0cbce8..5e199cc 100644 --- a/internal/config/service.go +++ b/internal/config/service.go @@ -17,25 +17,29 @@ var exampleConfig []byte // Service provides configuration services. type Service struct { - userConfigDir string - appConfigDir string - appStateDir string + current Config + appConfigDir string + appStateDir string + configC chan Config } // ConfigDirFunc is a function that returns the user configuration directory. type ConfigDirFunc func() (string, error) +// defaultChanSize is the default size of the configuration channel. +const defaultChanSize = 64 + // NewDefaultService creates a new service with the default configuration file // location. func NewDefaultService() (*Service, error) { - return NewService(os.UserConfigDir) + return NewService(os.UserConfigDir, defaultChanSize) } // NewService creates a new service with provided ConfigDirFunc. // // The app data directories (config and state) are created if they do not // exist. -func NewService(configDirFunc ConfigDirFunc) (*Service, error) { +func NewService(configDirFunc ConfigDirFunc, chanSize int) (*Service, error) { configDir, err := configDirFunc() if err != nil { return nil, fmt.Errorf("user config dir: %w", err) @@ -51,18 +55,37 @@ func NewService(configDirFunc ConfigDirFunc) (*Service, error) { return nil, fmt.Errorf("app state dir: %w", err) } - return &Service{ - userConfigDir: configDir, - appConfigDir: appConfigDir, - appStateDir: appStateDir, - }, nil + svc := &Service{ + appConfigDir: appConfigDir, + appStateDir: appStateDir, + configC: make(chan Config, chanSize), + } + + svc.setDefaults(&svc.current) + + return svc, nil } -// ReadOrCreateConfig reads the configuration from the file at the given path or -// creates it with default values. +// Current returns the current configuration. +// +// This will be the last-loaded or last-updated configuration, or a default +// configuration if nothing else is available. +func (s *Service) Current() Config { + return s.current +} + +// C returns a channel that receives configuration updates. +// +// The channel is never closed. +func (s *Service) C() <-chan Config { + return s.configC +} + +// ReadOrCreateConfig reads the configuration from the file or creates it with +// default values. func (s *Service) ReadOrCreateConfig() (cfg Config, _ error) { if _, err := os.Stat(s.Path()); os.IsNotExist(err) { - return s.createConfig() + return s.writeDefaultConfig() } else if err != nil { return cfg, fmt.Errorf("stat: %w", err) } @@ -70,6 +93,29 @@ func (s *Service) ReadOrCreateConfig() (cfg Config, _ error) { return s.readConfig() } +// SetConfig sets the configuration to the given value and writes it to the +// file. +func (s *Service) SetConfig(cfg Config) error { + cfgBytes, err := yaml.Marshal(cfg) + if err != nil { + return fmt.Errorf("marshal: %w", err) + } + + if err = s.writeConfig(cfgBytes); err != nil { + return fmt.Errorf("write config: %w", err) + } + + s.current = cfg + s.configC <- cfg + + return nil +} + +// Path returns the path to the configuration file. +func (s *Service) Path() string { + return filepath.Join(s.appConfigDir, "config.yaml") +} + func (s *Service) readConfig() (cfg Config, _ error) { contents, err := os.ReadFile(s.Path()) if err != nil { @@ -86,23 +132,34 @@ func (s *Service) readConfig() (cfg Config, _ error) { return cfg, err } + s.current = cfg + + return s.current, nil +} + +func (s *Service) writeDefaultConfig() (Config, error) { + var cfg Config + if err := yaml.Unmarshal(exampleConfig, &cfg); err != nil { + return cfg, fmt.Errorf("unmarshal: %w", err) + } + + if err := s.writeConfig(exampleConfig); err != nil { + return Config{}, fmt.Errorf("write config: %w", err) + } + return cfg, nil } -func (s *Service) createConfig() (Config, error) { +func (s *Service) writeConfig(cfgBytes []byte) error { if err := os.MkdirAll(s.appConfigDir, 0744); err != nil { - return Config{}, fmt.Errorf("mkdir: %w", err) + return fmt.Errorf("mkdir: %w", err) } - if err := os.WriteFile(s.Path(), exampleConfig, 0644); err != nil { - return Config{}, fmt.Errorf("write file: %w", err) + if err := os.WriteFile(s.Path(), cfgBytes, 0644); err != nil { + return fmt.Errorf("write file: %w", err) } - return Config{}, nil -} - -func (s *Service) Path() string { - return filepath.Join(s.appConfigDir, "config.yaml") + return nil } func (s *Service) setDefaults(cfg *Config) { @@ -110,6 +167,8 @@ func (s *Service) setDefaults(cfg *Config) { cfg.LogFile.Path = filepath.Join(s.appStateDir, domain.AppName+".log") } + cfg.Sources.RTMP.Enabled = true + for i := range cfg.Destinations { if strings.TrimSpace(cfg.Destinations[i].Name) == "" { cfg.Destinations[i].Name = fmt.Sprintf("Stream %d", i+1) diff --git a/internal/config/service_test.go b/internal/config/service_test.go index dce8538..301c967 100644 --- a/internal/config/service_test.go +++ b/internal/config/service_test.go @@ -31,14 +31,26 @@ var configInvalidDestinationURL []byte //go:embed testdata/multiple-invalid-destination-urls.yml var configMultipleInvalidDestinationURLs []byte +func TestConfigServiceCurrent(t *testing.T) { + suffix := "current_" + shortid.New().String() + systemConfigDirFunc := buildSystemConfigDirFunc(suffix) + systemConfigDir, _ := systemConfigDirFunc() + + service, err := config.NewService(systemConfigDirFunc, 1) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, os.RemoveAll(systemConfigDir)) }) + + // Ensure defaults are set: + assert.True(t, service.Current().Sources.RTMP.Enabled) +} + func TestConfigServiceCreateConfig(t *testing.T) { suffix := "read_or_create_" + shortid.New().String() systemConfigDirFunc := buildSystemConfigDirFunc(suffix) systemConfigDir, _ := systemConfigDirFunc() - service, err := config.NewService(systemConfigDirFunc) + service, err := config.NewService(systemConfigDirFunc, 1) require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, os.RemoveAll(systemConfigDir)) }) cfg, err := service.ReadOrCreateConfig() @@ -131,7 +143,7 @@ func TestConfigServiceReadConfig(t *testing.T) { configPath := filepath.Join(appConfigDir, "config.yaml") require.NoError(t, os.WriteFile(configPath, tc.configBytes, 0644)) - service, err := config.NewService(buildSystemConfigDirFunc(suffix)) + service, err := config.NewService(buildSystemConfigDirFunc(suffix), 1) require.NoError(t, err) cfg, err := service.ReadOrCreateConfig() @@ -146,6 +158,25 @@ func TestConfigServiceReadConfig(t *testing.T) { } } +func TestConfigServiceSetConfig(t *testing.T) { + suffix := "set_config_" + shortid.New().String() + systemConfigDirFunc := buildSystemConfigDirFunc(suffix) + systemConfigDir, _ := systemConfigDirFunc() + + service, err := config.NewService(systemConfigDirFunc, 1) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, os.RemoveAll(systemConfigDir)) }) + + cfg := config.Config{LogFile: config.LogFile{Enabled: true, Path: "test.log"}} + require.NoError(t, service.SetConfig(cfg)) + + cfg, err = service.ReadOrCreateConfig() + require.NoError(t, err) + + assert.Equal(t, "test.log", cfg.LogFile.Path) + assert.True(t, cfg.LogFile.Enabled) +} + // buildAppConfigDir returns a temporary directory which mimics // $XDG_CONFIG_HOME/octoplex. func buildAppConfigDir(suffix string) string { diff --git a/internal/config/xdg.go b/internal/config/xdg.go index 5e0e822..bce189a 100644 --- a/internal/config/xdg.go +++ b/internal/config/xdg.go @@ -28,7 +28,7 @@ func createAppStateDir() (string, error) { var dir string switch runtime.GOOS { case "darwin": - dir = filepath.Join(userHomeDir, "/Library", "Caches", domain.AppName) + dir = filepath.Join(userHomeDir, "Library", "Caches", domain.AppName) case "windows": // TODO: Windows support return "", errors.New("not implemented") diff --git a/internal/terminal/command.go b/internal/terminal/command.go index 9929263..57aeecc 100644 --- a/internal/terminal/command.go +++ b/internal/terminal/command.go @@ -1,5 +1,26 @@ package terminal +// CommandAddDestination adds a destination. +type CommandAddDestination struct { + DestinationName string + URL string +} + +// Name implements the Command interface. +func (c CommandAddDestination) Name() string { + return "add_destination" +} + +// CommandRemoveDestination removes a destination. +type CommandRemoveDestination struct { + URL string +} + +// Name implements the Command interface. +func (c CommandRemoveDestination) Name() string { + return "remove_destination" +} + // CommandStartDestination starts a destination. type CommandStartDestination struct { URL string diff --git a/internal/terminal/terminal.go b/internal/terminal/terminal.go index 664edc3..888a663 100644 --- a/internal/terminal/terminal.go +++ b/internal/terminal/terminal.go @@ -163,6 +163,8 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { aboutView.SetDirection(tview.FlexRow) aboutView.SetBorder(true) aboutView.SetTitle("Actions") + aboutView.AddItem(tview.NewTextView().SetText("[a] Add new destination"), 1, 0, false) + aboutView.AddItem(tview.NewTextView().SetText("[r] Remove destination"), 1, 0, false) aboutView.AddItem(tview.NewTextView().SetText("[Space] Toggle destination"), 1, 0, false) aboutView.AddItem(tview.NewTextView().SetText("[u] Copy ingress RTMP URL"), 1, 0, false) aboutView.AddItem(tview.NewTextView().SetText("[c] Copy config file path"), 1, 0, false) @@ -189,7 +191,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { AddItem(destView, 0, 6, false) pages := tview.NewPages() - pages.AddPage("main", flex, true, true) + pages.AddPage(pageNameMain, flex, true, true) app.SetRoot(pages, true) app.SetFocus(destView) @@ -218,9 +220,26 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { } app.SetInputCapture(func(event *tcell.EventKey) *tcell.EventKey { + // Special case: allow all keys except Escape to be passed to the add + // destination modal. + if pageName, _ := pages.GetFrontPage(); pageName == pageNameAddDestination { + if event.Key() == tcell.KeyEscape { + ui.closeDestinationForm() + return nil + } + + return event + } + switch event.Key() { case tcell.KeyRune: switch event.Rune() { + case 'a', 'A': + ui.addDestination() + return nil + case 'r', 'R': + ui.removeDestination() + return nil case ' ': ui.toggleDestination() case 'u', 'U': @@ -287,7 +306,7 @@ func (ui *UI) ShowStartupCheckModal() bool { ui.app.QueueUpdateDraw(func() { ui.showModal( - modalGroupStartupCheck, + pageNameModalStartupCheck, "Another instance of Octoplex may already be running. Pressing continue will close that instance. Continue?", []string{"Continue", "Exit"}, func(buttonIndex int, _ string) { @@ -309,7 +328,7 @@ func (ui *UI) ShowDestinationErrorModal(name string, err error) { ui.app.QueueUpdateDraw(func() { ui.showModal( - modalGroupStartupCheck, + pageNameModalStartupCheck, fmt.Sprintf( "Streaming to %s failed:\n\n%s", cmp.Or(name, "this destination"), @@ -395,7 +414,9 @@ func (ui *UI) updatePullProgress(state domain.AppState) { } if len(pullingContainers) == 0 { - ui.hideModal(modalGroupPullProgress) + ui.app.QueueUpdateDraw(func() { + ui.hideModal(pageNameModalPullProgress) + }) return } @@ -410,7 +431,7 @@ func (ui *UI) updatePullProgress(state domain.AppState) { func (ui *UI) updateProgressModal(container domain.Container) { ui.app.QueueUpdateDraw(func() { - modalName := "modal-" + string(modalGroupPullProgress) + modalName := string(pageNameModalPullProgress) var status string // Avoid showing the long Docker pull status in the modal content. @@ -434,21 +455,23 @@ func (ui *UI) updateProgressModal(container domain.Container) { }) } -// modalGroup represents a specific modal of which only one may be shown -// simultaneously. -type modalGroup string - +// page names represent a specific page in the terminal user interface. +// +// Modals should generally have a unique name, which allows them to be stacked +// on top of other modals. const ( - modalGroupAbout modalGroup = "about" - modalGroupQuit modalGroup = "quit" - modalGroupStartupCheck modalGroup = "startup-check" - modalGroupClipboard modalGroup = "clipboard" - modalGroupPullProgress modalGroup = "pull-progress" + pageNameMain = "main" + pageNameAddDestination = "add-destination" + pageNameModalAbout = "modal-about" + pageNameModalQuit = "modal-quit" + pageNameModalStartupCheck = "modal-startup-check" + pageNameModalClipboard = "modal-clipboard" + pageNameModalPullProgress = "modal-pull-progress" + pageNameModalRemoveDestination = "modal-remove-destination" ) -func (ui *UI) showModal(group modalGroup, text string, buttons []string, doneFunc func(int, string)) { - modalName := "modal-" + string(group) - if ui.pages.HasPage(modalName) { +func (ui *UI) showModal(pageName string, text string, buttons []string, doneFunc func(int, string)) { + if ui.pages.HasPage(pageName) { return } @@ -458,9 +481,9 @@ func (ui *UI) showModal(group modalGroup, text string, buttons []string, doneFun SetBackgroundColor(tcell.ColorBlack). SetTextColor(tcell.ColorWhite). SetDoneFunc(func(buttonIndex int, buttonLabel string) { - ui.pages.RemovePage(modalName) + ui.pages.RemovePage(pageName) - if ui.pages.GetPageCount() == 1 { + if name, _ := ui.pages.GetFrontPage(); name == pageNameMain { ui.app.SetFocus(ui.destView) } @@ -470,16 +493,15 @@ func (ui *UI) showModal(group modalGroup, text string, buttons []string, doneFun }). SetBorderStyle(tcell.StyleDefault.Background(tcell.ColorBlack).Foreground(tcell.ColorWhite)) - ui.pages.AddPage(modalName, modal, true, true) + ui.pages.AddPage(pageName, modal, true, true) } -func (ui *UI) hideModal(group modalGroup) { - modalName := "modal-" + string(group) - if !ui.pages.HasPage(modalName) { +func (ui *UI) hideModal(pageName string) { + if !ui.pages.HasPage(pageName) { return } - ui.pages.RemovePage(modalName) + ui.pages.RemovePage(pageName) ui.app.SetFocus(ui.destView) } @@ -637,6 +659,83 @@ func (ui *UI) Close() { ui.app.Stop() } +func (ui *UI) addDestination() { + // TODO: check for existing + const ( + inputLen = 60 + inputLabelName = "Name" + inputLabelURL = "RTMP URL" + formInnerWidth = inputLen + 8 + 1 // inputLen + length of longest label + one space + formInnerHeight = 7 // line count from first input field to last button + formWidth = formInnerWidth + 4 + formHeight = formInnerHeight + 2 + ) + + var currWidth, currHeight int + if name, frontPage := ui.pages.GetFrontPage(); name == pageNameMain { + _, _, currWidth, currHeight = frontPage.GetRect() + } else { + return + } + + form := tview.NewForm() + form. + AddInputField(inputLabelName, "My stream", inputLen, nil, nil). + AddInputField(inputLabelURL, "rtmp://", inputLen, nil, nil). + AddButton("Add", func() { + ui.commandCh <- CommandAddDestination{ + DestinationName: form.GetFormItemByLabel(inputLabelName).(*tview.InputField).GetText(), + URL: form.GetFormItemByLabel(inputLabelURL).(*tview.InputField).GetText(), + } + ui.closeDestinationForm() + }). + AddButton("Cancel", func() { + ui.closeDestinationForm() + }). + SetFieldBackgroundColor(tcell.ColorDarkSlateGrey). + SetBorder(true). + SetTitle("Add a new destination"). + SetTitleAlign(tview.AlignLeft). + SetRect((currWidth-formWidth)/2, (currHeight-formHeight)/2, formWidth, formHeight) + + ui.pages.AddPage(pageNameAddDestination, form, false, true) +} + +func (ui *UI) removeDestination() { + const urlCol = 1 + row, _ := ui.destView.GetSelection() + url, ok := ui.destView.GetCell(row, urlCol).GetReference().(string) + if !ok { + return + } + + var started bool + ui.mu.Lock() + started = ui.urlsToStartState[url] != startStateNotStarted + ui.mu.Unlock() + + text := "Are you sure you want to remove the destination?" + if started { + text += "\n\nThis will stop the current live stream for this destination." + } + + ui.showModal( + pageNameModalRemoveDestination, + text, + []string{"Remove", "Cancel"}, + func(buttonIndex int, _ string) { + if buttonIndex == 0 { + ui.commandCh <- CommandRemoveDestination{URL: url} + } + }, + ) +} + +func (ui *UI) closeDestinationForm() { + ui.pages.RemovePage(pageNameAddDestination) + ui.app.SetFocus(ui.destView) +} + func (ui *UI) toggleDestination() { const urlCol = 1 row, _ := ui.destView.GetSelection() @@ -685,7 +784,7 @@ func (ui *UI) copySourceURLToClipboard(clipboardAvailable bool) { } ui.showModal( - modalGroupClipboard, + pageNameModalClipboard, text, []string{"Ok"}, nil, @@ -706,7 +805,7 @@ func (ui *UI) copyConfigFilePathToClipboard(clipboardAvailable bool, configFileP } ui.showModal( - modalGroupClipboard, + pageNameModalClipboard, text, []string{"Ok"}, nil, @@ -724,7 +823,7 @@ func (ui *UI) confirmQuit() { } ui.showModal( - modalGroupQuit, + pageNameModalQuit, "Are you sure you want to quit?", []string{"Quit", "Cancel"}, func(buttonIndex int, _ string) { @@ -743,7 +842,7 @@ func (ui *UI) showAbout() { } ui.showModal( - modalGroupAbout, + pageNameModalAbout, fmt.Sprintf( "%s: live stream multiplexer\n(c) Rob Watson\nhttps://git.netflux.io/rob/octoplex\n\nReleased under AGPL3.\n\nv%s (%s)\nBuilt on %s (%s).", domain.AppName, diff --git a/main.go b/main.go index 5789a68..0823112 100644 --- a/main.go +++ b/main.go @@ -89,7 +89,7 @@ func run(ctx context.Context) error { return app.Run( ctx, app.RunParams{ - Config: cfg, + ConfigService: configService, DockerClient: dockerClient, ClipboardAvailable: clipboardAvailable, ConfigFilePath: configService.Path(),