Get video from Youtube, send progress via gRPC

This commit is contained in:
Rob Watson 2021-11-20 19:29:34 +01:00
parent b864835f40
commit 4afec11074
11 changed files with 340 additions and 59 deletions

1
.gitignore vendored
View File

@ -1,5 +1,4 @@
/backend/.env /backend/.env
/backend/cache/
/backend/debug/ /backend/debug/
# generated files: # generated files:

View File

@ -8,6 +8,7 @@ import (
"time" "time"
"git.netflux.io/rob/clipper/generated/store" "git.netflux.io/rob/clipper/generated/store"
"git.netflux.io/rob/clipper/media"
"git.netflux.io/rob/clipper/server" "git.netflux.io/rob/clipper/server"
"github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3"
@ -42,6 +43,9 @@ func main() {
} }
s3Client := s3.NewFromConfig(cfg) s3Client := s3.NewFromConfig(cfg)
// Create an Amazon S3 presign client
s3PresignClient := s3.NewPresignClient(s3Client)
// Create a Youtube client // Create a Youtube client
var youtubeClient youtube.Client var youtubeClient youtube.Client
@ -51,7 +55,10 @@ func main() {
Timeout: DefaultTimeout, Timeout: DefaultTimeout,
Store: store, Store: store,
YoutubeClient: &youtubeClient, YoutubeClient: &youtubeClient,
S3API: media.S3API{
S3Client: s3Client, S3Client: s3Client,
S3PresignClient: s3PresignClient,
},
} }
log.Fatal(server.Start(serverOptions)) log.Fatal(server.Start(serverOptions))

View File

