refactor(container): events handling

This commit is contained in:
Rob Watson 2025-02-06 20:27:22 +01:00
parent a9d59387c4
commit a993806666
4 changed files with 124 additions and 40 deletions

View File

@ -3,7 +3,6 @@ package container
import ( import (
"cmp" "cmp"
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"log/slog" "log/slog"
@ -90,7 +89,7 @@ type stats struct {
} }
// getStats returns a channel that will receive container stats. The channel is // getStats returns a channel that will receive container stats. The channel is
// never closed, but the spawned goroutine will exit when the context is // never closed, but any spawned goroutines will exit when the context is
// cancelled. // cancelled.
func (a *Client) getStats(containerID string, networkCountConfig NetworkCountConfig) <-chan stats { func (a *Client) getStats(containerID string, networkCountConfig NetworkCountConfig) <-chan stats {
ch := make(chan stats) ch := make(chan stats)
@ -101,48 +100,14 @@ func (a *Client) getStats(containerID string, networkCountConfig NetworkCountCon
} }
// getEvents returns a channel that will receive container events. The channel is // getEvents returns a channel that will receive container events. The channel is
// never closed, but the spawned goroutine will exit when the context is // never closed, but any spawned goroutines will exit when the context is
// cancelled. // cancelled.
func (a *Client) getEvents(containerID string) <-chan events.Message { func (a *Client) getEvents(containerID string) <-chan events.Message {
sendC := make(chan events.Message) ch := make(chan events.Message)
getEvents := func() (bool, error) { go handleEvents(a.ctx, containerID, a.apiClient, a.logger, ch)
recvC, errC := a.apiClient.Events(a.ctx, events.ListOptions{
Filters: filters.NewArgs(
filters.Arg("container", containerID),
filters.Arg("type", "container"),
),
})
for { return ch
select {
case <-a.ctx.Done():
return false, a.ctx.Err()
case evt := <-recvC:
sendC <- evt
case err := <-errC:
if a.ctx.Err() != nil || errors.Is(err, io.EOF) {
return false, err
}
return true, err
}
}
}
go func() {
for {
shouldRetry, err := getEvents()
if !shouldRetry {
break
}
a.logger.Warn("Error receiving Docker events", "err", err, "id", shortID(containerID))
time.Sleep(2 * time.Second)
}
}()
return sendC
} }
type NetworkCountConfig struct { type NetworkCountConfig struct {

54
container/events.go Normal file
View File

@ -0,0 +1,54 @@
package container
import (
"context"
"errors"
"io"
"log/slog"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
)
func handleEvents(
ctx context.Context,
containerID string,
apiClient DockerClient,
logger *slog.Logger,
ch chan events.Message,
) {
getEvents := func() (bool, error) {
recvC, errC := apiClient.Events(ctx, events.ListOptions{
Filters: filters.NewArgs(
filters.Arg("container", containerID),
filters.Arg("type", "container"),
),
})
for {
select {
case <-ctx.Done():
return false, ctx.Err()
case evt := <-recvC:
ch <- evt
case err := <-errC:
if ctx.Err() != nil || errors.Is(err, io.EOF) {
return false, err
}
return true, err
}
}
}
go func() {
for {
shouldRetry, err := getEvents()
if !shouldRetry {
break
}
logger.Warn("Error receiving Docker events", "err", err, "id", shortID(containerID))
}
}()
}

56
container/events_test.go Normal file
View File

@ -0,0 +1,56 @@
package container
import (
"context"
"errors"
"io"
"testing"
"git.netflux.io/rob/termstream/testhelpers"
"github.com/docker/docker/api/types/events"
"github.com/stretchr/testify/assert"
)
func TestHandleEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
var count int
eventsC1 := make(chan events.Message)
eventsC2 := make(chan events.Message)
eventsCFunc := func() <-chan events.Message {
if count > 0 {
return eventsC2
}
count++
return eventsC1
}
errC := make(chan error)
containerID := "b905f51b47242090ae504c184c7bc84d6274511ef763c1847039dcaa00a3ad27"
dockerClient := testhelpers.MockDockerClient{EventsResponse: eventsCFunc, EventsErr: errC}
logger := testhelpers.NewNopLogger()
ch := make(chan events.Message)
done := make(chan struct{})
go func() {
defer close(done)
handleEvents(ctx, containerID, &dockerClient, logger, ch)
}()
go func() {
eventsC1 <- events.Message{Action: "start"}
eventsC1 <- events.Message{Action: "stop"}
errC <- errors.New("foo")
eventsC2 <- events.Message{Action: "continue"}
errC <- io.EOF
}()
assert.Equal(t, events.Action("start"), (<-ch).Action)
assert.Equal(t, events.Action("stop"), (<-ch).Action)
assert.Equal(t, events.Action("continue"), (<-ch).Action)
<-done
}

View File

@ -5,16 +5,25 @@ import (
"io" "io"
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/client" "github.com/docker/docker/client"
) )
// MockDockerClient is a mock docker client. // MockDockerClient is a mock docker client.
//
// TODO: migrate to mockery.
type MockDockerClient struct { type MockDockerClient struct {
*client.Client *client.Client
ContainerStatsResponse io.ReadCloser ContainerStatsResponse io.ReadCloser
EventsResponse func() <-chan events.Message
EventsErr <-chan error
} }
func (c *MockDockerClient) ContainerStats(context.Context, string, bool) (container.StatsResponseReader, error) { func (c *MockDockerClient) ContainerStats(context.Context, string, bool) (container.StatsResponseReader, error) {
return container.StatsResponseReader{Body: c.ContainerStatsResponse}, nil return container.StatsResponseReader{Body: c.ContainerStatsResponse}, nil
} }
func (c *MockDockerClient) Events(context.Context, events.ListOptions) (<-chan events.Message, <-chan error) {
return c.EventsResponse(), c.EventsErr
}