kubectl-persistent-logger/logs/watcher.go

209 lines
5.5 KiB
Go
Raw Normal View History

2022-05-29 19:04:02 +00:00
package logs
import (
"context"
2022-05-31 04:11:44 +00:00
"errors"
2022-06-10 17:04:52 +00:00
"fmt"
2022-05-29 19:04:02 +00:00
"io"
"log"
2022-06-10 17:04:52 +00:00
"os"
2022-05-29 19:04:02 +00:00
"time"
corev1 "k8s.io/api/core/v1"
2022-06-01 17:19:55 +00:00
apierrors "k8s.io/apimachinery/pkg/api/errors"
2022-05-29 19:04:02 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
2022-05-29 19:04:02 +00:00
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
2022-05-29 19:04:02 +00:00
"k8s.io/client-go/kubernetes"
2022-06-21 18:32:32 +00:00
"k8s.io/client-go/tools/cache"
toolswatch "k8s.io/client-go/tools/watch"
2022-05-29 19:04:02 +00:00
)
// KubernetesClient provides both typed and untyped interfaces to the
// Kubernetes API.
2022-05-30 10:10:58 +00:00
type KubernetesClient struct {
Typed kubernetes.Interface
Untyped dynamic.Interface
2022-05-30 10:10:58 +00:00
}
2022-06-01 17:19:55 +00:00
// PodWatcherInterface is the behaviour Watcher requires from a pod watcher.
type PodWatcherInterface interface {
	// WatchPods is run by Watcher in its own goroutine. A non-nil error it
	// returns is forwarded to the caller of Watcher.Watch.
	WatchPods(ctx context.Context) error

	// Close is called by Watcher once the pod watcher is no longer needed.
	Close()
}
2022-06-01 17:19:55 +00:00
// PodWatcherFunc builds a PodWatcher.
2022-06-14 05:47:38 +00:00
type PodWatcherFunc func(KubernetesClient, string, string, labels.Selector, io.Writer, *log.Logger) PodWatcherInterface
// WatcherParams holds the inputs that tell a Watcher what to monitor.
type WatcherParams struct {
	// Name is the metadata.name of the resource to watch.
	Name string

	// Type is the resource name used in the apps/v1 GroupVersionResource,
	// e.g. deployments, statefulsets or replicasets.
	Type string

	// Namespace is the namespace of the resource. When empty, Watch looks the
	// resource up in the default namespace.
	Namespace string

	// Container is passed through to the PodWatcherFunc.
	Container string

	// StrictExist, when true, makes Watch fail if the resource does not exist
	// at startup. When false, a not-found result is tolerated and Watch waits
	// for the resource to appear.
	StrictExist bool
}
2022-05-31 04:11:44 +00:00
2022-06-14 15:54:10 +00:00
// Watcher watches a resource and tails the logs for its currently active
2022-05-29 19:04:02 +00:00
// pods.
type Watcher struct {
params WatcherParams
client KubernetesClient
resourceUID types.UID
podSelector labels.Selector
2022-06-01 20:04:20 +00:00
podWatcher PodWatcherInterface
podWatcherFunc PodWatcherFunc
closeChan chan struct{}
2022-06-01 20:04:20 +00:00
errChan chan error
dst io.Writer
2022-06-10 17:04:52 +00:00
logger *log.Logger
2022-05-29 19:04:02 +00:00
}
// NewWatcher creates a new Watcher.
//
// The resource defined by WatcherParams will be monitored and all associated
// logs will be written to `dst`. Note that this writer should provide its own
// locking mechanisms if needed.
2022-06-10 17:04:52 +00:00
func NewWatcher(params WatcherParams, client KubernetesClient, podWatcherFunc PodWatcherFunc, dst io.Writer, logger *log.Logger) *Watcher {
2022-05-29 19:04:02 +00:00
return &Watcher{
params: params,
client: client,
2022-06-01 20:04:20 +00:00
podWatcherFunc: podWatcherFunc,
closeChan: make(chan struct{}),
2022-06-01 20:04:20 +00:00
errChan: make(chan error),
dst: dst,
2022-06-10 17:04:52 +00:00
logger: logger,
2022-05-29 19:04:02 +00:00
}
}
// Close signals a running Watch call to return by closing the Watcher's
// internal close channel.
//
// Close itself does not block: releasing any actively monitored resource
// happens inside Watch as it returns. Calling Close again after it has
// already returned is a no-op. Close is not synchronised, so it must not be
// called from several goroutines at the same time.
func (w *Watcher) Close() {
	select {
	case <-w.closeChan:
		// Already closed; closing a closed channel would panic.
	default:
		close(w.closeChan)
	}
}
2022-06-14 15:54:10 +00:00
// Watch watches a resource.
2022-05-29 19:04:02 +00:00
func (w *Watcher) Watch(ctx context.Context) error {
// ensure any actively monitored resource is cleaned up on exit.
2022-06-14 15:54:10 +00:00
defer w.unwatchResource()
ns := w.params.Namespace
if ns == "" {
ns = corev1.NamespaceDefault
}
2022-05-29 19:04:02 +00:00
// Supported resource types are deployments, statefulsets and replicasets
// (all apps/v1).
resourceID := schema.GroupVersionResource{
Resource: w.params.Type,
Group: "apps",
Version: "v1",
}
if err := w.checkResourceExists(ctx, ns, resourceID); err != nil {
return err
2022-05-29 19:04:02 +00:00
}
2022-06-01 17:19:55 +00:00
2022-06-21 18:32:32 +00:00
watchFunc := func(_ metav1.ListOptions) (watch.Interface, error) {
return w.client.Untyped.
Resource(resourceID).
Namespace(ns).
Watch(ctx, metav1.ListOptions{Watch: true, FieldSelector: "metadata.name=" + w.params.Name})
}
retryWatcher, err := toolswatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watchFunc})
2022-05-29 19:04:02 +00:00
if err != nil {
return err
}
2022-06-01 17:19:55 +00:00
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
2022-05-29 19:04:02 +00:00
2022-06-21 18:32:32 +00:00
resultChan := retryWatcher.ResultChan()
2022-05-29 19:04:02 +00:00
for {
select {
2022-06-04 00:27:20 +00:00
case evt, ok := <-resultChan:
if !ok {
resultChan = nil
continue
}
2022-05-29 19:04:02 +00:00
switch evt.Type {
case watch.Added, watch.Modified:
resource := evt.Object.(*unstructured.Unstructured)
// TODO: handle matchExpressions
selectorAsMap, ok, err := unstructured.NestedStringMap(resource.Object, "spec", "selector", "matchLabels")
if !ok || err != nil {
// matchLabels don't exist or cannot be parsed.
// Should this be fatal?
2022-06-10 17:04:52 +00:00
w.logger.Printf("warning: unable to parse matchLabels: ok = %t, err = %v", ok, err)
continue
}
2022-06-14 15:54:10 +00:00
w.watchResource(ctx, resource.GetUID(), labels.SelectorFromSet(selectorAsMap))
2022-05-29 19:04:02 +00:00
case watch.Deleted:
2022-06-14 15:54:10 +00:00
w.unwatchResource()
case watch.Error:
2022-06-21 18:32:32 +00:00
err := evt.Object.(*metav1.Status)
// TODO: remove panic and handle error
panic(err)
2022-05-29 19:04:02 +00:00
}
2022-06-11 05:20:28 +00:00
// errChan is never closed.
2022-06-01 17:19:55 +00:00
case err := <-w.errChan:
return err
case <-w.closeChan:
return nil
2022-06-01 17:19:55 +00:00
case <-ctx.Done():
return ctx.Err()
2022-05-29 19:04:02 +00:00
}
}
}
func (w *Watcher) checkResourceExists(ctx context.Context, namespace string, resourceID schema.GroupVersionResource) error {
_, err := w.client.Untyped.Resource(resourceID).Namespace(namespace).Get(ctx, w.params.Name, metav1.GetOptions{})
var statusErr *apierrors.StatusError
if !w.params.StrictExist && errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonNotFound {
2022-06-10 17:04:52 +00:00
fmt.Fprintf(os.Stderr, "%s \"%s\" does not exist, waiting\n", resourceID.Resource, w.params.Name)
return nil
}
return err
}
2022-06-14 15:54:10 +00:00
func (w *Watcher) watchResource(ctx context.Context, resourceUID types.UID, podSelector labels.Selector) {
if w.resourceUID == resourceUID {
2022-06-01 17:19:55 +00:00
return
2022-05-31 04:11:44 +00:00
}
2022-06-14 15:54:10 +00:00
w.unwatchResource()
2022-05-31 04:11:44 +00:00
2022-06-21 18:32:32 +00:00
w.logger.Printf("[Watcher] add podWatcher, resourceUID = %s", resourceUID)
2022-05-31 04:11:44 +00:00
w.resourceUID = resourceUID
w.podSelector = podSelector
2022-06-01 17:19:55 +00:00
w.podWatcher = w.podWatcherFunc(
w.client,
2022-06-14 05:47:38 +00:00
w.params.Namespace,
w.params.Container,
w.podSelector,
2022-06-01 17:19:55 +00:00
w.dst,
2022-06-10 17:04:52 +00:00
w.logger,
2022-06-01 17:19:55 +00:00
)
2022-05-31 04:11:44 +00:00
2022-06-01 17:19:55 +00:00
go func() {
if err := w.podWatcher.WatchPods(ctx); err != nil {
w.errChan <- err
}
}()
2022-05-29 19:04:02 +00:00
}
2022-06-14 15:54:10 +00:00
func (w *Watcher) unwatchResource() {
2022-06-01 17:19:55 +00:00
if w.podWatcher != nil {
2022-06-21 18:32:32 +00:00
w.logger.Printf("[Watcher] remove podWatcher, resourceUID = %s", w.resourceUID)
2022-06-01 17:19:55 +00:00
w.podWatcher.Close()
w.podWatcher = nil
}
w.resourceUID = ""
w.podSelector = nil
2022-05-29 19:04:02 +00:00
}