From 9c7989018ba275dcad380ff01223212f4a90a6ae Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Mon, 27 Jan 2025 21:21:10 +0100 Subject: [PATCH] refactor: container logic --- container/container.go | 298 ++++++++++++++++-------------------- container/container_test.go | 36 ++--- domain/types.go | 14 +- main.go | 22 +-- mediaserver/actor.go | 63 ++++---- mediaserver/actor_test.go | 10 +- terminal/actor.go | 19 ++- testhelpers/logging.go | 9 ++ 8 files changed, 221 insertions(+), 250 deletions(-) diff --git a/container/container.go b/container/container.go index f9468fc..da64f4c 100644 --- a/container/container.go +++ b/container/container.go @@ -1,7 +1,6 @@ package container import ( - "cmp" "context" "encoding/json" "errors" @@ -22,36 +21,22 @@ import ( "github.com/google/uuid" ) -var ( - stopTimeout = 10 * time.Second - defaultChanSize = 64 -) +// stopTimeout is the timeout for stopping a container. +var stopTimeout = 10 * time.Second -type action func() - -// Actor is an actor that provides a thin wrapper around the Docker API client, -// and provides additional functionality such as exposing container stats. -type Actor struct { +// Client provides a thin wrapper around the Docker API client, and provides +// additional functionality such as exposing container stats. +type Client struct { id uuid.UUID ctx context.Context cancel context.CancelFunc - ch chan action wg sync.WaitGroup apiClient *client.Client logger *slog.Logger - - // mutable state - containers map[string]*domain.Container } -// NewActorParams are the parameters for creating a new Actor. -type NewActorParams struct { - ChanSize int - Logger *slog.Logger -} - -// NewActor creates a new Actor. -func NewActor(ctx context.Context, params NewActorParams) (*Actor, error) { +// NewClient creates a new Client. +func NewClient(ctx context.Context, logger *slog.Logger) (*Client, error) { apiClient, err := client.NewClientWithOpts(client.FromEnv) if err != nil { return nil, err @@ -59,144 +44,111 @@ func NewActor(ctx context.Context, params NewActorParams) (*Actor, error) { ctx, cancel := context.WithCancel(ctx) - client := &Actor{ - id: uuid.New(), - ctx: ctx, - cancel: cancel, - ch: make(chan action, cmp.Or(params.ChanSize, defaultChanSize)), - apiClient: apiClient, - logger: params.Logger, - containers: make(map[string]*domain.Container), + client := &Client{ + id: uuid.New(), + ctx: ctx, + cancel: cancel, + apiClient: apiClient, + logger: logger, } - client.wg.Add(1) - go func() { - defer client.wg.Done() - - client.dockerEventLoop() - }() - - go client.actorLoop() - return client, nil } -// actorLoop is the main loop for the client. -// -// It continues to run until the internal channel is closed, which only happens -// when [Close] is called. This means that it is not reliant on any context -// remaining open, and any method calls made after [Close] may deadlock or -// fail. -func (a *Actor) actorLoop() { - for action := range a.ch { - action() - } +// stats is a struct to hold container stats. +type stats struct { + cpuPercent float64 + memoryUsageBytes uint64 } -func (a *Actor) handleStats(id string) { - statsReader, err := a.apiClient.ContainerStats(a.ctx, id, true) - if err != nil { - // TODO: error handling? - a.logger.Error("Error getting container stats", "err", err, "id", shortID(id)) - return - } - defer statsReader.Body.Close() +// getStats returns a channel that will receive container stats. The channel is +// never closed, but the spawned goroutine will exit when the context is +// cancelled. +func (a *Client) getStats(containerID string) <-chan stats { + ch := make(chan stats) - buf := make([]byte, 4_096) - for { - n, err := statsReader.Body.Read(buf) + go func() { + statsReader, err := a.apiClient.ContainerStats(a.ctx, containerID, true) if err != nil { - if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { - return - } - a.logger.Error("Error reading stats", "err", err, "id", shortID(id)) + // TODO: error handling? + a.logger.Error("Error getting container stats", "err", err, "id", shortID(containerID)) return } + defer statsReader.Body.Close() - var statsResp container.StatsResponse - if err = json.Unmarshal(buf[:n], &statsResp); err != nil { - a.logger.Error("Error unmarshalling stats", "err", err, "id", shortID(id)) - continue - } + buf := make([]byte, 4_096) + for { + n, err := statsReader.Body.Read(buf) + if err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + break + } + a.logger.Error("Error reading stats", "err", err, "id", shortID(containerID)) + break + } - a.ch <- func() { - ctr, ok := a.containers[id] - if !ok { - return + var statsResp container.StatsResponse + if err = json.Unmarshal(buf[:n], &statsResp); err != nil { + a.logger.Error("Error unmarshalling stats", "err", err, "id", shortID(containerID)) + break } // https://stackoverflow.com/a/30292327/62871 cpuDelta := float64(statsResp.CPUStats.CPUUsage.TotalUsage - statsResp.PreCPUStats.CPUUsage.TotalUsage) systemDelta := float64(statsResp.CPUStats.SystemUsage - statsResp.PreCPUStats.SystemUsage) - ctr.CPUPercent = (cpuDelta / systemDelta) * float64(statsResp.CPUStats.OnlineCPUs) * 100 - ctr.MemoryUsageBytes = statsResp.MemoryStats.Usage + ch <- stats{ + cpuPercent: (cpuDelta / systemDelta) * float64(statsResp.CPUStats.OnlineCPUs) * 100, + memoryUsageBytes: statsResp.MemoryStats.Usage, + } } - } + }() + + return ch } -func (a *Actor) dockerEventLoop() { - for { - ch, errCh := a.apiClient.Events(a.ctx, events.ListOptions{ +// getEvents returns a channel that will receive container events. The channel is +// never closed, but the spawned goroutine will exit when the context is +// cancelled. +func (a *Client) getEvents(containerID string) <-chan events.Message { + sendC := make(chan events.Message) + + getEvents := func() (bool, error) { + recvC, errC := a.apiClient.Events(a.ctx, events.ListOptions{ Filters: filters.NewArgs( - filters.Arg("label", "app=termstream"), - filters.Arg("label", "app-id="+a.id.String()), + filters.Arg("container", containerID), + filters.Arg("type", "container"), ), }) - select { - case <-a.ctx.Done(): - return - case evt := <-ch: - a.handleDockerEvent(evt) - case err := <-errCh: - if a.ctx.Err() != nil { - return - } + for { + select { + case <-a.ctx.Done(): + return false, a.ctx.Err() + case evt := <-recvC: + sendC <- evt + case err := <-errC: + if a.ctx.Err() != nil || errors.Is(err, io.EOF) { + return false, err + } - a.logger.Warn("Error receiving Docker events", "err", err) - time.Sleep(1 * time.Second) - } - } -} - -func (a *Actor) handleDockerEvent(evt events.Message) { - a.ch <- func() { - ctr, ok := a.containers[evt.ID] - if !ok { - return - } - - if strings.HasPrefix(string(evt.Action), "health_status") { - a.logger.Info("Event: health status changed", "type", evt.Type, "action", evt.Action, "id", shortID(evt.ID)) - - switch evt.Action { - case events.ActionHealthStatusRunning: - ctr.HealthState = "running" - case events.ActionHealthStatusHealthy: - ctr.HealthState = "healthy" - case events.ActionHealthStatusUnhealthy: - ctr.HealthState = "unhealthy" - default: - a.logger.Warn("Unknown health status", "action", evt.Action) - ctr.HealthState = "unknown" + return true, err } } } -} -// GetContainerState returns a copy of the current state of all containers. -func (a *Actor) GetContainerState() map[string]domain.Container { - resultChan := make(chan map[string]domain.Container) + go func() { + for { + shouldRetry, err := getEvents() + if !shouldRetry { + break + } - a.ch <- func() { - result := make(map[string]domain.Container, len(a.containers)) - for id, ctr := range a.containers { - result[id] = *ctr + a.logger.Warn("Error receiving Docker events", "err", err, "id", shortID(containerID)) + time.Sleep(2 * time.Second) } - resultChan <- result - } + }() - return <-resultChan + return sendC } // RunContainerParams are the parameters for running a container. @@ -207,7 +159,10 @@ type RunContainerParams struct { } // RunContainer runs a container with the given parameters. -func (a *Actor) RunContainer(ctx context.Context, params RunContainerParams) (string, <-chan struct{}, error) { +// +// The returned channel will receive the current state of the container, and +// will be closed after the container has stopped. +func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (string, <-chan domain.ContainerState, error) { pullReader, err := a.apiClient.ImagePull(ctx, params.ContainerConfig.Image, image.PullOptions{}) if err != nil { return "", nil, fmt.Errorf("image pull: %w", err) @@ -240,44 +195,65 @@ func (a *Actor) RunContainer(ctx context.Context, params RunContainerParams) (st } a.logger.Info("Started container", "id", shortID(createResp.ID)) - go a.handleStats(createResp.ID) - - ch := make(chan struct{}, 1) + containerStateC := make(chan domain.ContainerState, 1) a.wg.Add(1) go func() { defer a.wg.Done() + defer close(containerStateC) - respChan, errChan := a.apiClient.ContainerWait(ctx, createResp.ID, container.WaitConditionNotRunning) - select { - case resp := <-respChan: - a.logger.Info("Container entered non-running state", "exit_code", resp.StatusCode, "id", shortID(createResp.ID)) - a.ch <- func() { - delete(a.containers, createResp.ID) - } - case err = <-errChan: - // TODO: error handling? - if err != context.Canceled { - a.logger.Error("Error setting container wait", "err", err, "id", shortID(createResp.ID)) - } - } - - ch <- struct{}{} + a.runContainerLoop(ctx, createResp.ID, containerStateC) }() - // Update the containers map which must be done on the actor loop, but before - // we return from the method. - done := make(chan struct{}) - a.ch <- func() { - a.containers[createResp.ID] = &domain.Container{ID: createResp.ID, HealthState: "healthy"} - done <- struct{}{} - } - <-done + return createResp.ID, containerStateC, nil +} - return createResp.ID, ch, nil +// runContainerLoop is the control loop for a single container. It returns only +// when the container exits. +func (a *Client) runContainerLoop(ctx context.Context, containerID string, stateCh chan<- domain.ContainerState) { + statsC := a.getStats(containerID) + eventsC := a.getEvents(containerID) + respC, errC := a.apiClient.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) + + state := &domain.ContainerState{ID: containerID} + sendState := func() { stateCh <- *state } + sendState() + + for { + select { + case resp := <-respC: + a.logger.Info("Container entered non-running state", "exit_code", resp.StatusCode, "id", shortID(containerID)) + return + case err := <-errC: + // TODO: error handling? + if err != context.Canceled { + a.logger.Error("Error setting container wait", "err", err, "id", shortID(containerID)) + } + return + case evt := <-eventsC: + if strings.Contains(string(evt.Action), "health_status") { + switch evt.Action { + case events.ActionHealthStatusRunning: + state.HealthState = "running" + case events.ActionHealthStatusHealthy: + state.HealthState = "healthy" + case events.ActionHealthStatusUnhealthy: + state.HealthState = "unhealthy" + default: + a.logger.Warn("Unknown health status", "status", evt.Action) + state.HealthState = "unknown" + } + sendState() + } + case stats := <-statsC: + state.CPUPercent = stats.cpuPercent + state.MemoryUsageBytes = stats.memoryUsageBytes + sendState() + } + } } // Close closes the client, stopping and removing all running containers. -func (a *Actor) Close() error { +func (a *Client) Close() error { a.cancel() ctx, cancel := context.WithTimeout(context.Background(), stopTimeout) @@ -296,12 +272,10 @@ func (a *Actor) Close() error { a.wg.Wait() - close(a.ch) - return a.apiClient.Close() } -func (a *Actor) removeContainer(ctx context.Context, id string) error { +func (a *Client) removeContainer(ctx context.Context, id string) error { a.logger.Info("Stopping container", "id", shortID(id)) stopTimeout := int(stopTimeout.Seconds()) if err := a.apiClient.ContainerStop(ctx, id, container.StopOptions{Timeout: &stopTimeout}); err != nil { @@ -317,7 +291,7 @@ func (a *Actor) removeContainer(ctx context.Context, id string) error { } // ContainerRunning checks if a container with the given labels is running. -func (a *Actor) ContainerRunning(ctx context.Context, labels map[string]string) (bool, error) { +func (a *Client) ContainerRunning(ctx context.Context, labels map[string]string) (bool, error) { containers, err := a.containersMatchingLabels(ctx, labels) if err != nil { return false, fmt.Errorf("container list: %w", err) @@ -333,7 +307,7 @@ func (a *Actor) ContainerRunning(ctx context.Context, labels map[string]string) } // RemoveContainers removes all containers with the given labels. -func (a *Actor) RemoveContainers(ctx context.Context, labels map[string]string) error { +func (a *Client) RemoveContainers(ctx context.Context, labels map[string]string) error { containers, err := a.containersMatchingLabels(ctx, labels) if err != nil { return fmt.Errorf("container list: %w", err) @@ -348,7 +322,7 @@ func (a *Actor) RemoveContainers(ctx context.Context, labels map[string]string) return nil } -func (a *Actor) containersMatchingLabels(ctx context.Context, labels map[string]string) ([]types.Container, error) { +func (a *Client) containersMatchingLabels(ctx context.Context, labels map[string]string) ([]types.Container, error) { filterArgs := filters.NewArgs( filters.Arg("label", "app=termstream"), filters.Arg("label", "app-id="+a.id.String()), diff --git a/container/container_test.go b/container/container_test.go index 735bc5e..2abbb31 100644 --- a/container/container_test.go +++ b/container/container_test.go @@ -21,17 +21,14 @@ func TestClientStartStop(t *testing.T) { containerName := "termstream-test-" + uuid.NewString() component := "test-start-stop" - client, err := container.NewActor(ctx, container.NewActorParams{ - ChanSize: 1, - Logger: logger, - }) + client, err := container.NewClient(ctx, logger) require.NoError(t, err) running, err := client.ContainerRunning(ctx, map[string]string{"component": component}) require.NoError(t, err) assert.False(t, running) - containerID, _, err := client.RunContainer(ctx, container.RunContainerParams{ + containerID, containerStateC, err := client.RunContainer(ctx, container.RunContainerParams{ Name: containerName, ContainerConfig: &typescontainer.Config{ Image: "netfluxio/mediamtx-alpine:latest", @@ -42,6 +39,8 @@ func TestClientStartStop(t *testing.T) { }, }) require.NoError(t, err) + testhelpers.DiscardChannel(containerStateC) + assert.NotEmpty(t, containerID) require.Eventually( @@ -55,17 +54,6 @@ func TestClientStartStop(t *testing.T) { "container not in RUNNING state", ) - require.Eventually( - t, - func() bool { - ctr, ok := client.GetContainerState()[containerID] - return ok && ctr.HealthState == "healthy" && ctr.CPUPercent > 0 && ctr.MemoryUsageBytes > 0 - }, - 2*time.Second, - 100*time.Millisecond, - "container state not updated", - ) - client.Close() running, err = client.ContainerRunning(ctx, map[string]string{"component": component}) @@ -80,14 +68,11 @@ func TestClientRemoveContainers(t *testing.T) { logger := testhelpers.NewTestLogger() component := "test-remove-containers" - client, err := container.NewActor(ctx, container.NewActorParams{ - ChanSize: 1, - Logger: logger, - }) + client, err := container.NewClient(ctx, logger) require.NoError(t, err) t.Cleanup(func() { client.Close() }) - _, _, err = client.RunContainer(ctx, container.RunContainerParams{ + _, stateC, err := client.RunContainer(ctx, container.RunContainerParams{ ContainerConfig: &typescontainer.Config{ Image: "netfluxio/mediamtx-alpine:latest", Labels: map[string]string{"component": component, "group": "test1"}, @@ -95,8 +80,9 @@ func TestClientRemoveContainers(t *testing.T) { HostConfig: &typescontainer.HostConfig{NetworkMode: "default"}, }) require.NoError(t, err) + testhelpers.DiscardChannel(stateC) - _, _, err = client.RunContainer(ctx, container.RunContainerParams{ + _, stateC, err = client.RunContainer(ctx, container.RunContainerParams{ ContainerConfig: &typescontainer.Config{ Image: "netfluxio/mediamtx-alpine:latest", Labels: map[string]string{"component": component, "group": "test1"}, @@ -104,8 +90,9 @@ func TestClientRemoveContainers(t *testing.T) { HostConfig: &typescontainer.HostConfig{NetworkMode: "default"}, }) require.NoError(t, err) + testhelpers.DiscardChannel(stateC) - _, _, err = client.RunContainer(ctx, container.RunContainerParams{ + _, stateC, err = client.RunContainer(ctx, container.RunContainerParams{ ContainerConfig: &typescontainer.Config{ Image: "netfluxio/mediamtx-alpine:latest", Labels: map[string]string{"component": component, "group": "test2"}, @@ -113,7 +100,7 @@ func TestClientRemoveContainers(t *testing.T) { HostConfig: &typescontainer.HostConfig{NetworkMode: "default"}, }) require.NoError(t, err) - require.Equal(t, 3, len(client.GetContainerState())) + testhelpers.DiscardChannel(stateC) // check all containers in group 1 are running require.Eventually( @@ -154,7 +141,6 @@ func TestClientRemoveContainers(t *testing.T) { 100*time.Millisecond, "container group 1 still in RUNNING state", ) - require.Equal(t, 1, len(client.GetContainerState())) // check group 2 is still running running, err := client.ContainerRunning(ctx, map[string]string{"group": "test2"}) diff --git a/domain/types.go b/domain/types.go index 1659962..0a9da6d 100644 --- a/domain/types.go +++ b/domain/types.go @@ -4,26 +4,26 @@ package domain type AppState struct { Source Source Destinations []Destination - Containers map[string]Container } // Source represents the source, currently always the mediaserver. type Source struct { - ContainerID string - Live bool - URL string + ContainerState ContainerState + Live bool + URL string } // Destination is a single destination. type Destination struct { - URL string + ContainerState ContainerState + URL string } -// Container represents the current state of an individual container. +// ContainerState represents the current state of an individual container. // // The source of truth is always the Docker daemon, this struct is used only // for passing asynchronous state. -type Container struct { +type ContainerState struct { ID string HealthState string CPUPercent float64 diff --git a/main.go b/main.go index 30eef9c..1cb7138 100644 --- a/main.go +++ b/main.go @@ -42,9 +42,7 @@ func run(ctx context.Context, cfgReader io.Reader) error { logger := slog.New(slog.NewTextHandler(logFile, nil)) logger.Info("Starting termstream", slog.Any("initial_state", state)) - containerClient, err := container.NewActor(ctx, container.NewActorParams{ - Logger: logger.With("component", "container_client"), - }) + containerClient, err := container.NewClient(ctx, logger.With("component", "container_client")) if err != nil { return fmt.Errorf("new container client: %w", err) } @@ -80,16 +78,11 @@ func run(ctx context.Context, cfgReader io.Reader) error { } logger.Info("Command received", "cmd", cmd) case <-uiTicker.C: - applyContainerState(containerClient, state) + // TODO: update UI with current state? + updateUI() + case serverState := <-srv.C(): + applyServerState(serverState, state) updateUI() - case serverState, ok := <-srv.C(): - if ok { - applyServerState(serverState, state) - updateUI() - } else { - logger.Info("Source state channel closed, shutting down...") - return nil - } } } } @@ -106,8 +99,3 @@ func applyConfig(cfg config.Config, appState *domain.AppState) { appState.Destinations = append(appState.Destinations, domain.Destination{URL: dest.URL}) } } - -// applyContainerState applies the current container state to the app state. -func applyContainerState(containerClient *container.Actor, appState *domain.AppState) { - appState.Containers = containerClient.GetContainerState() -} diff --git a/mediaserver/actor.go b/mediaserver/actor.go index 3b5ea99..ebfe6ed 100644 --- a/mediaserver/actor.go +++ b/mediaserver/actor.go @@ -26,18 +26,20 @@ type action func() // Actor is responsible for managing the media server. type Actor struct { - ch chan action - state *domain.Source - stateChan chan domain.Source - containerClient *container.Actor + actorC chan action + stateC chan domain.Source + containerClient *container.Client logger *slog.Logger httpClient *http.Client + + // mutable state + state *domain.Source } // StartActorParams contains the parameters for starting a new media server // actor. type StartActorParams struct { - ContainerClient *container.Actor + ContainerClient *container.Client ChanSize int Logger *slog.Logger } @@ -53,22 +55,22 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) { chanSize := cmp.Or(params.ChanSize, defaultChanSize) actor := &Actor{ - ch: make(chan action, chanSize), + actorC: make(chan action, chanSize), state: new(domain.Source), - stateChan: make(chan domain.Source, chanSize), + stateC: make(chan domain.Source, chanSize), containerClient: params.ContainerClient, logger: params.Logger, httpClient: &http.Client{Timeout: httpClientTimeout}, } - containerID, containerDone, err := params.ContainerClient.RunContainer( + containerID, containerStateC, err := params.ContainerClient.RunContainer( ctx, container.RunContainerParams{ Name: "server", ContainerConfig: &typescontainer.Config{ Image: imageNameMediaMTX, Env: []string{ - "MTX_LOGLEVEL=debug", + "MTX_LOGLEVEL=info", "MTX_API=yes", }, Labels: map[string]string{ @@ -91,23 +93,23 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) { return nil, fmt.Errorf("run container: %w", err) } - actor.state.ContainerID = containerID + actor.state.ContainerState.ID = containerID actor.state.URL = "rtmp://localhost:1935/" + rtmpPath - go actor.actorLoop(containerDone) + go actor.actorLoop(containerStateC) return actor, nil } // C returns a channel that will receive the current state of the media server. func (s *Actor) C() <-chan domain.Source { - return s.stateChan + return s.stateC } // State returns the current state of the media server. func (s *Actor) State() domain.Source { resultChan := make(chan domain.Source) - s.ch <- func() { + s.actorC <- func() { resultChan <- *s.state } return <-resultChan @@ -119,32 +121,37 @@ func (s *Actor) Close() error { return fmt.Errorf("remove containers: %w", err) } + close(s.actorC) + return nil } -func (s *Actor) actorLoop(containerDone <-chan struct{}) { - defer close(s.ch) - +// actorLoop is the main loop of the media server actor. It only exits when the +// actor is closed. +func (s *Actor) actorLoop(containerStateC <-chan domain.ContainerState) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - var closing bool - sendState := func() { - if !closing { - s.stateChan <- *s.state - } - } + sendState := func() { s.stateC <- *s.state } for { select { - case <-containerDone: - ticker.Stop() + case containerState, ok := <-containerStateC: + if !ok { + ticker.Stop() - s.state.Live = false + if s.state.Live { + s.state.Live = false + sendState() + } + + continue + } + + s.state.ContainerState = containerState sendState() - closing = true - close(s.stateChan) + continue case <-ticker.C: ingressLive, err := s.fetchIngressStateFromServer() if err != nil { @@ -155,7 +162,7 @@ func (s *Actor) actorLoop(containerDone <-chan struct{}) { s.state.Live = ingressLive sendState() } - case action, ok := <-s.ch: + case action, ok := <-s.actorC: if !ok { return } diff --git a/mediaserver/actor_test.go b/mediaserver/actor_test.go index 9ca16fb..990a9c0 100644 --- a/mediaserver/actor_test.go +++ b/mediaserver/actor_test.go @@ -21,7 +21,7 @@ func TestMediaServerStartStop(t *testing.T) { t.Cleanup(cancel) logger := testhelpers.NewTestLogger() - containerClient, err := container.NewActor(ctx, container.NewActorParams{ChanSize: 1, Logger: logger}) + containerClient, err := container.NewClient(ctx, logger) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, containerClient.Close()) }) @@ -35,6 +35,7 @@ func TestMediaServerStartStop(t *testing.T) { Logger: logger, }) require.NoError(t, err) + testhelpers.DiscardChannel(mediaServer.C()) require.Eventually( t, @@ -54,10 +55,13 @@ func TestMediaServerStartStop(t *testing.T) { launchFFMPEG(t, "rtmp://localhost:1935/live") require.Eventually( t, - func() bool { return mediaServer.State().Live }, + func() bool { + currState := mediaServer.State() + return currState.Live && currState.ContainerState.HealthState == "healthy" + }, 5*time.Second, 250*time.Millisecond, - "actor not in LIVE state", + "actor not healthy and/or in LIVE state", ) mediaServer.Close() diff --git a/terminal/actor.go b/terminal/actor.go index 92306ff..9d70400 100644 --- a/terminal/actor.go +++ b/terminal/actor.go @@ -135,23 +135,25 @@ func (a *Actor) redrawFromState(state domain.AppState) { setHeaderRow := func(tableView *tview.Table) { tableView.SetCell(0, 0, tview.NewTableCell("[grey]URL").SetAlign(tview.AlignLeft).SetExpansion(7).SetSelectable(false)) tableView.SetCell(0, 1, tview.NewTableCell("[grey]Status").SetAlign(tview.AlignLeft).SetExpansion(1).SetSelectable(false)) - tableView.SetCell(0, 2, tview.NewTableCell("[grey]CPU %").SetAlign(tview.AlignLeft).SetExpansion(1).SetSelectable(false)) - tableView.SetCell(0, 3, tview.NewTableCell("[grey]Mem used (MB)").SetAlign(tview.AlignLeft).SetExpansion(1).SetSelectable(false)) - tableView.SetCell(0, 4, tview.NewTableCell("[grey]Actions").SetAlign(tview.AlignLeft).SetExpansion(2).SetSelectable(false)) + tableView.SetCell(0, 2, tview.NewTableCell("[grey]Health").SetAlign(tview.AlignLeft).SetExpansion(1).SetSelectable(false)) + tableView.SetCell(0, 3, tview.NewTableCell("[grey]CPU %").SetAlign(tview.AlignLeft).SetExpansion(1).SetSelectable(false)) + tableView.SetCell(0, 4, tview.NewTableCell("[grey]Mem used (MB)").SetAlign(tview.AlignLeft).SetExpansion(1).SetSelectable(false)) + tableView.SetCell(0, 5, tview.NewTableCell("[grey]Actions").SetAlign(tview.AlignLeft).SetExpansion(2).SetSelectable(false)) } a.sourceView.Clear() setHeaderRow(a.sourceView) - sourceContainer := state.Containers[state.Source.ContainerID] a.sourceView.SetCell(1, 0, tview.NewTableCell(state.Source.URL)) + if state.Source.Live { a.sourceView.SetCell(1, 1, tview.NewTableCell("[green]on-air")) } else { a.sourceView.SetCell(1, 1, tview.NewTableCell("[yellow]off-air")) } - a.sourceView.SetCell(1, 2, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", sourceContainer.CPUPercent))) - a.sourceView.SetCell(1, 3, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", float64(sourceContainer.MemoryUsageBytes)/1024/1024))) - a.sourceView.SetCell(1, 4, tview.NewTableCell("")) + a.sourceView.SetCell(1, 2, tview.NewTableCell("[white]"+cmp.Or(state.Source.ContainerState.HealthState, "starting"))) + a.sourceView.SetCell(1, 3, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", state.Source.ContainerState.CPUPercent))) + a.sourceView.SetCell(1, 4, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", float64(state.Source.ContainerState.MemoryUsageBytes)/1024/1024))) + a.sourceView.SetCell(1, 5, tview.NewTableCell("")) a.destView.Clear() setHeaderRow(a.destView) @@ -161,7 +163,8 @@ func (a *Actor) redrawFromState(state domain.AppState) { a.destView.SetCell(i+1, 1, tview.NewTableCell("[yellow]off-air")) a.destView.SetCell(i+1, 2, tview.NewTableCell("[white]-")) a.destView.SetCell(i+1, 3, tview.NewTableCell("[white]-")) - a.destView.SetCell(i+1, 4, tview.NewTableCell("[green]Tab to go live")) + a.destView.SetCell(i+1, 4, tview.NewTableCell("[white]-")) + a.destView.SetCell(i+1, 5, tview.NewTableCell("[green]Tab to go live")) } a.app.Draw() diff --git a/testhelpers/logging.go b/testhelpers/logging.go index c8b5c70..0012c8d 100644 --- a/testhelpers/logging.go +++ b/testhelpers/logging.go @@ -16,3 +16,12 @@ func NewNopLogger() *slog.Logger { func NewTestLogger() *slog.Logger { return slog.New(slog.NewTextHandler(os.Stderr, nil)) } + +// NoopChannel consumes a channel and discards all values. +func DiscardChannel[T any](ch <-chan T) { + go func() { + for range ch { + // no-op + } + }() +}