From 522be621eed76b5d7c79dec22c12ae79149fe7fc Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Thu, 3 Apr 2025 07:03:18 +0200 Subject: [PATCH] refactor: replicator --- internal/app/app.go | 40 +++++++++---------- .../replicator.go} | 14 +++---- internal/terminal/terminal.go | 4 +- 3 files changed, 29 insertions(+), 29 deletions(-) rename internal/{multiplexer/multiplexer.go => replicator/replicator.go} (94%) diff --git a/internal/app/app.go b/internal/app/app.go index e4f13a9..34b9b57 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -12,7 +12,7 @@ import ( "git.netflux.io/rob/octoplex/internal/container" "git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/mediaserver" - "git.netflux.io/rob/octoplex/internal/multiplexer" + "git.netflux.io/rob/octoplex/internal/replicator" "git.netflux.io/rob/octoplex/internal/terminal" ) @@ -91,12 +91,12 @@ func Run(ctx context.Context, params RunParams) error { } defer srv.Close() - mp := multiplexer.NewActor(ctx, multiplexer.NewActorParams{ + repl := replicator.NewActor(ctx, replicator.NewActorParams{ SourceURL: srv.State().RTMPInternalURL, ContainerClient: containerClient, - Logger: logger.With("component", "multiplexer"), + Logger: logger.With("component", "replicator"), }) - defer mp.Close() + defer repl.Close() const uiUpdateInterval = time.Second uiUpdateT := time.NewTicker(uiUpdateInterval) @@ -129,7 +129,7 @@ func Run(ctx context.Context, params RunParams) error { } ui.DestinationAdded() case terminal.CommandRemoveDestination: - mp.StopDestination(c.URL) // no-op if not live + repl.StopDestination(c.URL) // no-op if not live newCfg := cfg newCfg.Destinations = slices.DeleteFunc(newCfg.Destinations, func(dest config.Destination) bool { return dest.URL == c.URL @@ -145,9 +145,9 @@ func Run(ctx context.Context, params RunParams) error { continue } - mp.StartDestination(c.URL) + repl.StartDestination(c.URL) case terminal.CommandStopDestination: - mp.StopDestination(c.URL) + repl.StopDestination(c.URL) case terminal.CommandQuit: return nil } @@ -157,12 +157,12 @@ func Run(ctx context.Context, params RunParams) error { logger.Debug("Server state received", "state", serverState) applyServerState(serverState, state) updateUI() - case mpState := <-mp.C(): - logger.Debug("Multiplexer state received", "state", mpState) - destErrors := applyMultiplexerState(mpState, state) + case replState := <-repl.C(): + logger.Debug("Replicator state received", "state", replState) + destErrors := applyReplicatorState(replState, state) for _, destError := range destErrors { - handleDestError(destError, mp, ui) + handleDestError(destError, repl, ui) } updateUI() @@ -183,29 +183,29 @@ type destinationError struct { err error } -// applyMultiplexerState applies the current multiplexer state to the app state. +// applyReplicatorState applies the current replicator state to the app state. // // It returns a list of destination errors that should be displayed to the user. -func applyMultiplexerState(mpState multiplexer.State, appState *domain.AppState) []destinationError { +func applyReplicatorState(replState replicator.State, appState *domain.AppState) []destinationError { var errorsToDisplay []destinationError for i := range appState.Destinations { dest := &appState.Destinations[i] - if dest.URL != mpState.URL { + if dest.URL != replState.URL { continue } - if dest.Container.Err == nil && mpState.Container.Err != nil { + if dest.Container.Err == nil && replState.Container.Err != nil { errorsToDisplay = append(errorsToDisplay, destinationError{ name: dest.Name, url: dest.URL, - err: mpState.Container.Err, + err: replState.Container.Err, }) } - dest.Container = mpState.Container - dest.Status = mpState.Status + dest.Container = replState.Container + dest.Status = replState.Status break } @@ -214,10 +214,10 @@ func applyMultiplexerState(mpState multiplexer.State, appState *domain.AppState) } // handleDestError displays a modal to the user, and stops the destination. -func handleDestError(destError destinationError, mp *multiplexer.Actor, ui *terminal.UI) { +func handleDestError(destError destinationError, repl *replicator.Actor, ui *terminal.UI) { ui.ShowDestinationErrorModal(destError.name, destError.err) - mp.StopDestination(destError.url) + repl.StopDestination(destError.url) } // applyConfig applies the config to the app state. For now we only set the diff --git a/internal/multiplexer/multiplexer.go b/internal/replicator/replicator.go similarity index 94% rename from internal/multiplexer/multiplexer.go rename to internal/replicator/replicator.go index 87dcac2..d2e06d8 100644 --- a/internal/multiplexer/multiplexer.go +++ b/internal/replicator/replicator.go @@ -1,4 +1,4 @@ -package multiplexer +package replicator import ( "cmp" @@ -19,19 +19,19 @@ type action func() const ( defaultChanSize = 64 // default channel size for asynchronous non-error channels - componentName = "multiplexer" // component name, mostly used for Docker labels + componentName = "replicator" // component name, mostly used for Docker labels imageNameFFMPEG = "ghcr.io/jrottenberg/ffmpeg:7.1-scratch" // image name for ffmpeg ) // State is the state of a single destination from the point of view of the -// multiplexer. +// replicator. type State struct { URL string Container domain.Container Status domain.DestinationStatus } -// Actor is responsible for managing the multiplexer. +// Actor is responsible for managing the replicator. type Actor struct { wg sync.WaitGroup ctx context.Context @@ -47,7 +47,7 @@ type Actor struct { nextIndex int } -// NewActorParams contains the parameters for starting a new multiplexer actor. +// NewActorParams contains the parameters for starting a new replicator actor. type NewActorParams struct { SourceURL string ChanSize int @@ -55,7 +55,7 @@ type NewActorParams struct { Logger *slog.Logger } -// NewActor starts a new multiplexer actor. +// NewActor starts a new replicator actor. // // The channel exposed by [C] must be consumed by the caller. func NewActor(ctx context.Context, params NewActorParams) *Actor { @@ -175,7 +175,7 @@ func (a *Actor) destLoop(url string, containerStateC <-chan domain.Container, er } } -// C returns a channel that will receive the current state of the multiplexer. +// C returns a channel that will receive the current state of the replicator. // The channel is never closed. func (a *Actor) C() <-chan State { return a.stateC diff --git a/internal/terminal/terminal.go b/internal/terminal/terminal.go index 2ef51a6..8330f38 100644 --- a/internal/terminal/terminal.go +++ b/internal/terminal/terminal.go @@ -779,7 +779,7 @@ func (ui *UI) toggleDestination() { return } - // Communicating with the multiplexer/container client is asynchronous. To + // Communicating with the replicator/container client is asynchronous. To // ensure we can limit each destination to a single container we need some // kind of local mutable state which synchronously tracks the "start state" // of each destination. @@ -879,7 +879,7 @@ func (ui *UI) showAbout() { ui.showModal( pageNameModalAbout, fmt.Sprintf( - "%s: live stream multiplexer\n(c) Rob Watson\nhttps://git.netflux.io/rob/octoplex\n\nReleased under AGPL3.\n\nv%s (%s)\nBuilt on %s (%s).", + "%s: live stream replicator\n(c) Rob Watson\nhttps://git.netflux.io/rob/octoplex\n\nReleased under AGPL3.\n\nv%s (%s)\nBuilt on %s (%s).", domain.AppName, cmp.Or(ui.buildInfo.Version, "0.0.0-devel"), cmp.Or(commit, "unknown SHA"),