Move uploader reader to its own goroutine

This commit is contained in:
Rob Watson 2021-11-12 10:05:57 +01:00
parent 79be8b7936
commit 97db31209c
1 changed file with 49 additions and 38 deletions

View File

@ -30,7 +30,7 @@ type readResult struct {
const ( const (
targetPartSizeBytes = 5 * 1024 * 1024 // 5MB targetPartSizeBytes = 5 * 1024 * 1024 // 5MB
bufferOverflowSize = 16_384 // 16Kb readBufferSizeBytes = 32_768 // 32Kb
) )
func newMultipartUploader(s3Client S3Client) *multipartUploader { func newMultipartUploader(s3Client S3Client) *multipartUploader {
@ -64,7 +64,13 @@ func (u *multipartUploader) Upload(ctx context.Context, r io.Reader, bucket, key
UploadId: output.UploadId, UploadId: output.UploadId,
} }
_, abortErr := u.s3.AbortMultipartUpload(ctx, &input) // if the context was cancelled, just use the background context.
ctxToUse := ctx
if ctxToUse.Err() != nil {
ctxToUse = context.Background()
}
_, abortErr := u.s3.AbortMultipartUpload(ctxToUse, &input)
if abortErr != nil { if abortErr != nil {
log.Printf("error aborting upload: %v", abortErr) log.Printf("error aborting upload: %v", abortErr)
} else { } else {
@ -108,51 +114,32 @@ func (u *multipartUploader) Upload(ctx context.Context, r io.Reader, bucket, key
wgDone := make(chan struct{}) wgDone := make(chan struct{})
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) // done when the reader returns EOF wg.Add(1) // done when the reader goroutine returns
go func() { go func() {
wg.Wait() wg.Wait()
wgDone <- struct{}{} wgDone <- struct{}{}
}() }()
readChan := make(chan readResult) readChan := make(chan error, 1)
buf := make([]byte, 32_768)
go func() {
for {
var rr readResult
rr.n, rr.err = r.Read(buf)
readChan <- rr
if rr.err != nil { go func() {
return defer wg.Done()
}
}
}()
var closing bool var closing bool
currPart := bytes.NewBuffer(make([]byte, 0, targetPartSizeBytes+bufferOverflowSize)) currPart := bytes.NewBuffer(make([]byte, 0, targetPartSizeBytes+readBufferSizeBytes))
partNum := int32(1) partNum := int32(1)
results := make([]uploadResult, 0, 64) buf := make([]byte, readBufferSizeBytes)
outer:
for { for {
select { n, readErr := r.Read(buf)
case uploadResult := <-uploadResultChan: if readErr == io.EOF {
results = append(results, uploadResult)
case uploadErr := <-uploadErrorChan:
return 0, fmt.Errorf("error while uploading part: %v", uploadErr)
case <-wgDone:
break outer
case <-ctx.Done():
return 0, ctx.Err()
case readResult := <-readChan:
if readResult.err == io.EOF {
wg.Done()
closing = true closing = true
} else if readResult.err != nil { } else if readErr != nil {
return 0, fmt.Errorf("reader error: %v", readResult.err) readChan <- readErr
return
} }
_, _ = currPart.Write(buf[:readResult.n]) _, _ = currPart.Write(buf[:n])
if closing || currPart.Len() >= targetPartSizeBytes { if closing || currPart.Len() >= targetPartSizeBytes {
part := make([]byte, currPart.Len()) part := make([]byte, currPart.Len())
copy(part, currPart.Bytes()) copy(part, currPart.Bytes())
@ -162,6 +149,30 @@ outer:
go uploadPart(&wg, part, partNum) go uploadPart(&wg, part, partNum)
partNum++ partNum++
} }
if closing {
return
}
}
}()
results := make([]uploadResult, 0, 64)
outer:
for {
select {
case readErr := <-readChan:
if readErr != io.EOF {
return 0, fmt.Errorf("reader error: %v", readErr)
}
case uploadResult := <-uploadResultChan:
results = append(results, uploadResult)
case uploadErr := <-uploadErrorChan:
return 0, fmt.Errorf("error while uploading part: %v", uploadErr)
case <-ctx.Done():
return 0, ctx.Err()
case <-wgDone:
break outer
} }
} }