Add test coverage for getAudioFromYoutube flow
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Rob Watson 2022-01-03 18:44:19 +01:00
parent 176a1cd8c1
commit 66c65694ae
5 changed files with 319 additions and 130 deletions

View File

@ -7,7 +7,6 @@ import (
"fmt"
"io"
"math"
"os/exec"
"strconv"
"sync"
@ -29,21 +28,23 @@ type GetPeaksProgressReader interface {
// audioGetter manages getting and processing audio from Youtube.
type audioGetter struct {
store Store
youtube YoutubeClient
fileStore FileStore
config config.Config
logger *zap.SugaredLogger
store Store
youtube YoutubeClient
fileStore FileStore
commandFunc CommandFunc
config config.Config
logger *zap.SugaredLogger
}
// newAudioGetter returns a new audioGetter.
func newAudioGetter(store Store, youtube YoutubeClient, fileStore FileStore, config config.Config, logger *zap.SugaredLogger) *audioGetter {
func newAudioGetter(store Store, youtube YoutubeClient, fileStore FileStore, commandFunc CommandFunc, config config.Config, logger *zap.SugaredLogger) *audioGetter {
return &audioGetter{
store: store,
youtube: youtube,
fileStore: fileStore,
config: config,
logger: logger,
store: store,
youtube: youtube,
fileStore: fileStore,
commandFunc: commandFunc,
config: config,
logger: logger,
}
}
@ -60,7 +61,7 @@ func (g *audioGetter) GetAudio(ctx context.Context, mediaSet store.MediaSet, num
format := video.Formats.FindByItag(int(mediaSet.AudioYoutubeItag))
if format == nil {
return nil, fmt.Errorf("error finding itag: %v", err)
return nil, fmt.Errorf("error finding itag: %d", mediaSet.AudioYoutubeItag)
}
stream, _, err := g.youtube.GetStreamContext(ctx, video, format)
@ -94,7 +95,8 @@ func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaS
teeReader := io.TeeReader(streamWithProgress, pw)
var stdErr bytes.Buffer
cmd := exec.CommandContext(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-")
cmd := s.commandFunc(ctx, "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", "-", "-f", rawAudioFormat, "-ar", strconv.Itoa(rawAudioSampleRate), "-acodec", rawAudioCodec, "-")
fmt.Println("cmd is", cmd, cmd.Env)
cmd.Stdin = teeReader
cmd.Stderr = &stdErr
stdout, err := cmd.StdoutPipe()
@ -124,6 +126,7 @@ func (s *audioGetterState) getAudio(ctx context.Context, r io.ReadCloser, mediaS
s.CloseWithError(fmt.Errorf("error uploading encoded audio: %v", encErr))
return
}
pr.Close()
presignedAudioURL, err = s.fileStore.GetURL(ctx, key)
if err != nil {

View File

@ -1,10 +1,13 @@
package media_test
import (
"bytes"
"context"
"database/sql"
"errors"
"io"
"io/ioutil"
"strings"
"testing"
"time"
@ -13,14 +16,174 @@ import (
"git.netflux.io/rob/clipper/generated/store"
"git.netflux.io/rob/clipper/media"
"github.com/google/uuid"
"github.com/kkdai/youtube/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func TestGetAudioFromYoutube(t *testing.T) {
const (
videoID = "abcdef12"
inFixturePath = "testdata/tone-44100-stereo-int16-30000ms.raw"
inFixtureLen = int64(5_292_000)
inFixtureFrames = inFixtureLen / 4 // stereo-int16
)
ctx := context.Background()
mediaSetID := uuid.New()
mediaSet := store.MediaSet{
ID: mediaSetID,
YoutubeID: videoID,
AudioYoutubeItag: 123,
AudioChannels: 2,
AudioFramesApprox: inFixtureFrames,
}
video := &youtube.Video{
ID: videoID,
Formats: []youtube.Format{{ItagNo: 123, FPS: 0, AudioChannels: 2}},
}
t.Run("NOK,ErrorFetchingMediaSet", func(t *testing.T) {
var mockStore mocks.Store
mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(store.MediaSet{}, errors.New("db went boom"))
service := media.NewMediaSetService(&mockStore, nil, nil, nil, config.Config{}, zap.NewNop().Sugar())
_, err := service.GetPeaks(ctx, mediaSetID, 10)
assert.EqualError(t, err, "error getting media set: db went boom")
})
t.Run("NOK,ErrorFetchingStream", func(t *testing.T) {
var mockStore mocks.Store
mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil)
var youtubeClient mocks.YoutubeClient
youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil)
youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(nil, int64(0), errors.New("uh oh"))
service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, config.Config{}, zap.NewNop().Sugar())
_, err := service.GetPeaks(ctx, mediaSetID, 10)
assert.EqualError(t, err, "error fetching stream: uh oh")
})
t.Run("NOK,ErrorBuildingProgressReader", func(t *testing.T) {
invalidMediaSet := mediaSet
invalidMediaSet.AudioChannels = 0
var mockStore mocks.Store
mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(invalidMediaSet, nil)
var youtubeClient mocks.YoutubeClient
youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil)
youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(nil, int64(0), nil)
service := media.NewMediaSetService(&mockStore, &youtubeClient, nil, nil, config.Config{}, zap.NewNop().Sugar())
_, err := service.GetPeaks(ctx, mediaSetID, 10)
assert.EqualError(t, err, "error building progress reader: error creating audio progress reader (framesExpected = 1323000, channels = 0, numBins = 10)")
})
t.Run("NOK,UploadError", func(t *testing.T) {
var mockStore mocks.Store
mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil)
mockStore.On("SetEncodedAudioUploaded", ctx, mock.Anything).Return(mediaSet, nil)
var youtubeClient mocks.YoutubeClient
youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil)
youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(io.NopCloser(bytes.NewReader(nil)), int64(0), nil)
var fileStore mocks.FileStore
fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, "audio/raw").Return(int64(0), errors.New("error uploading raw audio"))
fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, "audio/opus").Return(int64(0), nil)
fileStore.On("GetURL", ctx, mock.Anything).Return("", nil)
cmd := helperCommand(t, "", inFixturePath, "", 0)
service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, config.Config{}, zap.NewNop().Sugar())
stream, err := service.GetPeaks(ctx, mediaSetID, 10)
assert.NoError(t, err)
_, err = stream.Next()
assert.EqualError(t, err, "error waiting for progress: error uploading raw audio: error uploading raw audio")
})
t.Run("NOK,FFmpegError", func(t *testing.T) {
var mockStore mocks.Store
mockStore.On("GetMediaSet", mock.Anything, mediaSetID).Return(mediaSet, nil)
mockStore.On("SetEncodedAudioUploaded", ctx, mock.Anything).Return(mediaSet, nil)
mockStore.On("SetRawAudioUploaded", ctx, mock.Anything).Return(mediaSet, nil)
var youtubeClient mocks.YoutubeClient
youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil)
youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(io.NopCloser(strings.NewReader("some audio")), int64(0), nil)
var fileStore mocks.FileStore
fileStore.On("PutObject", ctx, mock.Anything, mock.Anything, mock.Anything).Return(int64(0), nil)
fileStore.On("GetURL", ctx, mock.Anything).Return("", nil)
cmd := helperCommand(t, "", inFixturePath, "oh no", 101)
service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, config.Config{}, zap.NewNop().Sugar())
stream, err := service.GetPeaks(ctx, mediaSetID, 10)
assert.NoError(t, err)
_, err = stream.Next()
assert.EqualError(t, err, "error waiting for progress: error waiting for command: exit status 101, output: oh no")
})
t.Run("OK", func(t *testing.T) {
// Mock Store
var mockStore mocks.Store
mockStore.On("GetMediaSet", ctx, mediaSetID).Return(mediaSet, nil)
mockStore.On("SetRawAudioUploaded", ctx, mock.MatchedBy(func(p store.SetRawAudioUploadedParams) bool {
return p.ID == mediaSetID && p.AudioFrames.Int64 == inFixtureFrames
})).Return(mediaSet, nil)
mockStore.On("SetEncodedAudioUploaded", ctx, mock.MatchedBy(func(p store.SetEncodedAudioUploadedParams) bool {
return p.ID == mediaSetID
})).Return(mediaSet, nil)
defer mockStore.AssertExpectations(t)
// Mock YoutubeClient
encodedContent := "this is an opus stream"
reader := io.NopCloser(strings.NewReader(encodedContent))
var youtubeClient mocks.YoutubeClient
youtubeClient.On("GetVideoContext", ctx, mediaSet.YoutubeID).Return(video, nil)
youtubeClient.On("GetStreamContext", ctx, video, &video.Formats[0]).Return(reader, int64(len(encodedContent)), nil)
defer youtubeClient.AssertExpectations(t)
// Mock FileStore
// It is necessary to consume the readers passed into the mocks to avoid IO
// errors. Since we're doing that we can also assert the content that is
// passed to them is as expected.
url := "https://www.example.com/foo"
var fileStore mocks.FileStore
fileStore.On("PutObject", ctx, "media_sets/"+mediaSetID.String()+"/audio.opus", mock.Anything, "audio/opus").
Run(func(args mock.Arguments) {
readContent, err := ioutil.ReadAll(args[2].(io.Reader))
require.NoError(t, err)
assert.Equal(t, encodedContent, string(readContent))
}).
Return(int64(len(encodedContent)), nil)
fileStore.On("PutObject", ctx, "media_sets/"+mediaSetID.String()+"/audio.raw", mock.Anything, "audio/raw").
Run(func(args mock.Arguments) {
n, err := io.Copy(io.Discard, args[2].(io.Reader))
require.NoError(t, err)
assert.Equal(t, inFixtureLen, n)
}).
Return(inFixtureLen, nil)
fileStore.On("GetURL", ctx, "media_sets/"+mediaSetID.String()+"/audio.opus").Return(url, nil)
defer fileStore.AssertExpectations(t)
numBins := 10
cmd := helperCommand(t, "ffmpeg -hide_banner -loglevel error -i - -f s16le -ar 48000 -acodec pcm_s16le -", inFixturePath, "", 0)
service := media.NewMediaSetService(&mockStore, &youtubeClient, &fileStore, cmd, config.Config{}, zap.NewNop().Sugar())
stream, err := service.GetPeaks(ctx, mediaSetID, numBins)
require.NoError(t, err)
assertConsumeStream(t, numBins, url, stream)
})
}
func TestGetPeaksFromFileStore(t *testing.T) {
const inFixturePath = "testdata/tone-44100-stereo-int16-30000ms.raw"
const (
inFixturePath = "testdata/tone-44100-stereo-int16-30000ms.raw"
inFixtureLen = 5_292_000
)
ctx := context.Background()
logger := zap.NewNop().Sugar()
@ -62,7 +225,7 @@ func TestGetPeaksFromFileStore(t *testing.T) {
defer mockStore.AssertExpectations(t)
var fileStore mocks.FileStore
reader := fixtureReader(t, inFixturePath, 5_292_000)
reader := fixtureReader(t, inFixturePath, inFixtureLen)
fileStore.On("GetObject", mock.Anything, "raw audio key").Return(reader, nil)
fileStore.On("GetURL", mock.Anything, "encoded audio key").Return("", errors.New("network error"))
defer fileStore.AssertExpectations(t)
@ -89,9 +252,10 @@ func TestGetPeaksFromFileStore(t *testing.T) {
defer mockStore.AssertExpectations(t)
var fileStore mocks.FileStore
reader := fixtureReader(t, inFixturePath, 5_292_000)
url := "https://www.example.com/foo"
reader := fixtureReader(t, inFixturePath, inFixtureLen)
fileStore.On("GetObject", mock.Anything, "raw audio key").Return(reader, nil)
fileStore.On("GetURL", mock.Anything, "encoded audio key").Return("https://www.example.com/foo", nil)
fileStore.On("GetURL", mock.Anything, "encoded audio key").Return(url, nil)
defer fileStore.AssertExpectations(t)
numBins := 10
@ -99,38 +263,44 @@ func TestGetPeaksFromFileStore(t *testing.T) {
stream, err := service.GetPeaks(ctx, mediaSetID, numBins)
require.NoError(t, err)
lastPeaks := make([]int16, 2) // stereo
var (
count int
lastPercentComplete float32
lastURL string
)
for {
progress, err := stream.Next()
if err != io.EOF {
require.NoError(t, err)
}
assert.Len(t, progress.Peaks, 2)
assert.GreaterOrEqual(t, progress.PercentComplete, lastPercentComplete)
lastPercentComplete = progress.PercentComplete
lastURL = progress.URL
if err == io.EOF {
break
}
// the fixture is a tone gradually increasing in amplitude:
assert.Greater(t, progress.Peaks[0], lastPeaks[0])
assert.Greater(t, progress.Peaks[1], lastPeaks[1])
lastPeaks = progress.Peaks
count++
}
assert.Equal(t, float32(100), lastPercentComplete)
assert.Equal(t, []int16{32_767, 32_766}, lastPeaks)
assert.Equal(t, numBins, count)
assert.Equal(t, "https://www.example.com/foo", lastURL)
assertConsumeStream(t, numBins, url, stream)
})
}
// assertConsumeStream asserts that the stream produced by both the
// from-youtube and from-filestore flows is identical.
func assertConsumeStream(t *testing.T, expBins int, expURL string, stream media.GetPeaksProgressReader) {
lastPeaks := make([]int16, 2) // stereo
var (
count int
lastPercentComplete float32
lastURL string
)
for {
progress, err := stream.Next()
if err != io.EOF {
require.NoError(t, err)
}
assert.Len(t, progress.Peaks, 2)
assert.GreaterOrEqual(t, progress.PercentComplete, lastPercentComplete)
lastPercentComplete = progress.PercentComplete
lastURL = progress.URL
if err == io.EOF {
break
}
// the fixture is a tone gradually increasing in amplitude:
assert.Greater(t, progress.Peaks[0], lastPeaks[0])
assert.Greater(t, progress.Peaks[1], lastPeaks[1])
lastPeaks = progress.Peaks
count++
}
assert.Equal(t, float32(100), lastPercentComplete)
assert.Equal(t, []int16{32_767, 32_766}, lastPeaks)
assert.Equal(t, expBins, count)
assert.Equal(t, expURL, lastURL)
}

View File

@ -4,12 +4,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"strings"
"testing"
"git.netflux.io/rob/clipper/config"
@ -24,79 +19,6 @@ import (
"go.uber.org/zap"
)
func fixtureReader(t *testing.T, fixturePath string, limit int64) io.ReadCloser {
fptr, err := os.Open(fixturePath)
require.NoError(t, err)
// limitReader to make the mock work realistically, not intended for assertions:
return struct {
io.Reader
io.Closer
}{
Reader: io.LimitReader(fptr, limit),
Closer: fptr,
}
}
func helperCommand(t *testing.T, wantCommand, stdoutFile, stderrString string, forceExitCode int) media.CommandFunc {
return func(ctx context.Context, name string, args ...string) *exec.Cmd {
cs := []string{"-test.run=TestHelperProcess", "--", name}
cs = append(cs, args...)
cmd := exec.CommandContext(ctx, os.Args[0], cs...)
cmd.Env = []string{
"GO_WANT_HELPER_PROCESS=1",
"GO_WANT_COMMAND=" + wantCommand,
"GO_STDOUT_FILE=" + stdoutFile,
"GO_STDERR_STRING=" + stderrString,
"GO_FORCE_EXIT_CODE=" + strconv.Itoa(forceExitCode),
}
return cmd
}
}
func TestHelperProcess(t *testing.T) {
if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
return
}
defer func() {
// Stop the helper process writing to stdout after the test has finished.
// This prevents it from writing the "PASS" string which is unwanted in
// this context.
if !t.Failed() {
os.Stdout, _ = os.Open(os.DevNull)
}
}()
if exitCode := os.Getenv("GO_FORCE_EXIT_CODE"); exitCode != "0" {
c, _ := strconv.Atoi(exitCode)
os.Stderr.WriteString(os.Getenv("GO_STDERR_STRING"))
os.Exit(c)
}
if wantCommand := os.Getenv("GO_WANT_COMMAND"); wantCommand != "" {
gotCmd := strings.Split(strings.Join(os.Args, " "), " -- ")[1]
if wantCommand != gotCmd {
fmt.Printf("GO_WANT_COMMAND assertion failed:\nwant = %v\ngot = %v", wantCommand, gotCmd)
return
}
}
// Copy stdin to /dev/null. This is required to avoid broken pipe errors in
// the tests:
_, err := io.Copy(io.Discard, os.Stdin)
require.NoError(t, err)
// If an output file is provided, then copy that to stdout:
if fname := os.Getenv("GO_STDOUT_FILE"); fname != "" {
fptr, err := os.Open(fname)
require.NoError(t, err)
_, err = io.Copy(os.Stdout, fptr)
require.NoError(t, err)
}
}
func TestGetSegment(t *testing.T) {
mediaSetID := uuid.MustParse("4c440241-cca9-436f-adb0-be074588cf2b")
const inFixturePath = "testdata/tone-44100-stereo-int16-30000ms.raw"

View File

@ -271,7 +271,7 @@ func (s *MediaSetService) GetPeaks(ctx context.Context, id uuid.UUID, numBins in
}
func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet store.MediaSet, numBins int) (GetPeaksProgressReader, error) {
audioGetter := newAudioGetter(s.store, s.youtube, s.fileStore, s.config, s.logger)
audioGetter := newAudioGetter(s.store, s.youtube, s.fileStore, s.commandFunc, s.config, s.logger)
return audioGetter.GetAudio(ctx, mediaSet, numBins)
}

