package logs_test import ( "bytes" "context" "errors" "io" "strings" "testing" "time" "git.netflux.io/rob/kubectl-persistent-logger/logs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" testclient "k8s.io/client-go/kubernetes/fake" k8stest "k8s.io/client-go/testing" ) type mockPodWatcher struct{ err error } func (m *mockPodWatcher) WatchPods(ctx context.Context) error { return m.err } func (m *mockPodWatcher) Close() {} func mockPodwatcherFunc(err error) logs.PodWatcherFunc { return func(logs.KubernetesClient, string, *metav1.LabelSelector, io.Writer) logs.PodWatcherInterface { return &mockPodWatcher{err: err} } } func TestWatcherAllowNonExistent(t *testing.T) { clientset := testclient.NewSimpleClientset() var buf bytes.Buffer client := logs.KubernetesClient{Interface: clientset} watcher := logs.NewWatcher("mydeployment", "mycontainer", true, client, mockPodwatcherFunc(nil), &buf) err := watcher.Watch(context.Background()) assert.EqualError(t, err, `deployments.apps "mydeployment" not found`) } func TestWatcherPodWatcherError(t *testing.T) { deploymentsWatcher := watch.NewFake() clientset := testclient.NewSimpleClientset( &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}}, ) clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) var buf bytes.Buffer client := logs.KubernetesClient{Interface: clientset} wantErr := errors.New("foo") watcher := logs.NewWatcher("mydeployment", "mycontainer", true, client, mockPodwatcherFunc(wantErr), &buf) go func() { defer deploymentsWatcher.Stop() deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}} deploymentsWatcher.Add(deployment) }() err := watcher.Watch(context.Background()) assert.EqualError(t, err, wantErr.Error()) } func TestWatcherWithPodWatcher(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) deploymentsWatcher := watch.NewFake() podsWatcher := watch.NewFake() clientset := testclient.NewSimpleClientset() clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil)) clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) go func() { defer podsWatcher.Stop() defer deploymentsWatcher.Stop() deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}} deploymentsWatcher.Add(deployment) time.Sleep(time.Millisecond * 250) pods := []*corev1.Pod{ { ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}, }, { ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}, }, { ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodPending}, }, } for _, pod := range pods { podsWatcher.Add(pod) time.Sleep(time.Millisecond * 250) } cancel() }() var buf bytes.Buffer client := logs.KubernetesClient{Interface: clientset} watcher := logs.NewWatcher("mydeployment", "mycontainer", false, client, logs.NewPodWatcher, &buf) err := watcher.Watch(ctx) require.EqualError(t, err, context.Canceled.Error()) assert.ElementsMatch(t, []string{"[foo] fake logs", "[bar] fake logs"}, splitBuf(&buf)) } func splitBuf(buf *bytes.Buffer) []string { return strings.Split(strings.TrimSpace(buf.String()), "\n") }