fix: context handling

This commit is contained in:
Rob Watson 2025-02-23 03:12:24 +01:00
parent 0141c54c64
commit e984258444
2 changed files with 12 additions and 4 deletions

View File

@ -255,6 +255,7 @@ func (a *Client) runContainerLoop(
containerErrC <- err containerErrC <- err
return return
case <-ctx.Done(): case <-ctx.Done():
containerErrC <- ctx.Err()
return return
} }
} }

View File

@ -32,6 +32,8 @@ type action func()
// Actor is responsible for managing the media server. // Actor is responsible for managing the media server.
type Actor struct { type Actor struct {
ctx context.Context
cancel context.CancelFunc
actorC chan action actorC chan action
stateC chan domain.Source stateC chan domain.Source
containerClient *container.Client containerClient *container.Client
@ -61,8 +63,11 @@ type StartActorParams struct {
// Callers must consume the state channel exposed via [C]. // Callers must consume the state channel exposed via [C].
func StartActor(ctx context.Context, params StartActorParams) *Actor { func StartActor(ctx context.Context, params StartActorParams) *Actor {
chanSize := cmp.Or(params.ChanSize, defaultChanSize) chanSize := cmp.Or(params.ChanSize, defaultChanSize)
ctx, cancel := context.WithCancel(ctx)
actor := &Actor{ actor := &Actor{
ctx: ctx,
cancel: cancel,
apiPort: cmp.Or(params.APIPort, defaultAPIPort), apiPort: cmp.Or(params.APIPort, defaultAPIPort),
rtmpPort: cmp.Or(params.RTMPPort, defaultRTMPPort), rtmpPort: cmp.Or(params.RTMPPort, defaultRTMPPort),
fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval), fetchIngressStateInterval: cmp.Or(params.FetchIngressStateInterval, defaultFetchIngressStateInterval),
@ -138,13 +143,13 @@ func (s *Actor) Close() error {
return fmt.Errorf("remove containers: %w", err) return fmt.Errorf("remove containers: %w", err)
} }
close(s.actorC) s.cancel()
return nil return nil
} }
// actorLoop is the main loop of the media server actor. It only exits when the // actorLoop is the main loop of the media server actor. It exits when the
// actor is closed. // actor is closed, or the parent context is cancelled.
func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan error) { func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan error) {
fetchStateT := time.NewTicker(s.fetchIngressStateInterval) fetchStateT := time.NewTicker(s.fetchIngressStateInterval)
defer fetchStateT.Stop() defer fetchStateT.Stop()
@ -198,9 +203,11 @@ func (s *Actor) actorLoop(containerStateC <-chan domain.Container, errC <-chan e
} }
case action, ok := <-s.actorC: case action, ok := <-s.actorC:
if !ok { if !ok {
return continue
} }
action() action()
case <-s.ctx.Done():
return
} }
} }
} }