diff --git a/backend/media/get_audio.go b/backend/media/get_audio.go index e96c2a3..d26ea06 100644 --- a/backend/media/get_audio.go +++ b/backend/media/get_audio.go @@ -13,17 +13,20 @@ import ( "git.netflux.io/rob/clipper/config" "git.netflux.io/rob/clipper/generated/store" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" "go.uber.org/zap" ) type GetAudioProgress struct { PercentComplete float32 Peaks []int16 + URL string } type GetAudioProgressReader interface { Next() (GetAudioProgress, error) - Close() error + Close(string) error } // audioGetter manages getting and processing audio from Youtube. @@ -106,6 +109,7 @@ func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaS return } + var presignedAudioURL string var wg sync.WaitGroup wg.Add(2) @@ -123,6 +127,16 @@ func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaS return } + input := s3.GetObjectInput{ + Bucket: aws.String(s.config.S3Bucket), + Key: aws.String(s3Key), + } + request, err := s.s3API.PresignGetObject(ctx, &input, s3.WithPresignExpires(getAudioExpiresIn)) + if err != nil { + s.CloseWithError(fmt.Errorf("error generating presigned URL: %v", err)) + } + presignedAudioURL = request.URL + if _, err = s.store.SetEncodedAudioUploaded(ctx, store.SetEncodedAudioUploadedParams{ ID: mediaSet.ID, AudioEncodedS3Bucket: sqlString(s.config.S3Bucket), @@ -172,8 +186,8 @@ func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaS wg.Wait() // Finally, close the progress reader so that the subsequent call to Next() - // returns io.EOF. - s.Close() + // returns the presigned URL and io.EOF. + s.Close(presignedAudioURL) } // getAudioProgressReader accepts a byte stream containing little endian @@ -188,6 +202,7 @@ type getAudioProgressReader struct { currPeaks []int16 currCount int framesProcessed int64 + url string progress chan GetAudioProgress errorChan chan error } @@ -212,7 +227,9 @@ func (w *getAudioProgressReader) CloseWithError(err error) { w.errorChan <- err } -func (w *getAudioProgressReader) Close() error { +// Close cloes the reader and returns the provided URL to the calling code. +func (w *getAudioProgressReader) Close(url string) error { + w.url = url close(w.progress) return nil } @@ -222,7 +239,7 @@ func (w *getAudioProgressReader) Next() (GetAudioProgress, error) { select { case progress, ok := <-w.progress: if !ok { - return GetAudioProgress{Peaks: w.currPeaks, PercentComplete: w.percentComplete()}, io.EOF + return GetAudioProgress{Peaks: w.currPeaks, PercentComplete: w.percentComplete(), URL: w.url}, io.EOF } return progress, nil case err := <-w.errorChan: diff --git a/backend/media/service.go b/backend/media/service.go index 0883a3a..060df60 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -24,7 +24,8 @@ import ( ) const ( - getVideoExpiresIn = time.Hour * 1 + getVideoExpiresIn = time.Hour + getAudioExpiresIn = time.Hour ) const ( @@ -327,7 +328,10 @@ func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins in return nil, fmt.Errorf("error getting media set: %v", err) } - if mediaSet.AudioRawS3UploadedAt.Valid { + // We need both raw and encoded audio to have been uploaded successfully. + // Otherwise, we cannot return both peaks and a presigned URL for use by the + // player. + if mediaSet.AudioRawS3UploadedAt.Valid && mediaSet.AudioEncodedS3UploadedAt.Valid { return s.getAudioFromS3(ctx, mediaSet, numBins) } @@ -361,9 +365,11 @@ func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.Med state := getAudioFromS3State{ getAudioProgressReader: getAudioProgressReader, s3Reader: NewModuloBufReader(output.Body, int(mediaSet.AudioChannels)*SizeOfInt16), + s3API: s.s3, + config: s.config, logger: s.logger, } - go state.run(ctx) + go state.run(ctx, mediaSet) return &state, nil } @@ -372,10 +378,12 @@ type getAudioFromS3State struct { *getAudioProgressReader s3Reader io.ReadCloser + s3API S3API + config config.Config logger *zap.SugaredLogger } -func (s *getAudioFromS3State) run(ctx context.Context) { +func (s *getAudioFromS3State) run(ctx context.Context, mediaSet store.MediaSet) { done := make(chan error) var err error @@ -407,7 +415,16 @@ outer: return } - if iterErr := s.Close(); iterErr != nil { + input := s3.GetObjectInput{ + Bucket: aws.String(s.config.S3Bucket), + Key: aws.String(mediaSet.AudioEncodedS3Key.String), + } + request, err := s.s3API.PresignGetObject(ctx, &input, s3.WithPresignExpires(getAudioExpiresIn)) + if err != nil { + s.CloseWithError(fmt.Errorf("error generating presigned URL: %v", err)) + } + + if iterErr := s.Close(request.URL); iterErr != nil { s.logger.Errorf("getAudioFromS3State: error closing progress iterator: %v", iterErr) } } diff --git a/backend/server/server.go b/backend/server/server.go index 325e15e..59eebfd 100644 --- a/backend/server/server.go +++ b/backend/server/server.go @@ -129,6 +129,7 @@ func (c *mediaSetServiceController) GetAudio(request *pbmediaset.GetAudioRequest progressPb := pbmediaset.GetAudioProgress{ PercentComplete: progress.PercentComplete, + Url: progress.URL, Peaks: peaks, } stream.Send(&progressPb) diff --git a/frontend/src/Overview.tsx b/frontend/src/Overview.tsx index fcca002..376c791 100644 --- a/frontend/src/Overview.tsx +++ b/frontend/src/Overview.tsx @@ -1,5 +1,9 @@ import { useState, useEffect, useRef, MouseEvent } from 'react'; -import { MediaSetServiceClientImpl, MediaSet } from './generated/media_set'; +import { + MediaSetServiceClientImpl, + MediaSet, + GetAudioProgress, +} from './generated/media_set'; import { Frames, newRPC, VideoPosition } from './App'; import { WaveformCanvas } from './WaveformCanvas'; import { from, Observable } from 'rxjs'; @@ -116,6 +120,15 @@ export const Overview: React.FC = ({ }); const peaks = audioProgressStream.pipe(map((progress) => progress.peaks)); setPeaks(peaks); + + let url = ''; + // TODO: probably a nicer way to do this. + await audioProgressStream.forEach((progress: GetAudioProgress) => { + if (progress.url != '') { + url = progress.url; + } + }); + console.log('got audio URL', url); })(); }, [mediaSet]); diff --git a/proto/media_set.proto b/proto/media_set.proto index cf97cf4..a7754d9 100644 --- a/proto/media_set.proto +++ b/proto/media_set.proto @@ -24,6 +24,7 @@ message MediaSet { message GetAudioProgress { repeated int32 peaks = 1; float percent_complete = 2; + string url = 3; } message GetRequest {