package server

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os/exec"
	"time"

	"git.netflux.io/rob/clipper/config"
	pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set"
	"git.netflux.io/rob/clipper/media"
	"github.com/google/uuid"
	grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
	grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
	grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
	"github.com/improbable-eng/grpc-web/go/grpcweb"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/durationpb"
)

const (
	// ts-proto generates code that automatically retries for a subset of gRPC
	// response codes. To avoid invoking this behaviour, default to returning a
	// Cancelled code for now.
	// See https://github.com/stephenh/ts-proto/blob/459b94f5b2988d58d186461332e888c3e511603a/src/generate-grpc-web.ts#L293
	// and https://github.com/stephenh/ts-proto/pull/131.
	defaultResponseCode    = codes.Canceled
	defaultResponseMessage = "An unexpected error occurred"
)

// Per-RPC timeouts applied by the controller methods below.
const (
	getPeaksTimeout           = time.Minute * 5
	getPeaksForSegmentTimeout = time.Second * 10
	getAudioSegmentTimeout    = time.Minute * 2
	getVideoTimeout           = time.Minute * 5
)

// ResponseError wraps an internal error so that the gRPC client only ever
// sees a generic status (defaultResponseCode and the message held in s),
// while the underlying error remains available through Unwrap.
type ResponseError struct {
	err error
	s   string
}

// Error returns the wrapped error's message prefixed with "unexpected error: ".
// The wrapped error is formatted with %v so that a ResponseError holding a nil
// error does not panic.
func (r *ResponseError) Error() string {
	return fmt.Sprintf("unexpected error: %v", r.err)
}

// Unwrap returns the wrapped error.
func (r *ResponseError) Unwrap() error { return r.err }

// GRPCStatus returns a status built from defaultResponseCode and the
// client-facing message, never from the wrapped error.
func (r *ResponseError) GRPCStatus() *status.Status {
	return status.New(defaultResponseCode, r.s)
}

// newResponseError wraps err in a ResponseError carrying the default
// client-facing message.
func newResponseError(err error) *ResponseError {
	return &ResponseError{err: err, s: defaultResponseMessage}
}

// Options holds the dependencies and settings passed to Start.
type Options struct {
	Config        config.Config
	Timeout       time.Duration // used as the HTTP server's read and write timeout
	Store         media.Store
	YoutubeClient media.YoutubeClient
	FileStore     media.FileStore
	WorkerPool    *media.WorkerPool
	Logger        *zap.Logger
}

// mediaSetServiceController implements gRPC controller for MediaSetService
type mediaSetServiceController struct {
	pbmediaset.UnimplementedMediaSetServiceServer

	mediaSetService *media.MediaSetService
	logger          *zap.SugaredLogger
}

// Get returns a pbMediaSet.MediaSet
func (c *mediaSetServiceController) Get(ctx context.Context, request *pbmediaset.GetRequest) (*pbmediaset.MediaSet, error) {
	mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId())
	if err != nil {
		return nil, newResponseError(err)
	}

	result := pbmediaset.MediaSet{
		Id:                mediaSet.ID.String(),
		YoutubeId:         mediaSet.YoutubeID,
		AudioChannels:     int32(mediaSet.Audio.Channels),
		AudioFrames:       mediaSet.Audio.Frames,
		AudioApproxFrames: mediaSet.Audio.ApproxFrames,
		AudioSampleRate:   int32(mediaSet.Audio.SampleRate),
		AudioYoutubeItag:  int32(mediaSet.Audio.YoutubeItag),
		AudioMimeType:     mediaSet.Audio.MimeType,
		VideoDuration:     durationpb.New(mediaSet.Video.Duration),
		VideoYoutubeItag:  int32(mediaSet.Video.YoutubeItag),
		VideoMimeType:     mediaSet.Video.MimeType,
	}

	return &result, nil
}

