Upload parts in background

This commit is contained in:
Rob Watson 2021-11-08 02:54:43 +01:00
parent c1ac075a88
commit 06697dc1b1
2 changed files with 108 additions and 58 deletions

View File

@ -433,6 +433,8 @@ outer:
} }
if err != nil { if err != nil {
log.Printf("error in upload flow: %v", err)
newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel() defer cancel()

View File

@ -3,8 +3,10 @@ package media
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"fmt" "fmt"
"log" "log"
"sync"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3"
@ -20,12 +22,19 @@ import (
// Failure to do so will leave S3 in an inconsistent state. // Failure to do so will leave S3 in an inconsistent state.
type multipartUploadWriter struct { type multipartUploadWriter struct {
ctx context.Context ctx context.Context
wg sync.WaitGroup
s3 S3Client s3 S3Client
buf *bytes.Buffer buf *bytes.Buffer
bucket, key, contentType string bucket, key, contentType string
partNum int32
uploadResults chan uploadResult
uploadID string uploadID string
completedParts []types.CompletedPart }
bytesUploaded int64
type uploadResult struct {
completedPart types.CompletedPart
size int64
err error
} }
const targetPartSizeBytes = 5 * 1024 * 1024 // 5MB const targetPartSizeBytes = 5 * 1024 * 1024 // 5MB
@ -55,6 +64,8 @@ func newMultipartUploadWriter(ctx context.Context, s3Client S3Client, bucket, ke
bucket: bucket, bucket: bucket,
key: key, key: key,
contentType: contentType, contentType: contentType,
partNum: 1,
uploadResults: make(chan uploadResult),
uploadID: *output.UploadId, uploadID: *output.UploadId,
}, nil }, nil
} }
@ -66,14 +77,30 @@ func (u *multipartUploadWriter) Write(p []byte) (int, error) {
} }
if u.buf.Len() >= targetPartSizeBytes { if u.buf.Len() >= targetPartSizeBytes {
partLen := u.buf.Len() buf := make([]byte, u.buf.Len())
log.Printf("uploading part num = %d, len = %d", u.partNum(), partLen) copy(buf, u.buf.Bytes())
u.buf.Truncate(0)
u.wg.Add(1)
go u.uploadPart(buf, u.partNum)
u.partNum++
}
return n, err
}
func (u *multipartUploadWriter) uploadPart(buf []byte, partNum int32) {
defer u.wg.Done()
partLen := len(buf)
log.Printf("uploading part num = %d, len = %d", partNum, partLen)
input := s3.UploadPartInput{ input := s3.UploadPartInput{
Body: u.buf, Body: bytes.NewReader(buf),
Bucket: aws.String(u.bucket), Bucket: aws.String(u.bucket),
Key: aws.String(u.key), Key: aws.String(u.key),
PartNumber: u.partNum(), PartNumber: partNum,
UploadId: aws.String(u.uploadID), UploadId: aws.String(u.uploadID),
ContentLength: int64(partLen), ContentLength: int64(partLen),
} }
@ -81,20 +108,67 @@ func (u *multipartUploadWriter) Write(p []byte) (int, error) {
output, uploadErr := u.s3.UploadPart(u.ctx, &input) output, uploadErr := u.s3.UploadPart(u.ctx, &input)
if uploadErr != nil { if uploadErr != nil {
// TODO: retry on failure // TODO: retry on failure
return n, fmt.Errorf("error uploading part: %v", uploadErr) u.uploadResults <- uploadResult{err: fmt.Errorf("error uploading part: %v", uploadErr)}
return
} }
log.Printf("uploaded part num = %d, etag = %s, bytes = %d", u.partNum(), *output.ETag, partLen) log.Printf("uploaded part num = %d, etag = %s, bytes = %d", partNum, *output.ETag, partLen)
u.completedParts = append(u.completedParts, types.CompletedPart{ETag: output.ETag, PartNumber: u.partNum()}) u.uploadResults <- uploadResult{
u.bytesUploaded += int64(partLen) completedPart: types.CompletedPart{ETag: output.ETag, PartNumber: partNum},
size: int64(partLen),
}
} }
return n, err // Complete waits for all currently uploading parts to be uploaded, and
// finalizes the object in S3. Close() must be called first.
func (u *multipartUploadWriter) Complete() (int64, error) {
var completedParts []types.CompletedPart
var uploadedBytes int64
// we wait for all parts to be completed before collecting the results:
wgDone := make(chan struct{})
go func() {
u.wg.Wait()
close(u.uploadResults)
wgDone <- struct{}{}
}()
outer:
for {
select {
case uploadResult := <-u.uploadResults:
if uploadResult.err != nil {
return 0, uploadResult.err
} }
func (u *multipartUploadWriter) partNum() int32 { completedParts = append(completedParts, uploadResult.completedPart)
return int32(len(u.completedParts) + 1) uploadedBytes += uploadResult.size
case <-wgDone:
break outer
case <-u.ctx.Done():
return 0, u.ctx.Err()
}
}
if len(completedParts) == 0 {
return 0, errors.New("no parts available to upload")
}
input := s3.CompleteMultipartUploadInput{
Bucket: aws.String(u.bucket),
Key: aws.String(u.key),
UploadId: aws.String(u.uploadID),
MultipartUpload: &types.CompletedMultipartUpload{Parts: completedParts},
}
_, err := u.s3.CompleteMultipartUpload(u.ctx, &input)
if err != nil {
return 0, fmt.Errorf("error completing upload: %v", err)
}
log.Printf("completed upload, key = %s, bytesUploaded = %d", u.key, uploadedBytes)
return uploadedBytes, nil
} }
// Abort aborts the upload process, cancelling the upload on S3. It accepts a // Abort aborts the upload process, cancelling the upload on S3. It accepts a
@ -112,33 +186,7 @@ func (u *multipartUploadWriter) Abort(ctx context.Context) error {
return fmt.Errorf("error aborting upload: %v", err) return fmt.Errorf("error aborting upload: %v", err)
} }
log.Printf("aborted upload, key = %s, bytesUploaded = %d", u.key, u.bytesUploaded) log.Printf("aborted upload, key = %s", u.key)
return nil return nil
} }
// Complete completes the upload process, finalizing the upload on S3.
// If no parts have been successfully uploaded, then Abort() will be called
// transparently.
func (u *multipartUploadWriter) Complete() (int64, error) {
if len(u.completedParts) == 0 {
return 0, u.Abort(u.ctx)
}
input := s3.CompleteMultipartUploadInput{
Bucket: aws.String(u.bucket),
Key: aws.String(u.key),
UploadId: aws.String(u.uploadID),
MultipartUpload: &types.CompletedMultipartUpload{
Parts: u.completedParts,
},
}
_, err := u.s3.CompleteMultipartUpload(u.ctx, &input)
if err != nil {
return 0, fmt.Errorf("error completing upload: %v", err)
}
log.Printf("completed upload, key = %s, bytesUploaded = %d", u.key, u.bytesUploaded)
return u.bytesUploaded, nil
}