clipper/backend/server/server.go

312 lines
8.8 KiB
Go
Raw Normal View History

2021-09-25 17:00:19 +00:00
package server
import (
2021-10-22 19:30:09 +00:00
"context"
2021-11-01 05:28:40 +00:00
"fmt"
2021-10-29 12:52:31 +00:00
"io"
2021-10-22 19:30:09 +00:00
"net/http"
2021-09-25 17:00:19 +00:00
"time"
2021-11-22 18:26:51 +00:00
"git.netflux.io/rob/clipper/config"
pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set"
2021-10-22 19:30:09 +00:00
"git.netflux.io/rob/clipper/media"
2021-11-01 05:28:40 +00:00
"github.com/google/uuid"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
2021-11-16 06:48:30 +00:00
grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
2021-10-22 19:30:09 +00:00
"github.com/improbable-eng/grpc-web/go/grpcweb"
2021-11-16 06:48:30 +00:00
"go.uber.org/zap"
2021-10-22 19:30:09 +00:00
"google.golang.org/grpc"
2021-11-01 05:28:40 +00:00
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
2021-10-22 19:30:09 +00:00
"google.golang.org/protobuf/types/known/durationpb"
2021-09-25 17:00:19 +00:00
)
const (
// ts-proto generates code that automatically retries for a subset of gRPC
// response codes. To avoid invoking this behaviour, default to returning a
2021-11-13 18:52:49 +00:00
// Cancelled code for now.
// See https://github.com/stephenh/ts-proto/blob/459b94f5b2988d58d186461332e888c3e511603a/src/generate-grpc-web.ts#L293
// and https://github.com/stephenh/ts-proto/pull/131.
defaultResponseCode = codes.Canceled
defaultResponseMessage = "An unexpected error occurred"
)
2021-11-01 05:28:40 +00:00
2021-11-16 06:48:30 +00:00
// Upper bounds on how long the corresponding RPC handlers may run; each
// handler wraps its work in a context with the matching timeout.
const (
	getAudioTimeout        = time.Minute * 5
	getAudioSegmentTimeout = time.Second * 10
	getVideoTimeout        = time.Minute * 5
)
// ResponseError wraps an internal error together with the message that is
// exposed to gRPC clients in its place.
type ResponseError struct {
	err error  // underlying cause; returned by Unwrap
	s   string // message placed in the gRPC status
}
func (r *ResponseError) Error() string {
return fmt.Sprintf("unexpected error: %v", r.err.Error())
2021-11-01 05:28:40 +00:00
}
func (r *ResponseError) Unwrap() error {
return r.err
2021-11-01 05:28:40 +00:00
}
// GRPCStatus builds the gRPC status for this error: the code is always
// defaultResponseCode and the message is the error's client-facing string.
func (r *ResponseError) GRPCStatus() *status.Status {
	st := status.New(defaultResponseCode, r.s)
	return st
}
func newResponseError(err error) *ResponseError {
return &ResponseError{err: err, s: defaultResponseMessage}
2021-11-01 05:28:40 +00:00
}
2021-09-25 17:00:19 +00:00
type Options struct {
2021-11-22 18:26:51 +00:00
Config config.Config
2021-10-29 12:52:31 +00:00
Timeout time.Duration
2021-11-01 05:28:40 +00:00
Store media.Store
2021-11-12 12:41:59 +00:00
YoutubeClient media.YoutubeClient
S3API media.S3API
2021-09-25 17:00:19 +00:00
}
2021-11-01 05:28:40 +00:00
// mediaSetServiceController implements gRPC controller for MediaSetService
type mediaSetServiceController struct {
pbmediaset.UnimplementedMediaSetServiceServer
2021-10-22 19:30:09 +00:00
2021-11-01 05:28:40 +00:00
mediaSetService *media.MediaSetService
logger *zap.SugaredLogger
2021-10-22 19:30:09 +00:00
}
2021-11-01 05:28:40 +00:00
// Get returns a pbMediaSet.MediaSet
func (c *mediaSetServiceController) Get(ctx context.Context, request *pbmediaset.GetRequest) (*pbmediaset.MediaSet, error) {
2021-11-01 05:28:40 +00:00
mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId())
2021-10-22 19:30:09 +00:00
if err != nil {
return nil, newResponseError(err)
2021-10-22 19:30:09 +00:00
}
result := pbmediaset.MediaSet{
2021-11-02 18:03:26 +00:00
Id: mediaSet.ID.String(),
2021-11-02 16:20:47 +00:00
YoutubeId: mediaSet.YoutubeID,
AudioChannels: int32(mediaSet.Audio.Channels),
AudioFrames: mediaSet.Audio.Frames,
AudioApproxFrames: mediaSet.Audio.ApproxFrames,
AudioSampleRate: int32(mediaSet.Audio.SampleRate),
AudioYoutubeItag: int32(mediaSet.Audio.YoutubeItag),
AudioMimeType: mediaSet.Audio.MimeType,
VideoDuration: durationpb.New(mediaSet.Video.Duration),
VideoYoutubeItag: int32(mediaSet.Video.YoutubeItag),
VideoMimeType: mediaSet.Video.MimeType,
2021-10-22 19:30:09 +00:00
}
return &result, nil
}
2021-11-16 06:48:30 +00:00
// GetAudio returns a stream of GetAudioProgress relating to the entire audio
// part of the MediaSet.
func (c *mediaSetServiceController) GetAudio(request *pbmediaset.GetAudioRequest, stream pbmediaset.MediaSetService_GetAudioServer) error {
2021-11-16 06:48:30 +00:00
// TODO: reduce timeout when fetching from S3
2021-11-01 05:28:40 +00:00
ctx, cancel := context.WithTimeout(context.Background(), getAudioTimeout)
2021-10-29 12:52:31 +00:00
defer cancel()
2021-11-01 05:28:40 +00:00
id, err := uuid.Parse(request.GetId())
if err != nil {
return newResponseError(err)
2021-11-01 05:28:40 +00:00
}
reader, err := c.mediaSetService.GetAudio(ctx, id, int(request.GetNumBins()))
2021-10-29 12:52:31 +00:00
if err != nil {
return newResponseError(err)
2021-10-29 12:52:31 +00:00
}
for {
2021-11-29 11:46:33 +00:00
progress, err := reader.Next()
if err != nil && err != io.EOF {
return newResponseError(err)
2021-10-29 12:52:31 +00:00
}
peaks := make([]int32, len(progress.Peaks))
for i, p := range progress.Peaks {
peaks[i] = int32(p)
}
progressPb := pbmediaset.GetAudioProgress{
PercentComplete: progress.PercentComplete,
Url: progress.URL,
Peaks: peaks,
2021-10-29 12:52:31 +00:00
}
stream.Send(&progressPb)
if err == io.EOF {
break
}
2021-10-29 12:52:31 +00:00
}
2021-10-22 19:30:09 +00:00
return nil
}
2021-11-16 06:48:30 +00:00
// GetAudioSegment returns a set of peaks for a segment of an audio part of a
// MediaSet.
func (c *mediaSetServiceController) GetAudioSegment(ctx context.Context, request *pbmediaset.GetAudioSegmentRequest) (*pbmediaset.GetAudioSegmentResponse, error) {
2021-11-16 06:48:30 +00:00
ctx, cancel := context.WithTimeout(ctx, getAudioSegmentTimeout)
defer cancel()
id, err := uuid.Parse(request.GetId())
if err != nil {
return nil, newResponseError(err)
}
peaks, err := c.mediaSetService.GetAudioSegment(ctx, id, request.StartFrame, request.EndFrame, int(request.GetNumBins()))
2021-11-16 06:48:30 +00:00
if err != nil {
return nil, newResponseError(err)
}
peaks32 := make([]int32, len(peaks))
for i, p := range peaks {
peaks32[i] = int32(p)
}
response := pbmediaset.GetAudioSegmentResponse{
Peaks: peaks32,
}
return &response, nil
2021-11-16 06:48:30 +00:00
}
// GetVideo returns a stream of GetVideoProgress relating to the video part of
// the MediaSet.
//
// The request ID must parse as a UUID. Every progress update read from the
// media set service is sent on the stream; the update returned together with
// io.EOF is sent as the final message. Any failure, including a failed send,
// is returned as a ResponseError.
func (c *mediaSetServiceController) GetVideo(request *pbmediaset.GetVideoRequest, stream pbmediaset.MediaSetService_GetVideoServer) error {
	// TODO: reduce timeout when already fetched from Youtube
	// NOTE(review): the context is derived from context.Background() rather
	// than stream.Context(), so a client disconnect is only noticed when Send
	// fails. Confirm whether this detachment is intentional.
	ctx, cancel := context.WithTimeout(context.Background(), getVideoTimeout)
	defer cancel()

	id, err := uuid.Parse(request.GetId())
	if err != nil {
		return newResponseError(err)
	}

	reader, err := c.mediaSetService.GetVideo(ctx, id)
	if err != nil {
		return newResponseError(err)
	}

	for {
		progress, err := reader.Next()
		if err != nil && err != io.EOF {
			return newResponseError(err)
		}

		progressPb := pbmediaset.GetVideoProgress{
			PercentComplete: progress.PercentComplete,
			Url:             progress.URL,
		}

		// A failed send means the stream is broken; stop instead of silently
		// continuing to read progress that nobody will receive.
		if sendErr := stream.Send(&progressPb); sendErr != nil {
			return newResponseError(sendErr)
		}

		if err == io.EOF {
			break
		}
	}

	return nil
}
2021-11-21 19:43:40 +00:00
// GetVideoThumbnail returns the thumbnail image and its dimensions for the
// MediaSet identified by the request ID, which must parse as a UUID. Any
// failure is returned as a ResponseError.
func (c *mediaSetServiceController) GetVideoThumbnail(ctx context.Context, request *pbmediaset.GetVideoThumbnailRequest) (*pbmediaset.GetVideoThumbnailResponse, error) {
	mediaSetID, parseErr := uuid.Parse(request.GetId())
	if parseErr != nil {
		return nil, newResponseError(parseErr)
	}

	thumb, fetchErr := c.mediaSetService.GetVideoThumbnail(ctx, mediaSetID)
	if fetchErr != nil {
		return nil, newResponseError(fetchErr)
	}

	return &pbmediaset.GetVideoThumbnailResponse{
		Image:  thumb.Data,
		Width:  int32(thumb.Width),
		Height: int32(thumb.Height),
	}, nil
}
2021-10-22 19:30:09 +00:00
func Start(options Options) error {
2021-11-22 18:26:51 +00:00
logger, err := buildLogger(options.Config)
if err != nil {
return fmt.Errorf("error building logger: %v", err)
}
2021-11-16 06:48:30 +00:00
defer logger.Sync()
2021-10-22 19:30:09 +00:00
2021-11-22 18:26:51 +00:00
fetchMediaSetService := media.NewMediaSetService(
options.Store,
options.YoutubeClient,
options.S3API,
options.Config,
logger,
)
grpcServer, err := buildGRPCServer(options.Config, logger)
if err != nil {
return fmt.Errorf("error building server: %v", err)
}
mediaSetController := &mediaSetServiceController{mediaSetService: fetchMediaSetService, logger: logger.Sugar().Named("controller")}
pbmediaset.RegisterMediaSetServiceServer(grpcServer, mediaSetController)
2021-10-22 19:30:09 +00:00
2021-11-13 18:52:49 +00:00
// TODO: configure CORS
2021-10-22 19:30:09 +00:00
grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true }))
2021-11-16 06:48:30 +00:00
log := logger.Sugar()
fileHandler := http.NotFoundHandler()
if options.Config.AssetsHTTPBasePath != "" {
log.With("basePath", options.Config.AssetsHTTPBasePath).Info("Configured to serve assets over HTTP")
fileHandler = http.FileServer(http.Dir(options.Config.AssetsHTTPBasePath))
}
2021-10-22 19:30:09 +00:00
httpServer := http.Server{
Addr: options.Config.BindAddr,
2021-10-22 19:30:09 +00:00
ReadTimeout: options.Timeout,
WriteTimeout: options.Timeout,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !grpcWebServer.IsGrpcWebRequest(r) && !grpcWebServer.IsAcceptableGrpcCorsRequest(r) {
fileHandler.ServeHTTP(w, r)
return
}
grpcWebServer.ServeHTTP(w, r)
}),
2021-10-22 19:30:09 +00:00
}
log.Infof("Listening at %s", options.Config.BindAddr)
2021-11-26 19:01:34 +00:00
if options.Config.TLSCertFile != "" && options.Config.TLSKeyFile != "" {
return httpServer.ListenAndServeTLS(options.Config.TLSCertFile, options.Config.TLSKeyFile)
}
2021-10-22 19:30:09 +00:00
return httpServer.ListenAndServe()
2021-09-25 17:00:19 +00:00
}
2021-11-22 18:26:51 +00:00
// buildLogger returns a production zap logger when the configured environment
// is config.EnvProduction, and a development logger for any other environment.
func buildLogger(c config.Config) (*zap.Logger, error) {
	if c.Environment != config.EnvProduction {
		return zap.NewDevelopment()
	}

	return zap.NewProduction()
}
func buildGRPCServer(c config.Config, logger *zap.Logger) (*grpc.Server, error) {
unaryInterceptors := []grpc.UnaryServerInterceptor{
grpczap.UnaryServerInterceptor(logger),
}
streamInterceptors := []grpc.StreamServerInterceptor{
grpczap.StreamServerInterceptor(logger),
}
2021-11-22 18:26:51 +00:00
if c.Environment == config.EnvProduction {
panicOpts := []grpcrecovery.Option{
grpcrecovery.WithRecoveryHandler(func(p interface{}) error {
return newResponseError(fmt.Errorf("%v", p))
}),
}
unaryInterceptors = append(unaryInterceptors, grpcrecovery.UnaryServerInterceptor(panicOpts...))
streamInterceptors = append(streamInterceptors, grpcrecovery.StreamServerInterceptor(panicOpts...))
}
2021-11-26 19:01:34 +00:00
return grpc.NewServer(
grpc.StreamInterceptor(grpcmiddleware.ChainStreamServer(streamInterceptors...)),
grpc.UnaryInterceptor(grpcmiddleware.ChainUnaryServer(unaryInterceptors...)),
2021-11-26 19:01:34 +00:00
), nil
}