package media import ( "bytes" "context" "database/sql" "encoding/binary" "errors" "fmt" "io" "net/http" "strconv" "time" "git.netflux.io/rob/clipper/config" "git.netflux.io/rob/clipper/generated/store" "github.com/google/uuid" "github.com/jackc/pgx/v4" youtubev2 "github.com/kkdai/youtube/v2" "go.uber.org/zap" ) const ( getVideoExpiresIn = time.Hour getAudioExpiresIn = time.Hour ) const ( rawAudioCodec = "pcm_s16le" rawAudioFormat = "s16le" rawAudioSampleRate = 48_000 rawAudioMimeType = "audio/raw" ) const ( thumbnailWidth = 177 // 16:9 thumbnailHeight = 100 // " ) // MediaSetService exposes logical flows handling MediaSets. type MediaSetService struct { store Store youtube YoutubeClient fileStore FileStore config config.Config logger *zap.SugaredLogger } func NewMediaSetService(store Store, youtubeClient YoutubeClient, fileStore FileStore, config config.Config, logger *zap.SugaredLogger) *MediaSetService { return &MediaSetService{ store: store, youtube: youtubeClient, fileStore: fileStore, config: config, logger: logger, } } // Get fetches the metadata for a given MediaSet source. If it does not exist // in the local DB, it will attempt to create it. After the resource has been // created, other endpoints (e.g. GetAudio) can be called to fetch media from // Youtube and store it in a file store. func (s *MediaSetService) Get(ctx context.Context, youtubeID string) (*MediaSet, error) { var ( mediaSet *MediaSet err error ) mediaSet, err = s.findMediaSet(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error finding existing media set: %v", err) } if mediaSet == nil { mediaSet, err = s.createMediaSet(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error creating new media set: %v", err) } } return mediaSet, nil } func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) { video, err := s.youtube.GetVideoContext(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } if len(video.Formats) == 0 { return nil, errors.New("no format available") } audioMetadata, err := s.fetchAudioMetadata(ctx, video) if err != nil { return nil, fmt.Errorf("error fetching audio metadata: %v", err) } videoMetadata, err := s.fetchVideoMetadata(ctx, video) if err != nil { return nil, fmt.Errorf("error fetching video metadata: %v", err) } storeParams := store.CreateMediaSetParams{ YoutubeID: youtubeID, AudioYoutubeItag: int32(audioMetadata.YoutubeItag), AudioChannels: int32(audioMetadata.Channels), AudioFramesApprox: audioMetadata.ApproxFrames, AudioSampleRate: int32(audioMetadata.SampleRate), AudioEncodedMimeType: audioMetadata.MimeType, AudioContentLength: audioMetadata.ContentLength, VideoYoutubeItag: int32(videoMetadata.YoutubeItag), VideoContentLength: videoMetadata.ContentLength, VideoMimeType: videoMetadata.MimeType, VideoDurationNanos: videoMetadata.Duration.Nanoseconds(), } mediaSet, err := s.store.CreateMediaSet(ctx, storeParams) if err != nil { return nil, fmt.Errorf("error creating media set in store: %v", err) } return &MediaSet{ ID: mediaSet.ID, YoutubeID: youtubeID, Audio: audioMetadata, Video: videoMetadata, }, nil } // findMediaSet fetches a record from the database, returning (nil, nil) if it does not exist. func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) { mediaSet, err := s.store.GetMediaSetByYoutubeID(ctx, youtubeID) if err != nil { if err == pgx.ErrNoRows { return nil, nil } return nil, fmt.Errorf("error getting media set: %v", err) } return &MediaSet{ ID: mediaSet.ID, YoutubeID: mediaSet.YoutubeID, Audio: Audio{ YoutubeItag: int(mediaSet.AudioYoutubeItag), ContentLength: mediaSet.AudioContentLength, Channels: int(mediaSet.AudioChannels), ApproxFrames: int64(mediaSet.AudioFramesApprox), Frames: mediaSet.AudioFrames.Int64, SampleRate: int(mediaSet.AudioSampleRate), MimeType: mediaSet.AudioEncodedMimeType, }, Video: Video{ YoutubeItag: int(mediaSet.VideoYoutubeItag), ContentLength: mediaSet.VideoContentLength, Duration: time.Duration(mediaSet.VideoDurationNanos), MimeType: mediaSet.VideoMimeType, ThumbnailWidth: 0, // ?? ThumbnailHeight: 0, // ?? }, }, nil } func (s *MediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) { formats := FilterYoutubeVideo(video.Formats) if len(video.Formats) == 0 { return Video{}, errors.New("no format available") } format := formats[0] durationMsecs, err := strconv.Atoi(format.ApproxDurationMs) if err != nil { return Video{}, fmt.Errorf("could not parse video duration: %s", err) } return Video{ YoutubeItag: format.ItagNo, MimeType: format.MimeType, ContentLength: format.ContentLength, ThumbnailWidth: thumbnailWidth, ThumbnailHeight: thumbnailHeight, Duration: time.Duration(durationMsecs) * time.Millisecond, }, nil } func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) { formats := FilterYoutubeAudio(video.Formats) if len(video.Formats) == 0 { return Audio{}, errors.New("no format available") } format := formats[0] sampleRate, err := strconv.Atoi(format.AudioSampleRate) if err != nil { return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate) } approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs) if err != nil { return Audio{}, fmt.Errorf("could not parse audio duration: %s", err) } approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond approxFrames := int64(approxDuration/time.Second) * int64(sampleRate) return Audio{ ContentLength: format.ContentLength, MimeType: format.MimeType, YoutubeItag: format.ItagNo, ApproxFrames: approxFrames, Channels: format.AudioChannels, SampleRate: sampleRate, }, nil } // GetVideo fetches the video part of a MediaSet. func (s *MediaSetService) GetVideo(ctx context.Context, id uuid.UUID) (GetVideoProgressReader, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return nil, fmt.Errorf("error getting media set: %v", err) } if mediaSet.VideoS3UploadedAt.Valid { url, err := s.fileStore.GetURL(ctx, mediaSet.VideoS3Key.String) if err != nil { return nil, fmt.Errorf("error generating presigned URL: %v", err) } videoGetter := videoGetterDownloaded(url) return &videoGetter, nil } video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } format := video.Formats.FindByItag(int(mediaSet.VideoYoutubeItag)) if format == nil { return nil, fmt.Errorf("error finding itag: %v", err) } stream, _, err := s.youtube.GetStreamContext(ctx, video, format) if err != nil { return nil, fmt.Errorf("error fetching stream: %v", err) } // TODO: use mediaSet func to fetch videoKey videoKey := fmt.Sprintf("media_sets/%s/video.mp4", mediaSet.ID) videoGetter := newVideoGetter(s.store, s.fileStore, s.logger) return videoGetter.GetVideo( ctx, stream, format.ContentLength, mediaSet.ID, videoKey, format.MimeType, ) } // GetAudio fetches the audio part of a MediaSet. func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins int) (GetAudioProgressReader, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return nil, fmt.Errorf("error getting media set: %v", err) } // We need both raw and encoded audio to have been uploaded successfully. // Otherwise, we cannot return both peaks and a presigned URL for use by the // player. if mediaSet.AudioRawS3UploadedAt.Valid && mediaSet.AudioEncodedS3UploadedAt.Valid { return s.getAudioFromFileStore(ctx, mediaSet, numBins) } return s.getAudioFromYoutube(ctx, mediaSet, numBins) } func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { audioGetter := newAudioGetter(s.store, s.youtube, s.fileStore, s.config, s.logger) return audioGetter.GetAudio(ctx, mediaSet, numBins) } func (s *MediaSetService) getAudioFromFileStore(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) { object, err := s.fileStore.GetObject(ctx, mediaSet.AudioRawS3Key.String) if err != nil { return nil, fmt.Errorf("error getting object from file store: %v", err) } getAudioProgressReader, err := newGetAudioProgressReader( int64(mediaSet.AudioFrames.Int64), int(mediaSet.AudioChannels), numBins, ) if err != nil { return nil, fmt.Errorf("error creating audio reader: %v", err) } state := getAudioFromFileStoreState{ getAudioProgressReader: getAudioProgressReader, reader: NewModuloBufReader(object, int(mediaSet.AudioChannels)*SizeOfInt16), fileStore: s.fileStore, config: s.config, logger: s.logger, } go state.run(ctx, mediaSet) return &state, nil } type getAudioFromFileStoreState struct { *getAudioProgressReader reader io.ReadCloser fileStore FileStore config config.Config logger *zap.SugaredLogger } func (s *getAudioFromFileStoreState) run(ctx context.Context, mediaSet store.MediaSet) { done := make(chan error) var err error go func() { _, copyErr := io.Copy(s, s.reader) done <- copyErr }() outer: for { select { case <-ctx.Done(): err = ctx.Err() break outer case err = <-done: break outer } } if readerErr := s.reader.Close(); readerErr != nil { if err == nil { err = readerErr } } if err != nil { s.logger.Errorf("getAudioFromFileStoreState: error closing reader: %v", err) s.CloseWithError(err) return } url, err := s.fileStore.GetURL(ctx, mediaSet.AudioEncodedS3Key.String) if err != nil { s.CloseWithError(fmt.Errorf("error generating object URL: %v", err)) } if iterErr := s.Close(url); iterErr != nil { s.logger.Errorf("getAudioFromFileStoreState: error closing progress iterator: %v", iterErr) } } func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, numBins int) ([]int16, error) { if startFrame < 0 || endFrame < 0 || numBins <= 0 { s.logger.With("startFrame", startFrame, "endFrame", endFrame, "numBins", numBins).Error("invalid arguments") return nil, errors.New("invalid arguments") } mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return nil, fmt.Errorf("error getting media set: %v", err) } object, err := s.fileStore.GetObjectWithRange( ctx, mediaSet.AudioRawS3Key.String, startFrame*int64(mediaSet.AudioChannels)*SizeOfInt16, endFrame*int64(mediaSet.AudioChannels)*SizeOfInt16, ) if err != nil { return nil, fmt.Errorf("error getting object from file store: %v", err) } defer object.Close() const readBufSizeBytes = 8_192 channels := int(mediaSet.AudioChannels) modReader := NewModuloBufReader(object, channels*SizeOfInt16) readBuf := make([]byte, readBufSizeBytes) peaks := make([]int16, channels*numBins) totalFrames := endFrame - startFrame framesPerBin := totalFrames / int64(numBins) sampleBuf := make([]int16, readBufSizeBytes/SizeOfInt16) bytesExpected := (endFrame - startFrame) * int64(channels) * SizeOfInt16 var ( bytesRead int64 closing bool currPeakIndex int currFrame int64 ) for { n, err := modReader.Read(readBuf) if err == io.EOF { closing = true } else if err != nil { return nil, fmt.Errorf("read error: %v", err) } bytesRead += int64(n) samples := sampleBuf[:n/SizeOfInt16] if err := binary.Read(bytes.NewReader(readBuf[:n]), binary.LittleEndian, samples); err != nil { return nil, fmt.Errorf("error interpreting samples: %v", err) } for i := 0; i < len(samples); i += channels { for j := 0; j < channels; j++ { samp := sampleBuf[i+j] if samp < 0 { samp = -samp } if samp > peaks[currPeakIndex+j] { peaks[currPeakIndex+j] = samp } } if currFrame == framesPerBin { currFrame = 0 currPeakIndex += channels } else { currFrame++ } } if closing { break } } if bytesRead < bytesExpected { s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioRawS3Key.String).Warn("short read from file store") } return peaks, nil } func sqlString(s string) sql.NullString { return sql.NullString{String: s, Valid: true} } func sqlInt64(i int64) sql.NullInt64 { return sql.NullInt64{Int64: i, Valid: true} } func sqlInt32(i int32) sql.NullInt32 { return sql.NullInt32{Int32: i, Valid: true} } // ModuloBufReader reads from a reader in block sizes that are exactly modulo // modSize, with any remainder buffered until the next read. type ModuloBufReader struct { io.ReadCloser buf bytes.Buffer modSize int } func NewModuloBufReader(r io.ReadCloser, modSize int) *ModuloBufReader { return &ModuloBufReader{ReadCloser: r, modSize: modSize} } func (r *ModuloBufReader) Read(p []byte) (int, error) { // err is always io.EOF or nil nr1, _ := r.buf.Read(p) nr2, err := r.ReadCloser.Read(p[nr1:]) nr := nr1 + nr2 rem := nr % r.modSize // if there was an error, return immediately. if err == io.EOF { return nr, err } else if err != nil { return nr - rem, err } // write any remainder to the buffer if rem != 0 { // err is always nil _, _ = r.buf.Write(p[nr-rem : nr]) } return nr - rem, err } type VideoThumbnail struct { Data []byte Width, Height int } func (s *MediaSetService) GetVideoThumbnail(ctx context.Context, id uuid.UUID) (VideoThumbnail, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return VideoThumbnail{}, fmt.Errorf("error getting media set: %v", err) } if mediaSet.VideoThumbnailS3UploadedAt.Valid { return s.getThumbnailFromFileStore(ctx, mediaSet) } return s.getThumbnailFromYoutube(ctx, mediaSet) } func (s *MediaSetService) getThumbnailFromFileStore(ctx context.Context, mediaSet store.MediaSet) (VideoThumbnail, error) { object, err := s.fileStore.GetObject(ctx, mediaSet.VideoThumbnailS3Key.String) if err != nil { return VideoThumbnail{}, fmt.Errorf("error fetching thumbnail from file store: %v", err) } defer object.Close() imageData, err := io.ReadAll(object) if err != nil { return VideoThumbnail{}, fmt.Errorf("error reading thumbnail from file store: %v", err) } return VideoThumbnail{ Width: int(mediaSet.VideoThumbnailWidth.Int32), Height: int(mediaSet.VideoThumbnailHeight.Int32), Data: imageData, }, nil } func (s *MediaSetService) getThumbnailFromYoutube(ctx context.Context, mediaSet store.MediaSet) (VideoThumbnail, error) { video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) if err != nil { return VideoThumbnail{}, fmt.Errorf("error fetching video: %v", err) } if len(video.Formats) == 0 { return VideoThumbnail{}, errors.New("no format available") } thumbnails := video.Thumbnails SortYoutubeThumbnails(thumbnails) thumbnail := thumbnails[0] resp, err := http.Get(thumbnail.URL) if err != nil { return VideoThumbnail{}, fmt.Errorf("error fetching thumbnail: %v", err) } defer resp.Body.Close() imageData, err := io.ReadAll(resp.Body) if err != nil { return VideoThumbnail{}, fmt.Errorf("error reading thumbnail: %v", err) } // TODO: use mediaSet func to fetch key thumbnailKey := fmt.Sprintf("media_sets/%s/thumbnail.jpg", mediaSet.ID) const mimeType = "application/jpeg" _, err = s.fileStore.PutObject(ctx, thumbnailKey, bytes.NewReader(imageData), mimeType) if err != nil { return VideoThumbnail{}, fmt.Errorf("error uploading thumbnail: %v", err) } storeParams := store.SetVideoThumbnailUploadedParams{ ID: mediaSet.ID, VideoThumbnailMimeType: sqlString(mimeType), VideoThumbnailS3Bucket: sqlString(s.config.S3Bucket), VideoThumbnailS3Key: sqlString(thumbnailKey), VideoThumbnailWidth: sqlInt32(int32(thumbnail.Width)), VideoThumbnailHeight: sqlInt32(int32(thumbnail.Height)), } if _, err := s.store.SetVideoThumbnailUploaded(ctx, storeParams); err != nil { return VideoThumbnail{}, fmt.Errorf("error updating media set: %v", err) } return VideoThumbnail{Width: int(thumbnail.Width), Height: int(thumbnail.Height), Data: imageData}, nil } // progressReader is a reader that prints progress logs as it reads. type progressReader struct { io.Reader label string total, exp int64 logger *zap.SugaredLogger } func newProgressReader(reader io.Reader, label string, exp int64, logger *zap.SugaredLogger) *progressReader { return &progressReader{ Reader: reader, exp: exp, logger: logger.Named(label), } } func (r *progressReader) Read(p []byte) (int, error) { n, err := r.Reader.Read(p) r.total += int64(n) r.logger.Debugf("Read %d of %d (%.02f%%) bytes from the provided reader", r.total, r.exp, (float32(r.total)/float32(r.exp))*100.0) return n, err }