package mediaserver

import (
	"bytes"
	"cmp"
	"context"
	"crypto/rand"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"time"

	typescontainer "github.com/docker/docker/api/types/container"
	"github.com/docker/go-connections/nat"
	"gopkg.in/yaml.v3"

	"git.netflux.io/rob/octoplex/internal/container"
	"git.netflux.io/rob/octoplex/internal/domain"
)

// StreamKey is the stream key for the media server, which forms the RTMP path
// component and can be used as a basic form of authentication.
//
// It defaults to "live", in which case the full RTMP URL would be:
// `rtmp://localhost:1935/live`.
type StreamKey string

const (
	defaultUpdateStateInterval           = 5 * time.Second                           // default interval to update the state of the media server
	defaultAPIPort                       = 9997                                      // default API host port for the media server
	defaultRTMPIP                        = "127.0.0.1"                               // default RTMP host IP, bound to localhost for security
	defaultRTMPPort                      = 1935                                      // default RTMP host port for the media server
	defaultRTMPSPort                     = 1936                                      // default RTMPS host port for the media server
	defaultHost                          = "localhost"                               // default mediaserver host name
	defaultChanSize                      = 64                                        // default channel size for asynchronous non-error channels
	imageNameMediaMTX                    = "ghcr.io/rfwatson/mediamtx-alpine:latest" // image name for mediamtx
	defaultStreamKey           StreamKey = "live"                                    // Default stream key. See [StreamKey].
	componentName                        = "mediaserver"                             // component name, mostly used for Docker labels
	httpClientTimeout                    = time.Second                               // timeout for outgoing HTTP client requests
	configPath                           = "/mediamtx.yml"                           // path to the media server config file
	tlsInternalCertPath                  = "/etc/tls-internal.crt"                   // path to the internal TLS cert
	tlsInternalKeyPath                   = "/etc/tls-internal.key"                   // path to the internal TLS key
	tlsCertPath                          = "/etc/tls.crt"                            // path to the custom TLS cert
	tlsKeyPath                           = "/etc/tls.key"                            // path to the custom TLS key
)

// action is an action to be performed by the actor.
type action func()

// Actor is responsible for managing the media server.
type Actor struct {
	actorC              chan action
	stateC              chan domain.Source
	chanSize            int
	containerClient     *container.Client
	rtmpAddr            domain.NetAddr
	rtmpsAddr           domain.NetAddr
	apiPort             int
	host                string
	streamKey           StreamKey
	updateStateInterval time.Duration
	pass                string         // password for the media server
	keyPairInternal     domain.KeyPair // TLS key pair for the media server
	keyPairCustom       domain.KeyPair // TLS key pair for the media server
	logger              *slog.Logger
	apiClient           *http.Client

	// mutable state
	state *domain.Source
}

// NewActorParams contains the parameters for building a new media server
// actor.
type NewActorParams struct {
	RTMPAddr            OptionalNetAddr // defaults to disabled, or 127.0.0.1:1935
	RTMPSAddr           OptionalNetAddr // defaults to disabled, or 127.0.0.1:1936
	APIPort             int             // defaults to 9997
	Host                string          // defaults to "localhost"
	TLSCertPath         string          // defaults to empty
	TLSKeyPath          string          // defaults to empty
	StreamKey           StreamKey       // defaults to "live"
	ChanSize            int             // defaults to 64
	UpdateStateInterval time.Duration   // defaults to 5 seconds
	ContainerClient     *container.Client
	Logger              *slog.Logger
}

// OptionalNetAddr is a wrapper around domain.NetAddr that indicates whether it
// is enabled or not.
type OptionalNetAddr struct {
	domain.NetAddr

	Enabled bool
}

