Compare commits

..

18 Commits

Author SHA1 Message Date
d4e136c8d6 feat: headless mode (experimental) 2025-05-06 01:19:47 +02:00
2ab48bfc93 build: update .goreleaser.yaml 2025-05-06 01:11:18 +02:00
2bdb5335dc refactor(app): add Dispatch method 2025-05-06 01:11:18 +02:00
4be90993ef doc: update README 2025-05-06 01:11:18 +02:00
d1efffc937 refactor(app): internalize dispatch channel 2025-05-06 01:11:18 +02:00
d3171c6251 refactor: extract commands from domain 2025-05-06 01:11:18 +02:00
d9dbb7fc8f doc: add godoc 2025-05-06 01:11:18 +02:00
8c4974e0c1 refactor(app): add DestinationWentOffAirEvent 2025-05-06 01:11:18 +02:00
6bcd5d05c7 fix(app): event ordering
Use a single channel per consumer, instead of one channel per
consumer/event tuple. This ensures that overall ordering of events
remains consistent, and avoids introducing subtle race conditions.
2025-05-06 01:11:18 +02:00
c6ce8be5b1 refactor(app): async startup check 2025-05-06 01:11:18 +02:00
9c16275207 refactor(app): add StartDestinationFailedEvent 2025-05-06 01:11:18 +02:00
a80e891b75 refactor(app): add destination error events 2025-05-06 01:11:18 +02:00
9e2f6649eb refactor(app): add AppStateChangedEvent 2025-05-06 01:11:18 +02:00
81679be6c3 doc: update README 2025-05-06 01:11:18 +02:00
de0ecb1f34 refactor(app): extract more events 2025-05-06 01:11:18 +02:00
d96d26c29c refactor(app): extract handleCommand 2025-05-06 01:11:18 +02:00
4a2857e310 refactor(app): add App type 2025-05-06 01:11:18 +02:00
e8be872047 chore: remove stray file 2025-05-06 01:10:33 +02:00
6 changed files with 125 additions and 78 deletions

View File

@ -40,12 +40,25 @@ brews:
system "#{bin}/octoplex -h" system "#{bin}/octoplex -h"
release: release:
draft: true
github: github:
owner: rfwatson owner: rfwatson
name: octoplex name: octoplex
changelog: changelog:
use: github use: github
groups:
- title: New Features
regexp: '^.*?feat(\([[:word:]]+\))??!?:.+$'
order: 0
- title: "Bug fixes"
regexp: '^.*?fix(\([[:word:]]+\))??!?:.+$'
order: 1
- title: "Refactorings"
regexp: '^.*?refactor(\([[:word:]]+\))??!?:.+$'
order: 2
- title: Others
order: 999
filters: filters:
exclude: exclude:
- "^doc:" - "^doc:"

View File

@ -1,9 +0,0 @@
---
when:
- event: push
steps:
- name: lint
image: koalaman/shellcheck:stable
commands:
- shellcheck $(find . -type f -name "*.sh")

View File

