Compare commits

..

20 Commits

Author SHA1 Message Date
7566c207b0 fixup! build: add woodpecker
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
2025-05-04 20:13:18 +02:00
53d52b3673 fixup! build: add woodpecker 2025-05-04 19:49:19 +02:00
3e2954f33b fixup! build: add woodpecker 2025-05-04 19:48:08 +02:00
586a376405 build: add woodpecker 2025-05-04 19:44:42 +02:00
750e9432be refactor(app): add Dispatch method 2025-04-30 22:17:19 +02:00
d313c1e020 doc: update README 2025-04-29 22:39:46 +02:00
812d3901d3 refactor(app): internalize dispatch channel 2025-04-29 22:37:05 +02:00
caa543703e refactor: extract commands from domain 2025-04-28 06:32:00 +02:00
8403d751b6 doc: add godoc
Some checks failed
ci-build / lint (push) Has been cancelled
ci-scan / Analyze (go) (push) Has been cancelled
ci-scan / Analyze (actions) (push) Has been cancelled
ci-build / build (push) Has been cancelled
ci-build / release (push) Has been cancelled
2025-04-25 19:19:54 +02:00
2f1cadcf40 refactor(app): add DestinationWentOffAirEvent 2025-04-25 18:10:22 +02:00
1f4a931903 fix(app): event ordering
Some checks are pending
ci-build / lint (push) Waiting to run
ci-build / build (push) Blocked by required conditions
ci-build / release (push) Blocked by required conditions
ci-scan / Analyze (go) (push) Waiting to run
ci-scan / Analyze (actions) (push) Waiting to run
Use a single channel per consumer, instead of one channel per
consumer/event tuple. This ensures that overall ordering of events
remains consistent, and avoids introducing subtle race conditions.
2025-04-25 17:42:49 +02:00
94623248c0 refactor(app): async startup check 2025-04-25 17:42:49 +02:00
4a16780915 fixup! refactor: add event bus 2025-04-25 04:46:32 +02:00
3019387f38 refactor(app): add StartDestinationFailedEvent 2025-04-25 04:44:36 +02:00
f4021a2886 refactor(app): add destination error events 2025-04-24 21:45:53 +02:00
b8550f050b refactor(app): add AppStateChangedEvent 2025-04-24 21:45:53 +02:00
c02a66202f doc: update README 2025-04-24 21:45:53 +02:00
4029c66a4a refactor(app): extract more events 2025-04-24 21:45:53 +02:00
cdf41e47c3 refactor(app): extract handleCommand 2025-04-22 16:28:38 +02:00
d7f8fb49eb refactor(app): add App type 2025-04-22 16:06:37 +02:00
6 changed files with 77 additions and 124 deletions

View File

@ -40,25 +40,12 @@ brews:
system "#{bin}/octoplex -h"
release:
draft: true
github:
owner: rfwatson
name: octoplex
changelog:
use: github
groups:
- title: New Features
regexp: '^.*?feat(\([[:word:]]+\))??!?:.+$'
order: 0
- title: "Bug fixes"
regexp: '^.*?fix(\([[:word:]]+\))??!?:.+$'
order: 1
- title: "Refactorings"
regexp: '^.*?refactor(\([[:word:]]+\))??!?:.+$'
order: 2
- title: Others
order: 999
filters:
exclude:
- "^doc:"

9
.woodpecker.yml Normal file
View File

@ -0,0 +1,9 @@
---
when:
- event: push
steps:
- name: lint
image: koalaman/shellcheck:stable
commands:
- shellcheck $(find . -type f -name "*.sh")

View File

