From d117419b0c5dc9fc3c439e362caf22390b7cc7a3 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Tue, 2 Nov 2021 19:03:26 +0100 Subject: [PATCH] Read peaks from S3 --- backend/cmd/progress-test/main.go | 4 +- backend/media/media_set.go | 10 +-- backend/media/service.go | 105 +++++++++++++++++++++++++++--- backend/media/uploader.go | 8 +-- backend/server/server.go | 2 +- backend/sql/queries.sql | 2 +- 6 files changed, 109 insertions(+), 22 deletions(-) diff --git a/backend/cmd/progress-test/main.go b/backend/cmd/progress-test/main.go index f42ba44..5d400bc 100644 --- a/backend/cmd/progress-test/main.go +++ b/backend/cmd/progress-test/main.go @@ -11,7 +11,6 @@ import ( "git.netflux.io/rob/clipper/media" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/google/uuid" "github.com/kkdai/youtube/v2" _ "github.com/lib/pq" ) @@ -50,8 +49,7 @@ func main() { } // Create a progressReader - id := uuid.MustParse(mediaSet.ID) - progressReader, err := mediaSetService.GetAudio(ctx, id, 100) + progressReader, err := mediaSetService.GetAudio(ctx, mediaSet.ID, 2_000) if err != nil { log.Fatalf("error calling fetch service: %v", err) } diff --git a/backend/media/media_set.go b/backend/media/media_set.go index 7f63b1d..0c4884b 100644 --- a/backend/media/media_set.go +++ b/backend/media/media_set.go @@ -9,6 +9,8 @@ import ( "log" "os" "time" + + "github.com/google/uuid" ) const SizeOfInt16 = 2 @@ -38,10 +40,10 @@ type Video struct { // MediaSet represents the media and metadata associated with a single media // resource (for example, a YouTube video). type MediaSet struct { - Audio Audio `json:"audio"` - Video Video `json:"video"` - ID string `json:"id"` - YoutubeID string `json:"youtube_id"` + Audio Audio `json:"audio"` + Video Video `json:"video"` + ID uuid.UUID `json:"id"` + YoutubeID string `json:"youtube_id"` exists bool `json:"exists"` } diff --git a/backend/media/service.go b/backend/media/service.go index f2a9701..370b092 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -11,6 +11,7 @@ import ( "time" "git.netflux.io/rob/clipper/generated/store" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/google/uuid" youtubev2 "github.com/kkdai/youtube/v2" @@ -56,6 +57,7 @@ type Store interface { // S3Client wraps the AWS S3 service client. type S3Client interface { + GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error) CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) @@ -141,7 +143,7 @@ func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) } return &MediaSet{ - ID: mediaSet.ID.String(), + ID: mediaSet.ID, YoutubeID: youtubeID, Audio: audioMetadata, Video: videoMetadata, @@ -159,6 +161,8 @@ func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (* } return &MediaSet{ + ID: mediaSet.ID, + YoutubeID: mediaSet.YoutubeID, Audio: Audio{ YoutubeItag: int(mediaSet.AudioYoutubeItag), Bytes: 0, // DEPRECATED @@ -176,7 +180,6 @@ func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (* ThumbnailWidth: 0, // ?? ThumbnailHeight: 0, // ?? }, - YoutubeID: mediaSet.YoutubeID, }, nil } @@ -237,6 +240,82 @@ func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins in return nil, fmt.Errorf("error getting media set: %v", err) } + if mediaSet.AudioS3UploadedAt.Valid { + return s.getAudioFromS3(ctx, mediaSet, numBins) + } + + return s.getAudioFromYoutube(ctx, mediaSet, numBins) +} + +func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { + input := s3.GetObjectInput{ + Bucket: aws.String(mediaSet.AudioS3Bucket.String), + Key: aws.String(mediaSet.AudioS3Key.String), + } + output, err := s.s3.GetObject(ctx, &input) + if err != nil { + return nil, fmt.Errorf("error getting object from s3: %v", err) + } + + fetchAudioProgressReader := newGetAudioProgressReader( + int64(mediaSet.AudioFrames.Int64), + int(mediaSet.AudioChannels), + numBins, + ) + + state := getAudioFromS3State{ + fetchAudioProgressReader: fetchAudioProgressReader, + s3Reader: output.Body, + } + go state.run(ctx) + + return &state, nil +} + +type getAudioFromS3State struct { + *fetchAudioProgressReader + + s3Reader io.ReadCloser +} + +func (s *getAudioFromS3State) run(ctx context.Context) { + done := make(chan error) + var err error + + go func() { + _, copyErr := io.Copy(s, s.s3Reader) + done <- copyErr + }() + +outer: + for { + select { + case <-ctx.Done(): + err = ctx.Err() + break outer + case err = <-done: + break outer + } + } + + if readerErr := s.s3Reader.Close(); readerErr != nil { + if err == nil { + err = readerErr + } + } + + if err != nil { + log.Printf("error closing s3Reader: %v", err) + s.Abort(err) + return + } + + if iterErr := s.Close(); iterErr != nil { + log.Printf("error closing progress iterator: %v", iterErr) + } +} + +func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) @@ -260,7 +339,7 @@ func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins in return nil, fmt.Errorf("error creating ffmpegreader: %v", err) } - s3Key := fmt.Sprintf("media_sets/%s/audio.webm", id) + s3Key := fmt.Sprintf("media_sets/%s/audio.webm", mediaSet.ID) uploader, err := newMultipartUploadWriter( ctx, s.s3, @@ -275,10 +354,10 @@ func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins in fetchAudioProgressReader := newGetAudioProgressReader( int64(mediaSet.AudioFramesApprox), format.AudioChannels, - 100, + numBins, ) - state := getAudioState{ + state := getAudioFromYoutubeState{ fetchAudioProgressReader: fetchAudioProgressReader, ffmpegReader: ffmpegReader, uploader: uploader, @@ -291,7 +370,7 @@ func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins in return &state, nil } -type getAudioState struct { +type getAudioFromYoutubeState struct { *fetchAudioProgressReader ffmpegReader io.ReadCloser @@ -300,7 +379,7 @@ type getAudioState struct { store Store } -func (s *getAudioState) run(ctx context.Context) { +func (s *getAudioFromYoutubeState) run(ctx context.Context) { mw := io.MultiWriter(s, s.uploader) done := make(chan error) var err error @@ -327,9 +406,12 @@ outer: } } + var framesUploaded int64 if err == nil { - if uploaderErr := s.uploader.Complete(); uploaderErr != nil { + if bytesUploaded, uploaderErr := s.uploader.Complete(); uploaderErr != nil { err = uploaderErr + } else { + framesUploaded = bytesUploaded / int64(s.channels) / SizeOfInt16 } } @@ -337,6 +419,7 @@ outer: _, updateErr := s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{ AudioS3Bucket: sqlString(s.s3Bucket), AudioS3Key: sqlString(s.s3Key), + AudioFrames: sqlInt64(framesUploaded), }) if updateErr != nil { @@ -356,10 +439,14 @@ outer: } if iterErr := s.Close(); iterErr != nil { - log.Printf("error closing peak iterator: %v", iterErr) + log.Printf("error closing progress iterator: %v", iterErr) } } func sqlString(s string) sql.NullString { return sql.NullString{String: s, Valid: true} } + +func sqlInt64(i int64) sql.NullInt64 { + return sql.NullInt64{Int64: i, Valid: true} +} diff --git a/backend/media/uploader.go b/backend/media/uploader.go index 897bbfd..caa5508 100644 --- a/backend/media/uploader.go +++ b/backend/media/uploader.go @@ -120,9 +120,9 @@ func (u *multipartUploadWriter) Abort(ctx context.Context) error { // Complete completes the upload process, finalizing the upload on S3. // If no parts have been successfully uploaded, then Abort() will be called // transparently. -func (u *multipartUploadWriter) Complete() error { +func (u *multipartUploadWriter) Complete() (int64, error) { if len(u.completedParts) == 0 { - return u.Abort(u.ctx) + return 0, u.Abort(u.ctx) } input := s3.CompleteMultipartUploadInput{ @@ -136,9 +136,9 @@ func (u *multipartUploadWriter) Complete() error { _, err := u.s3.CompleteMultipartUpload(u.ctx, &input) if err != nil { - return fmt.Errorf("error completing upload: %v", err) + return 0, fmt.Errorf("error completing upload: %v", err) } log.Printf("completed upload, key = %s, bytesUploaded = %d", u.key, u.bytesUploaded) - return nil + return u.bytesUploaded, nil } diff --git a/backend/server/server.go b/backend/server/server.go index 41a007e..02d0291 100644 --- a/backend/server/server.go +++ b/backend/server/server.go @@ -65,7 +65,7 @@ func (c *mediaSetServiceController) Get(ctx context.Context, request *pbMediaSet } result := pbMediaSet.MediaSet{ - Id: mediaSet.ID, + Id: mediaSet.ID.String(), YoutubeId: mediaSet.YoutubeID, AudioChannels: int32(mediaSet.Audio.Channels), AudioFrames: mediaSet.Audio.Frames, diff --git a/backend/sql/queries.sql b/backend/sql/queries.sql index 28d33e6..caf7298 100644 --- a/backend/sql/queries.sql +++ b/backend/sql/queries.sql @@ -11,5 +11,5 @@ INSERT INTO media_sets (youtube_id, audio_youtube_itag, audio_channels, audio_fr -- name: SetAudioUploaded :one UPDATE media_sets - SET audio_s3_bucket = $1, audio_s3_key = $2, audio_s3_uploaded_at = NOW(), updated_at = NOW() + SET audio_s3_bucket = $1, audio_s3_key = $2, audio_frames = $3, audio_s3_uploaded_at = NOW(), updated_at = NOW() RETURNING *;