Compare commits

..

23 Commits

Author SHA1 Message Date
f006187894 fixup! wip: refactor: API 2025-05-13 07:17:05 +02:00
5565331630 fixup! wip: refactor: API 2025-05-13 07:11:18 +02:00
32701499e7 fixup! wip: refactor: API 2025-05-12 21:13:16 +02:00
311c100d89 fixup! wip: refactor: API 2025-05-12 19:42:50 +02:00
e3e469edec fixup! wip: refactor: API 2025-05-12 19:32:24 +02:00
eaccb17f03 fixup! wip: refactor: API 2025-05-12 19:30:28 +02:00
7a3f1335c1 fixup! wip: refactor: API 2025-05-12 09:00:43 +02:00
5aa1be2066 fixup! wip: refactor: API 2025-05-11 08:11:05 +02:00
c5724dfe59 fixup! wip: refactor: API 2025-05-11 07:55:06 +02:00
d0d96dd1d9 fixup! wip: refactor: API 2025-05-11 06:15:20 +02:00
977b6fe7d7 fixup! wip: refactor: API 2025-05-11 06:14:17 +02:00
b8a77d9c6c fixup! wip: refactor: API 2025-05-11 06:12:44 +02:00
c706a41acd fixup! wip: refactor: API 2025-05-10 21:47:10 +02:00
59b0a060ba fixup! wip: refactor: API 2025-05-10 21:18:42 +02:00
116623f386 fixup! wip: refactor: API 2025-05-10 21:14:40 +02:00
aa6f50715d fixup! wip: refactor: API 2025-05-10 08:15:23 +02:00
7706bb363f fixup! wip: refactor: API 2025-05-10 07:46:58 +02:00
888ac7d67d fixup! wip: refactor: API 2025-05-10 07:36:39 +02:00
c6f21b194a fixup! wip: refactor: API 2025-05-09 06:47:27 +02:00
6ceb286e6b fixup! wip: refactor: API 2025-05-08 07:08:43 +02:00
0a6b9fad90 fixup! wip: refactor: API 2025-05-07 20:32:27 +02:00
f67a456d1e wip: refactor: API 2025-05-06 21:38:14 +02:00
461c2b1167 refactor: main.go > ./cmd/server 2025-05-06 20:17:41 +02:00
25 changed files with 4039 additions and 421 deletions

10
go.mod
View File

@ -11,7 +11,11 @@ require (
github.com/rivo/tview v0.0.0-20250330220935-949945f8d922
github.com/stretchr/testify v1.10.0
github.com/testcontainers/testcontainers-go v0.35.0
github.com/urfave/cli/v2 v2.27.6
golang.design/x/clipboard v0.7.0
golang.org/x/sync v0.13.0
google.golang.org/grpc v1.69.4
google.golang.org/protobuf v1.36.3
gopkg.in/yaml.v3 v3.0.1
)
@ -24,6 +28,7 @@ require (
github.com/containerd/log v0.1.0 // indirect
github.com/containerd/platforms v0.2.1 // indirect
github.com/cpuguy83/dockercfg v0.3.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
@ -65,6 +70,7 @@ require (
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rs/zerolog v1.33.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.7.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
@ -81,6 +87,7 @@ require (
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/vektra/mockery/v2 v2.52.2 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
@ -95,12 +102,13 @@ require (
golang.org/x/image v0.26.0 // indirect
golang.org/x/mobile v0.0.0-20250408133729-978277e7eaf7 // indirect
golang.org/x/mod v0.24.0 // indirect
golang.org/x/sync v0.13.0 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/term v0.31.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/time v0.9.0 // indirect
golang.org/x/tools v0.32.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

9
go.sum
View File

@ -18,6 +18,8 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA=
github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc=
github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -52,6 +54,8 @@ github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiU
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
@ -140,6 +144,7 @@ github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWN
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo=
github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k=
@ -185,8 +190,12 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/urfave/cli/v2 v2.27.6 h1:VdRdS98FNhKZ8/Az8B7MTyGQmpIr36O1EHybx/LaZ4g=
github.com/urfave/cli/v2 v2.27.6/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/vektra/mockery/v2 v2.52.2 h1:8QfPKUIrq8P3Cs7G79Iu4Byd5wdhGCE0quIS27x7rQo=
github.com/vektra/mockery/v2 v2.52.2/go.mod h1:zGDY/f6bip0Yh13GQ5j7xa43fuEoYBa4ICHEaihisHw=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=

View File

@ -0,0 +1,131 @@
package client
import (
"context"
"fmt"
"log/slog"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/protocol"
"git.netflux.io/rob/octoplex/internal/terminal"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// App is the client application.
type App struct {
bus *event.Bus
clipboardAvailable bool
buildInfo domain.BuildInfo
screen *terminal.Screen
logger *slog.Logger
}
// NewParams contains the parameters for the App.
type NewParams struct {
ClipboardAvailable bool
BuildInfo domain.BuildInfo
Screen *terminal.Screen
Logger *slog.Logger
}
// New creates a new App instance.
func New(params NewParams) *App {
return &App{
bus: event.NewBus(params.Logger),
clipboardAvailable: params.ClipboardAvailable,
buildInfo: params.BuildInfo,
screen: params.Screen,
logger: params.Logger,
}
}
// Run starts the application, and blocks until it is closed.
//
// It returns nil if the application was closed by the user, or an error if it
// closed for any other reason.
func (a *App) Run(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
conn, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("connect to gRPC server: %w", err)
}
apiClient := pb.NewInternalAPIClient(conn)
stream, err := apiClient.Communicate(ctx)
if err != nil {
return fmt.Errorf("create gRPC stream: %w", err)
}
ui, err := terminal.NewUI(ctx, terminal.Params{
EventBus: a.bus,
Dispatcher: func(cmd event.Command) {
a.logger.Info("Command dispatched", "cmd", cmd.Name())
if sendErr := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Command{Command: protocol.CommandToProto(cmd)}}); sendErr != nil {
a.logger.Error("Error sending command to gRPC API", "err", sendErr)
}
},
ClipboardAvailable: a.clipboardAvailable,
BuildInfo: a.buildInfo,
Screen: a.screen,
Logger: a.logger.With("component", "ui"),
})
if err != nil {
return fmt.Errorf("start terminal user interface: %w", err)
}
defer ui.Close()
g.Go(func() error { return ui.Run(ctx) })
// After the UI is available, perform a handshake with the server.
// Ordering is important here. We want to ensure that the UI is ready to
// react to events received from the server. Performing the handshake ensures
// the client has received at least one event.
if err := a.doHandshake(stream); err != nil {
return fmt.Errorf("do handshake: %w", err)
}
g.Go(func() error {
for {
envelope, recErr := stream.Recv()
if recErr != nil {
return fmt.Errorf("receive envelope: %w", recErr)
}
pbEvt := envelope.GetEvent()
if pbEvt == nil {
a.logger.Error("Received envelope without event")
continue
}
evt := protocol.EventFromProto(pbEvt)
a.logger.Debug("Received event from gRPC stream", "event", evt.EventName(), "payload", evt)
a.bus.Send(evt)
}
})
if err := g.Wait(); err == terminal.ErrUserClosed {
return nil
} else {
return fmt.Errorf("errgroup.Wait: %w", err)
}
}
func (a *App) doHandshake(stream pb.InternalAPI_CommunicateClient) error {
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Command{Command: &pb.Command{CommandType: &pb.Command_StartHandshake{}}}}); err != nil {
return fmt.Errorf("send start handshake command: %w", err)
}
env, err := stream.Recv()
if err != nil {
return fmt.Errorf("receive handshake completed event: %w", err)
}
if evt := env.GetEvent(); evt == nil || evt.GetHandshakeCompleted() == nil {
return fmt.Errorf("expected handshake completed event but got: %T", env)
}
return nil
}

View File