// NewActor creates a new media server actor.
//
// Callers must consume the state channel exposed via [C].
func NewActor(ctx context.Context, params NewActorParams) (_ *Actor, err error) {
	dnsNames := []string{"localhost"}
	if params.Host != "" {
		dnsNames = append(dnsNames, params.Host)
	}

	keyPairInternal, err := generateTLSCert(dnsNames...)
	if err != nil {
		return nil, fmt.Errorf("generate TLS cert: %w", err)
	}

	var keyPairCustom domain.KeyPair
	if params.TLSCertPath != "" {
		keyPairCustom.Cert, err = os.ReadFile(params.TLSCertPath)
		if err != nil {
			return nil, fmt.Errorf("read TLS cert: %w", err)
		}
		keyPairCustom.Key, err = os.ReadFile(params.TLSKeyPath)
		if err != nil {
			return nil, fmt.Errorf("read TLS key: %w", err)
		}
	}

	// TODO: custom cert for API?
	apiClient, err := buildAPIClient(keyPairInternal.Cert)
	if err != nil {
		return nil, fmt.Errorf("build API client: %w", err)
	}

	chanSize := cmp.Or(params.ChanSize, defaultChanSize)
	return &Actor{
		rtmpAddr:            toRTMPAddr(params.RTMPAddr, defaultRTMPPort),
		rtmpsAddr:           toRTMPAddr(params.RTMPSAddr, defaultRTMPSPort),
		apiPort:             cmp.Or(params.APIPort, defaultAPIPort),
		host:                cmp.Or(params.Host, defaultHost),
		streamKey:           cmp.Or(params.StreamKey, defaultStreamKey),
		updateStateInterval: cmp.Or(params.UpdateStateInterval, defaultUpdateStateInterval),
		keyPairInternal:     keyPairInternal,
		keyPairCustom:       keyPairCustom,
		pass:                generatePassword(),
		actorC:              make(chan action, chanSize),
		state:               new(domain.Source),
		stateC:              make(chan domain.Source, chanSize),
		chanSize:            chanSize,
		containerClient:     params.ContainerClient,
		logger:              params.Logger,
		apiClient:           apiClient,
	}, nil
}

func (a *Actor) Start(ctx context.Context) error {
	var portSpecs []string
	portSpecs = append(portSpecs, fmt.Sprintf("127.0.0.1:%d:9997", a.apiPort))
	if !a.rtmpAddr.IsZero() {
		portSpecs = append(portSpecs, fmt.Sprintf("%s:%d:%d", a.rtmpAddr.IP, a.rtmpAddr.Port, 1935))
	}
	if !a.rtmpsAddr.IsZero() {
		portSpecs = append(portSpecs, fmt.Sprintf("%s:%d:%d", a.rtmpsAddr.IP, a.rtmpsAddr.Port, 1936))
	}
	exposedPorts, portBindings, err := nat.ParsePortSpecs(portSpecs)
	if err != nil {
		return fmt.Errorf("parse port specs: %w", err)
	}

	cfg, err := a.buildServerConfig()
	if err != nil {
		return fmt.Errorf("build server config: %w", err)
	}

	copyFiles := []container.CopyFileConfig{
		{
			Path:    configPath,
			Payload: bytes.NewReader(cfg),
			Mode:    0600,
		},
		{
			Path:    tlsInternalCertPath,
			Payload: bytes.NewReader(a.keyPairInternal.Cert),
			Mode:    0600,
		},
		{
			Path:    tlsInternalKeyPath,
			Payload: bytes.NewReader(a.keyPairInternal.Key),
			Mode:    0600,
		},
		{
			Path:    "/etc/healthcheckopts.txt",
			Payload: bytes.NewReader([]byte(fmt.Sprintf("--user api:%s", a.pass))),
			Mode:    0600,
		},
	}

	if !a.keyPairCustom.IsZero() {
		copyFiles = append(
			copyFiles,
			container.CopyFileConfig{
				Path:    tlsCertPath,
				Payload: bytes.NewReader(a.keyPairCustom.Cert),
				Mode:    0600,
			},
			container.CopyFileConfig{
				Path:    tlsKeyPath,
				Payload: bytes.NewReader(a.keyPairCustom.Key),
				Mode:    0600,
			},
		)
	}

	args := []any{"host", a.host}
	if a.rtmpAddr.IsZero() {
		args = append(args, "rtmp.enabled", false)
	} else {
		args = append(args, "rtmp.enabled", true, "rtmp.bind_addr", a.rtmpAddr.IP, "rtmp.bind_port", a.rtmpAddr.Port)
	}
	if a.rtmpsAddr.IsZero() {
		args = append(args, "rtmps.enabled", false)
	} else {
		args = append(args, "rtmps.enabled", true, "rtmps.bind_addr", a.rtmpsAddr.IP, "rtmps.bind_port", a.rtmpsAddr.Port)
	}
	a.logger.Info("Starting media server", args...)

	containerStateC, errC := a.containerClient.RunContainer(
		ctx,
		container.RunContainerParams{
			Name:     componentName,
			ChanSize: a.chanSize,
			ContainerConfig: &typescontainer.Config{
				Image:    imageNameMediaMTX,
				Hostname: "mediaserver",
				Labels:   map[string]string{container.LabelComponent: componentName},
				Healthcheck: &typescontainer.HealthConfig{
					Test: []string{
						"CMD",
						"curl",
						"--fail",
						"--silent",
						"--cacert", "/etc/tls-internal.crt",
						"--config", "/etc/healthcheckopts.txt",
						a.healthCheckURL(),
					},
					Interval:      time.Second * 10,
					StartPeriod:   time.Second * 2,
					StartInterval: time.Second * 2,
					Retries:       2,
				},
				ExposedPorts: exposedPorts,
			},
			HostConfig: &typescontainer.HostConfig{
				NetworkMode:  "default",
				PortBindings: portBindings,
			},
			NetworkCountConfig: container.NetworkCountConfig{Rx: "eth0", Tx: "eth1"},
			Logs:               container.LogConfig{Stdout: true},
			CopyFiles:          copyFiles,
		},
	)

	go a.actorLoop(ctx, containerStateC, errC)

	return nil
}

