clipper/backend/server/gprc_handler.go

215 lines
5.7 KiB
Go

package server
//go:generate mockery --recursive --name MediaSetService --output ../generated/mocks
import (
"context"
"errors"
"io"
pbmediaset "git.netflux.io/rob/clipper/generated/pb/media_set"
"git.netflux.io/rob/clipper/media"
"github.com/google/uuid"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/durationpb"
)
// mediaSetServiceController implements the gRPC controller for
// MediaSetService. The handlers defined on it in this part of the file
// delegate to mediaSetService and map the results onto the generated
// pbmediaset message types.
type mediaSetServiceController struct {
// Embedded generated stub.
// NOTE(review): presumably supplies default implementations for any RPC not
// defined on this type — confirm against the generated pbmediaset code.
pbmediaset.UnimplementedMediaSetServiceServer
// mediaSetService is the service the RPC handlers call to do the actual work.
mediaSetService MediaSetService
// logger is not referenced by the handlers visible in this part of the file.
// NOTE(review): confirm where it is used.
logger *zap.SugaredLogger
}
// Get looks up the media set identified by the request's YouTube ID and
// returns it converted to a pbmediaset.MediaSet.
//
// Any error reported by the underlying service is passed through
// newResponseError before being returned to the caller.
func (c *mediaSetServiceController) Get(ctx context.Context, request *pbmediaset.GetRequest) (*pbmediaset.MediaSet, error) {
	youtubeID := request.GetYoutubeId()

	found, err := c.mediaSetService.Get(ctx, youtubeID)
	if err != nil {
		return nil, newResponseError(err)
	}

	out := new(pbmediaset.MediaSet)

	// Identity of the media set.
	out.Id = found.ID.String()
	out.YoutubeId = found.YoutubeID

	// Audio metadata; integer fields are narrowed to the int32 wire type.
	out.AudioChannels = int32(found.Audio.Channels)
	out.AudioFrames = found.Audio.Frames
	out.AudioApproxFrames = found.Audio.ApproxFrames
	out.AudioSampleRate = int32(found.Audio.SampleRate)
	out.AudioYoutubeItag = int32(found.Audio.YoutubeItag)
	out.AudioMimeType = found.Audio.MimeType

	// Video metadata.
	out.VideoDuration = durationpb.New(found.Video.Duration)
	out.VideoYoutubeItag = int32(found.Video.YoutubeItag)
	out.VideoMimeType = found.Video.MimeType

	return out, nil
}
// GetPeaks returns a stream of GetPeaksProgress relating to the entire audio
// part of the MediaSet.
//
// The request's ID must parse as a UUID. Each value produced by the service's
// peak reader is converted to a GetPeaksProgress message and sent on stream;
// the value returned together with io.EOF is sent as the final message, after
// which the handler returns nil.
//
// The work is bounded by getPeaksTimeout and is tied to the stream's context,
// so it stops when the client cancels or disconnects. Errors from the service
// or the reader are passed through newResponseError. A failure to send on the
// stream ends the handler immediately with that error.
func (c *mediaSetServiceController) GetPeaks(request *pbmediaset.GetPeaksRequest, stream pbmediaset.MediaSetService_GetPeaksServer) error {
	// Derive from the stream's context rather than context.Background() so
	// that client cancellation propagates to the service call.
	// TODO: reduce timeout when fetching from S3
	ctx, cancel := context.WithTimeout(stream.Context(), getPeaksTimeout)
	defer cancel()

	id, err := uuid.Parse(request.GetId())
	if err != nil {
		return newResponseError(err)
	}

	reader, err := c.mediaSetService.GetPeaks(ctx, id, int(request.GetNumBins()))
	if err != nil {
		return newResponseError(err)
	}

	for {
		progress, err := reader.Next()
		if err != nil && err != io.EOF {
			return newResponseError(err)
		}

		// Narrow the peaks to the int32 type used on the wire.
		peaks := make([]int32, len(progress.Peaks))
		for i, p := range progress.Peaks {
			peaks[i] = int32(p)
		}

		progressPb := pbmediaset.GetPeaksProgress{
			PercentComplete: progress.PercentComplete,
			Url:             progress.URL,
			Peaks:           peaks,
		}

		// A failed send means the stream is no longer usable, so stop here.
		// The error is returned as-is so the transport's own error is not
		// remapped by newResponseError.
		if sendErr := stream.Send(&progressPb); sendErr != nil {
			return sendErr
		}

		// The value accompanying io.EOF has been sent above; we are done.
		if err == io.EOF {
			return nil
		}
	}
}
// GetPeaksForSegment computes the peaks for one segment of the audio part of
// a MediaSet and returns them in a single response.
//
// The segment is delimited by the request's StartFrame and EndFrame, and the
// number of bins comes from the request's NumBins. The call is bounded by
// getPeaksForSegmentTimeout. An ID that does not parse as a UUID, or any
// error from the service, is passed through newResponseError.
func (c *mediaSetServiceController) GetPeaksForSegment(ctx context.Context, request *pbmediaset.GetPeaksForSegmentRequest) (*pbmediaset.GetPeaksForSegmentResponse, error) {
	ctx, cancel := context.WithTimeout(ctx, getPeaksForSegmentTimeout)
	defer cancel()

	mediaSetID, parseErr := uuid.Parse(request.GetId())
	if parseErr != nil {
		return nil, newResponseError(parseErr)
	}

	raw, fetchErr := c.mediaSetService.GetPeaksForSegment(
		ctx,
		mediaSetID,
		request.StartFrame,
		request.EndFrame,
		int(request.GetNumBins()),
	)
	if fetchErr != nil {
		return nil, newResponseError(fetchErr)
	}

	// Narrow each peak to the int32 type used by the response message.
	converted := make([]int32, 0, len(raw))
	for _, peak := range raw {
		converted = append(converted, int32(peak))
	}

	response := &pbmediaset.GetPeaksForSegmentResponse{Peaks: converted}
	return response, nil
}
// GetAudioSegment streams a segment of the audio part of a MediaSet, encoded
// in the requested format, as a sequence of GetAudioSegmentProgress messages.
//
// The request's ID must parse as a UUID and its Format must be MP3 or WAV;
// any other format yields an "unknown format" error. Each value produced by
// the service's stream is forwarded on outStream; the value returned together
// with io.EOF is sent as the final message, after which the handler returns
// nil.
//
// The work is tied to the stream's context, so it stops when the client
// cancels or disconnects. Errors from the service are passed through
// newResponseError. A failure to send on outStream ends the handler
// immediately with that error.
func (c *mediaSetServiceController) GetAudioSegment(request *pbmediaset.GetAudioSegmentRequest, outStream pbmediaset.MediaSetService_GetAudioSegmentServer) error {
	// Derive from the stream's context rather than context.Background() so
	// that client cancellation propagates to the service call.
	// NOTE(review): this reuses getPeaksForSegmentTimeout — confirm whether
	// audio segments are meant to share that timeout.
	ctx, cancel := context.WithTimeout(outStream.Context(), getPeaksForSegmentTimeout)
	defer cancel()

	id, err := uuid.Parse(request.GetId())
	if err != nil {
		return newResponseError(err)
	}

	// Translate the wire-level format enum into the media package's type.
	var format media.AudioFormat
	switch request.Format {
	case pbmediaset.AudioFormat_MP3:
		format = media.AudioFormatMP3
	case pbmediaset.AudioFormat_WAV:
		format = media.AudioFormatWAV
	default:
		return newResponseError(errors.New("unknown format"))
	}

	stream, err := c.mediaSetService.GetAudioSegment(ctx, id, request.StartFrame, request.EndFrame, format)
	if err != nil {
		return newResponseError(err)
	}

	for {
		progress, err := stream.Next(ctx)
		if err != nil && err != io.EOF {
			return newResponseError(err)
		}

		progressPb := pbmediaset.GetAudioSegmentProgress{
			PercentComplete: progress.PercentComplete,
			AudioData:       progress.Data,
		}

		// A failed send means the stream is no longer usable, so stop here.
		// The error is returned as-is so the transport's own error is not
		// remapped by newResponseError.
		if sendErr := outStream.Send(&progressPb); sendErr != nil {
			return sendErr
		}

		// The value accompanying io.EOF has been sent above; we are done.
		if err == io.EOF {
			return nil
		}
	}
}
// GetVideo streams GetVideoProgress messages relating to the video part of a
// MediaSet.
//
// The request's ID must parse as a UUID. Each value produced by the service's
// video reader is forwarded on stream as a percentage and a URL; the value
// returned together with io.EOF is sent as the final message, after which the
// handler returns nil.
//
// The work is bounded by getVideoTimeout and is tied to the stream's context,
// so it stops when the client cancels or disconnects. Errors from the service
// or the reader are passed through newResponseError. A failure to send on the
// stream ends the handler immediately with that error.
func (c *mediaSetServiceController) GetVideo(request *pbmediaset.GetVideoRequest, stream pbmediaset.MediaSetService_GetVideoServer) error {
	// Derive from the stream's context rather than context.Background() so
	// that client cancellation propagates to the service call.
	// TODO: reduce timeout when already fetched from Youtube
	ctx, cancel := context.WithTimeout(stream.Context(), getVideoTimeout)
	defer cancel()

	id, err := uuid.Parse(request.GetId())
	if err != nil {
		return newResponseError(err)
	}

	reader, err := c.mediaSetService.GetVideo(ctx, id)
	if err != nil {
		return newResponseError(err)
	}

	for {
		progress, err := reader.Next()
		if err != nil && err != io.EOF {
			return newResponseError(err)
		}

		progressPb := pbmediaset.GetVideoProgress{
			PercentComplete: progress.PercentComplete,
			Url:             progress.URL,
		}

		// A failed send means the stream is no longer usable, so stop here.
		// The error is returned as-is so the transport's own error is not
		// remapped by newResponseError.
		if sendErr := stream.Send(&progressPb); sendErr != nil {
			return sendErr
		}

		// The value accompanying io.EOF has been sent above; we are done.
		if err == io.EOF {
			return nil
		}
	}
}
// GetVideoThumbnail fetches the video thumbnail for the MediaSet identified
// by the request and returns its image data together with its width and
// height.
//
// An ID that does not parse as a UUID, or any error from the service, is
// passed through newResponseError.
func (c *mediaSetServiceController) GetVideoThumbnail(ctx context.Context, request *pbmediaset.GetVideoThumbnailRequest) (*pbmediaset.GetVideoThumbnailResponse, error) {
	mediaSetID, parseErr := uuid.Parse(request.GetId())
	if parseErr != nil {
		return nil, newResponseError(parseErr)
	}

	thumb, fetchErr := c.mediaSetService.GetVideoThumbnail(ctx, mediaSetID)
	if fetchErr != nil {
		return nil, newResponseError(fetchErr)
	}

	// Dimensions are narrowed to the int32 type used by the response message.
	return &pbmediaset.GetVideoThumbnailResponse{
		Image:  thumb.Data,
		Width:  int32(thumb.Width),
		Height: int32(thumb.Height),
	}, nil
}