From 66c65694ae7fd4523b17e22ee8a8698377b15818 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Mon, 3 Jan 2022 18:44:19 +0100 Subject: [PATCH] Add test coverage for getAudioFromYoutube flow --- backend/media/get_audio.go | 31 ++-- backend/media/get_audio_test.go | 244 +++++++++++++++++++++++++----- backend/media/get_segment_test.go | 78 ---------- backend/media/service.go | 2 +- backend/media/test_helper_test.go | 94 ++++++++++++ 5 files changed, 319 insertions(+), 130 deletions(-) create mode 100644 backend/media/test_helper_test.go diff --git a/backend/media/get_audio.go b/backend/media/get_audio.go index fb0a43b..146e1e2 100644 --- a/backend/media/get_audio.go +++ b/backend/media/get_audio.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "math" - "os/exec" "strconv" "sync" @@ -29,21 +28,23 @@ type GetPeaksProgressReader interface { // audioGetter manages getting and processing audio from Youtube. type audioGetter struct { - store Store - youtube YoutubeClient - fileStore FileStore - config config.Config - logger *zap.SugaredLogger + store Store + youtube YoutubeClient + fileStore FileStore + commandFunc CommandFunc + config config.Config + logger *zap.SugaredLogger } // newAudioGetter returns a new audioGetter. -func newAudioGetter(store Store, youtube YoutubeClient, fileStore FileStore, config config.Config, logger *zap.SugaredLogger) *audioGetter { +func newAudioGetter(store Store, youtube YoutubeClient, fileStore FileStore, commandFunc CommandFunc, config config.Config, logger *zap.SugaredLogger) *audioGetter { return &audioGetter{ - store: store, - youtube: youtube, - fileStore: fileStore, - config: config, - logger: logger, + store: store, + youtube: youtube, + fileStore: fileStore, + commandFunc: commandFunc, + config: config, + logger: logger, } } @@ -60,7 +61,7 @@ func (g *audioGetter) GetAudio(ctx context.Context, mediaSet store.MediaSet, num format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag)) if format == nil { - return nil, fmt.Errorf("error finding itag: %v", err) + return nil, fmt.Errorf("error finding itag: %d", mediaSet.AudioYoutubeItag) } stream, _, err := g.youtube.GetStreamContext(ctx, video, format) @@ -94,7 +95,8 @@ func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaS teeReader := io.TeeReader(streamWithProgress, pw) var stdErr bytes.Buffer - cmd := exec.CommandContext(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") + cmd := s.commandFunc(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") + fmt.Println("cmd is", cmd, cmd.Env) cmd.Stdin = teeReader cmd.Stderr = &stdErr stdout, err := cmd.StdoutPipe() @@ -124,6 +126,7 @@ func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaS s.CloseWithError(fmt.Errorf("error uploading encoded audio: %v", encErr)) return } + pr.Close() presignedAudioURL, err = s.fileStore.GetURL(ctx, key) if err != nil { diff --git a/backend/media/get_audio_test.go b/backend/media/get_audio_test.go index 662adca..696d2df 100644 --- a/backend/media/get_audio_test.go +++ b/backend/media/get_audio_test.go @@ -1,10 +1,13 @@ package media_test import ( + "bytes" "context" "database/sql" "errors" "io" + "io/ioutil" + "strings" "testing" "time" @@ -13,14 +16,174 @@ import ( "git.netflux.io/rob/clipper/generated/store" "git.netflux.io/rob/clipper/media" "github.com/google/uuid" + "github.com/kkdai/youtube/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" ) +func TestGetAudioFromYoutube(t *testing.T) { + const ( + videoID = "abcdef12" + inFixturePath = "testdata/tone-44100-stereo-int16-30000ms.raw" + inFixtureLen = int64(5_292_000) + inFixtureFrames = inFixtureLen / 4 // stereo-int16 + ) + + ctx := context.Background() + mediaSetID := uuid.New() + mediaSet := store.MediaSet{ + ID: mediaSetID, + YoutubeID: videoID, + AudioYoutubeItag: 123, + AudioChannels: 2, + AudioFramesApprox: inFixtureFrames, + } + + video := &youtube.Video{ + ID: videoID, + Formats: []youtube.Format{{ItagNo: 123, FPS: 0, AudioChannels: 2}}, + } + + t.Run("NOK,ErrorFetchingMediaSet", func(t *testing.T) { + var mockStore mocks.Store + mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(store.MediaSet{}, errors.New("db went boom")) + service := media.NewMediaSetService(&mockStore, nil, nil, nil, config.Config{}, zap.NewNop().Sugar()) + _, err := service.GetPeaks(ctx, mediaSetID, 10) + assert.EqualError(t, err, "error getting media set: db went boom") + }) + + t.Run("NOK,ErrorFetchingStream", func(t *testing.T) { + var mockStore mocks.Store + mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil) + var youtubeClient mocks.YoutubeClient + youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil) + youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(nil, int64(0), errors.New("uh oh")) + + service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, config.Config{}, zap.NewNop().Sugar()) + _, err := service.GetPeaks(ctx, mediaSetID, 10) + assert.EqualError(t, err, "error fetching stream: uh oh") + }) + + t.Run("NOK,ErrorBuildingProgressReader", func(t *testing.T) { + invalidMediaSet := mediaSet + invalidMediaSet.AudioChannels = 0 + var mockStore mocks.Store + mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(invalidMediaSet, nil) + var youtubeClient mocks.YoutubeClient + youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil) + youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(nil, int64(0), nil) + + service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, config.Config{}, zap.NewNop().Sugar()) + _, err := service.GetPeaks(ctx, mediaSetID, 10) + assert.EqualError(t, err, "error building progress reader: error creating audio progress reader (framesExpected = 1323000, channels = 0, numBins = 10)") + }) + + t.Run("NOK,UploadError", func(t *testing.T) { + var mockStore mocks.Store + mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil) + mockStore.On("SetEncodedAudioUploaded", ctx, mock.Anything).Return(mediaSet, nil) + + var youtubeClient mocks.YoutubeClient + youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil) + youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(io.NopCloser(bytes.NewReader(nil)), int64(0), nil) + + var fileStore mocks.FileStore + fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, "audio/raw").Return(int64(0), errors.New("error uploading raw audio")) + fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, "audio/opus").Return(int64(0), nil) + fileStore.On("GetURL", ctx, mock.Anything).Return("", nil) + + cmd := helperCommand(t, "", inFixturePath, "", 0) + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, config.Config{}, zap.NewNop().Sugar()) + stream, err := service.GetPeaks(ctx, mediaSetID, 10) + assert.NoError(t, err) + + _, err = stream.Next() + assert.EqualError(t, err, "error waiting for progress: error uploading raw audio: error uploading raw audio") + }) + + t.Run("NOK,FFmpegError", func(t *testing.T) { + var mockStore mocks.Store + mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil) + mockStore.On("SetEncodedAudioUploaded", ctx, mock.Anything).Return(mediaSet, nil) + mockStore.On("SetRawAudioUploaded", ctx, mock.Anything).Return(mediaSet, nil) + + var youtubeClient mocks.YoutubeClient + youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil) + youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(io.NopCloser(strings.NewReader("some audio")), int64(0), nil) + + var fileStore mocks.FileStore + fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, mock.Anything).Return(int64(0), nil) + fileStore.On("GetURL", ctx, mock.Anything).Return("", nil) + + cmd := helperCommand(t, "", inFixturePath, "oh no", 101) + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, config.Config{}, zap.NewNop().Sugar()) + stream, err := service.GetPeaks(ctx, mediaSetID, 10) + assert.NoError(t, err) + + _, err = stream.Next() + assert.EqualError(t, err, "error waiting for progress: error waiting for command: exit status 101, output: oh no") + }) + + t.Run("OK", func(t *testing.T) { + // Mock Store + var mockStore mocks.Store + mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil) + mockStore.On("SetRawAudioUploaded", ctx, mock.MatchedBy(func(p store.SetRawAudioUploadedParams) bool { + return p.ID == mediaSetID && p.AudioFrames.Int64 == inFixtureFrames + })).Return(mediaSet, nil) + mockStore.On("SetEncodedAudioUploaded", ctx, mock.MatchedBy(func(p store.SetEncodedAudioUploadedParams) bool { + return p.ID == mediaSetID + })).Return(mediaSet, nil) + defer mockStore.AssertExpectations(t) + + // Mock YoutubeClient + encodedContent := "this is an opus stream" + reader := io.NopCloser(strings.NewReader(encodedContent)) + var youtubeClient mocks.YoutubeClient + youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil) + youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(reader, int64(len(encodedContent)), nil) + defer youtubeClient.AssertExpectations(t) + + // Mock FileStore + // It is necessary to consume the readers passed into the mocks to avoid IO + // errors. Since we're doing that we can also assert the content that is + // passed to them is as expected. + url := "https://www.example.com/foo" + var fileStore mocks.FileStore + fileStore.On("PutObject", ctx, "media_sets/"+mediaSetID.String()+"/audio.opus", mock.Anything, "audio/opus"). + Run(func(args mock.Arguments) { + readContent, err := ioutil.ReadAll(args[2].(io.Reader)) + require.NoError(t, err) + assert.Equal(t, encodedContent, string(readContent)) + }). + Return(int64(len(encodedContent)), nil) + fileStore.On("PutObject", ctx, "media_sets/"+mediaSetID.String()+"/audio.raw", mock.Anything, "audio/raw"). + Run(func(args mock.Arguments) { + n, err := io.Copy(io.Discard, args[2].(io.Reader)) + require.NoError(t, err) + assert.Equal(t, inFixtureLen, n) + }). + Return(inFixtureLen, nil) + fileStore.On("GetURL", ctx, "media_sets/"+mediaSetID.String()+"/audio.opus").Return(url, nil) + defer fileStore.AssertExpectations(t) + + numBins := 10 + cmd := helperCommand(t, "ffmpeg -hide_banner -loglevel error -i - -f s16le -ar 48000 -acodec pcm_s16le -", inFixturePath, "", 0) + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, config.Config{}, zap.NewNop().Sugar()) + stream, err := service.GetPeaks(ctx, mediaSetID, numBins) + require.NoError(t, err) + + assertConsumeStream(t, numBins, url, stream) + }) +} + func TestGetPeaksFromFileStore(t *testing.T) { - const inFixturePath = "testdata/tone-44100-stereo-int16-30000ms.raw" + const ( + inFixturePath = "testdata/tone-44100-stereo-int16-30000ms.raw" + inFixtureLen = 5_292_000 + ) ctx := context.Background() logger := zap.NewNop().Sugar() @@ -62,7 +225,7 @@ func TestGetPeaksFromFileStore(t *testing.T) { defer mockStore.AssertExpectations(t) var fileStore mocks.FileStore - reader := fixtureReader(t, inFixturePath, 5_292_000) + reader := fixtureReader(t, inFixturePath, inFixtureLen) fileStore.On("GetObject", mock.Anything, "raw audio key").Return(reader, nil) fileStore.On("GetURL", mock.Anything, "encoded audio key").Return("", errors.New("network error")) defer fileStore.AssertExpectations(t) @@ -89,9 +252,10 @@ func TestGetPeaksFromFileStore(t *testing.T) { defer mockStore.AssertExpectations(t) var fileStore mocks.FileStore - reader := fixtureReader(t, inFixturePath, 5_292_000) + url := "https://www.example.com/foo" + reader := fixtureReader(t, inFixturePath, inFixtureLen) fileStore.On("GetObject", mock.Anything, "raw audio key").Return(reader, nil) - fileStore.On("GetURL", mock.Anything, "encoded audio key").Return("https://www.example.com/foo", nil) + fileStore.On("GetURL", mock.Anything, "encoded audio key").Return(url, nil) defer fileStore.AssertExpectations(t) numBins := 10 @@ -99,38 +263,44 @@ func TestGetPeaksFromFileStore(t *testing.T) { stream, err := service.GetPeaks(ctx, mediaSetID, numBins) require.NoError(t, err) - lastPeaks := make([]int16, 2) // stereo - var ( - count int - lastPercentComplete float32 - lastURL string - ) - - for { - progress, err := stream.Next() - if err != io.EOF { - require.NoError(t, err) - } - - assert.Len(t, progress.Peaks, 2) - assert.GreaterOrEqual(t, progress.PercentComplete, lastPercentComplete) - lastPercentComplete = progress.PercentComplete - lastURL = progress.URL - - if err == io.EOF { - break - } - - // the fixture is a tone gradually increasing in amplitude: - assert.Greater(t, progress.Peaks[0], lastPeaks[0]) - assert.Greater(t, progress.Peaks[1], lastPeaks[1]) - lastPeaks = progress.Peaks - count++ - } - - assert.Equal(t, float32(100), lastPercentComplete) - assert.Equal(t, []int16{32_767, 32_766}, lastPeaks) - assert.Equal(t, numBins, count) - assert.Equal(t, "https://www.example.com/foo", lastURL) + assertConsumeStream(t, numBins, url, stream) }) } + +// assertConsumeStream asserts that the stream produced by both the +// from-youtube and from-filestore flows is identical. +func assertConsumeStream(t *testing.T, expBins int, expURL string, stream media.GetPeaksProgressReader) { + lastPeaks := make([]int16, 2) // stereo + var ( + count int + lastPercentComplete float32 + lastURL string + ) + + for { + progress, err := stream.Next() + if err != io.EOF { + require.NoError(t, err) + } + + assert.Len(t, progress.Peaks, 2) + assert.GreaterOrEqual(t, progress.PercentComplete, lastPercentComplete) + lastPercentComplete = progress.PercentComplete + lastURL = progress.URL + + if err == io.EOF { + break + } + + // the fixture is a tone gradually increasing in amplitude: + assert.Greater(t, progress.Peaks[0], lastPeaks[0]) + assert.Greater(t, progress.Peaks[1], lastPeaks[1]) + lastPeaks = progress.Peaks + count++ + } + + assert.Equal(t, float32(100), lastPercentComplete) + assert.Equal(t, []int16{32_767, 32_766}, lastPeaks) + assert.Equal(t, expBins, count) + assert.Equal(t, expURL, lastURL) +} diff --git a/backend/media/get_segment_test.go b/backend/media/get_segment_test.go index 9b7d537..1fdc67c 100644 --- a/backend/media/get_segment_test.go +++ b/backend/media/get_segment_test.go @@ -4,12 +4,7 @@ import ( "bytes" "context" "errors" - "fmt" "io" - "os" - "os/exec" - "strconv" - "strings" "testing" "git.netflux.io/rob/clipper/config" @@ -24,79 +19,6 @@ import ( "go.uber.org/zap" ) -func fixtureReader(t *testing.T, fixturePath string, limit int64) io.ReadCloser { - fptr, err := os.Open(fixturePath) - require.NoError(t, err) - - // limitReader to make the mock work realistically, not intended for assertions: - return struct { - io.Reader - io.Closer - }{ - Reader: io.LimitReader(fptr, limit), - Closer: fptr, - } -} - -func helperCommand(t *testing.T, wantCommand, stdoutFile, stderrString string, forceExitCode int) media.CommandFunc { - return func(ctx context.Context, name string, args ...string) *exec.Cmd { - cs := []string{"-test.run=TestHelperProcess", "--", name} - cs = append(cs, args...) - cmd := exec.CommandContext(ctx, os.Args[0], cs...) - cmd.Env = []string{ - "GO_WANT_HELPER_PROCESS=1", - "GO_WANT_COMMAND=" + wantCommand, - "GO_STDOUT_FILE=" + stdoutFile, - "GO_STDERR_STRING=" + stderrString, - "GO_FORCE_EXIT_CODE=" + strconv.Itoa(forceExitCode), - } - return cmd - } -} - -func TestHelperProcess(t *testing.T) { - if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { - return - } - - defer func() { - // Stop the helper process writing to stdout after the test has finished. - // This prevents it from writing the "PASS" string which is unwanted in - // this context. - if !t.Failed() { - os.Stdout, _ = os.Open(os.DevNull) - } - }() - - if exitCode := os.Getenv("GO_FORCE_EXIT_CODE"); exitCode != "0" { - c, _ := strconv.Atoi(exitCode) - os.Stderr.WriteString(os.Getenv("GO_STDERR_STRING")) - os.Exit(c) - } - - if wantCommand := os.Getenv("GO_WANT_COMMAND"); wantCommand != "" { - gotCmd := strings.Split(strings.Join(os.Args, " "), " -- ")[1] - if wantCommand != gotCmd { - fmt.Printf("GO_WANT_COMMAND assertion failed:\nwant = %v\ngot = %v", wantCommand, gotCmd) - return - } - } - - // Copy stdin to /dev/null. This is required to avoid broken pipe errors in - // the tests: - _, err := io.Copy(io.Discard, os.Stdin) - require.NoError(t, err) - - // If an output file is provided, then copy that to stdout: - if fname := os.Getenv("GO_STDOUT_FILE"); fname != "" { - fptr, err := os.Open(fname) - require.NoError(t, err) - - _, err = io.Copy(os.Stdout, fptr) - require.NoError(t, err) - } -} - func TestGetSegment(t *testing.T) { mediaSetID := uuid.MustParse("4c440241-cca9-436f-adb0-be074588cf2b") const inFixturePath = "testdata/tone-44100-stereo-int16-30000ms.raw" diff --git a/backend/media/service.go b/backend/media/service.go index 88305b6..8f21c28 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -271,7 +271,7 @@ func (s *MediaSetService) GetPeaks(ctx context.Context, id uuid.UUID, numBins in } func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetPeaksProgressReader, error) { - audioGetter := newAudioGetter(s.store, s.youtube, s.fileStore, s.config, s.logger) + audioGetter := newAudioGetter(s.store, s.youtube, s.fileStore, s.commandFunc, s.config, s.logger) return audioGetter.GetAudio(ctx, mediaSet, numBins) } diff --git a/backend/media/test_helper_test.go b/backend/media/test_helper_test.go new file mode 100644 index 0000000..90bb50b --- /dev/null +++ b/backend/media/test_helper_test.go @@ -0,0 +1,94 @@ +package media_test + +import ( + "context" + "fmt" + "io" + "os" + "os/exec" + "strconv" + "strings" + "testing" + + "git.netflux.io/rob/clipper/media" + "github.com/stretchr/testify/require" +) + +// fixtureReader loads a fixture into a ReadCloser with the provided limit. +func fixtureReader(t *testing.T, fixturePath string, limit int64) io.ReadCloser { + fptr, err := os.Open(fixturePath) + require.NoError(t, err) + + // limitReader to make the mock work realistically, not intended for assertions: + return struct { + io.Reader + io.Closer + }{ + Reader: io.LimitReader(fptr, limit), + Closer: fptr, + } +} + +// helperCommand returns a function that builds an *exec.Cmd which executes a +// test function in order to act as a mock process. +func helperCommand(t *testing.T, wantCommand, stdoutFile, stderrString string, forceExitCode int) media.CommandFunc { + return func(ctx context.Context, name string, args ...string) *exec.Cmd { + cs := []string{"-test.run=TestHelperProcess", "--", name} + cs = append(cs, args...) + cmd := exec.CommandContext(ctx, os.Args[0], cs...) + cmd.Env = []string{ + "GO_WANT_HELPER_PROCESS=1", + "GO_WANT_COMMAND=" + wantCommand, + "GO_STDOUT_FILE=" + stdoutFile, + "GO_STDERR_STRING=" + stderrString, + "GO_FORCE_EXIT_CODE=" + strconv.Itoa(forceExitCode), + } + return cmd + } +} + +// TestHelperProcess is the body for the mock executable process built by +// helperCommand. +func TestHelperProcess(t *testing.T) { + if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { + return + } + + defer func() { + // Stop the helper process writing to stdout after the test has finished. + // This prevents it from writing the "PASS" string which is unwanted in + // this context. + if !t.Failed() { + os.Stdout, _ = os.Open(os.DevNull) + } + }() + + if exitCode := os.Getenv("GO_FORCE_EXIT_CODE"); exitCode != "0" { + c, _ := strconv.Atoi(exitCode) + os.Stderr.WriteString(os.Getenv("GO_STDERR_STRING")) + os.Exit(c) + } + + if wantCommand := os.Getenv("GO_WANT_COMMAND"); wantCommand != "" { + gotCmd := strings.Split(strings.Join(os.Args, " "), " -- ")[1] + if wantCommand != gotCmd { + fmt.Fprintf(os.Stderr, "GO_WANT_COMMAND assertion failed:\nwant = %v\ngot = %v", wantCommand, gotCmd) + t.Fail() // necessary to make the test fail + } + } + + // Copy stdin to /dev/null. This is required to avoid broken pipe errors in + // the tests: + _, err := io.Copy(io.Discard, os.Stdin) + require.NoError(t, err) + + // If an output file is provided, then copy that to stdout: + if fname := os.Getenv("GO_STDOUT_FILE"); fname != "" { + fptr, err := os.Open(fname) + require.NoError(t, err) + defer fptr.Close() + + _, err = io.Copy(os.Stdout, fptr) + require.NoError(t, err) + } +}