Refactor audio fetching logic

This commit is contained in:
Rob Watson 2021-11-29 12:46:33 +01:00
parent e1a15a5e69
commit c3da27ca49
10 changed files with 294 additions and 326 deletions

View File

@ -1,125 +0,0 @@
package media
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"math"
)
type GetAudioProgress struct {
PercentComplete float32
Peaks []int16
}
type GetAudioProgressReader interface {
Read() (GetAudioProgress, error)
Close() error
}
// getAudioProgressReader accepts a byte stream containing little endian
// signed int16s and, given a target number of bins, emits a stream of peaks
// corresponding to each channel of the audio data.
type getAudioProgressReader struct {
framesExpected int64
channels int
framesPerBin int
samples []int16
currPeaks []int16
currCount int
framesProcessed int64
progress chan GetAudioProgress
errorChan chan error
}
func newGetAudioProgressReader(framesExpected int64, channels, numBins int) (*getAudioProgressReader, error) {
if framesExpected <= 0 || channels <= 0 || numBins <= 0 {
return nil, fmt.Errorf("error creating audio progress reader (framesExpected = %d, channels = %d, numBins = %d)", framesExpected, channels, numBins)
}
return &getAudioProgressReader{
channels: channels,
framesExpected: framesExpected,
framesPerBin: int(math.Ceil(float64(framesExpected) / float64(numBins))),
samples: make([]int16, 8_192),
currPeaks: make([]int16, channels),
progress: make(chan GetAudioProgress),
errorChan: make(chan error, 1),
}, nil
}
func (w *getAudioProgressReader) Abort(err error) {
w.errorChan <- err
}
func (w *getAudioProgressReader) Close() error {
close(w.progress)
return nil
}
func (w *getAudioProgressReader) Read() (GetAudioProgress, error) {
for {
select {
case progress, ok := <-w.progress:
if !ok {
return GetAudioProgress{Peaks: w.currPeaks, PercentComplete: w.percentComplete()}, io.EOF
}
return progress, nil
case err := <-w.errorChan:
return GetAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err)
}
}
}
func (w *getAudioProgressReader) Write(p []byte) (int, error) {
// expand our target slice if it is of insufficient size:
numSamples := len(p) / SizeOfInt16
if len(w.samples) < numSamples {
w.samples = append(w.samples, make([]int16, numSamples-len(w.samples))...)
}
samples := w.samples[:numSamples]
if err := binary.Read(bytes.NewReader(p), binary.LittleEndian, samples); err != nil {
return 0, fmt.Errorf("error parsing samples: %v", err)
}
for i := 0; i < len(samples); i += w.channels {
for j := 0; j < w.channels; j++ {
samp := samples[i+j]
if samp < 0 {
samp = -samp
}
if samp > w.currPeaks[j] {
w.currPeaks[j] = samp
}
}
w.currCount++
if w.currCount == w.framesPerBin {
w.nextBin()
}
}
w.framesProcessed += int64(len(samples) / w.channels)
return len(p), nil
}
func (w *getAudioProgressReader) percentComplete() float32 {
return (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0
}
func (w *getAudioProgressReader) nextBin() {
var progress GetAudioProgress
progress.Peaks = append(progress.Peaks, w.currPeaks...)
progress.PercentComplete = w.percentComplete()
w.progress <- progress
w.currCount = 0
for i := 0; i < len(w.currPeaks); i++ {
w.currPeaks[i] = 0
}
}

View File

@ -1,56 +0,0 @@
package media
import (
"bytes"
"context"
"fmt"
"io"
"os/exec"
)
type ffmpegReader struct {
io.ReadCloser
cmd *exec.Cmd
stdErrBuf *bytes.Buffer
}
func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmpegReader, error) {
var stdErr bytes.Buffer
cmd := exec.CommandContext(ctx, "ffmpeg", arg...)
cmd.Stdin = input
cmd.Stderr = &stdErr
r, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("error creating pipe: %v", err)
}
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("error starting ffmpeg: %v, output: %s", err, stdErr.String())
}
return &ffmpegReader{ReadCloser: r, cmd: cmd, stdErrBuf: &stdErr}, nil
}
func (r *ffmpegReader) Cancel() error {
if err := r.cmd.Process.Kill(); err != nil {
return fmt.Errorf("error killing ffmpeg process: %v", err)
}
return nil
}
func (r *ffmpegReader) Close() error {
state, err := r.cmd.Process.Wait()
if err != nil {
return fmt.Errorf("error returned from ffmpeg process: %v", err)
}
if state.ExitCode() != 0 {
return fmt.Errorf("non-zero status %d returned from ffmpeg process, output: %s", state.ExitCode(), r.stdErrBuf.String())
}
return nil
}

240
backend/media/get_audio.go Normal file
View File

@ -0,0 +1,240 @@
package media
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"math"
"os/exec"
"strconv"
"git.netflux.io/rob/clipper/config"
"git.netflux.io/rob/clipper/generated/store"
"go.uber.org/zap"
)
type GetAudioProgress struct {
PercentComplete float32
Peaks []int16
}
type GetAudioProgressReader interface {
Next() (GetAudioProgress, error)
Close() error
}
// audioGetter manages getting and processing audio from Youtube.
type audioGetter struct {
store Store
youtube YoutubeClient
s3API S3API
config config.Config
logger *zap.SugaredLogger
}
// newAudioGetter returns a new audioGetter.
func newAudioGetter(store Store, youtube YoutubeClient, s3API S3API, config config.Config, logger *zap.SugaredLogger) *audioGetter {
return &audioGetter{
store: store,
youtube: youtube,
s3API: s3API,
config: config,
logger: logger,
}
}
// GetAudio gets the audio, processes it and uploads it to S3. It returns a
// GetAudioProgressReader that can be used to poll progress reports and audio
// peaks.
//
// TODO: accept domain object instead
func (g *audioGetter) GetAudio(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) {
video, err := g.youtube.GetVideoContext(ctx, mediaSet.YoutubeID)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag))
if format == nil {
return nil, fmt.Errorf("error finding itag: %v", err)
}
stream, _, err := g.youtube.GetStreamContext(ctx, video, format)
if err != nil {
return nil, fmt.Errorf("error fetching stream: %v", err)
}
audioProgressReader, err := newGetAudioProgressReader(mediaSet.AudioFramesApprox, int(mediaSet.AudioChannels), numBins)
if err != nil {
return nil, fmt.Errorf("error building progress reader: %v", err)
}
s := &audioGetterState{
audioGetter: g,
getAudioProgressReader: audioProgressReader,
}
go s.getAudio(ctx, stream, mediaSet)
return s, nil
}
// audioGetterState represents the state of an individual audio fetch.
type audioGetterState struct {
*audioGetter
*getAudioProgressReader
}
func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaSet store.MediaSet) {
defer s.Close()
defer r.Close()
streamWithProgress := newProgressReader(r, "audio", mediaSet.AudioContentLength, s.logger)
var stdErr bytes.Buffer
cmd := exec.CommandContext(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-")
cmd.Stdin = streamWithProgress
cmd.Stderr = &stdErr
stdout, err := cmd.StdoutPipe()
if err != nil {
s.CloseWithError(fmt.Errorf("error getting stdout: %v", err))
return
}
if err = cmd.Start(); err != nil {
s.CloseWithError(fmt.Errorf("error starting command: %v", err))
return
}
// TODO: use mediaSet func to fetch s3Key
s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID)
teeReader := io.TeeReader(stdout, s)
uploader := newMultipartUploader(s.s3API, s.logger)
bytesUploaded, err := uploader.Upload(ctx, teeReader, s.config.S3Bucket, s3Key, rawAudioMimeType)
if err != nil {
s.CloseWithError(fmt.Errorf("error uploading audio: %v", err))
return
}
if _, err = s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{
ID: mediaSet.ID,
AudioS3Bucket: sqlString(s.config.S3Bucket),
AudioS3Key: sqlString(s3Key),
AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(mediaSet.AudioChannels)),
}); err != nil {
s.CloseWithError(fmt.Errorf("error setting audio uploaded: %v", err))
return
}
if err = cmd.Wait(); err != nil {
s.CloseWithError(fmt.Errorf("error waiting for command: %v", err))
return
}
}
// getAudioProgressReader accepts a byte stream containing little endian
// signed int16s and, given a target number of bins, emits a stream of peaks
// corresponding to each channel of the audio data.
type getAudioProgressReader struct {
framesExpected int64
channels int
framesPerBin int
samples []int16
currPeaks []int16
currCount int
framesProcessed int64
progress chan GetAudioProgress
errorChan chan error
}
func newGetAudioProgressReader(framesExpected int64, channels, numBins int) (*getAudioProgressReader, error) {
if framesExpected <= 0 || channels <= 0 || numBins <= 0 {
return nil, fmt.Errorf("error creating audio progress reader (framesExpected = %d, channels = %d, numBins = %d)", framesExpected, channels, numBins)
}
return &getAudioProgressReader{
channels: channels,
framesExpected: framesExpected,
framesPerBin: int(math.Ceil(float64(framesExpected) / float64(numBins))),
samples: make([]int16, 8_192),
currPeaks: make([]int16, channels),
progress: make(chan GetAudioProgress),
errorChan: make(chan error, 1),
}, nil
}
func (w *getAudioProgressReader) CloseWithError(err error) {
w.errorChan <- err
}
func (w *getAudioProgressReader) Close() error {
close(w.progress)
return nil
}
func (w *getAudioProgressReader) Next() (GetAudioProgress, error) {
for {
select {
case progress, ok := <-w.progress:
if !ok {
return GetAudioProgress{Peaks: w.currPeaks, PercentComplete: w.percentComplete()}, io.EOF
}
return progress, nil
case err := <-w.errorChan:
return GetAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err)
}
}
}
func (w *getAudioProgressReader) Write(p []byte) (int, error) {
// expand our target slice if it is of insufficient size:
numSamples := len(p) / SizeOfInt16
if len(w.samples) < numSamples {
w.samples = append(w.samples, make([]int16, numSamples-len(w.samples))...)
}
samples := w.samples[:numSamples]
if err := binary.Read(bytes.NewReader(p), binary.LittleEndian, samples); err != nil {
return 0, fmt.Errorf("error parsing samples: %v", err)
}
for i := 0; i < len(samples); i += w.channels {
for j := 0; j < w.channels; j++ {
samp := samples[i+j]
if samp < 0 {
samp = -samp
}
if samp > w.currPeaks[j] {
w.currPeaks[j] = samp
}
}
w.currCount++
if w.currCount == w.framesPerBin {
w.nextBin()
}
}
w.framesProcessed += int64(len(samples) / w.channels)
return len(p), nil
}
func (w *getAudioProgressReader) percentComplete() float32 {
return (float32(w.framesProcessed) / float32(w.framesExpected)) * 100.0
}
func (w *getAudioProgressReader) nextBin() {
var progress GetAudioProgress
progress.Peaks = append(progress.Peaks, w.currPeaks...)
progress.PercentComplete = w.percentComplete()
w.progress <- progress
w.currCount = 0
for i := 0; i < len(w.currPeaks); i++ {
w.currPeaks[i] = 0
}
}

