Refactor response handling.

- Add FileSystem implementation to handle HTTP index files but constrain
  directory listings
- Refactor server implementation into a dedicated handler struct
- Add test coverage
This commit is contained in:
Rob Watson 2022-01-10 18:45:10 +01:00
parent 8a26b75127
commit c7d5541379
14 changed files with 658 additions and 248 deletions

View File

@ -3,9 +3,11 @@ package config
import (
"errors"
"fmt"
"net/url"
"os"
"runtime"
"strconv"
"strings"
)
type Environment int
@ -30,7 +32,7 @@ type Config struct {
DatabaseURL string
FileStore FileStore
FileStoreHTTPRoot string
FileStoreHTTPBaseURL string
FileStoreHTTPBaseURL *url.URL
AWSAccessKeyID string
AWSSecretAccessKey string
AWSRegion string
@ -80,9 +82,13 @@ func NewFromEnv() (Config, error) {
return Config{}, fmt.Errorf("invalid FILE_STORE value: %s", fileStoreString)
}
fileStoreHTTPBaseURL := os.Getenv("FILE_STORE_HTTP_BASE_URL")
if fileStoreHTTPBaseURL == "" {
fileStoreHTTPBaseURL = "/"
fileStoreHTTPBaseURLString := os.Getenv("FILE_STORE_HTTP_BASE_URL")
if !strings.HasSuffix(fileStoreHTTPBaseURLString, "/") {
fileStoreHTTPBaseURLString += "/"
}
fileStoreHTTPBaseURL, err := url.Parse(fileStoreHTTPBaseURLString)
if err != nil {
return Config{}, fmt.Errorf("invalid FILE_STORE_HTTP_BASE_URL: %v", err)
}
var awsAccessKeyID, awsSecretAccessKey, awsRegion, s3Bucket, fileStoreHTTPRoot string

View File

@ -4,12 +4,20 @@ import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
)
// NewFileSystemStoreHTTPMiddleware returns an HTTP middleware which strips the
// base URL path prefix from incoming paths, suitable for passing to an
// appropriately-configured FileSystemStore.
func NewFileSystemStoreHTTPMiddleware(baseURL *url.URL, next http.Handler) http.Handler {
return http.StripPrefix(baseURL.Path, next)
}
// FileSystemStore is a file store that stores files on the local filesystem.
// It is currently intended for usage in a development environment.
type FileSystemStore struct {
@ -21,15 +29,12 @@ type FileSystemStore struct {
// which is the storage location on the local file system for stored objects,
// and a baseURL which is a URL which should be configured to serve the stored
// files over HTTP.
func NewFileSystemStore(rootPath string, baseURL string) (*FileSystemStore, error) {
url, err := url.Parse(baseURL)
if err != nil {
return nil, fmt.Errorf("error parsing URL: %v", err)
}
func NewFileSystemStore(rootPath string, baseURL *url.URL) (*FileSystemStore, error) {
url := *baseURL
if !strings.HasSuffix(url.Path, "/") {
url.Path += "/"
}
return &FileSystemStore{rootPath: rootPath, baseURL: url}, nil
return &FileSystemStore{rootPath: rootPath, baseURL: &url}, nil
}
// GetObject retrieves an object from the local filesystem.

View File

@ -3,6 +3,7 @@ package filestore_test
import (
"context"
"io/ioutil"
"net/url"
"os"
"path"
"strings"
@ -14,7 +15,9 @@ import (
)
func TestFileStoreGetObject(t *testing.T) {
store, err := filestore.NewFileSystemStore("testdata/", "/")
baseURL, err := url.Parse("/")
require.NoError(t, err)
store, err := filestore.NewFileSystemStore("testdata/", baseURL)
require.NoError(t, err)
reader, err := store.GetObject(context.Background(), "file.txt")
require.NoError(t, err)
@ -60,7 +63,9 @@ func TestFileStoreGetObjectWithRange(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
store, err := filestore.NewFileSystemStore("testdata/", "/")
baseURL, err := url.Parse("/")
require.NoError(t, err)
store, err := filestore.NewFileSystemStore("testdata/", baseURL)
require.NoError(t, err)
reader, err := store.GetObjectWithRange(context.Background(), "file.txt", tc.start, tc.end)
@ -113,7 +118,9 @@ func TestFileStoreGetURL(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
store, err := filestore.NewFileSystemStore("testdata/", tc.baseURL)
baseURL, err := url.Parse(tc.baseURL)
require.NoError(t, err)
store, err := filestore.NewFileSystemStore("testdata/", baseURL)
require.NoError(t, err)
url, err := store.GetURL(context.Background(), tc.key)
@ -149,7 +156,9 @@ func TestFileStorePutObject(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
store, err := filestore.NewFileSystemStore(rootPath, "/")
baseURL, err := url.Parse("/")
require.NoError(t, err)
store, err := filestore.NewFileSystemStore(rootPath, baseURL)
require.NoError(t, err)
n, err := store.PutObject(context.Background(), tc.key, strings.NewReader(tc.content), "text/plain")

View File

@ -0,0 +1,153 @@
// Code generated by mockery v2.9.4. DO NOT EDIT.
package mocks
import (
context "context"
media "git.netflux.io/rob/clipper/media"
mock "github.com/stretchr/testify/mock"
uuid "github.com/google/uuid"
)
// MediaSetService is an autogenerated mock type for the MediaSetService type
type MediaSetService struct {
mock.Mock
}
// Get provides a mock function with given fields: _a0, _a1
func (_m *MediaSetService) Get(_a0 context.Context, _a1 string) (*media.MediaSet, error) {
ret := _m.Called(_a0, _a1)
var r0 *media.MediaSet
if rf, ok := ret.Get(0).(func(context.Context, string) *media.MediaSet); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*media.MediaSet)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetAudioSegment provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4
func (_m *MediaSetService) GetAudioSegment(_a0 context.Context, _a1 uuid.UUID, _a2 int64, _a3 int64, _a4 media.AudioFormat) (*media.AudioSegmentStream, error) {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4)
var r0 *media.AudioSegmentStream
if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, int64, int64, media.AudioFormat) *media.AudioSegmentStream); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*media.AudioSegmentStream)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, uuid.UUID, int64, int64, media.AudioFormat) error); ok {
r1 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetPeaks provides a mock function with given fields: _a0, _a1, _a2
func (_m *MediaSetService) GetPeaks(_a0 context.Context, _a1 uuid.UUID, _a2 int) (media.GetPeaksProgressReader, error) {
ret := _m.Called(_a0, _a1, _a2)
var r0 media.GetPeaksProgressReader
if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, int) media.GetPeaksProgressReader); ok {
r0 = rf(_a0, _a1, _a2)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(media.GetPeaksProgressReader)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, uuid.UUID, int) error); ok {
r1 = rf(_a0, _a1, _a2)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetPeaksForSegment provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4
func (_m *MediaSetService) GetPeaksForSegment(_a0 context.Context, _a1 uuid.UUID, _a2 int64, _a3 int64, _a4 int) ([]int16, error) {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4)
var r0 []int16
if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, int64, int64, int) []int16); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]int16)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, uuid.UUID, int64, int64, int) error); ok {
r1 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetVideo provides a mock function with given fields: _a0, _a1
func (_m *MediaSetService) GetVideo(_a0 context.Context, _a1 uuid.UUID) (media.GetVideoProgressReader, error) {
ret := _m.Called(_a0, _a1)
var r0 media.GetVideoProgressReader
if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID) media.GetVideoProgressReader); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(media.GetVideoProgressReader)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, uuid.UUID) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetVideoThumbnail provides a mock function with given fields: _a0, _a1
func (_m *MediaSetService) GetVideoThumbnail(_a0 context.Context, _a1 uuid.UUID) (media.VideoThumbnail, error) {
ret := _m.Called(_a0, _a1)
var r0 media.VideoThumbnail
if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID) media.VideoThumbnail); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(media.VideoThumbnail)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, uuid.UUID) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

