package media import ( "context" "database/sql" "errors" "fmt" "io" "log" "strconv" "time" "git.netflux.io/rob/clipper/generated/store" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/google/uuid" youtubev2 "github.com/kkdai/youtube/v2" ) const s3Bucket = "clipper-development" const ( rawAudioCodec = "pcm_s16le" rawAudioFormat = "s16le" rawAudioSampleRate = 48_000 rawAudioMimeType = "audio/raw" ) const ( thumbnailWidth = 177 // 16:9 thumbnailHeight = 100 // " ) // progressReader is a reader that prints progress logs as it reads. type progressReader struct { io.Reader label string total, exp int } func (pw *progressReader) Read(p []byte) (int, error) { n, err := pw.Reader.Read(p) pw.total += n log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float32(pw.total)/float32(pw.exp))*100.0) return n, err } // Store wraps a database store. type Store interface { GetMediaSet(ctx context.Context, id uuid.UUID) (store.MediaSet, error) GetMediaSetByYoutubeID(ctx context.Context, youtubeID string) (store.MediaSet, error) CreateMediaSet(ctx context.Context, arg store.CreateMediaSetParams) (store.MediaSet, error) SetAudioUploaded(ctx context.Context, arg store.SetAudioUploadedParams) (store.MediaSet, error) } // S3Client wraps the AWS S3 service client. type S3Client interface { CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) } // YoutubeClient wraps the youtube.Client client. type YoutubeClient interface { GetVideoContext(context.Context, string) (*youtubev2.Video, error) GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error) } // MediaSetService exposes logical flows handling MediaSets. type MediaSetService struct { store Store youtube YoutubeClient s3 S3Client } func NewMediaSetService(store Store, youtubeClient YoutubeClient, s3Client S3Client) *MediaSetService { return &MediaSetService{ store: store, youtube: youtubeClient, s3: s3Client, } } // Get fetches the metadata for a given MediaSet source. func (s *MediaSetService) Get(ctx context.Context, youtubeID string) (*MediaSet, error) { var ( mediaSet *MediaSet err error ) mediaSet, err = s.findMediaSet(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error getting existing media set: %v", err) } if mediaSet == nil { mediaSet, err = s.createMediaSet(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error getting new media set: %v", err) } } return mediaSet, nil } func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) { video, err := s.youtube.GetVideoContext(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } if len(video.Formats) == 0 { return nil, errors.New("no format available") } audioMetadata, err := s.fetchAudioMetadata(ctx, video) if err != nil { return nil, fmt.Errorf("error fetching audio metadata: %v", err) } videoMetadata, err := s.fetchVideoMetadata(ctx, video) if err != nil { return nil, fmt.Errorf("error fetching video metadata: %v", err) } params := store.CreateMediaSetParams{ YoutubeID: youtubeID, AudioYoutubeItag: int32(audioMetadata.YoutubeItag), AudioChannels: int32(audioMetadata.Channels), AudioFramesApprox: audioMetadata.ApproxFrames, AudioSampleRate: int32(audioMetadata.SampleRate), AudioMimeTypeEncoded: audioMetadata.MimeType, VideoYoutubeItag: int32(videoMetadata.YoutubeItag), VideoMimeType: videoMetadata.MimeType, VideoDurationNanos: videoMetadata.Duration.Nanoseconds(), } mediaSet, err := s.store.CreateMediaSet(ctx, params) if err != nil { return nil, fmt.Errorf("error creating media set in store: %v", err) } return &MediaSet{ ID: mediaSet.ID.String(), YoutubeID: youtubeID, Audio: audioMetadata, Video: videoMetadata, }, nil } // findMediaSet fetches a record from the database, returning (nil, nil) if it does not exist. func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) { mediaSet, err := s.store.GetMediaSetByYoutubeID(ctx, youtubeID) if err != nil { if err == sql.ErrNoRows { return nil, nil } return nil, fmt.Errorf("error getting existing media set: %v", err) } return &MediaSet{ Audio: Audio{ YoutubeItag: int(mediaSet.AudioYoutubeItag), Bytes: 0, // DEPRECATED Channels: int(mediaSet.AudioChannels), ApproxFrames: int64(mediaSet.AudioFramesApprox), Frames: mediaSet.AudioFrames.Int64, SampleRate: int(mediaSet.AudioSampleRate), MimeType: mediaSet.AudioMimeTypeEncoded, }, Video: Video{ YoutubeItag: int(mediaSet.VideoYoutubeItag), Bytes: 0, // DEPRECATED? Duration: time.Duration(mediaSet.VideoDurationNanos), MimeType: mediaSet.VideoMimeType, ThumbnailWidth: 0, // ?? ThumbnailHeight: 0, // ?? }, YoutubeID: mediaSet.YoutubeID, }, nil } func (s *MediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) { formats := FilterYoutubeVideo(video.Formats) if len(video.Formats) == 0 { return Video{}, errors.New("no format available") } format := formats[0] durationMsecs, err := strconv.Atoi(format.ApproxDurationMs) if err != nil { return Video{}, fmt.Errorf("could not parse video duration: %s", err) } return Video{ YoutubeItag: format.ItagNo, MimeType: format.MimeType, Bytes: format.ContentLength, ThumbnailWidth: thumbnailWidth, ThumbnailHeight: thumbnailHeight, Duration: time.Duration(durationMsecs) * time.Millisecond, }, nil } func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) { formats := FilterYoutubeAudio(video.Formats) if len(video.Formats) == 0 { return Audio{}, errors.New("no format available") } format := formats[0] sampleRate, err := strconv.Atoi(format.AudioSampleRate) if err != nil { return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate) } approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs) if err != nil { return Audio{}, fmt.Errorf("could not parse audio duration: %s", err) } approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond approxFrames := int64(approxDuration/time.Second) * int64(sampleRate) return Audio{ MimeType: format.MimeType, YoutubeItag: format.ItagNo, ApproxFrames: approxFrames, Channels: format.AudioChannels, SampleRate: sampleRate, }, nil } // GetAudio fetches the audio part of a MediaSet. func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins int) (GetAudioProgressReader, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return nil, fmt.Errorf("error getting media set: %v", err) } video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag)) if format == nil { return nil, fmt.Errorf("error finding itag: %v", err) } stream, _, err := s.youtube.GetStreamContext(ctx, video, format) if err != nil { return nil, fmt.Errorf("error fetching stream: %v", err) } // wrap it in a progress reader progressStream := &progressReader{Reader: stream, label: "audio", exp: int(format.ContentLength)} ffmpegReader, err := newFfmpegReader(ctx, progressStream, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") if err != nil { return nil, fmt.Errorf("error creating ffmpegreader: %v", err) } s3Key := fmt.Sprintf("media_sets/%s/audio.webm", id) uploader, err := newMultipartUploadWriter( ctx, s.s3, s3Bucket, s3Key, rawAudioMimeType, ) if err != nil { return nil, fmt.Errorf("error creating uploader: %v", err) } fetchAudioProgressReader := newGetAudioProgressReader( int64(mediaSet.AudioFramesApprox), format.AudioChannels, 100, ) state := getAudioState{ fetchAudioProgressReader: fetchAudioProgressReader, ffmpegReader: ffmpegReader, uploader: uploader, s3Bucket: s3Bucket, s3Key: s3Key, store: s.store, } go state.run(ctx) return &state, nil } type getAudioState struct { *fetchAudioProgressReader ffmpegReader io.ReadCloser uploader *multipartUploadWriter s3Bucket, s3Key string store Store } func (s *getAudioState) run(ctx context.Context) { mw := io.MultiWriter(s, s.uploader) done := make(chan error) var err error go func() { _, copyErr := io.Copy(mw, s.ffmpegReader) done <- copyErr }() outer: for { select { case <-ctx.Done(): err = ctx.Err() break outer case err = <-done: break outer } } if readerErr := s.ffmpegReader.Close(); readerErr != nil { if err == nil { err = readerErr } } if err == nil { if uploaderErr := s.uploader.Complete(); uploaderErr != nil { err = uploaderErr } } if err == nil { _, updateErr := s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{ AudioS3Bucket: sqlString(s.s3Bucket), AudioS3Key: sqlString(s.s3Key), }) if updateErr != nil { err = updateErr } } if err != nil { newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() if abortUploadErr := s.uploader.Abort(newCtx); abortUploadErr != nil { log.Printf("error aborting uploader: %v", abortUploadErr) } s.Abort(err) return } if iterErr := s.Close(); iterErr != nil { log.Printf("error closing peak iterator: %v", iterErr) } } func sqlString(s string) sql.NullString { return sql.NullString{String: s, Valid: true} }