package media

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"log"
	"strconv"
	"time"

	"git.netflux.io/rob/clipper/generated/store"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/google/uuid"
	youtubev2 "github.com/kkdai/youtube/v2"
)

// s3Bucket is the bucket that uploaded media assets are written to.
const s3Bucket = "clipper-development"

// Parameters passed to ffmpeg when converting a YouTube audio stream to raw
// audio: signed 16-bit little-endian PCM at a 48kHz sample rate.
const (
	rawAudioCodec      = "pcm_s16le"
	rawAudioFormat     = "s16le"
	rawAudioSampleRate = 48_000
	rawAudioMimeType   = "audio/raw"
)

// Thumbnail dimensions reported for videos (a 16:9 aspect ratio).
const (
	thumbnailWidth  = 177 // 16:9
	thumbnailHeight = 100 // "
)

// progressReader is a reader that prints progress logs as it reads.
type progressReader struct {
	io.Reader
	label string
	// total is the number of bytes read so far; exp is the expected total
	// size, which may be 0 (or negative) when it is not known.
	total, exp int
}

// Read implements io.Reader. It delegates to the wrapped reader and logs the
// running total after every call. When the expected size is unknown
// (exp <= 0) the percentage is omitted instead of logging a division by zero.
func (pw *progressReader) Read(p []byte) (int, error) {
	n, err := pw.Reader.Read(p)
	pw.total += n
	if pw.exp > 0 {
		// float64 keeps the percentage accurate for multi-gigabyte streams.
		log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float64(pw.total)/float64(pw.exp))*100.0)
	} else {
		log.Printf("[ProgressReader] [%s] Read %d bytes (expected size unknown) from the provided reader", pw.label, pw.total)
	}
	return n, err
}

// Store wraps a database store. It declares the queries that MediaSetService
// issues against the generated store package.
type Store interface {
	GetMediaSet(ctx context.Context, id uuid.UUID) (store.MediaSet, error)
	GetMediaSetByYoutubeID(ctx context.Context, youtubeID string) (store.MediaSet, error)
	CreateMediaSet(ctx context.Context, arg store.CreateMediaSetParams) (store.MediaSet, error)
	SetAudioUploaded(ctx context.Context, arg store.SetAudioUploadedParams) (store.MediaSet, error)
}

