From c6f21b194a741064b2dd03948f534dd8acc2b255 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Fri, 9 May 2025 06:47:27 +0200 Subject: [PATCH] fixup! wip: refactor: API --- cmd/client/main.go | 47 +++-- cmd/server/main.go | 9 +- internal/app/app.go | 12 +- internal/event/bus.go | 15 ++ internal/event/bus_test.go | 15 ++ internal/event/command.go | 2 + internal/generated/grpc/api.pb.go | 5 +- internal/protocol/command.go | 110 ++++++++++ internal/protocol/domain.go | 121 +++++++++++ .../{client/protocol.go => protocol/event.go} | 192 ++++++++++-------- internal/server/server.go | 81 +++++++- internal/terminal/terminal.go | 57 ++---- 12 files changed, 510 insertions(+), 156 deletions(-) create mode 100644 internal/protocol/command.go create mode 100644 internal/protocol/domain.go rename internal/{client/protocol.go => protocol/event.go} (51%) diff --git a/cmd/client/main.go b/cmd/client/main.go index 4afe0cf..d3e08c5 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -2,18 +2,19 @@ package main import ( "context" + "errors" "fmt" "log/slog" "os" "runtime/debug" - "time" - "git.netflux.io/rob/octoplex/internal/client" "git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/event" pb "git.netflux.io/rob/octoplex/internal/generated/grpc" + "git.netflux.io/rob/octoplex/internal/protocol" "git.netflux.io/rob/octoplex/internal/terminal" "golang.design/x/clipboard" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -30,6 +31,7 @@ var ( func main() { if err := run(); err != nil { os.Stderr.WriteString("Error: " + err.Error() + "\n") + os.Exit(1) } } @@ -37,6 +39,8 @@ func run() error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + g, ctx := errgroup.WithContext(ctx) + // TODO: logger from config fptr, err := os.OpenFile("octoplex.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { @@ -68,20 +72,11 @@ func run() error { return fmt.Errorf("create gRPC stream: %w", err) } - go func() { - for evt := range bus.Register() { - if sendErr := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: client.EventToProto(evt)}}); sendErr != nil { - logger.Error("Error sending event to gRPC API", "err", sendErr) - } - } - }() - - go func() { + g.Go(func() error { for { envelope, recErr := stream.Recv() if recErr != nil { - logger.Error("Error receiving envelope from gRPC API", "err", recErr) - continue + return fmt.Errorf("receive envelope: %w", recErr) } evt := envelope.GetEvent() @@ -90,21 +85,20 @@ func run() error { continue } - logger.Info("Received event from gRPC API", "event", evt) - // TODO: convert to domain event + logger.Debug("Received event", "type", fmt.Sprintf("%T", evt)) + bus.Send(protocol.EventFromProto(evt)) } - }() + }) ui, err := terminal.StartUI(ctx, terminal.StartParams{ EventBus: bus, Dispatcher: func(cmd event.Command) { - logger.Info("Command dispatched", "cmd", cmd) - if sendErr := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Command{Command: client.CommandToProto(cmd)}}); sendErr != nil { + logger.Info("Command dispatched", "cmd", cmd.Name()) + if sendErr := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Command{Command: protocol.CommandToProto(cmd)}}); sendErr != nil { logger.Error("Error sending command to gRPC API", "err", sendErr) } }, ClipboardAvailable: clipboardAvailable, - ConfigFilePath: "TODO", BuildInfo: domain.BuildInfo{ GoVersion: buildInfo.GoVersion, Version: version, @@ -118,7 +112,18 @@ func run() error { } defer ui.Close() - time.Sleep(10 * time.Minute) // Simulate long-running process + errUIClosed := errors.New("UI closed") + g.Go(func() error { + ui.Wait() + logger.Info("UI closed!") + return errUIClosed + }) - return nil + if err := g.Wait(); err == errUIClosed { + logger.Info("UI closed, exiting") + return nil + } else { + logger.Error("UI closed with error", "err", err) + return fmt.Errorf("errgroup.Wait: %w", err) + } } diff --git a/cmd/server/main.go b/cmd/server/main.go index 15b2ab2..2fdfcb9 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -125,7 +125,14 @@ func run() error { Logger: logger, }) - return app.Run(ctx) + if err := app.Run(ctx); err != nil { + if errors.Is(err, context.Canceled) && context.Cause(ctx) == errShutdown { + return errShutdown + } + return err + } + + return nil } // editConfigFile opens the config file in the user's editor. diff --git a/internal/app/app.go b/internal/app/app.go index 16aecea..49589c3 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -93,7 +93,7 @@ func (a *App) Run(ctx context.Context) error { } grpcServer := grpc.NewServer() grpcDone := make(chan error, 1) - pb.RegisterInternalAPIServer(grpcServer, server.Server{}) + pb.RegisterInternalAPIServer(grpcServer, server.New(a.DispatchAsync, a.eventBus, a.logger)) go func() { a.logger.Info("gRPC server started", "addr", grpcAddr) grpcDone <- grpcServer.Serve(lis) @@ -223,6 +223,11 @@ func (a *App) Dispatch(cmd event.Command) event.Event { return <-ch } +// DispatchAsync dispatches a command to be executed synchronously. +func (a *App) DispatchAsync(cmd event.Command) { + a.dispatchC <- cmd +} + // errExit is an error that indicates the app should exit. var errExit = errors.New("exit") @@ -250,6 +255,10 @@ func (a *App) handleCommand( } }() + if c, ok := cmd.(syncCommand); ok { + cmd = c.Command + } + switch c := cmd.(type) { case event.CommandAddDestination: newCfg := a.cfg @@ -263,6 +272,7 @@ func (a *App) handleCommand( } a.cfg = newCfg a.handleConfigUpdate(state) + a.logger.Info("Destination added", "url", c.URL) a.eventBus.Send(event.DestinationAddedEvent{URL: c.URL}) case event.CommandRemoveDestination: repl.StopDestination(c.URL) // no-op if not live diff --git a/internal/event/bus.go b/internal/event/bus.go index 0498c40..246f4ae 100644 --- a/internal/event/bus.go +++ b/internal/event/bus.go @@ -2,6 +2,7 @@ package event import ( "log/slog" + "slices" "sync" ) @@ -31,6 +32,20 @@ func (b *Bus) Register() <-chan Event { return ch } +func (b *Bus) Deregister(ch <-chan Event) { + b.mu.Lock() + defer b.mu.Unlock() + + b.consumers = slices.DeleteFunc(b.consumers, func(other chan Event) bool { + if ch == other { + close(other) + return true + } + + return false + }) +} + // Send sends an event to all registered consumers. func (b *Bus) Send(evt Event) { // The mutex is needed to ensure the backing array of b.consumers cannot be diff --git a/internal/event/bus_test.go b/internal/event/bus_test.go index cceae37..8c5bc78 100644 --- a/internal/event/bus_test.go +++ b/internal/event/bus_test.go @@ -6,6 +6,7 @@ import ( "git.netflux.io/rob/octoplex/internal/event" "git.netflux.io/rob/octoplex/internal/testhelpers" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestBus(t *testing.T) { @@ -25,5 +26,19 @@ func TestBus(t *testing.T) { }() assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent)) + assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent)) + assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent)) + assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent)) + + bus.Deregister(ch1) + + _, ok := <-ch1 + assert.False(t, ok) + + select { + case <-ch2: + require.Fail(t, "ch2 should be blocking") + default: + } } diff --git a/internal/event/command.go b/internal/event/command.go index 22b0090..295a302 100644 --- a/internal/event/command.go +++ b/internal/event/command.go @@ -50,6 +50,8 @@ func (c CommandCloseOtherInstance) Name() string { } // CommandQuit quits the app. +// +// TODO: consider how this should look in a client/server architecture. type CommandQuit struct{} // Name implements the Command interface. diff --git a/internal/generated/grpc/api.pb.go b/internal/generated/grpc/api.pb.go index 70fcdcc..59f1571 100644 --- a/internal/generated/grpc/api.pb.go +++ b/internal/generated/grpc/api.pb.go @@ -7,11 +7,12 @@ package grpc import ( + reflect "reflect" + sync "sync" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" timestamppb "google.golang.org/protobuf/types/known/timestamppb" - reflect "reflect" - sync "sync" ) const ( diff --git a/internal/protocol/command.go b/internal/protocol/command.go new file mode 100644 index 0000000..9540e9d --- /dev/null +++ b/internal/protocol/command.go @@ -0,0 +1,110 @@ +package protocol + +import ( + "git.netflux.io/rob/octoplex/internal/event" + pb "git.netflux.io/rob/octoplex/internal/generated/grpc" +) + +// CommandToProto converts a command to a protobuf message. +func CommandToProto(command event.Command) *pb.Command { + switch evt := command.(type) { + case event.CommandAddDestination: + return buildAddDestinationCommand(evt) + case event.CommandRemoveDestination: + return buildRemoveDestinationCommand(evt) + case event.CommandStartDestination: + return buildStartDestinationCommand(evt) + case event.CommandStopDestination: + return buildStopDestinationCommand(evt) + case event.CommandCloseOtherInstance: + return buildCloseOtherInstanceCommand(evt) + case event.CommandQuit: + return buildQuitCommand(evt) + default: + panic("unknown command type") + } +} + +func buildAddDestinationCommand(cmd event.CommandAddDestination) *pb.Command { + return &pb.Command{CommandType: &pb.Command_AddDestination{AddDestination: &pb.AddDestinationCommand{Name: cmd.DestinationName, Url: cmd.URL}}} +} + +func buildRemoveDestinationCommand(cmd event.CommandRemoveDestination) *pb.Command { + return &pb.Command{CommandType: &pb.Command_RemoveDestination{RemoveDestination: &pb.RemoveDestinationCommand{Url: cmd.URL}}} +} + +func buildStartDestinationCommand(cmd event.CommandStartDestination) *pb.Command { + return &pb.Command{CommandType: &pb.Command_StartDestination{StartDestination: &pb.StartDestinationCommand{Url: cmd.URL}}} +} + +func buildStopDestinationCommand(cmd event.CommandStopDestination) *pb.Command { + return &pb.Command{CommandType: &pb.Command_StopDestination{StopDestination: &pb.StopDestinationCommand{Url: cmd.URL}}} +} + +func buildCloseOtherInstanceCommand(event.CommandCloseOtherInstance) *pb.Command { + return &pb.Command{CommandType: &pb.Command_CloseOtherInstances{CloseOtherInstances: &pb.CloseOtherInstancesCommand{}}} +} + +func buildQuitCommand(event.CommandQuit) *pb.Command { + return &pb.Command{CommandType: &pb.Command_Quit{Quit: &pb.QuitCommand{}}} +} + +// CommandFromProto converts a protobuf message to a command. +func CommandFromProto(pbCmd *pb.Command) event.Command { + if pbCmd == nil || pbCmd.CommandType == nil { + panic("invalid or nil pb.Command") + } + + switch cmd := pbCmd.CommandType.(type) { + case *pb.Command_AddDestination: + return parseAddDestinationCommand(cmd.AddDestination) + case *pb.Command_RemoveDestination: + return parseRemoveDestinationCommand(cmd.RemoveDestination) + case *pb.Command_StartDestination: + return parseStartDestinationCommand(cmd.StartDestination) + case *pb.Command_StopDestination: + return parseStopDestinationCommand(cmd.StopDestination) + case *pb.Command_CloseOtherInstances: + return parseCloseOtherInstanceCommand(cmd.CloseOtherInstances) + case *pb.Command_Quit: + return parseQuitCommand(cmd.Quit) + default: + panic("unknown pb.Command type") + } +} + +func parseAddDestinationCommand(cmd *pb.AddDestinationCommand) event.Command { + if cmd == nil { + panic("nil AddDestinationCommand") + } + return event.CommandAddDestination{DestinationName: cmd.Name, URL: cmd.Url} +} + +func parseRemoveDestinationCommand(cmd *pb.RemoveDestinationCommand) event.Command { + if cmd == nil { + panic("nil RemoveDestinationCommand") + } + return event.CommandRemoveDestination{URL: cmd.Url} +} + +func parseStartDestinationCommand(cmd *pb.StartDestinationCommand) event.Command { + if cmd == nil { + panic("nil StartDestinationCommand") + } + return event.CommandStartDestination{URL: cmd.Url} +} + +func parseStopDestinationCommand(cmd *pb.StopDestinationCommand) event.Command { + if cmd == nil { + panic("nil StopDestinationCommand") + } + return event.CommandStopDestination{URL: cmd.Url} +} + +func parseCloseOtherInstanceCommand(_ *pb.CloseOtherInstancesCommand) event.Command { + return event.CommandCloseOtherInstance{} +} + +func parseQuitCommand(_ *pb.QuitCommand) event.Command { + return event.CommandQuit{} +} diff --git a/internal/protocol/domain.go b/internal/protocol/domain.go new file mode 100644 index 0000000..d906df7 --- /dev/null +++ b/internal/protocol/domain.go @@ -0,0 +1,121 @@ +package protocol + +import ( + "errors" + + "git.netflux.io/rob/octoplex/internal/domain" + pb "git.netflux.io/rob/octoplex/internal/generated/grpc" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func containerToProto(c domain.Container) *pb.Container { + var errString string + if c.Err != nil { + errString = c.Err.Error() + } + + var exitCode *int32 + if c.ExitCode != nil { + code := int32(*c.ExitCode) + exitCode = &code + } + + return &pb.Container{ + Id: c.ID, + Status: c.Status, + HealthState: c.HealthState, + CpuPercent: c.CPUPercent, + MemoryUsageBytes: c.MemoryUsageBytes, + RxRate: int32(c.RxRate), + TxRate: int32(c.TxRate), + RxSince: timestamppb.New(c.RxSince), + ImageName: c.ImageName, + PullStatus: c.PullStatus, + PullProgress: c.PullProgress, + PullPercent: int32(c.PullPercent), + RestartCount: int32(c.RestartCount), + ExitCode: exitCode, + Err: errString, + } +} +func protoToContainer(pbCont *pb.Container) domain.Container { + if pbCont == nil { + return domain.Container{} + } + + var exitCode *int + if pbCont.ExitCode != nil { + val := int(*pbCont.ExitCode) + exitCode = &val + } + + var err error + if pbCont.Err != "" { + err = errors.New(pbCont.Err) + } + + return domain.Container{ + ID: pbCont.Id, + Status: pbCont.Status, + HealthState: pbCont.HealthState, + CPUPercent: pbCont.CpuPercent, + MemoryUsageBytes: pbCont.MemoryUsageBytes, + RxRate: int(pbCont.RxRate), + TxRate: int(pbCont.TxRate), + RxSince: pbCont.RxSince.AsTime(), + ImageName: pbCont.ImageName, + PullStatus: pbCont.PullStatus, + PullProgress: pbCont.PullProgress, + PullPercent: int(pbCont.PullPercent), + RestartCount: int(pbCont.RestartCount), + ExitCode: exitCode, + Err: err, + } +} + +func destinationsToProto(inDests []domain.Destination) []*pb.Destination { + destinations := make([]*pb.Destination, 0, len(inDests)) + for _, d := range inDests { + destinations = append(destinations, destinationToProto(d)) + } + return destinations +} + +func destinationToProto(d domain.Destination) *pb.Destination { + return &pb.Destination{ + Container: containerToProto(d.Container), + Status: destinationStatusToProto(d.Status), + Name: d.Name, + Url: d.URL, + } +} + +func protoToDestinations(pbDests []*pb.Destination) []domain.Destination { + if pbDests == nil { + return nil + } + + dests := make([]domain.Destination, 0, len(pbDests)) + for _, pbDest := range pbDests { + if pbDest == nil { + continue + } + dests = append(dests, domain.Destination{ + Container: protoToContainer(pbDest.Container), + Status: domain.DestinationStatus(pbDest.Status), // direct cast, same underlying int + Name: pbDest.Name, + URL: pbDest.Url, + }) + } + return dests +} +func destinationStatusToProto(s domain.DestinationStatus) pb.Destination_Status { + switch s { + case domain.DestinationStatusStarting: + return pb.Destination_STATUS_STARTING + case domain.DestinationStatusLive: + return pb.Destination_STATUS_LIVE + default: + return pb.Destination_STATUS_OFF_AIR + } +} diff --git a/internal/client/protocol.go b/internal/protocol/event.go similarity index 51% rename from internal/client/protocol.go rename to internal/protocol/event.go index b210b87..8be3ccd 100644 --- a/internal/client/protocol.go +++ b/internal/protocol/event.go @@ -1,12 +1,12 @@ -// TODO: move protocol to a separate package -package client +package protocol import ( + "errors" + "git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/event" - "google.golang.org/protobuf/types/known/timestamppb" - pb "git.netflux.io/rob/octoplex/internal/generated/grpc" + "google.golang.org/protobuf/types/known/timestamppb" ) // EventToProto converts an event to a protobuf message. @@ -37,26 +37,6 @@ func EventToProto(ev event.Event) *pb.Event { } } -// CommandToProto converts a command to a protobuf message. -func CommandToProto(command event.Command) *pb.Command { - switch evt := command.(type) { - case event.CommandAddDestination: - return buildAddDestinationCommand(evt) - case event.CommandRemoveDestination: - return buildRemoveDestinationCommand(evt) - case event.CommandStartDestination: - return buildStartDestinationCommand(evt) - case event.CommandStopDestination: - return buildStopDestinationCommand(evt) - case event.CommandCloseOtherInstance: - return buildCloseOtherInstanceCommand(evt) - case event.CommandQuit: - return buildQuitCommand(evt) - default: - panic("unknown command type") - } -} - func buildAppStateChangeEvent(evt event.AppStateChangedEvent) *pb.Event { return &pb.Event{ EventType: &pb.Event_AppStateChanged{ @@ -154,85 +134,119 @@ func buildMediaServerStartedEvent(evt event.MediaServerStartedEvent) *pb.Event { } } -func containerToProto(c domain.Container) *pb.Container { - var errString string - if c.Err != nil { - errString = c.Err.Error() +// EventFromProto converts a protobuf message to an event. +func EventFromProto(pbEv *pb.Event) event.Event { + if pbEv == nil || pbEv.EventType == nil { + panic("invalid or nil pb.Event") } - var exitCode *int32 - if c.ExitCode != nil { - code := int32(*c.ExitCode) - exitCode = &code - } - - return &pb.Container{ - Id: c.ID, - Status: c.Status, - HealthState: c.HealthState, - CpuPercent: c.CPUPercent, - MemoryUsageBytes: c.MemoryUsageBytes, - RxRate: int32(c.RxRate), - TxRate: int32(c.TxRate), - RxSince: timestamppb.New(c.RxSince), - ImageName: c.ImageName, - PullStatus: c.PullStatus, - PullProgress: c.PullProgress, - PullPercent: int32(c.PullPercent), - RestartCount: int32(c.RestartCount), - ExitCode: exitCode, - Err: errString, - } -} - -func destinationsToProto(inDests []domain.Destination) []*pb.Destination { - destinations := make([]*pb.Destination, 0, len(inDests)) - for _, d := range inDests { - destinations = append(destinations, destinationToProto(d)) - } - return destinations -} - -func destinationToProto(d domain.Destination) *pb.Destination { - return &pb.Destination{ - Container: containerToProto(d.Container), - Status: destinationStatusToProto(d.Status), - Name: d.Name, - Url: d.URL, - } -} - -func destinationStatusToProto(s domain.DestinationStatus) pb.Destination_Status { - switch s { - case domain.DestinationStatusStarting: - return pb.Destination_STATUS_STARTING - case domain.DestinationStatusLive: - return pb.Destination_STATUS_LIVE + switch evt := pbEv.EventType.(type) { + case *pb.Event_AppStateChanged: + return parseAppStateChangedEvent(evt.AppStateChanged) + case *pb.Event_DestinationAdded: + return parseDestinationAddedEvent(evt.DestinationAdded) + case *pb.Event_AddDestinationFailed: + return parseAddDestinationFailedEvent(evt.AddDestinationFailed) + case *pb.Event_DestinationStreamExited: + return parseDestinationStreamExitedEvent(evt.DestinationStreamExited) + case *pb.Event_StartDestinationFailed: + return parseStartDestinationFailedEvent(evt.StartDestinationFailed) + case *pb.Event_DestinationRemoved: + return parseDestinationRemovedEvent(evt.DestinationRemoved) + case *pb.Event_RemoveDestinationFailed: + return parseRemoveDestinationFailedEvent(evt.RemoveDestinationFailed) + case *pb.Event_FatalError: + return parseFatalErrorOccurredEvent(evt.FatalError) + case *pb.Event_OtherInstanceDetected: + return parseOtherInstanceDetectedEvent(evt.OtherInstanceDetected) + case *pb.Event_MediaServerStarted: + return parseMediaServerStartedEvent(evt.MediaServerStarted) default: - return pb.Destination_STATUS_OFF_AIR + panic("unknown pb.Event type") } } -func buildAddDestinationCommand(cmd event.CommandAddDestination) *pb.Command { - return &pb.Command{CommandType: &pb.Command_AddDestination{AddDestination: &pb.AddDestinationCommand{Url: cmd.URL}}} +func parseAppStateChangedEvent(evt *pb.AppStateChangedEvent) event.Event { + if evt == nil || evt.AppState == nil || evt.AppState.Source == nil { + panic("invalid AppStateChangedEvent") + } + + return event.AppStateChangedEvent{ + State: domain.AppState{ + Source: domain.Source{ + Container: protoToContainer(evt.AppState.Source.Container), + Live: evt.AppState.Source.Live, + LiveChangedAt: evt.AppState.Source.LiveChangedAt.AsTime(), + Tracks: evt.AppState.Source.Tracks, + ExitReason: evt.AppState.Source.ExitReason, + }, + Destinations: protoToDestinations(evt.AppState.Destinations), + BuildInfo: domain.BuildInfo{ + GoVersion: evt.AppState.BuildInfo.GoVersion, + Version: evt.AppState.BuildInfo.Version, + Commit: evt.AppState.BuildInfo.Commit, + Date: evt.AppState.BuildInfo.Date, + }, + }, + } } -func buildRemoveDestinationCommand(cmd event.CommandRemoveDestination) *pb.Command { - return &pb.Command{CommandType: &pb.Command_RemoveDestination{RemoveDestination: &pb.RemoveDestinationCommand{Url: cmd.URL}}} +func parseDestinationAddedEvent(evt *pb.DestinationAddedEvent) event.Event { + if evt == nil { + panic("nil DestinationAddedEvent") + } + return event.DestinationAddedEvent{URL: evt.Url} } -func buildStartDestinationCommand(cmd event.CommandStartDestination) *pb.Command { - return &pb.Command{CommandType: &pb.Command_StartDestination{StartDestination: &pb.StartDestinationCommand{Url: cmd.URL}}} +func parseAddDestinationFailedEvent(evt *pb.AddDestinationFailedEvent) event.Event { + if evt == nil { + panic("nil AddDestinationFailedEvent") + } + return event.AddDestinationFailedEvent{URL: evt.Url, Err: errors.New(evt.Error)} } -func buildStopDestinationCommand(cmd event.CommandStopDestination) *pb.Command { - return &pb.Command{CommandType: &pb.Command_StopDestination{StopDestination: &pb.StopDestinationCommand{Url: cmd.URL}}} +func parseDestinationStreamExitedEvent(evt *pb.DestinationStreamExitedEvent) event.Event { + if evt == nil { + panic("nil DestinationStreamExitedEvent") + } + return event.DestinationStreamExitedEvent{Name: evt.Name, Err: errors.New(evt.Error)} } -func buildCloseOtherInstanceCommand(event.CommandCloseOtherInstance) *pb.Command { - return &pb.Command{CommandType: &pb.Command_CloseOtherInstances{CloseOtherInstances: &pb.CloseOtherInstancesCommand{}}} +func parseStartDestinationFailedEvent(evt *pb.StartDestinationFailedEvent) event.Event { + if evt == nil { + panic("nil StartDestinationFailedEvent") + } + return event.StartDestinationFailedEvent{URL: evt.Url, Message: evt.Message} } -func buildQuitCommand(event.CommandQuit) *pb.Command { - return &pb.Command{CommandType: &pb.Command_Quit{Quit: &pb.QuitCommand{}}} +func parseDestinationRemovedEvent(evt *pb.DestinationRemovedEvent) event.Event { + if evt == nil { + panic("nil DestinationRemovedEvent") + } + return event.DestinationRemovedEvent{URL: evt.Url} +} + +func parseRemoveDestinationFailedEvent(evt *pb.RemoveDestinationFailedEvent) event.Event { + if evt == nil { + panic("nil RemoveDestinationFailedEvent") + } + return event.RemoveDestinationFailedEvent{URL: evt.Url, Err: errors.New(evt.Error)} +} + +func parseFatalErrorOccurredEvent(evt *pb.FatalErrorEvent) event.Event { + if evt == nil { + panic("nil FatalErrorEvent") + } + return event.FatalErrorOccurredEvent{Message: evt.Message} +} + +func parseOtherInstanceDetectedEvent(_ *pb.OtherInstanceDetectedEvent) event.Event { + return event.OtherInstanceDetectedEvent{} +} + +func parseMediaServerStartedEvent(evt *pb.MediaServerStartedEvent) event.Event { + if evt == nil { + panic("nil MediaServerStartedEvent") + } + return event.MediaServerStartedEvent{RTMPURL: evt.RtmpUrl, RTMPSURL: evt.RtmpsUrl} } diff --git a/internal/server/server.go b/internal/server/server.go index 239efe4..c07ac63 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1,7 +1,86 @@ package server -import pb "git.netflux.io/rob/octoplex/internal/generated/grpc" +import ( + "context" + "errors" + "fmt" + "io" + + "git.netflux.io/rob/octoplex/internal/event" + pb "git.netflux.io/rob/octoplex/internal/generated/grpc" + "git.netflux.io/rob/octoplex/internal/protocol" + "github.com/sagikazarmark/slog-shim" + "golang.org/x/sync/errgroup" +) type Server struct { pb.UnimplementedInternalAPIServer + + dispatcher func(event.Command) + bus *event.Bus + logger *slog.Logger +} + +func New( + dispatcher func(event.Command), + bus *event.Bus, + logger *slog.Logger, +) *Server { + return &Server{ + dispatcher: dispatcher, + bus: bus, + logger: logger.With("component", "server"), + } +} + +func (s *Server) Communicate(stream pb.InternalAPI_CommunicateServer) error { + g, ctx := errgroup.WithContext(stream.Context()) + + g.Go(func() error { + eventsC := s.bus.Register() + defer s.bus.Deregister(eventsC) + + for { + select { + case evt := <-eventsC: + if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: protocol.EventToProto(evt)}}); err != nil { + return fmt.Errorf("send event: %w", err) + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + g.Go(func() error { + for { + in, err := stream.Recv() + if err == io.EOF { + s.logger.Info("Client disconnected") + return err + } + + if err != nil { + return fmt.Errorf("receive message: %w", err) + } + + switch pbCmd := in.Payload.(type) { + case *pb.Envelope_Command: + cmd := protocol.CommandFromProto(pbCmd.Command) + s.logger.Info("Received command", "command", cmd.Name()) + s.dispatcher(cmd) + default: + return fmt.Errorf("expected command but got: %T", pbCmd) + } + } + }) + + if err := g.Wait(); err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) { + s.logger.Error("Error in gRPC stream handler, exiting", "err", err) + return fmt.Errorf("errgroup.Wait: %w", err) + } + + s.logger.Info("Client stream closed") + + return nil } diff --git a/internal/terminal/terminal.go b/internal/terminal/terminal.go index fd7cae9..dc0a289 100644 --- a/internal/terminal/terminal.go +++ b/internal/terminal/terminal.go @@ -44,9 +44,9 @@ type UI struct { eventBus *event.Bus dispatch func(event.Command) clipboardAvailable bool - configFilePath string rtmpURL, rtmpsURL string buildInfo domain.BuildInfo + doneC chan struct{} logger *slog.Logger // tview state @@ -99,7 +99,6 @@ type StartParams struct { Dispatcher func(event.Command) Logger *slog.Logger ClipboardAvailable bool - ConfigFilePath string BuildInfo domain.BuildInfo Screen *Screen // Screen may be nil. } @@ -211,7 +210,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { eventBus: params.EventBus, dispatch: params.Dispatcher, clipboardAvailable: params.ClipboardAvailable, - configFilePath: params.ConfigFilePath, + doneC: make(chan struct{}, 1), buildInfo: params.BuildInfo, logger: params.Logger, app: app, @@ -262,32 +261,27 @@ func (ui *UI) renderAboutView() { ui.aboutView.AddItem(rtmpsURLView, 1, 0, false) } - ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]c[-] Copy config file path"), 1, 0, false) ui.aboutView.AddItem(tview.NewTextView().SetDynamicColors(true).SetText("[grey]?[-] About"), 1, 0, false) } func (ui *UI) run(ctx context.Context) { - defer func() { - // Ensure the application is stopped when the UI is closed. - ui.dispatch(event.CommandQuit{}) - }() - eventC := ui.eventBus.Register() + defer ui.eventBus.Deregister(eventC) - uiDone := make(chan struct{}) go func() { - defer func() { - uiDone <- struct{}{} - }() + defer close(ui.doneC) if err := ui.app.Run(); err != nil { - ui.logger.Error("tui application error", "err", err) + ui.logger.Error("Error in UI run loop, exiting", "err", err) } }() for { select { - case evt := <-eventC: + case evt, ok := <-eventC: + if !ok { + return + } ui.app.QueueUpdateDraw(func() { switch evt := evt.(type) { case event.AppStateChangedEvent: @@ -317,7 +311,7 @@ func (ui *UI) run(ctx context.Context) { }) case <-ctx.Done(): return - case <-uiDone: + case <-ui.doneC: return } } @@ -358,8 +352,6 @@ func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey { return nil case ' ': ui.toggleDestination() - case 'c', 'C': - ui.copyConfigFilePathToClipboard(ui.clipboardAvailable, ui.configFilePath) case '?': ui.showAbout() case 'k': // tview vim bindings @@ -825,6 +817,11 @@ func (ui *UI) Close() { ui.app.Stop() } +// Wait waits for the terminal user interface to finish. +func (ui *UI) Wait() { + <-ui.doneC +} + func (ui *UI) addDestination() { const ( inputLen = 60 @@ -1006,28 +1003,6 @@ func (ui *UI) copySourceURLToClipboard(url string) { ) } -func (ui *UI) copyConfigFilePathToClipboard(clipboardAvailable bool, configFilePath string) { - var text string - if clipboardAvailable { - if configFilePath != "" { - clipboard.Write(clipboard.FmtText, []byte(configFilePath)) - text = "Configuration file path copied to clipboard:\n\n" + configFilePath - } else { - text = "Configuration file path not set" - } - } else { - text = "Copy to clipboard not available" - } - - ui.showModal( - pageNameModalClipboard, - text, - []string{"Ok"}, - false, - nil, - ) -} - func (ui *UI) confirmQuit() { ui.showModal( pageNameModalQuit, @@ -1036,7 +1011,7 @@ func (ui *UI) confirmQuit() { false, func(buttonIndex int, _ string) { if buttonIndex == 0 { - ui.dispatch(event.CommandQuit{}) + ui.app.Stop() } }, )