kubectl-persistent-logger/logs/watcher_test.go

139 lines
4.5 KiB
Go

package logs_test
import (
"bytes"
"context"
"errors"
"io"
"strings"
"testing"
"time"
"git.netflux.io/rob/kubectl-persistent-logger/logs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
testclient "k8s.io/client-go/kubernetes/fake"
k8stest "k8s.io/client-go/testing"
)
type mockPodWatcher struct{ err error }
func (m *mockPodWatcher) WatchPods(ctx context.Context) error { return m.err }
func (m *mockPodWatcher) Close() {}
func mockPodwatcherFunc(err error) logs.PodWatcherFunc {
return func(logs.KubernetesClient, string, *metav1.LabelSelector, io.Writer) logs.PodWatcherInterface {
return &mockPodWatcher{err: err}
}
}
func TestWatcherAllowNonExistent(t *testing.T) {
clientset := testclient.NewSimpleClientset()
var buf bytes.Buffer
client := logs.KubernetesClient{Interface: clientset}
watcher := logs.NewWatcher("mydeployment", "mycontainer", true, client, mockPodwatcherFunc(nil), &buf)
err := watcher.Watch(context.Background())
assert.EqualError(t, err, `deployments.apps "mydeployment" not found`)
}
func TestWatcherPodWatcherError(t *testing.T) {
deploymentsWatcher := watch.NewFake()
clientset := testclient.NewSimpleClientset(
&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}},
)
clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil))
var buf bytes.Buffer
client := logs.KubernetesClient{Interface: clientset}
wantErr := errors.New("foo")
watcher := logs.NewWatcher("mydeployment", "mycontainer", true, client, mockPodwatcherFunc(wantErr), &buf)
go func() {
defer deploymentsWatcher.Stop()
deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}}
deploymentsWatcher.Add(deployment)
}()
err := watcher.Watch(context.Background())
assert.EqualError(t, err, wantErr.Error())
}
func TestWatcherClosedChannel(t *testing.T) {
deploymentsWatcher := watch.NewFake()
clientset := testclient.NewSimpleClientset(
&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}},
)
clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil))
var buf bytes.Buffer
client := logs.KubernetesClient{Interface: clientset}
watcher := logs.NewWatcher("mydeployment", "mycontainer", true, client, nil, &buf)
go deploymentsWatcher.Stop()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
err := watcher.Watch(ctx)
require.Equal(t, context.DeadlineExceeded, err)
assert.Equal(t, "", buf.String())
}
func TestWatcherWithPodWatcher(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
deploymentsWatcher := watch.NewFake()
defer deploymentsWatcher.Stop()
podsWatcher := watch.NewFake()
defer podsWatcher.Stop()
clientset := testclient.NewSimpleClientset()
clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil))
clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil))
go func() {
deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}}
deploymentsWatcher.Add(deployment)
time.Sleep(time.Millisecond * 250)
pods := []*corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"},
Status: corev1.PodStatus{Phase: corev1.PodPending},
},
}
for _, pod := range pods {
podsWatcher.Add(pod)
time.Sleep(time.Millisecond * 250)
}
}()
var buf bytes.Buffer
client := logs.KubernetesClient{Interface: clientset}
watcher := logs.NewWatcher("mydeployment", "mycontainer", false, client, logs.NewPodWatcher, &buf)
err := watcher.Watch(ctx)
require.EqualError(t, err, context.DeadlineExceeded.Error())
lines := bufToLines(&buf)
assert.Len(t, lines, 2)
assert.ElementsMatch(t, []string{"[foo] fake logs", "[bar] fake logs"}, bufToLines(&buf))
}
func bufToLines(buf *bytes.Buffer) []string {
return strings.Split(strings.TrimSpace(buf.String()), "\n")
}