From 51956b20b128ff49e4920797865b1cfafde4ae8a Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Sat, 18 Jan 2025 14:42:26 +0100 Subject: [PATCH] feat: bootstrap server --- .github/workflows/ci-build.yml | 44 ++++++++ container/runner.go | 199 +++++++++++++++++++++++++++++++++ container/runner_test.go | 59 ++++++++++ go.mod | 37 ++++++ go.sum | 134 ++++++++++++++++++++++ main.go | 54 ++++++++- mediaserver/actor.go | 188 +++++++++++++++++++++++++++++++ mediaserver/actor_test.go | 94 ++++++++++++++++ mise/config.toml | 18 +++ mise/tasks/check_gomod | 13 +++ testhelpers/logging.go | 18 +++ 11 files changed, 857 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/ci-build.yml create mode 100644 container/runner.go create mode 100644 container/runner_test.go create mode 100644 go.sum create mode 100644 mediaserver/actor.go create mode 100644 mediaserver/actor_test.go create mode 100755 mise/tasks/check_gomod create mode 100644 testhelpers/logging.go diff --git a/.github/workflows/ci-build.yml b/.github/workflows/ci-build.yml new file mode 100644 index 0000000..a847298 --- /dev/null +++ b/.github/workflows/ci-build.yml @@ -0,0 +1,44 @@ +name: ci-build +run-name: Building ${{ github.ref_name }} +on: +- push +- pull_request +jobs: + lint: + runs-on: ubuntu-24.04 + steps: + - uses: actions/checkout@v4 + - uses: ludeeus/action-shellcheck@2.0.0 + backend: + runs-on: ubuntu-24.04 + needs: + - lint + steps: + - name: install mise + run: | + curl https://mise.run | sh + echo "$HOME/.local/share/mise/bin" >> $GITHUB_PATH + echo "$HOME/.local/share/mise/shims" >> $GITHUB_PATH + - name: install nscd + run: sudo apt-get install nscd + - name: setup ffmpeg + uses: FedericoCarboni/setup-ffmpeg@v3 + with: + ffmpeg-version: release + - name: checkout + uses: actions/checkout@v4 + - name: Setup Go 1.23.5 + uses: actions/setup-go@v5 + with: + go-version: '1.23.5' + cache: false + - name: golangci-lint + uses: golangci/golangci-lint-action@v6 + with: + version: v1.60.1 + - name: check_gomod + run: mise run check_gomod + - name: test_ci + env: + DOCKER_API_VERSION: "1.45" + run: mise run test_ci diff --git a/container/runner.go b/container/runner.go new file mode 100644 index 0000000..ecf93a9 --- /dev/null +++ b/container/runner.go @@ -0,0 +1,199 @@ +package container + +import ( + "cmp" + "context" + "fmt" + "io" + "log/slog" + "maps" + "sync" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/client" + "github.com/google/uuid" +) + +var containerStopTimeout = 10 * time.Second + +// Runner is responsible for running containers. +type Runner struct { + id uuid.UUID + wg sync.WaitGroup // TODO: is it needed? + apiClient *client.Client + logger *slog.Logger +} + +// NewRunner creates a new Runner. +func NewRunner(logger *slog.Logger) (*Runner, error) { + apiClient, err := client.NewClientWithOpts(client.FromEnv) + if err != nil { + return nil, err + } + + return &Runner{ + id: uuid.New(), + apiClient: apiClient, + logger: logger, + }, nil +} + +// Close closes the runner, stopping and removing all running containers. +func (r *Runner) Close() error { + ctx, cancel := context.WithTimeout(context.Background(), containerStopTimeout) + defer cancel() + + containerList, err := r.containersMatchingLabels(ctx, nil) + if err != nil { + return fmt.Errorf("container list: %w", err) + } + + for _, container := range containerList { + if err := r.removeContainer(ctx, container.ID); err != nil { + r.logger.Error("Error removing container:", "err", err) + } + } + + r.wg.Wait() + + return r.apiClient.Close() +} + +func (r *Runner) removeContainer(ctx context.Context, id string) error { + r.logger.Info("Stopping container") + stopTimeout := int(containerStopTimeout.Seconds()) + if err := r.apiClient.ContainerStop(ctx, id, container.StopOptions{Timeout: &stopTimeout}); err != nil { + return fmt.Errorf("container stop: %w", err) + } + + r.logger.Info("Removing container") + if err := r.apiClient.ContainerRemove(ctx, id, container.RemoveOptions{Force: true}); err != nil { + return fmt.Errorf("container remove: %w", err) + } + + return nil +} + +// RunContainerParams are the parameters for running a container. +type RunContainerParams struct { + Name string + Image string + Env []string + Labels map[string]string + NetworkMode string +} + +// RunContainer runs a container with the given parameters. +func (r *Runner) RunContainer(ctx context.Context, params RunContainerParams) (<-chan struct{}, error) { + pullReader, err := r.apiClient.ImagePull(ctx, params.Image, image.PullOptions{}) + if err != nil { + return nil, fmt.Errorf("image pull: %w", err) + } + _, _ = io.Copy(io.Discard, pullReader) + _ = pullReader.Close() + + labels := map[string]string{ + "app": "termstream", + "app-id": r.id.String(), + } + maps.Copy(labels, params.Labels) + + var name string + if params.Name != "" { + name = "termstream-" + r.id.String() + "-" + params.Name + } + + ctr, err := r.apiClient.ContainerCreate( + ctx, + &container.Config{ + Image: params.Image, + Env: params.Env, + Labels: labels, + }, + &container.HostConfig{ + NetworkMode: container.NetworkMode(cmp.Or(params.NetworkMode, "default")), + }, + nil, + nil, + name, + ) + if err != nil { + return nil, fmt.Errorf("container create: %w", err) + } + + if err = r.apiClient.ContainerStart(ctx, ctr.ID, container.StartOptions{}); err != nil { + return nil, fmt.Errorf("container start: %w", err) + } + r.logger.Info("Started container", "id", ctr.ID) + + ch := make(chan struct{}, 1) + r.wg.Add(1) + + go func() { + defer r.wg.Done() + + respChan, errChan := r.apiClient.ContainerWait(ctx, ctr.ID, container.WaitConditionNotRunning) + select { + case resp := <-respChan: + r.logger.Info("Container terminated", "status", resp.StatusCode) + case err = <-errChan: + if err != context.Canceled { + r.logger.Error("Container terminated with error", "err", err) + } + } + + ch <- struct{}{} + }() + + return ch, nil +} + +// ContainerRunning checks if a container with the given labels is running. +func (r *Runner) ContainerRunning(ctx context.Context, labels map[string]string) (bool, error) { + containers, err := r.containersMatchingLabels(ctx, labels) + if err != nil { + return false, fmt.Errorf("container list: %w", err) + } + + for _, container := range containers { + if container.State == "running" { + return true, nil + } + } + + return false, nil +} + +// RemoveContainers removes all containers with the given labels. +func (r *Runner) RemoveContainers(ctx context.Context, labels map[string]string) error { + containers, err := r.containersMatchingLabels(ctx, labels) + if err != nil { + return fmt.Errorf("container list: %w", err) + } + + for _, container := range containers { + if err := r.removeContainer(ctx, container.ID); err != nil { + r.logger.Error("Error removing container:", "err", err) + } + } + + return nil +} + +func (r *Runner) containersMatchingLabels(ctx context.Context, labels map[string]string) ([]types.Container, error) { + filterArgs := filters.NewArgs( + filters.Arg("label", "app=termstream"), + filters.Arg("label", "app-id="+r.id.String()), + ) + for k, v := range labels { + filterArgs.Add("label", k+"="+v) + } + return r.apiClient.ContainerList(ctx, container.ListOptions{ + All: true, + Filters: filterArgs, + }) +} diff --git a/container/runner_test.go b/container/runner_test.go new file mode 100644 index 0000000..bed6e08 --- /dev/null +++ b/container/runner_test.go @@ -0,0 +1,59 @@ +package container_test + +import ( + "context" + "testing" + "time" + + "git.netflux.io/rob/termstream/container" + "git.netflux.io/rob/termstream/testhelpers" + "github.com/docker/docker/client" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRunnerStartStop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + apiClient, err := client.NewClientWithOpts(client.FromEnv) + require.NoError(t, err) + defer apiClient.Close() + + logger := testhelpers.NewTestLogger() + containerName := "termstream-test-" + uuid.NewString() + component := "test-start-stop" + + runner, err := container.NewRunner(logger) + require.NoError(t, err) + + running, err := runner.ContainerRunning(ctx, map[string]string{"component": component}) + require.NoError(t, err) + assert.False(t, running) + + _, err = runner.RunContainer(ctx, container.RunContainerParams{ + Name: containerName, + Image: "bluenviron/mediamtx", + Labels: map[string]string{"component": component}, + NetworkMode: "default", + }) + require.NoError(t, err) + + require.Eventually( + t, + func() bool { + running, err = runner.ContainerRunning(ctx, map[string]string{"component": component}) + return err == nil && running + }, + 5*time.Second, + 250*time.Millisecond, + "container not in RUNNING state", + ) + + runner.Close() + + running, err = runner.ContainerRunning(ctx, map[string]string{"component": component}) + require.NoError(t, err) + assert.False(t, running) +} diff --git a/go.mod b/go.mod index a4d451d..2e2873d 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,40 @@ module git.netflux.io/rob/termstream go 1.23.5 + +require ( + github.com/docker/docker v27.5.0+incompatible + github.com/google/uuid v1.6.0 + github.com/stretchr/testify v1.10.0 +) + +require ( + github.com/Microsoft/go-winio v0.4.14 // indirect + github.com/containerd/log v0.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/distribution/reference v0.6.0 // indirect + github.com/docker/go-connections v0.5.0 // indirect + github.com/docker/go-units v0.5.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/moby/docker-image-spec v1.3.1 // indirect + github.com/moby/term v0.5.2 // indirect + github.com/morikuni/aec v1.0.0 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect + go.opentelemetry.io/otel v1.34.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 // indirect + go.opentelemetry.io/otel/metric v1.34.0 // indirect + go.opentelemetry.io/otel/sdk v1.34.0 // indirect + go.opentelemetry.io/otel/trace v1.34.0 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/time v0.9.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + gotest.tools/v3 v3.5.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c2e010a --- /dev/null +++ b/go.sum @@ -0,0 +1,134 @@ +github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c h1:udKWzYgxTojEKWjV8V+WSxDXJ4NFATAsZjh8iIbsQIg= +github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU= +github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= +github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= +github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= +github.com/docker/docker v27.5.0+incompatible h1:um++2NcQtGRTz5eEgO6aJimo6/JxrTXC941hd05JO6U= +github.com/docker/docker v27.5.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= +github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1/go.mod h1:RBRO7fro65R6tjKzYgLAFo0t1QEXY1Dp+i/bvpRiqiQ= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= +github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= +github.com/moby/term v0.5.2 h1:6qk3FJAFDs6i/q3W/pQ97SX192qKfZgGjCQqfCJkgzQ= +github.com/moby/term v0.5.2/go.mod h1:d3djjFCrjnB+fl8NJux+EJzu0msscUP+f8it8hPkFLc= +github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= +github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= +github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 h1:CV7UdSGJt/Ao6Gp4CXckLxVRRsRgDHoI8XjbL3PDl8s= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0/go.mod h1:FRmFuRJfag1IZ2dPkHnEoSFVgTVPUd2qf5Vi69hLb8I= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glBlOBp5+WlYsOElzTNmiPW/x60= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 h1:BEj3SPM81McUZHYjRS5pEgNgnmzGJ5tRpU5krWnV8Bs= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0/go.mod h1:9cKLGBDzI/F3NoHLQGm4ZrYdIHsvGt6ej6hUowxY0J4= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= +go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= +go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= +go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= +golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA= +google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50= +google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A= +google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= +google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= diff --git a/main.go b/main.go index 16e2b22..23f433b 100644 --- a/main.go +++ b/main.go @@ -1,5 +1,57 @@ package main +import ( + "context" + "fmt" + "log/slog" + "os" + "os/signal" + + "git.netflux.io/rob/termstream/container" + "git.netflux.io/rob/termstream/mediaserver" +) + func main() { - println("Hello, World!") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if err := run(ctx); err != nil { + _, _ = os.Stderr.WriteString("Error: " + err.Error() + "\n") + } +} + +func run(ctx context.Context) error { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt) + + runner, err := container.NewRunner(logger.With("component", "runner")) + if err != nil { + return fmt.Errorf("new runner: %w", err) + } + defer runner.Close() + + srv, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ + Runner: runner, + Logger: logger.With("component", "mediaserver"), + }) + if err != nil { + return fmt.Errorf("start media server: %w", err) + } + + for { + select { + case <-ch: + logger.Info("Received interrupt signal, shutting down...") + return nil + case state, ok := <-srv.C(): + if ok { + logger.Info("Received state change", "state", state) + } else { + logger.Info("State channel closed, shutting down...") + return nil + } + } + } } diff --git a/mediaserver/actor.go b/mediaserver/actor.go new file mode 100644 index 0000000..9f14e7d --- /dev/null +++ b/mediaserver/actor.go @@ -0,0 +1,188 @@ +package mediaserver + +import ( + "cmp" + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "time" + + "git.netflux.io/rob/termstream/container" +) + +const imageNameMediaMTX = "bluenviron/mediamtx" + +// State contains the current state of the media server. +type State struct { + Live bool +} + +// action is an action to be performed by the actor. +type action func() + +// Actor is responsible for managing the media server. +type Actor struct { + ch chan action + stateChan chan State + runner *container.Runner + logger *slog.Logger + httpClient *http.Client + + // mutable state + live bool +} + +// StartActorParams contains the parameters for starting a new media server +// actor. +type StartActorParams struct { + Runner *container.Runner + ChanSize int + Logger *slog.Logger +} + +const ( + defaultChanSize = 64 + componentName = "mediaserver" + httpClientTimeout = time.Second +) + +// StartActor starts a new media server actor. +func StartActor(ctx context.Context, params StartActorParams) (*Actor, error) { + chanSize := cmp.Or(params.ChanSize, defaultChanSize) + + actor := &Actor{ + ch: make(chan action, chanSize), + stateChan: make(chan State, chanSize), + runner: params.Runner, + logger: params.Logger, + httpClient: &http.Client{Timeout: httpClientTimeout}, + } + + containerDone, err := params.Runner.RunContainer( + ctx, + container.RunContainerParams{ + Name: "server", + Image: imageNameMediaMTX, + Env: []string{ + "MTX_LOGLEVEL=debug", + "MTX_API=yes", + }, + Labels: map[string]string{ + "component": componentName, + }, + NetworkMode: "host", + }, + ) + if err != nil { + return nil, fmt.Errorf("run container: %w", err) + } + + go actor.actorLoop(containerDone) + + return actor, nil +} + +// C returns a channel that will receive the current state of the media server. +func (s *Actor) C() <-chan State { + return s.stateChan +} + +func (s *Actor) State() State { + resultChan := make(chan State) + + s.ch <- func() { + resultChan <- State{Live: s.live} + } + + return <-resultChan +} + +// Close closes the media server actor. +func (s *Actor) Close() error { + if err := s.runner.RemoveContainers(context.Background(), map[string]string{"component": componentName}); err != nil { + return fmt.Errorf("remove containers: %w", err) + } + + return nil +} + +func (s *Actor) actorLoop(containerDone <-chan struct{}) { + defer close(s.ch) + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-containerDone: + s.stateChan <- State{Live: false} + close(s.stateChan) + case <-ticker.C: + live, err := s.checkState() + if err != nil { + s.logger.Error("Error fetching server state", "error", err) + continue + } + if live != s.live { + s.live = live + s.stateChan <- State{Live: live} + } + case action, ok := <-s.ch: + if !ok { + return + } + action() + } + } +} + +type apiResponse[T any] struct { + Items []T `json:"items"` +} + +type rtmpConnsResponse struct { + ID string `json:"id"` + CreatedAt time.Time `json:"created"` + State string `json:"state"` + Path string `json:"path"` + BytesReceived int64 `json:"bytesReceived"` + BytesSent int64 `json:"bytesSent"` + RemoteAddr string `json:"remoteAddr"` +} + +func (s *Actor) checkState() (bool, error) { + req, err := http.NewRequest(http.MethodGet, "http://localhost:9997/v3/rtmpconns/list", nil) + if err != nil { + return false, fmt.Errorf("new request: %w", err) + } + + httpResp, err := s.httpClient.Do(req) + if err != nil { + return false, fmt.Errorf("do request: %w", err) + } + + if httpResp.StatusCode != http.StatusOK { + return false, fmt.Errorf("unexpected status code: %d", httpResp.StatusCode) + } + + respBody, err := io.ReadAll(httpResp.Body) + if err != nil { + return false, fmt.Errorf("read body: %w", err) + } + + var resp apiResponse[rtmpConnsResponse] + if err = json.Unmarshal(respBody, &resp); err != nil { + return false, fmt.Errorf("unmarshal: %w", err) + } + + for _, conn := range resp.Items { + if conn.Path == "live" && conn.State == "publish" { + return true, nil + } + } + + return false, nil +} diff --git a/mediaserver/actor_test.go b/mediaserver/actor_test.go new file mode 100644 index 0000000..477f65d --- /dev/null +++ b/mediaserver/actor_test.go @@ -0,0 +1,94 @@ +package mediaserver_test + +import ( + "context" + "os/exec" + "syscall" + "testing" + "time" + + "git.netflux.io/rob/termstream/container" + "git.netflux.io/rob/termstream/mediaserver" + "git.netflux.io/rob/termstream/testhelpers" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const component = "mediaserver" + +func TestMediaServerStartStop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + logger := testhelpers.NewTestLogger() + runner, err := container.NewRunner(logger) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, runner.Close()) }) + + running, err := runner.ContainerRunning(ctx, map[string]string{"component": component}) + require.NoError(t, err) + assert.False(t, running) + + actor, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ + ChanSize: 1, + Runner: runner, + Logger: logger, + }) + require.NoError(t, err) + + require.Eventually( + t, + func() bool { + running, err = runner.ContainerRunning(ctx, map[string]string{"component": component}) + return err == nil && running + }, + 5*time.Second, + 250*time.Millisecond, + "container not in RUNNING state", + ) + + launchFFMPEG(t, "rtmp://localhost:1935/live") + require.Eventually( + t, + func() bool { return actor.State().Live }, + 5*time.Second, + 250*time.Millisecond, + "actor not in LIVE state", + ) + + actor.Close() + + running, err = runner.ContainerRunning(ctx, map[string]string{"component": component}) + require.NoError(t, err) + assert.False(t, running) +} + +func launchFFMPEG(t *testing.T, destURL string) *exec.Cmd { + ctx, cancel := context.WithCancel(context.Background()) + + cmd := exec.CommandContext( + ctx, + "ffmpeg", + "-r", "30", + "-f", "lavfi", + "-i", "testsrc", + "-vf", "scale=1280:960", + "-vcodec", "libx264", + "-profile:v", "baseline", + "-pix_fmt", "yuv420p", + "-f", "flv", + destURL, + ) + + require.NoError(t, cmd.Start()) + + t.Cleanup(func() { + if cmd.Process != nil { + _ = cmd.Process.Signal(syscall.SIGINT) + } + + cancel() + }) + + return cmd +} diff --git a/mise/config.toml b/mise/config.toml index 2eaf334..ff56f1e 100644 --- a/mise/config.toml +++ b/mise/config.toml @@ -1,2 +1,20 @@ [env] GOTOOLCHAIN = "go1.23.5" + +[tasks.test] +description = "Run tests" +dir = "{{cwd}}" +run = "go test -v ./..." +alias = "t" + +[tasks.test_ci] +description = "Run tests in CI" +dir = "{{cwd}}" +run = "go test -v -race ./..." +alias = "tci" + +[tasks.lint] +description = "Run linters" +dir = "{{cwd}}" +run = "golangci-lint run" +alias = "l" diff --git a/mise/tasks/check_gomod b/mise/tasks/check_gomod new file mode 100755 index 0000000..06eee0f --- /dev/null +++ b/mise/tasks/check_gomod @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +#MISE description="Check if go.mod and go.sum are up-to-date" + +set -euo pipefail + +go mod tidy +STATUS=$(git status --porcelain go.mod go.sum) +if [ -n "$STATUS" ]; then + echo "Run \`go mod tidy\` and commit the changes." + exit 1 +fi + +exit 0 diff --git a/testhelpers/logging.go b/testhelpers/logging.go new file mode 100644 index 0000000..c8b5c70 --- /dev/null +++ b/testhelpers/logging.go @@ -0,0 +1,18 @@ +package testhelpers + +import ( + "io" + "log/slog" + "os" +) + +// NewNopLogger returns a logger that discards all log output. +// +// TODO: remove in Go 1.24: https://github.com/golang/go/issues/62005 +func NewNopLogger() *slog.Logger { + return slog.New(slog.NewJSONHandler(io.Discard, nil)) +} + +func NewTestLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(os.Stderr, nil)) +}