diff --git a/logs/pod_watcher.go b/logs/pod_watcher.go index 6a21ecd..43caa32 100644 --- a/logs/pod_watcher.go +++ b/logs/pod_watcher.go @@ -100,7 +100,7 @@ const tickerInterval = time.Millisecond * 250 // Close terminates the PodWatcher. func (pw *PodWatcher) Close() { - pw.closeChan <- struct{}{} + close(pw.closeChan) } func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { diff --git a/logs/watcher.go b/logs/watcher.go index aa1d2c2..f89446b 100644 --- a/logs/watcher.go +++ b/logs/watcher.go @@ -54,6 +54,7 @@ type Watcher struct { podSelector labels.Selector podWatcher PodWatcherInterface podWatcherFunc PodWatcherFunc + closeChan chan struct{} errChan chan error dst io.Writer logger *log.Logger @@ -65,14 +66,24 @@ func NewWatcher(params WatcherParams, client KubernetesClient, podWatcherFunc Po params: params, client: client, podWatcherFunc: podWatcherFunc, + closeChan: make(chan struct{}), errChan: make(chan error), dst: dst, logger: logger, } } +// Close blocks until any actively monitored resource has been released, and +// exits. +func (w *Watcher) Close() { + close(w.closeChan) +} + // Watch watches a deployment. func (w *Watcher) Watch(ctx context.Context) error { + // ensure any actively monitored resource is cleaned up on exit. + defer w.removeDeployment() + ns := w.params.Namespace if ns == "" { ns = corev1.NamespaceDefault @@ -127,6 +138,8 @@ func (w *Watcher) Watch(ctx context.Context) error { // errChan is never closed. case err := <-w.errChan: return err + case <-w.closeChan: + return nil case <-ctx.Done(): return ctx.Err() }