Compare commits

...

18 Commits

Author SHA1 Message Date
d4e136c8d6 feat: headless mode (experimental) 2025-05-06 01:19:47 +02:00
2ab48bfc93 build: update .goreleaser.yaml 2025-05-06 01:11:18 +02:00
2bdb5335dc refactor(app): add Dispatch method 2025-05-06 01:11:18 +02:00
4be90993ef doc: update README 2025-05-06 01:11:18 +02:00
d1efffc937 refactor(app): internalize dispatch channel 2025-05-06 01:11:18 +02:00
d3171c6251 refactor: extract commands from domain 2025-05-06 01:11:18 +02:00
d9dbb7fc8f doc: add godoc 2025-05-06 01:11:18 +02:00
8c4974e0c1 refactor(app): add DestinationWentOffAirEvent 2025-05-06 01:11:18 +02:00
6bcd5d05c7 fix(app): event ordering
Use a single channel per consumer, instead of one channel per
consumer/event tuple. This ensures that overall ordering of events
remains consistent, and avoids introducing subtle race conditions.
2025-05-06 01:11:18 +02:00
c6ce8be5b1 refactor(app): async startup check 2025-05-06 01:11:18 +02:00
9c16275207 refactor(app): add StartDestinationFailedEvent 2025-05-06 01:11:18 +02:00
a80e891b75 refactor(app): add destination error events 2025-05-06 01:11:18 +02:00
9e2f6649eb refactor(app): add AppStateChangedEvent 2025-05-06 01:11:18 +02:00
81679be6c3 doc: update README 2025-05-06 01:11:18 +02:00
de0ecb1f34 refactor(app): extract more events 2025-05-06 01:11:18 +02:00
d96d26c29c refactor(app): extract handleCommand 2025-05-06 01:11:18 +02:00
4a2857e310 refactor(app): add App type 2025-05-06 01:11:18 +02:00
e8be872047 chore: remove stray file 2025-05-06 01:10:33 +02:00
15 changed files with 653 additions and 434 deletions

View File

@ -1,4 +1,4 @@
name: ci-build name: build
run-name: Building ${{ github.ref_name }} run-name: Building ${{ github.ref_name }}
on: on:
push: push:

View File

@ -1,4 +1,4 @@
name: ci-scan name: codeql
on: on:
push: push:

View File

@ -40,12 +40,25 @@ brews:
system "#{bin}/octoplex -h" system "#{bin}/octoplex -h"
release: release:
draft: true
github: github:
owner: rfwatson owner: rfwatson
name: octoplex name: octoplex
changelog: changelog:
use: github use: github
groups:
- title: New Features
regexp: '^.*?feat(\([[:word:]]+\))??!?:.+$'
order: 0
- title: "Bug fixes"
regexp: '^.*?fix(\([[:word:]]+\))??!?:.+$'
order: 1
- title: "Refactorings"
regexp: '^.*?refactor(\([[:word:]]+\))??!?:.+$'
order: 2
- title: Others
order: 999
filters: filters:
exclude: exclude:
- "^doc:" - "^doc:"

View File

