Add POST /api/media_sets/:id/clip.
continuous-integration/drone Build is passing Details

This will allow for an HTTP/1.1 fallback for
MediaSetService.GetAudioSegment, enabling download of audio clips in
browsers that do not support the File System Access API.
This commit is contained in:
Rob Watson 2022-01-10 18:52:04 +01:00
parent c7d5541379
commit af0674eb11
10 changed files with 257 additions and 40 deletions

View File

@ -0,0 +1,36 @@
// Code generated by mockery v2.9.4. DO NOT EDIT.
package mocks
import (
context "context"
media "git.netflux.io/rob/clipper/media"
mock "github.com/stretchr/testify/mock"
)
// AudioSegmentStream is an autogenerated mock type for the AudioSegmentStream type
type AudioSegmentStream struct {
mock.Mock
}
// Next provides a mock function with given fields: ctx
func (_m *AudioSegmentStream) Next(ctx context.Context) (media.AudioSegmentProgress, error) {
ret := _m.Called(ctx)
var r0 media.AudioSegmentProgress
if rf, ok := ret.Get(0).(func(context.Context) media.AudioSegmentProgress); ok {
r0 = rf(ctx)
} else {
r0 = ret.Get(0).(media.AudioSegmentProgress)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}

View File

@ -40,15 +40,15 @@ func (_m *MediaSetService) Get(_a0 context.Context, _a1 string) (*media.MediaSet
}
// GetAudioSegment provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4
func (_m *MediaSetService) GetAudioSegment(_a0 context.Context, _a1 uuid.UUID, _a2 int64, _a3 int64, _a4 media.AudioFormat) (*media.AudioSegmentStream, error) {
func (_m *MediaSetService) GetAudioSegment(_a0 context.Context, _a1 uuid.UUID, _a2 int64, _a3 int64, _a4 media.AudioFormat) (media.AudioSegmentStream, error) {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4)
var r0 *media.AudioSegmentStream
if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, int64, int64, media.AudioFormat) *media.AudioSegmentStream); ok {
var r0 media.AudioSegmentStream
if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, int64, int64, media.AudioFormat) media.AudioSegmentStream); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*media.AudioSegmentStream)
r0 = ret.Get(0).(media.AudioSegmentStream)
}
}

View File

