2022-06-02 17:23:47 +00:00
|
|
|
package logs_test
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"net/http"
|
|
|
|
"strings"
|
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"git.netflux.io/rob/kubectl-persistent-logger/logs"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
2022-06-09 16:27:17 +00:00
|
|
|
"k8s.io/apimachinery/pkg/labels"
|
2022-06-02 17:23:47 +00:00
|
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
|
|
testclient "k8s.io/client-go/kubernetes/fake"
|
|
|
|
"k8s.io/client-go/kubernetes/scheme"
|
|
|
|
clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
|
|
"k8s.io/client-go/rest"
|
|
|
|
fakerest "k8s.io/client-go/rest/fake"
|
|
|
|
k8stest "k8s.io/client-go/testing"
|
|
|
|
)
|
|
|
|
|
|
|
|
type mockClientset struct {
|
|
|
|
testclient.Clientset
|
|
|
|
clientv1.CoreV1Interface
|
|
|
|
clientv1.PodInterface
|
|
|
|
|
|
|
|
podsWatcher watch.Interface
|
|
|
|
getLogsStatusCode int
|
2022-06-12 13:26:27 +00:00
|
|
|
getLogsReaderFunc func() io.ReadCloser
|
2022-06-02 17:23:47 +00:00
|
|
|
getLogsErr error
|
|
|
|
}
|
|
|
|
|
|
|
|
// CoreV1 returns the mock itself as the CoreV1 API group client.
func (m *mockClientset) CoreV1() clientv1.CoreV1Interface {
	return m
}
|
|
|
|
// Pods returns the mock itself as the pod client. The namespace argument is
// ignored.
func (m *mockClientset) Pods(_ string) clientv1.PodInterface {
	return m
}
|
|
|
|
func (m *mockClientset) Watch(context.Context, metav1.ListOptions) (watch.Interface, error) {
|
|
|
|
return m.podsWatcher, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *mockClientset) GetLogs(podName string, _ *corev1.PodLogOptions) *rest.Request {
|
|
|
|
fakeClient := fakerest.RESTClient{
|
|
|
|
Client: fakerest.CreateHTTPClient(func(*http.Request) (*http.Response, error) {
|
|
|
|
if m.getLogsErr != nil {
|
|
|
|
return nil, m.getLogsErr
|
|
|
|
}
|
|
|
|
return &http.Response{
|
|
|
|
StatusCode: m.getLogsStatusCode,
|
2022-06-12 13:26:27 +00:00
|
|
|
Body: m.getLogsReaderFunc(),
|
2022-06-02 17:23:47 +00:00
|
|
|
}, nil
|
|
|
|
}),
|
|
|
|
NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
|
|
|
|
VersionedAPIPath: fmt.Sprintf("/api/v1/namespaces/default/pods/%s/log", podName),
|
|
|
|
}
|
|
|
|
return fakeClient.Request()
|
|
|
|
}
|
|
|
|
|
2022-06-09 16:27:17 +00:00
|
|
|
func TestPodWatcherClose(t *testing.T) {
|
2022-06-03 20:20:23 +00:00
|
|
|
podsWatcher := watch.NewFake()
|
|
|
|
|
|
|
|
clientset := mockClientset{
|
2022-06-12 13:26:27 +00:00
|
|
|
getLogsReaderFunc: func() io.ReadCloser { return io.NopCloser(strings.NewReader("it worked")) },
|
2022-06-03 20:20:23 +00:00
|
|
|
getLogsStatusCode: http.StatusOK,
|
|
|
|
podsWatcher: podsWatcher,
|
|
|
|
}
|
|
|
|
clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil))
|
|
|
|
|
2022-06-09 16:27:17 +00:00
|
|
|
client := logs.KubernetesClient{Typed: &clientset}
|
|
|
|
selector := labels.SelectorFromSet(map[string]string{"foo": "bar"})
|
|
|
|
|
|
|
|
var buf bytes.Buffer
|
2022-06-14 05:47:38 +00:00
|
|
|
pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &buf, discardLogger())
|
2022-06-03 20:20:23 +00:00
|
|
|
|
|
|
|
go func() {
|
2022-06-09 16:27:17 +00:00
|
|
|
podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
|
|
|
time.Sleep(time.Second)
|
|
|
|
// Close() should cause the watcher to return cleanly:
|
|
|
|
pw.Close()
|
2022-06-03 20:20:23 +00:00
|
|
|
}()
|
|
|
|
|
2022-06-09 16:27:17 +00:00
|
|
|
err := pw.WatchPods(context.Background())
|
|
|
|
require.NoError(t, err)
|
2022-06-03 20:20:23 +00:00
|
|
|
|
2022-06-04 00:27:20 +00:00
|
|
|
assert.Equal(t, "[foo] it worked\n", buf.String())
|
2022-06-03 20:20:23 +00:00
|
|
|
}
|
|
|
|
|
2022-06-14 05:44:55 +00:00
|
|
|
// makeReadCloserIterator returns a function which hands out each of the
// provided ReadClosers in turn, one per call, in the order they were given.
//
// Once every provided ReadCloser has been handed out (or when none were
// provided at all), further calls return a fresh, empty ReadCloser rather
// than panicking with an index-out-of-range error. An unexpected additional
// request therefore sees an empty body instead of triggering a runtime panic.
//
// The returned function keeps an unsynchronised position counter and is not
// safe for concurrent use.
func makeReadCloserIterator(rcs ...io.ReadCloser) func() io.ReadCloser {
	next := 0

	return func() io.ReadCloser {
		// Guard against exhaustion: indexing past the end would panic.
		if next >= len(rcs) {
			return io.NopCloser(strings.NewReader(""))
		}

		rc := rcs[next]
		next++
		return rc
	}
}
|
|
|
|
|
|
|
|
func TestPodWatcherRemovedPod(t *testing.T) {
|
|
|
|
podsWatcher := watch.NewFake()
|
|
|
|
r1, w1 := io.Pipe()
|
|
|
|
r2, w2 := io.Pipe()
|
|
|
|
|
|
|
|
clientset := mockClientset{
|
2022-06-12 20:06:07 +00:00
|
|
|
getLogsReaderFunc: makeReadCloserIterator(r1, r2),
|
2022-06-12 13:26:27 +00:00
|
|
|
getLogsStatusCode: http.StatusOK,
|
|
|
|
podsWatcher: podsWatcher,
|
|
|
|
}
|
|
|
|
clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil))
|
|
|
|
|
|
|
|
client := logs.KubernetesClient{Typed: &clientset}
|
|
|
|
selector := labels.SelectorFromSet(map[string]string{"foo": "bar"})
|
|
|
|
|
|
|
|
var buf bytes.Buffer
|
2022-06-14 05:47:38 +00:00
|
|
|
pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &buf, discardLogger())
|
2022-06-12 13:26:27 +00:00
|
|
|
|
|
|
|
go func() {
|
|
|
|
defer pw.Close()
|
|
|
|
|
|
|
|
podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
|
|
|
time.Sleep(time.Millisecond * 500)
|
|
|
|
w1.Write([]byte("should be logged\n"))
|
|
|
|
podsWatcher.Delete(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
|
|
|
time.Sleep(time.Millisecond * 500)
|
|
|
|
w1.Write([]byte("should not be logged\n"))
|
|
|
|
time.Sleep(time.Millisecond * 500)
|
|
|
|
podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
|
|
|
time.Sleep(time.Millisecond * 500)
|
|
|
|
w2.Write([]byte("should be logged\n"))
|
|
|
|
time.Sleep(time.Millisecond * 500)
|
|
|
|
}()
|
|
|
|
|
|
|
|
err := pw.WatchPods(context.Background())
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
assert.Equal(t, "[foo] should be logged\n[bar] should be logged\n", buf.String())
|
|
|
|
}
|
|
|
|
|
2022-06-02 17:23:47 +00:00
|
|
|
func TestPodWatcher(t *testing.T) {
|
|
|
|
testCases := []struct {
|
|
|
|
name string
|
2022-06-09 16:27:17 +00:00
|
|
|
podEvents []*corev1.Pod
|
2022-06-02 17:23:47 +00:00
|
|
|
getLogsRespBody string
|
|
|
|
getLogsStatusCode int
|
|
|
|
getLogsErr error
|
|
|
|
wantOut []string
|
|
|
|
wantErr string
|
|
|
|
}{
|
|
|
|
{
|
2022-06-09 16:27:17 +00:00
|
|
|
name: "unexpected error getting logs",
|
|
|
|
podEvents: []*corev1.Pod{
|
|
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
|
|
|
},
|
2022-06-02 17:23:47 +00:00
|
|
|
getLogsErr: errors.New("nope"),
|
|
|
|
wantOut: nil,
|
|
|
|
wantErr: "nope",
|
|
|
|
},
|
|
|
|
{
|
2022-06-09 16:27:17 +00:00
|
|
|
name: "recoverable error getting logs",
|
|
|
|
podEvents: []*corev1.Pod{
|
|
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
|
|
|
},
|
2022-06-02 17:23:47 +00:00
|
|
|
getLogsErr: &apierrors.StatusError{ErrStatus: metav1.Status{Message: "is waiting to start: ContainerCreating", Reason: metav1.StatusReasonBadRequest}},
|
|
|
|
wantOut: nil,
|
|
|
|
},
|
|
|
|
{
|
2022-06-09 16:27:17 +00:00
|
|
|
name: "success",
|
|
|
|
podEvents: []*corev1.Pod{
|
|
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
|
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
|
|
|
{ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodPending}},
|
|
|
|
},
|
2022-06-02 17:23:47 +00:00
|
|
|
getLogsRespBody: "some logs",
|
|
|
|
getLogsStatusCode: http.StatusOK,
|
|
|
|
wantOut: []string{"[foo] some logs", "[bar] some logs"},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, tc := range testCases {
|
|
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
|
|
podsWatcher := watch.NewFake()
|
|
|
|
|
|
|
|
clientset := mockClientset{
|
2022-06-12 13:26:27 +00:00
|
|
|
getLogsReaderFunc: func() io.ReadCloser { return io.NopCloser(strings.NewReader(tc.getLogsRespBody)) },
|
2022-06-02 17:23:47 +00:00
|
|
|
getLogsStatusCode: tc.getLogsStatusCode,
|
|
|
|
getLogsErr: tc.getLogsErr,
|
|
|
|
podsWatcher: podsWatcher,
|
|
|
|
}
|
|
|
|
clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil))
|
|
|
|
|
2022-06-09 16:27:17 +00:00
|
|
|
client := logs.KubernetesClient{Typed: &clientset}
|
|
|
|
selector := labels.SelectorFromSet(map[string]string{"foo": "bar"})
|
|
|
|
|
|
|
|
var buf bytes.Buffer
|
2022-06-14 05:47:38 +00:00
|
|
|
pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &buf, discardLogger())
|
2022-06-02 17:23:47 +00:00
|
|
|
|
|
|
|
go func() {
|
2022-06-09 16:27:17 +00:00
|
|
|
for _, pod := range tc.podEvents {
|
2022-06-02 17:23:47 +00:00
|
|
|
podsWatcher.Add(pod)
|
2022-06-09 16:27:17 +00:00
|
|
|
time.Sleep(time.Millisecond * 500)
|
2022-06-02 17:23:47 +00:00
|
|
|
}
|
2022-06-09 16:27:17 +00:00
|
|
|
pw.Close()
|
2022-06-02 17:23:47 +00:00
|
|
|
}()
|
|
|
|
|
2022-06-09 16:27:17 +00:00
|
|
|
err := pw.WatchPods(context.Background())
|
2022-06-02 17:23:47 +00:00
|
|
|
|
|
|
|
if tc.wantErr == "" {
|
|
|
|
require.NoError(t, err)
|
|
|
|
} else {
|
2022-06-09 16:27:17 +00:00
|
|
|
require.Error(t, err)
|
2022-06-02 17:23:47 +00:00
|
|
|
require.Contains(t, err.Error(), tc.wantErr)
|
|
|
|
}
|
2022-06-09 16:27:17 +00:00
|
|
|
|
2022-06-02 17:23:47 +00:00
|
|
|
if tc.wantOut != nil {
|
2022-06-09 16:27:17 +00:00
|
|
|
lines := bufToLines(&buf)
|
|
|
|
require.Len(t, lines, len(tc.wantOut))
|
|
|
|
assert.ElementsMatch(t, tc.wantOut, lines)
|
2022-06-02 17:23:47 +00:00
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|