// Package logs consumes, merges, and forwards container logs from a set of
// Kubernetes pods selected by a label selector.
package logs

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"strings"
	"sync"
	"time"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/watch"
)

// nl is the line terminator appended to every forwarded log line.
const nl = "\n"

// logStream represents the log stream from an individual pod's container.
// Closing done signals the goroutine copying this stream to stop.
type logStream struct {
	podName   string
	container string
	namespace string
	done      chan struct{}
}

// newStream builds a logStream for the given pod/container/namespace with a
// fresh done channel.
func newStream(podName, container, namespace string) logStream {
	return logStream{
		podName:   podName,
		container: container,
		namespace: namespace,
		done:      make(chan struct{}),
	}
}

// logError wraps an error that occurred while streaming a particular pod's
// logs. recoverable indicates the stream may simply be retried (e.g. the
// container is still being created).
type logError struct {
	err         error
	logStream   logStream
	recoverable bool
}

// Error implements the error interface by delegating to the wrapped error.
func (re *logError) Error() string {
	return re.err.Error()
}

// newLogError wraps err as a fatal (non-recoverable) logError for stream.
func newLogError(err error, stream logStream) *logError {
	return &logError{err: err, logStream: stream}
}

// newRecoverableLogError wraps err as a recoverable logError for stream; the
// watcher will drop and re-add the stream instead of aborting.
func newRecoverableLogError(err error, stream logStream) *logError {
	return &logError{err: err, logStream: stream, recoverable: true}
}

// PodWatcher consumes and merges the logs for a specified set of pods.
//
// spec is the desired state (pods matching the selector that are running);
// status is the observed state (pods whose logs are currently being
// streamed). The two maps are reconciled on a ticker.
type PodWatcher struct {
	client        KubernetesClient
	namespace     string
	container     string
	labelSelector labels.Selector
	spec          map[string]*corev1.Pod // desired: pod name -> pod
	status        map[string]logStream   // observed: pod name -> active stream
	streamResults chan error
	dst           io.Writer // merged log lines are written here
	closeChan     chan struct{}
	logger        *log.Logger
}

// NewPodWatcher initializes a new PodWatcher.
func NewPodWatcher(client KubernetesClient, namespace string, container string, labelSelector labels.Selector, dst io.Writer, logger *log.Logger) PodWatcherInterface {
	return &PodWatcher{
		client:        client,
		namespace:     namespace,
		container:     container,
		labelSelector: labelSelector,
		spec:          make(map[string]*corev1.Pod),
		status:        make(map[string]logStream),
		streamResults: make(chan error),
		dst:           dst,
		closeChan:     make(chan struct{}),
		logger:        logger,
	}
}

// WatchPods blocks while it watches the pods which match the Selector of the
// provided resource.
func (pw *PodWatcher) WatchPods(ctx context.Context) error { var wg sync.WaitGroup err := pw.watchPods(ctx, &wg) wg.Wait() pw.logger.Println("[PodWatcher] all goroutines exited, exiting") return err } const tickerInterval = time.Millisecond * 250 // Close terminates the PodWatcher. func (pw *PodWatcher) Close() { close(pw.closeChan) } func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { podsClient := pw.client.Typed.CoreV1().Pods(pw.namespace) // Returns a watcher which notifies of changes in the relevant pods. // We don't defer Stop() on the returned value because the sender is the // Kubernetes SDK, and that may introduce a send-on-closed-channel panic. watcher, err := podsClient.Watch(ctx, metav1.ListOptions{LabelSelector: pw.labelSelector.String()}) if err != nil { return err } ticker := time.NewTicker(tickerInterval) defer ticker.Stop() logLines := make(chan string) logErrorChan := make(chan error) resultChan := watcher.ResultChan() for { select { case <-ctx.Done(): return ctx.Err() case <-pw.closeChan: for _, stream := range pw.status { pw.removePod(stream) } return nil case <-ticker.C: // Iterate through the desired state (w.spec) and launch goroutines to // process the logs of any missing pods. for podName, pod := range pw.spec { if _, ok := pw.status[podName]; !ok { pw.logger.Printf("[PodWatcher] adding pod, name = %s, container = %s, namespace = %s,", pod.Name, pw.container, pod.Namespace) stream := newStream(pod.Name, pw.container, pod.Namespace) pw.status[pod.Name] = stream wg.Add(1) go func() { if err := copyLogStream(ctx, wg, pw.client, stream, logLines); err != nil { logErrorChan <- err } }() } } // For any pods which no longer exist, remove the pod. 
for podName, stream := range pw.status { if _, ok := pw.spec[podName]; !ok { pw.removePod(stream) } } // logLines is never closed case line := <-logLines: if _, err := pw.dst.Write([]byte(line)); err != nil { return fmt.Errorf("error writing logs: %v", err) } // logErrorChan is never closed case err := <-logErrorChan: var logErr *logError if errors.As(err, &logErr) && logErr.recoverable { // if the error is recoverable, we just remove the pod from the status // map. It will be recreated and retried on the next iteration. pw.removePod(logErr.logStream) } else { return fmt.Errorf("error streaming logs: %w", logErr) } case evt, ok := <-resultChan: if !ok { return nil } switch evt.Type { case watch.Added, watch.Modified: pod := evt.Object.(*corev1.Pod) if pod.Status.Phase == corev1.PodRunning { pw.spec[pod.Name] = pod } case watch.Deleted: pod := evt.Object.(*corev1.Pod) delete(pw.spec, pod.Name) } } } } func (pw *PodWatcher) removePod(stream logStream) { pw.logger.Printf("[PodWatcher] removing pod, name = %s", stream.podName) close(stream.done) delete(pw.status, stream.podName) } func copyLogStream(ctx context.Context, wg *sync.WaitGroup, client KubernetesClient, stream logStream, logsLines chan string) error { defer wg.Done() req := client.Typed.CoreV1().Pods(stream.namespace).GetLogs(stream.podName, &corev1.PodLogOptions{Follow: true, Container: stream.container}) logReader, err := req.Stream(ctx) // If one container is still being created, do not treat this as a fatal error. var statusErr *apierrors.StatusError if errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonBadRequest && strings.Contains(statusErr.Error(), "ContainerCreating") { return newRecoverableLogError(err, stream) } else if err != nil { return newLogError(err, stream) } // Closing the reader ensures that the goroutine below is not leaked. 
defer func() { _ = logReader.Close() }() errch := make(chan error, 1) go func() { defer close(errch) scanner := bufio.NewScanner(logReader) for scanner.Scan() { logsLines <- "[" + stream.podName + "] " + scanner.Text() + nl } if err := scanner.Err(); err != nil { errch <- newLogError(fmt.Errorf("error scanning logs: %v", err), stream) return } }() for { select { case err := <-errch: return err case <-stream.done: return nil case <-ctx.Done(): return ctx.Err() } } }