package replicator import ( "cmp" "context" "errors" "fmt" "log/slog" "strconv" "strings" "sync" "time" typescontainer "github.com/docker/docker/api/types/container" "git.netflux.io/rob/octoplex/internal/container" "git.netflux.io/rob/octoplex/internal/domain" ) type action func() const ( defaultChanSize = 64 // default channel size for asynchronous non-error channels componentName = "replicator" // component name, mostly used for Docker labels imageNameFFMPEG = "ghcr.io/jrottenberg/ffmpeg:7.1-scratch" // image name for ffmpeg ) // State is the state of a single destination from the point of view of the // replicator. type State struct { URL string Container domain.Container Status domain.DestinationStatus } // Actor is responsible for managing the replicator. type Actor struct { wg sync.WaitGroup ctx context.Context cancel context.CancelFunc sourceURL string containerClient *container.Client logger *slog.Logger actorC chan action stateC chan State // mutable state currURLs map[string]struct{} nextIndex int } // StartActorParams contains the parameters for starting a new replicator actor. type StartActorParams struct { SourceURL string ChanSize int ContainerClient *container.Client Logger *slog.Logger } // StartActor starts a new replicator actor. // // The channel exposed by [C] must be consumed by the caller. func StartActor(ctx context.Context, params StartActorParams) *Actor { ctx, cancel := context.WithCancel(ctx) actor := &Actor{ ctx: ctx, cancel: cancel, sourceURL: params.SourceURL, containerClient: params.ContainerClient, logger: params.Logger, actorC: make(chan action, cmp.Or(params.ChanSize, defaultChanSize)), stateC: make(chan State, cmp.Or(params.ChanSize, defaultChanSize)), currURLs: make(map[string]struct{}), } go actor.actorLoop() return actor } // StartDestination starts a destination stream. 
func (a *Actor) StartDestination(url string) { a.actorC <- func() { if _, ok := a.currURLs[url]; ok { return } a.nextIndex++ a.currURLs[url] = struct{}{} a.logger.Info("Starting live stream", "url", url) containerStateC, errC := a.containerClient.RunContainer(a.ctx, container.RunContainerParams{ Name: componentName + "-" + strconv.Itoa(a.nextIndex), ContainerConfig: &typescontainer.Config{ Image: imageNameFFMPEG, Cmd: []string{ "-loglevel", "level+error", "-i", a.sourceURL, "-c", "copy", "-f", "flv", url, }, Labels: map[string]string{ container.LabelComponent: componentName, container.LabelURL: url, }, }, HostConfig: &typescontainer.HostConfig{NetworkMode: "default"}, NetworkCountConfig: container.NetworkCountConfig{Rx: "eth1", Tx: "eth0"}, Logs: container.LogConfig{Stderr: true}, ShouldRestart: func(_ int64, restartCount int, logs [][]byte, runningTime time.Duration) (bool, error) { // Try to infer if the container failed to start. // // For now, we just check if it was running for less than ten seconds. if restartCount == 0 && runningTime < 10*time.Second { return false, containerStartErrFromLogs(logs) } // Otherwise, always restart, regardless of the exit code. return true, nil }, }) a.wg.Add(1) go func() { defer a.wg.Done() a.destLoop(url, containerStateC, errC) }() } } // Grab the first fatal log line, if it exists, or the first error log line, // from the FFmpeg output. func containerStartErrFromLogs(logs [][]byte) error { var fatalLog, errLog string for _, logBytes := range logs { log := string(logBytes) if strings.HasPrefix(log, "[fatal]") { fatalLog = log break } } if fatalLog == "" { for _, logBytes := range logs { log := string(logBytes) if strings.HasPrefix(log, "[error]") { errLog = log break } } } return errors.New(cmp.Or(fatalLog, errLog, "container failed to start")) } // StopDestination stops a destination stream. 
func (a *Actor) StopDestination(url string) { a.actorC <- func() { if _, ok := a.currURLs[url]; !ok { return } a.logger.Info("Stopping live stream", "url", url) if err := a.containerClient.RemoveContainers( a.ctx, a.containerClient.ContainersWithLabels(map[string]string{ container.LabelComponent: componentName, container.LabelURL: url, }), ); err != nil { // TODO: error handling a.logger.Error("Failed to stop live stream", "url", url, "err", err) } delete(a.currURLs, url) } } // destLoop is the actor loop for a destination stream. func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, errC <-chan error) { state := &State{URL: url} sendState := func() { a.stateC <- *state } for { select { case containerState := <-containerStateC: state.Container = containerState if containerState.Status == domain.ContainerStatusRunning { if hasElapsedSince(5*time.Second, containerState.RxSince) { state.Status = domain.DestinationStatusLive } else { state.Status = domain.DestinationStatusStarting } } else { state.Status = domain.DestinationStatusOffAir } sendState() case err := <-errC: if err != nil { a.logger.Error("Error from container client", "err", err) } state.Container.Err = err sendState() return } } } // C returns a channel that will receive the current state of the replicator. // The channel is never closed. func (a *Actor) C() <-chan State { return a.stateC } // Close closes the actor. func (a *Actor) Close() error { if err := a.containerClient.RemoveContainers( context.Background(), a.containerClient.ContainersWithLabels(map[string]string{container.LabelComponent: componentName}), ); err != nil { return fmt.Errorf("remove containers: %w", err) } a.wg.Wait() close(a.actorC) return nil } // actorLoop is the main actor loop. func (a *Actor) actorLoop() { for act := range a.actorC { act() } } // hasElapsedSince returns true if the duration has elapsed since the given // time. If the provided time is zero, the function returns false. 
func hasElapsedSince(d time.Duration, t time.Time) bool {
	// A zero time means "never", so nothing can have elapsed since it.
	return !t.IsZero() && time.Since(t) > d
}