Compare commits

..

No commits in common. "refactor/api" and "main" have entirely different histories.

25 changed files with 416 additions and 4034 deletions

10
go.mod
View File

@ -11,11 +11,7 @@ require (
github.com/rivo/tview v0.0.0-20250330220935-949945f8d922
github.com/stretchr/testify v1.10.0
github.com/testcontainers/testcontainers-go v0.35.0
github.com/urfave/cli/v2 v2.27.6
golang.design/x/clipboard v0.7.0
golang.org/x/sync v0.13.0
google.golang.org/grpc v1.69.4
google.golang.org/protobuf v1.36.3
gopkg.in/yaml.v3 v3.0.1
)
@ -28,7 +24,6 @@ require (
github.com/containerd/log v0.1.0 // indirect
github.com/containerd/platforms v0.2.1 // indirect
github.com/cpuguy83/dockercfg v0.3.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
@ -70,7 +65,6 @@ require (
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rs/zerolog v1.33.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.7.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
@ -87,7 +81,6 @@ require (
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/vektra/mockery/v2 v2.52.2 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
@ -102,13 +95,12 @@ require (
golang.org/x/image v0.26.0 // indirect
golang.org/x/mobile v0.0.0-20250408133729-978277e7eaf7 // indirect
golang.org/x/mod v0.24.0 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/sync v0.13.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/term v0.31.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/time v0.9.0 // indirect
golang.org/x/tools v0.32.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

9
go.sum
View File

@ -18,8 +18,6 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA=
github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc=
github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -54,8 +52,6 @@ github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiU
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
@ -144,7 +140,6 @@ github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWN
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo=
github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k=
@ -190,12 +185,8 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/urfave/cli/v2 v2.27.6 h1:VdRdS98FNhKZ8/Az8B7MTyGQmpIr36O1EHybx/LaZ4g=
github.com/urfave/cli/v2 v2.27.6/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/vektra/mockery/v2 v2.52.2 h1:8QfPKUIrq8P3Cs7G79Iu4Byd5wdhGCE0quIS27x7rQo=
github.com/vektra/mockery/v2 v2.52.2/go.mod h1:zGDY/f6bip0Yh13GQ5j7xa43fuEoYBa4ICHEaihisHw=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=

View File

@ -1,13 +1,11 @@
package server
package app
import (
"cmp"
"context"
"errors"
"fmt"
"log"
"log/slog"
"net"
"slices"
"time"
@ -15,32 +13,38 @@ import (
"git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/mediaserver"
"git.netflux.io/rob/octoplex/internal/replicator"
"git.netflux.io/rob/octoplex/internal/terminal"
"github.com/docker/docker/client"
"google.golang.org/grpc"
)
// App is an instance of the app.
type App struct {
cfg config.Config
configService *config.Service
eventBus *event.Bus
dispatchC chan event.Command
dockerClient container.DockerClient
waitForClient bool
logger *slog.Logger
cfg config.Config
configService *config.Service
eventBus *event.Bus
dispatchC chan event.Command
dockerClient container.DockerClient
screen *terminal.Screen // Screen may be nil.
headless bool
clipboardAvailable bool
configFilePath string
buildInfo domain.BuildInfo
logger *slog.Logger
}
// Params holds the parameters for running the application.
type Params struct {
ConfigService *config.Service
DockerClient container.DockerClient
ChanSize int
ConfigFilePath string
WaitForClient bool
Logger *slog.Logger
ConfigService *config.Service
DockerClient container.DockerClient
ChanSize int
Screen *terminal.Screen // Screen may be nil.
Headless bool
ClipboardAvailable bool
ConfigFilePath string
BuildInfo domain.BuildInfo
Logger *slog.Logger
}
// defaultChanSize is the default size of the dispatch channel.
@ -49,13 +53,17 @@ const defaultChanSize = 64
// New creates a new application instance.
func New(params Params) *App {
return &App{
cfg: params.ConfigService.Current(),
configService: params.ConfigService,
eventBus: event.NewBus(params.Logger.With("component", "event_bus")),
dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)),
dockerClient: params.DockerClient,
waitForClient: params.WaitForClient,
logger: params.Logger,
cfg: params.ConfigService.Current(),
configService: params.ConfigService,
eventBus: event.NewBus(params.Logger.With("component", "event_bus")),
dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)),
dockerClient: params.DockerClient,
screen: params.Screen,
headless: params.Headless,
clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath,
buildInfo: params.BuildInfo,
logger: params.Logger,
}
}
@ -70,26 +78,20 @@ func (a *App) Run(ctx context.Context) error {
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
}
const grpcAddr = ":50051"
lis, err := net.Listen("tcp", grpcAddr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
defer lis.Close()
grpcServer := grpc.NewServer()
grpcDone := make(chan error, 1)
internalAPI := newServer(a.DispatchAsync, a.eventBus, a.logger)
pb.RegisterInternalAPIServer(grpcServer, internalAPI)
go func() {
a.logger.Info("gRPC server started", "addr", grpcAddr)
grpcDone <- grpcServer.Serve(lis)
}()
if a.waitForClient {
if err = internalAPI.WaitForClient(ctx); err != nil {
return fmt.Errorf("wait for client: %w", err)
if !a.headless {
ui, err := terminal.StartUI(ctx, terminal.StartParams{
EventBus: a.eventBus,
Dispatcher: func(cmd event.Command) { a.dispatchC <- cmd },
Screen: a.screen,
ClipboardAvailable: a.clipboardAvailable,
ConfigFilePath: a.configFilePath,
BuildInfo: a.buildInfo,
Logger: a.logger.With("component", "ui"),
})
if err != nil {
return fmt.Errorf("start terminal user interface: %w", err)
}
defer ui.Close()
}
// emptyUI is a dummy function that sets the UI state to an empty state, and
@ -105,13 +107,12 @@ func (a *App) Run(ctx context.Context) error {
a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}})
}
// doFatalError publishes a fatal error to the event bus. It will block until
// the user acknowledges it if there is 1 or more clients connected to the
// internal API.
// doFatalError publishes a fatal error to the event bus, waiting for the
// user to acknowledge it if not in headless mode.
doFatalError := func(msg string) {
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
if internalAPI.GetClientCount() == 0 {
if a.headless {
return
}
@ -176,20 +177,21 @@ func (a *App) Run(ctx context.Context) error {
defer uiUpdateT.Stop()
startMediaServerC := make(chan struct{}, 1)
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
doFatalError(startupErr.Error())
return startupErr
} else if ok {
if a.headless { // disable startup check in headless mode for now
startMediaServerC <- struct{}{}
} else {
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
doFatalError(startupErr.Error())
return startupErr
} else if ok {
startMediaServerC <- struct{}{}
}
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case grpcErr := <-grpcDone:
a.logger.Error("gRPC server exited", "err", grpcErr)
return grpcErr
case <-startMediaServerC:
if err = srv.Start(ctx); err != nil {
return fmt.Errorf("start mediaserver: %w", err)
@ -243,11 +245,6 @@ func (a *App) Dispatch(cmd event.Command) event.Event {
return <-ch
}
// DispatchAsync dispatches a command to be executed synchronously.
func (a *App) DispatchAsync(cmd event.Command) {
a.dispatchC <- cmd
}
// errExit is an error that indicates the app should exit.
var errExit = errors.New("exit")
@ -275,10 +272,6 @@ func (a *App) handleCommand(
}
}()
if c, ok := cmd.(syncCommand); ok {
cmd = c.Command
}
switch c := cmd.(type) {
case event.CommandAddDestination:
newCfg := a.cfg
@ -288,11 +281,10 @@ func (a *App) handleCommand(
})
if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Add destination failed", "err", err)
return event.AddDestinationFailedEvent{URL: c.URL, Err: err}, nil
return event.AddDestinationFailedEvent{Err: err}, nil
}
a.cfg = newCfg
a.handleConfigUpdate(state)
a.logger.Info("Destination added", "url", c.URL)
a.eventBus.Send(event.DestinationAddedEvent{URL: c.URL})
case event.CommandRemoveDestination:
repl.StopDestination(c.URL) // no-op if not live
@ -302,7 +294,7 @@ func (a *App) handleCommand(
})
if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Remove destination failed", "err", err)
a.eventBus.Send(event.RemoveDestinationFailedEvent{URL: c.URL, Err: err})
a.eventBus.Send(event.RemoveDestinationFailedEvent{Err: err})
break
}
a.cfg = newCfg
@ -310,7 +302,7 @@ func (a *App) handleCommand(
a.eventBus.Send(event.DestinationRemovedEvent{URL: c.URL}) //nolint:gosimple
case event.CommandStartDestination:
if !state.Source.Live {
a.eventBus.Send(event.StartDestinationFailedEvent{URL: c.URL, Message: "source not live"})
a.eventBus.Send(event.StartDestinationFailedEvent{})
break
}

View File

@ -1,4 +1,4 @@
package server
package app
import (
"testing"

View File

@ -1,11 +1,9 @@
//go:build integration
package client_test
package app_test
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
@ -16,79 +14,39 @@ import (
"testing"
"time"
"git.netflux.io/rob/octoplex/internal/client"
"git.netflux.io/rob/octoplex/internal/app"
"git.netflux.io/rob/octoplex/internal/config"
"git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/server"
"git.netflux.io/rob/octoplex/internal/terminal"
"github.com/gdamore/tcell/v2"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"golang.org/x/sync/errgroup"
)
func buildClientServer(
func buildAppParams(
t *testing.T,
configService *config.Service,
dockerClient container.DockerClient,
screen tcell.SimulationScreen,
screenCaptureC chan<- terminal.ScreenCapture,
logger *slog.Logger,
) (*client.App, *server.App) {
client := client.New(client.NewParams{
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
) app.Params {
t.Helper()
return app.Params{
ConfigService: configService,
DockerClient: dockerClient,
Screen: &terminal.Screen{
Screen: screen,
Width: 180,
Height: 25,
CaptureC: screenCaptureC,
},
Logger: logger,
})
server := server.New(server.Params{
ConfigService: configService,
DockerClient: dockerClient,
WaitForClient: true,
Logger: logger,
})
return client, server
}
type clientServerResult struct {
errClient error
errServer error
}
func runClientServer(
ctx context.Context,
_ *testing.T,
clientApp *client.App,
serverApp *server.App,
) <-chan clientServerResult {
ch := make(chan clientServerResult, 1)
g, ctx := errgroup.WithContext(ctx)
var clientErr, srvErr error
g.Go(func() error {
srvErr = serverApp.Run(ctx)
return errors.New("server closed")
})
g.Go(func() error {
clientErr = clientApp.Run(ctx)
return errors.New("client closed")
})
go func() {
_ = g.Wait()
ch <- clientServerResult{errClient: clientErr, errServer: srvErr}
}()
return ch
ClipboardAvailable: false,
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
Logger: logger,
}
}
func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- terminal.ScreenCapture, func() []string) {

View File

@ -1,6 +1,6 @@
//go:build integration
package client_test
package app_test
import (
"cmp"
@ -15,10 +15,12 @@ import (
"testing"
"time"
"git.netflux.io/rob/octoplex/internal/app"
"git.netflux.io/rob/octoplex/internal/config"
"git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/container/mocks"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/terminal"
"git.netflux.io/rob/octoplex/internal/testhelpers"
"github.com/docker/docker/api/types/network"
dockerclient "github.com/docker/docker/client"
@ -124,8 +126,26 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}},
})
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(app.Params{
ConfigService: configService,
DockerClient: dockerClient,
Screen: &terminal.Screen{
Screen: screen,
Width: 160,
Height: 25,
CaptureC: screenCaptureC,
},
ClipboardAvailable: false,
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
Logger: logger,
}).Run(ctx))
}()
require.EventuallyWithT(
t,
@ -266,12 +286,13 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
printScreen(t, getContents, "After stopping the first destination")
// TODO:
// - Source error
// - Additional features (copy URL, etc.)
cancel()
result := <-ch
// May be a gRPC error, not context.Canceled:
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
<-done
}
func TestIntegrationCustomHost(t *testing.T) {
@ -292,8 +313,14 @@ func TestIntegrationCustomHost(t *testing.T) {
})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
time.Sleep(time.Second)
sendKey(t, screen, tcell.KeyF1, ' ')
@ -333,10 +360,7 @@ func TestIntegrationCustomHost(t *testing.T) {
cancel()
result := <-ch
// May be a gRPC error, not context.Canceled:
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
<-done
}
func TestIntegrationCustomTLSCerts(t *testing.T) {
@ -360,8 +384,14 @@ func TestIntegrationCustomTLSCerts(t *testing.T) {
})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT(
t,
@ -396,9 +426,7 @@ func TestIntegrationCustomTLSCerts(t *testing.T) {
cancel()
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
<-done
}
func TestIntegrationRestartDestination(t *testing.T) {
@ -437,8 +465,14 @@ func TestIntegrationRestartDestination(t *testing.T) {
}},
})
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT(
t,
@ -550,9 +584,7 @@ func TestIntegrationRestartDestination(t *testing.T) {
cancel()
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
<-done
}
func TestIntegrationStartDestinationFailed(t *testing.T) {
@ -570,8 +602,14 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
Destinations: []config.Destination{{Name: "Example server", URL: "rtmp://rtmp.example.com/live"}},
})
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT(
t,
@ -620,9 +658,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
cancel()
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
<-done
}
func TestIntegrationDestinationValidations(t *testing.T) {
@ -639,8 +675,14 @@ func TestIntegrationDestinationValidations(t *testing.T) {
Sources: config.Sources{MediaServer: config.MediaServerSource{StreamKey: "live", RTMP: config.RTMPSource{Enabled: true}}},
})
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT(
t,
@ -744,9 +786,7 @@ func TestIntegrationDestinationValidations(t *testing.T) {
printScreen(t, getContents, "After entering a duplicate destination URL")
cancel()
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
<-done
}
func TestIntegrationStartupCheck(t *testing.T) {
@ -777,8 +817,14 @@ func TestIntegrationStartupCheck(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT(
t,
@ -822,9 +868,7 @@ func TestIntegrationStartupCheck(t *testing.T) {
printScreen(t, getContents, "After starting the mediaserver")
cancel()
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
<-done
}
func TestIntegrationMediaServerError(t *testing.T) {
@ -842,8 +886,18 @@ func TestIntegrationMediaServerError(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.EqualError(
t,
app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx),
"media server exited",
)
}()
require.EventuallyWithT(
t,
@ -857,12 +911,10 @@ func TestIntegrationMediaServerError(t *testing.T) {
)
printScreen(t, getContents, "Ater displaying the media server error modal")
// Quit the app:
// Quit the app, this should cause the done channel to receive.
sendKey(t, screen, tcell.KeyEnter, ' ')
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorContains(t, result.errServer, "media server exited")
<-done
}
func TestIntegrationDockerClientError(t *testing.T) {
@ -877,8 +929,18 @@ func TestIntegrationDockerClientError(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
client, server := buildClientServer(configService, &dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.EqualError(
t,
app.New(buildAppParams(t, configService, &dockerClient, screen, screenCaptureC, logger)).Run(ctx),
"create container client: network create: boom",
)
}()
require.EventuallyWithT(
t,
@ -892,12 +954,10 @@ func TestIntegrationDockerClientError(t *testing.T) {
)
printScreen(t, getContents, "Ater displaying the fatal error modal")
// Quit the app:
// Quit the app, this should cause the done channel to receive.
sendKey(t, screen, tcell.KeyEnter, ' ')
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.EqualError(t, result.errServer, "create container client: network create: boom")
<-done
}
func TestIntegrationDockerConnectionError(t *testing.T) {
@ -911,8 +971,16 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
err := app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)
require.ErrorContains(t, err, "dial tcp: lookup docker.example.com")
require.ErrorContains(t, err, "no such host")
}()
require.EventuallyWithT(
t,
@ -926,13 +994,10 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
)
printScreen(t, getContents, "Ater displaying the fatal error modal")
// Quit the app:
// Quit the app, this should cause the done channel to receive.
sendKey(t, screen, tcell.KeyEnter, ' ')
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorContains(t, result.errServer, "dial tcp: lookup docker.example.com")
assert.ErrorContains(t, result.errServer, "no such host")
<-done
}
func TestIntegrationCopyURLs(t *testing.T) {
@ -1002,8 +1067,14 @@ func TestIntegrationCopyURLs(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: tc.mediaServerConfig}})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
time.Sleep(3 * time.Second)
printScreen(t, getContents, "Ater loading the app")
@ -1024,9 +1095,7 @@ func TestIntegrationCopyURLs(t *testing.T) {
cancel()
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
<-done
})
}
}

