package server import ( "context" "fmt" "io" "net/http" "time" "git.netflux.io/rob/clipper/config" pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set" "git.netflux.io/rob/clipper/media" "github.com/google/uuid" grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware" grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" "github.com/improbable-eng/grpc-web/go/grpcweb" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" ) const ( // ts-proto generates code that automatically retries for a subset of gRPC // response codes. To avoid invoking this behaviour, default to returning a // Cancelled code for now. // See https://github.com/stephenh/ts-proto/blob/459b94f5b2988d58d186461332e888c3e511603a/src/generate-grpc-web.ts#L293 // and https://github.com/stephenh/ts-proto/pull/131. defaultResponseCode = codes.Canceled defaultResponseMessage = "An unexpected error occurred" ) const ( getAudioTimeout = time.Minute * 5 getAudioSegmentTimeout = time.Second * 10 getVideoTimeout = time.Minute * 5 ) type ResponseError struct { err error s string } func (r *ResponseError) Error() string { return fmt.Sprintf("unexpected error: %v", r.err.Error()) } func (r *ResponseError) Unwrap() error { return r.err } func (r *ResponseError) GRPCStatus() *status.Status { return status.New(defaultResponseCode, r.s) } func newResponseError(err error) *ResponseError { return &ResponseError{err: err, s: defaultResponseMessage} } type Options struct { Config config.Config Timeout time.Duration Store media.Store YoutubeClient media.YoutubeClient S3API media.S3API } // mediaSetServiceController implements gRPC controller for MediaSetService type mediaSetServiceController struct { pbmediaset.UnimplementedMediaSetServiceServer mediaSetService *media.MediaSetService logger *zap.SugaredLogger } // Get returns a pbMediaSet.MediaSet func (c *mediaSetServiceController) Get(ctx context.Context, request *pbmediaset.GetRequest) (*pbmediaset.MediaSet, error) { mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId()) if err != nil { return nil, newResponseError(err) } result := pbmediaset.MediaSet{ Id: mediaSet.ID.String(), YoutubeId: mediaSet.YoutubeID, AudioChannels: int32(mediaSet.Audio.Channels), AudioFrames: mediaSet.Audio.Frames, AudioApproxFrames: mediaSet.Audio.ApproxFrames, AudioSampleRate: int32(mediaSet.Audio.SampleRate), AudioYoutubeItag: int32(mediaSet.Audio.YoutubeItag), AudioMimeType: mediaSet.Audio.MimeType, VideoDuration: durationpb.New(mediaSet.Video.Duration), VideoYoutubeItag: int32(mediaSet.Video.YoutubeItag), VideoMimeType: mediaSet.Video.MimeType, } return &result, nil } // GetAudio returns a stream of GetAudioProgress relating to the entire audio // part of the MediaSet. func (c *mediaSetServiceController) GetAudio(request *pbmediaset.GetAudioRequest, stream pbmediaset.MediaSetService_GetAudioServer) error { // TODO: reduce timeout when fetching from S3 ctx, cancel := context.WithTimeout(context.Background(), getAudioTimeout) defer cancel() id, err := uuid.Parse(request.GetId()) if err != nil { return newResponseError(err) } reader, err := c.mediaSetService.GetAudio(ctx, id, int(request.GetNumBins())) if err != nil { return newResponseError(err) } for { progress, err := reader.Read() if err != nil && err != io.EOF { return newResponseError(err) } peaks := make([]int32, len(progress.Peaks)) for i, p := range progress.Peaks { peaks[i] = int32(p) } progressPb := pbmediaset.GetAudioProgress{ PercentComplete: progress.PercentComplete, Peaks: peaks, } stream.Send(&progressPb) if err == io.EOF { break } } return nil } // GetAudioSegment returns a set of peaks for a segment of an audio part of a // MediaSet. func (c *mediaSetServiceController) GetAudioSegment(ctx context.Context, request *pbmediaset.GetAudioSegmentRequest) (*pbmediaset.GetAudioSegmentResponse, error) { ctx, cancel := context.WithTimeout(ctx, getAudioSegmentTimeout) defer cancel() id, err := uuid.Parse(request.GetId()) if err != nil { return nil, newResponseError(err) } peaks, err := c.mediaSetService.GetAudioSegment(ctx, id, request.StartFrame, request.EndFrame, int(request.GetNumBins())) if err != nil { return nil, newResponseError(err) } peaks32 := make([]int32, len(peaks)) for i, p := range peaks { peaks32[i] = int32(p) } response := pbmediaset.GetAudioSegmentResponse{ Peaks: peaks32, } return &response, nil } func (c *mediaSetServiceController) GetVideo(request *pbmediaset.GetVideoRequest, stream pbmediaset.MediaSetService_GetVideoServer) error { // TODO: reduce timeout when already fetched from Youtube ctx, cancel := context.WithTimeout(context.Background(), getVideoTimeout) defer cancel() id, err := uuid.Parse(request.GetId()) if err != nil { return newResponseError(err) } reader, err := c.mediaSetService.GetVideo(ctx, id) if err != nil { return newResponseError(err) } for { progress, err := reader.Next() if err != nil && err != io.EOF { return newResponseError(err) } progressPb := pbmediaset.GetVideoProgress{ PercentComplete: progress.PercentComplete, Url: progress.URL, } stream.Send(&progressPb) if err == io.EOF { break } } return nil } func (c *mediaSetServiceController) GetVideoThumbnail(ctx context.Context, request *pbmediaset.GetVideoThumbnailRequest) (*pbmediaset.GetVideoThumbnailResponse, error) { id, err := uuid.Parse(request.GetId()) if err != nil { return nil, newResponseError(err) } thumbnail, err := c.mediaSetService.GetVideoThumbnail(ctx, id) if err != nil { return nil, newResponseError(err) } response := pbmediaset.GetVideoThumbnailResponse{ Image: thumbnail.Data, Width: int32(thumbnail.Width), Height: int32(thumbnail.Height), } return &response, nil } func Start(options Options) error { logger, err := buildLogger(options.Config) if err != nil { return fmt.Errorf("error building logger: %v", err) } defer logger.Sync() fetchMediaSetService := media.NewMediaSetService( options.Store, options.YoutubeClient, options.S3API, options.Config, logger, ) grpcServer, err := buildGRPCServer(options.Config, logger) if err != nil { return fmt.Errorf("error building server: %v", err) } mediaSetController := &mediaSetServiceController{mediaSetService: fetchMediaSetService, logger: logger.Sugar().Named("controller")} pbmediaset.RegisterMediaSetServiceServer(grpcServer, mediaSetController) // TODO: configure CORS grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true })) log := logger.Sugar() fileHandler := http.NotFoundHandler() if options.Config.AssetsHTTPBasePath != "" { log.With("basePath", options.Config.AssetsHTTPBasePath).Info("Configured to serve assets over HTTP") fileHandler = http.FileServer(http.Dir(options.Config.AssetsHTTPBasePath)) } httpServer := http.Server{ Addr: options.Config.BindAddr, ReadTimeout: options.Timeout, WriteTimeout: options.Timeout, Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if !grpcWebServer.IsGrpcWebRequest(r) && !grpcWebServer.IsAcceptableGrpcCorsRequest(r) { fileHandler.ServeHTTP(w, r) return } grpcWebServer.ServeHTTP(w, r) }), } log.Infof("Listening at %s", options.Config.BindAddr) return httpServer.ListenAndServe() } func buildLogger(c config.Config) (*zap.Logger, error) { if c.Environment == config.EnvProduction { return zap.NewProduction() } return zap.NewDevelopment() } func buildGRPCServer(c config.Config, logger *zap.Logger) (*grpc.Server, error) { unaryInterceptors := []grpc.UnaryServerInterceptor{ grpczap.UnaryServerInterceptor(logger), } streamInterceptors := []grpc.StreamServerInterceptor{ grpczap.StreamServerInterceptor(logger), } if c.Environment == config.EnvProduction { panicOpts := []grpcrecovery.Option{ grpcrecovery.WithRecoveryHandler(func(p interface{}) error { return newResponseError(fmt.Errorf("%v", p)) }), } unaryInterceptors = append(unaryInterceptors, grpcrecovery.UnaryServerInterceptor(panicOpts...)) streamInterceptors = append(streamInterceptors, grpcrecovery.StreamServerInterceptor(panicOpts...)) } options := []grpc.ServerOption{ grpc.StreamInterceptor(grpcmiddleware.ChainStreamServer(streamInterceptors...)), grpc.UnaryInterceptor(grpcmiddleware.ChainUnaryServer(unaryInterceptors...)), } if c.TLSCertFile != "" && c.TLSKeyFile != "" { creds, err := credentials.NewServerTLSFromFile(c.TLSCertFile, c.TLSKeyFile) if err != nil { return nil, fmt.Errorf("error building credentials: %v", err) } options = append(options, grpc.Creds(creds)) } return grpc.NewServer(options...), nil }