package media import ( "bytes" "context" "errors" "fmt" "io" "log" "os/exec" "sort" "strconv" "strings" "time" "github.com/aws/aws-sdk-go-v2/service/s3" youtubev2 "github.com/kkdai/youtube/v2" ) const s3Bucket = "clipper-development" const ( rawAudioCodec = "pcm_s16le" rawAudioFormat = "s16le" rawAudioSampleRate = 48_000 ) // progressReader is a reader that prints progress logs as it reads. type progressReader struct { io.Reader label string total, exp int } func (pw *progressReader) Read(p []byte) (int, error) { n, err := pw.Reader.Read(p) pw.total += n log.Printf("[ProgressReader] [%s] Read %d of %d (%.02f%%) bytes from the provided reader", pw.label, pw.total, pw.exp, (float32(pw.total)/float32(pw.exp))*100.0) return n, err } // S3Client stubs the AWS S3 service client. type S3Client interface { CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) } // YoutubeClient stubs the youtube.Client client. type YoutubeClient interface { GetVideoContext(context.Context, string) (*youtubev2.Video, error) GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error) } type MediaSet2 struct { id string } func NewMediaSet2(id string) *MediaSet2 { return &MediaSet2{ id: id, } } // VideoFetchService fetches a video via an io.Reader. type VideoFetchService struct { youtube YoutubeClient s3 S3Client } func NewVideoFetchService(youtubeClient YoutubeClient, s3Client S3Client) *VideoFetchService { return &VideoFetchService{ youtube: youtubeClient, s3: s3Client, } } // Fetch handles the entire process to fetch and process the audio and video // parts of a MediaSet. func (s *VideoFetchService) Fetch(ctx context.Context, id string) (*MediaSet, error) { video, err := s.youtube.GetVideoContext(ctx, id) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } if len(video.Formats) == 0 { return nil, errors.New("no format available") } // just the audio for now // grab an audio stream from youtube // TODO: avoid possible panic format := sortAudio(video.Formats)[0] sampleRate, err := strconv.Atoi(format.AudioSampleRate) if err != nil { return nil, fmt.Errorf("invalid samplerate: %s", format.AudioSampleRate) } approxDurationMsecs, err := strconv.Atoi(format.ApproxDurationMs) if err != nil { return nil, fmt.Errorf("could not parse audio duration: %s", err) } approxDuration := time.Duration(approxDurationMsecs) * time.Millisecond approxFrames := int64(approxDuration/time.Second) * int64(sampleRate) mediaSet := MediaSet{ ID: id, Audio: Audio{ // we need to decode it to be able to know bytes and frames exactly ApproxFrames: approxFrames, Channels: format.AudioChannels, SampleRate: sampleRate, }, } return &mediaSet, nil } type PeakIterator interface { Next() (FetchPeaksProgress, error) Close() error } func (s *VideoFetchService) FetchPeaks(ctx context.Context, id string) (PeakIterator, error) { mediaSet := NewMediaSet(id) if !mediaSet.Exists() { // TODO check if audio uploaded already, don't bother again return nil, errors.New("no media set found") } if err := mediaSet.Load(); err != nil { return nil, fmt.Errorf("error loading media set: %v", err) } video, err := s.youtube.GetVideoContext(ctx, id) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } if len(video.Formats) == 0 { return nil, errors.New("no format available") } // TODO: avoid possible panic format := sortAudio(video.Formats)[0] stream, _, err := s.youtube.GetStreamContext(ctx, video, &format) if err != nil { return nil, fmt.Errorf("error fetching stream: %v", err) } // wrap it in a progress reader progressStream := &progressReader{Reader: stream, label: "audio", exp: int(format.ContentLength)} ffmpegReader, err := newFfmpegReader(ctx, progressStream, "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") if err != nil { return nil, fmt.Errorf("error creating ffmpegreader: %v", err) } // set up uploader, this is writer 1 uploader, err := newMultipartUploadWriter( ctx, s.s3, s3Bucket, fmt.Sprintf("media_sets/%s/audio.webm", id), "application/octet-stream", ) if err != nil { return nil, fmt.Errorf("error creating uploader: %v", err) } peakIterator := newFetchPeaksIterator( mediaSet.Audio.ApproxFrames, format.AudioChannels, 100, ) state := fetchPeaksState{ fetchPeaksIterator: peakIterator, ffmpegReader: ffmpegReader, uploader: uploader, } go state.run(ctx) // pass ctx? return &state, nil } type fetchPeaksState struct { *fetchPeaksIterator ffmpegReader *ffmpegReader uploader *multipartUploadWriter err error } // run copies the audio data from ffmpeg, waits for termination and then cleans // up appropriately. func (s *fetchPeaksState) run(ctx context.Context) { mw := io.MultiWriter(s.fetchPeaksIterator, s.uploader) done := make(chan error) var err error go func() { _, copyErr := io.Copy(mw, s.ffmpegReader) done <- copyErr }() outer: for { select { case <-ctx.Done(): err = ctx.Err() break outer case err = <-done: break outer } } if readerErr := s.ffmpegReader.Close(); readerErr != nil { log.Printf("error closing ffmpegReader: %v", readerErr) if err == nil { err = readerErr } } if err == nil { if uploaderErr := s.uploader.Complete(); uploaderErr != nil { log.Printf("error closing uploader: %v", uploaderErr) err = uploaderErr } } if err != nil { newCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() if abortUploadErr := s.uploader.Abort(newCtx); abortUploadErr != nil { log.Printf("error aborting uploader: %v", abortUploadErr) } s.Abort(err) return } if iterErr := s.Close(); iterErr != nil { log.Printf("error closing peak iterator: %v", iterErr) } } type ffmpegReader struct { io.ReadCloser cmd *exec.Cmd } func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmpegReader, error) { var stdErr bytes.Buffer cmd := exec.CommandContext(ctx, "ffmpeg", arg...) cmd.Stdin = input cmd.Stderr = &stdErr // TODO: fix error handling r, err := cmd.StdoutPipe() if err != nil { return nil, fmt.Errorf("error creating pipe: %v", err) } if err := cmd.Start(); err != nil { return nil, fmt.Errorf("error starting ffmpeg: %v", err) } return &ffmpegReader{ReadCloser: r, cmd: cmd}, nil } func (r *ffmpegReader) Close() error { state, err := r.cmd.Process.Wait() if err != nil { return fmt.Errorf("error returned from process: %v", err) } if state.ExitCode() != 0 { return fmt.Errorf("command exited with code %d", state.ExitCode()) } log.Println("returning from ffmpegreader.close") return nil } // sortAudio returns the provided formats ordered in descending preferred // order. The ideal candidate is opus-encoded stereo audio in a webm container, // with the lowest available bitrate. func sortAudio(inFormats youtubev2.FormatList) youtubev2.FormatList { var formats youtubev2.FormatList for _, format := range inFormats { if format.FPS == 0 && format.AudioChannels > 0 { formats = append(formats, format) } } sort.SliceStable(formats, func(i, j int) bool { isOpusI := strings.Contains(formats[i].MimeType, "opus") isOpusJ := strings.Contains(formats[j].MimeType, "opus") if isOpusI && isOpusJ { isStereoI := formats[i].AudioChannels == 2 isStereoJ := formats[j].AudioChannels == 2 if isStereoI && isStereoJ { return formats[i].ContentLength < formats[j].ContentLength } return isStereoI } return isOpusI }) return formats }