func (a *Actor) buildServerConfig() ([]byte, error) {
	// NOTE: Regardless of the user configuration (which mostly affects exposed
	// ports and UI rendering) plain RTMP must be enabled at the container level,
	// for internal connections.
	var encryptionString string
	if a.rtmpsAddr.IsZero() {
		encryptionString = "no"
	} else {
		encryptionString = "optional"
	}

	var certPath, keyPath string
	if a.keyPairCustom.IsZero() {
		certPath = tlsInternalCertPath
		keyPath = tlsInternalKeyPath
	} else {
		certPath = tlsCertPath
		keyPath = tlsKeyPath
	}

	return yaml.Marshal(
		Config{
			LogLevel:        "debug",
			LogDestinations: []string{"stdout"},
			AuthMethod:      "internal",
			AuthInternalUsers: []User{
				{
					User: "any",
					IPs:  []string{}, // any IP
					Permissions: []UserPermission{
						{Action: "publish"},
					},
				},
				{
					User: "api",
					Pass: a.pass,
					IPs:  []string{}, // any IP
					Permissions: []UserPermission{
						{Action: "read"},
					},
				},
				{
					User:        "api",
					Pass:        a.pass,
					IPs:         []string{}, // any IP
					Permissions: []UserPermission{{Action: "api"}},
				},
			},
			RTMP:           true,
			RTMPEncryption: encryptionString,
			RTMPAddress:    ":1935",
			RTMPSAddress:   ":1936",
			RTMPServerCert: certPath,
			RTMPServerKey:  keyPath,
			API:            true,
			APIEncryption:  true,
			APIServerCert:  tlsInternalCertPath,
			APIServerKey:   tlsInternalKeyPath,
			Paths: map[string]Path{
				string(a.streamKey): {Source: "publisher"},
			},
		},
	)
}

// C returns a channel that will receive the current state of the media server.
func (s *Actor) C() <-chan domain.Source {
	return s.stateC
}

// State returns the current state of the media server.
//
// Blocks if the actor is not started yet.
func (s *Actor) State() domain.Source {
	resultChan := make(chan domain.Source)
	s.actorC <- func() {
		resultChan <- *s.state
	}
	return <-resultChan
}

// Close closes the media server actor.
func (s *Actor) Close() error {
	if err := s.containerClient.RemoveContainers(
		context.Background(),
		s.containerClient.ContainersWithLabels(map[string]string{container.LabelComponent: componentName}),
	); err != nil {
		return fmt.Errorf("remove containers: %w", err)
	}

	return nil
}

