refactor: startup check
Some checks failed
ci-build / lint (push) Has been cancelled
ci-build / build (push) Has been cancelled
ci-build / release (push) Has been cancelled

- separate mediaserver create and start
- avoid blocking main app loop during startup check
- remove ui.allowQuit
- add integration test
This commit is contained in:
Rob Watson 2025-04-05 09:37:01 +02:00
parent 266a9307d2
commit e14cfdee85
6 changed files with 203 additions and 104 deletions

View File

@ -37,6 +37,11 @@ func Run(ctx context.Context, params RunParams) error {
state := new(domain.AppState)
applyConfig(cfg, state)
// While RTMP is the only source, it doesn't make sense to disable it.
if !cfg.Sources.RTMP.Enabled {
return errors.New("config: sources.rtmp.enabled must be set to true")
}
logger := params.Logger
ui, err := terminal.StartUI(ctx, terminal.StartParams{
Screen: params.Screen,
@ -59,40 +64,18 @@ func Run(ctx context.Context, params RunParams) error {
updateUI := func() { ui.SetState(*state) }
updateUI()
var exists bool
if exists, err = containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil {
return fmt.Errorf("check existing containers: %w", err)
} else if exists {
if ui.ShowStartupCheckModal() {
if err = containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil {
return fmt.Errorf("remove existing containers: %w", err)
}
if err = containerClient.RemoveUnusedNetworks(ctx); err != nil {
return fmt.Errorf("remove unused networks: %w", err)
}
} else {
return nil
}
}
ui.AllowQuit()
// While RTMP is the only source, it doesn't make sense to disable it.
if !cfg.Sources.RTMP.Enabled {
return errors.New("config: sources.rtmp.enabled must be set to true")
}
srv, err := mediaserver.StartActor(ctx, mediaserver.StartActorParams{
srv, err := mediaserver.NewActor(ctx, mediaserver.StartActorParams{
StreamKey: mediaserver.StreamKey(cfg.Sources.RTMP.StreamKey),
ContainerClient: containerClient,
Logger: logger.With("component", "mediaserver"),
})
if err != nil {
return fmt.Errorf("start mediaserver: %w", err)
return fmt.Errorf("create mediaserver: %w", err)
}
defer srv.Close()
repl := replicator.NewActor(ctx, replicator.NewActorParams{
SourceURL: srv.State().RTMPInternalURL,
repl := replicator.StartActor(ctx, replicator.NewActorParams{
SourceURL: srv.RTMPInternalURL(),
ContainerClient: containerClient,
Logger: logger.With("component", "replicator"),
})
@ -102,8 +85,22 @@ func Run(ctx context.Context, params RunParams) error {
uiUpdateT := time.NewTicker(uiUpdateInterval)
defer uiUpdateT.Stop()
startupCheckC := doStartupCheck(ctx, containerClient, ui.ShowStartupCheckModal)
for {
select {
case err := <-startupCheckC:
if errors.Is(err, errStartupCheckUserQuit) {
return nil
} else if err != nil {
return fmt.Errorf("startup check: %w", err)
} else {
startupCheckC = nil
if err = srv.Start(ctx); err != nil {
return fmt.Errorf("start mediaserver: %w", err)
}
}
case cfg = <-params.ConfigService.C():
applyConfig(cfg, state)
updateUI()
@ -248,3 +245,39 @@ func resolveDestinations(destinations []domain.Destination, inDestinations []con
return destinations[:len(inDestinations)]
}
var errStartupCheckUserQuit = errors.New("user quit startup check modal")
// doStartupCheck performs a startup check to see if there are any existing app
// containers.
//
// It returns a channel that will be closed, possibly after receiving an error.
// If the error is non-nil the app must not be started. If the error is
// [errStartupCheckUserQuit], the user voluntarily quit the startup check
// modal.
func doStartupCheck(ctx context.Context, containerClient *container.Client, showModal func() bool) <-chan error {
ch := make(chan error, 1)
go func() {
defer close(ch)
if exists, err := containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil {
ch <- fmt.Errorf("check existing containers: %w", err)
} else if exists {
if showModal() {
if err = containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil {
ch <- fmt.Errorf("remove existing containers: %w", err)
return
}
if err = containerClient.RemoveUnusedNetworks(ctx); err != nil {
ch <- fmt.Errorf("remove unused networks: %w", err)
return
}
} else {
ch <- errStartupCheckUserQuit
}
}
}()
return ch
}

View File

@ -15,10 +15,12 @@ import (
"git.netflux.io/rob/octoplex/internal/app"
"git.netflux.io/rob/octoplex/internal/config"
"git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/terminal"
"git.netflux.io/rob/octoplex/internal/testhelpers"
dockerclient "github.com/docker/docker/client"
"github.com/docker/docker/errdefs"
"github.com/gdamore/tcell/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -381,6 +383,99 @@ func TestIntegrationDestinationValidations(t *testing.T) {
<-done
}
func TestIntegrationStartupCheck(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Minute)
defer cancel()
// Start a container that looks like a stale Octoplex container:
staleContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "bluenviron/mediamtx:latest",
Env: map[string]string{"MTX_RTMPADDRESS": ":1937"},
ExposedPorts: []string{"1937/tcp"},
WaitingFor: wait.ForListeningPort("1937/tcp"),
// It doesn't matter what image this container uses, as long as it has
// labels that look like an Octoplex container:
Labels: map[string]string{container.LabelApp: domain.AppName},
},
Started: true,
})
testcontainers.CleanupContainer(t, staleContainer)
require.NoError(t, err)
require.NoError(t, err)
logger := testhelpers.NewTestLogger(t).With("component", "integration")
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
require.NoError(t, err)
configService := setupConfigService(t, config.Config{Sources: config.Sources{RTMP: config.RTMPSource{Enabled: true}}})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{})
go func() {
err := app.Run(ctx, app.RunParams{
ConfigService: configService,
DockerClient: dockerClient,
Screen: &terminal.Screen{
Screen: screen,
Width: 200,
Height: 25,
CaptureC: screenCaptureC,
},
ClipboardAvailable: false,
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
Logger: logger,
})
require.NoError(t, err)
done <- struct{}{}
}()
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
assert.True(t, contentsIncludes(getContents(), "Another instance of Octoplex may already be running."), "expected to see startup check modal")
},
30*time.Second,
time.Second,
"expected to see startup check modal",
)
printScreen(getContents, "Ater displaying the startup check modal")
sendKey(screen, tcell.KeyEnter, ' ') // quit other containers
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
_, err := staleContainer.State(context.Background())
// IsRunning() does not work, probably because we're undercutting the
// testcontainers API.
require.True(t, errdefs.IsNotFound(err), "expected to not find the container")
},
time.Minute,
2*time.Second,
"expected to quit the other containers",
)
printScreen(getContents, "After quitting the other containers")
require.EventuallyWithT(
t,
func(t *assert.CollectT) {
contents := getContents()
require.True(t, len(contents) > 2, "expected at least 3 lines of output")
assert.Contains(t, contents[2], "Status waiting for stream", "expected mediaserver status to be waiting")
},
10*time.Second,
time.Second,
"expected the mediaserver to start",
)
printScreen(getContents, "After starting the mediaserver")
cancel()
<-done
}
func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- terminal.ScreenCapture, func() []string) {
// Fetching the screen contents is tricky at this level of the test pyramid,
// because we need to:

View File

@ -37,7 +37,6 @@ type Source struct {
Listeners int
Tracks []string
RTMPURL string
RTMPInternalURL string
ExitReason string
}

View File

@ -42,16 +42,16 @@ type action func()
// Actor is responsible for managing the media server.
type Actor struct {
ctx context.Context
cancel context.CancelFunc
actorC chan action
stateC chan domain.Source
chanSize int
containerClient *container.Client
apiPort int
rtmpPort int
streamKey StreamKey
fetchIngressStateInterval time.Duration
pass string // password for the media server
tlsCert, tlsKey []byte // TLS cert and key for the media server
logger *slog.Logger
apiClient *http.Client
@ -71,18 +71,10 @@ type StartActorParams struct {
Logger *slog.Logger
}
// StartActor starts a new media server actor.
// NewActor creates a new media server actor.
//
// Callers must consume the state channel exposed via [C].
func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err error) {
ctx, cancel := context.WithCancel(ctx)
defer func() {
// if err is nil, the context should not be cancelled.
if err != nil {
cancel()
}
}()
func NewActor(ctx context.Context, params StartActorParams) (_ *Actor, err error) {
tlsCert, tlsKey, err := generateTLSCert()
if err != nil {
return nil, fmt.Errorf("generate TLS cert: %w", err)
@ -93,26 +85,33 @@ func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err err
}
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
actor := &Actor{
ctx: ctx,
cancel: cancel,
return &Actor{
apiPort: cmp.Or(params.APIPort, defaultAPIPort),
rtmpPort: cmp.Or(params.RTMPPort, defaultRTMPPort),
streamKey: cmp.Or(params.StreamKey, defaultStreamKey),
fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval),
tlsCert: tlsCert,
tlsKey: tlsKey,
pass: generatePassword(),
actorC: make(chan action, chanSize),
state: new(domain.Source),
stateC: make(chan domain.Source, chanSize),
chanSize: chanSize,
containerClient: params.ContainerClient,
logger: params.Logger,
apiClient: apiClient,
}, nil
}
apiPortSpec := nat.Port(strconv.Itoa(actor.apiPort) + ":9997")
rtmpPortSpec := nat.Port(strconv.Itoa(actor.rtmpPort) + ":1935")
func (a *Actor) Start(ctx context.Context) error {
apiPortSpec := nat.Port(strconv.Itoa(a.apiPort) + ":9997")
rtmpPortSpec := nat.Port(strconv.Itoa(+a.rtmpPort) + ":1935")
exposedPorts, portBindings, _ := nat.ParsePortSpecs([]string{string(apiPortSpec), string(rtmpPortSpec)})
// The RTMP URL is passed to the UI via the state.
// This could be refactored, it's not really stateful data.
a.state.RTMPURL = a.RTMPURL()
cfg, err := yaml.Marshal(
Config{
LogLevel: "info",
@ -128,7 +127,7 @@ func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err err
},
{
User: "api",
Pass: actor.pass,
Pass: a.pass,
IPs: []string{}, // any IP
Permissions: []UserPermission{
{Action: "read"},
@ -136,7 +135,7 @@ func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err err
},
{
User: "api",
Pass: actor.pass,
Pass: a.pass,
IPs: []string{}, // any IP
Permissions: []UserPermission{{Action: "api"}},
},
@ -146,19 +145,19 @@ func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err err
APIServerCert: "/etc/tls.crt",
APIServerKey: "/etc/tls.key",
Paths: map[string]Path{
string(actor.streamKey): {Source: "publisher"},
string(a.streamKey): {Source: "publisher"},
},
},
)
if err != nil { // should never happen
return nil, fmt.Errorf("marshal config: %w", err)
return fmt.Errorf("marshal config: %w", err)
}
containerStateC, errC := params.ContainerClient.RunContainer(
containerStateC, errC := a.containerClient.RunContainer(
ctx,
container.RunContainerParams{
Name: componentName,
ChanSize: chanSize,
ChanSize: a.chanSize,
ContainerConfig: &typescontainer.Config{
Image: imageNameMediaMTX,
Hostname: "mediaserver",
@ -171,7 +170,7 @@ func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err err
"--silent",
"--cacert", "/etc/tls.crt",
"--config", "/etc/healthcheckopts.txt",
actor.healthCheckURL(),
a.healthCheckURL(),
},
Interval: time.Second * 10,
StartPeriod: time.Second * 2,
@ -193,29 +192,26 @@ func StartActor(ctx context.Context, params StartActorParams) (_ *Actor, err err
},
{
Path: "/etc/tls.crt",
Payload: bytes.NewReader(tlsCert),
Payload: bytes.NewReader(a.tlsCert),
Mode: 0600,
},
{
Path: "/etc/tls.key",
Payload: bytes.NewReader(tlsKey),
Payload: bytes.NewReader(a.tlsKey),
Mode: 0600,
},
{
Path: "/etc/healthcheckopts.txt",
Payload: bytes.NewReader([]byte(fmt.Sprintf("--user api:%s", actor.pass))),
Payload: bytes.NewReader([]byte(fmt.Sprintf("--user api:%s", a.pass))),
Mode: 0600,
},
},
},
)
actor.state.RTMPURL = actor.rtmpURL()
actor.state.RTMPInternalURL = actor.rtmpInternalURL()
go a.actorLoop(ctx, containerStateC, errC)
go actor.actorLoop(containerStateC, errC)
return actor, nil
return nil
}
// C returns a channel that will receive the current state of the media server.
@ -224,6 +220,8 @@ func (s *Actor) C() <-chan domain.Source {
}
// State returns the current state of the media server.
//
// Blocks if the actor is not started yet.
func (s *Actor) State() domain.Source {
resultChan := make(chan domain.Source)
s.actorC <- func() {
@ -241,14 +239,12 @@ func (s *Actor) Close() error {
return fmt.Errorf("remove containers: %w", err)
}
s.cancel()
return nil
}
// actorLoop is the main loop of the media server actor. It exits when the
// actor is closed, or the parent context is cancelled.
func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan error) {
func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Container, errC <-chan error) {
fetchStateT := time.NewTicker(s.fetchIngressStateInterval)
defer fetchStateT.Stop()
@ -331,7 +327,7 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e
continue
}
action()
case <-s.ctx.Done():
case <-ctx.Done():
return
}
}
@ -351,14 +347,14 @@ func (s *Actor) handleContainerExit(err error) {
s.state.Live = false
}
// rtmpURL returns the RTMP URL for the media server, accessible from the host.
func (s *Actor) rtmpURL() string {
// RTMPURL returns the RTMP URL for the media server, accessible from the host.
func (s *Actor) RTMPURL() string {
return fmt.Sprintf("rtmp://localhost:%d/%s", s.rtmpPort, s.streamKey)
}
// rtmpInternalURL returns the RTMP URL for the media server, accessible from
// RTMPInternalURL returns the RTMP URL for the media server, accessible from
// the app network.
func (s *Actor) rtmpInternalURL() string {
func (s *Actor) RTMPInternalURL() string {
// Container port, not host port:
return fmt.Sprintf("rtmp://mediaserver:1935/%s?user=api&pass=%s", s.streamKey, s.pass)
}

View File

@ -55,10 +55,10 @@ type NewActorParams struct {
Logger *slog.Logger
}
// NewActor starts a new replicator actor.
// StartActor starts a new replicator actor.
//
// The channel exposed by [C] must be consumed by the caller.
func NewActor(ctx context.Context, params NewActorParams) *Actor {
func StartActor(ctx context.Context, params NewActorParams) *Actor {
ctx, cancel := context.WithCancel(ctx)
actor := &Actor{

View File

@ -61,9 +61,6 @@ type UI struct {
mu sync.Mutex
urlsToStartState map[string]startState
// allowQuit is true if the user is allowed to quit the app (after the
// startup check has completed).
allowQuit bool
/// addingDestination is true if add destination modal is currently visible.
addingDestination bool
// hasDestinations is true if the UI thinks there are destinations
@ -341,7 +338,7 @@ func (ui *UI) ShowStartupCheckModal() bool {
ui.app.QueueUpdateDraw(func() {
ui.showModal(
pageNameModalStartupCheck,
"Another instance of Octoplex may already be running. Pressing continue will close that instance. Continue?",
"Another instance of Octoplex may already be running.\n\nPressing continue will close that instance. Continue?",
[]string{"Continue", "Exit"},
func(buttonIndex int, _ string) {
if buttonIndex == 0 {
@ -361,7 +358,7 @@ func (ui *UI) ShowDestinationErrorModal(name string, err error) {
ui.app.QueueUpdateDraw(func() {
ui.showModal(
pageNameModalStartupCheck,
pageNameModalDestinationError,
fmt.Sprintf(
"Streaming to %s failed:\n\n%s",
cmp.Or(name, "this destination"),
@ -377,19 +374,6 @@ func (ui *UI) ShowDestinationErrorModal(name string, err error) {
<-done
}
// AllowQuit enables the quit action.
func (ui *UI) AllowQuit() {
ui.mu.Lock()
defer ui.mu.Unlock()
// This is required to prevent the user from quitting during the startup
// check modal, when the main event loop is not yet running, and avoid an
// unexpected user experience. It might be nice to find a way to remove this
// but it probably means refactoring the mediaserver actor to separate
// starting the server from starting the event loop.
ui.allowQuit = true
}
// captureScreen captures the screen and sends it to the screenCaptureC
// channel, which must have been set in StartParams.
//
@ -498,6 +482,7 @@ const (
pageNameMain = "main"
pageNameNoDestinations = "no-destinations"
pageNameAddDestination = "add-destination"
pageNameModalDestinationError = "modal-destination-error"
pageNameModalAbout = "modal-about"
pageNameModalQuit = "modal-quit"
pageNameModalStartupCheck = "modal-startup-check"
@ -912,15 +897,6 @@ func (ui *UI) copyConfigFilePathToClipboard(clipboardAvailable bool, configFileP
}
func (ui *UI) confirmQuit() {
var allowQuit bool
ui.mu.Lock()
allowQuit = ui.allowQuit
ui.mu.Unlock()
if !allowQuit {
return
}
ui.showModal(
pageNameModalQuit,
"Are you sure you want to quit?",