From 5a4ee4e34fecee2a9de77d4e76c4c52fed60d64d Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Wed, 5 Jan 2022 19:49:21 +0100 Subject: [PATCH] Add FFmpeg WorkerPool --- backend/.env.example | 5 +- backend/cmd/clipper/main.go | 10 ++- backend/config/config.go | 13 +++ backend/go.mod | 1 + backend/go.sum | 1 + backend/media/get_audio.go | 137 ++++++++++++++++++------------ backend/media/get_audio_test.go | 94 ++++++++++---------- backend/media/get_segment.go | 33 ++++--- backend/media/get_segment_test.go | 13 +-- backend/media/get_video_test.go | 22 ++--- backend/media/service.go | 8 +- backend/media/service_test.go | 5 +- backend/media/worker_pool.go | 73 ++++++++++++++++ backend/media/worker_pool_test.go | 53 ++++++++++++ backend/server/server.go | 2 + 15 files changed, 337 insertions(+), 133 deletions(-) create mode 100644 backend/media/worker_pool.go create mode 100644 backend/media/worker_pool_test.go diff --git a/backend/.env.example b/backend/.env.example index f42d85d..08ba090 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -15,7 +15,6 @@ ASSETS_HTTP_ROOT= # NOTE: Enabling the file system store will disable serving assets over HTTP. FILE_STORE=filesystem - # The base URL used for serving file store assets. # Example: http://localhost:8888 FILE_STORE_HTTP_BASE_URL= @@ -28,3 +27,7 @@ AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= AWS_REGION= S3_BUCKET= + +# The number of concurrent FFMPEG processes that will be permitted. +# Defaults to runtime.NumCPU(): +FFMPEG_WORKER_POOL_SIZE= diff --git a/backend/cmd/clipper/main.go b/backend/cmd/clipper/main.go index bd9c108..cc629db 100644 --- a/backend/cmd/clipper/main.go +++ b/backend/cmd/clipper/main.go @@ -19,8 +19,9 @@ import ( ) const ( - defaultTimeout = 600 * time.Second - defaultURLExpiry = time.Hour + defaultTimeout = 600 * time.Second + defaultURLExpiry = time.Hour + maximumWorkerQueueSize = 32 ) func main() { @@ -54,12 +55,17 @@ func main() { log.Fatal(err) } + // Create a worker pool + wp := media.NewWorkerPool(config.FFmpegWorkerPoolSize, maximumWorkerQueueSize, logger.Sugar().Named("FFmpegWorkerPool")) + wp.Run() + log.Fatal(server.Start(server.Options{ Config: config, Timeout: defaultTimeout, Store: store, YoutubeClient: &youtubeClient, FileStore: fileStore, + WorkerPool: wp, Logger: logger, })) } diff --git a/backend/config/config.go b/backend/config/config.go index 01144f6..0e4088d 100644 --- a/backend/config/config.go +++ b/backend/config/config.go @@ -4,6 +4,8 @@ import ( "errors" "fmt" "os" + "runtime" + "strconv" ) type Environment int @@ -34,6 +36,7 @@ type Config struct { AWSRegion string S3Bucket string AssetsHTTPRoot string + FFmpegWorkerPoolSize int } func NewFromEnv() (Config, error) { @@ -111,6 +114,15 @@ func NewFromEnv() (Config, error) { assetsHTTPRoot := os.Getenv("ASSETS_HTTP_ROOT") + ffmpegWorkerPoolSize := runtime.NumCPU() + if s := os.Getenv("FFMPEG_WORKER_POOL_SIZE"); s != "" { + if n, err := strconv.Atoi(s); err != nil { + return Config{}, fmt.Errorf("invalid FFMPEG_WORKER_POOL_SIZE value: %s", s) + } else { + ffmpegWorkerPoolSize = n + } + } + return Config{ Environment: env, BindAddr: bindAddr, @@ -125,5 +137,6 @@ func NewFromEnv() (Config, error) { AssetsHTTPRoot: assetsHTTPRoot, FileStoreHTTPRoot: fileStoreHTTPRoot, FileStoreHTTPBaseURL: fileStoreHTTPBaseURL, + FFmpegWorkerPoolSize: ffmpegWorkerPoolSize, }, nil } diff --git a/backend/go.mod b/backend/go.mod index 538404f..cca85dc 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -16,6 +16,7 @@ require ( github.com/kkdai/youtube/v2 v2.7.6 github.com/stretchr/testify v1.7.0 go.uber.org/zap v1.19.1 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c google.golang.org/grpc v1.43.0 google.golang.org/protobuf v1.27.1 ) diff --git a/backend/go.sum b/backend/go.sum index 16863ea..da080d1 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -572,6 +572,7 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/backend/media/get_audio.go b/backend/media/get_audio.go index b8d649e..44205d5 100644 --- a/backend/media/get_audio.go +++ b/backend/media/get_audio.go @@ -8,11 +8,11 @@ import ( "io" "math" "strconv" - "sync" "git.netflux.io/rob/clipper/config" "git.netflux.io/rob/clipper/generated/store" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) type GetPeaksProgress struct { @@ -32,17 +32,19 @@ type audioGetter struct { youtube YoutubeClient fileStore FileStore commandFunc CommandFunc + workerPool *WorkerPool config config.Config logger *zap.SugaredLogger } // newAudioGetter returns a new audioGetter. -func newAudioGetter(store Store, youtube YoutubeClient, fileStore FileStore, commandFunc CommandFunc, config config.Config, logger *zap.SugaredLogger) *audioGetter { +func newAudioGetter(store Store, youtube YoutubeClient, fileStore FileStore, commandFunc CommandFunc, workerPool *WorkerPool, config config.Config, logger *zap.SugaredLogger) *audioGetter { return &audioGetter{ store: store, youtube: youtube, fileStore: fileStore, commandFunc: commandFunc, + workerPool: workerPool, config: config, logger: logger, } @@ -78,7 +80,13 @@ func (g *audioGetter) GetAudio(ctx context.Context, mediaSet store.MediaSet, num audioGetter: g, getPeaksProgressReader: audioProgressReader, } - go s.getAudio(ctx, stream, mediaSet) + + go func() { + if err := g.workerPool.WaitForTask(ctx, func() error { return s.getAudio(ctx, stream, mediaSet) }); err != nil { + // the progress reader is closed inside the worker in the non-error case. + s.CloseWithError(err) + } + }() return s, nil } @@ -89,97 +97,120 @@ type audioGetterState struct { *getPeaksProgressReader } -func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaSet store.MediaSet) { +func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaSet store.MediaSet) error { streamWithProgress := newLogProgressReader(r, "audio", mediaSet.AudioContentLength, s.logger) - pr, pw := io.Pipe() - teeReader := io.TeeReader(streamWithProgress, pw) var stdErr bytes.Buffer cmd := s.commandFunc(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") - cmd.Stdin = teeReader cmd.Stderr = &stdErr - stdout, err := cmd.StdoutPipe() + + // ffmpegWriter accepts encoded audio and pipes it to FFmpeg. + ffmpegWriter, err := cmd.StdinPipe() if err != nil { - s.CloseWithError(fmt.Errorf("error getting stdout: %v", err)) - return + return fmt.Errorf("error getting stdin: %v", err) } - if err = cmd.Start(); err != nil { - s.CloseWithError(fmt.Errorf("error starting command: %v, output: %s", err, stdErr.String())) - return + + uploadReader, uploadWriter := io.Pipe() + mw := io.MultiWriter(uploadWriter, ffmpegWriter) + + // ffmpegReader delivers raw audio output from FFmpeg, and also writes it + // back to the progress reader. + var ffmpegReader io.Reader + if stdoutPipe, err := cmd.StdoutPipe(); err == nil { + ffmpegReader = io.TeeReader(stdoutPipe, s) + } else { + return fmt.Errorf("error getting stdout: %v", err) } var presignedAudioURL string - var wg sync.WaitGroup - wg.Add(2) + g, ctx := errgroup.WithContext(ctx) // Upload the encoded audio. - // TODO: fix error shadowing in these two goroutines. - go func() { - defer wg.Done() - + g.Go(func() error { // TODO: use mediaSet func to fetch key key := fmt.Sprintf("media_sets/%s/audio.opus", mediaSet.ID) - _, encErr := s.fileStore.PutObject(ctx, key, pr, "audio/opus") + _, encErr := s.fileStore.PutObject(ctx, key, uploadReader, "audio/opus") if encErr != nil { - s.CloseWithError(fmt.Errorf("error uploading encoded audio: %v", encErr)) - return - } - pr.Close() - - presignedAudioURL, err = s.fileStore.GetURL(ctx, key) - if err != nil { - s.CloseWithError(fmt.Errorf("error generating presigned URL: %v", err)) + return fmt.Errorf("error uploading encoded audio: %v", encErr) } - if _, err = s.store.SetEncodedAudioUploaded(ctx, store.SetEncodedAudioUploadedParams{ + presignedAudioURL, encErr = s.fileStore.GetURL(ctx, key) + if encErr != nil { + return fmt.Errorf("error generating presigned URL: %v", encErr) + } + + if _, encErr = s.store.SetEncodedAudioUploaded(ctx, store.SetEncodedAudioUploadedParams{ ID: mediaSet.ID, AudioEncodedS3Key: sqlString(key), - }); err != nil { - s.CloseWithError(fmt.Errorf("error setting encoded audio uploaded: %v", err)) + }); encErr != nil { + return fmt.Errorf("error setting encoded audio uploaded: %v", encErr) } - }() + + return nil + }) // Upload the raw audio. - go func() { - defer wg.Done() - + g.Go(func() error { // TODO: use mediaSet func to fetch key key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) - teeReader := io.TeeReader(stdout, s) - bytesUploaded, rawErr := s.fileStore.PutObject(ctx, key, teeReader, rawAudioMimeType) + bytesUploaded, rawErr := s.fileStore.PutObject(ctx, key, ffmpegReader, rawAudioMimeType) if rawErr != nil { - s.CloseWithError(fmt.Errorf("error uploading raw audio: %v", rawErr)) - return + return fmt.Errorf("error uploading raw audio: %v", rawErr) } - if _, err = s.store.SetRawAudioUploaded(ctx, store.SetRawAudioUploadedParams{ + if _, rawErr = s.store.SetRawAudioUploaded(ctx, store.SetRawAudioUploadedParams{ ID: mediaSet.ID, AudioRawS3Key: sqlString(key), AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(mediaSet.AudioChannels)), - }); err != nil { - s.CloseWithError(fmt.Errorf("error setting raw audio uploaded: %v", err)) + }); rawErr != nil { + return fmt.Errorf("error setting raw audio uploaded: %v", rawErr) } - }() - if err = cmd.Wait(); err != nil { - // TODO: cancel other goroutines (e.g. video fetch) if an error occurs here. - s.CloseWithError(fmt.Errorf("error waiting for command: %v, output: %s", err, stdErr.String())) - return + return nil + }) + + g.Go(func() error { + if _, err := io.Copy(mw, streamWithProgress); err != nil { + return fmt.Errorf("error copying: %v", err) + } + + // ignoring the following Close errors should be ok, as the Copy has + // already completed successfully. + + if err := ffmpegWriter.Close(); err != nil { + s.logger.With("err", err).Warn("getAudio: unable to close ffmpegWriter") + } + + if err := uploadWriter.Close(); err != nil { + s.logger.With("err", err).Warn("getAudio: unable to close pipeWriter") + } + + if err := r.Close(); err != nil { + s.logger.With("err", err).Warn("getAudio: unable to close stream") + } + + return nil + }) + + if err := cmd.Start(); err != nil { + return fmt.Errorf("error starting command: %v, output: %s", err, stdErr.String()) } - // Close the pipe sending encoded audio to be uploaded, this ensures the - // uploader reading from the pipe will receive io.EOF and complete - // successfully. - pw.Close() + if err := g.Wait(); err != nil { + return fmt.Errorf("error uploading: %v", err) + } - // Wait for the uploaders to complete. - wg.Wait() + if err := cmd.Wait(); err != nil { + return fmt.Errorf("error waiting for command: %v, output: %s", err, stdErr.String()) + } // Finally, close the progress reader so that the subsequent call to Next() // returns the presigned URL and io.EOF. s.Close(presignedAudioURL) + + return nil } // getPeaksProgressReader accepts a byte stream containing little endian diff --git a/backend/media/get_audio_test.go b/backend/media/get_audio_test.go index 9804fc0..7fa4c86 100644 --- a/backend/media/get_audio_test.go +++ b/backend/media/get_audio_test.go @@ -6,7 +6,6 @@ import ( "database/sql" "errors" "io" - "io/ioutil" "strings" "testing" "time" @@ -32,13 +31,15 @@ func TestGetAudioFromYoutube(t *testing.T) { ) ctx := context.Background() + wp := media.NewTestWorkerPool() mediaSetID := uuid.New() mediaSet := store.MediaSet{ - ID: mediaSetID, - YoutubeID: videoID, - AudioYoutubeItag: 123, - AudioChannels: 2, - AudioFramesApprox: inFixtureFrames, + ID: mediaSetID, + YoutubeID: videoID, + AudioYoutubeItag: 123, + AudioChannels: 2, + AudioFramesApprox: inFixtureFrames, + AudioContentLength: 22, } video := &youtube.Video{ @@ -49,19 +50,19 @@ func TestGetAudioFromYoutube(t *testing.T) { t.Run("NOK,ErrorFetchingMediaSet", func(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(store.MediaSet{}, errors.New("db went boom")) - service := media.NewMediaSetService(&mockStore, nil, nil, nil, config.Config{}, zap.NewNop().Sugar()) + service := media.NewMediaSetService(&mockStore, nil, nil, nil, wp, config.Config{}, zap.NewNop().Sugar()) _, err := service.GetPeaks(ctx, mediaSetID, 10) assert.EqualError(t, err, "error getting media set: db went boom") }) t.Run("NOK,ErrorFetchingStream", func(t *testing.T) { var mockStore mocks.Store - mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil) + mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil) var youtubeClient mocks.YoutubeClient - youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil) - youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(nil, int64(0), errors.New("uh oh")) + youtubeClient.On("GetVideoContext", mock.Anything, mediaSet.YoutubeID).Return(video, nil) + youtubeClient.On("GetStreamContext", mock.Anything, video, &video.Formats[0]).Return(nil, int64(0), errors.New("uh oh")) - service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, config.Config{}, zap.NewNop().Sugar()) + service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, wp, config.Config{}, zap.NewNop().Sugar()) _, err := service.GetPeaks(ctx, mediaSetID, 10) assert.EqualError(t, err, "error fetching stream: uh oh") }) @@ -73,10 +74,10 @@ func TestGetAudioFromYoutube(t *testing.T) { mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(invalidMediaSet, nil) var youtubeClient mocks.YoutubeClient - youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil) - youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(nil, int64(0), nil) + youtubeClient.On("GetVideoContext", mock.Anything, mediaSet.YoutubeID).Return(video, nil) + youtubeClient.On("GetStreamContext", mock.Anything, video, &video.Formats[0]).Return(nil, int64(0), nil) - service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, config.Config{}, zap.NewNop().Sugar()) + service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, wp, config.Config{}, zap.NewNop().Sugar()) _, err := service.GetPeaks(ctx, mediaSetID, 10) assert.EqualError(t, err, "error building progress reader: error creating audio progress reader (framesExpected = 1323000, channels = 0, numBins = 10)") }) @@ -84,42 +85,47 @@ func TestGetAudioFromYoutube(t *testing.T) { t.Run("NOK,UploadError", func(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil) - mockStore.On("SetEncodedAudioUploaded", ctx, mock.Anything).Return(mediaSet, nil) + mockStore.On("SetEncodedAudioUploaded", mock.Anything, mock.Anything).Return(mediaSet, nil) var youtubeClient mocks.YoutubeClient - youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil) - youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(io.NopCloser(bytes.NewReader(nil)), int64(0), nil) + youtubeClient.On("GetVideoContext", mock.Anything, mediaSet.YoutubeID).Return(video, nil) + youtubeClient.On("GetStreamContext", mock.Anything, video, &video.Formats[0]).Return(io.NopCloser(bytes.NewReader(nil)), int64(0), nil) var fileStore mocks.FileStore - fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, "audio/raw").Return(int64(0), errors.New("error uploading raw audio")) - fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, "audio/opus").Return(int64(0), nil) - fileStore.On("GetURL", ctx, mock.Anything).Return("", nil) + fileStore.On("PutObject", mock.Anything, mock.Anything, mock.Anything, "audio/raw").Return(int64(0), errors.New("network error")) + fileStore.On("PutObject", mock.Anything, mock.Anything, mock.Anything, "audio/opus").Return(int64(0), nil) + fileStore.On("GetURL", mock.Anything, mock.Anything).Return("", nil) cmd := helperCommand(t, "", inFixturePath, "", 0) - service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, config.Config{}, zap.NewNop().Sugar()) + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, wp, config.Config{}, zap.NewNop().Sugar()) stream, err := service.GetPeaks(ctx, mediaSetID, 10) assert.NoError(t, err) _, err = stream.Next() - assert.EqualError(t, err, "error waiting for progress: error uploading raw audio: error uploading raw audio") + assert.EqualError(t, err, "error waiting for progress: error uploading: error uploading raw audio: network error") }) t.Run("NOK,FFmpegError", func(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil) - mockStore.On("SetEncodedAudioUploaded", ctx, mock.Anything).Return(mediaSet, nil) - mockStore.On("SetRawAudioUploaded", ctx, mock.Anything).Return(mediaSet, nil) + mockStore.On("SetEncodedAudioUploaded", mock.Anything, mock.Anything).Return(mediaSet, nil) + mockStore.On("SetRawAudioUploaded", mock.Anything, mock.Anything).Return(mediaSet, nil) var youtubeClient mocks.YoutubeClient - youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil) - youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(io.NopCloser(strings.NewReader("some audio")), int64(0), nil) + youtubeClient.On("GetVideoContext", mock.Anything, mediaSet.YoutubeID).Return(video, nil) + youtubeClient.On("GetStreamContext", mock.Anything, video, &video.Formats[0]).Return(io.NopCloser(strings.NewReader("some audio")), int64(0), nil) var fileStore mocks.FileStore - fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, mock.Anything).Return(int64(0), nil) - fileStore.On("GetURL", ctx, mock.Anything).Return("", nil) + fileStore.On("PutObject", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + _, err := io.Copy(io.Discard, args[2].(io.Reader)) + require.NoError(t, err) + }). + Return(int64(0), nil) + fileStore.On("GetURL", mock.Anything, mock.Anything).Return("", nil) cmd := helperCommand(t, "", inFixturePath, "oh no", 101) - service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, config.Config{}, zap.NewNop().Sugar()) + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, wp, config.Config{}, zap.NewNop().Sugar()) stream, err := service.GetPeaks(ctx, mediaSetID, 10) assert.NoError(t, err) @@ -130,11 +136,11 @@ func TestGetAudioFromYoutube(t *testing.T) { t.Run("OK", func(t *testing.T) { // Mock Store var mockStore mocks.Store - mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil) - mockStore.On("SetRawAudioUploaded", ctx, mock.MatchedBy(func(p store.SetRawAudioUploadedParams) bool { + mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil) + mockStore.On("SetRawAudioUploaded", mock.Anything, mock.MatchedBy(func(p store.SetRawAudioUploadedParams) bool { return p.ID == mediaSetID && p.AudioFrames.Int64 == inFixtureFrames })).Return(mediaSet, nil) - mockStore.On("SetEncodedAudioUploaded", ctx, mock.MatchedBy(func(p store.SetEncodedAudioUploadedParams) bool { + mockStore.On("SetEncodedAudioUploaded", mock.Anything, mock.MatchedBy(func(p store.SetEncodedAudioUploadedParams) bool { return p.ID == mediaSetID })).Return(mediaSet, nil) defer mockStore.AssertExpectations(t) @@ -142,9 +148,10 @@ func TestGetAudioFromYoutube(t *testing.T) { // Mock YoutubeClient encodedContent := "this is an opus stream" reader := io.NopCloser(strings.NewReader(encodedContent)) + var youtubeClient mocks.YoutubeClient - youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil) - youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(reader, int64(len(encodedContent)), nil) + youtubeClient.On("GetVideoContext", mock.Anything, mediaSet.YoutubeID).Return(video, nil) + youtubeClient.On("GetStreamContext", mock.Anything, video, &video.Formats[0]).Return(reader, int64(len(encodedContent)), nil) defer youtubeClient.AssertExpectations(t) // Mock FileStore @@ -153,26 +160,26 @@ func TestGetAudioFromYoutube(t *testing.T) { // passed to them is as expected. url := "https://www.example.com/foo" var fileStore mocks.FileStore - fileStore.On("PutObject", ctx, "media_sets/"+mediaSetID.String()+"/audio.opus", mock.Anything, "audio/opus"). + fileStore.On("PutObject", mock.Anything, "media_sets/"+mediaSetID.String()+"/audio.opus", mock.Anything, "audio/opus"). Run(func(args mock.Arguments) { - readContent, err := ioutil.ReadAll(args[2].(io.Reader)) + readContent, err := io.ReadAll(args[2].(io.Reader)) require.NoError(t, err) assert.Equal(t, encodedContent, string(readContent)) }). Return(int64(len(encodedContent)), nil) - fileStore.On("PutObject", ctx, "media_sets/"+mediaSetID.String()+"/audio.raw", mock.Anything, "audio/raw"). + fileStore.On("PutObject", mock.Anything, "media_sets/"+mediaSetID.String()+"/audio.raw", mock.Anything, "audio/raw"). Run(func(args mock.Arguments) { n, err := io.Copy(io.Discard, args[2].(io.Reader)) require.NoError(t, err) assert.Equal(t, inFixtureLen, n) }). Return(inFixtureLen, nil) - fileStore.On("GetURL", ctx, "media_sets/"+mediaSetID.String()+"/audio.opus").Return(url, nil) + fileStore.On("GetURL", mock.Anything, "media_sets/"+mediaSetID.String()+"/audio.opus").Return(url, nil) defer fileStore.AssertExpectations(t) numBins := 10 cmd := helperCommand(t, "ffmpeg -hide_banner -loglevel error -i - -f s16le -ar 48000 -acodec pcm_s16le -", inFixturePath, "", 0) - service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, config.Config{}, zap.NewNop().Sugar()) + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, wp, config.Config{}, zap.NewNop().Sugar()) stream, err := service.GetPeaks(ctx, mediaSetID, numBins) require.NoError(t, err) @@ -187,6 +194,7 @@ func TestGetPeaksFromFileStore(t *testing.T) { ) ctx := context.Background() + wp := media.NewTestWorkerPool() logger := zap.NewNop().Sugar() mediaSetID := uuid.New() mediaSet := store.MediaSet{ @@ -202,7 +210,7 @@ func TestGetPeaksFromFileStore(t *testing.T) { t.Run("NOK,ErrorFetchingMediaSet", func(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(store.MediaSet{}, errors.New("db went boom")) - service := media.NewMediaSetService(&mockStore, nil, nil, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, nil, nil, nil, wp, config.Config{}, logger) _, err := service.GetPeaks(ctx, mediaSetID, 10) assert.EqualError(t, err, "error getting media set: db went boom") }) @@ -215,7 +223,7 @@ func TestGetPeaksFromFileStore(t *testing.T) { var fileStore mocks.FileStore fileStore.On("GetObject", mock.Anything, "raw audio key").Return(nil, errors.New("boom")) - service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, wp, config.Config{}, logger) _, err := service.GetPeaks(ctx, mediaSetID, 10) require.EqualError(t, err, "error getting object from file store: boom") }) @@ -231,7 +239,7 @@ func TestGetPeaksFromFileStore(t *testing.T) { fileStore.On("GetURL", mock.Anything, "encoded audio key").Return("", errors.New("network error")) defer fileStore.AssertExpectations(t) - service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, wp, config.Config{}, logger) stream, err := service.GetPeaks(ctx, mediaSetID, 10) require.NoError(t, err) @@ -260,7 +268,7 @@ func TestGetPeaksFromFileStore(t *testing.T) { defer fileStore.AssertExpectations(t) numBins := 10 - service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, wp, config.Config{}, logger) stream, err := service.GetPeaks(ctx, mediaSetID, numBins) require.NoError(t, err) diff --git a/backend/media/get_segment.go b/backend/media/get_segment.go index 068f16f..99eb91f 100644 --- a/backend/media/get_segment.go +++ b/backend/media/get_segment.go @@ -70,6 +70,7 @@ func (s *AudioSegmentStream) closeWithError(err error) { type audioSegmentGetter struct { mu sync.Mutex commandFunc CommandFunc + workerPool *WorkerPool rawAudio io.ReadCloser channels int32 outFormat AudioFormat @@ -79,9 +80,10 @@ type audioSegmentGetter struct { // newAudioSegmentGetter returns a new audioSegmentGetter. The io.ReadCloser // will be consumed and closed by the getAudioSegment() function. -func newAudioSegmentGetter(commandFunc CommandFunc, rawAudio io.ReadCloser, channels int32, bytesExpected int64, outFormat AudioFormat) *audioSegmentGetter { +func newAudioSegmentGetter(commandFunc CommandFunc, workerPool *WorkerPool, rawAudio io.ReadCloser, channels int32, bytesExpected int64, outFormat AudioFormat) *audioSegmentGetter { return &audioSegmentGetter{ commandFunc: commandFunc, + workerPool: workerPool, rawAudio: rawAudio, channels: channels, bytesExpected: bytesExpected, @@ -137,19 +139,26 @@ func (s *AudioSegmentStream) Next(ctx context.Context) (AudioSegmentProgress, er func (s *audioSegmentGetter) getAudioSegment(ctx context.Context) { defer s.rawAudio.Close() - var stdErr bytes.Buffer - cmd := s.commandFunc(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-f", "s16le", "-ac", itoa(int(s.channels)), "-ar", itoa(rawAudioSampleRate), "-i", "-", "-f", s.outFormat.String(), "-") - cmd.Stderr = &stdErr - cmd.Stdin = s - cmd.Stdout = s + err := s.workerPool.WaitForTask(ctx, func() error { + var stdErr bytes.Buffer + cmd := s.commandFunc(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-f", "s16le", "-ac", itoa(int(s.channels)), "-ar", itoa(rawAudioSampleRate), "-i", "-", "-f", s.outFormat.String(), "-") + cmd.Stderr = &stdErr + cmd.Stdin = s + cmd.Stdout = s - if err := cmd.Start(); err != nil { - s.stream.closeWithError(fmt.Errorf("error starting command: %v, output: %s", err, stdErr.String())) - return - } + if err := cmd.Start(); err != nil { + return fmt.Errorf("error starting command: %v, output: %s", err, stdErr.String()) + } - if err := cmd.Wait(); err != nil { - s.stream.closeWithError(fmt.Errorf("error waiting for ffmpeg: %v, output: %s", err, stdErr.String())) + if err := cmd.Wait(); err != nil { + return fmt.Errorf("error waiting for ffmpeg: %v, output: %s", err, stdErr.String()) + } + + return nil + }) + + if err != nil { + s.stream.closeWithError(err) return } diff --git a/backend/media/get_segment_test.go b/backend/media/get_segment_test.go index 1fdc67c..f60454b 100644 --- a/backend/media/get_segment_test.go +++ b/backend/media/get_segment_test.go @@ -20,13 +20,14 @@ import ( ) func TestGetSegment(t *testing.T) { - mediaSetID := uuid.MustParse("4c440241-cca9-436f-adb0-be074588cf2b") const inFixturePath = "testdata/tone-44100-stereo-int16-30000ms.raw" + mediaSetID := uuid.MustParse("4c440241-cca9-436f-adb0-be074588cf2b") + wp := media.NewTestWorkerPool() t.Run("invalid range", func(t *testing.T) { var mockStore mocks.Store var fileStore mocks.FileStore - service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, config.Config{}, zap.NewNop().Sugar()) + service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, wp, config.Config{}, zap.NewNop().Sugar()) stream, err := service.GetAudioSegment(context.Background(), mediaSetID, 1, 0, media.AudioFormatMP3) require.Nil(t, stream) @@ -37,7 +38,7 @@ func TestGetSegment(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(store.MediaSet{}, pgx.ErrNoRows) var fileStore mocks.FileStore - service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, config.Config{}, zap.NewNop().Sugar()) + service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, wp, config.Config{}, zap.NewNop().Sugar()) stream, err := service.GetAudioSegment(context.Background(), mediaSetID, 0, 1, media.AudioFormatMP3) require.Nil(t, stream) @@ -54,7 +55,7 @@ func TestGetSegment(t *testing.T) { fileStore.On("GetObjectWithRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil, errors.New("network error")) - service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, config.Config{}, zap.NewNop().Sugar()) + service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, wp, config.Config{}, zap.NewNop().Sugar()) stream, err := service.GetAudioSegment(context.Background(), mediaSetID, 0, 1, media.AudioFormatMP3) require.Nil(t, stream) @@ -72,7 +73,7 @@ func TestGetSegment(t *testing.T) { Return(fixtureReader(t, inFixturePath, 1), nil) cmd := helperCommand(t, "", "", "something bad happened", 2) - service := media.NewMediaSetService(&mockStore, nil, &fileStore, cmd, config.Config{}, zap.NewNop().Sugar()) + service := media.NewMediaSetService(&mockStore, nil, &fileStore, cmd, wp, config.Config{}, zap.NewNop().Sugar()) stream, err := service.GetAudioSegment(context.Background(), mediaSetID, 0, 1, media.AudioFormatMP3) require.NoError(t, err) @@ -158,7 +159,7 @@ func TestGetSegment(t *testing.T) { defer fileStore.AssertExpectations(t) cmd := helperCommand(t, tc.wantCommand, tc.outFixturePath, "", 0) - service := media.NewMediaSetService(&mockStore, nil, &fileStore, cmd, config.Config{}, zap.NewNop().Sugar()) + service := media.NewMediaSetService(&mockStore, nil, &fileStore, cmd, wp, config.Config{}, zap.NewNop().Sugar()) stream, err := service.GetAudioSegment(ctx, mediaSetID, tc.inStartFrame, tc.inEndFrame, tc.audioFormat) require.NoError(t, err) diff --git a/backend/media/get_video_test.go b/backend/media/get_video_test.go index c1f3fdf..1965cf3 100644 --- a/backend/media/get_video_test.go +++ b/backend/media/get_video_test.go @@ -24,6 +24,7 @@ import ( func TestGetVideoFromYoutube(t *testing.T) { ctx := context.Background() + wp := media.NewTestWorkerPool() logger := zap.NewNop().Sugar() const ( @@ -51,7 +52,7 @@ func TestGetVideoFromYoutube(t *testing.T) { var youtubeClient mocks.YoutubeClient youtubeClient.On("GetVideoContext", ctx, videoID).Return(nil, errors.New("nope")) - service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, wp, config.Config{}, logger) _, err := service.GetVideo(ctx, mediaSetID) assert.EqualError(t, err, "error fetching video: nope") }) @@ -64,7 +65,7 @@ func TestGetVideoFromYoutube(t *testing.T) { youtubeClient.On("GetVideoContext", ctx, videoID).Return(video, nil) youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(nil, int64(0), errors.New("network failure")) - service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, wp, config.Config{}, logger) _, err := service.GetVideo(ctx, mediaSetID) assert.EqualError(t, err, "error fetching stream: network failure") }) @@ -83,7 +84,7 @@ func TestGetVideoFromYoutube(t *testing.T) { var fileStore mocks.FileStore fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType).Return(int64(0), errors.New("error storing object")) - service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, wp, config.Config{}, logger) stream, err := service.GetVideo(ctx, mediaSetID) require.NoError(t, err) @@ -105,7 +106,7 @@ func TestGetVideoFromYoutube(t *testing.T) { var fileStore mocks.FileStore fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType).Return(int64(len(encodedContent)), nil) - service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, wp, config.Config{}, logger) stream, err := service.GetVideo(ctx, mediaSetID) require.NoError(t, err) @@ -128,7 +129,7 @@ func TestGetVideoFromYoutube(t *testing.T) { fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType).Return(int64(len(encodedContent)), nil) fileStore.On("GetURL", ctx, mock.Anything).Return("", errors.New("URL error")) - service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, wp, config.Config{}, logger) stream, err := service.GetVideo(ctx, mediaSetID) require.NoError(t, err) @@ -152,7 +153,7 @@ func TestGetVideoFromYoutube(t *testing.T) { fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType).Return(int64(len(encodedContent)), nil) fileStore.On("GetURL", ctx, mock.Anything).Return("a url", nil) - service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, wp, config.Config{}, logger) stream, err := service.GetVideo(ctx, mediaSetID) require.NoError(t, err) @@ -185,7 +186,7 @@ func TestGetVideoFromYoutube(t *testing.T) { fileStore.On("GetURL", ctx, mock.Anything).Return("a url", nil) defer fileStore.AssertExpectations(t) - service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, wp, config.Config{}, logger) stream, err := service.GetVideo(ctx, mediaSetID) require.NoError(t, err) @@ -222,6 +223,7 @@ func (c errorCloser) Close() error { return errors.New("close error") } func TestGetVideoFromFileStore(t *testing.T) { ctx := context.Background() + wp := media.NewTestWorkerPool() logger := zap.NewNop().Sugar() videoID := "video002" @@ -237,7 +239,7 @@ func TestGetVideoFromFileStore(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", ctx, mediaSetID).Return(store.MediaSet{}, errors.New("database fail")) - service := media.NewMediaSetService(&mockStore, nil, nil, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, nil, nil, nil, wp, config.Config{}, logger) _, err := service.GetVideo(ctx, mediaSetID) require.EqualError(t, err, "error getting media set: database fail") }) @@ -249,7 +251,7 @@ func TestGetVideoFromFileStore(t *testing.T) { var fileStore mocks.FileStore fileStore.On("GetURL", ctx, "videos/myvideo").Return("", errors.New("key missing")) - service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, wp, config.Config{}, logger) _, err := service.GetVideo(ctx, mediaSetID) require.EqualError(t, err, "error generating presigned URL: key missing") }) @@ -262,7 +264,7 @@ func TestGetVideoFromFileStore(t *testing.T) { var fileStore mocks.FileStore fileStore.On("GetURL", ctx, "videos/myvideo").Return(url, nil) - service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, config.Config{}, logger) + service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, wp, config.Config{}, logger) stream, err := service.GetVideo(ctx, mediaSetID) require.NoError(t, err) diff --git a/backend/media/service.go b/backend/media/service.go index 316047e..a8d04f6 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -37,16 +37,18 @@ type MediaSetService struct { youtube YoutubeClient fileStore FileStore commandFunc CommandFunc + workerPool *WorkerPool config config.Config logger *zap.SugaredLogger } -func NewMediaSetService(store Store, youtubeClient YoutubeClient, fileStore FileStore, commandFunc CommandFunc, config config.Config, logger *zap.SugaredLogger) *MediaSetService { +func NewMediaSetService(store Store, youtubeClient YoutubeClient, fileStore FileStore, commandFunc CommandFunc, workerPool *WorkerPool, config config.Config, logger *zap.SugaredLogger) *MediaSetService { return &MediaSetService{ store: store, youtube: youtubeClient, fileStore: fileStore, commandFunc: commandFunc, + workerPool: workerPool, config: config, logger: logger, } @@ -272,7 +274,7 @@ func (s *MediaSetService) GetPeaks(ctx context.Context, id uuid.UUID, numBins in } func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetPeaksProgressReader, error) { - audioGetter := newAudioGetter(s.store, s.youtube, s.fileStore, s.commandFunc, s.config, s.logger) + audioGetter := newAudioGetter(s.store, s.youtube, s.fileStore, s.commandFunc, s.workerPool, s.config, s.logger) return audioGetter.GetAudio(ctx, mediaSet, numBins) } @@ -459,7 +461,7 @@ func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, sta return nil, fmt.Errorf("error getting object from store: %v", err) } - g := newAudioSegmentGetter(s.commandFunc, rawAudio, mediaSet.AudioChannels, endByte-startByte, outFormat) + g := newAudioSegmentGetter(s.commandFunc, s.workerPool, rawAudio, mediaSet.AudioChannels, endByte-startByte, outFormat) go g.getAudioSegment(ctx) return g.stream, nil diff --git a/backend/media/service_test.go b/backend/media/service_test.go index 2615edd..313ce8d 100644 --- a/backend/media/service_test.go +++ b/backend/media/service_test.go @@ -6,7 +6,6 @@ import ( "database/sql" "io" "os" - "os/exec" "testing" "git.netflux.io/rob/clipper/config" @@ -111,7 +110,7 @@ func TestPeaksForSegment(t *testing.T) { On("GetObjectWithRange", mock.Anything, "foo", startByte, endByte). Return(audioData, nil) - service := media.NewMediaSetService(store, nil, fileStore, exec.CommandContext, config.Config{}, zap.NewNop().Sugar()) + service := media.NewMediaSetService(store, nil, fileStore, nil, media.NewTestWorkerPool(), config.Config{}, zap.NewNop().Sugar()) peaks, err := service.GetPeaksForSegment(context.Background(), mediaSet.ID, tc.startFrame, tc.endFrame, tc.numBins) if tc.wantErr == "" { @@ -154,7 +153,7 @@ func BenchmarkGetPeaksForSegment(b *testing.B) { On("GetObjectWithRange", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(readCloser, nil) - service := media.NewMediaSetService(store, nil, fileStore, exec.CommandContext, config.Config{}, zap.NewNop().Sugar()) + service := media.NewMediaSetService(store, nil, fileStore, nil, media.NewTestWorkerPool(), config.Config{}, zap.NewNop().Sugar()) b.StartTimer() _, err = service.GetPeaksForSegment(context.Background(), mediaSetID, startFrame, endFrame, numBins) diff --git a/backend/media/worker_pool.go b/backend/media/worker_pool.go new file mode 100644 index 0000000..d7cb983 --- /dev/null +++ b/backend/media/worker_pool.go @@ -0,0 +1,73 @@ +package media + +import ( + "context" + "errors" + "time" + + "go.uber.org/zap" +) + +// WorkerPool is a pool of workers that can consume and run a queue of tasks. +type WorkerPool struct { + size int + ch chan func() + logger *zap.SugaredLogger +} + +// NewWorkerPool returns a new WorkerPool containing the specified number of +// workers, and with the provided maximum queue size. Jobs added to the queue +// after it reaches this size limit will be rejected. +func NewWorkerPool(size int, maxQueueSize int, logger *zap.SugaredLogger) *WorkerPool { + return &WorkerPool{ + size: size, + ch: make(chan func(), maxQueueSize), + logger: logger, + } +} + +// NewTestWorkerPool returns a new running WorkerPool with a single worker, +// and noop logger, suitable for test environments. +func NewTestWorkerPool() *WorkerPool { + p := NewWorkerPool(1, 256, zap.NewNop().Sugar()) + p.Run() + return p +} + +// Run launches the workers, and returns immediately. +func (p *WorkerPool) Run() { + for i := 0; i < p.size; i++ { + go func() { + for task := range p.ch { + task() + } + }() + } +} + +// WaitForTask blocks while the provided task is executed by a worker, +// returning the error returned by the task. +func (p *WorkerPool) WaitForTask(ctx context.Context, taskFunc func() error) error { + done := make(chan error) + queuedAt := time.Now() + fn := func() { + startedAt := time.Now() + result := taskFunc() + + durTotal := time.Since(queuedAt) + durTask := time.Since(startedAt) + durQueue := startedAt.Sub(queuedAt) + p.logger.With("task", durTask, "queue", durQueue, "total", durTotal).Infof("Completed task") + + done <- result + } + + select { + case p.ch <- fn: + return <-done + case <-ctx.Done(): + return ctx.Err() + default: + return errors.New("worker queue full") + } +} diff --git a/backend/media/worker_pool_test.go b/backend/media/worker_pool_test.go new file mode 100644 index 0000000..13937fd --- /dev/null +++ b/backend/media/worker_pool_test.go @@ -0,0 +1,53 @@ +package media_test + +import ( + "context" + "sync" + "testing" + "time" + + "git.netflux.io/rob/clipper/media" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestWorkerPool(t *testing.T) { + ctx := context.Background() + + p := media.NewWorkerPool(2, 1, zap.NewNop().Sugar()) + p.Run() + + const taskCount = 4 + const dur = time.Millisecond * 100 + + ch := make(chan error, taskCount) + var wg sync.WaitGroup + wg.Add(taskCount) + + for i := 0; i < taskCount; i++ { + go func() { + defer wg.Done() + ch <- p.WaitForTask(ctx, func() error { time.Sleep(dur); return nil }) + }() + } + + wg.Wait() + close(ch) + + var okCount, errCount int + + for err := range ch { + if err == nil { + okCount++ + } else { + errCount++ + require.EqualError(t, err, "worker queue full") + } + } + + // There can either be 1 or 2 failures, depending on whether a worker picks + // up one job before the last one is added to the queue. + ok := (okCount == 2 && errCount == 2) || (okCount == 3 && errCount == 1) + assert.True(t, ok) +} diff --git a/backend/server/server.go b/backend/server/server.go index 84af76e..a305d73 100644 --- a/backend/server/server.go +++ b/backend/server/server.go @@ -68,6 +68,7 @@ type Options struct { Store media.Store YoutubeClient media.YoutubeClient FileStore media.FileStore + WorkerPool *media.WorkerPool Logger *zap.Logger } @@ -276,6 +277,7 @@ func Start(options Options) error { options.YoutubeClient, options.FileStore, exec.CommandContext, + options.WorkerPool, options.Config, options.Logger.Sugar().Named("mediaSetService"), )