clipper/backend/server/server.go

153 lines
4.7 KiB
Go
Raw Normal View History

2021-09-25 17:00:19 +00:00
package server
import (
"context"
2021-11-01 05:28:40 +00:00
"fmt"
2021-10-22 19:30:09 +00:00
"net/http"
2021-12-29 15:38:25 +00:00
"os/exec"
2021-09-25 17:00:19 +00:00
"time"
2021-11-22 18:26:51 +00:00
"git.netflux.io/rob/clipper/config"
pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set"
2021-10-22 19:30:09 +00:00
"git.netflux.io/rob/clipper/media"
"github.com/google/uuid"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
2021-11-16 06:48:30 +00:00
grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
2021-10-22 19:30:09 +00:00
"github.com/improbable-eng/grpc-web/go/grpcweb"
2021-11-16 06:48:30 +00:00
"go.uber.org/zap"
2021-10-22 19:30:09 +00:00
"google.golang.org/grpc"
2021-11-01 05:28:40 +00:00
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
2021-09-25 17:00:19 +00:00
)
const (
// ts-proto generates code that automatically retries for a subset of gRPC
// response codes. To avoid invoking this behaviour, default to returning a
2021-11-13 18:52:49 +00:00
// Cancelled code for now.
// See https://github.com/stephenh/ts-proto/blob/459b94f5b2988d58d186461332e888c3e511603a/src/generate-grpc-web.ts#L293
// and https://github.com/stephenh/ts-proto/pull/131.
defaultResponseCode = codes.Canceled
defaultResponseMessage = "An unexpected error occurred"
)
2021-11-01 05:28:40 +00:00
2021-11-16 06:48:30 +00:00
const (
getPeaksTimeout = time.Minute * 5
getPeaksForSegmentTimeout = time.Second * 10
2021-12-29 15:38:25 +00:00
getAudioSegmentTimeout = time.Minute * 2
getVideoTimeout = time.Minute * 5
2021-11-16 06:48:30 +00:00
)
type MediaSetService interface {
Get(context.Context, string) (*media.MediaSet, error)
GetAudioSegment(context.Context, uuid.UUID, int64, int64, media.AudioFormat) (media.AudioSegmentStream, error)
GetPeaks(context.Context, uuid.UUID, int) (media.GetPeaksProgressReader, error)
GetPeaksForSegment(context.Context, uuid.UUID, int64, int64, int) ([]int16, error)
GetVideo(context.Context, uuid.UUID) (media.GetVideoProgressReader, error)
GetVideoThumbnail(context.Context, uuid.UUID) (media.VideoThumbnail, error)
}
type ResponseError struct {
err error
s string
2021-11-01 05:28:40 +00:00
}
func (r *ResponseError) Error() string {
return fmt.Sprintf("unexpected error: %v", r.err.Error())
2021-11-01 05:28:40 +00:00
}
func (r *ResponseError) Unwrap() error {
return r.err
2021-11-01 05:28:40 +00:00
}
func (r *ResponseError) GRPCStatus() *status.Status {
return status.New(defaultResponseCode, r.s)
}
func newResponseError(err error) *ResponseError {
return &ResponseError{err: err, s: defaultResponseMessage}
2021-11-01 05:28:40 +00:00
}
2021-09-25 17:00:19 +00:00
type Options struct {
2021-11-22 18:26:51 +00:00
Config config.Config
2021-10-29 12:52:31 +00:00
Timeout time.Duration
2021-11-01 05:28:40 +00:00
Store media.Store
2021-11-12 12:41:59 +00:00
YoutubeClient media.YoutubeClient
2021-12-07 19:58:11 +00:00
FileStore media.FileStore
2022-01-05 18:49:21 +00:00
WorkerPool *media.WorkerPool
2021-12-07 19:58:11 +00:00
Logger *zap.Logger
2021-09-25 17:00:19 +00:00
}
2021-10-22 19:30:09 +00:00
func Start(options Options) error {
conf := options.Config
mediaSetService := media.NewMediaSetService(
2021-11-22 18:26:51 +00:00
options.Store,
options.YoutubeClient,
2021-12-07 19:58:11 +00:00
options.FileStore,
2021-12-29 15:38:25 +00:00
exec.CommandContext,
2022-01-05 18:49:21 +00:00
options.WorkerPool,
conf,
2021-12-07 19:58:11 +00:00
options.Logger.Sugar().Named("mediaSetService"),
2021-11-22 18:26:51 +00:00
)
grpcServer, err := buildGRPCServer(conf, options.Logger)
if err != nil {
return fmt.Errorf("error building server: %v", err)
}
mediaSetController := &mediaSetServiceController{mediaSetService: mediaSetService, logger: options.Logger.Sugar().Named("controller")}
pbmediaset.RegisterMediaSetServiceServer(grpcServer, mediaSetController)
2021-10-22 19:30:09 +00:00
2022-01-27 19:40:33 +00:00
// TODO: convert CORSAllowedOrigins to a map[string]struct{}
originChecker := func(origin string) bool {
for _, s := range conf.CORSAllowedOrigins {
if origin == s {
return true
}
}
return false
}
grpcHandler := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(originChecker))
httpHandler := newHTTPHandler(grpcHandler, mediaSetService, conf, options.Logger.Sugar().Named("httpHandler"))
2021-10-22 19:30:09 +00:00
httpServer := http.Server{
Addr: conf.BindAddr,
2021-10-22 19:30:09 +00:00
ReadTimeout: options.Timeout,
WriteTimeout: options.Timeout,
Handler: httpHandler,
2021-10-22 19:30:09 +00:00
}
log := options.Logger.Sugar()
log.Infof("Listening at %s", options.Config.BindAddr)
if conf.TLSCertFile != "" && conf.TLSKeyFile != "" {
return httpServer.ListenAndServeTLS(conf.TLSCertFile, conf.TLSKeyFile)
2021-11-26 19:01:34 +00:00
}
2021-10-22 19:30:09 +00:00
return httpServer.ListenAndServe()
2021-09-25 17:00:19 +00:00
}
func buildGRPCServer(c config.Config, logger *zap.Logger) (*grpc.Server, error) {
unaryInterceptors := []grpc.UnaryServerInterceptor{
grpczap.UnaryServerInterceptor(logger),
}
streamInterceptors := []grpc.StreamServerInterceptor{
grpczap.StreamServerInterceptor(logger),
}
2021-11-22 18:26:51 +00:00
if c.Environment == config.EnvProduction {
panicOpts := []grpcrecovery.Option{
grpcrecovery.WithRecoveryHandler(func(p interface{}) error {
return newResponseError(fmt.Errorf("%v", p))
}),
}
unaryInterceptors = append(unaryInterceptors, grpcrecovery.UnaryServerInterceptor(panicOpts...))
streamInterceptors = append(streamInterceptors, grpcrecovery.StreamServerInterceptor(panicOpts...))
}
2021-11-26 19:01:34 +00:00
return grpc.NewServer(
grpc.StreamInterceptor(grpcmiddleware.ChainStreamServer(streamInterceptors...)),
grpc.UnaryInterceptor(grpcmiddleware.ChainUnaryServer(unaryInterceptors...)),
2021-11-26 19:01:34 +00:00
), nil
}