package logs_test

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"
	"testing"
	"time"

	"git.netflux.io/rob/kubectl-persistent-logger/logs"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/watch"
	testclient "k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/kubernetes/scheme"
	clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	fakerest "k8s.io/client-go/rest/fake"
	k8stest "k8s.io/client-go/testing"
)

// mockClientset is a test double that plays three roles at once: the
// clientset, its CoreV1 client and its pods client. CoreV1 and Pods both
// return the mock itself, so only Watch and GetLogs need real behaviour here.
type mockClientset struct {
	testclient.Clientset
	clientv1.CoreV1Interface
	clientv1.PodInterface

	// podsWatcher is handed back verbatim by Watch.
	podsWatcher watch.Interface

	// The getLogs* fields configure the canned HTTP exchange that backs
	// every request built by GetLogs.
	getLogsStatusCode int
	getLogsReaderFunc func() io.ReadCloser
	getLogsErr        error
}

// CoreV1 returns the mock itself as the CoreV1 client.
func (m *mockClientset) CoreV1() clientv1.CoreV1Interface {
	return m
}

// Pods returns the mock itself as the pods client; the namespace argument is
// ignored.
func (m *mockClientset) Pods(_ string) clientv1.PodInterface {
	return m
}

// Watch returns the configured podsWatcher and never fails. Both arguments
// are ignored.
func (m *mockClientset) Watch(_ context.Context, _ metav1.ListOptions) (watch.Interface, error) {
	return m.podsWatcher, nil
}

// GetLogs builds a request against a fake REST client. When the request is
// executed the round trip returns getLogsErr if it is set; otherwise it
// returns a response carrying getLogsStatusCode and a body obtained from
// getLogsReaderFunc. The log options are ignored.
func (m *mockClientset) GetLogs(podName string, _ *corev1.PodLogOptions) *rest.Request {
	roundTrip := func(*http.Request) (*http.Response, error) {
		if err := m.getLogsErr; err != nil {
			return nil, err
		}

		resp := &http.Response{
			StatusCode: m.getLogsStatusCode,
			Body:       m.getLogsReaderFunc(),
		}
		return resp, nil
	}

	restClient := fakerest.RESTClient{
		Client:               fakerest.CreateHTTPClient(roundTrip),
		NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
		VersionedAPIPath:     fmt.Sprintf("/api/v1/namespaces/default/pods/%s/log", podName),
	}
	return restClient.Request()
}

// TestPodWatcherClose checks that WatchPods returns without error once Close
// is called, and that the logs of the pod added beforehand reached the
// output.
func TestPodWatcherClose(t *testing.T) {
	fakeWatch := watch.NewFake()

	clientset := mockClientset{
		getLogsReaderFunc: func() io.ReadCloser {
			return io.NopCloser(strings.NewReader("it worked"))
		},
		getLogsStatusCode: http.StatusOK,
		podsWatcher:       fakeWatch,
	}
	clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(fakeWatch, nil))

	var out bytes.Buffer
	pw := logs.NewPodWatcher(
		logs.KubernetesClient{Typed: &clientset},
		corev1.NamespaceDefault,
		"mycontainer",
		labels.SelectorFromSet(map[string]string{"foo": "bar"}),
		&out,
		discardLogger(),
	)

	go func() {
		fakeWatch.Add(&corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"},
			Status:     corev1.PodStatus{Phase: corev1.PodRunning},
		})
		time.Sleep(time.Second)
		// Closing the watcher is expected to make WatchPods return cleanly.
		pw.Close()
	}()

	require.NoError(t, pw.WatchPods(context.Background()))
	assert.Equal(t, "[foo] it worked\n", out.String())
}

// makeReadCloserIterator returns a function that yields the given
// ReadClosers one per call, in the order provided. Calling it more times
// than there are ReadClosers panics with an index-out-of-range error.
func makeReadCloserIterator(rcs ...io.ReadCloser) func() io.ReadCloser {
	return func() io.ReadCloser {
		next := rcs[0]
		rcs = rcs[1:]
		return next
	}
}

