fixup! wip: refactor: API

This commit is contained in:
Rob Watson 2025-05-08 07:08:43 +02:00
parent 0a6b9fad90
commit 6ceb286e6b
11 changed files with 2718 additions and 1711 deletions

View File

@ -6,11 +6,16 @@ import (
"log/slog"
"os"
"runtime/debug"
"time"
"git.netflux.io/rob/octoplex/internal/client"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/terminal"
"golang.design/x/clipboard"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
var (
@ -53,11 +58,50 @@ func run() error {
return fmt.Errorf("read build info: %w", err)
}
conn, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("connect to gRPC server: %w", err)
}
apiClient := pb.NewInternalAPIClient(conn)
stream, err := apiClient.Communicate(ctx)
if err != nil {
return fmt.Errorf("create gRPC stream: %w", err)
}
go func() {
for evt := range bus.Register() {
if sendErr := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: client.EventToProto(evt)}}); sendErr != nil {
logger.Error("Error sending event to gRPC API", "err", sendErr)
}
}
}()
go func() {
for {
envelope, recErr := stream.Recv()
if recErr != nil {
logger.Error("Error receiving envelope from gRPC API", "err", recErr)
continue
}
evt := envelope.GetEvent()
if evt == nil {
logger.Error("Received envelope without event")
continue
}
logger.Info("Received event from gRPC API", "event", evt)
// TODO: convert to domain event
}
}()
ui, err := terminal.StartUI(ctx, terminal.StartParams{
EventBus: bus,
Dispatcher: func(cmd event.Command) {
// TODO: this must call the gRPC client
logger.Info("Command dispatched", "cmd", cmd)
if sendErr := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Command{Command: client.CommandToProto(cmd)}}); sendErr != nil {
logger.Error("Error sending command to gRPC API", "err", sendErr)
}
},
ClipboardAvailable: clipboardAvailable,
ConfigFilePath: "TODO",
@ -74,5 +118,7 @@ func run() error {
}
defer ui.Close()
time.Sleep(10 * time.Minute) // Simulate long-running process
return nil
}

View File