@ -43,7 +43,7 @@ func main() {
var youtubeClient youtube.Client var youtubeClient youtube.Client
// Create a MediaSetService // Create a MediaSetService
mediaSetService := media.NewMediaSetService(store, &youtubeClient, s3Client, zap.NewNop()) mediaSetService := media.NewMediaSetService(store, &youtubeClient, media.S3API{S3Client: s3Client}, zap.NewNop())
mediaSet, err := mediaSetService.Get(ctx, videoID) mediaSet, err := mediaSetService.Get(ctx, videoID)
if err != nil { if err != nil {

View File

@ -34,8 +34,11 @@ type getAudioProgressReader struct {
errorChan chan error errorChan chan error
} }
// TODO: validate inputs, debugging is confusing otherwise func newGetAudioProgressReader(framesExpected int64, channels, numBins int) (*getAudioProgressReader, error) {
func newGetAudioProgressReader(framesExpected int64, channels, numBins int) *getAudioProgressReader { if framesExpected <= 0 || channels <= 0 || numBins <= 0 {
return nil, fmt.Errorf("error creating audio progress reader (framesExpected = %d, channels = %d, numBins = %d)", framesExpected, channels, numBins)
}
return &getAudioProgressReader{ return &getAudioProgressReader{
channels: channels, channels: channels,
framesExpected: framesExpected, framesExpected: framesExpected,
@ -44,7 +47,7 @@ func newGetAudioProgressReader(framesExpected int64, channels, numBins int) *get
currPeaks: make([]int16, channels), currPeaks: make([]int16, channels),
progress: make(chan GetAudioProgress), progress: make(chan GetAudioProgress),
errorChan: make(chan error, 1), errorChan: make(chan error, 1),
} }, nil
} }
func (w *getAudioProgressReader) Abort(err error) { func (w *getAudioProgressReader) Abort(err error) {
@ -61,7 +64,7 @@ func (w *getAudioProgressReader) Read() (GetAudioProgress, error) {
select { select {
case progress, ok := <-w.progress: case progress, ok := <-w.progress:
if !ok { if !ok {
return GetAudioProgress{}, io.EOF return GetAudioProgress{Peaks: w.currPeaks, PercentComplete: w.percentComplete()}, io.EOF
} }
return progress, nil return progress, nil
case err := <-w.errorChan: case err := <-w.errorChan:
@ -104,11 +107,14 @@ func (w *getAudioProgressReader) Write(p []byte) (int, error) {
return len(p), nil return len(p), nil
} }
func (w *getAudioProgressReader) percentComplete() float32 {
return (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0
}
func (w *getAudioProgressReader) nextBin() { func (w *getAudioProgressReader) nextBin() {
var progress GetAudioProgress var progress GetAudioProgress
// TODO: avoid an allocation?
progress.Peaks = append(progress.Peaks, w.currPeaks...) progress.Peaks = append(progress.Peaks, w.currPeaks...)
progress.PercentComplete = (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0 progress.PercentComplete = w.percentComplete()
w.progress <- progress w.progress <- progress
@ -116,5 +122,4 @@ func (w *getAudioProgressReader) nextBin() {
for i := 0; i < len(w.currPeaks); i++ { for i := 0; i < len(w.currPeaks); i++ {
w.currPeaks[i] = 0 w.currPeaks[i] = 0
} }
w.framesProcessed++
} }

View File

@ -14,13 +14,18 @@ import (
"git.netflux.io/rob/clipper/generated/store" "git.netflux.io/rob/clipper/generated/store"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
signerv4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid" "github.com/google/uuid"
youtubev2 "github.com/kkdai/youtube/v2" youtubev2 "github.com/kkdai/youtube/v2"
"go.uber.org/zap" "go.uber.org/zap"
) )
const s3Bucket = "clipper-development" const (
s3Bucket = "clipper-development"
getVideoExpiresIn = time.Hour * 1
)
const ( const (
rawAudioCodec = "pcm_s16le" rawAudioCodec = "pcm_s16le"
@ -38,12 +43,12 @@ const (
type progressReader struct { type progressReader struct {
io.Reader io.Reader
label string label string
total, exp int total, exp int64
} }
func (pw *progressReader) Read(p []byte) (int, error) { func (pw *progressReader) Read(p []byte) (int, error) {
n, err := pw.Reader.Read(p) n, err := pw.Reader.Read(p)
pw.total += n pw.total += int64(n)
log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float32(pw.total)/float32(pw.exp))*100.0) log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float32(pw.total)/float32(pw.exp))*100.0)
@ -56,6 +61,13 @@ type Store interface {
GetMediaSetByYoutubeID(ctx context.Context, youtubeID string) (store.MediaSet, error) GetMediaSetByYoutubeID(ctx context.Context, youtubeID string) (store.MediaSet, error)
CreateMediaSet(ctx context.Context, arg store.CreateMediaSetParams) (store.MediaSet, error) CreateMediaSet(ctx context.Context, arg store.CreateMediaSetParams) (store.MediaSet, error)
SetAudioUploaded(ctx context.Context, arg store.SetAudioUploadedParams) (store.MediaSet, error) SetAudioUploaded(ctx context.Context, arg store.SetAudioUploadedParams) (store.MediaSet, error)
SetVideoUploaded(ctx context.Context, arg store.SetVideoUploadedParams) (store.MediaSet, error)
}
// S3API provides an API to AWS S3.
type S3API struct {
S3Client
S3PresignClient
} }
// S3Client wraps the AWS S3 service client. // S3Client wraps the AWS S3 service client.
@ -63,10 +75,15 @@ type S3Client interface {
GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error) GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error)
CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
} }
// S3PresignClient wraps the AWS S3 Presign client.
type S3PresignClient interface {
PresignGetObject(context.Context, *s3.GetObjectInput, ...func(*s3.PresignOptions)) (*signerv4.PresignedHTTPRequest, error)
}
// YoutubeClient wraps the youtube.Client client. // YoutubeClient wraps the youtube.Client client.
type YoutubeClient interface { type YoutubeClient interface {
GetVideoContext(context.Context, string) (*youtubev2.Video, error) GetVideoContext(context.Context, string) (*youtubev2.Video, error)
@ -77,20 +94,21 @@ type YoutubeClient interface {
type MediaSetService struct { type MediaSetService struct {
store Store store Store
youtube YoutubeClient youtube YoutubeClient
s3 S3Client s3 S3API
logger *zap.SugaredLogger logger *zap.SugaredLogger
} }
func NewMediaSetService(store Store, youtubeClient YoutubeClient, s3Client S3Client, logger *zap.Logger) *MediaSetService { func NewMediaSetService(store Store, youtubeClient YoutubeClient, s3API S3API, logger *zap.Logger) *MediaSetService {
return &MediaSetService{ return &MediaSetService{
store: store, store: store,
youtube: youtubeClient, youtube: youtubeClient,
s3: s3Client, s3: s3API,
logger: logger.Sugar(), logger: logger.Sugar(),
} }
} }
// Get fetches the metadata for a given MediaSet source. // Get fetches the metadata for a given MediaSet source. If it does not exist
// in the local DB, it will attempt to create it.
func (s *MediaSetService) Get(ctx context.Context, youtubeID string) (*MediaSet, error) { func (s *MediaSetService) Get(ctx context.Context, youtubeID string) (*MediaSet, error) {
var ( var (
mediaSet *MediaSet mediaSet *MediaSet
@ -131,7 +149,7 @@ func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string)
return nil, fmt.Errorf("error fetching video metadata: %v", err) return nil, fmt.Errorf("error fetching video metadata: %v", err)
} }
params := store.CreateMediaSetParams{ storeParams := store.CreateMediaSetParams{
YoutubeID: youtubeID, YoutubeID: youtubeID,
AudioYoutubeItag: int32(audioMetadata.YoutubeItag), AudioYoutubeItag: int32(audioMetadata.YoutubeItag),
AudioChannels: int32(audioMetadata.Channels), AudioChannels: int32(audioMetadata.Channels),
@ -142,7 +160,7 @@ func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string)
VideoMimeType: videoMetadata.MimeType, VideoMimeType: videoMetadata.MimeType,
VideoDurationNanos: videoMetadata.Duration.Nanoseconds(), VideoDurationNanos: videoMetadata.Duration.Nanoseconds(),
} }
mediaSet, err := s.store.CreateMediaSet(ctx, params) mediaSet, err := s.store.CreateMediaSet(ctx, storeParams)
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating media set in store: %v", err) return nil, fmt.Errorf("error creating media set in store: %v", err)
} }
@ -238,6 +256,59 @@ func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtube
}, nil }, nil
} }
// GetVideo fetches the video part of a MediaSet.
func (s *MediaSetService) GetVideo(ctx context.Context, id uuid.UUID) (GetVideoProgressReader, error) {
mediaSet, err := s.store.GetMediaSet(ctx, id)
if err != nil {
return nil, fmt.Errorf("error getting media set: %v", err)
}
// TODO: use mediaSet func to fetch s3Key
s3Key := fmt.Sprintf("media_sets/%s/video.mp4", mediaSet.ID)
if mediaSet.VideoS3UploadedAt.Valid {
input := s3.GetObjectInput{Bucket: aws.String(s3Bucket), Key: aws.String(s3Key)}
request, signErr := s.s3.PresignGetObject(ctx, &input, s3.WithPresignExpires(getVideoExpiresIn))
if signErr != nil {
return nil, fmt.Errorf("error generating presigned URL: %v", signErr)
}
videoGetter := videoGetterDownloaded(request.URL)
return &videoGetter, nil
}
video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
format := video.Formats.FindByItag(int(mediaSet.VideoYoutubeItag))
if format == nil {
return nil, fmt.Errorf("error finding itag: %v", err)
}
stream, _, err := s.youtube.GetStreamContext(ctx, video, format)
if err != nil {
return nil, fmt.Errorf("error fetching stream: %v", err)
}
videoGetter := newVideoGetter(s.s3, s.store, s.logger)
return videoGetter.GetVideo(
ctx,
stream,
format.ContentLength,
mediaSet.ID,
s3Bucket,
s3Key,
format.MimeType,
)
}
type GetVideoProgressReader interface {
// Next returns the next video progress status. When the stream has finished,
// a valid GetVideoProgress value will be returned with io.EOF.
Next() (GetVideoProgress, error)
}
// GetAudio fetches the audio part of a MediaSet. // GetAudio fetches the audio part of a MediaSet.
func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins int) (GetAudioProgressReader, error) { func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins int) (GetAudioProgressReader, error) {
mediaSet, err := s.store.GetMediaSet(ctx, id) mediaSet, err := s.store.GetMediaSet(ctx, id)
@ -262,11 +333,14 @@ func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.Med
return nil, fmt.Errorf("error getting object from s3: %v", err) return nil, fmt.Errorf("error getting object from s3: %v", err)
} }
getAudioProgressReader := newGetAudioProgressReader( getAudioProgressReader, err := newGetAudioProgressReader(
int64(mediaSet.AudioFrames.Int64), int64(mediaSet.AudioFrames.Int64),
int(mediaSet.AudioChannels), int(mediaSet.AudioChannels),
numBins, numBins,
) )
if err != nil {
return nil, fmt.Errorf("error creating audio reader: %v", err)
}
state := getAudioFromS3State{ state := getAudioFromS3State{
getAudioProgressReader: getAudioProgressReader, getAudioProgressReader: getAudioProgressReader,
@ -336,22 +410,25 @@ func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet stor
return nil, fmt.Errorf("error fetching stream: %v", err) return nil, fmt.Errorf("error fetching stream: %v", err)
} }
// wrap it in a progress reader streamWithProgress := &progressReader{Reader: stream, label: "audio", exp: format.ContentLength}
progressStream := &progressReader{Reader: stream, label: "audio", exp: int(format.ContentLength)}
ffmpegReader, err := newFfmpegReader(ctx, progressStream, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") ffmpegReader, err := newFfmpegReader(ctx, streamWithProgress, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-")
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating ffmpegreader: %v", err) return nil, fmt.Errorf("error creating ffmpegreader: %v", err)
} }
// TODO: use mediaSet func to fetch s3Key
s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID)
uploader := newMultipartUploader(s.s3) uploader := newMultipartUploader(s.s3)
getAudioProgressReader := newGetAudioProgressReader( getAudioProgressReader, err := newGetAudioProgressReader(
int64(mediaSet.AudioFramesApprox), int64(mediaSet.AudioFramesApprox),
format.AudioChannels, format.AudioChannels,
numBins, numBins,
) )
if err != nil {
return nil, fmt.Errorf("error creating audio reader: %v", err)
}
state := getAudioFromYoutubeState{ state := getAudioFromYoutubeState{
getAudioProgressReader: getAudioProgressReader, getAudioProgressReader: getAudioProgressReader,

View File

@ -38,6 +38,8 @@ func newMultipartUploader(s3Client S3Client) *multipartUploader {
// Upload uploads to an S3 bucket in 5MB parts. It buffers data internally // Upload uploads to an S3 bucket in 5MB parts. It buffers data internally
// until a part is ready to send over the network. Parts are sent as soon as // until a part is ready to send over the network. Parts are sent as soon as
// they exceed the minimum part size of 5MB. // they exceed the minimum part size of 5MB.
//
// TODO: expire after configurable period.
func (u *multipartUploader) Upload(ctx context.Context, r io.Reader, bucket, key, contentType string) (int64, error) { func (u *multipartUploader) Upload(ctx context.Context, r io.Reader, bucket, key, contentType string) (int64, error) {
var uploaded bool var uploaded bool

View File

@ -0,0 +1,129 @@
package media
import (
"context"
"fmt"
"io"
"git.netflux.io/rob/clipper/generated/store"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid"
"go.uber.org/zap"
)
type GetVideoProgress struct {
PercentComplete float32
URL string
}
type videoGetter struct {
s3 S3API
store Store
logger *zap.SugaredLogger
}
type videoGetterState struct {
*videoGetter
r io.Reader
count, exp int64
mediaSetID uuid.UUID
bucket, key, contentType string
url string
progressChan chan GetVideoProgress
errorChan chan error
}
func newVideoGetter(s3 S3API, store Store, logger *zap.SugaredLogger) *videoGetter {
return &videoGetter{s3: s3, store: store, logger: logger}
}
// GetVideo gets video from Youtube and uploads it to S3 using the specified
// bucket, key and content type. The returned reader must have its Next()
// method called until error = io.EOF, otherwise a deadlock or other resource
// leakage is likely.
func (g *videoGetter) GetVideo(ctx context.Context, r io.Reader, exp int64, mediaSetID uuid.UUID, bucket, key, contentType string) (GetVideoProgressReader, error) {
s := &videoGetterState{
videoGetter: g,
r: &progressReader{Reader: r, label: "video", exp: exp},
exp: exp,
mediaSetID: mediaSetID,
bucket: bucket,
key: key,
contentType: contentType,
progressChan: make(chan GetVideoProgress),
errorChan: make(chan error, 1),
}
go s.getVideo(ctx)
// return s, exposing only the limited interface to the caller.
return s, nil
}
// Write implements io.Writer.
func (s *videoGetterState) Write(p []byte) (int, error) {
s.count += int64(len(p))
pc := (float32(s.count) / float32(s.exp)) * 100
s.progressChan <- GetVideoProgress{PercentComplete: pc}
return len(p), nil
}
func (s *videoGetterState) getVideo(ctx context.Context) {
uploader := newMultipartUploader(s.s3)
teeReader := io.TeeReader(s.r, s)
_, err := uploader.Upload(ctx, teeReader, s.bucket, s.key, s.contentType)
if err != nil {
s.errorChan <- fmt.Errorf("error uploading to S3: %v", err)
return
}
input := s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(s.key),
}
request, err := s.s3.PresignGetObject(ctx, &input, s3.WithPresignExpires(getVideoExpiresIn))
if err != nil {
s.errorChan <- fmt.Errorf("error generating presigned URL: %v", err)
}
s.url = request.URL
storeParams := store.SetVideoUploadedParams{
ID: s.mediaSetID,
VideoS3Bucket: sqlString(s.bucket),
VideoS3Key: sqlString(s.key),
}
_, err = s.store.SetVideoUploaded(ctx, storeParams)
if err != nil {
s.errorChan <- fmt.Errorf("error saving to store: %v", err)
}
close(s.progressChan)
}
// Next implements GetVideoProgressReader.
func (s *videoGetterState) Next() (GetVideoProgress, error) {
for {
select {
case progress, ok := <-s.progressChan:
if !ok {
return GetVideoProgress{PercentComplete: 100, URL: s.url}, io.EOF
}
return progress, nil
case err := <-s.errorChan:
return GetVideoProgress{}, fmt.Errorf("error waiting for progress: %v", err)
}
}
}
type videoGetterDownloaded string
// Next() implements GetVideoProgressReader.
func (s *videoGetterDownloaded) Next() (GetVideoProgress, error) {
return GetVideoProgress{
PercentComplete: 100,
URL: string(*s),
}, io.EOF
}

View File

@ -7,7 +7,7 @@ import (
"net/http" "net/http"
"time" "time"
pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set" pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set"
"git.netflux.io/rob/clipper/media" "git.netflux.io/rob/clipper/media"
"github.com/google/uuid" "github.com/google/uuid"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware" grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
@ -34,6 +34,7 @@ const (
const ( const (
getAudioTimeout = time.Minute * 5 getAudioTimeout = time.Minute * 5
getAudioSegmentTimeout = time.Second * 10 getAudioSegmentTimeout = time.Second * 10
getVideoTimeout = time.Minute * 5
) )
type ResponseError struct { type ResponseError struct {
@ -70,24 +71,25 @@ type Options struct {
Timeout time.Duration Timeout time.Duration
Store media.Store Store media.Store
YoutubeClient media.YoutubeClient YoutubeClient media.YoutubeClient
S3Client media.S3Client S3API media.S3API
} }
// mediaSetServiceController implements gRPC controller for MediaSetService // mediaSetServiceController implements gRPC controller for MediaSetService
type mediaSetServiceController struct { type mediaSetServiceController struct {
pbMediaSet.UnimplementedMediaSetServiceServer pbmediaset.UnimplementedMediaSetServiceServer
mediaSetService *media.MediaSetService mediaSetService *media.MediaSetService
logger *zap.SugaredLogger
} }
// Get returns a pbMediaSet.MediaSet // Get returns a pbMediaSet.MediaSet
func (c *mediaSetServiceController) Get(ctx context.Context, request *pbMediaSet.GetRequest) (*pbMediaSet.MediaSet, error) { func (c *mediaSetServiceController) Get(ctx context.Context, request *pbmediaset.GetRequest) (*pbmediaset.MediaSet, error) {
mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId()) mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId())
if err != nil { if err != nil {
return nil, newResponseError(err) return nil, newResponseError(err)
} }
result := pbMediaSet.MediaSet{ result := pbmediaset.MediaSet{
Id: mediaSet.ID.String(), Id: mediaSet.ID.String(),
YoutubeId: mediaSet.YoutubeID, YoutubeId: mediaSet.YoutubeID,
AudioChannels: int32(mediaSet.Audio.Channels), AudioChannels: int32(mediaSet.Audio.Channels),
@ -106,7 +108,7 @@ func (c *mediaSetServiceController) Get(ctx context.Context, request *pbMediaSet
// GetAudio returns a stream of GetAudioProgress relating to the entire audio // GetAudio returns a stream of GetAudioProgress relating to the entire audio
// part of the MediaSet. // part of the MediaSet.
func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest, stream pbMediaSet.MediaSetService_GetAudioServer) error { func (c *mediaSetServiceController) GetAudio(request *pbmediaset.GetAudioRequest, stream pbmediaset.MediaSetService_GetAudioServer) error {
// TODO: reduce timeout when fetching from S3 // TODO: reduce timeout when fetching from S3
ctx, cancel := context.WithTimeout(context.Background(), getAudioTimeout) ctx, cancel := context.WithTimeout(context.Background(), getAudioTimeout)
defer cancel() defer cancel()
@ -123,10 +125,7 @@ func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest
for { for {
progress, err := reader.Read() progress, err := reader.Read()
if err != nil { if err != nil && err != io.EOF {
if err == io.EOF {
break
}
return newResponseError(err) return newResponseError(err)
} }
@ -135,12 +134,15 @@ func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest
peaks[i] = int32(p) peaks[i] = int32(p)
} }
progressPb := pbMediaSet.GetAudioProgress{ progressPb := pbmediaset.GetAudioProgress{
PercentCompleted: progress.PercentComplete, PercentComplete: progress.PercentComplete,
Peaks: peaks, Peaks: peaks,
} }
stream.Send(&progressPb) stream.Send(&progressPb)
if err == io.EOF {
break
}
} }
return nil return nil
@ -148,7 +150,7 @@ func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest
// GetAudioSegment returns a set of peaks for a segment of an audio part of a // GetAudioSegment returns a set of peaks for a segment of an audio part of a
// MediaSet. // MediaSet.
func (c *mediaSetServiceController) GetAudioSegment(ctx context.Context, request *pbMediaSet.GetAudioSegmentRequest) (*pbMediaSet.GetAudioSegmentResponse, error) { func (c *mediaSetServiceController) GetAudioSegment(ctx context.Context, request *pbmediaset.GetAudioSegmentRequest) (*pbmediaset.GetAudioSegmentResponse, error) {
ctx, cancel := context.WithTimeout(ctx, getAudioSegmentTimeout) ctx, cancel := context.WithTimeout(ctx, getAudioSegmentTimeout)
defer cancel() defer cancel()
@ -167,13 +169,48 @@ func (c *mediaSetServiceController) GetAudioSegment(ctx context.Context, request
peaks32[i] = int32(p) peaks32[i] = int32(p)
} }
response := pbMediaSet.GetAudioSegmentResponse{ response := pbmediaset.GetAudioSegmentResponse{
Peaks: peaks32, Peaks: peaks32,
} }
return &response, nil return &response, nil
} }
func (c *mediaSetServiceController) GetVideo(request *pbmediaset.GetVideoRequest, stream pbmediaset.MediaSetService_GetVideoServer) error {
// TODO: reduce timeout when already fetched from Youtube
ctx, cancel := context.WithTimeout(context.Background(), getVideoTimeout)
defer cancel()
id, err := uuid.Parse(request.GetId())
if err != nil {
return newResponseError(err)
}
reader, err := c.mediaSetService.GetVideo(ctx, id)
if err != nil {
return newResponseError(err)
}
for {
progress, err := reader.Next()
if err != nil && err != io.EOF {
return newResponseError(err)
}
progressPb := pbmediaset.GetVideoProgress{
PercentComplete: progress.PercentComplete,
Url: progress.URL,
}
stream.Send(&progressPb)
if err == io.EOF {
break
}
}
return nil
}
func Start(options Options) error { func Start(options Options) error {
logger, err := buildLogger(options.Environment) logger, err := buildLogger(options.Environment)
if err != nil { if err != nil {
@ -181,10 +218,11 @@ func Start(options Options) error {
} }
defer logger.Sync() defer logger.Sync()
fetchMediaSetService := media.NewMediaSetService(options.Store, options.YoutubeClient, options.S3Client, logger) fetchMediaSetService := media.NewMediaSetService(options.Store, options.YoutubeClient, options.S3API, logger)
grpcServer := buildGRPCServer(options, logger) grpcServer := buildGRPCServer(options, logger)
pbMediaSet.RegisterMediaSetServiceServer(grpcServer, &mediaSetServiceController{mediaSetService: fetchMediaSetService}) mediaSetController := &mediaSetServiceController{mediaSetService: fetchMediaSetService, logger: logger.Sugar().Named("controller")}
pbmediaset.RegisterMediaSetServiceServer(grpcServer, mediaSetController)
// TODO: configure CORS // TODO: configure CORS
grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true })) grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true }))

View File

@ -14,3 +14,9 @@ UPDATE media_sets
SET audio_s3_bucket = $2, audio_s3_key = $3, audio_frames = $4, audio_s3_uploaded_at = NOW(), updated_at = NOW() SET audio_s3_bucket = $2, audio_s3_key = $3, audio_frames = $4, audio_s3_uploaded_at = NOW(), updated_at = NOW()
WHERE id = $1 WHERE id = $1
RETURNING *; RETURNING *;
-- name: SetVideoUploaded :one
UPDATE media_sets
SET video_s3_bucket = $2, video_s3_key = $3, video_s3_uploaded_at = NOW(), updated_at = NOW()
WHERE id = $1
RETURNING *;

View File

@ -1,14 +1,8 @@
import { grpc } from '@improbable-eng/grpc-web';
// import {
// MediaSet as MediaSetPb,
// GetRequest,
// GetAudioRequest,
// GetAudioProgress,
// } from './generated/media_set_pb';
import { import {
MediaSet, MediaSet,
GrpcWebImpl, GrpcWebImpl,
MediaSetServiceClientImpl, MediaSetServiceClientImpl,
GetVideoProgress,
} from './generated/media_set'; } from './generated/media_set';
import { useState, useEffect } from 'react'; import { useState, useEffect } from 'react';
@ -67,16 +61,29 @@ function App(): JSX.Element {
// load video when MediaSet is loaded: // load video when MediaSet is loaded:
useEffect(() => { useEffect(() => {
(async function () {
if (mediaSet == null) { if (mediaSet == null) {
return; return;
} }
return; console.log('getting video...');
const rpc = newRPC();
const service = new MediaSetServiceClientImpl(rpc);
const videoProgressStream = service.GetVideo({ id: mediaSet.id });
video.src = `http://localhost:8888/api/media_sets/${videoID}/video`; let url = '';
// TODO: probably a nicer way to do this.
await videoProgressStream.forEach((progress: GetVideoProgress) => {
if (progress.url != '') {
url = progress.url;
}
});
video.src = url;
video.muted = false; video.muted = false;
video.volume = 1; video.volume = 1;
console.log('set video src', video.src); console.log('set video src', video.src);
})();
}, [mediaSet]); }, [mediaSet]);
// set viewport when MediaSet is loaded: // set viewport when MediaSet is loaded:

View File

@ -23,7 +23,7 @@ message MediaSet {
message GetAudioProgress { message GetAudioProgress {
repeated int32 peaks = 1; repeated int32 peaks = 1;
float percent_completed = 2; float percent_complete = 2;
} }
message GetRequest { message GetRequest {
@ -46,8 +46,19 @@ message GetAudioSegmentResponse {
repeated int32 peaks = 1; repeated int32 peaks = 1;
} }
message GetVideoRequest {
string id = 1;
}
message GetVideoProgress {
float percent_complete = 1;
string url = 2;
}
service MediaSetService { service MediaSetService {
rpc Get(GetRequest) returns (MediaSet) {} rpc Get(GetRequest) returns (MediaSet) {}
rpc GetAudio(GetAudioRequest) returns (stream GetAudioProgress) {} rpc GetAudio(GetAudioRequest) returns (stream GetAudioProgress) {}
rpc GetAudioSegment(GetAudioSegmentRequest) returns (GetAudioSegmentResponse) {} rpc GetAudioSegment(GetAudioSegmentRequest) returns (GetAudioSegmentResponse) {}
rpc GetVideo(GetVideoRequest) returns (stream GetVideoProgress) {}
} }