Read peaks from S3

This commit is contained in:
Rob Watson 2021-11-02 19:03:26 +01:00
parent 5cbcfe22cf
commit d117419b0c
6 changed files with 109 additions and 22 deletions

View File

@ -11,7 +11,6 @@ import (
"git.netflux.io/rob/clipper/media" "git.netflux.io/rob/clipper/media"
"github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid"
"github.com/kkdai/youtube/v2" "github.com/kkdai/youtube/v2"
_ "github.com/lib/pq" _ "github.com/lib/pq"
) )
@ -50,8 +49,7 @@ func main() {
} }
// Create a progressReader // Create a progressReader
id := uuid.MustParse(mediaSet.ID) progressReader, err := mediaSetService.GetAudio(ctx, mediaSet.ID, 2_000)
progressReader, err := mediaSetService.GetAudio(ctx, id, 100)
if err != nil { if err != nil {
log.Fatalf("error calling fetch service: %v", err) log.Fatalf("error calling fetch service: %v", err)
} }

View File

@ -9,6 +9,8 @@ import (
"log" "log"
"os" "os"
"time" "time"
"github.com/google/uuid"
) )
const SizeOfInt16 = 2 const SizeOfInt16 = 2
@ -38,10 +40,10 @@ type Video struct {
// MediaSet represents the media and metadata associated with a single media // MediaSet represents the media and metadata associated with a single media
// resource (for example, a YouTube video). // resource (for example, a YouTube video).
type MediaSet struct { type MediaSet struct {
Audio Audio `json:"audio"` Audio Audio `json:"audio"`
Video Video `json:"video"` Video Video `json:"video"`
ID string `json:"id"` ID uuid.UUID `json:"id"`
YoutubeID string `json:"youtube_id"` YoutubeID string `json:"youtube_id"`
exists bool `json:"exists"` exists bool `json:"exists"`
} }

View File

@ -11,6 +11,7 @@ import (
"time" "time"
"git.netflux.io/rob/clipper/generated/store" "git.netflux.io/rob/clipper/generated/store"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/uuid" "github.com/google/uuid"
youtubev2 "github.com/kkdai/youtube/v2" youtubev2 "github.com/kkdai/youtube/v2"
@ -56,6 +57,7 @@ type Store interface {
// S3Client wraps the AWS S3 service client. // S3Client wraps the AWS S3 service client.
type S3Client interface { type S3Client interface {
GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error)
CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
@ -141,7 +143,7 @@ func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string)
} }
return &MediaSet{ return &MediaSet{
ID: mediaSet.ID.String(), ID: mediaSet.ID,
YoutubeID: youtubeID, YoutubeID: youtubeID,
Audio: audioMetadata, Audio: audioMetadata,
Video: videoMetadata, Video: videoMetadata,
@ -159,6 +161,8 @@ func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (*
} }
return &MediaSet{ return &MediaSet{
ID: mediaSet.ID,
YoutubeID: mediaSet.YoutubeID,
Audio: Audio{ Audio: Audio{
YoutubeItag: int(mediaSet.AudioYoutubeItag), YoutubeItag: int(mediaSet.AudioYoutubeItag),
Bytes: 0, // DEPRECATED Bytes: 0, // DEPRECATED
@ -176,7 +180,6 @@ func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (*
ThumbnailWidth: 0, // ?? ThumbnailWidth: 0, // ??
ThumbnailHeight: 0, // ?? ThumbnailHeight: 0, // ??
}, },
YoutubeID: mediaSet.YoutubeID,
}, nil }, nil
} }
@ -237,6 +240,82 @@ func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins in
return nil, fmt.Errorf("error getting media set: %v", err) return nil, fmt.Errorf("error getting media set: %v", err)
} }
if mediaSet.AudioS3UploadedAt.Valid {
return s.getAudioFromS3(ctx, mediaSet, numBins)
}
return s.getAudioFromYoutube(ctx, mediaSet, numBins)
}
// getAudioFromS3 requests the media set's stored audio object from S3 (using
// the bucket and key recorded on the media set) and returns a progress reader
// that is fed from the object body by a background goroutine.
//
// An error is returned only if the GetObject call itself fails; errors that
// occur while streaming are handled inside the background goroutine.
func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) {
	output, err := s.s3.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(mediaSet.AudioS3Bucket.String),
		Key:    aws.String(mediaSet.AudioS3Key.String),
	})
	if err != nil {
		return nil, fmt.Errorf("error getting object from s3: %v", err)
	}

	frames := int64(mediaSet.AudioFrames.Int64)
	channels := int(mediaSet.AudioChannels)

	state := &getAudioFromS3State{
		fetchAudioProgressReader: newGetAudioProgressReader(frames, channels, numBins),
		s3Reader:                 output.Body,
	}

	// The goroutine owns the S3 body from here on and closes it when done.
	go state.run(ctx)

	return state, nil
}
// getAudioFromS3State holds the state for one read of stored audio from S3.
// It embeds the progress reader, so the state itself is used as the write
// destination when the S3 body is copied, and is what getAudioFromS3 returns
// to its caller.
type getAudioFromS3State struct {
*fetchAudioProgressReader
// s3Reader is the body of the S3 GetObject response; run closes it.
s3Reader io.ReadCloser
}
// run copies the S3 object body into the embedded progress reader until the
// copy finishes or ctx is cancelled, whichever happens first, and then closes
// the S3 body.
//
// On any failure (copy error, context cancellation, or an error closing the
// body) the error is logged and passed to Abort. Otherwise the progress reader
// is closed normally.
func (s *getAudioFromS3State) run(ctx context.Context) {
	// Buffered with capacity 1 so the copy goroutine can always deliver its
	// result and exit, even when ctx is cancelled first and nothing is left
	// receiving on the channel.
	done := make(chan error, 1)
	go func() {
		_, copyErr := io.Copy(s, s.s3Reader)
		done <- copyErr
	}()

	var err error
	select {
	case <-ctx.Done():
		err = ctx.Err()
	case err = <-done:
	}

	// Always close the body; keep the earlier error if there was one.
	if closeErr := s.s3Reader.Close(); closeErr != nil && err == nil {
		err = closeErr
	}

	if err != nil {
		// err may come from the copy, the context, or closing the body.
		log.Printf("error reading audio from s3: %v", err)
		s.Abort(err)
		return
	}

	if iterErr := s.Close(); iterErr != nil {
		log.Printf("error closing progress iterator: %v", iterErr)
	}
}
func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) {
video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID)
if err != nil { if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err) return nil, fmt.Errorf("error fetching video: %v", err)
@ -260,7 +339,7 @@ func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins in
return nil, fmt.Errorf("error creating ffmpegreader: %v", err) return nil, fmt.Errorf("error creating ffmpegreader: %v", err)
} }
s3Key := fmt.Sprintf("media_sets/%s/audio.webm", id) s3Key := fmt.Sprintf("media_sets/%s/audio.webm", mediaSet.ID)
uploader, err := newMultipartUploadWriter( uploader, err := newMultipartUploadWriter(
ctx, ctx,
s.s3, s.s3,
@ -275,10 +354,10 @@ func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins in
fetchAudioProgressReader := newGetAudioProgressReader( fetchAudioProgressReader := newGetAudioProgressReader(
int64(mediaSet.AudioFramesApprox), int64(mediaSet.AudioFramesApprox),
format.AudioChannels, format.AudioChannels,
100, numBins,
) )
state := getAudioState{ state := getAudioFromYoutubeState{
fetchAudioProgressReader: fetchAudioProgressReader, fetchAudioProgressReader: fetchAudioProgressReader,
ffmpegReader: ffmpegReader, ffmpegReader: ffmpegReader,
uploader: uploader, uploader: uploader,
@ -291,7 +370,7 @@ func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins in
return &state, nil return &state, nil
} }
type getAudioState struct { type getAudioFromYoutubeState struct {
*fetchAudioProgressReader *fetchAudioProgressReader
ffmpegReader io.ReadCloser ffmpegReader io.ReadCloser
@ -300,7 +379,7 @@ type getAudioState struct {
store Store store Store
} }
func (s *getAudioState) run(ctx context.Context) { func (s *getAudioFromYoutubeState) run(ctx context.Context) {
mw := io.MultiWriter(s, s.uploader) mw := io.MultiWriter(s, s.uploader)
done := make(chan error) done := make(chan error)
var err error var err error
@ -327,9 +406,12 @@ outer:
} }
} }
var framesUploaded int64
if err == nil { if err == nil {
if uploaderErr := s.uploader.Complete(); uploaderErr != nil { if bytesUploaded, uploaderErr := s.uploader.Complete(); uploaderErr != nil {
err = uploaderErr err = uploaderErr
} else {
framesUploaded = bytesUploaded / int64(s.channels) / SizeOfInt16
} }
} }
@ -337,6 +419,7 @@ outer:
_, updateErr := s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{ _, updateErr := s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{
AudioS3Bucket: sqlString(s.s3Bucket), AudioS3Bucket: sqlString(s.s3Bucket),
AudioS3Key: sqlString(s.s3Key), AudioS3Key: sqlString(s.s3Key),
AudioFrames: sqlInt64(framesUploaded),
}) })
if updateErr != nil { if updateErr != nil {
@ -356,10 +439,14 @@ outer:
} }
if iterErr := s.Close(); iterErr != nil { if iterErr := s.Close(); iterErr != nil {
log.Printf("error closing peak iterator: %v", iterErr) log.Printf("error closing progress iterator: %v", iterErr)
} }
} }
func sqlString(s string) sql.NullString { func sqlString(s string) sql.NullString {
return sql.NullString{String: s, Valid: true} return sql.NullString{String: s, Valid: true}
} }
// sqlInt64 wraps i in a sql.NullInt64 that is marked valid (non-NULL).
func sqlInt64(i int64) sql.NullInt64 {
	var v sql.NullInt64
	v.Int64 = i
	v.Valid = true
	return v
}

