diff --git a/logs/pod_watcher.go b/logs/pod_watcher.go index 4d79c30..75fa1c5 100644 --- a/logs/pod_watcher.go +++ b/logs/pod_watcher.go @@ -16,6 +16,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + toolswatch "k8s.io/client-go/tools/watch" ) const nl = "\n" @@ -107,10 +109,11 @@ func (pw *PodWatcher) Close() { func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { podsClient := pw.client.Typed.CoreV1().Pods(pw.namespace) - // Returns a watcher which notifies of changes in the relevant pods. - // We don't defer Stop() on the returned value because the sender is the - // Kubernetes SDK, and that may introduce a send-on-closed-channel panic. - watcher, err := podsClient.Watch(ctx, metav1.ListOptions{LabelSelector: pw.labelSelector.String()}) + watchFunc := func(_ metav1.ListOptions) (watch.Interface, error) { + return podsClient.Watch(ctx, metav1.ListOptions{LabelSelector: pw.labelSelector.String()}) + } + + retryWatcher, err := toolswatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watchFunc}) if err != nil { return err } @@ -121,7 +124,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { logLines := make(chan string) logErrorChan := make(chan error) - resultChan := watcher.ResultChan() + resultChan := retryWatcher.ResultChan() for { select { case <-ctx.Done(): @@ -138,7 +141,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { // process the logs of any missing pods. for podName, pod := range pw.spec { if _, ok := pw.status[podName]; !ok { - pw.logger.Printf("[PodWatcher] adding pod, name = %s, container = %s, namespace = %s,", pod.Name, pw.container, pod.Namespace) + pw.logger.Printf("[PodWatcher] adding pod, name = %s, container = %s, namespace = %s", pod.Name, pw.container, pod.Namespace) stream := newStream(pod.Name, pw.container, pod.Namespace) pw.status[pod.Name] = stream diff --git a/logs/pod_watcher_test.go b/logs/pod_watcher_test.go index b81be7e..d96cff1 100644 --- a/logs/pod_watcher_test.go +++ b/logs/pod_watcher_test.go @@ -78,7 +78,7 @@ func TestPodWatcherClose(t *testing.T) { pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &buf, discardLogger()) go func() { - podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}) + podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}) time.Sleep(time.Second) // Close() should cause the watcher to return cleanly: pw.Close() @@ -120,14 +120,14 @@ func TestPodWatcherRemovedPod(t *testing.T) { go func() { defer pw.Close() - podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}) + podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}) time.Sleep(time.Millisecond * 500) w1.Write([]byte("should be logged\n")) - podsWatcher.Delete(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}) + podsWatcher.Delete(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}) time.Sleep(time.Millisecond * 500) w1.Write([]byte("should not be logged\n")) time.Sleep(time.Millisecond * 500) - podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}) + podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}) time.Sleep(time.Millisecond * 500) w2.Write([]byte("should be logged\n")) time.Sleep(time.Millisecond * 500) @@ -152,7 +152,7 @@ func TestPodWatcher(t *testing.T) { { name: "unexpected error getting logs", podEvents: []*corev1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, }, getLogsErr: errors.New("nope"), wantOut: nil, @@ -161,7 +161,7 @@ func TestPodWatcher(t *testing.T) { { name: "recoverable error getting logs", podEvents: []*corev1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, }, getLogsErr: &apierrors.StatusError{ErrStatus: metav1.Status{Message: "is waiting to start: ContainerCreating", Reason: metav1.StatusReasonBadRequest}}, wantOut: nil, @@ -169,9 +169,9 @@ func TestPodWatcher(t *testing.T) { { name: "success", podEvents: []*corev1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, - {ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, - {ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodPending}}, + {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodPending}}, }, getLogsRespBody: "some logs", getLogsStatusCode: http.StatusOK, diff --git a/logs/watcher_test.go b/logs/watcher_test.go index 7d0d287..ad2492d 100644 --- a/logs/watcher_test.go +++ b/logs/watcher_test.go @@ -7,6 +7,7 @@ import ( "io" "log" "strings" + "sync" "testing" "time" @@ -25,6 +26,18 @@ import ( k8stest "k8s.io/client-go/testing" ) +type concurrentWriter struct { + w io.Writer + mu sync.Mutex +} + +func (cw *concurrentWriter) Write(p []byte) (int, error) { + cw.mu.Lock() + defer cw.mu.Unlock() + + return cw.w.Write(p) +} + type mockPodWatcher struct{ err error } func (m *mockPodWatcher) WatchPods(ctx context.Context) error { return m.err } @@ -122,8 +135,9 @@ func TestWatcherWithPodWatcher(t *testing.T) { client := logs.KubernetesClient{Typed: typedClient, Untyped: untypedClient} var buf bytes.Buffer + cw := concurrentWriter{w: &buf} params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"} - watcher := logs.NewWatcher(params, client, logs.NewPodWatcher, &buf, discardLogger()) + watcher := logs.NewWatcher(params, client, logs.NewPodWatcher, &cw, discardLogger()) go func() { deployment := buildDeployment(t, "mydeployment") @@ -131,9 +145,9 @@ func TestWatcherWithPodWatcher(t *testing.T) { time.Sleep(time.Millisecond * 250) pods := []*corev1.Pod{ - {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, - {ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, - {ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodPending}}, + {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodPending}}, } for _, pod := range pods { podsWatcher.Add(pod)