fixup! wip: refactor: API
This commit is contained in:
parent
311c100d89
commit
32701499e7
@ -5,6 +5,7 @@ package client_test
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
@ -24,6 +25,7 @@ import (
|
|||||||
"github.com/gdamore/tcell/v2"
|
"github.com/gdamore/tcell/v2"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/testcontainers/testcontainers-go"
|
"github.com/testcontainers/testcontainers-go"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
func buildClientServer(
|
func buildClientServer(
|
||||||
@ -37,7 +39,7 @@ func buildClientServer(
|
|||||||
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
|
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
|
||||||
Screen: &terminal.Screen{
|
Screen: &terminal.Screen{
|
||||||
Screen: screen,
|
Screen: screen,
|
||||||
Width: 160,
|
Width: 180,
|
||||||
Height: 25,
|
Height: 25,
|
||||||
CaptureC: screenCaptureC,
|
CaptureC: screenCaptureC,
|
||||||
},
|
},
|
||||||
@ -66,23 +68,21 @@ func runClientServer(
|
|||||||
) <-chan clientServerResult {
|
) <-chan clientServerResult {
|
||||||
ch := make(chan clientServerResult, 1)
|
ch := make(chan clientServerResult, 1)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
g, ctx := errgroup.WithContext(ctx)
|
||||||
var clientErr, srvErr error
|
var clientErr, srvErr error
|
||||||
|
|
||||||
wg.Add(1)
|
g.Go(func() error {
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
srvErr = serverApp.Run(ctx)
|
srvErr = serverApp.Run(ctx)
|
||||||
}()
|
return errors.New("server closed")
|
||||||
|
})
|
||||||
|
|
||||||
wg.Add(1)
|
g.Go(func() error {
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
clientErr = clientApp.Run(ctx)
|
clientErr = clientApp.Run(ctx)
|
||||||
}()
|
return errors.New("client closed")
|
||||||
|
})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Wait()
|
_ = g.Wait()
|
||||||
|
|
||||||
ch <- clientServerResult{errClient: clientErr, errServer: srvErr}
|
ch <- clientServerResult{errClient: clientErr, errServer: srvErr}
|
||||||
}()
|
}()
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -822,6 +823,44 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
|||||||
assert.ErrorIs(t, result.errServer, context.Canceled)
|
assert.ErrorIs(t, result.errServer, context.Canceled)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIntegrationMediaServerError(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
lis, err := net.Listen("tcp", ":1935")
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() { lis.Close() })
|
||||||
|
|
||||||
|
logger := testhelpers.NewTestLogger(t).With("component", "integration")
|
||||||
|
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||||
|
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||||
|
|
||||||
|
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
|
||||||
|
ch := runClientServer(ctx, t, client, server)
|
||||||
|
|
||||||
|
require.EventuallyWithT(
|
||||||
|
t,
|
||||||
|
func(c *assert.CollectT) {
|
||||||
|
assert.True(c, contentsIncludes(getContents(), "Server process exited unexpectedly."), "expected to see title")
|
||||||
|
assert.True(c, contentsIncludes(getContents(), "address already in use"), "expected to see message")
|
||||||
|
},
|
||||||
|
waitTime,
|
||||||
|
time.Second,
|
||||||
|
"expected to see media server error modal",
|
||||||
|
)
|
||||||
|
printScreen(t, getContents, "Ater displaying the media server error modal")
|
||||||
|
|
||||||
|
// Quit the app:
|
||||||
|
sendKey(t, screen, tcell.KeyEnter, ' ')
|
||||||
|
|
||||||
|
result := <-ch
|
||||||
|
assert.ErrorContains(t, result.errClient, "context canceled")
|
||||||
|
assert.ErrorContains(t, result.errServer, "media server exited")
|
||||||
|
}
|
||||||
|
|
||||||
func TestIntegrationCopyURLs(t *testing.T) {
|
func TestIntegrationCopyURLs(t *testing.T) {
|
||||||
type binding struct {
|
type binding struct {
|
||||||
key tcell.Key
|
key tcell.Key
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"git.netflux.io/rob/octoplex/internal/event"
|
"git.netflux.io/rob/octoplex/internal/event"
|
||||||
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
|
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
|
||||||
@ -13,6 +14,10 @@ import (
|
|||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// APIListenerCountDeltaFunc is a function that is called when the number of
|
||||||
|
// API clients increments or decrements.
|
||||||
|
type APIClientCountDeltaFunc func(delta int)
|
||||||
|
|
||||||
// Server is the gRPC server that handles incoming commands and outgoing
|
// Server is the gRPC server that handles incoming commands and outgoing
|
||||||
// events.
|
// events.
|
||||||
type Server struct {
|
type Server struct {
|
||||||
@ -21,6 +26,9 @@ type Server struct {
|
|||||||
dispatcher func(event.Command)
|
dispatcher func(event.Command)
|
||||||
bus *event.Bus
|
bus *event.Bus
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
clientCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
// newServer creates a new gRPC server.
|
// newServer creates a new gRPC server.
|
||||||
@ -56,6 +64,16 @@ func (s *Server) Communicate(stream pb.InternalAPI_CommunicateServer) error {
|
|||||||
})
|
})
|
||||||
|
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.clientCount++
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.clientCount--
|
||||||
|
s.mu.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
in, err := stream.Recv()
|
in, err := stream.Recv()
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
@ -87,3 +105,11 @@ func (s *Server) Communicate(stream pb.InternalAPI_CommunicateServer) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetClientCount returns the number of connected clients.
|
||||||
|
func (s *Server) GetClientCount() int {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
return s.clientCount
|
||||||
|
}
|
||||||
|
@ -67,11 +67,6 @@ func (a *App) Run(ctx context.Context) error {
|
|||||||
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
|
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
|
||||||
}
|
}
|
||||||
|
|
||||||
// doFatalError publishes a fatal error to the event bus.
|
|
||||||
doFatalError := func(msg string) {
|
|
||||||
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
|
|
||||||
}
|
|
||||||
|
|
||||||
const grpcAddr = ":50051"
|
const grpcAddr = ":50051"
|
||||||
lis, err := net.Listen("tcp", grpcAddr)
|
lis, err := net.Listen("tcp", grpcAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -81,13 +76,40 @@ func (a *App) Run(ctx context.Context) error {
|
|||||||
|
|
||||||
grpcServer := grpc.NewServer()
|
grpcServer := grpc.NewServer()
|
||||||
grpcDone := make(chan error, 1)
|
grpcDone := make(chan error, 1)
|
||||||
|
internalAPI := newServer(a.DispatchAsync, a.eventBus, a.logger)
|
||||||
pb.RegisterInternalAPIServer(grpcServer, newServer(a.DispatchAsync, a.eventBus, a.logger))
|
pb.RegisterInternalAPIServer(grpcServer, internalAPI)
|
||||||
go func() {
|
go func() {
|
||||||
a.logger.Info("gRPC server started", "addr", grpcAddr)
|
a.logger.Info("gRPC server started", "addr", grpcAddr)
|
||||||
grpcDone <- grpcServer.Serve(lis)
|
grpcDone <- grpcServer.Serve(lis)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// emptyUI is a dummy function that sets the UI state to an empty state, and
|
||||||
|
// re-renders the screen.
|
||||||
|
//
|
||||||
|
// This is a workaround for a weird interaction between tview and
|
||||||
|
// tcell.SimulationScreen which leads to newly-added pages not rendering if
|
||||||
|
// the UI is not re-rendered for a second time.
|
||||||
|
// It is only needed for integration tests when rendering modals before the
|
||||||
|
// main loop starts. It would be nice to remove this but the risk/impact on
|
||||||
|
// non-test code is pretty low.
|
||||||
|
emptyUI := func() {
|
||||||
|
a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}})
|
||||||
|
}
|
||||||
|
|
||||||
|
// doFatalError publishes a fatal error to the event bus. It will block until
|
||||||
|
// the user acknowledges it if there is 1 or more clients connected to the
|
||||||
|
// internal API.
|
||||||
|
doFatalError := func(msg string) {
|
||||||
|
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
|
||||||
|
|
||||||
|
if internalAPI.GetClientCount() == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
emptyUI()
|
||||||
|
<-a.dispatchC
|
||||||
|
}
|
||||||
|
|
||||||
containerClient, err := container.NewClient(ctx, a.dockerClient, a.logger.With("component", "container_client"))
|
containerClient, err := container.NewClient(ctx, a.dockerClient, a.logger.With("component", "container_client"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("create container client: %w", err)
|
err = fmt.Errorf("create container client: %w", err)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user