From 4b464c680d1e91e8b85a7e9438c92c6c2821c047 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Tue, 22 Apr 2025 15:50:49 +0200 Subject: [PATCH] refactor: add event bus --- internal/app/app.go | 12 +++--- internal/domain/events.go | 1 + internal/event/bus.go | 70 +++++++++++++++++++++++++++++++++++ internal/event/bus_test.go | 29 +++++++++++++++ internal/terminal/terminal.go | 18 ++++++--- 5 files changed, 119 insertions(+), 11 deletions(-) create mode 100644 internal/domain/events.go create mode 100644 internal/event/bus.go create mode 100644 internal/event/bus_test.go diff --git a/internal/app/app.go b/internal/app/app.go index 557ef17..b3c86ee 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -11,6 +11,7 @@ import ( "git.netflux.io/rob/octoplex/internal/config" "git.netflux.io/rob/octoplex/internal/container" "git.netflux.io/rob/octoplex/internal/domain" + "git.netflux.io/rob/octoplex/internal/event" "git.netflux.io/rob/octoplex/internal/mediaserver" "git.netflux.io/rob/octoplex/internal/replicator" "git.netflux.io/rob/octoplex/internal/terminal" @@ -30,6 +31,9 @@ type RunParams struct { // Run starts the application, and blocks until it exits. func Run(ctx context.Context, params RunParams) error { + logger := params.Logger + eventBus := event.NewBus(logger.With("component", "event_bus")) + // cfg is the current configuration of the application, as reflected in the // config file. cfg := params.ConfigService.Current() @@ -43,8 +47,8 @@ func Run(ctx context.Context, params RunParams) error { return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set") } - logger := params.Logger ui, err := terminal.StartUI(ctx, terminal.StartParams{ + EventBus: eventBus, Screen: params.Screen, ClipboardAvailable: params.ClipboardAvailable, ConfigFilePath: params.ConfigFilePath, @@ -113,10 +117,6 @@ func Run(ctx context.Context, params RunParams) error { } defer srv.Close() - // Set the RTMP and RTMPS URLs in the UI, which are only known after the - // MediaServer is available. - ui.SetRTMPURLs(srv.RTMPURL(), srv.RTMPSURL()) - repl := replicator.StartActor(ctx, replicator.StartActorParams{ SourceURL: srv.RTMPInternalURL(), ContainerClient: containerClient, @@ -143,6 +143,8 @@ func Run(ctx context.Context, params RunParams) error { if err = srv.Start(ctx); err != nil { return fmt.Errorf("start mediaserver: %w", err) } + + eventBus.Send(event.MediaServerStartedEvent{RTMPURL: srv.RTMPURL(), RTMPSURL: srv.RTMPSURL()}) } case <-params.ConfigService.C(): // No-op, config updates are handled synchronously for now. diff --git a/internal/domain/events.go b/internal/domain/events.go new file mode 100644 index 0000000..4188b5a --- /dev/null +++ b/internal/domain/events.go @@ -0,0 +1 @@ +package domain diff --git a/internal/event/bus.go b/internal/event/bus.go new file mode 100644 index 0000000..7584854 --- /dev/null +++ b/internal/event/bus.go @@ -0,0 +1,70 @@ +package event + +import ( + "log/slog" + "sync" +) + +const defaultChannelSize = 64 + +type Name string + +const ( + EventNameMediaServerStarted Name = "media_server_started" +) + +type Event interface { + name() Name +} + +// MediaServerStartedEvent is emitted when the mediaserver component starts successfully. +type MediaServerStartedEvent struct { + RTMPURL string + RTMPSURL string +} + +func (e MediaServerStartedEvent) name() Name { + return "media_server_started" +} + +// Bus is an event bus. +type Bus struct { + consumers map[Name][]chan Event + mu sync.Mutex + logger *slog.Logger +} + +// NewBus returns a new event bus. +func NewBus(logger *slog.Logger) *Bus { + return &Bus{ + consumers: make(map[Name][]chan Event), + logger: logger, + } +} + +// Register registers a consumer for a given event. +func (b *Bus) Register(name Name) <-chan Event { + b.mu.Lock() + defer b.mu.Unlock() + + ch := make(chan Event, defaultChannelSize) + b.consumers[name] = append(b.consumers[name], ch) + return ch +} + +// Send sends an event to all registered consumers. +func (b *Bus) Send(evt Event) { + // The mutex is needed to ensure the backing array of b.consumers cannot be + // modified under our feet. There is probably a more efficient way to do this + // but this should be ok. + b.mu.Lock() + defer b.mu.Unlock() + + for _, ch := range b.consumers[evt.name()] { + select { + case ch <- evt: + default: + b.logger.Warn("Event dropped", "name", evt.name()) + } + } +} diff --git a/internal/event/bus_test.go b/internal/event/bus_test.go new file mode 100644 index 0000000..df6c13a --- /dev/null +++ b/internal/event/bus_test.go @@ -0,0 +1,29 @@ +package event_test + +import ( + "testing" + + "git.netflux.io/rob/octoplex/internal/event" + "git.netflux.io/rob/octoplex/internal/testhelpers" + "github.com/stretchr/testify/assert" +) + +func TestBus(t *testing.T) { + bus := event.NewBus(testhelpers.NewTestLogger(t)) + + ch1 := bus.Register(event.EventNameMediaServerStarted) + ch2 := bus.Register(event.EventNameMediaServerStarted) + + evt := event.MediaServerStartedEvent{ + RTMPURL: "rtmp://rtmp.example.com/live", + RTMPSURL: "rtmps://rtmp.example.com/live", + } + + go func() { + bus.Send(evt) + bus.Send(evt) + }() + + assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent)) + assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent)) +} diff --git a/internal/terminal/terminal.go b/internal/terminal/terminal.go index cc065a3..bc1e8cc 100644 --- a/internal/terminal/terminal.go +++ b/internal/terminal/terminal.go @@ -13,6 +13,7 @@ import ( "time" "git.netflux.io/rob/octoplex/internal/domain" + "git.netflux.io/rob/octoplex/internal/event" "git.netflux.io/rob/octoplex/internal/shortid" "github.com/gdamore/tcell/v2" "github.com/rivo/tview" @@ -40,6 +41,7 @@ const ( // UI is responsible for managing the terminal user interface. type UI struct { + eventBus *event.Bus commandC chan domain.Command clipboardAvailable bool configFilePath string @@ -93,6 +95,7 @@ type ScreenCapture struct { // StartParams contains the parameters for starting a new terminal user // interface. type StartParams struct { + EventBus *event.Bus ChanSize int Logger *slog.Logger ClipboardAvailable bool @@ -211,6 +214,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) { ui := &UI{ commandC: commandCh, + eventBus: params.EventBus, clipboardAvailable: params.ClipboardAvailable, configFilePath: params.ConfigFilePath, buildInfo: params.BuildInfo, @@ -275,6 +279,8 @@ func (ui *UI) C() <-chan domain.Command { func (ui *UI) run(ctx context.Context) { defer close(ui.commandC) + mediaServerStartedC := ui.eventBus.Register(event.EventNameMediaServerStarted) + uiDone := make(chan struct{}) go func() { defer func() { @@ -288,6 +294,8 @@ func (ui *UI) run(ctx context.Context) { for { select { + case evt := <-mediaServerStartedC: + ui.handleMediaServerStarted(evt.(event.MediaServerStartedEvent)) case <-ctx.Done(): return case <-uiDone: @@ -296,15 +304,13 @@ func (ui *UI) run(ctx context.Context) { } } -// SetRTMPURLs sets the RTMP and RTMPS URLs for the user interface, which are -// unavailable when the UI is first created. -func (ui *UI) SetRTMPURLs(rtmpURL, rtmpsURL string) { +func (ui *UI) handleMediaServerStarted(evt event.MediaServerStartedEvent) { ui.mu.Lock() - ui.rtmpURL = rtmpURL - ui.rtmpsURL = rtmpsURL + ui.rtmpURL = evt.RTMPURL + ui.rtmpsURL = evt.RTMPSURL ui.mu.Unlock() - ui.renderAboutView() + ui.app.QueueUpdateDraw(ui.renderAboutView) } func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {