From 969b0db9fca1de3c73c96758e57c1d8f1de8e9d0 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Tue, 14 Jun 2022 07:47:38 +0200 Subject: [PATCH] podWatcher: pass namespace --- logs/pod_watcher.go | 9 +++++---- logs/pod_watcher_test.go | 6 +++--- logs/watcher.go | 3 ++- logs/watcher_test.go | 2 +- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/logs/pod_watcher.go b/logs/pod_watcher.go index 86665f0..6a21ecd 100644 --- a/logs/pod_watcher.go +++ b/logs/pod_watcher.go @@ -55,6 +55,7 @@ func newRecoverableError(err error, stream stream) *streamError { // PodWatcher consumes and merges the logs for a specified set of pods. type PodWatcher struct { client KubernetesClient + namespace string container string labelSelector labels.Selector spec map[string]*corev1.Pod @@ -66,9 +67,10 @@ type PodWatcher struct { } // NewPodWatcher initializes a new PodWatcher. -func NewPodWatcher(client KubernetesClient, container string, labelSelector labels.Selector, dst io.Writer, logger *log.Logger) PodWatcherInterface { +func NewPodWatcher(client KubernetesClient, namespace string, container string, labelSelector labels.Selector, dst io.Writer, logger *log.Logger) PodWatcherInterface { return &PodWatcher{ client: client, + namespace: namespace, container: container, labelSelector: labelSelector, spec: make(map[string]*corev1.Pod), @@ -96,14 +98,13 @@ func (pw *PodWatcher) WatchPods(ctx context.Context) error { const tickerInterval = time.Millisecond * 250 -// Close terminates the PodWatcher and should be called at most once. +// Close terminates the PodWatcher. func (pw *PodWatcher) Close() { pw.closeChan <- struct{}{} } func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { - // TODO: pass namespace - podsClient := pw.client.Typed.CoreV1().Pods(corev1.NamespaceDefault) + podsClient := pw.client.Typed.CoreV1().Pods(pw.namespace) // Returns a watcher which notifies of changes in the relevant pods. // We don't defer Stop() on the returned value because the sender is the diff --git a/logs/pod_watcher_test.go b/logs/pod_watcher_test.go index bd67a75..b81be7e 100644 --- a/logs/pod_watcher_test.go +++ b/logs/pod_watcher_test.go @@ -75,7 +75,7 @@ func TestPodWatcherClose(t *testing.T) { selector := labels.SelectorFromSet(map[string]string{"foo": "bar"}) var buf bytes.Buffer - pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf, discardLogger()) + pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &buf, discardLogger()) go func() { podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}) @@ -115,7 +115,7 @@ func TestPodWatcherRemovedPod(t *testing.T) { selector := labels.SelectorFromSet(map[string]string{"foo": "bar"}) var buf bytes.Buffer - pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf, discardLogger()) + pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &buf, discardLogger()) go func() { defer pw.Close() @@ -195,7 +195,7 @@ func TestPodWatcher(t *testing.T) { selector := labels.SelectorFromSet(map[string]string{"foo": "bar"}) var buf bytes.Buffer - pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf, discardLogger()) + pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &buf, discardLogger()) go func() { for _, pod := range tc.podEvents { diff --git a/logs/watcher.go b/logs/watcher.go index 3b2dad4..aa1d2c2 100644 --- a/logs/watcher.go +++ b/logs/watcher.go @@ -34,7 +34,7 @@ type PodWatcherInterface interface { } // PodWatcherFunc builds a PodWatcher. -type PodWatcherFunc func(KubernetesClient, string, labels.Selector, io.Writer, *log.Logger) PodWatcherInterface +type PodWatcherFunc func(KubernetesClient, string, string, labels.Selector, io.Writer, *log.Logger) PodWatcherInterface // WatcherParams defines the input parameters of a Watcher. type WatcherParams struct { @@ -156,6 +156,7 @@ func (w *Watcher) addDeployment(ctx context.Context, resourceUID types.UID, podS w.podSelector = podSelector w.podWatcher = w.podWatcherFunc( w.client, + w.params.Namespace, w.params.Container, w.podSelector, w.dst, diff --git a/logs/watcher_test.go b/logs/watcher_test.go index b13e1ea..afa90c4 100644 --- a/logs/watcher_test.go +++ b/logs/watcher_test.go @@ -31,7 +31,7 @@ func (m *mockPodWatcher) WatchPods(ctx context.Context) error { return m.err } func (m *mockPodWatcher) Close() {} func mockPodwatcherFunc(err error) logs.PodWatcherFunc { - return func(logs.KubernetesClient, string, labels.Selector, io.Writer, *log.Logger) logs.PodWatcherInterface { + return func(logs.KubernetesClient, string, string, labels.Selector, io.Writer, *log.Logger) logs.PodWatcherInterface { return &mockPodWatcher{err: err} } }