WIP: refactor/api #1

Draft
rob wants to merge 26 commits from refactor/api into main
8 changed files with 1971 additions and 0 deletions
Showing only changes of commit f67a456d1e - Show all commits

4
go.mod
View File

@ -12,6 +12,8 @@ require (
github.com/stretchr/testify v1.10.0
github.com/testcontainers/testcontainers-go v0.35.0
golang.design/x/clipboard v0.7.0
google.golang.org/grpc v1.69.4
google.golang.org/protobuf v1.36.3
gopkg.in/yaml.v3 v3.0.1
)
@ -95,12 +97,14 @@ require (
golang.org/x/image v0.26.0 // indirect
golang.org/x/mobile v0.0.0-20250408133729-978277e7eaf7 // indirect
golang.org/x/mod v0.24.0 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/sync v0.13.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/term v0.31.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/time v0.9.0 // indirect
golang.org/x/tools v0.32.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

2
go.sum
View File

@ -52,6 +52,8 @@ github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiU
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=

View File

@ -5,7 +5,9 @@ import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"net"
"slices"
"time"
@ -13,10 +15,13 @@ import (
"git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc/proto"
"git.netflux.io/rob/octoplex/internal/mediaserver"
"git.netflux.io/rob/octoplex/internal/replicator"
"git.netflux.io/rob/octoplex/internal/server"
"git.netflux.io/rob/octoplex/internal/terminal"
"github.com/docker/docker/client"
"google.golang.org/grpc"
)
// App is an instance of the app.
@ -120,6 +125,19 @@ func (a *App) Run(ctx context.Context) error {
<-a.dispatchC
}
const grpcAddr = ":50051"
lis, err := net.Listen("tcp", grpcAddr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
grpcDone := make(chan error, 1)
pb.RegisterInternalAPIServer(grpcServer, server.Server{})
go func() {
a.logger.Info("gRPC server started", "addr", grpcAddr)
grpcDone <- grpcServer.Serve(lis)
}()
containerClient, err := container.NewClient(ctx, a.dockerClient, a.logger.With("component", "container_client"))
if err != nil {
err = fmt.Errorf("create container client: %w", err)
@ -192,6 +210,9 @@ func (a *App) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case grpcErr := <-grpcDone:
a.logger.Error("gRPC server exited", "err", grpcErr)
return grpcErr
case <-startMediaServerC:
if err = srv.Start(ctx); err != nil {
return fmt.Errorf("start mediaserver: %w", err)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,135 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.6
// source: proto/api.proto
package proto
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// InternalAPIClient is the client API for InternalAPI service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type InternalAPIClient interface {
Communicate(ctx context.Context, opts ...grpc.CallOption) (InternalAPI_CommunicateClient, error)
}
type internalAPIClient struct {
cc grpc.ClientConnInterface
}
func NewInternalAPIClient(cc grpc.ClientConnInterface) InternalAPIClient {
return &internalAPIClient{cc}
}
func (c *internalAPIClient) Communicate(ctx context.Context, opts ...grpc.CallOption) (InternalAPI_CommunicateClient, error) {
stream, err := c.cc.NewStream(ctx, &InternalAPI_ServiceDesc.Streams[0], "/api.InternalAPI/Communicate", opts...)
if err != nil {
return nil, err
}
x := &internalAPICommunicateClient{stream}
return x, nil
}
type InternalAPI_CommunicateClient interface {
Send(*Envelope) error
Recv() (*Envelope, error)
grpc.ClientStream
}
type internalAPICommunicateClient struct {
grpc.ClientStream
}
func (x *internalAPICommunicateClient) Send(m *Envelope) error {
return x.ClientStream.SendMsg(m)
}
func (x *internalAPICommunicateClient) Recv() (*Envelope, error) {
m := new(Envelope)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// InternalAPIServer is the server API for InternalAPI service.
// All implementations should embed UnimplementedInternalAPIServer
// for forward compatibility
type InternalAPIServer interface {
Communicate(InternalAPI_CommunicateServer) error
}
// UnimplementedInternalAPIServer should be embedded to have forward compatible implementations.
type UnimplementedInternalAPIServer struct {
}
func (UnimplementedInternalAPIServer) Communicate(InternalAPI_CommunicateServer) error {
return status.Errorf(codes.Unimplemented, "method Communicate not implemented")
}
// UnsafeInternalAPIServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to InternalAPIServer will
// result in compilation errors.
type UnsafeInternalAPIServer interface {
mustEmbedUnimplementedInternalAPIServer()
}
func RegisterInternalAPIServer(s grpc.ServiceRegistrar, srv InternalAPIServer) {
s.RegisterService(&InternalAPI_ServiceDesc, srv)
}
func _InternalAPI_Communicate_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(InternalAPIServer).Communicate(&internalAPICommunicateServer{stream})
}
type InternalAPI_CommunicateServer interface {
Send(*Envelope) error
Recv() (*Envelope, error)
grpc.ServerStream
}
type internalAPICommunicateServer struct {
grpc.ServerStream
}
func (x *internalAPICommunicateServer) Send(m *Envelope) error {
return x.ServerStream.SendMsg(m)
}
func (x *internalAPICommunicateServer) Recv() (*Envelope, error) {
m := new(Envelope)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// InternalAPI_ServiceDesc is the grpc.ServiceDesc for InternalAPI service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var InternalAPI_ServiceDesc = grpc.ServiceDesc{
ServiceName: "api.InternalAPI",
HandlerType: (*InternalAPIServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "Communicate",
Handler: _InternalAPI_Communicate_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "proto/api.proto",
}

View File

@ -0,0 +1,7 @@
package server
import pb "git.netflux.io/rob/octoplex/internal/generated/grpc/proto"
type Server struct {
pb.UnimplementedInternalAPIServer
}

View File

@ -40,3 +40,9 @@ description = "Generate mocks"
dir = "{{cwd}}"
run = "go tool mockery"
alias = "m"
[tasks.generate_proto]
description = "Generate gRPC files from proto"
dir = "{{cwd}}"
run = "protoc --go_out=./generated/grpc --go-grpc_out=./generated/grpc proto/*.proto"
alias = "p"

105
proto/api.proto Normal file
View File

@ -0,0 +1,105 @@
syntax = "proto3";
package api;
option go_package = "git.netflux.io/rob/octoplex/internal/generated/grpc/proto";
service InternalAPI {
rpc Communicate(stream Envelope) returns (stream Envelope);
}
message Envelope {
oneof payload {
Command command = 1;
Event event = 2;
}
}
message Command {
oneof command_type {
AddDestinationCommand add_destinaion = 1;
RemoveDestinationCommand remove_destination = 2;
StartDestinationCommand start_destination = 3;
StopDestinationCommand stop_destination = 4;
CloseOtherInstancesCommand close_other_instances = 5;
QuitCommand quit = 6;
}
}
message AddDestinationCommand {
string name = 1;
string url = 2;
}
message RemoveDestinationCommand {
string url= 1;
}
message StartDestinationCommand {
string url = 1;
}
message StopDestinationCommand {
string url = 1;
}
message CloseOtherInstancesCommand {}
message QuitCommand {}
message Event {
oneof event_type {
AppStateChangedEvent app_state_changed = 1;
DestinationStreamExitedEvent destination_stream_exited = 2;
DestinationAddedEvent destination_added = 3;
AddDestinationFailedEvent add_destination_failed = 4;
DestinationRemovedEvent destination_removed = 5;
RemoveDestinationFailedEvent remove_destination_failed = 6;
StartDestinationFailedEvent start_destination_failed = 7;
MediaServerStartedEvent media_server_started = 8;
OtherInstanceDetectedEvent other_instance_detected = 9;
FatalErrorEvent fatal_error = 10;
}
}
// TODO: complete
message AppStateChangedEvent {}
message DestinationStreamExitedEvent {
string name = 1;
string error = 2;
}
message DestinationAddedEvent {
string url = 1;
}
message AddDestinationFailedEvent {
string url = 1;
string error = 2;
}
message DestinationRemovedEvent {
string url = 1;
}
message RemoveDestinationFailedEvent {
string url = 1;
string error = 2;
}
message StartDestinationFailedEvent {
string url = 1;
string error = 2;
}
message MediaServerStartedEvent {
string rtmp_url = 1;
string rtmps_url = 2;
}
message OtherInstanceDetectedEvent {}
message FatalErrorEvent {
string error = 1;
}