From be42d452d6eb1093438c52d2c3698dc68055c948 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Mon, 29 Nov 2021 14:59:05 +0100 Subject: [PATCH] Update database with encoded audio metadata --- backend/media/get_audio.go | 91 ++++++++++++++----- backend/media/service.go | 19 ++-- ...coded_audio_columns_to_media_sets.down.sql | 8 ++ ...encoded_audio_columns_to_media_sets.up.sql | 8 ++ backend/sql/queries.sql | 12 ++- 5 files changed, 102 insertions(+), 36 deletions(-) create mode 100644 backend/sql/migrations/1638189108_add_encoded_audio_columns_to_media_sets.down.sql create mode 100644 backend/sql/migrations/1638189108_add_encoded_audio_columns_to_media_sets.up.sql diff --git a/backend/media/get_audio.go b/backend/media/get_audio.go index 50220b1..e96c2a3 100644 --- a/backend/media/get_audio.go +++ b/backend/media/get_audio.go @@ -9,6 +9,7 @@ import ( "math" "os/exec" "strconv" + "sync" "git.netflux.io/rob/clipper/config" "git.netflux.io/rob/clipper/generated/store" @@ -87,14 +88,13 @@ type audioGetterState struct { } func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaSet store.MediaSet) { - defer s.Close() - defer r.Close() - streamWithProgress := newProgressReader(r, "audio", mediaSet.AudioContentLength, s.logger) + pr, pw := io.Pipe() + teeReader := io.TeeReader(streamWithProgress, pw) var stdErr bytes.Buffer cmd := exec.CommandContext(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") - cmd.Stdin = streamWithProgress + cmd.Stdin = teeReader cmd.Stderr = &stdErr stdout, err := cmd.StdoutPipe() if err != nil { @@ -102,35 +102,78 @@ func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaS return } if err = cmd.Start(); err != nil { - s.CloseWithError(fmt.Errorf("error starting command: %v", err)) + s.CloseWithError(fmt.Errorf("error starting command: %v, output: %s", err, stdErr.String())) return } - // TODO: use mediaSet func to fetch s3Key - s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) + var wg sync.WaitGroup + wg.Add(2) - teeReader := io.TeeReader(stdout, s) - uploader := newMultipartUploader(s.s3API, s.logger) - bytesUploaded, err := uploader.Upload(ctx, teeReader, s.config.S3Bucket, s3Key, rawAudioMimeType) - if err != nil { - s.CloseWithError(fmt.Errorf("error uploading audio: %v", err)) - return - } + // Upload the encoded audio. + go func() { + defer wg.Done() - if _, err = s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{ - ID: mediaSet.ID, - AudioS3Bucket: sqlString(s.config.S3Bucket), - AudioS3Key: sqlString(s3Key), - AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(mediaSet.AudioChannels)), - }); err != nil { - s.CloseWithError(fmt.Errorf("error setting audio uploaded: %v", err)) - return - } + // TODO: use mediaSet func to fetch s3Key + s3Key := fmt.Sprintf("media_sets/%s/audio.opus", mediaSet.ID) + + uploader := newMultipartUploader(s.s3API, s.logger) + _, encErr := uploader.Upload(ctx, pr, s.config.S3Bucket, s3Key, "audio/opus") + if encErr != nil { + s.CloseWithError(fmt.Errorf("error uploading encoded audio: %v", encErr)) + return + } + + if _, err = s.store.SetEncodedAudioUploaded(ctx, store.SetEncodedAudioUploadedParams{ + ID: mediaSet.ID, + AudioEncodedS3Bucket: sqlString(s.config.S3Bucket), + AudioEncodedS3Key: sqlString(s3Key), + }); err != nil { + s.CloseWithError(fmt.Errorf("error setting encoded audio uploaded: %v", err)) + } + }() + + // Upload the raw audio. + go func() { + defer wg.Done() + + // TODO: use mediaSet func to fetch s3Key + s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) + + teeReader := io.TeeReader(stdout, s) + uploader := newMultipartUploader(s.s3API, s.logger) + bytesUploaded, rawErr := uploader.Upload(ctx, teeReader, s.config.S3Bucket, s3Key, rawAudioMimeType) + if rawErr != nil { + s.CloseWithError(fmt.Errorf("error uploading raw audio: %v", rawErr)) + return + } + + if _, err = s.store.SetRawAudioUploaded(ctx, store.SetRawAudioUploadedParams{ + ID: mediaSet.ID, + AudioRawS3Bucket: sqlString(s.config.S3Bucket), + AudioRawS3Key: sqlString(s3Key), + AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(mediaSet.AudioChannels)), + }); err != nil { + s.CloseWithError(fmt.Errorf("error setting raw audio uploaded: %v", err)) + } + }() if err = cmd.Wait(); err != nil { - s.CloseWithError(fmt.Errorf("error waiting for command: %v", err)) + // TODO: cancel other goroutines (e.g. video fetch) if an error occurs here. + s.CloseWithError(fmt.Errorf("error waiting for command: %v, output: %s", err, stdErr.String())) return } + + // Close the pipe sending encoded audio to be uploaded, this ensures the + // uploader reading from the pipe will receive io.EOF and complete + // successfully. + pw.Close() + + // Wait for the uploaders to complete. + wg.Wait() + + // Finally, close the progress reader so that the subsequent call to Next() + // returns io.EOF. + s.Close() } // getAudioProgressReader accepts a byte stream containing little endian diff --git a/backend/media/service.go b/backend/media/service.go index 4825d9a..0883a3a 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -70,7 +70,8 @@ type Store interface { GetMediaSet(context.Context, uuid.UUID) (store.MediaSet, error) GetMediaSetByYoutubeID(context.Context, string) (store.MediaSet, error) CreateMediaSet(context.Context, store.CreateMediaSetParams) (store.MediaSet, error) - SetAudioUploaded(context.Context, store.SetAudioUploadedParams) (store.MediaSet, error) + SetRawAudioUploaded(context.Context, store.SetRawAudioUploadedParams) (store.MediaSet, error) + SetEncodedAudioUploaded(context.Context, store.SetEncodedAudioUploadedParams) (store.MediaSet, error) SetVideoUploaded(context.Context, store.SetVideoUploadedParams) (store.MediaSet, error) SetVideoThumbnailUploaded(context.Context, store.SetVideoThumbnailUploadedParams) (store.MediaSet, error) } @@ -168,7 +169,7 @@ func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) AudioChannels: int32(audioMetadata.Channels), AudioFramesApprox: audioMetadata.ApproxFrames, AudioSampleRate: int32(audioMetadata.SampleRate), - AudioMimeTypeEncoded: audioMetadata.MimeType, + AudioEncodedMimeType: audioMetadata.MimeType, AudioContentLength: audioMetadata.ContentLength, VideoYoutubeItag: int32(videoMetadata.YoutubeItag), VideoContentLength: videoMetadata.ContentLength, @@ -208,7 +209,7 @@ func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (* ApproxFrames: int64(mediaSet.AudioFramesApprox), Frames: mediaSet.AudioFrames.Int64, SampleRate: int(mediaSet.AudioSampleRate), - MimeType: mediaSet.AudioMimeTypeEncoded, + MimeType: mediaSet.AudioEncodedMimeType, }, Video: Video{ YoutubeItag: int(mediaSet.VideoYoutubeItag), @@ -326,7 +327,7 @@ func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins in return nil, fmt.Errorf("error getting media set: %v", err) } - if mediaSet.AudioS3UploadedAt.Valid { + if mediaSet.AudioRawS3UploadedAt.Valid { return s.getAudioFromS3(ctx, mediaSet, numBins) } @@ -340,8 +341,8 @@ func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet stor func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { input := s3.GetObjectInput{ - Bucket: aws.String(mediaSet.AudioS3Bucket.String), - Key: aws.String(mediaSet.AudioS3Key.String), + Bucket: aws.String(mediaSet.AudioRawS3Bucket.String), + Key: aws.String(mediaSet.AudioRawS3Key.String), } output, err := s.s3.GetObject(ctx, &input) if err != nil { @@ -423,8 +424,8 @@ func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, sta endFrame*int64(mediaSet.AudioChannels)*SizeOfInt16, ) input := s3.GetObjectInput{ - Bucket: aws.String(mediaSet.AudioS3Bucket.String), - Key: aws.String(mediaSet.AudioS3Key.String), + Bucket: aws.String(mediaSet.AudioRawS3Bucket.String), + Key: aws.String(mediaSet.AudioRawS3Key.String), Range: aws.String(byteRange), } output, err := s.s3.GetObject(ctx, &input) @@ -490,7 +491,7 @@ func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, sta } if bytesRead < bytesExpected { - s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioS3Key.String).Warn("short read from S3") + s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioRawS3Key.String).Warn("short read from S3") } return peaks, nil diff --git a/backend/sql/migrations/1638189108_add_encoded_audio_columns_to_media_sets.down.sql b/backend/sql/migrations/1638189108_add_encoded_audio_columns_to_media_sets.down.sql new file mode 100644 index 0000000..d36fba2 --- /dev/null +++ b/backend/sql/migrations/1638189108_add_encoded_audio_columns_to_media_sets.down.sql @@ -0,0 +1,8 @@ +ALTER TABLE media_sets DROP COLUMN audio_encoded_s3_bucket; +ALTER TABLE media_sets DROP COLUMN audio_encoded_s3_key; +ALTER TABLE media_sets DROP COLUMN audio_encoded_s3_uploaded_at; + +ALTER TABLE media_sets RENAME COLUMN audio_raw_s3_bucket TO audio_s3_bucket; +ALTER TABLE media_sets RENAME COLUMN audio_raw_s3_key TO audio_s3_key; +ALTER TABLE media_sets RENAME COLUMN audio_raw_s3_uploaded_at TO audio_s3_uploaded_at; +ALTER TABLE media_sets RENAME COLUMN audio_encoded_mime_type TO audio_mime_type_encoded; diff --git a/backend/sql/migrations/1638189108_add_encoded_audio_columns_to_media_sets.up.sql b/backend/sql/migrations/1638189108_add_encoded_audio_columns_to_media_sets.up.sql new file mode 100644 index 0000000..261a9b6 --- /dev/null +++ b/backend/sql/migrations/1638189108_add_encoded_audio_columns_to_media_sets.up.sql @@ -0,0 +1,8 @@ +ALTER TABLE media_sets RENAME COLUMN audio_s3_bucket TO audio_raw_s3_bucket; +ALTER TABLE media_sets RENAME COLUMN audio_s3_key TO audio_raw_s3_key; +ALTER TABLE media_sets RENAME COLUMN audio_s3_uploaded_at TO audio_raw_s3_uploaded_at; +ALTER TABLE media_sets RENAME COLUMN audio_mime_type_encoded TO audio_encoded_mime_type; + +ALTER TABLE media_sets ADD COLUMN audio_encoded_s3_bucket CHARACTER VARYING(255); +ALTER TABLE media_sets ADD COLUMN audio_encoded_s3_key CHARACTER VARYING(255); +ALTER TABLE media_sets ADD COLUMN audio_encoded_s3_uploaded_at TIMESTAMP WITH TIME ZONE; diff --git a/backend/sql/queries.sql b/backend/sql/queries.sql index d4a8df8..340e152 100644 --- a/backend/sql/queries.sql +++ b/backend/sql/queries.sql @@ -5,13 +5,19 @@ SELECT * FROM media_sets WHERE id = $1; SELECT * FROM media_sets WHERE youtube_id = $1; -- name: CreateMediaSet :one -INSERT INTO media_sets (youtube_id, audio_youtube_itag, audio_channels, audio_frames_approx, audio_sample_rate, audio_content_length, audio_mime_type_encoded, video_youtube_itag, video_content_length, video_mime_type, video_duration_nanos, created_at, updated_at) +INSERT INTO media_sets (youtube_id, audio_youtube_itag, audio_channels, audio_frames_approx, audio_sample_rate, audio_content_length, audio_encoded_mime_type, video_youtube_itag, video_content_length, video_mime_type, video_duration_nanos, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW(), NOW()) RETURNING *; --- name: SetAudioUploaded :one +-- name: SetRawAudioUploaded :one UPDATE media_sets - SET audio_s3_bucket = $2, audio_s3_key = $3, audio_frames = $4, audio_s3_uploaded_at = NOW(), updated_at = NOW() + SET audio_raw_s3_bucket = $2, audio_raw_s3_key = $3, audio_frames = $4, audio_raw_s3_uploaded_at = NOW(), updated_at = NOW() + WHERE id = $1 + RETURNING *; + +-- name: SetEncodedAudioUploaded :one +UPDATE media_sets + SET audio_encoded_s3_bucket = $2, audio_encoded_s3_key = $3, audio_encoded_s3_uploaded_at = NOW(), updated_at = NOW() WHERE id = $1 RETURNING *;