diff --git a/backend/generated/mocks/AudioSegmentStream.go b/backend/generated/mocks/AudioSegmentStream.go new file mode 100644 index 0000000..5ec3804 --- /dev/null +++ b/backend/generated/mocks/AudioSegmentStream.go @@ -0,0 +1,36 @@ +// Code generated by mockery v2.9.4. DO NOT EDIT. + +package mocks + +import ( + context "context" + + media "git.netflux.io/rob/clipper/media" + mock "github.com/stretchr/testify/mock" +) + +// AudioSegmentStream is an autogenerated mock type for the AudioSegmentStream type +type AudioSegmentStream struct { + mock.Mock +} + +// Next provides a mock function with given fields: ctx +func (_m *AudioSegmentStream) Next(ctx context.Context) (media.AudioSegmentProgress, error) { + ret := _m.Called(ctx) + + var r0 media.AudioSegmentProgress + if rf, ok := ret.Get(0).(func(context.Context) media.AudioSegmentProgress); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(media.AudioSegmentProgress) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/backend/generated/mocks/MediaSetService.go b/backend/generated/mocks/MediaSetService.go index 5368d10..03c49da 100644 --- a/backend/generated/mocks/MediaSetService.go +++ b/backend/generated/mocks/MediaSetService.go @@ -40,15 +40,15 @@ func (_m *MediaSetService) Get(_a0 context.Context, _a1 string) (*media.MediaSet } // GetAudioSegment provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 -func (_m *MediaSetService) GetAudioSegment(_a0 context.Context, _a1 uuid.UUID, _a2 int64, _a3 int64, _a4 media.AudioFormat) (*media.AudioSegmentStream, error) { +func (_m *MediaSetService) GetAudioSegment(_a0 context.Context, _a1 uuid.UUID, _a2 int64, _a3 int64, _a4 media.AudioFormat) (media.AudioSegmentStream, error) { ret := _m.Called(_a0, _a1, _a2, _a3, _a4) - var r0 *media.AudioSegmentStream - if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, int64, int64, media.AudioFormat) *media.AudioSegmentStream); ok { + var r0 media.AudioSegmentStream + if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, int64, int64, media.AudioFormat) media.AudioSegmentStream); ok { r0 = rf(_a0, _a1, _a2, _a3, _a4) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*media.AudioSegmentStream) + r0 = ret.Get(0).(media.AudioSegmentStream) } } diff --git a/backend/go.mod b/backend/go.mod index 8f82040..e611061 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -8,8 +8,10 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.6.5 github.com/aws/aws-sdk-go-v2/service/s3 v1.22.0 github.com/aws/smithy-go v1.9.0 + github.com/gofrs/uuid v4.0.0+incompatible github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 + github.com/gorilla/schema v1.2.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/improbable-eng/grpc-web v0.15.0 github.com/jackc/pgconn v1.10.1 diff --git a/backend/go.sum b/backend/go.sum index 2f7315e..ba57bf8 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -203,6 +203,8 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/schema v1.2.0 h1:YufUaxZYCKGFuAq3c96BOhjgd5nmXiOY9NGzF247Tsc= +github.com/gorilla/schema v1.2.0/go.mod h1:kgLaKoK1FELgZqMAVxx/5cbj0kT+57qxUrAlIO2eleU= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/backend/media/get_segment.go b/backend/media/get_segment.go index 99eb91f..4895a8c 100644 --- a/backend/media/get_segment.go +++ b/backend/media/get_segment.go @@ -1,5 +1,7 @@ package media +//go:generate mockery --recursive --name AudioSegmentStream --output ../generated/mocks + import ( "bytes" "context" @@ -42,14 +44,21 @@ type AudioSegmentProgress struct { Data []byte } -// AudioSegmentStream is a stream of AudioSegmentProgress structs. -type AudioSegmentStream struct { +// AudioSegmentStream implements stream of AudioSegmentProgress structs. The +// Next() method must be called until it returns io.EOF to avoid resource +// leakage. +type AudioSegmentStream interface { + Next(ctx context.Context) (AudioSegmentProgress, error) +} + +// audioSegmentStream implements AudioSegmentStream. +type audioSegmentStream struct { progressChan chan AudioSegmentProgress errorChan chan error } // send publishes a new partial segment and progress update to the strean. -func (s *AudioSegmentStream) send(p []byte, percentComplete float32) { +func (s *audioSegmentStream) send(p []byte, percentComplete float32) { s.progressChan <- AudioSegmentProgress{ Data: p, PercentComplete: percentComplete, @@ -57,12 +66,12 @@ func (s *AudioSegmentStream) send(p []byte, percentComplete float32) { } // close signals the successful end of the stream of data. -func (s *AudioSegmentStream) close() { +func (s *audioSegmentStream) close() { close(s.progressChan) } // closeWithError signals the unsuccessful end of a stream of data. -func (s *AudioSegmentStream) closeWithError(err error) { +func (s *audioSegmentStream) closeWithError(err error) { s.errorChan <- err } @@ -74,7 +83,7 @@ type audioSegmentGetter struct { rawAudio io.ReadCloser channels int32 outFormat AudioFormat - stream *AudioSegmentStream + stream *audioSegmentStream bytesRead, bytesExpected int64 } @@ -88,7 +97,7 @@ func newAudioSegmentGetter(commandFunc CommandFunc, workerPool *WorkerPool, rawA channels: channels, bytesExpected: bytesExpected, outFormat: outFormat, - stream: &AudioSegmentStream{ + stream: &audioSegmentStream{ progressChan: make(chan AudioSegmentProgress), errorChan: make(chan error, 1), }, @@ -122,7 +131,7 @@ func (s *audioSegmentGetter) percentComplete() float32 { } // Next implements AudioSegmentStream. -func (s *AudioSegmentStream) Next(ctx context.Context) (AudioSegmentProgress, error) { +func (s *audioSegmentStream) Next(ctx context.Context) (AudioSegmentProgress, error) { select { case progress, ok := <-s.progressChan: if !ok { diff --git a/backend/media/service.go b/backend/media/service.go index 2c22ea6..6a9ec16 100644 --- a/backend/media/service.go +++ b/backend/media/service.go @@ -451,7 +451,7 @@ func (s *MediaSetService) GetPeaksForSegment(ctx context.Context, id uuid.UUID, return peaks, nil } -func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, outFormat AudioFormat) (*AudioSegmentStream, error) { +func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, outFormat AudioFormat) (AudioSegmentStream, error) { if startFrame > endFrame { return nil, errors.New("invalid range") } diff --git a/backend/server/gprc_handler.go b/backend/server/gprc_handler.go index 0f60d6f..8e76b9b 100644 --- a/backend/server/gprc_handler.go +++ b/backend/server/gprc_handler.go @@ -14,15 +14,6 @@ import ( "google.golang.org/protobuf/types/known/durationpb" ) -type MediaSetService interface { - Get(context.Context, string) (*media.MediaSet, error) - GetAudioSegment(context.Context, uuid.UUID, int64, int64, media.AudioFormat) (*media.AudioSegmentStream, error) - GetPeaks(context.Context, uuid.UUID, int) (media.GetPeaksProgressReader, error) - GetPeaksForSegment(context.Context, uuid.UUID, int64, int64, int) ([]int16, error) - GetVideo(context.Context, uuid.UUID) (media.GetVideoProgressReader, error) - GetVideoThumbnail(context.Context, uuid.UUID) (media.VideoThumbnail, error) -} - // mediaSetServiceController implements gRPC controller for MediaSetService type mediaSetServiceController struct { pbmediaset.UnimplementedMediaSetServiceServer diff --git a/backend/server/handler.go b/backend/server/handler.go index d5a6abc..f481811 100644 --- a/backend/server/handler.go +++ b/backend/server/handler.go @@ -1,19 +1,25 @@ package server import ( + "context" + "io" "net/http" "path/filepath" "git.netflux.io/rob/clipper/config" "git.netflux.io/rob/clipper/filestore" + "git.netflux.io/rob/clipper/media" + "github.com/google/uuid" "github.com/gorilla/mux" + "github.com/gorilla/schema" "github.com/improbable-eng/grpc-web/go/grpcweb" "go.uber.org/zap" ) type httpHandler struct { + *mux.Router + grpcHandler *grpcweb.WrappedGrpcServer - router http.Handler mediaSetService MediaSetService logger *zap.SugaredLogger } @@ -34,33 +40,110 @@ func newHTTPHandler(grpcHandler *grpcweb.WrappedGrpcServer, mediaSetService Medi // If FileSystemStore AND assets serving are both enabled, // FileStoreHTTPBaseURL *must* be set to a value other than "/" to avoid // clobbering the assets routes. - router := mux.NewRouter() + h := &httpHandler{ + Router: mux.NewRouter(), + grpcHandler: grpcHandler, + mediaSetService: mediaSetService, + logger: logger, + } + + h. + Methods("POST"). + Path("/api/media_sets/{id}/clip"). + HandlerFunc(h.handleClip) + if c.FileStore == config.FileSystemStore { - router. + h. Methods("GET"). PathPrefix(c.FileStoreHTTPBaseURL.Path). Handler(filestore.NewFileSystemStoreHTTPMiddleware(c.FileStoreHTTPBaseURL, fileStoreHandler)) } - router. + + h. Methods("GET"). Handler(assetsHandler) - return &httpHandler{ - grpcHandler: grpcHandler, - router: router, - mediaSetService: mediaSetService, - logger: logger, - } + return h } func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !h.grpcHandler.IsGrpcWebRequest(r) && !h.grpcHandler.IsAcceptableGrpcCorsRequest(r) { - h.router.ServeHTTP(w, r) + h.Router.ServeHTTP(w, r) return } h.grpcHandler.ServeHTTP(w, r) } +func (h *httpHandler) handleClip(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(context.Background(), getPeaksForSegmentTimeout) + defer cancel() + + if err := r.ParseForm(); err != nil { + h.logger.With("err", err).Info("error parsing form") + w.WriteHeader(http.StatusBadRequest) + return + } + + var params struct { + StartFrame int64 `schema:"start_frame,required"` + EndFrame int64 `schema:"end_frame,required"` + Format string `schema:"format,required"` + } + decoder := schema.NewDecoder() + if err := decoder.Decode(¶ms, r.PostForm); err != nil { + h.logger.With("err", err).Info("error decoding form") + w.WriteHeader(http.StatusBadRequest) + return + } + + vars := mux.Vars(r) + id, err := uuid.Parse(vars["id"]) + if err != nil { + h.logger.With("err", err).Info("error parsing ID") + w.WriteHeader(http.StatusNotFound) + return + } + + var format media.AudioFormat + switch params.Format { + case "mp3": + format = media.AudioFormatMP3 + case "wav": + format = media.AudioFormatWAV + default: + h.logger.Info("bad format") + w.WriteHeader(http.StatusBadRequest) + return + } + + stream, err := h.mediaSetService.GetAudioSegment(ctx, id, params.StartFrame, params.EndFrame, format) + if err != nil { + h.logger.With("err", err).Info("error getting audio segment") + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Header().Set("content-type", "audio/"+format.String()) + w.WriteHeader(http.StatusOK) + + var closing bool + for { + progress, err := stream.Next(ctx) + if err == io.EOF { + closing = true + } else if err != nil { + h.logger.With("err", err).Error("error reading audio segment stream") + return + } + + w.Write(progress.Data) + + if closing { + break + } + } +} + // indexedFileSystem is an HTTP file system which handles index.html files if // they exist, but does not serve directory listings. // diff --git a/backend/server/handler_test.go b/backend/server/handler_test.go index 6faf910..b1ad507 100644 --- a/backend/server/handler_test.go +++ b/backend/server/handler_test.go @@ -5,23 +5,28 @@ import ( "net/http" "net/http/httptest" "net/url" + "strings" "testing" "git.netflux.io/rob/clipper/config" "git.netflux.io/rob/clipper/generated/mocks" + "git.netflux.io/rob/clipper/media" + "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" ) func TestHandler(t *testing.T) { testCases := []struct { - name string - path string - method string - config config.Config - wantStatus int - wantBody string + name string + path, body, method, contentType string + config config.Config + wantStartFrame, wantEndFrame int64 + wantAudioFormat media.AudioFormat + wantStatus int + wantContentType, wantBody string }{ { name: "assets disabled, file system store disabled, GET /", @@ -106,14 +111,89 @@ func TestHandler(t *testing.T) { config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/"), FileStoreHTTPRoot: "testdata/http/filestore", AssetsHTTPRoot: "testdata/http/assets"}, wantStatus: http.StatusNotFound, }, + { + name: "POST /api/media_sets/:id/clip, NOK, no body", + path: "/api/media_sets/05951a4d-584e-4056-9ae7-08b9e4cd355d/clip", + contentType: "application/x-www-form-urlencoded", + method: http.MethodPost, + config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/")}, + wantStatus: http.StatusBadRequest, + }, + { + name: "POST /api/media_sets/:id/clip, NOK, missing params", + path: "/api/media_sets/05951a4d-584e-4056-9ae7-08b9e4cd355d/clip", + body: "start_frame=0&end_frame=1024", + contentType: "application/x-www-form-urlencoded", + method: http.MethodPost, + config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/")}, + wantStatus: http.StatusBadRequest, + }, + { + name: "POST /api/media_sets/:id/clip, NOK, invalid UUID", + path: "/api/media_sets/123/clip", + body: "start_frame=0&end_frame=1024&format=mp3", + contentType: "application/x-www-form-urlencoded", + method: http.MethodPost, + config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/")}, + wantStatus: http.StatusNotFound, + }, + { + name: "POST /api/media_sets/:id/clip, MP3, OK", + path: "/api/media_sets/05951a4d-584e-4056-9ae7-08b9e4cd355d/clip", + body: "start_frame=0&end_frame=1024&format=mp3", + contentType: "application/x-www-form-urlencoded", + method: http.MethodPost, + config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/")}, + wantStartFrame: 0, + wantEndFrame: 1024, + wantAudioFormat: media.AudioFormatMP3, + wantContentType: "audio/mp3", + wantStatus: http.StatusOK, + wantBody: "an audio file", + }, + { + name: "POST /api/media_sets/:id/clip, WAV, OK", + path: "/api/media_sets/05951a4d-584e-4056-9ae7-08b9e4cd355d/clip", + body: "start_frame=4096&end_frame=8192&format=wav", + contentType: "application/x-www-form-urlencoded", + method: http.MethodPost, + config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/")}, + wantStartFrame: 4096, + wantEndFrame: 8192, + wantAudioFormat: media.AudioFormatWAV, + wantContentType: "audio/wav", + wantStatus: http.StatusOK, + wantBody: "an audio file", + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + var stream mocks.AudioSegmentStream + stream.On("Next", mock.Anything).Return(media.AudioSegmentProgress{PercentComplete: 60, Data: []byte("an aud")}, nil).Once() + stream.On("Next", mock.Anything).Return(media.AudioSegmentProgress{PercentComplete: 80, Data: []byte("io file")}, nil).Once() + stream.On("Next", mock.Anything).Return(media.AudioSegmentProgress{PercentComplete: 100}, io.EOF).Once() + var mediaSetService mocks.MediaSetService + mediaSetService. + On("GetAudioSegment", mock.Anything, uuid.MustParse("05951a4d-584e-4056-9ae7-08b9e4cd355d"), tc.wantStartFrame, tc.wantEndFrame, tc.wantAudioFormat). + Return(&stream, nil) + if tc.wantStartFrame != 0 { + defer stream.AssertExpectations(t) + defer mediaSetService.AssertExpectations(t) + } + handler := newHTTPHandler(nil, &mediaSetService, tc.config, zap.NewNop().Sugar()) - req := httptest.NewRequest(tc.method, tc.path, nil) + var body io.Reader + if tc.body != "" { + body = strings.NewReader(tc.body) + } + req := httptest.NewRequest(tc.method, tc.path, body) + if tc.contentType != "" { + req.Header.Add("content-type", tc.contentType) + } + w := httptest.NewRecorder() handler.ServeHTTP(w, req) resp := w.Result() @@ -124,6 +204,9 @@ func TestHandler(t *testing.T) { require.NoError(t, err) assert.Equal(t, tc.wantBody, string(body)) } + if tc.wantContentType != "" { + assert.Equal(t, tc.wantContentType, resp.Header.Get("content-type")) + } }) } } diff --git a/backend/server/server.go b/backend/server/server.go index 2d14b9d..0cd1a60 100644 --- a/backend/server/server.go +++ b/backend/server/server.go @@ -1,6 +1,7 @@ package server import ( + "context" "fmt" "net/http" "os/exec" @@ -9,6 +10,7 @@ import ( "git.netflux.io/rob/clipper/config" pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set" "git.netflux.io/rob/clipper/media" + "github.com/google/uuid" grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware" grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" @@ -36,6 +38,15 @@ const ( getVideoTimeout = time.Minute * 5 ) +type MediaSetService interface { + Get(context.Context, string) (*media.MediaSet, error) + GetAudioSegment(context.Context, uuid.UUID, int64, int64, media.AudioFormat) (media.AudioSegmentStream, error) + GetPeaks(context.Context, uuid.UUID, int) (media.GetPeaksProgressReader, error) + GetPeaksForSegment(context.Context, uuid.UUID, int64, int64, int) ([]int16, error) + GetVideo(context.Context, uuid.UUID) (media.GetVideoProgressReader, error) + GetVideoThumbnail(context.Context, uuid.UUID) (media.VideoThumbnail, error) +} + type ResponseError struct { err error s string