feat: stream names

This commit is contained in:
Rob Watson 2025-02-15 07:27:07 +01:00
parent 162fd51fb4
commit d385df32c6
8 changed files with 109 additions and 55 deletions

View File

@ -80,7 +80,6 @@ func Run(
return nil return nil
} }
case <-uiTicker.C: case <-uiTicker.C:
// TODO: update UI with current state?
updateUI() updateUI()
case serverState := <-srv.C(): case serverState := <-srv.C():
applyServerState(serverState, state) applyServerState(serverState, state)
@ -97,13 +96,15 @@ func applyServerState(serverState domain.Source, appState *domain.AppState) {
appState.Source = serverState appState.Source = serverState
} }
func applyMultiplexerState(destination domain.Destination, appState *domain.AppState) { // applyMultiplexerState applies the current multiplexer state to the app state.
func applyMultiplexerState(mpState multiplexer.State, appState *domain.AppState) {
for i, dest := range appState.Destinations { for i, dest := range appState.Destinations {
if dest.URL != destination.URL { if dest.URL != mpState.URL {
continue continue
} }
appState.Destinations[i] = destination appState.Destinations[i].Container = mpState.Container
appState.Destinations[i].Status = mpState.Status
break break
} }
@ -113,6 +114,9 @@ func applyMultiplexerState(destination domain.Destination, appState *domain.AppS
func applyConfig(cfg config.Config, appState *domain.AppState) { func applyConfig(cfg config.Config, appState *domain.AppState) {
appState.Destinations = make([]domain.Destination, 0, len(cfg.Destinations)) appState.Destinations = make([]domain.Destination, 0, len(cfg.Destinations))
for _, dest := range cfg.Destinations { for _, dest := range cfg.Destinations {
appState.Destinations = append(appState.Destinations, domain.Destination{URL: dest.URL}) appState.Destinations = append(appState.Destinations, domain.Destination{
Name: dest.Name,
URL: dest.URL,
})
} }
} }

View File

@ -15,7 +15,8 @@ const defaultLogFile = "termstream.log"
// Destination holds the configuration for a destination. // Destination holds the configuration for a destination.
type Destination struct { type Destination struct {
URL string `yaml:"url"` Name string `yaml:"name"`
URL string `yaml:"url"`
} }
// Config holds the configuration for the application. // Config holds the configuration for the application.
@ -65,6 +66,12 @@ func setDefaults(cfg *Config) {
if cfg.LogFile == "" { if cfg.LogFile == "" {
cfg.LogFile = defaultLogFile cfg.LogFile = defaultLogFile
} }
for i := range cfg.Destinations {
if strings.TrimSpace(cfg.Destinations[i].Name) == "" {
cfg.Destinations[i].Name = fmt.Sprintf("Stream %d", i+1)
}
}
} }
func validate(cfg Config) error { func validate(cfg Config) error {

View File

@ -17,6 +17,9 @@ var configComplete []byte
//go:embed testdata/no-logfile.yml //go:embed testdata/no-logfile.yml
var configNoLogfile []byte var configNoLogfile []byte
//go:embed testdata/no-name.yml
var configNoName []byte
//go:embed testdata/invalid-destination-url.yml //go:embed testdata/invalid-destination-url.yml
var configInvalidDestinationURL []byte var configInvalidDestinationURL []byte
@ -39,7 +42,10 @@ func TestConfig(t *testing.T) {
config.Config{ config.Config{
LogFile: "test.log", LogFile: "test.log",
Destinations: []config.Destination{ Destinations: []config.Destination{
{URL: "rtmp://rtmp.example.com:1935/live"}, {
Name: "my stream",
URL: "rtmp://rtmp.example.com:1935/live",
},
}, },
}, cfg) }, cfg)
}, },
@ -51,6 +57,13 @@ func TestConfig(t *testing.T) {
assert.Equal(t, "termstream.log", cfg.LogFile) assert.Equal(t, "termstream.log", cfg.LogFile)
}, },
}, },
{
name: "no name",
r: bytes.NewReader(configNoName),
want: func(t *testing.T, cfg config.Config) {
assert.Equal(t, "Stream 1", cfg.Destinations[0].Name)
},
},
{ {
name: "invalid destination URL", name: "invalid destination URL",
r: bytes.NewReader(configInvalidDestinationURL), r: bytes.NewReader(configInvalidDestinationURL),

View File

@ -1,4 +1,5 @@
--- ---
logfile: test.log logfile: test.log
destinations: destinations:
- url: rtmp://rtmp.example.com:1935/live - name: my stream
url: rtmp://rtmp.example.com:1935/live

4
config/testdata/no-name.yml vendored Normal file
View File

@ -0,0 +1,4 @@
---
logfile: test.log
destinations:
- url: rtmp://rtmp.example.com:1935/live

View File

@ -18,18 +18,19 @@ type Source struct {
ExitReason string ExitReason string
} }
type DestinationState int type DestinationStatus int
const ( const (
DestinationStateOffAir DestinationState = iota DestinationStatusOffAir DestinationStatus = iota
DestinationStateStarting DestinationStatusStarting
DestinationStateLive DestinationStatusLive
) )
// Destination is a single destination. // Destination is a single destination.
type Destination struct { type Destination struct {
Container Container Container Container
State DestinationState Status DestinationStatus
Name string
URL string URL string
} }

View File

@ -23,6 +23,14 @@ const (
imageNameFFMPEG = "ghcr.io/jrottenberg/ffmpeg:7.1-scratch" // image name for ffmpeg imageNameFFMPEG = "ghcr.io/jrottenberg/ffmpeg:7.1-scratch" // image name for ffmpeg
) )
// State is the state of a single destination from the point of view of the
// multiplexer.
type State struct {
URL string
Container domain.Container
Status domain.DestinationStatus
}
// Actor is responsible for managing the multiplexer. // Actor is responsible for managing the multiplexer.
type Actor struct { type Actor struct {
wg sync.WaitGroup wg sync.WaitGroup
@ -32,7 +40,7 @@ type Actor struct {
containerClient *container.Client containerClient *container.Client
logger *slog.Logger logger *slog.Logger
actorC chan action actorC chan action
stateC chan domain.Destination stateC chan State
// mutable state // mutable state
currURLs map[string]struct{} currURLs map[string]struct{}
@ -60,7 +68,7 @@ func NewActor(ctx context.Context, params NewActorParams) *Actor {
containerClient: params.ContainerClient, containerClient: params.ContainerClient,
logger: params.Logger, logger: params.Logger,
actorC: make(chan action, cmp.Or(params.ChanSize, defaultChanSize)), actorC: make(chan action, cmp.Or(params.ChanSize, defaultChanSize)),
stateC: make(chan domain.Destination, cmp.Or(params.ChanSize, defaultChanSize)), stateC: make(chan State, cmp.Or(params.ChanSize, defaultChanSize)),
currURLs: make(map[string]struct{}), currURLs: make(map[string]struct{}),
} }
@ -127,7 +135,7 @@ func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, er
} }
}() }()
state := &domain.Destination{URL: url} state := &State{URL: url}
sendState := func() { a.stateC <- *state } sendState := func() { a.stateC <- *state }
for { for {
@ -137,12 +145,12 @@ func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, er
if containerState.State == "running" { if containerState.State == "running" {
if hasElapsedSince(5*time.Second, containerState.RxSince) { if hasElapsedSince(5*time.Second, containerState.RxSince) {
state.State = domain.DestinationStateLive state.Status = domain.DestinationStatusLive
} else { } else {
state.State = domain.DestinationStateStarting state.Status = domain.DestinationStatusStarting
} }
} else { } else {
state.State = domain.DestinationStateOffAir state.Status = domain.DestinationStatusOffAir
} }
sendState() sendState()
case err := <-errC: case err := <-errC:
@ -157,7 +165,7 @@ func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, er
// C returns a channel that will receive the current state of the multiplexer. // C returns a channel that will receive the current state of the multiplexer.
// The channel is never closed. // The channel is never closed.
func (a *Actor) C() <-chan domain.Destination { func (a *Actor) C() <-chan State {
return a.stateC return a.stateC
} }

View File

@ -52,8 +52,14 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) {
destView.SetWrapSelection(true, false) destView.SetWrapSelection(true, false)
destView.SetSelectedStyle(tcell.StyleDefault.Foreground(tcell.ColorWhite).Background(tcell.ColorDarkSlateGrey)) destView.SetSelectedStyle(tcell.StyleDefault.Foreground(tcell.ColorWhite).Background(tcell.ColorDarkSlateGrey))
destView.SetDoneFunc(func(key tcell.Key) { destView.SetDoneFunc(func(key tcell.Key) {
const urlCol = 1
row, _ := destView.GetSelection() row, _ := destView.GetSelection()
commandCh <- CommandToggleDestination{URL: destView.GetCell(row, 0).Text} url, ok := destView.GetCell(row, urlCol).GetReference().(string)
if !ok {
return
}
commandCh <- CommandToggleDestination{URL: url}
}) })
flex := tview.NewFlex(). flex := tview.NewFlex().
@ -180,44 +186,47 @@ func (a *Actor) redrawFromState(state domain.AppState) {
} }
setHeaderRow := func(tableView *tview.Table, txRxLabel string) { setHeaderRow := func(tableView *tview.Table, txRxLabel string) {
tableView.SetCell(0, 0, headerCell("[grey]URL", 3)) tableView.SetCell(0, 0, headerCell("[grey]Name", 3))
tableView.SetCell(0, 1, headerCell("[grey]Status", 2)) tableView.SetCell(0, 1, headerCell("[grey]URL", 3))
tableView.SetCell(0, 2, headerCell("[grey]Container", 2)) tableView.SetCell(0, 2, headerCell("[grey]Status", 2))
tableView.SetCell(0, 3, headerCell("[grey]Health", 2)) tableView.SetCell(0, 3, headerCell("[grey]Container", 2))
tableView.SetCell(0, 4, headerCell("[grey]CPU %", 1)) tableView.SetCell(0, 4, headerCell("[grey]Health", 2))
tableView.SetCell(0, 5, headerCell("[grey]Memory MB", 1)) tableView.SetCell(0, 5, headerCell("[grey]CPU %", 1))
tableView.SetCell(0, 6, headerCell("[grey]"+txRxLabel+" Kbps", 1)) tableView.SetCell(0, 6, headerCell("[grey]Memory MB", 1))
tableView.SetCell(0, 7, headerCell("[grey]Action", 2)) tableView.SetCell(0, 7, headerCell("[grey]"+txRxLabel+" Kbps", 1))
tableView.SetCell(0, 8, headerCell("[grey]Action", 2))
} }
a.sourceView.Clear() a.sourceView.Clear()
setHeaderRow(a.sourceView, "Rx") setHeaderRow(a.sourceView, "Rx")
a.sourceView.SetCell(1, 0, tview.NewTableCell(state.Source.RTMPURL)) a.sourceView.SetCell(1, 0, tview.NewTableCell("Local server"))
a.sourceView.SetCell(1, 1, tview.NewTableCell(state.Source.RTMPURL))
if state.Source.Live { if state.Source.Live {
a.sourceView.SetCell(1, 1, tview.NewTableCell("[black:green]receiving")) a.sourceView.SetCell(1, 2, tview.NewTableCell("[black:green]receiving"))
} else if state.Source.Container.State == "running" && state.Source.Container.HealthState == "healthy" { } else if state.Source.Container.State == "running" && state.Source.Container.HealthState == "healthy" {
a.sourceView.SetCell(1, 1, tview.NewTableCell("[black:yellow]ready")) a.sourceView.SetCell(1, 2, tview.NewTableCell("[black:yellow]ready"))
} else { } else {
a.sourceView.SetCell(1, 1, tview.NewTableCell("[white:red]not ready")) a.sourceView.SetCell(1, 2, tview.NewTableCell("[white:red]not ready"))
} }
a.sourceView.SetCell(1, 2, tview.NewTableCell("[white]"+state.Source.Container.State)) a.sourceView.SetCell(1, 3, tview.NewTableCell("[white]"+state.Source.Container.State))
a.sourceView.SetCell(1, 3, tview.NewTableCell("[white]"+cmp.Or(state.Source.Container.HealthState, "starting"))) a.sourceView.SetCell(1, 4, tview.NewTableCell("[white]"+cmp.Or(state.Source.Container.HealthState, "starting")))
a.sourceView.SetCell(1, 4, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", state.Source.Container.CPUPercent))) a.sourceView.SetCell(1, 5, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", state.Source.Container.CPUPercent)))
a.sourceView.SetCell(1, 5, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", float64(state.Source.Container.MemoryUsageBytes)/1024/1024))) a.sourceView.SetCell(1, 6, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", float64(state.Source.Container.MemoryUsageBytes)/1024/1024)))
a.sourceView.SetCell(1, 6, tview.NewTableCell("[white]"+fmt.Sprintf("%d", state.Source.Container.RxRate))) a.sourceView.SetCell(1, 7, tview.NewTableCell("[white]"+fmt.Sprintf("%d", state.Source.Container.RxRate)))
a.sourceView.SetCell(1, 7, tview.NewTableCell("")) a.sourceView.SetCell(1, 8, tview.NewTableCell(""))
a.destView.Clear() a.destView.Clear()
setHeaderRow(a.destView, "Tx") setHeaderRow(a.destView, "Tx")
for i, dest := range state.Destinations { for i, dest := range state.Destinations {
a.destView.SetCell(i+1, 0, tview.NewTableCell(dest.URL)) a.destView.SetCell(i+1, 0, tview.NewTableCell(dest.Name))
switch dest.State { a.destView.SetCell(i+1, 1, tview.NewTableCell(truncate(dest.URL, 50)).SetReference(dest.URL))
case domain.DestinationStateLive: switch dest.Status {
case domain.DestinationStatusLive:
a.destView.SetCell( a.destView.SetCell(
i+1, i+1,
1, 2,
tview.NewTableCell("sending"). tview.NewTableCell("sending").
SetTextColor(tcell.ColorBlack). SetTextColor(tcell.ColorBlack).
SetBackgroundColor(tcell.ColorGreen). SetBackgroundColor(tcell.ColorGreen).
@ -228,44 +237,44 @@ func (a *Actor) redrawFromState(state domain.AppState) {
Background(tcell.ColorGreen), Background(tcell.ColorGreen),
), ),
) )
case domain.DestinationStateStarting: case domain.DestinationStatusStarting:
label := "starting" label := "starting"
if dest.Container.RestartCount > 0 { if dest.Container.RestartCount > 0 {
label = fmt.Sprintf("restarting (%d)", dest.Container.RestartCount) label = fmt.Sprintf("restarting (%d)", dest.Container.RestartCount)
} }
a.destView.SetCell(i+1, 1, tview.NewTableCell("[white]"+label)) a.destView.SetCell(i+1, 2, tview.NewTableCell("[white]"+label))
case domain.DestinationStateOffAir: case domain.DestinationStatusOffAir:
a.destView.SetCell(i+1, 1, tview.NewTableCell("[white]off-air")) a.destView.SetCell(i+1, 2, tview.NewTableCell("[white]off-air"))
default: default:
panic("unknown destination state") panic("unknown destination state")
} }
a.destView.SetCell(i+1, 2, tview.NewTableCell("[white]"+cmp.Or(dest.Container.State, dash))) a.destView.SetCell(i+1, 3, tview.NewTableCell("[white]"+cmp.Or(dest.Container.State, dash)))
healthState := dash healthState := dash
if dest.State == domain.DestinationStateLive { if dest.Status == domain.DestinationStatusLive {
healthState = "healthy" healthState = "healthy"
} }
a.destView.SetCell(i+1, 3, tview.NewTableCell("[white]"+healthState)) a.destView.SetCell(i+1, 4, tview.NewTableCell("[white]"+healthState))
cpuPercent := dash cpuPercent := dash
if dest.Container.State == "running" { if dest.Container.State == "running" {
cpuPercent = fmt.Sprintf("%.1f", dest.Container.CPUPercent) cpuPercent = fmt.Sprintf("%.1f", dest.Container.CPUPercent)
} }
a.destView.SetCell(i+1, 4, tview.NewTableCell("[white]"+cpuPercent)) a.destView.SetCell(i+1, 5, tview.NewTableCell("[white]"+cpuPercent))
memoryUsage := dash memoryUsage := dash
if dest.Container.State == "running" { if dest.Container.State == "running" {
memoryUsage = fmt.Sprintf("%.1f", float64(dest.Container.MemoryUsageBytes)/1024/1024) memoryUsage = fmt.Sprintf("%.1f", float64(dest.Container.MemoryUsageBytes)/1000/1000)
} }
a.destView.SetCell(i+1, 5, tview.NewTableCell("[white]"+memoryUsage)) a.destView.SetCell(i+1, 6, tview.NewTableCell("[white]"+memoryUsage))
txRate := dash txRate := dash
if dest.Container.State == "running" { if dest.Container.State == "running" {
txRate = "[white]" + fmt.Sprintf("%d", dest.Container.TxRate) txRate = "[white]" + fmt.Sprintf("%d", dest.Container.TxRate)
} }
a.destView.SetCell(i+1, 6, tview.NewTableCell(txRate)) a.destView.SetCell(i+1, 7, tview.NewTableCell(txRate))
a.destView.SetCell(i+1, 7, tview.NewTableCell("[green]Tab to go live")) a.destView.SetCell(i+1, 8, tview.NewTableCell("[green]Tab to go live"))
} }
a.app.Draw() a.app.Draw()
@ -275,3 +284,10 @@ func (a *Actor) redrawFromState(state domain.AppState) {
func (a *Actor) Close() { func (a *Actor) Close() {
a.app.Stop() a.app.Stop()
} }
func truncate(s string, max int) string {
if len(s) <= max {
return s
}
return s[:max] + "…"
}