package media import ( "context" "fmt" "io" "git.netflux.io/rob/clipper/generated/store" "github.com/google/uuid" "go.uber.org/zap" ) type GetVideoProgress struct { PercentComplete float32 URL string } type GetVideoProgressReader interface { // Next returns the next video progress status. When the stream has finished, // a valid GetVideoProgress value will be returned with io.EOF. Next() (GetVideoProgress, error) } type videoGetter struct { store Store fileStore FileStore logger *zap.SugaredLogger } type videoGetterState struct { *videoGetter r io.ReadCloser count, exp int64 mediaSetID uuid.UUID key, contentType string url string progressChan chan GetVideoProgress errorChan chan error } func newVideoGetter(store Store, fileStore FileStore, logger *zap.SugaredLogger) *videoGetter { return &videoGetter{store: store, fileStore: fileStore, logger: logger} } // GetVideo gets video from Youtube and uploads it to a filestore using the // specified key and content type. The returned reader must have its Next() // method called until err == io.EOF, otherwise a deadlock or other resource // leakage is likely. // // GetVideo will consume and close r. func (g *videoGetter) GetVideo(ctx context.Context, r io.ReadCloser, exp int64, mediaSetID uuid.UUID, key, contentType string) (GetVideoProgressReader, error) { s := &videoGetterState{ videoGetter: g, r: r, exp: exp, mediaSetID: mediaSetID, key: key, contentType: contentType, progressChan: make(chan GetVideoProgress), errorChan: make(chan error, 1), } go s.getVideo(ctx) // return s, exposing only the limited interface to the caller. return s, nil } // Write implements io.Writer. It is copied that same data that is written to // the file store, to implement progress tracking. func (s *videoGetterState) Write(p []byte) (int, error) { s.count += int64(len(p)) pc := (float32(s.count) / float32(s.exp)) * 100 s.progressChan <- GetVideoProgress{PercentComplete: pc} return len(p), nil } func (s *videoGetterState) getVideo(ctx context.Context) { logReader := newLogProgressReader(s.r, "video", s.exp, s.logger) teeReader := io.TeeReader(logReader, s) _, err := s.fileStore.PutObject(ctx, s.key, teeReader, s.contentType) if err != nil { s.errorChan <- fmt.Errorf("error uploading to file store: %v", err) return } if err = s.r.Close(); err != nil { s.errorChan <- fmt.Errorf("error closing video stream: %v", err) return } s.url, err = s.fileStore.GetURL(ctx, s.key) if err != nil { s.errorChan <- fmt.Errorf("error getting object URL: %v", err) return } storeParams := store.SetVideoUploadedParams{ ID: s.mediaSetID, VideoS3Key: sqlString(s.key), } _, err = s.store.SetVideoUploaded(ctx, storeParams) if err != nil { s.errorChan <- fmt.Errorf("error saving to store: %v", err) return } close(s.progressChan) } // Next implements GetVideoProgressReader. func (s *videoGetterState) Next() (GetVideoProgress, error) { for { select { case progress, ok := <-s.progressChan: if !ok { return GetVideoProgress{PercentComplete: 100, URL: s.url}, io.EOF } return progress, nil case err := <-s.errorChan: return GetVideoProgress{}, fmt.Errorf("error waiting for progress: %v", err) } } } type videoGetterFromFileStore string // Next() implements GetVideoProgressReader. func (s *videoGetterFromFileStore) Next() (GetVideoProgress, error) { return GetVideoProgress{ PercentComplete: 100, URL: string(*s), }, io.EOF }