diff --git a/internal/app/integration_helpers_test.go b/internal/app/integration_helpers_test.go index ed5d3f5..48c12a8 100644 --- a/internal/app/integration_helpers_test.go +++ b/internal/app/integration_helpers_test.go @@ -9,7 +9,6 @@ import ( "log/slog" "net/http" "os" - "strconv" "strings" "sync" "testing" @@ -158,7 +157,6 @@ func sendKey(t *testing.T, screen tcell.SimulationScreen, key tcell.Key, ch rune for i := 0; i < maxTries; i++ { if err := screen.PostEvent(tcell.NewEventKey(key, ch, tcell.ModNone)); err != nil { - fmt.Printf("Error injecting rune %s, will retry in %s: %s\n", strconv.QuoteRune(ch), waitTime, err) time.Sleep(waitTime) } else { return diff --git a/internal/event/bus.go b/internal/event/bus.go index 8945997..0498c40 100644 --- a/internal/event/bus.go +++ b/internal/event/bus.go @@ -9,7 +9,7 @@ const defaultChannelSize = 64 // Bus is an event bus. type Bus struct { - consumers map[Name][]chan Event + consumers []chan Event mu sync.Mutex logger *slog.Logger } @@ -17,18 +17,17 @@ type Bus struct { // NewBus returns a new event bus. func NewBus(logger *slog.Logger) *Bus { return &Bus{ - consumers: make(map[Name][]chan Event), - logger: logger, + logger: logger, } } -// Register registers a consumer for a given event. -func (b *Bus) Register(name Name) <-chan Event { +// Register registers a consumer for all events. +func (b *Bus) Register() <-chan Event { b.mu.Lock() defer b.mu.Unlock() ch := make(chan Event, defaultChannelSize) - b.consumers[name] = append(b.consumers[name], ch) + b.consumers = append(b.consumers, ch) return ch } @@ -40,7 +39,7 @@ func (b *Bus) Send(evt Event) { b.mu.Lock() defer b.mu.Unlock() - for _, ch := range b.consumers[evt.name()] { + for _, ch := range b.consumers { select { case ch <- evt: default: diff --git a/internal/event/bus_test.go b/internal/event/bus_test.go index df6c13a..cceae37 100644 --- a/internal/event/bus_test.go +++ b/internal/event/bus_test.go @@ -11,8 +11,8 @@ import ( func TestBus(t *testing.T) { bus := event.NewBus(testhelpers.NewTestLogger(t)) - ch1 := bus.Register(event.EventNameMediaServerStarted) - ch2 := bus.Register(event.EventNameMediaServerStarted) + ch1 := bus.Register() + ch2 := bus.Register() evt := event.MediaServerStartedEvent{ RTMPURL: "rtmp://rtmp.example.com/live", diff --git a/internal/terminal/terminal.go b/internal/terminal/terminal.go index 1a6ef4a..f7e25ab 100644 --- a/internal/terminal/terminal.go +++ b/internal/terminal/terminal.go @@ -279,15 +279,7 @@ func (ui *UI) C() <-chan domain.Command { func (ui *UI) run(ctx context.Context) { defer close(ui.commandC) - appStateChangedC := ui.eventBus.Register(event.EventNameAppStateChanged) - destinationAddedC := ui.eventBus.Register(event.EventNameDestinationAdded) - addDestinationFailedC := ui.eventBus.Register(event.EventNameAddDestinationFailed) - startDestinationFailedC := ui.eventBus.Register(event.EventNameStartDestinationFailed) - destinationRemovedC := ui.eventBus.Register(event.EventNameDestinationRemoved) - removeDestinationFailedC := ui.eventBus.Register(event.EventNameRemoveDestinationFailed) - existingAppDetectedC := ui.eventBus.Register(event.EventNameOtherInstanceDetected) - mediaServerStartedC := ui.eventBus.Register(event.EventNameMediaServerStarted) - fatalErrorOccurredC := ui.eventBus.Register(event.EventNameFatalErrorOccurred) + eventC := ui.eventBus.Register() uiDone := make(chan struct{}) go func() { @@ -302,41 +294,31 @@ func (ui *UI) run(ctx context.Context) { for { select { - case evt := <-appStateChangedC: + case evt := <-eventC: ui.app.QueueUpdateDraw(func() { - ui.handleAppStateChanged(evt.(event.AppStateChangedEvent)) - }) - case evt := <-destinationAddedC: - ui.app.QueueUpdateDraw(func() { - ui.handleDestinationAdded(evt.(event.DestinationAddedEvent)) - }) - case evt := <-startDestinationFailedC: - ui.app.QueueUpdateDraw(func() { - ui.handleStartDestinationFailed(evt.(event.StartDestinationFailedEvent)) - }) - case evt := <-addDestinationFailedC: - ui.app.QueueUpdateDraw(func() { - ui.handleDestinationEventError(evt.(event.AddDestinationFailedEvent).Err) - }) - case evt := <-destinationRemovedC: - ui.app.QueueUpdateDraw(func() { - ui.handleDestinationRemoved(evt.(event.DestinationRemovedEvent)) - }) - case evt := <-removeDestinationFailedC: - ui.app.QueueUpdateDraw(func() { - ui.handleDestinationEventError(evt.(event.RemoveDestinationFailedEvent).Err) - }) - case evt := <-existingAppDetectedC: - ui.app.QueueUpdateDraw(func() { - ui.handleOtherInstanceDetected(evt.(event.OtherInstanceDetectedEvent)) - }) - case evt := <-mediaServerStartedC: - ui.app.QueueUpdateDraw(func() { - ui.handleMediaServerStarted(evt.(event.MediaServerStartedEvent)) - }) - case evt := <-fatalErrorOccurredC: - ui.app.QueueUpdateDraw(func() { - ui.handleFatalErrorOccurred(evt.(event.FatalErrorOccurredEvent)) + switch evt := evt.(type) { + case event.AppStateChangedEvent: + ui.handleAppStateChanged(evt) + case event.DestinationAddedEvent: + ui.handleDestinationAdded(evt) + case event.StartDestinationFailedEvent: + ui.handleStartDestinationFailed(evt) + case event.AddDestinationFailedEvent: + ui.handleDestinationEventError(evt.Err) + case event.DestinationRemovedEvent: + ui.handleDestinationRemoved(evt) + case event.RemoveDestinationFailedEvent: + ui.handleDestinationEventError(evt.Err) + case event.OtherInstanceDetectedEvent: + ui.handleOtherInstanceDetected(evt) + case event.MediaServerStartedEvent: + ui.handleMediaServerStarted(evt) + case event.FatalErrorOccurredEvent: + ui.handleFatalErrorOccurred(evt) + default: + ui.logger.Warn("unhandled event", "event", evt) + } + }) case <-ctx.Done(): return