From 06697dc1b14ace95ebca8850789968e91fbfe0ca Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Mon, 8 Nov 2021 02:54:43 +0100 Subject: [PATCH] Upload parts in background --- backend/media/service.go | 2 + backend/media/uploader.go | 164 ++++++++++++++++++++++++-------------- 2 files changed, 108 insertions(+), 58 deletions(-) diff --git a/backend/media/service.go b/backend/media/service.go index 50c4cef..a58aecf 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -433,6 +433,8 @@ outer: } if err != nil { + log.Printf("error in upload flow: %v", err) + newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() diff --git a/backend/media/uploader.go b/backend/media/uploader.go index caa5508..9da152e 100644 --- a/backend/media/uploader.go +++ b/backend/media/uploader.go @@ -3,8 +3,10 @@ package media import ( "bytes" "context" + "errors" "fmt" "log" + "sync" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -20,12 +22,19 @@ import ( // Failure to do so will leave S3 in an inconsistent state. type multipartUploadWriter struct { ctx context.Context + wg sync.WaitGroup s3 S3Client buf *bytes.Buffer bucket, key, contentType string + partNum int32 + uploadResults chan uploadResult uploadID string - completedParts []types.CompletedPart - bytesUploaded int64 +} + +type uploadResult struct { + completedPart types.CompletedPart + size int64 + err error } const targetPartSizeBytes = 5 * 1024 * 1024 // 5MB @@ -49,13 +58,15 @@ func newMultipartUploadWriter(ctx context.Context, s3Client S3Client, bucket, ke b := make([]byte, 0, targetPartSizeBytes+bufferOverflowSize) return &multipartUploadWriter{ - ctx: ctx, - s3: s3Client, - buf: bytes.NewBuffer(b), - bucket: bucket, - key: key, - contentType: contentType, - uploadID: *output.UploadId, + ctx: ctx, + s3: s3Client, + buf: bytes.NewBuffer(b), + bucket: bucket, + key: key, + contentType: contentType, + partNum: 1, + uploadResults: make(chan uploadResult), + uploadID: *output.UploadId, }, nil } @@ -66,35 +77,98 @@ func (u *multipartUploadWriter) Write(p []byte) (int, error) { } if u.buf.Len() >= targetPartSizeBytes { - partLen := u.buf.Len() - log.Printf("uploading part num = %d, len = %d", u.partNum(), partLen) + buf := make([]byte, u.buf.Len()) + copy(buf, u.buf.Bytes()) + u.buf.Truncate(0) - input := s3.UploadPartInput{ - Body: u.buf, - Bucket: aws.String(u.bucket), - Key: aws.String(u.key), - PartNumber: u.partNum(), - UploadId: aws.String(u.uploadID), - ContentLength: int64(partLen), - } + u.wg.Add(1) + go u.uploadPart(buf, u.partNum) - output, uploadErr := u.s3.UploadPart(u.ctx, &input) - if uploadErr != nil { - // TODO: retry on failure - return n, fmt.Errorf("error uploading part: %v", uploadErr) - } - - log.Printf("uploaded part num = %d, etag = %s, bytes = %d", u.partNum(), *output.ETag, partLen) - - u.completedParts = append(u.completedParts, types.CompletedPart{ETag: output.ETag, PartNumber: u.partNum()}) - u.bytesUploaded += int64(partLen) + u.partNum++ } return n, err } -func (u *multipartUploadWriter) partNum() int32 { - return int32(len(u.completedParts) + 1) +func (u *multipartUploadWriter) uploadPart(buf []byte, partNum int32) { + defer u.wg.Done() + + partLen := len(buf) + log.Printf("uploading part num = %d, len = %d", partNum, partLen) + + input := s3.UploadPartInput{ + Body: bytes.NewReader(buf), + Bucket: aws.String(u.bucket), + Key: aws.String(u.key), + PartNumber: partNum, + UploadId: aws.String(u.uploadID), + ContentLength: int64(partLen), + } + + output, uploadErr := u.s3.UploadPart(u.ctx, &input) + if uploadErr != nil { + // TODO: retry on failure + u.uploadResults <- uploadResult{err: fmt.Errorf("error uploading part: %v", uploadErr)} + return + } + + log.Printf("uploaded part num = %d, etag = %s, bytes = %d", partNum, *output.ETag, partLen) + + u.uploadResults <- uploadResult{ + completedPart: types.CompletedPart{ETag: output.ETag, PartNumber: partNum}, + size: int64(partLen), + } +} + +// Complete waits for all currently uploading parts to be uploaded, and +// finalizes the object in S3. Close() must be called first. +func (u *multipartUploadWriter) Complete() (int64, error) { + var completedParts []types.CompletedPart + var uploadedBytes int64 + + // we wait for all parts to be completed before collecting the results: + wgDone := make(chan struct{}) + go func() { + u.wg.Wait() + close(u.uploadResults) + wgDone <- struct{}{} + }() + +outer: + for { + select { + case uploadResult := <-u.uploadResults: + if uploadResult.err != nil { + return 0, uploadResult.err + } + + completedParts = append(completedParts, uploadResult.completedPart) + uploadedBytes += uploadResult.size + case <-wgDone: + break outer + case <-u.ctx.Done(): + return 0, u.ctx.Err() + } + } + + if len(completedParts) == 0 { + return 0, errors.New("no parts available to upload") + } + + input := s3.CompleteMultipartUploadInput{ + Bucket: aws.String(u.bucket), + Key: aws.String(u.key), + UploadId: aws.String(u.uploadID), + MultipartUpload: &types.CompletedMultipartUpload{Parts: completedParts}, + } + + _, err := u.s3.CompleteMultipartUpload(u.ctx, &input) + if err != nil { + return 0, fmt.Errorf("error completing upload: %v", err) + } + + log.Printf("completed upload, key = %s, bytesUploaded = %d", u.key, uploadedBytes) + return uploadedBytes, nil } // Abort aborts the upload process, cancelling the upload on S3. It accepts a @@ -112,33 +186,7 @@ func (u *multipartUploadWriter) Abort(ctx context.Context) error { return fmt.Errorf("error aborting upload: %v", err) } - log.Printf("aborted upload, key = %s, bytesUploaded = %d", u.key, u.bytesUploaded) + log.Printf("aborted upload, key = %s", u.key) return nil } - -// Complete completes the upload process, finalizing the upload on S3. -// If no parts have been successfully uploaded, then Abort() will be called -// transparently. -func (u *multipartUploadWriter) Complete() (int64, error) { - if len(u.completedParts) == 0 { - return 0, u.Abort(u.ctx) - } - - input := s3.CompleteMultipartUploadInput{ - Bucket: aws.String(u.bucket), - Key: aws.String(u.key), - UploadId: aws.String(u.uploadID), - MultipartUpload: &types.CompletedMultipartUpload{ - Parts: u.completedParts, - }, - } - - _, err := u.s3.CompleteMultipartUpload(u.ctx, &input) - if err != nil { - return 0, fmt.Errorf("error completing upload: %v", err) - } - - log.Printf("completed upload, key = %s, bytesUploaded = %d", u.key, u.bytesUploaded) - return u.bytesUploaded, nil -}