From 650382fb39fca8eafa314ccbcdd7a13d12d03b2d Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Fri, 12 Nov 2021 13:36:26 +0100 Subject: [PATCH] Renaming, fix frames/bytes confusion --- backend/media/audio_progress.go | 22 ++++++++++------------ backend/media/service.go | 31 +++++++++++++++---------------- 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/backend/media/audio_progress.go b/backend/media/audio_progress.go index a46022c..d0750bb 100644 --- a/backend/media/audio_progress.go +++ b/backend/media/audio_progress.go @@ -18,11 +18,10 @@ type GetAudioProgressReader interface { Close() error } -// fetchAudioProgressReader accepts a byte stream containing little endian +// getAudioProgressReader accepts a byte stream containing little endian // signed int16s and, given a target number of bins, emits a stream of peaks // corresponding to each channel of the audio data. -type fetchAudioProgressReader struct { - byteOrder binary.ByteOrder +type getAudioProgressReader struct { framesExpected int64 channels int framesPerBin int @@ -36,9 +35,8 @@ type fetchAudioProgressReader struct { } // TODO: validate inputs, debugging is confusing otherwise -func newGetAudioProgressReader(byteOrder binary.ByteOrder, framesExpected int64, channels, numBins int) *fetchAudioProgressReader { - return &fetchAudioProgressReader{ - byteOrder: byteOrder, +func newGetAudioProgressReader(framesExpected int64, channels, numBins int) *getAudioProgressReader { + return &getAudioProgressReader{ channels: channels, framesExpected: framesExpected, framesPerBin: int(math.Ceil(float64(framesExpected) / float64(numBins))), @@ -49,16 +47,16 @@ func newGetAudioProgressReader(byteOrder binary.ByteOrder, framesExpected int64, } } -func (w *fetchAudioProgressReader) Abort(err error) { +func (w *getAudioProgressReader) Abort(err error) { w.errorChan <- err } -func (w *fetchAudioProgressReader) Close() error { +func (w *getAudioProgressReader) Close() error { close(w.progress) return nil } -func (w *fetchAudioProgressReader) Read() (GetAudioProgress, error) { +func (w *getAudioProgressReader) Read() (GetAudioProgress, error) { for { select { case progress, ok := <-w.progress: @@ -72,7 +70,7 @@ func (w *fetchAudioProgressReader) Read() (GetAudioProgress, error) { } } -func (w *fetchAudioProgressReader) Write(p []byte) (int, error) { +func (w *getAudioProgressReader) Write(p []byte) (int, error) { // expand our target slice if it is of insufficient size: numSamples := len(p) / SizeOfInt16 if len(w.samples) < numSamples { @@ -81,7 +79,7 @@ func (w *fetchAudioProgressReader) Write(p []byte) (int, error) { samples := w.samples[:numSamples] - if err := binary.Read(bytes.NewReader(p), w.byteOrder, samples); err != nil { + if err := binary.Read(bytes.NewReader(p), binary.LittleEndian, samples); err != nil { return 0, fmt.Errorf("error parsing samples: %v", err) } @@ -106,7 +104,7 @@ func (w *fetchAudioProgressReader) Write(p []byte) (int, error) { return len(p), nil } -func (w *fetchAudioProgressReader) nextBin() { +func (w *getAudioProgressReader) nextBin() { var progress GetAudioProgress // TODO: avoid an allocation? progress.Peaks = append(progress.Peaks, w.currPeaks...) diff --git a/backend/media/service.go b/backend/media/service.go index e1fe921..338d855 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "database/sql" - "encoding/binary" "errors" "fmt" "io" @@ -259,16 +258,15 @@ func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.Med return nil, fmt.Errorf("error getting object from s3: %v", err) } - fetchAudioProgressReader := newGetAudioProgressReader( - binary.BigEndian, + getAudioProgressReader := newGetAudioProgressReader( int64(mediaSet.AudioFrames.Int64), int(mediaSet.AudioChannels), numBins, ) state := getAudioFromS3State{ - fetchAudioProgressReader: fetchAudioProgressReader, - s3Reader: NewModuloBufReader(output.Body, int(mediaSet.AudioChannels)*SizeOfInt16), + getAudioProgressReader: getAudioProgressReader, + s3Reader: NewModuloBufReader(output.Body, int(mediaSet.AudioChannels)*SizeOfInt16), } go state.run(ctx) @@ -276,7 +274,7 @@ func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.Med } type getAudioFromS3State struct { - *fetchAudioProgressReader + *getAudioProgressReader s3Reader io.ReadCloser } @@ -345,20 +343,20 @@ func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet stor s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) uploader := newMultipartUploader(s.s3) - fetchAudioProgressReader := newGetAudioProgressReader( - binary.LittleEndian, + getAudioProgressReader := newGetAudioProgressReader( int64(mediaSet.AudioFramesApprox), format.AudioChannels, numBins, ) state := getAudioFromYoutubeState{ - fetchAudioProgressReader: fetchAudioProgressReader, - ffmpegReader: ffmpegReader, - uploader: uploader, - s3Bucket: s3Bucket, - s3Key: s3Key, - store: s.store, + getAudioProgressReader: getAudioProgressReader, + ffmpegReader: ffmpegReader, + uploader: uploader, + s3Bucket: s3Bucket, + s3Key: s3Key, + store: s.store, + channels: format.AudioChannels, } go state.run(ctx, mediaSet.ID) @@ -366,12 +364,13 @@ func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet stor } type getAudioFromYoutubeState struct { - *fetchAudioProgressReader + *getAudioProgressReader ffmpegReader *ffmpegReader uploader *multipartUploader s3Bucket, s3Key string store Store + channels int } func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID) { @@ -399,7 +398,7 @@ func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID ID: mediaSetID, AudioS3Bucket: sqlString(s.s3Bucket), AudioS3Key: sqlString(s.s3Key), - AudioFrames: sqlInt64(bytesUploaded), + AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(s.channels)), }) if updateErr != nil {