package media

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"io"
	"math"
	"strconv"

	"git.netflux.io/rob/clipper/config"
	"git.netflux.io/rob/clipper/generated/store"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)

// GetPeaksProgress is a single progress report emitted while audio is being
// fetched and processed.
type GetPeaksProgress struct {
	// PercentComplete is the share of the expected audio frames processed so
	// far, expressed as a percentage.
	PercentComplete float32
	// Peaks holds one peak magnitude per audio channel for the reported bin.
	Peaks []int16
	// URL is only populated on the final report produced by the reader in this
	// file, i.e. the one returned together with io.EOF.
	URL string
}

// GetPeaksProgressReader is the consumer-facing handle for a running audio
// fetch.
//
// Next returns the next progress report. The implementation in this file
// returns io.EOF together with the final report once processing has finished.
// In that implementation the argument passed to Close is the URL delivered
// with the final report.
type GetPeaksProgressReader interface {
	Next() (GetPeaksProgress, error)
	Close(string) error
}

// audioGetter manages getting and processing audio from Youtube.
type audioGetter struct {
	store       Store
	youtube     YoutubeClient
	fileStore   FileStore
	commandFunc CommandFunc
	workerPool  *WorkerPool
	config      config.Config
	logger      *zap.SugaredLogger
}

// newAudioGetter returns a new audioGetter. Every argument is stored as-is in
// the field of the same name.
func newAudioGetter(store Store, youtube YoutubeClient, fileStore FileStore, commandFunc CommandFunc, workerPool *WorkerPool, config config.Config, logger *zap.SugaredLogger) *audioGetter {
	return &audioGetter{
		store:       store,
		youtube:     youtube,
		fileStore:   fileStore,
		commandFunc: commandFunc,
		workerPool:  workerPool,
		config:      config,
		logger:      logger,
	}
}

