159 lines
5.6 KiB
Go
159 lines
5.6 KiB
Go
package logs_test
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"log"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"git.netflux.io/rob/kubectl-persistent-logger/logs"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
dynamicclient "k8s.io/client-go/dynamic/fake"
|
|
testclient "k8s.io/client-go/kubernetes/fake"
|
|
k8stest "k8s.io/client-go/testing"
|
|
)
|
|
|
|
type mockPodWatcher struct{ err error }
|
|
|
|
func (m *mockPodWatcher) WatchPods(ctx context.Context) error { return m.err }
|
|
func (m *mockPodWatcher) Close() {}
|
|
|
|
func mockPodwatcherFunc(err error) logs.PodWatcherFunc {
|
|
return func(logs.KubernetesClient, string, string, labels.Selector, io.Writer, *log.Logger) logs.PodWatcherInterface {
|
|
return &mockPodWatcher{err: err}
|
|
}
|
|
}
|
|
|
|
func discardLogger() *log.Logger {
|
|
return log.New(io.Discard, "", 0)
|
|
}
|
|
|
|
func buildDeployment(t *testing.T, name string) *unstructured.Unstructured {
|
|
deployment := new(unstructured.Unstructured)
|
|
deployment.SetAPIVersion("v1")
|
|
deployment.SetKind("deployment")
|
|
deployment.SetName("mydeployment")
|
|
deployment.SetNamespace("default")
|
|
deployment.SetUID(types.UID("foo"))
|
|
deployment.SetResourceVersion("1")
|
|
require.NoError(t, unstructured.SetNestedField(deployment.Object, map[string]any{"app": "myapp"}, "spec", "selector", "matchLabels"))
|
|
return deployment
|
|
}
|
|
|
|
func TestWatcherStrictExist(t *testing.T) {
|
|
client := logs.KubernetesClient{Untyped: dynamicclient.NewSimpleDynamicClient(runtime.NewScheme())}
|
|
|
|
var buf bytes.Buffer
|
|
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default", StrictExist: true}
|
|
watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(nil), &buf, discardLogger())
|
|
|
|
err := watcher.Watch(context.Background())
|
|
assert.EqualError(t, err, `deployments.apps "mydeployment" not found`)
|
|
}
|
|
|
|
func TestWatcherPodWatcherError(t *testing.T) {
|
|
deploymentsWatcher := watch.NewFake()
|
|
|
|
untypedClient := dynamicclient.NewSimpleDynamicClient(runtime.NewScheme())
|
|
untypedClient.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil))
|
|
client := logs.KubernetesClient{Untyped: untypedClient}
|
|
|
|
var buf bytes.Buffer
|
|
wantErr := errors.New("foo")
|
|
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"}
|
|
watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(wantErr), &buf, discardLogger())
|
|
|
|
go func() {
|
|
defer deploymentsWatcher.Stop()
|
|
|
|
deployment := buildDeployment(t, "mydeployment")
|
|
deploymentsWatcher.Add(deployment)
|
|
}()
|
|
|
|
err := watcher.Watch(context.Background())
|
|
assert.EqualError(t, err, wantErr.Error())
|
|
}
|
|
|
|
func TestWatcherClosedChannel(t *testing.T) {
|
|
deploymentsWatcher := watch.NewFake()
|
|
|
|
untypedClient := dynamicclient.NewSimpleDynamicClient(runtime.NewScheme())
|
|
untypedClient.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil))
|
|
client := logs.KubernetesClient{Untyped: untypedClient}
|
|
|
|
var buf bytes.Buffer
|
|
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"}
|
|
watcher := logs.NewWatcher(params, client, nil, &buf, discardLogger())
|
|
// Immediately stop the watcher, which closes the ResultChan.
|
|
// This should be expected to be handled.
|
|
deploymentsWatcher.Stop()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
|
|
defer cancel()
|
|
|
|
err := watcher.Watch(ctx)
|
|
require.Equal(t, context.DeadlineExceeded, err)
|
|
assert.Equal(t, "", buf.String())
|
|
}
|
|
|
|
func TestWatcherWithPodWatcher(t *testing.T) {
|
|
// TODO: use watcher.Close() to clean up watcher instead of relying on
|
|
// context, which leads to occasional race conditions in tests.
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
|
|
defer cancel()
|
|
|
|
deploymentsWatcher := watch.NewFake()
|
|
defer deploymentsWatcher.Stop()
|
|
|
|
podsWatcher := watch.NewFake()
|
|
defer podsWatcher.Stop()
|
|
|
|
typedClient := testclient.NewSimpleClientset()
|
|
typedClient.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil))
|
|
untypedClient := dynamicclient.NewSimpleDynamicClient(runtime.NewScheme())
|
|
untypedClient.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil))
|
|
client := logs.KubernetesClient{Typed: typedClient, Untyped: untypedClient}
|
|
|
|
go func() {
|
|
deployment := buildDeployment(t, "mydeployment")
|
|
deploymentsWatcher.Add(deployment)
|
|
time.Sleep(time.Millisecond * 250)
|
|
|
|
pods := []*corev1.Pod{
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodPending}},
|
|
}
|
|
for _, pod := range pods {
|
|
podsWatcher.Add(pod)
|
|
time.Sleep(time.Millisecond * 250)
|
|
}
|
|
}()
|
|
|
|
var buf bytes.Buffer
|
|
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"}
|
|
watcher := logs.NewWatcher(params, client, logs.NewPodWatcher, &buf, discardLogger())
|
|
|
|
err := watcher.Watch(ctx)
|
|
require.EqualError(t, err, context.DeadlineExceeded.Error())
|
|
lines := bufToLines(&buf)
|
|
require.Len(t, lines, 2)
|
|
assert.ElementsMatch(t, []string{"[foo] fake logs", "[bar] fake logs"}, lines)
|
|
}
|
|
|
|
func bufToLines(buf *bytes.Buffer) []string {
|
|
return strings.Split(strings.TrimSpace(buf.String()), "\n")
|
|
}
|