refactor: replicator

This commit is contained in:
Rob Watson 2025-04-03 07:03:18 +02:00
parent 6952516204
commit 522be621ee
3 changed files with 29 additions and 29 deletions

View File

@ -12,7 +12,7 @@ import (
"git.netflux.io/rob/octoplex/internal/container" "git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/mediaserver" "git.netflux.io/rob/octoplex/internal/mediaserver"
"git.netflux.io/rob/octoplex/internal/multiplexer" "git.netflux.io/rob/octoplex/internal/replicator"
"git.netflux.io/rob/octoplex/internal/terminal" "git.netflux.io/rob/octoplex/internal/terminal"
) )
@ -91,12 +91,12 @@ func Run(ctx context.Context, params RunParams) error {
} }
defer srv.Close() defer srv.Close()
mp := multiplexer.NewActor(ctx, multiplexer.NewActorParams{ repl := replicator.NewActor(ctx, replicator.NewActorParams{
SourceURL: srv.State().RTMPInternalURL, SourceURL: srv.State().RTMPInternalURL,
ContainerClient: containerClient, ContainerClient: containerClient,
Logger: logger.With("component", "multiplexer"), Logger: logger.With("component", "replicator"),
}) })
defer mp.Close() defer repl.Close()
const uiUpdateInterval = time.Second const uiUpdateInterval = time.Second
uiUpdateT := time.NewTicker(uiUpdateInterval) uiUpdateT := time.NewTicker(uiUpdateInterval)
@ -129,7 +129,7 @@ func Run(ctx context.Context, params RunParams) error {
} }
ui.DestinationAdded() ui.DestinationAdded()
case terminal.CommandRemoveDestination: case terminal.CommandRemoveDestination:
mp.StopDestination(c.URL) // no-op if not live repl.StopDestination(c.URL) // no-op if not live
newCfg := cfg newCfg := cfg
newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool { newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool {
return dest.URL == c.URL return dest.URL == c.URL
@ -145,9 +145,9 @@ func Run(ctx context.Context, params RunParams) error {
continue continue
} }
mp.StartDestination(c.URL) repl.StartDestination(c.URL)
case terminal.CommandStopDestination: case terminal.CommandStopDestination:
mp.StopDestination(c.URL) repl.StopDestination(c.URL)
case terminal.CommandQuit: case terminal.CommandQuit:
return nil return nil
} }
@ -157,12 +157,12 @@ func Run(ctx context.Context, params RunParams) error {
logger.Debug("Server state received", "state", serverState) logger.Debug("Server state received", "state", serverState)
applyServerState(serverState, state) applyServerState(serverState, state)
updateUI() updateUI()
case mpState := <-mp.C(): case replState := <-repl.C():
logger.Debug("Multiplexer state received", "state", mpState) logger.Debug("Replicator state received", "state", replState)
destErrors := applyMultiplexerState(mpState, state) destErrors := applyReplicatorState(replState, state)
for _, destError := range destErrors { for _, destError := range destErrors {
handleDestError(destError, mp, ui) handleDestError(destError, repl, ui)
} }
updateUI() updateUI()
@ -183,29 +183,29 @@ type destinationError struct {
err error err error
} }
// applyMultiplexerState applies the current multiplexer state to the app state. // applyReplicatorState applies the current replicator state to the app state.
// //
// It returns a list of destination errors that should be displayed to the user. // It returns a list of destination errors that should be displayed to the user.
func applyMultiplexerState(mpState multiplexer.State, appState *domain.AppState) []destinationError { func applyReplicatorState(replState replicator.State, appState *domain.AppState) []destinationError {
var errorsToDisplay []destinationError var errorsToDisplay []destinationError
for i := range appState.Destinations { for i := range appState.Destinations {
dest := &appState.Destinations[i] dest := &appState.Destinations[i]
if dest.URL != mpState.URL { if dest.URL != replState.URL {
continue continue
} }
if dest.Container.Err == nil && mpState.Container.Err != nil { if dest.Container.Err == nil && replState.Container.Err != nil {
errorsToDisplay = append(errorsToDisplay, destinationError{ errorsToDisplay = append(errorsToDisplay, destinationError{
name: dest.Name, name: dest.Name,
url: dest.URL, url: dest.URL,
err: mpState.Container.Err, err: replState.Container.Err,
}) })
} }
dest.Container = mpState.Container dest.Container = replState.Container
dest.Status = mpState.Status dest.Status = replState.Status
break break
} }
@ -214,10 +214,10 @@ func applyMultiplexerState(mpState multiplexer.State, appState *domain.AppState)
} }
// handleDestError displays a modal to the user, and stops the destination. // handleDestError displays a modal to the user, and stops the destination.
func handleDestError(destError destinationError, mp *multiplexer.Actor, ui *terminal.UI) { func handleDestError(destError destinationError, repl *replicator.Actor, ui *terminal.UI) {
ui.ShowDestinationErrorModal(destError.name, destError.err) ui.ShowDestinationErrorModal(destError.name, destError.err)
mp.StopDestination(destError.url) repl.StopDestination(destError.url)
} }
// applyConfig applies the config to the app state. For now we only set the // applyConfig applies the config to the app state. For now we only set the