View File

@ -1,131 +0,0 @@
package client
import (
"context"
"fmt"
"log/slog"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/protocol"
"git.netflux.io/rob/octoplex/internal/terminal"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// App is the client application.
type App struct {
bus *event.Bus
clipboardAvailable bool
buildInfo domain.BuildInfo
screen *terminal.Screen
logger *slog.Logger
}
// NewParams contains the parameters for the App.
type NewParams struct {
ClipboardAvailable bool
BuildInfo domain.BuildInfo
Screen *terminal.Screen
Logger *slog.Logger
}
// New creates a new App instance.
func New(params NewParams) *App {
return &App{
bus: event.NewBus(params.Logger),
clipboardAvailable: params.ClipboardAvailable,
buildInfo: params.BuildInfo,
screen: params.Screen,
logger: params.Logger,
}
}
// Run starts the application, and blocks until it is closed.
//
// It returns nil if the application was closed by the user, or an error if it
// closed for any other reason.
func (a *App) Run(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
conn, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("connect to gRPC server: %w", err)
}
apiClient := pb.NewInternalAPIClient(conn)
stream, err := apiClient.Communicate(ctx)
if err != nil {
return fmt.Errorf("create gRPC stream: %w", err)
}
ui, err := terminal.NewUI(ctx, terminal.Params{
EventBus: a.bus,
Dispatcher: func(cmd event.Command) {
a.logger.Info("Command dispatched", "cmd", cmd.Name())
if sendErr := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Command{Command: protocol.CommandToProto(cmd)}}); sendErr != nil {
a.logger.Error("Error sending command to gRPC API", "err", sendErr)
}
},
ClipboardAvailable: a.clipboardAvailable,
BuildInfo: a.buildInfo,
Screen: a.screen,
Logger: a.logger.With("component", "ui"),
})
if err != nil {
return fmt.Errorf("start terminal user interface: %w", err)
}
defer ui.Close()
g.Go(func() error { return ui.Run(ctx) })
// After the UI is available, perform a handshake with the server.
// Ordering is important here. We want to ensure that the UI is ready to
// react to events received from the server. Performing the handshake ensures
// the client has received at least one event.
if err := a.doHandshake(stream); err != nil {
return fmt.Errorf("do handshake: %w", err)
}
g.Go(func() error {
for {
envelope, recErr := stream.Recv()
if recErr != nil {
return fmt.Errorf("receive envelope: %w", recErr)
}
pbEvt := envelope.GetEvent()
if pbEvt == nil {
a.logger.Error("Received envelope without event")
continue
}
evt := protocol.EventFromProto(pbEvt)
a.logger.Debug("Received event from gRPC stream", "event", evt.EventName(), "payload", evt)
a.bus.Send(evt)
}
})
if err := g.Wait(); err == terminal.ErrUserClosed {
return nil
} else {
return fmt.Errorf("errgroup.Wait: %w", err)
}
}
func (a *App) doHandshake(stream pb.InternalAPI_CommunicateClient) error {
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Command{Command: &pb.Command{CommandType: &pb.Command_StartHandshake{}}}}); err != nil {
return fmt.Errorf("send start handshake command: %w", err)
}
env, err := stream.Recv()
if err != nil {
return fmt.Errorf("receive handshake completed event: %w", err)
}
if evt := env.GetEvent(); evt == nil || evt.GetHandshakeCompleted() == nil {
return fmt.Errorf("expected handshake completed event but got: %T", env)
}
return nil
}

