Update database with encoded audio metadata

This commit is contained in:
Rob Watson 2021-11-29 14:59:05 +01:00
parent c3da27ca49
commit be42d452d6
5 changed files with 102 additions and 36 deletions

View File

@ -9,6 +9,7 @@ import (
"math" "math"
"os/exec" "os/exec"
"strconv" "strconv"
"sync"
"git.netflux.io/rob/clipper/config" "git.netflux.io/rob/clipper/config"
"git.netflux.io/rob/clipper/generated/store" "git.netflux.io/rob/clipper/generated/store"
@ -87,14 +88,13 @@ type audioGetterState struct {
} }
func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaSet store.MediaSet) { func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaSet store.MediaSet) {
defer s.Close()
defer r.Close()
streamWithProgress := newProgressReader(r, "audio", mediaSet.AudioContentLength, s.logger) streamWithProgress := newProgressReader(r, "audio", mediaSet.AudioContentLength, s.logger)
pr, pw := io.Pipe()
teeReader := io.TeeReader(streamWithProgress, pw)
var stdErr bytes.Buffer var stdErr bytes.Buffer
cmd := exec.CommandContext(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") cmd := exec.CommandContext(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-")
cmd.Stdin = streamWithProgress cmd.Stdin = teeReader
cmd.Stderr = &stdErr cmd.Stderr = &stdErr
stdout, err := cmd.StdoutPipe() stdout, err := cmd.StdoutPipe()
if err != nil { if err != nil {
@ -102,35 +102,78 @@ func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaS
return return
} }
if err = cmd.Start(); err != nil { if err = cmd.Start(); err != nil {
s.CloseWithError(fmt.Errorf("error starting command: %v", err)) s.CloseWithError(fmt.Errorf("error starting command: %v, output: %s", err, stdErr.String()))
return return
} }
var wg sync.WaitGroup
wg.Add(2)
// Upload the encoded audio.
go func() {
defer wg.Done()
// TODO: use mediaSet func to fetch s3Key
s3Key := fmt.Sprintf("media_sets/%s/audio.opus", mediaSet.ID)
uploader := newMultipartUploader(s.s3API, s.logger)
_, encErr := uploader.Upload(ctx, pr, s.config.S3Bucket, s3Key, "audio/opus")
if encErr != nil {
s.CloseWithError(fmt.Errorf("error uploading encoded audio: %v", encErr))
return
}
if _, err = s.store.SetEncodedAudioUploaded(ctx, store.SetEncodedAudioUploadedParams{
ID: mediaSet.ID,
AudioEncodedS3Bucket: sqlString(s.config.S3Bucket),
AudioEncodedS3Key: sqlString(s3Key),
}); err != nil {
s.CloseWithError(fmt.Errorf("error setting encoded audio uploaded: %v", err))
}
}()
// Upload the raw audio.
go func() {
defer wg.Done()
// TODO: use mediaSet func to fetch s3Key // TODO: use mediaSet func to fetch s3Key
s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID)
teeReader := io.TeeReader(stdout, s) teeReader := io.TeeReader(stdout, s)
uploader := newMultipartUploader(s.s3API, s.logger) uploader := newMultipartUploader(s.s3API, s.logger)
bytesUploaded, err := uploader.Upload(ctx, teeReader, s.config.S3Bucket, s3Key, rawAudioMimeType) bytesUploaded, rawErr := uploader.Upload(ctx, teeReader, s.config.S3Bucket, s3Key, rawAudioMimeType)
if err != nil { if rawErr != nil {
s.CloseWithError(fmt.Errorf("error uploading audio: %v", err)) s.CloseWithError(fmt.Errorf("error uploading raw audio: %v", rawErr))
return return
} }
if _, err = s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{ if _, err = s.store.SetRawAudioUploaded(ctx, store.SetRawAudioUploadedParams{
ID: mediaSet.ID, ID: mediaSet.ID,
AudioS3Bucket: sqlString(s.config.S3Bucket), AudioRawS3Bucket: sqlString(s.config.S3Bucket),
AudioS3Key: sqlString(s3Key), AudioRawS3Key: sqlString(s3Key),
AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(mediaSet.AudioChannels)), AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(mediaSet.AudioChannels)),
}); err != nil { }); err != nil {
s.CloseWithError(fmt.Errorf("error setting audio uploaded: %v", err)) s.CloseWithError(fmt.Errorf("error setting raw audio uploaded: %v", err))
}
}()
if err = cmd.Wait(); err != nil {
// TODO: cancel other goroutines (e.g. video fetch) if an error occurs here.
s.CloseWithError(fmt.Errorf("error waiting for command: %v, output: %s", err, stdErr.String()))
return return
} }
if err = cmd.Wait(); err != nil { // Close the pipe sending encoded audio to be uploaded, this ensures the
s.CloseWithError(fmt.Errorf("error waiting for command: %v", err)) // uploader reading from the pipe will receive io.EOF and complete
return // successfully.
} pw.Close()
// Wait for the uploaders to complete.
wg.Wait()
// Finally, close the progress reader so that the subsequent call to Next()
// returns io.EOF.
s.Close()
} }
// getAudioProgressReader accepts a byte stream containing little endian // getAudioProgressReader accepts a byte stream containing little endian

