From 542080e400e10bfe2278f4cfc1c6c5819600272f Mon Sep 17 00:00:00 2001
From: Rob Watson
Date: Wed, 17 Nov 2021 18:53:27 +0100
Subject: [PATCH] Implement GetAudioSegment, add panic recovery handler

---
 backend/cmd/clipper/main.go |  1 +
 backend/media/service.go    | 59 ++++++++++++++++++------------
 backend/server/server.go    | 71 ++++++++++++++++++++++++++++++-------
 frontend/src/App.tsx        | 46 +++++-------------------
 frontend/src/Waveform.tsx   | 33 +++++++++++------
 5 files changed, 125 insertions(+), 85 deletions(-)

diff --git a/backend/cmd/clipper/main.go b/backend/cmd/clipper/main.go
index 712a021..d77f87d 100644
--- a/backend/cmd/clipper/main.go
+++ b/backend/cmd/clipper/main.go
@@ -46,6 +46,7 @@ func main() {
 	var youtubeClient youtube.Client
 
 	serverOptions := server.Options{
+		Environment: server.Development,
 		BindAddr:    DefaultHTTPBindAddr,
 		Timeout:     DefaultTimeout,
 		Store:       store,
diff --git a/backend/media/service.go b/backend/media/service.go
index 461748f..44f1f55 100644
--- a/backend/media/service.go
+++ b/backend/media/service.go
@@ -422,7 +422,7 @@ func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID
 	}
 }
-func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, numBins int) ([][]int16, error) {
+func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, numBins int) ([]int16, error) {
 	mediaSet, err := s.store.GetMediaSet(ctx, id)
 	if err != nil {
 		return nil, fmt.Errorf("error getting media set: %v", err)
 	}
@@ -444,51 +444,64 @@ func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, sta
 	}
 	defer output.Body.Close()
 
-	modReader := NewModuloBufReader(output.Body, int(mediaSet.AudioChannels)*SizeOfInt16)
-	bufSizeBytes := 8_192
-	buf := make([]byte, bufSizeBytes)
-
-	peaks := make([][]int16, mediaSet.AudioChannels)
-	for i := range peaks {
-		peaks[i] = make([]int16, numBins)
-	}
-	var currPeakIndex int
-	var currFrame int64
-
+	const readBufSizeBytes = 8_192
 	channels := int(mediaSet.AudioChannels)
+	modReader := NewModuloBufReader(output.Body, channels*SizeOfInt16)
+	readBuf := make([]byte, readBufSizeBytes)
+	peaks := make([]int16, channels*numBins)
 	totalFrames := endFrame - startFrame
 	framesPerBin := totalFrames / int64(numBins)
+	sampleBuf := make([]int16, readBufSizeBytes/SizeOfInt16)
+	bytesExpected := (endFrame - startFrame) * int64(channels) * SizeOfInt16
 
-	samples := make([]int16, bufSizeBytes/SizeOfInt16)
+	var (
+		bytesRead     int64
+		closing       bool
+		currPeakIndex int
+		currFrame     int64
+	)
 
 	for {
-		n, err := modReader.Read(buf)
-		if err != nil {
-			if err == io.EOF {
-				break
-			}
+		n, err := modReader.Read(readBuf)
+		if err == io.EOF {
+			closing = true
+		} else if err != nil {
 			return nil, fmt.Errorf("read error: %v", err)
 		}
 
-		if err := binary.Read(bytes.NewReader(buf[:n]), binary.LittleEndian, samples); err != nil {
+		bytesRead += int64(n)
+		samples := sampleBuf[:n/SizeOfInt16]
+
+		if err := binary.Read(bytes.NewReader(readBuf[:n]), binary.LittleEndian, samples); err != nil {
 			return nil, fmt.Errorf("error interpreting samples: %v", err)
 		}
 
 		for i := 0; i < len(samples); i += channels {
 			for j := 0; j < channels; j++ {
-				samp := samples[i+j]
+				samp := sampleBuf[i+j]
 				if samp < 0 {
 					samp = -samp
 				}
-				if samp > peaks[currPeakIndex][j] {
-					peaks[currPeakIndex][j] = samp
+				if samp > peaks[currPeakIndex+j] {
+					peaks[currPeakIndex+j] = samp
 				}
 			}
-			currFrame++
+
 			if currFrame == framesPerBin {
 				currFrame = 0
+				currPeakIndex += channels
+			} else {
+				currFrame++
 			}
 		}
+
+		if closing {
+			break
+		}
 	}
 
+	if bytesRead < bytesExpected {
+		s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioS3Key.String).Info("short read from S3")
+	}
+
 	return peaks, nil
diff --git a/backend/server/server.go b/backend/server/server.go
index a723049..5c90e4b 100644
--- a/backend/server/server.go
+++ b/backend/server/server.go
@@ -10,7 +10,9 @@ import (
 	pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set"
 	"git.netflux.io/rob/clipper/media"
 	"github.com/google/uuid"
+	grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
 	grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
+	grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
 	"github.com/improbable-eng/grpc-web/go/grpcweb"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
@@ -55,7 +57,15 @@ func newResponseError(err error) *ResponseError {
 	return &ResponseError{err: err, s: defaultResponseMessage}
 }
 
+type Environment int
+
+const (
+	Development Environment = iota
+	Production
+)
+
 type Options struct {
+	Environment Environment
 	BindAddr    string
 	Timeout     time.Duration
 	Store       media.Store
@@ -120,7 +130,6 @@ func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest
 			return newResponseError(err)
 		}
-		// TODO: consider using int32 throughout the backend flow to avoid this.
 		peaks := make([]int32, len(progress.Peaks))
 		for i, p := range progress.Peaks {
 			peaks[i] = int32(p)
 		}
@@ -148,38 +157,74 @@ func (c *mediaSetServiceController) GetAudioSegment(ctx context.Context, request
 		return nil, newResponseError(err)
 	}
 
-	_, err = c.mediaSetService.GetAudioSegment(ctx, id, request.StartFrame, request.EndFrame, int(request.GetNumBins()))
+	peaks, err := c.mediaSetService.GetAudioSegment(ctx, id, request.StartFrame, request.EndFrame, int(request.GetNumBins()))
 	if err != nil {
 		return nil, newResponseError(err)
 	}
 
-	return nil, nil
+	peaks32 := make([]int32, len(peaks))
+	for i, p := range peaks {
+		peaks32[i] = int32(p)
+	}
+
+	response := pbMediaSet.GetAudioSegmentResponse{
+		Peaks: peaks32,
+	}
+
+	return &response, nil
 }
 
 func Start(options Options) error {
-	logger, _ := zap.NewDevelopment()
+	logger, err := buildLogger(options.Environment)
+	if err != nil {
+		return fmt.Errorf("error building logger: %v", err)
+	}
 	defer logger.Sync()
 
-	grpcServer := grpc.NewServer(
-		grpc.UnaryInterceptor(grpczap.UnaryServerInterceptor(logger)),
-	)
-
 	fetchMediaSetService := media.NewMediaSetService(options.Store, options.YoutubeClient, options.S3Client, logger)
+
+	grpcServer := buildGRPCServer(options, logger)
 	pbMediaSet.RegisterMediaSetServiceServer(grpcServer, &mediaSetServiceController{mediaSetService: fetchMediaSetService})
 
 	// TODO: configure CORS
 	grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true }))
 
-	handler := func(w http.ResponseWriter, r *http.Request) {
-		grpcWebServer.ServeHTTP(w, r)
-	}
-
 	httpServer := http.Server{
 		Addr:         options.BindAddr,
 		ReadTimeout:  options.Timeout,
 		WriteTimeout: options.Timeout,
-		Handler:      http.HandlerFunc(handler),
+		Handler:      http.HandlerFunc(grpcWebServer.ServeHTTP),
 	}
 
 	return httpServer.ListenAndServe()
 }
+
+func buildLogger(env Environment) (*zap.Logger, error) {
+	if env == Production {
+		return zap.NewProduction()
+	}
+	return zap.NewDevelopment()
+}
+
+func buildGRPCServer(options Options, logger *zap.Logger) *grpc.Server {
+	unaryInterceptors := []grpc.UnaryServerInterceptor{
+		grpczap.UnaryServerInterceptor(logger),
+	}
+	streamInterceptors := []grpc.StreamServerInterceptor{
+		grpczap.StreamServerInterceptor(logger),
+	}
+	if options.Environment == Production {
+		panicOpts := []grpcrecovery.Option{
+			grpcrecovery.WithRecoveryHandler(func(p interface{}) error {
+				return newResponseError(fmt.Errorf("%v", p))
+			}),
+		}
+		unaryInterceptors = append(unaryInterceptors, grpcrecovery.UnaryServerInterceptor(panicOpts...))
+		streamInterceptors = append(streamInterceptors, grpcrecovery.StreamServerInterceptor(panicOpts...))
+	}
+
+	return grpc.NewServer(
+		grpc.StreamInterceptor(grpcmiddleware.ChainStreamServer(streamInterceptors...)),
+		grpc.UnaryInterceptor(grpcmiddleware.ChainUnaryServer(unaryInterceptors...)),
+	)
+}
diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx
index 0d6c784..b79d1e9 100644
--- a/frontend/src/App.tsx
+++ b/frontend/src/App.tsx
@@ -24,6 +24,8 @@ import { Duration } from './generated/google/protobuf/duration';
 const thumbnailWidth = 177;
 const thumbnailHeight = 100;
 
+const initialViewportSeconds = 10;
+
 // Frames represents a selection of audio frames.
 export interface Frames {
   start: number;
@@ -52,44 +54,7 @@ function App(): JSX.Element {
       const mediaSet = await service.Get({ youtubeId: videoID });
       console.log('got media set:', mediaSet);
 
       setMediaSet(mediaSet);
-
-      // const handleProgress = (progress: GetAudioProgress) => {
-      //   console.log('got progress', progress);
-      // };
-
-      // const audioRequest = new GetAudioRequest();
-      // audioRequest.setId(videoID);
-      // audioRequest.setNumBins(1000);
-      // GetMediaSetAudio(grpcHost, audioRequest, handleProgress);
-
-      // console.log('fetching media...');
-      // const resp = await fetch(
-      //   `http://localhost:8888/api/media_sets/${videoID}`
-      // );
-      // const respBody = await resp.json();
-      // if (respBody.error) {
-      //   console.log('error fetching media set:', respBody.error);
-      //   return;
-      // }
-
-      // const mediaSet = {
-      //   id: respBody.id,
-      //   source: respBody.source,
-      //   audio: {
-      //     sampleRate: respBody.audio.sample_rate,
-      //     bytes: respBody.audio.bytes,
-      //     frames: respBody.audio.frames,
-      //     channels: respBody.audio.channels,
-      //   },
-      //   video: {
-      //     bytes: respBody.video.bytes,
-      //     thumbnailWidth: respBody.video.thumbnail_width,
-      //     thumbnailHeight: respBody.video.thumbnail_height,
-      //     durationMillis: Math.floor(respBody.video.duration / 1000 / 1000),
-      //   },
-      // };
-      // setMediaSet(mediaSet);
     })();
   }, []);
 
@@ -120,7 +85,12 @@ function App(): JSX.Element {
       return;
     }
 
-    setViewport({ start: 0, end: mediaSet.audioFrames });
+    const numFrames = Math.min(
+      mediaSet.audioSampleRate * initialViewportSeconds,
+      mediaSet.audioFrames
+    );
+
+    setViewport({ start: 0, end: numFrames });
   }, [mediaSet]);
 
   useEffect(() => {
diff --git a/frontend/src/Waveform.tsx b/frontend/src/Waveform.tsx
index 4be9b60..ed991ac 100644
--- a/frontend/src/Waveform.tsx
+++ b/frontend/src/Waveform.tsx
@@ -1,9 +1,10 @@
 import { useEffect, useState, useRef } from 'react';
-import { Frames } from './App';
-import { MediaSet } from './generated/media_set';
+import { Frames, newRPC } from './App';
+import { MediaSetServiceClientImpl, MediaSet } from './generated/media_set';
 import { WaveformCanvas } from './WaveformCanvas';
 import { secsToCanvasX } from './Helpers';
 import { from, Observable } from 'rxjs';
+import { bufferCount } from 'rxjs/operators';
 
 interface Props {
   mediaSet: MediaSet;
@@ -33,18 +34,28 @@ export const Waveform: React.FC = ({
       return;
     }
 
-      let endFrame = viewport.end;
-      if (endFrame <= viewport.start) {
-        endFrame = mediaSet.audioFrames;
+      if (viewport.start >= viewport.end) {
+        return;
       }
 
-      // const resp = await fetch(
-      //   `http://localhost:8888/api/media_sets/${mediaSet.id}/peaks?start=${viewport.start}&end=${endFrame}&bins=${CanvasLogicalWidth}`
-      // );
-      // const newPeaks = await resp.json();
-      // setPeaks(newPeaks);
+      console.log('fetch audio segment...');
+
+      const service = new MediaSetServiceClientImpl(newRPC());
+      const segment = await service.GetAudioSegment({
+        id: mediaSet.id,
+        numBins: CanvasLogicalWidth,
+        startFrame: viewport.start,
+        endFrame: viewport.end,
+      });
+
+      console.log('got segment', segment);
+
+      const peaks = from(segment.peaks).pipe(
+        bufferCount(mediaSet.audioChannels)
+      );
+      setPeaks(peaks);
     })();
-  }, [mediaSet, viewport]);
+  }, [viewport]);
 
   // render HUD
   useEffect(() => {
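
Note on the data layout introduced above: GetAudioSegment now returns one flattened []int16 of peaks instead of a slice per channel. The slice is bin-major, one peak per channel for each bin (peaks[currPeakIndex+j], with currPeakIndex advancing by channels per bin), which is why Waveform.tsx regroups the response with bufferCount(mediaSet.audioChannels). The Go sketch below illustrates that layout; regroupPeaks is a hypothetical helper used only for illustration and is not part of this patch.

package main

import "fmt"

// regroupPeaks splits the flattened, bin-major peak slice returned by
// GetAudioSegment back into one slice per channel. Hypothetical helper,
// assuming the layout [bin0ch0, bin0ch1, bin1ch0, bin1ch1, ...].
func regroupPeaks(peaks []int16, channels int) [][]int16 {
	out := make([][]int16, channels)
	for i, p := range peaks {
		out[i%channels] = append(out[i%channels], p)
	}
	return out
}

func main() {
	// Two channels, three bins of interleaved peak values.
	peaks := []int16{100, 90, 250, 240, 80, 75}
	for ch, chPeaks := range regroupPeaks(peaks, 2) {
		fmt.Printf("channel %d: %v\n", ch, chPeaks)
	}
	// Output:
	// channel 0: [100 250 80]
	// channel 1: [90 240 75]
}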