diff --git a/internal/app/integration_helpers_test.go b/internal/app/integration_helpers_test.go index 61c3c96..fc90f04 100644 --- a/internal/app/integration_helpers_test.go +++ b/internal/app/integration_helpers_test.go @@ -22,30 +22,10 @@ import ( "git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/terminal" "github.com/gdamore/tcell/v2" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" ) -func buildAppParams( - t *testing.T, - configService *config.Service, - dockerClient container.DockerClient, - screen tcell.SimulationScreen, - screenCaptureC chan<- terminal.ScreenCapture, - logger *slog.Logger, -) app.Params { - t.Helper() - - return app.Params{ - ConfigService: configService, - DockerClient: dockerClient, - ClipboardAvailable: false, - BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"}, - Logger: logger, - } -} - func buildClientServer( configService *config.Service, dockerClient container.DockerClient, @@ -54,7 +34,8 @@ func buildClientServer( logger *slog.Logger, ) (*client.App, *app.App) { buildInfo := domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"} - clientApp := client.New(client.NewParams{ + + client := client.New(client.NewParams{ BuildInfo: buildInfo, Screen: &terminal.Screen{ Screen: screen, @@ -65,8 +46,7 @@ func buildClientServer( Logger: logger, }) - // TODO: use buildAppParams - srvApp := app.New(app.Params{ + server := app.New(app.Params{ ConfigService: configService, DockerClient: dockerClient, ClipboardAvailable: false, @@ -74,29 +54,44 @@ func buildClientServer( Logger: logger, }) - return clientApp, srvApp + return client, server +} + +type clientServerResult struct { + errClient error + errServer error } func runClientServer( ctx context.Context, - t *testing.T, - wg *sync.WaitGroup, + _ *testing.T, clientApp *client.App, serverApp *app.App, -) { +) <-chan clientServerResult { + ch := make(chan clientServerResult, 1) + + var wg sync.WaitGroup + var clientErr, srvErr error + wg.Add(1) go func() { defer wg.Done() - assert.ErrorIs(t, serverApp.Run(ctx), context.Canceled) + srvErr = serverApp.Run(ctx) }() wg.Add(1) go func() { defer wg.Done() - // May be a gRPC error, not context.Canceled: - assert.ErrorContains(t, clientApp.Run(ctx), "context canceled") + clientErr = clientApp.Run(ctx) }() + go func() { + wg.Wait() + + ch <- clientServerResult{errClient: clientErr, errServer: srvErr} + }() + + return ch } func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- terminal.ScreenCapture, func() []string) { diff --git a/internal/app/integration_test.go b/internal/app/integration_test.go index bfc0f55..df5e13b 100644 --- a/internal/app/integration_test.go +++ b/internal/app/integration_test.go @@ -8,26 +8,19 @@ import ( "crypto/tls" "crypto/x509" "encoding/pem" - "errors" "fmt" - "net" "os" - "sync" "testing" "time" - "git.netflux.io/rob/octoplex/internal/app" "git.netflux.io/rob/octoplex/internal/config" "git.netflux.io/rob/octoplex/internal/container" - "git.netflux.io/rob/octoplex/internal/container/mocks" "git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/testhelpers" - "github.com/docker/docker/api/types/network" dockerclient "github.com/docker/docker/client" "github.com/docker/docker/errdefs" "github.com/gdamore/tcell/v2" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" @@ -127,8 +120,7 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) { }) client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger) - var wg sync.WaitGroup - runClientServer(ctx, t, &wg, client, server) + ch := runClientServer(ctx, t, client, server) require.EventuallyWithT( t, @@ -271,7 +263,10 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) { cancel() - wg.Wait() + result := <-ch + // May be a gRPC error, not context.Canceled: + assert.ErrorContains(t, result.errClient, "context canceled") + assert.ErrorIs(t, result.errServer, context.Canceled) } func TestIntegrationCustomHost(t *testing.T) { @@ -293,8 +288,7 @@ func TestIntegrationCustomHost(t *testing.T) { screen, screenCaptureC, getContents := setupSimulationScreen(t) client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger) - var wg sync.WaitGroup - runClientServer(ctx, t, &wg, client, server) + ch := runClientServer(ctx, t, client, server) time.Sleep(time.Second) sendKey(t, screen, tcell.KeyF1, ' ') @@ -334,7 +328,10 @@ func TestIntegrationCustomHost(t *testing.T) { cancel() - wg.Wait() + result := <-ch + // May be a gRPC error, not context.Canceled: + assert.ErrorContains(t, result.errClient, "context canceled") + assert.ErrorIs(t, result.errServer, context.Canceled) } func TestIntegrationCustomTLSCerts(t *testing.T) { @@ -359,8 +356,7 @@ func TestIntegrationCustomTLSCerts(t *testing.T) { screen, screenCaptureC, getContents := setupSimulationScreen(t) client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger) - var wg sync.WaitGroup - runClientServer(ctx, t, &wg, client, server) + ch := runClientServer(ctx, t, client, server) require.EventuallyWithT( t, @@ -395,7 +391,9 @@ func TestIntegrationCustomTLSCerts(t *testing.T) { cancel() - wg.Wait() + result := <-ch + assert.ErrorContains(t, result.errClient, "context canceled") + assert.ErrorIs(t, result.errServer, context.Canceled) } func TestIntegrationRestartDestination(t *testing.T) { @@ -434,14 +432,8 @@ func TestIntegrationRestartDestination(t *testing.T) { }}, }) - done := make(chan struct{}) - go func() { - defer func() { - done <- struct{}{} - }() - - require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)) - }() + client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger) + ch := runClientServer(ctx, t, client, server) require.EventuallyWithT( t, @@ -553,7 +545,9 @@ func TestIntegrationRestartDestination(t *testing.T) { cancel() - <-done + result := <-ch + assert.ErrorContains(t, result.errClient, "context canceled") + assert.ErrorIs(t, result.errServer, context.Canceled) } func TestIntegrationStartDestinationFailed(t *testing.T) { @@ -571,14 +565,8 @@ func TestIntegrationStartDestinationFailed(t *testing.T) { Destinations: []config.Destination{{Name: "Example server", URL: "rtmp://rtmp.example.com/live"}}, }) - done := make(chan struct{}) - go func() { - defer func() { - done <- struct{}{} - }() - - require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)) - }() + client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger) + ch := runClientServer(ctx, t, client, server) require.EventuallyWithT( t, @@ -627,7 +615,9 @@ func TestIntegrationStartDestinationFailed(t *testing.T) { cancel() - <-done + result := <-ch + assert.ErrorContains(t, result.errClient, "context canceled") + assert.ErrorIs(t, result.errServer, context.Canceled) } func TestIntegrationDestinationValidations(t *testing.T) { @@ -644,14 +634,8 @@ func TestIntegrationDestinationValidations(t *testing.T) { Sources: config.Sources{MediaServer: config.MediaServerSource{StreamKey: "live", RTMP: config.RTMPSource{Enabled: true}}}, }) - done := make(chan struct{}) - go func() { - defer func() { - done <- struct{}{} - }() - - require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)) - }() + client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger) + ch := runClientServer(ctx, t, client, server) require.EventuallyWithT( t, @@ -755,7 +739,9 @@ func TestIntegrationDestinationValidations(t *testing.T) { printScreen(t, getContents, "After entering a duplicate destination URL") cancel() - <-done + result := <-ch + assert.ErrorContains(t, result.errClient, "context canceled") + assert.ErrorIs(t, result.errServer, context.Canceled) } func TestIntegrationStartupCheck(t *testing.T) { @@ -786,14 +772,8 @@ func TestIntegrationStartupCheck(t *testing.T) { configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}}) screen, screenCaptureC, getContents := setupSimulationScreen(t) - done := make(chan struct{}) - go func() { - defer func() { - done <- struct{}{} - }() - - require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)) - }() + client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger) + ch := runClientServer(ctx, t, client, server) require.EventuallyWithT( t, @@ -837,136 +817,9 @@ func TestIntegrationStartupCheck(t *testing.T) { printScreen(t, getContents, "After starting the mediaserver") cancel() - <-done -} - -func TestIntegrationMediaServerError(t *testing.T) { - ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) - defer cancel() - - lis, err := net.Listen("tcp", ":1935") - require.NoError(t, err) - t.Cleanup(func() { lis.Close() }) - - logger := testhelpers.NewTestLogger(t).With("component", "integration") - dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation()) - require.NoError(t, err) - - configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}}) - screen, screenCaptureC, getContents := setupSimulationScreen(t) - - done := make(chan struct{}) - go func() { - defer func() { - done <- struct{}{} - }() - - require.EqualError( - t, - app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx), - "media server exited", - ) - }() - - require.EventuallyWithT( - t, - func(c *assert.CollectT) { - assert.True(c, contentsIncludes(getContents(), "Server process exited unexpectedly."), "expected to see title") - assert.True(c, contentsIncludes(getContents(), "address already in use"), "expected to see message") - }, - waitTime, - time.Second, - "expected to see media server error modal", - ) - printScreen(t, getContents, "Ater displaying the media server error modal") - - // Quit the app, this should cause the done channel to receive. - sendKey(t, screen, tcell.KeyEnter, ' ') - - <-done -} - -func TestIntegrationDockerClientError(t *testing.T) { - ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) - defer cancel() - - logger := testhelpers.NewTestLogger(t).With("component", "integration") - - var dockerClient mocks.DockerClient - dockerClient.EXPECT().NetworkCreate(mock.Anything, mock.Anything, mock.Anything).Return(network.CreateResponse{}, errors.New("boom")) - - configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}}) - screen, screenCaptureC, getContents := setupSimulationScreen(t) - - done := make(chan struct{}) - go func() { - defer func() { - done <- struct{}{} - }() - - require.EqualError( - t, - app.New(buildAppParams(t, configService, &dockerClient, screen, screenCaptureC, logger)).Run(ctx), - "create container client: network create: boom", - ) - }() - - require.EventuallyWithT( - t, - func(c *assert.CollectT) { - assert.True(c, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message") - assert.True(c, contentsIncludes(getContents(), "create container client: network create: boom"), "expected to see message") - }, - waitTime, - time.Second, - "expected to see fatal error modal", - ) - printScreen(t, getContents, "Ater displaying the fatal error modal") - - // Quit the app, this should cause the done channel to receive. - sendKey(t, screen, tcell.KeyEnter, ' ') - - <-done -} - -func TestIntegrationDockerConnectionError(t *testing.T) { - ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute) - defer cancel() - - logger := testhelpers.NewTestLogger(t).With("component", "integration") - dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.WithHost("http://docker.example.com")) - require.NoError(t, err) - - configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}}) - screen, screenCaptureC, getContents := setupSimulationScreen(t) - - done := make(chan struct{}) - go func() { - defer func() { - done <- struct{}{} - }() - - err := app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx) - require.ErrorContains(t, err, "dial tcp: lookup docker.example.com") - require.ErrorContains(t, err, "no such host") - }() - - require.EventuallyWithT( - t, - func(c *assert.CollectT) { - assert.True(c, contentsIncludes(getContents(), "An error occurred:"), "expected to see error message") - assert.True(c, contentsIncludes(getContents(), "Could not connect to Docker. Is Docker installed"), "expected to see message") - }, - waitTime, - time.Second, - "expected to see fatal error modal", - ) - printScreen(t, getContents, "Ater displaying the fatal error modal") - - // Quit the app, this should cause the done channel to receive. - sendKey(t, screen, tcell.KeyEnter, ' ') - - <-done + result := <-ch + assert.ErrorContains(t, result.errClient, "context canceled") + assert.ErrorIs(t, result.errServer, context.Canceled) } func TestIntegrationCopyURLs(t *testing.T) { @@ -1036,14 +889,8 @@ func TestIntegrationCopyURLs(t *testing.T) { configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: tc.mediaServerConfig}}) screen, screenCaptureC, getContents := setupSimulationScreen(t) - done := make(chan struct{}) - go func() { - defer func() { - done <- struct{}{} - }() - - require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)) - }() + client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger) + ch := runClientServer(ctx, t, client, server) time.Sleep(3 * time.Second) printScreen(t, getContents, "Ater loading the app") @@ -1064,7 +911,9 @@ func TestIntegrationCopyURLs(t *testing.T) { cancel() - <-done + result := <-ch + assert.ErrorContains(t, result.errClient, "context canceled") + assert.ErrorIs(t, result.errServer, context.Canceled) }) } } diff --git a/internal/client/clientapp.go b/internal/client/clientapp.go index 572a5c4..6fbc7e7 100644 --- a/internal/client/clientapp.go +++ b/internal/client/clientapp.go @@ -70,7 +70,7 @@ func (a *App) Run(ctx context.Context) error { continue } - a.logger.Debug("Received event", "type", fmt.Sprintf("%T", evt)) + a.logger.Debug("Received event") a.bus.Send(protocol.EventFromProto(evt)) } })