From 082769380109a7cb7ba55639e2de1d6208718a30 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Fri, 3 Jun 2022 22:20:23 +0200 Subject: [PATCH] podWatcher: handle closed channel --- logs/pod_watcher.go | 7 ++++++- logs/pod_watcher_test.go | 33 +++++++++++++++++++++++++++++++++ logs/watcher.go | 1 + 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/logs/pod_watcher.go b/logs/pod_watcher.go index 87c9cd1..2fde12a 100644 --- a/logs/pod_watcher.go +++ b/logs/pod_watcher.go @@ -100,6 +100,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { // streamErrors is never closed. streamErrors := make(chan *streamError) + resultChan := watcher.ResultChan() for { select { @@ -143,7 +144,11 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { return fmt.Errorf("error streaming logs: %w", streamErr) } - case evt := <-watcher.ResultChan(): + case evt, ok := <-resultChan: + if !ok { + resultChan = nil + continue + } switch evt.Type { case watch.Added, watch.Modified: pod := evt.Object.(*corev1.Pod) diff --git a/logs/pod_watcher_test.go b/logs/pod_watcher_test.go index b4313c4..059895a 100644 --- a/logs/pod_watcher_test.go +++ b/logs/pod_watcher_test.go @@ -60,6 +60,39 @@ func (m *mockClientset) GetLogs(podName string, _ *corev1.PodLogOptions) *rest.R return fakeClient.Request() } +func TestPodWatcherClosedWatcher(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + podsWatcher := watch.NewFake() + + clientset := mockClientset{ + getLogsRespBody: "it worked", + getLogsStatusCode: http.StatusOK, + podsWatcher: podsWatcher, + } + clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil)) + + client := logs.KubernetesClient{Interface: &clientset} + selector := metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}} + + go func() { + defer podsWatcher.Stop() + podsWatcher.Add(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + }) + }() + + var buf bytes.Buffer + pw := logs.NewPodWatcher(client, "mycontainer", &selector, &buf) + + // WatchPods should wait for the logs and return cleanly. + err := pw.WatchPods(ctx) + require.EqualError(t, err, context.DeadlineExceeded.Error()) + require.Equal(t, "[foo] it worked\n", buf.String()) +} + func TestPodWatcher(t *testing.T) { testCases := []struct { name string diff --git a/logs/watcher.go b/logs/watcher.go index 73f8bb0..f55c827 100644 --- a/logs/watcher.go +++ b/logs/watcher.go @@ -98,6 +98,7 @@ func (w *Watcher) Watch(ctx context.Context) error { for { select { + // TODO: check if closed. case evt := <-deploymentsWatcher.ResultChan(): switch evt.Type { case watch.Added, watch.Modified: