diff --git a/backend/media/service.go b/backend/media/service.go index a8d04f6..2c22ea6 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -388,44 +388,54 @@ func (s *MediaSetService) GetPeaksForSegment(ctx context.Context, id uuid.UUID, sampleBuf := make([]int16, readBufSizeBytes/SizeOfInt16) bytesExpected := (endFrame - startFrame) * int64(channels) * SizeOfInt16 - var ( - bytesRead int64 - closing bool - currPeakIndex int - currFrame int64 - ) + var bytesRead int64 + var closing bool - for { - n, err := modReader.Read(readBuf) - if err == io.EOF { - closing = true - } else if err != nil { - return nil, fmt.Errorf("read error: %v", err) + for bin := 0; bin < numBins; bin++ { + framesRemaining := framesPerBin + if bin == numBins-1 { + framesRemaining += totalFrames % int64(numBins) } - bytesRead += int64(n) - samples := sampleBuf[:n/SizeOfInt16] + for { + // Read as many bytes as possible, but not exceeding the available buffer + // size nor framesRemaining: + bytesToRead := framesRemaining * int64(channels) * SizeOfInt16 + max := int64(len(readBuf)) + if bytesToRead > max { + bytesToRead = max + } - if err := binary.Read(bytes.NewReader(readBuf[:n]), binary.LittleEndian, samples); err != nil { - return nil, fmt.Errorf("error interpreting samples: %v", err) - } + n, err := modReader.Read(readBuf[:bytesToRead]) + if err == io.EOF { + closing = true + } else if err != nil { + return nil, fmt.Errorf("read error: %v", err) + } - for i := 0; i < len(samples); i += channels { - for j := 0; j < channels; j++ { - samp := sampleBuf[i+j] - if samp < 0 { - samp = -samp - } - if samp > peaks[currPeakIndex+j] { - peaks[currPeakIndex+j] = samp + ss := sampleBuf[:n/SizeOfInt16] + if err := binary.Read(bytes.NewReader(readBuf[:n]), binary.LittleEndian, ss); err != nil { + return nil, fmt.Errorf("error interpreting samples: %v", err) + } + + pi := bin * channels + for i := 0; i < len(ss); i += channels { + for j := 0; j < channels; j++ { + s := ss[i+j] + if s < 0 { + s = -s + } + if s > peaks[pi+j] { + peaks[pi+j] = s + } } } - if currFrame == framesPerBin { - currFrame = 0 - currPeakIndex += channels - } else { - currFrame++ + framesRemaining -= int64(n) / int64(channels) / SizeOfInt16 + bytesRead += int64(n) + + if closing || framesRemaining == 0 { + break } } diff --git a/backend/media/service_test.go b/backend/media/service_test.go index 313ce8d..44282e1 100644 --- a/backend/media/service_test.go +++ b/backend/media/service_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "database/sql" + "errors" "io" "os" "testing" @@ -19,11 +20,31 @@ import ( "go.uber.org/zap" ) +// segmentReader returns an error if provided after reading errBytes bytes. +type segmentReader struct { + r io.Reader + n, errBytes int64 + err error +} + +func (r *segmentReader) Read(p []byte) (int, error) { + n, err := r.r.Read(p) + r.n += int64(n) + if r.n >= r.errBytes && r.err != nil { + return n, r.err + } + return n, err +} + +func (r *segmentReader) Close() error { return nil } + func TestPeaksForSegment(t *testing.T) { testCases := []struct { name string fixturePath string - fixtureLen int64 + fixtureReadErrBytes int64 + fixtureReadErr error + fixtureMaxRead int64 startFrame, endFrame int64 channels int32 numBins int @@ -31,9 +52,8 @@ func TestPeaksForSegment(t *testing.T) { wantErr string }{ { - name: "entire fixture, stereo, 1 bin", + name: "OK, entire fixture, stereo, 1 bin", fixturePath: "testdata/tone-44100-stereo-int16.raw", - fixtureLen: 176400, startFrame: 0, endFrame: 44100, channels: 2, @@ -41,9 +61,8 @@ func TestPeaksForSegment(t *testing.T) { wantPeaks: []int16{32747, 32747}, }, { - name: "entire fixture, stereo, 4 bins", + name: "OK, entire fixture, stereo, 4 bins", fixturePath: "testdata/tone-44100-stereo-int16.raw", - fixtureLen: 176400, startFrame: 0, endFrame: 44100, channels: 2, @@ -51,9 +70,8 @@ func TestPeaksForSegment(t *testing.T) { wantPeaks: []int16{8173, 8177, 16366, 16370, 24557, 24555, 32747, 32747}, }, { - name: "entire fixture, stereo, 16 bins", + name: "OK, entire fixture, stereo, 16 bins", fixturePath: "testdata/tone-44100-stereo-int16.raw", - fixtureLen: 176400, startFrame: 0, endFrame: 44100, channels: 2, @@ -61,9 +79,8 @@ func TestPeaksForSegment(t *testing.T) { wantPeaks: []int16{2029, 2029, 4075, 4076, 6124, 6125, 8173, 8177, 10222, 10221, 12267, 12265, 14314, 14313, 16366, 16370, 18413, 18411, 20453, 20454, 22505, 22508, 24557, 24555, 26604, 26605, 28644, 28643, 30698, 30694, 32747, 32747}, }, { - name: "entire fixture, mono, 1 bin", + name: "OK, entire fixture, mono, 1 bin", fixturePath: "testdata/tone-44100-mono-int16.raw", - fixtureLen: 88200, startFrame: 0, endFrame: 44100, channels: 1, @@ -71,14 +88,34 @@ func TestPeaksForSegment(t *testing.T) { wantPeaks: []int16{32748}, }, { - name: "entire fixture, mono, 32 bins", + name: "OK, entire fixture, mono, 32 bins", fixturePath: "testdata/tone-44100-mono-int16.raw", - fixtureLen: 88200, startFrame: 0, endFrame: 44100, channels: 1, numBins: 32, - wantPeaks: []int16{1026, 2030, 3071, 4075, 5122, 6126, 7167, 8172, 9213, 10217, 11259, 12264, 13311, 14315, 15360, 16364, 17405, 18412, 19450, 20453, 21497, 22504, 23549, 24554, 25599, 26607, 27641, 28642, 29688, 30738, 31746, 32748}, + wantPeaks: []int16{1018, 2030, 3060, 4075, 5092, 6126, 7129, 8172, 9174, 10217, 11227, 12264, 13272, 14315, 15319, 16364, 17370, 18412, 19417, 20453, 21457, 22504, 23513, 24554, 25564, 26607, 27607, 28642, 29647, 30700, 31699, 32748}, + }, + { + name: "NOK, entire fixture, mono, 32 bins, read returns io.EOF after 50% complete", + fixturePath: "testdata/tone-44100-mono-int16.raw", + fixtureMaxRead: 44100, + startFrame: 0, + endFrame: 44100, + channels: 1, + numBins: 32, + wantPeaks: []int16{1018, 2030, 3060, 4075, 5092, 6126, 7129, 8172, 9174, 10217, 11227, 12264, 13272, 14315, 15319, 16364, 2053, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + }, + { + name: "NOK, entire fixture, mono, 32 bins, read error after 50% complete", + fixturePath: "testdata/tone-44100-mono-int16.raw", + fixtureReadErrBytes: 44100, + fixtureReadErr: errors.New("foo"), + startFrame: 0, + endFrame: 44100, + channels: 1, + numBins: 32, + wantErr: "read error: foo", }, } @@ -87,11 +124,18 @@ func TestPeaksForSegment(t *testing.T) { startByte := tc.startFrame * int64(tc.channels) * media.SizeOfInt16 endByte := tc.endFrame * int64(tc.channels) * media.SizeOfInt16 expectedBytes := endByte - startByte + if tc.fixtureMaxRead != 0 { + expectedBytes = tc.fixtureMaxRead + } - audioFile, err := os.Open(tc.fixturePath) + fixture, err := os.Open(tc.fixturePath) require.NoError(t, err) - defer audioFile.Close() - audioData := io.NopCloser(io.LimitReader(audioFile, int64(expectedBytes))) + defer fixture.Close() + sr := segmentReader{ + r: io.LimitReader(fixture, int64(expectedBytes)), + err: tc.fixtureReadErr, + errBytes: tc.fixtureReadErrBytes, + } mediaSet := store.MediaSet{ ID: uuid.New(), @@ -108,13 +152,13 @@ func TestPeaksForSegment(t *testing.T) { fileStore := &mocks.FileStore{} fileStore. On("GetObjectWithRange", mock.Anything, "foo", startByte, endByte). - Return(audioData, nil) + Return(&sr, nil) service := media.NewMediaSetService(store, nil, fileStore, nil, media.NewTestWorkerPool(), config.Config{}, zap.NewNop().Sugar()) peaks, err := service.GetPeaksForSegment(context.Background(), mediaSet.ID, tc.startFrame, tc.endFrame, tc.numBins) if tc.wantErr == "" { - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, tc.wantPeaks, peaks) } else { assert.EqualError(t, err, tc.wantErr) @@ -129,7 +173,6 @@ func BenchmarkGetPeaksForSegment(b *testing.B) { endFrame = 1323000 channels = 2 fixturePath = "testdata/tone-44100-stereo-int16-30000ms.raw" - fixtureLen = 5292000 numBins = 2000 ) diff --git a/backend/media/test_helper_test.go b/backend/media/test_helper_test.go index 90bb50b..dd25b80 100644 --- a/backend/media/test_helper_test.go +++ b/backend/media/test_helper_test.go @@ -12,6 +12,7 @@ import ( "git.netflux.io/rob/clipper/media" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) // fixtureReader loads a fixture into a ReadCloser with the provided limit. @@ -92,3 +93,10 @@ func TestHelperProcess(t *testing.T) { require.NoError(t, err) } } + +// testLogger returns a functional development logger. +func testLogger(t *testing.T) *zap.Logger { + l, err := zap.NewDevelopment() + require.NoError(t, err) + return l +}