package server

import (
	"context"
	"fmt"
	"net/http"
	"os/exec"
	"time"

	"git.netflux.io/rob/clipper/config"
	pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set"
	"git.netflux.io/rob/clipper/media"
	"github.com/google/uuid"
	grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
	grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
	grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
	"github.com/improbable-eng/grpc-web/go/grpcweb"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const (
	// ts-proto generates code that automatically retries for a subset of gRPC
	// response codes. To avoid invoking this behaviour, default to returning a
	// Canceled code for now.
	// See https://github.com/stephenh/ts-proto/blob/459b94f5b2988d58d186461332e888c3e511603a/src/generate-grpc-web.ts#L293
	// and https://github.com/stephenh/ts-proto/pull/131.
	defaultResponseCode    = codes.Canceled
	defaultResponseMessage = "An unexpected error occurred"
)

// Per-operation timeouts. They are not referenced in this part of the file;
// presumably they are consumed by the request handlers elsewhere in the
// package (TODO confirm).
const (
	getPeaksTimeout           = time.Minute * 5
	getPeaksForSegmentTimeout = time.Second * 10
	getAudioSegmentTimeout    = time.Minute * 2
	getVideoTimeout           = time.Minute * 5
)

// MediaSetService is the set of media-set operations the server layer relies
// on. The parameter meanings are defined by the implementation, which is not
// visible here.
type MediaSetService interface {
	Get(context.Context, string) (*media.MediaSet, error)
	GetAudioSegment(context.Context, uuid.UUID, int64, int64, media.AudioFormat) (media.AudioSegmentStream, error)
	GetPeaks(context.Context, uuid.UUID, int) (media.GetPeaksProgressReader, error)
	GetPeaksForSegment(context.Context, uuid.UUID, int64, int64, int) ([]int16, error)
	GetVideo(context.Context, uuid.UUID) (media.GetVideoProgressReader, error)
	GetVideoThumbnail(context.Context, uuid.UUID) (media.VideoThumbnail, error)
}

// ResponseError wraps an underlying error together with the message that is
// placed in the gRPC status. The status message comes from s only, so the
// text of the wrapped error is never included in the status.
type ResponseError struct {
	err error
	s   string
}

// Error implements the error interface. It includes the text of the wrapped
// error when one is present.
func (r *ResponseError) Error() string {
	// A zero-value ResponseError has no wrapped error; avoid a nil dereference.
	if r.err == nil {
		return "unexpected error"
	}
	return fmt.Sprintf("unexpected error: %v", r.err.Error())
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// inspect it.
func (r *ResponseError) Unwrap() error { return r.err }

// GRPCStatus returns a status built from defaultResponseCode and the
// message stored in the ResponseError.
func (r *ResponseError) GRPCStatus() *status.Status {
	return status.New(defaultResponseCode, r.s)
}

// newResponseError wraps err in a ResponseError carrying
// defaultResponseMessage as its status message.
func newResponseError(err error) *ResponseError {
	return &ResponseError{err: err, s: defaultResponseMessage}
}

// Options holds the dependencies and settings passed to Start.
type Options struct {
	Config        config.Config
	Timeout       time.Duration // used as both the HTTP read and write timeout
	Store         media.Store
	YoutubeClient media.YoutubeClient
	FileStore     media.FileStore
	WorkerPool    *media.WorkerPool
	Logger        *zap.Logger
}

// Start builds the media set service, the gRPC server and its grpc-web
// wrapper, and the HTTP handler, then serves on the configured bind address.
// TLS is used when both a certificate file and a key file are configured.
//
// It returns an error if the gRPC server cannot be built; otherwise it
// returns whatever the HTTP server's ListenAndServe or ListenAndServeTLS
// call returns.
func Start(options Options) error {
	conf := options.Config

	mediaSetService := media.NewMediaSetService(
		options.Store,
		options.YoutubeClient,
		options.FileStore,
		exec.CommandContext,
		options.WorkerPool,
		conf,
		options.Logger.Sugar().Named("mediaSetService"),
	)

	grpcServer, err := buildGRPCServer(conf, options.Logger)
	if err != nil {
		// %w keeps the cause available to errors.Is / errors.As.
		return fmt.Errorf("error building server: %w", err)
	}

	mediaSetController := &mediaSetServiceController{
		mediaSetService: mediaSetService,
		logger:          options.Logger.Sugar().Named("controller"),
	}
	pbmediaset.RegisterMediaSetServiceServer(grpcServer, mediaSetController)

	// Snapshot the allowed origins into a set once, so each origin check is a
	// single map lookup rather than a scan over the configured list.
	allowedOrigins := make(map[string]struct{}, len(conf.CORSAllowedOrigins))
	for _, allowed := range conf.CORSAllowedOrigins {
		allowedOrigins[allowed] = struct{}{}
	}
	originChecker := func(origin string) bool {
		_, ok := allowedOrigins[origin]
		return ok
	}

	grpcHandler := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(originChecker))
	httpHandler := newHTTPHandler(grpcHandler, mediaSetService, conf, options.Logger.Sugar().Named("httpHandler"))

	httpServer := http.Server{
		Addr:         conf.BindAddr,
		ReadTimeout:  options.Timeout,
		WriteTimeout: options.Timeout,
		Handler:      httpHandler,
	}

	log := options.Logger.Sugar()
	log.Infof("Listening at %s", conf.BindAddr)

	if conf.TLSCertFile != "" && conf.TLSKeyFile != "" {
		return httpServer.ListenAndServeTLS(conf.TLSCertFile, conf.TLSKeyFile)
	}

	return httpServer.ListenAndServe()
}

// buildGRPCServer creates a gRPC server with zap logging interceptors for
// unary and streaming calls. When the environment is production it also
// appends recovery interceptors whose handler converts a recovered panic
// value into a ResponseError.
//
// The returned error is currently always nil.
func buildGRPCServer(c config.Config, logger *zap.Logger) (*grpc.Server, error) {
	unaryInterceptors := []grpc.UnaryServerInterceptor{
		grpczap.UnaryServerInterceptor(logger),
	}
	streamInterceptors := []grpc.StreamServerInterceptor{
		grpczap.StreamServerInterceptor(logger),
	}

	// Panic recovery is only installed in production; in other environments
	// no recovery interceptor is added here.
	if c.Environment == config.EnvProduction {
		panicOpts := []grpcrecovery.Option{
			grpcrecovery.WithRecoveryHandler(func(p interface{}) error {
				return newResponseError(fmt.Errorf("%v", p))
			}),
		}

		unaryInterceptors = append(unaryInterceptors, grpcrecovery.UnaryServerInterceptor(panicOpts...))
		streamInterceptors = append(streamInterceptors, grpcrecovery.StreamServerInterceptor(panicOpts...))
	}

	return grpc.NewServer(
		grpc.StreamInterceptor(grpcmiddleware.ChainStreamServer(streamInterceptors...)),
		grpc.UnaryInterceptor(grpcmiddleware.ChainUnaryServer(unaryInterceptors...)),
	), nil
}