From 59a93350170a77c54d36402daf6aedbacad0ed7e Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Sat, 4 Jun 2022 02:27:20 +0200 Subject: [PATCH] watcher: handle closed channel --- logs/pod_watcher_test.go | 4 ++-- logs/watcher.go | 8 ++++++-- logs/watcher_test.go | 20 ++++++++++++++++++++ 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/logs/pod_watcher_test.go b/logs/pod_watcher_test.go index 059895a..e69c10b 100644 --- a/logs/pod_watcher_test.go +++ b/logs/pod_watcher_test.go @@ -89,8 +89,8 @@ func TestPodWatcherClosedWatcher(t *testing.T) { // WatchPods should wait for the logs and return cleanly. err := pw.WatchPods(ctx) - require.EqualError(t, err, context.DeadlineExceeded.Error()) - require.Equal(t, "[foo] it worked\n", buf.String()) + require.Equal(t, context.DeadlineExceeded, err) + assert.Equal(t, "[foo] it worked\n", buf.String()) } func TestPodWatcher(t *testing.T) { diff --git a/logs/watcher.go b/logs/watcher.go index f55c827..80c5058 100644 --- a/logs/watcher.go +++ b/logs/watcher.go @@ -96,10 +96,14 @@ func (w *Watcher) Watch(ctx context.Context) error { ticker := time.NewTicker(time.Second) defer ticker.Stop() + resultChan := deploymentsWatcher.ResultChan() for { select { - // TODO: check if closed. - case evt := <-deploymentsWatcher.ResultChan(): + case evt, ok := <-resultChan: + if !ok { + resultChan = nil + continue + } switch evt.Type { case watch.Added, watch.Modified: deployment := evt.Object.(*appsv1.Deployment) diff --git a/logs/watcher_test.go b/logs/watcher_test.go index 1471d5f..2427be6 100644 --- a/logs/watcher_test.go +++ b/logs/watcher_test.go @@ -64,6 +64,26 @@ func TestWatcherPodWatcherError(t *testing.T) { assert.EqualError(t, err, wantErr.Error()) } +func TestWatcherClosedChannel(t *testing.T) { + deploymentsWatcher := watch.NewFake() + clientset := testclient.NewSimpleClientset( + &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}}, + ) + clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) + + var buf bytes.Buffer + client := logs.KubernetesClient{Interface: clientset} + watcher := logs.NewWatcher("mydeployment", "mycontainer", true, client, nil, &buf) + go deploymentsWatcher.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + + err := watcher.Watch(ctx) + require.Equal(t, context.DeadlineExceeded, err) + assert.Equal(t, "", buf.String()) +} + func TestWatcherWithPodWatcher(t *testing.T) { ctx, cancel := context.WithCancel(context.Background())