@ -15,7 +15,7 @@ import (
"git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc/proto"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/mediaserver"
"git.netflux.io/rob/octoplex/internal/replicator"
"git.netflux.io/rob/octoplex/internal/server"
@ -259,7 +259,7 @@ func (a *App) handleCommand(
})
if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Add destination failed", "err", err)
return event.AddDestinationFailedEvent{Err: err}, nil
return event.AddDestinationFailedEvent{URL: c.URL, Err: err}, nil
}
a.cfg = newCfg
a.handleConfigUpdate(state)
@ -272,7 +272,7 @@ func (a *App) handleCommand(
})
if err := a.configService.SetConfig(newCfg); err != nil {
a.logger.Error("Remove destination failed", "err", err)
a.eventBus.Send(event.RemoveDestinationFailedEvent{Err: err})
a.eventBus.Send(event.RemoveDestinationFailedEvent{URL: c.URL, Err: err})
break
}
a.cfg = newCfg
@ -280,7 +280,7 @@ func (a *App) handleCommand(
a.eventBus.Send(event.DestinationRemovedEvent{URL: c.URL}) //nolint:gosimple
case event.CommandStartDestination:
if !state.Source.Live {
a.eventBus.Send(event.StartDestinationFailedEvent{})
a.eventBus.Send(event.StartDestinationFailedEvent{URL: c.URL, Message: "source not live"})
break
}

238
internal/client/protocol.go Normal file
View File

@ -0,0 +1,238 @@
// TODO: move protocol to a separate package
package client
import (
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
"google.golang.org/protobuf/types/known/timestamppb"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
)
// EventToProto converts an event to a protobuf message.
func EventToProto(ev event.Event) *pb.Event {
switch evt := ev.(type) {
case event.AppStateChangedEvent:
return buildAppStateChangeEvent(evt)
case event.DestinationAddedEvent:
return buildDestinationAddedEvent(evt)
case event.AddDestinationFailedEvent:
return buildAddDestinationFailedEvent(evt)
case event.DestinationStreamExitedEvent:
return buildDestinationStreamExitedEvent(evt)
case event.StartDestinationFailedEvent:
return buildStartDestinationFailedEvent(evt)
case event.DestinationRemovedEvent:
return buildDestinationRemovedEvent(evt)
case event.RemoveDestinationFailedEvent:
return buildRemoveDestinationFailedEvent(evt)
case event.FatalErrorOccurredEvent:
return buildFatalErrorOccurredEvent(evt)
case event.OtherInstanceDetectedEvent:
return buildOtherInstanceDetectedEvent(evt)
case event.MediaServerStartedEvent:
return buildMediaServerStartedEvent(evt)
default:
panic("unknown event type")
}
}
// CommandToProto converts a command to a protobuf message.
func CommandToProto(command event.Command) *pb.Command {
switch evt := command.(type) {
case event.CommandAddDestination:
return buildAddDestinationCommand(evt)
case event.CommandRemoveDestination:
return buildRemoveDestinationCommand(evt)
case event.CommandStartDestination:
return buildStartDestinationCommand(evt)
case event.CommandStopDestination:
return buildStopDestinationCommand(evt)
case event.CommandCloseOtherInstance:
return buildCloseOtherInstanceCommand(evt)
case event.CommandQuit:
return buildQuitCommand(evt)
default:
panic("unknown command type")
}
}
func buildAppStateChangeEvent(evt event.AppStateChangedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_AppStateChanged{
AppStateChanged: &pb.AppStateChangedEvent{
AppState: &pb.AppState{
Source: &pb.Source{
Container: containerToProto(evt.State.Source.Container),
Live: evt.State.Source.Live,
LiveChangedAt: timestamppb.New(evt.State.Source.LiveChangedAt),
Tracks: evt.State.Source.Tracks,
ExitReason: evt.State.Source.ExitReason,
},
Destinations: destinationsToProto(evt.State.Destinations),
BuildInfo: &pb.BuildInfo{
GoVersion: evt.State.BuildInfo.GoVersion,
Version: evt.State.BuildInfo.Version,
Commit: evt.State.BuildInfo.Commit,
Date: evt.State.BuildInfo.Date,
},
},
},
},
}
}
func buildDestinationAddedEvent(evt event.DestinationAddedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_DestinationAdded{
DestinationAdded: &pb.DestinationAddedEvent{Url: evt.URL},
},
}
}
func buildAddDestinationFailedEvent(evt event.AddDestinationFailedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_AddDestinationFailed{
AddDestinationFailed: &pb.AddDestinationFailedEvent{Url: evt.URL, Error: evt.Err.Error()},
},
}
}
func buildDestinationStreamExitedEvent(evt event.DestinationStreamExitedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_DestinationStreamExited{
DestinationStreamExited: &pb.DestinationStreamExitedEvent{Name: evt.Name, Error: evt.Err.Error()},
},
}
}
func buildStartDestinationFailedEvent(evt event.StartDestinationFailedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_StartDestinationFailed{
StartDestinationFailed: &pb.StartDestinationFailedEvent{Url: evt.URL, Message: evt.Message},
},
}
}
func buildDestinationRemovedEvent(evt event.DestinationRemovedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_DestinationRemoved{
DestinationRemoved: &pb.DestinationRemovedEvent{Url: evt.URL},
},
}
}
func buildRemoveDestinationFailedEvent(evt event.RemoveDestinationFailedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_RemoveDestinationFailed{
RemoveDestinationFailed: &pb.RemoveDestinationFailedEvent{Url: evt.URL, Error: evt.Err.Error()},
},
}
}
func buildFatalErrorOccurredEvent(evt event.FatalErrorOccurredEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_FatalError{
FatalError: &pb.FatalErrorEvent{Message: evt.Message},
},
}
}
func buildOtherInstanceDetectedEvent(_ event.OtherInstanceDetectedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_OtherInstanceDetected{
OtherInstanceDetected: &pb.OtherInstanceDetectedEvent{},
},
}
}
func buildMediaServerStartedEvent(evt event.MediaServerStartedEvent) *pb.Event {
return &pb.Event{
EventType: &pb.Event_MediaServerStarted{
MediaServerStarted: &pb.MediaServerStartedEvent{RtmpUrl: evt.RTMPURL, RtmpsUrl: evt.RTMPSURL},
},
}
}
func containerToProto(c domain.Container) *pb.Container {
var errString string
if c.Err != nil {
errString = c.Err.Error()
}
var exitCode *int32
if c.ExitCode != nil {
code := int32(*c.ExitCode)
exitCode = &code
}
return &pb.Container{
Id: c.ID,
Status: c.Status,
HealthState: c.HealthState,
CpuPercent: c.CPUPercent,
MemoryUsageBytes: c.MemoryUsageBytes,
RxRate: int32(c.RxRate),
TxRate: int32(c.TxRate),
RxSince: timestamppb.New(c.RxSince),
ImageName: c.ImageName,
PullStatus: c.PullStatus,
PullProgress: c.PullProgress,
PullPercent: int32(c.PullPercent),
RestartCount: int32(c.RestartCount),
ExitCode: exitCode,
Err: errString,
}
}
func destinationsToProto(inDests []domain.Destination) []*pb.Destination {
destinations := make([]*pb.Destination, 0, len(inDests))
for _, d := range inDests {
destinations = append(destinations, destinationToProto(d))
}
return destinations
}
func destinationToProto(d domain.Destination) *pb.Destination {
return &pb.Destination{
Container: containerToProto(d.Container),
Status: destinationStatusToProto(d.Status),
Name: d.Name,
Url: d.URL,
}
}
func destinationStatusToProto(s domain.DestinationStatus) pb.Destination_Status {
switch s {
case domain.DestinationStatusStarting:
return pb.Destination_STATUS_STARTING
case domain.DestinationStatusLive:
return pb.Destination_STATUS_LIVE
default:
return pb.Destination_STATUS_OFF_AIR
}
}
func buildAddDestinationCommand(cmd event.CommandAddDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_AddDestination{AddDestination: &pb.AddDestinationCommand{Url: cmd.URL}}}
}
func buildRemoveDestinationCommand(cmd event.CommandRemoveDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_RemoveDestination{RemoveDestination: &pb.RemoveDestinationCommand{Url: cmd.URL}}}
}
func buildStartDestinationCommand(cmd event.CommandStartDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_StartDestination{StartDestination: &pb.StartDestinationCommand{Url: cmd.URL}}}
}
func buildStopDestinationCommand(cmd event.CommandStopDestination) *pb.Command {
return &pb.Command{CommandType: &pb.Command_StopDestination{StopDestination: &pb.StopDestinationCommand{Url: cmd.URL}}}
}
func buildCloseOtherInstanceCommand(event.CommandCloseOtherInstance) *pb.Command {
return &pb.Command{CommandType: &pb.Command_CloseOtherInstances{CloseOtherInstances: &pb.CloseOtherInstancesCommand{}}}
}
func buildQuitCommand(event.CommandQuit) *pb.Command {
return &pb.Command{CommandType: &pb.Command_Quit{Quit: &pb.QuitCommand{}}}
}

