diff --git a/README.md b/README.md index 2e41c22..2fe6558 100644 --- a/README.md +++ b/README.md @@ -18,8 +18,8 @@ kubectl-persistent-logger -deployment mydeploymentname ## TODO - [ ] Implement for other relevant resource types (replica set, stateful set) -- [ ] Pass other options back to logger backend (--previous, --container, etc) -- [ ] Test coverage +- [ ] Pass other options back to logger backend (--container, etc) +- [x] Test coverage - [ ] Improve context handling - [ ] Remove/hide debug logging diff --git a/go.mod b/go.mod index 278acfa..cebb9c4 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module git.netflux.io/rob/kubectl-persistent-logger go 1.18 require ( + github.com/stretchr/testify v1.7.0 k8s.io/api v0.24.0 k8s.io/apimachinery v0.24.0 k8s.io/client-go v0.24.0 @@ -13,6 +14,7 @@ require ( github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful v2.9.5+incompatible // indirect + github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/go-logr/logr v1.2.0 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.19.5 // indirect @@ -28,6 +30,8 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/pflag v1.0.5 // indirect golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect diff --git a/go.sum b/go.sum index 1837262..03d5288 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,7 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84= github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= @@ -225,6 +226,7 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/logs/stream.go b/logs/stream.go index 237e553..54adeb1 100644 --- a/logs/stream.go +++ b/logs/stream.go @@ -8,7 +8,6 @@ import ( "sync" corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes" ) // concurrentWriter implements io.Writer. @@ -33,13 +32,13 @@ func (cw *concurrentWriter) Write(p []byte) (int, error) { // Stream represents the logstream from an individual pod. type Stream struct { - clientset *kubernetes.Clientset + clientset KubernetesClient pod *corev1.Pod dst io.Writer } // NewStream initializes a new Stream. -func NewStream(clientset *kubernetes.Clientset, pod *corev1.Pod, w io.Writer) *Stream { +func NewStream(clientset KubernetesClient, pod *corev1.Pod, w io.Writer) *Stream { return &Stream{ clientset: clientset, pod: pod, diff --git a/logs/stream_test.go b/logs/stream_test.go new file mode 100644 index 0000000..62f6478 --- /dev/null +++ b/logs/stream_test.go @@ -0,0 +1,26 @@ +package logs_test + +import ( + "bytes" + "context" + "testing" + + "git.netflux.io/rob/kubectl-persistent-logger/logs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testclient "k8s.io/client-go/kubernetes/fake" +) + +func TestStream(t *testing.T) { + client := logs.KubernetesClient{Interface: testclient.NewSimpleClientset()} + + var buf bytes.Buffer + pod := corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}} + stream := logs.NewStream(client, &pod, &buf) + + err := stream.Copy(context.Background()) + require.NoError(t, err) + assert.Equal(t, "[foo] fake logs\n", buf.String()) +} diff --git a/logs/watcher.go b/logs/watcher.go index 9500bc3..7f24b4f 100644 --- a/logs/watcher.go +++ b/logs/watcher.go @@ -4,7 +4,6 @@ import ( "context" "io" "log" - "os" "time" corev1 "k8s.io/api/core/v1" @@ -14,28 +13,34 @@ import ( "k8s.io/client-go/kubernetes" ) +type KubernetesClient struct { + kubernetes.Interface +} + // Watcher watches a deployment and tails the logs for its currently active // pods. type Watcher struct { deployName string - clientset *kubernetes.Clientset + clientset KubernetesClient spec map[string]*corev1.Pod status map[string]*Stream + dst io.Writer } // NewWatcher creates a new Watcher. -func NewWatcher(deployName string, clientset *kubernetes.Clientset) *Watcher { +func NewWatcher(deployName string, clientset KubernetesClient, dst io.Writer) *Watcher { return &Watcher{ deployName: deployName, clientset: clientset, spec: make(map[string]*corev1.Pod), status: make(map[string]*Stream), + dst: dst, } } // Watch watches a deployment. func (w *Watcher) Watch(ctx context.Context) error { - dst := NewConcurrentWriter(os.Stdout) + dst := NewConcurrentWriter(w.dst) ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -65,6 +70,9 @@ func (w *Watcher) Watch(ctx context.Context) error { for { select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: // Iterate through the desired state (w.spec) and launch goroutines to // process the logs of any missing pods. diff --git a/logs/watcher_test.go b/logs/watcher_test.go new file mode 100644 index 0000000..6d87d9f --- /dev/null +++ b/logs/watcher_test.go @@ -0,0 +1,70 @@ +package logs_test + +import ( + "bytes" + "context" + "testing" + "time" + + "git.netflux.io/rob/kubectl-persistent-logger/logs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + testclient "k8s.io/client-go/kubernetes/fake" + k8stest "k8s.io/client-go/testing" +) + +func TestWatcher(t *testing.T) { + clientset := testclient.NewSimpleClientset( + &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mydeployment", + Namespace: "default", + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"mylabelname": "mylabelvalue"}, + }, + }, + }, + ) + + k8swatcher := watch.NewFake() + clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(k8swatcher, nil)) + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, + Status: corev1.PodStatus{Phase: corev1.PodPending}, + }, + } + + go func() { + defer k8swatcher.Stop() + for _, pod := range pods { + time.Sleep(time.Millisecond * 300) + k8swatcher.Add(pod) + } + }() + + client := logs.KubernetesClient{Interface: clientset} + + var buf bytes.Buffer + watcher := logs.NewWatcher("mydeployment", client, &buf) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + err := watcher.Watch(ctx) + require.EqualError(t, err, context.DeadlineExceeded.Error()) + assert.Equal(t, "[foo] fake logs\n[bar] fake logs\n", buf.String()) +} diff --git a/main.go b/main.go index db15477..55c8a30 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "flag" "log" + "os" "path/filepath" "git.netflux.io/rob/kubectl-persistent-logger/logs" @@ -34,7 +35,7 @@ func main() { } ctx := context.Background() - watcher := logs.NewWatcher(*deployName, clientset) + watcher := logs.NewWatcher(*deployName, logs.KubernetesClient{Interface: clientset}, os.Stdout) if err := watcher.Watch(ctx); err != nil { log.Fatal(err) }