87 lines
1.9 KiB
Go

package server
import (
"context"
"errors"
"fmt"
"io"
"git.netflux.io/rob/octoplex/internal/event"
pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
"git.netflux.io/rob/octoplex/internal/protocol"
"github.com/sagikazarmark/slog-shim"
"golang.org/x/sync/errgroup"
)
type Server struct {
pb.UnimplementedInternalAPIServer
dispatcher func(event.Command)
bus *event.Bus
logger *slog.Logger
}
func New(
dispatcher func(event.Command),
bus *event.Bus,
logger *slog.Logger,
) *Server {
return &Server{
dispatcher: dispatcher,
bus: bus,
logger: logger.With("component", "server"),
}
}
func (s *Server) Communicate(stream pb.InternalAPI_CommunicateServer) error {
g, ctx := errgroup.WithContext(stream.Context())
g.Go(func() error {
eventsC := s.bus.Register()
defer s.bus.Deregister(eventsC)
for {
select {
case evt := <-eventsC:
if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: protocol.EventToProto(evt)}}); err != nil {
return fmt.Errorf("send event: %w", err)
}
case <-ctx.Done():
return ctx.Err()
}
}
})
g.Go(func() error {
for {
in, err := stream.Recv()
if err == io.EOF {
s.logger.Info("Client disconnected")
return err
}
if err != nil {
return fmt.Errorf("receive message: %w", err)
}
switch pbCmd := in.Payload.(type) {
case *pb.Envelope_Command:
cmd := protocol.CommandFromProto(pbCmd.Command)
s.logger.Info("Received command", "command", cmd.Name())
s.dispatcher(cmd)
default:
return fmt.Errorf("expected command but got: %T", pbCmd)
}
}
})
if err := g.Wait(); err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
s.logger.Error("Client stream closed with error", "err", err)
return fmt.Errorf("errgroup.Wait: %w", err)
}
s.logger.Info("Client stream closed")
return nil
}