refactor(app): async startup check
This commit is contained in:
parent
4a16780915
commit
94623248c0
@ -154,24 +154,24 @@ func (a *App) Run(ctx context.Context) error {
|
|||||||
uiUpdateT := time.NewTicker(uiUpdateInterval)
|
uiUpdateT := time.NewTicker(uiUpdateInterval)
|
||||||
defer uiUpdateT.Stop()
|
defer uiUpdateT.Stop()
|
||||||
|
|
||||||
startupCheckC := doStartupCheck(ctx, containerClient, ui.ShowStartupCheckModal)
|
startMediaServerC := make(chan struct{}, 1)
|
||||||
|
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
|
||||||
|
startupErr = fmt.Errorf("startup check: %w", startupErr)
|
||||||
|
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: startupErr.Error()})
|
||||||
|
<-ui.C()
|
||||||
|
return startupErr
|
||||||
|
} else if ok {
|
||||||
|
startMediaServerC <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case err := <-startupCheckC:
|
case <-startMediaServerC:
|
||||||
if errors.Is(err, errStartupCheckUserQuit) {
|
if err = srv.Start(ctx); err != nil {
|
||||||
return nil
|
return fmt.Errorf("start mediaserver: %w", err)
|
||||||
} else if err != nil {
|
|
||||||
return fmt.Errorf("startup check: %w", err)
|
|
||||||
} else {
|
|
||||||
startupCheckC = nil
|
|
||||||
|
|
||||||
if err = srv.Start(ctx); err != nil {
|
|
||||||
return fmt.Errorf("start mediaserver: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
a.eventBus.Send(event.MediaServerStartedEvent{RTMPURL: srv.RTMPURL(), RTMPSURL: srv.RTMPSURL()})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
a.eventBus.Send(event.MediaServerStartedEvent{RTMPURL: srv.RTMPURL(), RTMPSURL: srv.RTMPSURL()})
|
||||||
case <-a.configService.C():
|
case <-a.configService.C():
|
||||||
// No-op, config updates are handled synchronously for now.
|
// No-op, config updates are handled synchronously for now.
|
||||||
case cmd, ok := <-ui.C():
|
case cmd, ok := <-ui.C():
|
||||||
@ -181,7 +181,9 @@ func (a *App) Run(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if !a.handleCommand(cmd, state, repl) {
|
if ok, err := a.handleCommand(ctx, cmd, state, repl, containerClient, startMediaServerC); err != nil {
|
||||||
|
return fmt.Errorf("handle command: %w", err)
|
||||||
|
} else if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
case <-uiUpdateT.C:
|
case <-uiUpdateT.C:
|
||||||
@ -206,10 +208,13 @@ func (a *App) Run(ctx context.Context) error {
|
|||||||
// handleCommand handles an incoming command. It returns false if the app
|
// handleCommand handles an incoming command. It returns false if the app
|
||||||
// should not continue, i.e. quit.
|
// should not continue, i.e. quit.
|
||||||
func (a *App) handleCommand(
|
func (a *App) handleCommand(
|
||||||
|
ctx context.Context,
|
||||||
cmd domain.Command,
|
cmd domain.Command,
|
||||||
state *domain.AppState,
|
state *domain.AppState,
|
||||||
repl *replicator.Actor,
|
repl *replicator.Actor,
|
||||||
) bool {
|
containerClient *container.Client,
|
||||||
|
startMediaServerC chan struct{},
|
||||||
|
) (bool, error) {
|
||||||
a.logger.Debug("Command received", "cmd", cmd.Name())
|
a.logger.Debug("Command received", "cmd", cmd.Name())
|
||||||
switch c := cmd.(type) {
|
switch c := cmd.(type) {
|
||||||
case domain.CommandAddDestination:
|
case domain.CommandAddDestination:
|
||||||
@ -249,11 +254,17 @@ func (a *App) handleCommand(
|
|||||||
repl.StartDestination(c.URL)
|
repl.StartDestination(c.URL)
|
||||||
case domain.CommandStopDestination:
|
case domain.CommandStopDestination:
|
||||||
repl.StopDestination(c.URL)
|
repl.StopDestination(c.URL)
|
||||||
|
case domain.CommandCloseOtherInstance:
|
||||||
|
if err := closeOtherInstances(ctx, containerClient); err != nil {
|
||||||
|
return false, fmt.Errorf("close other instances: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
startMediaServerC <- struct{}{}
|
||||||
case domain.CommandQuit:
|
case domain.CommandQuit:
|
||||||
return false
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleConfigUpdate applies the config to the app state, and sends an AppStateChangedEvent.
|
// handleConfigUpdate applies the config to the app state, and sends an AppStateChangedEvent.
|
||||||
@ -341,40 +352,34 @@ func resolveDestinations(destinations []domain.Destination, inDestinations []con
|
|||||||
return destinations[:len(inDestinations)]
|
return destinations[:len(inDestinations)]
|
||||||
}
|
}
|
||||||
|
|
||||||
var errStartupCheckUserQuit = errors.New("user quit startup check modal")
|
|
||||||
|
|
||||||
// doStartupCheck performs a startup check to see if there are any existing app
|
// doStartupCheck performs a startup check to see if there are any existing app
|
||||||
// containers.
|
// containers.
|
||||||
//
|
//
|
||||||
// It returns a channel that will be closed, possibly after receiving an error.
|
// It returns a bool if the check is clear. If the bool is false, then
|
||||||
// If the error is non-nil the app must not be started. If the error is
|
// startup should be paused until the choice selected by the user is received
|
||||||
// [errStartupCheckUserQuit], the user voluntarily quit the startup check
|
// via a command.
|
||||||
// modal.
|
func doStartupCheck(ctx context.Context, containerClient *container.Client, eventBus *event.Bus) (bool, error) {
|
||||||
func doStartupCheck(ctx context.Context, containerClient *container.Client, showModal func() bool) <-chan error {
|
if exists, err := containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil {
|
||||||
ch := make(chan error, 1)
|
return false, fmt.Errorf("check existing containers: %w", err)
|
||||||
|
} else if exists {
|
||||||
|
eventBus.Send(event.OtherInstanceDetectedEvent{})
|
||||||
|
|
||||||
go func() {
|
return false, nil
|
||||||
defer close(ch)
|
}
|
||||||
|
|
||||||
if exists, err := containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil {
|
return true, nil
|
||||||
ch <- fmt.Errorf("check existing containers: %w", err)
|
}
|
||||||
} else if exists {
|
|
||||||
if showModal() {
|
|
||||||
if err = containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil {
|
|
||||||
ch <- fmt.Errorf("remove existing containers: %w", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err = containerClient.RemoveUnusedNetworks(ctx); err != nil {
|
|
||||||
ch <- fmt.Errorf("remove unused networks: %w", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ch <- errStartupCheckUserQuit
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return ch
|
func closeOtherInstances(ctx context.Context, containerClient *container.Client) error {
|
||||||
|
if err := containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil {
|
||||||
|
return fmt.Errorf("remove existing containers: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := containerClient.RemoveUnusedNetworks(ctx); err != nil {
|
||||||
|
return fmt.Errorf("remove unused networks: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildNetAddr builds a [mediaserver.OptionalNetAddr] from the config.
|
// buildNetAddr builds a [mediaserver.OptionalNetAddr] from the config.
|
||||||
|
@ -41,6 +41,14 @@ func (c CommandStopDestination) Name() string {
|
|||||||
return "stop_destination"
|
return "stop_destination"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CommandCloseOtherInstance closes the other instance of the application.
|
||||||
|
type CommandCloseOtherInstance struct{}
|
||||||
|
|
||||||
|
// Name implements the Command interface.
|
||||||
|
func (c CommandCloseOtherInstance) Name() string {
|
||||||
|
return "close_other_instance"
|
||||||
|
}
|
||||||
|
|
||||||
// CommandQuit quits the app.
|
// CommandQuit quits the app.
|
||||||
type CommandQuit struct{}
|
type CommandQuit struct{}
|
||||||
|
|
||||||
|
@ -12,6 +12,7 @@ const (
|
|||||||
EventNameDestinationRemoved Name = "destination_removed"
|
EventNameDestinationRemoved Name = "destination_removed"
|
||||||
EventNameRemoveDestinationFailed Name = "remove_destination_failed"
|
EventNameRemoveDestinationFailed Name = "remove_destination_failed"
|
||||||
EventNameFatalErrorOccurred Name = "fatal_error_occurred"
|
EventNameFatalErrorOccurred Name = "fatal_error_occurred"
|
||||||
|
EventNameOtherInstanceDetected Name = "other_instance_detected"
|
||||||
EventNameMediaServerStarted Name = "media_server_started"
|
EventNameMediaServerStarted Name = "media_server_started"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -80,6 +81,13 @@ type FatalErrorOccurredEvent struct {
|
|||||||
Message string
|
Message string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OtherInstanceDetectedEvent is emitted when the app launches and detects another instance.
|
||||||
|
type OtherInstanceDetectedEvent struct{}
|
||||||
|
|
||||||
|
func (e OtherInstanceDetectedEvent) name() Name {
|
||||||
|
return EventNameOtherInstanceDetected
|
||||||
|
}
|
||||||
|
|
||||||
func (e FatalErrorOccurredEvent) name() Name {
|
func (e FatalErrorOccurredEvent) name() Name {
|
||||||
return "fatal_error_occurred"
|
return "fatal_error_occurred"
|
||||||
}
|
}
|
||||||
|
@ -285,6 +285,7 @@ func (ui *UI) run(ctx context.Context) {
|
|||||||
startDestinationFailedC := ui.eventBus.Register(event.EventNameStartDestinationFailed)
|
startDestinationFailedC := ui.eventBus.Register(event.EventNameStartDestinationFailed)
|
||||||
destinationRemovedC := ui.eventBus.Register(event.EventNameDestinationRemoved)
|
destinationRemovedC := ui.eventBus.Register(event.EventNameDestinationRemoved)
|
||||||
removeDestinationFailedC := ui.eventBus.Register(event.EventNameRemoveDestinationFailed)
|
removeDestinationFailedC := ui.eventBus.Register(event.EventNameRemoveDestinationFailed)
|
||||||
|
existingAppDetectedC := ui.eventBus.Register(event.EventNameOtherInstanceDetected)
|
||||||
mediaServerStartedC := ui.eventBus.Register(event.EventNameMediaServerStarted)
|
mediaServerStartedC := ui.eventBus.Register(event.EventNameMediaServerStarted)
|
||||||
fatalErrorOccurredC := ui.eventBus.Register(event.EventNameFatalErrorOccurred)
|
fatalErrorOccurredC := ui.eventBus.Register(event.EventNameFatalErrorOccurred)
|
||||||
|
|
||||||
@ -325,6 +326,10 @@ func (ui *UI) run(ctx context.Context) {
|
|||||||
ui.app.QueueUpdateDraw(func() {
|
ui.app.QueueUpdateDraw(func() {
|
||||||
ui.handleDestinationEventError(evt.(event.RemoveDestinationFailedEvent).Err)
|
ui.handleDestinationEventError(evt.(event.RemoveDestinationFailedEvent).Err)
|
||||||
})
|
})
|
||||||
|
case evt := <-existingAppDetectedC:
|
||||||
|
ui.app.QueueUpdateDraw(func() {
|
||||||
|
ui.handleOtherInstanceDetected(evt.(event.OtherInstanceDetectedEvent))
|
||||||
|
})
|
||||||
case evt := <-mediaServerStartedC:
|
case evt := <-mediaServerStartedC:
|
||||||
ui.app.QueueUpdateDraw(func() {
|
ui.app.QueueUpdateDraw(func() {
|
||||||
ui.handleMediaServerStarted(evt.(event.MediaServerStartedEvent))
|
ui.handleMediaServerStarted(evt.(event.MediaServerStartedEvent))
|
||||||
@ -428,32 +433,20 @@ func (ui *UI) handleStartDestinationFailed(event.StartDestinationFailedEvent) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShowStartupCheckModal shows a modal dialog to the user, asking if they want
|
func (ui *UI) handleOtherInstanceDetected(event.OtherInstanceDetectedEvent) {
|
||||||
// to kill a running instance of Octoplex.
|
ui.showModal(
|
||||||
//
|
pageNameModalStartupCheck,
|
||||||
// The method will block until the user has made a choice, after which the
|
"Another instance of Octoplex may already be running.\n\nPressing continue will close that instance. Continue?",
|
||||||
// channel will receive true if the user wants to quit the other instance, or
|
[]string{"Continue", "Exit"},
|
||||||
// false to quit this instance.
|
false,
|
||||||
func (ui *UI) ShowStartupCheckModal() bool {
|
func(buttonIndex int, _ string) {
|
||||||
done := make(chan bool)
|
if buttonIndex == 0 {
|
||||||
|
ui.commandC <- domain.CommandCloseOtherInstance{}
|
||||||
ui.app.QueueUpdateDraw(func() {
|
} else {
|
||||||
ui.showModal(
|
ui.commandC <- domain.CommandQuit{}
|
||||||
pageNameModalStartupCheck,
|
}
|
||||||
"Another instance of Octoplex may already be running.\n\nPressing continue will close that instance. Continue?",
|
},
|
||||||
[]string{"Continue", "Exit"},
|
)
|
||||||
false,
|
|
||||||
func(buttonIndex int, _ string) {
|
|
||||||
if buttonIndex == 0 {
|
|
||||||
done <- true
|
|
||||||
} else {
|
|
||||||
done <- false
|
|
||||||
}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
})
|
|
||||||
|
|
||||||
return <-done
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ui *UI) ShowDestinationErrorModal(name string, err error) {
|
func (ui *UI) ShowDestinationErrorModal(name string, err error) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user