diff --git a/backend/cmd/progress-test/main.go b/backend/cmd/progress-test/main.go index 5d400bc..93ea33b 100644 --- a/backend/cmd/progress-test/main.go +++ b/backend/cmd/progress-test/main.go @@ -16,7 +16,8 @@ import ( ) const ( - videoID = "N1BufwrE1I8" + // videoID = "N1BufwrE1I8" + videoID = "LBRIsFbOoc4" ) func main() { diff --git a/backend/media/audio_progress.go b/backend/media/audio_progress.go index 2ccc989..f466f79 100644 --- a/backend/media/audio_progress.go +++ b/backend/media/audio_progress.go @@ -28,7 +28,7 @@ type fetchAudioProgressReader struct { samples []int16 currPeaks []int16 currCount int - framesProcessed int + framesProcessed int64 progress chan GetAudioProgress errorChan chan error } @@ -84,7 +84,7 @@ func (w *fetchAudioProgressReader) Write(p []byte) (int, error) { } } - w.framesProcessed += len(samples) / w.channels + w.framesProcessed += int64(len(samples) / w.channels) return len(p), nil } @@ -98,7 +98,6 @@ func (w *fetchAudioProgressReader) nextBin() { w.progress <- progress w.currCount = 0 - // log.Printf("got peak for %d frames, which is equal to target of %d frames per bin, %d total bins processed, peaks: %+v", w.currCount, w.framesPerBin, w.total+1, w.currPeaks) for i := 0; i < len(w.currPeaks); i++ { w.currPeaks[i] = 0 } diff --git a/backend/media/service.go b/backend/media/service.go index 370b092..a58d3cc 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -1,6 +1,7 @@ package media import ( + "bytes" "context" "database/sql" "errors" @@ -265,7 +266,7 @@ func (s *MediaSetService) getAudioFromS3(ctx context.Context, mediaSet store.Med state := getAudioFromS3State{ fetchAudioProgressReader: fetchAudioProgressReader, - s3Reader: output.Body, + s3Reader: NewModuloBufReader(output.Body, int(mediaSet.AudioChannels)*SizeOfInt16), } go state.run(ctx) @@ -365,7 +366,7 @@ func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet stor s3Key: s3Key, store: s.store, } - go state.run(ctx) + go state.run(ctx, mediaSet.ID) return &state, nil } @@ -379,7 +380,7 @@ type getAudioFromYoutubeState struct { store Store } -func (s *getAudioFromYoutubeState) run(ctx context.Context) { +func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID) { mw := io.MultiWriter(s, s.uploader) done := make(chan error) var err error @@ -417,6 +418,7 @@ outer: if err == nil { _, updateErr := s.store.SetAudioUploaded(ctx, store.SetAudioUploadedParams{ + ID: mediaSetID, AudioS3Bucket: sqlString(s.s3Bucket), AudioS3Key: sqlString(s.s3Key), AudioFrames: sqlInt64(framesUploaded), @@ -450,3 +452,29 @@ func sqlString(s string) sql.NullString { func sqlInt64(i int64) sql.NullInt64 { return sql.NullInt64{Int64: i, Valid: true} } + +type ModuloBufReader struct { + io.ReadCloser + + buf bytes.Buffer + modSize int +} + +func NewModuloBufReader(r io.ReadCloser, modSize int) *ModuloBufReader { + return &ModuloBufReader{ReadCloser: r, modSize: modSize} +} + +func (r *ModuloBufReader) Read(p []byte) (int, error) { + // err is always io.EOF or nil + nr1, _ := r.buf.Read(p) + nr2, err := r.ReadCloser.Read(p[nr1:]) + + nr := nr1 + nr2 + rem := nr % r.modSize + if rem != 0 { + // err is always nil + _, _ = r.buf.Write(p[nr-rem:]) + } + + return nr - rem, err +} diff --git a/backend/media/service_test.go b/backend/media/service_test.go new file mode 100644 index 0000000..8c44c94 --- /dev/null +++ b/backend/media/service_test.go @@ -0,0 +1,69 @@ +package media_test + +import ( + "io" + "testing" + + "git.netflux.io/rob/clipper/media" + "github.com/stretchr/testify/assert" +) + +type testReader struct { + count int + data [][]byte +} + +func (r *testReader) Read(p []byte) (int, error) { + if r.count == len(r.data) { + return 0, io.EOF + } + n := copy(p, r.data[r.count]) + r.count++ + return n, nil +} + +func TestModuloBufReader(t *testing.T) { + reader := testReader{ + data: [][]byte{ + {'a', 'b', 'c', 'd'}, + {'e', 'f', 'g', 'h', 'i'}, + {'j', 'k', 'l', 'm'}, + {'n', 'o', 'p'}, + {'q', 'r', 's', 't'}, + }, + } + + modReader := media.NewModuloBufReader(io.NopCloser(&reader), 4) + + out := make([]byte, 5) + + n, err := modReader.Read(out) + assert.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, []byte{'a', 'b', 'c', 'd'}, out[:n]) + + n, err = modReader.Read(out) + assert.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, []byte{'e', 'f', 'g', 'h'}, out[:n]) + + n, err = modReader.Read(out) + assert.NoError(t, err) + assert.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, []byte{'i', 'j', 'k', 'l'}, out[:n]) + + n, err = modReader.Read(out) + assert.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, []byte{'m', 'n', 'o', 'p'}, out[:n]) + + n, err = modReader.Read(out) + assert.NoError(t, err) + assert.Equal(t, 4, n) + assert.Equal(t, []byte{'q', 'r', 's', 't'}, out[:n]) + + n, err = modReader.Read(out) + assert.Zero(t, n) + assert.Equal(t, io.EOF, err) +} diff --git a/backend/sql/queries.sql b/backend/sql/queries.sql index caf7298..cea7f0b 100644 --- a/backend/sql/queries.sql +++ b/backend/sql/queries.sql @@ -11,5 +11,6 @@ INSERT INTO media_sets (youtube_id, audio_youtube_itag, audio_channels, audio_fr -- name: SetAudioUploaded :one UPDATE media_sets - SET audio_s3_bucket = $1, audio_s3_key = $2, audio_frames = $3, audio_s3_uploaded_at = NOW(), updated_at = NOW() + SET audio_s3_bucket = $2, audio_s3_key = $3, audio_frames = $4, audio_s3_uploaded_at = NOW(), updated_at = NOW() + WHERE id = $1 RETURNING *; diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index d967e2c..0d6c784 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -20,8 +20,6 @@ import { SeekBar } from './SeekBar'; import './App.css'; import { Duration } from './generated/google/protobuf/duration'; -const grpcHost = 'http://localhost:8888'; - // ported from backend, where should they live? const thumbnailWidth = 177; const thumbnailHeight = 100; @@ -49,7 +47,7 @@ function App(): JSX.Element { // fetch mediaset on page load: useEffect(() => { (async function () { - const rpc = new GrpcWebImpl('http://localhost:8888', {}); + const rpc = newRPC(); const service = new MediaSetServiceClientImpl(rpc); const mediaSet = await service.Get({ youtubeId: videoID }); @@ -224,3 +222,9 @@ function millisFromDuration(dur?: Duration): number { } return Math.floor(dur.seconds * 1000.0 + dur.nanos / 1000.0 / 1000.0); } + +const grpcHost = 'http://localhost:8888'; + +export function newRPC(): GrpcWebImpl { + return new GrpcWebImpl(grpcHost, {}); +} diff --git a/frontend/src/Overview.tsx b/frontend/src/Overview.tsx index 299d387..54137d1 100644 --- a/frontend/src/Overview.tsx +++ b/frontend/src/Overview.tsx @@ -1,6 +1,10 @@ import { useState, useEffect, useRef, MouseEvent } from 'react'; -import { MediaSet } from './generated/media_set'; -import { Frames } from './App'; +import { + MediaSetServiceClientImpl, + MediaSet, + GetAudioProgress, +} from './generated/media_set'; +import { Frames, newRPC } from './App'; import { WaveformCanvas } from './WaveformCanvas'; import { mouseEventToCanvasX } from './Helpers'; import { secsToCanvasX } from './Helpers'; @@ -50,6 +54,17 @@ export const Overview: React.FC = ({ return; } + console.log('fetching audio...'); + const service = new MediaSetServiceClientImpl(newRPC()); + const observable = service.GetAudio({ id: mediaSet.id, numBins: 2_000 }); + + console.log('calling forEach...'); + await observable.forEach((progress: GetAudioProgress) => { + console.log('got progress', progress.percentCompleted); + }); + + console.log('done'); + // const resp = await fetch( // `http://localhost:8888/api/media_sets/${mediaSet.id}/peaks?start=0&end=${mediaSet.audioFrames}&bins=${CanvasLogicalWidth}` // );