// GetAudio gets the audio, processes it and uploads it to a file store. It
// returns a GetPeaksProgressReader that can be used to poll progress reports
// and audio peaks.
// // TODO: accept domain object instead func (g *audioGetter) GetAudio(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetPeaksProgressReader, error) { video, err := g.youtube.GetVideoContext(ctx, mediaSet.YoutubeID) if err != nil { return nil, fmt.Errorf("error fetching video: %v", err) } format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag)) if format == nil { return nil, fmt.Errorf("error finding itag: %d", mediaSet.AudioYoutubeItag) } stream, _, err := g.youtube.GetStreamContext(ctx, video, format) if err != nil { return nil, fmt.Errorf("error fetching stream: %v", err) } audioProgressReader, err := newGetPeaksProgressReader(mediaSet.AudioFramesApprox, int(mediaSet.AudioChannels), numBins) if err != nil { return nil, fmt.Errorf("error building progress reader: %v", err) } s := &audioGetterState{ audioGetter: g, getPeaksProgressReader: audioProgressReader, } go func() { if err := g.workerPool.WaitForTask(ctx, func() error { return s.getAudio(ctx, stream, mediaSet) }); err != nil { // the progress reader is closed inside the worker in the non-error case. s.CloseWithError(err) } }() return s, nil } // audioGetterState represents the state of an individual audio fetch. type audioGetterState struct { *audioGetter *getPeaksProgressReader } func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaSet store.MediaSet) error { streamWithProgress := newLogProgressReader(r, "audio", mediaSet.AudioContentLength, s.logger) var stdErr bytes.Buffer cmd := s.commandFunc(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-") cmd.Stderr = &stdErr // ffmpegWriter accepts encoded audio and pipes it to FFmpeg. 
ffmpegWriter, err := cmd.StdinPipe() if err != nil { return fmt.Errorf("error getting stdin: %v", err) } uploadReader, uploadWriter := io.Pipe() mw := io.MultiWriter(uploadWriter, ffmpegWriter) // ffmpegReader delivers raw audio output from FFmpeg, and also writes it // back to the progress reader. var ffmpegReader io.Reader if stdoutPipe, err := cmd.StdoutPipe(); err == nil { ffmpegReader = io.TeeReader(stdoutPipe, s) } else { return fmt.Errorf("error getting stdout: %v", err) } var presignedAudioURL string g, ctx := errgroup.WithContext(ctx) // Upload the encoded audio. g.Go(func() error { // TODO: use mediaSet func to fetch key key := fmt.Sprintf("media_sets/%s/audio.opus", mediaSet.ID) _, encErr := s.fileStore.PutObject(ctx, key, uploadReader, "audio/opus") if encErr != nil { return fmt.Errorf("error uploading encoded audio: %v", encErr) } presignedAudioURL, encErr = s.fileStore.GetURL(ctx, key) if encErr != nil { return fmt.Errorf("error generating presigned URL: %v", encErr) } if _, encErr = s.store.SetEncodedAudioUploaded(ctx, store.SetEncodedAudioUploadedParams{ ID: mediaSet.ID, AudioEncodedS3Key: sqlString(key), }); encErr != nil { return fmt.Errorf("error setting encoded audio uploaded: %v", encErr) } return nil }) // Upload the raw audio. 
g.Go(func() error { // TODO: use mediaSet func to fetch key key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID) bytesUploaded, rawErr := s.fileStore.PutObject(ctx, key, ffmpegReader, rawAudioMimeType) if rawErr != nil { return fmt.Errorf("error uploading raw audio: %v", rawErr) } if _, rawErr = s.store.SetRawAudioUploaded(ctx, store.SetRawAudioUploadedParams{ ID: mediaSet.ID, AudioRawS3Key: sqlString(key), AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(mediaSet.AudioChannels)), }); rawErr != nil { return fmt.Errorf("error setting raw audio uploaded: %v", rawErr) } return nil }) g.Go(func() error { if _, err := io.Copy(mw, streamWithProgress); err != nil { return fmt.Errorf("error copying: %v", err) } // ignoring the following Close errors should be ok, as the Copy has // already completed successfully. if err := ffmpegWriter.Close(); err != nil { s.logger.With("err", err).Warn("getAudio: unable to close ffmpegWriter") } if err := uploadWriter.Close(); err != nil { s.logger.With("err", err).Warn("getAudio: unable to close pipeWriter") } if err := r.Close(); err != nil { s.logger.With("err", err).Warn("getAudio: unable to close stream") } return nil }) if err := cmd.Start(); err != nil { return fmt.Errorf("error starting command: %v, output: %s", err, stdErr.String()) } if err := g.Wait(); err != nil { return fmt.Errorf("error uploading: %v", err) } if err := cmd.Wait(); err != nil { return fmt.Errorf("error waiting for command: %v, output: %s", err, stdErr.String()) } // Finally, close the progress reader so that the subsequent call to Next() // returns the presigned URL and io.EOF. s.Close(presignedAudioURL) return nil } // getPeaksProgressReader accepts a byte stream containing little endian // signed int16s and, given a target number of bins, emits a stream of peaks // corresponding to each channel of the audio data. 
type getPeaksProgressReader struct { framesExpected int64 channels int framesPerBin int samples []int16 currPeaks []int16 currCount int framesProcessed int64 url string progress chan GetPeaksProgress errorChan chan error } func newGetPeaksProgressReader(framesExpected int64, channels, numBins int) (*getPeaksProgressReader, error) { if framesExpected <= 0 || channels <= 0 || numBins <= 0 { return nil, fmt.Errorf("error creating audio progress reader (framesExpected = %d, channels = %d, numBins = %d)", framesExpected, channels, numBins) } return &getPeaksProgressReader{ channels: channels, framesExpected: framesExpected, framesPerBin: int(math.Ceil(float64(framesExpected) / float64(numBins))), samples: make([]int16, 8_192), currPeaks: make([]int16, channels), progress: make(chan GetPeaksProgress), errorChan: make(chan error, 1), }, nil } func (w *getPeaksProgressReader) CloseWithError(err error) { w.errorChan <- err } // Close cloes the reader and returns the provided URL to the calling code. func (w *getPeaksProgressReader) Close(url string) error { w.url = url close(w.progress) return nil } func (w *getPeaksProgressReader) Next() (GetPeaksProgress, error) { for { select { case progress, ok := <-w.progress: if !ok { return GetPeaksProgress{Peaks: w.currPeaks, PercentComplete: w.percentComplete(), URL: w.url}, io.EOF } return progress, nil case err := <-w.errorChan: return GetPeaksProgress{}, fmt.Errorf("error waiting for progress: %v", err) } } } func (w *getPeaksProgressReader) Write(p []byte) (int, error) { // expand our target slice if it is of insufficient size: numSamples := len(p) / SizeOfInt16 if len(w.samples) < numSamples { w.samples = append(w.samples, make([]int16, numSamples-len(w.samples))...) 
} samples := w.samples[:numSamples] if err := binary.Read(bytes.NewReader(p), binary.LittleEndian, samples); err != nil { return 0, fmt.Errorf("error parsing samples: %v", err) } for i := 0; i < len(samples); i += w.channels { for j := 0; j < w.channels; j++ { samp := samples[i+j] if samp < 0 { samp = -samp } if samp > w.currPeaks[j] { w.currPeaks[j] = samp } } w.currCount++ if w.currCount == w.framesPerBin { w.nextBin() } } w.framesProcessed += int64(len(samples) / w.channels) return len(p), nil } func (w *getPeaksProgressReader) percentComplete() float32 { return (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0 } func (w *getPeaksProgressReader) nextBin() { var progress GetPeaksProgress progress.Peaks = append(progress.Peaks, w.currPeaks...) progress.PercentComplete = w.percentComplete() w.progress <- progress w.currCount = 0 for i := 0; i < len(w.currPeaks); i++ { w.currPeaks[i] = 0 } }