View File

@ -9,6 +9,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.22.0
github.com/aws/smithy-go v1.9.0
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/improbable-eng/grpc-web v0.15.0
github.com/jackc/pgconn v1.10.1

View File

@ -201,6 +201,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=

View File

@ -0,0 +1,223 @@
package server
//go:generate mockery --recursive --name MediaSetService --output ../generated/mocks
import (
"context"
"errors"
"io"
pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set"
"git.netflux.io/rob/clipper/media"
"github.com/google/uuid"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/durationpb"
)
type MediaSetService interface {
Get(context.Context, string) (*media.MediaSet, error)
GetAudioSegment(context.Context, uuid.UUID, int64, int64, media.AudioFormat) (*media.AudioSegmentStream, error)
GetPeaks(context.Context, uuid.UUID, int) (media.GetPeaksProgressReader, error)
GetPeaksForSegment(context.Context, uuid.UUID, int64, int64, int) ([]int16, error)
GetVideo(context.Context, uuid.UUID) (media.GetVideoProgressReader, error)
GetVideoThumbnail(context.Context, uuid.UUID) (media.VideoThumbnail, error)
}
// mediaSetServiceController implements gRPC controller for MediaSetService
type mediaSetServiceController struct {
pbmediaset.UnimplementedMediaSetServiceServer
mediaSetService MediaSetService
logger *zap.SugaredLogger
}
// Get returns a pbMediaSet.MediaSet
func (c *mediaSetServiceController) Get(ctx context.Context, request *pbmediaset.GetRequest) (*pbmediaset.MediaSet, error) {
mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId())
if err != nil {
return nil, newResponseError(err)
}
result := pbmediaset.MediaSet{
Id: mediaSet.ID.String(),
YoutubeId: mediaSet.YoutubeID,
AudioChannels: int32(mediaSet.Audio.Channels),
AudioFrames: mediaSet.Audio.Frames,
AudioApproxFrames: mediaSet.Audio.ApproxFrames,
AudioSampleRate: int32(mediaSet.Audio.SampleRate),
AudioYoutubeItag: int32(mediaSet.Audio.YoutubeItag),
AudioMimeType: mediaSet.Audio.MimeType,
VideoDuration: durationpb.New(mediaSet.Video.Duration),
VideoYoutubeItag: int32(mediaSet.Video.YoutubeItag),
VideoMimeType: mediaSet.Video.MimeType,
}
return &result, nil
}
// GetPeaks returns a stream of GetPeaksProgress relating to the entire audio
// part of the MediaSet.
func (c *mediaSetServiceController) GetPeaks(request *pbmediaset.GetPeaksRequest, stream pbmediaset.MediaSetService_GetPeaksServer) error {
// TODO: reduce timeout when fetching from S3
ctx, cancel := context.WithTimeout(context.Background(), getPeaksTimeout)
defer cancel()
id, err := uuid.Parse(request.GetId())
if err != nil {
return newResponseError(err)
}
reader, err := c.mediaSetService.GetPeaks(ctx, id, int(request.GetNumBins()))
if err != nil {
return newResponseError(err)
}
for {
progress, err := reader.Next()
if err != nil && err != io.EOF {
return newResponseError(err)
}
peaks := make([]int32, len(progress.Peaks))
for i, p := range progress.Peaks {
peaks[i] = int32(p)
}
progressPb := pbmediaset.GetPeaksProgress{
PercentComplete: progress.PercentComplete,
Url: progress.URL,
Peaks: peaks,
}
stream.Send(&progressPb)
if err == io.EOF {
break
}
}
return nil
}
// GetPeaksForSegment returns a set of peaks for a segment of an audio part of
// a MediaSet.
func (c *mediaSetServiceController) GetPeaksForSegment(ctx context.Context, request *pbmediaset.GetPeaksForSegmentRequest) (*pbmediaset.GetPeaksForSegmentResponse, error) {
ctx, cancel := context.WithTimeout(ctx, getPeaksForSegmentTimeout)
defer cancel()
id, err := uuid.Parse(request.GetId())
if err != nil {
return nil, newResponseError(err)
}
peaks, err := c.mediaSetService.GetPeaksForSegment(ctx, id, request.StartFrame, request.EndFrame, int(request.GetNumBins()))
if err != nil {
return nil, newResponseError(err)
}
peaks32 := make([]int32, len(peaks))
for i, p := range peaks {
peaks32[i] = int32(p)
}
return &pbmediaset.GetPeaksForSegmentResponse{Peaks: peaks32}, nil
}
func (c *mediaSetServiceController) GetAudioSegment(request *pbmediaset.GetAudioSegmentRequest, outStream pbmediaset.MediaSetService_GetAudioSegmentServer) error {
ctx, cancel := context.WithTimeout(context.Background(), getPeaksForSegmentTimeout)
defer cancel()
id, err := uuid.Parse(request.GetId())
if err != nil {
return newResponseError(err)
}
var format media.AudioFormat
switch request.Format {
case pbmediaset.AudioFormat_MP3:
format = media.AudioFormatMP3
case pbmediaset.AudioFormat_WAV:
format = media.AudioFormatWAV
default:
return newResponseError(errors.New("unknown format"))
}
stream, err := c.mediaSetService.GetAudioSegment(ctx, id, request.StartFrame, request.EndFrame, format)
if err != nil {
return newResponseError(err)
}
for {
progress, err := stream.Next(ctx)
if err != nil && err != io.EOF {
return newResponseError(err)
}
progressPb := pbmediaset.GetAudioSegmentProgress{
PercentComplete: progress.PercentComplete,
AudioData: progress.Data,
}
outStream.Send(&progressPb)
if err == io.EOF {
break
}
}
return nil
}
func (c *mediaSetServiceController) GetVideo(request *pbmediaset.GetVideoRequest, stream pbmediaset.MediaSetService_GetVideoServer) error {
// TODO: reduce timeout when already fetched from Youtube
ctx, cancel := context.WithTimeout(context.Background(), getVideoTimeout)
defer cancel()
id, err := uuid.Parse(request.GetId())
if err != nil {
return newResponseError(err)
}
reader, err := c.mediaSetService.GetVideo(ctx, id)
if err != nil {
return newResponseError(err)
}
for {
progress, err := reader.Next()
if err != nil && err != io.EOF {
return newResponseError(err)
}
progressPb := pbmediaset.GetVideoProgress{
PercentComplete: progress.PercentComplete,
Url: progress.URL,
}
stream.Send(&progressPb)
if err == io.EOF {
break
}
}
return nil
}
func (c *mediaSetServiceController) GetVideoThumbnail(ctx context.Context, request *pbmediaset.GetVideoThumbnailRequest) (*pbmediaset.GetVideoThumbnailResponse, error) {
id, err := uuid.Parse(request.GetId())
if err != nil {
return nil, newResponseError(err)
}
thumbnail, err := c.mediaSetService.GetVideoThumbnail(ctx, id)
if err != nil {
return nil, newResponseError(err)
}
response := pbmediaset.GetVideoThumbnailResponse{
Image: thumbnail.Data,
Width: int32(thumbnail.Width),
Height: int32(thumbnail.Height),
}
return &response, nil
}

