From cbdcef097f25d6f3c220bad79f40d0fcf1580a85 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Fri, 8 Jul 2022 19:39:10 +0200 Subject: [PATCH] Fix race condition in tests In normal usage, the io.Writer passed to the watcher is typically `os.Stdout` and does not require locking because it is not being read from inside the same process. In tests however, the io.Writer was a *bytes.Buffer which is read concurrently from another goroutine, introducing a race condition. This fixes the issue in the test suite by introducing a wrapper around *bytes.Buffer which implements the required locking. --- logs/pod_watcher.go | 2 -- logs/pod_watcher_test.go | 6 +++--- logs/watcher.go | 4 ++++ logs/watcher_test.go | 31 ++++++++++++++++++++----------- 4 files changed, 27 insertions(+), 16 deletions(-) diff --git a/logs/pod_watcher.go b/logs/pod_watcher.go index 596b975..d82d827 100644 --- a/logs/pod_watcher.go +++ b/logs/pod_watcher.go @@ -20,8 +20,6 @@ import ( toolswatch "k8s.io/client-go/tools/watch" ) -const nl = "\n" - // logStream represents the logStream from an individual pod. type logStream struct { podName string diff --git a/logs/pod_watcher_test.go b/logs/pod_watcher_test.go index d96cff1..ebe06c3 100644 --- a/logs/pod_watcher_test.go +++ b/logs/pod_watcher_test.go @@ -194,8 +194,8 @@ func TestPodWatcher(t *testing.T) { client := logs.KubernetesClient{Typed: &clientset} selector := labels.SelectorFromSet(map[string]string{"foo": "bar"}) - var buf bytes.Buffer - pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &buf, discardLogger()) + var cb concurrentBuffer + pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &cb, discardLogger()) go func() { for _, pod := range tc.podEvents { @@ -215,7 +215,7 @@ func TestPodWatcher(t *testing.T) { } if tc.wantOut != nil { - lines := bufToLines(&buf) + lines := bufToLines(&cb) require.Len(t, lines, len(tc.wantOut)) assert.ElementsMatch(t, tc.wantOut, lines) } diff --git a/logs/watcher.go b/logs/watcher.go index c959caa..78d9534 100644 --- a/logs/watcher.go +++ b/logs/watcher.go @@ -63,6 +63,10 @@ type Watcher struct { } // NewWatcher creates a new Watcher. +// +// The resource defined by WatcherParams will be monitored and all associated +// logs will be written to `dst`. Note that this writer should provide its own +// locking mechanisms if needed. func NewWatcher(params WatcherParams, client KubernetesClient, podWatcherFunc PodWatcherFunc, dst io.Writer, logger *log.Logger) *Watcher { return &Watcher{ params: params, diff --git a/logs/watcher_test.go b/logs/watcher_test.go index 0f1177e..cbc9add 100644 --- a/logs/watcher_test.go +++ b/logs/watcher_test.go @@ -26,16 +26,26 @@ import ( k8stest "k8s.io/client-go/testing" ) -type concurrentWriter struct { - w io.Writer +// concurrentBuffer is a buffer which implements io.Writer and fmt.Stringer. It +// is useful in tests to allow the output to be read back from the destination +// buffer. +type concurrentBuffer struct { + bytes.Buffer mu sync.Mutex } -func (cw *concurrentWriter) Write(p []byte) (int, error) { - cw.mu.Lock() - defer cw.mu.Unlock() +func (cb *concurrentBuffer) Write(p []byte) (int, error) { + cb.mu.Lock() + defer cb.mu.Unlock() - return cw.w.Write(p) + return cb.Buffer.Write(p) +} + +func (cb *concurrentBuffer) String() string { + cb.mu.Lock() + defer cb.mu.Unlock() + + return cb.Buffer.String() } type mockPodWatcher struct{ err error } @@ -134,10 +144,9 @@ func TestWatcherWithPodWatcher(t *testing.T) { untypedClient.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) client := logs.KubernetesClient{Typed: typedClient, Untyped: untypedClient} - var buf bytes.Buffer - cw := concurrentWriter{w: &buf} + var cb concurrentBuffer params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"} - watcher := logs.NewWatcher(params, client, logs.NewPodWatcher, &cw, discardLogger()) + watcher := logs.NewWatcher(params, client, logs.NewPodWatcher, &cb, discardLogger()) go func() { deployment := buildDeployment(t, "mydeployment") @@ -159,12 +168,12 @@ func TestWatcherWithPodWatcher(t *testing.T) { err := watcher.Watch(context.Background()) require.NoError(t, err) - lines := bufToLines(&buf) + lines := bufToLines(&cb) require.Len(t, lines, 2) assert.ElementsMatch(t, []string{"[foo] fake logs", "[bar] fake logs"}, lines) } // bufToLines splits the output buffer removing newline characters. -func bufToLines(buf *bytes.Buffer) []string { +func bufToLines(buf *concurrentBuffer) []string { return strings.Split(strings.TrimSpace(buf.String()), "\n") }