package logs import ( "bufio" "context" "errors" "fmt" "io" "log" "strings" "sync" "time" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" ) const nl = "\n" type streamError struct { err error podName string recoverable bool } func (re *streamError) Error() string { return re.err.Error() } func newStreamError(err error, podName string) *streamError { return &streamError{err: err, podName: podName} } func newRecoverableError(err error, podName string) *streamError { return &streamError{err: err, podName: podName, recoverable: true} } // PodWatcher consumes and merges the logs for a specified set of pods. type PodWatcher struct { client KubernetesClient container string labelSelector labels.Selector spec map[string]*corev1.Pod status map[string]bool streamResults chan error dst io.Writer closeChan chan struct{} } // NewPodWatcher initializes a new PodWatcher. func NewPodWatcher(client KubernetesClient, container string, labelSelector labels.Selector, dst io.Writer) PodWatcherInterface { return &PodWatcher{ client: client, container: container, labelSelector: labelSelector, spec: make(map[string]*corev1.Pod), status: make(map[string]bool), streamResults: make(chan error), dst: dst, closeChan: make(chan struct{}), } } // WatchPods blocks while it watches the pods which match the Selector of the // provided deployment. // // Cancelling ctx will result in the event loop exiting immediately. func (pw *PodWatcher) WatchPods(ctx context.Context) error { var wg sync.WaitGroup err := pw.watchPods(ctx, &wg) wg.Wait() log.Println("[PodWatcher] all goroutines exited, exiting") return err } const tickerInterval = time.Millisecond * 250 // Close terminates the PodWatcher. func (pw *PodWatcher) Close() { pw.closeChan <- struct{}{} } func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { podsClient := pw.client.Typed.CoreV1().Pods(corev1.NamespaceDefault) // Returns a watcher which notifies of changes in the relevant pods. // We don't defer Stop() on the returned value because the sender is the // Kubernetes SDK, and that may introduce a send-on-closed-channel panic. watcher, err := podsClient.Watch(ctx, metav1.ListOptions{LabelSelector: pw.labelSelector.String()}) if err != nil { return err } ticker := time.NewTicker(tickerInterval) defer ticker.Stop() // streamErrors is never closed. streamErrors := make(chan *streamError) resultChan := watcher.ResultChan() for { select { case <-ctx.Done(): return ctx.Err() case <-pw.closeChan: return nil case <-ticker.C: // Iterate through the desired state (w.spec) and launch goroutines to // process the logs of any missing pods. for podName, pod := range pw.spec { pod := pod if _, ok := pw.status[podName]; !ok { log.Printf("[PodWatcher] adding pod, name = %s", pod.Name) pw.status[pod.Name] = true wg.Add(1) go func() { if err := copyPodLogs(ctx, wg, pw.client, pod, pw.container, pw.dst); err != nil { streamErrors <- err } }() } } // For any pods which no longer exist, remove the pod. // TODO: stop the log streaming. for podName := range pw.status { if _, ok := pw.spec[podName]; !ok { pw.removePod(podName) } } case streamErr := <-streamErrors: if streamErr.recoverable { // if the error is recoverable, we just remove the pod from the status // map. It will be recreated and retried on the next iteration. pw.removePod(streamErr.podName) } else { return fmt.Errorf("error streaming logs: %w", streamErr) } case evt, ok := <-resultChan: if !ok { return nil } switch evt.Type { case watch.Added, watch.Modified: pod := evt.Object.(*corev1.Pod) if pod.Status.Phase == corev1.PodRunning { pw.spec[pod.Name] = pod } case watch.Deleted: pod := evt.Object.(*corev1.Pod) delete(pw.spec, pod.Name) } } } } func (pw *PodWatcher) removePod(podName string) { log.Printf("[PodWatcher] removing pod, name = %s", podName) delete(pw.status, podName) } func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, client KubernetesClient, pod *corev1.Pod, container string, dst io.Writer) *streamError { defer wg.Done() podLogOpts := corev1.PodLogOptions{ Follow: true, Container: container, } req := client.Typed.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts) logs, err := req.Stream(ctx) // If one container is still being created, do not treat this as a fatal error. // We try to verify the error type as strictly as possible. var statusErr *apierrors.StatusError if errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonBadRequest && strings.Contains(statusErr.Error(), "ContainerCreating") { return newRecoverableError(err, pod.Name) } else if err != nil { return newStreamError(err, pod.Name) } defer func() { _ = logs.Close() }() scanner := bufio.NewScanner(logs) for scanner.Scan() { if _, err = dst.Write([]byte("[" + pod.Name + "] " + scanner.Text() + nl)); err != nil { return newStreamError(fmt.Errorf("error writing logs: %v", err), pod.Name) } } if err := scanner.Err(); err != nil { return newStreamError(fmt.Errorf("error scanning logs: %v", err), pod.Name) } return nil }