diff --git a/internal/app/integration_test.go b/internal/app/integration_test.go
index cdd887d..17f52b3 100644
--- a/internal/app/integration_test.go
+++ b/internal/app/integration_test.go
@@ -483,7 +483,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
 		func(t *assert.CollectT) {
 			contents := getContents()
 			assert.True(t, contentsIncludes(contents, "Streaming to Example server failed:"), "expected to see destination error")
-			assert.True(t, contentsIncludes(contents, "container failed to start"), "expected to see destination error")
+			assert.True(t, contentsIncludes(contents, "Error opening output files: I/O error"), "expected to see destination error")
 		},
 		time.Minute,
 		time.Second,
diff --git a/internal/container/container.go b/internal/container/container.go
index fb2b255..513deaa 100644
--- a/internal/container/container.go
+++ b/internal/container/container.go
@@ -38,6 +38,7 @@ type DockerClient interface {
 	ContainerCreate(context.Context, *container.Config, *container.HostConfig, *network.NetworkingConfig, *ocispec.Platform, string) (container.CreateResponse, error)
 	ContainerList(context.Context, container.ListOptions) ([]container.Summary, error)
+	ContainerLogs(context.Context, string, container.LogsOptions) (io.ReadCloser, error)
 	ContainerRemove(context.Context, string, container.RemoveOptions) error
 	ContainerStart(context.Context, string, container.StartOptions) error
 	ContainerStats(context.Context, string, bool) (container.StatsResponseReader, error)
@@ -136,21 +137,48 @@ func (a *Client) getEvents(containerID string) <-chan events.Message {
 	return ch
 }
 
+// getLogs returns a channel (which is never closed) that will receive
+// container logs, or nil if neither stdout nor stderr logs are requested.
+func (a *Client) getLogs(ctx context.Context, containerID string, cfg LogConfig) <-chan []byte {
+	if !cfg.Stdout && !cfg.Stderr {
+		return nil
+	}
+
+	ch := make(chan []byte)
+
+	go getLogs(ctx, containerID, a.apiClient, cfg, ch, a.logger)
+
+	return ch
+}
+
+// NetworkCountConfig holds configuration for observing network traffic.
 type NetworkCountConfig struct {
 	Rx string // the network name to count the Rx bytes
 	Tx string // the network name to count the Tx bytes
 }
 
+// CopyFileConfig holds configuration for a single file which should be copied
+// into a container.
 type CopyFileConfig struct {
 	Path    string
 	Payload io.Reader
 	Mode    int64
 }
 
+// LogConfig holds configuration for container logs.
+type LogConfig struct {
+	Stdout, Stderr bool
+}
+
 // ShouldRestartFunc is a callback function that is called when a container
 // exits. It should return true if the container is to be restarted. If not
 // restarting, err may be non-nil.
-type ShouldRestartFunc func(exitCode int64, restartCount int, runningTime time.Duration) (bool, error)
+type ShouldRestartFunc func(
+	exitCode int64,
+	restartCount int,
+	containerLogs [][]byte,
+	runningTime time.Duration,
+) (bool, error)
 
 // defaultRestartInterval is the default interval between restarts.
 // TODO: exponential backoff
@@ -164,7 +192,8 @@ type RunContainerParams struct {
 	HostConfig         *container.HostConfig
 	NetworkingConfig   *network.NetworkingConfig
 	NetworkCountConfig NetworkCountConfig
-	CopyFileConfigs    []CopyFileConfig
+	CopyFiles          []CopyFileConfig
+	Logs               LogConfig
 	ShouldRestart      ShouldRestartFunc
 	RestartInterval    time.Duration // defaults to 10 seconds
 }
@@ -227,7 +256,7 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (<
 			return
 		}
 
-		if err = a.copyFilesToContainer(ctx, createResp.ID, params.CopyFileConfigs); err != nil {
+		if err = a.copyFilesToContainer(ctx, createResp.ID, params.CopyFiles); err != nil {
 			sendError(fmt.Errorf("copy files to container: %w", err))
 			return
 		}
@@ -252,6 +281,7 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (<
 			createResp.ID,
 			params.ContainerConfig.Image,
 			params.NetworkCountConfig,
+			params.Logs,
 			params.ShouldRestart,
 			cmp.Or(params.RestartInterval, defaultRestartInterval),
 			containerStateC,
@@ -332,6 +362,14 @@ func (a *Client) pullImageIfNeeded(ctx context.Context, imageName string, contai
 	return nil
 }
 
+type containerWaitResponse struct {
+	container.WaitResponse
+
+	restarting   bool
+	restartCount int
+	err          error
+}
+
 // runContainerLoop is the control loop for a single container. It returns only
 // when the container exits.
 func (a *Client) runContainerLoop(
@@ -340,6 +378,7 @@ func (a *Client) runContainerLoop(
 	containerID string,
 	imageName string,
 	networkCountConfig NetworkCountConfig,
+	logConfig LogConfig,
 	shouldRestartFunc ShouldRestartFunc,
 	restartInterval time.Duration,
 	stateC chan<- domain.Container,
@@ -347,84 +386,15 @@ func (a *Client) runContainerLoop(
 ) {
 	defer cancel()
 
-	type containerWaitResponse struct {
-		container.WaitResponse
-
-		restarting   bool
-		restartCount int
-		err          error
-	}
-
 	containerRespC := make(chan containerWaitResponse)
-	containerErrC := make(chan error)
+	containerErrC := make(chan error, 1)
 	statsC := a.getStats(containerID, networkCountConfig)
 	eventsC := a.getEvents(containerID)
 
-	// ContainerWait only sends a result for the first non-running state, so we
-	// need to poll it repeatedly.
-	//
-	// The goroutine exits when a value is received on the error channel, or when
-	// the container exits and is not restarting, or when the context is cancelled.
 	go func() {
-		timer := time.NewTimer(restartInterval)
-		defer timer.Stop()
-		timer.Stop()
-
 		var restartCount int
-
-		for {
-			startedWaitingAt := time.Now()
-			respC, errC := a.apiClient.ContainerWait(ctx, containerID, container.WaitConditionNextExit)
-			select {
-			case resp := <-respC:
-				exit := func(err error) {
-					a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "false", "exit_code", resp.StatusCode, "restart_count", restartCount)
-					containerRespC <- containerWaitResponse{
-						WaitResponse: resp,
-						restarting:   false,
-						restartCount: restartCount,
-						err:          err,
-					}
-				}
-
-				if shouldRestartFunc == nil {
-					exit(nil)
-					return
-				}
-
-				shouldRestart, err := shouldRestartFunc(resp.StatusCode, restartCount, time.Since(startedWaitingAt))
-				if shouldRestart && err != nil {
-					panic(fmt.Errorf("shouldRestart must return nil error if restarting, but returned: %w", err))
-				}
-				if !shouldRestart {
-					exit(err)
-					return
-				}
-
-				a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "true", "exit_code", resp.StatusCode, "restart_count", restartCount)
-				timer.Reset(restartInterval)
-
-				containerRespC <- containerWaitResponse{
-					WaitResponse: resp,
-					restarting:   true,
-					restartCount: restartCount,
-				}
-			case <-timer.C:
-				a.logger.Info("Container restarting", "id", shortID(containerID), "restart_count", restartCount)
-				restartCount++
-				if err := a.apiClient.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil {
-					containerErrC <- fmt.Errorf("container start: %w", err)
-					return
-				}
-				a.logger.Info("Restarted container", "id", shortID(containerID))
-			case err := <-errC:
-				containerErrC <- err
-				return
-			case <-ctx.Done():
-				// This is probably because the container was stopped.
-				containerRespC <- containerWaitResponse{WaitResponse: container.WaitResponse{}, restarting: false}
-				return
-			}
+		for a.waitForContainerExit(ctx, containerID, containerRespC, containerErrC, logConfig, shouldRestartFunc, restartInterval, restartCount) {
+			restartCount++
 		}
 	}()
@@ -483,6 +453,7 @@
 			if evt.Action == "start" {
 				state.Status = domain.ContainerStatusRunning
 				sendState()
+				continue
 			}
@@ -512,6 +483,96 @@
 	}
 }
 
+// waitForContainerExit blocks until the container exits, restarting it if
+// configured to do so. It returns true if the container was restarted.
+func (a *Client) waitForContainerExit(
+	ctx context.Context,
+	containerID string,
+	containerRespC chan<- containerWaitResponse,
+	containerErrC chan<- error,
+	logConfig LogConfig,
+	shouldRestartFunc ShouldRestartFunc,
+	restartInterval time.Duration,
+	restartCount int,
+) bool {
+	var logs [][]byte
+	startedWaitingAt := time.Now()
+	respC, errC := a.apiClient.ContainerWait(ctx, containerID, container.WaitConditionNextExit)
+	logsC := a.getLogs(ctx, containerID, logConfig)
+
+	timer := time.NewTimer(restartInterval)
+	defer timer.Stop()
+	timer.Stop()
+
+	for {
+		select {
+		case resp := <-respC:
+			exit := func(err error) {
+				a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "false", "exit_code", resp.StatusCode, "restart_count", restartCount)
+				containerRespC <- containerWaitResponse{
+					WaitResponse: resp,
+					restarting:   false,
+					restartCount: restartCount,
+					err:          err,
+				}
+			}
+
+			// If the container exited with a non-zero status code, and debug
+			// logging is not enabled, log the container logs at ERROR level for
+			// debugging.
+			// TODO: parameterize
+			if resp.StatusCode != 0 && !a.logger.Enabled(ctx, slog.LevelDebug) {
+				for _, line := range logs {
+					a.logger.Error("Container log", "id", shortID(containerID), "log", string(line))
+				}
+			}
+
+			if shouldRestartFunc == nil {
+				exit(nil)
+				return false
+			}
+
+			shouldRestart, err := shouldRestartFunc(resp.StatusCode, restartCount, logs, time.Since(startedWaitingAt))
+			if shouldRestart && err != nil {
+				panic(fmt.Errorf("shouldRestart must return nil error if restarting, but returned: %w", err))
+			}
+			if !shouldRestart {
+				exit(err)
+				return false
+			}
+
+			a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "true", "exit_code", resp.StatusCode, "restart_count", restartCount)
+			timer.Reset(restartInterval)
+
+			containerRespC <- containerWaitResponse{
+				WaitResponse: resp,
+				restarting:   true,
+				restartCount: restartCount,
+			}
+			// Don't return yet. Wait for the timer to fire.
+		case <-timer.C:
+			a.logger.Info("Container restarting", "id", shortID(containerID), "restart_count", restartCount)
+			if err := a.apiClient.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil {
+				containerErrC <- fmt.Errorf("container start: %w", err)
+				return false
+			}
+			a.logger.Info("Restarted container", "id", shortID(containerID))
+			return true
+		case line := <-logsC:
+			a.logger.Debug("Container log", "id", shortID(containerID), "log", string(line))
+			// TODO: limit max stored lines
+			logs = append(logs, line)
+		case err := <-errC:
+			containerErrC <- err
+			return false
+		case <-ctx.Done():
+			// This is probably because the container was stopped.
+			containerRespC <- containerWaitResponse{WaitResponse: container.WaitResponse{}, restarting: false}
+			return false
+		}
+	}
+}
+
 // Close closes the client, stopping and removing all running containers.
 func (a *Client) Close() error {
 	a.cancel()
diff --git a/internal/container/container_test.go b/internal/container/container_test.go
index 3e54622..344fe79 100644
--- a/internal/container/container_test.go
+++ b/internal/container/container_test.go
@@ -73,6 +73,10 @@ func TestClientRunContainer(t *testing.T) {
 		EXPECT().
 		Events(mock.Anything, events.ListOptions{Filters: filters.NewArgs(filters.Arg("container", "123"), filters.Arg("type", "container"))}).
 		Return(eventsC, eventsErrC)
+	dockerClient.
+		EXPECT().
+		ContainerLogs(mock.Anything, "123", mock.Anything).
+		Return(io.NopCloser(bytes.NewReader(nil)), nil)
 
 	containerClient, err := container.NewClient(t.Context(), &dockerClient, logger)
 	require.NoError(t, err)
@@ -82,7 +86,8 @@ func TestClientRunContainer(t *testing.T) {
 		ChanSize:        1,
 		ContainerConfig: &dockercontainer.Config{Image: "alpine"},
 		HostConfig:      &dockercontainer.HostConfig{},
-		CopyFileConfigs: []container.CopyFileConfig{
+		Logs:            container.LogConfig{Stdout: true},
+		CopyFiles: []container.CopyFileConfig{
 			{
 				Path:    "/hello",
 				Payload: bytes.NewReader([]byte("world")),
@@ -176,6 +181,10 @@ func TestClientRunContainerWithRestart(t *testing.T) {
 		EXPECT().
 		ContainerStart(mock.Anything, "123", dockercontainer.StartOptions{}). // restart
 		Return(nil)
+	dockerClient.
+		EXPECT().
+		ContainerLogs(mock.Anything, "123", mock.Anything).
+		Return(io.NopCloser(bytes.NewReader(nil)), nil)
 
 	containerClient, err := container.NewClient(t.Context(), &dockerClient, logger)
 	require.NoError(t, err)
@@ -185,7 +194,8 @@ func TestClientRunContainerWithRestart(t *testing.T) {
 		ChanSize:        1,
 		ContainerConfig: &dockercontainer.Config{Image: "alpine"},
 		HostConfig:      &dockercontainer.HostConfig{},
-		ShouldRestart: func(_ int64, restartCount int, _ time.Duration) (bool, error) {
+		Logs:          container.LogConfig{Stdout: true},
+		ShouldRestart: func(_ int64, restartCount int, _ [][]byte, _ time.Duration) (bool, error) {
 			if restartCount == 0 {
 				return true, nil
 			}
diff --git a/internal/container/logs.go b/internal/container/logs.go
new file mode 100644
index 0000000..df792de
--- /dev/null
+++ b/internal/container/logs.go
@@ -0,0 +1,53 @@
+package container
+
+import (
+	"bufio"
+	"context"
+	"log/slog"
+
+	typescontainer "github.com/docker/docker/api/types/container"
+)
+
+func getLogs(
+	ctx context.Context,
+	containerID string,
+	apiClient DockerClient,
+	cfg LogConfig,
+	ch chan<- []byte,
+	logger *slog.Logger,
+) {
+	logsC, err := apiClient.ContainerLogs(
+		ctx,
+		containerID,
+		typescontainer.LogsOptions{
+			ShowStdout: cfg.Stdout,
+			ShowStderr: cfg.Stderr,
+			Follow:     true,
+		},
+	)
+	if err != nil {
+		logger.Error("Error getting container logs", "err", err, "id", shortID(containerID))
+		return
+	}
+	defer logsC.Close()
+
+	scanner := bufio.NewScanner(logsC)
+	for scanner.Scan() {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			// Docker logs are prefixed with an 8 byte prefix.
+			// See client.ContainerLogs for more details.
+			// We could use
+			// [StdCopy](https://pkg.go.dev/github.com/docker/docker/pkg/stdcopy#StdCopy)
+			// but for our purposes it's enough to just slice it off.
+			const prefixLen = 8
+			line := scanner.Bytes()
+			if len(line) <= prefixLen {
+				continue
+			}
+			ch <- append([]byte(nil), line[prefixLen:]...) // copy: scanner.Bytes() may be overwritten by the next Scan
+		}
+	}
+}
diff --git a/internal/container/logs_test.go b/internal/container/logs_test.go
new file mode 100644
index 0000000..4158894
--- /dev/null
+++ b/internal/container/logs_test.go
@@ -0,0 +1,45 @@
+package container
+
+import (
+	"io"
+	"strings"
+	"testing"
+
+	"git.netflux.io/rob/octoplex/internal/container/mocks"
+	"git.netflux.io/rob/octoplex/internal/testhelpers"
+	typescontainer "github.com/docker/docker/api/types/container"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+)
+
+func TestGetLogs(t *testing.T) {
+	var dockerClient mocks.DockerClient
+	dockerClient.
+		EXPECT().
+		ContainerLogs(mock.Anything, "123", typescontainer.LogsOptions{ShowStderr: true, Follow: true}).
+		Return(io.NopCloser(strings.NewReader("********line 1\n********line 2\n********line 3\n")), nil)
+
+	ch := make(chan []byte)
+
+	go func() {
+		defer close(ch)
+
+		getLogs(
+			t.Context(),
+			"123",
+			&dockerClient,
+			LogConfig{Stderr: true},
+			ch,
+			testhelpers.NewTestLogger(t),
+		)
+	}()
+
+	// Ensure we get the expected lines, including stripping 8 bytes of Docker
+	// multiplexing prefix.
+	assert.Equal(t, "line 1", string(<-ch))
+	assert.Equal(t, "line 2", string(<-ch))
+	assert.Equal(t, "line 3", string(<-ch))
+
+	_, ok := <-ch
+	assert.False(t, ok)
+}
diff --git a/internal/container/mocks/dockerclient_mock.go b/internal/container/mocks/dockerclient_mock.go
index 6ecc419..f353259 100644
--- a/internal/container/mocks/dockerclient_mock.go
+++ b/internal/container/mocks/dockerclient_mock.go
@@ -197,6 +197,66 @@ func (_c *DockerClient_ContainerList_Call) RunAndReturn(run func(context.Context
 	return _c
 }
 
+// ContainerLogs provides a mock function with given fields: _a0, _a1, _a2
+func (_m *DockerClient) ContainerLogs(_a0 context.Context, _a1 string, _a2 typescontainer.LogsOptions) (io.ReadCloser, error) {
+	ret := _m.Called(_a0, _a1, _a2)
+
+	if len(ret) == 0 {
+		panic("no return value specified for ContainerLogs")
+	}
+
+	var r0 io.ReadCloser
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, string, typescontainer.LogsOptions) (io.ReadCloser, error)); ok {
+		return rf(_a0, _a1, _a2)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, string, typescontainer.LogsOptions) io.ReadCloser); ok {
+		r0 = rf(_a0, _a1, _a2)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(io.ReadCloser)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, string, typescontainer.LogsOptions) error); ok {
+		r1 = rf(_a0, _a1, _a2)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// DockerClient_ContainerLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ContainerLogs'
+type DockerClient_ContainerLogs_Call struct {
+	*mock.Call
+}
+
+// ContainerLogs is a helper method to define mock.On call
+//   - _a0 context.Context
+//   - _a1 string
+//   - _a2 typescontainer.LogsOptions
+func (_e *DockerClient_Expecter) ContainerLogs(_a0 interface{}, _a1 interface{}, _a2 interface{}) *DockerClient_ContainerLogs_Call {
+	return &DockerClient_ContainerLogs_Call{Call: _e.mock.On("ContainerLogs", _a0, _a1, _a2)}
+}
+
+func (_c *DockerClient_ContainerLogs_Call) Run(run func(_a0 context.Context, _a1 string, _a2 typescontainer.LogsOptions)) *DockerClient_ContainerLogs_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(string), args[2].(typescontainer.LogsOptions))
+	})
+	return _c
+}
+
+func (_c *DockerClient_ContainerLogs_Call) Return(_a0 io.ReadCloser, _a1 error) *DockerClient_ContainerLogs_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *DockerClient_ContainerLogs_Call) RunAndReturn(run func(context.Context, string, typescontainer.LogsOptions) (io.ReadCloser, error)) *DockerClient_ContainerLogs_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // ContainerRemove provides a mock function with given fields: _a0, _a1, _a2
 func (_m *DockerClient) ContainerRemove(_a0 context.Context, _a1 string, _a2 typescontainer.RemoveOptions) error {
 	ret := _m.Called(_a0, _a1, _a2)
diff --git a/internal/mediaserver/actor.go b/internal/mediaserver/actor.go
index 1cff252..17da238 100644
--- a/internal/mediaserver/actor.go
+++ b/internal/mediaserver/actor.go
@@ -186,7 +186,8 @@ func (a *Actor) Start(ctx context.Context) error {
 			PortBindings: portBindings,
 		},
 		NetworkCountConfig: container.NetworkCountConfig{Rx: "eth0", Tx: "eth1"},
-		CopyFileConfigs: []container.CopyFileConfig{
+		Logs:               container.LogConfig{Stdout: true},
+		CopyFiles: []container.CopyFileConfig{
 			{
 				Path:    "/mediamtx.yml",
 				Payload: bytes.NewReader(cfg),
diff --git a/internal/replicator/replicator.go b/internal/replicator/replicator.go
index d3ca087..90fbe56 100644
--- a/internal/replicator/replicator.go
+++ b/internal/replicator/replicator.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"log/slog"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -96,6 +97,7 @@ func (a *Actor) StartDestination(url string) {
 			ContainerConfig: &typescontainer.Config{
 				Image: imageNameFFMPEG,
 				Cmd: []string{
+					"-loglevel", "level+error",
 					"-i", a.sourceURL,
 					"-c", "copy",
 					"-f", "flv",
@@ -108,13 +110,13 @@ func (a *Actor) StartDestination(url string) {
 			},
 			HostConfig:         &typescontainer.HostConfig{NetworkMode: "default"},
 			NetworkCountConfig: container.NetworkCountConfig{Rx: "eth1", Tx: "eth0"},
-			ShouldRestart: func(_ int64, restartCount int, runningTime time.Duration) (bool, error) {
+			Logs:               container.LogConfig{Stderr: true},
+			ShouldRestart: func(_ int64, restartCount int, logs [][]byte, runningTime time.Duration) (bool, error) {
 				// Try to infer if the container failed to start.
 				//
-				// TODO: this is a bit hacky, we should check the container logs and
-				// include some details in the error message.
+				// For now, we just check if it was running for less than ten seconds.
 				if restartCount == 0 && runningTime < 10*time.Second {
-					return false, errors.New("container failed to start")
+					return false, containerStartErrFromLogs(logs)
 				}
 
 				// Otherwise, always restart, regardless of the exit code.
@@ -131,6 +133,32 @@ func (a *Actor) StartDestination(url string) {
 	}
 }
 
+// containerStartErrFromLogs builds an error from the first fatal log line in
+// the FFmpeg output, falling back to the first error line, then to a generic message.
+func containerStartErrFromLogs(logs [][]byte) error {
+	var fatalLog, errLog string
+
+	for _, logBytes := range logs {
+		log := string(logBytes)
+		if strings.HasPrefix(log, "[fatal]") {
+			fatalLog = log
+			break
+		}
+	}
+
+	if fatalLog == "" {
+		for _, logBytes := range logs {
+			log := string(logBytes)
+			if strings.HasPrefix(log, "[error]") {
+				errLog = log
+				break
+			}
+		}
+	}
+
+	return errors.New(cmp.Or(fatalLog, errLog, "container failed to start"))
+}
+
 // StopDestination stops a destination stream.
 func (a *Actor) StopDestination(url string) {
 	a.actorC <- func() {
diff --git a/internal/replicator/replicator_test.go b/internal/replicator/replicator_test.go
new file mode 100644
index 0000000..8c321f0
--- /dev/null
+++ b/internal/replicator/replicator_test.go
@@ -0,0 +1,51 @@
+package replicator
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestContainerStartErrFromLogs(t *testing.T) {
+	testCases := []struct {
+		name string
+		logs [][]byte
+		want error
+	}{
+		{
+			name: "no logs",
+			want: errors.New("container failed to start"),
+		},
+		{
+			name: "with only error logs",
+			logs: [][]byte{
+				[]byte("[error] this is an error log"),
+				[]byte("[error] this is another error log"),
+			},
+			want: errors.New("[error] this is an error log"),
+		},
+		{
+			name: "with only fatal logs",
+			logs: [][]byte{
+				[]byte("[fatal] this is a fatal log"),
+				[]byte("[fatal] this is another fatal log"),
+			},
+			want: errors.New("[fatal] this is a fatal log"),
+		},
+		{
+			name: "with error and fatal logs",
+			logs: [][]byte{
+				[]byte("[error] this is an error log"),
+				[]byte("[fatal] this is a fatal log"),
+			},
+			want: errors.New("[fatal] this is a fatal log"),
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			assert.Equal(t, tc.want, containerStartErrFromLogs(tc.logs))
+		})
+	}
+}
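
How a caller is expected to wire up the new API surface: the `Logs` field opts a container into log collection, and the collected lines (already stripped of the Docker multiplexing header) are handed to `ShouldRestart` when the container exits. The sketch below is a hypothetical example, not code from this diff: the package name, image, and error wording are placeholders, and it assumes only the `RunContainerParams` fields visible above.

```go
package example

import (
	"fmt"
	"time"

	"git.netflux.io/rob/octoplex/internal/container"
	typescontainer "github.com/docker/docker/api/types/container"
)

// logAwareParams builds RunContainerParams that collect stderr and use the
// captured lines to produce a useful startup error, mirroring the
// replicator's policy in this diff.
func logAwareParams() container.RunContainerParams {
	return container.RunContainerParams{
		ChanSize:        1,
		ContainerConfig: &typescontainer.Config{Image: "alpine"}, // placeholder image
		HostConfig:      &typescontainer.HostConfig{},
		Logs:            container.LogConfig{Stderr: true},
		ShouldRestart: func(_ int64, restartCount int, logs [][]byte, runningTime time.Duration) (bool, error) {
			// Treat a fast first exit as a startup failure: give up and
			// surface the last captured log line instead of a generic error.
			if restartCount == 0 && runningTime < 10*time.Second {
				if len(logs) > 0 {
					return false, fmt.Errorf("container failed to start: %s", logs[len(logs)-1])
				}
				return false, fmt.Errorf("container failed to start")
			}
			// Restarting, so the error must be nil (the control loop panics
			// on a non-nil error when restarting).
			return true, nil
		},
	}
}
```

Note that because `waitForContainerExit` resets its `logs` slice on each call, the callback only ever sees lines captured since the most recent (re)start.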