From 3ce3736770a499c5cb893300b1994a97c18214bb Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Wed, 27 Oct 2021 22:17:59 +0200 Subject: [PATCH] improve naming, refactor --- backend/cmd/progress-test/main.go | 8 +- .../{peak_progress.go => audio_progress.go} | 37 ++--- backend/media/fetch.go | 136 ++++-------------- backend/media/ffmpeg_reader.go | 48 +++++++ backend/media/uploader.go | 7 +- backend/media/youtube.go | 34 +++++ 6 files changed, 138 insertions(+), 132 deletions(-) rename backend/media/{peak_progress.go => audio_progress.go} (67%) create mode 100644 backend/media/ffmpeg_reader.go create mode 100644 backend/media/youtube.go diff --git a/backend/cmd/progress-test/main.go b/backend/cmd/progress-test/main.go index 70109e1..a1fab26 100644 --- a/backend/cmd/progress-test/main.go +++ b/backend/cmd/progress-test/main.go @@ -30,14 +30,16 @@ func main() { var youtubeClient youtube.Client // Create a VideoFetchService - fetchService := media.NewVideoFetchService(&youtubeClient, s3Client) - peakReader, err := fetchService.FetchPeaks(ctx, videoID) + fetchService := media.NewFetchMediaSetService(&youtubeClient, s3Client) + + // Create a progressReader + progressReader, err := fetchService.FetchAudio(ctx, videoID) if err != nil { log.Fatalf("error calling fetch service: %v", err) } for { - progress, err := peakReader.Next() + progress, err := progressReader.Read() if err != nil { if err != io.EOF { log.Printf("error reading progress: %v", err) diff --git a/backend/media/peak_progress.go b/backend/media/audio_progress.go similarity index 67% rename from backend/media/peak_progress.go rename to backend/media/audio_progress.go index 296984f..09bd990 100644 --- a/backend/media/peak_progress.go +++ b/backend/media/audio_progress.go @@ -7,15 +7,20 @@ import ( "io" ) -type FetchPeaksProgress struct { +type FetchAudioProgress struct { percentComplete float32 Peaks []int16 } -// fetchPeaksIterator accepts a byte stream containing little endian +type FetchAudioProgressReader interface { + Read() (FetchAudioProgress, error) + Close() error +} + +// fetchAudioProgressReader accepts a byte stream containing little endian // signed int16s and, given a target number of bins, emits a stream of peaks // corresponding to each channel of the audio data. -type fetchPeaksIterator struct { +type fetchAudioProgressReader struct { channels int framesPerBin int @@ -23,32 +28,32 @@ type fetchPeaksIterator struct { currPeaks []int16 currCount int total int - progress chan FetchPeaksProgress + progress chan FetchAudioProgress errorChan chan error } // TODO: validate inputs, debugging is confusing otherwise -func newFetchPeaksIterator(expFrames int64, channels, numBins int) *fetchPeaksIterator { - return &fetchPeaksIterator{ +func newFetchAudioProgressReader(expFrames int64, channels, numBins int) *fetchAudioProgressReader { + return &fetchAudioProgressReader{ channels: channels, framesPerBin: int(expFrames / int64(numBins)), samples: make([]int16, 8_192), currPeaks: make([]int16, channels), - progress: make(chan FetchPeaksProgress), + progress: make(chan FetchAudioProgress), errorChan: make(chan error, 1), } } -func (w *fetchPeaksIterator) Abort(err error) { +func (w *fetchAudioProgressReader) Abort(err error) { w.errorChan <- err } -func (w *fetchPeaksIterator) Close() error { +func (w *fetchAudioProgressReader) Close() error { close(w.progress) return nil } -func (w *fetchPeaksIterator) Write(p []byte) (int, error) { +func (w *fetchAudioProgressReader) Write(p []byte) (int, error) { // expand our target slice if it is of insufficient size: numSamples := len(p) / SizeOfInt16 if len(w.samples) < numSamples { @@ -80,8 +85,8 @@ func (w *fetchPeaksIterator) Write(p []byte) (int, error) { return len(p), nil } -func (w *fetchPeaksIterator) nextBin() { - var progress FetchPeaksProgress +func (w *fetchAudioProgressReader) nextBin() { + var progress FetchAudioProgress // TODO: avoid an allocation? progress.Peaks = append(progress.Peaks, w.currPeaks...) @@ -95,16 +100,16 @@ func (w *fetchPeaksIterator) nextBin() { w.total++ } -func (w *fetchPeaksIterator) Next() (FetchPeaksProgress, error) { +func (w *fetchAudioProgressReader) Read() (FetchAudioProgress, error) { for { select { case progress, ok := <-w.progress: if !ok { - return FetchPeaksProgress{}, io.EOF + return FetchAudioProgress{}, io.EOF } - return FetchPeaksProgress{Peaks: progress.Peaks}, nil + return FetchAudioProgress{Peaks: progress.Peaks}, nil case err := <-w.errorChan: - return FetchPeaksProgress{}, fmt.Errorf("error waiting for progress: %v", err) + return FetchAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err) } } } diff --git a/backend/media/fetch.go b/backend/media/fetch.go index fe91927..4f6f35d 100644 --- a/backend/media/fetch.go +++ b/backend/media/fetch.go @@ -1,16 +1,12 @@ package media import ( - "bytes" "context" "errors" "fmt" "io" "log" - "os/exec" - "sort" "strconv" - "strings" "time" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -41,7 +37,7 @@ func (pw *progressReader) Read(p []byte) (int, error) { return n, err } -// S3Client stubs the AWS S3 service client. +// S3Client wraps the AWS S3 service client. type S3Client interface { CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) @@ -49,38 +45,27 @@ type S3Client interface { CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) } -// YoutubeClient stubs the youtube.Client client. +// YoutubeClient wraps the youtube.Client client. type YoutubeClient interface { GetVideoContext(context.Context, string) (*youtubev2.Video, error) GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error) } -type MediaSet2 struct { - id string -} - -func NewMediaSet2(id string) *MediaSet2 { - return &MediaSet2{ - id: id, - } -} - -// VideoFetchService fetches a video via an io.Reader. -type VideoFetchService struct { +// FetchMediaSetService fetches a video via an io.Reader. +type FetchMediaSetService struct { youtube YoutubeClient s3 S3Client } -func NewVideoFetchService(youtubeClient YoutubeClient, s3Client S3Client) *VideoFetchService { - return &VideoFetchService{ +func NewFetchMediaSetService(youtubeClient YoutubeClient, s3Client S3Client) *FetchMediaSetService { + return &FetchMediaSetService{ youtube: youtubeClient, s3: s3Client, } } -// Fetch handles the entire process to fetch and process the audio and video -// parts of a MediaSet. -func (s *VideoFetchService) Fetch(ctx context.Context, id string) (*MediaSet, error) { +// Fetch fetches the metadata for a given MediaSet source. +func (s *FetchMediaSetService) Fetch(ctx context.Context, id string) (*MediaSet, error) { video, err := s.youtube.GetVideoContext(ctx, id) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) @@ -118,15 +103,17 @@ func (s *VideoFetchService) Fetch(ctx context.Context, id string) (*MediaSet, er }, } + // TODO: video + // TODO: save to JSON + return &mediaSet, nil } -type PeakIterator interface { - Next() (FetchPeaksProgress, error) - Close() error -} - -func (s *VideoFetchService) FetchPeaks(ctx context.Context, id string) (PeakIterator, error) { +// FetchAudio fetches the audio stream from Youtube, pipes it through FFMPEG to +// extract the raw audio samples, and uploads them to S3. It +// returns a FetchAudioProgressReader. This reader must be read until +// completion - it will return any error which occurs during the fetch process. +func (s *FetchMediaSetService) FetchAudio(ctx context.Context, id string) (FetchAudioProgressReader, error) { mediaSet := NewMediaSet(id) if !mediaSet.Exists() { // TODO check if audio uploaded already, don't bother again @@ -174,34 +161,33 @@ func (s *VideoFetchService) FetchPeaks(ctx context.Context, id string) (PeakIter return nil, fmt.Errorf("error creating uploader: %v", err) } - peakIterator := newFetchPeaksIterator( + fetchAudioProgressReader := newFetchAudioProgressReader( mediaSet.Audio.ApproxFrames, format.AudioChannels, 100, ) - state := fetchPeaksState{ - fetchPeaksIterator: peakIterator, - ffmpegReader: ffmpegReader, - uploader: uploader, + state := fetchAudioState{ + fetchAudioProgressReader: fetchAudioProgressReader, + ffmpegReader: ffmpegReader, + uploader: uploader, } - go state.run(ctx) // pass ctx? + go state.run(ctx) return &state, nil } -type fetchPeaksState struct { - *fetchPeaksIterator +type fetchAudioState struct { + *fetchAudioProgressReader - ffmpegReader *ffmpegReader + ffmpegReader io.ReadCloser uploader *multipartUploadWriter - err error } // run copies the audio data from ffmpeg, waits for termination and then cleans // up appropriately. -func (s *fetchPeaksState) run(ctx context.Context) { - mw := io.MultiWriter(s.fetchPeaksIterator, s.uploader) +func (s *fetchAudioState) run(ctx context.Context) { + mw := io.MultiWriter(s, s.uploader) done := make(chan error) var err error @@ -222,7 +208,6 @@ outer: } if readerErr := s.ffmpegReader.Close(); readerErr != nil { - log.Printf("error closing ffmpegReader: %v", readerErr) if err == nil { err = readerErr } @@ -230,7 +215,6 @@ outer: if err == nil { if uploaderErr := s.uploader.Complete(); uploaderErr != nil { - log.Printf("error closing uploader: %v", uploaderErr) err = uploaderErr } } @@ -250,69 +234,3 @@ outer: log.Printf("error closing peak iterator: %v", iterErr) } } - -type ffmpegReader struct { - io.ReadCloser - - cmd *exec.Cmd -} - -func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmpegReader, error) { - var stdErr bytes.Buffer - - cmd := exec.CommandContext(ctx, "ffmpeg", arg...) - cmd.Stdin = input - cmd.Stderr = &stdErr // TODO: fix error handling - - r, err := cmd.StdoutPipe() - if err != nil { - return nil, fmt.Errorf("error creating pipe: %v", err) - } - - if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("error starting ffmpeg: %v", err) - } - - return &ffmpegReader{ReadCloser: r, cmd: cmd}, nil -} - -func (r *ffmpegReader) Close() error { - state, err := r.cmd.Process.Wait() - - if err != nil { - return fmt.Errorf("error returned from process: %v", err) - } - - if state.ExitCode() != 0 { - return fmt.Errorf("command exited with code %d", state.ExitCode()) - } - - log.Println("returning from ffmpegreader.close") - return nil -} - -// sortAudio returns the provided formats ordered in descending preferred -// order. The ideal candidate is opus-encoded stereo audio in a webm container, -// with the lowest available bitrate. -func sortAudio(inFormats youtubev2.FormatList) youtubev2.FormatList { - var formats youtubev2.FormatList - for _, format := range inFormats { - if format.FPS == 0 && format.AudioChannels > 0 { - formats = append(formats, format) - } - } - sort.SliceStable(formats, func(i, j int) bool { - isOpusI := strings.Contains(formats[i].MimeType, "opus") - isOpusJ := strings.Contains(formats[j].MimeType, "opus") - if isOpusI && isOpusJ { - isStereoI := formats[i].AudioChannels == 2 - isStereoJ := formats[j].AudioChannels == 2 - if isStereoI && isStereoJ { - return formats[i].ContentLength < formats[j].ContentLength - } - return isStereoI - } - return isOpusI - }) - return formats -} diff --git a/backend/media/ffmpeg_reader.go b/backend/media/ffmpeg_reader.go new file mode 100644 index 0000000..6ca05ca --- /dev/null +++ b/backend/media/ffmpeg_reader.go @@ -0,0 +1,48 @@ +package media + +import ( + "bytes" + "context" + "fmt" + "io" + "os/exec" +) + +type ffmpegReader struct { + io.ReadCloser + + cmd *exec.Cmd +} + +func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmpegReader, error) { + var stdErr bytes.Buffer + + cmd := exec.CommandContext(ctx, "ffmpeg", arg...) + cmd.Stdin = input + cmd.Stderr = &stdErr + + r, err := cmd.StdoutPipe() + if err != nil { + return nil, fmt.Errorf("error creating pipe: %v", err) + } + + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("error starting ffmpeg: %v, output: %s", err, stdErr.String()) + } + + return &ffmpegReader{ReadCloser: r, cmd: cmd}, nil +} + +func (r *ffmpegReader) Close() error { + state, err := r.cmd.Process.Wait() + + if err != nil { + return fmt.Errorf("error returned from ffmpeg process: %v", err) + } + + if state.ExitCode() != 0 { + return fmt.Errorf("non-zero status %d returned from ffmpeg process", state.ExitCode()) + } + + return nil +} diff --git a/backend/media/uploader.go b/backend/media/uploader.go index dea3bdf..c48c58f 100644 --- a/backend/media/uploader.go +++ b/backend/media/uploader.go @@ -96,9 +96,9 @@ func (u *multipartUploadWriter) partNum() int32 { return int32(len(u.completedParts) + 1) } -// Abort aborts the upload process, cancelling the upload on S3. -// Accepts a separate context in case it is called during cleanup after the -// original context was killed. +// Abort aborts the upload process, cancelling the upload on S3. It accepts a +// separate context to the associated writer in case it is called during +// cleanup after the original context was killed. func (u *multipartUploadWriter) Abort(ctx context.Context) error { input := s3.AbortMultipartUploadInput{ Bucket: aws.String(u.bucket), @@ -139,6 +139,5 @@ func (u *multipartUploadWriter) Complete() error { } log.Printf("completed upload, key = %s, bytesUploaded = %d", u.key, u.bytesUploaded) - return nil } diff --git a/backend/media/youtube.go b/backend/media/youtube.go new file mode 100644 index 0000000..824a934 --- /dev/null +++ b/backend/media/youtube.go @@ -0,0 +1,34 @@ +package media + +import ( + "sort" + "strings" + + youtubev2 "github.com/kkdai/youtube/v2" +) + +// sortAudio returns the provided formats ordered in descending preferred +// order. The ideal candidate is opus-encoded stereo audio in a webm container, +// with the lowest available bitrate. +func sortAudio(inFormats youtubev2.FormatList) youtubev2.FormatList { + var formats youtubev2.FormatList + for _, format := range inFormats { + if format.FPS == 0 && format.AudioChannels > 0 { + formats = append(formats, format) + } + } + sort.SliceStable(formats, func(i, j int) bool { + isOpusI := strings.Contains(formats[i].MimeType, "opus") + isOpusJ := strings.Contains(formats[j].MimeType, "opus") + if isOpusI && isOpusJ { + isStereoI := formats[i].AudioChannels == 2 + isStereoJ := formats[j].AudioChannels == 2 + if isStereoI && isStereoJ { + return formats[i].ContentLength < formats[j].ContentLength + } + return isStereoI + } + return isOpusI + }) + return formats +}