package logs import ( "bufio" "context" "fmt" "io" "sync" corev1 "k8s.io/api/core/v1" ) // concurrentWriter implements io.Writer. type concurrentWriter struct { w io.Writer mu sync.Mutex } // NewConcurrentWriter returns an io.Writer which can be safely written to from // multiple goroutines without further synchronization. func NewConcurrentWriter(w io.Writer) io.Writer { return &concurrentWriter{w: w} } // Write implements io.Writer. func (cw *concurrentWriter) Write(p []byte) (int, error) { cw.mu.Lock() defer cw.mu.Unlock() return cw.w.Write(p) } // Stream represents the logstream from an individual pod. type Stream struct { clientset KubernetesClient pod *corev1.Pod dst io.Writer } // NewStream initializes a new Stream. func NewStream(clientset KubernetesClient, pod *corev1.Pod, w io.Writer) *Stream { return &Stream{ clientset: clientset, pod: pod, dst: w, } } const nl = "\n" // Copy blocks while reading logs from the pod and writing them to the provided // writer. func (ls *Stream) Copy(ctx context.Context) error { podLogOpts := corev1.PodLogOptions{Follow: true} req := ls.clientset.CoreV1().Pods(ls.pod.Namespace).GetLogs(ls.pod.Name, &podLogOpts) logs, err := req.Stream(ctx) if err != nil { return fmt.Errorf("stream error: err = %v", err) } defer func() { _ = logs.Close() }() scanner := bufio.NewScanner(logs) for scanner.Scan() { if _, err = ls.dst.Write([]byte("[" + ls.pod.Name + "] " + scanner.Text() + nl)); err != nil { return fmt.Errorf("error writing: %v", err) } } if err := scanner.Err(); err != nil { return fmt.Errorf("scanner error: %v", err) } return nil }