From d04debbe9afc1cf1571eb7adfb71a3da2791076e Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Wed, 27 Oct 2021 21:34:59 +0200 Subject: [PATCH] wip: refactor flow --- backend/.env | 2 + backend/cmd/progress-test/main.go | 52 +++++ backend/go.mod | 12 ++ backend/go.sum | 27 +++ backend/media/fetch.go | 318 ++++++++++++++++++++++++++++++ backend/media/media_set.go | 20 +- backend/media/peak_progress.go | 110 +++++++++++ backend/media/service.go | 22 ++- backend/media/uploader.go | 144 ++++++++++++++ backend/youtube/youtube.go | 6 - backend/youtube/youtube2.go | 70 +++++++ proto/media_set.proto | 22 ++- 12 files changed, 781 insertions(+), 24 deletions(-) create mode 100644 backend/.env create mode 100644 backend/cmd/progress-test/main.go create mode 100644 backend/media/fetch.go create mode 100644 backend/media/peak_progress.go create mode 100644 backend/media/uploader.go create mode 100644 backend/youtube/youtube2.go diff --git a/backend/.env b/backend/.env new file mode 100644 index 0000000..d1f3ba9 --- /dev/null +++ b/backend/.env @@ -0,0 +1,2 @@ +AWS_ACCESS_KEY_ID=AKIARZPRT6YGKUMKQPV5 +AWS_SECRET_ACCESS_KEY=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 diff --git a/backend/cmd/progress-test/main.go b/backend/cmd/progress-test/main.go new file mode 100644 index 0000000..70109e1 --- /dev/null +++ b/backend/cmd/progress-test/main.go @@ -0,0 +1,52 @@ +package main + +import ( + "context" + "io" + "log" + + "git.netflux.io/rob/clipper/media" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/kkdai/youtube/v2" +) + +const ( + videoID = "N1BufwrE1I8" +) + +func main() { + ctx := context.Background() + + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + log.Fatal(err) + } + + // Create an Amazon S3 service s3Client + s3Client := s3.NewFromConfig(cfg) + + // Create a Youtube client + var youtubeClient youtube.Client + + // Create a VideoFetchService + fetchService := media.NewVideoFetchService(&youtubeClient, s3Client) + peakReader, err := fetchService.FetchPeaks(ctx, videoID) + if err != nil { + log.Fatalf("error calling fetch service: %v", err) + } + + for { + progress, err := peakReader.Next() + if err != nil { + if err != io.EOF { + log.Printf("error reading progress: %v", err) + } + break + } + + log.Printf("progress = %+v", progress) + } + + log.Println("done") +} diff --git a/backend/go.mod b/backend/go.mod index bd578a8..87060e6 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -12,6 +12,18 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2 v1.10.0 // indirect + github.com/aws/aws-sdk-go-v2/config v1.9.0 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.5.0 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.4.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.17.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.5.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.8.0 // indirect + github.com/aws/smithy-go v1.8.1 // indirect github.com/bitly/go-simplejson v0.5.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect diff --git a/backend/go.sum b/backend/go.sum index 251d2b1..1d6670b 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -63,6 +63,30 @@ github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6l github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= +github.com/aws/aws-sdk-go-v2 v1.10.0 h1:+dCJ5W2HiZNa4UtaIc5ljKNulm0dK0vS5dxb5LdDOAA= +github.com/aws/aws-sdk-go-v2 v1.10.0/go.mod h1:U/EyyVvKtzmFeQQcca7eBotKdlpcP2zzU6bXBYcf7CE= +github.com/aws/aws-sdk-go-v2/config v1.9.0 h1:SkREVSwi+J8MSdjhJ96jijZm5ZDNleI0E4hHCNivh7s= +github.com/aws/aws-sdk-go-v2/config v1.9.0/go.mod h1:qhK5NNSgo9/nOSMu3HyE60WHXZTWTHTgd5qtIF44vOQ= +github.com/aws/aws-sdk-go-v2/credentials v1.5.0 h1:r6470olsn2qyOe2aLzK6q+wfO3dzNcMujRT3gqBgBB8= +github.com/aws/aws-sdk-go-v2/credentials v1.5.0/go.mod h1:kvqTkpzQmzri9PbsiTY+LvwFzM0gY19emlAWwBOJMb0= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 h1:FKaqk7geL3oIqSwGJt5SWUKj8uJ+qLZNqlBuqq6sFyA= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0/go.mod h1:KqEkRkxm/+1Pd/rENRNbQpfblDBYeg5HDSqjB6ks8hA= +github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 h1:zPxLGWALExNepElO0gYgoqsbqTlt4ZCrhZ7XlfJ+Qlw= +github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5/go.mod h1:6ZBTuDmvpCOD4Sf1i2/I3PgftlEcDGgvi8ocq64oQEg= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.4.0 h1:EtQ6hVAgNsWTiO+u9e+ziaEYyOAlEkAwLskpL40U6pQ= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.4.0/go.mod h1:vEkJTjJ8vnv0uWy2tAp7DSydWFpudMGWPQ2SFucoN1k= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0 h1:/T5wKsw/po118HEDvnSE8YU7TESxvZbYM2rnn+Oi7Kk= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0/go.mod h1:X5/JuOxPLU/ogICgDTtnpfaQzdQJO0yKDcpoxWLLJ8Y= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.8.0 h1:j1JV89mkJP4f9cssTWbu+anj3p2v+UWMA7qERQQqMkM= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.8.0/go.mod h1:669UCOYqQ7jA8sqwEsbIXoYrfp8KT9BeUrST0/mhCFw= +github.com/aws/aws-sdk-go-v2/service/s3 v1.17.0 h1:VI/NYED5fJqgV1NTvfBlHJaqJd803AAkg8ZcJ8TkrvA= +github.com/aws/aws-sdk-go-v2/service/s3 v1.17.0/go.mod h1:6mvopTtbyJcY0NfSOVtgkBlDDatYwiK1DAFr4VL0QCo= +github.com/aws/aws-sdk-go-v2/service/sso v1.5.0 h1:VnrCAJTp1bDxU79UuW/D4z7bwZ7xOc7JjDKpqXL/m04= +github.com/aws/aws-sdk-go-v2/service/sso v1.5.0/go.mod h1:GsqaJOJeOfeYD88/2vHWKXegvDRofDqWwC5i48A2kgs= +github.com/aws/aws-sdk-go-v2/service/sts v1.8.0 h1:7N7RsEVvUcvEg7jrWKU5AnSi4/6b6eY9+wG1g6W4ExE= +github.com/aws/aws-sdk-go-v2/service/sts v1.8.0/go.mod h1:dOlm91B439le5y1vtPCk5yJtbx3RdT3hRGYRY8TYKvQ= +github.com/aws/smithy-go v1.8.1 h1:9Y6qxtzgEODaLNGN+oN2QvcHvKUe4jsH8w4M+8LXzGk= +github.com/aws/smithy-go v1.8.1/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= @@ -210,6 +234,7 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -277,6 +302,8 @@ github.com/improbable-eng/grpc-web v0.14.1/go.mod h1:zEjGHa8DAlkoOXmswrNvhUGEYQA github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= diff --git a/backend/media/fetch.go b/backend/media/fetch.go new file mode 100644 index 0000000..fe91927 --- /dev/null +++ b/backend/media/fetch.go @@ -0,0 +1,318 @@ +package media + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "log" + "os/exec" + "sort" + "strconv" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/service/s3" + youtubev2 "github.com/kkdai/youtube/v2" +) + +const s3Bucket = "clipper-development" + +const ( + rawAudioCodec = "pcm_s16le" + rawAudioFormat = "s16le" + rawAudioSampleRate = 48_000 +) + +// progressReader is a reader that prints progress logs as it reads. +type progressReader struct { + io.Reader + label string + total, exp int +} + +func (pw *progressReader) Read(p []byte) (int, error) { + n, err := pw.Reader.Read(p) + pw.total += n + + log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float32(pw.total)/float32(pw.exp))*100.0) + + return n, err +} + +// S3Client stubs the AWS S3 service client. +type S3Client interface { + CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) + UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) + AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) + CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) +} + +// YoutubeClient stubs the youtube.Client client. +type YoutubeClient interface { + GetVideoContext(context.Context, string) (*youtubev2.Video, error) + GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error) +} + +type MediaSet2 struct { + id string +} + +func NewMediaSet2(id string) *MediaSet2 { + return &MediaSet2{ + id: id, + } +} + +// VideoFetchService fetches a video via an io.Reader. +type VideoFetchService struct { + youtube YoutubeClient + s3 S3Client +} + +func NewVideoFetchService(youtubeClient YoutubeClient, s3Client S3Client) *VideoFetchService { + return &VideoFetchService{ + youtube: youtubeClient, + s3: s3Client, + } +} + +// Fetch handles the entire process to fetch and process the audio and video +// parts of a MediaSet. +func (s *VideoFetchService) Fetch(ctx context.Context, id string) (*MediaSet, error) { + video, err := s.youtube.GetVideoContext(ctx, id) + if err != nil { + return nil, fmt.Errorf("error fetching video: %v", err) + } + + if len(video.Formats) == 0 { + return nil, errors.New("no format available") + } + + // just the audio for now + + // grab an audio stream from youtube + // TODO: avoid possible panic + format := sortAudio(video.Formats)[0] + + sampleRate, err := strconv.Atoi(format.AudioSampleRate) + if err != nil { + return nil, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate) + } + + approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs) + if err != nil { + return nil, fmt.Errorf("could not parse audio duration: %s", err) + } + approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond + approxFrames := int64(approxDuration/time.Second) * int64(sampleRate) + + mediaSet := MediaSet{ + ID: id, + Audio: Audio{ + // we need to decode it to be able to know bytes and frames exactly + ApproxFrames: approxFrames, + Channels: format.AudioChannels, + SampleRate: sampleRate, + }, + } + + return &mediaSet, nil +} + +type PeakIterator interface { + Next() (FetchPeaksProgress, error) + Close() error +} + +func (s *VideoFetchService) FetchPeaks(ctx context.Context, id string) (PeakIterator, error) { + mediaSet := NewMediaSet(id) + if !mediaSet.Exists() { + // TODO check if audio uploaded already, don't bother again + return nil, errors.New("no media set found") + } + + if err := mediaSet.Load(); err != nil { + return nil, fmt.Errorf("error loading media set: %v", err) + } + + video, err := s.youtube.GetVideoContext(ctx, id) + if err != nil { + return nil, fmt.Errorf("error fetching video: %v", err) + } + + if len(video.Formats) == 0 { + return nil, errors.New("no format available") + } + + // TODO: avoid possible panic + format := sortAudio(video.Formats)[0] + + stream, _, err := s.youtube.GetStreamContext(ctx, video, &format) + if err != nil { + return nil, fmt.Errorf("error fetching stream: %v", err) + } + + // wrap it in a progress reader + progressStream := &progressReader{Reader: stream, label: "audio", exp: int(format.ContentLength)} + + ffmpegReader, err := newFfmpegReader(ctx, progressStream, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") + if err != nil { + return nil, fmt.Errorf("error creating ffmpegreader: %v", err) + } + + // set up uploader, this is writer 1 + uploader, err := newMultipartUploadWriter( + ctx, + s.s3, + s3Bucket, + fmt.Sprintf("media_sets/%s/audio.webm", id), + "application/octet-stream", + ) + if err != nil { + return nil, fmt.Errorf("error creating uploader: %v", err) + } + + peakIterator := newFetchPeaksIterator( + mediaSet.Audio.ApproxFrames, + format.AudioChannels, + 100, + ) + + state := fetchPeaksState{ + fetchPeaksIterator: peakIterator, + ffmpegReader: ffmpegReader, + uploader: uploader, + } + go state.run(ctx) // pass ctx? + + return &state, nil +} + +type fetchPeaksState struct { + *fetchPeaksIterator + + ffmpegReader *ffmpegReader + uploader *multipartUploadWriter + err error +} + +// run copies the audio data from ffmpeg, waits for termination and then cleans +// up appropriately. +func (s *fetchPeaksState) run(ctx context.Context) { + mw := io.MultiWriter(s.fetchPeaksIterator, s.uploader) + done := make(chan error) + var err error + + go func() { + _, copyErr := io.Copy(mw, s.ffmpegReader) + done <- copyErr + }() + +outer: + for { + select { + case <-ctx.Done(): + err = ctx.Err() + break outer + case err = <-done: + break outer + } + } + + if readerErr := s.ffmpegReader.Close(); readerErr != nil { + log.Printf("error closing ffmpegReader: %v", readerErr) + if err == nil { + err = readerErr + } + } + + if err == nil { + if uploaderErr := s.uploader.Complete(); uploaderErr != nil { + log.Printf("error closing uploader: %v", uploaderErr) + err = uploaderErr + } + } + + if err != nil { + newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + if abortUploadErr := s.uploader.Abort(newCtx); abortUploadErr != nil { + log.Printf("error aborting uploader: %v", abortUploadErr) + } + s.Abort(err) + return + } + + if iterErr := s.Close(); iterErr != nil { + log.Printf("error closing peak iterator: %v", iterErr) + } +} + +type ffmpegReader struct { + io.ReadCloser + + cmd *exec.Cmd +} + +func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmpegReader, error) { + var stdErr bytes.Buffer + + cmd := exec.CommandContext(ctx, "ffmpeg", arg...) + cmd.Stdin = input + cmd.Stderr = &stdErr // TODO: fix error handling + + r, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("error creating pipe: %v", err) + } + + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("error starting ffmpeg: %v", err) + } + + return &ffmpegReader{ReadCloser: r, cmd: cmd}, nil +} + +func (r *ffmpegReader) Close() error { + state, err := r.cmd.Process.Wait() + + if err != nil { + return fmt.Errorf("error returned from process: %v", err) + } + + if state.ExitCode() != 0 { + return fmt.Errorf("command exited with code %d", state.ExitCode()) + } + + log.Println("returning from ffmpegreader.close") + return nil +} + +// sortAudio returns the provided formats ordered in descending preferred +// order. The ideal candidate is opus-encoded stereo audio in a webm container, +// with the lowest available bitrate. +func sortAudio(inFormats youtubev2.FormatList) youtubev2.FormatList { + var formats youtubev2.FormatList + for _, format := range inFormats { + if format.FPS == 0 && format.AudioChannels > 0 { + formats = append(formats, format) + } + } + sort.SliceStable(formats, func(i, j int) bool { + isOpusI := strings.Contains(formats[i].MimeType, "opus") + isOpusJ := strings.Contains(formats[j].MimeType, "opus") + if isOpusI && isOpusJ { + isStereoI := formats[i].AudioChannels == 2 + isStereoJ := formats[j].AudioChannels == 2 + if isStereoI && isStereoJ { + return formats[i].ContentLength < formats[j].ContentLength + } + return isStereoI + } + return isOpusI + }) + return formats +} diff --git a/backend/media/media_set.go b/backend/media/media_set.go index 39a1626..0f7b4ba 100644 --- a/backend/media/media_set.go +++ b/backend/media/media_set.go @@ -14,17 +14,21 @@ import ( const SizeOfInt16 = 2 type Audio struct { - Bytes int64 `json:"bytes"` - Channels int `json:"channels"` - Frames int64 `json:"frames"` - SampleRate int `json:"sample_rate"` + Bytes int64 `json:"bytes"` + Channels int `json:"channels"` + // ApproxFrames is used during initial processing when a precise frame count + // cannot be determined. Prefer Frames in all other cases. + ApproxFrames int64 `json:"approx_frames"` + Frames int64 `json:"frames"` + SampleRate int `json:"sample_rate"` } type Video struct { - Bytes int64 `json:"bytes"` - Duration time.Duration `json:"duration"` - ThumbnailWidth int `json:"thumbnail_width"` - ThumbnailHeight int `json:"thumbnail_height"` + Bytes int64 `json:"bytes"` + Duration time.Duration `json:"duration"` + // not sure if this are needed any more? + ThumbnailWidth int `json:"thumbnail_width"` + ThumbnailHeight int `json:"thumbnail_height"` } // MediaSet represents the media and metadata associated with a single media diff --git a/backend/media/peak_progress.go b/backend/media/peak_progress.go new file mode 100644 index 0000000..296984f --- /dev/null +++ b/backend/media/peak_progress.go @@ -0,0 +1,110 @@ +package media + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" +) + +type FetchPeaksProgress struct { + percentComplete float32 + Peaks []int16 +} + +// fetchPeaksIterator accepts a byte stream containing little endian +// signed int16s and, given a target number of bins, emits a stream of peaks +// corresponding to each channel of the audio data. +type fetchPeaksIterator struct { + channels int + framesPerBin int + + samples []int16 + currPeaks []int16 + currCount int + total int + progress chan FetchPeaksProgress + errorChan chan error +} + +// TODO: validate inputs, debugging is confusing otherwise +func newFetchPeaksIterator(expFrames int64, channels, numBins int) *fetchPeaksIterator { + return &fetchPeaksIterator{ + channels: channels, + framesPerBin: int(expFrames / int64(numBins)), + samples: make([]int16, 8_192), + currPeaks: make([]int16, channels), + progress: make(chan FetchPeaksProgress), + errorChan: make(chan error, 1), + } +} + +func (w *fetchPeaksIterator) Abort(err error) { + w.errorChan <- err +} + +func (w *fetchPeaksIterator) Close() error { + close(w.progress) + return nil +} + +func (w *fetchPeaksIterator) Write(p []byte) (int, error) { + // expand our target slice if it is of insufficient size: + numSamples := len(p) / SizeOfInt16 + if len(w.samples) < numSamples { + w.samples = append(w.samples, make([]int16, numSamples-len(w.samples))...) + } + + samples := w.samples[:numSamples] + + if err := binary.Read(bytes.NewReader(p), binary.LittleEndian, samples); err != nil { + return 0, fmt.Errorf("error parsing samples: %v", err) + } + + for i := 0; i < len(samples); i += w.channels { + for j := 0; j < w.channels; j++ { + samp := samples[i+j] + if samp < 0 { + samp = -samp + } + if samp > w.currPeaks[j] { + w.currPeaks[j] = samp + } + } + w.currCount++ + if w.currCount == w.framesPerBin { + w.nextBin() + } + } + + return len(p), nil +} + +func (w *fetchPeaksIterator) nextBin() { + var progress FetchPeaksProgress + // TODO: avoid an allocation? + progress.Peaks = append(progress.Peaks, w.currPeaks...) + + w.progress <- progress + + w.currCount = 0 + // log.Printf("got peak for %d frames, which is equal to target of %d frames per bin, %d total bins processed, peaks: %+v", w.currCount, w.framesPerBin, w.total+1, w.currPeaks) + for i := 0; i < len(w.currPeaks); i++ { + w.currPeaks[i] = 0 + } + w.total++ +} + +func (w *fetchPeaksIterator) Next() (FetchPeaksProgress, error) { + for { + select { + case progress, ok := <-w.progress: + if !ok { + return FetchPeaksProgress{}, io.EOF + } + return FetchPeaksProgress{Peaks: progress.Peaks}, nil + case err := <-w.errorChan: + return FetchPeaksProgress{}, fmt.Errorf("error waiting for progress: %v", err) + } + } +} diff --git a/backend/media/service.go b/backend/media/service.go index 3fe82d6..12fcfde 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -1,9 +1,29 @@ package media -import "context" +import ( + "context" + "errors" + "fmt" + "log" +) type MediaSetService struct{} func (s *MediaSetService) GetMediaSet(ctx context.Context, source string, id string) (*MediaSet, error) { + log.Printf("GetMediaSet called with source %q, id %q", source, id) + + if source != "youtube" { + return nil, errors.New("unknown source") + } + + // try to load and return a cached MediaSet, if possible: + mediaSet := NewMediaSet(id) + if mediaSet.Exists() { + if err := mediaSet.Load(); err != nil { + return nil, fmt.Errorf("error loading MediaSet: %v", err) + } + return mediaSet, nil + } + return &MediaSet{ID: id}, nil } diff --git a/backend/media/uploader.go b/backend/media/uploader.go new file mode 100644 index 0000000..dea3bdf --- /dev/null +++ b/backend/media/uploader.go @@ -0,0 +1,144 @@ +package media + +import ( + "bytes" + "context" + "fmt" + "log" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +// multipartUploadWriter is a Writer that uploads transparently to an S3 bucket +// in 5MB parts. It buffers data internally until a part is ready to send over +// the network. Parts are sent as soon as they exceed the minimum part size of +// 5MB. +// +// The caller must call either Complete() or Abort() after finishing writing. +// Failure to do so will leave S3 in an inconsistent state. +type multipartUploadWriter struct { + ctx context.Context + s3 S3Client + buf *bytes.Buffer + bucket, key, contentType string + uploadID string + completedParts []types.CompletedPart + bytesUploaded int64 +} + +const targetPartSizeBytes = 5 * 1024 * 1024 // 5MB + +// newMultipartUploadWriter creates a new multipart upload writer, including +// creating the upload on S3. Either Complete or Abort must be called after +// calling this function. +func newMultipartUploadWriter(ctx context.Context, s3Client S3Client, bucket, key, contentType string) (*multipartUploadWriter, error) { + input := s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + ContentType: aws.String(contentType), + } + + output, err := s3Client.CreateMultipartUpload(ctx, &input) + if err != nil { + return nil, fmt.Errorf("error creating multipart upload: %v", err) + } + + b := make([]byte, 0, targetPartSizeBytes+16_384) + + return &multipartUploadWriter{ + ctx: ctx, + s3: s3Client, + buf: bytes.NewBuffer(b), + bucket: bucket, + key: key, + contentType: contentType, + uploadID: *output.UploadId, + }, nil +} + +func (u *multipartUploadWriter) Write(p []byte) (int, error) { + n, err := u.buf.Write(p) + if err != nil { + return n, fmt.Errorf("error writing to buffer: %v", err) + } + + if u.buf.Len() >= targetPartSizeBytes { + partLen := u.buf.Len() + log.Printf("uploading part num = %d, len = %d", u.partNum(), partLen) + + input := s3.UploadPartInput{ + Body: u.buf, + Bucket: aws.String(u.bucket), + Key: aws.String(u.key), + PartNumber: u.partNum(), + UploadId: aws.String(u.uploadID), + ContentLength: int64(partLen), + } + + output, uploadErr := u.s3.UploadPart(u.ctx, &input) + if uploadErr != nil { + // TODO: retry on failure + return n, fmt.Errorf("error uploading part: %v", uploadErr) + } + + log.Printf("uploaded part num = %d, etag = %s, bytes = %d", u.partNum(), *output.ETag, partLen) + + u.completedParts = append(u.completedParts, types.CompletedPart{ETag: output.ETag, PartNumber: u.partNum()}) + u.bytesUploaded += int64(partLen) + } + + return n, err +} + +func (u *multipartUploadWriter) partNum() int32 { + return int32(len(u.completedParts) + 1) +} + +// Abort aborts the upload process, cancelling the upload on S3. +// Accepts a separate context in case it is called during cleanup after the +// original context was killed. +func (u *multipartUploadWriter) Abort(ctx context.Context) error { + input := s3.AbortMultipartUploadInput{ + Bucket: aws.String(u.bucket), + Key: aws.String(u.key), + UploadId: aws.String(u.uploadID), + } + + _, err := u.s3.AbortMultipartUpload(ctx, &input) + if err != nil { + return fmt.Errorf("error aborting upload: %v", err) + } + + log.Printf("aborted upload, key = %s, bytesUploaded = %d", u.key, u.bytesUploaded) + + return nil +} + +// Complete completes the upload process, finalizing the upload on S3. +// If no parts have been successfully uploaded, then Abort() will be called +// transparently. +func (u *multipartUploadWriter) Complete() error { + if len(u.completedParts) == 0 { + return u.Abort(u.ctx) + } + + input := s3.CompleteMultipartUploadInput{ + Bucket: aws.String(u.bucket), + Key: aws.String(u.key), + UploadId: aws.String(u.uploadID), + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: u.completedParts, + }, + } + + _, err := u.s3.CompleteMultipartUpload(u.ctx, &input) + if err != nil { + return fmt.Errorf("error completing upload: %v", err) + } + + log.Printf("completed upload, key = %s, bytesUploaded = %d", u.key, u.bytesUploaded) + + return nil +} diff --git a/backend/youtube/youtube.go b/backend/youtube/youtube.go index d15c886..ddf92c9 100644 --- a/backend/youtube/youtube.go +++ b/backend/youtube/youtube.go @@ -34,12 +34,6 @@ const ( thumbnailHeight = 100 // " ) -// YoutubeClient wraps the youtube.Client client. -type YoutubeClient interface { - GetVideoContext(context.Context, string) (*youtubev2.Video, error) - GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error) -} - // Downloader downloads a set of Youtube media for a given video ID, including // separate audio and video files and a JSON metadata file. Additionally, it // also renders the downloaded audio file as a raw audio file. diff --git a/backend/youtube/youtube2.go b/backend/youtube/youtube2.go new file mode 100644 index 0000000..46be1a2 --- /dev/null +++ b/backend/youtube/youtube2.go @@ -0,0 +1,70 @@ +package youtube + +import ( + "context" + "errors" + "fmt" + "io" + "log" + "strconv" + "time" + + "git.netflux.io/rob/clipper/media" + youtubev2 "github.com/kkdai/youtube/v2" +) + +// YoutubeClient wraps the youtube.Client client. +type YoutubeClient interface { + GetVideoContext(context.Context, string) (*youtubev2.Video, error) + GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error) +} + +// MediaSetService implements a MediaSetService for Youtube videos. +type MediaSetService struct { + youtubeClient YoutubeClient +} + +// not used +func (s *MediaSetService) GetMediaSet(ctx context.Context, id string) (*media.MediaSet, error) { + var video *youtubev2.Video + video, err := s.youtubeClient.GetVideoContext(ctx, id) + if err != nil { + return nil, fmt.Errorf("error fetching video: %v", err) + } + + if len(video.Formats) == 0 { + return nil, errors.New("no format available") + } + + audioFormat := SortAudio(video.Formats)[0] + videoFormat := SortVideo(video.Formats)[0] + + durationMsecs, err := strconv.Atoi(videoFormat.ApproxDurationMs) + if err != nil { + log.Printf("GetMediaSet: invalid duration %s", videoFormat.ApproxDurationMs) + return nil, errors.New("error parsing format") + } + duration := time.Duration(durationMsecs) * time.Millisecond + + sampleRate, err := strconv.Atoi(videoFormat.AudioSampleRate) + if err != nil { + log.Printf("GetMediaSet: invalid samplerate %s", videoFormat.AudioSampleRate) + return nil, errors.New("error parsing format") + } + + return &media.MediaSet{ + ID: "", + Audio: media.Audio{ + Bytes: audioFormat.ContentLength, + Channels: audioFormat.AudioChannels, + Frames: 0, + SampleRate: sampleRate, + }, + Video: media.Video{ + Bytes: videoFormat.ContentLength, + Duration: duration, + ThumbnailWidth: videoFormat.Width, + ThumbnailHeight: videoFormat.Height, + }, + }, nil +} diff --git a/proto/media_set.proto b/proto/media_set.proto index 77b64ce..4bf78f7 100644 --- a/proto/media_set.proto +++ b/proto/media_set.proto @@ -5,30 +5,33 @@ option go_package = "pb/media_set"; import "google/protobuf/duration.proto"; -message Peaks { - repeated int32 peaks = 1; -} - message MediaSet { message Audio { int64 bytes = 1; int32 channels = 2; - int64 frames = 3; - int32 sample_rate = 4; + int64 approx_frames = 3; + int64 frames = 4; + int32 sample_rate = 5; }; message Video { int64 bytes = 1; google.protobuf.Duration duration = 2; - int32 ThumbnailWidth = 3; - int32 ThumbnailHeight = 4; + int32 thumbnail_width = 3; + int32 thumbnail_height = 4; }; string id = 1; Audio audio = 2; Video video = 3; + bool loaded = 4; }; +message PeaksProgress { + float percent_completed = 2; + repeated int32 peaks = 1; +} + message GetMediaSetRequest { string id = 1; string source = 2; @@ -36,9 +39,10 @@ message GetMediaSetRequest { message GetPeaksRequest { string id = 1; + int32 num_bins = 2; } service MediaSetService { rpc GetMediaSet(GetMediaSetRequest) returns (MediaSet) {} - rpc GetPeaks(GetPeaksRequest) returns (stream Peaks) {} + rpc GetPeaks(GetPeaksRequest) returns (stream PeaksProgress) {} }