View File

@ -42,6 +42,7 @@ func (e DestinationAddedEvent) name() Name {
// AddDestinationFailedEvent is emitted when a destination fails to be added.
type AddDestinationFailedEvent struct {
URL string
Err error
}
@ -60,7 +61,10 @@ func (e DestinationStreamExitedEvent) name() Name {
}
// StartDestinationFailedEvent is emitted when a destination fails to start.
type StartDestinationFailedEvent struct{}
type StartDestinationFailedEvent struct {
URL string
Message string
}
func (e StartDestinationFailedEvent) name() Name {
return EventNameStartDestinationFailed
@ -79,6 +83,7 @@ func (e DestinationRemovedEvent) name() Name {
// RemoveDestinationFailedEvent is emitted when a destination fails to be
// removed.
type RemoveDestinationFailedEvent struct {
URL string
Err error
}

File diff suppressed because it is too large Load Diff

View File

@ -1,10 +1,10 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.6
// source: proto/api.proto
// - protoc v6.30.1
// source: api.proto
package proto
package grpc
import (
context "context"
@ -65,19 +65,21 @@ func (x *internalAPICommunicateClient) Recv() (*Envelope, error) {
}
// InternalAPIServer is the server API for InternalAPI service.
// All implementations should embed UnimplementedInternalAPIServer
// All implementations must embed UnimplementedInternalAPIServer
// for forward compatibility
type InternalAPIServer interface {
Communicate(InternalAPI_CommunicateServer) error
mustEmbedUnimplementedInternalAPIServer()
}
// UnimplementedInternalAPIServer should be embedded to have forward compatible implementations.
// UnimplementedInternalAPIServer must be embedded to have forward compatible implementations.
type UnimplementedInternalAPIServer struct {
}
func (UnimplementedInternalAPIServer) Communicate(InternalAPI_CommunicateServer) error {
return status.Errorf(codes.Unimplemented, "method Communicate not implemented")
}
func (UnimplementedInternalAPIServer) mustEmbedUnimplementedInternalAPIServer() {}
// UnsafeInternalAPIServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to InternalAPIServer will
@ -131,5 +133,5 @@ var InternalAPI_ServiceDesc = grpc.ServiceDesc{
ClientStreams: true,
},
},
Metadata: "proto/api.proto",
Metadata: "api.proto",
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1 @@
package server

View File

@ -1,6 +1,6 @@
package server
import pb "git.netflux.io/rob/octoplex/internal/generated/grpc/proto"
import pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
type Server struct {
pb.UnimplementedInternalAPIServer

View File

@ -44,5 +44,5 @@ alias = "m"
[tasks.generate_proto]
description = "Generate gRPC files from proto"
dir = "{{cwd}}"
run = "protoc --go_out=./generated/grpc --go-grpc_out=./generated/grpc proto/*.proto"
run = "protoc -I proto --go_out=paths=source_relative:internal/generated/grpc --go-grpc_out=paths=source_relative:internal/generated/grpc proto/api.proto"
alias = "p"

View File

@ -2,7 +2,9 @@ syntax = "proto3";
package api;
option go_package = "git.netflux.io/rob/octoplex/internal/generated/grpc/proto";
import "google/protobuf/timestamp.proto";
option go_package = "git.netflux.io/rob/octoplex/internal/generated/grpc";
service InternalAPI {
rpc Communicate(stream Envelope) returns (stream Envelope);
@ -17,7 +19,7 @@ message Envelope {
message Command {
oneof command_type {
AddDestinationCommand add_destinaion = 1;
AddDestinationCommand add_destination = 1;
RemoveDestinationCommand remove_destination = 2;
StartDestinationCommand start_destination = 3;
StopDestinationCommand stop_destination = 4;
@ -62,8 +64,61 @@ message Event {
}
}
// TODO: complete
message AppStateChangedEvent {}
message Container {
string id = 1;
string status = 2;
string health_state = 3;
double cpu_percent = 4;
uint64 memory_usage_bytes = 5;
int32 rx_rate = 6;
int32 tx_rate = 7;
google.protobuf.Timestamp rx_since = 8;
string image_name = 9;
string pull_status = 10;
string pull_progress = 11;
int32 pull_percent = 12;
int32 restart_count = 13;
optional int32 exit_code = 14;
string err = 15;
}
message Source {
Container container = 1;
bool live = 2;
google.protobuf.Timestamp live_changed_at = 3;
repeated string tracks = 4;
string exit_reason = 5;
}
message Destination {
enum Status {
STATUS_OFF_AIR = 0;
STATUS_STARTING = 1;
STATUS_LIVE = 2;
}
Container container = 1;
Status status = 2;
string name = 3;
string url = 4;
}
message BuildInfo {
string go_version = 1;
string version = 2;
string commit = 3;
string date = 4;
}
message AppState {
Source source = 1;
repeated Destination destinations = 2;
BuildInfo build_info = 3;
}
message AppStateChangedEvent {
AppState app_state = 1;
}
message DestinationStreamExitedEvent {
string name = 1;
@ -90,7 +145,7 @@ message RemoveDestinationFailedEvent {
message StartDestinationFailedEvent {
string url = 1;
string error = 2;
string message = 2;
}
message MediaServerStartedEvent {
@ -101,5 +156,5 @@ message MediaServerStartedEvent {
message OtherInstanceDetectedEvent {}
message FatalErrorEvent {
string error = 1;
string message = 1;
}