From d5835821c5c030c060460b8e17f6f4a9d22f946b Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Thu, 2 Jun 2022 19:23:47 +0200 Subject: [PATCH] podWatcher: add test coverage --- logs/pod_watcher_test.go | 148 +++++++++++++++++++++++++++++++++++++++ logs/watcher_test.go | 8 +-- 2 files changed, 152 insertions(+), 4 deletions(-) create mode 100644 logs/pod_watcher_test.go diff --git a/logs/pod_watcher_test.go b/logs/pod_watcher_test.go new file mode 100644 index 0000000..b4313c4 --- /dev/null +++ b/logs/pod_watcher_test.go @@ -0,0 +1,148 @@ +package logs_test + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "net/http" + "strings" + "testing" + "time" + + "git.netflux.io/rob/kubectl-persistent-logger/logs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + testclient "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + clientv1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + fakerest "k8s.io/client-go/rest/fake" + k8stest "k8s.io/client-go/testing" +) + +type mockClientset struct { + testclient.Clientset + clientv1.CoreV1Interface + clientv1.PodInterface + + podsWatcher watch.Interface + getLogsStatusCode int + getLogsRespBody string + getLogsErr error +} + +func (m *mockClientset) CoreV1() clientv1.CoreV1Interface { return m } +func (m *mockClientset) Pods(string) clientv1.PodInterface { return m } +func (m *mockClientset) Watch(context.Context, metav1.ListOptions) (watch.Interface, error) { + return m.podsWatcher, nil +} + +func (m *mockClientset) GetLogs(podName string, _ *corev1.PodLogOptions) *rest.Request { + fakeClient := fakerest.RESTClient{ + Client: fakerest.CreateHTTPClient(func(*http.Request) (*http.Response, error) { + if m.getLogsErr != nil { + return nil, m.getLogsErr + } + return &http.Response{ + StatusCode: m.getLogsStatusCode, + Body: io.NopCloser(strings.NewReader(m.getLogsRespBody)), + }, nil + }), + NegotiatedSerializer: scheme.Codecs.WithoutConversion(), + VersionedAPIPath: fmt.Sprintf("/api/v1/namespaces/default/pods/%s/log", podName), + } + return fakeClient.Request() +} + +func TestPodWatcher(t *testing.T) { + testCases := []struct { + name string + getLogsRespBody string + getLogsStatusCode int + getLogsErr error + wantOut []string + wantErr string + }{ + { + name: "unexpected error getting logs", + getLogsErr: errors.New("nope"), + wantOut: nil, + wantErr: "nope", + }, + { + name: "recoverable error getting logs", + getLogsErr: &apierrors.StatusError{ErrStatus: metav1.Status{Message: "is waiting to start: ContainerCreating", Reason: metav1.StatusReasonBadRequest}}, + wantOut: nil, + wantErr: context.DeadlineExceeded.Error(), + }, + { + name: "success", + getLogsRespBody: "some logs", + getLogsStatusCode: http.StatusOK, + wantOut: []string{"[foo] some logs", "[bar] some logs"}, + wantErr: context.DeadlineExceeded.Error(), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + podsWatcher := watch.NewFake() + defer podsWatcher.Stop() + + clientset := mockClientset{ + getLogsRespBody: tc.getLogsRespBody, + getLogsStatusCode: tc.getLogsStatusCode, + getLogsErr: tc.getLogsErr, + podsWatcher: podsWatcher, + } + clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil)) + + client := logs.KubernetesClient{Interface: &clientset} + selector := metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}} + + go func() { + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, + Status: corev1.PodStatus{Phase: corev1.PodPending}, + }, + } + for _, pod := range pods { + podsWatcher.Add(pod) + time.Sleep(time.Millisecond * 250) + } + }() + + var buf bytes.Buffer + pw := logs.NewPodWatcher(client, "mycontainer", &selector, &buf) + + err := pw.WatchPods(ctx) + + if tc.wantErr == "" { + require.NoError(t, err) + } else { + require.Contains(t, err.Error(), tc.wantErr) + } + if tc.wantOut != nil { + assert.ElementsMatch(t, tc.wantOut, splitBuf(&buf)) + } + }) + } +} diff --git a/logs/watcher_test.go b/logs/watcher_test.go index 14852bf..1471d5f 100644 --- a/logs/watcher_test.go +++ b/logs/watcher_test.go @@ -68,14 +68,16 @@ func TestWatcherWithPodWatcher(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) deploymentsWatcher := watch.NewFake() + defer deploymentsWatcher.Stop() podsWatcher := watch.NewFake() + defer podsWatcher.Stop() + clientset := testclient.NewSimpleClientset() clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil)) clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) go func() { - defer podsWatcher.Stop() - defer deploymentsWatcher.Stop() + defer cancel() deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}} deploymentsWatcher.Add(deployment) @@ -99,8 +101,6 @@ func TestWatcherWithPodWatcher(t *testing.T) { podsWatcher.Add(pod) time.Sleep(time.Millisecond * 250) } - - cancel() }() var buf bytes.Buffer