podWatcher: Use RetryWatcher
continuous-integration/drone/push Build is failing
Details
continuous-integration/drone/push Build is failing
Details
This commit is contained in:
parent
680f5b65e7
commit
0bbbb4015b
|
@ -16,6 +16,8 @@ import (
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
|
toolswatch "k8s.io/client-go/tools/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
const nl = "\n"
|
const nl = "\n"
|
||||||
|
@ -107,10 +109,11 @@ func (pw *PodWatcher) Close() {
|
||||||
func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error {
|
func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error {
|
||||||
podsClient := pw.client.Typed.CoreV1().Pods(pw.namespace)
|
podsClient := pw.client.Typed.CoreV1().Pods(pw.namespace)
|
||||||
|
|
||||||
// Returns a watcher which notifies of changes in the relevant pods.
|
watchFunc := func(_ metav1.ListOptions) (watch.Interface, error) {
|
||||||
// We don't defer Stop() on the returned value because the sender is the
|
return podsClient.Watch(ctx, metav1.ListOptions{LabelSelector: pw.labelSelector.String()})
|
||||||
// Kubernetes SDK, and that may introduce a send-on-closed-channel panic.
|
}
|
||||||
watcher, err := podsClient.Watch(ctx, metav1.ListOptions{LabelSelector: pw.labelSelector.String()})
|
|
||||||
|
retryWatcher, err := toolswatch.NewRetryWatcher("1", &cache.ListWatch{WatchFunc: watchFunc})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -121,7 +124,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error {
|
||||||
logLines := make(chan string)
|
logLines := make(chan string)
|
||||||
logErrorChan := make(chan error)
|
logErrorChan := make(chan error)
|
||||||
|
|
||||||
resultChan := watcher.ResultChan()
|
resultChan := retryWatcher.ResultChan()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -138,7 +141,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error {
|
||||||
// process the logs of any missing pods.
|
// process the logs of any missing pods.
|
||||||
for podName, pod := range pw.spec {
|
for podName, pod := range pw.spec {
|
||||||
if _, ok := pw.status[podName]; !ok {
|
if _, ok := pw.status[podName]; !ok {
|
||||||
pw.logger.Printf("[PodWatcher] adding pod, name = %s, container = %s, namespace = %s,", pod.Name, pw.container, pod.Namespace)
|
pw.logger.Printf("[PodWatcher] adding pod, name = %s, container = %s, namespace = %s", pod.Name, pw.container, pod.Namespace)
|
||||||
stream := newStream(pod.Name, pw.container, pod.Namespace)
|
stream := newStream(pod.Name, pw.container, pod.Namespace)
|
||||||
pw.status[pod.Name] = stream
|
pw.status[pod.Name] = stream
|
||||||
|
|
||||||
|
|
|
@ -78,7 +78,7 @@ func TestPodWatcherClose(t *testing.T) {
|
||||||
pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &buf, discardLogger())
|
pw := logs.NewPodWatcher(client, corev1.NamespaceDefault, "mycontainer", selector, &buf, discardLogger())
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
// Close() should cause the watcher to return cleanly:
|
// Close() should cause the watcher to return cleanly:
|
||||||
pw.Close()
|
pw.Close()
|
||||||
|
@ -120,14 +120,14 @@ func TestPodWatcherRemovedPod(t *testing.T) {
|
||||||
go func() {
|
go func() {
|
||||||
defer pw.Close()
|
defer pw.Close()
|
||||||
|
|
||||||
podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
||||||
time.Sleep(time.Millisecond * 500)
|
time.Sleep(time.Millisecond * 500)
|
||||||
w1.Write([]byte("should be logged\n"))
|
w1.Write([]byte("should be logged\n"))
|
||||||
podsWatcher.Delete(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
podsWatcher.Delete(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
||||||
time.Sleep(time.Millisecond * 500)
|
time.Sleep(time.Millisecond * 500)
|
||||||
w1.Write([]byte("should not be logged\n"))
|
w1.Write([]byte("should not be logged\n"))
|
||||||
time.Sleep(time.Millisecond * 500)
|
time.Sleep(time.Millisecond * 500)
|
||||||
podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
||||||
time.Sleep(time.Millisecond * 500)
|
time.Sleep(time.Millisecond * 500)
|
||||||
w2.Write([]byte("should be logged\n"))
|
w2.Write([]byte("should be logged\n"))
|
||||||
time.Sleep(time.Millisecond * 500)
|
time.Sleep(time.Millisecond * 500)
|
||||||
|
@ -152,7 +152,7 @@ func TestPodWatcher(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "unexpected error getting logs",
|
name: "unexpected error getting logs",
|
||||||
podEvents: []*corev1.Pod{
|
podEvents: []*corev1.Pod{
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
||||||
},
|
},
|
||||||
getLogsErr: errors.New("nope"),
|
getLogsErr: errors.New("nope"),
|
||||||
wantOut: nil,
|
wantOut: nil,
|
||||||
|
@ -161,7 +161,7 @@ func TestPodWatcher(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "recoverable error getting logs",
|
name: "recoverable error getting logs",
|
||||||
podEvents: []*corev1.Pod{
|
podEvents: []*corev1.Pod{
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
||||||
},
|
},
|
||||||
getLogsErr: &apierrors.StatusError{ErrStatus: metav1.Status{Message: "is waiting to start: ContainerCreating", Reason: metav1.StatusReasonBadRequest}},
|
getLogsErr: &apierrors.StatusError{ErrStatus: metav1.Status{Message: "is waiting to start: ContainerCreating", Reason: metav1.StatusReasonBadRequest}},
|
||||||
wantOut: nil,
|
wantOut: nil,
|
||||||
|
@ -169,9 +169,9 @@ func TestPodWatcher(t *testing.T) {
|
||||||
{
|
{
|
||||||
name: "success",
|
name: "success",
|
||||||
podEvents: []*corev1.Pod{
|
podEvents: []*corev1.Pod{
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodPending}},
|
{ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodPending}},
|
||||||
},
|
},
|
||||||
getLogsRespBody: "some logs",
|
getLogsRespBody: "some logs",
|
||||||
getLogsStatusCode: http.StatusOK,
|
getLogsStatusCode: http.StatusOK,
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -25,6 +26,18 @@ import (
|
||||||
k8stest "k8s.io/client-go/testing"
|
k8stest "k8s.io/client-go/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type concurrentWriter struct {
|
||||||
|
w io.Writer
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cw *concurrentWriter) Write(p []byte) (int, error) {
|
||||||
|
cw.mu.Lock()
|
||||||
|
defer cw.mu.Unlock()
|
||||||
|
|
||||||
|
return cw.w.Write(p)
|
||||||
|
}
|
||||||
|
|
||||||
type mockPodWatcher struct{ err error }
|
type mockPodWatcher struct{ err error }
|
||||||
|
|
||||||
func (m *mockPodWatcher) WatchPods(ctx context.Context) error { return m.err }
|
func (m *mockPodWatcher) WatchPods(ctx context.Context) error { return m.err }
|
||||||
|
@ -122,8 +135,9 @@ func TestWatcherWithPodWatcher(t *testing.T) {
|
||||||
client := logs.KubernetesClient{Typed: typedClient, Untyped: untypedClient}
|
client := logs.KubernetesClient{Typed: typedClient, Untyped: untypedClient}
|
||||||
|
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
|
cw := concurrentWriter{w: &buf}
|
||||||
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"}
|
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"}
|
||||||
watcher := logs.NewWatcher(params, client, logs.NewPodWatcher, &buf, discardLogger())
|
watcher := logs.NewWatcher(params, client, logs.NewPodWatcher, &cw, discardLogger())
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
deployment := buildDeployment(t, "mydeployment")
|
deployment := buildDeployment(t, "mydeployment")
|
||||||
|
@ -131,9 +145,9 @@ func TestWatcherWithPodWatcher(t *testing.T) {
|
||||||
time.Sleep(time.Millisecond * 250)
|
time.Sleep(time.Millisecond * 250)
|
||||||
|
|
||||||
pods := []*corev1.Pod{
|
pods := []*corev1.Pod{
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
{ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}},
|
||||||
{ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodPending}},
|
{ObjectMeta: metav1.ObjectMeta{Name: "baz", Namespace: "default", ResourceVersion: "1"}, Status: corev1.PodStatus{Phase: corev1.PodPending}},
|
||||||
}
|
}
|
||||||
for _, pod := range pods {
|
for _, pod := range pods {
|
||||||
podsWatcher.Add(pod)
|
podsWatcher.Add(pod)
|
||||||
|
|
Loading…
Reference in New Issue