View File

@ -120,9 +120,9 @@ func (u *multipartUploadWriter) Abort(ctx context.Context) error {
// Complete completes the upload process, finalizing the upload on S3. // Complete completes the upload process, finalizing the upload on S3.
// If no parts have been successfully uploaded, then Abort() will be called // If no parts have been successfully uploaded, then Abort() will be called
// transparently. // transparently.
func (u *multipartUploadWriter) Complete() error { func (u *multipartUploadWriter) Complete() (int64, error) {
if len(u.completedParts) == 0 { if len(u.completedParts) == 0 {
return u.Abort(u.ctx) return 0, u.Abort(u.ctx)
} }
input := s3.CompleteMultipartUploadInput{ input := s3.CompleteMultipartUploadInput{
@ -136,9 +136,9 @@ func (u *multipartUploadWriter) Complete() error {
_, err := u.s3.CompleteMultipartUpload(u.ctx, &input) _, err := u.s3.CompleteMultipartUpload(u.ctx, &input)
if err != nil { if err != nil {
return fmt.Errorf("error completing upload: %v", err) return 0, fmt.Errorf("error completing upload: %v", err)
} }
log.Printf("completed upload, key = %s, bytesUploaded = %d", u.key, u.bytesUploaded) log.Printf("completed upload, key = %s, bytesUploaded = %d", u.key, u.bytesUploaded)
return nil return u.bytesUploaded, nil
} }

View File

@ -65,7 +65,7 @@ func (c *mediaSetServiceController) Get(ctx context.Context, request *pbMediaSet
} }
result := pbMediaSet.MediaSet{ result := pbMediaSet.MediaSet{
Id: mediaSet.ID, Id: mediaSet.ID.String(),
YoutubeId: mediaSet.YoutubeID, YoutubeId: mediaSet.YoutubeID,
AudioChannels: int32(mediaSet.Audio.Channels), AudioChannels: int32(mediaSet.Audio.Channels),
AudioFrames: mediaSet.Audio.Frames, AudioFrames: mediaSet.Audio.Frames,

View File

@ -11,5 +11,5 @@ INSERT INTO media_sets (youtube_id, audio_youtube_itag, audio_channels, audio_fr
-- name: SetAudioUploaded :one -- name: SetAudioUploaded :one
UPDATE media_sets UPDATE media_sets
SET audio_s3_bucket = $1, audio_s3_key = $2, audio_s3_uploaded_at = NOW(), updated_at = NOW() SET audio_s3_bucket = $1, audio_s3_key = $2, audio_frames = $3, audio_s3_uploaded_at = NOW(), updated_at = NOW()
RETURNING *; RETURNING *;