Compare commits

..

23 Commits

Author SHA1 Message Date
f006187894 fixup! wip: refactor: API 2025-05-13 07:17:05 +02:00
5565331630 fixup! wip: refactor: API 2025-05-13 07:11:18 +02:00
32701499e7 fixup! wip: refactor: API 2025-05-12 21:13:16 +02:00
311c100d89 fixup! wip: refactor: API 2025-05-12 19:42:50 +02:00
e3e469edec fixup! wip: refactor: API 2025-05-12 19:32:24 +02:00
eaccb17f03 fixup! wip: refactor: API 2025-05-12 19:30:28 +02:00
7a3f1335c1 fixup! wip: refactor: API 2025-05-12 09:00:43 +02:00
5aa1be2066 fixup! wip: refactor: API 2025-05-11 08:11:05 +02:00
c5724dfe59 fixup! wip: refactor: API 2025-05-11 07:55:06 +02:00
d0d96dd1d9 fixup! wip: refactor: API 2025-05-11 06:15:20 +02:00
977b6fe7d7 fixup! wip: refactor: API 2025-05-11 06:14:17 +02:00
b8a77d9c6c fixup! wip: refactor: API 2025-05-11 06:12:44 +02:00
c706a41acd fixup! wip: refactor: API 2025-05-10 21:47:10 +02:00
59b0a060ba fixup! wip: refactor: API 2025-05-10 21:18:42 +02:00
116623f386 fixup! wip: refactor: API 2025-05-10 21:14:40 +02:00
aa6f50715d fixup! wip: refactor: API 2025-05-10 08:15:23 +02:00
7706bb363f fixup! wip: refactor: API 2025-05-10 07:46:58 +02:00
888ac7d67d fixup! wip: refactor: API 2025-05-10 07:36:39 +02:00
c6f21b194a fixup! wip: refactor: API 2025-05-09 06:47:27 +02:00
6ceb286e6b fixup! wip: refactor: API 2025-05-08 07:08:43 +02:00
0a6b9fad90 fixup! wip: refactor: API 2025-05-07 20:32:27 +02:00
f67a456d1e wip: refactor: API 2025-05-06 21:38:14 +02:00
461c2b1167 refactor: main.go > ./cmd/server 2025-05-06 20:17:41 +02:00
25 changed files with 4039 additions and 421 deletions

10
go.mod
View File

@ -11,7 +11,11 @@ require (
github.com/rivo/tview v0.0.0-20250330220935-949945f8d922 github.com/rivo/tview v0.0.0-20250330220935-949945f8d922
github.com/stretchr/testify v1.10.0 github.com/stretchr/testify v1.10.0
github.com/testcontainers/testcontainers-go v0.35.0 github.com/testcontainers/testcontainers-go v0.35.0
github.com/urfave/cli/v2 v2.27.6
golang.design/x/clipboard v0.7.0 golang.design/x/clipboard v0.7.0
golang.org/x/sync v0.13.0
google.golang.org/grpc v1.69.4
google.golang.org/protobuf v1.36.3
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )
@ -24,6 +28,7 @@ require (
github.com/containerd/log v0.1.0 // indirect github.com/containerd/log v0.1.0 // indirect
github.com/containerd/platforms v0.2.1 // indirect github.com/containerd/platforms v0.2.1 // indirect
github.com/cpuguy83/dockercfg v0.3.2 // indirect github.com/cpuguy83/dockercfg v0.3.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.6.0 // indirect github.com/distribution/reference v0.6.0 // indirect
github.com/docker/go-units v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect
@ -65,6 +70,7 @@ require (
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/rivo/uniseg v0.4.7 // indirect github.com/rivo/uniseg v0.4.7 // indirect
github.com/rs/zerolog v1.33.0 // indirect github.com/rs/zerolog v1.33.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.7.0 // indirect github.com/sagikazarmark/locafero v0.7.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/shirou/gopsutil/v3 v3.23.12 // indirect github.com/shirou/gopsutil/v3 v3.23.12 // indirect
@ -81,6 +87,7 @@ require (
github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect github.com/tklauser/numcpus v0.6.1 // indirect
github.com/vektra/mockery/v2 v2.52.2 // indirect github.com/vektra/mockery/v2 v2.52.2 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
@ -95,12 +102,13 @@ require (
golang.org/x/image v0.26.0 // indirect golang.org/x/image v0.26.0 // indirect
golang.org/x/mobile v0.0.0-20250408133729-978277e7eaf7 // indirect golang.org/x/mobile v0.0.0-20250408133729-978277e7eaf7 // indirect
golang.org/x/mod v0.24.0 // indirect golang.org/x/mod v0.24.0 // indirect
golang.org/x/sync v0.13.0 // indirect golang.org/x/net v0.39.0 // indirect
golang.org/x/sys v0.32.0 // indirect golang.org/x/sys v0.32.0 // indirect
golang.org/x/term v0.31.0 // indirect golang.org/x/term v0.31.0 // indirect
golang.org/x/text v0.24.0 // indirect golang.org/x/text v0.24.0 // indirect
golang.org/x/time v0.9.0 // indirect golang.org/x/time v0.9.0 // indirect
golang.org/x/tools v0.32.0 // indirect golang.org/x/tools v0.32.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
) )

9
go.sum
View File

@ -18,6 +18,8 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA= github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA=
github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc=
github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY=
github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -52,6 +54,8 @@ github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiU
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
@ -140,6 +144,7 @@ github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWN
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo= github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo=
github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k= github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k=
@ -185,8 +190,12 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/urfave/cli/v2 v2.27.6 h1:VdRdS98FNhKZ8/Az8B7MTyGQmpIr36O1EHybx/LaZ4g=
github.com/urfave/cli/v2 v2.27.6/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/vektra/mockery/v2 v2.52.2 h1:8QfPKUIrq8P3Cs7G79Iu4Byd5wdhGCE0quIS27x7rQo= github.com/vektra/mockery/v2 v2.52.2 h1:8QfPKUIrq8P3Cs7G79Iu4Byd5wdhGCE0quIS27x7rQo=
github.com/vektra/mockery/v2 v2.52.2/go.mod h1:zGDY/f6bip0Yh13GQ5j7xa43fuEoYBa4ICHEaihisHw= github.com/vektra/mockery/v2 v2.52.2/go.mod h1:zGDY/f6bip0Yh13GQ5j7xa43fuEoYBa4ICHEaihisHw=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=

View File

@ -0,0 +1,131 @@
package client
import (
"context"
"fmt"
"log/slog"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/protocol"
"git.netflux.io/rob/octoplex/internal/terminal"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// App is the client application.
type App struct {
bus *event.Bus
clipboardAvailable bool
buildInfo domain.BuildInfo
screen *terminal.Screen
logger *slog.Logger
}
// NewParams contains the parameters for the App.
type NewParams struct {
ClipboardAvailable bool
BuildInfo domain.BuildInfo
Screen *terminal.Screen
Logger *slog.Logger
}
// New creates a new App instance.
func New(params NewParams) *App {
return &App{
bus: event.NewBus(params.Logger),
clipboardAvailable: params.ClipboardAvailable,
buildInfo: params.BuildInfo,
screen: params.Screen,
logger: params.Logger,
}
}
// Run starts the application, and blocks until it is closed.
//
// It returns nil if the application was closed by the user, or an error if it
// closed for any other reason.
func (a *App) Run(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
conn, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("connect to gRPC server: %w", err)
}
apiClient := pb.NewInternalAPIClient(conn)
stream, err := apiClient.Communicate(ctx)
if err != nil {
return fmt.Errorf("create gRPC stream: %w", err)
}
ui, err := terminal.NewUI(ctx, terminal.Params{
EventBus: a.bus,
Dispatcher: func(cmd event.Command) {
a.logger.Info("Command dispatched", "cmd", cmd.Name())
if sendErr := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Command{Command: protocol.CommandToProto(cmd)}}); sendErr != nil {
a.logger.Error("Error sending command to gRPC API", "err", sendErr)
}
},
ClipboardAvailable: a.clipboardAvailable,
BuildInfo: a.buildInfo,
Screen: a.screen,
Logger: a.logger.With("component", "ui"),
})
if err != nil {
return fmt.Errorf("start terminal user interface: %w", err)
}
defer ui.Close()
g.Go(func() error { return ui.Run(ctx) })
// After the UI is available, perform a handshake with the server.
// Ordering is important here. We want to ensure that the UI is ready to
// react to events received from the server. Performing the handshake ensures
// the client has received at least one event.
if err := a.doHandshake(stream); err != nil {
return fmt.Errorf("do handshake: %w", err)
}
g.Go(func() error {
for {
envelope, recErr := stream.Recv()
if recErr != nil {
return fmt.Errorf("receive envelope: %w", recErr)
}
pbEvt := envelope.GetEvent()
if pbEvt == nil {
a.logger.Error("Received envelope without event")
continue
}
evt := protocol.EventFromProto(pbEvt)
a.logger.Debug("Received event from gRPC stream", "event", evt.EventName(), "payload", evt)
a.bus.Send(evt)
}
})
if err := g.Wait(); err == terminal.ErrUserClosed {
return nil
} else {
return fmt.Errorf("errgroup.Wait: %w", err)
}
}
func (a *App) doHandshake(stream pb.InternalAPI_CommunicateClient) error {
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Command{Command: &pb.Command{CommandType: &pb.Command_StartHandshake{}}}}); err != nil {
return fmt.Errorf("send start handshake command: %w", err)
}
env, err := stream.Recv()
if err != nil {
return fmt.Errorf("receive handshake completed event: %w", err)
}
if evt := env.GetEvent(); evt == nil || evt.GetHandshakeCompleted() == nil {
return fmt.Errorf("expected handshake completed event but got: %T", env)
}
return nil
}

View File

@ -1,9 +1,11 @@
//go:build integration //go:build integration
package app_test package client_test
import ( import (
"context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"log/slog" "log/slog"
@ -14,39 +16,79 @@ import (
"testing" "testing"
"time" "time"
"git.netflux.io/rob/octoplex/internal/app" "git.netflux.io/rob/octoplex/internal/client"
"git.netflux.io/rob/octoplex/internal/config" "git.netflux.io/rob/octoplex/internal/config"
"git.netflux.io/rob/octoplex/internal/container" "git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/server"
"git.netflux.io/rob/octoplex/internal/terminal" "git.netflux.io/rob/octoplex/internal/terminal"
"github.com/gdamore/tcell/v2" "github.com/gdamore/tcell/v2"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go"
"golang.org/x/sync/errgroup"
) )
func buildAppParams( func buildClientServer(
t *testing.T,
configService *config.Service, configService *config.Service,
dockerClient container.DockerClient, dockerClient container.DockerClient,
screen tcell.SimulationScreen, screen tcell.SimulationScreen,
screenCaptureC chan<- terminal.ScreenCapture, screenCaptureC chan<- terminal.ScreenCapture,
logger *slog.Logger, logger *slog.Logger,
) app.Params { ) (*client.App, *server.App) {
t.Helper() client := client.New(client.NewParams{
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
return app.Params{
ConfigService: configService,
DockerClient: dockerClient,
Screen: &terminal.Screen{ Screen: &terminal.Screen{
Screen: screen, Screen: screen,
Width: 180, Width: 180,
Height: 25, Height: 25,
CaptureC: screenCaptureC, CaptureC: screenCaptureC,
}, },
ClipboardAvailable: false, Logger: logger,
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"}, })
Logger: logger,
} server := server.New(server.Params{
ConfigService: configService,
DockerClient: dockerClient,
WaitForClient: true,
Logger: logger,
})
return client, server
}
type clientServerResult struct {
errClient error
errServer error
}
func runClientServer(
ctx context.Context,
_ *testing.T,
clientApp *client.App,
serverApp *server.App,
) <-chan clientServerResult {
ch := make(chan clientServerResult, 1)
g, ctx := errgroup.WithContext(ctx)
var clientErr, srvErr error
g.Go(func() error {
srvErr = serverApp.Run(ctx)
return errors.New("server closed")
})
g.Go(func() error {
clientErr = clientApp.Run(ctx)
return errors.New("client closed")
})
go func() {
_ = g.Wait()
ch <- clientServerResult{errClient: clientErr, errServer: srvErr}
}()
return ch
} }
func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- terminal.ScreenCapture, func() []string) { func setupSimulationScreen(t *testing.T) (tcell.SimulationScreen, chan<- terminal.ScreenCapture, func() []string) {

View File

@ -1,6 +1,6 @@
//go:build integration //go:build integration
package app_test package client_test
import ( import (
"cmp" "cmp"
@ -15,12 +15,10 @@ import (
"testing" "testing"
"time" "time"
"git.netflux.io/rob/octoplex/internal/app"
"git.netflux.io/rob/octoplex/internal/config" "git.netflux.io/rob/octoplex/internal/config"
"git.netflux.io/rob/octoplex/internal/container" "git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/container/mocks" "git.netflux.io/rob/octoplex/internal/container/mocks"
"git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/terminal"
"git.netflux.io/rob/octoplex/internal/testhelpers" "git.netflux.io/rob/octoplex/internal/testhelpers"
"github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/network"
dockerclient "github.com/docker/docker/client" dockerclient "github.com/docker/docker/client"
@ -126,26 +124,8 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}}, Destinations: []config.Destination{{Name: "Local server 1", URL: destURL1}},
}) })
done := make(chan struct{}) client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
go func() { ch := runClientServer(ctx, t, client, server)
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(app.Params{
ConfigService: configService,
DockerClient: dockerClient,
Screen: &terminal.Screen{
Screen: screen,
Width: 160,
Height: 25,
CaptureC: screenCaptureC,
},
ClipboardAvailable: false,
BuildInfo: domain.BuildInfo{Version: "0.0.1", GoVersion: "go1.16.3"},
Logger: logger,
}).Run(ctx))
}()
require.EventuallyWithT( require.EventuallyWithT(
t, t,
@ -286,13 +266,12 @@ func testIntegration(t *testing.T, mediaServerConfig config.MediaServerSource) {
printScreen(t, getContents, "After stopping the first destination") printScreen(t, getContents, "After stopping the first destination")
// TODO:
// - Source error
// - Additional features (copy URL, etc.)
cancel() cancel()
<-done result := <-ch
// May be a gRPC error, not context.Canceled:
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
} }
func TestIntegrationCustomHost(t *testing.T) { func TestIntegrationCustomHost(t *testing.T) {
@ -313,14 +292,8 @@ func TestIntegrationCustomHost(t *testing.T) {
}) })
screen, screenCaptureC, getContents := setupSimulationScreen(t) screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{}) client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
go func() { ch := runClientServer(ctx, t, client, server)
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
time.Sleep(time.Second) time.Sleep(time.Second)
sendKey(t, screen, tcell.KeyF1, ' ') sendKey(t, screen, tcell.KeyF1, ' ')
@ -360,7 +333,10 @@ func TestIntegrationCustomHost(t *testing.T) {
cancel() cancel()
<-done result := <-ch
// May be a gRPC error, not context.Canceled:
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
} }
func TestIntegrationCustomTLSCerts(t *testing.T) { func TestIntegrationCustomTLSCerts(t *testing.T) {
@ -384,14 +360,8 @@ func TestIntegrationCustomTLSCerts(t *testing.T) {
}) })
screen, screenCaptureC, getContents := setupSimulationScreen(t) screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{}) client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
go func() { ch := runClientServer(ctx, t, client, server)
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT( require.EventuallyWithT(
t, t,
@ -426,7 +396,9 @@ func TestIntegrationCustomTLSCerts(t *testing.T) {
cancel() cancel()
<-done result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
} }
func TestIntegrationRestartDestination(t *testing.T) { func TestIntegrationRestartDestination(t *testing.T) {
@ -465,14 +437,8 @@ func TestIntegrationRestartDestination(t *testing.T) {
}}, }},
}) })
done := make(chan struct{}) client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
go func() { ch := runClientServer(ctx, t, client, server)
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT( require.EventuallyWithT(
t, t,
@ -584,7 +550,9 @@ func TestIntegrationRestartDestination(t *testing.T) {
cancel() cancel()
<-done result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
} }
func TestIntegrationStartDestinationFailed(t *testing.T) { func TestIntegrationStartDestinationFailed(t *testing.T) {
@ -602,14 +570,8 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
Destinations: []config.Destination{{Name: "Example server", URL: "rtmp://rtmp.example.com/live"}}, Destinations: []config.Destination{{Name: "Example server", URL: "rtmp://rtmp.example.com/live"}},
}) })
done := make(chan struct{}) client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
go func() { ch := runClientServer(ctx, t, client, server)
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT( require.EventuallyWithT(
t, t,
@ -658,7 +620,9 @@ func TestIntegrationStartDestinationFailed(t *testing.T) {
cancel() cancel()
<-done result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
} }
func TestIntegrationDestinationValidations(t *testing.T) { func TestIntegrationDestinationValidations(t *testing.T) {
@ -675,14 +639,8 @@ func TestIntegrationDestinationValidations(t *testing.T) {
Sources: config.Sources{MediaServer: config.MediaServerSource{StreamKey: "live", RTMP: config.RTMPSource{Enabled: true}}}, Sources: config.Sources{MediaServer: config.MediaServerSource{StreamKey: "live", RTMP: config.RTMPSource{Enabled: true}}},
}) })
done := make(chan struct{}) client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
go func() { ch := runClientServer(ctx, t, client, server)
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT( require.EventuallyWithT(
t, t,
@ -786,7 +744,9 @@ func TestIntegrationDestinationValidations(t *testing.T) {
printScreen(t, getContents, "After entering a duplicate destination URL") printScreen(t, getContents, "After entering a duplicate destination URL")
cancel() cancel()
<-done result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
} }
func TestIntegrationStartupCheck(t *testing.T) { func TestIntegrationStartupCheck(t *testing.T) {
@ -817,14 +777,8 @@ func TestIntegrationStartupCheck(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}}) configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
screen, screenCaptureC, getContents := setupSimulationScreen(t) screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{}) client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
go func() { ch := runClientServer(ctx, t, client, server)
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
require.EventuallyWithT( require.EventuallyWithT(
t, t,
@ -868,7 +822,9 @@ func TestIntegrationStartupCheck(t *testing.T) {
printScreen(t, getContents, "After starting the mediaserver") printScreen(t, getContents, "After starting the mediaserver")
cancel() cancel()
<-done result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
} }
func TestIntegrationMediaServerError(t *testing.T) { func TestIntegrationMediaServerError(t *testing.T) {
@ -886,18 +842,8 @@ func TestIntegrationMediaServerError(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}}) configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
screen, screenCaptureC, getContents := setupSimulationScreen(t) screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{}) client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
go func() { ch := runClientServer(ctx, t, client, server)
defer func() {
done <- struct{}{}
}()
require.EqualError(
t,
app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx),
"media server exited",
)
}()
require.EventuallyWithT( require.EventuallyWithT(
t, t,
@ -911,10 +857,12 @@ func TestIntegrationMediaServerError(t *testing.T) {
) )
printScreen(t, getContents, "Ater displaying the media server error modal") printScreen(t, getContents, "Ater displaying the media server error modal")
// Quit the app, this should cause the done channel to receive. // Quit the app:
sendKey(t, screen, tcell.KeyEnter, ' ') sendKey(t, screen, tcell.KeyEnter, ' ')
<-done result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorContains(t, result.errServer, "media server exited")
} }
func TestIntegrationDockerClientError(t *testing.T) { func TestIntegrationDockerClientError(t *testing.T) {
@ -929,18 +877,8 @@ func TestIntegrationDockerClientError(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}}) configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
screen, screenCaptureC, getContents := setupSimulationScreen(t) screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{}) client, server := buildClientServer(configService, &dockerClient, screen, screenCaptureC, logger)
go func() { ch := runClientServer(ctx, t, client, server)
defer func() {
done <- struct{}{}
}()
require.EqualError(
t,
app.New(buildAppParams(t, configService, &dockerClient, screen, screenCaptureC, logger)).Run(ctx),
"create container client: network create: boom",
)
}()
require.EventuallyWithT( require.EventuallyWithT(
t, t,
@ -954,10 +892,12 @@ func TestIntegrationDockerClientError(t *testing.T) {
) )
printScreen(t, getContents, "Ater displaying the fatal error modal") printScreen(t, getContents, "Ater displaying the fatal error modal")
// Quit the app, this should cause the done channel to receive. // Quit the app:
sendKey(t, screen, tcell.KeyEnter, ' ') sendKey(t, screen, tcell.KeyEnter, ' ')
<-done result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.EqualError(t, result.errServer, "create container client: network create: boom")
} }
func TestIntegrationDockerConnectionError(t *testing.T) { func TestIntegrationDockerConnectionError(t *testing.T) {
@ -971,16 +911,8 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}}) configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: config.MediaServerSource{RTMP: config.RTMPSource{Enabled: true}}}})
screen, screenCaptureC, getContents := setupSimulationScreen(t) screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{}) client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
go func() { ch := runClientServer(ctx, t, client, server)
defer func() {
done <- struct{}{}
}()
err := app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx)
require.ErrorContains(t, err, "dial tcp: lookup docker.example.com")
require.ErrorContains(t, err, "no such host")
}()
require.EventuallyWithT( require.EventuallyWithT(
t, t,
@ -994,10 +926,13 @@ func TestIntegrationDockerConnectionError(t *testing.T) {
) )
printScreen(t, getContents, "Ater displaying the fatal error modal") printScreen(t, getContents, "Ater displaying the fatal error modal")
// Quit the app, this should cause the done channel to receive. // Quit the app:
sendKey(t, screen, tcell.KeyEnter, ' ') sendKey(t, screen, tcell.KeyEnter, ' ')
<-done result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorContains(t, result.errServer, "dial tcp: lookup docker.example.com")
assert.ErrorContains(t, result.errServer, "no such host")
} }
func TestIntegrationCopyURLs(t *testing.T) { func TestIntegrationCopyURLs(t *testing.T) {
@ -1067,14 +1002,8 @@ func TestIntegrationCopyURLs(t *testing.T) {
configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: tc.mediaServerConfig}}) configService := setupConfigService(t, config.Config{Sources: config.Sources{MediaServer: tc.mediaServerConfig}})
screen, screenCaptureC, getContents := setupSimulationScreen(t) screen, screenCaptureC, getContents := setupSimulationScreen(t)
done := make(chan struct{}) client, server := buildClientServer(configService, dockerClient, screen, screenCaptureC, logger)
go func() { ch := runClientServer(ctx, t, client, server)
defer func() {
done <- struct{}{}
}()
require.Equal(t, context.Canceled, app.New(buildAppParams(t, configService, dockerClient, screen, screenCaptureC, logger)).Run(ctx))
}()
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
printScreen(t, getContents, "Ater loading the app") printScreen(t, getContents, "Ater loading the app")
@ -1095,7 +1024,9 @@ func TestIntegrationCopyURLs(t *testing.T) {
cancel() cancel()
<-done result := <-ch
assert.ErrorContains(t, result.errClient, "context canceled")
assert.ErrorIs(t, result.errServer, context.Canceled)
}) })
} }
} }

View File

@ -2,6 +2,7 @@ package event
import ( import (
"log/slog" "log/slog"
"slices"
"sync" "sync"
) )
@ -31,6 +32,21 @@ func (b *Bus) Register() <-chan Event {
return ch return ch
} }
// Deregister deregisters a consumer for all events.
func (b *Bus) Deregister(ch <-chan Event) {
b.mu.Lock()
defer b.mu.Unlock()
b.consumers = slices.DeleteFunc(b.consumers, func(other chan Event) bool {
if ch == other {
close(other)
return true
}
return false
})
}
// Send sends an event to all registered consumers. // Send sends an event to all registered consumers.
func (b *Bus) Send(evt Event) { func (b *Bus) Send(evt Event) {
// The mutex is needed to ensure the backing array of b.consumers cannot be // The mutex is needed to ensure the backing array of b.consumers cannot be
@ -43,7 +59,7 @@ func (b *Bus) Send(evt Event) {
select { select {
case ch <- evt: case ch <- evt:
default: default:
b.logger.Warn("Event dropped", "name", evt.name()) b.logger.Warn("Event dropped", "name", evt.EventName())
} }
} }
} }

View File

@ -6,6 +6,7 @@ import (
"git.netflux.io/rob/octoplex/internal/event" "git.netflux.io/rob/octoplex/internal/event"
"git.netflux.io/rob/octoplex/internal/testhelpers" "git.netflux.io/rob/octoplex/internal/testhelpers"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestBus(t *testing.T) { func TestBus(t *testing.T) {
@ -25,5 +26,19 @@ func TestBus(t *testing.T) {
}() }()
assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent)) assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent)) assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent))
bus.Deregister(ch1)
_, ok := <-ch1
assert.False(t, ok)
select {
case <-ch2:
require.Fail(t, "ch2 should be blocking")
default:
}
} }

View File

@ -50,6 +50,8 @@ func (c CommandCloseOtherInstance) Name() string {
} }
// CommandQuit quits the app. // CommandQuit quits the app.
//
// TODO: consider how this should look in a client/server architecture.
type CommandQuit struct{} type CommandQuit struct{}
// Name implements the Command interface. // Name implements the Command interface.

View File

@ -19,7 +19,7 @@ const (
// Event represents something which happened in the appllication. // Event represents something which happened in the appllication.
type Event interface { type Event interface {
name() Name EventName() Name
} }
// AppStateChangedEvent is emitted when the application state changes. // AppStateChangedEvent is emitted when the application state changes.
@ -27,7 +27,7 @@ type AppStateChangedEvent struct {
State domain.AppState State domain.AppState
} }
func (e AppStateChangedEvent) name() Name { func (e AppStateChangedEvent) EventName() Name {
return EventNameAppStateChanged return EventNameAppStateChanged
} }
@ -36,16 +36,17 @@ type DestinationAddedEvent struct {
URL string URL string
} }
func (e DestinationAddedEvent) name() Name { func (e DestinationAddedEvent) EventName() Name {
return EventNameDestinationAdded return EventNameDestinationAdded
} }
// AddDestinationFailedEvent is emitted when a destination fails to be added. // AddDestinationFailedEvent is emitted when a destination fails to be added.
type AddDestinationFailedEvent struct { type AddDestinationFailedEvent struct {
URL string
Err error Err error
} }
func (e AddDestinationFailedEvent) name() Name { func (e AddDestinationFailedEvent) EventName() Name {
return EventNameAddDestinationFailed return EventNameAddDestinationFailed
} }
@ -55,14 +56,17 @@ type DestinationStreamExitedEvent struct {
Err error Err error
} }
func (e DestinationStreamExitedEvent) name() Name { func (e DestinationStreamExitedEvent) EventName() Name {
return EventNameDestinationStreamExited return EventNameDestinationStreamExited
} }
// StartDestinationFailedEvent is emitted when a destination fails to start. // StartDestinationFailedEvent is emitted when a destination fails to start.
type StartDestinationFailedEvent struct{} type StartDestinationFailedEvent struct {
URL string
Message string
}
func (e StartDestinationFailedEvent) name() Name { func (e StartDestinationFailedEvent) EventName() Name {
return EventNameStartDestinationFailed return EventNameStartDestinationFailed
} }
@ -72,17 +76,18 @@ type DestinationRemovedEvent struct {
URL string URL string
} }
func (e DestinationRemovedEvent) name() Name { func (e DestinationRemovedEvent) EventName() Name {
return EventNameDestinationRemoved return EventNameDestinationRemoved
} }
// RemoveDestinationFailedEvent is emitted when a destination fails to be // RemoveDestinationFailedEvent is emitted when a destination fails to be
// removed. // removed.
type RemoveDestinationFailedEvent struct { type RemoveDestinationFailedEvent struct {
URL string
Err error Err error
} }
func (e RemoveDestinationFailedEvent) name() Name { func (e RemoveDestinationFailedEvent) EventName() Name {
return EventNameRemoveDestinationFailed return EventNameRemoveDestinationFailed
} }
@ -95,11 +100,11 @@ type FatalErrorOccurredEvent struct {
// OtherInstanceDetectedEvent is emitted when the app launches and detects another instance. // OtherInstanceDetectedEvent is emitted when the app launches and detects another instance.
type OtherInstanceDetectedEvent struct{} type OtherInstanceDetectedEvent struct{}
func (e OtherInstanceDetectedEvent) name() Name { func (e OtherInstanceDetectedEvent) EventName() Name {
return EventNameOtherInstanceDetected return EventNameOtherInstanceDetected
} }
func (e FatalErrorOccurredEvent) name() Name { func (e FatalErrorOccurredEvent) EventName() Name {
return "fatal_error_occurred" return "fatal_error_occurred"
} }
@ -109,6 +114,6 @@ type MediaServerStartedEvent struct {
RTMPSURL string RTMPSURL string
} }
func (e MediaServerStartedEvent) name() Name { func (e MediaServerStartedEvent) EventName() Name {
return "media_server_started" return "media_server_started"
} }

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,137 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v6.30.1
// source: api.proto
package grpc
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// InternalAPIClient is the client API for InternalAPI service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type InternalAPIClient interface {
Communicate(ctx context.Context, opts ...grpc.CallOption) (InternalAPI_CommunicateClient, error)
}
type internalAPIClient struct {
cc grpc.ClientConnInterface
}
func NewInternalAPIClient(cc grpc.ClientConnInterface) InternalAPIClient {
return &internalAPIClient{cc}
}
func (c *internalAPIClient) Communicate(ctx context.Context, opts ...grpc.CallOption) (InternalAPI_CommunicateClient, error) {
stream, err := c.cc.NewStream(ctx, &InternalAPI_ServiceDesc.Streams[0], "/api.InternalAPI/Communicate", opts...)
if err != nil {
return nil, err
}
x := &internalAPICommunicateClient{stream}
return x, nil
}
type InternalAPI_CommunicateClient interface {
Send(*Envelope) error
Recv() (*Envelope, error)
grpc.ClientStream
}
type internalAPICommunicateClient struct {
grpc.ClientStream
}
func (x *internalAPICommunicateClient) Send(m *Envelope) error {
return x.ClientStream.SendMsg(m)
}
func (x *internalAPICommunicateClient) Recv() (*Envelope, error) {
m := new(Envelope)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// InternalAPIServer is the server API for InternalAPI service.
// All implementations must embed UnimplementedInternalAPIServer
// for forward compatibility
type InternalAPIServer interface {
Communicate(InternalAPI_CommunicateServer) error
mustEmbedUnimplementedInternalAPIServer()
}
// UnimplementedInternalAPIServer must be embedded to have forward compatible implementations.
type UnimplementedInternalAPIServer struct {
}
func (UnimplementedInternalAPIServer) Communicate(InternalAPI_CommunicateServer) error {
return status.Errorf(codes.Unimplemented, "method Communicate not implemented")
}
func (UnimplementedInternalAPIServer) mustEmbedUnimplementedInternalAPIServer() {}
// UnsafeInternalAPIServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to InternalAPIServer will
// result in compilation errors.
type UnsafeInternalAPIServer interface {
mustEmbedUnimplementedInternalAPIServer()
}
func RegisterInternalAPIServer(s grpc.ServiceRegistrar, srv InternalAPIServer) {
s.RegisterService(&InternalAPI_ServiceDesc, srv)
}
func _InternalAPI_Communicate_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(InternalAPIServer).Communicate(&internalAPICommunicateServer{stream})
}
type InternalAPI_CommunicateServer interface {
Send(*Envelope) error
Recv() (*Envelope, error)
grpc.ServerStream
}
type internalAPICommunicateServer struct {
grpc.ServerStream
}
func (x *internalAPICommunicateServer) Send(m *Envelope) error {
return x.ServerStream.SendMsg(m)
}
func (x *internalAPICommunicateServer) Recv() (*Envelope, error) {
m := new(Envelope)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// InternalAPI_ServiceDesc is the grpc.ServiceDesc for InternalAPI service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var InternalAPI_ServiceDesc = grpc.ServiceDesc{
ServiceName: "api.InternalAPI",
HandlerType: (*InternalAPIServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "Communicate",
Handler: _InternalAPI_Communicate_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "api.proto",
}

View File

@ -0,0 +1,110 @@
package protocol
import (
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
)
// CommandToProto converts a command to a protobuf message.
func CommandToProto(command event.Command) *pb.Command {
switch evt := command.(type) {
case event.CommandAddDestination:
return buildAddDestinationCommand(evt)
case event.CommandRemoveDestination:
return buildRemoveDestinationCommand(evt)
case event.CommandStartDestination:
return buildStartDestinationCommand(evt)
case event.CommandStopDestination:
return buildStopDestinationCommand(evt)
case event.CommandCloseOtherInstance:
return buildCloseOtherInstanceCommand(evt)
case event.CommandQuit:
return buildQuitCommand(evt)
default:
panic("unknown command type")
}
}
func buildAddDestinationCommand(cmd event.CommandAddDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_AddDestination{AddDestination: &pb.AddDestinationCommand{Name: cmd.DestinationName, Url: cmd.URL}}}
}
func buildRemoveDestinationCommand(cmd event.CommandRemoveDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_RemoveDestination{RemoveDestination: &pb.RemoveDestinationCommand{Url: cmd.URL}}}
}
func buildStartDestinationCommand(cmd event.CommandStartDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_StartDestination{StartDestination: &pb.StartDestinationCommand{Url: cmd.URL}}}
}
func buildStopDestinationCommand(cmd event.CommandStopDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_StopDestination{StopDestination: &pb.StopDestinationCommand{Url: cmd.URL}}}
}
func buildCloseOtherInstanceCommand(event.CommandCloseOtherInstance) *pb.Command {
return &pb.Command{CommandType: &pb.Command_CloseOtherInstances{CloseOtherInstances: &pb.CloseOtherInstancesCommand{}}}
}
func buildQuitCommand(event.CommandQuit) *pb.Command {
return &pb.Command{CommandType: &pb.Command_Quit{Quit: &pb.QuitCommand{}}}
}
// CommandFromProto converts a protobuf message to a command.
func CommandFromProto(pbCmd *pb.Command) event.Command {
if pbCmd == nil || pbCmd.CommandType == nil {
panic("invalid or nil pb.Command")
}
switch cmd := pbCmd.CommandType.(type) {
case *pb.Command_AddDestination:
return parseAddDestinationCommand(cmd.AddDestination)
case *pb.Command_RemoveDestination:
return parseRemoveDestinationCommand(cmd.RemoveDestination)
case *pb.Command_StartDestination:
return parseStartDestinationCommand(cmd.StartDestination)
case *pb.Command_StopDestination:
return parseStopDestinationCommand(cmd.StopDestination)
case *pb.Command_CloseOtherInstances:
return parseCloseOtherInstanceCommand(cmd.CloseOtherInstances)
case *pb.Command_Quit:
return parseQuitCommand(cmd.Quit)
default:
panic("unknown pb.Command type")
}
}
func parseAddDestinationCommand(cmd *pb.AddDestinationCommand) event.Command {
if cmd == nil {
panic("nil AddDestinationCommand")
}
return event.CommandAddDestination{DestinationName: cmd.Name, URL: cmd.Url}
}
func parseRemoveDestinationCommand(cmd *pb.RemoveDestinationCommand) event.Command {
if cmd == nil {
panic("nil RemoveDestinationCommand")
}
return event.CommandRemoveDestination{URL: cmd.Url}
}
func parseStartDestinationCommand(cmd *pb.StartDestinationCommand) event.Command {
if cmd == nil {
panic("nil StartDestinationCommand")
}
return event.CommandStartDestination{URL: cmd.Url}
}
func parseStopDestinationCommand(cmd *pb.StopDestinationCommand) event.Command {
if cmd == nil {
panic("nil StopDestinationCommand")
}
return event.CommandStopDestination{URL: cmd.Url}
}
func parseCloseOtherInstanceCommand(_ *pb.CloseOtherInstancesCommand) event.Command {
return event.CommandCloseOtherInstance{}
}
func parseQuitCommand(_ *pb.QuitCommand) event.Command {
return event.CommandQuit{}
}

121
internal/protocol/domain.go Normal file
View File

@ -0,0 +1,121 @@
package protocol
import (
"errors"
"git.netflux.io/rob/octoplex/internal/domain"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
func containerToProto(c domain.Container) *pb.Container {
var errString string
if c.Err != nil {
errString = c.Err.Error()
}
var exitCode *int32
if c.ExitCode != nil {
code := int32(*c.ExitCode)
exitCode = &code
}
return &pb.Container{
Id: c.ID,
Status: c.Status,
HealthState: c.HealthState,
CpuPercent: c.CPUPercent,
MemoryUsageBytes: c.MemoryUsageBytes,
RxRate: int32(c.RxRate),
TxRate: int32(c.TxRate),
RxSince: timestamppb.New(c.RxSince),
ImageName: c.ImageName,
PullStatus: c.PullStatus,
PullProgress: c.PullProgress,
PullPercent: int32(c.PullPercent),
RestartCount: int32(c.RestartCount),
ExitCode: exitCode,
Err: errString,
}
}
func protoToContainer(pbCont *pb.Container) domain.Container {
if pbCont == nil {
return domain.Container{}
}
var exitCode *int
if pbCont.ExitCode != nil {
val := int(*pbCont.ExitCode)
exitCode = &val
}
var err error
if pbCont.Err != "" {
err = errors.New(pbCont.Err)
}
return domain.Container{
ID: pbCont.Id,
Status: pbCont.Status,
HealthState: pbCont.HealthState,
CPUPercent: pbCont.CpuPercent,
MemoryUsageBytes: pbCont.MemoryUsageBytes,
RxRate: int(pbCont.RxRate),
TxRate: int(pbCont.TxRate),
RxSince: pbCont.RxSince.AsTime(),
ImageName: pbCont.ImageName,
PullStatus: pbCont.PullStatus,
PullProgress: pbCont.PullProgress,
PullPercent: int(pbCont.PullPercent),
RestartCount: int(pbCont.RestartCount),
ExitCode: exitCode,
Err: err,
}
}
func destinationsToProto(inDests []domain.Destination) []*pb.Destination {
destinations := make([]*pb.Destination, 0, len(inDests))
for _, d := range inDests {
destinations = append(destinations, destinationToProto(d))
}
return destinations
}
func destinationToProto(d domain.Destination) *pb.Destination {
return &pb.Destination{
Container: containerToProto(d.Container),
Status: destinationStatusToProto(d.Status),
Name: d.Name,
Url: d.URL,
}
}
func protoToDestinations(pbDests []*pb.Destination) []domain.Destination {
if pbDests == nil {
return nil
}
dests := make([]domain.Destination, 0, len(pbDests))
for _, pbDest := range pbDests {
if pbDest == nil {
continue
}
dests = append(dests, domain.Destination{
Container: protoToContainer(pbDest.Container),
Status: domain.DestinationStatus(pbDest.Status), // direct cast, same underlying int
Name: pbDest.Name,
URL: pbDest.Url,
})
}
return dests
}
func destinationStatusToProto(s domain.DestinationStatus) pb.Destination_Status {
switch s {
case domain.DestinationStatusStarting:
return pb.Destination_STATUS_STARTING
case domain.DestinationStatusLive:
return pb.Destination_STATUS_LIVE
default:
return pb.Destination_STATUS_OFF_AIR
}
}

252
internal/protocol/event.go Normal file
View File

@ -0,0 +1,252 @@
package protocol
import (
"errors"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
// EventToProto converts an event to a protobuf message.
func EventToProto(ev event.Event) *pb.Event {
switch evt := ev.(type) {
case event.AppStateChangedEvent:
return buildAppStateChangeEvent(evt)
case event.DestinationAddedEvent:
return buildDestinationAddedEvent(evt)
case event.AddDestinationFailedEvent:
return buildAddDestinationFailedEvent(evt)
case event.DestinationStreamExitedEvent:
return buildDestinationStreamExitedEvent(evt)
case event.StartDestinationFailedEvent:
return buildStartDestinationFailedEvent(evt)
case event.DestinationRemovedEvent:
return buildDestinationRemovedEvent(evt)
case event.RemoveDestinationFailedEvent:
return buildRemoveDestinationFailedEvent(evt)
case event.FatalErrorOccurredEvent:
return buildFatalErrorOccurredEvent(evt)
case event.OtherInstanceDetectedEvent:
return buildOtherInstanceDetectedEvent(evt)
case event.MediaServerStartedEvent:
return buildMediaServerStartedEvent(evt)
default:
panic("unknown event type")
}
}
func buildAppStateChangeEvent(evt event.AppStateChangedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_AppStateChanged{
AppStateChanged: &pb.AppStateChangedEvent{
AppState: &pb.AppState{
Source: &pb.Source{
Container: containerToProto(evt.State.Source.Container),
Live: evt.State.Source.Live,
LiveChangedAt: timestamppb.New(evt.State.Source.LiveChangedAt),
Tracks: evt.State.Source.Tracks,
ExitReason: evt.State.Source.ExitReason,
},
Destinations: destinationsToProto(evt.State.Destinations),
BuildInfo: &pb.BuildInfo{
GoVersion: evt.State.BuildInfo.GoVersion,
Version: evt.State.BuildInfo.Version,
Commit: evt.State.BuildInfo.Commit,
Date: evt.State.BuildInfo.Date,
},
},
},
},
}
}
func buildDestinationAddedEvent(evt event.DestinationAddedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_DestinationAdded{
DestinationAdded: &pb.DestinationAddedEvent{Url: evt.URL},
},
}
}
func buildAddDestinationFailedEvent(evt event.AddDestinationFailedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_AddDestinationFailed{
AddDestinationFailed: &pb.AddDestinationFailedEvent{Url: evt.URL, Error: evt.Err.Error()},
},
}
}
func buildDestinationStreamExitedEvent(evt event.DestinationStreamExitedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_DestinationStreamExited{
DestinationStreamExited: &pb.DestinationStreamExitedEvent{Name: evt.Name, Error: evt.Err.Error()},
},
}
}
func buildStartDestinationFailedEvent(evt event.StartDestinationFailedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_StartDestinationFailed{
StartDestinationFailed: &pb.StartDestinationFailedEvent{Url: evt.URL, Message: evt.Message},
},
}
}
func buildDestinationRemovedEvent(evt event.DestinationRemovedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_DestinationRemoved{
DestinationRemoved: &pb.DestinationRemovedEvent{Url: evt.URL},
},
}
}
func buildRemoveDestinationFailedEvent(evt event.RemoveDestinationFailedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_RemoveDestinationFailed{
RemoveDestinationFailed: &pb.RemoveDestinationFailedEvent{Url: evt.URL, Error: evt.Err.Error()},
},
}
}
func buildFatalErrorOccurredEvent(evt event.FatalErrorOccurredEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_FatalError{
FatalError: &pb.FatalErrorEvent{Message: evt.Message},
},
}
}
func buildOtherInstanceDetectedEvent(_ event.OtherInstanceDetectedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_OtherInstanceDetected{
OtherInstanceDetected: &pb.OtherInstanceDetectedEvent{},
},
}
}
func buildMediaServerStartedEvent(evt event.MediaServerStartedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_MediaServerStarted{
MediaServerStarted: &pb.MediaServerStartedEvent{RtmpUrl: evt.RTMPURL, RtmpsUrl: evt.RTMPSURL},
},
}
}
// EventFromProto converts a protobuf message to an event.
func EventFromProto(pbEv *pb.Event) event.Event {
if pbEv == nil || pbEv.EventType == nil {
panic("invalid or nil pb.Event")
}
switch evt := pbEv.EventType.(type) {
case *pb.Event_AppStateChanged:
return parseAppStateChangedEvent(evt.AppStateChanged)
case *pb.Event_DestinationAdded:
return parseDestinationAddedEvent(evt.DestinationAdded)
case *pb.Event_AddDestinationFailed:
return parseAddDestinationFailedEvent(evt.AddDestinationFailed)
case *pb.Event_DestinationStreamExited:
return parseDestinationStreamExitedEvent(evt.DestinationStreamExited)
case *pb.Event_StartDestinationFailed:
return parseStartDestinationFailedEvent(evt.StartDestinationFailed)
case *pb.Event_DestinationRemoved:
return parseDestinationRemovedEvent(evt.DestinationRemoved)
case *pb.Event_RemoveDestinationFailed:
return parseRemoveDestinationFailedEvent(evt.RemoveDestinationFailed)
case *pb.Event_FatalError:
return parseFatalErrorOccurredEvent(evt.FatalError)
case *pb.Event_OtherInstanceDetected:
return parseOtherInstanceDetectedEvent(evt.OtherInstanceDetected)
case *pb.Event_MediaServerStarted:
return parseMediaServerStartedEvent(evt.MediaServerStarted)
default:
panic("unknown pb.Event type")
}
}
func parseAppStateChangedEvent(evt *pb.AppStateChangedEvent) event.Event {
if evt == nil || evt.AppState == nil || evt.AppState.Source == nil {
panic("invalid AppStateChangedEvent")
}
return event.AppStateChangedEvent{
State: domain.AppState{
Source: domain.Source{
Container: protoToContainer(evt.AppState.Source.Container),
Live: evt.AppState.Source.Live,
LiveChangedAt: evt.AppState.Source.LiveChangedAt.AsTime(),
Tracks: evt.AppState.Source.Tracks,
ExitReason: evt.AppState.Source.ExitReason,
},
Destinations: protoToDestinations(evt.AppState.Destinations),
BuildInfo: domain.BuildInfo{
GoVersion: evt.AppState.BuildInfo.GoVersion,
Version: evt.AppState.BuildInfo.Version,
Commit: evt.AppState.BuildInfo.Commit,
Date: evt.AppState.BuildInfo.Date,
},
},
}
}
func parseDestinationAddedEvent(evt *pb.DestinationAddedEvent) event.Event {
if evt == nil {
panic("nil DestinationAddedEvent")
}
return event.DestinationAddedEvent{URL: evt.Url}
}
func parseAddDestinationFailedEvent(evt *pb.AddDestinationFailedEvent) event.Event {
if evt == nil {
panic("nil AddDestinationFailedEvent")
}
return event.AddDestinationFailedEvent{URL: evt.Url, Err: errors.New(evt.Error)}
}
func parseDestinationStreamExitedEvent(evt *pb.DestinationStreamExitedEvent) event.Event {
if evt == nil {
panic("nil DestinationStreamExitedEvent")
}
return event.DestinationStreamExitedEvent{Name: evt.Name, Err: errors.New(evt.Error)}
}
func parseStartDestinationFailedEvent(evt *pb.StartDestinationFailedEvent) event.Event {
if evt == nil {
panic("nil StartDestinationFailedEvent")
}
return event.StartDestinationFailedEvent{URL: evt.Url, Message: evt.Message}
}
func parseDestinationRemovedEvent(evt *pb.DestinationRemovedEvent) event.Event {
if evt == nil {
panic("nil DestinationRemovedEvent")
}
return event.DestinationRemovedEvent{URL: evt.Url}
}
func parseRemoveDestinationFailedEvent(evt *pb.RemoveDestinationFailedEvent) event.Event {
if evt == nil {
panic("nil RemoveDestinationFailedEvent")
}
return event.RemoveDestinationFailedEvent{URL: evt.Url, Err: errors.New(evt.Error)}
}
func parseFatalErrorOccurredEvent(evt *pb.FatalErrorEvent) event.Event {
if evt == nil {
panic("nil FatalErrorEvent")
}
return event.FatalErrorOccurredEvent{Message: evt.Message}
}
func parseOtherInstanceDetectedEvent(_ *pb.OtherInstanceDetectedEvent) event.Event {
return event.OtherInstanceDetectedEvent{}
}
func parseMediaServerStartedEvent(evt *pb.MediaServerStartedEvent) event.Event {
if evt == nil {
panic("nil MediaServerStartedEvent")
}
return event.MediaServerStartedEvent{RTMPURL: evt.RtmpUrl, RTMPSURL: evt.RtmpsUrl}
}

151
internal/server/grpc.go Normal file
View File

@ -0,0 +1,151 @@
package server
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"sync"
"time"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/protocol"
"golang.org/x/sync/errgroup"
)
// APIListenerCountDeltaFunc is a function that is called when the number of
// API clients increments or decrements.
type APIClientCountDeltaFunc func(delta int)
// Server is the gRPC server that handles incoming commands and outgoing
// events.
type Server struct {
pb.UnimplementedInternalAPIServer
dispatcher func(event.Command)
bus *event.Bus
logger *slog.Logger
mu sync.Mutex
clientCount int
clientC chan struct{}
}
// newServer creates a new gRPC server.
func newServer(
dispatcher func(event.Command),
bus *event.Bus,
logger *slog.Logger,
) *Server {
return &Server{
dispatcher: dispatcher,
bus: bus,
clientC: make(chan struct{}, 1),
logger: logger.With("component", "server"),
}
}
func (s *Server) Communicate(stream pb.InternalAPI_CommunicateServer) error {
g, ctx := errgroup.WithContext(stream.Context())
// perform handshake:
startHandshakeCmd, err := stream.Recv()
if err != nil {
return fmt.Errorf("receive start handshake command: %w", err)
}
if startHandshakeCmd.GetCommand() == nil || startHandshakeCmd.GetCommand().GetStartHandshake() == nil {
return fmt.Errorf("expected start handshake command but got: %T", startHandshakeCmd)
}
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: &pb.Event{EventType: &pb.Event_HandshakeCompleted{}}}}); err != nil {
return fmt.Errorf("send handshake completed event: %w", err)
}
// Notify that a client has connected and completed the handshake.
select {
case s.clientC <- struct{}{}:
default:
}
g.Go(func() error {
eventsC := s.bus.Register()
defer s.bus.Deregister(eventsC)
for {
select {
case evt := <-eventsC:
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: protocol.EventToProto(evt)}}); err != nil {
return fmt.Errorf("send event: %w", err)
}
case <-ctx.Done():
return ctx.Err()
}
}
})
g.Go(func() error {
s.mu.Lock()
s.clientCount++
s.mu.Unlock()
defer func() {
s.mu.Lock()
s.clientCount--
s.mu.Unlock()
}()
for {
in, err := stream.Recv()
if err == io.EOF {
s.logger.Info("Client disconnected")
return err
}
if err != nil {
return fmt.Errorf("receive message: %w", err)
}
switch pbCmd := in.Payload.(type) {
case *pb.Envelope_Command:
cmd := protocol.CommandFromProto(pbCmd.Command)
s.logger.Info("Received command", "command", cmd.Name())
s.dispatcher(cmd)
default:
return fmt.Errorf("expected command but got: %T", pbCmd)
}
}
})
if err := g.Wait(); err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
s.logger.Error("Client stream closed with error", "err", err)
return fmt.Errorf("errgroup.Wait: %w", err)
}
s.logger.Info("Client stream closed")
return nil
}
// GetClientCount returns the number of connected clients.
func (s *Server) GetClientCount() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.clientCount
}
const waitForClientTimeout = 10 * time.Second
// WaitForClient waits for _any_ client to connect and complete the handshake.
// It times out if no client has connected after 10 seconds.
func (s *Server) WaitForClient(ctx context.Context) error {
select {
case <-s.clientC:
return nil
case <-time.After(waitForClientTimeout):
return errors.New("timeout")
case <-ctx.Done():
return ctx.Err()
}
}

View File

@ -1,11 +1,13 @@
package app package server
import ( import (
"cmp" "cmp"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"log"
"log/slog" "log/slog"
"net"
"slices" "slices"
"time" "time"
@ -13,38 +15,32 @@ import (
"git.netflux.io/rob/octoplex/internal/container" "git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event" "git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/mediaserver" "git.netflux.io/rob/octoplex/internal/mediaserver"
"git.netflux.io/rob/octoplex/internal/replicator" "git.netflux.io/rob/octoplex/internal/replicator"
"git.netflux.io/rob/octoplex/internal/terminal"
"github.com/docker/docker/client" "github.com/docker/docker/client"
"google.golang.org/grpc"
) )
// App is an instance of the app. // App is an instance of the app.
type App struct { type App struct {
cfg config.Config cfg config.Config
configService *config.Service configService *config.Service
eventBus *event.Bus eventBus *event.Bus
dispatchC chan event.Command dispatchC chan event.Command
dockerClient container.DockerClient dockerClient container.DockerClient
screen *terminal.Screen // Screen may be nil. waitForClient bool
headless bool logger *slog.Logger
clipboardAvailable bool
configFilePath string
buildInfo domain.BuildInfo
logger *slog.Logger
} }
// Params holds the parameters for running the application. // Params holds the parameters for running the application.
type Params struct { type Params struct {
ConfigService *config.Service ConfigService *config.Service
DockerClient container.DockerClient DockerClient container.DockerClient
ChanSize int ChanSize int
Screen *terminal.Screen // Screen may be nil. ConfigFilePath string
Headless bool WaitForClient bool
ClipboardAvailable bool Logger *slog.Logger
ConfigFilePath string
BuildInfo domain.BuildInfo
Logger *slog.Logger
} }
// defaultChanSize is the default size of the dispatch channel. // defaultChanSize is the default size of the dispatch channel.
@ -53,17 +49,13 @@ const defaultChanSize = 64
// New creates a new application instance. // New creates a new application instance.
func New(params Params) *App { func New(params Params) *App {
return &App{ return &App{
cfg: params.ConfigService.Current(), cfg: params.ConfigService.Current(),
configService: params.ConfigService, configService: params.ConfigService,
eventBus: event.NewBus(params.Logger.With("component", "event_bus")), eventBus: event.NewBus(params.Logger.With("component", "event_bus")),
dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)), dispatchC: make(chan event.Command, cmp.Or(params.ChanSize, defaultChanSize)),
dockerClient: params.DockerClient, dockerClient: params.DockerClient,
screen: params.Screen, waitForClient: params.WaitForClient,
headless: params.Headless, logger: params.Logger,
clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath,
buildInfo: params.BuildInfo,
logger: params.Logger,
} }
} }
@ -78,20 +70,26 @@ func (a *App) Run(ctx context.Context) error {
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set") return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
} }
if !a.headless { const grpcAddr = ":50051"
ui, err := terminal.StartUI(ctx, terminal.StartParams{ lis, err := net.Listen("tcp", grpcAddr)
EventBus: a.eventBus, if err != nil {
Dispatcher: func(cmd event.Command) { a.dispatchC <- cmd }, log.Fatalf("failed to listen: %v", err)
Screen: a.screen, }
ClipboardAvailable: a.clipboardAvailable, defer lis.Close()
ConfigFilePath: a.configFilePath,
BuildInfo: a.buildInfo, grpcServer := grpc.NewServer()
Logger: a.logger.With("component", "ui"), grpcDone := make(chan error, 1)
}) internalAPI := newServer(a.DispatchAsync, a.eventBus, a.logger)
if err != nil { pb.RegisterInternalAPIServer(grpcServer, internalAPI)
return fmt.Errorf("start terminal user interface: %w", err) go func() {
a.logger.Info("gRPC server started", "addr", grpcAddr)
grpcDone <- grpcServer.Serve(lis)
}()
if a.waitForClient {
if err = internalAPI.WaitForClient(ctx); err != nil {
return fmt.Errorf("wait for client: %w", err)
} }
defer ui.Close()
} }
// emptyUI is a dummy function that sets the UI state to an empty state, and // emptyUI is a dummy function that sets the UI state to an empty state, and
@ -107,12 +105,13 @@ func (a *App) Run(ctx context.Context) error {
a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}}) a.eventBus.Send(event.AppStateChangedEvent{State: domain.AppState{}})
} }
// doFatalError publishes a fatal error to the event bus, waiting for the // doFatalError publishes a fatal error to the event bus. It will block until
// user to acknowledge it if not in headless mode. // the user acknowledges it if there is 1 or more clients connected to the
// internal API.
doFatalError := func(msg string) { doFatalError := func(msg string) {
a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg}) a.eventBus.Send(event.FatalErrorOccurredEvent{Message: msg})
if a.headless { if internalAPI.GetClientCount() == 0 {
return return
} }
@ -177,21 +176,20 @@ func (a *App) Run(ctx context.Context) error {
defer uiUpdateT.Stop() defer uiUpdateT.Stop()
startMediaServerC := make(chan struct{}, 1) startMediaServerC := make(chan struct{}, 1)
if a.headless { // disable startup check in headless mode for now if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
doFatalError(startupErr.Error())
return startupErr
} else if ok {
startMediaServerC <- struct{}{} startMediaServerC <- struct{}{}
} else {
if ok, startupErr := doStartupCheck(ctx, containerClient, a.eventBus); startupErr != nil {
doFatalError(startupErr.Error())
return startupErr
} else if ok {
startMediaServerC <- struct{}{}
}
} }
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case grpcErr := <-grpcDone:
a.logger.Error("gRPC server exited", "err", grpcErr)
return grpcErr
case <-startMediaServerC: case <-startMediaServerC:
if err = srv.Start(ctx); err != nil { if err = srv.Start(ctx); err != nil {
return fmt.Errorf("start mediaserver: %w", err) return fmt.Errorf("start mediaserver: %w", err)
@ -245,6 +243,11 @@ func (a *App) Dispatch(cmd event.Command) event.Event {
return <-ch return <-ch
} }
// DispatchAsync dispatches a command to be executed synchronously.
func (a *App) DispatchAsync(cmd event.Command) {
a.dispatchC <- cmd
}
// errExit is an error that indicates the app should exit. // errExit is an error that indicates the app should exit.
var errExit = errors.New("exit") var errExit = errors.New("exit")
@ -272,6 +275,10 @@ func (a *App) handleCommand(
} }
}() }()
if c, ok := cmd.(syncCommand); ok {
cmd = c.Command
}
switch c := cmd.(type) { switch c := cmd.(type) {
case event.CommandAddDestination: case event.CommandAddDestination:
newCfg := a.cfg newCfg := a.cfg
@ -281,10 +288,11 @@ func (a *App) handleCommand(
}) })
if err := a.configService.SetConfig(newCfg); err != nil { if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Add destination failed", "err", err) a.logger.Error("Add destination failed", "err", err)
return event.AddDestinationFailedEvent{Err: err}, nil return event.AddDestinationFailedEvent{URL: c.URL, Err: err}, nil
} }
a.cfg = newCfg a.cfg = newCfg
a.handleConfigUpdate(state) a.handleConfigUpdate(state)
a.logger.Info("Destination added", "url", c.URL)
a.eventBus.Send(event.DestinationAddedEvent{URL: c.URL}) a.eventBus.Send(event.DestinationAddedEvent{URL: c.URL})
case event.CommandRemoveDestination: case event.CommandRemoveDestination:
repl.StopDestination(c.URL) // no-op if not live repl.StopDestination(c.URL) // no-op if not live
@ -294,7 +302,7 @@ func (a *App) handleCommand(
}) })
if err := a.configService.SetConfig(newCfg); err != nil { if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Remove destination failed", "err", err) a.logger.Error("Remove destination failed", "err", err)
a.eventBus.Send(event.RemoveDestinationFailedEvent{Err: err}) a.eventBus.Send(event.RemoveDestinationFailedEvent{URL: c.URL, Err: err})
break break
} }
a.cfg = newCfg a.cfg = newCfg
@ -302,7 +310,7 @@ func (a *App) handleCommand(
a.eventBus.Send(event.DestinationRemovedEvent{URL: c.URL}) //nolint:gosimple a.eventBus.Send(event.DestinationRemovedEvent{URL: c.URL}) //nolint:gosimple
case event.CommandStartDestination: case event.CommandStartDestination:
if !state.Source.Live { if !state.Source.Live {
a.eventBus.Send(event.StartDestinationFailedEvent{}) a.eventBus.Send(event.StartDestinationFailedEvent{URL: c.URL, Message: "source not live"})
break break
} }

View File

@ -1,4 +1,4 @@
package app package server
import ( import (
"testing" "testing"

View File

@ -3,6 +3,7 @@ package terminal
import ( import (
"cmp" "cmp"
"context" "context"
"errors"
"fmt" "fmt"
"log/slog" "log/slog"
"maps" "maps"
@ -44,9 +45,9 @@ type UI struct {
eventBus *event.Bus eventBus *event.Bus
dispatch func(event.Command) dispatch func(event.Command)
clipboardAvailable bool clipboardAvailable bool
configFilePath string
rtmpURL, rtmpsURL string rtmpURL, rtmpsURL string
buildInfo domain.BuildInfo buildInfo domain.BuildInfo
appExitC chan error
logger *slog.Logger logger *slog.Logger
// tview state // tview state
@ -92,20 +93,20 @@ type ScreenCapture struct {
Width, Height int Width, Height int
} }
// StartParams contains the parameters for starting a new terminal user // Params contains the parameters for starting a new terminal user
// interface. // interface.
type StartParams struct { type Params struct {
EventBus *event.Bus EventBus *event.Bus
Dispatcher func(event.Command) Dispatcher func(event.Command)
Logger *slog.Logger Logger *slog.Logger
ClipboardAvailable bool ClipboardAvailable bool
ConfigFilePath string
BuildInfo domain.BuildInfo BuildInfo domain.BuildInfo
Screen *Screen // Screen may be nil. Screen *Screen // Screen may be nil.
} }
// StartUI starts the terminal user interface. // NewUI creates the user interface. Call [Run] on the *UI instance to block
func StartUI(ctx context.Context, params StartParams) (*UI, error) { // until it is completed.
func NewUI(ctx context.Context, params Params) (*UI, error) {
app := tview.NewApplication() app := tview.NewApplication()
var screen tcell.Screen var screen tcell.Screen
@ -211,7 +212,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
eventBus: params.EventBus, eventBus: params.EventBus,
dispatch: params.Dispatcher, dispatch: params.Dispatcher,
clipboardAvailable: params.ClipboardAvailable, clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath, appExitC: make(chan error, 1),
buildInfo: params.BuildInfo, buildInfo: params.BuildInfo,
logger: params.Logger, logger: params.Logger,
app: app, app: app,
@ -237,8 +238,6 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
app.SetInputCapture(ui.inputCaptureHandler) app.SetInputCapture(ui.inputCaptureHandler)
app.SetAfterDrawFunc(ui.afterDrawHandler) app.SetAfterDrawFunc(ui.afterDrawHandler)
go ui.run(ctx)
return ui, nil return ui, nil
} }
@ -262,32 +261,32 @@ func (ui *UI) renderAboutView() {
ui.aboutView.AddItem(rtmpsURLView, 1, 0, false) ui.aboutView.AddItem(rtmpsURLView, 1, 0, false)
} }
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false)
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false) ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
} }
func (ui *UI) run(ctx context.Context) { var ErrUserClosed = errors.New("user closed UI")
defer func() {
// Ensure the application is stopped when the UI is closed.
ui.dispatch(event.CommandQuit{})
}()
// Run runs the user interface. It always returns a non-nil error, which will
// be [ErrUserClosed] if the user voluntarily closed the UI.
func (ui *UI) Run(ctx context.Context) error {
eventC := ui.eventBus.Register() eventC := ui.eventBus.Register()
defer ui.eventBus.Deregister(eventC)
uiDone := make(chan struct{})
go func() { go func() {
defer func() { err := ui.app.Run()
uiDone <- struct{}{} if err != nil {
}() ui.logger.Error("Error in UI run loop, exiting", "err", err)
if err := ui.app.Run(); err != nil {
ui.logger.Error("tui application error", "err", err)
} }
ui.appExitC <- err
}() }()
for { for {
select { select {
case evt := <-eventC: case evt, ok := <-eventC:
if !ok {
// should never happen
return errors.New("event channel closed")
}
ui.app.QueueUpdateDraw(func() { ui.app.QueueUpdateDraw(func() {
switch evt := evt.(type) { switch evt := evt.(type) {
case event.AppStateChangedEvent: case event.AppStateChangedEvent:
@ -313,12 +312,11 @@ func (ui *UI) run(ctx context.Context) {
default: default:
ui.logger.Warn("unhandled event", "event", evt) ui.logger.Warn("unhandled event", "event", evt)
} }
}) })
case <-ctx.Done(): case <-ctx.Done():
return return ctx.Err()
case <-uiDone: case err := <-ui.appExitC:
return return cmp.Or(err, ErrUserClosed)
} }
} }
} }
@ -358,8 +356,6 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
return nil return nil
case ' ': case ' ':
ui.toggleDestination() ui.toggleDestination()
case 'c', 'C':
ui.copyConfigFilePathToClipboard(ui.clipboardAvailable, ui.configFilePath)
case '?': case '?':
ui.showAbout() ui.showAbout()
case 'k': // tview vim bindings case 'k': // tview vim bindings
@ -825,6 +821,11 @@ func (ui *UI) Close() {
ui.app.Stop() ui.app.Stop()
} }
// Wait waits for the terminal user interface to finish.
func (ui *UI) Wait() {
<-ui.appExitC
}
func (ui *UI) addDestination() { func (ui *UI) addDestination() {
const ( const (
inputLen = 60 inputLen = 60
@ -1006,28 +1007,6 @@ func (ui *UI) copySourceURLToClipboard(url string) {
) )
} }
func (ui *UI) copyConfigFilePathToClipboard(clipboardAvailable bool, configFilePath string) {
var text string
if clipboardAvailable {
if configFilePath != "" {
clipboard.Write(clipboard.FmtText, []byte(configFilePath))
text = "Configuration file path copied to clipboard:\n\n" + configFilePath
} else {
text = "Configuration file path not set"
}
} else {
text = "Copy to clipboard not available"
}
ui.showModal(
pageNameModalClipboard,
text,
[]string{"Ok"},
false,
nil,
)
}
func (ui *UI) confirmQuit() { func (ui *UI) confirmQuit() {
ui.showModal( ui.showModal(
pageNameModalQuit, pageNameModalQuit,
@ -1036,7 +1015,7 @@ func (ui *UI) confirmQuit() {
false, false,
func(buttonIndex int, _ string) { func(buttonIndex int, _ string) {
if buttonIndex == 0 { if buttonIndex == 0 {
ui.dispatch(event.CommandQuit{}) ui.app.Stop()
} }
}, },
) )

330
main.go
View File

@ -4,21 +4,23 @@ import (
"cmp" "cmp"
"context" "context"
"errors" "errors"
"flag"
"fmt" "fmt"
"io" "io"
"log/slog" "log/slog"
"os" "os"
"os/exec"
"os/signal" "os/signal"
"runtime"
"runtime/debug" "runtime/debug"
"syscall" "syscall"
"git.netflux.io/rob/octoplex/internal/app" "git.netflux.io/rob/octoplex/internal/client"
"git.netflux.io/rob/octoplex/internal/config" "git.netflux.io/rob/octoplex/internal/config"
"git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/server"
dockerclient "github.com/docker/docker/client" dockerclient "github.com/docker/docker/client"
"github.com/urfave/cli/v2"
"golang.design/x/clipboard" "golang.design/x/clipboard"
"golang.org/x/sync/errgroup"
) )
var ( var (
@ -30,23 +32,108 @@ var (
date string date string
) )
var errShutdown = errors.New("shutdown") // errInterrupt is an error type that indicates an interrupt signal was
// received.
type errInterrupt struct{}
func main() { // Error implements the error interface.
var exitStatus int func (e errInterrupt) Error() string {
return "interrupt signal received"
if err := run(); errors.Is(err, errShutdown) {
exitStatus = 130
} else if err != nil {
exitStatus = 1
_, _ = os.Stderr.WriteString("Error: " + err.Error() + "\n")
}
os.Exit(exitStatus)
} }
func run() error { // ExitCode implements the ExitCoder interface.
ctx, cancel := context.WithCancelCause(context.Background()) func (e errInterrupt) ExitCode() int {
return 130
}
func main() {
app := &cli.App{
Name: "Octoplex",
Usage: "Octoplex is a live video restreamer for Docker.",
Commands: []*cli.Command{
{
Name: "client",
Usage: "Run the client",
Action: func(c *cli.Context) error {
return runClient(c.Context, c)
},
},
{
Name: "server",
Usage: "Run the server",
Action: func(c *cli.Context) error {
return runServer(c.Context, c, serverConfig{
stderrAvailable: true,
handleSigInt: true,
waitForClient: false,
})
},
},
{
Name: "run",
Usage: "Run server and client together (testing)",
Action: func(c *cli.Context) error {
return runClientAndServer(c)
},
},
},
}
if err := app.Run(os.Args); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
}
func runClient(ctx context.Context, _ *cli.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// TODO: logger from config
fptr, err := os.OpenFile("octoplex.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return fmt.Errorf("open log file: %w", err)
}
logger := slog.New(slog.NewTextHandler(fptr, nil))
logger.Info("Starting client", "version", cmp.Or(version, "devel"), "commit", cmp.Or(commit, "unknown"), "date", cmp.Or(date, "unknown"), "go_version", runtime.Version())
var clipboardAvailable bool
if err = clipboard.Init(); err != nil {
logger.Warn("Clipboard not available", "err", err)
} else {
clipboardAvailable = true
}
buildInfo, ok := debug.ReadBuildInfo()
if !ok {
return fmt.Errorf("read build info: %w", err)
}
app := client.New(client.NewParams{
ClipboardAvailable: clipboardAvailable,
BuildInfo: domain.BuildInfo{
GoVersion: buildInfo.GoVersion,
Version: version,
Commit: commit,
Date: date,
},
Logger: logger,
})
if err := app.Run(ctx); err != nil {
return fmt.Errorf("run app: %w", err)
}
return nil
}
type serverConfig struct {
stderrAvailable bool
handleSigInt bool
waitForClient bool
}
func runServer(ctx context.Context, _ *cli.Context, serverCfg serverConfig) error {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil) defer cancel(nil)
configService, err := config.NewDefaultService() configService, err := config.NewDefaultService()
@ -54,45 +141,35 @@ func run() error {
return fmt.Errorf("build config service: %w", err) return fmt.Errorf("build config service: %w", err)
} }
help := flag.Bool("h", false, "Show help")
flag.Parse()
if *help {
printUsage()
return nil
}
if narg := flag.NArg(); narg > 1 {
printUsage()
return fmt.Errorf("too many arguments")
} else if narg == 1 {
switch flag.Arg(0) {
case "edit-config":
return editConfigFile(configService)
case "print-config":
return printConfigPath(configService.Path())
case "version":
return printVersion()
case "help":
printUsage()
return nil
}
}
cfg, err := configService.ReadOrCreateConfig() cfg, err := configService.ReadOrCreateConfig()
if err != nil { if err != nil {
return fmt.Errorf("read or create config: %w", err) return fmt.Errorf("read or create config: %w", err)
} }
headless := os.Getenv("OCTO_HEADLESS") != "" // TODO: improve logger API
logger, err := buildLogger(cfg.LogFile, headless) // Currently it's a bit complicated because we can only use stdout - the
if err != nil { // preferred destination - if the client is not running. Otherwise we
return fmt.Errorf("build logger: %w", err) // fallback to the legacy configuration but this should be bought more
// in-line with the client/server split.
var w io.Writer
if serverCfg.stderrAvailable {
w = os.Stdout
} else if !cfg.LogFile.Enabled {
w = io.Discard
} else {
w, err = os.OpenFile(cfg.LogFile.GetPath(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return fmt.Errorf("error opening log file: %w", err)
}
} }
if headless { var handlerOpts slog.HandlerOptions
// When running in headless mode tview doesn't handle SIGINT for us. if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
}
logger := slog.New(slog.NewTextHandler(w, &handlerOpts))
if serverCfg.handleSigInt {
ch := make(chan os.Signal, 1) ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
@ -100,17 +177,10 @@ func run() error {
<-ch <-ch
logger.Info("Received interrupt signal, exiting") logger.Info("Received interrupt signal, exiting")
signal.Stop(ch) signal.Stop(ch)
cancel(errShutdown) cancel(errInterrupt{})
}() }()
} }
var clipboardAvailable bool
if err = clipboard.Init(); err != nil {
logger.Warn("Clipboard not available", "err", err)
} else {
clipboardAvailable = true
}
dockerClient, err := dockerclient.NewClientWithOpts( dockerClient, err := dockerclient.NewClientWithOpts(
dockerclient.FromEnv, dockerclient.FromEnv,
dockerclient.WithAPIVersionNegotiation(), dockerclient.WithAPIVersionNegotiation(),
@ -119,102 +189,64 @@ func run() error {
return fmt.Errorf("new docker client: %w", err) return fmt.Errorf("new docker client: %w", err)
} }
buildInfo, ok := debug.ReadBuildInfo() app := server.New(server.Params{
if !ok { ConfigService: configService,
return fmt.Errorf("read build info: %w", err) DockerClient: dockerClient,
} ConfigFilePath: configService.Path(),
WaitForClient: serverCfg.waitForClient,
app := app.New(app.Params{ Logger: logger,
ConfigService: configService,
DockerClient: dockerClient,
Headless: headless,
ClipboardAvailable: clipboardAvailable,
ConfigFilePath: configService.Path(),
BuildInfo: domain.BuildInfo{
GoVersion: buildInfo.GoVersion,
Version: version,
Commit: commit,
Date: date,
},
Logger: logger,
}) })
return app.Run(ctx) logger.Info(
} "Starting server",
"version",
cmp.Or(version, "devel"),
"commit",
cmp.Or(commit, "unknown"),
"date",
cmp.Or(date, "unknown"),
"go_version",
runtime.Version(),
)
// editConfigFile opens the config file in the user's editor. if err := app.Run(ctx); err != nil {
func editConfigFile(configService *config.Service) error { if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ctx), errInterrupt{}) {
if _, err := configService.ReadOrCreateConfig(); err != nil { return context.Cause(ctx)
return fmt.Errorf("read or create config: %w", err)
}
editor := os.Getenv("EDITOR")
if editor == "" {
editor = "vi"
}
binary, err := exec.LookPath(editor)
if err != nil {
return fmt.Errorf("look path: %w", err)
}
fmt.Fprintf(os.Stderr, "Editing config file: %s\n", configService.Path())
fmt.Println(binary)
if err := syscall.Exec(binary, []string{"--", configService.Path()}, os.Environ()); err != nil {
return fmt.Errorf("exec: %w", err)
}
return nil
}
// printConfigPath prints the path to the config file to stderr.
func printConfigPath(configPath string) error {
fmt.Fprintln(os.Stderr, configPath)
return nil
}
// printVersion prints the version of the application to stderr.
func printVersion() error {
fmt.Fprintf(os.Stderr, "%s version %s\n", domain.AppName, cmp.Or(version, "0.0.0-dev"))
return nil
}
func printUsage() {
os.Stderr.WriteString("Usage: octoplex [command]\n\n")
os.Stderr.WriteString("Commands:\n\n")
os.Stderr.WriteString(" edit-config Edit the config file\n")
os.Stderr.WriteString(" print-config Print the path to the config file\n")
os.Stderr.WriteString(" version Print the version of the application\n")
os.Stderr.WriteString(" help Print this help message\n")
os.Stderr.WriteString("\n")
os.Stderr.WriteString("Additionally, Octoplex can be configured with the following environment variables:\n\n")
os.Stderr.WriteString(" OCTO_DEBUG Enables debug logging if set\n")
os.Stderr.WriteString(" OCTO_HEADLESS Enables headless mode if set (experimental)\n\n")
}
// buildLogger builds the logger, which may be a no-op logger.
func buildLogger(cfg config.LogFile, headless bool) (*slog.Logger, error) {
build := func(w io.Writer) *slog.Logger {
var handlerOpts slog.HandlerOptions
if os.Getenv("OCTO_DEBUG") != "" {
handlerOpts.Level = slog.LevelDebug
} }
return slog.New(slog.NewTextHandler(w, &handlerOpts)) return err
} }
// In headless mode, always log to stderr. return nil
if headless { }
return build(os.Stderr), nil
} func runClientAndServer(c *cli.Context) error {
errNoErr := errors.New("no error")
if !cfg.Enabled {
return slog.New(slog.DiscardHandler), nil g, ctx := errgroup.WithContext(c.Context)
}
g.Go(func() error {
fptr, err := os.OpenFile(cfg.GetPath(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err := runClient(ctx, c); err != nil {
if err != nil { return err
return nil, fmt.Errorf("error opening log file: %w", err) }
}
return errNoErr
return build(fptr), nil })
g.Go(func() error {
if err := runServer(ctx, c, serverConfig{
stderrAvailable: false,
handleSigInt: false,
waitForClient: true,
}); err != nil {
return err
}
return errNoErr
})
if err := g.Wait(); err == errNoErr {
return nil
} else {
return err
}
} }

View File

@ -40,3 +40,9 @@ description = "Generate mocks"
dir = "{{cwd}}" dir = "{{cwd}}"
run = "go tool mockery" run = "go tool mockery"
alias = "m" alias = "m"
[tasks.generate_proto]
description = "Generate gRPC files from proto"
dir = "{{cwd}}"
run = "protoc -I proto --go_out=paths=source_relative:internal/generated/grpc --go-grpc_out=paths=source_relative:internal/generated/grpc proto/api.proto"
alias = "p"

166
proto/api.proto Normal file
View File

@ -0,0 +1,166 @@
syntax = "proto3";
package api;
import "google/protobuf/timestamp.proto";
option go_package = "git.netflux.io/rob/octoplex/internal/generated/grpc";
service InternalAPI {
rpc Communicate(stream Envelope) returns (stream Envelope);
}
message Envelope {
oneof payload {
Command command = 1;
Event event = 2;
}
}
message Command {
oneof command_type {
AddDestinationCommand add_destination = 1;
RemoveDestinationCommand remove_destination = 2;
StartDestinationCommand start_destination = 3;
StopDestinationCommand stop_destination = 4;
CloseOtherInstancesCommand close_other_instances = 5;
QuitCommand quit = 6;
StartHandshakeCommand start_handshake = 7;
}
}
message AddDestinationCommand {
string name = 1;
string url = 2;
}
message RemoveDestinationCommand {
string url= 1;
}
message StartDestinationCommand {
string url = 1;
}
message StopDestinationCommand {
string url = 1;
}
message CloseOtherInstancesCommand {}
message QuitCommand {}
message StartHandshakeCommand {};
message Event {
oneof event_type {
AppStateChangedEvent app_state_changed = 1;
DestinationStreamExitedEvent destination_stream_exited = 2;
DestinationAddedEvent destination_added = 3;
AddDestinationFailedEvent add_destination_failed = 4;
DestinationRemovedEvent destination_removed = 5;
RemoveDestinationFailedEvent remove_destination_failed = 6;
StartDestinationFailedEvent start_destination_failed = 7;
MediaServerStartedEvent media_server_started = 8;
OtherInstanceDetectedEvent other_instance_detected = 9;
FatalErrorEvent fatal_error = 10;
HandshakeCompletedEvent handshake_completed = 11;
}
}
message Container {
string id = 1;
string status = 2;
string health_state = 3;
double cpu_percent = 4;
uint64 memory_usage_bytes = 5;
int32 rx_rate = 6;
int32 tx_rate = 7;
google.protobuf.Timestamp rx_since = 8;
string image_name = 9;
string pull_status = 10;
string pull_progress = 11;
int32 pull_percent = 12;
int32 restart_count = 13;
optional int32 exit_code = 14;
string err = 15;
}
message Source {
Container container = 1;
bool live = 2;
google.protobuf.Timestamp live_changed_at = 3;
repeated string tracks = 4;
string exit_reason = 5;
}
message Destination {
enum Status {
STATUS_OFF_AIR = 0;
STATUS_STARTING = 1;
STATUS_LIVE = 2;
}
Container container = 1;
Status status = 2;
string name = 3;
string url = 4;
}
message BuildInfo {
string go_version = 1;
string version = 2;
string commit = 3;
string date = 4;
}
message AppState {
Source source = 1;
repeated Destination destinations = 2;
BuildInfo build_info = 3;
}
message AppStateChangedEvent {
AppState app_state = 1;
}
message DestinationStreamExitedEvent {
string name = 1;
string error = 2;
}
message DestinationAddedEvent {
string url = 1;
}
message AddDestinationFailedEvent {
string url = 1;
string error = 2;
}
message DestinationRemovedEvent {
string url = 1;
}
message RemoveDestinationFailedEvent {
string url = 1;
string error = 2;
}
message StartDestinationFailedEvent {
string url = 1;
string message = 2;
}
message MediaServerStartedEvent {
string rtmp_url = 1;
string rtmps_url = 2;
}
message OtherInstanceDetectedEvent {}
message FatalErrorEvent {
string message = 1;
}
message HandshakeCompletedEvent {}