View File

@ -2,7 +2,6 @@ package event
import (
"log/slog"
"slices"
"sync"
)
@ -32,21 +31,6 @@ func (b *Bus) Register() <-chan Event {
return ch
}
// Deregister deregisters a consumer for all events.
func (b *Bus) Deregister(ch <-chan Event) {
b.mu.Lock()
defer b.mu.Unlock()
b.consumers = slices.DeleteFunc(b.consumers, func(other chan Event) bool {
if ch == other {
close(other)
return true
}
return false
})
}
// Send sends an event to all registered consumers.
func (b *Bus) Send(evt Event) {
// The mutex is needed to ensure the backing array of b.consumers cannot be
@ -59,7 +43,7 @@ func (b *Bus) Send(evt Event) {
select {
case ch <- evt:
default:
b.logger.Warn("Event dropped", "name", evt.EventName())
b.logger.Warn("Event dropped", "name", evt.name())
}
}
}

View File

@ -6,7 +6,6 @@ import (
"git.netflux.io/rob/octoplex/internal/event"
"git.netflux.io/rob/octoplex/internal/testhelpers"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestBus(t *testing.T) {
@ -26,19 +25,5 @@ func TestBus(t *testing.T) {
}()
assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent))
bus.Deregister(ch1)
_, ok := <-ch1
assert.False(t, ok)
select {
case <-ch2:
require.Fail(t, "ch2 should be blocking")
default:
}
}

View File

@ -50,8 +50,6 @@ func (c CommandCloseOtherInstance) Name() string {
}
// CommandQuit quits the app.
//
// TODO: consider how this should look in a client/server architecture.
type CommandQuit struct{}
// Name implements the Command interface.

View File

