Compare commits

...

20 Commits

Author SHA1 Message Date
7566c207b0 fixup! build: add woodpecker
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2025-05-04 20:13:18 +02:00
53d52b3673 fixup! build: add woodpecker 2025-05-04 19:49:19 +02:00
3e2954f33b fixup! build: add woodpecker 2025-05-04 19:48:08 +02:00
586a376405 build: add woodpecker 2025-05-04 19:44:42 +02:00
750e9432be refactor(app): add Dispatch method 2025-04-30 22:17:19 +02:00
d313c1e020 doc: update README 2025-04-29 22:39:46 +02:00
812d3901d3 refactor(app): internalize dispatch channel 2025-04-29 22:37:05 +02:00
caa543703e refactor: extract commands from domain 2025-04-28 06:32:00 +02:00
8403d751b6 doc: add godoc
Some checks failed
ci-build / lint (push) Has been cancelled
ci-scan / Analyze (go) (push) Has been cancelled
ci-scan / Analyze (actions) (push) Has been cancelled
ci-build / build (push) Has been cancelled
ci-build / release (push) Has been cancelled
2025-04-25 19:19:54 +02:00
2f1cadcf40 refactor(app): add DestinationWentOffAirEvent 2025-04-25 18:10:22 +02:00
1f4a931903 fix(app): event ordering
Some checks are pending
ci-build / lint (push) Waiting to run
ci-build / build (push) Blocked by required conditions
ci-build / release (push) Blocked by required conditions
ci-scan / Analyze (go) (push) Waiting to run
ci-scan / Analyze (actions) (push) Waiting to run
Use a single channel per consumer, instead of one channel per
consumer/event tuple. This ensures that overall ordering of events
remains consistent, and avoids introducing subtle race conditions.
2025-04-25 17:42:49 +02:00
94623248c0 refactor(app): async startup check 2025-04-25 17:42:49 +02:00
4a16780915 fixup! refactor: add event bus 2025-04-25 04:46:32 +02:00
3019387f38 refactor(app): add StartDestinationFailedEvent 2025-04-25 04:44:36 +02:00
f4021a2886 refactor(app): add destination error events 2025-04-24 21:45:53 +02:00
b8550f050b refactor(app): add AppStateChangedEvent 2025-04-24 21:45:53 +02:00
c02a66202f doc: update README 2025-04-24 21:45:53 +02:00
4029c66a4a refactor(app): extract more events 2025-04-24 21:45:53 +02:00
cdf41e47c3 refactor(app): extract handleCommand 2025-04-22 16:28:38 +02:00
d7f8fb49eb refactor(app): add App type 2025-04-22 16:06:37 +02:00
15 changed files with 577 additions and 405 deletions

View File

@ -1,4 +1,4 @@
name: ci-build name: build
run-name: Building ${{ github.ref_name }} run-name: Building ${{ github.ref_name }}
on: on:
push: push:

View File

@ -1,4 +1,4 @@
name: ci-scan name: codeql
on: on:
push: push:

9
.woodpecker.yml Normal file
View File

@ -0,0 +1,9 @@
---
when:
- event: push
steps:
- name: lint
image: koalaman/shellcheck:stable
commands:
- shellcheck $(find . -type f -name "*.sh")

View File

