feat: container logs

Rob Watson 2025-04-13 19:38:49 +02:00 committed by Rob Watson
parent 579dfeef22
commit df9724afa7
9 changed files with 392 additions and 83 deletions


@ -483,7 +483,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
func(t *assert.CollectT) {
contents := getContents()
assert.True(t, contentsIncludes(contents, "Streaming to Example server failed:"), "expected to see destination error")
assert.True(t, contentsIncludes(contents, "container failed to start"), "expected to see destination error")
assert.True(t, contentsIncludes(contents, "Error opening output files: I/O error"), "expected to see destination error")
},
time.Minute,
time.Second,


@ -38,6 +38,7 @@ type DockerClient interface {
ContainerCreate(context.Context, *container.Config, *container.HostConfig, *network.NetworkingConfig, *ocispec.Platform, string) (container.CreateResponse, error)
ContainerList(context.Context, container.ListOptions) ([]container.Summary, error)
ContainerLogs(context.Context, string, container.LogsOptions) (io.ReadCloser, error)
ContainerRemove(context.Context, string, container.RemoveOptions) error
ContainerStart(context.Context, string, container.StartOptions) error
ContainerStats(context.Context, string, bool) (container.StatsResponseReader, error)
@ -136,21 +137,48 @@ func (a *Client) getEvents(containerID string) <-chan events.Message {
return ch
}
// getLogs returns a channel (which is never closed) that will receive
// container logs, or nil if neither stdout nor stderr logs are requested.
func (a *Client) getLogs(ctx context.Context, containerID string, cfg LogConfig) <-chan []byte {
if !cfg.Stdout && !cfg.Stderr {
return nil
}
ch := make(chan []byte)
go getLogs(ctx, containerID, a.apiClient, cfg, ch, a.logger)
return ch
}
// NetworkCountConfig holds configuration for observing network traffic.
type NetworkCountConfig struct {
Rx string // the network name to count the Rx bytes
Tx string // the network name to count the Tx bytes
}
// CopyFileConfig holds configuration for a single file which should be copied
// into a container.
type CopyFileConfig struct {
Path string
Payload io.Reader
Mode int64
}
// LogConfig holds configuration for container logs.
type LogConfig struct {
Stdout, Stderr bool
}
// ShouldRestartFunc is a callback function that is called when a container
// exits. It receives the exit code, the restart count, the collected
// container logs and the running time, and should return true if the
// container is to be restarted. If not restarting, err may be non-nil.
type ShouldRestartFunc func(exitCode int64, restartCount int, runningTime time.Duration) (bool, error)
type ShouldRestartFunc func(
exitCode int64,
restartCount int,
containerLogs [][]byte,
runningTime time.Duration,
) (bool, error)
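As an illustration of the new signature (not part of this commit; the restart limit and error wording below are arbitrary), a callback might look like the following sketch. Note that err must be nil whenever true is returned, since the client panics otherwise:

package example

import (
	"errors"
	"fmt"
	"time"
)

// shouldRestart is a hypothetical ShouldRestartFunc: restart up to three
// times, then give up and surface the last collected log line, if any.
func shouldRestart(exitCode int64, restartCount int, containerLogs [][]byte, runningTime time.Duration) (bool, error) {
	if exitCode == 0 {
		return false, nil // clean exit: no restart, no error
	}
	if restartCount < 3 {
		return true, nil // restarting: err must be nil
	}
	if len(containerLogs) > 0 {
		return false, fmt.Errorf("container failed after %s: %s", runningTime, containerLogs[len(containerLogs)-1])
	}
	return false, errors.New("container failed")
}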
// defaultRestartInterval is the default interval between restarts.
// TODO: exponential backoff
@ -164,7 +192,8 @@ type RunContainerParams struct {
HostConfig *container.HostConfig
NetworkingConfig *network.NetworkingConfig
NetworkCountConfig NetworkCountConfig
CopyFileConfigs []CopyFileConfig
CopyFiles []CopyFileConfig
Logs LogConfig
ShouldRestart ShouldRestartFunc
RestartInterval time.Duration // defaults to 10 seconds
}
@ -227,7 +256,7 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (<
return
}
if err = a.copyFilesToContainer(ctx, createResp.ID, params.CopyFileConfigs); err != nil {
if err = a.copyFilesToContainer(ctx, createResp.ID, params.CopyFiles); err != nil {
sendError(fmt.Errorf("copy files to container: %w", err))
return
}
@ -252,6 +281,7 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (<
createResp.ID,
params.ContainerConfig.Image,
params.NetworkCountConfig,
params.Logs,
params.ShouldRestart,
cmp.Or(params.RestartInterval, defaultRestartInterval),
containerStateC,
@ -332,21 +362,6 @@ func (a *Client) pullImageIfNeeded(ctx context.Context, imageName string, contai
return nil
}
// runContainerLoop is the control loop for a single container. It returns only
// when the container exits.
func (a *Client) runContainerLoop(
ctx context.Context,
cancel context.CancelFunc,
containerID string,
imageName string,
networkCountConfig NetworkCountConfig,
shouldRestartFunc ShouldRestartFunc,
restartInterval time.Duration,
stateC chan<- domain.Container,
errC chan<- error,
) {
defer cancel()
type containerWaitResponse struct {
container.WaitResponse
@ -355,76 +370,31 @@ func (a *Client) runContainerLoop(
err error
}
// runContainerLoop is the control loop for a single container. It returns only
// when the container exits.
func (a *Client) runContainerLoop(
ctx context.Context,
cancel context.CancelFunc,
containerID string,
imageName string,
networkCountConfig NetworkCountConfig,
logConfig LogConfig,
shouldRestartFunc ShouldRestartFunc,
restartInterval time.Duration,
stateC chan<- domain.Container,
errC chan<- error,
) {
defer cancel()
containerRespC := make(chan containerWaitResponse)
containerErrC := make(chan error)
containerErrC := make(chan error, 1)
statsC := a.getStats(containerID, networkCountConfig)
eventsC := a.getEvents(containerID)
// ContainerWait only sends a result for the first non-running state, so we
// need to poll it repeatedly.
//
// The goroutine exits when a value is received on the error channel, or when
// the container exits and is not restarting, or when the context is cancelled.
go func() {
timer := time.NewTimer(restartInterval)
defer timer.Stop()
timer.Stop()
var restartCount int
for {
startedWaitingAt := time.Now()
respC, errC := a.apiClient.ContainerWait(ctx, containerID, container.WaitConditionNextExit)
select {
case resp := <-respC:
exit := func(err error) {
a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "false", "exit_code", resp.StatusCode, "restart_count", restartCount)
containerRespC <- containerWaitResponse{
WaitResponse: resp,
restarting: false,
restartCount: restartCount,
err: err,
}
}
if shouldRestartFunc == nil {
exit(nil)
return
}
shouldRestart, err := shouldRestartFunc(resp.StatusCode, restartCount, time.Since(startedWaitingAt))
if shouldRestart && err != nil {
panic(fmt.Errorf("shouldRestart must return nil error if restarting, but returned: %w", err))
}
if !shouldRestart {
exit(err)
return
}
a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "true", "exit_code", resp.StatusCode, "restart_count", restartCount)
timer.Reset(restartInterval)
containerRespC <- containerWaitResponse{
WaitResponse: resp,
restarting: true,
restartCount: restartCount,
}
case <-timer.C:
a.logger.Info("Container restarting", "id", shortID(containerID), "restart_count", restartCount)
for a.waitForContainerExit(ctx, containerID, containerRespC, containerErrC, logConfig, shouldRestartFunc, restartInterval, restartCount) {
restartCount++
if err := a.apiClient.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil {
containerErrC <- fmt.Errorf("container start: %w", err)
return
}
a.logger.Info("Restarted container", "id", shortID(containerID))
case err := <-errC:
containerErrC <- err
return
case <-ctx.Done():
// This is probably because the container was stopped.
containerRespC <- containerWaitResponse{WaitResponse: container.WaitResponse{}, restarting: false}
return
}
}
}()
@ -483,6 +453,7 @@ func (a *Client) runContainerLoop(
if evt.Action == "start" {
state.Status = domain.ContainerStatusRunning
sendState()
continue
}
@ -512,6 +483,96 @@ func (a *Client) runContainerLoop(
}
}
// waitForContainerExit blocks while it waits for a container to exit, and restarts
// it if configured to do so.
func (a *Client) waitForContainerExit(
ctx context.Context,
containerID string,
containerRespC chan<- containerWaitResponse,
containerErrC chan<- error,
logConfig LogConfig,
shouldRestartFunc ShouldRestartFunc,
restartInterval time.Duration,
restartCount int,
) bool {
var logs [][]byte
startedWaitingAt := time.Now()
respC, errC := a.apiClient.ContainerWait(ctx, containerID, container.WaitConditionNextExit)
logsC := a.getLogs(ctx, containerID, logConfig)
timer := time.NewTimer(restartInterval)
defer timer.Stop()
timer.Stop()
for {
select {
case resp := <-respC:
exit := func(err error) {
a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "false", "exit_code", resp.StatusCode, "restart_count", restartCount)
containerRespC <- containerWaitResponse{
WaitResponse: resp,
restarting: false,
restartCount: restartCount,
err: err,
}
}
// If the container exited with a non-zero status code, and debug
// logging is not enabled, log the container logs at ERROR level for
// debugging.
// TODO: parameterize
if resp.StatusCode != 0 && !a.logger.Enabled(ctx, slog.LevelDebug) {
for _, line := range logs {
a.logger.Error("Container log", "id", shortID(containerID), "log", string(line))
}
}
if shouldRestartFunc == nil {
exit(nil)
return false
}
shouldRestart, err := shouldRestartFunc(resp.StatusCode, restartCount, logs, time.Since(startedWaitingAt))
if shouldRestart && err != nil {
panic(fmt.Errorf("shouldRestart must return nil error if restarting, but returned: %w", err))
}
if !shouldRestart {
exit(err)
return false
}
a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "true", "exit_code", resp.StatusCode, "restart_count", restartCount)
timer.Reset(restartInterval)
containerRespC <- containerWaitResponse{
WaitResponse: resp,
restarting: true,
restartCount: restartCount,
}
// Don't return yet. Wait for the timer to fire.
case <-timer.C:
a.logger.Info("Container restarting", "id", shortID(containerID), "restart_count", restartCount)
if err := a.apiClient.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil {
containerErrC <- fmt.Errorf("container start: %w", err)
return false
}
a.logger.Info("Restarted container", "id", shortID(containerID))
return true
case line := <-logsC:
a.logger.Debug("Container log", "id", shortID(containerID), "log", string(line))
// TODO: limit max stored lines
logs = append(logs, line)
case err := <-errC:
containerErrC <- err
return false
case <-ctx.Done():
// This is probably because the container was stopped.
containerRespC <- containerWaitResponse{WaitResponse: container.WaitResponse{}, restarting: false}
return false
}
}
}
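A side note on the logsC case above: when LogConfig requests neither stream, getLogs returns a nil channel, and a receive from a nil channel blocks forever in Go, so that case is simply never selected. A minimal standalone illustration (not part of the commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	var logsC chan []byte // nil channel: receives block forever

	select {
	case line := <-logsC:
		fmt.Println("log:", string(line)) // never reached while logsC is nil
	case <-time.After(100 * time.Millisecond):
		fmt.Println("logs disabled; the logs case never fires")
	}
}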
// Close closes the client, stopping and removing all running containers.
func (a *Client) Close() error {
a.cancel()


@ -73,6 +73,10 @@ func TestClientRunContainer(t *testing.T) {
EXPECT().
Events(mock.Anything, events.ListOptions{Filters: filters.NewArgs(filters.Arg("container", "123"), filters.Arg("type", "container"))}).
Return(eventsC, eventsErrC)
dockerClient.
EXPECT().
ContainerLogs(mock.Anything, "123", mock.Anything).
Return(io.NopCloser(bytes.NewReader(nil)), nil)
containerClient, err := container.NewClient(t.Context(), &dockerClient, logger)
require.NoError(t, err)
@ -82,7 +86,8 @@ func TestClientRunContainer(t *testing.T) {
ChanSize: 1,
ContainerConfig: &dockercontainer.Config{Image: "alpine"},
HostConfig: &dockercontainer.HostConfig{},
CopyFileConfigs: []container.CopyFileConfig{
Logs: container.LogConfig{Stdout: true},
CopyFiles: []container.CopyFileConfig{
{
Path: "/hello",
Payload: bytes.NewReader([]byte("world")),
@ -176,6 +181,10 @@ func TestClientRunContainerWithRestart(t *testing.T) {
EXPECT().
ContainerStart(mock.Anything, "123", dockercontainer.StartOptions{}). // restart
Return(nil)
dockerClient.
EXPECT().
ContainerLogs(mock.Anything, "123", mock.Anything).
Return(io.NopCloser(bytes.NewReader(nil)), nil)
containerClient, err := container.NewClient(t.Context(), &dockerClient, logger)
require.NoError(t, err)
@ -185,7 +194,8 @@ func TestClientRunContainerWithRestart(t *testing.T) {
ChanSize: 1,
ContainerConfig: &dockercontainer.Config{Image: "alpine"},
HostConfig: &dockercontainer.HostConfig{},
ShouldRestart: func(_ int64, restartCount int, _ time.Duration) (bool, error) {
Logs: container.LogConfig{Stdout: true},
ShouldRestart: func(_ int64, restartCount int, _ [][]byte, _ time.Duration) (bool, error) {
if restartCount == 0 {
return true, nil
}


@ -0,0 +1,53 @@
package container
import (
"bufio"
"context"
"log/slog"
typescontainer "github.com/docker/docker/api/types/container"
)
func getLogs(
ctx context.Context,
containerID string,
apiClient DockerClient,
cfg LogConfig,
ch chan<- []byte,
logger *slog.Logger,
) {
logsC, err := apiClient.ContainerLogs(
ctx,
containerID,
typescontainer.LogsOptions{
ShowStdout: cfg.Stdout,
ShowStderr: cfg.Stderr,
Follow: true,
},
)
if err != nil {
logger.Error("Error getting container logs", "err", err, "id", shortID(containerID))
return
}
defer logsC.Close()
scanner := bufio.NewScanner(logsC)
for scanner.Scan() {
select {
case <-ctx.Done():
return
default:
// Each log line returned by the Docker API is preceded by an 8-byte
// multiplexing header. See client.ContainerLogs for more details.
// We could use
// [StdCopy](https://pkg.go.dev/github.com/docker/docker/pkg/stdcopy#StdCopy)
// to demultiplex the streams properly, but for our purposes it's enough to
// just slice the header off.
const prefixLen = 8
line := scanner.Bytes()
if len(line) <= prefixLen {
continue
}
ch <- line[prefixLen:]
}
}
}
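The comment above mentions StdCopy as an alternative to slicing off the header. For comparison only, a rough sketch of that approach follows; the function name demuxLogs and the overall shape are hypothetical, not part of this commit:

package example

import (
	"bufio"
	"context"
	"io"

	typescontainer "github.com/docker/docker/api/types/container"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
)

// demuxLogs streams container logs, using stdcopy.StdCopy to strip the
// multiplexing headers instead of slicing 8 bytes off each line.
func demuxLogs(ctx context.Context, apiClient *client.Client, containerID string, ch chan<- []byte) error {
	rc, err := apiClient.ContainerLogs(ctx, containerID, typescontainer.LogsOptions{
		ShowStdout: true,
		ShowStderr: true,
		Follow:     true,
	})
	if err != nil {
		return err
	}
	defer rc.Close()

	pr, pw := io.Pipe()
	go func() {
		// StdCopy demultiplexes the stream; here both stdout and stderr are
		// written to the same pipe.
		_, copyErr := stdcopy.StdCopy(pw, pw, rc)
		pw.CloseWithError(copyErr)
	}()

	scanner := bufio.NewScanner(pr)
	for scanner.Scan() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ch <- append([]byte(nil), scanner.Bytes()...): // copy: Scanner reuses its buffer
		}
	}
	return scanner.Err()
}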


@ -0,0 +1,45 @@
package container
import (
"io"
"strings"
"testing"
"git.netflux.io/rob/octoplex/internal/container/mocks"
"git.netflux.io/rob/octoplex/internal/testhelpers"
typescontainer "github.com/docker/docker/api/types/container"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestGetLogs(t *testing.T) {
var dockerClient mocks.DockerClient
dockerClient.
EXPECT().
ContainerLogs(mock.Anything, "123", typescontainer.LogsOptions{ShowStderr: true, Follow: true}).
Return(io.NopCloser(strings.NewReader("********line 1\n********line 2\n********line 3\n")), nil)
ch := make(chan []byte)
go func() {
defer close(ch)
getLogs(
t.Context(),
"123",
&dockerClient,
LogConfig{Stderr: true},
ch,
testhelpers.NewTestLogger(t),
)
}()
// Ensure we get the expected lines, with the 8-byte Docker multiplexing
// prefix stripped.
assert.Equal(t, "line 1", string(<-ch))
assert.Equal(t, "line 2", string(<-ch))
assert.Equal(t, "line 3", string(<-ch))
_, ok := <-ch
assert.False(t, ok)
}


@ -197,6 +197,66 @@ func (_c *DockerClient_ContainerList_Call) RunAndReturn(run func(context.Context
return _c
}
// ContainerLogs provides a mock function with given fields: _a0, _a1, _a2
func (_m *DockerClient) ContainerLogs(_a0 context.Context, _a1 string, _a2 typescontainer.LogsOptions) (io.ReadCloser, error) {
ret := _m.Called(_a0, _a1, _a2)
if len(ret) == 0 {
panic("no return value specified for ContainerLogs")
}
var r0 io.ReadCloser
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string, typescontainer.LogsOptions) (io.ReadCloser, error)); ok {
return rf(_a0, _a1, _a2)
}
if rf, ok := ret.Get(0).(func(context.Context, string, typescontainer.LogsOptions) io.ReadCloser); ok {
r0 = rf(_a0, _a1, _a2)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(io.ReadCloser)
}
}
if rf, ok := ret.Get(1).(func(context.Context, string, typescontainer.LogsOptions) error); ok {
r1 = rf(_a0, _a1, _a2)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DockerClient_ContainerLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ContainerLogs'
type DockerClient_ContainerLogs_Call struct {
*mock.Call
}
// ContainerLogs is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 string
// - _a2 typescontainer.LogsOptions
func (_e *DockerClient_Expecter) ContainerLogs(_a0 interface{}, _a1 interface{}, _a2 interface{}) *DockerClient_ContainerLogs_Call {
return &DockerClient_ContainerLogs_Call{Call: _e.mock.On("ContainerLogs", _a0, _a1, _a2)}
}
func (_c *DockerClient_ContainerLogs_Call) Run(run func(_a0 context.Context, _a1 string, _a2 typescontainer.LogsOptions)) *DockerClient_ContainerLogs_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].(typescontainer.LogsOptions))
})
return _c
}
func (_c *DockerClient_ContainerLogs_Call) Return(_a0 io.ReadCloser, _a1 error) *DockerClient_ContainerLogs_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *DockerClient_ContainerLogs_Call) RunAndReturn(run func(context.Context, string, typescontainer.LogsOptions) (io.ReadCloser, error)) *DockerClient_ContainerLogs_Call {
_c.Call.Return(run)
return _c
}
// ContainerRemove provides a mock function with given fields: _a0, _a1, _a2
func (_m *DockerClient) ContainerRemove(_a0 context.Context, _a1 string, _a2 typescontainer.RemoveOptions) error {
ret := _m.Called(_a0, _a1, _a2)


@ -186,7 +186,8 @@ func (a *Actor) Start(ctx context.Context) error {
PortBindings: portBindings,
},
NetworkCountConfig: container.NetworkCountConfig{Rx: "eth0", Tx: "eth1"},
CopyFileConfigs: []container.CopyFileConfig{
Logs: container.LogConfig{Stdout: true},
CopyFiles: []container.CopyFileConfig{
{
Path: "/mediamtx.yml",
Payload: bytes.NewReader(cfg),


@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"strconv"
"strings"
"sync"
"time"
@ -96,6 +97,7 @@ func (a *Actor) StartDestination(url string) {
ContainerConfig: &typescontainer.Config{
Image: imageNameFFMPEG,
Cmd: []string{
"-loglevel", "level+error",
"-i", a.sourceURL,
"-c", "copy",
"-f", "flv",
@ -108,13 +110,13 @@ func (a *Actor) StartDestination(url string) {
},
HostConfig: &typescontainer.HostConfig{NetworkMode: "default"},
NetworkCountConfig: container.NetworkCountConfig{Rx: "eth1", Tx: "eth0"},
ShouldRestart: func(_ int64, restartCount int, runningTime time.Duration) (bool, error) {
Logs: container.LogConfig{Stderr: true},
ShouldRestart: func(_ int64, restartCount int, logs [][]byte, runningTime time.Duration) (bool, error) {
// Try to infer if the container failed to start.
//
// TODO: this is still a bit hacky. For now, we check if the container was
// running for less than ten seconds, and if so build the error from the
// container logs.
if restartCount == 0 && runningTime < 10*time.Second {
return false, errors.New("container failed to start")
return false, containerStartErrFromLogs(logs)
}
// Otherwise, always restart, regardless of the exit code.
@ -131,6 +133,32 @@ func (a *Actor) StartDestination(url string) {
}
}
// containerStartErrFromLogs builds an error from the first fatal log line in
// the FFmpeg output, falling back to the first error log line, or to a
// generic message if neither is present.
func containerStartErrFromLogs(logs [][]byte) error {
var fatalLog, errLog string
for _, logBytes := range logs {
log := string(logBytes)
if strings.HasPrefix(log, "[fatal]") {
fatalLog = log
break
}
}
if fatalLog == "" {
for _, logBytes := range logs {
log := string(logBytes)
if strings.HasPrefix(log, "[error]") {
errLog = log
break
}
}
}
return errors.New(cmp.Or(fatalLog, errLog, "container failed to start"))
}
// StopDestination stops a destination stream.
func (a *Actor) StopDestination(url string) {
a.actorC <- func() {


@ -0,0 +1,51 @@
package replicator
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
)
func TestContainerStartErrFromLogs(t *testing.T) {
testCases := []struct {
name string
logs [][]byte
want error
}{
{
name: "no logs",
want: errors.New("container failed to start"),
},
{
name: "with only error logs",
logs: [][]byte{
[]byte("[error] this is an error log"),
[]byte("[error] this is another error log"),
},
want: errors.New("[error] this is an error log"),
},
{
name: "with only fatal logs",
logs: [][]byte{
[]byte("[fatal] this is a fatal log"),
[]byte("[fatal] this is another fatal log"),
},
want: errors.New("[fatal] this is a fatal log"),
},
{
name: "with error and fatal logs",
logs: [][]byte{
[]byte("[error] this is an error log"),
[]byte("[fatal] this is a fatal log"),
},
want: errors.New("[fatal] this is a fatal log"),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.want, containerStartErrFromLogs(tc.logs))
})
}
}