From c7d55413796ccb5d9f1f2b93638d985a11718558 Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Mon, 10 Jan 2022 18:45:10 +0100 Subject: [PATCH] Refactor response handling. - Add FileSystem implementation to handle HTTP index files but constrain directory listings - Refactor server implementation into a dedicated handler struct - Add test coverage --- backend/config/config.go | 14 +- backend/filestore/fs.go | 17 +- backend/filestore/fs_test.go | 17 +- backend/generated/mocks/MediaSetService.go | 153 +++++++++++ backend/go.mod | 1 + backend/go.sum | 2 + backend/server/gprc_handler.go | 223 ++++++++++++++++ backend/server/handler.go | 93 +++++++ backend/server/handler_test.go | 135 ++++++++++ backend/server/server.go | 247 +----------------- .../server/testdata/http/assets/css/style.css | 1 + backend/server/testdata/http/assets/foo.js | 1 + .../server/testdata/http/assets/index.html | 1 + .../server/testdata/http/filestore/bar.mp4 | 1 + 14 files changed, 658 insertions(+), 248 deletions(-) create mode 100644 backend/generated/mocks/MediaSetService.go create mode 100644 backend/server/gprc_handler.go create mode 100644 backend/server/handler.go create mode 100644 backend/server/handler_test.go create mode 100644 backend/server/testdata/http/assets/css/style.css create mode 100644 backend/server/testdata/http/assets/foo.js create mode 100644 backend/server/testdata/http/assets/index.html create mode 100644 backend/server/testdata/http/filestore/bar.mp4 diff --git a/backend/config/config.go b/backend/config/config.go index 0e4088d..43e43ae 100644 --- a/backend/config/config.go +++ b/backend/config/config.go @@ -3,9 +3,11 @@ package config import ( "errors" "fmt" + "net/url" "os" "runtime" "strconv" + "strings" ) type Environment int @@ -30,7 +32,7 @@ type Config struct { DatabaseURL string FileStore FileStore FileStoreHTTPRoot string - FileStoreHTTPBaseURL string + FileStoreHTTPBaseURL *url.URL AWSAccessKeyID string AWSSecretAccessKey string AWSRegion string @@ -80,9 +82,13 @@ func NewFromEnv() (Config, error) { return Config{}, fmt.Errorf("invalid FILE_STORE value: %s", fileStoreString) } - fileStoreHTTPBaseURL := os.Getenv("FILE_STORE_HTTP_BASE_URL") - if fileStoreHTTPBaseURL == "" { - fileStoreHTTPBaseURL = "/" + fileStoreHTTPBaseURLString := os.Getenv("FILE_STORE_HTTP_BASE_URL") + if !strings.HasSuffix(fileStoreHTTPBaseURLString, "/") { + fileStoreHTTPBaseURLString += "/" + } + fileStoreHTTPBaseURL, err := url.Parse(fileStoreHTTPBaseURLString) + if err != nil { + return Config{}, fmt.Errorf("invalid FILE_STORE_HTTP_BASE_URL: %v", err) } var awsAccessKeyID, awsSecretAccessKey, awsRegion, s3Bucket, fileStoreHTTPRoot string diff --git a/backend/filestore/fs.go b/backend/filestore/fs.go index 337988f..7ee09a6 100644 --- a/backend/filestore/fs.go +++ b/backend/filestore/fs.go @@ -4,12 +4,20 @@ import ( "context" "fmt" "io" + "net/http" "net/url" "os" "path/filepath" "strings" ) +// NewFileSystemStoreHTTPMiddleware returns an HTTP middleware which strips the +// base URL path prefix from incoming paths, suitable for passing to an +// appropriately-configured FileSystemStore. +func NewFileSystemStoreHTTPMiddleware(baseURL *url.URL, next http.Handler) http.Handler { + return http.StripPrefix(baseURL.Path, next) +} + // FileSystemStore is a file store that stores files on the local filesystem. // It is currently intended for usage in a development environment. type FileSystemStore struct { @@ -21,15 +29,12 @@ type FileSystemStore struct { // which is the storage location on the local file system for stored objects, // and a baseURL which is a URL which should be configured to serve the stored // files over HTTP. -func NewFileSystemStore(rootPath string, baseURL string) (*FileSystemStore, error) { - url, err := url.Parse(baseURL) - if err != nil { - return nil, fmt.Errorf("error parsing URL: %v", err) - } +func NewFileSystemStore(rootPath string, baseURL *url.URL) (*FileSystemStore, error) { + url := *baseURL if !strings.HasSuffix(url.Path, "/") { url.Path += "/" } - return &FileSystemStore{rootPath: rootPath, baseURL: url}, nil + return &FileSystemStore{rootPath: rootPath, baseURL: &url}, nil } // GetObject retrieves an object from the local filesystem. diff --git a/backend/filestore/fs_test.go b/backend/filestore/fs_test.go index 14f248c..c1b3f57 100644 --- a/backend/filestore/fs_test.go +++ b/backend/filestore/fs_test.go @@ -3,6 +3,7 @@ package filestore_test import ( "context" "io/ioutil" + "net/url" "os" "path" "strings" @@ -14,7 +15,9 @@ import ( ) func TestFileStoreGetObject(t *testing.T) { - store, err := filestore.NewFileSystemStore("testdata/", "/") + baseURL, err := url.Parse("/") + require.NoError(t, err) + store, err := filestore.NewFileSystemStore("testdata/", baseURL) require.NoError(t, err) reader, err := store.GetObject(context.Background(), "file.txt") require.NoError(t, err) @@ -60,7 +63,9 @@ func TestFileStoreGetObjectWithRange(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - store, err := filestore.NewFileSystemStore("testdata/", "/") + baseURL, err := url.Parse("/") + require.NoError(t, err) + store, err := filestore.NewFileSystemStore("testdata/", baseURL) require.NoError(t, err) reader, err := store.GetObjectWithRange(context.Background(), "file.txt", tc.start, tc.end) @@ -113,7 +118,9 @@ func TestFileStoreGetURL(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - store, err := filestore.NewFileSystemStore("testdata/", tc.baseURL) + baseURL, err := url.Parse(tc.baseURL) + require.NoError(t, err) + store, err := filestore.NewFileSystemStore("testdata/", baseURL) require.NoError(t, err) url, err := store.GetURL(context.Background(), tc.key) @@ -149,7 +156,9 @@ func TestFileStorePutObject(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - store, err := filestore.NewFileSystemStore(rootPath, "/") + baseURL, err := url.Parse("/") + require.NoError(t, err) + store, err := filestore.NewFileSystemStore(rootPath, baseURL) require.NoError(t, err) n, err := store.PutObject(context.Background(), tc.key, strings.NewReader(tc.content), "text/plain") diff --git a/backend/generated/mocks/MediaSetService.go b/backend/generated/mocks/MediaSetService.go new file mode 100644 index 0000000..5368d10 --- /dev/null +++ b/backend/generated/mocks/MediaSetService.go @@ -0,0 +1,153 @@ +// Code generated by mockery v2.9.4. DO NOT EDIT. + +package mocks + +import ( + context "context" + + media "git.netflux.io/rob/clipper/media" + mock "github.com/stretchr/testify/mock" + + uuid "github.com/google/uuid" +) + +// MediaSetService is an autogenerated mock type for the MediaSetService type +type MediaSetService struct { + mock.Mock +} + +// Get provides a mock function with given fields: _a0, _a1 +func (_m *MediaSetService) Get(_a0 context.Context, _a1 string) (*media.MediaSet, error) { + ret := _m.Called(_a0, _a1) + + var r0 *media.MediaSet + if rf, ok := ret.Get(0).(func(context.Context, string) *media.MediaSet); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*media.MediaSet) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetAudioSegment provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 +func (_m *MediaSetService) GetAudioSegment(_a0 context.Context, _a1 uuid.UUID, _a2 int64, _a3 int64, _a4 media.AudioFormat) (*media.AudioSegmentStream, error) { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + + var r0 *media.AudioSegmentStream + if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, int64, int64, media.AudioFormat) *media.AudioSegmentStream); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*media.AudioSegmentStream) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, uuid.UUID, int64, int64, media.AudioFormat) error); ok { + r1 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetPeaks provides a mock function with given fields: _a0, _a1, _a2 +func (_m *MediaSetService) GetPeaks(_a0 context.Context, _a1 uuid.UUID, _a2 int) (media.GetPeaksProgressReader, error) { + ret := _m.Called(_a0, _a1, _a2) + + var r0 media.GetPeaksProgressReader + if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, int) media.GetPeaksProgressReader); ok { + r0 = rf(_a0, _a1, _a2) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(media.GetPeaksProgressReader) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, uuid.UUID, int) error); ok { + r1 = rf(_a0, _a1, _a2) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetPeaksForSegment provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 +func (_m *MediaSetService) GetPeaksForSegment(_a0 context.Context, _a1 uuid.UUID, _a2 int64, _a3 int64, _a4 int) ([]int16, error) { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + + var r0 []int16 + if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, int64, int64, int) []int16); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int16) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, uuid.UUID, int64, int64, int) error); ok { + r1 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetVideo provides a mock function with given fields: _a0, _a1 +func (_m *MediaSetService) GetVideo(_a0 context.Context, _a1 uuid.UUID) (media.GetVideoProgressReader, error) { + ret := _m.Called(_a0, _a1) + + var r0 media.GetVideoProgressReader + if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID) media.GetVideoProgressReader); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(media.GetVideoProgressReader) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, uuid.UUID) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetVideoThumbnail provides a mock function with given fields: _a0, _a1 +func (_m *MediaSetService) GetVideoThumbnail(_a0 context.Context, _a1 uuid.UUID) (media.VideoThumbnail, error) { + ret := _m.Called(_a0, _a1) + + var r0 media.VideoThumbnail + if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID) media.VideoThumbnail); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Get(0).(media.VideoThumbnail) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, uuid.UUID) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/backend/go.mod b/backend/go.mod index cca85dc..8f82040 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -9,6 +9,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.22.0 github.com/aws/smithy-go v1.9.0 github.com/google/uuid v1.3.0 + github.com/gorilla/mux v1.8.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/improbable-eng/grpc-web v0.15.0 github.com/jackc/pgconn v1.10.1 diff --git a/backend/go.sum b/backend/go.sum index da080d1..2f7315e 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -201,6 +201,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/backend/server/gprc_handler.go b/backend/server/gprc_handler.go new file mode 100644 index 0000000..0f60d6f --- /dev/null +++ b/backend/server/gprc_handler.go @@ -0,0 +1,223 @@ +package server + +//go:generate mockery --recursive --name MediaSetService --output ../generated/mocks + +import ( + "context" + "errors" + "io" + + pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set" + "git.netflux.io/rob/clipper/media" + "github.com/google/uuid" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/durationpb" +) + +type MediaSetService interface { + Get(context.Context, string) (*media.MediaSet, error) + GetAudioSegment(context.Context, uuid.UUID, int64, int64, media.AudioFormat) (*media.AudioSegmentStream, error) + GetPeaks(context.Context, uuid.UUID, int) (media.GetPeaksProgressReader, error) + GetPeaksForSegment(context.Context, uuid.UUID, int64, int64, int) ([]int16, error) + GetVideo(context.Context, uuid.UUID) (media.GetVideoProgressReader, error) + GetVideoThumbnail(context.Context, uuid.UUID) (media.VideoThumbnail, error) +} + +// mediaSetServiceController implements gRPC controller for MediaSetService +type mediaSetServiceController struct { + pbmediaset.UnimplementedMediaSetServiceServer + + mediaSetService MediaSetService + logger *zap.SugaredLogger +} + +// Get returns a pbMediaSet.MediaSet +func (c *mediaSetServiceController) Get(ctx context.Context, request *pbmediaset.GetRequest) (*pbmediaset.MediaSet, error) { + mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId()) + if err != nil { + return nil, newResponseError(err) + } + + result := pbmediaset.MediaSet{ + Id: mediaSet.ID.String(), + YoutubeId: mediaSet.YoutubeID, + AudioChannels: int32(mediaSet.Audio.Channels), + AudioFrames: mediaSet.Audio.Frames, + AudioApproxFrames: mediaSet.Audio.ApproxFrames, + AudioSampleRate: int32(mediaSet.Audio.SampleRate), + AudioYoutubeItag: int32(mediaSet.Audio.YoutubeItag), + AudioMimeType: mediaSet.Audio.MimeType, + VideoDuration: durationpb.New(mediaSet.Video.Duration), + VideoYoutubeItag: int32(mediaSet.Video.YoutubeItag), + VideoMimeType: mediaSet.Video.MimeType, + } + + return &result, nil +} + +// GetPeaks returns a stream of GetPeaksProgress relating to the entire audio +// part of the MediaSet. +func (c *mediaSetServiceController) GetPeaks(request *pbmediaset.GetPeaksRequest, stream pbmediaset.MediaSetService_GetPeaksServer) error { + // TODO: reduce timeout when fetching from S3 + ctx, cancel := context.WithTimeout(context.Background(), getPeaksTimeout) + defer cancel() + + id, err := uuid.Parse(request.GetId()) + if err != nil { + return newResponseError(err) + } + + reader, err := c.mediaSetService.GetPeaks(ctx, id, int(request.GetNumBins())) + if err != nil { + return newResponseError(err) + } + + for { + progress, err := reader.Next() + if err != nil && err != io.EOF { + return newResponseError(err) + } + + peaks := make([]int32, len(progress.Peaks)) + for i, p := range progress.Peaks { + peaks[i] = int32(p) + } + + progressPb := pbmediaset.GetPeaksProgress{ + PercentComplete: progress.PercentComplete, + Url: progress.URL, + Peaks: peaks, + } + stream.Send(&progressPb) + + if err == io.EOF { + break + } + } + + return nil +} + +// GetPeaksForSegment returns a set of peaks for a segment of an audio part of +// a MediaSet. +func (c *mediaSetServiceController) GetPeaksForSegment(ctx context.Context, request *pbmediaset.GetPeaksForSegmentRequest) (*pbmediaset.GetPeaksForSegmentResponse, error) { + ctx, cancel := context.WithTimeout(ctx, getPeaksForSegmentTimeout) + defer cancel() + + id, err := uuid.Parse(request.GetId()) + if err != nil { + return nil, newResponseError(err) + } + + peaks, err := c.mediaSetService.GetPeaksForSegment(ctx, id, request.StartFrame, request.EndFrame, int(request.GetNumBins())) + if err != nil { + return nil, newResponseError(err) + } + + peaks32 := make([]int32, len(peaks)) + for i, p := range peaks { + peaks32[i] = int32(p) + } + + return &pbmediaset.GetPeaksForSegmentResponse{Peaks: peaks32}, nil +} + +func (c *mediaSetServiceController) GetAudioSegment(request *pbmediaset.GetAudioSegmentRequest, outStream pbmediaset.MediaSetService_GetAudioSegmentServer) error { + ctx, cancel := context.WithTimeout(context.Background(), getPeaksForSegmentTimeout) + defer cancel() + + id, err := uuid.Parse(request.GetId()) + if err != nil { + return newResponseError(err) + } + + var format media.AudioFormat + switch request.Format { + case pbmediaset.AudioFormat_MP3: + format = media.AudioFormatMP3 + case pbmediaset.AudioFormat_WAV: + format = media.AudioFormatWAV + default: + return newResponseError(errors.New("unknown format")) + } + + stream, err := c.mediaSetService.GetAudioSegment(ctx, id, request.StartFrame, request.EndFrame, format) + if err != nil { + return newResponseError(err) + } + + for { + progress, err := stream.Next(ctx) + if err != nil && err != io.EOF { + return newResponseError(err) + } + + progressPb := pbmediaset.GetAudioSegmentProgress{ + PercentComplete: progress.PercentComplete, + AudioData: progress.Data, + } + + outStream.Send(&progressPb) + + if err == io.EOF { + break + } + } + + return nil +} + +func (c *mediaSetServiceController) GetVideo(request *pbmediaset.GetVideoRequest, stream pbmediaset.MediaSetService_GetVideoServer) error { + // TODO: reduce timeout when already fetched from Youtube + ctx, cancel := context.WithTimeout(context.Background(), getVideoTimeout) + defer cancel() + + id, err := uuid.Parse(request.GetId()) + if err != nil { + return newResponseError(err) + } + + reader, err := c.mediaSetService.GetVideo(ctx, id) + if err != nil { + return newResponseError(err) + } + + for { + progress, err := reader.Next() + if err != nil && err != io.EOF { + return newResponseError(err) + } + + progressPb := pbmediaset.GetVideoProgress{ + PercentComplete: progress.PercentComplete, + Url: progress.URL, + } + stream.Send(&progressPb) + + if err == io.EOF { + break + } + } + + return nil +} + +func (c *mediaSetServiceController) GetVideoThumbnail(ctx context.Context, request *pbmediaset.GetVideoThumbnailRequest) (*pbmediaset.GetVideoThumbnailResponse, error) { + id, err := uuid.Parse(request.GetId()) + if err != nil { + return nil, newResponseError(err) + } + + thumbnail, err := c.mediaSetService.GetVideoThumbnail(ctx, id) + if err != nil { + return nil, newResponseError(err) + } + + response := pbmediaset.GetVideoThumbnailResponse{ + Image: thumbnail.Data, + Width: int32(thumbnail.Width), + Height: int32(thumbnail.Height), + } + + return &response, nil +} diff --git a/backend/server/handler.go b/backend/server/handler.go new file mode 100644 index 0000000..d5a6abc --- /dev/null +++ b/backend/server/handler.go @@ -0,0 +1,93 @@ +package server + +import ( + "net/http" + "path/filepath" + + "git.netflux.io/rob/clipper/config" + "git.netflux.io/rob/clipper/filestore" + "github.com/gorilla/mux" + "github.com/improbable-eng/grpc-web/go/grpcweb" + "go.uber.org/zap" +) + +type httpHandler struct { + grpcHandler *grpcweb.WrappedGrpcServer + router http.Handler + mediaSetService MediaSetService + logger *zap.SugaredLogger +} + +func newHTTPHandler(grpcHandler *grpcweb.WrappedGrpcServer, mediaSetService MediaSetService, c config.Config, logger *zap.SugaredLogger) *httpHandler { + fileStoreHandler := http.NotFoundHandler() + if c.FileStoreHTTPRoot != "" { + logger.With("root", c.FileStoreHTTPRoot, "baseURL", c.FileStoreHTTPBaseURL.String()).Info("Configured to serve file store over HTTP") + fileStoreHandler = http.FileServer(&indexedFileSystem{http.Dir(c.FileStoreHTTPRoot)}) + } + + assetsHandler := http.NotFoundHandler() + if c.AssetsHTTPRoot != "" { + logger.With("root", c.AssetsHTTPRoot).Info("Configured to serve assets over HTTP") + assetsHandler = http.FileServer(&indexedFileSystem{http.Dir(c.AssetsHTTPRoot)}) + } + + // If FileSystemStore AND assets serving are both enabled, + // FileStoreHTTPBaseURL *must* be set to a value other than "/" to avoid + // clobbering the assets routes. + router := mux.NewRouter() + if c.FileStore == config.FileSystemStore { + router. + Methods("GET"). + PathPrefix(c.FileStoreHTTPBaseURL.Path). + Handler(filestore.NewFileSystemStoreHTTPMiddleware(c.FileStoreHTTPBaseURL, fileStoreHandler)) + } + router. + Methods("GET"). + Handler(assetsHandler) + + return &httpHandler{ + grpcHandler: grpcHandler, + router: router, + mediaSetService: mediaSetService, + logger: logger, + } +} + +func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !h.grpcHandler.IsGrpcWebRequest(r) && !h.grpcHandler.IsAcceptableGrpcCorsRequest(r) { + h.router.ServeHTTP(w, r) + return + } + h.grpcHandler.ServeHTTP(w, r) +} + +// indexedFileSystem is an HTTP file system which handles index.html files if +// they exist, but does not serve directory listings. +// +// Ref: https://www.alexedwards.net/blog/disable-http-fileserver-directory-listings +type indexedFileSystem struct { + httpFS http.FileSystem +} + +func (ifs *indexedFileSystem) Open(path string) (http.File, error) { + f, err := ifs.httpFS.Open(path) + if err != nil { + return nil, err + } + + s, err := f.Stat() + if err != nil { + return nil, err + } + if !s.IsDir() { + return f, nil + } + + index := filepath.Join(path, "index.html") + if _, err := ifs.httpFS.Open(index); err != nil { + _ = f.Close() // ignore error + return nil, err + } + + return f, nil +} diff --git a/backend/server/handler_test.go b/backend/server/handler_test.go new file mode 100644 index 0000000..6faf910 --- /dev/null +++ b/backend/server/handler_test.go @@ -0,0 +1,135 @@ +package server + +import ( + "io" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "git.netflux.io/rob/clipper/config" + "git.netflux.io/rob/clipper/generated/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestHandler(t *testing.T) { + testCases := []struct { + name string + path string + method string + config config.Config + wantStatus int + wantBody string + }{ + { + name: "assets disabled, file system store disabled, GET /", + path: "/", + method: http.MethodGet, + config: config.Config{FileStore: config.S3Store}, + wantStatus: http.StatusNotFound, + }, + { + name: "assets disabled, file system store disabled, GET /foo.js", + path: "/foo.js", + method: http.MethodGet, + config: config.Config{FileStore: config.S3Store}, + wantStatus: http.StatusNotFound, + }, + { + name: "assets enabled, file system store disabled, index.html exists, GET /", + path: "/", + method: http.MethodGet, + config: config.Config{FileStore: config.S3Store, AssetsHTTPRoot: "testdata/http/assets"}, + wantStatus: http.StatusOK, + wantBody: "index", + }, + { + name: "assets enabled, file system store disabled, index.html does not exist, GET /css/", + path: "/css/", + method: http.MethodGet, + config: config.Config{FileStore: config.S3Store, AssetsHTTPRoot: "testdata/http/assets"}, + wantStatus: http.StatusNotFound, + }, + { + name: "assets enabled, file system store disabled, index.html does not exist, GET /css/style.css", + path: "/css/style.css", + method: http.MethodGet, + config: config.Config{FileStore: config.S3Store, AssetsHTTPRoot: "testdata/http/assets"}, + wantStatus: http.StatusOK, + wantBody: "css", + }, + { + name: "assets enabled, file system store disabled, GET /foo.js", + path: "/foo.js", + method: http.MethodGet, + config: config.Config{FileStore: config.S3Store, AssetsHTTPRoot: "testdata/http/assets"}, + wantStatus: http.StatusOK, + wantBody: "foo", + }, + { + name: "assets enabled, file system store enabled with path prefix /store/, GET /foo.js", + path: "/foo.js", + method: http.MethodGet, + config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/"), FileStoreHTTPRoot: "testdata/http/filestore", AssetsHTTPRoot: "testdata/http/assets"}, + wantStatus: http.StatusOK, + wantBody: "foo", + }, + { + name: "assets enabled, file system store enabled with path prefix /store/, GET /store/bar.mp4", + path: "/store/bar.mp4", + method: http.MethodGet, + config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/"), FileStoreHTTPRoot: "testdata/http/filestore", AssetsHTTPRoot: "testdata/http/assets"}, + wantStatus: http.StatusOK, + wantBody: "bar", + }, + { + name: "assets enabled, file system store enabled with path prefix /store/, GET /store/", + path: "/store/", + method: http.MethodGet, + config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/"), FileStoreHTTPRoot: "testdata/http/filestore", AssetsHTTPRoot: "testdata/http/assets"}, + wantStatus: http.StatusNotFound, + }, + { + name: "assets enabled, file system store enabled with path prefix /store/, GET /", + path: "/", + method: http.MethodGet, + config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/"), FileStoreHTTPRoot: "testdata/http/filestore", AssetsHTTPRoot: "testdata/http/assets"}, + wantStatus: http.StatusOK, + wantBody: "index", + }, + { + name: "assets enabled, file system store enabled with path prefix /, GET / clobbers the assets routes", + path: "/", + method: http.MethodGet, + config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/"), FileStoreHTTPRoot: "testdata/http/filestore", AssetsHTTPRoot: "testdata/http/assets"}, + wantStatus: http.StatusNotFound, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var mediaSetService mocks.MediaSetService + handler := newHTTPHandler(nil, &mediaSetService, tc.config, zap.NewNop().Sugar()) + + req := httptest.NewRequest(tc.method, tc.path, nil) + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + resp := w.Result() + + assert.Equal(t, tc.wantStatus, resp.StatusCode) + if tc.wantBody != "" { + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + assert.Equal(t, tc.wantBody, string(body)) + } + }) + } +} + +func mustParseURL(t *testing.T, u string) *url.URL { + pu, err := url.Parse(u) + require.NoError(t, err) + return pu +} diff --git a/backend/server/server.go b/backend/server/server.go index a305d73..2d14b9d 100644 --- a/backend/server/server.go +++ b/backend/server/server.go @@ -1,10 +1,7 @@ package server import ( - "context" - "errors" "fmt" - "io" "net/http" "os/exec" "time" @@ -12,7 +9,6 @@ import ( "git.netflux.io/rob/clipper/config" pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set" "git.netflux.io/rob/clipper/media" - "github.com/google/uuid" grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware" grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" @@ -21,7 +17,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/durationpb" ) const ( @@ -72,258 +67,42 @@ type Options struct { Logger *zap.Logger } -// mediaSetServiceController implements gRPC controller for MediaSetService -type mediaSetServiceController struct { - pbmediaset.UnimplementedMediaSetServiceServer - - mediaSetService *media.MediaSetService - logger *zap.SugaredLogger -} - -// Get returns a pbMediaSet.MediaSet -func (c *mediaSetServiceController) Get(ctx context.Context, request *pbmediaset.GetRequest) (*pbmediaset.MediaSet, error) { - mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId()) - if err != nil { - return nil, newResponseError(err) - } - - result := pbmediaset.MediaSet{ - Id: mediaSet.ID.String(), - YoutubeId: mediaSet.YoutubeID, - AudioChannels: int32(mediaSet.Audio.Channels), - AudioFrames: mediaSet.Audio.Frames, - AudioApproxFrames: mediaSet.Audio.ApproxFrames, - AudioSampleRate: int32(mediaSet.Audio.SampleRate), - AudioYoutubeItag: int32(mediaSet.Audio.YoutubeItag), - AudioMimeType: mediaSet.Audio.MimeType, - VideoDuration: durationpb.New(mediaSet.Video.Duration), - VideoYoutubeItag: int32(mediaSet.Video.YoutubeItag), - VideoMimeType: mediaSet.Video.MimeType, - } - - return &result, nil -} - -// GetPeaks returns a stream of GetPeaksProgress relating to the entire audio -// part of the MediaSet. -func (c *mediaSetServiceController) GetPeaks(request *pbmediaset.GetPeaksRequest, stream pbmediaset.MediaSetService_GetPeaksServer) error { - // TODO: reduce timeout when fetching from S3 - ctx, cancel := context.WithTimeout(context.Background(), getPeaksTimeout) - defer cancel() - - id, err := uuid.Parse(request.GetId()) - if err != nil { - return newResponseError(err) - } - - reader, err := c.mediaSetService.GetPeaks(ctx, id, int(request.GetNumBins())) - if err != nil { - return newResponseError(err) - } - - for { - progress, err := reader.Next() - if err != nil && err != io.EOF { - return newResponseError(err) - } - - peaks := make([]int32, len(progress.Peaks)) - for i, p := range progress.Peaks { - peaks[i] = int32(p) - } - - progressPb := pbmediaset.GetPeaksProgress{ - PercentComplete: progress.PercentComplete, - Url: progress.URL, - Peaks: peaks, - } - stream.Send(&progressPb) - - if err == io.EOF { - break - } - } - - return nil -} - -// GetPeaksForSegment returns a set of peaks for a segment of an audio part of -// a MediaSet. -func (c *mediaSetServiceController) GetPeaksForSegment(ctx context.Context, request *pbmediaset.GetPeaksForSegmentRequest) (*pbmediaset.GetPeaksForSegmentResponse, error) { - ctx, cancel := context.WithTimeout(ctx, getPeaksForSegmentTimeout) - defer cancel() - - id, err := uuid.Parse(request.GetId()) - if err != nil { - return nil, newResponseError(err) - } - - peaks, err := c.mediaSetService.GetPeaksForSegment(ctx, id, request.StartFrame, request.EndFrame, int(request.GetNumBins())) - if err != nil { - return nil, newResponseError(err) - } - - peaks32 := make([]int32, len(peaks)) - for i, p := range peaks { - peaks32[i] = int32(p) - } - - return &pbmediaset.GetPeaksForSegmentResponse{Peaks: peaks32}, nil -} - -func (c *mediaSetServiceController) GetAudioSegment(request *pbmediaset.GetAudioSegmentRequest, outStream pbmediaset.MediaSetService_GetAudioSegmentServer) error { - ctx, cancel := context.WithTimeout(context.Background(), getPeaksForSegmentTimeout) - defer cancel() - - id, err := uuid.Parse(request.GetId()) - if err != nil { - return newResponseError(err) - } - - var format media.AudioFormat - switch request.Format { - case pbmediaset.AudioFormat_MP3: - format = media.AudioFormatMP3 - case pbmediaset.AudioFormat_WAV: - format = media.AudioFormatWAV - default: - return newResponseError(errors.New("unknown format")) - } - - stream, err := c.mediaSetService.GetAudioSegment(ctx, id, request.StartFrame, request.EndFrame, format) - if err != nil { - return newResponseError(err) - } - - for { - progress, err := stream.Next(ctx) - if err != nil && err != io.EOF { - return newResponseError(err) - } - - progressPb := pbmediaset.GetAudioSegmentProgress{ - PercentComplete: progress.PercentComplete, - AudioData: progress.Data, - } - - outStream.Send(&progressPb) - - if err == io.EOF { - break - } - } - - return nil -} - -func (c *mediaSetServiceController) GetVideo(request *pbmediaset.GetVideoRequest, stream pbmediaset.MediaSetService_GetVideoServer) error { - // TODO: reduce timeout when already fetched from Youtube - ctx, cancel := context.WithTimeout(context.Background(), getVideoTimeout) - defer cancel() - - id, err := uuid.Parse(request.GetId()) - if err != nil { - return newResponseError(err) - } - - reader, err := c.mediaSetService.GetVideo(ctx, id) - if err != nil { - return newResponseError(err) - } - - for { - progress, err := reader.Next() - if err != nil && err != io.EOF { - return newResponseError(err) - } - - progressPb := pbmediaset.GetVideoProgress{ - PercentComplete: progress.PercentComplete, - Url: progress.URL, - } - stream.Send(&progressPb) - - if err == io.EOF { - break - } - } - - return nil -} - -func (c *mediaSetServiceController) GetVideoThumbnail(ctx context.Context, request *pbmediaset.GetVideoThumbnailRequest) (*pbmediaset.GetVideoThumbnailResponse, error) { - id, err := uuid.Parse(request.GetId()) - if err != nil { - return nil, newResponseError(err) - } - - thumbnail, err := c.mediaSetService.GetVideoThumbnail(ctx, id) - if err != nil { - return nil, newResponseError(err) - } - - response := pbmediaset.GetVideoThumbnailResponse{ - Image: thumbnail.Data, - Width: int32(thumbnail.Width), - Height: int32(thumbnail.Height), - } - - return &response, nil -} - func Start(options Options) error { - fetchMediaSetService := media.NewMediaSetService( + conf := options.Config + + mediaSetService := media.NewMediaSetService( options.Store, options.YoutubeClient, options.FileStore, exec.CommandContext, options.WorkerPool, - options.Config, + conf, options.Logger.Sugar().Named("mediaSetService"), ) - grpcServer, err := buildGRPCServer(options.Config, options.Logger) + grpcServer, err := buildGRPCServer(conf, options.Logger) if err != nil { return fmt.Errorf("error building server: %v", err) } - mediaSetController := &mediaSetServiceController{mediaSetService: fetchMediaSetService, logger: options.Logger.Sugar().Named("controller")} + mediaSetController := &mediaSetServiceController{mediaSetService: mediaSetService, logger: options.Logger.Sugar().Named("controller")} pbmediaset.RegisterMediaSetServiceServer(grpcServer, mediaSetController) - // TODO: configure CORS - grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true })) - - log := options.Logger.Sugar() - fileHandler := http.NotFoundHandler() - - // Enabling the file system store disables serving assets over HTTP. - // TODO: fix this. - if options.Config.AssetsHTTPRoot != "" { - log.With("root", options.Config.AssetsHTTPRoot).Info("Configured to serve assets over HTTP") - fileHandler = http.FileServer(http.Dir(options.Config.AssetsHTTPRoot)) - } - if options.Config.FileStoreHTTPRoot != "" { - log.With("root", options.Config.FileStoreHTTPRoot).Info("Configured to serve file store over HTTP") - fileHandler = http.FileServer(http.Dir(options.Config.FileStoreHTTPRoot)) - } + grpcHandler := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true })) + httpHandler := newHTTPHandler(grpcHandler, mediaSetService, conf, options.Logger.Sugar().Named("httpHandler")) httpServer := http.Server{ - Addr: options.Config.BindAddr, + Addr: conf.BindAddr, ReadTimeout: options.Timeout, WriteTimeout: options.Timeout, - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if !grpcWebServer.IsGrpcWebRequest(r) && !grpcWebServer.IsAcceptableGrpcCorsRequest(r) { - fileHandler.ServeHTTP(w, r) - return - } - grpcWebServer.ServeHTTP(w, r) - }), + Handler: httpHandler, } + log := options.Logger.Sugar() log.Infof("Listening at %s", options.Config.BindAddr) - if options.Config.TLSCertFile != "" && options.Config.TLSKeyFile != "" { - return httpServer.ListenAndServeTLS(options.Config.TLSCertFile, options.Config.TLSKeyFile) + if conf.TLSCertFile != "" && conf.TLSKeyFile != "" { + return httpServer.ListenAndServeTLS(conf.TLSCertFile, conf.TLSKeyFile) } return httpServer.ListenAndServe() diff --git a/backend/server/testdata/http/assets/css/style.css b/backend/server/testdata/http/assets/css/style.css new file mode 100644 index 0000000..493ec68 --- /dev/null +++ b/backend/server/testdata/http/assets/css/style.css @@ -0,0 +1 @@ +css \ No newline at end of file diff --git a/backend/server/testdata/http/assets/foo.js b/backend/server/testdata/http/assets/foo.js new file mode 100644 index 0000000..1910281 --- /dev/null +++ b/backend/server/testdata/http/assets/foo.js @@ -0,0 +1 @@ +foo \ No newline at end of file diff --git a/backend/server/testdata/http/assets/index.html b/backend/server/testdata/http/assets/index.html new file mode 100644 index 0000000..b2d525b --- /dev/null +++ b/backend/server/testdata/http/assets/index.html @@ -0,0 +1 @@ +index \ No newline at end of file diff --git a/backend/server/testdata/http/filestore/bar.mp4 b/backend/server/testdata/http/filestore/bar.mp4 new file mode 100644 index 0000000..ba0e162 --- /dev/null +++ b/backend/server/testdata/http/filestore/bar.mp4 @@ -0,0 +1 @@ +bar \ No newline at end of file