watcher: handle closed channel
This commit is contained in:
parent
0827693801
commit
59a9335017
|
@ -89,8 +89,8 @@ func TestPodWatcherClosedWatcher(t *testing.T) {
|
||||||
|
|
||||||
// WatchPods should wait for the logs and return cleanly.
|
// WatchPods should wait for the logs and return cleanly.
|
||||||
err := pw.WatchPods(ctx)
|
err := pw.WatchPods(ctx)
|
||||||
require.EqualError(t, err, context.DeadlineExceeded.Error())
|
require.Equal(t, context.DeadlineExceeded, err)
|
||||||
require.Equal(t, "[foo] it worked\n", buf.String())
|
assert.Equal(t, "[foo] it worked\n", buf.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPodWatcher(t *testing.T) {
|
func TestPodWatcher(t *testing.T) {
|
||||||
|
|
|
@ -96,10 +96,14 @@ func (w *Watcher) Watch(ctx context.Context) error {
|
||||||
ticker := time.NewTicker(time.Second)
|
ticker := time.NewTicker(time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
resultChan := deploymentsWatcher.ResultChan()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
// TODO: check if closed.
|
case evt, ok := <-resultChan:
|
||||||
case evt := <-deploymentsWatcher.ResultChan():
|
if !ok {
|
||||||
|
resultChan = nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
switch evt.Type {
|
switch evt.Type {
|
||||||
case watch.Added, watch.Modified:
|
case watch.Added, watch.Modified:
|
||||||
deployment := evt.Object.(*appsv1.Deployment)
|
deployment := evt.Object.(*appsv1.Deployment)
|
||||||
|
|
|
@ -64,6 +64,26 @@ func TestWatcherPodWatcherError(t *testing.T) {
|
||||||
assert.EqualError(t, err, wantErr.Error())
|
assert.EqualError(t, err, wantErr.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatcherClosedChannel(t *testing.T) {
|
||||||
|
deploymentsWatcher := watch.NewFake()
|
||||||
|
clientset := testclient.NewSimpleClientset(
|
||||||
|
&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mydeployment", Namespace: "default"}},
|
||||||
|
)
|
||||||
|
clientset.PrependWatchReactor("deployments", k8stest.DefaultWatchReactor(deploymentsWatcher, nil))
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
client := logs.KubernetesClient{Interface: clientset}
|
||||||
|
watcher := logs.NewWatcher("mydeployment", "mycontainer", true, client, nil, &buf)
|
||||||
|
go deploymentsWatcher.Stop()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
err := watcher.Watch(ctx)
|
||||||
|
require.Equal(t, context.DeadlineExceeded, err)
|
||||||
|
assert.Equal(t, "", buf.String())
|
||||||
|
}
|
||||||
|
|
||||||
func TestWatcherWithPodWatcher(t *testing.T) {
|
func TestWatcherWithPodWatcher(t *testing.T) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue