WIP: refactor/api #1

Draft
rob wants to merge 26 commits from refactor/api into main
12 changed files with 510 additions and 156 deletions
Showing only changes of commit c6f21b194a - Show all commits

View File

@ -2,18 +2,19 @@ package main
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"runtime/debug"
"time"
"git.netflux.io/rob/octoplex/internal/client"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/protocol"
"git.netflux.io/rob/octoplex/internal/terminal"
"golang.design/x/clipboard"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
@ -30,6 +31,7 @@ var (
func main() {
if err := run(); err != nil {
os.Stderr.WriteString("Error: " + err.Error() + "\n")
os.Exit(1)
}
}
@ -37,6 +39,8 @@ func run() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g, ctx := errgroup.WithContext(ctx)
// TODO: logger from config
fptr, err := os.OpenFile("octoplex.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
@ -68,20 +72,11 @@ func run() error {
return fmt.Errorf("create gRPC stream: %w", err)
}
go func() {
for evt := range bus.Register() {
if sendErr := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: client.EventToProto(evt)}}); sendErr != nil {
logger.Error("Error sending event to gRPC API", "err", sendErr)
}
}
}()
go func() {
g.Go(func() error {
for {
envelope, recErr := stream.Recv()
if recErr != nil {
logger.Error("Error receiving envelope from gRPC API", "err", recErr)
continue
return fmt.Errorf("receive envelope: %w", recErr)
}
evt := envelope.GetEvent()
@ -90,21 +85,20 @@ func run() error {
continue
}
logger.Info("Received event from gRPC API", "event", evt)
// TODO: convert to domain event
logger.Debug("Received event", "type", fmt.Sprintf("%T", evt))
bus.Send(protocol.EventFromProto(evt))
}
}()
})
ui, err := terminal.StartUI(ctx, terminal.StartParams{
EventBus: bus,
Dispatcher: func(cmd event.Command) {
logger.Info("Command dispatched", "cmd", cmd)
if sendErr := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Command{Command: client.CommandToProto(cmd)}}); sendErr != nil {
logger.Info("Command dispatched", "cmd", cmd.Name())
if sendErr := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Command{Command: protocol.CommandToProto(cmd)}}); sendErr != nil {
logger.Error("Error sending command to gRPC API", "err", sendErr)
}
},
ClipboardAvailable: clipboardAvailable,
ConfigFilePath: "TODO",
BuildInfo: domain.BuildInfo{
GoVersion: buildInfo.GoVersion,
Version: version,
@ -118,7 +112,18 @@ func run() error {
}
defer ui.Close()
time.Sleep(10 * time.Minute) // Simulate long-running process
errUIClosed := errors.New("UI closed")
g.Go(func() error {
ui.Wait()
logger.Info("UI closed!")
return errUIClosed
})
return nil
if err := g.Wait(); err == errUIClosed {
logger.Info("UI closed, exiting")
return nil
} else {
logger.Error("UI closed with error", "err", err)
return fmt.Errorf("errgroup.Wait: %w", err)
}
}

View File

@ -125,7 +125,14 @@ func run() error {
Logger: logger,
})
return app.Run(ctx)
if err := app.Run(ctx); err != nil {
if errors.Is(err, context.Canceled) && context.Cause(ctx) == errShutdown {
return errShutdown
}
return err
}
return nil
}
// editConfigFile opens the config file in the user's editor.

View File

@ -93,7 +93,7 @@ func (a *App) Run(ctx context.Context) error {
}
grpcServer := grpc.NewServer()
grpcDone := make(chan error, 1)
pb.RegisterInternalAPIServer(grpcServer, server.Server{})
pb.RegisterInternalAPIServer(grpcServer, server.New(a.DispatchAsync, a.eventBus, a.logger))
go func() {
a.logger.Info("gRPC server started", "addr", grpcAddr)
grpcDone <- grpcServer.Serve(lis)
@ -223,6 +223,11 @@ func (a *App) Dispatch(cmd event.Command) event.Event {
return <-ch
}
// DispatchAsync dispatches a command to be executed synchronously.
func (a *App) DispatchAsync(cmd event.Command) {
a.dispatchC <- cmd
}
// errExit is an error that indicates the app should exit.
var errExit = errors.New("exit")
@ -250,6 +255,10 @@ func (a *App) handleCommand(
}
}()
if c, ok := cmd.(syncCommand); ok {
cmd = c.Command
}
switch c := cmd.(type) {
case event.CommandAddDestination:
newCfg := a.cfg
@ -263,6 +272,7 @@ func (a *App) handleCommand(
}
a.cfg = newCfg
a.handleConfigUpdate(state)
a.logger.Info("Destination added", "url", c.URL)
a.eventBus.Send(event.DestinationAddedEvent{URL: c.URL})
case event.CommandRemoveDestination:
repl.StopDestination(c.URL) // no-op if not live

View File

@ -2,6 +2,7 @@ package event
import (
"log/slog"
"slices"
"sync"
)
@ -31,6 +32,20 @@ func (b *Bus) Register() <-chan Event {
return ch
}
func (b *Bus) Deregister(ch <-chan Event) {
b.mu.Lock()
defer b.mu.Unlock()
b.consumers = slices.DeleteFunc(b.consumers, func(other chan Event) bool {
if ch == other {
close(other)
return true
}
return false
})
}
// Send sends an event to all registered consumers.
func (b *Bus) Send(evt Event) {
// The mutex is needed to ensure the backing array of b.consumers cannot be

View File

@ -6,6 +6,7 @@ import (
"git.netflux.io/rob/octoplex/internal/event"
"git.netflux.io/rob/octoplex/internal/testhelpers"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestBus(t *testing.T) {
@ -25,5 +26,19 @@ func TestBus(t *testing.T) {
}()
assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent))
bus.Deregister(ch1)
_, ok := <-ch1
assert.False(t, ok)
select {
case <-ch2:
require.Fail(t, "ch2 should be blocking")
default:
}
}

View File

@ -50,6 +50,8 @@ func (c CommandCloseOtherInstance) Name() string {
}
// CommandQuit quits the app.
//
// TODO: consider how this should look in a client/server architecture.
type CommandQuit struct{}
// Name implements the Command interface.

View File

@ -7,11 +7,12 @@
package grpc
import (
reflect "reflect"
sync "sync"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
)
const (

View File

@ -0,0 +1,110 @@
package protocol
import (
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
)
// CommandToProto converts a command to a protobuf message.
func CommandToProto(command event.Command) *pb.Command {
switch evt := command.(type) {
case event.CommandAddDestination:
return buildAddDestinationCommand(evt)
case event.CommandRemoveDestination:
return buildRemoveDestinationCommand(evt)
case event.CommandStartDestination:
return buildStartDestinationCommand(evt)
case event.CommandStopDestination:
return buildStopDestinationCommand(evt)
case event.CommandCloseOtherInstance:
return buildCloseOtherInstanceCommand(evt)
case event.CommandQuit:
return buildQuitCommand(evt)
default:
panic("unknown command type")
}
}
func buildAddDestinationCommand(cmd event.CommandAddDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_AddDestination{AddDestination: &pb.AddDestinationCommand{Name: cmd.DestinationName, Url: cmd.URL}}}
}
func buildRemoveDestinationCommand(cmd event.CommandRemoveDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_RemoveDestination{RemoveDestination: &pb.RemoveDestinationCommand{Url: cmd.URL}}}
}
func buildStartDestinationCommand(cmd event.CommandStartDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_StartDestination{StartDestination: &pb.StartDestinationCommand{Url: cmd.URL}}}
}
func buildStopDestinationCommand(cmd event.CommandStopDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_StopDestination{StopDestination: &pb.StopDestinationCommand{Url: cmd.URL}}}
}
func buildCloseOtherInstanceCommand(event.CommandCloseOtherInstance) *pb.Command {
return &pb.Command{CommandType: &pb.Command_CloseOtherInstances{CloseOtherInstances: &pb.CloseOtherInstancesCommand{}}}
}
func buildQuitCommand(event.CommandQuit) *pb.Command {
return &pb.Command{CommandType: &pb.Command_Quit{Quit: &pb.QuitCommand{}}}
}
// CommandFromProto converts a protobuf message to a command.
func CommandFromProto(pbCmd *pb.Command) event.Command {
if pbCmd == nil || pbCmd.CommandType == nil {
panic("invalid or nil pb.Command")
}
switch cmd := pbCmd.CommandType.(type) {
case *pb.Command_AddDestination:
return parseAddDestinationCommand(cmd.AddDestination)
case *pb.Command_RemoveDestination:
return parseRemoveDestinationCommand(cmd.RemoveDestination)
case *pb.Command_StartDestination:
return parseStartDestinationCommand(cmd.StartDestination)
case *pb.Command_StopDestination:
return parseStopDestinationCommand(cmd.StopDestination)
case *pb.Command_CloseOtherInstances:
return parseCloseOtherInstanceCommand(cmd.CloseOtherInstances)
case *pb.Command_Quit:
return parseQuitCommand(cmd.Quit)
default:
panic("unknown pb.Command type")
}
}
func parseAddDestinationCommand(cmd *pb.AddDestinationCommand) event.Command {
if cmd == nil {
panic("nil AddDestinationCommand")
}
return event.CommandAddDestination{DestinationName: cmd.Name, URL: cmd.Url}
}
func parseRemoveDestinationCommand(cmd *pb.RemoveDestinationCommand) event.Command {
if cmd == nil {
panic("nil RemoveDestinationCommand")
}
return event.CommandRemoveDestination{URL: cmd.Url}
}
func parseStartDestinationCommand(cmd *pb.StartDestinationCommand) event.Command {
if cmd == nil {
panic("nil StartDestinationCommand")
}
return event.CommandStartDestination{URL: cmd.Url}
}
func parseStopDestinationCommand(cmd *pb.StopDestinationCommand) event.Command {
if cmd == nil {
panic("nil StopDestinationCommand")
}
return event.CommandStopDestination{URL: cmd.Url}
}
func parseCloseOtherInstanceCommand(_ *pb.CloseOtherInstancesCommand) event.Command {
return event.CommandCloseOtherInstance{}
}
func parseQuitCommand(_ *pb.QuitCommand) event.Command {
return event.CommandQuit{}
}

121
internal/protocol/domain.go Normal file
View File

@ -0,0 +1,121 @@
package protocol
import (
"errors"
"git.netflux.io/rob/octoplex/internal/domain"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
func containerToProto(c domain.Container) *pb.Container {
var errString string
if c.Err != nil {
errString = c.Err.Error()
}
var exitCode *int32
if c.ExitCode != nil {
code := int32(*c.ExitCode)
exitCode = &code
}
return &pb.Container{
Id: c.ID,
Status: c.Status,
HealthState: c.HealthState,
CpuPercent: c.CPUPercent,
MemoryUsageBytes: c.MemoryUsageBytes,
RxRate: int32(c.RxRate),
TxRate: int32(c.TxRate),
RxSince: timestamppb.New(c.RxSince),
ImageName: c.ImageName,
PullStatus: c.PullStatus,
PullProgress: c.PullProgress,
PullPercent: int32(c.PullPercent),
RestartCount: int32(c.RestartCount),
ExitCode: exitCode,
Err: errString,
}
}
func protoToContainer(pbCont *pb.Container) domain.Container {
if pbCont == nil {
return domain.Container{}
}
var exitCode *int
if pbCont.ExitCode != nil {
val := int(*pbCont.ExitCode)
exitCode = &val
}
var err error
if pbCont.Err != "" {
err = errors.New(pbCont.Err)
}
return domain.Container{
ID: pbCont.Id,
Status: pbCont.Status,
HealthState: pbCont.HealthState,
CPUPercent: pbCont.CpuPercent,
MemoryUsageBytes: pbCont.MemoryUsageBytes,
RxRate: int(pbCont.RxRate),
TxRate: int(pbCont.TxRate),
RxSince: pbCont.RxSince.AsTime(),
ImageName: pbCont.ImageName,
PullStatus: pbCont.PullStatus,
PullProgress: pbCont.PullProgress,
PullPercent: int(pbCont.PullPercent),
RestartCount: int(pbCont.RestartCount),
ExitCode: exitCode,
Err: err,
}
}
func destinationsToProto(inDests []domain.Destination) []*pb.Destination {
destinations := make([]*pb.Destination, 0, len(inDests))
for _, d := range inDests {
destinations = append(destinations, destinationToProto(d))
}
return destinations
}
func destinationToProto(d domain.Destination) *pb.Destination {
return &pb.Destination{
Container: containerToProto(d.Container),
Status: destinationStatusToProto(d.Status),
Name: d.Name,
Url: d.URL,
}
}
func protoToDestinations(pbDests []*pb.Destination) []domain.Destination {
if pbDests == nil {
return nil
}
dests := make([]domain.Destination, 0, len(pbDests))
for _, pbDest := range pbDests {
if pbDest == nil {
continue
}
dests = append(dests, domain.Destination{
Container: protoToContainer(pbDest.Container),
Status: domain.DestinationStatus(pbDest.Status), // direct cast, same underlying int
Name: pbDest.Name,
URL: pbDest.Url,
})
}
return dests
}
func destinationStatusToProto(s domain.DestinationStatus) pb.Destination_Status {
switch s {
case domain.DestinationStatusStarting:
return pb.Destination_STATUS_STARTING
case domain.DestinationStatusLive:
return pb.Destination_STATUS_LIVE
default:
return pb.Destination_STATUS_OFF_AIR
}
}

View File

@ -1,12 +1,12 @@
// TODO: move protocol to a separate package
package client
package protocol
import (
"errors"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
"google.golang.org/protobuf/types/known/timestamppb"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
// EventToProto converts an event to a protobuf message.
@ -37,26 +37,6 @@ func EventToProto(ev event.Event) *pb.Event {
}
}
// CommandToProto converts a command to a protobuf message.
func CommandToProto(command event.Command) *pb.Command {
switch evt := command.(type) {
case event.CommandAddDestination:
return buildAddDestinationCommand(evt)
case event.CommandRemoveDestination:
return buildRemoveDestinationCommand(evt)
case event.CommandStartDestination:
return buildStartDestinationCommand(evt)
case event.CommandStopDestination:
return buildStopDestinationCommand(evt)
case event.CommandCloseOtherInstance:
return buildCloseOtherInstanceCommand(evt)
case event.CommandQuit:
return buildQuitCommand(evt)
default:
panic("unknown command type")
}
}
func buildAppStateChangeEvent(evt event.AppStateChangedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_AppStateChanged{
@ -154,85 +134,119 @@ func buildMediaServerStartedEvent(evt event.MediaServerStartedEvent) *pb.Event {
}
}
func containerToProto(c domain.Container) *pb.Container {
var errString string
if c.Err != nil {
errString = c.Err.Error()
// EventFromProto converts a protobuf message to an event.
func EventFromProto(pbEv *pb.Event) event.Event {
if pbEv == nil || pbEv.EventType == nil {
panic("invalid or nil pb.Event")
}
var exitCode *int32
if c.ExitCode != nil {
code := int32(*c.ExitCode)
exitCode = &code
}
return &pb.Container{
Id: c.ID,
Status: c.Status,
HealthState: c.HealthState,
CpuPercent: c.CPUPercent,
MemoryUsageBytes: c.MemoryUsageBytes,
RxRate: int32(c.RxRate),
TxRate: int32(c.TxRate),
RxSince: timestamppb.New(c.RxSince),
ImageName: c.ImageName,
PullStatus: c.PullStatus,
PullProgress: c.PullProgress,
PullPercent: int32(c.PullPercent),
RestartCount: int32(c.RestartCount),
ExitCode: exitCode,
Err: errString,
}
}
func destinationsToProto(inDests []domain.Destination) []*pb.Destination {
destinations := make([]*pb.Destination, 0, len(inDests))
for _, d := range inDests {
destinations = append(destinations, destinationToProto(d))
}
return destinations
}
func destinationToProto(d domain.Destination) *pb.Destination {
return &pb.Destination{
Container: containerToProto(d.Container),
Status: destinationStatusToProto(d.Status),
Name: d.Name,
Url: d.URL,
}
}
func destinationStatusToProto(s domain.DestinationStatus) pb.Destination_Status {
switch s {
case domain.DestinationStatusStarting:
return pb.Destination_STATUS_STARTING
case domain.DestinationStatusLive:
return pb.Destination_STATUS_LIVE
switch evt := pbEv.EventType.(type) {
case *pb.Event_AppStateChanged:
return parseAppStateChangedEvent(evt.AppStateChanged)
case *pb.Event_DestinationAdded:
return parseDestinationAddedEvent(evt.DestinationAdded)
case *pb.Event_AddDestinationFailed:
return parseAddDestinationFailedEvent(evt.AddDestinationFailed)
case *pb.Event_DestinationStreamExited:
return parseDestinationStreamExitedEvent(evt.DestinationStreamExited)
case *pb.Event_StartDestinationFailed:
return parseStartDestinationFailedEvent(evt.StartDestinationFailed)
case *pb.Event_DestinationRemoved:
return parseDestinationRemovedEvent(evt.DestinationRemoved)
case *pb.Event_RemoveDestinationFailed:
return parseRemoveDestinationFailedEvent(evt.RemoveDestinationFailed)
case *pb.Event_FatalError:
return parseFatalErrorOccurredEvent(evt.FatalError)
case *pb.Event_OtherInstanceDetected:
return parseOtherInstanceDetectedEvent(evt.OtherInstanceDetected)
case *pb.Event_MediaServerStarted:
return parseMediaServerStartedEvent(evt.MediaServerStarted)
default:
return pb.Destination_STATUS_OFF_AIR
panic("unknown pb.Event type")
}
}
func buildAddDestinationCommand(cmd event.CommandAddDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_AddDestination{AddDestination: &pb.AddDestinationCommand{Url: cmd.URL}}}
func parseAppStateChangedEvent(evt *pb.AppStateChangedEvent) event.Event {
if evt == nil || evt.AppState == nil || evt.AppState.Source == nil {
panic("invalid AppStateChangedEvent")
}
return event.AppStateChangedEvent{
State: domain.AppState{
Source: domain.Source{
Container: protoToContainer(evt.AppState.Source.Container),
Live: evt.AppState.Source.Live,
LiveChangedAt: evt.AppState.Source.LiveChangedAt.AsTime(),
Tracks: evt.AppState.Source.Tracks,
ExitReason: evt.AppState.Source.ExitReason,
},
Destinations: protoToDestinations(evt.AppState.Destinations),
BuildInfo: domain.BuildInfo{
GoVersion: evt.AppState.BuildInfo.GoVersion,
Version: evt.AppState.BuildInfo.Version,
Commit: evt.AppState.BuildInfo.Commit,
Date: evt.AppState.BuildInfo.Date,
},
},
}
}
func buildRemoveDestinationCommand(cmd event.CommandRemoveDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_RemoveDestination{RemoveDestination: &pb.RemoveDestinationCommand{Url: cmd.URL}}}
func parseDestinationAddedEvent(evt *pb.DestinationAddedEvent) event.Event {
if evt == nil {
panic("nil DestinationAddedEvent")
}
return event.DestinationAddedEvent{URL: evt.Url}
}
func buildStartDestinationCommand(cmd event.CommandStartDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_StartDestination{StartDestination: &pb.StartDestinationCommand{Url: cmd.URL}}}
func parseAddDestinationFailedEvent(evt *pb.AddDestinationFailedEvent) event.Event {
if evt == nil {
panic("nil AddDestinationFailedEvent")
}
return event.AddDestinationFailedEvent{URL: evt.Url, Err: errors.New(evt.Error)}
}
func buildStopDestinationCommand(cmd event.CommandStopDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_StopDestination{StopDestination: &pb.StopDestinationCommand{Url: cmd.URL}}}
func parseDestinationStreamExitedEvent(evt *pb.DestinationStreamExitedEvent) event.Event {
if evt == nil {
panic("nil DestinationStreamExitedEvent")
}
return event.DestinationStreamExitedEvent{Name: evt.Name, Err: errors.New(evt.Error)}
}
func buildCloseOtherInstanceCommand(event.CommandCloseOtherInstance) *pb.Command {
return &pb.Command{CommandType: &pb.Command_CloseOtherInstances{CloseOtherInstances: &pb.CloseOtherInstancesCommand{}}}
func parseStartDestinationFailedEvent(evt *pb.StartDestinationFailedEvent) event.Event {
if evt == nil {
panic("nil StartDestinationFailedEvent")
}
return event.StartDestinationFailedEvent{URL: evt.Url, Message: evt.Message}
}
func buildQuitCommand(event.CommandQuit) *pb.Command {
return &pb.Command{CommandType: &pb.Command_Quit{Quit: &pb.QuitCommand{}}}
func parseDestinationRemovedEvent(evt *pb.DestinationRemovedEvent) event.Event {
if evt == nil {
panic("nil DestinationRemovedEvent")
}
return event.DestinationRemovedEvent{URL: evt.Url}
}
func parseRemoveDestinationFailedEvent(evt *pb.RemoveDestinationFailedEvent) event.Event {
if evt == nil {
panic("nil RemoveDestinationFailedEvent")
}
return event.RemoveDestinationFailedEvent{URL: evt.Url, Err: errors.New(evt.Error)}
}
func parseFatalErrorOccurredEvent(evt *pb.FatalErrorEvent) event.Event {
if evt == nil {
panic("nil FatalErrorEvent")
}
return event.FatalErrorOccurredEvent{Message: evt.Message}
}
func parseOtherInstanceDetectedEvent(_ *pb.OtherInstanceDetectedEvent) event.Event {
return event.OtherInstanceDetectedEvent{}
}
func parseMediaServerStartedEvent(evt *pb.MediaServerStartedEvent) event.Event {
if evt == nil {
panic("nil MediaServerStartedEvent")
}
return event.MediaServerStartedEvent{RTMPURL: evt.RtmpUrl, RTMPSURL: evt.RtmpsUrl}
}

View File

@ -1,7 +1,86 @@
package server
import pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
import (
"context"
"errors"
"fmt"
"io"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/protocol"
"github.com/sagikazarmark/slog-shim"
"golang.org/x/sync/errgroup"
)
type Server struct {
pb.UnimplementedInternalAPIServer
dispatcher func(event.Command)
bus *event.Bus
logger *slog.Logger
}
func New(
dispatcher func(event.Command),
bus *event.Bus,
logger *slog.Logger,
) *Server {
return &Server{
dispatcher: dispatcher,
bus: bus,
logger: logger.With("component", "server"),
}
}
func (s *Server) Communicate(stream pb.InternalAPI_CommunicateServer) error {
g, ctx := errgroup.WithContext(stream.Context())
g.Go(func() error {
eventsC := s.bus.Register()
defer s.bus.Deregister(eventsC)
for {
select {
case evt := <-eventsC:
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: protocol.EventToProto(evt)}}); err != nil {
return fmt.Errorf("send event: %w", err)
}
case <-ctx.Done():
return ctx.Err()
}
}
})
g.Go(func() error {
for {
in, err := stream.Recv()
if err == io.EOF {
s.logger.Info("Client disconnected")
return err
}
if err != nil {
return fmt.Errorf("receive message: %w", err)
}
switch pbCmd := in.Payload.(type) {
case *pb.Envelope_Command:
cmd := protocol.CommandFromProto(pbCmd.Command)
s.logger.Info("Received command", "command", cmd.Name())
s.dispatcher(cmd)
default:
return fmt.Errorf("expected command but got: %T", pbCmd)
}
}
})
if err := g.Wait(); err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
s.logger.Error("Error in gRPC stream handler, exiting", "err", err)
return fmt.Errorf("errgroup.Wait: %w", err)
}
s.logger.Info("Client stream closed")
return nil
}

View File

@ -44,9 +44,9 @@ type UI struct {
eventBus *event.Bus
dispatch func(event.Command)
clipboardAvailable bool
configFilePath string
rtmpURL, rtmpsURL string
buildInfo domain.BuildInfo
doneC chan struct{}
logger *slog.Logger
// tview state
@ -99,7 +99,6 @@ type StartParams struct {
Dispatcher func(event.Command)
Logger *slog.Logger
ClipboardAvailable bool
ConfigFilePath string
BuildInfo domain.BuildInfo
Screen *Screen // Screen may be nil.
}
@ -211,7 +210,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
eventBus: params.EventBus,
dispatch: params.Dispatcher,
clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath,
doneC: make(chan struct{}, 1),
buildInfo: params.BuildInfo,
logger: params.Logger,
app: app,
@ -262,32 +261,27 @@ func (ui *UI) renderAboutView() {
ui.aboutView.AddItem(rtmpsURLView, 1, 0, false)
}
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false)
ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false)
}
func (ui *UI) run(ctx context.Context) {
defer func() {
// Ensure the application is stopped when the UI is closed.
ui.dispatch(event.CommandQuit{})
}()
eventC := ui.eventBus.Register()
defer ui.eventBus.Deregister(eventC)
uiDone := make(chan struct{})
go func() {
defer func() {
uiDone <- struct{}{}
}()
defer close(ui.doneC)
if err := ui.app.Run(); err != nil {
ui.logger.Error("tui application error", "err", err)
ui.logger.Error("Error in UI run loop, exiting", "err", err)
}
}()
for {
select {
case evt := <-eventC:
case evt, ok := <-eventC:
if !ok {
return
}
ui.app.QueueUpdateDraw(func() {
switch evt := evt.(type) {
case event.AppStateChangedEvent:
@ -317,7 +311,7 @@ func (ui *UI) run(ctx context.Context) {
})
case <-ctx.Done():
return
case <-uiDone:
case <-ui.doneC:
return
}
}
@ -358,8 +352,6 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {
return nil
case ' ':
ui.toggleDestination()
case 'c', 'C':
ui.copyConfigFilePathToClipboard(ui.clipboardAvailable, ui.configFilePath)
case '?':
ui.showAbout()
case 'k': // tview vim bindings
@ -825,6 +817,11 @@ func (ui *UI) Close() {
ui.app.Stop()
}
// Wait waits for the terminal user interface to finish.
func (ui *UI) Wait() {
<-ui.doneC
}
func (ui *UI) addDestination() {
const (
inputLen = 60
@ -1006,28 +1003,6 @@ func (ui *UI) copySourceURLToClipboard(url string) {
)
}
func (ui *UI) copyConfigFilePathToClipboard(clipboardAvailable bool, configFilePath string) {
var text string
if clipboardAvailable {
if configFilePath != "" {
clipboard.Write(clipboard.FmtText, []byte(configFilePath))
text = "Configuration file path copied to clipboard:\n\n" + configFilePath
} else {
text = "Configuration file path not set"
}
} else {
text = "Copy to clipboard not available"
}
ui.showModal(
pageNameModalClipboard,
text,
[]string{"Ok"},
false,
nil,
)
}
func (ui *UI) confirmQuit() {
ui.showModal(
pageNameModalQuit,
@ -1036,7 +1011,7 @@ func (ui *UI) confirmQuit() {
false,
func(buttonIndex int, _ string) {
if buttonIndex == 0 {
ui.dispatch(event.CommandQuit{})
ui.app.Stop()
}
},
)