fix(app): event ordering
Some checks are pending
ci-build / lint (push) Waiting to run
ci-build / build (push) Blocked by required conditions
ci-build / release (push) Blocked by required conditions
ci-scan / Analyze (go) (push) Waiting to run
ci-scan / Analyze (actions) (push) Waiting to run

Use a single channel per consumer, instead of one channel per
consumer/event tuple. This ensures that overall ordering of events
remains consistent, and avoids introducing subtle race conditions.
This commit is contained in:
Rob Watson 2025-04-25 17:41:01 +02:00
parent 94623248c0
commit 1f4a931903
4 changed files with 33 additions and 54 deletions

View File

@ -9,7 +9,6 @@ import (
"log/slog" "log/slog"
"net/http" "net/http"
"os" "os"
"strconv"
"strings" "strings"
"sync" "sync"
"testing" "testing"
@ -158,7 +157,6 @@ func sendKey(t *testing.T, screen tcell.SimulationScreen, key tcell.Key, ch rune
for i := 0; i < maxTries; i++ { for i := 0; i < maxTries; i++ {
if err := screen.PostEvent(tcell.NewEventKey(key, ch, tcell.ModNone)); err != nil { if err := screen.PostEvent(tcell.NewEventKey(key, ch, tcell.ModNone)); err != nil {
fmt.Printf("Error injecting rune %s, will retry in %s: %s\n", strconv.QuoteRune(ch), waitTime, err)
time.Sleep(waitTime) time.Sleep(waitTime)
} else { } else {
return return

View File

@ -9,7 +9,7 @@ const defaultChannelSize = 64
// Bus is an event bus. // Bus is an event bus.
type Bus struct { type Bus struct {
consumers map[Name][]chan Event consumers []chan Event
mu sync.Mutex mu sync.Mutex
logger *slog.Logger logger *slog.Logger
} }
@ -17,18 +17,17 @@ type Bus struct {
// NewBus returns a new event bus. // NewBus returns a new event bus.
func NewBus(logger *slog.Logger) *Bus { func NewBus(logger *slog.Logger) *Bus {
return &Bus{ return &Bus{
consumers: make(map[Name][]chan Event), logger: logger,
logger: logger,
} }
} }
// Register registers a consumer for a given event. // Register registers a consumer for all events.
func (b *Bus) Register(name Name) <-chan Event { func (b *Bus) Register() <-chan Event {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
ch := make(chan Event, defaultChannelSize) ch := make(chan Event, defaultChannelSize)
b.consumers[name] = append(b.consumers[name], ch) b.consumers = append(b.consumers, ch)
return ch return ch
} }
@ -40,7 +39,7 @@ func (b *Bus) Send(evt Event) {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
for _, ch := range b.consumers[evt.name()] { for _, ch := range b.consumers {
select { select {
case ch <- evt: case ch <- evt:
default: default:

View File

@ -11,8 +11,8 @@ import (
func TestBus(t *testing.T) { func TestBus(t *testing.T) {
bus := event.NewBus(testhelpers.NewTestLogger(t)) bus := event.NewBus(testhelpers.NewTestLogger(t))
ch1 := bus.Register(event.EventNameMediaServerStarted) ch1 := bus.Register()
ch2 := bus.Register(event.EventNameMediaServerStarted) ch2 := bus.Register()
evt := event.MediaServerStartedEvent{ evt := event.MediaServerStartedEvent{
RTMPURL: "rtmp://rtmp.example.com/live", RTMPURL: "rtmp://rtmp.example.com/live",

View File

@ -279,15 +279,7 @@ func (ui *UI) C() <-chan domain.Command {
func (ui *UI) run(ctx context.Context) { func (ui *UI) run(ctx context.Context) {
defer close(ui.commandC) defer close(ui.commandC)
appStateChangedC := ui.eventBus.Register(event.EventNameAppStateChanged) eventC := ui.eventBus.Register()
destinationAddedC := ui.eventBus.Register(event.EventNameDestinationAdded)
addDestinationFailedC := ui.eventBus.Register(event.EventNameAddDestinationFailed)
startDestinationFailedC := ui.eventBus.Register(event.EventNameStartDestinationFailed)
destinationRemovedC := ui.eventBus.Register(event.EventNameDestinationRemoved)
removeDestinationFailedC := ui.eventBus.Register(event.EventNameRemoveDestinationFailed)
existingAppDetectedC := ui.eventBus.Register(event.EventNameOtherInstanceDetected)
mediaServerStartedC := ui.eventBus.Register(event.EventNameMediaServerStarted)
fatalErrorOccurredC := ui.eventBus.Register(event.EventNameFatalErrorOccurred)
uiDone := make(chan struct{}) uiDone := make(chan struct{})
go func() { go func() {
@ -302,41 +294,31 @@ func (ui *UI) run(ctx context.Context) {
for { for {
select { select {
case evt := <-appStateChangedC: case evt := <-eventC:
ui.app.QueueUpdateDraw(func() { ui.app.QueueUpdateDraw(func() {
ui.handleAppStateChanged(evt.(event.AppStateChangedEvent)) switch evt := evt.(type) {
}) case event.AppStateChangedEvent:
case evt := <-destinationAddedC: ui.handleAppStateChanged(evt)
ui.app.QueueUpdateDraw(func() { case event.DestinationAddedEvent:
ui.handleDestinationAdded(evt.(event.DestinationAddedEvent)) ui.handleDestinationAdded(evt)
}) case event.StartDestinationFailedEvent:
case evt := <-startDestinationFailedC: ui.handleStartDestinationFailed(evt)
ui.app.QueueUpdateDraw(func() { case event.AddDestinationFailedEvent:
ui.handleStartDestinationFailed(evt.(event.StartDestinationFailedEvent)) ui.handleDestinationEventError(evt.Err)
}) case event.DestinationRemovedEvent:
case evt := <-addDestinationFailedC: ui.handleDestinationRemoved(evt)
ui.app.QueueUpdateDraw(func() { case event.RemoveDestinationFailedEvent:
ui.handleDestinationEventError(evt.(event.AddDestinationFailedEvent).Err) ui.handleDestinationEventError(evt.Err)
}) case event.OtherInstanceDetectedEvent:
case evt := <-destinationRemovedC: ui.handleOtherInstanceDetected(evt)
ui.app.QueueUpdateDraw(func() { case event.MediaServerStartedEvent:
ui.handleDestinationRemoved(evt.(event.DestinationRemovedEvent)) ui.handleMediaServerStarted(evt)
}) case event.FatalErrorOccurredEvent:
case evt := <-removeDestinationFailedC: ui.handleFatalErrorOccurred(evt)
ui.app.QueueUpdateDraw(func() { default:
ui.handleDestinationEventError(evt.(event.RemoveDestinationFailedEvent).Err) ui.logger.Warn("unhandled event", "event", evt)
}) }
case evt := <-existingAppDetectedC:
ui.app.QueueUpdateDraw(func() {
ui.handleOtherInstanceDetected(evt.(event.OtherInstanceDetectedEvent))
})
case evt := <-mediaServerStartedC:
ui.app.QueueUpdateDraw(func() {
ui.handleMediaServerStarted(evt.(event.MediaServerStartedEvent))
})
case evt := <-fatalErrorOccurredC:
ui.app.QueueUpdateDraw(func() {
ui.handleFatalErrorOccurred(evt.(event.FatalErrorOccurredEvent))
}) })
case <-ctx.Done(): case <-ctx.Done():
return return