@ -1,9 +1,11 @@
//go:build integration
package app_test
package client_test
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
@ -14,39 +16,79 @@ import (
"testing"
"time"
"git.netflux.io/rob/octoplex/internal/app"
"git.netflux.io/rob/octoplex/internal/client"
"git.netflux.io/rob/octoplex/internal/config"
"git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/server"
"git.netflux.io/rob/octoplex/internal/terminal"
"github.com/gdamore/tcell/v2"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"golang.org/x/sync/errgroup"
)
func buildAppParams(
t *testing.T,
func buildClientServer(
configService *config.Service,
dockerClient container.DockerClient,
screen tcell.SimulationScreen,
screenCaptureC chan<- terminal.ScreenCapture,
logger *slog.Logger,
) app.Params {
t.Helper()
return app.Params{
ConfigService: configService,
DockerClient: dockerClient,
) (*client.App, *server.App) {
client := client.New(client.NewParams{
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
Screen: &terminal.Screen{
Screen: screen,
Width: 180,
Height: 25,
CaptureC: screenCaptureC,
},
ClipboardAvailable: false,
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
Logger: logger,
}
Logger: logger,
})
server := server.New(server.Params{
ConfigService: configService,
DockerClient: dockerClient,
WaitForClient: true,
Logger: logger,
})
return client, server
}
type clientServerResult struct {
errClient error
errServer error
}
func runClientServer(
ctx context.Context,
_ *testing.T,
clientApp *client.App,
serverApp *server.App,
) <-chan clientServerResult {
ch := make(chan clientServerResult, 1)
g, ctx := errgroup.WithContext(ctx)
var clientErr, srvErr error
g.Go(func() error {
srvErr = serverApp.Run(ctx)
return errors.New("server closed")
})
g.Go(func() error {
clientErr = clientApp.Run(ctx)
return errors.New("client closed")
})
go func() {
_ = g.Wait()
ch <- clientServerResult{errClient: clientErr, errServer: srvErr}
}()
return ch
}
func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- terminal.ScreenCapture, func() []string) {

View File

@ -1,6 +1,6 @@
//go:build integration
package app_test
package client_test
import (
"cmp"
@ -15,12 +15,10 @@ import (
"testing"
"time"
"git.netflux.io/rob/octoplex/internal/app"
"git.netflux.io/rob/octoplex/internal/config"
"git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/container/mocks"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/terminal"
"git.netflux.io/rob/octoplex/internal/testhelpers"
"github.com/docker/docker/api/types/network"
dockerclient "github.com/docker/docker/client"
@ -126,26 +124,8 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}},
})
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(app.Params{
ConfigService: configService,
DockerClient: dockerClient,
Screen: &terminal.Screen{
Screen: screen,
Width: 160,
Height: 25,
CaptureC: screenCaptureC,
},
ClipboardAvailable: false,
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
Logger: logger,
}).Run(ctx))
}()
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
require.EventuallyWithT(
t,
@ -286,13 +266,12 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
printScreen(t, getContents, "After stopping the first destination")
// TODO:
// - Source error
// - Additional features (copy URL, etc.)
cancel()
<-done
result := <-ch
// May be a gRPC error, not context.Canceled:
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
}
func TestIntegrationCustomHost(t *testing.T) {
@ -313,14 +292,8 @@ func TestIntegrationCustomHost(t *testing.T) {
})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
time.Sleep(time.Second)
sendKey(t, screen, tcell.KeyF1, ' ')
@ -360,7 +333,10 @@ func TestIntegrationCustomHost(t *testing.T) {
cancel()
<-done
result := <-ch
// May be a gRPC error, not context.Canceled:
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
}
func TestIntegrationCustomTLSCerts(t *testing.T) {
@ -384,14 +360,8 @@ func TestIntegrationCustomTLSCerts(t *testing.T) {
})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
require.EventuallyWithT(
t,
@ -426,7 +396,9 @@ func TestIntegrationCustomTLSCerts(t *testing.T) {
cancel()
<-done
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
}
func TestIntegrationRestartDestination(t *testing.T) {
@ -465,14 +437,8 @@ func TestIntegrationRestartDestination(t *testing.T) {
}},
})
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
require.EventuallyWithT(
t,
@ -584,7 +550,9 @@ func TestIntegrationRestartDestination(t *testing.T) {
cancel()
<-done
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
}
func TestIntegrationStartDestinationFailed(t *testing.T) {
@ -602,14 +570,8 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
Destinations: []config.Destination{{Name: "Example server", URL: "rtmp://rtmp.example.com/live"}},
})
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
require.EventuallyWithT(
t,
@ -658,7 +620,9 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
cancel()
<-done
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
}
func TestIntegrationDestinationValidations(t *testing.T) {
@ -675,14 +639,8 @@ func TestIntegrationDestinationValidations(t *testing.T) {
Sources: config.Sources{MediaServer: config.MediaServerSource{StreamKey: "live", RTMP: config.RTMPSource{Enabled: true}}},
})
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
require.EventuallyWithT(
t,
@ -786,7 +744,9 @@ func TestIntegrationDestinationValidations(t *testing.T) {
printScreen(t, getContents, "After entering a duplicate destination URL")
cancel()
<-done
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
}
func TestIntegrationStartupCheck(t *testing.T) {
@ -817,14 +777,8 @@ func TestIntegrationStartupCheck(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
require.EventuallyWithT(
t,
@ -868,7 +822,9 @@ func TestIntegrationStartupCheck(t *testing.T) {
printScreen(t, getContents, "After starting the mediaserver")
cancel()
<-done
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
}
func TestIntegrationMediaServerError(t *testing.T) {
@ -886,18 +842,8 @@ func TestIntegrationMediaServerError(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.EqualError(
t,
app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx),
"media server exited",
)
}()
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
require.EventuallyWithT(
t,
@ -911,10 +857,12 @@ func TestIntegrationMediaServerError(t *testing.T) {
)
printScreen(t, getContents, "Ater displaying the media server error modal")
// Quit the app, this should cause the done channel to receive.
// Quit the app:
sendKey(t, screen, tcell.KeyEnter, ' ')
<-done
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorContains(t, result.errServer, "media server exited")
}
func TestIntegrationDockerClientError(t *testing.T) {
@ -929,18 +877,8 @@ func TestIntegrationDockerClientError(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.EqualError(
t,
app.New(buildAppParams(t, configService, &dockerClient, screen, screenCaptureC, logger)).Run(ctx),
"create container client: network create: boom",
)
}()
client, server := buildClientServer(configService, &dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
require.EventuallyWithT(
t,
@ -954,10 +892,12 @@ func TestIntegrationDockerClientError(t *testing.T) {
)
printScreen(t, getContents, "Ater displaying the fatal error modal")
// Quit the app, this should cause the done channel to receive.
// Quit the app:
sendKey(t, screen, tcell.KeyEnter, ' ')
<-done
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.EqualError(t, result.errServer, "create container client: network create: boom")
}
func TestIntegrationDockerConnectionError(t *testing.T) {
@ -971,16 +911,8 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
err := app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)
require.ErrorContains(t, err, "dial tcp: lookup docker.example.com")
require.ErrorContains(t, err, "no such host")
}()
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
require.EventuallyWithT(
t,
@ -994,10 +926,13 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
)
printScreen(t, getContents, "Ater displaying the fatal error modal")
// Quit the app, this should cause the done channel to receive.
// Quit the app:
sendKey(t, screen, tcell.KeyEnter, ' ')
<-done
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorContains(t, result.errServer, "dial tcp: lookup docker.example.com")
assert.ErrorContains(t, result.errServer, "no such host")
}
func TestIntegrationCopyURLs(t *testing.T) {
@ -1067,14 +1002,8 @@ func TestIntegrationCopyURLs(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: tc.mediaServerConfig}})
screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{})
go func() {
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
ch := runClientServer(ctx, t, client, server)
time.Sleep(3 * time.Second)
printScreen(t, getContents, "Ater loading the app")
@ -1095,7 +1024,9 @@ func TestIntegrationCopyURLs(t *testing.T) {
cancel()
<-done
result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
})
}
}

View File

@ -2,6 +2,7 @@ package event
import (
"log/slog"
"slices"
"sync"
)
@ -31,6 +32,21 @@ func (b *Bus) Register() <-chan Event {
return ch
}
// Deregister deregisters a consumer for all events.
func (b *Bus) Deregister(ch <-chan Event) {
b.mu.Lock()
defer b.mu.Unlock()
b.consumers = slices.DeleteFunc(b.consumers, func(other chan Event) bool {
if ch == other {
close(other)
return true
}
return false
})
}
// Send sends an event to all registered consumers.
func (b *Bus) Send(evt Event) {
// The mutex is needed to ensure the backing array of b.consumers cannot be
@ -43,7 +59,7 @@ func (b *Bus) Send(evt Event) {
select {
case ch <- evt:
default:
b.logger.Warn("Event dropped", "name", evt.name())
b.logger.Warn("Event dropped", "name", evt.EventName())
}
}
}

View File

@ -6,6 +6,7 @@ import (
"git.netflux.io/rob/octoplex/internal/event"
"git.netflux.io/rob/octoplex/internal/testhelpers"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestBus(t *testing.T) {
@ -25,5 +26,19 @@ func TestBus(t *testing.T) {
}()
assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent))
bus.Deregister(ch1)
_, ok := <-ch1
assert.False(t, ok)
select {
case <-ch2:
require.Fail(t, "ch2 should be blocking")
default:
}
}