@ -27,7 +27,6 @@ type App struct {
dispatchC chan event.Command
dockerClient container.DockerClient
screen *terminal.Screen // Screen may be nil.
headless bool
clipboardAvailable bool
configFilePath string
buildInfo domain.BuildInfo
@ -40,7 +39,6 @@ type Params struct {
DockerClient container.DockerClient
ChanSize int
Screen *terminal.Screen // Screen may be nil.
Headless bool
ClipboardAvailable bool
ConfigFilePath string
BuildInfo domain.BuildInfo
@ -59,7 +57,6 @@ func New(params Params) *App {
dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)),
dockerClient: params.DockerClient,
screen: params.Screen,
headless: params.Headless,
clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath,
buildInfo: params.BuildInfo,
@ -78,21 +75,19 @@ func (a *App) Run(ctx context.Context) error {
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
}
if !a.headless {
ui, err := terminal.StartUI(ctx, terminal.StartParams{
EventBus: a.eventBus,
Dispatcher: func(cmd event.Command) { a.dispatchC <- cmd },
Screen: a.screen,
ClipboardAvailable: a.clipboardAvailable,
ConfigFilePath: a.configFilePath,
BuildInfo: a.buildInfo,
Logger: a.logger.With("component", "ui"),
})
if err != nil {
return fmt.Errorf("start terminal user interface: %w", err)
}
defer ui.Close()
ui, err := terminal.StartUI(ctx, terminal.StartParams{
EventBus: a.eventBus,
Dispatcher: func(cmd event.Command) { a.dispatchC <- cmd },
Screen: a.screen,
ClipboardAvailable: a.clipboardAvailable,
ConfigFilePath: a.configFilePath,
BuildInfo: a.buildInfo,
Logger: a.logger.With("component", "ui"),
})
if err != nil {
return fmt.Errorf("start terminal user interface: %w", err)
}
defer ui.Close()
// emptyUI is a dummy function that sets the UI state to an empty state, and
// re-renders the screen.
@ -107,19 +102,6 @@ func (a *App) Run(ctx context.Context) error {
a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}})
}
// doFatalError publishes a fatal error to the event bus, waiting for the
// user to acknowledge it if not in headless mode.
doFatalError := func(msg string) {
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
if a.headless {
return
}
emptyUI()
<-a.dispatchC
}
containerClient, err := container.NewClient(ctx, a.dockerClient, a.logger.With("component", "container_client"))
if err != nil {
err = fmt.Errorf("create container client: %w", err)
@ -130,7 +112,10 @@ func (a *App) Run(ctx context.Context) error {
} else {
msg = err.Error()
}
doFatalError(msg)
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
emptyUI()
<-a.dispatchC
return err
}
defer containerClient.Close()
@ -160,7 +145,9 @@ func (a *App) Run(ctx context.Context) error {
})
if err != nil {
err = fmt.Errorf("create mediaserver: %w", err)
doFatalError(err.Error())
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: err.Error()})
emptyUI()
<-a.dispatchC
return err
}
defer srv.Close()
@ -177,21 +164,17 @@ func (a *App) Run(ctx context.Context) error {
defer uiUpdateT.Stop()
startMediaServerC := make(chan struct{}, 1)
if a.headless { // disable startup check in headless mode for now
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
startupErr = fmt.Errorf("startup check: %w", startupErr)
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: startupErr.Error()})
<-a.dispatchC
return startupErr
} else if ok {
startMediaServerC <- struct{}{}
} else {
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
doFatalError(startupErr.Error())
return startupErr
} else if ok {
startMediaServerC <- struct{}{}
}
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-startMediaServerC:
if err = srv.Start(ctx); err != nil {
return fmt.Errorf("start mediaserver: %w", err)
@ -210,12 +193,6 @@ func (a *App) Run(ctx context.Context) error {
updateUI()
case serverState := <-srv.C():
a.logger.Debug("Server state received", "state", serverState)
if serverState.ExitReason != "" {
doFatalError(serverState.ExitReason)
return errors.New("media server exited")
}
applyServerState(serverState, state)
updateUI()
case replState := <-repl.C():

View File

@ -132,7 +132,7 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(app.Params{
require.NoError(t, app.New(app.Params{
ConfigService: configService,
DockerClient: dockerClient,
Screen: &terminal.Screen{
@ -319,7 +319,7 @@ func TestIntegrationCustomHost(t *testing.T) {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
time.Sleep(time.Second)
@ -390,7 +390,7 @@ func TestIntegrationCustomTLSCerts(t *testing.T) {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT(
@ -471,7 +471,7 @@ func TestIntegrationRestartDestination(t *testing.T) {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT(
@ -608,7 +608,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT(
@ -681,7 +681,7 @@ func TestIntegrationDestinationValidations(t *testing.T) {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT(
@ -823,7 +823,7 @@ func TestIntegrationStartupCheck(t *testing.T) {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT(
@ -892,17 +892,13 @@ func TestIntegrationMediaServerError(t *testing.T) {
done <- struct{}{}
}()
require.EqualError(
t,
app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx),
"media server exited",
)
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT(
t,
func(c *assert.CollectT) {
assert.True(c, contentsIncludes(getContents(), "Server process exited unexpectedly."), "expected to see title")
assert.True(c, contentsIncludes(getContents(), "Mediaserver error: Server process exited unexpectedly."), "expected to see title")
assert.True(c, contentsIncludes(getContents(), "address already in use"), "expected to see message")
},
waitTime,
@ -1073,7 +1069,7 @@ func TestIntegrationCopyURLs(t *testing.T) {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
require.NoError(t, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
time.Sleep(3 * time.Second)

View File

@ -486,6 +486,10 @@ func (ui *UI) captureScreen(screen tcell.Screen) {
func (ui *UI) handleAppStateChanged(evt event.AppStateChangedEvent) {
state := evt.State
if state.Source.ExitReason != "" {
ui.handleMediaServerClosed(state.Source.ExitReason)
}
ui.updatePullProgress(state)
ui.mu.Lock()
@ -680,6 +684,24 @@ func (ui *UI) hideModal(pageName string) {
ui.app.SetFocus(ui.destView)
}
func (ui *UI) handleMediaServerClosed(exitReason string) {
if ui.pages.HasPage(pageNameModalSourceError) {
return
}
modal := tview.NewModal()
modal.SetText("Mediaserver error: " + exitReason).
AddButtons([]string{"Quit"}).
SetBackgroundColor(tcell.ColorBlack).
SetTextColor(tcell.ColorWhite).
SetDoneFunc(func(int, string) {
ui.dispatch(event.CommandQuit{})
})
modal.SetBorderStyle(tcell.StyleDefault.Background(tcell.ColorBlack).Foreground(tcell.ColorWhite))
ui.pages.AddPage(pageNameModalSourceError, modal, true, true)
}
const dash = "—"
const (

60
main.go
View File

@ -3,14 +3,11 @@ package main
import (
"cmp"
"context"
"errors"
"flag"
"fmt"
"io"
"log/slog"
"os"
"os/exec"
"os/signal"
"runtime/debug"
"syscall"
@ -30,25 +27,16 @@ var (
date string
)
var errShutdown = errors.New("shutdown")
func main() {
var exitStatus int
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := run(); errors.Is(err, errShutdown) {
exitStatus = 130
} else if err != nil {
exitStatus = 1
if err := run(ctx); err != nil {
_, _ = os.Stderr.WriteString("Error: " + err.Error() + "\n")
}
os.Exit(exitStatus)
}
func run() error {
ctx, cancel := context.WithCancelCause(context.Background())
defer cancel(nil)
func run(ctx context.Context) error {
configService, err := config.NewDefaultService()
if err != nil {
return fmt.Errorf("build config service: %w", err)
@ -84,26 +72,11 @@ func run() error {
if err != nil {
return fmt.Errorf("read or create config: %w", err)
}
headless := os.Getenv("OCTO_HEADLESS") != ""
logger, err := buildLogger(cfg.LogFile, headless)
logger, err := buildLogger(cfg.LogFile)
if err != nil {
return fmt.Errorf("build logger: %w", err)
}
if headless {
// When running in headless mode tview doesn't handle SIGINT for us.
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-ch
logger.Info("Received interrupt signal, exiting")
signal.Stop(ch)
cancel(errShutdown)
}()
}
var clipboardAvailable bool
if err = clipboard.Init(); err != nil {
logger.Warn("Clipboard not available", "err", err)
@ -127,7 +100,6 @@ func run() error {
app := app.New(app.Params{
ConfigService: configService,
DockerClient: dockerClient,
Headless: headless,
ClipboardAvailable: clipboardAvailable,
ConfigFilePath: configService.Path(),
BuildInfo: domain.BuildInfo{
@ -189,24 +161,10 @@ func printUsage() {
os.Stderr.WriteString("\n")
os.Stderr.WriteString("Additionally, Octoplex can be configured with the following environment variables:\n\n")
os.Stderr.WriteString(" OCTO_DEBUG Enables debug logging if set\n")
os.Stderr.WriteString(" OCTO_HEADLESS Enables headless mode if set (experimental)\n\n")
}
// buildLogger builds the logger, which may be a no-op logger.
func buildLogger(cfg config.LogFile, headless bool) (*slog.Logger, error) {
build := func(w io.Writer) *slog.Logger {
var handlerOpts slog.HandlerOptions
if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
}
return slog.New(slog.NewTextHandler(w, &handlerOpts))
}
// In headless mode, always log to stderr.
if headless {
return build(os.Stderr), nil
}
func buildLogger(cfg config.LogFile) (*slog.Logger, error) {
if !cfg.Enabled {
return slog.New(slog.DiscardHandler), nil
}
@ -216,5 +174,9 @@ func buildLogger(cfg config.LogFile, headless bool) (*slog.Logger, error) {
return nil, fmt.Errorf("error opening log file: %w", err)
}
return build(fptr), nil
var handlerOpts slog.HandlerOptions
if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
}
return slog.New(slog.NewTextHandler(fptr, &handlerOpts)), nil
}