package media import ( "bytes" "context" "database/sql" "encoding/binary" "errors" "fmt" "io" "net/http" "strconv" "time" "git.netflux.io/rob/clipper/config" "git.netflux.io/rob/clipper/generated/store" "github.com/aws/aws-sdk-go-v2/aws" signerv4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/google/uuid" youtubev2 "github.com/kkdai/youtube/v2" "go.uber.org/zap" ) const ( getVideoExpiresIn = time.Hour * 1 ) const ( rawAudioCodec = "pcm_s16le" rawAudioFormat = "s16le" rawAudioSampleRate = 48_000 rawAudioMimeType = "audio/raw" ) const ( thumbnailWidth = 177 // 16:9 thumbnailHeight = 100 // " ) // progressReader is a reader that prints progress logs as it reads. type progressReader struct { io.Reader label string total, exp int64 logger *zap.SugaredLogger } func newProgressReader(reader io.Reader, label string, exp int64, logger *zap.SugaredLogger) *progressReader { return &progressReader{ Reader: reader, exp: exp, logger: logger.Named(fmt.Sprintf("ProgressReader %s", label)), } } func (r *progressReader) Read(p []byte) (int, error) { n, err := r.Reader.Read(p) r.total += int64(n) r.logger.Debugf("Read %d of %d (%.02f%%) bytes from the provided reader", r.total, r.exp, (float32(r.total)/float32(r.exp))*100.0) return n, err } // Store wraps a database store. type Store interface { GetMediaSet(context.Context, uuid.UUID) (store.MediaSet, error) GetMediaSetByYoutubeID(context.Context, string) (store.MediaSet, error) CreateMediaSet(context.Context, store.CreateMediaSetParams) (store.MediaSet, error) SetAudioUploaded(context.Context, store.SetAudioUploadedParams) (store.MediaSet, error) SetVideoUploaded(context.Context, store.SetVideoUploadedParams) (store.MediaSet, error) SetVideoThumbnailUploaded(context.Context, store.SetVideoThumbnailUploadedParams) (store.MediaSet, error) } // S3API provides an API to AWS S3. type S3API struct { S3Client S3PresignClient } // S3Client wraps the AWS S3 service client. type S3Client interface { GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error) CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) } // S3PresignClient wraps the AWS S3 Presign client. type S3PresignClient interface { PresignGetObject(context.Context, *s3.GetObjectInput, ...func(*s3.PresignOptions)) (*signerv4.PresignedHTTPRequest, error) } // YoutubeClient wraps the youtube.Client client. type YoutubeClient interface { GetVideoContext(context.Context, string) (*youtubev2.Video, error) GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error) } // MediaSetService exposes logical flows handling MediaSets. type MediaSetService struct { store Store youtube YoutubeClient s3 S3API config config.Config logger *zap.SugaredLogger } func NewMediaSetService(store Store, youtubeClient YoutubeClient, s3API S3API, config config.Config, logger *zap.Logger) *MediaSetService { return &MediaSetService{ store: store, youtube: youtubeClient, s3: s3API, config: config, logger: logger.Sugar(), } } // Get fetches the metadata for a given MediaSet source. If it does not exist // in the local DB, it will attempt to create it. func (s *MediaSetService) Get(ctx context.Context, youtubeID string) (*MediaSet, error) { var ( mediaSet *MediaSet err error ) mediaSet, err = s.findMediaSet(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error getting existing media set: %v", err) } if mediaSet == nil { mediaSet, err = s.createMediaSet(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error getting new media set: %v", err) } } return mediaSet, nil } func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) { video, err := s.youtube.GetVideoContext(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } if len(video.Formats) == 0 { return nil, errors.New("no format available") } audioMetadata, err := s.fetchAudioMetadata(ctx, video) if err != nil { return nil, fmt.Errorf("error fetching audio metadata: %v", err) } videoMetadata, err := s.fetchVideoMetadata(ctx, video) if err != nil { return nil, fmt.Errorf("error fetching video metadata: %v", err) } storeParams := store.CreateMediaSetParams{ YoutubeID: youtubeID, AudioYoutubeItag: int32(audioMetadata.YoutubeItag), AudioChannels: int32(audioMetadata.Channels), AudioFramesApprox: audioMetadata.ApproxFrames, AudioSampleRate: int32(audioMetadata.SampleRate), AudioMimeTypeEncoded: audioMetadata.MimeType, VideoYoutubeItag: int32(videoMetadata.YoutubeItag), VideoMimeType: videoMetadata.MimeType, VideoDurationNanos: videoMetadata.Duration.Nanoseconds(), } mediaSet, err := s.store.CreateMediaSet(ctx, storeParams) if err != nil { return nil, fmt.Errorf("error creating media set in store: %v", err) } return &MediaSet{ ID: mediaSet.ID, YoutubeID: youtubeID, Audio: audioMetadata, Video: videoMetadata, }, nil } // findMediaSet fetches a record from the database, returning (nil, nil) if it does not exist. func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) { mediaSet, err := s.store.GetMediaSetByYoutubeID(ctx, youtubeID) if err != nil { if err == sql.ErrNoRows { return nil, nil } return nil, fmt.Errorf("error getting existing media set: %v", err) } return &MediaSet{ ID: mediaSet.ID, YoutubeID: mediaSet.YoutubeID, Audio: Audio{ YoutubeItag: int(mediaSet.AudioYoutubeItag), Bytes: 0, // DEPRECATED Channels: int(mediaSet.AudioChannels), ApproxFrames: int64(mediaSet.AudioFramesApprox), Frames: mediaSet.AudioFrames.Int64, SampleRate: int(mediaSet.AudioSampleRate), MimeType: mediaSet.AudioMimeTypeEncoded, }, Video: Video{ YoutubeItag: int(mediaSet.VideoYoutubeItag), Bytes: 0, // DEPRECATED? Duration: time.Duration(mediaSet.VideoDurationNanos), MimeType: mediaSet.VideoMimeType, ThumbnailWidth: 0, // ?? ThumbnailHeight: 0, // ?? }, }, nil } func (s *MediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) { formats := FilterYoutubeVideo(video.Formats) if len(video.Formats) == 0 { return Video{}, errors.New("no format available") } format := formats[0] durationMsecs, err := strconv.Atoi(format.ApproxDurationMs) if err != nil { return Video{}, fmt.Errorf("could not parse video duration: %s", err) } return Video{ YoutubeItag: format.ItagNo, MimeType: format.MimeType, Bytes: format.ContentLength, ThumbnailWidth: thumbnailWidth, ThumbnailHeight: thumbnailHeight, Duration: time.Duration(durationMsecs) * time.Millisecond, }, nil } func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) { formats := FilterYoutubeAudio(video.Formats) if len(video.Formats) == 0 { return Audio{}, errors.New("no format available") } format := formats[0] sampleRate, err := strconv.Atoi(format.AudioSampleRate) if err != nil { return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate) } approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs) if err != nil { return Audio{}, fmt.Errorf("could not parse audio duration: %s", err) } approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond approxFrames := int64(approxDuration/time.Second) * int64(sampleRate) return Audio{ MimeType: format.MimeType, YoutubeItag: format.ItagNo, ApproxFrames: approxFrames, Channels: format.AudioChannels, SampleRate: sampleRate, }, nil } // GetVideo fetches the video part of a MediaSet. func (s *MediaSetService) GetVideo(ctx context.Context, id uuid.UUID) (GetVideoProgressReader, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return nil, fmt.Errorf("error getting media set: %v", err) } // TODO: use mediaSet func to fetch s3Key s3Key := fmt.Sprintf("media_sets/%s/video.mp4", mediaSet.ID) if mediaSet.VideoS3UploadedAt.Valid { input := s3.GetObjectInput{Bucket: aws.String(s.config.S3Bucket), Key: aws.String(s3Key)} request, signErr := s.s3.PresignGetObject(ctx, &input, s3.WithPresignExpires(getVideoExpiresIn)) if signErr != nil { return nil, fmt.Errorf("error generating presigned URL: %v", signErr) } videoGetter := videoGetterDownloaded(request.URL) return &videoGetter, nil } video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } format := video.Formats.FindByItag(int(mediaSet.VideoYoutubeItag)) if format == nil { return nil, fmt.Errorf("error finding itag: %v", err) } stream, _, err := s.youtube.GetStreamContext(ctx, video, format) if err != nil { return nil, fmt.Errorf("error fetching stream: %v", err) } videoGetter := newVideoGetter(s.s3, s.store, s.logger) return videoGetter.GetVideo( ctx, stream, format.ContentLength, mediaSet.ID, s.config.S3Bucket, s3Key, format.MimeType, ) } type GetVideoProgressReader interface { // Next returns the next video progress status. When the stream has finished, // a valid GetVideoProgress value will be returned with io.EOF. Next() (GetVideoProgress, error) } // GetAudio fetches the audio part of a MediaSet. func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins int) (GetAudioProgressReader, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return nil, fmt.Errorf("error getting media set: %v", err) } if mediaSet.AudioS3UploadedAt.Valid { return s.getAudioFromS3(ctx, mediaSet, numBins) } return s.getAudioFromYoutube(ctx, mediaSet, numBins) } func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { input := s3.GetObjectInput{ Bucket: aws.String(mediaSet.AudioS3Bucket.String), Key: aws.String(mediaSet.AudioS3Key.String), } output, err := s.s3.GetObject(ctx, &input) if err != nil { return nil, fmt.Errorf("error getting object from s3: %v", err) } getAudioProgressReader, err := newGetAudioProgressReader( int64(mediaSet.AudioFrames.Int64), int(mediaSet.AudioChannels), numBins, ) if err != nil { return nil, fmt.Errorf("error creating audio reader: %v", err) } state := getAudioFromS3State{ getAudioProgressReader: getAudioProgressReader, s3Reader: NewModuloBufReader(output.Body, int(mediaSet.AudioChannels)*SizeOfInt16), logger: s.logger, } go state.run(ctx) return &state, nil } type getAudioFromS3State struct { *getAudioProgressReader s3Reader io.ReadCloser logger *zap.SugaredLogger } func (s *getAudioFromS3State) run(ctx context.Context) { done := make(chan error) var err error go func() { _, copyErr := io.Copy(s, s.s3Reader) done <- copyErr }() outer: for { select { case <-ctx.Done(): err = ctx.Err() break outer case err = <-done: break outer } } if readerErr := s.s3Reader.Close(); readerErr != nil { if err == nil { err = readerErr } } if err != nil { s.logger.Errorf("getAudioFromS3State: error closing s3 reader: %v", err) s.Abort(err) return } if iterErr := s.Close(); iterErr != nil { s.logger.Errorf("getAudioFromS3State: error closing progress iterator: %v", iterErr) } } func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag)) if format == nil { return nil, fmt.Errorf("error finding itag: %v", err) } stream, _, err := s.youtube.GetStreamContext(ctx, video, format) if err != nil { return nil, fmt.Errorf("error fetching stream: %v", err) } streamWithProgress := newProgressReader(stream, "audio", format.ContentLength, s.logger) ffmpegReader, err := newFfmpegReader(ctx, streamWithProgress, "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") if err != nil { return nil, fmt.Errorf("error creating ffmpegreader: %v", err) } // TODO: use mediaSet func to fetch s3Key s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) uploader := newMultipartUploader(s.s3, s.logger) getAudioProgressReader, err := newGetAudioProgressReader( int64(mediaSet.AudioFramesApprox), format.AudioChannels, numBins, ) if err != nil { return nil, fmt.Errorf("error creating audio reader: %v", err) } state := getAudioFromYoutubeState{ getAudioProgressReader: getAudioProgressReader, ffmpegReader: ffmpegReader, uploader: uploader, s3Bucket: s.config.S3Bucket, s3Key: s3Key, store: s.store, channels: format.AudioChannels, logger: s.logger, } go state.run(ctx, mediaSet.ID) return &state, nil } type getAudioFromYoutubeState struct { *getAudioProgressReader ffmpegReader *ffmpegReader uploader *multipartUploader s3Bucket, s3Key string store Store channels int logger *zap.SugaredLogger } func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID) { teeReader := io.TeeReader(s.ffmpegReader, s) bytesUploaded, err := s.uploader.Upload(ctx, teeReader, s.s3Bucket, s.s3Key, rawAudioMimeType) // If there was an error returned, the underlying ffmpegReader process may // still be active. Kill it. if err != nil { if cancelErr := s.ffmpegReader.Cancel(); cancelErr != nil { s.logger.Errorf("getAudioFromYoutubeState: error cancelling ffmpegreader: %v", cancelErr) } } // Either way, we need to wait for the ffmpegReader process to exit, // and ensure there is no error. if readerErr := s.ffmpegReader.Close(); readerErr != nil { if err == nil { err = readerErr } } if err == nil { _, updateErr := s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{ ID: mediaSetID, AudioS3Bucket: sqlString(s.s3Bucket), AudioS3Key: sqlString(s.s3Key), AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(s.channels)), }) if updateErr != nil { err = updateErr } } if err != nil { s.logger.Errorf("getAudioFromYoutubeState: error uploading asynchronously: %v", err) s.Abort(err) return } if iterErr := s.Close(); iterErr != nil { s.logger.Errorf("getAudioFromYoutubeState: error closing progress iterator: %v", iterErr) } } func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, numBins int) ([]int16, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return nil, fmt.Errorf("error getting media set: %v", err) } byteRange := fmt.Sprintf( "bytes=%d-%d", startFrame*int64(mediaSet.AudioChannels)*SizeOfInt16, endFrame*int64(mediaSet.AudioChannels)*SizeOfInt16, ) input := s3.GetObjectInput{ Bucket: aws.String(mediaSet.AudioS3Bucket.String), Key: aws.String(mediaSet.AudioS3Key.String), Range: aws.String(byteRange), } output, err := s.s3.GetObject(ctx, &input) if err != nil { return nil, fmt.Errorf("error getting object from s3: %v", err) } defer output.Body.Close() const readBufSizeBytes = 8_192 channels := int(mediaSet.AudioChannels) modReader := NewModuloBufReader(output.Body, channels*SizeOfInt16) readBuf := make([]byte, readBufSizeBytes) peaks := make([]int16, channels*numBins) totalFrames := endFrame - startFrame framesPerBin := totalFrames / int64(numBins) sampleBuf := make([]int16, readBufSizeBytes/SizeOfInt16) bytesExpected := (endFrame - startFrame) * int64(channels) * SizeOfInt16 var ( bytesRead int64 closing bool currPeakIndex int currFrame int64 ) for { n, err := modReader.Read(readBuf) if err == io.EOF { closing = true } else if err != nil { return nil, fmt.Errorf("read error: %v", err) } bytesRead += int64(n) samples := sampleBuf[:n/SizeOfInt16] if err := binary.Read(bytes.NewReader(readBuf[:n]), binary.LittleEndian, samples); err != nil { return nil, fmt.Errorf("error interpreting samples: %v", err) } for i := 0; i < len(samples); i += channels { for j := 0; j < channels; j++ { samp := sampleBuf[i+j] if samp < 0 { samp = -samp } if samp > peaks[currPeakIndex+j] { peaks[currPeakIndex+j] = samp } } if currFrame == framesPerBin { currFrame = 0 currPeakIndex += channels } else { currFrame++ } } if closing { break } } if bytesRead < bytesExpected { s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioS3Key.String).Warn("short read from S3") } return peaks, nil } func sqlString(s string) sql.NullString { return sql.NullString{String: s, Valid: true} } func sqlInt64(i int64) sql.NullInt64 { return sql.NullInt64{Int64: i, Valid: true} } func sqlInt32(i int32) sql.NullInt32 { return sql.NullInt32{Int32: i, Valid: true} } // ModuloBufReader reads from a reader in block sizes that are exactly modulo // modSize, with any remainder buffered until the next read. type ModuloBufReader struct { io.ReadCloser buf bytes.Buffer modSize int } func NewModuloBufReader(r io.ReadCloser, modSize int) *ModuloBufReader { return &ModuloBufReader{ReadCloser: r, modSize: modSize} } func (r *ModuloBufReader) Read(p []byte) (int, error) { // err is always io.EOF or nil nr1, _ := r.buf.Read(p) nr2, err := r.ReadCloser.Read(p[nr1:]) nr := nr1 + nr2 rem := nr % r.modSize // if there was an error, return immediately. if err == io.EOF { return nr, err } else if err != nil { return nr - rem, err } // write any remainder to the buffer if rem != 0 { // err is always nil _, _ = r.buf.Write(p[nr-rem : nr]) } return nr - rem, err } type VideoThumbnail struct { Data []byte Width, Height int } func (s *MediaSetService) GetVideoThumbnail(ctx context.Context, id uuid.UUID) (VideoThumbnail, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return VideoThumbnail{}, fmt.Errorf("error getting media set: %v", err) } if mediaSet.VideoThumbnailS3UploadedAt.Valid { return s.getThumbnailFromS3(ctx, mediaSet) } return s.getThumbnailFromYoutube(ctx, mediaSet) } func (s *MediaSetService) getThumbnailFromS3(ctx context.Context, mediaSet store.MediaSet) (VideoThumbnail, error) { input := s3.GetObjectInput{ Bucket: aws.String(mediaSet.VideoThumbnailS3Bucket.String), Key: aws.String(mediaSet.VideoThumbnailS3Key.String), } output, err := s.s3.GetObject(ctx, &input) if err != nil { return VideoThumbnail{}, fmt.Errorf("error fetching thumbnail from s3: %v", err) } defer output.Body.Close() imageData, err := io.ReadAll(output.Body) if err != nil { return VideoThumbnail{}, fmt.Errorf("error reading thumbnail from s3: %v", err) } return VideoThumbnail{ Width: int(mediaSet.VideoThumbnailWidth.Int32), Height: int(mediaSet.VideoThumbnailHeight.Int32), Data: imageData, }, nil } func (s *MediaSetService) getThumbnailFromYoutube(ctx context.Context, mediaSet store.MediaSet) (VideoThumbnail, error) { video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) if err != nil { return VideoThumbnail{}, fmt.Errorf("error fetching video: %v", err) } if len(video.Formats) == 0 { return VideoThumbnail{}, errors.New("no format available") } thumbnails := video.Thumbnails SortYoutubeThumbnails(thumbnails) thumbnail := thumbnails[0] resp, err := http.Get(thumbnail.URL) if err != nil { return VideoThumbnail{}, fmt.Errorf("error fetching thumbnail: %v", err) } defer resp.Body.Close() imageData, err := io.ReadAll(resp.Body) if err != nil { return VideoThumbnail{}, fmt.Errorf("error reading thumbnail: %v", err) } // TODO: use mediaSet func to fetch s3Key s3Key := fmt.Sprintf("media_sets/%s/thumbnail.jpg", mediaSet.ID) uploader := newMultipartUploader(s.s3, s.logger) const mimeType = "application/jpeg" _, err = uploader.Upload(ctx, bytes.NewReader(imageData), s.config.S3Bucket, s3Key, mimeType) if err != nil { return VideoThumbnail{}, fmt.Errorf("error uploading thumbnail: %v", err) } storeParams := store.SetVideoThumbnailUploadedParams{ ID: mediaSet.ID, VideoThumbnailMimeType: sqlString(mimeType), VideoThumbnailS3Bucket: sqlString(s.config.S3Bucket), VideoThumbnailS3Key: sqlString(s3Key), VideoThumbnailWidth: sqlInt32(int32(thumbnail.Width)), VideoThumbnailHeight: sqlInt32(int32(thumbnail.Height)), } if _, err := s.store.SetVideoThumbnailUploaded(ctx, storeParams); err != nil { return VideoThumbnail{}, fmt.Errorf("error updating media set: %v", err) } return VideoThumbnail{Width: int(thumbnail.Width), Height: int(thumbnail.Height), Data: imageData}, nil }