@ -33,11 +33,12 @@ broadcast to localhost:1935 or localhost:1936.
## Opening a pull request ## Opening a pull request
Pull requests are welcome, but please propose significant changes in a Pull requests are welcome, please propose significant changes in a
[discussion](https://github.com/rfwatson/octoplex/discussions) first. [discussion](https://github.com/rfwatson/octoplex/discussions) first.
1. Fork the repo 1. Fork the repo
2. Make your changes, including test coverage 1. Make your changes, including test coverage
3. Push the changes to a branch 1. Run the formatter (`mise run format`)
4. Ensure the branch is passing 1. Push the changes to a branch
5. Open a pull request 1. Ensure the branch is passing
1. Open a pull request

View File

@ -1,11 +1,12 @@
# Octoplex :octopus: # Octoplex :octopus:
![build status](https://github.com/rfwatson/octoplex/actions/workflows/build.yml/badge.svg) [![build status](https://github.com/rfwatson/octoplex/actions/workflows/build.yml/badge.svg)](https://github.com/rfwatson/octoplex/actions/workflows/build.yml)
![scan status](https://github.com/rfwatson/octoplex/actions/workflows/codeql.yml/badge.svg) [![scan status](https://github.com/rfwatson/octoplex/actions/workflows/codeql.yml/badge.svg)](https://github.com/rfwatson/octoplex/actions/workflows/codeql.yml)
![GitHub Release](https://img.shields.io/github/v/release/rfwatson/octoplex) ![GitHub Release](https://img.shields.io/github/v/release/rfwatson/octoplex)
[![Go Report Card](https://goreportcard.com/badge/git.netflux.io/rob/octoplex)](https://goreportcard.com/report/git.netflux.io/rob/octoplex)
[![License: AGPL v3](https://img.shields.io/badge/License-AGPL_v3-blue.svg)](https://www.gnu.org/licenses/agpl-3.0) [![License: AGPL v3](https://img.shields.io/badge/License-AGPL_v3-blue.svg)](https://www.gnu.org/licenses/agpl-3.0)
Octoplex is a live video restreamer for the terminal. Octoplex is a Docker-native live video restreamer.
* Restream RTMP/RTMPS to unlimited destinations * Restream RTMP/RTMPS to unlimited destinations
* Broadcast using OBS or any standard tool * Broadcast using OBS or any standard tool
@ -38,8 +39,7 @@ Octoplex is a live video restreamer for the terminal.
### Docker Engine ### Docker Engine
First, make sure Docker Engine is installed. Octoplex uses Docker to manage First, ensure that Docker Engine is installed.
FFmpeg and other streaming tools.
Linux: See https://docs.docker.com/engine/install/. Linux: See https://docs.docker.com/engine/install/.
@ -160,14 +160,12 @@ localhost (`127.0.0.1`) or use `0.0.0.0` to bind to all network interfaces.
## Contributing ## Contributing
See [CONTRIBUTING.md](/CONTRIBUTING.md).
### Bug reports ### Bug reports
Open bug reports [on GitHub](https://github.com/rfwatson/octoplex/issues/new). Open bug reports [on GitHub](https://github.com/rfwatson/octoplex/issues/new).
### Pull requests
Pull requests are welcome.
## Acknowledgements ## Acknowledgements
Octoplex is built on and/or makes use of other free and open source software, Octoplex is built on and/or makes use of other free and open source software,

View File

@ -1,6 +1,7 @@
package app package app
import ( import (
"cmp"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@ -18,10 +19,25 @@ import (
"github.com/docker/docker/client" "github.com/docker/docker/client"
) )
// RunParams holds the parameters for running the application. // App is an instance of the app.
type RunParams struct { type App struct {
cfg config.Config
configService *config.Service
eventBus *event.Bus
dispatchC chan event.Command
dockerClient container.DockerClient
screen *terminal.Screen // Screen may be nil.
clipboardAvailable bool
configFilePath string
buildInfo domain.BuildInfo
logger *slog.Logger
}
// Params holds the parameters for running the application.
type Params struct {
ConfigService *config.Service ConfigService *config.Service
DockerClient container.DockerClient DockerClient container.DockerClient
ChanSize int
Screen *terminal.Screen // Screen may be nil. Screen *terminal.Screen // Screen may be nil.
ClipboardAvailable bool ClipboardAvailable bool
ConfigFilePath string ConfigFilePath string
@ -29,31 +45,44 @@ type RunParams struct {
Logger *slog.Logger Logger *slog.Logger
} }
// defaultChanSize is the default size of the dispatch channel.
const defaultChanSize = 64
// New creates a new application instance.
func New(params Params) *App {
return &App{
cfg: params.ConfigService.Current(),
configService: params.ConfigService,
eventBus: event.NewBus(params.Logger.With("component", "event_bus")),
dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)),
dockerClient: params.DockerClient,
screen: params.Screen,
clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath,
buildInfo: params.BuildInfo,
logger: params.Logger,
}
}
// Run starts the application, and blocks until it exits. // Run starts the application, and blocks until it exits.
func Run(ctx context.Context, params RunParams) error { func (a *App) Run(ctx context.Context) error {
logger := params.Logger
eventBus := event.NewBus(logger.With("component", "event_bus"))
// cfg is the current configuration of the application, as reflected in the
// config file.
cfg := params.ConfigService.Current()
// state is the current state of the application, as reflected in the UI. // state is the current state of the application, as reflected in the UI.
state := new(domain.AppState) state := new(domain.AppState)
applyConfig(cfg, state) applyConfig(a.cfg, state)
// Ensure there is at least one active source. // Ensure there is at least one active source.
if !cfg.Sources.MediaServer.RTMP.Enabled && !cfg.Sources.MediaServer.RTMPS.Enabled { if !a.cfg.Sources.MediaServer.RTMP.Enabled && !a.cfg.Sources.MediaServer.RTMPS.Enabled {
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set") return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
} }
ui, err := terminal.StartUI(ctx, terminal.StartParams{ ui, err := terminal.StartUI(ctx, terminal.StartParams{
EventBus: eventBus, EventBus: a.eventBus,
Screen: params.Screen, Dispatcher: func(cmd event.Command) { a.dispatchC <- cmd },
ClipboardAvailable: params.ClipboardAvailable, Screen: a.screen,
ConfigFilePath: params.ConfigFilePath, ClipboardAvailable: a.clipboardAvailable,
BuildInfo: params.BuildInfo, ConfigFilePath: a.configFilePath,
Logger: logger.With("component", "ui"), BuildInfo: a.buildInfo,
Logger: a.logger.With("component", "ui"),
}) })
if err != nil { if err != nil {
return fmt.Errorf("start terminal user interface: %w", err) return fmt.Errorf("start terminal user interface: %w", err)
@ -69,50 +98,56 @@ func Run(ctx context.Context, params RunParams) error {
// It is only needed for integration tests when rendering modals before the // It is only needed for integration tests when rendering modals before the
// main loop starts. It would be nice to remove this but the risk/impact on // main loop starts. It would be nice to remove this but the risk/impact on
// non-test code is pretty low. // non-test code is pretty low.
emptyUI := func() { ui.SetState(domain.AppState{}) } emptyUI := func() {
a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}})
}
containerClient, err := container.NewClient(ctx, params.DockerClient, logger.With("component", "container_client")) containerClient, err := container.NewClient(ctx, a.dockerClient, a.logger.With("component", "container_client"))
if err != nil { if err != nil {
err = fmt.Errorf("create container client: %w", err) err = fmt.Errorf("create container client: %w", err)
var errString string var msg string
if client.IsErrConnectionFailed(err) { if client.IsErrConnectionFailed(err) {
errString = "Could not connect to Docker. Is Docker installed and running?" msg = "Could not connect to Docker. Is Docker installed and running?"
} else { } else {
errString = err.Error() msg = err.Error()
} }
ui.ShowFatalErrorModal(errString) a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
emptyUI() emptyUI()
<-ui.C() <-a.dispatchC
return err return err
} }
defer containerClient.Close() defer containerClient.Close()
updateUI := func() { ui.SetState(*state) } updateUI := func() {
// The state is mutable so can't be passed into another goroutine
// without cloning it first.
a.eventBus.Send(event.AppStateChangedEvent{State: state.Clone()})
}
updateUI() updateUI()
var tlsCertPath, tlsKeyPath string var tlsCertPath, tlsKeyPath string
if cfg.Sources.MediaServer.TLS != nil { if a.cfg.Sources.MediaServer.TLS != nil {
tlsCertPath = cfg.Sources.MediaServer.TLS.CertPath tlsCertPath = a.cfg.Sources.MediaServer.TLS.CertPath
tlsKeyPath = cfg.Sources.MediaServer.TLS.KeyPath tlsKeyPath = a.cfg.Sources.MediaServer.TLS.KeyPath
} }
srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{ srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{
RTMPAddr: buildNetAddr(cfg.Sources.MediaServer.RTMP), RTMPAddr: buildNetAddr(a.cfg.Sources.MediaServer.RTMP),
RTMPSAddr: buildNetAddr(cfg.Sources.MediaServer.RTMPS), RTMPSAddr: buildNetAddr(a.cfg.Sources.MediaServer.RTMPS),
Host: cfg.Sources.MediaServer.Host, Host: a.cfg.Sources.MediaServer.Host,
TLSCertPath: tlsCertPath, TLSCertPath: tlsCertPath,
TLSKeyPath: tlsKeyPath, TLSKeyPath: tlsKeyPath,
StreamKey: mediaserver.StreamKey(cfg.Sources.MediaServer.StreamKey), StreamKey: mediaserver.StreamKey(a.cfg.Sources.MediaServer.StreamKey),
ContainerClient: containerClient, ContainerClient: containerClient,
Logger: logger.With("component", "mediaserver"), Logger: a.logger.With("component", "mediaserver"),
}) })
if err != nil { if err != nil {
err = fmt.Errorf("create mediaserver: %w", err) err = fmt.Errorf("create mediaserver: %w", err)
ui.ShowFatalErrorModal(err.Error()) a.eventBus.Send(event.FatalErrorOccurredEvent{Message: err.Error()})
emptyUI() emptyUI()
<-ui.C() <-a.dispatchC
return err return err
} }
defer srv.Close() defer srv.Close()
@ -120,7 +155,7 @@ func Run(ctx context.Context, params RunParams) error {
repl := replicator.StartActor(ctx, replicator.StartActorParams{ repl := replicator.StartActor(ctx, replicator.StartActorParams{
SourceURL: srv.RTMPInternalURL(), SourceURL: srv.RTMPInternalURL(),
ContainerClient: containerClient, ContainerClient: containerClient,
Logger: logger.With("component", "replicator"), Logger: a.logger.With("component", "replicator"),
}) })
defer repl.Close() defer repl.Close()
@ -128,87 +163,45 @@ func Run(ctx context.Context, params RunParams) error {
uiUpdateT := time.NewTicker(uiUpdateInterval) uiUpdateT := time.NewTicker(uiUpdateInterval)
defer uiUpdateT.Stop() defer uiUpdateT.Stop()
startupCheckC := doStartupCheck(ctx, containerClient, ui.ShowStartupCheckModal) startMediaServerC := make(chan struct{}, 1)
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
startupErr = fmt.Errorf("startup check: %w", startupErr)
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: startupErr.Error()})
<-a.dispatchC
return startupErr
} else if ok {
startMediaServerC <- struct{}{}
}
for { for {
select { select {
case err := <-startupCheckC: case <-startMediaServerC:
if errors.Is(err, errStartupCheckUserQuit) {
return nil
} else if err != nil {
return fmt.Errorf("startup check: %w", err)
} else {
startupCheckC = nil
if err = srv.Start(ctx); err != nil { if err = srv.Start(ctx); err != nil {
return fmt.Errorf("start mediaserver: %w", err) return fmt.Errorf("start mediaserver: %w", err)
} }
eventBus.Send(event.MediaServerStartedEvent{RTMPURL: srv.RTMPURL(), RTMPSURL: srv.RTMPSURL()}) a.eventBus.Send(event.MediaServerStartedEvent{RTMPURL: srv.RTMPURL(), RTMPSURL: srv.RTMPSURL()})
} case <-a.configService.C():
case <-params.ConfigService.C():
// No-op, config updates are handled synchronously for now. // No-op, config updates are handled synchronously for now.
case cmd, ok := <-ui.C(): case cmd := <-a.dispatchC:
if !ok { if _, err := a.handleCommand(ctx, cmd, state, repl, containerClient, startMediaServerC); errors.Is(err, errExit) {
// TODO: keep UI open until all containers have closed
logger.Info("UI closed")
return nil
}
logger.Debug("Command received", "cmd", cmd.Name())
switch c := cmd.(type) {
case domain.CommandAddDestination:
newCfg := cfg
newCfg.Destinations = append(newCfg.Destinations, config.Destination{
Name: c.DestinationName,
URL: c.URL,
})
if err := params.ConfigService.SetConfig(newCfg); err != nil {
logger.Error("Config update failed", "err", err)
ui.ConfigUpdateFailed(err)
continue
}
cfg = newCfg
handleConfigUpdate(cfg, state, ui)
ui.DestinationAdded()
case domain.CommandRemoveDestination:
repl.StopDestination(c.URL) // no-op if not live
newCfg := cfg
newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool {
return dest.URL == c.URL
})
if err := params.ConfigService.SetConfig(newCfg); err != nil {
logger.Error("Config update failed", "err", err)
ui.ConfigUpdateFailed(err)
continue
}
cfg = newCfg
handleConfigUpdate(cfg, state, ui)
ui.DestinationRemoved()
case domain.CommandStartDestination:
if !state.Source.Live {
ui.ShowSourceNotLiveModal()
continue
}
repl.StartDestination(c.URL)
case domain.CommandStopDestination:
repl.StopDestination(c.URL)
case domain.CommandQuit:
return nil return nil
} else if err != nil {
return fmt.Errorf("handle command: %w", err)
} }
case <-uiUpdateT.C: case <-uiUpdateT.C:
updateUI() updateUI()
case serverState := <-srv.C(): case serverState := <-srv.C():
logger.Debug("Server state received", "state", serverState) a.logger.Debug("Server state received", "state", serverState)
applyServerState(serverState, state) applyServerState(serverState, state)
updateUI() updateUI()
case replState := <-repl.C(): case replState := <-repl.C():
logger.Debug("Replicator state received", "state", replState) a.logger.Debug("Replicator state received", "state", replState)
destErrors := applyReplicatorState(replState, state) destErrors := applyReplicatorState(replState, state)
for _, destError := range destErrors { for _, destError := range destErrors {
handleDestError(destError, repl, ui) a.eventBus.Send(event.DestinationStreamExitedEvent{Name: destError.name, Err: destError.err})
repl.StopDestination(destError.url)
} }
updateUI() updateUI()
@ -216,10 +209,100 @@ func Run(ctx context.Context, params RunParams) error {
} }
} }
// handleConfigUpdate applies the config to the app state, and updates the UI. type syncCommand struct {
func handleConfigUpdate(cfg config.Config, appState *domain.AppState, ui *terminal.UI) { event.Command
applyConfig(cfg, appState)
ui.SetState(*appState) done chan<- event.Event
}
// Dispatch dispatches a command to be executed synchronously.
func (a *App) Dispatch(cmd event.Command) event.Event {
ch := make(chan event.Event, 1)
a.dispatchC <- syncCommand{Command: cmd, done: ch}
return <-ch
}
// errExit is an error that indicates the app should exit.
var errExit = errors.New("exit")
// handleCommand handles an incoming command. It may return an Event which will
// already have been published to the event bus, but which is returned for the
// benefit of synchronous callers. The event may be nil. It may also publish
// other events to the event bus which are not returned. Currently the only
// error that may be returned is [errExit], which indicates to the main event
// loop that the app should exit.
func (a *App) handleCommand(
ctx context.Context,
cmd event.Command,
state *domain.AppState,
repl *replicator.Actor,
containerClient *container.Client,
startMediaServerC chan struct{},
) (evt event.Event, _ error) {
a.logger.Debug("Command received", "cmd", cmd.Name())
defer func() {
if evt != nil {
a.eventBus.Send(evt)
}
if c, ok := cmd.(syncCommand); ok {
c.done <- evt
}
}()
switch c := cmd.(type) {
case event.CommandAddDestination:
newCfg := a.cfg
newCfg.Destinations = append(newCfg.Destinations, config.Destination{
Name: c.DestinationName,
URL: c.URL,
})
if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Add destination failed", "err", err)
return event.AddDestinationFailedEvent{Err: err}, nil
}
a.cfg = newCfg
a.handleConfigUpdate(state)
a.eventBus.Send(event.DestinationAddedEvent{URL: c.URL})
case event.CommandRemoveDestination:
repl.StopDestination(c.URL) // no-op if not live
newCfg := a.cfg
newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool {
return dest.URL == c.URL
})
if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Remove destination failed", "err", err)
a.eventBus.Send(event.RemoveDestinationFailedEvent{Err: err})
break
}
a.cfg = newCfg
a.handleConfigUpdate(state)
a.eventBus.Send(event.DestinationRemovedEvent{URL: c.URL}) //nolint:gosimple
case event.CommandStartDestination:
if !state.Source.Live {
a.eventBus.Send(event.StartDestinationFailedEvent{})
break
}
repl.StartDestination(c.URL)
case event.CommandStopDestination:
repl.StopDestination(c.URL)
case event.CommandCloseOtherInstance:
if err := closeOtherInstances(ctx, containerClient); err != nil {
return nil, fmt.Errorf("close other instances: %w", err)
}
startMediaServerC <- struct{}{}
case event.CommandQuit:
return nil, errExit
}
return nil, nil
}
// handleConfigUpdate applies the config to the app state, and sends an AppStateChangedEvent.
func (a *App) handleConfigUpdate(appState *domain.AppState) {
applyConfig(a.cfg, appState)
a.eventBus.Send(event.AppStateChangedEvent{State: appState.Clone()})
} }
// applyServerState applies the current server state to the app state. // applyServerState applies the current server state to the app state.
@ -265,13 +348,6 @@ func applyReplicatorState(replState replicator.State, appState *domain.AppState)
return errorsToDisplay return errorsToDisplay
} }
// handleDestError displays a modal to the user, and stops the destination.
func handleDestError(destError destinationError, repl *replicator.Actor, ui *terminal.UI) {
ui.ShowDestinationErrorModal(destError.name, destError.err)
repl.StopDestination(destError.url)
}
// applyConfig applies the config to the app state. For now we only set the // applyConfig applies the config to the app state. For now we only set the
// destinations. // destinations.
func applyConfig(cfg config.Config, appState *domain.AppState) { func applyConfig(cfg config.Config, appState *domain.AppState) {
@ -301,40 +377,34 @@ func resolveDestinations(destinations []domain.Destination, inDestinations []con
return destinations[:len(inDestinations)] return destinations[:len(inDestinations)]
} }
var errStartupCheckUserQuit = errors.New("user quit startup check modal")
// doStartupCheck performs a startup check to see if there are any existing app // doStartupCheck performs a startup check to see if there are any existing app
// containers. // containers.
// //
// It returns a channel that will be closed, possibly after receiving an error. // It returns a bool if the check is clear. If the bool is false, then
// If the error is non-nil the app must not be started. If the error is // startup should be paused until the choice selected by the user is received
// [errStartupCheckUserQuit], the user voluntarily quit the startup check // via a command.
// modal. func doStartupCheck(ctx context.Context, containerClient *container.Client, eventBus *event.Bus) (bool, error) {
func doStartupCheck(ctx context.Context, containerClient *container.Client, showModal func() bool) <-chan error {
ch := make(chan error, 1)
go func() {
defer close(ch)
if exists, err := containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil { if exists, err := containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil {
ch <- fmt.Errorf("check existing containers: %w", err) return false, fmt.Errorf("check existing containers: %w", err)
} else if exists { } else if exists {
if showModal() { eventBus.Send(event.OtherInstanceDetectedEvent{})
if err = containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil {
ch <- fmt.Errorf("remove existing containers: %w", err)
return
}
if err = containerClient.RemoveUnusedNetworks(ctx); err != nil {
ch <- fmt.Errorf("remove unused networks: %w", err)
return
}
} else {
ch <- errStartupCheckUserQuit
}
}
}()
return ch return false, nil
}
return true, nil
}
func closeOtherInstances(ctx context.Context, containerClient *container.Client) error {
if err := containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil {
return fmt.Errorf("remove existing containers: %w", err)
}
if err := containerClient.RemoveUnusedNetworks(ctx); err != nil {
return fmt.Errorf("remove unused networks: %w", err)
}
return nil
} }
// buildNetAddr builds a [mediaserver.OptionalNetAddr] from the config. // buildNetAddr builds a [mediaserver.OptionalNetAddr] from the config.

View File

@ -31,10 +31,10 @@ func buildAppParams(
screen tcell.SimulationScreen, screen tcell.SimulationScreen,
screenCaptureC chan<- terminal.ScreenCapture, screenCaptureC chan<- terminal.ScreenCapture,
logger *slog.Logger, logger *slog.Logger,
) app.RunParams { ) app.Params {
t.Helper() t.Helper()
return app.RunParams{ return app.Params{
ConfigService: configService, ConfigService: configService,
DockerClient: dockerClient, DockerClient: dockerClient,
Screen: &terminal.Screen{ Screen: &terminal.Screen{
@ -150,25 +150,36 @@ func printScreen(t *testing.T, getContents func() []string, label string) {
func sendKey(t *testing.T, screen tcell.SimulationScreen, key tcell.Key, ch rune) { func sendKey(t *testing.T, screen tcell.SimulationScreen, key tcell.Key, ch rune) {
t.Helper() t.Helper()
screen.InjectKey(key, ch, tcell.ModNone) const (
time.Sleep(50 * time.Millisecond) waitTime = 50 * time.Millisecond
maxTries = 50
)
for i := 0; i < maxTries; i++ {
if err := screen.PostEvent(tcell.NewEventKey(key, ch, tcell.ModNone)); err != nil {
time.Sleep(waitTime)
} else {
return
}
}
t.Fatalf("Failed to send key event after %d tries", maxTries)
} }
func sendKeys(t *testing.T, screen tcell.SimulationScreen, keys string) { func sendKeys(t *testing.T, screen tcell.SimulationScreen, keys string) {
t.Helper() t.Helper()
screen.InjectKeyBytes([]byte(keys)) for _, ch := range keys {
time.Sleep(500 * time.Millisecond) sendKey(t, screen, tcell.KeyRune, ch)
}
} }
func sendBackspaces(t *testing.T, screen tcell.SimulationScreen, n int) { func sendBackspaces(t *testing.T, screen tcell.SimulationScreen, n int) {
t.Helper() t.Helper()
for range n { for range n {
screen.InjectKey(tcell.KeyBackspace, ' ', tcell.ModNone) sendKey(t, screen, tcell.KeyBackspace, 0)
time.Sleep(50 * time.Millisecond)
} }
time.Sleep(500 * time.Millisecond)
} }
// kickFirstRTMPConn kicks the first RTMP connection from the mediaMTX server. // kickFirstRTMPConn kicks the first RTMP connection from the mediaMTX server.

View File

@ -132,7 +132,7 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
done <- struct{}{} done <- struct{}{}
}() }()
err := app.Run(ctx, app.RunParams{ require.NoError(t, app.New(app.Params{
ConfigService: configService, ConfigService: configService,
DockerClient: dockerClient, DockerClient: dockerClient,
Screen: &terminal.Screen{ Screen: &terminal.Screen{
@ -144,8 +144,7 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
ClipboardAvailable: false, ClipboardAvailable: false,
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"}, BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
Logger: logger, Logger: logger,
}) }).Run(ctx))
require.NoError(t, err)
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -182,11 +181,11 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
// Add a second destination in-app: // Add a second destination in-app:
sendKey(t, screen, tcell.KeyRune, 'a') sendKey(t, screen, tcell.KeyRune, 'a')
sendBackspaces(t, screen, 30) sendBackspaces(t, screen, 10)
sendKeys(t, screen, "Local server 2") sendKeys(t, screen, "Local server 2")
sendKey(t, screen, tcell.KeyTab, ' ') sendKey(t, screen, tcell.KeyTab, ' ')
sendBackspaces(t, screen, 30) sendBackspaces(t, screen, 10)
sendKeys(t, screen, destURL2) sendKeys(t, screen, destURL2)
sendKey(t, screen, tcell.KeyTab, ' ') sendKey(t, screen, tcell.KeyTab, ' ')
sendKey(t, screen, tcell.KeyEnter, ' ') sendKey(t, screen, tcell.KeyEnter, ' ')
@ -320,7 +319,7 @@ func TestIntegrationCustomHost(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
time.Sleep(time.Second) time.Sleep(time.Second)
@ -391,7 +390,7 @@ func TestIntegrationCustomTLSCerts(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -472,7 +471,7 @@ func TestIntegrationRestartDestination(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -609,7 +608,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -682,7 +681,7 @@ func TestIntegrationDestinationValidations(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -824,7 +823,7 @@ func TestIntegrationStartupCheck(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -893,7 +892,7 @@ func TestIntegrationMediaServerError(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -934,7 +933,7 @@ func TestIntegrationDockerClientError(t *testing.T) {
require.EqualError( require.EqualError(
t, t,
app.Run(ctx, buildAppParams(t, configService, &dockerClient, screen, screenCaptureC, logger)), app.New(buildAppParams(t, configService, &dockerClient, screen, screenCaptureC, logger)).Run(ctx),
"create container client: network create: boom", "create container client: network create: boom",
) )
}() }()
@ -974,7 +973,7 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
err := app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)) err := app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)
require.ErrorContains(t, err, "dial tcp: lookup docker.example.com") require.ErrorContains(t, err, "dial tcp: lookup docker.example.com")
require.ErrorContains(t, err, "no such host") require.ErrorContains(t, err, "no such host")
}() }()
@ -1070,7 +1069,7 @@ func TestIntegrationCopyURLs(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)

View File

@ -1 +0,0 @@
package domain

View File

@ -7,29 +7,9 @@ import (
const defaultChannelSize = 64 const defaultChannelSize = 64
type Name string
const (
EventNameMediaServerStarted Name = "media_server_started"
)
type Event interface {
name() Name
}
// MediaServerStartedEvent is emitted when the mediaserver component starts successfully.
type MediaServerStartedEvent struct {
RTMPURL string
RTMPSURL string
}
func (e MediaServerStartedEvent) name() Name {
return "media_server_started"
}
// Bus is an event bus. // Bus is an event bus.
type Bus struct { type Bus struct {
consumers map[Name][]chan Event consumers []chan Event
mu sync.Mutex mu sync.Mutex
logger *slog.Logger logger *slog.Logger
} }
@ -37,18 +17,17 @@ type Bus struct {
// NewBus returns a new event bus. // NewBus returns a new event bus.
func NewBus(logger *slog.Logger) *Bus { func NewBus(logger *slog.Logger) *Bus {
return &Bus{ return &Bus{
consumers: make(map[Name][]chan Event),
logger: logger, logger: logger,
} }
} }
// Register registers a consumer for a given event. // Register registers a consumer for all events.
func (b *Bus) Register(name Name) <-chan Event { func (b *Bus) Register() <-chan Event {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
ch := make(chan Event, defaultChannelSize) ch := make(chan Event, defaultChannelSize)
b.consumers[name] = append(b.consumers[name], ch) b.consumers = append(b.consumers, ch)
return ch return ch
} }
@ -60,7 +39,7 @@ func (b *Bus) Send(evt Event) {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
for _, ch := range b.consumers[evt.name()] { for _, ch := range b.consumers {
select { select {
case ch <- evt: case ch <- evt:
default: default:

View File

@ -11,8 +11,8 @@ import (
func TestBus(t *testing.T) { func TestBus(t *testing.T) {
bus := event.NewBus(testhelpers.NewTestLogger(t)) bus := event.NewBus(testhelpers.NewTestLogger(t))
ch1 := bus.Register(event.EventNameMediaServerStarted) ch1 := bus.Register()
ch2 := bus.Register(event.EventNameMediaServerStarted) ch2 := bus.Register()
evt := event.MediaServerStartedEvent{ evt := event.MediaServerStartedEvent{
RTMPURL: "rtmp://rtmp.example.com/live", RTMPURL: "rtmp://rtmp.example.com/live",

View File

@ -1,4 +1,4 @@
package domain package event
// CommandAddDestination adds a destination. // CommandAddDestination adds a destination.
type CommandAddDestination struct { type CommandAddDestination struct {
@ -41,6 +41,14 @@ func (c CommandStopDestination) Name() string {
return "stop_destination" return "stop_destination"
} }
// CommandCloseOtherInstance closes the other instance of the application.
type CommandCloseOtherInstance struct{}
// Name implements the Command interface.
func (c CommandCloseOtherInstance) Name() string {
return "close_other_instance"
}
// CommandQuit quits the app. // CommandQuit quits the app.
type CommandQuit struct{} type CommandQuit struct{}

114
internal/event/events.go Normal file
View File

@ -0,0 +1,114 @@
package event
import "git.netflux.io/rob/octoplex/internal/domain"
type Name string
const (
EventNameAppStateChanged Name = "app_state_changed"
EventNameDestinationAdded Name = "destination_added"
EventNameAddDestinationFailed Name = "add_destination_failed"
EventNameDestinationStreamExited Name = "destination_stream_exited"
EventNameStartDestinationFailed Name = "start_destination_failed"
EventNameDestinationRemoved Name = "destination_removed"
EventNameRemoveDestinationFailed Name = "remove_destination_failed"
EventNameFatalErrorOccurred Name = "fatal_error_occurred"
EventNameOtherInstanceDetected Name = "other_instance_detected"
EventNameMediaServerStarted Name = "media_server_started"
)
// Event represents something which happened in the appllication.
type Event interface {
name() Name
}
// AppStateChangedEvent is emitted when the application state changes.
type AppStateChangedEvent struct {
State domain.AppState
}
func (e AppStateChangedEvent) name() Name {
return EventNameAppStateChanged
}
// DestinationAddedEvent is emitted when a destination is successfully added.
type DestinationAddedEvent struct {
URL string
}
func (e DestinationAddedEvent) name() Name {
return EventNameDestinationAdded
}
// AddDestinationFailedEvent is emitted when a destination fails to be added.
type AddDestinationFailedEvent struct {
Err error
}
func (e AddDestinationFailedEvent) name() Name {
return EventNameAddDestinationFailed
}
// DestinationStreamExitedEvent is emitted when a destination goes off-air unexpectedly.
type DestinationStreamExitedEvent struct {
Name string
Err error
}
func (e DestinationStreamExitedEvent) name() Name {
return EventNameDestinationStreamExited
}
// StartDestinationFailedEvent is emitted when a destination fails to start.
type StartDestinationFailedEvent struct{}
func (e StartDestinationFailedEvent) name() Name {
return EventNameStartDestinationFailed
}
// DestinationRemovedEvent is emitted when a destination is successfully
// removed.
type DestinationRemovedEvent struct {
URL string
}
func (e DestinationRemovedEvent) name() Name {
return EventNameDestinationRemoved
}
// RemoveDestinationFailedEvent is emitted when a destination fails to be
// removed.
type RemoveDestinationFailedEvent struct {
Err error
}
func (e RemoveDestinationFailedEvent) name() Name {
return EventNameRemoveDestinationFailed
}
// FatalErrorOccurredEvent is emitted when a fatal application
// error occurs.
type FatalErrorOccurredEvent struct {
Message string
}
// OtherInstanceDetectedEvent is emitted when the app launches and detects another instance.
type OtherInstanceDetectedEvent struct{}
func (e OtherInstanceDetectedEvent) name() Name {
return EventNameOtherInstanceDetected
}
func (e FatalErrorOccurredEvent) name() Name {
return "fatal_error_occurred"
}
// MediaServerStartedEvent is emitted when the mediaserver component starts successfully.
type MediaServerStartedEvent struct {
RTMPURL string
RTMPSURL string
}
func (e MediaServerStartedEvent) name() Name {
return "media_server_started"
}

View File

@ -42,7 +42,7 @@ const (
// UI is responsible for managing the terminal user interface. // UI is responsible for managing the terminal user interface.
type UI struct { type UI struct {
eventBus *event.Bus eventBus *event.Bus
commandC chan domain.Command dispatch func(event.Command)
clipboardAvailable bool clipboardAvailable bool
configFilePath string configFilePath string
rtmpURL, rtmpsURL string rtmpURL, rtmpsURL string
@ -96,7 +96,7 @@ type ScreenCapture struct {
// interface. // interface.
type StartParams struct { type StartParams struct {
EventBus *event.Bus EventBus *event.Bus
ChanSize int Dispatcher func(event.Command)
Logger *slog.Logger Logger *slog.Logger
ClipboardAvailable bool ClipboardAvailable bool
ConfigFilePath string ConfigFilePath string
@ -104,13 +104,8 @@ type StartParams struct {
Screen *Screen // Screen may be nil. Screen *Screen // Screen may be nil.
} }
const defaultChanSize = 64
// StartUI starts the terminal user interface. // StartUI starts the terminal user interface.
func StartUI(ctx context.Context, params StartParams) (*UI, error) { func StartUI(ctx context.Context, params StartParams) (*UI, error) {
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
commandCh := make(chan domain.Command, chanSize)
app := tview.NewApplication() app := tview.NewApplication()
var screen tcell.Screen var screen tcell.Screen
@ -213,8 +208,8 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
app.EnableMouse(false) app.EnableMouse(false)
ui := &UI{ ui := &UI{
commandC: commandCh,
eventBus: params.EventBus, eventBus: params.EventBus,
dispatch: params.Dispatcher,
clipboardAvailable: params.ClipboardAvailable, clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath, configFilePath: params.ConfigFilePath,
buildInfo: params.BuildInfo, buildInfo: params.BuildInfo,
@ -271,15 +266,13 @@ func (ui *UI) renderAboutView() {
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false) ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
} }
// C returns a channel that receives commands from the user interface.
func (ui *UI) C() <-chan domain.Command {
return ui.commandC
}
func (ui *UI) run(ctx context.Context) { func (ui *UI) run(ctx context.Context) {
defer close(ui.commandC) defer func() {
// Ensure the application is stopped when the UI is closed.
ui.dispatch(event.CommandQuit{})
}()
mediaServerStartedC := ui.eventBus.Register(event.EventNameMediaServerStarted) eventC := ui.eventBus.Register()
uiDone := make(chan struct{}) uiDone := make(chan struct{})
go func() { go func() {
@ -294,8 +287,34 @@ func (ui *UI) run(ctx context.Context) {
for { for {
select { select {
case evt := <-mediaServerStartedC: case evt := <-eventC:
ui.handleMediaServerStarted(evt.(event.MediaServerStartedEvent)) ui.app.QueueUpdateDraw(func() {
switch evt := evt.(type) {
case event.AppStateChangedEvent:
ui.handleAppStateChanged(evt)
case event.DestinationAddedEvent:
ui.handleDestinationAdded(evt)
case event.StartDestinationFailedEvent:
ui.handleStartDestinationFailed(evt)
case event.AddDestinationFailedEvent:
ui.handleDestinationEventError(evt.Err)
case event.DestinationStreamExitedEvent:
ui.handleDestinationStreamExited(evt)
case event.DestinationRemovedEvent:
ui.handleDestinationRemoved(evt)
case event.RemoveDestinationFailedEvent:
ui.handleDestinationEventError(evt.Err)
case event.OtherInstanceDetectedEvent:
ui.handleOtherInstanceDetected(evt)
case event.MediaServerStartedEvent:
ui.handleMediaServerStarted(evt)
case event.FatalErrorOccurredEvent:
ui.handleFatalErrorOccurred(evt)
default:
ui.logger.Warn("unhandled event", "event", evt)
}
})
case <-ctx.Done(): case <-ctx.Done():
return return
case <-uiDone: case <-uiDone:
@ -310,7 +329,7 @@ func (ui *UI) handleMediaServerStarted(evt event.MediaServerStartedEvent) {
ui.rtmpsURL = evt.RTMPSURL ui.rtmpsURL = evt.RTMPSURL
ui.mu.Unlock() ui.mu.Unlock()
ui.app.QueueUpdateDraw(ui.renderAboutView) ui.renderAboutView()
} }
func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey { func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
@ -381,28 +400,17 @@ func (ui *UI) fkeyHandler(key tcell.Key) {
} }
} }
func (ui *UI) ShowSourceNotLiveModal() { func (ui *UI) handleStartDestinationFailed(event.StartDestinationFailedEvent) {
ui.app.QueueUpdateDraw(func() {
ui.showModal( ui.showModal(
pageNameModalNotLive, pageNameModalStartDestinationFailed,
"Waiting for stream.\n\nStart streaming to a source URL then try again.", "Waiting for stream.\n\nStart streaming to a source URL then try again.",
[]string{"Ok"}, []string{"Ok"},
false, false,
nil, nil,
) )
})
} }
// ShowStartupCheckModal shows a modal dialog to the user, asking if they want func (ui *UI) handleOtherInstanceDetected(event.OtherInstanceDetectedEvent) {
// to kill a running instance of Octoplex.
//
// The method will block until the user has made a choice, after which the
// channel will receive true if the user wants to quit the other instance, or
// false to quit this instance.
func (ui *UI) ShowStartupCheckModal() bool {
done := make(chan bool)
ui.app.QueueUpdateDraw(func() {
ui.showModal( ui.showModal(
pageNameModalStartupCheck, pageNameModalStartupCheck,
"Another instance of Octoplex may already be running.\n\nPressing continue will close that instance. Continue?", "Another instance of Octoplex may already be running.\n\nPressing continue will close that instance. Continue?",
@ -410,50 +418,41 @@ func (ui *UI) ShowStartupCheckModal() bool {
false, false,
func(buttonIndex int, _ string) { func(buttonIndex int, _ string) {
if buttonIndex == 0 { if buttonIndex == 0 {
done <- true ui.dispatch(event.CommandCloseOtherInstance{})
} else { } else {
done <- false ui.dispatch(event.CommandQuit{})
} }
}, },
) )
})
return <-done
} }
func (ui *UI) ShowDestinationErrorModal(name string, err error) { func (ui *UI) handleDestinationStreamExited(evt event.DestinationStreamExitedEvent) {
ui.app.QueueUpdateDraw(func() {
ui.showModal( ui.showModal(
pageNameModalDestinationError, pageNameModalDestinationError,
fmt.Sprintf( fmt.Sprintf(
"Streaming to %s failed:\n\n%s", "Streaming to %s failed:\n\n%s",
cmp.Or(name, "this destination"), cmp.Or(evt.Name, "this destination"),
err, evt.Err,
), ),
[]string{"Ok"}, []string{"Ok"},
true, true,
nil, nil,
) )
})
} }
// ShowFatalErrorModal displays the provided error. It sends a CommandQuit to the func (ui *UI) handleFatalErrorOccurred(evt event.FatalErrorOccurredEvent) {
// command channel when the user selects the Quit button.
func (ui *UI) ShowFatalErrorModal(errString string) {
ui.app.QueueUpdateDraw(func() {
ui.showModal( ui.showModal(
pageNameModalFatalError, pageNameModalFatalError,
fmt.Sprintf( fmt.Sprintf(
"An error occurred:\n\n%s", "An error occurred:\n\n%s",
errString, evt.Message,
), ),
[]string{"Quit"}, []string{"Quit"},
false, false,
func(int, string) { func(int, string) {
ui.commandC <- domain.CommandQuit{} ui.dispatch(event.CommandQuit{})
}, },
) )
})
} }
func (ui *UI) afterDrawHandler(screen tcell.Screen) { func (ui *UI) afterDrawHandler(screen tcell.Screen) {
@ -484,8 +483,9 @@ func (ui *UI) captureScreen(screen tcell.Screen) {
} }
} }
// SetState sets the state of the terminal user interface. func (ui *UI) handleAppStateChanged(evt event.AppStateChangedEvent) {
func (ui *UI) SetState(state domain.AppState) { state := evt.State
if state.Source.ExitReason != "" { if state.Source.ExitReason != "" {
ui.handleMediaServerClosed(state.Source.ExitReason) ui.handleMediaServerClosed(state.Source.ExitReason)
} }
@ -500,10 +500,7 @@ func (ui *UI) SetState(state domain.AppState) {
ui.hasDestinations = len(state.Destinations) > 0 ui.hasDestinations = len(state.Destinations) > 0
ui.mu.Unlock() ui.mu.Unlock()
// The state is mutable so can't be passed into QueueUpdateDraw, which ui.redrawFromState(state)
// passes it to another goroutine, without cloning it first.
stateClone := state.Clone()
ui.app.QueueUpdateDraw(func() { ui.redrawFromState(stateClone) })
} }
func (ui *UI) updatePullProgress(state domain.AppState) { func (ui *UI) updatePullProgress(state domain.AppState) {
@ -524,9 +521,7 @@ func (ui *UI) updatePullProgress(state domain.AppState) {
} }
if len(pullingContainers) == 0 { if len(pullingContainers) == 0 {
ui.app.QueueUpdateDraw(func() {
ui.hideModal(pageNameModalPullProgress) ui.hideModal(pageNameModalPullProgress)
})
return return
} }
@ -540,7 +535,6 @@ func (ui *UI) updatePullProgress(state domain.AppState) {
} }
func (ui *UI) updateProgressModal(container domain.Container) { func (ui *UI) updateProgressModal(container domain.Container) {
ui.app.QueueUpdateDraw(func() {
modalName := string(pageNameModalPullProgress) modalName := string(pageNameModalPullProgress)
var status string var status string
@ -562,7 +556,6 @@ func (ui *UI) updateProgressModal(container domain.Container) {
} else { } else {
ui.pages.AddPage(modalName, ui.pullProgressModal, true, true) ui.pages.AddPage(modalName, ui.pullProgressModal, true, true)
} }
})
} }
// page names represent a specific page in the terminal user interface. // page names represent a specific page in the terminal user interface.
@ -583,8 +576,8 @@ const (
pageNameModalQuit = "modal-quit" pageNameModalQuit = "modal-quit"
pageNameModalRemoveDestination = "modal-remove-destination" pageNameModalRemoveDestination = "modal-remove-destination"
pageNameModalSourceError = "modal-source-error" pageNameModalSourceError = "modal-source-error"
pageNameModalStartDestinationFailed = "modal-start-destination-failed"
pageNameModalStartupCheck = "modal-startup-check" pageNameModalStartupCheck = "modal-startup-check"
pageNameModalNotLive = "modal-not-live"
) )
// modalVisible returns true if any modal, including the add destination form, // modalVisible returns true if any modal, including the add destination form,
@ -692,7 +685,6 @@ func (ui *UI) hideModal(pageName string) {
} }
func (ui *UI) handleMediaServerClosed(exitReason string) { func (ui *UI) handleMediaServerClosed(exitReason string) {
ui.app.QueueUpdateDraw(func() {
if ui.pages.HasPage(pageNameModalSourceError) { if ui.pages.HasPage(pageNameModalSourceError) {
return return
} }
@ -703,12 +695,11 @@ func (ui *UI) handleMediaServerClosed(exitReason string) {
SetBackgroundColor(tcell.ColorBlack). SetBackgroundColor(tcell.ColorBlack).
SetTextColor(tcell.ColorWhite). SetTextColor(tcell.ColorWhite).
SetDoneFunc(func(int, string) { SetDoneFunc(func(int, string) {
ui.commandC <- domain.CommandQuit{} ui.dispatch(event.CommandQuit{})
}) })
modal.SetBorderStyle(tcell.StyleDefault.Background(tcell.ColorBlack).Foreground(tcell.ColorWhite)) modal.SetBorderStyle(tcell.StyleDefault.Background(tcell.ColorBlack).Foreground(tcell.ColorWhite))
ui.pages.AddPage(pageNameModalSourceError, modal, true, true) ui.pages.AddPage(pageNameModalSourceError, modal, true, true)
})
} }
const dash = "—" const dash = "—"
@ -856,24 +847,6 @@ func (ui *UI) Close() {
ui.app.Stop() ui.app.Stop()
} }
func (ui *UI) ConfigUpdateFailed(err error) {
ui.app.QueueUpdateDraw(func() {
ui.showModal(
pageNameConfigUpdateFailed,
"Configuration update failed:\n\n"+err.Error(),
[]string{"Ok"},
false,
func(int, string) {
pageName, frontPage := ui.pages.GetFrontPage()
if pageName != pageNameAddDestination {
ui.logger.Warn("Unexpected page when configuration form closed", "page", pageName)
}
ui.app.SetFocus(frontPage)
},
)
})
}
func (ui *UI) addDestination() { func (ui *UI) addDestination() {
const ( const (
inputLen = 60 inputLen = 60
@ -893,10 +866,10 @@ func (ui *UI) addDestination() {
AddInputField(inputLabelName, "My stream", inputLen, nil, nil). AddInputField(inputLabelName, "My stream", inputLen, nil, nil).
AddInputField(inputLabelURL, "rtmp://", inputLen, nil, nil). AddInputField(inputLabelURL, "rtmp://", inputLen, nil, nil).
AddButton("Add", func() { AddButton("Add", func() {
ui.commandC <- domain.CommandAddDestination{ ui.dispatch(event.CommandAddDestination{
DestinationName: form.GetFormItemByLabel(inputLabelName).(*tview.InputField).GetText(), DestinationName: form.GetFormItemByLabel(inputLabelName).(*tview.InputField).GetText(),
URL: form.GetFormItemByLabel(inputLabelURL).(*tview.InputField).GetText(), URL: form.GetFormItemByLabel(inputLabelURL).(*tview.InputField).GetText(),
} })
}). }).
AddButton("Cancel", func() { AddButton("Cancel", func() {
ui.closeAddDestinationForm() ui.closeAddDestinationForm()
@ -951,30 +924,42 @@ func (ui *UI) removeDestination() {
false, false,
func(buttonIndex int, _ string) { func(buttonIndex int, _ string) {
if buttonIndex == 0 { if buttonIndex == 0 {
ui.commandC <- domain.CommandRemoveDestination{URL: url} ui.dispatch(event.CommandRemoveDestination{URL: url})
} }
}, },
) )
} }
// DestinationAdded should be called when a new destination is added. func (ui *UI) handleDestinationAdded(event.DestinationAddedEvent) {
func (ui *UI) DestinationAdded() {
ui.mu.Lock() ui.mu.Lock()
ui.hasDestinations = true ui.hasDestinations = true
ui.mu.Unlock() ui.mu.Unlock()
ui.app.QueueUpdateDraw(func() {
ui.pages.HidePage(pageNameNoDestinations) ui.pages.HidePage(pageNameNoDestinations)
ui.closeAddDestinationForm() ui.closeAddDestinationForm()
ui.selectLastDestination() ui.selectLastDestination()
})
} }
// DestinationRemoved should be called when a destination is removed. func (ui *UI) handleDestinationRemoved(event.DestinationRemovedEvent) {
func (ui *UI) DestinationRemoved() {
ui.selectPreviousDestination() ui.selectPreviousDestination()
} }
func (ui *UI) handleDestinationEventError(err error) {
ui.showModal(
pageNameConfigUpdateFailed,
"Configuration update failed:\n\n"+err.Error(),
[]string{"Ok"},
false,
func(int, string) {
pageName, frontPage := ui.pages.GetFrontPage()
if pageName != pageNameAddDestination {
ui.logger.Warn("Unexpected page when configuration form closed", "page", pageName)
}
ui.app.SetFocus(frontPage)
},
)
}
func (ui *UI) closeAddDestinationForm() { func (ui *UI) closeAddDestinationForm() {
var hasDestinations bool var hasDestinations bool
ui.mu.Lock() ui.mu.Lock()
@ -1015,12 +1000,12 @@ func (ui *UI) toggleDestination() {
switch ss { switch ss {
case startStateNotStarted: case startStateNotStarted:
ui.urlsToStartState[url] = startStateStarting ui.urlsToStartState[url] = startStateStarting
ui.commandC <- domain.CommandStartDestination{URL: url} ui.dispatch(event.CommandStartDestination{URL: url})
case startStateStarting: case startStateStarting:
// do nothing // do nothing
return return
case startStateStarted: case startStateStarted:
ui.commandC <- domain.CommandStopDestination{URL: url} ui.dispatch(event.CommandStopDestination{URL: url})
} }
} }
@ -1073,7 +1058,7 @@ func (ui *UI) confirmQuit() {
false, false,
func(buttonIndex int, _ string) { func(buttonIndex int, _ string) {
if buttonIndex == 0 { if buttonIndex == 0 {
ui.commandC <- domain.CommandQuit{} ui.dispatch(event.CommandQuit{})
} }
}, },
) )

View File

@ -97,9 +97,7 @@ func run(ctx context.Context) error {
return fmt.Errorf("read build info: %w", err) return fmt.Errorf("read build info: %w", err)
} }
return app.Run( app := app.New(app.Params{
ctx,
app.RunParams{
ConfigService: configService, ConfigService: configService,
DockerClient: dockerClient, DockerClient: dockerClient,
ClipboardAvailable: clipboardAvailable, ClipboardAvailable: clipboardAvailable,
@ -111,8 +109,9 @@ func run(ctx context.Context) error {
Date: date, Date: date,
}, },
Logger: logger, Logger: logger,
}, })
)
return app.Run(ctx)
} }
// editConfigFile opens the config file in the user's editor. // editConfigFile opens the config file in the user's editor.