From 71297b093ca79a242f5eb165e14eb5a77ca73767 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Sun, 26 Jan 2025 06:52:52 +0100 Subject: [PATCH] refactor(container): remove unnecessary abstractions --- container/{runner.go => client.go} | 180 +++++++++---------- container/{runner_test.go => client_test.go} | 32 ++-- main.go | 8 +- mediaserver/actor.go | 58 +++--- mediaserver/actor_test.go | 16 +- 5 files changed, 153 insertions(+), 141 deletions(-) rename container/{runner.go => client.go} (58%) rename container/{runner_test.go => client_test.go} (56%) diff --git a/container/runner.go b/container/client.go similarity index 58% rename from container/runner.go rename to container/client.go index ecf93a9..6a62f83 100644 --- a/container/runner.go +++ b/container/client.go @@ -1,12 +1,10 @@ package container import ( - "cmp" "context" "fmt" "io" "log/slog" - "maps" "sync" "time" @@ -20,30 +18,97 @@ import ( var containerStopTimeout = 10 * time.Second -// Runner is responsible for running containers. -type Runner struct { +// Client is a thin wrapper around the Docker client. +type Client struct { id uuid.UUID wg sync.WaitGroup // TODO: is it needed? apiClient *client.Client logger *slog.Logger } -// NewRunner creates a new Runner. -func NewRunner(logger *slog.Logger) (*Runner, error) { +// NewClient creates a new Client. +func NewClient(logger *slog.Logger) (*Client, error) { apiClient, err := client.NewClientWithOpts(client.FromEnv) if err != nil { return nil, err } - return &Runner{ + return &Client{ id: uuid.New(), apiClient: apiClient, logger: logger, }, nil } -// Close closes the runner, stopping and removing all running containers. -func (r *Runner) Close() error { +// RunContainerParams are the parameters for running a container. +type RunContainerParams struct { + Name string + ContainerConfig *container.Config + HostConfig *container.HostConfig +} + +// RunContainer runs a container with the given parameters. +func (r *Client) RunContainer(ctx context.Context, params RunContainerParams) (string, <-chan struct{}, error) { + pullReader, err := r.apiClient.ImagePull(ctx, params.ContainerConfig.Image, image.PullOptions{}) + if err != nil { + return "", nil, fmt.Errorf("image pull: %w", err) + } + _, _ = io.Copy(io.Discard, pullReader) + _ = pullReader.Close() + + params.ContainerConfig.Labels["app"] = "termstream" + params.ContainerConfig.Labels["app-id"] = r.id.String() + + var name string + if params.Name != "" { + name = "termstream-" + r.id.String() + "-" + params.Name + } + + createResp, err := r.apiClient.ContainerCreate( + ctx, + params.ContainerConfig, + params.HostConfig, + nil, + nil, + name, + ) + if err != nil { + return "", nil, fmt.Errorf("container create: %w", err) + } + + ch := make(chan struct{}, 1) + r.wg.Add(1) + go func() { + defer r.wg.Done() + + respChan, errChan := r.apiClient.ContainerWait(ctx, createResp.ID, container.WaitConditionNotRunning) + select { + case resp := <-respChan: + r.logger.Info("Container entered non-running state", "status", resp.StatusCode, "id", shortID(createResp.ID)) + case err = <-errChan: + if err != context.Canceled { + r.logger.Error("Error setting container wait", "err", err, "id", shortID(createResp.ID)) + } + } + + ch <- struct{}{} + }() + + if err = r.apiClient.ContainerStart(ctx, createResp.ID, container.StartOptions{}); err != nil { + return "", nil, fmt.Errorf("container start: %w", err) + } + r.logger.Info("Started container", "id", shortID(createResp.ID)) + + ctr, err := r.apiClient.ContainerInspect(ctx, createResp.ID) + if err != nil { + return "", nil, fmt.Errorf("container inspect: %w", err) + } + + return ctr.ID, ch, nil +} + +// Close closes the client, stopping and removing all running containers. +func (r *Client) Close() error { ctx, cancel := context.WithTimeout(context.Background(), containerStopTimeout) defer cancel() @@ -54,7 +119,7 @@ func (r *Runner) Close() error { for _, container := range containerList { if err := r.removeContainer(ctx, container.ID); err != nil { - r.logger.Error("Error removing container:", "err", err) + r.logger.Error("Error removing container:", "err", err, "id", shortID(container.ID)) } } @@ -63,14 +128,14 @@ func (r *Runner) Close() error { return r.apiClient.Close() } -func (r *Runner) removeContainer(ctx context.Context, id string) error { - r.logger.Info("Stopping container") +func (r *Client) removeContainer(ctx context.Context, id string) error { + r.logger.Info("Stopping container", "id", shortID(id)) stopTimeout := int(containerStopTimeout.Seconds()) if err := r.apiClient.ContainerStop(ctx, id, container.StopOptions{Timeout: &stopTimeout}); err != nil { return fmt.Errorf("container stop: %w", err) } - r.logger.Info("Removing container") + r.logger.Info("Removing container", "id", shortID(id)) if err := r.apiClient.ContainerRemove(ctx, id, container.RemoveOptions{Force: true}); err != nil { return fmt.Errorf("container remove: %w", err) } @@ -78,82 +143,8 @@ func (r *Runner) removeContainer(ctx context.Context, id string) error { return nil } -// RunContainerParams are the parameters for running a container. -type RunContainerParams struct { - Name string - Image string - Env []string - Labels map[string]string - NetworkMode string -} - -// RunContainer runs a container with the given parameters. -func (r *Runner) RunContainer(ctx context.Context, params RunContainerParams) (<-chan struct{}, error) { - pullReader, err := r.apiClient.ImagePull(ctx, params.Image, image.PullOptions{}) - if err != nil { - return nil, fmt.Errorf("image pull: %w", err) - } - _, _ = io.Copy(io.Discard, pullReader) - _ = pullReader.Close() - - labels := map[string]string{ - "app": "termstream", - "app-id": r.id.String(), - } - maps.Copy(labels, params.Labels) - - var name string - if params.Name != "" { - name = "termstream-" + r.id.String() + "-" + params.Name - } - - ctr, err := r.apiClient.ContainerCreate( - ctx, - &container.Config{ - Image: params.Image, - Env: params.Env, - Labels: labels, - }, - &container.HostConfig{ - NetworkMode: container.NetworkMode(cmp.Or(params.NetworkMode, "default")), - }, - nil, - nil, - name, - ) - if err != nil { - return nil, fmt.Errorf("container create: %w", err) - } - - if err = r.apiClient.ContainerStart(ctx, ctr.ID, container.StartOptions{}); err != nil { - return nil, fmt.Errorf("container start: %w", err) - } - r.logger.Info("Started container", "id", ctr.ID) - - ch := make(chan struct{}, 1) - r.wg.Add(1) - - go func() { - defer r.wg.Done() - - respChan, errChan := r.apiClient.ContainerWait(ctx, ctr.ID, container.WaitConditionNotRunning) - select { - case resp := <-respChan: - r.logger.Info("Container terminated", "status", resp.StatusCode) - case err = <-errChan: - if err != context.Canceled { - r.logger.Error("Container terminated with error", "err", err) - } - } - - ch <- struct{}{} - }() - - return ch, nil -} - // ContainerRunning checks if a container with the given labels is running. -func (r *Runner) ContainerRunning(ctx context.Context, labels map[string]string) (bool, error) { +func (r *Client) ContainerRunning(ctx context.Context, labels map[string]string) (bool, error) { containers, err := r.containersMatchingLabels(ctx, labels) if err != nil { return false, fmt.Errorf("container list: %w", err) @@ -169,7 +160,7 @@ func (r *Runner) ContainerRunning(ctx context.Context, labels map[string]string) } // RemoveContainers removes all containers with the given labels. -func (r *Runner) RemoveContainers(ctx context.Context, labels map[string]string) error { +func (r *Client) RemoveContainers(ctx context.Context, labels map[string]string) error { containers, err := r.containersMatchingLabels(ctx, labels) if err != nil { return fmt.Errorf("container list: %w", err) @@ -177,14 +168,14 @@ func (r *Runner) RemoveContainers(ctx context.Context, labels map[string]string) for _, container := range containers { if err := r.removeContainer(ctx, container.ID); err != nil { - r.logger.Error("Error removing container:", "err", err) + r.logger.Error("Error removing container:", "err", err, "id", shortID(container.ID)) } } return nil } -func (r *Runner) containersMatchingLabels(ctx context.Context, labels map[string]string) ([]types.Container, error) { +func (r *Client) containersMatchingLabels(ctx context.Context, labels map[string]string) ([]types.Container, error) { filterArgs := filters.NewArgs( filters.Arg("label", "app=termstream"), filters.Arg("label", "app-id="+r.id.String()), @@ -197,3 +188,10 @@ func (r *Runner) containersMatchingLabels(ctx context.Context, labels map[string Filters: filterArgs, }) } + +func shortID(id string) string { + if len(id) < 12 { + return id + } + return id[:12] +} diff --git a/container/runner_test.go b/container/client_test.go similarity index 56% rename from container/runner_test.go rename to container/client_test.go index bed6e08..29d0419 100644 --- a/container/runner_test.go +++ b/container/client_test.go @@ -7,13 +7,14 @@ import ( "git.netflux.io/rob/termstream/container" "git.netflux.io/rob/termstream/testhelpers" + typescontainer "github.com/docker/docker/api/types/container" "github.com/docker/docker/client" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestRunnerStartStop(t *testing.T) { +func TestClientStartStop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -25,35 +26,40 @@ func TestRunnerStartStop(t *testing.T) { containerName := "termstream-test-" + uuid.NewString() component := "test-start-stop" - runner, err := container.NewRunner(logger) + client, err := container.NewClient(logger) require.NoError(t, err) - running, err := runner.ContainerRunning(ctx, map[string]string{"component": component}) + running, err := client.ContainerRunning(ctx, map[string]string{"component": component}) require.NoError(t, err) assert.False(t, running) - _, err = runner.RunContainer(ctx, container.RunContainerParams{ - Name: containerName, - Image: "bluenviron/mediamtx", - Labels: map[string]string{"component": component}, - NetworkMode: "default", + containerID, _, err := client.RunContainer(ctx, container.RunContainerParams{ + Name: containerName, + ContainerConfig: &typescontainer.Config{ + Image: "bluenviron/mediamtx", + Labels: map[string]string{"component": component}, + }, + HostConfig: &typescontainer.HostConfig{ + NetworkMode: "default", + }, }) require.NoError(t, err) + assert.NotEmpty(t, containerID) require.Eventually( t, func() bool { - running, err = runner.ContainerRunning(ctx, map[string]string{"component": component}) + running, err = client.ContainerRunning(ctx, map[string]string{"component": component}) return err == nil && running }, - 5*time.Second, - 250*time.Millisecond, + 2*time.Second, + 100*time.Millisecond, "container not in RUNNING state", ) - runner.Close() + client.Close() - running, err = runner.ContainerRunning(ctx, map[string]string{"component": component}) + running, err = client.ContainerRunning(ctx, map[string]string{"component": component}) require.NoError(t, err) assert.False(t, running) } diff --git a/main.go b/main.go index 7c1b961..8757e85 100644 --- a/main.go +++ b/main.go @@ -39,14 +39,14 @@ func run(ctx context.Context, cfgReader io.Reader) error { logger := slog.New(slog.NewTextHandler(logFile, nil)) logger.Info("Starting termstream", slog.Any("initial_state", state)) - runner, err := container.NewRunner(logger.With("component", "runner")) + containerClient, err := container.NewClient(logger.With("component", "container_client")) if err != nil { - return fmt.Errorf("new runner: %w", err) + return fmt.Errorf("new container client: %w", err) } - defer runner.Close() + defer containerClient.Close() srv, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ - Runner: runner, + Client: containerClient, Logger: logger.With("component", "mediaserver"), }) if err != nil { diff --git a/mediaserver/actor.go b/mediaserver/actor.go index bc5f4f4..ac55932 100644 --- a/mediaserver/actor.go +++ b/mediaserver/actor.go @@ -10,6 +10,8 @@ import ( "net/http" "time" + typescontainer "github.com/docker/docker/api/types/container" + "git.netflux.io/rob/termstream/container" ) @@ -21,6 +23,7 @@ const ( // State contains the current state of the media server. type State struct { ContainerRunning bool + ContainerID string IngressLive bool IngressURL string } @@ -30,20 +33,20 @@ type action func() // Actor is responsible for managing the media server. type Actor struct { - ch chan action - state *State - stateChan chan State - runner *container.Runner - logger *slog.Logger - httpClient *http.Client + ch chan action + state *State + stateChan chan State + containerClient *container.Client + logger *slog.Logger + httpClient *http.Client } // StartActorParams contains the parameters for starting a new media server // actor. type StartActorParams struct { - Runner *container.Runner - ChanSize int - Logger *slog.Logger + ContainerClient *container.Client + ChanSize int + Logger *slog.Logger } const ( @@ -57,33 +60,38 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) { chanSize := cmp.Or(params.ChanSize, defaultChanSize) actor := &Actor{ - ch: make(chan action, chanSize), - state: new(State), - stateChan: make(chan State, chanSize), - runner: params.Runner, - logger: params.Logger, - httpClient: &http.Client{Timeout: httpClientTimeout}, + ch: make(chan action, chanSize), + state: new(State), + stateChan: make(chan State, chanSize), + containerClient: params.ContainerClient, + logger: params.Logger, + httpClient: &http.Client{Timeout: httpClientTimeout}, } - containerDone, err := params.Runner.RunContainer( + containerID, containerDone, err := params.ContainerClient.RunContainer( ctx, container.RunContainerParams{ - Name: "server", - Image: imageNameMediaMTX, - Env: []string{ - "MTX_LOGLEVEL=debug", - "MTX_API=yes", + Name: "server", + ContainerConfig: &typescontainer.Config{ + Image: imageNameMediaMTX, + Env: []string{ + "MTX_LOGLEVEL=debug", + "MTX_API=yes", + }, + Labels: map[string]string{ + "component": componentName, + }, }, - Labels: map[string]string{ - "component": componentName, + HostConfig: &typescontainer.HostConfig{ + NetworkMode: "host", }, - NetworkMode: "host", }, ) if err != nil { return nil, fmt.Errorf("run container: %w", err) } + actor.state.ContainerID = containerID actor.state.ContainerRunning = true actor.state.IngressURL = "rtmp://localhost:1935/" + rtmpPath @@ -108,7 +116,7 @@ func (s *Actor) State() State { // Close closes the media server actor. func (s *Actor) Close() error { - if err := s.runner.RemoveContainers(context.Background(), map[string]string{"component": componentName}); err != nil { + if err := s.containerClient.RemoveContainers(context.Background(), map[string]string{"component": componentName}); err != nil { return fmt.Errorf("remove containers: %w", err) } diff --git a/mediaserver/actor_test.go b/mediaserver/actor_test.go index fb17699..7250612 100644 --- a/mediaserver/actor_test.go +++ b/mediaserver/actor_test.go @@ -21,25 +21,25 @@ func TestMediaServerStartStop(t *testing.T) { t.Cleanup(cancel) logger := testhelpers.NewTestLogger() - runner, err := container.NewRunner(logger) + containerClient, err := container.NewClient(logger) require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, runner.Close()) }) + t.Cleanup(func() { require.NoError(t, containerClient.Close()) }) - running, err := runner.ContainerRunning(ctx, map[string]string{"component": component}) + running, err := containerClient.ContainerRunning(ctx, map[string]string{"component": component}) require.NoError(t, err) assert.False(t, running) actor, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ - ChanSize: 1, - Runner: runner, - Logger: logger, + ChanSize: 1, + ContainerClient: containerClient, + Logger: logger, }) require.NoError(t, err) require.Eventually( t, func() bool { - running, err = runner.ContainerRunning(ctx, map[string]string{"component": component}) + running, err = containerClient.ContainerRunning(ctx, map[string]string{"component": component}) return err == nil && running }, 5*time.Second, @@ -62,7 +62,7 @@ func TestMediaServerStartStop(t *testing.T) { actor.Close() - running, err = runner.ContainerRunning(ctx, map[string]string{"component": component}) + running, err = containerClient.ContainerRunning(ctx, map[string]string{"component": component}) require.NoError(t, err) assert.False(t, running) }