refactor(app): extract more events
This commit is contained in:
parent
cdf41e47c3
commit
4029c66a4a
@ -22,6 +22,7 @@ import (
|
|||||||
type App struct {
|
type App struct {
|
||||||
cfg config.Config
|
cfg config.Config
|
||||||
configService *config.Service
|
configService *config.Service
|
||||||
|
eventBus *event.Bus
|
||||||
dockerClient container.DockerClient
|
dockerClient container.DockerClient
|
||||||
screen *terminal.Screen // Screen may be nil.
|
screen *terminal.Screen // Screen may be nil.
|
||||||
clipboardAvailable bool
|
clipboardAvailable bool
|
||||||
@ -45,6 +46,7 @@ func New(params Params) *App {
|
|||||||
return &App{
|
return &App{
|
||||||
cfg: params.ConfigService.Current(),
|
cfg: params.ConfigService.Current(),
|
||||||
configService: params.ConfigService,
|
configService: params.ConfigService,
|
||||||
|
eventBus: event.NewBus(params.Logger.With("component", "event_bus")),
|
||||||
dockerClient: params.DockerClient,
|
dockerClient: params.DockerClient,
|
||||||
screen: params.Screen,
|
screen: params.Screen,
|
||||||
clipboardAvailable: params.ClipboardAvailable,
|
clipboardAvailable: params.ClipboardAvailable,
|
||||||
@ -56,8 +58,6 @@ func New(params Params) *App {
|
|||||||
|
|
||||||
// Run starts the application, and blocks until it exits.
|
// Run starts the application, and blocks until it exits.
|
||||||
func (a *App) Run(ctx context.Context) error {
|
func (a *App) Run(ctx context.Context) error {
|
||||||
eventBus := event.NewBus(a.logger.With("component", "event_bus"))
|
|
||||||
|
|
||||||
// state is the current state of the application, as reflected in the UI.
|
// state is the current state of the application, as reflected in the UI.
|
||||||
state := new(domain.AppState)
|
state := new(domain.AppState)
|
||||||
applyConfig(a.cfg, state)
|
applyConfig(a.cfg, state)
|
||||||
@ -68,7 +68,7 @@ func (a *App) Run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ui, err := terminal.StartUI(ctx, terminal.StartParams{
|
ui, err := terminal.StartUI(ctx, terminal.StartParams{
|
||||||
EventBus: eventBus,
|
EventBus: a.eventBus,
|
||||||
Screen: a.screen,
|
Screen: a.screen,
|
||||||
ClipboardAvailable: a.clipboardAvailable,
|
ClipboardAvailable: a.clipboardAvailable,
|
||||||
ConfigFilePath: a.configFilePath,
|
ConfigFilePath: a.configFilePath,
|
||||||
@ -95,13 +95,13 @@ func (a *App) Run(ctx context.Context) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("create container client: %w", err)
|
err = fmt.Errorf("create container client: %w", err)
|
||||||
|
|
||||||
var errString string
|
var msg string
|
||||||
if client.IsErrConnectionFailed(err) {
|
if client.IsErrConnectionFailed(err) {
|
||||||
errString = "Could not connect to Docker. Is Docker installed and running?"
|
msg = "Could not connect to Docker. Is Docker installed and running?"
|
||||||
} else {
|
} else {
|
||||||
errString = err.Error()
|
msg = err.Error()
|
||||||
}
|
}
|
||||||
ui.ShowFatalErrorModal(errString)
|
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
|
||||||
|
|
||||||
emptyUI()
|
emptyUI()
|
||||||
<-ui.C()
|
<-ui.C()
|
||||||
@ -130,7 +130,7 @@ func (a *App) Run(ctx context.Context) error {
|
|||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("create mediaserver: %w", err)
|
err = fmt.Errorf("create mediaserver: %w", err)
|
||||||
ui.ShowFatalErrorModal(err.Error())
|
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: err.Error()})
|
||||||
emptyUI()
|
emptyUI()
|
||||||
<-ui.C()
|
<-ui.C()
|
||||||
return err
|
return err
|
||||||
@ -164,7 +164,7 @@ func (a *App) Run(ctx context.Context) error {
|
|||||||
return fmt.Errorf("start mediaserver: %w", err)
|
return fmt.Errorf("start mediaserver: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
eventBus.Send(event.MediaServerStartedEvent{RTMPURL: srv.RTMPURL(), RTMPSURL: srv.RTMPSURL()})
|
a.eventBus.Send(event.MediaServerStartedEvent{RTMPURL: srv.RTMPURL(), RTMPSURL: srv.RTMPSURL()})
|
||||||
}
|
}
|
||||||
case <-a.configService.C():
|
case <-a.configService.C():
|
||||||
// No-op, config updates are handled synchronously for now.
|
// No-op, config updates are handled synchronously for now.
|
||||||
@ -220,7 +220,7 @@ func (a *App) handleCommand(
|
|||||||
}
|
}
|
||||||
a.cfg = newCfg
|
a.cfg = newCfg
|
||||||
handleConfigUpdate(a.cfg, state, ui)
|
handleConfigUpdate(a.cfg, state, ui)
|
||||||
ui.DestinationAdded()
|
a.eventBus.Send(event.DestinationAddedEvent{URL: c.URL})
|
||||||
case domain.CommandRemoveDestination:
|
case domain.CommandRemoveDestination:
|
||||||
repl.StopDestination(c.URL) // no-op if not live
|
repl.StopDestination(c.URL) // no-op if not live
|
||||||
newCfg := a.cfg
|
newCfg := a.cfg
|
||||||
@ -234,7 +234,7 @@ func (a *App) handleCommand(
|
|||||||
}
|
}
|
||||||
a.cfg = newCfg
|
a.cfg = newCfg
|
||||||
handleConfigUpdate(a.cfg, state, ui)
|
handleConfigUpdate(a.cfg, state, ui)
|
||||||
ui.DestinationRemoved()
|
a.eventBus.Send(event.DestinationRemovedEvent{URL: c.URL})
|
||||||
case domain.CommandStartDestination:
|
case domain.CommandStartDestination:
|
||||||
if !state.Source.Live {
|
if !state.Source.Live {
|
||||||
ui.ShowSourceNotLiveModal()
|
ui.ShowSourceNotLiveModal()
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
@ -150,25 +151,37 @@ func printScreen(t *testing.T, getContents func() []string, label string) {
|
|||||||
func sendKey(t *testing.T, screen tcell.SimulationScreen, key tcell.Key, ch rune) {
|
func sendKey(t *testing.T, screen tcell.SimulationScreen, key tcell.Key, ch rune) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
screen.InjectKey(key, ch, tcell.ModNone)
|
const (
|
||||||
time.Sleep(50 * time.Millisecond)
|
waitTime = 50 * time.Millisecond
|
||||||
|
maxTries = 50
|
||||||
|
)
|
||||||
|
|
||||||
|
for i := 0; i < maxTries; i++ {
|
||||||
|
if err := screen.PostEvent(tcell.NewEventKey(key, ch, tcell.ModNone)); err != nil {
|
||||||
|
fmt.Printf("Error injecting rune %s, will retry in %s: %s\n", strconv.QuoteRune(ch), waitTime, err)
|
||||||
|
time.Sleep(waitTime)
|
||||||
|
} else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Fatalf("Failed to send key event after %d tries", maxTries)
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendKeys(t *testing.T, screen tcell.SimulationScreen, keys string) {
|
func sendKeys(t *testing.T, screen tcell.SimulationScreen, keys string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
screen.InjectKeyBytes([]byte(keys))
|
for _, ch := range keys {
|
||||||
time.Sleep(500 * time.Millisecond)
|
sendKey(t, screen, tcell.KeyRune, ch)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendBackspaces(t *testing.T, screen tcell.SimulationScreen, n int) {
|
func sendBackspaces(t *testing.T, screen tcell.SimulationScreen, n int) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
for range n {
|
for range n {
|
||||||
screen.InjectKey(tcell.KeyBackspace, ' ', tcell.ModNone)
|
sendKey(t, screen, tcell.KeyBackspace, 0)
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
}
|
}
|
||||||
time.Sleep(500 * time.Millisecond)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// kickFirstRTMPConn kicks the first RTMP connection from the mediaMTX server.
|
// kickFirstRTMPConn kicks the first RTMP connection from the mediaMTX server.
|
||||||
|
@ -181,11 +181,11 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
|
|||||||
// Add a second destination in-app:
|
// Add a second destination in-app:
|
||||||
sendKey(t, screen, tcell.KeyRune, 'a')
|
sendKey(t, screen, tcell.KeyRune, 'a')
|
||||||
|
|
||||||
sendBackspaces(t, screen, 30)
|
sendBackspaces(t, screen, 10)
|
||||||
sendKeys(t, screen, "Local server 2")
|
sendKeys(t, screen, "Local server 2")
|
||||||
sendKey(t, screen, tcell.KeyTab, ' ')
|
sendKey(t, screen, tcell.KeyTab, ' ')
|
||||||
|
|
||||||
sendBackspaces(t, screen, 30)
|
sendBackspaces(t, screen, 10)
|
||||||
sendKeys(t, screen, destURL2)
|
sendKeys(t, screen, destURL2)
|
||||||
sendKey(t, screen, tcell.KeyTab, ' ')
|
sendKey(t, screen, tcell.KeyTab, ' ')
|
||||||
sendKey(t, screen, tcell.KeyEnter, ' ')
|
sendKey(t, screen, tcell.KeyEnter, ' ')
|
||||||
|
@ -7,26 +7,6 @@ import (
|
|||||||
|
|
||||||
const defaultChannelSize = 64
|
const defaultChannelSize = 64
|
||||||
|
|
||||||
type Name string
|
|
||||||
|
|
||||||
const (
|
|
||||||
EventNameMediaServerStarted Name = "media_server_started"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Event interface {
|
|
||||||
name() Name
|
|
||||||
}
|
|
||||||
|
|
||||||
// MediaServerStartedEvent is emitted when the mediaserver component starts successfully.
|
|
||||||
type MediaServerStartedEvent struct {
|
|
||||||
RTMPURL string
|
|
||||||
RTMPSURL string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e MediaServerStartedEvent) name() Name {
|
|
||||||
return "media_server_started"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Bus is an event bus.
|
// Bus is an event bus.
|
||||||
type Bus struct {
|
type Bus struct {
|
||||||
consumers map[Name][]chan Event
|
consumers map[Name][]chan Event
|
||||||
|
53
internal/event/events.go
Normal file
53
internal/event/events.go
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
package event
|
||||||
|
|
||||||
|
type Name string
|
||||||
|
|
||||||
|
const (
|
||||||
|
EventNameDestinationAdded Name = "destination_added"
|
||||||
|
EventNameDestinationRemoved Name = "destination_removed"
|
||||||
|
EventNameMediaServerStarted Name = "media_server_started"
|
||||||
|
EventNameFatalErrorOccurred Name = "fatal_error_occurred"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Event interface {
|
||||||
|
name() Name
|
||||||
|
}
|
||||||
|
|
||||||
|
// DestinationAddedEvent is emitted when a destination is successfully added.
|
||||||
|
type DestinationAddedEvent struct {
|
||||||
|
URL string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e DestinationAddedEvent) name() Name {
|
||||||
|
return EventNameDestinationAdded
|
||||||
|
}
|
||||||
|
|
||||||
|
// DestinationRemovedEvent is emitted when a destination is successfully
|
||||||
|
// removed.
|
||||||
|
type DestinationRemovedEvent struct {
|
||||||
|
URL string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e DestinationRemovedEvent) name() Name {
|
||||||
|
return EventNameDestinationRemoved
|
||||||
|
}
|
||||||
|
|
||||||
|
// MediaServerStartedEvent is emitted when the mediaserver component starts successfully.
|
||||||
|
type MediaServerStartedEvent struct {
|
||||||
|
RTMPURL string
|
||||||
|
RTMPSURL string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e MediaServerStartedEvent) name() Name {
|
||||||
|
return "media_server_started"
|
||||||
|
}
|
||||||
|
|
||||||
|
// FatalErrorOccurredEvent is emitted when a fatal application
|
||||||
|
// error occurs.
|
||||||
|
type FatalErrorOccurredEvent struct {
|
||||||
|
Message string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e FatalErrorOccurredEvent) name() Name {
|
||||||
|
return "fatal_error_occurred"
|
||||||
|
}
|
@ -279,7 +279,10 @@ func (ui *UI) C() <-chan domain.Command {
|
|||||||
func (ui *UI) run(ctx context.Context) {
|
func (ui *UI) run(ctx context.Context) {
|
||||||
defer close(ui.commandC)
|
defer close(ui.commandC)
|
||||||
|
|
||||||
|
destinationAddedC := ui.eventBus.Register(event.EventNameDestinationAdded)
|
||||||
|
destinationRemovedC := ui.eventBus.Register(event.EventNameDestinationRemoved)
|
||||||
mediaServerStartedC := ui.eventBus.Register(event.EventNameMediaServerStarted)
|
mediaServerStartedC := ui.eventBus.Register(event.EventNameMediaServerStarted)
|
||||||
|
fatalErrorOccurredC := ui.eventBus.Register(event.EventNameFatalErrorOccurred)
|
||||||
|
|
||||||
uiDone := make(chan struct{})
|
uiDone := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
@ -294,8 +297,14 @@ func (ui *UI) run(ctx context.Context) {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case evt := <-destinationAddedC:
|
||||||
|
ui.handleDestinationAdded(evt.(event.DestinationAddedEvent))
|
||||||
|
case evt := <-destinationRemovedC:
|
||||||
|
ui.handleDestinationRemoved(evt.(event.DestinationRemovedEvent))
|
||||||
case evt := <-mediaServerStartedC:
|
case evt := <-mediaServerStartedC:
|
||||||
ui.handleMediaServerStarted(evt.(event.MediaServerStartedEvent))
|
ui.handleMediaServerStarted(evt.(event.MediaServerStartedEvent))
|
||||||
|
case evt := <-fatalErrorOccurredC:
|
||||||
|
ui.handleFatalErrorOccurred(evt.(event.FatalErrorOccurredEvent))
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-uiDone:
|
case <-uiDone:
|
||||||
@ -437,15 +446,13 @@ func (ui *UI) ShowDestinationErrorModal(name string, err error) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShowFatalErrorModal displays the provided error. It sends a CommandQuit to the
|
func (ui *UI) handleFatalErrorOccurred(evt event.FatalErrorOccurredEvent) {
|
||||||
// command channel when the user selects the Quit button.
|
|
||||||
func (ui *UI) ShowFatalErrorModal(errString string) {
|
|
||||||
ui.app.QueueUpdateDraw(func() {
|
ui.app.QueueUpdateDraw(func() {
|
||||||
ui.showModal(
|
ui.showModal(
|
||||||
pageNameModalFatalError,
|
pageNameModalFatalError,
|
||||||
fmt.Sprintf(
|
fmt.Sprintf(
|
||||||
"An error occurred:\n\n%s",
|
"An error occurred:\n\n%s",
|
||||||
errString,
|
evt.Message,
|
||||||
),
|
),
|
||||||
[]string{"Quit"},
|
[]string{"Quit"},
|
||||||
false,
|
false,
|
||||||
@ -957,8 +964,7 @@ func (ui *UI) removeDestination() {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DestinationAdded should be called when a new destination is added.
|
func (ui *UI) handleDestinationAdded(event.DestinationAddedEvent) {
|
||||||
func (ui *UI) DestinationAdded() {
|
|
||||||
ui.mu.Lock()
|
ui.mu.Lock()
|
||||||
ui.hasDestinations = true
|
ui.hasDestinations = true
|
||||||
ui.mu.Unlock()
|
ui.mu.Unlock()
|
||||||
@ -970,9 +976,8 @@ func (ui *UI) DestinationAdded() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// DestinationRemoved should be called when a destination is removed.
|
func (ui *UI) handleDestinationRemoved(event.DestinationRemovedEvent) {
|
||||||
func (ui *UI) DestinationRemoved() {
|
ui.app.QueueUpdateDraw(ui.selectPreviousDestination)
|
||||||
ui.selectPreviousDestination()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ui *UI) closeAddDestinationForm() {
|
func (ui *UI) closeAddDestinationForm() {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user