From 176a1cd8c17e14a6f54269505893760231968b8a Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Mon, 3 Jan 2022 13:32:39 +0100 Subject: [PATCH] Revert "FileStore.PutObject: Accept io.ReadCloser" This turned out to actually make testing more difficult, as the FileStore objects are generally mocked themselves and moving the Close() call inside them introduced IO problems in the test suite. This reverts commit a063f85eca5dd3f983bc9b790fd68108820cc730. --- backend/filestore/fs.go | 10 ++-------- backend/filestore/fs_test.go | 3 +-- backend/filestore/s3.go | 6 +----- backend/filestore/s3_test.go | 10 +--------- backend/generated/mocks/FileStore.go | 6 +++--- backend/media/get_audio.go | 4 ++-- backend/media/get_video.go | 10 +++++----- backend/media/thumbnail.go | 2 +- backend/media/types.go | 7 +------ 9 files changed, 17 insertions(+), 41 deletions(-) diff --git a/backend/filestore/fs.go b/backend/filestore/fs.go index 63b38b5..337988f 100644 --- a/backend/filestore/fs.go +++ b/backend/filestore/fs.go @@ -74,9 +74,8 @@ func (s *FileSystemStore) GetURL(ctx context.Context, key string) (string, error return url.String(), nil } -// PutObject writes an object to the local filesystem. It will close r after -// consuming it. -func (s *FileSystemStore) PutObject(ctx context.Context, key string, r io.ReadCloser, _ string) (int64, error) { +// PutObject writes an object to the local filesystem. +func (s *FileSystemStore) PutObject(ctx context.Context, key string, r io.Reader, _ string) (int64, error) { path := filepath.Join(s.rootPath, key) if err := os.MkdirAll(filepath.Dir(path), 0777); err != nil { return 0, fmt.Errorf("error creating directories: %v", err) @@ -90,10 +89,5 @@ func (s *FileSystemStore) PutObject(ctx context.Context, key string, r io.ReadCl if err != nil { return n, fmt.Errorf("error writing file: %v", err) } - - if err := r.Close(); err != nil { - return n, fmt.Errorf("error closing reader: %v", err) - } - return n, nil } diff --git a/backend/filestore/fs_test.go b/backend/filestore/fs_test.go index f73ed2d..14f248c 100644 --- a/backend/filestore/fs_test.go +++ b/backend/filestore/fs_test.go @@ -2,7 +2,6 @@ package filestore_test import ( "context" - "io" "io/ioutil" "os" "path" @@ -153,7 +152,7 @@ func TestFileStorePutObject(t *testing.T) { store, err := filestore.NewFileSystemStore(rootPath, "/") require.NoError(t, err) - n, err := store.PutObject(context.Background(), tc.key, io.NopCloser(strings.NewReader(tc.content)), "text/plain") + n, err := store.PutObject(context.Background(), tc.key, strings.NewReader(tc.content), "text/plain") require.NoError(t, err) content, err := os.ReadFile(path.Join(rootPath, tc.key)) diff --git a/backend/filestore/s3.go b/backend/filestore/s3.go index bca3772..bac5ddd 100644 --- a/backend/filestore/s3.go +++ b/backend/filestore/s3.go @@ -104,7 +104,7 @@ func (s *S3FileStore) GetURL(ctx context.Context, key string) (string, error) { // PutObject uploads an object using multipart upload, returning the number of // bytes uploaded and any error. -func (s *S3FileStore) PutObject(ctx context.Context, key string, r io.ReadCloser, contentType string) (int64, error) { +func (s *S3FileStore) PutObject(ctx context.Context, key string, r io.Reader, contentType string) (int64, error) { const ( targetPartSizeBytes = 5 * 1024 * 1024 // 5MB readBufferSizeBytes = 32_768 // 32Kb @@ -249,10 +249,6 @@ outer: } } - if err = r.Close(); err != nil { - return 0, fmt.Errorf("error closing reader: %v", err) - } - if len(results) == 0 { return 0, errors.New("no parts available to upload") } diff --git a/backend/filestore/s3_test.go b/backend/filestore/s3_test.go index b2ab3e5..3434c87 100644 --- a/backend/filestore/s3_test.go +++ b/backend/filestore/s3_test.go @@ -90,7 +90,6 @@ func TestS3GetURL(t *testing.T) { type testReader struct { count, exp int - closed bool } func (r *testReader) Read(p []byte) (int, error) { @@ -103,11 +102,6 @@ func (r *testReader) Read(p []byte) (int, error) { return len(p), nil } -func (r *testReader) Close() error { - r.closed = true - return nil -} - func TestS3PutObject(t *testing.T) { const ( bucket = "some-bucket" @@ -141,12 +135,10 @@ func TestS3PutObject(t *testing.T) { store := filestore.NewS3FileStore(filestore.S3API{S3Client: s3Client}, bucket, time.Hour, zap.NewNop().Sugar()) - reader := &testReader{exp: contentLength} - n, err := store.PutObject(context.Background(), key, reader, contentType) + n, err := store.PutObject(context.Background(), key, &testReader{exp: contentLength}, contentType) require.NoError(t, err) assert.Equal(t, int64(contentLength), n) assert.ElementsMatch(t, []int64{5_242_880, 5_242_880, 5_242_880, 4_271_360}, partLengths) - assert.True(t, reader.closed) }) t.Run("NOK,UploadPartFailure", func(t *testing.T) { diff --git a/backend/generated/mocks/FileStore.go b/backend/generated/mocks/FileStore.go index bc4ca90..3c6c2c3 100644 --- a/backend/generated/mocks/FileStore.go +++ b/backend/generated/mocks/FileStore.go @@ -82,18 +82,18 @@ func (_m *FileStore) GetURL(ctx context.Context, key string) (string, error) { } // PutObject provides a mock function with given fields: ctx, key, reader, contentType -func (_m *FileStore) PutObject(ctx context.Context, key string, reader io.ReadCloser, contentType string) (int64, error) { +func (_m *FileStore) PutObject(ctx context.Context, key string, reader io.Reader, contentType string) (int64, error) { ret := _m.Called(ctx, key, reader, contentType) var r0 int64 - if rf, ok := ret.Get(0).(func(context.Context, string, io.ReadCloser, string) int64); ok { + if rf, ok := ret.Get(0).(func(context.Context, string, io.Reader, string) int64); ok { r0 = rf(ctx, key, reader, contentType) } else { r0 = ret.Get(0).(int64) } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, string, io.ReadCloser, string) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, string, io.Reader, string) error); ok { r1 = rf(ctx, key, reader, contentType) } else { r1 = ret.Error(1) diff --git a/backend/media/get_audio.go b/backend/media/get_audio.go index d14a9bd..fb0a43b 100644 --- a/backend/media/get_audio.go +++ b/backend/media/get_audio.go @@ -145,8 +145,8 @@ func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaS // TODO: use mediaSet func to fetch key key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) - bytesUploaded, rawErr := s.fileStore.PutObject(ctx, key, readCloser{io.TeeReader(stdout, s), stdout}, rawAudioMimeType) - + teeReader := io.TeeReader(stdout, s) + bytesUploaded, rawErr := s.fileStore.PutObject(ctx, key, teeReader, rawAudioMimeType) if rawErr != nil { s.CloseWithError(fmt.Errorf("error uploading raw audio: %v", rawErr)) return diff --git a/backend/media/get_video.go b/backend/media/get_video.go index 9ad86a0..f3ef609 100644 --- a/backend/media/get_video.go +++ b/backend/media/get_video.go @@ -30,7 +30,7 @@ type videoGetter struct { type videoGetterState struct { *videoGetter - r io.ReadCloser + r io.Reader count, exp int64 mediaSetID uuid.UUID key, contentType string @@ -47,10 +47,10 @@ func newVideoGetter(store Store, fileStore FileStore, logger *zap.SugaredLogger) // specified key and content type. The returned reader must have its Next() // method called until error = io.EOF, otherwise a deadlock or other resource // leakage is likely. -func (g *videoGetter) GetVideo(ctx context.Context, r io.ReadCloser, exp int64, mediaSetID uuid.UUID, key, contentType string) (GetVideoProgressReader, error) { +func (g *videoGetter) GetVideo(ctx context.Context, r io.Reader, exp int64, mediaSetID uuid.UUID, key, contentType string) (GetVideoProgressReader, error) { s := &videoGetterState{ videoGetter: g, - r: r, + r: newLogProgressReader(r, "video", exp, g.logger), exp: exp, mediaSetID: mediaSetID, key: key, @@ -75,8 +75,8 @@ func (s *videoGetterState) Write(p []byte) (int, error) { } func (s *videoGetterState) getVideo(ctx context.Context) { - progressReader := newLogProgressReader(s.r, "video", s.exp, s.logger) - teeReader := readCloser{io.TeeReader(progressReader, s), s.r} + teeReader := io.TeeReader(s.r, s) + _, err := s.fileStore.PutObject(ctx, s.key, teeReader, s.contentType) if err != nil { s.errorChan <- fmt.Errorf("error uploading to file store: %v", err) diff --git a/backend/media/thumbnail.go b/backend/media/thumbnail.go index c586657..021946b 100644 --- a/backend/media/thumbnail.go +++ b/backend/media/thumbnail.go @@ -78,7 +78,7 @@ func (s *MediaSetService) getThumbnailFromYoutube(ctx context.Context, mediaSet thumbnailKey := fmt.Sprintf("media_sets/%s/thumbnail.jpg", mediaSet.ID) const mimeType = "application/jpeg" - _, err = s.fileStore.PutObject(ctx, thumbnailKey, io.NopCloser(bytes.NewReader(imageData)), mimeType) + _, err = s.fileStore.PutObject(ctx, thumbnailKey, bytes.NewReader(imageData), mimeType) if err != nil { return VideoThumbnail{}, fmt.Errorf("error uploading thumbnail: %v", err) } diff --git a/backend/media/types.go b/backend/media/types.go index be0a08e..429e25b 100644 --- a/backend/media/types.go +++ b/backend/media/types.go @@ -66,7 +66,7 @@ type FileStore interface { GetObject(ctx context.Context, key string) (io.ReadCloser, error) GetObjectWithRange(ctx context.Context, key string, startFrame, endFrame int64) (io.ReadCloser, error) GetURL(ctx context.Context, key string) (string, error) - PutObject(ctx context.Context, key string, reader io.ReadCloser, contentType string) (int64, error) + PutObject(ctx context.Context, key string, reader io.Reader, contentType string) (int64, error) } // YoutubeClient wraps the youtube.Client client. @@ -74,8 +74,3 @@ type YoutubeClient interface { GetVideoContext(ctx context.Context, id string) (*youtubev2.Video, error) GetStreamContext(ctx context.Context, video *youtubev2.Video, format *youtubev2.Format) (io.ReadCloser, int64, error) } - -type readCloser struct { - io.Reader - io.Closer -}