diff --git a/backend/media/get_video.go b/backend/media/get_video.go index f75dd20..f4060b5 100644 --- a/backend/media/get_video.go +++ b/backend/media/get_video.go @@ -30,7 +30,7 @@ type videoGetter struct { type videoGetterState struct { *videoGetter - r io.Reader + r io.ReadCloser count, exp int64 mediaSetID uuid.UUID key, contentType string @@ -45,12 +45,14 @@ func newVideoGetter(store Store, fileStore FileStore, logger *zap.SugaredLogger) // GetVideo gets video from Youtube and uploads it to a filestore using the // specified key and content type. The returned reader must have its Next() -// method called until error = io.EOF, otherwise a deadlock or other resource +// method called until err == io.EOF, otherwise a deadlock or other resource // leakage is likely. -func (g *videoGetter) GetVideo(ctx context.Context, r io.Reader, exp int64, mediaSetID uuid.UUID, key, contentType string) (GetVideoProgressReader, error) { +// +// GetVideo will consume and close r. +func (g *videoGetter) GetVideo(ctx context.Context, r io.ReadCloser, exp int64, mediaSetID uuid.UUID, key, contentType string) (GetVideoProgressReader, error) { s := &videoGetterState{ videoGetter: g, - r: newLogProgressReader(r, "video", exp, g.logger), + r: r, exp: exp, mediaSetID: mediaSetID, key: key, @@ -75,7 +77,8 @@ func (s *videoGetterState) Write(p []byte) (int, error) { } func (s *videoGetterState) getVideo(ctx context.Context) { - teeReader := io.TeeReader(s.r, s) + logReader := newLogProgressReader(s.r, "video", s.exp, s.logger) + teeReader := io.TeeReader(logReader, s) _, err := s.fileStore.PutObject(ctx, s.key, teeReader, s.contentType) if err != nil { @@ -83,9 +86,15 @@ func (s *videoGetterState) getVideo(ctx context.Context) { return } + if err = s.r.Close(); err != nil { + s.errorChan <- fmt.Errorf("error closing video stream: %v", err) + return + } + s.url, err = s.fileStore.GetURL(ctx, s.key) if err != nil { s.errorChan <- fmt.Errorf("error getting object URL: %v", err) + return } storeParams := store.SetVideoUploadedParams{ @@ -95,6 +104,7 @@ func (s *videoGetterState) getVideo(ctx context.Context) { _, err = s.store.SetVideoUploaded(ctx, storeParams) if err != nil { s.errorChan <- fmt.Errorf("error saving to store: %v", err) + return } close(s.progressChan) diff --git a/backend/media/get_video_test.go b/backend/media/get_video_test.go index 076e46f..c1f3fdf 100644 --- a/backend/media/get_video_test.go +++ b/backend/media/get_video_test.go @@ -1,10 +1,12 @@ package media_test import ( + "bytes" "context" "database/sql" "errors" "io" + "strings" "testing" "time" @@ -13,16 +15,216 @@ import ( "git.netflux.io/rob/clipper/generated/store" "git.netflux.io/rob/clipper/media" "github.com/google/uuid" + "github.com/kkdai/youtube/v2" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" ) +func TestGetVideoFromYoutube(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop().Sugar() + + const ( + itag = 234 + videoID = "video001" + videoMimeType = "video/mp4" + videoContentLength = int64(100_000) + ) + mediaSetID := uuid.New() + mediaSet := store.MediaSet{ + ID: mediaSetID, + YoutubeID: videoID, + VideoYoutubeItag: int32(itag), + } + + video := &youtube.Video{ + ID: videoID, + Formats: []youtube.Format{{ItagNo: itag, FPS: 30, AudioChannels: 0, MimeType: videoMimeType, ContentLength: videoContentLength}}, + } + + t.Run("NOK,ErrorFetchingVideo", func(t *testing.T) { + var mockStore mocks.Store + mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil) + + var youtubeClient mocks.YoutubeClient + youtubeClient.On("GetVideoContext", ctx, videoID).Return(nil, errors.New("nope")) + + service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, config.Config{}, logger) + _, err := service.GetVideo(ctx, mediaSetID) + assert.EqualError(t, err, "error fetching video: nope") + }) + + t.Run("NOK,ErrorFetchingStream", func(t *testing.T) { + var mockStore mocks.Store + mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil) + + var youtubeClient mocks.YoutubeClient + youtubeClient.On("GetVideoContext", ctx, videoID).Return(video, nil) + youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(nil, int64(0), errors.New("network failure")) + + service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, config.Config{}, logger) + _, err := service.GetVideo(ctx, mediaSetID) + assert.EqualError(t, err, "error fetching stream: network failure") + }) + + t.Run("NOK,ErrorPuttingObjectInFileStore", func(t *testing.T) { + var mockStore mocks.Store + mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil) + + encodedContent := "a video stream" + reader := io.NopCloser(strings.NewReader(encodedContent)) + + var youtubeClient mocks.YoutubeClient + youtubeClient.On("GetVideoContext", ctx, videoID).Return(video, nil) + youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(reader, int64(len(encodedContent)), nil) + + var fileStore mocks.FileStore + fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType).Return(int64(0), errors.New("error storing object")) + + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger) + stream, err := service.GetVideo(ctx, mediaSetID) + require.NoError(t, err) + + _, err = stream.Next() + assert.EqualError(t, err, "error waiting for progress: error uploading to file store: error storing object") + }) + + t.Run("NOK,ErrorClosingStream", func(t *testing.T) { + var mockStore mocks.Store + mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil) + + encodedContent := "a video stream" + reader := errorCloser{Reader: strings.NewReader(encodedContent)} + + var youtubeClient mocks.YoutubeClient + youtubeClient.On("GetVideoContext", ctx, videoID).Return(video, nil) + youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(reader, int64(len(encodedContent)), nil) + + var fileStore mocks.FileStore + fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType).Return(int64(len(encodedContent)), nil) + + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger) + stream, err := service.GetVideo(ctx, mediaSetID) + require.NoError(t, err) + + _, err = stream.Next() + assert.EqualError(t, err, "error waiting for progress: error closing video stream: close error") + }) + + t.Run("NOK,ErrorGettingObjectURL", func(t *testing.T) { + var mockStore mocks.Store + mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil) + + encodedContent := "a video stream" + reader := io.NopCloser(strings.NewReader(encodedContent)) + + var youtubeClient mocks.YoutubeClient + youtubeClient.On("GetVideoContext", ctx, videoID).Return(video, nil) + youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(reader, int64(len(encodedContent)), nil) + + var fileStore mocks.FileStore + fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType).Return(int64(len(encodedContent)), nil) + fileStore.On("GetURL", ctx, mock.Anything).Return("", errors.New("URL error")) + + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger) + stream, err := service.GetVideo(ctx, mediaSetID) + require.NoError(t, err) + + _, err = stream.Next() + assert.EqualError(t, err, "error waiting for progress: error getting object URL: URL error") + }) + + t.Run("NOK,ErrorUpdatingStore", func(t *testing.T) { + var mockStore mocks.Store + mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil) + mockStore.On("SetVideoUploaded", ctx, mock.Anything).Return(mediaSet, errors.New("boom")) + + encodedContent := "a video stream" + reader := io.NopCloser(strings.NewReader(encodedContent)) + + var youtubeClient mocks.YoutubeClient + youtubeClient.On("GetVideoContext", ctx, videoID).Return(video, nil) + youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(reader, int64(len(encodedContent)), nil) + + var fileStore mocks.FileStore + fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType).Return(int64(len(encodedContent)), nil) + fileStore.On("GetURL", ctx, mock.Anything).Return("a url", nil) + + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger) + stream, err := service.GetVideo(ctx, mediaSetID) + require.NoError(t, err) + + _, err = stream.Next() + assert.EqualError(t, err, "error waiting for progress: error saving to store: boom") + }) + + t.Run("OK", func(t *testing.T) { + var mockStore mocks.Store + mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil) + mockStore.On("SetVideoUploaded", ctx, mock.Anything).Return(mediaSet, nil) + defer mockStore.AssertExpectations(t) + + encodedContent := make([]byte, videoContentLength) + reader := io.NopCloser(bytes.NewReader(encodedContent)) + + var youtubeClient mocks.YoutubeClient + youtubeClient.On("GetVideoContext", ctx, videoID).Return(video, nil) + youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(reader, videoContentLength, nil) + defer youtubeClient.AssertExpectations(t) + + var fileStore mocks.FileStore + fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType). + Run(func(args mock.Arguments) { + n, err := io.Copy(io.Discard, args[2].(io.Reader)) + require.NoError(t, err) + assert.Equal(t, videoContentLength, n) + }). + Return(videoContentLength, nil) + fileStore.On("GetURL", ctx, mock.Anything).Return("a url", nil) + defer fileStore.AssertExpectations(t) + + service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger) + stream, err := service.GetVideo(ctx, mediaSetID) + require.NoError(t, err) + + var ( + lastPercentComplete float32 + lastURL string + ) + + for { + progress, err := stream.Next() + if err != io.EOF { + require.NoError(t, err) + } + + assert.GreaterOrEqual(t, progress.PercentComplete, lastPercentComplete) + lastPercentComplete = progress.PercentComplete + lastURL = progress.URL + + if err == io.EOF { + break + } + } + + assert.Equal(t, float32(100), lastPercentComplete) + assert.Equal(t, "a url", lastURL) + }) +} + +type errorCloser struct { + io.Reader +} + +func (c errorCloser) Close() error { return errors.New("close error") } + func TestGetVideoFromFileStore(t *testing.T) { ctx := context.Background() logger := zap.NewNop().Sugar() - videoID := "video001" + videoID := "video002" mediaSetID := uuid.New() mediaSet := store.MediaSet{ ID: mediaSetID,