refactor(container): remove unnecessary abstractions

This commit is contained in:
Rob Watson 2025-01-26 06:52:52 +01:00
parent 99caa31f2e
commit 71297b093c
5 changed files with 153 additions and 141 deletions

View File

@ -1,12 +1,10 @@
package container package container
import ( import (
"cmp"
"context" "context"
"fmt" "fmt"
"io" "io"
"log/slog" "log/slog"
"maps"
"sync" "sync"
"time" "time"
@ -20,30 +18,97 @@ import (
var containerStopTimeout = 10 * time.Second var containerStopTimeout = 10 * time.Second
// Runner is responsible for running containers. // Client is a thin wrapper around the Docker client.
type Runner struct { type Client struct {
id uuid.UUID id uuid.UUID
wg sync.WaitGroup // TODO: is it needed? wg sync.WaitGroup // TODO: is it needed?
apiClient *client.Client apiClient *client.Client
logger *slog.Logger logger *slog.Logger
} }
// NewRunner creates a new Runner. // NewClient creates a new Client.
func NewRunner(logger *slog.Logger) (*Runner, error) { func NewClient(logger *slog.Logger) (*Client, error) {
apiClient, err := client.NewClientWithOpts(client.FromEnv) apiClient, err := client.NewClientWithOpts(client.FromEnv)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Runner{ return &Client{
id: uuid.New(), id: uuid.New(),
apiClient: apiClient, apiClient: apiClient,
logger: logger, logger: logger,
}, nil }, nil
} }
// Close closes the runner, stopping and removing all running containers. // RunContainerParams are the parameters for running a container.
func (r *Runner) Close() error { type RunContainerParams struct {
Name string
ContainerConfig *container.Config
HostConfig *container.HostConfig
}
// RunContainer runs a container with the given parameters.
func (r *Client) RunContainer(ctx context.Context, params RunContainerParams) (string, <-chan struct{}, error) {
pullReader, err := r.apiClient.ImagePull(ctx, params.ContainerConfig.Image, image.PullOptions{})
if err != nil {
return "", nil, fmt.Errorf("image pull: %w", err)
}
_, _ = io.Copy(io.Discard, pullReader)
_ = pullReader.Close()
params.ContainerConfig.Labels["app"] = "termstream"
params.ContainerConfig.Labels["app-id"] = r.id.String()
var name string
if params.Name != "" {
name = "termstream-" + r.id.String() + "-" + params.Name
}
createResp, err := r.apiClient.ContainerCreate(
ctx,
params.ContainerConfig,
params.HostConfig,
nil,
nil,
name,
)
if err != nil {
return "", nil, fmt.Errorf("container create: %w", err)
}
ch := make(chan struct{}, 1)
r.wg.Add(1)
go func() {
defer r.wg.Done()
respChan, errChan := r.apiClient.ContainerWait(ctx, createResp.ID, container.WaitConditionNotRunning)
select {
case resp := <-respChan:
r.logger.Info("Container entered non-running state", "status", resp.StatusCode, "id", shortID(createResp.ID))
case err = <-errChan:
if err != context.Canceled {
r.logger.Error("Error setting container wait", "err", err, "id", shortID(createResp.ID))
}
}
ch <- struct{}{}
}()
if err = r.apiClient.ContainerStart(ctx, createResp.ID, container.StartOptions{}); err != nil {
return "", nil, fmt.Errorf("container start: %w", err)
}
r.logger.Info("Started container", "id", shortID(createResp.ID))
ctr, err := r.apiClient.ContainerInspect(ctx, createResp.ID)
if err != nil {
return "", nil, fmt.Errorf("container inspect: %w", err)
}
return ctr.ID, ch, nil
}
// Close closes the client, stopping and removing all running containers.
func (r *Client) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), containerStopTimeout) ctx, cancel := context.WithTimeout(context.Background(), containerStopTimeout)
defer cancel() defer cancel()
@ -54,7 +119,7 @@ func (r *Runner) Close() error {
for _, container := range containerList { for _, container := range containerList {
if err := r.removeContainer(ctx, container.ID); err != nil { if err := r.removeContainer(ctx, container.ID); err != nil {
r.logger.Error("Error removing container:", "err", err) r.logger.Error("Error removing container:", "err", err, "id", shortID(container.ID))
} }
} }
@ -63,14 +128,14 @@ func (r *Runner) Close() error {
return r.apiClient.Close() return r.apiClient.Close()
} }
func (r *Runner) removeContainer(ctx context.Context, id string) error { func (r *Client) removeContainer(ctx context.Context, id string) error {
r.logger.Info("Stopping container") r.logger.Info("Stopping container", "id", shortID(id))
stopTimeout := int(containerStopTimeout.Seconds()) stopTimeout := int(containerStopTimeout.Seconds())
if err := r.apiClient.ContainerStop(ctx, id, container.StopOptions{Timeout: &stopTimeout}); err != nil { if err := r.apiClient.ContainerStop(ctx, id, container.StopOptions{Timeout: &stopTimeout}); err != nil {
return fmt.Errorf("container stop: %w", err) return fmt.Errorf("container stop: %w", err)
} }
r.logger.Info("Removing container") r.logger.Info("Removing container", "id", shortID(id))
if err := r.apiClient.ContainerRemove(ctx, id, container.RemoveOptions{Force: true}); err != nil { if err := r.apiClient.ContainerRemove(ctx, id, container.RemoveOptions{Force: true}); err != nil {
return fmt.Errorf("container remove: %w", err) return fmt.Errorf("container remove: %w", err)
} }
@ -78,82 +143,8 @@ func (r *Runner) removeContainer(ctx context.Context, id string) error {
return nil return nil
} }
// RunContainerParams are the parameters for running a container.
type RunContainerParams struct {
Name string
Image string
Env []string
Labels map[string]string
NetworkMode string
}
// RunContainer runs a container with the given parameters.
func (r *Runner) RunContainer(ctx context.Context, params RunContainerParams) (<-chan struct{}, error) {
pullReader, err := r.apiClient.ImagePull(ctx, params.Image, image.PullOptions{})
if err != nil {
return nil, fmt.Errorf("image pull: %w", err)
}
_, _ = io.Copy(io.Discard, pullReader)
_ = pullReader.Close()
labels := map[string]string{
"app": "termstream",
"app-id": r.id.String(),
}
maps.Copy(labels, params.Labels)
var name string
if params.Name != "" {
name = "termstream-" + r.id.String() + "-" + params.Name
}
ctr, err := r.apiClient.ContainerCreate(
ctx,
&container.Config{
Image: params.Image,
Env: params.Env,
Labels: labels,
},
&container.HostConfig{
NetworkMode: container.NetworkMode(cmp.Or(params.NetworkMode, "default")),
},
nil,
nil,
name,
)
if err != nil {
return nil, fmt.Errorf("container create: %w", err)
}
if err = r.apiClient.ContainerStart(ctx, ctr.ID, container.StartOptions{}); err != nil {
return nil, fmt.Errorf("container start: %w", err)
}
r.logger.Info("Started container", "id", ctr.ID)
ch := make(chan struct{}, 1)
r.wg.Add(1)
go func() {
defer r.wg.Done()
respChan, errChan := r.apiClient.ContainerWait(ctx, ctr.ID, container.WaitConditionNotRunning)
select {
case resp := <-respChan:
r.logger.Info("Container terminated", "status", resp.StatusCode)
case err = <-errChan:
if err != context.Canceled {
r.logger.Error("Container terminated with error", "err", err)
}
}
ch <- struct{}{}
}()
return ch, nil
}
// ContainerRunning checks if a container with the given labels is running. // ContainerRunning checks if a container with the given labels is running.
func (r *Runner) ContainerRunning(ctx context.Context, labels map[string]string) (bool, error) { func (r *Client) ContainerRunning(ctx context.Context, labels map[string]string) (bool, error) {
containers, err := r.containersMatchingLabels(ctx, labels) containers, err := r.containersMatchingLabels(ctx, labels)
if err != nil { if err != nil {
return false, fmt.Errorf("container list: %w", err) return false, fmt.Errorf("container list: %w", err)
@ -169,7 +160,7 @@ func (r *Runner) ContainerRunning(ctx context.Context, labels map[string]string)
} }
// RemoveContainers removes all containers with the given labels. // RemoveContainers removes all containers with the given labels.
func (r *Runner) RemoveContainers(ctx context.Context, labels map[string]string) error { func (r *Client) RemoveContainers(ctx context.Context, labels map[string]string) error {
containers, err := r.containersMatchingLabels(ctx, labels) containers, err := r.containersMatchingLabels(ctx, labels)
if err != nil { if err != nil {
return fmt.Errorf("container list: %w", err) return fmt.Errorf("container list: %w", err)
@ -177,14 +168,14 @@ func (r *Runner) RemoveContainers(ctx context.Context, labels map[string]string)
for _, container := range containers { for _, container := range containers {
if err := r.removeContainer(ctx, container.ID); err != nil { if err := r.removeContainer(ctx, container.ID); err != nil {
r.logger.Error("Error removing container:", "err", err) r.logger.Error("Error removing container:", "err", err, "id", shortID(container.ID))
} }
} }
return nil return nil
} }
func (r *Runner) containersMatchingLabels(ctx context.Context, labels map[string]string) ([]types.Container, error) { func (r *Client) containersMatchingLabels(ctx context.Context, labels map[string]string) ([]types.Container, error) {
filterArgs := filters.NewArgs( filterArgs := filters.NewArgs(
filters.Arg("label", "app=termstream"), filters.Arg("label", "app=termstream"),
filters.Arg("label", "app-id="+r.id.String()), filters.Arg("label", "app-id="+r.id.String()),
@ -197,3 +188,10 @@ func (r *Runner) containersMatchingLabels(ctx context.Context, labels map[string
Filters: filterArgs, Filters: filterArgs,
}) })
} }
func shortID(id string) string {
if len(id) < 12 {
return id
}
return id[:12]
}

View File

@ -7,13 +7,14 @@ import (
"git.netflux.io/rob/termstream/container" "git.netflux.io/rob/termstream/container"
"git.netflux.io/rob/termstream/testhelpers" "git.netflux.io/rob/termstream/testhelpers"
typescontainer "github.com/docker/docker/api/types/container"
"github.com/docker/docker/client" "github.com/docker/docker/client"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestRunnerStartStop(t *testing.T) { func TestClientStartStop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel) t.Cleanup(cancel)
@ -25,35 +26,40 @@ func TestRunnerStartStop(t *testing.T) {
containerName := "termstream-test-" + uuid.NewString() containerName := "termstream-test-" + uuid.NewString()
component := "test-start-stop" component := "test-start-stop"
runner, err := container.NewRunner(logger) client, err := container.NewClient(logger)
require.NoError(t, err) require.NoError(t, err)
running, err := runner.ContainerRunning(ctx, map[string]string{"component": component}) running, err := client.ContainerRunning(ctx, map[string]string{"component": component})
require.NoError(t, err) require.NoError(t, err)
assert.False(t, running) assert.False(t, running)
_, err = runner.RunContainer(ctx, container.RunContainerParams{ containerID, _, err := client.RunContainer(ctx, container.RunContainerParams{
Name: containerName, Name: containerName,
ContainerConfig: &typescontainer.Config{
Image: "bluenviron/mediamtx", Image: "bluenviron/mediamtx",
Labels: map[string]string{"component": component}, Labels: map[string]string{"component": component},
},
HostConfig: &typescontainer.HostConfig{
NetworkMode: "default", NetworkMode: "default",
},
}) })
require.NoError(t, err) require.NoError(t, err)
assert.NotEmpty(t, containerID)
require.Eventually( require.Eventually(
t, t,
func() bool { func() bool {
running, err = runner.ContainerRunning(ctx, map[string]string{"component": component}) running, err = client.ContainerRunning(ctx, map[string]string{"component": component})
return err == nil && running return err == nil && running
}, },
5*time.Second, 2*time.Second,
250*time.Millisecond, 100*time.Millisecond,
"container not in RUNNING state", "container not in RUNNING state",
) )
runner.Close() client.Close()
running, err = runner.ContainerRunning(ctx, map[string]string{"component": component}) running, err = client.ContainerRunning(ctx, map[string]string{"component": component})
require.NoError(t, err) require.NoError(t, err)
assert.False(t, running) assert.False(t, running)
} }

View File

@ -39,14 +39,14 @@ func run(ctx context.Context, cfgReader io.Reader) error {
logger := slog.New(slog.NewTextHandler(logFile, nil)) logger := slog.New(slog.NewTextHandler(logFile, nil))
logger.Info("Starting termstream", slog.Any("initial_state", state)) logger.Info("Starting termstream", slog.Any("initial_state", state))
runner, err := container.NewRunner(logger.With("component", "runner")) containerClient, err := container.NewClient(logger.With("component", "container_client"))
if err != nil { if err != nil {
return fmt.Errorf("new runner: %w", err) return fmt.Errorf("new container client: %w", err)
} }
defer runner.Close() defer containerClient.Close()
srv, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ srv, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{
Runner: runner, Client: containerClient,
Logger: logger.With("component", "mediaserver"), Logger: logger.With("component", "mediaserver"),
}) })
if err != nil { if err != nil {

View File

@ -10,6 +10,8 @@ import (
"net/http" "net/http"
"time" "time"
typescontainer "github.com/docker/docker/api/types/container"
"git.netflux.io/rob/termstream/container" "git.netflux.io/rob/termstream/container"
) )
@ -21,6 +23,7 @@ const (
// State contains the current state of the media server. // State contains the current state of the media server.
type State struct { type State struct {
ContainerRunning bool ContainerRunning bool
ContainerID string
IngressLive bool IngressLive bool
IngressURL string IngressURL string
} }
@ -33,7 +36,7 @@ type Actor struct {
ch chan action ch chan action
state *State state *State
stateChan chan State stateChan chan State
runner *container.Runner containerClient *container.Client
logger *slog.Logger logger *slog.Logger
httpClient *http.Client httpClient *http.Client
} }
@ -41,7 +44,7 @@ type Actor struct {
// StartActorParams contains the parameters for starting a new media server // StartActorParams contains the parameters for starting a new media server
// actor. // actor.
type StartActorParams struct { type StartActorParams struct {
Runner *container.Runner ContainerClient *container.Client
ChanSize int ChanSize int
Logger *slog.Logger Logger *slog.Logger
} }
@ -60,15 +63,16 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) {
ch: make(chan action, chanSize), ch: make(chan action, chanSize),
state: new(State), state: new(State),
stateChan: make(chan State, chanSize), stateChan: make(chan State, chanSize),
runner: params.Runner, containerClient: params.ContainerClient,
logger: params.Logger, logger: params.Logger,
httpClient: &http.Client{Timeout: httpClientTimeout}, httpClient: &http.Client{Timeout: httpClientTimeout},
} }
containerDone, err := params.Runner.RunContainer( containerID, containerDone, err := params.ContainerClient.RunContainer(
ctx, ctx,
container.RunContainerParams{ container.RunContainerParams{
Name: "server", Name: "server",
ContainerConfig: &typescontainer.Config{
Image: imageNameMediaMTX, Image: imageNameMediaMTX,
Env: []string{ Env: []string{
"MTX_LOGLEVEL=debug", "MTX_LOGLEVEL=debug",
@ -77,13 +81,17 @@ func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) {
Labels: map[string]string{ Labels: map[string]string{
"component": componentName, "component": componentName,
}, },
},
HostConfig: &typescontainer.HostConfig{
NetworkMode: "host", NetworkMode: "host",
}, },
},
) )
if err != nil { if err != nil {
return nil, fmt.Errorf("run container: %w", err) return nil, fmt.Errorf("run container: %w", err)
} }
actor.state.ContainerID = containerID
actor.state.ContainerRunning = true actor.state.ContainerRunning = true
actor.state.IngressURL = "rtmp://localhost:1935/" + rtmpPath actor.state.IngressURL = "rtmp://localhost:1935/" + rtmpPath
@ -108,7 +116,7 @@ func (s *Actor) State() State {
// Close closes the media server actor. // Close closes the media server actor.
func (s *Actor) Close() error { func (s *Actor) Close() error {
if err := s.runner.RemoveContainers(context.Background(), map[string]string{"component": componentName}); err != nil { if err := s.containerClient.RemoveContainers(context.Background(), map[string]string{"component": componentName}); err != nil {
return fmt.Errorf("remove containers: %w", err) return fmt.Errorf("remove containers: %w", err)
} }

View File

@ -21,17 +21,17 @@ func TestMediaServerStartStop(t *testing.T) {
t.Cleanup(cancel) t.Cleanup(cancel)
logger := testhelpers.NewTestLogger() logger := testhelpers.NewTestLogger()
runner, err := container.NewRunner(logger) containerClient, err := container.NewClient(logger)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, runner.Close()) }) t.Cleanup(func() { require.NoError(t, containerClient.Close()) })
running, err := runner.ContainerRunning(ctx, map[string]string{"component": component}) running, err := containerClient.ContainerRunning(ctx, map[string]string{"component": component})
require.NoError(t, err) require.NoError(t, err)
assert.False(t, running) assert.False(t, running)
actor, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ actor, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{
ChanSize: 1, ChanSize: 1,
Runner: runner, ContainerClient: containerClient,
Logger: logger, Logger: logger,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -39,7 +39,7 @@ func TestMediaServerStartStop(t *testing.T) {
require.Eventually( require.Eventually(
t, t,
func() bool { func() bool {
running, err = runner.ContainerRunning(ctx, map[string]string{"component": component}) running, err = containerClient.ContainerRunning(ctx, map[string]string{"component": component})
return err == nil && running return err == nil && running
}, },
5*time.Second, 5*time.Second,
@ -62,7 +62,7 @@ func TestMediaServerStartStop(t *testing.T) {
actor.Close() actor.Close()
running, err = runner.ContainerRunning(ctx, map[string]string{"component": component}) running, err = containerClient.ContainerRunning(ctx, map[string]string{"component": component})
require.NoError(t, err) require.NoError(t, err)
assert.False(t, running) assert.False(t, running)
} }