93
backend/server/handler.go Normal file
View File

@ -0,0 +1,93 @@
package server
import (
"net/http"
"path/filepath"
"git.netflux.io/rob/clipper/config"
"git.netflux.io/rob/clipper/filestore"
"github.com/gorilla/mux"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"go.uber.org/zap"
)
type httpHandler struct {
grpcHandler *grpcweb.WrappedGrpcServer
router http.Handler
mediaSetService MediaSetService
logger *zap.SugaredLogger
}
func newHTTPHandler(grpcHandler *grpcweb.WrappedGrpcServer, mediaSetService MediaSetService, c config.Config, logger *zap.SugaredLogger) *httpHandler {
fileStoreHandler := http.NotFoundHandler()
if c.FileStoreHTTPRoot != "" {
logger.With("root", c.FileStoreHTTPRoot, "baseURL", c.FileStoreHTTPBaseURL.String()).Info("Configured to serve file store over HTTP")
fileStoreHandler = http.FileServer(&indexedFileSystem{http.Dir(c.FileStoreHTTPRoot)})
}
assetsHandler := http.NotFoundHandler()
if c.AssetsHTTPRoot != "" {
logger.With("root", c.AssetsHTTPRoot).Info("Configured to serve assets over HTTP")
assetsHandler = http.FileServer(&indexedFileSystem{http.Dir(c.AssetsHTTPRoot)})
}
// If FileSystemStore AND assets serving are both enabled,
// FileStoreHTTPBaseURL *must* be set to a value other than "/" to avoid
// clobbering the assets routes.
router := mux.NewRouter()
if c.FileStore == config.FileSystemStore {
router.
Methods("GET").
PathPrefix(c.FileStoreHTTPBaseURL.Path).
Handler(filestore.NewFileSystemStoreHTTPMiddleware(c.FileStoreHTTPBaseURL, fileStoreHandler))
}
router.
Methods("GET").
Handler(assetsHandler)
return &httpHandler{
grpcHandler: grpcHandler,
router: router,
mediaSetService: mediaSetService,
logger: logger,
}
}
func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !h.grpcHandler.IsGrpcWebRequest(r) && !h.grpcHandler.IsAcceptableGrpcCorsRequest(r) {
h.router.ServeHTTP(w, r)
return
}
h.grpcHandler.ServeHTTP(w, r)
}
// indexedFileSystem is an HTTP file system which handles index.html files if
// they exist, but does not serve directory listings.
//
// Ref: https://www.alexedwards.net/blog/disable-http-fileserver-directory-listings
type indexedFileSystem struct {
httpFS http.FileSystem
}
func (ifs *indexedFileSystem) Open(path string) (http.File, error) {
f, err := ifs.httpFS.Open(path)
if err != nil {
return nil, err
}
s, err := f.Stat()
if err != nil {
return nil, err
}
if !s.IsDir() {
return f, nil
}
index := filepath.Join(path, "index.html")
if _, err := ifs.httpFS.Open(index); err != nil {
_ = f.Close() // ignore error
return nil, err
}
return f, nil
}

