watcher: implement RetryWatcher
continuous-integration/drone/push Build is failing Details

This commit is contained in:
Rob Watson 2022-06-21 20:32:32 +02:00
parent d1b3cd1f96
commit b880127e72
4 changed files with 16 additions and 7 deletions

1
go.mod
View File

@ -23,6 +23,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect github.com/golang/protobuf v1.5.2 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/google/gofuzz v1.1.0 // indirect github.com/google/gofuzz v1.1.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect github.com/imdario/mergo v0.3.12 // indirect
github.com/josharian/intern v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect

1
go.sum
View File

@ -169,6 +169,7 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=

View File

@ -19,6 +19,8 @@ import (
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
toolswatch "k8s.io/client-go/tools/watch"
) )
// KubernetesClient provides both typed and untyped interfaces to the // KubernetesClient provides both typed and untyped interfaces to the
@ -101,17 +103,21 @@ func (w *Watcher) Watch(ctx context.Context) error {
return err return err
} }
opts := metav1.ListOptions{Watch: true, FieldSelector: "metadata.name=" + w.params.Name} watchFunc := func(_ metav1.ListOptions) (watch.Interface, error) {
watcher, err := w.client.Untyped.Resource(resourceID).Namespace(ns).Watch(ctx, opts) return w.client.Untyped.
Resource(resourceID).
Namespace(ns).
Watch(ctx, metav1.ListOptions{Watch: true, FieldSelector: "metadata.name=" + w.params.Name})
}
retryWatcher, err := toolswatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watchFunc})
if err != nil { if err != nil {
return err return err
} }
defer watcher.Stop()
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(time.Second)
defer ticker.Stop() defer ticker.Stop()
resultChan := watcher.ResultChan() resultChan := retryWatcher.ResultChan()
for { for {
select { select {
case evt, ok := <-resultChan: case evt, ok := <-resultChan:
@ -135,7 +141,7 @@ func (w *Watcher) Watch(ctx context.Context) error {
case watch.Deleted: case watch.Deleted:
w.unwatchResource() w.unwatchResource()
case watch.Error: case watch.Error:
err := evt.Object.(error) err := evt.Object.(*metav1.Status)
// TODO: remove panic and handle error // TODO: remove panic and handle error
panic(err) panic(err)
} }
@ -167,7 +173,7 @@ func (w *Watcher) watchResource(ctx context.Context, resourceUID types.UID, podS
w.unwatchResource() w.unwatchResource()
w.logger.Println("[Watcher] add podWatcher") w.logger.Printf("[Watcher] add podWatcher, resourceUID = %s", resourceUID)
w.resourceUID = resourceUID w.resourceUID = resourceUID
w.podSelector = podSelector w.podSelector = podSelector
@ -189,7 +195,7 @@ func (w *Watcher) watchResource(ctx context.Context, resourceUID types.UID, podS
func (w *Watcher) unwatchResource() { func (w *Watcher) unwatchResource() {
if w.podWatcher != nil { if w.podWatcher != nil {
w.logger.Println("[Watcher] remove podWatcher") w.logger.Printf("[Watcher] remove podWatcher, resourceUID = %s", w.resourceUID)
w.podWatcher.Close() w.podWatcher.Close()
w.podWatcher = nil w.podWatcher = nil
} }

View File

@ -47,6 +47,7 @@ func buildDeployment(t *testing.T, name string) *unstructured.Unstructured {
deployment.SetName("mydeployment") deployment.SetName("mydeployment")
deployment.SetNamespace("default") deployment.SetNamespace("default")
deployment.SetUID(types.UID("foo")) deployment.SetUID(types.UID("foo"))
deployment.SetResourceVersion("1")
require.NoError(t, unstructured.SetNestedField(deployment.Object, map[string]any{"app": "myapp"}, "spec", "selector", "matchLabels")) require.NoError(t, unstructured.SetNestedField(deployment.Object, map[string]any{"app": "myapp"}, "spec", "selector", "matchLabels"))
return deployment return deployment
} }