From 0b0db0ee8f700bce4c4eb58dc36e2290b5d0e998 Mon Sep 17 00:00:00 2001
From: Rob Watson
Date: Wed, 1 Jun 2022 19:19:55 +0200
Subject: [PATCH] Add per-deployment watcher

---
 logs/pod_watcher.go  | 190 ++++++++++++++++++++++++++++++++++++++
 logs/watcher.go      | 211 ++++++++++++++++---------------------------
 logs/watcher_test.go | 135 ++++++++++++++++++---------
 main.go              |  10 +-
 4 files changed, 366 insertions(+), 180 deletions(-)
 create mode 100644 logs/pod_watcher.go

diff --git a/logs/pod_watcher.go b/logs/pod_watcher.go
new file mode 100644
index 0000000..b5ff63d
--- /dev/null
+++ b/logs/pod_watcher.go
@@ -0,0 +1,190 @@
+package logs
+
+import (
+	"bufio"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"log"
+	"strings"
+	"sync"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/watch"
+)
+
+const nl = "\n"
+
+type streamError struct {
+	err         error
+	podName     string
+	recoverable bool
+}
+
+func (re *streamError) Error() string {
+	return re.err.Error()
+}
+
+// PodWatcher consumes and merges the logs for a specified set of pods.
+type PodWatcher struct {
+	clientset     KubernetesClient
+	container     string
+	labelSelector *metav1.LabelSelector
+	spec          map[string]*corev1.Pod
+	status        map[string]bool
+	streamResults chan error
+	dst           io.Writer
+	closeChan     chan struct{}
+}
+
+// NewPodWatcher initializes a new PodWatcher.
+func NewPodWatcher(clientset KubernetesClient, container string, labelSelector *metav1.LabelSelector, dst io.Writer) PodWatcherInterface {
+	return &PodWatcher{
+		clientset:     clientset,
+		container:     container,
+		labelSelector: labelSelector,
+		spec:          make(map[string]*corev1.Pod),
+		status:        make(map[string]bool),
+		streamResults: make(chan error),
+		dst:           dst,
+	}
+}
+
+// WatchPods blocks while it watches the pods which match the Selector of the
+// provided deployment.
+//
+// Cancelling ctx will result in the event loop exiting immediately.
+func (pw *PodWatcher) WatchPods(ctx context.Context) error {
+	var wg sync.WaitGroup
+	err := pw.watchPods(ctx, &wg)
+
+	wg.Wait()
+	log.Println("[PodWatcher] all goroutines exited, exiting")
+
+	return err
+}
+
+// Close terminates the watcher, waiting for all logs to be consumed before
+// exiting.
+func (pw *PodWatcher) Close() {
+	pw.closeChan <- struct{}{}
+}
+
+func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error {
+	podsClient := pw.clientset.CoreV1().Pods(corev1.NamespaceDefault)
+	labelsMap, err := metav1.LabelSelectorAsMap(pw.labelSelector)
+	if err != nil {
+		return err
+	}
+	watcher, err := podsClient.Watch(ctx, metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labelsMap).String()})
+	if err != nil {
+		return err
+	}
+	defer watcher.Stop()
+
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
+	// streamErrors is never closed.
+	streamErrors := make(chan error)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+
+		case <-pw.closeChan:
+			return nil
+
+		case <-ticker.C:
+			// Iterate through the desired state (w.spec) and launch goroutines to
+			// process the logs of any missing pods.
+			for podName, pod := range pw.spec {
+				pod := pod
+				if _, ok := pw.status[podName]; !ok {
+					log.Printf("[PodWatcher] adding pod, name = %s", pod.Name)
+					pw.status[pod.Name] = true
+					wg.Add(1)
+					go func() {
+						if err := copyPodLogs(ctx, wg, pw.clientset, pod, pw.container, pw.dst); err != nil {
+							streamErrors <- err
+						}
+					}()
+				}
+			}
+			// For any pods which no longer exist, remove the pod.
+			// TODO: check this is needed when a pod's labels change to no longer
+			// match the deployment's selector.
+			for podName := range pw.status {
+				if _, ok := pw.spec[podName]; !ok {
+					pw.removePod(podName)
+				}
+			}
+
+		case err := <-streamErrors:
+			var streamErr *streamError
+			if errors.As(err, &streamErr) && streamErr.recoverable {
+				// if the error is recoverable, we just remove the pod from the status
+				// map. It will be recreated and retried on the next iteration.
+				pw.removePod(streamErr.podName)
+			} else {
+				return streamErr
+			}
+
+		case evt := <-watcher.ResultChan():
+			switch evt.Type {
+			case watch.Added, watch.Modified:
+				pod := evt.Object.(*corev1.Pod)
+				if pod.Status.Phase == corev1.PodRunning {
+					pw.spec[pod.Name] = pod
+				}
+			case watch.Deleted:
+				pod := evt.Object.(*corev1.Pod)
+				delete(pw.spec, pod.Name)
+			}
+		}
+	}
+}
+
+func (pw *PodWatcher) removePod(podName string) {
+	log.Printf("[PodWatcher] removing pod, name = %s", podName)
+	delete(pw.status, podName)
+}
+
+func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, clientset KubernetesClient, p *corev1.Pod, container string, dst io.Writer) error {
+	defer wg.Done()
+
+	podLogOpts := corev1.PodLogOptions{
+		Follow:    true,
+		Container: container,
+	}
+	req := clientset.CoreV1().Pods(p.Namespace).GetLogs(p.Name, &podLogOpts)
+	logs, err := req.Stream(ctx)
+
+	// If one pod or container is in a non-running state, we don't want to quit.
+	// Checking the response string avoids the risk of a race condition but
+	// obviously feels a bit brittle too.
+	// TODO: introspect error
+	if err != nil && strings.Contains(err.Error(), "is waiting to start") {
+		return &streamError{err: err, podName: p.Name, recoverable: true}
+	} else if err != nil {
+		return &streamError{err: err, podName: p.Name}
+	}
+
+	defer func() { _ = logs.Close() }()
+
+	scanner := bufio.NewScanner(logs)
+	for scanner.Scan() {
+		if _, err = dst.Write([]byte("[" + p.Name + "] " + scanner.Text() + nl)); err != nil {
+			return &streamError{err: fmt.Errorf("error writing logs: %v", err), podName: p.Name}
+		}
+	}
+	if err := scanner.Err(); err != nil {
+		return &streamError{err: fmt.Errorf("error scanning logs: %v", err), podName: p.Name}
+	}
+	return nil
+}
diff --git a/logs/watcher.go b/logs/watcher.go
index 7726e0f..ae94e10 100644
--- a/logs/watcher.go
+++ b/logs/watcher.go
@@ -1,19 +1,17 @@
 package logs
 
 import (
-	"bufio"
 	"context"
 	"errors"
-	"fmt"
 	"io"
 	"log"
-	"strings"
 	"sync"
 	"time"
 
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
 )
