164 lines
4.4 KiB
Go
164 lines
4.4 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
pbMediaSet "git.netflux.io/rob/clipper/generated/pb/media_set"
|
|
"git.netflux.io/rob/clipper/media"
|
|
"github.com/google/uuid"
|
|
"github.com/improbable-eng/grpc-web/go/grpcweb"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/grpclog"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/protobuf/types/known/durationpb"
|
|
)
|
|
|
|
func init() {
|
|
grpclog.SetLogger(log.New(os.Stdout, "server: ", log.LstdFlags))
|
|
}
|
|
|
|
const (
|
|
// ts-proto generates code that automatically retries for a subset of gRPC
|
|
// response codes. To avoid invoking this behaviour, default to returning a
|
|
// Cancelled code for now.
|
|
// See https://github.com/stephenh/ts-proto/blob/459b94f5b2988d58d186461332e888c3e511603a/src/generate-grpc-web.ts#L293
|
|
// and https://github.com/stephenh/ts-proto/pull/131.
|
|
defaultResponseCode = codes.Canceled
|
|
defaultResponseMessage = "An unexpected error occurred"
|
|
)
|
|
|
|
type ResponseError struct {
|
|
err error
|
|
s string
|
|
}
|
|
|
|
func (r *ResponseError) Error() string {
|
|
return fmt.Sprintf("unexpected error: %v", r.err.Error())
|
|
}
|
|
|
|
func (r *ResponseError) Unwrap() error {
|
|
return r.err
|
|
}
|
|
|
|
func (r *ResponseError) GRPCStatus() *status.Status {
|
|
return status.New(defaultResponseCode, r.s)
|
|
}
|
|
|
|
func newResponseError(err error) *ResponseError {
|
|
return &ResponseError{err: err, s: defaultResponseMessage}
|
|
}
|
|
|
|
type Options struct {
|
|
BindAddr string
|
|
Timeout time.Duration
|
|
Store media.Store
|
|
YoutubeClient media.YoutubeClient
|
|
S3Client media.S3Client
|
|
}
|
|
|
|
const (
|
|
getAudioTimeout = time.Minute * 5
|
|
)
|
|
|
|
// mediaSetServiceController implements gRPC controller for MediaSetService
|
|
type mediaSetServiceController struct {
|
|
pbMediaSet.UnimplementedMediaSetServiceServer
|
|
|
|
mediaSetService *media.MediaSetService
|
|
}
|
|
|
|
// Get returns a pbMediaSet.MediaSet
|
|
func (c *mediaSetServiceController) Get(ctx context.Context, request *pbMediaSet.GetRequest) (*pbMediaSet.MediaSet, error) {
|
|
mediaSet, err := c.mediaSetService.Get(ctx, request.GetYoutubeId())
|
|
if err != nil {
|
|
return nil, newResponseError(err)
|
|
}
|
|
|
|
result := pbMediaSet.MediaSet{
|
|
Id: mediaSet.ID.String(),
|
|
YoutubeId: mediaSet.YoutubeID,
|
|
AudioChannels: int32(mediaSet.Audio.Channels),
|
|
AudioFrames: mediaSet.Audio.Frames,
|
|
AudioApproxFrames: mediaSet.Audio.ApproxFrames,
|
|
AudioSampleRate: int32(mediaSet.Audio.SampleRate),
|
|
AudioYoutubeItag: int32(mediaSet.Audio.YoutubeItag),
|
|
AudioMimeType: mediaSet.Audio.MimeType,
|
|
VideoDuration: durationpb.New(mediaSet.Video.Duration),
|
|
VideoYoutubeItag: int32(mediaSet.Video.YoutubeItag),
|
|
VideoMimeType: mediaSet.Video.MimeType,
|
|
}
|
|
|
|
return &result, nil
|
|
}
|
|
|
|
// GetAudio streams the progress report of GetAudio.
|
|
func (c *mediaSetServiceController) GetAudio(request *pbMediaSet.GetAudioRequest, stream pbMediaSet.MediaSetService_GetAudioServer) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), getAudioTimeout)
|
|
defer cancel()
|
|
|
|
id, err := uuid.Parse(request.GetId())
|
|
if err != nil {
|
|
return newResponseError(err)
|
|
}
|
|
|
|
reader, err := c.mediaSetService.GetAudio(ctx, id, int(request.GetNumBins()))
|
|
if err != nil {
|
|
return newResponseError(err)
|
|
}
|
|
|
|
for {
|
|
progress, err := reader.Read()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
return newResponseError(err)
|
|
}
|
|
|
|
// TODO: consider using int32 throughout the backend flow to avoid this.
|
|
peaks := make([]int32, len(progress.Peaks))
|
|
for i, p := range progress.Peaks {
|
|
peaks[i] = int32(p)
|
|
}
|
|
|
|
progressPb := pbMediaSet.GetAudioProgress{
|
|
PercentCompleted: progress.PercentComplete,
|
|
Peaks: peaks,
|
|
}
|
|
|
|
stream.Send(&progressPb)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func Start(options Options) error {
|
|
grpcServer := grpc.NewServer()
|
|
|
|
fetchMediaSetService := media.NewMediaSetService(options.Store, options.YoutubeClient, options.S3Client)
|
|
|
|
pbMediaSet.RegisterMediaSetServiceServer(grpcServer, &mediaSetServiceController{mediaSetService: fetchMediaSetService})
|
|
|
|
// TODO: configure CORS
|
|
grpcWebServer := grpcweb.WrapServer(grpcServer, grpcweb.WithOriginFunc(func(string) bool { return true }))
|
|
handler := func(w http.ResponseWriter, r *http.Request) {
|
|
grpcWebServer.ServeHTTP(w, r)
|
|
}
|
|
|
|
httpServer := http.Server{
|
|
Addr: options.BindAddr,
|
|
ReadTimeout: options.Timeout,
|
|
WriteTimeout: options.Timeout,
|
|
Handler: http.HandlerFunc(handler),
|
|
}
|
|
|
|
return httpServer.ListenAndServe()
|
|
}
|