feat(ui): debounce

This commit is contained in:
Rob Watson 2025-03-06 06:31:22 +01:00 committed by Rob Watson
parent e3ca34e8e0
commit d7391cd9b2
8 changed files with 220 additions and 66 deletions

View File

@ -90,8 +90,10 @@ func Run(ctx context.Context, params RunParams) error {
logger.Info("Command received", "cmd", cmd.Name())
switch c := cmd.(type) {
case terminal.CommandToggleDestination:
mp.ToggleDestination(c.URL)
case terminal.CommandStartDestination:
mp.StartDestination(c.URL)
case terminal.CommandStopDestination:
mp.StopDestination(c.URL)
case terminal.CommandQuit:
return nil
}

View File

@ -274,7 +274,7 @@ func (a *Client) runContainerLoop(
}
// Race condition: the container may have already restarted.
restarting := ctr.State.Status == "restarting" || ctr.State.Status == "running"
restarting := ctr.State.Status == domain.ContainerStatusRestarting || ctr.State.Status == domain.ContainerStatusRunning
containerRespC <- containerWaitResponse{WaitResponse: resp, restarting: restarting}
if !restarting {
@ -303,9 +303,9 @@ func (a *Client) runContainerLoop(
var containerState string
if resp.restarting {
containerState = "restarting"
containerState = domain.ContainerStatusRestarting
} else {
containerState = "exited"
containerState = domain.ContainerStatusExited
}
state.Status = containerState
@ -421,7 +421,7 @@ func (a *Client) ContainerRunning(ctx context.Context, labelOptions LabelOptions
}
for _, container := range containers {
if container.State == "running" || container.State == "restarting" {
if container.State == domain.ContainerStatusRunning || container.State == domain.ContainerStatusRestarting {
return true, nil
}
}

View File

@ -3,6 +3,7 @@
package mediaserver_test
import (
"fmt"
"testing"
"time"
@ -29,7 +30,17 @@ func TestIntegrationMediaServerStartStop(t *testing.T) {
require.NoError(t, err)
assert.False(t, running)
// We need to avoid clashing with other integration tests, e.g. multiplexer.
const (
apiPort = 9999
rtmpPort = 1937
)
rtmpURL := fmt.Sprintf("rtmp://localhost:%d/live", rtmpPort)
mediaServer := mediaserver.StartActor(t.Context(), mediaserver.StartActorParams{
RTMPPort: rtmpPort,
APIPort: apiPort,
FetchIngressStateInterval: 500 * time.Millisecond,
ChanSize: 1,
ContainerClient: containerClient,
@ -52,9 +63,9 @@ func TestIntegrationMediaServerStartStop(t *testing.T) {
state := mediaServer.State()
assert.False(t, state.Live)
assert.Equal(t, "rtmp://localhost:1935/live", state.RTMPURL)
assert.Equal(t, rtmpURL, state.RTMPURL)
testhelpers.StreamFLV(t, "rtmp://localhost:1935/live")
testhelpers.StreamFLV(t, rtmpURL)
require.Eventually(
t,

View File

@ -30,9 +30,15 @@ func TestIntegrationMultiplexer(t *testing.T) {
require.NoError(t, err)
assert.False(t, running)
// We need to avoid clashing with other integration tests, e.g. mediaserver.
const (
apiPort = 9998
rtmpPort = 19350
)
srv := mediaserver.StartActor(t.Context(), mediaserver.StartActorParams{
RTMPPort: 19350,
APIPort: 9998,
RTMPPort: rtmpPort,
APIPort: apiPort,
FetchIngressStateInterval: 250 * time.Millisecond,
ContainerClient: containerClient,
ChanSize: 1,
@ -63,16 +69,16 @@ func TestIntegrationMultiplexer(t *testing.T) {
requireListeners(t, srv, 0)
mp.ToggleDestination("rtmp://mediaserver:19350/destination/test1")
mp.ToggleDestination("rtmp://mediaserver:19350/destination/test2")
mp.ToggleDestination("rtmp://mediaserver:19350/destination/test3")
mp.StartDestination("rtmp://mediaserver:19350/destination/test1")
mp.StartDestination("rtmp://mediaserver:19350/destination/test2")
mp.StartDestination("rtmp://mediaserver:19350/destination/test3")
requireListeners(t, srv, 3)
mp.ToggleDestination("rtmp://mediaserver:19350/destination/test3")
mp.StopDestination("rtmp://mediaserver:19350/destination/test3")
requireListeners(t, srv, 2)
mp.ToggleDestination("rtmp://mediaserver:19350/destination/test2")
mp.ToggleDestination("rtmp://mediaserver:19350/destination/test1")
mp.StopDestination("rtmp://mediaserver:19350/destination/test2")
mp.StopDestination("rtmp://mediaserver:19350/destination/test1")
requireListeners(t, srv, 0)
}

View File

@ -77,23 +77,10 @@ func NewActor(ctx context.Context, params NewActorParams) *Actor {
return actor
}
// ToggleDestination toggles the destination stream between on and off.
func (a *Actor) ToggleDestination(url string) {
// StartDestination starts a destination stream.
func (a *Actor) StartDestination(url string) {
a.actorC <- func() {
labels := map[string]string{
container.LabelComponent: componentName,
container.LabelURL: url,
}
if _, ok := a.currURLs[url]; ok {
a.logger.Info("Stopping live stream", "url", url)
if err := a.containerClient.RemoveContainers(a.ctx, a.containerClient.ContainersWithLabels(labels)); err != nil {
// TODO: error handling
a.logger.Error("Failed to stop live stream", "url", url, "err", err)
}
delete(a.currURLs, url)
return
}
@ -109,7 +96,10 @@ func (a *Actor) ToggleDestination(url string) {
"-f", "flv",
url,
},
Labels: labels,
Labels: map[string]string{
container.LabelComponent: componentName,
container.LabelURL: url,
},
},
HostConfig: &typescontainer.HostConfig{
NetworkMode: "default",
@ -130,6 +120,30 @@ func (a *Actor) ToggleDestination(url string) {
}
}
// StopDestination stops a destination stream.
func (a *Actor) StopDestination(url string) {
a.actorC <- func() {
if _, ok := a.currURLs[url]; !ok {
return
}
a.logger.Info("Stopping live stream", "url", url)
if err := a.containerClient.RemoveContainers(
a.ctx,
a.containerClient.ContainersWithLabels(map[string]string{
container.LabelComponent: componentName,
container.LabelURL: url,
}),
); err != nil {
// TODO: error handling
a.logger.Error("Failed to stop live stream", "url", url, "err", err)
}
delete(a.currURLs, url)
}
}
// destLoop is the actor loop for a destination stream.
func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, errC <-chan error) {
defer func() {
@ -146,7 +160,7 @@ func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, er
case containerState := <-containerStateC:
state.Container = containerState
if containerState.Status == "running" {
if containerState.Status == domain.ContainerStatusRunning {
if hasElapsedSince(5*time.Second, containerState.RxSince) {
state.Status = domain.DestinationStatusLive
} else {

View File

@ -1,14 +1,23 @@
package terminal
// CommandToggleDestination toggles a destination from on-air to off-air, or
// vice versa.
type CommandToggleDestination struct {
// CommandStartDestination starts a destination.
type CommandStartDestination struct {
URL string
}
// Name implements the Command interface.
func (c CommandToggleDestination) Name() string {
return "toggle_destination"
func (c CommandStartDestination) Name() string {
return "start_destination"
}
// CommandStopDestination stops a destination.
type CommandStopDestination struct {
URL string
}
// Name implements the Command interface.
func (c CommandStopDestination) Name() string {
return "stop_destination"
}
// CommandQuit quits the app.

View File

@ -7,6 +7,7 @@ import (
"log/slog"
"strconv"
"strings"
"sync"
"time"
"git.netflux.io/rob/octoplex/domain"
@ -25,15 +26,33 @@ type sourceViews struct {
rx *tview.TextView
}
// startState represents the state of a destination from the point of view of
// the user interface: either started, starting or not started.
type startState int
const (
startStateNotStarted startState = iota
startStateStarting
startStateStarted
)
// UI is responsible for managing the terminal user interface.
type UI struct {
app *tview.Application
pages *tview.Pages
commandCh chan Command
buildInfo domain.BuildInfo
logger *slog.Logger
// tview state
app *tview.Application
pages *tview.Pages
sourceViews sourceViews
destView *tview.Table
// other mutable state
mu sync.Mutex
urlsToStartState map[string]startState
}
// StartParams contains the parameters for starting a new terminal user
@ -114,6 +133,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
aboutView.SetDirection(tview.FlexRow)
aboutView.SetBorder(true)
aboutView.SetTitle("Actions")
aboutView.AddItem(tview.NewTextView().SetText("[Space] Toggle destination"), 1, 0, false)
aboutView.AddItem(tview.NewTextView().SetText("[C] Copy ingress RTMP URL"), 1, 0, false)
aboutView.AddItem(tview.NewTextView().SetText("[?] About"), 1, 0, false)
@ -125,16 +145,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
destView.SetSelectable(true, false)
destView.SetWrapSelection(true, false)
destView.SetSelectedStyle(tcell.StyleDefault.Foreground(tcell.ColorWhite).Background(tcell.ColorDarkSlateGrey))
destView.SetDoneFunc(func(key tcell.Key) {
const urlCol = 1
row, _ := destView.GetSelection()
url, ok := destView.GetCell(row, urlCol).GetReference().(string)
if !ok {
return
}
commandCh <- CommandToggleDestination{URL: url}
})
flex := tview.NewFlex().
SetDirection(tview.FlexColumn).
@ -164,12 +174,15 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
rx: rxTextView,
},
destView: destView,
urlsToStartState: make(map[string]startState),
}
app.SetInputCapture(func(event *tcell.EventKey) *tcell.EventKey {
switch event.Key() {
case tcell.KeyRune:
switch event.Rune() {
case ' ':
ui.toggleDestination()
case 'c', 'C':
ui.copySourceURLToClipboard(params.ClipboardAvailable)
case '?':
@ -249,6 +262,24 @@ func (ui *UI) ShowStartupCheckModal() bool {
return <-done
}
// SetState sets the state of the terminal user interface.
func (ui *UI) SetState(state domain.AppState) {
if state.Source.ExitReason != "" {
ui.handleMediaServerClosed(state.Source.ExitReason)
}
ui.mu.Lock()
for _, dest := range state.Destinations {
ui.urlsToStartState[dest.URL] = containerStateToStartState(dest.Container.Status)
}
ui.mu.Unlock()
// The state is mutable so can't be passed into QueueUpdateDraw, which
// passes it to another goroutine, without cloning it first.
stateClone := state.Clone()
ui.app.QueueUpdateDraw(func() { ui.redrawFromState(stateClone) })
}
func (ui *UI) handleMediaServerClosed(exitReason string) {
done := make(chan struct{})
@ -273,20 +304,6 @@ func (ui *UI) handleMediaServerClosed(exitReason string) {
<-done
}
// SetState sets the state of the terminal user interface.
func (ui *UI) SetState(state domain.AppState) {
if state.Source.ExitReason != "" {
ui.handleMediaServerClosed(state.Source.ExitReason)
}
// The state is mutable so can't be passed into QueueUpdateDraw, which
// passes it to another goroutine, without cloning it first.
stateClone := state.Clone()
ui.app.QueueUpdateDraw(func() {
ui.redrawFromState(stateClone)
})
}
const dash = "—"
const (
@ -418,6 +435,42 @@ func (ui *UI) Close() {
ui.app.Stop()
}
func (ui *UI) toggleDestination() {
const urlCol = 1
row, _ := ui.destView.GetSelection()
url, ok := ui.destView.GetCell(row, urlCol).GetReference().(string)
if !ok {
return
}
// Communicating with the multiplexer/container client is asynchronous. To
// ensure we can limit each destination to a single container we need some
// kind of local mutable state which synchronously tracks the "start state"
// of each destination.
//
// Something about this approach feels a tiny bit hacky. Either of these
// approaches would be nicer, if one could be made to work:
//
// 1. Store the state in the *tview.Table, which would mean not recreating
// the cells on each redraw.
// 2. Piggy-back on the tview goroutine to handle synchronization, but that
// seems to introduce deadlocks and/or UI bugs.
ui.mu.Lock()
defer ui.mu.Unlock()
ss := ui.urlsToStartState[url]
switch ss {
case startStateNotStarted:
ui.urlsToStartState[url] = startStateStarting
ui.commandCh <- CommandStartDestination{URL: url}
case startStateStarting:
// do nothing
return
case startStateStarted:
ui.commandCh <- CommandStopDestination{URL: url}
}
}
func (ui *UI) copySourceURLToClipboard(clipboardAvailable bool) {
var text string
if clipboardAvailable {
@ -482,6 +535,18 @@ func (ui *UI) showAbout() {
ui.pages.AddPage("modal", modal, true, true)
}
// comtainerStateToStartState converts a container state to a start state.
func containerStateToStartState(containerState string) startState {
switch containerState {
case domain.ContainerStatusPulling, domain.ContainerStatusCreated:
return startStateStarting
case domain.ContainerStatusRunning, domain.ContainerStatusRestarting, domain.ContainerStatusPaused, domain.ContainerStatusRemoving:
return startStateStarted
default:
return startStateNotStarted
}
}
func rightPad(s string, n int) string {
if s == "" || len(s) == n {
return s

View File

@ -3,9 +3,56 @@ package terminal
import (
"testing"
"git.netflux.io/rob/octoplex/domain"
"github.com/stretchr/testify/assert"
)
func TestContainerStateToStartState(t *testing.T) {
testCases := []struct {
containerState string
want startState
}{
{
containerState: domain.ContainerStatusPulling,
want: startStateStarting,
},
{
containerState: domain.ContainerStatusCreated,
want: startStateStarting,
},
{
containerState: domain.ContainerStatusRunning,
want: startStateStarted,
},
{
containerState: domain.ContainerStatusRestarting,
want: startStateStarted,
},
{
containerState: domain.ContainerStatusPaused,
want: startStateStarted,
},
{
containerState: domain.ContainerStatusRemoving,
want: startStateStarted,
},
{
containerState: domain.ContainerStatusExited,
want: startStateNotStarted,
},
{
containerState: domain.ContainerStatusDead,
want: startStateNotStarted,
},
}
for _, tc := range testCases {
t.Run(tc.containerState, func(t *testing.T) {
assert.Equal(t, tc.want, containerStateToStartState(tc.containerState))
})
}
}
func TestRightPad(t *testing.T) {
testCases := []struct {
name string