Refactor stream error handling

This commit is contained in:
Rob Watson 2022-06-02 19:23:41 +02:00
parent 2ea2ebe836
commit 7e74fb6f08
1 changed files with 17 additions and 10 deletions

View File

@ -30,6 +30,14 @@ func (re *streamError) Error() string {
return re.err.Error() return re.err.Error()
} }
func newStreamError(err error, podName string) *streamError {
return &streamError{err: err, podName: podName}
}
func newRecoverableError(err error, podName string) *streamError {
return &streamError{err: err, podName: podName, recoverable: true}
}
// PodWatcher consumes and merges the logs for a specified set of pods. // PodWatcher consumes and merges the logs for a specified set of pods.
type PodWatcher struct { type PodWatcher struct {
clientset KubernetesClient clientset KubernetesClient
@ -91,7 +99,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error {
defer ticker.Stop() defer ticker.Stop()
// streamErrors is never closed. // streamErrors is never closed.
streamErrors := make(chan error) streamErrors := make(chan *streamError)
for { for {
select { select {
@ -126,14 +134,13 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error {
} }
} }
case err := <-streamErrors: case streamErr := <-streamErrors:
var streamErr *streamError if streamErr.recoverable {
if errors.As(err, &streamErr) && streamErr.recoverable {
// if the error is recoverable, we just remove the pod from the status // if the error is recoverable, we just remove the pod from the status
// map. It will be recreated and retried on the next iteration. // map. It will be recreated and retried on the next iteration.
pw.removePod(streamErr.podName) pw.removePod(streamErr.podName)
} else { } else {
return streamErr return fmt.Errorf("error streaming logs: %w", streamErr)
} }
case evt := <-watcher.ResultChan(): case evt := <-watcher.ResultChan():
@ -156,7 +163,7 @@ func (pw *PodWatcher) removePod(podName string) {
delete(pw.status, podName) delete(pw.status, podName)
} }
func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, clientset KubernetesClient, p *corev1.Pod, container string, dst io.Writer) error { func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, clientset KubernetesClient, p *corev1.Pod, container string, dst io.Writer) *streamError {
defer wg.Done() defer wg.Done()
podLogOpts := corev1.PodLogOptions{ podLogOpts := corev1.PodLogOptions{
@ -170,9 +177,9 @@ func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, clientset KubernetesCl
// We try to verify the error type as strictly as possible. // We try to verify the error type as strictly as possible.
var statusErr *apierrors.StatusError var statusErr *apierrors.StatusError
if errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonBadRequest && strings.Contains(statusErr.Error(), "ContainerCreating") { if errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonBadRequest && strings.Contains(statusErr.Error(), "ContainerCreating") {
return &streamError{err: err, podName: p.Name, recoverable: true} return newRecoverableError(err, p.Name)
} else if err != nil { } else if err != nil {
return &streamError{err: err, podName: p.Name} return newStreamError(err, p.Name)
} }
defer func() { _ = logs.Close() }() defer func() { _ = logs.Close() }()
@ -180,11 +187,11 @@ func copyPodLogs(ctx context.Context, wg *sync.WaitGroup, clientset KubernetesCl
scanner := bufio.NewScanner(logs) scanner := bufio.NewScanner(logs)
for scanner.Scan() { for scanner.Scan() {
if _, err = dst.Write([]byte("[" + p.Name + "] " + scanner.Text() + nl)); err != nil { if _, err = dst.Write([]byte("[" + p.Name + "] " + scanner.Text() + nl)); err != nil {
return &streamError{err: fmt.Errorf("error writing logs: %v", err), podName: p.Name} return newStreamError(fmt.Errorf("error writing logs: %v", err), p.Name)
} }
} }
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
return &streamError{err: fmt.Errorf("error scanning logs: %v", err), podName: p.Name} return newStreamError(fmt.Errorf("error scanning logs: %v", err), p.Name)
} }
return nil return nil
} }