kubectl-persistent-logger/logs/watcher.go

209 lines
5.5 KiB
Go

package logs
import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
"time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
toolswatch "k8s.io/client-go/tools/watch"
)
// KubernetesClient provides both typed and untyped interfaces to the
// Kubernetes API.
type KubernetesClient struct {
Typed kubernetes.Interface
Untyped dynamic.Interface
}
type PodWatcherInterface interface {
WatchPods(ctx context.Context) error
Close()
}
// PodWatcherFunc builds a PodWatcher.
type PodWatcherFunc func(KubernetesClient, string, string, labels.Selector, io.Writer, *log.Logger) PodWatcherInterface
// WatcherParams defines the input parameters of a Watcher.
type WatcherParams struct {
Name string
Type string
Namespace string
Container string
StrictExist bool
}
// Watcher watches a resource and tails the logs for its currently active
// pods.
type Watcher struct {
params WatcherParams
client KubernetesClient
resourceUID types.UID
podSelector labels.Selector
podWatcher PodWatcherInterface
podWatcherFunc PodWatcherFunc
closeChan chan struct{}
errChan chan error
dst io.Writer
logger *log.Logger
}
// NewWatcher creates a new Watcher.
//
// The resource defined by WatcherParams will be monitored and all associated
// logs will be written to `dst`. Note that this writer should provide its own
// locking mechanisms if needed.
func NewWatcher(params WatcherParams, client KubernetesClient, podWatcherFunc PodWatcherFunc, dst io.Writer, logger *log.Logger) *Watcher {
return &Watcher{
params: params,
client: client,
podWatcherFunc: podWatcherFunc,
closeChan: make(chan struct{}),
errChan: make(chan error),
dst: dst,
logger: logger,
}
}
// Close blocks until any actively monitored resource has been released, and
// exits.
func (w *Watcher) Close() {
close(w.closeChan)
}
// Watch watches a resource.
func (w *Watcher) Watch(ctx context.Context) error {
// ensure any actively monitored resource is cleaned up on exit.
defer w.unwatchResource()
ns := w.params.Namespace
if ns == "" {
ns = corev1.NamespaceDefault
}
// Supported resource types are deployments, statefulsets and replicasets
// (all apps/v1).
resourceID := schema.GroupVersionResource{
Resource: w.params.Type,
Group: "apps",
Version: "v1",
}
if err := w.checkResourceExists(ctx, ns, resourceID); err != nil {
return err
}
watchFunc := func(_ metav1.ListOptions) (watch.Interface, error) {
return w.client.Untyped.
Resource(resourceID).
Namespace(ns).
Watch(ctx, metav1.ListOptions{Watch: true, FieldSelector: "metadata.name=" + w.params.Name})
}
retryWatcher, err := toolswatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watchFunc})
if err != nil {
return err
}
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
resultChan := retryWatcher.ResultChan()
for {
select {
case evt, ok := <-resultChan:
if !ok {
resultChan = nil
continue
}
switch evt.Type {
case watch.Added, watch.Modified:
resource := evt.Object.(*unstructured.Unstructured)
// TODO: handle matchExpressions
selectorAsMap, ok, err := unstructured.NestedStringMap(resource.Object, "spec", "selector", "matchLabels")
if !ok || err != nil {
// matchLabels don't exist or cannot be parsed.
// Should this be fatal?
w.logger.Printf("warning: unable to parse matchLabels: ok = %t, err = %v", ok, err)
continue
}
w.watchResource(ctx, resource.GetUID(), labels.SelectorFromSet(selectorAsMap))
case watch.Deleted:
w.unwatchResource()
case watch.Error:
err := evt.Object.(*metav1.Status)
// TODO: remove panic and handle error
panic(err)
}
// errChan is never closed.
case err := <-w.errChan:
return err
case <-w.closeChan:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
}
func (w *Watcher) checkResourceExists(ctx context.Context, namespace string, resourceID schema.GroupVersionResource) error {
_, err := w.client.Untyped.Resource(resourceID).Namespace(namespace).Get(ctx, w.params.Name, metav1.GetOptions{})
var statusErr *apierrors.StatusError
if !w.params.StrictExist && errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonNotFound {
fmt.Fprintf(os.Stderr, "%s \"%s\" does not exist, waiting\n", resourceID.Resource, w.params.Name)
return nil
}
return err
}
func (w *Watcher) watchResource(ctx context.Context, resourceUID types.UID, podSelector labels.Selector) {
if w.resourceUID == resourceUID {
return
}
w.unwatchResource()
w.logger.Printf("[Watcher] add podWatcher, resourceUID = %s", resourceUID)
w.resourceUID = resourceUID
w.podSelector = podSelector
w.podWatcher = w.podWatcherFunc(
w.client,
w.params.Namespace,
w.params.Container,
w.podSelector,
w.dst,
w.logger,
)
go func() {
if err := w.podWatcher.WatchPods(ctx); err != nil {
w.errChan <- err
}
}()
}
func (w *Watcher) unwatchResource() {
if w.podWatcher != nil {
w.logger.Printf("[Watcher] remove podWatcher, resourceUID = %s", w.resourceUID)
w.podWatcher.Close()
w.podWatcher = nil
}
w.resourceUID = ""
w.podSelector = nil
}