View File

@ -70,7 +70,8 @@ type Store interface {
GetMediaSet(context.Context, uuid.UUID) (store.MediaSet, error) GetMediaSet(context.Context, uuid.UUID) (store.MediaSet, error)
GetMediaSetByYoutubeID(context.Context, string) (store.MediaSet, error) GetMediaSetByYoutubeID(context.Context, string) (store.MediaSet, error)
CreateMediaSet(context.Context, store.CreateMediaSetParams) (store.MediaSet, error) CreateMediaSet(context.Context, store.CreateMediaSetParams) (store.MediaSet, error)
SetAudioUploaded(context.Context, store.SetAudioUploadedParams) (store.MediaSet, error) SetRawAudioUploaded(context.Context, store.SetRawAudioUploadedParams) (store.MediaSet, error)
SetEncodedAudioUploaded(context.Context, store.SetEncodedAudioUploadedParams) (store.MediaSet, error)
SetVideoUploaded(context.Context, store.SetVideoUploadedParams) (store.MediaSet, error) SetVideoUploaded(context.Context, store.SetVideoUploadedParams) (store.MediaSet, error)
SetVideoThumbnailUploaded(context.Context, store.SetVideoThumbnailUploadedParams) (store.MediaSet, error) SetVideoThumbnailUploaded(context.Context, store.SetVideoThumbnailUploadedParams) (store.MediaSet, error)
} }
@ -168,7 +169,7 @@ func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string)
AudioChannels: int32(audioMetadata.Channels), AudioChannels: int32(audioMetadata.Channels),
AudioFramesApprox: audioMetadata.ApproxFrames, AudioFramesApprox: audioMetadata.ApproxFrames,
AudioSampleRate: int32(audioMetadata.SampleRate), AudioSampleRate: int32(audioMetadata.SampleRate),
AudioMimeTypeEncoded: audioMetadata.MimeType, AudioEncodedMimeType: audioMetadata.MimeType,
AudioContentLength: audioMetadata.ContentLength, AudioContentLength: audioMetadata.ContentLength,
VideoYoutubeItag: int32(videoMetadata.YoutubeItag), VideoYoutubeItag: int32(videoMetadata.YoutubeItag),
VideoContentLength: videoMetadata.ContentLength, VideoContentLength: videoMetadata.ContentLength,
@ -208,7 +209,7 @@ func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (*
ApproxFrames: int64(mediaSet.AudioFramesApprox), ApproxFrames: int64(mediaSet.AudioFramesApprox),
Frames: mediaSet.AudioFrames.Int64, Frames: mediaSet.AudioFrames.Int64,
SampleRate: int(mediaSet.AudioSampleRate), SampleRate: int(mediaSet.AudioSampleRate),
MimeType: mediaSet.AudioMimeTypeEncoded, MimeType: mediaSet.AudioEncodedMimeType,
}, },
Video: Video{ Video: Video{
YoutubeItag: int(mediaSet.VideoYoutubeItag), YoutubeItag: int(mediaSet.VideoYoutubeItag),
@ -326,7 +327,7 @@ func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins in
return nil, fmt.Errorf("error getting media set: %v", err) return nil, fmt.Errorf("error getting media set: %v", err)
} }
if mediaSet.AudioS3UploadedAt.Valid { if mediaSet.AudioRawS3UploadedAt.Valid {
return s.getAudioFromS3(ctx, mediaSet, numBins) return s.getAudioFromS3(ctx, mediaSet, numBins)
} }
@ -340,8 +341,8 @@ func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet stor
func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) {
input := s3.GetObjectInput{ input := s3.GetObjectInput{
Bucket: aws.String(mediaSet.AudioS3Bucket.String), Bucket: aws.String(mediaSet.AudioRawS3Bucket.String),
Key: aws.String(mediaSet.AudioS3Key.String), Key: aws.String(mediaSet.AudioRawS3Key.String),
} }
output, err := s.s3.GetObject(ctx, &input) output, err := s.s3.GetObject(ctx, &input)
if err != nil { if err != nil {
@ -423,8 +424,8 @@ func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, sta
endFrame*int64(mediaSet.AudioChannels)*SizeOfInt16, endFrame*int64(mediaSet.AudioChannels)*SizeOfInt16,
) )
input := s3.GetObjectInput{ input := s3.GetObjectInput{
Bucket: aws.String(mediaSet.AudioS3Bucket.String), Bucket: aws.String(mediaSet.AudioRawS3Bucket.String),
Key: aws.String(mediaSet.AudioS3Key.String), Key: aws.String(mediaSet.AudioRawS3Key.String),
Range: aws.String(byteRange), Range: aws.String(byteRange),
} }
output, err := s.s3.GetObject(ctx, &input) output, err := s.s3.GetObject(ctx, &input)
@ -490,7 +491,7 @@ func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, sta
} }
if bytesRead < bytesExpected { if bytesRead < bytesExpected {
s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioS3Key.String).Warn("short read from S3") s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioRawS3Key.String).Warn("short read from S3")
} }
return peaks, nil return peaks, nil

