Compare commits
20 Commits
main
...
build/wood
Author | SHA1 | Date | |
---|---|---|---|
7566c207b0 | |||
53d52b3673 | |||
3e2954f33b | |||
586a376405 | |||
750e9432be | |||
d313c1e020 | |||
812d3901d3 | |||
caa543703e | |||
8403d751b6 | |||
2f1cadcf40 | |||
1f4a931903 | |||
94623248c0 | |||
4a16780915 | |||
3019387f38 | |||
f4021a2886 | |||
b8550f050b | |||
c02a66202f | |||
4029c66a4a | |||
cdf41e47c3 | |||
d7f8fb49eb |
2
.github/workflows/build.yml
vendored
2
.github/workflows/build.yml
vendored
@ -1,4 +1,4 @@
|
||||
name: ci-build
|
||||
name: build
|
||||
run-name: Building ${{ github.ref_name }}
|
||||
on:
|
||||
push:
|
||||
|
2
.github/workflows/codeql.yml
vendored
2
.github/workflows/codeql.yml
vendored
@ -1,4 +1,4 @@
|
||||
name: ci-scan
|
||||
name: codeql
|
||||
|
||||
on:
|
||||
push:
|
||||
|
9
.woodpecker.yml
Normal file
9
.woodpecker.yml
Normal file
@ -0,0 +1,9 @@
|
||||
---
|
||||
when:
|
||||
- event: push
|
||||
|
||||
steps:
|
||||
- name: lint
|
||||
image: koalaman/shellcheck:stable
|
||||
commands:
|
||||
- shellcheck $(find . -type f -name "*.sh")
|
@ -33,11 +33,12 @@ broadcast to localhost:1935 or localhost:1936.
|
||||
|
||||
## Opening a pull request
|
||||
|
||||
Pull requests are welcome, but please propose significant changes in a
|
||||
Pull requests are welcome, please propose significant changes in a
|
||||
[discussion](https://github.com/rfwatson/octoplex/discussions) first.
|
||||
|
||||
1. Fork the repo
|
||||
2. Make your changes, including test coverage
|
||||
3. Push the changes to a branch
|
||||
4. Ensure the branch is passing
|
||||
5. Open a pull request
|
||||
1. Make your changes, including test coverage
|
||||
1. Run the formatter (`mise run format`)
|
||||
1. Push the changes to a branch
|
||||
1. Ensure the branch is passing
|
||||
1. Open a pull request
|
||||
|
16
README.md
16
README.md
@ -1,11 +1,12 @@
|
||||
# Octoplex :octopus:
|
||||
|
||||

|
||||

|
||||
[](https://github.com/rfwatson/octoplex/actions/workflows/build.yml)
|
||||
[](https://github.com/rfwatson/octoplex/actions/workflows/codeql.yml)
|
||||

|
||||
[](https://goreportcard.com/report/git.netflux.io/rob/octoplex)
|
||||
[](https://www.gnu.org/licenses/agpl-3.0)
|
||||
|
||||
Octoplex is a live video restreamer for the terminal.
|
||||
Octoplex is a Docker-native live video restreamer.
|
||||
|
||||
* Restream RTMP/RTMPS to unlimited destinations
|
||||
* Broadcast using OBS or any standard tool
|
||||
@ -38,8 +39,7 @@ Octoplex is a live video restreamer for the terminal.
|
||||
|
||||
### Docker Engine
|
||||
|
||||
First, make sure Docker Engine is installed. Octoplex uses Docker to manage
|
||||
FFmpeg and other streaming tools.
|
||||
First, ensure that Docker Engine is installed.
|
||||
|
||||
Linux: See https://docs.docker.com/engine/install/.
|
||||
|
||||
@ -160,14 +160,12 @@ localhost (`127.0.0.1`) or use `0.0.0.0` to bind to all network interfaces.
|
||||
|
||||
## Contributing
|
||||
|
||||
See [CONTRIBUTING.md](/CONTRIBUTING.md).
|
||||
|
||||
### Bug reports
|
||||
|
||||
Open bug reports [on GitHub](https://github.com/rfwatson/octoplex/issues/new).
|
||||
|
||||
### Pull requests
|
||||
|
||||
Pull requests are welcome.
|
||||
|
||||
## Acknowledgements
|
||||
|
||||
Octoplex is built on and/or makes use of other free and open source software,
|
||||
|
@ -1,6 +1,7 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
@ -18,10 +19,25 @@ import (
|
||||
"github.com/docker/docker/client"
|
||||
)
|
||||
|
||||
// RunParams holds the parameters for running the application.
|
||||
type RunParams struct {
|
||||
// App is an instance of the app.
|
||||
type App struct {
|
||||
cfg config.Config
|
||||
configService *config.Service
|
||||
eventBus *event.Bus
|
||||
dispatchC chan event.Command
|
||||
dockerClient container.DockerClient
|
||||
screen *terminal.Screen // Screen may be nil.
|
||||
clipboardAvailable bool
|
||||
configFilePath string
|
||||
buildInfo domain.BuildInfo
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// Params holds the parameters for running the application.
|
||||
type Params struct {
|
||||
ConfigService *config.Service
|
||||
DockerClient container.DockerClient
|
||||
ChanSize int
|
||||
Screen *terminal.Screen // Screen may be nil.
|
||||
ClipboardAvailable bool
|
||||
ConfigFilePath string
|
||||
@ -29,31 +45,44 @@ type RunParams struct {
|
||||
Logger *slog.Logger
|
||||
}
|
||||
|
||||
// defaultChanSize is the default size of the dispatch channel.
|
||||
const defaultChanSize = 64
|
||||
|
||||
// New creates a new application instance.
|
||||
func New(params Params) *App {
|
||||
return &App{
|
||||
cfg: params.ConfigService.Current(),
|
||||
configService: params.ConfigService,
|
||||
eventBus: event.NewBus(params.Logger.With("component", "event_bus")),
|
||||
dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)),
|
||||
dockerClient: params.DockerClient,
|
||||
screen: params.Screen,
|
||||
clipboardAvailable: params.ClipboardAvailable,
|
||||
configFilePath: params.ConfigFilePath,
|
||||
buildInfo: params.BuildInfo,
|
||||
logger: params.Logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts the application, and blocks until it exits.
|
||||
func Run(ctx context.Context, params RunParams) error {
|
||||
logger := params.Logger
|
||||
eventBus := event.NewBus(logger.With("component", "event_bus"))
|
||||
|
||||
// cfg is the current configuration of the application, as reflected in the
|
||||
// config file.
|
||||
cfg := params.ConfigService.Current()
|
||||
|
||||
func (a *App) Run(ctx context.Context) error {
|
||||
// state is the current state of the application, as reflected in the UI.
|
||||
state := new(domain.AppState)
|
||||
applyConfig(cfg, state)
|
||||
applyConfig(a.cfg, state)
|
||||
|
||||
// Ensure there is at least one active source.
|
||||
if !cfg.Sources.MediaServer.RTMP.Enabled && !cfg.Sources.MediaServer.RTMPS.Enabled {
|
||||
if !a.cfg.Sources.MediaServer.RTMP.Enabled && !a.cfg.Sources.MediaServer.RTMPS.Enabled {
|
||||
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
|
||||
}
|
||||
|
||||
ui, err := terminal.StartUI(ctx, terminal.StartParams{
|
||||
EventBus: eventBus,
|
||||
Screen: params.Screen,
|
||||
ClipboardAvailable: params.ClipboardAvailable,
|
||||
ConfigFilePath: params.ConfigFilePath,
|
||||
BuildInfo: params.BuildInfo,
|
||||
Logger: logger.With("component", "ui"),
|
||||
EventBus: a.eventBus,
|
||||
Dispatcher: func(cmd event.Command) { a.dispatchC <- cmd },
|
||||
Screen: a.screen,
|
||||
ClipboardAvailable: a.clipboardAvailable,
|
||||
ConfigFilePath: a.configFilePath,
|
||||
BuildInfo: a.buildInfo,
|
||||
Logger: a.logger.With("component", "ui"),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("start terminal user interface: %w", err)
|
||||
@ -69,50 +98,56 @@ func Run(ctx context.Context, params RunParams) error {
|
||||
// It is only needed for integration tests when rendering modals before the
|
||||
// main loop starts. It would be nice to remove this but the risk/impact on
|
||||
// non-test code is pretty low.
|
||||
emptyUI := func() { ui.SetState(domain.AppState{}) }
|
||||
emptyUI := func() {
|
||||
a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}})
|
||||
}
|
||||
|
||||
containerClient, err := container.NewClient(ctx, params.DockerClient, logger.With("component", "container_client"))
|
||||
containerClient, err := container.NewClient(ctx, a.dockerClient, a.logger.With("component", "container_client"))
|
||||
if err != nil {
|
||||
err = fmt.Errorf("create container client: %w", err)
|
||||
|
||||
var errString string
|
||||
var msg string
|
||||
if client.IsErrConnectionFailed(err) {
|
||||
errString = "Could not connect to Docker. Is Docker installed and running?"
|
||||
msg = "Could not connect to Docker. Is Docker installed and running?"
|
||||
} else {
|
||||
errString = err.Error()
|
||||
msg = err.Error()
|
||||
}
|
||||
ui.ShowFatalErrorModal(errString)
|
||||
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
|
||||
|
||||
emptyUI()
|
||||
<-ui.C()
|
||||
<-a.dispatchC
|
||||
return err
|
||||
}
|
||||
defer containerClient.Close()
|
||||
|
||||
updateUI := func() { ui.SetState(*state) }
|
||||
updateUI := func() {
|
||||
// The state is mutable so can't be passed into another goroutine
|
||||
// without cloning it first.
|
||||
a.eventBus.Send(event.AppStateChangedEvent{State: state.Clone()})
|
||||
}
|
||||
updateUI()
|
||||
|
||||
var tlsCertPath, tlsKeyPath string
|
||||
if cfg.Sources.MediaServer.TLS != nil {
|
||||
tlsCertPath = cfg.Sources.MediaServer.TLS.CertPath
|
||||
tlsKeyPath = cfg.Sources.MediaServer.TLS.KeyPath
|
||||
if a.cfg.Sources.MediaServer.TLS != nil {
|
||||
tlsCertPath = a.cfg.Sources.MediaServer.TLS.CertPath
|
||||
tlsKeyPath = a.cfg.Sources.MediaServer.TLS.KeyPath
|
||||
}
|
||||
|
||||
srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{
|
||||
RTMPAddr: buildNetAddr(cfg.Sources.MediaServer.RTMP),
|
||||
RTMPSAddr: buildNetAddr(cfg.Sources.MediaServer.RTMPS),
|
||||
Host: cfg.Sources.MediaServer.Host,
|
||||
RTMPAddr: buildNetAddr(a.cfg.Sources.MediaServer.RTMP),
|
||||
RTMPSAddr: buildNetAddr(a.cfg.Sources.MediaServer.RTMPS),
|
||||
Host: a.cfg.Sources.MediaServer.Host,
|
||||
TLSCertPath: tlsCertPath,
|
||||
TLSKeyPath: tlsKeyPath,
|
||||
StreamKey: mediaserver.StreamKey(cfg.Sources.MediaServer.StreamKey),
|
||||
StreamKey: mediaserver.StreamKey(a.cfg.Sources.MediaServer.StreamKey),
|
||||
ContainerClient: containerClient,
|
||||
Logger: logger.With("component", "mediaserver"),
|
||||
Logger: a.logger.With("component", "mediaserver"),
|
||||
})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("create mediaserver: %w", err)
|
||||
ui.ShowFatalErrorModal(err.Error())
|
||||
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: err.Error()})
|
||||
emptyUI()
|
||||
<-ui.C()
|
||||
<-a.dispatchC
|
||||
return err
|
||||
}
|
||||
defer srv.Close()
|
||||
@ -120,7 +155,7 @@ func Run(ctx context.Context, params RunParams) error {
|
||||
repl := replicator.StartActor(ctx, replicator.StartActorParams{
|
||||
SourceURL: srv.RTMPInternalURL(),
|
||||
ContainerClient: containerClient,
|
||||
Logger: logger.With("component", "replicator"),
|
||||
Logger: a.logger.With("component", "replicator"),
|
||||
})
|
||||
defer repl.Close()
|
||||
|
||||
@ -128,87 +163,45 @@ func Run(ctx context.Context, params RunParams) error {
|
||||
uiUpdateT := time.NewTicker(uiUpdateInterval)
|
||||
defer uiUpdateT.Stop()
|
||||
|
||||
startupCheckC := doStartupCheck(ctx, containerClient, ui.ShowStartupCheckModal)
|
||||
startMediaServerC := make(chan struct{}, 1)
|
||||
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
|
||||
startupErr = fmt.Errorf("startup check: %w", startupErr)
|
||||
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: startupErr.Error()})
|
||||
<-a.dispatchC
|
||||
return startupErr
|
||||
} else if ok {
|
||||
startMediaServerC <- struct{}{}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case err := <-startupCheckC:
|
||||
if errors.Is(err, errStartupCheckUserQuit) {
|
||||
case <-startMediaServerC:
|
||||
if err = srv.Start(ctx); err != nil {
|
||||
return fmt.Errorf("start mediaserver: %w", err)
|
||||
}
|
||||
|
||||
a.eventBus.Send(event.MediaServerStartedEvent{RTMPURL: srv.RTMPURL(), RTMPSURL: srv.RTMPSURL()})
|
||||
case <-a.configService.C():
|
||||
// No-op, config updates are handled synchronously for now.
|
||||
case cmd := <-a.dispatchC:
|
||||
if _, err := a.handleCommand(ctx, cmd, state, repl, containerClient, startMediaServerC); errors.Is(err, errExit) {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("startup check: %w", err)
|
||||
} else {
|
||||
startupCheckC = nil
|
||||
|
||||
if err = srv.Start(ctx); err != nil {
|
||||
return fmt.Errorf("start mediaserver: %w", err)
|
||||
}
|
||||
|
||||
eventBus.Send(event.MediaServerStartedEvent{RTMPURL: srv.RTMPURL(), RTMPSURL: srv.RTMPSURL()})
|
||||
}
|
||||
case <-params.ConfigService.C():
|
||||
// No-op, config updates are handled synchronously for now.
|
||||
case cmd, ok := <-ui.C():
|
||||
if !ok {
|
||||
// TODO: keep UI open until all containers have closed
|
||||
logger.Info("UI closed")
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Debug("Command received", "cmd", cmd.Name())
|
||||
switch c := cmd.(type) {
|
||||
case domain.CommandAddDestination:
|
||||
newCfg := cfg
|
||||
newCfg.Destinations = append(newCfg.Destinations, config.Destination{
|
||||
Name: c.DestinationName,
|
||||
URL: c.URL,
|
||||
})
|
||||
if err := params.ConfigService.SetConfig(newCfg); err != nil {
|
||||
logger.Error("Config update failed", "err", err)
|
||||
ui.ConfigUpdateFailed(err)
|
||||
continue
|
||||
}
|
||||
cfg = newCfg
|
||||
handleConfigUpdate(cfg, state, ui)
|
||||
ui.DestinationAdded()
|
||||
case domain.CommandRemoveDestination:
|
||||
repl.StopDestination(c.URL) // no-op if not live
|
||||
newCfg := cfg
|
||||
newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool {
|
||||
return dest.URL == c.URL
|
||||
})
|
||||
if err := params.ConfigService.SetConfig(newCfg); err != nil {
|
||||
logger.Error("Config update failed", "err", err)
|
||||
ui.ConfigUpdateFailed(err)
|
||||
continue
|
||||
}
|
||||
cfg = newCfg
|
||||
handleConfigUpdate(cfg, state, ui)
|
||||
ui.DestinationRemoved()
|
||||
case domain.CommandStartDestination:
|
||||
if !state.Source.Live {
|
||||
ui.ShowSourceNotLiveModal()
|
||||
continue
|
||||
}
|
||||
|
||||
repl.StartDestination(c.URL)
|
||||
case domain.CommandStopDestination:
|
||||
repl.StopDestination(c.URL)
|
||||
case domain.CommandQuit:
|
||||
return nil
|
||||
return fmt.Errorf("handle command: %w", err)
|
||||
}
|
||||
case <-uiUpdateT.C:
|
||||
updateUI()
|
||||
case serverState := <-srv.C():
|
||||
logger.Debug("Server state received", "state", serverState)
|
||||
a.logger.Debug("Server state received", "state", serverState)
|
||||
applyServerState(serverState, state)
|
||||
updateUI()
|
||||
case replState := <-repl.C():
|
||||
logger.Debug("Replicator state received", "state", replState)
|
||||
a.logger.Debug("Replicator state received", "state", replState)
|
||||
destErrors := applyReplicatorState(replState, state)
|
||||
|
||||
for _, destError := range destErrors {
|
||||
handleDestError(destError, repl, ui)
|
||||
a.eventBus.Send(event.DestinationStreamExitedEvent{Name: destError.name, Err: destError.err})
|
||||
repl.StopDestination(destError.url)
|
||||
}
|
||||
|
||||
updateUI()
|
||||
@ -216,10 +209,100 @@ func Run(ctx context.Context, params RunParams) error {
|
||||
}
|
||||
}
|
||||
|
||||
// handleConfigUpdate applies the config to the app state, and updates the UI.
|
||||
func handleConfigUpdate(cfg config.Config, appState *domain.AppState, ui *terminal.UI) {
|
||||
applyConfig(cfg, appState)
|
||||
ui.SetState(*appState)
|
||||
type syncCommand struct {
|
||||
event.Command
|
||||
|
||||
done chan<- event.Event
|
||||
}
|
||||
|
||||
// Dispatch dispatches a command to be executed synchronously.
|
||||
func (a *App) Dispatch(cmd event.Command) event.Event {
|
||||
ch := make(chan event.Event, 1)
|
||||
a.dispatchC <- syncCommand{Command: cmd, done: ch}
|
||||
return <-ch
|
||||
}
|
||||
|
||||
// errExit is an error that indicates the app should exit.
|
||||
var errExit = errors.New("exit")
|
||||
|
||||
// handleCommand handles an incoming command. It may return an Event which will
|
||||
// already have been published to the event bus, but which is returned for the
|
||||
// benefit of synchronous callers. The event may be nil. It may also publish
|
||||
// other events to the event bus which are not returned. Currently the only
|
||||
// error that may be returned is [errExit], which indicates to the main event
|
||||
// loop that the app should exit.
|
||||
func (a *App) handleCommand(
|
||||
ctx context.Context,
|
||||
cmd event.Command,
|
||||
state *domain.AppState,
|
||||
repl *replicator.Actor,
|
||||
containerClient *container.Client,
|
||||
startMediaServerC chan struct{},
|
||||
) (evt event.Event, _ error) {
|
||||
a.logger.Debug("Command received", "cmd", cmd.Name())
|
||||
defer func() {
|
||||
if evt != nil {
|
||||
a.eventBus.Send(evt)
|
||||
}
|
||||
if c, ok := cmd.(syncCommand); ok {
|
||||
c.done <- evt
|
||||
}
|
||||
}()
|
||||
|
||||
switch c := cmd.(type) {
|
||||
case event.CommandAddDestination:
|
||||
newCfg := a.cfg
|
||||
newCfg.Destinations = append(newCfg.Destinations, config.Destination{
|
||||
Name: c.DestinationName,
|
||||
URL: c.URL,
|
||||
})
|
||||
if err := a.configService.SetConfig(newCfg); err != nil {
|
||||
a.logger.Error("Add destination failed", "err", err)
|
||||
return event.AddDestinationFailedEvent{Err: err}, nil
|
||||
}
|
||||
a.cfg = newCfg
|
||||
a.handleConfigUpdate(state)
|
||||
a.eventBus.Send(event.DestinationAddedEvent{URL: c.URL})
|
||||
case event.CommandRemoveDestination:
|
||||
repl.StopDestination(c.URL) // no-op if not live
|
||||
newCfg := a.cfg
|
||||
newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool {
|
||||
return dest.URL == c.URL
|
||||
})
|
||||
if err := a.configService.SetConfig(newCfg); err != nil {
|
||||
a.logger.Error("Remove destination failed", "err", err)
|
||||
a.eventBus.Send(event.RemoveDestinationFailedEvent{Err: err})
|
||||
break
|
||||
}
|
||||
a.cfg = newCfg
|
||||
a.handleConfigUpdate(state)
|
||||
a.eventBus.Send(event.DestinationRemovedEvent{URL: c.URL}) //nolint:gosimple
|
||||
case event.CommandStartDestination:
|
||||
if !state.Source.Live {
|
||||
a.eventBus.Send(event.StartDestinationFailedEvent{})
|
||||
break
|
||||
}
|
||||
|
||||
repl.StartDestination(c.URL)
|
||||
case event.CommandStopDestination:
|
||||
repl.StopDestination(c.URL)
|
||||
case event.CommandCloseOtherInstance:
|
||||
if err := closeOtherInstances(ctx, containerClient); err != nil {
|
||||
return nil, fmt.Errorf("close other instances: %w", err)
|
||||
}
|
||||
|
||||
startMediaServerC <- struct{}{}
|
||||
case event.CommandQuit:
|
||||
return nil, errExit
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// handleConfigUpdate applies the config to the app state, and sends an AppStateChangedEvent.
|
||||
func (a *App) handleConfigUpdate(appState *domain.AppState) {
|
||||
applyConfig(a.cfg, appState)
|
||||
a.eventBus.Send(event.AppStateChangedEvent{State: appState.Clone()})
|
||||
}
|
||||
|
||||
// applyServerState applies the current server state to the app state.
|
||||
@ -265,13 +348,6 @@ func applyReplicatorState(replState replicator.State, appState *domain.AppState)
|
||||
return errorsToDisplay
|
||||
}
|
||||
|
||||
// handleDestError displays a modal to the user, and stops the destination.
|
||||
func handleDestError(destError destinationError, repl *replicator.Actor, ui *terminal.UI) {
|
||||
ui.ShowDestinationErrorModal(destError.name, destError.err)
|
||||
|
||||
repl.StopDestination(destError.url)
|
||||
}
|
||||
|
||||
// applyConfig applies the config to the app state. For now we only set the
|
||||
// destinations.
|
||||
func applyConfig(cfg config.Config, appState *domain.AppState) {
|
||||
@ -301,40 +377,34 @@ func resolveDestinations(destinations []domain.Destination, inDestinations []con
|
||||
return destinations[:len(inDestinations)]
|
||||
}
|
||||
|
||||
var errStartupCheckUserQuit = errors.New("user quit startup check modal")
|
||||
|
||||
// doStartupCheck performs a startup check to see if there are any existing app
|
||||
// containers.
|
||||
//
|
||||
// It returns a channel that will be closed, possibly after receiving an error.
|
||||
// If the error is non-nil the app must not be started. If the error is
|
||||
// [errStartupCheckUserQuit], the user voluntarily quit the startup check
|
||||
// modal.
|
||||
func doStartupCheck(ctx context.Context, containerClient *container.Client, showModal func() bool) <-chan error {
|
||||
ch := make(chan error, 1)
|
||||
// It returns a bool if the check is clear. If the bool is false, then
|
||||
// startup should be paused until the choice selected by the user is received
|
||||
// via a command.
|
||||
func doStartupCheck(ctx context.Context, containerClient *container.Client, eventBus *event.Bus) (bool, error) {
|
||||
if exists, err := containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil {
|
||||
return false, fmt.Errorf("check existing containers: %w", err)
|
||||
} else if exists {
|
||||
eventBus.Send(event.OtherInstanceDetectedEvent{})
|
||||
|
||||
go func() {
|
||||
defer close(ch)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if exists, err := containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil {
|
||||
ch <- fmt.Errorf("check existing containers: %w", err)
|
||||
} else if exists {
|
||||
if showModal() {
|
||||
if err = containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil {
|
||||
ch <- fmt.Errorf("remove existing containers: %w", err)
|
||||
return
|
||||
}
|
||||
if err = containerClient.RemoveUnusedNetworks(ctx); err != nil {
|
||||
ch <- fmt.Errorf("remove unused networks: %w", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
ch <- errStartupCheckUserQuit
|
||||
}
|
||||
}
|
||||
}()
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return ch
|
||||
func closeOtherInstances(ctx context.Context, containerClient *container.Client) error {
|
||||
if err := containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil {
|
||||
return fmt.Errorf("remove existing containers: %w", err)
|
||||
}
|
||||
|
||||
if err := containerClient.RemoveUnusedNetworks(ctx); err != nil {
|
||||
return fmt.Errorf("remove unused networks: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// buildNetAddr builds a [mediaserver.OptionalNetAddr] from the config.
|
||||
|
@ -31,10 +31,10 @@ func buildAppParams(
|
||||
screen tcell.SimulationScreen,
|
||||
screenCaptureC chan<- terminal.ScreenCapture,
|
||||
logger *slog.Logger,
|
||||
) app.RunParams {
|
||||
) app.Params {
|
||||
t.Helper()
|
||||
|
||||
return app.RunParams{
|
||||
return app.Params{
|
||||
ConfigService: configService,
|
||||
DockerClient: dockerClient,
|
||||
Screen: &terminal.Screen{
|
||||
@ -150,25 +150,36 @@ func printScreen(t *testing.T, getContents func() []string, label string) {
|
||||
func sendKey(t *testing.T, screen tcell.SimulationScreen, key tcell.Key, ch rune) {
|
||||
t.Helper()
|
||||
|
||||
screen.InjectKey(key, ch, tcell.ModNone)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
const (
|
||||
waitTime = 50 * time.Millisecond
|
||||
maxTries = 50
|
||||
)
|
||||
|
||||
for i := 0; i < maxTries; i++ {
|
||||
if err := screen.PostEvent(tcell.NewEventKey(key, ch, tcell.ModNone)); err != nil {
|
||||
time.Sleep(waitTime)
|
||||
} else {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
t.Fatalf("Failed to send key event after %d tries", maxTries)
|
||||
}
|
||||
|
||||
func sendKeys(t *testing.T, screen tcell.SimulationScreen, keys string) {
|
||||
t.Helper()
|
||||
|
||||
screen.InjectKeyBytes([]byte(keys))
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
for _, ch := range keys {
|
||||
sendKey(t, screen, tcell.KeyRune, ch)
|
||||
}
|
||||
}
|
||||
|
||||
func sendBackspaces(t *testing.T, screen tcell.SimulationScreen, n int) {
|
||||
t.Helper()
|
||||
|
||||
for range n {
|
||||
screen.InjectKey(tcell.KeyBackspace, ' ', tcell.ModNone)
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
sendKey(t, screen, tcell.KeyBackspace, 0)
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
|
||||
// kickFirstRTMPConn kicks the first RTMP connection from the mediaMTX server.
|
||||
|
@ -132,7 +132,7 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
err := app.Run(ctx, app.RunParams{
|
||||
require.NoError(t, app.New(app.Params{
|
||||
ConfigService: configService,
|
||||
DockerClient: dockerClient,
|
||||
Screen: &terminal.Screen{
|
||||
@ -144,8 +144,7 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
|
||||
ClipboardAvailable: false,
|
||||
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
|
||||
Logger: logger,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}).Run(ctx))
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
@ -182,11 +181,11 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
|
||||
// Add a second destination in-app:
|
||||
sendKey(t, screen, tcell.KeyRune, 'a')
|
||||
|
||||
sendBackspaces(t, screen, 30)
|
||||
sendBackspaces(t, screen, 10)
|
||||
sendKeys(t, screen, "Local server 2")
|
||||
sendKey(t, screen, tcell.KeyTab, ' ')
|
||||
|
||||
sendBackspaces(t, screen, 30)
|
||||
sendBackspaces(t, screen, 10)
|
||||
sendKeys(t, screen, destURL2)
|
||||
sendKey(t, screen, tcell.KeyTab, ' ')
|
||||
sendKey(t, screen, tcell.KeyEnter, ' ')
|
||||
@ -320,7 +319,7 @@ func TestIntegrationCustomHost(t *testing.T) {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
time.Sleep(time.Second)
|
||||
@ -391,7 +390,7 @@ func TestIntegrationCustomTLSCerts(t *testing.T) {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
@ -472,7 +471,7 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
@ -609,7 +608,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
@ -682,7 +681,7 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
@ -824,7 +823,7 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
@ -893,7 +892,7 @@ func TestIntegrationMediaServerError(t *testing.T) {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
@ -934,7 +933,7 @@ func TestIntegrationDockerClientError(t *testing.T) {
|
||||
|
||||
require.EqualError(
|
||||
t,
|
||||
app.Run(ctx, buildAppParams(t, configService, &dockerClient, screen, screenCaptureC, logger)),
|
||||
app.New(buildAppParams(t, configService, &dockerClient, screen, screenCaptureC, logger)).Run(ctx),
|
||||
"create container client: network create: boom",
|
||||
)
|
||||
}()
|
||||
@ -974,7 +973,7 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
err := app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))
|
||||
err := app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)
|
||||
require.ErrorContains(t, err, "dial tcp: lookup docker.example.com")
|
||||
require.ErrorContains(t, err, "no such host")
|
||||
}()
|
||||
@ -1070,7 +1069,7 @@ func TestIntegrationCopyURLs(t *testing.T) {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)))
|
||||
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
|
@ -1 +0,0 @@
|
||||
package domain
|
@ -7,29 +7,9 @@ import (
|
||||
|
||||
const defaultChannelSize = 64
|
||||
|
||||
type Name string
|
||||
|
||||
const (
|
||||
EventNameMediaServerStarted Name = "media_server_started"
|
||||
)
|
||||
|
||||
type Event interface {
|
||||
name() Name
|
||||
}
|
||||
|
||||
// MediaServerStartedEvent is emitted when the mediaserver component starts successfully.
|
||||
type MediaServerStartedEvent struct {
|
||||
RTMPURL string
|
||||
RTMPSURL string
|
||||
}
|
||||
|
||||
func (e MediaServerStartedEvent) name() Name {
|
||||
return "media_server_started"
|
||||
}
|
||||
|
||||
// Bus is an event bus.
|
||||
type Bus struct {
|
||||
consumers map[Name][]chan Event
|
||||
consumers []chan Event
|
||||
mu sync.Mutex
|
||||
logger *slog.Logger
|
||||
}
|
||||
@ -37,18 +17,17 @@ type Bus struct {
|
||||
// NewBus returns a new event bus.
|
||||
func NewBus(logger *slog.Logger) *Bus {
|
||||
return &Bus{
|
||||
consumers: make(map[Name][]chan Event),
|
||||
logger: logger,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Register registers a consumer for a given event.
|
||||
func (b *Bus) Register(name Name) <-chan Event {
|
||||
// Register registers a consumer for all events.
|
||||
func (b *Bus) Register() <-chan Event {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
ch := make(chan Event, defaultChannelSize)
|
||||
b.consumers[name] = append(b.consumers[name], ch)
|
||||
b.consumers = append(b.consumers, ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
@ -60,7 +39,7 @@ func (b *Bus) Send(evt Event) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
for _, ch := range b.consumers[evt.name()] {
|
||||
for _, ch := range b.consumers {
|
||||
select {
|
||||
case ch <- evt:
|
||||
default:
|
||||
|
@ -11,8 +11,8 @@ import (
|
||||
func TestBus(t *testing.T) {
|
||||
bus := event.NewBus(testhelpers.NewTestLogger(t))
|
||||
|
||||
ch1 := bus.Register(event.EventNameMediaServerStarted)
|
||||
ch2 := bus.Register(event.EventNameMediaServerStarted)
|
||||
ch1 := bus.Register()
|
||||
ch2 := bus.Register()
|
||||
|
||||
evt := event.MediaServerStartedEvent{
|
||||
RTMPURL: "rtmp://rtmp.example.com/live",
|
||||
|
@ -1,4 +1,4 @@
|
||||
package domain
|
||||
package event
|
||||
|
||||
// CommandAddDestination adds a destination.
|
||||
type CommandAddDestination struct {
|
||||
@ -41,6 +41,14 @@ func (c CommandStopDestination) Name() string {
|
||||
return "stop_destination"
|
||||
}
|
||||
|
||||
// CommandCloseOtherInstance closes the other instance of the application.
|
||||
type CommandCloseOtherInstance struct{}
|
||||
|
||||
// Name implements the Command interface.
|
||||
func (c CommandCloseOtherInstance) Name() string {
|
||||
return "close_other_instance"
|
||||
}
|
||||
|
||||
// CommandQuit quits the app.
|
||||
type CommandQuit struct{}
|
||||
|
114
internal/event/events.go
Normal file
114
internal/event/events.go
Normal file
@ -0,0 +1,114 @@
|
||||
package event
|
||||
|
||||
import "git.netflux.io/rob/octoplex/internal/domain"
|
||||
|
||||
type Name string
|
||||
|
||||
const (
|
||||
EventNameAppStateChanged Name = "app_state_changed"
|
||||
EventNameDestinationAdded Name = "destination_added"
|
||||
EventNameAddDestinationFailed Name = "add_destination_failed"
|
||||
EventNameDestinationStreamExited Name = "destination_stream_exited"
|
||||
EventNameStartDestinationFailed Name = "start_destination_failed"
|
||||
EventNameDestinationRemoved Name = "destination_removed"
|
||||
EventNameRemoveDestinationFailed Name = "remove_destination_failed"
|
||||
EventNameFatalErrorOccurred Name = "fatal_error_occurred"
|
||||
EventNameOtherInstanceDetected Name = "other_instance_detected"
|
||||
EventNameMediaServerStarted Name = "media_server_started"
|
||||
)
|
||||
|
||||
// Event represents something which happened in the appllication.
|
||||
type Event interface {
|
||||
name() Name
|
||||
}
|
||||
|
||||
// AppStateChangedEvent is emitted when the application state changes.
|
||||
type AppStateChangedEvent struct {
|
||||
State domain.AppState
|
||||
}
|
||||
|
||||
func (e AppStateChangedEvent) name() Name {
|
||||
return EventNameAppStateChanged
|
||||
}
|
||||
|
||||
// DestinationAddedEvent is emitted when a destination is successfully added.
|
||||
type DestinationAddedEvent struct {
|
||||
URL string
|
||||
}
|
||||
|
||||
func (e DestinationAddedEvent) name() Name {
|
||||
return EventNameDestinationAdded
|
||||
}
|
||||
|
||||
// AddDestinationFailedEvent is emitted when a destination fails to be added.
|
||||
type AddDestinationFailedEvent struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
func (e AddDestinationFailedEvent) name() Name {
|
||||
return EventNameAddDestinationFailed
|
||||
}
|
||||
|
||||
// DestinationStreamExitedEvent is emitted when a destination goes off-air unexpectedly.
|
||||
type DestinationStreamExitedEvent struct {
|
||||
Name string
|
||||
Err error
|
||||
}
|
||||
|
||||
func (e DestinationStreamExitedEvent) name() Name {
|
||||
return EventNameDestinationStreamExited
|
||||
}
|
||||
|
||||
// StartDestinationFailedEvent is emitted when a destination fails to start.
|
||||
type StartDestinationFailedEvent struct{}
|
||||
|
||||
func (e StartDestinationFailedEvent) name() Name {
|
||||
return EventNameStartDestinationFailed
|
||||
}
|
||||
|
||||
// DestinationRemovedEvent is emitted when a destination is successfully
|
||||
// removed.
|
||||
type DestinationRemovedEvent struct {
|
||||
URL string
|
||||
}
|
||||
|
||||
func (e DestinationRemovedEvent) name() Name {
|
||||
return EventNameDestinationRemoved
|
||||
}
|
||||
|
||||
// RemoveDestinationFailedEvent is emitted when a destination fails to be
|
||||
// removed.
|
||||
type RemoveDestinationFailedEvent struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
func (e RemoveDestinationFailedEvent) name() Name {
|
||||
return EventNameRemoveDestinationFailed
|
||||
}
|
||||
|
||||
// FatalErrorOccurredEvent is emitted when a fatal application
|
||||
// error occurs.
|
||||
type FatalErrorOccurredEvent struct {
|
||||
Message string
|
||||
}
|
||||
|
||||
// OtherInstanceDetectedEvent is emitted when the app launches and detects another instance.
|
||||
type OtherInstanceDetectedEvent struct{}
|
||||
|
||||
func (e OtherInstanceDetectedEvent) name() Name {
|
||||
return EventNameOtherInstanceDetected
|
||||
}
|
||||
|
||||
func (e FatalErrorOccurredEvent) name() Name {
|
||||
return "fatal_error_occurred"
|
||||
}
|
||||
|
||||
// MediaServerStartedEvent is emitted when the mediaserver component starts successfully.
|
||||
type MediaServerStartedEvent struct {
|
||||
RTMPURL string
|
||||
RTMPSURL string
|
||||
}
|
||||
|
||||
func (e MediaServerStartedEvent) name() Name {
|
||||
return "media_server_started"
|
||||
}
|
@ -42,7 +42,7 @@ const (
|
||||
// UI is responsible for managing the terminal user interface.
|
||||
type UI struct {
|
||||
eventBus *event.Bus
|
||||
commandC chan domain.Command
|
||||
dispatch func(event.Command)
|
||||
clipboardAvailable bool
|
||||
configFilePath string
|
||||
rtmpURL, rtmpsURL string
|
||||
@ -96,7 +96,7 @@ type ScreenCapture struct {
|
||||
// interface.
|
||||
type StartParams struct {
|
||||
EventBus *event.Bus
|
||||
ChanSize int
|
||||
Dispatcher func(event.Command)
|
||||
Logger *slog.Logger
|
||||
ClipboardAvailable bool
|
||||
ConfigFilePath string
|
||||
@ -104,13 +104,8 @@ type StartParams struct {
|
||||
Screen *Screen // Screen may be nil.
|
||||
}
|
||||
|
||||
const defaultChanSize = 64
|
||||
|
||||
// StartUI starts the terminal user interface.
|
||||
func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
||||
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
|
||||
commandCh := make(chan domain.Command, chanSize)
|
||||
|
||||
app := tview.NewApplication()
|
||||
|
||||
var screen tcell.Screen
|
||||
@ -213,8 +208,8 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
||||
app.EnableMouse(false)
|
||||
|
||||
ui := &UI{
|
||||
commandC: commandCh,
|
||||
eventBus: params.EventBus,
|
||||
dispatch: params.Dispatcher,
|
||||
clipboardAvailable: params.ClipboardAvailable,
|
||||
configFilePath: params.ConfigFilePath,
|
||||
buildInfo: params.BuildInfo,
|
||||
@ -271,15 +266,13 @@ func (ui *UI) renderAboutView() {
|
||||
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
|
||||
}
|
||||
|
||||
// C returns a channel that receives commands from the user interface.
|
||||
func (ui *UI) C() <-chan domain.Command {
|
||||
return ui.commandC
|
||||
}
|
||||
|
||||
func (ui *UI) run(ctx context.Context) {
|
||||
defer close(ui.commandC)
|
||||
defer func() {
|
||||
// Ensure the application is stopped when the UI is closed.
|
||||
ui.dispatch(event.CommandQuit{})
|
||||
}()
|
||||
|
||||
mediaServerStartedC := ui.eventBus.Register(event.EventNameMediaServerStarted)
|
||||
eventC := ui.eventBus.Register()
|
||||
|
||||
uiDone := make(chan struct{})
|
||||
go func() {
|
||||
@ -294,8 +287,34 @@ func (ui *UI) run(ctx context.Context) {
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt := <-mediaServerStartedC:
|
||||
ui.handleMediaServerStarted(evt.(event.MediaServerStartedEvent))
|
||||
case evt := <-eventC:
|
||||
ui.app.QueueUpdateDraw(func() {
|
||||
switch evt := evt.(type) {
|
||||
case event.AppStateChangedEvent:
|
||||
ui.handleAppStateChanged(evt)
|
||||
case event.DestinationAddedEvent:
|
||||
ui.handleDestinationAdded(evt)
|
||||
case event.StartDestinationFailedEvent:
|
||||
ui.handleStartDestinationFailed(evt)
|
||||
case event.AddDestinationFailedEvent:
|
||||
ui.handleDestinationEventError(evt.Err)
|
||||
case event.DestinationStreamExitedEvent:
|
||||
ui.handleDestinationStreamExited(evt)
|
||||
case event.DestinationRemovedEvent:
|
||||
ui.handleDestinationRemoved(evt)
|
||||
case event.RemoveDestinationFailedEvent:
|
||||
ui.handleDestinationEventError(evt.Err)
|
||||
case event.OtherInstanceDetectedEvent:
|
||||
ui.handleOtherInstanceDetected(evt)
|
||||
case event.MediaServerStartedEvent:
|
||||
ui.handleMediaServerStarted(evt)
|
||||
case event.FatalErrorOccurredEvent:
|
||||
ui.handleFatalErrorOccurred(evt)
|
||||
default:
|
||||
ui.logger.Warn("unhandled event", "event", evt)
|
||||
}
|
||||
|
||||
})
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-uiDone:
|
||||
@ -310,7 +329,7 @@ func (ui *UI) handleMediaServerStarted(evt event.MediaServerStartedEvent) {
|
||||
ui.rtmpsURL = evt.RTMPSURL
|
||||
ui.mu.Unlock()
|
||||
|
||||
ui.app.QueueUpdateDraw(ui.renderAboutView)
|
||||
ui.renderAboutView()
|
||||
}
|
||||
|
||||
func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
||||
@ -381,79 +400,59 @@ func (ui *UI) fkeyHandler(key tcell.Key) {
|
||||
}
|
||||
}
|
||||
|
||||
func (ui *UI) ShowSourceNotLiveModal() {
|
||||
ui.app.QueueUpdateDraw(func() {
|
||||
ui.showModal(
|
||||
pageNameModalNotLive,
|
||||
"Waiting for stream.\n\nStart streaming to a source URL then try again.",
|
||||
[]string{"Ok"},
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
})
|
||||
func (ui *UI) handleStartDestinationFailed(event.StartDestinationFailedEvent) {
|
||||
ui.showModal(
|
||||
pageNameModalStartDestinationFailed,
|
||||
"Waiting for stream.\n\nStart streaming to a source URL then try again.",
|
||||
[]string{"Ok"},
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
}
|
||||
|
||||
// ShowStartupCheckModal shows a modal dialog to the user, asking if they want
|
||||
// to kill a running instance of Octoplex.
|
||||
//
|
||||
// The method will block until the user has made a choice, after which the
|
||||
// channel will receive true if the user wants to quit the other instance, or
|
||||
// false to quit this instance.
|
||||
func (ui *UI) ShowStartupCheckModal() bool {
|
||||
done := make(chan bool)
|
||||
|
||||
ui.app.QueueUpdateDraw(func() {
|
||||
ui.showModal(
|
||||
pageNameModalStartupCheck,
|
||||
"Another instance of Octoplex may already be running.\n\nPressing continue will close that instance. Continue?",
|
||||
[]string{"Continue", "Exit"},
|
||||
false,
|
||||
func(buttonIndex int, _ string) {
|
||||
if buttonIndex == 0 {
|
||||
done <- true
|
||||
} else {
|
||||
done <- false
|
||||
}
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
return <-done
|
||||
func (ui *UI) handleOtherInstanceDetected(event.OtherInstanceDetectedEvent) {
|
||||
ui.showModal(
|
||||
pageNameModalStartupCheck,
|
||||
"Another instance of Octoplex may already be running.\n\nPressing continue will close that instance. Continue?",
|
||||
[]string{"Continue", "Exit"},
|
||||
false,
|
||||
func(buttonIndex int, _ string) {
|
||||
if buttonIndex == 0 {
|
||||
ui.dispatch(event.CommandCloseOtherInstance{})
|
||||
} else {
|
||||
ui.dispatch(event.CommandQuit{})
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (ui *UI) ShowDestinationErrorModal(name string, err error) {
|
||||
ui.app.QueueUpdateDraw(func() {
|
||||
ui.showModal(
|
||||
pageNameModalDestinationError,
|
||||
fmt.Sprintf(
|
||||
"Streaming to %s failed:\n\n%s",
|
||||
cmp.Or(name, "this destination"),
|
||||
err,
|
||||
),
|
||||
[]string{"Ok"},
|
||||
true,
|
||||
nil,
|
||||
)
|
||||
})
|
||||
func (ui *UI) handleDestinationStreamExited(evt event.DestinationStreamExitedEvent) {
|
||||
ui.showModal(
|
||||
pageNameModalDestinationError,
|
||||
fmt.Sprintf(
|
||||
"Streaming to %s failed:\n\n%s",
|
||||
cmp.Or(evt.Name, "this destination"),
|
||||
evt.Err,
|
||||
),
|
||||
[]string{"Ok"},
|
||||
true,
|
||||
nil,
|
||||
)
|
||||
}
|
||||
|
||||
// ShowFatalErrorModal displays the provided error. It sends a CommandQuit to the
|
||||
// command channel when the user selects the Quit button.
|
||||
func (ui *UI) ShowFatalErrorModal(errString string) {
|
||||
ui.app.QueueUpdateDraw(func() {
|
||||
ui.showModal(
|
||||
pageNameModalFatalError,
|
||||
fmt.Sprintf(
|
||||
"An error occurred:\n\n%s",
|
||||
errString,
|
||||
),
|
||||
[]string{"Quit"},
|
||||
false,
|
||||
func(int, string) {
|
||||
ui.commandC <- domain.CommandQuit{}
|
||||
},
|
||||
)
|
||||
})
|
||||
func (ui *UI) handleFatalErrorOccurred(evt event.FatalErrorOccurredEvent) {
|
||||
ui.showModal(
|
||||
pageNameModalFatalError,
|
||||
fmt.Sprintf(
|
||||
"An error occurred:\n\n%s",
|
||||
evt.Message,
|
||||
),
|
||||
[]string{"Quit"},
|
||||
false,
|
||||
func(int, string) {
|
||||
ui.dispatch(event.CommandQuit{})
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (ui *UI) afterDrawHandler(screen tcell.Screen) {
|
||||
@ -484,8 +483,9 @@ func (ui *UI) captureScreen(screen tcell.Screen) {
|
||||
}
|
||||
}
|
||||
|
||||
// SetState sets the state of the terminal user interface.
|
||||
func (ui *UI) SetState(state domain.AppState) {
|
||||
func (ui *UI) handleAppStateChanged(evt event.AppStateChangedEvent) {
|
||||
state := evt.State
|
||||
|
||||
if state.Source.ExitReason != "" {
|
||||
ui.handleMediaServerClosed(state.Source.ExitReason)
|
||||
}
|
||||
@ -500,10 +500,7 @@ func (ui *UI) SetState(state domain.AppState) {
|
||||
ui.hasDestinations = len(state.Destinations) > 0
|
||||
ui.mu.Unlock()
|
||||
|
||||
// The state is mutable so can't be passed into QueueUpdateDraw, which
|
||||
// passes it to another goroutine, without cloning it first.
|
||||
stateClone := state.Clone()
|
||||
ui.app.QueueUpdateDraw(func() { ui.redrawFromState(stateClone) })
|
||||
ui.redrawFromState(state)
|
||||
}
|
||||
|
||||
func (ui *UI) updatePullProgress(state domain.AppState) {
|
||||
@ -524,9 +521,7 @@ func (ui *UI) updatePullProgress(state domain.AppState) {
|
||||
}
|
||||
|
||||
if len(pullingContainers) == 0 {
|
||||
ui.app.QueueUpdateDraw(func() {
|
||||
ui.hideModal(pageNameModalPullProgress)
|
||||
})
|
||||
ui.hideModal(pageNameModalPullProgress)
|
||||
return
|
||||
}
|
||||
|
||||
@ -540,29 +535,27 @@ func (ui *UI) updatePullProgress(state domain.AppState) {
|
||||
}
|
||||
|
||||
func (ui *UI) updateProgressModal(container domain.Container) {
|
||||
ui.app.QueueUpdateDraw(func() {
|
||||
modalName := string(pageNameModalPullProgress)
|
||||
modalName := string(pageNameModalPullProgress)
|
||||
|
||||
var status string
|
||||
// Avoid showing the long Docker pull status in the modal content.
|
||||
if len(container.PullStatus) < 30 {
|
||||
status = container.PullStatus
|
||||
}
|
||||
var status string
|
||||
// Avoid showing the long Docker pull status in the modal content.
|
||||
if len(container.PullStatus) < 30 {
|
||||
status = container.PullStatus
|
||||
}
|
||||
|
||||
modalContent := fmt.Sprintf(
|
||||
"Pulling %s:\n%s (%d%%)\n\n%s",
|
||||
container.ImageName,
|
||||
status,
|
||||
container.PullPercent,
|
||||
container.PullProgress,
|
||||
)
|
||||
modalContent := fmt.Sprintf(
|
||||
"Pulling %s:\n%s (%d%%)\n\n%s",
|
||||
container.ImageName,
|
||||
status,
|
||||
container.PullPercent,
|
||||
container.PullProgress,
|
||||
)
|
||||
|
||||
if ui.pages.HasPage(modalName) {
|
||||
ui.pullProgressModal.SetText(modalContent)
|
||||
} else {
|
||||
ui.pages.AddPage(modalName, ui.pullProgressModal, true, true)
|
||||
}
|
||||
})
|
||||
if ui.pages.HasPage(modalName) {
|
||||
ui.pullProgressModal.SetText(modalContent)
|
||||
} else {
|
||||
ui.pages.AddPage(modalName, ui.pullProgressModal, true, true)
|
||||
}
|
||||
}
|
||||
|
||||
// page names represent a specific page in the terminal user interface.
|
||||
@ -570,21 +563,21 @@ func (ui *UI) updateProgressModal(container domain.Container) {
|
||||
// Modals should generally have a unique name, which allows them to be stacked
|
||||
// on top of other modals.
|
||||
const (
|
||||
pageNameMain = "main"
|
||||
pageNameAddDestination = "add-destination"
|
||||
pageNameViewURLs = "view-urls"
|
||||
pageNameConfigUpdateFailed = "modal-config-update-failed"
|
||||
pageNameNoDestinations = "no-destinations"
|
||||
pageNameModalAbout = "modal-about"
|
||||
pageNameModalClipboard = "modal-clipboard"
|
||||
pageNameModalDestinationError = "modal-destination-error"
|
||||
pageNameModalFatalError = "modal-fatal-error"
|
||||
pageNameModalPullProgress = "modal-pull-progress"
|
||||
pageNameModalQuit = "modal-quit"
|
||||
pageNameModalRemoveDestination = "modal-remove-destination"
|
||||
pageNameModalSourceError = "modal-source-error"
|
||||
pageNameModalStartupCheck = "modal-startup-check"
|
||||
pageNameModalNotLive = "modal-not-live"
|
||||
pageNameMain = "main"
|
||||
pageNameAddDestination = "add-destination"
|
||||
pageNameViewURLs = "view-urls"
|
||||
pageNameConfigUpdateFailed = "modal-config-update-failed"
|
||||
pageNameNoDestinations = "no-destinations"
|
||||
pageNameModalAbout = "modal-about"
|
||||
pageNameModalClipboard = "modal-clipboard"
|
||||
pageNameModalDestinationError = "modal-destination-error"
|
||||
pageNameModalFatalError = "modal-fatal-error"
|
||||
pageNameModalPullProgress = "modal-pull-progress"
|
||||
pageNameModalQuit = "modal-quit"
|
||||
pageNameModalRemoveDestination = "modal-remove-destination"
|
||||
pageNameModalSourceError = "modal-source-error"
|
||||
pageNameModalStartDestinationFailed = "modal-start-destination-failed"
|
||||
pageNameModalStartupCheck = "modal-startup-check"
|
||||
)
|
||||
|
||||
// modalVisible returns true if any modal, including the add destination form,
|
||||
@ -692,23 +685,21 @@ func (ui *UI) hideModal(pageName string) {
|
||||
}
|
||||
|
||||
func (ui *UI) handleMediaServerClosed(exitReason string) {
|
||||
ui.app.QueueUpdateDraw(func() {
|
||||
if ui.pages.HasPage(pageNameModalSourceError) {
|
||||
return
|
||||
}
|
||||
if ui.pages.HasPage(pageNameModalSourceError) {
|
||||
return
|
||||
}
|
||||
|
||||
modal := tview.NewModal()
|
||||
modal.SetText("Mediaserver error: " + exitReason).
|
||||
AddButtons([]string{"Quit"}).
|
||||
SetBackgroundColor(tcell.ColorBlack).
|
||||
SetTextColor(tcell.ColorWhite).
|
||||
SetDoneFunc(func(int, string) {
|
||||
ui.commandC <- domain.CommandQuit{}
|
||||
})
|
||||
modal.SetBorderStyle(tcell.StyleDefault.Background(tcell.ColorBlack).Foreground(tcell.ColorWhite))
|
||||
modal := tview.NewModal()
|
||||
modal.SetText("Mediaserver error: " + exitReason).
|
||||
AddButtons([]string{"Quit"}).
|
||||
SetBackgroundColor(tcell.ColorBlack).
|
||||
SetTextColor(tcell.ColorWhite).
|
||||
SetDoneFunc(func(int, string) {
|
||||
ui.dispatch(event.CommandQuit{})
|
||||
})
|
||||
modal.SetBorderStyle(tcell.StyleDefault.Background(tcell.ColorBlack).Foreground(tcell.ColorWhite))
|
||||
|
||||
ui.pages.AddPage(pageNameModalSourceError, modal, true, true)
|
||||
})
|
||||
ui.pages.AddPage(pageNameModalSourceError, modal, true, true)
|
||||
}
|
||||
|
||||
const dash = "—"
|
||||
@ -856,24 +847,6 @@ func (ui *UI) Close() {
|
||||
ui.app.Stop()
|
||||
}
|
||||
|
||||
func (ui *UI) ConfigUpdateFailed(err error) {
|
||||
ui.app.QueueUpdateDraw(func() {
|
||||
ui.showModal(
|
||||
pageNameConfigUpdateFailed,
|
||||
"Configuration update failed:\n\n"+err.Error(),
|
||||
[]string{"Ok"},
|
||||
false,
|
||||
func(int, string) {
|
||||
pageName, frontPage := ui.pages.GetFrontPage()
|
||||
if pageName != pageNameAddDestination {
|
||||
ui.logger.Warn("Unexpected page when configuration form closed", "page", pageName)
|
||||
}
|
||||
ui.app.SetFocus(frontPage)
|
||||
},
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
func (ui *UI) addDestination() {
|
||||
const (
|
||||
inputLen = 60
|
||||
@ -893,10 +866,10 @@ func (ui *UI) addDestination() {
|
||||
AddInputField(inputLabelName, "My stream", inputLen, nil, nil).
|
||||
AddInputField(inputLabelURL, "rtmp://", inputLen, nil, nil).
|
||||
AddButton("Add", func() {
|
||||
ui.commandC <- domain.CommandAddDestination{
|
||||
ui.dispatch(event.CommandAddDestination{
|
||||
DestinationName: form.GetFormItemByLabel(inputLabelName).(*tview.InputField).GetText(),
|
||||
URL: form.GetFormItemByLabel(inputLabelURL).(*tview.InputField).GetText(),
|
||||
}
|
||||
})
|
||||
}).
|
||||
AddButton("Cancel", func() {
|
||||
ui.closeAddDestinationForm()
|
||||
@ -951,30 +924,42 @@ func (ui *UI) removeDestination() {
|
||||
false,
|
||||
func(buttonIndex int, _ string) {
|
||||
if buttonIndex == 0 {
|
||||
ui.commandC <- domain.CommandRemoveDestination{URL: url}
|
||||
ui.dispatch(event.CommandRemoveDestination{URL: url})
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// DestinationAdded should be called when a new destination is added.
|
||||
func (ui *UI) DestinationAdded() {
|
||||
func (ui *UI) handleDestinationAdded(event.DestinationAddedEvent) {
|
||||
ui.mu.Lock()
|
||||
ui.hasDestinations = true
|
||||
ui.mu.Unlock()
|
||||
|
||||
ui.app.QueueUpdateDraw(func() {
|
||||
ui.pages.HidePage(pageNameNoDestinations)
|
||||
ui.closeAddDestinationForm()
|
||||
ui.selectLastDestination()
|
||||
})
|
||||
ui.pages.HidePage(pageNameNoDestinations)
|
||||
ui.closeAddDestinationForm()
|
||||
ui.selectLastDestination()
|
||||
}
|
||||
|
||||
// DestinationRemoved should be called when a destination is removed.
|
||||
func (ui *UI) DestinationRemoved() {
|
||||
func (ui *UI) handleDestinationRemoved(event.DestinationRemovedEvent) {
|
||||
ui.selectPreviousDestination()
|
||||
}
|
||||
|
||||
func (ui *UI) handleDestinationEventError(err error) {
|
||||
ui.showModal(
|
||||
pageNameConfigUpdateFailed,
|
||||
"Configuration update failed:\n\n"+err.Error(),
|
||||
[]string{"Ok"},
|
||||
false,
|
||||
func(int, string) {
|
||||
pageName, frontPage := ui.pages.GetFrontPage()
|
||||
if pageName != pageNameAddDestination {
|
||||
ui.logger.Warn("Unexpected page when configuration form closed", "page", pageName)
|
||||
}
|
||||
ui.app.SetFocus(frontPage)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (ui *UI) closeAddDestinationForm() {
|
||||
var hasDestinations bool
|
||||
ui.mu.Lock()
|
||||
@ -1015,12 +1000,12 @@ func (ui *UI) toggleDestination() {
|
||||
switch ss {
|
||||
case startStateNotStarted:
|
||||
ui.urlsToStartState[url] = startStateStarting
|
||||
ui.commandC <- domain.CommandStartDestination{URL: url}
|
||||
ui.dispatch(event.CommandStartDestination{URL: url})
|
||||
case startStateStarting:
|
||||
// do nothing
|
||||
return
|
||||
case startStateStarted:
|
||||
ui.commandC <- domain.CommandStopDestination{URL: url}
|
||||
ui.dispatch(event.CommandStopDestination{URL: url})
|
||||
}
|
||||
}
|
||||
|
||||
@ -1073,7 +1058,7 @@ func (ui *UI) confirmQuit() {
|
||||
false,
|
||||
func(buttonIndex int, _ string) {
|
||||
if buttonIndex == 0 {
|
||||
ui.commandC <- domain.CommandQuit{}
|
||||
ui.dispatch(event.CommandQuit{})
|
||||
}
|
||||
},
|
||||
)
|
||||
|
29
main.go
29
main.go
@ -97,22 +97,21 @@ func run(ctx context.Context) error {
|
||||
return fmt.Errorf("read build info: %w", err)
|
||||
}
|
||||
|
||||
return app.Run(
|
||||
ctx,
|
||||
app.RunParams{
|
||||
ConfigService: configService,
|
||||
DockerClient: dockerClient,
|
||||
ClipboardAvailable: clipboardAvailable,
|
||||
ConfigFilePath: configService.Path(),
|
||||
BuildInfo: domain.BuildInfo{
|
||||
GoVersion: buildInfo.GoVersion,
|
||||
Version: version,
|
||||
Commit: commit,
|
||||
Date: date,
|
||||
},
|
||||
Logger: logger,
|
||||
app := app.New(app.Params{
|
||||
ConfigService: configService,
|
||||
DockerClient: dockerClient,
|
||||
ClipboardAvailable: clipboardAvailable,
|
||||
ConfigFilePath: configService.Path(),
|
||||
BuildInfo: domain.BuildInfo{
|
||||
GoVersion: buildInfo.GoVersion,
|
||||
Version: version,
|
||||
Commit: commit,
|
||||
Date: date,
|
||||
},
|
||||
)
|
||||
Logger: logger,
|
||||
})
|
||||
|
||||
return app.Run(ctx)
|
||||
}
|
||||
|
||||
// editConfigFile opens the config file in the user's editor.
|
||||
|
Loading…
x
Reference in New Issue
Block a user