package logs import ( "bufio" "context" "errors" "fmt" "io" "log" "strings" "sync" "time" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" ) const nl = "\n" type stream struct { podName string namespace string done chan struct{} } func newStream(podName, namespace string) stream { return stream{podName: podName, namespace: namespace, done: make(chan struct{})} } func (s stream) close() { close(s.done) } type streamError struct { err error stream stream recoverable bool } func (re *streamError) Error() string { return re.err.Error() } func newStreamError(err error, stream stream) *streamError { return &streamError{err: err, stream: stream} } func newRecoverableError(err error, stream stream) *streamError { return &streamError{err: err, stream: stream, recoverable: true} } // PodWatcher consumes and merges the logs for a specified set of pods. type PodWatcher struct { client KubernetesClient namespace string container string labelSelector labels.Selector spec map[string]*corev1.Pod status map[string]stream streamResults chan error dst io.Writer closeChan chan struct{} logger *log.Logger } // NewPodWatcher initializes a new PodWatcher. func NewPodWatcher(client KubernetesClient, namespace string, container string, labelSelector labels.Selector, dst io.Writer, logger *log.Logger) PodWatcherInterface { return &PodWatcher{ client: client, namespace: namespace, container: container, labelSelector: labelSelector, spec: make(map[string]*corev1.Pod), status: make(map[string]stream), streamResults: make(chan error), dst: dst, closeChan: make(chan struct{}), logger: logger, } } // WatchPods blocks while it watches the pods which match the Selector of the // provided resource. func (pw *PodWatcher) WatchPods(ctx context.Context) error { var wg sync.WaitGroup err := pw.watchPods(ctx, &wg) wg.Wait() pw.logger.Println("[PodWatcher] all goroutines exited, exiting") return err } const tickerInterval = time.Millisecond * 250 // Close terminates the PodWatcher. func (pw *PodWatcher) Close() { close(pw.closeChan) } func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { podsClient := pw.client.Typed.CoreV1().Pods(pw.namespace) // Returns a watcher which notifies of changes in the relevant pods. // We don't defer Stop() on the returned value because the sender is the // Kubernetes SDK, and that may introduce a send-on-closed-channel panic. watcher, err := podsClient.Watch(ctx, metav1.ListOptions{LabelSelector: pw.labelSelector.String()}) if err != nil { return err } ticker := time.NewTicker(tickerInterval) defer ticker.Stop() logStream := make(chan string) streamErrors := make(chan error) resultChan := watcher.ResultChan() for { select { case <-ctx.Done(): return ctx.Err() case <-pw.closeChan: for _, stream := range pw.status { stream.close() } return nil case <-ticker.C: // Iterate through the desired state (w.spec) and launch goroutines to // process the logs of any missing pods. for podName, pod := range pw.spec { if _, ok := pw.status[podName]; !ok { pw.logger.Printf("[PodWatcher] adding pod, name = %s", pod.Name) s := newStream(pod.Name, pod.Namespace) pw.status[pod.Name] = s wg.Add(1) go func() { if err := copyPodLogs(ctx, wg, pw.client, s, pw.container, logStream); err != nil { streamErrors <- err } }() } } // For any pods which no longer exist, remove the pod. for podName, stream := range pw.status { if _, ok := pw.spec[podName]; !ok { pw.removePod(stream) } } // logStream is never closed case l := <-logStream: if _, err := pw.dst.Write([]byte(l)); err != nil { return fmt.Errorf("error writing logs: %v", err) } // streamErrors is never closed case err := <-streamErrors: var streamErr *streamError if errors.As(err, &streamErr) && streamErr.recoverable { // if the error is recoverable, we just remove the pod from the status // map. It will be recreated and retried on the next iteration. pw.removePod(streamErr.stream) } else { return fmt.Errorf("error streaming logs: %w", streamErr) } case evt, ok := <-resultChan: if !ok { return nil } switch evt.Type { case watch.Added, watch.Modified: pod := evt.Object.(*corev1.Pod) if pod.Status.Phase == corev1.PodRunning { pw.spec[pod.Name] = pod } case watch.Deleted: pod := evt.Object.(*corev1.Pod) delete(pw.spec, pod.Name) } } } } func (pw *PodWatcher) removePod(stream stream) { pw.logger.Printf("[PodWatcher] removing pod, name = %s", stream.podName) stream.close() delete(pw.status, stream.podName) } func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, client KubernetesClient, stream stream, container string, logStream chan string) error { defer wg.Done() req := client.Typed.CoreV1().Pods(stream.namespace).GetLogs(stream.podName, &corev1.PodLogOptions{Follow: true, Container: container}) logs, err := req.Stream(ctx) // If one container is still being created, do not treat this as a fatal error. var statusErr *apierrors.StatusError if errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonBadRequest && strings.Contains(statusErr.Error(), "ContainerCreating") { return newRecoverableError(err, stream) } else if err != nil { return newStreamError(err, stream) } // Closing the reader ensures that the goroutine below is not leaked. defer func() { _ = logs.Close() }() done := make(chan error, 1) go func() { scanner := bufio.NewScanner(logs) for scanner.Scan() { logStream <- "[" + stream.podName + "] " + scanner.Text() + nl } if err := scanner.Err(); err != nil { done <- newStreamError(fmt.Errorf("error scanning logs: %v", err), stream) return } done <- nil }() for { select { case err := <-done: return err case <-stream.done: return nil case <-ctx.Done(): return ctx.Err() } } }