clipper/backend/media/get_audio.go

301 lines
8.6 KiB
Go

package media
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"math"
"os/exec"
"strconv"
"sync"
"git.netflux.io/rob/clipper/config"
"git.netflux.io/rob/clipper/generated/store"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"go.uber.org/zap"
)
type GetAudioProgress struct {
PercentComplete float32
Peaks []int16
URL string
}
type GetAudioProgressReader interface {
Next() (GetAudioProgress, error)
Close(string) error
}
// audioGetter manages getting and processing audio from Youtube.
type audioGetter struct {
store Store
youtube YoutubeClient
s3API S3API
config config.Config
logger *zap.SugaredLogger
}
// newAudioGetter returns a new audioGetter.
func newAudioGetter(store Store, youtube YoutubeClient, s3API S3API, config config.Config, logger *zap.SugaredLogger) *audioGetter {
return &audioGetter{
store: store,
youtube: youtube,
s3API: s3API,
config: config,
logger: logger,
}
}
// GetAudio gets the audio, processes it and uploads it to S3. It returns a
// GetAudioProgressReader that can be used to poll progress reports and audio
// peaks.
//
// TODO: accept domain object instead
func (g *audioGetter) GetAudio(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) {
video, err := g.youtube.GetVideoContext(ctx, mediaSet.YoutubeID)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag))
if format == nil {
return nil, fmt.Errorf("error finding itag: %v", err)
}
stream, _, err := g.youtube.GetStreamContext(ctx, video, format)
if err != nil {
return nil, fmt.Errorf("error fetching stream: %v", err)
}
audioProgressReader, err := newGetAudioProgressReader(mediaSet.AudioFramesApprox, int(mediaSet.AudioChannels), numBins)
if err != nil {
return nil, fmt.Errorf("error building progress reader: %v", err)
}
s := &audioGetterState{
audioGetter: g,
getAudioProgressReader: audioProgressReader,
}
go s.getAudio(ctx, stream, mediaSet)
return s, nil
}
// audioGetterState represents the state of an individual audio fetch.
type audioGetterState struct {
*audioGetter
*getAudioProgressReader
}
func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaSet store.MediaSet) {
streamWithProgress := newProgressReader(r, "audio", mediaSet.AudioContentLength, s.logger)
pr, pw := io.Pipe()
teeReader := io.TeeReader(streamWithProgress, pw)
var stdErr bytes.Buffer
cmd := exec.CommandContext(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-")
cmd.Stdin = teeReader
cmd.Stderr = &stdErr
stdout, err := cmd.StdoutPipe()
if err != nil {
s.CloseWithError(fmt.Errorf("error getting stdout: %v", err))
return
}
if err = cmd.Start(); err != nil {
s.CloseWithError(fmt.Errorf("error starting command: %v, output: %s", err, stdErr.String()))
return
}
var presignedAudioURL string
var wg sync.WaitGroup
wg.Add(2)
// Upload the encoded audio.
go func() {
defer wg.Done()
// TODO: use mediaSet func to fetch s3Key
s3Key := fmt.Sprintf("media_sets/%s/audio.opus", mediaSet.ID)
uploader := newMultipartUploader(s.s3API, s.logger)
_, encErr := uploader.Upload(ctx, pr, s.config.S3Bucket, s3Key, "audio/opus")
if encErr != nil {
s.CloseWithError(fmt.Errorf("error uploading encoded audio: %v", encErr))
return
}
input := s3.GetObjectInput{
Bucket: aws.String(s.config.S3Bucket),
Key: aws.String(s3Key),
}
request, err := s.s3API.PresignGetObject(ctx, &input, s3.WithPresignExpires(getAudioExpiresIn))
if err != nil {
s.CloseWithError(fmt.Errorf("error generating presigned URL: %v", err))
}
presignedAudioURL = request.URL
if _, err = s.store.SetEncodedAudioUploaded(ctx, store.SetEncodedAudioUploadedParams{
ID: mediaSet.ID,
AudioEncodedS3Bucket: sqlString(s.config.S3Bucket),
AudioEncodedS3Key: sqlString(s3Key),
}); err != nil {
s.CloseWithError(fmt.Errorf("error setting encoded audio uploaded: %v", err))
}
}()
// Upload the raw audio.
go func() {
defer wg.Done()
// TODO: use mediaSet func to fetch s3Key
s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID)
teeReader := io.TeeReader(stdout, s)
uploader := newMultipartUploader(s.s3API, s.logger)
bytesUploaded, rawErr := uploader.Upload(ctx, teeReader, s.config.S3Bucket, s3Key, rawAudioMimeType)
if rawErr != nil {
s.CloseWithError(fmt.Errorf("error uploading raw audio: %v", rawErr))
return
}
if _, err = s.store.SetRawAudioUploaded(ctx, store.SetRawAudioUploadedParams{
ID: mediaSet.ID,
AudioRawS3Bucket: sqlString(s.config.S3Bucket),
AudioRawS3Key: sqlString(s3Key),
AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(mediaSet.AudioChannels)),
}); err != nil {
s.CloseWithError(fmt.Errorf("error setting raw audio uploaded: %v", err))
}
}()
if err = cmd.Wait(); err != nil {
// TODO: cancel other goroutines (e.g. video fetch) if an error occurs here.
s.CloseWithError(fmt.Errorf("error waiting for command: %v, output: %s", err, stdErr.String()))
return
}
// Close the pipe sending encoded audio to be uploaded, this ensures the
// uploader reading from the pipe will receive io.EOF and complete
// successfully.
pw.Close()
// Wait for the uploaders to complete.
wg.Wait()
// Finally, close the progress reader so that the subsequent call to Next()
// returns the presigned URL and io.EOF.
s.Close(presignedAudioURL)
}
// getAudioProgressReader accepts a byte stream containing little endian
// signed int16s and, given a target number of bins, emits a stream of peaks
// corresponding to each channel of the audio data.
type getAudioProgressReader struct {
framesExpected int64
channels int
framesPerBin int
samples []int16
currPeaks []int16
currCount int
framesProcessed int64
url string
progress chan GetAudioProgress
errorChan chan error
}
func newGetAudioProgressReader(framesExpected int64, channels, numBins int) (*getAudioProgressReader, error) {
if framesExpected <= 0 || channels <= 0 || numBins <= 0 {
return nil, fmt.Errorf("error creating audio progress reader (framesExpected = %d, channels = %d, numBins = %d)", framesExpected, channels, numBins)
}
return &getAudioProgressReader{
channels: channels,
framesExpected: framesExpected,
framesPerBin: int(math.Ceil(float64(framesExpected) / float64(numBins))),
samples: make([]int16, 8_192),
currPeaks: make([]int16, channels),
progress: make(chan GetAudioProgress),
errorChan: make(chan error, 1),
}, nil
}
func (w *getAudioProgressReader) CloseWithError(err error) {
w.errorChan <- err
}
// Close cloes the reader and returns the provided URL to the calling code.
func (w *getAudioProgressReader) Close(url string) error {
w.url = url
close(w.progress)
return nil
}
func (w *getAudioProgressReader) Next() (GetAudioProgress, error) {
for {
select {
case progress, ok := <-w.progress:
if !ok {
return GetAudioProgress{Peaks: w.currPeaks, PercentComplete: w.percentComplete(), URL: w.url}, io.EOF
}
return progress, nil
case err := <-w.errorChan:
return GetAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err)
}
}
}
func (w *getAudioProgressReader) Write(p []byte) (int, error) {
// expand our target slice if it is of insufficient size:
numSamples := len(p) / SizeOfInt16
if len(w.samples) < numSamples {
w.samples = append(w.samples, make([]int16, numSamples-len(w.samples))...)
}
samples := w.samples[:numSamples]
if err := binary.Read(bytes.NewReader(p), binary.LittleEndian, samples); err != nil {
return 0, fmt.Errorf("error parsing samples: %v", err)
}
for i := 0; i < len(samples); i += w.channels {
for j := 0; j < w.channels; j++ {
samp := samples[i+j]
if samp < 0 {
samp = -samp
}
if samp > w.currPeaks[j] {
w.currPeaks[j] = samp
}
}
w.currCount++
if w.currCount == w.framesPerBin {
w.nextBin()
}
}
w.framesProcessed += int64(len(samples) / w.channels)
return len(p), nil
}
func (w *getAudioProgressReader) percentComplete() float32 {
return (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0
}
func (w *getAudioProgressReader) nextBin() {
var progress GetAudioProgress
progress.Peaks = append(progress.Peaks, w.currPeaks...)
progress.PercentComplete = w.percentComplete()
w.progress <- progress
w.currCount = 0
for i := 0; i < len(w.currPeaks); i++ {
w.currPeaks[i] = 0
}
}