package server

import (
	"context"
	"errors"
	"fmt"
	"io"

	"git.netflux.io/rob/octoplex/internal/event"
	pb "git.netflux.io/rob/octoplex/internal/generated/grpc"
	"git.netflux.io/rob/octoplex/internal/protocol"
	"github.com/sagikazarmark/slog-shim"
	"golang.org/x/sync/errgroup"
)

type Server struct {
	pb.UnimplementedInternalAPIServer

	dispatcher func(event.Command)
	bus        *event.Bus
	logger     *slog.Logger
}

func New(
	dispatcher func(event.Command),
	bus *event.Bus,
	logger *slog.Logger,
) *Server {
	return &Server{
		dispatcher: dispatcher,
		bus:        bus,
		logger:     logger.With("component", "server"),
	}
}

func (s *Server) Communicate(stream pb.InternalAPI_CommunicateServer) error {
	g, ctx := errgroup.WithContext(stream.Context())

	g.Go(func() error {
		eventsC := s.bus.Register()
		defer s.bus.Deregister(eventsC)

		for {
			select {
			case evt := <-eventsC:
				if err := stream.Send(&pb.Envelope{Payload: &pb.Envelope_Event{Event: protocol.EventToProto(evt)}}); err != nil {
					return fmt.Errorf("send event: %w", err)
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	g.Go(func() error {
		for {
			in, err := stream.Recv()
			if err == io.EOF {
				s.logger.Info("Client disconnected")
				return err
			}

			if err != nil {
				return fmt.Errorf("receive message: %w", err)
			}

			switch pbCmd := in.Payload.(type) {
			case *pb.Envelope_Command:
				cmd := protocol.CommandFromProto(pbCmd.Command)
				s.logger.Info("Received command", "command", cmd.Name())
				s.dispatcher(cmd)
			default:
				return fmt.Errorf("expected command but got: %T", pbCmd)
			}
		}
	})

	if err := g.Wait(); err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
		s.logger.Error("Client stream closed with error", "err", err)
		return fmt.Errorf("errgroup.Wait: %w", err)
	}

	s.logger.Info("Client stream closed")

	return nil
}