// TestPodWatcherRemovedPod checks that data written to a pod's log stream
// after the pod's Deleted event does not reach the output, while the logs of
// a pod added afterwards still do.
func TestPodWatcherRemovedPod(t *testing.T) {
	// pause separates consecutive steps of the event script below.
	const pause = 500 * time.Millisecond

	runningPod := func(name string) *corev1.Pod {
		return &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default", ResourceVersion: "1"},
			Status:     corev1.PodStatus{Phase: corev1.PodRunning},
		}
	}

	fakeWatch := watch.NewFake()

	// One pipe per expected log request: the iterator hands out the first
	// reader to the first request and the second reader to the next one.
	firstLogs, firstLogsW := io.Pipe()
	secondLogs, secondLogsW := io.Pipe()

	clientset := mockClientset{
		getLogsReaderFunc: makeReadCloserIterator(firstLogs, secondLogs),
		getLogsStatusCode: http.StatusOK,
		podsWatcher:       fakeWatch,
	}
	clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(fakeWatch, nil))

	var out bytes.Buffer
	pw := logs.NewPodWatcher(
		logs.KubernetesClient{Typed: &clientset},
		corev1.NamespaceDefault,
		"mycontainer",
		labels.SelectorFromSet(map[string]string{"foo": "bar"}),
		&out,
		discardLogger(),
	)

	go func() {
		defer pw.Close()

		fakeWatch.Add(runningPod("foo"))
		time.Sleep(pause)
		firstLogsW.Write([]byte("should be logged\n"))

		fakeWatch.Delete(runningPod("foo"))
		time.Sleep(pause)
		// Written after the Deleted event; the assertion below expects this
		// line to be absent from the output.
		firstLogsW.Write([]byte("should not be logged\n"))
		time.Sleep(pause)

		fakeWatch.Add(runningPod("bar"))
		time.Sleep(pause)
		secondLogsW.Write([]byte("should be logged\n"))
		time.Sleep(pause)
	}()

	require.NoError(t, pw.WatchPods(context.Background()))
	assert.Equal(t, "[foo] should be logged\n[bar] should be logged\n", out.String())
}

// TestPodWatcher is a table-driven test. Each case sends a series of pod
// Added events, closes the watcher, and then checks the error returned by
// WatchPods. When wantOut is non-nil the output lines are also compared,
// ignoring their order.
func TestPodWatcher(t *testing.T) {
	type testCase struct {
		name              string
		podEvents         []*corev1.Pod
		getLogsRespBody   string
		getLogsStatusCode int
		getLogsErr        error
		wantOut           []string
		wantErr           string
	}

	newPod := func(name string, phase corev1.PodPhase) *corev1.Pod {
		return &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default", ResourceVersion: "1"},
			Status:     corev1.PodStatus{Phase: phase},
		}
	}

	testCases := []testCase{
		{
			name:       "unexpected error getting logs",
			podEvents:  []*corev1.Pod{newPod("foo", corev1.PodRunning)},
			getLogsErr: errors.New("nope"),
			wantErr:    "nope",
		},
		{
			name:      "recoverable error getting logs",
			podEvents: []*corev1.Pod{newPod("foo", corev1.PodRunning)},
			getLogsErr: &apierrors.StatusError{
				ErrStatus: metav1.Status{
					Message: "is waiting to start: ContainerCreating",
					Reason:  metav1.StatusReasonBadRequest,
				},
			},
		},
		{
			name: "success",
			podEvents: []*corev1.Pod{
				newPod("foo", corev1.PodRunning),
				newPod("bar", corev1.PodRunning),
				newPod("baz", corev1.PodPending),
			},
			getLogsRespBody:   "some logs",
			getLogsStatusCode: http.StatusOK,
			wantOut:           []string{"[foo] some logs", "[bar] some logs"},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			fakeWatch := watch.NewFake()

			clientset := mockClientset{
				getLogsReaderFunc: func() io.ReadCloser {
					return io.NopCloser(strings.NewReader(tc.getLogsRespBody))
				},
				getLogsStatusCode: tc.getLogsStatusCode,
				getLogsErr:        tc.getLogsErr,
				podsWatcher:       fakeWatch,
			}
			clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(fakeWatch, nil))

			var out bytes.Buffer
			pw := logs.NewPodWatcher(
				logs.KubernetesClient{Typed: &clientset},
				corev1.NamespaceDefault,
				"mycontainer",
				labels.SelectorFromSet(map[string]string{"foo": "bar"}),
				&out,
				discardLogger(),
			)

			go func() {
				for _, pod := range tc.podEvents {
					fakeWatch.Add(pod)
					time.Sleep(time.Millisecond * 500)
				}
				pw.Close()
			}()

			err := pw.WatchPods(context.Background())
			if tc.wantErr != "" {
				require.Error(t, err)
				require.Contains(t, err.Error(), tc.wantErr)
			} else {
				require.NoError(t, err)
			}

			// A nil wantOut means the case makes no claim about the output.
			if tc.wantOut == nil {
				return
			}
			lines := bufToLines(&out)
			require.Len(t, lines, len(tc.wantOut))
			assert.ElementsMatch(t, tc.wantOut, lines)
		})
	}
}