kubectl-persistent-logger/logs/watcher.go

181 lines
4.6 KiB
Go
Raw Normal View History

2022-05-29 19:04:02 +00:00
package logs
import (
"context"
2022-05-31 04:11:44 +00:00
"errors"
2022-06-10 17:04:52 +00:00
"fmt"
2022-05-29 19:04:02 +00:00
"io"
"log"
2022-06-10 17:04:52 +00:00
"os"
2022-05-29 19:04:02 +00:00
"time"
corev1 "k8s.io/api/core/v1"
2022-06-01 17:19:55 +00:00
apierrors "k8s.io/apimachinery/pkg/api/errors"
2022-05-29 19:04:02 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
2022-05-29 19:04:02 +00:00
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
2022-05-29 19:04:02 +00:00
"k8s.io/client-go/kubernetes"
)
// KubernetesClient provides both typed and untyped interfaces to the
// Kubernetes API.
2022-05-30 10:10:58 +00:00
type KubernetesClient struct {
Typed kubernetes.Interface
Untyped dynamic.Interface
2022-05-30 10:10:58 +00:00
}
2022-06-01 17:19:55 +00:00
type PodWatcherInterface interface {
WatchPods(ctx context.Context) error
Close()
2022-05-31 04:11:44 +00:00
}
2022-06-01 17:19:55 +00:00
// PodWatcherFunc builds a PodWatcher.
2022-06-10 17:04:52 +00:00
type PodWatcherFunc func(KubernetesClient, string, labels.Selector, io.Writer, *log.Logger) PodWatcherInterface
// WatcherParams defines the input parameters of a Watcher.
type WatcherParams struct {
Name string
Type string
Namespace string
Container string
StrictExist bool
}
2022-05-31 04:11:44 +00:00
2022-05-29 19:04:02 +00:00
// Watcher watches a deployment and tails the logs for its currently active
// pods.
type Watcher struct {
params WatcherParams
client KubernetesClient
resourceUID types.UID
podSelector labels.Selector
2022-06-01 20:04:20 +00:00
podWatcher PodWatcherInterface
podWatcherFunc PodWatcherFunc
errChan chan error
dst io.Writer
2022-06-10 17:04:52 +00:00
logger *log.Logger
2022-05-29 19:04:02 +00:00
}
// NewWatcher creates a new Watcher.
2022-06-10 17:04:52 +00:00
func NewWatcher(params WatcherParams, client KubernetesClient, podWatcherFunc PodWatcherFunc, dst io.Writer, logger *log.Logger) *Watcher {
2022-05-29 19:04:02 +00:00
return &Watcher{
params: params,
client: client,
2022-06-01 20:04:20 +00:00
podWatcherFunc: podWatcherFunc,
errChan: make(chan error),
dst: dst,
2022-06-10 17:04:52 +00:00
logger: logger,
2022-05-29 19:04:02 +00:00
}
}
// Watch watches a deployment.
func (w *Watcher) Watch(ctx context.Context) error {
ns := w.params.Namespace
if ns == "" {
ns = corev1.NamespaceDefault
}
2022-05-29 19:04:02 +00:00
// Supported resource types are deployments, statefulsets and replicasets
// (all apps/v1).
resourceID := schema.GroupVersionResource{
Resource: w.params.Type,
Group: "apps",
Version: "v1",
}
if err := w.checkResourceExists(ctx, ns, resourceID); err != nil {
return err
2022-05-29 19:04:02 +00:00
}
2022-06-01 17:19:55 +00:00
opts := metav1.ListOptions{Watch: true, FieldSelector: "metadata.name=" + w.params.Name}
watcher, err := w.client.Untyped.Resource(resourceID).Namespace(ns).Watch(ctx, opts)
2022-05-29 19:04:02 +00:00
if err != nil {
return err
}
defer watcher.Stop()
2022-05-29 19:04:02 +00:00
2022-06-01 17:19:55 +00:00
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
2022-05-29 19:04:02 +00:00
resultChan := watcher.ResultChan()
2022-05-29 19:04:02 +00:00
for {
select {
2022-06-04 00:27:20 +00:00
case evt, ok := <-resultChan:
if !ok {
resultChan = nil
continue
}
2022-05-29 19:04:02 +00:00
switch evt.Type {
case watch.Added, watch.Modified:
resource := evt.Object.(*unstructured.Unstructured)
// TODO: handle matchExpressions
selectorAsMap, ok, err := unstructured.NestedStringMap(resource.Object, "spec", "selector", "matchLabels")
if !ok || err != nil {
// matchLabels don't exist or cannot be parsed.
// Should this be fatal?
2022-06-10 17:04:52 +00:00
w.logger.Printf("warning: unable to parse matchLabels: ok = %t, err = %v", ok, err)
continue
}
2022-06-11 05:20:28 +00:00
w.addDeployment(ctx, resource.GetUID(), labels.SelectorFromSet(selectorAsMap))
2022-05-29 19:04:02 +00:00
case watch.Deleted:
2022-06-01 17:19:55 +00:00
w.removeDeployment()
2022-05-29 19:04:02 +00:00
}
2022-06-11 05:20:28 +00:00
// errChan is never closed.
2022-06-01 17:19:55 +00:00
case err := <-w.errChan:
return err
case <-ctx.Done():
return ctx.Err()
2022-05-29 19:04:02 +00:00
}
}
}
func (w *Watcher) checkResourceExists(ctx context.Context, namespace string, resourceID schema.GroupVersionResource) error {
_, err := w.client.Untyped.Resource(resourceID).Namespace(namespace).Get(ctx, w.params.Name, metav1.GetOptions{})
var statusErr *apierrors.StatusError
if !w.params.StrictExist && errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonNotFound {
2022-06-10 17:04:52 +00:00
fmt.Fprintf(os.Stderr, "%s \"%s\" does not exist, waiting\n", resourceID.Resource, w.params.Name)
return nil
}
return err
}
func (w *Watcher) addDeployment(ctx context.Context, resourceUID types.UID, podSelector labels.Selector) {
if w.resourceUID == resourceUID {
2022-06-01 17:19:55 +00:00
return
2022-05-31 04:11:44 +00:00
}
2022-06-01 17:19:55 +00:00
w.removeDeployment()
2022-05-31 04:11:44 +00:00
2022-06-10 17:04:52 +00:00
w.logger.Println("[DeploymentWatcher] add podWatcher")
2022-05-31 04:11:44 +00:00
w.resourceUID = resourceUID
w.podSelector = podSelector
2022-06-01 17:19:55 +00:00
w.podWatcher = w.podWatcherFunc(
w.client,
w.params.Container,
w.podSelector,
2022-06-01 17:19:55 +00:00
w.dst,
2022-06-10 17:04:52 +00:00
w.logger,
2022-06-01 17:19:55 +00:00
)
2022-05-31 04:11:44 +00:00
2022-06-01 17:19:55 +00:00
go func() {
if err := w.podWatcher.WatchPods(ctx); err != nil {
w.errChan <- err
}
}()
2022-05-29 19:04:02 +00:00
}
2022-06-01 17:19:55 +00:00
func (w *Watcher) removeDeployment() {
if w.podWatcher != nil {
2022-06-10 17:04:52 +00:00
w.logger.Println("[DeploymentWatcher] remove podWatcher")
2022-06-01 17:19:55 +00:00
w.podWatcher.Close()
w.podWatcher = nil
}
w.resourceUID = ""
w.podSelector = nil
2022-05-29 19:04:02 +00:00
}