@ -19,7 +19,7 @@ const (
// Event represents something which happened in the appllication.
type Event interface {
EventName() Name
name() Name
}
// AppStateChangedEvent is emitted when the application state changes.
@ -27,7 +27,7 @@ type AppStateChangedEvent struct {
State domain.AppState
}
func (e AppStateChangedEvent) EventName() Name {
func (e AppStateChangedEvent) name() Name {
return EventNameAppStateChanged
}
@ -36,17 +36,16 @@ type DestinationAddedEvent struct {
URL string
}
func (e DestinationAddedEvent) EventName() Name {
func (e DestinationAddedEvent) name() Name {
return EventNameDestinationAdded
}
// AddDestinationFailedEvent is emitted when a destination fails to be added.
type AddDestinationFailedEvent struct {
URL string
Err error
}
func (e AddDestinationFailedEvent) EventName() Name {
func (e AddDestinationFailedEvent) name() Name {
return EventNameAddDestinationFailed
}
@ -56,17 +55,14 @@ type DestinationStreamExitedEvent struct {
Err error
}
func (e DestinationStreamExitedEvent) EventName() Name {
func (e DestinationStreamExitedEvent) name() Name {
return EventNameDestinationStreamExited
}
// StartDestinationFailedEvent is emitted when a destination fails to start.
type StartDestinationFailedEvent struct {
URL string
Message string
}
type StartDestinationFailedEvent struct{}
func (e StartDestinationFailedEvent) EventName() Name {
func (e StartDestinationFailedEvent) name() Name {
return EventNameStartDestinationFailed
}
@ -76,18 +72,17 @@ type DestinationRemovedEvent struct {
URL string
}
func (e DestinationRemovedEvent) EventName() Name {
func (e DestinationRemovedEvent) name() Name {
return EventNameDestinationRemoved
}
// RemoveDestinationFailedEvent is emitted when a destination fails to be
// removed.
type RemoveDestinationFailedEvent struct {
URL string
Err error
}
func (e RemoveDestinationFailedEvent) EventName() Name {
func (e RemoveDestinationFailedEvent) name() Name {
return EventNameRemoveDestinationFailed
}
@ -100,11 +95,11 @@ type FatalErrorOccurredEvent struct {
// OtherInstanceDetectedEvent is emitted when the app launches and detects another instance.
type OtherInstanceDetectedEvent struct{}
func (e OtherInstanceDetectedEvent) EventName() Name {
func (e OtherInstanceDetectedEvent) name() Name {
return EventNameOtherInstanceDetected
}
func (e FatalErrorOccurredEvent) EventName() Name {
func (e FatalErrorOccurredEvent) name() Name {
return "fatal_error_occurred"
}
@ -114,6 +109,6 @@ type MediaServerStartedEvent struct {
RTMPSURL string
}
func (e MediaServerStartedEvent) EventName() Name {
func (e MediaServerStartedEvent) name() Name {
return "media_server_started"
}

File diff suppressed because it is too large Load Diff

View File

@ -1,137 +0,0 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v6.30.1
// source: api.proto
package grpc
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// InternalAPIClient is the client API for InternalAPI service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type InternalAPIClient interface {
Communicate(ctx context.Context, opts ...grpc.CallOption) (InternalAPI_CommunicateClient, error)
}
type internalAPIClient struct {
cc grpc.ClientConnInterface
}
func NewInternalAPIClient(cc grpc.ClientConnInterface) InternalAPIClient {
return &internalAPIClient{cc}
}
func (c *internalAPIClient) Communicate(ctx context.Context, opts ...grpc.CallOption) (InternalAPI_CommunicateClient, error) {
stream, err := c.cc.NewStream(ctx, &InternalAPI_ServiceDesc.Streams[0], "/api.InternalAPI/Communicate", opts...)
if err != nil {
return nil, err
}
x := &internalAPICommunicateClient{stream}
return x, nil
}
type InternalAPI_CommunicateClient interface {
Send(*Envelope) error
Recv() (*Envelope, error)
grpc.ClientStream
}
type internalAPICommunicateClient struct {
grpc.ClientStream
}
func (x *internalAPICommunicateClient) Send(m *Envelope) error {
return x.ClientStream.SendMsg(m)
}
func (x *internalAPICommunicateClient) Recv() (*Envelope, error) {
m := new(Envelope)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// InternalAPIServer is the server API for InternalAPI service.
// All implementations must embed UnimplementedInternalAPIServer
// for forward compatibility
type InternalAPIServer interface {
Communicate(InternalAPI_CommunicateServer) error
mustEmbedUnimplementedInternalAPIServer()
}
// UnimplementedInternalAPIServer must be embedded to have forward compatible implementations.
type UnimplementedInternalAPIServer struct {
}
func (UnimplementedInternalAPIServer) Communicate(InternalAPI_CommunicateServer) error {
return status.Errorf(codes.Unimplemented, "method Communicate not implemented")
}
func (UnimplementedInternalAPIServer) mustEmbedUnimplementedInternalAPIServer() {}
// UnsafeInternalAPIServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to InternalAPIServer will
// result in compilation errors.
type UnsafeInternalAPIServer interface {
mustEmbedUnimplementedInternalAPIServer()
}
func RegisterInternalAPIServer(s grpc.ServiceRegistrar, srv InternalAPIServer) {
s.RegisterService(&InternalAPI_ServiceDesc, srv)
}
func _InternalAPI_Communicate_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(InternalAPIServer).Communicate(&internalAPICommunicateServer{stream})
}
type InternalAPI_CommunicateServer interface {
Send(*Envelope) error
Recv() (*Envelope, error)
grpc.ServerStream
}
type internalAPICommunicateServer struct {
grpc.ServerStream
}
func (x *internalAPICommunicateServer) Send(m *Envelope) error {
return x.ServerStream.SendMsg(m)
}
func (x *internalAPICommunicateServer) Recv() (*Envelope, error) {
m := new(Envelope)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// InternalAPI_ServiceDesc is the grpc.ServiceDesc for InternalAPI service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var InternalAPI_ServiceDesc = grpc.ServiceDesc{
ServiceName: "api.InternalAPI",
HandlerType: (*InternalAPIServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "Communicate",
Handler: _InternalAPI_Communicate_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "api.proto",
}

View File

@ -1,110 +0,0 @@
package protocol
import (
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
)
// CommandToProto converts a command to a protobuf message.
func CommandToProto(command event.Command) *pb.Command {
switch evt := command.(type) {
case event.CommandAddDestination:
return buildAddDestinationCommand(evt)
case event.CommandRemoveDestination:
return buildRemoveDestinationCommand(evt)
case event.CommandStartDestination:
return buildStartDestinationCommand(evt)
case event.CommandStopDestination:
return buildStopDestinationCommand(evt)
case event.CommandCloseOtherInstance:
return buildCloseOtherInstanceCommand(evt)
case event.CommandQuit:
return buildQuitCommand(evt)
default:
panic("unknown command type")
}
}
func buildAddDestinationCommand(cmd event.CommandAddDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_AddDestination{AddDestination: &pb.AddDestinationCommand{Name: cmd.DestinationName, Url: cmd.URL}}}
}
func buildRemoveDestinationCommand(cmd event.CommandRemoveDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_RemoveDestination{RemoveDestination: &pb.RemoveDestinationCommand{Url: cmd.URL}}}
}
func buildStartDestinationCommand(cmd event.CommandStartDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_StartDestination{StartDestination: &pb.StartDestinationCommand{Url: cmd.URL}}}
}
func buildStopDestinationCommand(cmd event.CommandStopDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_StopDestination{StopDestination: &pb.StopDestinationCommand{Url: cmd.URL}}}
}
func buildCloseOtherInstanceCommand(event.CommandCloseOtherInstance) *pb.Command {
return &pb.Command{CommandType: &pb.Command_CloseOtherInstances{CloseOtherInstances: &pb.CloseOtherInstancesCommand{}}}
}
func buildQuitCommand(event.CommandQuit) *pb.Command {
return &pb.Command{CommandType: &pb.Command_Quit{Quit: &pb.QuitCommand{}}}
}
// CommandFromProto converts a protobuf message to a command.
func CommandFromProto(pbCmd *pb.Command) event.Command {
if pbCmd == nil || pbCmd.CommandType == nil {
panic("invalid or nil pb.Command")
}
switch cmd := pbCmd.CommandType.(type) {
case *pb.Command_AddDestination:
return parseAddDestinationCommand(cmd.AddDestination)
case *pb.Command_RemoveDestination:
return parseRemoveDestinationCommand(cmd.RemoveDestination)
case *pb.Command_StartDestination:
return parseStartDestinationCommand(cmd.StartDestination)
case *pb.Command_StopDestination:
return parseStopDestinationCommand(cmd.StopDestination)
case *pb.Command_CloseOtherInstances:
return parseCloseOtherInstanceCommand(cmd.CloseOtherInstances)
case *pb.Command_Quit:
return parseQuitCommand(cmd.Quit)
default:
panic("unknown pb.Command type")
}
}
func parseAddDestinationCommand(cmd *pb.AddDestinationCommand) event.Command {
if cmd == nil {
panic("nil AddDestinationCommand")
}
return event.CommandAddDestination{DestinationName: cmd.Name, URL: cmd.Url}
}
func parseRemoveDestinationCommand(cmd *pb.RemoveDestinationCommand) event.Command {
if cmd == nil {
panic("nil RemoveDestinationCommand")
}
return event.CommandRemoveDestination{URL: cmd.Url}
}
func parseStartDestinationCommand(cmd *pb.StartDestinationCommand) event.Command {
if cmd == nil {
panic("nil StartDestinationCommand")
}
return event.CommandStartDestination{URL: cmd.Url}
}
func parseStopDestinationCommand(cmd *pb.StopDestinationCommand) event.Command {
if cmd == nil {
panic("nil StopDestinationCommand")
}
return event.CommandStopDestination{URL: cmd.Url}
}
func parseCloseOtherInstanceCommand(_ *pb.CloseOtherInstancesCommand) event.Command {
return event.CommandCloseOtherInstance{}
}
func parseQuitCommand(_ *pb.QuitCommand) event.Command {
return event.CommandQuit{}
}

View File

@ -1,121 +0,0 @@
package protocol
import (
"errors"
"git.netflux.io/rob/octoplex/internal/domain"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
func containerToProto(c domain.Container) *pb.Container {
var errString string
if c.Err != nil {
errString = c.Err.Error()
}
var exitCode *int32
if c.ExitCode != nil {
code := int32(*c.ExitCode)
exitCode = &code
}
return &pb.Container{
Id: c.ID,
Status: c.Status,
HealthState: c.HealthState,
CpuPercent: c.CPUPercent,
MemoryUsageBytes: c.MemoryUsageBytes,
RxRate: int32(c.RxRate),
TxRate: int32(c.TxRate),
RxSince: timestamppb.New(c.RxSince),
ImageName: c.ImageName,
PullStatus: c.PullStatus,
PullProgress: c.PullProgress,
PullPercent: int32(c.PullPercent),
RestartCount: int32(c.RestartCount),
ExitCode: exitCode,
Err: errString,
}
}
func protoToContainer(pbCont *pb.Container) domain.Container {
if pbCont == nil {
return domain.Container{}
}
var exitCode *int
if pbCont.ExitCode != nil {
val := int(*pbCont.ExitCode)
exitCode = &val
}
var err error
if pbCont.Err != "" {
err = errors.New(pbCont.Err)
}
return domain.Container{
ID: pbCont.Id,
Status: pbCont.Status,
HealthState: pbCont.HealthState,
CPUPercent: pbCont.CpuPercent,
MemoryUsageBytes: pbCont.MemoryUsageBytes,
RxRate: int(pbCont.RxRate),
TxRate: int(pbCont.TxRate),
RxSince: pbCont.RxSince.AsTime(),
ImageName: pbCont.ImageName,
PullStatus: pbCont.PullStatus,
PullProgress: pbCont.PullProgress,
PullPercent: int(pbCont.PullPercent),
RestartCount: int(pbCont.RestartCount),
ExitCode: exitCode,
Err: err,
}
}
func destinationsToProto(inDests []domain.Destination) []*pb.Destination {
destinations := make([]*pb.Destination, 0, len(inDests))
for _, d := range inDests {
destinations = append(destinations, destinationToProto(d))
}
return destinations
}
func destinationToProto(d domain.Destination) *pb.Destination {
return &pb.Destination{
Container: containerToProto(d.Container),
Status: destinationStatusToProto(d.Status),
Name: d.Name,
Url: d.URL,
}
}
func protoToDestinations(pbDests []*pb.Destination) []domain.Destination {
if pbDests == nil {
return nil
}
dests := make([]domain.Destination, 0, len(pbDests))
for _, pbDest := range pbDests {
if pbDest == nil {
continue
}
dests = append(dests, domain.Destination{
Container: protoToContainer(pbDest.Container),
Status: domain.DestinationStatus(pbDest.Status), // direct cast, same underlying int
Name: pbDest.Name,
URL: pbDest.Url,
})
}
return dests
}
func destinationStatusToProto(s domain.DestinationStatus) pb.Destination_Status {
switch s {
case domain.DestinationStatusStarting:
return pb.Destination_STATUS_STARTING
case domain.DestinationStatusLive:
return pb.Destination_STATUS_LIVE
default:
return pb.Destination_STATUS_OFF_AIR
}
}

View File

@ -1,252 +0,0 @@
package protocol
import (
"errors"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
// EventToProto converts an event to a protobuf message.
func EventToProto(ev event.Event) *pb.Event {
switch evt := ev.(type) {
case event.AppStateChangedEvent:
return buildAppStateChangeEvent(evt)
case event.DestinationAddedEvent:
return buildDestinationAddedEvent(evt)
case event.AddDestinationFailedEvent:
return buildAddDestinationFailedEvent(evt)
case event.DestinationStreamExitedEvent:
return buildDestinationStreamExitedEvent(evt)
case event.StartDestinationFailedEvent:
return buildStartDestinationFailedEvent(evt)
case event.DestinationRemovedEvent:
return buildDestinationRemovedEvent(evt)
case event.RemoveDestinationFailedEvent:
return buildRemoveDestinationFailedEvent(evt)
case event.FatalErrorOccurredEvent:
return buildFatalErrorOccurredEvent(evt)
case event.OtherInstanceDetectedEvent:
return buildOtherInstanceDetectedEvent(evt)
case event.MediaServerStartedEvent:
return buildMediaServerStartedEvent(evt)
default:
panic("unknown event type")
}
}
func buildAppStateChangeEvent(evt event.AppStateChangedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_AppStateChanged{
AppStateChanged: &pb.AppStateChangedEvent{
AppState: &pb.AppState{
Source: &pb.Source{
Container: containerToProto(evt.State.Source.Container),
Live: evt.State.Source.Live,
LiveChangedAt: timestamppb.New(evt.State.Source.LiveChangedAt),
Tracks: evt.State.Source.Tracks,
ExitReason: evt.State.Source.ExitReason,
},
Destinations: destinationsToProto(evt.State.Destinations),
BuildInfo: &pb.BuildInfo{
GoVersion: evt.State.BuildInfo.GoVersion,
Version: evt.State.BuildInfo.Version,
Commit: evt.State.BuildInfo.Commit,
Date: evt.State.BuildInfo.Date,
},
},
},
},
}
}
func buildDestinationAddedEvent(evt event.DestinationAddedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_DestinationAdded{
DestinationAdded: &pb.DestinationAddedEvent{Url: evt.URL},
},
}
}
func buildAddDestinationFailedEvent(evt event.AddDestinationFailedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_AddDestinationFailed{
AddDestinationFailed: &pb.AddDestinationFailedEvent{Url: evt.URL, Error: evt.Err.Error()},
},
}
}
func buildDestinationStreamExitedEvent(evt event.DestinationStreamExitedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_DestinationStreamExited{
DestinationStreamExited: &pb.DestinationStreamExitedEvent{Name: evt.Name, Error: evt.Err.Error()},
},
}
}
func buildStartDestinationFailedEvent(evt event.StartDestinationFailedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_StartDestinationFailed{
StartDestinationFailed: &pb.StartDestinationFailedEvent{Url: evt.URL, Message: evt.Message},
},
}
}
func buildDestinationRemovedEvent(evt event.DestinationRemovedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_DestinationRemoved{
DestinationRemoved: &pb.DestinationRemovedEvent{Url: evt.URL},
},
}
}
func buildRemoveDestinationFailedEvent(evt event.RemoveDestinationFailedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_RemoveDestinationFailed{
RemoveDestinationFailed: &pb.RemoveDestinationFailedEvent{Url: evt.URL, Error: evt.Err.Error()},
},
}
}
func buildFatalErrorOccurredEvent(evt event.FatalErrorOccurredEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_FatalError{
FatalError: &pb.FatalErrorEvent{Message: evt.Message},
},
}
}
func buildOtherInstanceDetectedEvent(_ event.OtherInstanceDetectedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_OtherInstanceDetected{
OtherInstanceDetected: &pb.OtherInstanceDetectedEvent{},
},
}
}
func buildMediaServerStartedEvent(evt event.MediaServerStartedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_MediaServerStarted{
MediaServerStarted: &pb.MediaServerStartedEvent{RtmpUrl: evt.RTMPURL, RtmpsUrl: evt.RTMPSURL},
},
}
}
// EventFromProto converts a protobuf message to an event.
func EventFromProto(pbEv *pb.Event) event.Event {
if pbEv == nil || pbEv.EventType == nil {
panic("invalid or nil pb.Event")
}
switch evt := pbEv.EventType.(type) {
case *pb.Event_AppStateChanged:
return parseAppStateChangedEvent(evt.AppStateChanged)
case *pb.Event_DestinationAdded:
return parseDestinationAddedEvent(evt.DestinationAdded)
case *pb.Event_AddDestinationFailed:
return parseAddDestinationFailedEvent(evt.AddDestinationFailed)
case *pb.Event_DestinationStreamExited:
return parseDestinationStreamExitedEvent(evt.DestinationStreamExited)
case *pb.Event_StartDestinationFailed:
return parseStartDestinationFailedEvent(evt.StartDestinationFailed)
case *pb.Event_DestinationRemoved:
return parseDestinationRemovedEvent(evt.DestinationRemoved)
case *pb.Event_RemoveDestinationFailed:
return parseRemoveDestinationFailedEvent(evt.RemoveDestinationFailed)
case *pb.Event_FatalError:
return parseFatalErrorOccurredEvent(evt.FatalError)
case *pb.Event_OtherInstanceDetected:
return parseOtherInstanceDetectedEvent(evt.OtherInstanceDetected)
case *pb.Event_MediaServerStarted:
return parseMediaServerStartedEvent(evt.MediaServerStarted)
default:
panic("unknown pb.Event type")
}
}
func parseAppStateChangedEvent(evt *pb.AppStateChangedEvent) event.Event {
if evt == nil || evt.AppState == nil || evt.AppState.Source == nil {
panic("invalid AppStateChangedEvent")
}
return event.AppStateChangedEvent{
State: domain.AppState{
Source: domain.Source{
Container: protoToContainer(evt.AppState.Source.Container),
Live: evt.AppState.Source.Live,
LiveChangedAt: evt.AppState.Source.LiveChangedAt.AsTime(),
Tracks: evt.AppState.Source.Tracks,
ExitReason: evt.AppState.Source.ExitReason,
},
Destinations: protoToDestinations(evt.AppState.Destinations),
BuildInfo: domain.BuildInfo{
GoVersion: evt.AppState.BuildInfo.GoVersion,
Version: evt.AppState.BuildInfo.Version,
Commit: evt.AppState.BuildInfo.Commit,
Date: evt.AppState.BuildInfo.Date,
},
},
}
}
func parseDestinationAddedEvent(evt *pb.DestinationAddedEvent) event.Event {
if evt == nil {
panic("nil DestinationAddedEvent")
}
return event.DestinationAddedEvent{URL: evt.Url}
}
func parseAddDestinationFailedEvent(evt *pb.AddDestinationFailedEvent) event.Event {
if evt == nil {
panic("nil AddDestinationFailedEvent")
}
return event.AddDestinationFailedEvent{URL: evt.Url, Err: errors.New(evt.Error)}
}
func parseDestinationStreamExitedEvent(evt *pb.DestinationStreamExitedEvent) event.Event {
if evt == nil {
panic("nil DestinationStreamExitedEvent")
}
return event.DestinationStreamExitedEvent{Name: evt.Name, Err: errors.New(evt.Error)}
}
func parseStartDestinationFailedEvent(evt *pb.StartDestinationFailedEvent) event.Event {
if evt == nil {
panic("nil StartDestinationFailedEvent")
}
return event.StartDestinationFailedEvent{URL: evt.Url, Message: evt.Message}
}
func parseDestinationRemovedEvent(evt *pb.DestinationRemovedEvent) event.Event {
if evt == nil {
panic("nil DestinationRemovedEvent")
}
return event.DestinationRemovedEvent{URL: evt.Url}
}
func parseRemoveDestinationFailedEvent(evt *pb.RemoveDestinationFailedEvent) event.Event {
if evt == nil {
panic("nil RemoveDestinationFailedEvent")
}
return event.RemoveDestinationFailedEvent{URL: evt.Url, Err: errors.New(evt.Error)}
}
func parseFatalErrorOccurredEvent(evt *pb.FatalErrorEvent) event.Event {
if evt == nil {
panic("nil FatalErrorEvent")
}
return event.FatalErrorOccurredEvent{Message: evt.Message}
}
func parseOtherInstanceDetectedEvent(_ *pb.OtherInstanceDetectedEvent) event.Event {
return event.OtherInstanceDetectedEvent{}
}
func parseMediaServerStartedEvent(evt *pb.MediaServerStartedEvent) event.Event {
if evt == nil {
panic("nil MediaServerStartedEvent")
}
return event.MediaServerStartedEvent{RTMPURL: evt.RtmpUrl, RTMPSURL: evt.RtmpsUrl}
}

