From dc497b546aa7d54b69e178b3f0e9eb3ac72ebf0b Mon Sep 17 00:00:00 2001
From: Rob Watson
Date: Wed, 10 Nov 2021 13:37:25 +0100
Subject: [PATCH] Experimental changes prior to refactoring Uploader

---
 backend/media/service.go  | 34 +++++++++++++++++++++-------------
 backend/media/uploader.go | 36 +++++++++++++++++++++++++++++-------
 2 files changed, 50 insertions(+), 20 deletions(-)

diff --git a/backend/media/service.go b/backend/media/service.go
index 0c0bb93..a133ef5 100644
--- a/backend/media/service.go
+++ b/backend/media/service.go
@@ -390,6 +390,11 @@ func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID
 
 	go func() {
 		_, copyErr := io.Copy(mw, s.ffmpegReader)
+
+		// At this point, there is no more data to send to the uploader.
+		// We can close it safely, it always returns nil.
+		_ = s.uploader.Close()
+
 		done <- copyErr
 	}()
 
@@ -404,18 +409,6 @@ outer:
 		}
 	}
 
-	if err != nil {
-		if cancelErr := s.ffmpegReader.Cancel(); cancelErr != nil {
-			log.Printf("error cancelling ffmpegreader: %v", cancelErr)
-		}
-	}
-
-	if readerErr := s.ffmpegReader.Close(); readerErr != nil {
-		if err == nil {
-			err = readerErr
-		}
-	}
-
 	var framesUploaded int64
 	if err == nil {
 		if bytesUploaded, uploaderErr := s.uploader.Complete(); uploaderErr != nil {
@@ -425,6 +418,21 @@ outer:
 		}
 	}
 
+	// If there was an error returned, the underlying ffmpegReader process may
+	// still be active. Kill it.
+	if err != nil {
+		if cancelErr := s.ffmpegReader.Cancel(); cancelErr != nil {
+			log.Printf("error cancelling ffmpegreader: %v", cancelErr)
+		}
+	}
+
+	// Either way, we need to wait for the ffmpegReader process to exit.
+	if readerErr := s.ffmpegReader.Close(); readerErr != nil {
+		if err == nil {
+			err = readerErr
+		}
+	}
+
 	if err == nil {
 		_, updateErr := s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{
 			ID:    mediaSetID,
@@ -439,7 +447,7 @@ outer:
 	}
 
 	if err != nil {
-		log.Printf("error in upload flow: %v", err)
+		log.Printf("error uploading asynchronously: %v", err)
 
 		newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
 		defer cancel()
diff --git a/backend/media/uploader.go b/backend/media/uploader.go
index 9da152e..ebba02a 100644
--- a/backend/media/uploader.go
+++ b/backend/media/uploader.go
@@ -120,31 +120,50 @@ func (u *multipartUploadWriter) uploadPart(buf []byte, partNum int32) {
 	}
 }
 
+// Close signals that no further data will be written to the writer.
+// Always returns nil.
+func (u *multipartUploadWriter) Close() error {
+	// TODO: trigger Complete() here too?
+	close(u.uploadResults)
+	return nil
+}
+
 // Complete waits for all currently uploading parts to be uploaded, and
-// finalizes the object in S3. Close() must be called first.
+// finalizes the object in S3.
+//
+// Close() must have been called first.
 func (u *multipartUploadWriter) Complete() (int64, error) {
-	var completedParts []types.CompletedPart
+	completedParts := make([]types.CompletedPart, 0, 64)
 	var uploadedBytes int64
 
-	// we wait for all parts to be completed before collecting the results:
-	wgDone := make(chan struct{})
+	// Write() launches multiple goroutines to upload the parts asynchronously.
+	// We need a waitgroup to ensure that all parts are complete, and the channel
+	// has been closed, before we continue.
+	uploadDone := make(chan struct{})
 	go func() {
 		u.wg.Wait()
 		close(u.uploadResults)
-		wgDone <- struct{}{}
+		uploadDone <- struct{}{}
 	}()
 
 outer:
 	for {
 		select {
-		case uploadResult := <-u.uploadResults:
+		case uploadResult, ok := <-u.uploadResults:
+			if !ok {
+				break outer
+			}
+			// if len(completedParts) == 3 {
+			// 	return 0, errors.New("nope")
+			// }
 			if uploadResult.err != nil {
 				return 0, uploadResult.err
 			}
 
+			log.Println("APPENDING PART, len now", len(completedParts))
 			completedParts = append(completedParts, uploadResult.completedPart)
 			uploadedBytes += uploadResult.size
-		case <-wgDone:
+		case <-uploadDone:
 			break outer
 		case <-u.ctx.Done():
 			return 0, u.ctx.Err()
@@ -155,6 +174,9 @@ outer:
 		return 0, errors.New("no parts available to upload")
 	}
 
+	log.Printf("parts - %+v, bucket - %s, key - %s, id - %s", completedParts, u.bucket, u.key, u.uploadID)
+	log.Printf("len(parts) = %d, cap(parts) = %d", len(completedParts), cap(completedParts))
+
 	input := s3.CompleteMultipartUploadInput{
 		Bucket: aws.String(u.bucket),
 		Key:    aws.String(u.key),