diff --git a/logs/pod_watcher.go b/logs/pod_watcher.go index d1ff769..20e03bf 100644 --- a/logs/pod_watcher.go +++ b/logs/pod_watcher.go @@ -30,6 +30,14 @@ func (re *streamError) Error() string { return re.err.Error() } +func newStreamError(err error, podName string) *streamError { + return &streamError{err: err, podName: podName} +} + +func newRecoverableError(err error, podName string) *streamError { + return &streamError{err: err, podName: podName, recoverable: true} +} + // PodWatcher consumes and merges the logs for a specified set of pods. type PodWatcher struct { clientset KubernetesClient @@ -91,7 +99,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { defer ticker.Stop() // streamErrors is never closed. - streamErrors := make(chan error) + streamErrors := make(chan *streamError) for { select { @@ -126,14 +134,13 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error { } } - case err := <-streamErrors: - var streamErr *streamError - if errors.As(err, &streamErr) && streamErr.recoverable { + case streamErr := <-streamErrors: + if streamErr.recoverable { // if the error is recoverable, we just remove the pod from the status // map. It will be recreated and retried on the next iteration. pw.removePod(streamErr.podName) } else { - return streamErr + return fmt.Errorf("error streaming logs: %w", streamErr) } case evt := <-watcher.ResultChan(): @@ -156,7 +163,7 @@ func (pw *PodWatcher) removePod(podName string) { delete(pw.status, podName) } -func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, clientset KubernetesClient, p *corev1.Pod, container string, dst io.Writer) error { +func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, clientset KubernetesClient, p *corev1.Pod, container string, dst io.Writer) *streamError { defer wg.Done() podLogOpts := corev1.PodLogOptions{ @@ -170,9 +177,9 @@ func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, clientset KubernetesCl // We try to verify the error type as strictly as possible. var statusErr *apierrors.StatusError if errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonBadRequest && strings.Contains(statusErr.Error(), "ContainerCreating") { - return &streamError{err: err, podName: p.Name, recoverable: true} + return newRecoverableError(err, p.Name) } else if err != nil { - return &streamError{err: err, podName: p.Name} + return newStreamError(err, p.Name) } defer func() { _ = logs.Close() }() @@ -180,11 +187,11 @@ func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, clientset KubernetesCl scanner := bufio.NewScanner(logs) for scanner.Scan() { if _, err = dst.Write([]byte("[" + p.Name + "] " + scanner.Text() + nl)); err != nil { - return &streamError{err: fmt.Errorf("error writing logs: %v", err), podName: p.Name} + return newStreamError(fmt.Errorf("error writing logs: %v", err), p.Name) } } if err := scanner.Err(); err != nil { - return &streamError{err: fmt.Errorf("error scanning logs: %v", err), podName: p.Name} + return newStreamError(fmt.Errorf("error scanning logs: %v", err), p.Name) } return nil }