View File

@ -17,6 +17,12 @@ type GetVideoProgress struct {
URL string
}
type GetVideoProgressReader interface {
// Next returns the next video progress status. When the stream has finished,
// a valid GetVideoProgress value will be returned with io.EOF.
Next() (GetVideoProgress, error)
}
type videoGetter struct {
s3 S3API
store Store
@ -62,7 +68,8 @@ func (g *videoGetter) GetVideo(ctx context.Context, r io.Reader, exp int64, medi
return s, nil
}
// Write implements io.Writer.
// Write implements io.Writer. It is copied that same data that is written to
// S3, to implement progress tracking.
func (s *videoGetterState) Write(p []byte) (int, error) {
s.count += int64(len(p))
pc := (float32(s.count) / float32(s.exp)) * 100

View File

@ -10,25 +10,25 @@ import (
const SizeOfInt16 = 2
type Audio struct {
Bytes int64 `json:"bytes"`
Channels int `json:"channels"`
ContentLength int64
Channels int
// ApproxFrames is used during initial processing when a precise frame count
// cannot be determined. Prefer Frames in all other cases.
ApproxFrames int64 `json:"approx_frames"`
Frames int64 `json:"frames"`
SampleRate int `json:"sample_rate"`
YoutubeItag int `json:"youtube_itag"`
MimeType string `json:"mime_type"`
ApproxFrames int64
Frames int64
SampleRate int
YoutubeItag int
MimeType string
}
type Video struct {
Bytes int64 `json:"bytes"`
Duration time.Duration `json:"duration"`
ContentLength int64
Duration time.Duration
// not sure if this are needed any more?
ThumbnailWidth int `json:"thumbnail_width"`
ThumbnailHeight int `json:"thumbnail_height"`
YoutubeItag int `json:"youtube_itag"`
MimeType string `json:"mime_type"`
ThumbnailWidth int
ThumbnailHeight int
YoutubeItag int
MimeType string
}
// MediaSet represents the media and metadata associated with a single media

View File

@ -169,7 +169,9 @@ func (s *MediaSetService) createMediaSet(ctx context.Context, youtubeID string)
AudioFramesApprox: audioMetadata.ApproxFrames,
AudioSampleRate: int32(audioMetadata.SampleRate),
AudioMimeTypeEncoded: audioMetadata.MimeType,
AudioContentLength: audioMetadata.ContentLength,
VideoYoutubeItag: int32(videoMetadata.YoutubeItag),
VideoContentLength: videoMetadata.ContentLength,
VideoMimeType: videoMetadata.MimeType,
VideoDurationNanos: videoMetadata.Duration.Nanoseconds(),
}
@ -200,17 +202,17 @@ func (s *MediaSetService) findMediaSet(ctx context.Context, youtubeID string) (*
ID: mediaSet.ID,
YoutubeID: mediaSet.YoutubeID,
Audio: Audio{
YoutubeItag: int(mediaSet.AudioYoutubeItag),
Bytes: 0, // DEPRECATED
Channels: int(mediaSet.AudioChannels),
ApproxFrames: int64(mediaSet.AudioFramesApprox),
Frames: mediaSet.AudioFrames.Int64,
SampleRate: int(mediaSet.AudioSampleRate),
MimeType: mediaSet.AudioMimeTypeEncoded,
YoutubeItag: int(mediaSet.AudioYoutubeItag),
ContentLength: mediaSet.AudioContentLength,
Channels: int(mediaSet.AudioChannels),
ApproxFrames: int64(mediaSet.AudioFramesApprox),
Frames: mediaSet.AudioFrames.Int64,
SampleRate: int(mediaSet.AudioSampleRate),
MimeType: mediaSet.AudioMimeTypeEncoded,
},
Video: Video{
YoutubeItag: int(mediaSet.VideoYoutubeItag),
Bytes: 0, // DEPRECATED?
ContentLength: mediaSet.VideoContentLength,
Duration: time.Duration(mediaSet.VideoDurationNanos),
MimeType: mediaSet.VideoMimeType,
ThumbnailWidth: 0, // ??
@ -234,7 +236,7 @@ func (s *MediaSetService) fetchVideoMetadata(ctx context.Context, video *youtube
return Video{
YoutubeItag: format.ItagNo,
MimeType: format.MimeType,
Bytes: format.ContentLength,
ContentLength: format.ContentLength,
ThumbnailWidth: thumbnailWidth,
ThumbnailHeight: thumbnailHeight,
Duration: time.Duration(durationMsecs) * time.Millisecond,
@ -261,11 +263,12 @@ func (s *MediaSetService) fetchAudioMetadata(ctx context.Context, video *youtube
approxFrames := int64(approxDuration/time.Second) * int64(sampleRate)
return Audio{
MimeType: format.MimeType,
YoutubeItag: format.ItagNo,
ApproxFrames: approxFrames,
Channels: format.AudioChannels,
SampleRate: sampleRate,
ContentLength: format.ContentLength,
MimeType: format.MimeType,
YoutubeItag: format.ItagNo,
ApproxFrames: approxFrames,
Channels: format.AudioChannels,
SampleRate: sampleRate,
}, nil
}
@ -316,12 +319,6 @@ func (s *MediaSetService) GetVideo(ctx context.Context, id uuid.UUID) (GetVideoP
)
}
type GetVideoProgressReader interface {
// Next returns the next video progress status. When the stream has finished,
// a valid GetVideoProgress value will be returned with io.EOF.
Next() (GetVideoProgress, error)
}
// GetAudio fetches the audio part of a MediaSet.
func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins int) (GetAudioProgressReader, error) {
mediaSet, err := s.store.GetMediaSet(ctx, id)
@ -336,6 +333,11 @@ func (s *MediaSetService) GetAudio(ctx context.Context, id uuid.UUID, numBins in
return s.getAudioFromYoutube(ctx, mediaSet, numBins)
}
func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) {
audioGetter := newAudioGetter(s.store, s.youtube, s.s3, s.config, s.logger)
return audioGetter.GetAudio(ctx, mediaSet, numBins)
}
func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) {
input := s3.GetObjectInput{
Bucket: aws.String(mediaSet.AudioS3Bucket.String),
@ -400,7 +402,7 @@ outer:
if err != nil {
s.logger.Errorf("getAudioFromS3State: error closing s3 reader: %v", err)
s.Abort(err)
s.CloseWithError(err)
return
}
@ -409,113 +411,6 @@ outer:
}
}
func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetAudioProgressReader, error) {
video, err := s.youtube.GetVideoContext(ctx, mediaSet.YoutubeID)
if err != nil {
return nil, fmt.Errorf("error fetching video: %v", err)
}
format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag))
if format == nil {
return nil, fmt.Errorf("error finding itag: %v", err)
}
stream, _, err := s.youtube.GetStreamContext(ctx, video, format)
if err != nil {
return nil, fmt.Errorf("error fetching stream: %v", err)
}
streamWithProgress := newProgressReader(stream, "audio", format.ContentLength, s.logger)
ffmpegReader, err := newFfmpegReader(ctx, streamWithProgress, "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-")
if err != nil {
return nil, fmt.Errorf("error creating ffmpegreader: %v", err)
}
// TODO: use mediaSet func to fetch s3Key
s3Key := fmt.Sprintf("media_sets/%s/audio.raw", mediaSet.ID)
uploader := newMultipartUploader(s.s3, s.logger)
getAudioProgressReader, err := newGetAudioProgressReader(
int64(mediaSet.AudioFramesApprox),
format.AudioChannels,
numBins,
)
if err != nil {
return nil, fmt.Errorf("error creating audio reader: %v", err)
}
state := getAudioFromYoutubeState{
getAudioProgressReader: getAudioProgressReader,
ffmpegReader: ffmpegReader,
uploader: uploader,
s3Bucket: s.config.S3Bucket,
s3Key: s3Key,
store: s.store,
channels: format.AudioChannels,
logger: s.logger,
}
go state.run(ctx, mediaSet.ID)
return &state, nil
}
type getAudioFromYoutubeState struct {
*getAudioProgressReader
ffmpegReader *ffmpegReader
uploader *multipartUploader
s3Bucket, s3Key string
store Store
channels int
logger *zap.SugaredLogger
}
func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID) {
teeReader := io.TeeReader(s.ffmpegReader, s)
bytesUploaded, err := s.uploader.Upload(ctx, teeReader, s.s3Bucket, s.s3Key, rawAudioMimeType)
// If there was an error returned, the underlying ffmpegReader process may
// still be active. Kill it.
if err != nil {
if cancelErr := s.ffmpegReader.Cancel(); cancelErr != nil {
s.logger.Errorf("getAudioFromYoutubeState: error cancelling ffmpegreader: %v", cancelErr)
}
}
// Either way, we need to wait for the ffmpegReader process to exit,
// and ensure there is no error.
if readerErr := s.ffmpegReader.Close(); readerErr != nil {
if err == nil {
err = readerErr
}
}
if err == nil {
_, updateErr := s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{
ID: mediaSetID,
AudioS3Bucket: sqlString(s.s3Bucket),
AudioS3Key: sqlString(s.s3Key),
AudioFrames: sqlInt64(bytesUploaded / SizeOfInt16 / int64(s.channels)),
})
if updateErr != nil {
err = updateErr
}
}
if err != nil {
s.logger.Errorf("getAudioFromYoutubeState: error uploading asynchronously: %v", err)
s.Abort(err)
return
}
if iterErr := s.Close(); iterErr != nil {
s.logger.Errorf("getAudioFromYoutubeState: error closing progress iterator: %v", iterErr)
}
}
func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, numBins int) ([]int16, error) {
mediaSet, err := s.store.GetMediaSet(ctx, id)
if err != nil {

View File

@ -117,7 +117,7 @@ func (c *mediaSetServiceController) GetAudio(request *pbmediaset.GetAudioRequest
}
for {
progress, err := reader.Read()
progress, err := reader.Next()
if err != nil && err != io.EOF {
return newResponseError(err)
}

View File

@ -0,0 +1,2 @@
ALTER TABLE media_sets DROP COLUMN video_content_length;
ALTER TABLE media_sets DROP COLUMN audio_content_length;

View File

@ -0,0 +1,5 @@
ALTER TABLE media_sets ADD COLUMN audio_content_length bigint NOT NULL;
ALTER TABLE media_sets ADD COLUMN video_content_length bigint NOT NULL;
ALTER TABLE media_sets ADD CONSTRAINT check_audio_content_length_gt_0 CHECK (audio_content_length > 0);
ALTER TABLE media_sets ADD CONSTRAINT check_video_content_length_gt_0 CHECK (video_content_length > 0);

View File

@ -5,8 +5,8 @@ SELECT * FROM media_sets WHERE id = $1;
SELECT * FROM media_sets WHERE youtube_id = $1;
-- name: CreateMediaSet :one
INSERT INTO media_sets (youtube_id, audio_youtube_itag, audio_channels, audio_frames_approx, audio_sample_rate, audio_mime_type_encoded, video_youtube_itag, video_mime_type, video_duration_nanos, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW(), NOW())
INSERT INTO media_sets (youtube_id, audio_youtube_itag, audio_channels, audio_frames_approx, audio_sample_rate, audio_content_length, audio_mime_type_encoded, video_youtube_itag, video_content_length, video_mime_type, video_duration_nanos, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW(), NOW())
RETURNING *;
-- name: SetAudioUploaded :one