From ca822496b0344c8150a0a64f8a28b8d4ee70a407 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Thu, 9 Jun 2022 18:27:17 +0200 Subject: [PATCH] Support other resource types Use the untyped k8s client to enable support for statefulsets and replicasets as well as deployments. The typed client is retained for usage inside pod_watcher.go. --- logs/helpers.go | 18 ++++++ logs/helpers_test.go | 68 ++++++++++++++++++++++ logs/pod_watcher.go | 48 ++++++++-------- logs/pod_watcher_test.go | 102 ++++++++++++++++----------------- logs/watcher.go | 120 ++++++++++++++++++++++++++------------- logs/watcher_test.go | 92 +++++++++++++++++------------- main.go | 45 +++++++++++---- 7 files changed, 326 insertions(+), 167 deletions(-) create mode 100644 logs/helpers.go create mode 100644 logs/helpers_test.go diff --git a/logs/helpers.go b/logs/helpers.go new file mode 100644 index 0000000..a8ecf74 --- /dev/null +++ b/logs/helpers.go @@ -0,0 +1,18 @@ +package logs + +import "fmt" + +// ParseType returns an API resource type (pluralized) from a singular or +// shortened name. If the resource is unsupported, an error will be returned. +func ParseType(input string) (string, error) { + switch input { + case "deploy", "deployment", "deployments": + return "deployments", nil + case "sts", "statefulset", "statefulsets": + return "statefulsets", nil + case "rs", "replicaset", "replicasets": + return "replicasets", nil + default: + return "", fmt.Errorf(`unsupported resource: "%s". Supported resources are [deployment, statefulset, replicaset]`, input) + } +} diff --git a/logs/helpers_test.go b/logs/helpers_test.go new file mode 100644 index 0000000..673ac8f --- /dev/null +++ b/logs/helpers_test.go @@ -0,0 +1,68 @@ +package logs_test + +import ( + "testing" + + "git.netflux.io/rob/kubectl-persistent-logger/logs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseKind(t *testing.T) { + testCases := []struct { + input string + wantOut string + wantErr string + }{ + { + input: "deploy", + wantOut: "deployments", + }, + { + input: "deployment", + wantOut: "deployments", + }, + { + input: "deployments", + wantOut: "deployments", + }, + { + input: "sts", + wantOut: "statefulsets", + }, + { + input: "statefulset", + wantOut: "statefulsets", + }, + { + input: "statefulsets", + wantOut: "statefulsets", + }, + { + input: "rs", + wantOut: "replicasets", + }, + { + input: "replicaset", + wantOut: "replicasets", + }, + { + input: "replicasets", + wantOut: "replicasets", + }, + { + input: "foo", + wantErr: `unsupported resource: "foo". Supported resources are [deployment, statefulset, replicaset]`, + }, + } + + for _, tc := range testCases { + t.Run(tc.input, func(t *testing.T) { + result, err := logs.ParseType(tc.input) + if tc.wantErr != "" { + require.EqualError(t, err, tc.wantErr) + } + assert.Equal(t, tc.wantOut, result) + }) + } +} diff --git a/logs/pod_watcher.go b/logs/pod_watcher.go index 2fde12a..5e94c91 100644 --- a/logs/pod_watcher.go +++ b/logs/pod_watcher.go @@ -40,9 +40,9 @@ func newRecoverableError(err error, podName string) *streamError { // PodWatcher consumes and merges the logs for a specified set of pods. type PodWatcher struct { - clientset KubernetesClient + client KubernetesClient container string - labelSelector *metav1.LabelSelector + labelSelector labels.Selector spec map[string]*corev1.Pod status map[string]bool streamResults chan error @@ -51,15 +51,16 @@ type PodWatcher struct { } // NewPodWatcher initializes a new PodWatcher. 
-func NewPodWatcher(clientset KubernetesClient, container string, labelSelector *metav1.LabelSelector, dst io.Writer) PodWatcherInterface { +func NewPodWatcher(client KubernetesClient, container string, labelSelector labels.Selector, dst io.Writer) PodWatcherInterface { return &PodWatcher{ - clientset: clientset, + client: client, container: container, labelSelector: labelSelector, spec: make(map[string]*corev1.Pod), status: make(map[string]bool), streamResults: make(chan error), dst: dst, + closeChan: make(chan struct{}), } } @@ -77,25 +78,25 @@ func (pw *PodWatcher) WatchPods(ctx context.Context) error { return err } -// Close terminates the watcher, waiting for all logs to be consumed before -// exiting. +const tickerInterval = time.Millisecond * 250 + +// Close terminates the PodWatcher. func (pw *PodWatcher) Close() { pw.closeChan <- struct{}{} } func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { - podsClient := pw.clientset.CoreV1().Pods(corev1.NamespaceDefault) - labelsMap, err := metav1.LabelSelectorAsMap(pw.labelSelector) - if err != nil { - return err - } - watcher, err := podsClient.Watch(ctx, metav1.ListOptions{LabelSelector: labels.SelectorFromSet(labelsMap).String()}) - if err != nil { - return err - } - defer watcher.Stop() + podsClient := pw.client.Typed.CoreV1().Pods(corev1.NamespaceDefault) - ticker := time.NewTicker(time.Second) + // Returns a watcher which notifies of changes in the relevant pods. + // We don't defer Stop() on the returned value because the sender is the + // Kubernetes SDK, and that may introduce a send-on-closed-channel panic. + watcher, err := podsClient.Watch(ctx, metav1.ListOptions{LabelSelector: pw.labelSelector.String()}) + if err != nil { + return err + } + + ticker := time.NewTicker(tickerInterval) defer ticker.Stop() // streamErrors is never closed. @@ -120,15 +121,14 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { pw.status[pod.Name] = true wg.Add(1) go func() { - if err := copyPodLogs(ctx, wg, pw.clientset, pod, pw.container, pw.dst); err != nil { + if err := copyPodLogs(ctx, wg, pw.client, pod, pw.container, pw.dst); err != nil { streamErrors <- err } }() } } // For any pods which no longer exist, remove the pod. - // TODO: check this is needed when a pod's labels change to no longer - // match the deployment's selector. + // TODO: stop the log streaming. for podName := range pw.status { if _, ok := pw.spec[podName]; !ok { pw.removePod(podName) @@ -146,8 +146,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { case evt, ok := <-resultChan: if !ok { - resultChan = nil - continue + return nil } switch evt.Type { case watch.Added, watch.Modified: @@ -168,14 +167,15 @@ func (pw *PodWatcher) removePod(podName string) { delete(pw.status, podName) } -func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, clientset KubernetesClient, pod *corev1.Pod, container string, dst io.Writer) *streamError { +func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, client KubernetesClient, pod *corev1.Pod, container string, dst io.Writer) *streamError { defer wg.Done() podLogOpts := corev1.PodLogOptions{ Follow: true, Container: container, } - req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts) + + req := client.Typed.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts) logs, err := req.Stream(ctx) // If one container is still being created, do not treat this as a fatal error. 
diff --git a/logs/pod_watcher_test.go b/logs/pod_watcher_test.go index ee5e834..3ed9ab7 100644 --- a/logs/pod_watcher_test.go +++ b/logs/pod_watcher_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "log" "net/http" "strings" "testing" @@ -17,6 +18,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" testclient "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" @@ -60,10 +62,7 @@ func (m *mockClientset) GetLogs(podName string, _ *corev1.PodLogOptions) *rest.R return fakeClient.Request() } -func TestPodWatcherClosedWatcher(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - +func TestPodWatcherClose(t *testing.T) { podsWatcher := watch.NewFake() clientset := mockClientset{ @@ -73,29 +72,29 @@ func TestPodWatcherClosedWatcher(t *testing.T) { } clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil)) - client := logs.KubernetesClient{Interface: &clientset} - selector := metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}} - - go func() { - defer podsWatcher.Stop() - podsWatcher.Add(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, - Status: corev1.PodStatus{Phase: corev1.PodRunning}, - }) - }() + client := logs.KubernetesClient{Typed: &clientset} + selector := labels.SelectorFromSet(map[string]string{"foo": "bar"}) var buf bytes.Buffer - pw := logs.NewPodWatcher(client, "mycontainer", &selector, &buf) + pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf) + + go func() { + podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}) + time.Sleep(time.Second) + // Close() should cause the watcher to return cleanly: + pw.Close() + }() + + err := pw.WatchPods(context.Background()) + require.NoError(t, err) - // WatchPods should wait for the logs and return cleanly. 
- err := pw.WatchPods(ctx) - require.Equal(t, context.DeadlineExceeded, err) assert.Equal(t, "[foo] it worked\n", buf.String()) } func TestPodWatcher(t *testing.T) { testCases := []struct { name string + podEvents []*corev1.Pod getLogsRespBody string getLogsStatusCode int getLogsErr error @@ -103,33 +102,39 @@ func TestPodWatcher(t *testing.T) { wantErr string }{ { - name: "unexpected error getting logs", + name: "unexpected error getting logs", + podEvents: []*corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, + }, getLogsErr: errors.New("nope"), wantOut: nil, wantErr: "nope", }, { - name: "recoverable error getting logs", + name: "recoverable error getting logs", + podEvents: []*corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, + }, getLogsErr: &apierrors.StatusError{ErrStatus: metav1.Status{Message: "is waiting to start: ContainerCreating", Reason: metav1.StatusReasonBadRequest}}, wantOut: nil, - wantErr: context.DeadlineExceeded.Error(), }, { - name: "success", + name: "success", + podEvents: []*corev1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodPending}}, + }, getLogsRespBody: "some logs", getLogsStatusCode: http.StatusOK, wantOut: []string{"[foo] some logs", "[bar] some logs"}, - wantErr: context.DeadlineExceeded.Error(), }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - podsWatcher := watch.NewFake() - defer podsWatcher.Stop() + defer log.Println("exiting test func") clientset := mockClientset{ getLogsRespBody: tc.getLogsRespBody, @@ -139,42 +144,33 @@ func TestPodWatcher(t *testing.T) { } clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil)) - client := logs.KubernetesClient{Interface: &clientset} - selector := metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}} - - go func() { - pods := []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, - Status: corev1.PodStatus{Phase: corev1.PodRunning}, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, - Status: corev1.PodStatus{Phase: corev1.PodRunning}, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, - Status: corev1.PodStatus{Phase: corev1.PodPending}, - }, - } - for _, pod := range pods { - podsWatcher.Add(pod) - time.Sleep(time.Millisecond * 250) - } - }() + client := logs.KubernetesClient{Typed: &clientset} + selector := labels.SelectorFromSet(map[string]string{"foo": "bar"}) var buf bytes.Buffer - pw := logs.NewPodWatcher(client, "mycontainer", &selector, &buf) + pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf) - err := pw.WatchPods(ctx) + go func() { + for _, pod := range tc.podEvents { + podsWatcher.Add(pod) + time.Sleep(time.Millisecond * 500) + } + pw.Close() + }() + + err := pw.WatchPods(context.Background()) if tc.wantErr == "" { require.NoError(t, err) } else { + require.Error(t, err) require.Contains(t, err.Error(), tc.wantErr) } + if tc.wantOut != nil { - assert.ElementsMatch(t, tc.wantOut, 
bufToLines(&buf)) + lines := bufToLines(&buf) + require.Len(t, lines, len(tc.wantOut)) + assert.ElementsMatch(t, tc.wantOut, lines) } }) } diff --git a/logs/watcher.go b/logs/watcher.go index 80c5058..1f2744b 100644 --- a/logs/watcher.go +++ b/logs/watcher.go @@ -8,17 +8,23 @@ import ( "sync" "time" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" ) -// KubernetesClient wraps a Kubernetes clientset. +// KubernetesClient provides both typed and untyped interfaces to the +// Kubernetes API. type KubernetesClient struct { - kubernetes.Interface + Typed kubernetes.Interface + Untyped dynamic.Interface } // concurrentWriter implements an io.Writer that can be safely written to from @@ -42,16 +48,24 @@ type PodWatcherInterface interface { } // PodWatcherFunc builds a PodWatcher. -type PodWatcherFunc func(KubernetesClient, string, *metav1.LabelSelector, io.Writer) PodWatcherInterface +type PodWatcherFunc func(KubernetesClient, string, labels.Selector, io.Writer) PodWatcherInterface + +// WatcherParams defines the input parameters of a Watcher. +type WatcherParams struct { + Name string + Type string + Namespace string + Container string + StrictExist bool +} // Watcher watches a deployment and tails the logs for its currently active // pods. type Watcher struct { - deployName string - container string - strictExist bool - clientset KubernetesClient - deployment *appsv1.Deployment + params WatcherParams + client KubernetesClient + resourceUID types.UID + podSelector labels.Selector podWatcher PodWatcherInterface podWatcherFunc PodWatcherFunc errChan chan error @@ -59,12 +73,10 @@ type Watcher struct { } // NewWatcher creates a new Watcher. -func NewWatcher(deployName string, container string, strictExist bool, clientset KubernetesClient, podWatcherFunc PodWatcherFunc, dst io.Writer) *Watcher { +func NewWatcher(params WatcherParams, client KubernetesClient, podWatcherFunc PodWatcherFunc, dst io.Writer) *Watcher { return &Watcher{ - deployName: deployName, - container: container, - strictExist: strictExist, - clientset: clientset, + params: params, + client: client, podWatcherFunc: podWatcherFunc, errChan: make(chan error), dst: &concurrentWriter{w: dst}, @@ -73,30 +85,35 @@ func NewWatcher(deployName string, container string, strictExist bool, clientset // Watch watches a deployment. func (w *Watcher) Watch(ctx context.Context) error { - deploymentsClient := w.clientset.AppsV1().Deployments(corev1.NamespaceDefault) - - // Check if the deployment exists before we commence watching, to allow us to - // return an error if needed. 
- _, err := deploymentsClient.Get(ctx, w.deployName, metav1.GetOptions{}) - var statusErr *apierrors.StatusError - if errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonNotFound { - if w.strictExist { - return err - } - log.Printf(`deployment "%s" does not exist, waiting`, w.deployName) + ns := w.params.Namespace + if ns == "" { + ns = corev1.NamespaceDefault } - opts := metav1.ListOptions{Watch: true, FieldSelector: "metadata.name=" + w.deployName} - deploymentsWatcher, err := deploymentsClient.Watch(ctx, opts) + // Supported resource types are deployments, statefulsets and replicasets + // (all apps/v1). + resourceID := schema.GroupVersionResource{ + Resource: w.params.Type, + Group: "apps", + Version: "v1", + } + + if err := w.checkResourceExists(ctx, ns, resourceID); err != nil { + return err + } + + opts := metav1.ListOptions{Watch: true, FieldSelector: "metadata.name=" + w.params.Name} + watcher, err := w.client.Untyped.Resource(resourceID).Namespace(ns).Watch(ctx, opts) if err != nil { return err } - defer deploymentsWatcher.Stop() + + defer watcher.Stop() ticker := time.NewTicker(time.Second) defer ticker.Stop() - resultChan := deploymentsWatcher.ResultChan() + resultChan := watcher.ResultChan() for { select { case evt, ok := <-resultChan: @@ -106,8 +123,19 @@ func (w *Watcher) Watch(ctx context.Context) error { } switch evt.Type { case watch.Added, watch.Modified: - deployment := evt.Object.(*appsv1.Deployment) - w.addDeployment(ctx, deployment) + resource := evt.Object.(*unstructured.Unstructured) + uid := resource.GetUID() + // TODO: handle matchExpressions + selectorAsMap, ok, err := unstructured.NestedStringMap(resource.Object, "spec", "selector", "matchLabels") + if !ok || err != nil { + // matchLabels don't exist or cannot be parsed. + // Should this be fatal? 
+ log.Printf("warning: unable to parse matchLabels: ok = %t, err = %v", ok, err) + continue + } + selector := labels.SelectorFromSet(selectorAsMap) + w.addDeployment(ctx, uid, selector) + case watch.Deleted: w.removeDeployment() } @@ -119,20 +147,31 @@ func (w *Watcher) Watch(ctx context.Context) error { } } -func (w *Watcher) addDeployment(ctx context.Context, deployment *appsv1.Deployment) { - if w.deployment != nil && w.deployment.UID == deployment.UID { +func (w *Watcher) checkResourceExists(ctx context.Context, namespace string, resourceID schema.GroupVersionResource) error { + _, err := w.client.Untyped.Resource(resourceID).Namespace(namespace).Get(ctx, w.params.Name, metav1.GetOptions{}) + var statusErr *apierrors.StatusError + if !w.params.StrictExist && errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonNotFound { + log.Printf(`%s "%s" does not exist, waiting`, resourceID.Resource, w.params.Name) + return nil + } + return err +} + +func (w *Watcher) addDeployment(ctx context.Context, resourceUID types.UID, podSelector labels.Selector) { + if w.resourceUID == resourceUID { return } w.removeDeployment() - log.Println("[DeploymentWatcher] add deployment") + log.Println("[DeploymentWatcher] add podWatcher") - w.deployment = deployment + w.resourceUID = resourceUID + w.podSelector = podSelector w.podWatcher = w.podWatcherFunc( - w.clientset, - w.container, - deployment.Spec.Selector, + w.client, + w.params.Container, + w.podSelector, w.dst, ) @@ -145,9 +184,10 @@ func (w *Watcher) addDeployment(ctx context.Context, deployment *appsv1.Deployme func (w *Watcher) removeDeployment() { if w.podWatcher != nil { - log.Println("[DeploymentWatcher] remove deployment") + log.Println("[DeploymentWatcher] remove podWatcher") w.podWatcher.Close() w.podWatcher = nil } - w.deployment = nil + w.resourceUID = "" + w.podSelector = nil } diff --git a/logs/watcher_test.go b/logs/watcher_test.go index 6e24a87..8ef7feb 100644 --- a/logs/watcher_test.go +++ b/logs/watcher_test.go @@ -12,10 +12,14 @@ import ( "git.netflux.io/rob/kubectl-persistent-logger/logs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + dynamicclient "k8s.io/client-go/dynamic/fake" testclient "k8s.io/client-go/kubernetes/fake" k8stest "k8s.io/client-go/testing" ) @@ -26,17 +30,28 @@ func (m *mockPodWatcher) WatchPods(ctx context.Context) error { return m.err } func (m *mockPodWatcher) Close() {} func mockPodwatcherFunc(err error) logs.PodWatcherFunc { - return func(logs.KubernetesClient, string, *metav1.LabelSelector, io.Writer) logs.PodWatcherInterface { + return func(logs.KubernetesClient, string, labels.Selector, io.Writer) logs.PodWatcherInterface { return &mockPodWatcher{err: err} } } -func TestWatcherAllowNonExistent(t *testing.T) { - clientset := testclient.NewSimpleClientset() +func buildDeployment(t *testing.T, name string) *unstructured.Unstructured { + deployment := new(unstructured.Unstructured) + deployment.SetAPIVersion("v1") + deployment.SetKind("deployment") + deployment.SetName("mydeployment") + deployment.SetNamespace("default") + deployment.SetUID(types.UID("foo")) + require.NoError(t, unstructured.SetNestedField(deployment.Object, map[string]any{"app": "myapp"}, "spec", 
"selector", "matchLabels")) + return deployment +} + +func TestWatcherStrictExist(t *testing.T) { + client := logs.KubernetesClient{Untyped: dynamicclient.NewSimpleDynamicClient(runtime.NewScheme())} var buf bytes.Buffer - client := logs.KubernetesClient{Interface: clientset} - watcher := logs.NewWatcher("mydeployment", "mycontainer", true, client, mockPodwatcherFunc(nil), &buf) + params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default", StrictExist: true} + watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(nil), &buf) err := watcher.Watch(context.Background()) assert.EqualError(t, err, `deployments.apps "mydeployment" not found`) @@ -44,19 +59,20 @@ func TestWatcherAllowNonExistent(t *testing.T) { func TestWatcherPodWatcherError(t *testing.T) { deploymentsWatcher := watch.NewFake() - clientset := testclient.NewSimpleClientset( - &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}}, - ) - clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) + + untypedClient := dynamicclient.NewSimpleDynamicClient(runtime.NewScheme()) + untypedClient.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) + client := logs.KubernetesClient{Untyped: untypedClient} var buf bytes.Buffer - client := logs.KubernetesClient{Interface: clientset} wantErr := errors.New("foo") - watcher := logs.NewWatcher("mydeployment", "mycontainer", true, client, mockPodwatcherFunc(wantErr), &buf) + params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"} + watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(wantErr), &buf) go func() { defer deploymentsWatcher.Stop() - deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}} + + deployment := buildDeployment(t, "mydeployment") deploymentsWatcher.Add(deployment) }() @@ -66,15 +82,17 @@ func TestWatcherPodWatcherError(t *testing.T) { func TestWatcherClosedChannel(t *testing.T) { deploymentsWatcher := watch.NewFake() - clientset := testclient.NewSimpleClientset( - &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}}, - ) - clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) + + untypedClient := dynamicclient.NewSimpleDynamicClient(runtime.NewScheme()) + untypedClient.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) + client := logs.KubernetesClient{Untyped: untypedClient} var buf bytes.Buffer - client := logs.KubernetesClient{Interface: clientset} - watcher := logs.NewWatcher("mydeployment", "mycontainer", true, client, nil, &buf) - go deploymentsWatcher.Stop() + params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"} + watcher := logs.NewWatcher(params, client, nil, &buf) + // Immediately stop the watcher, which closes the ResultChan. + // This should be expected to be handled. 
+ deploymentsWatcher.Stop() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) defer cancel() @@ -90,31 +108,25 @@ func TestWatcherWithPodWatcher(t *testing.T) { deploymentsWatcher := watch.NewFake() defer deploymentsWatcher.Stop() + podsWatcher := watch.NewFake() defer podsWatcher.Stop() - clientset := testclient.NewSimpleClientset() - clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil)) - clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) + typedClient := testclient.NewSimpleClientset() + typedClient.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil)) + untypedClient := dynamicclient.NewSimpleDynamicClient(runtime.NewScheme()) + untypedClient.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) + client := logs.KubernetesClient{Typed: typedClient, Untyped: untypedClient} go func() { - deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}} + deployment := buildDeployment(t, "mydeployment") deploymentsWatcher.Add(deployment) time.Sleep(time.Millisecond * 250) pods := []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, - Status: corev1.PodStatus{Phase: corev1.PodRunning}, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, - Status: corev1.PodStatus{Phase: corev1.PodRunning}, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, - Status: corev1.PodStatus{Phase: corev1.PodPending}, - }, + {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, + {ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodPending}}, } for _, pod := range pods { podsWatcher.Add(pod) @@ -123,14 +135,14 @@ func TestWatcherWithPodWatcher(t *testing.T) { }() var buf bytes.Buffer - client := logs.KubernetesClient{Interface: clientset} - watcher := logs.NewWatcher("mydeployment", "mycontainer", false, client, logs.NewPodWatcher, &buf) + params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"} + watcher := logs.NewWatcher(params, client, logs.NewPodWatcher, &buf) err := watcher.Watch(ctx) require.EqualError(t, err, context.DeadlineExceeded.Error()) lines := bufToLines(&buf) - assert.Len(t, lines, 2) - assert.ElementsMatch(t, []string{"[foo] fake logs", "[bar] fake logs"}, bufToLines(&buf)) + require.Len(t, lines, 2) + assert.ElementsMatch(t, []string{"[foo] fake logs", "[bar] fake logs"}, lines) } func bufToLines(buf *bytes.Buffer) []string { diff --git a/main.go b/main.go index 31fab65..a0017e3 100644 --- a/main.go +++ b/main.go @@ -7,21 +7,43 @@ import ( "os" "git.netflux.io/rob/kubectl-persistent-logger/logs" + "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" clientconfig "sigs.k8s.io/controller-runtime/pkg/client/config" ) func main() { var ( - deployName *string - container *string - strictExist *bool + container string + strictExist bool ) - deployName = flag.String("deployment", "", "name of a deployment to monitor") - container = flag.String("container", "", "name of a specific container") - strictExist = flag.Bool("strict-exist", false, "require deployment to exist on launch") + + flag.StringVar(&container, "container", "", "name of a 
specific container") + flag.BoolVar(&strictExist, "strict", false, "require deployment to exist on launch") flag.Parse() + var params logs.WatcherParams + + switch len(flag.Args()) { + case 1: + // TODO: handle type/name style + flag.Usage() + os.Exit(1) + case 2: + kind, err := logs.ParseType(flag.Arg(0)) + if err != nil { + log.Fatal(err) + } + params = logs.WatcherParams{ + Type: kind, + Name: flag.Arg(1), + Namespace: "default", + } + default: + flag.Usage() + os.Exit(1) + } + cfg, err := clientconfig.GetConfig() if err != nil { log.Fatal(err) @@ -32,12 +54,15 @@ func main() { log.Fatal(err) } + dynamicclient, err := dynamic.NewForConfig(cfg) + if err != nil { + log.Fatal(err) + } + ctx := context.Background() watcher := logs.NewWatcher( - *deployName, - *container, - *strictExist, - logs.KubernetesClient{Interface: clientset}, + params, + logs.KubernetesClient{Typed: clientset, Untyped: dynamicclient}, logs.NewPodWatcher, os.Stdout, )