From 7c5b22a4078fa4688821dd799d046990ac097f36 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Mon, 1 Nov 2021 06:28:40 +0100 Subject: [PATCH] Introduce PG store --- .gitignore | 2 +- backend/.env | 2 - backend/.env.example | 4 + backend/cmd/clipper/main.go | 17 +- backend/cmd/progress-test/main.go | 23 +- backend/go.mod | 2 + backend/go.sum | 3 + backend/media/audio_progress.go | 20 +- backend/media/fetch.go | 268 -------------- backend/media/media_set.go | 41 +- backend/media/service.go | 349 +++++++++++++++++- backend/server/handlers.go | 148 ++++---- backend/server/server.go | 65 +++- ...635710597_create_media_sets_table.down.sql | 3 + .../1635710597_create_media_sets_table.up.sql | 39 ++ backend/sql/queries.sql | 10 + backend/sqlc.yaml | 6 + backend/youtube/youtube2.go | 2 +- frontend/src/App.tsx | 28 +- frontend/src/GrpcWrapper.tsx | 50 +-- proto/media_set.proto | 14 +- 21 files changed, 628 insertions(+), 468 deletions(-) delete mode 100644 backend/.env create mode 100644 backend/.env.example delete mode 100644 backend/media/fetch.go create mode 100644 backend/sql/migrations/1635710597_create_media_sets_table.down.sql create mode 100644 backend/sql/migrations/1635710597_create_media_sets_table.up.sql create mode 100644 backend/sql/queries.sql create mode 100644 backend/sqlc.yaml diff --git a/.gitignore b/.gitignore index 395b992..c72f6c8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ -*.m4a +/backend/.env /backend/cache/ /backend/debug/ diff --git a/backend/.env b/backend/.env deleted file mode 100644 index d1f3ba9..0000000 --- a/backend/.env +++ /dev/null @@ -1,2 +0,0 @@ -AWS_ACCESS_KEY_ID=AKIARZPRT6YGKUMKQPV5 -AWS_SECRET_ACCESS_KEY=P8zJInhiHoXT4NV0gFMNHy8XVN285CqfOSCeaCHX diff --git a/backend/.env.example b/backend/.env.example new file mode 100644 index 0000000..d738c50 --- /dev/null +++ b/backend/.env.example @@ -0,0 +1,4 @@ +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= + +DATABASE_URL= diff --git a/backend/cmd/clipper/main.go b/backend/cmd/clipper/main.go index a33b200..ff2925b 100644 --- a/backend/cmd/clipper/main.go +++ b/backend/cmd/clipper/main.go @@ -2,13 +2,17 @@ package main import ( "context" + "database/sql" "log" + "os" "time" + "git.netflux.io/rob/clipper/generated/store" "git.netflux.io/rob/clipper/server" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/kkdai/youtube/v2" + _ "github.com/lib/pq" ) const ( @@ -19,12 +23,20 @@ const ( func main() { ctx := context.Background() + // Create a store + databaseURL := os.Getenv("DATABASE_URL") + log.Printf("DATABASE_URL = %s", databaseURL) + db, err := sql.Open("postgres", databaseURL) + if err != nil { + log.Fatal(err) + } + store := store.New(db) + + // Create an Amazon S3 service s3Client cfg, err := config.LoadDefaultConfig(ctx) if err != nil { log.Fatal(err) } - - // Create an Amazon S3 service s3Client s3Client := s3.NewFromConfig(cfg) // Create a Youtube client @@ -33,6 +45,7 @@ func main() { serverOptions := server.Options{ BindAddr: DefaultHTTPBindAddr, Timeout: DefaultTimeout, + Store: store, YoutubeClient: &youtubeClient, S3Client: s3Client, } diff --git a/backend/cmd/progress-test/main.go b/backend/cmd/progress-test/main.go index a1fab26..3151ed4 100644 --- a/backend/cmd/progress-test/main.go +++ b/backend/cmd/progress-test/main.go @@ -2,13 +2,18 @@ package main import ( "context" + "database/sql" "io" "log" + "os" + "git.netflux.io/rob/clipper/generated/store" "git.netflux.io/rob/clipper/media" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/google/uuid" "github.com/kkdai/youtube/v2" + _ "github.com/lib/pq" ) const ( @@ -18,22 +23,30 @@ const ( func main() { ctx := context.Background() + // Create a store + databaseURL := os.Getenv("DATABASE_URL") + db, err := sql.Open("postgres", databaseURL) + if err != nil { + log.Fatal(err) + } + store := store.New(db) + + // Create an Amazon S3 service s3Client cfg, err := config.LoadDefaultConfig(ctx) if err != nil { log.Fatal(err) } - - // Create an Amazon S3 service s3Client s3Client := s3.NewFromConfig(cfg) // Create a Youtube client var youtubeClient youtube.Client - // Create a VideoFetchService - fetchService := media.NewFetchMediaSetService(&youtubeClient, s3Client) + // Create a MediaSetService + mediaSetService := media.NewMediaSetService(store, &youtubeClient, s3Client) // Create a progressReader - progressReader, err := fetchService.FetchAudio(ctx, videoID) + // TODO: fix + progressReader, err := mediaSetService.GetAudio(ctx, uuid.New(), 100) if err != nil { log.Fatalf("error calling fetch service: %v", err) } diff --git a/backend/go.mod b/backend/go.mod index 87060e6..4811a68 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -3,6 +3,7 @@ module git.netflux.io/rob/clipper go 1.17 require ( + github.com/google/uuid v1.1.2 github.com/improbable-eng/grpc-web v0.14.1 github.com/kkdai/youtube/v2 v2.7.4 github.com/labstack/echo/v4 v4.6.0 @@ -30,6 +31,7 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/klauspost/compress v1.11.7 // indirect github.com/labstack/gommon v0.3.0 // indirect + github.com/lib/pq v1.10.3 // indirect github.com/mattn/go-colorable v0.1.8 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/backend/go.sum b/backend/go.sum index 1d6670b..d202c4a 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -252,6 +252,7 @@ github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= @@ -341,6 +342,8 @@ github.com/labstack/gommon v0.3.0 h1:JEeO0bvc78PKdyHxloTKiF8BD5iGrH8T6MSeGvSgob0 github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg= +github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= diff --git a/backend/media/audio_progress.go b/backend/media/audio_progress.go index 5dcd967..2ccc989 100644 --- a/backend/media/audio_progress.go +++ b/backend/media/audio_progress.go @@ -7,13 +7,13 @@ import ( "io" ) -type FetchAudioProgress struct { +type GetAudioProgress struct { PercentComplete float32 Peaks []int16 } -type FetchAudioProgressReader interface { - Read() (FetchAudioProgress, error) +type GetAudioProgressReader interface { + Read() (GetAudioProgress, error) Close() error } @@ -29,19 +29,19 @@ type fetchAudioProgressReader struct { currPeaks []int16 currCount int framesProcessed int - progress chan FetchAudioProgress + progress chan GetAudioProgress errorChan chan error } // TODO: validate inputs, debugging is confusing otherwise -func newFetchAudioProgressReader(framesExpected int64, channels, numBins int) *fetchAudioProgressReader { +func newGetAudioProgressReader(framesExpected int64, channels, numBins int) *fetchAudioProgressReader { return &fetchAudioProgressReader{ channels: channels, framesExpected: framesExpected, framesPerBin: int(framesExpected / int64(numBins)), samples: make([]int16, 8_192), currPeaks: make([]int16, channels), - progress: make(chan FetchAudioProgress), + progress: make(chan GetAudioProgress), errorChan: make(chan error, 1), } } @@ -90,7 +90,7 @@ func (w *fetchAudioProgressReader) Write(p []byte) (int, error) { } func (w *fetchAudioProgressReader) nextBin() { - var progress FetchAudioProgress + var progress GetAudioProgress // TODO: avoid an allocation? progress.Peaks = append(progress.Peaks, w.currPeaks...) progress.PercentComplete = (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0 @@ -105,16 +105,16 @@ func (w *fetchAudioProgressReader) nextBin() { w.framesProcessed++ } -func (w *fetchAudioProgressReader) Read() (FetchAudioProgress, error) { +func (w *fetchAudioProgressReader) Read() (GetAudioProgress, error) { for { select { case progress, ok := <-w.progress: if !ok { - return FetchAudioProgress{}, io.EOF + return GetAudioProgress{}, io.EOF } return progress, nil case err := <-w.errorChan: - return FetchAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err) + return GetAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err) } } } diff --git a/backend/media/fetch.go b/backend/media/fetch.go deleted file mode 100644 index 6aa7921..0000000 --- a/backend/media/fetch.go +++ /dev/null @@ -1,268 +0,0 @@ -package media - -import ( - "context" - "errors" - "fmt" - "io" - "log" - "strconv" - "time" - - "github.com/aws/aws-sdk-go-v2/service/s3" - youtubev2 "github.com/kkdai/youtube/v2" -) - -const s3Bucket = "clipper-development" - -const ( - rawAudioCodec = "pcm_s16le" - rawAudioFormat = "s16le" - rawAudioSampleRate = 48_000 -) - -const ( - thumbnailWidth = 177 // 16:9 - thumbnailHeight = 100 // " -) - -// progressReader is a reader that prints progress logs as it reads. -type progressReader struct { - io.Reader - label string - total, exp int -} - -func (pw *progressReader) Read(p []byte) (int, error) { - n, err := pw.Reader.Read(p) - pw.total += n - - log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float32(pw.total)/float32(pw.exp))*100.0) - - return n, err -} - -// S3Client wraps the AWS S3 service client. -type S3Client interface { - CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) - UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) - AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) - CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) -} - -// YoutubeClient wraps the youtube.Client client. -type YoutubeClient interface { - GetVideoContext(context.Context, string) (*youtubev2.Video, error) - GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error) -} - -// FetchMediaSetService fetches a video via an io.Reader. -type FetchMediaSetService struct { - youtube YoutubeClient - s3 S3Client -} - -func NewFetchMediaSetService(youtubeClient YoutubeClient, s3Client S3Client) *FetchMediaSetService { - return &FetchMediaSetService{ - youtube: youtubeClient, - s3: s3Client, - } -} - -// Fetch fetches the metadata for a given MediaSet source. -func (s *FetchMediaSetService) Fetch(ctx context.Context, id string) (*MediaSet, error) { - video, err := s.youtube.GetVideoContext(ctx, id) - if err != nil { - return nil, fmt.Errorf("error fetching video: %v", err) - } - - if len(video.Formats) == 0 { - return nil, errors.New("no format available") - } - - audioMetadata, err := s.fetchAudioMetadata(ctx, video) - if err != nil { - return nil, fmt.Errorf("error fetching audio metadata: %v", err) - } - videoMetadata, err := s.fetchVideoMetadata(ctx, video) - if err != nil { - return nil, fmt.Errorf("error fetching video metadata: %v", err) - } - - mediaSet := MediaSet{ - ID: id, - Audio: audioMetadata, - Video: videoMetadata, - } - - // TODO: save to JSON - - return &mediaSet, nil -} - -func (s *FetchMediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) { - formats := FilterYoutubeVideo(video.Formats) - if len(video.Formats) == 0 { - return Video{}, errors.New("no format available") - } - format := formats[0] - - durationMsecs, err := strconv.Atoi(format.ApproxDurationMs) - if err != nil { - return Video{}, fmt.Errorf("could not parse video duration: %s", err) - } - - return Video{ - Bytes: format.ContentLength, - ThumbnailWidth: thumbnailWidth, - ThumbnailHeight: thumbnailHeight, - Duration: time.Duration(durationMsecs) * time.Millisecond, - }, nil -} - -func (s *FetchMediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) { - formats := FilterYoutubeAudio(video.Formats) - if len(video.Formats) == 0 { - return Audio{}, errors.New("no format available") - } - format := formats[0] - - sampleRate, err := strconv.Atoi(format.AudioSampleRate) - if err != nil { - return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate) - } - - approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs) - if err != nil { - return Audio{}, fmt.Errorf("could not parse audio duration: %s", err) - } - approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond - approxFrames := int64(approxDuration/time.Second) * int64(sampleRate) - - return Audio{ - // we need to decode it to be able to know bytes and frame counts exactly - ApproxFrames: approxFrames, - Channels: format.AudioChannels, - SampleRate: sampleRate, - }, nil -} - -// FetchAudio fetches the audio part of a MediaSet. -func (s *FetchMediaSetService) FetchAudio(ctx context.Context, id string, numBins int) (FetchAudioProgressReader, error) { - mediaSet := NewMediaSet(id) - if !mediaSet.Exists() { - // TODO check if audio uploaded already, don't bother again - return nil, errors.New("no media set found") - } - - if err := mediaSet.Load(); err != nil { - return nil, fmt.Errorf("error loading media set: %v", err) - } - - video, err := s.youtube.GetVideoContext(ctx, id) - if err != nil { - return nil, fmt.Errorf("error fetching video: %v", err) - } - - formats := FilterYoutubeAudio(video.Formats) - if len(video.Formats) == 0 { - return nil, errors.New("no format available") - } - format := formats[0] - - stream, _, err := s.youtube.GetStreamContext(ctx, video, &format) - if err != nil { - return nil, fmt.Errorf("error fetching stream: %v", err) - } - - // wrap it in a progress reader - progressStream := &progressReader{Reader: stream, label: "audio", exp: int(format.ContentLength)} - - ffmpegReader, err := newFfmpegReader(ctx, progressStream, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") - if err != nil { - return nil, fmt.Errorf("error creating ffmpegreader: %v", err) - } - - // set up uploader, this is writer 1 - uploader, err := newMultipartUploadWriter( - ctx, - s.s3, - s3Bucket, - fmt.Sprintf("media_sets/%s/audio.webm", id), - "application/octet-stream", - ) - if err != nil { - return nil, fmt.Errorf("error creating uploader: %v", err) - } - - fetchAudioProgressReader := newFetchAudioProgressReader( - mediaSet.Audio.ApproxFrames, - format.AudioChannels, - 100, - ) - - state := fetchAudioState{ - fetchAudioProgressReader: fetchAudioProgressReader, - ffmpegReader: ffmpegReader, - uploader: uploader, - } - go state.run(ctx) - - return &state, nil -} - -type fetchAudioState struct { - *fetchAudioProgressReader - - ffmpegReader io.ReadCloser - uploader *multipartUploadWriter -} - -func (s *fetchAudioState) run(ctx context.Context) { - mw := io.MultiWriter(s, s.uploader) - done := make(chan error) - var err error - - go func() { - _, copyErr := io.Copy(mw, s.ffmpegReader) - done <- copyErr - }() - -outer: - for { - select { - case <-ctx.Done(): - err = ctx.Err() - break outer - case err = <-done: - break outer - } - } - - if readerErr := s.ffmpegReader.Close(); readerErr != nil { - if err == nil { - err = readerErr - } - } - - if err == nil { - if uploaderErr := s.uploader.Complete(); uploaderErr != nil { - err = uploaderErr - } - } - - if err != nil { - newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - - if abortUploadErr := s.uploader.Abort(newCtx); abortUploadErr != nil { - log.Printf("error aborting uploader: %v", abortUploadErr) - } - s.Abort(err) - return - } - - if iterErr := s.Close(); iterErr != nil { - log.Printf("error closing peak iterator: %v", iterErr) - } -} diff --git a/backend/media/media_set.go b/backend/media/media_set.go index 0f7b4ba..7f63b1d 100644 --- a/backend/media/media_set.go +++ b/backend/media/media_set.go @@ -18,43 +18,48 @@ type Audio struct { Channels int `json:"channels"` // ApproxFrames is used during initial processing when a precise frame count // cannot be determined. Prefer Frames in all other cases. - ApproxFrames int64 `json:"approx_frames"` - Frames int64 `json:"frames"` - SampleRate int `json:"sample_rate"` + ApproxFrames int64 `json:"approx_frames"` + Frames int64 `json:"frames"` + SampleRate int `json:"sample_rate"` + YoutubeItag int `json:"youtube_itag"` + MimeType string `json:"mime_type"` } type Video struct { Bytes int64 `json:"bytes"` Duration time.Duration `json:"duration"` // not sure if this are needed any more? - ThumbnailWidth int `json:"thumbnail_width"` - ThumbnailHeight int `json:"thumbnail_height"` + ThumbnailWidth int `json:"thumbnail_width"` + ThumbnailHeight int `json:"thumbnail_height"` + YoutubeItag int `json:"youtube_itag"` + MimeType string `json:"mime_type"` } // MediaSet represents the media and metadata associated with a single media // resource (for example, a YouTube video). type MediaSet struct { - Audio Audio `json:"audio"` - Video Video `json:"video"` - ID string `json:"id"` + Audio Audio `json:"audio"` + Video Video `json:"video"` + ID string `json:"id"` + YoutubeID string `json:"youtube_id"` - exists bool + exists bool `json:"exists"` } // New builds a new MediaSet with the given ID. -func NewMediaSet(id string) *MediaSet { - return &MediaSet{ID: id} +func NewMediaSet(youtubeID string) *MediaSet { + return &MediaSet{YoutubeID: youtubeID} } // TODO: pass io.Readers/Writers instead of strings. -func (m *MediaSet) RawAudioPath() string { return fmt.Sprintf("cache/%s.raw", m.ID) } -func (m *MediaSet) EncodedAudioPath() string { return fmt.Sprintf("cache/%s.m4a", m.ID) } -func (m *MediaSet) VideoPath() string { return fmt.Sprintf("cache/%s.mp4", m.ID) } -func (m *MediaSet) ThumbnailPath() string { return fmt.Sprintf("cache/%s.jpg", m.ID) } -func (m *MediaSet) MetadataPath() string { return fmt.Sprintf("cache/%s.json", m.ID) } +func (m *MediaSet) RawAudioPath() string { return fmt.Sprintf("cache/%s.raw", m.YoutubeID) } +func (m *MediaSet) EncodedAudioPath() string { return fmt.Sprintf("cache/%s.m4a", m.YoutubeID) } +func (m *MediaSet) VideoPath() string { return fmt.Sprintf("cache/%s.mp4", m.YoutubeID) } +func (m *MediaSet) ThumbnailPath() string { return fmt.Sprintf("cache/%s.jpg", m.YoutubeID) } +func (m *MediaSet) MetadataPath() string { return fmt.Sprintf("cache/%s.json", m.YoutubeID) } func (m *MediaSet) Exists() bool { - if m.ID == "" { + if m.YoutubeID == "" { return false } if m.exists { @@ -68,7 +73,7 @@ func (m *MediaSet) Exists() bool { } func (m *MediaSet) Load() error { - if m.ID == "" { + if m.YoutubeID == "" { return errors.New("error opening mediaset with blank ID") } diff --git a/backend/media/service.go b/backend/media/service.go index 12fcfde..ced51e5 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -2,28 +2,345 @@ package media import ( "context" + "database/sql" "errors" "fmt" + "io" "log" + "strconv" + "time" + + "git.netflux.io/rob/clipper/generated/store" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/google/uuid" + youtubev2 "github.com/kkdai/youtube/v2" ) -type MediaSetService struct{} +const s3Bucket = "clipper-development" -func (s *MediaSetService) GetMediaSet(ctx context.Context, source string, id string) (*MediaSet, error) { - log.Printf("GetMediaSet called with source %q, id %q", source, id) +const ( + rawAudioCodec = "pcm_s16le" + rawAudioFormat = "s16le" + rawAudioSampleRate = 48_000 +) - if source != "youtube" { - return nil, errors.New("unknown source") - } +const ( + thumbnailWidth = 177 // 16:9 + thumbnailHeight = 100 // " +) - // try to load and return a cached MediaSet, if possible: - mediaSet := NewMediaSet(id) - if mediaSet.Exists() { - if err := mediaSet.Load(); err != nil { - return nil, fmt.Errorf("error loading MediaSet: %v", err) - } - return mediaSet, nil - } - - return &MediaSet{ID: id}, nil +// progressReader is a reader that prints progress logs as it reads. +type progressReader struct { + io.Reader + label string + total, exp int +} + +func (pw *progressReader) Read(p []byte) (int, error) { + n, err := pw.Reader.Read(p) + pw.total += n + + log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float32(pw.total)/float32(pw.exp))*100.0) + + return n, err +} + +// Store wraps a database store. +type Store interface { + GetMediaSet(ctx context.Context, id uuid.UUID) (store.MediaSet, error) + GetMediaSetByYoutubeID(ctx context.Context, youtubeID string) (store.MediaSet, error) + CreateMediaSet(ctx context.Context, arg store.CreateMediaSetParams) (store.MediaSet, error) +} + +// S3Client wraps the AWS S3 service client. +type S3Client interface { + CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) + UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) + AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) + CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) +} + +// YoutubeClient wraps the youtube.Client client. +type YoutubeClient interface { + GetVideoContext(context.Context, string) (*youtubev2.Video, error) + GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error) +} + +// MediaSetService exposes logical flows handling MediaSets. +type MediaSetService struct { + store Store + youtube YoutubeClient + s3 S3Client +} + +func NewMediaSetService(store Store, youtubeClient YoutubeClient, s3Client S3Client) *MediaSetService { + return &MediaSetService{ + store: store, + youtube: youtubeClient, + s3: s3Client, + } +} + +// Get fetches the metadata for a given MediaSet source. +func (s *MediaSetService) Get(ctx context.Context, youtubeID string) (*MediaSet, error) { + var ( + mediaSet *MediaSet + err error + ) + + mediaSet, err = s.findMediaSet(ctx, youtubeID) + if err != nil { + return nil, fmt.Errorf("error getting existing media set: %v", err) + } + + if mediaSet == nil { + mediaSet, err = s.createMediaSet(ctx, youtubeID) + if err != nil { + return nil, fmt.Errorf("error getting new media set: %v", err) + } + } + + return mediaSet, nil +} + +func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) { + video, err := s.youtube.GetVideoContext(ctx, youtubeID) + if err != nil { + return nil, fmt.Errorf("error fetching video: %v", err) + } + + if len(video.Formats) == 0 { + return nil, errors.New("no format available") + } + + audioMetadata, err := s.fetchAudioMetadata(ctx, video) + if err != nil { + return nil, fmt.Errorf("error fetching audio metadata: %v", err) + } + videoMetadata, err := s.fetchVideoMetadata(ctx, video) + if err != nil { + return nil, fmt.Errorf("error fetching video metadata: %v", err) + } + + params := store.CreateMediaSetParams{ + YoutubeID: youtubeID, + AudioYoutubeItag: int32(audioMetadata.YoutubeItag), + AudioChannels: int32(audioMetadata.Channels), + AudioFramesApprox: audioMetadata.ApproxFrames, + AudioSampleRateRaw: int32(audioMetadata.SampleRate), + AudioMimeTypeEncoded: audioMetadata.MimeType, + VideoYoutubeItag: int32(videoMetadata.YoutubeItag), + VideoMimeType: videoMetadata.MimeType, + VideoDurationNanos: videoMetadata.Duration.Nanoseconds(), + } + mediaSet, err := s.store.CreateMediaSet(ctx, params) + if err != nil { + return nil, fmt.Errorf("error creating media set in store: %v", err) + } + + return &MediaSet{ + ID: mediaSet.ID.String(), + YoutubeID: youtubeID, + Audio: audioMetadata, + Video: videoMetadata, + }, nil +} + +// findMediaSet fetches a record from the database, returning (nil, nil) if it does not exist. +func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) { + mediaSet, err := s.store.GetMediaSetByYoutubeID(ctx, youtubeID) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, fmt.Errorf("error getting existing media set: %v", err) + } + + var frames int64 + if mediaSet.AudioFramesRaw.Valid { + frames = mediaSet.AudioFramesRaw.Int64 + } + + return &MediaSet{ + Audio: Audio{ + YoutubeItag: int(mediaSet.AudioYoutubeItag), + Bytes: 0, // DEPRECATED + Channels: int(mediaSet.AudioChannels), + ApproxFrames: int64(mediaSet.AudioFramesApprox), + Frames: frames, + SampleRate: int(mediaSet.AudioSampleRateRaw), + }, + Video: Video{ + YoutubeItag: int(mediaSet.VideoYoutubeItag), + Bytes: 0, // DEPRECATED? + Duration: time.Duration(mediaSet.VideoDurationNanos), + ThumbnailWidth: 0, // ?? + ThumbnailHeight: 0, // ?? + }, + YoutubeID: mediaSet.YoutubeID, + }, nil +} + +func (s *MediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) { + formats := FilterYoutubeVideo(video.Formats) + if len(video.Formats) == 0 { + return Video{}, errors.New("no format available") + } + format := formats[0] + + durationMsecs, err := strconv.Atoi(format.ApproxDurationMs) + if err != nil { + return Video{}, fmt.Errorf("could not parse video duration: %s", err) + } + + return Video{ + YoutubeItag: format.ItagNo, + MimeType: format.MimeType, + Bytes: format.ContentLength, + ThumbnailWidth: thumbnailWidth, + ThumbnailHeight: thumbnailHeight, + Duration: time.Duration(durationMsecs) * time.Millisecond, + }, nil +} + +func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) { + formats := FilterYoutubeAudio(video.Formats) + if len(video.Formats) == 0 { + return Audio{}, errors.New("no format available") + } + format := formats[0] + + sampleRate, err := strconv.Atoi(format.AudioSampleRate) + if err != nil { + return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate) + } + + approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs) + if err != nil { + return Audio{}, fmt.Errorf("could not parse audio duration: %s", err) + } + approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond + approxFrames := int64(approxDuration/time.Second) * int64(sampleRate) + + return Audio{ + MimeType: format.MimeType, + YoutubeItag: format.ItagNo, + ApproxFrames: approxFrames, + Channels: format.AudioChannels, + SampleRate: sampleRate, + }, nil +} + +// GetAudio fetches the audio part of a MediaSet. +func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins int) (GetAudioProgressReader, error) { + mediaSet, err := s.store.GetMediaSet(ctx, id) + if err != nil { + return nil, fmt.Errorf("error getting media set: %v", err) + } + + video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) + if err != nil { + return nil, fmt.Errorf("error fetching video: %v", err) + } + + format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag)) + if format == nil { + return nil, fmt.Errorf("error finding itag: %v", err) + } + + stream, _, err := s.youtube.GetStreamContext(ctx, video, format) + if err != nil { + return nil, fmt.Errorf("error fetching stream: %v", err) + } + + // wrap it in a progress reader + progressStream := &progressReader{Reader: stream, label: "audio", exp: int(format.ContentLength)} + + ffmpegReader, err := newFfmpegReader(ctx, progressStream, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") + if err != nil { + return nil, fmt.Errorf("error creating ffmpegreader: %v", err) + } + + // set up uploader, this is writer 1 + uploader, err := newMultipartUploadWriter( + ctx, + s.s3, + s3Bucket, + fmt.Sprintf("media_sets/%s/audio.webm", id), + "application/octet-stream", + ) + if err != nil { + return nil, fmt.Errorf("error creating uploader: %v", err) + } + + fetchAudioProgressReader := newGetAudioProgressReader( + int64(mediaSet.AudioFramesApprox), + format.AudioChannels, + 100, + ) + + state := fetchAudioState{ + fetchAudioProgressReader: fetchAudioProgressReader, + ffmpegReader: ffmpegReader, + uploader: uploader, + } + go state.run(ctx) + + return &state, nil +} + +type fetchAudioState struct { + *fetchAudioProgressReader + + ffmpegReader io.ReadCloser + uploader *multipartUploadWriter +} + +func (s *fetchAudioState) run(ctx context.Context) { + mw := io.MultiWriter(s, s.uploader) + done := make(chan error) + var err error + + go func() { + _, copyErr := io.Copy(mw, s.ffmpegReader) + done <- copyErr + }() + +outer: + for { + select { + case <-ctx.Done(): + err = ctx.Err() + break outer + case err = <-done: + break outer + } + } + + if readerErr := s.ffmpegReader.Close(); readerErr != nil { + if err == nil { + err = readerErr + } + } + + if err == nil { + if uploaderErr := s.uploader.Complete(); uploaderErr != nil { + err = uploaderErr + } + } + + if err != nil { + newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + if abortUploadErr := s.uploader.Abort(newCtx); abortUploadErr != nil { + log.Printf("error aborting uploader: %v", abortUploadErr) + } + s.Abort(err) + return + } + + if iterErr := s.Close(); iterErr != nil { + log.Printf("error closing peak iterator: %v", iterErr) + } } diff --git a/backend/server/handlers.go b/backend/server/handlers.go index 5018723..474b60a 100644 --- a/backend/server/handlers.go +++ b/backend/server/handlers.go @@ -1,95 +1,83 @@ package server -import ( - "encoding/json" - "log" - "net/http" - "strconv" +// // getMediaSet is a handler that responds with a MediaSet. +// func getMediaSet(c echo.Context) error { +// videoID := c.Param("id") +// mediaSet := media.NewMediaSet(videoID) - "git.netflux.io/rob/clipper/media" - "git.netflux.io/rob/clipper/youtube" - youtubev2 "github.com/kkdai/youtube/v2" - "github.com/labstack/echo/v4" -) +// if mediaSet.Exists() { +// if err := mediaSet.Load(); err != nil { +// log.Printf("error loading MediaSet: %v", err) +// return echo.NewHTTPError(http.StatusInternalServerError, "could not fetch media set") +// } +// return c.JSON(http.StatusOK, mediaSet) +// } -// getMediaSet is a handler that responds with a MediaSet. -func getMediaSet(c echo.Context) error { - videoID := c.Param("id") - mediaSet := media.NewMediaSet(videoID) +// var youtubeClient youtubev2.Client +// downloader := youtube.NewDownloader(&youtubeClient) +// mediaSet, err := downloader.Download(c.Request().Context(), videoID) +// if err != nil { +// log.Printf("error downloading MediaSet: %v", err) +// return echo.NewHTTPError(http.StatusInternalServerError, "could not fetch media set") +// } +// return c.JSON(http.StatusOK, mediaSet) +// } - if mediaSet.Exists() { - if err := mediaSet.Load(); err != nil { - log.Printf("error loading MediaSet: %v", err) - return echo.NewHTTPError(http.StatusInternalServerError, "could not fetch media set") - } - return c.JSON(http.StatusOK, mediaSet) - } +// // getThumbnails is a handler that responds with a MediaSet thumbnail grid. +// func getThumbnails(c echo.Context) error { +// videoID := c.Param("id") +// mediaSet := media.NewMediaSet(videoID) +// if err := mediaSet.Load(); err != nil { +// log.Printf("error loading MediaSet: %v", err) +// return echo.NewHTTPError(http.StatusInternalServerError, "could not load media set") +// } - var youtubeClient youtubev2.Client - downloader := youtube.NewDownloader(&youtubeClient) - mediaSet, err := downloader.Download(c.Request().Context(), videoID) - if err != nil { - log.Printf("error downloading MediaSet: %v", err) - return echo.NewHTTPError(http.StatusInternalServerError, "could not fetch media set") - } - return c.JSON(http.StatusOK, mediaSet) -} +// return c.File(mediaSet.ThumbnailPath()) +// } -// getThumbnails is a handler that responds with a MediaSet thumbnail grid. -func getThumbnails(c echo.Context) error { - videoID := c.Param("id") - mediaSet := media.NewMediaSet(videoID) - if err := mediaSet.Load(); err != nil { - log.Printf("error loading MediaSet: %v", err) - return echo.NewHTTPError(http.StatusInternalServerError, "could not load media set") - } +// // getVideo is a handler that responds with the video file for a MediaSet +// func getVideo(c echo.Context) error { +// videoID := c.Param("id") +// mediaSet := media.NewMediaSet(videoID) +// if err := mediaSet.Load(); err != nil { +// log.Printf("error loading MediaSet: %v", err) +// return echo.NewHTTPError(http.StatusInternalServerError, "could not load media set") +// } - return c.File(mediaSet.ThumbnailPath()) -} +// return c.File(mediaSet.VideoPath()) +// } -// getVideo is a handler that responds with the video file for a MediaSet -func getVideo(c echo.Context) error { - videoID := c.Param("id") - mediaSet := media.NewMediaSet(videoID) - if err := mediaSet.Load(); err != nil { - log.Printf("error loading MediaSet: %v", err) - return echo.NewHTTPError(http.StatusInternalServerError, "could not load media set") - } +// // getPeaks is a handler that returns a two-dimensional array of peaks, with +// // the number of bins matching the provided parameter. +// func getPeaks(c echo.Context) error { +// videoID := c.Param("id") - return c.File(mediaSet.VideoPath()) -} +// start, err := strconv.ParseInt(c.QueryParam("start"), 0, 64) +// if err != nil { +// return echo.NewHTTPError(http.StatusBadRequest, "invalid start parameter provided") +// } -// getPeaks is a handler that returns a two-dimensional array of peaks, with -// the number of bins matching the provided parameter. -func getPeaks(c echo.Context) error { - videoID := c.Param("id") +// end, err := strconv.ParseInt(c.QueryParam("end"), 0, 64) +// if err != nil { +// return echo.NewHTTPError(http.StatusBadRequest, "invalid end parameter provided") +// } - start, err := strconv.ParseInt(c.QueryParam("start"), 0, 64) - if err != nil { - return echo.NewHTTPError(http.StatusBadRequest, "invalid start parameter provided") - } +// numBins, err := strconv.Atoi(c.QueryParam("bins")) +// if err != nil { +// return echo.NewHTTPError(http.StatusBadRequest, "invalid bins parameter provided") +// } - end, err := strconv.ParseInt(c.QueryParam("end"), 0, 64) - if err != nil { - return echo.NewHTTPError(http.StatusBadRequest, "invalid end parameter provided") - } +// mediaSet := media.NewMediaSet(videoID) +// if err = mediaSet.Load(); err != nil { +// log.Printf("error loading MediaSet: %v", err) +// return echo.NewHTTPError(http.StatusInternalServerError, "could not load media set") +// } - numBins, err := strconv.Atoi(c.QueryParam("bins")) - if err != nil { - return echo.NewHTTPError(http.StatusBadRequest, "invalid bins parameter provided") - } +// peaks, err := mediaSet.Peaks(start, end, numBins) +// if err != nil { +// log.Printf("error generating peaks: %v", err) +// return echo.NewHTTPError(http.StatusInternalServerError, "could not generate peaks") +// } - mediaSet := media.NewMediaSet(videoID) - if err = mediaSet.Load(); err != nil { - log.Printf("error loading MediaSet: %v", err) - return echo.NewHTTPError(http.StatusInternalServerError, "could not load media set") - } - - peaks, err := mediaSet.Peaks(start, end, numBins) - if err != nil { - log.Printf("error generating peaks: %v", err) - return echo.NewHTTPError(http.StatusInternalServerError, "could not generate peaks") - } - - return json.NewEncoder(c.Response()).Encode(peaks) -} +// return json.NewEncoder(c.Response()).Encode(peaks) +// } diff --git a/backend/server/server.go b/backend/server/server.go index 2c2af7c..cb998df 100644 --- a/backend/server/server.go +++ b/backend/server/server.go @@ -2,6 +2,7 @@ package server import ( "context" + "fmt" "io" "log" "net/http" @@ -11,39 +12,60 @@ import ( pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set" "git.netflux.io/rob/clipper/media" "git.netflux.io/rob/clipper/youtube" + "github.com/google/uuid" "github.com/improbable-eng/grpc-web/go/grpcweb" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" "google.golang.org/protobuf/types/known/durationpb" ) +type ResponseError struct { + error + + ResponseCode codes.Code +} + +func (r *ResponseError) Error() string { + return fmt.Sprintf("An unexpected error occurred: %v (error code = %d).", r.error.Error(), r.ResponseCode) +} + +func (r *ResponseError) Unwrap() error { + return r.error +} + +func newResponseError(err error, code codes.Code) *ResponseError { + return &ResponseError{error: err, ResponseCode: code} +} + type Options struct { BindAddr string Timeout time.Duration + Store media.Store YoutubeClient youtube.YoutubeClient S3Client media.S3Client } const ( - fetchAudioTimeout = time.Minute * 5 + getAudioTimeout = time.Minute * 5 ) -// fetchMediaSetServiceController implements gRPC controller for FetchMediaSetService -type fetchMediaSetServiceController struct { - pbMediaSet.UnimplementedFetchServiceServer +// mediaSetServiceController implements gRPC controller for MediaSetService +type mediaSetServiceController struct { + pbMediaSet.UnimplementedMediaSetServiceServer - fetchMediaSetService *media.FetchMediaSetService + mediaSetService *media.MediaSetService } -// Fetch fetches a pbMediaSet.MediaSet -func (c *fetchMediaSetServiceController) Fetch(ctx context.Context, request *pbMediaSet.FetchRequest) (*pbMediaSet.MediaSet, error) { - mediaSet, err := c.fetchMediaSetService.Fetch(ctx, request.GetId()) +// Get returns a pbMediaSet.MediaSet +func (c *mediaSetServiceController) Get(ctx context.Context, request *pbMediaSet.GetRequest) (*pbMediaSet.MediaSet, error) { + mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId()) if err != nil { - return nil, err + return nil, newResponseError(err, codes.Unknown) } result := pbMediaSet.MediaSet{ - Id: mediaSet.ID, + Id: mediaSet.YoutubeID, Audio: &pbMediaSet.MediaSet_Audio{ Bytes: mediaSet.Audio.Bytes, Channels: int32(mediaSet.Audio.Channels), @@ -61,14 +83,19 @@ func (c *fetchMediaSetServiceController) Fetch(ctx context.Context, request *pbM return &result, nil } -// TODO: wrap errors -func (c *fetchMediaSetServiceController) FetchAudio(request *pbMediaSet.FetchAudioRequest, stream pbMediaSet.FetchService_FetchAudioServer) error { - ctx, cancel := context.WithTimeout(context.Background(), fetchAudioTimeout) +// GetAudio streams the progress report of GetAudio. +func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest, stream pbMediaSet.MediaSetService_GetAudioServer) error { + ctx, cancel := context.WithTimeout(context.Background(), getAudioTimeout) defer cancel() - reader, err := c.fetchMediaSetService.FetchAudio(ctx, request.GetId(), int(request.GetNumBins())) + id, err := uuid.Parse(request.GetId()) if err != nil { - return err + return newResponseError(err, codes.Unknown) + } + + reader, err := c.mediaSetService.GetAudio(ctx, id, int(request.GetNumBins())) + if err != nil { + return newResponseError(err, codes.Unknown) } for { @@ -77,7 +104,7 @@ func (c *fetchMediaSetServiceController) FetchAudio(request *pbMediaSet.FetchAud if err == io.EOF { break } - return err + return newResponseError(err, codes.Unknown) } // TODO: consider using int32 throughout the backend flow to avoid this. @@ -86,7 +113,7 @@ func (c *fetchMediaSetServiceController) FetchAudio(request *pbMediaSet.FetchAud peaks[i] = int32(p) } - progressPb := pbMediaSet.FetchAudioProgress{ + progressPb := pbMediaSet.GetAudioProgress{ PercentCompleted: progress.PercentComplete, Peaks: peaks, } @@ -100,9 +127,9 @@ func (c *fetchMediaSetServiceController) FetchAudio(request *pbMediaSet.FetchAud func Start(options Options) error { grpcServer := grpc.NewServer() - fetchMediaSetService := media.NewFetchMediaSetService(options.YoutubeClient, options.S3Client) + fetchMediaSetService := media.NewMediaSetService(options.Store, options.YoutubeClient, options.S3Client) - pbMediaSet.RegisterFetchServiceServer(grpcServer, &fetchMediaSetServiceController{fetchMediaSetService: fetchMediaSetService}) + pbMediaSet.RegisterMediaSetServiceServer(grpcServer, &mediaSetServiceController{mediaSetService: fetchMediaSetService}) grpclog.SetLogger(log.New(os.Stdout, "server: ", log.LstdFlags)) // TODO: proper CORS support diff --git a/backend/sql/migrations/1635710597_create_media_sets_table.down.sql b/backend/sql/migrations/1635710597_create_media_sets_table.down.sql new file mode 100644 index 0000000..cadc8d6 --- /dev/null +++ b/backend/sql/migrations/1635710597_create_media_sets_table.down.sql @@ -0,0 +1,3 @@ +DROP TABLE media_sets; + +DROP EXTENSION pgcrypto; diff --git a/backend/sql/migrations/1635710597_create_media_sets_table.up.sql b/backend/sql/migrations/1635710597_create_media_sets_table.up.sql new file mode 100644 index 0000000..7a03ade --- /dev/null +++ b/backend/sql/migrations/1635710597_create_media_sets_table.up.sql @@ -0,0 +1,39 @@ +CREATE EXTENSION pgcrypto; + +CREATE TABLE media_sets ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + youtube_id CHARACTER VARYING(32) NOT NULL, + audio_youtube_itag int NOT NULL, + audio_channels int NOT NULL, + audio_frames_approx bigint NOT NULL, + audio_frames bigint, + audio_sample_rate int NOT NULL, + audio_s3_bucket CHARACTER VARYING(256), + audio_s3_key CHARACTER VARYING(256), + audio_s3_uploaded_at TIMESTAMP WITH TIME ZONE, + audio_mime_type_encoded CHARACTER VARYING(256) NOT NULL, + video_youtube_itag int NOT NULL, + video_s3_bucket CHARACTER VARYING(256), + video_s3_key CHARACTER VARYING(256), + video_s3_uploaded_at TIMESTAMP WITH TIME ZONE, + video_mime_type CHARACTER VARYING(256) NOT NULL, + video_duration_nanos bigint NOT NULL, + video_thumbnail_s3_bucket CHARACTER VARYING(256), + video_thumbnail_s3_key CHARACTER VARYING(256), + video_thumbnail_s3_uploaded_at TIMESTAMP WITH TIME ZONE, + video_thumbnail_mime_type CHARACTER VARYING(256), + video_thumbnail_width int DEFAULT 0, + video_thumbnail_height int DEFAULT 0, + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + updated_at TIMESTAMP WITH TIME ZONE NOT NULL +); + +CREATE UNIQUE INDEX index_media_sets_on_youtube_id ON media_sets (youtube_id); + +ALTER TABLE media_sets ADD CONSTRAINT check_audio_youtube_itag_gt_0 CHECK (audio_youtube_itag > 0); +ALTER TABLE media_sets ADD CONSTRAINT check_audio_frames_gt_0 CHECK (audio_frames > 0); +ALTER TABLE media_sets ADD CONSTRAINT check_audio_frames_approx_gt_0 CHECK (audio_frames_approx > 0); +ALTER TABLE media_sets ADD CONSTRAINT check_audio_channels_gt_0 CHECK (audio_channels > 0); +ALTER TABLE media_sets ADD CONSTRAINT check_audio_sample_rate_gt_0 CHECK (audio_sample_rate > 0); +ALTER TABLE media_sets ADD CONSTRAINT check_video_youtube_itag_gt_0 CHECK (video_youtube_itag > 0); +ALTER TABLE media_sets ADD CONSTRAINT check_video_duration_nanos_gt_0 CHECK (video_duration_nanos > 0); diff --git a/backend/sql/queries.sql b/backend/sql/queries.sql new file mode 100644 index 0000000..dc519bb --- /dev/null +++ b/backend/sql/queries.sql @@ -0,0 +1,10 @@ +-- name: GetMediaSet :one +SELECT * FROM media_sets WHERE id = $1; + +-- name: GetMediaSetByYoutubeID :one +SELECT * FROM media_sets WHERE youtube_id = $1; + +-- name: CreateMediaSet :one +INSERT INTO media_sets (youtube_id, audio_youtube_itag, audio_channels, audio_frames_approx, audio_sample_rate_raw, audio_mime_type_encoded, video_youtube_itag, video_mime_type, video_duration_nanos, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW(), NOW()) + RETURNING *; diff --git a/backend/sqlc.yaml b/backend/sqlc.yaml new file mode 100644 index 0000000..c869f59 --- /dev/null +++ b/backend/sqlc.yaml @@ -0,0 +1,6 @@ +version: 1 +packages: + - path: "generated/store" + engine: "postgresql" + schema: "sql/migrations" + queries: "sql/queries.sql" diff --git a/backend/youtube/youtube2.go b/backend/youtube/youtube2.go index 46be1a2..d24ca96 100644 --- a/backend/youtube/youtube2.go +++ b/backend/youtube/youtube2.go @@ -53,7 +53,7 @@ func (s *MediaSetService) GetMediaSet(ctx context.Context, id string) (*media.Me } return &media.MediaSet{ - ID: "", + YoutubeID: "", Audio: media.Audio{ Bytes: audioFormat.ContentLength, Channels: audioFormat.AudioChannels, diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 0c871c1..73deabb 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -1,12 +1,12 @@ import { grpc } from '@improbable-eng/grpc-web'; import { MediaSet as MediaSetPb, - FetchRequest, - FetchAudioRequest, - FetchAudioProgress, + GetRequest, + GetAudioRequest, + GetAudioProgress, } from './generated/media_set_pb'; -import { FetchMediaSet, FetchMediaSetAudio } from './GrpcWrapper'; +import { GetMediaSet } from './GrpcWrapper'; import { useState, useEffect } from 'react'; import { VideoPreview } from './VideoPreview'; @@ -65,20 +65,20 @@ function App(): JSX.Element { // fetch mediaset on page load: useEffect(() => { (async function () { - const request = new FetchRequest(); - request.setId(videoID); + const request = new GetRequest(); + request.setYoutubeId(videoID); - const mediaSet = await FetchMediaSet(grpcHost, request); + const mediaSet = await GetMediaSet(grpcHost, request); console.log('got media set:', mediaSet); - const handleProgress = (progress: FetchAudioProgress) => { - console.log('got progress', progress); - }; + // const handleProgress = (progress: GetAudioProgress) => { + // console.log('got progress', progress); + // }; - const audioRequest = new FetchAudioRequest(); - audioRequest.setId(videoID); - audioRequest.setNumBins(1000); - FetchMediaSetAudio(grpcHost, audioRequest, handleProgress); + // const audioRequest = new GetAudioRequest(); + // audioRequest.setId(videoID); + // audioRequest.setNumBins(1000); + // GetMediaSetAudio(grpcHost, audioRequest, handleProgress); // console.log('fetching media...'); // const resp = await fetch( diff --git a/frontend/src/GrpcWrapper.tsx b/frontend/src/GrpcWrapper.tsx index 1ad8876..ab02d9f 100644 --- a/frontend/src/GrpcWrapper.tsx +++ b/frontend/src/GrpcWrapper.tsx @@ -1,20 +1,20 @@ import { grpc } from '@improbable-eng/grpc-web'; -import { FetchService } from './generated/media_set_pb_service'; +import { MediaSetService } from './generated/media_set_pb_service'; import { MediaSet, - FetchRequest, - FetchAudioProgress, - FetchAudioRequest, + GetRequest, + GetAudioProgress, + GetAudioRequest, } from './generated/media_set_pb'; -export const FetchMediaSet = ( +export const GetMediaSet = ( host: string, - request: FetchRequest + request: GetRequest ): Promise => { return new Promise((resolve, reject) => { let result: MediaSet; - grpc.invoke(FetchService.Fetch, { + grpc.invoke(MediaSetService.Get, { host: host, request: request, onMessage: (mediaSet: MediaSet) => { @@ -35,21 +35,21 @@ export const FetchMediaSet = ( }); }; -export const FetchMediaSetAudio = ( - host: string, - request: FetchAudioRequest, - onProgress: { (progress: FetchAudioProgress): void } -) => { - grpc.invoke(FetchService.FetchAudio, { - host: 'http://localhost:8888', - request: request, - onMessage: onProgress, - onEnd: ( - code: grpc.Code, - msg: string | undefined, - trailers: grpc.Metadata - ) => { - console.log('fetch audio request ended'); - }, - }); -}; +// export const etchMediaSetAudio = ( +// host: string, +// request: FetchAudioRequest, +// onProgress: { (progress: FetchAudioProgress): void } +// ) => { +// grpc.invoke(FetchService.FetchAudio, { +// host: 'http://localhost:8888', +// request: request, +// onMessage: onProgress, +// onEnd: ( +// code: grpc.Code, +// msg: string | undefined, +// trailers: grpc.Metadata +// ) => { +// console.log('fetch audio request ended'); +// }, +// }); +// }; diff --git a/proto/media_set.proto b/proto/media_set.proto index 7ef745c..3206865 100644 --- a/proto/media_set.proto +++ b/proto/media_set.proto @@ -27,21 +27,21 @@ message MediaSet { bool loaded = 4; }; -message FetchAudioProgress { +message GetAudioProgress { float percent_completed = 2; repeated int32 peaks = 1; } -message FetchRequest { - string id = 1; +message GetRequest { + string youtube_id = 1; } -message FetchAudioRequest { +message GetAudioRequest { string id = 1; int32 num_bins = 2; } -service FetchService { - rpc Fetch(FetchRequest) returns (MediaSet) {} - rpc FetchAudio(FetchAudioRequest) returns (stream FetchAudioProgress) {} +service MediaSetService { + rpc Get(GetRequest) returns (MediaSet) {} + rpc GetAudio(GetAudioRequest) returns (stream GetAudioProgress) {} }