2021-11-29 11:46:33 +00:00
|
|
|
package media
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"encoding/binary"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"math"
|
|
|
|
"strconv"
|
|
|
|
|
|
|
|
"git.netflux.io/rob/clipper/config"
|
|
|
|
"git.netflux.io/rob/clipper/generated/store"
|
|
|
|
"go.uber.org/zap"
|
2022-01-05 18:49:21 +00:00
|
|
|
"golang.org/x/sync/errgroup"
|
2021-11-29 11:46:33 +00:00
|
|
|
)
|
|
|
|
|
2021-12-17 16:30:53 +00:00
|
|
|
// GetPeaksProgress is a single progress report produced while audio peaks are
// being computed.
type GetPeaksProgress struct {
	// PercentComplete is the share of the expected audio frames that have been
	// processed, expressed as a percentage.
	PercentComplete float32
	// Peaks holds one peak value per audio channel for the reported bin.
	Peaks []int16
	// URL is the URL passed to the reader's Close method. The reader in this
	// file only sets it on the final report.
	URL string
	// AudioFrames is the total number of audio frames processed. The reader in
	// this file only sets it on the final report.
	AudioFrames int64
}
|
|
|
|
|
2021-12-17 16:30:53 +00:00
|
|
|
type GetPeaksProgressReader interface {
|
|
|
|
Next() (GetPeaksProgress, error)
|
2021-11-29 14:55:11 +00:00
|
|
|
Close(string) error
|
2021-11-29 11:46:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// audioGetter manages getting and processing audio from Youtube.
|
|
|
|
type audioGetter struct {
|
2022-01-03 17:44:19 +00:00
|
|
|
store Store
|
|
|
|
youtube YoutubeClient
|
|
|
|
fileStore FileStore
|
|
|
|
commandFunc CommandFunc
|
2022-01-05 18:49:21 +00:00
|
|
|
workerPool *WorkerPool
|
2022-01-03 17:44:19 +00:00
|
|
|
config config.Config
|
|
|
|
logger *zap.SugaredLogger
|
2021-11-29 11:46:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// newAudioGetter returns a new audioGetter.
|
2022-01-05 18:49:21 +00:00
|
|
|
func newAudioGetter(store Store, youtube YoutubeClient, fileStore FileStore, commandFunc CommandFunc, workerPool *WorkerPool, config config.Config, logger *zap.SugaredLogger) *audioGetter {
|
2021-11-29 11:46:33 +00:00
|
|
|
return &audioGetter{
|
2022-01-03 17:44:19 +00:00
|
|
|
store: store,
|
|
|
|
youtube: youtube,
|
|
|
|
fileStore: fileStore,
|
|
|
|
commandFunc: commandFunc,
|
2022-01-05 18:49:21 +00:00
|
|
|
workerPool: workerPool,
|
2022-01-03 17:44:19 +00:00
|
|
|
config: config,
|
|
|
|
logger: logger,
|
2021-11-29 11:46:33 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-07 19:58:11 +00:00
|
|
|
// GetAudio gets the audio, processes it and uploads it to a file store. It
|
|
|
|
// returns a GetAudioProgressReader that can be used to poll progress reports
|
|
|
|
// and audio peaks.
|
2021-11-29 11:46:33 +00:00
|
|
|
//
|
|
|
|
// TODO: accept domain object instead
|
2021-12-17 16:30:53 +00:00
|
|
|
func (g *audioGetter) GetAudio(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetPeaksProgressReader, error) {
|
2021-11-29 11:46:33 +00:00
|
|
|
video, err := g.youtube.GetVideoContext(ctx, mediaSet.YoutubeID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("error fetching video: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag))
|
|
|
|
if format == nil {
|
2022-01-03 17:44:19 +00:00
|
|
|
return nil, fmt.Errorf("error finding itag: %d", mediaSet.AudioYoutubeItag)
|
2021-11-29 11:46:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
stream, _, err := g.youtube.GetStreamContext(ctx, video, format)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("error fetching stream: %v", err)
|
|
|
|
}
|
|
|
|
|
2021-12-17 16:30:53 +00:00
|
|
|
audioProgressReader, err := newGetPeaksProgressReader(mediaSet.AudioFramesApprox, int(mediaSet.AudioChannels), numBins)
|
2021-11-29 11:46:33 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("error building progress reader: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
s := &audioGetterState{
|
|
|
|
audioGetter: g,
|
2021-12-17 16:30:53 +00:00
|
|
|
getPeaksProgressReader: audioProgressReader,
|
2021-11-29 11:46:33 +00:00
|
|
|
}
|
2022-01-05 18:49:21 +00:00
|
|
|
|
|
|
|
go func() {
|
|
|
|
if err := g.workerPool.WaitForTask(ctx, func() error { return s.getAudio(ctx, stream, mediaSet) }); err != nil {
|
|
|
|
// the progress reader is closed inside the worker in the non-error case.
|
|
|
|
s.CloseWithError(err)
|
|
|
|
}
|
|
|
|
}()
|
2021-11-29 11:46:33 +00:00
|
|
|
|
|
|
|
return s, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// audioGetterState represents the state of an individual audio fetch.
|
|
|
|
type audioGetterState struct {
|
|
|
|
*audioGetter
|
2021-12-17 16:30:53 +00:00
|
|
|
*getPeaksProgressReader
|
2021-11-29 11:46:33 +00:00
|
|
|
}
|
|
|
|
|
2022-01-05 18:49:21 +00:00
|
|
|
func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaSet store.MediaSet) error {
|
2021-12-17 16:30:53 +00:00
|
|
|
streamWithProgress := newLogProgressReader(r, "audio", mediaSet.AudioContentLength, s.logger)
|
2021-11-29 11:46:33 +00:00
|
|
|
|
|
|
|
var stdErr bytes.Buffer
|
2022-01-03 17:44:19 +00:00
|
|
|
cmd := s.commandFunc(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-")
|
2021-11-29 11:46:33 +00:00
|
|
|
cmd.Stderr = &stdErr
|
2022-01-05 18:49:21 +00:00
|
|
|
|
|
|
|
// ffmpegWriter accepts encoded audio and pipes it to FFmpeg.
|
|
|
|
ffmpegWriter, err := cmd.StdinPipe()
|
2021-11-29 11:46:33 +00:00
|
|
|
if err != nil {
|
2022-01-05 18:49:21 +00:00
|
|
|
return fmt.Errorf("error getting stdin: %v", err)
|
2021-11-29 11:46:33 +00:00
|
|
|
}
|
2022-01-05 18:49:21 +00:00
|
|
|
|
|
|
|
uploadReader, uploadWriter := io.Pipe()
|
|
|
|
mw := io.MultiWriter(uploadWriter, ffmpegWriter)
|
|
|
|
|
|
|
|
// ffmpegReader delivers raw audio output from FFmpeg, and also writes it
|
|
|
|
// back to the progress reader.
|
|
|
|
var ffmpegReader io.Reader
|
|
|
|
if stdoutPipe, err := cmd.StdoutPipe(); err == nil {
|
|
|
|
ffmpegReader = io.TeeReader(stdoutPipe, s)
|
|
|
|
} else {
|
|
|
|
return fmt.Errorf("error getting stdout: %v", err)
|
2021-11-29 11:46:33 +00:00
|
|
|
}
|
|
|
|
|
2021-11-29 14:55:11 +00:00
|
|
|
var presignedAudioURL string
|
2022-01-05 18:49:21 +00:00
|
|
|
g, ctx := errgroup.WithContext(ctx)
|
2021-11-29 13:59:05 +00:00
|
|
|
|
|
|
|
// Upload the encoded audio.
|
2022-01-05 18:49:21 +00:00
|
|
|
g.Go(func() error {
|
2021-12-07 19:58:11 +00:00
|
|
|
// TODO: use mediaSet func to fetch key
|
|
|
|
key := fmt.Sprintf("media_sets/%s/audio.opus", mediaSet.ID)
|
2021-11-29 13:59:05 +00:00
|
|
|
|
2022-01-05 18:49:21 +00:00
|
|
|
_, encErr := s.fileStore.PutObject(ctx, key, uploadReader, "audio/opus")
|
2021-11-29 13:59:05 +00:00
|
|
|
if encErr != nil {
|
2022-01-05 18:49:21 +00:00
|
|
|
return fmt.Errorf("error uploading encoded audio: %v", encErr)
|
2021-11-29 13:59:05 +00:00
|
|
|
}
|
|
|
|
|
2022-01-05 18:49:21 +00:00
|
|
|
presignedAudioURL, encErr = s.fileStore.GetURL(ctx, key)
|
|
|
|
if encErr != nil {
|
|
|
|
return fmt.Errorf("error generating presigned URL: %v", encErr)
|
2021-11-29 14:55:11 +00:00
|
|
|
}
|
|
|
|
|
2022-01-05 18:49:21 +00:00
|
|
|
if _, encErr = s.store.SetEncodedAudioUploaded(ctx, store.SetEncodedAudioUploadedParams{
|
2021-12-13 03:28:14 +00:00
|
|
|
ID: mediaSet.ID,
|
|
|
|
AudioEncodedS3Key: sqlString(key),
|
2022-01-05 18:49:21 +00:00
|
|
|
}); encErr != nil {
|
|
|
|
return fmt.Errorf("error setting encoded audio uploaded: %v", encErr)
|
2021-11-29 13:59:05 +00:00
|
|
|
}
|
|
|
|
|
2022-01-05 18:49:21 +00:00
|
|
|
return nil
|
|
|
|
})
|
2021-11-29 13:59:05 +00:00
|
|
|
|
2022-01-05 18:49:21 +00:00
|
|
|
// Upload the raw audio.
|
|
|
|
g.Go(func() error {
|
2021-12-07 19:58:11 +00:00
|
|
|
// TODO: use mediaSet func to fetch key
|
|
|
|
key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID)
|
2021-11-29 13:59:05 +00:00
|
|
|
|
2022-01-05 18:49:21 +00:00
|
|
|
bytesUploaded, rawErr := s.fileStore.PutObject(ctx, key, ffmpegReader, rawAudioMimeType)
|
2021-11-29 13:59:05 +00:00
|
|
|
if rawErr != nil {
|
2022-01-05 18:49:21 +00:00
|
|
|
return fmt.Errorf("error uploading raw audio: %v", rawErr)
|
2021-11-29 13:59:05 +00:00
|
|
|
}
|
|
|
|
|
2022-01-05 18:49:21 +00:00
|
|
|
if _, rawErr = s.store.SetRawAudioUploaded(ctx, store.SetRawAudioUploadedParams{
|
2021-12-13 03:28:14 +00:00
|
|
|
ID: mediaSet.ID,
|
|
|
|
AudioRawS3Key: sqlString(key),
|
|
|
|
AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(mediaSet.AudioChannels)),
|
2022-01-05 18:49:21 +00:00
|
|
|
}); rawErr != nil {
|
|
|
|
return fmt.Errorf("error setting raw audio uploaded: %v", rawErr)
|
2021-11-29 13:59:05 +00:00
|
|
|
}
|
2021-11-29 11:46:33 +00:00
|
|
|
|
2022-01-05 18:49:21 +00:00
|
|
|
return nil
|
|
|
|
})
|
|
|
|
|
|
|
|
g.Go(func() error {
|
|
|
|
if _, err := io.Copy(mw, streamWithProgress); err != nil {
|
|
|
|
return fmt.Errorf("error copying: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// ignoring the following Close errors should be ok, as the Copy has
|
|
|
|
// already completed successfully.
|
|
|
|
|
|
|
|
if err := ffmpegWriter.Close(); err != nil {
|
|
|
|
s.logger.With("err", err).Warn("getAudio: unable to close ffmpegWriter")
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := uploadWriter.Close(); err != nil {
|
|
|
|
s.logger.With("err", err).Warn("getAudio: unable to close pipeWriter")
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := r.Close(); err != nil {
|
|
|
|
s.logger.With("err", err).Warn("getAudio: unable to close stream")
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
|
|
return fmt.Errorf("error starting command: %v, output: %s", err, stdErr.String())
|
2021-11-29 11:46:33 +00:00
|
|
|
}
|
|
|
|
|
2022-01-05 18:49:21 +00:00
|
|
|
if err := g.Wait(); err != nil {
|
|
|
|
return fmt.Errorf("error uploading: %v", err)
|
|
|
|
}
|
2021-11-29 11:46:33 +00:00
|
|
|
|
2022-01-05 18:49:21 +00:00
|
|
|
if err := cmd.Wait(); err != nil {
|
|
|
|
return fmt.Errorf("error waiting for command: %v, output: %s", err, stdErr.String())
|
|
|
|
}
|
2021-11-29 13:59:05 +00:00
|
|
|
|
|
|
|
// Finally, close the progress reader so that the subsequent call to Next()
|
2021-11-29 14:55:11 +00:00
|
|
|
// returns the presigned URL and io.EOF.
|
|
|
|
s.Close(presignedAudioURL)
|
2022-01-05 18:49:21 +00:00
|
|
|
|
|
|
|
return nil
|
2021-11-29 11:46:33 +00:00
|
|
|
}
|
|
|
|
|
2021-12-17 16:30:53 +00:00
|
|
|
// getPeaksProgressReader accepts a byte stream containing little endian
|
2021-11-29 11:46:33 +00:00
|
|
|
// signed int16s and, given a target number of bins, emits a stream of peaks
|
|
|
|
// corresponding to each channel of the audio data.
|
2021-12-17 16:30:53 +00:00
|
|
|
type getPeaksProgressReader struct {
|
2021-11-29 11:46:33 +00:00
|
|
|
framesExpected int64
|
|
|
|
channels int
|
|
|
|
framesPerBin int
|
|
|
|
|
|
|
|
samples []int16
|
|
|
|
currPeaks []int16
|
|
|
|
currCount int
|
|
|
|
framesProcessed int64
|
2021-11-29 14:55:11 +00:00
|
|
|
url string
|
2021-12-17 16:30:53 +00:00
|
|
|
progress chan GetPeaksProgress
|
2021-11-29 11:46:33 +00:00
|
|
|
errorChan chan error
|
|
|
|
}
|
|
|
|
|
2021-12-17 16:30:53 +00:00
|
|
|
func newGetPeaksProgressReader(framesExpected int64, channels, numBins int) (*getPeaksProgressReader, error) {
|
2021-11-29 11:46:33 +00:00
|
|
|
if framesExpected <= 0 || channels <= 0 || numBins <= 0 {
|
|
|
|
return nil, fmt.Errorf("error creating audio progress reader (framesExpected = %d, channels = %d, numBins = %d)", framesExpected, channels, numBins)
|
|
|
|
}
|
|
|
|
|
2021-12-17 16:30:53 +00:00
|
|
|
return &getPeaksProgressReader{
|
2021-11-29 11:46:33 +00:00
|
|
|
channels: channels,
|
|
|
|
framesExpected: framesExpected,
|
|
|
|
framesPerBin: int(math.Ceil(float64(framesExpected) / float64(numBins))),
|
|
|
|
samples: make([]int16, 8_192),
|
|
|
|
currPeaks: make([]int16, channels),
|
2021-12-17 16:30:53 +00:00
|
|
|
progress: make(chan GetPeaksProgress),
|
2021-11-29 11:46:33 +00:00
|
|
|
errorChan: make(chan error, 1),
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2021-12-17 16:30:53 +00:00
|
|
|
func (w *getPeaksProgressReader) CloseWithError(err error) {
|
2021-11-29 11:46:33 +00:00
|
|
|
w.errorChan <- err
|
|
|
|
}
|
|
|
|
|
2021-11-29 14:55:11 +00:00
|
|
|
// Close cloes the reader and returns the provided URL to the calling code.
|
2021-12-17 16:30:53 +00:00
|
|
|
func (w *getPeaksProgressReader) Close(url string) error {
|
2021-11-29 14:55:11 +00:00
|
|
|
w.url = url
|
2021-11-29 11:46:33 +00:00
|
|
|
close(w.progress)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-12-17 16:30:53 +00:00
|
|
|
func (w *getPeaksProgressReader) Next() (GetPeaksProgress, error) {
|
2021-11-29 11:46:33 +00:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case progress, ok := <-w.progress:
|
|
|
|
if !ok {
|
2022-01-25 19:06:19 +00:00
|
|
|
return GetPeaksProgress{
|
|
|
|
Peaks: w.currPeaks,
|
|
|
|
PercentComplete: w.percentComplete(),
|
|
|
|
URL: w.url,
|
|
|
|
AudioFrames: w.framesProcessed,
|
|
|
|
}, io.EOF
|
2021-11-29 11:46:33 +00:00
|
|
|
}
|
|
|
|
return progress, nil
|
|
|
|
case err := <-w.errorChan:
|
2021-12-17 16:30:53 +00:00
|
|
|
return GetPeaksProgress{}, fmt.Errorf("error waiting for progress: %v", err)
|
2021-11-29 11:46:33 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-17 16:30:53 +00:00
|
|
|
func (w *getPeaksProgressReader) Write(p []byte) (int, error) {
|
2021-11-29 11:46:33 +00:00
|
|
|
// expand our target slice if it is of insufficient size:
|
|
|
|
numSamples := len(p) / SizeOfInt16
|
|
|
|
if len(w.samples) < numSamples {
|
|
|
|
w.samples = append(w.samples, make([]int16, numSamples-len(w.samples))...)
|
|
|
|
}
|
|
|
|
|
|
|
|
samples := w.samples[:numSamples]
|
|
|
|
|
|
|
|
if err := binary.Read(bytes.NewReader(p), binary.LittleEndian, samples); err != nil {
|
|
|
|
return 0, fmt.Errorf("error parsing samples: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
for i := 0; i < len(samples); i += w.channels {
|
|
|
|
for j := 0; j < w.channels; j++ {
|
|
|
|
samp := samples[i+j]
|
|
|
|
if samp < 0 {
|
|
|
|
samp = -samp
|
|
|
|
}
|
|
|
|
if samp > w.currPeaks[j] {
|
|
|
|
w.currPeaks[j] = samp
|
|
|
|
}
|
|
|
|
}
|
|
|
|
w.currCount++
|
|
|
|
if w.currCount == w.framesPerBin {
|
|
|
|
w.nextBin()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
w.framesProcessed += int64(len(samples) / w.channels)
|
|
|
|
|
|
|
|
return len(p), nil
|
|
|
|
}
|
|
|
|
|
2021-12-17 16:30:53 +00:00
|
|
|
func (w *getPeaksProgressReader) percentComplete() float32 {
|
2021-11-29 11:46:33 +00:00
|
|
|
return (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0
|
|
|
|
}
|
|
|
|
|
2021-12-17 16:30:53 +00:00
|
|
|
func (w *getPeaksProgressReader) nextBin() {
|
|
|
|
var progress GetPeaksProgress
|
2021-11-29 11:46:33 +00:00
|
|
|
progress.Peaks = append(progress.Peaks, w.currPeaks...)
|
|
|
|
progress.PercentComplete = w.percentComplete()
|
|
|
|
|
|
|
|
w.progress <- progress
|
|
|
|
|
|
|
|
w.currCount = 0
|
|
|
|
for i := 0; i < len(w.currPeaks); i++ {
|
|
|
|
w.currPeaks[i] = 0
|
|
|
|
}
|
|
|
|
}
|