From c3da27ca4975e3ddd24ddebe36521039d60ea35a Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Mon, 29 Nov 2021 12:46:33 +0100 Subject: [PATCH] Refactor audio fetching logic --- backend/media/audio_progress.go | 125 --------- backend/media/ffmpeg_reader.go | 56 ---- backend/media/get_audio.go | 240 ++++++++++++++++++ .../media/{video_progress.go => get_video.go} | 9 +- backend/media/media_set.go | 26 +- backend/media/service.go | 151 ++--------- backend/server/server.go | 2 +- ...tent_length_columns_to_media_sets.down.sql | 2 + ...ontent_length_columns_to_media_sets.up.sql | 5 + backend/sql/queries.sql | 4 +- 10 files changed, 294 insertions(+), 326 deletions(-) delete mode 100644 backend/media/audio_progress.go delete mode 100644 backend/media/ffmpeg_reader.go create mode 100644 backend/media/get_audio.go rename backend/media/{video_progress.go => get_video.go} (91%) create mode 100644 backend/sql/migrations/1638182250_add_content_length_columns_to_media_sets.down.sql create mode 100644 backend/sql/migrations/1638182250_add_content_length_columns_to_media_sets.up.sql diff --git a/backend/media/audio_progress.go b/backend/media/audio_progress.go deleted file mode 100644 index b6c444b..0000000 --- a/backend/media/audio_progress.go +++ /dev/null @@ -1,125 +0,0 @@ -package media - -import ( - "bytes" - "encoding/binary" - "fmt" - "io" - "math" -) - -type GetAudioProgress struct { - PercentComplete float32 - Peaks []int16 -} - -type GetAudioProgressReader interface { - Read() (GetAudioProgress, error) - Close() error -} - -// getAudioProgressReader accepts a byte stream containing little endian -// signed int16s and, given a target number of bins, emits a stream of peaks -// corresponding to each channel of the audio data. -type getAudioProgressReader struct { - framesExpected int64 - channels int - framesPerBin int - - samples []int16 - currPeaks []int16 - currCount int - framesProcessed int64 - progress chan GetAudioProgress - errorChan chan error -} - -func newGetAudioProgressReader(framesExpected int64, channels, numBins int) (*getAudioProgressReader, error) { - if framesExpected <= 0 || channels <= 0 || numBins <= 0 { - return nil, fmt.Errorf("error creating audio progress reader (framesExpected = %d, channels = %d, numBins = %d)", framesExpected, channels, numBins) - } - - return &getAudioProgressReader{ - channels: channels, - framesExpected: framesExpected, - framesPerBin: int(math.Ceil(float64(framesExpected) / float64(numBins))), - samples: make([]int16, 8_192), - currPeaks: make([]int16, channels), - progress: make(chan GetAudioProgress), - errorChan: make(chan error, 1), - }, nil -} - -func (w *getAudioProgressReader) Abort(err error) { - w.errorChan <- err -} - -func (w *getAudioProgressReader) Close() error { - close(w.progress) - return nil -} - -func (w *getAudioProgressReader) Read() (GetAudioProgress, error) { - for { - select { - case progress, ok := <-w.progress: - if !ok { - return GetAudioProgress{Peaks: w.currPeaks, PercentComplete: w.percentComplete()}, io.EOF - } - return progress, nil - case err := <-w.errorChan: - return GetAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err) - } - } -} - -func (w *getAudioProgressReader) Write(p []byte) (int, error) { - // expand our target slice if it is of insufficient size: - numSamples := len(p) / SizeOfInt16 - if len(w.samples) < numSamples { - w.samples = append(w.samples, make([]int16, numSamples-len(w.samples))...) - } - - samples := w.samples[:numSamples] - - if err := binary.Read(bytes.NewReader(p), binary.LittleEndian, samples); err != nil { - return 0, fmt.Errorf("error parsing samples: %v", err) - } - - for i := 0; i < len(samples); i += w.channels { - for j := 0; j < w.channels; j++ { - samp := samples[i+j] - if samp < 0 { - samp = -samp - } - if samp > w.currPeaks[j] { - w.currPeaks[j] = samp - } - } - w.currCount++ - if w.currCount == w.framesPerBin { - w.nextBin() - } - } - - w.framesProcessed += int64(len(samples) / w.channels) - - return len(p), nil -} - -func (w *getAudioProgressReader) percentComplete() float32 { - return (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0 -} - -func (w *getAudioProgressReader) nextBin() { - var progress GetAudioProgress - progress.Peaks = append(progress.Peaks, w.currPeaks...) - progress.PercentComplete = w.percentComplete() - - w.progress <- progress - - w.currCount = 0 - for i := 0; i < len(w.currPeaks); i++ { - w.currPeaks[i] = 0 - } -} diff --git a/backend/media/ffmpeg_reader.go b/backend/media/ffmpeg_reader.go deleted file mode 100644 index b21935e..0000000 --- a/backend/media/ffmpeg_reader.go +++ /dev/null @@ -1,56 +0,0 @@ -package media - -import ( - "bytes" - "context" - "fmt" - "io" - "os/exec" -) - -type ffmpegReader struct { - io.ReadCloser - - cmd *exec.Cmd - stdErrBuf *bytes.Buffer -} - -func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmpegReader, error) { - var stdErr bytes.Buffer - - cmd := exec.CommandContext(ctx, "ffmpeg", arg...) - cmd.Stdin = input - cmd.Stderr = &stdErr - - r, err := cmd.StdoutPipe() - if err != nil { - return nil, fmt.Errorf("error creating pipe: %v", err) - } - - if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("error starting ffmpeg: %v, output: %s", err, stdErr.String()) - } - - return &ffmpegReader{ReadCloser: r, cmd: cmd, stdErrBuf: &stdErr}, nil -} - -func (r *ffmpegReader) Cancel() error { - if err := r.cmd.Process.Kill(); err != nil { - return fmt.Errorf("error killing ffmpeg process: %v", err) - } - return nil -} - -func (r *ffmpegReader) Close() error { - state, err := r.cmd.Process.Wait() - - if err != nil { - return fmt.Errorf("error returned from ffmpeg process: %v", err) - } - - if state.ExitCode() != 0 { - return fmt.Errorf("non-zero status %d returned from ffmpeg process, output: %s", state.ExitCode(), r.stdErrBuf.String()) - } - - return nil -} diff --git a/backend/media/get_audio.go b/backend/media/get_audio.go new file mode 100644 index 0000000..50220b1 --- /dev/null +++ b/backend/media/get_audio.go @@ -0,0 +1,240 @@ +package media + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "io" + "math" + "os/exec" + "strconv" + + "git.netflux.io/rob/clipper/config" + "git.netflux.io/rob/clipper/generated/store" + "go.uber.org/zap" +) + +type GetAudioProgress struct { + PercentComplete float32 + Peaks []int16 +} + +type GetAudioProgressReader interface { + Next() (GetAudioProgress, error) + Close() error +} + +// audioGetter manages getting and processing audio from Youtube. +type audioGetter struct { + store Store + youtube YoutubeClient + s3API S3API + config config.Config + logger *zap.SugaredLogger +} + +// newAudioGetter returns a new audioGetter. +func newAudioGetter(store Store, youtube YoutubeClient, s3API S3API, config config.Config, logger *zap.SugaredLogger) *audioGetter { + return &audioGetter{ + store: store, + youtube: youtube, + s3API: s3API, + config: config, + logger: logger, + } +} + +// GetAudio gets the audio, processes it and uploads it to S3. It returns a +// GetAudioProgressReader that can be used to poll progress reports and audio +// peaks. +// +// TODO: accept domain object instead +func (g *audioGetter) GetAudio(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { + video, err := g.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) + if err != nil { + return nil, fmt.Errorf("error fetching video: %v", err) + } + + format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag)) + if format == nil { + return nil, fmt.Errorf("error finding itag: %v", err) + } + + stream, _, err := g.youtube.GetStreamContext(ctx, video, format) + if err != nil { + return nil, fmt.Errorf("error fetching stream: %v", err) + } + + audioProgressReader, err := newGetAudioProgressReader(mediaSet.AudioFramesApprox, int(mediaSet.AudioChannels), numBins) + if err != nil { + return nil, fmt.Errorf("error building progress reader: %v", err) + } + + s := &audioGetterState{ + audioGetter: g, + getAudioProgressReader: audioProgressReader, + } + go s.getAudio(ctx, stream, mediaSet) + + return s, nil +} + +// audioGetterState represents the state of an individual audio fetch. +type audioGetterState struct { + *audioGetter + *getAudioProgressReader +} + +func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaSet store.MediaSet) { + defer s.Close() + defer r.Close() + + streamWithProgress := newProgressReader(r, "audio", mediaSet.AudioContentLength, s.logger) + + var stdErr bytes.Buffer + cmd := exec.CommandContext(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") + cmd.Stdin = streamWithProgress + cmd.Stderr = &stdErr + stdout, err := cmd.StdoutPipe() + if err != nil { + s.CloseWithError(fmt.Errorf("error getting stdout: %v", err)) + return + } + if err = cmd.Start(); err != nil { + s.CloseWithError(fmt.Errorf("error starting command: %v", err)) + return + } + + // TODO: use mediaSet func to fetch s3Key + s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) + + teeReader := io.TeeReader(stdout, s) + uploader := newMultipartUploader(s.s3API, s.logger) + bytesUploaded, err := uploader.Upload(ctx, teeReader, s.config.S3Bucket, s3Key, rawAudioMimeType) + if err != nil { + s.CloseWithError(fmt.Errorf("error uploading audio: %v", err)) + return + } + + if _, err = s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{ + ID: mediaSet.ID, + AudioS3Bucket: sqlString(s.config.S3Bucket), + AudioS3Key: sqlString(s3Key), + AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(mediaSet.AudioChannels)), + }); err != nil { + s.CloseWithError(fmt.Errorf("error setting audio uploaded: %v", err)) + return + } + + if err = cmd.Wait(); err != nil { + s.CloseWithError(fmt.Errorf("error waiting for command: %v", err)) + return + } +} + +// getAudioProgressReader accepts a byte stream containing little endian +// signed int16s and, given a target number of bins, emits a stream of peaks +// corresponding to each channel of the audio data. +type getAudioProgressReader struct { + framesExpected int64 + channels int + framesPerBin int + + samples []int16 + currPeaks []int16 + currCount int + framesProcessed int64 + progress chan GetAudioProgress + errorChan chan error +} + +func newGetAudioProgressReader(framesExpected int64, channels, numBins int) (*getAudioProgressReader, error) { + if framesExpected <= 0 || channels <= 0 || numBins <= 0 { + return nil, fmt.Errorf("error creating audio progress reader (framesExpected = %d, channels = %d, numBins = %d)", framesExpected, channels, numBins) + } + + return &getAudioProgressReader{ + channels: channels, + framesExpected: framesExpected, + framesPerBin: int(math.Ceil(float64(framesExpected) / float64(numBins))), + samples: make([]int16, 8_192), + currPeaks: make([]int16, channels), + progress: make(chan GetAudioProgress), + errorChan: make(chan error, 1), + }, nil +} + +func (w *getAudioProgressReader) CloseWithError(err error) { + w.errorChan <- err +} + +func (w *getAudioProgressReader) Close() error { + close(w.progress) + return nil +} + +func (w *getAudioProgressReader) Next() (GetAudioProgress, error) { + for { + select { + case progress, ok := <-w.progress: + if !ok { + return GetAudioProgress{Peaks: w.currPeaks, PercentComplete: w.percentComplete()}, io.EOF + } + return progress, nil + case err := <-w.errorChan: + return GetAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err) + } + } +} + +func (w *getAudioProgressReader) Write(p []byte) (int, error) { + // expand our target slice if it is of insufficient size: + numSamples := len(p) / SizeOfInt16 + if len(w.samples) < numSamples { + w.samples = append(w.samples, make([]int16, numSamples-len(w.samples))...) + } + + samples := w.samples[:numSamples] + + if err := binary.Read(bytes.NewReader(p), binary.LittleEndian, samples); err != nil { + return 0, fmt.Errorf("error parsing samples: %v", err) + } + + for i := 0; i < len(samples); i += w.channels { + for j := 0; j < w.channels; j++ { + samp := samples[i+j] + if samp < 0 { + samp = -samp + } + if samp > w.currPeaks[j] { + w.currPeaks[j] = samp + } + } + w.currCount++ + if w.currCount == w.framesPerBin { + w.nextBin() + } + } + + w.framesProcessed += int64(len(samples) / w.channels) + + return len(p), nil +} + +func (w *getAudioProgressReader) percentComplete() float32 { + return (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0 +} + +func (w *getAudioProgressReader) nextBin() { + var progress GetAudioProgress + progress.Peaks = append(progress.Peaks, w.currPeaks...) + progress.PercentComplete = w.percentComplete() + + w.progress <- progress + + w.currCount = 0 + for i := 0; i < len(w.currPeaks); i++ { + w.currPeaks[i] = 0 + } +} diff --git a/backend/media/video_progress.go b/backend/media/get_video.go similarity index 91% rename from backend/media/video_progress.go rename to backend/media/get_video.go index c28196f..7091284 100644 --- a/backend/media/video_progress.go +++ b/backend/media/get_video.go @@ -17,6 +17,12 @@ type GetVideoProgress struct { URL string } +type GetVideoProgressReader interface { + // Next returns the next video progress status. When the stream has finished, + // a valid GetVideoProgress value will be returned with io.EOF. + Next() (GetVideoProgress, error) +} + type videoGetter struct { s3 S3API store Store @@ -62,7 +68,8 @@ func (g *videoGetter) GetVideo(ctx context.Context, r io.Reader, exp int64, medi return s, nil } -// Write implements io.Writer. +// Write implements io.Writer. It is copied that same data that is written to +// S3, to implement progress tracking. func (s *videoGetterState) Write(p []byte) (int, error) { s.count += int64(len(p)) pc := (float32(s.count) / float32(s.exp)) * 100 diff --git a/backend/media/media_set.go b/backend/media/media_set.go index 65a4636..8937c6a 100644 --- a/backend/media/media_set.go +++ b/backend/media/media_set.go @@ -10,25 +10,25 @@ import ( const SizeOfInt16 = 2 type Audio struct { - Bytes int64 `json:"bytes"` - Channels int `json:"channels"` + ContentLength int64 + Channels int // ApproxFrames is used during initial processing when a precise frame count // cannot be determined. Prefer Frames in all other cases. - ApproxFrames int64 `json:"approx_frames"` - Frames int64 `json:"frames"` - SampleRate int `json:"sample_rate"` - YoutubeItag int `json:"youtube_itag"` - MimeType string `json:"mime_type"` + ApproxFrames int64 + Frames int64 + SampleRate int + YoutubeItag int + MimeType string } type Video struct { - Bytes int64 `json:"bytes"` - Duration time.Duration `json:"duration"` + ContentLength int64 + Duration time.Duration // not sure if this are needed any more? - ThumbnailWidth int `json:"thumbnail_width"` - ThumbnailHeight int `json:"thumbnail_height"` - YoutubeItag int `json:"youtube_itag"` - MimeType string `json:"mime_type"` + ThumbnailWidth int + ThumbnailHeight int + YoutubeItag int + MimeType string } // MediaSet represents the media and metadata associated with a single media diff --git a/backend/media/service.go b/backend/media/service.go index fb677f3..4825d9a 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -169,7 +169,9 @@ func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) AudioFramesApprox: audioMetadata.ApproxFrames, AudioSampleRate: int32(audioMetadata.SampleRate), AudioMimeTypeEncoded: audioMetadata.MimeType, + AudioContentLength: audioMetadata.ContentLength, VideoYoutubeItag: int32(videoMetadata.YoutubeItag), + VideoContentLength: videoMetadata.ContentLength, VideoMimeType: videoMetadata.MimeType, VideoDurationNanos: videoMetadata.Duration.Nanoseconds(), } @@ -200,17 +202,17 @@ func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (* ID: mediaSet.ID, YoutubeID: mediaSet.YoutubeID, Audio: Audio{ - YoutubeItag: int(mediaSet.AudioYoutubeItag), - Bytes: 0, // DEPRECATED - Channels: int(mediaSet.AudioChannels), - ApproxFrames: int64(mediaSet.AudioFramesApprox), - Frames: mediaSet.AudioFrames.Int64, - SampleRate: int(mediaSet.AudioSampleRate), - MimeType: mediaSet.AudioMimeTypeEncoded, + YoutubeItag: int(mediaSet.AudioYoutubeItag), + ContentLength: mediaSet.AudioContentLength, + Channels: int(mediaSet.AudioChannels), + ApproxFrames: int64(mediaSet.AudioFramesApprox), + Frames: mediaSet.AudioFrames.Int64, + SampleRate: int(mediaSet.AudioSampleRate), + MimeType: mediaSet.AudioMimeTypeEncoded, }, Video: Video{ YoutubeItag: int(mediaSet.VideoYoutubeItag), - Bytes: 0, // DEPRECATED? + ContentLength: mediaSet.VideoContentLength, Duration: time.Duration(mediaSet.VideoDurationNanos), MimeType: mediaSet.VideoMimeType, ThumbnailWidth: 0, // ?? @@ -234,7 +236,7 @@ func (s *MediaSetService) fetchVideoMetadata(ctx context.Context, video *youtube return Video{ YoutubeItag: format.ItagNo, MimeType: format.MimeType, - Bytes: format.ContentLength, + ContentLength: format.ContentLength, ThumbnailWidth: thumbnailWidth, ThumbnailHeight: thumbnailHeight, Duration: time.Duration(durationMsecs) * time.Millisecond, @@ -261,11 +263,12 @@ func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtube approxFrames := int64(approxDuration/time.Second) * int64(sampleRate) return Audio{ - MimeType: format.MimeType, - YoutubeItag: format.ItagNo, - ApproxFrames: approxFrames, - Channels: format.AudioChannels, - SampleRate: sampleRate, + ContentLength: format.ContentLength, + MimeType: format.MimeType, + YoutubeItag: format.ItagNo, + ApproxFrames: approxFrames, + Channels: format.AudioChannels, + SampleRate: sampleRate, }, nil } @@ -316,12 +319,6 @@ func (s *MediaSetService) GetVideo(ctx context.Context, id uuid.UUID) (GetVideoP ) } -type GetVideoProgressReader interface { - // Next returns the next video progress status. When the stream has finished, - // a valid GetVideoProgress value will be returned with io.EOF. - Next() (GetVideoProgress, error) -} - // GetAudio fetches the audio part of a MediaSet. func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins int) (GetAudioProgressReader, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) @@ -336,6 +333,11 @@ func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins in return s.getAudioFromYoutube(ctx, mediaSet, numBins) } +func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { + audioGetter := newAudioGetter(s.store, s.youtube, s.s3, s.config, s.logger) + return audioGetter.GetAudio(ctx, mediaSet, numBins) +} + func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { input := s3.GetObjectInput{ Bucket: aws.String(mediaSet.AudioS3Bucket.String), @@ -400,7 +402,7 @@ outer: if err != nil { s.logger.Errorf("getAudioFromS3State: error closing s3 reader: %v", err) - s.Abort(err) + s.CloseWithError(err) return } @@ -409,113 +411,6 @@ outer: } } -func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { - video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) - if err != nil { - return nil, fmt.Errorf("error fetching video: %v", err) - } - - format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag)) - if format == nil { - return nil, fmt.Errorf("error finding itag: %v", err) - } - - stream, _, err := s.youtube.GetStreamContext(ctx, video, format) - if err != nil { - return nil, fmt.Errorf("error fetching stream: %v", err) - } - - streamWithProgress := newProgressReader(stream, "audio", format.ContentLength, s.logger) - - ffmpegReader, err := newFfmpegReader(ctx, streamWithProgress, "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") - if err != nil { - return nil, fmt.Errorf("error creating ffmpegreader: %v", err) - } - - // TODO: use mediaSet func to fetch s3Key - s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) - uploader := newMultipartUploader(s.s3, s.logger) - - getAudioProgressReader, err := newGetAudioProgressReader( - int64(mediaSet.AudioFramesApprox), - format.AudioChannels, - numBins, - ) - if err != nil { - return nil, fmt.Errorf("error creating audio reader: %v", err) - } - - state := getAudioFromYoutubeState{ - getAudioProgressReader: getAudioProgressReader, - ffmpegReader: ffmpegReader, - uploader: uploader, - s3Bucket: s.config.S3Bucket, - s3Key: s3Key, - store: s.store, - channels: format.AudioChannels, - logger: s.logger, - } - go state.run(ctx, mediaSet.ID) - - return &state, nil -} - -type getAudioFromYoutubeState struct { - *getAudioProgressReader - - ffmpegReader *ffmpegReader - uploader *multipartUploader - s3Bucket, s3Key string - store Store - channels int - logger *zap.SugaredLogger -} - -func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID) { - teeReader := io.TeeReader(s.ffmpegReader, s) - bytesUploaded, err := s.uploader.Upload(ctx, teeReader, s.s3Bucket, s.s3Key, rawAudioMimeType) - - // If there was an error returned, the underlying ffmpegReader process may - // still be active. Kill it. - if err != nil { - if cancelErr := s.ffmpegReader.Cancel(); cancelErr != nil { - s.logger.Errorf("getAudioFromYoutubeState: error cancelling ffmpegreader: %v", cancelErr) - } - } - - // Either way, we need to wait for the ffmpegReader process to exit, - // and ensure there is no error. - if readerErr := s.ffmpegReader.Close(); readerErr != nil { - if err == nil { - err = readerErr - } - } - - if err == nil { - _, updateErr := s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{ - ID: mediaSetID, - AudioS3Bucket: sqlString(s.s3Bucket), - AudioS3Key: sqlString(s.s3Key), - AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(s.channels)), - }) - - if updateErr != nil { - err = updateErr - } - } - - if err != nil { - s.logger.Errorf("getAudioFromYoutubeState: error uploading asynchronously: %v", err) - - s.Abort(err) - return - } - - if iterErr := s.Close(); iterErr != nil { - s.logger.Errorf("getAudioFromYoutubeState: error closing progress iterator: %v", iterErr) - } -} - func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, numBins int) ([]int16, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { diff --git a/backend/server/server.go b/backend/server/server.go index 1c0f5d9..325e15e 100644 --- a/backend/server/server.go +++ b/backend/server/server.go @@ -117,7 +117,7 @@ func (c *mediaSetServiceController) GetAudio(request *pbmediaset.GetAudioRequest } for { - progress, err := reader.Read() + progress, err := reader.Next() if err != nil && err != io.EOF { return newResponseError(err) } diff --git a/backend/sql/migrations/1638182250_add_content_length_columns_to_media_sets.down.sql b/backend/sql/migrations/1638182250_add_content_length_columns_to_media_sets.down.sql new file mode 100644 index 0000000..90b50ae --- /dev/null +++ b/backend/sql/migrations/1638182250_add_content_length_columns_to_media_sets.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE media_sets DROP COLUMN video_content_length; +ALTER TABLE media_sets DROP COLUMN audio_content_length; diff --git a/backend/sql/migrations/1638182250_add_content_length_columns_to_media_sets.up.sql b/backend/sql/migrations/1638182250_add_content_length_columns_to_media_sets.up.sql new file mode 100644 index 0000000..6144c03 --- /dev/null +++ b/backend/sql/migrations/1638182250_add_content_length_columns_to_media_sets.up.sql @@ -0,0 +1,5 @@ +ALTER TABLE media_sets ADD COLUMN audio_content_length bigint NOT NULL; +ALTER TABLE media_sets ADD COLUMN video_content_length bigint NOT NULL; + +ALTER TABLE media_sets ADD CONSTRAINT check_audio_content_length_gt_0 CHECK (audio_content_length > 0); +ALTER TABLE media_sets ADD CONSTRAINT check_video_content_length_gt_0 CHECK (video_content_length > 0); diff --git a/backend/sql/queries.sql b/backend/sql/queries.sql index b8c2ed5..d4a8df8 100644 --- a/backend/sql/queries.sql +++ b/backend/sql/queries.sql @@ -5,8 +5,8 @@ SELECT * FROM media_sets WHERE id = $1; SELECT * FROM media_sets WHERE youtube_id = $1; -- name: CreateMediaSet :one -INSERT INTO media_sets (youtube_id, audio_youtube_itag, audio_channels, audio_frames_approx, audio_sample_rate, audio_mime_type_encoded, video_youtube_itag, video_mime_type, video_duration_nanos, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW(), NOW()) +INSERT INTO media_sets (youtube_id, audio_youtube_itag, audio_channels, audio_frames_approx, audio_sample_rate, audio_content_length, audio_mime_type_encoded, video_youtube_itag, video_content_length, video_mime_type, video_duration_nanos, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW(), NOW()) RETURNING *; -- name: SetAudioUploaded :one