package logs_test import ( "bytes" "context" "errors" "io" "log" "strings" "testing" "time" "git.netflux.io/rob/kubectl-persistent-logger/logs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" dynamicclient "k8s.io/client-go/dynamic/fake" testclient "k8s.io/client-go/kubernetes/fake" k8stest "k8s.io/client-go/testing" ) type mockPodWatcher struct{ err error } func (m *mockPodWatcher) WatchPods(ctx context.Context) error { return m.err } func (m *mockPodWatcher) Close() {} func mockPodwatcherFunc(err error) logs.PodWatcherFunc { return func(logs.KubernetesClient, string, labels.Selector, io.Writer, *log.Logger) logs.PodWatcherInterface { return &mockPodWatcher{err: err} } } func discardLogger() *log.Logger { return log.New(io.Discard, "", 0) } func buildDeployment(t *testing.T, name string) *unstructured.Unstructured { deployment := new(unstructured.Unstructured) deployment.SetAPIVersion("v1") deployment.SetKind("deployment") deployment.SetName("mydeployment") deployment.SetNamespace("default") deployment.SetUID(types.UID("foo")) require.NoError(t, unstructured.SetNestedField(deployment.Object, map[string]any{"app": "myapp"}, "spec", "selector", "matchLabels")) return deployment } func TestWatcherStrictExist(t *testing.T) { client := logs.KubernetesClient{Untyped: dynamicclient.NewSimpleDynamicClient(runtime.NewScheme())} var buf bytes.Buffer params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default", StrictExist: true} watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(nil), &buf, discardLogger()) err := watcher.Watch(context.Background()) assert.EqualError(t, err, `deployments.apps "mydeployment" not found`) } func TestWatcherPodWatcherError(t *testing.T) { deploymentsWatcher := watch.NewFake() untypedClient := dynamicclient.NewSimpleDynamicClient(runtime.NewScheme()) untypedClient.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) client := logs.KubernetesClient{Untyped: untypedClient} var buf bytes.Buffer wantErr := errors.New("foo") params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"} watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(wantErr), &buf, discardLogger()) go func() { defer deploymentsWatcher.Stop() deployment := buildDeployment(t, "mydeployment") deploymentsWatcher.Add(deployment) }() err := watcher.Watch(context.Background()) assert.EqualError(t, err, wantErr.Error()) } func TestWatcherClosedChannel(t *testing.T) { deploymentsWatcher := watch.NewFake() untypedClient := dynamicclient.NewSimpleDynamicClient(runtime.NewScheme()) untypedClient.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) client := logs.KubernetesClient{Untyped: untypedClient} var buf bytes.Buffer params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"} watcher := logs.NewWatcher(params, client, nil, &buf, discardLogger()) // Immediately stop the watcher, which closes the ResultChan. // This should be expected to be handled. deploymentsWatcher.Stop() ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) defer cancel() err := watcher.Watch(ctx) require.Equal(t, context.DeadlineExceeded, err) assert.Equal(t, "", buf.String()) } func TestWatcherWithPodWatcher(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() deploymentsWatcher := watch.NewFake() defer deploymentsWatcher.Stop() podsWatcher := watch.NewFake() defer podsWatcher.Stop() typedClient := testclient.NewSimpleClientset() typedClient.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil)) untypedClient := dynamicclient.NewSimpleDynamicClient(runtime.NewScheme()) untypedClient.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil)) client := logs.KubernetesClient{Typed: typedClient, Untyped: untypedClient} go func() { deployment := buildDeployment(t, "mydeployment") deploymentsWatcher.Add(deployment) time.Sleep(time.Millisecond * 250) pods := []*corev1.Pod{ {ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, {ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}, {ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodPending}}, } for _, pod := range pods { podsWatcher.Add(pod) time.Sleep(time.Millisecond * 250) } }() var buf bytes.Buffer params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"} watcher := logs.NewWatcher(params, client, logs.NewPodWatcher, &buf, discardLogger()) err := watcher.Watch(ctx) require.EqualError(t, err, context.DeadlineExceeded.Error()) lines := bufToLines(&buf) require.Len(t, lines, 2) assert.ElementsMatch(t, []string{"[foo] fake logs", "[bar] fake logs"}, lines) } func bufToLines(buf *bytes.Buffer) []string { return strings.Split(strings.TrimSpace(buf.String()), "\n") }