From 281d5ce8a27a890c06710eab9fa9e21e282171c3 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Fri, 29 Oct 2021 14:52:31 +0200 Subject: [PATCH] Start to refactor and wire in frontend --- backend/cmd/clipper/main.go | 23 ++++++++- backend/media/audio_progress.go | 41 +++++++++-------- backend/media/fetch.go | 82 ++++++++++++++++++++++++--------- backend/server/server.go | 66 +++++++++++++++++++++----- frontend/src/App.tsx | 44 ++++++++---------- frontend/src/GrpcWrapper.tsx | 55 ++++++++++++++++++++++ proto/media_set.proto | 13 +++--- 7 files changed, 238 insertions(+), 86 deletions(-) create mode 100644 frontend/src/GrpcWrapper.tsx diff --git a/backend/cmd/clipper/main.go b/backend/cmd/clipper/main.go index 3edf6b3..a33b200 100644 --- a/backend/cmd/clipper/main.go +++ b/backend/cmd/clipper/main.go @@ -1,10 +1,14 @@ package main import ( + "context" "log" "time" "git.netflux.io/rob/clipper/server" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/kkdai/youtube/v2" ) const ( @@ -13,9 +17,24 @@ const ( ) func main() { + ctx := context.Background() + + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + log.Fatal(err) + } + + // Create an Amazon S3 service s3Client + s3Client := s3.NewFromConfig(cfg) + + // Create a Youtube client + var youtubeClient youtube.Client + serverOptions := server.Options{ - BindAddr: DefaultHTTPBindAddr, - Timeout: DefaultTimeout, + BindAddr: DefaultHTTPBindAddr, + Timeout: DefaultTimeout, + YoutubeClient: &youtubeClient, + S3Client: s3Client, } log.Fatal(server.Start(serverOptions)) diff --git a/backend/media/audio_progress.go b/backend/media/audio_progress.go index 09bd990..5dcd967 100644 --- a/backend/media/audio_progress.go +++ b/backend/media/audio_progress.go @@ -8,7 +8,7 @@ import ( ) type FetchAudioProgress struct { - percentComplete float32 + PercentComplete float32 Peaks []int16 } @@ -21,26 +21,28 @@ type FetchAudioProgressReader interface { // signed int16s and, given a target number of bins, emits a stream of peaks // corresponding to each channel of the audio data. type fetchAudioProgressReader struct { - channels int - framesPerBin int + framesExpected int64 + channels int + framesPerBin int - samples []int16 - currPeaks []int16 - currCount int - total int - progress chan FetchAudioProgress - errorChan chan error + samples []int16 + currPeaks []int16 + currCount int + framesProcessed int + progress chan FetchAudioProgress + errorChan chan error } // TODO: validate inputs, debugging is confusing otherwise -func newFetchAudioProgressReader(expFrames int64, channels, numBins int) *fetchAudioProgressReader { +func newFetchAudioProgressReader(framesExpected int64, channels, numBins int) *fetchAudioProgressReader { return &fetchAudioProgressReader{ - channels: channels, - framesPerBin: int(expFrames / int64(numBins)), - samples: make([]int16, 8_192), - currPeaks: make([]int16, channels), - progress: make(chan FetchAudioProgress), - errorChan: make(chan error, 1), + channels: channels, + framesExpected: framesExpected, + framesPerBin: int(framesExpected / int64(numBins)), + samples: make([]int16, 8_192), + currPeaks: make([]int16, channels), + progress: make(chan FetchAudioProgress), + errorChan: make(chan error, 1), } } @@ -82,6 +84,8 @@ func (w *fetchAudioProgressReader) Write(p []byte) (int, error) { } } + w.framesProcessed += len(samples) / w.channels + return len(p), nil } @@ -89,6 +93,7 @@ func (w *fetchAudioProgressReader) nextBin() { var progress FetchAudioProgress // TODO: avoid an allocation? progress.Peaks = append(progress.Peaks, w.currPeaks...) + progress.PercentComplete = (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0 w.progress <- progress @@ -97,7 +102,7 @@ func (w *fetchAudioProgressReader) nextBin() { for i := 0; i < len(w.currPeaks); i++ { w.currPeaks[i] = 0 } - w.total++ + w.framesProcessed++ } func (w *fetchAudioProgressReader) Read() (FetchAudioProgress, error) { @@ -107,7 +112,7 @@ func (w *fetchAudioProgressReader) Read() (FetchAudioProgress, error) { if !ok { return FetchAudioProgress{}, io.EOF } - return FetchAudioProgress{Peaks: progress.Peaks}, nil + return progress, nil case err := <-w.errorChan: return FetchAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err) } diff --git a/backend/media/fetch.go b/backend/media/fetch.go index f7adeee..6aa7921 100644 --- a/backend/media/fetch.go +++ b/backend/media/fetch.go @@ -21,6 +21,11 @@ const ( rawAudioSampleRate = 48_000 ) +const ( + thumbnailWidth = 177 // 16:9 + thumbnailHeight = 100 // " +) + // progressReader is a reader that prints progress logs as it reads. type progressReader struct { io.Reader @@ -75,42 +80,75 @@ func (s *FetchMediaSetService) Fetch(ctx context.Context, id string) (*MediaSet, return nil, errors.New("no format available") } - formats := FilterYoutubeAudio(video.Formats) - if len(video.Formats) == 0 { - return nil, errors.New("no format available") - } - format := formats[0] - - sampleRate, err := strconv.Atoi(format.AudioSampleRate) + audioMetadata, err := s.fetchAudioMetadata(ctx, video) if err != nil { - return nil, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate) + return nil, fmt.Errorf("error fetching audio metadata: %v", err) } - - approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs) + videoMetadata, err := s.fetchVideoMetadata(ctx, video) if err != nil { - return nil, fmt.Errorf("could not parse audio duration: %s", err) + return nil, fmt.Errorf("error fetching video metadata: %v", err) } - approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond - approxFrames := int64(approxDuration/time.Second) * int64(sampleRate) mediaSet := MediaSet{ - ID: id, - Audio: Audio{ - // we need to decode it to be able to know bytes and frames exactly - ApproxFrames: approxFrames, - Channels: format.AudioChannels, - SampleRate: sampleRate, - }, + ID: id, + Audio: audioMetadata, + Video: videoMetadata, } - // TODO: video // TODO: save to JSON return &mediaSet, nil } +func (s *FetchMediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) { + formats := FilterYoutubeVideo(video.Formats) + if len(video.Formats) == 0 { + return Video{}, errors.New("no format available") + } + format := formats[0] + + durationMsecs, err := strconv.Atoi(format.ApproxDurationMs) + if err != nil { + return Video{}, fmt.Errorf("could not parse video duration: %s", err) + } + + return Video{ + Bytes: format.ContentLength, + ThumbnailWidth: thumbnailWidth, + ThumbnailHeight: thumbnailHeight, + Duration: time.Duration(durationMsecs) * time.Millisecond, + }, nil +} + +func (s *FetchMediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) { + formats := FilterYoutubeAudio(video.Formats) + if len(video.Formats) == 0 { + return Audio{}, errors.New("no format available") + } + format := formats[0] + + sampleRate, err := strconv.Atoi(format.AudioSampleRate) + if err != nil { + return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate) + } + + approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs) + if err != nil { + return Audio{}, fmt.Errorf("could not parse audio duration: %s", err) + } + approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond + approxFrames := int64(approxDuration/time.Second) * int64(sampleRate) + + return Audio{ + // we need to decode it to be able to know bytes and frame counts exactly + ApproxFrames: approxFrames, + Channels: format.AudioChannels, + SampleRate: sampleRate, + }, nil +} + // FetchAudio fetches the audio part of a MediaSet. -func (s *FetchMediaSetService) FetchAudio(ctx context.Context, id string) (FetchAudioProgressReader, error) { +func (s *FetchMediaSetService) FetchAudio(ctx context.Context, id string, numBins int) (FetchAudioProgressReader, error) { mediaSet := NewMediaSet(id) if !mediaSet.Exists() { // TODO check if audio uploaded already, don't bother again diff --git a/backend/server/server.go b/backend/server/server.go index 2f56b2a..2c2af7c 100644 --- a/backend/server/server.go +++ b/backend/server/server.go @@ -2,6 +2,7 @@ package server import ( "context" + "io" "log" "net/http" "os" @@ -9,6 +10,7 @@ import ( pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set" "git.netflux.io/rob/clipper/media" + "git.netflux.io/rob/clipper/youtube" "github.com/improbable-eng/grpc-web/go/grpcweb" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" @@ -16,20 +18,26 @@ import ( ) type Options struct { - BindAddr string - Timeout time.Duration + BindAddr string + Timeout time.Duration + YoutubeClient youtube.YoutubeClient + S3Client media.S3Client } -// mediaSetServiceController implements gRPC controller for MediaSetService -type mediaSetServiceController struct { - pbMediaSet.UnimplementedMediaSetServiceServer +const ( + fetchAudioTimeout = time.Minute * 5 +) - mediaSetService *media.MediaSetService +// fetchMediaSetServiceController implements gRPC controller for FetchMediaSetService +type fetchMediaSetServiceController struct { + pbMediaSet.UnimplementedFetchServiceServer + + fetchMediaSetService *media.FetchMediaSetService } -// GetMediaSet returns a pbMediaSet.MediaSet -func (c *mediaSetServiceController) GetMediaSet(ctx context.Context, request *pbMediaSet.GetMediaSetRequest) (*pbMediaSet.MediaSet, error) { - mediaSet, err := c.mediaSetService.GetMediaSet(ctx, request.Source, request.Id) +// Fetch fetches a pbMediaSet.MediaSet +func (c *fetchMediaSetServiceController) Fetch(ctx context.Context, request *pbMediaSet.FetchRequest) (*pbMediaSet.MediaSet, error) { + mediaSet, err := c.fetchMediaSetService.Fetch(ctx, request.GetId()) if err != nil { return nil, err } @@ -53,17 +61,51 @@ func (c *mediaSetServiceController) GetMediaSet(ctx context.Context, request *pb return &result, nil } -func (c *mediaSetServiceController) GetPeaks(*pbMediaSet.GetPeaksRequest, pbMediaSet.MediaSetService_GetPeaksServer) error { +// TODO: wrap errors +func (c *fetchMediaSetServiceController) FetchAudio(request *pbMediaSet.FetchAudioRequest, stream pbMediaSet.FetchService_FetchAudioServer) error { + ctx, cancel := context.WithTimeout(context.Background(), fetchAudioTimeout) + defer cancel() + + reader, err := c.fetchMediaSetService.FetchAudio(ctx, request.GetId(), int(request.GetNumBins())) + if err != nil { + return err + } + + for { + progress, err := reader.Read() + if err != nil { + if err == io.EOF { + break + } + return err + } + + // TODO: consider using int32 throughout the backend flow to avoid this. + peaks := make([]int32, len(progress.Peaks)) + for i, p := range progress.Peaks { + peaks[i] = int32(p) + } + + progressPb := pbMediaSet.FetchAudioProgress{ + PercentCompleted: progress.PercentComplete, + Peaks: peaks, + } + + stream.Send(&progressPb) + } + return nil } func Start(options Options) error { grpcServer := grpc.NewServer() - pbMediaSet.RegisterMediaSetServiceServer(grpcServer, &mediaSetServiceController{}) + fetchMediaSetService := media.NewFetchMediaSetService(options.YoutubeClient, options.S3Client) + + pbMediaSet.RegisterFetchServiceServer(grpcServer, &fetchMediaSetServiceController{fetchMediaSetService: fetchMediaSetService}) grpclog.SetLogger(log.New(os.Stdout, "server: ", log.LstdFlags)) - // TODO: implement CORS + // TODO: proper CORS support grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true })) handler := func(w http.ResponseWriter, r *http.Request) { grpcWebServer.ServeHTTP(w, r) diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 79c677a..0c871c1 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -1,10 +1,13 @@ import { grpc } from '@improbable-eng/grpc-web'; -import { MediaSetService } from './generated/media_set_pb_service'; import { MediaSet as MediaSetPb, - GetMediaSetRequest, + FetchRequest, + FetchAudioRequest, + FetchAudioProgress, } from './generated/media_set_pb'; +import { FetchMediaSet, FetchMediaSetAudio } from './GrpcWrapper'; + import { useState, useEffect } from 'react'; import { VideoPreview } from './VideoPreview'; import { Overview } from './Overview'; @@ -13,6 +16,8 @@ import { ControlBar } from './ControlBar'; import { SeekBar } from './SeekBar'; import './App.css'; +const grpcHost = 'http://localhost:8888'; + // Audio corresponds to media.Audio. export interface Audio { bytes: number; @@ -60,31 +65,20 @@ function App(): JSX.Element { // fetch mediaset on page load: useEffect(() => { (async function () { - const request = new GetMediaSetRequest(); + const request = new FetchRequest(); request.setId(videoID); - request.setSource('youtube'); - grpc.invoke(MediaSetService.GetMediaSet, { - request: request, - host: 'http://localhost:8888', - onMessage: (mediaSet: MediaSetPb) => { - console.log('rcvd media set: ', mediaSet.toObject()); - }, - onEnd: ( - code: grpc.Code, - msg: string | undefined, - trailers: grpc.Metadata - ) => { - console.log( - 'finished, got code', - code, - 'msg', - msg, - 'trailers', - trailers - ); - }, - }); + const mediaSet = await FetchMediaSet(grpcHost, request); + console.log('got media set:', mediaSet); + + const handleProgress = (progress: FetchAudioProgress) => { + console.log('got progress', progress); + }; + + const audioRequest = new FetchAudioRequest(); + audioRequest.setId(videoID); + audioRequest.setNumBins(1000); + FetchMediaSetAudio(grpcHost, audioRequest, handleProgress); // console.log('fetching media...'); // const resp = await fetch( diff --git a/frontend/src/GrpcWrapper.tsx b/frontend/src/GrpcWrapper.tsx new file mode 100644 index 0000000..1ad8876 --- /dev/null +++ b/frontend/src/GrpcWrapper.tsx @@ -0,0 +1,55 @@ +import { grpc } from '@improbable-eng/grpc-web'; +import { FetchService } from './generated/media_set_pb_service'; +import { + MediaSet, + FetchRequest, + FetchAudioProgress, + FetchAudioRequest, +} from './generated/media_set_pb'; + +export const FetchMediaSet = ( + host: string, + request: FetchRequest +): Promise => { + return new Promise((resolve, reject) => { + let result: MediaSet; + + grpc.invoke(FetchService.Fetch, { + host: host, + request: request, + onMessage: (mediaSet: MediaSet) => { + result = mediaSet; + }, + onEnd: ( + code: grpc.Code, + msg: string | undefined, + _trailers: grpc.Metadata + ) => { + if (code != 0) { + reject(new Error(`unexpected grpc code: ${code}, message: ${msg}`)); + return; + } + resolve(result); + }, + }); + }); +}; + +export const FetchMediaSetAudio = ( + host: string, + request: FetchAudioRequest, + onProgress: { (progress: FetchAudioProgress): void } +) => { + grpc.invoke(FetchService.FetchAudio, { + host: 'http://localhost:8888', + request: request, + onMessage: onProgress, + onEnd: ( + code: grpc.Code, + msg: string | undefined, + trailers: grpc.Metadata + ) => { + console.log('fetch audio request ended'); + }, + }); +}; diff --git a/proto/media_set.proto b/proto/media_set.proto index 4bf78f7..7ef745c 100644 --- a/proto/media_set.proto +++ b/proto/media_set.proto @@ -27,22 +27,21 @@ message MediaSet { bool loaded = 4; }; -message PeaksProgress { +message FetchAudioProgress { float percent_completed = 2; repeated int32 peaks = 1; } -message GetMediaSetRequest { +message FetchRequest { string id = 1; - string source = 2; } -message GetPeaksRequest { +message FetchAudioRequest { string id = 1; int32 num_bins = 2; } -service MediaSetService { - rpc GetMediaSet(GetMediaSetRequest) returns (MediaSet) {} - rpc GetPeaks(GetPeaksRequest) returns (stream PeaksProgress) {} +service FetchService { + rpc Fetch(FetchRequest) returns (MediaSet) {} + rpc FetchAudio(FetchAudioRequest) returns (stream FetchAudioProgress) {} }