Experimental changes prior to refactoring Uploader

Rob Watson 2021-11-10 13:37:25 +01:00
parent 1496473620
commit dc497b546a
2 changed files with 50 additions and 20 deletions

View File

@ -390,6 +390,11 @@ func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID
go func() {
_, copyErr := io.Copy(mw, s.ffmpegReader)
// At this point, there is no more data to send to the uploader.
// We can close it safely; it always returns nil.
_ = s.uploader.Close()
done <- copyErr
}()
@ -404,18 +409,6 @@ outer:
}
}
if err != nil {
if cancelErr := s.ffmpegReader.Cancel(); cancelErr != nil {
log.Printf("error cancelling ffmpegreader: %v", cancelErr)
}
}
if readerErr := s.ffmpegReader.Close(); readerErr != nil {
if err == nil {
err = readerErr
}
}
var framesUploaded int64
if err == nil {
if bytesUploaded, uploaderErr := s.uploader.Complete(); uploaderErr != nil {
@ -425,6 +418,21 @@ outer:
}
}
// If there was an error returned, the underlying ffmpegReader process may
// still be active. Kill it.
if err != nil {
if cancelErr := s.ffmpegReader.Cancel(); cancelErr != nil {
log.Printf("error cancelling ffmpegreader: %v", cancelErr)
}
}
// Either way, we need to wait for the ffmpegReader process to exit.
if readerErr := s.ffmpegReader.Close(); readerErr != nil {
if err == nil {
err = readerErr
}
}
if err == nil {
_, updateErr := s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{
ID: mediaSetID,
@ -439,7 +447,7 @@ outer:
}
if err != nil {
log.Printf("error in upload flow: %v", err)
log.Printf("error uploading asynchronously: %v", err)
newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

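Taken together, the hunks above settle on one teardown order for the download flow: finish (or abandon) the copy, close the uploader, cancel the ffmpeg reader only if something failed, always wait for the reader process to exit, and only then finalize the upload. Below is a condensed, linear sketch of that ordering; the interface and function names (cancelableReader, finalizer, drain) are illustrative stand-ins rather than types from this repository, and the real code runs the copy in a goroutine rather than inline.

package media

import (
	"io"
	"log"
)

// cancelableReader stands in for the real ffmpegReader: Cancel kills the
// underlying process, Close waits for it to exit.
type cancelableReader interface {
	io.Reader
	Cancel() error
	Close() error
}

// finalizer stands in for the multipart uploader: Close signals that no more
// data is coming, Complete waits for in-flight parts and finalizes the object.
type finalizer interface {
	io.Writer
	Close() error
	Complete() (int64, error)
}

func drain(dst finalizer, src cancelableReader) (int64, error) {
	_, err := io.Copy(dst, src)
	// No more data will be sent to the uploader, so it is safe to close it.
	_ = dst.Close()

	// If the copy failed, the underlying reader process may still be active.
	if err != nil {
		if cancelErr := src.Cancel(); cancelErr != nil {
			log.Printf("error cancelling reader: %v", cancelErr)
		}
	}

	// Either way, wait for the reader process to exit.
	if readerErr := src.Close(); readerErr != nil && err == nil {
		err = readerErr
	}

	if err != nil {
		return 0, err
	}
	return dst.Complete()
}
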
View File

@ -120,31 +120,50 @@ func (u *multipartUploadWriter) uploadPart(buf []byte, partNum int32) {
}
}
// Close signals that no further data will be written to the writer.
// Always returns nil.
func (u *multipartUploadWriter) Close() error {
// TODO: trigger Complete() here too?
close(u.uploadResults)
return nil
}
// Complete waits for all currently uploading parts to be uploaded, and
// finalizes the object in S3. Close() must be called first.
// finalizes the object in S3.
//
// Close() must have been called first.
func (u *multipartUploadWriter) Complete() (int64, error) {
var completedParts []types.CompletedPart
completedParts := make([]types.CompletedPart, 0, 64)
var uploadedBytes int64
// we wait for all parts to be completed before collecting the results:
wgDone := make(chan struct{})
// Write() launches multiple goroutines to upload the parts asynchronously.
// We need a WaitGroup to ensure that all parts are complete and the channel
// has been closed before we continue.
uploadDone := make(chan struct{})
go func() {
u.wg.Wait()
close(u.uploadResults)
wgDone <- struct{}{}
uploadDone <- struct{}{}
}()
outer:
for {
select {
case uploadResult := <-u.uploadResults:
case uploadResult, ok := <-u.uploadResults:
if !ok {
break outer
}
// if len(completedParts) == 3 {
// return 0, errors.New("nope")
// }
if uploadResult.err != nil {
return 0, uploadResult.err
}
log.Println("APPENDING PART, len now", len(completedParts))
completedParts = append(completedParts, uploadResult.completedPart)
uploadedBytes += uploadResult.size
case <-wgDone:
case <-uploadDone:
break outer
case <-u.ctx.Done():
return 0, u.ctx.Err()
@ -155,6 +174,9 @@ outer:
return 0, errors.New("no parts available to upload")
}
log.Printf("parts - %+v, bucket - %s, key - %s, id - %s", completedParts, u.bucket, u.key, u.uploadID)
log.Printf("len(parts) = %d, cap(parts) = %d", len(completedParts), cap(completedParts))
input := s3.CompleteMultipartUploadInput{
Bucket: aws.String(u.bucket),
Key: aws.String(u.key),
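
The comment block in Complete() above describes a standard Go fan-in pattern: worker goroutines send results on a channel while one extra goroutine waits on the WaitGroup and then signals completion, so the collecting select loop knows when to stop. A self-contained sketch of just that pattern, with illustrative names (results, done, collected) that are not taken from the diff:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	results := make(chan int)

	// Each worker sends one result, playing the role of an uploadPart goroutine.
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			results <- n * n
		}(i)
	}

	// Once every worker has finished, signal the collector so it can stop
	// selecting; this is the role uploadDone plays in the diff.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	var collected []int
outer:
	for {
		select {
		case r := <-results:
			collected = append(collected, r)
		case <-done:
			break outer
		}
	}
	fmt.Println(collected)
}

Because results is unbuffered here, every send has been received before its worker calls Done, so nothing is lost when done closes; the diff's version additionally closes the results channel in Close() and checks the ok flag so the loop also exits once the channel has been closed and drained.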