From b880127e726693f4d370a20b15a9a1d47317cc61 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Tue, 21 Jun 2022 20:32:32 +0200 Subject: [PATCH] watcher: implement RetryWatcher --- go.mod | 1 + go.sum | 1 + logs/watcher.go | 20 +++++++++++++------- logs/watcher_test.go | 1 + 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 5c2ec08..815ce66 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect + github.com/google/go-cmp v0.5.5 // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/josharian/intern v1.0.0 // indirect diff --git a/go.sum b/go.sum index 097d7e1..f596b58 100644 --- a/go.sum +++ b/go.sum @@ -169,6 +169,7 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= diff --git a/logs/watcher.go b/logs/watcher.go index 3fb3253..c959caa 100644 --- a/logs/watcher.go +++ b/logs/watcher.go @@ -19,6 +19,8 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + toolswatch "k8s.io/client-go/tools/watch" ) // KubernetesClient provides both typed and untyped interfaces to the @@ -101,17 +103,21 @@ func (w *Watcher) Watch(ctx context.Context) error { return err } - opts := metav1.ListOptions{Watch: true, FieldSelector: "metadata.name=" + w.params.Name} - watcher, err := w.client.Untyped.Resource(resourceID).Namespace(ns).Watch(ctx, opts) + watchFunc := func(_ metav1.ListOptions) (watch.Interface, error) { + return w.client.Untyped. + Resource(resourceID). + Namespace(ns). + Watch(ctx, metav1.ListOptions{Watch: true, FieldSelector: "metadata.name=" + w.params.Name}) + } + retryWatcher, err := toolswatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watchFunc}) if err != nil { return err } - defer watcher.Stop() ticker := time.NewTicker(time.Second) defer ticker.Stop() - resultChan := watcher.ResultChan() + resultChan := retryWatcher.ResultChan() for { select { case evt, ok := <-resultChan: @@ -135,7 +141,7 @@ func (w *Watcher) Watch(ctx context.Context) error { case watch.Deleted: w.unwatchResource() case watch.Error: - err := evt.Object.(error) + err := evt.Object.(*metav1.Status) // TODO: remove panic and handle error panic(err) } @@ -167,7 +173,7 @@ func (w *Watcher) watchResource(ctx context.Context, resourceUID types.UID, podS w.unwatchResource() - w.logger.Println("[Watcher] add podWatcher") + w.logger.Printf("[Watcher] add podWatcher, resourceUID = %s", resourceUID) w.resourceUID = resourceUID w.podSelector = podSelector @@ -189,7 +195,7 @@ func (w *Watcher) watchResource(ctx context.Context, resourceUID types.UID, podS func (w *Watcher) unwatchResource() { if w.podWatcher != nil { - w.logger.Println("[Watcher] remove podWatcher") + w.logger.Printf("[Watcher] remove podWatcher, resourceUID = %s", w.resourceUID) w.podWatcher.Close() w.podWatcher = nil } diff --git a/logs/watcher_test.go b/logs/watcher_test.go index afa90c4..6da08c6 100644 --- a/logs/watcher_test.go +++ b/logs/watcher_test.go @@ -47,6 +47,7 @@ func buildDeployment(t *testing.T, name string) *unstructured.Unstructured { deployment.SetName("mydeployment") deployment.SetNamespace("default") deployment.SetUID(types.UID("foo")) + deployment.SetResourceVersion("1") require.NoError(t, unstructured.SetNestedField(deployment.Object, map[string]any{"app": "myapp"}, "spec", "selector", "matchLabels")) return deployment }