From d7391cd9b2670f2c2a54329e2dab1d4d5f632eb2 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Thu, 6 Mar 2025 06:31:22 +0100 Subject: [PATCH] feat(ui): debounce --- app/app.go | 6 +- container/container.go | 8 +-- mediaserver/integration_test.go | 15 +++- multiplexer/integration_test.go | 22 +++--- multiplexer/multiplexer.go | 48 ++++++++----- terminal/command.go | 19 +++-- terminal/terminal.go | 121 ++++++++++++++++++++++++-------- terminal/terminal_test.go | 47 +++++++++++++ 8 files changed, 220 insertions(+), 66 deletions(-) diff --git a/app/app.go b/app/app.go index 3fdfda8..01a5b74 100644 --- a/app/app.go +++ b/app/app.go @@ -90,8 +90,10 @@ func Run(ctx context.Context, params RunParams) error { logger.Info("Command received", "cmd", cmd.Name()) switch c := cmd.(type) { - case terminal.CommandToggleDestination: - mp.ToggleDestination(c.URL) + case terminal.CommandStartDestination: + mp.StartDestination(c.URL) + case terminal.CommandStopDestination: + mp.StopDestination(c.URL) case terminal.CommandQuit: return nil } diff --git a/container/container.go b/container/container.go index 370f232..56d73b2 100644 --- a/container/container.go +++ b/container/container.go @@ -274,7 +274,7 @@ func (a *Client) runContainerLoop( } // Race condition: the container may have already restarted. - restarting := ctr.State.Status == "restarting" || ctr.State.Status == "running" + restarting := ctr.State.Status == domain.ContainerStatusRestarting || ctr.State.Status == domain.ContainerStatusRunning containerRespC <- containerWaitResponse{WaitResponse: resp, restarting: restarting} if !restarting { @@ -303,9 +303,9 @@ func (a *Client) runContainerLoop( var containerState string if resp.restarting { - containerState = "restarting" + containerState = domain.ContainerStatusRestarting } else { - containerState = "exited" + containerState = domain.ContainerStatusExited } state.Status = containerState @@ -421,7 +421,7 @@ func (a *Client) ContainerRunning(ctx context.Context, labelOptions LabelOptions } for _, container := range containers { - if container.State == "running" || container.State == "restarting" { + if container.State == domain.ContainerStatusRunning || container.State == domain.ContainerStatusRestarting { return true, nil } } diff --git a/mediaserver/integration_test.go b/mediaserver/integration_test.go index f560556..5583c9f 100644 --- a/mediaserver/integration_test.go +++ b/mediaserver/integration_test.go @@ -3,6 +3,7 @@ package mediaserver_test import ( + "fmt" "testing" "time" @@ -29,7 +30,17 @@ func TestIntegrationMediaServerStartStop(t *testing.T) { require.NoError(t, err) assert.False(t, running) + // We need to avoid clashing with other integration tests, e.g. multiplexer. + const ( + apiPort = 9999 + rtmpPort = 1937 + ) + + rtmpURL := fmt.Sprintf("rtmp://localhost:%d/live", rtmpPort) + mediaServer := mediaserver.StartActor(t.Context(), mediaserver.StartActorParams{ + RTMPPort: rtmpPort, + APIPort: apiPort, FetchIngressStateInterval: 500 * time.Millisecond, ChanSize: 1, ContainerClient: containerClient, @@ -52,9 +63,9 @@ func TestIntegrationMediaServerStartStop(t *testing.T) { state := mediaServer.State() assert.False(t, state.Live) - assert.Equal(t, "rtmp://localhost:1935/live", state.RTMPURL) + assert.Equal(t, rtmpURL, state.RTMPURL) - testhelpers.StreamFLV(t, "rtmp://localhost:1935/live") + testhelpers.StreamFLV(t, rtmpURL) require.Eventually( t, diff --git a/multiplexer/integration_test.go b/multiplexer/integration_test.go index f75d721..0a42dfd 100644 --- a/multiplexer/integration_test.go +++ b/multiplexer/integration_test.go @@ -30,9 +30,15 @@ func TestIntegrationMultiplexer(t *testing.T) { require.NoError(t, err) assert.False(t, running) + // We need to avoid clashing with other integration tests, e.g. mediaserver. + const ( + apiPort = 9998 + rtmpPort = 19350 + ) + srv := mediaserver.StartActor(t.Context(), mediaserver.StartActorParams{ - RTMPPort: 19350, - APIPort: 9998, + RTMPPort: rtmpPort, + APIPort: apiPort, FetchIngressStateInterval: 250 * time.Millisecond, ContainerClient: containerClient, ChanSize: 1, @@ -63,16 +69,16 @@ func TestIntegrationMultiplexer(t *testing.T) { requireListeners(t, srv, 0) - mp.ToggleDestination("rtmp://mediaserver:19350/destination/test1") - mp.ToggleDestination("rtmp://mediaserver:19350/destination/test2") - mp.ToggleDestination("rtmp://mediaserver:19350/destination/test3") + mp.StartDestination("rtmp://mediaserver:19350/destination/test1") + mp.StartDestination("rtmp://mediaserver:19350/destination/test2") + mp.StartDestination("rtmp://mediaserver:19350/destination/test3") requireListeners(t, srv, 3) - mp.ToggleDestination("rtmp://mediaserver:19350/destination/test3") + mp.StopDestination("rtmp://mediaserver:19350/destination/test3") requireListeners(t, srv, 2) - mp.ToggleDestination("rtmp://mediaserver:19350/destination/test2") - mp.ToggleDestination("rtmp://mediaserver:19350/destination/test1") + mp.StopDestination("rtmp://mediaserver:19350/destination/test2") + mp.StopDestination("rtmp://mediaserver:19350/destination/test1") requireListeners(t, srv, 0) } diff --git a/multiplexer/multiplexer.go b/multiplexer/multiplexer.go index f006eb5..b25fa02 100644 --- a/multiplexer/multiplexer.go +++ b/multiplexer/multiplexer.go @@ -77,23 +77,10 @@ func NewActor(ctx context.Context, params NewActorParams) *Actor { return actor } -// ToggleDestination toggles the destination stream between on and off. -func (a *Actor) ToggleDestination(url string) { +// StartDestination starts a destination stream. +func (a *Actor) StartDestination(url string) { a.actorC <- func() { - labels := map[string]string{ - container.LabelComponent: componentName, - container.LabelURL: url, - } - if _, ok := a.currURLs[url]; ok { - a.logger.Info("Stopping live stream", "url", url) - - if err := a.containerClient.RemoveContainers(a.ctx, a.containerClient.ContainersWithLabels(labels)); err != nil { - // TODO: error handling - a.logger.Error("Failed to stop live stream", "url", url, "err", err) - } - - delete(a.currURLs, url) return } @@ -109,7 +96,10 @@ func (a *Actor) ToggleDestination(url string) { "-f", "flv", url, }, - Labels: labels, + Labels: map[string]string{ + container.LabelComponent: componentName, + container.LabelURL: url, + }, }, HostConfig: &typescontainer.HostConfig{ NetworkMode: "default", @@ -130,6 +120,30 @@ func (a *Actor) ToggleDestination(url string) { } } +// StopDestination stops a destination stream. +func (a *Actor) StopDestination(url string) { + a.actorC <- func() { + if _, ok := a.currURLs[url]; !ok { + return + } + + a.logger.Info("Stopping live stream", "url", url) + + if err := a.containerClient.RemoveContainers( + a.ctx, + a.containerClient.ContainersWithLabels(map[string]string{ + container.LabelComponent: componentName, + container.LabelURL: url, + }), + ); err != nil { + // TODO: error handling + a.logger.Error("Failed to stop live stream", "url", url, "err", err) + } + + delete(a.currURLs, url) + } +} + // destLoop is the actor loop for a destination stream. func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, errC <-chan error) { defer func() { @@ -146,7 +160,7 @@ func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, er case containerState := <-containerStateC: state.Container = containerState - if containerState.Status == "running" { + if containerState.Status == domain.ContainerStatusRunning { if hasElapsedSince(5*time.Second, containerState.RxSince) { state.Status = domain.DestinationStatusLive } else { diff --git a/terminal/command.go b/terminal/command.go index 2bac8bf..9929263 100644 --- a/terminal/command.go +++ b/terminal/command.go @@ -1,14 +1,23 @@ package terminal -// CommandToggleDestination toggles a destination from on-air to off-air, or -// vice versa. -type CommandToggleDestination struct { +// CommandStartDestination starts a destination. +type CommandStartDestination struct { URL string } // Name implements the Command interface. -func (c CommandToggleDestination) Name() string { - return "toggle_destination" +func (c CommandStartDestination) Name() string { + return "start_destination" +} + +// CommandStopDestination stops a destination. +type CommandStopDestination struct { + URL string +} + +// Name implements the Command interface. +func (c CommandStopDestination) Name() string { + return "stop_destination" } // CommandQuit quits the app. diff --git a/terminal/terminal.go b/terminal/terminal.go index a8abb1d..52b4a42 100644 --- a/terminal/terminal.go +++ b/terminal/terminal.go @@ -7,6 +7,7 @@ import ( "log/slog" "strconv" "strings" + "sync" "time" "git.netflux.io/rob/octoplex/domain" @@ -25,15 +26,33 @@ type sourceViews struct { rx *tview.TextView } +// startState represents the state of a destination from the point of view of +// the user interface: either started, starting or not started. +type startState int + +const ( + startStateNotStarted startState = iota + startStateStarting + startStateStarted +) + // UI is responsible for managing the terminal user interface. type UI struct { + commandCh chan Command + buildInfo domain.BuildInfo + logger *slog.Logger + + // tview state + app *tview.Application pages *tview.Pages - commandCh chan Command - buildInfo domain.BuildInfo - logger *slog.Logger sourceViews sourceViews destView *tview.Table + + // other mutable state + + mu sync.Mutex + urlsToStartState map[string]startState } // StartParams contains the parameters for starting a new terminal user @@ -114,6 +133,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { aboutView.SetDirection(tview.FlexRow) aboutView.SetBorder(true) aboutView.SetTitle("Actions") + aboutView.AddItem(tview.NewTextView().SetText("[Space] Toggle destination"), 1, 0, false) aboutView.AddItem(tview.NewTextView().SetText("[C] Copy ingress RTMP URL"), 1, 0, false) aboutView.AddItem(tview.NewTextView().SetText("[?] About"), 1, 0, false) @@ -125,16 +145,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { destView.SetSelectable(true, false) destView.SetWrapSelection(true, false) destView.SetSelectedStyle(tcell.StyleDefault.Foreground(tcell.ColorWhite).Background(tcell.ColorDarkSlateGrey)) - destView.SetDoneFunc(func(key tcell.Key) { - const urlCol = 1 - row, _ := destView.GetSelection() - url, ok := destView.GetCell(row, urlCol).GetReference().(string) - if !ok { - return - } - - commandCh <- CommandToggleDestination{URL: url} - }) flex := tview.NewFlex(). SetDirection(tview.FlexColumn). @@ -163,13 +173,16 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { mem: memTextView, rx: rxTextView, }, - destView: destView, + destView: destView, + urlsToStartState: make(map[string]startState), } app.SetInputCapture(func(event *tcell.EventKey) *tcell.EventKey { switch event.Key() { case tcell.KeyRune: switch event.Rune() { + case ' ': + ui.toggleDestination() case 'c', 'C': ui.copySourceURLToClipboard(params.ClipboardAvailable) case '?': @@ -249,6 +262,24 @@ func (ui *UI) ShowStartupCheckModal() bool { return <-done } +// SetState sets the state of the terminal user interface. +func (ui *UI) SetState(state domain.AppState) { + if state.Source.ExitReason != "" { + ui.handleMediaServerClosed(state.Source.ExitReason) + } + + ui.mu.Lock() + for _, dest := range state.Destinations { + ui.urlsToStartState[dest.URL] = containerStateToStartState(dest.Container.Status) + } + ui.mu.Unlock() + + // The state is mutable so can't be passed into QueueUpdateDraw, which + // passes it to another goroutine, without cloning it first. + stateClone := state.Clone() + ui.app.QueueUpdateDraw(func() { ui.redrawFromState(stateClone) }) +} + func (ui *UI) handleMediaServerClosed(exitReason string) { done := make(chan struct{}) @@ -273,20 +304,6 @@ func (ui *UI) handleMediaServerClosed(exitReason string) { <-done } -// SetState sets the state of the terminal user interface. -func (ui *UI) SetState(state domain.AppState) { - if state.Source.ExitReason != "" { - ui.handleMediaServerClosed(state.Source.ExitReason) - } - - // The state is mutable so can't be passed into QueueUpdateDraw, which - // passes it to another goroutine, without cloning it first. - stateClone := state.Clone() - ui.app.QueueUpdateDraw(func() { - ui.redrawFromState(stateClone) - }) -} - const dash = "—" const ( @@ -418,6 +435,42 @@ func (ui *UI) Close() { ui.app.Stop() } +func (ui *UI) toggleDestination() { + const urlCol = 1 + row, _ := ui.destView.GetSelection() + url, ok := ui.destView.GetCell(row, urlCol).GetReference().(string) + if !ok { + return + } + + // Communicating with the multiplexer/container client is asynchronous. To + // ensure we can limit each destination to a single container we need some + // kind of local mutable state which synchronously tracks the "start state" + // of each destination. + // + // Something about this approach feels a tiny bit hacky. Either of these + // approaches would be nicer, if one could be made to work: + // + // 1. Store the state in the *tview.Table, which would mean not recreating + // the cells on each redraw. + // 2. Piggy-back on the tview goroutine to handle synchronization, but that + // seems to introduce deadlocks and/or UI bugs. + ui.mu.Lock() + defer ui.mu.Unlock() + + ss := ui.urlsToStartState[url] + switch ss { + case startStateNotStarted: + ui.urlsToStartState[url] = startStateStarting + ui.commandCh <- CommandStartDestination{URL: url} + case startStateStarting: + // do nothing + return + case startStateStarted: + ui.commandCh <- CommandStopDestination{URL: url} + } +} + func (ui *UI) copySourceURLToClipboard(clipboardAvailable bool) { var text string if clipboardAvailable { @@ -482,6 +535,18 @@ func (ui *UI) showAbout() { ui.pages.AddPage("modal", modal, true, true) } +// comtainerStateToStartState converts a container state to a start state. +func containerStateToStartState(containerState string) startState { + switch containerState { + case domain.ContainerStatusPulling, domain.ContainerStatusCreated: + return startStateStarting + case domain.ContainerStatusRunning, domain.ContainerStatusRestarting, domain.ContainerStatusPaused, domain.ContainerStatusRemoving: + return startStateStarted + default: + return startStateNotStarted + } +} + func rightPad(s string, n int) string { if s == "" || len(s) == n { return s diff --git a/terminal/terminal_test.go b/terminal/terminal_test.go index 1720218..657e714 100644 --- a/terminal/terminal_test.go +++ b/terminal/terminal_test.go @@ -3,9 +3,56 @@ package terminal import ( "testing" + "git.netflux.io/rob/octoplex/domain" "github.com/stretchr/testify/assert" ) +func TestContainerStateToStartState(t *testing.T) { + testCases := []struct { + containerState string + want startState + }{ + { + containerState: domain.ContainerStatusPulling, + want: startStateStarting, + }, + { + containerState: domain.ContainerStatusCreated, + want: startStateStarting, + }, + { + containerState: domain.ContainerStatusRunning, + want: startStateStarted, + }, + { + containerState: domain.ContainerStatusRestarting, + want: startStateStarted, + }, + { + containerState: domain.ContainerStatusPaused, + want: startStateStarted, + }, + { + containerState: domain.ContainerStatusRemoving, + want: startStateStarted, + }, + { + containerState: domain.ContainerStatusExited, + want: startStateNotStarted, + }, + { + containerState: domain.ContainerStatusDead, + want: startStateNotStarted, + }, + } + + for _, tc := range testCases { + t.Run(tc.containerState, func(t *testing.T) { + assert.Equal(t, tc.want, containerStateToStartState(tc.containerState)) + }) + } +} + func TestRightPad(t *testing.T) { testCases := []struct { name string