From df9724afa7db90933a8f85f67e1165a1db6f1545 Mon Sep 17 00:00:00 2001
From: Rob Watson <rob@netflux.io>
Date: Sun, 13 Apr 2025 19:38:49 +0200
Subject: [PATCH] feat: container logs
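
Stream container logs from the Docker API and use them to improve error
reporting and restart decisions.

- container: add LogConfig and stream log lines (with the 8-byte Docker
  multiplexing header stripped) over a channel; pass accumulated lines to
  ShouldRestartFunc and log them at ERROR level when a container exits
  non-zero and debug logging is disabled.
- replicator: run FFmpeg with "-loglevel level+error", capture stderr, and
  build the destination error from the first fatal or error log line
  instead of a generic "container failed to start".
- mediaserver: capture stdout logs from the mediamtx container.
- rename RunContainerParams.CopyFileConfigs to CopyFiles.

Callers opt in via RunContainerParams, for example:

    Logs: container.LogConfig{Stdout: true},
    ShouldRestart: func(exitCode int64, restartCount int, logs [][]byte, runningTime time.Duration) (bool, error) {
        // ...
    },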

---
 internal/app/integration_test.go              |   2 +-
 internal/container/container.go               | 211 +++++++++++-------
 internal/container/container_test.go          |  14 +-
 internal/container/logs.go                    |  53 +++++
 internal/container/logs_test.go               |  45 ++++
 internal/container/mocks/dockerclient_mock.go |  60 +++++
 internal/mediaserver/actor.go                 |   3 +-
 internal/replicator/replicator.go             |  36 ++-
 internal/replicator/replicator_test.go        |  51 +++++
 9 files changed, 392 insertions(+), 83 deletions(-)
 create mode 100644 internal/container/logs.go
 create mode 100644 internal/container/logs_test.go
 create mode 100644 internal/replicator/replicator_test.go

diff --git a/internal/app/integration_test.go b/internal/app/integration_test.go
index cdd887d..17f52b3 100644
--- a/internal/app/integration_test.go
+++ b/internal/app/integration_test.go
@@ -483,7 +483,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
 		func(t *assert.CollectT) {
 			contents := getContents()
 			assert.True(t, contentsIncludes(contents, "Streaming to Example server failed:"), "expected to see destination error")
-			assert.True(t, contentsIncludes(contents, "container failed to start"), "expected to see destination error")
+			assert.True(t, contentsIncludes(contents, "Error opening output files: I/O error"), "expected to see destination error")
 		},
 		time.Minute,
 		time.Second,
diff --git a/internal/container/container.go b/internal/container/container.go
index fb2b255..513deaa 100644
--- a/internal/container/container.go
+++ b/internal/container/container.go
@@ -38,6 +38,7 @@ type DockerClient interface {
 
 	ContainerCreate(context.Context, *container.Config, *container.HostConfig, *network.NetworkingConfig, *ocispec.Platform, string) (container.CreateResponse, error)
 	ContainerList(context.Context, container.ListOptions) ([]container.Summary, error)
+	ContainerLogs(context.Context, string, container.LogsOptions) (io.ReadCloser, error)
 	ContainerRemove(context.Context, string, container.RemoveOptions) error
 	ContainerStart(context.Context, string, container.StartOptions) error
 	ContainerStats(context.Context, string, bool) (container.StatsResponseReader, error)
@@ -136,21 +137,48 @@ func (a *Client) getEvents(containerID string) <-chan events.Message {
 	return ch
 }
 
+// getLogs returns a channel (which is never closed) that will receive
+// container log lines, or nil if neither stdout nor stderr logs are enabled.
+func (a *Client) getLogs(ctx context.Context, containerID string, cfg LogConfig) <-chan []byte {
+	if !cfg.Stdout && !cfg.Stderr {
+		return nil
+	}
+
+	ch := make(chan []byte)
+
+	go getLogs(ctx, containerID, a.apiClient, cfg, ch, a.logger)
+
+	return ch
+}
+
+// NetworkCountConfig holds configuration for observing network traffic.
 type NetworkCountConfig struct {
 	Rx string // the network name to count the Rx bytes
 	Tx string // the network name to count the Tx bytes
 }
 
+// CopyFileConfig holds configuration for a single file which should be copied
+// into a container.
 type CopyFileConfig struct {
 	Path    string
 	Payload io.Reader
 	Mode    int64
 }
 
+// LogConfig holds configuration for container logs.
+type LogConfig struct {
+	Stdout, Stderr bool
+}
+
 // ShouldRestartFunc is a callback function that is called when a container
 // exits. It should return true if the container is to be restarted. If not
 // restarting, err may be non-nil.
-type ShouldRestartFunc func(exitCode int64, restartCount int, runningTime time.Duration) (bool, error)
+type ShouldRestartFunc func(
+	exitCode int64,
+	restartCount int,
+	containerLogs [][]byte,
+	runningTime time.Duration,
+) (bool, error)
 
 // defaultRestartInterval is the default interval between restarts.
 // TODO: exponential backoff
@@ -164,7 +192,8 @@ type RunContainerParams struct {
 	HostConfig         *container.HostConfig
 	NetworkingConfig   *network.NetworkingConfig
 	NetworkCountConfig NetworkCountConfig
-	CopyFileConfigs    []CopyFileConfig
+	CopyFiles          []CopyFileConfig
+	Logs               LogConfig
 	ShouldRestart      ShouldRestartFunc
 	RestartInterval    time.Duration // defaults to 10 seconds
 }
@@ -227,7 +256,7 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (<
 			return
 		}
 
-		if err = a.copyFilesToContainer(ctx, createResp.ID, params.CopyFileConfigs); err != nil {
+		if err = a.copyFilesToContainer(ctx, createResp.ID, params.CopyFiles); err != nil {
 			sendError(fmt.Errorf("copy files to container: %w", err))
 			return
 		}
@@ -252,6 +281,7 @@ func (a *Client) RunContainer(ctx context.Context, params RunContainerParams) (<
 			createResp.ID,
 			params.ContainerConfig.Image,
 			params.NetworkCountConfig,
+			params.Logs,
 			params.ShouldRestart,
 			cmp.Or(params.RestartInterval, defaultRestartInterval),
 			containerStateC,
@@ -332,6 +362,14 @@ func (a *Client) pullImageIfNeeded(ctx context.Context, imageName string, contai
 	return nil
 }
 
+type containerWaitResponse struct {
+	container.WaitResponse
+
+	restarting   bool
+	restartCount int
+	err          error
+}
+
 // runContainerLoop is the control loop for a single container. It returns only
 // when the container exits.
 func (a *Client) runContainerLoop(
@@ -340,6 +378,7 @@ func (a *Client) runContainerLoop(
 	containerID string,
 	imageName string,
 	networkCountConfig NetworkCountConfig,
+	logConfig LogConfig,
 	shouldRestartFunc ShouldRestartFunc,
 	restartInterval time.Duration,
 	stateC chan<- domain.Container,
@@ -347,84 +386,15 @@ func (a *Client) runContainerLoop(
 ) {
 	defer cancel()
 
-	type containerWaitResponse struct {
-		container.WaitResponse
-
-		restarting   bool
-		restartCount int
-		err          error
-	}
-
 	containerRespC := make(chan containerWaitResponse)
-	containerErrC := make(chan error)
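+	// Buffered so that a late error send from waitForContainerExit cannot
+	// block its goroutine if the main loop has already returned.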
+	containerErrC := make(chan error, 1)
 	statsC := a.getStats(containerID, networkCountConfig)
 	eventsC := a.getEvents(containerID)
 
-	// ContainerWait only sends a result for the first non-running state, so we
-	// need to poll it repeatedly.
-	//
-	// The goroutine exits when a value is received on the error channel, or when
-	// the container exits and is not restarting, or when the context is cancelled.
 	go func() {
-		timer := time.NewTimer(restartInterval)
-		defer timer.Stop()
-		timer.Stop()
-
 		var restartCount int
-
-		for {
-			startedWaitingAt := time.Now()
-			respC, errC := a.apiClient.ContainerWait(ctx, containerID, container.WaitConditionNextExit)
-			select {
-			case resp := <-respC:
-				exit := func(err error) {
-					a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "false", "exit_code", resp.StatusCode, "restart_count", restartCount)
-					containerRespC <- containerWaitResponse{
-						WaitResponse: resp,
-						restarting:   false,
-						restartCount: restartCount,
-						err:          err,
-					}
-				}
-
-				if shouldRestartFunc == nil {
-					exit(nil)
-					return
-				}
-
-				shouldRestart, err := shouldRestartFunc(resp.StatusCode, restartCount, time.Since(startedWaitingAt))
-				if shouldRestart && err != nil {
-					panic(fmt.Errorf("shouldRestart must return nil error if restarting, but returned: %w", err))
-				}
-				if !shouldRestart {
-					exit(err)
-					return
-				}
-
-				a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "true", "exit_code", resp.StatusCode, "restart_count", restartCount)
-				timer.Reset(restartInterval)
-
-				containerRespC <- containerWaitResponse{
-					WaitResponse: resp,
-					restarting:   true,
-					restartCount: restartCount,
-				}
-			case <-timer.C:
-				a.logger.Info("Container restarting", "id", shortID(containerID), "restart_count", restartCount)
-				restartCount++
-				if err := a.apiClient.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil {
-					containerErrC <- fmt.Errorf("container start: %w", err)
-					return
-				}
-				a.logger.Info("Restarted container", "id", shortID(containerID))
-			case err := <-errC:
-				containerErrC <- err
-				return
-			case <-ctx.Done():
-				// This is probably because the container was stopped.
-				containerRespC <- containerWaitResponse{WaitResponse: container.WaitResponse{}, restarting: false}
-				return
-			}
+		for a.waitForContainerExit(ctx, containerID, containerRespC, containerErrC, logConfig, shouldRestartFunc, restartInterval, restartCount) {
+			restartCount++
 		}
 	}()
 
@@ -483,6 +453,7 @@ func (a *Client) runContainerLoop(
 			if evt.Action == "start" {
 				state.Status = domain.ContainerStatusRunning
 				sendState()
+
 				continue
 			}
 
@@ -512,6 +483,96 @@ func (a *Client) runContainerLoop(
 	}
 }
 
+// waitForContainerExit blocks while it waits for a container to exit, and
+// restarts it if configured to do so. It returns true if the container was
+// restarted and the caller should wait for the next exit.
+func (a *Client) waitForContainerExit(
+	ctx context.Context,
+	containerID string,
+	containerRespC chan<- containerWaitResponse,
+	containerErrC chan<- error,
+	logConfig LogConfig,
+	shouldRestartFunc ShouldRestartFunc,
+	restartInterval time.Duration,
+	restartCount int,
+) bool {
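+	// logs accumulates log lines received while waiting for the container to
+	// exit; they are passed to shouldRestartFunc and surfaced at ERROR level
+	// if the container exits with a non-zero status.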
+	var logs [][]byte
+	startedWaitingAt := time.Now()
+	respC, errC := a.apiClient.ContainerWait(ctx, containerID, container.WaitConditionNextExit)
+	logsC := a.getLogs(ctx, containerID, logConfig)
+
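+	// Create the restart timer in a stopped state. It is only started once
+	// the container has exited and a restart has been scheduled.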
+	timer := time.NewTimer(restartInterval)
+	defer timer.Stop()
+	timer.Stop()
+
+	for {
+		select {
+		case resp := <-respC:
+			exit := func(err error) {
+				a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "false", "exit_code", resp.StatusCode, "restart_count", restartCount)
+				containerRespC <- containerWaitResponse{
+					WaitResponse: resp,
+					restarting:   false,
+					restartCount: restartCount,
+					err:          err,
+				}
+			}
+
+			// If the container exited with a non-zero status code and debug
+			// logging is not enabled, surface the collected container logs at
+			// ERROR level so they are not lost.
+			// TODO: parameterize
+			if resp.StatusCode != 0 && !a.logger.Enabled(ctx, slog.LevelDebug) {
+				for _, line := range logs {
+					a.logger.Error("Container log", "id", shortID(containerID), "log", string(line))
+				}
+			}
+
+			if shouldRestartFunc == nil {
+				exit(nil)
+				return false
+			}
+
+			shouldRestart, err := shouldRestartFunc(resp.StatusCode, restartCount, logs, time.Since(startedWaitingAt))
+			if shouldRestart && err != nil {
+				panic(fmt.Errorf("shouldRestart must return nil error if restarting, but returned: %w", err))
+			}
+			if !shouldRestart {
+				exit(err)
+				return false
+			}
+
+			a.logger.Info("Container exited", "id", shortID(containerID), "should_restart", "true", "exit_code", resp.StatusCode, "restart_count", restartCount)
+			timer.Reset(restartInterval)
+
+			containerRespC <- containerWaitResponse{
+				WaitResponse: resp,
+				restarting:   true,
+				restartCount: restartCount,
+			}
+			// Don't return yet. Wait for the timer to fire.
+		case <-timer.C:
+			a.logger.Info("Container restarting", "id", shortID(containerID), "restart_count", restartCount)
+			if err := a.apiClient.ContainerStart(ctx, containerID, container.StartOptions{}); err != nil {
+				containerErrC <- fmt.Errorf("container start: %w", err)
+				return false
+			}
+			a.logger.Info("Restarted container", "id", shortID(containerID))
+			return true
+		case line := <-logsC:
+			a.logger.Debug("Container log", "id", shortID(containerID), "log", string(line))
+			// TODO: limit max stored lines
+			logs = append(logs, line)
+		case err := <-errC:
+			containerErrC <- err
+			return false
+		case <-ctx.Done():
+			// This is probably because the container was stopped.
+			containerRespC <- containerWaitResponse{WaitResponse: container.WaitResponse{}, restarting: false}
+			return false
+		}
+	}
+}
+
 // Close closes the client, stopping and removing all running containers.
 func (a *Client) Close() error {
 	a.cancel()
diff --git a/internal/container/container_test.go b/internal/container/container_test.go
index 3e54622..344fe79 100644
--- a/internal/container/container_test.go
+++ b/internal/container/container_test.go
@@ -73,6 +73,10 @@ func TestClientRunContainer(t *testing.T) {
 		EXPECT().
 		Events(mock.Anything, events.ListOptions{Filters: filters.NewArgs(filters.Arg("container", "123"), filters.Arg("type", "container"))}).
 		Return(eventsC, eventsErrC)
+	dockerClient.
+		EXPECT().
+		ContainerLogs(mock.Anything, "123", mock.Anything).
+		Return(io.NopCloser(bytes.NewReader(nil)), nil)
 
 	containerClient, err := container.NewClient(t.Context(), &dockerClient, logger)
 	require.NoError(t, err)
@@ -82,7 +86,8 @@ func TestClientRunContainer(t *testing.T) {
 		ChanSize:        1,
 		ContainerConfig: &dockercontainer.Config{Image: "alpine"},
 		HostConfig:      &dockercontainer.HostConfig{},
-		CopyFileConfigs: []container.CopyFileConfig{
+		Logs:            container.LogConfig{Stdout: true},
+		CopyFiles: []container.CopyFileConfig{
 			{
 				Path:    "/hello",
 				Payload: bytes.NewReader([]byte("world")),
@@ -176,6 +181,10 @@ func TestClientRunContainerWithRestart(t *testing.T) {
 		EXPECT().
 		ContainerStart(mock.Anything, "123", dockercontainer.StartOptions{}). // restart
 		Return(nil)
+	dockerClient.
+		EXPECT().
+		ContainerLogs(mock.Anything, "123", mock.Anything).
+		Return(io.NopCloser(bytes.NewReader(nil)), nil)
 
 	containerClient, err := container.NewClient(t.Context(), &dockerClient, logger)
 	require.NoError(t, err)
@@ -185,7 +194,8 @@ func TestClientRunContainerWithRestart(t *testing.T) {
 		ChanSize:        1,
 		ContainerConfig: &dockercontainer.Config{Image: "alpine"},
 		HostConfig:      &dockercontainer.HostConfig{},
-		ShouldRestart: func(_ int64, restartCount int, _ time.Duration) (bool, error) {
+		Logs:            container.LogConfig{Stdout: true},
+		ShouldRestart: func(_ int64, restartCount int, _ [][]byte, _ time.Duration) (bool, error) {
 			if restartCount == 0 {
 				return true, nil
 			}
diff --git a/internal/container/logs.go b/internal/container/logs.go
new file mode 100644
index 0000000..df792de
--- /dev/null
+++ b/internal/container/logs.go
@@ -0,0 +1,53 @@
+package container
+
+import (
+	"bufio"
+	"context"
+	"log/slog"
+
+	typescontainer "github.com/docker/docker/api/types/container"
+)
+
+func getLogs(
+	ctx context.Context,
+	containerID string,
+	apiClient DockerClient,
+	cfg LogConfig,
+	ch chan<- []byte,
+	logger *slog.Logger,
+) {
+	logsC, err := apiClient.ContainerLogs(
+		ctx,
+		containerID,
+		typescontainer.LogsOptions{
+			ShowStdout: cfg.Stdout,
+			ShowStderr: cfg.Stderr,
+			Follow:     true,
+		},
+	)
+	if err != nil {
+		logger.Error("Error getting container logs", "err", err, "id", shortID(containerID))
+		return
+	}
+	defer logsC.Close()
+
+	scanner := bufio.NewScanner(logsC)
+	for scanner.Scan() {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			// Docker multiplexes stdout/stderr into frames with an 8-byte header.
+			// See client.ContainerLogs for more details.
+			// We could use
+			// [StdCopy](https://pkg.go.dev/github.com/docker/docker/pkg/stdcopy#StdCopy)
+			// but for our purposes it's enough to slice the header off each line.
+			const prefixLen = 8
+			line := scanner.Bytes()
+			if len(line) <= prefixLen {
+				continue
+			}
+			// Copy the line: the scanner reuses its buffer on the next Scan.
+			ch <- append([]byte(nil), line[prefixLen:]...)
+		}
+	}
+
+	if err := scanner.Err(); err != nil {
+		logger.Error("Error reading container logs", "err", err, "id", shortID(containerID))
+	}
+}
diff --git a/internal/container/logs_test.go b/internal/container/logs_test.go
new file mode 100644
index 0000000..4158894
--- /dev/null
+++ b/internal/container/logs_test.go
@@ -0,0 +1,45 @@
+package container
+
+import (
+	"io"
+	"strings"
+	"testing"
+
+	"git.netflux.io/rob/octoplex/internal/container/mocks"
+	"git.netflux.io/rob/octoplex/internal/testhelpers"
+	typescontainer "github.com/docker/docker/api/types/container"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+)
+
+func TestGetLogs(t *testing.T) {
+	var dockerClient mocks.DockerClient
+	dockerClient.
+		EXPECT().
+		ContainerLogs(mock.Anything, "123", typescontainer.LogsOptions{ShowStderr: true, Follow: true}).
+		Return(io.NopCloser(strings.NewReader("********line 1\n********line 2\n********line 3\n")), nil)
+
+	ch := make(chan []byte)
+
+	go func() {
+		defer close(ch)
+
+		getLogs(
+			t.Context(),
+			"123",
+			&dockerClient,
+			LogConfig{Stderr: true},
+			ch,
+			testhelpers.NewTestLogger(t),
+		)
+	}()
+
+	// Ensure we get the expected lines, with the 8-byte Docker multiplexing
+	// header stripped.
+	assert.Equal(t, "line 1", string(<-ch))
+	assert.Equal(t, "line 2", string(<-ch))
+	assert.Equal(t, "line 3", string(<-ch))
+
+	_, ok := <-ch
+	assert.False(t, ok)
+}
diff --git a/internal/container/mocks/dockerclient_mock.go b/internal/container/mocks/dockerclient_mock.go
index 6ecc419..f353259 100644
--- a/internal/container/mocks/dockerclient_mock.go
+++ b/internal/container/mocks/dockerclient_mock.go
@@ -197,6 +197,66 @@ func (_c *DockerClient_ContainerList_Call) RunAndReturn(run func(context.Context
 	return _c
 }
 
+// ContainerLogs provides a mock function with given fields: _a0, _a1, _a2
+func (_m *DockerClient) ContainerLogs(_a0 context.Context, _a1 string, _a2 typescontainer.LogsOptions) (io.ReadCloser, error) {
+	ret := _m.Called(_a0, _a1, _a2)
+
+	if len(ret) == 0 {
+		panic("no return value specified for ContainerLogs")
+	}
+
+	var r0 io.ReadCloser
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, string, typescontainer.LogsOptions) (io.ReadCloser, error)); ok {
+		return rf(_a0, _a1, _a2)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, string, typescontainer.LogsOptions) io.ReadCloser); ok {
+		r0 = rf(_a0, _a1, _a2)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(io.ReadCloser)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, string, typescontainer.LogsOptions) error); ok {
+		r1 = rf(_a0, _a1, _a2)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// DockerClient_ContainerLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ContainerLogs'
+type DockerClient_ContainerLogs_Call struct {
+	*mock.Call
+}
+
+// ContainerLogs is a helper method to define mock.On call
+//   - _a0 context.Context
+//   - _a1 string
+//   - _a2 typescontainer.LogsOptions
+func (_e *DockerClient_Expecter) ContainerLogs(_a0 interface{}, _a1 interface{}, _a2 interface{}) *DockerClient_ContainerLogs_Call {
+	return &DockerClient_ContainerLogs_Call{Call: _e.mock.On("ContainerLogs", _a0, _a1, _a2)}
+}
+
+func (_c *DockerClient_ContainerLogs_Call) Run(run func(_a0 context.Context, _a1 string, _a2 typescontainer.LogsOptions)) *DockerClient_ContainerLogs_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(string), args[2].(typescontainer.LogsOptions))
+	})
+	return _c
+}
+
+func (_c *DockerClient_ContainerLogs_Call) Return(_a0 io.ReadCloser, _a1 error) *DockerClient_ContainerLogs_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *DockerClient_ContainerLogs_Call) RunAndReturn(run func(context.Context, string, typescontainer.LogsOptions) (io.ReadCloser, error)) *DockerClient_ContainerLogs_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // ContainerRemove provides a mock function with given fields: _a0, _a1, _a2
 func (_m *DockerClient) ContainerRemove(_a0 context.Context, _a1 string, _a2 typescontainer.RemoveOptions) error {
 	ret := _m.Called(_a0, _a1, _a2)
diff --git a/internal/mediaserver/actor.go b/internal/mediaserver/actor.go
index 1cff252..17da238 100644
--- a/internal/mediaserver/actor.go
+++ b/internal/mediaserver/actor.go
@@ -186,7 +186,8 @@ func (a *Actor) Start(ctx context.Context) error {
 				PortBindings: portBindings,
 			},
 			NetworkCountConfig: container.NetworkCountConfig{Rx: "eth0", Tx: "eth1"},
-			CopyFileConfigs: []container.CopyFileConfig{
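+			// The mediamtx server logs to stdout.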
+			Logs:               container.LogConfig{Stdout: true},
+			CopyFiles: []container.CopyFileConfig{
 				{
 					Path:    "/mediamtx.yml",
 					Payload: bytes.NewReader(cfg),
diff --git a/internal/replicator/replicator.go b/internal/replicator/replicator.go
index d3ca087..90fbe56 100644
--- a/internal/replicator/replicator.go
+++ b/internal/replicator/replicator.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"log/slog"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -96,6 +97,7 @@ func (a *Actor) StartDestination(url string) {
 			ContainerConfig: &typescontainer.Config{
 				Image: imageNameFFMPEG,
 				Cmd: []string{
+					"-loglevel", "level+error",
 					"-i", a.sourceURL,
 					"-c", "copy",
 					"-f", "flv",
@@ -108,13 +110,13 @@ func (a *Actor) StartDestination(url string) {
 			},
 			HostConfig:         &typescontainer.HostConfig{NetworkMode: "default"},
 			NetworkCountConfig: container.NetworkCountConfig{Rx: "eth1", Tx: "eth0"},
-			ShouldRestart: func(_ int64, restartCount int, runningTime time.Duration) (bool, error) {
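+			// FFmpeg writes its log output to stderr, so only that stream is captured.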
+			Logs:               container.LogConfig{Stderr: true},
+			ShouldRestart: func(_ int64, restartCount int, logs [][]byte, runningTime time.Duration) (bool, error) {
 				// Try to infer if the container failed to start.
 				//
-				// TODO: this is a bit hacky, we should check the container logs and
-				// include some details in the error message.
+				// For now, we just check if it was running for less than ten seconds.
 				if restartCount == 0 && runningTime < 10*time.Second {
-					return false, errors.New("container failed to start")
+					return false, containerStartErrFromLogs(logs)
 				}
 
 				// Otherwise, always restart, regardless of the exit code.
@@ -131,6 +133,32 @@ func (a *Actor) StartDestination(url string) {
 	}
 }
 
+// containerStartErrFromLogs builds an error from the first fatal log line in
+// the FFmpeg output, falling back to the first error log line, and finally to
+// a generic message.
+func containerStartErrFromLogs(logs [][]byte) error {
+	var fatalLog, errLog string
+
+	for _, logBytes := range logs {
+		log := string(logBytes)
+		if strings.HasPrefix(log, "[fatal]") {
+			fatalLog = log
+			break
+		}
+	}
+
+	if fatalLog == "" {
+		for _, logBytes := range logs {
+			log := string(logBytes)
+			if strings.HasPrefix(log, "[error]") {
+				errLog = log
+				break
+			}
+		}
+	}
+
+	return errors.New(cmp.Or(fatalLog, errLog, "container failed to start"))
+}
+
 // StopDestination stops a destination stream.
 func (a *Actor) StopDestination(url string) {
 	a.actorC <- func() {
diff --git a/internal/replicator/replicator_test.go b/internal/replicator/replicator_test.go
new file mode 100644
index 0000000..8c321f0
--- /dev/null
+++ b/internal/replicator/replicator_test.go
@@ -0,0 +1,51 @@
+package replicator
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestContainerStartErrFromLogs(t *testing.T) {
+	testCases := []struct {
+		name string
+		logs [][]byte
+		want error
+	}{
+		{
+			name: "no logs",
+			want: errors.New("container failed to start"),
+		},
+		{
+			name: "with only error logs",
+			logs: [][]byte{
+				[]byte("[error] this is an error log"),
+				[]byte("[error] this is another error log"),
+			},
+			want: errors.New("[error] this is an error log"),
+		},
+		{
+			name: "with only fatal logs",
+			logs: [][]byte{
+				[]byte("[fatal] this is a fatal log"),
+				[]byte("[fatal] this is another fatal log"),
+			},
+			want: errors.New("[fatal] this is a fatal log"),
+		},
+		{
+			name: "with error and fatal logs",
+			logs: [][]byte{
+				[]byte("[error] this is an error log"),
+				[]byte("[fatal] this is a fatal log"),
+			},
+			want: errors.New("[fatal] this is a fatal log"),
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			assert.Equal(t, tc.want, containerStartErrFromLogs(tc.logs))
+		})
+	}
+}