@ -33,11 +33,12 @@ broadcast to localhost:1935 or localhost:1936.
## Opening a pull request ## Opening a pull request
Pull requests are welcome, but please propose significant changes in a Pull requests are welcome, please propose significant changes in a
[discussion](https://github.com/rfwatson/octoplex/discussions) first. [discussion](https://github.com/rfwatson/octoplex/discussions) first.
1. Fork the repo 1. Fork the repo
2. Make your changes, including test coverage 1. Make your changes, including test coverage
3. Push the changes to a branch 1. Run the formatter (`mise run format`)
4. Ensure the branch is passing 1. Push the changes to a branch
5. Open a pull request 1. Ensure the branch is passing
1. Open a pull request

View File

@ -1,11 +1,12 @@
# Octoplex :octopus: # Octoplex :octopus:
![build status](https://github.com/rfwatson/octoplex/actions/workflows/build.yml/badge.svg) [![build status](https://github.com/rfwatson/octoplex/actions/workflows/build.yml/badge.svg)](https://github.com/rfwatson/octoplex/actions/workflows/build.yml)
![scan status](https://github.com/rfwatson/octoplex/actions/workflows/codeql.yml/badge.svg) [![scan status](https://github.com/rfwatson/octoplex/actions/workflows/codeql.yml/badge.svg)](https://github.com/rfwatson/octoplex/actions/workflows/codeql.yml)
![GitHub Release](https://img.shields.io/github/v/release/rfwatson/octoplex) ![GitHub Release](https://img.shields.io/github/v/release/rfwatson/octoplex)
[![Go Report Card](https://goreportcard.com/badge/git.netflux.io/rob/octoplex)](https://goreportcard.com/report/git.netflux.io/rob/octoplex)
[![License: AGPL v3](https://img.shields.io/badge/License-AGPL_v3-blue.svg)](https://www.gnu.org/licenses/agpl-3.0) [![License: AGPL v3](https://img.shields.io/badge/License-AGPL_v3-blue.svg)](https://www.gnu.org/licenses/agpl-3.0)
Octoplex is a live video restreamer for the terminal. Octoplex is a Docker-native live video restreamer.
* Restream RTMP/RTMPS to unlimited destinations * Restream RTMP/RTMPS to unlimited destinations
* Broadcast using OBS or any standard tool * Broadcast using OBS or any standard tool
@ -38,8 +39,7 @@ Octoplex is a live video restreamer for the terminal.
### Docker Engine ### Docker Engine
First, make sure Docker Engine is installed. Octoplex uses Docker to manage First, ensure that Docker Engine is installed.
FFmpeg and other streaming tools.
Linux: See https://docs.docker.com/engine/install/. Linux: See https://docs.docker.com/engine/install/.
@ -160,14 +160,12 @@ localhost (`127.0.0.1`) or use `0.0.0.0` to bind to all network interfaces.
## Contributing ## Contributing
See [CONTRIBUTING.md](/CONTRIBUTING.md).
### Bug reports ### Bug reports
Open bug reports [on GitHub](https://github.com/rfwatson/octoplex/issues/new). Open bug reports [on GitHub](https://github.com/rfwatson/octoplex/issues/new).
### Pull requests
Pull requests are welcome.
## Acknowledgements ## Acknowledgements
Octoplex is built on and/or makes use of other free and open source software, Octoplex is built on and/or makes use of other free and open source software,

View File

@ -1,6 +1,7 @@
package app package app
import ( import (
"cmp"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@ -18,47 +19,80 @@ import (
"github.com/docker/docker/client" "github.com/docker/docker/client"
) )
// RunParams holds the parameters for running the application. // App is an instance of the app.
type RunParams struct { type App struct {
cfg config.Config
configService *config.Service
eventBus *event.Bus
dispatchC chan event.Command
dockerClient container.DockerClient
screen *terminal.Screen // Screen may be nil.
headless bool
clipboardAvailable bool
configFilePath string
buildInfo domain.BuildInfo
logger *slog.Logger
}
// Params holds the parameters for running the application.
type Params struct {
ConfigService *config.Service ConfigService *config.Service
DockerClient container.DockerClient DockerClient container.DockerClient
ChanSize int
Screen *terminal.Screen // Screen may be nil. Screen *terminal.Screen // Screen may be nil.
Headless bool
ClipboardAvailable bool ClipboardAvailable bool
ConfigFilePath string ConfigFilePath string
BuildInfo domain.BuildInfo BuildInfo domain.BuildInfo
Logger *slog.Logger Logger *slog.Logger
} }
// defaultChanSize is the default size of the dispatch channel.
const defaultChanSize = 64
// New creates a new application instance.
func New(params Params) *App {
return &App{
cfg: params.ConfigService.Current(),
configService: params.ConfigService,
eventBus: event.NewBus(params.Logger.With("component", "event_bus")),
dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)),
dockerClient: params.DockerClient,
screen: params.Screen,
headless: params.Headless,
clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath,
buildInfo: params.BuildInfo,
logger: params.Logger,
}
}
// Run starts the application, and blocks until it exits. // Run starts the application, and blocks until it exits.
func Run(ctx context.Context, params RunParams) error { func (a *App) Run(ctx context.Context) error {
logger := params.Logger
eventBus := event.NewBus(logger.With("component", "event_bus"))
// cfg is the current configuration of the application, as reflected in the
// config file.
cfg := params.ConfigService.Current()
// state is the current state of the application, as reflected in the UI. // state is the current state of the application, as reflected in the UI.
state := new(domain.AppState) state := new(domain.AppState)
applyConfig(cfg, state) applyConfig(a.cfg, state)
// Ensure there is at least one active source. // Ensure there is at least one active source.
if !cfg.Sources.MediaServer.RTMP.Enabled && !cfg.Sources.MediaServer.RTMPS.Enabled { if !a.cfg.Sources.MediaServer.RTMP.Enabled && !a.cfg.Sources.MediaServer.RTMPS.Enabled {
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set") return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
} }
if !a.headless {
ui, err := terminal.StartUI(ctx, terminal.StartParams{ ui, err := terminal.StartUI(ctx, terminal.StartParams{
EventBus: eventBus, EventBus: a.eventBus,
Screen: params.Screen, Dispatcher: func(cmd event.Command) { a.dispatchC <- cmd },
ClipboardAvailable: params.ClipboardAvailable, Screen: a.screen,
ConfigFilePath: params.ConfigFilePath, ClipboardAvailable: a.clipboardAvailable,
BuildInfo: params.BuildInfo, ConfigFilePath: a.configFilePath,
Logger: logger.With("component", "ui"), BuildInfo: a.buildInfo,
Logger: a.logger.With("component", "ui"),
}) })
if err != nil { if err != nil {
return fmt.Errorf("start terminal user interface: %w", err) return fmt.Errorf("start terminal user interface: %w", err)
} }
defer ui.Close() defer ui.Close()
}
// emptyUI is a dummy function that sets the UI state to an empty state, and // emptyUI is a dummy function that sets the UI state to an empty state, and
// re-renders the screen. // re-renders the screen.
@ -69,50 +103,64 @@ func Run(ctx context.Context, params RunParams) error {
// It is only needed for integration tests when rendering modals before the // It is only needed for integration tests when rendering modals before the
// main loop starts. It would be nice to remove this but the risk/impact on // main loop starts. It would be nice to remove this but the risk/impact on
// non-test code is pretty low. // non-test code is pretty low.
emptyUI := func() { ui.SetState(domain.AppState{}) } emptyUI := func() {
a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}})
}
containerClient, err := container.NewClient(ctx, params.DockerClient, logger.With("component", "container_client")) // doFatalError publishes a fatal error to the event bus, waiting for the
// user to acknowledge it if not in headless mode.
doFatalError := func(msg string) {
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
if a.headless {
return
}
emptyUI()
<-a.dispatchC
}
containerClient, err := container.NewClient(ctx, a.dockerClient, a.logger.With("component", "container_client"))
if err != nil { if err != nil {
err = fmt.Errorf("create container client: %w", err) err = fmt.Errorf("create container client: %w", err)
var errString string var msg string
if client.IsErrConnectionFailed(err) { if client.IsErrConnectionFailed(err) {
errString = "Could not connect to Docker. Is Docker installed and running?" msg = "Could not connect to Docker. Is Docker installed and running?"
} else { } else {
errString = err.Error() msg = err.Error()
} }
ui.ShowFatalErrorModal(errString) doFatalError(msg)
emptyUI()
<-ui.C()
return err return err
} }
defer containerClient.Close() defer containerClient.Close()
updateUI := func() { ui.SetState(*state) } updateUI := func() {
// The state is mutable so can't be passed into another goroutine
// without cloning it first.
a.eventBus.Send(event.AppStateChangedEvent{State: state.Clone()})
}
updateUI() updateUI()
var tlsCertPath, tlsKeyPath string var tlsCertPath, tlsKeyPath string
if cfg.Sources.MediaServer.TLS != nil { if a.cfg.Sources.MediaServer.TLS != nil {
tlsCertPath = cfg.Sources.MediaServer.TLS.CertPath tlsCertPath = a.cfg.Sources.MediaServer.TLS.CertPath
tlsKeyPath = cfg.Sources.MediaServer.TLS.KeyPath tlsKeyPath = a.cfg.Sources.MediaServer.TLS.KeyPath
} }
srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{ srv, err := mediaserver.NewActor(ctx, mediaserver.NewActorParams{
RTMPAddr: buildNetAddr(cfg.Sources.MediaServer.RTMP), RTMPAddr: buildNetAddr(a.cfg.Sources.MediaServer.RTMP),
RTMPSAddr: buildNetAddr(cfg.Sources.MediaServer.RTMPS), RTMPSAddr: buildNetAddr(a.cfg.Sources.MediaServer.RTMPS),
Host: cfg.Sources.MediaServer.Host, Host: a.cfg.Sources.MediaServer.Host,
TLSCertPath: tlsCertPath, TLSCertPath: tlsCertPath,
TLSKeyPath: tlsKeyPath, TLSKeyPath: tlsKeyPath,
StreamKey: mediaserver.StreamKey(cfg.Sources.MediaServer.StreamKey), StreamKey: mediaserver.StreamKey(a.cfg.Sources.MediaServer.StreamKey),
ContainerClient: containerClient, ContainerClient: containerClient,
Logger: logger.With("component", "mediaserver"), Logger: a.logger.With("component", "mediaserver"),
}) })
if err != nil { if err != nil {
err = fmt.Errorf("create mediaserver: %w", err) err = fmt.Errorf("create mediaserver: %w", err)
ui.ShowFatalErrorModal(err.Error()) doFatalError(err.Error())
emptyUI()
<-ui.C()
return err return err
} }
defer srv.Close() defer srv.Close()
@ -120,7 +168,7 @@ func Run(ctx context.Context, params RunParams) error {
repl := replicator.StartActor(ctx, replicator.StartActorParams{ repl := replicator.StartActor(ctx, replicator.StartActorParams{
SourceURL: srv.RTMPInternalURL(), SourceURL: srv.RTMPInternalURL(),
ContainerClient: containerClient, ContainerClient: containerClient,
Logger: logger.With("component", "replicator"), Logger: a.logger.With("component", "replicator"),
}) })
defer repl.Close() defer repl.Close()
@ -128,87 +176,55 @@ func Run(ctx context.Context, params RunParams) error {
uiUpdateT := time.NewTicker(uiUpdateInterval) uiUpdateT := time.NewTicker(uiUpdateInterval)
defer uiUpdateT.Stop() defer uiUpdateT.Stop()
startupCheckC := doStartupCheck(ctx, containerClient, ui.ShowStartupCheckModal) startMediaServerC := make(chan struct{}, 1)
if a.headless { // disable startup check in headless mode for now
startMediaServerC <- struct{}{}
} else {
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
doFatalError(startupErr.Error())
return startupErr
} else if ok {
startMediaServerC <- struct{}{}
}
}
for { for {
select { select {
case err := <-startupCheckC: case <-ctx.Done():
if errors.Is(err, errStartupCheckUserQuit) { return ctx.Err()
return nil case <-startMediaServerC:
} else if err != nil {
return fmt.Errorf("startup check: %w", err)
} else {
startupCheckC = nil
if err = srv.Start(ctx); err != nil { if err = srv.Start(ctx); err != nil {
return fmt.Errorf("start mediaserver: %w", err) return fmt.Errorf("start mediaserver: %w", err)
} }
eventBus.Send(event.MediaServerStartedEvent{RTMPURL: srv.RTMPURL(), RTMPSURL: srv.RTMPSURL()}) a.eventBus.Send(event.MediaServerStartedEvent{RTMPURL: srv.RTMPURL(), RTMPSURL: srv.RTMPSURL()})
} case <-a.configService.C():
case <-params.ConfigService.C():
// No-op, config updates are handled synchronously for now. // No-op, config updates are handled synchronously for now.
case cmd, ok := <-ui.C(): case cmd := <-a.dispatchC:
if !ok { if _, err := a.handleCommand(ctx, cmd, state, repl, containerClient, startMediaServerC); errors.Is(err, errExit) {
// TODO: keep UI open until all containers have closed
logger.Info("UI closed")
return nil
}
logger.Debug("Command received", "cmd", cmd.Name())
switch c := cmd.(type) {
case domain.CommandAddDestination:
newCfg := cfg
newCfg.Destinations = append(newCfg.Destinations, config.Destination{
Name: c.DestinationName,
URL: c.URL,
})
if err := params.ConfigService.SetConfig(newCfg); err != nil {
logger.Error("Config update failed", "err", err)
ui.ConfigUpdateFailed(err)
continue
}
cfg = newCfg
handleConfigUpdate(cfg, state, ui)
ui.DestinationAdded()
case domain.CommandRemoveDestination:
repl.StopDestination(c.URL) // no-op if not live
newCfg := cfg
newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool {
return dest.URL == c.URL
})
if err := params.ConfigService.SetConfig(newCfg); err != nil {
logger.Error("Config update failed", "err", err)
ui.ConfigUpdateFailed(err)
continue
}
cfg = newCfg
handleConfigUpdate(cfg, state, ui)
ui.DestinationRemoved()
case domain.CommandStartDestination:
if !state.Source.Live {
ui.ShowSourceNotLiveModal()
continue
}
repl.StartDestination(c.URL)
case domain.CommandStopDestination:
repl.StopDestination(c.URL)
case domain.CommandQuit:
return nil return nil
} else if err != nil {
return fmt.Errorf("handle command: %w", err)
} }
case <-uiUpdateT.C: case <-uiUpdateT.C:
updateUI() updateUI()
case serverState := <-srv.C(): case serverState := <-srv.C():
logger.Debug("Server state received", "state", serverState) a.logger.Debug("Server state received", "state", serverState)
if serverState.ExitReason != "" {
doFatalError(serverState.ExitReason)
return errors.New("media server exited")
}
applyServerState(serverState, state) applyServerState(serverState, state)
updateUI() updateUI()
case replState := <-repl.C(): case replState := <-repl.C():
logger.Debug("Replicator state received", "state", replState) a.logger.Debug("Replicator state received", "state", replState)
destErrors := applyReplicatorState(replState, state) destErrors := applyReplicatorState(replState, state)
for _, destError := range destErrors { for _, destError := range destErrors {
handleDestError(destError, repl, ui) a.eventBus.Send(event.DestinationStreamExitedEvent{Name: destError.name, Err: destError.err})
repl.StopDestination(destError.url)
} }
updateUI() updateUI()
@ -216,10 +232,100 @@ func Run(ctx context.Context, params RunParams) error {
} }
} }
// handleConfigUpdate applies the config to the app state, and updates the UI. type syncCommand struct {
func handleConfigUpdate(cfg config.Config, appState *domain.AppState, ui *terminal.UI) { event.Command
applyConfig(cfg, appState)
ui.SetState(*appState) done chan<- event.Event
}
// Dispatch dispatches a command to be executed synchronously.
func (a *App) Dispatch(cmd event.Command) event.Event {
ch := make(chan event.Event, 1)
a.dispatchC <- syncCommand{Command: cmd, done: ch}
return <-ch
}
// errExit is an error that indicates the app should exit.
var errExit = errors.New("exit")
// handleCommand handles an incoming command. It may return an Event which will
// already have been published to the event bus, but which is returned for the
// benefit of synchronous callers. The event may be nil. It may also publish
// other events to the event bus which are not returned. Currently the only
// error that may be returned is [errExit], which indicates to the main event
// loop that the app should exit.
func (a *App) handleCommand(
ctx context.Context,
cmd event.Command,
state *domain.AppState,
repl *replicator.Actor,
containerClient *container.Client,
startMediaServerC chan struct{},
) (evt event.Event, _ error) {
a.logger.Debug("Command received", "cmd", cmd.Name())
defer func() {
if evt != nil {
a.eventBus.Send(evt)
}
if c, ok := cmd.(syncCommand); ok {
c.done <- evt
}
}()
switch c := cmd.(type) {
case event.CommandAddDestination:
newCfg := a.cfg
newCfg.Destinations = append(newCfg.Destinations, config.Destination{
Name: c.DestinationName,
URL: c.URL,
})
if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Add destination failed", "err", err)
return event.AddDestinationFailedEvent{Err: err}, nil
}
a.cfg = newCfg
a.handleConfigUpdate(state)
a.eventBus.Send(event.DestinationAddedEvent{URL: c.URL})
case event.CommandRemoveDestination:
repl.StopDestination(c.URL) // no-op if not live
newCfg := a.cfg
newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool {
return dest.URL == c.URL
})
if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Remove destination failed", "err", err)
a.eventBus.Send(event.RemoveDestinationFailedEvent{Err: err})
break
}
a.cfg = newCfg
a.handleConfigUpdate(state)
a.eventBus.Send(event.DestinationRemovedEvent{URL: c.URL}) //nolint:gosimple
case event.CommandStartDestination:
if !state.Source.Live {
a.eventBus.Send(event.StartDestinationFailedEvent{})
break
}
repl.StartDestination(c.URL)
case event.CommandStopDestination:
repl.StopDestination(c.URL)
case event.CommandCloseOtherInstance:
if err := closeOtherInstances(ctx, containerClient); err != nil {
return nil, fmt.Errorf("close other instances: %w", err)
}
startMediaServerC <- struct{}{}
case event.CommandQuit:
return nil, errExit
}
return nil, nil
}
// handleConfigUpdate applies the config to the app state, and sends an AppStateChangedEvent.
func (a *App) handleConfigUpdate(appState *domain.AppState) {
applyConfig(a.cfg, appState)
a.eventBus.Send(event.AppStateChangedEvent{State: appState.Clone()})
} }
// applyServerState applies the current server state to the app state. // applyServerState applies the current server state to the app state.
@ -265,13 +371,6 @@ func applyReplicatorState(replState replicator.State, appState *domain.AppState)
return errorsToDisplay return errorsToDisplay
} }
// handleDestError displays a modal to the user, and stops the destination.
func handleDestError(destError destinationError, repl *replicator.Actor, ui *terminal.UI) {
ui.ShowDestinationErrorModal(destError.name, destError.err)
repl.StopDestination(destError.url)
}
// applyConfig applies the config to the app state. For now we only set the // applyConfig applies the config to the app state. For now we only set the
// destinations. // destinations.
func applyConfig(cfg config.Config, appState *domain.AppState) { func applyConfig(cfg config.Config, appState *domain.AppState) {
@ -301,40 +400,34 @@ func resolveDestinations(destinations []domain.Destination, inDestinations []con
return destinations[:len(inDestinations)] return destinations[:len(inDestinations)]
} }
var errStartupCheckUserQuit = errors.New("user quit startup check modal")
// doStartupCheck performs a startup check to see if there are any existing app // doStartupCheck performs a startup check to see if there are any existing app
// containers. // containers.
// //
// It returns a channel that will be closed, possibly after receiving an error. // It returns a bool if the check is clear. If the bool is false, then
// If the error is non-nil the app must not be started. If the error is // startup should be paused until the choice selected by the user is received
// [errStartupCheckUserQuit], the user voluntarily quit the startup check // via a command.
// modal. func doStartupCheck(ctx context.Context, containerClient *container.Client, eventBus *event.Bus) (bool, error) {
func doStartupCheck(ctx context.Context, containerClient *container.Client, showModal func() bool) <-chan error {
ch := make(chan error, 1)
go func() {
defer close(ch)
if exists, err := containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil { if exists, err := containerClient.ContainerRunning(ctx, container.AllContainers()); err != nil {
ch <- fmt.Errorf("check existing containers: %w", err) return false, fmt.Errorf("check existing containers: %w", err)
} else if exists { } else if exists {
if showModal() { eventBus.Send(event.OtherInstanceDetectedEvent{})
if err = containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil {
ch <- fmt.Errorf("remove existing containers: %w", err)
return
}
if err = containerClient.RemoveUnusedNetworks(ctx); err != nil {
ch <- fmt.Errorf("remove unused networks: %w", err)
return
}
} else {
ch <- errStartupCheckUserQuit
}
}
}()
return ch return false, nil
}
return true, nil
}
func closeOtherInstances(ctx context.Context, containerClient *container.Client) error {
if err := containerClient.RemoveContainers(ctx, container.AllContainers()); err != nil {
return fmt.Errorf("remove existing containers: %w", err)
}
if err := containerClient.RemoveUnusedNetworks(ctx); err != nil {
return fmt.Errorf("remove unused networks: %w", err)
}
return nil
} }
// buildNetAddr builds a [mediaserver.OptionalNetAddr] from the config. // buildNetAddr builds a [mediaserver.OptionalNetAddr] from the config.

View File

@ -31,10 +31,10 @@ func buildAppParams(
screen tcell.SimulationScreen, screen tcell.SimulationScreen,
screenCaptureC chan<- terminal.ScreenCapture, screenCaptureC chan<- terminal.ScreenCapture,
logger *slog.Logger, logger *slog.Logger,
) app.RunParams { ) app.Params {
t.Helper() t.Helper()
return app.RunParams{ return app.Params{
ConfigService: configService, ConfigService: configService,
DockerClient: dockerClient, DockerClient: dockerClient,
Screen: &terminal.Screen{ Screen: &terminal.Screen{
@ -150,25 +150,36 @@ func printScreen(t *testing.T, getContents func() []string, label string) {
func sendKey(t *testing.T, screen tcell.SimulationScreen, key tcell.Key, ch rune) { func sendKey(t *testing.T, screen tcell.SimulationScreen, key tcell.Key, ch rune) {
t.Helper() t.Helper()
screen.InjectKey(key, ch, tcell.ModNone) const (
time.Sleep(50 * time.Millisecond) waitTime = 50 * time.Millisecond
maxTries = 50
)
for i := 0; i < maxTries; i++ {
if err := screen.PostEvent(tcell.NewEventKey(key, ch, tcell.ModNone)); err != nil {
time.Sleep(waitTime)
} else {
return
}
}
t.Fatalf("Failed to send key event after %d tries", maxTries)
} }
func sendKeys(t *testing.T, screen tcell.SimulationScreen, keys string) { func sendKeys(t *testing.T, screen tcell.SimulationScreen, keys string) {
t.Helper() t.Helper()
screen.InjectKeyBytes([]byte(keys)) for _, ch := range keys {
time.Sleep(500 * time.Millisecond) sendKey(t, screen, tcell.KeyRune, ch)
}
} }
func sendBackspaces(t *testing.T, screen tcell.SimulationScreen, n int) { func sendBackspaces(t *testing.T, screen tcell.SimulationScreen, n int) {
t.Helper() t.Helper()
for range n { for range n {
screen.InjectKey(tcell.KeyBackspace, ' ', tcell.ModNone) sendKey(t, screen, tcell.KeyBackspace, 0)
time.Sleep(50 * time.Millisecond)
} }
time.Sleep(500 * time.Millisecond)
} }
// kickFirstRTMPConn kicks the first RTMP connection from the mediaMTX server. // kickFirstRTMPConn kicks the first RTMP connection from the mediaMTX server.

View File

@ -132,7 +132,7 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
done <- struct{}{} done <- struct{}{}
}() }()
err := app.Run(ctx, app.RunParams{ require.Equal(t, context.Canceled, app.New(app.Params{
ConfigService: configService, ConfigService: configService,
DockerClient: dockerClient, DockerClient: dockerClient,
Screen: &terminal.Screen{ Screen: &terminal.Screen{
@ -144,8 +144,7 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
ClipboardAvailable: false, ClipboardAvailable: false,
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"}, BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
Logger: logger, Logger: logger,
}) }).Run(ctx))
require.NoError(t, err)
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -182,11 +181,11 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
// Add a second destination in-app: // Add a second destination in-app:
sendKey(t, screen, tcell.KeyRune, 'a') sendKey(t, screen, tcell.KeyRune, 'a')
sendBackspaces(t, screen, 30) sendBackspaces(t, screen, 10)
sendKeys(t, screen, "Local server 2") sendKeys(t, screen, "Local server 2")
sendKey(t, screen, tcell.KeyTab, ' ') sendKey(t, screen, tcell.KeyTab, ' ')
sendBackspaces(t, screen, 30) sendBackspaces(t, screen, 10)
sendKeys(t, screen, destURL2) sendKeys(t, screen, destURL2)
sendKey(t, screen, tcell.KeyTab, ' ') sendKey(t, screen, tcell.KeyTab, ' ')
sendKey(t, screen, tcell.KeyEnter, ' ') sendKey(t, screen, tcell.KeyEnter, ' ')
@ -320,7 +319,7 @@ func TestIntegrationCustomHost(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
time.Sleep(time.Second) time.Sleep(time.Second)
@ -391,7 +390,7 @@ func TestIntegrationCustomTLSCerts(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -472,7 +471,7 @@ func TestIntegrationRestartDestination(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -609,7 +608,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -682,7 +681,7 @@ func TestIntegrationDestinationValidations(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -824,7 +823,7 @@ func TestIntegrationStartupCheck(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -893,13 +892,17 @@ func TestIntegrationMediaServerError(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.EqualError(
t,
app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx),
"media server exited",
)
}() }()
require.EventuallyWithT( require.EventuallyWithT(
t, t,
func(c *assert.CollectT) { func(c *assert.CollectT) {
assert.True(c, contentsIncludes(getContents(), "Mediaserver error: Server process exited unexpectedly."), "expected to see title") assert.True(c, contentsIncludes(getContents(), "Server process exited unexpectedly."), "expected to see title")
assert.True(c, contentsIncludes(getContents(), "address already in use"), "expected to see message") assert.True(c, contentsIncludes(getContents(), "address already in use"), "expected to see message")
}, },
waitTime, waitTime,
@ -934,7 +937,7 @@ func TestIntegrationDockerClientError(t *testing.T) {
require.EqualError( require.EqualError(
t, t,
app.Run(ctx, buildAppParams(t, configService, &dockerClient, screen, screenCaptureC, logger)), app.New(buildAppParams(t, configService, &dockerClient, screen, screenCaptureC, logger)).Run(ctx),
"create container client: network create: boom", "create container client: network create: boom",
) )
}() }()
@ -974,7 +977,7 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
err := app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)) err := app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)
require.ErrorContains(t, err, "dial tcp: lookup docker.example.com") require.ErrorContains(t, err, "dial tcp: lookup docker.example.com")
require.ErrorContains(t, err, "no such host") require.ErrorContains(t, err, "no such host")
}() }()
@ -1070,7 +1073,7 @@ func TestIntegrationCopyURLs(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.Run(ctx, buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger))) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)

View File

@ -1 +0,0 @@
package domain

View File

@ -7,29 +7,9 @@ import (
const defaultChannelSize = 64 const defaultChannelSize = 64
type Name string
const (
EventNameMediaServerStarted Name = "media_server_started"
)
type Event interface {
name() Name
}
// MediaServerStartedEvent is emitted when the mediaserver component starts successfully.
type MediaServerStartedEvent struct {
RTMPURL string
RTMPSURL string
}
func (e MediaServerStartedEvent) name() Name {
return "media_server_started"
}
// Bus is an event bus. // Bus is an event bus.
type Bus struct { type Bus struct {
consumers map[Name][]chan Event consumers []chan Event
mu sync.Mutex mu sync.Mutex
logger *slog.Logger logger *slog.Logger
} }
@ -37,18 +17,17 @@ type Bus struct {
// NewBus returns a new event bus. // NewBus returns a new event bus.
func NewBus(logger *slog.Logger) *Bus { func NewBus(logger *slog.Logger) *Bus {
return &Bus{ return &Bus{
consumers: make(map[Name][]chan Event),
logger: logger, logger: logger,
} }
} }
// Register registers a consumer for a given event. // Register registers a consumer for all events.
func (b *Bus) Register(name Name) <-chan Event { func (b *Bus) Register() <-chan Event {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
ch := make(chan Event, defaultChannelSize) ch := make(chan Event, defaultChannelSize)
b.consumers[name] = append(b.consumers[name], ch) b.consumers = append(b.consumers, ch)
return ch return ch
} }
@ -60,7 +39,7 @@ func (b *Bus) Send(evt Event) {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
for _, ch := range b.consumers[evt.name()] { for _, ch := range b.consumers {
select { select {
case ch <- evt: case ch <- evt:
default: default:

View File

@ -11,8 +11,8 @@ import (
func TestBus(t *testing.T) { func TestBus(t *testing.T) {
bus := event.NewBus(testhelpers.NewTestLogger(t)) bus := event.NewBus(testhelpers.NewTestLogger(t))
ch1 := bus.Register(event.EventNameMediaServerStarted) ch1 := bus.Register()
ch2 := bus.Register(event.EventNameMediaServerStarted) ch2 := bus.Register()
evt := event.MediaServerStartedEvent{ evt := event.MediaServerStartedEvent{
RTMPURL: "rtmp://rtmp.example.com/live", RTMPURL: "rtmp://rtmp.example.com/live",

View File

@ -1,4 +1,4 @@
package domain package event
// CommandAddDestination adds a destination. // CommandAddDestination adds a destination.
type CommandAddDestination struct { type CommandAddDestination struct {
@ -41,6 +41,14 @@ func (c CommandStopDestination) Name() string {
return "stop_destination" return "stop_destination"
} }
// CommandCloseOtherInstance closes the other instance of the application.
type CommandCloseOtherInstance struct{}
// Name implements the Command interface.
func (c CommandCloseOtherInstance) Name() string {
return "close_other_instance"
}
// CommandQuit quits the app. // CommandQuit quits the app.
type CommandQuit struct{} type CommandQuit struct{}

114
internal/event/events.go Normal file
View File

@ -0,0 +1,114 @@
package event
import "git.netflux.io/rob/octoplex/internal/domain"
type Name string
const (
EventNameAppStateChanged Name = "app_state_changed"
EventNameDestinationAdded Name = "destination_added"
EventNameAddDestinationFailed Name = "add_destination_failed"
EventNameDestinationStreamExited Name = "destination_stream_exited"
EventNameStartDestinationFailed Name = "start_destination_failed"
EventNameDestinationRemoved Name = "destination_removed"
EventNameRemoveDestinationFailed Name = "remove_destination_failed"
EventNameFatalErrorOccurred Name = "fatal_error_occurred"
EventNameOtherInstanceDetected Name = "other_instance_detected"
EventNameMediaServerStarted Name = "media_server_started"
)
// Event represents something which happened in the appllication.
type Event interface {
name() Name
}
// AppStateChangedEvent is emitted when the application state changes.
type AppStateChangedEvent struct {
State domain.AppState
}
func (e AppStateChangedEvent) name() Name {
return EventNameAppStateChanged
}
// DestinationAddedEvent is emitted when a destination is successfully added.
type DestinationAddedEvent struct {
URL string
}
func (e DestinationAddedEvent) name() Name {
return EventNameDestinationAdded
}
// AddDestinationFailedEvent is emitted when a destination fails to be added.
type AddDestinationFailedEvent struct {
Err error
}
func (e AddDestinationFailedEvent) name() Name {
return EventNameAddDestinationFailed
}
// DestinationStreamExitedEvent is emitted when a destination goes off-air unexpectedly.
type DestinationStreamExitedEvent struct {
Name string
Err error
}
func (e DestinationStreamExitedEvent) name() Name {
return EventNameDestinationStreamExited
}
// StartDestinationFailedEvent is emitted when a destination fails to start.
type StartDestinationFailedEvent struct{}
func (e StartDestinationFailedEvent) name() Name {
return EventNameStartDestinationFailed
}
// DestinationRemovedEvent is emitted when a destination is successfully
// removed.
type DestinationRemovedEvent struct {
URL string
}
func (e DestinationRemovedEvent) name() Name {
return EventNameDestinationRemoved
}
// RemoveDestinationFailedEvent is emitted when a destination fails to be
// removed.
type RemoveDestinationFailedEvent struct {
Err error
}
func (e RemoveDestinationFailedEvent) name() Name {
return EventNameRemoveDestinationFailed
}
// FatalErrorOccurredEvent is emitted when a fatal application
// error occurs.
type FatalErrorOccurredEvent struct {
Message string
}
// OtherInstanceDetectedEvent is emitted when the app launches and detects another instance.
type OtherInstanceDetectedEvent struct{}
func (e OtherInstanceDetectedEvent) name() Name {
return EventNameOtherInstanceDetected
}
func (e FatalErrorOccurredEvent) name() Name {
return "fatal_error_occurred"
}
// MediaServerStartedEvent is emitted when the mediaserver component starts successfully.
type MediaServerStartedEvent struct {
RTMPURL string
RTMPSURL string
}
func (e MediaServerStartedEvent) name() Name {
return "media_server_started"
}

View File

@ -42,7 +42,7 @@ const (
// UI is responsible for managing the terminal user interface. // UI is responsible for managing the terminal user interface.
type UI struct { type UI struct {
eventBus *event.Bus eventBus *event.Bus
commandC chan domain.Command dispatch func(event.Command)
clipboardAvailable bool clipboardAvailable bool
configFilePath string configFilePath string
rtmpURL, rtmpsURL string rtmpURL, rtmpsURL string
@ -96,7 +96,7 @@ type ScreenCapture struct {
// interface. // interface.
type StartParams struct { type StartParams struct {
EventBus *event.Bus EventBus *event.Bus
ChanSize int Dispatcher func(event.Command)
Logger *slog.Logger Logger *slog.Logger
ClipboardAvailable bool ClipboardAvailable bool
ConfigFilePath string ConfigFilePath string
@ -104,13 +104,8 @@ type StartParams struct {
Screen *Screen // Screen may be nil. Screen *Screen // Screen may be nil.
} }
const defaultChanSize = 64
// StartUI starts the terminal user interface. // StartUI starts the terminal user interface.
func StartUI(ctx context.Context, params StartParams) (*UI, error) { func StartUI(ctx context.Context, params StartParams) (*UI, error) {
chanSize := cmp.Or(params.ChanSize, defaultChanSize)
commandCh := make(chan domain.Command, chanSize)
app := tview.NewApplication() app := tview.NewApplication()
var screen tcell.Screen var screen tcell.Screen
@ -213,8 +208,8 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
app.EnableMouse(false) app.EnableMouse(false)
ui := &UI{ ui := &UI{
commandC: commandCh,
eventBus: params.EventBus, eventBus: params.EventBus,
dispatch: params.Dispatcher,
clipboardAvailable: params.ClipboardAvailable, clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath, configFilePath: params.ConfigFilePath,
buildInfo: params.BuildInfo, buildInfo: params.BuildInfo,
@ -271,15 +266,13 @@ func (ui *UI) renderAboutView() {
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false) ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
} }
// C returns a channel that receives commands from the user interface.
func (ui *UI) C() <-chan domain.Command {
return ui.commandC
}
func (ui *UI) run(ctx context.Context) { func (ui *UI) run(ctx context.Context) {
defer close(ui.commandC) defer func() {
// Ensure the application is stopped when the UI is closed.
ui.dispatch(event.CommandQuit{})
}()
mediaServerStartedC := ui.eventBus.Register(event.EventNameMediaServerStarted) eventC := ui.eventBus.Register()
uiDone := make(chan struct{}) uiDone := make(chan struct{})
go func() { go func() {
@ -294,8 +287,34 @@ func (ui *UI) run(ctx context.Context) {
for { for {
select { select {
case evt := <-mediaServerStartedC: case evt := <-eventC:
ui.handleMediaServerStarted(evt.(event.MediaServerStartedEvent)) ui.app.QueueUpdateDraw(func() {
switch evt := evt.(type) {
case event.AppStateChangedEvent:
ui.handleAppStateChanged(evt)
case event.DestinationAddedEvent:
ui.handleDestinationAdded(evt)
case event.StartDestinationFailedEvent:
ui.handleStartDestinationFailed(evt)
case event.AddDestinationFailedEvent:
ui.handleDestinationEventError(evt.Err)
case event.DestinationStreamExitedEvent:
ui.handleDestinationStreamExited(evt)
case event.DestinationRemovedEvent:
ui.handleDestinationRemoved(evt)
case event.RemoveDestinationFailedEvent:
ui.handleDestinationEventError(evt.Err)
case event.OtherInstanceDetectedEvent:
ui.handleOtherInstanceDetected(evt)
case event.MediaServerStartedEvent:
ui.handleMediaServerStarted(evt)
case event.FatalErrorOccurredEvent:
ui.handleFatalErrorOccurred(evt)
default:
ui.logger.Warn("unhandled event", "event", evt)
}
})
case <-ctx.Done(): case <-ctx.Done():
return return
case <-uiDone: case <-uiDone:
@ -310,7 +329,7 @@ func (ui *UI) handleMediaServerStarted(evt event.MediaServerStartedEvent) {
ui.rtmpsURL = evt.RTMPSURL ui.rtmpsURL = evt.RTMPSURL
ui.mu.Unlock() ui.mu.Unlock()
ui.app.QueueUpdateDraw(ui.renderAboutView) ui.renderAboutView()
} }
func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey { func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
@ -381,28 +400,17 @@ func (ui *UI) fkeyHandler(key tcell.Key) {
} }
} }
func (ui *UI) ShowSourceNotLiveModal() { func (ui *UI) handleStartDestinationFailed(event.StartDestinationFailedEvent) {
ui.app.QueueUpdateDraw(func() {
ui.showModal( ui.showModal(
pageNameModalNotLive, pageNameModalStartDestinationFailed,
"Waiting for stream.\n\nStart streaming to a source URL then try again.", "Waiting for stream.\n\nStart streaming to a source URL then try again.",
[]string{"Ok"}, []string{"Ok"},
false, false,
nil, nil,
) )
})
} }
// ShowStartupCheckModal shows a modal dialog to the user, asking if they want func (ui *UI) handleOtherInstanceDetected(event.OtherInstanceDetectedEvent) {
// to kill a running instance of Octoplex.
//
// The method will block until the user has made a choice, after which the
// channel will receive true if the user wants to quit the other instance, or
// false to quit this instance.
func (ui *UI) ShowStartupCheckModal() bool {
done := make(chan bool)
ui.app.QueueUpdateDraw(func() {
ui.showModal( ui.showModal(
pageNameModalStartupCheck, pageNameModalStartupCheck,
"Another instance of Octoplex may already be running.\n\nPressing continue will close that instance. Continue?", "Another instance of Octoplex may already be running.\n\nPressing continue will close that instance. Continue?",
@ -410,50 +418,41 @@ func (ui *UI) ShowStartupCheckModal() bool {
false, false,
func(buttonIndex int, _ string) { func(buttonIndex int, _ string) {
if buttonIndex == 0 { if buttonIndex == 0 {
done <- true ui.dispatch(event.CommandCloseOtherInstance{})
} else { } else {
done <- false ui.dispatch(event.CommandQuit{})
} }
}, },
) )
})
return <-done
} }
func (ui *UI) ShowDestinationErrorModal(name string, err error) { func (ui *UI) handleDestinationStreamExited(evt event.DestinationStreamExitedEvent) {
ui.app.QueueUpdateDraw(func() {
ui.showModal( ui.showModal(
pageNameModalDestinationError, pageNameModalDestinationError,
fmt.Sprintf( fmt.Sprintf(
"Streaming to %s failed:\n\n%s", "Streaming to %s failed:\n\n%s",
cmp.Or(name, "this destination"), cmp.Or(evt.Name, "this destination"),
err, evt.Err,
), ),
[]string{"Ok"}, []string{"Ok"},
true, true,
nil, nil,
) )
})
} }
// ShowFatalErrorModal displays the provided error. It sends a CommandQuit to the func (ui *UI) handleFatalErrorOccurred(evt event.FatalErrorOccurredEvent) {
// command channel when the user selects the Quit button.
func (ui *UI) ShowFatalErrorModal(errString string) {
ui.app.QueueUpdateDraw(func() {
ui.showModal( ui.showModal(
pageNameModalFatalError, pageNameModalFatalError,
fmt.Sprintf( fmt.Sprintf(
"An error occurred:\n\n%s", "An error occurred:\n\n%s",
errString, evt.Message,
), ),
[]string{"Quit"}, []string{"Quit"},
false, false,
func(int, string) { func(int, string) {
ui.commandC <- domain.CommandQuit{} ui.dispatch(event.CommandQuit{})
}, },
) )
})
} }
func (ui *UI) afterDrawHandler(screen tcell.Screen) { func (ui *UI) afterDrawHandler(screen tcell.Screen) {
@ -484,11 +483,8 @@ func (ui *UI) captureScreen(screen tcell.Screen) {
} }
} }
// SetState sets the state of the terminal user interface. func (ui *UI) handleAppStateChanged(evt event.AppStateChangedEvent) {
func (ui *UI) SetState(state domain.AppState) { state := evt.State
if state.Source.ExitReason != "" {
ui.handleMediaServerClosed(state.Source.ExitReason)
}
ui.updatePullProgress(state) ui.updatePullProgress(state)
@ -500,10 +496,7 @@ func (ui *UI) SetState(state domain.AppState) {
ui.hasDestinations = len(state.Destinations) > 0 ui.hasDestinations = len(state.Destinations) > 0
ui.mu.Unlock() ui.mu.Unlock()
// The state is mutable so can't be passed into QueueUpdateDraw, which ui.redrawFromState(state)
// passes it to another goroutine, without cloning it first.
stateClone := state.Clone()
ui.app.QueueUpdateDraw(func() { ui.redrawFromState(stateClone) })
} }
func (ui *UI) updatePullProgress(state domain.AppState) { func (ui *UI) updatePullProgress(state domain.AppState) {
@ -524,9 +517,7 @@ func (ui *UI) updatePullProgress(state domain.AppState) {
} }
if len(pullingContainers) == 0 { if len(pullingContainers) == 0 {
ui.app.QueueUpdateDraw(func() {
ui.hideModal(pageNameModalPullProgress) ui.hideModal(pageNameModalPullProgress)
})
return return
} }
@ -540,7 +531,6 @@ func (ui *UI) updatePullProgress(state domain.AppState) {
} }
func (ui *UI) updateProgressModal(container domain.Container) { func (ui *UI) updateProgressModal(container domain.Container) {
ui.app.QueueUpdateDraw(func() {
modalName := string(pageNameModalPullProgress) modalName := string(pageNameModalPullProgress)
var status string var status string
@ -562,7 +552,6 @@ func (ui *UI) updateProgressModal(container domain.Container) {
} else { } else {
ui.pages.AddPage(modalName, ui.pullProgressModal, true, true) ui.pages.AddPage(modalName, ui.pullProgressModal, true, true)
} }
})
} }
// page names represent a specific page in the terminal user interface. // page names represent a specific page in the terminal user interface.
@ -583,8 +572,8 @@ const (
pageNameModalQuit = "modal-quit" pageNameModalQuit = "modal-quit"
pageNameModalRemoveDestination = "modal-remove-destination" pageNameModalRemoveDestination = "modal-remove-destination"
pageNameModalSourceError = "modal-source-error" pageNameModalSourceError = "modal-source-error"
pageNameModalStartDestinationFailed = "modal-start-destination-failed"
pageNameModalStartupCheck = "modal-startup-check" pageNameModalStartupCheck = "modal-startup-check"
pageNameModalNotLive = "modal-not-live"
) )
// modalVisible returns true if any modal, including the add destination form, // modalVisible returns true if any modal, including the add destination form,
@ -691,26 +680,6 @@ func (ui *UI) hideModal(pageName string) {
ui.app.SetFocus(ui.destView) ui.app.SetFocus(ui.destView)
} }
func (ui *UI) handleMediaServerClosed(exitReason string) {
ui.app.QueueUpdateDraw(func() {
if ui.pages.HasPage(pageNameModalSourceError) {
return
}
modal := tview.NewModal()
modal.SetText("Mediaserver error: " + exitReason).
AddButtons([]string{"Quit"}).
SetBackgroundColor(tcell.ColorBlack).
SetTextColor(tcell.ColorWhite).
SetDoneFunc(func(int, string) {
ui.commandC <- domain.CommandQuit{}
})
modal.SetBorderStyle(tcell.StyleDefault.Background(tcell.ColorBlack).Foreground(tcell.ColorWhite))
ui.pages.AddPage(pageNameModalSourceError, modal, true, true)
})
}
const dash = "—" const dash = "—"
const ( const (
@ -856,24 +825,6 @@ func (ui *UI) Close() {
ui.app.Stop() ui.app.Stop()
} }
func (ui *UI) ConfigUpdateFailed(err error) {
ui.app.QueueUpdateDraw(func() {
ui.showModal(
pageNameConfigUpdateFailed,
"Configuration update failed:\n\n"+err.Error(),
[]string{"Ok"},
false,
func(int, string) {
pageName, frontPage := ui.pages.GetFrontPage()
if pageName != pageNameAddDestination {
ui.logger.Warn("Unexpected page when configuration form closed", "page", pageName)
}
ui.app.SetFocus(frontPage)
},
)
})
}
func (ui *UI) addDestination() { func (ui *UI) addDestination() {
const ( const (
inputLen = 60 inputLen = 60
@ -893,10 +844,10 @@ func (ui *UI) addDestination() {
AddInputField(inputLabelName, "My stream", inputLen, nil, nil). AddInputField(inputLabelName, "My stream", inputLen, nil, nil).
AddInputField(inputLabelURL, "rtmp://", inputLen, nil, nil). AddInputField(inputLabelURL, "rtmp://", inputLen, nil, nil).
AddButton("Add", func() { AddButton("Add", func() {
ui.commandC <- domain.CommandAddDestination{ ui.dispatch(event.CommandAddDestination{
DestinationName: form.GetFormItemByLabel(inputLabelName).(*tview.InputField).GetText(), DestinationName: form.GetFormItemByLabel(inputLabelName).(*tview.InputField).GetText(),
URL: form.GetFormItemByLabel(inputLabelURL).(*tview.InputField).GetText(), URL: form.GetFormItemByLabel(inputLabelURL).(*tview.InputField).GetText(),
} })
}). }).
AddButton("Cancel", func() { AddButton("Cancel", func() {
ui.closeAddDestinationForm() ui.closeAddDestinationForm()
@ -951,30 +902,42 @@ func (ui *UI) removeDestination() {
false, false,
func(buttonIndex int, _ string) { func(buttonIndex int, _ string) {
if buttonIndex == 0 { if buttonIndex == 0 {
ui.commandC <- domain.CommandRemoveDestination{URL: url} ui.dispatch(event.CommandRemoveDestination{URL: url})
} }
}, },
) )
} }
// DestinationAdded should be called when a new destination is added. func (ui *UI) handleDestinationAdded(event.DestinationAddedEvent) {
func (ui *UI) DestinationAdded() {
ui.mu.Lock() ui.mu.Lock()
ui.hasDestinations = true ui.hasDestinations = true
ui.mu.Unlock() ui.mu.Unlock()
ui.app.QueueUpdateDraw(func() {
ui.pages.HidePage(pageNameNoDestinations) ui.pages.HidePage(pageNameNoDestinations)
ui.closeAddDestinationForm() ui.closeAddDestinationForm()
ui.selectLastDestination() ui.selectLastDestination()
})
} }
// DestinationRemoved should be called when a destination is removed. func (ui *UI) handleDestinationRemoved(event.DestinationRemovedEvent) {
func (ui *UI) DestinationRemoved() {
ui.selectPreviousDestination() ui.selectPreviousDestination()
} }
func (ui *UI) handleDestinationEventError(err error) {
ui.showModal(
pageNameConfigUpdateFailed,
"Configuration update failed:\n\n"+err.Error(),
[]string{"Ok"},
false,
func(int, string) {
pageName, frontPage := ui.pages.GetFrontPage()
if pageName != pageNameAddDestination {
ui.logger.Warn("Unexpected page when configuration form closed", "page", pageName)
}
ui.app.SetFocus(frontPage)
},
)
}
func (ui *UI) closeAddDestinationForm() { func (ui *UI) closeAddDestinationForm() {
var hasDestinations bool var hasDestinations bool
ui.mu.Lock() ui.mu.Lock()
@ -1015,12 +978,12 @@ func (ui *UI) toggleDestination() {
switch ss { switch ss {
case startStateNotStarted: case startStateNotStarted:
ui.urlsToStartState[url] = startStateStarting ui.urlsToStartState[url] = startStateStarting
ui.commandC <- domain.CommandStartDestination{URL: url} ui.dispatch(event.CommandStartDestination{URL: url})
case startStateStarting: case startStateStarting:
// do nothing // do nothing
return return
case startStateStarted: case startStateStarted:
ui.commandC <- domain.CommandStopDestination{URL: url} ui.dispatch(event.CommandStopDestination{URL: url})
} }
} }
@ -1073,7 +1036,7 @@ func (ui *UI) confirmQuit() {
false, false,
func(buttonIndex int, _ string) { func(buttonIndex int, _ string) {
if buttonIndex == 0 { if buttonIndex == 0 {
ui.commandC <- domain.CommandQuit{} ui.dispatch(event.CommandQuit{})
} }
}, },
) )

71
main.go
View File

@ -3,11 +3,14 @@ package main
import ( import (
"cmp" "cmp"
"context" "context"
"errors"
"flag" "flag"
"fmt" "fmt"
"io"
"log/slog" "log/slog"
"os" "os"
"os/exec" "os/exec"
"os/signal"
"runtime/debug" "runtime/debug"
"syscall" "syscall"
@ -27,16 +30,25 @@ var (
date string date string
) )
func main() { var errShutdown = errors.New("shutdown")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := run(ctx); err != nil { func main() {
var exitStatus int
if err := run(); errors.Is(err, errShutdown) {
exitStatus = 130
} else if err != nil {
exitStatus = 1
_, _ = os.Stderr.WriteString("Error: " + err.Error() + "\n") _, _ = os.Stderr.WriteString("Error: " + err.Error() + "\n")
} }
os.Exit(exitStatus)
} }
func run(ctx context.Context) error { func run() error {
ctx, cancel := context.WithCancelCause(context.Background())
defer cancel(nil)
configService, err := config.NewDefaultService() configService, err := config.NewDefaultService()
if err != nil { if err != nil {
return fmt.Errorf("build config service: %w", err) return fmt.Errorf("build config service: %w", err)
@ -72,11 +84,26 @@ func run(ctx context.Context) error {
if err != nil { if err != nil {
return fmt.Errorf("read or create config: %w", err) return fmt.Errorf("read or create config: %w", err)
} }
logger, err := buildLogger(cfg.LogFile)
headless := os.Getenv("OCTO_HEADLESS") != ""
logger, err := buildLogger(cfg.LogFile, headless)
if err != nil { if err != nil {
return fmt.Errorf("build logger: %w", err) return fmt.Errorf("build logger: %w", err)
} }
if headless {
// When running in headless mode tview doesn't handle SIGINT for us.
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-ch
logger.Info("Received interrupt signal, exiting")
signal.Stop(ch)
cancel(errShutdown)
}()
}
var clipboardAvailable bool var clipboardAvailable bool
if err = clipboard.Init(); err != nil { if err = clipboard.Init(); err != nil {
logger.Warn("Clipboard not available", "err", err) logger.Warn("Clipboard not available", "err", err)
@ -97,11 +124,10 @@ func run(ctx context.Context) error {
return fmt.Errorf("read build info: %w", err) return fmt.Errorf("read build info: %w", err)
} }
return app.Run( app := app.New(app.Params{
ctx,
app.RunParams{
ConfigService: configService, ConfigService: configService,
DockerClient: dockerClient, DockerClient: dockerClient,
Headless: headless,
ClipboardAvailable: clipboardAvailable, ClipboardAvailable: clipboardAvailable,
ConfigFilePath: configService.Path(), ConfigFilePath: configService.Path(),
BuildInfo: domain.BuildInfo{ BuildInfo: domain.BuildInfo{
@ -111,8 +137,9 @@ func run(ctx context.Context) error {
Date: date, Date: date,
}, },
Logger: logger, Logger: logger,
}, })
)
return app.Run(ctx)
} }
// editConfigFile opens the config file in the user's editor. // editConfigFile opens the config file in the user's editor.
@ -162,10 +189,24 @@ func printUsage() {
os.Stderr.WriteString("\n") os.Stderr.WriteString("\n")
os.Stderr.WriteString("Additionally, Octoplex can be configured with the following environment variables:\n\n") os.Stderr.WriteString("Additionally, Octoplex can be configured with the following environment variables:\n\n")
os.Stderr.WriteString(" OCTO_DEBUG Enables debug logging if set\n") os.Stderr.WriteString(" OCTO_DEBUG Enables debug logging if set\n")
os.Stderr.WriteString(" OCTO_HEADLESS Enables headless mode if set (experimental)\n\n")
} }
// buildLogger builds the logger, which may be a no-op logger. // buildLogger builds the logger, which may be a no-op logger.
func buildLogger(cfg config.LogFile) (*slog.Logger, error) { func buildLogger(cfg config.LogFile, headless bool) (*slog.Logger, error) {
build := func(w io.Writer) *slog.Logger {
var handlerOpts slog.HandlerOptions
if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
}
return slog.New(slog.NewTextHandler(w, &handlerOpts))
}
// In headless mode, always log to stderr.
if headless {
return build(os.Stderr), nil
}
if !cfg.Enabled { if !cfg.Enabled {
return slog.New(slog.DiscardHandler), nil return slog.New(slog.DiscardHandler), nil
} }
@ -175,9 +216,5 @@ func buildLogger(cfg config.LogFile) (*slog.Logger, error) {
return nil, fmt.Errorf("error opening log file: %w", err) return nil, fmt.Errorf("error opening log file: %w", err)
} }
var handlerOpts slog.HandlerOptions return build(fptr), nil
if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
}
return slog.New(slog.NewTextHandler(fptr, &handlerOpts)), nil
} }