package logs import ( "context" "errors" "io" "log" "sync" "time" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" ) // KubernetesClient wraps a Kubernetes clientset. type KubernetesClient struct { kubernetes.Interface } // concurrentWriter implements an io.Writer that can be safely written to from // multiple goroutines. type concurrentWriter struct { w io.Writer mu sync.Mutex } // Write implements io.Writer. func (cw *concurrentWriter) Write(p []byte) (int, error) { cw.mu.Lock() defer cw.mu.Unlock() return cw.w.Write(p) } type PodWatcherInterface interface { WatchPods(ctx context.Context) error Close() } // PodWatcherFunc builds a PodWatcher. type PodWatcherFunc func(KubernetesClient, string, *metav1.LabelSelector, io.Writer) PodWatcherInterface // Watcher watches a deployment and tails the logs for its currently active // pods. type Watcher struct { deployName string container string strictExist bool clientset KubernetesClient deployment *appsv1.Deployment podWatcher PodWatcherInterface podWatcherFunc PodWatcherFunc errChan chan error dst *concurrentWriter } // NewWatcher creates a new Watcher. func NewWatcher(deployName string, container string, strictExist bool, clientset KubernetesClient, podWatcherFunc PodWatcherFunc, dst io.Writer) *Watcher { return &Watcher{ deployName: deployName, container: container, strictExist: strictExist, clientset: clientset, podWatcherFunc: podWatcherFunc, errChan: make(chan error), dst: &concurrentWriter{w: dst}, } } // Watch watches a deployment. func (w *Watcher) Watch(ctx context.Context) error { deploymentsClient := w.clientset.AppsV1().Deployments(corev1.NamespaceDefault) // Check if the deployment exists before we commence watching, to allow us to // return an error if needed. _, err := deploymentsClient.Get(ctx, w.deployName, metav1.GetOptions{}) var statusErr *apierrors.StatusError if errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonNotFound { if w.strictExist { return err } log.Printf(`deployment "%s" does not exist, waiting`, w.deployName) } opts := metav1.ListOptions{Watch: true, FieldSelector: "metadata.name=" + w.deployName} deploymentsWatcher, err := deploymentsClient.Watch(ctx, opts) if err != nil { return err } defer deploymentsWatcher.Stop() ticker := time.NewTicker(time.Second) defer ticker.Stop() resultChan := deploymentsWatcher.ResultChan() for { select { case evt, ok := <-resultChan: if !ok { resultChan = nil continue } switch evt.Type { case watch.Added, watch.Modified: deployment := evt.Object.(*appsv1.Deployment) w.addDeployment(ctx, deployment) case watch.Deleted: w.removeDeployment() } case err := <-w.errChan: return err case <-ctx.Done(): return ctx.Err() } } } func (w *Watcher) addDeployment(ctx context.Context, deployment *appsv1.Deployment) { if w.deployment != nil && w.deployment.UID == deployment.UID { return } w.removeDeployment() log.Println("[DeploymentWatcher] add deployment") w.deployment = deployment w.podWatcher = w.podWatcherFunc( w.clientset, w.container, deployment.Spec.Selector, w.dst, ) go func() { if err := w.podWatcher.WatchPods(ctx); err != nil { w.errChan <- err } }() } func (w *Watcher) removeDeployment() { if w.podWatcher != nil { log.Println("[DeploymentWatcher] remove deployment") w.podWatcher.Close() w.podWatcher = nil } w.deployment = nil }