// GetPeaks returns a stream of GetPeaksProgress relating to the entire audio
// part of the MediaSet.
//
// The work runs under a context derived from context.Background() with
// getPeaksTimeout rather than from the stream's own context. Every failure,
// including a failure to send on the stream, is returned as a ResponseError.
func (c *mediaSetServiceController) GetPeaks(request *pbmediaset.GetPeaksRequest, stream pbmediaset.MediaSetService_GetPeaksServer) error {
	// TODO: reduce timeout when fetching from S3
	ctx, cancel := context.WithTimeout(context.Background(), getPeaksTimeout)
	defer cancel()

	id, err := uuid.Parse(request.GetId())
	if err != nil {
		return newResponseError(err)
	}

	reader, err := c.mediaSetService.GetPeaks(ctx, id, int(request.GetNumBins()))
	if err != nil {
		return newResponseError(err)
	}

	for {
		progress, err := reader.Next()
		if err != nil && err != io.EOF {
			return newResponseError(err)
		}

		peaks := make([]int32, len(progress.Peaks))
		for i, p := range progress.Peaks {
			peaks[i] = int32(p)
		}

		progressPb := pbmediaset.GetPeaksProgress{
			PercentComplete: progress.PercentComplete,
			Url:             progress.URL,
			Peaks:           peaks,
		}
		// Stop as soon as an update cannot be delivered instead of silently
		// dropping the error and producing progress nobody will receive.
		if sendErr := stream.Send(&progressPb); sendErr != nil {
			return newResponseError(sendErr)
		}

		// The update that accompanies io.EOF is still sent above; it is the
		// final one.
		if err == io.EOF {
			break
		}
	}

	return nil
}

// GetPeaksForSegment returns a set of peaks for a segment of an audio part of
// a MediaSet.
func (c *mediaSetServiceController) GetPeaksForSegment(ctx context.Context, request *pbmediaset.GetPeaksForSegmentRequest) (*pbmediaset.GetPeaksForSegmentResponse, error) {
	ctx, cancel := context.WithTimeout(ctx, getPeaksForSegmentTimeout)
	defer cancel()

	id, err := uuid.Parse(request.GetId())
	if err != nil {
		return nil, newResponseError(err)
	}

	peaks, err := c.mediaSetService.GetPeaksForSegment(ctx, id, request.StartFrame, request.EndFrame, int(request.GetNumBins()))
	if err != nil {
		return nil, newResponseError(err)
	}

	peaks32 := make([]int32, len(peaks))
	for i, p := range peaks {
		peaks32[i] = int32(p)
	}

	return &pbmediaset.GetPeaksForSegmentResponse{Peaks: peaks32}, nil
}

// GetAudioSegment streams GetAudioSegmentProgress updates for the requested
// frame range of a MediaSet's audio, encoded in the requested format.
//
// An unrecognised format is rejected with a ResponseError. The work runs
// under a context derived from context.Background() with
// getAudioSegmentTimeout.
func (c *mediaSetServiceController) GetAudioSegment(request *pbmediaset.GetAudioSegmentRequest, outStream pbmediaset.MediaSetService_GetAudioSegmentServer) error {
	// Use the timeout dedicated to audio segments; the much shorter
	// getPeaksForSegmentTimeout was previously applied here by mistake.
	ctx, cancel := context.WithTimeout(context.Background(), getAudioSegmentTimeout)
	defer cancel()

	id, err := uuid.Parse(request.GetId())
	if err != nil {
		return newResponseError(err)
	}

	var format media.AudioFormat
	switch request.Format {
	case pbmediaset.AudioFormat_MP3:
		format = media.AudioFormatMP3
	case pbmediaset.AudioFormat_WAV:
		format = media.AudioFormatWAV
	default:
		return newResponseError(errors.New("unknown format"))
	}

	stream, err := c.mediaSetService.GetAudioSegment(ctx, id, request.StartFrame, request.EndFrame, format)
	if err != nil {
		return newResponseError(err)
	}

	for {
		progress, err := stream.Next(ctx)
		if err != nil && err != io.EOF {
			return newResponseError(err)
		}

		progressPb := pbmediaset.GetAudioSegmentProgress{
			PercentComplete: progress.PercentComplete,
			AudioData:       progress.Data,
		}
		// Stop as soon as an update cannot be delivered instead of silently
		// dropping the error.
		if sendErr := outStream.Send(&progressPb); sendErr != nil {
			return newResponseError(sendErr)
		}

		// The update that accompanies io.EOF is still sent above; it is the
		// final one.
		if err == io.EOF {
			break
		}
	}

	return nil
}

