From e14cfdee85e39bf6eef82d04dc3eda0391f468f2 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Sat, 5 Apr 2025 09:37:01 +0200 Subject: [PATCH] refactor: startup check - separate mediaserver create and start - avoid blocking main app loop during startup check - remove ui.allowQuit - add integration test --- internal/app/app.go | 85 ++++++++++++++++++--------- internal/app/integration_test.go | 95 +++++++++++++++++++++++++++++++ internal/domain/types.go | 15 +++-- internal/mediaserver/actor.go | 78 ++++++++++++------------- internal/replicator/replicator.go | 4 +- internal/terminal/terminal.go | 30 +--------- 6 files changed, 203 insertions(+), 104 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 34b9b57..b26cd77 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -37,6 +37,11 @@ func Run(ctx context.Context, params RunParams) error { state := new(domain.AppState) applyConfig(cfg, state) + // While RTMP is the only source, it doesn't make sense to disable it. + if !cfg.Sources.RTMP.Enabled { + return errors.New("config: sources.rtmp.enabled must be set to true") + } + logger := params.Logger ui, err := terminal.StartUI(ctx, terminal.StartParams{ Screen: params.Screen, @@ -59,40 +64,18 @@ func Run(ctx context.Context, params RunParams) error { updateUI := func() { ui.SetState(*state) } updateUI() - var exists bool - if exists, err = containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil { - return fmt.Errorf("check existing containers: %w", err) - } else if exists { - if ui.ShowStartupCheckModal() { - if err = containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil { - return fmt.Errorf("remove existing containers: %w", err) - } - if err = containerClient.RemoveUnusedNetworks(ctx); err != nil { - return fmt.Errorf("remove unused networks: %w", err) - } - } else { - return nil - } - } - ui.AllowQuit() - - // While RTMP is the only source, it doesn't make sense to disable it. 
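+	// Note: the mediaserver actor is only created here; it is started further
+	// down, once the startup check has passed. This keeps the main event loop
+	// responsive while the check runs in the background.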
- if !cfg.Sources.RTMP.Enabled { - return errors.New("config: sources.rtmp.enabled must be set to true") - } - - srv, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{ + srv, err := mediaserver.NewActor(ctx, mediaserver.StartActorParams{ StreamKey: mediaserver.StreamKey(cfg.Sources.RTMP.StreamKey), ContainerClient: containerClient, Logger: logger.With("component", "mediaserver"), }) if err != nil { - return fmt.Errorf("start mediaserver: %w", err) + return fmt.Errorf("create mediaserver: %w", err) } defer srv.Close() - repl := replicator.NewActor(ctx, replicator.NewActorParams{ - SourceURL: srv.State().RTMPInternalURL, + repl := replicator.StartActor(ctx, replicator.NewActorParams{ + SourceURL: srv.RTMPInternalURL(), ContainerClient: containerClient, Logger: logger.With("component", "replicator"), }) @@ -102,8 +85,22 @@ func Run(ctx context.Context, params RunParams) error { uiUpdateT := time.NewTicker(uiUpdateInterval) defer uiUpdateT.Stop() + startupCheckC := doStartupCheck(ctx, containerClient, ui.ShowStartupCheckModal) + for { select { + case err := <-startupCheckC: + if errors.Is(err, errStartupCheckUserQuit) { + return nil + } else if err != nil { + return fmt.Errorf("startup check: %w", err) + } else { + startupCheckC = nil + + if err = srv.Start(ctx); err != nil { + return fmt.Errorf("start mediaserver: %w", err) + } + } case cfg = <-params.ConfigService.C(): applyConfig(cfg, state) updateUI() @@ -248,3 +245,39 @@ func resolveDestinations(destinations []domain.Destination, inDestinations []con return destinations[:len(inDestinations)] } + +var errStartupCheckUserQuit = errors.New("user quit startup check modal") + +// doStartupCheck performs a startup check to see if there are any existing app +// containers. +// +// It returns a channel that will be closed, possibly after receiving an error. +// If the error is non-nil the app must not be started. If the error is +// [errStartupCheckUserQuit], the user voluntarily quit the startup check +// modal. 
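+//
+// A minimal consumption sketch (illustrative only; mirrors the select loop
+// in [Run]):
+//
+//	checkC := doStartupCheck(ctx, containerClient, showModal)
+//	if err := <-checkC; errors.Is(err, errStartupCheckUserQuit) {
+//		return nil // the user chose to exit
+//	} else if err != nil {
+//		return err // the check failed: the app must not be started
+//	}
+//	// err was nil: the channel was closed without an error, continue startup.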
+func doStartupCheck(ctx context.Context, containerClient *container.Client, showModal func() bool) <-chan error {
+	ch := make(chan error, 1)
+
+	go func() {
+		defer close(ch)
+
+		if exists, err := containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil {
+			ch <- fmt.Errorf("check existing containers: %w", err)
+		} else if exists {
+			if showModal() {
+				if err = containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil {
+					ch <- fmt.Errorf("remove existing containers: %w", err)
+					return
+				}
+				if err = containerClient.RemoveUnusedNetworks(ctx); err != nil {
+					ch <- fmt.Errorf("remove unused networks: %w", err)
+					return
+				}
+			} else {
+				ch <- errStartupCheckUserQuit
+			}
+		}
+	}()
+
+	return ch
+}
diff --git a/internal/app/integration_test.go b/internal/app/integration_test.go
index 7d4d729..bbc51a5 100644
--- a/internal/app/integration_test.go
+++ b/internal/app/integration_test.go
@@ -15,10 +15,12 @@ import (
 
 	"git.netflux.io/rob/octoplex/internal/app"
 	"git.netflux.io/rob/octoplex/internal/config"
+	"git.netflux.io/rob/octoplex/internal/container"
 	"git.netflux.io/rob/octoplex/internal/domain"
 	"git.netflux.io/rob/octoplex/internal/terminal"
 	"git.netflux.io/rob/octoplex/internal/testhelpers"
 	dockerclient "github.com/docker/docker/client"
+	"github.com/docker/docker/errdefs"
 	"github.com/gdamore/tcell/v2"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -381,6 +383,99 @@ func TestIntegrationDestinationValidations(t *testing.T) {
 	<-done
 }
 
+func TestIntegrationStartupCheck(t *testing.T) {
+	ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
+	defer cancel()
+
+	// Start a container that looks like a stale Octoplex container:
+	staleContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
+		ContainerRequest: testcontainers.ContainerRequest{
+			Image:        "bluenviron/mediamtx:latest",
+			Env:          map[string]string{"MTX_RTMPADDRESS": ":1937"},
+			ExposedPorts: []string{"1937/tcp"},
+			WaitingFor:   wait.ForListeningPort("1937/tcp"),
+			// It doesn't matter what image this container uses, as long as it has
+			// labels that look like an Octoplex container:
+			Labels: map[string]string{container.LabelApp: domain.AppName},
+		},
+		Started: true,
+	})
+	testcontainers.CleanupContainer(t, staleContainer)
+	require.NoError(t, err)
+
+	logger := testhelpers.NewTestLogger(t).With("component", "integration")
+	dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
+	require.NoError(t, err)
+
+	configService := setupConfigService(t, config.Config{Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}}})
+	screen, screenCaptureC, getContents := setupSimulationScreen(t)
+
+	done := make(chan struct{})
+	go func() {
+		err := app.Run(ctx, app.RunParams{
+			ConfigService: configService,
+			DockerClient:  dockerClient,
+			Screen: &terminal.Screen{
+				Screen:   screen,
+				Width:    200,
+				Height:   25,
+				CaptureC: screenCaptureC,
+			},
+			ClipboardAvailable: false,
+			BuildInfo:          domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
+			Logger:             logger,
+		})
+		require.NoError(t, err)
+
+		done <- struct{}{}
+	}()
+
+	require.EventuallyWithT(
+		t,
+		func(t *assert.CollectT) {
+			assert.True(t, contentsIncludes(getContents(), "Another instance of Octoplex may already be running."), "expected to see startup check modal")
+		},
+		30*time.Second,
+		time.Second,
+		"expected to see startup check modal",
+	)
+	printScreen(getContents, "After displaying the startup check modal")
displaying the startup check modal") + + sendKey(screen, tcell.KeyEnter, ' ') // quit other containers + + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + _, err := staleContainer.State(context.Background()) + // IsRunning() does not work, probably because we're undercutting the + // testcontainers API. + require.True(t, errdefs.IsNotFound(err), "expected to not find the container") + }, + time.Minute, + 2*time.Second, + "expected to quit the other containers", + ) + printScreen(getContents, "After quitting the other containers") + + require.EventuallyWithT( + t, + func(t *assert.CollectT) { + contents := getContents() + require.True(t, len(contents) > 2, "expected at least 3 lines of output") + + assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting") + }, + 10*time.Second, + time.Second, + "expected the mediaserver to start", + ) + printScreen(getContents, "After starting the mediaserver") + cancel() + + <-done +} + func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- terminal.ScreenCapture, func() []string) { // Fetching the screen contents is tricky at this level of the test pyramid, // because we need to: diff --git a/internal/domain/types.go b/internal/domain/types.go index 9793fbf..74d4357 100644 --- a/internal/domain/types.go +++ b/internal/domain/types.go @@ -31,14 +31,13 @@ type BuildInfo struct { // Source represents the source, currently always the mediaserver. type Source struct { - Container Container - Live bool - LiveChangedAt time.Time - Listeners int - Tracks []string - RTMPURL string - RTMPInternalURL string - ExitReason string + Container Container + Live bool + LiveChangedAt time.Time + Listeners int + Tracks []string + RTMPURL string + ExitReason string } // DestinationStatus reflects the high-level status of a single destination. diff --git a/internal/mediaserver/actor.go b/internal/mediaserver/actor.go index 415351c..e3e29b9 100644 --- a/internal/mediaserver/actor.go +++ b/internal/mediaserver/actor.go @@ -42,16 +42,16 @@ type action func() // Actor is responsible for managing the media server. type Actor struct { - ctx context.Context - cancel context.CancelFunc actorC chan action stateC chan domain.Source + chanSize int containerClient *container.Client apiPort int rtmpPort int streamKey StreamKey fetchIngressStateInterval time.Duration pass string // password for the media server + tlsCert, tlsKey []byte // TLS cert and key for the media server logger *slog.Logger apiClient *http.Client @@ -71,18 +71,10 @@ type StartActorParams struct { Logger *slog.Logger } -// StartActor starts a new media server actor. +// NewActor creates a new media server actor. // // Callers must consume the state channel exposed via [C]. -func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err error) { - ctx, cancel := context.WithCancel(ctx) - defer func() { - // if err is nil, the context should not be cancelled. 
- if err != nil { - cancel() - } - }() - +func NewActor(ctx context.Context, params StartActorParams) (_ *Actor, err error) { tlsCert, tlsKey, err := generateTLSCert() if err != nil { return nil, fmt.Errorf("generate TLS cert: %w", err) @@ -93,26 +85,33 @@ func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err err } chanSize := cmp.Or(params.ChanSize, defaultChanSize) - actor := &Actor{ - ctx: ctx, - cancel: cancel, + return &Actor{ apiPort: cmp.Or(params.APIPort, defaultAPIPort), rtmpPort: cmp.Or(params.RTMPPort, defaultRTMPPort), streamKey: cmp.Or(params.StreamKey, defaultStreamKey), fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval), + tlsCert: tlsCert, + tlsKey: tlsKey, pass: generatePassword(), actorC: make(chan action, chanSize), state: new(domain.Source), stateC: make(chan domain.Source, chanSize), + chanSize: chanSize, containerClient: params.ContainerClient, logger: params.Logger, apiClient: apiClient, - } + }, nil +} - apiPortSpec := nat.Port(strconv.Itoa(actor.apiPort) + ":9997") - rtmpPortSpec := nat.Port(strconv.Itoa(actor.rtmpPort) + ":1935") +func (a *Actor) Start(ctx context.Context) error { + apiPortSpec := nat.Port(strconv.Itoa(a.apiPort) + ":9997") + rtmpPortSpec := nat.Port(strconv.Itoa(+a.rtmpPort) + ":1935") exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)}) + // The RTMP URL is passed to the UI via the state. + // This could be refactored, it's not really stateful data. + a.state.RTMPURL = a.RTMPURL() + cfg, err := yaml.Marshal( Config{ LogLevel: "info", @@ -128,7 +127,7 @@ func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err err }, { User: "api", - Pass: actor.pass, + Pass: a.pass, IPs: []string{}, // any IP Permissions: []UserPermission{ {Action: "read"}, @@ -136,7 +135,7 @@ func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err err }, { User: "api", - Pass: actor.pass, + Pass: a.pass, IPs: []string{}, // any IP Permissions: []UserPermission{{Action: "api"}}, }, @@ -146,19 +145,19 @@ func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err err APIServerCert: "/etc/tls.crt", APIServerKey: "/etc/tls.key", Paths: map[string]Path{ - string(actor.streamKey): {Source: "publisher"}, + string(a.streamKey): {Source: "publisher"}, }, }, ) if err != nil { // should never happen - return nil, fmt.Errorf("marshal config: %w", err) + return fmt.Errorf("marshal config: %w", err) } - containerStateC, errC := params.ContainerClient.RunContainer( + containerStateC, errC := a.containerClient.RunContainer( ctx, container.RunContainerParams{ Name: componentName, - ChanSize: chanSize, + ChanSize: a.chanSize, ContainerConfig: &typescontainer.Config{ Image: imageNameMediaMTX, Hostname: "mediaserver", @@ -171,7 +170,7 @@ func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err err "--silent", "--cacert", "/etc/tls.crt", "--config", "/etc/healthcheckopts.txt", - actor.healthCheckURL(), + a.healthCheckURL(), }, Interval: time.Second * 10, StartPeriod: time.Second * 2, @@ -193,29 +192,26 @@ func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err err }, { Path: "/etc/tls.crt", - Payload: bytes.NewReader(tlsCert), + Payload: bytes.NewReader(a.tlsCert), Mode: 0600, }, { Path: "/etc/tls.key", - Payload: bytes.NewReader(tlsKey), + Payload: bytes.NewReader(a.tlsKey), Mode: 0600, }, { Path: "/etc/healthcheckopts.txt", - Payload: 
bytes.NewReader([]byte(fmt.Sprintf("--user api:%s", actor.pass))), + Payload: bytes.NewReader([]byte(fmt.Sprintf("--user api:%s", a.pass))), Mode: 0600, }, }, }, ) - actor.state.RTMPURL = actor.rtmpURL() - actor.state.RTMPInternalURL = actor.rtmpInternalURL() + go a.actorLoop(ctx, containerStateC, errC) - go actor.actorLoop(containerStateC, errC) - - return actor, nil + return nil } // C returns a channel that will receive the current state of the media server. @@ -224,6 +220,8 @@ func (s *Actor) C() <-chan domain.Source { } // State returns the current state of the media server. +// +// Blocks if the actor is not started yet. func (s *Actor) State() domain.Source { resultChan := make(chan domain.Source) s.actorC <- func() { @@ -241,14 +239,12 @@ func (s *Actor) Close() error { return fmt.Errorf("remove containers: %w", err) } - s.cancel() - return nil } // actorLoop is the main loop of the media server actor. It exits when the // actor is closed, or the parent context is cancelled. -func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan error) { +func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Container, errC <-chan error) { fetchStateT := time.NewTicker(s.fetchIngressStateInterval) defer fetchStateT.Stop() @@ -331,7 +327,7 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e continue } action() - case <-s.ctx.Done(): + case <-ctx.Done(): return } } @@ -351,14 +347,14 @@ func (s *Actor) handleContainerExit(err error) { s.state.Live = false } -// rtmpURL returns the RTMP URL for the media server, accessible from the host. -func (s *Actor) rtmpURL() string { +// RTMPURL returns the RTMP URL for the media server, accessible from the host. +func (s *Actor) RTMPURL() string { return fmt.Sprintf("rtmp://localhost:%d/%s", s.rtmpPort, s.streamKey) } -// rtmpInternalURL returns the RTMP URL for the media server, accessible from +// RTMPInternalURL returns the RTMP URL for the media server, accessible from // the app network. -func (s *Actor) rtmpInternalURL() string { +func (s *Actor) RTMPInternalURL() string { // Container port, not host port: return fmt.Sprintf("rtmp://mediaserver:1935/%s?user=api&pass=%s", s.streamKey, s.pass) } diff --git a/internal/replicator/replicator.go b/internal/replicator/replicator.go index d2e06d8..92bf9f3 100644 --- a/internal/replicator/replicator.go +++ b/internal/replicator/replicator.go @@ -55,10 +55,10 @@ type NewActorParams struct { Logger *slog.Logger } -// NewActor starts a new replicator actor. +// StartActor starts a new replicator actor. // // The channel exposed by [C] must be consumed by the caller. -func NewActor(ctx context.Context, params NewActorParams) *Actor { +func StartActor(ctx context.Context, params NewActorParams) *Actor { ctx, cancel := context.WithCancel(ctx) actor := &Actor{ diff --git a/internal/terminal/terminal.go b/internal/terminal/terminal.go index 4f0c95a..90d9cab 100644 --- a/internal/terminal/terminal.go +++ b/internal/terminal/terminal.go @@ -61,9 +61,6 @@ type UI struct { mu sync.Mutex urlsToStartState map[string]startState - // allowQuit is true if the user is allowed to quit the app (after the - // startup check has completed). - allowQuit bool /// addingDestination is true if add destination modal is currently visible. 
addingDestination bool // hasDestinations is true if the UI thinks there are destinations @@ -341,7 +338,7 @@ func (ui *UI) ShowStartupCheckModal() bool { ui.app.QueueUpdateDraw(func() { ui.showModal( pageNameModalStartupCheck, - "Another instance of Octoplex may already be running. Pressing continue will close that instance. Continue?", + "Another instance of Octoplex may already be running.\n\nPressing continue will close that instance. Continue?", []string{"Continue", "Exit"}, func(buttonIndex int, _ string) { if buttonIndex == 0 { @@ -361,7 +358,7 @@ func (ui *UI) ShowDestinationErrorModal(name string, err error) { ui.app.QueueUpdateDraw(func() { ui.showModal( - pageNameModalStartupCheck, + pageNameModalDestinationError, fmt.Sprintf( "Streaming to %s failed:\n\n%s", cmp.Or(name, "this destination"), @@ -377,19 +374,6 @@ func (ui *UI) ShowDestinationErrorModal(name string, err error) { <-done } -// AllowQuit enables the quit action. -func (ui *UI) AllowQuit() { - ui.mu.Lock() - defer ui.mu.Unlock() - - // This is required to prevent the user from quitting during the startup - // check modal, when the main event loop is not yet running, and avoid an - // unexpected user experience. It might be nice to find a way to remove this - // but it probably means refactoring the mediaserver actor to separate - // starting the server from starting the event loop. - ui.allowQuit = true -} - // captureScreen captures the screen and sends it to the screenCaptureC // channel, which must have been set in StartParams. // @@ -498,6 +482,7 @@ const ( pageNameMain = "main" pageNameNoDestinations = "no-destinations" pageNameAddDestination = "add-destination" + pageNameModalDestinationError = "modal-destination-error" pageNameModalAbout = "modal-about" pageNameModalQuit = "modal-quit" pageNameModalStartupCheck = "modal-startup-check" @@ -912,15 +897,6 @@ func (ui *UI) copyConfigFilePathToClipboard(clipboardAvailable bool, configFileP } func (ui *UI) confirmQuit() { - var allowQuit bool - ui.mu.Lock() - allowQuit = ui.allowQuit - ui.mu.Unlock() - - if !allowQuit { - return - } - ui.showModal( pageNameModalQuit, "Are you sure you want to quit?",
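
Note on the select-loop wiring in Run: once the startup check succeeds, the
case sets startupCheckC = nil. A receive from a nil channel blocks forever, so
that case is permanently disabled without restructuring the loop, while the
closed, buffered channel guarantees the checking goroutine never leaks. A
self-contained sketch of the idiom (illustrative only; the names are
simplified stand-ins for those in internal/app/app.go):

	package main

	import (
		"errors"
		"fmt"
	)

	var errUserQuit = errors.New("user quit")

	func main() {
		// Simulates doStartupCheck: buffered so the goroutine can send
		// without blocking, and closed when the check is done.
		checkC := make(chan error, 1)
		close(checkC) // closed without sending: the check passed

		workC := make(chan string, 2)
		workC <- "config update"
		workC <- "container state"

		for i := 0; i < 3; i++ {
			select {
			case err := <-checkC:
				if errors.Is(err, errUserQuit) {
					return
				}
				fmt.Println("check passed, starting mediaserver")
				// Receives from a nil channel block forever, so this
				// case can never fire again:
				checkC = nil
			case msg := <-workC:
				fmt.Println("handling:", msg)
			}
		}
	}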