diff --git a/build/mediamtx.Dockerfile b/build/mediamtx.Dockerfile new file mode 100644 index 0000000..0fdd0e4 --- /dev/null +++ b/build/mediamtx.Dockerfile @@ -0,0 +1,12 @@ +FROM bluenviron/mediamtx:latest AS mediamtx + +FROM alpine:3.21 + +RUN apk add --no-cache \ + bash \ + curl + +COPY --from=mediamtx /mediamtx /usr/bin/mediamtx +COPY --from=mediamtx /mediamtx.yml /mediamtx.yml + +CMD ["/usr/bin/mediamtx"] diff --git a/container/client.go b/container/client.go deleted file mode 100644 index 6a62f83..0000000 --- a/container/client.go +++ /dev/null @@ -1,197 +0,0 @@ -package container - -import ( - "context" - "fmt" - "io" - "log/slog" - "sync" - "time" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/filters" - "github.com/docker/docker/api/types/image" - "github.com/docker/docker/client" - "github.com/google/uuid" -) - -var containerStopTimeout = 10 * time.Second - -// Client is a thin wrapper around the Docker client. -type Client struct { - id uuid.UUID - wg sync.WaitGroup // TODO: is it needed? - apiClient *client.Client - logger *slog.Logger -} - -// NewClient creates a new Client. -func NewClient(logger *slog.Logger) (*Client, error) { - apiClient, err := client.NewClientWithOpts(client.FromEnv) - if err != nil { - return nil, err - } - - return &Client{ - id: uuid.New(), - apiClient: apiClient, - logger: logger, - }, nil -} - -// RunContainerParams are the parameters for running a container. -type RunContainerParams struct { - Name string - ContainerConfig *container.Config - HostConfig *container.HostConfig -} - -// RunContainer runs a container with the given parameters. -func (r *Client) RunContainer(ctx context.Context, params RunContainerParams) (string, <-chan struct{}, error) { - pullReader, err := r.apiClient.ImagePull(ctx, params.ContainerConfig.Image, image.PullOptions{}) - if err != nil { - return "", nil, fmt.Errorf("image pull: %w", err) - } - _, _ = io.Copy(io.Discard, pullReader) - _ = pullReader.Close() - - params.ContainerConfig.Labels["app"] = "termstream" - params.ContainerConfig.Labels["app-id"] = r.id.String() - - var name string - if params.Name != "" { - name = "termstream-" + r.id.String() + "-" + params.Name - } - - createResp, err := r.apiClient.ContainerCreate( - ctx, - params.ContainerConfig, - params.HostConfig, - nil, - nil, - name, - ) - if err != nil { - return "", nil, fmt.Errorf("container create: %w", err) - } - - ch := make(chan struct{}, 1) - r.wg.Add(1) - go func() { - defer r.wg.Done() - - respChan, errChan := r.apiClient.ContainerWait(ctx, createResp.ID, container.WaitConditionNotRunning) - select { - case resp := <-respChan: - r.logger.Info("Container entered non-running state", "status", resp.StatusCode, "id", shortID(createResp.ID)) - case err = <-errChan: - if err != context.Canceled { - r.logger.Error("Error setting container wait", "err", err, "id", shortID(createResp.ID)) - } - } - - ch <- struct{}{} - }() - - if err = r.apiClient.ContainerStart(ctx, createResp.ID, container.StartOptions{}); err != nil { - return "", nil, fmt.Errorf("container start: %w", err) - } - r.logger.Info("Started container", "id", shortID(createResp.ID)) - - ctr, err := r.apiClient.ContainerInspect(ctx, createResp.ID) - if err != nil { - return "", nil, fmt.Errorf("container inspect: %w", err) - } - - return ctr.ID, ch, nil -} - -// Close closes the client, stopping and removing all running containers. -func (r *Client) Close() error { - ctx, cancel := context.WithTimeout(context.Background(), containerStopTimeout) - defer cancel() - - containerList, err := r.containersMatchingLabels(ctx, nil) - if err != nil { - return fmt.Errorf("container list: %w", err) - } - - for _, container := range containerList { - if err := r.removeContainer(ctx, container.ID); err != nil { - r.logger.Error("Error removing container:", "err", err, "id", shortID(container.ID)) - } - } - - r.wg.Wait() - - return r.apiClient.Close() -} - -func (r *Client) removeContainer(ctx context.Context, id string) error { - r.logger.Info("Stopping container", "id", shortID(id)) - stopTimeout := int(containerStopTimeout.Seconds()) - if err := r.apiClient.ContainerStop(ctx, id, container.StopOptions{Timeout: &stopTimeout}); err != nil { - return fmt.Errorf("container stop: %w", err) - } - - r.logger.Info("Removing container", "id", shortID(id)) - if err := r.apiClient.ContainerRemove(ctx, id, container.RemoveOptions{Force: true}); err != nil { - return fmt.Errorf("container remove: %w", err) - } - - return nil -} - -// ContainerRunning checks if a container with the given labels is running. -func (r *Client) ContainerRunning(ctx context.Context, labels map[string]string) (bool, error) { - containers, err := r.containersMatchingLabels(ctx, labels) - if err != nil { - return false, fmt.Errorf("container list: %w", err) - } - - for _, container := range containers { - if container.State == "running" { - return true, nil - } - } - - return false, nil -} - -// RemoveContainers removes all containers with the given labels. -func (r *Client) RemoveContainers(ctx context.Context, labels map[string]string) error { - containers, err := r.containersMatchingLabels(ctx, labels) - if err != nil { - return fmt.Errorf("container list: %w", err) - } - - for _, container := range containers { - if err := r.removeContainer(ctx, container.ID); err != nil { - r.logger.Error("Error removing container:", "err", err, "id", shortID(container.ID)) - } - } - - return nil -} - -func (r *Client) containersMatchingLabels(ctx context.Context, labels map[string]string) ([]types.Container, error) { - filterArgs := filters.NewArgs( - filters.Arg("label", "app=termstream"), - filters.Arg("label", "app-id="+r.id.String()), - ) - for k, v := range labels { - filterArgs.Add("label", k+"="+v) - } - return r.apiClient.ContainerList(ctx, container.ListOptions{ - All: true, - Filters: filterArgs, - }) -} - -func shortID(id string) string { - if len(id) < 12 { - return id - } - return id[:12] -} diff --git a/container/client_test.go b/container/client_test.go deleted file mode 100644 index 29d0419..0000000 --- a/container/client_test.go +++ /dev/null @@ -1,65 +0,0 @@ -package container_test - -import ( - "context" - "testing" - "time" - - "git.netflux.io/rob/termstream/container" - "git.netflux.io/rob/termstream/testhelpers" - typescontainer "github.com/docker/docker/api/types/container" - "github.com/docker/docker/client" - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestClientStartStop(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - apiClient, err := client.NewClientWithOpts(client.FromEnv) - require.NoError(t, err) - defer apiClient.Close() - - logger := testhelpers.NewTestLogger() - containerName := "termstream-test-" + uuid.NewString() - component := "test-start-stop" - - client, err := container.NewClient(logger) - require.NoError(t, err) - - running, err := client.ContainerRunning(ctx, map[string]string{"component": component}) - require.NoError(t, err) - assert.False(t, running) - - containerID, _, err := client.RunContainer(ctx, container.RunContainerParams{ - Name: containerName, - ContainerConfig: &typescontainer.Config{ - Image: "bluenviron/mediamtx", - Labels: map[string]string{"component": component}, - }, - HostConfig: &typescontainer.HostConfig{ - NetworkMode: "default", - }, - }) - require.NoError(t, err) - assert.NotEmpty(t, containerID) - - require.Eventually( - t, - func() bool { - running, err = client.ContainerRunning(ctx, map[string]string{"component": component}) - return err == nil && running - }, - 2*time.Second, - 100*time.Millisecond, - "container not in RUNNING state", - ) - - client.Close() - - running, err = client.ContainerRunning(ctx, map[string]string{"component": component}) - require.NoError(t, err) - assert.False(t, running) -} diff --git a/container/container.go b/container/container.go new file mode 100644 index 0000000..f9468fc --- /dev/null +++ b/container/container.go @@ -0,0 +1,370 @@ +package container + +import ( + "cmp" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "strings" + "sync" + "time" + + "git.netflux.io/rob/termstream/domain" + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/events" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/client" + "github.com/google/uuid" +) + +var ( + stopTimeout = 10 * time.Second + defaultChanSize = 64 +) + +type action func() + +// Actor is an actor that provides a thin wrapper around the Docker API client, +// and provides additional functionality such as exposing container stats. +type Actor struct { + id uuid.UUID + ctx context.Context + cancel context.CancelFunc + ch chan action + wg sync.WaitGroup + apiClient *client.Client + logger *slog.Logger + + // mutable state + containers map[string]*domain.Container +} + +// NewActorParams are the parameters for creating a new Actor. +type NewActorParams struct { + ChanSize int + Logger *slog.Logger +} + +// NewActor creates a new Actor. +func NewActor(ctx context.Context, params NewActorParams) (*Actor, error) { + apiClient, err := client.NewClientWithOpts(client.FromEnv) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + + client := &Actor{ + id: uuid.New(), + ctx: ctx, + cancel: cancel, + ch: make(chan action, cmp.Or(params.ChanSize, defaultChanSize)), + apiClient: apiClient, + logger: params.Logger, + containers: make(map[string]*domain.Container), + } + + client.wg.Add(1) + go func() { + defer client.wg.Done() + + client.dockerEventLoop() + }() + + go client.actorLoop() + + return client, nil +} + +// actorLoop is the main loop for the client. +// +// It continues to run until the internal channel is closed, which only happens +// when [Close] is called. This means that it is not reliant on any context +// remaining open, and any method calls made after [Close] may deadlock or +// fail. +func (a *Actor) actorLoop() { + for action := range a.ch { + action() + } +} + +func (a *Actor) handleStats(id string) { + statsReader, err := a.apiClient.ContainerStats(a.ctx, id, true) + if err != nil { + // TODO: error handling? + a.logger.Error("Error getting container stats", "err", err, "id", shortID(id)) + return + } + defer statsReader.Body.Close() + + buf := make([]byte, 4_096) + for { + n, err := statsReader.Body.Read(buf) + if err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + return + } + a.logger.Error("Error reading stats", "err", err, "id", shortID(id)) + return + } + + var statsResp container.StatsResponse + if err = json.Unmarshal(buf[:n], &statsResp); err != nil { + a.logger.Error("Error unmarshalling stats", "err", err, "id", shortID(id)) + continue + } + + a.ch <- func() { + ctr, ok := a.containers[id] + if !ok { + return + } + + // https://stackoverflow.com/a/30292327/62871 + cpuDelta := float64(statsResp.CPUStats.CPUUsage.TotalUsage - statsResp.PreCPUStats.CPUUsage.TotalUsage) + systemDelta := float64(statsResp.CPUStats.SystemUsage - statsResp.PreCPUStats.SystemUsage) + ctr.CPUPercent = (cpuDelta / systemDelta) * float64(statsResp.CPUStats.OnlineCPUs) * 100 + ctr.MemoryUsageBytes = statsResp.MemoryStats.Usage + } + } +} + +func (a *Actor) dockerEventLoop() { + for { + ch, errCh := a.apiClient.Events(a.ctx, events.ListOptions{ + Filters: filters.NewArgs( + filters.Arg("label", "app=termstream"), + filters.Arg("label", "app-id="+a.id.String()), + ), + }) + + select { + case <-a.ctx.Done(): + return + case evt := <-ch: + a.handleDockerEvent(evt) + case err := <-errCh: + if a.ctx.Err() != nil { + return + } + + a.logger.Warn("Error receiving Docker events", "err", err) + time.Sleep(1 * time.Second) + } + } +} + +func (a *Actor) handleDockerEvent(evt events.Message) { + a.ch <- func() { + ctr, ok := a.containers[evt.ID] + if !ok { + return + } + + if strings.HasPrefix(string(evt.Action), "health_status") { + a.logger.Info("Event: health status changed", "type", evt.Type, "action", evt.Action, "id", shortID(evt.ID)) + + switch evt.Action { + case events.ActionHealthStatusRunning: + ctr.HealthState = "running" + case events.ActionHealthStatusHealthy: + ctr.HealthState = "healthy" + case events.ActionHealthStatusUnhealthy: + ctr.HealthState = "unhealthy" + default: + a.logger.Warn("Unknown health status", "action", evt.Action) + ctr.HealthState = "unknown" + } + } + } +} + +// GetContainerState returns a copy of the current state of all containers. +func (a *Actor) GetContainerState() map[string]domain.Container { + resultChan := make(chan map[string]domain.Container) + + a.ch <- func() { + result := make(map[string]domain.Container, len(a.containers)) + for id, ctr := range a.containers { + result[id] = *ctr + } + resultChan <- result + } + + return <-resultChan +} + +// RunContainerParams are the parameters for running a container. +type RunContainerParams struct { + Name string + ContainerConfig *container.Config + HostConfig *container.HostConfig +} + +// RunContainer runs a container with the given parameters. +func (a *Actor) RunContainer(ctx context.Context, params RunContainerParams) (string, <-chan struct{}, error) { + pullReader, err := a.apiClient.ImagePull(ctx, params.ContainerConfig.Image, image.PullOptions{}) + if err != nil { + return "", nil, fmt.Errorf("image pull: %w", err) + } + _, _ = io.Copy(io.Discard, pullReader) + _ = pullReader.Close() + + params.ContainerConfig.Labels["app"] = "termstream" + params.ContainerConfig.Labels["app-id"] = a.id.String() + + var name string + if params.Name != "" { + name = "termstream-" + a.id.String() + "-" + params.Name + } + + createResp, err := a.apiClient.ContainerCreate( + ctx, + params.ContainerConfig, + params.HostConfig, + nil, + nil, + name, + ) + if err != nil { + return "", nil, fmt.Errorf("container create: %w", err) + } + + if err = a.apiClient.ContainerStart(ctx, createResp.ID, container.StartOptions{}); err != nil { + return "", nil, fmt.Errorf("container start: %w", err) + } + a.logger.Info("Started container", "id", shortID(createResp.ID)) + + go a.handleStats(createResp.ID) + + ch := make(chan struct{}, 1) + a.wg.Add(1) + go func() { + defer a.wg.Done() + + respChan, errChan := a.apiClient.ContainerWait(ctx, createResp.ID, container.WaitConditionNotRunning) + select { + case resp := <-respChan: + a.logger.Info("Container entered non-running state", "exit_code", resp.StatusCode, "id", shortID(createResp.ID)) + a.ch <- func() { + delete(a.containers, createResp.ID) + } + case err = <-errChan: + // TODO: error handling? + if err != context.Canceled { + a.logger.Error("Error setting container wait", "err", err, "id", shortID(createResp.ID)) + } + } + + ch <- struct{}{} + }() + + // Update the containers map which must be done on the actor loop, but before + // we return from the method. + done := make(chan struct{}) + a.ch <- func() { + a.containers[createResp.ID] = &domain.Container{ID: createResp.ID, HealthState: "healthy"} + done <- struct{}{} + } + <-done + + return createResp.ID, ch, nil +} + +// Close closes the client, stopping and removing all running containers. +func (a *Actor) Close() error { + a.cancel() + + ctx, cancel := context.WithTimeout(context.Background(), stopTimeout) + defer cancel() + + containerList, err := a.containersMatchingLabels(ctx, nil) + if err != nil { + return fmt.Errorf("container list: %w", err) + } + + for _, container := range containerList { + if err := a.removeContainer(ctx, container.ID); err != nil { + a.logger.Error("Error removing container:", "err", err, "id", shortID(container.ID)) + } + } + + a.wg.Wait() + + close(a.ch) + + return a.apiClient.Close() +} + +func (a *Actor) removeContainer(ctx context.Context, id string) error { + a.logger.Info("Stopping container", "id", shortID(id)) + stopTimeout := int(stopTimeout.Seconds()) + if err := a.apiClient.ContainerStop(ctx, id, container.StopOptions{Timeout: &stopTimeout}); err != nil { + return fmt.Errorf("container stop: %w", err) + } + + a.logger.Info("Removing container", "id", shortID(id)) + if err := a.apiClient.ContainerRemove(ctx, id, container.RemoveOptions{Force: true}); err != nil { + return fmt.Errorf("container remove: %w", err) + } + + return nil +} + +// ContainerRunning checks if a container with the given labels is running. +func (a *Actor) ContainerRunning(ctx context.Context, labels map[string]string) (bool, error) { + containers, err := a.containersMatchingLabels(ctx, labels) + if err != nil { + return false, fmt.Errorf("container list: %w", err) + } + + for _, container := range containers { + if container.State == "running" || container.State == "restarting" { + return true, nil + } + } + + return false, nil +} + +// RemoveContainers removes all containers with the given labels. +func (a *Actor) RemoveContainers(ctx context.Context, labels map[string]string) error { + containers, err := a.containersMatchingLabels(ctx, labels) + if err != nil { + return fmt.Errorf("container list: %w", err) + } + + for _, container := range containers { + if err := a.removeContainer(ctx, container.ID); err != nil { + a.logger.Error("Error removing container:", "err", err, "id", shortID(container.ID)) + } + } + + return nil +} + +func (a *Actor) containersMatchingLabels(ctx context.Context, labels map[string]string) ([]types.Container, error) { + filterArgs := filters.NewArgs( + filters.Arg("label", "app=termstream"), + filters.Arg("label", "app-id="+a.id.String()), + ) + for k, v := range labels { + filterArgs.Add("label", k+"="+v) + } + return a.apiClient.ContainerList(ctx, container.ListOptions{ + All: true, + Filters: filterArgs, + }) +} + +func shortID(id string) string { + if len(id) < 12 { + return id + } + return id[:12] +} diff --git a/container/container_test.go b/container/container_test.go new file mode 100644 index 0000000..735bc5e --- /dev/null +++ b/container/container_test.go @@ -0,0 +1,163 @@ +package container_test + +import ( + "context" + "testing" + "time" + + "git.netflux.io/rob/termstream/container" + "git.netflux.io/rob/termstream/testhelpers" + typescontainer "github.com/docker/docker/api/types/container" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestClientStartStop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + logger := testhelpers.NewTestLogger() + containerName := "termstream-test-" + uuid.NewString() + component := "test-start-stop" + + client, err := container.NewActor(ctx, container.NewActorParams{ + ChanSize: 1, + Logger: logger, + }) + require.NoError(t, err) + + running, err := client.ContainerRunning(ctx, map[string]string{"component": component}) + require.NoError(t, err) + assert.False(t, running) + + containerID, _, err := client.RunContainer(ctx, container.RunContainerParams{ + Name: containerName, + ContainerConfig: &typescontainer.Config{ + Image: "netfluxio/mediamtx-alpine:latest", + Labels: map[string]string{"component": component}, + }, + HostConfig: &typescontainer.HostConfig{ + NetworkMode: "default", + }, + }) + require.NoError(t, err) + assert.NotEmpty(t, containerID) + + require.Eventually( + t, + func() bool { + running, err = client.ContainerRunning(ctx, map[string]string{"component": component}) + return err == nil && running + }, + 2*time.Second, + 100*time.Millisecond, + "container not in RUNNING state", + ) + + require.Eventually( + t, + func() bool { + ctr, ok := client.GetContainerState()[containerID] + return ok && ctr.HealthState == "healthy" && ctr.CPUPercent > 0 && ctr.MemoryUsageBytes > 0 + }, + 2*time.Second, + 100*time.Millisecond, + "container state not updated", + ) + + client.Close() + + running, err = client.ContainerRunning(ctx, map[string]string{"component": component}) + require.NoError(t, err) + assert.False(t, running) +} + +func TestClientRemoveContainers(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + logger := testhelpers.NewTestLogger() + component := "test-remove-containers" + + client, err := container.NewActor(ctx, container.NewActorParams{ + ChanSize: 1, + Logger: logger, + }) + require.NoError(t, err) + t.Cleanup(func() { client.Close() }) + + _, _, err = client.RunContainer(ctx, container.RunContainerParams{ + ContainerConfig: &typescontainer.Config{ + Image: "netfluxio/mediamtx-alpine:latest", + Labels: map[string]string{"component": component, "group": "test1"}, + }, + HostConfig: &typescontainer.HostConfig{NetworkMode: "default"}, + }) + require.NoError(t, err) + + _, _, err = client.RunContainer(ctx, container.RunContainerParams{ + ContainerConfig: &typescontainer.Config{ + Image: "netfluxio/mediamtx-alpine:latest", + Labels: map[string]string{"component": component, "group": "test1"}, + }, + HostConfig: &typescontainer.HostConfig{NetworkMode: "default"}, + }) + require.NoError(t, err) + + _, _, err = client.RunContainer(ctx, container.RunContainerParams{ + ContainerConfig: &typescontainer.Config{ + Image: "netfluxio/mediamtx-alpine:latest", + Labels: map[string]string{"component": component, "group": "test2"}, + }, + HostConfig: &typescontainer.HostConfig{NetworkMode: "default"}, + }) + require.NoError(t, err) + require.Equal(t, 3, len(client.GetContainerState())) + + // check all containers in group 1 are running + require.Eventually( + t, + func() bool { + running, _ := client.ContainerRunning(ctx, map[string]string{"group": "test1"}) + return running + }, + 2*time.Second, + 100*time.Millisecond, + "container group 1 not in RUNNING state", + ) + // check all containers in group 2 are running + require.Eventually( + t, + func() bool { + running, _ := client.ContainerRunning(ctx, map[string]string{"group": "test2"}) + return running + }, + 2*time.Second, + 100*time.Millisecond, + "container group 2 not in RUNNING state", + ) + + // remove group 1 + err = client.RemoveContainers(ctx, map[string]string{"group": "test1"}) + require.NoError(t, err) + + // check group 1 is not running + require.Eventually( + t, + func() bool { + var running bool + running, err = client.ContainerRunning(ctx, map[string]string{"group": "test1"}) + return err == nil && !running + }, + 2*time.Second, + 100*time.Millisecond, + "container group 1 still in RUNNING state", + ) + require.Equal(t, 1, len(client.GetContainerState())) + + // check group 2 is still running + running, err := client.ContainerRunning(ctx, map[string]string{"group": "test2"}) + require.NoError(t, err) + require.True(t, running) +} diff --git a/domain/types.go b/domain/types.go index f06e485..1659962 100644 --- a/domain/types.go +++ b/domain/types.go @@ -2,13 +2,30 @@ package domain // AppState holds application state. type AppState struct { - ContainerRunning bool - IngressLive bool - IngressURL string - Destinations []Destination + Source Source + Destinations []Destination + Containers map[string]Container +} + +// Source represents the source, currently always the mediaserver. +type Source struct { + ContainerID string + Live bool + URL string } // Destination is a single destination. type Destination struct { URL string } + +// Container represents the current state of an individual container. +// +// The source of truth is always the Docker daemon, this struct is used only +// for passing asynchronous state. +type Container struct { + ID string + HealthState string + CPUPercent float64 + MemoryUsageBytes uint64 +} diff --git a/main.go b/main.go index 8757e85..30eef9c 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "io" "log/slog" "os" + "time" "git.netflux.io/rob/termstream/config" "git.netflux.io/rob/termstream/container" @@ -23,6 +24,8 @@ func main() { } } +const uiUpdateInterval = 2 * time.Second + func run(ctx context.Context, cfgReader io.Reader) error { cfg, err := config.Load(cfgReader) if err != nil { @@ -39,15 +42,17 @@ func run(ctx context.Context, cfgReader io.Reader) error { logger := slog.New(slog.NewTextHandler(logFile, nil)) logger.Info("Starting termstream", slog.Any("initial_state", state)) - containerClient, err := container.NewClient(logger.With("component", "container_client")) + containerClient, err := container.NewActor(ctx, container.NewActorParams{ + Logger: logger.With("component", "container_client"), + }) if err != nil { return fmt.Errorf("new container client: %w", err) } defer containerClient.Close() srv, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ - Client: containerClient, - Logger: logger.With("component", "mediaserver"), + ContainerClient: containerClient, + Logger: logger.With("component", "mediaserver"), }) if err != nil { return fmt.Errorf("start media server: %w", err) @@ -63,20 +68,26 @@ func run(ctx context.Context, cfgReader io.Reader) error { updateUI := func() { ui.SetState(*state) } updateUI() + uiTicker := time.NewTicker(uiUpdateInterval) + defer uiTicker.Stop() + for { select { case cmd, ok := <-ui.C(): - logger.Info("Command received", "cmd", cmd) if !ok { logger.Info("UI closed") return nil } + logger.Info("Command received", "cmd", cmd) + case <-uiTicker.C: + applyContainerState(containerClient, state) + updateUI() case serverState, ok := <-srv.C(): if ok { applyServerState(serverState, state) updateUI() } else { - logger.Info("State channel closed, shutting down...") + logger.Info("Source state channel closed, shutting down...") return nil } } @@ -84,10 +95,8 @@ func run(ctx context.Context, cfgReader io.Reader) error { } // applyServerState applies the current server state to the app state. -func applyServerState(serverState mediaserver.State, appState *domain.AppState) { - appState.ContainerRunning = serverState.ContainerRunning - appState.IngressLive = serverState.IngressLive - appState.IngressURL = serverState.IngressURL +func applyServerState(serverState domain.Source, appState *domain.AppState) { + appState.Source = serverState } // applyConfig applies the configuration to the app state. @@ -97,3 +106,8 @@ func applyConfig(cfg config.Config, appState *domain.AppState) { appState.Destinations = append(appState.Destinations, domain.Destination{URL: dest.URL}) } } + +// applyContainerState applies the current container state to the app state. +func applyContainerState(containerClient *container.Actor, appState *domain.AppState) { + appState.Containers = containerClient.GetContainerState() +} diff --git a/mediaserver/actor.go b/mediaserver/actor.go index ac55932..3b5ea99 100644 --- a/mediaserver/actor.go +++ b/mediaserver/actor.go @@ -13,30 +13,23 @@ import ( typescontainer "github.com/docker/docker/api/types/container" "git.netflux.io/rob/termstream/container" + "git.netflux.io/rob/termstream/domain" ) const ( - imageNameMediaMTX = "bluenviron/mediamtx" + imageNameMediaMTX = "netfluxio/mediamtx-alpine:latest" rtmpPath = "live" ) -// State contains the current state of the media server. -type State struct { - ContainerRunning bool - ContainerID string - IngressLive bool - IngressURL string -} - // action is an action to be performed by the actor. type action func() // Actor is responsible for managing the media server. type Actor struct { ch chan action - state *State - stateChan chan State - containerClient *container.Client + state *domain.Source + stateChan chan domain.Source + containerClient *container.Actor logger *slog.Logger httpClient *http.Client } @@ -44,7 +37,7 @@ type Actor struct { // StartActorParams contains the parameters for starting a new media server // actor. type StartActorParams struct { - ContainerClient *container.Client + ContainerClient *container.Actor ChanSize int Logger *slog.Logger } @@ -61,8 +54,8 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) { actor := &Actor{ ch: make(chan action, chanSize), - state: new(State), - stateChan: make(chan State, chanSize), + state: new(domain.Source), + stateChan: make(chan domain.Source, chanSize), containerClient: params.ContainerClient, logger: params.Logger, httpClient: &http.Client{Timeout: httpClientTimeout}, @@ -81,6 +74,13 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) { Labels: map[string]string{ "component": componentName, }, + Healthcheck: &typescontainer.HealthConfig{ + Test: []string{"CMD", "curl", "-f", "http://localhost:9997/v3/paths/list"}, + Interval: time.Second * 10, + StartPeriod: time.Second * 2, + StartInterval: time.Second * 2, + Retries: 2, + }, }, HostConfig: &typescontainer.HostConfig{ NetworkMode: "host", @@ -92,8 +92,7 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) { } actor.state.ContainerID = containerID - actor.state.ContainerRunning = true - actor.state.IngressURL = "rtmp://localhost:1935/" + rtmpPath + actor.state.URL = "rtmp://localhost:1935/" + rtmpPath go actor.actorLoop(containerDone) @@ -101,13 +100,13 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) { } // C returns a channel that will receive the current state of the media server. -func (s *Actor) C() <-chan State { +func (s *Actor) C() <-chan domain.Source { return s.stateChan } // State returns the current state of the media server. -func (s *Actor) State() State { - resultChan := make(chan State) +func (s *Actor) State() domain.Source { + resultChan := make(chan domain.Source) s.ch <- func() { resultChan <- *s.state } @@ -141,8 +140,7 @@ func (s *Actor) actorLoop(containerDone <-chan struct{}) { case <-containerDone: ticker.Stop() - s.state.ContainerRunning = false - s.state.IngressLive = false + s.state.Live = false sendState() closing = true @@ -153,8 +151,8 @@ func (s *Actor) actorLoop(containerDone <-chan struct{}) { s.logger.Error("Error fetching server state", "error", err) continue } - if ingressLive != s.state.IngressLive { - s.state.IngressLive = ingressLive + if ingressLive != s.state.Live { + s.state.Live = ingressLive sendState() } case action, ok := <-s.ch: diff --git a/mediaserver/actor_test.go b/mediaserver/actor_test.go index 7250612..9ca16fb 100644 --- a/mediaserver/actor_test.go +++ b/mediaserver/actor_test.go @@ -21,7 +21,7 @@ func TestMediaServerStartStop(t *testing.T) { t.Cleanup(cancel) logger := testhelpers.NewTestLogger() - containerClient, err := container.NewClient(logger) + containerClient, err := container.NewActor(ctx, container.NewActorParams{ChanSize: 1, Logger: logger}) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, containerClient.Close()) }) @@ -29,7 +29,7 @@ func TestMediaServerStartStop(t *testing.T) { require.NoError(t, err) assert.False(t, running) - actor, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ + mediaServer, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ ChanSize: 1, ContainerClient: containerClient, Logger: logger, @@ -47,20 +47,20 @@ func TestMediaServerStartStop(t *testing.T) { "container not in RUNNING state", ) - state := actor.State() - assert.False(t, state.IngressLive) - assert.Equal(t, "rtmp://localhost:1935/live", state.IngressURL) + state := mediaServer.State() + assert.False(t, state.Live) + assert.Equal(t, "rtmp://localhost:1935/live", state.URL) launchFFMPEG(t, "rtmp://localhost:1935/live") require.Eventually( t, - func() bool { return actor.State().IngressLive }, + func() bool { return mediaServer.State().Live }, 5*time.Second, 250*time.Millisecond, "actor not in LIVE state", ) - actor.Close() + mediaServer.Close() running, err = containerClient.ContainerRunning(ctx, map[string]string{"component": component}) require.NoError(t, err) diff --git a/terminal/actor.go b/terminal/actor.go index a2652ea..92306ff 100644 --- a/terminal/actor.go +++ b/terminal/actor.go @@ -3,8 +3,8 @@ package terminal import ( "cmp" "context" + "fmt" "log/slog" - "strings" "git.netflux.io/rob/termstream/domain" "github.com/gdamore/tcell/v2" @@ -13,12 +13,12 @@ import ( // Actor is responsible for managing the terminal user interface. type Actor struct { - app *tview.Application - ch chan action - commandCh chan Command - logger *slog.Logger - serverBox *tview.TextView - destBox *tview.Table + app *tview.Application + ch chan action + commandCh chan Command + logger *slog.Logger + sourceView *tview.Table + destView *tview.Table } const defaultChanSize = 64 @@ -39,44 +39,43 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) { commandCh := make(chan Command, chanSize) app := tview.NewApplication() - serverBox := tview.NewTextView() - serverBox.SetDynamicColors(true) - serverBox.SetBorder(true) - serverBox.SetTitle("media server") - serverBox.SetTextAlign(tview.AlignCenter) - destBox := tview.NewTable() - destBox.SetTitle("destinations") - destBox.SetBorder(true) - destBox.SetSelectable(true, false) - destBox.SetWrapSelection(true, false) - destBox.SetDoneFunc(func(key tcell.Key) { - row, _ := destBox.GetSelection() - commandCh <- CommandToggleDestination{URL: destBox.GetCell(row, 0).Text} + sourceView := tview.NewTable() + sourceView.SetTitle("source") + sourceView.SetBorder(true) + + destView := tview.NewTable() + destView.SetTitle("destinations") + destView.SetBorder(true) + destView.SetSelectable(true, false) + destView.SetWrapSelection(true, false) + destView.SetDoneFunc(func(key tcell.Key) { + row, _ := destView.GetSelection() + commandCh <- CommandToggleDestination{URL: destView.GetCell(row, 0).Text} }) flex := tview.NewFlex(). SetDirection(tview.FlexRow). - AddItem(serverBox, 9, 0, false). - AddItem(destBox, 0, 1, false) + AddItem(sourceView, 4, 0, false). + AddItem(destView, 0, 1, false) container := tview.NewFlex(). SetDirection(tview.FlexColumn). AddItem(nil, 0, 1, false). - AddItem(flex, 120, 0, false). + AddItem(flex, 180, 0, false). AddItem(nil, 0, 1, false) app.SetRoot(container, true) - app.SetFocus(destBox) + app.SetFocus(destView) app.EnableMouse(false) actor := &Actor{ - ch: ch, - commandCh: commandCh, - logger: params.Logger, - app: app, - serverBox: serverBox, - destBox: destBox, + ch: ch, + commandCh: commandCh, + logger: params.Logger, + app: app, + sourceView: sourceView, + destView: destView, } app.SetInputCapture(func(event *tcell.EventKey) *tcell.EventKey { @@ -133,52 +132,41 @@ func (a *Actor) SetState(state domain.AppState) { } func (a *Actor) redrawFromState(state domain.AppState) { - a.serverBox.SetText(generateServerStatus(state)) + setHeaderRow := func(tableView *tview.Table) { + tableView.SetCell(0, 0, tview.NewTableCell("[grey]URL").SetAlign(tview.AlignLeft).SetExpansion(7).SetSelectable(false)) + tableView.SetCell(0, 1, tview.NewTableCell("[grey]Status").SetAlign(tview.AlignLeft).SetExpansion(1).SetSelectable(false)) + tableView.SetCell(0, 2, tview.NewTableCell("[grey]CPU %").SetAlign(tview.AlignLeft).SetExpansion(1).SetSelectable(false)) + tableView.SetCell(0, 3, tview.NewTableCell("[grey]Mem used (MB)").SetAlign(tview.AlignLeft).SetExpansion(1).SetSelectable(false)) + tableView.SetCell(0, 4, tview.NewTableCell("[grey]Actions").SetAlign(tview.AlignLeft).SetExpansion(2).SetSelectable(false)) + } - a.destBox.Clear() + a.sourceView.Clear() + setHeaderRow(a.sourceView) + sourceContainer := state.Containers[state.Source.ContainerID] + a.sourceView.SetCell(1, 0, tview.NewTableCell(state.Source.URL)) + if state.Source.Live { + a.sourceView.SetCell(1, 1, tview.NewTableCell("[green]on-air")) + } else { + a.sourceView.SetCell(1, 1, tview.NewTableCell("[yellow]off-air")) + } + a.sourceView.SetCell(1, 2, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", sourceContainer.CPUPercent))) + a.sourceView.SetCell(1, 3, tview.NewTableCell("[white]"+fmt.Sprintf("%.1f", float64(sourceContainer.MemoryUsageBytes)/1024/1024))) + a.sourceView.SetCell(1, 4, tview.NewTableCell("")) - a.destBox.SetCell(0, 0, tview.NewTableCell("[grey]URL").SetAlign(tview.AlignLeft).SetExpansion(7).SetSelectable(false)) - a.destBox.SetCell(0, 1, tview.NewTableCell("[grey]Status").SetAlign(tview.AlignLeft).SetExpansion(1).SetSelectable(false)) - a.destBox.SetCell(0, 2, tview.NewTableCell("[grey]Actions").SetAlign(tview.AlignLeft).SetExpansion(2).SetSelectable(false)) + a.destView.Clear() + setHeaderRow(a.destView) for i, dest := range state.Destinations { - a.destBox.SetCell(i+1, 0, tview.NewTableCell(dest.URL)) - a.destBox.SetCell(i+1, 1, tview.NewTableCell("[yellow]off-air")) - a.destBox.SetCell(i+1, 2, tview.NewTableCell("[green]Tab to go live")) + a.destView.SetCell(i+1, 0, tview.NewTableCell(dest.URL)) + a.destView.SetCell(i+1, 1, tview.NewTableCell("[yellow]off-air")) + a.destView.SetCell(i+1, 2, tview.NewTableCell("[white]-")) + a.destView.SetCell(i+1, 3, tview.NewTableCell("[white]-")) + a.destView.SetCell(i+1, 4, tview.NewTableCell("[green]Tab to go live")) } a.app.Draw() } -func generateServerStatus(state domain.AppState) string { - var s strings.Builder - - s.WriteString("\n") - - s.WriteString("[grey]Container status: ") - if state.ContainerRunning { - s.WriteString("[green]running") - } else { - s.WriteString("[red]stopped") - } - s.WriteString("\n\n") - - s.WriteString("[grey]RTMP URL: ") - if state.IngressURL != "" { - s.WriteString("[white:grey]" + state.IngressURL) - } - s.WriteString("\n\n") - - s.WriteString("[grey:black]Ingress stream: ") - if state.IngressLive { - s.WriteString("[green]on-air") - } else { - s.WriteString("[yellow]off-air") - } - - return s.String() -} - // Close closes the terminal user interface. func (a *Actor) Close() { a.app.Stop()