// GetVideo streams GetVideoProgress updates for the video part of a MediaSet.
//
// The work runs under a context derived from context.Background() with
// getVideoTimeout rather than from the stream's own context.
func (c *mediaSetServiceController) GetVideo(request *pbmediaset.GetVideoRequest, stream pbmediaset.MediaSetService_GetVideoServer) error {
	// TODO: reduce timeout when already fetched from Youtube
	ctx, cancel := context.WithTimeout(context.Background(), getVideoTimeout)
	defer cancel()

	id, err := uuid.Parse(request.GetId())
	if err != nil {
		return newResponseError(err)
	}

	reader, err := c.mediaSetService.GetVideo(ctx, id)
	if err != nil {
		return newResponseError(err)
	}

	for {
		progress, err := reader.Next()
		if err != nil && err != io.EOF {
			return newResponseError(err)
		}

		progressPb := pbmediaset.GetVideoProgress{
			PercentComplete: progress.PercentComplete,
			Url:             progress.URL,
		}
		// Stop as soon as an update cannot be delivered instead of silently
		// dropping the error.
		if sendErr := stream.Send(&progressPb); sendErr != nil {
			return newResponseError(sendErr)
		}

		// The update that accompanies io.EOF is still sent above; it is the
		// final one.
		if err == io.EOF {
			break
		}
	}

	return nil
}

// GetVideoThumbnail returns the thumbnail image data and its dimensions for
// the video part of a MediaSet.
func (c *mediaSetServiceController) GetVideoThumbnail(ctx context.Context, request *pbmediaset.GetVideoThumbnailRequest) (*pbmediaset.GetVideoThumbnailResponse, error) {
	id, err := uuid.Parse(request.GetId())
	if err != nil {
		return nil, newResponseError(err)
	}

	thumbnail, err := c.mediaSetService.GetVideoThumbnail(ctx, id)
	if err != nil {
		return nil, newResponseError(err)
	}

	response := pbmediaset.GetVideoThumbnailResponse{
		Image:  thumbnail.Data,
		Width:  int32(thumbnail.Width),
		Height: int32(thumbnail.Height),
	}

	return &response, nil
}
func Start(options Options) error { fetchMediaSetService := media.NewMediaSetService( options.Store, options.YoutubeClient, options.FileStore, exec.CommandContext, options.WorkerPool, options.Config, options.Logger.Sugar().Named("mediaSetService"), ) grpcServer, err := buildGRPCServer(options.Config, options.Logger) if err != nil { return fmt.Errorf("error building server: %v", err) } mediaSetController := &mediaSetServiceController{mediaSetService: fetchMediaSetService, logger: options.Logger.Sugar().Named("controller")} pbmediaset.RegisterMediaSetServiceServer(grpcServer, mediaSetController) // TODO: configure CORS grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true })) log := options.Logger.Sugar() fileHandler := http.NotFoundHandler() // Enabling the file system store disables serving assets over HTTP. // TODO: fix this. if options.Config.AssetsHTTPRoot != "" { log.With("root", options.Config.AssetsHTTPRoot).Info("Configured to serve assets over HTTP") fileHandler = http.FileServer(http.Dir(options.Config.AssetsHTTPRoot)) } if options.Config.FileStoreHTTPRoot != "" { log.With("root", options.Config.FileStoreHTTPRoot).Info("Configured to serve file store over HTTP") fileHandler = http.FileServer(http.Dir(options.Config.FileStoreHTTPRoot)) } httpServer := http.Server{ Addr: options.Config.BindAddr, ReadTimeout: options.Timeout, WriteTimeout: options.Timeout, Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if !grpcWebServer.IsGrpcWebRequest(r) && !grpcWebServer.IsAcceptableGrpcCorsRequest(r) { fileHandler.ServeHTTP(w, r) return } grpcWebServer.ServeHTTP(w, r) }), } log.Infof("Listening at %s", options.Config.BindAddr) if options.Config.TLSCertFile != "" && options.Config.TLSKeyFile != "" { return httpServer.ListenAndServeTLS(options.Config.TLSCertFile, options.Config.TLSKeyFile) } return httpServer.ListenAndServe() } func buildGRPCServer(c config.Config, logger *zap.Logger) (*grpc.Server, 
error) { unaryInterceptors := []grpc.UnaryServerInterceptor{ grpczap.UnaryServerInterceptor(logger), } streamInterceptors := []grpc.StreamServerInterceptor{ grpczap.StreamServerInterceptor(logger), } if c.Environment == config.EnvProduction { panicOpts := []grpcrecovery.Option{ grpcrecovery.WithRecoveryHandler(func(p interface{}) error { return newResponseError(fmt.Errorf("%v", p)) }), } unaryInterceptors = append(unaryInterceptors, grpcrecovery.UnaryServerInterceptor(panicOpts...)) streamInterceptors = append(streamInterceptors, grpcrecovery.StreamServerInterceptor(panicOpts...)) } return grpc.NewServer( grpc.StreamInterceptor(grpcmiddleware.ChainStreamServer(streamInterceptors...)), grpc.UnaryInterceptor(grpcmiddleware.ChainUnaryServer(unaryInterceptors...)), ), nil }