View File

@ -0,0 +1,8 @@
ALTER TABLE media_sets DROP COLUMN audio_encoded_s3_bucket;
ALTER TABLE media_sets DROP COLUMN audio_encoded_s3_key;
ALTER TABLE media_sets DROP COLUMN audio_encoded_s3_uploaded_at;
ALTER TABLE media_sets RENAME COLUMN audio_raw_s3_bucket TO audio_s3_bucket;
ALTER TABLE media_sets RENAME COLUMN audio_raw_s3_key TO audio_s3_key;
ALTER TABLE media_sets RENAME COLUMN audio_raw_s3_uploaded_at TO audio_s3_uploaded_at;
ALTER TABLE media_sets RENAME COLUMN audio_encoded_mime_type TO audio_mime_type_encoded;

View File

@ -0,0 +1,8 @@
ALTER TABLE media_sets RENAME COLUMN audio_s3_bucket TO audio_raw_s3_bucket;
ALTER TABLE media_sets RENAME COLUMN audio_s3_key TO audio_raw_s3_key;
ALTER TABLE media_sets RENAME COLUMN audio_s3_uploaded_at TO audio_raw_s3_uploaded_at;
ALTER TABLE media_sets RENAME COLUMN audio_mime_type_encoded TO audio_encoded_mime_type;
ALTER TABLE media_sets ADD COLUMN audio_encoded_s3_bucket CHARACTER VARYING(255);
ALTER TABLE media_sets ADD COLUMN audio_encoded_s3_key CHARACTER VARYING(255);
ALTER TABLE media_sets ADD COLUMN audio_encoded_s3_uploaded_at TIMESTAMP WITH TIME ZONE;

View File

@ -5,13 +5,19 @@ SELECT * FROM media_sets WHERE id = $1;
SELECT * FROM media_sets WHERE youtube_id = $1; SELECT * FROM media_sets WHERE youtube_id = $1;
-- name: CreateMediaSet :one -- name: CreateMediaSet :one
INSERT INTO media_sets (youtube_id, audio_youtube_itag, audio_channels, audio_frames_approx, audio_sample_rate, audio_content_length, audio_mime_type_encoded, video_youtube_itag, video_content_length, video_mime_type, video_duration_nanos, created_at, updated_at) INSERT INTO media_sets (youtube_id, audio_youtube_itag, audio_channels, audio_frames_approx, audio_sample_rate, audio_content_length, audio_encoded_mime_type, video_youtube_itag, video_content_length, video_mime_type, video_duration_nanos, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW(), NOW()) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW(), NOW())
RETURNING *; RETURNING *;
-- name: SetAudioUploaded :one -- name: SetRawAudioUploaded :one
UPDATE media_sets UPDATE media_sets
SET audio_s3_bucket = $2, audio_s3_key = $3, audio_frames = $4, audio_s3_uploaded_at = NOW(), updated_at = NOW() SET audio_raw_s3_bucket = $2, audio_raw_s3_key = $3, audio_frames = $4, audio_raw_s3_uploaded_at = NOW(), updated_at = NOW()
WHERE id = $1
RETURNING *;
-- name: SetEncodedAudioUploaded :one
UPDATE media_sets
SET audio_encoded_s3_bucket = $2, audio_encoded_s3_key = $3, audio_encoded_s3_uploaded_at = NOW(), updated_at = NOW()
WHERE id = $1 WHERE id = $1
RETURNING *; RETURNING *;