View File

@ -0,0 +1,135 @@
package server
import (
"io"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"git.netflux.io/rob/clipper/config"
"git.netflux.io/rob/clipper/generated/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func TestHandler(t *testing.T) {
testCases := []struct {
name string
path string
method string
config config.Config
wantStatus int
wantBody string
}{
{
name: "assets disabled, file system store disabled, GET /",
path: "/",
method: http.MethodGet,
config: config.Config{FileStore: config.S3Store},
wantStatus: http.StatusNotFound,
},
{
name: "assets disabled, file system store disabled, GET /foo.js",
path: "/foo.js",
method: http.MethodGet,
config: config.Config{FileStore: config.S3Store},
wantStatus: http.StatusNotFound,
},
{
name: "assets enabled, file system store disabled, index.html exists, GET /",
path: "/",
method: http.MethodGet,
config: config.Config{FileStore: config.S3Store, AssetsHTTPRoot: "testdata/http/assets"},
wantStatus: http.StatusOK,
wantBody: "index",
},
{
name: "assets enabled, file system store disabled, index.html does not exist, GET /css/",
path: "/css/",
method: http.MethodGet,
config: config.Config{FileStore: config.S3Store, AssetsHTTPRoot: "testdata/http/assets"},
wantStatus: http.StatusNotFound,
},
{
name: "assets enabled, file system store disabled, index.html does not exist, GET /css/style.css",
path: "/css/style.css",
method: http.MethodGet,
config: config.Config{FileStore: config.S3Store, AssetsHTTPRoot: "testdata/http/assets"},
wantStatus: http.StatusOK,
wantBody: "css",
},
{
name: "assets enabled, file system store disabled, GET /foo.js",
path: "/foo.js",
method: http.MethodGet,
config: config.Config{FileStore: config.S3Store, AssetsHTTPRoot: "testdata/http/assets"},
wantStatus: http.StatusOK,
wantBody: "foo",
},
{
name: "assets enabled, file system store enabled with path prefix /store/, GET /foo.js",
path: "/foo.js",
method: http.MethodGet,
config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/"), FileStoreHTTPRoot: "testdata/http/filestore", AssetsHTTPRoot: "testdata/http/assets"},
wantStatus: http.StatusOK,
wantBody: "foo",
},
{
name: "assets enabled, file system store enabled with path prefix /store/, GET /store/bar.mp4",
path: "/store/bar.mp4",
method: http.MethodGet,
config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/"), FileStoreHTTPRoot: "testdata/http/filestore", AssetsHTTPRoot: "testdata/http/assets"},
wantStatus: http.StatusOK,
wantBody: "bar",
},
{
name: "assets enabled, file system store enabled with path prefix /store/, GET /store/",
path: "/store/",
method: http.MethodGet,
config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/"), FileStoreHTTPRoot: "testdata/http/filestore", AssetsHTTPRoot: "testdata/http/assets"},
wantStatus: http.StatusNotFound,
},
{
name: "assets enabled, file system store enabled with path prefix /store/, GET /",
path: "/",
method: http.MethodGet,
config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/"), FileStoreHTTPRoot: "testdata/http/filestore", AssetsHTTPRoot: "testdata/http/assets"},
wantStatus: http.StatusOK,
wantBody: "index",
},
{
name: "assets enabled, file system store enabled with path prefix /, GET / clobbers the assets routes",
path: "/",
method: http.MethodGet,
config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/"), FileStoreHTTPRoot: "testdata/http/filestore", AssetsHTTPRoot: "testdata/http/assets"},
wantStatus: http.StatusNotFound,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var mediaSetService mocks.MediaSetService
handler := newHTTPHandler(nil, &mediaSetService, tc.config, zap.NewNop().Sugar())
req := httptest.NewRequest(tc.method, tc.path, nil)
w := httptest.NewRecorder()
handler.ServeHTTP(w, req)
resp := w.Result()
assert.Equal(t, tc.wantStatus, resp.StatusCode)
if tc.wantBody != "" {
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, tc.wantBody, string(body))
}
})
}
}
func mustParseURL(t *testing.T, u string) *url.URL {
pu, err := url.Parse(u)
require.NoError(t, err)
return pu
}

