Configure debug logging
continuous-integration/drone/push Build is passing
Details
continuous-integration/drone/push Build is passing
Details
This commit is contained in:
parent
12530471d0
commit
b1a8a75bdb
|
@ -48,10 +48,11 @@ type PodWatcher struct {
|
||||||
streamResults chan error
|
streamResults chan error
|
||||||
dst io.Writer
|
dst io.Writer
|
||||||
closeChan chan struct{}
|
closeChan chan struct{}
|
||||||
|
logger *log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPodWatcher initializes a new PodWatcher.
|
// NewPodWatcher initializes a new PodWatcher.
|
||||||
func NewPodWatcher(client KubernetesClient, container string, labelSelector labels.Selector, dst io.Writer) PodWatcherInterface {
|
func NewPodWatcher(client KubernetesClient, container string, labelSelector labels.Selector, dst io.Writer, logger *log.Logger) PodWatcherInterface {
|
||||||
return &PodWatcher{
|
return &PodWatcher{
|
||||||
client: client,
|
client: client,
|
||||||
container: container,
|
container: container,
|
||||||
|
@ -61,6 +62,7 @@ func NewPodWatcher(client KubernetesClient, container string, labelSelector labe
|
||||||
streamResults: make(chan error),
|
streamResults: make(chan error),
|
||||||
dst: dst,
|
dst: dst,
|
||||||
closeChan: make(chan struct{}),
|
closeChan: make(chan struct{}),
|
||||||
|
logger: logger,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,7 +75,7 @@ func (pw *PodWatcher) WatchPods(ctx context.Context) error {
|
||||||
err := pw.watchPods(ctx, &wg)
|
err := pw.watchPods(ctx, &wg)
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
log.Println("[PodWatcher] all goroutines exited, exiting")
|
pw.logger.Println("[PodWatcher] all goroutines exited, exiting")
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -117,7 +119,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error {
|
||||||
for podName, pod := range pw.spec {
|
for podName, pod := range pw.spec {
|
||||||
pod := pod
|
pod := pod
|
||||||
if _, ok := pw.status[podName]; !ok {
|
if _, ok := pw.status[podName]; !ok {
|
||||||
log.Printf("[PodWatcher] adding pod, name = %s", pod.Name)
|
pw.logger.Printf("[PodWatcher] adding pod, name = %s", pod.Name)
|
||||||
pw.status[pod.Name] = true
|
pw.status[pod.Name] = true
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -163,7 +165,7 @@ func (pw *PodWatcher) watchPods(ctx context.Context, wg *sync.WaitGroup) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pw *PodWatcher) removePod(podName string) {
|
func (pw *PodWatcher) removePod(podName string) {
|
||||||
log.Printf("[PodWatcher] removing pod, name = %s", podName)
|
pw.logger.Printf("[PodWatcher] removing pod, name = %s", podName)
|
||||||
delete(pw.status, podName)
|
delete(pw.status, podName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -76,7 +76,7 @@ func TestPodWatcherClose(t *testing.T) {
|
||||||
selector := labels.SelectorFromSet(map[string]string{"foo": "bar"})
|
selector := labels.SelectorFromSet(map[string]string{"foo": "bar"})
|
||||||
|
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf)
|
pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf, discardLogger())
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
podsWatcher.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, Status: corev1.PodStatus{Phase: corev1.PodRunning}})
|
||||||
|
@ -148,7 +148,7 @@ func TestPodWatcher(t *testing.T) {
|
||||||
selector := labels.SelectorFromSet(map[string]string{"foo": "bar"})
|
selector := labels.SelectorFromSet(map[string]string{"foo": "bar"})
|
||||||
|
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf)
|
pw := logs.NewPodWatcher(client, "mycontainer", selector, &buf, discardLogger())
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for _, pod := range tc.podEvents {
|
for _, pod := range tc.podEvents {
|
||||||
|
|
|
@ -3,8 +3,10 @@ package logs
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -48,7 +50,7 @@ type PodWatcherInterface interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
// PodWatcherFunc builds a PodWatcher.
|
// PodWatcherFunc builds a PodWatcher.
|
||||||
type PodWatcherFunc func(KubernetesClient, string, labels.Selector, io.Writer) PodWatcherInterface
|
type PodWatcherFunc func(KubernetesClient, string, labels.Selector, io.Writer, *log.Logger) PodWatcherInterface
|
||||||
|
|
||||||
// WatcherParams defines the input parameters of a Watcher.
|
// WatcherParams defines the input parameters of a Watcher.
|
||||||
type WatcherParams struct {
|
type WatcherParams struct {
|
||||||
|
@ -70,16 +72,18 @@ type Watcher struct {
|
||||||
podWatcherFunc PodWatcherFunc
|
podWatcherFunc PodWatcherFunc
|
||||||
errChan chan error
|
errChan chan error
|
||||||
dst *concurrentWriter
|
dst *concurrentWriter
|
||||||
|
logger *log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWatcher creates a new Watcher.
|
// NewWatcher creates a new Watcher.
|
||||||
func NewWatcher(params WatcherParams, client KubernetesClient, podWatcherFunc PodWatcherFunc, dst io.Writer) *Watcher {
|
func NewWatcher(params WatcherParams, client KubernetesClient, podWatcherFunc PodWatcherFunc, dst io.Writer, logger *log.Logger) *Watcher {
|
||||||
return &Watcher{
|
return &Watcher{
|
||||||
params: params,
|
params: params,
|
||||||
client: client,
|
client: client,
|
||||||
podWatcherFunc: podWatcherFunc,
|
podWatcherFunc: podWatcherFunc,
|
||||||
errChan: make(chan error),
|
errChan: make(chan error),
|
||||||
dst: &concurrentWriter{w: dst},
|
dst: &concurrentWriter{w: dst},
|
||||||
|
logger: logger,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,7 +134,7 @@ func (w *Watcher) Watch(ctx context.Context) error {
|
||||||
if !ok || err != nil {
|
if !ok || err != nil {
|
||||||
// matchLabels don't exist or cannot be parsed.
|
// matchLabels don't exist or cannot be parsed.
|
||||||
// Should this be fatal?
|
// Should this be fatal?
|
||||||
log.Printf("warning: unable to parse matchLabels: ok = %t, err = %v", ok, err)
|
w.logger.Printf("warning: unable to parse matchLabels: ok = %t, err = %v", ok, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
selector := labels.SelectorFromSet(selectorAsMap)
|
selector := labels.SelectorFromSet(selectorAsMap)
|
||||||
|
@ -151,7 +155,7 @@ func (w *Watcher) checkResourceExists(ctx context.Context, namespace string, res
|
||||||
_, err := w.client.Untyped.Resource(resourceID).Namespace(namespace).Get(ctx, w.params.Name, metav1.GetOptions{})
|
_, err := w.client.Untyped.Resource(resourceID).Namespace(namespace).Get(ctx, w.params.Name, metav1.GetOptions{})
|
||||||
var statusErr *apierrors.StatusError
|
var statusErr *apierrors.StatusError
|
||||||
if !w.params.StrictExist && errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonNotFound {
|
if !w.params.StrictExist && errors.As(err, &statusErr) && statusErr.Status().Reason == metav1.StatusReasonNotFound {
|
||||||
log.Printf(`%s "%s" does not exist, waiting`, resourceID.Resource, w.params.Name)
|
fmt.Fprintf(os.Stderr, "%s \"%s\" does not exist, waiting\n", resourceID.Resource, w.params.Name)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -164,7 +168,7 @@ func (w *Watcher) addDeployment(ctx context.Context, resourceUID types.UID, podS
|
||||||
|
|
||||||
w.removeDeployment()
|
w.removeDeployment()
|
||||||
|
|
||||||
log.Println("[DeploymentWatcher] add podWatcher")
|
w.logger.Println("[DeploymentWatcher] add podWatcher")
|
||||||
|
|
||||||
w.resourceUID = resourceUID
|
w.resourceUID = resourceUID
|
||||||
w.podSelector = podSelector
|
w.podSelector = podSelector
|
||||||
|
@ -173,6 +177,7 @@ func (w *Watcher) addDeployment(ctx context.Context, resourceUID types.UID, podS
|
||||||
w.params.Container,
|
w.params.Container,
|
||||||
w.podSelector,
|
w.podSelector,
|
||||||
w.dst,
|
w.dst,
|
||||||
|
w.logger,
|
||||||
)
|
)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -184,7 +189,7 @@ func (w *Watcher) addDeployment(ctx context.Context, resourceUID types.UID, podS
|
||||||
|
|
||||||
func (w *Watcher) removeDeployment() {
|
func (w *Watcher) removeDeployment() {
|
||||||
if w.podWatcher != nil {
|
if w.podWatcher != nil {
|
||||||
log.Println("[DeploymentWatcher] remove podWatcher")
|
w.logger.Println("[DeploymentWatcher] remove podWatcher")
|
||||||
w.podWatcher.Close()
|
w.podWatcher.Close()
|
||||||
w.podWatcher = nil
|
w.podWatcher = nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -30,11 +31,15 @@ func (m *mockPodWatcher) WatchPods(ctx context.Context) error { return m.err }
|
||||||
func (m *mockPodWatcher) Close() {}
|
func (m *mockPodWatcher) Close() {}
|
||||||
|
|
||||||
func mockPodwatcherFunc(err error) logs.PodWatcherFunc {
|
func mockPodwatcherFunc(err error) logs.PodWatcherFunc {
|
||||||
return func(logs.KubernetesClient, string, labels.Selector, io.Writer) logs.PodWatcherInterface {
|
return func(logs.KubernetesClient, string, labels.Selector, io.Writer, *log.Logger) logs.PodWatcherInterface {
|
||||||
return &mockPodWatcher{err: err}
|
return &mockPodWatcher{err: err}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func discardLogger() *log.Logger {
|
||||||
|
return log.New(io.Discard, "", 0)
|
||||||
|
}
|
||||||
|
|
||||||
func buildDeployment(t *testing.T, name string) *unstructured.Unstructured {
|
func buildDeployment(t *testing.T, name string) *unstructured.Unstructured {
|
||||||
deployment := new(unstructured.Unstructured)
|
deployment := new(unstructured.Unstructured)
|
||||||
deployment.SetAPIVersion("v1")
|
deployment.SetAPIVersion("v1")
|
||||||
|
@ -51,7 +56,7 @@ func TestWatcherStrictExist(t *testing.T) {
|
||||||
|
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default", StrictExist: true}
|
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default", StrictExist: true}
|
||||||
watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(nil), &buf)
|
watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(nil), &buf, discardLogger())
|
||||||
|
|
||||||
err := watcher.Watch(context.Background())
|
err := watcher.Watch(context.Background())
|
||||||
assert.EqualError(t, err, `deployments.apps "mydeployment" not found`)
|
assert.EqualError(t, err, `deployments.apps "mydeployment" not found`)
|
||||||
|
@ -67,7 +72,7 @@ func TestWatcherPodWatcherError(t *testing.T) {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
wantErr := errors.New("foo")
|
wantErr := errors.New("foo")
|
||||||
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"}
|
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"}
|
||||||
watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(wantErr), &buf)
|
watcher := logs.NewWatcher(params, client, mockPodwatcherFunc(wantErr), &buf, discardLogger())
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer deploymentsWatcher.Stop()
|
defer deploymentsWatcher.Stop()
|
||||||
|
@ -89,7 +94,7 @@ func TestWatcherClosedChannel(t *testing.T) {
|
||||||
|
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"}
|
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"}
|
||||||
watcher := logs.NewWatcher(params, client, nil, &buf)
|
watcher := logs.NewWatcher(params, client, nil, &buf, discardLogger())
|
||||||
// Immediately stop the watcher, which closes the ResultChan.
|
// Immediately stop the watcher, which closes the ResultChan.
|
||||||
// This should be expected to be handled.
|
// This should be expected to be handled.
|
||||||
deploymentsWatcher.Stop()
|
deploymentsWatcher.Stop()
|
||||||
|
@ -136,7 +141,7 @@ func TestWatcherWithPodWatcher(t *testing.T) {
|
||||||
|
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"}
|
params := logs.WatcherParams{Name: "mydeployment", Type: "deployments", Namespace: "default"}
|
||||||
watcher := logs.NewWatcher(params, client, logs.NewPodWatcher, &buf)
|
watcher := logs.NewWatcher(params, client, logs.NewPodWatcher, &buf, discardLogger())
|
||||||
|
|
||||||
err := watcher.Watch(ctx)
|
err := watcher.Watch(ctx)
|
||||||
require.EqualError(t, err, context.DeadlineExceeded.Error())
|
require.EqualError(t, err, context.DeadlineExceeded.Error())
|
||||||
|
|
9
main.go
9
main.go
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
@ -23,11 +24,13 @@ func main() {
|
||||||
var (
|
var (
|
||||||
container string
|
container string
|
||||||
strictExist bool
|
strictExist bool
|
||||||
|
debug bool
|
||||||
version bool
|
version bool
|
||||||
)
|
)
|
||||||
|
|
||||||
flag.StringVar(&container, "container", "", "name of a specific container")
|
flag.StringVar(&container, "container", "", "name of a specific container")
|
||||||
flag.BoolVar(&strictExist, "strict", false, "require deployment to exist on launch")
|
flag.BoolVar(&strictExist, "strict", false, "require deployment to exist on launch")
|
||||||
|
flag.BoolVar(&debug, "debug", false, "enable debug logging to stderr")
|
||||||
flag.BoolVar(&version, "version", false, "print version and exit")
|
flag.BoolVar(&version, "version", false, "print version and exit")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
|
@ -36,6 +39,11 @@ func main() {
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger := log.New(io.Discard, "", 0)
|
||||||
|
if debug {
|
||||||
|
logger = log.New(os.Stderr, "", 0)
|
||||||
|
}
|
||||||
|
|
||||||
var params logs.WatcherParams
|
var params logs.WatcherParams
|
||||||
|
|
||||||
switch len(flag.Args()) {
|
switch len(flag.Args()) {
|
||||||
|
@ -79,6 +87,7 @@ func main() {
|
||||||
logs.KubernetesClient{Typed: clientset, Untyped: dynamicclient},
|
logs.KubernetesClient{Typed: clientset, Untyped: dynamicclient},
|
||||||
logs.NewPodWatcher,
|
logs.NewPodWatcher,
|
||||||
os.Stdout,
|
os.Stdout,
|
||||||
|
logger,
|
||||||
)
|
)
|
||||||
if err := watcher.Watch(ctx); err != nil {
|
if err := watcher.Watch(ctx); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
|
Loading…
Reference in New Issue