improve naming, refactor

Rob Watson 2021-10-27 22:17:59 +02:00
parent d04debbe9a
commit 3ce3736770
6 changed files with 138 additions and 132 deletions

View File

@@ -30,14 +30,16 @@ func main() {
 	var youtubeClient youtube.Client

 	// Create a VideoFetchService
-	fetchService := media.NewVideoFetchService(&youtubeClient, s3Client)
-	peakReader, err := fetchService.FetchPeaks(ctx, videoID)
+	fetchService := media.NewFetchMediaSetService(&youtubeClient, s3Client)
+
+	// Create a progressReader
+	progressReader, err := fetchService.FetchAudio(ctx, videoID)
 	if err != nil {
 		log.Fatalf("error calling fetch service: %v", err)
 	}

 	for {
-		progress, err := peakReader.Next()
+		progress, err := progressReader.Read()
 		if err != nil {
 			if err != io.EOF {
 				log.Printf("error reading progress: %v", err)

View File

@@ -7,15 +7,20 @@ import (
 	"io"
 )

-type FetchPeaksProgress struct {
+type FetchAudioProgress struct {
 	percentComplete float32
 	Peaks           []int16
 }

-// fetchPeaksIterator accepts a byte stream containing little endian
+type FetchAudioProgressReader interface {
+	Read() (FetchAudioProgress, error)
+	Close() error
+}
+
+// fetchAudioProgressReader accepts a byte stream containing little endian
 // signed int16s and, given a target number of bins, emits a stream of peaks
 // corresponding to each channel of the audio data.
-type fetchPeaksIterator struct {
+type fetchAudioProgressReader struct {
 	channels     int
 	framesPerBin int
@@ -23,32 +28,32 @@ type fetchPeaksIterator struct {
 	currPeaks    []int16
 	currCount    int
 	total        int
-	progress     chan FetchPeaksProgress
+	progress     chan FetchAudioProgress
 	errorChan    chan error
 }

 // TODO: validate inputs, debugging is confusing otherwise
-func newFetchPeaksIterator(expFrames int64, channels, numBins int) *fetchPeaksIterator {
-	return &fetchPeaksIterator{
+func newFetchAudioProgressReader(expFrames int64, channels, numBins int) *fetchAudioProgressReader {
+	return &fetchAudioProgressReader{
 		channels:     channels,
 		framesPerBin: int(expFrames / int64(numBins)),
 		samples:      make([]int16, 8_192),
 		currPeaks:    make([]int16, channels),
-		progress:     make(chan FetchPeaksProgress),
+		progress:     make(chan FetchAudioProgress),
 		errorChan:    make(chan error, 1),
 	}
 }

-func (w *fetchPeaksIterator) Abort(err error) {
+func (w *fetchAudioProgressReader) Abort(err error) {
 	w.errorChan <- err
 }

-func (w *fetchPeaksIterator) Close() error {
+func (w *fetchAudioProgressReader) Close() error {
 	close(w.progress)
 	return nil
 }

-func (w *fetchPeaksIterator) Write(p []byte) (int, error) {
+func (w *fetchAudioProgressReader) Write(p []byte) (int, error) {
 	// expand our target slice if it is of insufficient size:
 	numSamples := len(p) / SizeOfInt16
 	if len(w.samples) < numSamples {
@@ -80,8 +85,8 @@ func (w *fetchPeaksIterator) Write(p []byte) (int, error) {
 	return len(p), nil
 }

-func (w *fetchPeaksIterator) nextBin() {
-	var progress FetchPeaksProgress
+func (w *fetchAudioProgressReader) nextBin() {
+	var progress FetchAudioProgress

 	// TODO: avoid an allocation?
 	progress.Peaks = append(progress.Peaks, w.currPeaks...)
@@ -95,16 +100,16 @@ func (w *fetchPeaksIterator) nextBin() {
 	w.total++
 }

-func (w *fetchPeaksIterator) Next() (FetchPeaksProgress, error) {
+func (w *fetchAudioProgressReader) Read() (FetchAudioProgress, error) {
 	for {
 		select {
 		case progress, ok := <-w.progress:
 			if !ok {
-				return FetchPeaksProgress{}, io.EOF
+				return FetchAudioProgress{}, io.EOF
 			}
-			return FetchPeaksProgress{Peaks: progress.Peaks}, nil
+			return FetchAudioProgress{Peaks: progress.Peaks}, nil
 		case err := <-w.errorChan:
-			return FetchPeaksProgress{}, fmt.Errorf("error waiting for progress: %v", err)
+			return FetchAudioProgress{}, fmt.Errorf("error waiting for progress: %v", err)
 		}
 	}
 }
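The renamed fetchAudioProgressReader plays two roles: it is the io.Writer that the raw little-endian int16 output from ffmpeg is copied into, and (via Read) the FetchAudioProgressReader handed back to callers. The in-package test below is a rough sketch of that flow, not code from this commit; the frame count and bin size are illustrative, and it assumes Write is fed whole frames.

    package media

    import (
    	"io"
    	"testing"
    )

    // TestFetchAudioProgressReaderSketch illustrates the intended flow: one
    // goroutine writes raw little-endian int16 samples, another reads
    // FetchAudioProgress values until io.EOF. All values are illustrative.
    func TestFetchAudioProgressReaderSketch(t *testing.T) {
    	// 48_000 expected frames, 2 channels, 100 bins => 480 frames per bin.
    	r := newFetchAudioProgressReader(48_000, 2, 100)

    	go func() {
    		// 48_000 frames * 2 channels of silent samples, 2 bytes each.
    		buf := make([]byte, 48_000*2*SizeOfInt16)
    		if _, err := r.Write(buf); err != nil {
    			r.Abort(err)
    			return
    		}
    		r.Close() // makes Read return io.EOF once all bins are consumed
    	}()

    	for {
    		progress, err := r.Read()
    		if err == io.EOF {
    			break
    		}
    		if err != nil {
    			t.Fatalf("error reading progress: %v", err)
    		}
    		t.Logf("received %d peaks", len(progress.Peaks))
    	}
    }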

View File

@@ -1,16 +1,12 @@
 package media

 import (
-	"bytes"
 	"context"
 	"errors"
 	"fmt"
 	"io"
 	"log"
-	"os/exec"
-	"sort"
 	"strconv"
-	"strings"
 	"time"

 	"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -41,7 +37,7 @@ func (pw *progressReader) Read(p []byte) (int, error) {
 	return n, err
 }

-// S3Client stubs the AWS S3 service client.
+// S3Client wraps the AWS S3 service client.
 type S3Client interface {
 	CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
 	UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
@@ -49,38 +45,27 @@ type S3Client interface {
 	CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
 }

-// YoutubeClient stubs the youtube.Client client.
+// YoutubeClient wraps the youtube.Client client.
 type YoutubeClient interface {
 	GetVideoContext(context.Context, string) (*youtubev2.Video, error)
 	GetStreamContext(context.Context, *youtubev2.Video, *youtubev2.Format) (io.ReadCloser, int64, error)
 }

-type MediaSet2 struct {
-	id string
-}
-
-func NewMediaSet2(id string) *MediaSet2 {
-	return &MediaSet2{
-		id: id,
-	}
-}
-
-// VideoFetchService fetches a video via an io.Reader.
-type VideoFetchService struct {
+// FetchMediaSetService fetches a video via an io.Reader.
+type FetchMediaSetService struct {
 	youtube YoutubeClient
 	s3      S3Client
 }

-func NewVideoFetchService(youtubeClient YoutubeClient, s3Client S3Client) *VideoFetchService {
-	return &VideoFetchService{
+func NewFetchMediaSetService(youtubeClient YoutubeClient, s3Client S3Client) *FetchMediaSetService {
+	return &FetchMediaSetService{
 		youtube: youtubeClient,
 		s3:      s3Client,
 	}
 }

-// Fetch handles the entire process to fetch and process the audio and video
-// parts of a MediaSet.
-func (s *VideoFetchService) Fetch(ctx context.Context, id string) (*MediaSet, error) {
+// Fetch fetches the metadata for a given MediaSet source.
+func (s *FetchMediaSetService) Fetch(ctx context.Context, id string) (*MediaSet, error) {
 	video, err := s.youtube.GetVideoContext(ctx, id)
 	if err != nil {
 		return nil, fmt.Errorf("error fetching video: %v", err)
@@ -118,15 +103,17 @@ func (s *VideoFetchService) Fetch(ctx context.Context, id string) (*MediaSet, error) {
 		},
 	}

+	// TODO: video
+	// TODO: save to JSON
+
 	return &mediaSet, nil
 }

-type PeakIterator interface {
-	Next() (FetchPeaksProgress, error)
-	Close() error
-}
-
-func (s *VideoFetchService) FetchPeaks(ctx context.Context, id string) (PeakIterator, error) {
+// FetchAudio fetches the audio stream from Youtube, pipes it through FFMPEG to
+// extract the raw audio samples, and uploads them to S3. It
+// returns a FetchAudioProgressReader. This reader must be read until
+// completion - it will return any error which occurs during the fetch process.
+func (s *FetchMediaSetService) FetchAudio(ctx context.Context, id string) (FetchAudioProgressReader, error) {
 	mediaSet := NewMediaSet(id)
 	if !mediaSet.Exists() {
 		// TODO check if audio uploaded already, don't bother again
@@ -174,34 +161,33 @@ func (s *VideoFetchService) FetchPeaks(ctx context.Context, id string) (PeakIterator, error) {
 		return nil, fmt.Errorf("error creating uploader: %v", err)
 	}

-	peakIterator := newFetchPeaksIterator(
+	fetchAudioProgressReader := newFetchAudioProgressReader(
 		mediaSet.Audio.ApproxFrames,
 		format.AudioChannels,
 		100,
 	)

-	state := fetchPeaksState{
-		fetchPeaksIterator: peakIterator,
+	state := fetchAudioState{
+		fetchAudioProgressReader: fetchAudioProgressReader,
 		ffmpegReader:       ffmpegReader,
 		uploader:           uploader,
 	}

-	go state.run(ctx) // pass ctx?
+	go state.run(ctx)

 	return &state, nil
 }

-type fetchPeaksState struct {
-	*fetchPeaksIterator
-	ffmpegReader *ffmpegReader
+type fetchAudioState struct {
+	*fetchAudioProgressReader
+	ffmpegReader io.ReadCloser
 	uploader     *multipartUploadWriter
-	err          error
 }

 // run copies the audio data from ffmpeg, waits for termination and then cleans
 // up appropriately.
-func (s *fetchPeaksState) run(ctx context.Context) {
-	mw := io.MultiWriter(s.fetchPeaksIterator, s.uploader)
+func (s *fetchAudioState) run(ctx context.Context) {
+	mw := io.MultiWriter(s, s.uploader)

 	done := make(chan error)
 	var err error
@@ -222,7 +208,6 @@ outer:
 	}

 	if readerErr := s.ffmpegReader.Close(); readerErr != nil {
-		log.Printf("error closing ffmpegReader: %v", readerErr)
 		if err == nil {
 			err = readerErr
 		}
@@ -230,7 +215,6 @@ outer:

 	if err == nil {
 		if uploaderErr := s.uploader.Complete(); uploaderErr != nil {
-			log.Printf("error closing uploader: %v", uploaderErr)
 			err = uploaderErr
 		}
 	}
@@ -250,69 +234,3 @@ outer:
 		log.Printf("error closing peak iterator: %v", iterErr)
 	}
 }
-
-type ffmpegReader struct {
-	io.ReadCloser
-
-	cmd *exec.Cmd
-}
-
-func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmpegReader, error) {
-	var stdErr bytes.Buffer
-
-	cmd := exec.CommandContext(ctx, "ffmpeg", arg...)
-	cmd.Stdin = input
-	cmd.Stderr = &stdErr // TODO: fix error handling
-
-	r, err := cmd.StdoutPipe()
-	if err != nil {
-		return nil, fmt.Errorf("error creating pipe: %v", err)
-	}
-
-	if err := cmd.Start(); err != nil {
-		return nil, fmt.Errorf("error starting ffmpeg: %v", err)
-	}
-
-	return &ffmpegReader{ReadCloser: r, cmd: cmd}, nil
-}
-
-func (r *ffmpegReader) Close() error {
-	state, err := r.cmd.Process.Wait()
-	if err != nil {
-		return fmt.Errorf("error returned from process: %v", err)
-	}
-
-	if state.ExitCode() != 0 {
-		return fmt.Errorf("command exited with code %d", state.ExitCode())
-	}
-
-	log.Println("returning from ffmpegreader.close")
-
-	return nil
-}
-
-// sortAudio returns the provided formats ordered in descending preferred
-// order. The ideal candidate is opus-encoded stereo audio in a webm container,
-// with the lowest available bitrate.
-func sortAudio(inFormats youtubev2.FormatList) youtubev2.FormatList {
-	var formats youtubev2.FormatList
-	for _, format := range inFormats {
-		if format.FPS == 0 && format.AudioChannels > 0 {
-			formats = append(formats, format)
-		}
-	}
-
-	sort.SliceStable(formats, func(i, j int) bool {
-		isOpusI := strings.Contains(formats[i].MimeType, "opus")
-		isOpusJ := strings.Contains(formats[j].MimeType, "opus")
-		if isOpusI && isOpusJ {
-			isStereoI := formats[i].AudioChannels == 2
-			isStereoJ := formats[j].AudioChannels == 2
-			if isStereoI && isStereoJ {
-				return formats[i].ContentLength < formats[j].ContentLength
-			}
-			return isStereoI
-		}
-		return isOpusI
-	})
-
-	return formats
-}
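FetchAudio can return &state directly because fetchAudioState embeds *fetchAudioProgressReader and therefore inherits its Read and Close methods. A compile-time assertion of that relationship (not part of this commit, sketched as if it sat in the media package) would be:

    var (
    	// Both the reader itself and the state wrapper returned by FetchAudio
    	// satisfy the new FetchAudioProgressReader interface.
    	_ FetchAudioProgressReader = (*fetchAudioProgressReader)(nil)
    	_ FetchAudioProgressReader = (*fetchAudioState)(nil)
    )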

View File

@@ -0,0 +1,48 @@
+package media
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"os/exec"
+)
+
+type ffmpegReader struct {
+	io.ReadCloser
+
+	cmd *exec.Cmd
+}
+
+func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmpegReader, error) {
+	var stdErr bytes.Buffer
+
+	cmd := exec.CommandContext(ctx, "ffmpeg", arg...)
+	cmd.Stdin = input
+	cmd.Stderr = &stdErr
+
+	r, err := cmd.StdoutPipe()
+	if err != nil {
+		return nil, fmt.Errorf("error creating pipe: %v", err)
+	}
+
+	if err := cmd.Start(); err != nil {
+		return nil, fmt.Errorf("error starting ffmpeg: %v, output: %s", err, stdErr.String())
+	}
+
+	return &ffmpegReader{ReadCloser: r, cmd: cmd}, nil
+}
+
+func (r *ffmpegReader) Close() error {
+	state, err := r.cmd.Process.Wait()
+	if err != nil {
+		return fmt.Errorf("error returned from ffmpeg process: %v", err)
+	}
+
+	if state.ExitCode() != 0 {
+		return fmt.Errorf("non-zero status %d returned from ffmpeg process", state.ExitCode())
+	}
+
+	return nil
+}
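The exact ffmpeg arguments passed by FetchAudio are not part of this file, so the flags below are assumptions; this is a minimal sketch of an in-package helper showing how newFfmpegReader might be used to decode a stream into raw signed little-endian int16 samples.

    // decodeToRawPCM is an illustrative helper, not code from this commit. The
    // ffmpeg flags (s16le output, 48kHz, stereo) are assumptions.
    func decodeToRawPCM(ctx context.Context, src io.Reader, dst io.Writer) error {
    	r, err := newFfmpegReader(ctx, src,
    		"-i", "-", // read the source container from stdin
    		"-f", "s16le", // emit raw signed 16-bit little-endian samples
    		"-ar", "48000", // assumed sample rate
    		"-ac", "2", // assumed channel count
    		"-", // write to stdout
    	)
    	if err != nil {
    		return fmt.Errorf("error creating ffmpeg reader: %v", err)
    	}

    	if _, err := io.Copy(dst, r); err != nil {
    		r.Close()
    		return fmt.Errorf("error copying audio: %v", err)
    	}

    	// Close waits for the ffmpeg process and reports a non-zero exit code.
    	return r.Close()
    }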

View File

@@ -96,9 +96,9 @@ func (u *multipartUploadWriter) partNum() int32 {
 	return int32(len(u.completedParts) + 1)
 }

-// Abort aborts the upload process, cancelling the upload on S3.
-// Accepts a separate context in case it is called during cleanup after the
-// original context was killed.
+// Abort aborts the upload process, cancelling the upload on S3. It accepts a
+// separate context to the associated writer in case it is called during
+// cleanup after the original context was killed.
 func (u *multipartUploadWriter) Abort(ctx context.Context) error {
 	input := s3.AbortMultipartUploadInput{
 		Bucket: aws.String(u.bucket),
@@ -139,6 +139,5 @@ func (u *multipartUploadWriter) Complete() error {
 	}

 	log.Printf("completed upload, key = %s, bytesUploaded = %d", u.key, u.bytesUploaded)
-
 	return nil
 }
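The reworded Abort comment describes the intended calling pattern; a short sketch of cleanup code following it (hypothetical helper, not from this commit):

    // cleanupUpload sketches the pattern described above: the original request
    // context may already be cancelled, so Abort gets a fresh context of its own.
    func cleanupUpload(u *multipartUploadWriter) {
    	abortCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    	defer cancel()

    	if err := u.Abort(abortCtx); err != nil {
    		log.Printf("error aborting upload: %v", err)
    	}
    }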

backend/media/youtube.go Normal file
View File

@@ -0,0 +1,34 @@
+package media
+
+import (
+	"sort"
+	"strings"
+	youtubev2 "github.com/kkdai/youtube/v2"
+)
+
+// sortAudio returns the provided formats ordered in descending preferred
+// order. The ideal candidate is opus-encoded stereo audio in a webm container,
+// with the lowest available bitrate.
+func sortAudio(inFormats youtubev2.FormatList) youtubev2.FormatList {
+	var formats youtubev2.FormatList
+	for _, format := range inFormats {
+		if format.FPS == 0 && format.AudioChannels > 0 {
+			formats = append(formats, format)
+		}
+	}
+
+	sort.SliceStable(formats, func(i, j int) bool {
+		isOpusI := strings.Contains(formats[i].MimeType, "opus")
+		isOpusJ := strings.Contains(formats[j].MimeType, "opus")
+		if isOpusI && isOpusJ {
+			isStereoI := formats[i].AudioChannels == 2
+			isStereoJ := formats[j].AudioChannels == 2
+			if isStereoI && isStereoJ {
+				return formats[i].ContentLength < formats[j].ContentLength
+			}
+			return isStereoI
+		}
+		return isOpusI
+	})
+	return formats
+}
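A short sketch of how a caller in this package might use sortAudio to choose a format (hypothetical helper; the selection logic inside FetchAudio is not shown in this commit):

    // chooseAudioFormat illustrates sortAudio usage: the first element of the
    // sorted list is the most preferred audio-only format.
    func chooseAudioFormat(video *youtubev2.Video) (*youtubev2.Format, error) {
    	formats := sortAudio(video.Formats)
    	if len(formats) == 0 {
    		return nil, errors.New("no audio-only formats available")
    	}
    	return &formats[0], nil
    }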