Add test coverage for getVideoFromYoutube flow
continuous-integration/drone/push Build is passing
Details
continuous-integration/drone/push Build is passing
Details
This commit is contained in:
parent
932648a44b
commit
6cb462f769
|
@ -30,7 +30,7 @@ type videoGetter struct {
|
|||
type videoGetterState struct {
|
||||
*videoGetter
|
||||
|
||||
r io.Reader
|
||||
r io.ReadCloser
|
||||
count, exp int64
|
||||
mediaSetID uuid.UUID
|
||||
key, contentType string
|
||||
|
@ -45,12 +45,14 @@ func newVideoGetter(store Store, fileStore FileStore, logger *zap.SugaredLogger)
|
|||
|
||||
// GetVideo gets video from Youtube and uploads it to a filestore using the
|
||||
// specified key and content type. The returned reader must have its Next()
|
||||
// method called until error = io.EOF, otherwise a deadlock or other resource
|
||||
// method called until err == io.EOF, otherwise a deadlock or other resource
|
||||
// leakage is likely.
|
||||
func (g *videoGetter) GetVideo(ctx context.Context, r io.Reader, exp int64, mediaSetID uuid.UUID, key, contentType string) (GetVideoProgressReader, error) {
|
||||
//
|
||||
// GetVideo will consume and close r.
|
||||
func (g *videoGetter) GetVideo(ctx context.Context, r io.ReadCloser, exp int64, mediaSetID uuid.UUID, key, contentType string) (GetVideoProgressReader, error) {
|
||||
s := &videoGetterState{
|
||||
videoGetter: g,
|
||||
r: newLogProgressReader(r, "video", exp, g.logger),
|
||||
r: r,
|
||||
exp: exp,
|
||||
mediaSetID: mediaSetID,
|
||||
key: key,
|
||||
|
@ -75,7 +77,8 @@ func (s *videoGetterState) Write(p []byte) (int, error) {
|
|||
}
|
||||
|
||||
func (s *videoGetterState) getVideo(ctx context.Context) {
|
||||
teeReader := io.TeeReader(s.r, s)
|
||||
logReader := newLogProgressReader(s.r, "video", s.exp, s.logger)
|
||||
teeReader := io.TeeReader(logReader, s)
|
||||
|
||||
_, err := s.fileStore.PutObject(ctx, s.key, teeReader, s.contentType)
|
||||
if err != nil {
|
||||
|
@ -83,9 +86,15 @@ func (s *videoGetterState) getVideo(ctx context.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
if err = s.r.Close(); err != nil {
|
||||
s.errorChan <- fmt.Errorf("error closing video stream: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
s.url, err = s.fileStore.GetURL(ctx, s.key)
|
||||
if err != nil {
|
||||
s.errorChan <- fmt.Errorf("error getting object URL: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
storeParams := store.SetVideoUploadedParams{
|
||||
|
@ -95,6 +104,7 @@ func (s *videoGetterState) getVideo(ctx context.Context) {
|
|||
_, err = s.store.SetVideoUploaded(ctx, storeParams)
|
||||
if err != nil {
|
||||
s.errorChan <- fmt.Errorf("error saving to store: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
close(s.progressChan)
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
package media_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -13,16 +15,216 @@ import (
|
|||
"git.netflux.io/rob/clipper/generated/store"
|
||||
"git.netflux.io/rob/clipper/media"
|
||||
"github.com/google/uuid"
|
||||
"github.com/kkdai/youtube/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func TestGetVideoFromYoutube(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
logger := zap.NewNop().Sugar()
|
||||
|
||||
const (
|
||||
itag = 234
|
||||
videoID = "video001"
|
||||
videoMimeType = "video/mp4"
|
||||
videoContentLength = int64(100_000)
|
||||
)
|
||||
mediaSetID := uuid.New()
|
||||
mediaSet := store.MediaSet{
|
||||
ID: mediaSetID,
|
||||
YoutubeID: videoID,
|
||||
VideoYoutubeItag: int32(itag),
|
||||
}
|
||||
|
||||
video := &youtube.Video{
|
||||
ID: videoID,
|
||||
Formats: []youtube.Format{{ItagNo: itag, FPS: 30, AudioChannels: 0, MimeType: videoMimeType, ContentLength: videoContentLength}},
|
||||
}
|
||||
|
||||
t.Run("NOK,ErrorFetchingVideo", func(t *testing.T) {
|
||||
var mockStore mocks.Store
|
||||
mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil)
|
||||
|
||||
var youtubeClient mocks.YoutubeClient
|
||||
youtubeClient.On("GetVideoContext", ctx, videoID).Return(nil, errors.New("nope"))
|
||||
|
||||
service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, config.Config{}, logger)
|
||||
_, err := service.GetVideo(ctx, mediaSetID)
|
||||
assert.EqualError(t, err, "error fetching video: nope")
|
||||
})
|
||||
|
||||
t.Run("NOK,ErrorFetchingStream", func(t *testing.T) {
|
||||
var mockStore mocks.Store
|
||||
mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil)
|
||||
|
||||
var youtubeClient mocks.YoutubeClient
|
||||
youtubeClient.On("GetVideoContext", ctx, videoID).Return(video, nil)
|
||||
youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(nil, int64(0), errors.New("network failure"))
|
||||
|
||||
service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, config.Config{}, logger)
|
||||
_, err := service.GetVideo(ctx, mediaSetID)
|
||||
assert.EqualError(t, err, "error fetching stream: network failure")
|
||||
})
|
||||
|
||||
t.Run("NOK,ErrorPuttingObjectInFileStore", func(t *testing.T) {
|
||||
var mockStore mocks.Store
|
||||
mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil)
|
||||
|
||||
encodedContent := "a video stream"
|
||||
reader := io.NopCloser(strings.NewReader(encodedContent))
|
||||
|
||||
var youtubeClient mocks.YoutubeClient
|
||||
youtubeClient.On("GetVideoContext", ctx, videoID).Return(video, nil)
|
||||
youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(reader, int64(len(encodedContent)), nil)
|
||||
|
||||
var fileStore mocks.FileStore
|
||||
fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType).Return(int64(0), errors.New("error storing object"))
|
||||
|
||||
service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger)
|
||||
stream, err := service.GetVideo(ctx, mediaSetID)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = stream.Next()
|
||||
assert.EqualError(t, err, "error waiting for progress: error uploading to file store: error storing object")
|
||||
})
|
||||
|
||||
t.Run("NOK,ErrorClosingStream", func(t *testing.T) {
|
||||
var mockStore mocks.Store
|
||||
mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil)
|
||||
|
||||
encodedContent := "a video stream"
|
||||
reader := errorCloser{Reader: strings.NewReader(encodedContent)}
|
||||
|
||||
var youtubeClient mocks.YoutubeClient
|
||||
youtubeClient.On("GetVideoContext", ctx, videoID).Return(video, nil)
|
||||
youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(reader, int64(len(encodedContent)), nil)
|
||||
|
||||
var fileStore mocks.FileStore
|
||||
fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType).Return(int64(len(encodedContent)), nil)
|
||||
|
||||
service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger)
|
||||
stream, err := service.GetVideo(ctx, mediaSetID)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = stream.Next()
|
||||
assert.EqualError(t, err, "error waiting for progress: error closing video stream: close error")
|
||||
})
|
||||
|
||||
t.Run("NOK,ErrorGettingObjectURL", func(t *testing.T) {
|
||||
var mockStore mocks.Store
|
||||
mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil)
|
||||
|
||||
encodedContent := "a video stream"
|
||||
reader := io.NopCloser(strings.NewReader(encodedContent))
|
||||
|
||||
var youtubeClient mocks.YoutubeClient
|
||||
youtubeClient.On("GetVideoContext", ctx, videoID).Return(video, nil)
|
||||
youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(reader, int64(len(encodedContent)), nil)
|
||||
|
||||
var fileStore mocks.FileStore
|
||||
fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType).Return(int64(len(encodedContent)), nil)
|
||||
fileStore.On("GetURL", ctx, mock.Anything).Return("", errors.New("URL error"))
|
||||
|
||||
service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger)
|
||||
stream, err := service.GetVideo(ctx, mediaSetID)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = stream.Next()
|
||||
assert.EqualError(t, err, "error waiting for progress: error getting object URL: URL error")
|
||||
})
|
||||
|
||||
t.Run("NOK,ErrorUpdatingStore", func(t *testing.T) {
|
||||
var mockStore mocks.Store
|
||||
mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil)
|
||||
mockStore.On("SetVideoUploaded", ctx, mock.Anything).Return(mediaSet, errors.New("boom"))
|
||||
|
||||
encodedContent := "a video stream"
|
||||
reader := io.NopCloser(strings.NewReader(encodedContent))
|
||||
|
||||
var youtubeClient mocks.YoutubeClient
|
||||
youtubeClient.On("GetVideoContext", ctx, videoID).Return(video, nil)
|
||||
youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(reader, int64(len(encodedContent)), nil)
|
||||
|
||||
var fileStore mocks.FileStore
|
||||
fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType).Return(int64(len(encodedContent)), nil)
|
||||
fileStore.On("GetURL", ctx, mock.Anything).Return("a url", nil)
|
||||
|
||||
service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger)
|
||||
stream, err := service.GetVideo(ctx, mediaSetID)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = stream.Next()
|
||||
assert.EqualError(t, err, "error waiting for progress: error saving to store: boom")
|
||||
})
|
||||
|
||||
t.Run("OK", func(t *testing.T) {
|
||||
var mockStore mocks.Store
|
||||
mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil)
|
||||
mockStore.On("SetVideoUploaded", ctx, mock.Anything).Return(mediaSet, nil)
|
||||
defer mockStore.AssertExpectations(t)
|
||||
|
||||
encodedContent := make([]byte, videoContentLength)
|
||||
reader := io.NopCloser(bytes.NewReader(encodedContent))
|
||||
|
||||
var youtubeClient mocks.YoutubeClient
|
||||
youtubeClient.On("GetVideoContext", ctx, videoID).Return(video, nil)
|
||||
youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(reader, videoContentLength, nil)
|
||||
defer youtubeClient.AssertExpectations(t)
|
||||
|
||||
var fileStore mocks.FileStore
|
||||
fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, videoMimeType).
|
||||
Run(func(args mock.Arguments) {
|
||||
n, err := io.Copy(io.Discard, args[2].(io.Reader))
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, videoContentLength, n)
|
||||
}).
|
||||
Return(videoContentLength, nil)
|
||||
fileStore.On("GetURL", ctx, mock.Anything).Return("a url", nil)
|
||||
defer fileStore.AssertExpectations(t)
|
||||
|
||||
service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, nil, config.Config{}, logger)
|
||||
stream, err := service.GetVideo(ctx, mediaSetID)
|
||||
require.NoError(t, err)
|
||||
|
||||
var (
|
||||
lastPercentComplete float32
|
||||
lastURL string
|
||||
)
|
||||
|
||||
for {
|
||||
progress, err := stream.Next()
|
||||
if err != io.EOF {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
assert.GreaterOrEqual(t, progress.PercentComplete, lastPercentComplete)
|
||||
lastPercentComplete = progress.PercentComplete
|
||||
lastURL = progress.URL
|
||||
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
assert.Equal(t, float32(100), lastPercentComplete)
|
||||
assert.Equal(t, "a url", lastURL)
|
||||
})
|
||||
}
|
||||
|
||||
type errorCloser struct {
|
||||
io.Reader
|
||||
}
|
||||
|
||||
func (c errorCloser) Close() error { return errors.New("close error") }
|
||||
|
||||
func TestGetVideoFromFileStore(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
logger := zap.NewNop().Sugar()
|
||||
|
||||
videoID := "video001"
|
||||
videoID := "video002"
|
||||
mediaSetID := uuid.New()
|
||||
mediaSet := store.MediaSet{
|
||||
ID: mediaSetID,
|
||||
|
|
Loading…
Reference in New Issue