package logs_test import ( "bytes" "context" "errors" "fmt" "io" "net/http" "strings" "testing" "time" "git.netflux.io/rob/kubectl-persistent-logger/logs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" testclient "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" clientv1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" fakerest "k8s.io/client-go/rest/fake" k8stest "k8s.io/client-go/testing" ) type mockClientset struct { testclient.Clientset clientv1.CoreV1Interface clientv1.PodInterface podsWatcher watch.Interface getLogsStatusCode int getLogsRespBody string getLogsErr error } func (m *mockClientset) CoreV1() clientv1.CoreV1Interface { return m } func (m *mockClientset) Pods(string) clientv1.PodInterface { return m } func (m *mockClientset) Watch(context.Context, metav1.ListOptions) (watch.Interface, error) { return m.podsWatcher, nil } func (m *mockClientset) GetLogs(podName string, _ *corev1.PodLogOptions) *rest.Request { fakeClient := fakerest.RESTClient{ Client: fakerest.CreateHTTPClient(func(*http.Request) (*http.Response, error) { if m.getLogsErr != nil { return nil, m.getLogsErr } return &http.Response{ StatusCode: m.getLogsStatusCode, Body: io.NopCloser(strings.NewReader(m.getLogsRespBody)), }, nil }), NegotiatedSerializer: scheme.Codecs.WithoutConversion(), VersionedAPIPath: fmt.Sprintf("/api/v1/namespaces/default/pods/%s/log", podName), } return fakeClient.Request() } func TestPodWatcher(t *testing.T) { testCases := []struct { name string getLogsRespBody string getLogsStatusCode int getLogsErr error wantOut []string wantErr string }{ { name: "unexpected error getting logs", getLogsErr: errors.New("nope"), wantOut: nil, wantErr: "nope", }, { name: "recoverable error getting logs", getLogsErr: &apierrors.StatusError{ErrStatus: metav1.Status{Message: "is waiting to start: ContainerCreating", Reason: metav1.StatusReasonBadRequest}}, wantOut: nil, wantErr: context.DeadlineExceeded.Error(), }, { name: "success", getLogsRespBody: "some logs", getLogsStatusCode: http.StatusOK, wantOut: []string{"[foo] some logs", "[bar] some logs"}, wantErr: context.DeadlineExceeded.Error(), }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() podsWatcher := watch.NewFake() defer podsWatcher.Stop() clientset := mockClientset{ getLogsRespBody: tc.getLogsRespBody, getLogsStatusCode: tc.getLogsStatusCode, getLogsErr: tc.getLogsErr, podsWatcher: podsWatcher, } clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil)) client := logs.KubernetesClient{Interface: &clientset} selector := metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}} go func() { pods := []*corev1.Pod{ { ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}, }, { ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}, }, { ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodPending}, }, } for _, pod := range pods { podsWatcher.Add(pod) time.Sleep(time.Millisecond * 250) } }() var buf bytes.Buffer pw := logs.NewPodWatcher(client, "mycontainer", &selector, &buf) err := pw.WatchPods(ctx) if tc.wantErr == "" { require.NoError(t, err) } else { require.Contains(t, err.Error(), tc.wantErr) } if tc.wantOut != nil { assert.ElementsMatch(t, tc.wantOut, splitBuf(&buf)) } }) } }