View File

@ -0,0 +1,94 @@
package media_test
import (
"context"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"strings"
"testing"
"git.netflux.io/rob/clipper/media"
"github.com/stretchr/testify/require"
)
// fixtureReader loads a fixture into a ReadCloser with the provided limit.
func fixtureReader(t *testing.T, fixturePath string, limit int64) io.ReadCloser {
fptr, err := os.Open(fixturePath)
require.NoError(t, err)
// limitReader to make the mock work realistically, not intended for assertions:
return struct {
io.Reader
io.Closer
}{
Reader: io.LimitReader(fptr, limit),
Closer: fptr,
}
}
// helperCommand returns a function that builds an *exec.Cmd which executes a
// test function in order to act as a mock process.
func helperCommand(t *testing.T, wantCommand, stdoutFile, stderrString string, forceExitCode int) media.CommandFunc {
return func(ctx context.Context, name string, args ...string) *exec.Cmd {
cs := []string{"-test.run=TestHelperProcess", "--", name}
cs = append(cs, args...)
cmd := exec.CommandContext(ctx, os.Args[0], cs...)
cmd.Env = []string{
"GO_WANT_HELPER_PROCESS=1",
"GO_WANT_COMMAND=" + wantCommand,
"GO_STDOUT_FILE=" + stdoutFile,
"GO_STDERR_STRING=" + stderrString,
"GO_FORCE_EXIT_CODE=" + strconv.Itoa(forceExitCode),
}
return cmd
}
}
// TestHelperProcess is the body for the mock executable process built by
// helperCommand.
func TestHelperProcess(t *testing.T) {
if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
return
}
defer func() {
// Stop the helper process writing to stdout after the test has finished.
// This prevents it from writing the "PASS" string which is unwanted in
// this context.
if !t.Failed() {
os.Stdout, _ = os.Open(os.DevNull)
}
}()
if exitCode := os.Getenv("GO_FORCE_EXIT_CODE"); exitCode != "0" {
c, _ := strconv.Atoi(exitCode)
os.Stderr.WriteString(os.Getenv("GO_STDERR_STRING"))
os.Exit(c)
}
if wantCommand := os.Getenv("GO_WANT_COMMAND"); wantCommand != "" {
gotCmd := strings.Split(strings.Join(os.Args, " "), " -- ")[1]
if wantCommand != gotCmd {
fmt.Fprintf(os.Stderr, "GO_WANT_COMMAND assertion failed:\nwant = %v\ngot = %v", wantCommand, gotCmd)
t.Fail() // necessary to make the test fail
}
}
// Copy stdin to /dev/null. This is required to avoid broken pipe errors in
// the tests:
_, err := io.Copy(io.Discard, os.Stdin)
require.NoError(t, err)
// If an output file is provided, then copy that to stdout:
if fname := os.Getenv("GO_STDOUT_FILE"); fname != "" {
fptr, err := os.Open(fname)
require.NoError(t, err)
defer fptr.Close()
_, err = io.Copy(os.Stdout, fptr)
require.NoError(t, err)
}
}