2025-02-12 23:50:27 +01:00

232 lines
7.1 KiB
Go

package mediaserver
import (
"cmp"
"context"
"fmt"
"log/slog"
"net/http"
"strconv"
"time"
typescontainer "github.com/docker/docker/api/types/container"
"github.com/docker/go-connections/nat"
"git.netflux.io/rob/termstream/container"
"git.netflux.io/rob/termstream/domain"
)
const (
	// componentName identifies this component. It is used as the container
	// name and as the value of the "component" Docker label.
	componentName = "mediaserver"
	// imageNameMediaMTX is the container image used to run mediamtx.
	imageNameMediaMTX = "netfluxio/mediamtx-alpine:latest"
	// rtmpPath is the path component of the media server's RTMP URLs.
	rtmpPath = "live"

	// defaultAPIPort is the host port for the media server API when none is
	// configured.
	defaultAPIPort = 9997
	// defaultRTMPPort is the host port for RTMP when none is configured.
	defaultRTMPPort = 1935
	// defaultChanSize is the buffer size of asynchronous non-error channels
	// when none is configured.
	defaultChanSize = 64

	// defaultFetchIngressStateInterval is how often the media server state is
	// fetched when no interval is configured.
	defaultFetchIngressStateInterval = 5 * time.Second
	// httpClientTimeout bounds each outgoing HTTP client request.
	httpClientTimeout = time.Second
)
// action is a unit of work submitted to the actor over its action channel.
// The actor loop invokes each received action itself, which is how State
// reads the actor's mutable state from the loop goroutine.
type action func()
// Actor is responsible for managing the media server.
type Actor struct {
actorC chan action
stateC chan domain.Source
containerClient *container.Client
apiPort int
rtmpPort int
fetchIngressStateInterval time.Duration
logger *slog.Logger
httpClient *http.Client
// mutable state
state *domain.Source
}
// StartActorParams holds the configuration accepted by StartActor. Zero
// values of the numeric and duration fields select the documented defaults.
type StartActorParams struct {
	// APIPort is the host port bound to the media server API. Defaults to
	// 9997.
	APIPort int
	// RTMPPort is the host port bound to the media server RTMP listener.
	// Defaults to 1935.
	RTMPPort int
	// ChanSize is the buffer size of the actor's channels. Defaults to 64.
	ChanSize int
	// FetchIngressStateInterval is the polling interval for the media server
	// state. Defaults to 5 seconds.
	FetchIngressStateInterval time.Duration
	// ContainerClient is used to run and remove the media server container.
	ContainerClient *container.Client
	// Logger is used by the actor for logging.
	Logger *slog.Logger
}
// StartActor starts a new media server actor.
//
// It runs the media server container through params.ContainerClient, binding
// the configured host API and RTMP ports to the container ports 9997 and 1935
// respectively, and then starts the actor loop in its own goroutine.
// Zero-valued ports, channel size and fetch interval fall back to the package
// defaults.
//
// StartActor has no error return. If the port specs cannot be parsed (for
// example because a configured port is out of range) the error is logged and
// the container is started with whatever bindings were produced.
//
// Callers must consume the state channel exposed via [Actor.C].
func StartActor(ctx context.Context, params StartActorParams) *Actor {
	chanSize := cmp.Or(params.ChanSize, defaultChanSize)

	actor := &Actor{
		apiPort:                   cmp.Or(params.APIPort, defaultAPIPort),
		rtmpPort:                  cmp.Or(params.RTMPPort, defaultRTMPPort),
		fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval),
		actorC:                    make(chan action, chanSize),
		state:                     new(domain.Source),
		stateC:                    make(chan domain.Source, chanSize),
		containerClient:           params.ContainerClient,
		logger:                    params.Logger,
		httpClient:                &http.Client{Timeout: httpClientTimeout},
	}

	// Port specs take the form "hostPort:containerPort".
	portSpecs := []string{
		strconv.Itoa(actor.apiPort) + ":9997",
		strconv.Itoa(actor.rtmpPort) + ":1935",
	}
	exposedPorts, portBindings, err := nat.ParsePortSpecs(portSpecs)
	if err != nil && actor.logger != nil {
		// Surface the misconfiguration instead of silently starting a
		// container whose ports are not bound.
		actor.logger.Error("Error parsing port specs", "error", err, "specs", portSpecs)
	}

	containerStateC, errC := params.ContainerClient.RunContainer(
		ctx,
		container.RunContainerParams{
			Name:     componentName,
			ChanSize: chanSize,
			ContainerConfig: &typescontainer.Config{
				Image:    imageNameMediaMTX,
				Hostname: "mediaserver",
				Env: []string{
					"MTX_LOGLEVEL=info",
					"MTX_API=yes",
				},
				Labels: map[string]string{
					"component": componentName,
				},
				Healthcheck: &typescontainer.HealthConfig{
					Test:          []string{"CMD", "curl", "-f", "http://localhost:9997/v3/paths/list"},
					Interval:      time.Second * 10,
					StartPeriod:   time.Second * 2,
					StartInterval: time.Second * 2,
					Retries:       2,
				},
				ExposedPorts: exposedPorts,
			},
			HostConfig: &typescontainer.HostConfig{
				NetworkMode:  "default",
				PortBindings: portBindings,
			},
			NetworkCountConfig: container.NetworkCountConfig{Rx: "eth0", Tx: "eth1"},
		},
	)

	// Populate the static parts of the state before the loop goroutine takes
	// ownership of it.
	actor.state.RTMPURL = actor.rtmpURL()
	actor.state.RTMPInternalURL = actor.rtmpInternalURL()

	go actor.actorLoop(containerStateC, errC)

	return actor
}
// C returns the receive-only channel on which the actor publishes snapshots of
// the media server state. A snapshot is sent whenever the actor loop receives
// a container state update, observes a change in the live flag or listener
// count, or handles a value from the container error channel.
//
// The channel is buffered and the actor loop blocks while it is full, so
// callers must keep consuming from it.
func (s *Actor) C() <-chan domain.Source {
return s.stateC
}
// State returns a copy of the media server's current state.
//
// The read is performed by the actor loop: a closure is queued on the action
// channel and State blocks until the loop has run it and replied.
func (s *Actor) State() domain.Source {
	replyC := make(chan domain.Source)

	var snapshot action = func() { replyC <- *s.state }
	s.actorC <- snapshot

	return <-replyC
}
// Close shuts down the media server actor.
//
// It first removes every container labelled with this component's name. If
// that fails, the error is returned wrapped and the actor loop is left
// running. Otherwise the action channel is closed, which makes the actor loop
// exit.
func (s *Actor) Close() error {
	labels := map[string]string{"component": componentName}

	err := s.containerClient.RemoveContainers(context.Background(), labels)
	if err != nil {
		return fmt.Errorf("remove containers: %w", err)
	}

	close(s.actorC)

	return nil
}
// actorLoop is the main loop of the media server actor. It only exits when the
// actor is closed, i.e. when the action channel is closed.
//
// On each iteration it handles one of:
//   - a container state update, which is stored and published;
//   - a value from the container error channel, which is treated as the
//     server process having exited: polling stops, an exit reason is
//     recorded, the live flag is cleared and the state is published;
//   - a tick of the fetch timer, which polls the ingress state and publishes
//     the state if the live flag or listener count changed;
//   - an action submitted via the action channel, which is executed inline.
//
// Publishing blocks while the state channel is full.
func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan error) {
	fetchStateT := time.NewTicker(s.fetchIngressStateInterval)
	defer fetchStateT.Stop()

	sendState := func() { s.stateC <- *s.state }

	for {
		select {
		case containerState, ok := <-containerStateC:
			if !ok {
				// A closed channel is always ready and yields zero values.
				// Disable this case (a nil channel never becomes ready) so the
				// loop neither spins nor overwrites the last known container
				// state.
				containerStateC = nil
				continue
			}

			s.state.Container = containerState
			sendState()
		case err, ok := <-errC:
			if !ok {
				// The loop continues until the actor is closed.
				// Avoid receiving duplicate close signals.
				errC = nil
				continue
			}

			if err != nil {
				s.logger.Error("Error from container client", "error", err, "id", shortID(s.state.Container.ID))
			}

			// The server process has exited, so stop polling it.
			fetchStateT.Stop()

			// TODO: surface better error from container
			if s.state.Container.ExitCode != nil {
				s.state.ExitReason = fmt.Sprintf("Server process exited with code %d", *s.state.Container.ExitCode)
			} else {
				s.state.ExitReason = "Server process exited unexpectedly"
			}

			s.state.Live = false

			sendState()
		case <-fetchStateT.C:
			ingressState, err := s.fetchIngressState()
			if err != nil {
				s.logger.Error("Error fetching server state", "error", err)
				continue
			}

			// Only publish when something observable changed.
			if ingressState.ready != s.state.Live || ingressState.listeners != s.state.Listeners {
				s.state.Live = ingressState.ready
				s.state.Listeners = ingressState.listeners
				sendState()
			}
		case act, ok := <-s.actorC:
			if !ok {
				return
			}

			act()
		}
	}
}
// rtmpURL builds the host-facing RTMP URL of the media server from the
// configured host RTMP port and the shared RTMP path.
func (s *Actor) rtmpURL() string {
	hostPort := strconv.Itoa(s.rtmpPort)

	return "rtmp://localhost:" + hostPort + "/" + rtmpPath
}
// rtmpInternalURL builds the RTMP URL of the media server as seen from the
// app network. It addresses the container by hostname and uses the container
// port (1935) rather than the configured host port.
func (s *Actor) rtmpInternalURL() string {
	return "rtmp://mediaserver:1935/" + rtmpPath
}
// apiURL builds the host-facing URL of the media server API endpoint that
// lists RTMP connections, using the configured host API port.
func (s *Actor) apiURL() string {
	hostPort := strconv.Itoa(s.apiPort)

	return "http://localhost:" + hostPort + "/v3/rtmpconns/list"
}
// shortID abbreviates a container ID to its first 12 bytes. IDs shorter than
// that are returned unchanged.
func shortID(id string) string {
	const shortLen = 12

	if len(id) >= shortLen {
		return id[:shortLen]
	}

	return id
}