From 4afec11074ac5bac805414c40d841460e9b6681c Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Sat, 20 Nov 2021 19:29:34 +0100 Subject: [PATCH] Get video from Youtube, send progress via gRPC --- .gitignore | 1 - backend/cmd/clipper/main.go | 9 ++- backend/cmd/progress-test/main.go | 2 +- backend/media/audio_progress.go | 19 +++-- backend/media/service.go | 107 +++++++++++++++++++++---- backend/media/uploader.go | 2 + backend/media/video_progress.go | 129 ++++++++++++++++++++++++++++++ backend/server/server.go | 74 ++++++++++++----- backend/sql/queries.sql | 6 ++ frontend/src/App.tsx | 37 +++++---- proto/media_set.proto | 13 ++- 11 files changed, 340 insertions(+), 59 deletions(-) create mode 100644 backend/media/video_progress.go diff --git a/.gitignore b/.gitignore index c72f6c8..ceeabbc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,4 @@ /backend/.env -/backend/cache/ /backend/debug/ # generated files: diff --git a/backend/cmd/clipper/main.go b/backend/cmd/clipper/main.go index d77f87d..682f61a 100644 --- a/backend/cmd/clipper/main.go +++ b/backend/cmd/clipper/main.go @@ -8,6 +8,7 @@ import ( "time" "git.netflux.io/rob/clipper/generated/store" + "git.netflux.io/rob/clipper/media" "git.netflux.io/rob/clipper/server" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -42,6 +43,9 @@ func main() { } s3Client := s3.NewFromConfig(cfg) + // Create an Amazon S3 presign client + s3PresignClient := s3.NewPresignClient(s3Client) + // Create a Youtube client var youtubeClient youtube.Client @@ -51,7 +55,10 @@ func main() { Timeout: DefaultTimeout, Store: store, YoutubeClient: &youtubeClient, - S3Client: s3Client, + S3API: media.S3API{ + S3Client: s3Client, + S3PresignClient: s3PresignClient, + }, } log.Fatal(server.Start(serverOptions)) diff --git a/backend/cmd/progress-test/main.go b/backend/cmd/progress-test/main.go index 27b02e6..d2d2e0d 100644 --- a/backend/cmd/progress-test/main.go +++ b/backend/cmd/progress-test/main.go @@ -43,7 +43,7 @@ func main() { var youtubeClient youtube.Client // Create a MediaSetService - mediaSetService := media.NewMediaSetService(store, &youtubeClient, s3Client, zap.NewNop()) + mediaSetService := media.NewMediaSetService(store, &youtubeClient, media.S3API{S3Client: s3Client}, zap.NewNop()) mediaSet, err := mediaSetService.Get(ctx, videoID) if err != nil { diff --git a/backend/media/audio_progress.go b/backend/media/audio_progress.go index d0750bb..b6c444b 100644 --- a/backend/media/audio_progress.go +++ b/backend/media/audio_progress.go @@ -34,8 +34,11 @@ type getAudioProgressReader struct { errorChan chan error } -// TODO: validate inputs, debugging is confusing otherwise -func newGetAudioProgressReader(framesExpected int64, channels, numBins int) *getAudioProgressReader { +func newGetAudioProgressReader(framesExpected int64, channels, numBins int) (*getAudioProgressReader, error) { + if framesExpected <= 0 || channels <= 0 || numBins <= 0 { + return nil, fmt.Errorf("error creating audio progress reader (framesExpected = %d, channels = %d, numBins = %d)", framesExpected, channels, numBins) + } + return &getAudioProgressReader{ channels: channels, framesExpected: framesExpected, @@ -44,7 +47,7 @@ func newGetAudioProgressReader(framesExpected int64, channels, numBins int) *get currPeaks: make([]int16, channels), progress: make(chan GetAudioProgress), errorChan: make(chan error, 1), - } + }, nil } func (w *getAudioProgressReader) Abort(err error) { @@ -61,7 +64,7 @@ func (w *getAudioProgressReader) Read() (GetAudioProgress, error) { select { case progress, ok := <-w.progress: if !ok { - return GetAudioProgress{}, io.EOF + return GetAudioProgress{Peaks: w.currPeaks, PercentComplete: w.percentComplete()}, io.EOF } return progress, nil case err := <-w.errorChan: @@ -104,11 +107,14 @@ func (w *getAudioProgressReader) Write(p []byte) (int, error) { return len(p), nil } +func (w *getAudioProgressReader) percentComplete() float32 { + return (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0 +} + func (w *getAudioProgressReader) nextBin() { var progress GetAudioProgress - // TODO: avoid an allocation? progress.Peaks = append(progress.Peaks, w.currPeaks...) - progress.PercentComplete = (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0 + progress.PercentComplete = w.percentComplete() w.progress <- progress @@ -116,5 +122,4 @@ func (w *getAudioProgressReader) nextBin() { for i := 0; i < len(w.currPeaks); i++ { w.currPeaks[i] = 0 } - w.framesProcessed++ } diff --git a/backend/media/service.go b/backend/media/service.go index 44f1f55..9e28ac0 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -14,13 +14,18 @@ import ( "git.netflux.io/rob/clipper/generated/store" "github.com/aws/aws-sdk-go-v2/aws" + signerv4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/google/uuid" youtubev2 "github.com/kkdai/youtube/v2" "go.uber.org/zap" ) -const s3Bucket = "clipper-development" +const ( + s3Bucket = "clipper-development" + + getVideoExpiresIn = time.Hour * 1 +) const ( rawAudioCodec = "pcm_s16le" @@ -38,12 +43,12 @@ const ( type progressReader struct { io.Reader label string - total, exp int + total, exp int64 } func (pw *progressReader) Read(p []byte) (int, error) { n, err := pw.Reader.Read(p) - pw.total += n + pw.total += int64(n) log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float32(pw.total)/float32(pw.exp))*100.0) @@ -56,6 +61,13 @@ type Store interface { GetMediaSetByYoutubeID(ctx context.Context, youtubeID string) (store.MediaSet, error) CreateMediaSet(ctx context.Context, arg store.CreateMediaSetParams) (store.MediaSet, error) SetAudioUploaded(ctx context.Context, arg store.SetAudioUploadedParams) (store.MediaSet, error) + SetVideoUploaded(ctx context.Context, arg store.SetVideoUploadedParams) (store.MediaSet, error) +} + +// S3API provides an API to AWS S3. +type S3API struct { + S3Client + S3PresignClient } // S3Client wraps the AWS S3 service client. @@ -63,10 +75,15 @@ type S3Client interface { GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error) CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) - AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) + AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) } +// S3PresignClient wraps the AWS S3 Presign client. +type S3PresignClient interface { + PresignGetObject(context.Context, *s3.GetObjectInput, ...func(*s3.PresignOptions)) (*signerv4.PresignedHTTPRequest, error) +} + // YoutubeClient wraps the youtube.Client client. type YoutubeClient interface { GetVideoContext(context.Context, string) (*youtubev2.Video, error) @@ -77,20 +94,21 @@ type YoutubeClient interface { type MediaSetService struct { store Store youtube YoutubeClient - s3 S3Client + s3 S3API logger *zap.SugaredLogger } -func NewMediaSetService(store Store, youtubeClient YoutubeClient, s3Client S3Client, logger *zap.Logger) *MediaSetService { +func NewMediaSetService(store Store, youtubeClient YoutubeClient, s3API S3API, logger *zap.Logger) *MediaSetService { return &MediaSetService{ store: store, youtube: youtubeClient, - s3: s3Client, + s3: s3API, logger: logger.Sugar(), } } -// Get fetches the metadata for a given MediaSet source. +// Get fetches the metadata for a given MediaSet source. If it does not exist +// in the local DB, it will attempt to create it. func (s *MediaSetService) Get(ctx context.Context, youtubeID string) (*MediaSet, error) { var ( mediaSet *MediaSet @@ -131,7 +149,7 @@ func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) return nil, fmt.Errorf("error fetching video metadata: %v", err) } - params := store.CreateMediaSetParams{ + storeParams := store.CreateMediaSetParams{ YoutubeID: youtubeID, AudioYoutubeItag: int32(audioMetadata.YoutubeItag), AudioChannels: int32(audioMetadata.Channels), @@ -142,7 +160,7 @@ func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) VideoMimeType: videoMetadata.MimeType, VideoDurationNanos: videoMetadata.Duration.Nanoseconds(), } - mediaSet, err := s.store.CreateMediaSet(ctx, params) + mediaSet, err := s.store.CreateMediaSet(ctx, storeParams) if err != nil { return nil, fmt.Errorf("error creating media set in store: %v", err) } @@ -238,6 +256,59 @@ func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtube }, nil } +// GetVideo fetches the video part of a MediaSet. +func (s *MediaSetService) GetVideo(ctx context.Context, id uuid.UUID) (GetVideoProgressReader, error) { + mediaSet, err := s.store.GetMediaSet(ctx, id) + if err != nil { + return nil, fmt.Errorf("error getting media set: %v", err) + } + + // TODO: use mediaSet func to fetch s3Key + s3Key := fmt.Sprintf("media_sets/%s/video.mp4", mediaSet.ID) + + if mediaSet.VideoS3UploadedAt.Valid { + input := s3.GetObjectInput{Bucket: aws.String(s3Bucket), Key: aws.String(s3Key)} + request, signErr := s.s3.PresignGetObject(ctx, &input, s3.WithPresignExpires(getVideoExpiresIn)) + if signErr != nil { + return nil, fmt.Errorf("error generating presigned URL: %v", signErr) + } + videoGetter := videoGetterDownloaded(request.URL) + return &videoGetter, nil + } + + video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) + if err != nil { + return nil, fmt.Errorf("error fetching video: %v", err) + } + + format := video.Formats.FindByItag(int(mediaSet.VideoYoutubeItag)) + if format == nil { + return nil, fmt.Errorf("error finding itag: %v", err) + } + + stream, _, err := s.youtube.GetStreamContext(ctx, video, format) + if err != nil { + return nil, fmt.Errorf("error fetching stream: %v", err) + } + + videoGetter := newVideoGetter(s.s3, s.store, s.logger) + return videoGetter.GetVideo( + ctx, + stream, + format.ContentLength, + mediaSet.ID, + s3Bucket, + s3Key, + format.MimeType, + ) +} + +type GetVideoProgressReader interface { + // Next returns the next video progress status. When the stream has finished, + // a valid GetVideoProgress value will be returned with io.EOF. + Next() (GetVideoProgress, error) +} + // GetAudio fetches the audio part of a MediaSet. func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins int) (GetAudioProgressReader, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) @@ -262,11 +333,14 @@ func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.Med return nil, fmt.Errorf("error getting object from s3: %v", err) } - getAudioProgressReader := newGetAudioProgressReader( + getAudioProgressReader, err := newGetAudioProgressReader( int64(mediaSet.AudioFrames.Int64), int(mediaSet.AudioChannels), numBins, ) + if err != nil { + return nil, fmt.Errorf("error creating audio reader: %v", err) + } state := getAudioFromS3State{ getAudioProgressReader: getAudioProgressReader, @@ -336,22 +410,25 @@ func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet stor return nil, fmt.Errorf("error fetching stream: %v", err) } - // wrap it in a progress reader - progressStream := &progressReader{Reader: stream, label: "audio", exp: int(format.ContentLength)} + streamWithProgress := &progressReader{Reader: stream, label: "audio", exp: format.ContentLength} - ffmpegReader, err := newFfmpegReader(ctx, progressStream, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") + ffmpegReader, err := newFfmpegReader(ctx, streamWithProgress, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") if err != nil { return nil, fmt.Errorf("error creating ffmpegreader: %v", err) } + // TODO: use mediaSet func to fetch s3Key s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) uploader := newMultipartUploader(s.s3) - getAudioProgressReader := newGetAudioProgressReader( + getAudioProgressReader, err := newGetAudioProgressReader( int64(mediaSet.AudioFramesApprox), format.AudioChannels, numBins, ) + if err != nil { + return nil, fmt.Errorf("error creating audio reader: %v", err) + } state := getAudioFromYoutubeState{ getAudioProgressReader: getAudioProgressReader, diff --git a/backend/media/uploader.go b/backend/media/uploader.go index e2452f3..ce1a509 100644 --- a/backend/media/uploader.go +++ b/backend/media/uploader.go @@ -38,6 +38,8 @@ func newMultipartUploader(s3Client S3Client) *multipartUploader { // Upload uploads to an S3 bucket in 5MB parts. It buffers data internally // until a part is ready to send over the network. Parts are sent as soon as // they exceed the minimum part size of 5MB. +// +// TODO: expire after configurable period. func (u *multipartUploader) Upload(ctx context.Context, r io.Reader, bucket, key, contentType string) (int64, error) { var uploaded bool diff --git a/backend/media/video_progress.go b/backend/media/video_progress.go new file mode 100644 index 0000000..24b65c7 --- /dev/null +++ b/backend/media/video_progress.go @@ -0,0 +1,129 @@ +package media + +import ( + "context" + "fmt" + "io" + + "git.netflux.io/rob/clipper/generated/store" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/google/uuid" + "go.uber.org/zap" +) + +type GetVideoProgress struct { + PercentComplete float32 + URL string +} + +type videoGetter struct { + s3 S3API + store Store + logger *zap.SugaredLogger +} + +type videoGetterState struct { + *videoGetter + + r io.Reader + count, exp int64 + mediaSetID uuid.UUID + bucket, key, contentType string + url string + progressChan chan GetVideoProgress + errorChan chan error +} + +func newVideoGetter(s3 S3API, store Store, logger *zap.SugaredLogger) *videoGetter { + return &videoGetter{s3: s3, store: store, logger: logger} +} + +// GetVideo gets video from Youtube and uploads it to S3 using the specified +// bucket, key and content type. The returned reader must have its Next() +// method called until error = io.EOF, otherwise a deadlock or other resource +// leakage is likely. +func (g *videoGetter) GetVideo(ctx context.Context, r io.Reader, exp int64, mediaSetID uuid.UUID, bucket, key, contentType string) (GetVideoProgressReader, error) { + s := &videoGetterState{ + videoGetter: g, + r: &progressReader{Reader: r, label: "video", exp: exp}, + exp: exp, + mediaSetID: mediaSetID, + bucket: bucket, + key: key, + contentType: contentType, + progressChan: make(chan GetVideoProgress), + errorChan: make(chan error, 1), + } + + go s.getVideo(ctx) + + // return s, exposing only the limited interface to the caller. + return s, nil +} + +// Write implements io.Writer. +func (s *videoGetterState) Write(p []byte) (int, error) { + s.count += int64(len(p)) + pc := (float32(s.count) / float32(s.exp)) * 100 + s.progressChan <- GetVideoProgress{PercentComplete: pc} + return len(p), nil +} + +func (s *videoGetterState) getVideo(ctx context.Context) { + uploader := newMultipartUploader(s.s3) + teeReader := io.TeeReader(s.r, s) + + _, err := uploader.Upload(ctx, teeReader, s.bucket, s.key, s.contentType) + if err != nil { + s.errorChan <- fmt.Errorf("error uploading to S3: %v", err) + return + } + + input := s3.GetObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(s.key), + } + request, err := s.s3.PresignGetObject(ctx, &input, s3.WithPresignExpires(getVideoExpiresIn)) + if err != nil { + s.errorChan <- fmt.Errorf("error generating presigned URL: %v", err) + } + s.url = request.URL + + storeParams := store.SetVideoUploadedParams{ + ID: s.mediaSetID, + VideoS3Bucket: sqlString(s.bucket), + VideoS3Key: sqlString(s.key), + } + _, err = s.store.SetVideoUploaded(ctx, storeParams) + if err != nil { + s.errorChan <- fmt.Errorf("error saving to store: %v", err) + } + + close(s.progressChan) +} + +// Next implements GetVideoProgressReader. +func (s *videoGetterState) Next() (GetVideoProgress, error) { + for { + select { + case progress, ok := <-s.progressChan: + if !ok { + return GetVideoProgress{PercentComplete: 100, URL: s.url}, io.EOF + } + return progress, nil + case err := <-s.errorChan: + return GetVideoProgress{}, fmt.Errorf("error waiting for progress: %v", err) + } + } +} + +type videoGetterDownloaded string + +// Next() implements GetVideoProgressReader. +func (s *videoGetterDownloaded) Next() (GetVideoProgress, error) { + return GetVideoProgress{ + PercentComplete: 100, + URL: string(*s), + }, io.EOF +} diff --git a/backend/server/server.go b/backend/server/server.go index 5c90e4b..e0269a4 100644 --- a/backend/server/server.go +++ b/backend/server/server.go @@ -7,7 +7,7 @@ import ( "net/http" "time" - pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set" + pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set" "git.netflux.io/rob/clipper/media" "github.com/google/uuid" grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -34,6 +34,7 @@ const ( const ( getAudioTimeout = time.Minute * 5 getAudioSegmentTimeout = time.Second * 10 + getVideoTimeout = time.Minute * 5 ) type ResponseError struct { @@ -70,24 +71,25 @@ type Options struct { Timeout time.Duration Store media.Store YoutubeClient media.YoutubeClient - S3Client media.S3Client + S3API media.S3API } // mediaSetServiceController implements gRPC controller for MediaSetService type mediaSetServiceController struct { - pbMediaSet.UnimplementedMediaSetServiceServer + pbmediaset.UnimplementedMediaSetServiceServer mediaSetService *media.MediaSetService + logger *zap.SugaredLogger } // Get returns a pbMediaSet.MediaSet -func (c *mediaSetServiceController) Get(ctx context.Context, request *pbMediaSet.GetRequest) (*pbMediaSet.MediaSet, error) { +func (c *mediaSetServiceController) Get(ctx context.Context, request *pbmediaset.GetRequest) (*pbmediaset.MediaSet, error) { mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId()) if err != nil { return nil, newResponseError(err) } - result := pbMediaSet.MediaSet{ + result := pbmediaset.MediaSet{ Id: mediaSet.ID.String(), YoutubeId: mediaSet.YoutubeID, AudioChannels: int32(mediaSet.Audio.Channels), @@ -106,7 +108,7 @@ func (c *mediaSetServiceController) Get(ctx context.Context, request *pbMediaSet // GetAudio returns a stream of GetAudioProgress relating to the entire audio // part of the MediaSet. -func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest, stream pbMediaSet.MediaSetService_GetAudioServer) error { +func (c *mediaSetServiceController) GetAudio(request *pbmediaset.GetAudioRequest, stream pbmediaset.MediaSetService_GetAudioServer) error { // TODO: reduce timeout when fetching from S3 ctx, cancel := context.WithTimeout(context.Background(), getAudioTimeout) defer cancel() @@ -123,10 +125,7 @@ func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest for { progress, err := reader.Read() - if err != nil { - if err == io.EOF { - break - } + if err != nil && err != io.EOF { return newResponseError(err) } @@ -135,12 +134,15 @@ func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest peaks[i] = int32(p) } - progressPb := pbMediaSet.GetAudioProgress{ - PercentCompleted: progress.PercentComplete, - Peaks: peaks, + progressPb := pbmediaset.GetAudioProgress{ + PercentComplete: progress.PercentComplete, + Peaks: peaks, } - stream.Send(&progressPb) + + if err == io.EOF { + break + } } return nil @@ -148,7 +150,7 @@ func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest // GetAudioSegment returns a set of peaks for a segment of an audio part of a // MediaSet. -func (c *mediaSetServiceController) GetAudioSegment(ctx context.Context, request *pbMediaSet.GetAudioSegmentRequest) (*pbMediaSet.GetAudioSegmentResponse, error) { +func (c *mediaSetServiceController) GetAudioSegment(ctx context.Context, request *pbmediaset.GetAudioSegmentRequest) (*pbmediaset.GetAudioSegmentResponse, error) { ctx, cancel := context.WithTimeout(ctx, getAudioSegmentTimeout) defer cancel() @@ -167,13 +169,48 @@ func (c *mediaSetServiceController) GetAudioSegment(ctx context.Context, request peaks32[i] = int32(p) } - response := pbMediaSet.GetAudioSegmentResponse{ + response := pbmediaset.GetAudioSegmentResponse{ Peaks: peaks32, } return &response, nil } +func (c *mediaSetServiceController) GetVideo(request *pbmediaset.GetVideoRequest, stream pbmediaset.MediaSetService_GetVideoServer) error { + // TODO: reduce timeout when already fetched from Youtube + ctx, cancel := context.WithTimeout(context.Background(), getVideoTimeout) + defer cancel() + + id, err := uuid.Parse(request.GetId()) + if err != nil { + return newResponseError(err) + } + + reader, err := c.mediaSetService.GetVideo(ctx, id) + if err != nil { + return newResponseError(err) + } + + for { + progress, err := reader.Next() + if err != nil && err != io.EOF { + return newResponseError(err) + } + + progressPb := pbmediaset.GetVideoProgress{ + PercentComplete: progress.PercentComplete, + Url: progress.URL, + } + stream.Send(&progressPb) + + if err == io.EOF { + break + } + } + + return nil +} + func Start(options Options) error { logger, err := buildLogger(options.Environment) if err != nil { @@ -181,10 +218,11 @@ func Start(options Options) error { } defer logger.Sync() - fetchMediaSetService := media.NewMediaSetService(options.Store, options.YoutubeClient, options.S3Client, logger) + fetchMediaSetService := media.NewMediaSetService(options.Store, options.YoutubeClient, options.S3API, logger) grpcServer := buildGRPCServer(options, logger) - pbMediaSet.RegisterMediaSetServiceServer(grpcServer, &mediaSetServiceController{mediaSetService: fetchMediaSetService}) + mediaSetController := &mediaSetServiceController{mediaSetService: fetchMediaSetService, logger: logger.Sugar().Named("controller")} + pbmediaset.RegisterMediaSetServiceServer(grpcServer, mediaSetController) // TODO: configure CORS grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true })) diff --git a/backend/sql/queries.sql b/backend/sql/queries.sql index cea7f0b..3475cbc 100644 --- a/backend/sql/queries.sql +++ b/backend/sql/queries.sql @@ -14,3 +14,9 @@ UPDATE media_sets SET audio_s3_bucket = $2, audio_s3_key = $3, audio_frames = $4, audio_s3_uploaded_at = NOW(), updated_at = NOW() WHERE id = $1 RETURNING *; + +-- name: SetVideoUploaded :one +UPDATE media_sets + SET video_s3_bucket = $2, video_s3_key = $3, video_s3_uploaded_at = NOW(), updated_at = NOW() + WHERE id = $1 + RETURNING *; diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 377fb35..1b4b127 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -1,14 +1,8 @@ -import { grpc } from '@improbable-eng/grpc-web'; -// import { -// MediaSet as MediaSetPb, -// GetRequest, -// GetAudioRequest, -// GetAudioProgress, -// } from './generated/media_set_pb'; import { MediaSet, GrpcWebImpl, MediaSetServiceClientImpl, + GetVideoProgress, } from './generated/media_set'; import { useState, useEffect } from 'react'; @@ -67,16 +61,29 @@ function App(): JSX.Element { // load video when MediaSet is loaded: useEffect(() => { - if (mediaSet == null) { - return; - } + (async function () { + if (mediaSet == null) { + return; + } - return; + console.log('getting video...'); + const rpc = newRPC(); + const service = new MediaSetServiceClientImpl(rpc); + const videoProgressStream = service.GetVideo({ id: mediaSet.id }); - video.src = `http://localhost:8888/api/media_sets/${videoID}/video`; - video.muted = false; - video.volume = 1; - console.log('set video src', video.src); + let url = ''; + // TODO: probably a nicer way to do this. + await videoProgressStream.forEach((progress: GetVideoProgress) => { + if (progress.url != '') { + url = progress.url; + } + }); + + video.src = url; + video.muted = false; + video.volume = 1; + console.log('set video src', video.src); + })(); }, [mediaSet]); // set viewport when MediaSet is loaded: diff --git a/proto/media_set.proto b/proto/media_set.proto index 4ae2b32..1b9acd8 100644 --- a/proto/media_set.proto +++ b/proto/media_set.proto @@ -23,7 +23,7 @@ message MediaSet { message GetAudioProgress { repeated int32 peaks = 1; - float percent_completed = 2; + float percent_complete = 2; } message GetRequest { @@ -46,8 +46,19 @@ message GetAudioSegmentResponse { repeated int32 peaks = 1; } +message GetVideoRequest { + string id = 1; +} + + +message GetVideoProgress { + float percent_complete = 1; + string url = 2; +} + service MediaSetService { rpc Get(GetRequest) returns (MediaSet) {} rpc GetAudio(GetAudioRequest) returns (stream GetAudioProgress) {} rpc GetAudioSegment(GetAudioSegmentRequest) returns (GetAudioSegmentResponse) {} + rpc GetVideo(GetVideoRequest) returns (stream GetVideoProgress) {} }