From 149647362018ee5bb07a82972b2582072a0469ae Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Mon, 8 Nov 2021 14:56:25 +0100 Subject: [PATCH] Improve gRPC error handling, handle cancelling ffmpegReader --- backend/media/ffmpeg_reader.go | 7 ++++++ backend/media/service.go | 8 ++++++- backend/server/server.go | 41 ++++++++++++++++++++++++---------- 3 files changed, 43 insertions(+), 13 deletions(-) diff --git a/backend/media/ffmpeg_reader.go b/backend/media/ffmpeg_reader.go index 6ca05ca..71ad10b 100644 --- a/backend/media/ffmpeg_reader.go +++ b/backend/media/ffmpeg_reader.go @@ -33,6 +33,13 @@ func newFfmpegReader(ctx context.Context, input io.Reader, arg ...string) (*ffmp return &ffmpegReader{ReadCloser: r, cmd: cmd}, nil } +func (r *ffmpegReader) Cancel() error { + if err := r.cmd.Process.Kill(); err != nil { + return fmt.Errorf("error killing ffmpeg process: %v", err) + } + return nil +} + func (r *ffmpegReader) Close() error { state, err := r.cmd.Process.Wait() diff --git a/backend/media/service.go b/backend/media/service.go index a58aecf..0c0bb93 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -377,7 +377,7 @@ func (s *MediaSetService) getAudioFromYoutube(ctx context.Context, mediaSet stor type getAudioFromYoutubeState struct { *fetchAudioProgressReader - ffmpegReader io.ReadCloser + ffmpegReader *ffmpegReader uploader *multipartUploadWriter s3Bucket, s3Key string store Store @@ -404,6 +404,12 @@ outer: } } + if err != nil { + if cancelErr := s.ffmpegReader.Cancel(); cancelErr != nil { + log.Printf("error cancelling ffmpegreader: %v", cancelErr) + } + } + if readerErr := s.ffmpegReader.Close(); readerErr != nil { if err == nil { err = readerErr diff --git a/backend/server/server.go b/backend/server/server.go index 02d0291..ba6626a 100644 --- a/backend/server/server.go +++ b/backend/server/server.go @@ -17,25 +17,43 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" ) -type ResponseError struct { - error +func init() { + grpclog.SetLogger(log.New(os.Stdout, "server: ", log.LstdFlags)) +} - ResponseCode codes.Code +const ( + // ts-proto generates code that automatically retries for a subset of gRPC + // response codes. To avoid invoking this behaviour, default to returning a + // Cancelled code. Later, + // See https://github.com/stephenh/ts-proto/blob/459b94f5b2988d58d186461332e888c3e511603a/src/generate-grpc-web.ts#L293 + // and https://github.com/stephenh/ts-proto/pull/131. + defaultResponseCode = codes.Canceled + defaultResponseMessage = "An unexpected error occurred" +) + +type ResponseError struct { + err error + s string } func (r *ResponseError) Error() string { - return fmt.Sprintf("An unexpected error occurred: %v (error code = %d).", r.error.Error(), r.ResponseCode) + return fmt.Sprintf("unexpected error: %v", r.err.Error()) } func (r *ResponseError) Unwrap() error { - return r.error + return r.err } -func newResponseError(err error, code codes.Code) *ResponseError { - return &ResponseError{error: err, ResponseCode: code} +func (r *ResponseError) GRPCStatus() *status.Status { + return status.New(defaultResponseCode, r.s) +} + +func newResponseError(err error) *ResponseError { + return &ResponseError{err: err, s: defaultResponseMessage} } type Options struct { @@ -61,7 +79,7 @@ type mediaSetServiceController struct { func (c *mediaSetServiceController) Get(ctx context.Context, request *pbMediaSet.GetRequest) (*pbMediaSet.MediaSet, error) { mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId()) if err != nil { - return nil, newResponseError(err, codes.Unknown) + return nil, newResponseError(err) } result := pbMediaSet.MediaSet{ @@ -88,12 +106,12 @@ func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest id, err := uuid.Parse(request.GetId()) if err != nil { - return newResponseError(err, codes.Unknown) + return newResponseError(err) } reader, err := c.mediaSetService.GetAudio(ctx, id, int(request.GetNumBins())) if err != nil { - return newResponseError(err, codes.Unknown) + return newResponseError(err) } for { @@ -102,7 +120,7 @@ func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest if err == io.EOF { break } - return newResponseError(err, codes.Unknown) + return newResponseError(err) } // TODO: consider using int32 throughout the backend flow to avoid this. @@ -128,7 +146,6 @@ func Start(options Options) error { fetchMediaSetService := media.NewMediaSetService(options.Store, options.YoutubeClient, options.S3Client) pbMediaSet.RegisterMediaSetServiceServer(grpcServer, &mediaSetServiceController{mediaSetService: fetchMediaSetService}) - grpclog.SetLogger(log.New(os.Stdout, "server: ", log.LstdFlags)) // TODO: proper CORS support grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true }))