package media import ( "context" "fmt" "io" "git.netflux.io/rob/clipper/generated/store" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/google/uuid" "go.uber.org/zap" ) type GetVideoProgress struct { PercentComplete float32 URL string } type GetVideoProgressReader interface { // Next returns the next video progress status. When the stream has finished, // a valid GetVideoProgress value will be returned with io.EOF. Next() (GetVideoProgress, error) } type videoGetter struct { s3 S3API store Store logger *zap.SugaredLogger } type videoGetterState struct { *videoGetter r io.Reader count, exp int64 mediaSetID uuid.UUID bucket, key, contentType string url string progressChan chan GetVideoProgress errorChan chan error } func newVideoGetter(s3 S3API, store Store, logger *zap.SugaredLogger) *videoGetter { return &videoGetter{s3: s3, store: store, logger: logger} } // GetVideo gets video from Youtube and uploads it to S3 using the specified // bucket, key and content type. The returned reader must have its Next() // method called until error = io.EOF, otherwise a deadlock or other resource // leakage is likely. func (g *videoGetter) GetVideo(ctx context.Context, r io.Reader, exp int64, mediaSetID uuid.UUID, bucket, key, contentType string) (GetVideoProgressReader, error) { s := &videoGetterState{ videoGetter: g, r: newProgressReader(r, "video", exp, g.logger), exp: exp, mediaSetID: mediaSetID, bucket: bucket, key: key, contentType: contentType, progressChan: make(chan GetVideoProgress), errorChan: make(chan error, 1), } go s.getVideo(ctx) // return s, exposing only the limited interface to the caller. return s, nil } // Write implements io.Writer. It is copied that same data that is written to // S3, to implement progress tracking. func (s *videoGetterState) Write(p []byte) (int, error) { s.count += int64(len(p)) pc := (float32(s.count) / float32(s.exp)) * 100 s.progressChan <- GetVideoProgress{PercentComplete: pc} return len(p), nil } func (s *videoGetterState) getVideo(ctx context.Context) { uploader := newMultipartUploader(s.s3, s.logger) teeReader := io.TeeReader(s.r, s) _, err := uploader.Upload(ctx, teeReader, s.bucket, s.key, s.contentType) if err != nil { s.errorChan <- fmt.Errorf("error uploading to S3: %v", err) return } input := s3.GetObjectInput{ Bucket: aws.String(s.bucket), Key: aws.String(s.key), } request, err := s.s3.PresignGetObject(ctx, &input, s3.WithPresignExpires(getVideoExpiresIn)) if err != nil { s.errorChan <- fmt.Errorf("error generating presigned URL: %v", err) } s.url = request.URL storeParams := store.SetVideoUploadedParams{ ID: s.mediaSetID, VideoS3Bucket: sqlString(s.bucket), VideoS3Key: sqlString(s.key), } _, err = s.store.SetVideoUploaded(ctx, storeParams) if err != nil { s.errorChan <- fmt.Errorf("error saving to store: %v", err) } close(s.progressChan) } // Next implements GetVideoProgressReader. func (s *videoGetterState) Next() (GetVideoProgress, error) { for { select { case progress, ok := <-s.progressChan: if !ok { return GetVideoProgress{PercentComplete: 100, URL: s.url}, io.EOF } return progress, nil case err := <-s.errorChan: return GetVideoProgress{}, fmt.Errorf("error waiting for progress: %v", err) } } } type videoGetterDownloaded string // Next() implements GetVideoProgressReader. func (s *videoGetterDownloaded) Next() (GetVideoProgress, error) { return GetVideoProgress{ PercentComplete: 100, URL: string(*s), }, io.EOF }