// S3Client wraps the AWS S3 service client.
type S3Client interface { GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error) CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) } // YoutubeClient wraps the youtube.Client client. type YoutubeClient interface { GetVideoContext(context.Context, string) (*youtubev2.Video, error) GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error) } // MediaSetService exposes logical flows handling MediaSets. type MediaSetService struct { store Store youtube YoutubeClient s3 S3Client } func NewMediaSetService(store Store, youtubeClient YoutubeClient, s3Client S3Client) *MediaSetService { return &MediaSetService{ store: store, youtube: youtubeClient, s3: s3Client, } } // Get fetches the metadata for a given MediaSet source. 
func (s *MediaSetService) Get(ctx context.Context, youtubeID string) (*MediaSet, error) { var ( mediaSet *MediaSet err error ) mediaSet, err = s.findMediaSet(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error getting existing media set: %v", err) } if mediaSet == nil { mediaSet, err = s.createMediaSet(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error getting new media set: %v", err) } } return mediaSet, nil } func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) { video, err := s.youtube.GetVideoContext(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } if len(video.Formats) == 0 { return nil, errors.New("no format available") } audioMetadata, err := s.fetchAudioMetadata(ctx, video) if err != nil { return nil, fmt.Errorf("error fetching audio metadata: %v", err) } videoMetadata, err := s.fetchVideoMetadata(ctx, video) if err != nil { return nil, fmt.Errorf("error fetching video metadata: %v", err) } params := store.CreateMediaSetParams{ YoutubeID: youtubeID, AudioYoutubeItag: int32(audioMetadata.YoutubeItag), AudioChannels: int32(audioMetadata.Channels), AudioFramesApprox: audioMetadata.ApproxFrames, AudioSampleRate: int32(audioMetadata.SampleRate), AudioMimeTypeEncoded: audioMetadata.MimeType, VideoYoutubeItag: int32(videoMetadata.YoutubeItag), VideoMimeType: videoMetadata.MimeType, VideoDurationNanos: videoMetadata.Duration.Nanoseconds(), } mediaSet, err := s.store.CreateMediaSet(ctx, params) if err != nil { return nil, fmt.Errorf("error creating media set in store: %v", err) } return &MediaSet{ ID: mediaSet.ID, YoutubeID: youtubeID, Audio: audioMetadata, Video: videoMetadata, }, nil } // findMediaSet fetches a record from the database, returning (nil, nil) if it does not exist. 
func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) { mediaSet, err := s.store.GetMediaSetByYoutubeID(ctx, youtubeID) if err != nil { if err == sql.ErrNoRows { return nil, nil } return nil, fmt.Errorf("error getting existing media set: %v", err) } return &MediaSet{ ID: mediaSet.ID, YoutubeID: mediaSet.YoutubeID, Audio: Audio{ YoutubeItag: int(mediaSet.AudioYoutubeItag), Bytes: 0, // DEPRECATED Channels: int(mediaSet.AudioChannels), ApproxFrames: int64(mediaSet.AudioFramesApprox), Frames: mediaSet.AudioFrames.Int64, SampleRate: int(mediaSet.AudioSampleRate), MimeType: mediaSet.AudioMimeTypeEncoded, }, Video: Video{ YoutubeItag: int(mediaSet.VideoYoutubeItag), Bytes: 0, // DEPRECATED? Duration: time.Duration(mediaSet.VideoDurationNanos), MimeType: mediaSet.VideoMimeType, ThumbnailWidth: 0, // ?? ThumbnailHeight: 0, // ?? }, }, nil } func (s *MediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) { formats := FilterYoutubeVideo(video.Formats) if len(video.Formats) == 0 { return Video{}, errors.New("no format available") } format := formats[0] durationMsecs, err := strconv.Atoi(format.ApproxDurationMs) if err != nil { return Video{}, fmt.Errorf("could not parse video duration: %s", err) } return Video{ YoutubeItag: format.ItagNo, MimeType: format.MimeType, Bytes: format.ContentLength, ThumbnailWidth: thumbnailWidth, ThumbnailHeight: thumbnailHeight, Duration: time.Duration(durationMsecs) * time.Millisecond, }, nil } func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) { formats := FilterYoutubeAudio(video.Formats) if len(video.Formats) == 0 { return Audio{}, errors.New("no format available") } format := formats[0] sampleRate, err := strconv.Atoi(format.AudioSampleRate) if err != nil { return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate) } approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs) if err 
!= nil { return Audio{}, fmt.Errorf("could not parse audio duration: %s", err) } approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond approxFrames := int64(approxDuration/time.Second) * int64(sampleRate) return Audio{ MimeType: format.MimeType, YoutubeItag: format.ItagNo, ApproxFrames: approxFrames, Channels: format.AudioChannels, SampleRate: sampleRate, }, nil } // GetAudio fetches the audio part of a MediaSet. func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins int) (GetAudioProgressReader, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return nil, fmt.Errorf("error getting media set: %v", err) } if mediaSet.AudioS3UploadedAt.Valid { return s.getAudioFromS3(ctx, mediaSet, numBins) } return s.getAudioFromYoutube(ctx, mediaSet, numBins) } func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { input := s3.GetObjectInput{ Bucket: aws.String(mediaSet.AudioS3Bucket.String), Key: aws.String(mediaSet.AudioS3Key.String), } output, err := s.s3.GetObject(ctx, &input) if err != nil { return nil, fmt.Errorf("error getting object from s3: %v", err) } fetchAudioProgressReader := newGetAudioProgressReader( binary.BigEndian, int64(mediaSet.AudioFrames.Int64), int(mediaSet.AudioChannels), numBins, ) state := getAudioFromS3State{ fetchAudioProgressReader: fetchAudioProgressReader, s3Reader: NewModuloBufReader(output.Body, int(mediaSet.AudioChannels)*SizeOfInt16), } go state.run(ctx) return &state, nil } type getAudioFromS3State struct { *fetchAudioProgressReader s3Reader io.ReadCloser } func (s *getAudioFromS3State) run(ctx context.Context) { done := make(chan error) var err error go func() { _, copyErr := io.Copy(s, s.s3Reader) done <- copyErr }() outer: for { select { case <-ctx.Done(): err = ctx.Err() break outer case err = <-done: break outer } } if readerErr := s.s3Reader.Close(); readerErr != nil { if err == nil { err = readerErr 
} } if err != nil { log.Printf("error closing s3Reader: %v", err) s.Abort(err) return } if iterErr := s.Close(); iterErr != nil { log.Printf("error closing progress iterator: %v", iterErr) } } func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag)) if format == nil { return nil, fmt.Errorf("error finding itag: %v", err) } stream, _, err := s.youtube.GetStreamContext(ctx, video, format) if err != nil { return nil, fmt.Errorf("error fetching stream: %v", err) } // wrap it in a progress reader progressStream := &progressReader{Reader: stream, label: "audio", exp: int(format.ContentLength)} ffmpegReader, err := newFfmpegReader(ctx, progressStream, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") if err != nil { return nil, fmt.Errorf("error creating ffmpegreader: %v", err) } s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) uploader := newMultipartUploader(s.s3) fetchAudioProgressReader := newGetAudioProgressReader( binary.LittleEndian, int64(mediaSet.AudioFramesApprox), format.AudioChannels, numBins, ) state := getAudioFromYoutubeState{ fetchAudioProgressReader: fetchAudioProgressReader, ffmpegReader: ffmpegReader, uploader: uploader, s3Bucket: s3Bucket, s3Key: s3Key, store: s.store, } go state.run(ctx, mediaSet.ID) return &state, nil } type getAudioFromYoutubeState struct { *fetchAudioProgressReader ffmpegReader *ffmpegReader uploader *multipartUploader s3Bucket, s3Key string store Store } func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID) { teeReader := io.TeeReader(s.ffmpegReader, s) bytesUploaded, err := s.uploader.Upload(ctx, teeReader, s.s3Bucket, s.s3Key, rawAudioMimeType) // If 
there was an error returned, the underlying ffmpegReader process may // still be active. Kill it. if err != nil { if cancelErr := s.ffmpegReader.Cancel(); cancelErr != nil { log.Printf("error cancelling ffmpegreader: %v", cancelErr) } } // Either way, we need to wait for the ffmpegReader process to exit, // and ensure there is no error. if readerErr := s.ffmpegReader.Close(); readerErr != nil { if err == nil { err = readerErr } } if err == nil { _, updateErr := s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{ ID: mediaSetID, AudioS3Bucket: sqlString(s.s3Bucket), AudioS3Key: sqlString(s.s3Key), AudioFrames: sqlInt64(bytesUploaded), }) if updateErr != nil { err = updateErr } } if err != nil { log.Printf("error uploading asynchronously: %v", err) s.Abort(err) return } if iterErr := s.Close(); iterErr != nil { log.Printf("error closing progress iterator: %v", iterErr) } } func sqlString(s string) sql.NullString { return sql.NullString{String: s, Valid: true} } func sqlInt64(i int64) sql.NullInt64 { return sql.NullInt64{Int64: i, Valid: true} } type ModuloBufReader struct { io.ReadCloser buf bytes.Buffer modSize int } func NewModuloBufReader(r io.ReadCloser, modSize int) *ModuloBufReader { return &ModuloBufReader{ReadCloser: r, modSize: modSize} } func (r *ModuloBufReader) Read(p []byte) (int, error) { // err is always io.EOF or nil nr1, _ := r.buf.Read(p) nr2, err := r.ReadCloser.Read(p[nr1:]) nr := nr1 + nr2 rem := nr % r.modSize if rem != 0 { // err is always nil _, _ = r.buf.Write(p[nr-rem:]) } return nr - rem, err }