View File

@ -50,6 +50,8 @@ func (c CommandCloseOtherInstance) Name() string {
}
// CommandQuit quits the app.
//
// TODO: consider how this should look in a client/server architecture.
type CommandQuit struct{}
// Name implements the Command interface.

View File

@ -19,7 +19,7 @@ const (
// Event represents something which happened in the appllication.
type Event interface {
name() Name
EventName() Name
}
// AppStateChangedEvent is emitted when the application state changes.
@ -27,7 +27,7 @@ type AppStateChangedEvent struct {
State domain.AppState
}
func (e AppStateChangedEvent) name() Name {
func (e AppStateChangedEvent) EventName() Name {
return EventNameAppStateChanged
}
@ -36,16 +36,17 @@ type DestinationAddedEvent struct {
URL string
}
func (e DestinationAddedEvent) name() Name {
func (e DestinationAddedEvent) EventName() Name {
return EventNameDestinationAdded
}
// AddDestinationFailedEvent is emitted when a destination fails to be added.
type AddDestinationFailedEvent struct {
URL string
Err error
}
func (e AddDestinationFailedEvent) name() Name {
func (e AddDestinationFailedEvent) EventName() Name {
return EventNameAddDestinationFailed
}
@ -55,14 +56,17 @@ type DestinationStreamExitedEvent struct {
Err error
}
func (e DestinationStreamExitedEvent) name() Name {
func (e DestinationStreamExitedEvent) EventName() Name {
return EventNameDestinationStreamExited
}
// StartDestinationFailedEvent is emitted when a destination fails to start.
type StartDestinationFailedEvent struct{}
type StartDestinationFailedEvent struct {
URL string
Message string
}
func (e StartDestinationFailedEvent) name() Name {
func (e StartDestinationFailedEvent) EventName() Name {
return EventNameStartDestinationFailed
}
@ -72,17 +76,18 @@ type DestinationRemovedEvent struct {
URL string
}
func (e DestinationRemovedEvent) name() Name {
func (e DestinationRemovedEvent) EventName() Name {
return EventNameDestinationRemoved
}
// RemoveDestinationFailedEvent is emitted when a destination fails to be
// removed.
type RemoveDestinationFailedEvent struct {
URL string
Err error
}
func (e RemoveDestinationFailedEvent) name() Name {
func (e RemoveDestinationFailedEvent) EventName() Name {
return EventNameRemoveDestinationFailed
}
@ -95,11 +100,11 @@ type FatalErrorOccurredEvent struct {
// OtherInstanceDetectedEvent is emitted when the app launches and detects another instance.
type OtherInstanceDetectedEvent struct{}
func (e OtherInstanceDetectedEvent) name() Name {
func (e OtherInstanceDetectedEvent) EventName() Name {
return EventNameOtherInstanceDetected
}
func (e FatalErrorOccurredEvent) name() Name {
func (e FatalErrorOccurredEvent) EventName() Name {
return "fatal_error_occurred"
}
@ -109,6 +114,6 @@ type MediaServerStartedEvent struct {
RTMPSURL string
}
func (e MediaServerStartedEvent) name() Name {
func (e MediaServerStartedEvent) EventName() Name {
return "media_server_started"
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,137 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v6.30.1
// source: api.proto
package grpc
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// InternalAPIClient is the client API for InternalAPI service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type InternalAPIClient interface {
Communicate(ctx context.Context, opts ...grpc.CallOption) (InternalAPI_CommunicateClient, error)
}
type internalAPIClient struct {
cc grpc.ClientConnInterface
}
func NewInternalAPIClient(cc grpc.ClientConnInterface) InternalAPIClient {
return &internalAPIClient{cc}
}
func (c *internalAPIClient) Communicate(ctx context.Context, opts ...grpc.CallOption) (InternalAPI_CommunicateClient, error) {
stream, err := c.cc.NewStream(ctx, &InternalAPI_ServiceDesc.Streams[0], "/api.InternalAPI/Communicate", opts...)
if err != nil {
return nil, err
}
x := &internalAPICommunicateClient{stream}
return x, nil
}
type InternalAPI_CommunicateClient interface {
Send(*Envelope) error
Recv() (*Envelope, error)
grpc.ClientStream
}
type internalAPICommunicateClient struct {
grpc.ClientStream
}
func (x *internalAPICommunicateClient) Send(m *Envelope) error {
return x.ClientStream.SendMsg(m)
}
func (x *internalAPICommunicateClient) Recv() (*Envelope, error) {
m := new(Envelope)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// InternalAPIServer is the server API for InternalAPI service.
// All implementations must embed UnimplementedInternalAPIServer
// for forward compatibility
type InternalAPIServer interface {
Communicate(InternalAPI_CommunicateServer) error
mustEmbedUnimplementedInternalAPIServer()
}
// UnimplementedInternalAPIServer must be embedded to have forward compatible implementations.
type UnimplementedInternalAPIServer struct {
}
func (UnimplementedInternalAPIServer) Communicate(InternalAPI_CommunicateServer) error {
return status.Errorf(codes.Unimplemented, "method Communicate not implemented")
}
func (UnimplementedInternalAPIServer) mustEmbedUnimplementedInternalAPIServer() {}
// UnsafeInternalAPIServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to InternalAPIServer will
// result in compilation errors.
type UnsafeInternalAPIServer interface {
mustEmbedUnimplementedInternalAPIServer()
}
func RegisterInternalAPIServer(s grpc.ServiceRegistrar, srv InternalAPIServer) {
s.RegisterService(&InternalAPI_ServiceDesc, srv)
}
func _InternalAPI_Communicate_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(InternalAPIServer).Communicate(&internalAPICommunicateServer{stream})
}
type InternalAPI_CommunicateServer interface {
Send(*Envelope) error
Recv() (*Envelope, error)
grpc.ServerStream
}
type internalAPICommunicateServer struct {
grpc.ServerStream
}
func (x *internalAPICommunicateServer) Send(m *Envelope) error {
return x.ServerStream.SendMsg(m)
}
func (x *internalAPICommunicateServer) Recv() (*Envelope, error) {
m := new(Envelope)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// InternalAPI_ServiceDesc is the grpc.ServiceDesc for InternalAPI service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var InternalAPI_ServiceDesc = grpc.ServiceDesc{
ServiceName: "api.InternalAPI",
HandlerType: (*InternalAPIServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "Communicate",
Handler: _InternalAPI_Communicate_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "api.proto",
}

View File

@ -0,0 +1,110 @@
package protocol
import (
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
)
// CommandToProto converts a command to a protobuf message.
func CommandToProto(command event.Command) *pb.Command {
switch evt := command.(type) {
case event.CommandAddDestination:
return buildAddDestinationCommand(evt)
case event.CommandRemoveDestination:
return buildRemoveDestinationCommand(evt)
case event.CommandStartDestination:
return buildStartDestinationCommand(evt)
case event.CommandStopDestination:
return buildStopDestinationCommand(evt)
case event.CommandCloseOtherInstance:
return buildCloseOtherInstanceCommand(evt)
case event.CommandQuit:
return buildQuitCommand(evt)
default:
panic("unknown command type")
}
}
func buildAddDestinationCommand(cmd event.CommandAddDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_AddDestination{AddDestination: &pb.AddDestinationCommand{Name: cmd.DestinationName, Url: cmd.URL}}}
}
func buildRemoveDestinationCommand(cmd event.CommandRemoveDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_RemoveDestination{RemoveDestination: &pb.RemoveDestinationCommand{Url: cmd.URL}}}
}
func buildStartDestinationCommand(cmd event.CommandStartDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_StartDestination{StartDestination: &pb.StartDestinationCommand{Url: cmd.URL}}}
}
func buildStopDestinationCommand(cmd event.CommandStopDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_StopDestination{StopDestination: &pb.StopDestinationCommand{Url: cmd.URL}}}
}
func buildCloseOtherInstanceCommand(event.CommandCloseOtherInstance) *pb.Command {
return &pb.Command{CommandType: &pb.Command_CloseOtherInstances{CloseOtherInstances: &pb.CloseOtherInstancesCommand{}}}
}
func buildQuitCommand(event.CommandQuit) *pb.Command {
return &pb.Command{CommandType: &pb.Command_Quit{Quit: &pb.QuitCommand{}}}
}
// CommandFromProto converts a protobuf message to a command.
func CommandFromProto(pbCmd *pb.Command) event.Command {
if pbCmd == nil || pbCmd.CommandType == nil {
panic("invalid or nil pb.Command")
}
switch cmd := pbCmd.CommandType.(type) {
case *pb.Command_AddDestination:
return parseAddDestinationCommand(cmd.AddDestination)
case *pb.Command_RemoveDestination:
return parseRemoveDestinationCommand(cmd.RemoveDestination)
case *pb.Command_StartDestination:
return parseStartDestinationCommand(cmd.StartDestination)
case *pb.Command_StopDestination:
return parseStopDestinationCommand(cmd.StopDestination)
case *pb.Command_CloseOtherInstances:
return parseCloseOtherInstanceCommand(cmd.CloseOtherInstances)
case *pb.Command_Quit:
return parseQuitCommand(cmd.Quit)
default:
panic("unknown pb.Command type")
}
}
func parseAddDestinationCommand(cmd *pb.AddDestinationCommand) event.Command {
if cmd == nil {
panic("nil AddDestinationCommand")
}
return event.CommandAddDestination{DestinationName: cmd.Name, URL: cmd.Url}
}
func parseRemoveDestinationCommand(cmd *pb.RemoveDestinationCommand) event.Command {
if cmd == nil {
panic("nil RemoveDestinationCommand")
}
return event.CommandRemoveDestination{URL: cmd.Url}
}
func parseStartDestinationCommand(cmd *pb.StartDestinationCommand) event.Command {
if cmd == nil {
panic("nil StartDestinationCommand")
}
return event.CommandStartDestination{URL: cmd.Url}
}
func parseStopDestinationCommand(cmd *pb.StopDestinationCommand) event.Command {
if cmd == nil {
panic("nil StopDestinationCommand")
}
return event.CommandStopDestination{URL: cmd.Url}
}
func parseCloseOtherInstanceCommand(_ *pb.CloseOtherInstancesCommand) event.Command {
return event.CommandCloseOtherInstance{}
}
func parseQuitCommand(_ *pb.QuitCommand) event.Command {
return event.CommandQuit{}
}

121
internal/protocol/domain.go Normal file
View File

@ -0,0 +1,121 @@
package protocol
import (
"errors"
"git.netflux.io/rob/octoplex/internal/domain"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
func containerToProto(c domain.Container) *pb.Container {
var errString string
if c.Err != nil {
errString = c.Err.Error()
}
var exitCode *int32
if c.ExitCode != nil {
code := int32(*c.ExitCode)
exitCode = &code
}
return &pb.Container{
Id: c.ID,
Status: c.Status,
HealthState: c.HealthState,
CpuPercent: c.CPUPercent,
MemoryUsageBytes: c.MemoryUsageBytes,
RxRate: int32(c.RxRate),
TxRate: int32(c.TxRate),
RxSince: timestamppb.New(c.RxSince),
ImageName: c.ImageName,
PullStatus: c.PullStatus,
PullProgress: c.PullProgress,
PullPercent: int32(c.PullPercent),
RestartCount: int32(c.RestartCount),
ExitCode: exitCode,
Err: errString,
}
}
func protoToContainer(pbCont *pb.Container) domain.Container {
if pbCont == nil {
return domain.Container{}
}
var exitCode *int
if pbCont.ExitCode != nil {
val := int(*pbCont.ExitCode)
exitCode = &val
}
var err error
if pbCont.Err != "" {
err = errors.New(pbCont.Err)
}
return domain.Container{
ID: pbCont.Id,
Status: pbCont.Status,
HealthState: pbCont.HealthState,
CPUPercent: pbCont.CpuPercent,
MemoryUsageBytes: pbCont.MemoryUsageBytes,
RxRate: int(pbCont.RxRate),
TxRate: int(pbCont.TxRate),
RxSince: pbCont.RxSince.AsTime(),
ImageName: pbCont.ImageName,
PullStatus: pbCont.PullStatus,
PullProgress: pbCont.PullProgress,
PullPercent: int(pbCont.PullPercent),
RestartCount: int(pbCont.RestartCount),
ExitCode: exitCode,
Err: err,
}
}
func destinationsToProto(inDests []domain.Destination) []*pb.Destination {
destinations := make([]*pb.Destination, 0, len(inDests))
for _, d := range inDests {
destinations = append(destinations, destinationToProto(d))
}
return destinations
}
func destinationToProto(d domain.Destination) *pb.Destination {
return &pb.Destination{
Container: containerToProto(d.Container),
Status: destinationStatusToProto(d.Status),
Name: d.Name,
Url: d.URL,
}
}
func protoToDestinations(pbDests []*pb.Destination) []domain.Destination {
if pbDests == nil {
return nil
}
dests := make([]domain.Destination, 0, len(pbDests))
for _, pbDest := range pbDests {
if pbDest == nil {
continue
}
dests = append(dests, domain.Destination{
Container: protoToContainer(pbDest.Container),
Status: domain.DestinationStatus(pbDest.Status), // direct cast, same underlying int
Name: pbDest.Name,
URL: pbDest.Url,
})
}
return dests
}
func destinationStatusToProto(s domain.DestinationStatus) pb.Destination_Status {
switch s {
case domain.DestinationStatusStarting:
return pb.Destination_STATUS_STARTING
case domain.DestinationStatusLive:
return pb.Destination_STATUS_LIVE
default:
return pb.Destination_STATUS_OFF_AIR
}
}

252
internal/protocol/event.go Normal file
View File

@ -0,0 +1,252 @@
package protocol
import (
"errors"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
// EventToProto converts an event to a protobuf message.
func EventToProto(ev event.Event) *pb.Event {
switch evt := ev.(type) {
case event.AppStateChangedEvent:
return buildAppStateChangeEvent(evt)
case event.DestinationAddedEvent:
return buildDestinationAddedEvent(evt)
case event.AddDestinationFailedEvent:
return buildAddDestinationFailedEvent(evt)
case event.DestinationStreamExitedEvent:
return buildDestinationStreamExitedEvent(evt)
case event.StartDestinationFailedEvent:
return buildStartDestinationFailedEvent(evt)
case event.DestinationRemovedEvent:
return buildDestinationRemovedEvent(evt)
case event.RemoveDestinationFailedEvent:
return buildRemoveDestinationFailedEvent(evt)
case event.FatalErrorOccurredEvent:
return buildFatalErrorOccurredEvent(evt)
case event.OtherInstanceDetectedEvent:
return buildOtherInstanceDetectedEvent(evt)
case event.MediaServerStartedEvent:
return buildMediaServerStartedEvent(evt)
default:
panic("unknown event type")
}
}
func buildAppStateChangeEvent(evt event.AppStateChangedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_AppStateChanged{
AppStateChanged: &pb.AppStateChangedEvent{
AppState: &pb.AppState{
Source: &pb.Source{
Container: containerToProto(evt.State.Source.Container),
Live: evt.State.Source.Live,
LiveChangedAt: timestamppb.New(evt.State.Source.LiveChangedAt),
Tracks: evt.State.Source.Tracks,
ExitReason: evt.State.Source.ExitReason,
},
Destinations: destinationsToProto(evt.State.Destinations),
BuildInfo: &pb.BuildInfo{
GoVersion: evt.State.BuildInfo.GoVersion,
Version: evt.State.BuildInfo.Version,
Commit: evt.State.BuildInfo.Commit,
Date: evt.State.BuildInfo.Date,
},
},
},
},
}
}
func buildDestinationAddedEvent(evt event.DestinationAddedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_DestinationAdded{
DestinationAdded: &pb.DestinationAddedEvent{Url: evt.URL},
},
}
}
func buildAddDestinationFailedEvent(evt event.AddDestinationFailedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_AddDestinationFailed{
AddDestinationFailed: &pb.AddDestinationFailedEvent{Url: evt.URL, Error: evt.Err.Error()},
},
}
}
func buildDestinationStreamExitedEvent(evt event.DestinationStreamExitedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_DestinationStreamExited{
DestinationStreamExited: &pb.DestinationStreamExitedEvent{Name: evt.Name, Error: evt.Err.Error()},
},
}
}
func buildStartDestinationFailedEvent(evt event.StartDestinationFailedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_StartDestinationFailed{
StartDestinationFailed: &pb.StartDestinationFailedEvent{Url: evt.URL, Message: evt.Message},
},
}
}
func buildDestinationRemovedEvent(evt event.DestinationRemovedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_DestinationRemoved{
DestinationRemoved: &pb.DestinationRemovedEvent{Url: evt.URL},
},
}
}
func buildRemoveDestinationFailedEvent(evt event.RemoveDestinationFailedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_RemoveDestinationFailed{
RemoveDestinationFailed: &pb.RemoveDestinationFailedEvent{Url: evt.URL, Error: evt.Err.Error()},
},
}
}
func buildFatalErrorOccurredEvent(evt event.FatalErrorOccurredEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_FatalError{
FatalError: &pb.FatalErrorEvent{Message: evt.Message},
},
}
}
func buildOtherInstanceDetectedEvent(_ event.OtherInstanceDetectedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_OtherInstanceDetected{
OtherInstanceDetected: &pb.OtherInstanceDetectedEvent{},
},
}
}
func buildMediaServerStartedEvent(evt event.MediaServerStartedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_MediaServerStarted{
MediaServerStarted: &pb.MediaServerStartedEvent{RtmpUrl: evt.RTMPURL, RtmpsUrl: evt.RTMPSURL},
},
}
}
// EventFromProto converts a protobuf message to an event.
func EventFromProto(pbEv *pb.Event) event.Event {
if pbEv == nil || pbEv.EventType == nil {
panic("invalid or nil pb.Event")
}
switch evt := pbEv.EventType.(type) {
case *pb.Event_AppStateChanged:
return parseAppStateChangedEvent(evt.AppStateChanged)
case *pb.Event_DestinationAdded:
return parseDestinationAddedEvent(evt.DestinationAdded)
case *pb.Event_AddDestinationFailed:
return parseAddDestinationFailedEvent(evt.AddDestinationFailed)
case *pb.Event_DestinationStreamExited:
return parseDestinationStreamExitedEvent(evt.DestinationStreamExited)
case *pb.Event_StartDestinationFailed:
return parseStartDestinationFailedEvent(evt.StartDestinationFailed)
case *pb.Event_DestinationRemoved:
return parseDestinationRemovedEvent(evt.DestinationRemoved)
case *pb.Event_RemoveDestinationFailed:
return parseRemoveDestinationFailedEvent(evt.RemoveDestinationFailed)
case *pb.Event_FatalError:
return parseFatalErrorOccurredEvent(evt.FatalError)
case *pb.Event_OtherInstanceDetected:
return parseOtherInstanceDetectedEvent(evt.OtherInstanceDetected)
case *pb.Event_MediaServerStarted:
return parseMediaServerStartedEvent(evt.MediaServerStarted)
default:
panic("unknown pb.Event type")
}
}
func parseAppStateChangedEvent(evt *pb.AppStateChangedEvent) event.Event {
if evt == nil || evt.AppState == nil || evt.AppState.Source == nil {
panic("invalid AppStateChangedEvent")
}
return event.AppStateChangedEvent{
State: domain.AppState{
Source: domain.Source{
Container: protoToContainer(evt.AppState.Source.Container),
Live: evt.AppState.Source.Live,
LiveChangedAt: evt.AppState.Source.LiveChangedAt.AsTime(),
Tracks: evt.AppState.Source.Tracks,
ExitReason: evt.AppState.Source.ExitReason,
},
Destinations: protoToDestinations(evt.AppState.Destinations),
BuildInfo: domain.BuildInfo{
GoVersion: evt.AppState.BuildInfo.GoVersion,
Version: evt.AppState.BuildInfo.Version,
Commit: evt.AppState.BuildInfo.Commit,
Date: evt.AppState.BuildInfo.Date,
},
},
}
}
func parseDestinationAddedEvent(evt *pb.DestinationAddedEvent) event.Event {
if evt == nil {
panic("nil DestinationAddedEvent")
}
return event.DestinationAddedEvent{URL: evt.Url}
}
func parseAddDestinationFailedEvent(evt *pb.AddDestinationFailedEvent) event.Event {
if evt == nil {
panic("nil AddDestinationFailedEvent")
}
return event.AddDestinationFailedEvent{URL: evt.Url, Err: errors.New(evt.Error)}
}
func parseDestinationStreamExitedEvent(evt *pb.DestinationStreamExitedEvent) event.Event {
if evt == nil {
panic("nil DestinationStreamExitedEvent")
}
return event.DestinationStreamExitedEvent{Name: evt.Name, Err: errors.New(evt.Error)}
}
func parseStartDestinationFailedEvent(evt *pb.StartDestinationFailedEvent) event.Event {
if evt == nil {
panic("nil StartDestinationFailedEvent")
}
return event.StartDestinationFailedEvent{URL: evt.Url, Message: evt.Message}
}
func parseDestinationRemovedEvent(evt *pb.DestinationRemovedEvent) event.Event {
if evt == nil {
panic("nil DestinationRemovedEvent")
}
return event.DestinationRemovedEvent{URL: evt.Url}
}
func parseRemoveDestinationFailedEvent(evt *pb.RemoveDestinationFailedEvent) event.Event {
if evt == nil {
panic("nil RemoveDestinationFailedEvent")
}
return event.RemoveDestinationFailedEvent{URL: evt.Url, Err: errors.New(evt.Error)}
}
func parseFatalErrorOccurredEvent(evt *pb.FatalErrorEvent) event.Event {
if evt == nil {
panic("nil FatalErrorEvent")
}
return event.FatalErrorOccurredEvent{Message: evt.Message}
}
func parseOtherInstanceDetectedEvent(_ *pb.OtherInstanceDetectedEvent) event.Event {
return event.OtherInstanceDetectedEvent{}
}
func parseMediaServerStartedEvent(evt *pb.MediaServerStartedEvent) event.Event {
if evt == nil {
panic("nil MediaServerStartedEvent")
}
return event.MediaServerStartedEvent{RTMPURL: evt.RtmpUrl, RTMPSURL: evt.RtmpsUrl}
}

151
internal/server/grpc.go Normal file
View File

@ -0,0 +1,151 @@
package server
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"sync"
"time"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/protocol"
"golang.org/x/sync/errgroup"
)
// APIListenerCountDeltaFunc is a function that is called when the number of
// API clients increments or decrements.
type APIClientCountDeltaFunc func(delta int)
// Server is the gRPC server that handles incoming commands and outgoing
// events.
type Server struct {
pb.UnimplementedInternalAPIServer
dispatcher func(event.Command)
bus *event.Bus
logger *slog.Logger
mu sync.Mutex
clientCount int
clientC chan struct{}
}
// newServer creates a new gRPC server.
func newServer(
dispatcher func(event.Command),
bus *event.Bus,
logger *slog.Logger,
) *Server {
return &Server{
dispatcher: dispatcher,
bus: bus,
clientC: make(chan struct{}, 1),
logger: logger.With("component", "server"),
}
}
func (s *Server) Communicate(stream pb.InternalAPI_CommunicateServer) error {
g, ctx := errgroup.WithContext(stream.Context())
// perform handshake:
startHandshakeCmd, err := stream.Recv()
if err != nil {
return fmt.Errorf("receive start handshake command: %w", err)
}
if startHandshakeCmd.GetCommand() == nil || startHandshakeCmd.GetCommand().GetStartHandshake() == nil {
return fmt.Errorf("expected start handshake command but got: %T", startHandshakeCmd)
}
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: &pb.Event{EventType: &pb.Event_HandshakeCompleted{}}}}); err != nil {
return fmt.Errorf("send handshake completed event: %w", err)
}
// Notify that a client has connected and completed the handshake.
select {
case s.clientC <- struct{}{}:
default:
}
g.Go(func() error {
eventsC := s.bus.Register()
defer s.bus.Deregister(eventsC)
for {
select {
case evt := <-eventsC:
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: protocol.EventToProto(evt)}}); err != nil {
return fmt.Errorf("send event: %w", err)
}
case <-ctx.Done():
return ctx.Err()
}
}
})
g.Go(func() error {
s.mu.Lock()
s.clientCount++
s.mu.Unlock()
defer func() {
s.mu.Lock()
s.clientCount--
s.mu.Unlock()
}()
for {
in, err := stream.Recv()
if err == io.EOF {
s.logger.Info("Client disconnected")
return err
}
if err != nil {
return fmt.Errorf("receive message: %w", err)
}
switch pbCmd := in.Payload.(type) {
case *pb.Envelope_Command:
cmd := protocol.CommandFromProto(pbCmd.Command)
s.logger.Info("Received command", "command", cmd.Name())
s.dispatcher(cmd)
default:
return fmt.Errorf("expected command but got: %T", pbCmd)
}
}
})
if err := g.Wait(); err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
s.logger.Error("Client stream closed with error", "err", err)
return fmt.Errorf("errgroup.Wait: %w", err)
}
s.logger.Info("Client stream closed")
return nil
}
// GetClientCount returns the number of connected clients.
func (s *Server) GetClientCount() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.clientCount
}
const waitForClientTimeout = 10 * time.Second
// WaitForClient waits for _any_ client to connect and complete the handshake.
// It times out if no client has connected after 10 seconds.
func (s *Server) WaitForClient(ctx context.Context) error {
select {
case <-s.clientC:
return nil
case <-time.After(waitForClientTimeout):
return errors.New("timeout")
case <-ctx.Done():
return ctx.Err()
}
}

