package media import ( "bytes" "context" "database/sql" "encoding/binary" "errors" "fmt" "io" "strconv" "time" "git.netflux.io/rob/clipper/config" "git.netflux.io/rob/clipper/generated/store" "github.com/google/uuid" "github.com/jackc/pgx/v4" youtubev2 "github.com/kkdai/youtube/v2" "go.uber.org/zap" ) const ( rawAudioCodec = "pcm_s16le" rawAudioFormat = "s16le" rawAudioSampleRate = 48_000 rawAudioMimeType = "audio/raw" ) const ( thumbnailWidth = 177 // 16:9 thumbnailHeight = 100 // " ) // MediaSetService exposes logical flows handling MediaSets. type MediaSetService struct { store Store youtube YoutubeClient fileStore FileStore commandFunc CommandFunc config config.Config logger *zap.SugaredLogger } func NewMediaSetService(store Store, youtubeClient YoutubeClient, fileStore FileStore, commandFunc CommandFunc, config config.Config, logger *zap.SugaredLogger) *MediaSetService { return &MediaSetService{ store: store, youtube: youtubeClient, fileStore: fileStore, commandFunc: commandFunc, config: config, logger: logger, } } // Get fetches the metadata for a given MediaSet source. If it does not exist // in the local DB, it will attempt to create it. After the resource has been // created, other endpoints (e.g. GetPeaks) can be called to fetch media from // Youtube and store it in a file store. func (s *MediaSetService) Get(ctx context.Context, youtubeID string) (*MediaSet, error) { var ( mediaSet *MediaSet err error ) mediaSet, err = s.findMediaSet(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error finding existing media set: %v", err) } if mediaSet == nil { mediaSet, err = s.createMediaSet(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error creating new media set: %v", err) } } return mediaSet, nil } func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) { video, err := s.youtube.GetVideoContext(ctx, youtubeID) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } if len(video.Formats) == 0 { return nil, errors.New("no format available") } audioMetadata, err := s.fetchAudioMetadata(ctx, video) if err != nil { return nil, fmt.Errorf("error fetching audio metadata: %v", err) } videoMetadata, err := s.fetchVideoMetadata(ctx, video) if err != nil { return nil, fmt.Errorf("error fetching video metadata: %v", err) } storeParams := store.CreateMediaSetParams{ YoutubeID: youtubeID, AudioYoutubeItag: int32(audioMetadata.YoutubeItag), AudioChannels: int32(audioMetadata.Channels), AudioFramesApprox: audioMetadata.ApproxFrames, AudioSampleRate: int32(audioMetadata.SampleRate), AudioEncodedMimeType: audioMetadata.MimeType, AudioContentLength: audioMetadata.ContentLength, VideoYoutubeItag: int32(videoMetadata.YoutubeItag), VideoContentLength: videoMetadata.ContentLength, VideoMimeType: videoMetadata.MimeType, VideoDurationNanos: videoMetadata.Duration.Nanoseconds(), } mediaSet, err := s.store.CreateMediaSet(ctx, storeParams) if err != nil { return nil, fmt.Errorf("error creating media set in store: %v", err) } return &MediaSet{ ID: mediaSet.ID, YoutubeID: youtubeID, Audio: audioMetadata, Video: videoMetadata, }, nil } // findMediaSet fetches a record from the database, returning (nil, nil) if it does not exist. func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (*MediaSet, error) { mediaSet, err := s.store.GetMediaSetByYoutubeID(ctx, youtubeID) if err != nil { if err == pgx.ErrNoRows { return nil, nil } return nil, fmt.Errorf("error getting media set: %v", err) } return &MediaSet{ ID: mediaSet.ID, YoutubeID: mediaSet.YoutubeID, Audio: Audio{ YoutubeItag: int(mediaSet.AudioYoutubeItag), ContentLength: mediaSet.AudioContentLength, Channels: int(mediaSet.AudioChannels), ApproxFrames: int64(mediaSet.AudioFramesApprox), Frames: mediaSet.AudioFrames.Int64, SampleRate: int(mediaSet.AudioSampleRate), MimeType: mediaSet.AudioEncodedMimeType, }, Video: Video{ YoutubeItag: int(mediaSet.VideoYoutubeItag), ContentLength: mediaSet.VideoContentLength, Duration: time.Duration(mediaSet.VideoDurationNanos), MimeType: mediaSet.VideoMimeType, ThumbnailWidth: 0, // ?? ThumbnailHeight: 0, // ?? }, }, nil } func (s *MediaSetService) fetchVideoMetadata(ctx context.Context, video *youtubev2.Video) (Video, error) { formats := FilterYoutubeVideo(video.Formats) if len(video.Formats) == 0 { return Video{}, errors.New("no format available") } format := formats[0] durationMsecs, err := strconv.Atoi(format.ApproxDurationMs) if err != nil { return Video{}, fmt.Errorf("could not parse video duration: %s", err) } return Video{ YoutubeItag: format.ItagNo, MimeType: format.MimeType, ContentLength: format.ContentLength, ThumbnailWidth: thumbnailWidth, ThumbnailHeight: thumbnailHeight, Duration: time.Duration(durationMsecs) * time.Millisecond, }, nil } func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtubev2.Video) (Audio, error) { formats := FilterYoutubeAudio(video.Formats) if len(video.Formats) == 0 { return Audio{}, errors.New("no format available") } format := formats[0] sampleRate, err := strconv.Atoi(format.AudioSampleRate) if err != nil { return Audio{}, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate) } approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs) if err != nil { return Audio{}, fmt.Errorf("could not parse audio duration: %s", err) } approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond approxFrames := int64(approxDuration/time.Second) * int64(sampleRate) return Audio{ ContentLength: format.ContentLength, MimeType: format.MimeType, YoutubeItag: format.ItagNo, ApproxFrames: approxFrames, Channels: format.AudioChannels, SampleRate: sampleRate, }, nil } // GetVideo fetches the video part of a MediaSet. func (s *MediaSetService) GetVideo(ctx context.Context, id uuid.UUID) (GetVideoProgressReader, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return nil, fmt.Errorf("error getting media set: %v", err) } if mediaSet.VideoS3UploadedAt.Valid { url, err := s.fileStore.GetURL(ctx, mediaSet.VideoS3Key.String) if err != nil { return nil, fmt.Errorf("error generating presigned URL: %v", err) } videoGetter := videoGetterDownloaded(url) return &videoGetter, nil } video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } format := video.Formats.FindByItag(int(mediaSet.VideoYoutubeItag)) if format == nil { return nil, fmt.Errorf("error finding itag: %v", err) } stream, _, err := s.youtube.GetStreamContext(ctx, video, format) if err != nil { return nil, fmt.Errorf("error fetching stream: %v", err) } // TODO: use mediaSet func to fetch videoKey videoKey := fmt.Sprintf("media_sets/%s/video.mp4", mediaSet.ID) videoGetter := newVideoGetter(s.store, s.fileStore, s.logger) return videoGetter.GetVideo( ctx, stream, format.ContentLength, mediaSet.ID, videoKey, format.MimeType, ) } // GetPeaks fetches the audio part of a MediaSet. func (s *MediaSetService) GetPeaks(ctx context.Context, id uuid.UUID, numBins int) (GetPeaksProgressReader, error) { mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return nil, fmt.Errorf("error getting media set: %v", err) } // We need both raw and encoded audio to have been uploaded successfully. // Otherwise, we cannot return both peaks and a presigned URL for use by the // player. if mediaSet.AudioRawS3UploadedAt.Valid && mediaSet.AudioEncodedS3UploadedAt.Valid { return s.getPeaksFromFileStore(ctx, mediaSet, numBins) } // Fetch the audio from Youtube, calculate and store the peaks and return // them. return s.getAudioFromYoutube(ctx, mediaSet, numBins) } func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetPeaksProgressReader, error) { audioGetter := newAudioGetter(s.store, s.youtube, s.fileStore, s.config, s.logger) return audioGetter.GetAudio(ctx, mediaSet, numBins) } func (s *MediaSetService) getPeaksFromFileStore(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetPeaksProgressReader, error) { object, err := s.fileStore.GetObject(ctx, mediaSet.AudioRawS3Key.String) if err != nil { return nil, fmt.Errorf("error getting object from file store: %v", err) } getPeaksProgressReader, err := newGetPeaksProgressReader( int64(mediaSet.AudioFrames.Int64), int(mediaSet.AudioChannels), numBins, ) if err != nil { return nil, fmt.Errorf("error creating audio reader: %v", err) } state := getPeaksFromFileStoreState{ getPeaksProgressReader: getPeaksProgressReader, reader: NewModuloReader(object, int(mediaSet.AudioChannels)*SizeOfInt16), fileStore: s.fileStore, config: s.config, logger: s.logger, } go state.run(ctx, mediaSet) return &state, nil } type getPeaksFromFileStoreState struct { *getPeaksProgressReader reader io.ReadCloser fileStore FileStore config config.Config logger *zap.SugaredLogger } func (s *getPeaksFromFileStoreState) run(ctx context.Context, mediaSet store.MediaSet) { done := make(chan error) var err error go func() { _, copyErr := io.Copy(s, s.reader) done <- copyErr }() outer: for { select { case <-ctx.Done(): err = ctx.Err() break outer case err = <-done: break outer } } if readerErr := s.reader.Close(); readerErr != nil { if err == nil { err = readerErr } } if err != nil { s.logger.Errorf("getAudioFromFileStoreState: error closing reader: %v", err) s.CloseWithError(err) return } url, err := s.fileStore.GetURL(ctx, mediaSet.AudioEncodedS3Key.String) if err != nil { s.CloseWithError(fmt.Errorf("error generating object URL: %v", err)) } if iterErr := s.Close(url); iterErr != nil { s.logger.Errorf("getAudioFromFileStoreState: error closing progress iterator: %v", iterErr) } } func (s *MediaSetService) GetPeaksForSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, numBins int) ([]int16, error) { if startFrame < 0 || endFrame < 0 || numBins <= 0 { s.logger.With("startFrame", startFrame, "endFrame", endFrame, "numBins", numBins).Error("invalid arguments") return nil, errors.New("invalid arguments") } mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return nil, fmt.Errorf("error getting media set: %v", err) } object, err := s.fileStore.GetObjectWithRange( ctx, mediaSet.AudioRawS3Key.String, startFrame*int64(mediaSet.AudioChannels)*SizeOfInt16, endFrame*int64(mediaSet.AudioChannels)*SizeOfInt16, ) if err != nil { return nil, fmt.Errorf("error getting object from file store: %v", err) } defer object.Close() const readBufSizeBytes = 8_192 channels := int(mediaSet.AudioChannels) modReader := NewModuloReader(object, channels*SizeOfInt16) readBuf := make([]byte, readBufSizeBytes) peaks := make([]int16, channels*numBins) totalFrames := endFrame - startFrame framesPerBin := totalFrames / int64(numBins) sampleBuf := make([]int16, readBufSizeBytes/SizeOfInt16) bytesExpected := (endFrame - startFrame) * int64(channels) * SizeOfInt16 var ( bytesRead int64 closing bool currPeakIndex int currFrame int64 ) for { n, err := modReader.Read(readBuf) if err == io.EOF { closing = true } else if err != nil { return nil, fmt.Errorf("read error: %v", err) } bytesRead += int64(n) samples := sampleBuf[:n/SizeOfInt16] if err := binary.Read(bytes.NewReader(readBuf[:n]), binary.LittleEndian, samples); err != nil { return nil, fmt.Errorf("error interpreting samples: %v", err) } for i := 0; i < len(samples); i += channels { for j := 0; j < channels; j++ { samp := sampleBuf[i+j] if samp < 0 { samp = -samp } if samp > peaks[currPeakIndex+j] { peaks[currPeakIndex+j] = samp } } if currFrame == framesPerBin { currFrame = 0 currPeakIndex += channels } else { currFrame++ } } if closing { break } } if bytesRead < bytesExpected { s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioRawS3Key.String).Warn("short read from file store") } return peaks, nil } func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, outFormat AudioFormat) (*AudioSegmentStream, error) { if startFrame > endFrame { return nil, errors.New("invalid range") } mediaSet, err := s.store.GetMediaSet(ctx, id) if err != nil { return nil, fmt.Errorf("error getting media set: %v", err) } // TODO: use mediaSet func to fetch key key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) startByte := startFrame * int64(mediaSet.AudioChannels) * SizeOfInt16 endByte := endFrame * int64(mediaSet.AudioChannels) * SizeOfInt16 rawAudio, err := s.fileStore.GetObjectWithRange(ctx, key, startByte, endByte) if err != nil { return nil, fmt.Errorf("error getting object from store: %v", err) } g := newAudioSegmentGetter(s.commandFunc, rawAudio, mediaSet.AudioChannels, endByte-startByte, outFormat) go g.getAudioSegment(ctx) return g.stream, nil } // logProgressReader is a reader that prints progress logs as it reads. type logProgressReader struct { io.Reader total, exp int64 logger *zap.SugaredLogger } func newLogProgressReader(reader io.Reader, label string, exp int64, logger *zap.SugaredLogger) *logProgressReader { return &logProgressReader{ Reader: reader, exp: exp, logger: logger.Named(label), } } func (r *logProgressReader) Read(p []byte) (int, error) { n, err := r.Reader.Read(p) r.total += int64(n) r.logger.Debugf("Read %d of %d (%.02f%%) bytes from the provided reader", r.total, r.exp, (float32(r.total)/float32(r.exp))*100.0) return n, err } func sqlString(s string) sql.NullString { return sql.NullString{String: s, Valid: true} } func sqlInt64(i int64) sql.NullInt64 { return sql.NullInt64{Int64: i, Valid: true} } func sqlInt32(i int32) sql.NullInt32 { return sql.NullInt32{Int32: i, Valid: true} }