From a993806666d73a8a7ec92db94768d26c2deb4d90 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Thu, 6 Feb 2025 20:27:22 +0100 Subject: [PATCH] refactor(container): events handling --- container/container.go | 45 ++++---------------------------- container/events.go | 54 ++++++++++++++++++++++++++++++++++++++ container/events_test.go | 56 ++++++++++++++++++++++++++++++++++++++++ testhelpers/docker.go | 9 +++++++ 4 files changed, 124 insertions(+), 40 deletions(-) create mode 100644 container/events.go create mode 100644 container/events_test.go diff --git a/container/container.go b/container/container.go index 15d4b66..53196a5 100644 --- a/container/container.go +++ b/container/container.go @@ -3,7 +3,6 @@ package container import ( "cmp" "context" - "errors" "fmt" "io" "log/slog" @@ -90,7 +89,7 @@ type stats struct { } // getStats returns a channel that will receive container stats. The channel is -// never closed, but the spawned goroutine will exit when the context is +// never closed, but any spawned goroutines will exit when the context is // cancelled. func (a *Client) getStats(containerID string, networkCountConfig NetworkCountConfig) <-chan stats { ch := make(chan stats) @@ -101,48 +100,14 @@ func (a *Client) getStats(containerID string, networkCountConfig NetworkCountCon } // getEvents returns a channel that will receive container events. The channel is -// never closed, but the spawned goroutine will exit when the context is +// never closed, but any spawned goroutines will exit when the context is // cancelled. func (a *Client) getEvents(containerID string) <-chan events.Message { - sendC := make(chan events.Message) + ch := make(chan events.Message) - getEvents := func() (bool, error) { - recvC, errC := a.apiClient.Events(a.ctx, events.ListOptions{ - Filters: filters.NewArgs( - filters.Arg("container", containerID), - filters.Arg("type", "container"), - ), - }) + go handleEvents(a.ctx, containerID, a.apiClient, a.logger, ch) - for { - select { - case <-a.ctx.Done(): - return false, a.ctx.Err() - case evt := <-recvC: - sendC <- evt - case err := <-errC: - if a.ctx.Err() != nil || errors.Is(err, io.EOF) { - return false, err - } - - return true, err - } - } - } - - go func() { - for { - shouldRetry, err := getEvents() - if !shouldRetry { - break - } - - a.logger.Warn("Error receiving Docker events", "err", err, "id", shortID(containerID)) - time.Sleep(2 * time.Second) - } - }() - - return sendC + return ch } type NetworkCountConfig struct { diff --git a/container/events.go b/container/events.go new file mode 100644 index 0000000..1e2e5bf --- /dev/null +++ b/container/events.go @@ -0,0 +1,54 @@ +package container + +import ( + "context" + "errors" + "io" + "log/slog" + + "github.com/docker/docker/api/types/events" + "github.com/docker/docker/api/types/filters" +) + +func handleEvents( + ctx context.Context, + containerID string, + apiClient DockerClient, + logger *slog.Logger, + ch chan events.Message, +) { + getEvents := func() (bool, error) { + recvC, errC := apiClient.Events(ctx, events.ListOptions{ + Filters: filters.NewArgs( + filters.Arg("container", containerID), + filters.Arg("type", "container"), + ), + }) + + for { + select { + case <-ctx.Done(): + return false, ctx.Err() + case evt := <-recvC: + ch <- evt + case err := <-errC: + if ctx.Err() != nil || errors.Is(err, io.EOF) { + return false, err + } + + return true, err + } + } + } + + go func() { + for { + shouldRetry, err := getEvents() + if !shouldRetry { + break + } + + logger.Warn("Error receiving Docker events", "err", err, "id", shortID(containerID)) + } + }() +} diff --git a/container/events_test.go b/container/events_test.go new file mode 100644 index 0000000..fafcfcc --- /dev/null +++ b/container/events_test.go @@ -0,0 +1,56 @@ +package container + +import ( + "context" + "errors" + "io" + "testing" + + "git.netflux.io/rob/termstream/testhelpers" + "github.com/docker/docker/api/types/events" + "github.com/stretchr/testify/assert" +) + +func TestHandleEvents(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + var count int + eventsC1 := make(chan events.Message) + eventsC2 := make(chan events.Message) + eventsCFunc := func() <-chan events.Message { + if count > 0 { + return eventsC2 + } + + count++ + return eventsC1 + } + errC := make(chan error) + + containerID := "b905f51b47242090ae504c184c7bc84d6274511ef763c1847039dcaa00a3ad27" + dockerClient := testhelpers.MockDockerClient{EventsResponse: eventsCFunc, EventsErr: errC} + logger := testhelpers.NewNopLogger() + ch := make(chan events.Message) + + done := make(chan struct{}) + go func() { + defer close(done) + + handleEvents(ctx, containerID, &dockerClient, logger, ch) + }() + + go func() { + eventsC1 <- events.Message{Action: "start"} + eventsC1 <- events.Message{Action: "stop"} + errC <- errors.New("foo") + eventsC2 <- events.Message{Action: "continue"} + errC <- io.EOF + }() + + assert.Equal(t, events.Action("start"), (<-ch).Action) + assert.Equal(t, events.Action("stop"), (<-ch).Action) + assert.Equal(t, events.Action("continue"), (<-ch).Action) + + <-done +} diff --git a/testhelpers/docker.go b/testhelpers/docker.go index 977ecf0..1038d7b 100644 --- a/testhelpers/docker.go +++ b/testhelpers/docker.go @@ -5,16 +5,25 @@ import ( "io" "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/events" "github.com/docker/docker/client" ) // MockDockerClient is a mock docker client. +// +// TODO: migrate to mockery. type MockDockerClient struct { *client.Client ContainerStatsResponse io.ReadCloser + EventsResponse func() <-chan events.Message + EventsErr <-chan error } func (c *MockDockerClient) ContainerStats(context.Context, string, bool) (container.StatsResponseReader, error) { return container.StatsResponseReader{Body: c.ContainerStatsResponse}, nil } + +func (c *MockDockerClient) Events(context.Context, events.ListOptions) (<-chan events.Message, <-chan error) { + return c.EventsResponse(), c.EventsErr +}