Introduce PG store

This commit is contained in:
Rob Watson 2021-11-01 06:28:40 +01:00
parent 281d5ce8a2
commit 7c5b22a407
21 changed files with 628 additions and 468 deletions

2
.gitignore vendored
View File

@ -1,4 +1,4 @@
*.m4a /backend/.env
/backend/cache/ /backend/cache/
/backend/debug/ /backend/debug/

View File

@ -1,2 +0,0 @@
AWS_ACCESS_KEY_ID=AKIARZPRT6YGKUMKQPV5
AWS_SECRET_ACCESS_KEY=P8zJInhiHoXT4NV0gFMNHy8XVN285CqfOSCeaCHX

4
backend/.env.example Normal file
View File

@ -0,0 +1,4 @@
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
DATABASE_URL=

View File

@ -2,13 +2,17 @@ package main
import ( import (
"context" "context"
"database/sql"
"log" "log"
"os"
"time" "time"
"git.netflux.io/rob/clipper/generated/store"
"git.netflux.io/rob/clipper/server" "git.netflux.io/rob/clipper/server"
"github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/kkdai/youtube/v2" "github.com/kkdai/youtube/v2"
_ "github.com/lib/pq"
) )
const ( const (
@ -19,12 +23,20 @@ const (
func main() { func main() {
ctx := context.Background() ctx := context.Background()
// Create a store
databaseURL := os.Getenv("DATABASE_URL")
log.Printf("DATABASE_URL = %s", databaseURL)
db, err := sql.Open("postgres", databaseURL)
if err != nil {
log.Fatal(err)
}
store := store.New(db)
// Create an Amazon S3 service s3Client
cfg, err := config.LoadDefaultConfig(ctx) cfg, err := config.LoadDefaultConfig(ctx)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
// Create an Amazon S3 service s3Client
s3Client := s3.NewFromConfig(cfg) s3Client := s3.NewFromConfig(cfg)
// Create a Youtube client // Create a Youtube client
@ -33,6 +45,7 @@ func main() {
serverOptions := server.Options{ serverOptions := server.Options{
BindAddr: DefaultHTTPBindAddr, BindAddr: DefaultHTTPBindAddr,
Timeout: DefaultTimeout, Timeout: DefaultTimeout,
Store: store,
YoutubeClient: &youtubeClient, YoutubeClient: &youtubeClient,
S3Client: s3Client, S3Client: s3Client,
} }

View File

@ -2,13 +2,18 @@ package main
import ( import (
"context" "context"
"database/sql"
"io" "io"
"log" "log"
"os"
"git.netflux.io/rob/clipper/generated/store"
"git.netflux.io/rob/clipper/media" "git.netflux.io/rob/clipper/media"
"github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid"
"github.com/kkdai/youtube/v2" "github.com/kkdai/youtube/v2"
_ "github.com/lib/pq"
) )
const ( const (
@ -18,22 +23,30 @@ const (
func main() { func main() {
ctx := context.Background() ctx := context.Background()
// Create a store
databaseURL := os.Getenv("DATABASE_URL")
db, err := sql.Open("postgres", databaseURL)
if err != nil {
log.Fatal(err)
}
store := store.New(db)
// Create an Amazon S3 service s3Client
cfg, err := config.LoadDefaultConfig(ctx) cfg, err := config.LoadDefaultConfig(ctx)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
// Create an Amazon S3 service s3Client
s3Client := s3.NewFromConfig(cfg) s3Client := s3.NewFromConfig(cfg)
// Create a Youtube client // Create a Youtube client
var youtubeClient youtube.Client var youtubeClient youtube.Client
// Create a VideoFetchService // Create a MediaSetService
fetchService := media.NewFetchMediaSetService(&youtubeClient, s3Client) mediaSetService := media.NewMediaSetService(store, &youtubeClient, s3Client)
// Create a progressReader // Create a progressReader
progressReader, err := fetchService.FetchAudio(ctx, videoID) // TODO: fix
progressReader, err := mediaSetService.GetAudio(ctx, uuid.New(), 100)
if err != nil { if err != nil {
log.Fatalf("error calling fetch service: %v", err) log.Fatalf("error calling fetch service: %v", err)
} }

View File

@ -3,6 +3,7 @@ module git.netflux.io/rob/clipper
go 1.17 go 1.17
require ( require (
github.com/google/uuid v1.1.2
github.com/improbable-eng/grpc-web v0.14.1 github.com/improbable-eng/grpc-web v0.14.1
github.com/kkdai/youtube/v2 v2.7.4 github.com/kkdai/youtube/v2 v2.7.4
github.com/labstack/echo/v4 v4.6.0 github.com/labstack/echo/v4 v4.6.0
@ -30,6 +31,7 @@ require (
github.com/golang/protobuf v1.5.2 // indirect github.com/golang/protobuf v1.5.2 // indirect
github.com/klauspost/compress v1.11.7 // indirect github.com/klauspost/compress v1.11.7 // indirect
github.com/labstack/gommon v0.3.0 // indirect github.com/labstack/gommon v0.3.0 // indirect
github.com/lib/pq v1.10.3 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect github.com/mattn/go-isatty v0.0.14 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect

View File

@ -252,6 +252,7 @@ github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
@ -341,6 +342,8 @@ github.com/labstack/gommon v0.3.0 h1:JEeO0bvc78PKdyHxloTKiF8BD5iGrH8T6MSeGvSgob0
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg=
github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=

View File

@ -7,13 +7,13 @@ import (
"io" "io"
) )
type FetchAudioProgress struct { type GetAudioProgress struct {
PercentComplete float32 PercentComplete float32
Peaks []int16 Peaks []int16
} }
type FetchAudioProgressReader interface { type GetAudioProgressReader interface {
Read() (FetchAudioProgress, error) Read() (GetAudioProgress, error)
Close() error Close() error
} }
@ -29,19 +29,19 @@ type fetchAudioProgressReader struct {
currPeaks []int16 currPeaks []int16
currCount int currCount int
framesProcessed int framesProcessed int
progress chan FetchAudioProgress progress chan GetAudioProgress
errorChan chan error errorChan chan error
} }
// TODO: validate inputs, debugging is confusing otherwise // TODO: validate inputs, debugging is confusing otherwise
func newFetchAudioProgressReader(framesExpected int64, channels, numBins int) *fetchAudioProgressReader { func newGetAudioProgressReader(framesExpected int64, channels, numBins int) *fetchAudioProgressReader {
return &fetchAudioProgressReader{ return &fetchAudioProgressReader{
channels: channels, channels: channels,
framesExpected: framesExpected, framesExpected: framesExpected,
framesPerBin: int(framesExpected / int64(numBins)), framesPerBin: int(framesExpected / int64(numBins)),
samples: make([]int16, 8_192), samples: make([]int16, 8_192),
currPeaks: make([]int16, channels), currPeaks: make([]int16, channels),
progress: make(chan FetchAudioProgress), progress: make(chan GetAudioProgress),
errorChan: make(chan error, 1), errorChan: make(chan error, 1),
} }
} }
@ -90,7 +90,7 @@ func (w *fetchAudioProgressReader) Write(p []byte) (int, error) {
} }
func (w *fetchAudioProgressReader) nextBin() { func (w *fetchAudioProgressReader) nextBin() {
var progress FetchAudioProgress var progress GetAudioProgress
// TODO: avoid an allocation? // TODO: avoid an allocation?
progress.Peaks = append(progress.Peaks, w.currPeaks...) progress.Peaks = append(progress.Peaks, w.currPeaks...)
progress.PercentComplete = (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0 progress.PercentComplete = (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0
@ -105,16 +105,16 @@ func (w *fetchAudioProgressReader) nextBin() {
w.framesProcessed++ w.framesProcessed++
} }
func (w *fetchAudioProgressReader) Read() (FetchAudioProgress, error) { func (w *fetchAudioProgressReader) Read() (GetAudioProgress, error) {
for { for {
select { select {
case progress, ok := <-w.progress: case progress, ok := <-w.progress:
if !ok { if !ok {
return FetchAudioProgress{}, io.EOF return GetAudioProgress{}, io.EOF
} }
return progress, nil return progress, nil
case err := <-w.errorChan: case err := <-w.errorChan:
return FetchAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err) return GetAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err)
} }
} }
} }

View File

@ -1,268 +0,0 @@
package media
import (
"context"
"errors"
"fmt"
"io"
"log"
"strconv"
"time"
"github.com/aws/aws-sdk-go-v2/service/s3"
youtubev2 "github.com/kkdai/youtube/v2"
)
const s3Bucket = "clipper-development"
const (
rawAudioCodec = "pcm_s16le"
rawAudioFormat = "s16le"
rawAudioSampleRate = 48_000
)
const (
thumbnailWidth = 177 // 16:9
thumbnailHeight = 100 // "
)
// progressReader is a reader that prints progress logs as it reads.
type progressReader struct {
io.Reader
label string
total, exp int
}
func (pw *progressReader) Read(p []byte) (int, error) {
n, err := pw.Reader.Read(p)
pw.total += n
log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float32(pw.total)/float32(pw.exp))*100.0)
return n, err
}
// S3Client wraps the AWS S3 service client.
type S3Client interface {
CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
}
// YoutubeClient wraps the youtube.Client client.
type YoutubeClient interface {
GetVideoContext(context.Context, string) (*youtubev2.Video, error)
GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error)
}
// FetchMediaSetService fetches a video via an io.Reader.
type FetchMediaSetService struct {
youtube YoutubeClient
s3 S3Client
}
func NewFetchMediaSetService(youtubeClient YoutubeClient, s3Client S3Client) *FetchMediaSetService {
return &FetchMediaSetService{
youtube: youtubeClient,
s3: s3Client,
}
}
// Fetch fetches the metadata for a given MediaSet source.
func (s *FetchMediaSetService) Fetch(ctx context.Context, id string) (*MediaSet, error) {
video, err := s.youtube.GetVideoContext(ctx, id)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
if len(video.Formats) == 0 {
return nil, errors.New("no format available")
}
audioMetadata, err := s.fetchAudioMetadata(ctx, video)
if err != nil {
return nil, fmt.Errorf("error fetching audio metadata: %v", err)
}
videoMetadata, err := s.fetchVideoMetadata(ctx, video)
if err != nil {
return nil, fmt.Errorf("error fetching video metadata: %v", err)
}
mediaSet := MediaSet{
ID: id,
Audio: audioMetadata,
Video: videoMetadata,
}
// TODO: save to JSON
return &mediaSet, nil
}
func (s *FetchMediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) {
formats := FilterYoutubeVideo(video.Formats)
if len(video.Formats) == 0 {
return Video{}, errors.New("no format available")
}
format := formats[0]
durationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
if err != nil {
return Video{}, fmt.Errorf("could not parse video duration: %s", err)
}
return Video{
Bytes: format.ContentLength,
ThumbnailWidth: thumbnailWidth,
ThumbnailHeight: thumbnailHeight,
Duration: time.Duration(durationMsecs) * time.Millisecond,
}, nil
}
func (s *FetchMediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) {
formats := FilterYoutubeAudio(video.Formats)
if len(video.Formats) == 0 {
return Audio{}, errors.New("no format available")
}
format := formats[0]
sampleRate, err := strconv.Atoi(format.AudioSampleRate)
if err != nil {
return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate)
}
approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
if err != nil {
return Audio{}, fmt.Errorf("could not parse audio duration: %s", err)
}
approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond
approxFrames := int64(approxDuration/time.Second) * int64(sampleRate)
return Audio{
// we need to decode it to be able to know bytes and frame counts exactly
ApproxFrames: approxFrames,
Channels: format.AudioChannels,
SampleRate: sampleRate,
}, nil
}
// FetchAudio fetches the audio part of a MediaSet.
func (s *FetchMediaSetService) FetchAudio(ctx context.Context, id string, numBins int) (FetchAudioProgressReader, error) {
mediaSet := NewMediaSet(id)
if !mediaSet.Exists() {
// TODO check if audio uploaded already, don't bother again
return nil, errors.New("no media set found")
}
if err := mediaSet.Load(); err != nil {
return nil, fmt.Errorf("error loading media set: %v", err)
}
video, err := s.youtube.GetVideoContext(ctx, id)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
formats := FilterYoutubeAudio(video.Formats)
if len(video.Formats) == 0 {
return nil, errors.New("no format available")
}
format := formats[0]
stream, _, err := s.youtube.GetStreamContext(ctx, video, &format)
if err != nil {
return nil, fmt.Errorf("error fetching stream: %v", err)
}
// wrap it in a progress reader
progressStream := &progressReader{Reader: stream, label: "audio", exp: int(format.ContentLength)}
ffmpegReader, err := newFfmpegReader(ctx, progressStream, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-")
if err != nil {
return nil, fmt.Errorf("error creating ffmpegreader: %v", err)
}
// set up uploader, this is writer 1
uploader, err := newMultipartUploadWriter(
ctx,
s.s3,
s3Bucket,
fmt.Sprintf("media_sets/%s/audio.webm", id),
"application/octet-stream",
)
if err != nil {
return nil, fmt.Errorf("error creating uploader: %v", err)
}
fetchAudioProgressReader := newFetchAudioProgressReader(
mediaSet.Audio.ApproxFrames,
format.AudioChannels,
100,
)
state := fetchAudioState{
fetchAudioProgressReader: fetchAudioProgressReader,
ffmpegReader: ffmpegReader,
uploader: uploader,
}
go state.run(ctx)
return &state, nil
}
type fetchAudioState struct {
*fetchAudioProgressReader
ffmpegReader io.ReadCloser
uploader *multipartUploadWriter
}
func (s *fetchAudioState) run(ctx context.Context) {
mw := io.MultiWriter(s, s.uploader)
done := make(chan error)
var err error
go func() {
_, copyErr := io.Copy(mw, s.ffmpegReader)
done <- copyErr
}()
outer:
for {
select {
case <-ctx.Done():
err = ctx.Err()
break outer
case err = <-done:
break outer
}
}
if readerErr := s.ffmpegReader.Close(); readerErr != nil {
if err == nil {
err = readerErr
}
}
if err == nil {
if uploaderErr := s.uploader.Complete(); uploaderErr != nil {
err = uploaderErr
}
}
if err != nil {
newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
if abortUploadErr := s.uploader.Abort(newCtx); abortUploadErr != nil {
log.Printf("error aborting uploader: %v", abortUploadErr)
}
s.Abort(err)
return
}
if iterErr := s.Close(); iterErr != nil {
log.Printf("error closing peak iterator: %v", iterErr)
}
}

View File

@ -18,43 +18,48 @@ type Audio struct {
Channels int `json:"channels"` Channels int `json:"channels"`
// ApproxFrames is used during initial processing when a precise frame count // ApproxFrames is used during initial processing when a precise frame count
// cannot be determined. Prefer Frames in all other cases. // cannot be determined. Prefer Frames in all other cases.
ApproxFrames int64 `json:"approx_frames"` ApproxFrames int64 `json:"approx_frames"`
Frames int64 `json:"frames"` Frames int64 `json:"frames"`
SampleRate int `json:"sample_rate"` SampleRate int `json:"sample_rate"`
YoutubeItag int `json:"youtube_itag"`
MimeType string `json:"mime_type"`
} }
type Video struct { type Video struct {
Bytes int64 `json:"bytes"` Bytes int64 `json:"bytes"`
Duration time.Duration `json:"duration"` Duration time.Duration `json:"duration"`
// not sure if this are needed any more? // not sure if this are needed any more?
ThumbnailWidth int `json:"thumbnail_width"` ThumbnailWidth int `json:"thumbnail_width"`
ThumbnailHeight int `json:"thumbnail_height"` ThumbnailHeight int `json:"thumbnail_height"`
YoutubeItag int `json:"youtube_itag"`
MimeType string `json:"mime_type"`
} }
// MediaSet represents the media and metadata associated with a single media // MediaSet represents the media and metadata associated with a single media
// resource (for example, a YouTube video). // resource (for example, a YouTube video).
type MediaSet struct { type MediaSet struct {
Audio Audio `json:"audio"` Audio Audio `json:"audio"`
Video Video `json:"video"` Video Video `json:"video"`
ID string `json:"id"` ID string `json:"id"`
YoutubeID string `json:"youtube_id"`
exists bool exists bool `json:"exists"`
} }
// New builds a new MediaSet with the given ID. // New builds a new MediaSet with the given ID.
func NewMediaSet(id string) *MediaSet { func NewMediaSet(youtubeID string) *MediaSet {
return &MediaSet{ID: id} return &MediaSet{YoutubeID: youtubeID}
} }
// TODO: pass io.Readers/Writers instead of strings. // TODO: pass io.Readers/Writers instead of strings.
func (m *MediaSet) RawAudioPath() string { return fmt.Sprintf("cache/%s.raw", m.ID) } func (m *MediaSet) RawAudioPath() string { return fmt.Sprintf("cache/%s.raw", m.YoutubeID) }
func (m *MediaSet) EncodedAudioPath() string { return fmt.Sprintf("cache/%s.m4a", m.ID) } func (m *MediaSet) EncodedAudioPath() string { return fmt.Sprintf("cache/%s.m4a", m.YoutubeID) }
func (m *MediaSet) VideoPath() string { return fmt.Sprintf("cache/%s.mp4", m.ID) } func (m *MediaSet) VideoPath() string { return fmt.Sprintf("cache/%s.mp4", m.YoutubeID) }
func (m *MediaSet) ThumbnailPath() string { return fmt.Sprintf("cache/%s.jpg", m.ID) } func (m *MediaSet) ThumbnailPath() string { return fmt.Sprintf("cache/%s.jpg", m.YoutubeID) }
func (m *MediaSet) MetadataPath() string { return fmt.Sprintf("cache/%s.json", m.ID) } func (m *MediaSet) MetadataPath() string { return fmt.Sprintf("cache/%s.json", m.YoutubeID) }
func (m *MediaSet) Exists() bool { func (m *MediaSet) Exists() bool {
if m.ID == "" { if m.YoutubeID == "" {
return false return false
} }
if m.exists { if m.exists {
@ -68,7 +73,7 @@ func (m *MediaSet) Exists() bool {
} }
func (m *MediaSet) Load() error { func (m *MediaSet) Load() error {
if m.ID == "" { if m.YoutubeID == "" {
return errors.New("error opening mediaset with blank ID") return errors.New("error opening mediaset with blank ID")
} }

View File

@ -2,28 +2,345 @@ package media
import ( import (
"context" "context"
"database/sql"
"errors" "errors"
"fmt" "fmt"
"io"
"log" "log"
"strconv"
"time"
"git.netflux.io/rob/clipper/generated/store"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid"
youtubev2 "github.com/kkdai/youtube/v2"
) )
type MediaSetService struct{} const s3Bucket = "clipper-development"
func (s *MediaSetService) GetMediaSet(ctx context.Context, source string, id string) (*MediaSet, error) { const (
log.Printf("GetMediaSet called with source %q, id %q", source, id) rawAudioCodec = "pcm_s16le"
rawAudioFormat = "s16le"
rawAudioSampleRate = 48_000
)
if source != "youtube" { const (
return nil, errors.New("unknown source") thumbnailWidth = 177 // 16:9
} thumbnailHeight = 100 // "
)
// try to load and return a cached MediaSet, if possible: // progressReader is a reader that prints progress logs as it reads.
mediaSet := NewMediaSet(id) type progressReader struct {
if mediaSet.Exists() { io.Reader
if err := mediaSet.Load(); err != nil { label string
return nil, fmt.Errorf("error loading MediaSet: %v", err) total, exp int
} }
return mediaSet, nil
} func (pw *progressReader) Read(p []byte) (int, error) {
n, err := pw.Reader.Read(p)
return &MediaSet{ID: id}, nil pw.total += n
log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float32(pw.total)/float32(pw.exp))*100.0)
return n, err
}
// Store wraps a database store.
type Store interface {
GetMediaSet(ctx context.Context, id uuid.UUID) (store.MediaSet, error)
GetMediaSetByYoutubeID(ctx context.Context, youtubeID string) (store.MediaSet, error)
CreateMediaSet(ctx context.Context, arg store.CreateMediaSetParams) (store.MediaSet, error)
}
// S3Client wraps the AWS S3 service client.
type S3Client interface {
CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
}
// YoutubeClient wraps the youtube.Client client.
type YoutubeClient interface {
GetVideoContext(context.Context, string) (*youtubev2.Video, error)
GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error)
}
// MediaSetService exposes logical flows handling MediaSets.
type MediaSetService struct {
store Store
youtube YoutubeClient
s3 S3Client
}
func NewMediaSetService(store Store, youtubeClient YoutubeClient, s3Client S3Client) *MediaSetService {
return &MediaSetService{
store: store,
youtube: youtubeClient,
s3: s3Client,
}
}
// Get fetches the metadata for a given MediaSet source.
func (s *MediaSetService) Get(ctx context.Context, youtubeID string) (*MediaSet, error) {
var (
mediaSet *MediaSet
err error
)
mediaSet, err = s.findMediaSet(ctx, youtubeID)
if err != nil {
return nil, fmt.Errorf("error getting existing media set: %v", err)
}
if mediaSet == nil {
mediaSet, err = s.createMediaSet(ctx, youtubeID)
if err != nil {
return nil, fmt.Errorf("error getting new media set: %v", err)
}
}
return mediaSet, nil
}
func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) {
video, err := s.youtube.GetVideoContext(ctx, youtubeID)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
if len(video.Formats) == 0 {
return nil, errors.New("no format available")
}
audioMetadata, err := s.fetchAudioMetadata(ctx, video)
if err != nil {
return nil, fmt.Errorf("error fetching audio metadata: %v", err)
}
videoMetadata, err := s.fetchVideoMetadata(ctx, video)
if err != nil {
return nil, fmt.Errorf("error fetching video metadata: %v", err)
}
params := store.CreateMediaSetParams{
YoutubeID: youtubeID,
AudioYoutubeItag: int32(audioMetadata.YoutubeItag),
AudioChannels: int32(audioMetadata.Channels),
AudioFramesApprox: audioMetadata.ApproxFrames,
AudioSampleRateRaw: int32(audioMetadata.SampleRate),
AudioMimeTypeEncoded: audioMetadata.MimeType,
VideoYoutubeItag: int32(videoMetadata.YoutubeItag),
VideoMimeType: videoMetadata.MimeType,
VideoDurationNanos: videoMetadata.Duration.Nanoseconds(),
}
mediaSet, err := s.store.CreateMediaSet(ctx, params)
if err != nil {
return nil, fmt.Errorf("error creating media set in store: %v", err)
}
return &MediaSet{
ID: mediaSet.ID.String(),
YoutubeID: youtubeID,
Audio: audioMetadata,
Video: videoMetadata,
}, nil
}
// findMediaSet fetches a record from the database, returning (nil, nil) if it does not exist.
func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) {
mediaSet, err := s.store.GetMediaSetByYoutubeID(ctx, youtubeID)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, fmt.Errorf("error getting existing media set: %v", err)
}
var frames int64
if mediaSet.AudioFramesRaw.Valid {
frames = mediaSet.AudioFramesRaw.Int64
}
return &MediaSet{
Audio: Audio{
YoutubeItag: int(mediaSet.AudioYoutubeItag),
Bytes: 0, // DEPRECATED
Channels: int(mediaSet.AudioChannels),
ApproxFrames: int64(mediaSet.AudioFramesApprox),
Frames: frames,
SampleRate: int(mediaSet.AudioSampleRateRaw),
},
Video: Video{
YoutubeItag: int(mediaSet.VideoYoutubeItag),
Bytes: 0, // DEPRECATED?
Duration: time.Duration(mediaSet.VideoDurationNanos),
ThumbnailWidth: 0, // ??
ThumbnailHeight: 0, // ??
},
YoutubeID: mediaSet.YoutubeID,
}, nil
}
func (s *MediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) {
formats := FilterYoutubeVideo(video.Formats)
if len(video.Formats) == 0 {
return Video{}, errors.New("no format available")
}
format := formats[0]
durationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
if err != nil {
return Video{}, fmt.Errorf("could not parse video duration: %s", err)
}
return Video{
YoutubeItag: format.ItagNo,
MimeType: format.MimeType,
Bytes: format.ContentLength,
ThumbnailWidth: thumbnailWidth,
ThumbnailHeight: thumbnailHeight,
Duration: time.Duration(durationMsecs) * time.Millisecond,
}, nil
}
func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) {
formats := FilterYoutubeAudio(video.Formats)
if len(video.Formats) == 0 {
return Audio{}, errors.New("no format available")
}
format := formats[0]
sampleRate, err := strconv.Atoi(format.AudioSampleRate)
if err != nil {
return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate)
}
approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs)
if err != nil {
return Audio{}, fmt.Errorf("could not parse audio duration: %s", err)
}
approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond
approxFrames := int64(approxDuration/time.Second) * int64(sampleRate)
return Audio{
MimeType: format.MimeType,
YoutubeItag: format.ItagNo,
ApproxFrames: approxFrames,
Channels: format.AudioChannels,
SampleRate: sampleRate,
}, nil
}
// GetAudio fetches the audio part of a MediaSet.
func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins int) (GetAudioProgressReader, error) {
mediaSet, err := s.store.GetMediaSet(ctx, id)
if err != nil {
return nil, fmt.Errorf("error getting media set: %v", err)
}
video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag))
if format == nil {
return nil, fmt.Errorf("error finding itag: %v", err)
}
stream, _, err := s.youtube.GetStreamContext(ctx, video, format)
if err != nil {
return nil, fmt.Errorf("error fetching stream: %v", err)
}
// wrap it in a progress reader
progressStream := &progressReader{Reader: stream, label: "audio", exp: int(format.ContentLength)}
ffmpegReader, err := newFfmpegReader(ctx, progressStream, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-")
if err != nil {
return nil, fmt.Errorf("error creating ffmpegreader: %v", err)
}
// set up uploader, this is writer 1
uploader, err := newMultipartUploadWriter(
ctx,
s.s3,
s3Bucket,
fmt.Sprintf("media_sets/%s/audio.webm", id),
"application/octet-stream",
)
if err != nil {
return nil, fmt.Errorf("error creating uploader: %v", err)
}
fetchAudioProgressReader := newGetAudioProgressReader(
int64(mediaSet.AudioFramesApprox),
format.AudioChannels,
100,
)
state := fetchAudioState{
fetchAudioProgressReader: fetchAudioProgressReader,
ffmpegReader: ffmpegReader,
uploader: uploader,
}
go state.run(ctx)
return &state, nil
}
type fetchAudioState struct {
*fetchAudioProgressReader
ffmpegReader io.ReadCloser
uploader *multipartUploadWriter
}
func (s *fetchAudioState) run(ctx context.Context) {
mw := io.MultiWriter(s, s.uploader)
done := make(chan error)
var err error
go func() {
_, copyErr := io.Copy(mw, s.ffmpegReader)
done <- copyErr
}()
outer:
for {
select {
case <-ctx.Done():
err = ctx.Err()
break outer
case err = <-done:
break outer
}
}
if readerErr := s.ffmpegReader.Close(); readerErr != nil {
if err == nil {
err = readerErr
}
}
if err == nil {
if uploaderErr := s.uploader.Complete(); uploaderErr != nil {
err = uploaderErr
}
}
if err != nil {
newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
if abortUploadErr := s.uploader.Abort(newCtx); abortUploadErr != nil {
log.Printf("error aborting uploader: %v", abortUploadErr)
}
s.Abort(err)
return
}
if iterErr := s.Close(); iterErr != nil {
log.Printf("error closing peak iterator: %v", iterErr)
}
} }

View File

@ -1,95 +1,83 @@
package server package server
import ( // // getMediaSet is a handler that responds with a MediaSet.
"encoding/json" // func getMediaSet(c echo.Context) error {
"log" // videoID := c.Param("id")
"net/http" // mediaSet := media.NewMediaSet(videoID)
"strconv"
"git.netflux.io/rob/clipper/media" // if mediaSet.Exists() {
"git.netflux.io/rob/clipper/youtube" // if err := mediaSet.Load(); err != nil {
youtubev2 "github.com/kkdai/youtube/v2" // log.Printf("error loading MediaSet: %v", err)
"github.com/labstack/echo/v4" // return echo.NewHTTPError(http.StatusInternalServerError, "could not fetch media set")
) // }
// return c.JSON(http.StatusOK, mediaSet)
// }
// getMediaSet is a handler that responds with a MediaSet. // var youtubeClient youtubev2.Client
func getMediaSet(c echo.Context) error { // downloader := youtube.NewDownloader(&youtubeClient)
videoID := c.Param("id") // mediaSet, err := downloader.Download(c.Request().Context(), videoID)
mediaSet := media.NewMediaSet(videoID) // if err != nil {
// log.Printf("error downloading MediaSet: %v", err)
// return echo.NewHTTPError(http.StatusInternalServerError, "could not fetch media set")
// }
// return c.JSON(http.StatusOK, mediaSet)
// }
if mediaSet.Exists() { // // getThumbnails is a handler that responds with a MediaSet thumbnail grid.
if err := mediaSet.Load(); err != nil { // func getThumbnails(c echo.Context) error {
log.Printf("error loading MediaSet: %v", err) // videoID := c.Param("id")
return echo.NewHTTPError(http.StatusInternalServerError, "could not fetch media set") // mediaSet := media.NewMediaSet(videoID)
} // if err := mediaSet.Load(); err != nil {
return c.JSON(http.StatusOK, mediaSet) // log.Printf("error loading MediaSet: %v", err)
} // return echo.NewHTTPError(http.StatusInternalServerError, "could not load media set")
// }
var youtubeClient youtubev2.Client // return c.File(mediaSet.ThumbnailPath())
downloader := youtube.NewDownloader(&youtubeClient) // }
mediaSet, err := downloader.Download(c.Request().Context(), videoID)
if err != nil {
log.Printf("error downloading MediaSet: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError, "could not fetch media set")
}
return c.JSON(http.StatusOK, mediaSet)
}
// getThumbnails is a handler that responds with a MediaSet thumbnail grid. // // getVideo is a handler that responds with the video file for a MediaSet
func getThumbnails(c echo.Context) error { // func getVideo(c echo.Context) error {
videoID := c.Param("id") // videoID := c.Param("id")
mediaSet := media.NewMediaSet(videoID) // mediaSet := media.NewMediaSet(videoID)
if err := mediaSet.Load(); err != nil { // if err := mediaSet.Load(); err != nil {
log.Printf("error loading MediaSet: %v", err) // log.Printf("error loading MediaSet: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError, "could not load media set") // return echo.NewHTTPError(http.StatusInternalServerError, "could not load media set")
} // }
return c.File(mediaSet.ThumbnailPath()) // return c.File(mediaSet.VideoPath())
} // }
// getVideo is a handler that responds with the video file for a MediaSet // // getPeaks is a handler that returns a two-dimensional array of peaks, with
func getVideo(c echo.Context) error { // // the number of bins matching the provided parameter.
videoID := c.Param("id") // func getPeaks(c echo.Context) error {
mediaSet := media.NewMediaSet(videoID) // videoID := c.Param("id")
if err := mediaSet.Load(); err != nil {
log.Printf("error loading MediaSet: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError, "could not load media set")
}
return c.File(mediaSet.VideoPath()) // start, err := strconv.ParseInt(c.QueryParam("start"), 0, 64)
} // if err != nil {
// return echo.NewHTTPError(http.StatusBadRequest, "invalid start parameter provided")
// }
// getPeaks is a handler that returns a two-dimensional array of peaks, with // end, err := strconv.ParseInt(c.QueryParam("end"), 0, 64)
// the number of bins matching the provided parameter. // if err != nil {
func getPeaks(c echo.Context) error { // return echo.NewHTTPError(http.StatusBadRequest, "invalid end parameter provided")
videoID := c.Param("id") // }
start, err := strconv.ParseInt(c.QueryParam("start"), 0, 64) // numBins, err := strconv.Atoi(c.QueryParam("bins"))
if err != nil { // if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "invalid start parameter provided") // return echo.NewHTTPError(http.StatusBadRequest, "invalid bins parameter provided")
} // }
end, err := strconv.ParseInt(c.QueryParam("end"), 0, 64) // mediaSet := media.NewMediaSet(videoID)
if err != nil { // if err = mediaSet.Load(); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "invalid end parameter provided") // log.Printf("error loading MediaSet: %v", err)
} // return echo.NewHTTPError(http.StatusInternalServerError, "could not load media set")
// }
numBins, err := strconv.Atoi(c.QueryParam("bins")) // peaks, err := mediaSet.Peaks(start, end, numBins)
if err != nil { // if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "invalid bins parameter provided") // log.Printf("error generating peaks: %v", err)
} // return echo.NewHTTPError(http.StatusInternalServerError, "could not generate peaks")
// }
mediaSet := media.NewMediaSet(videoID) // return json.NewEncoder(c.Response()).Encode(peaks)
if err = mediaSet.Load(); err != nil { // }
log.Printf("error loading MediaSet: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError, "could not load media set")
}
peaks, err := mediaSet.Peaks(start, end, numBins)
if err != nil {
log.Printf("error generating peaks: %v", err)
return echo.NewHTTPError(http.StatusInternalServerError, "could not generate peaks")
}
return json.NewEncoder(c.Response()).Encode(peaks)
}

View File

@ -2,6 +2,7 @@ package server
import ( import (
"context" "context"
"fmt"
"io" "io"
"log" "log"
"net/http" "net/http"
@ -11,39 +12,60 @@ import (
pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set" pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set"
"git.netflux.io/rob/clipper/media" "git.netflux.io/rob/clipper/media"
"git.netflux.io/rob/clipper/youtube" "git.netflux.io/rob/clipper/youtube"
"github.com/google/uuid"
"github.com/improbable-eng/grpc-web/go/grpcweb" "github.com/improbable-eng/grpc-web/go/grpcweb"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/durationpb"
) )
type ResponseError struct {
error
ResponseCode codes.Code
}
func (r *ResponseError) Error() string {
return fmt.Sprintf("An unexpected error occurred: %v (error code = %d).", r.error.Error(), r.ResponseCode)
}
func (r *ResponseError) Unwrap() error {
return r.error
}
func newResponseError(err error, code codes.Code) *ResponseError {
return &ResponseError{error: err, ResponseCode: code}
}
type Options struct { type Options struct {
BindAddr string BindAddr string
Timeout time.Duration Timeout time.Duration
Store media.Store
YoutubeClient youtube.YoutubeClient YoutubeClient youtube.YoutubeClient
S3Client media.S3Client S3Client media.S3Client
} }
const ( const (
fetchAudioTimeout = time.Minute * 5 getAudioTimeout = time.Minute * 5
) )
// fetchMediaSetServiceController implements gRPC controller for FetchMediaSetService // mediaSetServiceController implements gRPC controller for MediaSetService
type fetchMediaSetServiceController struct { type mediaSetServiceController struct {
pbMediaSet.UnimplementedFetchServiceServer pbMediaSet.UnimplementedMediaSetServiceServer
fetchMediaSetService *media.FetchMediaSetService mediaSetService *media.MediaSetService
} }
// Fetch fetches a pbMediaSet.MediaSet // Get returns a pbMediaSet.MediaSet
func (c *fetchMediaSetServiceController) Fetch(ctx context.Context, request *pbMediaSet.FetchRequest) (*pbMediaSet.MediaSet, error) { func (c *mediaSetServiceController) Get(ctx context.Context, request *pbMediaSet.GetRequest) (*pbMediaSet.MediaSet, error) {
mediaSet, err := c.fetchMediaSetService.Fetch(ctx, request.GetId()) mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId())
if err != nil { if err != nil {
return nil, err return nil, newResponseError(err, codes.Unknown)
} }
result := pbMediaSet.MediaSet{ result := pbMediaSet.MediaSet{
Id: mediaSet.ID, Id: mediaSet.YoutubeID,
Audio: &pbMediaSet.MediaSet_Audio{ Audio: &pbMediaSet.MediaSet_Audio{
Bytes: mediaSet.Audio.Bytes, Bytes: mediaSet.Audio.Bytes,
Channels: int32(mediaSet.Audio.Channels), Channels: int32(mediaSet.Audio.Channels),
@ -61,14 +83,19 @@ func (c *fetchMediaSetServiceController) Fetch(ctx context.Context, request *pbM
return &result, nil return &result, nil
} }
// TODO: wrap errors // GetAudio streams the progress report of GetAudio.
func (c *fetchMediaSetServiceController) FetchAudio(request *pbMediaSet.FetchAudioRequest, stream pbMediaSet.FetchService_FetchAudioServer) error { func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest, stream pbMediaSet.MediaSetService_GetAudioServer) error {
ctx, cancel := context.WithTimeout(context.Background(), fetchAudioTimeout) ctx, cancel := context.WithTimeout(context.Background(), getAudioTimeout)
defer cancel() defer cancel()
reader, err := c.fetchMediaSetService.FetchAudio(ctx, request.GetId(), int(request.GetNumBins())) id, err := uuid.Parse(request.GetId())
if err != nil { if err != nil {
return err return newResponseError(err, codes.Unknown)
}
reader, err := c.mediaSetService.GetAudio(ctx, id, int(request.GetNumBins()))
if err != nil {
return newResponseError(err, codes.Unknown)
} }
for { for {
@ -77,7 +104,7 @@ func (c *fetchMediaSetServiceController) FetchAudio(request *pbMediaSet.FetchAud
if err == io.EOF { if err == io.EOF {
break break
} }
return err return newResponseError(err, codes.Unknown)
} }
// TODO: consider using int32 throughout the backend flow to avoid this. // TODO: consider using int32 throughout the backend flow to avoid this.
@ -86,7 +113,7 @@ func (c *fetchMediaSetServiceController) FetchAudio(request *pbMediaSet.FetchAud
peaks[i] = int32(p) peaks[i] = int32(p)
} }
progressPb := pbMediaSet.FetchAudioProgress{ progressPb := pbMediaSet.GetAudioProgress{
PercentCompleted: progress.PercentComplete, PercentCompleted: progress.PercentComplete,
Peaks: peaks, Peaks: peaks,
} }
@ -100,9 +127,9 @@ func (c *fetchMediaSetServiceController) FetchAudio(request *pbMediaSet.FetchAud
func Start(options Options) error { func Start(options Options) error {
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer()
fetchMediaSetService := media.NewFetchMediaSetService(options.YoutubeClient, options.S3Client) fetchMediaSetService := media.NewMediaSetService(options.Store, options.YoutubeClient, options.S3Client)
pbMediaSet.RegisterFetchServiceServer(grpcServer, &fetchMediaSetServiceController{fetchMediaSetService: fetchMediaSetService}) pbMediaSet.RegisterMediaSetServiceServer(grpcServer, &mediaSetServiceController{mediaSetService: fetchMediaSetService})
grpclog.SetLogger(log.New(os.Stdout, "server: ", log.LstdFlags)) grpclog.SetLogger(log.New(os.Stdout, "server: ", log.LstdFlags))
// TODO: proper CORS support // TODO: proper CORS support

View File

@ -0,0 +1,3 @@
DROP TABLE media_sets;
DROP EXTENSION pgcrypto;

View File

@ -0,0 +1,39 @@
CREATE EXTENSION pgcrypto;
CREATE TABLE media_sets (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
youtube_id CHARACTER VARYING(32) NOT NULL,
audio_youtube_itag int NOT NULL,
audio_channels int NOT NULL,
audio_frames_approx bigint NOT NULL,
audio_frames bigint,
audio_sample_rate int NOT NULL,
audio_s3_bucket CHARACTER VARYING(256),
audio_s3_key CHARACTER VARYING(256),
audio_s3_uploaded_at TIMESTAMP WITH TIME ZONE,
audio_mime_type_encoded CHARACTER VARYING(256) NOT NULL,
video_youtube_itag int NOT NULL,
video_s3_bucket CHARACTER VARYING(256),
video_s3_key CHARACTER VARYING(256),
video_s3_uploaded_at TIMESTAMP WITH TIME ZONE,
video_mime_type CHARACTER VARYING(256) NOT NULL,
video_duration_nanos bigint NOT NULL,
video_thumbnail_s3_bucket CHARACTER VARYING(256),
video_thumbnail_s3_key CHARACTER VARYING(256),
video_thumbnail_s3_uploaded_at TIMESTAMP WITH TIME ZONE,
video_thumbnail_mime_type CHARACTER VARYING(256),
video_thumbnail_width int DEFAULT 0,
video_thumbnail_height int DEFAULT 0,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);
CREATE UNIQUE INDEX index_media_sets_on_youtube_id ON media_sets (youtube_id);
ALTER TABLE media_sets ADD CONSTRAINT check_audio_youtube_itag_gt_0 CHECK (audio_youtube_itag > 0);
ALTER TABLE media_sets ADD CONSTRAINT check_audio_frames_gt_0 CHECK (audio_frames > 0);
ALTER TABLE media_sets ADD CONSTRAINT check_audio_frames_approx_gt_0 CHECK (audio_frames_approx > 0);
ALTER TABLE media_sets ADD CONSTRAINT check_audio_channels_gt_0 CHECK (audio_channels > 0);
ALTER TABLE media_sets ADD CONSTRAINT check_audio_sample_rate_gt_0 CHECK (audio_sample_rate > 0);
ALTER TABLE media_sets ADD CONSTRAINT check_video_youtube_itag_gt_0 CHECK (video_youtube_itag > 0);
ALTER TABLE media_sets ADD CONSTRAINT check_video_duration_nanos_gt_0 CHECK (video_duration_nanos > 0);

10
backend/sql/queries.sql Normal file
View File

@ -0,0 +1,10 @@
-- name: GetMediaSet :one
SELECT * FROM media_sets WHERE id = $1;
-- name: GetMediaSetByYoutubeID :one
SELECT * FROM media_sets WHERE youtube_id = $1;
-- name: CreateMediaSet :one
INSERT INTO media_sets (youtube_id, audio_youtube_itag, audio_channels, audio_frames_approx, audio_sample_rate_raw, audio_mime_type_encoded, video_youtube_itag, video_mime_type, video_duration_nanos, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW(), NOW())
RETURNING *;

6
backend/sqlc.yaml Normal file
View File

@ -0,0 +1,6 @@
version: 1
packages:
- path: "generated/store"
engine: "postgresql"
schema: "sql/migrations"
queries: "sql/queries.sql"

View File

@ -53,7 +53,7 @@ func (s *MediaSetService) GetMediaSet(ctx context.Context, id string) (*media.Me
} }
return &media.MediaSet{ return &media.MediaSet{
ID: "", YoutubeID: "",
Audio: media.Audio{ Audio: media.Audio{
Bytes: audioFormat.ContentLength, Bytes: audioFormat.ContentLength,
Channels: audioFormat.AudioChannels, Channels: audioFormat.AudioChannels,

View File

@ -1,12 +1,12 @@
import { grpc } from '@improbable-eng/grpc-web'; import { grpc } from '@improbable-eng/grpc-web';
import { import {
MediaSet as MediaSetPb, MediaSet as MediaSetPb,
FetchRequest, GetRequest,
FetchAudioRequest, GetAudioRequest,
FetchAudioProgress, GetAudioProgress,
} from './generated/media_set_pb'; } from './generated/media_set_pb';
import { FetchMediaSet, FetchMediaSetAudio } from './GrpcWrapper'; import { GetMediaSet } from './GrpcWrapper';
import { useState, useEffect } from 'react'; import { useState, useEffect } from 'react';
import { VideoPreview } from './VideoPreview'; import { VideoPreview } from './VideoPreview';
@ -65,20 +65,20 @@ function App(): JSX.Element {
// fetch mediaset on page load: // fetch mediaset on page load:
useEffect(() => { useEffect(() => {
(async function () { (async function () {
const request = new FetchRequest(); const request = new GetRequest();
request.setId(videoID); request.setYoutubeId(videoID);
const mediaSet = await FetchMediaSet(grpcHost, request); const mediaSet = await GetMediaSet(grpcHost, request);
console.log('got media set:', mediaSet); console.log('got media set:', mediaSet);
const handleProgress = (progress: FetchAudioProgress) => { // const handleProgress = (progress: GetAudioProgress) => {
console.log('got progress', progress); // console.log('got progress', progress);
}; // };
const audioRequest = new FetchAudioRequest(); // const audioRequest = new GetAudioRequest();
audioRequest.setId(videoID); // audioRequest.setId(videoID);
audioRequest.setNumBins(1000); // audioRequest.setNumBins(1000);
FetchMediaSetAudio(grpcHost, audioRequest, handleProgress); // GetMediaSetAudio(grpcHost, audioRequest, handleProgress);
// console.log('fetching media...'); // console.log('fetching media...');
// const resp = await fetch( // const resp = await fetch(

View File

@ -1,20 +1,20 @@
import { grpc } from '@improbable-eng/grpc-web'; import { grpc } from '@improbable-eng/grpc-web';
import { FetchService } from './generated/media_set_pb_service'; import { MediaSetService } from './generated/media_set_pb_service';
import { import {
MediaSet, MediaSet,
FetchRequest, GetRequest,
FetchAudioProgress, GetAudioProgress,
FetchAudioRequest, GetAudioRequest,
} from './generated/media_set_pb'; } from './generated/media_set_pb';
export const FetchMediaSet = ( export const GetMediaSet = (
host: string, host: string,
request: FetchRequest request: GetRequest
): Promise<MediaSet> => { ): Promise<MediaSet> => {
return new Promise<MediaSet>((resolve, reject) => { return new Promise<MediaSet>((resolve, reject) => {
let result: MediaSet; let result: MediaSet;
grpc.invoke(FetchService.Fetch, { grpc.invoke(MediaSetService.Get, {
host: host, host: host,
request: request, request: request,
onMessage: (mediaSet: MediaSet) => { onMessage: (mediaSet: MediaSet) => {
@ -35,21 +35,21 @@ export const FetchMediaSet = (
}); });
}; };
export const FetchMediaSetAudio = ( // export const etchMediaSetAudio = (
host: string, // host: string,
request: FetchAudioRequest, // request: FetchAudioRequest,
onProgress: { (progress: FetchAudioProgress): void } // onProgress: { (progress: FetchAudioProgress): void }
) => { // ) => {
grpc.invoke(FetchService.FetchAudio, { // grpc.invoke(FetchService.FetchAudio, {
host: 'http://localhost:8888', // host: 'http://localhost:8888',
request: request, // request: request,
onMessage: onProgress, // onMessage: onProgress,
onEnd: ( // onEnd: (
code: grpc.Code, // code: grpc.Code,
msg: string | undefined, // msg: string | undefined,
trailers: grpc.Metadata // trailers: grpc.Metadata
) => { // ) => {
console.log('fetch audio request ended'); // console.log('fetch audio request ended');
}, // },
}); // });
}; // };

View File

@ -27,21 +27,21 @@ message MediaSet {
bool loaded = 4; bool loaded = 4;
}; };
message FetchAudioProgress { message GetAudioProgress {
float percent_completed = 2; float percent_completed = 2;
repeated int32 peaks = 1; repeated int32 peaks = 1;
} }
message FetchRequest { message GetRequest {
string id = 1; string youtube_id = 1;
} }
message FetchAudioRequest { message GetAudioRequest {
string id = 1; string id = 1;
int32 num_bins = 2; int32 num_bins = 2;
} }
service FetchService { service MediaSetService {
rpc Fetch(FetchRequest) returns (MediaSet) {} rpc Get(GetRequest) returns (MediaSet) {}
rpc FetchAudio(FetchAudioRequest) returns (stream FetchAudioProgress) {} rpc GetAudio(GetAudioRequest) returns (stream GetAudioProgress) {}
} }