@ -27,6 +27,7 @@ type App struct {
dispatchC chan event.Command dispatchC chan event.Command
dockerClient container.DockerClient dockerClient container.DockerClient
screen *terminal.Screen // Screen may be nil. screen *terminal.Screen // Screen may be nil.
headless bool
clipboardAvailable bool clipboardAvailable bool
configFilePath string configFilePath string
buildInfo domain.BuildInfo buildInfo domain.BuildInfo
@ -39,6 +40,7 @@ type Params struct {
DockerClient container.DockerClient DockerClient container.DockerClient
ChanSize int ChanSize int
Screen *terminal.Screen // Screen may be nil. Screen *terminal.Screen // Screen may be nil.
Headless bool
ClipboardAvailable bool ClipboardAvailable bool
ConfigFilePath string ConfigFilePath string
BuildInfo domain.BuildInfo BuildInfo domain.BuildInfo
@ -57,6 +59,7 @@ func New(params Params) *App {
dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)), dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)),
dockerClient: params.DockerClient, dockerClient: params.DockerClient,
screen: params.Screen, screen: params.Screen,
headless: params.Headless,
clipboardAvailable: params.ClipboardAvailable, clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath, configFilePath: params.ConfigFilePath,
buildInfo: params.BuildInfo, buildInfo: params.BuildInfo,
@ -75,19 +78,21 @@ func (a *App) Run(ctx context.Context) error {
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set") return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
} }
ui, err := terminal.StartUI(ctx, terminal.StartParams{ if !a.headless {
EventBus: a.eventBus, ui, err := terminal.StartUI(ctx, terminal.StartParams{
Dispatcher: func(cmd event.Command) { a.dispatchC <- cmd }, EventBus: a.eventBus,
Screen: a.screen, Dispatcher: func(cmd event.Command) { a.dispatchC <- cmd },
ClipboardAvailable: a.clipboardAvailable, Screen: a.screen,
ConfigFilePath: a.configFilePath, ClipboardAvailable: a.clipboardAvailable,
BuildInfo: a.buildInfo, ConfigFilePath: a.configFilePath,
Logger: a.logger.With("component", "ui"), BuildInfo: a.buildInfo,
}) Logger: a.logger.With("component", "ui"),
if err != nil { })
return fmt.Errorf("start terminal user interface: %w", err) if err != nil {
return fmt.Errorf("start terminal user interface: %w", err)
}
defer ui.Close()
} }
defer ui.Close()
// emptyUI is a dummy function that sets the UI state to an empty state, and // emptyUI is a dummy function that sets the UI state to an empty state, and
// re-renders the screen. // re-renders the screen.
@ -102,6 +107,19 @@ func (a *App) Run(ctx context.Context) error {
a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}}) a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}})
} }
// doFatalError publishes a fatal error to the event bus, waiting for the
// user to acknowledge it if not in headless mode.
doFatalError := func(msg string) {
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
if a.headless {
return
}
emptyUI()
<-a.dispatchC
}
containerClient, err := container.NewClient(ctx, a.dockerClient, a.logger.With("component", "container_client")) containerClient, err := container.NewClient(ctx, a.dockerClient, a.logger.With("component", "container_client"))
if err != nil { if err != nil {
err = fmt.Errorf("create container client: %w", err) err = fmt.Errorf("create container client: %w", err)
@ -112,10 +130,7 @@ func (a *App) Run(ctx context.Context) error {
} else { } else {
msg = err.Error() msg = err.Error()
} }
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg}) doFatalError(msg)
emptyUI()
<-a.dispatchC
return err return err
} }
defer containerClient.Close() defer containerClient.Close()
@ -145,9 +160,7 @@ func (a *App) Run(ctx context.Context) error {
}) })
if err != nil { if err != nil {
err = fmt.Errorf("create mediaserver: %w", err) err = fmt.Errorf("create mediaserver: %w", err)
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: err.Error()}) doFatalError(err.Error())
emptyUI()
<-a.dispatchC
return err return err
} }
defer srv.Close() defer srv.Close()
@ -164,17 +177,21 @@ func (a *App) Run(ctx context.Context) error {
defer uiUpdateT.Stop() defer uiUpdateT.Stop()
startMediaServerC := make(chan struct{}, 1) startMediaServerC := make(chan struct{}, 1)
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil { if a.headless { // disable startup check in headless mode for now
startupErr = fmt.Errorf("startup check: %w", startupErr)
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: startupErr.Error()})
<-a.dispatchC
return startupErr
} else if ok {
startMediaServerC <- struct{}{} startMediaServerC <- struct{}{}
} else {
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
doFatalError(startupErr.Error())
return startupErr
} else if ok {
startMediaServerC <- struct{}{}
}
} }
for { for {
select { select {
case <-ctx.Done():
return ctx.Err()
case <-startMediaServerC: case <-startMediaServerC:
if err = srv.Start(ctx); err != nil { if err = srv.Start(ctx); err != nil {
return fmt.Errorf("start mediaserver: %w", err) return fmt.Errorf("start mediaserver: %w", err)
@ -193,6 +210,12 @@ func (a *App) Run(ctx context.Context) error {
updateUI() updateUI()
case serverState := <-srv.C(): case serverState := <-srv.C():
a.logger.Debug("Server state received", "state", serverState) a.logger.Debug("Server state received", "state", serverState)
if serverState.ExitReason != "" {
doFatalError(serverState.ExitReason)
return errors.New("media server exited")
}
applyServerState(serverState, state) applyServerState(serverState, state)
updateUI() updateUI()
case replState := <-repl.C(): case replState := <-repl.C():

View File

@ -132,7 +132,7 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.New(app.Params{ require.Equal(t, context.Canceled, app.New(app.Params{
ConfigService: configService, ConfigService: configService,
DockerClient: dockerClient, DockerClient: dockerClient,
Screen: &terminal.Screen{ Screen: &terminal.Screen{
@ -319,7 +319,7 @@ func TestIntegrationCustomHost(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
time.Sleep(time.Second) time.Sleep(time.Second)
@ -390,7 +390,7 @@ func TestIntegrationCustomTLSCerts(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -471,7 +471,7 @@ func TestIntegrationRestartDestination(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -608,7 +608,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -681,7 +681,7 @@ func TestIntegrationDestinationValidations(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -823,7 +823,7 @@ func TestIntegrationStartupCheck(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
require.EventuallyWithT( require.EventuallyWithT(
@ -892,13 +892,17 @@ func TestIntegrationMediaServerError(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)) require.EqualError(
t,
app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx),
"media server exited",
)
}() }()
require.EventuallyWithT( require.EventuallyWithT(
t, t,
func(c *assert.CollectT) { func(c *assert.CollectT) {
assert.True(c, contentsIncludes(getContents(), "Mediaserver error: Server process exited unexpectedly."), "expected to see title") assert.True(c, contentsIncludes(getContents(), "Server process exited unexpectedly."), "expected to see title")
assert.True(c, contentsIncludes(getContents(), "address already in use"), "expected to see message") assert.True(c, contentsIncludes(getContents(), "address already in use"), "expected to see message")
}, },
waitTime, waitTime,
@ -1069,7 +1073,7 @@ func TestIntegrationCopyURLs(t *testing.T) {
done <- struct{}{} done <- struct{}{}
}() }()
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)) require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}() }()
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)

View File

@ -486,10 +486,6 @@ func (ui *UI) captureScreen(screen tcell.Screen) {
func (ui *UI) handleAppStateChanged(evt event.AppStateChangedEvent) { func (ui *UI) handleAppStateChanged(evt event.AppStateChangedEvent) {
state := evt.State state := evt.State
if state.Source.ExitReason != "" {
ui.handleMediaServerClosed(state.Source.ExitReason)
}
ui.updatePullProgress(state) ui.updatePullProgress(state)
ui.mu.Lock() ui.mu.Lock()
@ -684,24 +680,6 @@ func (ui *UI) hideModal(pageName string) {
ui.app.SetFocus(ui.destView) ui.app.SetFocus(ui.destView)
} }
func (ui *UI) handleMediaServerClosed(exitReason string) {
if ui.pages.HasPage(pageNameModalSourceError) {
return
}
modal := tview.NewModal()
modal.SetText("Mediaserver error: " + exitReason).
AddButtons([]string{"Quit"}).
SetBackgroundColor(tcell.ColorBlack).
SetTextColor(tcell.ColorWhite).
SetDoneFunc(func(int, string) {
ui.dispatch(event.CommandQuit{})
})
modal.SetBorderStyle(tcell.StyleDefault.Background(tcell.ColorBlack).Foreground(tcell.ColorWhite))
ui.pages.AddPage(pageNameModalSourceError, modal, true, true)
}
const dash = "—" const dash = "—"
const ( const (

62
main.go
View File

@ -3,11 +3,14 @@ package main
import ( import (
"cmp" "cmp"
"context" "context"
"errors"
"flag" "flag"
"fmt" "fmt"
"io"
"log/slog" "log/slog"
"os" "os"
"os/exec" "os/exec"
"os/signal"
"runtime/debug" "runtime/debug"
"syscall" "syscall"
@ -27,16 +30,25 @@ var (
date string date string
) )
func main() { var errShutdown = errors.New("shutdown")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := run(ctx); err != nil { func main() {
var exitStatus int
if err := run(); errors.Is(err, errShutdown) {
exitStatus = 130
} else if err != nil {
exitStatus = 1
_, _ = os.Stderr.WriteString("Error: " + err.Error() + "\n") _, _ = os.Stderr.WriteString("Error: " + err.Error() + "\n")
} }
os.Exit(exitStatus)
} }
func run(ctx context.Context) error { func run() error {
ctx, cancel := context.WithCancelCause(context.Background())
defer cancel(nil)
configService, err := config.NewDefaultService() configService, err := config.NewDefaultService()
if err != nil { if err != nil {
return fmt.Errorf("build config service: %w", err) return fmt.Errorf("build config service: %w", err)
@ -72,11 +84,26 @@ func run(ctx context.Context) error {
if err != nil { if err != nil {
return fmt.Errorf("read or create config: %w", err) return fmt.Errorf("read or create config: %w", err)
} }
logger, err := buildLogger(cfg.LogFile)
headless := os.Getenv("OCTO_HEADLESS") != ""
logger, err := buildLogger(cfg.LogFile, headless)
if err != nil { if err != nil {
return fmt.Errorf("build logger: %w", err) return fmt.Errorf("build logger: %w", err)
} }
if headless {
// When running in headless mode tview doesn't handle SIGINT for us.
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-ch
logger.Info("Received interrupt signal, exiting")
signal.Stop(ch)
cancel(errShutdown)
}()
}
var clipboardAvailable bool var clipboardAvailable bool
if err = clipboard.Init(); err != nil { if err = clipboard.Init(); err != nil {
logger.Warn("Clipboard not available", "err", err) logger.Warn("Clipboard not available", "err", err)
@ -100,6 +127,7 @@ func run(ctx context.Context) error {
app := app.New(app.Params{ app := app.New(app.Params{
ConfigService: configService, ConfigService: configService,
DockerClient: dockerClient, DockerClient: dockerClient,
Headless: headless,
ClipboardAvailable: clipboardAvailable, ClipboardAvailable: clipboardAvailable,
ConfigFilePath: configService.Path(), ConfigFilePath: configService.Path(),
BuildInfo: domain.BuildInfo{ BuildInfo: domain.BuildInfo{
@ -161,10 +189,24 @@ func printUsage() {
os.Stderr.WriteString("\n") os.Stderr.WriteString("\n")
os.Stderr.WriteString("Additionally, Octoplex can be configured with the following environment variables:\n\n") os.Stderr.WriteString("Additionally, Octoplex can be configured with the following environment variables:\n\n")
os.Stderr.WriteString(" OCTO_DEBUG Enables debug logging if set\n") os.Stderr.WriteString(" OCTO_DEBUG Enables debug logging if set\n")
os.Stderr.WriteString(" OCTO_HEADLESS Enables headless mode if set (experimental)\n\n")
} }
// buildLogger builds the logger, which may be a no-op logger. // buildLogger builds the logger, which may be a no-op logger.
func buildLogger(cfg config.LogFile) (*slog.Logger, error) { func buildLogger(cfg config.LogFile, headless bool) (*slog.Logger, error) {
build := func(w io.Writer) *slog.Logger {
var handlerOpts slog.HandlerOptions
if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
}
return slog.New(slog.NewTextHandler(w, &handlerOpts))
}
// In headless mode, always log to stderr.
if headless {
return build(os.Stderr), nil
}
if !cfg.Enabled { if !cfg.Enabled {
return slog.New(slog.DiscardHandler), nil return slog.New(slog.DiscardHandler), nil
} }
@ -174,9 +216,5 @@ func buildLogger(cfg config.LogFile) (*slog.Logger, error) {
return nil, fmt.Errorf("error opening log file: %w", err) return nil, fmt.Errorf("error opening log file: %w", err)
} }
var handlerOpts slog.HandlerOptions return build(fptr), nil
if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
}
return slog.New(slog.NewTextHandler(fptr, &handlerOpts)), nil
} }