diff --git a/logs/pod_watcher.go b/logs/pod_watcher.go index a540bd5..40139b3 100644 --- a/logs/pod_watcher.go +++ b/logs/pod_watcher.go @@ -20,36 +20,39 @@ import ( const nl = "\n" -type stream struct { +// logStream represents the logStream from an individual pod. +type logStream struct { podName string + container string namespace string done chan struct{} } -func newStream(podName, namespace string) stream { - return stream{podName: podName, namespace: namespace, done: make(chan struct{})} +func newStream(podName, container, namespace string) logStream { + return logStream{ + podName: podName, + container: container, + namespace: namespace, + done: make(chan struct{}), + } } -func (s stream) close() { - close(s.done) -} - -type streamError struct { +type logError struct { err error - stream stream + logStream logStream recoverable bool } -func (re *streamError) Error() string { +func (re *logError) Error() string { return re.err.Error() } -func newStreamError(err error, stream stream) *streamError { - return &streamError{err: err, stream: stream} +func newLogError(err error, stream logStream) *logError { + return &logError{err: err, logStream: stream} } -func newRecoverableError(err error, stream stream) *streamError { - return &streamError{err: err, stream: stream, recoverable: true} +func newRecoverableLogError(err error, stream logStream) *logError { + return &logError{err: err, logStream: stream, recoverable: true} } // PodWatcher consumes and merges the logs for a specified set of pods. @@ -59,7 +62,7 @@ type PodWatcher struct { container string labelSelector labels.Selector spec map[string]*corev1.Pod - status map[string]stream + status map[string]logStream streamResults chan error dst io.Writer closeChan chan struct{} @@ -74,7 +77,7 @@ func NewPodWatcher(client KubernetesClient, namespace string, container string, container: container, labelSelector: labelSelector, spec: make(map[string]*corev1.Pod), - status: make(map[string]stream), + status: make(map[string]logStream), streamResults: make(chan error), dst: dst, closeChan: make(chan struct{}), @@ -115,8 +118,8 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { ticker := time.NewTicker(tickerInterval) defer ticker.Stop() - logStream := make(chan string) - streamErrors := make(chan error) + logLines := make(chan string) + logErrorChan := make(chan error) resultChan := watcher.ResultChan() for { @@ -126,7 +129,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { case <-pw.closeChan: for _, stream := range pw.status { - stream.close() + pw.removePod(stream) } return nil @@ -135,14 +138,14 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { // process the logs of any missing pods. for podName, pod := range pw.spec { if _, ok := pw.status[podName]; !ok { - pw.logger.Printf("[PodWatcher] adding pod, name = %s", pod.Name) - s := newStream(pod.Name, pod.Namespace) - pw.status[pod.Name] = s + pw.logger.Printf("[PodWatcher] adding pod, name = %s, container = %s, namespace = %s,", pod.Name, pw.container, pod.Namespace) + stream := newStream(pod.Name, pw.container, pod.Namespace) + pw.status[pod.Name] = stream wg.Add(1) go func() { - if err := copyPodLogs(ctx, wg, pw.client, s, pw.container, logStream); err != nil { - streamErrors <- err + if err := copyLogStream(ctx, wg, pw.client, stream, logLines); err != nil { + logErrorChan <- err } }() } @@ -154,27 +157,28 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { } } - // logStream is never closed - case l := <-logStream: - if _, err := pw.dst.Write([]byte(l)); err != nil { + // logLines is never closed + case line := <-logLines: + if _, err := pw.dst.Write([]byte(line)); err != nil { return fmt.Errorf("error writing logs: %v", err) } - // streamErrors is never closed - case err := <-streamErrors: - var streamErr *streamError - if errors.As(err, &streamErr) && streamErr.recoverable { + // logErrorChan is never closed + case err := <-logErrorChan: + var logErr *logError + if errors.As(err, &logErr) && logErr.recoverable { // if the error is recoverable, we just remove the pod from the status // map. It will be recreated and retried on the next iteration. - pw.removePod(streamErr.stream) + pw.removePod(logErr.logStream) } else { - return fmt.Errorf("error streaming logs: %w", streamErr) + return fmt.Errorf("error streaming logs: %w", logErr) } case evt, ok := <-resultChan: if !ok { return nil } + switch evt.Type { case watch.Added, watch.Modified: pod := evt.Object.(*corev1.Pod) @@ -189,40 +193,40 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { } } -func (pw *PodWatcher) removePod(stream stream) { +func (pw *PodWatcher) removePod(stream logStream) { pw.logger.Printf("[PodWatcher] removing pod, name = %s", stream.podName) - stream.close() + close(stream.done) delete(pw.status, stream.podName) } -func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, client KubernetesClient, stream stream, container string, logStream chan string) error { +func copyLogStream(ctx context.Context, wg *sync.WaitGroup, client KubernetesClient, stream logStream, logsLines chan string) error { defer wg.Done() - req := client.Typed.CoreV1().Pods(stream.namespace).GetLogs(stream.podName, &corev1.PodLogOptions{Follow: true, Container: container}) - logs, err := req.Stream(ctx) + req := client.Typed.CoreV1().Pods(stream.namespace).GetLogs(stream.podName, &corev1.PodLogOptions{Follow: true, Container: stream.container}) + logReader, err := req.Stream(ctx) // If one container is still being created, do not treat this as a fatal error. var statusErr *apierrors.StatusError if errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonBadRequest && strings.Contains(statusErr.Error(), "ContainerCreating") { - return newRecoverableError(err, stream) + return newRecoverableLogError(err, stream) } else if err != nil { - return newStreamError(err, stream) + return newLogError(err, stream) } // Closing the reader ensures that the goroutine below is not leaked. - defer func() { _ = logs.Close() }() + defer func() { _ = logReader.Close() }() done := make(chan error, 1) go func() { - scanner := bufio.NewScanner(logs) + defer close(done) + scanner := bufio.NewScanner(logReader) for scanner.Scan() { - logStream <- "[" + stream.podName + "] " + scanner.Text() + nl + logsLines <- "[" + stream.podName + "] " + scanner.Text() + nl } if err := scanner.Err(); err != nil { - done <- newStreamError(fmt.Errorf("error scanning logs: %v", err), stream) + done <- newLogError(fmt.Errorf("error scanning logs: %v", err), stream) return } - done <- nil }() for {