@ -8,8 +8,10 @@ require (
github.com/aws/aws-sdk-go-v2/credentials v1.6.5
github.com/aws/aws-sdk-go-v2/service/s3 v1.22.0
github.com/aws/smithy-go v1.9.0
github.com/gofrs/uuid v4.0.0+incompatible
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/schema v1.2.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/improbable-eng/grpc-web v0.15.0
github.com/jackc/pgconn v1.10.1

View File

@ -203,6 +203,8 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/schema v1.2.0 h1:YufUaxZYCKGFuAq3c96BOhjgd5nmXiOY9NGzF247Tsc=
github.com/gorilla/schema v1.2.0/go.mod h1:kgLaKoK1FELgZqMAVxx/5cbj0kT+57qxUrAlIO2eleU=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=

View File

@ -1,5 +1,7 @@
package media
//go:generate mockery --recursive --name AudioSegmentStream --output ../generated/mocks
import (
"bytes"
"context"
@ -42,14 +44,21 @@ type AudioSegmentProgress struct {
Data []byte
}
// AudioSegmentStream is a stream of AudioSegmentProgress structs.
type AudioSegmentStream struct {
// AudioSegmentStream implements stream of AudioSegmentProgress structs. The
// Next() method must be called until it returns io.EOF to avoid resource
// leakage.
type AudioSegmentStream interface {
Next(ctx context.Context) (AudioSegmentProgress, error)
}
// audioSegmentStream implements AudioSegmentStream.
type audioSegmentStream struct {
progressChan chan AudioSegmentProgress
errorChan chan error
}
// send publishes a new partial segment and progress update to the strean.
func (s *AudioSegmentStream) send(p []byte, percentComplete float32) {
func (s *audioSegmentStream) send(p []byte, percentComplete float32) {
s.progressChan <- AudioSegmentProgress{
Data: p,
PercentComplete: percentComplete,
@ -57,12 +66,12 @@ func (s *AudioSegmentStream) send(p []byte, percentComplete float32) {
}
// close signals the successful end of the stream of data.
func (s *AudioSegmentStream) close() {
func (s *audioSegmentStream) close() {
close(s.progressChan)
}
// closeWithError signals the unsuccessful end of a stream of data.
func (s *AudioSegmentStream) closeWithError(err error) {
func (s *audioSegmentStream) closeWithError(err error) {
s.errorChan <- err
}
@ -74,7 +83,7 @@ type audioSegmentGetter struct {
rawAudio io.ReadCloser
channels int32
outFormat AudioFormat
stream *AudioSegmentStream
stream *audioSegmentStream
bytesRead, bytesExpected int64
}
@ -88,7 +97,7 @@ func newAudioSegmentGetter(commandFunc CommandFunc, workerPool *WorkerPool, rawA
channels: channels,
bytesExpected: bytesExpected,
outFormat: outFormat,
stream: &AudioSegmentStream{
stream: &audioSegmentStream{
progressChan: make(chan AudioSegmentProgress),
errorChan: make(chan error, 1),
},
@ -122,7 +131,7 @@ func (s *audioSegmentGetter) percentComplete() float32 {
}
// Next implements AudioSegmentStream.
func (s *AudioSegmentStream) Next(ctx context.Context) (AudioSegmentProgress, error) {
func (s *audioSegmentStream) Next(ctx context.Context) (AudioSegmentProgress, error) {
select {
case progress, ok := <-s.progressChan:
if !ok {

View File

@ -451,7 +451,7 @@ func (s *MediaSetService) GetPeaksForSegment(ctx context.Context, id uuid.UUID,
return peaks, nil
}
func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, outFormat AudioFormat) (*AudioSegmentStream, error) {
func (s *MediaSetService) GetAudioSegment(ctx context.Context, id uuid.UUID, startFrame, endFrame int64, outFormat AudioFormat) (AudioSegmentStream, error) {
if startFrame > endFrame {
return nil, errors.New("invalid range")
}

View File

@ -14,15 +14,6 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)
type MediaSetService interface {
Get(context.Context, string) (*media.MediaSet, error)
GetAudioSegment(context.Context, uuid.UUID, int64, int64, media.AudioFormat) (*media.AudioSegmentStream, error)
GetPeaks(context.Context, uuid.UUID, int) (media.GetPeaksProgressReader, error)
GetPeaksForSegment(context.Context, uuid.UUID, int64, int64, int) ([]int16, error)
GetVideo(context.Context, uuid.UUID) (media.GetVideoProgressReader, error)
GetVideoThumbnail(context.Context, uuid.UUID) (media.VideoThumbnail, error)
}
// mediaSetServiceController implements gRPC controller for MediaSetService
type mediaSetServiceController struct {
pbmediaset.UnimplementedMediaSetServiceServer

View File

@ -1,19 +1,25 @@
package server
import (
"context"
"io"
"net/http"
"path/filepath"
"git.netflux.io/rob/clipper/config"
"git.netflux.io/rob/clipper/filestore"
"git.netflux.io/rob/clipper/media"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/gorilla/schema"
"github.com/improbable-eng/grpc-web/go/grpcweb"
"go.uber.org/zap"
)
type httpHandler struct {
*mux.Router
grpcHandler *grpcweb.WrappedGrpcServer
router http.Handler
mediaSetService MediaSetService
logger *zap.SugaredLogger
}
@ -34,33 +40,110 @@ func newHTTPHandler(grpcHandler *grpcweb.WrappedGrpcServer, mediaSetService Medi
// If FileSystemStore AND assets serving are both enabled,
// FileStoreHTTPBaseURL *must* be set to a value other than "/" to avoid
// clobbering the assets routes.
router := mux.NewRouter()
h := &httpHandler{
Router: mux.NewRouter(),
grpcHandler: grpcHandler,
mediaSetService: mediaSetService,
logger: logger,
}
h.
Methods("POST").
Path("/api/media_sets/{id}/clip").
HandlerFunc(h.handleClip)
if c.FileStore == config.FileSystemStore {
router.
h.
Methods("GET").
PathPrefix(c.FileStoreHTTPBaseURL.Path).
Handler(filestore.NewFileSystemStoreHTTPMiddleware(c.FileStoreHTTPBaseURL, fileStoreHandler))
}
router.
h.
Methods("GET").
Handler(assetsHandler)
return &httpHandler{
grpcHandler: grpcHandler,
router: router,
mediaSetService: mediaSetService,
logger: logger,
}
return h
}
func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !h.grpcHandler.IsGrpcWebRequest(r) && !h.grpcHandler.IsAcceptableGrpcCorsRequest(r) {
h.router.ServeHTTP(w, r)
h.Router.ServeHTTP(w, r)
return
}
h.grpcHandler.ServeHTTP(w, r)
}
func (h *httpHandler) handleClip(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(context.Background(), getPeaksForSegmentTimeout)
defer cancel()
if err := r.ParseForm(); err != nil {
h.logger.With("err", err).Info("error parsing form")
w.WriteHeader(http.StatusBadRequest)
return
}
var params struct {
StartFrame int64 `schema:"start_frame,required"`
EndFrame int64 `schema:"end_frame,required"`
Format string `schema:"format,required"`
}
decoder := schema.NewDecoder()
if err := decoder.Decode(&params, r.PostForm); err != nil {
h.logger.With("err", err).Info("error decoding form")
w.WriteHeader(http.StatusBadRequest)
return
}
vars := mux.Vars(r)
id, err := uuid.Parse(vars["id"])
if err != nil {
h.logger.With("err", err).Info("error parsing ID")
w.WriteHeader(http.StatusNotFound)
return
}
var format media.AudioFormat
switch params.Format {
case "mp3":
format = media.AudioFormatMP3
case "wav":
format = media.AudioFormatWAV
default:
h.logger.Info("bad format")
w.WriteHeader(http.StatusBadRequest)
return
}
stream, err := h.mediaSetService.GetAudioSegment(ctx, id, params.StartFrame, params.EndFrame, format)
if err != nil {
h.logger.With("err", err).Info("error getting audio segment")
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Set("content-type", "audio/"+format.String())
w.WriteHeader(http.StatusOK)
var closing bool
for {
progress, err := stream.Next(ctx)
if err == io.EOF {
closing = true
} else if err != nil {
h.logger.With("err", err).Error("error reading audio segment stream")
return
}
w.Write(progress.Data)
if closing {
break
}
}
}
// indexedFileSystem is an HTTP file system which handles index.html files if
// they exist, but does not serve directory listings.
//

View File

@ -5,23 +5,28 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"git.netflux.io/rob/clipper/config"
"git.netflux.io/rob/clipper/generated/mocks"
"git.netflux.io/rob/clipper/media"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func TestHandler(t *testing.T) {
testCases := []struct {
name string
path string
method string
config config.Config
wantStatus int
wantBody string
name string
path, body, method, contentType string
config config.Config
wantStartFrame, wantEndFrame int64
wantAudioFormat media.AudioFormat
wantStatus int
wantContentType, wantBody string
}{
{
name: "assets disabled, file system store disabled, GET /",
@ -106,14 +111,89 @@ func TestHandler(t *testing.T) {
config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/"), FileStoreHTTPRoot: "testdata/http/filestore", AssetsHTTPRoot: "testdata/http/assets"},
wantStatus: http.StatusNotFound,
},
{
name: "POST /api/media_sets/:id/clip, NOK, no body",
path: "/api/media_sets/05951a4d-584e-4056-9ae7-08b9e4cd355d/clip",
contentType: "application/x-www-form-urlencoded",
method: http.MethodPost,
config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/")},
wantStatus: http.StatusBadRequest,
},
{
name: "POST /api/media_sets/:id/clip, NOK, missing params",
path: "/api/media_sets/05951a4d-584e-4056-9ae7-08b9e4cd355d/clip",
body: "start_frame=0&end_frame=1024",
contentType: "application/x-www-form-urlencoded",
method: http.MethodPost,
config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/")},
wantStatus: http.StatusBadRequest,
},
{
name: "POST /api/media_sets/:id/clip, NOK, invalid UUID",
path: "/api/media_sets/123/clip",
body: "start_frame=0&end_frame=1024&format=mp3",
contentType: "application/x-www-form-urlencoded",
method: http.MethodPost,
config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/")},
wantStatus: http.StatusNotFound,
},
{
name: "POST /api/media_sets/:id/clip, MP3, OK",
path: "/api/media_sets/05951a4d-584e-4056-9ae7-08b9e4cd355d/clip",
body: "start_frame=0&end_frame=1024&format=mp3",
contentType: "application/x-www-form-urlencoded",
method: http.MethodPost,
config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/")},
wantStartFrame: 0,
wantEndFrame: 1024,
wantAudioFormat: media.AudioFormatMP3,
wantContentType: "audio/mp3",
wantStatus: http.StatusOK,
wantBody: "an audio file",
},
{
name: "POST /api/media_sets/:id/clip, WAV, OK",
path: "/api/media_sets/05951a4d-584e-4056-9ae7-08b9e4cd355d/clip",
body: "start_frame=4096&end_frame=8192&format=wav",
contentType: "application/x-www-form-urlencoded",
method: http.MethodPost,
config: config.Config{FileStore: config.FileSystemStore, FileStoreHTTPBaseURL: mustParseURL(t, "/store/")},
wantStartFrame: 4096,
wantEndFrame: 8192,
wantAudioFormat: media.AudioFormatWAV,
wantContentType: "audio/wav",
wantStatus: http.StatusOK,
wantBody: "an audio file",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var stream mocks.AudioSegmentStream
stream.On("Next", mock.Anything).Return(media.AudioSegmentProgress{PercentComplete: 60, Data: []byte("an aud")}, nil).Once()
stream.On("Next", mock.Anything).Return(media.AudioSegmentProgress{PercentComplete: 80, Data: []byte("io file")}, nil).Once()
stream.On("Next", mock.Anything).Return(media.AudioSegmentProgress{PercentComplete: 100}, io.EOF).Once()
var mediaSetService mocks.MediaSetService
mediaSetService.
On("GetAudioSegment", mock.Anything, uuid.MustParse("05951a4d-584e-4056-9ae7-08b9e4cd355d"), tc.wantStartFrame, tc.wantEndFrame, tc.wantAudioFormat).
Return(&stream, nil)
if tc.wantStartFrame != 0 {
defer stream.AssertExpectations(t)
defer mediaSetService.AssertExpectations(t)
}
handler := newHTTPHandler(nil, &mediaSetService, tc.config, zap.NewNop().Sugar())
req := httptest.NewRequest(tc.method, tc.path, nil)
var body io.Reader
if tc.body != "" {
body = strings.NewReader(tc.body)
}
req := httptest.NewRequest(tc.method, tc.path, body)
if tc.contentType != "" {
req.Header.Add("content-type", tc.contentType)
}
w := httptest.NewRecorder()
handler.ServeHTTP(w, req)
resp := w.Result()
@ -124,6 +204,9 @@ func TestHandler(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, tc.wantBody, string(body))
}
if tc.wantContentType != "" {
assert.Equal(t, tc.wantContentType, resp.Header.Get("content-type"))
}
})
}
}

View File

@ -1,6 +1,7 @@
package server
import (
"context"
"fmt"
"net/http"
"os/exec"
@ -9,6 +10,7 @@ import (
"git.netflux.io/rob/clipper/config"
pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set"
"git.netflux.io/rob/clipper/media"
"github.com/google/uuid"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
@ -36,6 +38,15 @@ const (
getVideoTimeout = time.Minute * 5
)
type MediaSetService interface {
Get(context.Context, string) (*media.MediaSet, error)
GetAudioSegment(context.Context, uuid.UUID, int64, int64, media.AudioFormat) (media.AudioSegmentStream, error)
GetPeaks(context.Context, uuid.UUID, int) (media.GetPeaksProgressReader, error)
GetPeaksForSegment(context.Context, uuid.UUID, int64, int64, int) ([]int16, error)
GetVideo(context.Context, uuid.UUID) (media.GetVideoProgressReader, error)
GetVideoThumbnail(context.Context, uuid.UUID) (media.VideoThumbnail, error)
}
type ResponseError struct {
err error
s string