refactor: add event bus

This commit is contained in:
Rob Watson 2025-04-22 15:50:49 +02:00
parent eaa06fe3f4
commit 4b464c680d
5 changed files with 119 additions and 11 deletions

View File

@ -11,6 +11,7 @@ import (
"git.netflux.io/rob/octoplex/internal/config" "git.netflux.io/rob/octoplex/internal/config"
"git.netflux.io/rob/octoplex/internal/container" "git.netflux.io/rob/octoplex/internal/container"
"git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
"git.netflux.io/rob/octoplex/internal/mediaserver" "git.netflux.io/rob/octoplex/internal/mediaserver"
"git.netflux.io/rob/octoplex/internal/replicator" "git.netflux.io/rob/octoplex/internal/replicator"
"git.netflux.io/rob/octoplex/internal/terminal" "git.netflux.io/rob/octoplex/internal/terminal"
@ -30,6 +31,9 @@ type RunParams struct {
// Run starts the application, and blocks until it exits. // Run starts the application, and blocks until it exits.
func Run(ctx context.Context, params RunParams) error { func Run(ctx context.Context, params RunParams) error {
logger := params.Logger
eventBus := event.NewBus(logger.With("component", "event_bus"))
// cfg is the current configuration of the application, as reflected in the // cfg is the current configuration of the application, as reflected in the
// config file. // config file.
cfg := params.ConfigService.Current() cfg := params.ConfigService.Current()
@ -43,8 +47,8 @@ func Run(ctx context.Context, params RunParams) error {
return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set") return errors.New("config: either sources.mediaServer.rtmp.enabled or sources.mediaServer.rtmps.enabled must be set")
} }
logger := params.Logger
ui, err := terminal.StartUI(ctx, terminal.StartParams{ ui, err := terminal.StartUI(ctx, terminal.StartParams{
EventBus: eventBus,
Screen: params.Screen, Screen: params.Screen,
ClipboardAvailable: params.ClipboardAvailable, ClipboardAvailable: params.ClipboardAvailable,
ConfigFilePath: params.ConfigFilePath, ConfigFilePath: params.ConfigFilePath,
@ -113,10 +117,6 @@ func Run(ctx context.Context, params RunParams) error {
} }
defer srv.Close() defer srv.Close()
// Set the RTMP and RTMPS URLs in the UI, which are only known after the
// MediaServer is available.
ui.SetRTMPURLs(srv.RTMPURL(), srv.RTMPSURL())
repl := replicator.StartActor(ctx, replicator.StartActorParams{ repl := replicator.StartActor(ctx, replicator.StartActorParams{
SourceURL: srv.RTMPInternalURL(), SourceURL: srv.RTMPInternalURL(),
ContainerClient: containerClient, ContainerClient: containerClient,
@ -143,6 +143,8 @@ func Run(ctx context.Context, params RunParams) error {
if err = srv.Start(ctx); err != nil { if err = srv.Start(ctx); err != nil {
return fmt.Errorf("start mediaserver: %w", err) return fmt.Errorf("start mediaserver: %w", err)
} }
eventBus.Send(event.MediaServerStartedEvent{RTMPURL: srv.RTMPURL(), RTMPSURL: srv.RTMPSURL()})
} }
case <-params.ConfigService.C(): case <-params.ConfigService.C():
// No-op, config updates are handled synchronously for now. // No-op, config updates are handled synchronously for now.

View File

@ -0,0 +1 @@
package domain

70
internal/event/bus.go Normal file
View File

@ -0,0 +1,70 @@
package event
import (
"log/slog"
"sync"
)
const defaultChannelSize = 64
type Name string
const (
EventNameMediaServerStarted Name = "media_server_started"
)
type Event interface {
name() Name
}
// MediaServerStartedEvent is emitted when the mediaserver component starts successfully.
type MediaServerStartedEvent struct {
RTMPURL string
RTMPSURL string
}
func (e MediaServerStartedEvent) name() Name {
return "media_server_started"
}
// Bus is an event bus.
type Bus struct {
consumers map[Name][]chan Event
mu sync.Mutex
logger *slog.Logger
}
// NewBus returns a new event bus.
func NewBus(logger *slog.Logger) *Bus {
return &Bus{
consumers: make(map[Name][]chan Event),
logger: logger,
}
}
// Register registers a consumer for a given event.
func (b *Bus) Register(name Name) <-chan Event {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan Event, defaultChannelSize)
b.consumers[name] = append(b.consumers[name], ch)
return ch
}
// Send sends an event to all registered consumers.
func (b *Bus) Send(evt Event) {
// The mutex is needed to ensure the backing array of b.consumers cannot be
// modified under our feet. There is probably a more efficient way to do this
// but this should be ok.
b.mu.Lock()
defer b.mu.Unlock()
for _, ch := range b.consumers[evt.name()] {
select {
case ch <- evt:
default:
b.logger.Warn("Event dropped", "name", evt.name())
}
}
}

View File

@ -0,0 +1,29 @@
package event_test
import (
"testing"
"git.netflux.io/rob/octoplex/internal/event"
"git.netflux.io/rob/octoplex/internal/testhelpers"
"github.com/stretchr/testify/assert"
)
func TestBus(t *testing.T) {
bus := event.NewBus(testhelpers.NewTestLogger(t))
ch1 := bus.Register(event.EventNameMediaServerStarted)
ch2 := bus.Register(event.EventNameMediaServerStarted)
evt := event.MediaServerStartedEvent{
RTMPURL: "rtmp://rtmp.example.com/live",
RTMPSURL: "rtmps://rtmp.example.com/live",
}
go func() {
bus.Send(evt)
bus.Send(evt)
}()
assert.Equal(t, evt, (<-ch1).(event.MediaServerStartedEvent))
assert.Equal(t, evt, (<-ch2).(event.MediaServerStartedEvent))
}

View File

@ -13,6 +13,7 @@ import (
"time" "time"
"git.netflux.io/rob/octoplex/internal/domain" "git.netflux.io/rob/octoplex/internal/domain"
"git.netflux.io/rob/octoplex/internal/event"
"git.netflux.io/rob/octoplex/internal/shortid" "git.netflux.io/rob/octoplex/internal/shortid"
"github.com/gdamore/tcell/v2" "github.com/gdamore/tcell/v2"
"github.com/rivo/tview" "github.com/rivo/tview"
@ -40,6 +41,7 @@ const (
// UI is responsible for managing the terminal user interface. // UI is responsible for managing the terminal user interface.
type UI struct { type UI struct {
eventBus *event.Bus
commandC chan domain.Command commandC chan domain.Command
clipboardAvailable bool clipboardAvailable bool
configFilePath string configFilePath string
@ -93,6 +95,7 @@ type ScreenCapture struct {
// StartParams contains the parameters for starting a new terminal user // StartParams contains the parameters for starting a new terminal user
// interface. // interface.
type StartParams struct { type StartParams struct {
EventBus *event.Bus
ChanSize int ChanSize int
Logger *slog.Logger Logger *slog.Logger
ClipboardAvailable bool ClipboardAvailable bool
@ -211,6 +214,7 @@ func StartUI(ctx context.Context, params StartParams) (*UI, error) {
ui := &UI{ ui := &UI{
commandC: commandCh, commandC: commandCh,
eventBus: params.EventBus,
clipboardAvailable: params.ClipboardAvailable, clipboardAvailable: params.ClipboardAvailable,
configFilePath: params.ConfigFilePath, configFilePath: params.ConfigFilePath,
buildInfo: params.BuildInfo, buildInfo: params.BuildInfo,
@ -275,6 +279,8 @@ func (ui *UI) C() <-chan domain.Command {
func (ui *UI) run(ctx context.Context) { func (ui *UI) run(ctx context.Context) {
defer close(ui.commandC) defer close(ui.commandC)
mediaServerStartedC := ui.eventBus.Register(event.EventNameMediaServerStarted)
uiDone := make(chan struct{}) uiDone := make(chan struct{})
go func() { go func() {
defer func() { defer func() {
@ -288,6 +294,8 @@ func (ui *UI) run(ctx context.Context) {
for { for {
select { select {
case evt := <-mediaServerStartedC:
ui.handleMediaServerStarted(evt.(event.MediaServerStartedEvent))
case <-ctx.Done(): case <-ctx.Done():
return return
case <-uiDone: case <-uiDone:
@ -296,15 +304,13 @@ func (ui *UI) run(ctx context.Context) {
} }
} }
// SetRTMPURLs sets the RTMP and RTMPS URLs for the user interface, which are func (ui *UI) handleMediaServerStarted(evt event.MediaServerStartedEvent) {
// unavailable when the UI is first created.
func (ui *UI) SetRTMPURLs(rtmpURL, rtmpsURL string) {
ui.mu.Lock() ui.mu.Lock()
ui.rtmpURL = rtmpURL ui.rtmpURL = evt.RTMPURL
ui.rtmpsURL = rtmpsURL ui.rtmpsURL = evt.RTMPSURL
ui.mu.Unlock() ui.mu.Unlock()
ui.renderAboutView() ui.app.QueueUpdateDraw(ui.renderAboutView)
} }
func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey { func (ui *UI) inputCaptureHandler(event *tcell.EventKey) *tcell.EventKey {