diff --git a/app/app.go b/app/app.go index 184795f..a3f3d01 100644 --- a/app/app.go +++ b/app/app.go @@ -80,7 +80,6 @@ func Run( return nil } case <-uiTicker.C: - // TODO: update UI with current state? updateUI() case serverState := <-srv.C(): applyServerState(serverState, state) @@ -97,13 +96,15 @@ func applyServerState(serverState domain.Source, appState *domain.AppState) { appState.Source = serverState } -func applyMultiplexerState(destination domain.Destination, appState *domain.AppState) { +// applyMultiplexerState applies the current multiplexer state to the app state. +func applyMultiplexerState(mpState multiplexer.State, appState *domain.AppState) { for i, dest := range appState.Destinations { - if dest.URL != destination.URL { + if dest.URL != mpState.URL { continue } - appState.Destinations[i] = destination + appState.Destinations[i].Container = mpState.Container + appState.Destinations[i].Status = mpState.Status break } @@ -113,6 +114,9 @@ func applyMultiplexerState(destination domain.Destination, appState *domain.AppS func applyConfig(cfg config.Config, appState *domain.AppState) { appState.Destinations = make([]domain.Destination, 0, len(cfg.Destinations)) for _, dest := range cfg.Destinations { - appState.Destinations = append(appState.Destinations, domain.Destination{URL: dest.URL}) + appState.Destinations = append(appState.Destinations, domain.Destination{ + Name: dest.Name, + URL: dest.URL, + }) } } diff --git a/config/config.go b/config/config.go index 29b5f82..1693e33 100644 --- a/config/config.go +++ b/config/config.go @@ -15,7 +15,8 @@ const defaultLogFile = "termstream.log" // Destination holds the configuration for a destination. type Destination struct { - URL string `yaml:"url"` + Name string `yaml:"name"` + URL string `yaml:"url"` } // Config holds the configuration for the application. @@ -65,6 +66,12 @@ func setDefaults(cfg *Config) { if cfg.LogFile == "" { cfg.LogFile = defaultLogFile } + + for i := range cfg.Destinations { + if strings.TrimSpace(cfg.Destinations[i].Name) == "" { + cfg.Destinations[i].Name = fmt.Sprintf("Stream %d", i+1) + } + } } func validate(cfg Config) error { diff --git a/config/config_test.go b/config/config_test.go index 636bdc2..3b2861c 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -17,6 +17,9 @@ var configComplete []byte //go:embed testdata/no-logfile.yml var configNoLogfile []byte +//go:embed testdata/no-name.yml +var configNoName []byte + //go:embed testdata/invalid-destination-url.yml var configInvalidDestinationURL []byte @@ -39,7 +42,10 @@ func TestConfig(t *testing.T) { config.Config{ LogFile: "test.log", Destinations: []config.Destination{ - {URL: "rtmp://rtmp.example.com:1935/live"}, + { + Name: "my stream", + URL: "rtmp://rtmp.example.com:1935/live", + }, }, }, cfg) }, @@ -51,6 +57,13 @@ func TestConfig(t *testing.T) { assert.Equal(t, "termstream.log", cfg.LogFile) }, }, + { + name: "no name", + r: bytes.NewReader(configNoName), + want: func(t *testing.T, cfg config.Config) { + assert.Equal(t, "Stream 1", cfg.Destinations[0].Name) + }, + }, { name: "invalid destination URL", r: bytes.NewReader(configInvalidDestinationURL), diff --git a/config/testdata/complete.yml b/config/testdata/complete.yml index 6542551..8dd6a86 100644 --- a/config/testdata/complete.yml +++ b/config/testdata/complete.yml @@ -1,4 +1,5 @@ --- logfile: test.log destinations: -- url: rtmp://rtmp.example.com:1935/live +- name: my stream + url: rtmp://rtmp.example.com:1935/live diff --git a/config/testdata/no-name.yml b/config/testdata/no-name.yml new file mode 100644 index 0000000..6542551 --- /dev/null +++ b/config/testdata/no-name.yml @@ -0,0 +1,4 @@ +--- +logfile: test.log +destinations: +- url: rtmp://rtmp.example.com:1935/live diff --git a/domain/types.go b/domain/types.go index 53e0008..fc1ee33 100644 --- a/domain/types.go +++ b/domain/types.go @@ -18,18 +18,19 @@ type Source struct { ExitReason string } -type DestinationState int +type DestinationStatus int const ( - DestinationStateOffAir DestinationState = iota - DestinationStateStarting - DestinationStateLive + DestinationStatusOffAir DestinationStatus = iota + DestinationStatusStarting + DestinationStatusLive ) // Destination is a single destination. type Destination struct { Container Container - State DestinationState + Status DestinationStatus + Name string URL string } diff --git a/multiplexer/multiplexer.go b/multiplexer/multiplexer.go index 38a182c..79642a8 100644 --- a/multiplexer/multiplexer.go +++ b/multiplexer/multiplexer.go @@ -23,6 +23,14 @@ const ( imageNameFFMPEG = "ghcr.io/jrottenberg/ffmpeg:7.1-scratch" // image name for ffmpeg ) +// State is the state of a single destination from the point of view of the +// multiplexer. +type State struct { + URL string + Container domain.Container + Status domain.DestinationStatus +} + // Actor is responsible for managing the multiplexer. type Actor struct { wg sync.WaitGroup @@ -32,7 +40,7 @@ type Actor struct { containerClient *container.Client logger *slog.Logger actorC chan action - stateC chan domain.Destination + stateC chan State // mutable state currURLs map[string]struct{} @@ -60,7 +68,7 @@ func NewActor(ctx context.Context, params NewActorParams) *Actor { containerClient: params.ContainerClient, logger: params.Logger, actorC: make(chan action, cmp.Or(params.ChanSize, defaultChanSize)), - stateC: make(chan domain.Destination, cmp.Or(params.ChanSize, defaultChanSize)), + stateC: make(chan State, cmp.Or(params.ChanSize, defaultChanSize)), currURLs: make(map[string]struct{}), } @@ -127,7 +135,7 @@ func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, er } }() - state := &domain.Destination{URL: url} + state := &State{URL: url} sendState := func() { a.stateC <- *state } for { @@ -137,12 +145,12 @@ func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, er if containerState.State == "running" { if hasElapsedSince(5*time.Second, containerState.RxSince) { - state.State = domain.DestinationStateLive + state.Status = domain.DestinationStatusLive } else { - state.State = domain.DestinationStateStarting + state.Status = domain.DestinationStatusStarting } } else { - state.State = domain.DestinationStateOffAir + state.Status = domain.DestinationStatusOffAir } sendState() case err := <-errC: @@ -157,7 +165,7 @@ func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, er // C returns a channel that will receive the current state of the multiplexer. // The channel is never closed. -func (a *Actor) C() <-chan domain.Destination { +func (a *Actor) C() <-chan State { return a.stateC } diff --git a/terminal/actor.go b/terminal/actor.go index 47cfa8a..d3cf574 100644 --- a/terminal/actor.go +++ b/terminal/actor.go @@ -52,8 +52,14 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) { destView.SetWrapSelection(true, false) destView.SetSelectedStyle(tcell.StyleDefault.Foreground(tcell.ColorWhite).Background(tcell.ColorDarkSlateGrey)) destView.SetDoneFunc(func(key tcell.Key) { + const urlCol = 1 row, _ := destView.GetSelection() - commandCh <- CommandToggleDestination{URL: destView.GetCell(row, 0).Text} + url, ok := destView.GetCell(row, urlCol).GetReference().(string) + if !ok { + return + } + + commandCh <- CommandToggleDestination{URL: url} }) flex := tview.NewFlex(). @@ -180,44 +186,47 @@ func (a *Actor) redrawFromState(state domain.AppState) { } setHeaderRow := func(tableView *tview.Table, txRxLabel string) { - tableView.SetCell(0, 0, headerCell("[grey]URL", 3)) - tableView.SetCell(0, 1, headerCell("[grey]Status", 2)) - tableView.SetCell(0, 2, headerCell("[grey]Container", 2)) - tableView.SetCell(0, 3, headerCell("[grey]Health", 2)) - tableView.SetCell(0, 4, headerCell("[grey]CPU %", 1)) - tableView.SetCell(0, 5, headerCell("[grey]Memory MB", 1)) - tableView.SetCell(0, 6, headerCell("[grey]"+txRxLabel+" Kbps", 1)) - tableView.SetCell(0, 7, headerCell("[grey]Action", 2)) + tableView.SetCell(0, 0, headerCell("[grey]Name", 3)) + tableView.SetCell(0, 1, headerCell("[grey]URL", 3)) + tableView.SetCell(0, 2, headerCell("[grey]Status", 2)) + tableView.SetCell(0, 3, headerCell("[grey]Container", 2)) + tableView.SetCell(0, 4, headerCell("[grey]Health", 2)) + tableView.SetCell(0, 5, headerCell("[grey]CPU %", 1)) + tableView.SetCell(0, 6, headerCell("[grey]Memory MB", 1)) + tableView.SetCell(0, 7, headerCell("[grey]"+txRxLabel+" Kbps", 1)) + tableView.SetCell(0, 8, headerCell("[grey]Action", 2)) } a.sourceView.Clear() setHeaderRow(a.sourceView, "Rx") - a.sourceView.SetCell(1, 0, tview.NewTableCell(state.Source.RTMPURL)) + a.sourceView.SetCell(1, 0, tview.NewTableCell("Local server")) + a.sourceView.SetCell(1, 1, tview.NewTableCell(state.Source.RTMPURL)) if state.Source.Live { - a.sourceView.SetCell(1, 1, tview.NewTableCell("[black:green]receiving")) + a.sourceView.SetCell(1, 2, tview.NewTableCell("[black:green]receiving")) } else if state.Source.Container.State == "running" && state.Source.Container.HealthState == "healthy" { - a.sourceView.SetCell(1, 1, tview.NewTableCell("[black:yellow]ready")) + a.sourceView.SetCell(1, 2, tview.NewTableCell("[black:yellow]ready")) } else { - a.sourceView.SetCell(1, 1, tview.NewTableCell("[white:red]not ready")) + a.sourceView.SetCell(1, 2, tview.NewTableCell("[white:red]not ready")) } - a.sourceView.SetCell(1, 2, tview.NewTableCell("[white]"+state.Source.Container.State)) - a.sourceView.SetCell(1, 3, tview.NewTableCell("[white]"+cmp.Or(state.Source.Container.HealthState, "starting"))) - a.sourceView.SetCell(1, 4, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", state.Source.Container.CPUPercent))) - a.sourceView.SetCell(1, 5, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", float64(state.Source.Container.MemoryUsageBytes)/1024/1024))) - a.sourceView.SetCell(1, 6, tview.NewTableCell("[white]"+fmt.Sprintf("%d", state.Source.Container.RxRate))) - a.sourceView.SetCell(1, 7, tview.NewTableCell("")) + a.sourceView.SetCell(1, 3, tview.NewTableCell("[white]"+state.Source.Container.State)) + a.sourceView.SetCell(1, 4, tview.NewTableCell("[white]"+cmp.Or(state.Source.Container.HealthState, "starting"))) + a.sourceView.SetCell(1, 5, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", state.Source.Container.CPUPercent))) + a.sourceView.SetCell(1, 6, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", float64(state.Source.Container.MemoryUsageBytes)/1024/1024))) + a.sourceView.SetCell(1, 7, tview.NewTableCell("[white]"+fmt.Sprintf("%d", state.Source.Container.RxRate))) + a.sourceView.SetCell(1, 8, tview.NewTableCell("")) a.destView.Clear() setHeaderRow(a.destView, "Tx") for i, dest := range state.Destinations { - a.destView.SetCell(i+1, 0, tview.NewTableCell(dest.URL)) - switch dest.State { - case domain.DestinationStateLive: + a.destView.SetCell(i+1, 0, tview.NewTableCell(dest.Name)) + a.destView.SetCell(i+1, 1, tview.NewTableCell(truncate(dest.URL, 50)).SetReference(dest.URL)) + switch dest.Status { + case domain.DestinationStatusLive: a.destView.SetCell( i+1, - 1, + 2, tview.NewTableCell("sending"). SetTextColor(tcell.ColorBlack). SetBackgroundColor(tcell.ColorGreen). @@ -228,44 +237,44 @@ func (a *Actor) redrawFromState(state domain.AppState) { Background(tcell.ColorGreen), ), ) - case domain.DestinationStateStarting: + case domain.DestinationStatusStarting: label := "starting" if dest.Container.RestartCount > 0 { label = fmt.Sprintf("restarting (%d)", dest.Container.RestartCount) } - a.destView.SetCell(i+1, 1, tview.NewTableCell("[white]"+label)) - case domain.DestinationStateOffAir: - a.destView.SetCell(i+1, 1, tview.NewTableCell("[white]off-air")) + a.destView.SetCell(i+1, 2, tview.NewTableCell("[white]"+label)) + case domain.DestinationStatusOffAir: + a.destView.SetCell(i+1, 2, tview.NewTableCell("[white]off-air")) default: panic("unknown destination state") } - a.destView.SetCell(i+1, 2, tview.NewTableCell("[white]"+cmp.Or(dest.Container.State, dash))) + a.destView.SetCell(i+1, 3, tview.NewTableCell("[white]"+cmp.Or(dest.Container.State, dash))) healthState := dash - if dest.State == domain.DestinationStateLive { + if dest.Status == domain.DestinationStatusLive { healthState = "healthy" } - a.destView.SetCell(i+1, 3, tview.NewTableCell("[white]"+healthState)) + a.destView.SetCell(i+1, 4, tview.NewTableCell("[white]"+healthState)) cpuPercent := dash if dest.Container.State == "running" { cpuPercent = fmt.Sprintf("%.1f", dest.Container.CPUPercent) } - a.destView.SetCell(i+1, 4, tview.NewTableCell("[white]"+cpuPercent)) + a.destView.SetCell(i+1, 5, tview.NewTableCell("[white]"+cpuPercent)) memoryUsage := dash if dest.Container.State == "running" { - memoryUsage = fmt.Sprintf("%.1f", float64(dest.Container.MemoryUsageBytes)/1024/1024) + memoryUsage = fmt.Sprintf("%.1f", float64(dest.Container.MemoryUsageBytes)/1000/1000) } - a.destView.SetCell(i+1, 5, tview.NewTableCell("[white]"+memoryUsage)) + a.destView.SetCell(i+1, 6, tview.NewTableCell("[white]"+memoryUsage)) txRate := dash if dest.Container.State == "running" { txRate = "[white]" + fmt.Sprintf("%d", dest.Container.TxRate) } - a.destView.SetCell(i+1, 6, tview.NewTableCell(txRate)) + a.destView.SetCell(i+1, 7, tview.NewTableCell(txRate)) - a.destView.SetCell(i+1, 7, tview.NewTableCell("[green]Tab to go live")) + a.destView.SetCell(i+1, 8, tview.NewTableCell("[green]Tab to go live")) } a.app.Draw() @@ -275,3 +284,10 @@ func (a *Actor) redrawFromState(state domain.AppState) { func (a *Actor) Close() { a.app.Stop() } + +func truncate(s string, max int) string { + if len(s) <= max { + return s + } + return s[:max] + "…" +}