Implement GetAudioSegment, add panic recovery handler

Rob Watson 2021-11-17 18:53:27 +01:00
parent 8794b42459
commit 542080e400
5 changed files with 125 additions and 85 deletions

View File

@@ -46,6 +46,7 @@ func main() {
var youtubeClient youtube.Client
serverOptions := server.Options{
Environment: server.Development,
BindAddr: DefaultHTTPBindAddr,
Timeout: DefaultTimeout,
Store: store,

View File

@@ -422,7 +422,7 @@ func (s *getAudioFromYoutubeState) run(ctx context.Context, mediaSetID uuid.UUID
}
}
func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, numBins int) ([][]int16, error) {
func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, numBins int) ([]int16, error) {
mediaSet, err := s.store.GetMediaSet(ctx, id)
if err != nil {
return nil, fmt.Errorf("error getting media set: %v", err)
@@ -444,51 +444,64 @@ func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, sta
}
defer output.Body.Close()
modReader := NewModuloBufReader(output.Body, int(mediaSet.AudioChannels)*SizeOfInt16)
bufSizeBytes := 8_192
buf := make([]byte, bufSizeBytes)
peaks := make([][]int16, mediaSet.AudioChannels)
for i := range peaks {
peaks[i] = make([]int16, numBins)
}
var currPeakIndex int
var currFrame int64
const readBufSizeBytes = 8_192
channels := int(mediaSet.AudioChannels)
modReader := NewModuloBufReader(output.Body, channels*SizeOfInt16)
readBuf := make([]byte, readBufSizeBytes)
peaks := make([]int16, channels*numBins)
totalFrames := endFrame - startFrame
framesPerBin := totalFrames / int64(numBins)
sampleBuf := make([]int16, readBufSizeBytes/SizeOfInt16)
bytesExpected := (endFrame - startFrame) * int64(channels) * SizeOfInt16
samples := make([]int16, bufSizeBytes/SizeOfInt16)
var (
bytesRead int64
closing bool
currPeakIndex int
currFrame int64
)
for {
n, err := modReader.Read(buf)
if err != nil {
n, err := modReader.Read(readBuf)
if err == io.EOF {
break
}
closing = true
} else if err != nil {
return nil, fmt.Errorf("read error: %v", err)
}
if err := binary.Read(bytes.NewReader(buf[:n]), binary.LittleEndian, samples); err != nil {
bytesRead += int64(n)
samples := sampleBuf[:n/SizeOfInt16]
if err := binary.Read(bytes.NewReader(readBuf[:n]), binary.LittleEndian, samples); err != nil {
return nil, fmt.Errorf("error interpreting samples: %v", err)
}
for i := 0; i < len(samples); i += channels {
for j := 0; j < channels; j++ {
samp := samples[i+j]
samp := sampleBuf[i+j]
if samp < 0 {
samp = -samp
}
if samp > peaks[currPeakIndex][j] {
peaks[currPeakIndex][j] = samp
if samp > peaks[currPeakIndex+j] {
peaks[currPeakIndex+j] = samp
}
}
currFrame++
if currFrame == framesPerBin {
currFrame = 0
currPeakIndex += channels
} else {
currFrame++
}
}
if closing {
break
}
}
if bytesRead < bytesExpected {
s.logger.With("startFrame", startFrame, "endFrame", endFrame, "got", bytesRead, "want", bytesExpected, "key", mediaSet.AudioS3Key.String).Info("short read from S3")
}
return peaks, nil
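The return type changes from [][]int16 (one inner slice per channel) to a single flat []int16: the peaks for each bin are stored contiguously per channel, so the peak for bin b, channel c lives at index b*channels + c. A minimal sketch of how a caller could regroup the flat slice, assuming that layout (groupPeaks is a hypothetical helper, not part of this commit):

// groupPeaks is a hypothetical helper illustrating the flattened layout
// returned by GetAudioSegment: numBins consecutive groups of `channels`
// values, so the peak for bin b, channel c is peaks[b*channels+c].
func groupPeaks(peaks []int16, channels int) [][]int16 {
	grouped := make([][]int16, 0, len(peaks)/channels)
	for i := 0; i+channels <= len(peaks); i += channels {
		grouped = append(grouped, peaks[i:i+channels])
	}
	return grouped
}

The frontend performs the equivalent regrouping with bufferCount(mediaSet.audioChannels) in the Waveform component further down.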

View File

@@ -10,7 +10,9 @@ import (
pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set"
"git.netflux.io/rob/clipper/media"
"github.com/google/uuid"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"go.uber.org/zap"
"google.golang.org/grpc"
@@ -55,7 +57,15 @@ func newResponseError(err error) *ResponseError {
return &ResponseError{err: err, s: defaultResponseMessage}
}
type Environment int
const (
Development Environment = iota
Production
)
type Options struct {
Environment Environment
BindAddr string
Timeout time.Duration
Store media.Store
@@ -120,7 +130,6 @@ func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest
return newResponseError(err)
}
// TODO: consider using int32 throughout the backend flow to avoid this.
peaks := make([]int32, len(progress.Peaks))
for i, p := range progress.Peaks {
peaks[i] = int32(p)
@@ -148,38 +157,74 @@ func (c *mediaSetServiceController) GetAudioSegment(ctx context.Context, request
return nil, newResponseError(err)
}
_, err = c.mediaSetService.GetAudioSegment(ctx, id, request.StartFrame, request.EndFrame, int(request.GetNumBins()))
peaks, err := c.mediaSetService.GetAudioSegment(ctx, id, request.StartFrame, request.EndFrame, int(request.GetNumBins()))
if err != nil {
return nil, newResponseError(err)
}
return nil, nil
peaks32 := make([]int32, len(peaks))
for i, p := range peaks {
peaks32[i] = int32(p)
}
response := pbMediaSet.GetAudioSegmentResponse{
Peaks: peaks32,
}
return &response, nil
}
func Start(options Options) error {
logger, _ := zap.NewDevelopment()
logger, err := buildLogger(options.Environment)
if err != nil {
return fmt.Errorf("error building logger: %v", err)
}
defer logger.Sync()
grpcServer := grpc.NewServer(
grpc.UnaryInterceptor(grpczap.UnaryServerInterceptor(logger)),
)
fetchMediaSetService := media.NewMediaSetService(options.Store, options.YoutubeClient, options.S3Client, logger)
grpcServer := buildGRPCServer(options, logger)
pbMediaSet.RegisterMediaSetServiceServer(grpcServer, &mediaSetServiceController{mediaSetService: fetchMediaSetService})
// TODO: configure CORS
grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true }))
handler := func(w http.ResponseWriter, r *http.Request) {
grpcWebServer.ServeHTTP(w, r)
}
httpServer := http.Server{
Addr: options.BindAddr,
ReadTimeout: options.Timeout,
WriteTimeout: options.Timeout,
Handler: http.HandlerFunc(handler),
Handler: http.HandlerFunc(grpcWebServer.ServeHTTP),
}
return httpServer.ListenAndServe()
}
func buildLogger(env Environment) (*zap.Logger, error) {
if env == Production {
return zap.NewProduction()
}
return zap.NewDevelopment()
}
func buildGRPCServer(options Options, logger *zap.Logger) *grpc.Server {
unaryInterceptors := []grpc.UnaryServerInterceptor{
grpczap.UnaryServerInterceptor(logger),
}
streamInterceptors := []grpc.StreamServerInterceptor{
grpczap.StreamServerInterceptor(logger),
}
if options.Environment == Production {
panicOpts := []grpcrecovery.Option{
grpcrecovery.WithRecoveryHandler(func(p interface{}) error {
return newResponseError(fmt.Errorf("%v", p))
}),
}
unaryInterceptors = append(unaryInterceptors, grpcrecovery.UnaryServerInterceptor(panicOpts...))
streamInterceptors = append(streamInterceptors, grpcrecovery.StreamServerInterceptor(panicOpts...))
}
return grpc.NewServer(
grpc.StreamInterceptor(grpcmiddleware.ChainStreamServer(streamInterceptors...)),
grpc.UnaryInterceptor(grpcmiddleware.ChainUnaryServer(unaryInterceptors...)),
)
}
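When Environment is Production, the recovery interceptors convert a panic inside a handler into an error response instead of letting it crash the process. A minimal, self-contained sketch of that behaviour, calling the unary interceptor directly with a handler that panics (the method name is illustrative and fmt.Errorf stands in for newResponseError; this is not part of the commit):

package main

import (
	"context"
	"fmt"

	grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
	"google.golang.org/grpc"
)

func main() {
	// Build a unary interceptor with a recovery handler, as buildGRPCServer does.
	interceptor := grpcrecovery.UnaryServerInterceptor(
		grpcrecovery.WithRecoveryHandler(func(p interface{}) error {
			return fmt.Errorf("recovered from panic: %v", p)
		}),
	)

	// Invoke the interceptor with a handler that panics; the panic is
	// recovered and surfaced as an error rather than terminating the server.
	_, err := interceptor(
		context.Background(),
		nil,
		&grpc.UnaryServerInfo{FullMethod: "/media_set.MediaSetService/Demo"},
		func(ctx context.Context, req interface{}) (interface{}, error) {
			panic("boom")
		},
	)

	fmt.Println(err) // recovered from panic: boom
}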

View File

@@ -24,6 +24,8 @@ import { Duration } from './generated/google/protobuf/duration';
const thumbnailWidth = 177;
const thumbnailHeight = 100;
const initialViewportSeconds = 10;
// Frames represents a selection of audio frames.
export interface Frames {
start: number;
@@ -52,44 +54,7 @@ function App(): JSX.Element {
const mediaSet = await service.Get({ youtubeId: videoID });
console.log('got media set:', mediaSet);
setMediaSet(mediaSet);
// const handleProgress = (progress: GetAudioProgress) => {
// console.log('got progress', progress);
// };
// const audioRequest = new GetAudioRequest();
// audioRequest.setId(videoID);
// audioRequest.setNumBins(1000);
// GetMediaSetAudio(grpcHost, audioRequest, handleProgress);
// console.log('fetching media...');
// const resp = await fetch(
// `http://localhost:8888/api/media_sets/${videoID}`
// );
// const respBody = await resp.json();
// if (respBody.error) {
// console.log('error fetching media set:', respBody.error);
// return;
// }
// const mediaSet = {
// id: respBody.id,
// source: respBody.source,
// audio: {
// sampleRate: respBody.audio.sample_rate,
// bytes: respBody.audio.bytes,
// frames: respBody.audio.frames,
// channels: respBody.audio.channels,
// },
// video: {
// bytes: respBody.video.bytes,
// thumbnailWidth: respBody.video.thumbnail_width,
// thumbnailHeight: respBody.video.thumbnail_height,
// durationMillis: Math.floor(respBody.video.duration / 1000 / 1000),
// },
// };
// setMediaSet(mediaSet);
})();
}, []);
@@ -120,7 +85,12 @@ function App(): JSX.Element {
return;
}
setViewport({ start: 0, end: mediaSet.audioFrames });
const numFrames = Math.min(
mediaSet.audioSampleRate * initialViewportSeconds,
mediaSet.audioFrames
);
setViewport({ start: 0, end: numFrames });
}, [mediaSet]);
useEffect(() => {

View File

@@ -1,9 +1,10 @@
import { useEffect, useState, useRef } from 'react';
import { Frames } from './App';
import { MediaSet } from './generated/media_set';
import { Frames, newRPC } from './App';
import { MediaSetServiceClientImpl, MediaSet } from './generated/media_set';
import { WaveformCanvas } from './WaveformCanvas';
import { secsToCanvasX } from './Helpers';
import { from, Observable } from 'rxjs';
import { bufferCount } from 'rxjs/operators';
interface Props {
mediaSet: MediaSet;
@@ -33,18 +34,28 @@ export const Waveform: React.FC<Props> = ({
return;
}
let endFrame = viewport.end;
if (endFrame <= viewport.start) {
endFrame = mediaSet.audioFrames;
if (viewport.start >= viewport.end) {
return;
}
// const resp = await fetch(
// `http://localhost:8888/api/media_sets/${mediaSet.id}/peaks?start=${viewport.start}&end=${endFrame}&bins=${CanvasLogicalWidth}`
// );
// const newPeaks = await resp.json();
// setPeaks(newPeaks);
console.log('fetch audio segment...');
const service = new MediaSetServiceClientImpl(newRPC());
const segment = await service.GetAudioSegment({
id: mediaSet.id,
numBins: CanvasLogicalWidth,
startFrame: viewport.start,
endFrame: viewport.end,
});
console.log('got segment', segment);
const peaks = from(segment.peaks).pipe(
bufferCount(mediaSet.audioChannels)
);
setPeaks(peaks);
})();
}, [mediaSet, viewport]);
}, [viewport]);
// render HUD
useEffect(() => {