package server import ( "context" "fmt" "io" "log" "net/http" "os" "time" pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set" "git.netflux.io/rob/clipper/media" "git.netflux.io/rob/clipper/youtube" "github.com/google/uuid" "github.com/improbable-eng/grpc-web/go/grpcweb" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" "google.golang.org/protobuf/types/known/durationpb" ) type ResponseError struct { error ResponseCode codes.Code } func (r *ResponseError) Error() string { return fmt.Sprintf("An unexpected error occurred: %v (error code = %d).", r.error.Error(), r.ResponseCode) } func (r *ResponseError) Unwrap() error { return r.error } func newResponseError(err error, code codes.Code) *ResponseError { return &ResponseError{error: err, ResponseCode: code} } type Options struct { BindAddr string Timeout time.Duration Store media.Store YoutubeClient youtube.YoutubeClient S3Client media.S3Client } const ( getAudioTimeout = time.Minute * 5 ) // mediaSetServiceController implements gRPC controller for MediaSetService type mediaSetServiceController struct { pbMediaSet.UnimplementedMediaSetServiceServer mediaSetService *media.MediaSetService } // Get returns a pbMediaSet.MediaSet func (c *mediaSetServiceController) Get(ctx context.Context, request *pbMediaSet.GetRequest) (*pbMediaSet.MediaSet, error) { mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId()) if err != nil { return nil, newResponseError(err, codes.Unknown) } result := pbMediaSet.MediaSet{ Id: mediaSet.YoutubeID, Audio: &pbMediaSet.MediaSet_Audio{ Channels: int32(mediaSet.Audio.Channels), Frames: mediaSet.Audio.Frames, ApproxFrames: mediaSet.Audio.ApproxFrames, SampleRate: int32(mediaSet.Audio.SampleRate), YoutubeItag: int32(mediaSet.Audio.YoutubeItag), MimeType: mediaSet.Audio.MimeType, }, Video: &pbMediaSet.MediaSet_Video{ Duration: durationpb.New(mediaSet.Video.Duration), YoutubeItag: int32(mediaSet.Video.YoutubeItag), MimeType: mediaSet.Video.MimeType, }, } return &result, nil } // GetAudio streams the progress report of GetAudio. func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest, stream pbMediaSet.MediaSetService_GetAudioServer) error { ctx, cancel := context.WithTimeout(context.Background(), getAudioTimeout) defer cancel() id, err := uuid.Parse(request.GetId()) if err != nil { return newResponseError(err, codes.Unknown) } reader, err := c.mediaSetService.GetAudio(ctx, id, int(request.GetNumBins())) if err != nil { return newResponseError(err, codes.Unknown) } for { progress, err := reader.Read() if err != nil { if err == io.EOF { break } return newResponseError(err, codes.Unknown) } // TODO: consider using int32 throughout the backend flow to avoid this. peaks := make([]int32, len(progress.Peaks)) for i, p := range progress.Peaks { peaks[i] = int32(p) } progressPb := pbMediaSet.GetAudioProgress{ PercentCompleted: progress.PercentComplete, Peaks: peaks, } stream.Send(&progressPb) } return nil } func Start(options Options) error { grpcServer := grpc.NewServer() fetchMediaSetService := media.NewMediaSetService(options.Store, options.YoutubeClient, options.S3Client) pbMediaSet.RegisterMediaSetServiceServer(grpcServer, &mediaSetServiceController{mediaSetService: fetchMediaSetService}) grpclog.SetLogger(log.New(os.Stdout, "server: ", log.LstdFlags)) // TODO: proper CORS support grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true })) handler := func(w http.ResponseWriter, r *http.Request) { grpcWebServer.ServeHTTP(w, r) } httpServer := http.Server{ Addr: options.BindAddr, ReadTimeout: options.Timeout, WriteTimeout: options.Timeout, Handler: http.HandlerFunc(handler), } return httpServer.ListenAndServe() }