diff --git a/logs/pod_watcher.go b/logs/pod_watcher.go index 5e94c91..ae0b678 100644 --- a/logs/pod_watcher.go +++ b/logs/pod_watcher.go @@ -48,10 +48,11 @@ type PodWatcher struct { streamResults chan error dst io.Writer closeChan chan struct{} + logger *log.Logger } // NewPodWatcher initializes a new PodWatcher. -func NewPodWatcher(client KubernetesClient, container string, labelSelector labels.Selector, dst io.Writer) PodWatcherInterface { +func NewPodWatcher(client KubernetesClient, container string, labelSelector labels.Selector, dst io.Writer, logger *log.Logger) PodWatcherInterface { return &PodWatcher{ client: client, container: container, @@ -61,6 +62,7 @@ func NewPodWatcher(client KubernetesClient, container string, labelSelector labe streamResults: make(chan error), dst: dst, closeChan: make(chan struct{}), + logger: logger, } } @@ -73,7 +75,7 @@ func (pw *PodWatcher) WatchPods(ctx context.Context) error { err := pw.watchPods(ctx, &wg) wg.Wait() - log.Println("[PodWatcher] all goroutines exited, exiting") + pw.logger.Println("[PodWatcher] all goroutines exited, exiting") return err } @@ -117,7 +119,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { for podName, pod := range pw.spec { pod := pod if _, ok := pw.status[podName]; !ok { - log.Printf("[PodWatcher] adding pod, name = %s", pod.Name) + pw.logger.Printf("[PodWatcher] adding pod, name = %s", pod.Name) pw.status[pod.Name] = true wg.Add(1) go func() { @@ -163,7 +165,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { } func (pw *PodWatcher) removePod(podName string) { - log.Printf("[PodWatcher] removing pod, name = %s", podName) + pw.logger.Printf("[PodWatcher] removing pod, name = %s", podName) delete(pw.status, podName) } diff --git a/logs/pod_watcher_test.go b/logs/pod_watcher_test.go index 3ed9ab7..df3a0b5 100644 --- a/logs/pod_watcher_test.go +++ b/logs/pod_watcher_test.go @@ -76,7 +76,7 @@ func TestPodWatcherClose(t *testing.T) { selector := labels.SelectorFromSet(map[string]string{"foo": "bar"}) var buf bytes.Buffer - pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf) + pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf, discardLogger()) go func() { podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}) @@ -148,7 +148,7 @@ func TestPodWatcher(t *testing.T) { selector := labels.SelectorFromSet(map[string]string{"foo": "bar"}) var buf bytes.Buffer - pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf) + pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf, discardLogger()) go func() { for _, pod := range tc.podEvents { diff --git a/logs/watcher.go b/logs/watcher.go index 1f2744b..e2f31b5 100644 --- a/logs/watcher.go +++ b/logs/watcher.go @@ -3,8 +3,10 @@ package logs import ( "context" "errors" + "fmt" "io" "log" + "os" "sync" "time" @@ -48,7 +50,7 @@ type PodWatcherInterface interface { } // PodWatcherFunc builds a PodWatcher. -type PodWatcherFunc func(KubernetesClient, string, labels.Selector, io.Writer) PodWatcherInterface +type PodWatcherFunc func(KubernetesClient, string, labels.Selector, io.Writer, *log.Logger) PodWatcherInterface // WatcherParams defines the input parameters of a Watcher. type WatcherParams struct { @@ -70,16 +72,18 @@ type Watcher struct { podWatcherFunc PodWatcherFunc errChan chan error dst *concurrentWriter + logger *log.Logger } // NewWatcher creates a new Watcher. -func NewWatcher(params WatcherParams, client KubernetesClient, podWatcherFunc PodWatcherFunc, dst io.Writer) *Watcher { +func NewWatcher(params WatcherParams, client KubernetesClient, podWatcherFunc PodWatcherFunc, dst io.Writer, logger *log.Logger) *Watcher { return &Watcher{ params: params, client: client, podWatcherFunc: podWatcherFunc, errChan: make(chan error), dst: &concurrentWriter{w: dst}, + logger: logger, } } @@ -130,7 +134,7 @@ func (w *Watcher) Watch(ctx context.Context) error { if !ok || err != nil { // matchLabels don't exist or cannot be parsed. // Should this be fatal? - log.Printf("warning: unable to parse matchLabels: ok = %t, err = %v", ok, err) + w.logger.Printf("warning: unable to parse matchLabels: ok = %t, err = %v", ok, err) continue } selector := labels.SelectorFromSet(selectorAsMap) @@ -151,7 +155,7 @@ func (w *Watcher) checkResourceExists(ctx context.Context, namespace string, res _, err := w.client.Untyped.Resource(resourceID).Namespace(namespace).Get(ctx, w.params.Name, metav1.GetOptions{}) var statusErr *apierrors.StatusError if !w.params.StrictExist && errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonNotFound { - log.Printf(`%s "%s" does not exist, waiting`, resourceID.Resource, w.params.Name) + fmt.Fprintf(os.Stderr, "%s \"%s\" does not exist, waiting\n", resourceID.Resource, w.params.Name) return nil } return err @@ -164,7 +168,7 @@ func (w *Watcher) addDeployment(ctx context.Context, resourceUID types.UID, podS w.removeDeployment() - log.Println("[DeploymentWatcher] add podWatcher") + w.logger.Println("[DeploymentWatcher] add podWatcher") w.resourceUID = resourceUID w.podSelector = podSelector @@ -173,6 +177,7 @@ func (w *Watcher) addDeployment(ctx context.Context, resourceUID types.UID, podS w.params.Container, w.podSelector, w.dst, + w.logger, ) go func() { @@ -184,7 +189,7 @@ func (w *Watcher) addDeployment(ctx context.Context, resourceUID types.UID, podS func (w *Watcher) removeDeployment() { if w.podWatcher != nil { - log.Println("[DeploymentWatcher] remove podWatcher") + w.logger.Println("[DeploymentWatcher] remove podWatcher") w.podWatcher.Close() w.podWatcher = nil } diff --git a/logs/watcher_test.go b/logs/watcher_test.go index 8ef7feb..b13e1ea 100644 --- a/logs/watcher_test.go +++ b/logs/watcher_test.go @@ -5,6 +5,7 @@ import ( "context" "errors" "io" + "log" "strings" "testing" "time" @@ -30,11 +31,15 @@ func (m *mockPodWatcher) WatchPods(ctx context.Context) error { return m.err } func (m *mockPodWatcher) Close() {} func mockPodwatcherFunc(err error) logs.PodWatcherFunc { - return func(logs.KubernetesClient, string, labels.Selector, io.Writer) logs.PodWatcherInterface { + return func(logs.KubernetesClient, string, labels.Selector, io.Writer, *log.Logger) logs.PodWatcherInterface { return &mockPodWatcher{err: err} } } +func discardLogger() *log.Logger { + return log.New(io.Discard, "", 0) +} + func buildDeployment(t *testing.T, name string) *unstructured.Unstructured { deployment := new(unstructured.Unstructured) deployment.SetAPIVersion("v1") @@ -51,7 +56,7 @@ func TestWatcherStrictExist(t *testing.T) { var buf bytes.Buffer params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default", StrictExist: true} - watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(nil), &buf) + watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(nil), &buf, discardLogger()) err := watcher.Watch(context.Background()) assert.EqualError(t, err, `deployments.apps "mydeployment" not found`) @@ -67,7 +72,7 @@ func TestWatcherPodWatcherError(t *testing.T) { var buf bytes.Buffer wantErr := errors.New("foo") params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"} - watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(wantErr), &buf) + watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(wantErr), &buf, discardLogger()) go func() { defer deploymentsWatcher.Stop() @@ -89,7 +94,7 @@ func TestWatcherClosedChannel(t *testing.T) { var buf bytes.Buffer params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"} - watcher := logs.NewWatcher(params, client, nil, &buf) + watcher := logs.NewWatcher(params, client, nil, &buf, discardLogger()) // Immediately stop the watcher, which closes the ResultChan. // This should be expected to be handled. deploymentsWatcher.Stop() @@ -136,7 +141,7 @@ func TestWatcherWithPodWatcher(t *testing.T) { var buf bytes.Buffer params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"} - watcher := logs.NewWatcher(params, client, logs.NewPodWatcher, &buf) + watcher := logs.NewWatcher(params, client, logs.NewPodWatcher, &buf, discardLogger()) err := watcher.Watch(ctx) require.EqualError(t, err, context.DeadlineExceeded.Error()) diff --git a/main.go b/main.go index 7ee2b22..9989571 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "io" "log" "os" @@ -23,11 +24,13 @@ func main() { var ( container string strictExist bool + debug bool version bool ) flag.StringVar(&container, "container", "", "name of a specific container") flag.BoolVar(&strictExist, "strict", false, "require deployment to exist on launch") + flag.BoolVar(&debug, "debug", false, "enable debug logging to stderr") flag.BoolVar(&version, "version", false, "print version and exit") flag.Parse() @@ -36,6 +39,11 @@ func main() { os.Exit(0) } + logger := log.New(io.Discard, "", 0) + if debug { + logger = log.New(os.Stderr, "", 0) + } + var params logs.WatcherParams switch len(flag.Args()) { @@ -79,6 +87,7 @@ func main() { logs.KubernetesClient{Typed: clientset, Untyped: dynamicclient}, logs.NewPodWatcher, os.Stdout, + logger, ) if err := watcher.Watch(ctx); err != nil { log.Fatal(err)