Compare commits
No commits in common. "refactor/api" and "main" have entirely different histories.
refactor/a
...
main
10
go.mod
10
go.mod
@ -11,11 +11,7 @@ require (
|
||||
github.com/rivo/tview v0.0.0-20250330220935-949945f8d922
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/testcontainers/testcontainers-go v0.35.0
|
||||
github.com/urfave/cli/v2 v2.27.6
|
||||
golang.design/x/clipboard v0.7.0
|
||||
golang.org/x/sync v0.13.0
|
||||
google.golang.org/grpc v1.69.4
|
||||
google.golang.org/protobuf v1.36.3
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
@ -28,7 +24,6 @@ require (
|
||||
github.com/containerd/log v0.1.0 // indirect
|
||||
github.com/containerd/platforms v0.2.1 // indirect
|
||||
github.com/cpuguy83/dockercfg v0.3.2 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/distribution/reference v0.6.0 // indirect
|
||||
github.com/docker/go-units v0.5.0 // indirect
|
||||
@ -70,7 +65,6 @@ require (
|
||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
|
||||
github.com/rivo/uniseg v0.4.7 // indirect
|
||||
github.com/rs/zerolog v1.33.0 // indirect
|
||||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||
github.com/sagikazarmark/locafero v0.7.0 // indirect
|
||||
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
|
||||
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
|
||||
@ -87,7 +81,6 @@ require (
|
||||
github.com/tklauser/go-sysconf v0.3.12 // indirect
|
||||
github.com/tklauser/numcpus v0.6.1 // indirect
|
||||
github.com/vektra/mockery/v2 v2.52.2 // indirect
|
||||
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.3 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
|
||||
@ -102,13 +95,12 @@ require (
|
||||
golang.org/x/image v0.26.0 // indirect
|
||||
golang.org/x/mobile v0.0.0-20250408133729-978277e7eaf7 // indirect
|
||||
golang.org/x/mod v0.24.0 // indirect
|
||||
golang.org/x/net v0.39.0 // indirect
|
||||
golang.org/x/sync v0.13.0 // indirect
|
||||
golang.org/x/sys v0.32.0 // indirect
|
||||
golang.org/x/term v0.31.0 // indirect
|
||||
golang.org/x/text v0.24.0 // indirect
|
||||
golang.org/x/time v0.9.0 // indirect
|
||||
golang.org/x/tools v0.32.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
)
|
||||
|
||||
|
9
go.sum
9
go.sum
@ -18,8 +18,6 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
|
||||
github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA=
|
||||
github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
|
||||
github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
@ -54,8 +52,6 @@ github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiU
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
@ -144,7 +140,6 @@ github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWN
|
||||
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
||||
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
|
||||
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
|
||||
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo=
|
||||
github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k=
|
||||
@ -190,12 +185,8 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA
|
||||
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
|
||||
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
|
||||
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
|
||||
github.com/urfave/cli/v2 v2.27.6 h1:VdRdS98FNhKZ8/Az8B7MTyGQmpIr36O1EHybx/LaZ4g=
|
||||
github.com/urfave/cli/v2 v2.27.6/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
|
||||
github.com/vektra/mockery/v2 v2.52.2 h1:8QfPKUIrq8P3Cs7G79Iu4Byd5wdhGCE0quIS27x7rQo=
|
||||
github.com/vektra/mockery/v2 v2.52.2/go.mod h1:zGDY/f6bip0Yh13GQ5j7xa43fuEoYBa4ICHEaihisHw=
|
||||
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
|
||||
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
|
@ -1,13 +1,11 @@
|
||||
package server
|
||||
package app
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"log/slog"
|
||||
"net"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
@ -15,11 +13,10 @@ import (
|
||||
"git.netflux.io/rob/octoplex/internal/container"
|
||||
"git.netflux.io/rob/octoplex/internal/domain"
|
||||
"git.netflux.io/rob/octoplex/internal/event"
|
||||
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
|
||||
"git.netflux.io/rob/octoplex/internal/mediaserver"
|
||||
"git.netflux.io/rob/octoplex/internal/replicator"
|
||||
"git.netflux.io/rob/octoplex/internal/terminal"
|
||||
"github.com/docker/docker/client"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// App is an instance of the app.
|
||||
@ -29,7 +26,11 @@ type App struct {
|
||||
eventBus *event.Bus
|
||||
dispatchC chan event.Command
|
||||
dockerClient container.DockerClient
|
||||
waitForClient bool
|
||||
screen *terminal.Screen // Screen may be nil.
|
||||
headless bool
|
||||
clipboardAvailable bool
|
||||
configFilePath string
|
||||
buildInfo domain.BuildInfo
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
@ -38,8 +39,11 @@ type Params struct {
|
||||
ConfigService *config.Service
|
||||
DockerClient container.DockerClient
|
||||
ChanSize int
|
||||
Screen *terminal.Screen // Screen may be nil.
|
||||
Headless bool
|
||||
ClipboardAvailable bool
|
||||
ConfigFilePath string
|
||||
WaitForClient bool
|
||||
BuildInfo domain.BuildInfo
|
||||
Logger *slog.Logger
|
||||
}
|
||||
|
||||
@ -54,7 +58,11 @@ func New(params Params) *App {
|
||||
eventBus: event.NewBus(params.Logger.With("component", "event_bus")),
|
||||
dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)),
|
||||
dockerClient: params.DockerClient,
|
||||
waitForClient: params.WaitForClient,
|
||||
screen: params.Screen,
|
||||
headless: params.Headless,
|
||||
clipboardAvailable: params.ClipboardAvailable,
|
||||
configFilePath: params.ConfigFilePath,
|
||||
buildInfo: params.BuildInfo,
|
||||
logger: params.Logger,
|
||||
}
|
||||
}
|
||||
@ -70,26 +78,20 @@ func (a *App) Run(ctx context.Context) error {
|
||||
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
|
||||
}
|
||||
|
||||
const grpcAddr = ":50051"
|
||||
lis, err := net.Listen("tcp", grpcAddr)
|
||||
if !a.headless {
|
||||
ui, err := terminal.StartUI(ctx, terminal.StartParams{
|
||||
EventBus: a.eventBus,
|
||||
Dispatcher: func(cmd event.Command) { a.dispatchC <- cmd },
|
||||
Screen: a.screen,
|
||||
ClipboardAvailable: a.clipboardAvailable,
|
||||
ConfigFilePath: a.configFilePath,
|
||||
BuildInfo: a.buildInfo,
|
||||
Logger: a.logger.With("component", "ui"),
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
defer lis.Close()
|
||||
|
||||
grpcServer := grpc.NewServer()
|
||||
grpcDone := make(chan error, 1)
|
||||
internalAPI := newServer(a.DispatchAsync, a.eventBus, a.logger)
|
||||
pb.RegisterInternalAPIServer(grpcServer, internalAPI)
|
||||
go func() {
|
||||
a.logger.Info("gRPC server started", "addr", grpcAddr)
|
||||
grpcDone <- grpcServer.Serve(lis)
|
||||
}()
|
||||
|
||||
if a.waitForClient {
|
||||
if err = internalAPI.WaitForClient(ctx); err != nil {
|
||||
return fmt.Errorf("wait for client: %w", err)
|
||||
return fmt.Errorf("start terminal user interface: %w", err)
|
||||
}
|
||||
defer ui.Close()
|
||||
}
|
||||
|
||||
// emptyUI is a dummy function that sets the UI state to an empty state, and
|
||||
@ -105,13 +107,12 @@ func (a *App) Run(ctx context.Context) error {
|
||||
a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}})
|
||||
}
|
||||
|
||||
// doFatalError publishes a fatal error to the event bus. It will block until
|
||||
// the user acknowledges it if there is 1 or more clients connected to the
|
||||
// internal API.
|
||||
// doFatalError publishes a fatal error to the event bus, waiting for the
|
||||
// user to acknowledge it if not in headless mode.
|
||||
doFatalError := func(msg string) {
|
||||
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
|
||||
|
||||
if internalAPI.GetClientCount() == 0 {
|
||||
if a.headless {
|
||||
return
|
||||
}
|
||||
|
||||
@ -176,20 +177,21 @@ func (a *App) Run(ctx context.Context) error {
|
||||
defer uiUpdateT.Stop()
|
||||
|
||||
startMediaServerC := make(chan struct{}, 1)
|
||||
if a.headless { // disable startup check in headless mode for now
|
||||
startMediaServerC <- struct{}{}
|
||||
} else {
|
||||
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
|
||||
doFatalError(startupErr.Error())
|
||||
return startupErr
|
||||
} else if ok {
|
||||
startMediaServerC <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case grpcErr := <-grpcDone:
|
||||
a.logger.Error("gRPC server exited", "err", grpcErr)
|
||||
return grpcErr
|
||||
case <-startMediaServerC:
|
||||
if err = srv.Start(ctx); err != nil {
|
||||
return fmt.Errorf("start mediaserver: %w", err)
|
||||
@ -243,11 +245,6 @@ func (a *App) Dispatch(cmd event.Command) event.Event {
|
||||
return <-ch
|
||||
}
|
||||
|
||||
// DispatchAsync dispatches a command to be executed synchronously.
|
||||
func (a *App) DispatchAsync(cmd event.Command) {
|
||||
a.dispatchC <- cmd
|
||||
}
|
||||
|
||||
// errExit is an error that indicates the app should exit.
|
||||
var errExit = errors.New("exit")
|
||||
|
||||
@ -275,10 +272,6 @@ func (a *App) handleCommand(
|
||||
}
|
||||
}()
|
||||
|
||||
if c, ok := cmd.(syncCommand); ok {
|
||||
cmd = c.Command
|
||||
}
|
||||
|
||||
switch c := cmd.(type) {
|
||||
case event.CommandAddDestination:
|
||||
newCfg := a.cfg
|
||||
@ -288,11 +281,10 @@ func (a *App) handleCommand(
|
||||
})
|
||||
if err := a.configService.SetConfig(newCfg); err != nil {
|
||||
a.logger.Error("Add destination failed", "err", err)
|
||||
return event.AddDestinationFailedEvent{URL: c.URL, Err: err}, nil
|
||||
return event.AddDestinationFailedEvent{Err: err}, nil
|
||||
}
|
||||
a.cfg = newCfg
|
||||
a.handleConfigUpdate(state)
|
||||
a.logger.Info("Destination added", "url", c.URL)
|
||||
a.eventBus.Send(event.DestinationAddedEvent{URL: c.URL})
|
||||
case event.CommandRemoveDestination:
|
||||
repl.StopDestination(c.URL) // no-op if not live
|
||||
@ -302,7 +294,7 @@ func (a *App) handleCommand(
|
||||
})
|
||||
if err := a.configService.SetConfig(newCfg); err != nil {
|
||||
a.logger.Error("Remove destination failed", "err", err)
|
||||
a.eventBus.Send(event.RemoveDestinationFailedEvent{URL: c.URL, Err: err})
|
||||
a.eventBus.Send(event.RemoveDestinationFailedEvent{Err: err})
|
||||
break
|
||||
}
|
||||
a.cfg = newCfg
|
||||
@ -310,7 +302,7 @@ func (a *App) handleCommand(
|
||||
a.eventBus.Send(event.DestinationRemovedEvent{URL: c.URL}) //nolint:gosimple
|
||||
case event.CommandStartDestination:
|
||||
if !state.Source.Live {
|
||||
a.eventBus.Send(event.StartDestinationFailedEvent{URL: c.URL, Message: "source not live"})
|
||||
a.eventBus.Send(event.StartDestinationFailedEvent{})
|
||||
break
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
package server
|
||||
package app
|
||||
|
||||
import (
|
||||
"testing"
|
@ -1,11 +1,9 @@
|
||||
//go:build integration
|
||||
|
||||
package client_test
|
||||
package app_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
@ -16,79 +14,39 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.netflux.io/rob/octoplex/internal/client"
|
||||
"git.netflux.io/rob/octoplex/internal/app"
|
||||
"git.netflux.io/rob/octoplex/internal/config"
|
||||
"git.netflux.io/rob/octoplex/internal/container"
|
||||
"git.netflux.io/rob/octoplex/internal/domain"
|
||||
"git.netflux.io/rob/octoplex/internal/server"
|
||||
"git.netflux.io/rob/octoplex/internal/terminal"
|
||||
"github.com/gdamore/tcell/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/testcontainers/testcontainers-go"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func buildClientServer(
|
||||
func buildAppParams(
|
||||
t *testing.T,
|
||||
configService *config.Service,
|
||||
dockerClient container.DockerClient,
|
||||
screen tcell.SimulationScreen,
|
||||
screenCaptureC chan<- terminal.ScreenCapture,
|
||||
logger *slog.Logger,
|
||||
) (*client.App, *server.App) {
|
||||
client := client.New(client.NewParams{
|
||||
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
|
||||
) app.Params {
|
||||
t.Helper()
|
||||
|
||||
return app.Params{
|
||||
ConfigService: configService,
|
||||
DockerClient: dockerClient,
|
||||
Screen: &terminal.Screen{
|
||||
Screen: screen,
|
||||
Width: 180,
|
||||
Height: 25,
|
||||
CaptureC: screenCaptureC,
|
||||
},
|
||||
ClipboardAvailable: false,
|
||||
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
|
||||
Logger: logger,
|
||||
})
|
||||
|
||||
server := server.New(server.Params{
|
||||
ConfigService: configService,
|
||||
DockerClient: dockerClient,
|
||||
WaitForClient: true,
|
||||
Logger: logger,
|
||||
})
|
||||
|
||||
return client, server
|
||||
}
|
||||
|
||||
type clientServerResult struct {
|
||||
errClient error
|
||||
errServer error
|
||||
}
|
||||
|
||||
func runClientServer(
|
||||
ctx context.Context,
|
||||
_ *testing.T,
|
||||
clientApp *client.App,
|
||||
serverApp *server.App,
|
||||
) <-chan clientServerResult {
|
||||
ch := make(chan clientServerResult, 1)
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
var clientErr, srvErr error
|
||||
|
||||
g.Go(func() error {
|
||||
srvErr = serverApp.Run(ctx)
|
||||
return errors.New("server closed")
|
||||
})
|
||||
|
||||
g.Go(func() error {
|
||||
clientErr = clientApp.Run(ctx)
|
||||
return errors.New("client closed")
|
||||
})
|
||||
|
||||
go func() {
|
||||
_ = g.Wait()
|
||||
|
||||
ch <- clientServerResult{errClient: clientErr, errServer: srvErr}
|
||||
}()
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- terminal.ScreenCapture, func() []string) {
|
@ -1,6 +1,6 @@
|
||||
//go:build integration
|
||||
|
||||
package client_test
|
||||
package app_test
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
@ -15,10 +15,12 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.netflux.io/rob/octoplex/internal/app"
|
||||
"git.netflux.io/rob/octoplex/internal/config"
|
||||
"git.netflux.io/rob/octoplex/internal/container"
|
||||
"git.netflux.io/rob/octoplex/internal/container/mocks"
|
||||
"git.netflux.io/rob/octoplex/internal/domain"
|
||||
"git.netflux.io/rob/octoplex/internal/terminal"
|
||||
"git.netflux.io/rob/octoplex/internal/testhelpers"
|
||||
"github.com/docker/docker/api/types/network"
|
||||
dockerclient "github.com/docker/docker/client"
|
||||
@ -124,8 +126,26 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
|
||||
Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}},
|
||||
})
|
||||
|
||||
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
|
||||
ch := runClientServer(ctx, t, client, server)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.Equal(t, context.Canceled, app.New(app.Params{
|
||||
ConfigService: configService,
|
||||
DockerClient: dockerClient,
|
||||
Screen: &terminal.Screen{
|
||||
Screen: screen,
|
||||
Width: 160,
|
||||
Height: 25,
|
||||
CaptureC: screenCaptureC,
|
||||
},
|
||||
ClipboardAvailable: false,
|
||||
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
|
||||
Logger: logger,
|
||||
}).Run(ctx))
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
@ -266,12 +286,13 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
|
||||
|
||||
printScreen(t, getContents, "After stopping the first destination")
|
||||
|
||||
// TODO:
|
||||
// - Source error
|
||||
// - Additional features (copy URL, etc.)
|
||||
|
||||
cancel()
|
||||
|
||||
result := <-ch
|
||||
// May be a gRPC error, not context.Canceled:
|
||||
assert.ErrorContains(t, result.errClient, "context canceled")
|
||||
assert.ErrorIs(t, result.errServer, context.Canceled)
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestIntegrationCustomHost(t *testing.T) {
|
||||
@ -292,8 +313,14 @@ func TestIntegrationCustomHost(t *testing.T) {
|
||||
})
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
|
||||
ch := runClientServer(ctx, t, client, server)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
time.Sleep(time.Second)
|
||||
sendKey(t, screen, tcell.KeyF1, ' ')
|
||||
@ -333,10 +360,7 @@ func TestIntegrationCustomHost(t *testing.T) {
|
||||
|
||||
cancel()
|
||||
|
||||
result := <-ch
|
||||
// May be a gRPC error, not context.Canceled:
|
||||
assert.ErrorContains(t, result.errClient, "context canceled")
|
||||
assert.ErrorIs(t, result.errServer, context.Canceled)
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestIntegrationCustomTLSCerts(t *testing.T) {
|
||||
@ -360,8 +384,14 @@ func TestIntegrationCustomTLSCerts(t *testing.T) {
|
||||
})
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
|
||||
ch := runClientServer(ctx, t, client, server)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
@ -396,9 +426,7 @@ func TestIntegrationCustomTLSCerts(t *testing.T) {
|
||||
|
||||
cancel()
|
||||
|
||||
result := <-ch
|
||||
assert.ErrorContains(t, result.errClient, "context canceled")
|
||||
assert.ErrorIs(t, result.errServer, context.Canceled)
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestIntegrationRestartDestination(t *testing.T) {
|
||||
@ -437,8 +465,14 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
||||
}},
|
||||
})
|
||||
|
||||
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
|
||||
ch := runClientServer(ctx, t, client, server)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
@ -550,9 +584,7 @@ func TestIntegrationRestartDestination(t *testing.T) {
|
||||
|
||||
cancel()
|
||||
|
||||
result := <-ch
|
||||
assert.ErrorContains(t, result.errClient, "context canceled")
|
||||
assert.ErrorIs(t, result.errServer, context.Canceled)
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestIntegrationStartDestinationFailed(t *testing.T) {
|
||||
@ -570,8 +602,14 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
||||
Destinations: []config.Destination{{Name: "Example server", URL: "rtmp://rtmp.example.com/live"}},
|
||||
})
|
||||
|
||||
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
|
||||
ch := runClientServer(ctx, t, client, server)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
@ -620,9 +658,7 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
|
||||
|
||||
cancel()
|
||||
|
||||
result := <-ch
|
||||
assert.ErrorContains(t, result.errClient, "context canceled")
|
||||
assert.ErrorIs(t, result.errServer, context.Canceled)
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestIntegrationDestinationValidations(t *testing.T) {
|
||||
@ -639,8 +675,14 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
||||
Sources: config.Sources{MediaServer: config.MediaServerSource{StreamKey: "live", RTMP: config.RTMPSource{Enabled: true}}},
|
||||
})
|
||||
|
||||
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
|
||||
ch := runClientServer(ctx, t, client, server)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
@ -744,9 +786,7 @@ func TestIntegrationDestinationValidations(t *testing.T) {
|
||||
printScreen(t, getContents, "After entering a duplicate destination URL")
|
||||
cancel()
|
||||
|
||||
result := <-ch
|
||||
assert.ErrorContains(t, result.errClient, "context canceled")
|
||||
assert.ErrorIs(t, result.errServer, context.Canceled)
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestIntegrationStartupCheck(t *testing.T) {
|
||||
@ -777,8 +817,14 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
|
||||
ch := runClientServer(ctx, t, client, server)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
@ -822,9 +868,7 @@ func TestIntegrationStartupCheck(t *testing.T) {
|
||||
printScreen(t, getContents, "After starting the mediaserver")
|
||||
cancel()
|
||||
|
||||
result := <-ch
|
||||
assert.ErrorContains(t, result.errClient, "context canceled")
|
||||
assert.ErrorIs(t, result.errServer, context.Canceled)
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestIntegrationMediaServerError(t *testing.T) {
|
||||
@ -842,8 +886,18 @@ func TestIntegrationMediaServerError(t *testing.T) {
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
|
||||
ch := runClientServer(ctx, t, client, server)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.EqualError(
|
||||
t,
|
||||
app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx),
|
||||
"media server exited",
|
||||
)
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
@ -857,12 +911,10 @@ func TestIntegrationMediaServerError(t *testing.T) {
|
||||
)
|
||||
printScreen(t, getContents, "Ater displaying the media server error modal")
|
||||
|
||||
// Quit the app:
|
||||
// Quit the app, this should cause the done channel to receive.
|
||||
sendKey(t, screen, tcell.KeyEnter, ' ')
|
||||
|
||||
result := <-ch
|
||||
assert.ErrorContains(t, result.errClient, "context canceled")
|
||||
assert.ErrorContains(t, result.errServer, "media server exited")
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestIntegrationDockerClientError(t *testing.T) {
|
||||
@ -877,8 +929,18 @@ func TestIntegrationDockerClientError(t *testing.T) {
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
client, server := buildClientServer(configService, &dockerClient, screen, screenCaptureC, logger)
|
||||
ch := runClientServer(ctx, t, client, server)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.EqualError(
|
||||
t,
|
||||
app.New(buildAppParams(t, configService, &dockerClient, screen, screenCaptureC, logger)).Run(ctx),
|
||||
"create container client: network create: boom",
|
||||
)
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
@ -892,12 +954,10 @@ func TestIntegrationDockerClientError(t *testing.T) {
|
||||
)
|
||||
printScreen(t, getContents, "Ater displaying the fatal error modal")
|
||||
|
||||
// Quit the app:
|
||||
// Quit the app, this should cause the done channel to receive.
|
||||
sendKey(t, screen, tcell.KeyEnter, ' ')
|
||||
|
||||
result := <-ch
|
||||
assert.ErrorContains(t, result.errClient, "context canceled")
|
||||
assert.EqualError(t, result.errServer, "create container client: network create: boom")
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestIntegrationDockerConnectionError(t *testing.T) {
|
||||
@ -911,8 +971,16 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
|
||||
ch := runClientServer(ctx, t, client, server)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
err := app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)
|
||||
require.ErrorContains(t, err, "dial tcp: lookup docker.example.com")
|
||||
require.ErrorContains(t, err, "no such host")
|
||||
}()
|
||||
|
||||
require.EventuallyWithT(
|
||||
t,
|
||||
@ -926,13 +994,10 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
|
||||
)
|
||||
printScreen(t, getContents, "Ater displaying the fatal error modal")
|
||||
|
||||
// Quit the app:
|
||||
// Quit the app, this should cause the done channel to receive.
|
||||
sendKey(t, screen, tcell.KeyEnter, ' ')
|
||||
|
||||
result := <-ch
|
||||
assert.ErrorContains(t, result.errClient, "context canceled")
|
||||
assert.ErrorContains(t, result.errServer, "dial tcp: lookup docker.example.com")
|
||||
assert.ErrorContains(t, result.errServer, "no such host")
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestIntegrationCopyURLs(t *testing.T) {
|
||||
@ -1002,8 +1067,14 @@ func TestIntegrationCopyURLs(t *testing.T) {
|
||||
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: tc.mediaServerConfig}})
|
||||
screen, screenCaptureC, getContents := setupSimulationScreen(t)
|
||||
|
||||
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
|
||||
ch := runClientServer(ctx, t, client, server)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer func() {
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
|
||||
}()
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
printScreen(t, getContents, "Ater loading the app")
|
||||
@ -1024,9 +1095,7 @@ func TestIntegrationCopyURLs(t *testing.T) {
|
||||
|
||||
cancel()
|
||||
|
||||
result := <-ch
|
||||
assert.ErrorContains(t, result.errClient, "context canceled")
|
||||
assert.ErrorIs(t, result.errServer, context.Canceled)
|
||||
<-done
|
||||
})
|
||||
}
|
||||
}
|
@ -1,131 +0,0 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
|
||||
"git.netflux.io/rob/octoplex/internal/domain"
|
||||
"git.netflux.io/rob/octoplex/internal/event"
|
||||
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
|
||||
"git.netflux.io/rob/octoplex/internal/protocol"
|
||||
"git.netflux.io/rob/octoplex/internal/terminal"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
)
|
||||
|
||||
// App is the client application.
|
||||
type App struct {
|
||||
bus *event.Bus
|
||||
clipboardAvailable bool
|
||||
buildInfo domain.BuildInfo
|
||||
screen *terminal.Screen
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// NewParams contains the parameters for the App.
|
||||
type NewParams struct {
|
||||
ClipboardAvailable bool
|
||||
BuildInfo domain.BuildInfo
|
||||
Screen *terminal.Screen
|
||||
Logger *slog.Logger
|
||||
}
|
||||
|
||||
// New creates a new App instance.
|
||||
func New(params NewParams) *App {
|
||||
return &App{
|
||||
bus: event.NewBus(params.Logger),
|
||||
clipboardAvailable: params.ClipboardAvailable,
|
||||
buildInfo: params.BuildInfo,
|
||||
screen: params.Screen,
|
||||
logger: params.Logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts the application, and blocks until it is closed.
|
||||
//
|
||||
// It returns nil if the application was closed by the user, or an error if it
|
||||
// closed for any other reason.
|
||||
func (a *App) Run(ctx context.Context) error {
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
conn, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if err != nil {
|
||||
return fmt.Errorf("connect to gRPC server: %w", err)
|
||||
}
|
||||
apiClient := pb.NewInternalAPIClient(conn)
|
||||
stream, err := apiClient.Communicate(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create gRPC stream: %w", err)
|
||||
}
|
||||
|
||||
ui, err := terminal.NewUI(ctx, terminal.Params{
|
||||
EventBus: a.bus,
|
||||
Dispatcher: func(cmd event.Command) {
|
||||
a.logger.Info("Command dispatched", "cmd", cmd.Name())
|
||||
if sendErr := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Command{Command: protocol.CommandToProto(cmd)}}); sendErr != nil {
|
||||
a.logger.Error("Error sending command to gRPC API", "err", sendErr)
|
||||
}
|
||||
},
|
||||
ClipboardAvailable: a.clipboardAvailable,
|
||||
BuildInfo: a.buildInfo,
|
||||
Screen: a.screen,
|
||||
Logger: a.logger.With("component", "ui"),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("start terminal user interface: %w", err)
|
||||
}
|
||||
defer ui.Close()
|
||||
|
||||
g.Go(func() error { return ui.Run(ctx) })
|
||||
|
||||
// After the UI is available, perform a handshake with the server.
|
||||
// Ordering is important here. We want to ensure that the UI is ready to
|
||||
// react to events received from the server. Performing the handshake ensures
|
||||
// the client has received at least one event.
|
||||
if err := a.doHandshake(stream); err != nil {
|
||||
return fmt.Errorf("do handshake: %w", err)
|
||||
}
|
||||
|
||||
g.Go(func() error {
|
||||
for {
|
||||
envelope, recErr := stream.Recv()
|
||||
if recErr != nil {
|
||||
return fmt.Errorf("receive envelope: %w", recErr)
|
||||
}
|
||||
|
||||
pbEvt := envelope.GetEvent()
|
||||
if pbEvt == nil {
|
||||
a.logger.Error("Received envelope without event")
|
||||
continue
|
||||
}
|
||||
|
||||
evt := protocol.EventFromProto(pbEvt)
|
||||
a.logger.Debug("Received event from gRPC stream", "event", evt.EventName(), "payload", evt)
|
||||
a.bus.Send(evt)
|
||||
}
|
||||
})
|
||||
|
||||
if err := g.Wait(); err == terminal.ErrUserClosed {
|
||||
return nil
|
||||
} else {
|
||||
return fmt.Errorf("errgroup.Wait: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) doHandshake(stream pb.InternalAPI_CommunicateClient) error {
|
||||
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Command{Command: &pb.Command{CommandType: &pb.Command_StartHandshake{}}}}); err != nil {
|
||||
return fmt.Errorf("send start handshake command: %w", err)
|
||||
}
|
||||
|
||||
env, err := stream.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("receive handshake completed event: %w", err)
|
||||
}
|
||||
if evt := env.GetEvent(); evt == nil || evt.GetHandshakeCompleted() == nil {
|
||||
return fmt.Errorf("expected handshake completed event but got: %T", env)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -2,7 +2,6 @@ package event
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"slices"
|
||||
"sync"
|
||||
)
|
||||
|
||||
@ -32,21 +31,6 @@ func (b *Bus) Register() <-chan Event {
|
||||
return ch
|
||||
}
|
||||
|
||||
// Deregister deregisters a consumer for all events.
|
||||
func (b *Bus) Deregister(ch <-chan Event) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
b.consumers = slices.DeleteFunc(b.consumers, func(other chan Event) bool {
|
||||
if ch == other {
|
||||
close(other)
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
// Send sends an event to all registered consumers.
|
||||
func (b *Bus) Send(evt Event) {
|
||||
// The mutex is needed to ensure the backing array of b.consumers cannot be
|
||||
@ -59,7 +43,7 @@ func (b *Bus) Send(evt Event) {
|
||||
select {
|
||||
case ch <- evt:
|
||||
default:
|
||||
b.logger.Warn("Event dropped", "name", evt.EventName())
|
||||
b.logger.Warn("Event dropped", "name", evt.name())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6,7 +6,6 @@ import (
|
||||
"git.netflux.io/rob/octoplex/internal/event"
|
||||
"git.netflux.io/rob/octoplex/internal/testhelpers"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBus(t *testing.T) {
|
||||
@ -26,19 +25,5 @@ func TestBus(t *testing.T) {
|
||||
}()
|
||||
|
||||
assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent))
|
||||
assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent))
|
||||
|
||||
assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent))
|
||||
assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent))
|
||||
|
||||
bus.Deregister(ch1)
|
||||
|
||||
_, ok := <-ch1
|
||||
assert.False(t, ok)
|
||||
|
||||
select {
|
||||
case <-ch2:
|
||||
require.Fail(t, "ch2 should be blocking")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
@ -50,8 +50,6 @@ func (c CommandCloseOtherInstance) Name() string {
|
||||
}
|
||||
|
||||
// CommandQuit quits the app.
|
||||
//
|
||||
// TODO: consider how this should look in a client/server architecture.
|
||||
type CommandQuit struct{}
|
||||
|
||||
// Name implements the Command interface.
|
||||
|
@ -19,7 +19,7 @@ const (
|
||||
|
||||
// Event represents something which happened in the appllication.
|
||||
type Event interface {
|
||||
EventName() Name
|
||||
name() Name
|
||||
}
|
||||
|
||||
// AppStateChangedEvent is emitted when the application state changes.
|
||||
@ -27,7 +27,7 @@ type AppStateChangedEvent struct {
|
||||
State domain.AppState
|
||||
}
|
||||
|
||||
func (e AppStateChangedEvent) EventName() Name {
|
||||
func (e AppStateChangedEvent) name() Name {
|
||||
return EventNameAppStateChanged
|
||||
}
|
||||
|
||||
@ -36,17 +36,16 @@ type DestinationAddedEvent struct {
|
||||
URL string
|
||||
}
|
||||
|
||||
func (e DestinationAddedEvent) EventName() Name {
|
||||
func (e DestinationAddedEvent) name() Name {
|
||||
return EventNameDestinationAdded
|
||||
}
|
||||
|
||||
// AddDestinationFailedEvent is emitted when a destination fails to be added.
|
||||
type AddDestinationFailedEvent struct {
|
||||
URL string
|
||||
Err error
|
||||
}
|
||||
|
||||
func (e AddDestinationFailedEvent) EventName() Name {
|
||||
func (e AddDestinationFailedEvent) name() Name {
|
||||
return EventNameAddDestinationFailed
|
||||
}
|
||||
|
||||
@ -56,17 +55,14 @@ type DestinationStreamExitedEvent struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
func (e DestinationStreamExitedEvent) EventName() Name {
|
||||
func (e DestinationStreamExitedEvent) name() Name {
|
||||
return EventNameDestinationStreamExited
|
||||
}
|
||||
|
||||
// StartDestinationFailedEvent is emitted when a destination fails to start.
|
||||
type StartDestinationFailedEvent struct {
|
||||
URL string
|
||||
Message string
|
||||
}
|
||||
type StartDestinationFailedEvent struct{}
|
||||
|
||||
func (e StartDestinationFailedEvent) EventName() Name {
|
||||
func (e StartDestinationFailedEvent) name() Name {
|
||||
return EventNameStartDestinationFailed
|
||||
}
|
||||
|
||||
@ -76,18 +72,17 @@ type DestinationRemovedEvent struct {
|
||||
URL string
|
||||
}
|
||||
|
||||
func (e DestinationRemovedEvent) EventName() Name {
|
||||
func (e DestinationRemovedEvent) name() Name {
|
||||
return EventNameDestinationRemoved
|
||||
}
|
||||
|
||||
// RemoveDestinationFailedEvent is emitted when a destination fails to be
|
||||
// removed.
|
||||
type RemoveDestinationFailedEvent struct {
|
||||
URL string
|
||||
Err error
|
||||
}
|
||||
|
||||
func (e RemoveDestinationFailedEvent) EventName() Name {
|
||||
func (e RemoveDestinationFailedEvent) name() Name {
|
||||
return EventNameRemoveDestinationFailed
|
||||
}
|
||||
|
||||
@ -100,11 +95,11 @@ type FatalErrorOccurredEvent struct {
|
||||
// OtherInstanceDetectedEvent is emitted when the app launches and detects another instance.
|
||||
type OtherInstanceDetectedEvent struct{}
|
||||
|
||||
func (e OtherInstanceDetectedEvent) EventName() Name {
|
||||
func (e OtherInstanceDetectedEvent) name() Name {
|
||||
return EventNameOtherInstanceDetected
|
||||
}
|
||||
|
||||
func (e FatalErrorOccurredEvent) EventName() Name {
|
||||
func (e FatalErrorOccurredEvent) name() Name {
|
||||
return "fatal_error_occurred"
|
||||
}
|
||||
|
||||
@ -114,6 +109,6 @@ type MediaServerStartedEvent struct {
|
||||
RTMPSURL string
|
||||
}
|
||||
|
||||
func (e MediaServerStartedEvent) EventName() Name {
|
||||
func (e MediaServerStartedEvent) name() Name {
|
||||
return "media_server_started"
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -1,137 +0,0 @@
|
||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.2.0
|
||||
// - protoc v6.30.1
|
||||
// source: api.proto
|
||||
|
||||
package grpc
|
||||
|
||||
import (
|
||||
context "context"
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
// Requires gRPC-Go v1.32.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion7
|
||||
|
||||
// InternalAPIClient is the client API for InternalAPI service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
type InternalAPIClient interface {
|
||||
Communicate(ctx context.Context, opts ...grpc.CallOption) (InternalAPI_CommunicateClient, error)
|
||||
}
|
||||
|
||||
type internalAPIClient struct {
|
||||
cc grpc.ClientConnInterface
|
||||
}
|
||||
|
||||
func NewInternalAPIClient(cc grpc.ClientConnInterface) InternalAPIClient {
|
||||
return &internalAPIClient{cc}
|
||||
}
|
||||
|
||||
func (c *internalAPIClient) Communicate(ctx context.Context, opts ...grpc.CallOption) (InternalAPI_CommunicateClient, error) {
|
||||
stream, err := c.cc.NewStream(ctx, &InternalAPI_ServiceDesc.Streams[0], "/api.InternalAPI/Communicate", opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &internalAPICommunicateClient{stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type InternalAPI_CommunicateClient interface {
|
||||
Send(*Envelope) error
|
||||
Recv() (*Envelope, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type internalAPICommunicateClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *internalAPICommunicateClient) Send(m *Envelope) error {
|
||||
return x.ClientStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *internalAPICommunicateClient) Recv() (*Envelope, error) {
|
||||
m := new(Envelope)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// InternalAPIServer is the server API for InternalAPI service.
|
||||
// All implementations must embed UnimplementedInternalAPIServer
|
||||
// for forward compatibility
|
||||
type InternalAPIServer interface {
|
||||
Communicate(InternalAPI_CommunicateServer) error
|
||||
mustEmbedUnimplementedInternalAPIServer()
|
||||
}
|
||||
|
||||
// UnimplementedInternalAPIServer must be embedded to have forward compatible implementations.
|
||||
type UnimplementedInternalAPIServer struct {
|
||||
}
|
||||
|
||||
func (UnimplementedInternalAPIServer) Communicate(InternalAPI_CommunicateServer) error {
|
||||
return status.Errorf(codes.Unimplemented, "method Communicate not implemented")
|
||||
}
|
||||
func (UnimplementedInternalAPIServer) mustEmbedUnimplementedInternalAPIServer() {}
|
||||
|
||||
// UnsafeInternalAPIServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to InternalAPIServer will
|
||||
// result in compilation errors.
|
||||
type UnsafeInternalAPIServer interface {
|
||||
mustEmbedUnimplementedInternalAPIServer()
|
||||
}
|
||||
|
||||
func RegisterInternalAPIServer(s grpc.ServiceRegistrar, srv InternalAPIServer) {
|
||||
s.RegisterService(&InternalAPI_ServiceDesc, srv)
|
||||
}
|
||||
|
||||
func _InternalAPI_Communicate_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(InternalAPIServer).Communicate(&internalAPICommunicateServer{stream})
|
||||
}
|
||||
|
||||
type InternalAPI_CommunicateServer interface {
|
||||
Send(*Envelope) error
|
||||
Recv() (*Envelope, error)
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type internalAPICommunicateServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *internalAPICommunicateServer) Send(m *Envelope) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *internalAPICommunicateServer) Recv() (*Envelope, error) {
|
||||
m := new(Envelope)
|
||||
if err := x.ServerStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// InternalAPI_ServiceDesc is the grpc.ServiceDesc for InternalAPI service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
var InternalAPI_ServiceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "api.InternalAPI",
|
||||
HandlerType: (*InternalAPIServer)(nil),
|
||||
Methods: []grpc.MethodDesc{},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
StreamName: "Communicate",
|
||||
Handler: _InternalAPI_Communicate_Handler,
|
||||
ServerStreams: true,
|
||||
ClientStreams: true,
|
||||
},
|
||||
},
|
||||
Metadata: "api.proto",
|
||||
}
|
@ -1,110 +0,0 @@
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"git.netflux.io/rob/octoplex/internal/event"
|
||||
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
|
||||
)
|
||||
|
||||
// CommandToProto converts a command to a protobuf message.
|
||||
func CommandToProto(command event.Command) *pb.Command {
|
||||
switch evt := command.(type) {
|
||||
case event.CommandAddDestination:
|
||||
return buildAddDestinationCommand(evt)
|
||||
case event.CommandRemoveDestination:
|
||||
return buildRemoveDestinationCommand(evt)
|
||||
case event.CommandStartDestination:
|
||||
return buildStartDestinationCommand(evt)
|
||||
case event.CommandStopDestination:
|
||||
return buildStopDestinationCommand(evt)
|
||||
case event.CommandCloseOtherInstance:
|
||||
return buildCloseOtherInstanceCommand(evt)
|
||||
case event.CommandQuit:
|
||||
return buildQuitCommand(evt)
|
||||
default:
|
||||
panic("unknown command type")
|
||||
}
|
||||
}
|
||||
|
||||
func buildAddDestinationCommand(cmd event.CommandAddDestination) *pb.Command {
|
||||
return &pb.Command{CommandType: &pb.Command_AddDestination{AddDestination: &pb.AddDestinationCommand{Name: cmd.DestinationName, Url: cmd.URL}}}
|
||||
}
|
||||
|
||||
func buildRemoveDestinationCommand(cmd event.CommandRemoveDestination) *pb.Command {
|
||||
return &pb.Command{CommandType: &pb.Command_RemoveDestination{RemoveDestination: &pb.RemoveDestinationCommand{Url: cmd.URL}}}
|
||||
}
|
||||
|
||||
func buildStartDestinationCommand(cmd event.CommandStartDestination) *pb.Command {
|
||||
return &pb.Command{CommandType: &pb.Command_StartDestination{StartDestination: &pb.StartDestinationCommand{Url: cmd.URL}}}
|
||||
}
|
||||
|
||||
func buildStopDestinationCommand(cmd event.CommandStopDestination) *pb.Command {
|
||||
return &pb.Command{CommandType: &pb.Command_StopDestination{StopDestination: &pb.StopDestinationCommand{Url: cmd.URL}}}
|
||||
}
|
||||
|
||||
func buildCloseOtherInstanceCommand(event.CommandCloseOtherInstance) *pb.Command {
|
||||
return &pb.Command{CommandType: &pb.Command_CloseOtherInstances{CloseOtherInstances: &pb.CloseOtherInstancesCommand{}}}
|
||||
}
|
||||
|
||||
func buildQuitCommand(event.CommandQuit) *pb.Command {
|
||||
return &pb.Command{CommandType: &pb.Command_Quit{Quit: &pb.QuitCommand{}}}
|
||||
}
|
||||
|
||||
// CommandFromProto converts a protobuf message to a command.
|
||||
func CommandFromProto(pbCmd *pb.Command) event.Command {
|
||||
if pbCmd == nil || pbCmd.CommandType == nil {
|
||||
panic("invalid or nil pb.Command")
|
||||
}
|
||||
|
||||
switch cmd := pbCmd.CommandType.(type) {
|
||||
case *pb.Command_AddDestination:
|
||||
return parseAddDestinationCommand(cmd.AddDestination)
|
||||
case *pb.Command_RemoveDestination:
|
||||
return parseRemoveDestinationCommand(cmd.RemoveDestination)
|
||||
case *pb.Command_StartDestination:
|
||||
return parseStartDestinationCommand(cmd.StartDestination)
|
||||
case *pb.Command_StopDestination:
|
||||
return parseStopDestinationCommand(cmd.StopDestination)
|
||||
case *pb.Command_CloseOtherInstances:
|
||||
return parseCloseOtherInstanceCommand(cmd.CloseOtherInstances)
|
||||
case *pb.Command_Quit:
|
||||
return parseQuitCommand(cmd.Quit)
|
||||
default:
|
||||
panic("unknown pb.Command type")
|
||||
}
|
||||
}
|
||||
|
||||
func parseAddDestinationCommand(cmd *pb.AddDestinationCommand) event.Command {
|
||||
if cmd == nil {
|
||||
panic("nil AddDestinationCommand")
|
||||
}
|
||||
return event.CommandAddDestination{DestinationName: cmd.Name, URL: cmd.Url}
|
||||
}
|
||||
|
||||
func parseRemoveDestinationCommand(cmd *pb.RemoveDestinationCommand) event.Command {
|
||||
if cmd == nil {
|
||||
panic("nil RemoveDestinationCommand")
|
||||
}
|
||||
return event.CommandRemoveDestination{URL: cmd.Url}
|
||||
}
|
||||
|
||||
func parseStartDestinationCommand(cmd *pb.StartDestinationCommand) event.Command {
|
||||
if cmd == nil {
|
||||
panic("nil StartDestinationCommand")
|
||||
}
|
||||
return event.CommandStartDestination{URL: cmd.Url}
|
||||
}
|
||||
|
||||
func parseStopDestinationCommand(cmd *pb.StopDestinationCommand) event.Command {
|
||||
if cmd == nil {
|
||||
panic("nil StopDestinationCommand")
|
||||
}
|
||||
return event.CommandStopDestination{URL: cmd.Url}
|
||||
}
|
||||
|
||||
func parseCloseOtherInstanceCommand(_ *pb.CloseOtherInstancesCommand) event.Command {
|
||||
return event.CommandCloseOtherInstance{}
|
||||
}
|
||||
|
||||
func parseQuitCommand(_ *pb.QuitCommand) event.Command {
|
||||
return event.CommandQuit{}
|
||||
}
|
@ -1,121 +0,0 @@
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"git.netflux.io/rob/octoplex/internal/domain"
|
||||
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func containerToProto(c domain.Container) *pb.Container {
|
||||
var errString string
|
||||
if c.Err != nil {
|
||||
errString = c.Err.Error()
|
||||
}
|
||||
|
||||
var exitCode *int32
|
||||
if c.ExitCode != nil {
|
||||
code := int32(*c.ExitCode)
|
||||
exitCode = &code
|
||||
}
|
||||
|
||||
return &pb.Container{
|
||||
Id: c.ID,
|
||||
Status: c.Status,
|
||||
HealthState: c.HealthState,
|
||||
CpuPercent: c.CPUPercent,
|
||||
MemoryUsageBytes: c.MemoryUsageBytes,
|
||||
RxRate: int32(c.RxRate),
|
||||
TxRate: int32(c.TxRate),
|
||||
RxSince: timestamppb.New(c.RxSince),
|
||||
ImageName: c.ImageName,
|
||||
PullStatus: c.PullStatus,
|
||||
PullProgress: c.PullProgress,
|
||||
PullPercent: int32(c.PullPercent),
|
||||
RestartCount: int32(c.RestartCount),
|
||||
ExitCode: exitCode,
|
||||
Err: errString,
|
||||
}
|
||||
}
|
||||
func protoToContainer(pbCont *pb.Container) domain.Container {
|
||||
if pbCont == nil {
|
||||
return domain.Container{}
|
||||
}
|
||||
|
||||
var exitCode *int
|
||||
if pbCont.ExitCode != nil {
|
||||
val := int(*pbCont.ExitCode)
|
||||
exitCode = &val
|
||||
}
|
||||
|
||||
var err error
|
||||
if pbCont.Err != "" {
|
||||
err = errors.New(pbCont.Err)
|
||||
}
|
||||
|
||||
return domain.Container{
|
||||
ID: pbCont.Id,
|
||||
Status: pbCont.Status,
|
||||
HealthState: pbCont.HealthState,
|
||||
CPUPercent: pbCont.CpuPercent,
|
||||
MemoryUsageBytes: pbCont.MemoryUsageBytes,
|
||||
RxRate: int(pbCont.RxRate),
|
||||
TxRate: int(pbCont.TxRate),
|
||||
RxSince: pbCont.RxSince.AsTime(),
|
||||
ImageName: pbCont.ImageName,
|
||||
PullStatus: pbCont.PullStatus,
|
||||
PullProgress: pbCont.PullProgress,
|
||||
PullPercent: int(pbCont.PullPercent),
|
||||
RestartCount: int(pbCont.RestartCount),
|
||||
ExitCode: exitCode,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
func destinationsToProto(inDests []domain.Destination) []*pb.Destination {
|
||||
destinations := make([]*pb.Destination, 0, len(inDests))
|
||||
for _, d := range inDests {
|
||||
destinations = append(destinations, destinationToProto(d))
|
||||
}
|
||||
return destinations
|
||||
}
|
||||
|
||||
func destinationToProto(d domain.Destination) *pb.Destination {
|
||||
return &pb.Destination{
|
||||
Container: containerToProto(d.Container),
|
||||
Status: destinationStatusToProto(d.Status),
|
||||
Name: d.Name,
|
||||
Url: d.URL,
|
||||
}
|
||||
}
|
||||
|
||||
func protoToDestinations(pbDests []*pb.Destination) []domain.Destination {
|
||||
if pbDests == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
dests := make([]domain.Destination, 0, len(pbDests))
|
||||
for _, pbDest := range pbDests {
|
||||
if pbDest == nil {
|
||||
continue
|
||||
}
|
||||
dests = append(dests, domain.Destination{
|
||||
Container: protoToContainer(pbDest.Container),
|
||||
Status: domain.DestinationStatus(pbDest.Status), // direct cast, same underlying int
|
||||
Name: pbDest.Name,
|
||||
URL: pbDest.Url,
|
||||
})
|
||||
}
|
||||
return dests
|
||||
}
|
||||
func destinationStatusToProto(s domain.DestinationStatus) pb.Destination_Status {
|
||||
switch s {
|
||||
case domain.DestinationStatusStarting:
|
||||
return pb.Destination_STATUS_STARTING
|
||||
case domain.DestinationStatusLive:
|
||||
return pb.Destination_STATUS_LIVE
|
||||
default:
|
||||
return pb.Destination_STATUS_OFF_AIR
|
||||
}
|
||||
}
|
@ -1,252 +0,0 @@
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"git.netflux.io/rob/octoplex/internal/domain"
|
||||
"git.netflux.io/rob/octoplex/internal/event"
|
||||
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
// EventToProto converts an event to a protobuf message.
|
||||
func EventToProto(ev event.Event) *pb.Event {
|
||||
switch evt := ev.(type) {
|
||||
case event.AppStateChangedEvent:
|
||||
return buildAppStateChangeEvent(evt)
|
||||
case event.DestinationAddedEvent:
|
||||
return buildDestinationAddedEvent(evt)
|
||||
case event.AddDestinationFailedEvent:
|
||||
return buildAddDestinationFailedEvent(evt)
|
||||
case event.DestinationStreamExitedEvent:
|
||||
return buildDestinationStreamExitedEvent(evt)
|
||||
case event.StartDestinationFailedEvent:
|
||||
return buildStartDestinationFailedEvent(evt)
|
||||
case event.DestinationRemovedEvent:
|
||||
return buildDestinationRemovedEvent(evt)
|
||||
case event.RemoveDestinationFailedEvent:
|
||||
return buildRemoveDestinationFailedEvent(evt)
|
||||
case event.FatalErrorOccurredEvent:
|
||||
return buildFatalErrorOccurredEvent(evt)
|
||||
case event.OtherInstanceDetectedEvent:
|
||||
return buildOtherInstanceDetectedEvent(evt)
|
||||
case event.MediaServerStartedEvent:
|
||||
return buildMediaServerStartedEvent(evt)
|
||||
default:
|
||||
panic("unknown event type")
|
||||
}
|
||||
}
|
||||
|
||||
func buildAppStateChangeEvent(evt event.AppStateChangedEvent) *pb.Event {
|
||||
return &pb.Event{
|
||||
EventType: &pb.Event_AppStateChanged{
|
||||
AppStateChanged: &pb.AppStateChangedEvent{
|
||||
AppState: &pb.AppState{
|
||||
Source: &pb.Source{
|
||||
Container: containerToProto(evt.State.Source.Container),
|
||||
Live: evt.State.Source.Live,
|
||||
LiveChangedAt: timestamppb.New(evt.State.Source.LiveChangedAt),
|
||||
Tracks: evt.State.Source.Tracks,
|
||||
ExitReason: evt.State.Source.ExitReason,
|
||||
},
|
||||
Destinations: destinationsToProto(evt.State.Destinations),
|
||||
BuildInfo: &pb.BuildInfo{
|
||||
GoVersion: evt.State.BuildInfo.GoVersion,
|
||||
Version: evt.State.BuildInfo.Version,
|
||||
Commit: evt.State.BuildInfo.Commit,
|
||||
Date: evt.State.BuildInfo.Date,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func buildDestinationAddedEvent(evt event.DestinationAddedEvent) *pb.Event {
|
||||
return &pb.Event{
|
||||
EventType: &pb.Event_DestinationAdded{
|
||||
DestinationAdded: &pb.DestinationAddedEvent{Url: evt.URL},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func buildAddDestinationFailedEvent(evt event.AddDestinationFailedEvent) *pb.Event {
|
||||
return &pb.Event{
|
||||
EventType: &pb.Event_AddDestinationFailed{
|
||||
AddDestinationFailed: &pb.AddDestinationFailedEvent{Url: evt.URL, Error: evt.Err.Error()},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func buildDestinationStreamExitedEvent(evt event.DestinationStreamExitedEvent) *pb.Event {
|
||||
return &pb.Event{
|
||||
EventType: &pb.Event_DestinationStreamExited{
|
||||
DestinationStreamExited: &pb.DestinationStreamExitedEvent{Name: evt.Name, Error: evt.Err.Error()},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func buildStartDestinationFailedEvent(evt event.StartDestinationFailedEvent) *pb.Event {
|
||||
return &pb.Event{
|
||||
EventType: &pb.Event_StartDestinationFailed{
|
||||
StartDestinationFailed: &pb.StartDestinationFailedEvent{Url: evt.URL, Message: evt.Message},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func buildDestinationRemovedEvent(evt event.DestinationRemovedEvent) *pb.Event {
|
||||
return &pb.Event{
|
||||
EventType: &pb.Event_DestinationRemoved{
|
||||
DestinationRemoved: &pb.DestinationRemovedEvent{Url: evt.URL},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func buildRemoveDestinationFailedEvent(evt event.RemoveDestinationFailedEvent) *pb.Event {
|
||||
return &pb.Event{
|
||||
EventType: &pb.Event_RemoveDestinationFailed{
|
||||
RemoveDestinationFailed: &pb.RemoveDestinationFailedEvent{Url: evt.URL, Error: evt.Err.Error()},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func buildFatalErrorOccurredEvent(evt event.FatalErrorOccurredEvent) *pb.Event {
|
||||
return &pb.Event{
|
||||
EventType: &pb.Event_FatalError{
|
||||
FatalError: &pb.FatalErrorEvent{Message: evt.Message},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func buildOtherInstanceDetectedEvent(_ event.OtherInstanceDetectedEvent) *pb.Event {
|
||||
return &pb.Event{
|
||||
EventType: &pb.Event_OtherInstanceDetected{
|
||||
OtherInstanceDetected: &pb.OtherInstanceDetectedEvent{},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func buildMediaServerStartedEvent(evt event.MediaServerStartedEvent) *pb.Event {
|
||||
return &pb.Event{
|
||||
EventType: &pb.Event_MediaServerStarted{
|
||||
MediaServerStarted: &pb.MediaServerStartedEvent{RtmpUrl: evt.RTMPURL, RtmpsUrl: evt.RTMPSURL},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// EventFromProto converts a protobuf message to an event.
|
||||
func EventFromProto(pbEv *pb.Event) event.Event {
|
||||
if pbEv == nil || pbEv.EventType == nil {
|
||||
panic("invalid or nil pb.Event")
|
||||
}
|
||||
|
||||
switch evt := pbEv.EventType.(type) {
|
||||
case *pb.Event_AppStateChanged:
|
||||
return parseAppStateChangedEvent(evt.AppStateChanged)
|
||||
case *pb.Event_DestinationAdded:
|
||||
return parseDestinationAddedEvent(evt.DestinationAdded)
|
||||
case *pb.Event_AddDestinationFailed:
|
||||
return parseAddDestinationFailedEvent(evt.AddDestinationFailed)
|
||||
case *pb.Event_DestinationStreamExited:
|
||||
return parseDestinationStreamExitedEvent(evt.DestinationStreamExited)
|
||||
case *pb.Event_StartDestinationFailed:
|
||||
return parseStartDestinationFailedEvent(evt.StartDestinationFailed)
|
||||
case *pb.Event_DestinationRemoved:
|
||||
return parseDestinationRemovedEvent(evt.DestinationRemoved)
|
||||
case *pb.Event_RemoveDestinationFailed:
|
||||
return parseRemoveDestinationFailedEvent(evt.RemoveDestinationFailed)
|
||||
case *pb.Event_FatalError:
|
||||
return parseFatalErrorOccurredEvent(evt.FatalError)
|
||||
case *pb.Event_OtherInstanceDetected:
|
||||
return parseOtherInstanceDetectedEvent(evt.OtherInstanceDetected)
|
||||
case *pb.Event_MediaServerStarted:
|
||||
return parseMediaServerStartedEvent(evt.MediaServerStarted)
|
||||
default:
|
||||
panic("unknown pb.Event type")
|
||||
}
|
||||
}
|
||||
|
||||
func parseAppStateChangedEvent(evt *pb.AppStateChangedEvent) event.Event {
|
||||
if evt == nil || evt.AppState == nil || evt.AppState.Source == nil {
|
||||
panic("invalid AppStateChangedEvent")
|
||||
}
|
||||
|
||||
return event.AppStateChangedEvent{
|
||||
State: domain.AppState{
|
||||
Source: domain.Source{
|
||||
Container: protoToContainer(evt.AppState.Source.Container),
|
||||
Live: evt.AppState.Source.Live,
|
||||
LiveChangedAt: evt.AppState.Source.LiveChangedAt.AsTime(),
|
||||
Tracks: evt.AppState.Source.Tracks,
|
||||
ExitReason: evt.AppState.Source.ExitReason,
|
||||
},
|
||||
Destinations: protoToDestinations(evt.AppState.Destinations),
|
||||
BuildInfo: domain.BuildInfo{
|
||||
GoVersion: evt.AppState.BuildInfo.GoVersion,
|
||||
Version: evt.AppState.BuildInfo.Version,
|
||||
Commit: evt.AppState.BuildInfo.Commit,
|
||||
Date: evt.AppState.BuildInfo.Date,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func parseDestinationAddedEvent(evt *pb.DestinationAddedEvent) event.Event {
|
||||
if evt == nil {
|
||||
panic("nil DestinationAddedEvent")
|
||||
}
|
||||
return event.DestinationAddedEvent{URL: evt.Url}
|
||||
}
|
||||
|
||||
func parseAddDestinationFailedEvent(evt *pb.AddDestinationFailedEvent) event.Event {
|
||||
if evt == nil {
|
||||
panic("nil AddDestinationFailedEvent")
|
||||
}
|
||||
return event.AddDestinationFailedEvent{URL: evt.Url, Err: errors.New(evt.Error)}
|
||||
}
|
||||
|
||||
func parseDestinationStreamExitedEvent(evt *pb.DestinationStreamExitedEvent) event.Event {
|
||||
if evt == nil {
|
||||
panic("nil DestinationStreamExitedEvent")
|
||||
}
|
||||
return event.DestinationStreamExitedEvent{Name: evt.Name, Err: errors.New(evt.Error)}
|
||||
}
|
||||
|
||||
func parseStartDestinationFailedEvent(evt *pb.StartDestinationFailedEvent) event.Event {
|
||||
if evt == nil {
|
||||
panic("nil StartDestinationFailedEvent")
|
||||
}
|
||||
return event.StartDestinationFailedEvent{URL: evt.Url, Message: evt.Message}
|
||||
}
|
||||
|
||||
func parseDestinationRemovedEvent(evt *pb.DestinationRemovedEvent) event.Event {
|
||||
if evt == nil {
|
||||
panic("nil DestinationRemovedEvent")
|
||||
}
|
||||
return event.DestinationRemovedEvent{URL: evt.Url}
|
||||
}
|
||||
|
||||
func parseRemoveDestinationFailedEvent(evt *pb.RemoveDestinationFailedEvent) event.Event {
|
||||
if evt == nil {
|
||||
panic("nil RemoveDestinationFailedEvent")
|
||||
}
|
||||
return event.RemoveDestinationFailedEvent{URL: evt.Url, Err: errors.New(evt.Error)}
|
||||
}
|
||||
|
||||
func parseFatalErrorOccurredEvent(evt *pb.FatalErrorEvent) event.Event {
|
||||
if evt == nil {
|
||||
panic("nil FatalErrorEvent")
|
||||
}
|
||||
return event.FatalErrorOccurredEvent{Message: evt.Message}
|
||||
}
|
||||
|
||||
func parseOtherInstanceDetectedEvent(_ *pb.OtherInstanceDetectedEvent) event.Event {
|
||||
return event.OtherInstanceDetectedEvent{}
|
||||
}
|
||||
|
||||
func parseMediaServerStartedEvent(evt *pb.MediaServerStartedEvent) event.Event {
|
||||
if evt == nil {
|
||||
panic("nil MediaServerStartedEvent")
|
||||
}
|
||||
return event.MediaServerStartedEvent{RTMPURL: evt.RtmpUrl, RTMPSURL: evt.RtmpsUrl}
|
||||
}
|
@ -1,151 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.netflux.io/rob/octoplex/internal/event"
|
||||
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
|
||||
"git.netflux.io/rob/octoplex/internal/protocol"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// APIListenerCountDeltaFunc is a function that is called when the number of
|
||||
// API clients increments or decrements.
|
||||
type APIClientCountDeltaFunc func(delta int)
|
||||
|
||||
// Server is the gRPC server that handles incoming commands and outgoing
|
||||
// events.
|
||||
type Server struct {
|
||||
pb.UnimplementedInternalAPIServer
|
||||
|
||||
dispatcher func(event.Command)
|
||||
bus *event.Bus
|
||||
logger *slog.Logger
|
||||
|
||||
mu sync.Mutex
|
||||
clientCount int
|
||||
clientC chan struct{}
|
||||
}
|
||||
|
||||
// newServer creates a new gRPC server.
|
||||
func newServer(
|
||||
dispatcher func(event.Command),
|
||||
bus *event.Bus,
|
||||
logger *slog.Logger,
|
||||
) *Server {
|
||||
return &Server{
|
||||
dispatcher: dispatcher,
|
||||
bus: bus,
|
||||
clientC: make(chan struct{}, 1),
|
||||
logger: logger.With("component", "server"),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) Communicate(stream pb.InternalAPI_CommunicateServer) error {
|
||||
g, ctx := errgroup.WithContext(stream.Context())
|
||||
|
||||
// perform handshake:
|
||||
startHandshakeCmd, err := stream.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("receive start handshake command: %w", err)
|
||||
}
|
||||
if startHandshakeCmd.GetCommand() == nil || startHandshakeCmd.GetCommand().GetStartHandshake() == nil {
|
||||
return fmt.Errorf("expected start handshake command but got: %T", startHandshakeCmd)
|
||||
}
|
||||
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: &pb.Event{EventType: &pb.Event_HandshakeCompleted{}}}}); err != nil {
|
||||
return fmt.Errorf("send handshake completed event: %w", err)
|
||||
}
|
||||
|
||||
// Notify that a client has connected and completed the handshake.
|
||||
select {
|
||||
case s.clientC <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
g.Go(func() error {
|
||||
eventsC := s.bus.Register()
|
||||
defer s.bus.Deregister(eventsC)
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt := <-eventsC:
|
||||
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: protocol.EventToProto(evt)}}); err != nil {
|
||||
return fmt.Errorf("send event: %w", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
g.Go(func() error {
|
||||
s.mu.Lock()
|
||||
s.clientCount++
|
||||
s.mu.Unlock()
|
||||
|
||||
defer func() {
|
||||
s.mu.Lock()
|
||||
s.clientCount--
|
||||
s.mu.Unlock()
|
||||
}()
|
||||
|
||||
for {
|
||||
in, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
s.logger.Info("Client disconnected")
|
||||
return err
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("receive message: %w", err)
|
||||
}
|
||||
|
||||
switch pbCmd := in.Payload.(type) {
|
||||
case *pb.Envelope_Command:
|
||||
cmd := protocol.CommandFromProto(pbCmd.Command)
|
||||
s.logger.Info("Received command", "command", cmd.Name())
|
||||
s.dispatcher(cmd)
|
||||
default:
|
||||
return fmt.Errorf("expected command but got: %T", pbCmd)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if err := g.Wait(); err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
|
||||
s.logger.Error("Client stream closed with error", "err", err)
|
||||
return fmt.Errorf("errgroup.Wait: %w", err)
|
||||
}
|
||||
|
||||
s.logger.Info("Client stream closed")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetClientCount returns the number of connected clients.
|
||||
func (s *Server) GetClientCount() int {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.clientCount
|
||||
}
|
||||
|
||||
const waitForClientTimeout = 10 * time.Second
|
||||
|
||||
// WaitForClient waits for _any_ client to connect and complete the handshake.
|
||||
// It times out if no client has connected after 10 seconds.
|
||||
func (s *Server) WaitForClient(ctx context.Context) error {
|
||||
select {
|
||||
case <-s.clientC:
|
||||
return nil
|
||||
case <-time.After(waitForClientTimeout):
|
||||
return errors.New("timeout")
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
@ -3,7 +3,6 @@ package terminal
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"maps"
|
||||
@ -45,9 +44,9 @@ type UI struct {
|
||||
eventBus *event.Bus
|
||||
dispatch func(event.Command)
|
||||
clipboardAvailable bool
|
||||
configFilePath string
|
||||
rtmpURL, rtmpsURL string
|
||||
buildInfo domain.BuildInfo
|
||||
appExitC chan error
|
||||
logger *slog.Logger
|
||||
|
||||
// tview state
|
||||
@ -93,20 +92,20 @@ type ScreenCapture struct {
|
||||
Width, Height int
|
||||
}
|
||||
|
||||
// Params contains the parameters for starting a new terminal user
|
||||
// StartParams contains the parameters for starting a new terminal user
|
||||
// interface.
|
||||
type Params struct {
|
||||
type StartParams struct {
|
||||
EventBus *event.Bus
|
||||
Dispatcher func(event.Command)
|
||||
Logger *slog.Logger
|
||||
ClipboardAvailable bool
|
||||
ConfigFilePath string
|
||||
BuildInfo domain.BuildInfo
|
||||
Screen *Screen // Screen may be nil.
|
||||
}
|
||||
|
||||
// NewUI creates the user interface. Call [Run] on the *UI instance to block
|
||||
// until it is completed.
|
||||
func NewUI(ctx context.Context, params Params) (*UI, error) {
|
||||
// StartUI starts the terminal user interface.
|
||||
func StartUI(ctx context.Context, params StartParams) (*UI, error) {
|
||||
app := tview.NewApplication()
|
||||
|
||||
var screen tcell.Screen
|
||||
@ -212,7 +211,7 @@ func NewUI(ctx context.Context, params Params) (*UI, error) {
|
||||
eventBus: params.EventBus,
|
||||
dispatch: params.Dispatcher,
|
||||
clipboardAvailable: params.ClipboardAvailable,
|
||||
appExitC: make(chan error, 1),
|
||||
configFilePath: params.ConfigFilePath,
|
||||
buildInfo: params.BuildInfo,
|
||||
logger: params.Logger,
|
||||
app: app,
|
||||
@ -238,6 +237,8 @@ func NewUI(ctx context.Context, params Params) (*UI, error) {
|
||||
app.SetInputCapture(ui.inputCaptureHandler)
|
||||
app.SetAfterDrawFunc(ui.afterDrawHandler)
|
||||
|
||||
go ui.run(ctx)
|
||||
|
||||
return ui, nil
|
||||
}
|
||||
|
||||
@ -261,32 +262,32 @@ func (ui *UI) renderAboutView() {
|
||||
ui.aboutView.AddItem(rtmpsURLView, 1, 0, false)
|
||||
}
|
||||
|
||||
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false)
|
||||
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
|
||||
}
|
||||
|
||||
var ErrUserClosed = errors.New("user closed UI")
|
||||
func (ui *UI) run(ctx context.Context) {
|
||||
defer func() {
|
||||
// Ensure the application is stopped when the UI is closed.
|
||||
ui.dispatch(event.CommandQuit{})
|
||||
}()
|
||||
|
||||
// Run runs the user interface. It always returns a non-nil error, which will
|
||||
// be [ErrUserClosed] if the user voluntarily closed the UI.
|
||||
func (ui *UI) Run(ctx context.Context) error {
|
||||
eventC := ui.eventBus.Register()
|
||||
defer ui.eventBus.Deregister(eventC)
|
||||
|
||||
uiDone := make(chan struct{})
|
||||
go func() {
|
||||
err := ui.app.Run()
|
||||
if err != nil {
|
||||
ui.logger.Error("Error in UI run loop, exiting", "err", err)
|
||||
defer func() {
|
||||
uiDone <- struct{}{}
|
||||
}()
|
||||
|
||||
if err := ui.app.Run(); err != nil {
|
||||
ui.logger.Error("tui application error", "err", err)
|
||||
}
|
||||
ui.appExitC <- err
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt, ok := <-eventC:
|
||||
if !ok {
|
||||
// should never happen
|
||||
return errors.New("event channel closed")
|
||||
}
|
||||
case evt := <-eventC:
|
||||
ui.app.QueueUpdateDraw(func() {
|
||||
switch evt := evt.(type) {
|
||||
case event.AppStateChangedEvent:
|
||||
@ -312,11 +313,12 @@ func (ui *UI) Run(ctx context.Context) error {
|
||||
default:
|
||||
ui.logger.Warn("unhandled event", "event", evt)
|
||||
}
|
||||
|
||||
})
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-ui.appExitC:
|
||||
return cmp.Or(err, ErrUserClosed)
|
||||
return
|
||||
case <-uiDone:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -356,6 +358,8 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
|
||||
return nil
|
||||
case ' ':
|
||||
ui.toggleDestination()
|
||||
case 'c', 'C':
|
||||
ui.copyConfigFilePathToClipboard(ui.clipboardAvailable, ui.configFilePath)
|
||||
case '?':
|
||||
ui.showAbout()
|
||||
case 'k': // tview vim bindings
|
||||
@ -821,11 +825,6 @@ func (ui *UI) Close() {
|
||||
ui.app.Stop()
|
||||
}
|
||||
|
||||
// Wait waits for the terminal user interface to finish.
|
||||
func (ui *UI) Wait() {
|
||||
<-ui.appExitC
|
||||
}
|
||||
|
||||
func (ui *UI) addDestination() {
|
||||
const (
|
||||
inputLen = 60
|
||||
@ -1007,6 +1006,28 @@ func (ui *UI) copySourceURLToClipboard(url string) {
|
||||
)
|
||||
}
|
||||
|
||||
func (ui *UI) copyConfigFilePathToClipboard(clipboardAvailable bool, configFilePath string) {
|
||||
var text string
|
||||
if clipboardAvailable {
|
||||
if configFilePath != "" {
|
||||
clipboard.Write(clipboard.FmtText, []byte(configFilePath))
|
||||
text = "Configuration file path copied to clipboard:\n\n" + configFilePath
|
||||
} else {
|
||||
text = "Configuration file path not set"
|
||||
}
|
||||
} else {
|
||||
text = "Copy to clipboard not available"
|
||||
}
|
||||
|
||||
ui.showModal(
|
||||
pageNameModalClipboard,
|
||||
text,
|
||||
[]string{"Ok"},
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
}
|
||||
|
||||
func (ui *UI) confirmQuit() {
|
||||
ui.showModal(
|
||||
pageNameModalQuit,
|
||||
@ -1015,7 +1036,7 @@ func (ui *UI) confirmQuit() {
|
||||
false,
|
||||
func(buttonIndex int, _ string) {
|
||||
if buttonIndex == 0 {
|
||||
ui.app.Stop()
|
||||
ui.dispatch(event.CommandQuit{})
|
||||
}
|
||||
},
|
||||
)
|
||||
|
306
main.go
306
main.go
@ -4,23 +4,21 @@ import (
|
||||
"cmp"
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"syscall"
|
||||
|
||||
"git.netflux.io/rob/octoplex/internal/client"
|
||||
"git.netflux.io/rob/octoplex/internal/app"
|
||||
"git.netflux.io/rob/octoplex/internal/config"
|
||||
"git.netflux.io/rob/octoplex/internal/domain"
|
||||
"git.netflux.io/rob/octoplex/internal/server"
|
||||
dockerclient "github.com/docker/docker/client"
|
||||
"github.com/urfave/cli/v2"
|
||||
"golang.design/x/clipboard"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -32,108 +30,23 @@ var (
|
||||
date string
|
||||
)
|
||||
|
||||
// errInterrupt is an error type that indicates an interrupt signal was
|
||||
// received.
|
||||
type errInterrupt struct{}
|
||||
|
||||
// Error implements the error interface.
|
||||
func (e errInterrupt) Error() string {
|
||||
return "interrupt signal received"
|
||||
}
|
||||
|
||||
// ExitCode implements the ExitCoder interface.
|
||||
func (e errInterrupt) ExitCode() int {
|
||||
return 130
|
||||
}
|
||||
var errShutdown = errors.New("shutdown")
|
||||
|
||||
func main() {
|
||||
app := &cli.App{
|
||||
Name: "Octoplex",
|
||||
Usage: "Octoplex is a live video restreamer for Docker.",
|
||||
Commands: []*cli.Command{
|
||||
{
|
||||
Name: "client",
|
||||
Usage: "Run the client",
|
||||
Action: func(c *cli.Context) error {
|
||||
return runClient(c.Context, c)
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "server",
|
||||
Usage: "Run the server",
|
||||
Action: func(c *cli.Context) error {
|
||||
return runServer(c.Context, c, serverConfig{
|
||||
stderrAvailable: true,
|
||||
handleSigInt: true,
|
||||
waitForClient: false,
|
||||
})
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "run",
|
||||
Usage: "Run server and client together (testing)",
|
||||
Action: func(c *cli.Context) error {
|
||||
return runClientAndServer(c)
|
||||
},
|
||||
},
|
||||
},
|
||||
var exitStatus int
|
||||
|
||||
if err := run(); errors.Is(err, errShutdown) {
|
||||
exitStatus = 130
|
||||
} else if err != nil {
|
||||
exitStatus = 1
|
||||
_, _ = os.Stderr.WriteString("Error: " + err.Error() + "\n")
|
||||
}
|
||||
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
os.Exit(exitStatus)
|
||||
}
|
||||
|
||||
func runClient(ctx context.Context, _ *cli.Context) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
// TODO: logger from config
|
||||
fptr, err := os.OpenFile("octoplex.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open log file: %w", err)
|
||||
}
|
||||
logger := slog.New(slog.NewTextHandler(fptr, nil))
|
||||
logger.Info("Starting client", "version", cmp.Or(version, "devel"), "commit", cmp.Or(commit, "unknown"), "date", cmp.Or(date, "unknown"), "go_version", runtime.Version())
|
||||
|
||||
var clipboardAvailable bool
|
||||
if err = clipboard.Init(); err != nil {
|
||||
logger.Warn("Clipboard not available", "err", err)
|
||||
} else {
|
||||
clipboardAvailable = true
|
||||
}
|
||||
|
||||
buildInfo, ok := debug.ReadBuildInfo()
|
||||
if !ok {
|
||||
return fmt.Errorf("read build info: %w", err)
|
||||
}
|
||||
|
||||
app := client.New(client.NewParams{
|
||||
ClipboardAvailable: clipboardAvailable,
|
||||
BuildInfo: domain.BuildInfo{
|
||||
GoVersion: buildInfo.GoVersion,
|
||||
Version: version,
|
||||
Commit: commit,
|
||||
Date: date,
|
||||
},
|
||||
Logger: logger,
|
||||
})
|
||||
if err := app.Run(ctx); err != nil {
|
||||
return fmt.Errorf("run app: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type serverConfig struct {
|
||||
stderrAvailable bool
|
||||
handleSigInt bool
|
||||
waitForClient bool
|
||||
}
|
||||
|
||||
func runServer(ctx context.Context, _ *cli.Context, serverCfg serverConfig) error {
|
||||
ctx, cancel := context.WithCancelCause(ctx)
|
||||
func run() error {
|
||||
ctx, cancel := context.WithCancelCause(context.Background())
|
||||
defer cancel(nil)
|
||||
|
||||
configService, err := config.NewDefaultService()
|
||||
@ -141,35 +54,45 @@ func runServer(ctx context.Context, _ *cli.Context, serverCfg serverConfig) erro
|
||||
return fmt.Errorf("build config service: %w", err)
|
||||
}
|
||||
|
||||
help := flag.Bool("h", false, "Show help")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
if *help {
|
||||
printUsage()
|
||||
return nil
|
||||
}
|
||||
|
||||
if narg := flag.NArg(); narg > 1 {
|
||||
printUsage()
|
||||
return fmt.Errorf("too many arguments")
|
||||
} else if narg == 1 {
|
||||
switch flag.Arg(0) {
|
||||
case "edit-config":
|
||||
return editConfigFile(configService)
|
||||
case "print-config":
|
||||
return printConfigPath(configService.Path())
|
||||
case "version":
|
||||
return printVersion()
|
||||
case "help":
|
||||
printUsage()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
cfg, err := configService.ReadOrCreateConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("read or create config: %w", err)
|
||||
}
|
||||
|
||||
// TODO: improve logger API
|
||||
// Currently it's a bit complicated because we can only use stdout - the
|
||||
// preferred destination - if the client is not running. Otherwise we
|
||||
// fallback to the legacy configuration but this should be bought more
|
||||
// in-line with the client/server split.
|
||||
var w io.Writer
|
||||
if serverCfg.stderrAvailable {
|
||||
w = os.Stdout
|
||||
} else if !cfg.LogFile.Enabled {
|
||||
w = io.Discard
|
||||
} else {
|
||||
w, err = os.OpenFile(cfg.LogFile.GetPath(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
||||
headless := os.Getenv("OCTO_HEADLESS") != ""
|
||||
logger, err := buildLogger(cfg.LogFile, headless)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error opening log file: %w", err)
|
||||
}
|
||||
return fmt.Errorf("build logger: %w", err)
|
||||
}
|
||||
|
||||
var handlerOpts slog.HandlerOptions
|
||||
if os.Getenv("OCTO_DEBUG") != "" {
|
||||
handlerOpts.Level = slog.LevelDebug
|
||||
}
|
||||
logger := slog.New(slog.NewTextHandler(w, &handlerOpts))
|
||||
|
||||
if serverCfg.handleSigInt {
|
||||
if headless {
|
||||
// When running in headless mode tview doesn't handle SIGINT for us.
|
||||
ch := make(chan os.Signal, 1)
|
||||
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
@ -177,10 +100,17 @@ func runServer(ctx context.Context, _ *cli.Context, serverCfg serverConfig) erro
|
||||
<-ch
|
||||
logger.Info("Received interrupt signal, exiting")
|
||||
signal.Stop(ch)
|
||||
cancel(errInterrupt{})
|
||||
cancel(errShutdown)
|
||||
}()
|
||||
}
|
||||
|
||||
var clipboardAvailable bool
|
||||
if err = clipboard.Init(); err != nil {
|
||||
logger.Warn("Clipboard not available", "err", err)
|
||||
} else {
|
||||
clipboardAvailable = true
|
||||
}
|
||||
|
||||
dockerClient, err := dockerclient.NewClientWithOpts(
|
||||
dockerclient.FromEnv,
|
||||
dockerclient.WithAPIVersionNegotiation(),
|
||||
@ -189,64 +119,102 @@ func runServer(ctx context.Context, _ *cli.Context, serverCfg serverConfig) erro
|
||||
return fmt.Errorf("new docker client: %w", err)
|
||||
}
|
||||
|
||||
app := server.New(server.Params{
|
||||
buildInfo, ok := debug.ReadBuildInfo()
|
||||
if !ok {
|
||||
return fmt.Errorf("read build info: %w", err)
|
||||
}
|
||||
|
||||
app := app.New(app.Params{
|
||||
ConfigService: configService,
|
||||
DockerClient: dockerClient,
|
||||
Headless: headless,
|
||||
ClipboardAvailable: clipboardAvailable,
|
||||
ConfigFilePath: configService.Path(),
|
||||
WaitForClient: serverCfg.waitForClient,
|
||||
BuildInfo: domain.BuildInfo{
|
||||
GoVersion: buildInfo.GoVersion,
|
||||
Version: version,
|
||||
Commit: commit,
|
||||
Date: date,
|
||||
},
|
||||
Logger: logger,
|
||||
})
|
||||
|
||||
logger.Info(
|
||||
"Starting server",
|
||||
"version",
|
||||
cmp.Or(version, "devel"),
|
||||
"commit",
|
||||
cmp.Or(commit, "unknown"),
|
||||
"date",
|
||||
cmp.Or(date, "unknown"),
|
||||
"go_version",
|
||||
runtime.Version(),
|
||||
)
|
||||
|
||||
if err := app.Run(ctx); err != nil {
|
||||
if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ctx), errInterrupt{}) {
|
||||
return context.Cause(ctx)
|
||||
return app.Run(ctx)
|
||||
}
|
||||
return err
|
||||
|
||||
// editConfigFile opens the config file in the user's editor.
|
||||
func editConfigFile(configService *config.Service) error {
|
||||
if _, err := configService.ReadOrCreateConfig(); err != nil {
|
||||
return fmt.Errorf("read or create config: %w", err)
|
||||
}
|
||||
|
||||
editor := os.Getenv("EDITOR")
|
||||
if editor == "" {
|
||||
editor = "vi"
|
||||
}
|
||||
binary, err := exec.LookPath(editor)
|
||||
if err != nil {
|
||||
return fmt.Errorf("look path: %w", err)
|
||||
}
|
||||
|
||||
fmt.Fprintf(os.Stderr, "Editing config file: %s\n", configService.Path())
|
||||
fmt.Println(binary)
|
||||
|
||||
if err := syscall.Exec(binary, []string{"--", configService.Path()}, os.Environ()); err != nil {
|
||||
return fmt.Errorf("exec: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func runClientAndServer(c *cli.Context) error {
|
||||
errNoErr := errors.New("no error")
|
||||
|
||||
g, ctx := errgroup.WithContext(c.Context)
|
||||
|
||||
g.Go(func() error {
|
||||
if err := runClient(ctx, c); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return errNoErr
|
||||
})
|
||||
|
||||
g.Go(func() error {
|
||||
if err := runServer(ctx, c, serverConfig{
|
||||
stderrAvailable: false,
|
||||
handleSigInt: false,
|
||||
waitForClient: true,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return errNoErr
|
||||
})
|
||||
|
||||
if err := g.Wait(); err == errNoErr {
|
||||
// printConfigPath prints the path to the config file to stderr.
|
||||
func printConfigPath(configPath string) error {
|
||||
fmt.Fprintln(os.Stderr, configPath)
|
||||
return nil
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
|
||||
// printVersion prints the version of the application to stderr.
|
||||
func printVersion() error {
|
||||
fmt.Fprintf(os.Stderr, "%s version %s\n", domain.AppName, cmp.Or(version, "0.0.0-dev"))
|
||||
return nil
|
||||
}
|
||||
|
||||
func printUsage() {
|
||||
os.Stderr.WriteString("Usage: octoplex [command]\n\n")
|
||||
os.Stderr.WriteString("Commands:\n\n")
|
||||
os.Stderr.WriteString(" edit-config Edit the config file\n")
|
||||
os.Stderr.WriteString(" print-config Print the path to the config file\n")
|
||||
os.Stderr.WriteString(" version Print the version of the application\n")
|
||||
os.Stderr.WriteString(" help Print this help message\n")
|
||||
os.Stderr.WriteString("\n")
|
||||
os.Stderr.WriteString("Additionally, Octoplex can be configured with the following environment variables:\n\n")
|
||||
os.Stderr.WriteString(" OCTO_DEBUG Enables debug logging if set\n")
|
||||
os.Stderr.WriteString(" OCTO_HEADLESS Enables headless mode if set (experimental)\n\n")
|
||||
}
|
||||
|
||||
// buildLogger builds the logger, which may be a no-op logger.
|
||||
func buildLogger(cfg config.LogFile, headless bool) (*slog.Logger, error) {
|
||||
build := func(w io.Writer) *slog.Logger {
|
||||
var handlerOpts slog.HandlerOptions
|
||||
if os.Getenv("OCTO_DEBUG") != "" {
|
||||
handlerOpts.Level = slog.LevelDebug
|
||||
}
|
||||
return slog.New(slog.NewTextHandler(w, &handlerOpts))
|
||||
}
|
||||
|
||||
// In headless mode, always log to stderr.
|
||||
if headless {
|
||||
return build(os.Stderr), nil
|
||||
}
|
||||
|
||||
if !cfg.Enabled {
|
||||
return slog.New(slog.DiscardHandler), nil
|
||||
}
|
||||
|
||||
fptr, err := os.OpenFile(cfg.GetPath(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening log file: %w", err)
|
||||
}
|
||||
|
||||
return build(fptr), nil
|
||||
}
|
||||
|
@ -40,9 +40,3 @@ description = "Generate mocks"
|
||||
dir = "{{cwd}}"
|
||||
run = "go tool mockery"
|
||||
alias = "m"
|
||||
|
||||
[tasks.generate_proto]
|
||||
description = "Generate gRPC files from proto"
|
||||
dir = "{{cwd}}"
|
||||
run = "protoc -I proto --go_out=paths=source_relative:internal/generated/grpc --go-grpc_out=paths=source_relative:internal/generated/grpc proto/api.proto"
|
||||
alias = "p"
|
||||
|
166
proto/api.proto
166
proto/api.proto
@ -1,166 +0,0 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package api;
|
||||
|
||||
import "google/protobuf/timestamp.proto";
|
||||
|
||||
option go_package = "git.netflux.io/rob/octoplex/internal/generated/grpc";
|
||||
|
||||
service InternalAPI {
|
||||
rpc Communicate(stream Envelope) returns (stream Envelope);
|
||||
}
|
||||
|
||||
message Envelope {
|
||||
oneof payload {
|
||||
Command command = 1;
|
||||
Event event = 2;
|
||||
}
|
||||
}
|
||||
|
||||
message Command {
|
||||
oneof command_type {
|
||||
AddDestinationCommand add_destination = 1;
|
||||
RemoveDestinationCommand remove_destination = 2;
|
||||
StartDestinationCommand start_destination = 3;
|
||||
StopDestinationCommand stop_destination = 4;
|
||||
CloseOtherInstancesCommand close_other_instances = 5;
|
||||
QuitCommand quit = 6;
|
||||
StartHandshakeCommand start_handshake = 7;
|
||||
}
|
||||
}
|
||||
|
||||
message AddDestinationCommand {
|
||||
string name = 1;
|
||||
string url = 2;
|
||||
}
|
||||
|
||||
message RemoveDestinationCommand {
|
||||
string url= 1;
|
||||
}
|
||||
|
||||
message StartDestinationCommand {
|
||||
string url = 1;
|
||||
}
|
||||
|
||||
message StopDestinationCommand {
|
||||
string url = 1;
|
||||
}
|
||||
|
||||
message CloseOtherInstancesCommand {}
|
||||
|
||||
message QuitCommand {}
|
||||
|
||||
message StartHandshakeCommand {};
|
||||
|
||||
message Event {
|
||||
oneof event_type {
|
||||
AppStateChangedEvent app_state_changed = 1;
|
||||
DestinationStreamExitedEvent destination_stream_exited = 2;
|
||||
DestinationAddedEvent destination_added = 3;
|
||||
AddDestinationFailedEvent add_destination_failed = 4;
|
||||
DestinationRemovedEvent destination_removed = 5;
|
||||
RemoveDestinationFailedEvent remove_destination_failed = 6;
|
||||
StartDestinationFailedEvent start_destination_failed = 7;
|
||||
MediaServerStartedEvent media_server_started = 8;
|
||||
OtherInstanceDetectedEvent other_instance_detected = 9;
|
||||
FatalErrorEvent fatal_error = 10;
|
||||
HandshakeCompletedEvent handshake_completed = 11;
|
||||
}
|
||||
}
|
||||
|
||||
message Container {
|
||||
string id = 1;
|
||||
string status = 2;
|
||||
string health_state = 3;
|
||||
double cpu_percent = 4;
|
||||
uint64 memory_usage_bytes = 5;
|
||||
int32 rx_rate = 6;
|
||||
int32 tx_rate = 7;
|
||||
google.protobuf.Timestamp rx_since = 8;
|
||||
string image_name = 9;
|
||||
string pull_status = 10;
|
||||
string pull_progress = 11;
|
||||
int32 pull_percent = 12;
|
||||
int32 restart_count = 13;
|
||||
optional int32 exit_code = 14;
|
||||
string err = 15;
|
||||
}
|
||||
|
||||
message Source {
|
||||
Container container = 1;
|
||||
bool live = 2;
|
||||
google.protobuf.Timestamp live_changed_at = 3;
|
||||
repeated string tracks = 4;
|
||||
string exit_reason = 5;
|
||||
}
|
||||
|
||||
message Destination {
|
||||
enum Status {
|
||||
STATUS_OFF_AIR = 0;
|
||||
STATUS_STARTING = 1;
|
||||
STATUS_LIVE = 2;
|
||||
}
|
||||
|
||||
Container container = 1;
|
||||
Status status = 2;
|
||||
string name = 3;
|
||||
string url = 4;
|
||||
}
|
||||
|
||||
message BuildInfo {
|
||||
string go_version = 1;
|
||||
string version = 2;
|
||||
string commit = 3;
|
||||
string date = 4;
|
||||
}
|
||||
|
||||
message AppState {
|
||||
Source source = 1;
|
||||
repeated Destination destinations = 2;
|
||||
BuildInfo build_info = 3;
|
||||
}
|
||||
|
||||
message AppStateChangedEvent {
|
||||
AppState app_state = 1;
|
||||
}
|
||||
|
||||
message DestinationStreamExitedEvent {
|
||||
string name = 1;
|
||||
string error = 2;
|
||||
}
|
||||
|
||||
message DestinationAddedEvent {
|
||||
string url = 1;
|
||||
}
|
||||
|
||||
message AddDestinationFailedEvent {
|
||||
string url = 1;
|
||||
string error = 2;
|
||||
}
|
||||
|
||||
message DestinationRemovedEvent {
|
||||
string url = 1;
|
||||
}
|
||||
|
||||
message RemoveDestinationFailedEvent {
|
||||
string url = 1;
|
||||
string error = 2;
|
||||
}
|
||||
|
||||
message StartDestinationFailedEvent {
|
||||
string url = 1;
|
||||
string message = 2;
|
||||
}
|
||||
|
||||
message MediaServerStartedEvent {
|
||||
string rtmp_url = 1;
|
||||
string rtmps_url = 2;
|
||||
}
|
||||
|
||||
message OtherInstanceDetectedEvent {}
|
||||
|
||||
message FatalErrorEvent {
|
||||
string message = 1;
|
||||
}
|
||||
|
||||
message HandshakeCompletedEvent {}
|
Loading…
x
Reference in New Issue
Block a user