View File

@ -1,11 +1,13 @@
package app
package server
import (
"cmp"
"context"
"errors"
"fmt"
"log"
"log/slog"
"net"
"slices"
"time"
@ -13,38 +15,32 @@ import (
"git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/mediaserver"
"git.netflux.io/rob/octoplex/internal/replicator"
"git.netflux.io/rob/octoplex/internal/terminal"
"github.com/docker/docker/client"
"google.golang.org/grpc"
)
// App is an instance of the app.
type App struct {
cfg config.Config
configService *config.Service
eventBus *event.Bus
dispatchC chan event.Command
dockerClient container.DockerClient
screen *terminal.Screen // Screen may be nil.
headless bool
clipboardAvailable bool
configFilePath string
buildInfo domain.BuildInfo
logger *slog.Logger
cfg config.Config
configService *config.Service
eventBus *event.Bus
dispatchC chan event.Command
dockerClient container.DockerClient
waitForClient bool
logger *slog.Logger
}
// Params holds the parameters for running the application.
type Params struct {
ConfigService *config.Service
DockerClient container.DockerClient
ChanSize int
Screen *terminal.Screen // Screen may be nil.
Headless bool
ClipboardAvailable bool
ConfigFilePath string
BuildInfo domain.BuildInfo
Logger *slog.Logger
ConfigService *config.Service
DockerClient container.DockerClient
ChanSize int
ConfigFilePath string
WaitForClient bool
Logger *slog.Logger
}
// defaultChanSize is the default size of the dispatch channel.
@ -53,17 +49,13 @@ const defaultChanSize = 64
// New creates a new application instance.
func New(params Params) *App {
return &App{
cfg: params.ConfigService.Current(),
configService: params.ConfigService,
eventBus: event.NewBus(params.Logger.With("component", "event_bus")),
dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)),
dockerClient: params.DockerClient,
screen: params.Screen,
headless: params.Headless,
clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath,
buildInfo: params.BuildInfo,
logger: params.Logger,
cfg: params.ConfigService.Current(),
configService: params.ConfigService,
eventBus: event.NewBus(params.Logger.With("component", "event_bus")),
dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)),
dockerClient: params.DockerClient,
waitForClient: params.WaitForClient,
logger: params.Logger,
}
}
@ -78,20 +70,26 @@ func (a *App) Run(ctx context.Context) error {
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
}
if !a.headless {
ui, err := terminal.StartUI(ctx, terminal.StartParams{
EventBus: a.eventBus,
Dispatcher: func(cmd event.Command) { a.dispatchC <- cmd },
Screen: a.screen,
ClipboardAvailable: a.clipboardAvailable,
ConfigFilePath: a.configFilePath,
BuildInfo: a.buildInfo,
Logger: a.logger.With("component", "ui"),
})
if err != nil {
return fmt.Errorf("start terminal user interface: %w", err)
const grpcAddr = ":50051"
lis, err := net.Listen("tcp", grpcAddr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
defer lis.Close()
grpcServer := grpc.NewServer()
grpcDone := make(chan error, 1)
internalAPI := newServer(a.DispatchAsync, a.eventBus, a.logger)
pb.RegisterInternalAPIServer(grpcServer, internalAPI)
go func() {
a.logger.Info("gRPC server started", "addr", grpcAddr)
grpcDone <- grpcServer.Serve(lis)
}()
if a.waitForClient {
if err = internalAPI.WaitForClient(ctx); err != nil {
return fmt.Errorf("wait for client: %w", err)
}
defer ui.Close()
}
// emptyUI is a dummy function that sets the UI state to an empty state, and
@ -107,12 +105,13 @@ func (a *App) Run(ctx context.Context) error {
a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}})
}
// doFatalError publishes a fatal error to the event bus, waiting for the
// user to acknowledge it if not in headless mode.
// doFatalError publishes a fatal error to the event bus. It will block until
// the user acknowledges it if there is 1 or more clients connected to the
// internal API.
doFatalError := func(msg string) {
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
if a.headless {
if internalAPI.GetClientCount() == 0 {
return
}
@ -177,21 +176,20 @@ func (a *App) Run(ctx context.Context) error {
defer uiUpdateT.Stop()
startMediaServerC := make(chan struct{}, 1)
if a.headless { // disable startup check in headless mode for now
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
doFatalError(startupErr.Error())
return startupErr
} else if ok {
startMediaServerC <- struct{}{}
} else {
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
doFatalError(startupErr.Error())
return startupErr
} else if ok {
startMediaServerC <- struct{}{}
}
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case grpcErr := <-grpcDone:
a.logger.Error("gRPC server exited", "err", grpcErr)
return grpcErr
case <-startMediaServerC:
if err = srv.Start(ctx); err != nil {
return fmt.Errorf("start mediaserver: %w", err)
@ -245,6 +243,11 @@ func (a *App) Dispatch(cmd event.Command) event.Event {
return <-ch
}
// DispatchAsync dispatches a command to be executed synchronously.
func (a *App) DispatchAsync(cmd event.Command) {
a.dispatchC <- cmd
}
// errExit is an error that indicates the app should exit.
var errExit = errors.New("exit")
@ -272,6 +275,10 @@ func (a *App) handleCommand(
}
}()
if c, ok := cmd.(syncCommand); ok {
cmd = c.Command
}
switch c := cmd.(type) {
case event.CommandAddDestination:
newCfg := a.cfg
@ -281,10 +288,11 @@ func (a *App) handleCommand(
})
if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Add destination failed", "err", err)
return event.AddDestinationFailedEvent{Err: err}, nil
return event.AddDestinationFailedEvent{URL: c.URL, Err: err}, nil
}
a.cfg = newCfg
a.handleConfigUpdate(state)
a.logger.Info("Destination added", "url", c.URL)
a.eventBus.Send(event.DestinationAddedEvent{URL: c.URL})
case event.CommandRemoveDestination:
repl.StopDestination(c.URL) // no-op if not live
@ -294,7 +302,7 @@ func (a *App) handleCommand(
})
if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Remove destination failed", "err", err)
a.eventBus.Send(event.RemoveDestinationFailedEvent{Err: err})
a.eventBus.Send(event.RemoveDestinationFailedEvent{URL: c.URL, Err: err})
break
}
a.cfg = newCfg
@ -302,7 +310,7 @@ func (a *App) handleCommand(
a.eventBus.Send(event.DestinationRemovedEvent{URL: c.URL}) //nolint:gosimple
case event.CommandStartDestination:
if !state.Source.Live {
a.eventBus.Send(event.StartDestinationFailedEvent{})
a.eventBus.Send(event.StartDestinationFailedEvent{URL: c.URL, Message: "source not live"})
break
}

View File

@ -1,4 +1,4 @@
package app
package server
import (
"testing"

View File

@ -3,6 +3,7 @@ package terminal
import (
"cmp"
"context"
"errors"
"fmt"
"log/slog"
"maps"
@ -44,9 +45,9 @@ type UI struct {
eventBus *event.Bus
dispatch func(event.Command)
clipboardAvailable bool
configFilePath string
rtmpURL, rtmpsURL string
buildInfo domain.BuildInfo
appExitC chan error
logger *slog.Logger
// tview state
@ -92,20 +93,20 @@ type ScreenCapture struct {
Width, Height int
}
// StartParams contains the parameters for starting a new terminal user
// Params contains the parameters for starting a new terminal user
// interface.
type StartParams struct {
type Params struct {
EventBus *event.Bus
Dispatcher func(event.Command)
Logger *slog.Logger
ClipboardAvailable bool
ConfigFilePath string
BuildInfo domain.BuildInfo
Screen *Screen // Screen may be nil.
}
// StartUI starts the terminal user interface.
func StartUI(ctx context.Context, params StartParams) (*UI, error) {
// NewUI creates the user interface. Call [Run] on the *UI instance to block
// until it is completed.
func NewUI(ctx context.Context, params Params) (*UI, error) {
app := tview.NewApplication()
var screen tcell.Screen
@ -211,7 +212,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
eventBus: params.EventBus,
dispatch: params.Dispatcher,
clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath,
appExitC: make(chan error, 1),
buildInfo: params.BuildInfo,
logger: params.Logger,
app: app,
@ -237,8 +238,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
app.SetInputCapture(ui.inputCaptureHandler)
app.SetAfterDrawFunc(ui.afterDrawHandler)
go ui.run(ctx)
return ui, nil
}
@ -262,32 +261,32 @@ func (ui *UI) renderAboutView() {
ui.aboutView.AddItem(rtmpsURLView, 1, 0, false)
}
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false)
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
}
func (ui *UI) run(ctx context.Context) {
defer func() {
// Ensure the application is stopped when the UI is closed.
ui.dispatch(event.CommandQuit{})
}()
var ErrUserClosed = errors.New("user closed UI")
// Run runs the user interface. It always returns a non-nil error, which will
// be [ErrUserClosed] if the user voluntarily closed the UI.
func (ui *UI) Run(ctx context.Context) error {
eventC := ui.eventBus.Register()
defer ui.eventBus.Deregister(eventC)
uiDone := make(chan struct{})
go func() {
defer func() {
uiDone <- struct{}{}
}()
if err := ui.app.Run(); err != nil {
ui.logger.Error("tui application error", "err", err)
err := ui.app.Run()
if err != nil {
ui.logger.Error("Error in UI run loop, exiting", "err", err)
}
ui.appExitC <- err
}()
for {
select {
case evt := <-eventC:
case evt, ok := <-eventC:
if !ok {
// should never happen
return errors.New("event channel closed")
}
ui.app.QueueUpdateDraw(func() {
switch evt := evt.(type) {
case event.AppStateChangedEvent:
@ -313,12 +312,11 @@ func (ui *UI) run(ctx context.Context) {
default:
ui.logger.Warn("unhandled event", "event", evt)
}
})
case <-ctx.Done():
return
case <-uiDone:
return
return ctx.Err()
case err := <-ui.appExitC:
return cmp.Or(err, ErrUserClosed)
}
}
}
@ -358,8 +356,6 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
return nil
case ' ':
ui.toggleDestination()
case 'c', 'C':
ui.copyConfigFilePathToClipboard(ui.clipboardAvailable, ui.configFilePath)
case '?':
ui.showAbout()
case 'k': // tview vim bindings
@ -825,6 +821,11 @@ func (ui *UI) Close() {
ui.app.Stop()
}
// Wait waits for the terminal user interface to finish.
func (ui *UI) Wait() {
<-ui.appExitC
}
func (ui *UI) addDestination() {
const (
inputLen = 60
@ -1006,28 +1007,6 @@ func (ui *UI) copySourceURLToClipboard(url string) {
)
}
func (ui *UI) copyConfigFilePathToClipboard(clipboardAvailable bool, configFilePath string) {
var text string
if clipboardAvailable {
if configFilePath != "" {
clipboard.Write(clipboard.FmtText, []byte(configFilePath))
text = "Configuration file path copied to clipboard:\n\n" + configFilePath
} else {
text = "Configuration file path not set"
}
} else {
text = "Copy to clipboard not available"
}
ui.showModal(
pageNameModalClipboard,
text,
[]string{"Ok"},
false,
nil,
)
}
func (ui *UI) confirmQuit() {
ui.showModal(
pageNameModalQuit,
@ -1036,7 +1015,7 @@ func (ui *UI) confirmQuit() {
false,
func(buttonIndex int, _ string) {
if buttonIndex == 0 {
ui.dispatch(event.CommandQuit{})
ui.app.Stop()
}
},
)

330
main.go
View File

@ -4,21 +4,23 @@ import (
"cmp"
"context"
"errors"
"flag"
"fmt"
"io"
"log/slog"
"os"
"os/exec"
"os/signal"
"runtime"
"runtime/debug"
"syscall"
"git.netflux.io/rob/octoplex/internal/app"
"git.netflux.io/rob/octoplex/internal/client"
"git.netflux.io/rob/octoplex/internal/config"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/server"
dockerclient "github.com/docker/docker/client"
"github.com/urfave/cli/v2"
"golang.design/x/clipboard"
"golang.org/x/sync/errgroup"
)
var (
@ -30,23 +32,108 @@ var (
date string
)
var errShutdown = errors.New("shutdown")
// errInterrupt is an error type that indicates an interrupt signal was
// received.
type errInterrupt struct{}
func main() {
var exitStatus int
if err := run(); errors.Is(err, errShutdown) {
exitStatus = 130
} else if err != nil {
exitStatus = 1
_, _ = os.Stderr.WriteString("Error: " + err.Error() + "\n")
}
os.Exit(exitStatus)
// Error implements the error interface.
func (e errInterrupt) Error() string {
return "interrupt signal received"
}
func run() error {
ctx, cancel := context.WithCancelCause(context.Background())
// ExitCode implements the ExitCoder interface.
func (e errInterrupt) ExitCode() int {
return 130
}
func main() {
app := &cli.App{
Name: "Octoplex",
Usage: "Octoplex is a live video restreamer for Docker.",
Commands: []*cli.Command{
{
Name: "client",
Usage: "Run the client",
Action: func(c *cli.Context) error {
return runClient(c.Context, c)
},
},
{
Name: "server",
Usage: "Run the server",
Action: func(c *cli.Context) error {
return runServer(c.Context, c, serverConfig{
stderrAvailable: true,
handleSigInt: true,
waitForClient: false,
})
},
},
{
Name: "run",
Usage: "Run server and client together (testing)",
Action: func(c *cli.Context) error {
return runClientAndServer(c)
},
},
},
}
if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
}
func runClient(ctx context.Context, _ *cli.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// TODO: logger from config
fptr, err := os.OpenFile("octoplex.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return fmt.Errorf("open log file: %w", err)
}
logger := slog.New(slog.NewTextHandler(fptr, nil))
logger.Info("Starting client", "version", cmp.Or(version, "devel"), "commit", cmp.Or(commit, "unknown"), "date", cmp.Or(date, "unknown"), "go_version", runtime.Version())
var clipboardAvailable bool
if err = clipboard.Init(); err != nil {
logger.Warn("Clipboard not available", "err", err)
} else {
clipboardAvailable = true
}
buildInfo, ok := debug.ReadBuildInfo()
if !ok {
return fmt.Errorf("read build info: %w", err)
}
app := client.New(client.NewParams{
ClipboardAvailable: clipboardAvailable,
BuildInfo: domain.BuildInfo{
GoVersion: buildInfo.GoVersion,
Version: version,
Commit: commit,
Date: date,
},
Logger: logger,
})
if err := app.Run(ctx); err != nil {
return fmt.Errorf("run app: %w", err)
}
return nil
}
type serverConfig struct {
stderrAvailable bool
handleSigInt bool
waitForClient bool
}
func runServer(ctx context.Context, _ *cli.Context, serverCfg serverConfig) error {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)
configService, err := config.NewDefaultService()
@ -54,45 +141,35 @@ func run() error {
return fmt.Errorf("build config service: %w", err)
}
help := flag.Bool("h", false, "Show help")
flag.Parse()
if *help {
printUsage()
return nil
}
if narg := flag.NArg(); narg > 1 {
printUsage()
return fmt.Errorf("too many arguments")
} else if narg == 1 {
switch flag.Arg(0) {
case "edit-config":
return editConfigFile(configService)
case "print-config":
return printConfigPath(configService.Path())
case "version":
return printVersion()
case "help":
printUsage()
return nil
}
}
cfg, err := configService.ReadOrCreateConfig()
if err != nil {
return fmt.Errorf("read or create config: %w", err)
}
headless := os.Getenv("OCTO_HEADLESS") != ""
logger, err := buildLogger(cfg.LogFile, headless)
if err != nil {
return fmt.Errorf("build logger: %w", err)
// TODO: improve logger API
// Currently it's a bit complicated because we can only use stdout - the
// preferred destination - if the client is not running. Otherwise we
// fallback to the legacy configuration but this should be bought more
// in-line with the client/server split.
var w io.Writer
if serverCfg.stderrAvailable {
w = os.Stdout
} else if !cfg.LogFile.Enabled {
w = io.Discard
} else {
w, err = os.OpenFile(cfg.LogFile.GetPath(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return fmt.Errorf("error opening log file: %w", err)
}
}
if headless {
// When running in headless mode tview doesn't handle SIGINT for us.
var handlerOpts slog.HandlerOptions
if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
}
logger := slog.New(slog.NewTextHandler(w, &handlerOpts))
if serverCfg.handleSigInt {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
@ -100,17 +177,10 @@ func run() error {
<-ch
logger.Info("Received interrupt signal, exiting")
signal.Stop(ch)
cancel(errShutdown)
cancel(errInterrupt{})
}()
}
var clipboardAvailable bool
if err = clipboard.Init(); err != nil {
logger.Warn("Clipboard not available", "err", err)
} else {
clipboardAvailable = true
}
dockerClient, err := dockerclient.NewClientWithOpts(
dockerclient.FromEnv,
dockerclient.WithAPIVersionNegotiation(),
@ -119,102 +189,64 @@ func run() error {
return fmt.Errorf("new docker client: %w", err)
}
buildInfo, ok := debug.ReadBuildInfo()
if !ok {
return fmt.Errorf("read build info: %w", err)
}
app := app.New(app.Params{
ConfigService: configService,
DockerClient: dockerClient,
Headless: headless,
ClipboardAvailable: clipboardAvailable,
ConfigFilePath: configService.Path(),
BuildInfo: domain.BuildInfo{
GoVersion: buildInfo.GoVersion,
Version: version,
Commit: commit,
Date: date,
},
Logger: logger,
app := server.New(server.Params{
ConfigService: configService,
DockerClient: dockerClient,
ConfigFilePath: configService.Path(),
WaitForClient: serverCfg.waitForClient,
Logger: logger,
})
return app.Run(ctx)
}
logger.Info(
"Starting server",
"version",
cmp.Or(version, "devel"),
"commit",
cmp.Or(commit, "unknown"),
"date",
cmp.Or(date, "unknown"),
"go_version",
runtime.Version(),
)
// editConfigFile opens the config file in the user's editor.
func editConfigFile(configService *config.Service) error {
if _, err := configService.ReadOrCreateConfig(); err != nil {
return fmt.Errorf("read or create config: %w", err)
}
editor := os.Getenv("EDITOR")
if editor == "" {
editor = "vi"
}
binary, err := exec.LookPath(editor)
if err != nil {
return fmt.Errorf("look path: %w", err)
}
fmt.Fprintf(os.Stderr, "Editing config file: %s\n", configService.Path())
fmt.Println(binary)
if err := syscall.Exec(binary, []string{"--", configService.Path()}, os.Environ()); err != nil {
return fmt.Errorf("exec: %w", err)
}
return nil
}
// printConfigPath prints the path to the config file to stderr.
func printConfigPath(configPath string) error {
fmt.Fprintln(os.Stderr, configPath)
return nil
}
// printVersion prints the version of the application to stderr.
func printVersion() error {
fmt.Fprintf(os.Stderr, "%s version %s\n", domain.AppName, cmp.Or(version, "0.0.0-dev"))
return nil
}
func printUsage() {
os.Stderr.WriteString("Usage: octoplex [command]\n\n")
os.Stderr.WriteString("Commands:\n\n")
os.Stderr.WriteString(" edit-config Edit the config file\n")
os.Stderr.WriteString(" print-config Print the path to the config file\n")
os.Stderr.WriteString(" version Print the version of the application\n")
os.Stderr.WriteString(" help Print this help message\n")
os.Stderr.WriteString("\n")
os.Stderr.WriteString("Additionally, Octoplex can be configured with the following environment variables:\n\n")
os.Stderr.WriteString(" OCTO_DEBUG Enables debug logging if set\n")
os.Stderr.WriteString(" OCTO_HEADLESS Enables headless mode if set (experimental)\n\n")
}
// buildLogger builds the logger, which may be a no-op logger.
func buildLogger(cfg config.LogFile, headless bool) (*slog.Logger, error) {
build := func(w io.Writer) *slog.Logger {
var handlerOpts slog.HandlerOptions
if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
if err := app.Run(ctx); err != nil {
if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ctx), errInterrupt{}) {
return context.Cause(ctx)
}
return slog.New(slog.NewTextHandler(w, &handlerOpts))
return err
}
// In headless mode, always log to stderr.
if headless {
return build(os.Stderr), nil
}
if !cfg.Enabled {
return slog.New(slog.DiscardHandler), nil
}
fptr, err := os.OpenFile(cfg.GetPath(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return nil, fmt.Errorf("error opening log file: %w", err)
}
return build(fptr), nil
return nil
}
func runClientAndServer(c *cli.Context) error {
errNoErr := errors.New("no error")
g, ctx := errgroup.WithContext(c.Context)
g.Go(func() error {
if err := runClient(ctx, c); err != nil {
return err
}
return errNoErr
})
g.Go(func() error {
if err := runServer(ctx, c, serverConfig{
stderrAvailable: false,
handleSigInt: false,
waitForClient: true,
}); err != nil {
return err
}
return errNoErr
})
if err := g.Wait(); err == errNoErr {
return nil
} else {
return err
}
}

View File

@ -40,3 +40,9 @@ description = "Generate mocks"
dir = "{{cwd}}"
run = "go tool mockery"
alias = "m"
[tasks.generate_proto]
description = "Generate gRPC files from proto"
dir = "{{cwd}}"
run = "protoc -I proto --go_out=paths=source_relative:internal/generated/grpc --go-grpc_out=paths=source_relative:internal/generated/grpc proto/api.proto"
alias = "p"

166
proto/api.proto Normal file
View File

@ -0,0 +1,166 @@
syntax = "proto3";
package api;
import "google/protobuf/timestamp.proto";
option go_package = "git.netflux.io/rob/octoplex/internal/generated/grpc";
service InternalAPI {
rpc Communicate(stream Envelope) returns (stream Envelope);
}
message Envelope {
oneof payload {
Command command = 1;
Event event = 2;
}
}
message Command {
oneof command_type {
AddDestinationCommand add_destination = 1;
RemoveDestinationCommand remove_destination = 2;
StartDestinationCommand start_destination = 3;
StopDestinationCommand stop_destination = 4;
CloseOtherInstancesCommand close_other_instances = 5;
QuitCommand quit = 6;
StartHandshakeCommand start_handshake = 7;
}
}
message AddDestinationCommand {
string name = 1;
string url = 2;
}
message RemoveDestinationCommand {
string url= 1;
}
message StartDestinationCommand {
string url = 1;
}
message StopDestinationCommand {
string url = 1;
}
message CloseOtherInstancesCommand {}
message QuitCommand {}
message StartHandshakeCommand {};
message Event {
oneof event_type {
AppStateChangedEvent app_state_changed = 1;
DestinationStreamExitedEvent destination_stream_exited = 2;
DestinationAddedEvent destination_added = 3;
AddDestinationFailedEvent add_destination_failed = 4;
DestinationRemovedEvent destination_removed = 5;
RemoveDestinationFailedEvent remove_destination_failed = 6;
StartDestinationFailedEvent start_destination_failed = 7;
MediaServerStartedEvent media_server_started = 8;
OtherInstanceDetectedEvent other_instance_detected = 9;
FatalErrorEvent fatal_error = 10;
HandshakeCompletedEvent handshake_completed = 11;
}
}
message Container {
string id = 1;
string status = 2;
string health_state = 3;
double cpu_percent = 4;
uint64 memory_usage_bytes = 5;
int32 rx_rate = 6;
int32 tx_rate = 7;
google.protobuf.Timestamp rx_since = 8;
string image_name = 9;
string pull_status = 10;
string pull_progress = 11;
int32 pull_percent = 12;
int32 restart_count = 13;
optional int32 exit_code = 14;
string err = 15;
}
message Source {
Container container = 1;
bool live = 2;
google.protobuf.Timestamp live_changed_at = 3;
repeated string tracks = 4;
string exit_reason = 5;
}
message Destination {
enum Status {
STATUS_OFF_AIR = 0;
STATUS_STARTING = 1;
STATUS_LIVE = 2;
}
Container container = 1;
Status status = 2;
string name = 3;
string url = 4;
}
message BuildInfo {
string go_version = 1;
string version = 2;
string commit = 3;
string date = 4;
}
message AppState {
Source source = 1;
repeated Destination destinations = 2;
BuildInfo build_info = 3;
}
message AppStateChangedEvent {
AppState app_state = 1;
}
message DestinationStreamExitedEvent {
string name = 1;
string error = 2;
}
message DestinationAddedEvent {
string url = 1;
}
message AddDestinationFailedEvent {
string url = 1;
string error = 2;
}
message DestinationRemovedEvent {
string url = 1;
}
message RemoveDestinationFailedEvent {
string url = 1;
string error = 2;
}
message StartDestinationFailedEvent {
string url = 1;
string message = 2;
}
message MediaServerStartedEvent {
string rtmp_url = 1;
string rtmps_url = 2;
}
message OtherInstanceDetectedEvent {}
message FatalErrorEvent {
string message = 1;
}
message HandshakeCompletedEvent {}