diff --git a/internal/client/integration_helpers_test.go b/internal/client/integration_helpers_test.go index 13d2fc5..7f5a73c 100644 --- a/internal/client/integration_helpers_test.go +++ b/internal/client/integration_helpers_test.go @@ -5,6 +5,7 @@ package client_test import ( "context" "encoding/json" + "errors" "fmt" "io" "log/slog" @@ -24,6 +25,7 @@ import ( "github.com/gdamore/tcell/v2" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" + "golang.org/x/sync/errgroup" ) func buildClientServer( @@ -37,7 +39,7 @@ func buildClientServer( BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"}, Screen: &terminal.Screen{ Screen: screen, - Width: 160, + Width: 180, Height: 25, CaptureC: screenCaptureC, }, @@ -66,23 +68,21 @@ func runClientServer( ) <-chan clientServerResult { ch := make(chan clientServerResult, 1) - var wg sync.WaitGroup + g, ctx := errgroup.WithContext(ctx) var clientErr, srvErr error - wg.Add(1) - go func() { - defer wg.Done() + g.Go(func() error { srvErr = serverApp.Run(ctx) - }() + return errors.New("server closed") + }) - wg.Add(1) - go func() { - defer wg.Done() + g.Go(func() error { clientErr = clientApp.Run(ctx) - }() + return errors.New("client closed") + }) go func() { - wg.Wait() + _ = g.Wait() ch <- clientServerResult{errClient: clientErr, errServer: srvErr} }() diff --git a/internal/client/integration_test.go b/internal/client/integration_test.go index 51a5f62..c31b3ea 100644 --- a/internal/client/integration_test.go +++ b/internal/client/integration_test.go @@ -9,6 +9,7 @@ import ( "crypto/x509" "encoding/pem" "fmt" + "net" "os" "testing" "time" @@ -822,6 +823,44 @@ func TestIntegrationStartupCheck(t *testing.T) { assert.ErrorIs(t, result.errServer, context.Canceled) } +func TestIntegrationMediaServerError(t *testing.T) { + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) + defer cancel() + + lis, err := net.Listen("tcp", ":1935") + require.NoError(t, err) + t.Cleanup(func() { lis.Close() }) + + logger := testhelpers.NewTestLogger(t).With("component", "integration") + dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation()) + require.NoError(t, err) + + configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}}) + screen, screenCaptureC, getContents := setupSimulationScreen(t) + + client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger) + ch := runClientServer(ctx, t, client, server) + + require.EventuallyWithT( + t, + func(c *assert.CollectT) { + assert.True(c, contentsIncludes(getContents(), "Server process exited unexpectedly."), "expected to see title") + assert.True(c, contentsIncludes(getContents(), "address already in use"), "expected to see message") + }, + waitTime, + time.Second, + "expected to see media server error modal", + ) + printScreen(t, getContents, "Ater displaying the media server error modal") + + // Quit the app: + sendKey(t, screen, tcell.KeyEnter, ' ') + + result := <-ch + assert.ErrorContains(t, result.errClient, "context canceled") + assert.ErrorContains(t, result.errServer, "media server exited") +} + func TestIntegrationCopyURLs(t *testing.T) { type binding struct { key tcell.Key diff --git a/internal/server/grpc.go b/internal/server/grpc.go index b28bbf6..1c68bba 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "log/slog" + "sync" "git.netflux.io/rob/octoplex/internal/event" pb "git.netflux.io/rob/octoplex/internal/generated/grpc" @@ -13,6 +14,10 @@ import ( "golang.org/x/sync/errgroup" ) +// APIListenerCountDeltaFunc is a function that is called when the number of +// API clients increments or decrements. +type APIClientCountDeltaFunc func(delta int) + // Server is the gRPC server that handles incoming commands and outgoing // events. type Server struct { @@ -21,6 +26,9 @@ type Server struct { dispatcher func(event.Command) bus *event.Bus logger *slog.Logger + + mu sync.Mutex + clientCount int } // newServer creates a new gRPC server. @@ -56,6 +64,16 @@ func (s *Server) Communicate(stream pb.InternalAPI_CommunicateServer) error { }) g.Go(func() error { + s.mu.Lock() + s.clientCount++ + s.mu.Unlock() + + defer func() { + s.mu.Lock() + s.clientCount-- + s.mu.Unlock() + }() + for { in, err := stream.Recv() if err == io.EOF { @@ -87,3 +105,11 @@ func (s *Server) Communicate(stream pb.InternalAPI_CommunicateServer) error { return nil } + +// GetClientCount returns the number of connected clients. +func (s *Server) GetClientCount() int { + s.mu.Lock() + defer s.mu.Unlock() + + return s.clientCount +} diff --git a/internal/server/serverapp.go b/internal/server/serverapp.go index ded01e8..7e1f63d 100644 --- a/internal/server/serverapp.go +++ b/internal/server/serverapp.go @@ -67,11 +67,6 @@ func (a *App) Run(ctx context.Context) error { return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set") } - // doFatalError publishes a fatal error to the event bus. - doFatalError := func(msg string) { - a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg}) - } - const grpcAddr = ":50051" lis, err := net.Listen("tcp", grpcAddr) if err != nil { @@ -81,13 +76,40 @@ func (a *App) Run(ctx context.Context) error { grpcServer := grpc.NewServer() grpcDone := make(chan error, 1) - - pb.RegisterInternalAPIServer(grpcServer, newServer(a.DispatchAsync, a.eventBus, a.logger)) + internalAPI := newServer(a.DispatchAsync, a.eventBus, a.logger) + pb.RegisterInternalAPIServer(grpcServer, internalAPI) go func() { a.logger.Info("gRPC server started", "addr", grpcAddr) grpcDone <- grpcServer.Serve(lis) }() + // emptyUI is a dummy function that sets the UI state to an empty state, and + // re-renders the screen. + // + // This is a workaround for a weird interaction between tview and + // tcell.SimulationScreen which leads to newly-added pages not rendering if + // the UI is not re-rendered for a second time. + // It is only needed for integration tests when rendering modals before the + // main loop starts. It would be nice to remove this but the risk/impact on + // non-test code is pretty low. + emptyUI := func() { + a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}}) + } + + // doFatalError publishes a fatal error to the event bus. It will block until + // the user acknowledges it if there is 1 or more clients connected to the + // internal API. + doFatalError := func(msg string) { + a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg}) + + if internalAPI.GetClientCount() == 0 { + return + } + + emptyUI() + <-a.dispatchC + } + containerClient, err := container.NewClient(ctx, a.dockerClient, a.logger.With("component", "container_client")) if err != nil { err = fmt.Errorf("create container client: %w", err)