package media_test import ( "bytes" "context" "database/sql" "errors" "io" "strings" "testing" "time" "git.netflux.io/rob/clipper/config" "git.netflux.io/rob/clipper/generated/mocks" "git.netflux.io/rob/clipper/generated/store" "git.netflux.io/rob/clipper/media" "github.com/google/uuid" "github.com/kkdai/youtube/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" ) func TestGetAudioFromYoutube(t *testing.T) { const ( videoID = "abcdef12" inFixturePath = "testdata/tone-44100-stereo-int16-30000ms.raw" inFixtureLen = int64(5_292_000) inFixtureFrames = inFixtureLen / 4 // stereo-int16 ) ctx := context.Background() wp := media.NewTestWorkerPool() mediaSetID := uuid.New() mediaSet := store.MediaSet{ ID: mediaSetID, YoutubeID: videoID, AudioYoutubeItag: 123, AudioChannels: 2, AudioFramesApprox: inFixtureFrames, AudioContentLength: 22, } video := &youtube.Video{ ID: videoID, Formats: []youtube.Format{{ItagNo: 123, FPS: 0, AudioChannels: 2}}, } t.Run("NOK,ErrorFetchingMediaSet", func(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(store.MediaSet{}, errors.New("db went boom")) service := media.NewMediaSetService(&mockStore, nil, nil, nil, wp, config.Config{}, zap.NewNop().Sugar()) _, err := service.GetPeaks(ctx, mediaSetID, 10) assert.EqualError(t, err, "error getting media set: db went boom") }) t.Run("NOK,ErrorFetchingStream", func(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil) var youtubeClient mocks.YoutubeClient youtubeClient.On("GetVideoContext", mock.Anything, mediaSet.YoutubeID).Return(video, nil) youtubeClient.On("GetStreamContext", mock.Anything, video, &video.Formats[0]).Return(nil, int64(0), errors.New("uh oh")) service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, wp, config.Config{}, zap.NewNop().Sugar()) _, err := service.GetPeaks(ctx, mediaSetID, 10) assert.EqualError(t, err, "error fetching stream: uh oh") }) t.Run("NOK,ErrorBuildingProgressReader", func(t *testing.T) { invalidMediaSet := mediaSet invalidMediaSet.AudioChannels = 0 var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(invalidMediaSet, nil) var youtubeClient mocks.YoutubeClient youtubeClient.On("GetVideoContext", mock.Anything, mediaSet.YoutubeID).Return(video, nil) youtubeClient.On("GetStreamContext", mock.Anything, video, &video.Formats[0]).Return(nil, int64(0), nil) service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, wp, config.Config{}, zap.NewNop().Sugar()) _, err := service.GetPeaks(ctx, mediaSetID, 10) assert.EqualError(t, err, "error building progress reader: error creating audio progress reader (framesExpected = 1323000, channels = 0, numBins = 10)") }) t.Run("NOK,UploadError", func(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil) mockStore.On("SetEncodedAudioUploaded", mock.Anything, mock.Anything).Return(mediaSet, nil) var youtubeClient mocks.YoutubeClient youtubeClient.On("GetVideoContext", mock.Anything, mediaSet.YoutubeID).Return(video, nil) youtubeClient.On("GetStreamContext", mock.Anything, video, &video.Formats[0]).Return(io.NopCloser(bytes.NewReader(nil)), int64(0), nil) var fileStore mocks.FileStore fileStore.On("PutObject", mock.Anything, mock.Anything, mock.Anything, "audio/raw").Return(int64(0), errors.New("network error")) fileStore.On("PutObject", mock.Anything, mock.Anything, mock.Anything, "audio/opus").Return(int64(0), nil) fileStore.On("GetURL", mock.Anything, mock.Anything).Return("", nil) cmd := helperCommand(t, "", inFixturePath, "", 0) service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, wp, config.Config{}, zap.NewNop().Sugar()) stream, err := service.GetPeaks(ctx, mediaSetID, 10) assert.NoError(t, err) _, err = stream.Next() assert.EqualError(t, err, "error waiting for progress: error uploading: error uploading raw audio: network error") }) t.Run("NOK,FFmpegError", func(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil) mockStore.On("SetEncodedAudioUploaded", mock.Anything, mock.Anything).Return(mediaSet, nil) mockStore.On("SetRawAudioUploaded", mock.Anything, mock.Anything).Return(mediaSet, nil) var youtubeClient mocks.YoutubeClient youtubeClient.On("GetVideoContext", mock.Anything, mediaSet.YoutubeID).Return(video, nil) youtubeClient.On("GetStreamContext", mock.Anything, video, &video.Formats[0]).Return(io.NopCloser(strings.NewReader("some audio")), int64(0), nil) var fileStore mocks.FileStore fileStore.On("PutObject", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Run(func(args mock.Arguments) { _, err := io.Copy(io.Discard, args[2].(io.Reader)) require.NoError(t, err) }). Return(int64(0), nil) fileStore.On("GetURL", mock.Anything, mock.Anything).Return("", nil) cmd := helperCommand(t, "", inFixturePath, "oh no", 101) service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, wp, config.Config{}, zap.NewNop().Sugar()) stream, err := service.GetPeaks(ctx, mediaSetID, 10) assert.NoError(t, err) _, err = stream.Next() assert.EqualError(t, err, "error waiting for progress: error waiting for command: exit status 101, output: oh no") }) t.Run("OK", func(t *testing.T) { // Mock Store var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil) mockStore.On("SetRawAudioUploaded", mock.Anything, mock.MatchedBy(func(p store.SetRawAudioUploadedParams) bool { return p.ID == mediaSetID && p.AudioFrames.Int64 == inFixtureFrames })).Return(mediaSet, nil) mockStore.On("SetEncodedAudioUploaded", mock.Anything, mock.MatchedBy(func(p store.SetEncodedAudioUploadedParams) bool { return p.ID == mediaSetID })).Return(mediaSet, nil) defer mockStore.AssertExpectations(t) // Mock YoutubeClient encodedContent := "this is an opus stream" reader := io.NopCloser(strings.NewReader(encodedContent)) var youtubeClient mocks.YoutubeClient youtubeClient.On("GetVideoContext", mock.Anything, mediaSet.YoutubeID).Return(video, nil) youtubeClient.On("GetStreamContext", mock.Anything, video, &video.Formats[0]).Return(reader, int64(len(encodedContent)), nil) defer youtubeClient.AssertExpectations(t) // Mock FileStore // It is necessary to consume the readers passed into the mocks to avoid IO // errors. Since we're doing that we can also assert the content that is // passed to them is as expected. url := "https://www.example.com/foo" var fileStore mocks.FileStore fileStore.On("PutObject", mock.Anything, "media_sets/"+mediaSetID.String()+"/audio.opus", mock.Anything, "audio/opus"). Run(func(args mock.Arguments) { readContent, err := io.ReadAll(args[2].(io.Reader)) require.NoError(t, err) assert.Equal(t, encodedContent, string(readContent)) }). Return(int64(len(encodedContent)), nil) fileStore.On("PutObject", mock.Anything, "media_sets/"+mediaSetID.String()+"/audio.raw", mock.Anything, "audio/raw"). Run(func(args mock.Arguments) { n, err := io.Copy(io.Discard, args[2].(io.Reader)) require.NoError(t, err) assert.Equal(t, inFixtureLen, n) }). Return(inFixtureLen, nil) fileStore.On("GetURL", mock.Anything, "media_sets/"+mediaSetID.String()+"/audio.opus").Return(url, nil) defer fileStore.AssertExpectations(t) numBins := 10 cmd := helperCommand(t, "ffmpeg -hide_banner -loglevel error -i - -f s16le -ar 48000 -acodec pcm_s16le -", inFixturePath, "", 0) service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, wp, config.Config{}, zap.NewNop().Sugar()) stream, err := service.GetPeaks(ctx, mediaSetID, numBins) require.NoError(t, err) assertConsumeStream(t, numBins, url, 1_323_000, stream) }) } func TestGetPeaksFromFileStore(t *testing.T) { const ( inFixturePath = "testdata/tone-44100-stereo-int16-30000ms.raw" inFixtureLen = 5_292_000 ) ctx := context.Background() wp := media.NewTestWorkerPool() logger := zap.NewNop().Sugar() mediaSetID := uuid.New() mediaSet := store.MediaSet{ ID: mediaSetID, AudioChannels: 2, AudioFrames: sql.NullInt64{Int64: 1_323_000, Valid: true}, AudioRawS3Key: sql.NullString{String: "raw audio key", Valid: true}, AudioRawS3UploadedAt: sql.NullTime{Time: time.Now(), Valid: true}, AudioEncodedS3Key: sql.NullString{String: "encoded audio key", Valid: true}, AudioEncodedS3UploadedAt: sql.NullTime{Time: time.Now(), Valid: true}, } t.Run("NOK,ErrorFetchingMediaSet", func(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(store.MediaSet{}, errors.New("db went boom")) service := media.NewMediaSetService(&mockStore, nil, nil, nil, wp, config.Config{}, logger) _, err := service.GetPeaks(ctx, mediaSetID, 10) assert.EqualError(t, err, "error getting media set: db went boom") }) t.Run("NOK,ErrorGettingObjectFromFileStore", func(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil) defer mockStore.AssertExpectations(t) var fileStore mocks.FileStore fileStore.On("GetObject", mock.Anything, "raw audio key").Return(nil, errors.New("boom")) service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, wp, config.Config{}, logger) _, err := service.GetPeaks(ctx, mediaSetID, 10) require.EqualError(t, err, "error getting object from file store: boom") }) t.Run("NOK,ErrorGettingObjectURL", func(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil) defer mockStore.AssertExpectations(t) var fileStore mocks.FileStore reader := fixtureReader(t, inFixturePath, inFixtureLen) fileStore.On("GetObject", mock.Anything, "raw audio key").Return(reader, nil) fileStore.On("GetURL", mock.Anything, "encoded audio key").Return("", errors.New("network error")) defer fileStore.AssertExpectations(t) service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, wp, config.Config{}, logger) stream, err := service.GetPeaks(ctx, mediaSetID, 10) require.NoError(t, err) var hadError bool for { _, err := stream.Next() if err != nil { hadError = true assert.EqualError(t, err, "error waiting for progress: error generating object URL: network error") break } } assert.True(t, hadError) }) t.Run("OK", func(t *testing.T) { var mockStore mocks.Store mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil) defer mockStore.AssertExpectations(t) var fileStore mocks.FileStore url := "https://www.example.com/foo" reader := fixtureReader(t, inFixturePath, inFixtureLen) fileStore.On("GetObject", mock.Anything, "raw audio key").Return(reader, nil) fileStore.On("GetURL", mock.Anything, "encoded audio key").Return(url, nil) defer fileStore.AssertExpectations(t) numBins := 10 service := media.NewMediaSetService(&mockStore, nil, &fileStore, nil, wp, config.Config{}, logger) stream, err := service.GetPeaks(ctx, mediaSetID, numBins) require.NoError(t, err) assertConsumeStream(t, numBins, url, 1_323_000, stream) }) } // assertConsumeStream asserts that the stream produced by both the // from-youtube and from-filestore flows is identical. func assertConsumeStream(t *testing.T, expBins int, expURL string, expAudioFrames int64, stream media.GetPeaksProgressReader) { lastPeaks := make([]int16, 2) // stereo var ( count int lastPercentComplete float32 lastURL string lastAudioFrames int64 ) for { progress, err := stream.Next() if err != io.EOF { require.NoError(t, err) } assert.Len(t, progress.Peaks, 2) assert.GreaterOrEqual(t, progress.PercentComplete, lastPercentComplete) lastPercentComplete = progress.PercentComplete lastURL = progress.URL lastAudioFrames = progress.AudioFrames if err == io.EOF { break } // the fixture is a tone gradually increasing in amplitude: assert.Greater(t, progress.Peaks[0], lastPeaks[0]) assert.Greater(t, progress.Peaks[1], lastPeaks[1]) lastPeaks = progress.Peaks count++ } assert.Equal(t, float32(100), lastPercentComplete) assert.Equal(t, []int16{32_767, 32_766}, lastPeaks) assert.Equal(t, expBins, count) assert.Equal(t, expURL, lastURL) assert.Equal(t, expAudioFrames, lastAudioFrames) }