From 492f44dc0ba5d86ff980ac78fb5dd66d7fac4a2b Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Wed, 14 May 2025 00:31:14 +0200 Subject: [PATCH] fixup! wip: refactor: API --- go.mod | 2 +- go.sum | 4 +- internal/protocol/command_test.go | 180 ++++++++++++++++++++ internal/protocol/domain.go | 70 ++++---- internal/protocol/domain_test.go | 222 +++++++++++++++++++++++++ internal/protocol/event.go | 23 ++- internal/protocol/event_test.go | 267 ++++++++++++++++++++++++++++++ internal/server/serverapp.go | 2 + 8 files changed, 732 insertions(+), 38 deletions(-) create mode 100644 internal/protocol/command_test.go create mode 100644 internal/protocol/domain_test.go create mode 100644 internal/protocol/event_test.go diff --git a/go.mod b/go.mod index 2e9d901..7ed2f48 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( golang.design/x/clipboard v0.7.0 golang.org/x/sync v0.13.0 google.golang.org/grpc v1.69.4 - google.golang.org/protobuf v1.36.3 + google.golang.org/protobuf v1.36.6 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 219f718..a5ce7a8 100644 --- a/go.sum +++ b/go.sum @@ -343,8 +343,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50= google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A= google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= -google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= -google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/internal/protocol/command_test.go b/internal/protocol/command_test.go new file mode 100644 index 0000000..6571084 --- /dev/null +++ b/internal/protocol/command_test.go @@ -0,0 +1,180 @@ +package protocol_test + +import ( + "testing" + + "git.netflux.io/rob/octoplex/internal/event" + pb "git.netflux.io/rob/octoplex/internal/generated/grpc" + "git.netflux.io/rob/octoplex/internal/protocol" + gocmp "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/testing/protocmp" +) + +func TestCommandToProto(t *testing.T) { + testCases := []struct { + name string + in event.Command + want *pb.Command + }{ + { + name: "AddDestination", + in: event.CommandAddDestination{ + DestinationName: "test", + URL: "rtmp://rtmp.example.com", + }, + want: &pb.Command{ + CommandType: &pb.Command_AddDestination{ + AddDestination: &pb.AddDestinationCommand{ + Name: "test", + Url: "rtmp://rtmp.example.com", + }, + }, + }, + }, + { + name: "RemoveDestination", + in: event.CommandRemoveDestination{ + URL: "rtmp://remove.example.com", + }, + want: &pb.Command{ + CommandType: &pb.Command_RemoveDestination{ + RemoveDestination: &pb.RemoveDestinationCommand{ + Url: "rtmp://remove.example.com", + }, + }, + }, + }, + { + name: "StartDestination", + in: event.CommandStartDestination{ + URL: "rtmp://start.example.com", + }, + want: &pb.Command{ + CommandType: &pb.Command_StartDestination{ + StartDestination: &pb.StartDestinationCommand{ + Url: "rtmp://start.example.com", + }, + }, + }, + }, + { + name: "StopDestination", + in: event.CommandStopDestination{ + URL: "rtmp://stop.example.com", + }, + want: &pb.Command{ + CommandType: &pb.Command_StopDestination{ + StopDestination: &pb.StopDestinationCommand{ + Url: "rtmp://stop.example.com", + }, + }, + }, + }, + { + name: "CloseOtherInstance", + in: event.CommandCloseOtherInstance{}, + want: &pb.Command{ + CommandType: &pb.Command_CloseOtherInstances{ + CloseOtherInstances: &pb.CloseOtherInstancesCommand{}, + }, + }, + }, + { + name: "Quit", + in: event.CommandQuit{}, + want: &pb.Command{ + CommandType: &pb.Command_Quit{ + Quit: &pb.QuitCommand{}, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Empty(t, gocmp.Diff(tc.want, protocol.CommandToProto(tc.in), protocmp.Transform())) + }) + } +} + +func TestCommandFromProto(t *testing.T) { + testCases := []struct { + name string + in *pb.Command + want event.Command + }{ + { + name: "AddDestination", + in: &pb.Command{ + CommandType: &pb.Command_AddDestination{ + AddDestination: &pb.AddDestinationCommand{ + Name: "test", + Url: "rtmp://rtmp.example.com", + }, + }, + }, + want: event.CommandAddDestination{ + DestinationName: "test", + URL: "rtmp://rtmp.example.com", + }, + }, + { + name: "RemoveDestination", + in: &pb.Command{ + CommandType: &pb.Command_RemoveDestination{ + RemoveDestination: &pb.RemoveDestinationCommand{ + Url: "rtmp://remove.example.com", + }, + }, + }, + want: event.CommandRemoveDestination{URL: "rtmp://remove.example.com"}, + }, + { + name: "StartDestination", + in: &pb.Command{ + CommandType: &pb.Command_StartDestination{ + StartDestination: &pb.StartDestinationCommand{ + Url: "rtmp://start.example.com", + }, + }, + }, + want: event.CommandStartDestination{URL: "rtmp://start.example.com"}, + }, + { + name: "StopDestination", + in: &pb.Command{ + CommandType: &pb.Command_StopDestination{ + StopDestination: &pb.StopDestinationCommand{ + Url: "rtmp://stop.example.com", + }, + }, + }, + want: event.CommandStopDestination{URL: "rtmp://stop.example.com"}, + }, + { + name: "CloseOtherInstance", + in: &pb.Command{ + CommandType: &pb.Command_CloseOtherInstances{ + CloseOtherInstances: &pb.CloseOtherInstancesCommand{}, + }, + }, + want: event.CommandCloseOtherInstance{}, + }, + { + name: "Quit", + in: &pb.Command{ + CommandType: &pb.Command_Quit{ + Quit: &pb.QuitCommand{}, + }, + }, + want: event.CommandQuit{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Empty(t, gocmp.Diff(tc.want, protocol.CommandFromProto(tc.in))) + }) + } +} diff --git a/internal/protocol/domain.go b/internal/protocol/domain.go index d906df7..d77ac21 100644 --- a/internal/protocol/domain.go +++ b/internal/protocol/domain.go @@ -2,13 +2,14 @@ package protocol import ( "errors" + "time" "git.netflux.io/rob/octoplex/internal/domain" pb "git.netflux.io/rob/octoplex/internal/generated/grpc" "google.golang.org/protobuf/types/known/timestamppb" ) -func containerToProto(c domain.Container) *pb.Container { +func ContainerToProto(c domain.Container) *pb.Container { var errString string if c.Err != nil { errString = c.Err.Error() @@ -20,6 +21,11 @@ func containerToProto(c domain.Container) *pb.Container { exitCode = &code } + var rxSince *timestamppb.Timestamp + if !c.RxSince.IsZero() { + rxSince = timestamppb.New(c.RxSince) + } + return &pb.Container{ Id: c.ID, Status: c.Status, @@ -28,7 +34,7 @@ func containerToProto(c domain.Container) *pb.Container { MemoryUsageBytes: c.MemoryUsageBytes, RxRate: int32(c.RxRate), TxRate: int32(c.TxRate), - RxSince: timestamppb.New(c.RxSince), + RxSince: rxSince, ImageName: c.ImageName, PullStatus: c.PullStatus, PullProgress: c.PullProgress, @@ -38,59 +44,65 @@ func containerToProto(c domain.Container) *pb.Container { Err: errString, } } -func protoToContainer(pbCont *pb.Container) domain.Container { - if pbCont == nil { + +func ContainerFromProto(pbContainer *pb.Container) domain.Container { + if pbContainer == nil { return domain.Container{} } var exitCode *int - if pbCont.ExitCode != nil { - val := int(*pbCont.ExitCode) + if pbContainer.ExitCode != nil { + val := int(*pbContainer.ExitCode) exitCode = &val } var err error - if pbCont.Err != "" { - err = errors.New(pbCont.Err) + if pbContainer.Err != "" { + err = errors.New(pbContainer.Err) + } + + var rxSince time.Time + if pbContainer.RxSince != nil { + rxSince = pbContainer.RxSince.AsTime() } return domain.Container{ - ID: pbCont.Id, - Status: pbCont.Status, - HealthState: pbCont.HealthState, - CPUPercent: pbCont.CpuPercent, - MemoryUsageBytes: pbCont.MemoryUsageBytes, - RxRate: int(pbCont.RxRate), - TxRate: int(pbCont.TxRate), - RxSince: pbCont.RxSince.AsTime(), - ImageName: pbCont.ImageName, - PullStatus: pbCont.PullStatus, - PullProgress: pbCont.PullProgress, - PullPercent: int(pbCont.PullPercent), - RestartCount: int(pbCont.RestartCount), + ID: pbContainer.Id, + Status: pbContainer.Status, + HealthState: pbContainer.HealthState, + CPUPercent: pbContainer.CpuPercent, + MemoryUsageBytes: pbContainer.MemoryUsageBytes, + RxRate: int(pbContainer.RxRate), + TxRate: int(pbContainer.TxRate), + RxSince: rxSince, + ImageName: pbContainer.ImageName, + PullStatus: pbContainer.PullStatus, + PullProgress: pbContainer.PullProgress, + PullPercent: int(pbContainer.PullPercent), + RestartCount: int(pbContainer.RestartCount), ExitCode: exitCode, Err: err, } } -func destinationsToProto(inDests []domain.Destination) []*pb.Destination { +func DestinationsToProto(inDests []domain.Destination) []*pb.Destination { destinations := make([]*pb.Destination, 0, len(inDests)) for _, d := range inDests { - destinations = append(destinations, destinationToProto(d)) + destinations = append(destinations, DestinationToProto(d)) } return destinations } -func destinationToProto(d domain.Destination) *pb.Destination { +func DestinationToProto(d domain.Destination) *pb.Destination { return &pb.Destination{ - Container: containerToProto(d.Container), - Status: destinationStatusToProto(d.Status), + Container: ContainerToProto(d.Container), + Status: DestinationStatusToProto(d.Status), Name: d.Name, Url: d.URL, } } -func protoToDestinations(pbDests []*pb.Destination) []domain.Destination { +func ProtoToDestinations(pbDests []*pb.Destination) []domain.Destination { if pbDests == nil { return nil } @@ -101,7 +113,7 @@ func protoToDestinations(pbDests []*pb.Destination) []domain.Destination { continue } dests = append(dests, domain.Destination{ - Container: protoToContainer(pbDest.Container), + Container: ContainerFromProto(pbDest.Container), Status: domain.DestinationStatus(pbDest.Status), // direct cast, same underlying int Name: pbDest.Name, URL: pbDest.Url, @@ -109,7 +121,7 @@ func protoToDestinations(pbDests []*pb.Destination) []domain.Destination { } return dests } -func destinationStatusToProto(s domain.DestinationStatus) pb.Destination_Status { +func DestinationStatusToProto(s domain.DestinationStatus) pb.Destination_Status { switch s { case domain.DestinationStatusStarting: return pb.Destination_STATUS_STARTING diff --git a/internal/protocol/domain_test.go b/internal/protocol/domain_test.go new file mode 100644 index 0000000..b0365fd --- /dev/null +++ b/internal/protocol/domain_test.go @@ -0,0 +1,222 @@ +package protocol_test + +import ( + "errors" + "testing" + "time" + + "git.netflux.io/rob/octoplex/internal/domain" + pb "git.netflux.io/rob/octoplex/internal/generated/grpc" + "git.netflux.io/rob/octoplex/internal/protocol" + gocmp "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestContainerToProto(t *testing.T) { + exitCode := 1 + ts := time.Unix(1234567890, 0) + + testCases := []struct { + name string + in domain.Container + want *pb.Container + }{ + { + name: "complete", + in: domain.Container{ + ID: "abc123", + Status: "running", + HealthState: "healthy", + CPUPercent: 12.5, + MemoryUsageBytes: 2048, + RxRate: 100, + TxRate: 200, + RxSince: ts, + ImageName: "nginx", + PullStatus: "pulling", + PullProgress: "50%", + PullPercent: 50, + RestartCount: 3, + ExitCode: &exitCode, + Err: errors.New("container error"), + }, + want: &pb.Container{ + Id: "abc123", + Status: "running", + HealthState: "healthy", + CpuPercent: 12.5, + MemoryUsageBytes: 2048, + RxRate: 100, + TxRate: 200, + RxSince: timestamppb.New(ts), + ImageName: "nginx", + PullStatus: "pulling", + PullProgress: "50%", + PullPercent: 50, + RestartCount: 3, + ExitCode: protoInt32(1), + Err: "container error", + }, + }, + { + name: "zero values", + in: domain.Container{}, + want: &pb.Container{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Empty(t, gocmp.Diff(tc.want, protocol.ContainerToProto(tc.in), protocmp.Transform())) + }) + } +} + +func TestContainerFromProto(t *testing.T) { + ts := timestamppb.New(time.Now()) + exitCode := int32(2) + + testCases := []struct { + name string + in *pb.Container + want domain.Container + }{ + { + name: "complete", + in: &pb.Container{ + Id: "xyz789", + Status: "exited", + HealthState: "unhealthy", + CpuPercent: 42.0, + MemoryUsageBytes: 4096, + RxRate: 300, + TxRate: 400, + RxSince: ts, + ImageName: "redis", + PullStatus: "complete", + PullProgress: "100%", + PullPercent: 100, + RestartCount: 1, + ExitCode: &exitCode, + Err: "crash error", + }, + want: domain.Container{ + ID: "xyz789", + Status: "exited", + HealthState: "unhealthy", + CPUPercent: 42.0, + MemoryUsageBytes: 4096, + RxRate: 300, + TxRate: 400, + RxSince: ts.AsTime(), + ImageName: "redis", + PullStatus: "complete", + PullProgress: "100%", + PullPercent: 100, + RestartCount: 1, + ExitCode: protoInt(2), + Err: errors.New("crash error"), + }, + }, + { + name: "nil proto", + in: nil, + want: domain.Container{}, + }, + { + name: "zero values", + in: &pb.Container{}, + want: domain.Container{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got := protocol.ContainerFromProto(tc.in) + assert.Empty( + t, + gocmp.Diff( + tc.want, + got, + gocmp.Comparer(compareErrorMessages), + )) + }) + } +} + +func TestDestinationConversions(t *testing.T) { + testCases := []struct { + name string + in domain.Destination + want *pb.Destination + }{ + { + name: "basic destination", + in: domain.Destination{ + Name: "dest1", + URL: "rtmp://dest1", + Status: domain.DestinationStatusLive, + Container: domain.Container{ID: "c1"}, + }, + want: &pb.Destination{ + Name: "dest1", + Url: "rtmp://dest1", + Status: pb.Destination_STATUS_LIVE, + Container: &pb.Container{Id: "c1"}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + proto := protocol.DestinationToProto(tc.in) + assert.Equal(t, tc.want.Name, proto.Name) + assert.Equal(t, tc.want.Url, proto.Url) + assert.Equal(t, tc.want.Status, proto.Status) + require.NotNil(t, proto.Container) + assert.Equal(t, tc.want.Container.Id, proto.Container.Id) + + dests := protocol.ProtoToDestinations([]*pb.Destination{proto}) + assert.Len(t, dests, 1) + assert.Equal(t, tc.in.Name, dests[0].Name) + assert.Equal(t, tc.in.URL, dests[0].URL) + assert.Equal(t, tc.in.Status, dests[0].Status) + assert.Equal(t, tc.in.Container.ID, dests[0].Container.ID) + }) + } +} + +func TestDestinationStatusToProto(t *testing.T) { + testCases := []struct { + name string + in domain.DestinationStatus + want pb.Destination_Status + }{ + {"Starting", domain.DestinationStatusStarting, pb.Destination_STATUS_STARTING}, + {"Live", domain.DestinationStatusLive, pb.Destination_STATUS_LIVE}, + {"Off-air", domain.DestinationStatusOffAir, pb.Destination_STATUS_OFF_AIR}, + {"Unknown", domain.DestinationStatus(999), pb.Destination_STATUS_OFF_AIR}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, protocol.DestinationStatusToProto(tc.in)) + }) + } +} + +func protoInt32(v int32) *int32 { return &v } +func protoInt(v int) *int { return &v } + +// compareErrorMessages compares two error messages for equality using only the +// error message string. +func compareErrorMessages(x, y error) bool { + if x == nil || y == nil { + return x == y + } + + return x.Error() == y.Error() +} diff --git a/internal/protocol/event.go b/internal/protocol/event.go index 8be3ccd..0d57c78 100644 --- a/internal/protocol/event.go +++ b/internal/protocol/event.go @@ -2,6 +2,7 @@ package protocol import ( "errors" + "time" "git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/event" @@ -38,18 +39,23 @@ func EventToProto(ev event.Event) *pb.Event { } func buildAppStateChangeEvent(evt event.AppStateChangedEvent) *pb.Event { + var liveChangedAt *timestamppb.Timestamp + if !evt.State.Source.LiveChangedAt.IsZero() { + liveChangedAt = timestamppb.New(evt.State.Source.LiveChangedAt) + } + return &pb.Event{ EventType: &pb.Event_AppStateChanged{ AppStateChanged: &pb.AppStateChangedEvent{ AppState: &pb.AppState{ Source: &pb.Source{ - Container: containerToProto(evt.State.Source.Container), + Container: ContainerToProto(evt.State.Source.Container), Live: evt.State.Source.Live, - LiveChangedAt: timestamppb.New(evt.State.Source.LiveChangedAt), + LiveChangedAt: liveChangedAt, Tracks: evt.State.Source.Tracks, ExitReason: evt.State.Source.ExitReason, }, - Destinations: destinationsToProto(evt.State.Destinations), + Destinations: DestinationsToProto(evt.State.Destinations), BuildInfo: &pb.BuildInfo{ GoVersion: evt.State.BuildInfo.GoVersion, Version: evt.State.BuildInfo.Version, @@ -171,16 +177,21 @@ func parseAppStateChangedEvent(evt *pb.AppStateChangedEvent) event.Event { panic("invalid AppStateChangedEvent") } + var liveChangedAt time.Time + if evt.AppState.Source.LiveChangedAt != nil { + liveChangedAt = evt.AppState.Source.LiveChangedAt.AsTime() + } + return event.AppStateChangedEvent{ State: domain.AppState{ Source: domain.Source{ - Container: protoToContainer(evt.AppState.Source.Container), + Container: ContainerFromProto(evt.AppState.Source.Container), Live: evt.AppState.Source.Live, - LiveChangedAt: evt.AppState.Source.LiveChangedAt.AsTime(), + LiveChangedAt: liveChangedAt, Tracks: evt.AppState.Source.Tracks, ExitReason: evt.AppState.Source.ExitReason, }, - Destinations: protoToDestinations(evt.AppState.Destinations), + Destinations: ProtoToDestinations(evt.AppState.Destinations), BuildInfo: domain.BuildInfo{ GoVersion: evt.AppState.BuildInfo.GoVersion, Version: evt.AppState.BuildInfo.Version, diff --git a/internal/protocol/event_test.go b/internal/protocol/event_test.go new file mode 100644 index 0000000..11f9994 --- /dev/null +++ b/internal/protocol/event_test.go @@ -0,0 +1,267 @@ +package protocol_test + +import ( + "errors" + "testing" + + "git.netflux.io/rob/octoplex/internal/domain" + "git.netflux.io/rob/octoplex/internal/event" + pb "git.netflux.io/rob/octoplex/internal/generated/grpc" + "git.netflux.io/rob/octoplex/internal/protocol" + "github.com/google/go-cmp/cmp" + gocmp "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/testing/protocmp" +) + +func TestEventToProto(t *testing.T) { + testCases := []struct { + name string + in event.Event + want *pb.Event + }{ + { + name: "AppStateChanged", + in: event.AppStateChangedEvent{ + State: domain.AppState{ + Source: domain.Source{ + Container: domain.Container{ + ID: "abc123", + }, + Live: true, + }, + Destinations: []domain.Destination{ + { + Name: "dest1", + URL: "rtmp://dest1.example.com", + Container: domain.Container{ + ID: "bcd456", + }, + }, + }, + BuildInfo: domain.BuildInfo{GoVersion: "go1.16", Version: "v1.0.0"}, + }, + }, + want: &pb.Event{ + EventType: &pb.Event_AppStateChanged{ + AppStateChanged: &pb.AppStateChangedEvent{ + AppState: &pb.AppState{ + Source: &pb.Source{ + Container: &pb.Container{ + Id: "abc123", + }, + Live: true, + }, + Destinations: []*pb.Destination{ + { + Name: "dest1", + Url: "rtmp://dest1.example.com", + Container: &pb.Container{ + Id: "bcd456", + }, + }, + }, + BuildInfo: &pb.BuildInfo{GoVersion: "go1.16", Version: "v1.0.0"}, + }, + }, + }, + }, + }, + { + name: "DestinationAdded", + in: event.DestinationAddedEvent{URL: "rtmp://dest.example.com"}, + want: &pb.Event{ + EventType: &pb.Event_DestinationAdded{ + DestinationAdded: &pb.DestinationAddedEvent{ + Url: "rtmp://dest.example.com", + }, + }, + }, + }, + { + name: "AddDestinationFailed", + in: event.AddDestinationFailedEvent{URL: "rtmp://fail.example.com", Err: errors.New("failed")}, + want: &pb.Event{ + EventType: &pb.Event_AddDestinationFailed{ + AddDestinationFailed: &pb.AddDestinationFailedEvent{ + Url: "rtmp://fail.example.com", + Error: "failed", + }, + }, + }, + }, + { + name: "DestinationStreamExited", + in: event.DestinationStreamExitedEvent{Name: "stream1", Err: errors.New("exit reason")}, + want: &pb.Event{ + EventType: &pb.Event_DestinationStreamExited{ + DestinationStreamExited: &pb.DestinationStreamExitedEvent{ + Name: "stream1", + Error: "exit reason", + }, + }, + }, + }, + { + name: "FatalErrorOccurred", + in: event.FatalErrorOccurredEvent{Message: "fatal error"}, + want: &pb.Event{ + EventType: &pb.Event_FatalError{ + FatalError: &pb.FatalErrorEvent{Message: "fatal error"}, + }, + }, + }, + { + name: "OtherInstanceDetected", + in: event.OtherInstanceDetectedEvent{}, + want: &pb.Event{ + EventType: &pb.Event_OtherInstanceDetected{ + OtherInstanceDetected: &pb.OtherInstanceDetectedEvent{}, + }, + }, + }, + { + name: "MediaServerStarted", + in: event.MediaServerStartedEvent{RTMPURL: "rtmp://media", RTMPSURL: "rtmps://media"}, + want: &pb.Event{ + EventType: &pb.Event_MediaServerStarted{ + MediaServerStarted: &pb.MediaServerStartedEvent{ + RtmpUrl: "rtmp://media", + RtmpsUrl: "rtmps://media", + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Empty(t, gocmp.Diff(tc.want, protocol.EventToProto(tc.in), protocmp.Transform())) + }) + } +} + +func TestEventFromProto(t *testing.T) { + testCases := []struct { + name string + in *pb.Event + want event.Event + }{ + { + name: "AppStateChanged", + in: &pb.Event{ + EventType: &pb.Event_AppStateChanged{ + AppStateChanged: &pb.AppStateChangedEvent{ + AppState: &pb.AppState{ + Source: &pb.Source{ + Container: &pb.Container{Id: "abc123"}, + Live: true, + }, + Destinations: []*pb.Destination{ + { + Name: "dest1", + Url: "rtmp://dest1.example.com", + Container: &pb.Container{Id: "bcd456"}, + }, + }, + BuildInfo: &pb.BuildInfo{ + GoVersion: "go1.16", + Version: "v1.0.0", + }, + }, + }, + }, + }, + want: event.AppStateChangedEvent{ + State: domain.AppState{ + Source: domain.Source{ + Container: domain.Container{ID: "abc123"}, + Live: true, + }, + Destinations: []domain.Destination{ + { + Name: "dest1", + URL: "rtmp://dest1.example.com", + Container: domain.Container{ID: "bcd456"}, + }, + }, + BuildInfo: domain.BuildInfo{ + GoVersion: "go1.16", + Version: "v1.0.0", + }, + }, + }, + }, + { + name: "DestinationAdded", + in: &pb.Event{ + EventType: &pb.Event_DestinationAdded{ + DestinationAdded: &pb.DestinationAddedEvent{ + Url: "rtmp://dest.example.com", + }, + }, + }, + want: event.DestinationAddedEvent{URL: "rtmp://dest.example.com"}, + }, + { + name: "AddDestinationFailed", + in: &pb.Event{ + EventType: &pb.Event_AddDestinationFailed{ + AddDestinationFailed: &pb.AddDestinationFailedEvent{ + Url: "rtmp://fail.example.com", + Error: "failed", + }, + }, + }, + want: event.AddDestinationFailedEvent{URL: "rtmp://fail.example.com", Err: errors.New("failed")}, + }, + { + name: "DestinationStreamExited", + in: &pb.Event{ + EventType: &pb.Event_DestinationStreamExited{ + DestinationStreamExited: &pb.DestinationStreamExitedEvent{ + Name: "stream1", + Error: "exit reason", + }, + }, + }, + want: event.DestinationStreamExitedEvent{Name: "stream1", Err: errors.New("exit reason")}, + }, + { + name: "FatalErrorOccurred", + in: &pb.Event{ + EventType: &pb.Event_FatalError{ + FatalError: &pb.FatalErrorEvent{Message: "fatal error"}, + }, + }, + want: event.FatalErrorOccurredEvent{Message: "fatal error"}, + }, + { + name: "OtherInstanceDetected", + in: &pb.Event{ + EventType: &pb.Event_OtherInstanceDetected{ + OtherInstanceDetected: &pb.OtherInstanceDetectedEvent{}, + }, + }, + want: event.OtherInstanceDetectedEvent{}, + }, + { + name: "MediaServerStarted", + in: &pb.Event{ + EventType: &pb.Event_MediaServerStarted{ + MediaServerStarted: &pb.MediaServerStartedEvent{ + RtmpUrl: "rtmp://media", + RtmpsUrl: "rtmps://media", + }, + }, + }, + want: event.MediaServerStartedEvent{RTMPURL: "rtmp://media", RTMPSURL: "rtmps://media"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.Empty(t, cmp.Diff(tc.want, protocol.EventFromProto(tc.in), gocmp.Comparer(compareErrorMessages))) + }) + } +} diff --git a/internal/server/serverapp.go b/internal/server/serverapp.go index 6d37e84..2cf4be7 100644 --- a/internal/server/serverapp.go +++ b/internal/server/serverapp.go @@ -275,6 +275,8 @@ func (a *App) handleCommand( } }() + // If the command is a syncCommand, we need to extract the command from it so + // it can be type-switched against. if c, ok := cmd.(syncCommand); ok { cmd = c.Command }