podWatcher: handle closed channel
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Rob Watson 2022-06-03 22:20:23 +02:00
parent d5835821c5
commit 0827693801
3 changed files with 40 additions and 1 deletions

View File

@ -100,6 +100,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error {
// streamErrors is never closed. // streamErrors is never closed.
streamErrors := make(chan *streamError) streamErrors := make(chan *streamError)
resultChan := watcher.ResultChan()
for { for {
select { select {
@ -143,7 +144,11 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error {
return fmt.Errorf("error streaming logs: %w", streamErr) return fmt.Errorf("error streaming logs: %w", streamErr)
} }
case evt := <-watcher.ResultChan(): case evt, ok := <-resultChan:
if !ok {
resultChan = nil
continue
}
switch evt.Type { switch evt.Type {
case watch.Added, watch.Modified: case watch.Added, watch.Modified:
pod := evt.Object.(*corev1.Pod) pod := evt.Object.(*corev1.Pod)

View File

@ -60,6 +60,39 @@ func (m *mockClientset) GetLogs(podName string, _ *corev1.PodLogOptions) *rest.R
return fakeClient.Request() return fakeClient.Request()
} }
func TestPodWatcherClosedWatcher(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
podsWatcher := watch.NewFake()
clientset := mockClientset{
getLogsRespBody: "it worked",
getLogsStatusCode: http.StatusOK,
podsWatcher: podsWatcher,
}
clientset.PrependWatchReactor("pods", k8stest.DefaultWatchReactor(podsWatcher, nil))
client := logs.KubernetesClient{Interface: &clientset}
selector := metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}
go func() {
defer podsWatcher.Stop()
podsWatcher.Add(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"},
Status: corev1.PodStatus{Phase: corev1.PodRunning},
})
}()
var buf bytes.Buffer
pw := logs.NewPodWatcher(client, "mycontainer", &selector, &buf)
// WatchPods should wait for the logs and return cleanly.
err := pw.WatchPods(ctx)
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Equal(t, "[foo] it worked\n", buf.String())
}
func TestPodWatcher(t *testing.T) { func TestPodWatcher(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string

View File

@ -98,6 +98,7 @@ func (w *Watcher) Watch(ctx context.Context) error {
for { for {
select { select {
// TODO: check if closed.
case evt := <-deploymentsWatcher.ResultChan(): case evt := <-deploymentsWatcher.ResultChan():
switch evt.Type { switch evt.Type {
case watch.Added, watch.Modified: case watch.Added, watch.Modified: