diff --git a/logs/stream.go b/logs/stream.go
deleted file mode 100644
index 54adeb1..0000000
--- a/logs/stream.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package logs
-
-import (
-	"bufio"
-	"context"
-	"fmt"
-	"io"
-	"sync"
-
-	corev1 "k8s.io/api/core/v1"
-)
-
-// concurrentWriter implements io.Writer.
-type concurrentWriter struct {
-	w  io.Writer
-	mu sync.Mutex
-}
-
-// NewConcurrentWriter returns an io.Writer which can be safely written to from
-// multiple goroutines without further synchronization.
-func NewConcurrentWriter(w io.Writer) io.Writer {
-	return &concurrentWriter{w: w}
-}
-
-// Write implements io.Writer.
-func (cw *concurrentWriter) Write(p []byte) (int, error) {
-	cw.mu.Lock()
-	defer cw.mu.Unlock()
-
-	return cw.w.Write(p)
-}
-
-// Stream represents the logstream from an individual pod.
-type Stream struct {
-	clientset KubernetesClient
-	pod       *corev1.Pod
-	dst       io.Writer
-}
-
-// NewStream initializes a new Stream.
-func NewStream(clientset KubernetesClient, pod *corev1.Pod, w io.Writer) *Stream {
-	return &Stream{
-		clientset: clientset,
-		pod:       pod,
-		dst:       w,
-	}
-}
-
-const nl = "\n"
-
-// Copy blocks while reading logs from the pod and writing them to the provided
-// writer.
-func (ls *Stream) Copy(ctx context.Context) error {
-	podLogOpts := corev1.PodLogOptions{Follow: true}
-	req := ls.clientset.CoreV1().Pods(ls.pod.Namespace).GetLogs(ls.pod.Name, &podLogOpts)
-	logs, err := req.Stream(ctx)
-	if err != nil {
-		return fmt.Errorf("stream error: err = %v", err)
-	}
-	defer func() { _ = logs.Close() }()
-
-	scanner := bufio.NewScanner(logs)
-	for scanner.Scan() {
-		if _, err = ls.dst.Write([]byte("[" + ls.pod.Name + "] " + scanner.Text() + nl)); err != nil {
-			return fmt.Errorf("error writing: %v", err)
-		}
-	}
-
-	if err := scanner.Err(); err != nil {
-		return fmt.Errorf("scanner error: %v", err)
-	}
-
-	return nil
-}
diff --git a/logs/stream_test.go b/logs/stream_test.go
deleted file mode 100644
index 62f6478..0000000
--- a/logs/stream_test.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package logs_test
-
-import (
-	"bytes"
-	"context"
-	"testing"
-
-	"git.netflux.io/rob/kubectl-persistent-logger/logs"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	testclient "k8s.io/client-go/kubernetes/fake"
-)
-
-func TestStream(t *testing.T) {
-	client := logs.KubernetesClient{Interface: testclient.NewSimpleClientset()}
-
-	var buf bytes.Buffer
-	pod := corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}}
-	stream := logs.NewStream(client, &pod, &buf)
-
-	err := stream.Copy(context.Background())
-	require.NoError(t, err)
-	assert.Equal(t, "[foo] fake logs\n", buf.String())
-}
diff --git a/logs/watcher.go b/logs/watcher.go
index 7f24b4f..7726e0f 100644
--- a/logs/watcher.go
+++ b/logs/watcher.go
@@ -1,9 +1,14 @@
 package logs
 
 import (
+	"bufio"
 	"context"
+	"errors"
+	"fmt"
 	"io"
 	"log"
+	"strings"
+	"sync"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
@@ -13,28 +18,65 @@ import (
 	"k8s.io/client-go/kubernetes"
 )
 
+// KubernetesClient wraps a Kubernetes clientset.
 type KubernetesClient struct {
 	kubernetes.Interface
 }
 
+// concurrentWriter implements io.Writer.
+type concurrentWriter struct {
+	w  io.Writer
+	mu sync.Mutex
+}
+
+// NewConcurrentWriter returns an io.Writer which can be safely written to from
+// multiple goroutines without further synchronization.
+func NewConcurrentWriter(w io.Writer) io.Writer {
+	return &concurrentWriter{w: w}
+}
+
+// Write implements io.Writer.
+func (cw *concurrentWriter) Write(p []byte) (int, error) {
+	cw.mu.Lock()
+	defer cw.mu.Unlock()
+
+	return cw.w.Write(p)
+}
+
+const nl = "\n"
+
+type streamError struct {
+	err         error
+	podName     string
+	recoverable bool
+}
+
+func (re *streamError) Error() string {
+	return re.err.Error()
+}
+
 // Watcher watches a deployment and tails the logs for its currently active
 // pods.
 type Watcher struct {
-	deployName string
-	clientset  KubernetesClient
-	spec       map[string]*corev1.Pod
-	status     map[string]*Stream
-	dst        io.Writer
+	deployName    string
+	container     string
+	clientset     KubernetesClient
+	spec          map[string]*corev1.Pod
+	status        map[string]bool
+	streamResults chan error
+	dst           io.Writer
 }
 
 // NewWatcher creates a new Watcher.
-func NewWatcher(deployName string, clientset KubernetesClient, dst io.Writer) *Watcher {
+func NewWatcher(deployName string, container string, clientset KubernetesClient, dst io.Writer) *Watcher {
 	return &Watcher{
-		deployName: deployName,
-		clientset:  clientset,
-		spec:       make(map[string]*corev1.Pod),
-		status:     make(map[string]*Stream),
-		dst:        dst,
+		deployName:    deployName,
+		container:     container,
+		clientset:     clientset,
+		spec:          make(map[string]*corev1.Pod),
+		status:        make(map[string]bool),
+		streamResults: make(chan error),
+		dst:           dst,
 	}
 }
 
@@ -63,10 +105,8 @@ func (w *Watcher) Watch(ctx context.Context) error {
 	}
 	defer watcher.Stop()
 
-	// channel to receive pod names which experience unexpected errors in the
-	// logging flow. They will be removed from the state and and then recreated
-	// if the pod still exists.
-	podsWithErrors := make(chan string)
+	// streamErrors is never closed.
+	streamErrors := make(chan error)
 
 	for {
 		select {
@@ -77,8 +117,15 @@ func (w *Watcher) Watch(ctx context.Context) error {
 			// Iterate through the desired state (w.spec) and launch goroutines to
 			// process the logs of any missing pods.
 			for podName, pod := range w.spec {
+				pod := pod
 				if _, ok := w.status[podName]; !ok {
-					w.addPod(ctx, pod, dst, podsWithErrors)
+					log.Printf("adding pod, name = %s", pod.Name)
+					w.status[pod.Name] = true
+					go func() {
+						if err := copyPodLogs(ctx, w.clientset, pod, w.container, dst); err != nil {
+							streamErrors <- err
+						}
+					}()
 				}
 			}
 			// For any pods which no longer exist, remove the pod.
@@ -90,8 +137,15 @@ func (w *Watcher) Watch(ctx context.Context) error {
 				}
 			}
 
-		case podName := <-podsWithErrors:
-			w.removePod(podName)
+		case err := <-streamErrors:
+			var streamErr *streamError
+			if errors.As(err, &streamErr) && streamErr.recoverable {
+				// if the error is recoverable, we just remove the pod from the status
+				// map. It will be recreated and retried on the next iteration.
+				w.removePod(streamErr.podName)
+			} else {
+				return streamErr
+			}
 
 		case evt := <-watcher.ResultChan():
 			switch evt.Type {
@@ -101,29 +155,46 @@ func (w *Watcher) Watch(ctx context.Context) error {
 				if pod.Status.Phase == corev1.PodRunning {
 					w.spec[pod.Name] = pod
 				}
-
 			case watch.Deleted:
 				pod := evt.Object.(*corev1.Pod)
 				delete(w.spec, pod.Name)
 				log.Printf("event rcvd, type = DELETED, pod name = %s", pod.Name)
-			case watch.Error:
-				// TODO: error handling
-				log.Fatalf("event rcvd, type = ERROR, object = %+v", evt.Object)
 			}
 		}
 	}
 }
 
-func (w *Watcher) addPod(ctx context.Context, p *corev1.Pod, dst io.Writer, errChan chan string) {
-	log.Printf("adding pod, name = %s", p.Name)
-	ls := NewStream(w.clientset, p, dst)
-	w.status[p.Name] = ls
-	go func() {
-		if err := ls.Copy(ctx); err != nil {
-			log.Printf("error copying logs: %v", err)
-			errChan <- p.Name
+func copyPodLogs(ctx context.Context, clientset KubernetesClient, p *corev1.Pod, container string, dst io.Writer) error {
+	podLogOpts := corev1.PodLogOptions{
+		Follow:    true,
+		Container: container,
+	}
+	req := clientset.CoreV1().Pods(p.Namespace).GetLogs(p.Name, &podLogOpts)
+	logs, err := req.Stream(ctx)
+
+	// If one pod or container is in a non-running state, we don't want to quit.
+	// Checking the response string avoids the risk of a race condition but
+	// obviously feels a bit brittle too.
+	if err != nil && strings.Contains(err.Error(), "is waiting to start") {
+		return &streamError{err: err, podName: p.Name, recoverable: true}
+	} else if err != nil {
+		return &streamError{err: err, podName: p.Name}
+	}
+
+	defer func() { _ = logs.Close() }()
+
+	scanner := bufio.NewScanner(logs)
+	for scanner.Scan() {
+		if _, err = dst.Write([]byte("[" + p.Name + "] " + scanner.Text() + nl)); err != nil {
+			return &streamError{err: fmt.Errorf("error writing: %v", err), podName: p.Name}
 		}
-	}()
+	}
+
+	if err := scanner.Err(); err != nil {
+		return &streamError{err: fmt.Errorf("error scanning: %v", err), podName: p.Name}
+	}
+
+	return nil
 }
 
 func (w *Watcher) removePod(podName string) {
diff --git a/logs/watcher_test.go b/logs/watcher_test.go
index 6d87d9f..8a18748 100644
--- a/logs/watcher_test.go
+++ b/logs/watcher_test.go
@@ -60,7 +60,7 @@ func TestWatcher(t *testing.T) {
 	client := logs.KubernetesClient{Interface: clientset}
 
 	var buf bytes.Buffer
-	watcher := logs.NewWatcher("mydeployment", client, &buf)
+	watcher := logs.NewWatcher("mydeployment", "mycontainer", client, &buf)
 
 	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
 	defer cancel()
diff --git a/main.go b/main.go
index 55c8a30..8703921 100644
--- a/main.go
+++ b/main.go
@@ -14,7 +14,10 @@ import (
 )
 
 func main() {
-	var kubeconfig *string
+	var (
+		kubeconfig *string
+		container  *string
+	)
 	var deployName *string
 	if home := homedir.HomeDir(); home != "" {
 		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
@@ -22,6 +25,7 @@ func main() {
 		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
 	}
 	deployName = flag.String("deployment", "", "name of a deployment to monitor")
+	container = flag.String("container", "", "name of a specific container")
 	flag.Parse()
 
 	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
@@ -35,7 +39,12 @@ func main() {
 	}
 
 	ctx := context.Background()
-	watcher := logs.NewWatcher(*deployName, logs.KubernetesClient{Interface: clientset}, os.Stdout)
+	watcher := logs.NewWatcher(
+		*deployName,
+		*container,
+		logs.KubernetesClient{Interface: clientset},
+		os.Stdout,
+	)
 	if err := watcher.Watch(ctx); err != nil {
 		log.Fatal(err)
 	}
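For reference, a minimal sketch of calling the updated API from Go. This is not part of the diff above: reading the kubeconfig path from the KUBECONFIG environment variable (instead of the -kubeconfig flag) and wrapping os.Stdout in NewConcurrentWriter are assumptions for illustration only; the change itself passes os.Stdout straight to NewWatcher.

```go
// Hypothetical usage sketch of the new NewWatcher signature
// (deployment, container, clientset, writer).
package main

import (
	"context"
	"log"
	"os"

	"git.netflux.io/rob/kubectl-persistent-logger/logs"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a clientset from a kubeconfig file, as main.go does.
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		log.Fatal(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}

	// Watch the deployment's pods and interleave their logs on stdout.
	watcher := logs.NewWatcher(
		"mydeployment",
		"mycontainer",
		logs.KubernetesClient{Interface: clientset},
		logs.NewConcurrentWriter(os.Stdout),
	)
	if err := watcher.Watch(context.Background()); err != nil {
		log.Fatal(err)
	}
}
```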