118 lines
3.7 KiB
Go
118 lines
3.7 KiB
Go
package logs_test
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"git.netflux.io/rob/kubectl-persistent-logger/logs"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
appsv1 "k8s.io/api/apps/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
testclient "k8s.io/client-go/kubernetes/fake"
|
|
k8stest "k8s.io/client-go/testing"
|
|
)
|
|
|
|
// mockPodWatcher is a test double whose WatchPods result is fixed when the
// value is constructed.
type mockPodWatcher struct {
	// err is handed back unchanged by WatchPods; nil makes WatchPods succeed.
	err error
}

// WatchPods ignores its context and returns the configured error immediately.
func (w *mockPodWatcher) WatchPods(context.Context) error {
	return w.err
}

// Close is a no-op.
func (*mockPodWatcher) Close() {}
|
|
|
|
func mockPodwatcherFunc(err error) logs.PodWatcherFunc {
|
|
return func(logs.KubernetesClient, string, *metav1.LabelSelector, io.Writer) logs.PodWatcherInterface {
|
|
return &mockPodWatcher{err: err}
|
|
}
|
|
}
|
|
|
|
func TestWatcherAllowNonExistent(t *testing.T) {
|
|
clientset := testclient.NewSimpleClientset()
|
|
|
|
var buf bytes.Buffer
|
|
client := logs.KubernetesClient{Interface: clientset}
|
|
watcher := logs.NewWatcher("mydeployment", "mycontainer", false, client, mockPodwatcherFunc(nil), &buf)
|
|
|
|
err := watcher.Watch(context.Background())
|
|
assert.EqualError(t, err, `deployments.apps "mydeployment" not found`)
|
|
}
|
|
|
|
func TestWatcherPodWatcherError(t *testing.T) {
|
|
deploymentsWatcher := watch.NewFake()
|
|
clientset := testclient.NewSimpleClientset(
|
|
&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}},
|
|
)
|
|
clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil))
|
|
|
|
var buf bytes.Buffer
|
|
client := logs.KubernetesClient{Interface: clientset}
|
|
wantErr := errors.New("foo")
|
|
watcher := logs.NewWatcher("mydeployment", "mycontainer", false, client, mockPodwatcherFunc(wantErr), &buf)
|
|
|
|
go func() {
|
|
defer deploymentsWatcher.Stop()
|
|
deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}}
|
|
deploymentsWatcher.Add(deployment)
|
|
}()
|
|
|
|
err := watcher.Watch(context.Background())
|
|
assert.EqualError(t, err, wantErr.Error())
|
|
}
|
|
|
|
func TestWatcherWithPodWatcher(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
deploymentsWatcher := watch.NewFake()
|
|
podsWatcher := watch.NewFake()
|
|
clientset := testclient.NewSimpleClientset()
|
|
clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil))
|
|
clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil))
|
|
|
|
go func() {
|
|
defer podsWatcher.Stop()
|
|
defer deploymentsWatcher.Stop()
|
|
|
|
deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}}
|
|
deploymentsWatcher.Add(deployment)
|
|
time.Sleep(time.Millisecond * 250)
|
|
|
|
pods := []*corev1.Pod{
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"},
|
|
Status: corev1.PodStatus{Phase: corev1.PodRunning},
|
|
},
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"},
|
|
Status: corev1.PodStatus{Phase: corev1.PodRunning},
|
|
},
|
|
{
|
|
ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"},
|
|
Status: corev1.PodStatus{Phase: corev1.PodPending},
|
|
},
|
|
}
|
|
for _, pod := range pods {
|
|
podsWatcher.Add(pod)
|
|
time.Sleep(time.Millisecond * 250)
|
|
}
|
|
|
|
cancel()
|
|
}()
|
|
|
|
var buf bytes.Buffer
|
|
client := logs.KubernetesClient{Interface: clientset}
|
|
watcher := logs.NewWatcher("mydeployment", "mycontainer", true, client, logs.NewPodWatcher, &buf)
|
|
|
|
err := watcher.Watch(ctx)
|
|
require.EqualError(t, err, context.Canceled.Error())
|
|
assert.ElementsMatch(t, []string{"[foo] fake logs", "[bar] fake logs"}, splitBuf(&buf))
|
|
}
|
|
|
|
// splitBuf returns the contents of buf as individual lines. Leading and
// trailing whitespace around the whole output is trimmed before splitting on
// "\n". A buffer that is empty or holds only whitespace yields no lines.
func splitBuf(buf *bytes.Buffer) []string {
	trimmed := strings.TrimSpace(buf.String())
	if trimmed == "" {
		// strings.Split("", "\n") would return [""], which is
		// indistinguishable from a single blank log line.
		return nil
	}
	return strings.Split(trimmed, "\n")
}
|