View File

@ -1,4 +1,4 @@
package multiplexer package replicator
import ( import (
"cmp" "cmp"
@ -19,19 +19,19 @@ type action func()
const ( const (
defaultChanSize = 64 // default channel size for asynchronous non-error channels defaultChanSize = 64 // default channel size for asynchronous non-error channels
componentName = "multiplexer" // component name, mostly used for Docker labels componentName = "replicator" // component name, mostly used for Docker labels
imageNameFFMPEG = "ghcr.io/jrottenberg/ffmpeg:7.1-scratch" // image name for ffmpeg imageNameFFMPEG = "ghcr.io/jrottenberg/ffmpeg:7.1-scratch" // image name for ffmpeg
) )
// State is the state of a single destination from the point of view of the // State is the state of a single destination from the point of view of the
// multiplexer. // replicator.
type State struct { type State struct {
URL string URL string
Container domain.Container Container domain.Container
Status domain.DestinationStatus Status domain.DestinationStatus
} }
// Actor is responsible for managing the multiplexer. // Actor is responsible for managing the replicator.
type Actor struct { type Actor struct {
wg sync.WaitGroup wg sync.WaitGroup
ctx context.Context ctx context.Context
@ -47,7 +47,7 @@ type Actor struct {
nextIndex int nextIndex int
} }
// NewActorParams contains the parameters for starting a new multiplexer actor. // NewActorParams contains the parameters for starting a new replicator actor.
type NewActorParams struct { type NewActorParams struct {
SourceURL string SourceURL string
ChanSize int ChanSize int
@ -55,7 +55,7 @@ type NewActorParams struct {
Logger *slog.Logger Logger *slog.Logger
} }
// NewActor starts a new multiplexer actor. // NewActor starts a new replicator actor.
// //
// The channel exposed by [C] must be consumed by the caller. // The channel exposed by [C] must be consumed by the caller.
func NewActor(ctx context.Context, params NewActorParams) *Actor { func NewActor(ctx context.Context, params NewActorParams) *Actor {
@ -175,7 +175,7 @@ func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, er
} }
} }
// C returns a channel that will receive the current state of the multiplexer. // C returns a channel that will receive the current state of the replicator.
// The channel is never closed. // The channel is never closed.
func (a *Actor) C() <-chan State { func (a *Actor) C() <-chan State {
return a.stateC return a.stateC

View File

@ -779,7 +779,7 @@ func (ui *UI) toggleDestination() {
return return
} }
// Communicating with the multiplexer/container client is asynchronous. To // Communicating with the replicator/container client is asynchronous. To
// ensure we can limit each destination to a single container we need some // ensure we can limit each destination to a single container we need some
// kind of local mutable state which synchronously tracks the "start state" // kind of local mutable state which synchronously tracks the "start state"
// of each destination. // of each destination.
@ -879,7 +879,7 @@ func (ui *UI) showAbout() {
ui.showModal( ui.showModal(
pageNameModalAbout, pageNameModalAbout,
fmt.Sprintf( fmt.Sprintf(
"%s: live stream multiplexer\n(c) Rob Watson\nhttps://git.netflux.io/rob/octoplex\n\nReleased under AGPL3.\n\nv%s (%s)\nBuilt on %s (%s).", "%s: live stream replicator\n(c) Rob Watson\nhttps://git.netflux.io/rob/octoplex\n\nReleased under AGPL3.\n\nv%s (%s)\nBuilt on %s (%s).",
domain.AppName, domain.AppName,
cmp.Or(ui.buildInfo.Version, "0.0.0-devel"), cmp.Or(ui.buildInfo.Version, "0.0.0-devel"),
cmp.Or(commit, "unknown SHA"), cmp.Or(commit, "unknown SHA"),