clipper/backend/media/get_video.go

137 lines
3.5 KiB
Go

package media
import (
"context"
"fmt"
"io"
"git.netflux.io/rob/clipper/generated/store"
"github.com/google/uuid"
"go.uber.org/zap"
)
type GetVideoProgress struct {
PercentComplete float32
URL string
}
type GetVideoProgressReader interface {
// Next returns the next video progress status. When the stream has finished,
// a valid GetVideoProgress value will be returned with io.EOF.
Next() (GetVideoProgress, error)
}
type videoGetter struct {
store Store
fileStore FileStore
logger *zap.SugaredLogger
}
type videoGetterState struct {
*videoGetter
r io.ReadCloser
count, exp int64
mediaSetID uuid.UUID
key, contentType string
url string
progressChan chan GetVideoProgress
errorChan chan error
}
func newVideoGetter(store Store, fileStore FileStore, logger *zap.SugaredLogger) *videoGetter {
return &videoGetter{store: store, fileStore: fileStore, logger: logger}
}
// GetVideo gets video from Youtube and uploads it to a filestore using the
// specified key and content type. The returned reader must have its Next()
// method called until err == io.EOF, otherwise a deadlock or other resource
// leakage is likely.
//
// GetVideo will consume and close r.
func (g *videoGetter) GetVideo(ctx context.Context, r io.ReadCloser, exp int64, mediaSetID uuid.UUID, key, contentType string) (GetVideoProgressReader, error) {
s := &videoGetterState{
videoGetter: g,
r: r,
exp: exp,
mediaSetID: mediaSetID,
key: key,
contentType: contentType,
progressChan: make(chan GetVideoProgress),
errorChan: make(chan error, 1),
}
go s.getVideo(ctx)
// return s, exposing only the limited interface to the caller.
return s, nil
}
// Write implements io.Writer. It is copied that same data that is written to
// the file store, to implement progress tracking.
func (s *videoGetterState) Write(p []byte) (int, error) {
s.count += int64(len(p))
pc := (float32(s.count) / float32(s.exp)) * 100
s.progressChan <- GetVideoProgress{PercentComplete: pc}
return len(p), nil
}
func (s *videoGetterState) getVideo(ctx context.Context) {
logReader := newLogProgressReader(s.r, "video", s.exp, s.logger)
teeReader := io.TeeReader(logReader, s)
_, err := s.fileStore.PutObject(ctx, s.key, teeReader, s.contentType)
if err != nil {
s.errorChan <- fmt.Errorf("error uploading to file store: %v", err)
return
}
if err = s.r.Close(); err != nil {
s.errorChan <- fmt.Errorf("error closing video stream: %v", err)
return
}
s.url, err = s.fileStore.GetURL(ctx, s.key)
if err != nil {
s.errorChan <- fmt.Errorf("error getting object URL: %v", err)
return
}
storeParams := store.SetVideoUploadedParams{
ID: s.mediaSetID,
VideoS3Key: sqlString(s.key),
}
_, err = s.store.SetVideoUploaded(ctx, storeParams)
if err != nil {
s.errorChan <- fmt.Errorf("error saving to store: %v", err)
return
}
close(s.progressChan)
}
// Next implements GetVideoProgressReader.
func (s *videoGetterState) Next() (GetVideoProgress, error) {
for {
select {
case progress, ok := <-s.progressChan:
if !ok {
return GetVideoProgress{PercentComplete: 100, URL: s.url}, io.EOF
}
return progress, nil
case err := <-s.errorChan:
return GetVideoProgress{}, fmt.Errorf("error waiting for progress: %v", err)
}
}
}
type videoGetterFromFileStore string
// Next() implements GetVideoProgressReader.
func (s *videoGetterFromFileStore) Next() (GetVideoProgress, error) {
return GetVideoProgress{
PercentComplete: 100,
URL: string(*s),
}, io.EOF
}