View File

@ -1,151 +0,0 @@
package server
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"sync"
"time"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/protocol"
"golang.org/x/sync/errgroup"
)
// APIListenerCountDeltaFunc is a function that is called when the number of
// API clients increments or decrements.
type APIClientCountDeltaFunc func(delta int)
// Server is the gRPC server that handles incoming commands and outgoing
// events.
type Server struct {
pb.UnimplementedInternalAPIServer
dispatcher func(event.Command)
bus *event.Bus
logger *slog.Logger
mu sync.Mutex
clientCount int
clientC chan struct{}
}
// newServer creates a new gRPC server.
func newServer(
dispatcher func(event.Command),
bus *event.Bus,
logger *slog.Logger,
) *Server {
return &Server{
dispatcher: dispatcher,
bus: bus,
clientC: make(chan struct{}, 1),
logger: logger.With("component", "server"),
}
}
func (s *Server) Communicate(stream pb.InternalAPI_CommunicateServer) error {
g, ctx := errgroup.WithContext(stream.Context())
// perform handshake:
startHandshakeCmd, err := stream.Recv()
if err != nil {
return fmt.Errorf("receive start handshake command: %w", err)
}
if startHandshakeCmd.GetCommand() == nil || startHandshakeCmd.GetCommand().GetStartHandshake() == nil {
return fmt.Errorf("expected start handshake command but got: %T", startHandshakeCmd)
}
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: &pb.Event{EventType: &pb.Event_HandshakeCompleted{}}}}); err != nil {
return fmt.Errorf("send handshake completed event: %w", err)
}
// Notify that a client has connected and completed the handshake.
select {
case s.clientC <- struct{}{}:
default:
}
g.Go(func() error {
eventsC := s.bus.Register()
defer s.bus.Deregister(eventsC)
for {
select {
case evt := <-eventsC:
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: protocol.EventToProto(evt)}}); err != nil {
return fmt.Errorf("send event: %w", err)
}
case <-ctx.Done():
return ctx.Err()
}
}
})
g.Go(func() error {
s.mu.Lock()
s.clientCount++
s.mu.Unlock()
defer func() {
s.mu.Lock()
s.clientCount--
s.mu.Unlock()
}()
for {
in, err := stream.Recv()
if err == io.EOF {
s.logger.Info("Client disconnected")
return err
}
if err != nil {
return fmt.Errorf("receive message: %w", err)
}
switch pbCmd := in.Payload.(type) {
case *pb.Envelope_Command:
cmd := protocol.CommandFromProto(pbCmd.Command)
s.logger.Info("Received command", "command", cmd.Name())
s.dispatcher(cmd)
default:
return fmt.Errorf("expected command but got: %T", pbCmd)
}
}
})
if err := g.Wait(); err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
s.logger.Error("Client stream closed with error", "err", err)
return fmt.Errorf("errgroup.Wait: %w", err)
}
s.logger.Info("Client stream closed")
return nil
}
// GetClientCount returns the number of connected clients.
func (s *Server) GetClientCount() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.clientCount
}
const waitForClientTimeout = 10 * time.Second
// WaitForClient waits for _any_ client to connect and complete the handshake.
// It times out if no client has connected after 10 seconds.
func (s *Server) WaitForClient(ctx context.Context) error {
select {
case <-s.clientC:
return nil
case <-time.After(waitForClientTimeout):
return errors.New("timeout")
case <-ctx.Done():
return ctx.Err()
}
}

