package server import ( "context" "fmt" "io" "net/http" "time" pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set" "git.netflux.io/rob/clipper/media" "github.com/google/uuid" grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" "github.com/improbable-eng/grpc-web/go/grpcweb" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" ) const ( // ts-proto generates code that automatically retries for a subset of gRPC // response codes. To avoid invoking this behaviour, default to returning a // Cancelled code for now. // See https://github.com/stephenh/ts-proto/blob/459b94f5b2988d58d186461332e888c3e511603a/src/generate-grpc-web.ts#L293 // and https://github.com/stephenh/ts-proto/pull/131. defaultResponseCode = codes.Canceled defaultResponseMessage = "An unexpected error occurred" ) const ( getAudioTimeout = time.Minute * 5 getAudioSegmentTimeout = time.Second * 10 ) type ResponseError struct { err error s string } func (r *ResponseError) Error() string { return fmt.Sprintf("unexpected error: %v", r.err.Error()) } func (r *ResponseError) Unwrap() error { return r.err } func (r *ResponseError) GRPCStatus() *status.Status { return status.New(defaultResponseCode, r.s) } func newResponseError(err error) *ResponseError { return &ResponseError{err: err, s: defaultResponseMessage} } type Options struct { BindAddr string Timeout time.Duration Store media.Store YoutubeClient media.YoutubeClient S3Client media.S3Client } // mediaSetServiceController implements gRPC controller for MediaSetService type mediaSetServiceController struct { pbMediaSet.UnimplementedMediaSetServiceServer mediaSetService *media.MediaSetService } // Get returns a pbMediaSet.MediaSet func (c *mediaSetServiceController) Get(ctx context.Context, request *pbMediaSet.GetRequest) (*pbMediaSet.MediaSet, error) { mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId()) if err != nil { return nil, newResponseError(err) } result := pbMediaSet.MediaSet{ Id: mediaSet.ID.String(), YoutubeId: mediaSet.YoutubeID, AudioChannels: int32(mediaSet.Audio.Channels), AudioFrames: mediaSet.Audio.Frames, AudioApproxFrames: mediaSet.Audio.ApproxFrames, AudioSampleRate: int32(mediaSet.Audio.SampleRate), AudioYoutubeItag: int32(mediaSet.Audio.YoutubeItag), AudioMimeType: mediaSet.Audio.MimeType, VideoDuration: durationpb.New(mediaSet.Video.Duration), VideoYoutubeItag: int32(mediaSet.Video.YoutubeItag), VideoMimeType: mediaSet.Video.MimeType, } return &result, nil } // GetAudio returns a stream of GetAudioProgress relating to the entire audio // part of the MediaSet. func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest, stream pbMediaSet.MediaSetService_GetAudioServer) error { // TODO: reduce timeout when fetching from S3 ctx, cancel := context.WithTimeout(context.Background(), getAudioTimeout) defer cancel() id, err := uuid.Parse(request.GetId()) if err != nil { return newResponseError(err) } reader, err := c.mediaSetService.GetAudio(ctx, id, int(request.GetNumBins())) if err != nil { return newResponseError(err) } for { progress, err := reader.Read() if err != nil { if err == io.EOF { break } return newResponseError(err) } // TODO: consider using int32 throughout the backend flow to avoid this. peaks := make([]int32, len(progress.Peaks)) for i, p := range progress.Peaks { peaks[i] = int32(p) } progressPb := pbMediaSet.GetAudioProgress{ PercentCompleted: progress.PercentComplete, Peaks: peaks, } stream.Send(&progressPb) } return nil } // GetAudioSegment returns a set of peaks for a segment of an audio part of a // MediaSet. func (c *mediaSetServiceController) GetAudioSegment(ctx context.Context, request *pbMediaSet.GetAudioSegmentRequest) (*pbMediaSet.GetAudioSegmentResponse, error) { ctx, cancel := context.WithTimeout(ctx, getAudioSegmentTimeout) defer cancel() id, err := uuid.Parse(request.GetId()) if err != nil { return nil, newResponseError(err) } _, err = c.mediaSetService.GetAudioSegment(ctx, id, request.StartFrame, request.EndFrame, int(request.GetNumBins())) if err != nil { return nil, newResponseError(err) } return nil, nil } func Start(options Options) error { logger, _ := zap.NewDevelopment() defer logger.Sync() grpcServer := grpc.NewServer( grpc.UnaryInterceptor(grpczap.UnaryServerInterceptor(logger)), ) fetchMediaSetService := media.NewMediaSetService(options.Store, options.YoutubeClient, options.S3Client, logger) pbMediaSet.RegisterMediaSetServiceServer(grpcServer, &mediaSetServiceController{mediaSetService: fetchMediaSetService}) // TODO: configure CORS grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true })) handler := func(w http.ResponseWriter, r *http.Request) { grpcWebServer.ServeHTTP(w, r) } httpServer := http.Server{ Addr: options.BindAddr, ReadTimeout: options.Timeout, WriteTimeout: options.Timeout, Handler: http.HandlerFunc(handler), } return httpServer.ListenAndServe() }