Revert "FileStore.PutObject: Accept io.ReadCloser"
This turned out to actually make testing more difficult, as the
FileStore objects are generally mocked themselves and moving the Close()
call inside them introduced IO problems in the test suite.
This reverts commit a063f85eca
.
This commit is contained in:
parent
a063f85eca
commit
176a1cd8c1
|
@ -74,9 +74,8 @@ func (s *FileSystemStore) GetURL(ctx context.Context, key string) (string, error
|
||||||
return url.String(), nil
|
return url.String(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutObject writes an object to the local filesystem. It will close r after
|
// PutObject writes an object to the local filesystem.
|
||||||
// consuming it.
|
func (s *FileSystemStore) PutObject(ctx context.Context, key string, r io.Reader, _ string) (int64, error) {
|
||||||
func (s *FileSystemStore) PutObject(ctx context.Context, key string, r io.ReadCloser, _ string) (int64, error) {
|
|
||||||
path := filepath.Join(s.rootPath, key)
|
path := filepath.Join(s.rootPath, key)
|
||||||
if err := os.MkdirAll(filepath.Dir(path), 0777); err != nil {
|
if err := os.MkdirAll(filepath.Dir(path), 0777); err != nil {
|
||||||
return 0, fmt.Errorf("error creating directories: %v", err)
|
return 0, fmt.Errorf("error creating directories: %v", err)
|
||||||
|
@ -90,10 +89,5 @@ func (s *FileSystemStore) PutObject(ctx context.Context, key string, r io.ReadCl
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return n, fmt.Errorf("error writing file: %v", err)
|
return n, fmt.Errorf("error writing file: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.Close(); err != nil {
|
|
||||||
return n, fmt.Errorf("error closing reader: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package filestore_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
@ -153,7 +152,7 @@ func TestFileStorePutObject(t *testing.T) {
|
||||||
store, err := filestore.NewFileSystemStore(rootPath, "/")
|
store, err := filestore.NewFileSystemStore(rootPath, "/")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
n, err := store.PutObject(context.Background(), tc.key, io.NopCloser(strings.NewReader(tc.content)), "text/plain")
|
n, err := store.PutObject(context.Background(), tc.key, strings.NewReader(tc.content), "text/plain")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
content, err := os.ReadFile(path.Join(rootPath, tc.key))
|
content, err := os.ReadFile(path.Join(rootPath, tc.key))
|
||||||
|
|
|
@ -104,7 +104,7 @@ func (s *S3FileStore) GetURL(ctx context.Context, key string) (string, error) {
|
||||||
|
|
||||||
// PutObject uploads an object using multipart upload, returning the number of
|
// PutObject uploads an object using multipart upload, returning the number of
|
||||||
// bytes uploaded and any error.
|
// bytes uploaded and any error.
|
||||||
func (s *S3FileStore) PutObject(ctx context.Context, key string, r io.ReadCloser, contentType string) (int64, error) {
|
func (s *S3FileStore) PutObject(ctx context.Context, key string, r io.Reader, contentType string) (int64, error) {
|
||||||
const (
|
const (
|
||||||
targetPartSizeBytes = 5 * 1024 * 1024 // 5MB
|
targetPartSizeBytes = 5 * 1024 * 1024 // 5MB
|
||||||
readBufferSizeBytes = 32_768 // 32Kb
|
readBufferSizeBytes = 32_768 // 32Kb
|
||||||
|
@ -249,10 +249,6 @@ outer:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = r.Close(); err != nil {
|
|
||||||
return 0, fmt.Errorf("error closing reader: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(results) == 0 {
|
if len(results) == 0 {
|
||||||
return 0, errors.New("no parts available to upload")
|
return 0, errors.New("no parts available to upload")
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,7 +90,6 @@ func TestS3GetURL(t *testing.T) {
|
||||||
|
|
||||||
type testReader struct {
|
type testReader struct {
|
||||||
count, exp int
|
count, exp int
|
||||||
closed bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *testReader) Read(p []byte) (int, error) {
|
func (r *testReader) Read(p []byte) (int, error) {
|
||||||
|
@ -103,11 +102,6 @@ func (r *testReader) Read(p []byte) (int, error) {
|
||||||
return len(p), nil
|
return len(p), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *testReader) Close() error {
|
|
||||||
r.closed = true
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestS3PutObject(t *testing.T) {
|
func TestS3PutObject(t *testing.T) {
|
||||||
const (
|
const (
|
||||||
bucket = "some-bucket"
|
bucket = "some-bucket"
|
||||||
|
@ -141,12 +135,10 @@ func TestS3PutObject(t *testing.T) {
|
||||||
|
|
||||||
store := filestore.NewS3FileStore(filestore.S3API{S3Client: s3Client}, bucket, time.Hour, zap.NewNop().Sugar())
|
store := filestore.NewS3FileStore(filestore.S3API{S3Client: s3Client}, bucket, time.Hour, zap.NewNop().Sugar())
|
||||||
|
|
||||||
reader := &testReader{exp: contentLength}
|
n, err := store.PutObject(context.Background(), key, &testReader{exp: contentLength}, contentType)
|
||||||
n, err := store.PutObject(context.Background(), key, reader, contentType)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, int64(contentLength), n)
|
assert.Equal(t, int64(contentLength), n)
|
||||||
assert.ElementsMatch(t, []int64{5_242_880, 5_242_880, 5_242_880, 4_271_360}, partLengths)
|
assert.ElementsMatch(t, []int64{5_242_880, 5_242_880, 5_242_880, 4_271_360}, partLengths)
|
||||||
assert.True(t, reader.closed)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("NOK,UploadPartFailure", func(t *testing.T) {
|
t.Run("NOK,UploadPartFailure", func(t *testing.T) {
|
||||||
|
|
|
@ -82,18 +82,18 @@ func (_m *FileStore) GetURL(ctx context.Context, key string) (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutObject provides a mock function with given fields: ctx, key, reader, contentType
|
// PutObject provides a mock function with given fields: ctx, key, reader, contentType
|
||||||
func (_m *FileStore) PutObject(ctx context.Context, key string, reader io.ReadCloser, contentType string) (int64, error) {
|
func (_m *FileStore) PutObject(ctx context.Context, key string, reader io.Reader, contentType string) (int64, error) {
|
||||||
ret := _m.Called(ctx, key, reader, contentType)
|
ret := _m.Called(ctx, key, reader, contentType)
|
||||||
|
|
||||||
var r0 int64
|
var r0 int64
|
||||||
if rf, ok := ret.Get(0).(func(context.Context, string, io.ReadCloser, string) int64); ok {
|
if rf, ok := ret.Get(0).(func(context.Context, string, io.Reader, string) int64); ok {
|
||||||
r0 = rf(ctx, key, reader, contentType)
|
r0 = rf(ctx, key, reader, contentType)
|
||||||
} else {
|
} else {
|
||||||
r0 = ret.Get(0).(int64)
|
r0 = ret.Get(0).(int64)
|
||||||
}
|
}
|
||||||
|
|
||||||
var r1 error
|
var r1 error
|
||||||
if rf, ok := ret.Get(1).(func(context.Context, string, io.ReadCloser, string) error); ok {
|
if rf, ok := ret.Get(1).(func(context.Context, string, io.Reader, string) error); ok {
|
||||||
r1 = rf(ctx, key, reader, contentType)
|
r1 = rf(ctx, key, reader, contentType)
|
||||||
} else {
|
} else {
|
||||||
r1 = ret.Error(1)
|
r1 = ret.Error(1)
|
||||||
|
|
|
@ -145,8 +145,8 @@ func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaS
|
||||||
// TODO: use mediaSet func to fetch key
|
// TODO: use mediaSet func to fetch key
|
||||||
key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID)
|
key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID)
|
||||||
|
|
||||||
bytesUploaded, rawErr := s.fileStore.PutObject(ctx, key, readCloser{io.TeeReader(stdout, s), stdout}, rawAudioMimeType)
|
teeReader := io.TeeReader(stdout, s)
|
||||||
|
bytesUploaded, rawErr := s.fileStore.PutObject(ctx, key, teeReader, rawAudioMimeType)
|
||||||
if rawErr != nil {
|
if rawErr != nil {
|
||||||
s.CloseWithError(fmt.Errorf("error uploading raw audio: %v", rawErr))
|
s.CloseWithError(fmt.Errorf("error uploading raw audio: %v", rawErr))
|
||||||
return
|
return
|
||||||
|
|
|
@ -30,7 +30,7 @@ type videoGetter struct {
|
||||||
type videoGetterState struct {
|
type videoGetterState struct {
|
||||||
*videoGetter
|
*videoGetter
|
||||||
|
|
||||||
r io.ReadCloser
|
r io.Reader
|
||||||
count, exp int64
|
count, exp int64
|
||||||
mediaSetID uuid.UUID
|
mediaSetID uuid.UUID
|
||||||
key, contentType string
|
key, contentType string
|
||||||
|
@ -47,10 +47,10 @@ func newVideoGetter(store Store, fileStore FileStore, logger *zap.SugaredLogger)
|
||||||
// specified key and content type. The returned reader must have its Next()
|
// specified key and content type. The returned reader must have its Next()
|
||||||
// method called until error = io.EOF, otherwise a deadlock or other resource
|
// method called until error = io.EOF, otherwise a deadlock or other resource
|
||||||
// leakage is likely.
|
// leakage is likely.
|
||||||
func (g *videoGetter) GetVideo(ctx context.Context, r io.ReadCloser, exp int64, mediaSetID uuid.UUID, key, contentType string) (GetVideoProgressReader, error) {
|
func (g *videoGetter) GetVideo(ctx context.Context, r io.Reader, exp int64, mediaSetID uuid.UUID, key, contentType string) (GetVideoProgressReader, error) {
|
||||||
s := &videoGetterState{
|
s := &videoGetterState{
|
||||||
videoGetter: g,
|
videoGetter: g,
|
||||||
r: r,
|
r: newLogProgressReader(r, "video", exp, g.logger),
|
||||||
exp: exp,
|
exp: exp,
|
||||||
mediaSetID: mediaSetID,
|
mediaSetID: mediaSetID,
|
||||||
key: key,
|
key: key,
|
||||||
|
@ -75,8 +75,8 @@ func (s *videoGetterState) Write(p []byte) (int, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *videoGetterState) getVideo(ctx context.Context) {
|
func (s *videoGetterState) getVideo(ctx context.Context) {
|
||||||
progressReader := newLogProgressReader(s.r, "video", s.exp, s.logger)
|
teeReader := io.TeeReader(s.r, s)
|
||||||
teeReader := readCloser{io.TeeReader(progressReader, s), s.r}
|
|
||||||
_, err := s.fileStore.PutObject(ctx, s.key, teeReader, s.contentType)
|
_, err := s.fileStore.PutObject(ctx, s.key, teeReader, s.contentType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.errorChan <- fmt.Errorf("error uploading to file store: %v", err)
|
s.errorChan <- fmt.Errorf("error uploading to file store: %v", err)
|
||||||
|
|
|
@ -78,7 +78,7 @@ func (s *MediaSetService) getThumbnailFromYoutube(ctx context.Context, mediaSet
|
||||||
thumbnailKey := fmt.Sprintf("media_sets/%s/thumbnail.jpg", mediaSet.ID)
|
thumbnailKey := fmt.Sprintf("media_sets/%s/thumbnail.jpg", mediaSet.ID)
|
||||||
|
|
||||||
const mimeType = "application/jpeg"
|
const mimeType = "application/jpeg"
|
||||||
_, err = s.fileStore.PutObject(ctx, thumbnailKey, io.NopCloser(bytes.NewReader(imageData)), mimeType)
|
_, err = s.fileStore.PutObject(ctx, thumbnailKey, bytes.NewReader(imageData), mimeType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return VideoThumbnail{}, fmt.Errorf("error uploading thumbnail: %v", err)
|
return VideoThumbnail{}, fmt.Errorf("error uploading thumbnail: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,7 +66,7 @@ type FileStore interface {
|
||||||
GetObject(ctx context.Context, key string) (io.ReadCloser, error)
|
GetObject(ctx context.Context, key string) (io.ReadCloser, error)
|
||||||
GetObjectWithRange(ctx context.Context, key string, startFrame, endFrame int64) (io.ReadCloser, error)
|
GetObjectWithRange(ctx context.Context, key string, startFrame, endFrame int64) (io.ReadCloser, error)
|
||||||
GetURL(ctx context.Context, key string) (string, error)
|
GetURL(ctx context.Context, key string) (string, error)
|
||||||
PutObject(ctx context.Context, key string, reader io.ReadCloser, contentType string) (int64, error)
|
PutObject(ctx context.Context, key string, reader io.Reader, contentType string) (int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// YoutubeClient wraps the youtube.Client client.
|
// YoutubeClient wraps the youtube.Client client.
|
||||||
|
@ -74,8 +74,3 @@ type YoutubeClient interface {
|
||||||
GetVideoContext(ctx context.Context, id string) (*youtubev2.Video, error)
|
GetVideoContext(ctx context.Context, id string) (*youtubev2.Video, error)
|
||||||
GetStreamContext(ctx context.Context, video *youtubev2.Video, format *youtubev2.Format) (io.ReadCloser, int64, error)
|
GetStreamContext(ctx context.Context, video *youtubev2.Video, format *youtubev2.Format) (io.ReadCloser, int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type readCloser struct {
|
|
||||||
io.Reader
|
|
||||||
io.Closer
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue