2021-12-07 19:58:11 +00:00
|
|
|
package filestore
|
|
|
|
|
2021-12-13 15:51:40 +00:00
|
|
|
//go:generate mockery --recursive --name S3Client --output ../generated/mocks
|
|
|
|
//go:generate mockery --recursive --name S3PresignClient --output ../generated/mocks
|
|
|
|
|
2021-12-07 19:58:11 +00:00
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"sort"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
|
|
signerv4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
|
|
|
// S3API provides an API to AWS S3.
|
|
|
|
type S3API struct {
|
|
|
|
S3Client
|
|
|
|
S3PresignClient
|
|
|
|
}
|
|
|
|
|
|
|
|
// S3Client wraps the AWS S3 service client.
|
|
|
|
type S3Client interface {
|
|
|
|
GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error)
|
|
|
|
CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
|
|
|
|
UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
|
|
|
|
AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
|
|
|
|
CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
// S3PresignClient wraps the AWS S3 Presign client.
|
|
|
|
type S3PresignClient interface {
|
|
|
|
PresignGetObject(context.Context, *s3.GetObjectInput, ...func(*s3.PresignOptions)) (*signerv4.PresignedHTTPRequest, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
// S3FileStore stores files on Amazon S3.
|
|
|
|
type S3FileStore struct {
|
|
|
|
s3 S3API
|
|
|
|
bucket string
|
|
|
|
urlExpiry time.Duration
|
|
|
|
logger *zap.SugaredLogger
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewS3FileStore builds a new S3FileStore using the provided configuration.
|
|
|
|
func NewS3FileStore(s3API S3API, bucket string, urlExpiry time.Duration, logger *zap.SugaredLogger) *S3FileStore {
|
|
|
|
return &S3FileStore{
|
|
|
|
s3: s3API,
|
|
|
|
bucket: bucket,
|
|
|
|
urlExpiry: urlExpiry,
|
|
|
|
logger: logger,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetObject returns an io.Reader that returns an object associated with the
|
|
|
|
// provided key.
|
|
|
|
func (s *S3FileStore) GetObject(ctx context.Context, key string) (io.ReadCloser, error) {
|
|
|
|
input := s3.GetObjectInput{
|
|
|
|
Bucket: aws.String(s.bucket),
|
|
|
|
Key: aws.String(key),
|
|
|
|
}
|
|
|
|
output, err := s.s3.GetObject(ctx, &input)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("error getting object from s3: %v", err)
|
|
|
|
}
|
|
|
|
return output.Body, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetObjectWithRange returns an io.Reader that returns a partial object
|
|
|
|
// associated with the provided key.
|
|
|
|
func (s *S3FileStore) GetObjectWithRange(ctx context.Context, key string, start, end int64) (io.ReadCloser, error) {
|
|
|
|
byteRange := fmt.Sprintf("bytes=%d-%d", start, end)
|
|
|
|
input := s3.GetObjectInput{
|
|
|
|
Bucket: aws.String(s.bucket),
|
|
|
|
Key: aws.String(key),
|
|
|
|
Range: aws.String(byteRange),
|
|
|
|
}
|
|
|
|
output, err := s.s3.GetObject(ctx, &input)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("error getting object from s3: %v", err)
|
|
|
|
}
|
|
|
|
return output.Body, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetURL returns a presigned URL pointing to the object associated with the
|
|
|
|
// provided key.
|
|
|
|
func (s *S3FileStore) GetURL(ctx context.Context, key string) (string, error) {
|
|
|
|
input := s3.GetObjectInput{
|
|
|
|
Bucket: aws.String(s.bucket),
|
|
|
|
Key: aws.String(key),
|
|
|
|
}
|
|
|
|
request, err := s.s3.PresignGetObject(ctx, &input, s3.WithPresignExpires(s.urlExpiry))
|
|
|
|
if err != nil {
|
|
|
|
return "", fmt.Errorf("error generating presigned URL: %v", err)
|
|
|
|
}
|
|
|
|
return request.URL, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// PutObject uploads an object using multipart upload, returning the number of
|
|
|
|
// bytes uploaded and any error.
|
|
|
|
func (s *S3FileStore) PutObject(ctx context.Context, key string, r io.Reader, contentType string) (int64, error) {
|
|
|
|
const (
|
|
|
|
targetPartSizeBytes = 5 * 1024 * 1024 // 5MB
|
|
|
|
readBufferSizeBytes = 32_768 // 32Kb
|
|
|
|
)
|
|
|
|
|
|
|
|
var uploaded bool
|
|
|
|
|
|
|
|
input := s3.CreateMultipartUploadInput{
|
|
|
|
Bucket: aws.String(s.bucket),
|
|
|
|
Key: aws.String(key),
|
|
|
|
ContentType: aws.String(contentType),
|
|
|
|
}
|
|
|
|
output, err := s.s3.CreateMultipartUpload(ctx, &input)
|
|
|
|
if err != nil {
|
|
|
|
return 0, fmt.Errorf("error creating multipart upload: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// abort the upload if possible, logging any errors, on exit.
|
|
|
|
defer func() {
|
|
|
|
if uploaded {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
input := s3.AbortMultipartUploadInput{
|
|
|
|
Bucket: aws.String(s.bucket),
|
|
|
|
Key: aws.String(key),
|
|
|
|
UploadId: output.UploadId,
|
|
|
|
}
|
|
|
|
|
|
|
|
// if the context was cancelled, just use the background context.
|
|
|
|
ctxToUse := ctx
|
|
|
|
if ctxToUse.Err() != nil {
|
|
|
|
ctxToUse = context.Background()
|
|
|
|
}
|
|
|
|
|
|
|
|
_, deferErr := s.s3.AbortMultipartUpload(ctxToUse, &input)
|
|
|
|
if deferErr != nil {
|
|
|
|
s.logger.Errorf("uploader: error aborting upload: %v", deferErr)
|
|
|
|
} else {
|
|
|
|
s.logger.Infof("aborted upload, key = %s", key)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
type uploadedPart struct {
|
|
|
|
part types.CompletedPart
|
|
|
|
size int64
|
|
|
|
}
|
|
|
|
uploadResultChan := make(chan uploadedPart)
|
|
|
|
uploadErrorChan := make(chan error, 1)
|
|
|
|
|
|
|
|
// uploadPart uploads an individual part.
|
|
|
|
uploadPart := func(wg *sync.WaitGroup, buf []byte, partNum int32) {
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
partLen := int64(len(buf))
|
|
|
|
s.logger.With("key", key, "partNum", partNum, "partLen", partLen).Debug("uploading part")
|
|
|
|
|
|
|
|
input := s3.UploadPartInput{
|
|
|
|
Body: bytes.NewReader(buf),
|
|
|
|
Bucket: aws.String(s.bucket),
|
|
|
|
Key: aws.String(key),
|
|
|
|
PartNumber: partNum,
|
|
|
|
UploadId: output.UploadId,
|
|
|
|
ContentLength: partLen,
|
|
|
|
}
|
|
|
|
|
|
|
|
output, uploadErr := s.s3.UploadPart(ctx, &input)
|
|
|
|
if uploadErr != nil {
|
|
|
|
// TODO: retry on failure
|
|
|
|
uploadErrorChan <- uploadErr
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
s.logger.With("key", key, "partNum", partNum, "partLen", partLen, "etag", *output.ETag).Debug("uploaded part")
|
|
|
|
|
|
|
|
uploadResultChan <- uploadedPart{
|
|
|
|
part: types.CompletedPart{ETag: output.ETag, PartNumber: partNum},
|
|
|
|
size: partLen,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
wgDone := make(chan struct{})
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(1) // done when the reader goroutine returns
|
|
|
|
go func() {
|
|
|
|
wg.Wait()
|
|
|
|
wgDone <- struct{}{}
|
|
|
|
}()
|
|
|
|
|
|
|
|
readChan := make(chan error, 1)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
var closing bool
|
|
|
|
currPart := bytes.NewBuffer(make([]byte, 0, targetPartSizeBytes+readBufferSizeBytes))
|
|
|
|
partNum := int32(1)
|
|
|
|
buf := make([]byte, readBufferSizeBytes)
|
|
|
|
|
|
|
|
for {
|
|
|
|
n, readErr := r.Read(buf)
|
|
|
|
if readErr == io.EOF {
|
|
|
|
closing = true
|
|
|
|
} else if readErr != nil {
|
|
|
|
readChan <- readErr
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
_, _ = currPart.Write(buf[:n])
|
|
|
|
if closing || currPart.Len() >= targetPartSizeBytes {
|
|
|
|
part := make([]byte, currPart.Len())
|
|
|
|
copy(part, currPart.Bytes())
|
|
|
|
currPart.Truncate(0)
|
|
|
|
|
|
|
|
wg.Add(1)
|
|
|
|
go uploadPart(&wg, part, partNum)
|
|
|
|
partNum++
|
|
|
|
}
|
|
|
|
|
|
|
|
if closing {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
results := make([]uploadedPart, 0, 64)
|
|
|
|
|
|
|
|
outer:
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case readErr := <-readChan:
|
|
|
|
if readErr != io.EOF {
|
|
|
|
return 0, fmt.Errorf("reader error: %v", readErr)
|
|
|
|
}
|
|
|
|
case uploadResult := <-uploadResultChan:
|
|
|
|
results = append(results, uploadResult)
|
|
|
|
case uploadErr := <-uploadErrorChan:
|
|
|
|
return 0, fmt.Errorf("error while uploading part: %v", uploadErr)
|
|
|
|
case <-ctx.Done():
|
|
|
|
return 0, ctx.Err()
|
|
|
|
case <-wgDone:
|
|
|
|
break outer
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(results) == 0 {
|
|
|
|
return 0, errors.New("no parts available to upload")
|
|
|
|
}
|
|
|
|
|
|
|
|
completedParts := make([]types.CompletedPart, 0, 64)
|
|
|
|
var uploadedBytes int64
|
|
|
|
for _, result := range results {
|
|
|
|
completedParts = append(completedParts, result.part)
|
|
|
|
uploadedBytes += result.size
|
|
|
|
}
|
|
|
|
|
|
|
|
// the parts may be out of order, especially with slow network conditions:
|
|
|
|
sort.Slice(completedParts, func(i, j int) bool {
|
|
|
|
return completedParts[i].PartNumber < completedParts[j].PartNumber
|
|
|
|
})
|
|
|
|
|
|
|
|
completeInput := s3.CompleteMultipartUploadInput{
|
|
|
|
Bucket: aws.String(s.bucket),
|
|
|
|
Key: aws.String(key),
|
|
|
|
UploadId: output.UploadId,
|
|
|
|
MultipartUpload: &types.CompletedMultipartUpload{Parts: completedParts},
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, err = s.s3.CompleteMultipartUpload(ctx, &completeInput); err != nil {
|
|
|
|
return 0, fmt.Errorf("error completing upload: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
s.logger.With("key", key, "numParts", len(completedParts), "len", uploadedBytes).Debug("completed upload")
|
|
|
|
uploaded = true
|
|
|
|
|
|
|
|
return uploadedBytes, nil
|
|
|
|
}
|