podWatcher: pass namespace
continuous-integration/drone/push Build is failing Details

This commit is contained in:
Rob Watson 2022-06-14 07:47:38 +02:00
parent e86d23fdca
commit 969b0db9fc
4 changed files with 11 additions and 9 deletions

View File

@ -55,6 +55,7 @@ func newRecoverableError(err error, stream stream) *streamError {
// PodWatcher consumes and merges the logs for a specified set of pods. // PodWatcher consumes and merges the logs for a specified set of pods.
type PodWatcher struct { type PodWatcher struct {
client KubernetesClient client KubernetesClient
namespace string
container string container string
labelSelector labels.Selector labelSelector labels.Selector
spec map[string]*corev1.Pod spec map[string]*corev1.Pod
@ -66,9 +67,10 @@ type PodWatcher struct {
} }
// NewPodWatcher initializes a new PodWatcher. // NewPodWatcher initializes a new PodWatcher.
func NewPodWatcher(client KubernetesClient, container string, labelSelector labels.Selector, dst io.Writer, logger *log.Logger) PodWatcherInterface { func NewPodWatcher(client KubernetesClient, namespace string, container string, labelSelector labels.Selector, dst io.Writer, logger *log.Logger) PodWatcherInterface {
return &PodWatcher{ return &PodWatcher{
client: client, client: client,
namespace: namespace,
container: container, container: container,
labelSelector: labelSelector, labelSelector: labelSelector,
spec: make(map[string]*corev1.Pod), spec: make(map[string]*corev1.Pod),
@ -96,14 +98,13 @@ func (pw *PodWatcher) WatchPods(ctx context.Context) error {
const tickerInterval = time.Millisecond * 250 const tickerInterval = time.Millisecond * 250
// Close terminates the PodWatcher and should be called at most once. // Close terminates the PodWatcher.
func (pw *PodWatcher) Close() { func (pw *PodWatcher) Close() {
pw.closeChan <- struct{}{} pw.closeChan <- struct{}{}
} }
func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error {
// TODO: pass namespace podsClient := pw.client.Typed.CoreV1().Pods(pw.namespace)
podsClient := pw.client.Typed.CoreV1().Pods(corev1.NamespaceDefault)
// Returns a watcher which notifies of changes in the relevant pods. // Returns a watcher which notifies of changes in the relevant pods.
// We don't defer Stop() on the returned value because the sender is the // We don't defer Stop() on the returned value because the sender is the

View File

@ -75,7 +75,7 @@ func TestPodWatcherClose(t *testing.T) {
selector := labels.SelectorFromSet(map[string]string{"foo": "bar"}) selector := labels.SelectorFromSet(map[string]string{"foo": "bar"})
var buf bytes.Buffer var buf bytes.Buffer
pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf, discardLogger()) pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &buf, discardLogger())
go func() { go func() {
podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}}) podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
@ -115,7 +115,7 @@ func TestPodWatcherRemovedPod(t *testing.T) {
selector := labels.SelectorFromSet(map[string]string{"foo": "bar"}) selector := labels.SelectorFromSet(map[string]string{"foo": "bar"})
var buf bytes.Buffer var buf bytes.Buffer
pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf, discardLogger()) pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &buf, discardLogger())
go func() { go func() {
defer pw.Close() defer pw.Close()
@ -195,7 +195,7 @@ func TestPodWatcher(t *testing.T) {
selector := labels.SelectorFromSet(map[string]string{"foo": "bar"}) selector := labels.SelectorFromSet(map[string]string{"foo": "bar"})
var buf bytes.Buffer var buf bytes.Buffer
pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf, discardLogger()) pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &buf, discardLogger())
go func() { go func() {
for _, pod := range tc.podEvents { for _, pod := range tc.podEvents {

View File

@ -34,7 +34,7 @@ type PodWatcherInterface interface {
} }
// PodWatcherFunc builds a PodWatcher. // PodWatcherFunc builds a PodWatcher.
type PodWatcherFunc func(KubernetesClient, string, labels.Selector, io.Writer, *log.Logger) PodWatcherInterface type PodWatcherFunc func(KubernetesClient, string, string, labels.Selector, io.Writer, *log.Logger) PodWatcherInterface
// WatcherParams defines the input parameters of a Watcher. // WatcherParams defines the input parameters of a Watcher.
type WatcherParams struct { type WatcherParams struct {
@ -156,6 +156,7 @@ func (w *Watcher) addDeployment(ctx context.Context, resourceUID types.UID, podS
w.podSelector = podSelector w.podSelector = podSelector
w.podWatcher = w.podWatcherFunc( w.podWatcher = w.podWatcherFunc(
w.client, w.client,
w.params.Namespace,
w.params.Container, w.params.Container,
w.podSelector, w.podSelector,
w.dst, w.dst,

View File

@ -31,7 +31,7 @@ func (m *mockPodWatcher) WatchPods(ctx context.Context) error { return m.err }
func (m *mockPodWatcher) Close() {} func (m *mockPodWatcher) Close() {}
func mockPodwatcherFunc(err error) logs.PodWatcherFunc { func mockPodwatcherFunc(err error) logs.PodWatcherFunc {
return func(logs.KubernetesClient, string, labels.Selector, io.Writer, *log.Logger) logs.PodWatcherInterface { return func(logs.KubernetesClient, string, string, labels.Selector, io.Writer, *log.Logger) logs.PodWatcherInterface {
return &mockPodWatcher{err: err} return &mockPodWatcher{err: err}
} }
} }