View File

@ -3,7 +3,6 @@ package terminal
import (
"cmp"
"context"
"errors"
"fmt"
"log/slog"
"maps"
@ -45,9 +44,9 @@ type UI struct {
eventBus *event.Bus
dispatch func(event.Command)
clipboardAvailable bool
configFilePath string
rtmpURL, rtmpsURL string
buildInfo domain.BuildInfo
appExitC chan error
logger *slog.Logger
// tview state
@ -93,20 +92,20 @@ type ScreenCapture struct {
Width, Height int
}
// Params contains the parameters for starting a new terminal user
// StartParams contains the parameters for starting a new terminal user
// interface.
type Params struct {
type StartParams struct {
EventBus *event.Bus
Dispatcher func(event.Command)
Logger *slog.Logger
ClipboardAvailable bool
ConfigFilePath string
BuildInfo domain.BuildInfo
Screen *Screen // Screen may be nil.
}
// NewUI creates the user interface. Call [Run] on the *UI instance to block
// until it is completed.
func NewUI(ctx context.Context, params Params) (*UI, error) {
// StartUI starts the terminal user interface.
func StartUI(ctx context.Context, params StartParams) (*UI, error) {
app := tview.NewApplication()
var screen tcell.Screen
@ -212,7 +211,7 @@ func NewUI(ctx context.Context, params Params) (*UI, error) {
eventBus: params.EventBus,
dispatch: params.Dispatcher,
clipboardAvailable: params.ClipboardAvailable,
appExitC: make(chan error, 1),
configFilePath: params.ConfigFilePath,
buildInfo: params.BuildInfo,
logger: params.Logger,
app: app,
@ -238,6 +237,8 @@ func NewUI(ctx context.Context, params Params) (*UI, error) {
app.SetInputCapture(ui.inputCaptureHandler)
app.SetAfterDrawFunc(ui.afterDrawHandler)
go ui.run(ctx)
return ui, nil
}
@ -261,32 +262,32 @@ func (ui *UI) renderAboutView() {
ui.aboutView.AddItem(rtmpsURLView, 1, 0, false)
}
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false)
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
}
var ErrUserClosed = errors.New("user closed UI")
func (ui *UI) run(ctx context.Context) {
defer func() {
// Ensure the application is stopped when the UI is closed.
ui.dispatch(event.CommandQuit{})
}()
// Run runs the user interface. It always returns a non-nil error, which will
// be [ErrUserClosed] if the user voluntarily closed the UI.
func (ui *UI) Run(ctx context.Context) error {
eventC := ui.eventBus.Register()
defer ui.eventBus.Deregister(eventC)
uiDone := make(chan struct{})
go func() {
err := ui.app.Run()
if err != nil {
ui.logger.Error("Error in UI run loop, exiting", "err", err)
defer func() {
uiDone <- struct{}{}
}()
if err := ui.app.Run(); err != nil {
ui.logger.Error("tui application error", "err", err)
}
ui.appExitC <- err
}()
for {
select {
case evt, ok := <-eventC:
if !ok {
// should never happen
return errors.New("event channel closed")
}
case evt := <-eventC:
ui.app.QueueUpdateDraw(func() {
switch evt := evt.(type) {
case event.AppStateChangedEvent:
@ -312,11 +313,12 @@ func (ui *UI) Run(ctx context.Context) error {
default:
ui.logger.Warn("unhandled event", "event", evt)
}
})
case <-ctx.Done():
return ctx.Err()
case err := <-ui.appExitC:
return cmp.Or(err, ErrUserClosed)
return
case <-uiDone:
return
}
}
}
@ -356,6 +358,8 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
return nil
case ' ':
ui.toggleDestination()
case 'c', 'C':
ui.copyConfigFilePathToClipboard(ui.clipboardAvailable, ui.configFilePath)
case '?':
ui.showAbout()
case 'k': // tview vim bindings
@ -821,11 +825,6 @@ func (ui *UI) Close() {
ui.app.Stop()
}
// Wait waits for the terminal user interface to finish.
func (ui *UI) Wait() {
<-ui.appExitC
}
func (ui *UI) addDestination() {
const (
inputLen = 60
@ -1007,6 +1006,28 @@ func (ui *UI) copySourceURLToClipboard(url string) {
)
}
func (ui *UI) copyConfigFilePathToClipboard(clipboardAvailable bool, configFilePath string) {
var text string
if clipboardAvailable {
if configFilePath != "" {
clipboard.Write(clipboard.FmtText, []byte(configFilePath))
text = "Configuration file path copied to clipboard:\n\n" + configFilePath
} else {
text = "Configuration file path not set"
}
} else {
text = "Copy to clipboard not available"
}
ui.showModal(
pageNameModalClipboard,
text,
[]string{"Ok"},
false,
nil,
)
}
func (ui *UI) confirmQuit() {
ui.showModal(
pageNameModalQuit,
@ -1015,7 +1036,7 @@ func (ui *UI) confirmQuit() {
false,
func(buttonIndex int, _ string) {
if buttonIndex == 0 {
ui.app.Stop()
ui.dispatch(event.CommandQuit{})
}
},
)

320
main.go
View File

@ -4,23 +4,21 @@ import (
"cmp"
"context"
"errors"
"flag"
"fmt"
"io"
"log/slog"
"os"
"os/exec"
"os/signal"
"runtime"
"runtime/debug"
"syscall"
"git.netflux.io/rob/octoplex/internal/client"
"git.netflux.io/rob/octoplex/internal/app"
"git.netflux.io/rob/octoplex/internal/config"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/server"
dockerclient "github.com/docker/docker/client"
"github.com/urfave/cli/v2"
"golang.design/x/clipboard"
"golang.org/x/sync/errgroup"
)
var (
@ -32,108 +30,23 @@ var (
date string
)
// errInterrupt is an error type that indicates an interrupt signal was
// received.
type errInterrupt struct{}
// Error implements the error interface.
func (e errInterrupt) Error() string {
return "interrupt signal received"
}
// ExitCode implements the ExitCoder interface.
func (e errInterrupt) ExitCode() int {
return 130
}
var errShutdown = errors.New("shutdown")
func main() {
app := &cli.App{
Name: "Octoplex",
Usage: "Octoplex is a live video restreamer for Docker.",
Commands: []*cli.Command{
{
Name: "client",
Usage: "Run the client",
Action: func(c *cli.Context) error {
return runClient(c.Context, c)
},
},
{
Name: "server",
Usage: "Run the server",
Action: func(c *cli.Context) error {
return runServer(c.Context, c, serverConfig{
stderrAvailable: true,
handleSigInt: true,
waitForClient: false,
})
},
},
{
Name: "run",
Usage: "Run server and client together (testing)",
Action: func(c *cli.Context) error {
return runClientAndServer(c)
},
},
},
var exitStatus int
if err := run(); errors.Is(err, errShutdown) {
exitStatus = 130
} else if err != nil {
exitStatus = 1
_, _ = os.Stderr.WriteString("Error: " + err.Error() + "\n")
}
if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
os.Exit(exitStatus)
}
func runClient(ctx context.Context, _ *cli.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// TODO: logger from config
fptr, err := os.OpenFile("octoplex.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return fmt.Errorf("open log file: %w", err)
}
logger := slog.New(slog.NewTextHandler(fptr, nil))
logger.Info("Starting client", "version", cmp.Or(version, "devel"), "commit", cmp.Or(commit, "unknown"), "date", cmp.Or(date, "unknown"), "go_version", runtime.Version())
var clipboardAvailable bool
if err = clipboard.Init(); err != nil {
logger.Warn("Clipboard not available", "err", err)
} else {
clipboardAvailable = true
}
buildInfo, ok := debug.ReadBuildInfo()
if !ok {
return fmt.Errorf("read build info: %w", err)
}
app := client.New(client.NewParams{
ClipboardAvailable: clipboardAvailable,
BuildInfo: domain.BuildInfo{
GoVersion: buildInfo.GoVersion,
Version: version,
Commit: commit,
Date: date,
},
Logger: logger,
})
if err := app.Run(ctx); err != nil {
return fmt.Errorf("run app: %w", err)
}
return nil
}
type serverConfig struct {
stderrAvailable bool
handleSigInt bool
waitForClient bool
}
func runServer(ctx context.Context, _ *cli.Context, serverCfg serverConfig) error {
ctx, cancel := context.WithCancelCause(ctx)
func run() error {
ctx, cancel := context.WithCancelCause(context.Background())
defer cancel(nil)
configService, err := config.NewDefaultService()
@ -141,35 +54,45 @@ func runServer(ctx context.Context, _ *cli.Context, serverCfg serverConfig) erro
return fmt.Errorf("build config service: %w", err)
}
help := flag.Bool("h", false, "Show help")
flag.Parse()
if *help {
printUsage()
return nil
}
if narg := flag.NArg(); narg > 1 {
printUsage()
return fmt.Errorf("too many arguments")
} else if narg == 1 {
switch flag.Arg(0) {
case "edit-config":
return editConfigFile(configService)
case "print-config":
return printConfigPath(configService.Path())
case "version":
return printVersion()
case "help":
printUsage()
return nil
}
}
cfg, err := configService.ReadOrCreateConfig()
if err != nil {
return fmt.Errorf("read or create config: %w", err)
}
// TODO: improve logger API
// Currently it's a bit complicated because we can only use stdout - the
// preferred destination - if the client is not running. Otherwise we
// fallback to the legacy configuration but this should be bought more
// in-line with the client/server split.
var w io.Writer
if serverCfg.stderrAvailable {
w = os.Stdout
} else if !cfg.LogFile.Enabled {
w = io.Discard
} else {
w, err = os.OpenFile(cfg.LogFile.GetPath(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return fmt.Errorf("error opening log file: %w", err)
}
headless := os.Getenv("OCTO_HEADLESS") != ""
logger, err := buildLogger(cfg.LogFile, headless)
if err != nil {
return fmt.Errorf("build logger: %w", err)
}
var handlerOpts slog.HandlerOptions
if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
}
logger := slog.New(slog.NewTextHandler(w, &handlerOpts))
if serverCfg.handleSigInt {
if headless {
// When running in headless mode tview doesn't handle SIGINT for us.
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
@ -177,10 +100,17 @@ func runServer(ctx context.Context, _ *cli.Context, serverCfg serverConfig) erro
<-ch
logger.Info("Received interrupt signal, exiting")
signal.Stop(ch)
cancel(errInterrupt{})
cancel(errShutdown)
}()
}
var clipboardAvailable bool
if err = clipboard.Init(); err != nil {
logger.Warn("Clipboard not available", "err", err)
} else {
clipboardAvailable = true
}
dockerClient, err := dockerclient.NewClientWithOpts(
dockerclient.FromEnv,
dockerclient.WithAPIVersionNegotiation(),
@ -189,64 +119,102 @@ func runServer(ctx context.Context, _ *cli.Context, serverCfg serverConfig) erro
return fmt.Errorf("new docker client: %w", err)
}
app := server.New(server.Params{
ConfigService: configService,
DockerClient: dockerClient,
ConfigFilePath: configService.Path(),
WaitForClient: serverCfg.waitForClient,
Logger: logger,
buildInfo, ok := debug.ReadBuildInfo()
if !ok {
return fmt.Errorf("read build info: %w", err)
}
app := app.New(app.Params{
ConfigService: configService,
DockerClient: dockerClient,
Headless: headless,
ClipboardAvailable: clipboardAvailable,
ConfigFilePath: configService.Path(),
BuildInfo: domain.BuildInfo{
GoVersion: buildInfo.GoVersion,
Version: version,
Commit: commit,
Date: date,
},
Logger: logger,
})
logger.Info(
"Starting server",
"version",
cmp.Or(version, "devel"),
"commit",
cmp.Or(commit, "unknown"),
"date",
cmp.Or(date, "unknown"),
"go_version",
runtime.Version(),
)
return app.Run(ctx)
}
if err := app.Run(ctx); err != nil {
if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ctx), errInterrupt{}) {
return context.Cause(ctx)
}
return err
// editConfigFile opens the config file in the user's editor.
func editConfigFile(configService *config.Service) error {
if _, err := configService.ReadOrCreateConfig(); err != nil {
return fmt.Errorf("read or create config: %w", err)
}
editor := os.Getenv("EDITOR")
if editor == "" {
editor = "vi"
}
binary, err := exec.LookPath(editor)
if err != nil {
return fmt.Errorf("look path: %w", err)
}
fmt.Fprintf(os.Stderr, "Editing config file: %s\n", configService.Path())
fmt.Println(binary)
if err := syscall.Exec(binary, []string{"--", configService.Path()}, os.Environ()); err != nil {
return fmt.Errorf("exec: %w", err)
}
return nil
}
func runClientAndServer(c *cli.Context) error {
errNoErr := errors.New("no error")
g, ctx := errgroup.WithContext(c.Context)
g.Go(func() error {
if err := runClient(ctx, c); err != nil {
return err
}
return errNoErr
})
g.Go(func() error {
if err := runServer(ctx, c, serverConfig{
stderrAvailable: false,
handleSigInt: false,
waitForClient: true,
}); err != nil {
return err
}
return errNoErr
})
if err := g.Wait(); err == errNoErr {
return nil
} else {
return err
}
// printConfigPath prints the path to the config file to stderr.
func printConfigPath(configPath string) error {
fmt.Fprintln(os.Stderr, configPath)
return nil
}
// printVersion prints the version of the application to stderr.
func printVersion() error {
fmt.Fprintf(os.Stderr, "%s version %s\n", domain.AppName, cmp.Or(version, "0.0.0-dev"))
return nil
}
func printUsage() {
os.Stderr.WriteString("Usage: octoplex [command]\n\n")
os.Stderr.WriteString("Commands:\n\n")
os.Stderr.WriteString(" edit-config Edit the config file\n")
os.Stderr.WriteString(" print-config Print the path to the config file\n")
os.Stderr.WriteString(" version Print the version of the application\n")
os.Stderr.WriteString(" help Print this help message\n")
os.Stderr.WriteString("\n")
os.Stderr.WriteString("Additionally, Octoplex can be configured with the following environment variables:\n\n")
os.Stderr.WriteString(" OCTO_DEBUG Enables debug logging if set\n")
os.Stderr.WriteString(" OCTO_HEADLESS Enables headless mode if set (experimental)\n\n")
}
// buildLogger builds the logger, which may be a no-op logger.
func buildLogger(cfg config.LogFile, headless bool) (*slog.Logger, error) {
build := func(w io.Writer) *slog.Logger {
var handlerOpts slog.HandlerOptions
if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
}
return slog.New(slog.NewTextHandler(w, &handlerOpts))
}
// In headless mode, always log to stderr.
if headless {
return build(os.Stderr), nil
}
if !cfg.Enabled {
return slog.New(slog.DiscardHandler), nil
}
fptr, err := os.OpenFile(cfg.GetPath(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return nil, fmt.Errorf("error opening log file: %w", err)
}
return build(fptr), nil
}

View File

@ -40,9 +40,3 @@ description = "Generate mocks"
dir = "{{cwd}}"
run = "go tool mockery"
alias = "m"
[tasks.generate_proto]
description = "Generate gRPC files from proto"
dir = "{{cwd}}"
run = "protoc -I proto --go_out=paths=source_relative:internal/generated/grpc --go-grpc_out=paths=source_relative:internal/generated/grpc proto/api.proto"
alias = "p"

View File

@ -1,166 +0,0 @@
syntax = "proto3";
package api;
import "google/protobuf/timestamp.proto";
option go_package = "git.netflux.io/rob/octoplex/internal/generated/grpc";
service InternalAPI {
rpc Communicate(stream Envelope) returns (stream Envelope);
}
message Envelope {
oneof payload {
Command command = 1;
Event event = 2;
}
}
message Command {
oneof command_type {
AddDestinationCommand add_destination = 1;
RemoveDestinationCommand remove_destination = 2;
StartDestinationCommand start_destination = 3;
StopDestinationCommand stop_destination = 4;
CloseOtherInstancesCommand close_other_instances = 5;
QuitCommand quit = 6;
StartHandshakeCommand start_handshake = 7;
}
}
message AddDestinationCommand {
string name = 1;
string url = 2;
}
message RemoveDestinationCommand {
string url= 1;
}
message StartDestinationCommand {
string url = 1;
}
message StopDestinationCommand {
string url = 1;
}
message CloseOtherInstancesCommand {}
message QuitCommand {}
message StartHandshakeCommand {};
message Event {
oneof event_type {
AppStateChangedEvent app_state_changed = 1;
DestinationStreamExitedEvent destination_stream_exited = 2;
DestinationAddedEvent destination_added = 3;
AddDestinationFailedEvent add_destination_failed = 4;
DestinationRemovedEvent destination_removed = 5;
RemoveDestinationFailedEvent remove_destination_failed = 6;
StartDestinationFailedEvent start_destination_failed = 7;
MediaServerStartedEvent media_server_started = 8;
OtherInstanceDetectedEvent other_instance_detected = 9;
FatalErrorEvent fatal_error = 10;
HandshakeCompletedEvent handshake_completed = 11;
}
}
message Container {
string id = 1;
string status = 2;
string health_state = 3;
double cpu_percent = 4;
uint64 memory_usage_bytes = 5;
int32 rx_rate = 6;
int32 tx_rate = 7;
google.protobuf.Timestamp rx_since = 8;
string image_name = 9;
string pull_status = 10;
string pull_progress = 11;
int32 pull_percent = 12;
int32 restart_count = 13;
optional int32 exit_code = 14;
string err = 15;
}
message Source {
Container container = 1;
bool live = 2;
google.protobuf.Timestamp live_changed_at = 3;
repeated string tracks = 4;
string exit_reason = 5;
}
message Destination {
enum Status {
STATUS_OFF_AIR = 0;
STATUS_STARTING = 1;
STATUS_LIVE = 2;
}
Container container = 1;
Status status = 2;
string name = 3;
string url = 4;
}
message BuildInfo {
string go_version = 1;
string version = 2;
string commit = 3;
string date = 4;
}
message AppState {
Source source = 1;
repeated Destination destinations = 2;
BuildInfo build_info = 3;
}
message AppStateChangedEvent {
AppState app_state = 1;
}
message DestinationStreamExitedEvent {
string name = 1;
string error = 2;
}
message DestinationAddedEvent {
string url = 1;
}
message AddDestinationFailedEvent {
string url = 1;
string error = 2;
}
message DestinationRemovedEvent {
string url = 1;
}
message RemoveDestinationFailedEvent {
string url = 1;
string error = 2;
}
message StartDestinationFailedEvent {
string url = 1;
string message = 2;
}
message MediaServerStartedEvent {
string rtmp_url = 1;
string rtmps_url = 2;
}
message OtherInstanceDetectedEvent {}
message FatalErrorEvent {
string message = 1;
}
message HandshakeCompletedEvent {}