// actorLoop is the main loop of the media server actor. It exits when the
// actor is closed, or the parent context is cancelled.
func (s *Actor) actorLoop(ctx context.Context, containerStateC <-chan domain.Container, errC <-chan error) {
	updateStateT := time.NewTicker(s.updateStateInterval)
	defer updateStateT.Stop()

	sendState := func() { s.stateC <- *s.state }

	for {
		select {
		case containerState := <-containerStateC:
			s.state.Container = containerState

			if s.state.Container.Status == domain.ContainerStatusExited {
				updateStateT.Stop()
				s.handleContainerExit(nil)
			}

			sendState()

			continue
		case err, ok := <-errC:
			if !ok {
				// The loop continues until the actor is closed.
				// Avoid receiving duplicate close signals.
				errC = nil
				continue
			}

			if err != nil {
				s.logger.Error("Error from container client", "err", err, "id", shortID(s.state.Container.ID))
			}

			updateStateT.Stop()
			s.handleContainerExit(err)

			sendState()
		case <-updateStateT.C:
			path, err := fetchPath(s.pathURL(string(s.streamKey)), s.apiClient)
			if err != nil {
				s.logger.Error("Error fetching path", "err", err)
				continue
			}

			if path.Ready != s.state.Live {
				s.state.Live = path.Ready
				s.state.LiveChangedAt = time.Now()
				s.state.Tracks = path.Tracks
				sendState()
			}
		case action, ok := <-s.actorC:
			if !ok {
				continue
			}
			action()
		case <-ctx.Done():
			return
		}
	}
}

// TODO: refactor to use container.Err?
func (s *Actor) handleContainerExit(err error) {
	if s.state.Container.ExitCode != nil {
		s.state.ExitReason = fmt.Sprintf("Server process exited with code %d.", *s.state.Container.ExitCode)
	} else {
		s.state.ExitReason = "Server process exited unexpectedly."
	}
	if err != nil {
		s.state.ExitReason += "\n\n" + err.Error()
	}

	s.state.Live = false
}

// RTMPURL returns the RTMP URL for the media server, accessible from the host.
func (s *Actor) RTMPURL() string {
	if s.rtmpAddr.IsZero() {
		return ""
	}

	return fmt.Sprintf("rtmp://%s:%d/%s", s.host, s.rtmpAddr.Port, s.streamKey)
}

// RTMPSURL returns the RTMPS URL for the media server, accessible from the host.
func (s *Actor) RTMPSURL() string {
	if s.rtmpsAddr.IsZero() {
		return ""
	}

	return fmt.Sprintf("rtmps://%s:%d/%s", s.host, s.rtmpsAddr.Port, s.streamKey)
}

// RTMPInternalURL returns the RTMP URL for the media server, accessible from
// the app network.
func (s *Actor) RTMPInternalURL() string {
	// Container port, not host port:
	return fmt.Sprintf("rtmp://mediaserver:1935/%s?user=api&pass=%s", s.streamKey, s.pass)
}

// pathURL returns the URL for fetching a path, accessible from the host.
func (s *Actor) pathURL(path string) string {
	return fmt.Sprintf("https://api:%s@localhost:%d/v3/paths/get/%s", s.pass, s.apiPort, path)
}

// healthCheckURL returns the URL for the health check, accessible from the
// container. It is logged to Docker's events log so must not include
// credentials.
func (s *Actor) healthCheckURL() string {
	return fmt.Sprintf("https://localhost:%d/v3/paths/list", s.apiPort)
}

// shortID returns the first 12 characters of the given container ID.
func shortID(id string) string {
	if len(id) < 12 {
		return id
	}
	return id[:12]
}

// generatePassword securely generates a random password suitable for
// authenticating media server endpoints.
func generatePassword() string {
	const lenBytes = 32
	p := make([]byte, lenBytes)
	_, _ = rand.Read(p)
	return fmt.Sprintf("%x", []byte(p))
}

// toRTMPAddr builds a domain.NetAddr from an OptionalNetAddr, with default
// values set to RTMP default bind config if needed. If the OptionalNetAddr is
// not enabled, a zero value is returned.
func toRTMPAddr(a OptionalNetAddr, defaultPort int) domain.NetAddr {
	if !a.Enabled {
		return domain.NetAddr{}
	}

	return domain.NetAddr{
		IP:   cmp.Or(a.IP, defaultRTMPIP),
		Port: cmp.Or(a.Port, defaultPort),
	}
}