View File

@ -1,10 +1,7 @@
package server
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"os/exec"
"time"
@ -12,7 +9,6 @@ import (
"git.netflux.io/rob/clipper/config"
pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set"
"git.netflux.io/rob/clipper/media"
"github.com/google/uuid"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
@ -21,7 +17,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
)
const (
@ -72,258 +67,42 @@ type Options struct {
Logger *zap.Logger
}
// mediaSetServiceController implements gRPC controller for MediaSetService
type mediaSetServiceController struct {
pbmediaset.UnimplementedMediaSetServiceServer
mediaSetService *media.MediaSetService
logger *zap.SugaredLogger
}
// Get returns a pbMediaSet.MediaSet
func (c *mediaSetServiceController) Get(ctx context.Context, request *pbmediaset.GetRequest) (*pbmediaset.MediaSet, error) {
mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId())
if err != nil {
return nil, newResponseError(err)
}
result := pbmediaset.MediaSet{
Id: mediaSet.ID.String(),
YoutubeId: mediaSet.YoutubeID,
AudioChannels: int32(mediaSet.Audio.Channels),
AudioFrames: mediaSet.Audio.Frames,
AudioApproxFrames: mediaSet.Audio.ApproxFrames,
AudioSampleRate: int32(mediaSet.Audio.SampleRate),
AudioYoutubeItag: int32(mediaSet.Audio.YoutubeItag),
AudioMimeType: mediaSet.Audio.MimeType,
VideoDuration: durationpb.New(mediaSet.Video.Duration),
VideoYoutubeItag: int32(mediaSet.Video.YoutubeItag),
VideoMimeType: mediaSet.Video.MimeType,
}
return &result, nil
}
// GetPeaks returns a stream of GetPeaksProgress relating to the entire audio
// part of the MediaSet.
func (c *mediaSetServiceController) GetPeaks(request *pbmediaset.GetPeaksRequest, stream pbmediaset.MediaSetService_GetPeaksServer) error {
// TODO: reduce timeout when fetching from S3
ctx, cancel := context.WithTimeout(context.Background(), getPeaksTimeout)
defer cancel()
id, err := uuid.Parse(request.GetId())
if err != nil {
return newResponseError(err)
}
reader, err := c.mediaSetService.GetPeaks(ctx, id, int(request.GetNumBins()))
if err != nil {
return newResponseError(err)
}
for {
progress, err := reader.Next()
if err != nil && err != io.EOF {
return newResponseError(err)
}
peaks := make([]int32, len(progress.Peaks))
for i, p := range progress.Peaks {
peaks[i] = int32(p)
}
progressPb := pbmediaset.GetPeaksProgress{
PercentComplete: progress.PercentComplete,
Url: progress.URL,
Peaks: peaks,
}
stream.Send(&progressPb)
if err == io.EOF {
break
}
}
return nil
}
// GetPeaksForSegment returns a set of peaks for a segment of an audio part of
// a MediaSet.
func (c *mediaSetServiceController) GetPeaksForSegment(ctx context.Context, request *pbmediaset.GetPeaksForSegmentRequest) (*pbmediaset.GetPeaksForSegmentResponse, error) {
ctx, cancel := context.WithTimeout(ctx, getPeaksForSegmentTimeout)
defer cancel()
id, err := uuid.Parse(request.GetId())
if err != nil {
return nil, newResponseError(err)
}
peaks, err := c.mediaSetService.GetPeaksForSegment(ctx, id, request.StartFrame, request.EndFrame, int(request.GetNumBins()))
if err != nil {
return nil, newResponseError(err)
}
peaks32 := make([]int32, len(peaks))
for i, p := range peaks {
peaks32[i] = int32(p)
}
return &pbmediaset.GetPeaksForSegmentResponse{Peaks: peaks32}, nil
}
func (c *mediaSetServiceController) GetAudioSegment(request *pbmediaset.GetAudioSegmentRequest, outStream pbmediaset.MediaSetService_GetAudioSegmentServer) error {
ctx, cancel := context.WithTimeout(context.Background(), getPeaksForSegmentTimeout)
defer cancel()
id, err := uuid.Parse(request.GetId())
if err != nil {
return newResponseError(err)
}
var format media.AudioFormat
switch request.Format {
case pbmediaset.AudioFormat_MP3:
format = media.AudioFormatMP3
case pbmediaset.AudioFormat_WAV:
format = media.AudioFormatWAV
default:
return newResponseError(errors.New("unknown format"))
}
stream, err := c.mediaSetService.GetAudioSegment(ctx, id, request.StartFrame, request.EndFrame, format)
if err != nil {
return newResponseError(err)
}
for {
progress, err := stream.Next(ctx)
if err != nil && err != io.EOF {
return newResponseError(err)
}
progressPb := pbmediaset.GetAudioSegmentProgress{
PercentComplete: progress.PercentComplete,
AudioData: progress.Data,
}
outStream.Send(&progressPb)
if err == io.EOF {
break
}
}
return nil
}
func (c *mediaSetServiceController) GetVideo(request *pbmediaset.GetVideoRequest, stream pbmediaset.MediaSetService_GetVideoServer) error {
// TODO: reduce timeout when already fetched from Youtube
ctx, cancel := context.WithTimeout(context.Background(), getVideoTimeout)
defer cancel()
id, err := uuid.Parse(request.GetId())
if err != nil {
return newResponseError(err)
}
reader, err := c.mediaSetService.GetVideo(ctx, id)
if err != nil {
return newResponseError(err)
}
for {
progress, err := reader.Next()
if err != nil && err != io.EOF {
return newResponseError(err)
}
progressPb := pbmediaset.GetVideoProgress{
PercentComplete: progress.PercentComplete,
Url: progress.URL,
}
stream.Send(&progressPb)
if err == io.EOF {
break
}
}
return nil
}
func (c *mediaSetServiceController) GetVideoThumbnail(ctx context.Context, request *pbmediaset.GetVideoThumbnailRequest) (*pbmediaset.GetVideoThumbnailResponse, error) {
id, err := uuid.Parse(request.GetId())
if err != nil {
return nil, newResponseError(err)
}
thumbnail, err := c.mediaSetService.GetVideoThumbnail(ctx, id)
if err != nil {
return nil, newResponseError(err)
}
response := pbmediaset.GetVideoThumbnailResponse{
Image: thumbnail.Data,
Width: int32(thumbnail.Width),
Height: int32(thumbnail.Height),
}
return &response, nil
}
func Start(options Options) error {
fetchMediaSetService := media.NewMediaSetService(
conf := options.Config
mediaSetService := media.NewMediaSetService(
options.Store,
options.YoutubeClient,
options.FileStore,
exec.CommandContext,
options.WorkerPool,
options.Config,
conf,
options.Logger.Sugar().Named("mediaSetService"),
)
grpcServer, err := buildGRPCServer(options.Config, options.Logger)
grpcServer, err := buildGRPCServer(conf, options.Logger)
if err != nil {
return fmt.Errorf("error building server: %v", err)
}
mediaSetController := &mediaSetServiceController{mediaSetService: fetchMediaSetService, logger: options.Logger.Sugar().Named("controller")}
mediaSetController := &mediaSetServiceController{mediaSetService: mediaSetService, logger: options.Logger.Sugar().Named("controller")}
pbmediaset.RegisterMediaSetServiceServer(grpcServer, mediaSetController)
// TODO: configure CORS
grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true }))
log := options.Logger.Sugar()
fileHandler := http.NotFoundHandler()
// Enabling the file system store disables serving assets over HTTP.
// TODO: fix this.
if options.Config.AssetsHTTPRoot != "" {
log.With("root", options.Config.AssetsHTTPRoot).Info("Configured to serve assets over HTTP")
fileHandler = http.FileServer(http.Dir(options.Config.AssetsHTTPRoot))
}
if options.Config.FileStoreHTTPRoot != "" {
log.With("root", options.Config.FileStoreHTTPRoot).Info("Configured to serve file store over HTTP")
fileHandler = http.FileServer(http.Dir(options.Config.FileStoreHTTPRoot))
}
grpcHandler := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true }))
httpHandler := newHTTPHandler(grpcHandler, mediaSetService, conf, options.Logger.Sugar().Named("httpHandler"))
httpServer := http.Server{
Addr: options.Config.BindAddr,
Addr: conf.BindAddr,
ReadTimeout: options.Timeout,
WriteTimeout: options.Timeout,
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !grpcWebServer.IsGrpcWebRequest(r) && !grpcWebServer.IsAcceptableGrpcCorsRequest(r) {
fileHandler.ServeHTTP(w, r)
return
}
grpcWebServer.ServeHTTP(w, r)
}),
Handler: httpHandler,
}
log := options.Logger.Sugar()
log.Infof("Listening at %s", options.Config.BindAddr)
if options.Config.TLSCertFile != "" && options.Config.TLSKeyFile != "" {
return httpServer.ListenAndServeTLS(options.Config.TLSCertFile, options.Config.TLSKeyFile)
if conf.TLSCertFile != "" && conf.TLSKeyFile != "" {
return httpServer.ListenAndServeTLS(conf.TLSCertFile, conf.TLSKeyFile)
}
return httpServer.ListenAndServe()

View File

@ -0,0 +1 @@
css

View File

@ -0,0 +1 @@
foo

View File

@ -0,0 +1 @@
index

View File

@ -0,0 +1 @@
bar