@@ -23,18 +21,13 @@ type KubernetesClient struct {
 	kubernetes.Interface
 }
 
-// concurrentWriter implements io.Writer.
+// concurrentWriter implements an io.Writer that can be safely written to from
+// multiple goroutines.
 type concurrentWriter struct {
 	w  io.Writer
 	mu sync.Mutex
 }
 
-// NewConcurrentWriter returns an io.Writer which can be safely written to from
-// multiple goroutines without further synchronization.
-func NewConcurrentWriter(w io.Writer) io.Writer {
-	return &concurrentWriter{w: w}
-}
-
 // Write implements io.Writer.
 func (cw *concurrentWriter) Write(p []byte) (int, error) {
 	cw.mu.Lock()
@@ -43,161 +36,113 @@ func (cw *concurrentWriter) Write(p []byte) (int, error) {
 	return cw.w.Write(p)
 }
 
-const nl = "\n"
-
-type streamError struct {
-	err         error
-	podName     string
-	recoverable bool
+type PodWatcherInterface interface {
+	WatchPods(ctx context.Context) error
+	Close()
 }
 
-func (re *streamError) Error() string {
-	return re.err.Error()
-}
+// PodWatcherFunc builds a PodWatcher.
+type PodWatcherFunc func(KubernetesClient, string, *metav1.LabelSelector, io.Writer) PodWatcherInterface
 
 // Watcher watches a deployment and tails the logs for its currently active
 // pods.
 type Watcher struct {
-	deployName    string
-	container     string
-	clientset     KubernetesClient
-	spec          map[string]*corev1.Pod
-	status        map[string]bool
-	streamResults chan error
-	dst           io.Writer
+	deployName                 string
+	container                  string
+	allowNonExistentDeployment bool
+	clientset                  KubernetesClient
+	deployment                 *appsv1.Deployment
+	podWatcher                 PodWatcherInterface
+	podWatcherFunc             PodWatcherFunc
+	errChan                    chan error
+	dst                        *concurrentWriter
 }
 
 // NewWatcher creates a new Watcher.
-func NewWatcher(deployName string, container string, clientset KubernetesClient, dst io.Writer) *Watcher {
+func NewWatcher(deployName string, container string, allowNonExistentDeployment bool, clientset KubernetesClient, podWatcherFunc PodWatcherFunc, dst io.Writer) *Watcher {
 	return &Watcher{
-		deployName:    deployName,
-		container:     container,
-		clientset:     clientset,
-		spec:          make(map[string]*corev1.Pod),
-		status:        make(map[string]bool),
-		streamResults: make(chan error),
-		dst:           dst,
+		deployName:                 deployName,
+		container:                  container,
+		allowNonExistentDeployment: allowNonExistentDeployment,
+		clientset:                  clientset,
+		podWatcherFunc:             podWatcherFunc,
+		errChan:                    make(chan error),
+		dst:                        &concurrentWriter{w: dst},
 	}
 }
 
 // Watch watches a deployment.
 func (w *Watcher) Watch(ctx context.Context) error {
-	dst := NewConcurrentWriter(w.dst)
+	deploymentsClient := w.clientset.AppsV1().Deployments(corev1.NamespaceDefault)
+
+	// Check if the deployment exists before we commence watching, to allow us to
+	// return an error if needed.
+	_, err := deploymentsClient.Get(ctx, w.deployName, metav1.GetOptions{})
+	var statusError *apierrors.StatusError
+	if errors.As(err, &statusError) && statusError.Status().Reason == metav1.StatusReasonNotFound {
+		if !w.allowNonExistentDeployment {
+			return err
+		}
+		log.Printf(`deployment "%s" does not exist, waiting`, w.deployName)
+	}
+
+	opts := metav1.ListOptions{Watch: true, FieldSelector: "metadata.name=" + w.deployName}
+	deploymentsWatcher, err := deploymentsClient.Watch(ctx, opts)
+	if err != nil {
+		return err
+	}
+	defer deploymentsWatcher.Stop()
 
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()
 
-	deploymentsClient := w.clientset.AppsV1().Deployments(corev1.NamespaceDefault)
-	var opts metav1.GetOptions
-	deployment, err := deploymentsClient.Get(ctx, w.deployName, opts)
-	if err != nil {
-		return err
-	}
-
-	podsClient := w.clientset.CoreV1().Pods(corev1.NamespaceDefault)
-	labelsMap, err := metav1.LabelSelectorAsMap(deployment.Spec.Selector)
-	if err != nil {
-		return err
-	}
-	watcher, err := podsClient.Watch(ctx, metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labelsMap).String()})
-	if err != nil {
-		return err
-	}
-	defer watcher.Stop()
-
-	// streamErrors is never closed.
-	streamErrors := make(chan error)
-
 	for {
 		select {
-		case <-ctx.Done():
-			return ctx.Err()
-
-		case <-ticker.C:
-			// Iterate through the desired state (w.spec) and launch goroutines to
-			// process the logs of any missing pods.
-			for podName, pod := range w.spec {
-				pod := pod
-				if _, ok := w.status[podName]; !ok {
-					log.Printf("adding pod, name = %s", pod.Name)
-					w.status[pod.Name] = true
-					go func() {
-						if err := copyPodLogs(ctx, w.clientset, pod, w.container, dst); err != nil {
-							streamErrors <- err
-						}
-					}()
-				}
-			}
-			// For any pods which no longer exist, remove the pod.
-			// TODO: check this is needed when a pod's labels change to no longer
-			// match the deployment's selector.
-			for podName := range w.status {
-				if _, ok := w.spec[podName]; !ok {
-					w.removePod(podName)
-				}
-			}
-
-		case err := <-streamErrors:
-			var streamErr *streamError
-			if errors.As(err, &streamErr) && streamErr.recoverable {
-				// if the error is recoverable, we just remove the pod from the status
-				// map. It will be recreated and retried on the next iteration.
-				w.removePod(streamErr.podName)
-			} else {
-				return streamErr
-			}
-
-		case evt := <-watcher.ResultChan():
+		case evt := <-deploymentsWatcher.ResultChan():
 			switch evt.Type {
 			case watch.Added, watch.Modified:
-				pod := evt.Object.(*corev1.Pod)
-				log.Printf("event rcvd, type = %s, pod name = %s, phase = %s", evt.Type, pod.Name, pod.Status.Phase)
-				if pod.Status.Phase == corev1.PodRunning {
-					w.spec[pod.Name] = pod
-				}
+				deployment := evt.Object.(*appsv1.Deployment)
+				w.addDeployment(ctx, deployment)
 			case watch.Deleted:
-				pod := evt.Object.(*corev1.Pod)
-				delete(w.spec, pod.Name)
-				log.Printf("event rcvd, type = DELETED, pod name = %s", pod.Name)
+				w.removeDeployment()
 			}
+		case err := <-w.errChan:
+			return err
+		case <-ctx.Done():
+			return ctx.Err()
 		}
 	}
 }
 
-func copyPodLogs(ctx context.Context, clientset KubernetesClient, p *corev1.Pod, container string, dst io.Writer) error {
-	podLogOpts := corev1.PodLogOptions{
-		Follow:    true,
-		Container: container,
-	}
-	req := clientset.CoreV1().Pods(p.Namespace).GetLogs(p.Name, &podLogOpts)
-	logs, err := req.Stream(ctx)
-
-	// If one pod or container is in a non-running state, we don't want to quit.
-	// Checking the response string avoids the risk of a race condition but
-	// obviously feels a bit brittle too.
-	if err != nil && strings.Contains(err.Error(), "is waiting to start") {
-		return &streamError{err: err, podName: p.Name, recoverable: true}
-	} else if err != nil {
-		return &streamError{err: err, podName: p.Name}
+func (w *Watcher) addDeployment(ctx context.Context, deployment *appsv1.Deployment) {
+	if w.deployment != nil && w.deployment.UID == deployment.UID {
+		return
 	}
 
-	defer func() { _ = logs.Close() }()
+	w.removeDeployment()
 
-	scanner := bufio.NewScanner(logs)
-	for scanner.Scan() {
-		if _, err = dst.Write([]byte("[" + p.Name + "] " + scanner.Text() + nl)); err != nil {
-			return &streamError{err: fmt.Errorf("error writing: %v", err), podName: p.Name}
+	log.Println("[DeploymentWatcher] add deployment")
+
+	w.deployment = deployment
+	w.podWatcher = w.podWatcherFunc(
+		w.clientset,
+		w.container,
+		deployment.Spec.Selector,
+		w.dst,
+	)
+
+	go func() {
+		if err := w.podWatcher.WatchPods(ctx); err != nil {
+			w.errChan <- err
 		}
-	}
-
-	if err := scanner.Err(); err != nil {
-		return &streamError{err: fmt.Errorf("error scanning: %v", err), podName: p.Name}
-	}
-
-	return nil
+	}()
 }
 
-func (w *Watcher) removePod(podName string) {
-	log.Printf("removing pod, name = %s", podName)
-	delete(w.status, podName)
+func (w *Watcher) removeDeployment() {
+	if w.podWatcher != nil {
+		log.Println("[DeploymentWatcher] remove deployment")
+		w.podWatcher.Close()
+		w.podWatcher = nil
+	}
+	w.deployment = nil
 }
diff --git a/logs/watcher_test.go b/logs/watcher_test.go
index 8a18748..d897471 100644
--- a/logs/watcher_test.go
+++ b/logs/watcher_test.go
@@ -3,6 +3,9 @@ package logs_test
 import (
 	"bytes"
 	"context"
+	"errors"
+	"io"
+	"strings"
 	"testing"
 	"time"
 
@@ -17,54 +20,98 @@ import (
 	k8stest "k8s.io/client-go/testing"
 )
 
-func TestWatcher(t *testing.T) {
-	clientset := testclient.NewSimpleClientset(
-		&appsv1.Deployment{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:      "mydeployment",
-				Namespace: "default",
-			},
-			Spec: appsv1.DeploymentSpec{
-				Selector: &metav1.LabelSelector{
-					MatchLabels: map[string]string{"mylabelname": "mylabelvalue"},
-				},
-			},
-		},
-	)
+type mockPodWatcher struct{ err error }
 
-	k8swatcher := watch.NewFake()
-	clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(k8swatcher, nil))
-	pods := []*corev1.Pod{
-		{
-			ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"},
-			Status:     corev1.PodStatus{Phase: corev1.PodRunning},
-		},
-		{
-			ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"},
-			Status:     corev1.PodStatus{Phase: corev1.PodRunning},
-		},
-		{
-			ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"},
-			Status:     corev1.PodStatus{Phase: corev1.PodPending},
-		},
+func (m *mockPodWatcher) WatchPods(ctx context.Context) error { return m.err }
+func (m *mockPodWatcher) Close() {}
+
+func mockPodwatcherFunc(err error) logs.PodWatcherFunc {
+	return func(logs.KubernetesClient, string, *metav1.LabelSelector, io.Writer) logs.PodWatcherInterface {
+		return &mockPodWatcher{err: err}
 	}
+}
 
-	go func() {
-		defer k8swatcher.Stop()
-		for _, pod := range pods {
-			time.Sleep(time.Millisecond * 300)
-			k8swatcher.Add(pod)
-		}
-	}()
-
-	client := logs.KubernetesClient{Interface: clientset}
+func TestWatcherAllowNonExistent(t *testing.T) {
+	clientset := testclient.NewSimpleClientset()
 
 	var buf bytes.Buffer
-	watcher := logs.NewWatcher("mydeployment", "mycontainer", client, &buf)
+	client := logs.KubernetesClient{Interface: clientset}
+	watcher := logs.NewWatcher("mydeployment", "mycontainer", false, client, mockPodwatcherFunc(nil), &buf)
 
-	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-	defer cancel()
-	err := watcher.Watch(ctx)
-	require.EqualError(t, err, context.DeadlineExceeded.Error())
-	assert.Equal(t, "[foo] fake logs\n[bar] fake logs\n", buf.String())
+	err := watcher.Watch(context.Background())
+	assert.EqualError(t, err, `deployments.apps "mydeployment" not found`)
+}
+
+func TestWatcherPodWatcherError(t *testing.T) {
+	deploymentsWatcher := watch.NewFake()
+	clientset := testclient.NewSimpleClientset(
+		&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}},
+	)
+	clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil))
+
+	var buf bytes.Buffer
+	client := logs.KubernetesClient{Interface: clientset}
+	wantErr := errors.New("foo")
+	watcher := logs.NewWatcher("mydeployment", "mycontainer", false, client, mockPodwatcherFunc(wantErr), &buf)
+
+	go func() {
+		defer deploymentsWatcher.Stop()
+		deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}}
+		deploymentsWatcher.Add(deployment)
+	}()
+
+	err := watcher.Watch(context.Background())
+	assert.EqualError(t, err, wantErr.Error())
+}
+
+func TestWatcherWithPodWatcher(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	deploymentsWatcher := watch.NewFake()
+	podsWatcher := watch.NewFake()
+	clientset := testclient.NewSimpleClientset()
+	clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil))
+	clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil))
+
+	go func() {
+		defer podsWatcher.Stop()
+		defer deploymentsWatcher.Stop()
+
+		deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}}
+		deploymentsWatcher.Add(deployment)
+		time.Sleep(time.Millisecond * 250)
+
+		pods := []*corev1.Pod{
+			{
+				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"},
+				Status:     corev1.PodStatus{Phase: corev1.PodRunning},
+			},
+			{
+				ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"},
+				Status:     corev1.PodStatus{Phase: corev1.PodRunning},
+			},
+			{
+				ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"},
+				Status:     corev1.PodStatus{Phase: corev1.PodPending},
+			},
+		}
+		for _, pod := range pods {
+			podsWatcher.Add(pod)
+			time.Sleep(time.Millisecond * 250)
+		}
+
+		cancel()
+	}()
+
+	var buf bytes.Buffer
+	client := logs.KubernetesClient{Interface: clientset}
+	watcher := logs.NewWatcher("mydeployment", "mycontainer", true, client, logs.NewPodWatcher, &buf)
+
+	err := watcher.Watch(ctx)
+	require.EqualError(t, err, context.Canceled.Error())
+	assert.ElementsMatch(t, []string{"[foo] fake logs", "[bar] fake logs"}, splitBuf(&buf))
+}
+
+func splitBuf(buf *bytes.Buffer) []string {
+	return strings.Split(strings.TrimSpace(buf.String()), "\n")
 }
diff --git a/main.go b/main.go
index 8703921..0033229 100644
--- a/main.go
+++ b/main.go
@@ -15,10 +15,11 @@ import (
 
 func main() {
 	var (
-		kubeconfig *string
-		container  *string
+		kubeconfig                 *string
+		deployName                 *string
+		container                  *string
+		allowNonExistentDeployment *bool
 	)
-	var deployName *string
 	if home := homedir.HomeDir(); home != "" {
 		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
 	} else {
@@ -26,6 +27,7 @@
 	}
 	deployName = flag.String("deployment", "", "name of a deployment to monitor")
 	container = flag.String("container", "", "name of a specific container")
+	allowNonExistentDeployment = flag.Bool("allow-nonexistent", true, "allow deployment to be non-existent on launch")
 	flag.Parse()
 
 	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
@@ -42,7 +44,9 @@
 	watcher := logs.NewWatcher(
 		*deployName,
 		*container,
+		*allowNonExistentDeployment,
 		logs.KubernetesClient{Interface: clientset},
+		logs.NewPodWatcher,
 		os.Stdout,
 	)
 	if err := watcher.Watch(ctx); err != nil {