// Package server exposes the media set service to browsers over gRPC-Web.
package server

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"time"

	pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set"
	"git.netflux.io/rob/clipper/media"
	"git.netflux.io/rob/clipper/youtube"
	"github.com/google/uuid"
	"github.com/improbable-eng/grpc-web/go/grpcweb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/durationpb"
)

func init() {
	// Send gRPC's own log output to stdout with a "server: " prefix.
	// NOTE(review): grpclog.SetLogger appears to be deprecated upstream in
	// favour of SetLoggerV2; left as-is because switching would change the
	// log format.
	grpclog.SetLogger(log.New(os.Stdout, "server: ", log.LstdFlags))
}

const (
	// ts-proto generates code that automatically retries for a subset of gRPC
	// response codes. To avoid invoking this behaviour, default to returning a
	// Canceled code.
	// See https://github.com/stephenh/ts-proto/blob/459b94f5b2988d58d186461332e888c3e511603a/src/generate-grpc-web.ts#L293
	// and https://github.com/stephenh/ts-proto/pull/131.
	defaultResponseCode    = codes.Canceled
	defaultResponseMessage = "An unexpected error occurred"
)

// ResponseError wraps an internal error for return from a gRPC handler. The
// wrapped error stays available through Unwrap, while GRPCStatus reports only
// defaultResponseCode and the message stored in s.
type ResponseError struct {
	err error
	s   string
}

// Error returns the wrapped error's text prefixed with "unexpected error: ".
// The wrapped error is formatted via %v so that a nil err does not panic.
func (r *ResponseError) Error() string {
	return fmt.Sprintf("unexpected error: %v", r.err)
}

// Unwrap returns the wrapped error, for use with errors.Is and errors.As.
func (r *ResponseError) Unwrap() error {
	return r.err
}

// GRPCStatus returns a status built from defaultResponseCode and the
// message held by the ResponseError.
func (r *ResponseError) GRPCStatus() *status.Status {
	return status.New(defaultResponseCode, r.s)
}

// newResponseError wraps err in a ResponseError carrying
// defaultResponseMessage.
func newResponseError(err error) *ResponseError {
	return &ResponseError{err: err, s: defaultResponseMessage}
}

// Options configures the server started by Start.
type Options struct {
	// BindAddr is the address the HTTP server listens on.
	BindAddr string
	// Timeout is applied as both the read and the write timeout of the HTTP
	// server.
	Timeout time.Duration
	// Store, YoutubeClient and S3Client are handed to
	// media.NewMediaSetService.
	Store         media.Store
	YoutubeClient youtube.YoutubeClient
	S3Client      media.S3Client
}

const (
	// getAudioTimeout bounds the lifetime of a single GetAudio call.
	getAudioTimeout = time.Minute * 5
)

// mediaSetServiceController implements gRPC controller for MediaSetService
type mediaSetServiceController struct {
	pbMediaSet.UnimplementedMediaSetServiceServer

	mediaSetService *media.MediaSetService
}

// Get returns a pbMediaSet.MediaSet for the YouTube ID named in the request.
// Any error from the underlying service is returned as a ResponseError.
func (c *mediaSetServiceController) Get(ctx context.Context, request *pbMediaSet.GetRequest) (*pbMediaSet.MediaSet, error) {
	mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId())
	if err != nil {
		return nil, newResponseError(err)
	}

	result := pbMediaSet.MediaSet{
		Id:                mediaSet.ID.String(),
		YoutubeId:         mediaSet.YoutubeID,
		AudioChannels:     int32(mediaSet.Audio.Channels),
		AudioFrames:       mediaSet.Audio.Frames,
		AudioApproxFrames: mediaSet.Audio.ApproxFrames,
		AudioSampleRate:   int32(mediaSet.Audio.SampleRate),
		AudioYoutubeItag:  int32(mediaSet.Audio.YoutubeItag),
		AudioMimeType:     mediaSet.Audio.MimeType,
		VideoDuration:     durationpb.New(mediaSet.Video.Duration),
		VideoYoutubeItag:  int32(mediaSet.Video.YoutubeItag),
		VideoMimeType:     mediaSet.Video.MimeType,
	}

	return &result, nil
}

// GetAudio streams the progress report of GetAudio.
//
// It parses the media set ID from the request, asks the media set service
// for a progress reader and forwards each progress item to the client until
// the reader reports io.EOF. Every failure, including a failed send to the
// client, is returned as a ResponseError.
func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest, stream pbMediaSet.MediaSetService_GetAudioServer) error {
	// NOTE(review): the context is derived from context.Background() rather
	// than the stream's context, so only the timeout and this handler
	// returning cancel the underlying work. Confirm that this is intended.
	ctx, cancel := context.WithTimeout(context.Background(), getAudioTimeout)
	defer cancel()

	id, err := uuid.Parse(request.GetId())
	if err != nil {
		return newResponseError(err)
	}

	reader, err := c.mediaSetService.GetAudio(ctx, id, int(request.GetNumBins()))
	if err != nil {
		return newResponseError(err)
	}

	for {
		progress, err := reader.Read()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return newResponseError(err)
		}

		// TODO: consider using int32 throughout the backend flow to avoid this.
		peaks := make([]int32, len(progress.Peaks))
		for i, p := range progress.Peaks {
			peaks[i] = int32(p)
		}

		progressPb := pbMediaSet.GetAudioProgress{
			PercentCompleted: progress.PercentComplete,
			Peaks:            peaks,
		}

		// A failed send means the stream is no longer usable; stop instead
		// of continuing to read and convert progress nobody will receive.
		if err := stream.Send(&progressPb); err != nil {
			return newResponseError(err)
		}
	}
}

// Start builds the gRPC server, registers the media set service on it, wraps
// it for gRPC-Web and serves it over HTTP on options.BindAddr. It blocks in
// ListenAndServe and returns that call's error.
func Start(options Options) error {
	grpcServer := grpc.NewServer()

	fetchMediaSetService := media.NewMediaSetService(options.Store, options.YoutubeClient, options.S3Client)
	pbMediaSet.RegisterMediaSetServiceServer(grpcServer, &mediaSetServiceController{mediaSetService: fetchMediaSetService})

	// TODO: proper CORS support
	grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true }))

	httpServer := http.Server{
		Addr:         options.BindAddr,
		ReadTimeout:  options.Timeout,
		WriteTimeout: options.Timeout,
		Handler:      http.HandlerFunc(grpcWebServer.ServeHTTP),
	}

	return httpServer.ListenAndServe()
}