fixup! wip: refactor: API
This commit is contained in:
parent
2038384290
commit
492f44dc0b
2
go.mod
2
go.mod
@ -15,7 +15,7 @@ require (
|
||||
golang.design/x/clipboard v0.7.0
|
||||
golang.org/x/sync v0.13.0
|
||||
google.golang.org/grpc v1.69.4
|
||||
google.golang.org/protobuf v1.36.3
|
||||
google.golang.org/protobuf v1.36.6
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
|
4
go.sum
4
go.sum
@ -343,8 +343,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50=
|
||||
google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
|
||||
google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
|
||||
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
|
||||
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
||||
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
|
180
internal/protocol/command_test.go
Normal file
180
internal/protocol/command_test.go
Normal file
@ -0,0 +1,180 @@
|
||||
package protocol_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.netflux.io/rob/octoplex/internal/event"
|
||||
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
|
||||
"git.netflux.io/rob/octoplex/internal/protocol"
|
||||
gocmp "github.com/google/go-cmp/cmp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"google.golang.org/protobuf/testing/protocmp"
|
||||
)
|
||||
|
||||
func TestCommandToProto(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
in event.Command
|
||||
want *pb.Command
|
||||
}{
|
||||
{
|
||||
name: "AddDestination",
|
||||
in: event.CommandAddDestination{
|
||||
DestinationName: "test",
|
||||
URL: "rtmp://rtmp.example.com",
|
||||
},
|
||||
want: &pb.Command{
|
||||
CommandType: &pb.Command_AddDestination{
|
||||
AddDestination: &pb.AddDestinationCommand{
|
||||
Name: "test",
|
||||
Url: "rtmp://rtmp.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "RemoveDestination",
|
||||
in: event.CommandRemoveDestination{
|
||||
URL: "rtmp://remove.example.com",
|
||||
},
|
||||
want: &pb.Command{
|
||||
CommandType: &pb.Command_RemoveDestination{
|
||||
RemoveDestination: &pb.RemoveDestinationCommand{
|
||||
Url: "rtmp://remove.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "StartDestination",
|
||||
in: event.CommandStartDestination{
|
||||
URL: "rtmp://start.example.com",
|
||||
},
|
||||
want: &pb.Command{
|
||||
CommandType: &pb.Command_StartDestination{
|
||||
StartDestination: &pb.StartDestinationCommand{
|
||||
Url: "rtmp://start.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "StopDestination",
|
||||
in: event.CommandStopDestination{
|
||||
URL: "rtmp://stop.example.com",
|
||||
},
|
||||
want: &pb.Command{
|
||||
CommandType: &pb.Command_StopDestination{
|
||||
StopDestination: &pb.StopDestinationCommand{
|
||||
Url: "rtmp://stop.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "CloseOtherInstance",
|
||||
in: event.CommandCloseOtherInstance{},
|
||||
want: &pb.Command{
|
||||
CommandType: &pb.Command_CloseOtherInstances{
|
||||
CloseOtherInstances: &pb.CloseOtherInstancesCommand{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Quit",
|
||||
in: event.CommandQuit{},
|
||||
want: &pb.Command{
|
||||
CommandType: &pb.Command_Quit{
|
||||
Quit: &pb.QuitCommand{},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
assert.Empty(t, gocmp.Diff(tc.want, protocol.CommandToProto(tc.in), protocmp.Transform()))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCommandFromProto(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
in *pb.Command
|
||||
want event.Command
|
||||
}{
|
||||
{
|
||||
name: "AddDestination",
|
||||
in: &pb.Command{
|
||||
CommandType: &pb.Command_AddDestination{
|
||||
AddDestination: &pb.AddDestinationCommand{
|
||||
Name: "test",
|
||||
Url: "rtmp://rtmp.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
want: event.CommandAddDestination{
|
||||
DestinationName: "test",
|
||||
URL: "rtmp://rtmp.example.com",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "RemoveDestination",
|
||||
in: &pb.Command{
|
||||
CommandType: &pb.Command_RemoveDestination{
|
||||
RemoveDestination: &pb.RemoveDestinationCommand{
|
||||
Url: "rtmp://remove.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
want: event.CommandRemoveDestination{URL: "rtmp://remove.example.com"},
|
||||
},
|
||||
{
|
||||
name: "StartDestination",
|
||||
in: &pb.Command{
|
||||
CommandType: &pb.Command_StartDestination{
|
||||
StartDestination: &pb.StartDestinationCommand{
|
||||
Url: "rtmp://start.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
want: event.CommandStartDestination{URL: "rtmp://start.example.com"},
|
||||
},
|
||||
{
|
||||
name: "StopDestination",
|
||||
in: &pb.Command{
|
||||
CommandType: &pb.Command_StopDestination{
|
||||
StopDestination: &pb.StopDestinationCommand{
|
||||
Url: "rtmp://stop.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
want: event.CommandStopDestination{URL: "rtmp://stop.example.com"},
|
||||
},
|
||||
{
|
||||
name: "CloseOtherInstance",
|
||||
in: &pb.Command{
|
||||
CommandType: &pb.Command_CloseOtherInstances{
|
||||
CloseOtherInstances: &pb.CloseOtherInstancesCommand{},
|
||||
},
|
||||
},
|
||||
want: event.CommandCloseOtherInstance{},
|
||||
},
|
||||
{
|
||||
name: "Quit",
|
||||
in: &pb.Command{
|
||||
CommandType: &pb.Command_Quit{
|
||||
Quit: &pb.QuitCommand{},
|
||||
},
|
||||
},
|
||||
want: event.CommandQuit{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
assert.Empty(t, gocmp.Diff(tc.want, protocol.CommandFromProto(tc.in)))
|
||||
})
|
||||
}
|
||||
}
|
@ -2,13 +2,14 @@ package protocol
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"git.netflux.io/rob/octoplex/internal/domain"
|
||||
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func containerToProto(c domain.Container) *pb.Container {
|
||||
func ContainerToProto(c domain.Container) *pb.Container {
|
||||
var errString string
|
||||
if c.Err != nil {
|
||||
errString = c.Err.Error()
|
||||
@ -20,6 +21,11 @@ func containerToProto(c domain.Container) *pb.Container {
|
||||
exitCode = &code
|
||||
}
|
||||
|
||||
var rxSince *timestamppb.Timestamp
|
||||
if !c.RxSince.IsZero() {
|
||||
rxSince = timestamppb.New(c.RxSince)
|
||||
}
|
||||
|
||||
return &pb.Container{
|
||||
Id: c.ID,
|
||||
Status: c.Status,
|
||||
@ -28,7 +34,7 @@ func containerToProto(c domain.Container) *pb.Container {
|
||||
MemoryUsageBytes: c.MemoryUsageBytes,
|
||||
RxRate: int32(c.RxRate),
|
||||
TxRate: int32(c.TxRate),
|
||||
RxSince: timestamppb.New(c.RxSince),
|
||||
RxSince: rxSince,
|
||||
ImageName: c.ImageName,
|
||||
PullStatus: c.PullStatus,
|
||||
PullProgress: c.PullProgress,
|
||||
@ -38,59 +44,65 @@ func containerToProto(c domain.Container) *pb.Container {
|
||||
Err: errString,
|
||||
}
|
||||
}
|
||||
func protoToContainer(pbCont *pb.Container) domain.Container {
|
||||
if pbCont == nil {
|
||||
|
||||
func ContainerFromProto(pbContainer *pb.Container) domain.Container {
|
||||
if pbContainer == nil {
|
||||
return domain.Container{}
|
||||
}
|
||||
|
||||
var exitCode *int
|
||||
if pbCont.ExitCode != nil {
|
||||
val := int(*pbCont.ExitCode)
|
||||
if pbContainer.ExitCode != nil {
|
||||
val := int(*pbContainer.ExitCode)
|
||||
exitCode = &val
|
||||
}
|
||||
|
||||
var err error
|
||||
if pbCont.Err != "" {
|
||||
err = errors.New(pbCont.Err)
|
||||
if pbContainer.Err != "" {
|
||||
err = errors.New(pbContainer.Err)
|
||||
}
|
||||
|
||||
var rxSince time.Time
|
||||
if pbContainer.RxSince != nil {
|
||||
rxSince = pbContainer.RxSince.AsTime()
|
||||
}
|
||||
|
||||
return domain.Container{
|
||||
ID: pbCont.Id,
|
||||
Status: pbCont.Status,
|
||||
HealthState: pbCont.HealthState,
|
||||
CPUPercent: pbCont.CpuPercent,
|
||||
MemoryUsageBytes: pbCont.MemoryUsageBytes,
|
||||
RxRate: int(pbCont.RxRate),
|
||||
TxRate: int(pbCont.TxRate),
|
||||
RxSince: pbCont.RxSince.AsTime(),
|
||||
ImageName: pbCont.ImageName,
|
||||
PullStatus: pbCont.PullStatus,
|
||||
PullProgress: pbCont.PullProgress,
|
||||
PullPercent: int(pbCont.PullPercent),
|
||||
RestartCount: int(pbCont.RestartCount),
|
||||
ID: pbContainer.Id,
|
||||
Status: pbContainer.Status,
|
||||
HealthState: pbContainer.HealthState,
|
||||
CPUPercent: pbContainer.CpuPercent,
|
||||
MemoryUsageBytes: pbContainer.MemoryUsageBytes,
|
||||
RxRate: int(pbContainer.RxRate),
|
||||
TxRate: int(pbContainer.TxRate),
|
||||
RxSince: rxSince,
|
||||
ImageName: pbContainer.ImageName,
|
||||
PullStatus: pbContainer.PullStatus,
|
||||
PullProgress: pbContainer.PullProgress,
|
||||
PullPercent: int(pbContainer.PullPercent),
|
||||
RestartCount: int(pbContainer.RestartCount),
|
||||
ExitCode: exitCode,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
|
||||
func destinationsToProto(inDests []domain.Destination) []*pb.Destination {
|
||||
func DestinationsToProto(inDests []domain.Destination) []*pb.Destination {
|
||||
destinations := make([]*pb.Destination, 0, len(inDests))
|
||||
for _, d := range inDests {
|
||||
destinations = append(destinations, destinationToProto(d))
|
||||
destinations = append(destinations, DestinationToProto(d))
|
||||
}
|
||||
return destinations
|
||||
}
|
||||
|
||||
func destinationToProto(d domain.Destination) *pb.Destination {
|
||||
func DestinationToProto(d domain.Destination) *pb.Destination {
|
||||
return &pb.Destination{
|
||||
Container: containerToProto(d.Container),
|
||||
Status: destinationStatusToProto(d.Status),
|
||||
Container: ContainerToProto(d.Container),
|
||||
Status: DestinationStatusToProto(d.Status),
|
||||
Name: d.Name,
|
||||
Url: d.URL,
|
||||
}
|
||||
}
|
||||
|
||||
func protoToDestinations(pbDests []*pb.Destination) []domain.Destination {
|
||||
func ProtoToDestinations(pbDests []*pb.Destination) []domain.Destination {
|
||||
if pbDests == nil {
|
||||
return nil
|
||||
}
|
||||
@ -101,7 +113,7 @@ func protoToDestinations(pbDests []*pb.Destination) []domain.Destination {
|
||||
continue
|
||||
}
|
||||
dests = append(dests, domain.Destination{
|
||||
Container: protoToContainer(pbDest.Container),
|
||||
Container: ContainerFromProto(pbDest.Container),
|
||||
Status: domain.DestinationStatus(pbDest.Status), // direct cast, same underlying int
|
||||
Name: pbDest.Name,
|
||||
URL: pbDest.Url,
|
||||
@ -109,7 +121,7 @@ func protoToDestinations(pbDests []*pb.Destination) []domain.Destination {
|
||||
}
|
||||
return dests
|
||||
}
|
||||
func destinationStatusToProto(s domain.DestinationStatus) pb.Destination_Status {
|
||||
func DestinationStatusToProto(s domain.DestinationStatus) pb.Destination_Status {
|
||||
switch s {
|
||||
case domain.DestinationStatusStarting:
|
||||
return pb.Destination_STATUS_STARTING
|
||||
|
222
internal/protocol/domain_test.go
Normal file
222
internal/protocol/domain_test.go
Normal file
@ -0,0 +1,222 @@
|
||||
package protocol_test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.netflux.io/rob/octoplex/internal/domain"
|
||||
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
|
||||
"git.netflux.io/rob/octoplex/internal/protocol"
|
||||
gocmp "github.com/google/go-cmp/cmp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/protobuf/testing/protocmp"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func TestContainerToProto(t *testing.T) {
|
||||
exitCode := 1
|
||||
ts := time.Unix(1234567890, 0)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
in domain.Container
|
||||
want *pb.Container
|
||||
}{
|
||||
{
|
||||
name: "complete",
|
||||
in: domain.Container{
|
||||
ID: "abc123",
|
||||
Status: "running",
|
||||
HealthState: "healthy",
|
||||
CPUPercent: 12.5,
|
||||
MemoryUsageBytes: 2048,
|
||||
RxRate: 100,
|
||||
TxRate: 200,
|
||||
RxSince: ts,
|
||||
ImageName: "nginx",
|
||||
PullStatus: "pulling",
|
||||
PullProgress: "50%",
|
||||
PullPercent: 50,
|
||||
RestartCount: 3,
|
||||
ExitCode: &exitCode,
|
||||
Err: errors.New("container error"),
|
||||
},
|
||||
want: &pb.Container{
|
||||
Id: "abc123",
|
||||
Status: "running",
|
||||
HealthState: "healthy",
|
||||
CpuPercent: 12.5,
|
||||
MemoryUsageBytes: 2048,
|
||||
RxRate: 100,
|
||||
TxRate: 200,
|
||||
RxSince: timestamppb.New(ts),
|
||||
ImageName: "nginx",
|
||||
PullStatus: "pulling",
|
||||
PullProgress: "50%",
|
||||
PullPercent: 50,
|
||||
RestartCount: 3,
|
||||
ExitCode: protoInt32(1),
|
||||
Err: "container error",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "zero values",
|
||||
in: domain.Container{},
|
||||
want: &pb.Container{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
assert.Empty(t, gocmp.Diff(tc.want, protocol.ContainerToProto(tc.in), protocmp.Transform()))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestContainerFromProto(t *testing.T) {
|
||||
ts := timestamppb.New(time.Now())
|
||||
exitCode := int32(2)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
in *pb.Container
|
||||
want domain.Container
|
||||
}{
|
||||
{
|
||||
name: "complete",
|
||||
in: &pb.Container{
|
||||
Id: "xyz789",
|
||||
Status: "exited",
|
||||
HealthState: "unhealthy",
|
||||
CpuPercent: 42.0,
|
||||
MemoryUsageBytes: 4096,
|
||||
RxRate: 300,
|
||||
TxRate: 400,
|
||||
RxSince: ts,
|
||||
ImageName: "redis",
|
||||
PullStatus: "complete",
|
||||
PullProgress: "100%",
|
||||
PullPercent: 100,
|
||||
RestartCount: 1,
|
||||
ExitCode: &exitCode,
|
||||
Err: "crash error",
|
||||
},
|
||||
want: domain.Container{
|
||||
ID: "xyz789",
|
||||
Status: "exited",
|
||||
HealthState: "unhealthy",
|
||||
CPUPercent: 42.0,
|
||||
MemoryUsageBytes: 4096,
|
||||
RxRate: 300,
|
||||
TxRate: 400,
|
||||
RxSince: ts.AsTime(),
|
||||
ImageName: "redis",
|
||||
PullStatus: "complete",
|
||||
PullProgress: "100%",
|
||||
PullPercent: 100,
|
||||
RestartCount: 1,
|
||||
ExitCode: protoInt(2),
|
||||
Err: errors.New("crash error"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "nil proto",
|
||||
in: nil,
|
||||
want: domain.Container{},
|
||||
},
|
||||
{
|
||||
name: "zero values",
|
||||
in: &pb.Container{},
|
||||
want: domain.Container{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := protocol.ContainerFromProto(tc.in)
|
||||
assert.Empty(
|
||||
t,
|
||||
gocmp.Diff(
|
||||
tc.want,
|
||||
got,
|
||||
gocmp.Comparer(compareErrorMessages),
|
||||
))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDestinationConversions(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
in domain.Destination
|
||||
want *pb.Destination
|
||||
}{
|
||||
{
|
||||
name: "basic destination",
|
||||
in: domain.Destination{
|
||||
Name: "dest1",
|
||||
URL: "rtmp://dest1",
|
||||
Status: domain.DestinationStatusLive,
|
||||
Container: domain.Container{ID: "c1"},
|
||||
},
|
||||
want: &pb.Destination{
|
||||
Name: "dest1",
|
||||
Url: "rtmp://dest1",
|
||||
Status: pb.Destination_STATUS_LIVE,
|
||||
Container: &pb.Container{Id: "c1"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
proto := protocol.DestinationToProto(tc.in)
|
||||
assert.Equal(t, tc.want.Name, proto.Name)
|
||||
assert.Equal(t, tc.want.Url, proto.Url)
|
||||
assert.Equal(t, tc.want.Status, proto.Status)
|
||||
require.NotNil(t, proto.Container)
|
||||
assert.Equal(t, tc.want.Container.Id, proto.Container.Id)
|
||||
|
||||
dests := protocol.ProtoToDestinations([]*pb.Destination{proto})
|
||||
assert.Len(t, dests, 1)
|
||||
assert.Equal(t, tc.in.Name, dests[0].Name)
|
||||
assert.Equal(t, tc.in.URL, dests[0].URL)
|
||||
assert.Equal(t, tc.in.Status, dests[0].Status)
|
||||
assert.Equal(t, tc.in.Container.ID, dests[0].Container.ID)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDestinationStatusToProto(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
in domain.DestinationStatus
|
||||
want pb.Destination_Status
|
||||
}{
|
||||
{"Starting", domain.DestinationStatusStarting, pb.Destination_STATUS_STARTING},
|
||||
{"Live", domain.DestinationStatusLive, pb.Destination_STATUS_LIVE},
|
||||
{"Off-air", domain.DestinationStatusOffAir, pb.Destination_STATUS_OFF_AIR},
|
||||
{"Unknown", domain.DestinationStatus(999), pb.Destination_STATUS_OFF_AIR},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
assert.Equal(t, tc.want, protocol.DestinationStatusToProto(tc.in))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func protoInt32(v int32) *int32 { return &v }
|
||||
func protoInt(v int) *int { return &v }
|
||||
|
||||
// compareErrorMessages compares two error messages for equality using only the
|
||||
// error message string.
|
||||
func compareErrorMessages(x, y error) bool {
|
||||
if x == nil || y == nil {
|
||||
return x == y
|
||||
}
|
||||
|
||||
return x.Error() == y.Error()
|
||||
}
|
@ -2,6 +2,7 @@ package protocol
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"git.netflux.io/rob/octoplex/internal/domain"
|
||||
"git.netflux.io/rob/octoplex/internal/event"
|
||||
@ -38,18 +39,23 @@ func EventToProto(ev event.Event) *pb.Event {
|
||||
}
|
||||
|
||||
func buildAppStateChangeEvent(evt event.AppStateChangedEvent) *pb.Event {
|
||||
var liveChangedAt *timestamppb.Timestamp
|
||||
if !evt.State.Source.LiveChangedAt.IsZero() {
|
||||
liveChangedAt = timestamppb.New(evt.State.Source.LiveChangedAt)
|
||||
}
|
||||
|
||||
return &pb.Event{
|
||||
EventType: &pb.Event_AppStateChanged{
|
||||
AppStateChanged: &pb.AppStateChangedEvent{
|
||||
AppState: &pb.AppState{
|
||||
Source: &pb.Source{
|
||||
Container: containerToProto(evt.State.Source.Container),
|
||||
Container: ContainerToProto(evt.State.Source.Container),
|
||||
Live: evt.State.Source.Live,
|
||||
LiveChangedAt: timestamppb.New(evt.State.Source.LiveChangedAt),
|
||||
LiveChangedAt: liveChangedAt,
|
||||
Tracks: evt.State.Source.Tracks,
|
||||
ExitReason: evt.State.Source.ExitReason,
|
||||
},
|
||||
Destinations: destinationsToProto(evt.State.Destinations),
|
||||
Destinations: DestinationsToProto(evt.State.Destinations),
|
||||
BuildInfo: &pb.BuildInfo{
|
||||
GoVersion: evt.State.BuildInfo.GoVersion,
|
||||
Version: evt.State.BuildInfo.Version,
|
||||
@ -171,16 +177,21 @@ func parseAppStateChangedEvent(evt *pb.AppStateChangedEvent) event.Event {
|
||||
panic("invalid AppStateChangedEvent")
|
||||
}
|
||||
|
||||
var liveChangedAt time.Time
|
||||
if evt.AppState.Source.LiveChangedAt != nil {
|
||||
liveChangedAt = evt.AppState.Source.LiveChangedAt.AsTime()
|
||||
}
|
||||
|
||||
return event.AppStateChangedEvent{
|
||||
State: domain.AppState{
|
||||
Source: domain.Source{
|
||||
Container: protoToContainer(evt.AppState.Source.Container),
|
||||
Container: ContainerFromProto(evt.AppState.Source.Container),
|
||||
Live: evt.AppState.Source.Live,
|
||||
LiveChangedAt: evt.AppState.Source.LiveChangedAt.AsTime(),
|
||||
LiveChangedAt: liveChangedAt,
|
||||
Tracks: evt.AppState.Source.Tracks,
|
||||
ExitReason: evt.AppState.Source.ExitReason,
|
||||
},
|
||||
Destinations: protoToDestinations(evt.AppState.Destinations),
|
||||
Destinations: ProtoToDestinations(evt.AppState.Destinations),
|
||||
BuildInfo: domain.BuildInfo{
|
||||
GoVersion: evt.AppState.BuildInfo.GoVersion,
|
||||
Version: evt.AppState.BuildInfo.Version,
|
||||
|
267
internal/protocol/event_test.go
Normal file
267
internal/protocol/event_test.go
Normal file
@ -0,0 +1,267 @@
|
||||
package protocol_test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"git.netflux.io/rob/octoplex/internal/domain"
|
||||
"git.netflux.io/rob/octoplex/internal/event"
|
||||
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
|
||||
"git.netflux.io/rob/octoplex/internal/protocol"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
gocmp "github.com/google/go-cmp/cmp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"google.golang.org/protobuf/testing/protocmp"
|
||||
)
|
||||
|
||||
func TestEventToProto(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
in event.Event
|
||||
want *pb.Event
|
||||
}{
|
||||
{
|
||||
name: "AppStateChanged",
|
||||
in: event.AppStateChangedEvent{
|
||||
State: domain.AppState{
|
||||
Source: domain.Source{
|
||||
Container: domain.Container{
|
||||
ID: "abc123",
|
||||
},
|
||||
Live: true,
|
||||
},
|
||||
Destinations: []domain.Destination{
|
||||
{
|
||||
Name: "dest1",
|
||||
URL: "rtmp://dest1.example.com",
|
||||
Container: domain.Container{
|
||||
ID: "bcd456",
|
||||
},
|
||||
},
|
||||
},
|
||||
BuildInfo: domain.BuildInfo{GoVersion: "go1.16", Version: "v1.0.0"},
|
||||
},
|
||||
},
|
||||
want: &pb.Event{
|
||||
EventType: &pb.Event_AppStateChanged{
|
||||
AppStateChanged: &pb.AppStateChangedEvent{
|
||||
AppState: &pb.AppState{
|
||||
Source: &pb.Source{
|
||||
Container: &pb.Container{
|
||||
Id: "abc123",
|
||||
},
|
||||
Live: true,
|
||||
},
|
||||
Destinations: []*pb.Destination{
|
||||
{
|
||||
Name: "dest1",
|
||||
Url: "rtmp://dest1.example.com",
|
||||
Container: &pb.Container{
|
||||
Id: "bcd456",
|
||||
},
|
||||
},
|
||||
},
|
||||
BuildInfo: &pb.BuildInfo{GoVersion: "go1.16", Version: "v1.0.0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "DestinationAdded",
|
||||
in: event.DestinationAddedEvent{URL: "rtmp://dest.example.com"},
|
||||
want: &pb.Event{
|
||||
EventType: &pb.Event_DestinationAdded{
|
||||
DestinationAdded: &pb.DestinationAddedEvent{
|
||||
Url: "rtmp://dest.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "AddDestinationFailed",
|
||||
in: event.AddDestinationFailedEvent{URL: "rtmp://fail.example.com", Err: errors.New("failed")},
|
||||
want: &pb.Event{
|
||||
EventType: &pb.Event_AddDestinationFailed{
|
||||
AddDestinationFailed: &pb.AddDestinationFailedEvent{
|
||||
Url: "rtmp://fail.example.com",
|
||||
Error: "failed",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "DestinationStreamExited",
|
||||
in: event.DestinationStreamExitedEvent{Name: "stream1", Err: errors.New("exit reason")},
|
||||
want: &pb.Event{
|
||||
EventType: &pb.Event_DestinationStreamExited{
|
||||
DestinationStreamExited: &pb.DestinationStreamExitedEvent{
|
||||
Name: "stream1",
|
||||
Error: "exit reason",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "FatalErrorOccurred",
|
||||
in: event.FatalErrorOccurredEvent{Message: "fatal error"},
|
||||
want: &pb.Event{
|
||||
EventType: &pb.Event_FatalError{
|
||||
FatalError: &pb.FatalErrorEvent{Message: "fatal error"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "OtherInstanceDetected",
|
||||
in: event.OtherInstanceDetectedEvent{},
|
||||
want: &pb.Event{
|
||||
EventType: &pb.Event_OtherInstanceDetected{
|
||||
OtherInstanceDetected: &pb.OtherInstanceDetectedEvent{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "MediaServerStarted",
|
||||
in: event.MediaServerStartedEvent{RTMPURL: "rtmp://media", RTMPSURL: "rtmps://media"},
|
||||
want: &pb.Event{
|
||||
EventType: &pb.Event_MediaServerStarted{
|
||||
MediaServerStarted: &pb.MediaServerStartedEvent{
|
||||
RtmpUrl: "rtmp://media",
|
||||
RtmpsUrl: "rtmps://media",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
assert.Empty(t, gocmp.Diff(tc.want, protocol.EventToProto(tc.in), protocmp.Transform()))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventFromProto(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
in *pb.Event
|
||||
want event.Event
|
||||
}{
|
||||
{
|
||||
name: "AppStateChanged",
|
||||
in: &pb.Event{
|
||||
EventType: &pb.Event_AppStateChanged{
|
||||
AppStateChanged: &pb.AppStateChangedEvent{
|
||||
AppState: &pb.AppState{
|
||||
Source: &pb.Source{
|
||||
Container: &pb.Container{Id: "abc123"},
|
||||
Live: true,
|
||||
},
|
||||
Destinations: []*pb.Destination{
|
||||
{
|
||||
Name: "dest1",
|
||||
Url: "rtmp://dest1.example.com",
|
||||
Container: &pb.Container{Id: "bcd456"},
|
||||
},
|
||||
},
|
||||
BuildInfo: &pb.BuildInfo{
|
||||
GoVersion: "go1.16",
|
||||
Version: "v1.0.0",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
want: event.AppStateChangedEvent{
|
||||
State: domain.AppState{
|
||||
Source: domain.Source{
|
||||
Container: domain.Container{ID: "abc123"},
|
||||
Live: true,
|
||||
},
|
||||
Destinations: []domain.Destination{
|
||||
{
|
||||
Name: "dest1",
|
||||
URL: "rtmp://dest1.example.com",
|
||||
Container: domain.Container{ID: "bcd456"},
|
||||
},
|
||||
},
|
||||
BuildInfo: domain.BuildInfo{
|
||||
GoVersion: "go1.16",
|
||||
Version: "v1.0.0",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "DestinationAdded",
|
||||
in: &pb.Event{
|
||||
EventType: &pb.Event_DestinationAdded{
|
||||
DestinationAdded: &pb.DestinationAddedEvent{
|
||||
Url: "rtmp://dest.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
want: event.DestinationAddedEvent{URL: "rtmp://dest.example.com"},
|
||||
},
|
||||
{
|
||||
name: "AddDestinationFailed",
|
||||
in: &pb.Event{
|
||||
EventType: &pb.Event_AddDestinationFailed{
|
||||
AddDestinationFailed: &pb.AddDestinationFailedEvent{
|
||||
Url: "rtmp://fail.example.com",
|
||||
Error: "failed",
|
||||
},
|
||||
},
|
||||
},
|
||||
want: event.AddDestinationFailedEvent{URL: "rtmp://fail.example.com", Err: errors.New("failed")},
|
||||
},
|
||||
{
|
||||
name: "DestinationStreamExited",
|
||||
in: &pb.Event{
|
||||
EventType: &pb.Event_DestinationStreamExited{
|
||||
DestinationStreamExited: &pb.DestinationStreamExitedEvent{
|
||||
Name: "stream1",
|
||||
Error: "exit reason",
|
||||
},
|
||||
},
|
||||
},
|
||||
want: event.DestinationStreamExitedEvent{Name: "stream1", Err: errors.New("exit reason")},
|
||||
},
|
||||
{
|
||||
name: "FatalErrorOccurred",
|
||||
in: &pb.Event{
|
||||
EventType: &pb.Event_FatalError{
|
||||
FatalError: &pb.FatalErrorEvent{Message: "fatal error"},
|
||||
},
|
||||
},
|
||||
want: event.FatalErrorOccurredEvent{Message: "fatal error"},
|
||||
},
|
||||
{
|
||||
name: "OtherInstanceDetected",
|
||||
in: &pb.Event{
|
||||
EventType: &pb.Event_OtherInstanceDetected{
|
||||
OtherInstanceDetected: &pb.OtherInstanceDetectedEvent{},
|
||||
},
|
||||
},
|
||||
want: event.OtherInstanceDetectedEvent{},
|
||||
},
|
||||
{
|
||||
name: "MediaServerStarted",
|
||||
in: &pb.Event{
|
||||
EventType: &pb.Event_MediaServerStarted{
|
||||
MediaServerStarted: &pb.MediaServerStartedEvent{
|
||||
RtmpUrl: "rtmp://media",
|
||||
RtmpsUrl: "rtmps://media",
|
||||
},
|
||||
},
|
||||
},
|
||||
want: event.MediaServerStartedEvent{RTMPURL: "rtmp://media", RTMPSURL: "rtmps://media"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
assert.Empty(t, cmp.Diff(tc.want, protocol.EventFromProto(tc.in), gocmp.Comparer(compareErrorMessages)))
|
||||
})
|
||||
}
|
||||
}
|
@ -275,6 +275,8 @@ func (a *App) handleCommand(
|
||||
}
|
||||
}()
|
||||
|
||||
// If the command is a syncCommand, we need to extract the command from it so
|
||||
// it can be type-switched against.
|
||||
if c, ok := cmd.(syncCommand); ok {
|
||||
cmd = c.Command
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user